PySpark - Read Avro File into DataFrame

Key Insights

• PySpark requires the spark-avro package to read Avro files; specify it during SparkSession initialization or pass it at runtime via --packages
• Reading Avro files into DataFrames preserves complex data types, including nested structures, arrays, and maps, with the schema taken from the file’s embedded Avro schema
• Partition pruning reduces I/O on large Avro datasets, while column projection and filter pushdown mainly reduce per-row processing, because Avro is a row-oriented format

Setting Up PySpark with Avro Support

The spark-avro module is not bundled with the default PySpark distribution, so you need to add it when creating your SparkSession. The package version must match your Spark version, and the artifact suffix must match the Scala version your Spark build uses.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("AvroReader") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0") \
    .getOrCreate()

For Spark 3.x deployments, use the artifact built for your Scala version (2.12 or 2.13). If you’re launching the job with spark-submit, pass the package on the command line:

spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.0 your_script.py

Alternatively, download the JAR file manually and add it to the classpath:

spark = SparkSession.builder \
    .appName("AvroReader") \
    .config("spark.jars", "/path/to/spark-avro_2.12-3.5.0.jar") \
    .getOrCreate()
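
Because the coordinate encodes both the Scala version and the Spark version, it can be derived instead of hard-coded. The following is a minimal sketch; the helper name spark_avro_coordinate is hypothetical, and the default of Scala 2.12 is an assumption that you should check against your own Spark build.

```python
def spark_avro_coordinate(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate for the spark-avro artifact.

    scala_version defaults to 2.12 as an assumption; override it if your
    Spark build uses Scala 2.13.
    """
    return f"org.apache.spark:spark-avro_{scala_version}:{spark_version}"


# Reproduces the coordinate used in the examples above
print(spark_avro_coordinate("3.5.0"))
# org.apache.spark:spark-avro_2.12:3.5.0
```

In a real session you could pass pyspark.__version__ as the first argument so the package tracks the installed PySpark release.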

Basic Avro File Reading

Once the spark-avro package is loaded, reading Avro files is straightforward using the DataFrameReader API:

df = spark.read.format("avro").load("data/users.avro")
df.show()
df.printSchema()

For multiple Avro files in a directory:

df = spark.read.format("avro").load("data/users/*.avro")

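PySpark’s DataFrameReader has no avro() shorthand method (unlike json() or parquet()), but you can pass the format as a keyword argument to load():

df = spark.read.load("data/users.avro", format="avro")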

The schema is automatically inferred from the Avro file’s embedded schema. Here’s what a typical output looks like:

# Sample output
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- zipcode: string (nullable = true)
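
To see where the inferred schema comes from, you can inspect an Avro file’s header without Spark. The sketch below follows the Avro object container layout (four magic bytes, then a metadata map that holds avro.schema and, optionally, avro.codec). The function names are illustrative, and the code reads only the header, not the data blocks.

```python
import io
import json


def _read_long(stream) -> int:
    """Decode one Avro long (zigzag-encoded variable-length integer)."""
    shift = 0
    raw = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("unexpected end of Avro header")
        raw |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)


def read_avro_header(stream) -> dict:
    """Return the file metadata map (str -> bytes) from an Avro container file."""
    if stream.read(4) != b"Obj\x01":
        raise ValueError("not an Avro object container file")
    meta = {}
    while True:
        count = _read_long(stream)
        if count == 0:  # a zero count terminates the map
            break
        if count < 0:  # negative count: a block byte size follows
            count = -count
            _read_long(stream)
        for _ in range(count):
            key = stream.read(_read_long(stream)).decode("utf-8")
            meta[key] = stream.read(_read_long(stream))
    return meta


def embedded_schema(path: str) -> dict:
    """Load the writer schema stored in the header of the file at `path`."""
    with open(path, "rb") as f:
        return json.loads(read_avro_header(f)["avro.schema"])
```

Calling embedded_schema("data/users.avro") would return the writer schema as a Python dict, which is the same schema Spark converts into the DataFrame schema shown above.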

Working with Complex Avro Schemas

Avro excels at handling complex nested data structures. PySpark preserves these structures when reading into DataFrames:

# Reading Avro with nested structures
df = spark.read.format("avro").load("data/complex_data.avro")

# Access nested fields using dot notation
df.select("user.id", "user.profile.age", "user.preferences").show()

# Explode arrays for analysis
from pyspark.sql.functions import col, explode

df.select("user_id", explode("orders").alias("order")).show()

# Working with maps: look up a key with bracket syntax on a Column
df.select("user_id", col("metadata")["last_login"].alias("last_login")).show()

For deeply nested structures, use column expressions:

from pyspark.sql.functions import col

df.select(
    col("id"),
    col("profile.contact.email").alias("email"),
    col("profile.contact.phone.mobile").alias("mobile")
).show()
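
When a schema is deep, typing every dotted path by hand is error-prone. As a rough sketch, the hypothetical helper below walks the dictionary returned by df.schema.jsonValue() and lists the dotted path of every non-struct field. The sample dictionary is made up for illustration and only mimics that JSON layout.

```python
def leaf_paths(struct_json: dict, prefix: str = "") -> list:
    """List dotted paths for all fields that are not themselves structs."""
    paths = []
    for field in struct_json["fields"]:
        name = prefix + field["name"]
        field_type = field["type"]
        if isinstance(field_type, dict) and field_type.get("type") == "struct":
            # Recurse into nested structs, extending the dotted prefix
            paths.extend(leaf_paths(field_type, name + "."))
        else:
            # Primitive types, arrays, and maps are treated as leaves
            paths.append(name)
    return paths


# Illustrative schema in the shape produced by df.schema.jsonValue()
sample_schema = {
    "type": "struct",
    "fields": [
        {"name": "id", "type": "long", "nullable": True, "metadata": {}},
        {"name": "profile", "nullable": True, "metadata": {}, "type": {
            "type": "struct",
            "fields": [
                {"name": "contact", "nullable": True, "metadata": {}, "type": {
                    "type": "struct",
                    "fields": [
                        {"name": "email", "type": "string",
                         "nullable": True, "metadata": {}},
                    ],
                }},
            ],
        }},
    ],
}

print(leaf_paths(sample_schema))
# ['id', 'profile.contact.email']
```

With a real DataFrame, the resulting paths could be passed to col() and aliased, for example by replacing dots with underscores, to flatten the structure.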

Specifying Custom Avro Schemas

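While schema inference works well, you can provide an explicit Avro schema through the avroSchema option. Avro container files always embed their writer schema, so the schema you pass acts as a reader schema: it must be compatible with the schema in the files, and it is useful for validation or for reading with an evolved schema: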

avro_schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
"""

df = spark.read.format("avro") \
    .option("avroSchema", avro_schema) \
    .load("data/users.avro")

This approach is useful when you need to enforce specific type conversions or handle schema evolution scenarios.
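
Hand-written JSON strings are easy to get subtly wrong, so one option is to build the schema as a Python dictionary and serialize it. This is a minimal sketch based on the example above; note that in Avro a logical type annotates the type object rather than the field itself.

```python
import json

# Field list based on the example schema in this section
user_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "username", "type": "string"},
        # The logicalType attribute lives inside the type object
        {
            "name": "created_at",
            "type": {"type": "long", "logicalType": "timestamp-millis"},
        },
    ],
}

# json.dumps guarantees syntactically valid JSON for the avroSchema option
avro_schema = json.dumps(user_schema)

# Usage with an existing SparkSession (not executed here):
# df = spark.read.format("avro").option("avroSchema", avro_schema).load("data/users.avro")
```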

Reading Partitioned Avro Data

Partitioned Avro datasets enable efficient data processing through partition pruning:

# Directory structure: data/year=2024/month=01/*.avro
df = spark.read.format("avro").load("data/")

# Partition columns are automatically added to the schema
df.printSchema()
# Shows: year, month, and data columns

# Filter on partition columns for efficient reads
filtered_df = df.filter("year = 2024 AND month = 1")

# Verify partition pruning in execution plan
filtered_df.explain()

To load only some partitions while keeping year and month as columns, set basePath to the dataset root:

df = spark.read.format("avro") \
    .option("basePath", "data/") \
    .load("data/year=2024/month=*/")
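
Partition discovery relies on Hive-style key=value directory names. The pure-Python sketch below is only an illustration of that naming convention, not Spark’s implementation; the helper name parse_partitions and the file name part-00000.avro are made up for the example.

```python
def parse_partitions(path: str) -> dict:
    """Extract Hive-style key=value directory segments from a file path."""
    partitions = {}
    for segment in path.split("/")[:-1]:  # the last segment is the file name
        key, sep, value = segment.partition("=")
        if sep and key:
            partitions[key] = value
    return partitions


print(parse_partitions("data/year=2024/month=01/part-00000.avro"))
# {'year': '2024', 'month': '01'}
```

Spark goes one step further than this sketch: with partition column type inference enabled, which is the default, values such as 2024 and 01 become integers, which is why the filter above compares month with 1.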

Performance Optimization Techniques

Column Projection

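Select only the columns you need as early as possible. Spark prunes unused columns from the read schema automatically, so no special option is required. Because Avro is a row-oriented format, the file blocks are still read in full; the saving comes from less conversion work and narrower rows downstream, not from reading fewer bytes from disk as with a columnar format like Parquet:

# Column pruning is applied by the optimizer when the plan is executed
df = spark.read.format("avro").load("data/large_file.avro")
result = df.select("id", "name")

# Check ReadSchema in the physical plan to confirm that only id and name are requested
result.explain()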

Compression Settings

Avro files are typically compressed block by block, and the codec is recorded in each file’s header. Spark detects it on read, so no read option is needed; the compression option applies when writing Avro:

# The codec (for example snappy or deflate) is detected from the file header
df = spark.read.format("avro").load("data/compressed.avro")

Parallel Reading

Increase parallelism for large datasets:

# Repartition after reading for better parallelism
df = spark.read.format("avro").load("data/large/*.avro")
df = df.repartition(200)  # Adjust based on cluster size

# Or use coalesce to reduce the partition count without a full shuffle
df = df.coalesce(50)
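
Before repartitioning, it helps to estimate how many input partitions Spark is likely to create. The sketch below is a rough approximation that divides the total input size by spark.sql.files.maxPartitionBytes (128 MB by default). Spark’s actual planning also weighs the per-file open cost and the default parallelism, so treat the result as a ballpark figure; the function name is hypothetical.

```python
import math

# Default value of spark.sql.files.maxPartitionBytes
DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024


def estimate_input_partitions(
    total_bytes: int, max_partition_bytes: int = DEFAULT_MAX_PARTITION_BYTES
) -> int:
    """Approximate the number of read partitions for a given input size."""
    return max(1, math.ceil(total_bytes / max_partition_bytes))


# 10 GiB of Avro input with the default setting
print(estimate_input_partitions(10 * 1024**3))
# 80
```

If the estimate is far below the number of cores available, repartition() after the read can help; if it is far above, coalesce() or a larger maxPartitionBytes may reduce scheduling overhead.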

Handling Schema Evolution

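Avro supports schema evolution, but Spark’s Avro reader does not merge schemas across files. When no schema is supplied, it infers the DataFrame schema from a single sample file, so a field that exists only in newer files may be missing from the result. To read old and new files together, pass the evolved schema explicitly and give each added field a default:

# Old schema: {id, name}
# New schema: {id, name, email}
# The record name is assumed to match the one used by the writer
evolved_schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

df = spark.read.format("avro") \
    .option("avroSchema", evolved_schema) \
    .load("data/mixed_schema/*.avro")

# Files written with the old schema yield null for email
df.select("id", "name", "email").show()

# Count null values in the evolved field
from pyspark.sql.functions import col, count, when

df.select(
    count(when(col("email").isNull(), True)).alias("missing_emails")
).show()

The mergeSchema option belongs to the Parquet and ORC sources. The Avro source does not support it, so rely on avroSchema instead.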
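
Evolved reader schemas for the avroSchema option can also be derived programmatically from an older schema. The helper below is a hypothetical sketch that appends a nullable field with a null default, which is the shape Avro’s resolution rules need in order to fill the field for records written before it existed.

```python
import copy
import json


def add_optional_field(schema: dict, name: str, avro_type: str) -> dict:
    """Return a copy of an Avro record schema with one extra nullable field."""
    evolved = copy.deepcopy(schema)  # leave the original schema untouched
    evolved["fields"].append(
        # "null" comes first in the union so the default can be null
        {"name": name, "type": ["null", avro_type], "default": None}
    )
    return evolved


# Illustrative old schema: {id, name}
old_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
    ],
}

new_schema = add_optional_field(old_schema, "email", "string")
evolved_schema_json = json.dumps(new_schema)

# Usage with an existing SparkSession (not executed here):
# df = spark.read.format("avro").option("avroSchema", evolved_schema_json).load(path)
```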

Converting Avro to Other Formats

After reading Avro, convert to other formats for downstream processing:

# Read Avro and write as Parquet
df = spark.read.format("avro").load("data/input.avro")
df.write.format("parquet") \
    .mode("overwrite") \
    .save("data/output.parquet")

# Write as partitioned Parquet
df.write.format("parquet") \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .save("data/partitioned_output")

# Convert to Delta Lake (requires the Delta Lake package to be configured on the SparkSession)
df.write.format("delta") \
    .mode("overwrite") \
    .save("data/delta_table")

Error Handling and Validation

Implement robust error handling when reading Avro files:

from pyspark.sql.utils import AnalysisException

try:
    df = spark.read.format("avro").load("data/users.avro")
    
    # Validate expected columns exist
    expected_columns = {"id", "name", "email"}
    actual_columns = set(df.columns)
    
    if not expected_columns.issubset(actual_columns):
        missing = expected_columns - actual_columns
        raise ValueError(f"Missing columns: {missing}")
    
    # Collect column data types for inspection
    schema_dict = {field.name: field.dataType for field in df.schema.fields}
    print(f"Schema validation passed: {schema_dict}")
    
except AnalysisException as e:
    print(f"Failed to read Avro file: {e}")
except ValueError as e:
    print(f"Schema validation failed: {e}")
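
The check above confirms that columns exist but does not compare their types. A small pure-Python comparison can cover that gap. In this sketch the function name find_schema_problems is hypothetical, and the dictionaries stand in for dict(df.dtypes), which maps column names to Spark SQL type strings such as bigint.

```python
def find_schema_problems(actual: dict, expected: dict) -> list:
    """Compare column-name -> type-string mappings and describe mismatches."""
    problems = []
    for name, expected_type in expected.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif actual[name] != expected_type:
            problems.append(
                f"{name}: expected {expected_type}, found {actual[name]}"
            )
    return problems


# Stand-in for dict(df.dtypes) on a DataFrame that lacks the email column
actual = {"id": "bigint", "name": "string"}
expected = {"id": "bigint", "name": "string", "email": "string"}

print(find_schema_problems(actual, expected))
# ['missing column: email']
```

An empty list means the DataFrame matches expectations; otherwise the messages can be raised as a ValueError in the same way as the missing-column check.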

Reading Avro files with PySpark provides efficient handling of complex data structures with built-in schema management. The combination of automatic schema inference, support for nested types, and performance optimizations makes Avro an excellent choice for big data pipelines. Focus on partition strategies and column projection to maximize throughput when processing large-scale Avro datasets.
