Python - Multithreading Tutorial
Key Insights
• Python’s Global Interpreter Lock (GIL) prevents true parallel execution of threads, making multithreading effective only for I/O-bound tasks, not CPU-bound operations
• The threading module provides high-level interfaces for concurrent execution, while concurrent.futures.ThreadPoolExecutor offers a modern, cleaner API for managing thread pools
• Proper thread synchronization using locks, events, and queues prevents race conditions and ensures thread-safe data access in shared-memory scenarios
Understanding Python Threading Fundamentals
Python’s threading model differs significantly from other languages due to the Global Interpreter Lock. The GIL ensures only one thread executes Python bytecode at a time, even on multi-core systems. This makes threading ideal for I/O-bound operations like network requests, file operations, or database queries where threads spend time waiting rather than executing.
import threading
import time

def worker(name, duration):
    print(f"Thread {name} starting")
    time.sleep(duration)  # Simulates I/O operation
    print(f"Thread {name} completed after {duration} seconds")

# Create and start threads
thread1 = threading.Thread(target=worker, args=("A", 2))
thread2 = threading.Thread(target=worker, args=("B", 1))
thread1.start()
thread2.start()

# Wait for threads to complete
thread1.join()
thread2.join()
print("All threads finished")
This basic example demonstrates thread creation, starting, and joining. The join() method blocks the main thread until worker threads complete execution.
Thread Pool Execution with Concurrent Futures
The concurrent.futures module provides a high-level interface for asynchronously executing callables. ThreadPoolExecutor manages a pool of threads, automatically handling thread lifecycle and resource management.
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def fetch_url(url):
    """Fetch URL and return status code and response time"""
    start = time.time()
    try:
        response = requests.get(url, timeout=5)
        duration = time.time() - start
        return {
            'url': url,
            'status': response.status_code,
            'time': duration
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}

urls = [
    'https://api.github.com',
    'https://httpbin.org/delay/1',
    'https://jsonplaceholder.typicode.com/posts',
    'https://www.python.org'
]

# Using ThreadPoolExecutor with context manager
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all tasks
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    # Process results as they complete
    for future in as_completed(future_to_url):
        result = future.result()
        if 'error' in result:
            print(f"Error fetching {result['url']}: {result['error']}")
        else:
            print(f"{result['url']}: {result['status']} ({result['time']:.2f}s)")
The context manager automatically handles cleanup. as_completed() yields futures as they finish, so each result can be processed immediately instead of waiting for every task to complete.
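When input order matters more than completion order, executor.map() is a simpler alternative: it returns results in the same order as the inputs. A minimal sketch (the double helper here is purely illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def double(n):
    time.sleep(0.05)  # simulate a short I/O wait
    return n * 2

# map() yields results in input order, unlike as_completed()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(double, range(5)))
print(results)  # [0, 2, 4, 6, 8]
```

Use as_completed() when you want to react to the fastest tasks first; use map() when the ordering of results should mirror the ordering of inputs.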
Thread Synchronization with Locks
When multiple threads access shared data, race conditions occur without proper synchronization. Locks ensure exclusive access to critical sections.
import threading
import time

class BankAccount:
    def __init__(self, initial_balance):
        self.balance = initial_balance
        self.lock = threading.Lock()

    def deposit(self, amount):
        with self.lock:  # Acquire lock automatically
            current = self.balance
            time.sleep(0.001)  # Simulate processing time
            self.balance = current + amount

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                current = self.balance
                time.sleep(0.001)
                self.balance = current - amount
                return True
            return False

account = BankAccount(1000)

def perform_transactions(account, num_transactions):
    for _ in range(num_transactions):
        account.deposit(10)
        account.withdraw(5)

threads = []
for i in range(10):
    t = threading.Thread(target=perform_transactions, args=(account, 100))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final balance: {account.balance}")  # 1000 + 10 * 100 * (10 - 5) = 6000
Without the lock, concurrent modifications would cause incorrect balance calculations due to race conditions.
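To see the race the lock prevents, here is a sketch with the same read-sleep-write pattern but no lock (UnsafeCounter is a hypothetical illustration, not part of the example above). The sleep widens the window between reading and writing the balance, so updates from other threads are routinely lost:

```python
import threading
import time

class UnsafeCounter:
    """Same deposit logic as BankAccount, but with no lock."""
    def __init__(self):
        self.balance = 0

    def deposit(self, amount):
        current = self.balance           # read
        time.sleep(0.0001)               # widen the race window
        self.balance = current + amount  # write based on a stale read

counter = UnsafeCounter()
threads = [threading.Thread(target=lambda: [counter.deposit(1) for _ in range(100)])
           for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# 10 threads x 100 deposits should yield 1000, but lost updates from
# interleaved read-modify-write cycles typically leave it far lower
print(f"Expected 1000, got {counter.balance}")
```

The exact final value varies from run to run, which is precisely what makes race conditions hard to debug.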
Producer-Consumer Pattern with Queue
The queue.Queue class provides a thread-safe FIFO implementation, making it a natural fit for producer-consumer patterns where threads communicate through shared data.
import threading
import queue
import time
import random

def producer(q, producer_id):
    """Produce items and add to queue"""
    for i in range(5):
        item = f"Item-{producer_id}-{i}"
        time.sleep(random.uniform(0.1, 0.5))
        q.put(item)
        print(f"Producer {producer_id} produced {item}")

def consumer(q, consumer_id):
    """Consume items from queue"""
    while True:
        try:
            # Wait up to 2 seconds for an item
            item = q.get(timeout=2)
            print(f"Consumer {consumer_id} processing {item}")
            time.sleep(random.uniform(0.2, 0.6))
            q.task_done()
        except queue.Empty:
            print(f"Consumer {consumer_id} timed out, exiting")
            break

# Create queue
work_queue = queue.Queue()

# Start producers
producers = []
for i in range(2):
    t = threading.Thread(target=producer, args=(work_queue, i))
    t.start()
    producers.append(t)

# Start consumers
consumers = []
for i in range(3):
    t = threading.Thread(target=consumer, args=(work_queue, i))
    t.start()
    consumers.append(t)

# Wait for producers to finish
for t in producers:
    t.join()

# Wait for queue to be processed
work_queue.join()
print("All work completed")
The Queue handles all locking internally, making it safer than manual lock management for inter-thread communication.
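A common alternative to the timeout-based exit above is a sentinel ("poison pill") per consumer, which shuts workers down deterministically. A minimal sketch, collecting processed items into a shared list for inspection (list.append is atomic in CPython):

```python
import threading
import queue

SENTINEL = None  # signals "no more work" to one consumer
results = []     # list.append is atomic in CPython

def consumer(q):
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break
        results.append(item)  # stand-in for real processing
        q.task_done()

q = queue.Queue()
workers = [threading.Thread(target=consumer, args=(q,)) for _ in range(3)]
for w in workers:
    w.start()

for i in range(9):
    q.put(i)
for _ in workers:
    q.put(SENTINEL)  # one sentinel per consumer guarantees all exit

q.join()  # every item (and sentinel) has been marked done
for w in workers:
    w.join()
print(sorted(results))  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
```

Unlike a timeout, sentinels never exit a consumer early just because producers were momentarily slow.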
Thread Events for Coordination
Events provide a simple mechanism for thread coordination, allowing threads to wait for signals from other threads.
import threading
import time

def wait_for_event(event, name):
    """Wait for event to be set"""
    print(f"{name} waiting for event")
    event.wait()  # Block until event is set
    print(f"{name} received event, proceeding")

def wait_for_event_timeout(event, name, timeout):
    """Wait for event with timeout"""
    print(f"{name} waiting for event (timeout={timeout}s)")
    received = event.wait(timeout=timeout)
    if received:
        print(f"{name} received event")
    else:
        print(f"{name} timed out")

# Create event
event = threading.Event()

# Start waiting threads
threads = [
    threading.Thread(target=wait_for_event, args=(event, "Thread-1")),
    threading.Thread(target=wait_for_event, args=(event, "Thread-2")),
    threading.Thread(target=wait_for_event_timeout, args=(event, "Thread-3", 2))
]

for t in threads:
    t.start()

# Simulate some work before signaling
time.sleep(1)
print("Main thread setting event")
event.set()  # Signal all waiting threads

for t in threads:
    t.join()
Events are particularly useful for coordinating startup sequences or signaling shutdown conditions across multiple threads.
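For the shutdown case, an Event doubles as a cooperative stop flag: the worker polls is_set() and uses wait(timeout) in place of sleep(), so it wakes immediately when shutdown is requested. A minimal sketch (heartbeat and ticks are illustrative names):

```python
import threading
import time

stop_event = threading.Event()
ticks = []

def heartbeat():
    # Run until the main thread requests shutdown
    while not stop_event.is_set():
        ticks.append(time.monotonic())
        stop_event.wait(0.05)  # sleeps, but wakes immediately on set()

t = threading.Thread(target=heartbeat)
t.start()
time.sleep(0.2)    # let the worker run for a while
stop_event.set()   # request shutdown
t.join(timeout=1)
print(f"Recorded {len(ticks)} ticks before shutdown")
```

Using stop_event.wait(0.05) instead of time.sleep(0.05) is the key detail: a plain sleep would delay shutdown by up to a full interval.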
Thread-Local Data
Thread-local storage allows each thread to maintain its own data without explicit locking, useful for maintaining per-thread state like database connections.
import threading
import random
from concurrent.futures import ThreadPoolExecutor

# Create thread-local storage
thread_local = threading.local()

def process_data(item):
    """Process item using thread-local connection"""
    # Initialize thread-local data if not already present on this thread
    if not hasattr(thread_local, 'connection_id'):
        thread_local.connection_id = random.randint(1000, 9999)
        print(f"Thread {threading.current_thread().name} initialized connection {thread_local.connection_id}")
    print(f"Thread {threading.current_thread().name} processing {item} with connection {thread_local.connection_id}")

items = ['A', 'B', 'C', 'D', 'E', 'F']

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(process_data, items)
Each thread gets its own connection_id without risk of cross-thread contamination, eliminating the need for locks when accessing thread-specific resources.
Performance Considerations
Always profile before implementing multithreading. For CPU-bound tasks, use multiprocessing instead to bypass the GIL. Threading shines when waiting dominates execution time—network calls, disk I/O, or database queries. A simple benchmark demonstrates the difference:
import time
from concurrent.futures import ThreadPoolExecutor

def io_bound_task(n):
    time.sleep(0.1)  # Simulates I/O wait
    return n * 2

def cpu_bound_task(n):
    return sum(i * i for i in range(10**6))

# I/O-bound work benefits from threading
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(io_bound_task, range(10)))
print(f"Threaded I/O: {time.time() - start:.2f}s")

# Sequential run for comparison
start = time.time()
for i in range(10):
    io_bound_task(i)
print(f"Sequential I/O: {time.time() - start:.2f}s")
Threading reduces I/O-bound execution time from 1 second to approximately 0.1 seconds, while CPU-bound tasks see minimal improvement due to the GIL.
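To confirm the CPU-bound half of that claim, the same comparison can be run on cpu_bound_task-style work. A sketch (exact timings vary by machine; on GIL-based interpreters the threaded run is not meaningfully faster):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_task(n):
    # Pure Python arithmetic holds the GIL the whole time
    return sum(i * i for i in range(10**6))

start = time.time()
for i in range(4):
    cpu_task(i)
sequential = time.time() - start

start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(cpu_task, range(4)))
threaded = time.time() - start

# Under the GIL, threads take turns executing bytecode, so the threaded
# run is roughly as slow as the sequential one (sometimes slower, due
# to context-switch overhead); multiprocessing is the fix here.
print(f"Sequential: {sequential:.2f}s, Threaded: {threaded:.2f}s")
```

Swapping ThreadPoolExecutor for concurrent.futures.ProcessPoolExecutor is usually the smallest code change that lets CPU-bound work scale across cores.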