PySpark - Streaming Watermark and Late Data

Watermarks solve a fundamental problem in stream processing: when can you safely finalize an aggregation? In batch processing, you know when all data has arrived. In streaming, data arrives...

Key Insights

  • Watermarks in PySpark Structured Streaming define how long the system waits for late-arriving data before finalizing aggregations, balancing between result accuracy and memory consumption.
  • Late data handling strategies include dropping events beyond the watermark threshold, updating previous aggregates within the watermark window, or routing late events to separate storage for offline processing.
  • Proper watermark configuration requires understanding your data’s latency characteristics—setting it too low loses data, too high causes memory pressure and delayed results.

Understanding Watermarks in Streaming Context

Watermarks solve a fundamental problem in stream processing: when can you safely finalize an aggregation? In batch processing, you know when all data has arrived. In streaming, data arrives continuously and potentially out of order.

A watermark is a moving threshold that tracks event time progression. PySpark uses it to determine when to emit final results for time-based windows and when to discard state for old aggregations.

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, count, avg
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

spark = SparkSession.builder \
    .appName("WatermarkExample") \
    .getOrCreate()

schema = StructType([
    StructField("event_id", StringType()),
    StructField("event_time", TimestampType()),
    StructField("sensor_id", StringType()),
    StructField("temperature", DoubleType())
])

stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor-data") \
    .load() \
    .selectExpr("CAST(value AS STRING) as json_data") \
    .select(from_json(col("json_data"), schema).alias("data")) \
    .select("data.*")

# Define watermark: wait up to 10 minutes for late data
watermarked_df = stream_df \
    .withWatermark("event_time", "10 minutes")

# Aggregate with 5-minute windows
aggregated = watermarked_df \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("sensor_id")
    ) \
    .agg(
        avg("temperature").alias("avg_temp"),
        count("*").alias("event_count")
    )

How Watermark Calculation Works

The watermark advances based on the maximum event time seen so far minus the watermark delay. PySpark calculates: current_watermark = max_event_time - watermark_delay.

Events with timestamps older than the current watermark get dropped. Events within the watermark window update existing aggregates.

from pyspark.sql.functions import current_timestamp, expr

# Simulate streaming data with varying lateness
def generate_test_stream():
    return spark.readStream \
        .format("rate") \
        .option("rowsPerSecond", 10) \
        .load() \
        .withColumn("event_time", 
            expr("timestamp - INTERVAL 30 SECONDS + INTERVAL (rand() * 120) SECONDS"))

test_stream = generate_test_stream()

# Apply watermark
watermarked = test_stream \
    .withWatermark("event_time", "1 minute") \
    .groupBy(
        window(col("event_time"), "30 seconds", "10 seconds")  # sliding window
    ) \
    .count()

query = watermarked.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

The sliding window creates overlapping intervals. The watermark ensures that once a window’s end time passes the watermark threshold, no more updates occur for that window.

Output Modes and Watermark Interaction

Watermarks behave differently depending on the output mode. Understanding this interaction is critical for correct implementation.

Append Mode: Outputs results only when windows are finalized (past the watermark). Guarantees immutability but introduces latency.

Update Mode: Emits updates as data arrives, even for windows not yet finalized. Allows low-latency results but requires downstream systems to handle updates.

Complete Mode: Outputs entire result table on every trigger. Doesn’t work with watermarks for non-aggregated queries.

from pyspark.sql.functions import window, sum as _sum

# Append mode example - results only after watermark passes
append_query = watermarked_df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(
        window(col("event_time"), "10 minutes"),
        col("sensor_id")
    ) \
    .agg(_sum("temperature").alias("total_temp")) \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/sensor-aggregates") \
    .option("checkpointLocation", "/checkpoints/append") \
    .start()

# Update mode - get intermediate results
update_query = watermarked_df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(
        window(col("event_time"), "10 minutes"),
        col("sensor_id")
    ) \
    .agg(_sum("temperature").alias("total_temp")) \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("live_aggregates") \
    .option("checkpointLocation", "/checkpoints/update") \
    .start()

Handling Late Data Explicitly

Beyond watermarks, you can implement custom late data handling by comparing event time against processing time or routing late events to different sinks.

from pyspark.sql.functions import current_timestamp, when, unix_timestamp

