Spark Structured Streaming - Architecture Guide

Key Insights

  • Structured Streaming defaults to micro-batch processing with configurable triggers, modeling a stream as an unbounded table to which new rows are continuously appended; this enables SQL-like operations on streaming data with end-to-end exactly-once semantics when sources are replayable and sinks are idempotent
  • The architecture separates logical planning (query optimization) from physical execution through Catalyst optimizer and Tungsten engine, allowing the same DataFrame API code to work on both batch and streaming data
  • State management through checkpointing and write-ahead logs ensures fault tolerance, while watermarking handles late-arriving data and prevents unbounded state growth in aggregation operations

Core Architecture Components

Structured Streaming builds on Spark SQL’s engine, treating streaming data as an unbounded input table. Each micro-batch incrementally processes new rows, updating result tables that can be written to sinks using different output modes.

The architecture consists of four primary layers:

Source Layer: Reads data from streaming sources (Kafka, file systems, sockets). Sources must implement the Source or MicroBatchStream interface to provide offset management and batch retrieval.

Execution Layer: The Catalyst optimizer creates logical and physical plans. The IncrementalExecution class extends Spark’s QueryExecution to handle stateful operations and watermarking.

State Management Layer: Manages operator state across micro-batches using HDFS-compatible storage. State stores maintain versioned key-value data for operations like aggregations and joins.

Sink Layer: Writes results to external systems with exactly-once or at-least-once guarantees depending on sink capabilities.

Micro-Batch Processing Model

Structured Streaming defaults to micro-batch execution with configurable trigger intervals. Each micro-batch is a Spark job that processes a range of offsets.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val streamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

val query = streamingDF
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()

The trigger determines when the engine checks for new data. Trigger.ProcessingTime starts a new micro-batch at fixed intervals. Trigger.Once processes all available data in a single batch and then stops, which suits scheduled jobs; Spark 3.3 adds Trigger.AvailableNow, which does the same across multiple batches. Trigger.Continuous enables experimental continuous processing with millisecond latencies for a restricted set of stateless operations.
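As a quick sketch of the other trigger modes (assuming the streamingDF defined above; checkpoint paths are illustrative):

```scala
import org.apache.spark.sql.streaming.Trigger

// Process everything currently available in one batch, then stop --
// useful for cron-scheduled jobs that reuse streaming checkpoints.
val onceQuery = streamingDF
  .writeStream
  .format("console")
  .option("checkpointLocation", "/checkpoints/once-demo")
  .trigger(Trigger.Once())
  .start()

// Experimental continuous mode: epochs checkpointed every second;
// supported only for simple map-like queries and a few sinks.
val continuousQuery = streamingDF
  .writeStream
  .format("console")
  .option("checkpointLocation", "/checkpoints/continuous-demo")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```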

State Store Architecture

Stateful operations require maintaining data across micro-batches. Structured Streaming uses a state store abstraction backed by versioned key-value storage.

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._
import spark.implicits._

// withWatermark requires an event-time column, so timestamp is a Timestamp, not a Long
case class Event(userId: String, action: String, timestamp: java.sql.Timestamp)

val schema = Encoders.product[Event].schema

val events = streamingDF
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", schema).as("data"))
  .select($"data.*")
  .as[Event]

val sessionCounts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "5 minutes", "5 minutes"),
    $"userId"
  )
  .count()
  .writeStream
  .format("parquet")
  .option("path", "/output/sessions")
  .option("checkpointLocation", "/checkpoints/sessions")
  .outputMode("append")
  .start()

The state store maintains aggregation state partitioned by grouping keys. Each executor manages state for its partitions. During checkpointing, state snapshots are written to reliable storage. On failure, the engine reconstructs state from the last checkpoint.

State store implementations include:

  • HDFSBackedStateStoreProvider: The default, keeping state in executor memory with versioned snapshots on an HDFS-compatible file system
  • RocksDBStateStoreProvider (Spark 3.2+): Keeps state in RocksDB for better performance with large state (enabled via spark.sql.streaming.stateStore.providerClass)
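Switching providers is a session-level configuration set before a query's first start; a minimal sketch (the provider class name is from Spark 3.2+):

```scala
// Select RocksDB-backed state stores for streaming queries in this session.
// Set this before the first run; a query's existing checkpoint keeps the
// provider it was created with.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
```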

Watermarking and Late Data Handling

Watermarks track event time progress, enabling the engine to bound state size and handle late-arriving data predictably.

val lateEvents = events
  .withWatermark("timestamp", "1 hour")
  .groupBy(
    window($"timestamp", "15 minutes"),
    $"userId"
  )
  .agg(
    count("*").as("eventCount"),
    collect_list("action").as("actions")
  )

lateEvents
  .writeStream
  .outputMode("update")
  .format("delta")
  .option("checkpointLocation", "/checkpoints/late-events")
  .start("/output/late-events")

The watermark is computed as max(event_time seen so far) - threshold. Events with timestamps below the watermark fall outside the lateness bound and may be dropped; the engine guarantees processing only for data within the threshold. Once the watermark passes a window's end time, the engine can purge that window's state.
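The arithmetic can be illustrated with plain timestamps (the example values below are assumed, not from the source):

```scala
import java.time.{Duration, Instant}

// Corresponds to withWatermark("timestamp", "1 hour")
val threshold    = Duration.ofHours(1)
val maxEventTime = Instant.parse("2024-01-01T12:30:00Z") // latest event time seen so far
val watermark    = maxEventTime.minus(threshold)         // 2024-01-01T11:30:00Z

// An event stamped before the watermark falls outside the lateness bound
val lateEvent = Instant.parse("2024-01-01T11:15:00Z")
val tooLate   = lateEvent.isBefore(watermark)            // true
```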

Watermarking behavior:

  • Append mode: Results are emitted only after the watermark passes the window end time
  • Update mode: Results are updated until the watermark passes, then finalized
  • Complete mode: Watermarking doesn’t apply; all state is maintained

Checkpointing and Fault Tolerance

Checkpointing ensures exactly-once processing semantics by storing metadata about processed offsets and operator state.

val checkpointConfig = streamingDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-events")
  .option("checkpointLocation", "/checkpoints/kafka-to-kafka")
  .queryName("event-processor")
  .start()

The checkpoint directory contains:

  • offsets/: Committed offset ranges for each batch
  • commits/: Marker files indicating successful batch completion
  • state/: State store data for stateful operations
  • metadata: The streaming query's unique identifier, written once when the query first starts

Recovery process:

  1. Read the latest committed batch from checkpoint
  2. Restore state store to the committed version
  3. Replay uncommitted batches if any
  4. Resume processing from the last committed offset
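Because a failed batch may be replayed during recovery, end-to-end exactly-once also needs the sink side to tolerate replays. One common pattern, sketched here with assumed paths, keys writes by batchId inside foreachBatch:

```scala
import org.apache.spark.sql.DataFrame

val replaySafe = streamingDF
  .writeStream
  .option("checkpointLocation", "/checkpoints/foreach-demo") // illustrative path
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // On recovery the same batchId can be delivered again; overwriting the
    // batch-specific location makes a replay a no-op rather than a duplicate.
    batch.write
      .mode("overwrite")
      .parquet(s"/output/foreach-demo/batch=$batchId")
  }
  .start()
```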

Stream-Stream Joins

Joining two streams requires buffering data from both sides within time bounds defined by watermarks.

val impressions = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "impressions")
  .load()
  .select(from_json($"value".cast("string"), impressionSchema).as("data"))
  .select($"data.*")
  .withWatermark("impressionTime", "2 hours")

val clicks = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clicks")
  .load()
  .select(from_json($"value".cast("string"), clickSchema).as("data"))
  .select($"data.*")
  .withWatermark("clickTime", "3 hours")

val joined = impressions.join(
  clicks,
  expr("""
    impressionId = clickImpressionId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  "leftOuter"
)

joined
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoints/attribution")
  .start("/output/attribution")

The engine buffers rows from both streams as state. The watermarks and the time constraint in the join condition let it prune state once a row can no longer match. For inner joins these bounds are optional, though without them state grows without bound; for outer joins like this one they are mandatory, because the engine must know when to emit a null result for an unmatched impression.

Output Modes and Sink Semantics

Output modes determine what gets written to the sink after each trigger:

// resultDF below stands for any streaming DataFrame, e.g. an aggregation result

// Append: only rows finalized since the last trigger (aggregations need a watermark)
resultDF.writeStream
  .outputMode("append")
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/append")
  .start("/output/append")

// Update: only rows changed since the last trigger
resultDF.writeStream
  .outputMode("update")
  .format("delta")
  .option("checkpointLocation", "/checkpoints/update")
  .start("/output/update")

// Complete: the entire result table on every trigger (aggregations only)
resultDF.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("complete_table")
  .start()

Sink guarantees vary by implementation:

  • File sinks (Parquet, Delta): Exactly-once through atomic file operations
  • Kafka sink: At-least-once (idempotent producer can achieve effectively-once)
  • Foreach/ForeachBatch sinks: Depends on implementation

Performance Optimization

Monitor streaming queries using the StreamingQueryListener API and Spark UI:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    println(s"Batch ${progress.batchId}: " +
      s"${progress.numInputRows} rows, " +
      s"${progress.processedRowsPerSecond} rows/sec")

    // durationMs is a java.util.Map, so guard against a missing key
    val triggerMs = Option(progress.durationMs.get("triggerExecution")).map(_.longValue)
    if (triggerMs.exists(_ > 60000L)) {
      // Alert: processing is taking longer than the trigger budget
    }
  }

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})

Key optimization strategies:

  • Partition tuning: spark.sql.shuffle.partitions also sets the number of state store partitions and is fixed at the query's first start; choose it before any checkpoint exists
  • State store tuning: Switch to the RocksDB provider when state no longer fits comfortably in executor JVM memory
  • Trigger intervals: Balance latency against throughput; longer intervals amortize per-batch overhead
  • Watermark tuning: Shorter thresholds bound state more tightly but drop more late data
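Trigger tuning interacts with how much the source hands to each batch; a sketch of bounding batch size on the Kafka source (the option value is illustrative):

```scala
// Cap how many Kafka offsets each micro-batch reads, so a backlog after
// downtime is drained in several bounded batches instead of one huge one.
val bounded = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "100000") // illustrative cap per batch
  .load()
```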

The architecture’s separation of logical and physical planning enables Spark to optimize streaming queries like batch queries while handling the additional complexity of incremental processing and state management.
