Python - Multiprocessing Tutorial
Python's Global Interpreter Lock prevents multiple threads from executing Python bytecode simultaneously. For I/O-bound operations, threading works fine since threads release the GIL during I/O waits; for CPU-intensive tasks, you need multiprocessing.
Key Insights
- Python’s multiprocessing module bypasses the Global Interpreter Lock (GIL) by spawning separate processes, enabling true parallel execution on multi-core systems for CPU-bound tasks
- Process pools with `Pool.map()` and `Pool.starmap()` provide simple parallelization patterns, while `Process` objects offer fine-grained control for complex workflows
- Shared memory objects (`Value`, `Array`, `Manager`) and queues enable inter-process communication, though serialization overhead makes them unsuitable for high-frequency data exchange
Understanding the GIL Problem
Python’s Global Interpreter Lock prevents multiple threads from executing Python bytecode simultaneously. For I/O-bound operations, threading works fine since threads release the GIL during I/O waits. For CPU-intensive tasks, you need multiprocessing.
import time
import threading
import multiprocessing
def cpu_intensive_task(n):
    """Burn CPU by summing the squares of 0..n-1 (simulates CPU-bound work)."""
    return sum(i * i for i in range(n))
# Threading approach - limited by GIL
def test_threading():
    """Run four CPU-bound tasks in threads and print the wall-clock time.

    Threads contend for the GIL, so this runs essentially sequentially.
    """
    start = time.time()
    workers = [
        threading.Thread(target=cpu_intensive_task, args=(10_000_000,))
        for _ in range(4)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(f"Threading: {time.time() - start:.2f}s")
# Multiprocessing approach - true parallelism
def test_multiprocessing():
    """Run four CPU-bound tasks in separate processes and print wall time.

    Each process has its own interpreter and GIL, so the work runs in
    parallel on a multi-core machine.
    """
    start = time.time()
    workers = [
        multiprocessing.Process(target=cpu_intensive_task, args=(10_000_000,))
        for _ in range(4)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(f"Multiprocessing: {time.time() - start:.2f}s")
if __name__ == '__main__':
    # Compare wall-clock time: threads serialize on the GIL (~4.2s on a
    # quad-core box) while processes run truly in parallel (~1.1s).
    test_threading()
    test_multiprocessing()
Process Pools for Simple Parallelization
Process pools abstract away process management. Use Pool.map() when you have a function and an iterable of arguments.
from multiprocessing import Pool
import time
def process_data(item):
    """Return the sum of squares below *item* (a stand-in for real work)."""
    return sum(i * i for i in range(item))
def sequential_processing(data):
    """Process every item in *data* one at a time, print the elapsed time,
    and return the list of results."""
    start = time.time()
    results = []
    for item in data:
        results.append(process_data(item))
    print(f"Sequential: {time.time() - start:.2f}s")
    return results
def parallel_processing(data):
    """Process *data* across a 4-worker pool, print the elapsed time,
    and return the list of results (in input order)."""
    start = time.time()
    with Pool(processes=4) as workers:
        results = workers.map(process_data, data)
    print(f"Parallel: {time.time() - start:.2f}s")
    return results
if __name__ == '__main__':
    # Eight identical heavy items; both strategies must agree on the output.
    data = [1_000_000] * 8
    seq = sequential_processing(data)
    par = parallel_processing(data)
    assert seq == par
For functions requiring multiple arguments, use starmap():
from multiprocessing import Pool
def calculate_range(start, end, step):
    """Return the sum of every *step*-th integer in [start, end)."""
    total = 0
    for value in range(start, end, step):
        total += value
    return total
if __name__ == '__main__':
    # One (start, end, step) tuple per task; starmap unpacks each tuple
    # into positional arguments for calculate_range.
    tasks = [(0, 1_000_000, step) for step in (1, 2, 3, 4)]
    with Pool(processes=4) as pool:
        results = pool.starmap(calculate_range, tasks)
    print(results)
Handling Return Values and Error Management
Use apply_async() for non-blocking execution with callbacks:
from multiprocessing import Pool
import time
def long_running_task(x):
    """Return x squared after a 2-second delay.

    Raises ValueError for x == 3 so the error-callback path can be
    demonstrated.
    """
    time.sleep(2)
    if x != 3:
        return x * x
    raise ValueError(f"Invalid input: {x}")
def success_callback(result):
    """Run in the parent process for each task that completes normally."""
    print("Success:", result)
def error_callback(error):
    """Run in the parent process when a task raises an exception."""
    print("Error:", error)
if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Submit all five tasks without blocking; callbacks fire in the
        # parent process as each task finishes (or fails).
        pending = [
            pool.apply_async(
                long_running_task,
                args=(i,),
                callback=success_callback,
                error_callback=error_callback,
            )
            for i in range(5)
        ]
        # No further submissions; block until every worker has finished.
        pool.close()
        pool.join()
        # AsyncResult.get() re-raises the worker's exception, if any.
        for i, pending_result in enumerate(pending):
            try:
                value = pending_result.get(timeout=1)
                print(f"Task {i}: {value}")
            except Exception as e:
                print(f"Task {i} failed: {e}")
Inter-Process Communication with Queues
Queues enable safe data exchange between processes:
from multiprocessing import Process, Queue
import time
def producer(queue, items):
    """Put each item on *queue* at 0.5s intervals, then a None sentinel.

    The sentinel tells the consumer there is no more data coming.
    """
    for item in items:
        print(f"Producing: {item}")
        queue.put(item)
        time.sleep(0.5)
    queue.put(None)  # Sentinel value to signal completion
def consumer(queue):
    """Drain *queue*, stopping when the None sentinel is received."""
    item = queue.get()
    while item is not None:
        print(f"Consuming: {item}")
        time.sleep(1)
        item = queue.get()
if __name__ == '__main__':
    channel = Queue()
    payload = ['data_1', 'data_2', 'data_3', 'data_4']
    # One producer, one consumer, sharing the same queue.
    prod = Process(target=producer, args=(channel, payload))
    cons = Process(target=consumer, args=(channel,))
    prod.start()
    cons.start()
    prod.join()
    cons.join()
Shared Memory for Performance
Shared memory objects avoid serialization overhead but require synchronization:
from multiprocessing import Process, Value, Array, Lock
import time
def increment_counter(counter, lock, iterations):
    """Add *iterations* to the shared counter, one locked step at a time.

    The read-modify-write on counter.value is not atomic, so each
    increment is wrapped in the lock.
    """
    for _ in range(iterations):
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()
def update_array(shared_array, lock, index, value):
    """Write *value* into slot *index* of the shared array under *lock*."""
    lock.acquire()
    try:
        shared_array[index] = value
    finally:
        lock.release()
if __name__ == '__main__':
    # --- Shared scalar: four processes each add 25,000 under one lock ---
    counter = Value('i', 0)  # 'i' = C int
    lock = Lock()
    incrementers = [
        Process(target=increment_counter, args=(counter, lock, 25000))
        for _ in range(4)
    ]
    for proc in incrementers:
        proc.start()
    for proc in incrementers:
        proc.join()
    print(f"Final counter value: {counter.value}")  # Should be 100000

    # --- Shared array: one process per slot writes index * 1.5 ---
    shared_arr = Array('d', [0.0] * 10)  # 'd' = C double
    arr_lock = Lock()
    writers = [
        Process(target=update_array, args=(shared_arr, arr_lock, i, i * 1.5))
        for i in range(10)
    ]
    for proc in writers:
        proc.start()
    for proc in writers:
        proc.join()
    print(f"Shared array: {list(shared_arr)}")
Manager for Complex Shared Objects
Use Manager for dictionaries, lists, and custom objects:
from multiprocessing import Process, Manager
import time
def update_shared_dict(shared_dict, key, value):
    """Store *value* under *key*, then linger briefly so writers overlap."""
    shared_dict[key] = value
    time.sleep(0.1)
def append_to_list(shared_list, value):
    """Append *value* to the (manager-proxied) shared list in place."""
    shared_list.append(value)
if __name__ == '__main__':
    with Manager() as manager:
        # Managed dict: five processes each write their own key.
        shared_dict = manager.dict()
        dict_writers = [
            Process(target=update_shared_dict,
                    args=(shared_dict, f'key_{i}', i * 10))
            for i in range(5)
        ]
        for proc in dict_writers:
            proc.start()
        for proc in dict_writers:
            proc.join()
        print(f"Shared dict: {dict(shared_dict)}")

        # Managed list: five processes each append one value.
        shared_list = manager.list()
        list_writers = [
            Process(target=append_to_list, args=(shared_list, i))
            for i in range(5)
        ]
        for proc in list_writers:
            proc.start()
        for proc in list_writers:
            proc.join()
        print(f"Shared list: {list(shared_list)}")
Process Synchronization Primitives
Control process execution order with Events and Semaphores:
from multiprocessing import Process, Event, Semaphore
import time
def wait_for_event(event, process_id):
    """Block until *event* is set, announcing before and after the wait."""
    print(f"Process {process_id} waiting...")
    event.wait()  # blocks until some other process calls event.set()
    print(f"Process {process_id} proceeding!")
def limited_resource(semaphore, process_id):
    """Hold the capacity-limited resource (the semaphore) for two seconds.

    Only as many processes as the semaphore's initial count can be inside
    the acquired section at once.
    """
    print(f"Process {process_id} attempting to acquire...")
    semaphore.acquire()
    try:
        print(f"Process {process_id} acquired resource")
        time.sleep(2)
        print(f"Process {process_id} released resource")
    finally:
        semaphore.release()
if __name__ == '__main__':
    # --- Event: three processes park on wait() until the event is set ---
    event = Event()
    waiters = [Process(target=wait_for_event, args=(event, i)) for i in range(3)]
    for proc in waiters:
        proc.start()
    time.sleep(2)
    print("Setting event...")
    event.set()  # wakes every waiting process at once
    for proc in waiters:
        proc.join()

    # --- Semaphore: at most two of five processes hold the resource ---
    semaphore = Semaphore(2)
    holders = [Process(target=limited_resource, args=(semaphore, i)) for i in range(5)]
    for proc in holders:
        proc.start()
    for proc in holders:
        proc.join()
Best Practices
Choose the right process count based on workload. For CPU-bound tasks, use multiprocessing.cpu_count(). Avoid creating more processes than CPU cores for compute-intensive work.
Always use if __name__ == '__main__': guards to prevent recursive process spawning on Windows and macOS. Minimize data transfer between processes—serialization with pickle is expensive. For large datasets, consider memory-mapped files or shared memory.
Handle process cleanup properly. Use context managers (with Pool()) or explicitly call terminate() and join(). Orphaned processes consume system resources.