Apache Spark - Tungsten Execution Engine

Tungsten represents Apache Spark's low-level execution engine that sits beneath the DataFrame and Dataset APIs. It addresses three critical bottlenecks in distributed data processing: memory...

Key Insights

  • Tungsten’s off-heap memory management and custom binary format eliminates JVM garbage collection overhead, achieving 5-10x performance improvements for CPU-bound operations
  • Whole-stage code generation compiles entire query stages into single optimized functions, removing virtual function calls and enabling modern CPU optimizations like pipelining and prefetching
  • Cache-aware computation with explicit memory layout control keeps hot data in L1/L2 caches, dramatically reducing memory access latency from ~100ns to ~1ns

Understanding Tungsten’s Architecture

Tungsten represents Apache Spark’s low-level execution engine that sits beneath the DataFrame and Dataset APIs. It addresses three critical bottlenecks in distributed data processing: memory management overhead, CPU efficiency, and cache locality.

Traditional Spark execution relied on JVM object serialization and garbage collection. For a simple aggregation over 1 billion rows, the JVM would create billions of objects, triggering frequent GC pauses. Tungsten bypasses this entirely by managing memory as raw bytes off-heap, using sun.misc.Unsafe for direct memory access.

// Traditional approach - creates objects for each row
case class Person(id: Long, name: String, age: Int)
val rdd = sc.parallelize(1 to 1000000).map(i => Person(i, s"name$i", i % 100))
val result = rdd.groupBy(_.age).count()

// Tungsten approach - binary format, no object creation
val df = spark.range(1000000)
  .selectExpr("id", "concat('name', id) as name", "id % 100 as age")
val result = df.groupBy("age").count()

The DataFrame version uses Tungsten’s binary encoding where each row exists as contiguous bytes in managed memory regions, not as separate JVM objects.

Memory Management and Binary Format

Tungsten introduces a custom memory manager that allocates large off-heap memory blocks and subdivides them internally. Each record uses an optimized binary format called UnsafeRow that stores fixed-length fields inline and variable-length fields with offset pointers.

// Simplified UnsafeRow structure
public class UnsafeRow {
    private Object baseObject;  // memory region
    private long baseOffset;    // starting position
    private int numFields;
    
    // Direct memory access without object creation
    public long getLong(int ordinal) {
        return PlatformDependent.UNSAFE.getLong(
            baseObject, 
            baseOffset + calculateOffset(ordinal)
        );
    }
    
    public void setLong(int ordinal, long value) {
        PlatformDependent.UNSAFE.putLong(
            baseObject,
            baseOffset + calculateOffset(ordinal),
            value
        );
    }
}

This format enables several optimizations:

Cache-line awareness: Rows align to 64-byte cache lines, ensuring single-cache-line access for common operations.

Pointer swizzling: Sorting and shuffling operations manipulate pointers instead of copying data.

Zero-copy serialization: Binary format works directly with network buffers.

Here’s how to observe memory usage differences:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.execution.debug._

val df = spark.range(10000000)
  .selectExpr("id", "id * 2 as doubled", "id % 1000 as bucket")
  
// Force materialization and examine memory layout
df.cache()
df.count()

// Check execution plan showing tungsten operators
df.groupBy("bucket").agg(sum("doubled")).explain(true)

Whole-Stage Code Generation

Tungsten’s code generation eliminates the iterator model’s virtual function call overhead. Instead of calling next() millions of times, it generates specialized Java bytecode for entire query stages.

// Query that benefits from code generation
val result = spark.range(100000000)
  .filter($"id" % 2 === 0)
  .filter($"id" % 3 === 0)
  .selectExpr("id", "id * id as squared")
  .filter($"squared" < 1000000)

Without code generation, this executes as:

// Iterator model - virtual calls for each row
while (input.hasNext()) {
    InternalRow row = input.next();
    if (row.getLong(0) % 2 == 0) {
        if (row.getLong(0) % 3 == 0) {
            long squared = row.getLong(0) * row.getLong(0);
            if (squared < 1000000) {
                output.write(row);
            }
        }
    }
}

