PySpark - Streaming Triggers Explained

Key Insights

  • PySpark Structured Streaming offers five trigger types that control query execution: unspecified (default), fixed interval, one-time, available-now, and continuous processing
  • Choosing the right trigger impacts end-to-end latency, resource utilization, and cost—fixed interval triggers provide predictable scheduling while available-now optimizes for batch-like workloads
  • Trigger configuration directly affects checkpoint behavior and exactly-once semantics, making it critical for production streaming pipelines

Understanding Streaming Triggers

Streaming triggers in PySpark determine when the streaming engine processes new data. Unlike traditional batch jobs that run once and complete, streaming queries continuously monitor data sources and process incoming records. The trigger mechanism controls the timing and frequency of these processing cycles.

By default, every streaming query in PySpark executes as a series of micro-batches; continuous processing is the one exception, handling records as they arrive. The trigger type you select fundamentally changes how work is scheduled, affecting latency, throughput, and resource consumption.

Default Trigger (Micro-Batch)

When no trigger is specified, PySpark uses the default micro-batch mode. The engine processes data as soon as the previous micro-batch completes, creating a continuous processing loop with minimal idle time.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, count

spark = SparkSession.builder \
    .appName("DefaultTrigger") \
    .getOrCreate()

# Read from Kafka stream
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Process the stream
query = df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()  # No trigger specified - uses default

query.awaitTermination()

The default trigger maximizes throughput by eliminating artificial delays between batches. However, this can lead to resource contention if your processing logic can’t keep up with incoming data rates. Monitor the durationMs.triggerExecution and rate metrics in query.lastProgress to ensure batches complete faster than new data arrives.
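Under the default trigger there is no configured interval to compare against, so "keeping up" reduces to comparing the two rate metrics in lastProgress. A minimal sketch (a hypothetical helper operating on a lastProgress-style dict; the two rate fields are real StreamingQueryProgress metrics):

```python
def is_keeping_up(progress: dict) -> bool:
    """True when the query drains rows at least as fast as they arrive.

    `progress` mirrors the shape of StreamingQuery.lastProgress.
    """
    return progress["processedRowsPerSecond"] >= progress["inputRowsPerSecond"]

# Falling behind: data arrives faster than it is processed.
print(is_keeping_up({"inputRowsPerSecond": 2000.0,
                     "processedRowsPerSecond": 1500.0}))  # False
```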

Fixed Interval Trigger

Fixed interval triggers introduce predictable scheduling by processing data at specified time intervals. This approach provides better resource planning and prevents the streaming job from monopolizing cluster resources.

# Process every 30 seconds
# (PySpark passes triggers as keyword arguments; there is no Trigger class)
query = df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("parquet") \
    .option("path", "/data/output") \
    .option("checkpointLocation", "/data/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()

Fixed intervals work well when:

  • You need predictable resource allocation patterns
  • Processing latency requirements allow for batching (e.g., 1-5 minute windows)
  • You’re joining streams with different arrival rates

The interval can be specified using strings like “10 seconds”, “2 minutes”, or “1 hour”. If processing takes longer than the interval, the next batch starts immediately after completion—intervals represent minimum time between batches, not guaranteed execution frequency.

# The interval string accepts several time units:
#   .trigger(processingTime="10 seconds")
#   .trigger(processingTime="2 minutes")
#   .trigger(processingTime="1 hour")
# Note: PySpark expects a string here; pass "5 seconds" rather than a raw
# millisecond count (numeric intervals are a Scala-API feature).
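The "minimum time between batches" behavior described above can be illustrated with a small plain-Python simulation (a toy model, not part of PySpark): parse the interval, then start each batch at the later of the scheduled tick and the previous batch's finish.

```python
import re

# Map interval units to milliseconds.
_UNITS_MS = {"millisecond": 1, "second": 1000, "minute": 60_000, "hour": 3_600_000}

def interval_to_ms(interval: str) -> int:
    """Parse strings like '10 seconds' or '2 minutes' into milliseconds."""
    match = re.fullmatch(r"(\d+)\s*(millisecond|second|minute|hour)s?",
                         interval.strip())
    if not match:
        raise ValueError(f"unrecognized interval: {interval!r}")
    return int(match.group(1)) * _UNITS_MS[match.group(2)]

def batch_start_times(interval: str, batch_durations_ms: list) -> list:
    """Start time (ms) of each batch: the next batch begins at the later of
    (previous start + interval) and (previous batch's finish)."""
    interval_ms = interval_to_ms(interval)
    starts, clock = [], 0
    for duration in batch_durations_ms:
        starts.append(clock)
        clock = max(clock + interval_ms, clock + duration)
    return starts

# A 30s batch overruns a 10-second interval, so the next batch slips to t=40s.
print(batch_start_times("10 seconds", [4_000, 30_000, 2_000]))
# [0, 10000, 40000]
```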

One-Time Trigger

One-time triggers execute a single micro-batch and then stop. This converts your streaming query into a batch job while maintaining streaming semantics and checkpoint compatibility. (From Spark 3.4 onward, once=True is deprecated in favor of the available-now trigger.)

# Process all available data once and stop
query = df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("delta") \
    .option("path", "/data/delta_table") \
    .option("checkpointLocation", "/data/checkpoint") \
    .trigger(once=True) \
    .start()

query.awaitTermination()  # Will complete after single batch

Use one-time triggers for:

  • Testing streaming logic without continuous execution
  • Scheduled batch jobs that need streaming checkpoint management
  • Backfilling historical data through streaming pipelines

One-time triggers maintain checkpoint state, enabling incremental processing across multiple executions. Each run processes only new data since the last checkpoint, making it ideal for scheduled jobs that need exactly-once processing guarantees.
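The incremental behavior comes entirely from the checkpoint's offset bookkeeping. Here is a toy model of that bookkeeping (illustrative only; real checkpoints persist per-partition source offsets and commit logs inside the checkpoint directory):

```python
import json
import os
import tempfile

class OffsetCheckpoint:
    """Toy stand-in for a streaming checkpoint: remembers the last committed
    offset so each run processes only records that arrived since."""
    def __init__(self, path):
        self.path = path

    def last_offset(self) -> int:
        if os.path.exists(self.path):
            with open(self.path) as f:
                return json.load(f)["offset"]
        return -1  # nothing committed yet

    def commit(self, offset: int) -> None:
        with open(self.path, "w") as f:
            json.dump({"offset": offset}, f)

def run_once(records: list, ckpt: OffsetCheckpoint) -> list:
    """One 'scheduled run': process everything newer than the checkpoint."""
    start = ckpt.last_offset() + 1
    batch = records[start:]
    if batch:
        ckpt.commit(len(records) - 1)
    return batch

ckpt = OffsetCheckpoint(os.path.join(tempfile.mkdtemp(), "offsets.json"))
log = ["a", "b", "c"]
print(run_once(log, ckpt))   # first run: ['a', 'b', 'c']
log += ["d", "e"]
print(run_once(log, ckpt))   # second run sees only ['d', 'e']
```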

Available-Now Trigger

Introduced in Spark 3.3, available-now triggers process all currently available data across multiple micro-batches, then stop. Unlike one-time triggers that process everything in a single batch, available-now splits work into multiple batches for better memory management.

# Process all available data in multiple micro-batches
query = df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("delta") \
    .option("path", "/data/delta_table") \
    .option("checkpointLocation", "/data/checkpoint") \
    .trigger(availableNow=True) \
    .start()

query.awaitTermination()

This trigger type excels when processing large backlogs:

# Scenario: Process 1TB of accumulated Kafka data
large_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "large_topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Transform and aggregate
processed = large_stream \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .groupBy("key") \
    .count()

# Write with available-now trigger
query = processed.writeStream \
    .format("delta") \
    .outputMode("complete") \
    .option("path", "/data/aggregated") \
    .option("checkpointLocation", "/data/agg_checkpoint") \
    .trigger(availableNow=True) \
    .start()

query.awaitTermination()

Rather than sizing batches automatically, available-now triggers respect source rate-limit options such as maxFilesPerTrigger (file sources) and maxOffsetsPerTrigger (Kafka). The backlog is split into bounded micro-batches instead of the single giant batch a one-time trigger attempts, which prevents the out-of-memory errors common with one-time triggers on large datasets.
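One way to reason about the resulting batch count: if the source caps each batch (for example via Kafka's maxOffsetsPerTrigger), the backlog drains in a predictable number of bounded micro-batches. A back-of-the-envelope helper (a sketch, not a Spark API):

```python
import math

def availablenow_batches(backlog_records: int, max_per_trigger: int) -> int:
    """Micro-batches needed to drain a backlog when the source caps each
    batch at max_per_trigger records."""
    return math.ceil(backlog_records / max_per_trigger)

# 500M backlogged records at 10M records per batch -> 50 bounded micro-batches,
# each with a predictable memory footprint, instead of one 500M-record batch.
print(availablenow_batches(500_000_000, 10_000_000))  # 50
```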

Continuous Processing Trigger

Continuous processing achieves millisecond-level latency by processing records as they arrive rather than batching them. This experimental feature supports only a narrow set of operations and requires careful configuration.

# Continuous processing with 1-second checkpointing
query = df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output") \
    .option("checkpointLocation", "/data/continuous_checkpoint") \
    .trigger(continuous="1 second") \
    .start()

Continuous processing limitations:

  • Only supports map-like operations (select, filter, map)
  • No aggregations or joins
  • Limited sink support (Kafka, console, memory)
  • Requires Spark 2.3+

The checkpoint interval in continuous mode controls fault-tolerance overhead. Shorter intervals (1 second) provide faster recovery but increase checkpoint I/O. Longer intervals (10 seconds) reduce overhead but extend recovery time.
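This tradeoff can be quantified with simple expected-value arithmetic (illustrative assumptions: failures land uniformly at random within an epoch, and each checkpoint has a fixed wall-clock cost, here an assumed 50 ms):

```python
def continuous_tradeoff(checkpoint_interval_s: float, checkpoint_cost_ms: float):
    """Rough cost model for continuous-mode checkpointing.

    - Expected replay after a failure is ~half the checkpoint interval
      (the failure lands uniformly at random inside an epoch).
    - Overhead is the fraction of wall-clock time spent writing checkpoints.
    """
    expected_replay_s = checkpoint_interval_s / 2
    overhead = (checkpoint_cost_ms / 1000) / checkpoint_interval_s
    return expected_replay_s, overhead

for interval in (1, 10):
    replay, overhead = continuous_tradeoff(interval, checkpoint_cost_ms=50)
    print(f"{interval:>2}s interval: ~{replay:.1f}s replay, {overhead:.1%} overhead")
```

With these assumed numbers, a 1-second interval replays ~0.5 s of work after a failure but spends 5% of its time checkpointing; a 10-second interval cuts overhead to 0.5% at the cost of ~5 s of replay.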

Trigger Selection Strategy

Choose triggers based on your latency and throughput requirements:

class StreamingPipeline:
    def __init__(self, spark, latency_requirement):
        self.spark = spark
        self.latency = latency_requirement

    def get_trigger_kwargs(self):
        """Map a latency tier to keyword arguments for DataStreamWriter.trigger()."""
        if self.latency == "real-time":          # ~ms latency; "1 second" is the epoch interval
            return {"continuous": "1 second"}
        elif self.latency == "near-real-time":   # < 5s
            return {"processingTime": "2 seconds"}
        elif self.latency == "micro-batch":      # < 1m
            return {"processingTime": "10 seconds"}
        elif self.latency == "batch":
            return {"availableNow": True}
        return {}  # fall through to the default trigger

    def create_query(self, df, output_path):
        writer = df.writeStream \
            .format("delta") \
            .option("path", output_path) \
            .option("checkpointLocation", f"{output_path}/_checkpoint")
        kwargs = self.get_trigger_kwargs()
        if kwargs:  # .trigger() requires exactly one argument, so skip it for the default
            writer = writer.trigger(**kwargs)
        return writer.start()

Monitoring Trigger Performance

Track trigger effectiveness using StreamingQuery metrics:

query = df.writeStream \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

# Monitor query progress
import time
while query.isActive:
    progress = query.lastProgress
    if progress:
        print(f"Batch ID: {progress['batchId']}")
        print(f"Processing Time: {progress['durationMs']['triggerExecution']}ms")
        print(f"Input Rows: {progress['numInputRows']}")
        print("Trigger Interval: 10000ms")
    time.sleep(10)

Key metrics to monitor:

  • triggerExecution: Total time to process a batch
  • numInputRows: Records processed per batch
  • inputRowsPerSecond: Incoming data rate

If triggerExecution consistently exceeds your trigger interval, consider increasing the interval, optimizing transformations, or scaling cluster resources.
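"Consistently exceeds" is worth making precise before you act on it, since a single slow batch is normal. A small helper over lastProgress-style dicts (hypothetical, not a Spark API) flags only sustained overruns:

```python
def trigger_overruns(progress_history, interval_ms, threshold=0.5):
    """Flag a query whose batches consistently exceed the trigger interval.

    progress_history: list of lastProgress-style dicts. Returns True when
    more than `threshold` of the recent batches overran the interval.
    """
    durations = [p["durationMs"]["triggerExecution"] for p in progress_history]
    if not durations:
        return False
    overruns = sum(d > interval_ms for d in durations)
    return overruns / len(durations) > threshold

history = [
    {"durationMs": {"triggerExecution": 12_000}},
    {"durationMs": {"triggerExecution": 14_500}},
    {"durationMs": {"triggerExecution": 9_000}},
    {"durationMs": {"triggerExecution": 13_200}},
]
# 3 of 4 batches overran a 10-second interval: time to tune or scale.
print(trigger_overruns(history, interval_ms=10_000))  # True
```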

Trigger selection directly impacts your streaming pipeline’s behavior, cost, and reliability. Start with fixed interval triggers for predictable behavior, then optimize based on observed metrics and requirements.
