Spark MLlib - Cross-Validation
Key Insights
- Cross-validation in Spark MLlib prevents overfitting by systematically splitting data into training and validation sets, with built-in support for k-fold CV through `CrossValidator` and `TrainValidationSplit`
- The `ParamGridBuilder` enables exhaustive hyperparameter tuning across multiple model parameters simultaneously, while Spark's distributed architecture parallelizes model training across the grid
- Proper evaluation requires selecting appropriate metrics for your problem type (`BinaryClassificationEvaluator` for classification, `RegressionEvaluator` for regression) and understanding the tradeoff between thoroughness and computational cost
Understanding Cross-Validation in Distributed Systems
Cross-validation in Spark MLlib operates differently than scikit-learn or other single-machine frameworks. Spark distributes both data and model training across cluster nodes, making hyperparameter tuning scalable for large datasets. The framework provides two primary approaches: CrossValidator for k-fold cross-validation and TrainValidationSplit for a single train-test split with parameter tuning.
The fundamental difference lies in computational cost. CrossValidator trains k models for each parameter combination, while TrainValidationSplit trains only one model per combination using a fixed train-validation ratio. For a parameter grid with 20 combinations and 3-fold CV, you’ll train 60 models versus 20 models with train-validation split.
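To make the cost concrete, here is a quick back-of-the-envelope count of model fits for that scenario (plain Python, no Spark required):

```python
# Fits performed while tuning, for the scenario above
n_combinations = 20   # parameter combinations in the grid
k_folds = 3           # folds used by CrossValidator

models_cv = n_combinations * k_folds   # CrossValidator: 60 fits
models_tvs = n_combinations            # TrainValidationSplit: 20 fits

# Both strategies then refit the winning combination once on the
# full training data to produce bestModel
print(models_cv, models_tvs)  # 60 20
```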
Building a Classification Pipeline with Cross-Validation
Here’s a complete example using logistic regression with cross-validation on a binary classification problem:
```python
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MLlib CrossValidation") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load data (assumes a CSV with columns feature1..feature3 and a binary
# "label" column; a libsvm source would already provide an assembled
# "features" vector, making the VectorAssembler below unnecessary)
df = spark.read.csv("data/sample_binary_classification.csv",
                    header=True, inferSchema=True)

# Split into train and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Create feature engineering pipeline
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="raw_features"
)

scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withStd=True,
    withMean=True
)

lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100
)

# Build pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Create parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [50, 100]) \
    .build()

# Configure evaluator
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# Create CrossValidator
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4,
    seed=42
)

# Train with cross-validation
cv_model = cv.fit(train_data)

# Evaluate on test set
predictions = cv_model.transform(test_data)
test_auc = evaluator.evaluate(predictions)

print(f"Test AUC: {test_auc:.4f}")
print(f"Best model parameters: {cv_model.bestModel.stages[-1].extractParamMap()}")
```
Accessing Cross-Validation Metrics
The CrossValidator stores detailed metrics for each parameter combination, enabling analysis of how different hyperparameters affect performance:
```python
# Extract average metrics for each parameter combination
avg_metrics = cv_model.avgMetrics

# Get parameter combinations (getEstimatorParamMaps is a method)
param_maps = cv_model.getEstimatorParamMaps()

# Create results dataframe
results = []
for params, metric in zip(param_maps, avg_metrics):
    param_dict = {
        'regParam': params[lr.regParam],
        'elasticNetParam': params[lr.elasticNetParam],
        'maxIter': params[lr.maxIter],
        'avg_auc': metric
    }
    results.append(param_dict)

results_df = spark.createDataFrame(results)
results_df.orderBy('avg_auc', ascending=False).show()

# Output:
# +--------+---------------+-------+------------------+
# |regParam|elasticNetParam|maxIter|           avg_auc|
# +--------+---------------+-------+------------------+
# |    0.01|            0.0|    100|0.8945123456789012|
# |    0.01|            0.5|    100|0.8923456789012345|
# |     0.1|            0.0|    100|0.8876543210987654|
# +--------+---------------+-------+------------------+
```
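Because avgMetrics is aligned index-for-index with the parameter maps, the winning combination can also be recovered without building a DataFrame:

```python
# areaUnderROC is higher-is-better, so take the max;
# for loss-style metrics such as RMSE, take the min instead
best_idx = max(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
print(f"Best combination: {param_maps[best_idx]}")
print(f"Best average AUC: {avg_metrics[best_idx]:.4f}")
```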
TrainValidationSplit for Faster Tuning
When computational resources are limited or you’re working with extremely large datasets, TrainValidationSplit provides a faster alternative:
```python
from pyspark.ml.tuning import TrainValidationSplit

# Create TrainValidationSplit
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    trainRatio=0.8,
    parallelism=4,
    seed=42
)

# Train model
tvs_model = tvs.fit(train_data)

# Evaluate
tvs_predictions = tvs_model.transform(test_data)
tvs_auc = evaluator.evaluate(tvs_predictions)
print(f"TrainValidationSplit AUC: {tvs_auc:.4f}")
```
The trainRatio parameter determines the proportion of data used for training versus validation. A ratio of 0.8 means 80% for training and 20% for validation during the hyperparameter search.
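Like its k-fold counterpart, the fitted TrainValidationSplitModel keeps one score per parameter combination, here from the single validation split rather than a fold average:

```python
# One validation metric per grid entry, in grid order
for params, metric in zip(tvs_model.getEstimatorParamMaps(),
                          tvs_model.validationMetrics):
    print(f"regParam={params[lr.regParam]}, "
          f"elasticNetParam={params[lr.elasticNetParam]}, "
          f"maxIter={params[lr.maxIter]}: AUC={metric:.4f}")
```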
Regression with Custom Metrics
Cross-validation works identically for regression problems, requiring only a different evaluator:
```python
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Create regression pipeline (assumes a dataset with the same feature
# columns and a numeric "target" label column)
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="target",
    seed=42
)

rf_pipeline = Pipeline(stages=[assembler, scaler, rf])

# Parameter grid for random forest
rf_param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Regression evaluator
reg_evaluator = RegressionEvaluator(
    labelCol="target",
    predictionCol="prediction",
    metricName="rmse"
)

# Cross-validation
rf_cv = CrossValidator(
    estimator=rf_pipeline,
    estimatorParamMaps=rf_param_grid,
    evaluator=reg_evaluator,
    numFolds=3,
    parallelism=8
)

rf_model = rf_cv.fit(train_data)
```
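To score the tuned forest, the same evaluator can be re-pointed at other metrics through a per-call parameter override; a minimal sketch, assuming a held-out split that also carries the "target" column:

```python
rf_predictions = rf_model.transform(test_data)

# Default metric configured above (rmse)
rmse = reg_evaluator.evaluate(rf_predictions)

# Override the metric for a single call without mutating the evaluator
r2 = reg_evaluator.evaluate(rf_predictions, {reg_evaluator.metricName: "r2"})
print(f"RMSE: {rmse:.4f}, R^2: {r2:.4f}")
```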
Performance Optimization Strategies
The parallelism parameter controls how many models train simultaneously. Set this based on cluster resources:
```python
# PySpark does not expose getExecutorMemoryStatus directly; reach it
# through the JVM gateway (the returned map includes the driver)
executor_status = spark.sparkContext._jsc.sc().getExecutorMemoryStatus()
num_executors = max(executor_status.size() - 1, 1)

# Configure parallelism based on resources
optimal_parallelism = min(num_executors * 2, len(param_grid))

cv_optimized = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=optimal_parallelism
)
```
For large parameter grids, consider reducing the search space strategically:
```python
# Coarse-to-fine grid search: start with widely spaced values
coarse_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.001, 0.1, 10.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 1.0]) \
    .build()

# Train with coarse grid first
coarse_cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=coarse_grid,
    evaluator=evaluator,
    numFolds=3
)

coarse_model = coarse_cv.fit(train_data)
best_coarse_params = coarse_model.bestModel.stages[-1].extractParamMap()

# Refine around the best coarse values (here assuming regParam landed near 0.1)
fine_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.05, 0.1, 0.15]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5]) \
    .build()
```
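The refined values can also be derived from the coarse winner instead of being hardcoded; a sketch using the fitted model's per-parameter getters:

```python
# Read the winning regularization strength off the best coarse model
best_lr_model = coarse_model.bestModel.stages[-1]
best_reg = best_lr_model.getRegParam()

# Search a tighter band around it
fine_grid_auto = ParamGridBuilder() \
    .addGrid(lr.regParam, [best_reg * 0.5, best_reg, best_reg * 1.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5]) \
    .build()
```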
Saving and Loading Cross-Validated Models
Persist the best model for production deployment:
```python
# Save the best model
cv_model.bestModel.write().overwrite().save("models/best_lr_model")

# Load for inference
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("models/best_lr_model")

# Make predictions (new_data must have the same input schema as training)
new_predictions = loaded_model.transform(new_data)
```
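If the tuning metrics should survive alongside the winner, the entire CrossValidatorModel can be persisted as well (supported since Spark 2.3):

```python
from pyspark.ml.tuning import CrossValidatorModel

# Saves the best pipeline model together with avgMetrics
cv_model.write().overwrite().save("models/cv_model")
restored = CrossValidatorModel.load("models/cv_model")
print(restored.avgMetrics[:3])
```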
Cross-validation in Spark MLlib balances model performance with computational efficiency. Choose CrossValidator for robust validation on moderately sized datasets and TrainValidationSplit when speed matters more than validation thoroughness. Always monitor cluster resource utilization and adjust parallelism accordingly to maximize throughput without overwhelming executors.