PySpark - ML Pipeline with Examples
Key Insights
- PySpark MLlib provides a high-level Pipeline API that chains multiple transformers and estimators into a reproducible ML workflow, enabling consistent data preprocessing and model training across development and production environments.
- Pipelines solve the common problem of data leakage by ensuring transformations fitted on training data are correctly applied to validation and test sets without refitting, maintaining the integrity of model evaluation.
- The Pipeline abstraction supports hyperparameter tuning through CrossValidator and TrainValidationSplit, which refit the entire transformation sequence inside each fold or split so that evaluation metrics are not inflated by leakage.
Understanding PySpark ML Pipelines
PySpark’s Pipeline API standardizes the machine learning workflow by treating data transformations and model training as a sequence of stages. Each stage is either a Transformer (transforms data) or an Estimator (learns from data and produces a Transformer).
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import LogisticRegression
# Initialize Spark session
spark = SparkSession.builder \
    .appName("MLPipelineExample") \
    .getOrCreate()
# Sample dataset
data = spark.createDataFrame([
    (3, "male", 22, 1, 0, 7.25, 0),
    (1, "female", 38, 1, 0, 71.28, 1),
    (3, "female", 26, 0, 0, 7.92, 1),
    (1, "female", 35, 1, 0, 53.10, 1),
    (3, "male", 35, 0, 0, 8.05, 0),
], ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare", "Survived"])
Building a Classification Pipeline
A typical ML pipeline includes feature encoding, feature assembly, scaling, and model training. Here’s a complete classification example:
# Stage 1: Encode categorical variable
sex_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
# Stage 2: Assemble features into a single vector
feature_cols = ["Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="rawFeatures")
# Stage 3: Scale features
scaler = StandardScaler(inputCol="rawFeatures", outputCol="features",
                        withStd=True, withMean=True)
# Stage 4: Train model
lr = LogisticRegression(featuresCol="features", labelCol="Survived",
                        maxIter=10, regParam=0.01)
# Create pipeline
pipeline = Pipeline(stages=[sex_indexer, assembler, scaler, lr])
# Split data
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
# Fit pipeline on training data
model = pipeline.fit(train_data)
# Make predictions
predictions = model.transform(test_data)
predictions.select("Survived", "prediction", "probability").show()
The pipeline ensures that the StringIndexer’s mapping, the StandardScaler’s mean and standard deviation, and the LogisticRegression coefficients are all learned from training data only.
Regression Pipeline with Feature Engineering
For regression tasks, pipelines can include more complex feature engineering:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import Bucketizer, OneHotEncoder
# Create sample housing data
housing_data = spark.createDataFrame([
    (1200, 3, 2, 15, 250000),
    (1800, 4, 3, 10, 380000),
    (900, 2, 1, 20, 180000),
    (2200, 5, 3, 5, 520000),
    (1500, 3, 2, 12, 310000),
], ["sqft", "bedrooms", "bathrooms", "age", "price"])
# Stage 1: Bucketize age into categories
age_bucketizer = Bucketizer(
    splits=[0, 5, 10, 15, float('inf')],
    inputCol="age",
    outputCol="age_category"
)
# Stage 2: One-hot encode age categories
age_encoder = OneHotEncoder(inputCol="age_category", outputCol="age_vec")
# Stage 3: Assemble all features
feature_assembler = VectorAssembler(
    inputCols=["sqft", "bedrooms", "bathrooms", "age_vec"],
    outputCol="features"
)
# Stage 4: Random Forest Regressor
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="price",
    numTrees=20,
    maxDepth=5
)
# Build and train pipeline
regression_pipeline = Pipeline(stages=[
    age_bucketizer, age_encoder, feature_assembler, rf
])
regression_model = regression_pipeline.fit(housing_data)
regression_predictions = regression_model.transform(housing_data)
regression_predictions.select("price", "prediction").show()
Cross-Validation with Pipelines
Pipelines integrate seamlessly with hyperparameter tuning tools. CrossValidator applies the entire pipeline during each fold:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Define parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()
# Define evaluator
evaluator = BinaryClassificationEvaluator(
    labelCol="Survived",
    metricName="areaUnderROC"
)
# Create CrossValidator
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42
)
# Train with cross-validation
cv_model = cv.fit(train_data)
# Best model
best_model = cv_model.bestModel
print(f"Best regParam: {best_model.stages[-1].getRegParam()}")
print(f"Best elasticNetParam: {best_model.stages[-1].getElasticNetParam()}")
# Evaluate on test data
test_predictions = cv_model.transform(test_data)
auc = evaluator.evaluate(test_predictions)
print(f"Test AUC: {auc:.4f}")
Saving and Loading Pipelines
Pipelines and fitted models can be persisted for production deployment:
# Save the fitted pipeline model (overwrite avoids errors if the path exists)
model_path = "/tmp/spark_pipeline_model"
model.write().overwrite().save(model_path)
# Load the model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load(model_path)
# Use loaded model for predictions
new_data = spark.createDataFrame([
    (1, "female", 29, 0, 0, 25.50),
], ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare"])
new_predictions = loaded_model.transform(new_data)
new_predictions.select("prediction", "probability").show()
Custom Transformers in Pipelines
You can create custom transformers for domain-specific logic:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
class AgeGroupTransformer(Transformer, HasInputCol, HasOutputCol):
    def __init__(self, inputCol=None, outputCol=None):
        super(AgeGroupTransformer, self).__init__()
        # The shared param mixins provide getters only; set values via _set
        if inputCol is not None:
            self._set(inputCol=inputCol)
        if outputCol is not None:
            self._set(outputCol=outputCol)

    def _transform(self, dataset):
        def categorize_age(age):
            if age < 18:
                return 0.0
            elif age < 35:
                return 1.0
            elif age < 60:
                return 2.0
            else:
                return 3.0
        age_udf = udf(categorize_age, DoubleType())
        return dataset.withColumn(self.getOutputCol(),
                                  age_udf(dataset[self.getInputCol()]))
# Use custom transformer in pipeline
age_transformer = AgeGroupTransformer(inputCol="Age", outputCol="AgeGroup")
custom_pipeline = Pipeline(stages=[
    sex_indexer,
    age_transformer,
    VectorAssembler(inputCols=["Pclass", "SexIndex", "AgeGroup", "Fare"],
                    outputCol="features"),
    lr
])
custom_model = custom_pipeline.fit(train_data)
Pipeline Best Practices
Handle missing values before pipeline execution or use Imputer as a pipeline stage:
from pyspark.ml.feature import Imputer
# Create imputer for missing values
imputer = Imputer(
    inputCols=["Age", "Fare"],
    outputCols=["Age_imputed", "Fare_imputed"],
    strategy="mean"
)
# Updated pipeline with imputation
robust_pipeline = Pipeline(stages=[
    imputer,
    sex_indexer,
    VectorAssembler(
        inputCols=["Pclass", "SexIndex", "Age_imputed",
                   "SibSp", "Parch", "Fare_imputed"],
        outputCol="rawFeatures"  # feeds the scaler defined earlier
    ),
    scaler,
    lr
])
For production systems, always validate pipeline stages during development and implement monitoring for feature drift. Use TrainValidationSplit for faster iteration during development:
from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    trainRatio=0.8,
    seed=42
)
tvs_model = tvs.fit(train_data)
Pipelines eliminate manual tracking of transformation parameters and ensure reproducibility across environments. They’re essential for maintaining ML systems at scale, preventing data leakage, and streamlining the path from experimentation to production deployment.