PySpark - K-Means Clustering with MLlib
Key Insights
- K-Means clustering in PySpark MLlib requires vectorized features and scales efficiently across distributed datasets, making it ideal for large-scale unsupervised learning tasks
- The algorithm’s performance depends heavily on proper feature scaling, optimal K selection using methods like the elbow method or silhouette analysis, and appropriate initialization strategies
- PySpark’s MLlib implementation provides both KMeans and BisectingKMeans algorithms with built-in support for model persistence, prediction, and cluster evaluation metrics
Setting Up the Environment
Start by initializing a Spark session configured for MLlib work. The following setup gives the driver and each executor 4 GB of memory; adjust these values to match your cluster.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import col, count, when
spark = SparkSession.builder \
    .appName("KMeansClusteringMLlib") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()
# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")
Loading and Preparing Data
For this example, we’ll use a small customer segmentation dataset. The data preparation phase involves loading raw data, selecting relevant features, and checking for missing values.
# Create sample customer data
data = spark.createDataFrame([
    (1, 25, 50000, 5, 200),
    (2, 35, 75000, 10, 450),
    (3, 45, 95000, 15, 600),
    (4, 28, 55000, 6, 220),
    (5, 50, 120000, 20, 800),
    (6, 32, 68000, 8, 380),
    (7, 42, 88000, 12, 520),
    (8, 29, 58000, 7, 240),
    (9, 38, 82000, 11, 490),
    (10, 48, 105000, 18, 720)
], ["customer_id", "age", "annual_income", "years_customer", "avg_monthly_spend"])
# Alternatively, load from file
# data = spark.read.csv("customers.csv", header=True, inferSchema=True)
# Check for missing values
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()
# Display basic statistics
data.describe().show()
Feature Engineering and Vectorization
MLlib requires features in a single vector column. Use VectorAssembler to combine the feature columns into one vector, then apply StandardScaler so that features with large ranges, such as annual_income, do not dominate the distance calculation.
# Select features for clustering
feature_columns = ["age", "annual_income", "years_customer", "avg_monthly_spend"]
# Assemble features into a vector
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features_raw"
)
assembled_data = assembler.transform(data)
# Scale features to normalize different ranges
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)
# Cache the scaled data for multiple iterations
scaled_data.cache()
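To make the scaling step concrete, here is a minimal plain-Python sketch of the arithmetic that standardization with both withMean and withStd enabled performs on a single column. It assumes the sample standard deviation (n - 1 denominator), which is what Spark's StandardScaler documentation describes; the helper name standardize is hypothetical and not part of MLlib.

```python
from statistics import mean, stdev

def standardize(values):
    """Center values to zero mean and scale them to unit sample standard deviation."""
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation (n - 1 denominator)
    return [(v - mu) / sigma for v in values], mu, sigma

# The "age" column from the sample customer data above
ages = [25, 35, 45, 28, 50, 32, 42, 29, 38, 48]
scaled_ages, age_mean, age_std = standardize(ages)

# Inverse transform: the same arithmetic that maps scaled values back to original units
restored = [s * age_std + age_mean for s in scaled_ages]
```

After scaling, every feature has mean 0 and standard deviation 1, so each contributes comparably to the Euclidean distances that K-Means minimizes.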
Determining Optimal Number of Clusters
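The elbow method compares clustering quality across different K values using the Within Set Sum of Squared Errors (WSSSE): the cost always falls as K grows, so look for the K where the improvement levels off. The silhouette score is a complementary metric that tends to peak at a well-separated K, so the loop below records both.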
# Elbow method: compute cost for different K values
costs = []
silhouette_scores = []
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette"
)
K_range = range(2, 11)
for k in K_range:
    kmeans = KMeans(
        featuresCol="features",
        predictionCol="prediction",
        k=k,
        seed=42,
        maxIter=20
    )
    model = kmeans.fit(scaled_data)
    predictions = model.transform(scaled_data)
    # Calculate WSSSE
    wssse = model.summary.trainingCost
    costs.append(wssse)
    # Calculate silhouette score
    silhouette = evaluator.evaluate(predictions)
    silhouette_scores.append(silhouette)
    print(f"K={k}, WSSSE={wssse:.2f}, Silhouette={silhouette:.4f}")
# Optimal K is typically where the elbow occurs or silhouette is maximized
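Reading the elbow off a printed table is subjective, so it can help to turn the two lists into a suggestion. The sketch below is one simple heuristic, not an MLlib feature: the helper names best_k_by_silhouette and elbow_k are hypothetical, the elbow is approximated by the largest second difference of the cost curve, and the numbers are made up purely to exercise the functions.

```python
def best_k_by_silhouette(k_values, silhouettes):
    """Return the K with the highest silhouette score."""
    return max(zip(k_values, silhouettes), key=lambda pair: pair[1])[0]

def elbow_k(k_values, costs):
    """Return the K where the cost curve bends most sharply.

    Uses the largest second difference of the costs, a simple heuristic
    that needs at least three K values.
    """
    if len(k_values) < 3:
        raise ValueError("need at least three K values")
    bends = [
        costs[i - 1] - 2 * costs[i] + costs[i + 1]
        for i in range(1, len(costs) - 1)
    ]
    return k_values[1 + bends.index(max(bends))]

# Made-up numbers for illustration only, not output from the loop above
example_k = [2, 3, 4, 5, 6]
example_costs = [100.0, 40.0, 30.0, 25.0, 22.0]
example_silhouettes = [0.55, 0.71, 0.60, 0.52, 0.48]

suggested_by_elbow = elbow_k(example_k, example_costs)
suggested_by_silhouette = best_k_by_silhouette(example_k, example_silhouettes)
```

With the lists built in the loop above, the equivalent calls would be elbow_k(list(K_range), costs) and best_k_by_silhouette(list(K_range), silhouette_scores). Treat the result as a starting point and confirm that the resulting segments make business sense.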
Training the K-Means Model
Once you’ve determined the optimal K value, train the final model with additional hyperparameter tuning options.
# Train final model with optimal K
optimal_k = 3
kmeans = KMeans(
    featuresCol="features",
    predictionCol="prediction",
    k=optimal_k,
    seed=42,
    maxIter=100,
    initMode="k-means||",  # k-means|| initialization (default)
    tol=1e-4
)
# Fit the model
kmeans_model = kmeans.fit(scaled_data)
# Make predictions
predictions = kmeans_model.transform(scaled_data)
# Display cluster assignments
predictions.select("customer_id", "age", "annual_income", "prediction").show(10)
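For intuition about what fitting does, the following plain-Python sketch runs Lloyd's iterations, the assign-then-recompute loop that K-Means is built on, over a handful of 2-D points. It is a single-machine illustration under simplifying assumptions (caller-supplied initial centers, a basic movement-based stopping rule), not Spark's distributed implementation; lloyd_kmeans and its helpers are hypothetical names.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def nearest(point, centers):
    """Index of the center closest to point."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))

def lloyd_kmeans(points, centers, max_iter=100, tol=1e-4):
    """Run Lloyd iterations from the given initial centers.

    Returns (centers, assignments, cost), where cost is the sum of squared
    distances from each point to its assigned center.
    """
    centers = [tuple(c) for c in centers]
    for _ in range(max_iter):
        # Assignment step: attach every point to its nearest center
        assignments = [nearest(p, centers) for p in points]
        # Update step: move each center to the mean of its members
        new_centers = []
        for i, old in enumerate(centers):
            members = [p for p, a in zip(points, assignments) if a == i]
            if members:
                new_centers.append(tuple(sum(dim) / len(members) for dim in zip(*members)))
            else:
                new_centers.append(old)  # keep an empty cluster's center in place
        shift = max(squared_distance(o, n) for o, n in zip(centers, new_centers))
        centers = new_centers
        if shift <= tol ** 2:  # stop once no center moved farther than tol
            break
    assignments = [nearest(p, centers) for p in points]
    cost = sum(squared_distance(p, centers[a]) for p, a in zip(points, assignments))
    return centers, assignments, cost

points = [(1.0, 1.0), (1.0, 2.0), (2.0, 1.0), (8.0, 8.0), (9.0, 8.0), (8.0, 9.0)]
final_centers, labels, cost = lloyd_kmeans(points, centers=[points[0], points[3]])
```

The returned cost is the same quantity the tutorial reads from trainingCost, and the maxIter and tol arguments of KMeans play the roles of max_iter and tol here.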
Analyzing Cluster Centers and Results
Examine cluster centers to understand the characteristics of each segment. Remember that centers are in the scaled feature space.
# Get cluster centers (in scaled space)
centers = kmeans_model.clusterCenters()
print(f"Number of clusters: {len(centers)}")
print("\nCluster Centers (scaled):")
for i, center in enumerate(centers):
    print(f"Cluster {i}: {center}")
# Manual inverse transformation: original = scaled * std + mean
scaler_std = scaler_model.std.toArray()
scaler_mean = scaler_model.mean.toArray()
print("\nCluster Centers (original scale):")
for i, center in enumerate(centers):
    original_center = center * scaler_std + scaler_mean
    print(f"Cluster {i}:")
    for j, feature_name in enumerate(feature_columns):
        print(f"  {feature_name}: {original_center[j]:.2f}")
    print()
# Analyze cluster sizes
cluster_sizes = predictions.groupBy("prediction").count().orderBy("prediction")
cluster_sizes.show()
Model Evaluation and Validation
Evaluate cluster quality using the silhouette score and the within-cluster sum of squares. The silhouette score ranges from -1 to 1; values closer to 1 indicate compact, well-separated clusters.
# Evaluate clustering quality
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean"
)
silhouette_score = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette_score:.4f}")
# Get model summary statistics
print(f"Training Cost (WSSSE): {kmeans_model.summary.trainingCost:.2f}")
print(f"Number of iterations: {kmeans_model.summary.numIter}")
# Cluster-wise statistics
for i in range(optimal_k):
    cluster_data = predictions.filter(col("prediction") == i)
    print(f"\nCluster {i} Statistics:")
    cluster_data.select(feature_columns).describe().show()
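To see what the silhouette number measures, here is a small plain-Python sketch of the textbook definition using squared Euclidean distance. It is a brute-force illustration, not the optimized computation ClusteringEvaluator performs; the function name mean_silhouette and the choice to score singleton clusters as 0 are assumptions of this sketch.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def mean_silhouette(points, labels):
    """Mean silhouette over all points, using squared Euclidean distance."""
    clusters = {}
    for p, label in zip(points, labels):
        clusters.setdefault(label, []).append(p)
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")

    scores = []
    for p, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # convention used here for singleton clusters
            continue
        # a: mean distance to the other members of the point's own cluster
        a = sum(squared_distance(p, q) for q in own) / (len(own) - 1)
        # b: smallest mean distance to the members of any other cluster
        b = min(
            sum(squared_distance(p, q) for q in members) / len(members)
            for other, members in clusters.items()
            if other != label
        )
        denom = max(a, b)
        scores.append((b - a) / denom if denom > 0 else 0.0)
    return sum(scores) / len(scores)

# Two tight, well-separated groups should score close to 1
demo_points = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0)]
demo_score = mean_silhouette(demo_points, [0, 0, 1, 1])
```

A point scores near 1 when it is much closer to its own cluster than to the next nearest one, near 0 when it sits on a boundary, and below 0 when it would fit better in another cluster.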
Saving and Loading Models
Persist trained models and scalers for production deployment and future predictions.
# Save the model
model_path = "kmeans_model"
scaler_path = "scaler_model"
kmeans_model.write().overwrite().save(model_path)
scaler_model.write().overwrite().save(scaler_path)
# Load the model
from pyspark.ml.clustering import KMeansModel
from pyspark.ml.feature import StandardScalerModel
loaded_kmeans = KMeansModel.load(model_path)
loaded_scaler = StandardScalerModel.load(scaler_path)
# Make predictions on new data
new_data = spark.createDataFrame([
    (11, 33, 72000, 9, 410),
    (12, 44, 92000, 14, 580)
], ["customer_id", "age", "annual_income", "years_customer", "avg_monthly_spend"])
# Apply same transformations
new_assembled = assembler.transform(new_data)
new_scaled = loaded_scaler.transform(new_assembled)
new_predictions = loaded_kmeans.transform(new_scaled)
new_predictions.select("customer_id", "prediction").show()
Using Bisecting K-Means
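BisectingKMeans is a hierarchical alternative that builds clusters top-down, starting from one cluster and repeatedly splitting clusters in two. It can often be faster than standard K-Means, but it will generally produce a different clustering.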
from pyspark.ml.clustering import BisectingKMeans
bisecting_kmeans = BisectingKMeans(
    featuresCol="features",
    predictionCol="prediction",
    k=optimal_k,
    seed=42,
    maxIter=20,
    minDivisibleClusterSize=1.0
)
bisecting_model = bisecting_kmeans.fit(scaled_data)
bisecting_predictions = bisecting_model.transform(scaled_data)
# Evaluate bisecting k-means
bisecting_silhouette = evaluator.evaluate(bisecting_predictions)
print(f"Bisecting K-Means Silhouette Score: {bisecting_silhouette:.4f}")
print(f"Training Cost: {bisecting_model.summary.trainingCost:.2f}")
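The top-down idea behind bisecting K-Means can be sketched in plain Python. This simplified version always splits the cluster with the largest within-cluster error and uses a deterministic start for each split; Spark's implementation chooses and splits clusters differently, so treat the helper names (two_means, bisecting_kmeans) and the selection rule as assumptions made for illustration.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def centroid(points):
    """Component-wise mean of a non-empty list of points."""
    return tuple(sum(dim) / len(points) for dim in zip(*points))

def sse(points):
    """Sum of squared distances from each point to the cluster centroid."""
    c = centroid(points)
    return sum(squared_distance(p, c) for p in points)

def two_means(points, iterations=20):
    """Split points into two groups with a few Lloyd iterations."""
    # Deterministic start: the first point and the point farthest from it
    c0 = points[0]
    c1 = max(points, key=lambda p: squared_distance(p, c0))
    left, right = list(points), []
    for _ in range(iterations):
        left = [p for p in points if squared_distance(p, c0) <= squared_distance(p, c1)]
        right = [p for p in points if squared_distance(p, c0) > squared_distance(p, c1)]
        if not left or not right:
            break  # nothing to separate, e.g. all points identical
        c0, c1 = centroid(left), centroid(right)
    return left, right

def bisecting_kmeans(points, k):
    """Split the highest-error cluster in two until k clusters exist."""
    clusters = [list(points)]
    while len(clusters) < k:
        target = max(clusters, key=sse)
        left, right = two_means(target)
        if not left or not right:
            break  # the chosen cluster cannot be split further
        clusters.remove(target)
        clusters.extend([left, right])
    return clusters

demo = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0), (30.0, 0.0), (30.0, 1.0)]
demo_clusters = bisecting_kmeans(demo, k=3)
```

Because each split only looks at one cluster, the hierarchy is built greedily, which is why the final partition can differ from what standard K-Means finds for the same K.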
K-Means clustering in PySpark MLlib provides a robust, scalable solution for unsupervised learning on distributed datasets. Proper feature scaling, optimal K selection, and thorough evaluation are critical for meaningful cluster discovery. The ability to persist models and apply them to streaming or batch data makes PySpark an excellent choice for production machine learning pipelines.