Apache Spark - Dynamic Resource Allocation

Key Insights

  • Dynamic Resource Allocation automatically scales Spark executors based on workload demand, reducing cluster costs by 30-50% for variable workloads while maintaining performance during peak processing.
  • Preserving shuffle data is non-negotiable for DRA: without the external shuffle service (or shuffle tracking on Spark 3.0+), removing executors discards shuffle data, which forces recomputation and can cause job failures.
  • Tuning idle timeouts and backlog thresholds requires understanding your workload patterns; defaults work poorly for both streaming micro-batches and large batch jobs.

Introduction to Dynamic Resource Allocation

Static resource allocation in Spark is wasteful. You request 100 executors, but your job only needs that many during the shuffle-heavy middle stage. The rest of the time, those resources sit idle while other jobs queue behind yours.

Dynamic Resource Allocation (DRA) solves this by scaling executors up and down based on actual demand. When your job has pending tasks, Spark requests more executors. When executors sit idle, Spark releases them back to the cluster.

The business case is straightforward: you pay for what you use. In cloud environments where you’re billed by compute-hour, DRA typically reduces costs by 30-50% for batch workloads with variable resource needs. On shared clusters, it improves overall utilization by returning resources to the pool when jobs don’t need them.

But DRA isn’t magic. Misconfigured, it causes thrashing, data loss, and worse performance than static allocation. Understanding the mechanics is essential.

How Dynamic Resource Allocation Works

DRA operates on a simple feedback loop. The Spark scheduler tracks two signals: pending tasks waiting for executors and idle executors with no work.

When tasks queue up waiting for compute resources, Spark requests additional executors from the cluster manager. The scaling is exponential—Spark requests 1 executor, then 2, then 4, then 8—until demand is met or maxExecutors is reached. This aggressive ramp-up ensures jobs don’t starve during sudden load spikes.
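The doubling pattern can be illustrated with a small, Spark-free model. This is a sketch of the behavior described above, not Spark's scheduler code: `RampUpSketch`, `requestRounds`, and its parameters are hypothetical names, and it ignores timing (in Spark, the first request follows schedulerBacklogTimeout and later rounds follow spark.dynamicAllocation.sustainedSchedulerBacklogTimeout).

```scala
// Simplified model of DRA ramp-up; NOT Spark's actual implementation.
object RampUpSketch {

  /** Executors requested per round until `needed` is covered or `maxExecutors` is reached.
    * `current` is the number of executors already running (hypothetical input).
    */
  def requestRounds(current: Int, needed: Int, maxExecutors: Int): List[Int] = {
    val target = math.min(needed, maxExecutors)

    @annotation.tailrec
    def loop(have: Int, nextBatch: Int, acc: List[Int]): List[Int] =
      if (have >= target) acc.reverse
      else {
        // Double the batch each round, but never overshoot the target.
        val batch = math.min(nextBatch, target - have)
        loop(have + batch, nextBatch * 2, batch :: acc)
      }

    loop(current, 1, Nil)
  }
}
```

For example, `RampUpSketch.requestRounds(0, 20, 100)` yields `List(1, 2, 4, 8, 5)`: four doubling rounds, then a final round that requests only the remaining five executors.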

When executors complete their tasks and remain idle beyond a configurable timeout, Spark releases them. This happens gradually to avoid thrashing when new tasks arrive shortly after.
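The scale-down rule can be sketched the same way. The model below is an assumption-laden simplification: `IdleRemovalSketch`, `ExecutorState`, and `removable` are hypothetical names, and the real allocation manager also accounts for cached blocks and shuffle data before releasing anything.

```scala
// Simplified model of idle-executor removal; NOT Spark's actual implementation.
object IdleRemovalSketch {

  // Hypothetical snapshot of one executor: its id and when it last ran a task.
  final case class ExecutorState(id: String, lastBusyMs: Long)

  /** Ids of executors idle for at least `idleTimeoutMs`, longest-idle first,
    * limited so the pool never shrinks below `minExecutors`.
    */
  def removable(
      executors: Seq[ExecutorState],
      nowMs: Long,
      idleTimeoutMs: Long,
      minExecutors: Int
  ): Seq[String] = {
    val expired = executors
      .filter(e => nowMs - e.lastBusyMs >= idleTimeoutMs)
      .sortBy(_.lastBusyMs)
    val maxRemovals = math.max(0, executors.size - minExecutors)
    expired.take(maxRemovals).map(_.id)
  }
}
```

The `minExecutors` cap is what keeps a quiet period from draining the pool completely, so the next burst of tasks starts from the floor rather than from zero.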

The critical dependency here is the external shuffle service. During a Spark job, executors write shuffle data to local disk. If an executor is removed before downstream tasks read that shuffle data, those reads fail and Spark has to recompute the lost output; repeated failures can fail the job. The external shuffle service runs independently of executors and serves shuffle files even after the executor that wrote them is gone.

Here’s the minimal configuration to enable DRA:

val spark = SparkSession.builder()
  .appName("DRA Example")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "100")
  .getOrCreate()

The shuffleTracking.enabled setting is a newer alternative to the external shuffle service (Spark 3.0+). It tracks which executors hold shuffle data and prevents their removal until that data is no longer needed. This simplifies deployment but has limitations—I’ll cover when to use each approach later.
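Because DRA needs one of these two mechanisms, a pre-flight check on the settings you intend to pass can catch a missing flag early. The sketch below works on a plain `Map` of settings rather than a live SparkConf; `ShuffleSafetySketch` and `check` are hypothetical helpers, and only the three configuration keys come from Spark.

```scala
// Pre-flight check on a plain settings map; a sketch, not part of Spark.
object ShuffleSafetySketch {

  /** Returns a warning when DRA is enabled but neither shuffle-preservation
    * mechanism (external shuffle service or shuffle tracking) is switched on.
    */
  def check(conf: Map[String, String]): Option[String] = {
    def flag(key: String): Boolean =
      conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

    val draEnabled = flag("spark.dynamicAllocation.enabled")
    val shuffleService = flag("spark.shuffle.service.enabled")
    val shuffleTracking = flag("spark.dynamicAllocation.shuffleTracking.enabled")

    if (draEnabled && !shuffleService && !shuffleTracking)
      Some("DRA is enabled without spark.shuffle.service.enabled or " +
        "spark.dynamicAllocation.shuffleTracking.enabled")
    else None
  }
}
```

Running this against the same map you feed into the session builder keeps the check and the real configuration from drifting apart.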

Key Configuration Parameters

Getting DRA right requires understanding six key parameters:

spark.dynamicAllocation.minExecutors: The floor. Spark never scales below this number. Set it to handle your baseline load without scaling delays. For jobs that need fast startup, set this higher.

spark.dynamicAllocation.maxExecutors: The ceiling. This prevents runaway scaling and protects cluster resources. Set it based on your fair share of cluster capacity or budget constraints.

spark.dynamicAllocation.initialExecutors: How many executors to request at job start. Defaults to minExecutors. For jobs with predictable initial load, set this higher to avoid slow ramp-up.

spark.dynamicAllocation.executorIdleTimeout: How long an executor must be idle before removal. Default is 60 seconds. Too low causes thrashing; too high wastes resources.

spark.dynamicAllocation.schedulerBacklogTimeout: How long tasks must wait before Spark requests new executors. Default is 1 second. Lower values improve responsiveness but increase allocation churn.

spark.dynamicAllocation.cachedExecutorIdleTimeout: Idle timeout for executors holding cached RDDs. Default is infinity—executors with cached data are never removed. Set this explicitly if you cache data but still want scaling.
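One way to keep these parameters consistent is to hold them in a small value type and sanity-check them before building the session. Everything here is illustrative: `DraSettingsSketch`, `DraSettings`, `problems`, and `toConf` are hypothetical names, the checks are this sketch's own rules rather than Spark's validation, and the defaults simply mirror the ones listed above.

```scala
// Hypothetical settings holder with basic sanity checks; not a Spark API.
object DraSettingsSketch {

  final case class DraSettings(
      minExecutors: Int,
      maxExecutors: Int,
      initialExecutors: Option[Int] = None,
      executorIdleTimeoutSec: Long = 60L,   // default stated above
      schedulerBacklogTimeoutSec: Long = 1L // default stated above
  ) {
    // initialExecutors falls back to minExecutors when unset.
    def effectiveInitial: Int = initialExecutors.getOrElse(minExecutors)
  }

  /** Human-readable problems; an empty list means the settings look consistent. */
  def problems(s: DraSettings): List[String] = List(
    (s.minExecutors < 0) -> "minExecutors must not be negative",
    (s.maxExecutors < s.minExecutors) -> "maxExecutors must be >= minExecutors",
    (s.effectiveInitial > s.maxExecutors) -> "initialExecutors must be <= maxExecutors",
    (s.executorIdleTimeoutSec <= 0) -> "executorIdleTimeout must be positive",
    (s.schedulerBacklogTimeoutSec <= 0) -> "schedulerBacklogTimeout must be positive"
  ).collect { case (true, message) => message }

  /** Renders the settings as Spark configuration key/value pairs. */
  def toConf(s: DraSettings): Map[String, String] = Map(
    "spark.dynamicAllocation.enabled" -> "true",
    "spark.dynamicAllocation.minExecutors" -> s.minExecutors.toString,
    "spark.dynamicAllocation.maxExecutors" -> s.maxExecutors.toString,
    "spark.dynamicAllocation.initialExecutors" -> s.effectiveInitial.toString,
    "spark.dynamicAllocation.executorIdleTimeout" -> s"${s.executorIdleTimeoutSec}s",
    "spark.dynamicAllocation.schedulerBacklogTimeout" -> s"${s.schedulerBacklogTimeoutSec}s"
  )
}
```

The resulting map can be folded into a builder, for instance with `toConf(settings).foldLeft(SparkSession.builder())((b, kv) => b.config(kv._1, kv._2))`.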

Here’s a production configuration for a batch ETL workload:

val spark = SparkSession.builder()
  .appName("Production ETL")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "5")
  .config("spark.dynamicAllocation.maxExecutors", "200")
  .config("spark.dynamicAllocation.initialExecutors", "20")
  .config("spark.dynamicAllocation.executorIdleTimeout", "120s")
  .config("spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
  .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.timeout", "30m")
  .getOrCreate()

For streaming workloads with micro-batches, tighten the timeouts:

val spark = SparkSession.builder()
  .appName("Streaming Job")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "10")
  .config("spark.dynamicAllocation.maxExecutors", "50")
  .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
  .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .getOrCreate()

External Shuffle Service Setup

Shuffle tracking works for many scenarios, but the external shuffle service remains the robust choice for production clusters. It handles edge cases better and doesn’t require Spark 3.0+.

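On YARN, configure the shuffle service in yarn-site.xml on every NodeManager. If yarn.nodemanager.aux-services already lists other services (for example mapreduce_shuffle), append spark_shuffle to that list rather than replacing it: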

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <name>spark.shuffle.service.port</name>
  <value>7337</value>
</property>

Copy the spark-<version>-yarn-shuffle.jar to YARN’s classpath and restart NodeManagers.

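On Kubernetes, upstream Spark does not officially support the external shuffle service, so shuffle tracking is the usual choice there. If you experiment with running the service as a DaemonSet anyway, treat the manifest below as an unsupported sketch; executors must also write their shuffle files to the same host path the service mounts: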

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: spark-shuffle-service
spec:
  selector:
    matchLabels:
      app: spark-shuffle
  template:
    metadata:
      labels:
        app: spark-shuffle
    spec:
      containers:
      - name: shuffle-service
        image: apache/spark:3.5.0
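        command: ["/opt/spark/sbin/start-shuffle-service.sh"]
        env:
        # Assumption: spark-daemon.sh stays in the foreground when SPARK_NO_DAEMONIZE is set,
        # which keeps the container from exiting right after the script returns.
        - name: SPARK_NO_DAEMONIZE
          value: "true"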
        ports:
        - containerPort: 7337
          hostPort: 7337
        volumeMounts:
        - name: spark-local-dir
          mountPath: /tmp/spark-local
      volumes:
      - name: spark-local-dir
        hostPath:
          path: /tmp/spark-local
          type: DirectoryOrCreate

Then configure your Spark application:

.config("spark.shuffle.service.enabled", "true")
.config("spark.shuffle.service.port", "7337")

Monitoring and Debugging DRA Behavior

The Spark UI’s Executors tab lists active and dead executors, and the Event Timeline on the Jobs tab shows when executors were added and removed. Watch for these patterns:

Healthy scaling: Executor count rises during shuffles and compute-heavy stages, drops during I/O waits and between jobs.

Thrashing: Rapid add/remove cycles. Executors are allocated, do minimal work, then removed, then reallocated. Increase executorIdleTimeout.

Starvation: Pending tasks persist while the executor count sits at maxExecutors, or below it because the cluster manager isn’t granting requests. Raise maxExecutors in the first case; check queue capacity and quotas in the second.

Stuck high: Executor count stays at max even during idle periods. Check for cached RDDs holding executors or misconfigured timeouts.
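Thrashing in particular is easy to quantify once you have executor add/remove timestamps. The sketch below counts removals that are followed by an addition within a short window; `ThrashSketch`, its event types, and `quickReAdds` are hypothetical, and the window length is an assumption you would tune to your own workload.

```scala
// Counts "remove, then quickly re-add" cycles from a list of timestamps.
// A sketch for offline analysis; not a Spark API.
object ThrashSketch {

  sealed trait Event { def timeMs: Long }
  final case class Added(timeMs: Long) extends Event
  final case class Removed(timeMs: Long) extends Event

  /** Number of removals followed by at least one addition within `windowMs`. */
  def quickReAdds(events: Seq[Event], windowMs: Long): Int = {
    val addTimes = events.collect { case Added(t) => t }
    events.count {
      case Removed(t) => addTimes.exists(a => a > t && a - t <= windowMs)
      case _          => false
    }
  }
}
```

A count that grows steadily over a job’s lifetime suggests executorIdleTimeout is shorter than the gaps between bursts of work.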

For deeper visibility, implement a custom listener:

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

class DRAMonitoringListener extends SparkListener {
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    println(s"[DRA] Executor added: ${event.executorId} at ${event.time}")
    println(s"[DRA] Host: ${event.executorInfo.executorHost}")
    println(s"[DRA] Cores: ${event.executorInfo.totalCores}")
  }

  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    println(s"[DRA] Executor removed: ${event.executorId} at ${event.time}")
    println(s"[DRA] Reason: ${event.reason}")
  }
}

// Register the listener
spark.sparkContext.addSparkListener(new DRAMonitoringListener())

In production, replace println with proper metrics emission to your monitoring system.

Common Pitfalls and Best Practices

Pitfall: Cached data loss. When an executor with cached RDD partitions is removed, that data is lost and subsequent tasks must recompute it. Either set cachedExecutorIdleTimeout appropriately or persist with a replicated storage level such as MEMORY_AND_DISK_2, so a second copy lives on another executor.

Pitfall: Speculative execution conflicts. Speculative tasks can keep executors busy, preventing scale-down. If you use speculation, increase idle timeouts to account for this.

Pitfall: Small file output. The number of output files is set by the number of partitions in the write stage, not by how many executors are running. If DRA scales down during the write phase, fewer write tasks run concurrently and the write takes longer, but the file count stays the same. Control file count and size with an explicit coalesce or repartition before writes.

Pitfall: Cluster manager delays. DRA requests executors, but YARN or Kubernetes may take time to provision them. Account for this latency in your backlog timeout settings.

Best practice: Set meaningful bounds. Don’t set maxExecutors to 1000 “just in case.” Set it to what your job actually needs at peak, plus 20% buffer.

Best practice: Test scaling behavior. Run your job with DRA enabled and watch the executor graph. Verify it scales as expected before production deployment.

Best practice: Use shuffle tracking on Spark 3.0+. It’s simpler than external shuffle service and handles most cases. Reserve external shuffle service for multi-tenant clusters or when you need shuffle data to stay readable after the executors that wrote it are gone.
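Two of the items above come down to simple arithmetic: the maxExecutors bound with its 20% buffer, and the partition count to use when coalescing before a write. The helpers below are a sketch with hypothetical names (`SizingSketch`, `maxExecutorsWithBuffer`, `writePartitions`); the target file size is an assumption you choose, not something Spark prescribes.

```scala
// Back-of-the-envelope sizing helpers; hypothetical, not part of Spark.
object SizingSketch {

  /** Peak executor need plus a percentage buffer, rounded up (integer math). */
  def maxExecutorsWithBuffer(peakExecutors: Int, bufferPercent: Int = 20): Int = {
    require(peakExecutors >= 0 && bufferPercent >= 0, "inputs must be non-negative")
    (peakExecutors * (100 + bufferPercent) + 99) / 100
  }

  /** Partition count so each output file is roughly `targetFileBytes`. */
  def writePartitions(totalBytes: Long, targetFileBytes: Long): Int = {
    require(totalBytes >= 0, "totalBytes must be non-negative")
    require(targetFileBytes > 0, "targetFileBytes must be positive")
    // Ceiling division, with at least one partition.
    math.max(1L, (totalBytes + targetFileBytes - 1) / targetFileBytes).toInt
  }
}
```

For a job that peaks at 100 executors, `maxExecutorsWithBuffer(100)` gives 120. For 10 GiB of output and a 128 MiB target, `writePartitions` gives 80, which you could pass to `coalesce` or `repartition` on the DataFrame before writing.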

Conclusion

Dynamic Resource Allocation is essential for cost-efficient Spark deployments. Enable it for batch jobs with variable resource needs, multi-stage pipelines with different parallelism requirements, and shared clusters where hoarding resources impacts other users.

Skip DRA for short-lived jobs where allocation overhead exceeds savings, streaming jobs with consistent load, and jobs where startup latency is critical.

The key to success: understand your workload patterns, set appropriate bounds and timeouts, and monitor scaling behavior. DRA works best when you tune it to your specific use case rather than accepting defaults.
