Spark Streaming - Monitoring and Metrics

Spark Streaming exposes metrics through multiple layers: the Spark UI, REST API, and programmatic listeners. The streaming tab in Spark UI displays real-time statistics, but production systems...

Key Insights

  • Spark Streaming monitoring requires tracking both micro-batch execution metrics (scheduling delay, processing time) and application-level health indicators to prevent data loss and maintain throughput
  • The StreamingListener API provides programmatic access to streaming metrics, enabling custom monitoring solutions that integrate with enterprise observability platforms like Prometheus, Datadog, or CloudWatch
  • Production streaming applications need automated alerting on critical metrics: batch scheduling delay exceeding processing time signals backpressure, while increasing total delay indicates the system cannot keep pace with input rates

Understanding Spark Streaming Metrics Architecture

Spark Streaming exposes metrics through multiple layers: the Spark UI, REST API, and programmatic listeners. The streaming tab in Spark UI displays real-time statistics, but production systems require automated metric collection and alerting.

The core metrics architecture consists of:

  • StreamingContext: Tracks overall streaming application state
  • StreamingListener: Event-driven interface for metric collection
  • MetricsSystem: Spark’s internal metrics framework supporting various sinks
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.scheduler._

class CustomStreamingListener extends StreamingListener {
  
  override def onBatchCompleted(batchCompleted: BatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    
    // Core metrics
    val batchTime = batchInfo.batchTime.milliseconds
    val processingDelay = batchInfo.processingDelay.getOrElse(0L)
    val schedulingDelay = batchInfo.schedulingDelay.getOrElse(0L)
    val totalDelay = batchInfo.totalDelay.getOrElse(0L)
    val numRecords = batchInfo.numRecords
    
    println(s"""
      |Batch: $batchTime
      |Records: $numRecords
      |Scheduling Delay: ${schedulingDelay}ms
      |Processing Time: ${processingDelay}ms
      |Total Delay: ${totalDelay}ms
    """.stripMargin)
  }
  
  override def onBatchStarted(batchStarted: BatchStarted): Unit = {
    // Track batch start events
  }
  
  override def onReceiverError(receiverError: ReceiverError): Unit = {
    // Alert on receiver failures
    println(s"Receiver error: ${receiverError.receiverInfo.lastErrorMessage}")
  }
}

// Register the listener
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.addStreamingListener(new CustomStreamingListener())

Critical Metrics for Production Monitoring

Production streaming applications require monitoring specific metrics that indicate system health and performance degradation.

Scheduling Delay: Time a batch waits in queue before processing begins. Increasing scheduling delay indicates the system cannot process batches fast enough.

Processing Time: Actual time spent processing a batch. Should remain stable and below batch interval.

Total Delay: End-to-end latency from batch submission to completion. Critical for SLA compliance.

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

class ProductionMetricsCollector extends StreamingListener {
  
  private val metricsBuffer = scala.collection.mutable.ListBuffer[BatchMetrics]()
  
  case class BatchMetrics(
    batchTime: Long,
    schedulingDelay: Long,
    processingTime: Long,
    totalDelay: Long,
    inputRecords: Long,
    processedRecords: Long
  )
  
  override def onBatchCompleted(batchCompleted: BatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    
    val metrics = BatchMetrics(
      batchTime = info.batchTime.milliseconds,
      schedulingDelay = info.schedulingDelay.getOrElse(0L),
      processingTime = info.processingDelay.getOrElse(0L),
      totalDelay = info.totalDelay.getOrElse(0L),
      inputRecords = info.numRecords,
      processedRecords = info.outputOperationInfos.values
        .map(_.numRecords.getOrElse(0L))
        .sum
    )
    
    metricsBuffer += metrics
    checkHealthThresholds(metrics)
    exportToMonitoringSystem(metrics)
  }
  
  private def checkHealthThresholds(metrics: BatchMetrics): Unit = {
    val batchInterval = 10000L // 10 seconds in ms
    
    // Alert if scheduling delay exceeds 50% of batch interval
    if (metrics.schedulingDelay > batchInterval * 0.5) {
      alertBackpressure(metrics)
    }
    
    // Alert if processing time exceeds batch interval
    if (metrics.processingTime > batchInterval) {
      alertProcessingOverload(metrics)
    }
    
    // Alert if no records processed
    if (metrics.inputRecords > 0 && metrics.processedRecords == 0) {
      alertProcessingFailure(metrics)
    }
  }
  
