Python Async/Await: Asynchronous Programming Guide
Key Insights
- Async/await shines for I/O-bound operations like network requests and file operations, potentially improving throughput by 10-100x compared to synchronous code, but provides no benefit for CPU-bound tasks
- The event loop is the heart of async Python—it manages coroutine execution, allowing thousands of concurrent operations without the overhead of threads or processes
- Common pitfalls include accidentally blocking the event loop with synchronous I/O, forgetting to await coroutines, and using async when threading or multiprocessing would be more appropriate
Introduction to Asynchronous Programming
Asynchronous programming allows your application to handle multiple operations concurrently without blocking execution. When you make a network request synchronously, your program waits idly for the response. With async, it can start other tasks while waiting.
The difference is dramatic for I/O-bound operations:
import time
import asyncio

# Synchronous approach
def sync_sleep():
    print(f"Start: {time.time():.2f}")
    time.sleep(1)
    print(f"End: {time.time():.2f}")
    return "Done"

# Run three times
start = time.time()
for _ in range(3):
    sync_sleep()
print(f"Total time: {time.time() - start:.2f}s")  # ~3 seconds

# Asynchronous approach
async def async_sleep():
    print(f"Start: {time.time():.2f}")
    await asyncio.sleep(1)
    print(f"End: {time.time():.2f}")
    return "Done"

async def main():
    start = time.time()
    await asyncio.gather(async_sleep(), async_sleep(), async_sleep())
    print(f"Total time: {time.time() - start:.2f}s")  # ~1 second

asyncio.run(main())
The synchronous version takes 3 seconds—each operation blocks the next. The async version takes 1 second—all operations run concurrently.
Core Concepts: async, await, and the Event Loop
The async keyword defines a coroutine function. When called, it returns a coroutine object that must be awaited or scheduled on the event loop. The await keyword pauses coroutine execution until the awaited operation completes, allowing other coroutines to run.
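Calling a coroutine function does not run its body; it only creates a coroutine object. A minimal sketch of that distinction:

```python
import asyncio

async def greet():
    return "hello"

coro = greet()              # creates a coroutine object; the body has NOT run
print(type(coro).__name__)  # coroutine
coro.close()                # close it so Python doesn't warn "never awaited"

# The body executes only once the coroutine is awaited or scheduled
print(asyncio.run(greet()))  # hello
```

The larger example below shows await in context.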
import asyncio
from datetime import datetime

async def fetch_data(source_id):
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Fetching from {source_id}")
    await asyncio.sleep(1)  # Simulates I/O operation
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Completed {source_id}")
    return f"Data from {source_id}"

async def main():
    # Sequential execution - takes 3 seconds
    result1 = await fetch_data("API-1")
    result2 = await fetch_data("API-2")
    result3 = await fetch_data("API-3")

    # Concurrent execution - takes 1 second
    results = await asyncio.gather(
        fetch_data("API-1"),
        fetch_data("API-2"),
        fetch_data("API-3")
    )
    print(results)

asyncio.run(main())
The event loop manages task scheduling. When you await an I/O operation, the event loop switches to other ready tasks. This cooperative multitasking requires no locks or thread synchronization.
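The hand-off is easy to observe: each await point is where a coroutine voluntarily yields to the loop. A small sketch, using a shared list only to record scheduling order:

```python
import asyncio

order = []

async def worker(name):
    order.append(f"{name}-start")
    await asyncio.sleep(0)  # yield control back to the event loop
    order.append(f"{name}-end")

async def main():
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
print(order)  # ['A-start', 'B-start', 'A-end', 'B-end']
```

Both workers start before either finishes: at each await, the loop switches to the other ready task, which is exactly the cooperative scheduling described above.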
Working with asyncio Tasks and Gathering
Tasks wrap coroutines and schedule them on the event loop. Use asyncio.create_task() to start a coroutine immediately without waiting for its result:
import asyncio

async def process_item(item_id):
    await asyncio.sleep(1)
    return f"Processed {item_id}"

async def main():
    # create_task schedules each coroutine to run as soon as the loop is free
    task1 = asyncio.create_task(process_item(1))
    task2 = asyncio.create_task(process_item(2))
    task3 = asyncio.create_task(process_item(3))

    # Do other work here while tasks run
    print("Tasks running in background...")

    # Wait for all results
    results = await asyncio.gather(task1, task2, task3)
    print(results)

asyncio.run(main())
For operations that might hang, use asyncio.wait_for() to enforce timeouts:
async def fetch_with_timeout(url):
    try:
        result = await asyncio.wait_for(
            fetch_data(url),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        print(f"Request to {url} timed out")
        return None

async def cancelable_operation():
    # long_running_operation() stands in for any coroutine defined elsewhere
    task = asyncio.create_task(long_running_operation())
    # Cancel after 2 seconds
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")
The gather() function collects results from multiple coroutines. Use return_exceptions=True to handle failures gracefully:
async def main():
    results = await asyncio.gather(
        fetch_data("valid-url"),
        fetch_data("invalid-url"),
        fetch_data("another-valid"),
        return_exceptions=True
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Request {i} failed: {result}")
        else:
            print(f"Request {i} succeeded: {result}")
Async Context Managers and Iterators
Async context managers use async with for resource management with async setup/teardown:
import aiohttp
import asyncio

async def fetch_multiple_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()
Async iterators use async for to process streaming data:
async def fetch_paginated_results(api_endpoint):
    """Async generator for paginated API results"""
    page = 1
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(f"{api_endpoint}?page={page}") as response:
                data = await response.json()
                if not data['results']:
                    break
                for item in data['results']:
                    yield item
            page += 1

async def process_all_results():
    async for item in fetch_paginated_results("https://api.example.com/data"):
        print(f"Processing {item}")
        await process_item(item)
The third-party aiofiles library provides asynchronous file operations:
import asyncio
import aiofiles

async def read_files_concurrently(file_paths):
    async def read_file(path):
        async with aiofiles.open(path, 'r') as f:
            return await f.read()
    contents = await asyncio.gather(*[read_file(p) for p in file_paths])
    return contents
Common Patterns and Best Practices
Always handle exceptions in async code. With return_exceptions=True, gather() returns exceptions alongside successful results instead of raising the first one and discarding the rest:
async def safe_fetch(url):
    try:
        async with aiohttp.ClientSession() as session:
            # aiohttp takes a ClientTimeout object for its timeout setting
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=timeout) as response:
                return await response.json()
    except asyncio.TimeoutError:
        print(f"Timeout fetching {url}")
        return None
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None
Use semaphores to limit concurrent operations and prevent overwhelming servers:
async def fetch_with_rate_limit(urls, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(url):
        async with semaphore:  # at most max_concurrent requests in flight
            # fetch_url: any coroutine that fetches a single URL
            return await fetch_url(url)

    return await asyncio.gather(*[fetch_one(url) for url in urls])
Never block the event loop with synchronous I/O. Use asyncio.to_thread() for blocking operations:
import asyncio
import requests  # Synchronous library

async def fetch_with_sync_library(url):
    # Run the blocking call in a worker thread so the loop stays responsive
    response = await asyncio.to_thread(requests.get, url)
    return response.json()

async def cpu_intensive_work(data):
    # to_thread keeps the event loop free, but the GIL still serializes
    # pure-Python CPU work; use a process pool for real parallelism
    result = await asyncio.to_thread(expensive_calculation, data)
    return result
Real-World Application: Building an Async Web Scraper
Here’s a complete web scraper combining error handling, rate limiting, and retries:
import asyncio
import aiohttp
from typing import List, Dict
import time

async def fetch_with_retry(session, url, max_retries=3):
    """Fetch URL with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    return await response.text()
                elif response.status == 429:  # Rate limited
                    wait_time = 2 ** attempt
                    await asyncio.sleep(wait_time)
                else:
                    return None
        except asyncio.TimeoutError:
            if attempt == max_retries - 1:
                return None
            await asyncio.sleep(2 ** attempt)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None
    return None

async def scrape_urls(urls: List[str], max_concurrent=10) -> Dict[str, str]:
    """Scrape multiple URLs concurrently with rate limiting"""
    semaphore = asyncio.Semaphore(max_concurrent)
    results = {}

    async def fetch_one(url):
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                content = await fetch_with_retry(session, url)
                return url, content

    start_time = time.time()
    responses = await asyncio.gather(
        *[fetch_one(url) for url in urls],
        return_exceptions=True
    )
    for response in responses:
        if isinstance(response, Exception):
            print(f"Failed: {response}")
        else:
            url, content = response
            results[url] = content
    elapsed = time.time() - start_time
    print(f"Scraped {len(urls)} URLs in {elapsed:.2f}s")
    return results

# Usage
async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        # ... more URLs
    ]
    results = await scrape_urls(urls, max_concurrent=5)
    print(f"Successfully scraped {len(results)} pages")

if __name__ == "__main__":
    asyncio.run(main())
Performance Comparison and When NOT to Use Async
Async excels at I/O-bound tasks but provides no benefit for CPU-bound operations:
import asyncio
import time

# I/O-bound: Async wins
async def io_bound_async():
    start = time.time()
    await asyncio.gather(*[asyncio.sleep(0.1) for _ in range(100)])
    print(f"Async I/O: {time.time() - start:.2f}s")  # ~0.1s

def io_bound_sync():
    start = time.time()
    for _ in range(100):
        time.sleep(0.1)
    print(f"Sync I/O: {time.time() - start:.2f}s")  # ~10s

# CPU-bound: Async provides no benefit (the GIL serializes the threads)
async def cpu_bound_async():
    start = time.time()
    await asyncio.gather(*[asyncio.to_thread(sum, range(10_000_000)) for _ in range(4)])
    print(f"Async CPU: {time.time() - start:.2f}s")

def cpu_bound_sync():
    start = time.time()
    for _ in range(4):
        sum(range(10_000_000))
    print(f"Sync CPU: {time.time() - start:.2f}s")
Use async when:
- Making multiple network requests
- Handling many concurrent connections (websockets, chat servers)
- Performing file I/O operations
- Interacting with databases with async drivers
Don’t use async when:
- Performing CPU-intensive calculations (use multiprocessing)
- Working with libraries without async support
- Simple scripts with minimal I/O
- The added complexity outweighs benefits
For CPU-bound tasks, use multiprocessing or concurrent.futures.ProcessPoolExecutor. For I/O with synchronous libraries, use ThreadPoolExecutor. Async is powerful but not a universal solution—choose the right tool for your specific workload.
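The process-pool recommendation can be sketched as follows; crunch is a hypothetical stand-in for real CPU-bound work:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n):
    # CPU-bound work; separate processes sidestep the GIL entirely
    return sum(range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor returns awaitable futures, so they compose with gather
        results = await asyncio.gather(
            *[loop.run_in_executor(pool, crunch, 1_000_000) for _ in range(4)]
        )
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Swapping ProcessPoolExecutor for ThreadPoolExecutor gives the corresponding bridge for blocking I/O with synchronous libraries.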