Apache Spark - Configuration Properties (Complete List)
Key Insights
- Spark’s configuration hierarchy follows a strict precedence order: SparkConf in code > command-line flags > spark-defaults.conf, and understanding this prevents hours of debugging “why isn’t my setting applied” issues.
- Memory configuration is where most production jobs fail. The relationship between spark.executor.memory, spark.memory.fraction, and spark.memory.storageFraction determines whether your job completes or dies with OOM errors.
- Dynamic allocation should be your default for shared clusters, but you must configure both the shuffle service and idle timeout thresholds to avoid resource hoarding or premature executor termination.
Introduction to Spark Configuration
Apache Spark’s configuration system is deceptively simple on the surface but hides significant complexity. Every Spark application reads configuration from multiple sources, and knowing which source wins matters when you’re debugging why your 32GB executor is actually running with 1GB.
The configuration hierarchy, from lowest to highest precedence:
1. spark-defaults.conf in the Spark installation directory
2. Command-line flags passed via spark-submit
3. SparkConf set programmatically in your application code
Properties set later in this chain override earlier ones. However, there’s a critical caveat: some properties must be set before the SparkContext initializes, while others can be modified at runtime.
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("ConfigurationDemo")
  .setMaster("yarn")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.dynamicAllocation.enabled", "true")

val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()

// Check effective configuration
spark.conf.getAll.foreach(println)
```
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ConfigurationDemo") \
    .master("yarn") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .getOrCreate()

# Inspect all active configurations
for item in spark.sparkContext.getConf().getAll():
    print(f"{item[0]}: {item[1]}")
```
Application Properties
Application properties define the fundamental identity and resource allocation of your Spark job. Get these wrong, and your job either won’t start or will run inefficiently.
Core Application Settings:
| Property | Default | Description |
|---|---|---|
| `spark.app.name` | (none) | Application name shown in cluster UIs |
| `spark.master` | (none) | Cluster manager URL (yarn, k8s, local[*]) |
| `spark.submit.deployMode` | client | Deploy driver on a worker (cluster) or locally (client) |
| `spark.driver.memory` | 1g | Memory allocated to the driver process |
| `spark.driver.cores` | 1 | CPU cores for the driver (cluster mode only) |
| `spark.driver.maxResultSize` | 1g | Maximum size of serialized results per action |
The spark.driver.maxResultSize property catches many developers off guard. When you call collect() on a large DataFrame, the entire result must fit within this limit. Set it to 0 for unlimited, but understand you’re risking driver OOM errors.
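As a rough guard, you can sanity-check whether a planned collect() fits under the limit before triggering it. A minimal sketch of the arithmetic; the `parse_size` and `fits_in_driver` helpers are illustrative, not part of the Spark API:

```python
def parse_size(s: str) -> int:
    """Parse a Spark-style size string ("2g", "512m") into bytes."""
    units = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
    s = s.strip().lower()
    if s[-1] in units:
        return int(float(s[:-1]) * units[s[-1]])
    return int(s)  # plain byte count

def fits_in_driver(estimated_result_bytes: int, max_result_size: str = "1g") -> bool:
    """True if an estimated collect() result stays under spark.driver.maxResultSize."""
    limit = parse_size(max_result_size)
    return limit == 0 or estimated_result_bytes <= limit  # 0 means unlimited

# A ~2GB result against the 1g default would abort the job
print(fits_in_driver(2 * 1024**3, "1g"))  # False
print(fits_in_driver(2 * 1024**3, "4g"))  # True
```

When the check fails, prefer writing results to storage or iterating with toLocalIterator() over raising the limit.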
```scala
val conf = new SparkConf()
  .setAppName("ProductionJob")
  .set("spark.driver.memory", "8g")
  .set("spark.driver.cores", "4")
  .set("spark.driver.maxResultSize", "2g")
  .set("spark.jars", "/path/to/dependency1.jar,/path/to/dependency2.jar")
  .set("spark.files", "/path/to/config.properties")
  .set("spark.submit.deployMode", "cluster")
```
For JAR dependencies, use spark.jars for code dependencies and spark.files for configuration files or data files that executors need. The files specified in spark.files are distributed to executor working directories and accessible via SparkFiles.get("filename").
Runtime & Execution Properties
Executor configuration directly impacts job performance and cost. The relationship between executor count, memory, and cores determines parallelism and resource utilization.
Key Executor Properties:
| Property | Default | Description |
|---|---|---|
| `spark.executor.memory` | 1g | Memory per executor |
| `spark.executor.cores` | 1 (YARN), all available (standalone) | CPU cores per executor |
| `spark.executor.instances` | 2 | Static number of executors |
| `spark.executor.memoryOverhead` | max(10% of executor memory, 384m) | Off-heap memory for JVM overhead |
```python
from pyspark.sql import SparkSession

# Configuration for CPU-intensive workloads (many small tasks)
spark = SparkSession.builder \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "10") \
    .config("spark.task.cpus", "1") \
    .getOrCreate()

# Configuration for memory-intensive workloads (large shuffles, caching)
spark = SparkSession.builder \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.memoryOverhead", "4g") \
    .getOrCreate()
```
Dynamic Allocation eliminates the guesswork of executor sizing by automatically scaling based on workload:
```scala
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")
  .set("spark.dynamicAllocation.initialExecutors", "5")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.shuffle.service.enabled", "true") // Required for dynamic allocation
```
The external shuffle service is mandatory for dynamic allocation because executors can be removed while their shuffle data is still needed by other stages. (On Spark 3.0+, spark.dynamicAllocation.shuffleTracking.enabled offers an alternative where no external shuffle service is available, such as Kubernetes.)
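While a task backlog persists, Spark grows its executor request exponentially each schedulerBacklogTimeout interval, doubling until maxExecutors is reached. A simplified sketch of that ramp-up (it ignores sustainedSchedulerBacklogTimeout and pending-task caps):

```python
def rampup_targets(initial, max_executors):
    """Executor request targets over successive backlog rounds, doubling until capped."""
    targets = [initial]
    while targets[-1] < max_executors:
        targets.append(min(targets[-1] * 2, max_executors))
    return targets

# With initialExecutors=5 and maxExecutors=100, as configured above:
print(rampup_targets(5, 100))  # [5, 10, 20, 40, 80, 100]
```

With a 1s backlog timeout, a busy job reaches its executor cap within seconds; the 60s idle timeout then releases executors once the backlog clears.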
Shuffle & Memory Management
Shuffle operations are where Spark jobs go to die. Understanding memory management prevents the dreaded OutOfMemoryError and FetchFailedException.
Spark’s unified memory model divides executor memory into regions:
```
Total Executor Memory
├── Reserved Memory (300MB fixed)
├── User Memory: (1 - spark.memory.fraction) × (Total - Reserved)
└── Spark Memory: (spark.memory.fraction) × (Total - Reserved)
    ├── Storage Memory (spark.memory.storageFraction)
    └── Execution Memory (1 - spark.memory.storageFraction)
```
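Plugging the defaults (spark.memory.fraction=0.6, spark.memory.storageFraction=0.5) into that model for a 4g executor, as a sketch of the arithmetic:

```python
RESERVED_MB = 300  # fixed reserved memory

def memory_regions(executor_memory_mb, fraction=0.6, storage_fraction=0.5):
    """Split executor heap into the unified memory model's regions (MB)."""
    usable = executor_memory_mb - RESERVED_MB
    spark_mem = usable * fraction
    return {
        "user": usable * (1 - fraction),              # user data structures, UDF objects
        "storage": spark_mem * storage_fraction,      # cached blocks, broadcasts
        "execution": spark_mem * (1 - storage_fraction),  # shuffles, joins, sorts
    }

regions = memory_regions(4096)  # 4g executor
print({k: round(v) for k, v in regions.items()})
# {'user': 1518, 'storage': 1139, 'execution': 1139}
```

Note that storage and execution borrow from each other at runtime; storageFraction only sets the floor below which cached blocks are protected from eviction.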
Critical Memory Properties:
| Property | Default | Description |
|---|---|---|
| `spark.memory.fraction` | 0.6 | Fraction of heap for Spark operations |
| `spark.memory.storageFraction` | 0.5 | Fraction of Spark memory for caching |
| `spark.memory.offHeap.enabled` | false | Enable off-heap memory allocation |
| `spark.memory.offHeap.size` | 0 | Off-heap memory size in bytes |
```python
from pyspark.sql import SparkSession

# Optimizing for large shuffle operations
spark = SparkSession.builder \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.shuffle.spill.compress", "true") \
    .config("spark.reducer.maxSizeInFlight", "96m") \
    .config("spark.shuffle.file.buffer", "64k") \
    .config("spark.memory.fraction", "0.7") \
    .config("spark.memory.storageFraction", "0.3") \
    .getOrCreate()

# For a 100GB shuffle with 400 partitions,
# each partition handles ~256MB, which is manageable
df_large = spark.read.parquet("/data/large_table")
df_dimension = spark.read.parquet("/data/dimension_table")  # small lookup table (path illustrative)
df_joined = df_large.join(df_dimension, "key")
df_joined.write.parquet("/output/result")
```
The spark.sql.shuffle.partitions default of 200 is almost never correct. For large datasets, increase it to ensure each partition fits comfortably in memory. A good rule of thumb: target 128MB-256MB per partition.
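That rule of thumb reduces to simple division. A sketch, using the 256MB upper end of the target range (the helper name is illustrative):

```python
import math

def shuffle_partitions(total_shuffle_bytes, target_partition_bytes=256 * 1024**2):
    """Partition count so each shuffle partition lands near the target size."""
    return max(1, math.ceil(total_shuffle_bytes / target_partition_bytes))

print(shuffle_partitions(100 * 1024**3))  # 400, matching the example above
```

On Spark 3.x, adaptive query execution (spark.sql.adaptive.enabled) can coalesce shuffle partitions toward a target size automatically, which removes much of this manual tuning.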
Networking & Scheduling Properties
Network timeouts and scheduler configuration become critical in large clusters with hundreds of executors or when running on congested networks.
Network Properties:
| Property | Default | Description |
|---|---|---|
| `spark.network.timeout` | 120s | Default timeout for network operations |
| `spark.rpc.askTimeout` | 120s | Timeout for RPC ask operations |
| `spark.executor.heartbeatInterval` | 10s | Heartbeat interval to driver |
| `spark.storage.blockManagerSlaveTimeoutMs` | 120s | Block manager timeout |
Scheduler Configuration:
Spark supports FIFO (default) and FAIR scheduling modes. Fair scheduling is essential for multi-tenant environments:
```xml
<?xml version="1.0"?>
<!-- fairscheduler.xml -->
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>10</minShare>
  </pool>
  <pool name="adhoc">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
```
```scala
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
  .set("spark.locality.wait", "3s")
  .set("spark.locality.wait.node", "3s")
  .set("spark.locality.wait.rack", "3s")
  .set("spark.speculation", "true")
  .set("spark.speculation.multiplier", "1.5")
  .set("spark.speculation.quantile", "0.9")
```
Speculation reruns slow tasks on different executors. Enable it for production jobs where stragglers impact completion time, but disable it for jobs with non-idempotent side effects.
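Concretely, once spark.speculation.quantile of a stage's tasks have finished, any running task slower than multiplier × the median successful runtime becomes a speculation candidate. A simplified sketch of that check (the real scheduler also applies a minimum-runtime guard):

```python
import statistics

def speculatable(running_time_s, completed_times_s, total_tasks,
                 quantile=0.9, multiplier=1.5):
    """Would Spark consider this running task a straggler? (simplified)"""
    if len(completed_times_s) < quantile * total_tasks:
        return False  # not enough tasks finished yet to judge
    threshold = multiplier * statistics.median(completed_times_s)
    return running_time_s > threshold

done = [10, 11, 12, 11, 10, 12, 11, 10, 12]  # 9 of 10 tasks finished: quantile met
print(speculatable(30, done, total_tasks=10))  # True: 30s >> 1.5 * 11s median
print(speculatable(14, done, total_tasks=10))  # False: within the threshold
```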
Storage & I/O Properties
Serialization and compression settings dramatically impact both performance and storage costs.
Serialization Properties:
| Property | Default | Description |
|---|---|---|
| `spark.serializer` | org.apache.spark.serializer.JavaSerializer | Serializer class for objects |
| `spark.kryo.registrationRequired` | false | Require Kryo class registration |
| `spark.kryoserializer.buffer.max` | 64m | Maximum Kryo buffer size |
Kryo serialization is significantly faster than Java serialization. Always use it:
```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class CustomKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyCustomClass])
    kryo.register(classOf[Array[MyCustomClass]])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.mycompany.CustomKryoRegistrator")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.kryoserializer.buffer", "64k")
  .set("spark.kryoserializer.buffer.max", "128m")
```
Compression Settings:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.io.compression.codec", "zstd") \
    .config("spark.io.compression.zstd.level", "1") \
    .config("spark.sql.parquet.compression.codec", "zstd") \
    .config("spark.broadcast.compress", "true") \
    .config("spark.rdd.compress", "true") \
    .getOrCreate()
```
Zstandard (zstd) offers the best compression-ratio-to-speed tradeoff for most workloads. LZ4 is faster to compress and decompress but achieves a lower compression ratio.
Security & Monitoring Properties
Production deployments require proper security configuration and monitoring setup for debugging and compliance.
```scala
val conf = new SparkConf()
  // SSL Configuration
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", sys.env("KEYSTORE_PASSWORD"))
  .set("spark.ssl.trustStore", "/path/to/truststore.jks")
  .set("spark.ssl.trustStorePassword", sys.env("TRUSTSTORE_PASSWORD"))
  // Authentication
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", sys.env("SPARK_SECRET"))
  // History Server & Event Logging
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-history")
  .set("spark.eventLog.compress", "true")
  .set("spark.history.fs.logDirectory", "hdfs:///spark-history")
  // UI Configuration
  .set("spark.ui.enabled", "true")
  .set("spark.ui.port", "4040")
  .set("spark.ui.retainedJobs", "1000")
  .set("spark.ui.retainedStages", "1000")
```
Event logging is non-negotiable for production. Without it, you lose all debugging capability once a job completes. The history server reads these logs and provides post-mortem analysis of completed applications.
Configure metrics for operational visibility:
```properties
# metrics.properties
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.mycompany.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark
```
These configurations form the foundation of reliable Spark deployments. Start with sensible defaults, measure actual resource utilization, and iterate based on your specific workload characteristics.