PySpark - Streaming from File Source

Key Insights

  • PySpark’s file source streaming monitors directories for new files and processes them incrementally, supporting formats like JSON, CSV, Parquet, and text with schema inference and enforcement
  • File streaming requires careful configuration of maxFilesPerTrigger, schema management, and checkpoint locations to ensure exactly-once processing and fault tolerance
  • Production deployments must handle schema evolution, file arrival patterns, and cleanup strategies to prevent memory issues and maintain processing performance

Understanding File Source Streaming

PySpark Structured Streaming treats file sources as unbounded tables, continuously monitoring directories for new files. Unlike batch processing, the streaming engine maintains state through checkpoints and processes files incrementally as they arrive.

The file source supports multiple formats and provides configurable triggers to control processing rates. Each micro-batch reads a subset of files based on your trigger configuration, making it suitable for scenarios where data arrives periodically rather than in real-time streams.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# schemaInference lets file sources infer a schema when none is supplied;
# an explicit schema (defined below) is still the recommended approach
spark = SparkSession.builder \
    .appName("FileSourceStreaming") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# Define explicit schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("value", IntegerType(), True)
])

# Create streaming DataFrame
streaming_df = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 10) \
    .load("/data/input/")

Configuring File Processing Options

The maxFilesPerTrigger option controls throughput and resource utilization. Setting it too high produces large micro-batches that can overwhelm executor memory; setting it too low lets the stream fall behind whenever files arrive faster than they are processed.
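As a back-of-the-envelope sizing check (all rates below are hypothetical, not Spark defaults), you can estimate whether a given maxFilesPerTrigger keeps up with the file arrival rate:

```python
# Back-of-the-envelope check: does maxFilesPerTrigger keep up with arrivals?

def backlog_growth_per_minute(files_arriving_per_min, max_files_per_trigger,
                              trigger_interval_sec):
    """Files added to the backlog each minute (negative means the stream catches up)."""
    triggers_per_min = 60 / trigger_interval_sec
    processed_per_min = max_files_per_trigger * triggers_per_min
    return files_arriving_per_min - processed_per_min

# 30 files/min arriving, 10 files per trigger, one trigger every 30s:
# only 20 files/min are processed, so the backlog grows by 10 files/min
print(backlog_growth_per_minute(30, 10, 30))  # 10.0
```

If the result is positive, either raise maxFilesPerTrigger, shorten the trigger interval, or accept a growing listing backlog.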

# JSON streaming with multiple options
json_stream = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 5) \
    .option("multiLine", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .load("/data/json_input/")

# CSV streaming with header and delimiter
csv_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_time", TimestampType(), True),
    StructField("amount", IntegerType(), True)
])

csv_stream = spark.readStream \
    .format("csv") \
    .schema(csv_schema) \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("maxFilesPerTrigger", 20) \
    .load("/data/csv_input/")

# Parquet streaming (most efficient)
parquet_stream = spark.readStream \
    .format("parquet") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 100) \
    .load("/data/parquet_input/")

Implementing Transformations and Aggregations

File source streams support the full range of DataFrame operations. Stateful operations require watermarking to manage state growth.

from pyspark.sql.functions import col, window, count, sum, avg

# Simple transformations
transformed = streaming_df \
    .filter(col("value") > 100) \
    .withColumn("value_doubled", col("value") * 2) \
    .select("id", "name", "timestamp", "value_doubled")

# Windowed aggregations with watermarking
windowed_agg = streaming_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes", "1 minute"),
        col("name")
    ) \
    .agg(
        count("*").alias("event_count"),
        sum("value").alias("total_value"),
        avg("value").alias("avg_value")
    )

# Tumbling window without slide
tumbling_window = streaming_df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window(col("timestamp"), "10 minutes"),
        col("name")
    ) \
    .agg(count("*").alias("count"))
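To make the window semantics concrete, here is a small pure-Python sketch (no Spark required) of how a sliding window such as window(col("timestamp"), "5 minutes", "1 minute") assigns a single event to overlapping buckets:

```python
from datetime import datetime, timedelta

def sliding_windows(event_time, duration_min=5, slide_min=1):
    """Return every [start, end) window containing event_time, mirroring
    window(col("timestamp"), "5 minutes", "1 minute") in Spark."""
    duration = timedelta(minutes=duration_min)
    slide = timedelta(minutes=slide_min)
    epoch = datetime(1970, 1, 1)
    # Most recent window start at or before the event
    last_start = event_time - (event_time - epoch) % slide
    windows = []
    start = last_start
    while start + duration > event_time:
        windows.append((start, start + duration))
        start -= slide
    return list(reversed(windows))

# An event at 10:03:30 lands in 5 overlapping 5-minute windows
# (window starts 09:59 through 10:03):
for start, end in sliding_windows(datetime(2024, 1, 1, 10, 3, 30)):
    print(start.time(), "->", end.time())
```

Each event belongs to duration/slide windows (here 5/1 = 5), which is why sliding windows multiply state size relative to tumbling windows.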

Managing Checkpoints and Output

Checkpoints store metadata about processed files and streaming state. Without checkpoints, the stream processes all files on every restart.

# Console output for debugging
query_console = transformed \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 20) \
    .option("checkpointLocation", "/checkpoints/console") \
    .start()

# Parquet output with partitioning.
# partitionBy cannot use the struct column produced by window(),
# so extract window.start into an atomic column first.
query_parquet = windowed_agg \
    .withColumn("window_start", col("window.start")) \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/output/aggregated") \
    .option("checkpointLocation", "/checkpoints/parquet") \
    .partitionBy("window_start") \
    .trigger(processingTime="30 seconds") \
    .start()

# Memory sink for testing
query_memory = streaming_df \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("streaming_table") \
    .option("checkpointLocation", "/checkpoints/memory") \
    .start()

# Query the in-memory table (it stays empty until the first micro-batch commits)
spark.sql("SELECT * FROM streaming_table LIMIT 10").show()

Handling Schema Evolution

File sources require consistent schemas across all files. Schema mismatches cause processing failures unless handled explicitly.

# Schema with optional fields
flexible_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("value", IntegerType(), True),
    StructField("metadata", StringType(), True),  # Optional field
    StructField("_corrupt_record", StringType(), True)
])

# Stream with schema enforcement
enforced_stream = spark.readStream \
    .format("json") \
    .schema(flexible_schema) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .load("/data/input/")

# Filter and log corrupt records
clean_records = enforced_stream.filter(col("_corrupt_record").isNull())
corrupt_records = enforced_stream.filter(col("_corrupt_record").isNotNull())

query_clean = clean_records \
    .writeStream \
    .format("parquet") \
    .option("path", "/data/output/clean") \
    .option("checkpointLocation", "/checkpoints/clean") \
    .start()

query_corrupt = corrupt_records \
    .writeStream \
    .format("json") \
    .option("path", "/data/output/corrupt") \
    .option("checkpointLocation", "/checkpoints/corrupt") \
    .start()

Production Patterns

Production deployments require monitoring, error handling, and resource management strategies.

from pyspark.sql.functions import current_timestamp, input_file_name

# Add metadata columns for tracking
enriched_stream = streaming_df \
    .withColumn("processing_time", current_timestamp()) \
    .withColumn("source_file", input_file_name())

# Configure trigger intervals
query_fixed = enriched_stream \
    .writeStream \
    .format("parquet") \
    .option("path", "/data/output/fixed_interval") \
    .option("checkpointLocation", "/checkpoints/fixed") \
    .trigger(processingTime="1 minute") \
    .start()

# One-time trigger: process everything currently available in a single batch, then stop
query_once = enriched_stream \
    .writeStream \
    .format("parquet") \
    .option("path", "/data/output/once") \
    .option("checkpointLocation", "/checkpoints/once") \
    .trigger(once=True) \
    .start()

# Available-now trigger (Spark 3.3+): process all available data in
# possibly multiple batches (respecting maxFilesPerTrigger), then stop
query_available = enriched_stream \
    .writeStream \
    .format("parquet") \
    .option("path", "/data/output/available") \
    .option("checkpointLocation", "/checkpoints/available") \
    .trigger(availableNow=True) \
    .start()

Monitoring and Cleanup

Track streaming progress and implement file cleanup to prevent directory bloat.

# Monitor streaming query
query = enriched_stream \
    .writeStream \
    .format("parquet") \
    .option("path", "/data/output/monitored") \
    .option("checkpointLocation", "/checkpoints/monitored") \
    .start()

# Access query metrics
print(f"Query ID: {query.id}")
print(f"Run ID: {query.runId}")
print(f"Status: {query.status}")
print(f"Recent Progress: {query.recentProgress}")
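In PySpark, lastProgress and recentProgress return plain dicts, so a small helper can pull out the throughput fields you usually watch. A sketch (field names follow the StreamingQueryProgress JSON; the sample payload is hypothetical):

```python
def summarize_progress(progress):
    """Extract commonly watched throughput fields from a
    StreamingQueryProgress dict, e.g. query.lastProgress."""
    if not progress:
        return None  # no batch has completed yet
    return {
        "batchId": progress.get("batchId"),
        "numInputRows": progress.get("numInputRows", 0),
        "inputRowsPerSecond": progress.get("inputRowsPerSecond", 0.0),
        "durationMs": progress.get("durationMs", {}),
    }

# Hypothetical progress payload for illustration:
sample = {
    "batchId": 7,
    "numInputRows": 1200,
    "inputRowsPerSecond": 40.0,
    "durationMs": {"triggerExecution": 1500},
}
print(summarize_progress(sample)["numInputRows"])  # 1200
```

Logging this summary per batch makes it easy to spot a growing gap between arrival rate and processing rate.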

# Wait with timeout
try:
    query.awaitTermination(timeout=3600)
except Exception as e:
    print(f"Query terminated with error: {e}")
    query.stop()

# Cleanup processed files (external script; Spark 3.0+ file sources also
# offer a built-in cleanSource option to archive or delete processed files)
import os
import time

def cleanup_old_files(directory, age_hours=24):
    current_time = time.time()
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if os.path.isfile(filepath):
            file_age = (current_time - os.path.getmtime(filepath)) / 3600
            if file_age > age_hours:
                os.remove(filepath)
                print(f"Removed: {filepath}")

File source streaming provides reliable, scalable ingestion for batch-oriented data pipelines. Proper configuration of schemas, checkpoints, and triggers ensures consistent processing with minimal operational overhead. Monitor checkpoint sizes and implement file retention policies to maintain long-running production streams.