  private def alertBackpressure(metrics: BatchMetrics): Unit = {
    println(s"ALERT: Backpressure detected. Scheduling delay: ${metrics.schedulingDelay}ms")
  }
  
  private def alertProcessingOverload(metrics: BatchMetrics): Unit = {
    println(s"ALERT: Processing overload. Processing time: ${metrics.processingTime}ms")
  }
  
  private def alertProcessingFailure(metrics: BatchMetrics): Unit = {
    println(s"ALERT: Processing failure. Input: ${metrics.inputRecords}, Processed: ${metrics.processedRecords}")
  }
  
  private def exportToMonitoringSystem(metrics: BatchMetrics): Unit = {
    // Export to Prometheus, CloudWatch, etc.
  }
}

Integrating with Prometheus

Prometheus integration enables standardized metrics collection and alerting. Use the Prometheus JMX exporter or implement custom metrics exporters.

import io.prometheus.client._
import io.prometheus.client.exporter.HTTPServer

object StreamingMetricsExporter {
  
  // Define Prometheus metrics
  private val schedulingDelayGauge = Gauge.build()
    .name("spark_streaming_scheduling_delay_ms")
    .help("Scheduling delay in milliseconds")
    .register()
  
  private val processingTimeGauge = Gauge.build()
    .name("spark_streaming_processing_time_ms")
    .help("Processing time in milliseconds")
    .register()
  
  private val totalDelayGauge = Gauge.build()
    .name("spark_streaming_total_delay_ms")
    .help("Total delay in milliseconds")
    .register()
  
  private val recordsProcessedCounter = Counter.build()
    .name("spark_streaming_records_processed_total")
    .help("Total records processed")
    .register()
  
  private val batchDurationHistogram = Histogram.build()
    .name("spark_streaming_batch_duration_seconds")
    .help("Batch processing duration")
    .buckets(0.1, 0.5, 1.0, 2.5, 5.0, 10.0)
    .register()
  
  // Start Prometheus HTTP server
  def startMetricsServer(port: Int): HTTPServer = {
    new HTTPServer(port)
  }
  
  def updateMetrics(batchInfo: BatchInfo): Unit = {
    schedulingDelayGauge.set(batchInfo.schedulingDelay.getOrElse(0L).toDouble)
    processingTimeGauge.set(batchInfo.processingDelay.getOrElse(0L).toDouble)
    totalDelayGauge.set(batchInfo.totalDelay.getOrElse(0L).toDouble)
    recordsProcessedCounter.inc(batchInfo.numRecords.toDouble)
    
    val processingSeconds = batchInfo.processingDelay.getOrElse(0L) / 1000.0
    batchDurationHistogram.observe(processingSeconds)
  }
}

class PrometheusStreamingListener extends StreamingListener {
  
  override def onBatchCompleted(batchCompleted: BatchCompleted): Unit = {
    StreamingMetricsExporter.updateMetrics(batchCompleted.batchInfo)
  }
}

// Initialize in main application
val metricsServer = StreamingMetricsExporter.startMetricsServer(9091)
ssc.addStreamingListener(new PrometheusStreamingListener())

Monitoring Kafka Integration Metrics

When using Kafka as a source, monitor consumer lag and offset progression to detect ingestion issues.

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

class KafkaMetricsMonitor(
  bootstrapServers: String,
  groupId: String,
  topics: Set[String]
) {
  
  private val consumer = createConsumer()
  
  private def createConsumer(): KafkaConsumer[String, String] = {
    val props = new java.util.Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringDeserializer")
    
    new KafkaConsumer[String, String](props)
  }
  
  def getConsumerLag(): Map[TopicPartition, Long] = {
    val topicPartitions = topics.flatMap { topic =>
      consumer.partitionsFor(topic).asScala.map { partitionInfo =>
        new TopicPartition(partitionInfo.topic(), partitionInfo.partition())
      }
    }.toList
    
    // Get committed offsets
    val committedOffsets = consumer.committed(topicPartitions.toSet.asJava)
    
    // Get end offsets (latest available)
    val endOffsets = consumer.endOffsets(topicPartitions.asJava)
    
    topicPartitions.map { tp =>
      val committed = Option(committedOffsets.get(tp))
        .map(_.offset())
        .getOrElse(0L)
      val end = endOffsets.get(tp)
      val lag = end - committed
      
      tp -> lag
    }.toMap
  }
  
  def getTotalLag(): Long = {
    getConsumerLag().values.sum
  }
  
  def monitorLag(thresholdMessages: Long): Unit = {
    val lag = getTotalLag()
    
    if (lag > thresholdMessages) {
      println(s"WARNING: Consumer lag exceeded threshold. Current lag: $lag messages")
    }
    
    // Export to metrics system
    println(s"Current consumer lag: $lag messages")
  }
  
  def close(): Unit = {
    consumer.close()
  }
}

