Apache Spark - Accumulators with Examples

When processing data across a distributed cluster, you often need to aggregate information back to a central location. Counting malformed records, tracking processing metrics, or summing values...

Key Insights

  • Accumulators are write-only shared variables that allow workers to safely aggregate values back to the driver, making them ideal for counters, sums, and debugging metrics.
  • Using accumulators inside transformations (like map) can lead to incorrect counts due to task retries and stage recomputation—always prefer actions (like foreach) for guaranteed accuracy.
  • Custom accumulators extend AccumulatorV2 and require implementing merge logic, enabling complex aggregations like tracking min/max values or collecting unique error types across distributed workers.

Introduction to Accumulators

When processing data across a distributed cluster, you often need to aggregate information back to a central location. Counting malformed records, tracking processing metrics, or summing values across partitions—these tasks require a mechanism that works safely in a parallel environment.

Accumulators solve this problem. They’re shared variables that workers can only add to, while only the driver can read the final value. This “write-only from workers” design eliminates race conditions and makes them safe for distributed aggregation.

Think of accumulators as distributed counters. Each executor updates its local copy, and Spark handles merging these updates back to the driver when an action completes. They’re not meant for passing data between tasks—they’re strictly for aggregating information upward.

Built-in Accumulator Types

Spark provides three built-in accumulator types that cover most common use cases.

LongAccumulator handles integer counting and summation. Use it for record counts, error tallies, or any whole-number aggregation.

DoubleAccumulator works with floating-point values. It’s useful for summing monetary values, calculating totals with decimal precision, or aggregating measurements.

CollectionAccumulator collects values into a list. Use it sparingly—collecting large amounts of data defeats the purpose of distributed processing and can cause memory issues on the driver.

Here’s a basic example tracking processed records:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AccumulatorBasics")
  .master("local[*]")
  .getOrCreate()

val sc = spark.sparkContext

// Create a LongAccumulator
val recordCount = sc.longAccumulator("processedRecords")

val data = sc.parallelize(1 to 10000)

data.foreach { record =>
  recordCount.add(1)
  // Process record...
}

println(s"Total records processed: ${recordCount.value}")
// Output: Total records processed: 10000

Creating and Using Accumulators

The workflow for accumulators follows three steps: create on the driver, update in distributed code, read on the driver.

Creating accumulators happens through SparkContext. You can optionally name them for visibility in the Spark UI:

val counter = sc.longAccumulator("myCounter")
val sum = sc.doubleAccumulator("runningTotal")
val collected = sc.collectionAccumulator[String]("errorMessages")

A common real-world pattern is tracking data quality issues while processing. Here’s how to count malformed records without disrupting your main pipeline:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataQualityTracking")
  .master("local[*]")
  .getOrCreate()

val sc = spark.sparkContext

// Accumulators for tracking data quality
val validRecords = sc.longAccumulator("validRecords")
val malformedRecords = sc.longAccumulator("malformedRecords")
val nullFieldCount = sc.longAccumulator("nullFields")

case class UserRecord(id: String, email: String, age: Int)

def parseRecord(line: String): Option[UserRecord] = {
  val parts = line.split(",")
  if (parts.length != 3) {
    malformedRecords.add(1)
    None
  } else {
    try {
      val record = UserRecord(parts(0), parts(1), parts(2).toInt)
      if (record.email.isEmpty) nullFieldCount.add(1)
      validRecords.add(1)
      Some(record)
    } catch {
      case _: NumberFormatException =>
        malformedRecords.add(1)
        None
    }
  }
}

val rawData = sc.parallelize(Seq(
  "1,alice@example.com,30",
  "2,bob@example.com,invalid",  // malformed
  "3,,25",                       // null email
  "4,dave@example.com",          // missing field
  "5,eve@example.com,28"
))

val validUsers = rawData.flatMap(parseRecord).collect()

println(s"Valid records: ${validRecords.value}")
println(s"Malformed records: ${malformedRecords.value}")
println(s"Records with null fields: ${nullFieldCount.value}")

Custom Accumulators

When built-in types aren’t enough, extend AccumulatorV2 to create custom accumulators. You must implement six methods:

  • reset(): Reset to initial state
  • add(): Add a value
  • merge(): Combine with another accumulator
  • value: Return the current value
  • copy(): Create a copy
  • isZero: Check if in initial state

Here’s a custom accumulator tracking minimum and maximum values simultaneously:

import org.apache.spark.util.AccumulatorV2

case class MinMax(min: Double, max: Double)

class MinMaxAccumulator extends AccumulatorV2[Double, MinMax] {
  private var _min: Double = Double.MaxValue
  private var _max: Double = Double.MinValue

  override def isZero: Boolean = 
    _min == Double.MaxValue && _max == Double.MinValue

  override def copy(): AccumulatorV2[Double, MinMax] = {
    val newAcc = new MinMaxAccumulator
    newAcc._min = this._min
    newAcc._max = this._max
    newAcc
  }

  override def reset(): Unit = {
    _min = Double.MaxValue
    _max = Double.MinValue
  }

  override def add(v: Double): Unit = {
    _min = math.min(_min, v)
    _max = math.max(_max, v)
  }

  override def merge(other: AccumulatorV2[Double, MinMax]): Unit = {
    val o = other.asInstanceOf[MinMaxAccumulator]
    _min = math.min(_min, o._min)
    _max = math.max(_max, o._max)
  }

  override def value: MinMax = MinMax(_min, _max)
}

// Usage
val minMaxAcc = new MinMaxAccumulator
sc.register(minMaxAcc, "temperatureRange")

val temperatures = sc.parallelize(Seq(72.5, 68.0, 85.3, 91.2, 55.8, 78.4))

