PySpark - Read Parquet File into DataFrame

Reading Parquet files in PySpark starts with initializing a SparkSession and using the DataFrame reader API. The simplest approach loads the entire file into memory as a distributed DataFrame.

Key Insights

  • PySpark reads Parquet files efficiently through the spark.read.parquet() method, supporting both single files and entire directories with automatic schema inference and predicate pushdown optimization
  • Parquet’s columnar storage format reduces I/O operations by 10-50x compared to row-based formats when selecting specific columns, making it ideal for analytical workloads on large datasets
  • Advanced reading techniques include partition pruning, schema evolution handling, and merge schemas for datasets with changing structures across multiple files

Basic Parquet File Reading

Reading Parquet files in PySpark starts with initializing a SparkSession and using the DataFrame reader API. The simplest approach loads the entire file into memory as a distributed DataFrame.

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("ParquetReader") \
    .getOrCreate()

# Read single Parquet file
df = spark.read.parquet("data/users.parquet")

# Display schema and data
df.printSchema()
df.show(10)

For reading multiple Parquet files from a directory:

# Read all Parquet files in directory
df = spark.read.parquet("data/users/")

# Read specific files using wildcards
df = spark.read.parquet("data/users/part-*.parquet")

# Read from multiple paths
df = spark.read.parquet("data/2023/", "data/2024/")

The DataFrame API automatically handles distributed reading across cluster nodes, partitioning the data for parallel processing.

Schema Handling and Inference

PySpark automatically infers schema from Parquet metadata, but you can also specify or merge schemas when dealing with evolving datasets.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Automatic schema inference (default)
df = spark.read.parquet("data/events.parquet")

# Define explicit schema
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), True),
    StructField("value", IntegerType(), True)
])

df = spark.read.schema(schema).parquet("data/events.parquet")

# Merge schemas from multiple files with different structures
df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("data/events/2023/", "data/events/2024/")

Schema merging is critical when columns are added over time. Without mergeSchema, PySpark uses the schema from the first file encountered, potentially causing data loss.

Optimizing Read Performance with Options

PySpark provides several options to optimize Parquet reading for specific use cases.

# Configure read options for performance
df = spark.read \
    .option("compression", "snappy") \
    .option("mergeSchema", "false") \
    .option("pathGlobFilter", "*.parquet") \
    .option("recursiveFileLookup", "true") \
    .parquet("data/nested_structure/")

# Disable timestamp conversion for performance
df = spark.read \
    .option("datetimeRebaseMode", "LEGACY") \
    .parquet("data/legacy_data.parquet")

# Read with modified file lookup
df = spark.read \
    .option("modifiedBefore", "2024-01-01T00:00:00") \
    .option("modifiedAfter", "2023-01-01T00:00:00") \
    .parquet("data/time_series/")

Common options:

  • mergeSchema: Combines schemas from different files (expensive operation)
  • pathGlobFilter: Filters files by pattern before reading
  • recursiveFileLookup: Searches subdirectories recursively
  • datetimeRebaseMode: Handles timestamp compatibility between different Parquet versions

Partition Pruning and Predicate Pushdown

Parquet’s columnar format enables powerful optimizations. Partition pruning skips entire directories, while predicate pushdown filters data at the file level before loading into memory.

# Data partitioned by date and region
# Directory structure: data/year=2024/month=01/region=US/part-*.parquet

# Partition pruning - only reads specific partitions
df = spark.read.parquet("data/") \
    .filter("year = 2024 AND month = 1 AND region = 'US'")

# Predicate pushdown - filters applied during file read
df = spark.read.parquet("data/transactions.parquet") \
    .filter("amount > 1000 AND status = 'completed'")

# Column pruning - only reads specified columns
df = spark.read.parquet("data/users.parquet") \
    .select("user_id", "email", "created_at")

These optimizations can reduce read time by 90% or more on large datasets. PySpark automatically pushes filters down to the Parquet reader when possible.

Reading Partitioned Parquet Datasets

Partitioned datasets organize files into directory hierarchies based on column values. This structure enables efficient querying of specific data subsets.

# Read partitioned dataset with automatic partition discovery
df = spark.read.parquet("data/partitioned/")

# Partition columns automatically added to schema
df.printSchema()
# Shows: year, month, region (partition columns) + data columns

# Query specific partitions efficiently
recent_us_data = df.filter(
    (df.year == 2024) & 
    (df.month >= 10) & 
    (df.region == "US")
)

# Disable partition discovery if needed
df = spark.read \
    .option("basePath", "data/partitioned/") \
    .parquet("data/partitioned/year=2024/month=12/")

The basePath option maintains partition columns in the schema even when reading specific partition paths directly.

Handling Nested and Complex Types

Parquet natively supports complex data types including structs, arrays, and maps. PySpark preserves these structures when reading.

# Sample nested Parquet data
df = spark.read.parquet("data/nested_events.parquet")

# Access nested fields
df.select(
    "event_id",
    "user.id",
    "user.profile.name",
    "tags[0]",
    "metadata.source"
).show()

# Explode arrays
from pyspark.sql.functions import explode

df.select("event_id", explode("tags").alias("tag")).show()

# Flatten struct columns
df.select("event_id", "user.*").show()

# Work with map types
df.select("event_id", "properties['category']").show()

Complex types reduce storage overhead and maintain data relationships without expensive joins.

Reading from Cloud Storage

PySpark seamlessly reads Parquet files from cloud storage systems with appropriate configuration.

# AWS S3
df = spark.read.parquet("s3a://bucket-name/data/events.parquet")

# With credentials configured
spark.conf.set("spark.hadoop.fs.s3a.access.key", "ACCESS_KEY")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "SECRET_KEY")
df = spark.read.parquet("s3a://bucket/path/")

# Azure Blob Storage
df = spark.read.parquet("wasbs://container@account.blob.core.windows.net/data/")

# Google Cloud Storage
df = spark.read.parquet("gs://bucket-name/data/events.parquet")

# With optimized settings for cloud reads
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728")  # 128MB
spark.conf.set("spark.sql.files.openCostInBytes", "134217728")
df = spark.read.parquet("s3a://large-dataset/")

Adjust maxPartitionBytes based on file sizes and cluster resources to optimize parallelism.

Error Handling and Validation

Implement proper error handling when reading Parquet files to manage corrupted data or missing files.

from pyspark.sql.utils import AnalysisException

try:
    df = spark.read.parquet("data/potentially_missing.parquet")
    record_count = df.count()
    print(f"Successfully read {record_count} records")
except AnalysisException as e:
    print(f"Error reading Parquet file: {str(e)}")
    # Handle missing file scenario
    df = spark.createDataFrame([], schema)

# Validate schema after reading
expected_columns = {"user_id", "timestamp", "event_type"}
actual_columns = set(df.columns)

if not expected_columns.issubset(actual_columns):
    missing = expected_columns - actual_columns
    raise ValueError(f"Missing required columns: {missing}")

# Check for corrupted files with mode option
df = spark.read \
    .option("mode", "DROPMALFORMED") \
    .parquet("data/potentially_corrupt/")

The mode option accepts PERMISSIVE (default), DROPMALFORMED, or FAILFAST for different error handling strategies.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.