Apache Spark - Complete Architecture Explained
Key Insights
- Spark’s architecture separates cluster management from distributed computation through a driver-executor model, enabling it to run on YARN, Mesos, Kubernetes, or standalone mode while maintaining the same application code
- The DAG scheduler and Catalyst optimizer transform high-level operations into optimized physical execution plans, eliminating manual MapReduce job chaining and running iterative in-memory workloads up to 100x faster than disk-based MapReduce
- RDD lineage and lazy evaluation enable fault tolerance without replication by recomputing lost partitions from their transformation history, while the Tungsten execution engine uses off-heap memory management and whole-stage code generation for near-native performance
Core Components and Execution Model
Apache Spark’s architecture consists of a driver program that coordinates execution across multiple executor processes. The driver runs your main() function, creates the SparkContext, and builds the execution plan. Executors run on worker nodes, execute tasks, and store data for caching.
// Driver program creates the SparkContext
val conf = new SparkConf()
  .setAppName("ArchitectureDemo")
  .setMaster("spark://master:7077")
val sc = new SparkContext(conf)

// Driver coordinates work across executors
val data = sc.textFile("hdfs://data/logs.txt")
  .map(line => (line.split(",")(0), 1))
  .reduceByKey(_ + _)
  .collect()
When you submit this application, the driver connects to the cluster manager (YARN, Mesos, Kubernetes, or Spark’s standalone manager), which allocates executors. The driver then serializes tasks and sends them to executors for parallel execution.
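The driver's coordination role can be sketched in a few lines of plain Python (a toy model, not Spark's actual scheduler; the data, partitioning scheme, and executor count here are invented for illustration):

```python
# Toy model of the driver-executor split: the driver partitions the input,
# ships one task per partition to an executor, and aggregates the results.
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, num_partitions):
    """Driver-side: divide the input into roughly equal partitions."""
    return [data[i::num_partitions] for i in range(num_partitions)]

def run_task(partition):
    """Executor-side: apply the task function to one partition."""
    return sum(len(line) for line in partition)

lines = ["error,disk", "info,boot", "error,net", "warn,mem"]
partitions = split_into_partitions(lines, num_partitions=2)

# The "cluster" runs one task per partition in parallel
with ThreadPoolExecutor(max_workers=2) as cluster:
    results = list(cluster.map(run_task, partitions))

total = sum(results)  # driver-side aggregation, as collect()/reduce would do
```

The key point the sketch captures: the driver never touches individual records during execution; it only plans work, assigns partitions, and merges per-partition results.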
DAG Scheduler and Job Execution
Spark transforms your code into a Directed Acyclic Graph (DAG) of stages. Transformations like map() and filter() are lazy—they don’t execute immediately. Actions like collect() or count() trigger execution.
# Python example showing a transformation pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("DAGExample").getOrCreate()

# These transformations build the DAG but don't execute
df = spark.read.json("events.json") \
    .filter(col("event_type") == "purchase") \
    .groupBy("user_id") \
    .agg({"amount": "sum"}) \
    .filter(col("sum(amount)") > 1000)

# Action triggers DAG execution
results = df.collect()  # Execution happens here
The DAG scheduler breaks the job into stages at shuffle boundaries. A shuffle occurs when data must be redistributed across partitions (groupBy, join, reduceByKey). Each stage contains tasks that can execute in parallel without data movement.
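The stage-splitting rule is simple enough to state as code. A toy illustration in plain Python (the operation names and the notion of a flat pipeline are simplifications invented here; the real scheduler walks an RDD dependency graph):

```python
# Toy illustration of DAG-scheduler stage splitting: each wide (shuffle)
# operation ends the current stage; narrow operations stay in the same stage.
WIDE_OPS = {"reduceByKey", "groupBy", "join", "repartition"}  # cause shuffles

def split_into_stages(ops):
    """Split a linear chain of operations into stages at shuffle boundaries."""
    stages, current = [], []
    for op in ops:
        current.append(op)
        if op in WIDE_OPS:
            stages.append(current)  # shuffle boundary: close this stage
            current = []
    if current:
        stages.append(current)
    return stages

pipeline = ["textFile", "map", "reduceByKey", "filter", "collect"]
stages = split_into_stages(pipeline)
# Two stages: everything up to the shuffle, then the post-shuffle work
```

Within each resulting stage, tasks are pipelined: a record flows through all of the stage's operations before the next record is touched, with no intermediate materialization.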
// Examining the execution plan
val rdd = sc.parallelize(1 to 1000000)
  .map(x => (x % 100, x))
  .reduceByKey(_ + _) // Shuffle boundary - creates a new stage
  .filter(_._2 > 1000)

println(rdd.toDebugString)
// Shows:
// (2) MapPartitionsRDD[3] at filter
//  |  ShuffledRDD[2] at reduceByKey
//  +-(2) MapPartitionsRDD[1] at map
//     |  ParallelCollectionRDD[0] at parallelize
Memory Management and Caching
Spark uses a unified memory manager that dynamically borrows between execution memory (for shuffles, joins, sorts) and storage memory (for caching). This replaced the static split of Spark 1.x, where spark.storage.memoryFraction and spark.shuffle.memoryFraction carved up the heap into fixed regions.
// Caching strategies (each persist shown independently; changing the
// level of an already-persisted Dataset requires unpersist() first)
import org.apache.spark.storage.StorageLevel

val frequentlyUsed = spark.read.parquet("large_dataset.parquet")

// Default caching
frequentlyUsed.cache() // For Datasets, same as persist(MEMORY_AND_DISK);
                       // RDD.cache() defaults to MEMORY_ONLY

// Cache with serialization for memory efficiency
frequentlyUsed.persist(StorageLevel.MEMORY_ONLY_SER)

// Keep deserialized in memory, spill to disk if memory is insufficient
frequentlyUsed.persist(StorageLevel.MEMORY_AND_DISK)

// Use off-heap memory (requires spark.memory.offHeap.* configuration)
frequentlyUsed.persist(StorageLevel.OFF_HEAP)
Configuration for memory tuning:
# spark-defaults.conf
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5
spark.executor.memory=4g
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=2g
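To see what these settings imply for a 4 GB executor, here is the unified-memory arithmetic as a plain-Python sketch (the 300 MB reserved-memory figure is Spark's internal constant; exact region sizes can vary slightly by version):

```python
# Sketch of the unified memory arithmetic for the configuration above.
# 300 MB is Spark's internal reserved-memory constant.
RESERVED_MB = 300

def unified_memory_regions(executor_memory_mb, fraction=0.6, storage_fraction=0.5):
    usable = executor_memory_mb - RESERVED_MB
    unified = usable * fraction            # shared by execution + storage
    storage = unified * storage_fraction   # storage's protected share
    execution = unified - storage          # execution's guaranteed minimum
    user = usable - unified                # user data structures, UDFs, etc.
    return {"unified": unified, "storage": storage,
            "execution": execution, "user": user}

regions = unified_memory_regions(4096)  # spark.executor.memory=4g
# Either side can borrow unused memory from the other at runtime;
# storageFraction only sets the cached-block share that eviction
# will not reclaim for execution.
```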
Shuffle Mechanism
Shuffles are expensive operations requiring disk I/O, serialization, and network transfer. Understanding shuffle internals helps optimize performance.
// High-shuffle operation
val joined = largeDF.join(smallDF, "key") // Sort-merge join with a shuffle

// Optimized with a broadcast join (no shuffle of largeDF)
import org.apache.spark.sql.functions.broadcast
val optimized = largeDF.join(broadcast(smallDF), "key")

// Configure shuffle partitions (default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "200")

// Inspect executor state via the status tracker
val executorInfos = sc.statusTracker.getExecutorInfos
During a shuffle, each map task writes its output bucketed by reducer partition, and the shuffle read phase fetches the relevant blocks across the network. The legacy hash-based shuffle created M x R files (M mappers, R reducers), which did not scale to large clusters. Sort-based shuffle, the default since Spark 1.2 and the only implementation since 2.0, consolidates each map task's output into a single sorted file plus an index.
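The bucketing step itself is just deterministic hashing of the key. A plain-Python sketch (Spark's HashPartitioner uses the JVM key's hashCode; integer keys are used here so the arithmetic is transparent):

```python
# Sketch of shuffle-write bucketing: every record is routed to the reducer
# partition determined by its key, so all records sharing a key land on the
# same reducer regardless of which map task emitted them.
def partition_for(key, num_partitions):
    """Same rule as a hash partitioner: hash(key) mod numPartitions."""
    return hash(key) % num_partitions

def shuffle_write(records, num_partitions):
    """One map task's output, grouped by target reducer partition."""
    buckets = {p: [] for p in range(num_partitions)}
    for key, value in records:
        buckets[partition_for(key, num_partitions)].append((key, value))
    return buckets

buckets = shuffle_write([(0, "a"), (1, "b"), (2, "c"), (3, "d")], 2)
# Even keys land in partition 0, odd keys in partition 1
```

Because the routing function depends only on the key, the invariant that makes reduceByKey correct holds by construction: a given key is always processed by exactly one reducer.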
Catalyst Optimizer and Tungsten
The Catalyst optimizer transforms DataFrame/SQL operations through four phases: analysis, logical optimization, physical planning, and code generation.
// Query that gets optimized
val query = spark.sql("""
  SELECT customer_id, SUM(amount) AS total
  FROM orders
  WHERE order_date >= '2024-01-01'
  GROUP BY customer_id
  HAVING SUM(amount) > 1000
""")

// View the optimized plan
query.explain(true)
// Shows:
// == Parsed Logical Plan ==
// == Analyzed Logical Plan ==
// == Optimized Logical Plan ==   <- predicate pushdown, projection pruning
// == Physical Plan ==            <- whole-stage code generation
Catalyst applies optimizations like:
- Predicate pushdown: Filters pushed to data source
- Projection pruning: Only read required columns
- Constant folding: Evaluate constant expressions once during optimization
- Join reordering: Optimize join order based on statistics
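Constant folding is the easiest of these rules to show concretely. A toy illustration in plain Python (the tuple-based expression tree and two-operator grammar are invented here; Catalyst works on typed expression trees with many more rules):

```python
# Toy constant folding: recursively evaluate any subtree whose operands are
# all literals, leaving column references untouched. Supports "+" and "*".
def fold_constants(expr):
    """expr is ("+"/"*", left, right), a number, or a column-name string."""
    if isinstance(expr, tuple):
        op, left, right = expr
        left, right = fold_constants(left), fold_constants(right)
        if isinstance(left, (int, float)) and isinstance(right, (int, float)):
            return left + right if op == "+" else left * right
        return (op, left, right)  # column reference present: keep the node
    return expr

# WHERE amount > 10 * 100 becomes WHERE amount > 1000 before execution
folded = fold_constants(("*", 10, 100))
```

The same bottom-up tree rewrite shape underlies the other rules: predicate pushdown moves filter nodes toward leaves, and projection pruning deletes unreferenced column nodes.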
# Demonstrating predicate pushdown
from pyspark.sql.functions import col

# Filter pushed to the Parquet reader - only matching data is loaded
df = spark.read.parquet("partitioned_data/") \
    .filter(col("year") == 2024) \
    .select("customer_id", "amount")

# Physical plan shows PartitionFilters and PushedFilters
df.explain()
Tungsten provides the execution engine with off-heap memory management, cache-aware data structures, and whole-stage code generation:
// Tungsten optimizations in action
case class Record(id: Long, value: String)

import spark.implicits._ // provides the Encoder for Record

val ds = spark.createDataset(Seq(
  Record(1, "data"),
  Record(2, "more")
))

// Tungsten generates specialized bytecode,
// uses sun.misc.Unsafe for direct memory access,
// and eliminates virtual function calls
ds.map(r => r.id * 2).collect()
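A rough analogy for whole-stage code generation in plain Python (Spark generates Java source and compiles it with Janino; the snippet format here is invented for the sketch): rather than calling each operator once per row, generate one source function for the whole stage so rows flow through a single tight loop.

```python
# Rough analogy for whole-stage codegen: fuse a chain of per-row operator
# snippets into one generated, compiled function with no per-operator calls.
def generate_stage(op_snippets):
    """Emit and compile a single function applying all snippets in order."""
    body = "\n".join(f"    row = {snippet}" for snippet in op_snippets)
    src = f"def stage(row):\n{body}\n    return row\n"
    namespace = {}
    exec(compile(src, "<generated-stage>", "exec"), namespace)
    return namespace["stage"]

# A map(_ * 2) followed by a map(_ + 1), fused into one generated function
stage = generate_stage(["row * 2", "row + 1"])
results = [stage(x) for x in range(3)]
```

The payoff in real Spark is the same as in the sketch: interpretation overhead (virtual dispatch, per-operator row handoff) is paid once at compile time instead of once per row.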
Fault Tolerance Through Lineage
RDDs maintain lineage information—the sequence of transformations that created them. If a partition is lost, Spark recomputes only that partition.
// Lineage example
val input = sc.textFile("input.txt") // Lineage: textFile
val words = input.flatMap(_.split(" ")) // Lineage: textFile -> flatMap
val pairs = words.map(w => (w, 1)) // Lineage: textFile -> flatMap -> map
val counts = pairs.reduceByKey(_ + _) // Lineage: ... -> reduceByKey
// View lineage
println(counts.toDebugString)
// If executor fails during reduceByKey:
// - Spark detects missing partition
// - Recomputes from textFile through the transformation chain
// - Only lost partitions recomputed, not entire dataset
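The recovery mechanism itself is compact enough to sketch in plain Python (a toy model invented here; real RDDs track partition-level dependencies and preferred locations, not just a parent pointer):

```python
# Toy model of lineage-based recovery: each node records its parent and
# transformation, so a lost partition is rebuilt by replaying the chain
# from the source for that partition index only.
class Node:
    def __init__(self, parent, fn, source=None):
        self.parent, self.fn, self.source = parent, fn, source

    def compute(self, partition_index):
        if self.parent is None:                       # source "RDD"
            return self.source[partition_index]
        parent_data = self.parent.compute(partition_index)
        return [self.fn(x) for x in parent_data]

source = Node(None, None, source=[[1, 2], [3, 4]])    # two partitions
scaled = Node(source, lambda x: x * 10)               # map(_ * 10)
shifted = Node(scaled, lambda x: x + 1)               # map(_ + 1)

# Partition 1 "lost": recompute only it, from the source through the chain
recovered = shifted.compute(1)
```

Note that partition 0 is never touched during recovery; this partition-granular replay is exactly why Spark can skip the replication that HDFS-style fault tolerance requires.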
For iterative algorithms, checkpointing truncates lineage to prevent recomputation chains from growing:
sc.setCheckpointDir("hdfs://checkpoint")

var current = initialRDD
for (i <- 1 to 100) {
  current = current.map(complexTransformation)
  if (i % 10 == 0) {
    current.persist()    // avoid computing the RDD twice during checkpointing
    current.checkpoint() // truncate lineage
  }
}
Adaptive Query Execution
Spark 3.0 introduced Adaptive Query Execution (AQE), enabled by default since Spark 3.2, which re-optimizes plans during execution based on runtime statistics.
# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE dynamically:
# - Coalesces shuffle partitions
# - Converts sort-merge joins to broadcast joins
# - Handles skewed joins by splitting partitions
df1.join(df2, "key").groupBy("category").count().show()
AQE monitors stage execution and adjusts subsequent stages. If a shuffle produces small partitions, it coalesces them. If join statistics show one table is small enough, it switches to broadcast join mid-execution.
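The coalescing step can be sketched in plain Python (a simplification invented here: greedy merging of adjacent partitions by size; AQE's actual algorithm also respects a minimum partition count and other settings):

```python
# Sketch of AQE-style partition coalescing: after a shuffle, merge adjacent
# small partitions until each group reaches a target size, shrinking the
# number of downstream tasks.
def coalesce_partitions(sizes_mb, target_mb):
    """Group adjacent partition indices so each group totals >= target_mb."""
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes_mb):
        current.append(i)
        current_size += size
        if current_size >= target_mb:
            groups.append(current)       # group is big enough: close it
            current, current_size = [], 0
    if current:
        groups.append(current)           # leftover tail becomes a final group
    return groups

# Ten uneven shuffle partitions collapse to three ~64 MB downstream tasks
groups = coalesce_partitions([5, 10, 60, 2, 3, 70, 8, 8, 8, 40], target_mb=64)
```

Fewer, fuller tasks mean less per-task scheduling overhead, which is precisely the waste AQE observes at runtime that a static plan cannot predict.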
The architecture’s power lies in its layered design: high-level APIs (DataFrame, SQL) compile through Catalyst to optimized plans, execute via Tungsten’s engine, distribute through the DAG scheduler, and recover through lineage—all while abstracting cluster management details from application code.