Go Worker Pool Pattern: Concurrent Job Processing

The worker pool pattern solves a fundamental problem in concurrent programming: how do you process many tasks concurrently without overwhelming your system? Go makes it trivially easy to spawn...

Key Insights

  • Worker pools prevent resource exhaustion by limiting concurrent goroutines to a fixed number, making your application predictable and stable under load
  • The pattern relies on three components: a job channel that acts as a queue, worker goroutines that process jobs, and a results channel for collecting outputs
  • Proper shutdown handling with WaitGroups and context cancellation is critical to prevent goroutine leaks and ensure all jobs complete before program exit

Understanding the Worker Pool Pattern

The worker pool pattern solves a fundamental problem in concurrent programming: how do you process many tasks concurrently without overwhelming your system? Go makes it trivially easy to spawn goroutines, but spawning one goroutine per task is a recipe for disaster when you’re dealing with thousands or millions of jobs.

Consider a web scraper processing 100,000 URLs. Launching 100,000 goroutines simultaneously will exhaust file descriptors, overwhelm network connections, and likely crash your application. A worker pool limits concurrency to a sensible number—say, 50 workers—allowing you to process all jobs while maintaining control over resource usage.

The pattern is particularly valuable when tasks are I/O-bound (API calls, database queries, file operations) where you want concurrency but need bounded parallelism, or CPU-bound operations where you want to match worker count to available cores.

Core Components Explained

A worker pool consists of four key elements working together:

Job Queue: A channel that holds pending work. Producers send jobs to this channel, and workers pull from it. This channel acts as a buffer between job creation and job execution.

Workers: A fixed number of goroutines that continuously pull jobs from the queue, process them, and optionally send results to an output channel.

Results Channel: An optional channel for collecting processed results. Not all worker pools need this—some jobs may write directly to databases or files.

Dispatcher: The coordination logic that spawns workers, manages shutdown, and ensures clean termination.

Here’s the conceptual structure:

type Job struct {
    ID   int
    Data interface{}
}

type Result struct {
    JobID int
    Value interface{}
    Error error
}

// jobs channel receives work
jobs := make(chan Job, 100)

// results channel collects outputs
results := make(chan Result, 100)

// spawn N workers
for i := 0; i < workerCount; i++ {
    go worker(jobs, results)
}

Building a Basic Worker Pool

Let’s build a functional worker pool from scratch. This example processes simple computation jobs with a fixed pool of workers:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID    int
    Value int
}

type Result struct {
    JobID  int
    Result int
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job.ID)
        
        // Simulate work
        time.Sleep(100 * time.Millisecond)
        result := job.Value * 2
        
        results <- Result{
            JobID:  job.ID,
            Result: result,
        }
    }
    
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    const numWorkers = 3
    const numJobs = 10
    
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)
    
    var wg sync.WaitGroup
    
    // Start workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // Send jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- Job{ID: j, Value: j * 10}
    }
    close(jobs) // Signal no more jobs
    
    // Close results when all workers finish
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    for result := range results {
        fmt.Printf("Job %d result: %d\n", result.JobID, result.Result)
    }
}

This implementation demonstrates the complete lifecycle: workers start and wait for jobs, the main goroutine sends all jobs and closes the channel, workers process jobs and send results, and finally we collect all results. The WaitGroup ensures we don’t close the results channel until all workers have finished.

Handling Results and Errors

Production worker pools need robust error handling and graceful shutdown. Here’s an enhanced version with context cancellation:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID   int
    Data string
}

type Result struct {
    JobID int
    Value string
    Error error
}

func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d cancelled\n", id)
            return
        case job, ok := <-jobs:
            if !ok {
                return // Channel closed
            }
            
            // Simulate work that might fail
            time.Sleep(50 * time.Millisecond)
            
            var result Result
            result.JobID = job.ID
            
            if job.ID%7 == 0 {
                result.Error = fmt.Errorf("job %d failed", job.ID)
            } else {
                result.Value = fmt.Sprintf("Processed: %s", job.Data)
            }
            
            select {
            case results <- result:
            case <-ctx.Done():
                return
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    const numWorkers = 4
    jobs := make(chan Job, 20)
    results := make(chan Result, 20)
    
    var wg sync.WaitGroup
    
    // Start workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            worker(ctx, workerID, jobs, results)
        }(i)
    }
    
    // Send jobs
    go func() {
        for i := 1; i <= 20; i++ {
            jobs <- Job{ID: i, Data: fmt.Sprintf("task-%d", i)}
        }
        close(jobs)
    }()
    
    // Close results after workers finish
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Process results
    successCount := 0
    errorCount := 0
    
    for result := range results {
        if result.Error != nil {
            fmt.Printf("Error in job %d: %v\n", result.JobID, result.Error)
            errorCount++
        } else {
            successCount++
        }
    }
    
    fmt.Printf("Completed: %d successful, %d errors\n", successCount, errorCount)
}

