Python asyncio Queues: Producer-Consumer Pattern
Key Insights
- asyncio queues enable efficient coordination between producers and consumers in async Python applications, with built-in backpressure handling through bounded queue sizes
- The task_done() and join() pattern provides elegant synchronization for tracking work completion without manual counter management
- For I/O-bound concurrent tasks like API calls or web scraping, asyncio queues outperform threading queues by avoiding GIL contention and reducing memory overhead
Understanding asyncio Queues
The producer-consumer pattern solves a fundamental problem in concurrent programming: decoupling data generation from data processing. Producers create work items and place them in a queue, while consumers pull items and process them independently. This separation allows you to scale producers and consumers independently based on your bottlenecks.
Python’s asyncio.Queue implements this pattern for asynchronous code. Unlike queue.Queue from the threading module, asyncio.Queue uses coroutines and the event loop rather than threads. This makes it ideal for I/O-bound workloads where you’re waiting on network requests, database queries, or file operations.
Common use cases include:
- Rate-limiting API clients that need to throttle requests
- Web scraping pipelines that fetch and parse pages concurrently
- Message processing systems that consume from external queues
- Data transformation pipelines with multiple processing stages
Queue Basics and Operations
The asyncio.Queue API is straightforward but powerful. Here are the essential operations:
```python
import asyncio

async def queue_basics():
    # Create a bounded queue with max size of 10
    queue = asyncio.Queue(maxsize=10)

    # Put items (blocks if queue is full)
    await queue.put("item1")
    await queue.put("item2")

    # Get items (blocks if queue is empty)
    item = await queue.get()
    print(f"Got: {item}")

    # Mark task as done (required for join())
    queue.task_done()

    # Check queue state
    print(f"Queue size: {queue.qsize()}")
    print(f"Is empty: {queue.empty()}")
    print(f"Is full: {queue.full()}")

asyncio.run(queue_basics())
```
The task_done() and join() pattern is crucial for synchronization. Each time you get() an item, you should call task_done() after processing it. The join() method blocks until all items have been processed, making it perfect for waiting until all work completes.
asyncio also provides LifoQueue (last-in-first-out) and PriorityQueue for specialized use cases. Priority queues are particularly useful when certain tasks need preferential processing.
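As a quick illustration of the difference, here is a minimal sketch of LifoQueue's ordering (the function name `lifo_demo` is just for this example):

```python
import asyncio

async def lifo_demo():
    # LifoQueue hands back the most recently added item first
    stack = asyncio.LifoQueue()
    for item in ("first", "second", "third"):
        await stack.put(item)
    results = [await stack.get() for _ in range(3)]
    print(results)  # ['third', 'second', 'first']
    return results

asyncio.run(lifo_demo())
```

The API is identical to asyncio.Queue; only the retrieval order changes.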
Building Your First Producer-Consumer System
Let’s build a simple system where a producer generates numbers and a consumer processes them:
```python
import asyncio
import random

async def producer(queue, producer_id):
    """Generate random numbers and put them in the queue."""
    for i in range(5):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"Producer {producer_id} added: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, consumer_id):
    """Process items from the queue."""
    while True:
        item = await queue.get()
        # Process the item
        await asyncio.sleep(random.uniform(0.2, 0.8))
        print(f"Consumer {consumer_id} processed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)

    # Create producer and consumer tasks
    producers = [asyncio.create_task(producer(queue, i)) for i in range(1)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(1)]

    # Wait for all producers to finish
    await asyncio.gather(*producers)

    # Wait for all items to be processed
    await queue.join()

    # Cancel consumers (they run forever)
    for c in consumers:
        c.cancel()

asyncio.run(main())
```
This example demonstrates the core pattern. Producers finish after generating items, but consumers run in infinite loops. We use queue.join() to wait until all items are processed, then cancel the consumer tasks.
Scaling with Multiple Workers
Real applications need multiple producers and consumers. Here’s a web scraping example with 3 producers and 5 consumers:
```python
import asyncio
from typing import List

import aiohttp

async def url_producer(queue: asyncio.Queue, urls: List[str], producer_id: int):
    """Add URLs to the processing queue."""
    for url in urls:
        await queue.put(url)
        print(f"Producer {producer_id} queued: {url}")
    print(f"Producer {producer_id} finished")

async def page_consumer(queue: asyncio.Queue, consumer_id: int):
    """Fetch and process pages from the queue."""
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        while True:
            url = await queue.get()
            try:
                async with session.get(url) as response:
                    content = await response.text()
                    print(f"Consumer {consumer_id} fetched {url}: {len(content)} bytes")
            except Exception as e:
                print(f"Consumer {consumer_id} error fetching {url}: {e}")
            finally:
                queue.task_done()

async def scraping_pipeline():
    queue = asyncio.Queue(maxsize=20)

    # Distribute URLs across 3 producers
    all_urls = [f"https://example.com/page{i}" for i in range(30)]
    urls_per_producer = len(all_urls) // 3
    chunks = [
        all_urls[i:i + urls_per_producer]
        for i in range(0, len(all_urls), urls_per_producer)
    ]
    producers = [
        asyncio.create_task(url_producer(queue, chunk, producer_id))
        for producer_id, chunk in enumerate(chunks)
    ]

    # Create 5 consumers
    consumers = [
        asyncio.create_task(page_consumer(queue, i))
        for i in range(5)
    ]

    # Wait for producers and processing
    await asyncio.gather(*producers)
    await queue.join()

    # Cleanup
    for c in consumers:
        c.cancel()

# asyncio.run(scraping_pipeline())
```
The bounded queue (maxsize=20) provides backpressure. If consumers can’t keep up, producers will block when the queue fills, preventing memory exhaustion.
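You can observe this backpressure directly: a put() on a full queue suspends until a consumer frees a slot. A minimal sketch, using wait_for with a short timeout purely to demonstrate the blocking:

```python
import asyncio

async def backpressure_demo():
    queue = asyncio.Queue(maxsize=2)
    await queue.put("a")
    await queue.put("b")
    # The queue is now full; a third put() suspends until a slot opens.
    # We prove it by giving it only 0.1s to complete.
    try:
        await asyncio.wait_for(queue.put("c"), timeout=0.1)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True
    print(f"Third put blocked: {blocked}")
    return blocked

asyncio.run(backpressure_demo())
```

With no consumer draining the queue, the third put() never completes, which is exactly the stall that would propagate back to a real producer.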
Advanced Patterns and Error Handling
Production systems need robust error handling and prioritization. Here’s a priority queue example with timeout handling:
```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedTask:
    priority: int
    data: Any = field(compare=False)

async def priority_producer(queue: asyncio.Queue):
    """Add tasks with different priorities."""
    tasks = [
        PrioritizedTask(1, "critical-task"),
        PrioritizedTask(5, "low-priority-task"),
        PrioritizedTask(2, "high-priority-task"),
        PrioritizedTask(5, "another-low-task"),
    ]
    for task in tasks:
        await queue.put(task)
        print(f"Queued: {task.data} (priority {task.priority})")

async def priority_consumer(queue: asyncio.Queue, consumer_id: int):
    """Process tasks with timeout handling."""
    while True:
        try:
            # Wait up to 5 seconds for an item
            task = await asyncio.wait_for(queue.get(), timeout=5.0)
            try:
                # Simulate processing
                await asyncio.sleep(1)
                print(f"Consumer {consumer_id} processed: {task.data}")
            except Exception as e:
                print(f"Consumer {consumer_id} error: {e}")
                # Optionally re-queue failed tasks
                await queue.put(task)
            finally:
                queue.task_done()
        except asyncio.TimeoutError:
            print(f"Consumer {consumer_id} timed out waiting for tasks")
            break

async def priority_pipeline():
    queue = asyncio.PriorityQueue()

    producer = asyncio.create_task(priority_producer(queue))
    consumers = [
        asyncio.create_task(priority_consumer(queue, i))
        for i in range(2)
    ]

    await producer
    await queue.join()
    # Consumers will exit after timeout
    await asyncio.gather(*consumers)

asyncio.run(priority_pipeline())
```
This pattern uses timeouts to gracefully shut down consumers instead of cancellation. Tasks are processed in priority order (lower numbers first).
Real-World Example: Rate-Limited API Client
Here’s a complete implementation combining queues with rate limiting:
```python
import asyncio
import time
from typing import Any, Dict, List

class RateLimitedAPIClient:
    def __init__(self, rate_limit: int, time_window: float):
        self.rate_limit = rate_limit
        self.time_window = time_window
        self.semaphore = asyncio.Semaphore(rate_limit)
        self.request_times: List[float] = []

    async def _enforce_rate_limit(self):
        """Ensure we don't exceed the rate limit."""
        now = time.time()
        # Remove old timestamps
        self.request_times = [
            t for t in self.request_times
            if now - t < self.time_window
        ]
        # Wait if at limit
        if len(self.request_times) >= self.rate_limit:
            sleep_time = self.time_window - (now - self.request_times[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
        self.request_times.append(time.time())

    async def make_request(self, endpoint: str) -> Dict[str, Any]:
        """Make a rate-limited API request."""
        async with self.semaphore:
            await self._enforce_rate_limit()
            # Simulate API call
            await asyncio.sleep(0.1)
            return {"endpoint": endpoint, "status": "success"}

async def api_producer(queue: asyncio.Queue, endpoints: List[str]):
    """Queue API requests."""
    for endpoint in endpoints:
        await queue.put(endpoint)

async def api_consumer(queue: asyncio.Queue, client: RateLimitedAPIClient, consumer_id: int):
    """Process API requests with rate limiting."""
    while True:
        endpoint = await queue.get()
        try:
            result = await client.make_request(endpoint)
            print(f"Consumer {consumer_id}: {result}")
        except Exception as e:
            print(f"Consumer {consumer_id} error: {e}")
        finally:
            queue.task_done()

async def rate_limited_pipeline():
    # 10 requests per 1-second window
    client = RateLimitedAPIClient(rate_limit=10, time_window=1.0)
    queue = asyncio.Queue(maxsize=50)

    endpoints = [f"/api/endpoint/{i}" for i in range(100)]

    producer = asyncio.create_task(api_producer(queue, endpoints))
    consumers = [
        asyncio.create_task(api_consumer(queue, client, i))
        for i in range(5)
    ]

    await producer
    await queue.join()

    for c in consumers:
        c.cancel()

asyncio.run(rate_limited_pipeline())
```
This implementation combines semaphores with time-based rate limiting to ensure compliance with API limits while maximizing throughput.
Best Practices and Pitfalls
Choose asyncio queues when: You have I/O-bound tasks (API calls, database queries, file operations). The GIL doesn’t affect async I/O, and you’ll use less memory than threading.
Avoid asyncio queues when: You have CPU-bound tasks. Use multiprocessing instead to leverage multiple cores.
Memory management: Bounded queues prevent memory exhaustion. Monitor queue depth in production—consistently full queues indicate consumers can’t keep up.
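One lightweight way to watch queue depth is a monitoring coroutine that samples qsize() periodically. A sketch (the helper names here are illustrative, not a standard API):

```python
import asyncio

def queue_saturated(queue: asyncio.Queue) -> bool:
    """True when a bounded queue is completely full."""
    return bool(queue.maxsize) and queue.qsize() >= queue.maxsize

async def monitor(queue: asyncio.Queue, interval: float = 1.0):
    # Sample depth periodically; wire this into your metrics system.
    while True:
        print(f"queue depth: {queue.qsize()}/{queue.maxsize or 'unbounded'}")
        if queue_saturated(queue):
            print("warning: queue saturated -- consumers can't keep up")
        await asyncio.sleep(interval)

async def demo():
    queue = asyncio.Queue(maxsize=2)
    await queue.put(1)
    await queue.put(2)
    return queue_saturated(queue)

print(asyncio.run(demo()))  # True
```

Run monitor() as its own task alongside your producers and consumers, and cancel it at shutdown like any other worker.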
Testing: Use asyncio.Queue with deterministic task ordering for reproducible tests. Consider dependency injection to mock queue behavior.
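One way to apply that advice is to pass both the queue and the handler in as parameters, so a test can supply a pre-filled queue and a recording handler. A sketch under those assumptions:

```python
import asyncio

async def process_items(queue: asyncio.Queue, handler):
    """Consumer whose queue and handler are injected, so tests can
    supply a pre-filled queue and a recording handler."""
    while not queue.empty():
        item = await queue.get()
        await handler(item)
        queue.task_done()

async def run_test():
    queue = asyncio.Queue()
    for i in (1, 2, 3):
        queue.put_nowait(i)
    seen = []

    async def fake_handler(item):
        seen.append(item)

    await process_items(queue, fake_handler)
    assert seen == [1, 2, 3]  # FIFO order is deterministic
    return seen

asyncio.run(run_test())
```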
Graceful shutdown: Always use queue.join() and properly cancel consumer tasks. Sentinel values (like None) work but join() is cleaner.
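For comparison, here is what sentinel-based shutdown looks like: the producer enqueues one sentinel per consumer so every worker sees the signal (a sketch, not the only way to do it):

```python
import asyncio

SENTINEL = None  # shutdown marker

async def producer(queue, items, n_consumers):
    for item in items:
        await queue.put(item)
    # One sentinel per consumer so every worker gets the signal
    for _ in range(n_consumers):
        await queue.put(SENTINEL)

async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is SENTINEL:
            queue.task_done()
            break
        results.append(item * 2)
        queue.task_done()

async def sentinel_demo():
    queue = asyncio.Queue()
    results = []
    n_consumers = 2
    await asyncio.gather(
        producer(queue, [1, 2, 3], n_consumers),
        *(consumer(queue, results) for _ in range(n_consumers)),
    )
    await queue.join()
    return sorted(results)

print(asyncio.run(sentinel_demo()))  # [2, 4, 6]
```

Note the consumers exit on their own, so no cancellation is needed; the trade-off is the extra bookkeeping of matching sentinel count to consumer count.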
The producer-consumer pattern with asyncio queues is a powerful tool for building scalable concurrent Python applications. Master these patterns and you’ll handle complex async workflows with confidence.