Apache Spark - Tungsten Execution Engine
Key Insights
- Tungsten’s off-heap memory management and custom binary format eliminate JVM garbage-collection overhead, achieving 5-10x performance improvements for CPU-bound operations
- Whole-stage code generation compiles entire query stages into single optimized functions, removing virtual function calls and enabling modern CPU optimizations like pipelining and prefetching
- Cache-aware computation with explicit memory layout control keeps hot data in L1/L2 caches, cutting access latency from roughly 100ns for main memory to about 1ns for L1
Understanding Tungsten’s Architecture
Tungsten represents Apache Spark’s low-level execution engine that sits beneath the DataFrame and Dataset APIs. It addresses three critical bottlenecks in distributed data processing: memory management overhead, CPU efficiency, and cache locality.
Traditional Spark execution relied on JVM object serialization and garbage collection. For a simple aggregation over 1 billion rows, the JVM would create billions of objects, triggering frequent GC pauses. Tungsten bypasses this entirely by managing memory as raw bytes off-heap, using sun.misc.Unsafe for direct memory access.
// Traditional approach - creates objects for each row
case class Person(id: Long, name: String, age: Int)
val rdd = sc.parallelize(1 to 1000000).map(i => Person(i, s"name$i", i % 100))
val result = rdd.groupBy(_.age).mapValues(_.size) // per-age counts, like the DataFrame version below
// Tungsten approach - binary format, no object creation
val df = spark.range(1000000)
.selectExpr("id", "concat('name', id) as name", "id % 100 as age")
val result = df.groupBy("age").count()
The DataFrame version uses Tungsten’s binary encoding where each row exists as contiguous bytes in managed memory regions, not as separate JVM objects.
Memory Management and Binary Format
Tungsten introduces a custom memory manager that allocates large off-heap memory blocks and subdivides them internally. Each record uses an optimized binary format called UnsafeRow that stores fixed-length fields inline and variable-length fields with offset pointers.
// Simplified UnsafeRow structure
public class UnsafeRow {
    private Object baseObject;  // memory region (null for off-heap)
    private long baseOffset;    // starting position
    private int numFields;

    // Direct memory access without object creation.
    // calculateOffset (elided here) returns the field's 8-byte slot position.
    public long getLong(int ordinal) {
        return PlatformDependent.UNSAFE.getLong(
            baseObject,
            baseOffset + calculateOffset(ordinal));
    }

    public void setLong(int ordinal, long value) {
        PlatformDependent.UNSAFE.putLong(
            baseObject,
            baseOffset + calculateOffset(ordinal),
            value);
    }
}
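The real UnsafeRow reaches into raw memory through Unsafe, but the layout idea can be sketched safely and runnably with java.nio.ByteBuffer: fixed-width fields occupy 8-byte slots at the front of the row, and variable-length data is appended after them, addressed by an (offset, length) pair packed into the field's slot. The class and helper names below are illustrative, not Spark's.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Minimal sketch of an UnsafeRow-style layout: fixed-width fields inline,
// variable-length data appended after them and addressed by (offset, length)
// packed into the field's 8-byte slot.
public class RowSketch {
    public static void main(String[] args) {
        ByteBuffer row = ByteBuffer.allocate(64);

        byte[] name = "alice".getBytes(StandardCharsets.UTF_8);
        int varOffset = 16;                      // variable region starts after 2 fixed slots

        row.putLong(0, 42L);                     // field 0: a long, stored inline
        row.putLong(8, ((long) varOffset << 32) | name.length); // field 1: offset|length
        for (int i = 0; i < name.length; i++) {
            row.put(varOffset + i, name[i]);     // string bytes, contiguous in the row
        }

        // Reading back touches only the buffer; no objects until the caller asks
        long id = row.getLong(0);
        long offsetAndLen = row.getLong(8);
        int off = (int) (offsetAndLen >>> 32);
        int len = (int) offsetAndLen;
        byte[] out = new byte[len];
        for (int i = 0; i < len; i++) out[i] = row.get(off + i);

        System.out.println(id + "," + new String(out, StandardCharsets.UTF_8));
    }
}
```

The whole row is a contiguous byte span, so copying, hashing, or shipping it over the network is a single memcpy-style operation.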
This format enables several optimizations:
Cache-line awareness: Rows align to 64-byte cache lines, ensuring single-cache-line access for common operations.
Pointer swizzling: Sorting and shuffling operations manipulate pointers instead of copying data.
Zero-copy serialization: Binary format works directly with network buffers.
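The pointer-manipulation point can be made concrete with a small sketch: records stay fixed in a buffer while only an array of record offsets is reordered, which is the shape of Tungsten's sort. The names here are illustrative.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;

// Sketch of pointer-based sorting: record bytes never move; only the
// array of offsets pointing into the buffer is reordered.
public class PointerSortSketch {
    public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.allocate(64);
        long[] keys = {30, 10, 20, 40};
        Integer[] offsets = new Integer[keys.length];
        for (int i = 0; i < keys.length; i++) {
            offsets[i] = i * 8;                 // fixed 8-byte records
            data.putLong(i * 8, keys[i]);
        }

        // Sort the pointers by the key they reference
        Arrays.sort(offsets, Comparator.comparingLong(data::getLong));

        StringBuilder sb = new StringBuilder();
        for (int off : offsets) sb.append(data.getLong(off)).append(" ");
        System.out.println(sb.toString().trim());
    }
}
```

For wide records this avoids moving most of the data; Tungsten's actual sorter additionally stores a key prefix next to each pointer so most comparisons never dereference at all.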
Here’s how to observe memory usage differences:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.execution.debug._
val df = spark.range(10000000)
.selectExpr("id", "id * 2 as doubled", "id % 1000 as bucket")
// Force materialization and examine memory layout
df.cache()
df.count()
// Check execution plan showing tungsten operators
df.groupBy("bucket").agg(sum("doubled")).explain(true)
Whole-Stage Code Generation
Tungsten’s code generation eliminates the iterator model’s virtual function call overhead. Instead of calling next() millions of times, it generates specialized Java bytecode for entire query stages.
// Query that benefits from code generation
val result = spark.range(100000000)
.filter($"id" % 2 === 0)
.filter($"id" % 3 === 0)
.selectExpr("id", "id * id as squared")
.filter($"squared" < 1000000)
Without code generation, this executes as:
// Iterator model - virtual calls for each row
while (input.hasNext()) {
    InternalRow row = input.next();
    if (row.getLong(0) % 2 == 0) {
        if (row.getLong(0) % 3 == 0) {
            long squared = row.getLong(0) * row.getLong(0);
            if (squared < 1000000) {
                output.write(row);
            }
        }
    }
}
With code generation, Tungsten produces:
// Generated code - single tight loop, no virtual calls
public void processNext() {
    while (input_index < input_length) {
        long id = input_buffer[input_index++];
        if ((id & 1) == 0 && id % 3 == 0) {
            long squared = id * id;
            if (squared < 1000000) {
                output_buffer[output_index++] = id;
                output_buffer[output_index++] = squared;
            }
        }
    }
}
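The two execution shapes can be contrasted on a small input in plain Java: a chained pipeline where every stage is a separate call through an interface (analogous to the iterator model) versus one fused loop with all predicates inlined. Both produce the same rows; this is a sketch of the shape, not Spark's generated code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Chained (virtual-call) pipeline vs. a single fused loop, mirroring
// the two filter-filter-project-filter shapes shown above.
public class FusedLoopSketch {
    public static void main(String[] args) {
        long n = 10_000;

        // Iterator-style: each stage is a separate call through an interface
        List<Long> viaStream = LongStream.range(0, n)
            .filter(id -> id % 2 == 0)
            .filter(id -> id % 3 == 0)
            .map(id -> id * id)
            .filter(sq -> sq < 1_000_000)
            .boxed()
            .collect(Collectors.toList());

        // Fused: one tight loop with every predicate inlined
        List<Long> viaLoop = new ArrayList<>();
        for (long id = 0; id < n; id++) {
            if ((id & 1) == 0 && id % 3 == 0) {
                long squared = id * id;
                if (squared < 1_000_000) viaLoop.add(squared);
            }
        }

        System.out.println(viaStream.equals(viaLoop) + " " + viaLoop.size());
    }
}
```

The fused form is what the JIT can turn into a tight machine-code loop with the values held in registers, which is exactly the benefit whole-stage codegen buys.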
Enable code generation debugging to see generated code:
spark.conf.set("spark.sql.codegen.wholeStage", true)
spark.conf.set("spark.sql.codegen.comments", true)
val df = spark.range(1000).filter($"id" % 2 === 0)
df.queryExecution.debug.codegen()
Cache-Aware Computation
Modern CPUs spend more time waiting for memory than computing. Tungsten optimizes for cache locality by controlling memory layout and access patterns.
// Hash aggregation with cache-friendly data structures
val aggregation = spark.range(100000000)
.selectExpr("id % 10000 as key", "id as value")
.groupBy("key")
.agg(
sum("value").as("total"),
count("*").as("count"),
avg("value").as("average")
)
Tungsten implements this using:
BytesToBytesMap: Open-addressed hash table with linear probing, keeping keys and values contiguous in memory.
UnsafeFixedWidthAggregationMap: Specialized aggregation buffer where each group’s aggregates sit in adjacent memory locations.
// Simplified cache-friendly aggregation buffer
public class AggregationBuffer {
    private ByteBuffer buffer;
    private int recordSize = 32;  // fits within a single 64-byte cache line

    public void aggregate(int groupId, long value) {
        int offset = groupId * recordSize;
        // All fields for this group sit in the same cache line
        long currentSum = buffer.getLong(offset);
        long currentCount = buffer.getLong(offset + 8);
        buffer.putLong(offset, currentSum + value);
        buffer.putLong(offset + 8, currentCount + 1);
    }
}
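A runnable sketch in the spirit of BytesToBytesMap ties the two ideas together: an open-addressed table with linear probing, where each group's key, sum, and count are stored adjacently in one flat array, so a probe touches a single contiguous region. All names and the hash constant are illustrative, not Spark's.

```java
// Open-addressed hash aggregation sketch: linear probing over a flat
// array of [key, sum, count] triples kept adjacent in memory.
public class HashAggSketch {
    static final int CAPACITY = 1 << 10;          // power of two, enables masking
    static final long EMPTY = Long.MIN_VALUE;     // sentinel for unused slots
    static final long[] table = new long[CAPACITY * 3];

    static void aggregate(long key, long value) {
        int slot = (int) (key * 0x9E3779B97F4A7C15L >>> 54); // top 10 hash bits
        while (true) {
            int base = slot * 3;
            if (table[base] == EMPTY) {           // empty slot: claim it
                table[base] = key;
                table[base + 1] = value;
                table[base + 2] = 1;
                return;
            }
            if (table[base] == key) {             // existing group: update in place
                table[base + 1] += value;
                table[base + 2] += 1;
                return;
            }
            slot = (slot + 1) & (CAPACITY - 1);   // collision: linear probe
        }
    }

    static long[] lookup(long key) {
        int slot = (int) (key * 0x9E3779B97F4A7C15L >>> 54);
        while (table[slot * 3] != key) slot = (slot + 1) & (CAPACITY - 1);
        return new long[]{table[slot * 3 + 1], table[slot * 3 + 2]};
    }

    public static void main(String[] args) {
        for (int i = 0; i < table.length; i += 3) table[i] = EMPTY;
        for (long id = 0; id < 100_000; id++) aggregate(id % 100, id);
        long[] g0 = lookup(0);                    // sum and count for key 0
        System.out.println(g0[0] + " " + g0[1]);
    }
}
```

Linear probing keeps collision chains in adjacent slots, so even a miss usually stays within memory the CPU has already prefetched.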
Performance Tuning
Configure Tungsten for your workload:
// Memory configuration
spark.conf.set("spark.memory.offHeap.enabled", true)
spark.conf.set("spark.memory.offHeap.size", "16g")
// Code generation limits
spark.conf.set("spark.sql.codegen.maxFields", 200)
spark.conf.set("spark.sql.codegen.methodSplitThreshold", 1024)
// Aggregation optimization
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", false)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
Monitor Tungsten’s effectiveness:
val metrics = df.groupBy("key").count()
.queryExecution
.executedPlan
.metrics
metrics.foreach { case (name, metric) =>
println(s"$name: ${metric.value}")
}
Key metrics include:
- number of output rows: Verify data volume
- peak memory: Monitor memory pressure
- spill size: Detect when data exceeds memory
- time in aggregation: Identify bottlenecks
Real-World Application
Here’s a complete example processing clickstream data:
import org.apache.spark.sql.functions._
import spark.implicits._

case class Click(userId: Long, timestamp: Long, pageId: String, duration: Int)

val clicks = spark.read.parquet("s3://bucket/clicks")
  .as[Click]
// Tungsten optimizes this entire pipeline
val sessionStats = clicks
.filter($"duration" > 0)
.withColumn("hour", hour(from_unixtime($"timestamp")))
.groupBy($"userId", $"hour")
.agg(
count("*").as("clicks"),
sum("duration").as("totalDuration"),
countDistinct("pageId").as("uniquePages")
)
.filter($"clicks" >= 5)
.orderBy($"totalDuration".desc)
sessionStats.write.parquet("s3://bucket/sessions")
This query benefits from:
- Binary encoding for efficient filtering
- Code generation for the filter-aggregate-filter pipeline
- Cache-aware hash aggregation for grouping
- Off-heap memory management for large intermediate results
For a workload like this, Tungsten can turn a query that once demanded 100GB+ of heap and 5+ minutes into one that runs in under a minute with 10GB of off-heap memory.