Spark Scala - Repartition and Coalesce
Key Insights
- Use coalesce() when reducing partitions to avoid expensive shuffles, but be aware it can create uneven data distribution
- Use repartition() when you need to increase partitions, redistribute skewed data, or partition by a specific column for optimized joins
- Aim for partition sizes between 100 MB and 200 MB, with partition counts typically 2-4x your cluster's CPU cores
Introduction to Data Partitioning in Spark
Partitioning is the foundation of Spark’s distributed computing model. When you load data into Spark, it divides that data into chunks called partitions, distributing them across your cluster’s executors. Each partition is processed independently, enabling parallel computation.
The number of partitions directly impacts your job’s performance. Too few partitions underutilize your cluster—you might have 100 cores but only 10 partitions, leaving 90 cores idle. Too many partitions create excessive overhead from task scheduling and shuffle operations. Getting this balance right is where repartition() and coalesce() come in.
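The core-utilization arithmetic above is easy to sketch in plain Scala (no cluster needed; the partition and core counts are hypothetical, and real schedulers are more nuanced than this model):

```scala
// Rough model: tasks run in "waves" of at most `cores` parallel tasks.
// With fewer partitions than cores, the extra cores sit idle.
def utilization(partitions: Int, cores: Int): Double = {
  val waves = math.ceil(partitions.toDouble / cores).toInt
  partitions.toDouble / (waves * cores) // fraction of core-slots doing work
}

println(utilization(10, 100))  // 10 partitions on 100 cores -> 0.1 (90% of cores idle)
println(utilization(200, 100)) // 200 partitions on 100 cores -> 1.0 (two full waves)
println(utilization(150, 100)) // 150 partitions -> 0.75 (second wave half empty)
```

This also shows why a partition count that is a multiple of the core count tends to work well: partial final waves leave cores idle.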
Both operations change your partition count, but they work fundamentally differently. Understanding when to use each can mean the difference between a job that runs in minutes versus hours.
Understanding Repartition
The repartition() operation performs a full shuffle of your data across the cluster. It reads all records, redistributes them evenly across the specified number of partitions, and writes them to new partition files. This is an expensive operation but guarantees even data distribution.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("RepartitionExample")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Create a DataFrame with default partitioning
val df = spark.range(1, 1000000).toDF("id")
println(s"Initial partitions: ${df.rdd.getNumPartitions}")
// Output: Initial partitions: 8 (varies by environment)
// Repartition to 20 partitions
val repartitionedDf = df.repartition(20)
println(s"After repartition: ${repartitionedDf.rdd.getNumPartitions}")
// Output: After repartition: 20
Repartition can both increase and decrease partition counts. When you call repartition(20) on a DataFrame with 8 partitions, Spark shuffles all data and creates 20 new, evenly-sized partitions. When you call it on a DataFrame with 100 partitions, it still shuffles everything but consolidates into 20 partitions.
Use repartition() when you need to increase partitions, when your data is skewed and needs redistribution, or before wide transformations like joins where even distribution matters.
Understanding Coalesce
The coalesce() operation reduces partition count without a full shuffle. It’s a narrow transformation that combines existing partitions on the same executor, avoiding network data transfer when possible.
// Start with a large dataset
val largeDf = spark.range(1, 10000000).toDF("id")
.repartition(100) // Force 100 partitions
println(s"Before filter: ${largeDf.rdd.getNumPartitions}")
// Output: Before filter: 100
// Filter reduces data significantly
val filteredDf = largeDf.filter($"id" % 1000 === 0)
// Coalesce to reduce partitions without shuffle
val coalescedDf = filteredDf.coalesce(10)
println(s"After coalesce: ${coalescedDf.rdd.getNumPartitions}")
// Output: After coalesce: 10
Coalesce only reduces partitions—it cannot increase them. If you call coalesce(100) on a DataFrame with 10 partitions, you’ll still have 10 partitions. Spark silently ignores requests to increase partitions via coalesce.
The key advantage is performance. Since coalesce avoids shuffling, it’s dramatically faster than repartition for reducing partition counts. Use it after filtering operations that significantly reduce data volume, or before writing output files when you want fewer files than your current partition count.
Repartition vs Coalesce: Key Differences
Let’s examine the execution plans to understand what’s happening under the hood:
val testDf = spark.range(1, 1000000).toDF("id").repartition(50)
// Examine repartition execution plan
println("=== Repartition Plan ===")
testDf.repartition(10).explain(true)
// Examine coalesce execution plan
println("\n=== Coalesce Plan ===")
testDf.coalesce(10).explain(true)
The repartition plan shows an Exchange operator with RoundRobinPartitioning, indicating a full shuffle. The coalesce plan shows a Coalesce operator without an exchange—data stays local.
| Aspect | Repartition | Coalesce |
|---|---|---|
| Shuffle | Full shuffle | No shuffle (narrow) |
| Direction | Increase or decrease | Decrease only |
| Distribution | Even across partitions | May be uneven |
| Performance | Expensive | Cheap |
| Network I/O | High | Minimal |
| Use case | Redistribution, joins | Reducing output files |
The performance difference is substantial. On a moderately sized dataset, repartition might take 30 seconds while coalesce completes in under a second. However, coalesce’s lack of shuffling means it simply combines adjacent partitions, potentially creating uneven sizes.
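Why combining adjacent partitions can produce imbalance is easy to see with a simplified model in plain Scala. (Spark's actual partition coalescer also weighs data locality, so this is only a sketch; the partition sizes are hypothetical.)

```scala
// Hypothetical post-filter partition sizes: most surviving records
// happen to sit in the first few parent partitions.
val parentSizes = Seq(500, 500, 500, 500, 10, 10, 10, 10, 10, 10, 10, 10)

// Simplified coalesce: group adjacent parent partitions into `n` buckets
// without moving any records between them (no shuffle).
def coalesceSizes(sizes: Seq[Int], n: Int): Seq[Int] =
  sizes.grouped(math.ceil(sizes.size.toDouble / n).toInt).map(_.sum).toSeq

println(coalesceSizes(parentSizes, 3))
// List(2000, 40, 40): one giant partition and two tiny ones.
// A shuffle (repartition) would instead spread the 2080 records
// evenly, at roughly 693 per partition.
```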
Practical Use Cases and Best Practices
Repartitioning by Column
One of repartition’s most powerful features is column-based partitioning. When you repartition by a column, all rows with the same key land in the same partition:
case class Transaction(userId: Int, amount: Double, category: String)
val transactions = Seq(
Transaction(1, 100.0, "food"),
Transaction(2, 200.0, "electronics"),
Transaction(1, 50.0, "food"),
Transaction(3, 300.0, "travel"),
Transaction(2, 150.0, "food")
).toDF()
// Repartition by userId for optimized joins
val partitionedByUser = transactions.repartition($"userId")
// Now joining with another DataFrame partitioned by userId
// avoids additional shuffles
val userProfiles = Seq(
(1, "Alice"),
(2, "Bob"),
(3, "Charlie")
).toDF("userId", "name")
val userProfilesPartitioned = userProfiles.repartition($"userId")
// This join benefits from co-located data
val joined = partitionedByUser.join(userProfilesPartitioned, "userId")
Coalesce Before Writing
The most common coalesce use case is controlling output file count:
val processedData = spark.read.parquet("/input/large-dataset")
.filter($"status" === "active")
.select("id", "name", "value")
// Instead of writing 200 small files, write 10 larger ones
processedData
.coalesce(10)
.write
.mode("overwrite")
.parquet("/output/processed-data")
Partition Count Guidelines
For optimal performance, target partition sizes between 100MB and 200MB. Calculate your ideal partition count:
// Estimate data size and calculate partitions
val dataSizeBytes = 10L * 1024 * 1024 * 1024 // 10 GB
val targetPartitionSizeBytes = 128L * 1024 * 1024 // 128 MB
val idealPartitions = (dataSizeBytes / targetPartitionSizeBytes).toInt
println(s"Recommended partitions: $idealPartitions")
// Output: Recommended partitions: 80
As a rule of thumb, use 2-4x your total executor cores. If you have 10 executors with 4 cores each, target 80-160 partitions.
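The two guidelines can be combined into one quick calculation (plain Scala; the data size and executor counts are hypothetical):

```scala
// Partition count from data size (128 MB target per partition)...
val dataSizeBytes = 10L * 1024 * 1024 * 1024         // 10 GB (hypothetical)
val targetPartitionBytes = 128L * 1024 * 1024        // 128 MB
val sizeBased = (dataSizeBytes / targetPartitionBytes).toInt // 80

// ...and from cluster parallelism (2x total cores as a floor).
val totalCores = 10 * 4                              // 10 executors x 4 cores (hypothetical)
val coreBased = totalCores * 2                       // 80

// Take the larger so partitions are neither too big nor too few.
val recommended = math.max(sizeBased, coreBased)
println(s"Recommended partitions: $recommended")     // 80
```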
Common Pitfalls and Performance Considerations
Uneven Partitions After Coalesce
Coalesce’s biggest drawback is uneven data distribution. Let’s measure it:
val unevenDf = spark.range(1, 1000000).toDF("id")
.repartition(100)
.filter($"id" < 100000) // 90% of data filtered out
.coalesce(10)
// Count records per partition
val partitionCounts = unevenDf.rdd.mapPartitionsWithIndex {
(index, iterator) =>
Iterator((index, iterator.size))
}.collect()
partitionCounts.foreach { case (partition, count) =>
println(s"Partition $partition: $count records")
}
// Output shows uneven distribution:
// Partition 0: 15234 records
// Partition 1: 8123 records
// Partition 2: 12456 records
// ... (varies significantly)
If partition imbalance causes performance issues, use repartition() instead despite the shuffle cost.
Over-Partitioning Overhead
Each partition creates task scheduling overhead. With 10,000 partitions for a 1GB dataset, you spend more time coordinating tasks than processing data:
// A small DataFrame (hypothetical) that fits comfortably in a few partitions
val smallDf = spark.range(1, 100000).toDF("id")
// Bad: way too many partitions for the data size
val overPartitioned = smallDf.repartition(10000)
// Better: appropriate partition count
val rightSized = smallDf.repartition(8)
Unnecessary Shuffles
Avoid repartitioning when you don’t need to. Every shuffle is expensive:
// Bad: unnecessary repartition before simple transformation
val wasteful = df
.repartition(100)
.withColumn("doubled", $"value" * 2)
.repartition(100) // Why?
// Good: repartition only when needed
val efficient = df
.withColumn("doubled", $"value" * 2)
.repartition(100) // Once, before the write
Summary
| Scenario | Use | Reason |
|---|---|---|
| Reducing output file count | coalesce() | Avoids shuffle overhead |
| Increasing partition count | repartition() | Coalesce cannot increase |
| Skewed data before join | repartition(col) | Even distribution needed |
| After heavy filtering | coalesce() | Combine sparse partitions |
| Before wide transformation | repartition() | Prevent downstream skew |
| Data already well-distributed | coalesce() | Preserve existing layout |
The decision comes down to this: if you need even distribution or more partitions, pay the shuffle cost with repartition(). If you’re reducing partitions and can tolerate some imbalance, use coalesce() for the performance win.
Monitor your partition sizes in the Spark UI. Look for tasks that take significantly longer than others—that’s partition skew. When you see it, repartition() is your fix. When partitions are reasonably balanced and you just need fewer of them, coalesce() is the efficient choice.