PySpark - Linear Regression with MLlib
Key Insights
- PySpark’s MLlib provides distributed linear regression that scales horizontally across clusters, enabling you to train models on datasets too large for single-machine libraries like scikit-learn.
- Feature engineering in PySpark requires VectorAssembler to combine input columns into a single feature vector, a mandatory preprocessing step before training any MLlib model.
- MLlib’s LinearRegression estimator supports L1/L2 regularization, weighted samples, and automatic feature standardization, with performance metrics accessible through the trained model’s summary object.
Setting Up the Environment
Linear regression in PySpark requires a SparkSession and proper schema definition. Start by initializing Spark with adequate memory allocation for your dataset size.
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
spark = SparkSession.builder \
    .appName("LinearRegressionMLlib") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()
# Sample dataset: house prices
data = spark.createDataFrame([
    (1500, 3, 2, 250000),
    (2000, 4, 3, 350000),
    (1200, 2, 1, 180000),
    (1800, 3, 2, 290000),
    (2500, 5, 4, 450000),
    (1350, 2, 2, 210000),
    (2200, 4, 3, 380000),
    (1600, 3, 2, 270000)
], ["sqft", "bedrooms", "bathrooms", "price"])
data.show()
Feature Vector Assembly
MLlib requires all features in a single vector column. VectorAssembler transforms multiple input columns into one feature vector, which becomes the input for the regression model.
# Define feature columns
feature_cols = ["sqft", "bedrooms", "bathrooms"]
# Create VectorAssembler
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)
# Transform the data
assembled_data = assembler.transform(data)
assembled_data.select("features", "price").show(truncate=False)
Output shows the feature vector format (first rows shown):
+-------------------+------+
|features |price |
+-------------------+------+
|[1500.0,3.0,2.0] |250000|
|[2000.0,4.0,3.0] |350000|
|[1200.0,2.0,1.0] |180000|
+-------------------+------+
Training a Basic Linear Regression Model
The LinearRegression class follows the estimator-transformer pattern. Configure hyperparameters before calling fit() on your assembled dataset.
# Create LinearRegression instance
lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
    predictionCol="predicted_price",
    maxIter=100,
    regParam=0.0,        # No regularization initially
    elasticNetParam=0.0  # 0 = L2, 1 = L1
)
# Train the model
lr_model = lr.fit(assembled_data)
# Extract coefficients and intercept
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")
# Make predictions
predictions = lr_model.transform(assembled_data)
predictions.select("features", "price", "predicted_price").show()
The coefficients represent the weight for each feature (sqft, bedrooms, bathrooms), while the intercept is the baseline price.
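To make this concrete, a prediction is just the dot product of the feature vector with the coefficients, plus the intercept. A plain-Python sketch using hypothetical coefficient values (your fitted model will produce different numbers):

```python
# Hypothetical values for illustration only -- not from an actual fit
coefficients = [110.0, 15000.0, 20000.0]  # weights for sqft, bedrooms, bathrooms
intercept = 25000.0

# Prediction = features . coefficients + intercept
features = [1500.0, 3.0, 2.0]             # sqft, bedrooms, bathrooms
predicted_price = sum(f * c for f, c in zip(features, coefficients)) + intercept
print(predicted_price)  # 110*1500 + 15000*3 + 20000*2 + 25000 = 275000.0
```

This is exactly what `lr_model.transform()` computes for each row, distributed across the cluster.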
Model Evaluation and Metrics
Access comprehensive training metrics through the model’s summary object. These metrics help assess model performance and identify potential issues.
# Training summary
train_summary = lr_model.summary
print(f"RMSE: {train_summary.rootMeanSquaredError}")
print(f"R2: {train_summary.r2}")
print(f"MAE: {train_summary.meanAbsoluteError}")
print(f"Explained Variance: {train_summary.explainedVariance}")
# Residuals analysis
residuals = train_summary.residuals
residuals.describe().show()
# Predictions vs actuals
train_summary.predictions.select("price", "predicted_price").show()
For production models, use a separate test set with RegressionEvaluator:
evaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="predicted_price",
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE: {rmse}")
# Calculate multiple metrics
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(f"Test R2: {r2}")
Train-Test Split and Cross-Validation
Split your data properly to avoid overfitting. PySpark provides randomSplit for creating training and test sets.
# Load larger dataset for meaningful split
from pyspark.sql.types import StructType, StructField, DoubleType
schema = StructType([
    StructField("sqft", DoubleType(), True),
    StructField("bedrooms", DoubleType(), True),
    StructField("bathrooms", DoubleType(), True),
    StructField("age", DoubleType(), True),
    StructField("price", DoubleType(), True)
])
# Assume you have a CSV file
df = spark.read.csv("housing_data.csv", header=True, schema=schema)
# Split data: 80% training, 20% testing
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
print(f"Training samples: {train_data.count()}")
print(f"Test samples: {test_data.count()}")
# Assemble features
feature_cols = ["sqft", "bedrooms", "bathrooms", "age"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_assembled = assembler.transform(train_data)
test_assembled = assembler.transform(test_data)
# Train on training set
lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(train_assembled)
# Evaluate on test set. This model uses the default prediction column
# ("prediction"), and the earlier evaluator was switched to "r2", so
# build a fresh evaluator rather than reusing it.
test_evaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse"
)
test_predictions = model.transform(test_assembled)
test_rmse = test_evaluator.evaluate(test_predictions)
print(f"Test Set RMSE: {test_rmse}")
Regularization and Hyperparameter Tuning
Regularization prevents overfitting by penalizing large coefficients. MLlib supports L1 (Lasso), L2 (Ridge), and Elastic Net regularization.
# Ridge Regression (L2)
ridge_lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
    regParam=0.1,         # Regularization strength
    elasticNetParam=0.0,  # Pure L2
    standardization=True  # Standardize features
)
ridge_model = ridge_lr.fit(train_assembled)
# Lasso Regression (L1)
lasso_lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
    regParam=0.1,
    elasticNetParam=1.0,  # Pure L1
    standardization=True
)
lasso_model = lasso_lr.fit(train_assembled)
# Elastic Net (combination of L1 and L2)
elastic_lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
    regParam=0.1,
    elasticNetParam=0.5,  # 50% L1, 50% L2
    standardization=True
)
elastic_model = elastic_lr.fit(train_assembled)
# Compare coefficients
print("Ridge coefficients:", ridge_model.coefficients)
print("Lasso coefficients:", lasso_model.coefficients)
print("Elastic Net coefficients:", elastic_model.coefficients)
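To clarify how the two parameters interact: MLlib's elastic net penalty is regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam)/2 * ||w||_2^2), so regParam scales the overall penalty and elasticNetParam blends L1 and L2. A plain-Python sketch of just the penalty term:

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty MLlib adds to the least-squares loss:
    regParam * (alpha * ||w||_1 + (1 - alpha)/2 * ||w||_2^2),
    where alpha is elasticNetParam."""
    l1 = sum(abs(w) for w in weights)
    l2_sq = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_sq)

w = [3.0, -4.0]
print(elastic_net_penalty(w, 0.1, 0.0))  # pure L2: 0.1 * 25/2      = 1.25
print(elastic_net_penalty(w, 0.1, 1.0))  # pure L1: 0.1 * 7         = 0.7
print(elastic_net_penalty(w, 0.1, 0.5))  # mix: 0.1*(0.5*7+0.25*25) = 0.975
```

The L1 term is what drives Lasso coefficients to exactly zero, which is why the Lasso printout above typically shows sparser coefficients than Ridge.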
Pipeline Integration
Combine preprocessing and modeling into a single pipeline for cleaner code and easier deployment.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
# Create pipeline stages
assembler = VectorAssembler(
    inputCols=["sqft", "bedrooms", "bathrooms", "age"],
    outputCol="raw_features"
)
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withStd=True,
    withMean=True
)
lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
    regParam=0.1,
    elasticNetParam=0.5
)
# Build pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Fit pipeline
pipeline_model = pipeline.fit(train_data)
# Make predictions
pipeline_predictions = pipeline_model.transform(test_data)
# Evaluate
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")
pipeline_rmse = evaluator.evaluate(pipeline_predictions)
print(f"Pipeline RMSE: {pipeline_rmse}")
# Save model
pipeline_model.write().overwrite().save("models/linear_regression_pipeline")
# Load model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("models/linear_regression_pipeline")
Handling Categorical Features
Real-world datasets contain categorical variables that require encoding before use in linear regression.
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# Sample data with categorical feature
data_with_cat = spark.createDataFrame([
    (1500, 3, "urban", 250000),
    (2000, 4, "suburban", 350000),
    (1200, 2, "rural", 180000),
    (1800, 3, "urban", 290000)
], ["sqft", "bedrooms", "location", "price"])
# Index categorical column
indexer = StringIndexer(inputCol="location", outputCol="location_index")
# One-hot encode
encoder = OneHotEncoder(
    inputCols=["location_index"],
    outputCols=["location_vec"]
)
# Assemble all features
assembler = VectorAssembler(
    inputCols=["sqft", "bedrooms", "location_vec"],
    outputCol="features"
)
# Create pipeline with encoding
cat_pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
cat_model = cat_pipeline.fit(data_with_cat)
PySpark’s MLlib linear regression scales to massive datasets while maintaining familiar machine learning workflows. Use pipelines for production deployments, apply appropriate regularization to prevent overfitting, and leverage distributed computing for datasets exceeding single-machine memory constraints.