Spark Streaming - Output Modes Explained

Key Insights

  • Output modes (Complete, Append, Update) control which rows of the result table Spark Structured Streaming writes to the sink on each trigger: the full table, only the rows that changed, or only new rows that are final
  • Complete mode rewrites the entire result table on every trigger and works only with aggregations, so it suits small aggregation results and becomes inefficient as the result table grows
  • Append mode is the most restrictive option: it writes each row exactly once, after the row can no longer change, which makes it ideal for event-time windowed aggregations once the watermark has passed the end of the window

Understanding Output Modes

Spark Structured Streaming’s output modes determine how the engine writes query results to external storage systems. When you work with streaming aggregations, the result table continuously changes as new data arrives. Output modes define which parts of this evolving result get written to the sink.

Three output modes exist: Complete, Append, and Update. Choosing the wrong mode leads to runtime errors or inefficient resource usage. The choice depends on your query type, whether you’re aggregating data, and how downstream consumers need to read results.
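
To make the difference concrete before touching Spark, here is a toy model in plain Python. It is not Spark's implementation; the `emit` helper and the sample batches are made up for illustration. It mimics a streaming `groupBy().count()` and records what each trigger would send to the sink in Complete and Update mode. Append is left out because an aggregation without a watermark cannot run in that mode.

```python
from collections import Counter

def emit(mode, batches):
    """Toy model of a streaming groupBy().count(): what each trigger writes."""
    table = Counter()              # the evolving result table
    outputs = []
    for batch in batches:
        before = dict(table)       # snapshot taken before this micro-batch
        table.update(batch)        # fold the new micro-batch into the table
        if mode == "complete":
            out = dict(table)      # every row, changed or not
        elif mode == "update":
            # only rows whose value differs from the previous trigger
            out = {k: v for k, v in table.items() if before.get(k) != v}
        else:
            raise ValueError(f"mode not covered by this toy model: {mode}")
        outputs.append(out)
    return outputs

# Two hypothetical micro-batches of category values
batches = [["books", "toys", "books"], ["toys"]]
complete_out = emit("complete", batches)
update_out = emit("update", batches)
print(complete_out)
print(update_out)
```

On the second trigger the Complete output still contains the unchanged `books` row, while the Update output contains only `toys`.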

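from pyspark.sql import SparkSession
# Note: the wildcard import shadows Python's built-in sum, max, and min
# with Spark's column functions of the same names.
from pyspark.sql.functions import *
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType
)

spark = SparkSession.builder \
    .appName("OutputModes") \
    .getOrCreate()

# Assumed message schema; the fields match the columns used in the
# examples that follow. Adjust it to your actual payload.
schema = StructType([
    StructField("user_id", StringType()),
    StructField("category", StringType()),
    StructField("region", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType())
])

# Read streaming data from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .load()

# Parse the JSON payload in the Kafka value column
parsed = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")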

Complete Mode: Full Table Snapshots

Complete mode writes the entire result table to the sink with every trigger. This mode only works with aggregation queries because non-aggregated streaming data would grow infinitely.

Use Complete mode when downstream systems need the full current state and can handle complete table replacements. This works well for dashboards displaying real-time aggregated metrics or small lookup tables that fit in memory.

# Count events by category - Complete mode
category_counts = parsed.groupBy("category").count()

query = category_counts.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("category_counts") \
    .start()

# Query the in-memory table
spark.sql("SELECT * FROM category_counts").show()

The problem with Complete mode: every trigger rewrites all data. For aggregations with thousands of groups, you’re writing massive amounts of redundant data. A table with 10,000 categories gets completely rewritten even if only 5 categories received new events.
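
A back-of-the-envelope calculation shows how quickly the redundancy adds up. The sketch below is plain arithmetic, not a Spark API; the `rows_written` helper and the figure of 360 triggers (one hour at a 10-second trigger interval) are assumptions chosen for illustration.

```python
def rows_written(total_groups, changed_groups, triggers, mode):
    """Rows sent to the sink over a number of triggers (toy arithmetic)."""
    per_trigger = total_groups if mode == "complete" else changed_groups
    return per_trigger * triggers

# 10,000 categories, 5 of which change per trigger, over 360 triggers
complete_rows = rows_written(10_000, 5, 360, "complete")
update_rows = rows_written(10_000, 5, 360, "update")
print(complete_rows, update_rows)
```

Under these assumptions Complete mode writes 3,600,000 rows in an hour, against 1,800 for a mode that writes only the changed rows.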

# Complete mode with multiple aggregations
summary = parsed.groupBy("category", "region").agg(
    count("*").alias("event_count"),
    sum("amount").alias("total_amount"),
    avg("amount").alias("avg_amount")
)

# This rewrites the entire summary table every trigger
query = summary.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

Complete mode has to keep the state for every group, because each trigger re-emits the whole table. Spark cannot use a watermark to drop old aggregation state in this mode, so when you aggregate over an unbounded set of keys, state keeps growing until the job runs out of memory. Reserve Complete mode for aggregations whose number of groups is bounded and small.

Append Mode: Write-Once Semantics

Append mode writes only new rows that will never change. This is the most restrictive mode but offers the best performance for scenarios where data becomes immutable after a certain point.

Append mode works for non-aggregated queries and time-windowed aggregations with watermarks. The watermark tells Spark when data for a particular window is complete, allowing those results to be appended.

# Non-aggregated query - Append mode
filtered = parsed.filter(col("amount") > 1000)

query = filtered.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/high_value_transactions") \
    .option("checkpointLocation", "/checkpoints/high_value") \
    .start()

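For aggregations, you must define watermarks to use Append mode. The watermark specifies how late data can arrive. Once the watermark passes the end of a window, Spark treats that window as final, appends its result to the sink, and drops its state. Results therefore reach the sink with a delay of at least the watermark threshold.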

# Windowed aggregation with watermark - Append mode
windowed_agg = parsed \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("category")
    ) \
    .agg(
        count("*").alias("event_count"),
        sum("amount").alias("total_amount")
    )

query = windowed_agg.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/windowed_metrics") \
    .option("checkpointLocation", "/checkpoints/windowed") \
    .start()
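
The timing rule behind this can be sketched in plain Python. This is a simplified model, assuming the watermark is the maximum event time seen so far minus the allowed delay; the helper names are hypothetical and the exact boundary handling inside Spark is an implementation detail not modelled here.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)    # matches window(col("timestamp"), "5 minutes")
DELAY = timedelta(minutes=10)    # matches withWatermark("timestamp", "10 minutes")

def window_end(event_time):
    """End of the 5-minute tumbling window that contains event_time."""
    epoch = datetime(1970, 1, 1)
    offset = (event_time - epoch) % WINDOW   # time elapsed since window start
    return event_time - offset + WINDOW

def is_final(window_end_time, max_event_time):
    """True once the watermark has reached the end of the window."""
    watermark = max_event_time - DELAY
    return watermark >= window_end_time

end = window_end(datetime(2024, 1, 1, 12, 3))           # window 12:00-12:05
early = is_final(end, datetime(2024, 1, 1, 12, 14))     # watermark at 12:04
late = is_final(end, datetime(2024, 1, 1, 12, 16))      # watermark at 12:06
print(end, early, late)
```

In this model the 12:00-12:05 window is written only after an event stamped later than 12:15 arrives, which is why Append output lags the data by at least the watermark delay.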

Append mode on an aggregation without a watermark fails with an AnalysisException as soon as the query starts:

# This fails when start() is called
no_watermark = parsed.groupBy("category").count()

# Error: Append output mode not supported when there are 
# streaming aggregations on streaming DataFrames/Datasets 
# without watermark
query = no_watermark.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

Update Mode: Incremental Changes

Update mode writes only rows that changed since the last trigger. For aggregations, this means updated aggregation results. For non-aggregated queries, it behaves like Append mode.

Update mode balances efficiency and completeness. Unlike Complete mode, you don’t rewrite unchanged rows. Unlike Append mode, you can output aggregations without watermarks.

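# Stateful aggregation - Update mode
from delta.tables import DeltaTable

running_totals = parsed.groupBy("user_id", "category").agg(
    count("*").alias("transaction_count"),
    sum("amount").alias("total_spent"),
    max("timestamp").alias("last_transaction")
)

# Delta's streaming sink accepts only append and complete, so upsert each
# micro-batch with MERGE. Assumes the Delta table at this path already exists.
def upsert_user_totals(batch_df, batch_id):
    DeltaTable.forPath(spark, "/data/user_totals").alias("t") \
        .merge(
            batch_df.alias("s"),
            "t.user_id = s.user_id AND t.category = s.category"
        ) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

query = running_totals.writeStream \
    .outputMode("update") \
    .foreachBatch(upsert_user_totals) \
    .option("checkpointLocation", "/checkpoints/user_totals") \
    .start()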

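Update mode needs a sink that can update existing keys. Databases with upsert support fit well, and so does Delta Lake when you run a MERGE inside foreachBatch. Used directly, Delta’s streaming sink accepts only Append and Complete, and plain file sinks such as Parquet support only Append, so Update mode cannot write to either of them directly.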

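# Update mode with foreachBatch for custom sink logic
def write_to_database(batch_df, batch_id):
    # batch_df holds only the rows that changed in this micro-batch, so
    # mode("overwrite") would replace the whole table with just those rows.
    # Appending keeps a change log; for true upserts, load a staging table
    # and merge it in the database (e.g. PostgreSQL INSERT ... ON CONFLICT).
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost/metrics") \
        .option("dbtable", "user_metrics") \
        .option("user", "admin") \
        .option("password", "password") \
        .mode("append") \
        .save()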

query = running_totals.writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_database) \
    .start()
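
Because an Update-mode micro-batch carries only the rows that changed, how the sink applies it matters. The following toy model uses plain dictionaries in place of a real table (the helper names and sample rows are hypothetical) to contrast a full overwrite with a keyed upsert.

```python
def apply_overwrite(table, changed_rows):
    """Full overwrite: the table becomes just the rows in this batch."""
    return dict(changed_rows)

def apply_upsert(table, changed_rows):
    """Keyed upsert: replace matching keys, insert new ones, keep the rest."""
    merged = dict(table)
    merged.update(changed_rows)
    return merged

# Rows keyed by (user_id, category), as an Update-mode query would emit them
batch_0 = {("u1", "books"): 2, ("u2", "toys"): 1}
batch_1 = {("u2", "toys"): 3}          # only u2's row changed

overwritten = apply_overwrite(apply_overwrite({}, batch_0), batch_1)
upserted = apply_upsert(apply_upsert({}, batch_0), batch_1)
print(overwritten)
print(upserted)
```

After the second batch the overwritten table has lost the row for `u1`, while the upserted table still holds both users with the latest totals.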

Choosing the Right Mode

The decision matrix is straightforward. For non-aggregated queries, use Append mode. For aggregations, consider your requirements:

Use Complete mode when:

  • Result set is small (hundreds to thousands of rows)
  • Downstream needs full snapshots
  • Using in-memory tables for interactive queries
  • Implementing real-time dashboards with full refreshes

Use Append mode when:

  • Processing event-time windowed aggregations
  • Writing to immutable storage (Parquet, ORC)
  • Need exactly-once file output (the built-in file sink supports only Append mode)
  • Can define appropriate watermarks

Use Update mode when:

  • Running unbounded aggregations
  • Sink supports upserts (databases, or Delta Lake via MERGE in foreachBatch)
  • Need incremental updates without full rewrites
  • Cannot define watermarks but need aggregations
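
One way to read the checklist above is as a small decision function. The sketch below is only one possible encoding of these rules; the function name, its parameters, and the order in which the conditions are checked are assumptions, not part of any Spark API.

```python
def choose_output_mode(has_aggregation, has_watermark=False,
                       needs_full_snapshot=False, sink_supports_upsert=False):
    """Suggest an output mode from a few yes/no facts about the query."""
    if not has_aggregation:
        return "append"      # plain rows never change once written
    if needs_full_snapshot:
        return "complete"    # only sensible for small result tables
    if sink_supports_upsert:
        return "update"      # write just the changed rows
    if has_watermark:
        return "append"      # finalized windows to immutable storage
    raise ValueError(
        "aggregation needs a watermark, an upsert-capable sink, "
        "or a consumer that accepts full snapshots"
    )

print(choose_output_mode(False))
print(choose_output_mode(True, needs_full_snapshot=True))
print(choose_output_mode(True, sink_supports_upsert=True))
print(choose_output_mode(True, has_watermark=True))
```

The final branch makes the dead end explicit: an aggregation with no watermark, no upsert-capable sink, and no tolerance for full snapshots has no workable mode.
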

# Practical example: Session analytics
from pyspark.sql.types import *

schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("session_id", StringType())
])

events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Append: Write raw events
events.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("path", "/data/raw_events") \
    .option("checkpointLocation", "/checkpoints/raw") \
    .start()

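# Update: Session metrics (unbounded aggregation)
# Exact distinct counts are not supported in streaming aggregations,
# so use the approximate variant.
session_metrics = events.groupBy("session_id").agg(
    count("*").alias("event_count"),
    approx_count_distinct("event_type").alias("unique_events"),
    max("timestamp").alias("last_activity")
)

# Delta's streaming sink accepts only append and complete, so upsert each
# micro-batch with MERGE. Assumes the Delta table at this path already exists.
from delta.tables import DeltaTable

def upsert_sessions(batch_df, batch_id):
    DeltaTable.forPath(spark, "/data/session_metrics").alias("t") \
        .merge(batch_df.alias("s"), "t.session_id = s.session_id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

session_metrics.writeStream \
    .outputMode("update") \
    .foreachBatch(upsert_sessions) \
    .option("checkpointLocation", "/checkpoints/sessions") \
    .start()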

# Append: Hourly aggregates with watermark
hourly_stats = events \
    .withWatermark("timestamp", "1 hour") \
    .groupBy(window(col("timestamp"), "1 hour")) \
    .agg(
        # Exact distinct counts are not supported on streaming DataFrames
        approx_count_distinct("user_id").alias("active_users"),
        count("*").alias("total_events")
    )

hourly_stats.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("path", "/data/hourly_stats") \
    .option("checkpointLocation", "/checkpoints/hourly") \
    .start()

Output modes fundamentally affect performance, storage costs, and downstream processing patterns. Complete mode’s full rewrites waste I/O once the result table grows beyond a small size. Append mode’s immutability enables efficient incremental processing, at the cost of waiting for the watermark. Update mode requires sinks with update capabilities but provides the most flexibility for stateful aggregations.
