Spark Streaming - Window Operations
Key Insights
- Window operations in Spark Streaming enable time-based aggregations over sliding intervals, essential for real-time analytics like rolling averages, trend detection, and session analysis
- Tumbling windows (non-overlapping) and sliding windows (overlapping) serve different use cases: tumbling for discrete time buckets, sliding for continuous monitoring with configurable overlap
- Proper watermark configuration is critical for handling late-arriving data in structured streaming, preventing unbounded state growth while maintaining result accuracy
Understanding Window Operations
Window operations partition streaming data into finite chunks based on time intervals. Unlike batch processing where you work with complete datasets, streaming windows let you perform aggregations over continuously arriving data within specific time boundaries.
Spark Streaming provides two APIs: DStreams (legacy) and Structured Streaming (recommended). This article focuses on Structured Streaming, which offers better performance, exactly-once semantics, and a more intuitive DataFrame API.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, count

spark = SparkSession.builder \
    .appName("WindowOperations") \
    .getOrCreate()

# Read streaming data from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse JSON data
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType, TimestampType, DoubleType

schema = StructType() \
    .add("user_id", StringType()) \
    .add("event_time", TimestampType()) \
    .add("value", DoubleType())

parsed_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
Tumbling Windows
Tumbling windows divide the timeline into non-overlapping, fixed-duration intervals. Each event belongs to exactly one window. Use tumbling windows for discrete time-bucket aggregations like hourly sales totals or daily active users.
# 10-minute tumbling window
tumbling_aggregates = parsed_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "10 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("event_count"),
        avg("value").alias("avg_value")
    )

query = tumbling_aggregates.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()
The output shows discrete 10-minute buckets:
+------------------------------------------+-------+-----------+---------+
|window |user_id|event_count|avg_value|
+------------------------------------------+-------+-----------+---------+
|{2024-01-15 10:00:00, 2024-01-15 10:10:00}|user_1 |45 |23.4 |
|{2024-01-15 10:10:00, 2024-01-15 10:20:00}|user_1 |52 |25.1 |
+------------------------------------------+-------+-----------+---------+
Sliding Windows
Sliding windows are defined by a window duration and a slide interval. When the slide interval is shorter than the window duration, windows overlap and each event can belong to multiple windows, which is what makes them useful for moving averages and trend detection.
# 10-minute window sliding every 5 minutes
sliding_aggregates = parsed_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "10 minutes", "5 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("event_count"),
        avg("value").alias("moving_avg")
    )
Each event appears in multiple windows:
+------------------------------------------+-------+-----------+----------+
|window |user_id|event_count|moving_avg|
+------------------------------------------+-------+-----------+----------+
|{2024-01-15 10:00:00, 2024-01-15 10:10:00}|user_1 |45 |23.4 |
|{2024-01-15 10:05:00, 2024-01-15 10:15:00}|user_1 |63 |24.2 |
|{2024-01-15 10:10:00, 2024-01-15 10:20:00}|user_1 |52 |25.1 |
+------------------------------------------+-------+-----------+----------+
Watermarks and Late Data
Watermarks define how long Spark waits for late-arriving data. Without watermarks, state grows unbounded as Spark must maintain all historical windows. Watermarks enable state cleanup while controlling result accuracy trade-offs.
from pyspark.sql.functions import current_timestamp

# Watermark of 15 minutes
windowed_counts = parsed_df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(
        window(col("event_time"), "10 minutes")
    ) \
    .count()
Watermark logic: if the maximum event timestamp seen so far is max_event_time, the watermark is max_event_time - watermark_delay. Events arriving with an event_time earlier than the watermark are considered too late and may be dropped, and state for windows ending before the watermark can be cleaned up.
Example scenario:
- Current max event_time: 10:25:00
- Watermark: 15 minutes
- Threshold: 10:10:00
- Events arriving with event_time earlier than 10:10:00 are considered too late and may be dropped
# Monitoring late data
from pyspark.sql.functions import max  # Spark's max, not Python's builtin

with_metadata = parsed_df \
    .withColumn("processing_time", current_timestamp()) \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(window(col("event_time"), "10 minutes")) \
    .agg(
        count("*").alias("event_count"),
        max("processing_time").alias("last_processed")
    )
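Each progress update also reports the current event-time watermark, which is a quick way to confirm the configured delay is being applied. A minimal sketch (the console sink here is only for illustration):
# Start the monitoring query and read the watermark from its progress
query = with_metadata.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

progress = query.lastProgress  # None until at least one micro-batch completes
if progress:
    print(progress.get("eventTime", {}).get("watermark"))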
Session Windows
Session windows group events into activity sessions separated by a configurable gap duration. Unlike fixed windows, session windows have dynamic lengths determined by the event pattern itself, which makes them useful for user session analysis and anomaly detection.
from pyspark.sql.functions import session_window, min, max

# Session window with 30-minute gap timeout
session_aggregates = parsed_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        col("user_id"),
        session_window(col("event_time"), "30 minutes")
    ) \
    .agg(
        count("*").alias("events_in_session"),
        min("event_time").alias("session_start"),
        max("event_time").alias("session_end")
    )
If events arrive within 30 minutes of each other, they belong to the same session. A gap exceeding 30 minutes starts a new session.
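Since Spark 3.2 the gap duration can also be a column expression, so different kinds of events can use different timeouts. A sketch, assuming a hypothetical event_type field that is not part of the schema above (added here with a placeholder literal):
from pyspark.sql.functions import lit, when, session_window

# Hypothetical event_type column; a real stream would carry this in its payload
typed_df = parsed_df.withColumn("event_type", lit("click"))

# Gap duration chosen per row: shorter timeout for clicks, longer otherwise
dynamic_sessions = typed_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        col("user_id"),
        session_window(
            col("event_time"),
            when(col("event_type") == "click", "5 minutes").otherwise("30 minutes")
        )
    ) \
    .count()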
Multiple Aggregations on Windows
Combine multiple aggregations for comprehensive analytics:
from pyspark.sql.functions import sum, min, max, stddev

complex_aggregates = parsed_df \
    .withWatermark("event_time", "20 minutes") \
    .groupBy(
        window(col("event_time"), "15 minutes", "5 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("total_events"),
        sum("value").alias("total_value"),
        avg("value").alias("avg_value"),
        min("value").alias("min_value"),
        max("value").alias("max_value"),
        stddev("value").alias("stddev_value")
    )

# Write to Delta Lake for downstream analysis
query = complex_aggregates.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start("/tmp/windowed_aggregates")
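Downstream jobs can then read those aggregates back as an ordinary Delta table once batches have been committed. A minimal sketch using the same path as above:
# Batch read of the windowed aggregates written by the streaming query
historical = spark.read.format("delta").load("/tmp/windowed_aggregates")
historical.orderBy("window", "user_id").show(truncate=False)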
Output Modes
Output mode determines what gets written to the sink:
Append: Only finalized rows (windows the watermark has already passed, so they will never change)
.outputMode("append") # Use with watermarks
Update: New and updated rows
.outputMode("update") # Updates existing aggregates
Complete: The entire result table (only practical while aggregate state stays small)
.outputMode("complete") # All aggregates every trigger
For windowed aggregations with watermarks, use append mode to write only finalized windows:
finalized_windows = parsed_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window(col("event_time"), "5 minutes")) \
    .count()

query = finalized_windows.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/output/windows") \
    .option("checkpointLocation", "/checkpoint/windows") \
    .start()
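When this runs as a standalone script rather than in a notebook, block the driver so the query keeps running:
# Keep the application alive until the query stops or fails
query.awaitTermination()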
Performance Optimization
State management impacts performance. Minimize state size and enable state cleanup:
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
               "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# Partition by key for better state distribution
optimized = parsed_df \
    .repartition(col("user_id")) \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("user_id")
    ) \
    .count()
Monitor state metrics:
query = optimized.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="30 seconds") \
    .start()

# Check progress
query.lastProgress  # Contains stateOperators metrics
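The stateOperators entries in that progress dictionary report state row counts and memory usage, which is the first place to look when state grows unexpectedly. A small sketch (field names follow the StreamingQueryProgress JSON; availability varies by Spark version):
# Print a few state-store metrics from the most recent progress update
progress = query.lastProgress
if progress:
    for op in progress.get("stateOperators", []):
        print(op.get("numRowsTotal"), op.get("numRowsUpdated"), op.get("memoryUsedBytes"))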
Window operations transform real-time data into actionable insights. Choose tumbling windows for discrete buckets, sliding windows for continuous monitoring, and session windows for activity-based grouping. Always configure watermarks to balance latency against completeness, and monitor state size to maintain performance at scale.