Spark Scala - Structured Streaming Example

Key Insights

  • Structured Streaming treats real-time data as an unbounded table, letting you use familiar DataFrame operations for stream processing without learning a new paradigm.
  • Watermarking and windowing are essential for handling late-arriving data and producing meaningful aggregations over time-based intervals.
  • Checkpointing isn’t optional in production—it’s the foundation of fault tolerance and exactly-once processing guarantees.

Introduction to Structured Streaming

Spark Structured Streaming fundamentally changed how we think about stream processing. Instead of treating streams as sequences of discrete events that require specialized APIs, Spark presents streaming data as a continuously appending table. Every new record that arrives extends this table, and your queries run incrementally against it.

This “continuous DataFrame” model means you write streaming applications using the same DataFrame and Dataset APIs you already know from batch processing. The Spark engine handles the complexity of incremental execution, state management, and fault recovery behind the scenes.

The practical benefit is significant: teams don’t need separate skill sets for batch and streaming workloads. A developer comfortable with Spark SQL can build production streaming pipelines with minimal additional learning.

Setting Up the Project

Start with a proper build configuration. Structured Streaming is part of Spark SQL, so you need the core Spark dependencies plus any connector libraries for your sources and sinks.

// build.sbt
ThisBuild / scalaVersion := "2.12.18"

lazy val root = (project in file("."))
  .settings(
    name := "streaming-example",
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0"
    )
  )

Note the provided scope for spark-sql—you’ll have Spark available at runtime in your cluster. The Kafka connector is a separate artifact and needs to be included in your application JAR.

Initialize your SparkSession with appropriate streaming configurations:

import org.apache.spark.sql.SparkSession

object StreamingApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StructuredStreamingExample")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()

    import spark.implicits._
    
    // Your streaming logic here
  }
}

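The shuffle partitions setting matters more in streaming than batch. The default of 200 creates excessive overhead for typical streaming workloads, because every micro-batch schedules that many tasks per shuffle. Start with a lower number and tune based on your data volume. Pick the value deliberately: for stateful queries the partition count is recorded in the checkpoint, so it cannot be changed later without starting from a fresh checkpoint.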

Creating a Streaming Source

Kafka is the dominant streaming source in production environments. Here’s how to read from a Kafka topic with proper schema handling:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val kafkaSchema = new StructType()
  .add("userId", StringType)
  .add("action", StringType)
  .add("timestamp", TimestampType)
  .add("metadata", MapType(StringType, StringType))

val rawStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

val parsedStream = rawStream
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", kafkaSchema).as("data"))
  .select("data.*")

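Kafka messages arrive as binary key-value pairs. You’ll typically cast the value to a string and parse it as JSON. The from_json function with an explicit schema gives you a properly typed DataFrame. By default, records that fail to parse produce null fields instead of raising an error, so filter them out or route them elsewhere before aggregating.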

For local development and testing, the socket source provides a quick way to push data manually:

val socketStream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

Run nc -lk 9999 in a terminal and type messages to test your pipeline. Don’t use this in production—it has no fault tolerance guarantees.

Transformations and Processing Logic

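Most DataFrame transformations work on streams exactly as they do in batch; the main addition is time-based windowing for aggregations. A few batch operations are not supported on streaming DataFrames, exact distinct counts among them, so the example below uses approx_count_distinct. Here’s a practical example computing user activity counts in tumbling windows:

val windowedCounts = parsedStream
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "5 minutes"),
    $"action"
  )
  .agg(
    count("*").as("event_count"),
    // countDistinct is rejected on streaming DataFrames; use the approximate version
    approx_count_distinct("userId").as("unique_users")
  )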

The withWatermark call is critical. It tells Spark how long to wait for late-arriving data before finalizing window results. Without it, Spark must keep the state for every window indefinitely, and that state grows until the job runs out of memory.

Spark tracks the largest event timestamp it has seen and sets the watermark 10 minutes behind it. Once the watermark passes the end of a window, Spark finalizes that window and discards its state, and events that arrive for it afterwards are dropped. Choose the delay based on your data characteristics and business requirements. Too short and you lose legitimate late data; too long and you hold more state in memory and, in append mode, delay results.
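The arithmetic behind that cutoff can be sketched in plain Scala, without Spark. This is a simplified model under stated assumptions: WatermarkSketch and its method names are made up for illustration, timestamps are non-negative epoch milliseconds, and windows are aligned to the epoch. Spark itself advances the watermark only between micro-batches, so real cutoffs lag slightly behind this model.

```scala
// Illustrative model only; none of these names are Spark APIs.
object WatermarkSketch {
  val windowMs: Long = 5 * 60 * 1000L   // 5-minute tumbling window
  val delayMs: Long  = 10 * 60 * 1000L  // 10-minute watermark delay

  /** Convenience helper: minutes to milliseconds. */
  def minutes(n: Long): Long = n * 60 * 1000L

  /** Start of the tumbling window [start, start + windowMs) containing the event. */
  def windowStart(eventMs: Long): Long = eventMs - (eventMs % windowMs)

  /** The watermark trails the largest event time seen so far by the delay. */
  def watermark(maxEventMs: Long): Long = maxEventMs - delayMs

  /** An event is too late once the watermark has reached the end of its window. */
  def isTooLate(eventMs: Long, maxEventMs: Long): Boolean =
    windowStart(eventMs) + windowMs <= watermark(maxEventMs)
}
```

With the largest event time at minute 20, the watermark sits at minute 10. An event stamped minute 7 belongs to the window covering minutes 5 to 10 and is discarded, while one stamped minute 11 still counts.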

For more complex logic, you can chain multiple transformations:

val enrichedStream = parsedStream
  .filter($"action".isNotNull)
  .withColumn("hour", hour($"timestamp"))
  .withColumn("is_weekend", 
    dayofweek($"timestamp").isin(1, 7))
  .where($"action" =!= "heartbeat")

Output Sinks and Write Modes

The output mode determines what gets written to the sink after each micro-batch:

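  • Append: Only new rows since the last trigger. Use this for non-aggregated streams or when writing to append-only sinks. Aggregations need a watermark in this mode, and each window is written once, after the watermark passes its end.
  • Complete: The entire result table on every trigger. Supported only for aggregation queries, and the state is never cleaned up, so keep the number of groups small.
  • Update: Only rows that changed since the last trigger. Efficient for aggregations, with or without a watermark, when the sink can handle repeated writes for the same key.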
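The difference between complete and update mode can be modelled on plain Scala maps. This is a minimal sketch with hypothetical names (OutputModeSketch, ResultTable); it does not call Spark, and it leaves out append mode because that depends on watermark progress rather than on a comparison of two result tables.

```scala
// Illustrative model only; none of these names are Spark APIs.
object OutputModeSketch {
  // Hypothetical result table: aggregation key -> count
  type ResultTable = Map[String, Long]

  /** Complete mode: every row of the current result table is written. */
  def complete(current: ResultTable): ResultTable = current

  /** Update mode: only rows that are new or whose value changed since the last trigger. */
  def update(previous: ResultTable, current: ResultTable): ResultTable =
    current.filter { case (key, value) => !previous.get(key).contains(value) }
}
```

If the previous trigger produced click -> 3 and view -> 5, and the current one produces click -> 4, view -> 5, and purchase -> 1, complete mode writes all three rows while update mode writes only click and purchase.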

Here’s the console sink for debugging:

import org.apache.spark.sql.streaming.Trigger

val consoleQuery = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

For production Kafka output:

val kafkaOutput = windowedCounts
  .selectExpr(
    "CAST(action AS STRING) AS key",
    "to_json(struct(*)) AS value"
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "aggregated-events")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
  .outputMode("update")
  .start()

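Every production query needs a checkpoint location. It is how Spark tracks progress and recovers from failures. The console query above runs without one only because Spark falls back to a temporary directory for debugging sinks, which means it cannot resume after a restart.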

File sinks work well for data lake architectures:

val fileOutput = enrichedStream.writeStream // enrichedStream carries the "hour" column used below
  .format("parquet")
  .option("path", "/data/events")
  .option("checkpointLocation", "/tmp/checkpoints/file-sink")
  .partitionBy("action", "hour")
  .outputMode("append")
  .start()

Managing State and Fault Tolerance

Checkpointing stores two things: the offset progress (which records have been processed) and the state data (aggregation results, window contents). Configure checkpoint locations on reliable distributed storage:

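val query = windowedCounts
  .selectExpr("CAST(action AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "aggregated-events")
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/app-name")
  .outputMode("update")
  .start()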

Use a unique checkpoint path per query. Reusing paths across different queries causes corruption.

For custom stateful logic beyond built-in aggregations, use mapGroupsWithState:

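import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Define the case classes at the top level (outside main) so Spark can derive encoders
case class UserState(lastSeen: Long, sessionCount: Int)
case class UserEvent(userId: String, timestamp: Long)
case class SessionOutput(userId: String, sessions: Int, duration: Long)

def updateUserState(
    userId: String,
    events: Iterator[UserEvent],
    state: GroupState[UserState]
): SessionOutput = {
  if (state.hasTimedOut) {
    // Timeout calls pass an empty iterator: emit the final count and drop the state
    val expired = state.get
    state.remove()
    SessionOutput(userId, expired.sessionCount, 0L)
  } else {
    val currentTime = events.map(_.timestamp).max
    val sessionTimeout = 30 * 60 * 1000L // 30 minutes

    val (newSessionCount, gap) = state.getOption match {
      case None => (1, 0L) // the first event for this user opens session 1
      case Some(old) =>
        val sinceLastSeen = currentTime - old.lastSeen
        val count =
          if (sinceLastSeen > sessionTimeout) old.sessionCount + 1 else old.sessionCount
        (count, sinceLastSeen)
    }

    state.update(UserState(currentTime, newSessionCount))
    state.setTimeoutDuration("1 hour")

    // duration is the gap since the user was last seen, in milliseconds
    SessionOutput(userId, newSessionCount, gap)
  }
}

val sessionStream = parsedStream
  // Drop rows that failed JSON parsing; the typed API cannot hold null keys or timestamps
  .filter($"userId".isNotNull && $"timestamp".isNotNull)
  // UserEvent expects epoch milliseconds, so convert the TimestampType column first
  .select($"userId", (unix_timestamp($"timestamp") * 1000).as("timestamp"))
  .as[UserEvent]
  .groupByKey(_.userId)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateUserState)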

This pattern enables session detection, complex event processing, and any logic requiring memory across micro-batches. Queries that use mapGroupsWithState must be written in update output mode.
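Because the session rule is just arithmetic on timestamps, it can be pulled out of the GroupState callback and tested without a cluster. The following is a minimal sketch, not the only way to structure this. The names SessionLogic, State, and next are hypothetical and not part of Spark, and the sketch assumes the first event for a user counts as session 1.

```scala
// Illustrative helper only; none of these names are Spark APIs.
object SessionLogic {
  // Hypothetical state shape: last event time (epoch ms) and sessions seen so far
  final case class State(lastSeen: Long, sessionCount: Int)

  val SessionGapMs: Long = 30 * 60 * 1000L // 30 minutes of inactivity ends a session

  /** Pure state transition: fold one batch of event timestamps into the previous state. */
  def next(previous: Option[State], timestamps: Seq[Long]): Option[State] =
    if (timestamps.isEmpty) previous // nothing new to fold in, e.g. a timeout call
    else {
      val latest = timestamps.max
      previous match {
        case None => Some(State(latest, 1)) // first event opens the first session
        case Some(old) =>
          val startsNewSession = latest - old.lastSeen > SessionGapMs
          val count = if (startsNewSession) old.sessionCount + 1 else old.sessionCount
          // Never move lastSeen backwards if a batch contains only older events
          Some(State(math.max(latest, old.lastSeen), count))
      }
    }
}
```

A state update function could delegate to SessionLogic.next and keep only the GroupState calls (update, remove, setTimeoutDuration) for itself, which leaves the part that needs a running stream very thin.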

Running and Monitoring the Application

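Calling start() returns immediately, so after launching your queries the driver has to block or the application exits:

val query1 = windowedCounts.writeStream.outputMode("update").format("console").start()
val query2 = enrichedStream.writeStream.format("console").start()

// Block until any one query stops or fails;
// to wait on a specific query, call awaitTermination() on it instead
spark.streams.awaitAnyTermination()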

For graceful shutdown handling:

sys.addShutdownHook {
  println("Stopping streaming queries...")
  spark.streams.active.foreach(_.stop())
  spark.stop()
}

Monitor query progress programmatically with a StreamingQueryListener:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println(s"Query started: ${event.name}")
  }
  
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    println(s"Processed ${progress.numInputRows} rows")
    println(s"Processing rate: ${progress.processedRowsPerSecond} rows/sec")
    
    if (progress.processedRowsPerSecond < 1000) {
      // Alert on slow processing
    }
  }
  
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
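    // exception is an Option[String]; it is empty when the query stopped cleanly
    val reason = event.exception.getOrElse("none")
    println(s"Query terminated, error: $reason")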
  }
})

The Spark UI (port 4040 on the driver by default) provides visual monitoring in its Structured Streaming tab. Watch for increasing batch durations, growing state sizes, and processing lag. These metrics tell you when to scale resources or optimize your logic.
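A fixed threshold such as the 1000 rows/sec check in the listener also fires when the stream is simply idle. One alternative, sketched here under assumptions, compares the two rates that each progress report carries: inputRowsPerSecond and processedRowsPerSecond. The LagCheck object, the RateSample case class, and the three-sample window are hypothetical choices for illustration. In a real listener you would fill RateSample from event.progress.

```scala
// Illustrative helper only; none of these names are Spark APIs.
object LagCheck {
  // Hypothetical snapshot of two rates reported by a streaming progress event
  final case class RateSample(inputRowsPerSecond: Double, processedRowsPerSecond: Double)

  /** True when each of the last `window` samples shows input outpacing processing. */
  def isFallingBehind(samples: Seq[RateSample], window: Int = 3): Boolean = {
    val recent = samples.takeRight(window)
    recent.size == window &&
      recent.forall(s => s.inputRowsPerSecond > s.processedRowsPerSecond)
  }
}
```

Requiring several consecutive samples avoids alerting on a single slow micro-batch, and an idle stream never trips the check because its input rate is not above its processing rate.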

Structured Streaming handles the hard parts of stream processing, including state management, fault recovery, and exactly-once processing when the source is replayable and the sink is idempotent, so you can focus on business logic. Start simple, add complexity incrementally, and always test with realistic data volumes before production deployment.
