How to Use Pipeline in scikit-learn

Key Insights

  • Pipelines prevent data leakage by ensuring preprocessing steps are fit only on training data and applied consistently to test data, eliminating the most common source of overly optimistic model performance estimates.
  • The ColumnTransformer class enables different preprocessing strategies for different feature types within a single pipeline, making it trivial to handle mixed data types without manual column tracking.
  • Custom transformers integrated into pipelines create reproducible, deployable ML workflows where the entire preprocessing and prediction logic is encapsulated in a single serializable object.

Introduction to Pipelines

Every machine learning workflow involves a sequence of transformations: scaling features, encoding categories, imputing missing values, and finally training a model. Without pipelines, you’ll find yourself writing repetitive code and risking data leakage—the silent killer of model validity.

Data leakage occurs when information from your test set influences your training process. The classic mistake looks like this:

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris

# Load data
X, y = load_iris(return_X_y=True)

# WRONG: Fitting scaler on all data before split
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Split happens after scaling - data leakage!
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)

model = LogisticRegression()
model.fit(X_train, y_train)

The scaler learned statistics from the entire dataset, including the test set. Your test accuracy is now artificially inflated. Here’s the correct approach with a pipeline:

from sklearn.pipeline import Pipeline

# Split first
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Pipeline ensures scaler fits only on training data
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

pipeline.fit(X_train, y_train)
score = pipeline.score(X_test, y_test)

The pipeline fits the scaler on training data only, then applies those learned parameters to the test set. This is the correct workflow.
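The same leak-free behavior carries over to cross-validation: hand the whole pipeline to cross_val_score and each fold refits the scaler on that fold's training portion only. A minimal sketch on the same iris data:

```python
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)

# Each CV fold refits the scaler on that fold's training split only,
# so no held-out statistics ever leak into preprocessing.
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

scores = cross_val_score(pipeline, X, y, cv=5)
print(f"Mean CV accuracy: {scores.mean():.3f}")
```

Passing the raw X and y here is safe precisely because the pipeline defers all fitting to the cross-validation machinery.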

Creating a Basic Pipeline

You can construct pipelines using Pipeline() with named steps or make_pipeline() for automatic naming. I prefer explicit naming for clarity when debugging and tuning hyperparameters.

from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier

# Explicit naming with Pipeline
pipeline_named = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('rf', RandomForestClassifier(n_estimators=100))
])

# Automatic naming with make_pipeline
pipeline_auto = make_pipeline(
    StandardScaler(),
    PCA(n_components=2),
    RandomForestClassifier(n_estimators=100)
)

The make_pipeline() version generates names like standardscaler, pca, and randomforestclassifier. This works fine for simple cases, but explicit naming becomes essential when you have multiple transformers of the same type or when tuning hyperparameters.

All steps except the last must be transformers (implementing fit and transform). The final step can be a transformer or an estimator (implementing fit and predict).
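You can confirm the auto-generated names by listing the keys of named_steps:

```python
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier

pipeline_auto = make_pipeline(
    StandardScaler(),
    PCA(n_components=2),
    RandomForestClassifier(n_estimators=100)
)

# make_pipeline derives step names by lowercasing the class names
print(list(pipeline_auto.named_steps))
# ['standardscaler', 'pca', 'randomforestclassifier']
```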

Preprocessing with ColumnTransformer

Real-world datasets contain mixed feature types. You need to one-hot encode categorical variables while scaling numerical ones. ColumnTransformer handles this elegantly:

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import GradientBoostingClassifier

# Sample data with mixed types
data = pd.DataFrame({
    'age': [25, 32, 47, 51],
    'income': [50000, 75000, 100000, 120000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago'],
    'education': ['BS', 'MS', 'PhD', 'BS'],
    'purchased': [0, 1, 1, 1]
})

X = data.drop('purchased', axis=1)
y = data['purchased']

# Define column groups
numerical_features = ['age', 'income']
categorical_features = ['city', 'education']

# Create preprocessing pipeline
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_features),
        ('cat', OneHotEncoder(drop='first', sparse_output=False), categorical_features)
    ])

# Complete pipeline
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', GradientBoostingClassifier())
])

pipeline.fit(X, y)

The ColumnTransformer applies each transformation to its specified columns and concatenates the results. The drop='first' parameter in OneHotEncoder drops one dummy variable per category, avoiding perfect multicollinearity with linear models. (Note: sparse_output replaced the older sparse parameter in scikit-learn 1.2.)

Hyperparameter Tuning with Pipelines

Pipelines shine during hyperparameter tuning. They ensure preprocessing steps are refit for each cross-validation fold, preventing leakage while searching the parameter space.

from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# Pipeline with multiple tunable components
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svm', SVC())
])

# Parameter grid using step__parameter naming convention
param_grid = {
    'pca__n_components': [2, 3, 4],
    'svm__C': [0.1, 1, 10],
    'svm__kernel': ['linear', 'rbf'],
    'svm__gamma': ['scale', 'auto']  # ignored when kernel='linear'
}

grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)

grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best cross-validation score: {grid_search.best_score_:.3f}")
print(f"Test score: {grid_search.score(X_test, y_test):.3f}")

The parameter naming convention step_name__parameter_name allows you to tune any parameter in any pipeline step. This becomes powerful when you want to tune preprocessing decisions (like PCA components) alongside model hyperparameters.
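The same step__parameter convention also works for setting values directly via set_params, which is handy for quick experiments outside a grid search:

```python
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svm', SVC())
])

# Reconfigure nested steps without rebuilding the pipeline
pipeline.set_params(pca__n_components=3, svm__C=10)

print(pipeline.named_steps['pca'].n_components)  # 3
print(pipeline.named_steps['svm'].C)             # 10
```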

Custom Transformers in Pipelines

Domain-specific feature engineering often requires custom transformers. Create them by inheriting from BaseEstimator and TransformerMixin:

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class LogTransformer(BaseEstimator, TransformerMixin):
    """Apply log transformation to specified columns."""
    
    def __init__(self, columns=None):
        self.columns = columns
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X_copy = X.copy()
        if self.columns is None:
            # Apply to all columns
            return np.log1p(X_copy)
        else:
            # Apply to specified columns only
            X_copy[self.columns] = np.log1p(X_copy[self.columns])
        return X_copy

class FeatureInteraction(BaseEstimator, TransformerMixin):
    """Create interaction features between specified column pairs."""
    
    def __init__(self, interactions=None):
        # Store the parameter unmodified so get_params() and clone() work correctly
        self.interactions = interactions

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X_copy = X.copy()
        for col1, col2 in (self.interactions or []):
            X_copy[f'{col1}_x_{col2}'] = X_copy[col1] * X_copy[col2]
        return X_copy

# Use custom transformers in pipeline
custom_pipeline = Pipeline([
    ('log_transform', LogTransformer(columns=['income'])),
    ('interactions', FeatureInteraction(interactions=[('age', 'income')])),
    ('scaler', StandardScaler()),
    ('model', GradientBoostingClassifier())
])

The fit method must return self even if it does nothing. This enables method chaining. The transform method performs the actual transformation. Always work on copies of the input data to avoid modifying the original.
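Because fit returns self, fit_transform works out of the box. Here is a quick standalone check (re-declaring a compact version of the LogTransformer above) that the copy semantics leave the original DataFrame untouched:

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class LogTransformer(BaseEstimator, TransformerMixin):
    """Apply log1p to the specified DataFrame columns."""

    def __init__(self, columns=None):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X_copy = X.copy()
        cols = X_copy.columns if self.columns is None else self.columns
        X_copy[cols] = np.log1p(X_copy[cols])
        return X_copy

df = pd.DataFrame({'income': [50000, 75000], 'age': [25, 32]})
out = LogTransformer(columns=['income']).fit_transform(df)

print(out['income'].tolist())  # log1p-transformed incomes
print(df['income'].tolist())   # original values, untouched
```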

Saving and Loading Pipelines

Pipelines are serializable objects. Save fitted pipelines for production deployment:

import joblib

# Train and save
pipeline.fit(X_train, y_train)
joblib.dump(pipeline, 'model_pipeline.joblib')

# Load and predict
loaded_pipeline = joblib.load('model_pipeline.joblib')
predictions = loaded_pipeline.predict(X_new)

Use joblib instead of pickle for objects containing large numpy arrays—it’s more efficient. The saved pipeline contains all fitted transformers and the trained model. When you load it, everything is ready for predictions without refitting.

This is crucial for deployment. Your preprocessing logic is encapsulated with your model, ensuring consistency between training and production environments.

Best Practices and Common Pitfalls

Access intermediate steps using named_steps to inspect learned parameters or debug transformations:

# Access fitted scaler
scaler = pipeline.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature scales: {scaler.scale_}")

# Get transformed features from intermediate step
X_scaled = pipeline.named_steps['scaler'].transform(X_test)

# Access the final estimator (feature_importances_ requires a tree-based model)
model = pipeline.named_steps['classifier']
print(f"Feature importances: {model.feature_importances_}")

Use descriptive step names that clarify intent. Instead of ('step1', StandardScaler()), use ('scaler', StandardScaler()). This makes debugging and hyperparameter tuning much clearer.

Remember that pipelines require consistent column ordering. If you train on a DataFrame with columns ['age', 'income', 'city'], your prediction data must have the same column order. This is why I often use ColumnTransformer with explicit column names rather than relying on column positions.

Don’t fit pipelines on test data. This seems obvious, but I’ve seen code that calls fit_transform(X_test) on a preprocessing pipeline instead of transform(X_test). The former refits every transformer on the test data, causing leakage.

Cache expensive transformations using the memory parameter:

from tempfile import mkdtemp
from shutil import rmtree

cachedir = mkdtemp()
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=50)),
    ('classifier', RandomForestClassifier())
], memory=cachedir)

# PCA results are cached during grid search
# Cleanup when done
rmtree(cachedir)

This is valuable during grid search when expensive transformations (like PCA) don’t change across parameter combinations.

Pipelines transform chaotic ML code into reproducible, maintainable workflows. They prevent data leakage, reduce boilerplate, and create deployable artifacts. Start using them for every project—your future self will thank you.
