Python - asyncio (Async/Await) Tutorial

Key Insights

• Asyncio enables concurrent I/O-bound operations in Python using cooperative multitasking, allowing thousands of operations to run efficiently on a single thread without blocking
• The async/await syntax provides a clean, synchronous-looking way to write asynchronous code, making it easier to reason about than callbacks or raw coroutines
• Understanding the event loop, tasks, and proper exception handling is critical for building production-ready async applications that avoid common pitfalls like blocking calls and resource leaks

Understanding the Event Loop

The event loop is the core of asyncio. It manages and executes asynchronous tasks, handles I/O operations, and runs callbacks. Think of it as a task scheduler that switches between coroutines when they’re waiting for I/O.

import asyncio
import time

async def task(name, delay):
    print(f"{name} starting at {time.strftime('%X')}")
    await asyncio.sleep(delay)
    print(f"{name} completed at {time.strftime('%X')}")
    return f"{name} result"

async def main():
    # Sequential execution - takes 6 seconds
    result1 = await task("Task 1", 2)
    result2 = await task("Task 2", 2)
    result3 = await task("Task 3", 2)
    
if __name__ == "__main__":
    asyncio.run(main())

The asyncio.run() function creates an event loop, runs the coroutine, and closes the loop. For Python 3.7+, this is the standard way to run async code.
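
To make the "creates a loop, runs the coroutine, closes the loop" description concrete, here is a minimal sketch of the same lifecycle done by hand. The helper names greet and run_manually are made up for illustration, and the sketch is simplified: the real asyncio.run() also cancels leftover tasks and shuts down async generators before closing the loop.

```python
import asyncio

async def greet(name: str) -> str:
    await asyncio.sleep(0.01)  # hand control back to the event loop once
    return f"hello, {name}"

def run_manually(coro):
    """Simplified stand-in for asyncio.run(): create a loop, run, always close."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()

print(run_manually(greet("manual loop")))   # hello, manual loop
print(asyncio.run(greet("asyncio.run")))    # hello, asyncio.run
```

In application code, prefer asyncio.run(); the manual version is only useful for seeing what it does for you.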

Concurrent Execution with Tasks

To run coroutines concurrently, convert them to tasks using asyncio.create_task(). This schedules the coroutine for execution without waiting for it to complete.

import asyncio
import time

async def fetch_data(source, delay):
    print(f"Fetching from {source}...")
    await asyncio.sleep(delay)  # Simulates I/O operation
    return {"source": source, "data": f"Data from {source}"}

async def main():
    start = time.perf_counter()
    
    # Create tasks - each is scheduled now and starts at the next await
    task1 = asyncio.create_task(fetch_data("API-1", 2))
    task2 = asyncio.create_task(fetch_data("API-2", 3))
    task3 = asyncio.create_task(fetch_data("API-3", 1))
    
    # Wait for all tasks to complete
    results = await asyncio.gather(task1, task2, task3)
    
    elapsed = time.perf_counter() - start
    print(f"\nResults: {results}")
    print(f"Total time: {elapsed:.2f} seconds")  # ~3 seconds, not 6

asyncio.run(main())

The tasks run concurrently, so total execution time equals the longest task (3 seconds), not the sum (6 seconds).
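
One detail worth seeing in isolation: create_task() only schedules the coroutine. Its body starts the next time the current coroutine yields to the event loop. The sketch below records the order of events in a list; the names worker and demo are illustrative, and it assumes the default (non-eager) task factory.

```python
import asyncio

async def worker(log: list, name: str) -> str:
    log.append(f"{name} started")
    await asyncio.sleep(0.01)
    log.append(f"{name} finished")
    return name

async def demo() -> list:
    log = []
    task = asyncio.create_task(worker(log, "worker"))
    log.append("task created")   # the worker body has not run yet
    await asyncio.sleep(0)       # yield once; the loop now starts the worker
    log.append("main resumed")
    await task                   # wait for the worker to finish
    return log

print(asyncio.run(demo()))
# ['task created', 'worker started', 'main resumed', 'worker finished']
```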

Working with asyncio.gather and Task Groups

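asyncio.gather() runs multiple coroutines concurrently and returns their results in the order they were passed in. By default the first exception propagates to the caller while the other awaitables keep running; with return_exceptions=True, exceptions are returned in the results list instead. On Python 3.11+, asyncio.TaskGroup adds structured concurrency: if one task fails, the remaining tasks are cancelled and the errors are raised together as an ExceptionGroup.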

import asyncio

async def process_item(item_id: int) -> dict:
    await asyncio.sleep(0.5)  # Simulate processing
    if item_id == 5:
        raise ValueError(f"Failed to process item {item_id}")
    return {"id": item_id, "status": "processed"}

async def main_gather():
    """Using gather - continues on error if return_exceptions=True"""
    items = range(1, 8)
    results = await asyncio.gather(
        *[process_item(i) for i in items],
        return_exceptions=True
    )
    
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Item {i} failed: {result}")
        else:
            print(f"Item {i}: {result}")

async def main_taskgroup():
    """Using TaskGroup (Python 3.11+) - cancels the remaining tasks on first error"""
    results = []
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(process_item(i)) for i in range(1, 8)]
        
        results = [task.result() for task in tasks]
    except* ValueError as eg:
        print(f"Errors occurred: {eg.exceptions}")
    
    print(f"Processed: {len(results)} items")

asyncio.run(main_gather())
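
The "results in order" guarantee is easy to check with a small experiment. In this sketch (finish_after and demo are illustrative names), the coroutines finish in a different order than they were passed to gather(), yet the result list follows the argument order.

```python
import asyncio

async def finish_after(label: str, delay: float, done: list) -> str:
    await asyncio.sleep(delay)
    done.append(label)   # record the actual completion order
    return label

async def demo():
    done = []
    results = await asyncio.gather(
        finish_after("slow", 0.15, done),
        finish_after("fast", 0.01, done),
        finish_after("medium", 0.08, done),
    )
    return results, done

results, done = asyncio.run(demo())
print(results)  # ['slow', 'fast', 'medium']  (argument order)
print(done)     # ['fast', 'medium', 'slow']  (completion order)
```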

Real-World Example: Concurrent HTTP Requests

Here’s a practical example using aiohttp for concurrent HTTP requests:

import asyncio
import aiohttp
from typing import List, Dict

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch a single URL and return response data"""
    try:
        # aiohttp expects a ClientTimeout object; a bare number is deprecated
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            return {
                "url": url,
                "status": response.status,
                "content_length": len(await response.text())
            }
    except asyncio.TimeoutError:
        return {"url": url, "error": "Timeout"}
    except Exception as e:
        return {"url": url, "error": str(e)}

async def fetch_all(urls: List[str]) -> List[Dict]:
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/status/404",
    ]
    
    results = await fetch_all(urls)
    for result in results:
        print(result)

# Install aiohttp first: pip install aiohttp
asyncio.run(main())

Semaphores for Rate Limiting

A semaphore caps how many requests are in flight at once, which helps avoid overwhelming external services. Note that it limits concurrency, not requests per second:

import asyncio
import aiohttp

async def fetch_with_limit(
    session: aiohttp.ClientSession, 
    url: str, 
    semaphore: asyncio.Semaphore
) -> dict:
    """Fetch URL with concurrency limiting"""
    async with semaphore:  # Only N requests run simultaneously
        print(f"Fetching {url}")
        async with session.get(url) as response:
            data = await response.json()
            print(f"Completed {url}")
            return data

async def main():
    # Limit to 3 concurrent requests
    semaphore = asyncio.Semaphore(3)
    
    urls = ["https://httpbin.org/delay/1"] * 10
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore) 
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
    
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
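
To verify the limit without touching the network, the following sketch replaces the HTTP call with asyncio.sleep() and tracks how many jobs are inside the semaphore at once. The names limited_job and demo and the state dictionary are assumptions made for this illustration.

```python
import asyncio

async def limited_job(semaphore: asyncio.Semaphore, state: dict, delay: float = 0.02):
    async with semaphore:                      # at most `limit` jobs pass this point
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(delay)             # stand-in for the real request
        state["active"] -= 1

async def demo(limit: int = 3, jobs: int = 10) -> int:
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_job(semaphore, state) for _ in range(jobs)))
    return state["peak"]

print(asyncio.run(demo()))  # 3
```

The peak never exceeds the semaphore's value, even though all ten jobs are started together.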

Async Generators and Iteration

Async generators allow you to yield values asynchronously, useful for streaming data:

import asyncio
from typing import AsyncIterator

async def fetch_pages(num_pages: int) -> AsyncIterator[dict]:
    """Simulate paginated API responses"""
    for page in range(1, num_pages + 1):
        await asyncio.sleep(0.5)  # Simulate API delay
        yield {
            "page": page,
            "data": [f"item_{page}_{i}" for i in range(5)]
        }

async def process_stream():
    """Process data as it arrives"""
    async for page_data in fetch_pages(5):
        print(f"Processing page {page_data['page']}")
        # Process items immediately without waiting for all pages
        for item in page_data['data']:
            print(f"  - {item}")

asyncio.run(process_stream())
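
Async generators also work inside comprehensions, which is handy when you want to filter or collect a stream in one expression. A minimal sketch, with countdown and demo as illustrative names:

```python
import asyncio
from typing import AsyncIterator

async def countdown(start: int) -> AsyncIterator[int]:
    for value in range(start, 0, -1):
        await asyncio.sleep(0.01)  # simulate waiting for the next value
        yield value

async def demo() -> list:
    # "async for" inside a comprehension consumes the async generator
    return [n async for n in countdown(6) if n % 2 == 0]

print(asyncio.run(demo()))  # [6, 4, 2]
```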

Handling Timeouts

Always set timeouts for external operations to prevent indefinite hanging:

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Completed"

async def main():
    try:
        # Timeout after 2 seconds
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Alternative: using timeout context manager (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
            print(result)
    except TimeoutError:
        print("Operation timed out")

asyncio.run(main())
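
A timeout does more than stop waiting: asyncio.wait_for() cancels the awaited coroutine, so the coroutine gets a chance to clean up before the TimeoutError reaches the caller. The sketch below records that sequence; slow_job and demo are illustrative names.

```python
import asyncio

async def slow_job(events: list) -> str:
    try:
        await asyncio.sleep(1)
        return "finished"
    except asyncio.CancelledError:
        events.append("cancelled")  # cleanup would go here
        raise

async def demo() -> list:
    events = []
    try:
        await asyncio.wait_for(slow_job(events), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout")
    return events

print(asyncio.run(demo()))  # ['cancelled', 'timeout']
```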

Mixing Sync and Async Code

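Never call blocking synchronous code directly in async functions, because it stalls the event loop. Use run_in_executor() to move blocking I/O to a thread pool. For CPU-bound work, threads keep the loop responsive but still contend for the GIL, so a ProcessPoolExecutor is usually the better choice for heavy computation: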

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io_operation(filename: str) -> str:
    """Simulates blocking I/O (legacy library without async support)"""
    time.sleep(2)
    return f"Data from {filename}"

def cpu_intensive_task(n: int) -> int:
    """CPU-bound operation"""
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    
    # Run blocking calls in a thread pool (a process pool suits heavy CPU work better)
    with ThreadPoolExecutor(max_workers=3) as pool:
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io_operation, "file1.txt"),
            loop.run_in_executor(pool, blocking_io_operation, "file2.txt"),
            loop.run_in_executor(pool, cpu_intensive_task, 10000000)
        )
    
    print(results)

asyncio.run(main())
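
On Python 3.9+, asyncio.to_thread() is a shorter way to push a blocking call onto the default thread pool. This sketch assumes that Python version and uses a made-up blocking_read function in place of a real legacy library call.

```python
import asyncio
import time

def blocking_read(name: str) -> str:
    time.sleep(0.2)  # stand-in for a blocking library call
    return f"Data from {name}"

async def demo():
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, "file1.txt"),
        asyncio.to_thread(blocking_read, "file2.txt"),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(demo())
print(results)
print(f"Elapsed: {elapsed:.2f} seconds")  # close to 0.2, since the calls overlap
```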

Common Pitfalls

Forgetting await: Calling an async function without await returns a coroutine object, not the result:

async def get_data():
    return "data"

async def wrong():
    result = get_data()  # Wrong! Returns coroutine object
    print(result)  # <coroutine object get_data>

async def correct():
    result = await get_data()  # Correct
    print(result)  # "data"
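
If you are unsure whether a value is a result or an un-awaited coroutine, asyncio.iscoroutine() can tell you. A small sketch (demo is an illustrative name) that also shows the coroutine object can still be awaited later:

```python
import asyncio

async def get_data() -> str:
    return "data"

async def demo():
    pending = get_data()                     # no await: only a coroutine object
    is_coro = asyncio.iscoroutine(pending)
    value = await pending                    # awaiting it runs the function body
    return is_coro, value

print(asyncio.run(demo()))  # (True, 'data')
```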

Blocking the event loop: Never use time.sleep() in async code; use asyncio.sleep() instead. A blocking call stalls every other task on the loop until it returns.
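
The cost of a blocking call shows up clearly in a timing comparison. In this sketch (blocking_worker, cooperative_worker, and timed are illustrative names), two workers run under gather(): the time.sleep() version takes about twice as long because the workers cannot overlap.

```python
import asyncio
import time

async def blocking_worker():
    time.sleep(0.2)            # blocks the whole event loop

async def cooperative_worker():
    await asyncio.sleep(0.2)   # yields, so the other worker can run

async def timed(worker) -> float:
    start = time.perf_counter()
    await asyncio.gather(worker(), worker())
    return time.perf_counter() - start

print(f"blocking:    {asyncio.run(timed(blocking_worker)):.2f}s")     # about 0.4
print(f"cooperative: {asyncio.run(timed(cooperative_worker)):.2f}s")  # about 0.2
```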

Not handling cancellation: Tasks can be cancelled at any await. When a task needs cleanup, catch asyncio.CancelledError (or use try/finally) and re-raise it so the cancellation propagates:

async def cancellable_task():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("Task was cancelled, cleaning up...")
        # Perform cleanup
        raise  # Re-raise to propagate cancellation
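
Here is a hedged sketch of the caller's side: it starts the task, cancels it, and awaits it so the cleanup has finished before moving on. The log list and the demo function are additions for illustration.

```python
import asyncio

async def cancellable_task(log: list):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")   # release resources here
        raise                   # re-raise so the task ends up cancelled

async def demo():
    log = []
    task = asyncio.create_task(cancellable_task(log))
    await asyncio.sleep(0.01)   # let the task start and reach its await
    task.cancel()               # request cancellation
    try:
        await task              # wait until cleanup has run
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()

print(asyncio.run(demo()))  # (['cleanup', 'cancelled'], True)
```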

Asyncio gives Python an efficient model for I/O-bound concurrency. Master these patterns to build responsive applications that handle thousands of concurrent operations on a single thread, without the complexity of threading.
