Rust Crossbeam: Lock-Free Concurrent Tools

Traditional mutex-based concurrency works well until it doesn't. Under high contention, threads spend more time waiting for locks than doing actual work. Lock-free programming sidesteps this by using...

Key Insights

  • Crossbeam provides lock-free data structures and channels that significantly outperform standard library primitives in high-contention scenarios, often by 2-10x depending on workload patterns.
  • Epoch-based memory reclamation solves the fundamental challenge of safely freeing memory in lock-free structures without garbage collection, enabling custom concurrent data structures in Rust.
  • Scoped threads eliminate the need for Arc when sharing stack data across threads, resulting in cleaner code and better performance for structured concurrent workloads.

Introduction to Lock-Free Programming

Traditional mutex-based concurrency works well until it doesn’t. Under high contention, threads spend more time waiting for locks than doing actual work. Lock-free programming sidesteps this by using atomic operations to coordinate access, allowing threads to make progress independently.

The tradeoff is complexity. Lock-free algorithms are notoriously difficult to implement correctly. Memory ordering, the ABA problem, and safe memory reclamation create a minefield for developers.

Crossbeam abstracts these challenges into battle-tested primitives. It’s the de facto standard for high-performance concurrency in Rust, providing channels, queues, and memory management tools that would take months to implement correctly from scratch.

[dependencies]
crossbeam = "0.8"
crossbeam-channel = "0.5"
crossbeam-epoch = "0.9"
crossbeam-utils = "0.8"

Crossbeam Channels: Beyond std::sync::mpsc

The standard library’s mpsc channel has limitations: single consumer only, no select! macro, and performance that degrades under contention. Crossbeam channels fix all of these.

Bounded channels provide backpressure—when the buffer fills, senders block. This prevents fast producers from overwhelming slow consumers and consuming unbounded memory. Unbounded channels buffer indefinitely, useful when you can’t afford to block producers.

The select! macro multiplexes across multiple channels, similar to Go’s select statement. This enables complex coordination patterns without spawning additional threads.

use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::thread;
use std::time::Duration;

fn worker_pool_with_backpressure() {
    // Bounded channel limits in-flight work to 100 items
    let (task_tx, task_rx): (Sender<u64>, Receiver<u64>) = bounded(100);
    let (result_tx, result_rx): (Sender<u64>, Receiver<u64>) = bounded(100);
    
    // Spawn worker threads
    let workers: Vec<_> = (0..4)
        .map(|id| {
            let task_rx = task_rx.clone();
            let result_tx = result_tx.clone();
            thread::spawn(move || {
                for task in task_rx {
                    // Simulate processing
                    let result = task * 2;
                    thread::sleep(Duration::from_micros(100));
                    if result_tx.send(result).is_err() {
                        break;
                    }
                }
                println!("Worker {} shutting down", id);
            })
        })
        .collect();
    
    drop(task_rx); // Workers own the receiver now
    drop(result_tx); // Main thread only reads results
    
    // Producer with timeout handling
    thread::spawn(move || {
        for i in 0..1000 {
            select! {
                send(task_tx, i) -> res => {
                    if res.is_err() { break; }
                }
                default(Duration::from_secs(1)) => {
                    println!("Send timeout - workers overwhelmed");
                    break;
                }
            }
        }
    });
    
    // Collect results
    let results: Vec<u64> = result_rx.iter().collect();
    println!("Processed {} results", results.len());
    
    for worker in workers {
        let _ = worker.join();
    }
}

Lock-Free Data Structures: ArrayQueue and SegQueue

Crossbeam provides two concurrent queue implementations optimized for different scenarios.

ArrayQueue is a fixed-capacity, lock-free queue backed by a contiguous array. It’s cache-friendly and has predictable memory usage, making it ideal for bounded work queues where you know the maximum size upfront.

SegQueue is an unbounded, lock-free queue that grows dynamically. It uses linked segments internally, trading some cache locality for flexibility. Use it when you can’t predict queue depth.

use crossbeam_deque::{Injector, Stealer, Worker};
use std::sync::Arc;
use std::thread;

