PySpark - Read Multiline JSON

PySpark's JSON reader expects newline-delimited JSON (NDJSON) by default. Each line must contain a complete, valid JSON object:

Key Insights

  • PySpark’s default JSON reader fails on multiline JSON because it expects each line to be a complete JSON object; enable multiline=True to parse JSON spanning multiple lines
  • Multiline JSON parsing is significantly slower and more memory-intensive than single-line JSON due to the need to read entire files into memory before parsing
  • Schema inference with multiline JSON can cause performance issues at scale; explicitly define schemas whenever possible to avoid multiple file scans

Understanding Multiline vs Single-line JSON

PySpark’s JSON reader expects newline-delimited JSON (NDJSON) by default. Each line must contain a complete, valid JSON object:

{"id": 1, "name": "Alice", "age": 30}
{"id": 2, "name": "Bob", "age": 25}

Multiline JSON formats the data with indentation and line breaks for readability:

[
  {
    "id": 1,
    "name": "Alice",
    "age": 30
  },
  {
    "id": 2,
    "name": "Bob",
    "age": 25
  }
]

Without the multiline option, PySpark will throw parsing errors or return null values because it attempts to parse each individual line as complete JSON.

Basic Multiline JSON Reading

Enable multiline JSON parsing with the multiline parameter:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MultilineJSON") \
    .getOrCreate()

df = spark.read \
    .option("multiline", "true") \
    .json("path/to/multiline.json")

df.show()

For JSON arrays at the root level, PySpark automatically flattens the array into individual rows:

# Input: [{"id": 1}, {"id": 2}]
df = spark.read.option("multiline", True).json("array.json")
df.printSchema()
# root
#  |-- id: long (nullable = true)

Defining Explicit Schemas

Schema inference with multiline JSON requires reading the entire file, which becomes expensive with large datasets. Define schemas explicitly:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("addresses", ArrayType(
        StructType([
            StructField("street", StringType(), True),
            StructField("city", StringType(), True),
            StructField("zipcode", StringType(), True)
        ])
    ), True)
])

df = spark.read \
    .option("multiline", "true") \
    .schema(schema) \
    .json("complex_multiline.json")

This approach skips schema inference entirely, reducing read time by 50-70% on large files.

Handling Nested JSON Structures

Multiline JSON often contains deeply nested objects. PySpark preserves this structure:

# Sample nested JSON
nested_json = '''
{
  "user": {
    "id": 1,
    "profile": {
      "name": "Alice",
      "contact": {
        "email": "alice@example.com",
        "phone": "555-0100"
      }
    }
  },
  "orders": [
    {"order_id": 101, "amount": 250.00},
    {"order_id": 102, "amount": 175.50}
  ]
}
'''

df = spark.read.option("multiline", "true").json(sc.parallelize([nested_json]))
df.printSchema()

# Access nested fields
df.select("user.profile.name", "user.profile.contact.email").show()

# Explode arrays
from pyspark.sql.functions import explode

df.select("user.id", explode("orders").alias("order")).show()

Output:

+-------+--------------------+
|     id|               order|
+-------+--------------------+
|      1|{101, 250.0}        |
|      1|{102, 175.5}        |
+-------+--------------------+

Reading Multiple Multiline JSON Files

When processing directories with multiple multiline JSON files, PySpark reads each file independently:

# Read all JSON files in a directory
df = spark.read \
    .option("multiline", "true") \
    .json("path/to/json_directory/")

# Add source filename to track origin
from pyspark.sql.functions import input_file_name

df_with_source = df.withColumn("source_file", input_file_name())
df_with_source.select("id", "name", "source_file").show(truncate=False)

Be cautious with large directories. Each file must be read entirely into memory before parsing, which can overwhelm executors.

Handling Malformed JSON

Multiline JSON parsing is less forgiving than single-line parsing. Configure error handling:

# Drop malformed records
df = spark.read \
    .option("multiline", "true") \
    .option("mode", "DROPMALFORMED") \
    .json("potentially_corrupt.json")

# Permissive mode (default) - sets malformed records to null
df = spark.read \
    .option("multiline", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("potentially_corrupt.json")

# Check for corrupt records
corrupt_records = df.filter(df._corrupt_record.isNotNull())
corrupt_records.show(truncate=False)

# Fail fast on any malformed record
df = spark.read \
    .option("multiline", "true") \
    .option("mode", "FAILFAST") \
    .json("strict_validation.json")

Performance Optimization Strategies

Multiline JSON parsing doesn’t parallelize well. Here are strategies to improve performance:

# Strategy 1: Convert to NDJSON if possible
# Pre-process files to single-line format externally

# Strategy 2: Partition large files before processing
# Split large multiline JSON into smaller files

# Strategy 3: Increase executor memory
spark = SparkSession.builder \
    .appName("MultilineJSON") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Strategy 4: Use compression for I/O efficiency
df = spark.read \
    .option("multiline", "true") \
    .option("compression", "gzip") \
    .json("compressed_multiline.json.gz")

Benchmark comparison on a 500MB JSON file:

import time

# Multiline with schema inference
start = time.time()
df1 = spark.read.option("multiline", "true").json("large.json")
df1.count()
print(f"With inference: {time.time() - start:.2f}s")

# Multiline with explicit schema
start = time.time()
df2 = spark.read.option("multiline", "true").schema(schema).json("large.json")
df2.count()
print(f"With schema: {time.time() - start:.2f}s")

# Results typically show 50-70% improvement with explicit schema

Working with JSON Lines Alternative

When you control the data format, convert multiline JSON to JSON Lines for better performance:

import json

# Convert multiline to NDJSON
def convert_to_jsonl(input_path, output_path):
    with open(input_path, 'r') as f:
        data = json.load(f)
    
    with open(output_path, 'w') as f:
        if isinstance(data, list):
            for item in data:
                f.write(json.dumps(item) + '\n')
        else:
            f.write(json.dumps(data) + '\n')

# Then read without multiline option
df = spark.read.json("converted.jsonl")  # Much faster

This approach enables full parallelization across partitions and reduces memory pressure on executors.

Real-World Example: API Response Processing

Processing paginated API responses stored as multiline JSON:

from pyspark.sql.functions import col, explode_outer

# Schema for API response
api_schema = StructType([
    StructField("page", IntegerType(), True),
    StructField("total_pages", IntegerType(), True),
    StructField("results", ArrayType(
        StructType([
            StructField("id", StringType(), True),
            StructField("timestamp", StringType(), True),
            StructField("value", DoubleType(), True)
        ])
    ), True)
])

# Read API responses
df = spark.read \
    .option("multiline", "true") \
    .schema(api_schema) \
    .json("api_responses/*.json")

# Flatten results
flattened = df.select(
    col("page"),
    explode_outer("results").alias("result")
).select(
    "page",
    "result.id",
    "result.timestamp",
    "result.value"
)

flattened.write.parquet("processed_api_data/")

This pattern efficiently processes complex API responses while maintaining data lineage through the page number.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.