PySpark - Read ORC File into DataFrame

Key Insights

  • ORC (Optimized Row Columnar) files provide superior compression and query performance compared to CSV or JSON, making them ideal for big data workloads in PySpark
  • PySpark’s native ORC reader supports schema inference, predicate pushdown, and column pruning to minimize data transfer and improve read performance
  • Reading ORC files requires understanding partitioning strategies, schema evolution handling, and proper Spark session configuration for production environments

Understanding ORC Format in PySpark

ORC is a columnar storage format optimized for Hadoop workloads. Unlike row-based formats, ORC stores data by columns, enabling efficient compression and faster query execution when you only need specific columns. PySpark provides built-in support for reading ORC files through the DataFrameReader API.

The basic syntax for reading an ORC file:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ORC Reader") \
    .getOrCreate()

df = spark.read.orc("path/to/file.orc")
df.show()

This simple approach works for single files, but production scenarios require more sophisticated handling.

Reading Single and Multiple ORC Files

PySpark handles both individual files and entire directories containing multiple ORC files. When pointing to a directory, Spark automatically reads all ORC files and combines them into a single DataFrame.

# Single file
df_single = spark.read.orc("data/users.orc")

# Multiple files in a directory
df_multiple = spark.read.orc("data/users/")

# Specific files using wildcards
df_pattern = spark.read.orc("data/users/part-*.orc")

# Multiple paths
df_multi_path = spark.read.orc(
    ["data/2023/users.orc", "data/2024/users.orc"]
)

For large datasets split across multiple files, the directory approach is most efficient as Spark parallelizes the read operation across available executors.

Schema Inference and Explicit Schema Definition

By default, PySpark reads the schema embedded in the ORC file's metadata, so no separate inference pass over the data is needed. However, you can define an explicit schema for validation or to override the stored types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Schema inference (default)
df_inferred = spark.read.orc("data/users.orc")
df_inferred.printSchema()

# Explicit schema definition
user_schema = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("last_login", TimestampType(), True)
])

df_explicit = spark.read \
    .schema(user_schema) \
    .orc("data/users.orc")

# Verify schema matches
df_explicit.printSchema()

Explicit schemas provide two advantages: slightly faster reads (Spark skips discovering the schema from file footers) and validation at read time. If the file's columns are incompatible with your schema, Spark raises an error when the data is actually read; columns present in your schema but absent from the file come back as null.

Handling Partitioned ORC Files

Partitioned datasets organize files into directory structures based on column values. This is critical for query performance when filtering on partition columns.

# Read partitioned data
# Directory structure: data/year=2024/month=01/part-00000.orc
df_partitioned = spark.read.orc("data/")

# Partition columns are automatically added to the schema
df_partitioned.printSchema()
# Includes: year, month columns

# Filter using partition columns (very efficient)
df_filtered = df_partitioned.filter(
    (df_partitioned.year == 2024) & (df_partitioned.month == 1)
)

# Verify partition pruning in query plan
df_filtered.explain()

When filtering on partition columns, Spark only reads relevant directories, dramatically reducing I/O. Check the execution plan to confirm partition pruning is occurring.

Column Selection and Predicate Pushdown

ORC’s columnar format enables reading only required columns. Combined with predicate pushdown, this minimizes data transfer from storage to compute.

# Read only specific columns
df_selected = spark.read.orc("data/users.orc") \
    .select("user_id", "username", "email")

# Predicate pushdown - filter is pushed to ORC reader
df_active = spark.read.orc("data/users.orc") \
    .filter("last_login > '2024-01-01'")

# Combine column selection and filtering
df_optimized = spark.read.orc("data/users.orc") \
    .select("user_id", "username") \
    .filter("created_at > '2024-01-01'")

# Verify pushdown in execution plan
df_optimized.explain(True)

The execution plan shows “PushedFilters” when predicate pushdown is active. This means Spark applies filters at the ORC reader level, not after loading all data into memory.

Advanced Read Options

