Slowly Changing Dimensions (SCD) with Spark

Slowly Changing Dimensions (SCDs) are a fundamental pattern in data warehousing that addresses a simple but critical question: what happens when your reference data changes over time?

Key Insights

  • SCD Type 2 is the most common pattern for production data warehouses, but its complexity requires careful handling of effective dates, end dates, and current flags to avoid gaps or overlaps in your dimension history.
  • Delta Lake’s MERGE operation simplifies SCD implementations significantly, but you still need to structure your merge conditions correctly—especially the combination of matched updates and conditional inserts for Type 2.
  • Performance optimization matters more than you think: partition your dimension tables by a meaningful key, use broadcast joins for smaller dimensions, and always process incrementally rather than scanning full tables.

Introduction to Slowly Changing Dimensions

Consider a customer dimension table. A customer moves to a new city. Do you overwrite their old address? Keep both versions? Track when the change happened? The answer depends on your analytical requirements, and SCDs provide the framework for making that decision systematically.

SCDs matter because business analysis often requires historical context. When your sales team asks “what was our revenue by customer region last quarter?”, they typically want the regions as they existed then, not as they exist now. Without proper SCD handling, you’ll report inaccurate historical data.
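That "as it existed then" requirement is exactly what a Type 2 dimension answers: each fact is matched to the dimension version whose date range contains the fact date. Stripped of Spark, the lookup is just a range check. A minimal pure-Python sketch (field names mirror the Type 2 schema used later; the sample data is illustrative):

```python
from datetime import date

# Each dict is one version of a customer row in a Type 2 dimension.
# end_date=None marks the current (open-ended) version.
customer_versions = [
    {"customer_id": 1, "region": "West",
     "effective_date": date(2023, 1, 1), "end_date": date(2024, 6, 1)},
    {"customer_id": 1, "region": "Northwest",
     "effective_date": date(2024, 6, 1), "end_date": None},
]

def as_of(versions, fact_date):
    """Return the dimension version that was in effect on fact_date."""
    for v in versions:
        if v["effective_date"] <= fact_date and (
            v["end_date"] is None or fact_date < v["end_date"]
        ):
            return v
    return None  # no version covers this date

# A sale from last spring resolves to the region at that time, not today's.
print(as_of(customer_versions, date(2024, 3, 15))["region"])  # West
print(as_of(customer_versions, date(2024, 9, 1))["region"])   # Northwest
```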

The three primary SCD types are:

  • Type 1: Overwrite the old value. No history preserved.
  • Type 2: Add a new row with versioning metadata. Full history preserved.
  • Type 3: Add a new column for the previous value. Limited history (usually just one prior version).
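To make the trade-offs concrete, here is the same change, a customer moving from Seattle to Portland, represented under each type (sketched as plain Python dicts; the dates are illustrative):

```python
# Type 1: the move overwrites Seattle; no trace of the old value remains.
type1 = {"customer_id": 1, "city": "Portland"}

# Type 2: the old row is closed out and a new versioned row is appended.
type2 = [
    {"customer_id": 1, "city": "Seattle", "effective_date": "2023-01-01",
     "end_date": "2024-06-01", "is_current": False},
    {"customer_id": 1, "city": "Portland", "effective_date": "2024-06-01",
     "end_date": None, "is_current": True},
]

# Type 3: one extra column holds exactly one prior value.
type3 = {"customer_id": 1, "current_city": "Portland",
         "previous_city": "Seattle", "city_changed_date": "2024-06-01"}
```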

Let’s implement each with Spark.

SCD Types Deep Dive

Before writing code, you need to understand when each type applies.

Type 1 is appropriate when historical accuracy doesn’t matter or when you’re correcting data errors. If a customer’s name was misspelled, you don’t need to preserve the typo.

Type 2 is the workhorse of production data warehouses. Use it when you need accurate historical reporting. Customer addresses, product prices, employee departments—anything where the old value matters for past transactions.

Type 3 is rarely used in practice. It’s a compromise that adds complexity without providing full history. I generally recommend avoiding it unless you have a specific requirement for exactly one prior value.

Here are the schema definitions for each type:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, BooleanType

# SCD Type 1: Simple current state
scd_type1_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("customer_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("updated_at", DateType(), True)
])

# SCD Type 2: Full history with versioning
scd_type2_schema = StructType([
    StructField("customer_sk", IntegerType(), False),  # Surrogate key
    StructField("customer_id", IntegerType(), False),  # Natural/business key
    StructField("customer_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("effective_date", DateType(), False),
    StructField("end_date", DateType(), True),
    StructField("is_current", BooleanType(), False)
])

# SCD Type 3: Previous value column
scd_type3_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("customer_name", StringType(), True),
    StructField("current_city", StringType(), True),
    StructField("previous_city", StringType(), True),
    StructField("city_changed_date", DateType(), True)
])

Implementing SCD Type 1 with Spark

Type 1 is straightforward: merge incoming records with existing ones, overwriting any matches.

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date

spark = SparkSession.builder \
    .appName("SCD Type 1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

def apply_scd_type1(spark, target_path: str, source_df):
    """
    Apply SCD Type 1 logic: upsert with overwrite.
    """
    source_with_timestamp = source_df.withColumn("updated_at", current_date())
    
    if DeltaTable.isDeltaTable(spark, target_path):
        target_table = DeltaTable.forPath(spark, target_path)
        
        target_table.alias("target").merge(
            source_with_timestamp.alias("source"),
            "target.customer_id = source.customer_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        # First load: write directly
        source_with_timestamp.write.format("delta").save(target_path)

# Usage
incoming_customers = spark.createDataFrame([
    (1, "John Smith", "Seattle", "WA"),
    (2, "Jane Doe", "Portland", "OR"),
], ["customer_id", "customer_name", "city", "state"])

apply_scd_type1(spark, "/data/dim_customer", incoming_customers)

The whenMatchedUpdateAll() handles existing records, while whenNotMatchedInsertAll() handles new ones. Simple and effective.

Implementing SCD Type 2 with Spark

Type 2 is where things get interesting. You need to handle three scenarios:

  1. New records: Insert with is_current = True and end_date = NULL
  2. Changed records: Close out the old version (set end_date and is_current = False), then insert a new version
  3. Unchanged records: Do nothing

Here’s a complete implementation:

from pyspark.sql.functions import col, lit, current_date, monotonically_increasing_id, max as spark_max
from delta.tables import DeltaTable

def apply_scd_type2(spark, target_path: str, source_df, key_columns: list, tracked_columns: list):
    """
    Apply SCD Type 2 logic with full history tracking.
    
    Args:
        target_path: Path to Delta table
        source_df: Incoming data
        key_columns: Business key columns (e.g., ["customer_id"])
        tracked_columns: Columns to track for changes
    """
    process_date = current_date()
    
    # Prepare source with SCD metadata
    staged_source = source_df \
        .withColumn("effective_date", process_date) \
        .withColumn("end_date", lit(None).cast("date")) \
        .withColumn("is_current", lit(True))
    
    if not DeltaTable.isDeltaTable(spark, target_path):
        # Generate surrogate keys for initial load
        staged_source = staged_source.withColumn(
            "customer_sk", 
            monotonically_increasing_id()
        )
        staged_source.write.format("delta").save(target_path)
        return
    
    target_table = DeltaTable.forPath(spark, target_path)
    target_df = spark.read.format("delta").load(target_path)
    
    # Build the join condition on business keys
    key_condition = " AND ".join([
        f"target.{k} = source.{k}" for k in key_columns
    ])
    
    # Build change detection condition using Spark's null-safe equality
    # operator (<=>), which treats NULL = NULL as a match
    change_condition = " OR ".join([
        f"NOT (target.{c} <=> source.{c})" for c in tracked_columns
    ])
    
    # Get max surrogate key for new key generation
    max_sk = target_df.agg(spark_max("customer_sk")).collect()[0][0] or 0
    
    # Identify records that have changed
    current_target = target_df.filter(col("is_current") == True)
    
    changed_records = staged_source.alias("source").join(
        current_target.alias("target"),
        [col(f"source.{k}") == col(f"target.{k}") for k in key_columns],
        "inner"
    ).filter(change_condition).select("source.*")
    
    new_records = staged_source.alias("source").join(
        current_target.alias("target"),
        [col(f"source.{k}") == col(f"target.{k}") for k in key_columns],
        "left_anti"
    )
    
    # Combine new and changed records, assign surrogate keys.
    # Note: monotonically_increasing_id() produces unique but non-contiguous
    # values, so surrogate keys are unique, not sequential.
    records_to_insert = changed_records.union(new_records).withColumn(
        "customer_sk",
        monotonically_increasing_id() + lit(max_sk + 1)
    )
    
    # Materialize before the merge: these DataFrames are lazy, and the merge
    # below flips is_current on the very rows they were derived from. Without
    # caching, re-evaluation after the merge would find no changed records.
    records_to_insert = records_to_insert.cache()
    insert_count = records_to_insert.count()
    
    # Close out the old versions of changed records
    target_table.alias("target").merge(
        staged_source.alias("source"),
        f"{key_condition} AND target.is_current = true"
    ).whenMatchedUpdate(
        condition=change_condition,
        set={
            "end_date": process_date,
            "is_current": lit(False)
        }
    ).execute()
    
    # Insert new versions
    if insert_count > 0:
        records_to_insert.write.format("delta").mode("append").save(target_path)

# Usage
from pyspark.sql.functions import monotonically_increasing_id

apply_scd_type2(
    spark,
    "/data/dim_customer_scd2",
    incoming_customers,
    key_columns=["customer_id"],
    tracked_columns=["customer_name", "city", "state"]
)

Performance Optimization Strategies

SCD operations can become bottlenecks as your dimension tables grow. Here are the optimizations that actually matter:

Partition by a date-based column if your dimensions are large. For Type 2, partitioning by effective_date or a derived year-month column enables partition pruning during merges.

# Optimized write with partitioning. Optimized writes are enabled via a
# session config rather than a per-write option.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

staged_source.write \
    .format("delta") \
    .partitionBy("effective_year_month") \
    .save(target_path)

Use broadcast joins for dimension tables under 100MB. This avoids expensive shuffle operations:

from pyspark.sql.functions import broadcast

# Broadcast the smaller dimension table
result = fact_table.join(
    broadcast(dim_customer.filter(col("is_current") == True)),
    "customer_id"
)

Process incrementally. Never scan your entire dimension table when you can filter to relevant partitions:

from pyspark.sql.functions import col, current_date, date_sub

# Only process records that could have changed
recent_changes = source_df.filter(
    col("last_modified_date") >= date_sub(current_date(), 7)
)

Testing and Data Quality

SCD Type 2 tables have specific invariants that must hold:

  1. No gaps in date ranges for a given business key
  2. No overlapping date ranges
  3. Exactly one current record per business key

Here’s how to validate these:

from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

def validate_scd_type2(spark, table_path: str, key_columns: list):
    """Validate SCD Type 2 data quality invariants."""
    df = spark.read.format("delta").load(table_path)
    
    # Check 1: Exactly one current record per business key
    current_counts = df.filter(col("is_current") == True) \
        .groupBy(key_columns) \
        .count() \
        .filter(col("count") != 1)
    
    bad_keys = current_counts.count()
    if bad_keys > 0:
        raise AssertionError(f"Found {bad_keys} keys with != 1 current record")
    
    # Checks 2 and 3: No overlapping and no gapped date ranges. This assumes
    # the convention that a closed row's end_date equals the successor's
    # effective_date.
    window = Window.partitionBy(key_columns).orderBy("effective_date")
    with_prev = df.withColumn("prev_end_date", lag("end_date").over(window))
    
    overlaps = with_prev.filter(col("effective_date") < col("prev_end_date"))
    if overlaps.count() > 0:
        raise AssertionError(f"Found {overlaps.count()} overlapping date ranges")
    
    gaps = with_prev.filter(col("effective_date") > col("prev_end_date"))
    if gaps.count() > 0:
        raise AssertionError(f"Found {gaps.count()} gaps in date ranges")
    
    print("All SCD Type 2 validations passed")

validate_scd_type2(spark, "/data/dim_customer_scd2", ["customer_id"])

Production Considerations

Scheduling: Run SCD jobs after your source systems complete their daily processing. Use Airflow or Dagster sensors to wait for upstream data availability rather than fixed schedules.
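Whatever the orchestrator, the sensor pattern reduces to polling a readiness predicate with a timeout before triggering the SCD job. A library-agnostic sketch (the `source_ready` callable is hypothetical; in Airflow this role is played by a sensor operator):

```python
import time

def wait_for_upstream(source_ready, poke_interval=60, timeout=3600,
                      clock=time.monotonic, sleep=time.sleep):
    """Poll source_ready() until it returns True or the timeout expires."""
    deadline = clock() + timeout
    while clock() < deadline:
        if source_ready():
            return True  # upstream data is available; run the SCD job
        sleep(poke_interval)
    return False  # timed out; alert instead of running against stale data

# Example: pretend the source reports complete on the second poll.
polls = iter([False, True])
print(wait_for_upstream(lambda: next(polls), poke_interval=0,
                        sleep=lambda s: None))  # True
```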

Late-arriving dimensions: When dimension changes arrive after related facts, you have two options: reprocess affected facts (expensive but accurate) or accept some historical inaccuracy. Most organizations choose the latter for practical reasons.

Monitoring: Track these metrics for your SCD jobs:

  • Number of new records inserted
  • Number of records closed out (Type 2)
  • Processing time trends
  • Data quality check results

Set up alerts when these metrics deviate significantly from historical baselines—it usually indicates either a source data issue or a bug in your transformation logic.
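The deviation check itself can be as simple as a z-score against recent runs. A minimal sketch (the threshold and baseline window are illustrative, not recommendations):

```python
from statistics import mean, stdev

def is_anomalous(history, latest, z_threshold=3.0):
    """Flag latest if it deviates more than z_threshold std devs from history."""
    if len(history) < 2:
        return False  # not enough baseline to judge
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return latest != mu
    return abs(latest - mu) / sigma > z_threshold

# Rows-closed-out counts from the last week of Type 2 runs:
baseline = [120, 135, 128, 119, 131, 140, 125]
print(is_anomalous(baseline, 130))  # False: in line with the baseline
print(is_anomalous(baseline, 900))  # True: likely a source issue or a bug
```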

SCDs aren’t glamorous, but they’re essential infrastructure for any serious analytics platform. Get them right once, and your downstream consumers will thank you every time they need accurate historical reporting.
