PySpark - PCA (Principal Component Analysis) with MLlib

Key Insights

  • PySpark MLlib’s PCA implementation handles distributed dimensionality reduction on massive datasets, transforming high-dimensional feature vectors into a lower-dimensional space while preserving maximum variance
  • The StandardScaler preprocessing step is critical for PCA in PySpark—features with larger scales will dominate principal components without normalization
  • PySpark PCA returns both the transformed data and the principal components matrix, enabling you to analyze feature contributions and reconstruct original data with controlled information loss

Understanding PCA in Distributed Computing

Principal Component Analysis reduces dimensionality by identifying orthogonal axes (principal components) that capture the most variance in your data. In PySpark, this operation distributes across cluster nodes, making it feasible to perform PCA on datasets that won’t fit in memory on a single machine.

The MLlib implementation computes the covariance matrix in a distributed fashion, then extracts eigenvectors locally. This hybrid approach balances computational efficiency with the mathematical requirements of eigendecomposition.
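
On a single machine, the same two-step computation can be sketched in plain NumPy. This toy version (synthetic data, not the MLlib API) shows exactly what gets split between cluster and driver: the covariance matrix is the part Spark aggregates in parallel, while the eigendecomposition runs locally.

```python
import numpy as np

# Toy sketch of the math MLlib distributes, on synthetic data
rng = np.random.default_rng(42)
X = rng.normal(size=(100, 4))
X_centered = X - X.mean(axis=0)

# Covariance matrix: the step Spark computes in a distributed fashion
cov = X_centered.T @ X_centered / (X.shape[0] - 1)

# Eigendecomposition: the step that runs locally on the driver.
# eigh returns eigenvalues in ascending order, so reverse for top-k.
eigenvalues, eigenvectors = np.linalg.eigh(cov)
order = np.argsort(eigenvalues)[::-1]
components = eigenvectors[:, order[:2]]  # top-2 principal components

# Project the data onto the new 2-D basis
projected = X_centered @ components
print(projected.shape)  # (100, 2)
```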

Setting Up the Environment

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
import numpy as np

spark = SparkSession.builder \
    .appName("PCA_Analysis") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Create sample dataset with correlated features
np.random.seed(42)  # seed for reproducible sample data
data = [(float(i), float(i * 2 + np.random.randn()),
         float(i * 0.5 + np.random.randn()),
         float(i * 1.5 + np.random.randn()))
        for i in range(1000)]

df = spark.createDataFrame(data, ["feature1", "feature2", "feature3", "feature4"])
df.show(5)

Preprocessing with VectorAssembler and StandardScaler

PCA requires features in a single vector column. VectorAssembler combines multiple columns into one feature vector. StandardScaler then normalizes these features to have zero mean and unit variance.

# Assemble features into a single vector
feature_columns = ["feature1", "feature2", "feature3", "feature4"]
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features"
)

# Standardize features - critical for PCA
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

# Apply transformations
assembled_df = assembler.transform(df)
scaler_model = scaler.fit(assembled_df)
scaled_df = scaler_model.transform(assembled_df)

scaled_df.select("scaled_features").show(5, truncate=False)

Without standardization, features with larger magnitudes dominate the principal components. A feature ranging from 0-1000 will have far more influence than one ranging from 0-1, regardless of their actual predictive importance.
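
A quick NumPy illustration of this effect, using three hypothetical features: two informative, correlated signals on a small scale, and one pure-noise feature on a scale roughly 300 times larger.

```python
import numpy as np

# Synthetic features: two correlated signals (~unit scale) and one
# large-magnitude noise column (~300x scale)
rng = np.random.default_rng(0)
signal = rng.normal(size=500)
f1 = signal + 0.1 * rng.normal(size=500)   # informative, small scale
f2 = 300.0 * rng.normal(size=500)          # pure noise, large scale
f3 = signal + 0.1 * rng.normal(size=500)   # informative, small scale
X = np.column_stack([f1, f2, f3])

def first_component(data):
    """Return the leading eigenvector of the covariance matrix."""
    centered = data - data.mean(axis=0)
    vals, vecs = np.linalg.eigh(np.cov(centered, rowvar=False))
    return vecs[:, np.argmax(vals)]

# Unscaled: the noisy large-scale feature owns PC1 almost entirely
print(first_component(X))
# Standardized: PC1 recovers the correlated signal pair instead
print(first_component(X / X.std(axis=0)))
```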

Applying PCA Transformation

The PCA estimator requires specifying the number of principal components to retain. This determines the dimensionality of your output space.

# Reduce from 4 dimensions to 2
pca = PCA(
    k=2,
    inputCol="scaled_features",
    outputCol="pca_features"
)

# Fit the model
pca_model = pca.fit(scaled_df)

# Transform the data
pca_df = pca_model.transform(scaled_df)

# Display transformed features
pca_df.select("pca_features").show(10, truncate=False)

The k parameter controls the trade-off between dimensionality reduction and information preservation. Smaller k values mean more compression but potentially more information loss.

Analyzing Explained Variance

Understanding how much variance each principal component captures helps you choose an appropriate k value.

# Get explained variance for each component
explained_variance = pca_model.explainedVariance

print("Explained Variance Ratio:")
for i, variance in enumerate(explained_variance):
    print(f"PC{i+1}: {variance:.4f} ({variance*100:.2f}%)")

# Calculate cumulative variance
cumulative_variance = np.cumsum(explained_variance.toArray())
print(f"\nCumulative variance explained: {cumulative_variance}")

# Total variance captured
total_variance = sum(explained_variance)
print(f"Total variance with {pca.getK()} components: {total_variance:.4f}")

A common heuristic is to retain enough components to explain 95% of the variance. This balances dimensionality reduction with information preservation.

Examining Principal Components

The principal components matrix reveals how original features contribute to each new dimension.

# Get the principal components matrix
pc_matrix = pca_model.pc

print(f"Principal Components Matrix shape: {pc_matrix.numRows} x {pc_matrix.numCols}")
print("\nPrincipal Components:")
print(pc_matrix.toArray())

# Analyze feature contributions to first principal component
pc1 = pc_matrix.toArray()[:, 0]
feature_contributions = list(zip(feature_columns, pc1))
feature_contributions.sort(key=lambda x: abs(x[1]), reverse=True)

print("\nFeature contributions to PC1:")
for feature, contribution in feature_contributions:
    print(f"{feature}: {contribution:.4f}")

High absolute values indicate features that strongly influence that principal component. This helps interpret what each component represents in your domain.

Building a Complete Pipeline

Pipelines chain transformations together, making the workflow reproducible and easier to deploy.

# Create complete pipeline
pipeline = Pipeline(stages=[
    assembler,
    scaler,
    pca
])

# Fit pipeline on training data
pipeline_model = pipeline.fit(df)

# Transform new data
new_data = [(10.0, 20.0, 5.0, 15.0), 
            (15.0, 30.0, 7.5, 22.5)]
new_df = spark.createDataFrame(new_data, feature_columns)

transformed_new = pipeline_model.transform(new_df)
transformed_new.select("pca_features").show(truncate=False)

Choosing the Optimal Number of Components

Programmatically determine the optimal k by testing multiple values and analyzing the explained variance curve.

# Test different k values (note: fitting once with k equal to the number
# of features and taking np.cumsum of explainedVariance yields the same
# curve with a single fit)
variance_results = []

for k in range(1, len(feature_columns) + 1):
    pca_test = PCA(k=k, inputCol="scaled_features", outputCol="pca_features")
    pca_test_model = pca_test.fit(scaled_df)
    total_var = sum(pca_test_model.explainedVariance)
    variance_results.append((k, total_var))
    print(f"k={k}: {total_var:.4f} ({total_var*100:.2f}% variance)")

# Find k for 95% variance threshold
threshold = 0.95
optimal_k = next(k for k, var in variance_results if var >= threshold)
print(f"\nOptimal k for {threshold*100}% variance: {optimal_k}")

Reconstructing Original Data

You can reconstruct the original features from PCA components to measure reconstruction error.

# Get one transformed sample
sample_pca = pca_df.select("pca_features").first()[0]

# Reconstruct using PC matrix
pc_array = pca_model.pc.toArray()
reconstructed = np.dot(sample_pca.toArray(), pc_array.T)

# Get original scaled features for comparison
sample_original = scaled_df.select("scaled_features").first()[0]

# Calculate reconstruction error
reconstruction_error = np.linalg.norm(
    sample_original.toArray() - reconstructed
)
print(f"Reconstruction error: {reconstruction_error:.6f}")

Lower reconstruction error indicates better preservation of original information. This metric validates your choice of k.
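
To see how the error behaves as k varies, here is a plain-NumPy sketch (synthetic correlated data mimicking the article's setup, not the Spark pipeline) of the mean reconstruction error for each k:

```python
import numpy as np

# Synthetic data with correlated columns, mirroring the sample dataset
rng = np.random.default_rng(7)
base = np.arange(200, dtype=float)
X = np.column_stack([base + rng.normal(size=200),
                     2.0 * base + rng.normal(size=200),
                     0.5 * base + rng.normal(size=200),
                     1.5 * base + rng.normal(size=200)])
Xs = (X - X.mean(axis=0)) / X.std(axis=0)  # standardize, as before PCA

# Principal components from the covariance eigendecomposition
vals, vecs = np.linalg.eigh(np.cov(Xs, rowvar=False))
pcs = vecs[:, np.argsort(vals)[::-1]]

# Mean reconstruction error for each k: project down, map back, compare
for k in range(1, 5):
    A = pcs[:, :k]
    recon = Xs @ A @ A.T
    err = np.linalg.norm(Xs - recon, axis=1).mean()
    print(f"k={k}: mean reconstruction error {err:.4f}")
```

Because the columns are strongly correlated, the error drops sharply after the first component and reaches zero once k equals the original dimensionality.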

Performance Considerations

PCA cost grows linearly with the number of rows but quadratically with the number of features: Spark aggregates the d × d covariance matrix across the cluster and then eigendecomposes it locally on the driver, so very wide datasets become a driver bottleneck. For datasets with thousands of features, consider these optimizations:

# Cache intermediate results for iterative operations
scaled_df.cache()

# Repartition for better parallelism
scaled_df = scaled_df.repartition(200)

# Use SVD-based PCA for very wide datasets
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import Vectors

# Convert to RDD for mllib functions
vectors_rdd = scaled_df.select("scaled_features").rdd.map(
    lambda row: Vectors.dense(row[0].toArray())
)

# Compute SVD directly
mat = RowMatrix(vectors_rdd)
svd = mat.computeSVD(k=2, computeU=True)

print(f"Singular values: {svd.s}")

The MLlib RowMatrix approach provides more control over the SVD computation but requires working with RDDs instead of DataFrames.
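
The two routes agree mathematically: the right singular vectors of the centered data matrix are the principal components, and the squared singular values divided by n − 1 are the covariance eigenvalues. A quick NumPy check on synthetic data:

```python
import numpy as np

rng = np.random.default_rng(1)
X = rng.normal(size=(200, 4)) @ rng.normal(size=(4, 4))  # correlated features
Xc = X - X.mean(axis=0)

# Route 1: eigendecomposition of the covariance matrix
vals, vecs = np.linalg.eigh(np.cov(Xc, rowvar=False))
order = np.argsort(vals)[::-1]
pc_eig, var_eig = vecs[:, order], vals[order]

# Route 2: SVD of the centered data matrix
_, s, Vt = np.linalg.svd(Xc, full_matrices=False)

# Covariance eigenvalues equal squared singular values over n - 1
print(np.allclose(var_eig, s**2 / (Xc.shape[0] - 1)))

# Components match column-by-column, up to sign
print(all(np.allclose(np.abs(pc_eig[:, j]), np.abs(Vt[j]), atol=1e-6)
          for j in range(4)))
```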

PCA in PySpark enables dimensionality reduction at scale, transforming high-dimensional datasets into manageable representations while preserving the statistical structure. Proper preprocessing, variance analysis, and component interpretation ensure you extract meaningful insights from compressed data.
