How to Create a DataFrame in PySpark
Key Insights
- PySpark DataFrames are the primary abstraction for structured data processing, offering significant performance improvements over RDDs through Catalyst optimizer and Tungsten execution engine
- Always define explicit schemas for production workloads—schema inference is convenient for exploration but adds overhead and can produce unexpected data types
- The SparkSession is your single entry point for all DataFrame operations; understanding its configuration options is essential for tuning performance
Why DataFrames Matter in PySpark
If you’re working with big data in Python, PySpark DataFrames are non-negotiable. They replaced RDDs as the primary abstraction for structured data processing years ago, and for good reason. DataFrames give you a familiar tabular interface while PySpark’s Catalyst optimizer figures out the most efficient execution plan behind the scenes.
The performance difference isn’t marginal. DataFrames can be orders of magnitude faster than equivalent RDD operations because Spark understands the structure of your data and can optimize accordingly. RDDs treat everything as opaque objects; DataFrames know about columns, types, and relationships.
This article covers every practical way to create DataFrames in PySpark. Whether you’re building from scratch, reading files, or migrating legacy RDD code, you’ll find working examples here.
Setting Up Your PySpark Environment
Everything in PySpark starts with a SparkSession. This is your entry point for creating DataFrames, reading data, and configuring Spark behavior. In Spark 2.0+, SparkSession unified the old SQLContext and HiveContext into a single interface.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("DataFrame Creation Examples") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()
A few notes on this configuration:
- appName identifies your application in the Spark UI—use something descriptive
- master("local[*]") runs Spark locally using all available cores; in production, this comes from your cluster manager
- config() lets you set any Spark property; shuffle partitions default to 200, which is overkill for small datasets
- getOrCreate() returns an existing session if one exists, preventing duplicate sessions
For production deployments, you’ll typically omit master() and let the cluster manager handle it. The configuration options you need depend heavily on your workload and cluster setup.
Creating DataFrames from Python Collections
The most direct way to create a DataFrame is from Python data structures. This is useful for testing, prototyping, and creating small reference datasets.
From a List of Tuples
When you have structured data as tuples, pass them directly to createDataFrame() with column names:
# Data as list of tuples
data = [
    ("Alice", "Engineering", 75000),
    ("Bob", "Marketing", 65000),
    ("Charlie", "Engineering", 80000),
    ("Diana", "Sales", 70000)
]
# Create DataFrame with column names
df = spark.createDataFrame(data, ["name", "department", "salary"])
df.show()
Output:
+-------+-----------+------+
| name| department|salary|
+-------+-----------+------+
| Alice|Engineering| 75000|
| Bob| Marketing| 65000|
|Charlie|Engineering| 80000|
| Diana| Sales| 70000|
+-------+-----------+------+
This approach infers data types from the Python objects. Spark maps Python str to StringType, int to LongType, and float to DoubleType. The inference works well for simple cases but can surprise you with edge cases.
From a List of Dictionaries
Dictionaries are more self-documenting since keys become column names:
# Data as list of dictionaries
data = [
    {"name": "Alice", "department": "Engineering", "salary": 75000},
    {"name": "Bob", "department": "Marketing", "salary": 65000},
    {"name": "Charlie", "department": "Engineering", "salary": 80000},
    {"name": "Diana", "department": "Sales", "salary": 70000}
]
df = spark.createDataFrame(data)
df.show()
The dictionary approach is cleaner when your data comes from JSON APIs or other key-value sources. However, column ordering isn’t guaranteed in older Python versions, so specify column order explicitly if it matters.
From Pandas DataFrames
If you’re already using Pandas, conversion is trivial:
import pandas as pd
pandas_df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie"],
    "score": [85.5, 92.0, 78.5]
})
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()
Be careful with this approach for large datasets. The entire Pandas DataFrame must fit in driver memory, and the conversion involves serialization overhead. Use it for small datasets or when you genuinely need Pandas functionality first.
Defining Schemas Explicitly
Schema inference is convenient but problematic in production. It requires an extra pass over the data, can produce inconsistent types across runs, and fails silently when data doesn’t match expectations. Explicit schemas solve all of these issues.
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    DoubleType, BooleanType, DateType
)
# Define schema explicitly
schema = StructType([
    StructField("employee_id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=False),
    StructField("department", StringType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True),
    StructField("is_active", BooleanType(), nullable=False)
])
# Data matching the schema
data = [
    (1, "Alice", "Engineering", 75000.00, True),
    (2, "Bob", "Marketing", 65000.00, True),
    (3, "Charlie", None, 80000.00, False)
]
df = spark.createDataFrame(data, schema)
df.printSchema()
Output:
root
 |-- employee_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- department: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- is_active: boolean (nullable = false)
The nullable parameter matters for optimization. When Spark knows a column can’t contain nulls, it can skip null checks in generated code. Set it accurately based on your data guarantees.
For complex nested structures, schemas can include ArrayType, MapType, and nested StructType:
from pyspark.sql.types import ArrayType, MapType
complex_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("tags", ArrayType(StringType()), True),
    StructField("metadata", MapType(StringType(), StringType()), True)
])
Creating DataFrames from External Data Sources
Real-world data lives in files and databases. PySpark’s DataFrameReader API handles the common formats with sensible defaults and extensive configuration options.
Reading CSV Files
CSV is ubiquitous despite its flaws. PySpark handles most CSV quirks:
# Basic CSV read with common options
df = spark.read.csv(
    "data/employees.csv",
    header=True,
    inferSchema=True,
    sep=",",
    nullValue="NA",
    mode="DROPMALFORMED"
)
# Production approach: explicit schema, no inference
df = spark.read.csv(
    "data/employees.csv",
    header=True,
    schema=schema,  # Your predefined schema
    sep=",",
    nullValue="NA"
)
Key options to know:
- header=True uses the first row as column names
- inferSchema=True scans data to determine types (avoid in production)
- mode controls malformed record handling: PERMISSIVE (default), DROPMALFORMED, or FAILFAST
- nullValue specifies strings that represent null
Reading JSON Files
JSON reading is more forgiving since the format carries type information:
# Single-line JSON (one record per line)
df = spark.read.json("data/events.json")
# Multi-line JSON (array of objects or nested structure)
df = spark.read.json("data/events.json", multiLine=True)
# With explicit schema for consistency
df = spark.read.json("data/events.json", schema=event_schema)
PySpark expects newline-delimited JSON by default (one JSON object per line). This format parallelizes well. For traditional JSON arrays, use multiLine=True, but know that it’s slower because the entire file must be read to parse the structure.
Reading Parquet Files
Parquet is the preferred format for analytical workloads. It’s columnar, compressed, and carries schema information:
# Parquet is straightforward - schema is embedded
df = spark.read.parquet("data/transactions.parquet")
# Read multiple files or partitioned datasets
df = spark.read.parquet("data/transactions/year=2024/")
Parquet files store their schema, so inference isn’t needed. The columnar format means Spark only reads the columns your query needs, dramatically improving performance for wide tables.
Reading from Databases
JDBC connections work for any database with a JDBC driver:
df = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="employees",
    properties={
        "user": "username",
        "password": "password",
        "driver": "org.postgresql.Driver"
    }
)
# For large tables, partition the read
df = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="employees",
    column="employee_id",  # Partition column
    lowerBound=1,
    upperBound=1000000,
    numPartitions=10,
    properties={"user": "username", "password": "password"}
)
Always partition large JDBC reads. Without partitioning, a single executor fetches all data sequentially—a guaranteed bottleneck.
Creating DataFrames from RDDs
If you’re maintaining legacy code or need RDD-level control for specific operations, converting RDDs to DataFrames is straightforward.
# Create an RDD
rdd = spark.sparkContext.parallelize([
    (1, "Alice", 75000),
    (2, "Bob", 65000),
    (3, "Charlie", 80000)
])
# Simple conversion with toDF()
df = rdd.toDF(["id", "name", "salary"])
# With explicit schema for type control
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("salary", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)
The toDF() method is concise but infers types. For production code, use createDataFrame() with an explicit schema.
Quick Verification and Next Steps
After creating a DataFrame, verify it looks correct before running expensive operations:
# Inspect your DataFrame
df.show(5) # Display first 5 rows
df.printSchema() # Show column names and types
df.count() # Total row count
df.describe().show() # Summary statistics for numeric columns
# Chained inspection for quick overview
(df
    .select("name", "salary")
    .filter(df.salary > 70000)
    .show())
These methods trigger computation, so use them judiciously on large datasets. For quick sanity checks, show(5) and printSchema() are usually sufficient.
From here, you’ll typically move into transformations: select(), filter(), groupBy(), join(), and aggregations. The DataFrame API mirrors SQL closely, so if you know SQL, you already know most of PySpark’s transformation vocabulary.