Spark Streaming - Stream-Stream Joins

Key Insights

  • Stream-stream joins in Spark Structured Streaming require watermarking on both streams to manage state and prevent unbounded memory growth, with the join type (inner, outer, or semi) determining how matched and unmatched records are emitted.
  • Time constraints and watermarks together define how long Spark retains state for unmatched records: each stream's watermark is computed as its maximum observed event time minus the configured delay, making proper watermark configuration critical for join correctness.
  • Stateful operations like stream-stream joins checkpoint progress to fault-tolerant storage, requiring careful consideration of state store size, cleanup policies, and query restart behavior in production deployments.

Understanding Stream-Stream Join Mechanics

Stream-stream joins combine records from two independent data streams based on matching keys and time windows. Unlike stream-static joins, both sides continuously receive new data, requiring Spark to buffer records from each stream until potential matches arrive.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("StreamStreamJoin")
  .master("local[*]")
  .config("spark.sql.streaming.stateStore.providerClass", 
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
  .getOrCreate()

import spark.implicits._

// Define schemas for impression and click events
case class Impression(adId: String, userId: String, timestamp: Long)
case class Click(adId: String, userId: String, timestamp: Long)

import org.apache.spark.sql.Encoders
val impressionSchema = Encoders.product[Impression].schema
val clickSchema = Encoders.product[Click].schema

// Create streaming DataFrames
val impressions = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "impressions")
  .load()
  .select(from_json($"value".cast("string"), impressionSchema).as("data"))
  .select("data.*")
  .withColumn("impressionTime", $"timestamp".cast("timestamp"))
  .withWatermark("impressionTime", "10 minutes")

val clicks = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clicks")
  .load()
  .select(from_json($"value".cast("string"), clickSchema).as("data"))
  .select("data.*")
  .withColumn("clickTime", $"timestamp".cast("timestamp"))
  .withWatermark("clickTime", "5 minutes")

Watermark Configuration and State Management

Watermarks determine when Spark can safely discard buffered state. Each stream maintains its own watermark, calculated as the maximum event time seen minus the configured delay. Incoming records with event times older than the watermark are dropped, and buffered rows that can no longer match are evicted.
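The rule above can be sketched in plain Scala, with no Spark required; `watermarkFor` here is a hypothetical helper for illustration, not a Spark API:

```scala
import java.time.{Duration, Instant}

// Hypothetical helper mirroring the watermark rule:
// watermark = max event time observed so far minus the configured delay
def watermarkFor(maxEventTime: Instant, delay: Duration): Instant =
  maxEventTime.minus(delay)

val maxSeen   = Instant.parse("2024-01-01T12:40:00Z")
val watermark = watermarkFor(maxSeen, Duration.ofMinutes(10))
// With a 10-minute delay, the watermark trails the fastest event
// seen so far by exactly 10 minutes: here 12:30:00Z, so a record
// stamped 12:25:00Z would be considered too late and dropped
```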

// Inner join with time constraints; aliases are needed because
// both streams share the adId and userId column names
val adAttributions = impressions.as("imp").join(
  clicks.as("clk"),
  expr("""
    imp.adId = clk.adId AND
    imp.userId = clk.userId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 30 minutes
  """)
)

// Effective state retention:
// impressions are buffered for up to 35 minutes
// (30-minute join window + 5-minute click watermark)
// to allow for late-arriving clicks

val query = adAttributions
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

The watermark interaction follows specific rules:

// Calculating state retention windows
// For impression stream: keeps records for (30 min + 5 min watermark) = 35 min
// For click stream: keeps records for (0 min + 10 min watermark) = 10 min

// Example with explicit time bounds
val joinCondition = expr("""
  impressions.adId = clicks.adId AND
  impressions.userId = clicks.userId AND
  clicks.clickTime BETWEEN 
    impressions.impressionTime AND 
    impressions.impressionTime + INTERVAL 1 HOUR
""")

val strictAttributions = impressions
  .as("impressions")
  .join(
    clicks.as("clicks"),
    joinCondition,
    "inner"
  )
  .select(
    $"impressions.adId",
    $"impressions.userId",
    $"impressions.impressionTime",
    $"clicks.clickTime",
    ($"clicks.clickTime".cast("long") - 
     $"impressions.impressionTime".cast("long")).as("timeToClick")
  )

Join Types and Output Modes

Different join types produce different semantics for matched and unmatched records. Output mode restrictions apply based on the join type.

// Inner join - only matched records (append mode)
val innerJoin = impressions.as("imp").join(clicks.as("clk"),
  expr("imp.adId = clk.adId AND clickTime >= impressionTime AND clickTime <= impressionTime + interval 15 minutes"),
  "inner"
)

// Left outer join - all impressions, matched clicks (append mode with watermarks)
val leftOuterJoin = impressions.as("imp").join(clicks.as("clk"),
  expr("imp.adId = clk.adId AND clickTime >= impressionTime AND clickTime <= impressionTime + interval 15 minutes"),
  "left_outer"
)

// Right outer join - all clicks, matched impressions
val rightOuterJoin = impressions.as("imp").join(clicks.as("clk"),
  expr("imp.adId = clk.adId AND clickTime >= impressionTime AND clickTime <= impressionTime + interval 15 minutes"),
  "right_outer"
)

// Full outer join - all records from both streams (Spark 3.1+)
val fullOuterJoin = impressions.as("imp").join(clicks.as("clk"),
  expr("imp.adId = clk.adId AND clickTime >= impressionTime AND clickTime <= impressionTime + interval 15 minutes"),
  "full_outer"
)
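Spark 3.1 also adds a left semi join, which emits each impression that has at least one matching click, without any columns from the click side. A sketch reusing the streams defined earlier:

```scala
// Left semi join - impressions with at least one matching click,
// emitted once, with no columns from the click side (Spark 3.1+)
val leftSemiJoin = impressions.as("imp").join(
  clicks.as("clk"),
  expr("imp.adId = clk.adId AND clickTime >= impressionTime AND clickTime <= impressionTime + interval 15 minutes"),
  "left_semi"
)
```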

Output mode compatibility:

// Append mode - the only output mode supported for stream-stream joins
innerJoin.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/output/inner")
  .option("checkpointLocation", "/checkpoint/inner")
  .start()

// Outer joins also run in append mode: unmatched rows are emitted
// (with nulls for the missing side) only once the watermark
// guarantees no future match can arrive
leftOuterJoin.writeStream
  .outputMode("append")
  .format("delta")
  .option("path", "/output/left")
  .option("checkpointLocation", "/checkpoint/left")
  .start()

// Update and complete modes are not supported for stream-stream joins;
// using them throws an AnalysisException

Production Considerations and Optimization

State store management directly impacts performance and reliability. Monitor state size and configure cleanup appropriately.

// Configure state store cleanup
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "60s")
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "2")

// Monitor state store metrics; the query must be named so it can be
// looked up in spark.streams.active
import org.apache.spark.sql.DataFrame

val monitoredQuery = adAttributions.writeStream
  .queryName("adAttributions")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    spark.streams.active
      .find(_.name == "adAttributions")
      .flatMap(q => Option(q.lastProgress))
      .foreach { progress =>
        println(s"Batch $batchId:")
        progress.stateOperators.headOption.foreach { op =>
          println(s"  State rows: ${op.numRowsTotal}")
          println(s"  State memory: ${op.memoryUsedBytes}")
        }
      }

    batchDF.write
      .format("delta")
      .mode("append")
      .save("/output/attributions")
  }
  .option("checkpointLocation", "/checkpoint/attributions")
  .start()

Handle late data and configure appropriate watermark delays:

// Aggressive watermarks for low-latency requirements
val lowLatencyJoin = impressions
  .withWatermark("impressionTime", "1 minute")
  .as("imp")
  .join(
    clicks.withWatermark("clickTime", "1 minute").as("clk"),
    expr("imp.adId = clk.adId AND clickTime BETWEEN impressionTime AND impressionTime + interval 5 minutes")
  )

// Conservative watermarks for high data completeness
val highCompletenessJoin = impressions
  .withWatermark("impressionTime", "30 minutes")
  .as("imp")
  .join(
    clicks.withWatermark("clickTime", "30 minutes").as("clk"),
    expr("imp.adId = clk.adId AND clickTime BETWEEN impressionTime AND impressionTime + interval 2 hours")
  )

Partition and optimize for large-scale deployments:

// Repartition on the join keys before the join to balance load.
// Keep the time bound inside the join condition (not a post-join
// filter) so Spark can derive state cleanup for both sides.
val optimizedJoin = impressions
  .repartition(200, $"adId", $"userId")
  .withWatermark("impressionTime", "15 minutes")
  .as("imp")
  .join(
    clicks
      .repartition(200, $"adId", $"userId")
      .withWatermark("clickTime", "15 minutes")
      .as("clk"),
    expr("""
      imp.adId = clk.adId AND
      imp.userId = clk.userId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 45 minutes
    """)
  )

// Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Derive a date column to partition the output by
optimizedJoin
  .withColumn("date", to_date($"impressionTime"))
  .writeStream
  .partitionBy("date")
  .format("delta")
  .option("checkpointLocation", "/checkpoint/optimized")
  .start("/output/optimized")

Debugging and Troubleshooting

Monitor query progress and identify bottlenecks:

// Access detailed metrics
query.lastProgress.stateOperators.foreach { op =>
  println(s"Operator: ${op.operatorName}")
  println(s"  Rows total: ${op.numRowsTotal}")
  println(s"  Rows updated: ${op.numRowsUpdated}")
  println(s"  Memory: ${op.memoryUsedBytes / 1024 / 1024} MB")
  println(s"  Custom metrics: ${op.customMetrics}")
}

// Check watermark progression
query.lastProgress.sources.foreach { source =>
  println(s"Source: ${source.description}")
  println(s"  Latest offset: ${source.endOffset}")
}
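The progress event also reports the current global watermark in its eventTime map, which is often the quickest way to confirm the watermark is actually advancing; this assumes the query has completed at least one batch:

```scala
// Current global watermark, as reported by the last progress event
Option(query.lastProgress).foreach { p =>
  println(s"Watermark: ${p.eventTime.get("watermark")}")
}
```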

// Inspect raw events from both streams side by side; aligning the
// event-time columns keeps the positional union from mixing them up
val debugEvents = impressions
  .select($"adId", $"userId", $"impressionTime".as("eventTime"))
  .withColumn("stream", lit("impression"))
  .union(
    clicks
      .select($"adId", $"userId", $"clickTime".as("eventTime"))
      .withColumn("stream", lit("click"))
  )
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

Stream-stream joins enable real-time correlation of events across multiple data sources. Success requires understanding watermark semantics, state management implications, and careful tuning of time constraints based on business requirements and infrastructure capabilities.
