Apache Spark - Bucketing for Performance

Key Insights

  • Bucketing pre-organizes data by hash value during write time, eliminating expensive shuffle operations during joins and aggregations on bucketed columns
  • Unlike partitioning, which creates directory structures based on column values, bucketing distributes rows by hash into a fixed number of buckets, which makes it a good fit for high-cardinality columns
  • Shuffle-free joins require both tables to be bucketed on join keys with compatible bucket counts, and the tables must be read from the Hive metastore (not direct file paths)

Introduction to Bucketing

Bucketing is Spark’s mechanism for pre-shuffling data at write time. Instead of paying the shuffle cost during every query, you pay it once when writing the data. The result: joins and aggregations on bucketed columns can skip the shuffle phase entirely.

This differs fundamentally from partitioning. Partitioning creates directory structures based on column values, which works well for filtering on low-cardinality columns like date or region. Bucketing creates a fixed number of buckets, and each record’s bucket is determined by hashing the bucket column. This makes bucketing a good fit for high-cardinality columns like user_id or order_id that you frequently join or aggregate on.

Use bucketing when you have:

  • Large tables that are repeatedly joined on the same columns
  • Aggregation queries that consistently group by specific columns
  • Relatively stable schemas and write patterns
  • Tables stored in formats that support bucketing (Parquet, ORC)

How Bucketing Works Under the Hood

When you write a bucketed table, Spark hashes each row’s bucket column value and assigns the row to one of N buckets. Each bucket is stored as one or more files, depending on how many write tasks hold rows for it. The hash function is deterministic: the same value always lands in the same bucket. So when two tables are bucketed on their join keys with the same bucket count, matching records are guaranteed to be in buckets with the same number.

Here’s what a bucketed table looks like on disk:

warehouse/
└── orders_bucketed/
    ├── part-00000-xxx_00000.snappy.parquet  # bucket 0
    ├── part-00000-xxx_00001.snappy.parquet  # bucket 1
    ├── part-00000-xxx_00002.snappy.parquet  # bucket 2
    ├── part-00000-xxx_00003.snappy.parquet  # bucket 3
    └── ...                                   # up to bucket N-1

The bucket number is embedded in the filename. Spark stores bucket metadata in the Hive metastore, including the bucket columns, number of buckets, and sort columns if specified. During query planning, the Catalyst optimizer reads this metadata to determine whether shuffles can be eliminated.

Spark applies the same hash function to every bucketed table it writes, so bucket 5 of one table holds exactly the keys that would hash to bucket 5 in any other table with the same bucket count and the same bucket column type. This is the foundation of shuffle-free joins.
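To make the co-location argument concrete, here is a minimal sketch in plain Scala. It uses a stand-in hash from the Scala standard library, not Spark's internal hash, so the bucket numbers will differ from what Spark produces. The key values and the bucket count of 8 are hypothetical.

```scala
import scala.util.hashing.MurmurHash3

object BucketAssignmentSketch {
  // Stand-in hash for illustration only; Spark's internal hash gives different bucket numbers.
  def bucketFor(key: Long, numBuckets: Int): Int =
    // floorMod keeps the result in [0, numBuckets) even when the hash is negative.
    Math.floorMod(MurmurHash3.stringHash(key.toString), numBuckets)

  def main(args: Array[String]): Unit = {
    val numBuckets = 8
    val orderCustomerIds = Seq(101L, 202L, 303L, 101L, 404L) // hypothetical orders.customer_id values
    val customerIds = Seq(101L, 202L, 303L, 404L)            // hypothetical customers.customer_id values

    val ordersByBucket = orderCustomerIds.groupBy(bucketFor(_, numBuckets))
    val customersByBucket = customerIds.groupBy(bucketFor(_, numBuckets))

    // Every order key appears in the customers bucket with the same number,
    // so each bucket pair can be joined without looking at any other bucket.
    for ((bucket, keys) <- ordersByBucket.toSeq.sortBy(_._1)) {
      val candidates = customersByBucket.getOrElse(bucket, Seq.empty)
      println(s"bucket $bucket: orders=${keys.mkString(",")} customers=${candidates.mkString(",")}")
    }
  }
}
```

The property that matters is that bucketFor depends only on the key and the bucket count, never on which table the row came from.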

Creating Bucketed Tables

You can create bucketed tables using either the DataFrame API or Spark SQL. The DataFrame approach uses bucketBy() combined with saveAsTable():

// DataFrame API - bucketing orders by customer_id
orders
  .write
  .format("parquet")
  .bucketBy(64, "customer_id")
  .sortBy("order_date")
  .saveAsTable("orders_bucketed")

// Reading the bucketed table
val ordersBucketed = spark.table("orders_bucketed")

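sortBy() is optional. It sorts the rows inside each bucket file, which helps queries that filter or scan ranges on the sort column (order_date here). If the goal is a sort-merge join without a local sort, sort by the join key (customer_id) instead. Even then, whether Spark can skip the sort depends on the Spark version and on each bucket consisting of a single file.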

Using Spark SQL:

-- SQL syntax for bucketed table
CREATE TABLE orders_bucketed
USING parquet
CLUSTERED BY (customer_id) 
SORTED BY (order_date)
INTO 64 BUCKETS
AS SELECT * FROM orders;

-- Or with explicit schema
CREATE TABLE customers_bucketed (
    customer_id BIGINT,
    name STRING,
    email STRING,
    created_at TIMESTAMP
)
USING parquet
CLUSTERED BY (customer_id) INTO 64 BUCKETS;

Critical requirement: you must use saveAsTable(), not save(). Bucketing metadata lives in the metastore, and a write to a bare path has no place to record it. Spark enforces this: calling save() after bucketBy() fails with an AnalysisException instead of writing the data.

Optimizing Joins with Bucketing

The primary use case for bucketing is eliminating shuffle in joins. When both sides of a join are bucketed on the join keys, Spark can perform a bucket-to-bucket join without moving data across the network.

Let’s compare explain plans:

// Without bucketing - shuffle required
val ordersRegular = spark.read.parquet("/data/orders")
val customersRegular = spark.read.parquet("/data/customers")

ordersRegular.join(customersRegular, "customer_id").explain()
== Physical Plan ==
*(5) SortMergeJoin [customer_id], [customer_id], Inner
:- *(2) Sort [customer_id ASC], false, 0
:  +- Exchange hashpartitioning(customer_id, 200)  // SHUFFLE!
:     +- *(1) FileScan parquet [customer_id,...]
+- *(4) Sort [customer_id ASC], false, 0
   +- Exchange hashpartitioning(customer_id, 200)  // SHUFFLE!
      +- *(3) FileScan parquet [customer_id,...]

Now with bucketed tables:

// With bucketing - no shuffle
val ordersBucketed = spark.table("orders_bucketed")
val customersBucketed = spark.table("customers_bucketed")

ordersBucketed.join(customersBucketed, "customer_id").explain()
== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id], Inner
:- *(1) Sort [customer_id ASC], false, 0
:  +- *(1) FileScan parquet [customer_id,...]  // NO SHUFFLE
+- *(2) Sort [customer_id ASC], false, 0
   +- *(2) FileScan parquet [customer_id,...]  // NO SHUFFLE

The Exchange operators (shuffles) disappear. For large tables, this can reduce query time from hours to minutes.
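Reading plans by eye gets tedious once you are checking many queries. The sketch below is a small plain-Scala helper that scans plan text for hash-partitioned Exchange lines. The helper name is hypothetical, and the sample text is the non-bucketed plan shown above. With a live session you could feed it df.queryExecution.executedPlan.toString, though plan formatting varies between Spark versions, so treat the string match as an assumption to verify.

```scala
object PlanCheck {
  // Returns the plan lines that contain a hash-partitioned Exchange, i.e. a shuffle.
  // Other exchange types (for example broadcast) are intentionally not matched.
  def shuffleLines(planText: String): Seq[String] =
    planText.split("\n").toSeq.map(_.trim).filter(_.contains("Exchange hashpartitioning"))

  def main(args: Array[String]): Unit = {
    // Plan text copied from the non-bucketed join above.
    val regularPlan =
      """|*(5) SortMergeJoin [customer_id], [customer_id], Inner
         |:- *(2) Sort [customer_id ASC], false, 0
         |:  +- Exchange hashpartitioning(customer_id, 200)
         |:     +- *(1) FileScan parquet [customer_id,...]
         |+- *(4) Sort [customer_id ASC], false, 0
         |   +- Exchange hashpartitioning(customer_id, 200)
         |      +- *(3) FileScan parquet [customer_id,...]""".stripMargin

    val shuffles = shuffleLines(regularPlan)
    println(s"shuffles found: ${shuffles.size}") // prints "shuffles found: 2"
  }
}
```

A check like this fits naturally into a test that fails when a table change silently reintroduces a shuffle.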

Requirements for shuffle-free joins:

  1. Both tables bucketed on the join columns
  2. Same number of buckets (or, in Spark 3.1+ with spark.sql.bucketing.coalesceBucketsInJoin.enabled set to true, one count is a multiple of the other)
  3. Tables read via spark.table(), not spark.read.parquet(path)
  4. spark.sql.sources.bucketing.enabled set to true (default)
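The bucket-count rule is easy to check before you commit to a table layout. The following is a rough sketch in plain Scala, not Spark's own planner logic. The function name, the allowMultiples switch, and the maxRatio cap (with its default of 4) are assumptions made for illustration.

```scala
object BucketCompatibility {
  // True when two bucket counts can line up for a shuffle-free join.
  // allowMultiples models the "one is a multiple of the other" case from the list above;
  // maxRatio is a hypothetical cap on how far apart the two counts may be.
  def compatible(left: Int, right: Int, allowMultiples: Boolean = false, maxRatio: Int = 4): Boolean = {
    require(left > 0 && right > 0, "bucket counts must be positive")
    val (small, large) = if (left <= right) (left, right) else (right, left)
    small == large || (allowMultiples && large % small == 0 && large / small <= maxRatio)
  }

  def main(args: Array[String]): Unit = {
    println(compatible(64, 64))                         // true
    println(compatible(64, 128))                        // false
    println(compatible(64, 128, allowMultiples = true)) // true
    println(compatible(64, 96, allowMultiples = true))  // false
  }
}
```

Multiples can work because of the modulo step: with a modulo-based assignment, bucket b of a 64-bucket table holds the same keys as buckets b and b + 64 of a 128-bucket table, so the larger table's buckets can be read in pairs.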

Bucketing for Aggregations

Bucketing also accelerates GROUP BY operations. When data is pre-bucketed on the grouping column, every row for a given key already sits in the same bucket, so each task can aggregate its keys to completion without a shuffle.

// Create bucketed table for analytics
transactions
  .write
  .format("parquet")
  .bucketBy(128, "merchant_id")
  .saveAsTable("transactions_bucketed")

// Aggregation query - no shuffle needed
spark.sql("""
  SELECT 
    merchant_id,
    COUNT(*) as transaction_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount
  FROM transactions_bucketed
  GROUP BY merchant_id
""").explain()

Without bucketing, this query requires a shuffle to bring all records for each merchant_id to the same partition. With bucketing, records are already co-located by merchant_id.

Performance comparison on a 500GB transactions table:

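// Benchmark setup
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, sum}

// Runs the query end to end and prints the wall-clock time in milliseconds
def timeQuery(name: String)(query: => DataFrame): Unit = {
  val start = System.currentTimeMillis()
  query.collect()  // forces execution and pulls the aggregated result to the driver
  val duration = System.currentTimeMillis() - start
  println(s"$name: ${duration}ms")
}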

// Non-bucketed: ~180 seconds (shuffle-heavy)
timeQuery("Regular table") {
  spark.read.parquet("/data/transactions")
    .groupBy("merchant_id")
    .agg(sum("amount"), count("*"))
}

// Bucketed: ~45 seconds (no shuffle)
timeQuery("Bucketed table") {
  spark.table("transactions_bucketed")
    .groupBy("merchant_id")
    .agg(sum("amount"), count("*"))
}

Best Practices and Pitfalls

Choosing bucket count: Use a number that results in files between 128MB and 1GB each. Too few buckets create large files that limit parallelism; too many create small files with overhead. A common formula: total_data_size / 256MB.
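As a worked example of that formula, here is a small sketch in plain Scala. The function name is hypothetical, and it treats the sizes as binary units (GiB and MiB), which is an assumption. The 500 GiB figure is an illustrative table size.

```scala
object BucketCountEstimate {
  val MiB: Long = 1024L * 1024L
  val GiB: Long = 1024L * MiB

  // total_data_size / target_file_size, rounded up, with a floor of one bucket.
  def suggestedBuckets(totalBytes: Long, targetFileBytes: Long = 256L * MiB): Int = {
    require(totalBytes > 0 && targetFileBytes > 0, "sizes must be positive")
    math.max(1L, (totalBytes + targetFileBytes - 1) / targetFileBytes).toInt
  }

  def main(args: Array[String]): Unit = {
    val buckets = suggestedBuckets(500L * GiB)
    println(s"500 GiB at 256 MiB per file -> $buckets buckets") // 2000 buckets
  }
}
```

The size that matters is the table's size on disk after compression, so measure an existing copy of the data where you can rather than estimating from row counts.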

Combining with partitioning: Bucketing and partitioning work together. Partition by date for time-based filtering, bucket by join keys within each partition:

orders
  .write
  .format("parquet")
  .partitionBy("order_date")
  .bucketBy(64, "customer_id")
  .saveAsTable("orders_partitioned_bucketed")
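One thing to check before combining the two: bucket files are written inside each partition directory, so file counts multiply. The sketch below is plain Scala arithmetic with hypothetical numbers (365 daily partitions, a 500 GiB table, 200 write tasks) and hypothetical function names.

```scala
object FileCountEstimate {
  private val MiB: Long = 1024L * 1024L
  private val GiB: Long = 1024L * MiB

  // Best case: exactly one file per bucket inside each partition directory.
  def bestCaseFiles(partitions: Int, buckets: Int): Long =
    partitions.toLong * buckets

  // Worst case: every write task holds rows for every partition and every bucket.
  def worstCaseFiles(partitions: Int, buckets: Int, writeTasks: Int): Long =
    bestCaseFiles(partitions, buckets) * writeTasks

  def avgFileMiB(totalBytes: Long, files: Long): Double =
    totalBytes.toDouble / files / MiB

  def main(args: Array[String]): Unit = {
    val partitions = 365        // hypothetical: one year of daily order_date partitions
    val buckets = 64            // matches bucketBy(64, "customer_id") above
    val totalBytes = 500L * GiB // hypothetical table size

    val best = bestCaseFiles(partitions, buckets)
    println(s"best case: $best files, about ${avgFileMiB(totalBytes, best).round} MiB each")
    println(s"worst case with 200 write tasks: ${worstCaseFiles(partitions, buckets, 200)} files")
  }
}
```

If the best-case average falls well below the target file size, a smaller bucket count or coarser partitions is usually the easier fix.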

Configuration settings:

// Ensure bucketing is enabled (default: true)
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")

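// Let Spark skip the bucketed scan when a query cannot benefit from it (Spark 3.1+, default: true)
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

// Let the scan report bucket sort order so merge joins can skip the local sort
// (internal legacy flag, default: false; only takes effect with one file per bucket)
spark.conf.set("spark.sql.legacy.bucketedTableScan.outputOrdering", "true")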

Common mistakes:

  • Mismatched bucket counts between tables you want to join
  • Using save() instead of saveAsTable()
  • Bucketing on columns with extreme skew (one bucket becomes massive)
  • Forgetting to read via spark.table() after writing

When Not to Use Bucketing

Bucketing isn’t free. It adds write-time overhead and constraints that don’t always pay off.

Skip bucketing for:

  • Small tables (under 1GB)—shuffle cost is negligible
  • Tables with frequently changing schemas—you’ll need to rewrite the entire table
  • Write-heavy workloads with infrequent reads—the write overhead won’t amortize
  • Ad-hoc analysis where join/group columns vary—you’d need multiple bucketed copies
  • Tables primarily filtered rather than joined—partitioning is more effective

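The write cost is real: to end up with one file per bucket, the rows have to be redistributed by the bucket column before the write, usually with an explicit repartition, which is a shuffle. Without that step, every write task emits its own file for each bucket and the table fills with small files. If you’re writing a table once and reading it hundreds of times, this cost is trivial. If you’re rewriting daily and reading twice, bucketing may hurt overall performance.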
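A back-of-the-envelope break-even calculation makes this trade-off explicit. The sketch below is plain Scala with a hypothetical function name. The 180 s and 45 s read times are the figures quoted in the aggregation benchmark earlier, while the 600 s of extra write time is an invented number for illustration.

```scala
object BucketingBreakEven {
  // Number of reads after which the extra write time is paid back.
  // Returns None when bucketed reads are not faster, since the cost is then never recovered.
  def breakEvenReads(
      extraWriteSeconds: Double,
      regularReadSeconds: Double,
      bucketedReadSeconds: Double): Option[Int] = {
    val savedPerRead = regularReadSeconds - bucketedReadSeconds
    if (savedPerRead <= 0) None
    else Some(math.ceil(extraWriteSeconds / savedPerRead).toInt)
  }

  def main(args: Array[String]): Unit = {
    // 600 s of extra write time is hypothetical; 180 s and 45 s come from the benchmark above.
    println(breakEvenReads(600, 180, 45)) // Some(5)
  }
}
```

With these numbers, a table rewritten daily and read twice never reaches the five reads it needs, which is the losing case described above.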

Bucketing is a commitment. It works best for stable, large tables that serve as the foundation for repeated analytical queries. For everything else, let Spark handle shuffles dynamically.