The context allows us to cancel all workers immediately if needed. Each worker checks ctx.Done() in its select statement, enabling clean shutdown even if workers are blocked.

Advanced Patterns: Timeouts and Priority

Real-world applications often need per-job timeouts and priority handling. Here’s how to implement job-level timeouts:

func workerWithTimeout(id int, jobs <-chan Job, results chan<- Result) {
    for job := range jobs {
        // Create timeout context for this specific job
        ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
        
        resultChan := make(chan Result, 1)
        
        go func() {
            // Simulate work
            time.Sleep(time.Duration(job.ID*50) * time.Millisecond)
            resultChan <- Result{
                JobID: job.ID,
                Value: fmt.Sprintf("Completed job %d", job.ID),
            }
        }()
        
        select {
        case result := <-resultChan:
            results <- result
        case <-ctx.Done():
            results <- Result{
                JobID: job.ID,
                Error: fmt.Errorf("job %d timed out", job.ID),
            }
        }
        
        cancel() // Clean up context
    }
}

For priority queues, use separate channels for different priority levels and select with priority:

func priorityWorker(highPriority, lowPriority <-chan Job, results chan<- Result) {
    for {
        select {
        case job := <-highPriority:
            processJob(job, results)
        default:
            select {
            case job := <-highPriority:
                processJob(job, results)
            case job := <-lowPriority:
                processJob(job, results)
            }
        }
    }
}

Real-World Use Case: Image Processing

Here’s a practical example processing image thumbnails concurrently:

package main

import (
    "fmt"
    "path/filepath"
    "sync"
    "sync/atomic"
)

type ImageJob struct {
    SourcePath string
    TargetPath string
}

type ImageResult struct {
    SourcePath string
    Success    bool
    Error      error
}

func imageWorker(id int, jobs <-chan ImageJob, results chan<- ImageResult, processed *atomic.Int32) {
    for job := range jobs {
        // Simulate image processing
        // In reality: load image, resize, save thumbnail
        err := processImage(job.SourcePath, job.TargetPath)
        
        results <- ImageResult{
            SourcePath: job.SourcePath,
            Success:    err == nil,
            Error:      err,
        }
        
        processed.Add(1)
    }
}

func processImage(source, target string) error {
    // Placeholder for actual image processing
    // img, _ := imaging.Open(source)
    // thumb := imaging.Resize(img, 200, 0, imaging.Lanczos)
    // return imaging.Save(thumb, target)
    return nil
}

func main() {
    images := []string{"img1.jpg", "img2.jpg", "img3.jpg", "img4.jpg", "img5.jpg"}
    
    jobs := make(chan ImageJob, len(images))
    results := make(chan ImageResult, len(images))
    
    var wg sync.WaitGroup
    var processed atomic.Int32
    
    // Start 3 workers
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            imageWorker(workerID, jobs, results, &processed)
        }(i)
    }
    
    // Queue jobs
    for _, img := range images {
        jobs <- ImageJob{
            SourcePath: img,
            TargetPath: filepath.Join("thumbnails", img),
        }
    }
    close(jobs)
    
    // Progress reporter
    go func() {
        for processed.Load() < int32(len(images)) {
            fmt.Printf("Progress: %d/%d\n", processed.Load(), len(images))
            // time.Sleep(500 * time.Millisecond)
        }
    }()
    
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    for result := range results {
        if result.Success {
            fmt.Printf("✓ %s\n", result.SourcePath)
        } else {
            fmt.Printf("✗ %s: %v\n", result.SourcePath, result.Error)
        }
    }
}

Performance Considerations

Worker Count: For I/O-bound tasks, start with 2-4× your CPU cores. For CPU-bound work, match runtime.NumCPU(). Always benchmark with realistic workloads.

Channel Buffering: Buffered channels reduce contention. Buffer size should roughly match your batch size or expected burst rate. Unbuffered channels create backpressure but can cause blocking.

Monitoring: Track queue depth, worker utilization, and job latency. Expose metrics via expvar or Prometheus:

var (
    jobsProcessed = expvar.NewInt("jobs_processed")
    jobsFailed    = expvar.NewInt("jobs_failed")
)

Common Pitfalls: Never close channels from the consumer side—only producers should close. Always use WaitGroups to prevent premature result channel closure. Don’t forget to handle context cancellation in long-running jobs.

The worker pool pattern is fundamental to building scalable Go applications. Master it, and you’ll have a reliable tool for managing concurrency in everything from web scrapers to data pipelines.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.