Scala - Concurrent Collections

Key Insights

• Scala’s concurrent collections provide thread-safe operations without explicit locking, using lock-free algorithms built on compare-and-swap, which typically outperform synchronized alternatives under contention
• TrieMap is Scala’s primary concurrent collection, offering O(log n) operations and snapshot isolation, which enables consistent iteration even during concurrent modifications
• Understanding the trade-offs between concurrent collections and synchronized wrappers is critical: concurrent collections excel in high-contention scenarios but carry higher memory overhead, and only a handful of their operations are atomic

Understanding Concurrent Collections in Scala

Scala provides concurrent collections in the scala.collection.concurrent package, designed for safe access from multiple threads without external synchronization. Unlike synchronized collections that use coarse-grained locking, concurrent collections employ lock-free algorithms that allow multiple threads to operate simultaneously.

The primary concurrent collection in Scala is TrieMap, a hash array mapped trie (HAMT) that supports lock-free concurrent operations. Here’s a basic example:

import scala.collection.concurrent.TrieMap

val cache = TrieMap[String, Int]()

// Thread-safe operations
cache.put("key1", 100)
cache.putIfAbsent("key2", 200)
cache.get("key1") // Some(100)
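
The return values of these operations are what make them useful in concurrent code, because they report whether the call actually changed the map. The following is a small illustrative sketch (the map name `stock` and the keys are made up for this example) showing what each call returns:

```scala
import scala.collection.concurrent.TrieMap

val stock = TrieMap[String, Int]()

// put returns the previous value, if any
val firstPut  = stock.put("apples", 100)          // None: key was absent
val secondPut = stock.put("apples", 150)          // Some(100): old value returned

// putIfAbsent stores the value only when the key is missing
val inserted = stock.putIfAbsent("pears", 200)    // None: value stored
val rejected = stock.putIfAbsent("pears", 999)    // Some(200): existing value kept

// replace(key, expected, updated) is a compare-and-swap on the value
val swapped    = stock.replace("apples", 150, 175) // true
val notSwapped = stock.replace("apples", 150, 180) // false: current value is 175
```

A `Some` from putIfAbsent or a `false` from replace means another writer got there first, which is the signal to re-read and retry.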

TrieMap: Architecture and Performance

TrieMap uses a tree structure whose nodes are updated atomically using compare-and-swap (CAS) operations. Reads and writes never block, so throughput can scale well as processors are added, although heavy contention on a single key still forces CAS retries.

import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, ExecutionContext}
import ExecutionContext.Implicits.global

val metrics = TrieMap[String, Long]()

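def incrementCounter(key: String): Unit = {
  metrics.get(key) match {
    case Some(value) =>
      // Compare-and-swap: succeeds only if the stored value is still `value`
      if (!metrics.replace(key, value, value + 1)) {
        incrementCounter(key) // Another thread won the race; retry
      }
    case None =>
      // putIfAbsent returns Some(existing) if another thread inserted first
      if (metrics.putIfAbsent(key, 1L).isDefined) {
        incrementCounter(key) // Retry so this increment is not lost
      }
  }
}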

// Concurrent updates from multiple threads
val futures = (1 to 1000).map { i =>
  Future {
    incrementCounter("requests")
    incrementCounter(s"user_${i % 10}")
  }
}

Future.sequence(futures).foreach { _ =>
  println(s"Total requests: ${metrics("requests")}")
  println(s"Unique keys: ${metrics.size}")
}
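
The read, compute, compare-and-swap loop used for counters generalizes to any update function. Below is a minimal sketch of such a helper; the name `upsert` and its signature are hypothetical, not part of the TrieMap API, and it assumes the update function is cheap and side-effect free because it may run more than once under contention.

```scala
import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: inserts `initial` if the key is absent,
// otherwise replaces the current value with f(current).
// Every branch that loses a race retries from a fresh read.
@tailrec
def upsert[K, V](map: TrieMap[K, V], key: K, initial: V)(f: V => V): Unit =
  map.get(key) match {
    case Some(old) =>
      if (!map.replace(key, old, f(old))) upsert(map, key, initial)(f)
    case None =>
      if (map.putIfAbsent(key, initial).isDefined) upsert(map, key, initial)(f)
  }

val hits = TrieMap[String, Long]()

// 1000 concurrent increments of the same key
val work = (1 to 1000).map(_ => Future(upsert(hits, "requests", 1L)(_ + 1)))
Await.result(Future.sequence(work), 10.seconds)

val total = hits("requests")
```

Because each call either inserts the initial value or wins exactly one compare-and-swap, no increment is lost even when all threads target the same key.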

Snapshot Isolation and Consistent Iteration

One of TrieMap’s most powerful features is snapshot isolation. When you create a snapshot, you get a consistent view of the collection at that point in time, unaffected by subsequent modifications.

import scala.collection.concurrent.TrieMap

val activeConnections = TrieMap[String, Long]()

// Simulate connections being added
(1 to 100).foreach { i =>
  activeConnections.put(s"conn_$i", System.currentTimeMillis())
}

// Create a snapshot for reporting
val snapshot = activeConnections.snapshot()

// Continue modifying the original
activeConnections.put("conn_101", System.currentTimeMillis())
activeConnections.remove("conn_1")

// Snapshot remains unchanged
println(s"Original size: ${activeConnections.size}") // 100 (conn_101 added, conn_1 removed)
println(s"Snapshot size: ${snapshot.size}")          // 100
println(s"Snapshot has conn_1: ${snapshot.contains("conn_1")}") // true

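Taking a snapshot is a constant-time operation: the snapshot and the original initially share the same trie, and nodes are copied lazily only as either side is modified. This makes snapshots efficient for read-heavy workloads that need consistent views. When the view will never be modified, readOnlySnapshot() is the cheaper choice.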
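
Snapshot semantics also apply to iteration: a TrieMap iterator walks a point-in-time view, so the map can be modified while it is being traversed. The sketch below uses a made-up map named `inventory` to illustrate both a read-only snapshot and mutation during iteration.

```scala
import scala.collection.concurrent.TrieMap

val inventory = TrieMap("a" -> 1, "b" -> 2, "c" -> 3)

// Read-only point-in-time view
val frozen = inventory.readOnlySnapshot()

// Later changes to the live map do not affect the view
inventory.put("d", 4)
inventory.remove("a")

val frozenTotal = frozen.values.sum     // sums a, b, c
val liveTotal   = inventory.values.sum  // sums b, c, d

// The iterator is taken from a snapshot, so entries added
// inside the loop are not visited by the loop itself.
inventory.iterator.foreach { case (k, v) => inventory.put(k + "_copy", v) }
```

After the loop the live map holds the three original entries plus one copy of each, while `frozen` still contains exactly a, b, and c.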

Atomic Operations and Conditional Updates

TrieMap provides several atomic operations that are essential for building thread-safe algorithms without explicit locking:

import scala.collection.concurrent.TrieMap

case class UserSession(userId: String, loginTime: Long, requestCount: Int)

val sessions = TrieMap[String, UserSession]()

def recordRequest(userId: String): Unit = {
  sessions.get(userId) match {
    case Some(session) =>
      val updated = session.copy(requestCount = session.requestCount + 1)
      // Only update if value hasn't changed
      if (!sessions.replace(userId, session, updated)) {
        recordRequest(userId) // Retry
      }
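    case None =>
      val newSession = UserSession(userId, System.currentTimeMillis(), 1)
      // If another thread created the session first, retry so this request is counted
      if (sessions.putIfAbsent(userId, newSession).isDefined) {
        recordRequest(userId)
      }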
  }
}

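def getOrCreateSession(userId: String): UserSession = {
  // Atomic insert-if-missing; under a race the default expression may be
  // evaluated without its result being stored, so keep it free of side effects
  sessions.getOrElseUpdate(userId, {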
    UserSession(userId, System.currentTimeMillis(), 0)
  })
}

// Conditional removal
def expireSession(userId: String, expectedSession: UserSession): Boolean = {
  sessions.remove(userId, expectedSession)
}
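
Conditional removal only succeeds when the stored value still equals the one the caller observed. The sketch below is a simplified illustration (the `Session` case class is a trimmed-down stand-in for UserSession) of a stale removal being rejected after another writer has updated the entry.

```scala
import scala.collection.concurrent.TrieMap

// Simplified stand-in for the session type, for illustration only
final case class Session(userId: String, requestCount: Int)

val liveSessions = TrieMap[String, Session]()

val observed = Session("alice", 1)
liveSessions.put("alice", observed)

// Another writer updates the session after we read it
liveSessions.replace("alice", observed, observed.copy(requestCount = 2))

// Removing with the stale value fails and leaves the entry in place
val removedStale = liveSessions.remove("alice", observed)

// Removing with the current value succeeds
val current        = liveSessions("alice")
val removedCurrent = liveSessions.remove("alice", current)
```

The comparison uses value equality, so case classes work well as map values here.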

Building a Thread-Safe Cache

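Here’s a practical implementation of a concurrent cache with TTL expiry and least-frequently-used (LFU-like) eviction, built on TrieMap. The size bound is approximate, because the size check and the insert in put are not a single atomic step: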

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._

case class CacheEntry[V](value: V, timestamp: Long, accessCount: Int)

class ConcurrentCache[K, V](maxSize: Int, ttl: Duration) {
  private val cache = TrieMap[K, CacheEntry[V]]()
  
  def get(key: K): Option[V] = {
    cache.get(key).flatMap { entry =>
      val now = System.currentTimeMillis()
      if (now - entry.timestamp > ttl.toMillis) {
        cache.remove(key, entry)
        None
      } else {
        // Best-effort access-count update: if the CAS loses a race,
        // only the eviction statistics are slightly off
        val updated = entry.copy(accessCount = entry.accessCount + 1)
        cache.replace(key, entry, updated)
        Some(entry.value)
      }
    }
  }
  
  def put(key: K, value: V): Unit = {
    if (cache.size >= maxSize) {
      evictLeastUsed()
    }
    
    val entry = CacheEntry(value, System.currentTimeMillis(), 0)
    cache.put(key, entry)
  }
  
  private def evictLeastUsed(): Unit = {
    val snapshot = cache.snapshot()
    if (snapshot.nonEmpty) {
      val (keyToRemove, _) = snapshot.minBy(_._2.accessCount)
      cache.remove(keyToRemove)
    }
  }
  
  def size: Int = cache.size
  
  def clear(): Unit = cache.clear()
}

// Usage
val userCache = new ConcurrentCache[String, String](1000, 5.minutes)

userCache.put("user123", "John Doe")
userCache.get("user123") // Some("John Doe")

Performance Considerations and Trade-offs

Concurrent collections have different performance characteristics than their synchronized counterparts:

import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Concurrent collection - better for high contention
val concurrentMap = TrieMap[String, Int]()

// Plain mutable map guarded by explicit locking - simpler but coarser
val synchronizedMap = mutable.HashMap.empty[String, Int]

// Benchmark helper (naive wall-clock timing with no JIT warm-up;
// use a harness such as JMH for real measurements)
def benchmark(name: String)(op: => Unit): Unit = {
  val start = System.nanoTime()
  op
  val duration = (System.nanoTime() - start) / 1000000
  println(s"$name: ${duration}ms")
}

// High contention scenario
import scala.concurrent.{Future, Await, ExecutionContext}
import ExecutionContext.Implicits.global
import scala.concurrent.duration._

benchmark("TrieMap concurrent writes") {
  val futures = (1 to 10000).map { i =>
    Future { concurrentMap.put(s"key_$i", i) }
  }
  Await.result(Future.sequence(futures), 10.seconds)
}

benchmark("Synchronized map concurrent writes") {
  val futures = (1 to 10000).map { i =>
    Future { 
      synchronizedMap.synchronized {
        synchronizedMap.put(s"key_$i", i)
      }
    }
  }
  Await.result(Future.sequence(futures), 10.seconds)
}
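
Single-shot timings on the JVM are dominated by JIT compilation and thread-pool start-up, so the numbers above should be read as rough indications only. As a slightly more robust sketch, still no substitute for a proper harness, the hypothetical helper below (`benchmarkMedian`, with made-up default parameters) runs a few untimed warm-up rounds and reports the median of several timed runs.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical helper: discards `warmups` runs, then returns the
// median wall-clock time in milliseconds over `runs` timed runs.
def benchmarkMedian(name: String, warmups: Int = 5, runs: Int = 11)(op: => Unit): Double = {
  (1 to warmups).foreach(_ => op)
  val timesMs = (1 to runs).map { _ =>
    val start = System.nanoTime()
    op
    (System.nanoTime() - start) / 1e6
  }.sorted
  val median = timesMs(runs / 2)
  println(f"$name: median $median%.2f ms over $runs runs")
  median
}

// Each run builds a fresh map so earlier runs do not affect later ones
val medianMs = benchmarkMedian("TrieMap sequential puts") {
  val m = TrieMap[String, Int]()
  (1 to 10000).foreach(i => m.put(s"key_$i", i))
}
```

The same helper can wrap the concurrent write scenarios, as long as each run starts from an empty map.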

Integration with Futures and Parallel Collections

Concurrent collections integrate naturally with Scala’s concurrency primitives:

import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, ExecutionContext}
import ExecutionContext.Implicits.global

case class ProcessingResult(id: String, status: String, duration: Long)

val results = TrieMap[String, ProcessingResult]()

def processItems(items: Seq[String]): Future[Unit] = {
  val futures = items.map { item =>
    Future {
      val start = System.currentTimeMillis()
      // Simulate processing
      Thread.sleep(100)
      val duration = System.currentTimeMillis() - start
      
      results.put(item, ProcessingResult(item, "completed", duration))
    }
  }
  
  Future.sequence(futures).map(_ => ())
}

// Process and aggregate
val items = (1 to 50).map(i => s"item_$i")
processItems(items).foreach { _ =>
  val snapshot = results.snapshot()
  val avgDuration = snapshot.values.map(_.duration).sum / snapshot.size
  println(s"Processed ${snapshot.size} items, avg duration: ${avgDuration}ms")
}

When to Use Concurrent Collections

Use concurrent collections when:

  • Multiple threads frequently read and write the same collection
  • You need lock-free operations for better scalability
  • Snapshot isolation is valuable for consistent iteration
  • Performance under high contention is critical

Avoid them when:

  • Single-threaded access is the norm (use regular mutable collections)
  • You need every collection operation to be atomic (only a few methods, such as putIfAbsent, replace, and the conditional remove, are atomic; other compound updates need a retry loop)
  • Memory overhead is a concern (TrieMap uses more memory than HashMap)

Concurrent collections are specialized tools. For simple thread-safety needs, synchronized wrappers or immutable collections with atomic references often suffice. For high-performance concurrent scenarios, TrieMap and its atomic operations provide the necessary guarantees without sacrificing throughput.
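
For comparison, the "immutable collection with an atomic reference" alternative can be sketched as follows. This is a minimal illustration rather than a recommendation; the names `counts` and `increment` are invented for the example, and it relies on java.util.concurrent.atomic.AtomicReference retrying the update function when a concurrent write interferes.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// The whole immutable map is swapped atomically on every update
val counts = new AtomicReference(Map.empty[String, Int])

def increment(key: String): Unit = {
  // updateAndGet re-applies the function if another thread changed the reference
  counts.updateAndGet(m => m.updated(key, m.getOrElse(key, 0) + 1))
  ()
}

val updates = (1 to 500).map(_ => Future(increment("requests")))
Await.result(Future.sequence(updates), 10.seconds)

val finalCount = counts.get()("requests")
```

Readers get a consistent immutable map from `counts.get()` at no extra cost, but every write contends on the single reference, which is why this approach suits low write rates better than TrieMap does.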
