PySpark - Convert DataFrame to JSON

Key Insights

  • PySpark offers multiple JSON conversion methods: toJSON() for RDD of JSON strings, write.json() for file output, and toPandas().to_json() for single-document formatting
  • Use coalesce(1) to generate a single JSON file, but avoid this pattern for large datasets as it creates performance bottlenecks by forcing all data through one executor
  • The write.json() method always produces one JSON object per line (newline-delimited JSON), not a JSON array—use Pandas conversion if you need traditional array format

Introduction

PySpark DataFrames are the backbone of distributed data processing, but eventually you need to export that data for consumption by other systems. JSON remains one of the most universal data interchange formats, making DataFrame-to-JSON conversion a critical operation for API integrations, data lake exports, and cross-platform data sharing.

Unlike simple serialization libraries, PySpark provides multiple approaches to JSON conversion, each optimized for different scenarios. Understanding these methods prevents common pitfalls like accidentally creating thousands of small files or running out of memory when trying to collect large datasets to a single JSON document.

Basic DataFrame to JSON Conversion

The toJSON() method converts each DataFrame row into a JSON string, returning an RDD of strings rather than writing directly to storage. This approach is useful when you need to process JSON strings programmatically or send them to message queues.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DataFrame to JSON") \
    .getOrCreate()

# Create sample employee DataFrame
data = [
    ("Alice Johnson", 29, "Engineering"),
    ("Bob Smith", 35, "Marketing"),
    ("Carol White", 28, "Engineering"),
    ("David Brown", 42, "Sales")
]

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("department", StringType(), True)
])

df = spark.createDataFrame(data, schema)

# Convert to JSON strings
json_rdd = df.toJSON()

# Inspect a few records (take() pulls only what it needs, even on large datasets)
for json_str in json_rdd.take(2):
    print(json_str)

Output:

{"name":"Alice Johnson","age":29,"department":"Engineering"}
{"name":"Bob Smith","age":35,"department":"Marketing"}

Each row becomes a compact, single-line JSON object. The toJSON() method preserves the DataFrame schema, automatically handling nested structures and arrays. However, this returns an RDD, not a DataFrame, so you lose the DataFrame API benefits. For most production scenarios, writing directly to files is more practical.
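Because toJSON() yields plain strings, each record can be reshaped with Python's json module before being forwarded, for example to a message queue. A minimal sketch of a per-record function (the "source" field is a hypothetical addition; on a cluster it would be applied distributed via df.toJSON().map(enrich)):

```python
import json

def enrich(json_str):
    """Parse one toJSON() record, add a field, and re-serialize."""
    record = json.loads(json_str)
    record["source"] = "spark-export"  # hypothetical metadata field
    return json.dumps(record)

# On a cluster this would run distributed: df.toJSON().map(enrich)
sample = '{"name":"Alice Johnson","age":29,"department":"Engineering"}'
print(enrich(sample))
```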

Writing DataFrame to JSON Files

The write.json() method provides the most efficient way to export DataFrames to JSON format, leveraging Spark’s distributed writing capabilities. This approach handles large datasets by writing partitioned files in parallel.

# Write to JSON file
output_path = "/tmp/employees_json"

# Basic write with overwrite mode
df.write.mode("overwrite").json(output_path)

# Write with compression
df.write.mode("overwrite") \
    .option("compression", "gzip") \
    .json(output_path + "_compressed")

# Append to existing data
new_data = [("Eve Davis", 31, "Engineering")]
new_df = spark.createDataFrame(new_data, schema)
new_df.write.mode("append").json(output_path)

By default, Spark creates a directory containing multiple part files, not a single JSON file. The directory structure looks like:

employees_json/
├── _SUCCESS
├── part-00000-<uuid>.json
├── part-00001-<uuid>.json
└── part-00002-<uuid>.json

Each part file contains newline-delimited JSON (NDJSON), where each line is a complete JSON object. This format is ideal for streaming processing and line-by-line parsing but differs from traditional JSON arrays.
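The difference from a JSON array matters when parsing: NDJSON is consumed line by line, never as a single document. A small illustration with hard-coded sample lines in the style of a Spark part file:

```python
import io
import json

# Two lines in the style of a Spark part file: newline-delimited JSON.
ndjson = (
    '{"name":"Alice Johnson","age":29,"department":"Engineering"}\n'
    '{"name":"Bob Smith","age":35,"department":"Marketing"}\n'
)

# Any line-oriented reader can parse it incrementally, one object at a time;
# json.loads() on the whole file would fail because it is not a JSON array.
records = [json.loads(line) for line in io.StringIO(ndjson) if line.strip()]
print(len(records))
```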

The write modes control how Spark handles existing data:

  • overwrite: Deletes existing directory and writes fresh data
  • append: Adds new files to existing directory
  • ignore: Skips write if directory exists
  • error or errorifexists (default): Throws an exception if directory exists

Controlling JSON Output Format

The default NDJSON format works well for big data pipelines, but sometimes you need a single JSON array or pretty-printed output. For these cases, convert through Pandas, accepting the limitation that data must fit in driver memory.

# Convert to JSON array format using Pandas
json_array = df.toPandas().to_json(orient='records', indent=2)
print(json_array)

Output:

[
  {
    "name": "Alice Johnson",
    "age": 29,
    "department": "Engineering"
  },
  {
    "name": "Bob Smith",
    "age": 35,
    "department": "Marketing"
  }
]

For nested structures with arrays and structs, PySpark handles the complexity automatically:

from pyspark.sql.functions import struct, collect_list

# Create nested DataFrame
nested_df = df.groupBy("department") \
    .agg(
        collect_list(
            struct("name", "age")
        ).alias("employees")
    )

# Write nested structure to JSON
nested_df.write.mode("overwrite").json("/tmp/departments_json")

# View the structure
for json_str in nested_df.toJSON().take(1):
    print(json_str)

This produces JSON with nested arrays:

{"department":"Engineering","employees":[{"name":"Alice Johnson","age":29},{"name":"Carol White","age":28}]}

The orient parameter in Pandas to_json() provides additional formatting options:

  • records: List of dictionaries (most common)
  • split: Dictionary with index, columns, and data arrays
  • index: Dictionary keyed by row index
  • columns: Dictionary keyed by column name
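The effect of orient is easiest to see side by side on a tiny frame (pure Pandas, no Spark session needed):

```python
import pandas as pd

pdf = pd.DataFrame({"name": ["Alice Johnson", "Bob Smith"], "age": [29, 35]})

# records: a list of row dictionaries
print(pdf.to_json(orient="records"))

# split: column names, index, and data kept in separate arrays
print(pdf.to_json(orient="split"))
```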

Advanced Options and Performance Considerations

For production workloads, controlling file output and partitioning strategy significantly impacts performance and downstream processing efficiency.

# Generate single JSON file (use cautiously)
df.coalesce(1) \
    .write.mode("overwrite") \
    .json("/tmp/single_file_json")

# Partition by department for organized output
df.write.mode("overwrite") \
    .partitionBy("department") \
    .json("/tmp/partitioned_json")

# Control number of output files
df.repartition(4) \
    .write.mode("overwrite") \
    .json("/tmp/four_files_json")

# Optimize for large datasets with compression
df.repartition(100) \
    .write.mode("overwrite") \
    .option("compression", "gzip") \
    .json("/tmp/optimized_json")

The coalesce(1) pattern forces all data through a single partition, creating one output file. This seems convenient but creates a severe bottleneck—all data must flow through one executor, eliminating parallelism benefits. Only use this for small datasets (under 1GB).

Partitioning by column values creates subdirectories:

partitioned_json/
├── department=Engineering/
│   └── part-00000.json
├── department=Marketing/
│   └── part-00001.json
└── department=Sales/
    └── part-00002.json

This organization enables partition pruning when reading data back, dramatically improving query performance for filtered reads.
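A toy sketch of the pruning mechanism, not Spark's actual implementation: because partition values are encoded in directory names, a filter on the partition column maps directly to selecting subdirectories, so non-matching part files are never opened. In Spark this happens automatically for reads like spark.read.json("/tmp/partitioned_json").filter(col("department") == "Sales").

```python
# Part-file paths as Spark lays them out for partitionBy("department").
paths = [
    "partitioned_json/department=Engineering/part-00000.json",
    "partitioned_json/department=Marketing/part-00001.json",
    "partitioned_json/department=Sales/part-00002.json",
]

def prune(all_paths, department):
    """Keep only the part files under the matching partition directory."""
    return [p for p in all_paths if f"department={department}/" in p]

print(prune(paths, "Sales"))
```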

For compression, gzip offers a high compression ratio but slower read/write speeds. Use snappy for better performance with moderate compression, especially when downstream systems will read the data frequently.
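A quick local sanity check of the gzip trade-off, independent of Spark (Spark applies the codec per part file via option("compression", ...)); repetitive NDJSON rows compress heavily because field names and values repeat:

```python
import gzip
import json

# 1,000 similar NDJSON rows, as a rough stand-in for one part file.
rows = [{"name": f"user{i}", "department": "Engineering"} for i in range(1000)]
ndjson = "\n".join(json.dumps(r) for r in rows).encode("utf-8")

compressed = gzip.compress(ndjson)
print(len(ndjson), len(compressed))
```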

Common Use Cases and Best Practices

Real-world JSON export scenarios require handling schema evolution, null values, and integration with external systems. Here’s a complete workflow demonstrating best practices:

from pyspark.sql.functions import col, when, lit, current_timestamp

# Read source data (e.g., from Parquet)
source_df = spark.read.parquet("/data/raw/employees")

# Transform and clean data
transformed_df = source_df \
    .select(
        col("employee_id"),
        col("name"),
        col("department"),
        when(col("salary").isNull(), 0).otherwise(col("salary")).alias("salary"),
        col("hire_date").cast("string")
    ) \
    .filter(col("is_active") == True)

# Add metadata
final_df = transformed_df \
    .withColumn("export_timestamp", current_timestamp())

# Write with error handling
try:
    final_df.write \
        .mode("overwrite") \
        .option("compression", "gzip") \
        .partitionBy("department") \
        .json("/data/exports/employees_json")
    
    print(f"Successfully exported {final_df.count()} records")
    
except Exception as e:
    print(f"Export failed: {str(e)}")
    # Implement retry logic or alerting
    raise

# For API integration: collect small result sets
api_payload = final_df.limit(100) \
    .toPandas() \
    .to_json(orient='records')

# For streaming to message queue: use toJSON()
for json_record in final_df.toJSON().toLocalIterator():
    # send_to_kafka(json_record)
    pass

Best practices for JSON export:

  1. Handle nulls explicitly: JSON null handling varies across systems. Use fillna() or coalesce() to provide sensible defaults.

  2. Cast timestamps to strings: Spark’s timestamp format may not match downstream system expectations. Explicitly format dates as ISO 8601 strings.

  3. Validate schema before export: Use df.printSchema() to verify structure matches consumer expectations, especially after transformations.

  4. Partition intelligently: Partition by columns that downstream queries will filter on, typically date or category fields.

  5. Monitor file sizes: Target 128MB-1GB per file. Too many small files create metadata overhead; too few large files reduce parallelism.

  6. Use compression for storage: Always compress when writing to data lakes. The storage savings outweigh the CPU cost for most workloads.

  7. Test with small datasets: Before running full exports, test with .limit(1000) to verify format and catch errors quickly.
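Practices 1 and 2 in miniature, shown in plain Python for clarity (in PySpark the equivalents are fillna()/coalesce() for defaults and date_format() for timestamp strings):

```python
from datetime import datetime, timezone

# Practice 1: provide a sensible default instead of emitting null.
salary = None
safe_salary = salary if salary is not None else 0

# Practice 2: emit an ISO 8601 string rather than a raw timestamp object.
ts = datetime(2024, 3, 1, 9, 30, tzinfo=timezone.utc)
iso = ts.isoformat()

print(safe_salary, iso)
```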

Converting PySpark DataFrames to JSON is straightforward, but choosing the right method and configuration determines whether your pipeline scales efficiently or collapses under production load. Match your approach to your use case: toJSON() for streaming, write.json() for batch exports, and Pandas conversion only when you need specific formatting for small datasets.