PySpark provides several options to fine-tune ORC reading behavior:

# Merge schema from multiple ORC files
df_merged = spark.read \
    .option("mergeSchema", "true") \
    .orc("data/evolving_schema/")

# Handle corrupted files
df_safe = spark.read \
    .option("ignoreCorruptFiles", "true") \
    .orc("data/users/")

# Recursive file lookup
df_recursive = spark.read \
    .option("recursiveFileLookup", "true") \
    .orc("data/")

# Path globbing filter
df_filtered_path = spark.read \
    .option("pathGlobFilter", "*.orc") \
    .option("modifiedBefore", "2024-01-01T00:00:00") \
    .orc("data/")

# Combine multiple options
df_configured = spark.read \
    .option("mergeSchema", "true") \
    .option("ignoreCorruptFiles", "true") \
    .option("recursiveFileLookup", "true") \
    .orc("data/")

The mergeSchema option is essential when dealing with schema evolution, where different ORC files have slightly different schemas. Spark unifies them into a single schema.

Reading from Cloud Storage

ORC files stored in cloud object storage (S3, Azure Blob, GCS) require appropriate configuration:

# AWS S3
spark.sparkContext._jsc.hadoopConfiguration().set(
    "fs.s3a.access.key", "YOUR_ACCESS_KEY"
)
spark.sparkContext._jsc.hadoopConfiguration().set(
    "fs.s3a.secret.key", "YOUR_SECRET_KEY"
)

df_s3 = spark.read.orc("s3a://bucket-name/path/to/data.orc")

# Azure Blob Storage
spark.conf.set(
    "fs.azure.account.key.youraccount.blob.core.windows.net",
    "YOUR_ACCOUNT_KEY"
)

df_azure = spark.read.orc(
    "wasbs://container@youraccount.blob.core.windows.net/data.orc"
)

# Google Cloud Storage
spark.conf.set("google.cloud.auth.service.account.json.keyfile",
               "/path/to/keyfile.json")

df_gcs = spark.read.orc("gs://bucket-name/path/to/data.orc")

For production environments, use IAM roles or managed identities instead of hardcoding credentials.

Performance Optimization Techniques

Optimize ORC reads by configuring Spark appropriately:

# Increase parallelism for large files
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728")  # 128 MB

# Configure ORC-specific settings
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

# Read with optimized configuration
df_optimized = spark.read.orc("data/large_dataset/")

# Cache if reading multiple times
df_cached = spark.read.orc("data/users.orc").cache()
df_cached.count()  # Triggers caching

# Use repartition for downstream operations
df_repartitioned = spark.read.orc("data/users.orc") \
    .repartition(100)

Vectorized reading processes multiple rows simultaneously, significantly improving performance for primitive data types.

Error Handling and Validation

Implement robust error handling when reading ORC files:

from pyspark.sql.functions import col, count, when
from pyspark.sql.utils import AnalysisException

try:
    df = spark.read.orc("data/users.orc")
    
    # Validate record count
    record_count = df.count()
    if record_count == 0:
        raise ValueError("ORC file is empty")
    
    # Validate required columns
    required_cols = ["user_id", "username", "email"]
    missing_cols = set(required_cols) - set(df.columns)
    if missing_cols:
        raise ValueError(f"Missing columns: {missing_cols}")
    
    # Check for null values in critical columns
    null_counts = df.select(
        [count(when(col(c).isNull(), c)).alias(c) 
         for c in required_cols]
    ).collect()[0]
    
    for col_name in required_cols:
        if null_counts[col_name] > 0:
            print(f"Warning: {null_counts[col_name]} nulls in {col_name}")
    
except AnalysisException as e:
    print(f"File not found or invalid ORC format: {e}")
except Exception as e:
    print(f"Error reading ORC file: {e}")

This validation ensures data quality and catches issues early in your pipeline.

Reading ORC files in PySpark is straightforward, but production workloads call for a working understanding of partitioning, schema handling, and performance tuning.
