Spark MLlib - Pipeline API Tutorial


Key Insights

  • Spark MLlib’s Pipeline API standardizes the machine learning workflow by chaining transformers and estimators into reproducible sequences, eliminating ad-hoc preprocessing and model training code.
  • Pipelines enable seamless integration of feature engineering, model training, and cross-validation while maintaining parameter consistency across stages through a unified interface.
  • Production deployment becomes straightforward as trained pipelines serialize the entire transformation and prediction workflow, ensuring training-serving consistency.

Understanding Pipeline Components

Spark MLlib organizes machine learning workflows around two core abstractions: Transformers and Estimators. A Transformer takes a DataFrame as input and produces a new DataFrame with additional columns. An Estimator implements a learning algorithm that fits on data to produce a Transformer.

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

spark = SparkSession.builder.appName("MLPipeline").getOrCreate()

# Sample data
data = spark.createDataFrame([
    (1.0, 2.0, 3.0, 0),
    (2.0, 3.0, 4.0, 1),
    (3.0, 4.0, 5.0, 0),
    (4.0, 5.0, 6.0, 1),
    (5.0, 6.0, 7.0, 1)
], ["feature1", "feature2", "feature3", "label"])

# VectorAssembler is a Transformer
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features"
)

# StandardScaler is an Estimator that produces a Transformer
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True
)

# LogisticRegression is an Estimator
lr = LogisticRegression(
    featuresCol="scaledFeatures",
    labelCol="label",
    maxIter=10
)

The VectorAssembler immediately transforms data without learning. StandardScaler must first analyze the data to compute means and standard deviations. LogisticRegression learns model coefficients from training data.
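To make the distinction concrete, here is the arithmetic a fitted scaler applies, sketched in plain Python on the feature1 column from the sample data. This is an illustration, not Spark code; note that StandardScaler uses the corrected sample standard deviation (dividing by n − 1):

```python
import math

# feature1 values from the sample DataFrame above
feature1 = [1.0, 2.0, 3.0, 4.0, 5.0]

# The "fit" step: compute statistics from the data
mean = sum(feature1) / len(feature1)
# Corrected sample variance (divide by n - 1), as StandardScaler does
var = sum((x - mean) ** 2 for x in feature1) / (len(feature1) - 1)
std = math.sqrt(var)

# The "transform" step: apply the learned statistics
scaled = [(x - mean) / std for x in feature1]
```

The split between the two steps is exactly why StandardScaler is an Estimator: the mean and standard deviation must be learned before any row can be transformed.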

Building a Complete Pipeline

A Pipeline chains multiple stages sequentially. When you call fit() on a Pipeline, it processes stages in order, fitting Estimators and transforming data through Transformers.

# Create pipeline with three stages
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Split data
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Fit the entire pipeline
pipeline_model = pipeline.fit(train_data)

# Make predictions
predictions = pipeline_model.transform(test_data)
predictions.select("features", "scaledFeatures", "prediction", "probability").show()

When pipeline.fit(train_data) executes:

  1. Assembler transforms train_data, adding the “features” column
  2. Scaler fits on the assembled features, computing statistics
  3. The fitted scaler transforms data, adding “scaledFeatures”
  4. LogisticRegression fits on scaled features

The resulting pipeline_model is a PipelineModel containing all fitted transformers, ready for predictions on new data.
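The fit/transform alternation above can be mimicked with a small pure-Python mock. These classes are hypothetical stand-ins (not Spark's), kept only to show the control flow Pipeline.fit follows:

```python
class AddOne:
    """Stateless stage, analogous to VectorAssembler: transforms without fitting."""
    def transform(self, data):
        return [x + 1.0 for x in data]

class MeanCenterer:
    """Estimator, analogous to StandardScaler: fit() learns a statistic
    and returns a transformer that applies it."""
    def fit(self, data):
        mean = sum(data) / len(data)
        class Centered:
            def transform(self, d):
                return [x - mean for x in d]
        return Centered()

def pipeline_fit(stages, data):
    """Mirrors Pipeline.fit: walk stages in order, fitting estimators and
    threading the transformed data into the next stage."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):      # Estimator: learn from the current data first
            stage = stage.fit(data)
        data = stage.transform(data)   # every (fitted) stage then transforms
        fitted.append(stage)
    return fitted                      # list of fitted transformers, like a PipelineModel

def pipeline_transform(fitted, data):
    """Mirrors PipelineModel.transform: apply each fitted stage in order."""
    for stage in fitted:
        data = stage.transform(data)
    return data
```

Fitting `[AddOne(), MeanCenterer()]` on `[1.0, 2.0, 3.0]` learns the mean of the shifted data; applying the fitted pipeline to new values reuses that learned mean, just as a PipelineModel reuses training-time statistics at prediction time.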

Text Processing Pipeline

Text classification demonstrates Pipeline’s power for complex preprocessing. This example builds a sentiment classifier with multiple feature engineering steps.

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Sample text data
text_data = spark.createDataFrame([
    ("This product is amazing and works great", 1),
    ("Terrible quality, waste of money", 0),
    ("Excellent service, highly recommend", 1),
    ("Poor performance, very disappointed", 0),
    ("Outstanding features and great value", 1),
    ("Not worth the price, bad experience", 0)
], ["text", "label"])

# Stage 1: Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Stage 2: Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# Stage 3: Term frequency
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=100)

# Stage 4: TF-IDF
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")

# Stage 5: Classification
rf = RandomForestClassifier(
    featuresCol="tfidf_features",
    labelCol="label",
    numTrees=20,
    maxDepth=5
)

# Build and train pipeline
text_pipeline = Pipeline(stages=[tokenizer, remover, hashing_tf, idf, rf])
text_model = text_pipeline.fit(text_data)

# Test on new data
test_texts = spark.createDataFrame([
    ("Great product, very satisfied",),
    ("Horrible quality, returning it",)
], ["text"])

results = text_model.transform(test_texts)
results.select("text", "prediction").show(truncate=False)

Each stage builds on the previous output. The tokenizer splits text into words, the remover filters common words, HashingTF converts to term frequencies, IDF weighs terms by document frequency, and RandomForest learns classification patterns.
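The hashing trick and IDF weighting can be sketched in plain Python. This is an illustration only: the `bucket` function below uses CRC32 rather than the MurmurHash3 that HashingTF uses, so indices will not match Spark's, but the smoothed IDF formula log((m + 1) / (df + 1)) is the one MLlib documents:

```python
import math
import zlib

def bucket(word, num_features=100):
    # Deterministic stand-in for HashingTF's MurmurHash3
    return zlib.crc32(word.encode()) % num_features

def hashing_tf(words, num_features=100):
    """Map words to a fixed-size count vector; hash collisions are accepted by design."""
    vec = [0.0] * num_features
    for w in words:
        vec[bucket(w, num_features)] += 1.0
    return vec

def idf_weights(doc_vectors, num_features=100):
    """Compute smoothed IDF weights from term-frequency vectors."""
    m = len(doc_vectors)
    df = [sum(1 for v in doc_vectors if v[i] > 0) for i in range(num_features)]
    # MLlib's smoothing: log((m + 1) / (df + 1)); terms in every document get weight 0
    return [math.log((m + 1) / (df[i] + 1)) for i in range(num_features)]
```

A term that appears in every document gets weight log((m + 1)/(m + 1)) = 0, which is why IDF suppresses uninformative words that survive stop-word removal.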

Hyperparameter Tuning with Cross-Validation

Pipelines integrate with cross-validation and hyperparameter tuning. Use ParamGridBuilder to specify parameter combinations and CrossValidator to find the best-performing settings.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Create base pipeline
base_pipeline = Pipeline(stages=[assembler, scaler, lr])

# Define parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 50]) \
    .build()

# Setup cross-validation
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

cv = CrossValidator(
    estimator=base_pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)

# Fit with cross-validation
cv_model = cv.fit(train_data)

# Best model
best_model = cv_model.bestModel
print(f"Best regParam: {best_model.stages[-1].getRegParam()}")
print(f"Best elasticNetParam: {best_model.stages[-1].getElasticNetParam()}")

# Evaluate
test_predictions = cv_model.transform(test_data)
accuracy = evaluator.evaluate(test_predictions)
print(f"Test Accuracy: {accuracy}")

CrossValidator fits 3 regParam × 3 elasticNetParam × 2 maxIter values = 18 parameter combinations, each trained on 3 folds, for 54 model fits in total. It then refits the best configuration on the full training set and returns that pipeline, handling the distributed hyperparameter search automatically.
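The fit count is just the size of the cross product of the grid values times the fold count, which you can verify directly:

```python
from itertools import product

# The same grid values passed to ParamGridBuilder above
reg_params = [0.01, 0.1, 1.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [10, 50]
num_folds = 3

combos = list(product(reg_params, elastic_net_params, max_iters))
total_fits = len(combos) * num_folds  # 18 combinations × 3 folds
```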

Persistence and Deployment

Trained pipelines serialize to disk, preserving all transformations and model parameters. This ensures identical preprocessing in production.

# Save the trained text-classification pipeline
text_model.write().overwrite().save("models/sentiment_pipeline")

# Load in production environment
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("models/sentiment_pipeline")

# Apply to new data
production_data = spark.createDataFrame([
    ("Fantastic product with excellent quality",),
    ("Completely disappointed with this purchase",)
], ["text"])

production_predictions = loaded_model.transform(production_data)
production_predictions.select("text", "prediction", "probability").show(truncate=False)

The loaded model contains the exact IDF document-frequency statistics, the fitted RandomForest trees, and every intermediate transformation from training, with no manual parameter tracking or separate preprocessing code required.

Custom Transformers

Extend Pipeline functionality by implementing custom transformers for domain-specific logic.

from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import col, when

class RatioTransformer(Transformer, HasOutputCol,
                       DefaultParamsReadable, DefaultParamsWritable):

    # Declare these as Params (not plain attributes) so they survive save/load
    numeratorCol = Param(Params._dummy(), "numeratorCol", "column holding the numerator")
    denominatorCol = Param(Params._dummy(), "denominatorCol", "column holding the denominator")

    def __init__(self, outputCol=None, numeratorCol=None, denominatorCol=None):
        super().__init__()
        if outputCol is not None:
            self._set(outputCol=outputCol)
        if numeratorCol is not None:
            self._set(numeratorCol=numeratorCol)
        if denominatorCol is not None:
            self._set(denominatorCol=denominatorCol)

    def _transform(self, dataset):
        num = col(self.getOrDefault(self.numeratorCol))
        denom = col(self.getOrDefault(self.denominatorCol))
        # Native column expression: guards against division by zero without a Python UDF
        return dataset.withColumn(
            self.getOutputCol(),
            when(denom != 0, num / denom).otherwise(0.0)
        )

# Use custom transformer in pipeline
custom_data = spark.createDataFrame([
    (100, 10, 1),
    (200, 20, 0),
    (150, 15, 1)
], ["revenue", "cost", "label"])

ratio_transformer = RatioTransformer(
    outputCol="profit_margin",
    numeratorCol="revenue",
    denominatorCol="cost"
)

custom_pipeline = Pipeline(stages=[
    ratio_transformer,
    VectorAssembler(inputCols=["profit_margin"], outputCol="features"),
    LogisticRegression(featuresCol="features", labelCol="label")
])

custom_model = custom_pipeline.fit(custom_data)

Custom transformers integrate seamlessly with built-in stages, enabling reusable feature engineering logic across projects while maintaining pipeline serialization capabilities.
