PySpark - Feature Engineering (VectorAssembler, StringIndexer)

Key Insights

• VectorAssembler consolidates multiple feature columns into a single vector column required by Spark MLlib algorithms, handling numeric types automatically while requiring preprocessing for categorical data
• StringIndexer converts categorical string values to numeric indices based on label frequency, enabling ML algorithms to process text-based features while maintaining model interpretability through inverse transformation
• Combining these transformers in a Pipeline ensures consistent feature engineering across training and inference, preventing data leakage and simplifying production deployment

Understanding Feature Engineering in PySpark

Feature engineering transforms raw data into formats consumable by machine learning algorithms. PySpark MLlib requires all features consolidated into a single vector column of type org.apache.spark.ml.linalg.Vector. This constraint drives the need for transformers like VectorAssembler and StringIndexer.

Unlike pandas-based workflows where you pass DataFrames directly to scikit-learn, Spark’s distributed architecture demands explicit feature vectorization. This approach optimizes data serialization across cluster nodes and standardizes input formats for all MLlib estimators.

VectorAssembler Fundamentals

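VectorAssembler merges multiple columns into a single vector column. It accepts numeric columns (for example IntegerType, LongType, FloatType, DoubleType), boolean columns, and existing vector columns. Scalar values are cast to doubles, and vector inputs are flattened into the output in the order listed in inputCols.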

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType

spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()

# Create sample dataset
data = [
    (1, 25, 50000.0, 3.5),
    (2, 30, 75000.0, 4.2),
    (3, 22, 45000.0, 3.1),
    (4, 35, 90000.0, 4.8),
    (5, 28, 62000.0, 3.9)
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True),
    StructField("experience", DoubleType(), True)
])

df = spark.createDataFrame(data, schema)

# Assemble features
assembler = VectorAssembler(
    inputCols=["age", "salary", "experience"],
    outputCol="features"
)

assembled_df = assembler.transform(df)
assembled_df.select("id", "features").show(truncate=False)

Output:

+---+-------------------+
|id |features           |
+---+-------------------+
|1  |[25.0,50000.0,3.5] |
|2  |[30.0,75000.0,4.2] |
|3  |[22.0,45000.0,3.1] |
|4  |[35.0,90000.0,4.8] |
|5  |[28.0,62000.0,3.9] |
+---+-------------------+

Handling Missing Values with VectorAssembler

By default (handleInvalid="error"), VectorAssembler raises an error when an input column contains a null. Set handleInvalid to "skip" to drop the affected rows, or to "keep" to emit NaN in place of each null.

# Create data with nulls (reuses the schema defined in the previous example)
data_with_nulls = [
    (1, 25, 50000.0, None),
    (2, None, 75000.0, 4.2),
    (3, 22, None, 3.1)
]

df_nulls = spark.createDataFrame(data_with_nulls, schema)

# Option 1: Skip rows with nulls
assembler_skip = VectorAssembler(
    inputCols=["age", "salary", "experience"],
    outputCol="features",
    handleInvalid="skip"
)

# Option 2: Keep nulls (converts to NaN)
assembler_keep = VectorAssembler(
    inputCols=["age", "salary", "experience"],
    outputCol="features",
    handleInvalid="keep"
)

result_keep = assembler_keep.transform(df_nulls)
result_keep.select("id", "features").show(truncate=False)

Output:

+---+--------------------+
|id |features            |
+---+--------------------+
|1  |[25.0,50000.0,NaN]  |
|2  |[NaN,75000.0,4.2]   |
|3  |[22.0,NaN,3.1]      |
+---+--------------------+

StringIndexer for Categorical Encoding

StringIndexer converts a string column to numeric indices stored as doubles. By default it orders labels by descending frequency, so the most frequent label gets index 0.0.

from pyspark.ml.feature import StringIndexer

# Create dataset with categorical features
categorical_data = [
    (1, "Engineering", "Senior", 90000.0),
    (2, "Marketing", "Junior", 50000.0),
    (3, "Engineering", "Mid", 70000.0),
    (4, "Sales", "Senior", 85000.0),
    (5, "Engineering", "Junior", 55000.0),
    (6, "Marketing", "Mid", 60000.0)
]

cat_df = spark.createDataFrame(
    categorical_data,
    ["id", "department", "level", "salary"]
)

# Index department column
dept_indexer = StringIndexer(
    inputCol="department",
    outputCol="department_index"
)

dept_model = dept_indexer.fit(cat_df)
indexed_df = dept_model.transform(cat_df)

indexed_df.select("department", "department_index").show()

Output:

+-----------+----------------+
| department|department_index|
+-----------+----------------+
|Engineering|             0.0|
|  Marketing|             1.0|
|Engineering|             0.0|
|      Sales|             2.0|
|Engineering|             0.0|
|  Marketing|             1.0|
+-----------+----------------+
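
The ordering rule can be sketched in plain Python, without Spark. The helper frequency_desc_labels is hypothetical and only models the default frequencyDesc ordering. The alphabetical tie-break reflects my understanding of the documented Spark 3.x behavior and should be treated as an assumption for other versions.

```python
from collections import Counter


def frequency_desc_labels(values):
    """Order labels by descending count, breaking ties alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))


departments = ["Engineering", "Marketing", "Engineering",
               "Sales", "Engineering", "Marketing"]
levels = ["Senior", "Junior", "Mid", "Senior", "Junior", "Mid"]

dept_labels = frequency_desc_labels(departments)
level_labels = frequency_desc_labels(levels)

# A label's index is its position in the ordered label list.
dept_index = {label: float(i) for i, label in enumerate(dept_labels)}

print(dept_labels)
print(level_labels)
print(dept_index)
```

The level column in the sample data has three labels with two rows each, so its indices are decided entirely by the tie-break rule. When a specific ordering matters, setting stringOrderType explicitly is safer than relying on frequencies.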

Multiple StringIndexers and Label Retrieval

To index several categorical columns, fit one StringIndexer per column and apply the fitted models in sequence. IndexToString then maps the indices back to the original labels.

from pyspark.ml.feature import IndexToString

# Index multiple columns
level_indexer = StringIndexer(
    inputCol="level",
    outputCol="level_index"
)

# Fit both indexers
dept_model = dept_indexer.fit(cat_df)
level_model = level_indexer.fit(cat_df)

# Transform sequentially
indexed_df = dept_model.transform(cat_df)
indexed_df = level_model.transform(indexed_df)

# Retrieve original labels
converter = IndexToString(
    inputCol="department_index",
    outputCol="department_original",
    labels=dept_model.labels
)

converted_df = converter.transform(indexed_df)
converted_df.select("department", "department_index", "department_original").show()

Handling Unseen Labels

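A fitted StringIndexer may encounter labels at prediction time that were absent from the training data. By default it raises an error; use handleInvalid to skip those rows or to assign them a reserved index instead.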

# Training data
train_data = [
    (1, "Engineering"),
    (2, "Marketing"),
    (3, "Engineering")
]

train_df = spark.createDataFrame(train_data, ["id", "department"])

# Test data with unseen label
test_data = [
    (4, "Engineering"),
    (5, "HR")  # Unseen label
]

test_df = spark.createDataFrame(test_data, ["id", "department"])

# Option 1: Error on unseen labels (default)
indexer_error = StringIndexer(inputCol="department", outputCol="dept_index")

# Option 2: Assign special index to unseen labels
indexer_keep = StringIndexer(
    inputCol="department",
    outputCol="dept_index",
    handleInvalid="keep"
)

model_keep = indexer_keep.fit(train_df)
result = model_keep.transform(test_df)
result.show()

Output:

+---+-----------+----------+
| id| department|dept_index|
+---+-----------+----------+
|  4|Engineering|       0.0|
|  5|         HR|       2.0|
+---+-----------+----------+
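
HR receives index 2.0 because "keep" places unseen labels in one extra bucket whose index equals the number of labels learned during fit(), here two. The plain-Python sketch below models the three handleInvalid policies. The function index_with_policy is hypothetical and is not part of the Spark API.

```python
def index_with_policy(values, labels, handle_invalid="error"):
    """Model how a fitted indexer treats seen and unseen labels."""
    lookup = {label: float(i) for i, label in enumerate(labels)}
    unseen_index = float(len(labels))  # the extra bucket used by "keep"
    result = []
    for value in values:
        if value in lookup:
            result.append((value, lookup[value]))
        elif handle_invalid == "keep":
            result.append((value, unseen_index))
        elif handle_invalid == "skip":
            continue  # drop the row entirely
        else:
            raise ValueError(f"Unseen label: {value}")
    return result


fitted_labels = ["Engineering", "Marketing"]  # learned from the training data
test_values = ["Engineering", "HR"]

kept = index_with_policy(test_values, fitted_labels, "keep")
skipped = index_with_policy(test_values, fitted_labels, "skip")

print(kept)
print(skipped)
```

All unseen labels share that single index, so a model downstream cannot tell HR apart from any other category it has never seen.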

Complete Pipeline Example

Combine StringIndexer and VectorAssembler in a Pipeline for end-to-end feature engineering.

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Complete dataset
full_data = [
    (1, "Engineering", "Senior", 25, 90000.0, 1),
    (2, "Marketing", "Junior", 22, 50000.0, 0),
    (3, "Engineering", "Mid", 30, 70000.0, 1),
    (4, "Sales", "Senior", 35, 85000.0, 1),
    (5, "Engineering", "Junior", 23, 55000.0, 0),
    (6, "Marketing", "Mid", 28, 60000.0, 0)
]

full_df = spark.createDataFrame(
    full_data,
    ["id", "department", "level", "age", "salary", "promoted"]
)

# Define pipeline stages
dept_indexer = StringIndexer(
    inputCol="department",
    outputCol="dept_index",
    handleInvalid="keep"
)

level_indexer = StringIndexer(
    inputCol="level",
    outputCol="level_index",
    handleInvalid="keep"
)

assembler = VectorAssembler(
    inputCols=["dept_index", "level_index", "age", "salary"],
    outputCol="features"
)

lr = LogisticRegression(
    featuresCol="features",
    labelCol="promoted",
    maxIter=10
)

# Create and fit pipeline
pipeline = Pipeline(stages=[dept_indexer, level_indexer, assembler, lr])

model = pipeline.fit(full_df)
predictions = model.transform(full_df)

predictions.select("id", "features", "promoted", "prediction").show(truncate=False)

Performance Considerations

VectorAssembler and StringIndexer are lightweight transformations, but their cost shows at scale. StringIndexer requires a full pass over the data during fit() to compute label frequencies, and each separately fitted indexer triggers its own pass. When fitting several indexers on the same DataFrame, consider caching it first:

# Cache DataFrame before fitting multiple indexers
df_cached = full_df.cache()

# Fit indexers on cached data
dept_model = dept_indexer.fit(df_cached)
level_model = level_indexer.fit(df_cached)

# Unpersist when done
df_cached.unpersist()

For production pipelines, serialize fitted models to avoid recomputing label mappings:

# Save pipeline model
model.write().overwrite().save("path/to/model")

# Load for inference
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("path/to/model")

VectorAssembler is a Transformer rather than an Estimator, so it has no fit() step and learns nothing from the data; inside a Pipeline, fit() simply passes it through. All of its work happens in transform(), which suits streaming applications where the same assembler configuration applies to every micro-batch.
