Python Async/Await: Asynchronous Programming Guide

Asynchronous programming allows your application to handle multiple operations concurrently without blocking execution. When you make a network request synchronously, your program waits idly for the...

Key Insights

  • Async/await shines for I/O-bound operations like network requests and file operations, potentially improving throughput by 10-100x compared to synchronous code, but provides no benefit for CPU-bound tasks
  • The event loop is the heart of async Python—it manages coroutine execution, allowing thousands of concurrent operations without the overhead of threads or processes
  • Common pitfalls include accidentally blocking the event loop with synchronous I/O, forgetting to await coroutines, and using async when threading or multiprocessing would be more appropriate

Introduction to Asynchronous Programming

Asynchronous programming allows your application to handle multiple operations concurrently without blocking execution. When you make a network request synchronously, your program waits idly for the response. With async, it can start other tasks while waiting.

The difference is dramatic for I/O-bound operations:

import time
import asyncio

# Synchronous approach
def sync_sleep():
    print(f"Start: {time.time():.2f}")
    time.sleep(1)
    print(f"End: {time.time():.2f}")
    return "Done"

# Run three times
start = time.time()
for _ in range(3):
    sync_sleep()
print(f"Total time: {time.time() - start:.2f}s")  # ~3 seconds

# Asynchronous approach
async def async_sleep():
    print(f"Start: {time.time():.2f}")
    await asyncio.sleep(1)
    print(f"End: {time.time():.2f}")
    return "Done"

async def main():
    start = time.time()
    await asyncio.gather(async_sleep(), async_sleep(), async_sleep())
    print(f"Total time: {time.time() - start:.2f}s")  # ~1 second

asyncio.run(main())

The synchronous version takes 3 seconds—each operation blocks the next. The async version takes 1 second—all operations run concurrently.

Core Concepts: async, await, and the Event Loop

The async keyword defines a coroutine function. When called, it returns a coroutine object that must be awaited or scheduled on the event loop. The await keyword pauses coroutine execution until the awaited operation completes, allowing other coroutines to run.

import asyncio
from datetime import datetime

async def fetch_data(source_id):
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Fetching from {source_id}")
    await asyncio.sleep(1)  # Simulates I/O operation
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Completed {source_id}")
    return f"Data from {source_id}"

async def main():
    # Sequential execution - takes 3 seconds
    result1 = await fetch_data("API-1")
    result2 = await fetch_data("API-2")
    result3 = await fetch_data("API-3")
    
    # Concurrent execution - takes 1 second
    results = await asyncio.gather(
        fetch_data("API-1"),
        fetch_data("API-2"),
        fetch_data("API-3")
    )
    print(results)

asyncio.run(main())

The event loop manages task scheduling. When you await an I/O operation, the event loop switches to other ready tasks. This cooperative multitasking requires no locks or thread synchronization.

Working with asyncio Tasks and Gathering

Tasks wrap coroutines and schedule them on the event loop. Use asyncio.create_task() to start a coroutine immediately without waiting for its result:

import asyncio

async def process_item(item_id):
    await asyncio.sleep(1)
    return f"Processed {item_id}"

async def main():
    # Create tasks immediately starts execution
    task1 = asyncio.create_task(process_item(1))
    task2 = asyncio.create_task(process_item(2))
    task3 = asyncio.create_task(process_item(3))
    
    # Do other work here while tasks run
    print("Tasks running in background...")
    
    # Wait for all results
    results = await asyncio.gather(task1, task2, task3)
    print(results)

asyncio.run(main())

For operations that might hang, use asyncio.wait_for() to enforce timeouts:

