Spark Scala - Repartition and Coalesce


Key Insights

  • Use coalesce() when reducing partitions to avoid expensive shuffles, but be aware it can create uneven data distribution
  • Use repartition() when you need to increase partitions, redistribute skewed data, or partition by a specific column for optimized joins
  • Aim for partition sizes between 100MB-200MB, with partition counts typically 2-4x your cluster’s CPU cores

Introduction to Data Partitioning in Spark

Partitioning is the foundation of Spark’s distributed computing model. When you load data into Spark, it divides that data into chunks called partitions, distributing them across your cluster’s executors. Each partition is processed independently, enabling parallel computation.

The number of partitions directly impacts your job’s performance. Too few partitions underutilize your cluster—you might have 100 cores but only 10 partitions, leaving 90 cores idle. Too many partitions create excessive overhead from task scheduling and shuffle operations. Getting this balance right is where repartition() and coalesce() come in.

Both operations change your partition count, but they work fundamentally differently. Understanding when to use each can mean the difference between a job that runs in minutes versus hours.

Understanding Repartition

The repartition() operation performs a full shuffle of your data across the cluster. It reads all records, redistributes them evenly across the specified number of partitions, and writes them to new partition files. This is an expensive operation but guarantees even data distribution.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("RepartitionExample")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Create a DataFrame with default partitioning
val df = spark.range(1, 1000000).toDF("id")

println(s"Initial partitions: ${df.rdd.getNumPartitions}")
// Output: Initial partitions: 8 (varies by environment)

// Repartition to 20 partitions
val repartitionedDf = df.repartition(20)

println(s"After repartition: ${repartitionedDf.rdd.getNumPartitions}")
// Output: After repartition: 20

Repartition can both increase and decrease partition counts. When you call repartition(20) on a DataFrame with 8 partitions, Spark shuffles all data and creates 20 new, evenly-sized partitions. When you call it on a DataFrame with 100 partitions, it still shuffles everything but consolidates into 20 partitions.
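The consolidating direction can be sketched as follows; the local session and the partition counts here are illustrative assumptions, not a prescription:

```scala
import org.apache.spark.sql.SparkSession

// Sketch of repartition used to reduce partitions; session setup and
// counts are illustrative assumptions
val spark = SparkSession.builder()
  .appName("RepartitionConsolidate")
  .master("local[*]")
  .getOrCreate()

val wide = spark.range(1, 100000).toDF("id").repartition(100)

// Still a full shuffle, even though the count goes down: 100 -> 20
val consolidated = wide.repartition(20)

val n = consolidated.rdd.getNumPartitions
println(s"After repartition(20): $n")
```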

Use repartition() when you need to increase partitions, when your data is skewed and needs redistribution, or before wide transformations like joins where even distribution matters.

Understanding Coalesce

The coalesce() operation reduces partition count without a full shuffle. It’s a narrow transformation that combines existing partitions on the same executor, avoiding network data transfer when possible.

// Start with a large dataset
val largeDf = spark.range(1, 10000000).toDF("id")
  .repartition(100) // Force 100 partitions

println(s"Before filter: ${largeDf.rdd.getNumPartitions}")
// Output: Before filter: 100

// Filter reduces data significantly
val filteredDf = largeDf.filter($"id" % 1000 === 0)

// Coalesce to reduce partitions without shuffle
val coalescedDf = filteredDf.coalesce(10)

println(s"After coalesce: ${coalescedDf.rdd.getNumPartitions}")
// Output: After coalesce: 10

Coalesce only reduces partitions—it cannot increase them. If you call coalesce(100) on a DataFrame with 10 partitions, you’ll still have 10 partitions. Spark silently ignores requests to increase partitions via coalesce.
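The silent no-op is easy to verify; a minimal sketch, assuming a local session:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: asking coalesce for more partitions than exist leaves the
// count unchanged (local session used purely for illustration)
val spark = SparkSession.builder()
  .appName("CoalesceNoIncrease")
  .master("local[*]")
  .getOrCreate()

val ten = spark.range(1, 1000).toDF("id").repartition(10)

// Request 100 partitions from a 10-partition DataFrame
val attempted = ten.coalesce(100)
val n = attempted.rdd.getNumPartitions

println(s"Requested 100 partitions, got: $n") // still 10
```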

The key advantage is performance. Since coalesce avoids shuffling, it’s dramatically faster than repartition for reducing partition counts. Use it after filtering operations that significantly reduce data volume, or before writing output files when you want fewer files than your current partition count.

Repartition vs Coalesce: Key Differences

Let’s examine the execution plans to understand what’s happening under the hood:

val testDf = spark.range(1, 1000000).toDF("id").repartition(50)

// Examine repartition execution plan
println("=== Repartition Plan ===")
testDf.repartition(10).explain(true)

// Examine coalesce execution plan
println("\n=== Coalesce Plan ===")
testDf.coalesce(10).explain(true)

The repartition plan shows an Exchange operator with RoundRobinPartitioning, indicating a full shuffle. The coalesce plan shows a Coalesce operator without an exchange—data stays local.

Aspect        | Repartition            | Coalesce
Shuffle       | Full shuffle           | No shuffle (narrow)
Direction     | Increase or decrease   | Decrease only
Distribution  | Even across partitions | May be uneven
Performance   | Expensive              | Cheap
Network I/O   | High                   | Minimal
Use case      | Redistribution, joins  | Reducing output files

The performance difference is substantial. On a moderately sized dataset, repartition might take 30 seconds while coalesce completes in under a second. However, coalesce’s lack of shuffling means it simply combines adjacent partitions, potentially creating uneven sizes.
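That gap can be sanity-checked with a rough timer. This is a sketch, not a benchmark: absolute numbers vary by machine, and the session setup is an assumption:

```scala
import org.apache.spark.sql.SparkSession

// Rough timing sketch (not a benchmark; numbers vary by machine)
val spark = SparkSession.builder()
  .appName("ShuffleCostSketch")
  .master("local[*]")
  .getOrCreate()

val demoDf = spark.range(1, 1000000).toDF("id").repartition(50)

// Tiny helper: run a block and print how long it took
def time[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - start) / 1e9}%.2f s")
  result
}

// Same row count either way; only the shuffle cost differs
val viaRepartition = time("repartition(10).count")(demoDf.repartition(10).count())
val viaCoalesce = time("coalesce(10).count")(demoDf.coalesce(10).count())
```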

Practical Use Cases and Best Practices

Repartitioning by Column

One of repartition’s most powerful features is column-based partitioning. When you repartition by a column, all rows with the same key land in the same partition:

case class Transaction(userId: Int, amount: Double, category: String)

val transactions = Seq(
  Transaction(1, 100.0, "food"),
  Transaction(2, 200.0, "electronics"),
  Transaction(1, 50.0, "food"),
  Transaction(3, 300.0, "travel"),
  Transaction(2, 150.0, "food")
).toDF()

// Repartition by userId for optimized joins
val partitionedByUser = transactions.repartition($"userId")

// Now joining with another DataFrame partitioned by userId
// avoids additional shuffles
val userProfiles = Seq(
  (1, "Alice"),
  (2, "Bob"),
  (3, "Charlie")
).toDF("userId", "name")

val userProfilesPartitioned = userProfiles.repartition($"userId")

// This join benefits from co-located data
val joined = partitionedByUser.join(userProfilesPartitioned, "userId")
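The co-location claim can be checked directly: after hash-partitioning by a key, every value of that key should live in exactly one partition. A self-contained sketch with toy data (the session, column names, and partition count are assumptions for illustration):

```scala
import org.apache.spark.sql.SparkSession

// Sketch verifying co-location after repartitioning by key
val spark = SparkSession.builder()
  .appName("CoLocationCheck")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val byUser = Seq((1, 100.0), (2, 200.0), (1, 50.0), (3, 300.0), (2, 150.0))
  .toDF("userId", "amount")
  .repartition(4, $"userId") // hash-partition by key into 4 partitions

// Map each row to (userId, partitionIndex), then count distinct
// partitions per key
val partitionsPerKey = byUser.rdd
  .mapPartitionsWithIndex { (idx, rows) =>
    rows.map(r => (r.getAs[Int]("userId"), idx))
  }
  .collect()
  .groupBy(_._1)
  .map { case (user, pairs) => (user, pairs.map(_._2).distinct.length) }

partitionsPerKey.foreach { case (user, n) =>
  println(s"userId $user appears in $n partition(s)")
}
```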

Coalesce Before Writing

The most common coalesce use case is controlling output file count:

val processedData = spark.read.parquet("/input/large-dataset")
  .filter($"status" === "active")
  .select("id", "name", "value")

// Instead of writing 200 small files, write 10 larger ones
processedData
  .coalesce(10)
  .write
  .mode("overwrite")
  .parquet("/output/processed-data")
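To confirm the effect on file count, a sketch that writes to a temporary directory; the paths and partition counts are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession
import java.nio.file.Files

// Sketch: coalesce(10) before a write should yield ten part files
// (temp directory used purely for this demonstration)
val spark = SparkSession.builder()
  .appName("OutputFileCount")
  .master("local[*]")
  .getOrCreate()

val outDir = Files.createTempDirectory("coalesce-demo").resolve("out").toString

spark.range(1, 1000000).toDF("id")
  .repartition(200)
  .coalesce(10)
  .write.mode("overwrite").parquet(outDir)

// Parquet writes one part-* file per partition, plus a _SUCCESS marker
val partFiles = new java.io.File(outDir).listFiles
  .count(_.getName.startsWith("part-"))

println(s"Part files written: $partFiles") // 10 instead of 200
```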

Partition Count Guidelines

For optimal performance, target partition sizes between 100MB and 200MB. Calculate your ideal partition count:

// Estimate data size and calculate partitions
val dataSizeBytes = 10L * 1024 * 1024 * 1024 // 10 GB
val targetPartitionSizeBytes = 128L * 1024 * 1024 // 128 MB
val idealPartitions = (dataSizeBytes / targetPartitionSizeBytes).toInt

println(s"Recommended partitions: $idealPartitions")
// Output: Recommended partitions: 80

As a rule of thumb, use 2-4x your total executor cores. If you have 10 executors with 4 cores each, target 80-160 partitions.
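The arithmetic behind that rule of thumb, as a small sketch; the executor figures are assumptions you would replace with your own cluster's numbers:

```scala
// Minimal sketch of the 2-4x rule; cluster figures are assumptions
val executors = 10
val coresPerExecutor = 4
val totalCores = executors * coresPerExecutor

val lowerTarget = totalCores * 2 // 2x cores
val upperTarget = totalCores * 4 // 4x cores

println(s"Target partition count: $lowerTarget to $upperTarget")
```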

Common Pitfalls and Performance Considerations

Uneven Partitions After Coalesce

Coalesce’s biggest drawback is uneven data distribution. Let’s measure it:

val unevenDf = spark.range(1, 1000000, 1, 100).toDF("id") // 100 contiguous id ranges
  .filter($"id" < 100000) // 90% of data filtered out; only the first ranges survive
  .coalesce(10)

// Count records per partition
val partitionCounts = unevenDf.rdd.mapPartitionsWithIndex {
  (index, iterator) =>
    Iterator((index, iterator.size))
}.collect()

partitionCounts.foreach { case (partition, count) =>
  println(s"Partition $partition: $count records")
}
// Output shows heavily uneven distribution: spark.range stores contiguous
// id ranges per partition, and coalesce merges neighboring partitions,
// so nearly all surviving records pile into the first output partitions
// while the rest stay empty or close to it (exact grouping can vary)

If partition imbalance causes performance issues, use repartition() instead despite the shuffle cost.
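The repartition fix can be sketched on the same shape of pipeline; a full shuffle, but the surviving rows come out evenly spread (local session assumed for illustration):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: heavy filter followed by repartition instead of coalesce
// redistributes the surviving rows evenly via round-robin shuffle
val spark = SparkSession.builder()
  .appName("RebalanceAfterFilter")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val rebalanced = spark.range(1, 1000000, 1, 100).toDF("id")
  .filter($"id" < 100000) // empties most of the 100 contiguous ranges
  .repartition(10)        // full shuffle, but even partitions

val counts = rebalanced.rdd
  .mapPartitionsWithIndex((i, it) => Iterator((i, it.size)))
  .collect()
  .map(_._2)

println(s"Min/max partition size: ${counts.min} / ${counts.max}")
```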

Over-Partitioning Overhead

Each partition creates task scheduling overhead. With 10,000 partitions for a 1GB dataset, you spend more time coordinating tasks than processing data:

// A small dataset for illustration
val smallDf = spark.range(1, 100000).toDF("id")

// Bad: way too many partitions for the data size
val overPartitioned = smallDf.repartition(10000)

// Better: appropriate partition count
val rightSized = smallDf.repartition(8)

Unnecessary Shuffles

Avoid repartitioning when you don’t need to. Every shuffle is expensive:

val df = spark.range(1, 100000).toDF("value")

// Bad: unnecessary repartition before a simple narrow transformation
val wasteful = df
  .repartition(100)
  .withColumn("doubled", $"value" * 2)
  .repartition(100) // the first repartition bought nothing

// Good: repartition only when needed
val efficient = df
  .withColumn("doubled", $"value" * 2)
  .repartition(100) // once, before the write

Summary

Scenario                      | Use              | Reason
Reducing output file count    | coalesce()       | Avoids shuffle overhead
Increasing partition count    | repartition()    | Coalesce cannot increase
Skewed data before join       | repartition(col) | Even distribution needed
After heavy filtering         | coalesce()       | Combine sparse partitions
Before wide transformation    | repartition()    | Prevent downstream skew
Data already well-distributed | coalesce()       | Preserve existing layout

The decision comes down to this: if you need even distribution or more partitions, pay the shuffle cost with repartition(). If you’re reducing partitions and can tolerate some imbalance, use coalesce() for the performance win.

Monitor your partition sizes in the Spark UI. Look for tasks that take significantly longer than others—that’s partition skew. When you see it, repartition() is your fix. When partitions are reasonably balanced and you just need fewer of them, coalesce() is the efficient choice.
