Python - Multithreading Tutorial

Key Insights

• Python’s Global Interpreter Lock (GIL) prevents true parallel execution of threads, making multithreading effective only for I/O-bound tasks, not CPU-bound operations
• The threading module provides high-level interfaces for concurrent execution, while concurrent.futures.ThreadPoolExecutor offers a modern, cleaner API for managing thread pools
• Proper thread synchronization using locks, events, and queues prevents race conditions and ensures thread-safe data access in shared-memory scenarios

Understanding Python Threading Fundamentals

Python’s threading model differs significantly from other languages due to the Global Interpreter Lock. The GIL ensures only one thread executes Python bytecode at a time, even on multi-core systems. This makes threading ideal for I/O-bound operations like network requests, file operations, or database queries where threads spend time waiting rather than executing.

import threading
import time

def worker(name, duration):
    print(f"Thread {name} starting")
    time.sleep(duration)  # Simulates I/O operation
    print(f"Thread {name} completed after {duration} seconds")

# Create and start threads
thread1 = threading.Thread(target=worker, args=("A", 2))
thread2 = threading.Thread(target=worker, args=("B", 1))

thread1.start()
thread2.start()

# Wait for threads to complete
thread1.join()
thread2.join()

print("All threads finished")

This basic example demonstrates thread creation, starting, and joining. The join() method blocks the calling thread until the target thread completes execution.
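join() also accepts an optional timeout, which is useful when you don't want to block indefinitely. A small sketch (the sleep durations here are arbitrary):

```python
import threading
import time

def slow_worker():
    time.sleep(1)  # stands in for a long-running task

t = threading.Thread(target=slow_worker)
t.start()

# join() with a timeout returns after the timeout elapses even if
# the thread is still running, so check is_alive() afterwards
t.join(timeout=0.2)
if t.is_alive():
    print("Worker still running after 0.2s")

t.join()  # now block until the thread actually finishes
```

Note that join() never raises on timeout; is_alive() is the only way to tell whether the wait succeeded.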

Thread Pool Execution with Concurrent Futures

The concurrent.futures module provides a high-level interface for asynchronously executing callables. ThreadPoolExecutor manages a pool of threads, automatically handling thread lifecycle and resource management.

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def fetch_url(url):
    """Fetch URL and return status code and response time"""
    start = time.time()
    try:
        response = requests.get(url, timeout=5)
        duration = time.time() - start
        return {
            'url': url,
            'status': response.status_code,
            'time': duration
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}

urls = [
    'https://api.github.com',
    'https://httpbin.org/delay/1',
    'https://jsonplaceholder.typicode.com/posts',
    'https://www.python.org'
]

# Using ThreadPoolExecutor with context manager
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all tasks
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    
    # Process results as they complete
    for future in as_completed(future_to_url):
        result = future.result()
        if 'error' in result:
            print(f"Error fetching {result['url']}: {result['error']}")
        else:
            print(f"{result['url']}: {result['status']} ({result['time']:.2f}s)")

The context manager automatically handles cleanup. as_completed() yields futures as they finish, letting you process each result immediately instead of waiting for every task to complete.
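When you need results in input order rather than completion order, executor.map() is a simpler alternative. A sketch using a stand-in function instead of real HTTP calls, so it runs without network access:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fake_fetch(url):
    """Stand-in for a network call so the example runs offline."""
    time.sleep(0.1)
    return f"{url}: ok"

urls = ['https://example.com/a', 'https://example.com/b', 'https://example.com/c']

with ThreadPoolExecutor(max_workers=3) as executor:
    # Unlike as_completed(), map() yields results in input order,
    # buffering any result that finishes early
    results = list(executor.map(fake_fetch, urls))

print(results)
```

Use as_completed() when latency matters per result; use map() when order matters or the results feed directly into another pipeline stage.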

Thread Synchronization with Locks

When multiple threads access shared data, race conditions occur without proper synchronization. Locks ensure exclusive access to critical sections.

import threading
import time

class BankAccount:
    def __init__(self, initial_balance):
        self.balance = initial_balance
        self.lock = threading.Lock()
    
    def deposit(self, amount):
        with self.lock:  # Acquire lock automatically
            current = self.balance
            time.sleep(0.001)  # Simulate processing time
            self.balance = current + amount
    
    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                current = self.balance
                time.sleep(0.001)
                self.balance = current - amount
                return True
            return False

account = BankAccount(1000)

def perform_transactions(account, num_transactions):
    for _ in range(num_transactions):
        account.deposit(10)
        account.withdraw(5)

threads = []
for i in range(10):
    t = threading.Thread(target=perform_transactions, args=(account, 100))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final balance: {account.balance}")  # Should be 6000: 1000 + 10 threads * 100 * (10 - 5)

Without the lock, concurrent modifications would cause incorrect balance calculations due to race conditions.
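You can observe the lost-update problem directly with an unsynchronized counter. A minimal sketch (the exact shortfall varies run to run, and on some interpreter versions the loss may not appear on every run):

```python
import threading

counter = 0

def unsafe_increment(iterations):
    global counter
    for _ in range(iterations):
        # Read-modify-write is not atomic: another thread can run
        # between the read and the write, and its update is lost
        current = counter
        counter = current + 1

threads = [threading.Thread(target=unsafe_increment, args=(100_000,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# 4 threads x 100,000 increments should give 400,000; lost updates
# typically leave the counter short of that
print(f"Counter: {counter} (expected 400000)")
```

Wrapping the read-modify-write in `with lock:` restores the expected total, at the cost of serializing the critical section.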

Producer-Consumer Pattern with Queue

The queue.Queue class provides a thread-safe FIFO implementation, ideal for producer-consumer patterns where threads communicate through shared data.

import threading
import queue
import time
import random

def producer(q, producer_id):
    """Produce items and add to queue"""
    for i in range(5):
        item = f"Item-{producer_id}-{i}"
        time.sleep(random.uniform(0.1, 0.5))
        q.put(item)
        print(f"Producer {producer_id} produced {item}")
    
def consumer(q, consumer_id):
    """Consume items from queue"""
    while True:
        try:
            # Wait up to 2 seconds for an item
            item = q.get(timeout=2)
            print(f"Consumer {consumer_id} processing {item}")
            time.sleep(random.uniform(0.2, 0.6))
            q.task_done()
        except queue.Empty:
            print(f"Consumer {consumer_id} timed out, exiting")
            break

# Create queue
work_queue = queue.Queue()

# Start producers
producers = []
for i in range(2):
    t = threading.Thread(target=producer, args=(work_queue, i))
    t.start()
    producers.append(t)

# Start consumers
consumers = []
for i in range(3):
    t = threading.Thread(target=consumer, args=(work_queue, i))
    t.start()
    consumers.append(t)

# Wait for producers to finish
for t in producers:
    t.join()

# Wait for queue to be processed
work_queue.join()

print("All work completed")

The Queue handles all locking internally, making it safer than manual lock management for inter-thread communication.
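Timeouts are one way to stop consumers, but they make shutdown timing-dependent. A common alternative is a sentinel value: each consumer exits when it dequeues the sentinel. A sketch, assuming None is never a real work item:

```python
import queue
import threading

SENTINEL = None  # assumption: producers never enqueue None as real work
results = []

def consumer(q):
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break  # deterministic shutdown, no timeout guessing
        results.append(item.upper())  # list.append is thread-safe in CPython
        q.task_done()

q = queue.Queue()
workers = [threading.Thread(target=consumer, args=(q,)) for _ in range(2)]
for w in workers:
    w.start()

for i in range(5):
    q.put(f"job-{i}")

# One sentinel per worker so every consumer receives the signal
for _ in workers:
    q.put(SENTINEL)

for w in workers:
    w.join()
print(f"Processed {len(results)} items")
```

The sentinel approach shuts down exactly when the work is done, whereas a timeout either exits too early under load or wastes time idling.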

Thread Events for Coordination

Events provide a simple mechanism for thread coordination, allowing threads to wait for signals from other threads.

import threading
import time

def wait_for_event(event, name):
    """Wait for event to be set"""
    print(f"{name} waiting for event")
    event.wait()  # Block until event is set
    print(f"{name} received event, proceeding")

def wait_for_event_timeout(event, name, timeout):
    """Wait for event with timeout"""
    print(f"{name} waiting for event (timeout={timeout}s)")
    received = event.wait(timeout=timeout)
    if received:
        print(f"{name} received event")
    else:
        print(f"{name} timed out")

# Create event
event = threading.Event()

# Start waiting threads
threads = [
    threading.Thread(target=wait_for_event, args=(event, "Thread-1")),
    threading.Thread(target=wait_for_event, args=(event, "Thread-2")),
    threading.Thread(target=wait_for_event_timeout, args=(event, "Thread-3", 2))
]

for t in threads:
    t.start()

# Simulate some work before signaling
time.sleep(1)
print("Main thread setting event")
event.set()  # Signal all waiting threads

for t in threads:
    t.join()

Events are particularly useful for coordinating startup sequences or signaling shutdown conditions across multiple threads.
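A common shutdown idiom is a worker loop that checks an Event each iteration and uses wait() as an interruptible sleep. A sketch (the interval and timings are arbitrary):

```python
import threading
import time

stop_event = threading.Event()
ticks = []

def heartbeat(stop, interval):
    """Do periodic work until asked to stop."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        # wait() doubles as an interruptible sleep: it returns as soon
        # as the event is set, instead of sleeping the full interval
        stop.wait(timeout=interval)

t = threading.Thread(target=heartbeat, args=(stop_event, 0.1))
t.start()
time.sleep(0.35)
stop_event.set()  # request shutdown; the worker exits promptly
t.join()
print(f"Heartbeat ticked {len(ticks)} times before shutdown")
```

Using stop.wait() instead of time.sleep() means shutdown latency is near zero rather than up to one full interval.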

Thread-Local Data

Thread-local storage allows each thread to maintain its own data without explicit locking, useful for maintaining per-thread state like database connections.

import threading
import random
from concurrent.futures import ThreadPoolExecutor

# Create thread-local storage
thread_local = threading.local()

def process_data(item):
    """Process item using thread-local connection"""
    # Initialize thread-local data if not exists
    if not hasattr(thread_local, 'connection_id'):
        thread_local.connection_id = random.randint(1000, 9999)
        print(f"Thread {threading.current_thread().name} initialized connection {thread_local.connection_id}")
    
    print(f"Thread {threading.current_thread().name} processing {item} with connection {thread_local.connection_id}")

items = ['A', 'B', 'C', 'D', 'E', 'F']

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(process_data, items)

Each thread gets its own connection_id without risk of cross-thread contamination, eliminating the need for locks when accessing thread-specific resources.

Performance Considerations

Always profile before implementing multithreading. For CPU-bound tasks, use multiprocessing instead to bypass the GIL. Threading shines when waiting dominates execution time—network calls, disk I/O, or database queries. A simple benchmark demonstrates the difference:

import time
from concurrent.futures import ThreadPoolExecutor

def io_bound_task(n):
    time.sleep(0.1)  # Simulates I/O wait
    return n * 2

def cpu_bound_task(n):
    return sum(i * i for i in range(10**6))

# I/O-bound benefits from threading
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(io_bound_task, range(10)))
print(f"Threaded I/O: {time.time() - start:.2f}s")

# Sequential for comparison
start = time.time()
for i in range(10):
    io_bound_task(i)
print(f"Sequential I/O: {time.time() - start:.2f}s")

Threading reduces I/O-bound execution time from 1 second to approximately 0.1 seconds, while CPU-bound tasks see minimal improvement due to the GIL.
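Running the CPU-bound task through the same pool makes the contrast concrete. A sketch (exact timings vary by machine; the threaded version rarely beats the sequential loop here):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound_task(n):
    # Pure computation: the thread never releases the GIL while running
    return sum(i * i for i in range(10**5))

# Sequential CPU-bound baseline
start = time.time()
for i in range(10):
    cpu_bound_task(i)
sequential = time.time() - start

# Threaded CPU-bound: the GIL lets only one thread execute bytecode
# at a time, so threads add scheduling overhead without parallelism
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(cpu_bound_task, range(10)))
threaded = time.time() - start

print(f"Sequential CPU: {sequential:.3f}s, Threaded CPU: {threaded:.3f}s")
```

For genuine CPU parallelism, swap ThreadPoolExecutor for concurrent.futures.ProcessPoolExecutor, which sidesteps the GIL by using separate processes.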
