Spark Streaming - File Source Processing
Key Insights
- Spark Structured Streaming’s file source monitors directories for new files and processes them incrementally, making it well suited to batch-to-streaming migrations and log processing pipelines where files arrive periodically
- File-based streaming requires careful handling of schema evolution, file formats (Parquet, JSON, CSV), and checkpoint management to achieve end-to-end exactly-once processing
- Production deployments must address file discovery latency, small file problems, and proper cleanup strategies to prevent metadata bloat and processing delays
Understanding File Source Streaming Architecture
Spark Structured Streaming treats file sources as unbounded tables, continuously monitoring a directory for new files. Unlike traditional batch processing, the file source uses checkpoint metadata to track which files have been processed, enabling incremental consumption.
The core mechanism relies on file modification timestamps and path-based tracking. When you define a streaming query on a directory, Spark periodically lists files, compares them against checkpoint state, and processes only new additions.
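The list-compare-process loop described above can be illustrated without Spark. The following is a minimal sketch, assuming a local directory and an in-memory set standing in for checkpoint state; `discover_new_files` and `seen` are hypothetical names, not part of any Spark API, and Spark's actual bookkeeping is more involved.

```python
import os


def discover_new_files(directory, seen):
    """Return files in `directory` that are not yet in `seen`, oldest first.

    `seen` plays the role of checkpoint state: every path returned here is
    recorded so that a later call does not return it again.
    """
    candidates = []
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and path not in seen:
            candidates.append((os.path.getmtime(path), path))
    # Order by modification time, breaking ties by path
    new_files = [path for _, path in sorted(candidates)]
    seen.update(new_files)
    return new_files
```

Calling the function twice on an unchanged directory returns an empty list the second time, which is the behavior that makes consumption incremental.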
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
spark = SparkSession.builder \
    .appName("FileSourceStreaming") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()
# Define an explicit schema for production use. Because .schema() is supplied
# below, this stream does not rely on the schemaInference setting above.
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("value", IntegerType(), True)
])
# Create streaming DataFrame from JSON files
streaming_df = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 10) \
    .load("/data/input/events/")
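The maxFilesPerTrigger option caps how many new files each micro-batch reads, which makes it the main lever for trading throughput against latency. Lower values keep each micro-batch small and quick to complete but add per-batch scheduling overhead when a backlog exists. Higher values improve throughput but make each batch longer, which increases end-to-end latency.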
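To make the trade-off concrete, the sketch below splits a backlog of files into micro-batches under a per-trigger cap. It models only the batching arithmetic; `plan_micro_batches` and the file names are hypothetical and nothing here calls Spark.

```python
def plan_micro_batches(pending_files, max_files_per_trigger):
    """Split a backlog of file paths into consecutive micro-batches."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    return [
        pending_files[i:i + max_files_per_trigger]
        for i in range(0, len(pending_files), max_files_per_trigger)
    ]


# A backlog of 25 files with a cap of 10 needs three triggers to drain
backlog = [f"events-{n:03d}.json" for n in range(25)]
batches = plan_micro_batches(backlog, 10)
print([len(batch) for batch in batches])  # [10, 10, 5]
```

With a 30-second trigger interval, this backlog would take roughly three intervals to clear, which is the latency cost of a low cap.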
Processing Different File Formats
Each file format presents unique considerations for streaming workloads. Parquet offers columnar efficiency but requires complete file writes. JSON and CSV provide flexibility but need careful schema management.
# Parquet streaming with partition discovery: load the base directory so Spark
# recurses into key=value subdirectories (year=.../month=...) itself. Add year
# and month to the schema if you need them as columns.
parquet_stream = spark.readStream \
    .format("parquet") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 5) \
    .option("latestFirst", "false") \
    .load("/data/input/parquet_events/")
# CSV with header and delimiter options
csv_stream = spark.readStream \
    .format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("maxFilesPerTrigger", 20) \
    .load("/data/input/csv_events/")
# JSON Lines input (one record per line, the default). Set multiLine to "true"
# only when a single JSON record spans several lines.
json_stream = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("multiLine", "false") \
    .option("maxFilesPerTrigger", 15) \
    .load("/data/input/json_events/")
For Parquet files, disable schema merging in streaming contexts to avoid expensive metadata operations:
parquet_optimized = spark.readStream \
    .format("parquet") \
    .schema(schema) \
    .option("mergeSchema", "false") \
    .option("pathGlobFilter", "*.parquet") \
    .load("/data/input/events/")
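The requirement that files be completely written before the stream sees them is usually met on the producer side by writing elsewhere and then moving the finished file into the watched directory. The following is a minimal sketch for a local POSIX-style filesystem; `publish_atomically` and the staging directory are illustrative assumptions, and object stores or HDFS need their own equivalent.

```python
import os


def publish_atomically(data, staging_dir, watched_dir, filename):
    """Write `data` (bytes) to a staging file, then rename it into `watched_dir`.

    The rename is atomic only when both directories are on the same
    filesystem, so a reader never observes a half-written file.
    """
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(watched_dir, exist_ok=True)
    staged_path = os.path.join(staging_dir, filename)
    with open(staged_path, "wb") as handle:
        handle.write(data)
        handle.flush()
        os.fsync(handle.fileno())  # make sure the bytes reach disk before the move
    final_path = os.path.join(watched_dir, filename)
    os.replace(staged_path, final_path)
    return final_path
```

Keeping the staging directory outside the path the stream monitors avoids the partially written file being picked up before the move.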
Implementing Transformation Pipelines
File source streams support the full Spark SQL API. Apply transformations, aggregations, and windowing operations before writing results.
# Import sum under an alias so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, window, count, avg, current_timestamp, sum as spark_sum
# Stateless transformations
transformed = streaming_df \
    .filter(col("event_type").isin(["click", "purchase"])) \
    .withColumn("processing_time", current_timestamp()) \
    .select("user_id", "event_type", "timestamp", "value", "processing_time")
# Stateful aggregations with watermarking:
# 5-minute windows that slide every minute, tolerating 10 minutes of lateness
windowed_aggregates = streaming_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes", "1 minute"),
        col("event_type")
    ) \
    .agg(
        count("*").alias("event_count"),
        spark_sum("value").alias("total_value"),
        avg("value").alias("avg_value")
    )
# Write aggregated results; append mode emits a window once the watermark passes it
query = windowed_aggregates.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "/data/output/aggregates/") \
    .option("checkpointLocation", "/data/checkpoints/aggregates/") \
    .trigger(processingTime="30 seconds") \
    .start()
Watermarking is critical for stateful operations. It tells Spark how late data can arrive, enabling state cleanup and preventing unbounded memory growth.
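The interaction between sliding windows and the watermark can be worked through in plain Python. This is a simplified model, not Spark's implementation: it assumes windows aligned to the Unix epoch, a watermark equal to the maximum event time seen minus the delay, and it ignores that Spark advances the watermark between micro-batches. All function names are hypothetical.

```python
from datetime import datetime, timedelta


def sliding_windows(event_time, size, slide):
    """Return every [start, end) window of length `size`, sliding by `slide`,
    that contains `event_time`. Windows are aligned to the Unix epoch."""
    epoch = datetime(1970, 1, 1)
    # Latest slide-aligned start that is not after the event
    start = event_time - ((event_time - epoch) % slide)
    windows = []
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def current_watermark(max_event_time, delay):
    """Watermark modelled as the latest event time seen minus the allowed lateness."""
    return max_event_time - delay


def is_finalized(window, watermark):
    """A window is treated as final once the watermark has moved past its end."""
    return window[1] < watermark


event = datetime(2024, 1, 1, 12, 3, 30)
windows = sliding_windows(event, timedelta(minutes=5), timedelta(minutes=1))
print(len(windows))  # 5
```

An event at 12:03:30 falls into five overlapping windows, the earliest starting at 11:59 and the latest at 12:03. With a 10-minute delay, the latest of these (ending 12:08) is finalized only after an event timestamped later than 12:18 has been seen, which is why state for recent windows must be kept around.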
Checkpoint Management and Recovery
Checkpoints store offset information and metadata about processed files. Combined with a replayable source and an idempotent or transactional sink, a properly configured checkpoint provides end-to-end exactly-once semantics and enables failure recovery.
# Production checkpoint configuration
# The "delta" format requires the Delta Lake package to be available to Spark
query = streaming_df \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/data/checkpoints/events/") \
    .option("path", "/data/output/events/") \
    .queryName("event_processor") \
    .start()
# Block for up to 60 seconds, then inspect the query's status
query.awaitTermination(timeout=60)
status = query.status
print(f"Query ID: {query.id}")
print(f"Status: {status}")
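Never delete checkpoint directories while queries are running. To reset processing from scratch, stop the query, delete the checkpoint, and restart. With an empty checkpoint the query re-reads every file still present in the input directory, so clear or replace the output location as well if duplicate rows matter:
# Stop existing query
query.stop()
# Delete checkpoint (use with caution)
# shutil only works for local filesystem paths; use your storage system's
# own tooling for HDFS or object stores
import shutil
shutil.rmtree("/data/checkpoints/events/", ignore_errors=True)
# Restart with clean state
new_query = streaming_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/data/checkpoints/events/") \
    .option("path", "/data/output/events/") \
    .start()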
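A more cautious variant of the reset is to move the checkpoint aside instead of deleting it, so the old state can be restored if the reset turns out to be a mistake. The following is a sketch for local filesystem paths only; `archive_checkpoint` and the archive location are hypothetical, and the query must already be stopped before calling it.

```python
import os
import shutil
import time


def archive_checkpoint(checkpoint_dir, archive_root):
    """Move a local checkpoint directory under `archive_root`.

    Returns the new location, or None when there is no checkpoint to move.
    """
    checkpoint_dir = checkpoint_dir.rstrip("/")
    if not os.path.isdir(checkpoint_dir):
        return None
    os.makedirs(archive_root, exist_ok=True)
    # Timestamp suffix keeps successive archives of the same checkpoint apart
    stamp = time.strftime("%Y%m%dT%H%M%S")
    target = os.path.join(archive_root, f"{os.path.basename(checkpoint_dir)}-{stamp}")
    shutil.move(checkpoint_dir, target)
    return target
```

After the move, restarting the query with the original checkpointLocation starts from a clean state, while the archived copy remains available for inspection.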
Handling Schema Evolution
Schema changes require careful planning. Structured Streaming does not automatically adapt to schema changes in file sources while a query is running, so the schema you supply must already cover the fields you expect to read.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import coalesce, col, lit
# Initial schema
schema_v1 = StructType([
    StructField("user_id", StringType(), False),
    StructField("value", IntegerType(), True)
])
# Evolved schema with new field
schema_v2 = StructType([
    StructField("user_id", StringType(), False),
    StructField("value", IntegerType(), True),
    StructField("score", DoubleType(), True)  # New field
])
# Use schema_v2 for forward compatibility
stream_with_evolution = spark.readStream \
    .format("json") \
    .schema(schema_v2) \
    .load("/data/input/events/")
# Handle missing fields gracefully: older files have no score, so it reads as null
normalized = stream_with_evolution \
    .withColumn("score", coalesce(col("score"), lit(0.0)))
For breaking schema changes, deploy a new streaming job with updated schema and checkpoint location, then migrate traffic gradually.
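Deciding whether a change is breaking can be automated before deployment. The sketch below is one possible rule set, not a Spark feature: it treats removed columns, changed types, and new non-nullable columns as breaking, and new nullable columns as compatible. Schemas are represented as plain dictionaries of `{column: (type_name, nullable)}`, and `classify_schema_change` is a hypothetical helper.

```python
def classify_schema_change(old_fields, new_fields):
    """Compare two {column: (type_name, nullable)} mappings.

    Returns ("compatible", []) or ("breaking", [reasons]).
    """
    problems = []
    for name, (old_type, _) in old_fields.items():
        if name not in new_fields:
            problems.append(f"removed column: {name}")
        elif new_fields[name][0] != old_type:
            problems.append(
                f"type change on {name}: {old_type} -> {new_fields[name][0]}"
            )
    for name, (_, nullable) in new_fields.items():
        if name not in old_fields and not nullable:
            problems.append(f"new required column: {name}")
    return ("breaking", problems) if problems else ("compatible", [])


v1 = {"user_id": ("string", False), "value": ("integer", True)}
v2 = {**v1, "score": ("double", True)}
print(classify_schema_change(v1, v2))  # ('compatible', [])
```

With PySpark available, the dictionaries could be built from a StructType by iterating over its fields and reading each field's name, dataType.simpleString(), and nullable attribute.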
Production Optimization Strategies
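File discovery latency adds directly to processing delay. Tune latestFirst and the trigger interval to your SLA requirements:
# Optimize for low latency
# latestFirst processes the newest files first when a backlog exists.
# fileNameOnly compares file names without their directories, so files with
# the same name in different directories count as the same file.
low_latency_stream = spark.readStream \
    .format("parquet") \
    .schema(schema) \
    .option("latestFirst", "true") \
    .option("maxFilesPerTrigger", 100) \
    .option("fileNameOnly", "true") \
    .load("/data/input/events/")
# Configure trigger interval
# The output path is assumed to match the earlier example; a path-based sink needs one
query = low_latency_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/data/checkpoints/events/") \
    .option("path", "/data/output/events/") \
    .trigger(processingTime="10 seconds") \
    .start()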
Reduce small-file output by repartitioning before the write and using a longer trigger interval, so each micro-batch produces fewer, larger files:
# Repartition before writing to control output file size
optimized_output = streaming_df \
    .repartition(10) \
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", "/data/checkpoints/compacted/") \
    .option("path", "/data/output/compacted/") \
    .trigger(processingTime="5 minutes") \
    .start()
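The partition count passed to repartition can be derived from the expected batch volume rather than fixed by hand. The following is a rough sizing sketch; the 128 MiB target is an assumed rule of thumb, `partitions_for_target_size` is a hypothetical helper, and on-disk sizes will differ from the estimate because of compression and encoding.

```python
import math


def partitions_for_target_size(batch_bytes, target_file_bytes=128 * 1024 * 1024):
    """Estimate how many output partitions give files near the target size."""
    if target_file_bytes <= 0:
        raise ValueError("target_file_bytes must be positive")
    # Always write at least one partition, even for an empty or tiny batch
    return max(1, math.ceil(batch_bytes / target_file_bytes))


# A micro-batch of about 1 GiB maps to 8 partitions at the default target
print(partitions_for_target_size(1024 ** 3))  # 8
```

The result would replace the literal 10 in the repartition call when batch sizes are reasonably predictable.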
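Implement cleanup policies for processed files to prevent directory bloat. On Spark 3.0 and later the file source's cleanSource option (archive or delete) can handle this inside the query. The standalone helper below removes files by age alone and does not check whether they were processed, so choose a retention period comfortably longer than your worst-case processing delay:
import os
from datetime import datetime, timedelta
def cleanup_old_files(directory, days_old=7):
    """Remove files whose modification time is more than days_old days in the past."""
    cutoff = (datetime.now() - timedelta(days=days_old)).timestamp()
    for root, dirs, files in os.walk(directory):
        for file in files:
            filepath = os.path.join(root, file)
            if os.path.getmtime(filepath) < cutoff:
                os.remove(filepath)
                print(f"Removed: {filepath}")
# Run cleanup periodically (outside streaming context)
cleanup_old_files("/data/input/events/", days_old=7)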
Monitor streaming metrics using Spark’s built-in capabilities:
# Access streaming query metrics (lastProgress is None until a batch completes)
progress = query.lastProgress
if progress:
    print(f"Input rows/sec: {progress['inputRowsPerSecond']}")
    print(f"Process rows/sec: {progress['processedRowsPerSecond']}")
    print(f"Batch duration: {progress['batchDuration']} ms")
    print(f"Number of input rows: {progress['numInputRows']}")
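For alerting, it can help to reduce a progress report to a few indicators. The sketch below assumes the progress object is dictionary-like and uses key names from the JSON form of the progress report; `summarize_progress`, the "falling behind" heuristic, and the sample values are illustrative assumptions rather than measured data.

```python
def summarize_progress(progress):
    """Extract a few health indicators from a dictionary-like progress report."""
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    return {
        "batch_id": progress.get("batchId"),
        "rows": progress.get("numInputRows", 0),
        "trigger_ms": progress.get("durationMs", {}).get("triggerExecution"),
        # Heuristic: data arrives faster than the query processes it
        "falling_behind": input_rate > processed_rate,
    }


# Made-up sample shaped like a progress report
sample = {
    "batchId": 7,
    "numInputRows": 1200,
    "inputRowsPerSecond": 400.0,
    "processedRowsPerSecond": 300.0,
    "durationMs": {"triggerExecution": 4000},
}
print(summarize_progress(sample)["falling_behind"])  # True
```

A sustained run of batches flagged as falling behind suggests raising maxFilesPerTrigger, adding resources, or lengthening the trigger interval.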
File source streaming provides a bridge between batch and real-time processing, enabling incremental ETL pipelines without complex message broker infrastructure. Success depends on proper schema management, checkpoint hygiene, and optimization for your specific file arrival patterns and processing SLAs.