Python asyncio Synchronization Primitives


Key Insights

  • Asyncio synchronization primitives prevent race conditions when coroutines share state, but they’re fundamentally different from threading primitives—they yield control during waits rather than blocking threads
  • Lock and Semaphore are your workhorses for most scenarios: use Lock for exclusive access to shared resources and Semaphore for rate limiting or connection pooling
  • Queue is often the best choice for coroutine communication because it combines synchronization with data passing, eliminating the need for separate locks and conditions in producer-consumer patterns

Understanding Why Async Code Needs Synchronization

Many developers assume that single-threaded asyncio code doesn’t need synchronization. This is wrong. While asyncio runs on a single thread, coroutines can interleave execution at any await point, creating race conditions just like in multithreaded code.

Consider this broken counter:

import asyncio

counter = 0

async def increment():
    global counter
    for _ in range(1000):
        temp = counter
        await asyncio.sleep(0)  # Yield control
        counter = temp + 1

async def main():
    await asyncio.gather(*[increment() for _ in range(10)])
    print(f"Counter: {counter}")  # Expected: 10000, actual: 1000

asyncio.run(main())

The await asyncio.sleep(0) forces a context switch. Each coroutine reads the counter, yields, then writes back a stale value. You get a classic race condition despite running on one thread.
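
The race depends on the yield sitting between the read and the write. As a minimal sketch (the name increment_atomic is illustrative, not from the example above), the same loop with the yield moved after the update produces the full count, because no other coroutine can run between the two statements:

```python
import asyncio

counter = 0

async def increment_atomic():
    global counter
    for _ in range(1000):
        temp = counter
        counter = temp + 1      # No await between read and write
        await asyncio.sleep(0)  # Yield only after the update is complete

async def main():
    global counter
    counter = 0
    await asyncio.gather(*[increment_atomic() for _ in range(10)])
    return counter

print(f"Counter: {asyncio.run(main())}")  # Counter: 10000
```

In other words, a lock is only needed when a read-modify-write sequence spans an await.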

Lock: Protecting Critical Sections

asyncio.Lock provides mutual exclusion. Only one coroutine can hold the lock at a time. Others wait asynchronously until it’s released.

import asyncio

counter = 0
lock = asyncio.Lock()

async def increment():
    global counter
    for _ in range(1000):
        async with lock:
            temp = counter
            await asyncio.sleep(0)
            counter = temp + 1

async def main():
    await asyncio.gather(*[increment() for _ in range(10)])
    print(f"Counter: {counter}")  # Now correctly prints 10000

asyncio.run(main())

Always use the async with context manager. It guarantees lock release even if exceptions occur.
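
To illustrate the release guarantee, here is a small sketch in which the critical section fails on purpose. The function name failing_update and the RuntimeError are made up for the demonstration:

```python
import asyncio

async def failing_update(lock):
    async with lock:
        raise RuntimeError("simulated failure inside the critical section")

async def main():
    lock = asyncio.Lock()
    try:
        await failing_update(lock)
    except RuntimeError:
        pass
    # The context manager released the lock on the way out
    return lock.locked()

print(f"Lock still held: {asyncio.run(main())}")  # Lock still held: False
```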

Here’s a practical example—rate limiting requests to the same domain:

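import asyncio
import aiohttp
from collections import defaultdict
from urllib.parse import urlsplit

class DomainRateLimiter:
    def __init__(self):
        # One lock per domain, created on first use
        self.locks = defaultdict(asyncio.Lock)
    
    async def fetch(self, session, url):
        domain = urlsplit(url).netloc  # More robust than splitting on '/'
        async with self.locks[domain]:
            async with session.get(url) as response:
                body = await response.text()
            # Keep holding the lock so the next request to this domain waits at least 1s
            await asyncio.sleep(1)
            return body

async def main():
    limiter = DomainRateLimiter()
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://other.com/page1',
    ]
    # Share one session across all requests instead of opening one per fetch
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[limiter.fetch(session, url) for url in urls])
    print(f"Fetched {len(results)} pages")

asyncio.run(main())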

Requests to example.com serialize, but other.com runs concurrently.
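
The serialization can be checked without touching the network. The sketch below is a hypothetical stand-in (the class PerDomainGate is not part of the example above) that swaps the HTTP call for a short sleep and records how many fetches were active at once, per domain and overall:

```python
import asyncio
from collections import defaultdict
from urllib.parse import urlsplit

class PerDomainGate:
    """Illustrative stand-in that records concurrency instead of fetching."""

    def __init__(self):
        self.locks = defaultdict(asyncio.Lock)
        self.active = defaultdict(int)
        self.peak = defaultdict(int)
        self.overall_peak = 0

    async def fetch(self, url):
        domain = urlsplit(url).netloc
        async with self.locks[domain]:
            self.active[domain] += 1
            self.peak[domain] = max(self.peak[domain], self.active[domain])
            self.overall_peak = max(self.overall_peak, sum(self.active.values()))
            await asyncio.sleep(0.01)  # Stand-in for the HTTP request
            self.active[domain] -= 1

async def main():
    gate = PerDomainGate()
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://other.com/page1',
    ]
    await asyncio.gather(*[gate.fetch(url) for url in urls])
    return dict(gate.peak), gate.overall_peak

print(asyncio.run(main()))  # ({'example.com': 1, 'other.com': 1}, 2)
```

Each domain never has more than one fetch in flight, while two fetches overlap across domains.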

Event: Simple Signaling

asyncio.Event is a boolean flag that coroutines can wait for. It’s perfect for one-time signals like “initialization complete” or “shutdown requested.”

import asyncio

async def producer(event, data):
    print("Producer: Preparing data...")
    await asyncio.sleep(2)
    data.append("important_result")
    print("Producer: Data ready!")
    event.set()

async def consumer(event, data):
    print("Consumer: Waiting for data...")
    await event.wait()  # Suspends this coroutine until event.set() is called
    print(f"Consumer: Got data: {data}")

async def main():
    event = asyncio.Event()
    data = []
    await asyncio.gather(
        producer(event, data),
        consumer(event, data)
    )

asyncio.run(main())

Events are ideal for graceful shutdown patterns:

import asyncio

shutdown_event = asyncio.Event()

async def background_task():
    while not shutdown_event.is_set():
        print("Working...")
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=1.0)
        except asyncio.TimeoutError:
            continue
    print("Task shutting down gracefully")

async def main():
    task = asyncio.create_task(background_task())
    await asyncio.sleep(5)
    print("Initiating shutdown...")
    shutdown_event.set()
    await task

asyncio.run(main())
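
A single set() releases every coroutine waiting on the event, and the flag stays set until clear() is called. A minimal sketch, with the waiter names chosen only for illustration:

```python
import asyncio

async def waiter(name, ready, started):
    await ready.wait()
    started.append(name)

async def main():
    ready = asyncio.Event()
    started = []
    tasks = [
        asyncio.create_task(waiter(f"waiter-{i}", ready, started))
        for i in range(3)
    ]
    await asyncio.sleep(0)      # Let every waiter reach ready.wait()
    nobody_ran_early = started == []
    ready.set()                 # One call wakes all three waiters
    await asyncio.gather(*tasks)
    still_set = ready.is_set()  # The flag stays set until clear() is called
    return nobody_ran_early, sorted(started), still_set

print(asyncio.run(main()))
```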

Semaphore: Limiting Concurrent Access

asyncio.Semaphore allows up to N coroutines to access a resource at the same time. It’s the standard tool for capping concurrency, for example in connection pools or when throttling outbound requests.

import asyncio
import aiohttp

async def fetch_with_limit(url, semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests
    urls = ['https://httpbin.org/delay/1' for _ in range(20)]
    
    results = await asyncio.gather(*[
        fetch_with_limit(url, semaphore) for url in urls
    ])
    print(f"Fetched {len(results)} pages")

asyncio.run(main())

This limits concurrent requests to 5, preventing server overload and local resource exhaustion.
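
The limit can be observed without a network. In this sketch the request is replaced by a short sleep, and a stats dictionary (an illustrative helper, not part of the example above) tracks the highest number of jobs inside the semaphore at once:

```python
import asyncio

async def limited_job(semaphore, stats):
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # Stand-in for a network request
        stats["active"] -= 1

async def main():
    semaphore = asyncio.Semaphore(5)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*[limited_job(semaphore, stats) for _ in range(20)])
    return stats["peak"]

print(f"Peak concurrency: {asyncio.run(main())}")  # Peak concurrency: 5
```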

BoundedSemaphore adds a safety check—it raises ValueError if you release more than you acquired. Use it when you want to catch bugs:

import asyncio

async def main():
    sem = asyncio.BoundedSemaphore(2)
    await sem.acquire()
    sem.release()  # Back at the initial value of 2
    sem.release()  # ValueError: BoundedSemaphore released too many times

asyncio.run(main())
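
For contrast, a plain Semaphore accepts the extra release and quietly raises its own limit. The sketch below shows both behaviors side by side, with variable names chosen for illustration:

```python
import asyncio

async def main():
    plain = asyncio.Semaphore(1)
    plain.release()        # Over-release is silently accepted
    await plain.acquire()
    await plain.acquire()  # Succeeds: the limit has quietly grown to 2
    grew = plain.locked()  # True: both permits are now taken

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()  # Nothing was acquired, so this raises
        raised = False
    except ValueError:
        raised = True
    return grew, raised

print(asyncio.run(main()))  # (True, True)
```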

Here’s a database connection pool simulation:

import asyncio

class ConnectionPool:
    def __init__(self, max_connections):
        self.semaphore = asyncio.Semaphore(max_connections)
        self.active = 0
    
    async def execute(self, query):
        async with self.semaphore:
            self.active += 1
            try:
                print(f"Executing '{query}' (active: {self.active})")
                await asyncio.sleep(1)  # Simulate query
            finally:
                self.active -= 1  # Keep the count right even if the query fails

async def main():
    pool = ConnectionPool(max_connections=3)
    queries = [f"SELECT * FROM table{i}" for i in range(10)]
    await asyncio.gather(*[pool.execute(q) for q in queries])

asyncio.run(main())

Condition: Advanced Coordination

asyncio.Condition combines a lock with the ability to wait for notifications. It’s for complex scenarios where simple events aren’t enough.

import asyncio
from collections import deque

class BoundedBuffer:
    def __init__(self, capacity):
        self.buffer = deque()
        self.capacity = capacity
        self.condition = asyncio.Condition()
    
    async def produce(self, item):
        async with self.condition:
            while len(self.buffer) >= self.capacity:
                print("Buffer full, producer waiting...")
                await self.condition.wait()
            
            self.buffer.append(item)
            print(f"Produced {item} (buffer: {len(self.buffer)})")
            self.condition.notify_all()  # Producers and consumers share this condition, so wake all; each rechecks its loop
    
    async def consume(self):
        async with self.condition:
            while not self.buffer:
                print("Buffer empty, consumer waiting...")
                await self.condition.wait()
            
            item = self.buffer.popleft()
            print(f"Consumed {item} (buffer: {len(self.buffer)})")
            self.condition.notify_all()  # Producers and consumers share this condition, so wake all; each rechecks its loop
            return item

async def producer(buffer, items):
    for item in items:
        await buffer.produce(item)
        await asyncio.sleep(0.5)

async def consumer(buffer, count):
    for _ in range(count):
        await buffer.consume()
        await asyncio.sleep(1)

async def main():
    buffer = BoundedBuffer(capacity=3)
    await asyncio.gather(
        producer(buffer, range(10)),
        consumer(buffer, 10)
    )

asyncio.run(main())

The condition ensures producers wait when the buffer is full and consumers wait when it’s empty.
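
The explicit while loops can also be written with Condition.wait_for, which takes a predicate and rechecks it after every wakeup. The following is a sketch of the same buffer in that style. The class name PredicateBuffer is illustrative, and notify_all is used because producers and consumers share one condition:

```python
import asyncio
from collections import deque

class PredicateBuffer:
    def __init__(self, capacity):
        self.buffer = deque()
        self.capacity = capacity
        self.condition = asyncio.Condition()

    async def produce(self, item):
        async with self.condition:
            # Returns once there is room; rechecked after each notification
            await self.condition.wait_for(lambda: len(self.buffer) < self.capacity)
            self.buffer.append(item)
            self.condition.notify_all()

    async def consume(self):
        async with self.condition:
            await self.condition.wait_for(lambda: len(self.buffer) > 0)
            item = self.buffer.popleft()
            self.condition.notify_all()
            return item

async def main():
    buffer = PredicateBuffer(capacity=2)

    async def producer():
        for item in range(6):
            await buffer.produce(item)

    async def consumer():
        return [await buffer.consume() for _ in range(6)]

    _, consumed = await asyncio.gather(producer(), consumer())
    return consumed

print(asyncio.run(main()))  # [0, 1, 2, 3, 4, 5]
```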

Queue: The Practical Choice

asyncio.Queue is usually better than manual condition variables. It handles synchronization internally and provides clean semantics.

import asyncio

async def worker(name, queue):
    while True:
        item = await queue.get()
        if item is None:  # Poison pill for shutdown
            queue.task_done()
            break
        
        print(f"{name} processing {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    
    # Start workers
    workers = [
        asyncio.create_task(worker(f"Worker-{i}", queue))
        for i in range(3)
    ]
    
    # Add tasks
    for i in range(10):
        await queue.put(f"task-{i}")
    
    await queue.join()  # Wait for all tasks to complete
    
    # Shutdown workers
    for _ in workers:
        await queue.put(None)
    
    await asyncio.gather(*workers)

asyncio.run(main())
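
Passing maxsize turns the queue into a bounded buffer: put() suspends the producer while the queue is full, which gives backpressure with no explicit condition variable. A minimal sketch, where the sizes list exists only to record how full the queue was after each get():

```python
import asyncio

async def producer(queue, count):
    for i in range(count):
        await queue.put(i)  # Suspends while the queue is full
    await queue.put(None)   # Sentinel: no more items

async def consumer(queue, sizes):
    received = []
    while True:
        item = await queue.get()
        if item is None:
            break
        sizes.append(queue.qsize())
        received.append(item)
        await asyncio.sleep(0)  # Give the producer a chance to refill
    return received

async def main():
    queue = asyncio.Queue(maxsize=3)
    sizes = []
    _, received = await asyncio.gather(
        producer(queue, 10),
        consumer(queue, sizes),
    )
    return received, max(sizes)

received, largest = asyncio.run(main())
print(f"Received {len(received)} items in order; queue never held more than 3")
```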

PriorityQueue handles tasks by priority:

import asyncio

async def main():
    queue = asyncio.PriorityQueue()
    
    await queue.put((3, "Low priority"))
    await queue.put((1, "High priority"))
    await queue.put((2, "Medium priority"))
    
    while not queue.empty():
        priority, task = await queue.get()
        print(f"Processing: {task} (priority: {priority})")

asyncio.run(main())
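
One caveat with tuple entries: when two priorities are equal, Python compares the next tuple element, and payloads such as dicts are not orderable. A common workaround, sketched here with made-up job names, is to put an increasing counter between the priority and the payload:

```python
import asyncio
import itertools

async def main():
    queue = asyncio.PriorityQueue()
    sequence = itertools.count()  # Tie-breaker: insertion order

    jobs = [
        (2, {"name": "backup"}),
        (1, {"name": "alert"}),
        (2, {"name": "report"}),
    ]
    for priority, payload in jobs:
        # Without the counter, equal priorities can end up comparing the
        # dicts themselves, which raises TypeError
        await queue.put((priority, next(sequence), payload))

    order = []
    while not queue.empty():
        _, _, payload = await queue.get()
        order.append(payload["name"])
    return order

print(asyncio.run(main()))  # ['alert', 'backup', 'report']
```

Entries with the same priority come out in the order they were added.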

Best Practices and Primitive Selection

Use Lock when: You need exclusive access to shared state. Keep critical sections small—don’t await long operations inside locks.

Use Semaphore when: You need to cap concurrent access to a resource at N. A good fit for connection pools and for throttling outbound requests.

Use Event when: You need simple one-time or repeated signaling. Great for initialization flags and shutdown coordination.

Use Condition when: You have complex state-dependent waiting conditions. Usually, Queue is simpler.

Use Queue when: Coroutines need to communicate via data passing. It’s the most robust choice for producer-consumer patterns.

Common pitfalls:

# BAD: Slow awaited call inside the critical section
async with lock:
    data = await slow_network_call()  # Holds lock too long

# GOOD: Minimize critical section
data = await slow_network_call()
async with lock:
    shared_state.update(data)

# BAD: Forgetting async/await
lock.acquire()  # Returns coroutine, doesn't actually acquire!

# GOOD: Always await, and release in a finally block
await lock.acquire()
try:
    pass
finally:
    lock.release()
# Or use context manager
async with lock:
    pass
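
The missing-await pitfall is easy to confirm. In this sketch the call without await only builds a coroutine object, and the lock is not held until that object is awaited:

```python
import asyncio
import inspect

async def main():
    lock = asyncio.Lock()
    pending = lock.acquire()     # Missing await: only creates a coroutine object
    is_coroutine = inspect.iscoroutine(pending)
    held_before = lock.locked()  # False: nothing has run yet
    await pending                # Awaiting it performs the acquisition
    held_after = lock.locked()
    lock.release()
    return is_coroutine, held_before, held_after

print(asyncio.run(main()))  # (True, False, True)
```

If the coroutine object is never awaited, Python also emits a "coroutine ... was never awaited" RuntimeWarning, which is worth treating as an error in tests.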

Choose the simplest primitive that solves your problem. Queue is often the answer because it combines synchronization with communication. When you need fine-grained control, reach for Lock or Semaphore. Save Condition for truly complex coordination scenarios where nothing else fits.
