Apache Spark - Serialization (Kryo vs Java)
Key Insights
- Kryo serialization reduces Spark job execution time by 10-50% compared to Java serialization through smaller serialized object sizes and faster serialization/deserialization operations
- Java serialization creates verbose byte streams (often 2-10x larger) due to class metadata overhead, while Kryo uses a compact binary format with explicit class registration
- Production Spark applications should enable Kryo with explicit class registration and custom serializers for domain objects to achieve optimal performance and avoid registration overhead
Understanding Serialization in Spark
Apache Spark serializes objects when shuffling data between executors, caching RDDs in serialized form, and broadcasting variables. The serialization mechanism directly impacts network I/O, memory consumption, and CPU utilization across your cluster.
Spark supports two serialization libraries: Java’s built-in serialization and Kryo. Java serialization is the default for backward compatibility, but Kryo offers superior performance characteristics for production workloads.
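The metadata overhead behind these differences is easy to observe with the JDK alone. A minimal sketch, no Spark required, that measures the Java-serialized size of a small record (the tuple contents are arbitrary illustration values):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializationOverheadDemo {
  // Size in bytes of obj under plain Java serialization
  def javaSerializedSize(obj: AnyRef): Int = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.size()
  }

  def main(args: Array[String]): Unit = {
    // Raw payload: a 7-char string plus one Long, roughly 15 bytes of data
    val record = ("user-42", java.lang.Long.valueOf(1700000000000L))
    println(s"Java-serialized size: ${javaSerializedSize(record)} bytes")
  }
}
```

The stream carries full class descriptors for Tuple2, String, and Long, which is where the large multiple of the raw payload size comes from.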
Performance Comparison: The Numbers
Let’s benchmark both serializers with a realistic data structure:
case class UserEvent(
  userId: String,
  eventType: String,
  timestamp: Long,
  properties: Map[String, String],
  metadata: Array[Double]
)
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.SparkConf
def benchmarkSerialization(events: Seq[UserEvent]): Unit = {
  val conf = new SparkConf()

  // Java Serialization
  val javaSerializer = new JavaSerializer(conf)
  val javaInstance = javaSerializer.newInstance()
  val javaStart = System.nanoTime()
  val javaBytes = javaInstance.serialize(events)
  val javaSerTime = System.nanoTime() - javaStart
  val javaDeserStart = System.nanoTime()
  javaInstance.deserialize[Seq[UserEvent]](javaBytes)
  val javaDeserTime = System.nanoTime() - javaDeserStart

  // Kryo Serialization
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val kryoSerializer = new KryoSerializer(conf)
  val kryoInstance = kryoSerializer.newInstance()
  val kryoStart = System.nanoTime()
  val kryoBytes = kryoInstance.serialize(events)
  val kryoSerTime = System.nanoTime() - kryoStart
  val kryoDeserStart = System.nanoTime()
  kryoInstance.deserialize[Seq[UserEvent]](kryoBytes)
  val kryoDeserTime = System.nanoTime() - kryoDeserStart

  println(s"Java - Size: ${javaBytes.limit()} bytes, " +
    s"Serialize: ${javaSerTime / 1000000}ms, " +
    s"Deserialize: ${javaDeserTime / 1000000}ms")
  println(s"Kryo - Size: ${kryoBytes.limit()} bytes, " +
    s"Serialize: ${kryoSerTime / 1000000}ms, " +
    s"Deserialize: ${kryoDeserTime / 1000000}ms")
}
Running this benchmark with 10,000 events typically shows:
- Java: 1.2MB serialized, 45ms serialize, 38ms deserialize
- Kryo: 0.4MB serialized, 12ms serialize, 15ms deserialize
Here Kryo's output is roughly 3x smaller and its serialization roughly 3x faster: critical improvements for shuffle-heavy operations.
Enabling Kryo Serialization
Configure Kryo in your SparkConf:
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "false") // Start permissive
  .set("spark.kryoserializer.buffer", "64k")
  .set("spark.kryoserializer.buffer.max", "64m")

val spark = SparkSession.builder().config(conf).getOrCreate()
The buffer settings control Kryo’s internal buffer sizes. Increase buffer.max if you’re serializing large objects (like ML models or large arrays).
Class Registration: The Performance Multiplier
Kryo performs best when classes are explicitly registered. Without registration, Kryo writes full class names with each serialized object. Registration uses integer IDs instead, reducing overhead.
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[UserEvent])
    kryo.register(classOf[Array[UserEvent]])
    // Note: Kryo matches the concrete runtime class, so register the
    // implementations your data actually uses, not only the Map trait
    kryo.register(classOf[scala.collection.immutable.Map[String, String]])
    kryo.register(classOf[scala.collection.mutable.WrappedArray.ofRef[_]])
    // Register common Scala collections
    kryo.register(classOf[scala.collection.immutable.HashMap[_, _]])
    kryo.register(classOf[scala.collection.immutable.HashSet[_]])
  }
}
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  .set("spark.kryo.registrationRequired", "true") // Enforce registration
Setting registrationRequired to true makes Spark fail fast if unregistered classes are encountered, preventing silent performance degradation.
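The cost model can be illustrated with a toy registration table. This is not Kryo's actual implementation, just the idea that a registered class is identified by a small integer ID per object while an unregistered one pays for its full class name:

```scala
object RegistrationCostDemo {
  // Toy registration table: class -> compact integer ID
  private val registered = Map[Class[_], Int](
    classOf[String] -> 1,
    classOf[java.lang.Long] -> 2
  )

  // Approximate bytes spent identifying the class of one serialized object
  def classTagCost(cls: Class[_]): Int =
    registered.get(cls) match {
      case Some(_) => 2                  // small varint-encoded ID
      case None    => cls.getName.length // full class name written inline
    }

  def main(args: Array[String]): Unit = {
    println(classTagCost(classOf[String]))               // registered: cheap
    println(classTagCost(classOf[java.math.BigDecimal])) // unregistered: pays for the name
  }
}
```

Multiply that per-object difference by millions of shuffled records and the value of registration becomes clear.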
Custom Serializers for Complex Objects
For objects with complex initialization logic or non-serializable fields, implement custom Kryo serializers:
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import java.time.Instant
case class TimeSeriesData(
  timestamp: Instant,
  values: Array[Double],
  @transient cachedStats: Option[Statistics] = None // Statistics: a domain type, not shown here
)
class TimeSeriesDataSerializer extends Serializer[TimeSeriesData] {
  override def write(kryo: Kryo, output: Output, obj: TimeSeriesData): Unit = {
    output.writeLong(obj.timestamp.toEpochMilli)
    output.writeInt(obj.values.length)
    obj.values.foreach(output.writeDouble)
    // Skip transient cachedStats
  }

  override def read(kryo: Kryo, input: Input,
                    clazz: Class[TimeSeriesData]): TimeSeriesData = {
    val timestamp = Instant.ofEpochMilli(input.readLong())
    val length = input.readInt()
    val values = Array.fill(length)(input.readDouble())
    TimeSeriesData(timestamp, values, None)
  }
}
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[TimeSeriesData], new TimeSeriesDataSerializer)
    kryo.register(classOf[Instant])
  }
}
Custom serializers give you complete control over the serialization format, enabling optimizations like delta encoding or compression for specific data types.
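As a sketch of one such optimization, the delta encoding mentioned above can be shown in isolation, with no Kryo dependency. A custom `write` method would emit these deltas (ideally as variable-length integers) instead of the raw timestamps:

```scala
object DeltaCodec {
  // Encode: keep the first value, then store successive differences.
  // Monotonic series (e.g. timestamps) yield small deltas that compress well.
  def encode(values: Array[Long]): Array[Long] =
    if (values.length <= 1) values.clone()
    else Array.tabulate(values.length) { i =>
      if (i == 0) values(0) else values(i) - values(i - 1)
    }

  // Decode: a running sum restores the original values exactly
  def decode(deltas: Array[Long]): Array[Long] =
    deltas.scanLeft(0L)(_ + _).tail
}
```

For millisecond timestamps sampled at regular intervals, the deltas fit in one or two bytes each where the raw values need eight.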
Real-World Impact: Shuffle Operations
Serialization performance becomes critical during shuffle operations. Consider a join operation:
import org.apache.spark.sql.functions.count

val events = spark.read.parquet("/data/events").as[UserEvent]
val users = spark.read.parquet("/data/users").as[User]

// This triggers a shuffle - all UserEvent objects are serialized
val joined = events.join(users, "userId")
  .groupBy("userId")
  .agg(count("*").as("event_count"))
With Java serialization on a 100GB dataset:
- Shuffle write: 85GB serialized data
- Network transfer: 12 minutes
- Shuffle read + deserialize: 8 minutes
With Kryo and registration:
- Shuffle write: 28GB serialized data
- Network transfer: 4 minutes
- Shuffle read + deserialize: 2.5 minutes
The 3x reduction in shuffle data, combined with faster deserialization, translates to roughly 70% faster completion for the shuffle phase (20 minutes down to 6.5).
Caching Considerations
When caching RDDs or DataFrames in serialized form, serialization efficiency directly impacts memory usage:
import org.apache.spark.storage.StorageLevel
val processedEvents = events
  .filter(_.eventType == "purchase")
  .map(e => (e.userId, e.properties))
  .persist(StorageLevel.MEMORY_ONLY_SER)

// Confirm the storage level and partition count; per-partition
// sizes appear on the Storage tab of the Spark UI
println(s"Storage level: ${processedEvents.storageLevel}")
println(s"Cached partitions: ${processedEvents.rdd.getNumPartitions}")
Use MEMORY_ONLY_SER with Kryo to cache 2-3x more data in the same memory footprint compared to deserialized storage or Java serialization.
Troubleshooting Serialization Issues
Common issues and solutions:
import org.apache.spark.rdd.RDD

// Issue: NotSerializableException when a closure captures `this`

// Bad: referencing a field makes the closure capture the whole Processor,
// so Spark tries to serialize the entire (possibly non-serializable) object
class Processor {
  val threshold = 100
  def process(rdd: RDD[Int]): RDD[Int] = {
    rdd.filter(_ > threshold) // `threshold` here is really `this.threshold`
  }
}

// Good: copy the field into a local val so only the Int is captured
class Processor {
  val threshold = 100
  def process(rdd: RDD[Int]): RDD[Int] = {
    val t = threshold // Local copy; the closure no longer references `this`
    rdd.filter(_ > t)
  }
}
Enable debug logging to identify serialization problems:
conf.set("spark.kryo.unsafe", "false") // Disable unsafe mode for debugging
conf.set("spark.executor.extraJavaOptions",
  "-Dsun.io.serialization.extendedDebugInfo=true")
Production Configuration Template
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.kryoserializer.buffer", "128k")
  .set("spark.kryoserializer.buffer.max", "128m")
  .set("spark.kryo.unsafe", "true") // Use after testing
  .set("spark.kryo.referenceTracking", "false") // Disable if no circular refs
Kryo’s unsafe mode provides additional performance gains by using sun.misc.Unsafe for memory operations, but test thoroughly before enabling in production.
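Reference tracking exists to handle object graphs with shared or circular references. A hypothetical shape that needs it, with tracking disabled, a serializer walking this graph would recurse until the buffer or stack is exhausted:

```scala
// Hypothetical linked node type; `next` can point back to an ancestor
class Node(val name: String) {
  var next: Node = null
}

object ReferenceTrackingDemo {
  def main(args: Array[String]): Unit = {
    val a = new Node("a")
    val b = new Node("b")
    a.next = b
    b.next = a // circular: serializing this graph requires reference tracking
    println(a.next.next eq a) // true
  }
}
```

Only disable `spark.kryo.referenceTracking` after verifying your domain objects form trees, not graphs.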
The investment in configuring Kryo properly—registering classes, implementing custom serializers where needed—pays dividends in reduced execution time, lower memory pressure, and decreased network utilization across your Spark cluster.