PySpark: RDD vs DataFrame Guide
Key Insights
- DataFrames should be your default choice—Catalyst optimizer and Tungsten execution engine deliver 10-100x performance gains over equivalent RDD operations for structured data processing.
- RDDs remain essential for unstructured data, custom partitioning schemes, and scenarios requiring fine-grained control over distributed computation that DataFrames can’t express.
- The two APIs are fully interoperable; understanding when and how to convert between them lets you leverage the strengths of each within a single pipeline.
Introduction to PySpark’s Data Abstractions
PySpark gives you two primary ways to work with distributed data: RDDs and DataFrames. This isn’t redundant design—it reflects a fundamental trade-off between control and optimization.
RDDs (Resilient Distributed Datasets) came first. They’re the foundation Spark was built on, offering maximum flexibility at the cost of requiring you to think about distributed computation explicitly. DataFrames arrived later, borrowing concepts from pandas and SQL to provide a higher-level abstraction that Spark can optimize aggressively.
Choosing between them isn’t about preference. It’s about understanding what your data looks like, what operations you need, and whether Spark’s optimizer can help you. Get this wrong and you’ll either fight unnecessary complexity or leave significant performance on the table.
RDD Fundamentals
RDDs are immutable, distributed collections of objects. When you work with an RDD, you’re working with raw Python (or Java/Scala) objects partitioned across your cluster. Spark tracks the lineage of transformations but doesn’t know anything about the structure of your data.
This opacity is both the RDD's strength and its weakness. Spark can't optimize what it can't see, but you can put anything in an RDD and transform it however you want.
Here’s how you create and manipulate RDDs:
from pyspark import SparkContext
sc = SparkContext("local[*]", "RDD Example")
# Create RDD from a Python list
numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# Create RDD from a text file
log_lines = sc.textFile("hdfs:///logs/application.log")
# Basic transformations
squared = numbers.map(lambda x: x ** 2)
evens = numbers.filter(lambda x: x % 2 == 0)
# Reduce to a single value
total = numbers.reduce(lambda a, b: a + b)
print(f"Sum: {total}") # Sum: 55
# More complex transformation chain
result = (numbers
    .filter(lambda x: x > 3)
    .map(lambda x: (x, x ** 2))
    .collect())
print(result) # [(4, 16), (5, 25), (6, 36), (7, 49), (8, 64), (9, 81), (10, 100)]
For processing unstructured text:
# Word count - the canonical MapReduce example
word_counts = (log_lines
    .flatMap(lambda line: line.split())
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False))
top_words = word_counts.take(10)
Notice that every transformation is a function you provide. Spark executes your lambdas as black boxes—it can’t look inside lambda x: x ** 2 to optimize it.
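To make reduceByKey's semantics concrete, here is what the word count above computes, sketched in plain single-machine Python (a local analogy for the distributed fold, not how Spark actually executes it; the sample lines are made up):

```python
# Local sketch of the pipeline above: flatMap -> map -> reduceByKey.
lines = ["error in module A", "error in module B", "warning in module A"]

# flatMap: split every line into individual words
words = [w for line in lines for w in line.split()]

# map: pair each lowercased word with a count of 1
pairs = [(w.lower(), 1) for w in words]

# reduceByKey: fold the counts together per key
counts = {}
for word, n in pairs:
    counts[word] = counts.get(word, 0) + n

print(sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:3])
# [('in', 3), ('module', 3), ('error', 2)]
```

In Spark, the same fold runs per-partition first and the partial results are then merged across the cluster, which is why the reduce function must be associative and commutative.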
DataFrame Fundamentals
DataFrames represent data as a distributed table with named columns and known types. This schema awareness is the key difference. When Spark knows your data has columns user_id: int, amount: double, and timestamp: timestamp, it can optimize storage layout, push predicates down to the data source, and build better execution plans.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
# Create DataFrame from CSV
transactions = spark.read.csv(
    "hdfs:///data/transactions.csv",
    header=True,
    inferSchema=True
)
# Create from JSON
events = spark.read.json("hdfs:///data/events/*.json")
# Basic operations
transactions.printSchema()
# Select and filter
high_value = (transactions
    .select("user_id", "amount", "category")
    .filter(col("amount") > 1000))
# Aggregations
category_stats = (transactions
    .groupBy("category")
    .agg(
        sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount"),
        count("*").alias("transaction_count")
    )
    .orderBy(col("total_amount").desc()))
category_stats.show()
DataFrames also support SQL directly:
# Register as a temporary view
transactions.createOrReplaceTempView("transactions")
# Run SQL queries
monthly_summary = spark.sql("""
    SELECT
        date_trunc('month', transaction_date) as month,
        category,
        SUM(amount) as total,
        COUNT(*) as num_transactions
    FROM transactions
    WHERE transaction_date >= '2024-01-01'
    GROUP BY 1, 2
    ORDER BY 1, 3 DESC
""")
The SQL interface isn’t just syntactic sugar. It goes through the same Catalyst optimizer, producing identical execution plans to the DataFrame API equivalent.
Performance Comparison
The performance gap between RDDs and DataFrames is substantial and well-documented. DataFrames benefit from two major optimizations RDDs can’t access:
Catalyst Optimizer: Analyzes your query plan, reorders operations, pushes predicates down to data sources, and eliminates unnecessary computation. A filter that can run at the data source level never loads filtered-out data into memory.
Tungsten Execution Engine: Uses off-heap memory management, cache-aware computation, and whole-stage code generation. Instead of interpreting your operations row by row, Tungsten generates optimized bytecode.
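The payoff of predicate pushdown is easy to see with a toy single-machine model (plain Python, not Spark internals): moving the filter ahead of an expensive step means the expensive work only touches rows that survive the predicate.

```python
# Toy model of predicate pushdown (plain Python, not Spark internals).
records = range(100_000)
calls = {"n": 0}

def expensive_transform(x):
    calls["n"] += 1          # count how many rows the expensive step touches
    return x * 2

# Unoptimized plan: transform every row, then filter the results.
naive = [y for y in map(expensive_transform, records) if y < 100]
naive_calls = calls["n"]     # 100,000 calls

# "Pushed-down" plan: the predicate y < 100 is rewritten in terms of the
# input (x * 2 < 100) and applied before the expensive step.
calls["n"] = 0
pushed = [expensive_transform(x) for x in records if x * 2 < 100]
pushed_calls = calls["n"]    # only 50 calls

assert naive == pushed       # same answer, far less work
print(naive_calls, pushed_calls)
```

Catalyst performs this kind of rewrite automatically for DataFrame operations; with RDDs, your lambdas are opaque, so reordering like this is up to you.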
Here’s a concrete comparison:
import time
from pyspark.sql.functions import col, sum as spark_sum
# Generate test data - 10 million records (built as a plain Python list on
# the driver, so this needs substantial memory; shrink the range if needed)
data = [(i, i % 100, i * 1.5) for i in range(10_000_000)]
# RDD approach
rdd = sc.parallelize(data)
start = time.time()
rdd_result = (rdd
    .filter(lambda x: x[1] < 50)          # Filter by category
    .map(lambda x: (x[1], x[2]))          # Select category and amount
    .reduceByKey(lambda a, b: a + b)      # Sum by category
    .collect())
rdd_time = time.time() - start
print(f"RDD execution time: {rdd_time:.2f}s")
# DataFrame approach
df = spark.createDataFrame(data, ["id", "category", "amount"])
start = time.time()
df_result = (df
    .filter(col("category") < 50)
    .groupBy("category")
    .agg(spark_sum("amount").alias("total"))
    .collect())
df_time = time.time() - start
print(f"DataFrame execution time: {df_time:.2f}s")
print(f"DataFrame is {rdd_time / df_time:.1f}x faster")
On a typical local setup, you’ll see DataFrames complete 5-20x faster. On larger clusters with more data, the gap often widens to 50-100x because Catalyst’s optimizations compound—predicate pushdown reduces network shuffle, better memory layout improves cache utilization, and code generation eliminates interpretation overhead.
You can examine what Catalyst does with explain():
df.filter(col("category") < 50).groupBy("category").agg(spark_sum("amount")).explain(True)
This shows the logical plan, optimized plan, and physical execution plan. RDDs have no equivalent—what you write is what runs.
When to Use Each
Use DataFrames when:
- Your data has a schema (CSV, JSON, Parquet, database tables)
- You’re doing SQL-like operations (filters, joins, aggregations)
- Performance matters (it usually does)
- You’re building ETL pipelines or analytics workflows
- You want to leverage Spark’s built-in functions
Use RDDs when:
- You’re processing truly unstructured data (binary files, custom formats)
- You need custom partitioning logic that DataFrame's repartition() can't express
- You're implementing algorithms that don't map to relational operations
- You need fine-grained control over data placement and computation
- You’re working with complex nested objects that don’t flatten well to tables
A concrete example where RDDs make sense: processing a corpus of documents where each document needs custom NLP preprocessing that varies by document type, followed by a graph algorithm that requires specific partitioning to minimize cross-partition communication.
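The partitioning half of that example is expressed with a pair RDD's partitionBy, which accepts a plain function from key to partition index. Here is a sketch of such a function (the document types and routing rule are hypothetical, invented for illustration):

```python
# Hypothetical routing rule: keep each document type in its own partition so
# a downstream graph step minimizes cross-partition communication.
def doc_type_partitioner(doc_type, num_partitions=4):
    known = {"pdf": 0, "html": 1, "text": 2}   # assumed document types
    # Unknown types all land in the last partition
    return known.get(doc_type, num_partitions - 1)

# In PySpark this plugs into a pair RDD of (doc_type, document) tuples:
#   docs.partitionBy(4, doc_type_partitioner)

print(doc_type_partitioner("pdf"), doc_type_partitioner("epub"))  # 0 3
```

DataFrame repartition() can hash or range-partition by column values, but it can't express an arbitrary key-to-partition mapping like this.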
Converting Between RDDs and DataFrames
Real pipelines often need both. Maybe you load structured data as a DataFrame, extract a column for custom processing as an RDD, then convert back. PySpark makes this straightforward.
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# DataFrame to RDD
df = spark.read.parquet("hdfs:///data/users.parquet")
rdd = df.rdd # Each element is a Row object
# Access Row fields
user_names = rdd.map(lambda row: row.name).collect()
# RDD to DataFrame - with schema inference
rdd = sc.parallelize([
    Row(id=1, name="Alice", score=95.5),
    Row(id=2, name="Bob", score=87.0)
])
df = rdd.toDF() # Infers schema from Row objects
# RDD to DataFrame - with explicit schema (preferred for production)
rdd = sc.parallelize([(1, "Alice", 95.5), (2, "Bob", 87.0)])
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("score", DoubleType(), nullable=True)
])
df = spark.createDataFrame(rdd, schema)
Always prefer explicit schemas in production code. Schema inference reads data twice and can guess wrong (is “123” a string or integer?). Explicit schemas are faster and eliminate ambiguity.
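The ambiguity is easy to demonstrate without Spark at all: every CSV field arrives as text, and a numeric-looking column can be wrong to cast. The two-row sample below is made up:

```python
import csv
import io

# Made-up sample: zip codes look numeric but are really strings.
raw = "user_id,zip\n123,02134\n456,10001\n"
rows = list(csv.reader(io.StringIO(raw)))
header, data = rows[0], rows[1:]

# Everything arrives as a string...
print(data[0])           # ['123', '02134']

# ...and "helpful" integer inference silently destroys the leading zero.
print(int(data[0][1]))   # 2134, not 02134
```

An explicit schema that declares zip as StringType sidesteps the guess entirely, and Spark skips the extra pass over the data that inference requires.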
Summary and Best Practices
Start with DataFrames. For the vast majority of data engineering work—ETL, analytics, feature engineering, report generation—DataFrames are faster to write and faster to execute. The Catalyst optimizer is genuinely good at its job.
Reserve RDDs for specific situations: unstructured data that resists tabular representation, algorithms requiring custom partitioning, or legacy code that predates the DataFrame API. When you do use RDDs, consider whether the RDD portion can be isolated to a small part of your pipeline with DataFrame conversions at the boundaries.
When converting, always specify schemas explicitly. Use df.rdd and spark.createDataFrame(rdd, schema) rather than relying on inference. This makes your code more predictable and often faster.
Finally, use explain() liberally when debugging DataFrame performance. Understanding what Catalyst does with your query is essential for writing efficient Spark code. If you see a full table scan where you expected predicate pushdown, that’s actionable information.