Spark Streaming - Deduplication in Streaming

Key Insights

  • Deduplication in Spark Streaming requires stateful processing with watermarking to handle late-arriving data while preventing unbounded state growth
  • The dropDuplicates() API with watermark configuration provides automatic duplicate removal based on event time, but understanding its memory implications is critical for production deployments
  • Custom deduplication logic using mapGroupsWithState or flatMapGroupsWithState offers fine-grained control over state management and enables complex business rules beyond simple key-based deduplication

Understanding Deduplication Challenges in Streaming

Streaming data pipelines frequently encounter duplicate records due to at-least-once delivery semantics in message brokers, network retries, or upstream system failures. Unlike batch processing where you can simply use distinct() on the entire dataset, streaming deduplication must maintain state across micro-batches while managing memory constraints.

The fundamental challenge is determining how long to remember previously seen records. Store state indefinitely and you’ll exhaust memory; expire state too aggressively and duplicates slip through. This is where watermarking becomes essential.
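This trade-off can be made concrete with a toy model (plain Scala, not Spark's actual state store): remember each key with its event time, and evict anything older than the current maximum event time minus an allowed delay, which is exactly what a watermark does.

```scala
// Toy model of watermark-based dedup state (illustrative, not Spark's implementation):
// remember keys with their event time, evict anything older than (maxEventTime - delay).
class DedupState(delayMs: Long) {
  private val seen = scala.collection.mutable.Map.empty[String, Long]
  private var maxEventTime = Long.MinValue

  // Returns true if the record is new (should be emitted), false if duplicate or too late.
  def offer(key: String, eventTime: Long): Boolean = {
    maxEventTime = math.max(maxEventTime, eventTime)
    val watermark = maxEventTime - delayMs
    if (eventTime < watermark) false        // too late: its state was already purged
    else if (seen.contains(key)) false      // duplicate
    else { seen += key -> eventTime; true }
  }

  // Purge state older than the watermark, bounding memory.
  def evict(): Unit = {
    val watermark = maxEventTime - delayMs
    val stale = seen.collect { case (k, t) if t < watermark => k }
    stale.foreach(seen.remove)
  }

  def stateSize: Int = seen.size
}
```

A longer delay keeps more keys in `seen` (more memory, fewer missed duplicates); a shorter one evicts sooner.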

Basic Deduplication with dropDuplicates

Spark Structured Streaming provides dropDuplicates() for straightforward deduplication scenarios. This approach works when you can identify duplicates by comparing specific columns.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("StreamingDeduplication")
  .getOrCreate()

import spark.implicits._

// Transaction schema (illustrative; adjust to your payload)
val schema = StructType.fromDDL(
  "transaction_id STRING, timestamp STRING, amount DOUBLE")

// Read from Kafka
val rawStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .load()

// Parse JSON and extract fields
val parsedStream = rawStream
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")
  .withColumn("event_time", $"timestamp".cast("timestamp"))

// Apply watermark and deduplication
val dedupedStream = parsedStream
  .withWatermark("event_time", "2 hours")
  .dropDuplicates("transaction_id", "event_time")

dedupedStream.writeStream
  .format("parquet")
  .option("path", "/output/deduplicated")
  .option("checkpointLocation", "/checkpoint/dedup")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

The 2-hour watermark tells Spark how long to retain deduplication state: once the watermark (the maximum event time seen, minus 2 hours) passes a record's event time, its state is purged. This prevents unbounded state growth. Note that the event-time column is part of the dropDuplicates() key list above; with dropDuplicates(), state is only expired when the dedup keys include the watermarked column. Deduplicating on transaction_id alone retains state for every key indefinitely, even with a watermark; Spark 3.5+ adds dropDuplicatesWithinWatermark() for exactly that case.

Watermark Configuration and Trade-offs

Choosing the correct watermark duration requires understanding your data characteristics:

// Conservative watermark for systems with significant delays
val conservativeDedup = stream
  .withWatermark("event_time", "6 hours")
  .dropDuplicates("id")

// Aggressive watermark for low-latency requirements
val aggressiveDedup = stream
  .withWatermark("event_time", "15 minutes")
  .dropDuplicates("id")

The watermark duration should exceed your maximum expected delay plus a safety buffer. Monitor your streaming metrics to identify late-arriving data patterns:

// Add monitoring columns to track late data
val monitoredStream = parsedStream
  .withColumn("processing_time", current_timestamp())
  .withColumn("latency_seconds", 
    unix_timestamp($"processing_time") - unix_timestamp($"event_time"))
  .withWatermark("event_time", "2 hours")
  .dropDuplicates("transaction_id")
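Given a sample of observed latencies (for example, values collected from the latency_seconds column above), a watermark can be derived as a high percentile plus a safety buffer. A minimal sketch with a hypothetical helper, not a Spark API:

```scala
// Pick a watermark as the p-th percentile of observed latencies plus a safety buffer.
// Hypothetical helper for offline analysis, not part of Spark.
def chooseWatermarkSeconds(latencies: Seq[Long], percentile: Double, bufferSec: Long): Long = {
  require(latencies.nonEmpty && percentile > 0 && percentile <= 1.0)
  val sorted = latencies.sorted
  // Index of the percentile value (nearest-rank method), clamped to the last element.
  val idx = math.min(sorted.length - 1, math.ceil(percentile * sorted.length).toInt - 1)
  sorted(idx) + bufferSec
}
```

For instance, `chooseWatermarkSeconds(observed, 0.99, 60)` yields a watermark covering 99% of observed delays plus a one-minute buffer.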

Composite Key Deduplication

Real-world scenarios often require deduplication across multiple fields. Consider an IoT system where device_id plus measurement_time identifies unique records:

case class SensorReading(
  device_id: String,
  measurement_time: Long,
  temperature: Double,
  humidity: Double,
  event_timestamp: java.sql.Timestamp
)

// Derive the JSON schema from the case class (requires org.apache.spark.sql.Encoders)
import org.apache.spark.sql.Encoders
val sensorSchema = Encoders.product[SensorReading].schema

val sensorStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sensors")
  .load()
  .selectExpr("CAST(value AS STRING) as value")
  .select(from_json($"value", sensorSchema).as("data"))
  .select("data.*")

// Deduplicate on composite key
val dedupedSensors = sensorStream
  .withWatermark("event_timestamp", "1 hour")
  .dropDuplicates("device_id", "measurement_time")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/sensors")
  .option("path", "/delta/sensors")
  .start()
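Logically, the composite key amounts to deduplicating on a tuple. A pure-Scala sketch of the same rule (hypothetical Reading type; keeps the first occurrence per (device_id, measurement_time) pair):

```scala
// Composite-key dedup sketch: treat (deviceId, measurementTime) as one key.
final case class Reading(deviceId: String, measurementTime: Long, temperature: Double)

def dedupByCompositeKey(readings: Seq[Reading]): Seq[Reading] = {
  val seen = scala.collection.mutable.Set.empty[(String, Long)]
  // Set.add returns false for an already-present key, filtering out later duplicates.
  readings.filter(r => seen.add((r.deviceId, r.measurementTime)))
}
```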

Custom Deduplication with flatMapGroupsWithState

When you need complex deduplication logic, such as keeping the first occurrence, the last occurrence, or merging duplicate records, use flatMapGroupsWithState (or mapGroupsWithState when each group emits exactly one value). The function below returns an iterator of events per group, so it pairs with flatMapGroupsWithState:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(id: String, timestamp: Long, value: Double, eventTime: java.sql.Timestamp)
case class DeduplicationState(lastSeen: Long, count: Int)

def deduplicateWithState(
  key: String,
  values: Iterator[Event],
  state: GroupState[DeduplicationState]
): Iterator[Event] = {
  
  val events = values.toList.sortBy(_.timestamp)
  
  if (state.exists) {
    val currentState = state.get
    // Filter out events we've already seen
    val newEvents = events.filter(_.timestamp > currentState.lastSeen)
    
    if (newEvents.nonEmpty) {
      state.update(DeduplicationState(
        newEvents.last.timestamp,
        currentState.count + newEvents.size
      ))
      newEvents.iterator
    } else {
      Iterator.empty
    }
  } else {
    // First time seeing this key
    state.update(DeduplicationState(events.last.timestamp, events.size))
    events.iterator
  }
}

import org.apache.spark.sql.streaming.OutputMode

// Assumes `events` is a typed Dataset[Event] parsed upstream, e.g. parsedStream.as[Event]
val customDeduped = events
  .withWatermark("eventTime", "1 hour")
  .groupByKey(_.id)
  .flatMapGroupsWithState(
    OutputMode.Append(), GroupStateTimeout.EventTimeTimeout())(deduplicateWithState)

Handling State Expiration and Cleanup

State management directly impacts memory consumption. Implement timeout handling to explicitly control state lifecycle:

def deduplicateWithTimeout(
  key: String,
  values: Iterator[Event],
  state: GroupState[DeduplicationState]
): Iterator[Event] = {
  
  // Check if state has timed out
  if (state.hasTimedOut) {
    state.remove()
    return Iterator.empty
  }
  
  val events = values.toList
  
  if (events.isEmpty) {
    Iterator.empty
  } else {
    val latestEvent = events.maxBy(_.timestamp)
    
    // Set timeout relative to event time
    state.setTimeoutTimestamp(latestEvent.eventTime.getTime + (2 * 60 * 60 * 1000))
    
    if (state.exists) {
      val currentState = state.get
      val newEvents = events.filter(_.timestamp > currentState.lastSeen)
      state.update(DeduplicationState(latestEvent.timestamp, currentState.count + newEvents.size))
      newEvents.iterator
    } else {
      state.update(DeduplicationState(latestEvent.timestamp, events.size))
      events.iterator
    }
  }
}
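The timeout semantics can be stated simply: a key's state is dropped once the watermark passes the timeout timestamp set on its last update. A toy model of that rule (plain Scala, not Spark's implementation):

```scala
// Each key records when its state should expire; eviction happens when the
// watermark (max event time minus allowed delay) passes that timestamp.
final case class KeyState(lastSeen: Long, timeoutAt: Long)

def expireTimedOut(states: Map[String, KeyState], watermarkMs: Long): Map[String, KeyState] =
  states.filter { case (_, s) => s.timeoutAt > watermarkMs }
```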

Deduplication with Exactly-Once Semantics

For critical pipelines requiring exactly-once processing, combine deduplication with idempotent writes:

val dedupedWithIdempotency = parsedStream
  .withWatermark("event_time", "2 hours")
  .dropDuplicates("transaction_id")
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/checkpoint/exactly-once")
  .option("path", "/delta/transactions")
  .start()

Delta Lake provides transactional guarantees that complement Spark’s deduplication. Even if the same micro-batch is reprocessed due to failure, Delta’s transaction log prevents duplicate writes.
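Delta's guarantee rests on the sink being idempotent per micro-batch: a batch id that was already committed is detected in the transaction log and skipped on replay. A toy sink modeling that check (hypothetical class, not Delta's API):

```scala
// Idempotent sink sketch: commits are keyed by batch id, so reprocessing a
// failed micro-batch cannot write its rows twice.
class IdempotentSink {
  private var lastCommitted: Long = -1L
  private val rows = scala.collection.mutable.ArrayBuffer.empty[String]

  // Returns true if the batch was applied, false if it was a replay and skipped.
  def commit(batchId: Long, batch: Seq[String]): Boolean = {
    if (batchId <= lastCommitted) false
    else {
      rows ++= batch
      lastCommitted = batchId
      true
    }
  }

  def contents: Seq[String] = rows.toSeq
}
```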

Monitoring Deduplication Performance

Track deduplication effectiveness with per-batch metrics. A streaming DataFrame does not support actions such as count(), so compute metrics inside foreachBatch, where each micro-batch arrives as an ordinary batch DataFrame:

val query = parsedStream
  .withWatermark("event_time", "2 hours")
  .writeStream
  .foreachBatch { (batchDf: org.apache.spark.sql.DataFrame, batchId: Long) =>
    val inputCount = batchDf.count()
    val deduped = batchDf.dropDuplicates("transaction_id")
    val outputCount = deduped.count()
    if (inputCount > 0) {
      val duplicateRate = (inputCount - outputCount).toDouble / inputCount
      // Log metrics (integrate with your monitoring system)
      println(s"Batch $batchId duplicate rate: ${duplicateRate * 100}%")
    }
    deduped.write.mode("append").parquet("/output/deduplicated")
  }
  .start()

Note that dropDuplicates() inside foreachBatch only removes duplicates within a single micro-batch; cross-batch deduplication still requires the watermarked dropDuplicates() on the streaming side.

For production systems, integrate with monitoring frameworks:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    val stateOperators = progress.stateOperators
    
    stateOperators.foreach { op =>
      println(s"State memory bytes: ${op.memoryUsedBytes}")
      println(s"Num state rows: ${op.numRowsTotal}")
    }
  }
  
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})

Production Considerations

Size your cluster based on state requirements. Each unique key maintains state, so cardinality directly impacts memory usage. For high-cardinality scenarios, consider:

  • Shorter watermark windows to reduce state size
  • Partitioning by time windows before deduplication
  • Using RocksDB state store for disk-backed state management

Configure RocksDB for large state:

spark.conf.set("spark.sql.streaming.stateStore.providerClass", 
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "true")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.blockSizeKB", "64")

Deduplication in streaming systems requires balancing correctness, latency, and resource utilization. Start with dropDuplicates() for simple cases, move to stateful operations when you need custom logic, and always monitor state growth in production.
