Python asyncio Queues: Producer-Consumer Pattern

Key Insights

  • asyncio queues enable efficient coordination between producers and consumers in async Python applications, with built-in backpressure handling through bounded queue sizes
  • The task_done() and join() pattern provides elegant synchronization for tracking work completion without manual counter management
  • For I/O-bound concurrent work such as API calls or web scraping, coroutine-based queues scale further than thread-based ones: coroutines use far less memory than threads and avoid per-thread context-switching overhead

Understanding asyncio Queues

The producer-consumer pattern solves a fundamental problem in concurrent programming: decoupling data generation from data processing. Producers create work items and place them in a queue, while consumers pull items and process them independently. This separation allows you to scale producers and consumers independently based on your bottlenecks.

Python’s asyncio.Queue implements this pattern for asynchronous code. Unlike queue.Queue from the threading module, asyncio.Queue uses coroutines and the event loop rather than threads. This makes it ideal for I/O-bound workloads where you’re waiting on network requests, database queries, or file operations.

Common use cases include:

  • Rate-limiting API clients that need to throttle requests
  • Web scraping pipelines that fetch and parse pages concurrently
  • Message processing systems that consume from external queues
  • Data transformation pipelines with multiple processing stages

Queue Basics and Operations

The asyncio.Queue API is straightforward but powerful. Here are the essential operations:

import asyncio

async def queue_basics():
    # Create a bounded queue with max size of 10
    queue = asyncio.Queue(maxsize=10)
    
    # Put items (blocks if queue is full)
    await queue.put("item1")
    await queue.put("item2")
    
    # Get items (blocks if queue is empty)
    item = await queue.get()
    print(f"Got: {item}")
    
    # Mark task as done (required for join())
    queue.task_done()
    
    # Check queue state
    print(f"Queue size: {queue.qsize()}")
    print(f"Is empty: {queue.empty()}")
    print(f"Is full: {queue.full()}")

asyncio.run(queue_basics())

The task_done() and join() pattern is crucial for synchronization. Each time you get() an item, you should call task_done() after processing it. The join() method blocks until all items have been processed, making it perfect for waiting until all work completes.
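
As a minimal illustration of this pairing: join() returns only once every put() has been matched by a task_done() call.

```python
import asyncio

async def join_demo():
    queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)

    async def worker():
        while True:
            await queue.get()
            queue.task_done()  # each get() is paired with a task_done()

    task = asyncio.create_task(worker())
    await queue.join()  # returns once all three items are marked done
    task.cancel()
    print("all work accounted for")

asyncio.run(join_demo())
```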

asyncio also provides LifoQueue (last-in-first-out) and PriorityQueue for specialized use cases. Priority queues are particularly useful when certain tasks need preferential processing.
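
A quick sketch of how ordering differs across the three queue types (the items here are arbitrary, chosen to make the orderings visible):

```python
import asyncio

async def compare_queue_types():
    fifo = asyncio.Queue()          # items come out in insertion order
    lifo = asyncio.LifoQueue()      # most recently added item comes out first
    prio = asyncio.PriorityQueue()  # lowest value comes out first

    for q in (fifo, lifo, prio):
        for n in (3, 1, 2):
            await q.put(n)

    print([await fifo.get() for _ in range(3)])  # [3, 1, 2]
    print([await lifo.get() for _ in range(3)])  # [2, 1, 3]
    print([await prio.get() for _ in range(3)])  # [1, 2, 3]

asyncio.run(compare_queue_types())
```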

Building Your First Producer-Consumer System

Let’s build a simple system where a producer generates numbers and a consumer processes them:

import asyncio
import random

async def producer(queue, producer_id):
    """Generate random numbers and put them in the queue."""
    for i in range(5):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"Producer {producer_id} added: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    
async def consumer(queue, consumer_id):
    """Process items from the queue."""
    while True:
        item = await queue.get()
        
        # Process the item
        await asyncio.sleep(random.uniform(0.2, 0.8))
        print(f"Consumer {consumer_id} processed: {item}")
        
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    
    # Create producer and consumer tasks (one of each here; raise the counts to scale)
    producers = [asyncio.create_task(producer(queue, i)) for i in range(1)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(1)]
    
    # Wait for all producers to finish
    await asyncio.gather(*producers)
    
    # Wait for all items to be processed
    await queue.join()
    
    # Cancel consumers (they run forever)
    for c in consumers:
        c.cancel()

asyncio.run(main())

This example demonstrates the core pattern. Producers finish after generating items, but consumers run in infinite loops. We use queue.join() to wait until all items are processed, then cancel the consumer tasks.
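
Note that cancel() only requests cancellation. To let consumers actually finish unwinding, you can await the cancelled tasks and suppress the resulting CancelledError. One way to sketch that cleanup:

```python
import asyncio

async def shutdown_consumers(consumers):
    """Cancel consumer tasks and wait for them to finish unwinding."""
    for c in consumers:
        c.cancel()
    # return_exceptions=True swallows the CancelledError each task raises
    await asyncio.gather(*consumers, return_exceptions=True)

async def demo():
    async def consumer(queue):
        while True:
            await queue.get()

    queue = asyncio.Queue()
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(3)]
    await asyncio.sleep(0)  # let consumers start and block on get()
    await shutdown_consumers(consumers)
    print(all(c.cancelled() for c in consumers))  # True

asyncio.run(demo())
```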

Scaling with Multiple Workers

Real applications need multiple producers and consumers. Here’s a web scraping example with 3 producers and 5 consumers:

import asyncio
import aiohttp
from typing import List

async def url_producer(queue: asyncio.Queue, urls: List[str], producer_id: int):
    """Add URLs to the processing queue."""
    for url in urls:
        await queue.put(url)
        print(f"Producer {producer_id} queued: {url}")
    print(f"Producer {producer_id} finished")

async def page_consumer(queue: asyncio.Queue, consumer_id: int):
    """Fetch and process pages from the queue."""
    async with aiohttp.ClientSession() as session:
        while True:
            url = await queue.get()
            
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    content = await response.text()
                    print(f"Consumer {consumer_id} fetched {url}: {len(content)} bytes")
            except Exception as e:
                print(f"Consumer {consumer_id} error fetching {url}: {e}")
            finally:
                queue.task_done()

async def scraping_pipeline():
    queue = asyncio.Queue(maxsize=20)
    
    # Distribute URLs across 3 producers
    all_urls = [f"https://example.com/page{i}" for i in range(30)]
    urls_per_producer = len(all_urls) // 3
    
    # Split the URLs into chunks so each producer gets a sequential producer_id
    url_chunks = [
        all_urls[i:i + urls_per_producer]
        for i in range(0, len(all_urls), urls_per_producer)
    ]
    producers = [
        asyncio.create_task(url_producer(queue, chunk, producer_id))
        for producer_id, chunk in enumerate(url_chunks)
    ]
    
    # Create 5 consumers
    consumers = [
        asyncio.create_task(page_consumer(queue, i)) 
        for i in range(5)
    ]
    
    # Wait for producers and processing
    await asyncio.gather(*producers)
    await queue.join()
    
    # Cleanup
    for c in consumers:
        c.cancel()

# asyncio.run(scraping_pipeline())  # uncomment to run; requires aiohttp and reachable URLs

The bounded queue (maxsize=20) provides backpressure. If consumers can’t keep up, producers will block when the queue fills, preventing memory exhaustion.
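
To see backpressure in action, here is a minimal sketch with a deliberately tiny queue: put() suspends the fast producer whenever the slow consumer has not yet freed a slot.

```python
import asyncio

async def fast_producer(queue):
    for i in range(5):
        await queue.put(i)  # suspends here whenever the queue holds 2 items
        print(f"queued {i} (size now {queue.qsize()})")

async def slow_consumer(queue):
    for _ in range(5):
        await asyncio.sleep(0.1)  # consumer is slower than the producer
        item = await queue.get()
        queue.task_done()
        print(f"processed {item}")

async def backpressure_demo():
    queue = asyncio.Queue(maxsize=2)  # the small bound forces backpressure
    await asyncio.gather(fast_producer(queue), slow_consumer(queue))
    assert queue.empty()

asyncio.run(backpressure_demo())
```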

Advanced Patterns and Error Handling

Production systems need robust error handling and prioritization. Here’s a priority queue example with timeout handling:

import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedTask:
    priority: int
    data: Any = field(compare=False)

async def priority_producer(queue: asyncio.Queue):
    """Add tasks with different priorities."""
    tasks = [
        PrioritizedTask(1, "critical-task"),
        PrioritizedTask(5, "low-priority-task"),
        PrioritizedTask(2, "high-priority-task"),
        PrioritizedTask(5, "another-low-task"),
    ]
    
    for task in tasks:
        await queue.put(task)
        print(f"Queued: {task.data} (priority {task.priority})")

async def priority_consumer(queue: asyncio.Queue, consumer_id: int):
    """Process tasks with timeout handling."""
    while True:
        try:
            # Wait up to 5 seconds for an item
            task = await asyncio.wait_for(queue.get(), timeout=5.0)
            
            try:
                # Simulate processing
                await asyncio.sleep(1)
                print(f"Consumer {consumer_id} processed: {task.data}")
            except Exception as e:
                print(f"Consumer {consumer_id} error: {e}")
                # Optionally re-queue failed tasks
                await queue.put(task)
            finally:
                queue.task_done()
                
        except asyncio.TimeoutError:
            print(f"Consumer {consumer_id} timed out waiting for tasks")
            break

async def priority_pipeline():
    queue = asyncio.PriorityQueue()
    
    producer = asyncio.create_task(priority_producer(queue))
    consumers = [
        asyncio.create_task(priority_consumer(queue, i)) 
        for i in range(2)
    ]
    
    await producer
    await queue.join()
    
    # Consumers will exit after timeout
    await asyncio.gather(*consumers)

asyncio.run(priority_pipeline())

This pattern uses timeouts to gracefully shut down consumers instead of cancellation. Tasks are processed in priority order (lower numbers first).

Real-World Example: Rate-Limited API Client

Here’s a complete implementation combining queues with rate limiting:

import asyncio
import time
from typing import List, Dict, Any

class RateLimitedAPIClient:
    def __init__(self, rate_limit: int, time_window: float):
        self.rate_limit = rate_limit
        self.time_window = time_window
        self.semaphore = asyncio.Semaphore(rate_limit)
        self.request_times: List[float] = []
    
    async def _enforce_rate_limit(self):
        """Ensure we don't exceed rate limit."""
        now = time.time()
        
        # Remove old timestamps
        self.request_times = [
            t for t in self.request_times 
            if now - t < self.time_window
        ]
        
        # Wait if at limit
        if len(self.request_times) >= self.rate_limit:
            sleep_time = self.time_window - (now - self.request_times[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
        
        self.request_times.append(time.time())
    
    async def make_request(self, endpoint: str) -> Dict[str, Any]:
        """Make a rate-limited API request."""
        async with self.semaphore:
            await self._enforce_rate_limit()
            # Simulate API call
            await asyncio.sleep(0.1)
            return {"endpoint": endpoint, "status": "success"}

async def api_producer(queue: asyncio.Queue, endpoints: List[str]):
    """Queue API requests."""
    for endpoint in endpoints:
        await queue.put(endpoint)

async def api_consumer(queue: asyncio.Queue, client: RateLimitedAPIClient, consumer_id: int):
    """Process API requests with rate limiting."""
    while True:
        endpoint = await queue.get()
        
        try:
            result = await client.make_request(endpoint)
            print(f"Consumer {consumer_id}: {result}")
        except Exception as e:
            print(f"Consumer {consumer_id} error: {e}")
        finally:
            queue.task_done()

async def rate_limited_pipeline():
    # 10 requests per 1 second window
    client = RateLimitedAPIClient(rate_limit=10, time_window=1.0)
    queue = asyncio.Queue(maxsize=50)
    
    endpoints = [f"/api/endpoint/{i}" for i in range(100)]
    
    producer = asyncio.create_task(api_producer(queue, endpoints))
    consumers = [
        asyncio.create_task(api_consumer(queue, client, i)) 
        for i in range(5)
    ]
    
    await producer
    await queue.join()
    
    for c in consumers:
        c.cancel()

asyncio.run(rate_limited_pipeline())

This implementation layers a semaphore (to cap concurrency) over a sliding-window timestamp check (to cap request rate), keeping throughput near the configured limit. Note that a strict implementation would re-check the window after sleeping rather than assume the slot is still free.

Best Practices and Pitfalls

Choose asyncio queues when: You have I/O-bound tasks (API calls, database queries, file operations). asyncio runs coroutines on a single thread, so the GIL never becomes a factor, and coroutines use far less memory than threads.

Avoid asyncio queues when: You have CPU-bound tasks. Use multiprocessing instead to leverage multiple cores.

Memory management: Bounded queues prevent memory exhaustion. Monitor queue depth in production—consistently full queues indicate consumers can’t keep up.
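
A monitoring hook can be as simple as a coroutine that samples qsize() on an interval. This is a sketch; a real system would push the number to a metrics backend instead of returning it.

```python
import asyncio

async def monitor_queue(queue, interval=0.05, samples=3):
    """Periodically sample queue depth; export these in production."""
    depths = []
    for _ in range(samples):
        depths.append(queue.qsize())
        await asyncio.sleep(interval)
    return depths

async def demo():
    queue = asyncio.Queue(maxsize=10)
    for i in range(4):
        queue.put_nowait(i)
    depths = await monitor_queue(queue)
    print(f"sampled depths: {depths}")  # [4, 4, 4] since nothing is consuming

asyncio.run(demo())
```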

Testing: Use asyncio.Queue with deterministic task ordering for reproducible tests. Consider dependency injection to mock queue behavior.

Graceful shutdown: Always use queue.join() and properly cancel consumer tasks. Sentinel values (like None) work but join() is cleaner.
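
For comparison, the sentinel approach looks like this sketch: the producer pushes one None per consumer after the real work, and each consumer exits cleanly when it sees the sentinel, with no cancellation needed.

```python
import asyncio

SENTINEL = None  # signals "no more work"

async def sentinel_producer(queue, items, num_consumers):
    for item in items:
        await queue.put(item)
    # one sentinel per consumer so every worker receives the shutdown signal
    for _ in range(num_consumers):
        await queue.put(SENTINEL)

async def sentinel_consumer(queue, results):
    while True:
        item = await queue.get()
        if item is SENTINEL:
            break  # clean exit without cancellation
        results.append(item * 2)

async def sentinel_pipeline():
    queue = asyncio.Queue()
    results = []
    consumers = [
        asyncio.create_task(sentinel_consumer(queue, results)) for _ in range(3)
    ]
    await sentinel_producer(queue, range(10), num_consumers=3)
    await asyncio.gather(*consumers)
    print(sorted(results))  # the ten even numbers 0 through 18

asyncio.run(sentinel_pipeline())
```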

The producer-consumer pattern with asyncio queues is a powerful tool for building scalable concurrent Python applications. Master these patterns and you’ll handle complex async workflows with confidence.
