Go Channel Patterns: Fan-In, Fan-Out, Pipeline
Key Insights
- Pipelines transform Go’s concurrency primitives into composable data processing stages, where each stage runs in its own goroutine and passes results through channels
- Fan-out distributes work across multiple goroutines for parallel processing, while fan-in merges results from multiple sources back into a single channel
- Proper channel closing, context cancellation, and goroutine lifecycle management are critical to prevent leaks and ensure clean shutdown in concurrent systems
Understanding Go’s Channel Patterns
Go’s concurrency model centers on the philosophy “don’t communicate by sharing memory; share memory by communicating.” Channels are the pipes that connect concurrent goroutines, and specific patterns have emerged for structuring complex concurrent operations. The pipeline, fan-out, and fan-in patterns form the foundation of scalable concurrent architectures in Go.
These patterns aren’t academic exercises—they solve real problems. When you need to process thousands of API requests, transform streaming data, or build a web scraper that respects rate limits while maximizing throughput, these patterns provide battle-tested solutions.
Let’s start with the basics:
package main

import "fmt"

func main() {
    // Create a channel
    messages := make(chan string)

    // Send in a goroutine
    go func() {
        messages <- "hello from goroutine"
    }()

    // Receive in main
    msg := <-messages
    fmt.Println(msg)
}
This simple example shows channel communication, but real applications need structured approaches to manage complexity.
Pipeline Pattern
A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. Each stage receives values from upstream, performs some operation, and sends values downstream.
Here’s a three-stage pipeline that generates numbers, squares them, and prints the results:
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func printer(in <-chan int) {
    for n := range in {
        fmt.Println(n)
    }
}

func main() {
    // Build the pipeline
    numbers := generator(1, 2, 3, 4, 5)
    squares := square(numbers)
    printer(squares)
}
Each stage closes its output channel when done, signaling downstream stages to finish. The range loop automatically exits when the channel closes.
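Because every stage both accepts and returns a receive-only channel, stages compose freely. A small self-contained sketch (reusing the same generator and square shapes as above) shows that chaining square twice yields fourth powers:

```go
package main

import "fmt"

// generator emits the given numbers on a channel, then closes it.
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// square reads numbers, squares them, and forwards the results.
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // Stages compose like ordinary functions: square(square(...))
    for n := range square(square(generator(2, 3))) {
        fmt.Println(n) // prints 16, then 81
    }
}
```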
Pipelines should support cancellation. Using context, we can stop processing early:
func generatorWithContext(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func squareWithContext(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case n, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out <- n * n:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}
Now you can cancel the entire pipeline by canceling the context, preventing goroutine leaks.
Fan-Out Pattern
Fan-out distributes work from a single channel across multiple worker goroutines. This pattern excels when you have CPU-intensive operations or I/O-bound tasks that can execute in parallel.
Here’s a worker pool that processes jobs concurrently:
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(time.Second) // Simulate work
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // Start 3 workers (fan-out)
    numWorkers := 3
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(w)
    }

    // Send jobs
    go func() {
        for j := 1; j <= 9; j++ {
            jobs <- j
        }
        close(jobs)
    }()

    // Close results when all workers finish
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    for result := range results {
        fmt.Println("Result:", result)
    }
}
The key insight: multiple goroutines read from the same jobs channel. Go’s runtime ensures each job is delivered to exactly one worker. The WaitGroup tracks when all workers finish, allowing us to close the results channel safely.
Fan-In Pattern
Fan-in is the inverse of fan-out: it multiplexes multiple input channels into a single output channel. This pattern merges results from parallel operations.
Here’s a basic fan-in function that merges two channels:
func fanIn(ch1, ch2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        // Loop until both inputs have been drained and nil'd out.
        // (Checking the condition here, rather than after the select,
        // matters: a `continue` would otherwise skip a bottom check,
        // leaving the goroutine blocked on two nil channels forever.)
        for ch1 != nil || ch2 != nil {
            select {
            case v, ok := <-ch1:
                if !ok {
                    ch1 = nil
                    continue
                }
                out <- v
            case v, ok := <-ch2:
                if !ok {
                    ch2 = nil
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}
Setting closed channels to nil prevents them from being selected again. The function returns when both inputs are exhausted.
For an arbitrary number of channels, use a more general approach:
func fanInMultiple(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    multiplex := func(ch <-chan int) {
        defer wg.Done()
        for v := range ch {
            out <- v
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go multiplex(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
Each input channel gets its own goroutine that forwards values to the output. When all inputs close, the output closes.
Combining Patterns: Real-World Example
Let’s build a log processor that reads log files, parses entries in parallel, and aggregates results. This combines all three patterns:
type LogEntry struct {
    Level   string
    Message string
}

// Pipeline stage 1: Generate file paths
func fileGenerator(ctx context.Context, files []string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for _, file := range files {
            select {
            case out <- file:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// Pipeline stage 2: Parse logs (fan-out)
func logParser(ctx context.Context, id int, files <-chan string) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        for file := range files {
            // Simulate parsing
            entries := []LogEntry{
                {Level: "INFO", Message: fmt.Sprintf("Processed by worker %d: %s", id, file)},
                {Level: "ERROR", Message: fmt.Sprintf("Error in %s", file)},
            }
            for _, entry := range entries {
                select {
                case out <- entry:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

// Pipeline stage 3: Aggregate (fan-in)
func aggregator(ctx context.Context, inputs []<-chan LogEntry) <-chan map[string]int {
    out := make(chan map[string]int)
    go func() {
        defer close(out)
        counts := make(map[string]int)

        // Fan-in all parser outputs
        merged := fanInLogs(inputs...)
        for entry := range merged {
            counts[entry.Level]++
        }

        select {
        case out <- counts:
        case <-ctx.Done():
        }
    }()
    return out
}

func fanInLogs(channels ...<-chan LogEntry) <-chan LogEntry {
    out := make(chan LogEntry)
    var wg sync.WaitGroup
    wg.Add(len(channels))
    for _, ch := range channels {
        go func(c <-chan LogEntry) {
            defer wg.Done()
            for entry := range c {
                out <- entry
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    files := []string{"app.log", "error.log", "access.log", "debug.log"}

    // Stage 1: Generate file paths
    fileChan := fileGenerator(ctx, files)

    // Stage 2: Fan-out to multiple parsers
    numParsers := 3
    parsers := make([]<-chan LogEntry, numParsers)
    for i := 0; i < numParsers; i++ {
        parsers[i] = logParser(ctx, i, fileChan)
    }

    // Stage 3: Fan-in and aggregate
    results := aggregator(ctx, parsers)

    // Print results
    for counts := range results {
        fmt.Println("Log level counts:", counts)
    }
}
This example demonstrates production-ready patterns: context cancellation, proper channel closing, and goroutine lifecycle management.
Best Practices and Gotchas
Always close channels from the sender side. Closing from the receiver causes panics if the sender tries to write. The sender knows when no more data will arrive:
func producer(out chan<- int) {
    defer close(out) // Sender closes
    for i := 0; i < 10; i++ {
        out <- i
    }
}
Prevent goroutine leaks with context. Without cancellation, blocked goroutines leak:
// BAD: Goroutine leaks if nobody reads from out
func leakyGenerator() <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; ; i++ {
            out <- i // Blocks forever if no receiver
        }
    }()
    return out
}

// GOOD: Respects cancellation
func safeGenerator(ctx context.Context) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; ; i++ {
            select {
            case out <- i:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}
Use buffered channels judiciously. Buffering can improve throughput by decoupling sender and receiver speeds, but excessive buffering hides backpressure problems. Start unbuffered, add buffering when profiling shows contention.
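The decoupling is easy to see in a tiny sketch: sends to a buffered channel succeed without a receiver until the buffer fills, whereas an unbuffered send would block immediately.

```go
package main

import "fmt"

func main() {
    // A buffered channel accepts up to cap(ch) values with no
    // receiver present; an unbuffered send would block here.
    ch := make(chan int, 2)
    ch <- 1 // does not block: buffer has room
    ch <- 2 // does not block: buffer is now full
    // ch <- 3 would block until a receive frees a slot.

    fmt.Println(<-ch, <-ch)       // 1 2 (FIFO order)
    fmt.Println(len(ch), cap(ch)) // 0 2
}
```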
Handle errors explicitly. Channels don’t carry error information. Either use a struct that includes an error field or use separate error channels:
type Result struct {
    Value int
    Err   error
}

func workerWithErrors(jobs <-chan int) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for job := range jobs {
            if job < 0 {
                out <- Result{Err: fmt.Errorf("invalid job: %d", job)}
                continue
            }
            out <- Result{Value: job * 2}
        }
    }()
    return out
}
These patterns form the vocabulary for concurrent Go programs. Master them, and you’ll build systems that scale efficiently while remaining comprehensible and maintainable.