Spark Streaming - Exactly-Once Semantics

Key Insights

  • Exactly-once semantics in Spark Streaming requires coordinating idempotent writes, transactional outputs, and checkpoint management to prevent data duplication or loss during failures
  • Structured Streaming provides built-in exactly-once guarantees through write-ahead logs and offset tracking, while DStreams require manual implementation of idempotency patterns
  • Production implementations must handle reprocessing scenarios with idempotent sinks, transactional databases, or deduplication strategies based on unique message identifiers

Understanding Exactly-Once Guarantees

Exactly-once semantics ensures each record is processed once and only once, even during failures and restarts. This differs from at-least-once (potential duplicates) and at-most-once (potential data loss) delivery guarantees.

Spark Streaming achieves exactly-once processing through three mechanisms:

  1. Reliable sources with offset tracking (Kafka, Kinesis)
  2. Deterministic recomputation via RDD lineage
  3. Idempotent or transactional sinks that handle duplicate writes

The challenge lies in coordinating these components. Spark can reprocess data reliably, but the output operation must be designed to produce the same result when executed multiple times.
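To see why, consider a toy replay model in plain Scala (no Spark): the same micro-batch is delivered twice after a failure. A sink keyed on a unique id converges to the same state; a blind append does not.

```scala
// Toy replay model: the same micro-batch is delivered twice after a failure.
case class Record(id: String, amount: Double)

val batch = Seq(Record("tx1", 10.0), Record("tx2", 5.0))

// Non-idempotent sink: appends blindly, so a replay duplicates rows.
var appendSink = Vector.empty[Record]
def appendWrite(b: Seq[Record]): Unit = appendSink ++= b

// Idempotent sink: keyed by id, so a replay overwrites with identical values.
var keyedSink = Map.empty[String, Record]
def upsertWrite(b: Seq[Record]): Unit = b.foreach(r => keyedSink += (r.id -> r))

appendWrite(batch); appendWrite(batch)   // simulate failure + replay
upsertWrite(batch); upsertWrite(batch)

println(appendSink.size)  // 4: duplicated
println(keyedSink.size)   // 2: exactly-once effect
```

This is the essence of every pattern in this article: Spark guarantees the replay; the sink must guarantee convergence.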

Structured Streaming Exactly-Once Implementation

Structured Streaming provides exactly-once semantics by default when using supported sources and sinks. Here’s a Kafka-to-Delta Lake pipeline:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import io.delta.tables._

val spark = SparkSession.builder()
  .appName("ExactlyOnceStreaming")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "true")
  .load()

val transactions = kafkaStream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .selectExpr(
    "key as transaction_id",
    "get_json_object(value, '$.amount') as amount",
    "get_json_object(value, '$.user_id') as user_id",
    "get_json_object(value, '$.timestamp') as event_time"
  )

val query = transactions.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoint/transactions")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start("/tmp/delta/transactions")

query.awaitTermination()

The checkpoint location stores offset information and metadata. If the application crashes, Spark reads the checkpoint, determines the last committed offset, and resumes from that point. Delta Lake provides ACID transactions, ensuring atomic writes.
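For reference, a checkpoint directory for a query like this typically looks as follows (exact layout varies by Spark version; a state/ directory appears only for stateful queries):

```
/tmp/checkpoint/transactions/
├── metadata     # unique query id
├── offsets/     # per-batch offset ranges, written before each batch runs
├── commits/     # per-batch markers, written after the batch's output commits
└── sources/     # source-specific metadata (e.g. initial Kafka offsets)
```

A batch whose offsets file exists but whose commits file is missing is the one Spark re-runs on restart. Note also that once a checkpoint exists, it takes precedence over options like startingOffsets.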

Implementing Idempotent Sinks

When using non-transactional outputs like JDBC or REST APIs, implement idempotency using unique identifiers:

import org.apache.spark.sql.{ForeachWriter, Row}
import java.sql.{Connection, DriverManager, PreparedStatement}

class IdempotentJDBCWriter extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _
  
  def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/mydb",
      "user",
      "password"
    )
    
    // Use INSERT ON CONFLICT for idempotency
    statement = connection.prepareStatement(
      """
      INSERT INTO transactions (transaction_id, amount, user_id, event_time, processing_time)
      VALUES (?, ?, ?, ?, ?)
      ON CONFLICT (transaction_id) DO NOTHING
      """
    )
    true
  }
  
  def process(row: Row): Unit = {
    statement.setString(1, row.getAs[String]("transaction_id"))
    statement.setDouble(2, row.getAs[String]("amount").toDouble)
    statement.setString(3, row.getAs[String]("user_id"))
    statement.setTimestamp(4, java.sql.Timestamp.valueOf(row.getAs[String]("event_time")))
    statement.setTimestamp(5, new java.sql.Timestamp(System.currentTimeMillis()))
    statement.executeUpdate()
  }
  
  def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

val writeQuery = transactions.writeStream
  .foreach(new IdempotentJDBCWriter())
  .option("checkpointLocation", "/tmp/checkpoint/jdbc")
  .start()

The ON CONFLICT DO NOTHING clause makes writes idempotent. If Spark reprocesses a batch, duplicate transaction IDs are ignored.
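ON CONFLICT (transaction_id) resolves against a unique constraint, so the target table must have one. A matching PostgreSQL definition (column types inferred from the writer above) might be:

```sql
CREATE TABLE transactions (
    transaction_id   TEXT PRIMARY KEY,   -- conflict target for idempotent inserts
    amount           DOUBLE PRECISION,
    user_id          TEXT,
    event_time       TIMESTAMP,
    processing_time  TIMESTAMP
);
```

Without the primary key (or a unique index) on transaction_id, the ON CONFLICT clause would fail at runtime.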

Deduplication with Watermarking

For streams without natural unique identifiers, implement deduplication using watermarks:

import org.apache.spark.sql.functions._
import spark.implicits._  // enables the $"column" syntax used below

val deduplicated = kafkaStream
  .selectExpr("CAST(value AS STRING) as json_value")
  .select(
    get_json_object($"json_value", "$.event_id").as("event_id"),
    get_json_object($"json_value", "$.user_id").as("user_id"),
    get_json_object($"json_value", "$.amount").as("amount"),
    get_json_object($"json_value", "$.timestamp").cast("timestamp").as("event_time")
  )
  .withWatermark("event_time", "1 hour")
  .dropDuplicates("event_id", "event_time")

val aggregated = deduplicated
  .groupBy(
    window($"event_time", "5 minutes"),
    $"user_id"
  )
  .agg(
    sum($"amount").as("total_amount"),
    count("*").as("transaction_count")
  )

aggregated.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoint/dedup")
  .start("/tmp/delta/aggregated")

Watermarking lets Spark expire deduplication state for old events, preventing unbounded memory growth. The trade-off: with a 1 hour watermark, once the maximum observed event_time reaches 12:00, the watermark sits at 11:00 and state for earlier events is dropped, so a late-arriving duplicate of a 10:30 event can slip through. The guarantee is exactly-once only within the watermark window.

Transactional Writes with foreachBatch

For complex output operations requiring transactions across multiple sinks:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{sum, count}
import java.sql.DriverManager

def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
  // Write to Delta Lake
  batchDF.write
    .format("delta")
    .mode("append")
    .save("/tmp/delta/raw_transactions")
  
  // Update aggregated metrics in PostgreSQL within transaction
  val connection = DriverManager.getConnection(
    "jdbc:postgresql://localhost:5432/mydb",
    "user",
    "password"
  )
  
  try {
    connection.setAutoCommit(false)
    
    val metrics = batchDF
      .groupBy("user_id")
      .agg(
        sum("amount").as("batch_total"),
        count("*").as("batch_count")
      )
      .collect()
    
    val updateStmt = connection.prepareStatement(
      """
      INSERT INTO user_metrics (user_id, total_amount, transaction_count, last_updated)
      VALUES (?, ?, ?, ?)
      ON CONFLICT (user_id) DO UPDATE SET
        total_amount = user_metrics.total_amount + EXCLUDED.total_amount,
        transaction_count = user_metrics.transaction_count + EXCLUDED.transaction_count,
        last_updated = EXCLUDED.last_updated
      """
    )
    
    metrics.foreach { row =>
      updateStmt.setString(1, row.getAs[String]("user_id"))
      updateStmt.setDouble(2, row.getAs[Double]("batch_total"))
      updateStmt.setLong(3, row.getAs[Long]("batch_count"))
      updateStmt.setTimestamp(4, new java.sql.Timestamp(System.currentTimeMillis()))
      updateStmt.addBatch()
    }
    
    updateStmt.executeBatch()
    connection.commit()
  } catch {
    case e: Exception =>
      connection.rollback()
      throw e
  } finally {
    connection.close()
  }
}

transactions.writeStream
  .foreachBatch(writeBatch _)
  .option("checkpointLocation", "/tmp/checkpoint/batch")
  .start()

The foreachBatch sink hands each micro-batch to your function as a static DataFrame, enabling arbitrary batch operations. If a batch fails, Spark reprocesses it with the same batchId, so the batchId can serve as an idempotency key. Note that the Delta append and the PostgreSQL transaction above commit independently, so exactly-once across both sinks still depends on each one handling replays idempotently.
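One caveat with writeBatch as written: the PostgreSQL upsert accumulates amounts, so replaying a batch would double-count the metrics. A common hardening pattern is to make the database commit conditional on a previously unseen batchId. A minimal sketch (the batch_log table name is an assumption, not from the article):

```scala
// Sketch: skip micro-batches already committed, using batchId as the
// idempotency key recorded in the same database transaction.
import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

def writeBatchIdempotent(batchDF: DataFrame, batchId: Long): Unit = {
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://localhost:5432/mydb", "user", "password")
  try {
    conn.setAutoCommit(false)
    // ON CONFLICT DO NOTHING updates 0 rows if this batch was seen before.
    val mark = conn.prepareStatement(
      "INSERT INTO batch_log (batch_id) VALUES (?) ON CONFLICT (batch_id) DO NOTHING")
    mark.setLong(1, batchId)
    if (mark.executeUpdate() == 0) { conn.rollback(); return }  // replay: skip

    // ... perform the metric upserts here, as in writeBatch ...
    conn.commit()  // batch marker and metrics commit atomically
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}
```

Because the batch marker and the metric updates commit in one transaction, a crash anywhere leaves the database either fully updated or untouched for that batchId.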

Monitoring and Validation

Validate exactly-once semantics in production:

// Add processing metadata for auditing
val enriched = transactions
  .withColumn("processing_timestamp", current_timestamp())
  .withColumn("record_uuid", expr("uuid()"))  // per-row id: uuid() generates a new value per record, not per batch

// Count records at source and sink
val sourceCount = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .count()

val sinkCount = spark.read
  .format("delta")
  .load("/tmp/delta/transactions")
  .count()

println(s"Source: $sourceCount, Sink: $sinkCount, Match: ${sourceCount == sinkCount}")

Monitor checkpoint health and lag metrics through Spark UI and streaming query progress. Set up alerts for checkpoint failures or increasing processing delays that might indicate duplicate processing issues.
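The streaming query progress mentioned above is also available programmatically. A minimal inspection sketch for a running query:

```scala
// Inspect the most recent micro-batch's progress for the running query.
// `lastProgress` is null until the first batch completes.
val progress = query.lastProgress
if (progress != null) {
  println(s"batchId=${progress.batchId}")
  println(s"numInputRows=${progress.numInputRows}")
  println(s"inputRowsPerSecond=${progress.inputRowsPerSecond}")
  println(s"sources=${progress.sources.map(_.description).mkString(", ")}")
}
```

A sustained gap between inputRowsPerSecond and processedRowsPerSecond is an early warning of lag; repeated restarts of the same batchId point at a failing sink.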

Exactly-once semantics requires careful coordination between sources, processing logic, and sinks. Structured Streaming simplifies this significantly, but understanding the underlying mechanisms ensures robust production deployments.
