Spark Scala - Kafka Integration
Key Insights
- Spark Structured Streaming and Kafka form a powerful combination for building real-time data pipelines, with Spark handling the processing logic while Kafka manages durable, distributed message transport.
- Checkpointing is non-negotiable in production: it lets streaming jobs recover from failures and resume from the last committed offsets. End-to-end exactly-once delivery also requires an idempotent or transactional sink, and Spark's Kafka sink provides at-least-once delivery.
- Schema management is your responsibility when reading from Kafka; messages arrive as binary, and how you deserialize them determines the reliability of your entire pipeline.
Introduction to Spark-Kafka Integration
Streaming data pipelines have become the backbone of modern data architectures. Whether you’re processing clickstream data, IoT sensor readings, or financial transactions, the ability to handle data in near real-time separates reactive systems from proactive ones.
Apache Kafka excels at durable, distributed message transport. Apache Spark Structured Streaming excels at complex data transformations and analytics. Together, they form a pipeline architecture where Kafka handles ingestion and buffering while Spark handles the heavy lifting of processing, aggregation, and enrichment.
This article walks through building a production-ready Spark Scala application that consumes from Kafka, processes streaming data, and writes results back to Kafka. We’ll cover the practical details that documentation often glosses over.
Environment Setup & Dependencies
Before writing any streaming code, you need the right dependencies. The spark-sql-kafka connector bridges Spark Structured Streaming with Kafka. Version alignment matters here—mismatched versions cause cryptic runtime errors.
// build.sbt
name := "spark-kafka-streaming"
version := "1.0.0"
scalaVersion := "2.12.18"

val sparkVersion = "3.5.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)

// Assembly plugin for fat JAR
assembly / assemblyMergeStrategy := {
  // Keep service registrations: Spark locates the "kafka" source through
  // META-INF/services, so discarding them breaks format("kafka") at runtime
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
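A few notes on this configuration. First, mark Spark dependencies as provided since your cluster already has them; the Kafka connector is not marked provided because it is not part of a standard Spark distribution and has to ship in your fat JAR (or be supplied with --packages). Second, the Kafka connector uses the 0-10 suffix regardless of your actual broker version: it names the Kafka client API generation (0.10 and later) that the connector is built against. Third, Scala 2.12 remains the safer choice for Spark 3.x compatibility; Scala 2.13 support exists but has edge cases.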
Your Kafka cluster needs to be accessible from all Spark executors. In production, this typically means proper network configuration, authentication (SASL/SSL), and sufficient topic partitions to parallelize consumption.
Reading from Kafka Topics
Reading from Kafka starts with readStream and requires understanding how Spark handles Kafka’s binary messages. Every Kafka message has a key and value, both arriving as byte arrays. You must deserialize them yourself.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KafkaStreamProcessor")
      .getOrCreate()

    import spark.implicits._

    // Define schema for incoming JSON messages
    val eventSchema = StructType(Seq(
      StructField("userId", StringType, nullable = false),
      StructField("eventType", StringType, nullable = false),
      StructField("timestamp", TimestampType, nullable = false),
      StructField("properties", MapType(StringType, StringType), nullable = true)
    ))

    // Read from Kafka
    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
      .option("subscribe", "user-events")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      // SASL/PLAIN also needs credentials, for example via kafka.sasl.jaas.config;
      // load them from a secret store rather than hard-coding them here
      .load()

    // Parse the binary value as JSON
    val parsedStream = kafkaStream
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as json_value")
      .select(
        col("key"),
        from_json(col("json_value"), eventSchema).as("data")
      )
      .select("key", "data.*")

    parsedStream.printSchema()
  }
}
The startingOffsets option controls where consumption begins. Use earliest for backfilling historical data, latest for real-time only, or a JSON string specifying exact offsets per partition. The failOnDataLoss option is worth understanding: setting it to false prevents job failures when Kafka retention deletes messages before Spark processes them. In production, you want monitoring around this rather than job crashes.
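The per-partition JSON form of startingOffsets is easy to get wrong by hand, so here is a minimal sketch of a helper that renders it. The object name StartingOffsets and its toJson method are hypothetical names chosen for illustration; the sentinel values -2 (earliest) and -1 (latest) are the ones Spark's Kafka source accepts inside that JSON. Spark generally expects an entry for every partition of each subscribed topic, so treat the two-partition map below as an assumption about the topic layout.

```scala
object StartingOffsets {
  // Sentinel offsets understood by Spark's Kafka source in the JSON form
  val Earliest: Long = -2L
  val Latest: Long = -1L

  /** Renders topic -> (partition -> offset) as {"topic":{"0":23,"1":-2}}. */
  def toJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      val body = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => "\"" + partition + "\":" + offset }
        .mkString(",")
      "\"" + topic + "\":{" + body + "}"
    }.mkString("{", ",", "}")
}

object StartingOffsetsDemo extends App {
  // Partition 0 resumes at offset 23, partition 1 starts from the earliest record
  val json = StartingOffsets.toJson(
    Map("user-events" -> Map(0 -> 23L, 1 -> StartingOffsets.Earliest))
  )
  println(json)
  // The result would be passed as .option("startingOffsets", json)
}
```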
Processing Streaming Data
Once you have a parsed DataFrame, streaming transformations look nearly identical to batch operations. The key difference is handling time—specifically, event time versus processing time.
Watermarking tells Spark how long to wait for late-arriving data before finalizing aggregations. Without it, Spark must maintain state indefinitely, eventually exhausting memory.
import org.apache.spark.sql.streaming.Trigger

// Windowed aggregation with watermarking
val aggregatedStream = parsedStream
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "5 minutes", "1 minute"),
    col("eventType")
  )
  .agg(
    count("*").as("eventCount"),
    // Exact countDistinct is not supported on streaming DataFrames;
    // approx_count_distinct is the supported alternative
    approx_count_distinct("userId").as("uniqueUsers")
  )
  .select(
    col("window.start").as("windowStart"),
    col("window.end").as("windowEnd"),
    col("eventType"),
    col("eventCount"),
    col("uniqueUsers")
  )

// More complex transformation: sessionization
val sessionizedStream = parsedStream
  .withWatermark("timestamp", "30 minutes")
  .groupBy(
    col("userId"),
    session_window(col("timestamp"), "15 minutes")
  )
  .agg(
    count("*").as("eventsInSession"),
    min("timestamp").as("sessionStart"),
    max("timestamp").as("sessionEnd"),
    collect_list("eventType").as("eventSequence")
  )
The 10-minute watermark means Spark tracks the latest event time it has seen and treats anything more than 10 minutes older than that as too late. A window is finalized once the watermark passes its end, and events for already-finalized windows may be dropped. Tune the delay to your data's latency characteristics: too short and you lose data, too long and the state held in memory grows.
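The watermark arithmetic can be made concrete with a small worked example in plain Scala. This is a simplified model of the rule, not Spark's implementation: WatermarkSketch and its methods are hypothetical names, and the timestamps are made up for illustration.

```scala
import java.time.{Duration, Instant}

object WatermarkSketch {
  /** Watermark = latest event time seen so far minus the allowed delay. */
  def watermark(maxEventTime: Instant, delay: Duration): Instant =
    maxEventTime.minus(delay)

  /** A window can be finalized once the watermark has reached its end. */
  def isWindowClosed(windowEnd: Instant, wm: Instant): Boolean =
    !wm.isBefore(windowEnd)
}

object WatermarkDemo extends App {
  val latestSeen = Instant.parse("2024-01-01T12:21:00Z")
  val wm = WatermarkSketch.watermark(latestSeen, Duration.ofMinutes(10))
  println(s"watermark: $wm")

  // The window ending at 12:10 is behind the watermark, so it is closed
  println(WatermarkSketch.isWindowClosed(Instant.parse("2024-01-01T12:10:00Z"), wm))
  // The window ending at 12:15 is still ahead of the watermark, so it stays open
  println(WatermarkSketch.isWindowClosed(Instant.parse("2024-01-01T12:15:00Z"), wm))
}
```

With the latest event at 12:21 and a 10-minute delay, the watermark sits at 12:11, so an event stamped 12:07 arriving now would belong to a window that has already been finalized.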
Session windows (introduced in Spark 3.2) are particularly useful for user behavior analysis. They group events by gaps in activity rather than fixed time boundaries.
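To illustrate what grouping by gaps in activity means, here is a rough sketch in plain Scala that splits one user's event times into sessions. It models the idea only and is not how Spark implements session_window; SessionSketch and sessionize are hypothetical names, and timestamps are epoch seconds for brevity.

```scala
object SessionSketch {
  /**
   * Groups timestamps into sessions. An event joins the current session when
   * it arrives less than gapSeconds after the previous event; otherwise it
   * starts a new session.
   */
  def sessionize(timestamps: Seq[Long], gapSeconds: Long): List[List[Long]] =
    timestamps.sorted.foldLeft(List.empty[List[Long]]) {
      // current.head is the most recent event of the session being built
      case (current :: done, ts) if ts - current.head < gapSeconds =>
        (ts :: current) :: done
      case (sessions, ts) =>
        List(ts) :: sessions
    }.map(_.reverse).reverse
}

object SessionDemo extends App {
  // 15-minute gap (900 seconds): the 30-minute pause after 1200 starts a new session
  val sessions = SessionSketch.sessionize(Seq(0L, 600L, 1200L, 3000L, 3300L), 900L)
  sessions.foreach(s => println(s.mkString(", ")))
}
```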
Writing to Kafka Topics
Writing processed results back to Kafka requires a value column and, optionally, a key column. Both must be strings or binary.
// Prepare output for Kafka
val kafkaOutput = aggregatedStream
  .select(
    col("eventType").as("key"),
    to_json(struct(
      col("windowStart"),
      col("windowEnd"),
      col("eventType"),
      col("eventCount"),
      col("uniqueUsers")
    )).as("value")
  )

// Write to Kafka
val query = kafkaOutput.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
  .option("topic", "aggregated-events")
  .option("checkpointLocation", "/checkpoints/aggregated-events")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

query.awaitTermination()
Output modes determine what gets written. append writes only new rows (works with aggregations only when watermarking is enabled). update writes rows that changed since the last trigger. complete writes the entire result table every trigger—avoid this for large state.
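One practical consequence of update mode is that the same window can be emitted several times as its counts change, so a consumer of the output topic usually keeps only the most recent record per window. A minimal sketch of that consumer-side logic follows, assuming records for a given key arrive in order; WindowCount and LatestPerWindow are hypothetical names, and the fields mirror a subset of the JSON value written above.

```scala
// Simplified view of one record from the output topic (assumed shape)
final case class WindowCount(eventType: String, windowStart: String, eventCount: Long)

object LatestPerWindow {
  /** Later records overwrite earlier ones for the same (eventType, windowStart). */
  def collapse(records: Seq[WindowCount]): Map[(String, String), WindowCount] =
    records.foldLeft(Map.empty[(String, String), WindowCount]) { (acc, record) =>
      acc.updated((record.eventType, record.windowStart), record)
    }
}

object LatestPerWindowDemo extends App {
  val received = Seq(
    WindowCount("click", "2024-01-01T12:00:00Z", 3L),
    WindowCount("view", "2024-01-01T12:00:00Z", 1L),
    WindowCount("click", "2024-01-01T12:00:00Z", 7L) // updated count for the same window
  )
  LatestPerWindow.collapse(received).values.foreach(println)
}
```

Note that the Kafka message key in this article is the event type alone, so the window start has to be read from the JSON value to build the composite key.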
The trigger interval controls how frequently Spark processes micro-batches. Shorter intervals reduce latency but increase overhead. For most use cases, 10-60 seconds balances these concerns.
Checkpointing & Fault Tolerance
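Checkpointing is mandatory for production streaming jobs. It stores offset information and intermediate state, so a restarted query resumes where it left off instead of reprocessing from scratch or skipping data. Whether the pipeline is exactly-once end to end then depends on the sink.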
val reliableQuery = kafkaOutput.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
  .option("topic", "aggregated-events")
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/aggregated-events")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .option("kafka.acks", "all")
  .option("kafka.retries", "3")
  .option("kafka.max.in.flight.requests.per.connection", "1")
  .start()
Store checkpoints on distributed storage (S3, HDFS, GCS)—never local disk. Each streaming query needs its own checkpoint directory. Changing your query logic often requires clearing checkpoints and reprocessing, so design for this eventuality.
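The Kafka producer options (acks, retries, max.in.flight.requests.per.connection) harden writes against broker failures: acks=all waits for all in-sync replicas, and a single in-flight request preserves ordering during retries. They do not make the pipeline exactly-once. Spark's Kafka sink is at-least-once, so a retried micro-batch can write duplicates, and downstream consumers should deduplicate on a unique key if that matters.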
Production Considerations
Production deployments require tuning beyond the defaults. Here’s a configuration template that addresses common performance bottlenecks:
val spark = SparkSession.builder()
  .appName("KafkaStreamProcessor")
  .config("spark.sql.shuffle.partitions", "200")
  // spark.streaming.kafka.maxRatePerPartition only affects the legacy DStreams API.
  // For Structured Streaming, cap intake with the Kafka source option
  // maxOffsetsPerTrigger on readStream instead.
  // Disabling the state schema check skips a safety validation on restart;
  // leave it enabled unless you have a specific reason not to.
  .config("spark.sql.streaming.stateStore.stateSchemaCheck", "false")
  .config("spark.sql.streaming.metricsEnabled", "true")
  .config("spark.executor.memory", "4g")
  .config("spark.executor.cores", "2")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "10")
  .getOrCreate()

// Monitor progress programmatically
query.recentProgress.foreach { progress =>
  println(s"Input rows/sec: ${progress.inputRowsPerSecond}")
  println(s"Processing rows/sec: ${progress.processedRowsPerSecond}")
  println(s"Batch duration: ${progress.batchDuration}ms")
}
Key tuning parameters: shuffle.partitions should roughly match your Kafka partition count multiplied by a small factor; for stateful queries, choose it before the first run, because the partition count of the state store is fixed by the checkpoint. The maxOffsetsPerTrigger source option prevents overwhelming your cluster during backfill scenarios. Enable streaming metrics so query metrics are reported through Spark's metrics system.
Watch for these common pitfalls. Rows full of nulls after parsing usually indicate schema mismatches: in PERMISSIVE mode (the default), from_json yields nulls instead of failing, so validate required fields and route bad records somewhere visible. Backpressure manifests as increasing batch durations; scale executors, cap the input per trigger, or lengthen the trigger interval. State store growth indicates missing or insufficient watermarks.
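For the parsing pitfall, one possible approach is to split parsed rows into valid and malformed sets so bad records can go to a dead-letter topic instead of disappearing. The sketch below is an assumption-laden illustration: MalformedJsonSketch and split are hypothetical names, the two-field schema is a cut-down stand-in for the event schema, and the input column is assumed to be called json_value as in the reading example. Depending on the Spark version, from_json may yield a null struct or a struct of nulls for unparseable input, so the check tests the required fields rather than the struct itself.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object MalformedJsonSketch {
  // Reduced schema for illustration; both fields are treated as required
  val schema: StructType = StructType(Seq(
    StructField("userId", StringType),
    StructField("eventType", StringType)
  ))

  /** Returns (valid rows with parsed columns, malformed rows with the raw JSON). */
  def split(raw: DataFrame): (DataFrame, DataFrame) = {
    val parsed = raw.withColumn("data", from_json(col("json_value"), schema))
    // A record counts as valid only when every required field was parsed
    val isValid = col("data.userId").isNotNull && col("data.eventType").isNotNull
    val valid = parsed.filter(isValid).select("data.*")
    val malformed = parsed.filter(!isValid).select("json_value")
    (valid, malformed)
  }
}
```

The same function works on a streaming DataFrame; each half would then get its own writeStream with its own checkpoint directory.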
Monitor the Spark UI’s Structured Streaming tab religiously. It shows input rates, processing rates, and state store sizes—the metrics that predict failures before they happen.
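The same signal can be checked in code. As a rough sketch, the helper below flags a query as falling behind when its most recent batches all took longer than the trigger interval. BackpressureSketch and fallingBehind are hypothetical names, and the threshold of three consecutive batches is an arbitrary assumption; the durations could come from query.recentProgress.map(_.batchDuration).

```scala
object BackpressureSketch {
  /**
   * True when each of the last n batches took longer than the trigger interval,
   * which suggests the query cannot keep up with its input.
   */
  def fallingBehind(batchDurationsMs: Seq[Long], triggerIntervalMs: Long, n: Int): Boolean = {
    val recent = batchDurationsMs.takeRight(n)
    recent.size == n && recent.forall(_ > triggerIntervalMs)
  }
}

object BackpressureDemo extends App {
  val triggerMs = 30000L // matches the 30-second trigger used earlier
  println(BackpressureSketch.fallingBehind(Seq(12000L, 31000L, 35000L, 40000L), triggerMs, 3))
  println(BackpressureSketch.fallingBehind(Seq(12000L, 31000L, 20000L, 40000L), triggerMs, 3))
}
```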