How to Convert Pandas to PySpark DataFrame
Key Insights
- Use spark.createDataFrame(pandas_df) for basic conversions, but enable Apache Arrow (spark.sql.execution.arrow.pyspark.enabled) for 10-100x faster serialization on larger datasets.
- Always define explicit schemas with StructType when converting production data—schema inference is slow, inconsistent, and will bite you with null handling and datetime types.
- Converting large Pandas DataFrames defeats the purpose of Spark; if your data fits in Pandas memory, you probably don’t need Spark. Load directly into Spark from source files instead.
Why Convert Between Pandas and PySpark?
You’ve built a data processing pipeline in Pandas. It works great on your laptop with sample data. Then production hits, and suddenly you’re dealing with 500GB of daily logs. Pandas chokes, your machine runs out of memory, and you need to scale.
This is the most common scenario for converting Pandas to PySpark: migrating existing single-machine code to distributed processing. Other legitimate use cases include integrating with existing Spark infrastructure, leveraging Spark’s SQL optimizations, or preparing data for ML pipelines built on Spark MLlib.
But here’s the uncomfortable truth: if you’re converting a large Pandas DataFrame to PySpark, you’re probably doing something wrong. The conversion itself happens on a single machine and requires the data to fit in memory. The real power comes from reading data directly into Spark from distributed storage. That said, there are valid scenarios—small lookup tables, configuration data, or results from external APIs—where conversion makes sense.
Prerequisites and Setup
You’ll need PySpark and Pandas installed. For Arrow optimization (which you should use), install PyArrow as well:
pip install pyspark pandas pyarrow
Every PySpark operation starts with a SparkSession. Here’s a production-ready setup:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("PandasConversion") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true") \
    .getOrCreate()
The driver.memory config matters because the conversion happens on the driver node. If your Pandas DataFrame is 2GB, you need at least 4GB of driver memory to handle the conversion overhead. The Arrow configs enable optimized serialization with a fallback for unsupported types.
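One caveat: getOrCreate() reuses an existing session if one is already running, in which case the configs you pass may be silently ignored. A quick sanity check using standard PySpark accessors (a minimal sketch, nothing assumed beyond the session above):
# Confirm the settings actually took effect on the live session
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))           # expect 'true'
print(spark.conf.get("spark.sql.execution.arrow.pyspark.fallback.enabled"))  # expect 'true'
print(spark.sparkContext.getConf().get("spark.driver.memory", "default"))    # expect '4g'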
Basic Conversion with createDataFrame()
The simplest conversion uses spark.createDataFrame():
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BasicConversion").getOrCreate()
# Create a sample Pandas DataFrame
pandas_df = pd.DataFrame({
    "user_id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", "Charlie", "Diana", "Eve"],
    "score": [85.5, 92.0, 78.3, 95.1, 88.7],
    "active": [True, False, True, True, False],
    "signup_date": pd.to_datetime(["2023-01-15", "2023-02-20", "2023-03-10",
                                   "2023-04-05", "2023-05-12"])
})
# Convert to PySpark DataFrame
spark_df = spark.createDataFrame(pandas_df)
spark_df.printSchema()
spark_df.show()
Output:
root
|-- user_id: long (nullable = true)
|-- name: string (nullable = true)
|-- score: double (nullable = true)
|-- active: boolean (nullable = true)
|-- signup_date: timestamp (nullable = true)
+-------+-------+-----+------+-------------------+
|user_id| name|score|active| signup_date|
+-------+-------+-----+------+-------------------+
| 1| Alice| 85.5| true|2023-01-15 00:00:00|
| 2| Bob| 92.0| false|2023-02-20 00:00:00|
| 3|Charlie| 78.3| true|2023-03-10 00:00:00|
| 4| Diana| 95.1| true|2023-04-05 00:00:00|
| 5| Eve| 88.7| false|2023-05-12 00:00:00|
+-------+-------+-----+------+-------------------+
Notice that Spark inferred the schema automatically. This is convenient but dangerous. Schema inference scans the data, slows down conversion, and can produce inconsistent results if your data has nulls or mixed types.
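For a concrete failure mode, consider a hypothetical column that mixes Python types (which Pandas stores as object dtype). Inference has no single type to land on and typically fails with a type-merge error rather than guessing; the exact message varies by Spark version:
# A column mixing ints, strings, and floats ends up as object dtype in Pandas
mixed_df = pd.DataFrame({"value": [1, "two", 3.0]})
# Without an explicit schema this usually raises something like
# "Can not merge type LongType and StringType" during inference
spark.createDataFrame(mixed_df)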
Using Arrow for Optimized Conversion
Apache Arrow provides a columnar memory format that both Pandas and Spark understand. Enabling Arrow-based conversion can speed up the process by 10-100x, depending on data size and types.
import pandas as pd
import numpy as np
import time
from pyspark.sql import SparkSession
# Create session without Arrow
spark_no_arrow = SparkSession.builder \
    .appName("NoArrow") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .getOrCreate()
# Create a larger DataFrame for meaningful comparison
n_rows = 100000
pandas_df = pd.DataFrame({
    "id": range(n_rows),
    "value": np.random.randn(n_rows),
    "category": np.random.choice(["A", "B", "C", "D"], n_rows),
    "timestamp": pd.date_range("2023-01-01", periods=n_rows, freq="s")
})
# Time conversion without Arrow
start = time.time()
spark_df_slow = spark_no_arrow.createDataFrame(pandas_df)
spark_df_slow.count() # Force evaluation
no_arrow_time = time.time() - start
print(f"Without Arrow: {no_arrow_time:.2f} seconds")
spark_no_arrow.stop()
# Create session with Arrow
spark_arrow = SparkSession.builder \
    .appName("WithArrow") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
# Time conversion with Arrow
start = time.time()
spark_df_fast = spark_arrow.createDataFrame(pandas_df)
spark_df_fast.count() # Force evaluation
arrow_time = time.time() - start
print(f"With Arrow: {arrow_time:.2f} seconds")
print(f"Speedup: {no_arrow_time / arrow_time:.1f}x")
On a typical machine, you’ll see results like:
Without Arrow: 12.34 seconds
With Arrow: 0.89 seconds
Speedup: 13.9x
The speedup increases with data size. For DataFrames with millions of rows, Arrow can be 50-100x faster.
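Arrow moves data in record batches, and the batch size is tunable. If the driver is tight on memory during a large conversion, lowering spark.sql.execution.arrow.maxRecordsPerBatch (default 10,000 rows per batch) trades a little speed for a smaller peak footprint. A brief sketch against the session from the benchmark above:
# Smaller Arrow batches reduce peak driver memory during conversion
spark_arrow.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
spark_df_fast = spark_arrow.createDataFrame(pandas_df)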
Handling Schema and Data Type Mappings
Relying on schema inference is a rookie mistake. Define your schema explicitly:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType,
    DoubleType, BooleanType, TimestampType, ArrayType, MapType
)
import pandas as pd
spark = SparkSession.builder.appName("ExplicitSchema").getOrCreate()
# Define schema explicitly
schema = StructType([
    StructField("user_id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=False),
    StructField("score", DoubleType(), nullable=True),
    StructField("active", BooleanType(), nullable=False),
    StructField("signup_date", TimestampType(), nullable=True)
])
pandas_df = pd.DataFrame({
    "user_id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "score": [85.5, None, 78.3],  # Note the None
    "active": [True, False, True],
    "signup_date": pd.to_datetime(["2023-01-15", "2023-02-20", None])
})
# Convert with explicit schema
spark_df = spark.createDataFrame(pandas_df, schema=schema)
spark_df.printSchema()
Here’s a reference table for common type mappings:
| Pandas Type | PySpark Type | Notes |
|---|---|---|
| int64 | LongType() | Use IntegerType() for smaller ranges |
| float64 | DoubleType() | FloatType() for single precision |
| object (str) | StringType() | |
| bool | BooleanType() | |
| datetime64 | TimestampType() | Watch for timezone issues |
| category | StringType() | Spark doesn’t have native categorical |
| timedelta | StringType() or LongType() | Convert to seconds/string first |
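If you'd rather not hand-write every StructField, one approach is a small helper that walks the DataFrame's dtypes and applies the table above. This is a sketch, not a library function: build_schema is our own name, it marks every field nullable (nullability can't be read from dtypes), and anything it doesn't recognize falls back to StringType.
from pyspark.sql.types import (
    StructType, StructField, LongType, DoubleType,
    StringType, BooleanType, TimestampType
)

def build_schema(df):
    # Map common Pandas dtypes to Spark types per the table above
    mapping = {
        "int64": LongType(), "Int64": LongType(),
        "float64": DoubleType(), "bool": BooleanType(),
        "object": StringType(), "category": StringType(),
        "datetime64[ns]": TimestampType(),
    }
    fields = [
        StructField(col, mapping.get(str(dtype), StringType()), nullable=True)
        for col, dtype in df.dtypes.items()
    ]
    return StructType(fields)

# Use it as a starting point, then tighten types and nullability by hand
spark_df = spark.createDataFrame(pandas_df, schema=build_schema(pandas_df))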
Common Pitfalls and Troubleshooting
Null Handling Differences
Pandas uses NaN for missing floats and None for objects. Spark uses null universally. This causes subtle bugs:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NullHandling").getOrCreate()
# Pandas DataFrame with various null representations
pandas_df = pd.DataFrame({
    "int_col": [1, 2, None],         # Will become float64 in Pandas!
    "float_col": [1.0, np.nan, 3.0],
    "str_col": ["a", None, "c"],
    "bool_col": [True, None, False]  # Will become object dtype
})
print("Pandas dtypes:")
print(pandas_df.dtypes)
# Fix integer column before conversion
pandas_df["int_col"] = pandas_df["int_col"].astype("Int64") # Nullable integer
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()
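The same idea extends to the boolean column. Pandas' nullable extension dtypes (Int64, boolean) keep missing values from degrading a column to float or object; this is a sketch assuming a recent Pandas and an Arrow-enabled PySpark, which convert these extension dtypes cleanly:
# Nullable boolean dtype: True / False / <NA> instead of an object column
pandas_df["bool_col"] = pandas_df["bool_col"].astype("boolean")
spark_df = spark.createDataFrame(pandas_df)
spark_df.printSchema()  # int_col: long, bool_col: boolean, missing values arrive as null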
Timestamp Timezone Issues
Spark interprets timestamps relative to the session time zone, while Pandas timestamps are often timezone-naive. Converting naive timestamps can silently shift your values:
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("TimezoneHandling") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()
# Create timezone-naive timestamps
pandas_df = pd.DataFrame({
    "event_time": pd.to_datetime(["2023-06-15 14:30:00", "2023-06-15 18:45:00"])
})
# Localize to UTC before conversion
pandas_df["event_time"] = pandas_df["event_time"].dt.tz_localize("UTC")
spark_df = spark.createDataFrame(pandas_df)
spark_df.show(truncate=False)
Always set spark.sql.session.timeZone explicitly and ensure your Pandas timestamps are timezone-aware before conversion.
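If your timestamps are already timezone-aware but recorded in some other zone, the fix is tz_convert rather than tz_localize; localizing an already-aware series raises an error. A short sketch (America/New_York is just an example source zone):
# Already-aware timestamps: convert to UTC, don't localize a second time
pandas_df["event_time"] = (
    pd.to_datetime(["2023-06-15 14:30:00", "2023-06-15 18:45:00"])
    .tz_localize("America/New_York")  # pretend the source was recorded in Eastern time
    .tz_convert("UTC")
)
spark_df = spark.createDataFrame(pandas_df)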
Memory Issues
If you’re hitting OutOfMemoryError during conversion, your DataFrame is too large:
# DON'T do this with large data
huge_pandas_df = pd.read_csv("50gb_file.csv") # Crashes
spark_df = spark.createDataFrame(huge_pandas_df)
# DO this instead - read directly into Spark
spark_df = spark.read.csv("50gb_file.csv", header=True, inferSchema=True)
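Note that inferSchema=True makes Spark read the file an extra time just to guess types. For a file this size, pass an explicit schema to the reader as well; the column names below are placeholders for whatever your CSV actually contains:
# Better still: give the reader an explicit schema and skip the inference pass
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

log_schema = StructType([
    StructField("user_id", StringType(), True),   # placeholder columns
    StructField("score", DoubleType(), True),
    StructField("event_time", TimestampType(), True)
])
spark_df = spark.read.csv("50gb_file.csv", header=True, schema=log_schema)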
Best Practices and When to Convert
Convert when:
- You have small lookup tables or configuration data (< 100MB)
- You’re integrating results from external APIs or services
- You’re prototyping and need quick iteration
- You have existing Pandas code that needs to feed into a Spark pipeline
Don’t convert when:
- Your data is larger than available driver memory
- Your source data is already in a distributed format (Parquet, Delta, HDFS)
- You’re processing data in a production pipeline (read directly into Spark)
For large data that must go through Pandas, use chunking:
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ChunkedConversion").getOrCreate()
# Process in chunks
chunk_size = 100000
chunks = []
for chunk in pd.read_csv("large_file.csv", chunksize=chunk_size):
    # Process each chunk
    spark_chunk = spark.createDataFrame(chunk)
    chunks.append(spark_chunk)
# Union all chunks
spark_df = chunks[0]
for chunk in chunks[1:]:
    spark_df = spark_df.union(chunk)
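functools.reduce expresses the same union chain more compactly, and unionByName is the safer variant if column order ever differs between chunks. Either way, every row still passes through the driver during createDataFrame; if the chunk count is large, a common alternative is to write each chunk out to Parquet and let Spark read the whole directory back in one pass. A minimal sketch of the reduce version:
from functools import reduce

# Same result as the loop above: one DataFrame built from all chunks
spark_df = reduce(lambda left, right: left.unionByName(right), chunks)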
The bottom line: converting Pandas to PySpark is a tool for specific situations, not a general-purpose data scaling strategy. If you find yourself converting large DataFrames regularly, redesign your pipeline to read directly into Spark from the source. Your future self will thank you.