Apache Spark - Explain Plan (explain()) for Query Analysis


Key Insights

  • The explain() method reveals Spark’s query execution strategy before running expensive operations, helping you catch performance issues during development rather than in production
  • Physical plan operators like Exchange (shuffle), BroadcastHashJoin, and SortMergeJoin directly indicate where your query will spend time and resources
  • Predicate pushdown and partition pruning shown in explain output are your best indicators that Spark is optimizing data access effectively

Introduction to Query Execution Plans

Every Spark query goes through a multi-stage compilation process before execution. Understanding this process separates developers who write functional code from those who write performant code. When your job takes 4 hours instead of 20 minutes, the explain plan is where you start debugging.

Spark transforms your DataFrame operations into a directed acyclic graph (DAG) of execution stages. This happens in two major phases: logical planning and physical planning. The logical plan represents what you want to compute—the abstract operations. The physical plan represents how Spark will actually compute it—specific algorithms, data distribution strategies, and execution order.

You need to analyze these plans when queries run slower than expected, when you’re optimizing critical pipelines, or when you want to verify that Spark’s optimizer is making intelligent decisions about your data.

The explain() Method Basics

The explain() method prints the execution plan without running the query. This is crucial—you can analyze expensive operations without waiting for them to complete.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count

spark = SparkSession.builder.appName("ExplainDemo").getOrCreate()

# Sample DataFrame
df = spark.read.parquet("/data/sales")

# Basic explain - shows only physical plan
df.filter(col("amount") > 100).groupBy("region").agg(sum("amount")).explain()

# Extended explain - shows logical and physical plans
df.filter(col("amount") > 100).groupBy("region").agg(sum("amount")).explain(True)

# Formatted explain - structured output with plan details
df.filter(col("amount") > 100).groupBy("region").agg(sum("amount")).explain("formatted")

# Cost-based explain - includes statistics when available
df.filter(col("amount") > 100).groupBy("region").agg(sum("amount")).explain("cost")

# Codegen explain - shows generated Java code
df.filter(col("amount") > 100).groupBy("region").agg(sum("amount")).explain("codegen")

Each mode serves a different purpose:

  • Default/simple: Quick check of physical execution strategy
  • Extended: Full pipeline from parsed query to physical plan
  • Formatted: Human-readable sections with operator details
  • Cost: Statistics-based estimates (requires table statistics)
  • Codegen: Generated Java source for debugging whole-stage code generation

For daily development, explain("formatted") gives the best balance of detail and readability.
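When you want to inspect a plan programmatically (for example, in a test) rather than eyeball console output, you can capture what explain() prints. This is a minimal sketch, assuming only that explain() writes to stdout (which PySpark's implementation does); the helper name plan_text is ours, not a Spark API:

```python
import io
from contextlib import redirect_stdout

def plan_text(df, mode="formatted"):
    """Capture the output of df.explain(mode) as a string.

    PySpark's explain() prints the plan to stdout instead of
    returning it, so we redirect stdout into a buffer.
    """
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(mode)
    return buf.getvalue()
```

With the plan as a string, you can search for operators like Exchange or check PartitionFilters in automated checks.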

Reading Logical Plans

The logical plan shows your query’s intent through abstract operators. When you call explain(True) or explain("extended"), you see three logical plan stages: Parsed, Analyzed, and Optimized.

# Complex transformation chain
orders = spark.read.parquet("/data/orders")
customers = spark.read.parquet("/data/customers")

result = (orders
    .filter(col("order_date") >= "2024-01-01")
    .filter(col("status") == "completed")
    .join(customers, "customer_id")
    .select("customer_id", "customer_name", "order_total", "region")
    .filter(col("region") == "US")
    .groupBy("customer_name")
    .agg(sum("order_total").alias("total_spent")))

result.explain("extended")

The output shows how Spark optimizes your code:

== Parsed Logical Plan ==
'Aggregate ['customer_name], ['customer_name, sum('order_total) AS total_spent]
+- 'Filter ('region = US)
   +- 'Project [customer_id, customer_name, order_total, region]
      +- 'Join Inner, (customer_id = customer_id)
         :- 'Filter (('status = completed) AND ('order_date >= 2024-01-01))
         :  +- 'Relation [orders]
         +- 'Relation [customers]

== Optimized Logical Plan ==
Aggregate [customer_name], [customer_name, sum(order_total) AS total_spent]
+- Project [customer_name, order_total]
   +- Join Inner, (customer_id = customer_id)
      :- Filter ((status = completed) AND (order_date >= 2024-01-01))
      :  +- Relation [orders] parquet
      +- Filter (region = US)
         +- Project [customer_id, customer_name, region]
            +- Relation [customers] parquet

Notice what the optimizer did: it pushed the region = "US" filter down to the customers table scan and removed unnecessary columns early through column pruning. The two separate filters on orders were combined. These optimizations happen automatically through Catalyst, Spark’s query optimizer.

Common logical operators to recognize:

  • Project: Column selection (SELECT)
  • Filter: Row filtering (WHERE)
  • Aggregate: Grouping operations (GROUP BY)
  • Join: Table combinations with various strategies

Understanding Physical Plans

The physical plan shows actual execution algorithms. This is where performance implications become concrete.

# Small dimension table - should trigger broadcast
small_regions = spark.read.parquet("/data/regions")  # ~10MB
large_sales = spark.read.parquet("/data/sales")      # ~100GB

# Join with small table
broadcast_join = large_sales.join(small_regions, "region_id")
broadcast_join.explain()

# Force a sort-merge join for comparison
sort_merge_join = large_sales.join(
    small_regions.hint("merge"),
    "region_id"
)
sort_merge_join.explain()

Broadcast join output:

== Physical Plan ==
*(2) Project [region_id, sale_amount, region_name]
+- *(2) BroadcastHashJoin [region_id], [region_id], Inner, BuildRight
   :- *(2) FileScan parquet [region_id, sale_amount]
   +- BroadcastExchange HashedRelationBroadcastMode
      +- *(1) FileScan parquet [region_id, region_name]

Sort-merge join output:

== Physical Plan ==
*(3) Project [region_id, sale_amount, region_name]
+- *(3) SortMergeJoin [region_id], [region_id], Inner
   :- *(1) Sort [region_id ASC], false, 0
   :  +- Exchange hashpartitioning(region_id, 200)
   :     +- *(1) FileScan parquet [region_id, sale_amount]
   +- *(2) Sort [region_id ASC], false, 0
      +- Exchange hashpartitioning(region_id, 200)
         +- *(2) FileScan parquet [region_id, region_name]

The Exchange operator means a shuffle: data movement across the network. BroadcastHashJoin avoids shuffling the large table by sending the small table to every executor. SortMergeJoin shuffles and sorts both tables, which is dramatically slower when one side is small enough to have been broadcast instead.
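When comparing runs, it helps to classify the join strategy straight from the plan text. The following helper is our own sketch, not a Spark API; it simply string-matches the operator names Spark prints:

```python
def join_strategy(plan_text: str) -> str:
    """Classify the join strategy by the operator names in a physical plan."""
    for operator, name in [
        ("BroadcastHashJoin", "broadcast_hash"),
        ("SortMergeJoin", "sort_merge"),
        ("ShuffledHashJoin", "shuffle_hash"),
        ("BroadcastNestedLoopJoin", "broadcast_nested_loop"),
    ]:
        if operator in plan_text:
            return name
    return "unknown"
```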

Key physical operators:

  • FileScan: Reading from storage (check for partition pruning)
  • Exchange: Shuffle operation (expensive network I/O)
  • HashAggregate: Hash-based grouping (memory-intensive)
  • Sort: Sorting for merge operations
  • BroadcastExchange: Sending small data to all nodes
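Because each Exchange is a full shuffle, counting them gives a rough cost signal. A minimal sketch (our helper, operating on captured plan text; it deliberately skips BroadcastExchange, which ships a small table rather than repartitioning):

```python
def count_shuffles(plan_text: str) -> int:
    """Count shuffle Exchange operators in a physical plan string.

    Strips the tree-drawing prefix (spaces, '+-', ':', '*(n)') from
    each line, then checks whether the operator name is Exchange.
    BroadcastExchange is intentionally not counted.
    """
    shuffles = 0
    for line in plan_text.splitlines():
        operator = line.lstrip(" *+-:()0123456789")
        if operator.startswith("Exchange"):
            shuffles += 1
    return shuffles
```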

Identifying Performance Issues

Red flags in explain output indicate optimization opportunities:

# Problematic query
sales = spark.read.parquet("/data/sales")  # Partitioned by date

# Bad: Filter doesn't use partition column
bad_query = (sales
    .filter(col("product_category") == "electronics")
    .groupBy("store_id")
    .agg(sum("amount")))

bad_query.explain("formatted")

Output showing full scan:

== Physical Plan ==
HashAggregate(keys=[store_id], functions=[sum(amount)])
+- Exchange hashpartitioning(store_id, 200)
   +- HashAggregate(keys=[store_id], functions=[partial_sum(amount)])
      +- FileScan parquet [store_id, amount, product_category]
         PartitionFilters: []
         PushedFilters: [IsNotNull(product_category), EqualTo(product_category,electronics)]

Notice PartitionFilters: [] is empty—Spark scans all partitions. Now the optimized version:

# Good: Include partition column in filter
good_query = (sales
    .filter(col("date") >= "2024-01-01")
    .filter(col("date") < "2024-02-01")
    .filter(col("product_category") == "electronics")
    .groupBy("store_id")
    .agg(sum("amount")))

good_query.explain("formatted")

Output with partition pruning:

== Physical Plan ==
HashAggregate(keys=[store_id], functions=[sum(amount)])
+- Exchange hashpartitioning(store_id, 200)
   +- HashAggregate(keys=[store_id], functions=[partial_sum(amount)])
      +- FileScan parquet [store_id, amount, product_category]
         PartitionFilters: [date >= 2024-01-01, date < 2024-02-01]
         PushedFilters: [IsNotNull(product_category), EqualTo(product_category,electronics)]

The PartitionFilters now shows date constraints, meaning Spark reads only relevant partitions.
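You can turn the empty-PartitionFilters check into an assertion in your pipeline tests. A sketch of such a check (our helper; it parses the text explain() prints, so it is sensitive to formatting changes across Spark versions):

```python
import re

def has_partition_pruning(plan_text: str) -> bool:
    """True if a FileScan reports at least one partition filter.

    Spark prints 'PartitionFilters: []' when it will scan every
    partition, and lists the pruning predicates otherwise.
    """
    match = re.search(r"PartitionFilters:\s*\[([^\]]*)\]", plan_text)
    return bool(match and match.group(1).strip())
```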

Practical Debugging Workflow

When a query runs slowly, follow this systematic approach:

# Slow aggregation query to debug
transactions = spark.read.parquet("/data/transactions")
merchants = spark.read.parquet("/data/merchants")

slow_query = (transactions
    .join(merchants, "merchant_id")
    .groupBy("merchant_category", "transaction_date")
    .agg(
        sum("amount").alias("total"),
        avg("amount").alias("average"),
        count("*").alias("count")
    )
    .orderBy(col("total").desc()))

# Step 1: Check the plan
slow_query.explain("formatted")

# Step 2: Identify issues
# - Is the join strategy appropriate?
# - Are there unnecessary shuffles?
# - Is partition pruning happening?

# Step 3: Apply fixes based on findings
from pyspark.sql.functions import broadcast

optimized_query = (transactions
    .filter(col("transaction_date") >= "2024-01-01")  # Add partition filter
    .join(broadcast(merchants), "merchant_id")        # Broadcast small table
    .groupBy("merchant_category", "transaction_date")
    .agg(
        sum("amount").alias("total"),
        avg("amount").alias("average"),
        count("*").alias("count")
    )
    .orderBy(col("total").desc()))

# Step 4: Verify improvement
optimized_query.explain("formatted")

After reviewing the explain output, cross-reference with Spark UI. The SQL tab shows the same plan visually with runtime statistics, revealing actual versus estimated row counts and identifying skewed stages.

Summary and Best Practices

Use explain() proactively during development, not just when debugging production issues. Make it part of your code review process for data pipelines.

Quick reference for explain modes:

  • explain(): Fast physical plan check
  • explain("formatted"): Development and code review
  • explain("extended"): Understanding optimizer behavior
  • explain("cost"): Validating statistics accuracy

Operators to watch:

  • Exchange: Every shuffle costs network I/O
  • BroadcastHashJoin: Preferred for small-large table joins
  • SortMergeJoin: Check if broadcast is possible instead
  • FileScan with empty PartitionFilters: Missing partition pruning

Optimization checklist:

  1. Verify partition filters appear in FileScan
  2. Confirm small tables use broadcast joins
  3. Count Exchange operators and minimize shuffles
  4. Check that column pruning removes unused fields early
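The checklist above can be sketched as a single plan lint you run in CI against captured explain output. Everything here is our own helper code driven by simple string checks, not a Spark API:

```python
def lint_plan(plan_text: str) -> list:
    """Return human-readable warnings for common red flags in a physical plan."""
    warnings = []
    # Checklist item 1: partition filters should appear in FileScan
    if "PartitionFilters: []" in plan_text:
        warnings.append("FileScan has no partition filters: full scan likely")
    # Checklist item 2: small tables should be broadcast instead of merged
    if "SortMergeJoin" in plan_text:
        warnings.append("SortMergeJoin present: check whether a broadcast join fits")
    # Checklist item 3: count shuffles (Exchange, excluding BroadcastExchange)
    shuffles = sum(
        1 for line in plan_text.splitlines()
        if line.lstrip(" *+-:()0123456789").startswith("Exchange")
    )
    if shuffles > 2:
        warnings.append(f"{shuffles} shuffles (Exchange): consider restructuring")
    return warnings
```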

The explain plan is your window into Spark’s decision-making. Learn to read it fluently, and you’ll catch performance problems before they become production incidents.
