Python - Multiprocessing Tutorial

Key Insights

  • Python’s multiprocessing module bypasses the Global Interpreter Lock (GIL) by spawning separate processes, enabling true parallel execution on multi-core systems for CPU-bound tasks
  • Process pools with Pool.map() and Pool.starmap() provide simple parallelization patterns, while Process objects offer fine-grained control for complex workflows
  • Shared-memory objects (Value, Array), Manager proxies, and queues enable inter-process communication, though serialization and synchronization overhead make them unsuitable for high-frequency data exchange

Understanding the GIL Problem

Python’s Global Interpreter Lock prevents multiple threads from executing Python bytecode simultaneously. For I/O-bound operations, threading works fine since threads release the GIL during I/O waits. For CPU-intensive tasks, you need multiprocessing.

import time
import threading
import multiprocessing

def cpu_intensive_task(n):
    """Simulate CPU-bound work"""
    count = 0
    for i in range(n):
        count += i ** 2
    return count

# Threading approach - limited by GIL
def test_threading():
    start = time.time()
    threads = []
    for _ in range(4):
        t = threading.Thread(target=cpu_intensive_task, args=(10_000_000,))
        t.start()
        threads.append(t)
    
    for t in threads:
        t.join()
    
    print(f"Threading: {time.time() - start:.2f}s")

# Multiprocessing approach - true parallelism
def test_multiprocessing():
    start = time.time()
    processes = []
    for _ in range(4):
        p = multiprocessing.Process(target=cpu_intensive_task, args=(10_000_000,))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    print(f"Multiprocessing: {time.time() - start:.2f}s")

if __name__ == '__main__':
    test_threading()        # ~4.2s on quad-core
    test_multiprocessing()  # ~1.1s on quad-core

Process Pools for Simple Parallelization

Process pools abstract away process management. Use Pool.map() when you have a function and an iterable of arguments.

from multiprocessing import Pool
import time

def process_data(item):
    """Process a single item"""
    result = sum(i ** 2 for i in range(item))
    return result

def sequential_processing(data):
    start = time.time()
    results = [process_data(item) for item in data]
    print(f"Sequential: {time.time() - start:.2f}s")
    return results

def parallel_processing(data):
    start = time.time()
    with Pool(processes=4) as pool:
        results = pool.map(process_data, data)
    print(f"Parallel: {time.time() - start:.2f}s")
    return results

if __name__ == '__main__':
    data = [1_000_000] * 8
    
    seq_results = sequential_processing(data)
    par_results = parallel_processing(data)
    
    assert seq_results == par_results

For functions requiring multiple arguments, use starmap():

from multiprocessing import Pool

def calculate_range(start, end, step):
    """Calculate sum of range with custom step"""
    return sum(range(start, end, step))

if __name__ == '__main__':
    # List of argument tuples
    tasks = [
        (0, 1_000_000, 1),
        (0, 1_000_000, 2),
        (0, 1_000_000, 3),
        (0, 1_000_000, 4),
    ]
    
    with Pool(processes=4) as pool:
        results = pool.starmap(calculate_range, tasks)
    
    print(results)

Handling Return Values and Error Management

Use apply_async() for non-blocking execution with callbacks:

from multiprocessing import Pool
import time

def long_running_task(x):
    time.sleep(2)
    if x == 3:
        raise ValueError(f"Invalid input: {x}")
    return x * x

def success_callback(result):
    print(f"Success: {result}")

def error_callback(error):
    print(f"Error: {error}")

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        results = []
        
        for i in range(5):
            result = pool.apply_async(
                long_running_task,
                args=(i,),
                callback=success_callback,
                error_callback=error_callback
            )
            results.append(result)
        
        # Wait for all tasks to complete
        pool.close()
        pool.join()
        
        # Get results (will raise exception if task failed)
        for i, r in enumerate(results):
            try:
                value = r.get(timeout=1)
                print(f"Task {i}: {value}")
            except Exception as e:
                print(f"Task {i} failed: {e}")

Inter-Process Communication with Queues

Queues enable safe data exchange between processes:

from multiprocessing import Process, Queue
import time

def producer(queue, items):
    """Produce items and put them in queue"""
    for item in items:
        print(f"Producing: {item}")
        queue.put(item)
        time.sleep(0.5)
    queue.put(None)  # Sentinel value to signal completion

def consumer(queue):
    """Consume items from queue"""
    while True:
        item = queue.get()
        if item is None:  # Check for sentinel
            break
        print(f"Consuming: {item}")
        time.sleep(1)

if __name__ == '__main__':
    q = Queue()
    
    items = ['data_1', 'data_2', 'data_3', 'data_4']
    
    prod = Process(target=producer, args=(q, items))
    cons = Process(target=consumer, args=(q,))
    
    prod.start()
    cons.start()
    
    prod.join()
    cons.join()

Shared Memory for Performance

Shared memory objects avoid serialization overhead but require synchronization:

from multiprocessing import Process, Value, Array, Lock
import time

def increment_counter(counter, lock, iterations):
    """Safely increment shared counter"""
    for _ in range(iterations):
        with lock:
            counter.value += 1

def update_array(shared_array, lock, index, value):
    """Update shared array element"""
    with lock:
        shared_array[index] = value

if __name__ == '__main__':
    # Shared value with lock
    counter = Value('i', 0)  # 'i' = integer
    lock = Lock()
    
    processes = []
    for _ in range(4):
        p = Process(target=increment_counter, args=(counter, lock, 25000))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    print(f"Final counter value: {counter.value}")  # Should be 100000
    
    # Shared array
    shared_arr = Array('d', [0.0] * 10)  # 'd' = double
    arr_lock = Lock()
    
    processes = []
    for i in range(10):
        p = Process(target=update_array, args=(shared_arr, arr_lock, i, i * 1.5))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    print(f"Shared array: {list(shared_arr)}")

Manager for Complex Shared Objects

Use Manager for dictionaries, lists, and custom objects:

from multiprocessing import Process, Manager
import time

def update_shared_dict(shared_dict, key, value):
    """Update shared dictionary"""
    shared_dict[key] = value
    time.sleep(0.1)

def append_to_list(shared_list, value):
    """Append to shared list"""
    shared_list.append(value)

if __name__ == '__main__':
    with Manager() as manager:
        # Shared dictionary
        shared_dict = manager.dict()
        
        processes = []
        for i in range(5):
            p = Process(target=update_shared_dict, args=(shared_dict, f'key_{i}', i * 10))
            p.start()
            processes.append(p)
        
        for p in processes:
            p.join()
        
        print(f"Shared dict: {dict(shared_dict)}")
        
        # Shared list
        shared_list = manager.list()
        
        processes = []
        for i in range(5):
            p = Process(target=append_to_list, args=(shared_list, i))
            p.start()
            processes.append(p)
        
        for p in processes:
            p.join()
        
        print(f"Shared list: {list(shared_list)}")

Process Synchronization Primitives

Control process execution order with Events and Semaphores:

from multiprocessing import Process, Event, Semaphore
import time

def wait_for_event(event, process_id):
    """Wait for event before proceeding"""
    print(f"Process {process_id} waiting...")
    event.wait()
    print(f"Process {process_id} proceeding!")

def limited_resource(semaphore, process_id):
    """Access limited resource using semaphore"""
    print(f"Process {process_id} attempting to acquire...")
    with semaphore:
        print(f"Process {process_id} acquired resource")
        time.sleep(2)
    print(f"Process {process_id} released resource")

if __name__ == '__main__':
    # Event example
    event = Event()
    
    processes = []
    for i in range(3):
        p = Process(target=wait_for_event, args=(event, i))
        p.start()
        processes.append(p)
    
    time.sleep(2)
    print("Setting event...")
    event.set()  # Release all waiting processes
    
    for p in processes:
        p.join()
    
    # Semaphore example - limit to 2 concurrent processes
    semaphore = Semaphore(2)
    
    processes = []
    for i in range(5):
        p = Process(target=limited_resource, args=(semaphore, i))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()

Best Practices

Choose the right process count based on workload. For CPU-bound tasks, multiprocessing.cpu_count() is a sensible upper bound; creating more processes than cores for compute-intensive work only adds scheduling overhead without improving throughput.
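One way to apply this rule is a small sizing helper. The function below, choose_worker_count, is an illustrative sketch (not a standard-library API): it caps CPU-bound work at the core count and never starts more workers than there are tasks.

```python
import multiprocessing

def choose_worker_count(task_count, cpu_bound=True):
    """Pick a worker count: never more workers than tasks, and
    for CPU-bound work never more than available cores."""
    cores = multiprocessing.cpu_count()
    if cpu_bound:
        return min(cores, task_count)
    # I/O-bound work tolerates modest oversubscription of the cores
    return min(cores * 4, task_count)

print(choose_worker_count(1_000))  # capped at cpu_count()
print(choose_worker_count(2))      # capped at the task count
```

The oversubscription factor for I/O-bound work (4x here) is a rough heuristic; tune it against your actual workload, or reach for threads instead, since I/O-bound tasks don't need separate processes at all.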

Always use if __name__ == '__main__': guards. Platforms that use the spawn start method (Windows always, macOS by default since Python 3.8) re-import the main module in every child process, and without the guard each child would spawn children of its own. Minimize data transfer between processes—serialization with pickle is expensive. For large datasets, consider memory-mapped files or multiprocessing.shared_memory.
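The multiprocessing.shared_memory module (Python 3.8+) makes the zero-copy option concrete. This minimal sketch creates a named block and attaches a second handle to it by name, exactly as a worker process would, so the bytes are never pickled:

```python
from multiprocessing import shared_memory

# Create a named 16-byte shared block; a worker process could
# attach to it by name instead of receiving a pickled copy.
shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:5] = b"hello"

# A second handle attaches to the same memory by name,
# just as another process would.
other = shared_memory.SharedMemory(name=shm.name)
data = bytes(other.buf[:5])
print(data)  # b'hello'

other.close()  # each attached handle closes its own view
shm.close()
shm.unlink()   # free the block once every process has closed it
```

In a real worker you would pass shm.name through Process args and attach inside the child; remember that exactly one process should call unlink(), and only after all others have closed their handles.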

Handle process cleanup properly. Use context managers (with Pool()) or explicitly call close() or terminate(), then join(). Orphaned processes consume system resources.
