Spark Streaming - Monitoring and Metrics
Key Insights
- Spark Streaming monitoring requires tracking both micro-batch execution metrics (scheduling delay, processing time) and application-level health indicators to prevent data loss and maintain throughput
- The StreamingListener API provides programmatic access to streaming metrics, enabling custom monitoring solutions that integrate with enterprise observability platforms like Prometheus, Datadog, or CloudWatch
- Production streaming applications need automated alerting on critical metrics: scheduling delay that grows batch over batch signals backpressure, and rising total delay indicates the system cannot keep pace with input rates
Understanding Spark Streaming Metrics Architecture
Spark Streaming exposes metrics through multiple layers: the Spark UI, REST API, and programmatic listeners. The streaming tab in Spark UI displays real-time statistics, but production systems require automated metric collection and alerting.
The core metrics architecture consists of:
- StreamingContext: Tracks overall streaming application state
- StreamingListener: Event-driven interface for metric collection
- MetricsSystem: Spark’s internal metrics framework supporting various sinks
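The MetricsSystem sinks mentioned above are configured through conf/metrics.properties rather than application code. A minimal sketch routing metrics to a Graphite sink (host and port are placeholders):

```properties
# conf/metrics.properties - route metrics from all instances to Graphite
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
# Optionally expose JVM metrics from driver and executors
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
```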
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.scheduler._
class CustomStreamingListener extends StreamingListener {
  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    // Core metrics
    val batchTime = batchInfo.batchTime.milliseconds
    val processingDelay = batchInfo.processingDelay.getOrElse(0L)
    val schedulingDelay = batchInfo.schedulingDelay.getOrElse(0L)
    val totalDelay = batchInfo.totalDelay.getOrElse(0L)
    val numRecords = batchInfo.numRecords
    println(s"""
      |Batch: $batchTime
      |Records: $numRecords
      |Scheduling Delay: ${schedulingDelay}ms
      |Processing Time: ${processingDelay}ms
      |Total Delay: ${totalDelay}ms
    """.stripMargin)
  }

  override def onBatchStarted(
      batchStarted: StreamingListenerBatchStarted): Unit = {
    // Track batch start events
  }

  override def onReceiverError(
      receiverError: StreamingListenerReceiverError): Unit = {
    // Alert on receiver failures
    println(s"Receiver error: ${receiverError.receiverInfo.lastErrorMessage}")
  }
}
// Register the listener
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.addStreamingListener(new CustomStreamingListener())
Critical Metrics for Production Monitoring
Production streaming applications require monitoring specific metrics that indicate system health and performance degradation.
Scheduling Delay: Time a batch waits in queue before processing begins. Increasing scheduling delay indicates the system cannot process batches fast enough.
Processing Time: Actual time spent processing a batch. Should remain stable and below batch interval.
Total Delay: End-to-end latency from batch submission to completion. Critical for SLA compliance.
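These three delays can be folded into a coarse health classification that alerting code branches on. A minimal sketch, where `BatchHealth` and `classify` are hypothetical names and the thresholds are illustrative, not Spark defaults:

```scala
// Sketch: derive a health status from the three delay metrics.
// Thresholds are illustrative, not Spark defaults.
object BatchHealth {
  sealed trait Status
  case object Healthy extends Status
  case object Backpressure extends Status
  case object Overloaded extends Status

  def classify(schedulingDelayMs: Long,
               processingTimeMs: Long,
               batchIntervalMs: Long): Status = {
    if (processingTimeMs > batchIntervalMs) Overloaded            // batch cannot finish within its interval
    else if (schedulingDelayMs > batchIntervalMs / 2) Backpressure // queueing is building up
    else Healthy
  }
}
```

The listener below applies the same thresholds inline; a standalone function like this is easier to unit-test.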
import org.apache.spark.streaming.scheduler._

class ProductionMetricsCollector extends StreamingListener {
  private val metricsBuffer = scala.collection.mutable.ListBuffer[BatchMetrics]()

  case class BatchMetrics(
    batchTime: Long,
    schedulingDelay: Long,
    processingTime: Long,
    totalDelay: Long,
    inputRecords: Long,
    failedOutputOps: Int
  )

  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val metrics = BatchMetrics(
      batchTime = info.batchTime.milliseconds,
      schedulingDelay = info.schedulingDelay.getOrElse(0L),
      processingTime = info.processingDelay.getOrElse(0L),
      totalDelay = info.totalDelay.getOrElse(0L),
      inputRecords = info.numRecords,
      // OutputOperationInfo carries no per-operation record count,
      // so track failed output operations instead
      failedOutputOps = info.outputOperationInfos.values
        .count(_.failureReason.isDefined)
    )
    metricsBuffer += metrics
    checkHealthThresholds(metrics)
    exportToMonitoringSystem(metrics)
  }

  private def checkHealthThresholds(metrics: BatchMetrics): Unit = {
    val batchInterval = 10000L // 10 seconds in ms
    // Alert if scheduling delay exceeds 50% of batch interval
    if (metrics.schedulingDelay > batchInterval * 0.5) {
      alertBackpressure(metrics)
    }
    // Alert if processing time exceeds batch interval
    if (metrics.processingTime > batchInterval) {
      alertProcessingOverload(metrics)
    }
    // Alert if any output operation failed
    if (metrics.failedOutputOps > 0) {
      alertProcessingFailure(metrics)
    }
  }

  private def alertBackpressure(metrics: BatchMetrics): Unit = {
    println(s"ALERT: Backpressure detected. Scheduling delay: ${metrics.schedulingDelay}ms")
  }

  private def alertProcessingOverload(metrics: BatchMetrics): Unit = {
    println(s"ALERT: Processing overload. Processing time: ${metrics.processingTime}ms")
  }

  private def alertProcessingFailure(metrics: BatchMetrics): Unit = {
    println(s"ALERT: ${metrics.failedOutputOps} output operation(s) failed in batch ${metrics.batchTime}")
  }

  private def exportToMonitoringSystem(metrics: BatchMetrics): Unit = {
    // Export to Prometheus, CloudWatch, etc.
  }
}
Integrating with Prometheus
Prometheus integration enables standardized metrics collection and alerting. Use the Prometheus JMX exporter or implement custom metrics exporters.
import io.prometheus.client._
import io.prometheus.client.exporter.HTTPServer
import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListener, StreamingListenerBatchCompleted}

object StreamingMetricsExporter {
  // Define Prometheus metrics
  private val schedulingDelayGauge = Gauge.build()
    .name("spark_streaming_scheduling_delay_ms")
    .help("Scheduling delay in milliseconds")
    .register()

  private val processingTimeGauge = Gauge.build()
    .name("spark_streaming_processing_time_ms")
    .help("Processing time in milliseconds")
    .register()

  private val totalDelayGauge = Gauge.build()
    .name("spark_streaming_total_delay_ms")
    .help("Total delay in milliseconds")
    .register()

  private val recordsProcessedCounter = Counter.build()
    .name("spark_streaming_records_processed_total")
    .help("Total records processed")
    .register()

  private val batchDurationHistogram = Histogram.build()
    .name("spark_streaming_batch_duration_seconds")
    .help("Batch processing duration")
    .buckets(0.1, 0.5, 1.0, 2.5, 5.0, 10.0)
    .register()

  // Start Prometheus HTTP server
  def startMetricsServer(port: Int): HTTPServer = {
    new HTTPServer(port)
  }

  def updateMetrics(batchInfo: BatchInfo): Unit = {
    schedulingDelayGauge.set(batchInfo.schedulingDelay.getOrElse(0L).toDouble)
    processingTimeGauge.set(batchInfo.processingDelay.getOrElse(0L).toDouble)
    totalDelayGauge.set(batchInfo.totalDelay.getOrElse(0L).toDouble)
    recordsProcessedCounter.inc(batchInfo.numRecords.toDouble)
    val processingSeconds = batchInfo.processingDelay.getOrElse(0L) / 1000.0
    batchDurationHistogram.observe(processingSeconds)
  }
}

class PrometheusStreamingListener extends StreamingListener {
  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = {
    StreamingMetricsExporter.updateMetrics(batchCompleted.batchInfo)
  }
}
// Initialize in main application
val metricsServer = StreamingMetricsExporter.startMetricsServer(9091)
ssc.addStreamingListener(new PrometheusStreamingListener())
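With the exporter listening on port 9091, Prometheus needs a scrape job pointing at the driver, and alerting needs a rule on the exported gauges. A sketch of both fragments; the job name, target host, and the 5-second threshold are illustrative:

```yaml
# prometheus.yml (scrape config fragment)
scrape_configs:
  - job_name: "spark-streaming"
    static_configs:
      - targets: ["driver-host:9091"]

# alerting rules file
groups:
  - name: spark-streaming-alerts
    rules:
      - alert: StreamingSchedulingDelayHigh
        expr: spark_streaming_scheduling_delay_ms > 5000
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Scheduling delay above 5s for 2 minutes"
```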
Monitoring Kafka Integration Metrics
When using Kafka as a source, monitor consumer lag and offset progression to detect ingestion issues.
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
class KafkaMetricsMonitor(
  bootstrapServers: String,
  groupId: String,
  topics: Set[String]
) {
  private val consumer = createConsumer()

  private def createConsumer(): KafkaConsumer[String, String] = {
    val props = new java.util.Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    new KafkaConsumer[String, String](props)
  }

  def getConsumerLag(): Map[TopicPartition, Long] = {
    val topicPartitions = topics.flatMap { topic =>
      consumer.partitionsFor(topic).asScala.map { partitionInfo =>
        new TopicPartition(partitionInfo.topic(), partitionInfo.partition())
      }
    }.toList

    // Get committed offsets (entries may be null if nothing was committed yet)
    val committedOffsets = consumer.committed(topicPartitions.toSet.asJava)
    // Get end offsets (latest available)
    val endOffsets = consumer.endOffsets(topicPartitions.asJava)

    topicPartitions.map { tp =>
      val committed = Option(committedOffsets.get(tp))
        .map(_.offset())
        .getOrElse(0L)
      val end = endOffsets.get(tp)
      val lag = end - committed
      tp -> lag
    }.toMap
  }

  def getTotalLag(): Long = {
    getConsumerLag().values.sum
  }

  def monitorLag(thresholdMessages: Long): Unit = {
    val lag = getTotalLag()
    if (lag > thresholdMessages) {
      println(s"WARNING: Consumer lag exceeded threshold. Current lag: $lag messages")
    }
    // Export to metrics system
    println(s"Current consumer lag: $lag messages")
  }

  def close(): Unit = {
    consumer.close()
  }
}
// Usage in streaming application
val kafkaMonitor = new KafkaMetricsMonitor(
bootstrapServers = "localhost:9092",
groupId = "streaming-app-group",
topics = Set("input-topic")
)
// Monitor lag periodically
import java.util.concurrent.{Executors, TimeUnit}
val scheduler = Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(
  new Runnable {
    def run(): Unit = kafkaMonitor.monitorLag(10000L)
  },
  0, 30, TimeUnit.SECONDS
)
Structured Streaming Metrics
Structured Streaming provides a different metrics interface through the StreamingQuery API.
import org.apache.spark.sql.streaming._
class StructuredStreamingMonitor(query: StreamingQuery) {
  def collectMetrics(): Unit = {
    val progress = query.lastProgress
    if (progress != null) {
      println(s"""
        |Query: ${progress.name}
        |Batch ID: ${progress.batchId}
        |Input Rows: ${progress.numInputRows}
        |Processed Rows/Sec: ${progress.processedRowsPerSecond}
        |Duration: ${progress.durationMs.get("triggerExecution")}ms
      """.stripMargin)

      // Check for state store metrics
      progress.stateOperators.foreach { stateOp =>
        println(s"""
          |State Store:
          |  Num Rows Total: ${stateOp.numRowsTotal}
          |  Num Rows Updated: ${stateOp.numRowsUpdated}
          |  Memory Used: ${stateOp.memoryUsedBytes} bytes
        """.stripMargin)
      }

      // Source metrics
      progress.sources.foreach { source =>
        println(s"Source: ${source.description}, Input Rows: ${source.numInputRows}")
      }

      // Sink metrics (sink is a single SinkProgress, not a collection)
      println(s"Sink: ${progress.sink.description}")
    }
  }

  def checkQueryHealth(): Boolean = {
    query.isActive && query.exception.isEmpty
  }

  def getExceptionIfPresent(): Option[StreamingQueryException] = {
    query.exception
  }
}
// Implement monitoring loop
val query = df.writeStream
.format("console")
.outputMode("append")
.start()
val monitor = new StructuredStreamingMonitor(query)
while (query.isActive) {
  Thread.sleep(10000)
  monitor.collectMetrics()
  if (!monitor.checkQueryHealth()) {
    monitor.getExceptionIfPresent().foreach { ex =>
      println(s"Query failed: ${ex.getMessage}")
      // Alert and handle failure
    }
  }
}
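Instead of polling lastProgress, Structured Streaming can push progress events to a StreamingQueryListener registered on the session. A sketch, assuming a SparkSession named spark:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark: SparkSession = SparkSession.builder().getOrCreate()

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"Batch ${p.batchId}: ${p.numInputRows} rows, " +
      s"${p.processedRowsPerSecond} rows/sec")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    event.exception.foreach(msg => println(s"Query failed: $msg"))
})
```

The listener fires once per micro-batch, so it pairs naturally with the metric-export patterns shown earlier for DStreams.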
Implementing Automated Alerting
Production systems require automated alerting when metrics exceed thresholds. Integrate with PagerDuty, Slack, or email notification systems.
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.streaming.scheduler._

trait AlertChannel {
  def sendAlert(message: String, severity: AlertSeverity): Future[Unit]
}

sealed trait AlertSeverity
case object Critical extends AlertSeverity
case object Warning extends AlertSeverity
case object Info extends AlertSeverity

class SlackAlertChannel(webhookUrl: String) extends AlertChannel {
  override def sendAlert(message: String, severity: AlertSeverity): Future[Unit] = Future {
    val payload = s"""{"text": "[${severity}] $message"}"""
    // HTTP POST to Slack webhook
    println(s"Sending to Slack: $payload")
  }
}

class AlertingStreamingListener(
  alertChannels: List[AlertChannel],
  batchIntervalMs: Long
) extends StreamingListener {
  private var consecutiveSlowBatches = 0

  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processingTime = info.processingDelay.getOrElse(0L)
    val schedulingDelay = info.schedulingDelay.getOrElse(0L)

    // Check for processing time exceeding batch interval
    if (processingTime > batchIntervalMs) {
      consecutiveSlowBatches += 1
      if (consecutiveSlowBatches >= 3) {
        alertChannels.foreach { channel =>
          channel.sendAlert(
            s"Processing time (${processingTime}ms) exceeds batch interval for 3 consecutive batches",
            Critical
          )
        }
      }
    } else {
      consecutiveSlowBatches = 0
    }

    // Warn on scheduling delay exceeding the batch interval as well
    if (schedulingDelay > batchIntervalMs) {
      alertChannels.foreach { channel =>
        channel.sendAlert(
          s"Scheduling delay (${schedulingDelay}ms) exceeds batch interval",
          Warning
        )
      }
    }
  }
}