PySpark - Read Parquet File into DataFrame
Key Insights
- PySpark reads Parquet files efficiently through the spark.read.parquet() method, supporting both single files and entire directories with automatic schema inference and predicate pushdown optimization
- Parquet’s columnar storage format reduces I/O operations by 10-50x compared to row-based formats when selecting specific columns, making it ideal for analytical workloads on large datasets
- Advanced reading techniques include partition pruning, schema evolution handling, and merge schemas for datasets with changing structures across multiple files
Basic Parquet File Reading
Reading Parquet files in PySpark starts with initializing a SparkSession and using the DataFrame reader API. The simplest approach loads the entire file into memory as a distributed DataFrame.
from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("ParquetReader") \
    .getOrCreate()
# Read single Parquet file
df = spark.read.parquet("data/users.parquet")
# Display schema and data
df.printSchema()
df.show(10)
For reading multiple Parquet files from a directory:
# Read all Parquet files in directory
df = spark.read.parquet("data/users/")
# Read specific files using wildcards
df = spark.read.parquet("data/users/part-*.parquet")
# Read from multiple paths
df = spark.read.parquet("data/2023/", "data/2024/")
The DataFrame API automatically handles distributed reading across cluster nodes, partitioning the data for parallel processing.
Schema Handling and Inference
PySpark automatically infers schema from Parquet metadata, but you can also specify or merge schemas when dealing with evolving datasets.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# Automatic schema inference (default)
df = spark.read.parquet("data/events.parquet")
# Define explicit schema
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), True),
    StructField("value", IntegerType(), True)
])
df = spark.read.schema(schema).parquet("data/events.parquet")
# Merge schemas from multiple files with different structures
df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("data/events/2023/", "data/events/2024/")
Schema merging is critical when columns are added over time. Without mergeSchema, PySpark infers the schema from a single file's footer, so columns that exist only in other files are silently dropped from the result.
Optimizing Read Performance with Options
PySpark provides several options to optimize Parquet reading for specific use cases.
# Configure read options for performance
df = spark.read \
    .option("compression", "snappy") \
    .option("mergeSchema", "false") \
    .option("pathGlobFilter", "*.parquet") \
    .option("recursiveFileLookup", "true") \
    .parquet("data/nested_structure/")
# Rebase dates/timestamps written by legacy writers (Spark 2.x, Hive) onto the legacy calendar
df = spark.read \
    .option("datetimeRebaseMode", "LEGACY") \
    .parquet("data/legacy_data.parquet")
# Read with modified file lookup
df = spark.read \
    .option("modifiedBefore", "2024-01-01T00:00:00") \
    .option("modifiedAfter", "2023-01-01T00:00:00") \
    .parquet("data/time_series/")
Common options:
- mergeSchema: combines schemas from different files (expensive operation)
- pathGlobFilter: filters files by pattern before reading
- recursiveFileLookup: searches subdirectories recursively
- datetimeRebaseMode: controls how dates/timestamps written under the legacy calendar are rebased when read by modern Spark
Partition Pruning and Predicate Pushdown
Parquet’s columnar format enables powerful optimizations. Partition pruning skips entire directories, while predicate pushdown filters data at the file level before loading into memory.
# Data partitioned by date and region
# Directory structure: data/year=2024/month=01/region=US/part-*.parquet
# Partition pruning - only reads specific partitions
df = spark.read.parquet("data/") \
    .filter("year = 2024 AND month = 1 AND region = 'US'")
# Predicate pushdown - filters applied during file read
df = spark.read.parquet("data/transactions.parquet") \
    .filter("amount > 1000 AND status = 'completed'")
# Column pruning - only reads specified columns
df = spark.read.parquet("data/users.parquet") \
    .select("user_id", "email", "created_at")
These optimizations can reduce read time by 90% or more on large datasets. PySpark automatically pushes filters down to the Parquet reader when possible.
Reading Partitioned Parquet Datasets
Partitioned datasets organize files into directory hierarchies based on column values. This structure enables efficient querying of specific data subsets.
# Read partitioned dataset with automatic partition discovery
df = spark.read.parquet("data/partitioned/")
# Partition columns automatically added to schema
df.printSchema()
# Shows: year, month, region (partition columns) + data columns
# Query specific partitions efficiently
recent_us_data = df.filter(
    (df.year == 2024) &
    (df.month >= 10) &
    (df.region == "US")
)
# Read a single partition while keeping partition columns in the schema
df = spark.read \
    .option("basePath", "data/partitioned/") \
    .parquet("data/partitioned/year=2024/month=12/")
The basePath option maintains partition columns in the schema even when reading specific partition paths directly.
Handling Nested and Complex Types
Parquet natively supports complex data types including structs, arrays, and maps. PySpark preserves these structures when reading.
# Sample nested Parquet data
df = spark.read.parquet("data/nested_events.parquet")
from pyspark.sql.functions import explode, expr

# Access nested fields (array and map lookups need expr() or selectExpr)
df.select(
    "event_id",
    "user.id",
    "user.profile.name",
    expr("tags[0]"),
    "metadata.source"
).show()
# Explode arrays
df.select("event_id", explode("tags").alias("tag")).show()
# Flatten struct columns
df.select("event_id", "user.*").show()
# Work with map types
df.select("event_id", expr("properties['category']")).show()
Complex types reduce storage overhead and maintain data relationships without expensive joins.
Reading from Cloud Storage
PySpark seamlessly reads Parquet files from cloud storage systems with appropriate configuration.
# AWS S3
df = spark.read.parquet("s3a://bucket-name/data/events.parquet")
# Credentials belong on the Hadoop configuration (or as spark.hadoop.* keys at session build time);
# setting them via spark.conf.set() after the session starts may not propagate to the S3A client
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "ACCESS_KEY")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "SECRET_KEY")
df = spark.read.parquet("s3a://bucket/path/")
# Azure Blob Storage
df = spark.read.parquet("wasbs://container@account.blob.core.windows.net/data/")
# Google Cloud Storage
df = spark.read.parquet("gs://bucket-name/data/events.parquet")
# With optimized settings for cloud reads
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") # 128MB
spark.conf.set("spark.sql.files.openCostInBytes", "134217728")
df = spark.read.parquet("s3a://large-dataset/")
Adjust maxPartitionBytes based on file sizes and cluster resources to optimize parallelism.
Error Handling and Validation
Implement proper error handling when reading Parquet files to manage corrupted data or missing files.
from pyspark.sql.utils import AnalysisException
try:
    df = spark.read.parquet("data/potentially_missing.parquet")
    record_count = df.count()
    print(f"Successfully read {record_count} records")
except AnalysisException as e:
    print(f"Error reading Parquet file: {str(e)}")
    # Fall back to an empty DataFrame with the expected schema
    df = spark.createDataFrame([], schema)
# Validate schema after reading
expected_columns = {"user_id", "timestamp", "event_type"}
actual_columns = set(df.columns)
if not expected_columns.issubset(actual_columns):
    missing = expected_columns - actual_columns
    raise ValueError(f"Missing required columns: {missing}")
# Skip unreadable files instead of failing the whole job
df = spark.read \
    .option("ignoreCorruptFiles", "true") \
    .parquet("data/potentially_corrupt/")
Note that the mode option (PERMISSIVE, DROPMALFORMED, FAILFAST) applies to text-based sources such as CSV and JSON, not to Parquet. For Parquet, use the ignoreCorruptFiles option (or the spark.sql.files.ignoreCorruptFiles config) to skip corrupt files; by default Spark fails the read when it encounters one.