async def fetch_with_timeout(url):
    try:
        result = await asyncio.wait_for(
            fetch_data(url),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        print(f"Request to {url} timed out")
        return None

async def cancelable_operation():
    task = asyncio.create_task(long_running_operation())
    
    # Cancel after 2 seconds
    await asyncio.sleep(2)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

The gather() function collects results from multiple coroutines. Use return_exceptions=True to handle failures gracefully:

async def main():
    results = await asyncio.gather(
        fetch_data("valid-url"),
        fetch_data("invalid-url"),
        fetch_data("another-valid"),
        return_exceptions=True
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Request {i} failed: {result}")
        else:
            print(f"Request {i} succeeded: {result}")

Async Context Managers and Iterators

Async context managers use async with for resource management with async setup/teardown:

import aiohttp
import asyncio

async def fetch_multiple_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(fetch_url(session, url))
        return await asyncio.gather(*tasks)

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

Async iterators use async for to process streaming data:

async def fetch_paginated_results(api_endpoint):
    """Async generator for paginated API results"""
    page = 1
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(f"{api_endpoint}?page={page}") as response:
                data = await response.json()
                if not data['results']:
                    break
                for item in data['results']:
                    yield item
                page += 1

async def process_all_results():
    async for item in fetch_paginated_results("https://api.example.com/data"):
        print(f"Processing {item}")
        await process_item(item)

Python 3.9+ includes async file operations:

import aiofiles

async def read_files_concurrently(file_paths):
    async def read_file(path):
        async with aiofiles.open(path, 'r') as f:
            return await f.read()
    
    contents = await asyncio.gather(*[read_file(p) for p in file_paths])
    return contents

Common Patterns and Best Practices

Always handle exceptions in async code. Use gather() with return_exceptions=True to prevent one failure from canceling all tasks:

async def safe_fetch(url):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=10) as response:
                return await response.json()
    except asyncio.TimeoutError:
        print(f"Timeout fetching {url}")
        return None
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

Use semaphores to limit concurrent operations and prevent overwhelming servers:

async def fetch_with_rate_limit(urls, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_one(url):
        async with semaphore:
            return await fetch_url(url)
    
    return await asyncio.gather(*[fetch_one(url) for url in urls])

Never block the event loop with synchronous I/O. Use asyncio.to_thread() for blocking operations:

import asyncio
import requests  # Synchronous library

async def fetch_with_sync_library(url):
    # Run blocking call in thread pool
    response = await asyncio.to_thread(requests.get, url)
    return response.json()

async def cpu_intensive_work(data):
    # Offload CPU-bound work to thread
    result = await asyncio.to_thread(expensive_calculation, data)
    return result

Real-World Application: Building an Async Web Scraper

Here’s a complete web scraper combining error handling, rate limiting, and retries:

import asyncio
import aiohttp
from typing import List, Dict
import time

async def fetch_with_retry(session, url, max_retries=3):
    """Fetch URL with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=10) as response:
                if response.status == 200:
                    return await response.text()
                elif response.status == 429:  # Rate limited
                    wait_time = 2 ** attempt
                    await asyncio.sleep(wait_time)
                else:
                    return None
        except asyncio.TimeoutError:
            if attempt == max_retries - 1:
                return None
            await asyncio.sleep(2 ** attempt)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None
    return None

async def scrape_urls(urls: List[str], max_concurrent=10) -> Dict[str, str]:
    """Scrape multiple URLs concurrently with rate limiting"""
    semaphore = asyncio.Semaphore(max_concurrent)
    results = {}
    
    async def fetch_one(url):
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                content = await fetch_with_retry(session, url)
                return url, content
    
    start_time = time.time()
    responses = await asyncio.gather(
        *[fetch_one(url) for url in urls],
        return_exceptions=True
    )
    
    for response in responses:
        if isinstance(response, Exception):
            print(f"Failed: {response}")
        else:
            url, content = response
            results[url] = content
    
    elapsed = time.time() - start_time
    print(f"Scraped {len(urls)} URLs in {elapsed:.2f}s")
    return results

# Usage
async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        # ... more URLs
    ]
    results = await scrape_urls(urls, max_concurrent=5)
    print(f"Successfully scraped {len(results)} pages")

if __name__ == "__main__":
    asyncio.run(main())

Performance Comparison and When NOT to Use Async

Async excels at I/O-bound tasks but provides no benefit for CPU-bound operations:

import asyncio
import time

# I/O-bound: Async wins
async def io_bound_async():
    start = time.time()
    await asyncio.gather(*[asyncio.sleep(0.1) for _ in range(100)])
    print(f"Async I/O: {time.time() - start:.2f}s")  # ~0.1s

def io_bound_sync():
    start = time.time()
    for _ in range(100):
        time.sleep(0.1)
    print(f"Sync I/O: {time.time() - start:.2f}s")  # ~10s

# CPU-bound: Async provides no benefit
async def cpu_bound_async():
    start = time.time()
    await asyncio.gather(*[asyncio.to_thread(sum, range(10_000_000)) for _ in range(4)])
    print(f"Async CPU: {time.time() - start:.2f}s")

def cpu_bound_sync():
    start = time.time()
    for _ in range(4):
        sum(range(10_000_000))
    print(f"Sync CPU: {time.time() - start:.2f}s")

Use async when:

  • Making multiple network requests
  • Handling many concurrent connections (websockets, chat servers)
  • Performing file I/O operations
  • Interacting with databases with async drivers

Don’t use async when:

  • Performing CPU-intensive calculations (use multiprocessing)
  • Working with libraries without async support
  • Simple scripts with minimal I/O
  • The added complexity outweighs benefits

For CPU-bound tasks, use multiprocessing or concurrent.futures.ProcessPoolExecutor. For I/O with synchronous libraries, use ThreadPoolExecutor. Async is powerful but not a universal solution—choose the right tool for your specific workload.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.