PySpark - Read Multiline JSON
Key Insights
- PySpark’s default JSON reader fails on multiline JSON because it expects each line to be a complete JSON object; enable multiline=True to parse JSON spanning multiple lines
- Multiline JSON parsing is significantly slower and more memory-intensive than single-line JSON because each file must be read entirely into memory before parsing
- Schema inference with multiline JSON can cause performance issues at scale; explicitly define schemas whenever possible to avoid multiple file scans
Understanding Multiline vs Single-line JSON
PySpark’s JSON reader expects newline-delimited JSON (NDJSON) by default. Each line must contain a complete, valid JSON object:
{"id": 1, "name": "Alice", "age": 30}
{"id": 2, "name": "Bob", "age": 25}
Multiline JSON formats the data with indentation and line breaks for readability:
[
  {
    "id": 1,
    "name": "Alice",
    "age": 30
  },
  {
    "id": 2,
    "name": "Bob",
    "age": 25
  }
]
Without the multiline option, PySpark will throw parsing errors or return null values because it attempts to parse each individual line as complete JSON.
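The failure is easy to reproduce outside Spark with the standard library; a minimal sketch of why per-line parsing works for NDJSON but not for pretty-printed JSON (the sample strings below are made up):

```python
import json

ndjson = '{"id": 1, "name": "Alice", "age": 30}\n{"id": 2, "name": "Bob", "age": 25}'
pretty = '[\n  {"id": 1,\n   "name": "Alice"}\n]'

# NDJSON: every line is a complete JSON document, so per-line parsing works
records = [json.loads(line) for line in ndjson.splitlines()]
print(records[1]["name"])  # Bob

# Pretty-printed JSON: no individual line is a complete JSON value
def line_parses(line):
    try:
        json.loads(line)
        return True
    except json.JSONDecodeError:
        return False

print(any(line_parses(line) for line in pretty.splitlines()))  # False
```

This is exactly what Spark's default reader does per line, which is why every row comes back null or corrupt.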
Basic Multiline JSON Reading
Enable multiline JSON parsing with the multiline parameter:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MultilineJSON") \
    .getOrCreate()

df = spark.read \
    .option("multiline", "true") \
    .json("path/to/multiline.json")

df.show()
For JSON arrays at the root level, PySpark automatically flattens the array into individual rows:
# Input: [{"id": 1}, {"id": 2}]
df = spark.read.option("multiline", True).json("array.json")
df.printSchema()
# root
# |-- id: long (nullable = true)
Defining Explicit Schemas
Schema inference with multiline JSON requires reading the entire file, which becomes expensive with large datasets. Define schemas explicitly:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("addresses", ArrayType(
        StructType([
            StructField("street", StringType(), True),
            StructField("city", StringType(), True),
            StructField("zipcode", StringType(), True)
        ])
    ), True)
])

df = spark.read \
    .option("multiline", "true") \
    .schema(schema) \
    .json("complex_multiline.json")
This approach skips schema inference entirely, avoiding an extra full scan of the data and substantially reducing read time on large files.
Handling Nested JSON Structures
Multiline JSON often contains deeply nested objects. PySpark preserves this structure:
# Sample nested JSON
nested_json = '''
{
  "user": {
    "id": 1,
    "profile": {
      "name": "Alice",
      "contact": {
        "email": "alice@example.com",
        "phone": "555-0100"
      }
    }
  },
  "orders": [
    {"order_id": 101, "amount": 250.00},
    {"order_id": 102, "amount": 175.50}
  ]
}
'''
df = spark.read.option("multiline", "true").json(spark.sparkContext.parallelize([nested_json]))
df.printSchema()
# Access nested fields
df.select("user.profile.name", "user.profile.contact.email").show()
# Explode arrays
from pyspark.sql.functions import explode
df.select("user.id", explode("orders").alias("order")).show()
Output:
+---+------------+
| id|       order|
+---+------------+
|  1|{101, 250.0}|
|  1|{102, 175.5}|
+---+------------+
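For intuition, the explode step can be mimicked in plain Python; a stdlib sketch (not Spark code) producing one output row per array element, each paired with the parent user id:

```python
# One row per element of "orders", paired with the parent user id -
# the same shape the explode() call above produces at DataFrame scale
record = {
    "user": {"id": 1, "profile": {"name": "Alice"}},
    "orders": [
        {"order_id": 101, "amount": 250.0},
        {"order_id": 102, "amount": 175.5},
    ],
}

rows = [(record["user"]["id"], order) for order in record["orders"]]
print(len(rows))               # 2
print(rows[0][1]["order_id"])  # 101
```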
Reading Multiple Multiline JSON Files
When processing directories with multiple multiline JSON files, PySpark reads each file independently:
# Read all JSON files in a directory
df = spark.read \
    .option("multiline", "true") \
    .json("path/to/json_directory/")

# Add source filename to track origin
from pyspark.sql.functions import input_file_name

df_with_source = df.withColumn("source_file", input_file_name())
df_with_source.select("id", "name", "source_file").show(truncate=False)
Be cautious with large directories. Each file must be read entirely into memory before parsing, which can overwhelm executors.
Handling Malformed JSON
Multiline JSON parsing is less forgiving than single-line parsing. Configure error handling:
# Drop malformed records entirely
df = spark.read \
    .option("multiline", "true") \
    .option("mode", "DROPMALFORMED") \
    .json("potentially_corrupt.json")

# Permissive mode (default) - nulls out the parsed columns for malformed
# records and keeps the raw text in the corrupt-record column
df = spark.read \
    .option("multiline", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("potentially_corrupt.json")

# Check for corrupt records (cache first: since Spark 2.3, a query that
# references only the corrupt-record column raises an AnalysisException)
df.cache()
corrupt_records = df.filter(df._corrupt_record.isNotNull())
corrupt_records.show(truncate=False)

# Fail fast on any malformed record
df = spark.read \
    .option("multiline", "true") \
    .option("mode", "FAILFAST") \
    .json("strict_validation.json")
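Outside Spark, the three modes map to familiar error-handling patterns; a rough stdlib sketch of the semantics (not Spark's implementation):

```python
import json

raw = ['{"id": 1}', '{not json', '{"id": 3}']

# PERMISSIVE: parse what you can, keep the raw text of bad records
parsed = []
for line in raw:
    try:
        parsed.append({"row": json.loads(line), "_corrupt_record": None})
    except json.JSONDecodeError:
        parsed.append({"row": None, "_corrupt_record": line})

# DROPMALFORMED: keep only the rows that parsed
kept = [p["row"] for p in parsed if p["row"] is not None]
print(len(kept))  # 2

# FAILFAST: raise on the first bad record
try:
    [json.loads(line) for line in raw]
except json.JSONDecodeError:
    print("failed fast")
```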
Performance Optimization Strategies
Multiline JSON parsing doesn’t parallelize well: each file must be handled by a single task. Here are strategies to improve performance:
# Strategy 1: Convert to NDJSON if possible
# Pre-process files to single-line format externally
# Strategy 2: Partition large files before processing
# Split large multiline JSON into smaller files
# Strategy 3: Increase executor memory
spark = SparkSession.builder \
    .appName("MultilineJSON") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Strategy 4: Compress files to cut I/O; Spark detects gzip from the file
# extension (no read option needed), but gzip is not splittable, so each
# compressed file is still processed by a single task
df = spark.read \
    .option("multiline", "true") \
    .json("compressed_multiline.json.gz")
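Strategy 2 can be a short driver-side script; a hypothetical sketch (helper name and chunk size are assumptions) that splits a root-level JSON array into smaller chunks, each of which would then be written out as its own multiline file:

```python
import json

def split_json_array(records, chunk_size):
    """Yield successive chunks of at most chunk_size records."""
    for start in range(0, len(records), chunk_size):
        yield records[start:start + chunk_size]

records = [{"id": i} for i in range(10)]
chunks = list(split_json_array(records, 4))
print([len(c) for c in chunks])  # [4, 4, 2]

# Each chunk would then be written as its own file,
# e.g. json.dump(chunk, f, indent=2), giving Spark one task per file
```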
Benchmark comparison on a 500MB JSON file:
import time

# Multiline with schema inference
start = time.time()
df1 = spark.read.option("multiline", "true").json("large.json")
df1.count()
print(f"With inference: {time.time() - start:.2f}s")

# Multiline with explicit schema
start = time.time()
df2 = spark.read.option("multiline", "true").schema(schema).json("large.json")
df2.count()
print(f"With schema: {time.time() - start:.2f}s")

# The explicit-schema run skips the inference pass, so it is typically
# substantially faster; exact gains depend on file size and cluster resources
Working with JSON Lines Alternative
When you control the data format, convert multiline JSON to JSON Lines for better performance:
import json

# Convert multiline JSON to NDJSON (runs on the driver, so the input
# file must fit in local memory)
def convert_to_jsonl(input_path, output_path):
    with open(input_path, 'r') as f:
        data = json.load(f)
    with open(output_path, 'w') as f:
        if isinstance(data, list):
            for item in data:
                f.write(json.dumps(item) + '\n')
        else:
            f.write(json.dumps(data) + '\n')

# Then read without the multiline option
df = spark.read.json("converted.jsonl")  # Much faster
This approach enables full parallelization across partitions and reduces memory pressure on executors.
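A quick round trip through temp files confirms the conversion yields one record per line; a standalone sketch with the conversion logic inlined:

```python
import json
import os
import tempfile

records = [{"id": 1}, {"id": 2}, {"id": 3}]

# Write a pretty-printed (multiline) source file
fd, src_path = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, "w") as f:
    json.dump(records, f, indent=2)

# Convert: one JSON object per output line
dst_path = src_path + "l"  # .jsonl
with open(src_path) as f:
    data = json.load(f)
with open(dst_path, "w") as f:
    for item in data:
        f.write(json.dumps(item) + "\n")

with open(dst_path) as f:
    lines = f.read().splitlines()

print(len(lines))  # 3

os.unlink(src_path)
os.unlink(dst_path)
```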
Real-World Example: API Response Processing
Processing paginated API responses stored as multiline JSON:
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

# Schema for API response
api_schema = StructType([
    StructField("page", IntegerType(), True),
    StructField("total_pages", IntegerType(), True),
    StructField("results", ArrayType(
        StructType([
            StructField("id", StringType(), True),
            StructField("timestamp", StringType(), True),
            StructField("value", DoubleType(), True)
        ])
    ), True)
])
# Read API responses
df = spark.read \
    .option("multiline", "true") \
    .schema(api_schema) \
    .json("api_responses/*.json")

# Flatten results
flattened = df.select(
    col("page"),
    explode_outer("results").alias("result")
).select(
    "page",
    "result.id",
    "result.timestamp",
    "result.value"
)

flattened.write.parquet("processed_api_data/")
This pattern efficiently processes complex API responses while maintaining data lineage through the page number.