Python concurrent.futures: Thread and Process Pools

Key Insights

  • Use ThreadPoolExecutor for I/O-bound tasks like API calls and file operations, and ProcessPoolExecutor for CPU-bound work that needs to bypass Python’s Global Interpreter Lock
  • The concurrent.futures module provides a cleaner, more Pythonic interface than raw threading or multiprocessing, with automatic resource management through context managers
  • Choose executor.map() for simple batch processing and submit() with as_completed() when you need fine-grained control over task execution and result retrieval

Introduction to Concurrent Execution

Python’s concurrent.futures module is the standard library’s high-level interface for executing tasks concurrently. It abstracts away the complexity of threading and multiprocessing, providing a unified API for both approaches. This matters because choosing the wrong concurrency primitive leads to performance degradation rather than improvement.

The fundamental distinction: use threads for I/O-bound operations (network requests, disk access, database queries) and processes for CPU-bound tasks (data processing, calculations, image manipulation). Threads in Python don’t provide true parallelism for CPU work due to the Global Interpreter Lock (GIL), but they excel at managing I/O wait times. Processes bypass the GIL entirely but incur overhead from inter-process communication.

Here’s the performance difference in action:

import time
import requests
from concurrent.futures import ThreadPoolExecutor

urls = [
    'https://jsonplaceholder.typicode.com/posts/1',
    'https://jsonplaceholder.typicode.com/posts/2',
    'https://jsonplaceholder.typicode.com/posts/3',
    'https://jsonplaceholder.typicode.com/posts/4',
]

def fetch_url(url):
    # A timeout keeps a stalled request from occupying a worker indefinitely
    response = requests.get(url, timeout=10)
    return len(response.content)

# Sequential execution
start = time.time()
results_seq = [fetch_url(url) for url in urls]
print(f"Sequential: {time.time() - start:.2f}s")

# Concurrent execution
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results_con = list(executor.map(fetch_url, urls))
print(f"Concurrent: {time.time() - start:.2f}s")

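With four URLs and four workers, the concurrent version can approach a 4x speedup on a typical network connection, because the requests wait on the network at the same time instead of one after another. Total time is then roughly that of the slowest single request.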
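The same effect can be reproduced without a network. The following is a minimal sketch that assumes time.sleep() is a fair stand-in for an I/O wait (a sleeping thread releases the GIL, much as a thread blocked on a socket does). The fake_fetch function and the 0.2-second delay are hypothetical and chosen only for illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.2  # assumed per-request latency in seconds

def fake_fetch(item):
    # Stand-in for a network call: the thread simply waits
    time.sleep(DELAY)
    return item

items = list(range(4))

# Sequential: the waits add up
start = time.perf_counter()
sequential = [fake_fetch(i) for i in items]
sequential_time = time.perf_counter() - start

# Pooled: the four waits overlap
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    pooled = list(executor.map(fake_fetch, items))
pooled_time = time.perf_counter() - start

print(f"Sequential: {sequential_time:.2f}s")
print(f"Pooled:     {pooled_time:.2f}s")
```

Because nothing here depends on a remote server, the sketch is a convenient baseline for experimenting with different worker counts.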

ThreadPoolExecutor Fundamentals

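ThreadPoolExecutor manages a pool of worker threads, distributing tasks among them automatically. Using it as a context manager ensures proper cleanup even if exceptions occur: leaving the with block calls shutdown(wait=True), which blocks until every submitted task has finished and then releases the threads.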
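To make the cleanup concrete, here is a rough sketch of what the context manager does on your behalf, written with an explicit try/finally. The square function is a hypothetical placeholder task:

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n * n

executor = ThreadPoolExecutor(max_workers=2)
try:
    futures = [executor.submit(square, n) for n in range(5)]
finally:
    # The same call the with statement makes on exit:
    # wait for queued work, then release the worker threads
    executor.shutdown(wait=True)

results = [future.result() for future in futures]
print(results)

# A shut-down executor rejects new work
rejected = False
try:
    executor.submit(square, 10)
except RuntimeError as exc:
    rejected = True
    print(f"Rejected: {exc}")
```

In practice the with form is preferable; the explicit version is mainly useful when an executor has to outlive a single block, for example as a long-lived attribute of an object.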

Basic usage with submit() returns Future objects immediately, allowing you to continue program execution while tasks run in the background:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch_user_data(user_id):
    url = f'https://jsonplaceholder.typicode.com/users/{user_id}'
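    response = requests.get(url, timeout=10)
    # Turn HTTP error statuses into exceptions so failures reach the except block
    response.raise_for_status()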
    return response.json()

with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks and get Future objects
    futures = {executor.submit(fetch_user_data, i): i for i in range(1, 11)}
    
    # Process results as they complete
    for future in as_completed(futures):
        user_id = futures[future]
        try:
            data = future.result()
            print(f"User {user_id}: {data['name']}")
        except Exception as e:
            print(f"User {user_id} failed: {e}")

The max_workers parameter controls pool size. For I/O-bound tasks, you can often use more workers than CPU cores since threads spend most of their time waiting. A reasonable starting point is min(32, os.cpu_count() + 4), which is also what ThreadPoolExecutor has used by default since Python 3.8 when max_workers is omitted.
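As a small sketch of that heuristic, the hypothetical helper below computes the value by hand. It assumes a fallback of one CPU when os.cpu_count() cannot determine the count and returns None:

```python
import os

def default_io_workers():
    # os.cpu_count() may return None when the count cannot be determined
    cpus = os.cpu_count() or 1
    # Cap at 32 so many-core machines do not create an excessive number of threads
    return min(32, cpus + 4)

print(f"Suggested I/O workers: {default_io_workers()}")
```

Treat the result as a starting value to tune from, not as a limit: a workload dominated by slow remote calls may benefit from more threads, while one that hits a rate-limited API may need fewer.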

ProcessPoolExecutor Fundamentals

When your bottleneck is CPU computation rather than I/O, ProcessPoolExecutor provides true parallelism by running tasks in separate Python processes. Each process has its own GIL, enabling simultaneous execution on multiple cores.

Here’s a CPU-bound task where processes dramatically outperform threads:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

def count_primes(start, end):
    return sum(1 for n in range(start, end) if is_prime(n))

def count_primes_in_range(bounds):
    # Module-level wrapper: ProcessPoolExecutor pickles the callable it
    # sends to workers, and a lambda cannot be pickled
    return count_primes(*bounds)

ranges = [(i, i + 10000) for i in range(0, 100000, 10000)]

# The guard is required where workers are started with spawn
# (the default on Windows and macOS)
if __name__ == '__main__':
    # Thread pool (limited by GIL)
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(count_primes_in_range, ranges))
    print(f"Threads: {time.time() - start:.2f}s, Total primes: {sum(results)}")

    # Process pool (true parallelism)
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(count_primes_in_range, ranges))
    print(f"Processes: {time.time() - start:.2f}s, Total primes: {sum(results)}")

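On a quad-core machine, the process pool can run up to 3-4x faster than the thread pool for this kind of workload, provided each task does enough work to amortize the overhead. Processes are slow to start and must pickle and unpickle data for inter-process communication, which makes them inefficient for small, quick tasks. When you have many small items, the chunksize argument of ProcessPoolExecutor.map() batches them to reduce that cost.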

Working with Futures

Future objects represent pending computations. They provide methods to check status, retrieve results, and handle exceptions:

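# Import TimeoutError from concurrent.futures: before Python 3.11 it is
# not the same class as the built-in TimeoutError
from concurrent.futures import ThreadPoolExecutor, TimeoutError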
import time

def slow_operation(n):
    time.sleep(n)
    if n == 3:
        raise ValueError(f"Number {n} is unlucky!")
    return n * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_operation, i) for i in [1, 2, 3, 4]]
    
    for i, future in enumerate(futures):
        print(f"Future {i} done: {future.done()}")
    
    time.sleep(2)
    
    for i, future in enumerate(futures):
        try:
            # Timeout prevents indefinite blocking
            result = future.result(timeout=2)
            print(f"Future {i} result: {result}")
        except ValueError as e:
            print(f"Future {i} raised: {e}")
        except TimeoutError:
            print(f"Future {i} timed out")
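Futures can also be cancelled, but only while they are still waiting in the queue. The sketch below assumes a single-worker pool so that the second task is guaranteed to be pending. The blocker and never_runs functions and the threading.Event used to hold the first task are illustrative choices:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, CancelledError

release = threading.Event()

def blocker():
    # Occupies the only worker until the main thread releases it
    release.wait(timeout=5)
    return "first"

def never_runs():
    return "second"

with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(blocker)
    queued = executor.submit(never_runs)

    # The second task is still queued behind the first, so cancel() succeeds
    was_cancelled = queued.cancel()
    release.set()
    first_result = running.result()

try:
    queued.result()
    outcome = "completed"
except CancelledError:
    outcome = "cancelled"

print(was_cancelled, first_result, outcome)
```

cancel() returns False for a task that has already started or finished, so it is best treated as a request, not a guarantee.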

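Callbacks registered with add_done_callback() run as soon as a future completes (or immediately, if it has already finished), which makes them useful for updating progress indicators or aggregating results. The callback receives the future itself and runs in a thread of the process that registered it, so keep it short: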

def process_result(future):
    try:
        result = future.result()
        print(f"Processed: {result}")
    except Exception as e:
        print(f"Error: {e}")

with ThreadPoolExecutor(max_workers=2) as executor:
    for i in range(5):
        future = executor.submit(slow_operation, i)
        future.add_done_callback(process_result)
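When a callback updates shared state, it should do so under a lock, since several worker threads may finish at about the same time. A minimal sketch, in which ProgressTracker and half are hypothetical names introduced only for this example:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class ProgressTracker:
    """Hypothetical helper that counts succeeded and failed tasks."""

    def __init__(self):
        self._lock = threading.Lock()
        self.succeeded = 0
        self.failed = 0

    def on_done(self, future):
        # Runs in the worker thread that finished the task, or in the
        # submitting thread if the future was already done
        ok = not future.cancelled() and future.exception() is None
        with self._lock:
            if ok:
                self.succeeded += 1
            else:
                self.failed += 1

def half(n):
    if n % 2:
        raise ValueError(f"{n} is odd")
    return n // 2

tracker = ProgressTracker()
with ThreadPoolExecutor(max_workers=3) as executor:
    for n in range(6):
        executor.submit(half, n).add_done_callback(tracker.on_done)

print(f"{tracker.succeeded} succeeded, {tracker.failed} failed")
```

Using a bound method as the callback keeps the counters and the lock together, which avoids module-level globals.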

Executor Patterns and Best Practices

The map() method provides a cleaner interface when you have a simple function to apply to multiple inputs:

from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item.upper()

items = ['apple', 'banana', 'cherry', 'date']

with ThreadPoolExecutor(max_workers=2) as executor:
    # map() returns results in original order
    results = list(executor.map(process_item, items))
    print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']
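map() also accepts several iterables and passes one element from each to every call, much like the built-in zip(). A short sketch, with describe and the sample lists as hypothetical inputs:

```python
from concurrent.futures import ThreadPoolExecutor

def describe(name, quantity):
    return f"{quantity} x {name.upper()}"

names = ['apple', 'banana', 'cherry']
quantities = [3, 1, 2]

with ThreadPoolExecutor(max_workers=2) as executor:
    # Each call receives one element from each iterable
    labels = list(executor.map(describe, names, quantities))

print(labels)  # ['3 x APPLE', '1 x BANANA', '2 x CHERRY']
```

This form avoids wrapping a multi-argument function in a lambda, which matters for ProcessPoolExecutor because lambdas cannot be pickled.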

Use as_completed() when you want to process results as soon as they’re available rather than waiting for all tasks:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def variable_task(n):
    time.sleep(random.uniform(0.1, 2.0))
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(variable_task, i): i for i in range(10)}
    
    for future in as_completed(futures, timeout=5):
        original_value = futures[future]
        try:
            result = future.result()
            print(f"Task {original_value} completed with result {result}")
        except Exception as e:
            print(f"Task {original_value} failed: {e}")

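Always handle exceptions within your task functions or when retrieving results. An exception raised inside a task is stored in its Future and re-raised when you call result(); if you never call result() or exception(), the failure passes silently. With map(), the exception surfaces when iteration reaches the failed item. Note also that as_completed() with a timeout raises TimeoutError from the loop itself if the futures have not all finished in time, so wrap the loop if you rely on that timeout.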
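The sketch below illustrates both behaviors with a hypothetical risky function that fails for one input. It inspects stored exceptions through Future.exception() and then shows where the same failure appears when using map():

```python
from concurrent.futures import ThreadPoolExecutor

def risky(n):
    if n == 2:
        raise ValueError(f"bad input: {n}")
    return n * 10

# submit(): the exception stays inside the Future until you ask for it
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(risky, n) for n in range(4)]

outcomes = []
for n, future in enumerate(futures):
    error = future.exception()  # None if the task succeeded
    if error is None:
        outcomes.append((n, future.result()))
    else:
        outcomes.append((n, type(error).__name__))
print(outcomes)

# map(): the exception is raised when iteration reaches the failed item
collected = []
map_error = None
with ThreadPoolExecutor(max_workers=2) as executor:
    try:
        for value in executor.map(risky, range(4)):
            collected.append(value)
    except ValueError as exc:
        map_error = exc
print(collected, map_error)
```

Note that with map() the results after the failing item are lost to the caller, so submit() is the better fit when partial results matter.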

Real-World Use Case

Here’s a practical data processing pipeline that downloads data, processes it with CPU-intensive operations, and saves results:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import json

def download_data(url):
    """I/O-bound: use threads"""
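    response = requests.get(url, timeout=10)
    response.raise_for_status()  # fail fast on HTTP errors instead of parsing an error page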
    return response.json()

def process_data(data):
    """CPU-bound: use processes"""
    # Simulate expensive computation
    result = sum(item['id'] ** 2 for item in data if 'id' in item)
    return {'processed': result, 'count': len(data)}

def save_result(index, result):
    """I/O-bound: use threads"""
    # Name the file by dataset index; naming it by result['count'] would
    # make datasets of equal size overwrite each other
    with open(f"result_{index}.json", 'w') as f:
        json.dump(result, f)
    return result['count']

urls = [f'https://jsonplaceholder.typicode.com/posts?userId={i}' 
        for i in range(1, 6)]

# The guard keeps worker processes from re-running the pipeline when
# they import this module (required with the spawn start method)
if __name__ == '__main__':
    # Stage 1: Download with threads
    with ThreadPoolExecutor(max_workers=5) as executor:
        downloaded_data = list(executor.map(download_data, urls))

    print(f"Downloaded {len(downloaded_data)} datasets")

    # Stage 2: Process with processes
    with ProcessPoolExecutor(max_workers=4) as executor:
        processed_data = list(executor.map(process_data, downloaded_data))

    print(f"Processed {len(processed_data)} datasets")

    # Stage 3: Save with threads, pairing each result with its index
    with ThreadPoolExecutor(max_workers=3) as executor:
        saved_counts = list(executor.map(save_result,
                                         range(len(processed_data)),
                                         processed_data))

    print(f"Saved {sum(saved_counts)} total items")

This pattern maximizes efficiency by using the right tool for each stage: threads for I/O, processes for computation.

Common Pitfalls and Performance Tips

Don’t create more workers than beneficial. Too many threads thrash the scheduler; too many processes exhaust memory:

import time
from concurrent.futures import ThreadPoolExecutor

def tiny_task(n):
    return n * 2

data = list(range(100))

# Bad: overhead dominates
start = time.time()
with ThreadPoolExecutor(max_workers=100) as executor:
    results = list(executor.map(tiny_task, data))
print(f"100 workers: {time.time() - start:.4f}s")

# Good: reasonable worker count
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(tiny_task, data))
print(f"4 workers: {time.time() - start:.4f}s")

# Best: no concurrency for trivial tasks
start = time.time()
results = [tiny_task(n) for n in data]
print(f"Sequential: {time.time() - start:.4f}s")

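For ProcessPoolExecutor, remember that the function you submit, its arguments, and its return values must all be picklable, because they are sent between processes. Lambda functions, nested functions, local classes, and objects that wrap OS resources (locks, open files, sockets) can’t be pickled. Define functions at module level and use simple data types when possible. On platforms that start workers with spawn (Windows and macOS by default), also create the pool under an if __name__ == '__main__': guard.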
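A quick way to catch these problems before they surface inside a worker is to try pickling the object yourself. The is_picklable helper below is a hypothetical convenience, not part of the standard library, and the objects tested are only examples:

```python
import math
import pickle
import threading

def is_picklable(obj):
    """Return True if pickle.dumps() accepts obj (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

candidates = {
    "built-in function": math.sqrt,          # pickled by importable name
    "plain data": {'id': 1, 'tags': ['a']},  # simple containers are fine
    "lambda": lambda x: x * 2,               # has no importable name
    "lock": threading.Lock(),                # wraps an OS-level resource
}

for label, obj in candidates.items():
    print(f"{label}: {'ok' if is_picklable(obj) else 'not picklable'}")
```

A check like this is cheap to run in a unit test for any function or payload you intend to hand to a process pool.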

Set appropriate pool sizes: for CPU-bound work, use max_workers=os.cpu_count(), which is also what ProcessPoolExecutor uses when max_workers is omitted. For I/O-bound work, experiment with higher values based on your specific workload characteristics. Monitor system resources to find the sweet spot.

Finally, don’t use concurrency when tasks are too small or when sequential execution is already fast enough. The overhead of thread/process management can make concurrent code slower than sequential code for trivial operations. Profile first, optimize second.
