# Slowly Changing Dimensions (SCD) with Spark
Slowly Changing Dimensions (SCDs) are a fundamental pattern in data warehousing that addresses a simple but critical question: what happens when your reference data changes over time?
## Key Insights
- SCD Type 2 is the most common pattern for production data warehouses, but its complexity requires careful handling of effective dates, end dates, and current flags to avoid gaps or overlaps in your dimension history.
- Delta Lake’s MERGE operation simplifies SCD implementations significantly, but you still need to structure your merge conditions correctly—especially the combination of matched updates and conditional inserts for Type 2.
- Performance optimization matters more than you think: partition your dimension tables by a meaningful key, use broadcast joins for smaller dimensions, and always process incrementally rather than scanning full tables.
## Introduction to Slowly Changing Dimensions
Consider a customer dimension table. A customer moves to a new city. Do you overwrite their old address? Keep both versions? Track when the change happened? The answer depends on your analytical requirements, and SCDs provide the framework for making that decision systematically.
SCDs matter because business analysis often requires historical context. When your sales team asks “what was our revenue by customer region last quarter?”, they typically want the regions as they existed then, not as they exist now. Without proper SCD handling, you’ll report inaccurate historical data.
The three primary SCD types are:
- Type 1: Overwrite the old value. No history preserved.
- Type 2: Add a new row with versioning metadata. Full history preserved.
- Type 3: Add a new column for the previous value. Limited history (usually just one prior version).
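To make the contrast concrete, here is a toy plain-Python illustration (no Spark required, dates invented for the example) of what each type stores after a customer moves from Seattle to Portland:

```python
from datetime import date

# Current dimension row before the change
row = {"customer_id": 1, "city": "Seattle"}
change = {"city": "Portland", "changed_on": date(2024, 6, 1)}

# Type 1: overwrite in place -- the old city is gone
type1 = {**row, "city": change["city"]}

# Type 2: close out the old row, add a new versioned row
type2 = [
    {**row, "effective_date": date(2024, 1, 1),
     "end_date": change["changed_on"], "is_current": False},
    {**row, "city": change["city"], "effective_date": change["changed_on"],
     "end_date": None, "is_current": True},
]

# Type 3: keep exactly one prior value in a dedicated column
type3 = {**row, "current_city": change["city"], "previous_city": row["city"],
         "city_changed_date": change["changed_on"]}
del type3["city"]
```

Type 1 forgets Seattle entirely, Type 2 keeps both rows with validity windows, and Type 3 remembers exactly one prior city.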
Let’s implement each with Spark.
## SCD Types Deep Dive
Before writing code, you need to understand when each type applies.
Type 1 is appropriate when historical accuracy doesn’t matter or when you’re correcting data errors. If a customer’s name was misspelled, you don’t need to preserve the typo.
Type 2 is the workhorse of production data warehouses. Use it when you need accurate historical reporting. Customer addresses, product prices, employee departments—anything where the old value matters for past transactions.
Type 3 is rarely used in practice. It’s a compromise that adds complexity without providing full history. I generally recommend avoiding it unless you have a specific requirement for exactly one prior value.
Here are the schema definitions for each type:
```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, BooleanType

# SCD Type 1: Simple current state
scd_type1_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("customer_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("updated_at", DateType(), True)
])

# SCD Type 2: Full history with versioning
scd_type2_schema = StructType([
    StructField("customer_sk", IntegerType(), False),   # Surrogate key
    StructField("customer_id", IntegerType(), False),   # Natural/business key
    StructField("customer_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("effective_date", DateType(), False),
    StructField("end_date", DateType(), True),
    StructField("is_current", BooleanType(), False)
])

# SCD Type 3: Previous value column
scd_type3_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("customer_name", StringType(), True),
    StructField("current_city", StringType(), True),
    StructField("previous_city", StringType(), True),
    StructField("city_changed_date", DateType(), True)
])
```
## Implementing SCD Type 1 with Spark
Type 1 is straightforward: merge incoming records with existing ones, overwriting any matches.
```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date

spark = SparkSession.builder \
    .appName("SCD Type 1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

def apply_scd_type1(spark, target_path: str, source_df):
    """
    Apply SCD Type 1 logic: upsert with overwrite.
    """
    source_with_timestamp = source_df.withColumn("updated_at", current_date())

    if DeltaTable.isDeltaTable(spark, target_path):
        target_table = DeltaTable.forPath(spark, target_path)
        target_table.alias("target").merge(
            source_with_timestamp.alias("source"),
            "target.customer_id = source.customer_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        # First load: write directly
        source_with_timestamp.write.format("delta").save(target_path)

# Usage
incoming_customers = spark.createDataFrame([
    (1, "John Smith", "Seattle", "WA"),
    (2, "Jane Doe", "Portland", "OR"),
], ["customer_id", "customer_name", "city", "state"])

apply_scd_type1(spark, "/data/dim_customer", incoming_customers)
```
The `whenMatchedUpdateAll()` clause handles existing records, while `whenNotMatchedInsertAll()` handles new ones. Simple and effective.
## Implementing SCD Type 2 with Spark
Type 2 is where things get interesting. You need to handle three scenarios:
- New records: insert with `is_current = True` and `end_date = NULL`
- Changed records: close out the old version (set `end_date` and `is_current = False`), then insert a new version
- Unchanged records: do nothing
Here’s a complete implementation:
```python
from pyspark.sql.functions import col, lit, current_date, monotonically_increasing_id, max as spark_max
from delta.tables import DeltaTable

def apply_scd_type2(spark, target_path: str, source_df, key_columns: list, tracked_columns: list):
    """
    Apply SCD Type 2 logic with full history tracking.

    Args:
        target_path: Path to Delta table
        source_df: Incoming data
        key_columns: Business key columns (e.g., ["customer_id"])
        tracked_columns: Columns to track for changes
    """
    process_date = current_date()

    # Prepare source with SCD metadata
    staged_source = source_df \
        .withColumn("effective_date", process_date) \
        .withColumn("end_date", lit(None).cast("date")) \
        .withColumn("is_current", lit(True))

    if not DeltaTable.isDeltaTable(spark, target_path):
        # Generate surrogate keys for initial load
        staged_source = staged_source.withColumn(
            "customer_sk",
            monotonically_increasing_id()
        )
        staged_source.write.format("delta").save(target_path)
        return

    target_table = DeltaTable.forPath(spark, target_path)
    target_df = spark.read.format("delta").load(target_path)

    # Build the join condition on business keys
    key_condition = " AND ".join(
        f"target.{k} = source.{k}" for k in key_columns
    )

    # Build change detection condition; <=> is Spark SQL's null-safe
    # equality operator, so NULL-to-value transitions are caught too
    change_condition = " OR ".join(
        f"NOT (target.{c} <=> source.{c})" for c in tracked_columns
    )

    # Get max surrogate key for new key generation
    max_sk = target_df.agg(spark_max("customer_sk")).collect()[0][0] or 0

    # Identify records that have changed
    current_target = target_df.filter(col("is_current") == True)
    join_keys = [col(f"source.{k}") == col(f"target.{k}") for k in key_columns]

    changed_records = staged_source.alias("source").join(
        current_target.alias("target"), join_keys, "inner"
    ).filter(change_condition).select("source.*")

    new_records = staged_source.alias("source").join(
        current_target.alias("target"), join_keys, "left_anti"
    )

    # Combine new and changed records, assign surrogate keys
    # (monotonically_increasing_id gives unique, not consecutive, values)
    records_to_insert = changed_records.union(new_records).withColumn(
        "customer_sk",
        monotonically_increasing_id() + lit(max_sk + 1)
    ).cache()

    # Materialize the insert set BEFORE the merge rewrites the table's
    # files; otherwise the lazy plan could re-read post-merge data
    insert_count = records_to_insert.count()

    # Close out the old versions of changed records
    target_table.alias("target").merge(
        staged_source.alias("source"),
        f"{key_condition} AND target.is_current = true"
    ).whenMatchedUpdate(
        condition=change_condition,
        set={
            "end_date": process_date,
            "is_current": lit(False)
        }
    ).execute()

    # Insert new versions
    if insert_count > 0:
        records_to_insert.write.format("delta").mode("append").save(target_path)
    records_to_insert.unpersist()

# Usage
apply_scd_type2(
    spark,
    "/data/dim_customer_scd2",
    incoming_customers,
    key_columns=["customer_id"],
    tracked_columns=["customer_name", "city", "state"]
)
```
## Performance Optimization Strategies
SCD operations can become bottlenecks as your dimension tables grow. Here are the optimizations that actually matter:
Partition by a date-based column if your dimensions are large. For Type 2, partitioning by effective_date or a derived year-month column enables partition pruning during merges.
```python
from pyspark.sql.functions import col, date_format

# Derive the year-month partition column, then write with partitioning
partitioned_source = staged_source.withColumn(
    "effective_year_month",
    date_format(col("effective_date"), "yyyy-MM")
)
partitioned_source.write \
    .format("delta") \
    .partitionBy("effective_year_month") \
    .option("optimizeWrite", "true") \
    .save(target_path)
```
Use broadcast joins for small dimension tables. Spark broadcasts automatically below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default); the explicit `broadcast()` hint forces it for larger tables that still fit comfortably in executor memory, avoiding an expensive shuffle:
```python
from pyspark.sql.functions import broadcast, col

# Broadcast the smaller dimension table
result = fact_table.join(
    broadcast(dim_customer.filter(col("is_current") == True)),
    "customer_id"
)
```
Process incrementally. Never scan your entire dimension table when you can filter to relevant partitions:
```python
from pyspark.sql.functions import col, current_date, date_sub

# Only process records that could have changed
recent_changes = source_df.filter(
    col("last_modified_date") >= date_sub(current_date(), 7)
)
```
## Testing and Data Quality
SCD Type 2 tables have specific invariants that must hold:
- No gaps in date ranges for a given business key
- No overlapping date ranges
- Exactly one current record per business key
Here’s how to validate these:
```python
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

def validate_scd_type2(spark, table_path: str, key_columns: list):
    """Validate SCD Type 2 data quality invariants."""
    df = spark.read.format("delta").load(table_path)

    # Check 1: Exactly one current record per business key
    current_counts = df.filter(col("is_current") == True) \
        .groupBy(key_columns) \
        .count() \
        .filter(col("count") != 1)
    if current_counts.count() > 0:
        raise AssertionError(f"Found {current_counts.count()} keys with != 1 current record")

    # Check 2: No overlapping date ranges
    window = Window.partitionBy(key_columns).orderBy("effective_date")
    overlaps = df.withColumn("prev_end_date", lag("end_date").over(window)) \
        .filter(col("effective_date") < col("prev_end_date"))
    if overlaps.count() > 0:
        raise AssertionError(f"Found {overlaps.count()} overlapping date ranges")

    print("All SCD Type 2 validations passed")

validate_scd_type2(spark, "/data/dim_customer_scd2", ["customer_id"])
```
## Production Considerations
**Scheduling:** Run SCD jobs after your source systems complete their daily processing. Use Airflow or Dagster sensors to wait for upstream data availability rather than fixed schedules.
**Late-arriving dimensions:** When dimension changes arrive after related facts, you have two options: reprocess affected facts (expensive but accurate) or accept some historical inaccuracy. Most organizations choose the latter for practical reasons.
**Monitoring:** Track these metrics for your SCD jobs:
- Number of new records inserted
- Number of records closed out (Type 2)
- Processing time trends
- Data quality check results
Set up alerts when these metrics deviate significantly from historical baselines—it usually indicates either a source data issue or a bug in your transformation logic.
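The baseline comparison itself can be trivially simple. A minimal sketch in plain Python — the `deviates` helper and its z-score threshold are illustrative assumptions, not a standard API:

```python
from statistics import mean, stdev

def deviates(current: float, history: list, threshold: float = 3.0) -> bool:
    """True if the current metric is more than `threshold` standard
    deviations from the mean of recent runs (a simple z-score alert)."""
    if len(history) < 2:
        return False  # not enough baseline yet
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return current != mu
    return abs(current - mu) / sigma > threshold

# e.g. rows inserted by the last seven SCD runs vs. today's run
baseline = [1020, 980, 1005, 995, 1010, 990, 1000]
print(deviates(1003, baseline))    # False -- normal run
print(deviates(25_000, baseline))  # True -- likely a source-data issue
```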
SCDs aren’t glamorous, but they’re essential infrastructure for any serious analytics platform. Get them right once, and your downstream consumers will thank you every time they need accurate historical reporting.