Python Multiprocessing: Parallel Execution Guide

Key Insights

  • Python’s Global Interpreter Lock (GIL) prevents true parallel execution of threads for CPU-bound tasks, making multiprocessing essential for leveraging multiple cores
  • The multiprocessing.Pool class provides the most practical interface for parallelizing data processing tasks with minimal boilerplate code
  • Inter-process communication requires serializable objects (pickling), and shared state must be explicitly synchronized with locks to prevent race conditions

Introduction to Multiprocessing vs Threading

Python’s Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecode simultaneously. This means that even on a multi-core processor, CPU-bound Python threads execute sequentially. The GIL exists because Python’s memory management isn’t thread-safe, but it creates a significant performance bottleneck.

For I/O-bound operations—network requests, file operations, database queries—threading works well because threads release the GIL while waiting for I/O. But for CPU-intensive tasks like data processing, image manipulation, or mathematical computations, you need true parallelism. That’s where multiprocessing comes in.

Here’s a concrete benchmark demonstrating the difference:

import time
import threading
import multiprocessing

def calculate_primes(n):
    """CPU-intensive task: count primes up to n"""
    primes = 0
    for num in range(2, n):
        if all(num % i != 0 for i in range(2, int(num ** 0.5) + 1)):
            primes += 1
    return primes

def benchmark_sequential(iterations):
    start = time.time()
    results = [calculate_primes(50000) for _ in range(iterations)]
    return time.time() - start

def benchmark_threading(iterations):
    start = time.time()
    threads = [threading.Thread(target=calculate_primes, args=(50000,)) 
               for _ in range(iterations)]
    for t in threads: t.start()
    for t in threads: t.join()
    return time.time() - start

def benchmark_multiprocessing(iterations):
    start = time.time()
    with multiprocessing.Pool() as pool:
        results = pool.map(calculate_primes, [50000] * iterations)
    return time.time() - start

if __name__ == '__main__':
    iterations = 4
    print(f"Sequential: {benchmark_sequential(iterations):.2f}s")
    print(f"Threading: {benchmark_threading(iterations):.2f}s")
    print(f"Multiprocessing: {benchmark_multiprocessing(iterations):.2f}s")

On a quad-core machine, threading performs roughly on par with sequential execution (the GIL serializes the CPU-bound work), while multiprocessing achieves near-linear speedup.

Getting Started with the Multiprocessing Module

The Process class is the fundamental building block. Each Process object represents an activity running in a separate process with its own Python interpreter and memory space.

from multiprocessing import Process
import os

def worker(name, iterations):
    """Function to run in separate process"""
    print(f"Worker {name} starting (PID: {os.getpid()})")
    total = sum(i * i for i in range(iterations))
    print(f"Worker {name} finished: {total}")

if __name__ == '__main__':
    print(f"Main process PID: {os.getpid()}")
    
    # Create processes
    p1 = Process(target=worker, args=('A', 1000000))
    p2 = Process(target=worker, args=('B', 2000000))
    
    # Start processes
    p1.start()
    p2.start()
    
    # Wait for completion
    p1.join()
    p2.join()
    
    print("All workers completed")

The if __name__ == '__main__' guard is critical on Windows and macOS, where the default spawn start method makes each new process re-import the main module, and it is recommended everywhere. Without the guard, the process-creation code would run again on import, so each child would spawn its own children, creating an infinite cascade of processes.

You can also pass keyword arguments and use daemon processes:

if __name__ == '__main__':
    p = Process(target=worker, kwargs={'name': 'C', 'iterations': 500000})
    p.daemon = True  # Terminated automatically when the main process exits
    p.start()
    p.join(timeout=5)  # Wait max 5 seconds
    
    if p.is_alive():
        p.terminate()  # Force kill
        p.join()

Process Pools and Parallel Mapping

For most real-world scenarios, the Pool class provides a superior interface. It manages a pool of worker processes, distributing tasks and collecting results automatically.

from multiprocessing import Pool
import time

def process_item(item):
    """Simulate processing work"""
    time.sleep(0.1)
    return item * item

if __name__ == '__main__':
    data = range(20)
    
    # Automatic worker count (CPU cores)
    with Pool() as pool:
        results = pool.map(process_item, data)
    
    print(f"Results: {results}")
    
    # Specify worker count
    with Pool(processes=4) as pool:
        # For functions with multiple arguments
        pairs = [(2, 3), (4, 5), (6, 7)]
        results = pool.starmap(pow, pairs)
        print(f"Powers: {results}")

For non-blocking execution with callbacks, use apply_async():

def process_callback(result):
    print(f"Got result: {result}")

def error_callback(error):
    print(f"Error occurred: {error}")

if __name__ == '__main__':
    with Pool(4) as pool:
        # Submit tasks asynchronously
        async_results = []
        for i in range(10):
            result = pool.apply_async(
                process_item, 
                args=(i,),
                callback=process_callback,
                error_callback=error_callback
            )
            async_results.append(result)
        
        # Wait for all tasks
        for result in async_results:
            result.wait()

The imap() method returns an iterator, useful for processing large datasets without loading all results into memory:

if __name__ == '__main__':
    with Pool() as pool:
        for result in pool.imap(process_item, range(1000000)):
            # Process results as they arrive
            if result > 1000:
                break

Inter-Process Communication

Processes don’t share memory by default. You need explicit mechanisms for communication.

Queues are the most common approach, providing thread and process-safe FIFO communication:

from multiprocessing import Process, Queue

def producer(queue, items):
    for item in items:
        queue.put(item)
        print(f"Produced: {item}")
    queue.put(None)  # Sentinel value

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")

