# Rust Crossbeam: Lock-Free Concurrent Tools
## Key Insights
- Crossbeam provides lock-free data structures and channels that significantly outperform standard library primitives in high-contention scenarios, often by 2-10x depending on workload patterns.
- Epoch-based memory reclamation solves the fundamental challenge of safely freeing memory in lock-free structures without garbage collection, enabling custom concurrent data structures in Rust.
- Scoped threads eliminate the need for `Arc` when sharing stack data across threads, resulting in cleaner code and better performance for structured concurrent workloads.
## Introduction to Lock-Free Programming
Traditional mutex-based concurrency works well until it doesn’t. Under high contention, threads spend more time waiting for locks than doing actual work. Lock-free programming sidesteps this by using atomic operations to coordinate access, allowing threads to make progress independently.
The tradeoff is complexity. Lock-free algorithms are notoriously difficult to implement correctly. Memory ordering, the ABA problem, and safe memory reclamation create a minefield for developers.
Crossbeam abstracts these challenges into battle-tested primitives. It’s the de facto standard for high-performance concurrency in Rust, providing channels, queues, and memory management tools that would take months to implement correctly from scratch.
```toml
[dependencies]
crossbeam = "0.8"
crossbeam-channel = "0.5"
crossbeam-epoch = "0.9"
crossbeam-utils = "0.8"
```
## Crossbeam Channels: Beyond std::sync::mpsc

