Spark SQL - JSON Functions
Key Insights

• Spark SQL provides over 20 specialized JSON functions for parsing, extracting, and manipulating JSON data directly within DataFrames without requiring external libraries or UDFs
• The get_json_object and json_tuple functions offer different performance characteristics: use json_tuple when extracting multiple fields from the same JSON string to avoid redundant parsing
• Complex nested JSON structures can be flattened and transformed using combinations of from_json, explode, and schema inference functions, enabling efficient ETL pipelines on semi-structured data

Reading and Extracting JSON Fields

Spark SQL’s get_json_object function extracts values from JSON strings using JSONPath expressions. This is the most straightforward approach for simple extractions.

from pyspark.sql import SparkSession
from pyspark.sql.functions import get_json_object, col

spark = SparkSession.builder.appName("json_examples").getOrCreate()

# Sample data with JSON strings
data = [
    (1, '{"name": "John", "age": 30, "address": {"city": "NYC", "zip": "10001"}}'),
    (2, '{"name": "Jane", "age": 25, "address": {"city": "LA", "zip": "90001"}}'),
    (3, '{"name": "Bob", "age": 35}')
]

df = spark.createDataFrame(data, ["id", "json_data"])

# Extract simple fields
result = df.select(
    col("id"),
    get_json_object(col("json_data"), "$.name").alias("name"),
    get_json_object(col("json_data"), "$.age").alias("age"),
    get_json_object(col("json_data"), "$.address.city").alias("city")
)

result.show()

The JSONPath syntax supports nested navigation with dot notation. For arrays, use bracket notation like $.items[0] or $.items[*] for all elements.

When extracting multiple fields from the same JSON string, json_tuple provides better performance by parsing the JSON only once:

from pyspark.sql.functions import json_tuple

# More efficient for multiple extractions
result = df.select(
    col("id"),
    json_tuple(col("json_data"), "name", "age", "address")
        .alias("name", "age", "address_json")
)

result.show(truncate=False)

Parsing JSON with Explicit Schemas

The from_json function converts JSON strings into structured types using a defined schema. This approach provides type safety and enables direct column access.

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("zip", StringType(), True)
    ]), True)
])

# Parse JSON into struct
parsed_df = df.select(
    col("id"),
    from_json(col("json_data"), schema).alias("parsed")
)

# Access nested fields directly
final_df = parsed_df.select(
    col("id"),
    col("parsed.name"),
    col("parsed.age"),
    col("parsed.address.city"),
    col("parsed.address.zip")
)

final_df.show()

For schema inference, use schema_of_json to automatically detect the structure:

from pyspark.sql.functions import schema_of_json, lit

# Infer schema from a sample JSON string
sample_json = '{"name": "John", "age": 30, "address": {"city": "NYC", "zip": "10001"}}'
inferred_schema = spark.range(1).select(schema_of_json(lit(sample_json))).collect()[0][0]

# Use inferred schema
parsed_df = df.select(
    col("id"),
    from_json(col("json_data"), inferred_schema).alias("parsed")
)

Working with JSON Arrays

JSON arrays require special handling using explode or explode_outer to create rows from array elements.

from pyspark.sql.functions import explode, explode_outer
from pyspark.sql.types import ArrayType

# Data with JSON arrays
array_data = [
    (1, '{"items": [{"product": "laptop", "price": 1200}, {"product": "mouse", "price": 25}]}'),
    (2, '{"items": [{"product": "keyboard", "price": 80}]}'),
    (3, '{"items": []}')
]

array_df = spark.createDataFrame(array_data, ["order_id", "json_data"])

# Schema for array structure
array_schema = StructType([
    StructField("items", ArrayType(StructType([
        StructField("product", StringType(), True),
        StructField("price", IntegerType(), True)
    ])), True)
])

# Parse and explode
parsed = array_df.select(
    col("order_id"),
    from_json(col("json_data"), array_schema).alias("data")
)

