Apache Spark - Adaptive Query Execution (AQE)


Key Insights

  • Adaptive Query Execution dynamically optimizes Spark SQL queries at runtime by adjusting physical execution plans based on actual data statistics, reducing query times by 2-10x in many real-world scenarios
  • AQE’s three core optimizations—dynamically coalescing shuffle partitions, converting sort-merge joins to broadcast joins, and optimizing skewed joins—address the most common performance bottlenecks in distributed query processing
  • Enabling AQE requires minimal configuration changes but understanding its behavior is critical for tuning partition sizes, broadcast thresholds, and skew handling to match your specific workload characteristics

Understanding Adaptive Query Execution

Adaptive Query Execution fundamentally changes how Spark processes queries by making optimization decisions during execution rather than solely at planning time. Traditional Spark query optimization relies on statistics collected during analysis, but these statistics are often incomplete, outdated, or based on assumptions that don’t hold for complex transformations.

AQE introduces re-optimization points at shuffle boundaries. After each shuffle stage completes, Spark collects precise runtime statistics and can replan subsequent stages. This addresses three critical limitations of static optimization: inaccurate cardinality estimates, unknown data distributions after complex operations, and inability to adapt to actual cluster conditions.

Enable AQE with this basic configuration:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

In Spark 3.2+, AQE is enabled by default. However, understanding and tuning its sub-features remains essential.

Dynamic Partition Coalescing

The most immediate benefit of AQE comes from dynamically coalescing shuffle partitions. Without AQE, a single spark.sql.shuffle.partitions value (default 200) governs every shuffle operation. This creates a dilemma: too few partitions produce oversized tasks, memory pressure, and spills; too many create excessive scheduling overhead and tiny output files.

AQE solves this by starting with a high partition count and combining adjacent small partitions after the shuffle completes. Here’s a practical example:

// Configure coalescing behavior
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB")

// Example query with aggregation
spark.read.parquet("s3://data/events")
  .filter($"event_date" >= "2024-01-01")
  .groupBy("user_id", "event_type")
  .agg(count("*").as("event_count"))
  .write.parquet("s3://output/aggregated")

Without AQE, this query creates 200 output partitions regardless of data size. If the filtered dataset is only 2GB, you get 200 files averaging 10MB each—far below optimal. With AQE, Spark coalesces these into approximately 32 partitions (2GB / 64MB), significantly reducing file count and metadata overhead.

The key tuning parameters:

// Target size for each partition after coalescing
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

// Minimum partition size to avoid excessive coalescing
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB")

// Minimum number of partitions after coalescing (useful for parallelism)
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "10")
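The coalescing behavior these parameters control can be approximated with a small, self-contained Scala sketch. This is a greedy simulation under the advisory-size assumption, not Spark's actual implementation; all names here are illustrative:

```scala
// A greedy simulation of partition coalescing: walk the shuffle partitions
// in order and merge adjacent ones until adding the next would exceed the
// advisory target size.
object CoalesceSim {
  def coalesce(sizes: Seq[Long], advisoryBytes: Long): Seq[Long] = {
    val merged = scala.collection.mutable.ArrayBuffer.empty[Long]
    var current = 0L
    for (s <- sizes) {
      // flush the current group once adding s would overshoot the target
      if (current > 0 && current + s > advisoryBytes) {
        merged += current
        current = 0L
      }
      current += s
    }
    if (current > 0) merged += current
    merged.toSeq
  }
}

// 200 shuffle partitions of 10MB each, 64MB advisory target:
// adjacent partitions merge into 60MB groups plus a small remainder
val mb = 1024L * 1024
val out = CoalesceSim.coalesce(Seq.fill(200)(10 * mb), 64 * mb)
println(out.length) // 34
```

The same greedy pass explains why AQE never splits partitions here: a partition already larger than the advisory size simply forms a group of its own.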

Converting Sort-Merge Joins to Broadcast Joins

Join strategy selection is notoriously difficult during static planning. Spark estimates table sizes and chooses broadcast hash join if one side is smaller than spark.sql.autoBroadcastJoinThreshold (default 10MB). However, after filters and projections, the actual data size often differs dramatically from estimates.

AQE re-evaluates join strategies after computing shuffle statistics:

spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "50MB")

val orders = spark.read.parquet("s3://data/orders")
  .filter($"order_date" >= "2024-01-01")
  
val products = spark.read.parquet("s3://data/products")
  .filter($"category" === "electronics")

// Static planning might choose sort-merge join
// AQE converts to broadcast if post-filter data is small
val result = orders.join(products, "product_id")
  .select("order_id", "product_name", "price")

Suppose the products table is 5GB unfiltered but only 30MB after the category filter. Static planning sees 5GB and chooses sort-merge join (expensive shuffle on both sides). AQE observes the 30MB post-filter size and converts to broadcast join, eliminating one shuffle entirely.
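The runtime decision itself can be illustrated with a minimal Scala sketch. This is a simulation of the choice AQE faces once actual sizes are known; the function and the simple size comparison are assumptions for illustration, not Spark internals:

```scala
// Once the post-filter size of each join side is measured, broadcast the
// smaller side if it fits under the threshold, else keep sort-merge.
sealed trait JoinStrategy
case object BroadcastHashJoin extends JoinStrategy
case object SortMergeJoin extends JoinStrategy

def chooseJoin(leftBytes: Long, rightBytes: Long, thresholdBytes: Long): JoinStrategy =
  if (math.min(leftBytes, rightBytes) <= thresholdBytes) BroadcastHashJoin
  else SortMergeJoin

val mb = 1024L * 1024
val gb = 1024 * mb
// Static estimate saw 5GB of products; runtime truth is 30MB post-filter
println(chooseJoin(50 * gb, 30 * mb, 50 * mb))  // BroadcastHashJoin
println(chooseJoin(50 * gb, 200 * mb, 50 * mb)) // SortMergeJoin
```

The point of the sketch: the inputs to this decision change between planning time (estimates) and runtime (measured shuffle statistics), which is exactly the gap AQE closes.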

Monitor this behavior in the Spark UI's SQL tab. Queries running under AQE appear as AdaptiveSparkPlan nodes, and once execution completes, the final plan reflects any runtime conversions:

AdaptiveSparkPlan
+- == Final Plan ==
   BroadcastHashJoin [product_id]
   :- FileScan parquet [order_id, product_id]
   +- BroadcastExchange
      +- FileScan parquet [product_id, product_name]

Tune the broadcast threshold based on your executor memory:

// Conservative for 4GB executors
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "100MB")

// Aggressive for 16GB executors  
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "500MB")

Handling Skewed Joins

Data skew—where a few keys have disproportionately large value counts—causes severe performance degradation in distributed joins. One task processes 90% of data while others finish quickly, wasting cluster resources.

AQE detects skewed partitions and splits them into multiple sub-partitions:

spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

val userEvents = spark.read.parquet("s3://data/user_events")
val userProfiles = spark.read.parquet("s3://data/user_profiles")

// Some users generate 1000x more events than others
val enriched = userEvents.join(userProfiles, "user_id")
  .groupBy("user_id", "country")
  .agg(sum("event_value").as("total_value"))

Without skew handling, if user “power_user_123” generates 10GB of events while average users generate 10MB, that single partition takes roughly 1000x longer than the others. AQE detects this by comparing partition sizes. If a partition exceeds both:

  • skewedPartitionFactor × median partition size
  • skewedPartitionThresholdInBytes absolute threshold

Then AQE splits it into multiple sub-partitions, each processed by a separate task. The matching partition on the other side of the join is duplicated so that every sub-partition can join against a full copy of it.
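The two-part detection rule and the split that follows can be sketched in plain Scala. This is a simulation of the decision logic, not Spark's internal code; the function names mirror the configuration keys but are otherwise illustrative:

```scala
// A partition is skewed only if it exceeds BOTH the factor * median test
// and the absolute byte threshold.
def isSkewed(size: Long, medianSize: Long, factor: Long, thresholdBytes: Long): Boolean =
  size > factor * medianSize && size > thresholdBytes

// Split a skewed partition into roughly equal chunks near the target size
def splitSkewed(size: Long, targetBytes: Long): Seq[Long] = {
  val n = math.max(1, math.ceil(size.toDouble / targetBytes).toInt)
  val base = size / n
  Seq.fill(n - 1)(base) :+ (size - base * (n - 1))
}

val mb = 1024L * 1024
// 10GB partition, 10MB median, factor 5, 256MB threshold -> skewed
println(isSkewed(10240 * mb, 10 * mb, 5, 256 * mb)) // true
// 300MB partition with a 100MB median fails the factor test -> not skewed
println(isSkewed(300 * mb, 100 * mb, 5, 256 * mb))  // false
// The 10GB partition becomes 40 sub-partitions of 256MB each
println(splitSkewed(10240 * mb, 256 * mb).length)   // 40
```

The second check matters: requiring both conditions prevents AQE from splitting partitions that are merely larger than an unusually small median but still cheap to process.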

View skew optimization in action:

// Force skew for demonstration
import spark.implicits._

val skewedData = Seq(
  ("normal_user", 1), ("normal_user", 2),
  ("power_user", 1), ("power_user", 2)
).toDF("user_id", "value")

// Replicate the power_user rows 10000x to create one heavily skewed key
val duplicated = skewedData.union(
  skewedData.filter($"user_id" === "power_user")
    .crossJoin(spark.range(10000))
    .select($"user_id", $"value")
)

val profiles = Seq(("normal_user", "US"), ("power_user", "DE"))
  .toDF("user_id", "country")

duplicated.join(profiles, "user_id")
  .explain() // shows AdaptiveSparkPlan; the skew split appears in the final plan after execution

Optimizing Local Shuffle Reader

AQE applies the local shuffle reader when a shuffle's partitioning turns out to be unnecessary at runtime, most commonly after it converts a planned sort-merge join into a broadcast join once the map side of the shuffle has already been written:

spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

val transactions = spark.read.parquet("s3://data/transactions")

val accounts = spark.read.parquet("s3://data/accounts")
  .filter($"status" === "active") // may shrink below the broadcast threshold

// Static planning schedules a sort-merge join, shuffling both sides.
// If AQE broadcasts the filtered accounts side instead, the reduce-side
// partitioning of the transactions shuffle is no longer needed.
val enriched = transactions.join(accounts, "account_id")

In that case, each task on the transactions side reads back the map output written on its own node instead of fetching reduce partitions over the network. The shuffle files still exist, but the network transfer between stages is eliminated. This optimization compounds in multi-join pipelines where dimension tables only fall below the broadcast threshold after filtering, since each conversion removes a network round over the larger side's full dataset.

Production Configuration Template

Here’s a battle-tested configuration for production workloads:

// Core AQE settings
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Partition coalescing - tune based on data size
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB")

// Broadcast join conversion - tune based on executor memory
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "100MB")

// Skew handling - adjust thresholds for your data distribution
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

// Local shuffle optimization
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

// Initial partition count - start high, let AQE coalesce
spark.conf.set("spark.sql.shuffle.partitions", "400")

Monitor AQE effectiveness through the Spark UI’s SQL tab. Look for “AdaptiveSparkPlan” nodes in query plans and compare execution times before and after enabling AQE. For most analytical workloads, expect 2-5x improvements on queries with joins and aggregations, with occasional 10x gains on severely skewed datasets.
