Apache Spark - Catalyst Optimizer Explained

Key Insights

  • Catalyst uses a tree-based representation of query plans and applies rule-based and cost-based optimizations through multiple phases: analysis, logical optimization, physical planning, and code generation
  • The optimizer leverages Scala’s pattern matching to transform plan and expression trees, enabling extensible optimization rules that developers can customize for domain-specific requirements
  • Understanding Catalyst’s optimization pipeline helps data engineers write more efficient Spark SQL queries and debug performance issues by examining query plans with explain() methods

The Catalyst Architecture

Catalyst is Spark’s query optimizer that transforms SQL queries and DataFrame operations into optimized execution plans. The optimizer operates on abstract syntax trees (ASTs) representing query logic, applying transformations through four distinct phases.

The architecture separates logical query representation from physical execution. This separation allows Catalyst to explore multiple execution strategies and select optimal plans based on data statistics and cluster characteristics.

// Example query that will go through Catalyst optimization
import spark.implicits._ // enables the $"column" syntax

val df = spark.read.parquet("users.parquet")
  .filter($"age" > 25)
  .select($"name", $"country")
  .groupBy($"country")
  .count()

// Print the parsed, analyzed, and optimized logical plans plus the physical plan
df.explain(true)

Phase 1: Analysis

During analysis, Catalyst resolves references and validates the query against the catalog. Unresolved logical plans contain column names and table identifiers that must be mapped to actual schema definitions.

The analyzer uses rules to resolve attributes, functions, and relations. It consults the catalog to verify table existence, resolve column references, and infer data types.

// Unresolved logical plan (internal representation)
// Filter (age > 25)
//   UnresolvedRelation [users]

// After analysis - resolved logical plan
// Filter (age#123 > 25)
//   Relation[id#122, name#124, age#123, country#125] parquet

The analysis phase catches semantic errors like referencing non-existent columns or type mismatches:

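// This fails during analysis, as soon as spark.sql is called (no action needed)
spark.sql("SELECT invalid_column FROM users WHERE age > 25")
// Throws AnalysisException: the column cannot be resolved given input columns
// [id, name, age, country]. The exact wording varies by Spark version.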
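Because analysis runs when the DataFrame is built rather than when an action executes, the failure can be caught around the spark.sql call itself. The sketch below is a minimal illustration, assuming a local SparkSession and a small in-memory `users` view invented for the demo:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.{Failure, Success, Try}

// Assumption: a local session and a toy "users" view, created only for this demo.
val spark = SparkSession.builder().master("local[*]").appName("analysis-demo").getOrCreate()
import spark.implicits._

Seq((1, "Ana", 31, "PT"), (2, "Bo", 19, "SE"))
  .toDF("id", "name", "age", "country")
  .createOrReplaceTempView("users")

// No action (count, show, ...) is called: the analyzer rejects the query
// while spark.sql is still building the DataFrame.
val attempt = Try(spark.sql("SELECT invalid_column FROM users WHERE age > 25"))

attempt match {
  case Failure(e: AnalysisException) => println(s"Rejected during analysis: ${e.getMessage}")
  case Failure(other)                => println(s"Unexpected failure: $other")
  case Success(_)                    => println("Query analyzed successfully")
}
```

Catching the error this early is handy in jobs that assemble SQL dynamically, since a bad column name surfaces before any data is read.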

Phase 2: Logical Optimization

Logical optimization applies rule-based transformations to produce an equivalent but more efficient logical plan. These rules operate independently of physical execution details.

Key optimization rules include:

Predicate Pushdown: Moves filter operations closer to data sources to reduce data volume early.

// Original query
val result = spark.read.parquet("orders.parquet")
  .join(spark.read.parquet("customers.parquet"), "customer_id")
  .filter($"order_date" > "2024-01-01")

// Catalyst pushes predicates down
// Optimized: filters are applied before the join
// Filter on orders happens at the scan level
// Reduces data shuffled during join
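One way to confirm the pushdown is to compare the analyzed and optimized plans. The following sketch assumes a local SparkSession and writes two tiny, made-up Parquet datasets to a temporary directory. The table contents and the `filterBelowJoin` check are illustrative, not part of Spark.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join}

val spark = SparkSession.builder().master("local[*]").appName("pushdown-demo").getOrCreate()
import spark.implicits._

// Hypothetical sample data, written to a temp directory so the scan is a real Parquet source.
val dir = Files.createTempDirectory("catalyst-pushdown").toString
Seq((1, 10, "2023-12-30"), (2, 11, "2024-02-01"))
  .toDF("order_id", "customer_id", "order_date").write.parquet(s"$dir/orders")
Seq((10, "Ana"), (11, "Bo"))
  .toDF("customer_id", "name").write.parquet(s"$dir/customers")

val joined = spark.read.parquet(s"$dir/orders")
  .join(spark.read.parquet(s"$dir/customers"), "customer_id")
  .filter($"order_date" > "2024-01-01")

val optimized = joined.queryExecution.optimizedPlan

// As written, the Filter sits above the Join. After optimization we expect
// a Filter somewhere beneath the Join's left (orders) input.
val filterBelowJoin = optimized
  .collectFirst { case j: Join => j }
  .exists(j => j.left.find(_.isInstanceOf[Filter]).isDefined)

println(s"Filter pushed below join: $filterBelowJoin")

// The physical plan's scan node for orders should also list the predicate under PushedFilters.
joined.explain("formatted")
```

If the filter stays above the join in your own queries, a common cause is a predicate Catalyst cannot safely move, such as one wrapped in a non-deterministic function or an opaque UDF.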

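Constant Folding: Evaluates sub-expressions made up entirely of constants once, during optimization, instead of once per row.

// Before optimization: 25 * 2 is a constant sub-expression
df.filter("price > 25 * 2")

// After constant folding
df.filter("price > 50")

// Note: Catalyst does not algebraically rearrange price * 2 > 100 into price > 50;
// only the constant-only parts of an expression are folded.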
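To make the idea of a rewrite rule concrete, here is a toy, Spark-free sketch of constant folding over an expression tree. The `Expr` hierarchy and the `transformUp` helper are invented for illustration. They mimic the shape of Catalyst's tree API but are not Spark classes.

```scala
// Toy expression tree: NOT Spark classes, just an illustration of the technique.
sealed trait Expr
case class Lit(value: Int) extends Expr
case class Col(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class Mul(left: Expr, right: Expr) extends Expr
case class Gt(left: Expr, right: Expr) extends Expr

// Apply a rule bottom-up: rewrite the children first, then the node itself.
def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val withNewChildren = e match {
    case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
    case Mul(l, r) => Mul(transformUp(l)(rule), transformUp(r)(rule))
    case Gt(l, r)  => Gt(transformUp(l)(rule), transformUp(r)(rule))
    case leaf      => leaf
  }
  // Nodes the rule does not match are returned unchanged.
  rule.applyOrElse(withNewChildren, (unchanged: Expr) => unchanged)
}

// Constant folding: only subtrees built entirely from literals are evaluated.
val constantFolding: PartialFunction[Expr, Expr] = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
  case Mul(Lit(a), Lit(b)) => Lit(a * b)
}

// price > 25 * 2
val before = Gt(Col("price"), Mul(Lit(25), Lit(2)))
val after  = transformUp(before)(constantFolding)
println(after) // Gt(Col(price),Lit(50))

// price * 2 > 100 contains no constant-only subtree, so it is left as-is.
val untouched = transformUp(Gt(Mul(Col("price"), Lit(2)), Lit(100)))(constantFolding)
println(untouched == Gt(Mul(Col("price"), Lit(2)), Lit(100))) // true
```

Real Catalyst rules have the same shape: a partial function over tree nodes, applied repeatedly until the plan stops changing.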

Column Pruning: Eliminates unused columns from scans to reduce I/O.

// Query only needs 'name' and 'age'
val result = spark.read.parquet("users.parquet")
  .select($"name", $"age", $"address", $"phone")
  .filter($"age" > 25)
  .select($"name", $"age")

// Catalyst prunes 'address' and 'phone' from the scan
// Only reads necessary columns from Parquet
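Pruning can be checked by asking the physical scan node which columns it was told to read. This is a sketch under a few assumptions: a local SparkSession, a made-up `users` dataset written to a temp directory, and the default Parquet reader, which plans scans as `FileSourceScanExec`.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

val spark = SparkSession.builder().master("local[*]").appName("pruning-demo").getOrCreate()
import spark.implicits._

// Hypothetical sample data with two columns the query never really needs.
val path = Files.createTempDirectory("catalyst-pruning").resolve("users").toString
Seq(("Ana", 31, "1 Main St", "555-0100"), ("Bo", 19, "2 Side St", "555-0101"))
  .toDF("name", "age", "address", "phone").write.parquet(path)

val pruned = spark.read.parquet(path)
  .select($"name", $"age", $"address", $"phone")
  .filter($"age" > 25)
  .select($"name", $"age")

// Columns the Parquet scan is asked to read, taken from the planned scan node.
val scannedColumns = pruned.queryExecution.sparkPlan
  .collectFirst { case scan: FileSourceScanExec => scan.requiredSchema.fieldNames.toSeq }
  .getOrElse(Seq.empty)

println(s"Columns read by the scan: ${scannedColumns.mkString(", ")}")
```

The same information appears as ReadSchema in the output of `pruned.explain("formatted")`.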

Boolean Expression Simplification: Simplifies complex boolean logic.

// Before
df.filter(($"status" === "active" && true) || ($"status" === "active" && false))

// After simplification
df.filter($"status" === "active")

Phase 3: Physical Planning

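Physical planning converts the optimized logical plan into one or more candidate physical plans and selects one to execute. In current Spark releases this choice is driven largely by planner strategies and size estimates (for example, the broadcast threshold), while cost-based optimization (CBO), when enabled, mainly influences logical decisions such as join reordering.

Physical plans specify concrete execution strategies, such as broadcast hash joins, shuffle hash joins, and sort-merge joins. The choice depends on estimated table sizes, join type and keys, hints, and configuration.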

// Enable cost-based optimization
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

// Analyze table statistics for better planning
spark.sql("ANALYZE TABLE users COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, order_date")

val query = spark.sql("""
  SELECT u.name, COUNT(o.order_id) as order_count
  FROM users u
  JOIN orders o ON u.id = o.customer_id
  WHERE o.order_date > '2024-01-01'
  GROUP BY u.name
""")

// Print the optimized logical plan annotated with statistics (sizes, row counts) when available
query.explain("cost")
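The statistics that feed these decisions can also be read programmatically from the optimized plan. The sketch below assumes a local SparkSession and a throwaway managed table named `stats_demo_users`, a hypothetical name used only here.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("stats-demo").getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.cbo.enabled", "true")

// Hypothetical managed table created just for this demo.
Seq((1, "Ana"), (2, "Bo"), (3, "Cy")).toDF("id", "name")
  .write.mode("overwrite").saveAsTable("stats_demo_users")

// Table-level stats (size, row count), then column-level stats for id.
spark.sql("ANALYZE TABLE stats_demo_users COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE stats_demo_users COMPUTE STATISTICS FOR COLUMNS id")

// Statistics the optimizer sees for a scan of this table.
val stats = spark.table("stats_demo_users").queryExecution.optimizedPlan.stats
println(s"sizeInBytes = ${stats.sizeInBytes}, rowCount = ${stats.rowCount}")

// Clean up the demo table.
spark.sql("DROP TABLE IF EXISTS stats_demo_users")
```

If rowCount comes back empty for one of your own tables, that usually means ANALYZE TABLE has not been run on it or CBO is disabled.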

For joins, Catalyst evaluates multiple strategies:

// Small table (estimated size in bytes < spark.sql.autoBroadcastJoinThreshold, 10 MB by default)
// Catalyst chooses BroadcastHashJoin
val smallTable = spark.read.parquet("countries.parquet") // 100 rows
val largeTable = spark.read.parquet("users.parquet")     // 10M rows

val result = largeTable.join(smallTable, "country_code")
// Physical plan: BroadcastHashJoin - broadcasts 'countries' to all executors

// Both tables large - uses SortMergeJoin
val orders = spark.read.parquet("orders.parquet")    // 50M rows
val shipments = spark.read.parquet("shipments.parquet") // 45M rows

val result2 = orders.join(shipments, "order_id")
// Physical plan: SortMergeJoin - shuffles and sorts both sides
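When the size estimate is wrong or unavailable, the strategy can be steered with a broadcast hint. The sketch below assumes a local SparkSession and two tiny in-memory tables invented for the demo. It disables automatic broadcasting so that the hint is the only reason a broadcast join can appear.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").appName("join-hint-demo").getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val users = Seq((1, "PT"), (2, "SE"), (3, "PT")).toDF("user_id", "country_code")
val countries = Seq(("PT", "Portugal"), ("SE", "Sweden")).toDF("country_code", "country_name")

// Turn off size-based broadcasting so only the explicit hint can trigger it.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val noHint = users.join(countries, "country_code")
val hinted = users.join(broadcast(countries), "country_code")

// Render the planned physical operators as text and look for the join node name.
def usesBroadcastJoin(df: DataFrame): Boolean =
  df.queryExecution.sparkPlan.toString.contains("BroadcastHashJoin")

println(s"Without hint: broadcast join = ${usesBroadcastJoin(noHint)}")
println(s"With hint:    broadcast join = ${usesBroadcastJoin(hinted)}")
```

A hint overrides the threshold, so it is worth reserving for tables that are known to fit comfortably in executor memory.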

Phase 4: Code Generation

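Spark generates Java source code for expression evaluation and compiles it to bytecode at runtime with the Janino compiler (the original Catalyst design used Scala quasiquotes for this step). Whole-stage code generation eliminates virtual function calls between operators and improves CPU cache efficiency.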

// Expression tree for: (age * 2) + 10
// Add(Multiply(AttributeReference("age"), Literal(2)), Literal(10))

// Generated code (simplified)
/* 
public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

class GeneratedIterator extends Iterator {
  public boolean hasNext() { ... }
  
  public InternalRow next() {
    int age = input.getInt(2);
    int result = (age * 2) + 10;
    row.setInt(0, result);  // write the computed value into the output row
    return row;
  }
}
*/

Whole-stage code generation combines multiple operators into a single function:

val df = spark.range(10000000)
  .filter($"id" % 2 === 0)
  .filter($"id" < 5000000)
  .selectExpr("id * 2 as doubled")

// Without codegen: separate function calls for each operation
// With codegen: fused into single tight loop
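To see whether operators were actually fused, the executed plan can be searched for whole-stage codegen nodes and the generated source can be printed. This is a minimal sketch, assuming a local SparkSession and Spark 3.0 or later for the "codegen" explain mode.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec

val spark = SparkSession.builder().master("local[*]").appName("codegen-demo").getOrCreate()
import spark.implicits._

val fused = spark.range(10000000)
  .filter($"id" % 2 === 0)
  .filter($"id" < 5000000)
  .selectExpr("id * 2 as doubled")

// Each WholeStageCodegenExec node wraps a group of operators compiled into one function.
val codegenStages = fused.queryExecution.executedPlan.collect {
  case stage: WholeStageCodegenExec => stage
}
println(s"Whole-stage codegen subtrees: ${codegenStages.size}")

// Print the generated Java source for each subtree.
fused.explain("codegen")
```

In regular explain output, operators that belong to a codegen stage are prefixed with an asterisk and a stage id, such as *(1).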

Custom Optimization Rules

Catalyst’s extensibility allows adding custom optimization rules for domain-specific logic.

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.expressions._

object CustomFilterRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition, child) =>
      // Custom logic: replace expensive UDFs with native functions
      val optimizedCondition = condition transform {
        case expr if isExpensiveUDF(expr) => 
          convertToNativeExpression(expr)
      }
      Filter(optimizedCondition, child)
  }
  
  private def isExpensiveUDF(expr: Expression): Boolean = {
    // Implementation details
    false
  }
  
  private def convertToNativeExpression(expr: Expression): Expression = {
    // Implementation details
    expr
  }
}

// Register custom rule
spark.experimental.extraOptimizations = Seq(CustomFilterRule)
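Rules can also be registered when the session is built, through SparkSessionExtensions, which suits libraries and shared cluster configurations. The sketch below defines a deliberately trivial rule, `DropAlwaysTrueFilter`, a hypothetical name. Spark's built-in optimizer already performs this rewrite, so the rule exists only to show the wiring.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.BooleanType

// Hypothetical rule for illustration: removes filters whose condition is the literal true.
object DropAlwaysTrueFilter extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, BooleanType), child) => child
  }
}

// Extensions only take effect on a newly created session; if a session
// already exists, getOrCreate returns it without injecting the rule.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("extension-demo")
  .withExtensions(_.injectOptimizerRule(_ => DropAlwaysTrueFilter))
  .getOrCreate()

// Apply the rule directly to an analyzed plan to see its effect in isolation.
val df = spark.range(5).filter(lit(true))
val rewritten = DropAlwaysTrueFilter(df.queryExecution.analyzed)

println(df.queryExecution.analyzed)
println(rewritten)
```

Applying a rule by hand to a small plan like this is a convenient way to unit test it before registering it with a session.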

Debugging Query Plans

Understanding query plans is essential for performance tuning. Spark provides several methods to inspect optimization stages.

val query = spark.sql("""
  SELECT customer_id, SUM(amount) as total
  FROM orders
  WHERE order_date >= '2024-01-01'
  GROUP BY customer_id
  HAVING SUM(amount) > 1000
""")

// Show all optimization stages
query.explain("extended")

// Show formatted physical plan
query.explain("formatted")

// Show the optimized logical plan with statistics, if available
query.explain("cost")

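// Programmatic access to query plans
val logicalPlan = query.queryExecution.logical            // parsed, still unresolved
val analyzedPlan = query.queryExecution.analyzed          // references resolved against the catalog
val optimizedPlan = query.queryExecution.optimizedPlan    // after logical optimization rules
val physicalPlan = query.queryExecution.sparkPlan         // physical plan before final preparation
val executedPlan = query.queryExecution.executedPlan      // plan that runs (exchanges and codegen inserted)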

Performance Implications

Catalyst’s effectiveness depends on proper configuration and data statistics. Without statistics, the optimizer falls back on rough size estimates, which can lead to suboptimal choices such as a missed broadcast join or a poor join order.

// Collect statistics for better optimization
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS")

// Enable Adaptive Query Execution, which re-optimizes at runtime (on by default since Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

// Log the plan each time a rule changes it (config name as of Spark 3.1+)
spark.conf.set("spark.sql.planChangeLog.level", "WARN")

Catalyst represents a sophisticated approach to query optimization that balances compile-time transformations with runtime adaptability. By understanding its internals, engineers can write more efficient queries and diagnose performance bottlenecks systematically.
