Spark Streaming - Watermarking for Late Data

Key Insights

• Watermarks define how long Spark Streaming waits for late-arriving data before finalizing aggregations, balancing data completeness against processing latency
• Event time processing with watermarks prevents unbounded state growth by allowing Spark to discard old aggregation state that’s no longer needed
• Proper watermark configuration requires understanding your data’s lateness patterns: set it too low and you’ll drop valid data; set it too high and you’ll consume excessive memory

Understanding the Late Data Problem

Streaming data rarely arrives in perfect chronological order. Network delays, system failures, and distributed processing create scenarios where events generated at time T arrive after events generated at T+10. Without proper handling, these late arrivals corrupt aggregation results or force you to maintain infinite state.

Spark Structured Streaming uses watermarks to address this challenge. A watermark represents a moving threshold that tracks “how late is too late.” When Spark processes a batch with maximum event time T, it sets the watermark to T minus the allowed lateness duration. Any data with event time below this watermark gets dropped from stateful operations.
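Independent of Spark, the watermark bookkeeping itself is simple arithmetic. A minimal sketch with a hypothetical helper class (not a Spark API; Spark advances the watermark per micro-batch rather than per event, but the threshold logic is the same):

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Toy model of watermark bookkeeping (illustrative, not a Spark API)."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None  # highest event time seen so far

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def observe(self, event_time: datetime) -> bool:
        """Return True if the event is accepted, False if it falls below the watermark."""
        wm = self.watermark
        if wm is not None and event_time < wm:
            return False  # too late: dropped from stateful operations
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True

tracker = WatermarkTracker(timedelta(minutes=10))
tracker.observe(datetime(2024, 1, 15, 10, 30))             # watermark becomes 10:20
late_ok = tracker.observe(datetime(2024, 1, 15, 10, 25))   # within tolerance -> accepted
too_late = tracker.observe(datetime(2024, 1, 15, 10, 15))  # below watermark -> dropped
```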

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, count, avg, expr

spark = SparkSession.builder \
    .appName("WatermarkExample") \
    .getOrCreate()

# Schema: timestamp, sensor_id, temperature
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor-data") \
    .load()

parsed_df = stream_df.selectExpr(
    "CAST(value AS STRING) as json"
).selectExpr(
    "get_json_object(json, '$.timestamp') as event_time",
    "get_json_object(json, '$.sensor_id') as sensor_id",
    "CAST(get_json_object(json, '$.temperature') as DOUBLE) as temperature"
).withColumn("event_time", col("event_time").cast("timestamp"))

Implementing Basic Watermarking

The withWatermark() method takes the event-time column name and a delay threshold, and configures how long Spark waits for late data. It must be called on the DataFrame before any time-based aggregation on that column; applying it after the aggregation has no effect.

windowed_aggregation = parsed_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("sensor_id")
    ) \
    .agg(
        avg("temperature").alias("avg_temp"),
        count("*").alias("reading_count")
    )

query = windowed_aggregation.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

In this example, Spark maintains state for windows up to 10 minutes behind the latest event time seen. If the stream processes an event with timestamp 2024-01-15 10:30:00, the watermark moves to 10:20:00. Subsequent events with timestamps before 10:20:00 are no longer guaranteed to be aggregated and are typically dropped from windowed state.

Watermark Behavior with Different Output Modes

Output modes interact critically with watermarks. The behavior changes significantly between update, append, and complete modes.

# Append mode - only outputs finalized windows
append_query = windowed_aggregation.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/sensor-aggregates") \
    .option("checkpointLocation", "/checkpoints/sensor-append") \
    .start()

# Update mode - outputs changed rows
update_query = windowed_aggregation.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("sensor_updates") \
    .option("checkpointLocation", "/checkpoints/sensor-update") \
    .start()

# Complete mode - outputs entire result table (no watermark benefit)
complete_query = windowed_aggregation.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/checkpoints/sensor-complete") \
    .start()

Append mode only emits a window after the watermark passes its end time, guaranteeing the row will never be updated again. Each window is written exactly once, but results are delayed by at least the watermark duration after the window closes.

Update mode emits rows as they change, providing lower latency but requiring downstream systems to handle updates. Windows continue updating until the watermark expires them.

Complete mode ignores watermarks entirely, outputting the full result table each trigger. Use this sparingly—it doesn’t clean up state.
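The append-mode emission rule can be illustrated outside Spark: a window is final only once the watermark has passed its end. A sketch with a hypothetical helper, approximating Spark's boundary handling:

```python
from datetime import datetime, timedelta

def emitted_windows(window_ends, watermark):
    """Windows whose end time the watermark has passed are final and emitted
    (illustrative; Spark's exact boundary handling may differ)."""
    return [end for end in window_ends if end <= watermark]

ends = [datetime(2024, 1, 15, 10, 5),
        datetime(2024, 1, 15, 10, 10),
        datetime(2024, 1, 15, 10, 15)]

# Latest event time 10:22 with a 10-minute delay -> watermark 10:12
watermark = datetime(2024, 1, 15, 10, 22) - timedelta(minutes=10)
final = emitted_windows(ends, watermark)  # windows ending 10:05 and 10:10
```

The window ending 10:15 stays open: the watermark has not yet passed it, so in append mode it produces no output this trigger.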

Multiple Aggregations with Watermarks

Complex streaming applications often require multiple aggregation levels. Watermarks propagate through transformations, but you must understand the implications. Note that chaining multiple streaming aggregations in a single query is only supported in recent Spark releases (3.4 and later); earlier versions reject queries with multiple stateful operators.

# First level aggregation - 5 minute windows
sensor_windows = parsed_df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes", "5 minutes"),
        col("sensor_id")
    ) \
    .agg(
        avg("temperature").alias("avg_temp"),
        count("*").alias("count")
    ) \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("sensor_id"),
        col("avg_temp"),
        col("count")
    )

# Second level aggregation - hourly rollup
hourly_aggregation = sensor_windows \
    .groupBy(
        window(col("window_start"), "1 hour"),
        col("sensor_id")
    ) \
    .agg(
        # unweighted average of the 5-minute averages
        avg("avg_temp").alias("hourly_avg_temp"),
        # Python's builtin sum() would break here; use the SQL expression instead
        expr("sum(count)").alias("total_readings")
    )

hourly_query = hourly_aggregation.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("path", "/data/hourly-sensor-data") \
    .option("checkpointLocation", "/checkpoints/hourly") \
    .start()

The watermark from the first aggregation carries forward. The second aggregation creates larger windows, however, which effectively extends the end-to-end lateness tolerance: data arriving within 15 minutes of generation can still affect a 5-minute window, but that window must be finalized before it rolls into an hourly aggregate, so hourly results trail the stream by at least the watermark delay.

Joining Streams with Watermarks

Stream-stream joins require watermarks on both sides to prevent unbounded state growth. Spark needs to know when it’s safe to discard buffered rows that will never find a match.

# Stream 1: Click events
clicks_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clicks") \
    .load() \
    .selectExpr(
        "CAST(value AS STRING) as json"
    ).selectExpr(
        "get_json_object(json, '$.timestamp') as click_time",
        "get_json_object(json, '$.user_id') as user_id",
        "get_json_object(json, '$.page') as page"
    ).withColumn("click_time", col("click_time").cast("timestamp"))

# Stream 2: Impression events
impressions_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "impressions") \
    .load() \
    .selectExpr(
        "CAST(value AS STRING) as json"
    ).selectExpr(
        "get_json_object(json, '$.timestamp') as impression_time",
        "get_json_object(json, '$.user_id') as user_id",
        "get_json_object(json, '$.ad_id') as ad_id"
    ).withColumn("impression_time", col("impression_time").cast("timestamp"))

# Apply watermarks to both streams
clicks_with_watermark = clicks_df \
    .withWatermark("click_time", "2 hours")

impressions_with_watermark = impressions_df \
    .withWatermark("impression_time", "3 hours") \
    .withColumnRenamed("user_id", "imp_user_id")

# Join with time bounds (column renamed so the join condition is unambiguous)
joined_df = clicks_with_watermark.join(
    impressions_with_watermark,
    expr("""
        user_id = imp_user_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 1 hour
    """)
)

join_query = joined_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/checkpoints/join") \
    .start()

Spark computes a single global watermark as the minimum across both inputs (the 2-hour click watermark here) and uses it, together with the join’s time bounds, to decide when buffered rows can no longer match and can be evicted. The join condition’s time bounds should align with the watermark settings: requiring clicks within 1 hour of an impression while tolerating 2-3 hours of lateness gives reasonable coverage.
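The eviction reasoning reduces to plain arithmetic: a buffered impression can be discarded once the click-side watermark guarantees no click inside the join bound can still be accepted. A sketch with a hypothetical helper, assuming the 1-hour bound from the join above:

```python
from datetime import datetime, timedelta

JOIN_BOUND = timedelta(hours=1)  # a click must occur within 1 hour of its impression

def impression_droppable(impression_time, click_watermark):
    """True once no click inside the join bound can still pass the
    click-side watermark (illustrative, not Spark's internal logic)."""
    latest_matching_click = impression_time + JOIN_BOUND
    return click_watermark > latest_matching_click

imp = datetime(2024, 1, 15, 8, 0)
early = impression_droppable(imp, datetime(2024, 1, 15, 8, 30))  # clicks up to 9:00 may still arrive
late = impression_droppable(imp, datetime(2024, 1, 15, 9, 30))   # no matching click can arrive now
```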

Monitoring Watermark Progress

Understanding watermark advancement is crucial for debugging late-data issues and tuning configurations. Note that the Python StreamingQueryListener API requires Spark 3.4 or later.

from pyspark.sql.streaming import StreamingQueryListener

class WatermarkListener(StreamingQueryListener):
    def onQueryProgress(self, event):
        progress = event.progress
        print(f"Batch: {progress.batchId}")
        print(f"Watermark: {progress.eventTime.get('watermark', 'N/A')}")
        print(f"Max Event Time: {progress.eventTime.get('max', 'N/A')}")
        print(f"Min Event Time: {progress.eventTime.get('min', 'N/A')}")
        print(f"Avg Event Time: {progress.eventTime.get('avg', 'N/A')}")
        
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")
        
    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

spark.streams.addListener(WatermarkListener())

Monitor the gap between maximum event time and watermark. A growing gap indicates increasing lateness or processing lag. A stable gap suggests your watermark configuration matches your data’s lateness characteristics.
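The same numbers are available without a listener by polling query.lastProgress, which PySpark exposes as a plain dict. A sketch of the gap computation, assuming the ISO-8601 timestamp strings that typically appear in the eventTime map:

```python
from datetime import datetime

def watermark_gap_seconds(event_time_map):
    """Gap between the max observed event time and the current watermark,
    given the eventTime map from a streaming progress report."""
    fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
    max_seen = datetime.strptime(event_time_map["max"], fmt)
    watermark = datetime.strptime(event_time_map["watermark"], fmt)
    return (max_seen - watermark).total_seconds()

# Shape of the eventTime map taken from a sample progress report (assumed format)
sample = {
    "watermark": "2024-01-15T10:20:00.000Z",
    "max": "2024-01-15T10:31:30.000Z",
}
gap = watermark_gap_seconds(sample)  # 690.0 seconds
```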

Choosing the Right Watermark Delay

Setting watermark delays requires balancing data completeness against resource consumption and latency. Analyze your data’s lateness distribution:

# Batch analysis to determine lateness patterns
# (assumes an ingestion timestamp column, `_processing_time`, was recorded at write time)
batch_df = spark.read.parquet("/historical/sensor-data")

lateness_analysis = batch_df \
    .withColumn("lateness_seconds",
                col("_processing_time").cast("long") -
                col("event_time").cast("long")) \
    .groupBy("sensor_id") \
    .agg(
        expr("percentile(lateness_seconds, 0.95)").alias("p95_lateness"),
        expr("percentile(lateness_seconds, 0.99)").alias("p99_lateness"),
        expr("max(lateness_seconds)").alias("max_lateness")
    )

lateness_analysis.show()

Set your watermark at the 95th or 99th percentile of observed lateness, not the maximum. Extreme outliers often represent data quality issues rather than legitimate late arrivals. A 95th percentile approach drops 5% of late data while preventing unbounded state growth from pathological cases.
