Spark MLlib - VectorAssembler Tutorial

Spark MLlib algorithms expect features as a single vector column rather than individual columns. VectorAssembler consolidates multiple input columns into one feature vector, acting as a critical...

Key Insights

  • VectorAssembler transforms multiple feature columns into a single vector column required by Spark MLlib algorithms, eliminating manual feature engineering overhead
  • Handles mixed data types (numeric and boolean) seamlessly while providing control over invalid value handling through setHandleInvalid parameter
  • Integration with Pipeline API enables reproducible ML workflows where the same transformation logic applies consistently across training and inference

Understanding VectorAssembler’s Role in MLlib

Spark MLlib algorithms expect features as a single vector column rather than individual columns. VectorAssembler consolidates multiple input columns into one feature vector, acting as a critical preprocessing step in any machine learning pipeline.

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("VectorAssemblerDemo").getOrCreate()

# Sample dataset
data = [
    (1, 25, 50000, 3, 1),
    (2, 35, 75000, 5, 0),
    (3, 45, 100000, 10, 1),
    (4, 28, 60000, 2, 0)
]

df = spark.createDataFrame(data, ["id", "age", "salary", "experience", "purchased"])
df.show()
+---+---+------+----------+---------+
| id|age|salary|experience|purchased|
+---+---+------+----------+---------+
|  1| 25| 50000|         3|        1|
|  2| 35| 75000|         5|        0|
|  3| 45|100000|        10|        1|
|  4| 28| 60000|         2|        0|
+---+---+------+----------+---------+

Basic VectorAssembler Implementation

The core functionality requires specifying input columns and an output column name. VectorAssembler creates a dense or sparse vector depending on the data characteristics.

assembler = VectorAssembler(
    inputCols=["age", "salary", "experience"],
    outputCol="features"
)

assembled_df = assembler.transform(df)
assembled_df.select("id", "features", "purchased").show(truncate=False)
+---+-------------------+---------+
|id |features           |purchased|
+---+-------------------+---------+
|1  |[25.0,50000.0,3.0] |1        |
|2  |[35.0,75000.0,5.0] |0        |
|3  |[45.0,100000.0,10.0]|1        |
|4  |[28.0,60000.0,2.0] |0        |
+---+-------------------+---------+

The resulting features column contains a DenseVector with values from all input columns in the specified order. This vector can now feed directly into MLlib estimators.

Handling Invalid Values

Real-world datasets contain nulls and NaNs. VectorAssembler provides three strategies via setHandleInvalid: “error” (default), “skip”, and “keep”.

# Dataset with missing values
data_with_nulls = [
    (1, 25, 50000, 3),
    (2, None, 75000, 5),
    (3, 45, None, 10),
    (4, 28, 60000, None)
]

df_nulls = spark.createDataFrame(data_with_nulls, ["id", "age", "salary", "experience"])

# Strategy 1: Skip rows with nulls
assembler_skip = VectorAssembler(
    inputCols=["age", "salary", "experience"],
    outputCol="features",
    handleInvalid="skip"
)

result_skip = assembler_skip.transform(df_nulls)
result_skip.select("id", "features").show()
+---+-------------------+
| id|           features|
+---+-------------------+
|  1|[25.0,50000.0,3.0] |
+---+-------------------+
# Strategy 2: Keep nulls as NaN
assembler_keep = VectorAssembler(
    inputCols=["age", "salary", "experience"],
    outputCol="features",
    handleInvalid="keep"
)

result_keep = assembler_keep.transform(df_nulls)
result_keep.select("id", "features").show(truncate=False)
+---+--------------------+
|id |features            |
+---+--------------------+
|1  |[25.0,50000.0,3.0]  |
|2  |[NaN,75000.0,5.0]   |
|3  |[45.0,NaN,10.0]     |
|4  |[28.0,60000.0,NaN]  |
+---+--------------------+

The “keep” strategy proves useful when downstream algorithms handle NaN values or when you implement custom imputation logic post-assembly.

Integration with ML Pipeline

VectorAssembler shines within Pipeline constructs, ensuring consistent transformations across training and production environments.

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler

# Complete training dataset
train_data = [
    (25, 50000, 3, 0),
    (35, 75000, 5, 1),
    (45, 100000, 10, 1),
    (28, 60000, 2, 0),
    (52, 120000, 15, 1),
    (23, 45000, 1, 0)
]

train_df = spark.createDataFrame(train_data, ["age", "salary", "experience", "label"])

# Build pipeline
assembler = VectorAssembler(
    inputCols=["age", "salary", "experience"],
    outputCol="raw_features"
)

scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withStd=True,
    withMean=True
)

lr = LogisticRegression(maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[assembler, scaler, lr])

# Train model
model = pipeline.fit(train_df)

# Test data
test_data = [(30, 65000, 4), (48, 95000, 12)]
test_df = spark.createDataFrame(test_data, ["age", "salary", "experience"])

# Make predictions - same transformations applied automatically
predictions = model.transform(test_df)
predictions.select("age", "salary", "experience", "prediction", "probability").show(truncate=False)
+---+------+----------+----------+----------------------------------------+
|age|salary|experience|prediction|probability                             |
+---+------+----------+----------+----------------------------------------+
|30 |65000 |4         |0.0       |[0.6234567891234568,0.3765432108765432] |
|48 |95000 |12        |1.0       |[0.2891234567890123,0.7108765432109877] |
+---+------+----------+----------+----------------------------------------+

Working with Categorical Features

VectorAssembler accepts numeric and boolean types but requires encoding categorical features first using StringIndexer or OneHotEncoder.

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Dataset with categorical feature
data_cat = [
    (1, 25, "Engineering", 50000, 1),
    (2, 35, "Sales", 75000, 0),
    (3, 45, "Engineering", 100000, 1),
    (4, 28, "Marketing", 60000, 0)
]

df_cat = spark.createDataFrame(data_cat, ["id", "age", "department", "salary", "label"])

# Index and encode categorical feature
indexer = StringIndexer(inputCol="department", outputCol="dept_index")
encoder = OneHotEncoder(inputCol="dept_index", outputCol="dept_encoded")

# Assemble all features
assembler = VectorAssembler(
    inputCols=["age", "dept_encoded", "salary"],
    outputCol="features"
)

# Create and fit pipeline
pipeline = Pipeline(stages=[indexer, encoder, assembler])
result = pipeline.fit(df_cat).transform(df_cat)

result.select("department", "dept_encoded", "features").show(truncate=False)
+-----------+-------------+---------------------------+
|department |dept_encoded |features                   |
+-----------+-------------+---------------------------+
|Engineering|(2,[0],[1.0])|[25.0,1.0,0.0,50000.0]     |
|Sales      |(2,[1],[1.0])|[35.0,0.0,1.0,75000.0]     |
|Engineering|(2,[0],[1.0])|[45.0,1.0,0.0,100000.0]    |
|Marketing  |(2,[],[])    |[28.0,0.0,0.0,60000.0]     |
+-----------+-------------+---------------------------+

Notice how OneHotEncoder produces sparse vectors that VectorAssembler expands into the final feature vector.

Performance Considerations

VectorAssembler performs minimal computation but impacts memory usage based on vector density. Monitor your feature vector size, especially with high-cardinality categorical features after one-hot encoding.

from pyspark.ml.linalg import Vectors

# Check vector type and size
def analyze_vector(row):
    vector = row.features
    return (
        type(vector).__name__,
        vector.size,
        len(vector.toArray().nonzero()[0])  # Non-zero elements
    )

result.select("features").rdd.map(analyze_vector).collect()

For datasets with many sparse features, VectorAssembler automatically creates SparseVector instances, optimizing memory usage.

# Force sparse vector creation
sparse_data = [(1, 0, 0, 100, 0, 0, 50)]
sparse_df = spark.createDataFrame(sparse_data, ["id", "f1", "f2", "f3", "f4", "f5", "f6"])

assembler = VectorAssembler(
    inputCols=["f1", "f2", "f3", "f4", "f5", "f6"],
    outputCol="features"
)

sparse_result = assembler.transform(sparse_df)
sparse_result.select("features").show(truncate=False)
+----------------------------+
|features                    |
+----------------------------+
|(6,[2,5],[100.0,50.0])      |
+----------------------------+

The output shows SparseVector notation: (size, [indices], [values]), storing only non-zero elements.

Common Pitfalls and Solutions

Pitfall 1: Including the label column in features. Always exclude target variables from input columns.

Pitfall 2: Forgetting to handle nulls in production data when training data was clean. Always set handleInvalid="skip" or implement upstream null handling.

Pitfall 3: Inconsistent column ordering between training and inference. Pipeline serialization prevents this, but manual assembly requires strict column order adherence.

# Save and load pipeline to ensure consistency
model.write().overwrite().save("path/to/model")

from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("path/to/model")

# Same transformations guaranteed
new_predictions = loaded_model.transform(test_df)

VectorAssembler forms the foundation of feature preparation in Spark MLlib. Master its configuration options and integration patterns to build robust, production-ready machine learning pipelines that handle real-world data complexity.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.