PySpark - Convert DataFrame to Dictionary

Key Insights

  • Converting PySpark DataFrames to dictionaries requires collecting distributed data to the driver, which can cause memory issues with datasets larger than a few thousand rows
  • The toPandas().to_dict() approach offers the most flexibility with multiple orientation options, while collect() with asDict() provides better control for custom transformations
  • For simple key-value mappings from two-column DataFrames, rdd.collectAsMap() is the most efficient method, but it only works with exactly two columns

Introduction

Converting PySpark DataFrames to Python dictionaries is a common requirement when you need to export data for API responses, prepare test fixtures, or integrate with non-Spark libraries. However, this operation fundamentally contradicts Spark’s distributed computing model—you’re pulling distributed data into the driver’s memory as a single collection.

This means you should only convert DataFrames to dictionaries when working with small to medium datasets (typically under 10,000 rows). For larger datasets, consider alternative approaches like writing to file formats or processing data in batches.

Let’s create a sample DataFrame we’ll use throughout this article:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

spark = SparkSession.builder.appName("DataFrameToDict").getOrCreate()

data = [
    (1, "Alice", 29, "Engineering"),
    (2, "Bob", 35, "Marketing"),
    (3, "Charlie", 42, "Sales"),
    (4, "Diana", 31, "Engineering")
]

schema = ["id", "name", "age", "department"]
df = spark.createDataFrame(data, schema)
df.show()

Using toPandas() and to_dict()

The most flexible approach combines PySpark’s toPandas() method with Pandas’ to_dict() function. This two-step conversion gives you access to Pandas’ powerful dictionary orientation options.

The to_dict() method accepts an orient parameter that controls the output structure:

# Convert to Pandas first
pandas_df = df.toPandas()

# Orient='dict' - Column names as keys, nested dicts with row indices
dict_format = pandas_df.to_dict(orient='dict')
print("orient='dict':")
print(dict_format)
# {'id': {0: 1, 1: 2, 2: 3, 3: 4}, 
#  'name': {0: 'Alice', 1: 'Bob', 2: 'Charlie', 3: 'Diana'}, ...}

# Orient='list' - Column names as keys, lists of values
list_format = pandas_df.to_dict(orient='list')
print("\norient='list':")
print(list_format)
# {'id': [1, 2, 3, 4], 
#  'name': ['Alice', 'Bob', 'Charlie', 'Diana'], ...}

# Orient='records' - List of dictionaries, one per row
records_format = pandas_df.to_dict(orient='records')
print("\norient='records':")
print(records_format)
# [{'id': 1, 'name': 'Alice', 'age': 29, 'department': 'Engineering'},
#  {'id': 2, 'name': 'Bob', 'age': 35, 'department': 'Marketing'}, ...]

# Orient='index' - Outer keys are row indices
index_format = pandas_df.to_dict(orient='index')
print("\norient='index':")
print(index_format)
# {0: {'id': 1, 'name': 'Alice', 'age': 29, 'department': 'Engineering'},
#  1: {'id': 2, 'name': 'Bob', 'age': 35, 'department': 'Marketing'}, ...}

The records orientation is most commonly used for API responses and JSON serialization, as it produces a list of flat dictionaries. The list orientation is efficient for columnar operations, while dict and index are useful when you need to preserve row indexing.

The main drawback of this approach is the overhead of converting to Pandas first. For DataFrames with complex types or very wide schemas, this conversion can be slow and memory-intensive.

Using collect() with List Comprehension

For more direct control, use PySpark’s collect() method combined with the Row object’s asDict() method. This approach avoids the Pandas dependency and gives you fine-grained control over the transformation.

# Get a single row as a dictionary
first_row_dict = df.first().asDict()
print("First row as dict:")
print(first_row_dict)
# {'id': 1, 'name': 'Alice', 'age': 29, 'department': 'Engineering'}

# Convert all rows to a list of dictionaries
all_rows_dict = [row.asDict() for row in df.collect()]
print("\nAll rows as list of dicts:")
print(all_rows_dict)

# Custom transformation with list comprehension
custom_dict = [
    {"employee_id": row.id, "full_name": row.name.upper()}
    for row in df.collect()
]
print("\nCustom transformation:")
print(custom_dict)
# [{'employee_id': 1, 'full_name': 'ALICE'}, ...]

# Using traditional dictionary access
dict_with_column_access = [
    {
        "id": row["id"],
        "name": row["name"],
        "age": row["age"]
    }
    for row in df.collect()
]

This method is particularly useful when you need to apply transformations during the conversion or when you want to select specific columns. The asDict() method preserves the original column names; for nested structures, pass asDict(recursive=True) so inner Row objects are converted to dictionaries as well.

One important consideration: collect() brings all data to the driver node. If you’re working with a filtered subset or aggregated data, ensure the filter happens before calling collect():

# Good: Filter before collecting
small_subset = df.filter(df.department == "Engineering").collect()
result = [row.asDict() for row in small_subset]

