PySpark - Read JSON File into DataFrame

Key Insights

  • PySpark reads JSON through spark.read.json(), which by default expects one JSON object per line; set option("multiline", "true") for records that span multiple lines, such as pretty-printed files
  • Schema inference works automatically, but defining the schema explicitly avoids the extra inference pass over the data and keeps types consistent in production environments
  • Handling malformed JSON records requires specific configurations like mode options (PERMISSIVE, DROPMALFORMED, FAILFAST) and the _corrupt_record column strategy

Basic JSON File Reading

Reading JSON files into a PySpark DataFrame starts with the spark.read.json() method. By default it expects one JSON object per line (the JSON Lines layout) and infers the schema automatically from the JSON structure.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("JSONReader") \
    .getOrCreate()

# Read single JSON file
df = spark.read.json("data/users.json")
df.show()
df.printSchema()

For multiple JSON files in a directory:

# Read all JSON files in directory
df = spark.read.json("data/json_files/")

# Read specific files using wildcards
df = spark.read.json("data/users_*.json")

# Read multiple specific files
df = spark.read.json(["data/users1.json", "data/users2.json"])

Handling Multiline JSON

By default, Spark expects one complete JSON record per line. Multiline JSON files contain formatted JSON objects spanning multiple lines, common with pretty-printed API responses. In multiline mode Spark parses each file as a whole, so a file holding several records should wrap them in a top-level array.

# Sample multiline JSON structure (records wrapped in a top-level array)
"""
[
  {
    "id": 1,
    "name": "John Doe",
    "address": {
      "street": "123 Main St",
      "city": "Boston"
    }
  },
  {
    "id": 2,
    "name": "Jane Smith",
    "address": {
      "street": "456 Oak Ave",
      "city": "Seattle"
    }
  }
]
"""

# Read multiline JSON
df = spark.read \
    .option("multiline", "true") \
    .json("data/users_multiline.json")

df.show(truncate=False)
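
Because multiline mode parses each file as a whole, one large pretty-printed file cannot be split across tasks. One possible workaround is to convert it to one-record-per-line form before handing it to Spark. The following is a minimal sketch using only Python's standard library; the function name to_json_lines and the file paths are hypothetical, and it assumes the input is a top-level JSON array (or a single object) small enough to load into memory.

```python
import json
from pathlib import Path


def to_json_lines(src_path, dst_path):
    """Rewrite a pretty-printed JSON file as one compact record per line."""
    data = json.loads(Path(src_path).read_text(encoding="utf-8"))
    # A single top-level object is treated as a one-record dataset.
    records = data if isinstance(data, list) else [data]
    with open(dst_path, "w", encoding="utf-8") as out:
        for record in records:
            # json.dumps without indent keeps each record on a single line.
            out.write(json.dumps(record, ensure_ascii=False) + "\n")
    return len(records)
```

After a call such as to_json_lines("data/users_multiline.json", "data/users.jsonl"), the output can be read with spark.read.json("data/users.jsonl") without the multiline option.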

Explicit Schema Definition

By default, schema inference scans the entire dataset, which slows reads of large files. Defining the schema explicitly skips that pass and keeps data types consistent from one run to the next.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

# Define schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True),
    StructField("address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("zipcode", StringType(), True)
    ]), True),
    StructField("skills", ArrayType(StringType()), True)
])

# Read with explicit schema
df = spark.read \
    .schema(schema) \
    .json("data/employees.json")

df.printSchema()

Handling Nested JSON Structures

PySpark handles nested JSON naturally, creating nested StructType columns. Access nested fields using dot notation or the getField() method.

# Sample nested JSON
nested_df = spark.read.json("data/nested_users.json")

# Access nested fields
from pyspark.sql.functions import col

df_flattened = nested_df.select(
    col("id"),
    col("name"),
    col("address.street").alias("street"),
    col("address.city").alias("city"),
    col("address.zipcode").alias("zipcode")
)

df_flattened.show()

# Explode arrays
from pyspark.sql.functions import explode

df_skills = nested_df.select(
    col("id"),
    col("name"),
    explode(col("skills")).alias("skill")
)

df_skills.show()

Managing Malformed Records

Production JSON data often contains malformed records. PySpark provides three modes to handle corrupt data:

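# PERMISSIVE (default): nulls the parsed fields of a malformed record
# and keeps its raw text in the corrupt-record column
df_permissive = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("data/mixed_quality.json")

# Show corrupt records (with an inferred schema, the _corrupt_record column
# exists only if at least one malformed record was found)
if "_corrupt_record" in df_permissive.columns:
    df_permissive.filter(col("_corrupt_record").isNotNull()).show()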

# DROPMALFORMED: Ignores malformed records
df_drop = spark.read \
    .option("mode", "DROPMALFORMED") \
    .json("data/mixed_quality.json")

# FAILFAST: Throws exception on malformed records
try:
    df_failfast = spark.read \
        .option("mode", "FAILFAST") \
        .json("data/mixed_quality.json")
except Exception as e:
    print(f"Error: {e}")

Advanced Read Options

Fine-tune JSON reading behavior with additional options:

df = spark.read \
    .option("multiline", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .option("primitivesAsString", "false") \
    .option("allowComments", "true") \
    .option("allowUnquotedFieldNames", "true") \
    .option("allowSingleQuotes", "true") \
    .option("allowNumericLeadingZeros", "true") \
    .option("allowBackslashEscapingAnyCharacter", "true") \
    .json("data/complex.json")

Reading JSON from Different Sources

PySpark reads JSON from various sources beyond local files:

# From HDFS
df_hdfs = spark.read.json("hdfs://namenode:9000/data/users.json")

# From S3
df_s3 = spark.read.json("s3a://bucket-name/path/to/data.json")

# From Azure Blob Storage
df_azure = spark.read.json("wasbs://container@account.blob.core.windows.net/data.json")

# From string/RDD
json_strings = [
    '{"id": 1, "name": "Alice"}',
    '{"id": 2, "name": "Bob"}'
]
rdd = spark.sparkContext.parallelize(json_strings)
df_rdd = spark.read.json(rdd)

Optimizing JSON Reads

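Compression and partitioning both affect read performance. Spark decompresses files such as .gz transparently, but gzip is not splittable, so each gzip file is read by a single task:

# Read compressed JSON
df_compressed = spark.read.json("data/users.json.gz")

# Repartition after reading (triggers a shuffle) to spread work over 100 partitions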
df_partitioned = spark.read \
    .option("multiline", "false") \
    .json("data/large_dataset.json") \
    .repartition(100)

# Sample data for testing
df_sample = spark.read \
    .json("data/large_dataset.json") \
    .sample(fraction=0.1, seed=42)

print(f"Partitions: {df_partitioned.rdd.getNumPartitions()}")

Practical Example: ETL Pipeline

Complete example processing user activity logs:

from pyspark.sql.functions import col, to_date, hour, count
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema for performance
activity_schema = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("timestamp", StringType(), False),
    StructField("action", StringType(), False),
    StructField("metadata", StructType([
        StructField("ip_address", StringType(), True),
        StructField("user_agent", StringType(), True)
    ]), True)
])

# Read JSON logs
df_logs = spark.read \
    .schema(activity_schema) \
    .option("mode", "DROPMALFORMED") \
    .json("logs/user_activity/*.json")

# Transform data
df_processed = df_logs \
    .withColumn("date", to_date(col("timestamp"))) \
    .withColumn("hour", hour(col("timestamp"))) \
    .select(
        col("user_id"),
        col("date"),
        col("hour"),
        col("action"),
        col("metadata.ip_address").alias("ip")
    )

# Aggregate
df_summary = df_processed \
    .groupBy("date", "hour", "action") \
    .agg(count("*").alias("action_count")) \
    .orderBy("date", "hour")

df_summary.show(20)

# Write results
df_summary.write \
    .mode("overwrite") \
    .parquet("output/activity_summary")

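This pipeline combines schema definition, malformed-record handling, nested field extraction, and aggregation. The explicit schema skips inference and fixes the column types up front, while DROPMALFORMED mode lets processing continue despite bad data. Because DROPMALFORMED discards records silently, consider PERMISSIVE mode with a corrupt-record column when dropped rows need to be audited.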
