PySpark - Repartition and Coalesce
Key Insights
- Repartition performs a full shuffle and can increase or decrease partitions, while coalesce only reduces partitions with minimal data movement, making it significantly faster for downsizing operations.
- Partition management directly impacts performance: too few partitions underutilize cluster resources, while too many create excessive overhead and small file problems.
- Choose repartition when you need even data distribution or to partition by specific columns; use coalesce when reducing partitions before writing output or when you can tolerate some partition imbalance.
Introduction to Data Partitioning in PySpark
Partitioning is the foundation of distributed computing in PySpark. Your DataFrame is split across multiple partitions, each processed independently on different executor cores. Get this wrong, and you’ll either waste cluster resources or create bottlenecks that cripple performance.
The partition count affects everything: parallelism levels, memory consumption per task, shuffle operation costs, and the number of output files generated. A 100GB dataset with 2 partitions means each task processes 50GB—likely causing out-of-memory errors. The same dataset with 10,000 partitions creates excessive scheduling overhead and tiny tasks that spend more time on coordination than actual work.
Here’s how to check your current partition count:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PartitionDemo").getOrCreate()
# Create sample DataFrame
df = spark.range(0, 1000000).toDF("id")
# Check partition count
num_partitions = df.rdd.getNumPartitions()
print(f"Current partitions: {num_partitions}")
# See how data is distributed across partitions
partition_counts = df.rdd.mapPartitions(
    lambda it: [sum(1 for _ in it)]
).collect()
print(f"Records per partition: {partition_counts}")
Understanding Repartition()
Repartition is the sledgehammer of partition management. It performs a full shuffle, redistributing all data across the cluster to create the specified number of partitions with relatively even distribution. This is expensive—every record gets serialized, sent across the network, and deserialized—but sometimes necessary.
Use repartition when you need to:
- Increase partition count for better parallelism
- Ensure even data distribution across partitions
- Partition data by specific columns for downstream operations
- Fix severely skewed partitions
Here’s repartition in action:
# Basic repartition - specify number of partitions
df_repart = df.repartition(10)
print(f"After repartition: {df_repart.rdd.getNumPartitions()}")
# Repartition by column - critical for join optimization
users_df = spark.createDataFrame([
    (1, "Alice", "NY"),
    (2, "Bob", "CA"),
    (3, "Charlie", "NY"),
    (4, "David", "CA"),
    (5, "Eve", "TX")
], ["user_id", "name", "state"])
# Partition by state - all NY records in same partition(s)
by_state = users_df.repartition("state")
print(f"Partitioned by state: {by_state.rdd.getNumPartitions()}")
# Repartition by multiple columns with specific partition count
by_multiple = users_df.repartition(5, "state", "name")
# Verify distribution
def show_partition_data(partition_iter):
    data = list(partition_iter)
    if data:
        yield f"Partition has {len(data)} records: {data[:3]}"
    else:
        yield "Empty partition"
partition_info = by_state.rdd.mapPartitions(show_partition_data).collect()
for info in partition_info:
    print(info)
When you repartition by column, Spark uses hash partitioning. Records with the same key(s) land in the same partition, which is invaluable for joins, groupBy operations, and ensuring related data stays together.
Understanding Coalesce()
Coalesce is the scalpel—precise and efficient, but limited in scope. It only reduces partition count and does so by combining existing partitions without a full shuffle. Instead of redistributing all data, it simply merges partitions together, which is dramatically faster.
The tradeoff: partition sizes may become uneven. If you have 10 partitions with varying sizes and coalesce to 2, you might end up with one partition containing 80% of your data and another with 20%.
# Start with many partitions
df_many = spark.range(0, 100000).repartition(100)
print(f"Initial partitions: {df_many.rdd.getNumPartitions()}")
# Coalesce down to fewer partitions
df_few = df_many.coalesce(5)
print(f"After coalesce: {df_few.rdd.getNumPartitions()}")
# Examine partition distribution
def count_partition_size(idx, partition_iter):
    count = sum(1 for _ in partition_iter)
    yield (idx, count)
partition_sizes = df_few.rdd.mapPartitionsWithIndex(
    count_partition_size
).collect()
for idx, size in partition_sizes:
    print(f"Partition {idx}: {size} records")
