PySpark - Feature Engineering (VectorAssembler, StringIndexer)
Key Insights
• VectorAssembler consolidates multiple feature columns into a single vector column required by Spark MLlib algorithms, handling numeric types automatically while requiring preprocessing for categorical data
• StringIndexer converts categorical string values to numeric indices based on label frequency, enabling ML algorithms to process text-based features while maintaining model interpretability through inverse transformation
• Combining these transformers in a Pipeline ensures consistent feature engineering across training and inference, preventing data leakage and simplifying production deployment
Understanding Feature Engineering in PySpark
Feature engineering transforms raw data into formats consumable by machine learning algorithms. PySpark MLlib requires all features consolidated into a single vector column of type pyspark.ml.linalg.Vector (backed by org.apache.spark.ml.linalg.Vector on the JVM side). This constraint drives the need for transformers like VectorAssembler and StringIndexer.
Unlike pandas-based workflows where you pass DataFrames directly to scikit-learn, Spark’s distributed architecture demands explicit feature vectorization. This approach optimizes data serialization across cluster nodes and standardizes input formats for all MLlib estimators.
VectorAssembler Fundamentals
VectorAssembler merges multiple columns into a single vector column. It accepts numeric types (IntegerType, DoubleType, FloatType, LongType) and existing vector columns.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType
spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()
# Create sample dataset
data = [
(1, 25, 50000.0, 3.5),
(2, 30, 75000.0, 4.2),
(3, 22, 45000.0, 3.1),
(4, 35, 90000.0, 4.8),
(5, 28, 62000.0, 3.9)
]
schema = StructType([
StructField("id", IntegerType(), True),
StructField("age", IntegerType(), True),
StructField("salary", DoubleType(), True),
StructField("experience", DoubleType(), True)
])
df = spark.createDataFrame(data, schema)
# Assemble features
assembler = VectorAssembler(
inputCols=["age", "salary", "experience"],
outputCol="features"
)
assembled_df = assembler.transform(df)
assembled_df.select("id", "features").show(truncate=False)
Output:
+---+-------------------+
|id |features           |
+---+-------------------+
|1  |[25.0,50000.0,3.5] |
|2  |[30.0,75000.0,4.2] |
|3  |[22.0,45000.0,3.1] |
|4  |[35.0,90000.0,4.8] |
|5  |[28.0,62000.0,3.9] |
+---+-------------------+
Handling Missing Values with VectorAssembler
VectorAssembler raises an error on null values by default (handleInvalid="error"). Use the handleInvalid parameter ("skip" or "keep") to control this behavior.
# Create data with nulls
data_with_nulls = [
(1, 25, 50000.0, None),
(2, None, 75000.0, 4.2),
(3, 22, None, 3.1)
]
df_nulls = spark.createDataFrame(data_with_nulls, schema)
# Option 1: Skip rows with nulls
assembler_skip = VectorAssembler(
inputCols=["age", "salary", "experience"],
outputCol="features",
handleInvalid="skip"
)
# Option 2: Keep nulls (converts to NaN)
assembler_keep = VectorAssembler(
inputCols=["age", "salary", "experience"],
outputCol="features",
handleInvalid="keep"
)
result_keep = assembler_keep.transform(df_nulls)
result_keep.select("id", "features").show(truncate=False)
Output:
+---+--------------------+
|id |features            |
+---+--------------------+
|1  |[25.0,50000.0,NaN]  |
|2  |[NaN,75000.0,4.2]   |
|3  |[22.0,NaN,3.1]      |
+---+--------------------+
StringIndexer for Categorical Encoding
StringIndexer converts string columns to numeric indices, assigning them by descending label frequency: the most frequent label gets index 0.
from pyspark.ml.feature import StringIndexer
# Create dataset with categorical features
categorical_data = [
(1, "Engineering", "Senior", 90000.0),
(2, "Marketing", "Junior", 50000.0),
(3, "Engineering", "Mid", 70000.0),
(4, "Sales", "Senior", 85000.0),
(5, "Engineering", "Junior", 55000.0),
(6, "Marketing", "Mid", 60000.0)
]
cat_df = spark.createDataFrame(
categorical_data,
["id", "department", "level", "salary"]
)
# Index department column
dept_indexer = StringIndexer(
inputCol="department",
outputCol="department_index"
)
dept_model = dept_indexer.fit(cat_df)
indexed_df = dept_model.transform(cat_df)
indexed_df.select("department", "department_index").show()
Output:
+-----------+----------------+
| department|department_index|
+-----------+----------------+
|Engineering|             0.0|
|  Marketing|             1.0|
|Engineering|             0.0|
|      Sales|             2.0|
|Engineering|             0.0|
|  Marketing|             1.0|
+-----------+----------------+
Multiple StringIndexers and Label Retrieval
Index multiple categorical columns simultaneously and retrieve original labels using IndexToString.
from pyspark.ml.feature import IndexToString
# Index multiple columns
level_indexer = StringIndexer(
inputCol="level",
outputCol="level_index"
)
# Fit both indexers
dept_model = dept_indexer.fit(cat_df)
level_model = level_indexer.fit(cat_df)
# Transform sequentially
indexed_df = dept_model.transform(cat_df)
indexed_df = level_model.transform(indexed_df)
# Retrieve original labels
converter = IndexToString(
inputCol="department_index",
outputCol="department_original",
labels=dept_model.labels
)
converted_df = converter.transform(indexed_df)
converted_df.select("department", "department_index", "department_original").show()
Handling Unseen Labels
At inference time, StringIndexer may encounter labels that were absent from the training data. Configure this behavior with handleInvalid.
# Training data
train_data = [
(1, "Engineering"),
(2, "Marketing"),
(3, "Engineering")
]
train_df = spark.createDataFrame(train_data, ["id", "department"])
# Test data with unseen label
test_data = [
(4, "Engineering"),
(5, "HR") # Unseen label
]
test_df = spark.createDataFrame(test_data, ["id", "department"])
# Option 1: Error on unseen labels (default)
indexer_error = StringIndexer(inputCol="department", outputCol="dept_index")
# Option 2: Assign special index to unseen labels
indexer_keep = StringIndexer(
inputCol="department",
outputCol="dept_index",
handleInvalid="keep"
)
model_keep = indexer_keep.fit(train_df)
result = model_keep.transform(test_df)
result.show()
Output:
+---+-----------+----------+
| id| department|dept_index|
+---+-----------+----------+
|  4|Engineering|       0.0|
|  5|         HR|       2.0|
+---+-----------+----------+
Complete Pipeline Example
Combine StringIndexer and VectorAssembler in a Pipeline for end-to-end feature engineering.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
# Complete dataset
full_data = [
(1, "Engineering", "Senior", 25, 90000.0, 1),
(2, "Marketing", "Junior", 22, 50000.0, 0),
(3, "Engineering", "Mid", 30, 70000.0, 1),
(4, "Sales", "Senior", 35, 85000.0, 1),
(5, "Engineering", "Junior", 23, 55000.0, 0),
(6, "Marketing", "Mid", 28, 60000.0, 0)
]
full_df = spark.createDataFrame(
full_data,
["id", "department", "level", "age", "salary", "promoted"]
)
# Define pipeline stages
dept_indexer = StringIndexer(
inputCol="department",
outputCol="dept_index",
handleInvalid="keep"
)
level_indexer = StringIndexer(
inputCol="level",
outputCol="level_index",
handleInvalid="keep"
)
assembler = VectorAssembler(
inputCols=["dept_index", "level_index", "age", "salary"],
outputCol="features"
)
lr = LogisticRegression(
featuresCol="features",
labelCol="promoted",
maxIter=10
)
# Create and fit pipeline
pipeline = Pipeline(stages=[dept_indexer, level_indexer, assembler, lr])
model = pipeline.fit(full_df)
predictions = model.transform(full_df)
predictions.select("id", "features", "promoted", "prediction").show(truncate=False)
Performance Considerations
VectorAssembler and StringIndexer are lightweight transformations, but they still affect performance at scale. StringIndexer requires a full pass over the data during fit() to compute label frequencies. For datasets with high-cardinality categorical features, consider:
# Cache DataFrame before fitting multiple indexers
df_cached = full_df.cache()
# Fit indexers on cached data
dept_model = dept_indexer.fit(df_cached)
level_model = level_indexer.fit(df_cached)
# Unpersist when done
df_cached.unpersist()
For production pipelines, serialize fitted models to avoid recomputing label mappings:
# Save pipeline model
model.write().overwrite().save("path/to/model")
# Load for inference
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("path/to/model")
Unlike StringIndexer, VectorAssembler is a pure Transformer: it learns nothing from the data and performs only schema validation, with all transformation logic executing during transform(). This makes it efficient for streaming applications, where the same assembler configuration applies to every micro-batch.