Spark Streaming - Kafka Source Integration
Key Insights
- Spark Structured Streaming with Kafka requires careful configuration of offsets, checkpointing, and exactly-once semantics to prevent data loss or duplication in production environments.
- The Kafka source connector supports both batch and streaming modes, with parameters such as `startingOffsets`, `failOnDataLoss`, and `maxOffsetsPerTrigger` controlling data ingestion behavior.
- Production deployments must handle schema evolution, backpressure, and monitoring through Kafka metrics integration and Spark's streaming query listeners.
Basic Kafka Source Configuration
Spark Structured Streaming integrates with Kafka through the `kafka` source format. The minimal configuration requires bootstrap servers and a topic subscription:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
val spark = SparkSession.builder()
.appName("KafkaStreamingApp")
.config("spark.sql.streaming.schemaInference", "true")
.getOrCreate()
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "latest")
.load()
// Kafka messages arrive as binary data
kafkaStream.printSchema()
/*
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
*/
The raw Kafka data requires deserialization. For JSON payloads:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("user_id", StringType, nullable = false),
StructField("event_type", StringType, nullable = false),
StructField("timestamp", LongType, nullable = false),
StructField("properties", MapType(StringType, StringType), nullable = true)
))
val parsedStream = kafkaStream
.selectExpr("CAST(value AS STRING) as json_value")
.select(from_json(col("json_value"), schema).as("data"))
.select("data.*")
val query = parsedStream.writeStream
.outputMode("append")
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
query.awaitTermination()
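One wrinkle worth handling before going further: `from_json` returns a null struct for payloads that fail to parse (declared nullability notwithstanding), so after `select("data.*")` every column of a malformed record is null. A common pattern, sketched here on a required field from the schema above, is to fork the stream into valid and dead-letter branches:

```scala
// user_id is required by the declared schema, so a null value here means
// the JSON payload failed to parse rather than a legitimately missing field.
val validRecords   = parsedStream.filter(col("user_id").isNotNull)
val invalidRecords = parsedStream.filter(col("user_id").isNull) // dead-letter candidates
```

The invalid branch can then be written to a separate sink for inspection instead of silently propagating nulls downstream.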
Offset Management and Checkpointing
Offset management determines where Spark begins reading from Kafka. The `startingOffsets` option accepts three forms:
// Start from latest available offset (default)
.option("startingOffsets", "latest")
// Start from earliest available offset
.option("startingOffsets", "earliest")
// Start from specific offsets per partition
.option("startingOffsets", """{"user-events":{"0":23,"1":45,"2":67}}""")
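Within the per-partition JSON form, Kafka's documented special offset values can be mixed in alongside concrete offsets: `-2` resolves to the earliest available offset for that partition and `-1` to the latest:

```scala
// Partition 0 starts at offset 23, partition 1 at earliest (-2),
// partition 2 at latest (-1)
.option("startingOffsets", """{"user-events":{"0":23,"1":-2,"2":-1}}""")
```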
Checkpointing persists offset information for fault tolerance:
val checkpointDir = "s3a://data-lake/checkpoints/kafka-streaming"
val query = parsedStream.writeStream
.outputMode("append")
.format("parquet")
.option("path", "s3a://data-lake/output/user-events")
.option("checkpointLocation", checkpointDir)
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
The checkpoint directory stores:
- Offset ranges processed per batch
- Metadata about the streaming query
- State store data for stateful operations
Critical: Never delete a checkpoint directory while its query is running. If the checkpoint is lost, a restarted query falls back to `startingOffsets`, which can mean reprocessing already-consumed data or silently skipping records.
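A related behavior worth making explicit: once a checkpoint exists, `startingOffsets` is honored only on the very first run; subsequent restarts resume from the committed offsets. Reusing the directories from the example above:

```scala
// Restarted with the same checkpointLocation, the query ignores
// startingOffsets and resumes from the offsets stored in the checkpoint.
val resumedQuery = parsedStream.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "s3a://data-lake/output/user-events")
  .option("checkpointLocation", checkpointDir) // same directory as before
  .start()
```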
Multiple Topic Subscription Patterns
Kafka source supports multiple subscription strategies:
// Subscribe to multiple topics
val multiTopicStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events,logs,metrics")
.load()
// Subscribe using pattern matching
val patternStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribePattern", "prod-.*")
.load()
// Assign specific partitions
val assignStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"events":[0,1,2],"logs":[0,1]}""")
.load()
Route messages based on topic:
val routedStream = multiTopicStream
.selectExpr("CAST(value AS STRING) as json_value", "topic")
.where("topic IN ('events', 'logs')")
.withColumn("event_data",
// eventSchema and logSchema are per-topic StructType definitions,
// built the same way as the schema shown earlier
when(col("topic") === "events", from_json(col("json_value"), eventSchema))
.when(col("topic") === "logs", from_json(col("json_value"), logSchema))
)
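When each topic should land in its own output location, one option is to partition the file sink by the `topic` column (the paths below are illustrative):

```scala
// Writes one subdirectory per topic, e.g. .../output/by-topic/topic=events/
val routedQuery = routedStream.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "s3a://data-lake/output/by-topic")
  .option("checkpointLocation", "s3a://data-lake/checkpoints/by-topic")
  .partitionBy("topic")
  .start()
```

This keeps a single query (and a single checkpoint) while still separating output by topic; the alternative of one query per topic trades more operational overhead for independent failure domains.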
Backpressure and Rate Limiting
Control ingestion rate to prevent overwhelming downstream systems:
val rateLimitedStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-volume-topic")
.option("maxOffsetsPerTrigger", "100000") // Max records per batch
.option("minPartitions", "10") // Min read partitions
.option("kafka.max.poll.records", "5000") // Kafka consumer config
.load()
Note that Structured Streaming has no equivalent of the legacy DStream `spark.streaming.backpressure.enabled` flag; rate control is explicit. Besides capping a batch with maxOffsetsPerTrigger, Spark 3.2+ adds minOffsetsPerTrigger and maxTriggerDelay to batch up low-volume trickles of data:
val adaptiveStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "adaptive-topic")
.option("maxOffsetsPerTrigger", "100000") // Upper bound per micro-batch
.option("minOffsetsPerTrigger", "10000")  // Wait for at least this many offsets...
.option("maxTriggerDelay", "5m")          // ...but never wait longer than this
.load()
Exactly-Once Semantics with Idempotent Writes
Spark's Kafka sink is at-least-once, so end-to-end exactly-once behavior comes from deduplicating upstream and writing idempotently downstream. First, drop duplicates within a watermark window. The schema defined earlier carries the event time as a Long epoch, so it must be converted to a timestamp before it can serve as a watermark column:
val deduplicatedStream = parsedStream
.withColumn("event_time", timestamp_seconds(col("timestamp"))) // Long epoch seconds -> TimestampType
.withWatermark("event_time", "1 hour")
.dropDuplicates("user_id", "event_time")
// Write back to Kafka; kafka.* options are passed through to the producer.
// The sink itself remains at-least-once (it does not support transactional
// producers), so duplicates are removed upstream instead.
deduplicatedStream
.selectExpr("CAST(user_id AS STRING) as key", "to_json(struct(*)) as value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "processed-events")
.option("checkpointLocation", "s3a://checkpoints/output")
.option("kafka.enable.idempotence", "true")
.start()
For database sinks, use foreachBatch, which hands each micro-batch to arbitrary batch-write logic together with a batch ID:
parsedStream.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/events")
.option("dbtable", "user_events")
.option("user", "writer")
.option("password", "secret")
.mode("append")
.save()
}
.option("checkpointLocation", "s3a://checkpoints/jdbc-sink")
.start()
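The plain append above can still write a batch twice if the query fails between the JDBC commit and the checkpoint commit. A sketch of making replays idempotent by tagging rows with the batch ID (the `batch_id` column is a hypothetical addition to the target table):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

parsedStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Tag every row with the micro-batch id. On a replay of the same batch,
    // rows with this batch_id can be deleted first (via a JDBC statement)
    // before re-appending, making the write effectively idempotent.
    batchDF.withColumn("batch_id", lit(batchId))
      .write
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/events")
      .option("dbtable", "user_events")
      .option("user", "writer")
      .option("password", "secret")
      .mode("append")
      .save()
  }
  .option("checkpointLocation", "s3a://checkpoints/jdbc-sink")
  .start()
```

An upsert on a natural key achieves the same end without the extra column, at the cost of requiring such a key to exist.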
Security and Authentication
Configure SASL/SSL for secure Kafka connections:
val secureStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9093,broker2:9093")
.option("subscribe", "secure-topic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
"""org.apache.kafka.common.security.plain.PlainLoginModule required
username="spark-consumer"
password="consumer-secret";""")
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-password")
.load()
For Kerberos authentication:
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.jaas.config",
"""com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/spark.keytab"
principal="spark@REALM.COM";""")
Monitoring and Observability
Track streaming query metrics:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent}
spark.streams.addListener(new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {
println(s"Query started: ${event.id}")
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {
val progress = event.progress
println(s"Batch ${progress.batchId}: " +
s"Input rows: ${progress.numInputRows}, " +
s"Processing time: ${progress.durationMs.get("triggerExecution")} ms")
progress.sources.foreach { source =>
println(s"Source: ${source.description}")
println(s"Start offset: ${source.startOffset}")
println(s"End offset: ${source.endOffset}")
}
}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
println(s"Query terminated: ${event.id}")
}
})
Access Kafka-specific metrics programmatically:
val query = parsedStream.writeStream.format("console").start()
// Monitor lag and throughput
while (query.isActive) {
val progress = query.lastProgress
if (progress != null) {
val kafkaSource = progress.sources.head
println(s"Kafka source: ${kafkaSource.description}")
println(s"Input rate: ${progress.inputRowsPerSecond} rows/sec")
println(s"Processing rate: ${progress.processedRowsPerSecond} rows/sec")
}
Thread.sleep(10000)
}
Production deployments should export these metrics to monitoring systems like Prometheus or CloudWatch for alerting on lag, throughput degradation, or processing failures.