PySpark - Cross-Validation and Hyperparameter Tuning
Key Insights
• Cross-validation in PySpark uses CrossValidator and TrainValidationSplit to systematically evaluate model performance across different data splits, preventing overfitting on specific train-test partitions.
• ParamGridBuilder enables exhaustive hyperparameter search by creating combinations of parameters, while PySpark’s distributed architecture parallelizes model training across the grid.
• Proper evaluation metrics selection and understanding the trade-offs between CrossValidator (more robust, slower) and TrainValidationSplit (faster, less comprehensive) directly impact model quality and training time.
Understanding Cross-Validation in Distributed Environments
Cross-validation in PySpark operates differently from scikit-learn due to Spark’s distributed nature. While the concept remains the same—splitting data into folds and training multiple models—PySpark executes this across a cluster, making it suitable for large-scale datasets.
PySpark provides two primary cross-validation approaches: CrossValidator for k-fold cross-validation and TrainValidationSplit for a single train-validation split. Both work with Spark ML pipelines and support parallel execution.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Initialize Spark session
spark = SparkSession.builder \
.appName("CrossValidation") \
.config("spark.executor.memory", "4g") \
.getOrCreate()
# Load sample data (assumed to be a CSV with columns feature1, feature2, feature3, label)
data = spark.read.csv("sample_data.csv", header=True, inferSchema=True)
# Split into train and test
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
Building Parameter Grids
ParamGridBuilder constructs a grid of hyperparameters to search. Each combination creates a distinct model configuration that will be trained and evaluated.
# Create feature pipeline
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3"],
outputCol="raw_features"
)
scaler = StandardScaler(
inputCol="raw_features",
outputCol="features",
withStd=True,
withMean=True
)
# Initialize model
lr = LogisticRegression(
featuresCol="features",
labelCol="label",
maxIter=100
)
# Build parameter grid
param_grid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.01, 0.1, 1.0, 10.0]) \
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
.addGrid(lr.maxIter, [50, 100, 200]) \
.build()
print(f"Total parameter combinations: {len(param_grid)}")
# Output: 36 combinations (4 * 3 * 3)
This grid creates 36 different model configurations. With 3-fold cross-validation, PySpark will train 108 models (36 combinations × 3 folds).
Implementing CrossValidator
CrossValidator performs k-fold cross-validation, splitting training data into k subsets, training on k-1 folds, and validating on the remaining fold. This process repeats k times.
# Create pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Configure evaluator
evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
# Configure CrossValidator
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3,
parallelism=4, # Number of models to train in parallel
seed=42
)
# Train with cross-validation
cv_model = cv.fit(train_data)
# Access best model
best_model = cv_model.bestModel
print(f"Best model parameters:")
print(f"RegParam: {best_model.stages[-1].getRegParam()}")
print(f"ElasticNetParam: {best_model.stages[-1].getElasticNetParam()}")
print(f"MaxIter: {best_model.stages[-1].getMaxIter()}")
# Evaluate on test data
predictions = cv_model.transform(test_data)
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.4f}")
The parallelism parameter controls how many models train simultaneously. Setting this too high can overwhelm cluster resources, while too low underutilizes capacity.
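As a rough illustration of how this trade-off might be sized (not an official Spark sizing formula), one could derive a starting parallelism value from the cluster configuration. The sketch below assumes spark.executor.instances and spark.executor.cores are set on a real cluster and falls back to defaultParallelism in local mode:
# Hypothetical sizing sketch: derive a starting parallelism value from cluster config
try:
    executors = int(spark.conf.get("spark.executor.instances"))
    cores_per_executor = int(spark.conf.get("spark.executor.cores"))
    total_cores = executors * cores_per_executor
except Exception:
    # Settings not present (e.g. local mode) -- fall back to default parallelism
    total_cores = spark.sparkContext.defaultParallelism
# Use about half the cores, but never more slots than parameter combinations
suggested_parallelism = max(1, min(len(param_grid), total_cores // 2))
print(f"Suggested parallelism: {suggested_parallelism}")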
Using TrainValidationSplit for Faster Tuning
TrainValidationSplit provides a faster alternative by performing a single train-validation split instead of k-fold cross-validation. Use this when dataset size is large or training time is prohibitive.
from pyspark.ml.tuning import TrainValidationSplit
# Configure TrainValidationSplit
tvs = TrainValidationSplit(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluator,
trainRatio=0.8, # 80% for training, 20% for validation
parallelism=4,
seed=42
)
# Train
tvs_model = tvs.fit(train_data)
# Compare validation metrics
val_metrics = tvs_model.validationMetrics
for idx, metric in enumerate(val_metrics):
    print(f"Config {idx}: AUC = {metric:.4f}")
# Get best model
best_tvs_model = tvs_model.bestModel
TrainValidationSplit trains each configuration once, making it approximately k times faster than k-fold cross-validation, but with potentially less reliable performance estimates.
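To make that difference concrete, here is a quick back-of-the-envelope count of model fits for the 36-combination grid defined earlier (this ignores the final refit of the best configuration on the full training set, which both tuners perform):
# Compare the number of model fits performed by each tuning strategy
num_configs = len(param_grid)                 # 36 combinations
cv_fits = num_configs * cv.getNumFolds()      # 36 x 3 folds = 108 fits
tvs_fits = num_configs                        # 36 fits, one per combination
print(f"CrossValidator fits: {cv_fits}, TrainValidationSplit fits: {tvs_fits}")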
Advanced Hyperparameter Tuning Strategies
For complex models like Random Forests or Gradient Boosted Trees, hyperparameter spaces become larger and training time increases significantly.
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(
featuresCol="features",
labelCol="label",
seed=42
)
# More complex parameter grid
rf_param_grid = ParamGridBuilder() \
.addGrid(rf.numTrees, [50, 100, 200]) \
.addGrid(rf.maxDepth, [5, 10, 15, 20]) \
.addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
.addGrid(rf.maxBins, [32, 64]) \
.build()
print(f"RF combinations: {len(rf_param_grid)}")
# Output: 72 combinations
# Use TrainValidationSplit for faster iteration
rf_pipeline = Pipeline(stages=[assembler, scaler, rf])
rf_tvs = TrainValidationSplit(
estimator=rf_pipeline,
estimatorParamMaps=rf_param_grid,
evaluator=evaluator,
trainRatio=0.8,
parallelism=8,
seed=42
)
rf_model = rf_tvs.fit(train_data)
Extracting and Analyzing Results
Understanding which hyperparameters performed best helps refine future searches and provides insights into model behavior.
# Extract all metrics and parameters
import pandas as pd
results = []
for params, metric in zip(cv_model.getEstimatorParamMaps(), cv_model.avgMetrics):
    param_dict = {param.name: value for param, value in params.items()}
    param_dict['avg_auc'] = metric
    results.append(param_dict)
# Convert to pandas for analysis
results_df = pd.DataFrame(results)
results_df = results_df.sort_values('avg_auc', ascending=False)
print(results_df.head(10))
# Identify parameter impact
print("\nParameter correlation with AUC:")
print(results_df.corr()['avg_auc'].sort_values(ascending=False))
Multi-Metric Evaluation
While cross-validation typically optimizes a single metric, you often need to evaluate multiple metrics to understand model trade-offs.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Train with primary metric (AUC)
cv_model = cv.fit(train_data)
predictions = cv_model.transform(test_data)
# Evaluate multiple metrics
binary_eval = BinaryClassificationEvaluator(labelCol="label")
multi_eval = MulticlassClassificationEvaluator(labelCol="label")
metrics = {
'AUC': binary_eval.evaluate(predictions, {binary_eval.metricName: "areaUnderROC"}),
'AUPR': binary_eval.evaluate(predictions, {binary_eval.metricName: "areaUnderPR"}),
'Accuracy': multi_eval.evaluate(predictions, {multi_eval.metricName: "accuracy"}),
'F1': multi_eval.evaluate(predictions, {multi_eval.metricName: "f1"}),
'Precision': multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedPrecision"}),
'Recall': multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedRecall"})
}
for metric_name, value in metrics.items():
    print(f"{metric_name}: {value:.4f}")
Performance Optimization
Cross-validation in PySpark can be resource-intensive. Several strategies improve performance:
# Cache training data to avoid recomputation
train_data.cache()
# Adjust parallelism based on cluster size
# Rule of thumb: set parallelism to roughly half the total executor cores in the cluster
cv_optimized = CrossValidator(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3,
parallelism=8,
collectSubModels=False, # Don't keep all sub-models in memory
seed=42
)
# Monitor progress
import time
start_time = time.time()
cv_model = cv_optimized.fit(train_data)
elapsed = time.time() - start_time
print(f"Training completed in {elapsed:.2f} seconds")
print(f"Average time per model: {elapsed/len(param_grid)/3:.2f} seconds")
# Unpersist when done
train_data.unpersist()
Saving and Loading Tuned Models
Persisting cross-validated models enables reuse without retraining.
# Save the entire CrossValidator model
cv_model.write().overwrite().save("models/cv_model")
# Load later
from pyspark.ml.tuning import CrossValidatorModel
loaded_cv_model = CrossValidatorModel.load("models/cv_model")
# Save only the best model
best_pipeline = cv_model.bestModel
best_pipeline.write().overwrite().save("models/best_pipeline")
# Load best model
from pyspark.ml import PipelineModel
loaded_best = PipelineModel.load("models/best_pipeline")
# Make predictions with loaded model
new_predictions = loaded_best.transform(test_data)
Cross-validation and hyperparameter tuning in PySpark require balancing thoroughness with computational efficiency. Choose CrossValidator for smaller datasets where robust performance estimates matter, and TrainValidationSplit for large-scale scenarios where training time dominates. Always cache training data, set appropriate parallelism levels, and monitor resource utilization to maximize cluster efficiency.
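As a closing sketch, that decision can be wrapped in a small helper. The build_tuner name and the one-million-row threshold below are illustrative assumptions, not recommendations from the Spark documentation:
from pyspark.ml.tuning import CrossValidator, TrainValidationSplit

def build_tuner(pipeline, param_grid, evaluator, train_df,
                large_threshold=1_000_000, parallelism=4, seed=42):
    """Pick a tuner based on dataset size; the threshold is illustrative only."""
    if train_df.count() > large_threshold:
        # Large data: a single train-validation split keeps tuning time manageable
        return TrainValidationSplit(estimator=pipeline,
                                    estimatorParamMaps=param_grid,
                                    evaluator=evaluator,
                                    trainRatio=0.8,
                                    parallelism=parallelism,
                                    seed=seed)
    # Smaller data: k-fold cross-validation gives more robust estimates
    return CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3,
                          parallelism=parallelism,
                          seed=seed)

tuner = build_tuner(pipeline, param_grid, evaluator, train_data)
tuned_model = tuner.fit(train_data)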