# explode creates one row per array element
exploded = parsed.select(
    col("order_id"),
    explode(col("data.items")).alias("item")
)

result = exploded.select(
    col("order_id"),
    col("item.product"),
    col("item.price")
)

result.show()

Use explode_outer to preserve rows with empty arrays:

# explode_outer keeps rows even when array is empty
exploded_outer = parsed.select(
    col("order_id"),
    explode_outer(col("data.items")).alias("item")
)

Generating JSON from Structured Data

The to_json function converts structs or maps back into JSON strings, useful for API integrations or data exports.

from pyspark.sql.functions import to_json, struct

# Create structured data
structured_data = [
    (1, "Alice", 28, "Engineering"),
    (2, "Bob", 35, "Sales"),
    (3, "Carol", 42, "Marketing")
]

structured_df = spark.createDataFrame(
    structured_data, 
    ["id", "name", "age", "department"]
)

# Convert to JSON
json_output = structured_df.select(
    col("id"),
    to_json(struct(
        col("name"),
        col("age"),
        col("department")
    )).alias("employee_json")
)

json_output.show(truncate=False)

For nested structures:

# Create nested JSON
nested_json = structured_df.select(
    to_json(struct(
        col("id"),
        struct(
            col("name"),
            col("age")
        ).alias("personal_info"),
        col("department")
    )).alias("full_record")
)

nested_json.show(truncate=False)

Advanced JSON Manipulation

Combine multiple JSON functions for complex transformations:

from pyspark.sql.functions import coalesce, lit

# Complex JSON with optional fields
complex_data = [
    (1, '{"user": {"name": "Alice", "email": "alice@example.com"}, "metadata": {"source": "web"}}'),
    (2, '{"user": {"name": "Bob"}, "metadata": {}}'),
    (3, '{"user": {"name": "Carol", "email": "carol@example.com"}}')
]

complex_df = spark.createDataFrame(complex_data, ["id", "json_data"])

# Extract with fallbacks
result = complex_df.select(
    col("id"),
    get_json_object(col("json_data"), "$.user.name").alias("name"),
    coalesce(
        get_json_object(col("json_data"), "$.user.email"),
        lit("no-email@example.com")
    ).alias("email"),
    coalesce(
        get_json_object(col("json_data"), "$.metadata.source"),
        lit("unknown")
    ).alias("source")
)

result.show(truncate=False)

For filtering based on JSON content:

# Filter rows based on JSON field values
filtered = df.filter(
    get_json_object(col("json_data"), "$.age").cast("int") > 28
)

filtered.show()

Performance Considerations

When working with JSON in Spark SQL, consider these optimization strategies:

Cache parsed results: If you’re accessing the same JSON multiple times, parse once and cache:

parsed_cached = df.select(
    col("id"),
    from_json(col("json_data"), schema).alias("parsed")
).cache()

# Multiple operations on cached parsed data
result1 = parsed_cached.select("id", "parsed.name")
result2 = parsed_cached.filter(col("parsed.age") > 30)

Use json_tuple over multiple get_json_object calls:

# Inefficient - parses JSON three times
inefficient = df.select(
    get_json_object(col("json_data"), "$.name"),
    get_json_object(col("json_data"), "$.age"),
    get_json_object(col("json_data"), "$.address.city")
)

# Efficient - parses JSON once
efficient = df.select(
    json_tuple(col("json_data"), "name", "age", "address")
        .alias("name", "age", "address_json")
)

Filter on non-JSON columns before JSON operations: For large datasets, apply cheap predicates on ordinary columns first, so the expensive JSON parsing runs only on rows you actually keep.

Spark SQL’s JSON functions provide comprehensive capabilities for handling semi-structured data within distributed processing pipelines. Choose the appropriate function based on your use case: get_json_object for quick extractions, json_tuple for multiple fields, and from_json for complex schema-based transformations.