PySpark - Streaming Window Operations
Key Insights
- Window operations in PySpark Structured Streaming enable time-based aggregations over continuous data streams using tumbling, sliding, and session windows with configurable watermarks for late data handling.
- Tumbling windows partition data into fixed, non-overlapping intervals while sliding windows create overlapping intervals, allowing the same event to contribute to multiple aggregation windows simultaneously.
- Session windows dynamically group events based on activity gaps rather than fixed time intervals, making them ideal for user behavior analysis where interaction patterns determine window boundaries.
Understanding Streaming Windows
Streaming window operations partition unbounded data streams into finite chunks for aggregation. Unlike batch processing where you operate on complete datasets, streaming windows define temporal boundaries over continuously arriving data.
PySpark Structured Streaming provides three window types: tumbling (fixed, non-overlapping), sliding (overlapping), and session (gap-based). Each serves distinct use cases in real-time analytics.
from pyspark.sql import SparkSession
# Note: sum/max/min here shadow the Python builtins with their Spark column equivalents
from pyspark.sql.functions import window, col, avg, count, sum, max, min, from_json
spark = SparkSession.builder \
.appName("StreamingWindows") \
.config("spark.sql.streaming.schemaInference", "true") \
.getOrCreate()
# Sample streaming source - Kafka topic with IoT sensor data
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sensor-data") \
.load()
# Parse JSON payload
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
schema = StructType([
StructField("sensor_id", StringType()),
StructField("temperature", DoubleType()),
StructField("timestamp", TimestampType())
])
parsed_df = df.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), schema).alias("data")) \
.select("data.*")
Tumbling Windows
Tumbling windows divide the timeline into fixed-duration, non-overlapping segments. A 5-minute tumbling window creates discrete intervals: 00:00-00:05, 00:05-00:10, 00:10-00:15, with each event belonging to exactly one window.
# 10-minute tumbling window aggregation
tumbling_agg = parsed_df \
.groupBy(
window(col("timestamp"), "10 minutes"),
col("sensor_id")
) \
.agg(
avg("temperature").alias("avg_temp"),
count("*").alias("reading_count"),
max("temperature").alias("max_temp"),
min("temperature").alias("min_temp")
)
# Write results to console
query = tumbling_agg.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", "false") \
.start()
The window function creates a struct column containing start and end timestamps. Access these fields for custom processing:
tumbling_with_bounds = tumbling_agg \
.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("sensor_id"),
col("avg_temp"),
col("reading_count")
)
Sliding Windows
Sliding windows overlap, allowing events to contribute to multiple windows. Define both window duration and slide interval. A 10-minute window sliding every 5 minutes creates windows: 00:00-00:10, 00:05-00:15, 00:10-00:20.
# 10-minute window sliding every 5 minutes
sliding_agg = parsed_df \
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("sensor_id")
) \
.agg(
avg("temperature").alias("avg_temp"),
count("*").alias("reading_count")
)
# Detect temperature spikes using sliding window trends.
# lag()/over() is not supported directly on a streaming DataFrame, so apply it per
# micro-batch via sliding_agg.writeStream.foreachBatch(detect_spikes), where each batch is a static DataFrame.
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
windowSpec = Window.partitionBy("sensor_id").orderBy("window.start")
def detect_spikes(batch_df, batch_id):
    spikes = batch_df \
        .withColumn("prev_avg", lag("avg_temp").over(windowSpec)) \
        .withColumn("temp_change", col("avg_temp") - col("prev_avg")) \
        .filter(col("temp_change") > 5.0)  # Alert on 5+ degree increases
    spikes.show(truncate=False)
Sliding windows are computationally expensive since events participate in multiple aggregations. A 1-hour window sliding every 1 minute generates 60x more windows than a tumbling window.
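To make the overlap factor concrete, the back-of-the-envelope sketch below (plain Python, illustrative values only) enumerates the epoch-aligned window start times that contain a single event; each event lands in duration / slide windows.
from datetime import datetime, timedelta
window_duration = timedelta(hours=1)
slide = timedelta(minutes=1)
overlap_factor = window_duration // slide  # 60: each event contributes to 60 windows
# Spark aligns window boundaries to the Unix epoch (plus an optional startTime offset)
epoch = datetime(1970, 1, 1)
event_time = datetime(2024, 1, 1, 12, 30, 17)
latest_start = epoch + ((event_time - epoch) // slide) * slide
starts = [latest_start - i * slide for i in range(overlap_factor)
          if latest_start - i * slide + window_duration > event_time]
print(len(starts))  # 60 distinct sliding windows contain this one event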
Session Windows
Session windows group events separated by inactivity gaps. Instead of fixed time boundaries, windows close after a specified gap duration without events. Perfect for user session analysis.
# User activity session tracking
user_events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-clicks") \
.load()
event_schema = StructType([
StructField("user_id", StringType()),
StructField("event_type", StringType()),
StructField("page_url", StringType()),
StructField("timestamp", TimestampType())
])
parsed_events = user_events \
.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), event_schema).alias("data")) \
.select("data.*")
# 30-minute session gap (session_window requires Spark 3.2+).
# A watermark is needed in streaming so completed sessions can be finalized and emitted.
from pyspark.sql.functions import session_window, collect_list
session_agg = parsed_events \
.withWatermark("timestamp", "30 minutes") \
.groupBy(
col("user_id"),
session_window(col("timestamp"), "30 minutes")
) \
.agg(
count("*").alias("events_in_session"),
collect_list("page_url").alias("pages_visited"),
min("timestamp").alias("session_start"),
max("timestamp").alias("session_end")
)
Session windows dynamically adapt to user behavior. Active users generate longer sessions; inactive users create multiple short sessions.
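The gap itself need not be a single constant. Since Spark 3.2, session_window also accepts a per-row gap expression, so different event types can close sessions at different speeds. A minimal sketch, assuming the parsed_events stream above; the "checkout" event type and the gap thresholds are illustrative:
from pyspark.sql.functions import session_window, when, count
dynamic_sessions = parsed_events \
.withWatermark("timestamp", "30 minutes") \
.groupBy(
col("user_id"),
# Checkout flows close after 5 idle minutes, ordinary browsing after 30 (assumed thresholds)
session_window(col("timestamp"),
when(col("event_type") == "checkout", "5 minutes").otherwise("30 minutes"))
) \
.agg(count("*").alias("events_in_session"))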
Watermarking for Late Data
Streaming systems must handle late-arriving data. Watermarks define how long to wait for delayed events before finalizing window results.
# Configure 10-minute watermark
watermarked_df = parsed_df \
.withWatermark("timestamp", "10 minutes")
# Tumbling window with watermark
late_data_handling = watermarked_df \
.groupBy(
window(col("timestamp"), "5 minutes"),
col("sensor_id")
) \
.agg(avg("temperature").alias("avg_temp"))
query = late_data_handling.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/windowed_results") \
.option("checkpointLocation", "/data/checkpoints") \
.start()
Watermark behavior:
- Events arriving within the watermark window update existing aggregations
- Events arriving later than the watermark threshold may be dropped and excluded from results
- append output mode emits only finalized windows (those the watermark has passed), while update mode continuously updates all active windows
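A quick worked example of the arithmetic (plain Python, illustrative values): the watermark is the maximum event time seen so far minus the configured delay, and a window is finalized once the watermark passes its end.
from datetime import datetime, timedelta
max_event_time = datetime(2024, 1, 1, 12, 20)  # latest event time observed so far (assumed)
watermark_delay = timedelta(minutes=10)        # matches withWatermark("timestamp", "10 minutes")
watermark = max_event_time - watermark_delay   # 12:10
window_end = datetime(2024, 1, 1, 12, 5)       # end of the 12:00-12:05 window
print(window_end <= watermark)  # True: the window is finalized and emitted in append mode
late_event = datetime(2024, 1, 1, 12, 4)
print(late_event < watermark)   # True: an event this old may be dropped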
# Monitor late data metrics
from pyspark.sql.functions import current_timestamp, expr
with_latency = parsed_df \
.withColumn("processing_time", current_timestamp()) \
.withColumn("latency_seconds",
expr("unix_timestamp(processing_time) - unix_timestamp(timestamp)"))
latency_stats = with_latency \
.groupBy(window(col("timestamp"), "1 minute")) \
.agg(
avg("latency_seconds").alias("avg_latency"),
max("latency_seconds").alias("max_latency"),
# count() over a boolean would tally every non-null row; sum the condition instead
sum(expr("IF(latency_seconds > 600, 1, 0)")).alias("late_events_count")
)
Complex Window Patterns
Combine multiple window operations for sophisticated analytics:
# Multi-level aggregation: 1-minute windows rolled up to 1-hour
minute_agg = parsed_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window(col("timestamp"), "1 minute"),
col("sensor_id")
) \
.agg(avg("temperature").alias("minute_avg"))
# Chained streaming aggregations require Spark 3.4+; window_time() exposes the
# event time of the 1-minute window so it can be re-windowed into 1-hour buckets
from pyspark.sql.functions import window_time
hour_agg = minute_agg \
.groupBy(
window(window_time(col("window")), "1 hour"),
col("sensor_id")
) \
.agg(
avg("minute_avg").alias("hour_avg"),
max("minute_avg").alias("hour_max"),
min("minute_avg").alias("hour_min")
)
Join streaming windows with static reference data:
# Load sensor metadata
sensor_metadata = spark.read \
.format("parquet") \
.load("/data/sensor_metadata")
enriched_agg = tumbling_agg \
.join(sensor_metadata, "sensor_id") \
.select(
col("window.start"),
col("sensor_id"),
col("location"),
col("sensor_type"),
col("avg_temp")
) \
.filter(col("avg_temp") > col("threshold_temp"))
Performance Optimization
Window operations consume memory proportional to active window count and cardinality. Optimize with these techniques:
# Repartition by the grouping key so each sensor's records and window state are co-located
optimized_stream = parsed_df \
.repartition(col("sensor_id")) \
.groupBy(
window(col("timestamp"), "5 minutes"),
col("sensor_id")
) \
.agg(avg("temperature").alias("avg_temp"))
# Keep large window state off the JVM heap with the RocksDB state store (Spark 3.2+)
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
# Optionally downgrade the multi-stateful-operator correctness check from an error to a warning
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
# Limit state growth with watermarks
conservative_watermark = parsed_df \
.withWatermark("timestamp", "2 minutes") \
.groupBy(window(col("timestamp"), "1 minute")) \
.agg(count("*").alias("event_count"))
Monitor streaming query metrics:
query = tumbling_agg.writeStream \
.outputMode("update") \
.format("memory") \
.queryName("sensor_windows") \
.start()
# Access metrics
query.lastProgress # Processing rates, latencies
query.status # Query state
spark.streams.active # All active queries
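lastProgress returns one dictionary per completed micro-batch. A minimal polling sketch using fields from the standard progress report (throughput plus the state row counts kept by the window aggregation):
import time
while query.isActive:
    progress = query.lastProgress  # None until the first micro-batch completes
    if progress:
        print(progress["batchId"],
              progress["numInputRows"],                # rows ingested in this batch
              progress.get("processedRowsPerSecond"),  # throughput
              [op.get("numRowsTotal") for op in progress.get("stateOperators", [])])  # window state size
    time.sleep(10)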
Window operations transform unbounded streams into analyzable time-series data. Choose tumbling windows for discrete reporting periods, sliding windows for trend detection, and session windows for behavior analysis. Always configure watermarks to balance late data handling with state management overhead.