PySpark: Working with Nested JSON
Key Insights
- Always define explicit schemas for nested JSON in production—schema inference scans your entire dataset and fails silently on edge cases
- Use explode_outer() instead of explode() when flattening arrays to preserve rows with null or empty arrays
- Flatten nested structures at ingestion time when you query the same fields repeatedly; keep them nested when you need flexibility or write back to JSON-based systems
The Nested JSON Problem
If you’ve worked with data from REST APIs, MongoDB exports, or event logging systems, you’ve encountered deeply nested JSON. A single record might contain arrays of objects, objects within objects, and maps with dynamic keys. PySpark handles this well—once you know the patterns.
The challenge isn’t that PySpark can’t process nested data. It’s that the intuitive approaches often lead to performance problems, lost data, or schemas that drift silently in production. This article covers the practical techniques you need.
Loading Nested JSON into DataFrames
PySpark’s spark.read.json() method handles nested structures automatically, but the defaults aren’t always what you want.
# Sample nested JSON structure (orders.json):
# {"user_id": "u1", "name": "Alice", "orders": [{"id": "o1", "amount": 99.99, "items": [{"sku": "A1", "qty": 2}]}]}
# {"user_id": "u2", "name": "Bob", "orders": [{"id": "o2", "amount": 149.50, "items": [{"sku": "B1", "qty": 1}, {"sku": "B2", "qty": 3}]}]}
df = spark.read.json("s3://bucket/orders.json")
df.printSchema()
Output:
root
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- amount: double (nullable = true)
| | |-- id: string (nullable = true)
| | |-- items: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- qty: long (nullable = true)
| | | | |-- sku: string (nullable = true)
|-- user_id: string (nullable = true)
Schema inference works, but it reads your entire dataset to determine types. For a 100GB file, that’s expensive. Worse, if one record has "amount": "pending" instead of a number, inference might pick string for the whole column.
Handle malformed records explicitly:
df = spark.read.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt") \
.json("s3://bucket/orders.json")
# Check for parsing failures. Cache first: Spark disallows queries that
# reference only the internal corrupt-record column on raw JSON
df.cache()
df.filter(df._corrupt.isNotNull()).show()
The PERMISSIVE mode (default) keeps malformed rows with nulls in parsed fields. Use DROPMALFORMED to skip bad records entirely, or FAILFAST to throw an exception immediately.
Understanding Complex Data Types
PySpark represents nested JSON with three complex types:
StructType: A fixed set of named fields (like a row within a row). JSON objects become structs.
ArrayType: An ordered collection of elements with the same type. JSON arrays become arrays.
MapType: Key-value pairs where keys are strings and values share a type. Less common, but useful for dynamic keys.
from pyspark.sql.types import (
StructType, StructField, StringType, DoubleType,
LongType, ArrayType
)
item_schema = StructType([
StructField("sku", StringType(), True),
StructField("qty", LongType(), True)
])
order_schema = StructType([
StructField("id", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("items", ArrayType(item_schema), True)
])
user_schema = StructType([
StructField("user_id", StringType(), True),
StructField("name", StringType(), True),
StructField("orders", ArrayType(order_schema), True)
])
# Load with explicit schema - no inference scan
df = spark.read.schema(user_schema).json("s3://bucket/orders.json")
Explicit schemas are verbose but worth it. You get faster loads, consistent types, and predictable behavior when data doesn’t match expectations: mismatched fields become nulls under PERMISSIVE, or immediate errors under FAILFAST.
Accessing Nested Fields
PySpark provides several ways to reach into nested structures.
Dot notation works for struct fields at any depth:
from pyspark.sql.functions import col
# These are equivalent
df.select("orders.amount")
df.select(col("orders.amount"))
df.select(df.orders.amount)
When you access an array field this way, you get an array of the nested values:
df.select("user_id", "orders.id").show()
# +-------+--------+
# |user_id| id|
# +-------+--------+
# | u1| [o1]|
# | u2| [o2]|
# +-------+--------+
getItem() accesses specific array indices or map keys:
# First order's amount
df.select(col("orders").getItem(0).getItem("amount").alias("first_order_amount"))
# Or with bracket notation
df.select(col("orders")[0]["amount"].alias("first_order_amount"))
getField() explicitly accesses struct fields (useful when field names conflict with DataFrame methods):
df.select(col("orders").getItem(0).getField("id"))
Flattening Nested Structures
Most analytical queries need flat tables. The explode() function converts array elements into separate rows.
from pyspark.sql.functions import explode, explode_outer
# Explode orders array - one row per order
df_orders = df.select(
"user_id",
"name",
explode("orders").alias("order")
)
df_orders.printSchema()
# root
# |-- user_id: string
# |-- name: string
# |-- order: struct
# | |-- amount: double
# | |-- id: string
# | |-- items: array
# Now access order fields with dot notation
df_orders.select(
"user_id",
"order.id",
"order.amount"
).show()
Critical: use explode_outer() when arrays might be null or empty. Regular explode() drops those rows entirely:
# User with no orders disappears with explode()
df_with_empty = spark.createDataFrame(
    [("u3", "Charlie", None)],
    "user_id string, name string, orders array<string>"  # explicit schema: Spark can't infer a type from all-None values
)
df_with_empty.select("user_id", explode("orders")).show() # Empty!
df_with_empty.select("user_id", explode_outer("orders")).show() # Keeps row with null
For deeply nested structures, flatten iteratively:
from pyspark.sql.functions import explode_outer, col
from pyspark.sql.types import StructType, ArrayType

def flatten_df(df):
    """Recursively flatten all struct and array columns.

    Note: exploding sibling arrays produces a cross product of their elements.
    """
    flat_cols = []
    array_cols = []
    for field in df.schema.fields:
        if isinstance(field.dataType, StructType):
            # Expand struct fields with parent prefix
            for nested_field in field.dataType.fields:
                flat_cols.append(
                    col(f"{field.name}.{nested_field.name}")
                    .alias(f"{field.name}_{nested_field.name}")
                )
        elif isinstance(field.dataType, ArrayType):
            array_cols.append(field.name)
            flat_cols.append(col(field.name))
        else:
            flat_cols.append(col(field.name))
    df = df.select(flat_cols)
    # Explode one array per pass; the recursive rescan picks up renamed columns
    if array_cols:
        df = df.withColumn(array_cols[0], explode_outer(col(array_cols[0])))
    # Recurse while any struct or array column remains
    if any(isinstance(f.dataType, (StructType, ArrayType)) for f in df.schema.fields):
        return flatten_df(df)
    return df
# Fully flatten the nested structure
flat_df = flatten_df(df)
flat_df.show()
Transforming and Restructuring
Sometimes you need to go the other direction—creating nested structures from flat data.
struct() combines columns into a nested object:
from pyspark.sql.functions import struct, collect_list, to_json
# Assume flat_orders has: user_id, user_name, order_id, order_amount
flat_orders = spark.createDataFrame([
("u1", "Alice", "o1", 99.99),
("u1", "Alice", "o2", 149.50),
("u2", "Bob", "o3", 75.00)
], ["user_id", "user_name", "order_id", "order_amount"])
# Reconstruct nested structure
nested_df = flat_orders.groupBy("user_id", "user_name").agg(
collect_list(
struct(
col("order_id").alias("id"),
col("order_amount").alias("amount")
)
).alias("orders")
)
nested_df.show(truncate=False)
# +-------+---------+----------------------------------+
# |user_id|user_name|orders |
# +-------+---------+----------------------------------+
# |u1 |Alice |[{o1, 99.99}, {o2, 149.5}] |
# |u2 |Bob |[{o3, 75.0}] |
# +-------+---------+----------------------------------+
to_json() serializes structs or maps back to JSON strings:
api_output = nested_df.select(
to_json(struct("user_id", "user_name", "orders")).alias("payload")
)
from_json() parses JSON strings back into structs when you have JSON embedded in a column:
from pyspark.sql.functions import from_json
json_string_df = spark.createDataFrame([
('{"id": "o1", "amount": 99.99}',)
], ["order_json"])
parsed = json_string_df.select(
from_json("order_json", order_schema).alias("order")
)
Performance Considerations
Nested JSON creates specific performance challenges.
Schema inference is expensive. On large datasets, Spark samples or reads all records to infer types. Define schemas explicitly in production jobs.
Predicate pushdown has limits. When reading Parquet, Spark can skip row groups that don’t match filter predicates. With nested fields, this optimization often doesn’t apply. If you filter on orders.amount > 100 frequently, consider flattening at write time.
Explode multiplies data. An array with 100 elements becomes 100 rows. If you then join or aggregate, you’re processing 100x more data. Filter before exploding when possible:
# Spark 3.1+: pyspark.sql.functions.filter applies a predicate per array element
from pyspark.sql.functions import explode, filter as array_filter
# Filter array elements before exploding
df.select(
"user_id",
explode(
array_filter("orders", lambda x: x.amount > 50)
).alias("large_order")
)
Choose your flattening strategy based on access patterns:
- Flatten at ingestion when the same fields are queried repeatedly
- Keep nested when queries vary or you write back to JSON systems
- Consider hybrid approaches—flatten commonly queried fields, keep rarely-used arrays nested
Partition wisely. If you flatten an orders array and partition by order_date, you distribute the exploded data. If you keep it nested and partition by user_id, each partition contains complete user records. Match partitioning to your query patterns.
Nested JSON is unavoidable in modern data pipelines. The techniques here—explicit schemas, careful flattening, and strategic restructuring—turn a common pain point into manageable complexity.