Thread Pool: Reusing Worker Threads

Key Insights

  • Thread pools eliminate the overhead of repeatedly creating and destroying threads by maintaining a fixed set of workers that process tasks from a shared queue
  • Pool sizing depends critically on workload type: CPU-bound tasks benefit from thread counts near the core count, while I/O-bound tasks can use significantly more threads
  • Most production code should use battle-tested standard library implementations rather than custom thread pools, but understanding the internals helps you configure them correctly

The Cost of Thread Creation

Every time you spawn a new thread, your operating system allocates a stack (typically 1-2 MB), creates kernel data structures, and adds the thread to its scheduling queue. For a single task, this overhead is negligible. When you’re processing thousands of requests per second, it becomes a bottleneck.

Consider a web server handling 10,000 concurrent connections. Creating a dedicated thread for each connection would reserve 10-20 GB of address space just for thread stacks, before your application does any actual work. The OS scheduler would also struggle to manage that many threads efficiently, leading to excessive context switching.

Thread pools solve this by maintaining a fixed set of worker threads that pull tasks from a queue. The threads stay alive between tasks, amortizing creation costs across thousands of operations. This pattern is so fundamental that it’s built into the standard libraries of most major languages.
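To make the amortization concrete, here is a rough sketch that runs the same trivial task once with a thread per task and once through a small pool. The helper names (tiny_task, thread_per_task, pooled) and the task count are illustrative assumptions. Timings vary by machine and interpreter, so treat the output as something to measure rather than a fixed result.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Thread


def tiny_task(results: list, i: int) -> None:
    # Deliberately trivial so thread start-up cost dominates the work itself.
    results.append(i * 2)


def thread_per_task(n: int) -> list:
    """Create, start, and join one thread per task."""
    results: list = []
    threads = [Thread(target=tiny_task, args=(results, i)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def pooled(n: int, workers: int = 4) -> list:
    """Run the same tasks on a fixed set of reusable workers."""
    results: list = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for i in range(n):
            pool.submit(tiny_task, results, i)
    # Leaving the with-block waits for all submitted tasks to finish.
    return results


for label, runner in [("thread per task", thread_per_task), ("pool of 4", pooled)]:
    start = time.perf_counter()
    out = runner(500)
    elapsed = time.perf_counter() - start
    print(f"{label}: {len(out)} tasks in {elapsed * 1000:.1f} ms")
```

Both runners do identical work. Any difference in elapsed time comes from how the threads are created and reused.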

Thread Pool Architecture

A thread pool consists of three core components working together:

Worker threads are long-lived threads that loop, taking one task after another. When no tasks are available, they block on the queue rather than spinning and wasting CPU cycles.

Task queue holds pending work items. Workers compete to dequeue tasks, and producers submit new tasks without waiting for execution.

Pool manager handles lifecycle operations: starting workers, tracking their state, and coordinating shutdown.

from collections import deque
from threading import Thread, Lock, Condition
from typing import Callable, Any

class ThreadPool:
    def __init__(self, num_workers: int):
        self.task_queue: deque[Callable[[], Any]] = deque()
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.workers: list[Thread] = []
        self.shutdown_flag = False
        
        # Initialize worker threads
        for i in range(num_workers):
            worker = Thread(target=self._worker_loop, name=f"Worker-{i}")
            worker.daemon = True
            self.workers.append(worker)
    
    def _worker_loop(self):
        """Main loop for worker threads."""
        while True:
            task = None
            with self.condition:
                while not self.task_queue and not self.shutdown_flag:
                    self.condition.wait()
                
                if self.shutdown_flag and not self.task_queue:
                    return
                
                task = self.task_queue.popleft()
            
            if task:
                try:
                    task()
                except Exception as e:
                    print(f"Task failed: {e}")

The condition variable is crucial here. Workers don’t busy-wait checking for tasks—they sleep until notify() signals that work is available. This keeps idle threads from consuming CPU time.

Implementing a Simple Thread Pool

Let’s complete the implementation with task submission and lifecycle management:

class ThreadPool:
    def __init__(self, num_workers: int):
        self.task_queue: deque[Callable[[], Any]] = deque()
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.workers: list[Thread] = []
        self.shutdown_flag = False
        self.started = False
        
        for i in range(num_workers):
            worker = Thread(target=self._worker_loop, name=f"Worker-{i}")
            worker.daemon = True
            self.workers.append(worker)
    
    def start(self):
        """Start all worker threads."""
        if self.started:
            return
        self.started = True
        for worker in self.workers:
            worker.start()
    
    def submit(self, task: Callable[[], Any]):
        """Add a task to the queue."""
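        with self.condition:
            # Check under the lock so a concurrent shutdown() cannot slip in
            # between the check and the append.
            if self.shutdown_flag:
                raise RuntimeError("Pool is shutting down")
            self.task_queue.append(task)
            self.condition.notify()  # Wake one waiting worker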
    
    def shutdown(self, wait: bool = True):
        """Signal shutdown and optionally wait for completion."""
        with self.condition:
            self.shutdown_flag = True
            self.condition.notify_all()  # Wake all workers
        
        if wait:
            for worker in self.workers:
                worker.join()
    
    def _worker_loop(self):
        while True:
            task = None
            with self.condition:
                while not self.task_queue and not self.shutdown_flag:
                    self.condition.wait()
                
                if self.shutdown_flag and not self.task_queue:
                    return
                
                task = self.task_queue.popleft()
            
            if task:
                try:
                    task()
                except Exception as e:
                    print(f"Task failed: {e}")


# Usage
pool = ThreadPool(4)
pool.start()

for i in range(100):
    pool.submit(lambda x=i: print(f"Processing task {x}"))

pool.shutdown(wait=True)

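Notice the lambda x=i pattern: a default argument is evaluated when the lambda is created, so each task keeps the value i had at submission time. Without it, every lambda would look up i only when it runs and would see whatever value the loop had reached by then, often the final one.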
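The difference is easy to see without any threads at all. This small sketch (the list names are just for illustration) builds the same three lambdas both ways and calls them after the loop has finished:

```python
# Late binding: each lambda looks up i when it is called, after the loop ended.
late = [lambda: i for i in range(3)]

# Default argument: x is evaluated once, when each lambda is created.
bound = [lambda x=i: x for i in range(3)]

late_values = [f() for f in late]
bound_values = [f() for f in bound]

print(late_values)   # [2, 2, 2]
print(bound_values)  # [0, 1, 2]
```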

Task Queue Strategies

The queue strategy significantly impacts behavior under load:

Unbounded queues accept tasks indefinitely. Simple to implement, but dangerous—if producers outpace consumers, memory usage grows without limit until your process crashes.

Bounded queues have a maximum capacity. When full, you need a rejection policy:

from enum import Enum
from queue import Empty, Full, Queue
from typing import Any, Callable

class RejectionPolicy(Enum):
    ABORT = "abort"           # Raise exception
    CALLER_RUNS = "caller"    # Execute in submitting thread
    DISCARD = "discard"       # Silently drop
    DISCARD_OLDEST = "oldest" # Drop oldest queued task

class BoundedThreadPool:
    def __init__(self, num_workers: int, queue_size: int, 
                 policy: RejectionPolicy = RejectionPolicy.ABORT):
        self.task_queue: Queue = Queue(maxsize=queue_size)
        self.policy = policy
        self.shutdown_flag = False
        # ... worker initialization
    
    def submit(self, task: Callable[[], Any]):
        if self.shutdown_flag:
            raise RuntimeError("Pool is shutting down")
        
        try:
            self.task_queue.put_nowait(task)
        except Full:
            self._handle_rejection(task)
    
    def _handle_rejection(self, task: Callable[[], Any]):
        if self.policy == RejectionPolicy.ABORT:
            raise RuntimeError("Task queue full")
        elif self.policy == RejectionPolicy.CALLER_RUNS:
            task()  # Block the caller
        elif self.policy == RejectionPolicy.DISCARD:
            pass  # Drop silently
        elif self.policy == RejectionPolicy.DISCARD_OLDEST:
            try:
                self.task_queue.get_nowait()  # Remove oldest
            except Empty:
                pass  # Workers drained the queue in the meantime
            try:
                self.task_queue.put_nowait(task)
            except Full:
                pass  # Another producer took the freed slot; drop the new task

The CALLER_RUNS policy provides natural backpressure—when the pool is saturated, submitting threads slow down by executing tasks themselves.
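Here is a minimal sketch of that backpressure effect, kept separate from the pool class so it runs on its own. The function submit_caller_runs and the helper make_task are hypothetical names, and no workers are started, which makes the queue fill up deterministically.

```python
import threading
from queue import Full, Queue


def submit_caller_runs(task_queue: Queue, task) -> str:
    """Queue the task if there is room; otherwise run it in the calling thread."""
    try:
        task_queue.put_nowait(task)
        return "queued"
    except Full:
        # While the producer is busy running the task, it cannot submit more.
        task()
        return "ran in caller"


ran_on: list[str] = []


def make_task(n: int):
    # Each task records which thread actually executed it.
    return lambda: ran_on.append(f"task {n} on {threading.current_thread().name}")


# No workers are consuming yet, so a queue of size 2 fills after two submissions.
q: Queue = Queue(maxsize=2)
outcomes = [submit_caller_runs(q, make_task(n)) for n in range(4)]

print(outcomes)  # ['queued', 'queued', 'ran in caller', 'ran in caller']
print(ran_on)    # the two rejected tasks, executed by the submitting thread
```

The third and fourth submissions take as long as the tasks themselves, and that delay is what slows a fast producer down to the pool's pace.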

Sizing Your Thread Pool

Pool sizing is workload-dependent. The standard formulas provide starting points:

CPU-bound tasks: threads = number_of_cores

These tasks keep the CPU busy. More threads than cores means context switching overhead without throughput gains.

I/O-bound tasks: threads = number_of_cores * (1 + wait_time/compute_time)

If tasks spend 90% of their time waiting on I/O, you can productively use 10x more threads than cores.
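As a worked example, the two formulas can be folded into one helper, since a CPU-bound task is just the case where wait time is zero. The function name suggested_pool_size is an assumption made for this sketch, and the result is a starting point to tune from, not a guarantee.

```python
import os


def suggested_pool_size(cores: int, wait_time: float, compute_time: float) -> int:
    """threads = cores * (1 + wait_time / compute_time), never less than 1."""
    if compute_time <= 0:
        raise ValueError("compute_time must be positive")
    return max(1, round(cores * (1 + wait_time / compute_time)))


# CPU-bound: no waiting, so the formula collapses to the core count.
print(suggested_pool_size(8, wait_time=0, compute_time=10))   # 8

# I/O-bound: 90 ms waiting per 10 ms of compute is 90% wait, giving 10x cores.
print(suggested_pool_size(8, wait_time=90, compute_time=10))  # 80

# On the current machine; cpu_count() can return None, hence the fallback.
print(suggested_pool_size(os.cpu_count() or 1, wait_time=90, compute_time=10))
```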

import os
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound_task():
    """Simulates CPU-intensive work."""
    total = 0
    for i in range(10_000_000):
        total += i * i
    return total

def io_bound_task():
    """Simulates I/O wait."""
    time.sleep(0.1)
    return "done"

def benchmark(pool_size: int, task_func, num_tasks: int):
    # perf_counter is monotonic and intended for measuring elapsed time
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        list(pool.map(lambda _: task_func(), range(num_tasks)))
    return time.perf_counter() - start

cores = os.cpu_count() or 1  # cpu_count() can return None

# CPU-bound: more threads don't help (and may hurt due to GIL in Python)
for size in [cores, cores * 2, cores * 4]:
    elapsed = benchmark(size, cpu_bound_task, 20)
    print(f"CPU-bound, {size} threads: {elapsed:.2f}s")

# I/O-bound: more threads improve throughput
for size in [cores, cores * 4, cores * 16]:
    elapsed = benchmark(size, io_bound_task, 100)
    print(f"I/O-bound, {size} threads: {elapsed:.2f}s")

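For Python specifically, the Global Interpreter Lock (GIL) in standard CPython builds lets only one thread execute Python bytecode at a time, so CPU-bound pure-Python tasks don’t run in parallel across threads. Use ProcessPoolExecutor for that kind of work instead.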

Using Built-in Thread Pools

Production code should use standard library implementations. They’re battle-tested and handle edge cases you haven’t thought of.

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request

def fetch_url(url: str) -> tuple[str, int]:
    with urllib.request.urlopen(url, timeout=10) as response:
        return url, len(response.read())

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/json",
]

# Submit tasks and process results as they complete
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(fetch_url, url): url for url in urls}
    
    for future in as_completed(futures):
        url = futures[future]
        try:
            result_url, size = future.result(timeout=30)
            print(f"{result_url}: {size} bytes")
        except Exception as e:
            print(f"{url} failed: {e}")

Java’s ExecutorService offers similar functionality with more configuration options:

ExecutorService executor = new ThreadPoolExecutor(
    4,                      // core pool size
    16,                     // maximum pool size  
    60L, TimeUnit.SECONDS,  // idle thread timeout
    new ArrayBlockingQueue<>(100),  // bounded queue
    new ThreadPoolExecutor.CallerRunsPolicy()  // rejection policy
);

Future<String> future = executor.submit(() -> {
    return fetchData();
});

executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);

Common Pitfalls and Best Practices

Deadlocks from task dependencies: If task A waits for task B’s result, and both are in the same pool, you can deadlock when all workers are blocked on waiting tasks. Use separate pools for tasks with dependencies, or use async/await patterns instead.
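The smallest reproduction uses a pool with a single worker. In this sketch the function names outer and inner are illustrative, and a timeout is added only so the example terminates instead of hanging. Without the timeout, outer would wait forever for a task that needs the worker outer itself is occupying.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def inner() -> str:
    return "inner done"


def outer(pool: ThreadPoolExecutor) -> str:
    # Runs on the pool's only worker, then waits on a task that needs that worker.
    dependent = pool.submit(inner)
    try:
        return dependent.result(timeout=0.5)
    except FutureTimeout:
        return "timed out"


with ThreadPoolExecutor(max_workers=1) as pool:
    outcome = pool.submit(outer, pool).result()

print(outcome)  # timed out
```

Once outer gives up and returns, the worker is free and inner finally runs, which is why the pool still shuts down cleanly here.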

Thread leaks: Always shut down pools explicitly. In Java, call shutdown() in a finally block, or use try-with-resources on Java 19 and later, where ExecutorService implements AutoCloseable. In Python, use context managers.

Swallowed exceptions: Exceptions in worker threads don’t propagate to the main thread automatically. Always check Future.result() or implement exception handlers.
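A short sketch with concurrent.futures shows how quietly this happens. The task name failing_task is made up for the example. The pool runs and shuts down without any visible error, and the exception only surfaces when the future is inspected.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task() -> None:
    raise ValueError("bad input")


with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(failing_task)
# The pool has shut down cleanly at this point; nothing was raised or printed.

# exception() returns the stored error without raising it.
error = future.exception()

# result() re-raises the stored error in the calling thread.
try:
    future.result()
    message = None
except ValueError as exc:
    message = str(exc)

print(type(error).__name__, message)  # ValueError bad input
```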

Blocking the queue indefinitely: Bounded queues with blocking submission can deadlock if the submitting thread is also a worker. Use non-blocking submission (for example, Queue.put_nowait in Python) with rejection handling instead.

Production checklist:

  • Set meaningful thread names for debugging
  • Configure appropriate queue bounds
  • Implement rejection policies explicitly
  • Add monitoring for queue depth and active thread count
  • Use timeouts on all blocking operations
  • Ensure graceful shutdown in signal handlers
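Two of these items cost a single argument each with Python's ThreadPoolExecutor. The sketch below names the workers with thread_name_prefix and puts a timeout on a blocking result() call. The prefix "fetcher" and the helper whoami are placeholders chosen for the example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def whoami() -> str:
    # Report the name of the worker thread running this task.
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="fetcher") as pool:
    names = list(pool.map(lambda _: whoami(), range(4)))
    # A timeout keeps the caller from waiting forever on a stuck task.
    single = pool.submit(whoami).result(timeout=5)

print(sorted(set(names)))
print(single)
```

Named threads show up in stack dumps and log records, which makes it much easier to tell which pool a stuck worker belongs to.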

Thread pools are foundational infrastructure. Get them right once, and they’ll reliably serve thousands of concurrent operations. Get them wrong, and you’ll spend hours debugging mysterious hangs and memory leaks.