fn work_stealing_pool() {
    // Global queue for new tasks
    let injector: Arc<Injector<u64>> = Arc::new(Injector::new());
    
    // Per-worker local queues
    let workers: Vec<Worker<u64>> = (0..4).map(|_| Worker::new_fifo()).collect();
    let stealers: Vec<Stealer<u64>> = workers.iter().map(|w| w.stealer()).collect();
    
    // Inject initial work
    for i in 0..1000 {
        injector.push(i);
    }
    
    let stealers = Arc::new(stealers);
    let handles: Vec<_> = workers
        .into_iter()
        .enumerate()
        .map(|(id, worker)| {
            let injector = Arc::clone(&injector);
            let stealers = Arc::clone(&stealers);
            
            thread::spawn(move || {
                let mut processed = 0u64;
                loop {
                    // Try local queue first
                    let task = worker.pop().or_else(|| {
                        // Try stealing from global queue
                        std::iter::repeat_with(|| injector.steal_batch_and_pop(&worker))
                            .find(|s| !s.is_retry())
                            .and_then(|s| s.success())
                    }).or_else(|| {
                        // Steal from other workers
                        stealers.iter()
                            .filter(|s| !s.is_empty())
                            .map(|s| s.steal())
                            .find(|s| !s.is_retry())
                            .and_then(|s| s.success())
                    });
                    
                    match task {
                        Some(t) => {
                            processed += t;
                        }
                        None if injector.is_empty() => break,
                        None => thread::yield_now(),
                    }
                }
                println!("Worker {} processed sum: {}", id, processed);
            })
        })
        .collect();
    
    for h in handles {
        h.join().unwrap();
    }
}

Epoch-Based Memory Reclamation

Lock-free data structures face a fundamental problem: when can you safely free removed nodes? Another thread might still hold a pointer to that memory. Traditional garbage collectors solve this, but Rust doesn’t have one.

Crossbeam uses epoch-based reclamation. The idea: divide time into epochs. Threads announce when they’re accessing shared data by “pinning” the current epoch. Memory is only freed when all threads have moved past the epoch where it was removed.

use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};
use std::sync::atomic::Ordering::{Acquire, Release, Relaxed};

struct Node<T> {
    data: T,
    next: Atomic<Node<T>>,
}

pub struct LockFreeStack<T> {
    head: Atomic<Node<T>>,
}

impl<T> LockFreeStack<T> {
    pub fn new() -> Self {
        LockFreeStack {
            head: Atomic::null(),
        }
    }
    
    pub fn push(&self, data: T) {
        let mut node = Owned::new(Node {
            data,
            next: Atomic::null(),
        });
        
        let guard = epoch::pin();
        loop {
            let head = self.head.load(Acquire, &guard);
            node.next.store(head, Relaxed);
            
            match self.head.compare_exchange(
                head,
                node,
                Release,
                Relaxed,
                &guard,
            ) {
                Ok(_) => break,
                Err(e) => node = e.new,
            }
        }
    }
    
    pub fn pop(&self) -> Option<T> {
        let guard = epoch::pin();
        loop {
            let head = self.head.load(Acquire, &guard);
            let head_ref = unsafe { head.as_ref() }?;
            
            let next = head_ref.next.load(Relaxed, &guard);
            
            if self.head
                .compare_exchange(head, next, Release, Relaxed, &guard)
                .is_ok()
            {
                // Safe to defer destruction - epoch system tracks readers
                unsafe {
                    guard.defer_destroy(head);
                }
                // We need to extract data before the node is destroyed
                // This is safe because we own the node now
                return Some(unsafe { std::ptr::read(&head_ref.data) });
            }
        }
    }
}

impl<T> Drop for LockFreeStack<T> {
    fn drop(&mut self) {
        while self.pop().is_some() {}
    }
}

Scoped Threads and Structured Concurrency

Standard thread::spawn requires 'static bounds because the spawned thread might outlive the caller. This forces you to wrap shared data in Arc, even when you know the threads will join before the data goes out of scope.

Scoped threads solve this with a lifetime guarantee: all spawned threads must complete before the scope exits. This lets you borrow stack data directly.

use crossbeam_utils::thread;

