Incremental Data Processing with Spark

Key Insights

  • Incremental processing can reduce compute costs by 60-90% compared to full reprocessing, depending on how little of the data changes between runs, but it requires intentional design around partitioning, checkpointing, and change tracking mechanisms.
  • Delta Lake’s Change Data Feed eliminates the need for custom CDC logic, providing row-level change tracking with minimal overhead when enabled proactively.
  • The choice between timestamp filtering, CDC, and streaming depends on your data mutation patterns—append-only workloads need different strategies than tables with frequent updates.

The Problem with Full Data Reprocessing

Every data engineer has inherited that job. The one that reads the entire customer table—all 500 million rows—just to process yesterday’s 50,000 new records. It runs for six hours, costs a small fortune in cluster compute, and fails unpredictably when upstream systems hiccup.

Full reprocessing made sense when datasets fit in memory and cloud compute bills weren’t a line item executives questioned. Those days are gone. Modern data platforms handle terabytes daily, and reprocessing everything on each run creates compounding problems: longer job durations, higher costs, wider blast radius when failures occur, and SLA violations that erode trust in your data platform.

Incremental processing solves this by identifying and processing only what changed since the last run. The concept is simple. The implementation requires careful thought about how changes are detected, how state is managed, and how failures are recovered. Spark provides multiple mechanisms for incremental processing, each suited to different use cases.

Incremental Processing Patterns Overview

Three primary patterns dominate incremental data processing: timestamp-based filtering, change data capture (CDC), and streaming with watermarks. Understanding when to apply each pattern prevents over-engineering simple problems and under-engineering complex ones.

Timestamp-based filtering works for append-only or slowly changing data where records have reliable created/modified timestamps. It’s the simplest approach but breaks down when records are updated without timestamp changes or when clock skew exists across source systems.

Change data capture tracks row-level mutations directly, typically by reading database transaction logs. It handles updates and deletes gracefully but requires infrastructure support (Debezium, AWS DMS, or native database CDC features).

Streaming with watermarks processes data continuously with built-in handling for late-arriving records. It’s ideal for event-driven architectures but introduces complexity around state management and exactly-once semantics.
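As a rough illustration of how these trade-offs might be weighed, the sketch below encodes the three descriptions above as a small decision helper. SourceProfile, choose_pattern, and the "full-reprocess" fallback are hypothetical names introduced here, not part of Spark, and a real decision usually involves more than four booleans.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SourceProfile:
    """Hypothetical summary of how a source behaves."""
    has_deletes: bool          # rows can disappear from the source
    reliable_timestamps: bool  # every insert and update sets a trustworthy timestamp
    cdc_available: bool        # a change log (Debezium, AWS DMS, native CDC) exists
    needs_low_latency: bool    # consumers expect results within minutes


def choose_pattern(profile: SourceProfile) -> str:
    """Return the simplest pattern that still sees every change."""
    if profile.needs_low_latency:
        return "streaming"
    # A timestamp filter cannot see deletes, and it misses writes
    # that leave the timestamp untouched.
    timestamp_is_safe = profile.reliable_timestamps and not profile.has_deletes
    if timestamp_is_safe:
        return "timestamp"
    if profile.cdc_available:
        return "cdc"
    return "full-reprocess"
```

The ordering is deliberate: the helper only reaches for CDC when a plain timestamp filter would silently miss changes.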

Here’s the cost difference in practice:

from pyspark.sql.functions import col

# Full table scan - processes all historical data every run
df_full = spark.read.parquet("s3://data-lake/events/")
df_filtered = df_full.filter(col("event_type") == "purchase")
# Scans: ~500GB, Runtime: ~45 minutes

# Timestamp-based incremental - processes only recent data
# get_checkpoint_timestamp() is a placeholder for your own checkpoint lookup
last_processed = get_checkpoint_timestamp()  # e.g., "2024-01-14 00:00:00"

df_incremental = (
    spark.read.parquet("s3://data-lake/events/")
    .filter(col("event_timestamp") >= last_processed)
    .filter(col("event_type") == "purchase")
)
# Scans: ~5GB, Runtime: ~3 minutes

The incremental version requires maintaining checkpoint state externally, but scanning roughly 5GB instead of 500GB, a 99% reduction, justifies the added complexity for most production workloads.
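The external checkpoint state mentioned above can be as small as one JSON file. The following is a minimal sketch, assuming a local or mounted filesystem where a rename is atomic; the function signatures, the file layout, and the epoch default are assumptions made for illustration, and an object store such as S3 would need a different mechanism.

```python
import json
import os
import tempfile

EPOCH = "1970-01-01 00:00:00"


def get_checkpoint_timestamp(path: str, default: str = EPOCH) -> str:
    """Return the last processed timestamp, or `default` on the first run."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)["last_processed"]
    except FileNotFoundError:
        return default


def save_checkpoint_timestamp(path: str, timestamp: str) -> None:
    """Replace the checkpoint atomically so a crash never leaves a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"last_processed": timestamp}, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # atomic when both paths share a filesystem
    finally:
        # Clean up the temp file if the rename did not happen.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
```

Call save_checkpoint_timestamp only after the incremental output has been written successfully; saving it earlier risks skipping data if the write then fails.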

Structured Streaming for Continuous Incremental Loads

Spark Structured Streaming treats incremental processing as a first-class concept. Rather than you managing checkpoints manually, the streaming engine tracks exactly which records have been processed and resumes from that point after restarts.

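The key abstraction is the streaming query, which continuously monitors a source for new data and processes it in micro-batches (or in the experimental continuous mode, which lowers latency but only guarantees at-least-once delivery). Checkpoints persist to durable storage. Combined with a replayable source such as Kafka and an idempotent or transactional sink such as Delta, they enable exactly-once processing semantics even across cluster failures.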

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, TimestampType, DoubleType

spark = SparkSession.builder.appName("IncrementalOrders").getOrCreate()

# Define schema for incoming events
order_schema = StructType() \
    .add("order_id", StringType()) \
    .add("customer_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("event_time", TimestampType())

# Read from Kafka with automatic offset tracking
orders_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders")
    .option("startingOffsets", "earliest")  # Only applies on first run
    .load()
    .select(from_json(col("value").cast("string"), order_schema).alias("data"))
    .select("data.*")
)

# Process and write with checkpointing
query = (
    orders_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://checkpoints/orders-ingestion/")
    .trigger(processingTime="5 minutes")  # Micro-batch every 5 minutes
    .start("s3://data-lake/bronze/orders/")
)

query.awaitTermination()

The checkpointLocation is critical. It stores Kafka offsets, intermediate state, and metadata required for recovery. If the checkpoint is lost or its path changes, the job starts again from startingOffsets, potentially reprocessing days of data or missing records entirely.

Trigger modes control the latency-throughput tradeoff. processingTime batches records over a specified interval, while availableNow (available from Spark 3.3) processes all available data and then terminates, which suits scheduled batch jobs that want streaming’s checkpoint management without continuous execution.
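To make the recovery behaviour described above more concrete, here is a toy model of checkpointed offset tracking in plain Python. It is a deliberately simplified sketch, not Spark's implementation: MicroBatchRunner, the in-memory checkpoint dict, and the list-based source are all hypothetical stand-ins. The idea it illustrates is that the planned offset range is recorded before a batch runs and a commit marker is recorded after it, so a restart replays exactly the unfinished range.

```python
from typing import Any, Callable, Dict, List


class MicroBatchRunner:
    """Toy model of checkpointed offset tracking (not Spark's real code)."""

    def __init__(self, checkpoint: Dict[str, Any]):
        # `checkpoint` stands in for durable storage: it outlives the runner.
        self.offsets = checkpoint.setdefault("offsets", {})     # batch id -> (start, end)
        self.commits = checkpoint.setdefault("commits", set())  # finished batch ids

    def run_once(self, source: List[str], sink: Callable[[int, List[str]], None]) -> None:
        last = max(self.offsets) if self.offsets else -1
        if last >= 0 and last not in self.commits:
            # A previous run planned this batch but never committed it:
            # replay exactly the same offset range.
            batch_id = last
            start, end = self.offsets[batch_id]
        else:
            batch_id = last + 1
            start = self.offsets[last][1] if last >= 0 else 0
            end = len(source)
            if start == end:
                return  # no new data
            # Record the plan before doing any work (write-ahead).
            self.offsets[batch_id] = (start, end)
        sink(batch_id, source[start:end])
        # Mark the batch done only after the sink succeeded.
        self.commits.add(batch_id)
```

Because a replay delivers the same batch id with the same rows, a sink that overwrites by batch id ends up with each row exactly once, which is why the sink's behaviour matters as much as the checkpoint.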

Delta Lake and Change Data Feed

Delta Lake’s Change Data Feed (CDF) transforms incremental processing for tables with updates and deletes. Instead of scanning entire tables to detect changes, CDF records row-level mutations in a separate change log that’s cheap to query.

Enable CDF when creating tables or alter existing ones:

# Enable CDF on new table
spark.sql("""
    CREATE TABLE customers (
        customer_id STRING,
        email STRING,
        subscription_tier STRING,
        updated_at TIMESTAMP
    )
    USING DELTA
    TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Or enable on existing table
spark.sql("""
    ALTER TABLE customers 
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

Once enabled, query changes between versions or timestamps:

from pyspark.sql.functions import col, max as spark_max  # avoid shadowing Python's max

# Get all changes since last processing
last_version = get_last_processed_version()  # Placeholder helper; version stored externally

# Note: Delta may raise an error if startingVersion is past the latest commit,
# so check the table's current version before reading
changes_df = (
    spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", last_version + 1)
    .table("customers")
)

# CDF adds metadata columns automatically
# _change_type: insert, update_preimage, update_postimage, delete
# _commit_version: Delta version number
# _commit_timestamp: When the change occurred

# Process only actual changes
inserts = changes_df.filter(col("_change_type") == "insert")
updates = changes_df.filter(col("_change_type") == "update_postimage")
deletes = changes_df.filter(col("_change_type") == "delete")

# Update downstream systems accordingly (propagate_changes is a placeholder)
propagate_changes(inserts, updates, deletes)

# Persist new checkpoint; the aggregate is None when no changes were read
latest_version = changes_df.agg(spark_max("_commit_version")).collect()[0][0]
if latest_version is not None:
    save_last_processed_version(latest_version)

CDF adds roughly 5-10% storage overhead but eliminates expensive change detection queries. For tables with frequent updates, this tradeoff pays for itself immediately.
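When a key changes several times between two runs, downstream systems often need only the net effect. The sketch below collapses a change feed to one row per key, using plain dicts as stand-ins for CDF rows (for example, what Row.asDict() would return). The net_changes function and the sample column names are hypothetical; only the _change_type and _commit_version columns come from CDF itself.

```python
from typing import Dict, Iterable, List, Tuple


def net_changes(changes: Iterable[dict], key: str) -> Tuple[List[dict], List[dict]]:
    """Collapse a change feed into (upserts, deletes), one row per key."""
    latest: Dict[object, dict] = {}
    for row in changes:
        if row["_change_type"] == "update_preimage":
            continue  # pre-images describe the old value only
        seen = latest.get(row[key])
        # On equal versions the later row in the input wins.
        if seen is None or row["_commit_version"] >= seen["_commit_version"]:
            latest[row[key]] = row
    upserts = [r for r in latest.values() if r["_change_type"] != "delete"]
    deletes = [r for r in latest.values() if r["_change_type"] == "delete"]
    return upserts, deletes
```

A key that was inserted and then deleted within the same range ends up only in the delete list, so the downstream system never sees the intermediate state.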

Implementing Merge (Upsert) Operations

Incremental loads often require upsert semantics: insert new records and update existing ones. The naive approach—delete then insert—creates data quality issues and poor performance. Delta Lake’s MERGE operation handles this atomically.

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date

# Load incremental data from source
incremental_customers = (
    spark.read.parquet("s3://staging/customers-daily/")
    .filter(col("extract_date") == current_date())
)

# Reference existing Delta table
target_table = DeltaTable.forPath(spark, "s3://data-lake/silver/customers/")

# SCD Type 1: Overwrite with latest values
(
    target_table.alias("target")
    .merge(
        incremental_customers.alias("source"),
        "target.customer_id = source.customer_id"
    )
    .whenMatchedUpdate(set={
        "email": "source.email",
        "subscription_tier": "source.subscription_tier",
        "updated_at": "source.updated_at"
    })
    .whenNotMatchedInsert(values={
        "customer_id": "source.customer_id",
        "email": "source.email",
        "subscription_tier": "source.subscription_tier",
        "updated_at": "source.updated_at"
    })
    .execute()
)

MERGE operations benefit significantly from proper partitioning and Z-ordering on the join keys. Without these optimizations, Spark must scan the entire target table to find matching records, negating most incremental processing benefits.
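One common way to limit how much of the target a MERGE has to read is to add a predicate on the target's partition column to the merge condition. The helper below is a sketch that assembles such a statement as a SQL string; build_merge_sql, the t and s aliases, and the target_filter parameter are hypothetical names, and identifiers are interpolated without quoting, so it should only be fed trusted table and column names.

```python
from typing import Optional, Sequence


def build_merge_sql(
    target: str,
    source: str,
    keys: Sequence[str],
    columns: Sequence[str],
    target_filter: Optional[str] = None,
) -> str:
    """Build an upsert MERGE statement; `target_filter` narrows the target scan."""
    update_cols = [c for c in columns if c not in keys]
    if not keys or not update_cols:
        raise ValueError("need at least one key and one non-key column")
    on = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    if target_filter:
        # e.g. "t.event_date >= '2024-01-14'" on a partition column
        on = f"{on} AND {target_filter}"
    updates = ", ".join(f"{c} = s.{c}" for c in update_cols)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {source} AS s\n"
        f"ON {on}\n"
        f"WHEN MATCHED THEN UPDATE SET {updates}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
```

The resulting string could be passed to spark.sql(). Note that a target filter changes matching semantics: source rows whose match falls outside the filter are treated as not matched and inserted, so the filter has to cover every partition the source can touch.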

Partitioning Strategies for Incremental Efficiency

Partition design determines whether your incremental queries scan megabytes or terabytes. Align partitions with your incremental load patterns—typically by date for time-series data.

from datetime import datetime, timedelta

from pyspark.sql.functions import col

# Write with date partitioning aligned to incremental loads
(
    processed_events
    .write
    .format("delta")
    .partitionBy("event_date")
    .mode("append")
    .save("s3://data-lake/silver/events/")
)

# Incremental query with partition pruning
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

# Spark eliminates partitions not matching the filter before scanning
recent_events = (
    spark.read.format("delta")
    .load("s3://data-lake/silver/events/")
    .filter(col("event_date") >= yesterday)
)

# Verify partition pruning in query plan
recent_events.explain(True)
# Look for "PartitionFilters" in the physical plan

Over-partitioning creates small file problems; under-partitioning defeats the purpose. Target partition sizes between 128MB and 1GB. For high-cardinality scenarios, combine coarse partitions (month) with Z-ordering on frequently filtered columns.
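The sizing guidance above reduces to simple arithmetic. The sketch below is a rough heuristic, not an official rule: given an estimated daily volume, it picks the finest date granularity whose partitions would still reach the 128MB lower bound. The function name, the granularity list, and the 30-day and 365-day approximations are assumptions; the 1GB upper bound is left out on the assumption that a larger partition simply holds several files.

```python
MIN_PARTITION_BYTES = 128 * 1024**2  # lower bound quoted in the text


def suggest_partition_granularity(daily_bytes: int) -> str:
    """Pick the finest date granularity whose partitions reach the minimum size."""
    estimated_sizes = [
        ("hour", daily_bytes / 24),
        ("day", daily_bytes),
        ("month", daily_bytes * 30),   # approximate month length
        ("year", daily_bytes * 365),
    ]
    for granularity, size in estimated_sizes:
        if size >= MIN_PARTITION_BYTES:
            return granularity
    return "none"  # too small to benefit from partitioning at all
```

For example, a table receiving about 10MB per day would be partitioned by month under this heuristic, since daily partitions would fall well below the small-file threshold.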

Monitoring and Failure Recovery

Incremental processing introduces failure modes that full reprocessing avoids. A corrupted checkpoint, missed offset, or partial write can cause data loss or duplication. Design for recovery from the start.

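from datetime import datetime

from pyspark.sql.functions import max as spark_max  # avoid shadowing Python's max
from pyspark.sql.utils import AnalysisException

# Idempotent write configuration for batch incremental loads
# (dynamic partition overwrite on Delta tables needs Delta Lake 2.0+)
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

def run_incremental_load(checkpoint_path: str, target_path: str):
    # Read checkpoint, or fall back to the epoch on the first run
    try:
        checkpoint = spark.read.json(checkpoint_path).collect()[0]
        last_timestamp = checkpoint["last_processed_timestamp"]
    except (AnalysisException, IndexError):
        # Missing path or empty checkpoint; any other error should surface
        last_timestamp = "1970-01-01 00:00:00"

    # Process incremental data (extract_since is a placeholder for your extraction logic)
    incremental_data = extract_since(last_timestamp)

    if incremental_data.isEmpty():  # DataFrame.isEmpty needs Spark 3.3+
        return  # Nothing to process

    max_timestamp = incremental_data.agg(spark_max("event_timestamp")).collect()[0][0]

    # Write with idempotent semantics
    # Dynamic partition overwrite replaces only affected partitions, so
    # incremental_data must contain every row of each partition it touches
    (
        incremental_data
        .write
        .format("delta")
        .partitionBy("event_date")
        .mode("overwrite")
        .save(target_path)
    )

    # Update checkpoint only after successful write
    checkpoint_df = spark.createDataFrame([{
        "last_processed_timestamp": str(max_timestamp),
        "updated_at": str(datetime.now())
    }])
    checkpoint_df.write.mode("overwrite").json(checkpoint_path)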

For streaming workloads, Structured Streaming handles checkpointing automatically, but monitor checkpoint sizes. Unbounded state in stateful operations (aggregations, joins) causes checkpoint bloat and eventual failures. Bound the state explicitly with a watermark:

from pyspark.sql.functions import window

# Windowed aggregation with state cleanup
windowed_counts = (
    orders_stream
    .withWatermark("event_time", "1 hour")  # Late data tolerance
    .groupBy(
        window("event_time", "10 minutes"),
        "customer_id"
    )
    .count()
)

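The watermark tells Spark it can discard the state for a window once the latest event time it has seen is more than one hour past that window’s end, preventing unbounded memory growth. Records that arrive for a window after that point are dropped rather than counted.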
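The eviction rule can be modelled in a few lines of plain Python. This is a simplified sketch of the idea, not Spark's implementation: WindowedCounter and its fields are hypothetical, events are (event_time, key) tuples, and the watermark only advances at the end of each batch.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, Optional, Tuple

EPOCH = datetime(1970, 1, 1)


class WindowedCounter:
    """Toy tumbling-window count with watermark-based state cleanup."""

    def __init__(self, window: timedelta, delay: timedelta):
        self.window = window
        self.delay = delay
        self.max_event_time: Optional[datetime] = None
        self.state: Dict[Tuple[datetime, str], int] = defaultdict(int)
        self.dropped = 0  # records that arrived after their window was finalized

    @property
    def watermark(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def process_batch(self, events: Iterable[Tuple[datetime, str]]) -> dict:
        """Count a micro-batch, then evict and return finalized windows."""
        watermark = self.watermark  # fixed for the duration of the batch
        for ts, key in events:
            start = ts - (ts - EPOCH) % self.window
            if watermark is not None and start + self.window <= watermark:
                self.dropped += 1
                continue
            self.state[(start, key)] += 1
            if self.max_event_time is None or ts > self.max_event_time:
                self.max_event_time = ts
        watermark = self.watermark
        finalized = {
            k: v for k, v in self.state.items()
            if watermark is not None and k[0] + self.window <= watermark
        }
        for k in finalized:
            del self.state[k]
        return finalized
```

With a 10-minute window and a one-hour delay, a window ending at 12:10 is kept in state until an event stamped 13:10 or later has been seen; after that, further records for that window count as dropped.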

Incremental processing isn’t optional for modern data platforms—it’s a requirement. Start with the simplest pattern that meets your requirements, instrument thoroughly, and evolve toward more sophisticated approaches as data volumes and latency requirements demand.
