PySpark - Streaming Output Modes (Append, Complete, Update)

Key Insights

  • PySpark Structured Streaming offers three output modes (Append, Complete, Update) that control how results are written to sinks, with each mode suited for specific aggregation and windowing scenarios.
  • Append mode only works with queries where existing rows never change (like event-time watermarked aggregations), while Complete mode rewrites the entire result table on every trigger, making it suitable for small aggregation tables.
  • Update mode provides the optimal balance for most stateful operations by only outputting changed rows, significantly reducing I/O overhead compared to Complete mode while supporting a broader range of queries than Append mode.

Understanding Output Modes

PySpark Structured Streaming output modes determine how the streaming query writes data to external storage systems. The choice of output mode depends on your query type, whether you’re performing aggregations, and how you want downstream systems to consume the data.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, count, avg

spark = SparkSession.builder \
    .appName("StreamingOutputModes") \
    .getOrCreate()

# Create a streaming DataFrame from a socket source
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

The three output modes—Append, Complete, and Update—each have specific use cases and limitations. Selecting the wrong mode results in runtime errors or unexpected behavior.
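As a mental model, the three modes can be sketched in plain Python (no Spark involved; the word-count dictionaries are illustrative): given the result table before and after a trigger, each mode writes a different slice to the sink.

```python
def complete_rows(prev: dict, curr: dict) -> dict:
    # Complete: the entire current result table, every trigger
    return dict(curr)

def update_rows(prev: dict, curr: dict) -> dict:
    # Update: only rows that are new or changed since the last trigger
    return {k: v for k, v in curr.items() if prev.get(k) != v}

def append_rows(prev: dict, curr: dict) -> dict:
    # Append: only rows added since the last trigger (valid only if
    # existing rows never change)
    return {k: v for k, v in curr.items() if k not in prev}

prev = {"spark": 3, "stream": 1}             # result table after trigger 1
curr = {"spark": 3, "stream": 2, "mode": 1}  # result table after trigger 2

print(complete_rows(prev, curr))  # {'spark': 3, 'stream': 2, 'mode': 1}
print(update_rows(prev, curr))    # {'stream': 2, 'mode': 1}
print(append_rows(prev, curr))    # {'mode': 1}
```

Note how Append emits nothing for `stream` even though its count changed; that is exactly why Append mode is restricted to queries whose rows are final once written.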

Append Mode: For Immutable Results

Append mode only adds new rows to the output sink. Once a row is written, it never changes. This mode works only with queries where rows are guaranteed to never be updated.

from pyspark.sql.functions import split, explode, current_timestamp

# Non-aggregated query - works with Append mode
words = lines.select(
    explode(split(col("value"), " ")).alias("word"),
    current_timestamp().alias("timestamp")
)

# Write using Append mode
query = words.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

For aggregations, Append mode requires watermarking to determine when aggregation windows are complete and won’t receive more data:

from pyspark.sql.functions import window, current_timestamp

# Create streaming data with timestamp
events = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load() \
    .withColumn("event_time", current_timestamp())

# Windowed aggregation with watermark
windowed_counts = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes")  # tumbling window; slide defaults to the duration
    ) \
    .count()

# Append mode works because watermark ensures finality
append_query = windowed_counts.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

The watermark tells Spark that events more than 10 minutes late should be dropped. When a window closes (current watermark passes the window end time), Spark outputs the final aggregation result knowing it won’t change.
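The closing logic can be sketched without Spark (the times and the 10-minute lateness mirror the example above; the exact comparison Spark performs internally is a version-specific detail):

```python
from datetime import datetime, timedelta

def watermark(max_event_time: datetime, lateness: timedelta) -> datetime:
    # Spark's watermark: the max event time seen so far minus the
    # allowed lateness (the "10 minutes" passed to withWatermark)
    return max_event_time - lateness

def window_is_final(window_end: datetime, wm: datetime) -> bool:
    # A window is finalized (and emitted in Append mode) once the
    # watermark moves past its end time
    return wm > window_end

latest_event = datetime(2024, 1, 1, 12, 30)
wm = watermark(latest_event, timedelta(minutes=10))  # 12:20

print(window_is_final(datetime(2024, 1, 1, 12, 5), wm))   # True: 12:00-12:05 is closed
print(window_is_final(datetime(2024, 1, 1, 12, 20), wm))  # False: still open
```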

Without watermarking, attempting Append mode on aggregations fails:

# This will throw an error at runtime
try:
    invalid_append = events \
        .groupBy(
            window(col("event_time"), "5 minutes")
        ) \
        .count() \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start()
except Exception as e:
    print(f"Error: {e}")
    # "Append output mode not supported when there are streaming aggregations 
    # on streaming DataFrames/Datasets without watermark"

Complete Mode: Full Table Refresh

Complete mode outputs the entire result table after every trigger. This mode only works with aggregation queries and is suitable when the result set remains small.

# Simple aggregation - Complete mode
word_counts = lines \
    .select(explode(split(col("value"), " ")).alias("word")) \
    .groupBy("word") \
    .count()

complete_query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

Complete mode rewrites all rows every time, making it expensive for large result sets but useful for dashboards requiring full state visibility:

from pyspark.sql.functions import sum, avg  # note: sum now shadows Python's builtin sum

# Multiple aggregations with Complete mode
sensor_data = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensors") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

from pyspark.sql.functions import from_json

# Define schema
schema = "sensor_id STRING, temperature DOUBLE, humidity DOUBLE, timestamp TIMESTAMP"

parsed_data = sensor_data.select(
    from_json(col("value"), schema).alias("data")
).select("data.*")

# Aggregate by sensor
sensor_stats = parsed_data \
    .groupBy("sensor_id") \
    .agg(
        count("*").alias("reading_count"),
        avg("temperature").alias("avg_temp"),
        avg("humidity").alias("avg_humidity")
    )

# Complete mode outputs all sensors every trigger
complete_stats_query = sensor_stats.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("sensor_stats_table") \
    .start()

Complete mode doesn’t work with non-aggregated queries:

# This will fail
try:
    invalid_complete = parsed_data.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
except Exception as e:
    print(f"Error: {e}")
    # "Complete output mode not supported when there are no streaming aggregations"

Update Mode: Incremental Changes

Update mode outputs only rows that changed since the last trigger. This provides the efficiency of Append mode with the flexibility to handle aggregations without watermarking.
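On the consuming side, update-mode output pairs naturally with upsert semantics: merging each batch of changed rows into a key-value store reconstructs the full result table. A plain-Python sketch of that sink-side merge:

```python
def apply_update_batch(store: dict, changed_rows: dict) -> dict:
    # Upsert: insert new keys, overwrite changed ones; keys absent
    # from the batch are left untouched in the store
    store.update(changed_rows)
    return store

store = {}
apply_update_batch(store, {"spark": 3, "stream": 1})  # trigger 1 output
apply_update_batch(store, {"stream": 2, "mode": 1})   # trigger 2: changed rows only

print(store)  # {'spark': 3, 'stream': 2, 'mode': 1}
```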

# Update mode with aggregation (no watermark needed)
real_time_counts = events \
    .groupBy(
        window(col("event_time"), "5 minutes", "1 minute")
    ) \
    .count()

update_query = real_time_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

Update mode is ideal for maintaining incremental state in external systems:

# Stateful aggregation with multiple keys
user_activity = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

activity_schema = "user_id STRING, action STRING, value DOUBLE, timestamp TIMESTAMP"

user_metrics = user_activity \
    .select(from_json(col("value"), activity_schema).alias("data")) \
    .select("data.*") \
    .groupBy("user_id", "action") \
    .agg(
        count("*").alias("action_count"),
        sum("value").alias("total_value")
    )

# Update mode - only changed user/action combinations are output.
# File sinks such as Parquet support only Append mode, so upserting
# update-mode output into external storage goes through foreachBatch:
def upsert_batch(batch_df, batch_id):
    # In practice: merge batch_df into a database table (upsert)
    batch_df.show()

update_metrics_query = user_metrics.writeStream \
    .outputMode("update") \
    .foreachBatch(upsert_batch) \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()

Update mode is also required for arbitrary stateful processing. The Scala/Java API exposes mapGroupsWithState and flatMapGroupsWithState; in PySpark (3.4+), the equivalent is applyInPandasWithState:

import pandas as pd

from pyspark.sql.streaming.state import GroupStateTimeout

def update_user_state(key, pdf_iter, state):
    # Custom stateful logic: a running row count per key, kept across triggers
    if state.exists:
        (existing_count,) = state.get
    else:
        existing_count = 0

    new_count = existing_count
    for pdf in pdf_iter:
        new_count += len(pdf)
    state.update((new_count,))

    yield pd.DataFrame({"value": [key[0]], "count": [new_count]})

stateful_counts = events \
    .groupBy("value") \
    .applyInPandasWithState(
        update_user_state,
        outputStructType="value LONG, count LONG",
        stateStructType="count LONG",
        outputMode="Update",
        timeoutConf=GroupStateTimeout.NoTimeout
    )

# Update mode is required for applyInPandasWithState
stateful_query = stateful_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

Choosing the Right Mode

Use Append mode when:

  • Processing event logs where each event is independent
  • Running windowed aggregations with watermarks where windows close definitively
  • Writing to append-only systems like S3 or HDFS with partitioning

Use Complete mode when:

  • Building real-time dashboards requiring full result visibility
  • Result sets are small (thousands of rows, not millions)
  • Downstream systems need complete snapshots

Use Update mode when:

  • Running aggregations without watermarks
  • Maintaining incremental state in databases (upsert semantics)
  • Result sets are large but changes are sparse
  • Using custom stateful operations
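The checklists above can be condensed into a rough decision helper (a sketch of the rules of thumb only, not a Spark API; actual support also depends on the sink and the full query plan):

```python
def pick_output_mode(aggregated: bool, watermarked: bool, small_result: bool) -> str:
    # Rough heuristic mirroring the guidance above
    if not aggregated:
        return "append"    # Complete is rejected without aggregation
    if watermarked:
        return "append"    # closed windows emit finalized rows once
    if small_result:
        return "complete"  # full snapshots are cheap for small tables
    return "update"        # emit only the changed rows

print(pick_output_mode(aggregated=True, watermarked=False, small_result=False))  # update
```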

# Practical comparison: the same windowed query under all three modes
from pyspark.sql.functions import current_timestamp

clicks = spark.readStream \
    .format("rate") \
    .load() \
    .withColumn("timestamp", current_timestamp())

page_views = clicks \
    .withWatermark("timestamp", "1 hour") \
    .groupBy(window("timestamp", "10 minutes")) \
    .count()

# Append: Only finalized windows
append_pv = page_views.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Update: All windows including open ones, only changes
update_pv = page_views.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Complete: All windows every trigger (expensive)
complete_pv = page_views.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

Understanding output modes prevents runtime errors and optimizes streaming pipeline performance. Match the mode to your query semantics and downstream system requirements.
