Spark MLlib - Machine Learning Overview

• Spark MLlib provides distributed machine learning algorithms that scale horizontally across clusters, making it ideal for training models on datasets too large for single-machine frameworks like...

Key Insights

• Spark MLlib provides distributed machine learning algorithms that scale horizontally across clusters, making it ideal for training models on datasets too large for single-machine frameworks like scikit-learn. • The DataFrame-based API (spark.ml) has superseded the RDD-based API (spark.mllib), offering better integration with Spark SQL and a pipeline-based workflow similar to scikit-learn. • MLlib excels at feature engineering and model training on big data but lacks the algorithm diversity of specialized frameworks—consider it for preprocessing massive datasets before feeding to other ML tools.

Core Architecture and APIs

Spark MLlib operates on two distinct APIs that serve different purposes. The older RDD-based spark.mllib package is in maintenance mode, while the DataFrame-based spark.ml package represents the active development path.

The spark.ml API organizes machine learning workflows into pipelines consisting of transformers and estimators. Transformers modify DataFrames (like feature scalers or trained models making predictions), while estimators learn from data to produce transformers (like training algorithms).

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

spark = SparkSession.builder \
    .appName("MLlibExample") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load data
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Define pipeline stages
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features"
)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)

lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
    maxIter=10
)

# Create and fit pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
model = pipeline.fit(df)

This pipeline approach ensures that transformation logic stays consistent between training and inference, preventing common data leakage issues.

Feature Engineering at Scale

MLlib shines when transforming massive datasets. The library includes vectorizers, encoders, and statistical transformers designed for distributed execution.

from pyspark.ml.feature import StringIndexer, OneHotEncoder, Bucketizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Categorical encoding
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")

# Numerical binning
bucketizer = Bucketizer(
    splits=[-float("inf"), 0, 10, 50, float("inf")],
    inputCol="age",
    outputCol="age_bucket"
)

# Text feature extraction
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")

# Apply transformations
df_indexed = indexer.fit(df).transform(df)
df_encoded = encoder.fit(df_indexed).transform(df_indexed)
df_bucketed = bucketizer.transform(df_encoded)

# TF-IDF pipeline
df_tokens = tokenizer.transform(df)
df_tf = hashing_tf.transform(df_tokens)
idf_model = idf.fit(df_tf)
df_tfidf = idf_model.transform(df_tf)

For high-cardinality categorical features, use FeatureHasher instead of one-hot encoding to avoid dimension explosion:

from pyspark.ml.feature import FeatureHasher

hasher = FeatureHasher(
    inputCols=["user_id", "product_id", "category"],
    outputCol="hashed_features",
    numFeatures=1024
)

df_hashed = hasher.transform(df)

Classification and Regression Models

MLlib supports common supervised learning algorithms with distributed training implementations. Each algorithm exposes hyperparameters through a consistent interface.

from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

# Random Forest Classification
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    maxBins=32,
    minInstancesPerNode=1,
    seed=42
)

rf_model = rf.fit(train_df)
predictions = rf_model.transform(test_df)

# Evaluate
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Gradient Boosted Trees
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=20,
    maxDepth=5,
    stepSize=0.1
)

gbt_model = gbt.fit(train_df)

# Linear Regression with regularization
lr = LinearRegression(
    featuresCol="features",
    labelCol="target",
    elasticNetParam=0.5,  # 0 = L2, 1 = L1
    regParam=0.01,
    maxIter=100
)

lr_model = lr.fit(train_df)
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")
print(f"RMSE: {lr_model.summary.rootMeanSquaredError}")

Access feature importance for tree-based models:

import pandas as pd

feature_importance = pd.DataFrame({
    'feature': feature_columns,
    'importance': rf_model.featureImportances.toArray()
}).sort_values('importance', ascending=False)

print(feature_importance.head(10))

Hyperparameter Tuning with Cross-Validation

MLlib provides CrossValidator and TrainValidationSplit for automated hyperparameter search across distributed folds.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Create cross-validator
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=param_grid,
    evaluator=MulticlassClassificationEvaluator(labelCol="label"),
    numFolds=3,
    parallelism=4,  # Number of models to train in parallel
    seed=42
)

# Run cross-validation
cv_model = cv.fit(train_df)

# Best model and parameters
best_model = cv_model.bestModel
print(f"Best numTrees: {best_model.getNumTrees}")
print(f"Best maxDepth: {best_model.getMaxDepth()}")

# Evaluate on test set
predictions = cv_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")

For faster iteration on large datasets, use TrainValidationSplit which performs a single train/validation split instead of k-fold cross-validation:

from pyspark.ml.tuning import TrainValidationSplit

tvs = TrainValidationSplit(
    estimator=rf,
    estimatorParamMaps=param_grid,
    evaluator=MulticlassClassificationEvaluator(labelCol="label"),
    trainRatio=0.8,
    parallelism=4
)

tvs_model = tvs.fit(train_df)

Clustering and Dimensionality Reduction

MLlib includes unsupervised learning algorithms for discovering patterns in unlabeled data.

from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.feature import PCA

# K-Means clustering
kmeans = KMeans(
    featuresCol="features",
    k=5,
    maxIter=20,
    seed=42
)

kmeans_model = kmeans.fit(df)
predictions = kmeans_model.transform(df)

# Cluster centers
centers = kmeans_model.clusterCenters()
print(f"Cluster Centers:\n{centers}")

# Within-cluster sum of squared distances
wssse = kmeans_model.summary.trainingCost
print(f"WSSSE: {wssse}")

# PCA for dimensionality reduction
pca = PCA(
    k=10,
    inputCol="features",
    outputCol="pca_features"
)

pca_model = pca.fit(df)
df_reduced = pca_model.transform(df)

# Explained variance
explained_variance = pca_model.explainedVariance
print(f"Explained Variance: {explained_variance}")

Model Persistence and Deployment

Save trained models and entire pipelines to disk for later use or deployment to production environments.

# Save model
model.write().overwrite().save("models/rf_model")

# Load model
from pyspark.ml.classification import RandomForestClassificationModel
loaded_model = RandomForestClassificationModel.load("models/rf_model")

# Save entire pipeline
pipeline_model.write().overwrite().save("models/pipeline")

# Load pipeline
from pyspark.ml import PipelineModel
loaded_pipeline = PipelineModel.load("models/pipeline")

# Make predictions with loaded model
new_predictions = loaded_pipeline.transform(new_data)

MLlib models serialize to Parquet format, making them portable across different Spark versions and deployment environments. For production serving, consider exporting to PMML or ONNX formats if your serving infrastructure doesn’t run Spark, though MLlib’s native support for these formats is limited compared to dedicated ML frameworks.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.