PySpark - Logistic Regression with MLlib

Key Insights

  • PySpark MLlib provides a distributed implementation of logistic regression that scales horizontally across clusters, making it ideal for datasets that exceed single-machine memory constraints
  • The library offers both binary and multinomial classification with built-in support for regularization (L1, L2, ElasticNet), class weighting, and standardization through a unified pipeline API
  • Feature engineering and model evaluation in MLlib require explicit DataFrame transformations using VectorAssembler and evaluators, with careful attention to column naming conventions and data types

Setting Up the Environment

PySpark MLlib requires a SparkSession as the entry point. For production environments, configure executor memory and cores based on your cluster resources. For development, local mode suffices.

from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

spark = SparkSession.builder \
    .appName("LogisticRegressionMLlib") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Set log level to reduce noise
spark.sparkContext.setLogLevel("WARN")

Loading and Preparing Data

MLlib expects features in a single vector column. Use VectorAssembler to combine multiple feature columns into one dense or sparse vector.

# Load data (example with CSV)
data = spark.read.csv("customer_data.csv", header=True, inferSchema=True)

# Inspect schema
data.printSchema()
data.show(5)

# Define feature columns
feature_cols = ['age', 'income', 'credit_score', 'account_balance', 
                'num_transactions', 'avg_transaction_value']

# Assemble features into a vector
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip"  # Options: skip, error, keep
)

# Apply transformation
assembled_data = assembler.transform(data)
assembled_data.select("features", "label").show(5, truncate=False)

The handleInvalid parameter determines behavior when encountering null or NaN values: "skip" drops the affected rows, "error" (the default) raises an exception, and "keep" passes the invalid values through as NaN entries in the output vector. Choose based on your data quality requirements.

Feature Scaling and Preprocessing

Logistic regression benefits from feature scaling, especially when features have different magnitudes. StandardScaler normalizes features to have zero mean and unit variance.

# Create scaler
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

# Fit and transform
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)

# Split data
train_data, test_data = scaled_data.randomSplit([0.8, 0.2], seed=42)

print(f"Training set: {train_data.count()} rows")
print(f"Test set: {test_data.count()} rows")

Binary Classification

Binary logistic regression predicts outcomes between two classes (0 or 1). Configure regularization parameters to prevent overfitting.

# Initialize logistic regression model
lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
    maxIter=100,
    regParam=0.01,  # L2 regularization parameter
    elasticNetParam=0.0,  # 0 = L2, 1 = L1, (0,1) = ElasticNet
    family="binomial",
    standardization=False  # Already scaled
)

# Train model
lr_model = lr.fit(train_data)

# Model coefficients and intercept
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")

# Training summary
training_summary = lr_model.summary
print(f"Total iterations: {training_summary.totalIterations}")
print(f"Objective history: {training_summary.objectiveHistory}")
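
Under the hood, the printed coefficients map to a probability through the logistic (sigmoid) function. A minimal pure-Python sketch; the coefficient, intercept, and feature values below are made up for illustration, not from a real model:

```python
import math

def sigmoid(z):
    # Logistic function: maps any real number into (0, 1)
    return 1.0 / (1.0 + math.exp(-z))

# Illustrative values only
coefficients = [0.8, -1.2, 0.3]
intercept = -0.5
features = [1.0, 0.5, 2.0]

# Linear combination w·x + b, then sigmoid gives P(label = 1)
z = sum(w * x for w, x in zip(coefficients, features)) + intercept
prob_positive = sigmoid(z)
print(round(prob_positive, 4))  # → 0.5744
```

This is exactly the computation MLlib distributes across the cluster: the expensive part is fitting the coefficients, while scoring a row is a single dot product.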

Making Predictions and Probability Estimates

The trained model generates predictions along with probability estimates for each class.

# Make predictions
predictions = lr_model.transform(test_data)

# Show predictions with probabilities
predictions.select("label", "prediction", "probability", "rawPrediction").show(10, truncate=False)

# Extract probability for positive class
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

# UDF to extract probability of positive class
def extract_prob(probability_vector):
    return float(probability_vector[1])

# Declare the return type explicitly; udf defaults to StringType otherwise
extract_prob_udf = udf(extract_prob, DoubleType())

predictions_with_prob = predictions.withColumn(
    "prob_positive", 
    extract_prob_udf(col("probability"))
)

predictions_with_prob.select("label", "prediction", "prob_positive").show(10)

Model Evaluation

Evaluate binary classification performance using multiple metrics through evaluators.

# Binary classification evaluator
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc_roc = binary_evaluator.evaluate(predictions)
print(f"Area Under ROC: {auc_roc:.4f}")

# Change metric to Area Under PR
binary_evaluator.setMetricName("areaUnderPR")
auc_pr = binary_evaluator.evaluate(predictions)
print(f"Area Under PR: {auc_pr:.4f}")

# Multiclass metrics for binary case
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction"
)

accuracy = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "accuracy"})
precision = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "weightedPrecision"})
recall = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "weightedRecall"})
f1 = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "f1"})

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Multinomial Classification

For multi-class problems, set the family parameter to "multinomial". The model fits one coefficient vector per class and converts per-class scores into probabilities with the softmax function.

# Multinomial logistic regression
mlr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.0,
    family="multinomial"
)

mlr_model = mlr.fit(train_data)

# Multinomial models have coefficient matrices
print(f"Coefficient Matrix shape: {mlr_model.coefficientMatrix.numRows} x {mlr_model.coefficientMatrix.numCols}")
print(f"Intercept Vector: {mlr_model.interceptVector}")

# Predictions
mlr_predictions = mlr_model.transform(test_data)

# Evaluate
mlr_accuracy = multiclass_evaluator.evaluate(mlr_predictions, {multiclass_evaluator.metricName: "accuracy"})
print(f"Multinomial Accuracy: {mlr_accuracy:.4f}")
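
The softmax step behind the multinomial family can be sketched in plain Python (the per-class scores below are made up for illustration):

```python
import math

def softmax(scores):
    # Subtract the max score for numerical stability, then normalize exponentials
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

# Illustrative per-class scores: x·W_k + b_k for each class k
scores = [2.0, 1.0, 0.1]
probs = softmax(scores)
print([round(p, 3) for p in probs])  # → [0.659, 0.242, 0.099], sums to 1
```

Each row's prediction is simply the class with the highest softmax probability, which is what populates the prediction column above.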

Building ML Pipelines

Pipelines chain transformations and estimators into a single workflow, ensuring consistency between training and prediction.

# Create complete pipeline
pipeline = Pipeline(stages=[
    assembler,
    scaler,
    lr
])

# Fit pipeline
pipeline_model = pipeline.fit(train_data)

# Transform test data through entire pipeline
pipeline_predictions = pipeline_model.transform(test_data)

# Evaluate
pipeline_auc = binary_evaluator.evaluate(pipeline_predictions)
print(f"Pipeline AUC-ROC: {pipeline_auc:.4f}")

# Save pipeline model
pipeline_model.write().overwrite().save("models/lr_pipeline_model")

# Load saved model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("models/lr_pipeline_model")

Hyperparameter Tuning with Cross-Validation

Use CrossValidator or TrainValidationSplit for automated hyperparameter optimization.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Build parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.001, 0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [50, 100]) \
    .build()

# Create cross-validator
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=binary_evaluator,
    numFolds=5,
    parallelism=4,
    seed=42
)

# Run cross-validation
cv_model = cv.fit(train_data)

# Best model
best_model = cv_model.bestModel
best_lr_stage = best_model.stages[-1]

print(f"Best regParam: {best_lr_stage.getRegParam()}")
print(f"Best elasticNetParam: {best_lr_stage.getElasticNetParam()}")
print(f"Best maxIter: {best_lr_stage.getMaxIter()}")

# Evaluate best model
best_predictions = cv_model.transform(test_data)
best_auc = binary_evaluator.evaluate(best_predictions)
print(f"Best Model AUC-ROC: {best_auc:.4f}")

Handling Class Imbalance

For imbalanced datasets, adjust class weights to penalize misclassification of minority classes.

# Calculate class distribution
class_counts = train_data.groupBy("label").count().collect()
total = sum([row['count'] for row in class_counts])

# Calculate weights (inverse frequency)
class_weights = {
    row['label']: total / (len(class_counts) * row['count']) 
    for row in class_counts
}

print(f"Class weights: {class_weights}")

# Apply weights using weightCol
from pyspark.sql.functions import when

train_weighted = train_data.withColumn(
    "weight",
    when(col("label") == 0, class_weights[0]).otherwise(class_weights[1])
)

# Train with weights
lr_weighted = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
    weightCol="weight",
    maxIter=100,
    regParam=0.01
)

weighted_model = lr_weighted.fit(train_weighted)
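
The inverse-frequency formula above, total / (num_classes * count_c), gives each class a weight proportional to its rarity; a quick pure-Python check with made-up counts:

```python
# Hypothetical class counts: a 9:1 imbalanced binary problem
counts = {0: 900, 1: 100}
total = sum(counts.values())

# Inverse-frequency weighting: total / (num_classes * class_count)
weights = {label: total / (len(counts) * n) for label, n in counts.items()}
print(weights)  # minority class gets the larger weight

# The weighted totals balance out: 900 * 0.556 ≈ 100 * 5.0 ≈ 500
```

With these weights, each misclassified minority-class row contributes roughly nine times as much to the loss as a majority-class row, pushing the decision boundary toward better minority-class recall.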

PySpark MLlib’s logistic regression implementation provides production-ready distributed classification with comprehensive support for regularization, pipelines, and evaluation metrics. The framework handles data partitioning and parallel computation transparently, allowing you to focus on model development rather than infrastructure concerns.
