Spark Streaming - Fault Tolerance and Checkpointing
Key Insights
• Spark Streaming achieves fault tolerance through Write-Ahead Logs (WAL) and checkpointing, ensuring exactly-once semantics for stateful operations and at-least-once delivery for receivers
• Checkpointing stores both metadata (configuration, DStream operations) and data (RDDs for stateful transformations) to HDFS-compatible storage, enabling driver and executor recovery
• Production deployments require careful checkpoint-interval tuning (5-10x the batch interval), reliable distributed storage, and proper handling of code changes that invalidate checkpoints
Understanding Spark Streaming Fault Tolerance
Spark Streaming builds fault tolerance on top of Spark’s core RDD lineage mechanism. Each batch of streaming data creates an RDD with a lineage graph that can reconstruct lost partitions. However, streaming applications introduce additional complexity: long-running computations, stateful operations, and the need to recover from driver failures.
The framework provides two primary fault tolerance mechanisms:
RDD Lineage Recovery: For stateless transformations, Spark recomputes lost data from source systems using the RDD lineage graph. This works well for transformations like map, filter, and reduceByKey on individual batches.
Checkpointing: For stateful operations and metadata preservation, Spark writes periodic snapshots to reliable storage. This prevents unbounded lineage chains and enables recovery from driver failures.
Checkpoint Types and Storage
Spark Streaming uses two distinct checkpoint types:
import org.apache.spark.streaming._
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("hdfs://namenode:9000/spark-checkpoints")
// Metadata checkpointing (automatic)
// - Configuration
// - DStream operation graph
// - Incomplete batches
// Data checkpointing (for stateful ops)
val stateStream = stream.updateStateByKey(updateFunction)
// Automatically checkpoints RDDs
Metadata checkpointing saves the streaming computation graph and configuration. This enables driver recovery and is mandatory for stateful operations.
Data checkpointing persists intermediate RDDs generated by stateful transformations like updateStateByKey and reduceByKeyAndWindow. Without this, the lineage graph would grow indefinitely.
Storage requirements:
// HDFS (recommended for production)
ssc.checkpoint("hdfs://namenode:9000/checkpoints/app-name")
// S3 (with proper configuration)
ssc.checkpoint("s3a://bucket-name/checkpoints/app-name")
// Local filesystem (development only)
ssc.checkpoint("file:///tmp/checkpoints")
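When checkpointing to S3, the `s3a` connector needs credentials. One common approach is to pass them through Spark's Hadoop configuration; the `spark.hadoop.fs.s3a.*` keys below are the standard property names, while reading them from environment variables is just one reasonable convention:

```scala
import org.apache.spark.SparkConf

// Propagate s3a credentials via Spark's Hadoop configuration.
// The environment-variable names are a convention, not a requirement.
val sparkConf = new SparkConf()
  .setAppName("CheckpointToS3")
  .set("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
  .set("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
```

On clusters with instance profiles or IAM roles, the credential keys can usually be omitted and the default credential provider chain takes over.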
Implementing Stateful Operations with Checkpointing
Stateful operations maintain information across batches. Here’s a practical implementation tracking user sessions:
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
case class UserEvent(userId: String, action: String, timestamp: Long)
case class SessionState(eventCount: Int, lastSeen: Long, actions: List[String])
def updateSessionState(
newEvents: Seq[UserEvent],
currentState: Option[SessionState]
): Option[SessionState] = {
val state = currentState.getOrElse(
SessionState(0, 0L, List.empty)
)
val sessionTimeout = 30 * 60 * 1000 // 30 minutes
// reduceOption works on Scala 2.12; maxOption requires Scala 2.13+
val latestTimestamp = newEvents.map(_.timestamp).reduceOption(_ max _).getOrElse(state.lastSeen)
// Expire old sessions
if (latestTimestamp - state.lastSeen > sessionTimeout) {
None // Remove state
} else {
Some(SessionState(
eventCount = state.eventCount + newEvents.length,
lastSeen = latestTimestamp,
actions = (state.actions ++ newEvents.map(_.action)).takeRight(100)
))
}
}
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("hdfs://namenode:9000/checkpoints/sessions")
val events: DStream[UserEvent] = // ... from Kafka or other source
val userEvents = events.map(e => (e.userId, e))
val sessions = userEvents.updateStateByKey(updateSessionState _)
sessions.checkpoint(Duration(60000)) // Checkpoint every 60 seconds
The checkpoint() method on DStreams sets the checkpoint interval. This controls how frequently RDDs are saved to reliable storage.
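Since Spark 1.6, `mapWithState` offers a more efficient alternative to `updateStateByKey`: it touches only keys that receive new data and supports timeouts natively, so the session-expiry logic above becomes declarative. A minimal sketch, reusing the `UserEvent` type and `userEvents` pair DStream from the example above:

```scala
import org.apache.spark.streaming._

// mapWithState updates state incrementally per key and evicts idle keys
// via the timeout, instead of scanning all state every batch.
val spec = StateSpec
  .function((userId: String, event: Option[UserEvent], state: State[Int]) => {
    val newCount = state.getOption.getOrElse(0) + event.size
    // state.update is illegal on a key that is timing out
    if (!state.isTimingOut()) state.update(newCount)
    (userId, newCount)
  })
  .timeout(Minutes(30)) // Spark removes keys idle longer than 30 minutes

val eventCounts = userEvents.mapWithState(spec)
```

`mapWithState` still requires a checkpoint directory, and the same checkpoint-interval considerations apply.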
Window Operations and Checkpointing
Window operations maintain state across multiple batches. Inverse functions optimize performance by avoiding full recomputation:
val windowedCounts = events
  .map(e => (e.action, 1))
  .reduceByKeyAndWindow(
    reduceFunc = (a: Int, b: Int) => a + b,    // add batches entering the window
    invReduceFunc = (a: Int, b: Int) => a - b, // subtract batches leaving the window
    windowDuration = Minutes(10),
    slideDuration = Seconds(30)
  )
// Checkpoint interval should be multiple of slide duration
windowedCounts.checkpoint(Minutes(2))
Without the inverse function, Spark recomputes the entire window on each slide. With it, Spark adds new batches and removes old ones incrementally, but requires checkpointing for fault tolerance.
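The incremental update can be illustrated with plain integer counts, outside of Spark:

```scala
// Incremental window maintenance:
// new window = old window + entering batch - expiring batch.
val oldWindow = 42  // sum over the previous 10-minute window
val entering  = 7   // count in the batch sliding into the window
val expiring  = 5   // count in the batch sliding out of the window
val newWindow = oldWindow + entering - expiring  // 44, no full recomputation
```

This only works when the operation has a true inverse (sums and counts do; `max` and `min` do not), which is why `reduceByKeyAndWindow` without `invReduceFunc` must recompute the full window.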
Driver Recovery Pattern
Production applications must handle driver failures. Use the StreamingContext.getOrCreate pattern:
def createStreamingContext(): StreamingContext = {
val conf = new SparkConf().setAppName("FaultTolerantApp")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs://namenode:9000/checkpoints/app")
// Define your streaming computation
val stream = ssc.socketTextStream("localhost", 9999)
val wordCounts = stream
.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
wordCounts.print()
ssc
}
val checkpointDir = "hdfs://namenode:9000/checkpoints/app"
val ssc = StreamingContext.getOrCreate(
checkpointDir,
createStreamingContext _
)
ssc.start()
ssc.awaitTermination()
On startup, getOrCreate attempts to recreate the context from checkpoint data. If no checkpoint exists, it calls the creation function.
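Recovery works best when shutdowns are clean. Enabling graceful shutdown lets in-flight batches finish before the context stops, so the final checkpoint reflects completed work; the configuration key below is a standard Spark Streaming setting:

```scala
import org.apache.spark.SparkConf

// Let in-flight batches complete on JVM shutdown (SIGTERM) before stopping,
// so the last checkpoint is consistent with the processed data.
val conf = new SparkConf()
  .setAppName("FaultTolerantApp")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
```

Without this, a hard stop mid-batch typically forces that batch to be reprocessed on restart.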
Kafka Integration with Checkpointing
Kafka-based streaming requires careful offset management. The direct stream with manual commits provides at-least-once delivery; end-to-end exactly-once additionally requires an idempotent or transactional sink:
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark-streaming-app",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("input-topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
  // Capture offsets first: the cast only succeeds on the RDD produced
  // directly by the direct stream, before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the batch on the executors (avoid collect() on large batches)
  rdd.foreachPartition { records =>
    records.foreach(r => println(r.value)) // replace with a real sink
  }
  // Commit offsets only after processing succeeds
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Disable auto-commit and manually commit offsets after processing. Note that commit-after-processing alone yields at-least-once semantics: a failure between processing and commit causes reprocessing, so exactly-once output requires the sink itself to be idempotent or transactional. Checkpointing stores offset information alongside state data, but offsets committed back to Kafka also survive code changes that invalidate checkpoints.
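One common way to make the sink idempotent is to key each output record by its Kafka coordinates, so a replayed batch overwrites rather than duplicates. A hedged sketch, where `store` stands in for any external key-value store with upsert semantics (a hypothetical placeholder, not a Spark API):

```scala
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { records =>
    records.foreach { record =>
      // Keying by (topic, partition, offset) makes replays after a
      // failure overwrite the same rows instead of appending duplicates.
      val key = s"${record.topic}-${record.partition}-${record.offset}"
      // store.put(key, record.value)  // hypothetical upsert into an external store
    }
  }
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

With an idempotent sink, the at-least-once guarantees of the direct stream become effectively exactly-once from the consumer's point of view.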
Checkpoint Interval Tuning
The checkpoint interval impacts performance and recovery time:
// Too frequent: High I/O overhead
stream.checkpoint(Seconds(5)) // Batch interval = 10s
// Recommended: 5-10x batch interval
stream.checkpoint(Seconds(50)) // Batch interval = 10s
// Too infrequent: Slow recovery, long lineage chains
stream.checkpoint(Minutes(10)) // Batch interval = 10s
Guidelines:
- Set checkpoint interval to 5-10x the batch interval
- For window operations, use a multiple of the slide duration
- Monitor checkpoint write times in Spark UI
- Ensure checkpoint storage has sufficient I/O throughput
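The first two guidelines can be combined into a small helper that picks an interval roughly 5-10x the batch interval and rounds it up to a multiple of the slide duration (the factor of 7 below is an arbitrary midpoint, not a Spark default):

```scala
// Choose a checkpoint interval: ~factor x the batch interval,
// rounded up to a multiple of the slide duration (milliseconds).
def checkpointIntervalMs(batchMs: Long, slideMs: Long, factor: Int = 7): Long = {
  val target = batchMs * factor
  ((target + slideMs - 1) / slideMs) * slideMs  // round up to slide multiple
}

checkpointIntervalMs(10000L, 30000L)  // 90000 ms for a 10s batch, 30s slide
```

The result can be passed to `stream.checkpoint(Milliseconds(...))` directly.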
Handling Code Changes
Checkpoints serialize the entire DStream graph. Code changes often invalidate existing checkpoints:
// Version 1 (checkpointed)
val counts = stream.map(_.length).reduce(_ + _)
// Version 2 (incompatible)
val counts = stream.map(_.split(",").length).reduce(_ + _)
Deployment strategies:
Option 1: Delete checkpoints (loses state)
hdfs dfs -rm -r /checkpoints/app-name
Option 2: Versioned checkpoint directories
val version = "v2"
ssc.checkpoint(s"hdfs://namenode:9000/checkpoints/app-$version")
Option 3: Dual deployment (zero downtime)
Deploy the new version with a different checkpoint path, drain the old version, then decommission it.
Production Monitoring
Monitor checkpoint health in production:
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingDelay = batchInfo.processingDelay.getOrElse(0L)
    val schedulingDelay = batchInfo.schedulingDelay.getOrElse(0L)
    // Alert if processing falls behind (logger: any SLF4J/Log4j logger in scope)
    if (schedulingDelay > 30000) {
      logger.warn(s"High scheduling delay: ${schedulingDelay}ms")
    }
  }
})
Key metrics:
- Checkpoint write duration
- Scheduling delay (indicates backlog)
- Processing time vs. batch interval
- Checkpoint directory size growth
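Checkpoint directory growth can be sampled with the standard Hadoop FileSystem API; a small sketch (the polling cadence and alerting are left to whatever scheduler the deployment already uses):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Total size of the checkpoint directory in bytes. Unbounded growth
// usually means old checkpoint files are not being cleaned up.
def checkpointSizeBytes(dir: String): Long = {
  val path = new Path(dir)
  val fs = path.getFileSystem(new Configuration())
  fs.getContentSummary(path).getLength
}
```

Spark normally deletes obsolete checkpoint files itself, so a steadily climbing value is worth investigating rather than silently provisioning around.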
Properly configured checkpointing transforms Spark Streaming from a fragile prototype into a production-grade system capable of handling failures while maintaining data consistency.