Python - asyncio (Async/Await) Tutorial
Key Insights
• Asyncio enables concurrent I/O-bound operations in Python using cooperative multitasking, allowing thousands of operations to run efficiently on a single thread without blocking
• The async/await syntax provides a clean, synchronous-looking way to write asynchronous code, making it easier to reason about compared to callbacks or raw coroutines
• Understanding the event loop, tasks, and proper exception handling is critical for building production-ready async applications that avoid common pitfalls like blocking calls and resource leaks
Understanding the Event Loop
The event loop is the core of asyncio. It manages and executes asynchronous tasks, handles I/O operations, and runs callbacks. Think of it as a task scheduler that switches between coroutines when they’re waiting for I/O.
```python
import asyncio
import time

async def task(name, delay):
    print(f"{name} starting at {time.strftime('%X')}")
    await asyncio.sleep(delay)
    print(f"{name} completed at {time.strftime('%X')}")
    return f"{name} result"

async def main():
    # Sequential execution - each await finishes before the next starts,
    # so this takes ~6 seconds total
    result1 = await task("Task 1", 2)
    result2 = await task("Task 2", 2)
    result3 = await task("Task 3", 2)
    print(result1, result2, result3)

if __name__ == "__main__":
    asyncio.run(main())
```
The asyncio.run() function creates an event loop, runs the coroutine to completion, and closes the loop. Since Python 3.7, this is the standard way to run async code.
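A coroutine object does nothing on its own; it must be driven by the event loop, and asyncio.run() both starts that loop and hands back the coroutine's return value. A minimal sketch (the compute function here is just an illustration):

```python
import asyncio

async def compute(x: int, y: int) -> int:
    # Yield control once so other tasks could run in between
    await asyncio.sleep(0)
    return x + y

# asyncio.run() starts a fresh event loop, runs the coroutine
# to completion, returns its result, and closes the loop
result = asyncio.run(compute(2, 3))
print(result)
```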
Concurrent Execution with Tasks
To run coroutines concurrently, convert them to tasks using asyncio.create_task(). This schedules the coroutine for execution without waiting for it to complete.
```python
import asyncio
import time

async def fetch_data(source, delay):
    print(f"Fetching from {source}...")
    await asyncio.sleep(delay)  # Simulates I/O operation
    return {"source": source, "data": f"Data from {source}"}

async def main():
    start = time.perf_counter()
    # Create tasks - each is scheduled to run as soon as the loop gets control
    task1 = asyncio.create_task(fetch_data("API-1", 2))
    task2 = asyncio.create_task(fetch_data("API-2", 3))
    task3 = asyncio.create_task(fetch_data("API-3", 1))
    # Wait for all tasks to complete
    results = await asyncio.gather(task1, task2, task3)
    elapsed = time.perf_counter() - start
    print(f"\nResults: {results}")
    print(f"Total time: {elapsed:.2f} seconds")  # ~3 seconds, not 6

asyncio.run(main())
```
The tasks run concurrently, so total execution time equals the longest task (3 seconds), not the sum (6 seconds).
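If you want to handle each result as soon as it is ready rather than waiting for the whole batch, asyncio.as_completed() yields awaitables in the order they finish. A small sketch using the same simulated-delay pattern (the source names and delays are illustrative):

```python
import asyncio

async def fetch_data(source: str, delay: float) -> str:
    await asyncio.sleep(delay)  # simulated I/O
    return source

async def main() -> list[str]:
    coros = [
        fetch_data("slow", 0.3),
        fetch_data("fast", 0.1),
        fetch_data("medium", 0.2),
    ]
    finished = []
    # as_completed yields results in completion order,
    # not the order the coroutines were submitted
    for fut in asyncio.as_completed(coros):
        finished.append(await fut)
    return finished

order = asyncio.run(main())
print(order)  # fastest first
```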
Working with asyncio.gather and Task Groups
asyncio.gather() runs multiple coroutines concurrently and returns their results in submission order. In Python 3.11+, asyncio.TaskGroup offers stricter semantics: when one task fails, the remaining tasks are cancelled and the errors surface as an ExceptionGroup.
```python
import asyncio

async def process_item(item_id: int) -> dict:
    await asyncio.sleep(0.5)  # Simulate processing
    if item_id == 5:
        raise ValueError(f"Failed to process item {item_id}")
    return {"id": item_id, "status": "processed"}

async def main_gather():
    """Using gather - continues on error if return_exceptions=True"""
    items = range(1, 8)
    results = await asyncio.gather(
        *[process_item(i) for i in items],
        return_exceptions=True
    )
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Item {i} failed: {result}")
        else:
            print(f"Item {i}: {result}")

async def main_taskgroup():
    """Using TaskGroup (Python 3.11+) - cancels all on first error"""
    tasks = []
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(process_item(i)) for i in range(1, 8)]
    except* ValueError as eg:
        print(f"Errors occurred: {eg.exceptions}")
    # Results are only available after the TaskGroup has exited;
    # skip tasks that were cancelled or failed
    results = [t.result() for t in tasks
               if t.done() and not t.cancelled() and t.exception() is None]
    print(f"Processed: {len(results)} items")

asyncio.run(main_gather())
# asyncio.run(main_taskgroup())  # Requires Python 3.11+
```
Real-World Example: Concurrent HTTP Requests
Here’s a practical example using aiohttp for concurrent HTTP requests:
```python
import asyncio
import aiohttp
from typing import List, Dict

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch a single URL and return response data"""
    try:
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as response:
            return {
                "url": url,
                "status": response.status,
                "content_length": len(await response.text())
            }
    except asyncio.TimeoutError:
        return {"url": url, "error": "Timeout"}
    except Exception as e:
        return {"url": url, "error": str(e)}

async def fetch_all(urls: List[str]) -> List[Dict]:
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/status/404",
    ]
    results = await fetch_all(urls)
    for result in results:
        print(result)

# Install aiohttp first: pip install aiohttp
asyncio.run(main())
```
Semaphores for Rate Limiting
Control concurrency with semaphores to avoid overwhelming external services:
```python
import asyncio
import aiohttp

async def fetch_with_limit(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore
) -> dict:
    """Fetch URL with concurrency limiting"""
    async with semaphore:  # Only N requests run simultaneously
        print(f"Fetching {url}")
        async with session.get(url) as response:
            data = await response.json()
            print(f"Completed {url}")
            return data

async def main():
    # Limit to 3 concurrent requests
    semaphore = asyncio.Semaphore(3)
    urls = ["https://httpbin.org/delay/1" for _ in range(10)]
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} URLs")

asyncio.run(main())
```
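The effect of the semaphore is easy to verify without any network access. The sketch below (worker, state, and the 0.05 s delay are all illustrative) counts how many coroutines are inside the critical section at once; the peak never exceeds the semaphore's limit:

```python
import asyncio

async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # stands in for a real request
        state["active"] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(3)  # allow at most 3 workers at once
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(10)))
    return state["peak"]

peak = asyncio.run(main())
print(peak)  # 3 - the limit, despite 10 workers
```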
Async Generators and Iteration
Async generators allow you to yield values asynchronously, useful for streaming data:
```python
import asyncio
from typing import AsyncIterator

async def fetch_pages(num_pages: int) -> AsyncIterator[dict]:
    """Simulate paginated API responses"""
    for page in range(1, num_pages + 1):
        await asyncio.sleep(0.5)  # Simulate API delay
        yield {
            "page": page,
            "data": [f"item_{page}_{i}" for i in range(5)]
        }

async def process_stream():
    """Process data as it arrives"""
    async for page_data in fetch_pages(5):
        print(f"Processing page {page_data['page']}")
        # Process items immediately without waiting for all pages
        for item in page_data['data']:
            print(f"  - {item}")

asyncio.run(process_stream())
```
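Async generators also work with async comprehensions, which drain the stream into a list or set in a single expression. A minimal sketch (the numbers generator is just an illustration):

```python
import asyncio
from typing import AsyncIterator

async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        await asyncio.sleep(0)  # yield control to the event loop
        yield i

async def main() -> list[int]:
    # Async list comprehension: consumes the generator item by item
    return [i * i async for i in numbers(5)]

print(asyncio.run(main()))  # [0, 1, 4, 9, 16]
```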
Handling Timeouts
Always set timeouts for external operations to prevent indefinite hanging:
```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Completed"

async def main():
    try:
        # Timeout after 2 seconds
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Alternative: timeout context manager (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
            print(result)
    except TimeoutError:
        print("Operation timed out")

asyncio.run(main())
```
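One detail worth knowing: when asyncio.wait_for() times out, it cancels the awaited task rather than leaving it running in the background. The sketch below (timings are illustrative) demonstrates this:

```python
import asyncio

async def slow() -> str:
    await asyncio.sleep(10)
    return "done"

async def main() -> bool:
    task = asyncio.create_task(slow())
    try:
        await asyncio.wait_for(task, timeout=0.1)
    except asyncio.TimeoutError:
        pass
    # After the timeout, the underlying task has been cancelled
    return task.cancelled()

print(asyncio.run(main()))  # True
```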
Mixing Sync and Async Code
Never call blocking synchronous code directly in async functions; a single blocking call stalls the entire event loop. Offload blocking I/O to a thread pool with run_in_executor(); for heavy CPU-bound work, prefer a ProcessPoolExecutor, since threads share the GIL:
```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io_operation(filename: str) -> str:
    """Simulates blocking I/O (legacy library without async support)"""
    time.sleep(2)
    return f"Data from {filename}"

def cpu_intensive_task(n: int) -> int:
    """CPU-bound operation"""
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    # Run blocking calls in a thread pool so the event loop stays free
    with ThreadPoolExecutor(max_workers=3) as pool:
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io_operation, "file1.txt"),
            loop.run_in_executor(pool, blocking_io_operation, "file2.txt"),
            loop.run_in_executor(pool, cpu_intensive_task, 10_000_000)
        )
    print(results)

asyncio.run(main())
```
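Since Python 3.9, asyncio.to_thread() wraps run_in_executor() with the default thread pool, which is usually all you need for blocking I/O. A sketch (blocking_read and its 0.2 s delay are illustrative stand-ins):

```python
import asyncio
import time

def blocking_read(name: str) -> str:
    time.sleep(0.2)  # stands in for blocking file/network I/O
    return f"data:{name}"

async def main() -> list[str]:
    start = time.perf_counter()
    # Each call runs in the default ThreadPoolExecutor,
    # so the two blocking reads overlap
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, "a"),
        asyncio.to_thread(blocking_read, "b"),
    )
    print(f"{time.perf_counter() - start:.2f}s")  # ~0.2s, not 0.4s
    return results

print(asyncio.run(main()))
```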
Common Pitfalls
Forgetting await: Calling an async function without await returns a coroutine object, not the result:
```python
async def get_data():
    return "data"

async def wrong():
    result = get_data()  # Wrong! Returns a coroutine object
    print(result)        # <coroutine object get_data>

async def correct():
    result = await get_data()  # Correct
    print(result)  # "data"
```
Blocking the event loop: Never use time.sleep() in async code—use asyncio.sleep() instead. Blocking calls prevent other tasks from running.
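The difference is visible in the timings. In this sketch (delays are illustrative), two awaited asyncio.sleep() calls overlap, so the total elapsed time is roughly one delay; if they used time.sleep() instead, the calls would run back to back and take twice as long:

```python
import asyncio
import time

async def good(delay: float) -> None:
    await asyncio.sleep(delay)  # yields to the event loop while waiting

async def main() -> float:
    start = time.perf_counter()
    # Both sleeps run concurrently because neither blocks the loop
    await asyncio.gather(good(0.2), good(0.2))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # ~0.2s; with time.sleep it would be ~0.4s
```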
Not handling cancellation: Tasks can be cancelled. Always handle asyncio.CancelledError for cleanup:
```python
async def cancellable_task():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("Task was cancelled, cleaning up...")
        # Perform cleanup here
        raise  # Re-raise to propagate cancellation
```
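To see the cleanup path in action, cancel the task from the outside with task.cancel() and await it. This sketch (the 0.1 s delay just lets the task start) follows that pattern:

```python
import asyncio

async def cancellable() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("cleaning up")
        raise  # re-raise so the caller observes the cancellation

async def main() -> bool:
    task = asyncio.create_task(cancellable())
    await asyncio.sleep(0.1)  # let the task start running
    task.cancel()             # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()

print(asyncio.run(main()))  # True
```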
Asyncio transforms Python into a powerful tool for I/O-bound concurrency. Master these patterns to build responsive, efficient applications that handle thousands of concurrent operations without the complexity of threading.