Apache Spark - Transformations vs Actions
Key Insights
- Transformations are lazy operations that define a computation graph without executing it, while actions trigger actual computation and return results to the driver or write data to storage
- Understanding the distinction between transformations and actions is critical for optimizing Spark jobs, managing memory, and avoiding unnecessary recomputations
- Strategic use of caching between transformations and multiple actions can dramatically improve performance by preventing redundant computation of the entire DAG
The Fundamental Difference
Apache Spark operates on a lazy evaluation model where operations fall into two categories: transformations and actions. Transformations build up a logical execution plan (DAG - Directed Acyclic Graph) without performing any computation. Actions trigger the execution of that plan and materialize results.
This distinction isn’t academic—it fundamentally affects how you write efficient Spark code. Every transformation you call simply adds a step to the execution plan. Nothing actually happens until you invoke an action.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TransformationsVsActions").getOrCreate()
# Defining the read is lazy, though Spark may touch file metadata
# (e.g. Parquet footers) to infer the schema
df = spark.read.parquet("s3://bucket/data/")
# Filter - transformation (lazy)
filtered_df = df.filter(df["age"] > 25)
# Select - transformation (lazy)
result_df = filtered_df.select("name", "age", "city")
# At this point, ZERO data has been read or processed
# The DAG has been built but not executed
# count() is an ACTION - this triggers execution
total_count = result_df.count() # NOW everything executes
Common Transformations
Transformations return a new RDD, DataFrame, or Dataset. They’re the building blocks of your data processing pipeline.
Narrow transformations operate on a single partition without requiring data shuffle:
# map - transforms each element
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
squared = rdd.map(lambda x: x * x)
# filter - selects elements matching a predicate
even_numbers = rdd.filter(lambda x: x % 2 == 0)
# flatMap - maps each element to 0 or more elements
words_rdd = spark.sparkContext.parallelize(["hello world", "spark streaming"])
words = words_rdd.flatMap(lambda line: line.split(" "))
Wide transformations require shuffling data across partitions:
# groupByKey - groups values by key (expensive, avoid when possible)
pairs = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])
grouped = pairs.groupByKey()
# reduceByKey - aggregates values by key (preferred over groupByKey)
summed = pairs.reduceByKey(lambda x, y: x + y)
# join - combines two datasets
users = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])
purchases = spark.sparkContext.parallelize([(1, 100), (1, 200), (2, 150)])
joined = users.join(purchases)
For DataFrames, transformations look more SQL-like:
from pyspark.sql.functions import col, avg, sum, when
# Multiple transformations chained
result = (df
    .filter(col("status") == "active")
    .groupBy("department")
    .agg(
        avg("salary").alias("avg_salary"),
        sum("revenue").alias("total_revenue")
    )
    .withColumn("performance_tier",
        when(col("total_revenue") > 1000000, "high")
        .when(col("total_revenue") > 500000, "medium")
        .otherwise("low")
    )
    .orderBy(col("total_revenue").desc())
)
# Still no execution - just building the plan
Common Actions
Actions force computation and return values to the driver program or write data to external storage.
# collect() - returns all elements to driver (dangerous for large datasets)
data = rdd.collect()
# count() - returns the number of elements
num_elements = rdd.count()
# first() - returns the first element
first_element = rdd.first()
# take(n) - returns first n elements
first_ten = rdd.take(10)
# reduce() - aggregates elements using a function
total = rdd.reduce(lambda x, y: x + y)
# foreach() - applies a function to each element (side effects)
rdd.foreach(lambda x: print(x))
# saveAsTextFile() - writes to storage
rdd.saveAsTextFile("hdfs://output/path")
DataFrame-specific actions:
# show() - displays data in tabular format
df.show(20, truncate=False)
# write operations are actions
df.write.mode("overwrite").parquet("s3://bucket/output/")
# toPandas() - converts to Pandas DataFrame (careful with memory)
pandas_df = df.toPandas()
# describe() - returns a new DataFrame of summary statistics; it is itself
# a transformation, so nothing is computed until you call an action on it
stats = df.describe()
stats.show()
The Performance Impact
Understanding transformations vs actions is crucial for optimization. Consider this inefficient code:
# BAD: Multiple actions on the same transformation chain
df = spark.read.parquet("large_dataset.parquet")
filtered = df.filter(col("value") > 100)
count1 = filtered.count() # Action 1 - full DAG execution
max_value = filtered.agg({"value": "max"}).collect() # Action 2 - full DAG execution again
sample = filtered.take(10) # Action 3 - full DAG execution again
Each action recomputes the entire DAG from scratch. For large datasets, this is catastrophically inefficient. The solution is caching:
# GOOD: Cache intermediate results
df = spark.read.parquet("large_dataset.parquet")
filtered = df.filter(col("value") > 100)
filtered.cache() # or persist()
count1 = filtered.count() # Computes and caches
max_value = filtered.agg({"value": "max"}).collect() # Uses cache
sample = filtered.take(10) # Uses cache
Persistence Strategies
For DataFrames, cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK); for RDDs it defaults to MEMORY_ONLY. You can choose different storage levels:
from pyspark import StorageLevel
# Memory only (fastest, but may lose partitions if memory is full)
df.persist(StorageLevel.MEMORY_ONLY)
# Memory and disk (spills to disk if needed)
df.persist(StorageLevel.MEMORY_AND_DISK)
# Disk only (slower but reliable)
df.persist(StorageLevel.DISK_ONLY)
# Serialized in memory (MEMORY_ONLY_SER) exists only in the Scala/Java API;
# PySpark always stores data serialized, so MEMORY_ONLY already behaves this way
# With replication
df.persist(StorageLevel.MEMORY_AND_DISK_2)
Always unpersist when you’re done:
df.unpersist()
Identifying Transformations vs Actions
When in doubt, check the return type. Transformations return RDD, DataFrame, or Dataset objects. Actions return Python/Scala/Java types or Unit (void).
// Scala examples showing return types
val rdd: RDD[Int] = sc.parallelize(1 to 100)
// Transformation - returns RDD[Int]
val doubled: RDD[Int] = rdd.map(_ * 2)
// Action - returns Long
val count: Long = rdd.count()
// Transformation - returns RDD[(Int, Int)]
val pairs: RDD[(Int, Int)] = rdd.map(x => (x, x * x))
// Action - returns Array[Int]
val array: Array[Int] = rdd.collect()
Real-World Example: ETL Pipeline
Here’s a practical ETL pipeline demonstrating proper use of transformations and actions:
from pyspark.sql.functions import col, to_date, year, month, sum, count
# Extract
raw_orders = spark.read.json("s3://raw/orders/")
raw_customers = spark.read.json("s3://raw/customers/")
# Transform (all lazy)
orders_cleaned = (raw_orders
    .filter(col("order_status").isNotNull())
    .withColumn("order_date", to_date(col("order_timestamp")))
    .withColumn("year", year(col("order_date")))
    .withColumn("month", month(col("order_date")))
)
customers_cleaned = (raw_customers
    .filter(col("customer_id").isNotNull())
    .dropDuplicates(["customer_id"])
)
# Join (still lazy)
enriched = orders_cleaned.join(
    customers_cleaned,
    on="customer_id",
    how="left"
)
# Aggregation (still lazy)
monthly_summary = (enriched
    .groupBy("year", "month", "customer_segment")
    .agg(
        sum("order_amount").alias("total_revenue"),
        count("order_id").alias("order_count")
    )
)
# Cache before multiple actions
monthly_summary.cache()
# Load - these are actions that trigger execution
monthly_summary.write.mode("overwrite").partitionBy("year", "month").parquet("s3://processed/monthly_summary/")
# Quality check - reuses cache
quality_check = monthly_summary.filter(col("total_revenue") < 0).count()
if quality_check > 0:
    print(f"WARNING: {quality_check} records with negative revenue")
# Cleanup
monthly_summary.unpersist()
This pattern—build transformation chains, cache before multiple actions, then unpersist—is the foundation of efficient Spark programming. Master this distinction and you’ll write Spark jobs that are both correct and performant.