Apache Spark - Memory Management (On-Heap vs Off-Heap)

Key Insights

  • Spark’s unified memory management divides executor memory into storage and execution regions, with on-heap memory subject to JVM garbage collection while off-heap memory bypasses it entirely for more predictable performance.
  • Off-heap memory can dramatically reduce GC pause time in shuffle-heavy workloads, but it requires careful sizing since leaks won’t trigger automatic cleanup.
  • The optimal configuration often combines both approaches: on-heap for cached DataFrames with frequent access patterns, off-heap for large shuffle operations and serialized data.

Introduction to Spark Memory Architecture

Memory management determines whether your Spark job completes in minutes or crashes with an OutOfMemoryError. In distributed computing, memory isn’t just about capacity—it’s about how efficiently you allocate, access, and release resources across hundreds of executors processing terabytes of data.

Spark’s memory model has evolved significantly. Early versions (pre-1.6) used static memory allocation, rigidly partitioning memory between storage and execution. This led to either wasted storage space or execution failures depending on workload characteristics. Spark 1.6 introduced unified memory management, allowing dynamic sharing between storage and execution regions. Spark 2.0 and beyond refined this further, adding robust off-heap support for workloads where garbage collection overhead becomes the bottleneck.

Understanding when to use on-heap versus off-heap memory isn’t academic—it’s the difference between a stable production pipeline and 3 AM pages about killed executors.

Understanding On-Heap Memory

On-heap memory lives within the JVM heap, managed by Java’s garbage collector. When you create a DataFrame or cache an RDD, Spark allocates objects on the heap. The JVM tracks these objects, and when they’re no longer referenced, garbage collection reclaims the space.

Spark divides on-heap memory into two primary regions under unified memory management:

Execution Memory: Used for shuffles, joins, sorts, and aggregations. This is working memory for active computations.

Storage Memory: Used for caching DataFrames, broadcast variables, and accumulated results.

The boundary between these regions is soft. If execution needs more memory and storage has free space, execution can borrow it (and vice versa). However, execution can evict storage but storage cannot evict execution—active computations take priority.

val spark = SparkSession.builder()
  .appName("OnHeapConfiguration")
  .config("spark.executor.memory", "8g")
  .config("spark.memory.fraction", "0.6")
  .config("spark.memory.storageFraction", "0.5")
  .getOrCreate()

// With these settings on an 8GB executor:
// Reserved memory: 300MB (fixed)
// Usable memory: 8GB - 300MB = 7.7GB
// Unified memory (spark.memory.fraction): 7.7GB * 0.6 = 4.62GB
// Storage region (spark.memory.storageFraction): 4.62GB * 0.5 = 2.31GB
// Execution region: 4.62GB * 0.5 = 2.31GB
// User memory (remaining): 7.7GB * 0.4 = 3.08GB

The garbage collection implications are significant. When the JVM heap fills up, GC pauses halt all executor threads. Minor collections (young generation) typically take milliseconds, but major collections (full GC) can pause executors for seconds. In a cluster with thousands of tasks, frequent full GCs cascade into massive slowdowns.

Tuning GC for Spark workloads typically means:

  • Using G1GC for heaps larger than 4GB
  • Setting appropriate young generation sizes
  • Monitoring GC logs for pause time patterns
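As a sketch of the last point, GC logging can be switched on per executor through `spark.executor.extraJavaOptions`. The flags below are for JDK 8 (on JDK 9+ the unified `-Xlog:gc*` syntax replaces `-XX:+PrintGCDetails`), and the pause target is illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: G1GC with GC logging enabled on executors, so pause-time
// patterns can be inspected in the executor logs after a run.
val sparkWithGcLogs = SparkSession.builder()
  .appName("GcTuningExample")
  .config("spark.executor.memory", "8g")
  .config("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 " +
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps")
  .getOrCreate()
```

Look for back-to-back full collections or pause times growing with heap occupancy; those patterns usually point at an undersized heap or a cache that belongs off-heap.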

Understanding Off-Heap Memory

Off-heap memory allocates native memory outside the JVM heap using sun.misc.Unsafe or direct byte buffers. The JVM doesn’t track these allocations, which means no garbage collection overhead—but also no automatic cleanup.

Spark’s off-heap storage uses a memory allocator that manages blocks of native memory. When you enable off-heap mode, Spark serializes objects into these memory blocks rather than storing them as JVM objects.

val spark = SparkSession.builder()
  .appName("OffHeapConfiguration")
  .config("spark.executor.memory", "4g")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "8g")
  .config("spark.executor.memoryOverhead", "1g")
  .getOrCreate()

// Memory layout with off-heap:
// JVM heap: 4GB (reduced since less on-heap work)
// Off-heap unified memory: 8GB (for storage and execution)
// Container overhead: 1GB (for native code, thread stacks)
// Total container memory: ~13GB

The benefits are substantial for the right workloads:

Reduced GC pressure: Large cached datasets don’t contribute to heap size, dramatically reducing collection frequency.

Predictable latency: No GC pauses means more consistent task completion times.

Larger working sets: You can cache more data since you’re not limited by practical heap size limits (heaps beyond 32GB lose compressed object pointers).

The trade-offs are equally real:

Manual lifecycle management: Memory leaks in off-heap don’t trigger OutOfMemoryErrors until you exhaust system memory.

Serialization overhead: Data must be serialized to store off-heap, adding CPU cost.

Debugging complexity: Standard JVM profiling tools don’t see off-heap allocations.

When to Use Each Approach

Choose on-heap memory when:

  • Your datasets fit comfortably in executor memory with room for GC
  • You’re iteratively developing and need straightforward debugging
  • Your workload involves many small objects with short lifetimes
  • You’re running interactive queries where serialization latency matters

Choose off-heap memory when:

  • Shuffle operations dominate your job runtime
  • GC pauses exceed 5% of total task time
  • You’re caching large datasets (100GB+ per executor)
  • Latency consistency matters more than raw throughput
  • You’re running streaming workloads with strict SLAs

A common pattern in production: use off-heap for shuffle data (which is already serialized) while keeping frequently-accessed cached DataFrames on-heap for faster deserialization.
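One way to sketch that hybrid pattern is to choose storage levels per dataset. This assumes off-heap is enabled as in the earlier configuration, and the paths are purely illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Sketch: assumes spark.memory.offHeap.enabled=true and a session `spark`
// built as in the earlier examples; the input paths are hypothetical.
val hotLookup  = spark.read.parquet("/data/dim_customers")
val largeFacts = spark.read.parquet("/data/fact_events")

// Small and hit constantly: keep as deserialized JVM objects on-heap,
// so repeated access pays no deserialization cost.
hotLookup.persist(StorageLevel.MEMORY_ONLY)

// Large and scanned occasionally: store serialized in off-heap memory,
// keeping the bulk of the cached bytes out of the GC's reach.
largeFacts.persist(StorageLevel.OFF_HEAP)
```

Note that `StorageLevel.OFF_HEAP` only takes effect when `spark.memory.offHeap.enabled` is true and `spark.memory.offHeap.size` is set.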

Configuration Deep Dive

Here’s a complete configuration comparing both approaches for a production workload:

// On-heap optimized configuration
val onHeapConfig = SparkSession.builder()
  .appName("OnHeapOptimized")
  .config("spark.executor.memory", "16g")
  .config("spark.executor.memoryOverhead", "2g")
  .config("spark.memory.fraction", "0.6")
  .config("spark.memory.storageFraction", "0.5")
  .config("spark.executor.extraJavaOptions", 
    "-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:InitiatingHeapOccupancyPercent=35")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

// Off-heap optimized configuration
val offHeapConfig = SparkSession.builder()
  .appName("OffHeapOptimized")
  .config("spark.executor.memory", "8g")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "16g")
  .config("spark.executor.memoryOverhead", "2g")
  .config("spark.memory.fraction", "0.6")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.shuffle.partitions", "400")
  .getOrCreate()

Monitor memory usage through the Spark UI and programmatically:

import org.apache.spark.SparkContext

def logMemoryMetrics(sc: SparkContext): Unit = {
  val statusTracker = sc.statusTracker
  val executorInfos = statusTracker.getExecutorInfos
  
  executorInfos.foreach { info =>
    println(s"Executor ${info.executorId}:")
    println(s"  Total on-heap storage: ${info.totalOnHeapStorageMemory / 1024 / 1024}MB")
    println(s"  Used on-heap storage: ${info.usedOnHeapStorageMemory / 1024 / 1024}MB")
    println(s"  Total off-heap storage: ${info.totalOffHeapStorageMemory / 1024 / 1024}MB")
    println(s"  Used off-heap storage: ${info.usedOffHeapStorageMemory / 1024 / 1024}MB")
  }
}

// Call periodically or after major operations
logMemoryMetrics(spark.sparkContext)

Performance Benchmarking

Measure the actual impact before committing to a configuration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.lang.System.nanoTime

def benchmarkMemoryConfig(spark: SparkSession, label: String): Unit = {
  val runtime = Runtime.getRuntime
  
  // Force GC to get clean baseline
  System.gc()
  Thread.sleep(1000)
  
  val startTime = nanoTime()
  val startMemory = runtime.totalMemory() - runtime.freeMemory()
  
  // Generate test dataset
  val df = spark.range(0, 100000000)
    .withColumn("value", rand() * 1000)
    .withColumn("category", (col("id") % 1000).cast("string"))
  
  // Cache and materialize
  df.cache()
  val count1 = df.count()
  
  // Shuffle-heavy operation
  val aggregated = df.groupBy("category")
    .agg(
      sum("value").as("total"),
      avg("value").as("average"),
      count("*").as("records")
    )
    .orderBy(desc("total"))
  
  val count2 = aggregated.count()
  
  val endTime = nanoTime()
  val endMemory = runtime.totalMemory() - runtime.freeMemory()
  
  val durationMs = (endTime - startTime) / 1000000
  val memoryUsedMB = (endMemory - startMemory) / 1024 / 1024
  
  println(s"[$label] Duration: ${durationMs}ms, Memory delta: ${memoryUsedMB}MB")
  println(s"[$label] Processed ${count1} rows, ${count2} groups")
  
  // Check GC metrics
  val gcBeans = java.lang.management.ManagementFactory.getGarbageCollectorMXBeans
  gcBeans.forEach { gc =>
    println(s"[$label] ${gc.getName}: ${gc.getCollectionCount} collections, ${gc.getCollectionTime}ms total")
  }
  
  df.unpersist()
}

Key metrics to track: GC time as percentage of job duration, task duration variance (P50 vs P99), memory utilization efficiency, and spill-to-disk frequency.
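The first of those metrics can be tracked with Spark's listener API, which reports per-task JVM GC time. A sketch that aggregates it into a GC-to-runtime ratio (the class name is hypothetical):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import java.util.concurrent.atomic.LongAdder

// Sketch: accumulate jvmGCTime and executorRunTime across finished tasks
// to estimate what fraction of compute time is lost to garbage collection.
class GcRatioListener extends SparkListener {
  val gcTimeMs = new LongAdder
  val runTimeMs = new LongAdder

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {
      gcTimeMs.add(m.jvmGCTime)
      runTimeMs.add(m.executorRunTime)
    }
  }

  def gcFraction: Double =
    if (runTimeMs.sum == 0) 0.0 else gcTimeMs.sum.toDouble / runTimeMs.sum
}

// Usage: register before the job, inspect the ratio after it finishes.
// spark.sparkContext.addSparkListener(new GcRatioListener)
```

A ratio climbing past the thresholds discussed earlier is the signal to revisit heap sizing or move cached data off-heap.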

Best Practices and Troubleshooting

Common errors and fixes:

Container killed by YARN for exceeding memory limits: Your total memory (heap + off-heap + overhead) exceeds the container allocation. Increase spark.executor.memoryOverhead or reduce other allocations.

java.lang.OutOfMemoryError: Java heap space: On-heap execution or storage exhausted. Increase spark.executor.memory, reduce spark.memory.storageFraction, or enable off-heap.

ExecutorLostFailure: Executor heartbeat timed out: Often caused by long GC pauses. Enable off-heap for shuffle data or tune GC settings.

Production recommendations:

  1. Start with on-heap, measure GC overhead, then migrate to off-heap if GC exceeds 10% of job time.
  2. Always set spark.executor.memoryOverhead explicitly—the default (10% or 384MB minimum) is often insufficient.
  3. Use Kryo serialization regardless of memory mode; it’s faster and more compact.
  4. Monitor memory metrics in production; memory behavior changes as data volumes grow.
  5. For hybrid configurations, allocate off-heap for shuffle-heavy stages while keeping interactive caches on-heap.
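For the second recommendation, a quick sizing check helps. The helper names below are hypothetical, but the 10%/384MB rule matches Spark's documented default for `spark.executor.memoryOverhead`:

```scala
// Hypothetical sizing helpers illustrating Spark's documented default:
// memoryOverhead = max(384 MB, 0.10 * executor heap) when left unset.
def defaultOverheadMB(heapMB: Long): Long =
  math.max(384L, (heapMB * 0.10).toLong)

// Total container request when off-heap is enabled: heap + off-heap +
// overhead must all fit inside the YARN/Kubernetes container limit.
def containerRequestMB(heapMB: Long, offHeapMB: Long, overheadMB: Long): Long =
  heapMB + offHeapMB + overheadMB

// Example: 8 GB heap, 16 GB off-heap, 2 GB explicit overhead
// -> a 26624 MB container request.
```

Running the numbers before submission is cheaper than discovering the shortfall through YARN container kills.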

Memory management in Spark isn’t set-and-forget. Profile your actual workloads, measure the impact of configuration changes, and adjust as your data characteristics evolve.