Coalesce is perfect for the final stage before writing data. If your job creates 200 partitions during processing but you only want 10 output files, coalesce(10) is far cheaper than repartition(10).
Key Differences and Performance Comparison
The fundamental difference is shuffle behavior:
| Aspect | Repartition | Coalesce |
|---|---|---|
| Shuffle | Full shuffle always | No shuffle (minimal movement) |
| Direction | Increase or decrease | Decrease only |
| Distribution | Even distribution | Potentially uneven |
| Performance | Slower, network-intensive | Faster, local operation |
| Use case | Need even distribution | Reducing partitions efficiently |
Here’s a performance comparison:
import time
# Create a larger dataset
large_df = spark.range(0, 10000000).repartition(100)
# Benchmark repartition
start = time.time()
large_df.repartition(10).write.mode("overwrite").parquet("/tmp/repart_test")
repartition_time = time.time() - start
# Benchmark coalesce
start = time.time()
large_df.coalesce(10).write.mode("overwrite").parquet("/tmp/coalesce_test")
coalesce_time = time.time() - start
print(f"Repartition time: {repartition_time:.2f}s")
print(f"Coalesce time: {coalesce_time:.2f}s")
print(f"Coalesce is {repartition_time/coalesce_time:.1f}x faster")
# Check Spark UI for shuffle read/write metrics
# Repartition will show significant shuffle write/read
# Coalesce will show minimal to no shuffle
In typical scenarios, coalesce is several times faster (often in the 3-10x range) when reducing partitions, because it avoids serializing and shipping data across the network.
Real-World Use Cases and Best Practices
Writing Output Files
The most common use case: controlling output file count. Each partition becomes one output file.
# Bad: Creates 200 small files
df.write.parquet("/output/path")
# Good: Creates 10 reasonably-sized files
df.coalesce(10).write.parquet("/output/path")
# When you need exactly 1 file (use sparingly - all data funnels through
# one task, and the path becomes a directory holding a single part file)
df.coalesce(1).write.csv("/output/single_file.csv")
Optimizing Joins
Repartition both DataFrames by the join key before large joins:
# Inefficient join
result = users.join(orders, "user_id")
# Optimized join - data pre-shuffled by join key
users_repart = users.repartition("user_id")
orders_repart = orders.repartition("user_id")
result = users_repart.join(orders_repart, "user_id")
This moves the shuffle cost upfront: since matching keys are already co-located, the join reuses the existing hash partitioning instead of triggering another exchange. The payoff is greatest when the repartitioned DataFrames are cached and joined more than once.
Handling Data Skew
When one partition is much larger than others:
# Detect skew
partition_sizes = df.rdd.mapPartitionsWithIndex(
    lambda idx, it: [(idx, sum(1 for _ in it))]
).collect()
max_size = max(size for _, size in partition_sizes)
min_size = min(size for _, size in partition_sizes)
skew_ratio = max_size / min_size if min_size > 0 else float('inf')
print(f"Skew ratio: {skew_ratio:.2f}")
# Fix with repartition
if skew_ratio > 3:  # Arbitrary threshold
    df = df.repartition(df.rdd.getNumPartitions() * 2)
Pre-Aggregation Optimization
Increase partitions before expensive operations, reduce after:
# Increase parallelism for heavy computation
df_expanded = df.repartition(200)
# Perform expensive transformations (expensive_udf stands in for your own UDF)
from pyspark.sql.functions import col
df_processed = df_expanded.withColumn("complex_calc", expensive_udf(col("data")))
# Reduce before writing
df_processed.coalesce(20).write.parquet("/output")
Common Pitfalls and Troubleshooting
Too Many Small Files
Symptom: Thousands of tiny output files, slow read performance.
# Check output file count (local filesystem example)
import os
output_path = "/tmp/output"
file_count = len([f for f in os.listdir(output_path) if f.endswith('.parquet')])
print(f"Output files: {file_count}")
# Fix: coalesce before writing, sizing partitions toward a target file size
target_file_size_mb = 128
avg_record_size_bytes = 100  # rough estimate for your schema
total_size_mb = df.count() * avg_record_size_bytes / (1024 * 1024)
optimal_partitions = max(1, int(total_size_mb / target_file_size_mb))
df.coalesce(optimal_partitions).write.mode("overwrite").parquet(output_path)
Out of Memory Errors
Symptom: Executors crashing during repartition or coalesce.
# Too few partitions = too much data per partition
# Rule of thumb: 128MB - 1GB per partition
df_size_gb = 500 # Your dataset size
partition_size_mb = 256
optimal_partitions = int((df_size_gb * 1024) / partition_size_mb)
df = df.repartition(optimal_partitions)
Unnecessary Repartitioning
Don’t repartition if Spark’s default partitioning is already good:
# Bad: Repartitioning when reading already creates good partitions
df = spark.read.parquet("/input").repartition(100)
# Good: Trust the input partitioning if it's reasonable
df = spark.read.parquet("/input")
if df.rdd.getNumPartitions() < 50:  # Only if needed
    df = df.repartition(100)
Coalesce on Skewed Data
Coalescing skewed data can create super-partitions:
# Check skew before coalescing
target_partitions = 10  # your desired partition count
partition_sizes = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
avg_size = sum(partition_sizes) / len(partition_sizes)
if max(partition_sizes) / avg_size > 2:
    # Skewed - use repartition instead
    df = df.repartition(target_partitions)
else:
    # Balanced - coalesce is safe
    df = df.coalesce(target_partitions)
Master these tools and you’ll write PySpark jobs that scale efficiently, avoid common performance pitfalls, and make optimal use of your cluster resources. The key is understanding when to pay the shuffle cost for perfect distribution versus when good-enough distribution with minimal overhead is the right choice.