Spark SQL - JSON Functions
• This guide surveys Spark SQL's built-in JSON functions, showing how to parse, extract, and manipulate JSON data directly within DataFrames without requiring external libraries or UDFs
Key Insights
• Spark SQL provides over 20 specialized JSON functions for parsing, extracting, and manipulating JSON data directly within DataFrames without requiring external libraries or UDFs
• The get_json_object and json_tuple functions offer different performance characteristics—use json_tuple when extracting multiple fields from the same JSON string to avoid redundant parsing
• Complex nested JSON structures can be flattened and transformed using combinations of from_json, explode, and schema inference functions, enabling efficient ETL pipelines on semi-structured data
Reading and Extracting JSON Fields
Spark SQL’s get_json_object function extracts values from JSON strings using JSONPath expressions. This is the most straightforward approach for simple extractions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import get_json_object, col
# Local session shared by every snippet in this article.
spark = SparkSession.builder.appName("json_examples").getOrCreate()
# Sample data with JSON strings
# NOTE: row 3 has no "address" key, so nested extractions on it yield NULL.
data = [
(1, '{"name": "John", "age": 30, "address": {"city": "NYC", "zip": "10001"}}'),
(2, '{"name": "Jane", "age": 25, "address": {"city": "LA", "zip": "90001"}}'),
(3, '{"name": "Bob", "age": 35}')
]
df = spark.createDataFrame(data, ["id", "json_data"])
# Extract simple fields
# get_json_object takes a JSONPath expression ($ is the root). It always
# returns a string column, or NULL when the path does not exist.
result = df.select(
col("id"),
get_json_object(col("json_data"), "$.name").alias("name"),
get_json_object(col("json_data"), "$.age").alias("age"),
get_json_object(col("json_data"), "$.address.city").alias("city")
)
result.show()
The JSONPath syntax supports nested navigation with dot notation. For arrays, use bracket notation like $.items[0] or $.items[*] for all elements.
When extracting multiple fields from the same JSON string, json_tuple provides better performance by parsing the JSON only once:
from pyspark.sql.functions import json_tuple
# More efficient for multiple extractions
# json_tuple parses each JSON string once and emits one column per requested
# top-level key; .alias() with several names labels all generated columns.
# It only reads top-level keys, so "address" comes back as a raw JSON
# string rather than a flattened struct — hence the "address_json" alias.
result = df.select(
col("id"),
json_tuple(col("json_data"), "name", "age", "address")
.alias("name", "age", "address_json")
)
result.show(truncate=False)
Parsing JSON with Explicit Schemas
The from_json function converts JSON strings into structured types using a defined schema. This approach provides type safety and enables direct column access.
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define schema
# Every field is nullable (True) so rows with missing keys — Bob has no
# "address" — parse to NULL instead of failing.
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("address", StructType([
StructField("city", StringType(), True),
StructField("zip", StringType(), True)
]), True)
])
# Parse JSON into struct
# from_json yields a single struct-typed column; malformed JSON becomes NULL.
parsed_df = df.select(
col("id"),
from_json(col("json_data"), schema).alias("parsed")
)
# Access nested fields directly
# Dot paths navigate into the struct; "parsed.age" is now a real integer
# column (per the schema), unlike the string get_json_object returns.
final_df = parsed_df.select(
col("id"),
col("parsed.name"),
col("parsed.age"),
col("parsed.address.city"),
col("parsed.address.zip")
)
final_df.show()
For schema inference, use schema_of_json to automatically detect the structure:
from pyspark.sql.functions import schema_of_json, lit
# Infer schema from a sample JSON string
# schema_of_json requires a literal (lit) argument. The collected value is a
# DDL-formatted schema string, which from_json accepts in place of a
# StructType. Pick a sample that contains every field you need — fields
# absent from the sample will not appear in the inferred schema.
sample_json = '{"name": "John", "age": 30, "address": {"city": "NYC", "zip": "10001"}}'
inferred_schema = spark.range(1).select(schema_of_json(lit(sample_json))).collect()[0][0]
# Use inferred schema
parsed_df = df.select(
col("id"),
from_json(col("json_data"), inferred_schema).alias("parsed")
)
Working with JSON Arrays
JSON arrays require special handling using explode or explode_outer to create rows from array elements.
from pyspark.sql.functions import explode, explode_outer
# ArrayType was missing from the article's earlier imports; without it the
# schema definition below raises NameError.
from pyspark.sql.types import ArrayType

# Data with JSON arrays; order 3 has an empty "items" array on purpose.
array_data = [
    (1, '{"items": [{"product": "laptop", "price": 1200}, {"product": "mouse", "price": 25}]}'),
    (2, '{"items": [{"product": "keyboard", "price": 80}]}'),
    (3, '{"items": []}')
]
array_df = spark.createDataFrame(array_data, ["order_id", "json_data"])

# Schema for array structure: "items" is an array of structs.
array_schema = StructType([
    StructField("items", ArrayType(StructType([
        StructField("product", StringType(), True),
        StructField("price", IntegerType(), True)
    ])), True)
])

