Real-Time Data Pipeline with Spark Streaming and Kafka

Real-time data processing has shifted from a nice-to-have to a core requirement. Batch processing with hourly or daily refreshes no longer cuts it when your business needs immediate insights—whether...

Key Insights

  • Spark Structured Streaming treats real-time data as an unbounded table, letting you use familiar DataFrame APIs while Kafka handles the heavy lifting of message durability and partitioning.
  • Watermarking is non-negotiable for production pipelines—without it, your state will grow unbounded and eventually crash your application.
  • Checkpointing isn’t optional; it’s your recovery mechanism. Configure it from day one, not after your first production failure.

Introduction & Use Case Overview

Real-time data processing has shifted from a nice-to-have to a core requirement. Batch processing with hourly or daily refreshes no longer cuts it when your business needs immediate insights—whether that’s detecting fraudulent transactions, personalizing user experiences, or monitoring infrastructure health.

This article walks through building a production-ready real-time pipeline using Apache Kafka and Spark Structured Streaming. We’ll use a concrete use case: real-time clickstream analytics for an e-commerce platform. The goal is to track user behavior as it happens, compute rolling metrics like page views per product category, and detect unusual activity patterns within seconds of occurrence.

By the end, you’ll have working code for each pipeline component and understand the operational considerations that separate toy examples from production systems.

Architecture Overview

The pipeline follows a straightforward pattern:

[Web Events] → [Kafka] → [Spark Streaming] → [PostgreSQL/Delta Lake]
                    [Product Catalog]
                    (Static Dataset)

Kafka serves as the durable message broker. It decouples your event producers from consumers, handles backpressure naturally, and provides replay capability when things go wrong. Events are organized into topics, and topics are split into partitions for parallel processing.

Spark Structured Streaming consumes from Kafka and processes data in micro-batches (typically 100ms to a few seconds). Despite the micro-batch model, the API treats data as a continuous stream—you write transformations once, and Spark handles the incremental execution.

Key concepts to internalize:

  • Topics and Partitions: A topic is a logical channel; partitions enable parallelism. Your Spark job will have one task per partition.
  • Consumer Groups: Spark acts as a consumer group, with each executor claiming partitions. Kafka tracks offsets per group.
  • Micro-batch vs. Continuous: Structured Streaming defaults to micro-batch. Continuous processing exists but has limitations. Stick with micro-batch for most use cases.

Setting Up Kafka

For local development, use Docker Compose. For production, managed services like Confluent Cloud or Amazon MSK reduce operational burden significantly.

Create a topic with enough partitions to match your parallelism needs:

kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic clickstream \
  --partitions 6 \
  --replication-factor 3

Six partitions means up to six Spark tasks can process in parallel. The replication factor of three provides fault tolerance.

Here’s a Python producer simulating clickstream events:

import json
import random
import time
from datetime import datetime
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

categories = ['electronics', 'clothing', 'home', 'sports', 'books']
actions = ['view', 'click', 'add_to_cart', 'purchase']

def generate_event():
    return {
        'event_id': f"evt_{random.randint(100000, 999999)}",
        'user_id': f"user_{random.randint(1, 10000)}",
        'product_id': f"prod_{random.randint(1, 500)}",
        'category': random.choice(categories),
        'action': random.choice(actions),
        'timestamp': datetime.utcnow().isoformat(),
        'session_id': f"sess_{random.randint(1, 5000)}"
    }

if __name__ == "__main__":
    while True:
        event = generate_event()
        # Use user_id as key for partition affinity
        producer.send('clickstream', key=event['user_id'], value=event)
        time.sleep(random.uniform(0.01, 0.1))

Using user_id as the partition key ensures all events for a user land in the same partition, which matters for session-based analytics.

Configuring Spark Structured Streaming

Start with the SparkSession configuration. The Kafka connector isn’t bundled with Spark—you need to include it:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("ClickstreamProcessor")
  .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")
  .config("spark.sql.shuffle.partitions", "6")
  .getOrCreate()

// Define the schema for our JSON events
val clickstreamSchema = StructType(Seq(
  StructField("event_id", StringType),
  StructField("user_id", StringType),
  StructField("product_id", StringType),
  StructField("category", StringType),
  StructField("action", StringType),
  StructField("timestamp", TimestampType),
  StructField("session_id", StringType)
))

// Read from Kafka
val rawStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clickstream")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

// Parse the JSON payload
val clickstream = rawStream
  .selectExpr("CAST(value AS STRING) as json_string", "timestamp as kafka_timestamp")
  .select(
    from_json(col("json_string"), clickstreamSchema).alias("data"),
    col("kafka_timestamp")
  )
  .select("data.*", "kafka_timestamp")

A few things to note:

  • startingOffsets: latest means new deployments start from current messages. Use earliest for reprocessing scenarios.
  • failOnDataLoss: false prevents crashes when Kafka retention deletes old messages. In production, monitor for data loss separately.
  • shuffle.partitions should match your Kafka partition count to avoid unnecessary shuffles.

