Apache Spark - Performance Tuning Complete Guide
Key Insights
- Spark performance problems are usually caused by data skew, improper partitioning, or memory misconfiguration—not cluster size. Diagnose before scaling.
- The difference between a 2-hour job and a 10-minute job often comes down to understanding shuffle operations and minimizing wide transformations.
- Adaptive Query Execution (AQE) in Spark 3.x solves many historical tuning headaches automatically, but you still need to understand the fundamentals to debug edge cases.
Understanding Spark Execution Model
Before tuning anything, you need to understand what Spark is actually doing. Every Spark application breaks down into jobs, stages, and tasks. Jobs are triggered by actions such as count(), collect(), or a DataFrame write. Stages are groups of tasks that can run without shuffling data; a new stage begins at each shuffle boundary. Tasks are the smallest unit of work, each processing one partition.
The critical concept is the distinction between narrow and wide transformations. Narrow transformations like map(), filter(), and select() can be pipelined within a single stage because each output partition depends on only one input partition. Wide transformations like groupBy(), join(), and repartition() require shuffling data across the network, creating stage boundaries.
Every shuffle is expensive. It involves serializing data, writing to disk, transferring over the network, and deserializing. Your first tuning goal should always be minimizing shuffles.
The Spark UI’s DAG visualization shows you exactly where shuffles occur. But for programmatic analysis, use explain():
import org.apache.spark.sql.functions.sum
import spark.implicits._ // enables the $"column" syntax

val salesByRegion = spark.read.parquet("s3://data/sales")
  .filter($"year" === 2024)
  .groupBy("region")
  .agg(sum("amount").as("total"))

salesByRegion.explain(mode = "extended")
This outputs the parsed logical plan, analyzed logical plan, optimized logical plan, and physical plan. Look for Exchange operators in the physical plan—these indicate shuffles. If you see multiple exchanges where you expected one, you have optimization opportunities.
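For a quick programmatic check, one option is to count the shuffle exchanges in the rendered physical plan. The sketch below is a text-based heuristic rather than an official Spark API: `PlanInspector` and `countShuffleExchanges` are hypothetical names, and it assumes shuffle nodes are printed with the word `Exchange` on their own, while broadcast exchanges are printed as `BroadcastExchange` and are deliberately not counted.

```scala
import org.apache.spark.sql.DataFrame

object PlanInspector {
  // Matches "Exchange" as a whole word, so "BroadcastExchange" and
  // "ReusedExchange" are not counted.
  private val ShuffleNode = """\bExchange\b""".r

  /** Counts plan lines that contain a shuffle exchange node. */
  def countShuffleExchanges(planText: String): Int =
    planText.split("\n").count(line => ShuffleNode.findFirstIn(line).isDefined)

  /** Convenience wrapper: renders the DataFrame's physical plan and counts shuffles. */
  def countShuffleExchanges(df: DataFrame): Int =
    countShuffleExchanges(df.queryExecution.executedPlan.toString)
}
```

Calling `PlanInspector.countShuffleExchanges(salesByRegion)` on the query above should report the shuffle introduced by the groupBy. A column or alias that happens to contain the word Exchange would be miscounted, so treat the number as a hint and confirm it in the plan itself.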
Memory Management & Configuration
Executor memory falls into three broad areas: execution memory for shuffles, joins, and aggregations; storage memory for cached data; and overhead, which sits outside the JVM heap and covers JVM internals and off-heap allocations. Execution and storage share a single unified region on the heap.
The unified memory model (default since Spark 1.6) allows execution and storage to borrow from each other, governed by spark.memory.fraction (default 0.6) and spark.memory.storageFraction (default 0.5). Spark first reserves 300MB of heap, so 60% of the remaining heap is available for Spark operations; the other 40% is left for user data structures and internal metadata. Within the unified region, half is storage memory that execution cannot evict, and either side can borrow the other's unused space.
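To make the arithmetic concrete, here is a minimal sketch of how the two fractions carve up an executor heap. It assumes the fixed 300MB that Spark reserves before applying spark.memory.fraction; `UnifiedMemoryEstimate` and its fields are hypothetical names, and the heap a JVM actually reports is usually a little below the configured value, so the results are estimates.

```scala
object UnifiedMemoryEstimate {
  // Spark sets aside a fixed 300 MB of heap before applying spark.memory.fraction.
  val ReservedMb: Double = 300.0

  final case class Regions(unifiedMb: Double, protectedStorageMb: Double, userMb: Double)

  /** Estimates the heap regions for a given executor heap size in MB. */
  def estimate(heapMb: Double,
               memoryFraction: Double = 0.6,
               storageFraction: Double = 0.5): Regions = {
    require(heapMb > ReservedMb, "heap must be larger than the reserved 300 MB")
    val usable = heapMb - ReservedMb
    val unified = usable * memoryFraction          // shared by execution and storage
    val protectedStorage = unified * storageFraction // storage that execution cannot evict
    Regions(unified, protectedStorage, usable - unified)
  }

  def main(args: Array[String]): Unit = {
    val r = estimate(heapMb = 8 * 1024)
    println(f"unified: ${r.unifiedMb}%.1f MB, protected storage: ${r.protectedStorageMb}%.1f MB, user: ${r.userMb}%.1f MB")
  }
}
```

For an 8GB heap with the default fractions this comes to roughly 4735MB of unified memory, of which about 2368MB is protected storage.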
Here’s a properly configured SparkSession for a memory-intensive workload:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MemoryOptimizedJob")
  .config("spark.executor.memory", "8g")
  .config("spark.executor.memoryOverhead", "2g")
  .config("spark.memory.fraction", "0.7")
  .config("spark.memory.storageFraction", "0.3")
  .config("spark.sql.shuffle.partitions", "200")
  .config("spark.executor.cores", "4")
  .getOrCreate()
For garbage collection tuning, G1GC is the default collector on Java 9 and later and works well for most workloads; on Java 8 it has to be enabled explicitly. If you’re seeing long GC pauses in executor logs, try:
.config("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16m")
The key insight: if you’re spilling to disk frequently (visible in Spark UI stage metrics), increase spark.memory.fraction or executor memory, or raise the partition count so each task handles less data. If GC is killing you, reduce memory per executor and add more executors.
Data Serialization & Storage Formats
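Serialization affects both shuffle performance and cache efficiency. Kryo is significantly faster and more compact than Java serialization, often by as much as 10x, so there is rarely a reason not to use it for RDD-based code. DataFrame and Dataset operations mostly use Spark’s own encoders and internal binary format, so the gain there is smaller.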
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrationRequired", "false")
  .config("spark.kryoserializer.buffer.max", "512m")
  // Register custom classes for better performance. This must be set before
  // the session starts; setting it afterwards with spark.conf.set has no effect.
  .config("spark.kryo.classesToRegister",
    "com.myapp.UserEvent,com.myapp.Transaction")
  .getOrCreate()
For storage formats, Parquet with Snappy compression is the default choice for good reason. It provides columnar storage with column pruning and predicate pushdown, meaning Spark can skip reading irrelevant columns and, using row-group statistics, skip row groups that cannot match a filter. ORC is comparable and sometimes better for Hive-heavy environments.
Avoid JSON and CSV for anything beyond small datasets or debugging. They are typically several times larger on disk and much slower to parse.
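Compression codec selection matters for I/O-bound workloads. The ratios and speeds below are typical at default settings. Inside Parquet every codec stays splittable, because compression is applied to blocks within the file; a CSV or JSON file compressed as a whole with any of them is not:
| Codec | Compression Ratio | Speed | Splittable |
|---|---|---|---|
| Snappy | Medium | Fast | Yes in Parquet; no as whole-file compression |
| LZ4 | Medium | Fastest | Yes in Parquet; no as whole-file compression |
| Zstd | High | Medium | Yes in Parquet; no as whole-file compression |
| Gzip | High | Slow | Yes in Parquet; no as whole-file compression |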
Use Zstd for cold storage where read speed matters less than storage costs. Use Snappy or LZ4 for hot data with frequent reads.
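As a rough illustration of choosing a codec per dataset, the sketch below writes the same DataFrame twice with the Parquet writer's `compression` option. The toy data, the `CodecDemo` name, and the temporary output directory are assumptions made for the example, and Zstd support depends on the Parquet and Hadoop versions bundled with your Spark build.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object CodecDemo {
  /** Writes a small DataFrame with the given codec and returns the row count read back. */
  def roundTrip(spark: SparkSession, codec: String, dir: String): Long = {
    import spark.implicits._
    // Hypothetical toy data standing in for a real table
    val sales = Seq(("emea", 2024, 10.0), ("apac", 2024, 7.5))
      .toDF("region", "year", "amount")

    sales.write.mode("overwrite").option("compression", codec).parquet(s"$dir/$codec")

    // The codec is recorded in the Parquet metadata, so reading needs no extra option
    spark.read.parquet(s"$dir/$codec").count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // local mode, for illustration only
      .appName("CodecDemo")
      .getOrCreate()
    val dir = Files.createTempDirectory("codec-demo").toString

    println(roundTrip(spark, "snappy", dir)) // hot data: favors speed
    println(roundTrip(spark, "zstd", dir))   // cold data: favors a smaller footprint
    spark.stop()
  }
}
```

To change the codec for every Parquet write in a session instead, set spark.sql.parquet.compression.codec.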
Partitioning Strategies
The 128MB partition rule exists because it balances parallelism against task overhead. Too many small partitions create scheduling overhead; too few large partitions underutilize cores and risk OOM errors.
Calculate target partitions as: total_data_size / 128MB. For a 100GB dataset, aim for roughly 800 partitions.
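The calculation is simple enough to keep as a small helper. This is a minimal sketch; `PartitionSizing` and `targetPartitions` are hypothetical names, and sizes are in binary units (1GB = 1024MB).

```scala
object PartitionSizing {
  val Mb: Long = 1024L * 1024L
  val Gb: Long = 1024L * Mb

  /** Number of partitions needed so that each holds at most targetBytes. */
  def targetPartitions(totalBytes: Long, targetBytes: Long = 128L * Mb): Int = {
    require(totalBytes >= 0 && targetBytes > 0, "sizes must be non-negative and the target positive")
    // Round up so a trailing partial chunk still gets a partition; never go below 1.
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
  }

  def main(args: Array[String]): Unit = {
    println(targetPartitions(100L * Gb)) // 100GB / 128MB = 800
    println(targetPartitions(10L * Gb))  // 10GB / 128MB = 80
  }
}
```

Use the size of the data at the stage you are tuning (for example, after filtering or after a shuffle), not the size of the compressed input files.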
repartition() and coalesce() are not interchangeable. repartition(n) performs a full shuffle to create exactly n partitions with even distribution. coalesce(n) only reduces partitions by combining existing ones without a shuffle—it cannot increase partition count and may create uneven partitions.
// After filtering, you have 1000 partitions but only 10GB of data
// Use coalesce to reduce without shuffle
val filtered = rawData.filter($"status" === "active")
val optimized = filtered.coalesce(80) // ~128MB per partition

// For joins, repartition both sides by the join key with the same partition count.
// Each repartition is itself a shuffle, so this pays off when the result is reused.
val orders = spark.read.parquet("orders")
  .repartition(200, $"customer_id")
val customers = spark.read.parquet("customers")
  .repartition(200, $"customer_id")

// The join can reuse the existing partitioning instead of adding another exchange
val joined = orders.join(customers, "customer_id")
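The difference between the two calls is easy to observe by checking partition counts. The following sketch runs against a local session with generated data; `CoalesceVsRepartition` is a hypothetical name and the numbers 8, 2, and 16 are arbitrary.

```scala
import org.apache.spark.sql.SparkSession

object CoalesceVsRepartition {
  /** Returns partition counts after (coalesce down, coalesce up, repartition up). */
  def partitionCounts(spark: SparkSession): (Int, Int, Int) = {
    // spark.range(start, end, step, numPartitions) creates 8 input partitions
    val df = spark.range(0L, 1000L, 1L, 8)

    val shrunk = df.coalesce(2).rdd.getNumPartitions     // merges partitions, no shuffle
    val notGrown = df.coalesce(16).rdd.getNumPartitions  // coalesce cannot increase the count
    val grown = df.repartition(16).rdd.getNumPartitions  // full shuffle to the requested count
    (shrunk, notGrown, grown)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // local mode, for illustration only
      .appName("CoalesceVsRepartition")
      .getOrCreate()
    println(partitionCounts(spark))
    spark.stop()
  }
}
```

The same `rdd.getNumPartitions` check is a cheap way to confirm what a real pipeline ends up with after a filter or a join.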
Handling Data Skew
Data skew is the silent killer of Spark jobs. One partition with 10x the data of others means one task runs 10x longer while other executors sit idle.
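Identify skew in the Spark UI by examining stage task metrics. Look for a large gap between median and max task duration in the summary metrics, or compare per-task data volumes within a stage: the “Input Size / Records” column for stages that read files, or “Shuffle Read Size / Records” for stages that follow a shuffle.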
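Skew can also be checked directly on the data before a job runs. The sketch below counts rows per key and rows per partition after hash-partitioning by that key; the `SkewCheck` object, the toy `orders` data, and the choice of 8 partitions are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc, spark_partition_id}

object SkewCheck {
  /** Row count per key, largest first, so hot keys appear at the top. */
  def rowsPerKey(df: DataFrame, key: String): DataFrame =
    df.groupBy(col(key)).count().orderBy(desc("count"))

  /** Row count per partition after hash-partitioning by the key, largest first. */
  def rowsPerPartition(df: DataFrame, key: String, numPartitions: Int): DataFrame =
    df.repartition(numPartitions, col(key))
      .groupBy(spark_partition_id().as("partition"))
      .count()
      .orderBy(desc("count"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // local mode, for illustration only
      .appName("SkewCheck")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical toy data: customer_id 1 is a hot key with 1000 rows,
    // customer_ids 2 to 101 have one row each
    val orders = (Seq.fill(1000)(1) ++ (2 to 101)).toDF("customer_id")

    rowsPerKey(orders, "customer_id").show(5)
    rowsPerPartition(orders, "customer_id", 8).show()
    spark.stop()
  }
}
```

On a large table, run the per-key count on a sample (for example with `df.sample(0.01)`) to keep the check cheap.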
The salting technique handles skewed joins by distributing hot keys across multiple partitions:
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF

val saltBuckets = 10

// Add a random salt in [0, saltBuckets) to the skewed (large) table
val saltedOrders = orders
  .withColumn("salt", (rand() * saltBuckets).cast("int"))

// Explode the small table so every key exists once per salt value
val saltRange = (0 until saltBuckets).toDF("salt")
val explodedCustomers = customers.crossJoin(saltRange)

// Join on the key plus the salt. Joining on a column list keeps a single
// customer_id column in the result instead of one copy per side.
val result = saltedOrders
  .join(explodedCustomers, Seq("customer_id", "salt"))
  .drop("salt")
Spark 3.0+ includes Adaptive Query Execution, which handles many skewed joins automatically. It is enabled by default from Spark 3.2; on earlier 3.x releases, enable it explicitly:
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.skewJoin.enabled", "true")
.config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
AQE detects skewed shuffle partitions at runtime and splits them automatically. It’s not magic: it rebalances joins, not skewed aggregations, and extremely skewed data still needs manual intervention. It does handle most common cases, though.
Join Optimization Techniques
Broadcast joins eliminate shuffles entirely by sending the small table to all executors. Spark automatically broadcasts tables under spark.sql.autoBroadcastJoinThreshold (default 10MB). For larger dimension tables that still fit comfortably in driver and executor memory, use explicit hints:
import org.apache.spark.sql.functions.broadcast

// Explicit broadcast for a 500MB dimension table. The table is collected on the
// driver first, so make sure driver memory can hold it.
val enriched = facts.join(broadcast(dimensions), "dim_key")
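The automatic threshold itself can be adjusted at runtime. The helper below is a small sketch around the real spark.sql.autoBroadcastJoinThreshold setting; the `BroadcastThreshold` object and the 50MB figure are illustrative assumptions, not recommendations.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastThreshold {
  private val Key = "spark.sql.autoBroadcastJoinThreshold"

  /** Sets the automatic broadcast threshold, expressed in bytes. */
  def set(spark: SparkSession, bytes: Long): Unit =
    spark.conf.set(Key, bytes.toString)

  /** Disables automatic broadcasting; explicit broadcast() hints still apply. */
  def disable(spark: SparkSession): Unit =
    spark.conf.set(Key, "-1")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // local mode, for illustration only
      .appName("BroadcastThreshold")
      .getOrCreate()
    set(spark, 50L * 1024 * 1024) // raise the threshold to 50MB
    println(spark.conf.get(Key))
    spark.stop()
  }
}
```

To confirm which strategy was chosen, call explain() on the joined DataFrame and look for BroadcastHashJoin rather than SortMergeJoin in the physical plan.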
For repeated joins on the same keys, bucketing pre-partitions data on disk:
// Create bucketed tables once
orders.write
  .bucketBy(256, "customer_id")
  .sortBy("customer_id")
  .saveAsTable("orders_bucketed")

customers.write
  .bucketBy(256, "customer_id")
  .sortBy("customer_id")
  .saveAsTable("customers_bucketed")

// Subsequent joins are shuffle-free
val ordersDf = spark.table("orders_bucketed")
val customersDf = spark.table("customers_bucketed")
val joined = ordersDf.join(customersDf, "customer_id") // No exchange in plan
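Bucketing avoids the shuffle only when both tables are bucketed on the join key with the same bucket count. Sorting with sortBy is not required for that; it can additionally save the sort step of a sort-merge join. The payoff is massive for ETL pipelines that join the same tables repeatedly.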
Caching, Checkpointing & Cluster Sizing
Cache when you reuse a DataFrame multiple times in the same job. Don’t cache speculatively.
import org.apache.spark.sql.functions.sum
import org.apache.spark.storage.StorageLevel

// For iterative algorithms or multi-use DataFrames
val processed = rawData
  .transform(cleanData)
  .transform(enrichData)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Force materialization
processed.count()

// Use multiple times
val summary1 = processed.groupBy("category").count()
val summary2 = processed.groupBy("region").agg(sum("amount"))

// Transformations are lazy, so run the actions while the cache still exists
summary1.show()
summary2.show()

// Clean up when done
processed.unpersist()
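MEMORY_AND_DISK_SER is a pragmatic choice: it serializes data (saving memory) and spills to disk if needed (preventing failures). For DataFrames, whose cache is already stored in a compact columnar format, the default MEMORY_AND_DISK level behaves much the same.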
Checkpointing breaks lineage for iterative algorithms where the DAG grows unboundedly:
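spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints")

var graph = initialGraph
for (i <- 1 to 100) {
  graph = graph.transform(iterate)
  if (i % 10 == 0) {
    // Dataset.checkpoint() returns a new Dataset with truncated lineage and is
    // eager by default, so reassign the result; no extra action is needed
    graph = graph.checkpoint()
  }
}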
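For cluster sizing, follow this rule of thumb: 4-5 cores per executor (more tends to increase GC pressure), memory at 4-8GB per core, and total executors based on your data size and parallelism needs. Tasks run in waves, one task per core at a time, so a 100GB shuffle workload with 200 partitions finishes in 5 waves on 40 cores, which suggests 8-10 executors with 4-5 cores each. Note that 200 partitions over 100GB means about 512MB per partition; at the 128MB target you would use roughly 800 partitions, or 20 waves on the same 40 cores.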
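The wave arithmetic behind that estimate can be sketched as follows. `ClusterSizing` and its methods are hypothetical helpers, and the calculation assumes one core per task (the default spark.task.cpus of 1) and evenly sized partitions.

```scala
object ClusterSizing {
  /** Waves needed to run `partitions` tasks when `totalCores` tasks run at once. */
  def waves(partitions: Int, totalCores: Int): Int = {
    require(partitions > 0 && totalCores > 0, "partitions and cores must be positive")
    (partitions + totalCores - 1) / totalCores // integer division, rounded up
  }

  /** Executors needed to provide `totalCores` at `coresPerExecutor` each, rounded up. */
  def executors(totalCores: Int, coresPerExecutor: Int): Int = {
    require(totalCores > 0 && coresPerExecutor > 0, "core counts must be positive")
    (totalCores + coresPerExecutor - 1) / coresPerExecutor
  }

  def main(args: Array[String]): Unit = {
    println(waves(200, 40))   // 200 partitions on 40 cores: 5 waves
    println(waves(800, 40))   // 800 partitions on 40 cores: 20 waves
    println(executors(40, 5)) // 40 cores at 5 per executor: 8 executors
    println(executors(40, 4)) // 40 cores at 4 per executor: 10 executors
  }
}
```

Fewer waves means less scheduling overhead, but each task then holds more data in memory, so the wave count is a trade-off rather than a number to minimize.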
Start conservative and scale based on Spark UI metrics—not guesswork.