# Python `concurrent.futures`: Thread and Process Pools
## Key Insights

- Use `ThreadPoolExecutor` for I/O-bound tasks like API calls and file operations, and `ProcessPoolExecutor` for CPU-bound work that needs to bypass Python's Global Interpreter Lock
- The `concurrent.futures` module provides a cleaner, more Pythonic interface than raw threading or multiprocessing, with automatic resource management through context managers
- Choose `executor.map()` for simple batch processing and `submit()` with `as_completed()` when you need fine-grained control over task execution and result retrieval
## Introduction to Concurrent Execution

Python's `concurrent.futures` module is the standard library's high-level interface for executing tasks concurrently. It abstracts away the complexity of threading and multiprocessing, providing a unified API for both approaches. This matters because choosing the wrong concurrency primitive leads to performance degradation rather than improvement.
The fundamental distinction: use threads for I/O-bound operations (network requests, disk access, database queries) and processes for CPU-bound tasks (data processing, calculations, image manipulation). Threads in Python don’t provide true parallelism for CPU work due to the Global Interpreter Lock (GIL), but they excel at managing I/O wait times. Processes bypass the GIL entirely but incur overhead from inter-process communication.
Here’s the performance difference in action:
```python
import time
import requests
from concurrent.futures import ThreadPoolExecutor

urls = [
    'https://jsonplaceholder.typicode.com/posts/1',
    'https://jsonplaceholder.typicode.com/posts/2',
    'https://jsonplaceholder.typicode.com/posts/3',
    'https://jsonplaceholder.typicode.com/posts/4',
]

def fetch_url(url):
    response = requests.get(url)
    return len(response.content)

# Sequential execution
start = time.time()
results_seq = [fetch_url(url) for url in urls]
print(f"Sequential: {time.time() - start:.2f}s")

# Concurrent execution
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results_con = list(executor.map(fetch_url, urls))
print(f"Concurrent: {time.time() - start:.2f}s")
```
On a typical network connection, the concurrent version runs 3-4x faster because threads wait for I/O in parallel rather than sequentially.
## ThreadPoolExecutor Fundamentals

`ThreadPoolExecutor` manages a pool of worker threads, distributing tasks among them automatically. The context manager pattern ensures proper cleanup even if exceptions occur.

Basic usage with `submit()` returns `Future` objects immediately, allowing you to continue program execution while tasks run in the background:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch_user_data(user_id):
    url = f'https://jsonplaceholder.typicode.com/users/{user_id}'
    response = requests.get(url)
    return response.json()

with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks and get Future objects
    futures = {executor.submit(fetch_user_data, i): i for i in range(1, 11)}

    # Process results as they complete
    for future in as_completed(futures):
        user_id = futures[future]
        try:
            data = future.result()
            print(f"User {user_id}: {data['name']}")
        except Exception as e:
            print(f"User {user_id} failed: {e}")
```
The `max_workers` parameter controls pool size. For I/O-bound tasks, you can often use more workers than CPU cores since threads spend most time waiting. Start with `min(32, os.cpu_count() + 4)` as a reasonable default for I/O operations.
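As a quick sketch of that rule of thumb, you can compute the same formula `ThreadPoolExecutor` itself has used for its default `max_workers` since Python 3.8:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Reasonable starting point for I/O-bound pools: the same formula
# ThreadPoolExecutor uses for its default max_workers (Python 3.8+).
# os.cpu_count() can return None, hence the "or 1" fallback.
io_workers = min(32, (os.cpu_count() or 1) + 4)

with ThreadPoolExecutor(max_workers=io_workers) as executor:
    results = list(executor.map(str.upper, ["a", "b", "c"]))

print(io_workers, results)
```

Treat the number as a starting point, not a law: the right pool size depends on how long your tasks block and what the remote end can tolerate.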
## ProcessPoolExecutor Fundamentals

When your bottleneck is CPU computation rather than I/O, `ProcessPoolExecutor` provides true parallelism by running tasks in separate Python processes. Each process has its own GIL, enabling simultaneous execution on multiple cores.
Here’s a CPU-bound task where processes dramatically outperform threads:
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

def count_primes(bounds):
    # Takes a (start, end) tuple so it can be passed directly to
    # executor.map(); it must be a module-level function, because a
    # lambda cannot be pickled for the process pool.
    start, end = bounds
    return sum(1 for n in range(start, end) if is_prime(n))

ranges = [(i, i + 10000) for i in range(0, 100000, 10000)]

if __name__ == '__main__':
    # Thread pool (limited by GIL)
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(count_primes, ranges))
    print(f"Threads: {time.time() - start:.2f}s, Total primes: {sum(results)}")

    # Process pool (true parallelism); the __main__ guard lets spawned
    # worker processes re-import this module safely
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(count_primes, ranges))
    print(f"Processes: {time.time() - start:.2f}s, Total primes: {sum(results)}")
```
On a quad-core machine, `ProcessPoolExecutor` typically runs 3-4x faster for this workload. However, processes have startup overhead and must pickle/unpickle data for inter-process communication, making them inefficient for small, quick tasks.
## Working with Futures

`Future` objects represent pending computations. They provide methods to check status, retrieve results, and handle exceptions:
```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_operation(n):
    time.sleep(n)
    if n == 3:
        raise ValueError(f"Number {n} is unlucky!")
    return n * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_operation, i) for i in [1, 2, 3, 4]]

    for i, future in enumerate(futures):
        print(f"Future {i} done: {future.done()}")

    time.sleep(2)

    for i, future in enumerate(futures):
        try:
            # Timeout prevents indefinite blocking
            result = future.result(timeout=2)
            print(f"Future {i} result: {result}")
        except ValueError as e:
            print(f"Future {i} raised: {e}")
        except TimeoutError:
            print(f"Future {i} timed out")
```

Note the `TimeoutError` import: before Python 3.11, `concurrent.futures.TimeoutError` was a distinct class rather than an alias of the builtin, so catching the bare builtin would miss it.
Callbacks execute when futures complete, useful for updating progress indicators or aggregating results:
```python
# Continues the previous example, reusing slow_operation
def process_result(future):
    try:
        result = future.result()
        print(f"Processed: {result}")
    except Exception as e:
        print(f"Error: {e}")

with ThreadPoolExecutor(max_workers=2) as executor:
    for i in range(5):
        future = executor.submit(slow_operation, i)
        future.add_done_callback(process_result)
```
## Executor Patterns and Best Practices

The `map()` method provides a cleaner interface when you have a simple function to apply to multiple inputs:
```python
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item.upper()

items = ['apple', 'banana', 'cherry', 'date']

with ThreadPoolExecutor(max_workers=2) as executor:
    # map() returns results in original order
    results = list(executor.map(process_item, items))
    print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']
```
Use `as_completed()` when you want to process results as soon as they're available rather than waiting for all tasks:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def variable_task(n):
    time.sleep(random.uniform(0.1, 2.0))
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(variable_task, i): i for i in range(10)}

    # Timeout must cover the whole batch: 10 tasks of up to 2s on
    # 4 workers can take ~6s in the worst case
    for future in as_completed(futures, timeout=10):
        original_value = futures[future]
        try:
            result = future.result()
            print(f"Task {original_value} completed with result {result}")
        except Exception as e:
            print(f"Task {original_value} failed: {e}")
```
Always handle exceptions within your task functions or when retrieving results. Unhandled exceptions are stored in the `Future` and raised when you call `result()`.
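A minimal sketch of that behavior: the exception stays parked in the `Future` until you ask for it, either via `exception()` (which returns it without re-raising) or `result()` (which re-raises it in the caller's thread):

```python
from concurrent.futures import ThreadPoolExecutor

def failing_task():
    raise RuntimeError("boom")

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(failing_task)

# The worker thread already raised, but nothing propagated yet.
# exception() returns the stored exception without raising it...
stored = future.exception()
print(type(stored).__name__)  # RuntimeError

# ...while result() re-raises it in the calling thread.
try:
    future.result()
except RuntimeError as e:
    caught = str(e)
print(caught)  # boom
```

This is why a fire-and-forget `submit()` whose `Future` is never inspected can fail completely silently.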
## Real-World Use Case
Here’s a practical data processing pipeline that downloads data, processes it with CPU-intensive operations, and saves results:
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import json

def download_data(url):
    """I/O-bound: use threads"""
    response = requests.get(url)
    return response.json()

def process_data(data):
    """CPU-bound: use processes"""
    # Simulate expensive computation
    result = sum(item['id'] ** 2 for item in data if 'id' in item)
    return {'processed': result, 'count': len(data)}

def save_result(result):
    """I/O-bound: use threads"""
    with open(f"result_{result['count']}.json", 'w') as f:
        json.dump(result, f)
    return result['count']

urls = [f'https://jsonplaceholder.typicode.com/posts?userId={i}'
        for i in range(1, 6)]

if __name__ == '__main__':
    # Stage 1: Download with threads
    with ThreadPoolExecutor(max_workers=5) as executor:
        downloaded_data = list(executor.map(download_data, urls))
    print(f"Downloaded {len(downloaded_data)} datasets")

    # Stage 2: Process with processes; the __main__ guard is required so
    # spawned worker processes can re-import this module without
    # re-running the pipeline
    with ProcessPoolExecutor(max_workers=4) as executor:
        processed_data = list(executor.map(process_data, downloaded_data))
    print(f"Processed {len(processed_data)} datasets")

    # Stage 3: Save with threads
    with ThreadPoolExecutor(max_workers=3) as executor:
        saved_counts = list(executor.map(save_result, processed_data))
    print(f"Saved {sum(saved_counts)} total items")
```
This pattern maximizes efficiency by using the right tool for each stage: threads for I/O, processes for computation.
## Common Pitfalls and Performance Tips
Don’t create more workers than beneficial. Too many threads thrash the scheduler; too many processes exhaust memory:
```python
import time
from concurrent.futures import ThreadPoolExecutor

def tiny_task(n):
    return n * 2

data = list(range(100))

# Bad: overhead dominates
start = time.time()
with ThreadPoolExecutor(max_workers=100) as executor:
    results = list(executor.map(tiny_task, data))
print(f"100 workers: {time.time() - start:.4f}s")

# Good: reasonable worker count
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(tiny_task, data))
print(f"4 workers: {time.time() - start:.4f}s")

# Best: no concurrency for trivial tasks
start = time.time()
results = [tiny_task(n) for n in data]
print(f"Sequential: {time.time() - start:.4f}s")
```
For `ProcessPoolExecutor`, remember that arguments and return values must be picklable. Lambda functions, local classes, and certain objects can't be pickled. Define functions at module level and use simple data types when possible.
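You can check the constraint without spinning up a pool by trying to pickle the callables directly. A sketch: `functools.partial` over a module-level function is a common, picklable substitute for the lambda that would fail inside a process pool.

```python
import pickle
from functools import partial

def scale(factor, n):
    """Module-level function: picklable, so safe for ProcessPoolExecutor."""
    return factor * n

# A lambda cannot be pickled -- this is exactly what breaks when one is
# passed to ProcessPoolExecutor.map()
try:
    pickle.dumps(lambda n: n * 10)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError, TypeError):
    lambda_picklable = False

# A partial wrapping a module-level function round-trips fine
payload = pickle.dumps(partial(scale, 10))
restored = pickle.loads(payload)

print(lambda_picklable)  # False
print(restored(5))       # 50
```

The same test applies to arguments and return values: if `pickle.dumps` rejects an object, the process pool will too.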
Set appropriate pool sizes: for CPU-bound work, use `max_workers=os.cpu_count()`. For I/O-bound work, experiment with higher values based on your specific workload characteristics. Monitor system resources to find the sweet spot.
Finally, don’t use concurrency when tasks are too small or when sequential execution is already fast enough. The overhead of thread/process management can make concurrent code slower than sequential code for trivial operations. Profile first, optimize second.