Spark Streaming - Triggers (ProcessingTime, Once, Continuous)

Key Insights

  • Triggers control when Spark Structured Streaming executes micro-batches, with ProcessingTime offering fixed-interval processing, Once enabling batch-like execution, and Continuous providing low-latency sub-second processing for specific operations
  • ProcessingTime triggers balance throughput and latency by processing data at configurable intervals, making them ideal for most streaming workloads where sub-second latency isn’t critical
  • Continuous triggers achieve millisecond-level latency but support only map-like operations and limited sinks, requiring careful evaluation of trade-offs between latency requirements and operational complexity

Understanding Trigger Semantics

Spark Structured Streaming processes data as a series of incremental queries against an unbounded input table. Triggers determine the timing and frequency of these query executions. Without an explicit trigger, Spark uses a default micro-batch trigger that processes data as soon as the previous micro-batch completes.

The trigger mechanism directly impacts your streaming application’s latency, throughput, and resource utilization. Choosing the wrong trigger can lead to resource starvation, increased costs, or failure to meet SLA requirements.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, count, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder \
    .appName("TriggerExample") \
    .getOrCreate()

# Read from Kafka source
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Schema for the incoming JSON events
schema = StructType([
    StructField("event_type", StringType()),
    StructField("user_id", StringType()),
    StructField("timestamp", TimestampType()),
])

# Parse and transform
parsed_df = df.selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

ProcessingTime Trigger

ProcessingTime triggers execute micro-batches at fixed intervals. This is the most commonly used trigger type for production workloads, offering predictable resource consumption and good throughput.

# Process every 30 seconds
query = parsed_df \
    .groupBy(window(col("timestamp"), "5 minutes"), col("event_type")) \
    .agg(count("*").alias("event_count")) \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

The interval you specify is the minimum time between the starts of consecutive micro-batches. If processing a micro-batch takes longer than the interval, Spark starts the next micro-batch immediately after the previous one completes. Micro-batches never overlap, so the interval acts as a floor on batch spacing, not a guarantee.
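This scheduling rule can be sketched in a few lines of plain Python. The model below is a simplification for illustration only (batch_start_times is a hypothetical helper, not part of Spark): each batch is due at the next multiple of the interval, and an overrunning batch pushes the next start to its own completion time.

```python
# Simplified model of ProcessingTime scheduling (illustration only,
# not Spark's actual ProcessingTimeExecutor).

def batch_start_times(interval, durations):
    """Start time of each micro-batch, given per-batch processing durations."""
    starts = []
    now = 0.0
    for d in durations:
        starts.append(now)
        next_tick = (now // interval) * interval + interval  # next scheduled start
        # An overrunning batch triggers the next one immediately on completion.
        now = max(now + d, next_tick)
    return starts

# 30-second interval: a 50-second second batch delays the third start
# to t=80 instead of the scheduled t=60.
print(batch_start_times(30, [10, 50, 10]))  # [0.0, 30.0, 80.0]
```

With no overruns, starts fall exactly on multiples of the interval; the overrunning batch shows why the interval is only a floor.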

// Scala example with different intervals
import org.apache.spark.sql.streaming.Trigger

// Process every 10 seconds
val query1 = streamingDF
  .writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("parquet")
  .option("path", "/data/output")
  .option("checkpointLocation", "/data/checkpoint1")
  .start()

// Process every 2 minutes
val query2 = streamingDF
  .writeStream
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .format("delta")
  .option("path", "/data/delta-output")
  .option("checkpointLocation", "/data/checkpoint2")
  .start()

When choosing an interval, consider:

  • Data arrival rate: Higher throughput sources benefit from longer intervals to batch more data
  • Latency requirements: Shorter intervals reduce end-to-end latency but increase overhead
  • Downstream system capacity: Match intervals to your sink’s write capacity
  • State size: Stateful operations accumulate more data with longer intervals
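One way to apply these guidelines is to check a candidate interval against observed micro-batch durations (reported as durationMs.triggerExecution in each query progress report). The helper below is a hypothetical sketch, not a Spark API:

```python
# Hypothetical helper (not a Spark API): flag a candidate trigger interval
# that observed micro-batch durations would regularly overrun.

def interval_is_sustainable(interval_ms, batch_durations_ms, headroom=0.8):
    """True if the slowest observed batch fits within headroom * interval.

    The headroom factor leaves slack for input spikes; a batch that takes
    longer than the interval means the query is falling behind.
    """
    if not batch_durations_ms:
        return True
    return max(batch_durations_ms) <= headroom * interval_ms

# A 30 s interval with 8-20 s batches fits; 10 s with 12 s batches does not.
print(interval_is_sustainable(30_000, [8_000, 12_000, 20_000]))  # True
print(interval_is_sustainable(10_000, [9_000, 12_000]))          # False
```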

Once Trigger

The Once trigger processes all available data in a single micro-batch and then stops. This turns your streaming query into a batch job, which is useful for scheduled processing or for testing streaming pipelines with batch semantics. In Spark 3.3 and later, Trigger.AvailableNow (trigger(availableNow=True) in PySpark) offers the same run-then-stop behavior while splitting the backlog into multiple rate-limited micro-batches, and is the recommended replacement for Once in newer versions.

# Process all available data once and stop
query = parsed_df \
    .withWatermark("timestamp", "1 hour") \
    .groupBy(window(col("timestamp"), "10 minutes"), col("user_id")) \
    .agg(count("*").alias("action_count")) \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/hourly-aggregates") \
    .option("checkpointLocation", "/data/checkpoint-hourly") \
    .trigger(once=True) \
    .start()

query.awaitTermination()

This pattern is particularly valuable for:

Scheduled batch processing: Run streaming code on a schedule (e.g., hourly via Airflow) while maintaining exactly-once semantics and checkpoint-based progress tracking.

# Airflow DAG example
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-eng',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('hourly_streaming_batch',
          default_args=default_args,
          schedule_interval='@hourly')

process_task = SparkSubmitOperator(
    task_id='process_streaming_data',
    # The Once trigger is set inside the job itself via .trigger(once=True)
    application='/jobs/streaming_once.py',
    dag=dag
)

Testing and development: Validate streaming logic with finite datasets before deploying continuous processing.

# Test with Once trigger (assumes spark and test_schema are defined)
from pyspark.sql.functions import sum as spark_sum

def test_streaming_aggregation():
    test_df = spark.readStream \
        .format("json") \
        .schema(test_schema) \
        .load("/test/input")
    
    query = test_df \
        .groupBy("category") \
        .agg(spark_sum("amount").alias("total")) \
        .writeStream \
        .outputMode("complete") \
        .format("memory") \
        .queryName("test_output") \
        .trigger(once=True) \
        .start()
    
    query.awaitTermination()
    
    result = spark.sql("SELECT * FROM test_output")
    assert result.count() > 0

Continuous Trigger

Continuous processing mode (experimental since Spark 2.3) achieves millisecond-level latency by using long-running tasks that continuously read, process, and write data with no micro-batch boundaries. This is a fundamentally different execution model from micro-batching.

# Continuous processing with 1-second checkpointing
query = parsed_df \
    .select(col("sensor_id"), col("temperature"), col("timestamp")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed-sensors") \
    .option("checkpointLocation", "/data/checkpoint-continuous") \
    .trigger(continuous="1 second") \
    .start()

The parameter in continuous mode specifies the checkpoint interval, not the processing interval. Checkpoints occur asynchronously without stopping data processing.
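The difference can be illustrated with a toy simulation in plain Python (run_continuous is hypothetical and greatly simplified; it is not how Spark's epoch mechanism is implemented): every record is processed the moment it arrives, while offsets are committed only at the checkpoint interval.

```python
# Toy illustration (not Spark internals): in continuous mode, records are
# processed as they arrive; the checkpoint interval only controls how often
# an epoch marker commits source offsets asynchronously.

def run_continuous(records, checkpoint_interval, arrival_times):
    processed = []
    committed_offsets = []
    last_checkpoint = 0.0
    for offset, (t, rec) in enumerate(zip(arrival_times, records)):
        processed.append(rec)  # processed immediately on arrival
        if t - last_checkpoint >= checkpoint_interval:
            committed_offsets.append(offset)  # epoch marker commits progress
            last_checkpoint = t
    return processed, committed_offsets

# Five records arriving over ~2.3 s with a 1 s checkpoint interval:
procd, commits = run_continuous(
    ["a", "b", "c", "d", "e"], 1.0, [0.2, 0.5, 1.1, 1.6, 2.3])
print(procd)    # every record processed on arrival
print(commits)  # offsets committed only about once per second
```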

Critical limitations:

Continuous mode only supports:

  • Map-like operations: select, map, flatMap, mapPartitions
  • Projections and filters
  • Limited sinks: Kafka, console, memory (for testing)

It does not support:

  • Aggregations
  • Joins
  • Deduplication
  • Most stateful operations
  • File-based sinks (Parquet, Delta, etc.)

// Scala continuous processing example
import org.apache.spark.sql.streaming.Trigger

val continuousQuery = inputDF
  .select($"device_id", $"metric_value", $"event_time")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("topic", "low-latency-metrics")
  .option("checkpointLocation", "/checkpoints/continuous")
  .trigger(Trigger.Continuous("1 second"))
  .start()

Choosing the Right Trigger

Use ProcessingTime when:

  • You need aggregations, joins, or complex stateful operations
  • Latency requirements are in seconds or minutes
  • You want predictable resource consumption
  • Writing to file-based sinks (Parquet, Delta, Iceberg)

Use Once when:

  • Implementing scheduled batch processing with streaming semantics
  • Testing streaming pipelines
  • Processing historical data with checkpoint-based recovery
  • Integrating with workflow orchestrators

Use Continuous when:

  • Sub-second latency is mandatory
  • Operations are simple transformations
  • Writing to Kafka or similar streaming sinks
  • You can accept operational complexity and limited functionality
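The three checklists above can be condensed into a rough decision function (choose_trigger is a hypothetical sketch, not a Spark API):

```python
# Hypothetical decision helper condensing the guidance above; not a Spark API.

def choose_trigger(needs_stateful_ops, latency_ms, scheduled_batch):
    """Pick a trigger type from coarse requirements."""
    if scheduled_batch:
        return "once"          # run via an orchestrator, then stop
    if latency_ms < 1000 and not needs_stateful_ops:
        return "continuous"    # sub-second, map-like pipelines only
    return "processingTime"    # default choice for most workloads

print(choose_trigger(True, 30_000, False))  # processingTime
print(choose_trigger(False, 100, False))    # continuous
print(choose_trigger(False, 100, True))     # once
```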

# Monitoring trigger performance
import time

def monitor_streaming_query(query):
    while query.isActive:
        progress = query.lastProgress
        
        if progress:
            print(f"Batch ID: {progress['batchId']}")
            print(f"Input rows: {progress['numInputRows']}")
            print(f"Trigger execution: {progress['durationMs']['triggerExecution']} ms")
            print(f"Processed rows/sec: {progress.get('processedRowsPerSecond', 0)}")
        
        time.sleep(10)

# Start monitoring in a separate daemon thread
import threading
monitor_thread = threading.Thread(target=monitor_streaming_query,
                                  args=(query,), daemon=True)
monitor_thread.start()

Most production workloads use ProcessingTime triggers with intervals between 10 seconds and 5 minutes, balancing latency and efficiency. Start with 30-second intervals and adjust based on monitoring data, input rates, and downstream capacity.