// Usage in streaming application
val kafkaMonitor = new KafkaMetricsMonitor(
  bootstrapServers = "localhost:9092",
  groupId = "streaming-app-group",
  topics = Set("input-topic")
)

// Monitor lag periodically
import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(
  new Runnable {
    def run(): Unit = kafkaMonitor.monitorLag(10000L)
  },
  0,
  30,
  TimeUnit.SECONDS
)

Structured Streaming Metrics

Structured Streaming provides a different metrics interface through the StreamingQuery API.

import org.apache.spark.sql.streaming._

class StructuredStreamingMonitor(query: StreamingQuery) {
  
  def collectMetrics(): Unit = {
    val progress = query.lastProgress
    
    if (progress != null) {
      println(s"""
        |Query: ${progress.name}
        |Batch ID: ${progress.batchId}
        |Input Rows: ${progress.numInputRows}
        |Processed Rows/Sec: ${progress.processedRowsPerSecond}
        |Duration: ${progress.durationMs.get("triggerExecution")}ms
      """.stripMargin)
      
      // Check for state store metrics
      progress.stateOperators.foreach { stateOp =>
        println(s"""
          |State Store:
          |  Num Rows Total: ${stateOp.numRowsTotal}
          |  Num Rows Updated: ${stateOp.numRowsUpdated}
          |  Memory Used: ${stateOp.memoryUsedBytes} bytes
        """.stripMargin)
      }
      
      // Source metrics
      progress.sources.foreach { source =>
        println(s"Source: ${source.description}, Input Rows: ${source.numInputRows}")
      }
      
      // Sink metrics
      progress.sink.foreach { sink =>
        println(s"Sink: ${sink.description}")
      }
    }
  }
  
  def checkQueryHealth(): Boolean = {
    query.status.isDataAvailable && !query.exception.isDefined
  }
  
  def getExceptionIfPresent(): Option[StreamingQueryException] = {
    query.exception
  }
}

// Implement monitoring loop
val query = df.writeStream
  .format("console")
  .outputMode("append")
  .start()

val monitor = new StructuredStreamingMonitor(query)

while (query.isActive) {
  Thread.sleep(10000)
  monitor.collectMetrics()
  
  if (!monitor.checkQueryHealth()) {
    monitor.getExceptionIfPresent().foreach { ex =>
      println(s"Query failed: ${ex.getMessage}")
      // Alert and handle failure
    }
  }
}

Implementing Automated Alerting

Production systems require automated alerting when metrics exceed thresholds. Integrate with PagerDuty, Slack, or email notification systems.

import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.ExecutionContext.Implicits.global

trait AlertChannel {
  def sendAlert(message: String, severity: AlertSeverity): Future[Unit]
}

sealed trait AlertSeverity
case object Critical extends AlertSeverity
case object Warning extends AlertSeverity
case object Info extends AlertSeverity

class SlackAlertChannel(webhookUrl: String) extends AlertChannel {
  def sendAlert(message: String, severity: AlertSeverity): Future[Unit] = Future {
    val payload = s"""{"text": "[${severity}] $message"}"""
    // HTTP POST to Slack webhook
    println(s"Sending to Slack: $payload")
  }
}

class AlertingStreamingListener(
  alertChannels: List[AlertChannel],
  batchIntervalMs: Long
) extends StreamingListener {
  
  private var consecutiveSlowBatches = 0
  
  override def onBatchCompleted(batchCompleted: BatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processingTime = info.processingDelay.getOrElse(0L)
    val schedulingDelay = info.schedulingDelay.getOrElse(0L)
    
    // Check for processing time exceeding batch interval
    if (processingTime > batchIntervalMs) {
      consecutiveSlowBatches += 1
      
      if (consecutiveSlowBatches >= 3) {
        alertChannels.foreach { channel =>
          channel.sendAlert(
            s"Processing time (${processingTime}ms) exceeds batch interval for 3 consecutive batches",
            Critical
          )
        }
      }
    } else {
      consecutiveSlow

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.