# Parse the JSON strings into a typed struct column.
parsed = array_df.select(
    col("order_id"),
    from_json(col("json_data"), array_schema).alias("data")
)

# explode creates one row per array element. Rows whose array is empty
# (order 3 here) are dropped entirely — see explode_outer below to keep them.
exploded = parsed.select(
    col("order_id"),
    explode(col("data.items")).alias("item")
)
result = exploded.select(
    col("order_id"),
    col("item.product"),
    col("item.price")
)
result.show()
Use explode_outer to preserve rows with empty arrays:
# explode_outer keeps rows even when array is empty
# For order 3 (empty items array) a single row is emitted with item = NULL,
# so no order_ids disappear from the output.
exploded_outer = parsed.select(
col("order_id"),
explode_outer(col("data.items")).alias("item")
)
Generating JSON from Structured Data
The to_json function converts structs or maps back into JSON strings, useful for API integrations or data exports.
# map_from_arrays and array were imported here but never used anywhere in
# the article; the import now lists only what this snippet needs.
from pyspark.sql.functions import to_json, struct

# Create structured data (plain columns, no JSON yet).
structured_data = [
    (1, "Alice", 28, "Engineering"),
    (2, "Bob", 35, "Sales"),
    (3, "Carol", 42, "Marketing")
]
structured_df = spark.createDataFrame(
    structured_data,
    ["id", "name", "age", "department"]
)

# Convert to JSON: struct() bundles the columns into one struct column, and
# to_json serializes that struct into a JSON string per row.
json_output = structured_df.select(
    col("id"),
    to_json(struct(
        col("name"),
        col("age"),
        col("department")
    )).alias("employee_json")
)
json_output.show(truncate=False)
For nested structures:
# Create nested JSON in two readable steps: first the inner struct holding
# the personal fields, then the outer struct serialized with to_json.
personal_info = struct(col("name"), col("age")).alias("personal_info")
full_record = struct(col("id"), personal_info, col("department"))
nested_json = structured_df.select(to_json(full_record).alias("full_record"))
nested_json.show(truncate=False)
Advanced JSON Manipulation
Combine multiple JSON functions for complex transformations:
# "when" was imported here but never used anywhere in the article; "lit" is
# used below, so import it here to keep this snippet self-contained.
from pyspark.sql.functions import coalesce, lit

# Complex JSON with optional fields: row 2 lacks "email" and has an empty
# metadata object; row 3 lacks the "metadata" key entirely.
complex_data = [
    (1, '{"user": {"name": "Alice", "email": "alice@example.com"}, "metadata": {"source": "web"}}'),
    (2, '{"user": {"name": "Bob"}, "metadata": {}}'),
    (3, '{"user": {"name": "Carol", "email": "carol@example.com"}}')
]
complex_df = spark.createDataFrame(complex_data, ["id", "json_data"])

# Extract with fallbacks: get_json_object yields NULL for a missing path,
# and coalesce substitutes the literal default in that case.
result = complex_df.select(
    col("id"),
    get_json_object(col("json_data"), "$.user.name").alias("name"),
    coalesce(
        get_json_object(col("json_data"), "$.user.email"),
        lit("no-email@example.com")
    ).alias("email"),
    coalesce(
        get_json_object(col("json_data"), "$.metadata.source"),
        lit("unknown")
    ).alias("source")
)
result.show(truncate=False)
For filtering based on JSON content:
# Filter rows based on JSON field values
# Name the extracted column first, then filter: get_json_object returns a
# string, so it is cast to int before the numeric comparison.
age_col = get_json_object(col("json_data"), "$.age").cast("int")
filtered = df.filter(age_col > 28)
filtered.show()
Performance Considerations
When working with JSON in Spark SQL, consider these optimization strategies:
Cache parsed results: If you’re accessing the same JSON multiple times, parse once and cache:
# Parse once and cache the result so repeated queries skip re-parsing.
parsed_cached = df.select(
col("id"),
from_json(col("json_data"), schema).alias("parsed")
).cache()
# Multiple operations on cached parsed data
# Both queries reuse the in-memory parsed structs. Call
# parsed_cached.unpersist() when finished to release the cached storage.
result1 = parsed_cached.select("id", "parsed.name")
result2 = parsed_cached.filter(col("parsed.age") > 30)
Use json_tuple over multiple get_json_object calls:
# Inefficient - parses JSON three times
inefficient = df.select(
get_json_object(col("json_data"), "$.name"),
get_json_object(col("json_data"), "$.age"),
get_json_object(col("json_data"), "$.address.city")
)
# Efficient - parses JSON once
# NOTE(review): json_tuple reads only top-level keys, so "address" returns
# the nested object as a raw JSON string — not the city as above. The two
# queries compare parsing cost, not identical output.
efficient = df.select(
json_tuple(col("json_data"), "name", "age", "address")
)
Partition data before JSON operations: For large datasets, partition by a non-JSON column first to reduce the amount of data processed per partition.
Spark SQL’s JSON functions provide comprehensive capabilities for handling semi-structured data within distributed processing pipelines. Choose the appropriate function based on your use case: get_json_object for quick extractions, json_tuple for multiple fields, and from_json for complex schema-based transformations.