# Tag late vs on-time data
def classify_lateness(df, watermark_minutes):
    return df \
        .withColumn("processing_time", current_timestamp()) \
        .withColumn("lateness_seconds", 
            unix_timestamp("processing_time") - unix_timestamp("event_time")) \
        .withColumn("is_late", 
            col("lateness_seconds") > (watermark_minutes * 60))

classified_stream = classify_lateness(stream_df, 10)

# Split stream into on-time and late
on_time_stream = classified_stream.filter(col("is_late") == False)
late_stream = classified_stream.filter(col("is_late") == True)

# Process on-time data normally
on_time_query = on_time_stream \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window(col("event_time"), "5 minutes"), col("sensor_id")) \
    .agg(avg("temperature").alias("avg_temp")) \
    .writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("path", "/data/on-time-aggregates") \
    .option("checkpointLocation", "/checkpoints/on-time") \
    .start()

# Route late data to separate storage for analysis
late_query = late_stream \
    .writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("path", "/data/late-arrivals") \
    .option("checkpointLocation", "/checkpoints/late") \
    .start()

Stateful Operations and Memory Management

Watermarks directly impact state store size. Without watermarks, PySpark retains state indefinitely, causing memory issues. With watermarks, state for old windows gets purged.

from pyspark.sql.functions import approx_count_distinct

# Without watermark - state grows unbounded
unbounded_state = stream_df \
    .groupBy(window(col("event_time"), "1 hour")) \
    .agg(approx_count_distinct("sensor_id").alias("unique_sensors"))
# Memory grows continuously - dangerous in production

# With watermark - state cleaned after 2 hours
bounded_state = stream_df \
    .withWatermark("event_time", "2 hours") \
    .groupBy(window(col("event_time"), "1 hour")) \
    .agg(approx_count_distinct("sensor_id").alias("unique_sensors"))
# State for windows older than (max_event_time - 2 hours) gets purged

# Monitor state store metrics
bounded_query = bounded_state.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/checkpoints/bounded") \
    .start()

# Access metrics programmatically
last_progress = bounded_query.lastProgress
if last_progress:
    state_operators = last_progress.get('stateOperators', [])
    for operator in state_operators:
        print(f"Num rows in state: {operator.get('numRowsTotal', 0)}")
        print(f"Memory used: {operator.get('memoryUsedBytes', 0)} bytes")

Stream-Stream Joins with Watermarks

Joining two streams requires watermarks on both sides to bound the state and define matching time ranges.

# First stream: user clicks
clicks_schema = StructType([
    StructField("user_id", StringType()),
    StructField("click_time", TimestampType()),
    StructField("page_id", StringType())
])

clicks = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clicks") \
    .load() \
    .select(from_json(col("value").cast("string"), clicks_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("click_time", "20 minutes")

# Second stream: user purchases
purchases_schema = StructType([
    StructField("user_id", StringType()),
    StructField("purchase_time", TimestampType()),
    StructField("amount", DoubleType())
])

purchases = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "purchases") \
    .load() \
    .select(from_json(col("value").cast("string"), purchases_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("purchase_time", "20 minutes")

# Join with time constraint
joined = clicks.join(
    purchases,
    expr("""
        clicks.user_id = purchases.user_id AND
        purchase_time >= click_time AND
        purchase_time <= click_time + INTERVAL 30 MINUTES
    """)
)

join_query = joined.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/checkpoints/join") \
    .start()

Both watermarks must be defined for stream-stream joins. PySpark uses the minimum watermark from both streams to determine when to drop state for unmatched records.

Choosing the Right Watermark Delay

Set watermark delay based on empirical latency measurements from your data source. Monitor dropped events and adjust accordingly.

# Add monitoring for dropped events
monitored_stream = stream_df \
    .withColumn("arrival_time", current_timestamp()) \
    .withColumn("latency_seconds", 
        unix_timestamp("arrival_time") - unix_timestamp("event_time"))

# Calculate latency percentiles in a separate aggregation
latency_stats = monitored_stream \
    .groupBy(window(col("arrival_time"), "5 minutes")) \
    .agg(
        expr("percentile_approx(latency_seconds, 0.95)").alias("p95_latency"),
        expr("percentile_approx(latency_seconds, 0.99)").alias("p99_latency"),
        avg("latency_seconds").alias("avg_latency")
    )

latency_query = latency_stats.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoints/latency") \
    .start()

Use p95 or p99 latency as your watermark delay baseline, then add a buffer. For example, if p99 latency is 8 minutes, set watermark to 10-12 minutes. Balance data completeness against memory usage and result freshness based on business requirements.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.