Spark Scala - RDD Operations


Key Insights

  • RDDs are immutable, lazily-evaluated distributed collections that form the foundation of Spark’s fault-tolerant processing model. Understanding them deeply makes you better at DataFrames too
  • The distinction between narrow transformations (no shuffle) and wide transformations (shuffle required) is usually the biggest single factor in Spark job performance
  • Prefer reduceByKey over groupByKey for aggregations; it combines values within each partition before shuffling, which can cut network transfer by 90% or more when keys repeat heavily

Introduction to RDDs

Resilient Distributed Datasets (RDDs) are Spark’s original abstraction for distributed data processing. While DataFrames and Datasets have become the preferred API for most workloads, understanding RDDs remains essential. They’re what DataFrames compile down to, and certain operations—particularly those involving complex custom logic—still require dropping to the RDD level.

Three properties define RDDs:

Immutability: Once created, an RDD cannot be modified. Transformations create new RDDs. This simplifies reasoning about distributed state and enables fault recovery.

Lazy Evaluation: Transformations don’t execute immediately. Spark builds a Directed Acyclic Graph (DAG) of operations and only executes when an action is called. This lets the scheduler see the whole computation and pipeline consecutive narrow transformations into a single stage.

Fault Tolerance: RDDs track their lineage, the sequence of transformations that created them. If a partition is lost, Spark recomputes it by replaying that lineage from the source data or from the nearest persisted or checkpointed ancestor.
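
Laziness and lineage are easy to observe directly. The sketch below is a minimal illustration, assuming a local master and a made-up five-element dataset; the accumulator name and the variable names are illustrative only. It can be pasted into spark-shell or run as a script with Spark on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses an existing context (e.g. in spark-shell) or creates a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("lazy-demo").setMaster("local[2]"))

// Counts how many times the map function actually runs.
val calls = sc.longAccumulator("map calls")

val doubled = sc.parallelize(1 to 5, numSlices = 2).map { n =>
  calls.add(1L)
  n * 2
}

// Only the lineage has been recorded so far; no task has run.
val callsBeforeAction: Long = calls.value

// sum() is an action, so the map function now runs once per element.
val total = doubled.sum()
val callsAfterAction: Long = calls.value

// Prints the lineage Spark would replay to rebuild a lost partition.
println(doubled.toDebugString)
```

The counter stays at zero until sum() runs, which is the practical meaning of "transformations are lazy". Accumulators updated inside transformations can over-count if a task is retried, so treat this as a teaching device rather than a metric.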

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("RDD Operations").setMaster("local[*]")
val sc = new SparkContext(conf)

// Creating RDD from a collection
val numbersRDD = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 4)

// Creating RDD from an external file
val logsRDD = sc.textFile("hdfs:///logs/application.log")

// Creating RDD from multiple files with wildcards
val allLogsRDD = sc.textFile("hdfs:///logs/2024-*/*.log")

The numSlices parameter controls how many partitions parallelize creates. For files, Spark follows the Hadoop input splits, which usually means one partition per HDFS block (128 MB by default).
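
To check how many partitions you actually got, and what landed in each one, getNumPartitions and glom are handy. This is a small sketch assuming a local master and a toy range of ten numbers; for files, textFile also accepts a minPartitions argument if the default split count is too low.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partition-demo").setMaster("local[2]"))

val numbers = sc.parallelize(1 to 10, numSlices = 4)

// How many partitions the RDD has.
val partitionCount = numbers.getNumPartitions

// glom() turns each partition into one array, so the layout becomes visible.
// Fine for a toy dataset; on real data this collects everything to the driver.
val layout: List[List[Int]] = numbers.glom().collect().map(_.toList).toList

layout.zipWithIndex.foreach { case (elems, i) =>
  println(s"partition $i: ${elems.mkString(", ")}")
}
```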

Transformations: Narrow Operations

Narrow transformations operate on data within a single partition. No data movement between executors is required, making these operations fast.

map: Applies a function to each element, returning a new RDD of the same size.

filter: Returns elements that satisfy a predicate.

flatMap: Like map, but each input can produce zero or more outputs.

mapPartitions: Operates on entire partitions, useful when you need to amortize setup costs.

import org.apache.spark.rdd.RDD

case class LogEntry(timestamp: Long, level: String, message: String)

val rawLogs = sc.textFile("hdfs:///logs/app.log")

// Parse log lines with flatMap (handles malformed lines gracefully)
val parsedLogs: RDD[LogEntry] = rawLogs.flatMap { line =>
  val parts = line.split("\\|")
  if (parts.length >= 3) {
    try {
      Some(LogEntry(parts(0).toLong, parts(1).trim, parts(2)))
    } catch {
      case _: NumberFormatException => None
    }
  } else None
}

// Filter to errors only
val errorLogs = parsedLogs.filter(_.level == "ERROR")

// Extract just the messages
val errorMessages = errorLogs.map(_.message)

When you need expensive initialization (database connections, ML models), use mapPartitions:

val enrichedLogs = parsedLogs.mapPartitions { partition =>
  // Initialize once per partition, not once per record
  val geoService = new GeoLocationService()
  
  partition.map { entry =>
    val location = geoService.lookup(entry.message)
    (entry, location)
  }
}

This pattern cuts initialization from once per record (potentially millions of times) to once per partition (typically hundreds).
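
You can confirm the once-per-partition behavior with an accumulator. The sketch below assumes a local master, 1,000 toy records in 4 partitions, and a string prefix standing in for the expensive resource; none of these names come from a real service.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("mappartitions-demo").setMaster("local[2]"))

// Counts how often the "expensive" setup code runs.
val setups = sc.longAccumulator("setups")

val records = sc.parallelize(1 to 1000, numSlices = 4)

val tagged = records.mapPartitions { partition =>
  setups.add(1L)              // runs once per partition
  val prefix = "id-"          // hypothetical stand-in for a costly resource
  partition.map(n => prefix + n)
}

val taggedCount = tagged.count()
val setupCount: Long = setups.value
```

One design point to keep in mind: partition.map is lazy, so if the resource needs closing, closing it before the returned iterator has been consumed will break the records that have not been processed yet.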

Transformations: Wide Operations

Wide transformations require data shuffling across the cluster. Understanding their cost is critical for performance.

groupByKey: Groups all values for each key. Transfers all data across the network.

reduceByKey: Combines values locally before shuffling. Dramatically more efficient when the operation is associative and commutative.

join: Combines two RDDs by key. Shuffles both datasets unless they already share the same partitioner.

repartition: Redistributes data across partitions. Always triggers a full shuffle.
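
If you are unsure whether a given chain of operations shuffles, you can ask the RDD itself. The helper below is a sketch, not part of Spark's API: lineageHasShuffle is a hypothetical name, and it simply walks the public dependencies of an RDD looking for a ShuffleDependency. It assumes a local master and a three-element toy dataset.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("shuffle-demo").setMaster("local[2]"))

// Hypothetical helper: true if any step in the lineage needs a shuffle.
def lineageHasShuffle(rdd: RDD[_]): Boolean =
  rdd.dependencies.exists { dep =>
    dep.isInstanceOf[ShuffleDependency[_, _, _]] || lineageHasShuffle(dep.rdd)
  }

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 2)

val mapped = pairs.mapValues(_ + 1)           // narrow
val reduced = pairs.reduceByKey(_ + _)        // wide
val repartitioned = pairs.repartition(4)      // wide

val mappedShuffles = lineageHasShuffle(mapped)
val reducedShuffles = lineageHasShuffle(reduced)
val repartitionedShuffles = lineageHasShuffle(repartitioned)
```

The same information is visible in toDebugString and in the stage boundaries of the Spark UI; the helper just makes it checkable in code.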

The classic word count demonstrates why reduceByKey dominates groupByKey:

val text = sc.textFile("hdfs:///data/books/*.txt")
val words = text.flatMap(_.split("\\s+")).map(word => (word.toLowerCase, 1))

// BAD: groupByKey transfers ALL (word, 1) pairs across network
val wordCountsBad = words
  .groupByKey()                    // Shuffles everything
  .mapValues(_.sum)                // Then sums locally

// GOOD: reduceByKey combines locally first
val wordCountsGood = words
  .reduceByKey(_ + _)              // Local combine, then shuffle reduced data

// For 1 billion words with 100K unique words:
// groupByKey shuffles: ~1 billion records
// reduceByKey shuffles: at most ~100K records per map partition (after local reduction)

For joins, consider broadcast joins when one dataset is small:

val userActions = sc.textFile("hdfs:///data/actions.csv")
  .map(line => { val p = line.split(","); (p(0), p(1)) })

val userProfiles = sc.textFile("hdfs:///data/profiles.csv")
  .map(line => { val p = line.split(","); (p(0), p(1)) })
  .collectAsMap()

// Broadcast small dataset to all executors
val profilesBroadcast = sc.broadcast(userProfiles)

// Map-side join: no shuffle required
val enrichedActions = userActions.map { case (userId, action) =>
  val profile = profilesBroadcast.value.getOrElse(userId, "unknown")
  (userId, action, profile)
}

Actions: Triggering Computation

Actions force Spark to execute the DAG and return results. Until an action is called, nothing happens.

val processedData = rawLogs
  .flatMap(parseLine)
  .filter(_.level == "ERROR")
  .map(_.message)
// Nothing executed yet—just a DAG definition

// Actions that return data to the driver
val count: Long = processedData.count()
val first10: Array[String] = processedData.take(10)
val sample: Array[String] = processedData.takeSample(withReplacement = false, num = 100)

// Dangerous on large datasets—pulls everything to driver
val allData: Array[String] = processedData.collect()

// Aggregation actions
val totalLength: Int = processedData.map(_.length).reduce(_ + _)

// Side-effect actions
processedData.foreach(msg => sendAlert(msg))  // Runs on executors

// Output actions
processedData.saveAsTextFile("hdfs:///output/errors")
processedData.coalesce(1).saveAsTextFile("hdfs:///output/errors-single")

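Use coalesce before writing when you want fewer output files. Unlike repartition, coalesce avoids a full shuffle when reducing partitions. The trade-off is that coalesce(1) can pull the upstream stage into a single task, so on large outputs the parallelism you lose may cost more than the shuffle you saved.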
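
The difference between the two shows up in the partition counts. This is a minimal sketch assuming a local master and a toy RDD with 8 partitions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("coalesce-demo").setMaster("local[2]"))

val data = sc.parallelize(1 to 100, numSlices = 8)

// Shrinking: coalesce merges existing partitions without a shuffle.
val fewer = data.coalesce(2).getNumPartitions

// Growing: without a shuffle, coalesce cannot create more partitions,
// so the RDD keeps its current partition count.
val notMore = data.coalesce(16).getNumPartitions

// repartition always shuffles, so it can grow the partition count.
val more = data.repartition(16).getNumPartitions
```

If you need fewer partitions but want to keep upstream parallelism, coalesce(n, shuffle = true) behaves like repartition(n).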

Pair RDD Operations

Key-value pair RDDs unlock powerful operations for grouped processing.

val salesData: RDD[(String, Double)] = sc.textFile("hdfs:///sales.csv")
  .map(line => {
    val parts = line.split(",")
    (parts(0), parts(1).toDouble)  // (product, amount)
  })

// Basic pair operations
val products: RDD[String] = salesData.keys
val amounts: RDD[Double] = salesData.values
val doubled: RDD[(String, Double)] = salesData.mapValues(_ * 2)

combineByKey is the most flexible aggregation primitive. Here’s computing average sales per product:

val averageSales: RDD[(String, Double)] = salesData.combineByKey(
  // createCombiner: first value for a key in a partition
  (amount: Double) => (amount, 1),
  
  // mergeValue: add a value to an existing accumulator
  (acc: (Double, Int), amount: Double) => (acc._1 + amount, acc._2 + 1),
  
  // mergeCombiners: combine accumulators from different partitions
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues { case (sum, count) => sum / count }
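
When the accumulator can start from a fixed zero value, aggregateByKey expresses the same average with less ceremony. The sketch below assumes a local master and three made-up sales records; it is an alternative formulation, not a replacement for the combineByKey version above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("aggregate-demo").setMaster("local[2]"))

// Made-up (product, amount) records.
val sales = sc.parallelize(
  Seq(("apple", 2.0), ("apple", 4.0), ("pear", 3.0)), numSlices = 2)

val averages = sales
  .aggregateByKey((0.0, 0))(
    // Fold one value into the per-partition (sum, count) accumulator.
    (acc, amount) => (acc._1 + amount, acc._2 + 1),
    // Merge accumulators coming from different partitions.
    (a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, count) => sum / count }
  .collectAsMap()

println(averages)
```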

cogroup combines multiple RDDs by key, useful for complex joins:

val orders: RDD[(String, Order)] = ???    // placeholder: load orders keyed by product
val returns: RDD[(String, Return)] = ???  // placeholder: load returns keyed by product

val combined: RDD[(String, (Iterable[Order], Iterable[Return]))] = 
  orders.cogroup(returns)

val netSales = combined.mapValues { case (orders, returns) =>
  orders.map(_.amount).sum - returns.map(_.amount).sum
}
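
Here is a runnable version of the same idea. The Order and Return case classes and the sample data are assumptions made up for the example (the snippet above leaves them undefined); the point to notice is that cogroup keeps keys that appear on only one side, with an empty Iterable for the other.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("cogroup-demo").setMaster("local[2]"))

// Hypothetical record types for illustration.
case class Order(amount: Double)
case class Return(amount: Double)

val orders = sc.parallelize(Seq(
  ("p1", Order(10.0)), ("p1", Order(5.0)), ("p2", Order(8.0))))
val returns = sc.parallelize(Seq(
  ("p1", Return(5.0))))

// "p2" has no returns, so its returns side is an empty Iterable (sum 0.0).
val netSales = orders.cogroup(returns)
  .mapValues { case (os, rs) => os.map(_.amount).sum - rs.map(_.amount).sum }
  .collectAsMap()

println(netSales)
```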

Partitioning and Persistence

Controlling partitioning prevents unnecessary shuffles. Persistence avoids recomputation.

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// Partition by key once and persist, so the shuffled layout is reused across actions
val partitionedSales = salesData.partitionBy(new HashPartitioner(100)).persist()

// Joining with an RDD that uses the same partitioner avoids re-shuffling partitionedSales
val joinedData = partitionedSales.join(otherDataWithSamePartitioner)
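
A partitioner only helps if it survives the transformations that follow. The sketch below, assuming a local master and three toy pairs, shows that mapValues keeps the partitioner while map discards it, because map is allowed to change the key.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitioner-demo").setMaster("local[2]"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val partitioned = pairs.partitionBy(new HashPartitioner(4))

// mapValues cannot touch keys, so Spark keeps the partitioner.
val kept = partitioned.mapValues(_ * 10).partitioner

// map could change keys, so Spark drops the partitioner even if it does not.
val dropped = partitioned.map { case (k, v) => (k, v * 10) }.partitioner

println(s"after mapValues: $kept, after map: $dropped")
```

In practice this means preferring mapValues and flatMapValues on pre-partitioned pair RDDs whenever the key stays the same.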

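Choose storage levels based on your constraints. The persist calls below are alternatives, not a sequence: once an RDD has a storage level, Spark throws if you try to assign a different one without calling unpersist() first.

// Memory only: fast, but partitions that don't fit are dropped and recomputed on demand
expensiveRDD.cache()  // Equivalent to persist(MEMORY_ONLY)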

// Memory + disk—won't lose data if memory is tight
expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK)

// Serialized—uses less memory but more CPU
expensiveRDD.persist(StorageLevel.MEMORY_ONLY_SER)

// Replicated—for critical data or unstable clusters
expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_2)

// Unpersist when done
expensiveRDD.unpersist()

Cache RDDs that are used multiple times. Don’t cache RDDs used only once—it wastes memory.
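
The storage level of an RDD is fixed once assigned, which is easy to trip over when experimenting. The sketch below assumes a local master and a toy RDD, and uses scala.util.Try only to capture the exception for inspection.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.util.Try

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("persist-demo").setMaster("local[2]"))

val rdd = sc.parallelize(1 to 100).map(_ * 2)

rdd.persist(StorageLevel.MEMORY_AND_DISK)
val level = rdd.getStorageLevel

// Assigning a different level to an already-persisted RDD is rejected.
val changeAttempt = Try(rdd.persist(StorageLevel.MEMORY_ONLY_SER))

// After unpersist() the level is reset, so a new one can be chosen.
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
val newLevel = rdd.getStorageLevel

rdd.unpersist()
```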

Best Practices and Common Pitfalls

Here’s a real-world refactoring example:

// BEFORE: Multiple anti-patterns
val result = sc.textFile("hdfs:///huge-file.csv")
  .map(parseLine)
  .groupByKey()                           // Anti-pattern 1: groupByKey
  .mapValues(values => values.sum)
  .collect()                              // Anti-pattern 2: collect on large data
  .foreach(println)                       // Anti-pattern 3: driver-side iteration

// AFTER: Optimized version
sc.textFile("hdfs:///huge-file.csv")
  .map(parseLine)
  .reduceByKey(_ + _)                     // Local reduction before shuffle
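  .foreachPartition { partition =>        // Process on executors
    // println here writes to each executor's stdout, not the driver console;
    // in a real job, send each record to an external sink instead
    partition.foreach(record => println(record))
  }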

Handle skewed data by salting keys:

val skewedData: RDD[(String, Int)] = ???  // placeholder: one key has 90% of data

val salted = skewedData.map { case (key, value) =>
  val salt = scala.util.Random.nextInt(100)
  ((key, salt), value)
}

val partialAgg = salted.reduceByKey(_ + _)

val finalAgg = partialAgg.map { case ((key, _), value) => (key, value) }
  .reduceByKey(_ + _)
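
A quick way to convince yourself that salting does not change the answer is to compare it with the direct aggregation. The sketch below assumes a local master, a made-up dataset where "hot" dominates, and an arbitrary salt range of 10.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("salting-demo").setMaster("local[2]"))

// Made-up skew: 1,000 records for "hot", 2 for "cold".
val skewed = sc.parallelize(
  Seq.fill(1000)(("hot", 1)) ++ Seq(("cold", 1), ("cold", 1)), numSlices = 4)

// Stage 1: spread each key over up to 10 salted sub-keys and aggregate those.
val partial = skewed
  .map { case (key, value) => ((key, Random.nextInt(10)), value) }
  .reduceByKey(_ + _)

// Stage 2: drop the salt and combine the partial results per original key.
val saltedTotals = partial
  .map { case ((key, _), value) => (key, value) }
  .reduceByKey(_ + _)
  .collectAsMap()

val directTotals = skewed.reduceByKey(_ + _).collectAsMap()
```

For a plain sum, reduceByKey already combines on the map side, so salting pays off mainly when the per-key work cannot be pre-aggregated, for example with groupByKey-style processing or joins on a hot key. It also costs an extra shuffle, so measure before adopting it.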

Key rules to remember: minimize shuffles, prefer narrow transformations, avoid collect() on anything that may not fit in driver memory, and check the Spark UI to see how your job was split into stages and how much data each shuffle moved.