fn parallel_array_processing() {
    let mut data = vec![1u64; 1_000_000];
    let chunk_size = data.len() / 4;
    
    // No Arc needed - scope guarantees threads complete before data is dropped
    thread::scope(|s| {
        for chunk in data.chunks_mut(chunk_size) {
            s.spawn(move |_| {
                for item in chunk.iter_mut() {
                    *item = item.wrapping_mul(7).wrapping_add(3);
                }
            });
        }
    }).unwrap();
    
    println!("First 5 results: {:?}", &data[..5]);
}

Utilities: Parker, ShardedLock, and WaitGroup

Crossbeam includes utilities for common synchronization patterns.

WaitGroup coordinates thread groups, similar to Go’s sync.WaitGroup. It’s cleaner than manually joining threads when you don’t need return values.

ShardedLock optimizes for read-heavy workloads by sharding the lock across CPU cores, reducing contention on the reader path.

use crossbeam_utils::sync::WaitGroup;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn phased_computation() {
    let shared_state = Arc::new(AtomicU64::new(0));
    
    for phase in 0..3 {
        let wg = WaitGroup::new();
        
        for worker_id in 0..4 {
            let wg = wg.clone();
            let state = Arc::clone(&shared_state);
            
            thread::spawn(move || {
                // Simulate phase work
                let contribution = (phase * 10 + worker_id) as u64;
                state.fetch_add(contribution, Ordering::Relaxed);
                
                println!("Phase {} Worker {} complete", phase, worker_id);
                drop(wg); // Signal completion
            });
        }
        
        wg.wait(); // Block until all workers finish this phase
        println!(
            "Phase {} complete. State: {}",
            phase,
            shared_state.load(Ordering::Relaxed)
        );
    }
}

Benchmarks and When to Use Crossbeam

In my benchmarks, Crossbeam channels outperform std::sync::mpsc by 2-5x under contention. The gap widens with more producers and consumers.

use std::time::Instant;

fn benchmark_channels() {
    let iterations = 1_000_000;
    let producers = 4;
    let items_per_producer = iterations / producers;
    
    // Crossbeam bounded channel
    let start = Instant::now();
    {
        let (tx, rx) = crossbeam_channel::bounded::<u64>(1024);
        
        crossbeam_utils::thread::scope(|s| {
            for _ in 0..producers {
                let tx = tx.clone();
                s.spawn(move |_| {
                    for i in 0..items_per_producer {
                        tx.send(i as u64).unwrap();
                    }
                });
            }
            drop(tx);
            
            s.spawn(|_| {
                let mut count = 0;
                while rx.recv().is_ok() {
                    count += 1;
                }
                assert_eq!(count, iterations);
            });
        }).unwrap();
    }
    let crossbeam_time = start.elapsed();
    
    // std::sync::mpsc
    let start = Instant::now();
    {
        let (tx, rx) = std::sync::mpsc::sync_channel::<u64>(1024);
        
        crossbeam_utils::thread::scope(|s| {
            for _ in 0..producers {
                let tx = tx.clone();
                s.spawn(move |_| {
                    for i in 0..items_per_producer {
                        tx.send(i as u64).unwrap();
                    }
                });
            }
            drop(tx);
            
            s.spawn(|_| {
                let mut count = 0;
                while rx.recv().is_ok() {
                    count += 1;
                }
                assert_eq!(count, iterations);
            });
        }).unwrap();
    }
    let std_time = start.elapsed();
    
    println!("Crossbeam: {:?}", crossbeam_time);
    println!("std::mpsc: {:?}", std_time);
    println!("Speedup: {:.2}x", std_time.as_nanos() as f64 / crossbeam_time.as_nanos() as f64);
}

When to use Crossbeam:

  • Multiple producers or consumers on channels
  • High-contention scenarios with many threads
  • Work-stealing patterns
  • When you need select! across channels
  • Scoped threads for cleaner borrowing

When standard library suffices:

  • Single-producer, single-consumer with low volume
  • Simple spawn-and-join patterns
  • When minimizing dependencies matters more than performance

Common pitfalls:

  • Don’t use unbounded channels without understanding memory implications
  • Epoch pinning has overhead—don’t hold guards across long operations
  • Work-stealing shines with many small tasks, not few large ones

Crossbeam is production-ready and powers critical infrastructure at companies processing millions of messages per second. If you’re building concurrent Rust and haven’t adopted it yet, you’re leaving performance on the table.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.