Spark Streaming - Sources and Sinks Overview

Structured Streaming sources define where your streaming application reads data from. Each source type provides different guarantees around fault tolerance and data ordering.

Key Insights

  • Spark Structured Streaming treats data sources as unbounded tables and supports connectors for Kafka, file systems, sockets, and rate sources, each with specific reliability guarantees and configuration requirements
  • Sinks determine output destinations and delivery semantics—file and Kafka sinks provide exactly-once guarantees while foreach sinks require manual idempotency handling
  • Checkpoint locations are mandatory for production workloads to enable fault tolerance and exactly-once processing, storing offset information and metadata for recovery

Understanding Streaming Sources

Structured Streaming sources define where your streaming application reads data from. Each source type provides different guarantees around fault tolerance and data ordering.

The most common production source is Kafka. Here’s a basic configuration:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaSource") \
    .getOrCreate()

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka returns binary data - deserialize it
parsed_df = kafka_df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    "topic",
    "partition",
    "offset",
    "timestamp"
)

The startingOffsets parameter controls where to begin reading. Use earliest for reprocessing all data, latest for new data only, or specify exact offsets as JSON:

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", """{"orders":{"0":23,"1":45}}""") \
    .load()

File-Based Sources

File sources monitor directories for new files. Supported formats include JSON, CSV, Parquet, ORC, and text:

schema = "order_id LONG, customer_id LONG, amount DOUBLE, timestamp TIMESTAMP"

file_df = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 10) \
    .load("/data/orders/")

Critical considerations for file sources:

  • Schema must be explicitly provided - Spark cannot infer schemas from streaming sources
  • Files must be atomically placed in the directory (rename operations, not direct writes)
  • Once processed, files cannot be modified - Spark tracks them by name

The maxFilesPerTrigger option controls throughput and micro-batch size. Without it, Spark processes all available files in a single batch.

For CSV files with headers:

csv_df = spark.readStream \
    .format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .option("maxFilesPerTrigger", 5) \
    .load("/data/csv_orders/")

Socket and Rate Sources

Socket sources are useful for testing but not recommended for production due to lack of fault tolerance:

socket_df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

The rate source generates dummy data for testing and benchmarking:

rate_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1000) \
    .option("numPartitions", 4) \
    .load()

This creates a stream with timestamp and value columns, generating 1000 rows per second across 4 partitions.

Sink Types and Guarantees

Sinks determine where processed data goes and what delivery guarantees you get. The three semantic levels are:

  • At-most-once: Data may be lost but never duplicated
  • At-least-once: Data may be duplicated but never lost
  • Exactly-once: Each record processed exactly once

File Sink

File sinks provide exactly-once guarantees and support partitioning:

query = parsed_df.writeStream \
    .format("parquet") \
    .option("path", "/output/orders") \
    .option("checkpointLocation", "/checkpoints/orders") \
    .partitionBy("date") \
    .outputMode("append") \
    .start()

The checkpoint location is critical—it stores metadata about processed offsets and ensures exactly-once semantics. Without it, the query fails.

Supported file formats: Parquet, ORC, JSON, CSV, and text. Parquet is recommended for production due to compression and columnar storage benefits.

Kafka Sink

Writing back to Kafka enables event-driven architectures:

from pyspark.sql.functions import col, to_json, struct

# Prepare data for Kafka (requires key and value columns)
kafka_output = processed_df.select(
    col("order_id").cast("string").alias("key"),
    to_json(struct("*")).alias("value")
)

query = kafka_output.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed_orders") \
    .option("checkpointLocation", "/checkpoints/kafka_sink") \
    .start()

Kafka sinks require:

  • key column (optional, but recommended for partitioning)
  • value column (required)
  • Both must be STRING or BINARY types

For exactly-once delivery to Kafka, enable idempotent writes:

query = kafka_output.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed_orders") \
    .option("kafka.enable.idempotence", "true") \
    .option("checkpointLocation", "/checkpoints/kafka_sink") \
    .start()

Memory and Console Sinks

Memory sink stores results in an in-memory table for debugging:

query = processed_df.writeStream \
    .format("memory") \
    .queryName("orders_table") \
    .outputMode("complete") \
    .start()

# Query the in-memory table
spark.sql("SELECT * FROM orders_table").show()

Console sink prints to stdout:

query = processed_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .option("numRows", 20) \
    .start()

Neither provides fault tolerance—use only for development.

ForeachBatch Sink

For custom logic or writing to multiple destinations:

def process_batch(batch_df, batch_id):
    # Write to PostgreSQL
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost/db") \
        .option("dbtable", "orders") \
        .option("user", "admin") \
        .mode("append") \
        .save()
    
    # Also write to S3
    batch_df.write \
        .format("parquet") \
        .mode("append") \
        .save(f"s3://bucket/orders/batch_{batch_id}")

query = processed_df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/checkpoints/custom") \
    .start()

ForeachBatch provides exactly-once guarantees for the batch delivery to your function, but you must ensure idempotency in your custom code.

Output Modes

Output modes control what data gets written to the sink:

Append: Only new rows since the last trigger. Works with all sinks but requires operations that don’t update previous results.

Complete: Entire result table written after each trigger. Only works with aggregations and requires memory sink or foreach.

Update: Only updated rows since last trigger. Useful for aggregations with state.

# Append mode - streaming ETL
query = df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .start("/output")

# Complete mode - aggregations
aggregated = df.groupBy("category").count()
query = aggregated.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("category_counts") \
    .start()

# Update mode - stateful aggregations
windowed = df.groupBy(
    window("timestamp", "1 hour"),
    "category"
).count()
query = windowed.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

Checkpoint Management

Checkpoints store offset information, metadata, and state. They’re stored in a distributed file system and must be reliable:

query = df.writeStream \
    .format("parquet") \
    .option("path", "/output/data") \
    .option("checkpointLocation", "s3://bucket/checkpoints/app1") \
    .start()

Never share checkpoint locations between queries. Each streaming query needs its own directory. If you delete checkpoints, the query restarts from scratch based on source configuration.

For production deployments on cloud platforms, use object storage for checkpoints (S3, GCS, ADLS) with versioning enabled to prevent accidental deletion.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.