ETL Pipelines: Extract, Transform, Load

Key Insights

  • ETL pipelines are the backbone of data-driven organizations, but most teams overcomplicate them—start with simple Python scripts before reaching for Spark or complex orchestration tools.
  • The transform step is where pipelines fail most often; invest heavily in data validation and make your transformations idempotent from day one.
  • ELT (Extract, Load, Transform) often beats traditional ETL for analytical workloads because modern data warehouses handle transformations more efficiently than your application code.

What is ETL? The Data Movement Foundation

ETL stands for Extract, Transform, Load—three distinct phases that move data from source systems into a format and location suitable for analysis. Every organization with more than one data source needs ETL, whether they call it that or not.

The pattern is straightforward: pull data from where it lives (databases, APIs, files), reshape it into something useful, and write it somewhere analysts and applications can access it. The complexity comes from scale, reliability requirements, and the messy reality of production data.

Modern data architecture typically places ETL between operational systems and analytical stores. Your PostgreSQL database handles transactions, but your data warehouse (Snowflake, BigQuery, Redshift) handles reporting. ETL bridges that gap.

ETL vs ELT: Traditional ETL transforms data before loading it into the target system. ELT flips this—you load raw data first, then transform it in the warehouse. ELT has become the dominant pattern for analytics because warehouses are optimized for transformation workloads, and storing raw data preserves flexibility. Use ETL when you need to filter sensitive data before it reaches the warehouse, or when your target system lacks transformation capabilities.
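
To make the difference concrete, here is a minimal ELT sketch that uses Python's built-in sqlite3 module as a stand-in for a warehouse. The table names (raw_orders, orders_clean), the columns, and the sample rows are invented for illustration; a real warehouse would use its own SQL dialect and bulk-loading tools.

```python
import sqlite3

def run_elt(rows):
    """Load raw rows untouched, then transform them with SQL inside the database."""
    conn = sqlite3.connect(":memory:")  # stand-in for a warehouse connection

    # Load: raw data lands as-is, including messy values
    conn.execute("CREATE TABLE raw_orders (id INTEGER, email TEXT, amount TEXT)")
    conn.executemany("INSERT INTO raw_orders VALUES (?, ?, ?)", rows)

    # Transform: the database does the cleanup, and raw_orders stays available
    conn.execute("""
        CREATE TABLE orders_clean AS
        SELECT id,
               LOWER(TRIM(email)) AS email,
               CAST(amount AS REAL) AS amount
        FROM raw_orders
        WHERE email IS NOT NULL
    """)
    conn.commit()
    return conn

# Hypothetical sample data
raw_rows = [
    (1, "  Ada@Example.com ", "19.99"),
    (2, None, "5.00"),
    (3, "bob@example.com", "7"),
]
conn = run_elt(raw_rows)
clean = conn.execute("SELECT id, email, amount FROM orders_clean ORDER BY id").fetchall()
```

Because the raw table is kept, a changed cleaning rule only requires re-running the SQL, not re-extracting from the source.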

Extract: Pulling Data from Sources

Extraction seems simple until you deal with API rate limits, database connection pools, and source systems that go down at 3 AM. The extraction phase must handle four common source types: databases, REST APIs, files (CSV, JSON, Parquet), and streaming systems.

Incremental vs full extraction is your first architectural decision. Full extraction pulls everything every time—simple but expensive. Incremental extraction tracks what changed since the last run using timestamps, change data capture (CDC), or sequence IDs. Start with full extraction for small datasets, but plan for incremental from the beginning.
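
A timestamp-based incremental extract usually revolves around a "watermark": the newest change you have already processed. The sketch below shows the idea with in-memory records. The function name and sample data are hypothetical, and a real pipeline would persist the watermark somewhere durable and advance it only after the load succeeds.

```python
from datetime import datetime, timezone

def extract_incremental(records, watermark):
    """Return records changed after the watermark, plus the new watermark."""
    changed = [r for r in records if r["updated_at"] > watermark]
    # Advance only to the newest timestamp actually seen, so nothing is skipped
    new_watermark = max((r["updated_at"] for r in changed), default=watermark)
    return changed, new_watermark

# Hypothetical source rows
source = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"id": 2, "updated_at": datetime(2024, 1, 3, tzinfo=timezone.utc)},
    {"id": 3, "updated_at": datetime(2024, 1, 5, tzinfo=timezone.utc)},
]

last_run = datetime(2024, 1, 2, tzinfo=timezone.utc)
changed, last_run = extract_incremental(source, last_run)

# A second run with the advanced watermark finds nothing new
changed_again, _ = extract_incremental(source, last_run)
```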

import requests
import psycopg2
from psycopg2 import sql
from datetime import datetime
from typing import Any, Dict, Iterator, Optional

class DataExtractor:
    def __init__(self, api_base_url: str, db_connection_string: str):
        self.api_base_url = api_base_url
        self.db_conn = psycopg2.connect(db_connection_string)

    def extract_from_api(self, endpoint: str, since: Optional[datetime] = None) -> Iterator[Dict[str, Any]]:
        """Extract data from REST API with pagination and incremental support."""
        # Assumes a page-numbered API that accepts 'updated_after'
        # and returns a JSON body shaped like {'results': [...]}
        params = {}
        if since:
            params['updated_after'] = since.isoformat()

        page = 1
        while True:
            params['page'] = page
            response = requests.get(
                f"{self.api_base_url}/{endpoint}",
                params=params,
                timeout=30
            )
            response.raise_for_status()

            data = response.json()
            if not data['results']:
                break  # an empty page marks the end of the data

            yield from data['results']
            page += 1

    def extract_from_postgres(self, table: str, since: Optional[datetime] = None) -> Iterator[Dict[str, Any]]:
        """Extract data from PostgreSQL with incremental support."""
        # Quote the table name as an identifier rather than interpolating it into the SQL string
        query = sql.SQL("SELECT * FROM {}").format(sql.Identifier(table))
        params = []

        if since:
            query += sql.SQL(" WHERE updated_at > %s")
            params.append(since)

        # A named cursor is server-side: rows are streamed in batches instead of
        # being buffered in full on the client
        with self.db_conn.cursor(name='etl_extract') as cursor:
            cursor.execute(query, params)
            columns = None

            while batch := cursor.fetchmany(1000):
                if columns is None:
                    # With a named cursor, description may only be populated after the first fetch
                    columns = [desc[0] for desc in cursor.description]
                for row in batch:
                    yield dict(zip(columns, row))

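This extractor handles both API and database sources with incremental extraction support. Because both methods are generators, downstream steps can consume records one at a time instead of holding a full list in memory. For the database path this only bounds memory when rows are streamed through a server-side (named) cursor, since psycopg2's default client-side cursor buffers the entire result set.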
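
The extractor raises on the first failed request, which is rarely what you want when an API rate-limits you or a source blips at 3 AM. One common remedy is retrying with exponential backoff. The helper below is a minimal sketch: the name with_backoff and its parameters are hypothetical, and it retries on every exception for brevity, whereas real code should retry only on errors known to be transient (timeouts, HTTP 429 or 5xx responses).

```python
import random
import time

def with_backoff(func, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying on exceptions with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the original error
            # 1x, 2x, 4x, ... the base delay, plus up to 10% random jitter
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay * 0.1))

# Simulated source that fails twice before succeeding
calls = {"n": 0}

def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated transient failure")
    return {"results": []}

# Record the delays instead of actually sleeping, so the demo runs instantly
delays = []
result = with_backoff(flaky_fetch, base_delay=0.5, sleep=delays.append)
```

Injecting the sleep function keeps the helper easy to test; in production you would leave the default time.sleep in place.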

Transform: Cleaning and Shaping Data

Transformation is where data engineering gets real. Source data is messy: null values where you expect integers, dates in seventeen different formats, duplicate records, and schema changes that break your pipeline without warning.

Your transformation layer must handle validation (is this data correct?), normalization (make it consistent), and enrichment (add derived fields). Build these as composable functions that can be tested independently.

import pandas as pd
import numpy as np
from typing import Callable, Dict, List
from datetime import datetime

class DataTransformer:
    def __init__(self):
        self.transformations: List[Callable] = []
        self.validation_errors: List[Dict] = []
    
    def add_transformation(self, func: Callable) -> 'DataTransformer':
        self.transformations.append(func)
        return self
    
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        for func in self.transformations:
            df = func(df)
        return df

def clean_customer_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and normalize customer records."""
    # Work on a copy so the caller's DataFrame is not modified
    df = df.copy()

    # Standardize email: trim whitespace, then lowercase
    df['email'] = df['email'].str.strip().str.lower()

    # Parse dates with multiple format support (format='mixed' requires pandas 2.0+);
    # values that cannot be parsed become NaT
    df['created_at'] = pd.to_datetime(
        df['created_at'],
        format='mixed',
        errors='coerce'
    )

    # Handle missing values with explicit defaults
    df['country'] = df['country'].fillna('UNKNOWN')
    df['revenue'] = pd.to_numeric(df['revenue'], errors='coerce').fillna(0)

    # Remove duplicates, keeping the most recent record per email
    df = df.sort_values('created_at').drop_duplicates(
        subset=['email'],
        keep='last'
    )

    return df

def validate_required_fields(df: pd.DataFrame, required: List[str]) -> pd.DataFrame:
    """Validate required fields exist and aren't null."""
    for field in required:
        if field not in df.columns:
            raise ValueError(f"Missing required column: {field}")
        
        null_count = df[field].isna().sum()
        if null_count > 0:
            # Log but don't fail—filter invalid rows
            print(f"Warning: {null_count} null values in {field}")
            df = df[df[field].notna()]
    
    return df

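def add_derived_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Add computed fields for analytics."""
    df = df.copy()  # avoid mutating the caller's DataFrame
    # Note: datetime.now() makes the output depend on when the job runs;
    # pass in the run date instead if you need reproducible results
    df['customer_tenure_days'] = (datetime.now() - df['created_at']).dt.days
    df['revenue_tier'] = pd.cut(
        df['revenue'],
        bins=[0, 100, 1000, 10000, float('inf')],
        labels=['bronze', 'silver', 'gold', 'platinum'],
        include_lowest=True  # without this, revenue == 0 falls outside the first bin
    )
    return df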

# Usage
transformer = DataTransformer()
transformer.add_transformation(lambda df: validate_required_fields(df, ['email', 'created_at']))
transformer.add_transformation(clean_customer_data)
transformer.add_transformation(add_derived_fields)

# raw_data: a DataFrame built from the records produced by the extract step
clean_data = transformer.transform(raw_data)

Notice the pipeline pattern: each transformation takes a DataFrame and returns a DataFrame. As long as each function works on a copy rather than mutating its input, it behaves as a pure function, which makes testing trivial and debugging straightforward.
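
The validation function above prints a warning and drops bad rows, and the validation_errors list on DataTransformer is never filled. A common alternative is to quarantine rejected rows together with the reason they failed, so they can be inspected or replayed later. The sketch below uses plain dict records so that it runs without pandas; split_valid, the check names, and the sample rows are all hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]
Check = Callable[[Record], bool]

def split_valid(records: List[Record], checks: Dict[str, Check]) -> Tuple[List[Record], List[Record]]:
    """Separate records that pass every check from those that fail at least one."""
    valid, rejected = [], []
    for record in records:
        failed = [name for name, check in checks.items() if not check(record)]
        if failed:
            # Keep the original record and the reasons, for later inspection
            rejected.append({"record": record, "failed_checks": failed})
        else:
            valid.append(record)
    return valid, rejected

checks = {
    "has_email": lambda r: bool(r.get("email")),
    "revenue_non_negative": lambda r: isinstance(r.get("revenue"), (int, float)) and r["revenue"] >= 0,
}

rows = [
    {"email": "a@example.com", "revenue": 120.0},
    {"email": None, "revenue": 50.0},
    {"email": "c@example.com", "revenue": -5},
]
valid, rejected = split_valid(rows, checks)
```

Since the function has no side effects, a unit test is just a handful of input rows and assertions on the two returned lists.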

Load: Writing to Target Systems

Loading data sounds simple until you need to handle failures mid-write, avoid duplicates, and maintain consistency. The key principle is idempotency: running your load multiple times with the same data should produce the same result.

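from typing import List

import pandas as pd
from sqlalchemy import MetaData, Table, create_engine, text
from sqlalchemy.dialects.postgresql import insert

class DataLoader:
    def __init__(self, connection_string: str):
        self.engine = create_engine(connection_string)

    def _get_table(self, table: str) -> Table:
        """Reflect the target table's definition from the database."""
        # autoload_with requires SQLAlchemy 1.4 or newer
        return Table(table, MetaData(), autoload_with=self.engine)
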
    def upsert_batch(
        self, 
        df: pd.DataFrame, 
        table: str, 
        conflict_columns: List[str],
        update_columns: List[str]
    ) -> int:
        """Upsert data with conflict resolution for idempotency."""
        
        records = df.to_dict('records')
        
        with self.engine.begin() as conn:
            # PostgreSQL upsert using ON CONFLICT
            stmt = insert(self._get_table(table)).values(records)
            
            update_dict = {col: stmt.excluded[col] for col in update_columns}
            update_dict['updated_at'] = text('NOW()')
            
            upsert_stmt = stmt.on_conflict_do_update(
                index_elements=conflict_columns,
                set_=update_dict
            )
            
            result = conn.execute(upsert_stmt)
            return result.rowcount
    
    def load_with_staging(
        self, 
        df: pd.DataFrame, 
        target_table: str,
        staging_table: str
    ) -> int:
        """Load via staging table for atomic updates."""
        
        with self.engine.begin() as conn:
            # Truncate and load staging
            conn.execute(text(f"TRUNCATE TABLE {staging_table}"))
            df.to_sql(staging_table, conn, if_exists='append', index=False)
            
            # Atomic swap
            result = conn.execute(text(f"""
                INSERT INTO {target_table}
                SELECT * FROM {staging_table}
                ON CONFLICT (id) DO UPDATE SET
                    email = EXCLUDED.email,
                    revenue = EXCLUDED.revenue,
                    updated_at = NOW()
            """))
            
            return result.rowcount

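The staging table pattern is worth noting. Because the truncate, the staging load, and the merge into the target all run inside one transaction, a failure at any step rolls everything back, and the staging table gives you a place to validate the entire batch before it touches the target.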
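
Idempotency is easy to check mechanically: load the same batch twice and compare the results. The sketch below does this with Python's built-in sqlite3 module, whose ON CONFLICT clause (SQLite 3.24 or newer) resembles PostgreSQL's. The customers table and the sample batch are invented for illustration.

```python
import sqlite3

def upsert_customers(conn, rows):
    """Insert rows, updating revenue when the email already exists."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            """
            INSERT INTO customers (email, revenue) VALUES (?, ?)
            ON CONFLICT(email) DO UPDATE SET revenue = excluded.revenue
            """,
            rows,
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (email TEXT PRIMARY KEY, revenue REAL)")

batch = [("a@example.com", 100.0), ("b@example.com", 250.0)]

upsert_customers(conn, batch)
first = conn.execute("SELECT email, revenue FROM customers ORDER BY email").fetchall()

upsert_customers(conn, batch)  # simulate a retry of the same batch
second = conn.execute("SELECT email, revenue FROM customers ORDER BY email").fetchall()
```

A plain INSERT would either fail on the second run or duplicate every row; the upsert leaves the table unchanged. Note that a column set to the current time on every update, like updated_at in the loader above, will still differ between runs.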

Building a Complete Pipeline

Individual extraction, transformation, and loading scripts are useful, but production pipelines need orchestration. Airflow remains the industry standard, though Prefect and Dagster offer more modern APIs.

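# Written against the Airflow 2.x API; days_ago, schedule_interval and
# context['execution_date'] are deprecated in later releases.
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

# Assumes DataExtractor, DataTransformer, DataLoader and the transform functions
# from the earlier sections are importable, and that API_URL, DB_CONNECTION and
# WAREHOUSE_CONNECTION are defined in your own configuration.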

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
}

def extract_customers(**context):
    extractor = DataExtractor(API_URL, DB_CONNECTION)
    execution_date = context['execution_date']
    since = execution_date - timedelta(days=1)
    
    records = list(extractor.extract_from_api('customers', since=since))
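    # Push to XCom for the next task. XCom is meant for small values; for large
    # extracts, write the data to shared storage and pass only its location.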
    context['ti'].xcom_push(key='customer_records', value=records)
    return len(records)

def transform_customers(**context):
    records = context['ti'].xcom_pull(key='customer_records')
    df = pd.DataFrame(records)
    
    transformer = DataTransformer()
    transformer.add_transformation(clean_customer_data)
    transformer.add_transformation(add_derived_fields)
    
    clean_df = transformer.transform(df)
    context['ti'].xcom_push(key='clean_customers', value=clean_df.to_dict('records'))
    return len(clean_df)

def load_customers(**context):
    records = context['ti'].xcom_pull(key='clean_customers')
    df = pd.DataFrame(records)
    
    loader = DataLoader(WAREHOUSE_CONNECTION)
    rows = loader.upsert_batch(df, 'dim_customers', ['email'], ['revenue', 'country'])
    return rows

with DAG(
    'customer_etl',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=days_ago(1),
    catchup=False,
) as dag:
    
    extract = PythonOperator(task_id='extract', python_callable=extract_customers)
    transform = PythonOperator(task_id='transform', python_callable=transform_customers)
    load = PythonOperator(task_id='load', python_callable=load_customers)
    
    extract >> transform >> load
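
The DAG above passes full record lists through XCom, which is fine for small batches but does not scale, because XCom values are stored in Airflow's metadata database. A common alternative is to write each batch to shared storage and hand the next task only the location. The sketch below shows the handoff with JSON Lines files in a temporary directory. The helper names and file name are hypothetical, and a local directory stands in for shared storage; in a multi-worker deployment you would need a location every worker can reach, such as an object store.

```python
import json
import tempfile
from pathlib import Path

def write_batch(records, directory, name):
    """Write records as JSON Lines and return the file path as a string."""
    path = Path(directory) / f"{name}.jsonl"
    with path.open("w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")
    return str(path)  # a short string is cheap to pass between tasks

def read_batch(path):
    """Read records back from a JSON Lines file."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]

with tempfile.TemporaryDirectory() as tmp:
    handoff = write_batch([{"id": 1}, {"id": 2}], tmp, "customers_2024-01-01")
    restored = read_batch(handoff)
```

Including the run date in the file name keeps reruns of the same day from colliding with other days' data.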

Monitoring and Observability

Pipelines fail silently without proper monitoring. Track three categories: operational metrics (did it run?), performance metrics (how long?), and data quality metrics (is the output correct?).

import structlog
import time
from dataclasses import dataclass
from typing import Optional

logger = structlog.get_logger()

@dataclass
class PipelineMetrics:
    records_extracted: int = 0
    records_transformed: int = 0
    records_loaded: int = 0
    records_failed: int = 0
    duration_seconds: float = 0
    
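def run_with_metrics(pipeline_name: str, func, *args, **kwargs):
    """Run one pipeline step, logging its duration and outcome."""
    # perf_counter is monotonic, so durations are unaffected by clock adjustments
    start = time.perf_counter()

    try:
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start

        logger.info(
            "pipeline_step_completed",
            pipeline=pipeline_name,
            duration_seconds=duration,
            result=result
        )
        return result

    except Exception as e:
        logger.error(
            "pipeline_step_failed",
            pipeline=pipeline_name,
            error=str(e),
            error_type=type(e).__name__,
            duration_seconds=time.perf_counter() - start
        )
        raise  # re-raise so the orchestrator still sees the failure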
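
The wrapper above covers the operational and performance categories. For data quality, a simple starting point is a set of threshold checks on each batch before it is loaded. The following is a minimal sketch; check_quality, its thresholds, and the sample batch are hypothetical, and dedicated data quality tools offer far richer checks.

```python
from typing import Any, Dict, List

def check_quality(records: List[Dict[str, Any]], required: List[str],
                  min_rows: int = 1, max_null_rate: float = 0.05) -> List[str]:
    """Return human-readable problems; an empty list means the batch passed."""
    problems = []
    if len(records) < min_rows:
        problems.append(f"row count {len(records)} is below minimum {min_rows}")
    if not records:
        return problems  # null rates are undefined for an empty batch

    for field in required:
        nulls = sum(1 for r in records if r.get(field) is None)
        rate = nulls / len(records)
        if rate > max_null_rate:
            problems.append(f"{field}: null rate {rate:.0%} exceeds {max_null_rate:.0%}")
    return problems

batch = [
    {"email": "a@example.com", "country": "DE"},
    {"email": "b@example.com", "country": None},
    {"email": None, "country": None},
    {"email": "d@example.com", "country": "FR"},
]
issues = check_quality(batch, required=["email", "country"], max_null_rate=0.25)
```

Returning a list of problems rather than raising on the first one lets the caller decide whether to fail the run, alert, or just log.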

Scaling ETL in Production

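Start simple. A Python script running on a single machine handles more data than most teams realize; gigabytes per hour is achievable with proper batching. Reach for Spark or another distributed system only when a single machine can no longer finish the job in the time you have, typically because the working set for joins or aggregations no longer fits in memory even when processed in batches.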

When you do need to scale, partition your data by time or key ranges and process partitions in parallel. dbt has become the standard for SQL-based transformations in the warehouse, letting you define transforms as SELECT statements while handling dependency resolution automatically.
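
Partitioning by time can be sketched in a few lines with the standard library. The example below splits a date range into one-day windows and processes them concurrently. process_partition is a placeholder for a real extract-transform-load step, and the dates are arbitrary. Threads suit I/O-bound work such as API calls and database queries; for CPU-bound transforms a ProcessPoolExecutor is usually the better fit.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta

def daily_partitions(start: date, end: date):
    """Split [start, end) into one-day (from, to) windows."""
    day = start
    while day < end:
        yield day, day + timedelta(days=1)
        day += timedelta(days=1)

def process_partition(window):
    """Placeholder for the extract-transform-load of one window."""
    window_start, window_end = window
    return f"{window_start.isoformat()}..{window_end.isoformat()}"

partitions = list(daily_partitions(date(2024, 1, 1), date(2024, 1, 4)))

with ThreadPoolExecutor(max_workers=3) as pool:
    # map() returns results in input order even though the work runs concurrently
    processed = list(pool.map(process_partition, partitions))
```

This only works when partitions are independent of each other, which is another reason to make each load idempotent: a failed partition can be retried on its own.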

Cost optimization matters at scale. Process data during off-peak hours, compress intermediate storage, and delete staging data aggressively. The biggest cost savings usually come from running less frequently—does that pipeline really need to run every hour, or would daily suffice?
