Python asyncio Synchronization Primitives
Key Insights
- Asyncio synchronization primitives prevent race conditions when coroutines share state, but they’re fundamentally different from threading primitives—they yield control during waits rather than blocking threads
- Lock and Semaphore are your workhorses for most scenarios: use Lock for exclusive access to shared resources and Semaphore for rate limiting or connection pooling
- Queue is often the best choice for coroutine communication because it combines synchronization with data passing, eliminating the need for separate locks and conditions in producer-consumer patterns
Understanding Why Async Code Needs Synchronization
Many developers assume that single-threaded asyncio code doesn’t need synchronization. This is wrong. While asyncio runs on a single thread, coroutines can interleave execution at any await point, creating race conditions just like in multithreaded code.
Consider this broken counter:
```python
import asyncio

counter = 0

async def increment():
    global counter
    for _ in range(1000):
        temp = counter
        await asyncio.sleep(0)  # Yield control
        counter = temp + 1

async def main():
    await asyncio.gather(*[increment() for _ in range(10)])
    print(f"Counter: {counter}")  # Expected: 10000, Actual: ~1000

asyncio.run(main())
```
The `await asyncio.sleep(0)` forces a context switch. Each coroutine reads the counter, yields, then writes back a stale value. You get a classic race condition despite running on one thread.
Lock: Protecting Critical Sections
asyncio.Lock provides mutual exclusion. Only one coroutine can hold the lock at a time. Others wait asynchronously until it’s released.
```python
import asyncio

counter = 0
lock = asyncio.Lock()

async def increment():
    global counter
    for _ in range(1000):
        async with lock:
            temp = counter
            await asyncio.sleep(0)
            counter = temp + 1

async def main():
    await asyncio.gather(*[increment() for _ in range(10)])
    print(f"Counter: {counter}")  # Now correctly prints 10000

asyncio.run(main())
```
Always use the async with context manager. It guarantees lock release even if exceptions occur.
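To see that guarantee in action, here's a minimal sketch (the failing coroutine is contrived for illustration): even when the body raises, `async with` releases the lock, so other coroutines can still acquire it.

```python
import asyncio

lock = asyncio.Lock()

async def failing_task():
    # The exception propagates, but `async with` still releases the lock.
    async with lock:
        raise RuntimeError("boom")

async def main():
    try:
        await failing_task()
    except RuntimeError:
        pass
    # If the lock had leaked, this would report True.
    print(f"Lock still held: {lock.locked()}")  # Lock still held: False

asyncio.run(main())
```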
Here’s a practical example—rate limiting requests to the same domain:
```python
import asyncio
import aiohttp
from collections import defaultdict

class DomainRateLimiter:
    def __init__(self):
        self.locks = defaultdict(asyncio.Lock)  # one lock per domain

    async def fetch(self, url):
        domain = url.split('/')[2]  # crude host extraction, fine for the demo
        async with self.locks[domain]:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    await asyncio.sleep(1)  # Enforce 1 req/sec per domain
                    return await response.text()

async def main():
    limiter = DomainRateLimiter()
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://other.com/page1',
    ]
    results = await asyncio.gather(*[limiter.fetch(url) for url in urls])

asyncio.run(main())
```
Requests to example.com serialize, but other.com runs concurrently.
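You can verify the per-domain serialization without network access. In this sketch, `asyncio.sleep` stands in for the HTTP call and a log of start times (an addition for the demo) shows same-domain requests spaced apart:

```python
import asyncio
from collections import defaultdict

class DomainRateLimiter:
    def __init__(self, delay=0.1):
        self.locks = defaultdict(asyncio.Lock)  # one lock per domain
        self.delay = delay

    async def fetch(self, url, log):
        domain = url.split('/')[2]
        async with self.locks[domain]:
            log.append((domain, asyncio.get_running_loop().time()))
            await asyncio.sleep(self.delay)  # stand-in for the HTTP request
            return f"<body of {url}>"

async def main():
    limiter = DomainRateLimiter()
    log = []
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://other.com/page1',
    ]
    await asyncio.gather(*[limiter.fetch(url, log) for url in urls])
    starts = [t for domain, t in log if domain == 'example.com']
    return starts[1] - starts[0]  # gap between same-domain requests

gap = asyncio.run(main())
print(f"example.com requests started {gap:.2f}s apart")
```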
Event: Simple Signaling
asyncio.Event is a boolean flag that coroutines can wait for. It’s perfect for one-time signals like “initialization complete” or “shutdown requested.”
```python
import asyncio

async def producer(event, data):
    print("Producer: Preparing data...")
    await asyncio.sleep(2)
    data.append("important_result")
    print("Producer: Data ready!")
    event.set()

async def consumer(event, data):
    print("Consumer: Waiting for data...")
    await event.wait()  # Suspends until event.set(); the loop keeps running
    print(f"Consumer: Got data: {data}")

async def main():
    event = asyncio.Event()
    data = []
    await asyncio.gather(
        producer(event, data),
        consumer(event, data)
    )

asyncio.run(main())
```
Events are ideal for graceful shutdown patterns:
```python
import asyncio

shutdown_event = asyncio.Event()

async def background_task():
    while not shutdown_event.is_set():
        print("Working...")
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=1.0)
        except asyncio.TimeoutError:
            continue
    print("Task shutting down gracefully")

async def main():
    task = asyncio.create_task(background_task())
    await asyncio.sleep(5)
    print("Initiating shutdown...")
    shutdown_event.set()
    await task

asyncio.run(main())
```
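Events aren't limited to one-shot signals: clear() resets the flag, so the same Event can signal repeatedly. A quick sketch of that round trip:

```python
import asyncio

async def main():
    event = asyncio.Event()
    event.set()
    await event.wait()            # returns immediately: the flag is set
    event.clear()                 # reset so the next wait() suspends again
    waiter = asyncio.create_task(event.wait())
    await asyncio.sleep(0.01)
    print(f"Waiter done after clear(): {waiter.done()}")  # False
    event.set()                   # second signal wakes the waiter
    await waiter
    return event.is_set()

flag = asyncio.run(main())
```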
Semaphore: Limiting Concurrent Access
asyncio.Semaphore allows N coroutines to access a resource simultaneously. It’s essential for rate limiting and connection pooling.
```python
import asyncio
import aiohttp

async def fetch_with_limit(url, semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests
    urls = ['https://httpbin.org/delay/1' for _ in range(20)]
    results = await asyncio.gather(*[
        fetch_with_limit(url, semaphore) for url in urls
    ])
    print(f"Fetched {len(results)} pages")

asyncio.run(main())
```
This limits concurrent requests to 5, preventing server overload and local resource exhaustion.
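You can verify the cap without making real requests. This sketch (the `state` dict is just bookkeeping for the demo) tracks peak concurrency under the semaphore:

```python
import asyncio

async def worker(semaphore, state):
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for real work
        state["active"] -= 1

async def main():
    semaphore = asyncio.Semaphore(5)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*[worker(semaphore, state) for _ in range(20)])
    return state["peak"]

peak = asyncio.run(main())
print(f"Peak concurrency: {peak}")  # never exceeds 5
```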
BoundedSemaphore adds a safety check—it raises ValueError if you release more than you acquired. Use it when you want to catch bugs:
```python
import asyncio

async def main():
    sem = asyncio.BoundedSemaphore(2)
    await sem.acquire()
    await sem.acquire()
    sem.release()
    sem.release()
    sem.release()  # ValueError: BoundedSemaphore released too many times

asyncio.run(main())
```

Note that the error fires as soon as the internal counter would exceed its initial value, so you must acquire as many times as you release before the final, extra release raises.
Here’s a database connection pool simulation:
```python
import asyncio

class ConnectionPool:
    def __init__(self, max_connections):
        self.semaphore = asyncio.Semaphore(max_connections)
        self.active = 0

    async def execute(self, query):
        async with self.semaphore:
            self.active += 1
            print(f"Executing '{query}' (active: {self.active})")
            await asyncio.sleep(1)  # Simulate query
            self.active -= 1

async def main():
    pool = ConnectionPool(max_connections=3)
    queries = [f"SELECT * FROM table{i}" for i in range(10)]
    await asyncio.gather(*[pool.execute(q) for q in queries])

asyncio.run(main())
```
Condition: Advanced Coordination
asyncio.Condition combines a lock with the ability to wait for notifications. It’s for complex scenarios where simple events aren’t enough.
```python
import asyncio
from collections import deque

class BoundedBuffer:
    def __init__(self, capacity):
        self.buffer = deque()
        self.capacity = capacity
        self.condition = asyncio.Condition()

    async def produce(self, item):
        async with self.condition:
            while len(self.buffer) >= self.capacity:
                print("Buffer full, producer waiting...")
                await self.condition.wait()
            self.buffer.append(item)
            print(f"Produced {item} (buffer: {len(self.buffer)})")
            self.condition.notify()  # Wake up one consumer

    async def consume(self):
        async with self.condition:
            while not self.buffer:
                print("Buffer empty, consumer waiting...")
                await self.condition.wait()
            item = self.buffer.popleft()
            print(f"Consumed {item} (buffer: {len(self.buffer)})")
            self.condition.notify()  # Wake up one producer
            return item

async def producer(buffer, items):
    for item in items:
        await buffer.produce(item)
        await asyncio.sleep(0.5)

async def consumer(buffer, count):
    for _ in range(count):
        await buffer.consume()
        await asyncio.sleep(1)

async def main():
    buffer = BoundedBuffer(capacity=3)
    await asyncio.gather(
        producer(buffer, range(10)),
        consumer(buffer, 10)
    )

asyncio.run(main())
```
The condition ensures producers wait when the buffer is full and consumers wait when it’s empty.
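asyncio.Condition also provides wait_for(predicate), which wraps the while/wait loop for you. A minimal sketch (the `Flag` class is invented for the demo) of the same pattern:

```python
import asyncio

class Flag:
    def __init__(self):
        self.condition = asyncio.Condition()
        self.ready = False

    async def wait_ready(self):
        async with self.condition:
            # Equivalent to: while not self.ready: await self.condition.wait()
            await self.condition.wait_for(lambda: self.ready)
            return "ready"

    async def set_ready(self):
        async with self.condition:
            self.ready = True
            self.condition.notify_all()

async def main():
    flag = Flag()
    waiter = asyncio.create_task(flag.wait_ready())
    await asyncio.sleep(0.01)   # let the waiter start waiting
    await flag.set_ready()
    return await waiter

result = asyncio.run(main())
print(result)  # ready
```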
Queue: The Practical Choice
asyncio.Queue is usually better than manual condition variables. It handles synchronization internally and provides clean semantics.
```python
import asyncio

async def worker(name, queue):
    while True:
        item = await queue.get()
        if item is None:  # Poison pill for shutdown
            queue.task_done()
            break
        print(f"{name} processing {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    # Start workers
    workers = [
        asyncio.create_task(worker(f"Worker-{i}", queue))
        for i in range(3)
    ]
    # Add tasks
    for i in range(10):
        await queue.put(f"task-{i}")
    await queue.join()  # Wait for all tasks to complete
    # Shutdown workers
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)

asyncio.run(main())
```
PriorityQueue handles tasks by priority:
```python
import asyncio

async def main():
    queue = asyncio.PriorityQueue()
    await queue.put((3, "Low priority"))
    await queue.put((1, "High priority"))
    await queue.put((2, "Medium priority"))
    while not queue.empty():
        priority, task = await queue.get()
        print(f"Processing: {task} (priority: {priority})")

asyncio.run(main())
```
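One gotcha: entries are compared as whole tuples, so two items with the same priority fall through to comparing the payloads, which can raise TypeError for non-comparable objects like dicts. A common fix, sketched here, is a monotonically increasing counter as a tie-breaker:

```python
import asyncio
import itertools

async def main():
    queue = asyncio.PriorityQueue()
    counter = itertools.count()  # tie-breaker so payloads are never compared
    jobs = [(2, {"name": "beta"}), (1, {"name": "alpha"}), (2, {"name": "gamma"})]
    for priority, payload in jobs:
        await queue.put((priority, next(counter), payload))
    order = []
    while not queue.empty():
        priority, _, payload = await queue.get()
        order.append(payload["name"])
    return order

order = asyncio.run(main())
print(order)  # ['alpha', 'beta', 'gamma'] -- ties keep insertion order
```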
Best Practices and Primitive Selection
- **Use Lock when:** you need exclusive access to shared state. Keep critical sections small; don't await long operations inside locks.
- **Use Semaphore when:** you need to limit concurrent access to N resources. Perfect for rate limiting and connection pools.
- **Use Event when:** you need simple one-time or repeated signaling. Great for initialization flags and shutdown coordination.
- **Use Condition when:** you have complex state-dependent waiting conditions. Usually, Queue is simpler.
- **Use Queue when:** coroutines need to communicate via data passing. It's the most robust choice for producer-consumer patterns.
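For instance, a Queue constructed with maxsize gives you the BoundedBuffer behavior from the Condition section for free: put() waits when the queue is full and get() waits when it's empty. A small sketch:

```python
import asyncio

async def producer(queue):
    for item in range(5):
        await queue.put(item)  # suspends whenever the queue holds 2 items
        print(f"Produced {item}")

async def consumer(queue):
    results = []
    for _ in range(5):
        item = await queue.get()
        results.append(item)
        await asyncio.sleep(0.01)  # slow consumer forces backpressure
    return results

async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded: producers wait when full
    _, results = await asyncio.gather(producer(queue), consumer(queue))
    return results

results = asyncio.run(main())
print(results)  # [0, 1, 2, 3, 4]
```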
Common pitfalls:
```python
# BAD: Blocking operation in critical section
async with lock:
    data = await slow_network_call()  # Holds the lock too long

# GOOD: Minimize the critical section
data = await slow_network_call()
async with lock:
    shared_state.update(data)

# BAD: Forgetting to await
lock.acquire()  # Returns a coroutine; the lock is never acquired!

# GOOD: Always await
await lock.acquire()
# Or use the context manager
async with lock:
    ...
```
Choose the simplest primitive that solves your problem. Queue is often the answer because it combines synchronization with communication. When you need fine-grained control, reach for Lock or Semaphore. Save Condition for truly complex coordination scenarios where nothing else fits.