The standard library's `mpsc` channel has limitations: a single consumer only, no `select!` macro, and performance that degrades under contention. Crossbeam channels fix all of these.
Bounded channels provide backpressure—when the buffer fills, senders block. This prevents fast producers from overwhelming slow consumers and consuming unbounded memory. Unbounded channels buffer indefinitely, useful when you can’t afford to block producers.
The `select!` macro multiplexes across multiple channels, similar to Go's `select` statement. This enables complex coordination patterns without spawning additional threads.
```rust
use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::thread;
use std::time::Duration;

fn worker_pool_with_backpressure() {
    // Bounded channels limit in-flight work to 100 items
    let (task_tx, task_rx): (Sender<u64>, Receiver<u64>) = bounded(100);
    let (result_tx, result_rx): (Sender<u64>, Receiver<u64>) = bounded(100);

    // Spawn worker threads
    let workers: Vec<_> = (0..4)
        .map(|id| {
            let task_rx = task_rx.clone();
            let result_tx = result_tx.clone();
            thread::spawn(move || {
                for task in task_rx {
                    // Simulate processing
                    let result = task * 2;
                    thread::sleep(Duration::from_micros(100));
                    if result_tx.send(result).is_err() {
                        break;
                    }
                }
                println!("Worker {} shutting down", id);
            })
        })
        .collect();

    drop(task_rx);   // Workers own the receiver now
    drop(result_tx); // Main thread only reads results

    // Producer with timeout handling
    thread::spawn(move || {
        for i in 0..1000 {
            select! {
                send(task_tx, i) -> res => {
                    if res.is_err() { break; }
                }
                default(Duration::from_secs(1)) => {
                    println!("Send timeout - workers overwhelmed");
                    break;
                }
            }
        }
    });

    // Collect results
    let results: Vec<u64> = result_rx.iter().collect();
    println!("Processed {} results", results.len());

    for worker in workers {
        let _ = worker.join();
    }
}
```
## Lock-Free Data Structures: ArrayQueue and SegQueue
Crossbeam provides two concurrent queue implementations optimized for different scenarios.
ArrayQueue is a fixed-capacity, lock-free queue backed by a contiguous array. It’s cache-friendly and has predictable memory usage, making it ideal for bounded work queues where you know the maximum size upfront.
SegQueue is an unbounded, lock-free queue that grows dynamically. It uses linked segments internally, trading some cache locality for flexibility. Use it when you can’t predict queue depth.
```rust
use crossbeam_deque::{Injector, Stealer, Worker};
use std::sync::Arc;
use std::thread;

fn work_stealing_pool() {
    // Global queue for new tasks
    let injector: Arc<Injector<u64>> = Arc::new(Injector::new());

    // Per-worker local queues
    let workers: Vec<Worker<u64>> = (0..4).map(|_| Worker::new_fifo()).collect();
    let stealers: Vec<Stealer<u64>> = workers.iter().map(|w| w.stealer()).collect();

    // Inject initial work
    for i in 0..1000 {
        injector.push(i);
    }

    let stealers = Arc::new(stealers);
    let handles: Vec<_> = workers
        .into_iter()
        .enumerate()
        .map(|(id, worker)| {
            let injector = Arc::clone(&injector);
            let stealers = Arc::clone(&stealers);
            thread::spawn(move || {
                let mut processed = 0u64;
                loop {
                    // Try the local queue first
                    let task = worker
                        .pop()
                        .or_else(|| {
                            // Then steal a batch from the global queue
                            std::iter::repeat_with(|| injector.steal_batch_and_pop(&worker))
                                .find(|s| !s.is_retry())
                                .and_then(|s| s.success())
                        })
                        .or_else(|| {
                            // Finally, steal from other workers
                            stealers
                                .iter()
                                .filter(|s| !s.is_empty())
                                .map(|s| s.steal())
                                .find(|s| !s.is_retry())
                                .and_then(|s| s.success())
                        });

                    match task {
                        Some(t) => processed += t,
                        None if injector.is_empty() => break,
                        None => thread::yield_now(),
                    }
                }
                println!("Worker {} processed sum: {}", id, processed);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}
```
## Epoch-Based Memory Reclamation
Lock-free data structures face a fundamental problem: when can you safely free removed nodes? Another thread might still hold a pointer to that memory. Traditional garbage collectors solve this, but Rust doesn’t have one.
Crossbeam uses epoch-based reclamation. The idea: divide time into epochs. Threads announce when they’re accessing shared data by “pinning” the current epoch. Memory is only freed when all threads have moved past the epoch where it was removed.
```rust
use crossbeam_epoch::{self as epoch, Atomic, Owned};
use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

struct Node<T> {
    // ManuallyDrop prevents a double drop: pop() moves the data out,
    // then separately defers destruction of the node itself.
    data: ManuallyDrop<T>,
    next: Atomic<Node<T>>,
}

pub struct LockFreeStack<T> {
    head: Atomic<Node<T>>,
}

impl<T> LockFreeStack<T> {
    pub fn new() -> Self {
        LockFreeStack {
            head: Atomic::null(),
        }
    }

    pub fn push(&self, data: T) {
        let mut node = Owned::new(Node {
            data: ManuallyDrop::new(data),
            next: Atomic::null(),
        });
        let guard = epoch::pin();
        loop {
            let head = self.head.load(Acquire, &guard);
            node.next.store(head, Relaxed);
            match self
                .head
                .compare_exchange(head, node, Release, Relaxed, &guard)
            {
                Ok(_) => break,
                Err(e) => node = e.new,
            }
        }
    }

    pub fn pop(&self) -> Option<T> {
        let guard = epoch::pin();
        loop {
            let head = self.head.load(Acquire, &guard);
            let head_ref = unsafe { head.as_ref() }?;
            let next = head_ref.next.load(Relaxed, &guard);
            if self
                .head
                .compare_exchange(head, next, Release, Relaxed, &guard)
                .is_ok()
            {
                unsafe {
                    // Move the data out first; ManuallyDrop ensures the
                    // deferred node destruction won't drop it a second time
                    let data = ptr::read(&head_ref.data);
                    // Safe to defer destruction - the epoch system tracks readers
                    guard.defer_destroy(head);
                    return Some(ManuallyDrop::into_inner(data));
                }
            }
        }
    }
}

impl<T> Drop for LockFreeStack<T> {
    fn drop(&mut self) {
        while self.pop().is_some() {}
    }
}
```
## Scoped Threads and Structured Concurrency
Standard `thread::spawn` requires `'static` bounds because the spawned thread might outlive the caller. This forces you to wrap shared data in `Arc`, even when you know the threads will join before the data goes out of scope.
Scoped threads solve this with a lifetime guarantee: all spawned threads must complete before the scope exits. This lets you borrow stack data directly.
```rust
use crossbeam_utils::thread;

fn parallel_array_processing() {
    let mut data = vec![1u64; 1_000_000];
    let chunk_size = data.len() / 4;

    // No Arc needed - the scope guarantees threads complete before data is dropped
    thread::scope(|s| {
        for chunk in data.chunks_mut(chunk_size) {
            s.spawn(move |_| {
                for item in chunk.iter_mut() {
                    *item = item.wrapping_mul(7).wrapping_add(3);
                }
            });
        }
    })
    .unwrap();

    println!("First 5 results: {:?}", &data[..5]);
}
```
## Utilities: Parker, ShardedLock, and WaitGroup
Crossbeam includes utilities for common synchronization patterns.

