Scala - Concurrent Collections
Key Insights
• Scala's concurrent collections provide thread-safe operations without explicit locking, using lock-free algorithms and compare-and-swap operations for better performance than synchronized alternatives
• TrieMap is Scala's primary concurrent collection, offering O(log n) operations with snapshot isolation that enables consistent iteration even during concurrent modifications
• Understanding the trade-offs between concurrent collections and synchronized wrappers is critical: concurrent collections excel in high-contention scenarios, but they have higher memory overhead and do not support all standard collection operations
Understanding Concurrent Collections in Scala
Scala provides concurrent collections in the scala.collection.concurrent package, designed for safe access from multiple threads without external synchronization. Unlike synchronized collections that use coarse-grained locking, concurrent collections employ lock-free algorithms that allow multiple threads to operate simultaneously.
The primary concurrent collection in Scala is TrieMap, a hash array mapped trie (HAMT) that supports lock-free concurrent operations. Here’s a basic example:
import scala.collection.concurrent.TrieMap
val cache = TrieMap[String, Int]()
// Thread-safe operations
cache.put("key1", 100)
cache.putIfAbsent("key2", 200)
cache.get("key1") // Some(100)
TrieMap: Architecture and Performance
TrieMap uses a tree structure whose nodes are updated atomically using compare-and-swap (CAS) operations. This enables non-blocking reads and writes that scale well with the number of processors, since threads never contend on a global lock.
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, ExecutionContext}
import ExecutionContext.Implicits.global
val metrics = TrieMap[String, Long]()
def incrementCounter(key: String): Unit = {
  metrics.get(key) match {
    case Some(value) =>
      // CAS-style update: replace succeeds only if the value is unchanged
      if (!metrics.replace(key, value, value + 1)) {
        incrementCounter(key) // Retry on contention
      }
    case None =>
      // If another thread inserted the key first, retry so this increment isn't lost
      if (metrics.putIfAbsent(key, 1L).isDefined) {
        incrementCounter(key)
      }
  }
}
// Concurrent updates from multiple threads
val futures = (1 to 1000).map { i =>
  Future {
    incrementCounter("requests")
    incrementCounter(s"user_${i % 10}")
  }
}
Future.sequence(futures).foreach { _ =>
  println(s"Total requests: ${metrics("requests")}")
  println(s"Total keys: ${metrics.size}")
}
Snapshot Isolation and Consistent Iteration
One of TrieMap’s most powerful features is snapshot isolation. When you create a snapshot, you get a consistent view of the collection at that point in time, unaffected by subsequent modifications.
import scala.collection.concurrent.TrieMap
val activeConnections = TrieMap[String, Long]()
// Simulate connections being added
(1 to 100).foreach { i =>
  activeConnections.put(s"conn_$i", System.currentTimeMillis())
}
// Create a snapshot for reporting
val snapshot = activeConnections.snapshot()
// Continue modifying the original
activeConnections.put("conn_101", System.currentTimeMillis())
activeConnections.remove("conn_1")
// Snapshot remains unchanged
println(s"Original size: ${activeConnections.size}") // 100
println(s"Snapshot size: ${snapshot.size}") // 100
println(s"Snapshot has conn_1: ${snapshot.contains("conn_1")}") // true
Snapshots are created in constant time using a lazy copy-on-write mechanism: structural nodes are copied only when the original or the snapshot is subsequently modified. This makes snapshots cheap even for large maps and well suited to read-heavy workloads that need consistent views.
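When a snapshot will only ever be read, TrieMap also offers readOnlySnapshot(), which is cheaper than snapshot() because the snapshot side needs no further copy-on-write bookkeeping. A minimal sketch (map contents are illustrative):

```scala
import scala.collection.concurrent.TrieMap

val live = TrieMap[String, Int]("a" -> 1, "b" -> 2)

// readOnlySnapshot() returns an immutable view frozen at this instant
val frozen = live.readOnlySnapshot()

// Mutate the live map after taking the snapshot
live.put("c", 3)
live.remove("a")

println(frozen.size)          // 2 — unaffected by later changes
println(frozen.contains("a")) // true
println(live.size)            // 2 ("b" and "c")
```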
Atomic Operations and Conditional Updates
TrieMap provides several atomic operations that are essential for building thread-safe algorithms without explicit locking:
import scala.collection.concurrent.TrieMap
case class UserSession(userId: String, loginTime: Long, requestCount: Int)
val sessions = TrieMap[String, UserSession]()
def recordRequest(userId: String): Unit = {
  sessions.get(userId) match {
    case Some(session) =>
      val updated = session.copy(requestCount = session.requestCount + 1)
      // Only update if the value hasn't changed since we read it
      if (!sessions.replace(userId, session, updated)) {
        recordRequest(userId) // Retry
      }
    case None =>
      val newSession = UserSession(userId, System.currentTimeMillis(), 1)
      // If another thread won the race to insert, retry so this request is still counted
      if (sessions.putIfAbsent(userId, newSession).isDefined) {
        recordRequest(userId)
      }
  }
}

def getOrCreateSession(userId: String): UserSession = {
  // On concurrent.Map, getOrElseUpdate is thread-safe (built on putIfAbsent);
  // the default may be computed and discarded if another thread wins the race
  sessions.getOrElseUpdate(userId, {
    UserSession(userId, System.currentTimeMillis(), 0)
  })
}

// Conditional removal: succeeds only if the stored value still equals expectedSession
def expireSession(userId: String, expectedSession: UserSession): Boolean = {
  sessions.remove(userId, expectedSession)
}
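Conditional removal is what keeps expiry from racing with concurrent updates: if another thread modified the session after we read it, the removal fails and we can re-read. A self-contained sketch of that behavior (the case class and map are redefined here; values are illustrative):

```scala
import scala.collection.concurrent.TrieMap

case class UserSession(userId: String, loginTime: Long, requestCount: Int)

val sessions = TrieMap[String, UserSession]()
val original = UserSession("alice", 0L, 0)
sessions.put("alice", original)

// Simulate another thread bumping the request count before we try to expire
sessions.replace("alice", original, original.copy(requestCount = 1))

// remove(key, expectedValue) fails: the stored session no longer matches
val removedStale = sessions.remove("alice", original)
println(removedStale) // false — the session was updated concurrently

// Re-read and try again with the current value
val current = sessions("alice")
val removed = sessions.remove("alice", current)
println(removed) // true
```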
Building a Thread-Safe Cache
Here’s a practical implementation of a concurrent LRU-like cache using TrieMap:
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
case class CacheEntry[V](value: V, timestamp: Long, accessCount: Int)
class ConcurrentCache[K, V](maxSize: Int, ttl: Duration) {
  private val cache = TrieMap[K, CacheEntry[V]]()

  def get(key: K): Option[V] = {
    cache.get(key).flatMap { entry =>
      val now = System.currentTimeMillis()
      if (now - entry.timestamp > ttl.toMillis) {
        // Conditional removal: only evict the exact entry we observed as expired
        cache.remove(key, entry)
        None
      } else {
        // Best-effort access-count bump; a lost race just undercounts slightly
        val updated = entry.copy(accessCount = entry.accessCount + 1)
        cache.replace(key, entry, updated)
        Some(entry.value)
      }
    }
  }

  def put(key: K, value: V): Unit = {
    // Check-then-act race: concurrent puts may briefly exceed maxSize,
    // which is acceptable for a soft bound
    if (cache.size >= maxSize) {
      evictLeastUsed()
    }
    val entry = CacheEntry(value, System.currentTimeMillis(), 0)
    cache.put(key, entry)
  }

  private def evictLeastUsed(): Unit = {
    // Iterate over a consistent snapshot while the live map keeps changing
    val snapshot = cache.snapshot()
    if (snapshot.nonEmpty) {
      val (keyToRemove, _) = snapshot.minBy(_._2.accessCount)
      cache.remove(keyToRemove)
    }
  }

  def size: Int = cache.size
  def clear(): Unit = cache.clear()
}
// Usage
val userCache = new ConcurrentCache[String, String](1000, 5.minutes)
userCache.put("user123", "John Doe")
userCache.get("user123") // Some("John Doe")
Performance Considerations and Trade-offs
Concurrent collections have different performance characteristics than their synchronized counterparts:
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
// Concurrent collection: fine-grained, lock-free updates
val concurrentMap = TrieMap[String, Int]()
// Plain mutable map guarded by explicit synchronized blocks: simpler but coarser locking
val synchronizedMap = mutable.HashMap[String, Int]()
// Benchmark helper (rough wall-clock timing, not a rigorous microbenchmark)
def benchmark(name: String)(op: => Unit): Unit = {
  val start = System.nanoTime()
  op
  val duration = (System.nanoTime() - start) / 1000000
  println(s"$name: ${duration}ms")
}
// High contention scenario
import scala.concurrent.{Future, Await, ExecutionContext}
import ExecutionContext.Implicits.global
import scala.concurrent.duration._
benchmark("TrieMap concurrent writes") {
  val futures = (1 to 10000).map { i =>
    Future { concurrentMap.put(s"key_$i", i) }
  }
  Await.result(Future.sequence(futures), 10.seconds)
}
benchmark("Synchronized map concurrent writes") {
  val futures = (1 to 10000).map { i =>
    Future {
      synchronizedMap.synchronized {
        synchronizedMap.put(s"key_$i", i)
      }
    }
  }
  Await.result(Future.sequence(futures), 10.seconds)
}
Integration with Futures and Parallel Collections
Concurrent collections integrate naturally with Scala’s concurrency primitives:
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, ExecutionContext}
import ExecutionContext.Implicits.global
case class ProcessingResult(id: String, status: String, duration: Long)
val results = TrieMap[String, ProcessingResult]()
def processItems(items: Seq[String]): Future[Unit] = {
  val futures = items.map { item =>
    Future {
      val start = System.currentTimeMillis()
      // Simulate processing
      Thread.sleep(100)
      val duration = System.currentTimeMillis() - start
      results.put(item, ProcessingResult(item, "completed", duration))
    }
  }
  Future.sequence(futures).map(_ => ())
}
// Process and aggregate
val items = (1 to 50).map(i => s"item_$i")
processItems(items).foreach { _ =>
  val snapshot = results.snapshot()
  val avgDuration = snapshot.values.map(_.duration).sum / snapshot.size
  println(s"Processed ${snapshot.size} items, avg duration: ${avgDuration}ms")
}
When to Use Concurrent Collections
Use concurrent collections when:
- Multiple threads frequently read and write the same collection
- You need lock-free operations for better scalability
- Snapshot isolation is valuable for consistent iteration
- Performance under high contention is critical
Avoid them when:
- Single-threaded access is the norm (use regular mutable collections)
- You need the full range of collection operations (concurrent collections have limited APIs)
- Memory overhead is a concern (TrieMap uses more memory than HashMap)
Concurrent collections are specialized tools. For simple thread-safety needs, synchronized wrappers or immutable collections with atomic references often suffice. For high-performance concurrent scenarios, TrieMap and its atomic operations provide the necessary guarantees without sacrificing throughput.
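The "immutable collection behind an atomic reference" alternative mentioned above can be sketched as follows. This is a minimal illustration of the pattern, not a library API; every update copies the map, so it suits small maps or low write rates:

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// An immutable Map swapped atomically via CAS
val state = new AtomicReference(Map.empty[String, Int])

@tailrec
def putAtomic(key: String, value: Int): Unit = {
  val current = state.get()
  val updated = current.updated(key, value)
  // compareAndSet fails if another thread swapped the map first; retry
  if (!state.compareAndSet(current, updated)) putAtomic(key, value)
}

putAtomic("a", 1)
putAtomic("b", 2)
println(state.get()) // Map(a -> 1, b -> 2)
```

Readers always see a fully consistent map via state.get(), which plays the same role as a TrieMap snapshot, at the cost of copying on every write.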