Spark Streaming - Kafka Source Integration

Spark Structured Streaming integrates with Kafka through the `kafka` source format. The minimal configuration requires bootstrap servers and topic subscription:

Key Insights

  • Spark Structured Streaming with Kafka requires careful configuration of offsets, checkpointing, and exactly-once semantics to prevent data loss or duplication in production environments.
  • The kafka source connector supports both batch and streaming modes, with critical parameters like startingOffsets, failOnDataLoss, and maxOffsetsPerTrigger controlling data ingestion behavior.
  • Production deployments must handle schema evolution, backpressure, and monitoring through Kafka metrics integration and Spark’s streaming query listeners.

Basic Kafka Source Configuration

Spark Structured Streaming integrates with Kafka through the kafka source format. The minimal configuration requires bootstrap servers and topic subscription:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("KafkaStreamingApp")
  .config("spark.sql.streaming.schemaInference", "true")
  .getOrCreate()

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .option("startingOffsets", "latest")
  .load()

// Kafka messages arrive as binary data
kafkaStream.printSchema()
/*
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
*/

The raw Kafka data requires deserialization. For JSON payloads:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("user_id", StringType, nullable = false),
  StructField("event_type", StringType, nullable = false),
  StructField("timestamp", LongType, nullable = false),
  StructField("properties", MapType(StringType, StringType), nullable = true)
))

val parsedStream = kafkaStream
  .selectExpr("CAST(value AS STRING) as json_value")
  .select(from_json(col("json_value"), schema).as("data"))
  .select("data.*")

val query = parsedStream.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

Offset Management and Checkpointing

Offset management determines where Spark begins reading from Kafka. The startingOffsets option accepts three values:

// Start from latest available offset (default)
.option("startingOffsets", "latest")

// Start from earliest available offset
.option("startingOffsets", "earliest")

// Start from specific offsets per partition
.option("startingOffsets", """{"user-events":{"0":23,"1":45,"2":67}}""")

Checkpointing persists offset information for fault tolerance:

val checkpointDir = "s3a://data-lake/checkpoints/kafka-streaming"

val query = parsedStream.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "s3a://data-lake/output/user-events")
  .option("checkpointLocation", checkpointDir)
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

The checkpoint directory stores:

  • Offset ranges processed per batch
  • Metadata about the streaming query
  • State store data for stateful operations

Critical: Never delete checkpoint directories while queries are running or you risk reprocessing data.

Multiple Topic Subscription Patterns

Kafka source supports multiple subscription strategies:

// Subscribe to multiple topics
val multiTopicStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")
  .load()

// Subscribe using pattern matching
val patternStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "prod-.*")
  .load()

// Assign specific partitions
val assignStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1,2],"logs":[0,1]}""")
  .load()

Route messages based on topic:

val routedStream = multiTopicStream
  .selectExpr("CAST(value AS STRING) as json_value", "topic")
  .where("topic IN ('events', 'logs')")
  .withColumn("event_data", 
    when(col("topic") === "events", from_json(col("json_value"), eventSchema))
    .when(col("topic") === "logs", from_json(col("json_value"), logSchema))
  )

Backpressure and Rate Limiting

Control ingestion rate to prevent overwhelming downstream systems:

val rateLimitedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "100000")  // Max records per batch
  .option("minPartitions", "10")              // Min read partitions
  .option("kafka.max.poll.records", "5000")   // Kafka consumer config
  .load()

For dynamic backpressure based on processing time:

spark.conf.set("spark.sql.streaming.backpressure.enabled", "true")
spark.conf.set("spark.sql.streaming.kafka.consumer.pollTimeoutMs", "512")

val adaptiveStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "adaptive-topic")
  .load()

Exactly-Once Semantics with Idempotent Writes

Achieve exactly-once processing using Kafka transactions and idempotent sinks:

val deduplicatedStream = parsedStream
  .withWatermark("event_timestamp", "1 hour")
  .dropDuplicates("user_id", "event_id")

// Write to Kafka with exactly-once semantics
deduplicatedStream
  .selectExpr("CAST(user_id AS STRING) as key", "to_json(struct(*)) as value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-events")
  .option("checkpointLocation", "s3a://checkpoints/output")
  .option("kafka.enable.idempotence", "true")
  .option("kafka.transactional.id", "spark-kafka-writer-1")
  .start()

For database sinks, use foreachBatch with transactional writes:

parsedStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/events")
      .option("dbtable", "user_events")
      .option("user", "writer")
      .option("password", "secret")
      .mode("append")
      .save()
  }
  .option("checkpointLocation", "s3a://checkpoints/jdbc-sink")
  .start()

Security and Authentication

Configure SASL/SSL for secure Kafka connections:

val secureStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9093,broker2:9093")
  .option("subscribe", "secure-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", 
    """org.apache.kafka.common.security.plain.PlainLoginModule required 
       username="spark-consumer" 
       password="consumer-secret";""")
  .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .load()

For Kerberos authentication:

.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.jaas.config", 
  """com.sun.security.auth.module.Krb5LoginModule required 
     useKeyTab=true 
     storeKey=true 
     keyTab="/path/to/spark.keytab" 
     principal="spark@REALM.COM";""")

Monitoring and Observability

Track streaming query metrics:

import org.apache.spark.sql.streaming.StreamingQueryListener

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println(s"Query started: ${event.id}")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    println(s"Batch ${progress.batchId}: " +
            s"Input rows: ${progress.numInputRows}, " +
            s"Processing time: ${progress.durationMs.get("triggerExecution")} ms")
    
    progress.sources.foreach { source =>
      println(s"Source: ${source.description}")
      println(s"Start offset: ${source.startOffset}")
      println(s"End offset: ${source.endOffset}")
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println(s"Query terminated: ${event.id}")
  }
})

Access Kafka-specific metrics programmatically:

val query = parsedStream.writeStream.format("console").start()

// Monitor lag and throughput
while (query.isActive) {
  val progress = query.lastProgress
  if (progress != null) {
    val kafkaSource = progress.sources.head
    val inputRowsPerSecond = progress.inputRowsPerSecond
    val processedRowsPerSecond = progress.processedRowsPerSecond
    
    println(s"Input rate: $inputRowsPerSecond rows/sec")
    println(s"Processing rate: $processedRowsPerSecond rows/sec")
  }
  Thread.sleep(10000)
}

Production deployments should export these metrics to monitoring systems like Prometheus or CloudWatch for alerting on lag, throughput degradation, or processing failures.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.