`Parker` provides efficient thread parking and unparking, a low-level building block for custom blocking primitives.

`WaitGroup` coordinates groups of threads, similar to Go's `sync.WaitGroup`. It's cleaner than manually joining threads when you don't need return values.

`ShardedLock` is a drop-in `RwLock` alternative for read-heavy workloads: it splits the lock into multiple internal shards, so readers take only their own shard while writers must lock all of them, reducing contention on the read path.
```rust
use crossbeam_utils::sync::WaitGroup;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn phased_computation() {
    let shared_state = Arc::new(AtomicU64::new(0));

    for phase in 0..3 {
        let wg = WaitGroup::new();
        for worker_id in 0..4 {
            let wg = wg.clone();
            let state = Arc::clone(&shared_state);
            thread::spawn(move || {
                // Simulate phase work
                let contribution = (phase * 10 + worker_id) as u64;
                state.fetch_add(contribution, Ordering::Relaxed);
                println!("Phase {} Worker {} complete", phase, worker_id);
                drop(wg); // Signal completion
            });
        }
        wg.wait(); // Block until all workers finish this phase
        println!(
            "Phase {} complete. State: {}",
            phase,
            shared_state.load(Ordering::Relaxed)
        );
    }
}
```
## Benchmarks and When to Use Crossbeam
In my benchmarks, Crossbeam channels outperform std::sync::mpsc by 2-5x under contention. The gap widens with more producers and consumers.
```rust
use std::time::Instant;

fn benchmark_channels() {
    let iterations = 1_000_000;
    let producers = 4;
    let items_per_producer = iterations / producers;

    // Crossbeam bounded channel
    let start = Instant::now();
    {
        let (tx, rx) = crossbeam_channel::bounded::<u64>(1024);
        crossbeam_utils::thread::scope(|s| {
            for _ in 0..producers {
                let tx = tx.clone();
                s.spawn(move |_| {
                    for i in 0..items_per_producer {
                        tx.send(i as u64).unwrap();
                    }
                });
            }
            drop(tx);
            // Move rx into the consumer thread
            s.spawn(move |_| {
                let mut count = 0;
                while rx.recv().is_ok() {
                    count += 1;
                }
                assert_eq!(count, iterations);
            });
        })
        .unwrap();
    }
    let crossbeam_time = start.elapsed();

    // std::sync::mpsc
    let start = Instant::now();
    {
        let (tx, rx) = std::sync::mpsc::sync_channel::<u64>(1024);
        crossbeam_utils::thread::scope(|s| {
            for _ in 0..producers {
                let tx = tx.clone();
                s.spawn(move |_| {
                    for i in 0..items_per_producer {
                        tx.send(i as u64).unwrap();
                    }
                });
            }
            drop(tx);
            // std's Receiver is !Sync, so it must be moved, not borrowed
            s.spawn(move |_| {
                let mut count = 0;
                while rx.recv().is_ok() {
                    count += 1;
                }
                assert_eq!(count, iterations);
            });
        })
        .unwrap();
    }
    let std_time = start.elapsed();

    println!("Crossbeam: {:?}", crossbeam_time);
    println!("std::mpsc: {:?}", std_time);
    println!(
        "Speedup: {:.2}x",
        std_time.as_nanos() as f64 / crossbeam_time.as_nanos() as f64
    );
}
```
When to use Crossbeam:
- Multiple producers or consumers on channels
- High-contention scenarios with many threads
- Work-stealing patterns
- When you need `select!` across channels
- Scoped threads for cleaner borrowing
When standard library suffices:
- Single-producer, single-consumer with low volume
- Simple spawn-and-join patterns
- When minimizing dependencies matters more than performance
Common pitfalls:
- Don’t use unbounded channels without understanding memory implications
- Epoch pinning has overhead—don’t hold guards across long operations
- Work-stealing shines with many small tasks, not few large ones
Crossbeam is production-ready and underpins widely used crates such as Rayon, whose work-stealing scheduler is built on crossbeam-deque. If you're building concurrent Rust and haven't adopted it yet, you're leaving performance on the table.