Scala - Futures and Promises (Concurrency)
Futures in Scala provide a clean abstraction for asynchronous computation. A Future represents a value that may not yet be available, allowing you to write non-blocking code without callback hell.
Key Insights
- Futures represent values that will be available asynchronously, while Promises are writable containers that complete exactly one Future—understanding this producer-consumer relationship is fundamental to Scala concurrency
- Proper error handling with recover, recoverWith, and fallbackTo prevents silent failures and enables robust async pipelines, while combinators like flatMap and sequence compose complex concurrent workflows
- ExecutionContext configuration directly impacts performance: the global context suits non-blocking, CPU-light work, but blocking I/O and CPU-intensive operations need dedicated thread pools to prevent resource starvation
Understanding Futures and Promises
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}
// Simple Future creation
val futureResult: Future[Int] = Future {
  Thread.sleep(1000)
  42
}

// Non-blocking callback
futureResult.onComplete {
  case Success(value) => println(s"Result: $value")
  case Failure(exception) => println(s"Failed: ${exception.getMessage}")
}
The ExecutionContext determines where the Future executes. The global context is backed by a ForkJoinPool whose parallelism defaults to the number of available processors; it suits non-blocking work well but is easily starved by blocking calls.
Promise: The Writable Side of Future
A Promise is a writable, single-assignment container that completes a Future. While Futures are read-only views of async computations, Promises let you manually control completion.
import scala.concurrent.Promise
def fetchUserData(userId: String): Future[User] = {
  val promise = Promise[User]()
  // Simulate async operation
  Future {
    try {
      val user = database.findUser(userId)
      promise.success(user)
    } catch {
      case e: Exception => promise.failure(e)
    }
  }
  promise.future
}
// Alternative: racing another Future against a timeout
import java.util.concurrent.TimeoutException

def fetchWithTimeout(userId: String): Future[User] = {
  val promise = Promise[User]()
  val dataFuture = Future(database.findUser(userId))
  val timeoutFuture = Future {
    Thread.sleep(5000)
    throw new TimeoutException("Request timed out")
  }
  // tryComplete ignores later completions: whichever finishes first wins
  dataFuture.onComplete(promise.tryComplete)
  timeoutFuture.onComplete(promise.tryComplete)
  promise.future
}
Promises are particularly useful when integrating with callback-based APIs or implementing custom async patterns.
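For example, a callback-style API can be adapted to return a Future with a one-line Promise bridge. A minimal sketch; `LegacyClient` and its `fetch` signature are hypothetical stand-ins for whatever callback API you are wrapping:

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical callback-based API (names are illustrative)
trait LegacyClient {
  def fetch(key: String, onSuccess: String => Unit, onError: Throwable => Unit): Unit
}

// Adapt the callback API to a Future via a Promise
def fetchAsync(client: LegacyClient, key: String): Future[String] = {
  val promise = Promise[String]()
  client.fetch(key, value => promise.success(value), error => promise.failure(error))
  promise.future
}
```

Callers now compose `fetchAsync` with `map`, `flatMap`, and the other combinators below, while the callback plumbing stays hidden behind the Promise.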
Composing Futures with Combinators
Futures shine when composed using functional combinators. Avoid blocking with Await.result—use transformations instead.
case class User(id: String, name: String)
case class Order(userId: String, amount: Double)
case class Invoice(user: User, orders: List[Order], total: Double)

def getUser(id: String): Future[User] = Future {
  User(id, "John Doe")
}

def getOrders(userId: String): Future[List[Order]] = Future {
  List(Order(userId, 100.0), Order(userId, 250.0))
}

// Sequential composition with flatMap
def getUserInvoice(userId: String): Future[Invoice] = {
  getUser(userId).flatMap { user =>
    getOrders(user.id).map { orders =>
      val total = orders.map(_.amount).sum
      Invoice(user, orders, total)
    }
  }
}

// For-comprehension syntax (desugars to flatMap/map)
def getUserInvoiceClean(userId: String): Future[Invoice] = for {
  user <- getUser(userId)
  orders <- getOrders(user.id)
  total = orders.map(_.amount).sum
} yield Invoice(user, orders, total)
The key difference: map transforms the value inside a Future, while flatMap chains a function that itself returns a Future, avoiding a nested Future[Future[...]].
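A minimal sketch of the distinction, using a small helper function:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def double(n: Int): Future[Int] = Future(n * 2)

// map with a Future-returning function yields a nested type
val nested: Future[Future[Int]] = Future(21).map(double)

// flatMap flattens the result into a single Future
val flat: Future[Int] = Future(21).flatMap(double)
```

The nested version forces callers to unwrap two layers; flatMap keeps the pipeline flat, which is exactly what the for-comprehension above relies on.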
Parallel Execution with Future.sequence
When you need to execute multiple independent Futures concurrently, use Future.sequence or Future.traverse.
import scala.concurrent.Future
def fetchMultipleUsers(userIds: List[String]): Future[List[User]] = {
  val futures: List[Future[User]] = userIds.map(id => getUser(id))
  Future.sequence(futures)
}

// traverse combines map and sequence
def fetchMultipleUsersTraverse(userIds: List[String]): Future[List[User]] = {
  Future.traverse(userIds)(id => getUser(id))
}

// Handling partial failures
def fetchUsersWithFallback(userIds: List[String]): Future[List[Option[User]]] = {
  val futures = userIds.map { id =>
    getUser(id).map(Some(_)).recover { case _ => None }
  }
  Future.sequence(futures)
}
Both approaches execute the Futures concurrently (up to thread pool limits); the difference is that sequence takes a collection of already-created Futures, while traverse applies the Future-returning function as it walks the collection, fusing the map and sequence steps.
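One subtlety worth demonstrating: Future.sequence is fail-fast. If any element fails, the aggregated Future fails with that exception, which is why the fallback pattern above converts failures to None first. A minimal sketch:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

val mixed: List[Future[Int]] =
  List(Future.successful(1),
       Future.failed(new RuntimeException("boom")),
       Future.successful(3))

// The aggregate fails even though two of the three elements succeeded
val outcome: Try[List[Int]] = Try(Await.result(Future.sequence(mixed), 2.seconds))
```

If you need the successes alongside the failures, recover each element (as in fetchUsersWithFallback) before sequencing.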
Error Handling Strategies
Futures fail when exceptions occur. Handle errors explicitly to prevent silent failures.
def fetchDataWithRecovery(id: String): Future[String] = {
  getUser(id)
    .map(_.name)
    .recover {
      case _: NoSuchElementException => "Unknown User"
      case _: TimeoutException => "Service Unavailable"
    }
}

// recoverWith for async recovery
def fetchWithFallback(id: String): Future[User] = {
  getUser(id).recoverWith {
    case _: DatabaseException =>
      // Fallback to cache
      getCachedUser(id)
  }
}

// fallbackTo tries alternative Future on failure
def fetchReliable(id: String): Future[User] = {
  getPrimaryDatabase(id).fallbackTo(getSecondaryDatabase(id))
}

// Transform both success and failure
def fetchAndLog(id: String): Future[User] = {
  getUser(id).transform {
    case Success(user) =>
      logger.info(s"Fetched user: ${user.id}")
      Success(user)
    case Failure(ex) =>
      logger.error(s"Failed to fetch user: $id", ex)
      Failure(ex)
  }
}
The transform method is particularly powerful for side effects like logging without altering the Future’s outcome.
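A related combinator is andThen, which exists specifically for side effects: it runs a partial function on the completed Try and returns a Future carrying the original result unchanged. A small sketch (println stands in for a real logger):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

def lengthOf(id: String): Future[Int] =
  Future(id.length).andThen {
    case Success(n)  => println(s"computed length $n")  // side effect only
    case Failure(ex) => println(s"lookup failed: $ex")
  } // the original value flows through unchanged
```

Unlike transform, andThen cannot alter the outcome even by accident, which makes intent clearer when all you want is logging or metrics.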
Custom ExecutionContext Configuration
The default global ExecutionContext isn’t always appropriate. CPU-bound tasks can starve I/O operations, and blocking calls can exhaust thread pools.
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
// Dedicated thread pool for blocking I/O
val ioExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(32))

// CPU-bound operations
val cpuExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutor(
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
  )

def performBlockingIO(): Future[String] = Future {
  // Blocking database call
  database.blockingQuery()
}(ioExecutionContext)

def performComputation(data: List[Int]): Future[Int] = Future {
  // CPU-intensive calculation
  data.foldLeft(0)(_ + _)
}(cpuExecutionContext)
For production systems, use monitored thread pools that expose metrics and prevent unbounded growth.
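A first step toward observable pools is naming their threads so they stand out in thread dumps and monitoring output. A sketch using only java.util.concurrent; the pool size and prefix are illustrative:

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

// ThreadFactory that gives every thread a recognizable name
def namedThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
  private val counter = new AtomicInteger(0)
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
    t.setDaemon(true) // don't keep the JVM alive on shutdown
    t
  }
}

val blockingIoContext: ExecutionContext =
  ExecutionContext.fromExecutor(
    Executors.newFixedThreadPool(32, namedThreadFactory("blocking-io")))
```

With named threads, a stuck pool shows up immediately in a thread dump as a block of `blocking-io-*` entries rather than anonymous `pool-1-thread-*` noise.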
Real-World Pattern: Concurrent API Aggregation
Here’s a practical example combining multiple techniques to aggregate data from different services.
case class UserProfile(user: User, orders: List[Order], preferences: Preferences)

def aggregateUserProfile(userId: String): Future[UserProfile] = {
  // Start all three calls before the for-comprehension so they run concurrently
  val userFuture = getUser(userId)
  val ordersFuture = getOrders(userId)
  val prefsFuture = getPreferences(userId)
  for {
    user <- userFuture
    orders <- ordersFuture.recover { case _ => List.empty }
    prefs <- prefsFuture.fallbackTo(Future.successful(Preferences.default))
  } yield UserProfile(user, orders, prefs)
}

// With timeout handling
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

def aggregateWithTimeout(userId: String): Future[UserProfile] = {
  val result = aggregateUserProfile(userId)
  val timeout = Future {
    Thread.sleep(3000)
    throw new TimeoutException("Profile aggregation timed out")
  }
  Future.firstCompletedOf(Seq(result, timeout))
}
This pattern executes three API calls concurrently, handles partial failures gracefully, and enforces timeouts—essential for responsive microservices.
Blocking and Awaiting Results
Sometimes you must block, typically at application boundaries. Use Await sparingly and always specify timeouts.
import scala.concurrent.Await
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

// Only at application edges
def main(args: Array[String]): Unit = {
  val result = getUserInvoice("user123")
  try {
    val invoice = Await.result(result, 10.seconds)
    println(s"Invoice total: ${invoice.total}")
  } catch {
    case _: TimeoutException =>
      println("Request timed out")
    case e: Exception =>
      println(s"Error: ${e.getMessage}")
  }
}
Inside async workflows, never use Await—it defeats the purpose of non-blocking code and can cause deadlocks in fixed-size thread pools.
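If a blocking call truly must run inside a Future on the global context, wrapping it in scala.concurrent.blocking tells the underlying ForkJoinPool that the thread may stall, so the pool can spawn a compensating thread instead of starving. A sketch, with Thread.sleep standing in for a real blocking call:

```scala
import scala.concurrent.{blocking, Future}
import scala.concurrent.ExecutionContext.Implicits.global

def slowLookup(id: String): Future[String] = Future {
  blocking {
    // Hint to the pool that this section may block
    Thread.sleep(100) // stand-in for a blocking I/O call
    s"result-$id"
  }
}
```

This mitigates starvation on the global context; for sustained blocking workloads, a dedicated pool like the one configured earlier remains the better choice.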