Apache Spark - Handling Small Files Problem

Key Insights

  • Small files (under 128MB) create excessive task overhead in Spark, as each file can spawn its own task, overwhelming the driver and degrading performance by an order of magnitude or more
  • Prevention beats cure: configure maxRecordsPerFile, use proper coalescing on writes, and avoid over-partitioning to stop small files at the source
  • Modern table formats like Delta Lake, Iceberg, and Hudi provide built-in compaction features that automate small file management with minimal operational overhead

The Small Files Problem

Every Spark developer eventually encounters the small files problem. You’ve built a pipeline that works perfectly in development, but in production, jobs that should take minutes stretch into hours. The culprit? Thousands or millions of tiny files scattered across your data lake.

In Spark’s context, “small files” typically means anything under 128MB—the default HDFS block size. But the real threshold depends on your cluster. Files under 32MB are almost always problematic, and even 64MB files cause issues at scale.

The problem stems from three common sources: streaming pipelines that write micro-batches every few seconds, over-partitioned datasets where each partition combination creates a separate file, and migrations from legacy systems that preserved original file structures.

Understanding the Performance Impact

Spark’s execution model creates one task per input partition. When reading through Hadoop InputFormats (for example, sc.textFile), each file becomes at least one partition, so a directory of 10,000 small files spawns 10,000 tasks. DataFrame file sources can bin-pack several small files into one partition, but every file still pays a per-file open cost, and file listing and metadata tracking scale with file count. Each task carries overhead: scheduling time, serialization, network coordination, and result collection.
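
For DataFrame file sources, the bin-packing behavior is tunable. A small sketch (the values set here are illustrative; the defaults noted in the comments are Spark's documented defaults):

```python
# Maximum bytes packed into a single input partition (Spark's default is 128MB).
spark.conf.set("spark.sql.files.maxPartitionBytes", str(256 * 1024 * 1024))

# Estimated cost of opening a file, in bytes (default 4MB); raising it biases
# Spark toward fewer, larger partitions when many small files are present.
spark.conf.set("spark.sql.files.openCostInBytes", str(8 * 1024 * 1024))
```

These settings only affect read parallelism; they don't fix the underlying file layout, so the write-side and compaction strategies below still apply.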

The driver becomes the bottleneck. It must track every task, maintain metadata for every file, and coordinate the entire execution. With millions of small files, driver memory exhaustion becomes a real risk, and even before that point, the coordination overhead dominates actual computation time.

Consider the I/O pattern difference. Reading one 1GB file means one connection setup, one metadata lookup, and sequential reads. Reading 1,000 1MB files means 1,000 connection setups, 1,000 metadata lookups, and constant context switching.

Here’s a benchmark that demonstrates the impact:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

spark = SparkSession.builder \
    .appName("SmallFilesBenchmark") \
    .getOrCreate()

# Generate test data
df = spark.range(0, 10_000_000).withColumn("data", F.rand())

# Write as 1,000 small files
start = time.time()
df.repartition(1000).write.mode("overwrite").parquet("/tmp/small_files")
small_write_time = time.time() - start

# Write as 10 consolidated files
start = time.time()
df.repartition(10).write.mode("overwrite").parquet("/tmp/large_files")
large_write_time = time.time() - start

# Benchmark reads
start = time.time()
spark.read.parquet("/tmp/small_files").count()
small_read_time = time.time() - start

start = time.time()
spark.read.parquet("/tmp/large_files").count()
large_read_time = time.time() - start

print(f"Small files read: {small_read_time:.2f}s")
print(f"Large files read: {large_read_time:.2f}s")
print(f"Performance ratio: {small_read_time / large_read_time:.1f}x slower")

In typical environments, the small files read takes 5-10x longer than the consolidated version, and the gap widens with file count.

Detection and Diagnosis

Before fixing the problem, you need to find it. The Spark UI provides clear indicators: look for stages with thousands of tasks that complete in milliseconds each. When you see 10,000 tasks averaging 50ms, you’re spending more time on overhead than computation.

The Jobs tab shows total task counts. If a simple read operation spawns more tasks than your data size in gigabytes, investigate further.
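
That rule of thumb is easy to script. A minimal sketch (the 10-tasks-per-GB threshold mirrors the files-per-GB guideline discussed later in this article; the commented-out read uses a placeholder path):

```python
def suspicious_task_count(num_partitions: int, total_gb: float, tasks_per_gb: int = 10) -> bool:
    """Flag reads that spawn disproportionately many tasks for their data volume."""
    return num_partitions > max(1.0, total_gb) * tasks_per_gb

# With a live SparkSession, inspect an actual read:
# df = spark.read.parquet("s3://your-bucket/your-data/")
# print(suspicious_task_count(df.rdd.getNumPartitions(), total_gb=5.0))
```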

Here’s a diagnostic script to analyze file sizes in your data lake:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("FileAnalysis").getOrCreate()

def analyze_directory(path: str):
    """Analyze file sizes in a directory and identify small file problems."""
    
    # Get file listing using Hadoop filesystem API
    hadoop_conf = spark._jsc.hadoopConfiguration()
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark._jvm.java.net.URI(path), hadoop_conf
    )
    
    files = []
    iterator = fs.listFiles(spark._jvm.org.apache.hadoop.fs.Path(path), True)
    
    while iterator.hasNext():
        file_status = iterator.next()
        if file_status.isFile():
            files.append((
                file_status.getPath().toString(),
                file_status.getLen()
            ))
    
    # Convert to DataFrame for analysis
    file_df = spark.createDataFrame(files, ["path", "size_bytes"])
    
    stats = file_df.agg(
        F.count("*").alias("total_files"),
        F.sum("size_bytes").alias("total_bytes"),
        F.avg("size_bytes").alias("avg_bytes"),
        F.min("size_bytes").alias("min_bytes"),
        F.max("size_bytes").alias("max_bytes"),
        F.sum(F.when(F.col("size_bytes") < 128 * 1024 * 1024, 1).otherwise(0)).alias("small_files")
    )
    
    stats.show(truncate=False)
    
    # Distribution by size buckets
    file_df.withColumn(
        "size_bucket",
        F.when(F.col("size_bytes") < 1024 * 1024, "< 1MB")
         .when(F.col("size_bytes") < 32 * 1024 * 1024, "1-32MB")
         .when(F.col("size_bytes") < 128 * 1024 * 1024, "32-128MB")
         .otherwise(">= 128MB")
    ).groupBy("size_bucket").count().orderBy("count", ascending=False).show()

analyze_directory("s3://your-bucket/your-data/")

This script reveals the distribution of file sizes and quantifies how many files fall below the 128MB threshold.

Prevention Strategies

The best solution is preventing small files from being created. Spark provides several mechanisms to control output file sizes.

The maxRecordsPerFile option limits records per output file, useful when you know your record sizes:

# Write with maximum records per file
df.write \
    .option("maxRecordsPerFile", 1_000_000) \
    .mode("overwrite") \
    .parquet("/data/output")

For more direct control, use coalesce() or repartition() before writing:

# Roughly estimate data size; this triggers a full pass over the data and
# measures Python string length, not on-disk Parquet size, so treat it as a ballpark
data_size_mb = df.rdd.map(lambda x: len(str(x))).sum() / (1024 * 1024)
target_file_size_mb = 256
num_partitions = max(1, int(data_size_mb / target_file_size_mb))

df.coalesce(num_partitions).write.mode("overwrite").parquet("/data/output")

When writing partitioned data, the combination of partition columns and output partitions multiplies file count. Be conservative:

# Problematic: hour-level partitioning creates at least one file
# per year/month/day/hour combination
df.repartition("year", "month", "day", "hour") \
    .write.partitionBy("year", "month", "day", "hour") \
    .parquet("/data/events")

# Better: control file count within partitions
df.repartition("year", "month", "day") \
    .write.partitionBy("year", "month", "day") \
    .option("maxRecordsPerFile", 500_000) \
    .parquet("/data/events")

Compaction Techniques

When small files already exist, compaction consolidates them. The basic approach reads and rewrites data with fewer partitions:

def compact_partition(spark, base_path: str, partition_path: str, target_size_mb: int = 256):
    """Compact files within a single partition."""
    
    full_path = f"{base_path}/{partition_path}"
    
    # Read existing data
    df = spark.read.parquet(full_path)
    
    # Estimate current size and calculate target partitions
    row_count = df.count()
    if row_count == 0:
        return
    
    # Sample to estimate row size (pandas in-memory size overstates
    # compressed Parquet size, so this errs toward more, smaller files)
    sample_size = min(10000, row_count)
    sample_bytes = int(df.limit(sample_size).toPandas().memory_usage(deep=True).sum())
    estimated_total_mb = (sample_bytes / sample_size * row_count) / (1024 * 1024)
    
    target_partitions = max(1, int(estimated_total_mb / target_size_mb))
    
    # Write compacted data to temp location
    temp_path = f"{full_path}_compacting"
    df.coalesce(target_partitions).write.mode("overwrite").parquet(temp_path)
    
    # Swap via rename (atomic on HDFS; on object stores like S3, rename is a
    # copy-and-delete, so a failure mid-swap can leave the partition inconsistent)
    hadoop_conf = spark._jsc.hadoopConfiguration()
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark._jvm.java.net.URI(base_path), hadoop_conf
    )
    
    original = spark._jvm.org.apache.hadoop.fs.Path(full_path)
    temp = spark._jvm.org.apache.hadoop.fs.Path(temp_path)
    backup = spark._jvm.org.apache.hadoop.fs.Path(f"{full_path}_old")
    
    fs.rename(original, backup)
    fs.rename(temp, original)
    fs.delete(backup, True)

# Run compaction for specific partitions
compact_partition(spark, "s3://bucket/events", "year=2024/month=01/day=15")

For production use, schedule compaction during low-traffic periods and process partitions that exceed a small file threshold.
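
The selection step can be automated. A minimal sketch of the logic (the listing that feeds it and the compact_partition call are assumed to come from the code above; names and thresholds are illustrative):

```python
SMALL_FILE_BYTES = 128 * 1024 * 1024  # files under 128MB count as "small"

def partitions_to_compact(partition_files: dict, min_small_files: int = 10) -> list:
    """Given {partition_path: [file sizes in bytes]}, return partitions worth compacting."""
    selected = []
    for partition, sizes in partition_files.items():
        small = sum(1 for s in sizes if s < SMALL_FILE_BYTES)
        if small >= min_small_files:
            selected.append(partition)
    return selected

# In a scheduled off-peak job, feed this from a real file listing:
# for p in partitions_to_compact(listing):
#     compact_partition(spark, "s3://bucket/events", p)
```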

Leveraging Table Formats

Modern table formats handle compaction automatically. Delta Lake’s OPTIMIZE command consolidates small files:

from delta.tables import DeltaTable

# Create or reference Delta table
df.write.format("delta").mode("overwrite").save("/data/events_delta")

# Run optimization
spark.sql("OPTIMIZE delta.`/data/events_delta`")

# With Z-ordering for query performance
spark.sql("""
    OPTIMIZE delta.`/data/events_delta`
    ZORDER BY (event_date, user_id)
""")

# Programmatic approach
delta_table = DeltaTable.forPath(spark, "/data/events_delta")
delta_table.optimize().executeCompaction()

Delta Lake also supports auto-compaction on writes:

spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", 50)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

Apache Iceberg provides similar functionality:

# Iceberg compaction
spark.sql("""
    CALL catalog.system.rewrite_data_files(
        table => 'db.events',
        options => map('target-file-size-bytes', '268435456')
    )
""")

Best Practices Summary

Target file sizes between 256MB and 1GB for most workloads. Smaller files (128-256MB) work better for frequently filtered data; larger files (512MB-1GB) suit sequential scan workloads.

Establish monitoring that alerts when average file size drops below thresholds. Track the ratio of files to total data size—anything over 10 files per GB warrants investigation.
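
The files-per-GB rule of thumb is straightforward to encode as a monitoring check (a sketch; the threshold of 10 comes from the guideline above, and the alerting hook is left to your monitoring stack):

```python
def files_per_gb(num_files: int, total_bytes: int) -> float:
    """Ratio of file count to data volume in GB; over ~10 warrants investigation."""
    gb = total_bytes / (1024 ** 3)
    return num_files / gb if gb > 0 else float("inf")

# Example: 5,000 files holding 100GB of data is 50 files/GB, well past the threshold
ratio = files_per_gb(5000, 100 * 1024 ** 3)
needs_investigation = ratio > 10
```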

Schedule compaction jobs during off-peak hours. For streaming pipelines, run compaction hourly for recent partitions and daily for older data.
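
The hourly-for-recent, daily-for-older cadence can be expressed as a simple scheduling rule (a sketch; the 48-hour cutoff and function names are illustrative, not from any library):

```python
def compaction_interval_hours(partition_age_hours: float, recent_cutoff_hours: float = 48) -> int:
    """Recent partitions get hourly compaction; older ones daily."""
    return 1 if partition_age_hours <= recent_cutoff_hours else 24

def compaction_due(partition_age_hours: float, hours_since_last_compaction: float) -> bool:
    """True when a partition's compaction interval has elapsed."""
    return hours_since_last_compaction >= compaction_interval_hours(partition_age_hours)
```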

Accept the trade-off: preventing small files on write adds latency. Streaming applications may need to write small files initially and compact later. Batch jobs should consolidate before writing.

The small files problem is solvable. With proper write configurations, regular compaction, and modern table formats, you can maintain a healthy data lake that performs at scale.
