Channels: Message Passing Between Threads
Key Insights
- Channels enforce thread safety through ownership transfer at compile time, eliminating data races by design rather than by careful programming
- Bounded channels provide natural backpressure, preventing fast producers from overwhelming slow consumers and causing memory exhaustion
- The choice between channels and shared state isn’t ideological—channels excel at transferring ownership and coordinating work, while mutexes are better for shared access to a single resource
Introduction to Channel-Based Concurrency
“Don’t communicate by sharing memory; share memory by communicating.” This Go proverb captures a fundamental shift in how we think about concurrent programming. Instead of multiple threads fighting over shared data protected by locks, threads own their data exclusively and pass messages when coordination is needed.
Traditional shared-state concurrency with mutexes is error-prone. You forget to acquire a lock, you acquire locks in the wrong order, you hold a lock too long. These bugs are subtle, intermittent, and notoriously difficult to reproduce. Channels flip the model: data moves between threads through typed conduits, and the type system ensures you can’t accidentally access data that another thread owns.
Rust takes this further than most languages. Its ownership system means that when you send a value through a channel, you literally give up access to it. The compiler rejects code that tries to use data after sending it. This isn’t a runtime check or a convention—it’s a compile-time guarantee.
Channel Fundamentals
A channel consists of two halves: a sender and a receiver. The sender pushes values into the channel; the receiver pulls them out. In Rust’s standard library, these are Sender<T> and Receiver<T> from std::sync::mpsc (multiple producer, single consumer).
Channels can be bounded or unbounded. An unbounded channel accepts values indefinitely (until you run out of memory). A bounded channel has a fixed capacity—when full, senders block until space becomes available. This distinction matters enormously for system behavior under load.
Here’s the simplest possible channel usage:
use std::sync::mpsc;
use std::thread;

fn main() {
    // Create an unbounded channel
    let (sender, receiver) = mpsc::channel();

    // Spawn a thread that sends a message
    let handle = thread::spawn(move || {
        let data = vec![1, 2, 3, 4, 5];
        sender.send(data).unwrap();
        // data is now owned by the channel - we can't use it here
        println!("Data sent from worker thread");
    });

    // Receive the message in the main thread
    let received = receiver.recv().unwrap();
    println!("Received: {:?}", received);

    handle.join().unwrap();
}
Notice the move keyword on the closure. The sender is moved into the spawned thread, transferring ownership. After sender.send(data), the data vector belongs to whoever receives it. Try adding println!("{:?}", data) after the send—the compiler will reject it.
Synchronous vs Asynchronous Channels
The standard library’s mpsc::channel() creates an asynchronous (unbounded) channel. Sends never block; they just allocate more buffer space. This sounds convenient but can be dangerous—a fast producer can queue millions of messages while a slow consumer falls behind, exhausting memory.
mpsc::sync_channel(n) creates a bounded channel with capacity n. When the buffer is full, send() blocks until a receiver consumes a message. With n = 0, you get a rendezvous channel: every send blocks until a corresponding receive happens, providing tight synchronization.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Bounded channel with capacity 2
    let (sender, receiver) = mpsc::sync_channel(2);

    let producer = thread::spawn(move || {
        for i in 0..5 {
            println!("Sending {}", i);
            sender.send(i).unwrap();
            println!("Sent {}", i);
        }
    });

    let consumer = thread::spawn(move || {
        for _ in 0..5 {
            thread::sleep(Duration::from_millis(500));
            let val = receiver.recv().unwrap();
            println!("Received {}", val);
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}
Run this and watch the output. The producer sends 0 and 1 immediately (filling the buffer), then blocks on sending 2 until the consumer catches up. This backpressure propagates naturally through your system, preventing resource exhaustion.
For rendezvous semantics, use sync_channel(0):
let (sender, receiver) = mpsc::sync_channel(0);
// Now every send blocks until a receive happens
This is useful when you need confirmation that a message was received before proceeding.
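As a concrete sketch of that confirmation property: with capacity 0, the worker's send does not return until the main thread has actually taken the value, so any code after the send is a receipt guarantee.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Capacity 0: the channel holds nothing, so send and recv must meet
    let (sender, receiver) = mpsc::sync_channel(0);

    let handle = thread::spawn(move || {
        sender.send("handoff").unwrap(); // blocks until main calls recv()
        // Reaching this line proves the main thread has the message
        println!("worker: delivery confirmed");
    });

    let msg = receiver.recv().unwrap();
    println!("main: got {:?}", msg);
    handle.join().unwrap();
}
```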
Multiple Producers and Consumers
The “mpsc” in std::sync::mpsc stands for multiple producer, single consumer. You can clone senders to create multiple producers:
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();
    let mut handles = vec![];

    // Spawn 4 producer threads
    for id in 0..4 {
        let sender_clone = sender.clone();
        let handle = thread::spawn(move || {
            for i in 0..3 {
                sender_clone.send(format!("Worker {}: message {}", id, i)).unwrap();
            }
        });
        handles.push(handle);
    }

    // Drop the original sender so the channel closes when all clones are dropped
    drop(sender);

    // Single consumer processes all messages
    for message in receiver {
        println!("{}", message);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
The drop(sender) is crucial. A channel closes when all senders are dropped. The for message in receiver loop iterates until the channel closes. If you keep the original sender alive, the loop blocks forever waiting for more messages.
For multiple consumers, the standard library doesn’t help—you need a crate like crossbeam-channel, which provides multi-producer, multi-consumer (mpmc) channels:
use crossbeam_channel::unbounded;
use std::thread;

fn main() {
    let (sender, receiver) = unbounded();
    let mut handles = vec![];

    // Multiple producers
    for id in 0..2 {
        let s = sender.clone();
        handles.push(thread::spawn(move || {
            for i in 0..5 {
                s.send(format!("Producer {}: item {}", id, i)).unwrap();
            }
        }));
    }

    // Multiple consumers
    for id in 0..2 {
        let r = receiver.clone();
        handles.push(thread::spawn(move || {
            while let Ok(msg) = r.recv() {
                println!("Consumer {} got: {}", id, msg);
            }
        }));
    }

    drop(sender);
    drop(receiver);

    for handle in handles {
        handle.join().unwrap();
    }
}
Select and Multiplexing
Real systems often need to wait on multiple channels simultaneously—receiving from whichever has data first, or timing out if nothing arrives. The crossbeam-channel crate provides a select! macro for this:
use crossbeam_channel::{bounded, never, select};
use std::thread;
use std::time::Duration;

fn main() {
    let (urgent_send, mut urgent_recv) = bounded(10);
    let (normal_send, mut normal_recv) = bounded(10);

    // Producer for urgent messages
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        urgent_send.send("URGENT: Server on fire").unwrap();
    });

    // Producer for normal messages
    thread::spawn(move || {
        for i in 0..3 {
            thread::sleep(Duration::from_millis(50));
            normal_send.send(format!("Normal message {}", i)).unwrap();
        }
    });

    // Consumer that handles both with a timeout. A closed channel is always
    // ready in select!, so we replace it with never() rather than spinning
    // on Err forever and starving the timeout branch.
    let timeout = Duration::from_millis(500);
    loop {
        select! {
            recv(urgent_recv) -> msg => {
                match msg {
                    Ok(m) => println!("🚨 {}", m),
                    Err(_) => {
                        println!("Urgent channel closed");
                        urgent_recv = never();
                    }
                }
            }
            recv(normal_recv) -> msg => {
                match msg {
                    Ok(m) => println!("📬 {}", m),
                    Err(_) => {
                        println!("Normal channel closed");
                        normal_recv = never();
                    }
                }
            }
            default(timeout) => {
                println!("No messages for 500ms, shutting down");
                break;
            }
        }
    }
}
The select! macro blocks until one of the operations can proceed, then executes the corresponding branch. The default(timeout) branch fires if nothing happens within the specified duration. One subtlety: a closed channel always counts as ready (its receive completes immediately with an Err), so a select loop must stop polling closed channels, for example by swapping them for crossbeam_channel::never(), or it will spin on Err instead of ever reaching the timeout.
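If all you need is a timeout on a single std channel rather than full multiplexing, the standard library's Receiver::recv_timeout covers that without crossbeam. A minimal sketch:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        sender.send("late arrival").unwrap();
        // sender is dropped here, closing the channel
    });

    // Wait up to 200 ms per message; stop on hangup or timeout
    loop {
        match receiver.recv_timeout(Duration::from_millis(200)) {
            Ok(msg) => println!("got: {}", msg),
            Err(mpsc::RecvTimeoutError::Timeout) => {
                println!("no message within 200ms");
                break;
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                println!("sender gone");
                break;
            }
        }
    }
}
```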
Common Patterns and Pitfalls
The pipeline pattern chains processing stages, each running in its own thread:
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

fn stage<T, U, F>(input: Receiver<T>, output: Sender<U>, transform: F) -> thread::JoinHandle<()>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    thread::spawn(move || {
        for item in input {
            output.send(transform(item)).unwrap();
        }
    })
}

fn main() {
    let (input_send, input_recv) = channel();
    let (stage1_send, stage1_recv) = channel();
    let (stage2_send, stage2_recv) = channel();

    // Stage 1: Parse strings to numbers
    let h1 = stage(input_recv, stage1_send, |s: String| s.parse::<i32>().unwrap());

    // Stage 2: Double the numbers
    let h2 = stage(stage1_recv, stage2_send, |n| n * 2);

    // Stage 3: Print the results
    let h3 = thread::spawn(move || {
        for n in stage2_recv {
            println!("Result: {}", n);
        }
    });

    // Feed input
    for i in 1..=5 {
        input_send.send(i.to_string()).unwrap();
    }
    drop(input_send); // Close the pipeline

    h1.join().unwrap();
    h2.join().unwrap();
    h3.join().unwrap();
}
Deadlock warning: Channels can deadlock if threads wait on each other circularly. Thread A waits to send to B, B waits to send to C, C waits to send to A—everyone blocks forever. Design your data flow as a directed acyclic graph, or use timeouts and try_send/try_recv for non-blocking operations.
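A small sketch of those non-blocking variants: try_send and try_recv on a bounded std channel return immediately with an error instead of blocking, which lets a thread do other work rather than wait in a potential cycle.

```rust
use std::sync::mpsc::{self, TryRecvError, TrySendError};

fn main() {
    let (sender, receiver) = mpsc::sync_channel(1);

    sender.try_send(1).unwrap(); // buffer now full
    match sender.try_send(2) {
        // try_send hands the rejected value back instead of blocking
        Err(TrySendError::Full(v)) => println!("buffer full, kept {}", v),
        other => println!("unexpected: {:?}", other),
    }

    assert_eq!(receiver.try_recv(), Ok(1)); // drains the buffer
    match receiver.try_recv() {
        Err(TryRecvError::Empty) => println!("nothing waiting"),
        other => println!("unexpected: {:?}", other),
    }
}
```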
When to Use Channels vs Other Primitives
Channels excel when you’re transferring ownership of data between threads or coordinating sequential work. Use them for:
- Worker pools processing independent tasks
- Pipeline architectures with distinct stages
- Event distribution systems
- Request/response patterns between components
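The worker-pool case can be sketched with std alone. Since std's Receiver cannot be cloned, the workers share one receiver behind a mutex (crossbeam-channel's cloneable receivers make this cleaner, but the shape is the same):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel::<u32>();
    // std's Receiver is single-consumer, so the pool shares it behind a mutex
    let receiver = Arc::new(Mutex::new(receiver));

    let workers: Vec<_> = (0..3)
        .map(|id| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // Hold the lock only while taking a job; the guard is dropped
                // at the end of this statement, before the work happens
                let job = match receiver.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => break, // channel closed and drained
                };
                println!("worker {}: {} squared is {}", id, job, job * job);
            })
        })
        .collect();

    for job in 1..=9 {
        sender.send(job).unwrap();
    }
    drop(sender); // close the channel so the workers exit

    for w in workers {
        w.join().unwrap();
    }
}
```

Taking the job in its own let statement matters: a `while let Ok(job) = receiver.lock().unwrap().recv()` loop would hold the lock for the whole body, serializing the pool.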
Mutexes and atomics are better when multiple threads need to read or modify the same data structure in place. A shared configuration object, a cache, or a counter—these are naturally shared state. Wrapping them in channels adds complexity without benefit.
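For contrast, a minimal sketch of the shared-counter case, where a mutex is the natural fit and a channel would only add ceremony:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // A shared counter is shared state by nature: wrap it, don't ship it
    let counter = Arc::new(Mutex::new(0u64));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..1000 {
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    println!("final count: {}", counter.lock().unwrap()); // prints "final count: 4000"
}
```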
The actor model (as seen in Erlang or Akka) is channels taken to their logical conclusion: every piece of state lives in an actor that communicates only through messages. Rust channels give you the building blocks; you choose how far to take the pattern.
Start with channels when coordinating threads. Reach for mutexes when you genuinely need shared access. The ownership system will guide you—fighting the borrow checker often means you’ve chosen the wrong primitive for your problem.