Python Multiprocessing: Parallel Execution Guide
Key Insights
- Python’s Global Interpreter Lock (GIL) prevents true parallel execution of threads for CPU-bound tasks, making multiprocessing essential for leveraging multiple cores
- The multiprocessing.Pool class provides the most practical interface for parallelizing data processing tasks with minimal boilerplate code
- Inter-process communication requires serializable objects (pickling), and shared state must be explicitly synchronized with locks to prevent race conditions
Introduction to Multiprocessing vs Threading
Python’s Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecode simultaneously. This means that even on a multi-core processor, CPU-bound Python threads execute sequentially. The GIL exists because Python’s memory management isn’t thread-safe, but it creates a significant performance bottleneck.
For I/O-bound operations—network requests, file operations, database queries—threading works well because threads release the GIL while waiting for I/O. But for CPU-intensive tasks like data processing, image manipulation, or mathematical computations, you need true parallelism. That’s where multiprocessing comes in.
Here’s a concrete benchmark demonstrating the difference:
import time
import threading
import multiprocessing

def calculate_primes(n):
    """CPU-intensive task: count primes up to n"""
    primes = 0
    for num in range(2, n):
        if all(num % i != 0 for i in range(2, int(num ** 0.5) + 1)):
            primes += 1
    return primes

def benchmark_sequential(iterations):
    start = time.time()
    results = [calculate_primes(50000) for _ in range(iterations)]
    return time.time() - start

def benchmark_threading(iterations):
    start = time.time()
    threads = [threading.Thread(target=calculate_primes, args=(50000,))
               for _ in range(iterations)]
    for t in threads: t.start()
    for t in threads: t.join()
    return time.time() - start

def benchmark_multiprocessing(iterations):
    start = time.time()
    with multiprocessing.Pool() as pool:
        results = pool.map(calculate_primes, [50000] * iterations)
    return time.time() - start

if __name__ == '__main__':
    iterations = 4
    print(f"Sequential: {benchmark_sequential(iterations):.2f}s")
    print(f"Threading: {benchmark_threading(iterations):.2f}s")
    print(f"Multiprocessing: {benchmark_multiprocessing(iterations):.2f}s")
On a quad-core machine, you’ll see threading performs similarly to sequential execution (due to the GIL), while multiprocessing achieves near-linear speedup.
Getting Started with the Multiprocessing Module
The Process class is the fundamental building block. Each Process object represents an activity running in a separate process with its own Python interpreter and memory space.
from multiprocessing import Process
import os

def worker(name, iterations):
    """Function to run in separate process"""
    print(f"Worker {name} starting (PID: {os.getpid()})")
    total = sum(i * i for i in range(iterations))
    print(f"Worker {name} finished: {total}")

if __name__ == '__main__':
    print(f"Main process PID: {os.getpid()}")

    # Create processes
    p1 = Process(target=worker, args=('A', 1000000))
    p2 = Process(target=worker, args=('B', 2000000))

    # Start processes
    p1.start()
    p2.start()

    # Wait for completion
    p1.join()
    p2.join()

    print("All workers completed")
The if __name__ == '__main__' guard is critical on Windows and recommended everywhere. When a new process starts, it imports the main module. Without this guard, each child process would spawn its own children, creating an infinite cascade of processes.
You can also pass keyword arguments and use daemon processes:
if __name__ == '__main__':
    p = Process(target=worker, kwargs={'name': 'C', 'iterations': 500000})
    p.daemon = True       # Dies when main process exits
    p.start()
    p.join(timeout=5)     # Wait max 5 seconds
    if p.is_alive():
        p.terminate()     # Force kill
        p.join()
Process Pools and Parallel Mapping
For most real-world scenarios, the Pool class provides a higher-level interface than managing Process objects by hand. It maintains a fixed pool of worker processes, distributing tasks and collecting results automatically.
from multiprocessing import Pool
import time

def process_item(item):
    """Simulate processing work"""
    time.sleep(0.1)
    return item * item

if __name__ == '__main__':
    data = range(20)

    # Automatic worker count (CPU cores)
    with Pool() as pool:
        results = pool.map(process_item, data)
    print(f"Results: {results}")

    # Specify worker count
    with Pool(processes=4) as pool:
        # For functions with multiple arguments
        pairs = [(2, 3), (4, 5), (6, 7)]
        results = pool.starmap(pow, pairs)
    print(f"Powers: {results}")
For non-blocking execution with callbacks, use apply_async():
def process_callback(result):
    print(f"Got result: {result}")

def error_callback(error):
    print(f"Error occurred: {error}")

if __name__ == '__main__':
    with Pool(4) as pool:
        # Submit tasks asynchronously
        async_results = []
        for i in range(10):
            result = pool.apply_async(
                process_item,
                args=(i,),
                callback=process_callback,
                error_callback=error_callback
            )
            async_results.append(result)

        # Wait for all tasks
        for result in async_results:
            result.wait()
The imap() method returns an iterator, useful for processing large datasets without loading all results into memory:
if __name__ == '__main__':
    with Pool() as pool:
        for result in pool.imap(process_item, range(1000000)):
            # Process results as they arrive
            if result > 1000:
                break
Inter-Process Communication
Processes don’t share memory by default. You need explicit mechanisms for communication.
Queues are the most common approach, providing thread and process-safe FIFO communication:
from multiprocessing import Process, Queue

def producer(queue, items):
    for item in items:
        queue.put(item)
        print(f"Produced: {item}")
    queue.put(None)  # Sentinel value

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")

if __name__ == '__main__':
    q = Queue()
    items = [1, 2, 3, 4, 5]

    prod = Process(target=producer, args=(q, items))
    cons = Process(target=consumer, args=(q,))

    prod.start()
    cons.start()
    prod.join()
    cons.join()
Pipes provide two-way communication between exactly two processes:
from multiprocessing import Process, Pipe

def worker_process(conn):
    conn.send("Hello from worker")
    response = conn.recv()
    print(f"Worker received: {response}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker_process, args=(child_conn,))
    p.start()

    msg = parent_conn.recv()
    print(f"Parent received: {msg}")
    parent_conn.send("Hello from parent")

    p.join()
Shared memory with Value and Array for simple data types:
from multiprocessing import Process, Value, Lock

def increment_counter(counter, lock, iterations):
    for _ in range(iterations):
        with lock:  # Prevent race conditions
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)  # 'i' = signed integer
    lock = Lock()

    processes = [
        Process(target=increment_counter, args=(counter, lock, 10000))
        for _ in range(4)
    ]

    for p in processes: p.start()
    for p in processes: p.join()

    print(f"Final count: {counter.value}")  # Should be 40000
Synchronization and Shared State
Without proper synchronization, concurrent access to shared resources causes race conditions. Here’s a demonstration:
from multiprocessing import Process, Value, Lock
import time

def unsafe_increment(counter, iterations):
    """Race condition: read-modify-write not atomic"""
    for _ in range(iterations):
        temp = counter.value
        time.sleep(0.000001)  # Exaggerate timing issue
        counter.value = temp + 1

def safe_increment(counter, lock, iterations):
    """Protected with lock"""
    for _ in range(iterations):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    # Unsafe version
    counter = Value('i', 0)
    procs = [Process(target=unsafe_increment, args=(counter, 100)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(f"Unsafe result: {counter.value} (expected 400)")

    # Safe version
    counter = Value('i', 0)
    lock = Lock()
    procs = [Process(target=safe_increment, args=(counter, lock, 100)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(f"Safe result: {counter.value}")
Use Event for signaling between processes:
from multiprocessing import Process, Event
import time

def wait_for_signal(event, name):
    print(f"{name} waiting...")
    event.wait()  # Block until set
    print(f"{name} proceeding!")

if __name__ == '__main__':
    event = Event()
    workers = [Process(target=wait_for_signal, args=(event, f"Worker-{i}"))
               for i in range(3)]

    for w in workers: w.start()
    time.sleep(2)
    event.set()  # Release all workers
    for w in workers: w.join()
Best Practices and Common Pitfalls
Pickling limitations: Objects passed between processes must be serializable. Lambda functions, local classes, and certain objects won’t work:
from multiprocessing import Pool

# This fails - lambdas can't be pickled
# pool.map(lambda x: x * 2, data)

# Use named, module-level functions instead
def double(x):
    return x * 2

if __name__ == '__main__':
    data = [1, 2, 3, 4]
    with Pool() as pool:
        results = pool.map(double, data)
Process initialization for expensive setup:
from multiprocessing import Pool
import sqlite3

def init_worker():
    """Called once per worker process"""
    global db_conn
    db_conn = sqlite3.connect('data.db')

def query_database(query):
    return db_conn.execute(query).fetchall()

if __name__ == '__main__':
    queries = ["SELECT * FROM users", "SELECT * FROM orders"]  # example queries
    with Pool(initializer=init_worker) as pool:
        results = pool.map(query_database, queries)
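When the setup itself needs parameters, Pool also accepts initargs, which are passed to the initializer in every worker. A minimal sketch (init_worker and add_offset here are illustrative, not part of the library):

```python
from multiprocessing import Pool

def init_worker(base):
    # Runs once per worker; stash per-process state in a global
    global offset
    offset = base

def add_offset(x):
    return x + offset

if __name__ == '__main__':
    with Pool(2, initializer=init_worker, initargs=(100,)) as pool:
        print(pool.map(add_offset, [1, 2, 3]))  # [101, 102, 103]
```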
Error handling in pools:
from multiprocessing import Pool

def risky_function(x):
    if x < 0:
        raise ValueError("Negative value")
    return x * 2

if __name__ == '__main__':
    data = [1, 2, -3, 4]
    with Pool() as pool:
        try:
            results = pool.map(risky_function, data)
        except ValueError as e:
            print(f"Error in worker: {e}")
Real-World Use Case: Parallel Image Processing
Here’s a complete example that resizes images in parallel and reports per-file success or failure:
from multiprocessing import Pool
from pathlib import Path
from PIL import Image
import time

def process_image(args):
    """Resize image and return stats"""
    filepath, output_dir, size = args
    try:
        img = Image.open(filepath)
        img.thumbnail(size)
        output_path = output_dir / filepath.name
        img.save(output_path)
        return {'file': filepath.name, 'status': 'success', 'size': img.size}
    except Exception as e:
        return {'file': filepath.name, 'status': 'error', 'error': str(e)}

def init_pool_worker():
    """Suppress warnings in worker processes"""
    import warnings
    warnings.filterwarnings('ignore')

if __name__ == '__main__':
    input_dir = Path('images')
    output_dir = Path('thumbnails')
    output_dir.mkdir(exist_ok=True)

    image_files = list(input_dir.glob('*.jpg'))
    tasks = [(f, output_dir, (300, 300)) for f in image_files]

    start_time = time.time()
    with Pool(processes=4, initializer=init_pool_worker) as pool:
        results = pool.map(process_image, tasks)

    # Analyze results
    successful = sum(1 for r in results if r['status'] == 'success')
    failed = sum(1 for r in results if r['status'] == 'error')

    print(f"Processed {len(results)} images in {time.time() - start_time:.2f}s")
    print(f"Success: {successful}, Failed: {failed}")

    for result in results:
        if result['status'] == 'error':
            print(f"Error processing {result['file']}: {result['error']}")
Multiprocessing unlocks Python’s full potential on modern multi-core systems. Use Pool for data processing tasks, implement proper synchronization for shared state, and always remember the pickling constraints. With these patterns, you can achieve substantial performance improvements for CPU-bound workloads.