Change Data Capture (CDC) with Spark
Key Insights
- Log-based CDC with Debezium provides the most reliable change capture mechanism for Spark pipelines, avoiding the performance overhead and consistency issues of query-based approaches.
- Delta Lake’s MERGE INTO operation transforms complex CDC processing into a single atomic statement that handles inserts, updates, and deletes while maintaining ACID guarantees.
- Production CDC pipelines require careful attention to event ordering, schema evolution, and idempotency—problems that compound quickly at scale if not addressed early.
Introduction to CDC
Change Data Capture tracks and propagates data modifications from source systems in near real-time. Instead of periodic batch extracts that miss intermediate states, CDC captures every insert, update, and delete as it happens.
Three primary use cases drive CDC adoption. First, data synchronization keeps operational databases in sync with analytical systems without expensive full table scans. Second, audit trails maintain complete modification history for compliance and debugging. Third, event-driven architectures react to data changes immediately, enabling real-time analytics and downstream processing.
Spark enters this picture as the processing engine that consumes, transforms, and materializes CDC events at scale. With Structured Streaming, Spark handles continuous CDC ingestion with checkpointed, replayable progress tracking, which yields end-to-end exactly-once results when the sink is idempotent or transactional, and it integrates with the broader data lakehouse ecosystem.
CDC Patterns and Approaches
Three fundamental approaches exist for capturing database changes, each with distinct tradeoffs.
Query-based CDC periodically scans source tables for modifications using timestamp columns or checksums. It’s simple to implement but misses intermediate changes between scans, creates load on source systems, and can’t reliably detect deletes without soft-delete patterns.
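The blind spots of polling are easy to reproduce without a database. The following is a minimal sketch, not a real connector: the in-memory table, the Row type, and poll_changes are hypothetical stand-ins for a source table with an updated_at column and a timestamp-based extract query.

```python
from dataclasses import dataclass


@dataclass
class Row:
    id: int
    email: str
    updated_at: int  # hypothetical last-modified column (epoch seconds)


def poll_changes(table, last_seen):
    """Query-based capture: return rows modified after the previous poll."""
    return [row for row in table.values() if row.updated_at > last_seen]


# Source table state when the first poll runs at t=100.
table = {1: Row(1, "a@v1.com", 90), 2: Row(2, "b@v1.com", 95)}
first_poll = poll_changes(table, last_seen=0)

# Between polls, row 1 is updated twice and row 2 is deleted.
table[1] = Row(1, "a@v2.com", 110)
table[1] = Row(1, "a@v3.com", 120)
del table[2]

second_poll = poll_changes(table, last_seen=100)
print(second_poll)
```

The second poll returns only the final state of row 1. The intermediate a@v2.com value and the deletion of row 2 never reach the consumer, which is exactly the gap log-based capture closes.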
Trigger-based CDC uses database triggers to write changes to shadow tables. This captures all operations but adds overhead to every write transaction, couples application logic to capture logic, and complicates schema evolution.
Log-based CDC reads the database’s transaction log directly. Every committed change appears in the log, making capture complete and non-invasive. The source database experiences minimal overhead since the log already exists for replication and recovery.
For Spark workloads, log-based CDC with Debezium is the clear winner. Debezium connectors read transaction logs from PostgreSQL, MySQL, MongoDB, and other databases, then publish standardized change events to Kafka. This decouples capture from processing and provides a durable event stream that Spark can consume at its own pace.
A typical Debezium CDC record contains the operation context and both before and after states:
{
  "before": {
    "id": 1001,
    "name": "Alice",
    "email": "alice@old.com",
    "updated_at": 1699900000000
  },
  "after": {
    "id": 1001,
    "name": "Alice",
    "email": "alice@new.com",
    "updated_at": 1699900500000
  },
  "source": {
    "version": "2.4.0",
    "connector": "postgresql",
    "ts_ms": 1699900500123,
    "db": "customers",
    "table": "users",
    "txId": 58923,
    "lsn": 234881024
  },
  "op": "u",
  "ts_ms": 1699900500456
}
The op field indicates operation type: c for create, u for update, d for delete, and r for read (initial snapshot). The before state is null for inserts; after is null for deletes.
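The op rules above can be sketched as a small plain-Python function. This is an illustration under stated assumptions rather than part of Debezium or Spark: resolve_event is a hypothetical helper, the envelope is assumed to be already decoded into a dict, and only the four op codes listed above are handled.

```python
def resolve_event(envelope):
    """Map a decoded change envelope to (action, row).

    Inserts, snapshot reads, and updates carry the current row in `after`;
    deletes carry the last known row in `before`.
    """
    op = envelope["op"]
    if op in ("c", "r", "u"):
        return "upsert", envelope["after"]
    if op == "d":
        return "delete", envelope["before"]
    # Any other op code is outside what this sketch covers.
    raise ValueError(f"unexpected op: {op!r}")


update = {
    "op": "u",
    "before": {"id": 1001, "email": "alice@old.com"},
    "after": {"id": 1001, "email": "alice@new.com"},
}
delete = {"op": "d", "before": {"id": 1001, "email": "alice@new.com"}, "after": None}

print(resolve_event(update))
print(resolve_event(delete))
```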
Reading CDC Data into Spark
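Spark Structured Streaming reads Kafka topics containing CDC events through its built-in Kafka source. The key configuration decisions involve schema handling and deserialization format. The example below assumes Debezium's JSON converter runs with schemas.enable=false, so each message value is the bare change envelope; with schemas enabled, the envelope is nested under a payload field and the schema must account for that.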
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType
)

spark = SparkSession.builder \
    .appName("CDC-Processor") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define schema for the payload (adjust to match your table structure)
payload_schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("updated_at", LongType(), True)
])

# Full Debezium envelope schema
debezium_schema = StructType([
    StructField("before", payload_schema, True),
    StructField("after", payload_schema, True),
    StructField("op", StringType(), False),
    StructField("ts_ms", LongType(), False)
])

# Read from Kafka
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "dbserver1.public.users") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 100000) \
    .load()

# Parse the Debezium JSON payload
parsed_stream = cdc_stream \
    .select(
        col("key").cast("string").alias("kafka_key"),
        col("offset").alias("kafka_offset"),
        col("partition").alias("kafka_partition"),
        from_json(col("value").cast("string"), debezium_schema).alias("data")
    ) \
    .filter(col("data.op").isNotNull()) \
    .select(
        "kafka_key",
        "kafka_offset",
        "kafka_partition",
        "data.op",
        "data.ts_ms",
        "data.before",
        "data.after"
    )

# The filter above skips tombstones (null values Debezium emits after
# deletes) and messages that failed to parse, since both yield a null op.
For production systems using Avro with Schema Registry, replace from_json with from_avro (in pyspark.sql.avro.functions, shipped in the external spark-avro module) and supply the writer schema fetched from the registry as a JSON string. Avro offers stronger schema evolution support and smaller payloads than JSON.
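One practical wrinkle when the topic is written by a Confluent Schema Registry serializer: each message value starts with a 5-byte prefix (a zero magic byte followed by a 4-byte big-endian schema ID) ahead of the Avro body, and open-source from_avro expects the Avro body alone. The sketch below shows the split in plain Python; split_confluent_header is a hypothetical helper name, and in a Spark job the same split would typically be done with a column expression before calling from_avro.

```python
import struct


def split_confluent_header(value: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(value) < 5:
        raise ValueError("message too short to carry the 5-byte header")
    # ">bI": big-endian signed byte (magic) + unsigned 32-bit int (schema ID).
    magic, schema_id = struct.unpack(">bI", value[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, value[5:]


framed = b"\x00" + (42).to_bytes(4, "big") + b"avro-body-bytes"
schema_id, body = split_confluent_header(framed)
print(schema_id, body)
```

The schema ID identifies which registered schema wrote the record, which is what makes per-message schema lookup possible when the source table evolves.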
Processing CDC Operations
Raw CDC events need transformation before materialization. The processing logic must handle each operation type appropriately, extracting the relevant record state and preparing merge keys.
from pyspark.sql.functions import col, when, coalesce, current_timestamp

def process_cdc_events(df):
    """
    Transform CDC events into merge-ready format.
    Extract the relevant record state based on operation type.
    """
    return df \
        .withColumn(
            "record",
            # Deletes only carry the old row; everything else carries the new one
            when(col("op") == "d", col("before"))
            .otherwise(col("after"))
        ) \
        .withColumn("is_delete", col("op") == "d") \
        .withColumn(
            "merge_key",
            coalesce(col("after.id"), col("before.id"))
        ) \
        .withColumn("cdc_timestamp", col("ts_ms")) \
        .withColumn("processed_at", current_timestamp()) \
        .select(
            "merge_key",
            "record.id",
            "record.name",
            "record.email",
            "record.updated_at",
            "is_delete",
            "cdc_timestamp",
            "processed_at",
            "op"
        )

processed_stream = process_cdc_events(parsed_stream)
For complex transformations or business logic that doesn’t fit cleanly into DataFrame operations, custom UDFs provide flexibility:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

@udf(returnType=BooleanType())
def should_propagate_change(op, before_email, after_email):
    """
    Example business logic: only propagate changes that
    modify the email field, or all inserts/deletes.
    """
    if op in ('c', 'd', 'r'):
        return True
    return before_email != after_email

# Filter on the parsed envelope, where before/after are still available
# (process_cdc_events drops them), then flatten the surviving events.
filtered_stream = process_cdc_events(
    parsed_stream.filter(
        should_propagate_change(col("op"), col("before.email"), col("after.email"))
    )
)
Delta Lake Integration for CDC
Delta Lake transforms CDC processing with its MERGE INTO operation. Instead of manually tracking which records to insert, update, or delete, a single merge statement handles all cases atomically.
from delta.tables import DeltaTable
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

def upsert_to_delta(batch_df, batch_id):
    """
    Merge CDC batch into Delta table.
    Handles inserts, updates, and deletes in one atomic operation.
    """
    if batch_df.isEmpty():
        return

    target_path = "/data/lakehouse/users"

    # Deduplicate within batch - keep latest event per key.
    # MERGE fails if several source rows match the same target row.
    deduped_batch = batch_df \
        .withColumn(
            "row_num",
            row_number().over(
                Window.partitionBy("merge_key")
                .orderBy(col("cdc_timestamp").desc())
            )
        ) \
        .filter(col("row_num") == 1) \
        .drop("row_num")

    # Ensure target table exists; seed it with the deduplicated live rows
    # and only the columns the merge below maintains
    if not DeltaTable.isDeltaTable(spark, target_path):
        deduped_batch.filter(~col("is_delete")) \
            .select("id", "name", "email", "updated_at",
                    "cdc_timestamp", "processed_at") \
            .write \
            .format("delta") \
            .mode("overwrite") \
            .save(target_path)
        return

    target_table = DeltaTable.forPath(spark, target_path)

    target_table.alias("target").merge(
        deduped_batch.alias("source"),
        "target.id = source.merge_key"
    ).whenMatchedDelete(
        condition="source.is_delete = true"
    ).whenMatchedUpdate(
        condition="source.is_delete = false",
        set={
            "name": "source.name",
            "email": "source.email",
            "updated_at": "source.updated_at",
            "cdc_timestamp": "source.cdc_timestamp",
            "processed_at": "source.processed_at"
        }
    ).whenNotMatchedInsert(
        condition="source.is_delete = false",
        values={
            "id": "source.id",
            "name": "source.name",
            "email": "source.email",
            "updated_at": "source.updated_at",
            "cdc_timestamp": "source.cdc_timestamp",
            "processed_at": "source.processed_at"
        }
    ).execute()
# Execute streaming merge
query = processed_stream.writeStream \
    .foreachBatch(upsert_to_delta) \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/users-cdc") \
    .trigger(processingTime="30 seconds") \
    .start()
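Delta’s time travel capability provides a built-in audit trail of table versions. Query historical states with VERSION AS OF or TIMESTAMP AS OF syntax without maintaining separate audit tables, keeping in mind that history reaches back only as far as the table's log and data file retention settings allow, and that VACUUM removes the files older versions depend on.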
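The DataFrame reader exposes the same time travel through the versionAsOf and timestampAsOf options. Below is a minimal sketch: read_table_at is a hypothetical wrapper, it expects an already configured SparkSession to be passed in, and the path and version in the usage comment are placeholders.

```python
def read_table_at(spark, path, version=None, timestamp=None):
    """Load a Delta table as of a version number or a timestamp string."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    else:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)


# Usage, assuming `spark` is a Delta-enabled session:
# users_before_fix = read_table_at(spark, "/data/lakehouse/users", version=12)
```

Comparing two such snapshots on the key column is a simple way to see what a given merge changed.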
Performance and Operational Considerations
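Partitioning the target Delta table by a low-cardinality column (date, region) reduces merge scope, provided the merge condition also constrains that column so Delta can prune untouched partitions. Avoid partitioning by the merge key itself: a high-cardinality key produces a flood of tiny partitions and files, which slows merges instead of narrowing them.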
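Watermarking bounds how long Spark waits for late-arriving events in stateful operations such as streaming deduplication or windowed aggregation; it has no effect on a plain foreachBatch merge. It requires a timestamp column, so convert Debezium's epoch-millisecond ts_ms first:
watermarked_stream = parsed_stream \
    .withColumn("event_time", (col("ts_ms") / 1000).cast("timestamp")) \
    .withWatermark("event_time", "10 minutes")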
Checkpointing is non-negotiable. Structured Streaming checkpoints track Kafka offsets and processing state. Place checkpoints on reliable storage (S3, HDFS) and never delete them while the pipeline runs.
Monitoring CDC lag requires tracking the delta between event time (when the change occurred in the source) and processing time (when Spark materialized it). Alert when lag exceeds acceptable thresholds for your SLA.
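The lag calculation itself is simple arithmetic on epoch milliseconds. The sketch below is plain Python with hypothetical helper names (lag_seconds, check_batch_lag); it assumes the source commit time is available per event, for example from Debezium's source.ts_ms field, which the parsing schema shown earlier would need to be extended to carry.

```python
import time


def lag_seconds(source_ts_ms, now_ms=None):
    """Seconds between the source change and now (or a supplied clock)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return max(0.0, (now_ms - source_ts_ms) / 1000.0)


def check_batch_lag(source_ts_ms_values, threshold_seconds, now_ms=None):
    """Return (worst_lag_seconds, breached) for the events in one micro-batch."""
    worst = max(lag_seconds(ts, now_ms) for ts in source_ts_ms_values)
    return worst, worst > threshold_seconds


# Two events committed 30s and 20s before the (fixed) processing clock.
worst, breached = check_batch_lag(
    [1699900500123, 1699900510123],
    threshold_seconds=60,
    now_ms=1699900530123,
)
print(worst, breached)
```

Tracking the worst lag per batch, rather than the average, keeps a few badly delayed events from hiding behind many fresh ones.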
Configure maxOffsetsPerTrigger to control batch sizes and prevent memory pressure during catch-up scenarios after pipeline restarts.
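A rough way to pick a value is to estimate how long a backlog takes to drain at a given cap. The following back-of-the-envelope sketch makes simplifying assumptions that should be checked against real metrics: every micro-batch finishes within one trigger interval, new events arrive at a steady rate, and catch_up_seconds is a hypothetical helper, not a Spark API.

```python
import math


def catch_up_seconds(backlog, max_offsets_per_trigger, arrival_rate, trigger_seconds):
    """Estimate seconds needed to drain a Kafka backlog.

    Returns None when new events arrive at least as fast as the cap
    allows them to be consumed, so the backlog never shrinks.
    """
    arriving_per_trigger = arrival_rate * trigger_seconds
    net_drain = max_offsets_per_trigger - arriving_per_trigger
    if net_drain <= 0:
        return None
    return math.ceil(backlog / net_drain) * trigger_seconds


# 1M backlog, cap of 100k offsets per 30s trigger, 1k new events per second.
print(catch_up_seconds(1_000_000, 100_000, 1_000, 30))
```

With these numbers each trigger consumes 100,000 offsets while 30,000 new ones arrive, so the backlog shrinks by 70,000 per trigger and clears in 15 triggers, or 450 seconds.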
Production Patterns and Pitfalls
Schema evolution is inevitable. Debezium propagates schema changes, but your Spark pipeline must handle them gracefully. Use schema registry with compatibility checks, and design your processing logic to tolerate missing or additional fields.
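Tolerating missing or additional fields can be as simple as projecting every incoming row onto the columns the pipeline knows about. A minimal plain-Python sketch follows; EXPECTED_FIELDS and project_row are hypothetical names, and the field list mirrors the users table used in the earlier examples.

```python
# Columns the pipeline understands, with the default used when one is absent.
EXPECTED_FIELDS = {"id": None, "name": None, "email": None, "updated_at": None}


def project_row(row, expected=EXPECTED_FIELDS):
    """Keep known fields, default the missing ones, ignore unknown ones."""
    return {field: row.get(field, default) for field, default in expected.items()}


# The source added a `phone` column and this event predates `updated_at`.
evolved = {"id": 1001, "name": "Alice", "email": "alice@new.com", "phone": "555-0100"}
print(project_row(evolved))
```

Parsing with an explicit schema in from_json behaves similarly: fields absent from the message come back as null and fields absent from the schema are dropped, so a new source column stays invisible until the schema is updated deliberately.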
Out-of-order events occur when Kafka partitions don’t align with your ordering requirements. The deduplication pattern in the merge example handles this by keeping only the latest event per key within each batch. For strict ordering across batches, track the last processed timestamp per key and filter stale events.
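Idempotency matters because failures happen. Design your merge logic so reprocessing the same events produces identical results. The merge key and timestamp-based deduplication get most of the way there: replaying a batch re-applies the same final state per key, apart from bookkeeping columns such as processed_at. The guarantee holds fully only if the merge also rejects events older than what the target already stores, for example by comparing source.cdc_timestamp with target.cdc_timestamp in the matched clauses.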
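The ordering and idempotency ideas from the last two paragraphs can be checked in a few lines of plain Python. This is a sketch under stated assumptions, not the Delta merge itself: apply_batch and live_rows are hypothetical helpers, state is an in-memory dict that remembers the last applied timestamp per key (including for deleted keys), and events use the merge_key, cdc_timestamp, and is_delete fields from the earlier examples.

```python
def apply_batch(state, events):
    """Apply CDC events to state: key -> (last_cdc_timestamp, row or None)."""
    # Step 1: within the batch, keep only the latest event per key.
    latest = {}
    for event in events:
        key = event["merge_key"]
        if key not in latest or event["cdc_timestamp"] > latest[key]["cdc_timestamp"]:
            latest[key] = event
    # Step 2: skip anything older than what was already applied for that key.
    for key, event in latest.items():
        last_ts, _ = state.get(key, (-1, None))
        if event["cdc_timestamp"] < last_ts:
            continue
        row = None if event["is_delete"] else event["row"]
        state[key] = (event["cdc_timestamp"], row)
    return state


def live_rows(state):
    """Rows that currently exist; deleted keys keep only their timestamp."""
    return {key: row for key, (_, row) in state.items() if row is not None}


def ev(key, ts, row=None):
    return {"merge_key": key, "cdc_timestamp": ts, "is_delete": row is None, "row": row}


batch_1 = [ev(1, 100, {"email": "v1"}), ev(1, 200, {"email": "v2"}), ev(2, 150, {"email": "b"})]
batch_2 = [ev(2, 300), ev(1, 150, {"email": "stale"})]

state = apply_batch(apply_batch({}, batch_1), batch_2)
snapshot = dict(state)

# Replaying both batches, in either order, leaves the state unchanged.
apply_batch(apply_batch(state, batch_2), batch_1)
print(live_rows(state))
```

Keeping the timestamp for deleted keys is what stops a late update from resurrecting a row; a table that physically removes deleted rows loses that protection unless the timestamps are tracked elsewhere.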
Scaling CDC pipelines requires attention to both Kafka consumer parallelism and Spark executor configuration. Match Kafka partition count to your parallelism requirements, and size executors to handle your batch volumes without excessive garbage collection.
Alternatives exist for specific use cases. Apache Flink CDC provides lower latency for sub-second requirements. Native database replication (PostgreSQL logical replication, MySQL binlog replication) works when you don’t need the transformation capabilities Spark provides. But for data lakehouse architectures where CDC feeds into Delta Lake alongside other data sources, Spark remains the practical choice.