PySpark MLlib Tutorial - Machine Learning with PySpark
Key Insights
• PySpark MLlib provides distributed machine learning algorithms that scale horizontally across clusters, making it ideal for training models on datasets that don’t fit in memory on a single machine.
• The DataFrame-based API (spark.ml) has replaced the RDD-based API (spark.mllib) as the primary interface, offering better integration with Spark SQL and more intuitive pipeline construction.
• Feature engineering in PySpark requires explicit vectorization using VectorAssembler, and all transformations must be serializable to work across distributed executors.
Setting Up PySpark MLlib
PySpark MLlib comes bundled with Apache Spark. Install PySpark with pip (pip install pyspark), then initialize a Spark session configured for machine learning workloads:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Initialize Spark session with ML-optimized configuration
spark = SparkSession.builder \
    .appName("MLlib-Tutorial") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()
# Load sample dataset
df = spark.read.csv("data/customer_churn.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)
The spark.sql.shuffle.partitions setting controls how many partitions Spark uses when it shuffles data for aggregations and joins; 200 is also the default. Adjust it based on your cluster size and data volume.
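One way to turn "cluster size and data volume" into a starting number is sketched below. This is a rule-of-thumb heuristic, not a Spark API: the function name, the 128 MiB target partition size, and the two-tasks-per-core floor are all assumptions chosen for illustration, and the result should be validated against the Spark UI for a real job.

```python
import math

def suggest_shuffle_partitions(shuffle_bytes, total_cores,
                               target_partition_bytes=128 * 1024 * 1024,
                               tasks_per_core=2):
    """Illustrative estimate for spark.sql.shuffle.partitions (hypothetical helper)."""
    if shuffle_bytes < 0 or total_cores <= 0 or target_partition_bytes <= 0:
        raise ValueError("sizes must be non-negative and core count positive")
    # Enough partitions that each one stays near the target size
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    # Enough partitions that every core has work to do
    by_cores = total_cores * tasks_per_core
    return max(by_size, by_cores, 1)

# Example: roughly 50 GiB of shuffled data on a cluster with 16 executor cores
partitions = suggest_shuffle_partitions(50 * 1024**3, total_cores=16)
print(partitions)
```

The returned value could then be passed as a string to .config("spark.sql.shuffle.partitions", ...) when building the session.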
Data Preprocessing and Feature Engineering
MLlib requires features in a single vector column. Use VectorAssembler to combine multiple feature columns:
# Count missing values per column
from pyspark.sql.functions import col, count, when
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
# Handle categorical variables
categorical_cols = ["gender", "contract_type", "payment_method"]
# Use c as the loop variable so it does not shadow pyspark.sql.functions.col
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_indexed", handleInvalid="keep")
    for c in categorical_cols
]
# Define numerical features
numerical_cols = ["tenure", "monthly_charges", "total_charges"]
# Combine all feature columns
feature_cols = [f"{c}_indexed" for c in categorical_cols] + numerical_cols
# Create feature vector
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw",
    handleInvalid="skip"  # Skip rows with invalid values
)
# Scale features for algorithms sensitive to feature magnitude
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)
# Index target variable
label_indexer = StringIndexer(inputCol="churn", outputCol="label")
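The handleInvalid="skip" parameter keeps null or invalid values from failing the pipeline, but it does so by silently dropping the affected rows. For production systems, handle nulls explicitly (for example, by imputing or filtering them) so that dropped rows do not go unnoticed.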
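To make the scaling step concrete, here is a small pure-Python sketch of the arithmetic that StandardScaler applies to a single feature when withMean=True and withStd=True. The function name and the sample values are made up for illustration; the sketch assumes the sample (n - 1) standard deviation and maps a constant feature to 0.0.

```python
import statistics

def standardize(values, with_mean=True, with_std=True):
    """Center and scale one feature column (illustrative stand-in for StandardScaler)."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation; needs at least two values
    scaled = []
    for v in values:
        x = v - mean if with_mean else v
        if with_std:
            # A zero-variance feature carries no information, so map it to 0.0
            x = x / std if std > 0 else 0.0
        scaled.append(x)
    return scaled

tenure = [2.0, 4.0, 6.0]  # hypothetical values for the "tenure" column
print(standardize(tenure))                   # centered and scaled
print(standardize(tenure, with_mean=False))  # scaled only
```

Centering turns sparse vectors into dense ones, which is worth keeping in mind if the assembled feature vector is wide and mostly zeros.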
Building a Classification Pipeline
Pipelines chain transformers and estimators into a single workflow, ensuring consistent preprocessing across training and inference:
# Create logistic regression model
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.5
)
# Construct pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler, label_indexer, lr])
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Train model
model = pipeline.fit(train_df)
# Make predictions
predictions = model.transform(test_df)
predictions.select("features", "label", "prediction", "probability").show(10, truncate=False)
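The regParam and elasticNetParam settings used above combine into a single elastic-net penalty: elasticNetParam of 0 gives a pure L2 penalty and 1 gives a pure L1 penalty. The sketch below evaluates that penalty for a made-up coefficient vector; the function name and coefficients are illustrative, and the sketch ignores details such as feature standardization and the unpenalized intercept.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Penalty = reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2), with alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    return reg_param * (
        elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared
    )

weights = [0.5, -1.0, 2.0]  # hypothetical coefficients
print(elastic_net_penalty(weights, reg_param=0.01, elastic_net_param=0.5))  # mixed L1/L2
print(elastic_net_penalty(weights, reg_param=0.01, elastic_net_param=1.0))  # pure L1
```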
The pipeline automatically applies all transformations in sequence. Once fitted, the pipeline model can be saved and loaded for deployment:
# Save model
model.write().overwrite().save("models/churn_model")
# Load model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("models/churn_model")
Model Evaluation and Metrics
MLlib provides specialized evaluators for different problem types:
# Binary classification metrics
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(predictions)
print(f"AUC-ROC: {auc:.4f}")
# Multiclass metrics
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction"
)
accuracy = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "accuracy"})
f1_score = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "f1"})
precision = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "weightedPrecision"})
recall = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "weightedRecall"})
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
# Confusion matrix
predictions.groupBy("label", "prediction").count().show()
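The confusion-matrix counts are small enough to collect to the driver, and per-class precision and recall follow directly from them. The sketch below works on plain (label, prediction, count) tuples, which is the shape the grouped rows would have after collect(); the helper names and the sample counts are invented for illustration.

```python
from collections import defaultdict

def per_class_metrics(rows):
    """rows: iterable of (label, prediction, count) tuples from the confusion matrix."""
    true_pos = defaultdict(int)
    predicted_total = defaultdict(int)
    label_total = defaultdict(int)
    for label, prediction, count in rows:
        label_total[label] += count
        predicted_total[prediction] += count
        if label == prediction:
            true_pos[label] += count
    metrics = {}
    for cls in sorted(set(label_total) | set(predicted_total)):
        precision = true_pos[cls] / predicted_total[cls] if predicted_total[cls] else 0.0
        recall = true_pos[cls] / label_total[cls] if label_total[cls] else 0.0
        metrics[cls] = {"precision": precision, "recall": recall, "support": label_total[cls]}
    return metrics

def weighted_average(metrics, key):
    """Average a per-class metric, weighting each class by its number of true instances."""
    total = sum(m["support"] for m in metrics.values())
    return sum(m[key] * m["support"] for m in metrics.values()) / total

# Hypothetical counts: (label, prediction, count)
rows = [(0.0, 0.0, 80), (0.0, 1.0, 20), (1.0, 0.0, 10), (1.0, 1.0, 40)]
metrics = per_class_metrics(rows)
for cls, m in metrics.items():
    print(cls, m)
print("weighted precision:", weighted_average(metrics, "precision"))
```

Looking at the minority class on its own is often more informative for churn than the weighted averages, which are dominated by the majority class.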
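For a per-class report, collect the label and prediction columns to pandas and use scikit-learn. toPandas() pulls every row onto the driver, so use it only for results that fit in driver memory:
from sklearn.metrics import classification_report
# Collect both columns in one call so labels and predictions stay row-aligned
pred_pdf = predictions.select("label", "prediction").toPandas()
print(classification_report(pred_pdf["label"], pred_pdf["prediction"]))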
Hyperparameter Tuning with Cross-Validation
MLlib implements distributed cross-validation for hyperparameter optimization:
# Create Random Forest classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    seed=42
)
# Build pipeline with Random Forest
rf_pipeline = Pipeline(stages=indexers + [assembler, scaler, label_indexer, rf])
# Define parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .build()
# Configure cross-validator
cv = CrossValidator(
    estimator=rf_pipeline,
    estimatorParamMaps=param_grid,
    evaluator=binary_evaluator,
    numFolds=3,
    parallelism=4,  # Number of models to train in parallel
    seed=42
)
# Run cross-validation
cv_model = cv.fit(train_df)
# Get best model
best_model = cv_model.bestModel
# Evaluate on test set
cv_predictions = cv_model.transform(test_df)
cv_auc = binary_evaluator.evaluate(cv_predictions)
print(f"Cross-validated AUC: {cv_auc:.4f}")
# Extract best parameters
best_rf_model = best_model.stages[-1]
print(f"Best numTrees: {best_rf_model.getNumTrees}")
print(f"Best maxDepth: {best_rf_model.getMaxDepth()}")
The parallelism parameter controls how many models train simultaneously. Set it based on available cluster resources.
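Before launching a search it helps to count how many pipeline fits it implies, since grid search grows multiplicatively. The sketch below reproduces the grid from this section in plain Python; the helper name is hypothetical, and the count assumes that cross-validation fits every combination once per fold and then refits the best combination on the full training set.

```python
from itertools import product

def count_cv_fits(grid, num_folds):
    """Return (number of parameter combinations, total pipeline fits)."""
    combinations = list(product(*grid.values()))
    # One fit per combination per fold, plus one final refit of the best combination
    total_fits = len(combinations) * num_folds + 1
    return len(combinations), total_fits

grid = {
    "numTrees": [50, 100, 200],
    "maxDepth": [5, 10, 15],
    "minInstancesPerNode": [1, 5, 10],
}
n_combinations, n_fits = count_cv_fits(grid, num_folds=3)
print(f"{n_combinations} combinations, {n_fits} pipeline fits")
```

If that number is too large for the cluster, trimming one grid dimension usually saves more time than raising parallelism.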
Feature Importance and Model Interpretation
Tree-based models provide feature importance scores:
# Extract feature importance from Random Forest
feature_importance = best_rf_model.featureImportances.toArray()
# Map to feature names
feature_importance_dict = dict(zip(feature_cols, feature_importance))
sorted_features = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)
print("\nTop 10 Most Important Features:")
for feature, importance in sorted_features[:10]:
    print(f"{feature}: {importance:.4f}")
# Create visualization data
import pandas as pd
importance_df = pd.DataFrame(sorted_features, columns=["Feature", "Importance"])
For linear models, extract coefficients:
# Get logistic regression coefficients
lr_model = model.stages[-1]
coefficients = lr_model.coefficients.toArray()
intercept = lr_model.intercept
coef_dict = dict(zip(feature_cols, coefficients))
sorted_coefs = sorted(coef_dict.items(), key=lambda x: abs(x[1]), reverse=True)
print("\nTop Features by Coefficient Magnitude:")
for feature, coef in sorted_coefs[:10]:
    print(f"{feature}: {coef:.4f}")
Handling Imbalanced Datasets
Address class imbalance with class weights or by undersampling the majority class:
from pyspark.sql.functions import col, when
# train_df only holds the raw "churn" column, so index it into "label" first
labeled_train = label_indexer.fit(train_df).transform(train_df)
# Calculate class distribution
class_counts = labeled_train.groupBy("label").count().collect()
total = sum(row["count"] for row in class_counts)
class_weights = {row["label"]: total / row["count"] for row in class_counts}
# Add weight column to DataFrame (StringIndexer assigns 0.0 to the most frequent class)
train_weighted = labeled_train.withColumn(
    "class_weight",
    when(col("label") == 0, class_weights[0.0]).otherwise(class_weights[1.0])
)
# Apply class weights to logistic regression
weighted_lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    weightCol="class_weight",
    maxIter=100
)
# label_indexer is left out because "label" already exists in train_weighted
weighted_model = Pipeline(stages=indexers + [assembler, scaler, weighted_lr]).fit(train_weighted)
# Alternative: undersample the majority class (label 0) down to the minority count
majority = labeled_train.filter(col("label") == 0)
minority = labeled_train.filter(col("label") == 1)
fraction = minority.count() / majority.count()
majority_sampled = majority.sample(withReplacement=False, fraction=fraction, seed=42)
balanced_df = majority_sampled.union(minority)
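The weight formula above (total / count) is easier to sanity-check on plain numbers. The sketch below uses invented class counts and hypothetical helper names; it shows that inverse-frequency weights give both classes the same total weight, and computes the sampling fraction that undersampling would use.

```python
def inverse_frequency_weights(class_counts):
    """Map each label to total / count, so rarer classes get larger weights."""
    total = sum(class_counts.values())
    return {label: total / count for label, count in class_counts.items()}

def undersample_fraction(class_counts):
    """Fraction of the majority class to keep so it roughly matches the minority class."""
    return min(class_counts.values()) / max(class_counts.values())

counts = {0.0: 900, 1.0: 100}  # hypothetical: 900 retained customers, 100 churned
weights = inverse_frequency_weights(counts)
print(weights)
# Each class contributes the same total weight to the loss
print({label: weights[label] * counts[label] for label in counts})
print(undersample_fraction(counts))
```

Because sample() draws rows independently with the given probability, the undersampled majority class will only approximately match the minority count.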
Production Deployment Considerations
For production deployments, implement batch prediction with partitioning:
# Batch scoring with partitioning
def batch_predict(input_path, output_path, model_path):
    model = PipelineModel.load(model_path)
    df = spark.read.parquet(input_path)
    predictions = model.transform(df)
    # Write predictions partitioned by date
    predictions.select("customer_id", "prediction", "probability", "date") \
        .write \
        .partitionBy("date") \
        .mode("overwrite") \
        .parquet(output_path)
batch_predict("data/customers", "output/predictions", "models/churn_model")
Monitor model performance over time by tracking prediction distributions and comparing against validation metrics. Implement automated retraining pipelines when performance degrades below acceptable thresholds.
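One common way to track prediction distributions is the Population Stability Index (PSI), which compares the binned distribution of recent scores against a reference set such as the validation predictions. PSI is not part of MLlib and is named here only as one possible technique; the function below is a minimal pure-Python sketch that assumes scores are probabilities in [0, 1], and any alerting threshold would need to be chosen for the specific application.

```python
import math

def population_stability_index(expected, actual, bins=10, eps=1e-6):
    """PSI between two lists of scores in [0, 1]; 0.0 means identical binned distributions."""
    if not expected or not actual:
        raise ValueError("both score lists must be non-empty")

    def bin_fractions(scores):
        counts = [0] * bins
        for s in scores:
            # Clamp so that a score of exactly 1.0 falls into the last bin
            idx = min(int(s * bins), bins - 1)
            counts[idx] += 1
        # Floor empty bins at eps to avoid log(0) and division by zero
        return [max(c / len(scores), eps) for c in counts]

    e = bin_fractions(expected)
    a = bin_fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))

validation_scores = [0.05, 0.15, 0.25, 0.35, 0.45, 0.55, 0.65, 0.75, 0.85, 0.95]
recent_scores = [0.55, 0.65, 0.65, 0.75, 0.75, 0.85, 0.85, 0.95, 0.95, 0.95]
print(population_stability_index(validation_scores, validation_scores))  # no drift
print(population_stability_index(validation_scores, recent_scores))      # shifted scores
```

In a batch job, the score lists could come from a sample of the probability column rather than the full prediction output.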
PySpark MLlib excels at distributed model training but introduces overhead for small datasets. Use scikit-learn when the dataset fits in memory on a single machine (roughly under 100GB on a large-memory server). For low-latency online inference, consider exporting models to PMML or ONNX with a third-party converter and deploying them in a dedicated serving system.