Apache Spark - Predicate Pushdown


Key Insights

  • Predicate pushdown moves filter conditions to the data source level, dramatically reducing I/O by reading only relevant data rather than loading everything into memory first
  • Not all data sources support pushdown equally: Parquet and ORC can skip whole chunks of data, JDBC has partial support, and text formats such as CSV and JSON still read every byte, making format choice a critical performance decision
  • UDFs and complex expressions silently break pushdown optimization; always verify with .explain() to ensure your filters are actually being pushed down

Introduction to Predicate Pushdown

Predicate pushdown is one of Spark’s most impactful performance optimizations, yet many developers don’t fully understand when it works and when it silently fails. The concept is straightforward: instead of reading all data into Spark and then filtering, push the filter conditions down to the data source so irrelevant data never gets read in the first place.

Consider reading a 100GB Parquet file where you only need rows from the last week. Without pushdown, Spark reads all 100GB, deserializes it, and then applies your filter. With pushdown, Spark tells the Parquet reader to skip row groups that don’t match your date criteria. You might read only 5GB instead of 100.

This optimization matters because I/O is typically the bottleneck in data processing. Reducing data read at the source compounds throughout your pipeline—less network transfer, less memory pressure, less CPU spent on deserialization and filtering.
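The skipping idea can be pictured with a small toy model. This is a minimal sketch in plain Python, not Spark's or Parquet's reader code; the `RowGroup` class, the `groups_to_read` helper, and the file layout (10 row groups of 100,000 consecutive ids) are all assumptions made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    min_id: int    # smallest id stored in this row group
    max_id: int    # largest id stored in this row group
    num_rows: int


def groups_to_read(row_groups, lower_bound):
    """Keep only row groups that could contain a row with id > lower_bound."""
    return [g for g in row_groups if g.max_id > lower_bound]


# Hypothetical file: 10 row groups of 100,000 consecutive ids each.
row_groups = [
    RowGroup(i * 100_000, (i + 1) * 100_000 - 1, 100_000) for i in range(10)
]

kept = groups_to_read(row_groups, lower_bound=999_000)
rows_without_pushdown = sum(g.num_rows for g in row_groups)
rows_with_pushdown = sum(g.num_rows for g in kept)

print(rows_without_pushdown)  # 1000000
print(rows_with_pushdown)     # 100000
```

Only the last row group can satisfy `id > 999000`, so nine of ten are never read. The surviving group still holds ids at or below 999000, which is why a reader that skips by statistics still needs the filter applied to the rows it returns.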

How Predicate Pushdown Works in Spark

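Spark’s Catalyst optimizer analyzes your query plan and identifies filter predicates that can be pushed to the data source. During logical plan optimization, Catalyst applies its predicate pushdown rules (PushDownPredicate in Spark 2.x, PushDownPredicates in Spark 3.x), which move filters as close to the scan operation as possible. Filters that end up directly above the scan and can be expressed in the data source filter API are then handed to the reader.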

Here’s how to observe this in action:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("PredicatePushdown").getOrCreate()

# Create sample data and write as Parquet
df = spark.range(1000000).withColumn("category", (col("id") % 10).cast("string"))
df.write.mode("overwrite").parquet("/tmp/sample_data")

# Read and filter
filtered_df = spark.read.parquet("/tmp/sample_data").filter(col("id") > 999000)

# Examine the physical plan
filtered_df.explain(True)

The output reveals the optimization:

== Physical Plan ==
*(1) Filter (id#0L > 999000)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#0L,category#1] Batched: true, DataFilters: [(id#0L > 999000)], 
      Format: Parquet, PushedFilters: [IsNotNull(id), GreaterThan(id,999000)], ...

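Notice PushedFilters: [IsNotNull(id), GreaterThan(id,999000)]. This confirms the Parquet reader will use these predicates to skip row groups whose maximum id value is not greater than 999000. The Filter node still appears above the scan because row-group skipping is coarse: a row group that survives can still contain non-matching rows, so Spark re-applies the predicate to whatever the reader returns.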
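When checking plans by hand gets tedious, the same inspection can be scripted. The sketch below parses plan text that you have already captured as a string; the helper names `split_top_level` and `scan_attribute` are hypothetical, and the sample text is the plan shown above. How you capture the plan text from your own job is left open here.

```python
import re


def split_top_level(text):
    """Split on commas that are not nested inside parentheses."""
    parts, depth, current = [], 0, []
    for ch in text:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:
        parts.append(tail)
    return parts


def scan_attribute(plan_text, name):
    """Return the entries of a bracketed scan attribute such as PushedFilters."""
    match = re.search(re.escape(name) + r": \[(.*?)\]", plan_text)
    return split_top_level(match.group(1)) if match else []


plan_text = """
*(1) Filter (id#0L > 999000)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#0L,category#1] Batched: true, DataFilters: [(id#0L > 999000)],
      Format: Parquet, PushedFilters: [IsNotNull(id), GreaterThan(id,999000)], ...
"""

pushed = scan_attribute(plan_text, "PushedFilters")
print(pushed)  # ['IsNotNull(id)', 'GreaterThan(id,999000)']
```

This simple parser assumes the attribute value contains no nested square brackets, which holds for the plan above but not necessarily for every filter type.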

Supported Data Sources

Data source support for predicate pushdown varies significantly:

Parquet and ORC: Full support. These columnar formats store min/max statistics per row group, enabling the reader to skip entire chunks of data. They also support column pruning, reading only the columns you select.

Delta Lake: Inherits Parquet’s capabilities plus adds data skipping via file-level statistics and Z-ordering for multi-dimensional clustering.

JDBC: Partial support. Spark translates simple filters into a SQL WHERE clause so the database does the filtering. Expressions it cannot translate are not pushed; Spark applies them itself after the rows have been fetched.

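CSV and JSON: No I/O savings. These text formats carry no statistics or index, so every byte gets read regardless of your filters. Recent Spark 3.x releases can apply simple filters while parsing CSV and JSON records, which saves some parsing work, but they cannot skip data on disk.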

Here’s a concrete comparison:

# Write the same data in both formats
spark.range(1000000).write.mode("overwrite").parquet("/tmp/data_parquet")
spark.range(1000000).write.mode("overwrite").csv("/tmp/data_csv")

# Parquet query plan
print("=== Parquet Plan ===")
spark.read.parquet("/tmp/data_parquet").filter("id > 999000").explain()

# CSV query plan  
print("=== CSV Plan ===")
spark.read.csv("/tmp/data_csv").toDF("id").filter("id > 999000").explain()

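The Parquet plan lists GreaterThan(id,999000) under PushedFilters, and the reader uses it to skip row groups. The CSV scan has no statistics to consult, so every file is read and parsed in full before the comparison is applied. On large datasets, this difference can amount to an orders-of-magnitude performance gap.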
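The partial support described for JDBC above can be pictured as a translator that handles some filters and gives up on others. This is a toy sketch, not Spark's JDBC source: the tuple representation of filters, the `to_sql` and `split_filters` helpers, and the `PythonUDF` entry are assumptions for illustration, and the string quoting is deliberately naive.

```python
# Toy model: filters are tuples such as ("GreaterThan", "id", 999000).
OPERATORS = {"EqualTo": "=", "GreaterThan": ">", "LessThan": "<"}


def to_sql(filter_):
    """Translate one filter to a SQL fragment, or return None if unsupported."""
    kind = filter_[0]
    if kind == "IsNotNull":
        return f"{filter_[1]} IS NOT NULL"
    if kind in OPERATORS:
        _, column, value = filter_
        # Naive quoting, for illustration only (no escaping).
        literal = f"'{value}'" if isinstance(value, str) else str(value)
        return f"{column} {OPERATORS[kind]} {literal}"
    return None


def split_filters(filters):
    """Return (WHERE clause for the database, filters left for the engine)."""
    clauses, residual = [], []
    for f in filters:
        sql = to_sql(f)
        if sql is None:
            residual.append(f)
        else:
            clauses.append(sql)
    return " AND ".join(clauses), residual


filters = [
    ("IsNotNull", "id"),
    ("GreaterThan", "id", 999000),
    ("PythonUDF", "is_valid_id", "id"),  # opaque to the translator
]
where, residual = split_filters(filters)
print(where)     # id IS NOT NULL AND id > 999000
print(residual)  # [('PythonUDF', 'is_valid_id', 'id')]
```

Anything that lands in `residual` is evaluated after the rows have crossed the network, which is the cost that partial support implies.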

Partition Pruning vs. Predicate Pushdown

These optimizations work at different levels and complement each other:

Partition pruning eliminates entire files or directories based on partition column values. It happens at the file system level before any data is read.

Predicate pushdown filters within files using column statistics and row group metadata. It reduces data read from files that partition pruning didn’t eliminate.

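# Create partitioned data
# The year is derived from id / 12 so that every (year, month) pair occurs;
# taking both columns from id % n would leave year=2023/month=6 empty.
spark.range(1000000) \
    .withColumn("year", ((col("id") / 12).cast("long") % 3 + 2022).cast("int")) \
    .withColumn("month", (col("id") % 12 + 1).cast("int")) \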
    .write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("/tmp/partitioned_data")

# Query with both partition filter and value filter
query = spark.read.parquet("/tmp/partitioned_data") \
    .filter((col("year") == 2023) & (col("month") == 6) & (col("id") > 500000))

query.explain(True)

The plan shows:

PartitionFilters: [isnotnull(year#10), isnotnull(month#11), (year#10 = 2023), (month#11 = 6)]
PushedFilters: [IsNotNull(id), GreaterThan(id,500000)]

Spark first prunes partitions (skipping all directories except year=2023/month=6), then applies predicate pushdown within those remaining files. The combination is powerful—partition pruning might reduce 36 directories to 1, then pushdown might skip 80% of the row groups in that directory.
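The two stages can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not Spark's planner: the `layout` dictionary, the two row groups per directory, and the `parse_partition` and `plan_scan` helpers are all hypothetical.

```python
def parse_partition(path):
    """Turn 'year=2023/month=6' into {'year': 2023, 'month': 6}."""
    return {k: int(v) for k, v in (part.split("=") for part in path.split("/"))}


# Hypothetical layout: 3 years x 12 months. Each directory holds two row
# groups, described only by their (min_id, max_id) statistics.
layout = {
    f"year={y}/month={m}": [(0, 499_999), (500_000, 999_999)]
    for y in (2022, 2023, 2024)
    for m in range(1, 13)
}


def plan_scan(layout, year, month, min_id_exclusive):
    # Stage 1: partition pruning looks at directory names only.
    wanted = {"year": year, "month": month}
    dirs = [p for p in layout if parse_partition(p) == wanted]
    # Stage 2: row-group skipping inside the surviving directories.
    groups = [
        (p, g) for p in dirs for g in layout[p] if g[1] > min_id_exclusive
    ]
    return dirs, groups


dirs, groups = plan_scan(layout, year=2023, month=6, min_id_exclusive=500_000)
print(len(layout), len(dirs), len(groups))  # 36 1 1
```

Stage 1 never opens a file, while stage 2 needs each surviving file's footer statistics, which is why the two optimizations stack rather than overlap.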

Common Pitfalls and Anti-Patterns

Several patterns silently disable predicate pushdown:

UDFs in filters: User-defined functions are opaque to Catalyst. It cannot push them down because the data source doesn’t understand Python or Scala code.

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# This UDF breaks pushdown
@udf(BooleanType())
def is_valid_id(id_val):
    return id_val is not None and id_val > 999000

# BAD: No pushdown possible
broken_query = spark.read.parquet("/tmp/sample_data").filter(is_valid_id(col("id")))
broken_query.explain()  # No PushedFilters

# GOOD: Use native Spark expressions
working_query = spark.read.parquet("/tmp/sample_data").filter(
    col("id").isNotNull() & (col("id") > 999000)
)
working_query.explain()  # PushedFilters present

Column transformations in predicates: Applying functions to columns before comparison prevents pushdown.

from pyspark.sql.functions import lower, lit

# BAD: Transformation on column side
spark.read.parquet("/tmp/sample_data") \
    .filter(lower(col("category")) == "5") \
    .explain()  # No pushdown for this predicate

# GOOD: Compare the raw column to a literal (normalize case when writing the data if needed)
spark.read.parquet("/tmp/sample_data") \
    .filter(col("category") == "5") \
    .explain()  # Pushdown works
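One way to see why a transformed column defeats statistics-based skipping: the min/max values stored with the data describe the raw column, not the result of the function. The sketch below is a toy illustration in plain Python with made-up row group contents; `may_contain_equal` is a hypothetical helper, not a Spark or Parquet API.

```python
def may_contain_equal(min_value, max_value, target):
    """Stats check for `column == target`: possible only within [min, max]."""
    return min_value <= target <= max_value


# Hypothetical contents of one row group and its stored statistics.
values = ["Apple", "BANANA", "Zebra"]
stats = (min(values), max(values))  # ('Apple', 'Zebra')

# Predicate: lower(column) == "banana".
# Checking the literal against raw statistics gives the wrong answer,
# because "banana" sorts after "Zebra" while "BANANA" sorts inside the range.
naive_decision = may_contain_equal(*stats, "banana")
actually_matches = any(v.lower() == "banana" for v in values)

print(naive_decision)    # False
print(actually_matches)  # True
```

Skipping this row group based on the raw statistics would silently drop a matching row, so the only safe choice for the reader is not to use the statistics at all for such a predicate.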

Type mismatches: Comparing columns to incompatible types can prevent pushdown or cause unexpected behavior.

# If 'id' is Long but you compare to a string
spark.read.parquet("/tmp/sample_data") \
    .filter(col("id") == "999000") \
    .explain()  # May not push down optimally

Verifying and Debugging Pushdown

Always verify pushdown is working. Don’t assume.

Use explain(True) for detailed plans:

df = spark.read.parquet("/tmp/sample_data") \
    .filter((col("id") > 100) & (col("id") < 1000) & (col("category") == "5"))

df.explain(True)

Look for these indicators in the physical plan:

  • PushedFilters: Lists predicates sent to the data source
  • PartitionFilters: Lists predicates used for partition pruning
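  • DataFilters: Lists every filter on non-partition columns that Spark evaluates for the scan; a predicate that appears here but not in PushedFilters was not pushed down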

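Check Spark UI metrics: The SQL tab shows “number of files read,” “size of files read,” and “number of output rows” on the scan node. Compare the file metrics against the total dataset size to gauge partition pruning, and compare the scan’s output rows against the table’s row count to gauge how much the reader skipped.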

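Optionally enable record-level filtering, then run the query and inspect its metrics:

# Not a metrics switch: this asks the Parquet reader to also drop individual
# non-matching rows using the pushed filters. It is off by default and only
# takes effect when the vectorized reader is not used.
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")

# Execute the query so the scan metrics are populated
df.collect()

# Then inspect the metrics in the Spark UI or via a SparkListener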

Best Practices and Performance Tips

Filter early and often: Place filters as early as possible in your transformation chain. Catalyst reorders many operations on its own, but explicit early filtering makes your intent clear and does not rely on the optimizer being able to move the filter for you.

# Preferred pattern
spark.read.parquet("/tmp/data") \
    .filter(col("date") >= "2024-01-01") \
    .select("id", "value", "date") \
    .groupBy("id").sum("value")

Use native Spark functions: Avoid UDFs for filter conditions. Most common operations have built-in equivalents that support pushdown.

Choose columnar formats: Default to Parquet or ORC for analytical workloads. The performance difference versus CSV/JSON is substantial and grows with data size.

Design partition schemes thoughtfully: Partition by columns you frequently filter on. Common choices include date, region, or category. Avoid over-partitioning (too many small files) or under-partitioning (no pruning benefit).
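A quick back-of-the-envelope calculation helps spot over-partitioning before writing any data. The sketch below assumes data spreads evenly across partitions, which real data rarely does; `partition_layout`, the 100GB total, and the `user_id` column are hypothetical.

```python
from math import prod

GB = 1024 ** 3


def partition_layout(total_bytes, distinct_values):
    """Estimate directory count and average bytes per directory."""
    directories = prod(distinct_values.values())
    return directories, total_bytes / directories


# Partitioning by year and month: a few large directories.
dirs, avg = partition_layout(100 * GB, {"year": 3, "month": 12})
print(dirs, round(avg / GB, 2))  # 36 2.78

# Adding a high-cardinality column: millions of tiny directories.
dirs2, avg2 = partition_layout(
    100 * GB, {"year": 3, "month": 12, "user_id": 1_000_000}
)
print(dirs2, round(avg2))  # 36000000 2983
```

In the second layout each directory averages about 3KB, so file listing and per-file overhead would dominate any pruning benefit.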

Maintain statistics: For Delta Lake, run OPTIMIZE and consider Z-ordering on frequently filtered columns. For plain Parquet, ensure you’re writing with statistics enabled (the default).

Test with production-scale data: Pushdown benefits scale with data size. A query that runs fine on sample data might time out on production volumes if pushdown isn’t working.

Predicate pushdown is a foundational optimization that separates performant Spark applications from slow ones. Understanding when it works—and more importantly, when it doesn’t—gives you the knowledge to write queries that scale efficiently with your data growth.
