Spark Structured Streaming - Deduplication in Streaming
Key Insights
- Deduplication in Spark Streaming requires stateful processing with watermarking to handle late-arriving data while preventing unbounded state growth
- The dropDuplicates() API with watermark configuration provides automatic duplicate removal based on event time, but understanding its memory implications is critical for production deployments
- Custom deduplication logic using mapGroupsWithState or flatMapGroupsWithState offers fine-grained control over state management and enables complex business rules beyond simple key-based deduplication
Understanding Deduplication Challenges in Streaming
Streaming data pipelines frequently encounter duplicate records due to at-least-once delivery semantics in message brokers, network retries, or upstream system failures. Unlike batch processing where you can simply use distinct() on the entire dataset, streaming deduplication must maintain state across micro-batches while managing memory constraints.
The fundamental challenge is determining how long to remember previously seen records. Store state indefinitely and you’ll exhaust memory; expire state too aggressively and duplicates slip through. This is where watermarking becomes essential.
Basic Deduplication with dropDuplicates
Spark Structured Streaming provides dropDuplicates() for straightforward deduplication scenarios. This approach works when you can identify duplicates by comparing specific columns.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
val spark = SparkSession.builder()
  .appName("StreamingDeduplication")
  .getOrCreate()

import spark.implicits._

// Read from Kafka
val rawStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .load()
// Define the expected JSON payload schema (field names assumed from the examples below)
import org.apache.spark.sql.types._
val schema = new StructType()
  .add("transaction_id", StringType)
  .add("timestamp", StringType)

// Parse JSON and extract fields
val parsedStream = rawStream
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")
  .withColumn("event_time", $"timestamp".cast("timestamp"))
// Apply watermark and deduplication
val dedupedStream = parsedStream
  .withWatermark("event_time", "2 hours")
  .dropDuplicates("transaction_id", "event_time")

dedupedStream.writeStream
  .format("parquet")
  .option("path", "/output/deduplicated")
  .option("checkpointLocation", "/checkpoint/dedup")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
The watermark of “2 hours” tells Spark to maintain state for records within 2 hours of the latest event time seen. Records older than this threshold are dropped, and their state is purged. This prevents unbounded state growth.
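The eviction rule can be modeled in a few lines of plain Scala. This is a simplified sketch of the semantics, not Spark's internal implementation; the timestamps are illustrative.

```scala
// Simplified model of watermark-based state eviction (illustrative only)
object WatermarkModel {
  val delayMs: Long = 2L * 60 * 60 * 1000 // the "2 hours" watermark delay

  // Watermark = maximum event time observed so far, minus the allowed delay
  def watermarkMs(maxEventTimeMs: Long): Long = maxEventTimeMs - delayMs

  // Dedup state for a record survives only while its event time
  // is at or above the current watermark
  def stateRetained(recordEventTimeMs: Long, maxEventTimeMs: Long): Boolean =
    recordEventTimeMs >= watermarkMs(maxEventTimeMs)
}
```

With the latest event at hour 10, the watermark sits at hour 8: state for an hour-9 record is kept, while an hour-7 record's state is eligible for purging.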
Watermark Configuration and Trade-offs
Choosing the correct watermark duration requires understanding your data characteristics:
// Keep the watermark column in the key list so expired state can be purged

// Conservative watermark for systems with significant delays
val conservativeDedup = stream
  .withWatermark("event_time", "6 hours")
  .dropDuplicates("id", "event_time")

// Aggressive watermark for low-latency requirements
val aggressiveDedup = stream
  .withWatermark("event_time", "15 minutes")
  .dropDuplicates("id", "event_time")
The watermark duration should exceed your maximum expected delay plus a safety buffer. Monitor your streaming metrics to identify late-arriving data patterns:
// Add monitoring columns to track late data
val monitoredStream = parsedStream
  .withColumn("processing_time", current_timestamp())
  .withColumn("latency_seconds",
    unix_timestamp($"processing_time") - unix_timestamp($"event_time"))
  .withWatermark("event_time", "2 hours")
  .dropDuplicates("transaction_id", "event_time")
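One way to turn the latency numbers collected above into a watermark setting is to take a high percentile of observed delay and pad it. The p99-plus-50%-buffer policy below is an assumption for illustration, not a Spark rule.

```scala
// Derive a watermark suggestion from observed latencies (policy is illustrative)
object WatermarkSizing {
  // Nearest-rank style percentile over a pre-sorted vector
  def percentile(sortedSeconds: Vector[Long], p: Double): Long = {
    val idx = math.min(sortedSeconds.size - 1, (p * sortedSeconds.size).toInt)
    sortedSeconds(idx)
  }

  // p99 latency plus a 50% safety buffer, rounded up to whole minutes
  def suggestWatermarkMinutes(latenciesSeconds: Seq[Long]): Long = {
    val p99 = percentile(latenciesSeconds.sorted.toVector, 0.99)
    val buffered = (p99 * 1.5).toLong
    (buffered + 59) / 60
  }
}
```

For example, latencies uniformly spread over 1 to 100 seconds yield a p99 of 100 seconds, buffered to 150 seconds, suggesting a 3-minute watermark.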
Composite Key Deduplication
Real-world scenarios often require deduplication across multiple fields. Consider an IoT system where device_id plus measurement_time identifies unique records:
case class SensorReading(
  device_id: String,
  measurement_time: Long,
  temperature: Double,
  humidity: Double,
  event_timestamp: java.sql.Timestamp
)

// Derive the JSON schema from the case class
val sensorSchema = org.apache.spark.sql.Encoders.product[SensorReading].schema

val sensorStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sensors")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", sensorSchema).as("data"))
  .select("data.*")

// Deduplicate on composite key (include the watermark column so state is purged)
val dedupedSensors = sensorStream
  .withWatermark("event_timestamp", "1 hour")
  .dropDuplicates("device_id", "measurement_time", "event_timestamp")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/sensors")
  .option("path", "/delta/sensors")
  .start()
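The composite-key rule itself is easy to model outside Spark: a record is a duplicate exactly when its (device_id, measurement_time) pair has already been seen. A sketch of the semantics, not the distributed implementation:

```scala
// First-occurrence-wins dedup on a composite key, modeled with an in-memory set
final case class Reading(deviceId: String, measurementTime: Long, temperature: Double)

def dedupByCompositeKey(readings: Seq[Reading]): Seq[Reading] = {
  val seen = scala.collection.mutable.Set.empty[(String, Long)]
  // Set.add returns false for pairs already present, which filters duplicates
  readings.filter(r => seen.add((r.deviceId, r.measurementTime)))
}
```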
Custom Deduplication with flatMapGroupsWithState
When you need complex deduplication logic, such as keeping the first occurrence, the last occurrence, or merging duplicate records, use flatMapGroupsWithState, which can emit zero or more records per group (mapGroupsWithState is limited to exactly one output record per group):
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(id: String, timestamp: Long, value: Double, eventTime: java.sql.Timestamp)
case class DeduplicationState(lastSeen: Long, count: Int)

def deduplicateWithState(
    key: String,
    values: Iterator[Event],
    state: GroupState[DeduplicationState]
): Iterator[Event] = {
  val events = values.toList.sortBy(_.timestamp)
  if (state.exists) {
    val currentState = state.get
    // Filter out events we've already seen
    val newEvents = events.filter(_.timestamp > currentState.lastSeen)
    if (newEvents.nonEmpty) {
      state.update(DeduplicationState(
        newEvents.last.timestamp,
        currentState.count + newEvents.size
      ))
      newEvents.iterator
    } else {
      Iterator.empty
    }
  } else {
    // First time seeing this key
    state.update(DeduplicationState(events.last.timestamp, events.size))
    events.iterator
  }
}
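The filtering rule above can be exercised without a Spark cluster by threading the state through explicitly. This is a hypothetical pure-function driver for testing the logic, not the Spark GroupState API:

```scala
// Pure version of the dedup step: prior state plus a batch of events in,
// updated state plus the not-yet-seen events out
final case class Ev(id: String, timestamp: Long, value: Double)
final case class DedupSt(lastSeen: Long, count: Int)

def dedupStep(prior: Option[DedupSt], batch: List[Ev]): (Option[DedupSt], List[Ev]) = {
  val events = batch.sortBy(_.timestamp)
  (prior, events) match {
    case (_, Nil) => (prior, Nil)
    case (Some(s), _) =>
      // Keep only events newer than the last timestamp already emitted
      val fresh = events.filter(_.timestamp > s.lastSeen)
      if (fresh.isEmpty) (Some(s), Nil)
      else (Some(DedupSt(fresh.last.timestamp, s.count + fresh.size)), fresh)
    case (None, _) =>
      (Some(DedupSt(events.last.timestamp, events.size)), events)
  }
}
```

Replaying a batch containing an already-seen timestamp emits only the genuinely new events.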
// The function returns an Iterator, so it runs through flatMapGroupsWithState on a
// typed Dataset[Event]; eventStream is assumed to be parsed upstream into Event rows
val customDeduped = eventStream
  .withWatermark("eventTime", "1 hour")
  .groupByKey(_.id)
  .flatMapGroupsWithState(
    org.apache.spark.sql.streaming.OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(deduplicateWithState)
Handling State Expiration and Cleanup
State management directly impacts memory consumption. Implement timeout handling to explicitly control state lifecycle:
def deduplicateWithTimeout(
    key: String,
    values: Iterator[Event],
    state: GroupState[DeduplicationState]
): Iterator[Event] = {
  // When the watermark passes the timeout timestamp, Spark invokes this
  // function once more with an empty iterator so state can be cleaned up
  if (state.hasTimedOut) {
    state.remove()
    return Iterator.empty
  }
  val events = values.toList
  if (events.isEmpty) {
    Iterator.empty
  } else {
    val latestEvent = events.maxBy(_.timestamp)
    // Set the timeout 2 hours past the latest event time for this key
    state.setTimeoutTimestamp(latestEvent.eventTime.getTime + (2 * 60 * 60 * 1000))
    if (state.exists) {
      val currentState = state.get
      val newEvents = events.filter(_.timestamp > currentState.lastSeen)
      state.update(DeduplicationState(latestEvent.timestamp, currentState.count + newEvents.size))
      newEvents.iterator
    } else {
      state.update(DeduplicationState(latestEvent.timestamp, events.size))
      events.iterator
    }
  }
}
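With EventTimeTimeout, hasTimedOut fires only once the query's watermark moves past the timeout timestamp set for the key. The arithmetic can be sketched as follows (illustrative model; the 2-hour retention matches the offset used above):

```scala
// A key's state times out once the watermark passes its timeout timestamp
object TimeoutModel {
  val retentionMs: Long = 2L * 60 * 60 * 1000

  def timeoutTimestampMs(latestEventTimeMs: Long): Long =
    latestEventTimeMs + retentionMs

  def hasTimedOut(latestEventTimeMs: Long, watermarkMs: Long): Boolean =
    watermarkMs > timeoutTimestampMs(latestEventTimeMs)
}
```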
Deduplication with Exactly-Once Semantics
For critical pipelines requiring exactly-once processing, combine deduplication with idempotent writes:
val dedupedWithIdempotency = parsedStream
  .withWatermark("event_time", "2 hours")
  // Include the event-time column so Spark can purge expired dedup state
  .dropDuplicates("transaction_id", "event_time")
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/checkpoint/exactly-once")
  .option("path", "/delta/transactions")
  .start()
Delta Lake provides transactional guarantees that complement Spark’s deduplication. Even if the same micro-batch is reprocessed due to failure, Delta’s transaction log prevents duplicate writes.
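If you write through foreachBatch instead of the built-in Delta sink, Delta's idempotent-write options (txnAppId and txnVersion) provide a similar guarantee. A sketch, assuming the delta-spark package is on the classpath; the application id and paths are illustrative:

```scala
// Sketch: idempotent per-batch writes to Delta from foreachBatch.
// txnAppId/txnVersion let Delta skip batches it has already committed,
// so reprocessing a micro-batch after a failure does not duplicate rows.
dedupedStream.writeStream
  .option("checkpointLocation", "/checkpoint/exactly-once-batch")
  .foreachBatch { (batchDf: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batchDf.write
      .format("delta")
      .mode("append")
      .option("txnAppId", "transactions-dedup") // hypothetical application id
      .option("txnVersion", batchId)
      .save("/delta/transactions")
  }
  .start()
```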
Monitoring Deduplication Performance
Track deduplication effectiveness with custom metrics. Actions such as count() cannot be called on a streaming DataFrame directly, so per-batch metrics belong inside foreachBatch:

val monitoredQuery = parsedStream
  .withWatermark("event_time", "2 hours")
  .writeStream
  .option("checkpointLocation", "/checkpoint/metrics")
  .foreachBatch { (batchDf: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batchDf.persist()
    val inputCount = batchDf.count()
    // Per-batch dedup for metrics; cross-batch duplicates are not counted here
    val deduped = batchDf.dropDuplicates("transaction_id")
    val outputCount = deduped.count()
    if (inputCount > 0) {
      val duplicateRate = (inputCount - outputCount).toDouble / inputCount
      // Log metrics (integrate with your monitoring system)
      println(s"Batch $batchId duplicate rate: ${duplicateRate * 100}%")
    }
    deduped.write.mode("append").parquet("/output/deduplicated-metrics")
    batchDf.unpersist()
  }
  .start()
For production systems, integrate with monitoring frameworks:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    progress.stateOperators.foreach { op =>
      println(s"State memory bytes: ${op.memoryUsedBytes}")
      println(s"Num state rows: ${op.numRowsTotal}")
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})
Production Considerations
Size your cluster based on state requirements. Each unique key maintains state, so cardinality directly impacts memory usage. For high-cardinality scenarios, consider:
- Shorter watermark windows to reduce state size
- Partitioning by time windows before deduplication
- Using RocksDB state store for disk-backed state management
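The second bullet, partitioning by time windows before deduplication, can be sketched by adding a window column to the dedup key so each key's state is scoped to one window. The 10-minute window and column names are assumptions carried over from the earlier examples:

```scala
import org.apache.spark.sql.functions.window

// Sketch: scope dedup state to 10-minute event-time windows, making the
// state key (event_window, transaction_id) rather than transaction_id alone
val windowedDedup = parsedStream
  .withWatermark("event_time", "2 hours")
  .withColumn("event_window", window($"event_time", "10 minutes"))
  .dropDuplicates("event_window", "transaction_id")
```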
Configure the RocksDB state store (available in Spark 3.2 and later) for large state:
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "true")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.blockSizeKB", "64")
Deduplication in streaming systems requires balancing correctness, latency, and resource utilization. Start with dropDuplicates() for simple cases, move to stateful operations when you need custom logic, and always monitor state growth in production.