# Bad: Collecting everything then filtering in Python
all_data = df.collect()  # Potentially huge
result = [row.asDict() for row in all_data if row.department == "Engineering"]

Using rdd.collectAsMap() for Key-Value Pairs

When you have a two-column DataFrame and need a simple key-value dictionary, collectAsMap() is the most efficient option. This method works directly on the underlying RDD and produces a clean dictionary without nested structures.

# Create a simple key-value DataFrame
kv_data = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "Diana")]
kv_df = spark.createDataFrame(kv_data, ["id", "name"])

# Convert to dictionary using collectAsMap()
id_to_name = kv_df.rdd.collectAsMap()
print(id_to_name)
# {1: 'Alice', 2: 'Bob', 3: 'Charlie', 4: 'Diana'}

# Works with any two columns - select them first
dept_to_count = df.groupBy("department").count() \
    .select("department", "count") \
    .rdd.collectAsMap()
print(dept_to_count)
# {'Engineering': 2, 'Marketing': 1, 'Sales': 1}

This method only works with exactly two columns. If your DataFrame has more columns, select the two you need before calling collectAsMap(). The first column becomes the key, and the second becomes the value.

Performance Considerations and Best Practices

Different conversion methods have different performance characteristics. Here’s a practical comparison:

import time

# Create a larger dataset for testing
large_data = [(i, f"name_{i}", i % 100, f"dept_{i % 10}") 
              for i in range(10000)]
large_df = spark.createDataFrame(large_data, schema)

def time_conversion(func, name):
    start = time.time()
    result = func()
    end = time.time()
    print(f"{name}: {end - start:.4f} seconds")
    return result

# Method 1: toPandas + to_dict
result1 = time_conversion(
    lambda: large_df.toPandas().to_dict(orient='records'),
    "toPandas + to_dict"
)

# Method 2: collect + asDict
result2 = time_conversion(
    lambda: [row.asDict() for row in large_df.collect()],
    "collect + asDict"
)

# Method 3: collectAsMap (for two columns only)
result3 = time_conversion(
    lambda: large_df.select("id", "name").rdd.collectAsMap(),
    "collectAsMap"
)

Key performance guidelines:

  1. Memory limits: Never convert DataFrames larger than your driver’s available memory. A good rule of thumb is to keep conversions under 1GB of data.

  2. Use limit() for testing: When developing, always test with a limited dataset:

sample_dict = df.limit(100).toPandas().to_dict(orient='records')

  3. Prefer collectAsMap() for lookups: If you’re building a lookup dictionary, collectAsMap() is faster and more memory-efficient than other methods.

  4. Cache before multiple conversions: If you’re converting the same DataFrame multiple times, cache it first:

df.cache()
dict1 = df.select("id", "name").rdd.collectAsMap()
dict2 = df.toPandas().to_dict(orient='records')

Handling Complex Data Types

PySpark DataFrames often contain complex types like arrays, structs, and maps. These require special handling during conversion:

# Create DataFrame with complex types
complex_data = [
    (1, "Alice", ["Python", "Scala"], {"city": "NYC", "country": "USA"}),
    (2, "Bob", ["Java", "SQL"], {"city": "SF", "country": "USA"})
]

complex_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("skills", ArrayType(StringType())),
    StructField("location", StructType([
        StructField("city", StringType()),
        StructField("country", StringType())
    ]))
])

complex_df = spark.createDataFrame(complex_data, complex_schema)

# Convert with toPandas - note that struct values stay as Row objects
pandas_result = complex_df.toPandas().to_dict(orient='records')
print(pandas_result)
# [{'id': 1, 'name': 'Alice', 'skills': ['Python', 'Scala'], 
#   'location': Row(city='NYC', country='USA')}, ...]

# Convert with asDict - also handles complex types
collect_result = [row.asDict(recursive=True) for row in complex_df.collect()]
print(collect_result)
# [{'id': 1, 'name': 'Alice', 'skills': ['Python', 'Scala'],
#   'location': {'city': 'NYC', 'country': 'USA'}}, ...]

The recursive=True parameter in asDict() is crucial for nested structures. It ensures that nested Row objects are also converted to dictionaries rather than remaining as Row objects.

Conclusion

Choosing the right conversion method depends on your specific use case:

  • Use toPandas().to_dict(orient='records') when you need a list of row dictionaries for API responses or JSON serialization, and you’re already using Pandas in your workflow.

  • Use collect() with asDict() when you need custom transformations during conversion or want to avoid the Pandas dependency.

  • Use rdd.collectAsMap() for simple two-column key-value mappings, especially when building lookup dictionaries.

Always remember that converting DataFrames to dictionaries defeats the purpose of distributed computing. Keep your conversions limited to small datasets, use limit() during development, and consider whether you really need the data in dictionary form. Often, writing to a file format like JSON or Parquet is more appropriate for large datasets.