With code generation, Tungsten produces:

// Generated code - single tight loop, no virtual calls
public void processNext() {
    while (input_index < input_length) {
        long id = input_buffer[input_index++];
        if ((id & 1) == 0 && id % 3 == 0) {
            long squared = id * id;
            if (squared < 1000000) {
                output_buffer[output_index++] = id;
                output_buffer[output_index++] = squared;
            }
        }
    }
}

Enable code generation debugging to see generated code:

spark.conf.set("spark.sql.codegen.wholeStage", true)
spark.conf.set("spark.sql.codegen.comments", true)

val df = spark.range(1000).filter($"id" % 2 === 0)
df.queryExecution.debug.codegen()

Cache-Aware Computation

Modern CPUs spend more time waiting for memory than computing. Tungsten optimizes for cache locality by controlling memory layout and access patterns.

// Hash aggregation with cache-friendly data structures
val aggregation = spark.range(100000000)
  .selectExpr("id % 10000 as key", "id as value")
  .groupBy("key")
  .agg(
    sum("value").as("total"),
    count("*").as("count"),
    avg("value").as("average")
  )

Tungsten implements this using:

BytesToBytesMap: Open-addressed hash table with linear probing, keeping keys and values contiguous in memory.

UnsafeFixedWidthAggregationMap: Specialized aggregation buffer where each group’s aggregates sit in adjacent memory locations.

// Simplified cache-friendly aggregation buffer
public class AggregationBuffer {
    private ByteBuffer buffer;
    private int recordSize = 32; // fits in single cache line
    
    public void aggregate(int groupId, long value) {
        int offset = groupId * recordSize;
        // All fields for this group in same cache line
        long currentSum = buffer.getLong(offset);
        long currentCount = buffer.getLong(offset + 8);
        buffer.putLong(offset, currentSum + value);
        buffer.putLong(offset + 8, currentCount + 1);
    }
}

Performance Tuning

Configure Tungsten for your workload:

// Memory configuration
spark.conf.set("spark.memory.offHeap.enabled", true)
spark.conf.set("spark.memory.offHeap.size", "16g")

// Code generation limits
spark.conf.set("spark.sql.codegen.maxFields", 200)
spark.conf.set("spark.sql.codegen.methodSplitThreshold", 1024)

// Aggregation optimization
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", false)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Monitor Tungsten’s effectiveness:

val metrics = df.groupBy("key").count()
  .queryExecution
  .executedPlan
  .metrics

metrics.foreach { case (name, metric) =>
  println(s"$name: ${metric.value}")
}

Key metrics include:

  • number of output rows: Verify data volume
  • peak memory: Monitor memory pressure
  • spill size: Detect when data exceeds memory
  • time in aggregation: Identify bottlenecks

Real-World Application

Here’s a complete example processing clickstream data:

case class Click(userId: Long, timestamp: Long, pageId: String, duration: Int)

val clicks = spark.read.parquet("s3://bucket/clicks")
  .as[Click]

// Tungsten optimizes this entire pipeline
val sessionStats = clicks
  .filter($"duration" > 0)
  .withColumn("hour", hour(from_unixtime($"timestamp")))
  .groupBy($"userId", $"hour")
  .agg(
    count("*").as("clicks"),
    sum("duration").as("totalDuration"),
    countDistinct("pageId").as("uniquePages")
  )
  .filter($"clicks" >= 5)
  .orderBy($"totalDuration".desc)

sessionStats.write.parquet("s3://bucket/sessions")

This query benefits from:

  • Binary encoding for efficient filtering
  • Code generation for the filter-aggregate-filter pipeline
  • Cache-aware hash aggregation for grouping
  • Off-heap memory management for large intermediate results

Tungsten transforms this from a query requiring 100GB+ heap and 5+ minutes to one running in under a minute with 10GB off-heap memory.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.