temperatures.foreach(temp => minMaxAcc.add(temp))

val result = minMaxAcc.value
println(s"Temperature range: ${result.min}°F to ${result.max}°F")
// Output: Temperature range: 55.8°F to 91.2°F

For collecting unique values, here’s an accumulator using a Set:

import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
  private val _set = mutable.Set.empty[T]

  override def isZero: Boolean = _set.isEmpty
  override def copy(): AccumulatorV2[T, Set[T]] = {
    val newAcc = new SetAccumulator[T]
    newAcc._set ++= this._set
    newAcc
  }
  override def reset(): Unit = _set.clear()
  override def add(v: T): Unit = _set += v
  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = 
    _set ++= other.value
  override def value: Set[T] = _set.toSet
}

// Track unique error types
val errorTypes = new SetAccumulator[String]
sc.register(errorTypes, "uniqueErrors")

Accumulators in Actions vs Transformations

This is the most critical gotcha with accumulators. Updates inside transformations are not guaranteed to execute exactly once.

Spark may retry failed tasks, recompute lost partitions, or re-execute stages. Each recomputation triggers accumulator updates again. Only accumulators updated inside actions have guaranteed exactly-once semantics.

val sc = spark.sparkContext
val transformationCounter = sc.longAccumulator("inTransformation")
val actionCounter = sc.longAccumulator("inAction")

val data = sc.parallelize(1 to 1000)

// DANGEROUS: accumulator in transformation
val mapped = data.map { x =>
  transformationCounter.add(1)
  x * 2
}

// This forces computation
mapped.count()
println(s"Transformation counter: ${transformationCounter.value}")

// Access again - transformation re-executes!
mapped.collect()
println(s"Transformation counter after recompute: ${transformationCounter.value}")
// Value is now 2000, not 1000!

// SAFE: accumulator in action
data.foreach { x =>
  actionCounter.add(1)
}
println(s"Action counter: ${actionCounter.value}")
// Reliably 1000

To use accumulators safely in transformations, cache the RDD first:

val cachedData = data.map { x =>
  transformationCounter.add(1)
  x * 2
}.cache()

cachedData.count()  // Computes and caches
cachedData.collect() // Uses cache, no recomputation

Practical Use Cases

Here’s a complete pipeline demonstrating multiple accumulators for production monitoring:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("LogProcessingPipeline")
  .master("local[*]")
  .getOrCreate()

val sc = spark.sparkContext

// Metrics accumulators
val totalLines = sc.longAccumulator("totalLines")
val blankLines = sc.longAccumulator("blankLines")
val errorLines = sc.longAccumulator("errorLines")
val warnLines = sc.longAccumulator("warnLines")
val infoLines = sc.longAccumulator("infoLines")
val parseFailures = sc.longAccumulator("parseFailures")
val bytesProcessed = sc.longAccumulator("bytesProcessed")

case class LogEntry(level: String, timestamp: String, message: String)

def parseLine(line: String): Option[LogEntry] = {
  totalLines.add(1)
  bytesProcessed.add(line.getBytes.length)
  
  if (line.trim.isEmpty) {
    blankLines.add(1)
    return None
  }
  
  val pattern = """(\w+)\s+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+(.+)""".r
  
  line match {
    case pattern(level, timestamp, message) =>
      level.toUpperCase match {
        case "ERROR" => errorLines.add(1)
        case "WARN" => warnLines.add(1)
        case "INFO" => infoLines.add(1)
        case _ => // other levels
      }
      Some(LogEntry(level, timestamp, message))
    case _ =>
      parseFailures.add(1)
      None
  }
}

val logData = sc.parallelize(Seq(
  "ERROR 2024-01-15 10:30:45 Database connection failed",
  "INFO 2024-01-15 10:30:46 Retrying connection",
  "WARN 2024-01-15 10:30:47 Connection pool exhausted",
  "",
  "malformed log entry without proper format",
  "INFO 2024-01-15 10:30:48 Connection established"
))

val parsedLogs = logData.flatMap(parseLine).cache()
val errorLogs = parsedLogs.filter(_.level == "ERROR").collect()

println("=== Pipeline Metrics ===")
println(s"Total lines processed: ${totalLines.value}")
println(s"Blank lines skipped: ${blankLines.value}")
println(s"Parse failures: ${parseFailures.value}")
println(s"Bytes processed: ${bytesProcessed.value}")
println(s"\n=== Log Level Distribution ===")
println(s"ERROR: ${errorLines.value}")
println(s"WARN: ${warnLines.value}")
println(s"INFO: ${infoLines.value}")

Best Practices and Limitations

Use accumulators for debugging and monitoring, not business logic. If your application correctness depends on accumulator values, you’re using them wrong. They’re meant for side-channel metrics.

Always name your accumulators. Named accumulators appear in the Spark UI under the “Accumulators” tab, making debugging and monitoring much easier.

Prefer actions over transformations for accuracy. If you need exact counts, update accumulators inside foreach, foreachPartition, or similar actions.

Cache RDDs when accumulator accuracy matters. This prevents recomputation and duplicate updates.

Don’t collect too much data. CollectionAccumulator brings all collected values to the driver. For large datasets, this causes OutOfMemoryErrors.

Consider alternatives for complex aggregations. If you need the aggregated value for further distributed computation, use reduce, aggregate, or DataFrame aggregations instead. Accumulators only make values available on the driver.

Thread safety is handled by Spark. You don’t need to synchronize accumulator updates—Spark manages this internally.

Accumulators fill a specific niche: lightweight, fire-and-forget metrics collection during distributed processing. Use them for that purpose, and they’re invaluable. Try to bend them into something else, and you’ll fight the framework.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.