PySpark - Random Forest Classifier with MLlib
PySpark's MLlib provides a distributed implementation of Random Forest that scales across clusters. Start by initializing a SparkSession and importing the necessary components:
Key Insights
- Random Forest in PySpark MLlib handles large-scale classification through distributed computing, making it ideal for datasets that don’t fit in memory on a single machine
- Feature engineering with VectorAssembler and proper data splitting are critical steps before training; PySpark’s lazy evaluation means understanding the execution plan prevents costly mistakes
- Model evaluation requires converting PySpark predictions to formats compatible with metrics calculations, and hyperparameter tuning with CrossValidator can significantly improve accuracy at the cost of training time
Setting Up the Environment
PySpark’s MLlib provides a distributed implementation of Random Forest that scales across clusters. Start by initializing a SparkSession and importing the necessary components:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
spark = SparkSession.builder \
.appName("RandomForestClassifier") \
.config("spark.driver.memory", "4g") \
.getOrCreate()
# Load data
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)
For this example, we’ll use a dataset with both numerical and categorical features. The schema inference automatically detects data types, but verify them before proceeding.
Data Preparation and Feature Engineering
Random Forest in MLlib requires features in a single vector column. Use VectorAssembler to combine multiple feature columns:
# Identify feature columns
feature_cols = ['age', 'income', 'credit_score', 'loan_amount', 'employment_years']
categorical_cols = ['education', 'marital_status', 'employment_type']
# Handle categorical variables with StringIndexer
indexers = [
StringIndexer(inputCol=col, outputCol=f"{col}_indexed", handleInvalid="keep")
for col in categorical_cols
]
# Combine all features into a single vector
indexed_cols = [f"{col}_indexed" for col in categorical_cols]
all_features = feature_cols + indexed_cols
assembler = VectorAssembler(
inputCols=all_features,
outputCol="features",
handleInvalid="skip" # Skip rows with invalid values
)
# Index the label column if it's categorical
label_indexer = StringIndexer(
inputCol="default",
outputCol="label"
)
The handleInvalid="skip" parameter in VectorAssembler removes rows with null or NaN values. For production systems, consider imputation strategies instead.
Building the Random Forest Model
Configure the Random Forest classifier with appropriate hyperparameters. The distributed nature means you can train larger forests than scikit-learn:
rf = RandomForestClassifier(
featuresCol="features",
labelCol="label",
numTrees=100,
maxDepth=10,
maxBins=32,
minInstancesPerNode=1,
minInfoGain=0.0,
subsamplingRate=1.0,
featureSubsetStrategy="auto",
seed=42
)
# Create a pipeline combining all preprocessing and training steps
pipeline = Pipeline(stages=indexers + [label_indexer, assembler, rf])
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
print(f"Training set size: {train_df.count()}")
print(f"Test set size: {test_df.count()}")
Key parameters:
numTrees: More trees generally improve accuracy but increase training timemaxDepth: Deeper trees capture complex patterns but risk overfittingmaxBins: Must be at least the maximum number of categories in any categorical featurefeatureSubsetStrategy: “auto” uses sqrt(numFeatures) for classification
Training and Making Predictions
Execute the pipeline to train the model. PySpark’s lazy evaluation means no computation happens until an action is called:
# Train the model
model = pipeline.fit(train_df)
# Make predictions
predictions = model.transform(test_df)
# Show sample predictions
predictions.select(
"features",
"label",
"prediction",
"probability"
).show(10, truncate=False)
The predictions DataFrame contains:
prediction: The predicted class labelprobability: Vector of probabilities for each classrawPrediction: Raw prediction values before normalization
Model Evaluation
Evaluate the model using multiple metrics to understand performance across different dimensions:
# Accuracy
accuracy_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(predictions)
# F1 Score
f1_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="f1"
)
f1 = f1_evaluator.evaluate(predictions)
# Weighted Precision and Recall
precision_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="weightedPrecision"
)
precision = precision_evaluator.evaluate(predictions)
recall_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="weightedRecall"
)
recall = recall_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
For binary classification, extract the confusion matrix:
from pyspark.sql.functions import col
tp = predictions.filter((col("label") == 1) & (col("prediction") == 1)).count()
tn = predictions.filter((col("label") == 0) & (col("prediction") == 0)).count()
fp = predictions.filter((col("label") == 0) & (col("prediction") == 1)).count()
fn = predictions.filter((col("label") == 1) & (col("prediction") == 0)).count()
print(f"True Positives: {tp}")
print(f"True Negatives: {tn}")
print(f"False Positives: {fp}")
print(f"False Negatives: {fn}")
Feature Importance Analysis
Extract feature importance scores to understand which variables drive predictions:
# Get the Random Forest model from the pipeline
rf_model = model.stages[-1]
# Extract feature importances
feature_importances = rf_model.featureImportances.toArray()
# Create a list of feature names and importances
feature_importance_list = list(zip(all_features, feature_importances))
feature_importance_list.sort(key=lambda x: x[1], reverse=True)
print("\nTop 10 Most Important Features:")
for feature, importance in feature_importance_list[:10]:
print(f"{feature}: {importance:.4f}")
Feature importance uses Gini importance (mean decrease in impurity), which measures how much each feature contributes to reducing uncertainty across all trees.
Hyperparameter Tuning with Cross-Validation
Optimize model performance through systematic hyperparameter search:
# Create a new Random Forest for tuning
rf_tuning = RandomForestClassifier(
featuresCol="features",
labelCol="label",
seed=42
)
# Build parameter grid
param_grid = ParamGridBuilder() \
.addGrid(rf_tuning.numTrees, [50, 100, 150]) \
.addGrid(rf_tuning.maxDepth, [5, 10, 15]) \
.addGrid(rf_tuning.minInstancesPerNode, [1, 5, 10]) \
.build()
# Create cross-validator
cv = CrossValidator(
estimator=Pipeline(stages=indexers + [label_indexer, assembler, rf_tuning]),
estimatorParamMaps=param_grid,
evaluator=MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="f1"
),
numFolds=3,
parallelism=4,
seed=42
)
# Train with cross-validation
cv_model = cv.fit(train_df)
# Get best model
best_model = cv_model.bestModel
# Evaluate on test set
cv_predictions = best_model.transform(test_df)
cv_accuracy = accuracy_evaluator.evaluate(cv_predictions)
print(f"Cross-validated Accuracy: {cv_accuracy:.4f}")
# Extract best parameters
best_rf_model = best_model.stages[-1]
print(f"\nBest Parameters:")
print(f"Number of Trees: {best_rf_model.getNumTrees}")
print(f"Max Depth: {best_rf_model.getMaxDepth()}")
print(f"Min Instances Per Node: {best_rf_model.getMinInstancesPerNode()}")
Saving and Loading Models
Persist trained models for production deployment:
# Save the entire pipeline model
model.write().overwrite().save("models/rf_pipeline_model")
# Save just the Random Forest model
rf_model.write().overwrite().save("models/rf_model_only")
# Load the model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("models/rf_pipeline_model")
# Make predictions with loaded model
new_predictions = loaded_model.transform(test_df)
The saved model includes all preprocessing steps, ensuring consistent feature engineering during inference. This eliminates training-serving skew common in production ML systems.
Performance Optimization
For large datasets, optimize Spark configuration and data handling:
# Cache frequently accessed DataFrames
train_df.cache()
test_df.cache()
# Repartition for better parallelism
train_df = train_df.repartition(200)
# Persist predictions if reusing them
predictions.persist()
# Monitor execution plan
predictions.explain(True)
# Clean up when done
train_df.unpersist()
test_df.unpersist()
predictions.unpersist()
Random Forest in PySpark MLlib scales horizontally, making it suitable for datasets with millions of rows. The key is balancing the number of partitions with cluster resources and understanding when to cache intermediate results.