if __name__ == '__main__':
    q = Queue()
    items = [1, 2, 3, 4, 5]
    
    prod = Process(target=producer, args=(q, items))
    cons = Process(target=consumer, args=(q,))
    
    prod.start()
    cons.start()
    
    prod.join()
    cons.join()

Pipes provide two-way communication between exactly two processes:

from multiprocessing import Process, Pipe

def worker_process(conn):
    conn.send("Hello from worker")
    response = conn.recv()
    print(f"Worker received: {response}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker_process, args=(child_conn,))
    p.start()
    
    msg = parent_conn.recv()
    print(f"Parent received: {msg}")
    parent_conn.send("Hello from parent")
    
    p.join()

Shared memory with Value and Array for simple data types:

from multiprocessing import Process, Value, Lock

def increment_counter(counter, lock, iterations):
    for _ in range(iterations):
        with lock:  # Prevent race conditions
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)  # 'i' = integer
    lock = Lock()
    
    processes = [
        Process(target=increment_counter, args=(counter, lock, 10000))
        for _ in range(4)
    ]
    
    for p in processes: p.start()
    for p in processes: p.join()
    
    print(f"Final count: {counter.value}")  # Should be 40000

Synchronization and Shared State

Without proper synchronization, concurrent access to shared resources causes race conditions. Here’s a demonstration:

from multiprocessing import Process, Value, Lock
import time

def unsafe_increment(counter, iterations):
    """Race condition: read-modify-write not atomic"""
    for _ in range(iterations):
        temp = counter.value
        time.sleep(0.000001)  # Exaggerate timing issue
        counter.value = temp + 1

def safe_increment(counter, lock, iterations):
    """Protected with lock"""
    for _ in range(iterations):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    # Unsafe version
    counter = Value('i', 0)
    procs = [Process(target=unsafe_increment, args=(counter, 100)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(f"Unsafe result: {counter.value} (expected 400)")
    
    # Safe version
    counter = Value('i', 0)
    lock = Lock()
    procs = [Process(target=safe_increment, args=(counter, lock, 100)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(f"Safe result: {counter.value}")

Use Event for signaling between processes:

from multiprocessing import Process, Event
import time

def wait_for_signal(event, name):
    print(f"{name} waiting...")
    event.wait()  # Block until set
    print(f"{name} proceeding!")

if __name__ == '__main__':
    event = Event()
    workers = [Process(target=wait_for_signal, args=(event, f"Worker-{i}")) 
               for i in range(3)]
    
    for w in workers: w.start()
    time.sleep(2)
    event.set()  # Release all workers
    for w in workers: w.join()

Best Practices and Common Pitfalls

Pickling limitations: Objects passed between processes must be serializable. Lambda functions, local classes, and certain objects won’t work:

from multiprocessing import Pool

# This fails - lambdas can't be pickled
# pool.map(lambda x: x * 2, data)

# Use a named, module-level function instead
def double(x):
    return x * 2

if __name__ == '__main__':
    data = [1, 2, 3]
    with Pool() as pool:
        results = pool.map(double, data)

Process initialization for expensive setup:

import sqlite3

def init_worker():
    """Called once per worker process; each worker opens its own
    connection because connection objects can't be pickled"""
    global db_conn
    db_conn = sqlite3.connect('data.db')

def query_database(query):
    return db_conn.execute(query).fetchall()

if __name__ == '__main__':
    queries = ["SELECT 1", "SELECT 2"]  # example read-only statements
    with Pool(initializer=init_worker) as pool:
        results = pool.map(query_database, queries)

Error handling in pools:

def risky_function(x):
    if x < 0:
        raise ValueError("Negative value")
    return x * 2

if __name__ == '__main__':
    data = [1, 2, -3, 4]
    with Pool() as pool:
        try:
            results = pool.map(risky_function, data)
        except ValueError as e:
            print(f"Error in worker: {e}")

Real-World Use Case: Parallel Image Processing

Here’s a complete example that processes images in parallel and summarizes per-file results:

from multiprocessing import Pool
from pathlib import Path
from PIL import Image
import time

def process_image(args):
    """Resize image and return stats"""
    filepath, output_dir, size = args
    try:
        img = Image.open(filepath)
        img.thumbnail(size)
        
        output_path = output_dir / filepath.name
        img.save(output_path)
        
        return {'file': filepath.name, 'status': 'success', 'size': img.size}
    except Exception as e:
        return {'file': filepath.name, 'status': 'error', 'error': str(e)}

def init_pool_worker():
    """Suppress warning noise from worker processes"""
    import warnings
    warnings.filterwarnings('ignore')

if __name__ == '__main__':
    input_dir = Path('images')
    output_dir = Path('thumbnails')
    output_dir.mkdir(exist_ok=True)
    
    image_files = list(input_dir.glob('*.jpg'))
    tasks = [(f, output_dir, (300, 300)) for f in image_files]
    
    start_time = time.time()
    
    with Pool(processes=4, initializer=init_pool_worker) as pool:
        results = pool.map(process_image, tasks)
    
    # Analyze results
    successful = sum(1 for r in results if r['status'] == 'success')
    failed = sum(1 for r in results if r['status'] == 'error')
    
    print(f"Processed {len(results)} images in {time.time() - start_time:.2f}s")
    print(f"Success: {successful}, Failed: {failed}")
    
    for result in results:
        if result['status'] == 'error':
            print(f"Error processing {result['file']}: {result['error']}")

Multiprocessing unlocks Python’s full potential on modern multi-core systems. Use Pool for data processing tasks, implement proper synchronization for shared state, and always remember the pickling constraints. With these patterns, you can achieve substantial performance improvements for CPU-bound workloads.
