Spark Scala - Read JSON File
Key Insights
- Always define explicit schemas for production JSON workloads—schema inference scans your entire dataset and kills performance at scale.
- Use multiLine mode only when necessary; single-line JSON (JSON Lines format) is significantly faster to parse and parallelize.
- The mode option determines how Spark handles malformed records—choose FAILFAST for data quality pipelines and PERMISSIVE with a corrupt record column for debugging.
Introduction
JSON remains the lingua franca of data interchange. APIs return it, logging systems emit it, and configuration files use it. When you’re building data pipelines with Apache Spark, you’ll inevitably need to ingest JSON data at scale.
Spark’s DataFrame API makes reading JSON straightforward, but the simplicity masks important decisions that affect performance, reliability, and correctness. This article covers the practical aspects of reading JSON files in Spark using Scala—from basic reads to handling deeply nested structures that make you question your career choices.
Environment Setup
Before reading any JSON, you need a properly configured SparkSession. Here’s a standard setup for local development:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("JSONReader")
.master("local[*]")
.config("spark.sql.session.timeZone", "UTC")
.getOrCreate()
// Reduce logging noise during development
spark.sparkContext.setLogLevel("WARN")
// Import implicits for DataFrame operations
import spark.implicits._
The spark.sql.session.timeZone configuration matters when your JSON contains timestamp fields. Set it explicitly to avoid timezone-related surprises when your pipeline runs in different environments.
For production deployments, remove the .master() call—let your cluster manager handle resource allocation.
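As a sketch of that advice (the app name and configs here are illustrative), a production-oriented builder simply omits .master() so the master URL comes from spark-submit or the cluster manager:

```scala
import org.apache.spark.sql.SparkSession

// No .master() call: YARN, Kubernetes, or standalone mode supplies it
val prodSpark = SparkSession.builder()
  .appName("JSONReader")
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()
```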
Basic JSON File Reading
The simplest approach uses spark.read.json():
// Reading a single JSON file
val df = spark.read.json("data/users.json")
// Display the inferred schema
df.printSchema()
// root
// |-- created_at: string (nullable = true)
// |-- email: string (nullable = true)
// |-- id: long (nullable = true)
// |-- name: string (nullable = true)
// (Note: inference sorts fields alphabetically, and timestamps
// like created_at come back as plain strings.)
// Show first 10 rows
df.show(10, truncate = false)
// Reading multiple files with glob patterns
val allUsers = spark.read.json("data/users/*.json")
// Reading from multiple specific paths
val combinedDf = spark.read.json(
"data/2024/01/users.json",
"data/2024/02/users.json",
"data/2024/03/users.json"
)
Spark expects JSON Lines format by default—one JSON object per line:
{"id": 1, "name": "Alice", "email": "alice@example.com"}
{"id": 2, "name": "Bob", "email": "bob@example.com"}
This format enables parallel processing since Spark can split the file and parse lines independently across executors.
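Since Spark 2.2, the same reader also accepts an in-memory Dataset[String], which is handy for experimenting with JSON Lines parsing without touching disk. A sketch with made-up records:

```scala
import spark.implicits._

// Each element is one JSON Lines record, exactly as it would appear in a file
val jsonLines = Seq(
  """{"id": 1, "name": "Alice", "email": "alice@example.com"}""",
  """{"id": 2, "name": "Bob", "email": "bob@example.com"}"""
).toDS()

// The reader parses the Dataset[String] the same way it parses files
val inMemoryDf = spark.read.json(jsonLines)
```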
Handling Schema
Schema inference is convenient but expensive. Spark must scan your data to determine types, and it often gets them wrong—integers become longs, timestamps become strings, and nullable fields cause headaches downstream.
Define explicit schemas for production workloads:
val userSchema = StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("email", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("created_at", TimestampType, nullable = false),
StructField("preferences", StructType(Seq(
StructField("newsletter", BooleanType, nullable = true),
StructField("theme", StringType, nullable = true)
)), nullable = true)
))
val df = spark.read
.schema(userSchema)
.json("data/users.json")
df.printSchema()
// root
// |-- id: long (nullable = false)
// |-- name: string (nullable = false)
// |-- email: string (nullable = true)
// |-- age: integer (nullable = true)
// |-- created_at: timestamp (nullable = false)
// |-- preferences: struct (nullable = true)
// | |-- newsletter: boolean (nullable = true)
// | |-- theme: string (nullable = true)
For complex schemas, you can also use DDL strings:
val schemaFromDDL = "id LONG, name STRING, email STRING, created_at TIMESTAMP"
val df = spark.read
.schema(schemaFromDDL)
.json("data/users.json")
Explicit schemas provide three benefits: faster reads (no inference scan), type safety (catch issues early), and documentation (the schema describes your data contract).
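One way to treat the schema as a data contract is to serialize it alongside your code: StructType round-trips through its JSON representation, so you can check the schema into version control and rebuild it at read time. A minimal sketch (where you store the JSON string is up to you):

```scala
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType}

val userSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)
))

// Serialize the schema to JSON (e.g., to commit next to the pipeline code)
val schemaJson: String = userSchema.json

// Later, rebuild the exact same StructType from that JSON
val restored = DataType.fromJson(schemaJson).asInstanceOf[StructType]
```

The restored value is equal to the original StructType, so downstream reads can load the contract instead of re-declaring it.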
Reading Options and Configurations
Spark’s JSON reader accepts numerous options that control parsing behavior:
val df = spark.read
.option("multiLine", "true") // Handle pretty-printed JSON
.option("mode", "PERMISSIVE") // How to handle malformed records
.option("columnNameOfCorruptRecord", "_corrupt_record")
.option("dateFormat", "yyyy-MM-dd")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
.option("allowComments", "true") // Allow // and /* */ comments
.option("allowUnquotedFieldNames", "true")
.option("allowSingleQuotes", "true")
.schema(userSchema)
.json("data/users.json")
The mode option deserves special attention:
// PERMISSIVE (default): Sets malformed fields to null, stores raw input in corrupt column
val permissiveDf = spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.schema(userSchema.add("_corrupt_record", StringType))
.json("data/messy_users.json")
// Find corrupt records for debugging
permissiveDf.filter($"_corrupt_record".isNotNull).show(truncate = false)
// DROPMALFORMED: Silently drops bad records
val cleanDf = spark.read
.option("mode", "DROPMALFORMED")
.schema(userSchema)
.json("data/messy_users.json")
// FAILFAST: Throws exception on first malformed record
val strictDf = spark.read
.option("mode", "FAILFAST")
.schema(userSchema)
.json("data/validated_users.json")
Use FAILFAST in pipelines where data quality is critical. Use PERMISSIVE with a corrupt record column when debugging or processing data from unreliable sources.
For multiline JSON (pretty-printed or arrays), you must enable the multiLine option:
// This handles JSON files formatted like:
// {
// "id": 1,
// "name": "Alice"
// }
val prettyJsonDf = spark.read
.option("multiLine", "true")
.json("data/pretty_users.json")
Be aware that multiLine mode cannot split files across partitions—each file becomes one partition. For large files, this creates bottlenecks.
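If you are stuck with large multiline files, one mitigation is to repartition immediately after the read so downstream stages regain parallelism. A sketch (the partition count of 8 is arbitrary; size it to your cluster):

```scala
// Each multiline file arrives as a single partition, so spread the
// rows out before doing any expensive transformations
val spreadDf = spark.read
  .option("multiLine", "true")
  .json("data/pretty_users.json")
  .repartition(8)
```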
Working with Nested JSON
Real-world JSON is rarely flat. Here’s how to handle nested structures:
// Sample nested JSON structure:
// {
// "order_id": "ORD-001",
// "customer": {
// "id": 123,
// "name": "Alice",
// "address": {
// "city": "Seattle",
// "zip": "98101"
// }
// },
// "items": [
// {"sku": "ABC", "quantity": 2, "price": 29.99},
// {"sku": "XYZ", "quantity": 1, "price": 49.99}
// ]
// }
val ordersDf = spark.read
.option("multiLine", "true")
.json("data/orders.json")
// Access nested fields with dot notation
val flatCustomerDf = ordersDf.select(
$"order_id",
$"customer.id".as("customer_id"),
$"customer.name".as("customer_name"),
$"customer.address.city".as("city"),
$"customer.address.zip".as("zip_code")
)
flatCustomerDf.show()
// +--------+-----------+-------------+-------+--------+
// |order_id|customer_id|customer_name| city|zip_code|
// +--------+-----------+-------------+-------+--------+
// | ORD-001| 123| Alice|Seattle| 98101|
// +--------+-----------+-------------+-------+--------+
For arrays, use explode() to create one row per element:
// Explode items array into separate rows
val itemsDf = ordersDf
.select($"order_id", explode($"items").as("item"))
.select(
$"order_id",
$"item.sku",
$"item.quantity",
$"item.price"
)
itemsDf.show()
// +--------+---+--------+-----+
// |order_id|sku|quantity|price|
// +--------+---+--------+-----+
// | ORD-001|ABC| 2|29.99|
// | ORD-001|XYZ| 1|49.99|
// +--------+---+--------+-----+
// Calculate order totals
val orderTotals = itemsDf
.withColumn("line_total", $"quantity" * $"price")
.groupBy("order_id")
.agg(
sum("line_total").as("order_total"),
count("sku").as("item_count")
)
When you need to preserve rows even when arrays are empty, use explode_outer():
// explode_outer keeps rows with null when array is empty or null
val allOrdersDf = ordersDf
.select($"order_id", explode_outer($"items").as("item"))
Common Issues and Best Practices
Malformed JSON debugging: When records fail silently, add the corrupt record column and filter for nulls in expected fields:
val debugDf = spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_raw")
.schema(schema.add("_raw", StringType))
.json(path)
val badRecords = debugDf.filter($"id".isNull && $"_raw".isNotNull)
badRecords.select("_raw").show(truncate = false)
Performance tips:
- Prefer JSON Lines over multiline JSON. Single-line format enables parallel parsing.
- Always define schemas. Schema inference reads your data twice.
- Right-size your input files. Moderately sized files (64MB-256MB each) parallelize better than a few huge files, and avoid the per-file overhead of thousands of tiny ones.
- Consider compression. Gzip-compressed JSON files read transparently but can’t be split. Use uncompressed or splittable codecs like bzip2 for large files.
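When you genuinely can't hand-write a schema, a middle ground is to infer it from a sample and reuse the result for the full read, so only the inference pass pays the scan cost on a fraction of the data. A sketch (the samplingRatio value and paths are illustrative):

```scala
// Infer types from roughly 10% of the records...
val sampled = spark.read
  .option("samplingRatio", "0.1")
  .json("data/users/*.json")

// ...then reuse that schema so the full read skips inference entirely
val fullDf = spark.read
  .schema(sampled.schema)
  .json("data/users/*.json")
```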
When to skip JSON entirely: If you control both the producer and consumer, use Parquet or ORC instead. They’re columnar, compressed, and include schemas. JSON makes sense for external data sources, APIs, and human-readable logs—not for internal data lake storage.
// Convert JSON to Parquet for downstream processing
df.write
.mode("overwrite")
.parquet("data/users.parquet")
JSON is ubiquitous, but it’s also verbose and schema-less. Treat it as an ingestion format, transform it early, and store it in something better.