Apache Spark - Driver and Executor Explained

Key Insights

  • The driver program orchestrates Spark jobs and maintains the SparkContext, while executors run on worker nodes to execute tasks and store data for your application
  • Understanding the driver-executor architecture is critical for optimizing resource allocation, debugging performance bottlenecks, and preventing out-of-memory errors
  • Driver memory handles job coordination and result collection; executor memory manages task execution and data caching—misconfiguring either causes job failures

The Driver-Executor Architecture

Apache Spark uses a master-slave architecture where the driver program acts as the master and executors function as workers. The driver runs your main() function, creates the SparkContext, and converts your application into a DAG (Directed Acyclic Graph) of tasks. Executors run on cluster nodes, executing these tasks and storing data in memory or on disk.

When you submit a Spark application, the driver negotiates resources with the cluster manager (YARN, Mesos, Kubernetes, or Standalone), which then launches executors on worker nodes. This separation allows Spark to scale horizontally—adding more executors increases parallel processing capacity.

from pyspark.sql import SparkSession

# Driver code starts here
spark = SparkSession.builder \
    .appName("DriverExecutorDemo") \
    .master("yarn") \
    .config("spark.executor.instances", "10") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# This runs on the driver
df = spark.read.parquet("s3://bucket/data/")

# Transformations create execution plan (lazy evaluation)
filtered_df = df.filter(df["amount"] > 1000)
grouped_df = filtered_df.groupBy("category").sum("amount")

# Action triggers job execution on executors
results = grouped_df.collect()  # Results sent back to driver

Driver Responsibilities

The driver performs several critical functions:

Job Scheduling: Converts your high-level operations into stages and tasks. Each stage contains tasks that can execute in parallel.

Task Distribution: Assigns tasks to executors based on data locality preferences, attempting to minimize network transfers.

Result Collection: Gathers results from executors when you call actions like collect(), count(), or take().

Metadata Management: Maintains catalog information, broadcast variables, and accumulators.
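To make the data-locality idea concrete, here is a toy assignment routine in plain Python. This is purely illustrative, not Spark's actual scheduler (which also handles locality levels such as PROCESS_LOCAL and RACK_LOCAL, plus locality wait timeouts); the host and executor names are hypothetical.

```python
def assign_tasks(task_prefs, executor_hosts):
    """Toy scheduler: prefer an executor co-located with a task's data
    (roughly NODE_LOCAL); otherwise fall back to any executor (ANY)."""
    host_to_execs = {}
    for ex, host in executor_hosts.items():
        host_to_execs.setdefault(host, []).append(ex)
    all_execs = sorted(executor_hosts)
    assignments = {}
    for i, (task, pref_host) in enumerate(sorted(task_prefs.items())):
        local = host_to_execs.get(pref_host)
        assignments[task] = local[0] if local else all_execs[i % len(all_execs)]
    return assignments

# Hypothetical cluster: two executors, three tasks, one with no local data
prefs = {"t1": "node-a", "t2": "node-b", "t3": "node-z"}
execs = {"exec-1": "node-a", "exec-2": "node-b"}
print(assign_tasks(prefs, execs))
# → {'t1': 'exec-1', 't2': 'exec-2', 't3': 'exec-1'}
```

The key point the sketch captures: tasks t1 and t2 land on the executor holding their data, while t3 (no local copy) accepts a network transfer.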

Here’s what happens in the driver during a typical job:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object DriverExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Driver Operations")
      .getOrCreate()
    
    import spark.implicits._
    
    // Driver reads schema and creates execution plan
    val transactions = spark.read
      .option("header", "true")
      .option("inferSchema", "true")  // so "amount" is read as a numeric type
      .csv("hdfs:///data/transactions.csv")
    
    // Driver broadcasts small lookup table to all executors
    val categoryMap = Map("A" -> "Electronics", "B" -> "Clothing")
    val broadcastMap = spark.sparkContext.broadcast(categoryMap)
    
    // Transformation - driver builds DAG; the lambda itself runs on executors
    val enriched = transactions.map { row =>
      val category = row.getAs[String]("category_code")
      val description = broadcastMap.value.getOrElse(category, "Unknown")
      (row.getAs[String]("id"), description, row.getAs[Double]("amount"))
    }.toDF("id", "description", "amount")
    
    // Action - driver triggers execution and collects results
    val summary = enriched.groupBy("description")
      .agg(sum("amount"))
      .collect()  // Driver OOM risk if result set is large
    
    summary.foreach(println)  // Runs on driver
  }
}

Executor Responsibilities

Executors are JVM processes that run on worker nodes. Each executor is allocated a fixed number of cores and a fixed amount of memory at startup. Their responsibilities include:

Task Execution: Running the actual data processing logic (map, filter, reduce operations).

Data Storage: Caching RDD partitions and DataFrame data in memory or disk when you call cache() or persist().

Shuffle Operations: Writing intermediate data during shuffles and reading shuffle data from other executors.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# This function executes on executors
def complex_transformation(value):
    # As a Python UDF, this runs in a Python worker process
    # alongside the executor JVM, not inside the JVM itself
    import time
    time.sleep(0.01)  # Simulating processing
    return value.upper()

transform_udf = udf(complex_transformation, StringType())

df = spark.read.parquet("s3://bucket/large_dataset/")

# Each partition processed by one task on one executor core
processed = df.withColumn("processed_name", transform_udf(col("name")))

# Executors cache data in their memory (lazy: materialized on first action)
processed.cache()

# Executors perform aggregation locally before shuffle
result = processed.groupBy("country").count()

# Executors write results to storage
result.write.mode("overwrite").parquet("s3://bucket/output/")

Memory Management

Understanding memory allocation prevents the most common Spark failures. Both driver and executor memory divide into several regions:

Driver Memory:

  • Execution memory: Temporary storage for operations
  • Storage memory: Cached broadcast variables
  • User memory: Your application objects and data structures

Executor Memory:

  • Unified memory ((heap - ~300MB reserved) * spark.memory.fraction): a shared pool for execution (shuffles, joins, sorts) and storage (cached RDDs/DataFrames); spark.memory.storageFraction sets the portion of this pool protected from eviction for storage
  • Reserved memory: ~300MB for Spark internals
  • User memory: the rest of the heap, for UDF objects and user-created data structures

# Configure memory allocation
spark = SparkSession.builder \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .getOrCreate()

# Example causing driver OOM
large_df = spark.range(0, 1000000000)
# DON'T DO THIS - brings all data to driver
all_data = large_df.collect()  # Driver crashes

# Correct approach - process on executors
large_df.write.parquet("output/")  # Executors write directly

# Safe driver collection with limit
sample = large_df.limit(1000).collect()  # Only 1000 rows to driver
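The memory split described above can be sanity-checked with simple arithmetic. This sketch applies the unified memory model's formula to the 16g / 0.8 / 0.3 figures from the configuration shown; real allocations differ slightly (overhead, off-heap, etc.).

```python
def executor_memory_regions(heap_gb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate sizes (GB) of Spark's executor memory regions."""
    reserved = 0.3                         # ~300MB reserved for Spark internals
    usable = heap_gb - reserved
    unified = usable * memory_fraction     # shared execution + storage pool
    storage = unified * storage_fraction   # eviction-protected storage share
    execution = unified - storage          # execution's guaranteed share
    user = usable - unified                # user objects and data structures
    return {"execution": execution, "storage": storage,
            "user": user, "reserved": reserved}

# Mirrors spark.executor.memory=16g, memory.fraction=0.8, storageFraction=0.3
for region, gb in executor_memory_regions(16.0, 0.8, 0.3).items():
    print(f"{region:>9}: {gb:.2f} GB")
```

With these settings roughly 8.8 GB is guaranteed to execution, 3.8 GB to storage, and 3.1 GB remains for user memory, which explains why a heavy UDF can still OOM an executor whose caches look healthy.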

Dynamic Allocation

Dynamic allocation adjusts executor count based on workload, optimizing resource utilization:

val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "100")
  .config("spark.dynamicAllocation.initialExecutors", "10")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .config("spark.shuffle.service.enabled", "true")  // Needed for dynamic allocation (Spark 3+ can instead use spark.dynamicAllocation.shuffleTracking.enabled)
  .getOrCreate()

val data = spark.read.parquet("hdfs:///large_dataset")

// Spark requests more executors as tasks queue up
val processed = data
  .repartition(1000)  // Creates many tasks
  .map(row => expensiveOperation(row))

processed.write.parquet("hdfs:///output")
// Spark releases idle executors after timeout

Debugging and Monitoring

The Spark UI (default port 4040) shows driver and executor metrics:

from pyspark.sql import SparkSession

# Enable detailed logging
spark = SparkSession.builder \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs:///spark-logs") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("INFO")

# Monitor executor status (no direct PySpark API; go through the JVM gateway)
status = sc._jsc.sc().getExecutorMemoryStatus()
print(f"Active executors: {status}")

# Track task distribution
df = spark.read.parquet("data/")
df.rdd.glom().map(len).collect()  # Shows records per partition

# Identify skewed partitions
def partition_stats(iterator):
    import time
    start = time.time()
    count = sum(1 for _ in iterator)
    duration = time.time() - start
    yield (count, duration)

stats = df.rdd.mapPartitions(partition_stats).collect()
for i, (count, duration) in enumerate(stats):
    print(f"Partition {i}: {count} records, {duration:.2f}s")
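Once those per-partition stats are back on the driver, skew can be flagged with plain Python. The 2x-the-mean threshold below is an arbitrary illustrative choice, not a Spark default.

```python
def flag_skewed(counts, threshold=2.0):
    """Return indices of partitions whose record count exceeds
    threshold times the mean partition size."""
    if not counts:
        return []
    mean = sum(counts) / len(counts)
    return [i for i, c in enumerate(counts) if c > threshold * mean]

# One hot partition among otherwise even ones
print(flag_skewed([100, 105, 98, 610, 102]))  # → [3]
```

A flagged partition is a candidate for the salting or repartitioning fixes discussed under Common Pitfalls.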

Common Pitfalls

Driver OOM from collect(): Calling collect() on large datasets transfers all data to the driver. Use take(), first(), or write results directly to storage.

Executor OOM from skewed data: Uneven partition sizes cause some executors to process far more data. Use salting or custom partitioning:

from pyspark.sql.functions import rand, explode, array, lit

# Fix skewed joins by salting: randomize the salt on the large side,
# and replicate the small side once per salt value so every key still matches
skewed_df = large_df.withColumn("salt", (rand() * 10).cast("int"))
replicated_small = small_df.withColumn(
    "salt", explode(array(*[lit(i) for i in range(10)]))
)
salted_join = skewed_df.join(replicated_small, ["join_key", "salt"]).drop("salt")

Insufficient parallelism: Too few partitions underutilize executors. Aim for 2-3 tasks per executor core:

# Check current partitions
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Increase parallelism
df_repartitioned = df.repartition(200)
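The 2-3 tasks-per-core rule of thumb reduces to a one-line sizing calculation (illustrative; the 10-executor, 4-core figures echo the configuration from earlier in this article):

```python
def target_partitions(num_executors, cores_per_executor, tasks_per_core=3):
    """Rule-of-thumb partition count: 2-3 waves of tasks per executor core."""
    return num_executors * cores_per_executor * tasks_per_core

# 10 executors x 4 cores, 3 tasks per core
print(target_partitions(10, 4))  # → 120
```

Several small waves per core let Spark smooth over stragglers: a slow task holds up one core briefly instead of stalling an entire stage.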

The driver-executor model enables Spark’s scalability but requires careful resource planning. Monitor memory usage, partition data appropriately, and avoid operations that move large datasets to the driver. Master these fundamentals to build reliable, performant Spark applications.
