PySpark - Read Nested JSON File
Key Insights
- PySpark handles nested JSON through schema inference or explicit schema definition, with explicit schemas providing better performance and type safety for production workloads
- Complex nested structures require specialized functions like explode(), select(), and dot notation to flatten arrays and access deeply nested fields
- Reading nested JSON efficiently involves understanding struct types, array handling, and the trade-offs between automatic schema inference and manual schema specification
Reading Basic Nested JSON
PySpark's `spark.read.json()` method automatically infers schema from JSON files, including nested structures. Start with a simple nested JSON file:
{
  "id": 1,
  "name": "John Doe",
  "address": {
    "street": "123 Main St",
    "city": "New York",
    "zipcode": "10001"
  }
}
Read this file with automatic schema inference:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("NestedJSON") \
    .getOrCreate()
df = spark.read.json("data/nested.json")
df.printSchema()
df.show(truncate=False)
Output shows the nested structure:
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
Access nested fields using dot notation:
df.select("id", "name", "address.city", "address.zipcode").show()
Defining Explicit Schemas for Nested JSON
Explicit schemas eliminate inference overhead and ensure data type consistency. Define schemas using StructType and StructField:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("zipcode", StringType(), True)
    ]), True)
])
df = spark.read.schema(schema).json("data/nested.json")
This approach provides three advantages: faster processing for large files, guaranteed data types, and early detection of schema mismatches.
Handling Arrays in Nested JSON
Arrays within JSON require special handling. Consider this structure:
{
  "id": 1,
  "name": "John Doe",
  "orders": [
    {"order_id": 101, "amount": 250.50},
    {"order_id": 102, "amount": 180.75}
  ]
}
Define the schema with ArrayType:
from pyspark.sql.types import ArrayType, DoubleType
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("orders", ArrayType(
        StructType([
            StructField("order_id", IntegerType(), True),
            StructField("amount", DoubleType(), True)
        ])
    ), True)
])
df = spark.read.schema(schema).json("data/orders.json")
df.printSchema()
The array remains as a single column. To work with individual array elements, use explode():
from pyspark.sql.functions import explode, col
df_exploded = df.select(
    col("id"),
    col("name"),
    explode(col("orders")).alias("order")
)
df_exploded.select(
    "id",
    "name",
    "order.order_id",
    "order.amount"
).show()
Output creates one row per array element:
+---+--------+--------+------+
| id|    name|order_id|amount|
+---+--------+--------+------+
|  1|John Doe|     101| 250.5|
|  1|John Doe|     102|180.75|
+---+--------+--------+------+
Flattening Deeply Nested Structures
Real-world JSON often contains multiple nesting levels. Handle this scenario:
{
  "customer_id": 1,
  "profile": {
    "personal": {
      "first_name": "John",
      "last_name": "Doe"
    },
    "contact": {
      "email": "john@example.com",
      "phones": ["555-1234", "555-5678"]
    }
  },
  "purchases": [
    {
      "date": "2024-01-15",
      "items": [
        {"product": "Laptop", "price": 1200},
        {"product": "Mouse", "price": 25}
      ]
    }
  ]
}
Flatten this structure systematically:
from pyspark.sql.functions import explode, col
# Read with inferred schema
df = spark.read.json("data/complex.json")
# Flatten nested structs
df_flat = df.select(
    col("customer_id"),
    col("profile.personal.first_name"),
    col("profile.personal.last_name"),
    col("profile.contact.email"),
    col("profile.contact.phones"),
    col("purchases")
)
# Explode purchases array
df_purchases = df_flat.select(
    col("customer_id"),
    col("first_name"),
    col("last_name"),
    explode(col("purchases")).alias("purchase")
)
# Explode items within each purchase
df_items = df_purchases.select(
    col("customer_id"),
    col("first_name"),
    col("last_name"),
    col("purchase.date").alias("purchase_date"),
    explode(col("purchase.items")).alias("item")
)
# Final flattened structure
df_final = df_items.select(
    "customer_id",
    "first_name",
    "last_name",
    "purchase_date",
    col("item.product"),
    col("item.price")
)
df_final.show()
Working with Multiline JSON
JSON files come in two formats: line-delimited (each line is a complete JSON object) and multiline (pretty-printed JSON). Specify the format explicitly:
# Line-delimited JSON (default)
df = spark.read.json("data/records.json")
# Multiline JSON
df = spark.read.option("multiLine", "true").json("data/pretty.json")
Multiline mode is slower and more memory-intensive but necessary for formatted JSON files. For large files, prefer line-delimited format.
Handling Corrupt or Missing Fields
Production JSON data often contains inconsistencies. Configure PySpark to handle these cases:
# Set mode for handling corrupt records
df = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("data/messy.json")
# Cache first: since Spark 2.3, queries that reference only the
# internal corrupt-record column are disallowed on an uncached DataFrame
df.cache()
# Filter out corrupt records
df_clean = df.filter(col("_corrupt_record").isNull())
df_corrupt = df.filter(col("_corrupt_record").isNotNull())
print(f"Clean records: {df_clean.count()}")
print(f"Corrupt records: {df_corrupt.count()}")
Three mode options exist:
- PERMISSIVE (default): Sets corrupt records to null and captures them in a separate column
- DROPMALFORMED: Drops corrupt records entirely
- FAILFAST: Throws an exception on corrupt records
Performance Optimization Techniques
Optimize nested JSON reading with these strategies:
# 1. Use explicit schema for large files
schema = StructType([...]) # Define your schema
df = spark.read.schema(schema).json("data/large.json")
# 2. Partition reading for very large files
df = spark.read.json("data/partitioned/*.json")
# 3. Sample data for schema inference
sample_df = spark.read.json("data/sample.json")
inferred_schema = sample_df.schema
full_df = spark.read.schema(inferred_schema).json("data/full/*.json")
# 4. Select and filter early to prune unused columns and rows
df_flat = df.select(
    "id",
    "address.city",
    "address.zipcode"
).filter(col("city") == "New York")  # after the select, the column is "city"
Cache frequently accessed nested data after flattening:
df_processed = df.select(
    "customer_id",
    explode("orders").alias("order")
).select(
    "customer_id",
    "order.order_id",
    "order.amount"
)
df_processed.cache()
df_processed.count() # Trigger caching
Converting Nested JSON to Parquet
After processing nested JSON, convert to Parquet for better performance:
# Read nested JSON
df = spark.read.json("data/nested.json")
# Write as Parquet preserving nested structure
df.write.mode("overwrite").parquet("output/nested.parquet")
# Or flatten before writing
df_flat = df.select(
    "id",
    "name",
    col("address.city").alias("city"),
    col("address.zipcode").alias("zipcode")
)
df_flat.write.mode("overwrite").parquet("output/flat.parquet")
Parquet maintains nested structures while providing columnar storage benefits, compression, and predicate pushdown capabilities that JSON lacks.