How to Read Parquet Files in PySpark
Key Insights
- Parquet’s columnar format combined with PySpark’s predicate pushdown and column pruning can reduce I/O by 90%+ when you only need a subset of your data
- Schema evolution is a common pain point: use mergeSchema carefully and understand when explicit schemas outperform inference
- Partition pruning is the single most impactful optimization for large datasets, but it only works when you filter on partition columns before any shuffles
Introduction
Parquet has become the de facto standard for storing analytical data in big data ecosystems, and for good reason. Its columnar storage format means you only read the columns you need. Built-in compression reduces storage costs and I/O. Schema preservation eliminates the CSV parsing nightmares that plague data teams. When you’re working with PySpark, Parquet isn’t just a good choice—it’s usually the right choice.
This article covers everything you need to read Parquet files effectively in PySpark, from basic operations to performance optimizations that can dramatically speed up your jobs. I’ll skip the theory where possible and focus on practical patterns you can use immediately.
Setting Up Your PySpark Environment
Before reading any Parquet files, you need a properly configured SparkSession. The defaults work fine for local development, but production workloads benefit from explicit configuration.
from pyspark.sql import SparkSession
# Basic session for local development
spark = SparkSession.builder \
.appName("ParquetReader") \
.getOrCreate()
# Production-oriented configuration
spark = SparkSession.builder \
.appName("ParquetReader") \
.config("spark.sql.parquet.compression.codec", "snappy") \
.config("spark.sql.parquet.mergeSchema", "false") \
.config("spark.sql.parquet.filterPushdown", "true") \
.config("spark.sql.files.maxPartitionBytes", "128m") \
.getOrCreate()
A few notes on these configurations: mergeSchema defaults to false for performance reasons—enable it only when you need it. Filter pushdown is enabled by default in modern Spark versions, but being explicit doesn’t hurt. The maxPartitionBytes setting controls how Spark splits files into partitions; 128MB is a reasonable starting point.
Basic Parquet Reading Operations
Reading Parquet files is straightforward. The spark.read.parquet() method handles single files, directories, and glob patterns.
# Read a single Parquet file
df = spark.read.parquet("/data/users.parquet")
# Read all Parquet files in a directory
df = spark.read.parquet("/data/events/")
# Read from multiple paths
df = spark.read.parquet(
"/data/events/2024-01/",
"/data/events/2024-02/",
"/data/events/2024-03/"
)
# Use glob patterns for flexible path matching
df = spark.read.parquet("/data/events/2024-*/day=0[1-7]/*")
The DataFrame API also supports the format() method, which is useful when you’re building dynamic readers:
df = spark.read \
.format("parquet") \
.load("/data/events/")
# Equivalent to spark.read.parquet(), but more flexible for parameterization
file_format = "parquet" # Could come from config
df = spark.read.format(file_format).load("/data/events/")
One thing to remember: these operations are lazy. Spark doesn’t actually read the data until you call an action like show(), count(), or collect(). This laziness is what enables Spark’s query optimization.
Schema Handling and Options
Parquet files embed their schema, so Spark can infer it automatically. But automatic isn’t always best.
# Let Spark infer the schema (reads file metadata)
df = spark.read.parquet("/data/events/")
df.printSchema()
For production pipelines, I recommend defining schemas explicitly. This catches data quality issues early and avoids surprises when upstream schemas change.
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
event_schema = StructType([
StructField("event_id", StringType(), nullable=False),
StructField("user_id", LongType(), nullable=False),
StructField("event_type", StringType(), nullable=True),
StructField("timestamp", TimestampType(), nullable=False),
StructField("properties", StringType(), nullable=True)
])
df = spark.read \
.schema(event_schema) \
.parquet("/data/events/")
Schema evolution is where things get interesting. When Parquet files have different schemas—say, a new column was added last month—you have options:
# Enable schema merging to combine schemas from all files
df = spark.read \
.option("mergeSchema", "true") \
.parquet("/data/events/")
# Or set it globally (affects all reads in this session)
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
Be cautious with mergeSchema. It requires reading metadata from all files, which can be slow with thousands of files. If you know your schema, provide it explicitly instead.
Reading Partitioned Parquet Data
Partitioned datasets are organized into directories like /data/events/year=2024/month=01/day=15/. Spark recognizes this Hive-style partitioning automatically and exposes partition columns in your DataFrame.
# Spark automatically discovers partition columns
df = spark.read.parquet("/data/events/")
# Partition columns (year, month, day) are available as regular columns
df.select("event_id", "year", "month", "day").show()
The real power comes from partition pruning. When you filter on partition columns, Spark skips entire directories:
# This only reads files from January 2024
df = spark.read.parquet("/data/events/") \
.filter("year = 2024 AND month = 1")
# Equivalent using Column expressions
from pyspark.sql.functions import col
df = spark.read.parquet("/data/events/") \
.filter((col("year") == 2024) & (col("month") == 1))
Check your query plan to verify partition pruning is working:
df.explain(True)
# Look for "PartitionFilters" in the output
A common mistake: putting a shuffle (a join or aggregation) between the read and the partition filter defeats pruning, because Spark cannot push the filter back through it. Filter on partition columns as early as possible, ideally right after the read.
Performance Optimization Tips
Reading Parquet efficiently requires understanding what Spark can optimize automatically and what needs explicit configuration.
Column Pruning: Only select the columns you need. Parquet’s columnar format means unselected columns aren’t read from disk.
# Bad: reads all columns, then selects
df = spark.read.parquet("/data/events/")
result = df.select("event_id", "user_id", "timestamp")
# Better: same result, but Spark optimizes this
# The physical plan will only read three columns
df = spark.read.parquet("/data/events/") \
.select("event_id", "user_id", "timestamp")
Both examples produce the same optimized plan because Spark’s Catalyst optimizer pushes the projection down. But being explicit about your intent makes code clearer.
Predicate Pushdown: Filters on non-partition columns can still be pushed down to the Parquet reader level, skipping row groups that don’t match.
# Filter pushdown reduces data scanned
df = spark.read.parquet("/data/events/") \
.filter(col("event_type") == "purchase") \
.filter(col("timestamp") >= "2024-01-01")
Parallelism Tuning: Control how Spark splits files into tasks.
# Increase partition size for fewer, larger tasks (less overhead)
spark.conf.set("spark.sql.files.maxPartitionBytes", "256m")
# Decrease for more parallelism on small files
spark.conf.set("spark.sql.files.maxPartitionBytes", "64m")
# Pack more small files into each task by lowering the per-file open cost
# (the default is 4MB; each file "costs" this much when Spark sizes partitions)
spark.conf.set("spark.sql.files.openCostInBytes", "1m")
For datasets with many small files, consider coalescing after reading:
df = spark.read.parquet("/data/many_small_files/") \
.coalesce(100) # Reduce to 100 partitions
Common Issues and Troubleshooting
Corrupt Files: Production data gets corrupted. Handle it gracefully.
# Skip corrupt files instead of failing the job
df = spark.read \
.option("ignoreCorruptFiles", "true") \
.parquet("/data/events/")
# Or set it globally for the whole session
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
Note that the mode and columnNameOfCorruptRecord options apply to text-based sources like JSON and CSV; the Parquet reader ignores them. Parquet is a binary format, so corruption surfaces at the file or row-group level, and ignoreCorruptFiles is the right tool.
Missing Columns: When your schema expects columns that don’t exist in some files:
from pyspark.sql.functions import lit
# Read with schema, missing columns become null
df = spark.read \
.schema(event_schema) \
.parquet("/data/events/")
# Or add missing columns manually
df = spark.read.parquet("/data/events/")
if "new_column" not in df.columns:
    df = df.withColumn("new_column", lit(None).cast(StringType()))
Type Mismatches: When the same column has different types across files, mergeSchema usually fails with a "failed to merge incompatible data types" error rather than reconciling them. Instead, read each group of files separately, cast to a common type, then union:
# Read each schema generation separately, align the types, then combine
old = spark.read.parquet("/data/events/2023/") \
    .withColumn("user_id", col("user_id").cast(LongType()))
new = spark.read.parquet("/data/events/2024/")
df = old.unionByName(new)
Memory Issues: Large Parquet files can cause executor OOMs.
# Increase executor memory
spark.conf.set("spark.executor.memory", "8g")
# Or limit columns and add filters before any actions
df = spark.read.parquet("/data/huge_dataset/") \
.select("id", "value") \
.filter(col("date") == "2024-01-15") \
.limit(1000000)
The key principle: push filters and projections as early as possible. Spark optimizes well, but helping it helps you.