Stream Processing & Transformations

Now for the interesting part: computing real-time metrics. Let’s calculate page views per category in 5-minute tumbling windows, with a watermark to handle late-arriving data.

val windowedCounts = clickstream
  .filter(col("action") === "view")
  .withWatermark("timestamp", "2 minutes")
  .groupBy(
    window(col("timestamp"), "5 minutes"),
    col("category")
  )
  .agg(
    count("*").alias("view_count"),
    countDistinct("user_id").alias("unique_users"),
    countDistinct("session_id").alias("unique_sessions")
  )
  .select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("category"),
    col("view_count"),
    col("unique_users"),
    col("unique_sessions")
  )

The watermark of 2 minutes tells Spark: “After 2 minutes past the window end, stop accepting late data for that window.” This bounds state size. Without it, Spark would keep every window’s state forever, eventually exhausting memory.

For enrichment, join the stream with a static product catalog:

// Load product catalog (refresh periodically in production)
val productCatalog = spark.read
  .option("header", "true")
  .csv("/data/product_catalog.csv")
  .select(
    col("product_id"),
    col("product_name"),
    col("brand"),
    col("price").cast(DoubleType)
  )

// Enrich clickstream with product details
val enrichedStream = clickstream
  .join(productCatalog, Seq("product_id"), "left")
  .select(
    col("event_id"),
    col("user_id"),
    col("product_id"),
    col("product_name"),
    col("brand"),
    col("price"),
    col("action"),
    col("timestamp")
  )

Stream-static joins are efficient because Spark broadcasts the static dataset to all executors. For large catalogs, consider using a broadcast hint or restructuring as a stream-stream join with a compacted Kafka topic.

Output Sinks & Checkpointing

Writing results requires choosing an output mode:

  • Append: Only new rows since last trigger. Use for non-aggregated data.
  • Complete: Entire result table. Use for aggregations where you need the full picture.
  • Update: Only changed rows. Efficient for aggregations when the sink supports updates.

Here’s writing windowed aggregations to PostgreSQL:

import java.util.Properties

val jdbcProperties = new Properties()
jdbcProperties.setProperty("user", "analytics")
jdbcProperties.setProperty("password", sys.env("DB_PASSWORD"))
jdbcProperties.setProperty("driver", "org.postgresql.Driver")

val query = windowedCounts.writeStream
  .outputMode("update")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .mode("append")
      .jdbc(
        "jdbc:postgresql://localhost:5432/analytics",
        "category_metrics",
        jdbcProperties
      )
  }
  .option("checkpointLocation", "/checkpoints/category_metrics")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

For high-throughput scenarios, Delta Lake provides better performance:

val deltaQuery = enrichedStream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/checkpoints/enriched_clickstream")
  .option("path", "/data/delta/enriched_clickstream")
  .partitionBy("category")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

query.awaitTermination()

Checkpoint location is critical. It stores offset information and intermediate state. If your job fails, it resumes from the last checkpoint. Store checkpoints on durable storage (S3, HDFS)—never local disk in a cluster environment.

Monitoring, Tuning & Production Considerations

Once your pipeline runs, you need visibility into its health.

Key metrics to monitor:

  • Input rate vs. processing rate: If processing can’t keep up with input, you’ll see growing batch durations and eventual backpressure.
  • Batch duration: Should be less than your trigger interval. If batches take 45 seconds with a 30-second trigger, you’re falling behind.
  • State size: For stateful operations (windows, aggregations), monitor memory usage. Unbounded state growth indicates missing watermarks.

Access metrics programmatically:

val query = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()

// Check progress
while (query.isActive) {
  val progress = query.lastProgress
  if (progress != null) {
    println(s"Input rows/sec: ${progress.inputRowsPerSecond}")
    println(s"Processed rows/sec: ${progress.processedRowsPerSecond}")
    println(s"Batch duration: ${progress.batchDuration}ms")
  }
  Thread.sleep(10000)
}

Common tuning parameters:

  • spark.sql.shuffle.partitions: Match to your Kafka partitions or slightly higher.
  • spark.streaming.backpressure.enabled: Set to true to automatically throttle when falling behind.
  • maxOffsetsPerTrigger: Limits records per batch, useful during catch-up scenarios.

Deployment considerations:

Run in cluster mode with dedicated resources. A typical starting point:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 6 \
  --executor-cores 2 \
  --executor-memory 4g \
  --driver-memory 2g \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 \
  --class com.example.ClickstreamProcessor \
  clickstream-processor.jar

Match executor count to Kafka partitions. Two cores per executor handles the parallel reads and processing. Adjust memory based on state size—windowed aggregations over large time ranges need more.

Finally, implement alerting on query failures. Structured Streaming queries can silently stop on unhandled exceptions. Use query.exception to detect failures and integrate with your alerting system.

Real-time pipelines require more operational attention than batch jobs. Invest in monitoring early, and you’ll spend far less time debugging production issues later.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.