Python asyncio: Cooperative Multitasking
Key Insights
- Cooperative multitasking requires explicit yield points via await. A single blocking call can freeze your entire application, so it is critical to understand where control transfers happen.
- asyncio excels at I/O-bound concurrency but runs on a single thread; CPU-intensive work will block the event loop unless offloaded to a thread or process pool.
- Tasks enable true concurrency within asyncio by scheduling coroutines to run independently, while directly awaiting coroutines runs them sequentially.
What Is Cooperative Multitasking?
Multitasking in computing comes in two flavors: preemptive and cooperative. With preemptive multitasking, the operating system forcibly interrupts running tasks to give other tasks CPU time. Threads work this way—the OS scheduler can pause your thread at virtually any point.
Cooperative multitasking flips this model. Tasks must voluntarily yield control back to the scheduler. Nothing forces them to stop. If a task refuses to yield, everything else waits.
Python’s asyncio implements cooperative multitasking. Your coroutines run until they hit an await expression, at which point they pause and let other coroutines execute. This design eliminates many threading headaches: context switches happen only at await points, so code between two awaits cannot be interrupted by another coroutine, and most shared state needs no locks. It also introduces its own discipline: you must yield, and you must yield often.
The threading model gives you concurrency without thinking about it. asyncio gives you concurrency only when you explicitly ask for it. This tradeoff matters. Threading hides complexity but introduces subtle bugs. asyncio surfaces complexity but makes control flow predictable.
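The cost of a task that refuses to yield can be seen in a small experiment. This is a minimal sketch; the names worker and run_pair are hypothetical and exist only for illustration. Two workers append to a shared log, either yielding after each step with asyncio.sleep(0) or not yielding at all.

```python
import asyncio


async def worker(name: str, steps: int, cooperative: bool, log: list[str]) -> None:
    """Hypothetical worker: records each step, optionally yielding after it."""
    for i in range(steps):
        log.append(f"{name}{i}")
        if cooperative:
            # sleep(0) is a bare yield point: it suspends this coroutine
            # so that other ready tasks get a turn.
            await asyncio.sleep(0)


async def run_pair(cooperative: bool) -> list[str]:
    log: list[str] = []
    await asyncio.gather(
        worker("a", 3, cooperative, log),
        worker("b", 3, cooperative, log),
    )
    return log


print(asyncio.run(run_pair(cooperative=False)))  # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
print(asyncio.run(run_pair(cooperative=True)))   # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

Without a yield point, worker "a" runs to completion before "b" gets any time at all; with one, the two interleave step by step.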
The Event Loop: asyncio’s Core
The event loop is asyncio’s scheduler. It maintains a queue of ready-to-run coroutines, executes them until they await something, then moves on to the next ready coroutine. When awaited operations complete (a network response arrives, a timer expires), the loop marks those coroutines as ready again.
This all happens on a single thread. No parallelism, just concurrency through interleaving. While one coroutine waits for I/O, others can run.
import asyncio

async def fetch_data(source: str, delay: float) -> str:
    print(f"Starting fetch from {source}")
    await asyncio.sleep(delay)  # Simulates I/O wait
    print(f"Completed fetch from {source}")
    return f"Data from {source}"

async def main():
    # These run concurrently, not sequentially
    results = await asyncio.gather(
        fetch_data("database", 2.0),
        fetch_data("cache", 0.5),
        fetch_data("api", 1.0),
    )
    print(f"All results: {results}")

asyncio.run(main())
Running this produces output showing concurrent execution:
Starting fetch from database
Starting fetch from cache
Starting fetch from api
Completed fetch from cache
Completed fetch from api
Completed fetch from database
All results: ['Data from database', 'Data from cache', 'Data from api']
All three fetches start immediately. The cache fetch completes first despite starting second, because it has the shortest delay. Total runtime is approximately 2 seconds (the longest delay), not 3.5 seconds (the sum of all delays).
asyncio.run() creates an event loop, runs your main coroutine to completion, then cleans up. For most applications, this is your entry point.
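The ready-queue behavior described in this section can be imitated with plain generators. The sketch below is a toy, not asyncio's actual implementation: the real loop also tracks timers and I/O readiness, which are left out here. The names task and run_toy_loop are hypothetical, and a bare yield stands in for await.

```python
from collections import deque
from typing import Generator


def task(name: str, steps: int, log: list[str]) -> Generator[None, None, None]:
    """Hypothetical task: does one step of work, then yields to the loop."""
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield  # plays the role of `await`: hand control back to the loop


def run_toy_loop(tasks: list[Generator[None, None, None]]) -> None:
    """Round-robin scheduler: run each ready task until its next yield."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # run until the next yield point
        except StopIteration:
            continue               # task finished: do not reschedule it
        ready.append(current)      # still has work: back of the queue


log: list[str] = []
run_toy_loop([task("A", 2, log), task("B", 3, log)])
print(log)
# ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'B step 2']
```

Each task runs only between its own yields, which is the same interleaving that asyncio provides on a single thread.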
Coroutines, Tasks, and Futures
These three types form asyncio’s concurrency primitives, and confusing them causes real bugs.
A coroutine is what you get when you call an async def function. It’s an object representing suspended computation. Calling fetch_data("api", 1.0) doesn’t run anything—it returns a coroutine object that must be awaited or scheduled.
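A quick way to confirm that calling an async def function runs nothing is to record a side effect. This is a small sketch using a simplified, hypothetical variant of fetch_data (no delay parameter) and a calls list that exists only for the demonstration.

```python
import asyncio
import inspect

calls: list[str] = []  # records when the coroutine body actually runs


async def fetch_data(source: str) -> str:
    calls.append(source)
    return f"Data from {source}"


async def main() -> str:
    coro = fetch_data("api")           # creates a coroutine object only
    assert inspect.iscoroutine(coro)
    assert calls == []                 # the body has not executed yet
    result = await coro                # awaiting is what runs the body
    assert calls == ["api"]
    return result


print(asyncio.run(main()))  # Data from api
```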
A Task wraps a coroutine and schedules it for execution on the event loop. Tasks run independently; creating a task does not run the coroutine on the spot, but schedules it to start the next time the event loop gets control, typically at the creator's next await. Use asyncio.create_task() when you want concurrent execution.
A Future is a low-level primitive representing an eventual result. Tasks are Futures. You’ll rarely create Futures directly, but you’ll await them through Tasks.
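To make the Future idea concrete, here is a hedged sketch in which a timer callback stands in for whatever low-level code would normally deliver the result. The 0.05 second delay and the string values are arbitrary choices for illustration.

```python
import asyncio


async def main() -> str:
    loop = asyncio.get_running_loop()

    # A bare Future: a placeholder that some other code will fill in later.
    future = loop.create_future()
    loop.call_later(0.05, future.set_result, "ready")
    assert not future.done()
    value = await future               # suspends until set_result() is called

    # A Task is a Future that drives a coroutine to produce its result.
    task = asyncio.create_task(asyncio.sleep(0, result="from task"))
    assert isinstance(task, asyncio.Future)
    return f"{value}, {await task}"


print(asyncio.run(main()))  # ready, from task
```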
The critical distinction: awaiting a coroutine directly runs it sequentially. Creating a task runs it concurrently.
import asyncio
import time

async def slow_operation(name: str) -> str:
    await asyncio.sleep(1.0)
    return f"{name} complete"

async def sequential():
    """Awaiting coroutines directly: runs one after another"""
    start = time.perf_counter()
    result1 = await slow_operation("first")
    result2 = await slow_operation("second")
    result3 = await slow_operation("third")
    elapsed = time.perf_counter() - start
    print(f"Sequential: {elapsed:.2f}s")  # ~3 seconds
    return [result1, result2, result3]

async def concurrent():
    """Creating tasks: runs simultaneously"""
    start = time.perf_counter()
    task1 = asyncio.create_task(slow_operation("first"))
    task2 = asyncio.create_task(slow_operation("second"))
    task3 = asyncio.create_task(slow_operation("third"))
    results = [await task1, await task2, await task3]
    elapsed = time.perf_counter() - start
    print(f"Concurrent: {elapsed:.2f}s")  # ~1 second
    return results

async def main():
    await sequential()
    await concurrent()

asyncio.run(main())
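asyncio.gather() is a convenience for this pattern: it wraps each coroutine in a task, runs them concurrently, and returns their results as a list in the order the awaitables were passed in. Use it when you have a collection of coroutines that should run concurrently and you need all results.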
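Two details of asyncio.gather() are worth seeing in action: results come back in argument order regardless of completion order, and return_exceptions=True returns a failure as a value instead of raising it. The work coroutine and its delays below are hypothetical, chosen only to make the ordering visible.

```python
import asyncio


async def work(name: str, delay: float, fail: bool = False) -> str:
    """Hypothetical unit of work that can be told to fail."""
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    return name


async def main() -> list:
    # "fast" finishes first and "slow" last, but the result list
    # follows the order of the arguments, not the completion order.
    return await asyncio.gather(
        work("slow", 0.05),
        work("broken", 0.01, fail=True),
        work("fast", 0.0),
        return_exceptions=True,  # exceptions become list entries
    )


results = asyncio.run(main())
print([repr(r) for r in results])
# ["'slow'", "ValueError('broken failed')", "'fast'"]
```

Without return_exceptions=True, the first exception would propagate out of the await on gather() instead of appearing in the list.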
Understanding Await Points
Every await is a potential context switch. The event loop can run other coroutines while yours is suspended. But here’s the trap: only await (and its relatives async with and async for) can trigger this. Regular Python code between awaits runs uninterrupted.
This makes blocking calls dangerous. time.sleep() doesn’t yield to the event loop—it blocks the entire thread, freezing all coroutines.
import asyncio
import time

async def blocking_task():
    print("Blocking task: starting")
    time.sleep(2)  # WRONG: blocks the entire event loop
    print("Blocking task: done")

async def async_task():
    print("Async task: starting")
    await asyncio.sleep(2)  # CORRECT: yields to event loop
    print("Async task: done")

async def other_work():
    for i in range(5):
        print(f"Other work: iteration {i}")
        await asyncio.sleep(0.3)

async def demonstrate_blocking():
    print("=== With blocking sleep ===")
    await asyncio.gather(blocking_task(), other_work())

async def demonstrate_async():
    print("\n=== With async sleep ===")
    await asyncio.gather(async_task(), other_work())

asyncio.run(demonstrate_blocking())
asyncio.run(demonstrate_async())
With time.sleep(), “other work” doesn’t start until the blocking task completes. With asyncio.sleep(), they interleave properly.
Common blocking culprits: synchronous HTTP libraries (requests), file I/O without async wrappers, CPU-heavy computation, and database drivers without async support.
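When one of these culprits has no async replacement, one option is to push the call onto a worker thread with asyncio.to_thread() (available since Python 3.9). The sketch below is illustrative only: blocking_io is a hypothetical stand-in for a synchronous library call, and heartbeat exists just to show that the loop keeps running.

```python
import asyncio
import threading
import time


def blocking_io(seconds: float) -> str:
    """Hypothetical stand-in for a synchronous call such as requests.get()."""
    time.sleep(seconds)
    return threading.current_thread().name  # which thread ran the call


async def heartbeat(ticks: list[int], count: int) -> None:
    """Proves the event loop stays responsive while the thread is busy."""
    for i in range(count):
        ticks.append(i)
        await asyncio.sleep(0.01)


async def main() -> tuple[str, str, int]:
    ticks: list[int] = []
    loop_thread = threading.current_thread().name
    worker_thread, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),  # runs off the loop's thread
        heartbeat(ticks, 3),
    )
    return loop_thread, worker_thread, len(ticks)


loop_thread, worker_thread, tick_count = asyncio.run(main())
print(f"loop on {loop_thread}, blocking call on {worker_thread}, ticks: {tick_count}")
```

The blocking call still takes just as long; the difference is that the event loop is free to run other coroutines in the meantime.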
Real-World Pattern: Concurrent I/O
Fetching multiple HTTP endpoints is asyncio’s sweet spot. Here’s a practical comparison using aiohttp:
import asyncio
import time

import aiohttp

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_sequential(urls: list[str]) -> list[dict]:
    """Fetch URLs one at a time"""
    start = time.perf_counter()
    results = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            result = await fetch_one(session, url)
            results.append(result)
    elapsed = time.perf_counter() - start
    print(f"Sequential: {elapsed:.2f}s for {len(urls)} requests")
    return results

async def fetch_concurrent(urls: list[str]) -> list[dict]:
    """Fetch all URLs simultaneously"""
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"Concurrent: {elapsed:.2f}s for {len(urls)} requests")
    return results

async def main():
    await fetch_sequential(URLS)  # ~5 seconds
    await fetch_concurrent(URLS)  # ~1 second

asyncio.run(main())
Five requests to a 1-second delay endpoint: sequential takes ~5 seconds, concurrent takes ~1 second. The speedup scales with I/O wait time.
Coordination Primitives
Uncontrolled concurrency creates problems. Hit an API with 1000 simultaneous requests and you’ll get rate-limited or banned. asyncio provides synchronization primitives for these scenarios.
asyncio.Semaphore limits concurrent access to a resource:
import asyncio
import time

import aiohttp

# Same endpoint as the previous example, repeated to get 10 URLs
URLS = ["https://httpbin.org/delay/1"] * 10

async def fetch_with_limit(
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore,
    url: str,
    request_id: int,
) -> dict:
    async with semaphore:  # Only N coroutines can hold the semaphore
        print(f"Request {request_id}: starting")
        async with session.get(url) as response:
            result = await response.json()
        print(f"Request {request_id}: complete")
        return result

async def fetch_rate_limited(urls: list[str], max_concurrent: int = 3):
    """Fetch URLs with concurrency limit"""
    semaphore = asyncio.Semaphore(max_concurrent)
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, semaphore, url, i)
            for i, url in enumerate(urls)
        ]
        results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"Rate-limited ({max_concurrent} concurrent): {elapsed:.2f}s")
    return results

# With 10 URLs and max 3 concurrent, at most 3 requests are in flight at a time
asyncio.run(fetch_rate_limited(URLS, max_concurrent=3))
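asyncio.Lock allows only one coroutine at a time into a critical section. It is useful when shared mutable state is read and then updated across an await point, where another coroutine could otherwise run in between. It coordinates coroutines on the event loop, not threads.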
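A lost update is the classic case where a lock helps. In this minimal sketch, the hypothetical unsafe_increment and safe_increment coroutines read a counter, hit an await point, then write it back; asyncio.sleep(0) stands in for any real I/O between the read and the write.

```python
import asyncio


async def unsafe_increment(state: dict) -> None:
    current = state["count"]
    await asyncio.sleep(0)             # other coroutines run here
    state["count"] = current + 1       # writes back a stale value


async def safe_increment(state: dict, lock: asyncio.Lock) -> None:
    async with lock:                   # read-modify-write is now exclusive
        current = state["count"]
        await asyncio.sleep(0)
        state["count"] = current + 1


async def main() -> tuple[int, int]:
    unsafe = {"count": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(10)))

    safe = {"count": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(10)))
    return unsafe["count"], safe["count"]


print(asyncio.run(main()))  # (1, 10)
```

All ten unsafe coroutines read 0 before any of them writes, so nine increments are lost. With the lock, each read-modify-write completes before the next one begins.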
When asyncio Is (and Isn’t) the Right Choice
asyncio shines for I/O-bound workloads: web servers, API clients, database access, file operations (with async libraries), websockets, and network protocols. If your code spends most of its time waiting for external systems, asyncio can dramatically improve throughput.
asyncio fails for CPU-bound work. Image processing, data crunching, cryptographic operations—these block the event loop because they don’t await anything. Your web server stops responding while computing a hash.
The escape hatch is run_in_executor(), which runs blocking code in a thread or process pool:
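import asyncio
import hashlib

def cpu_intensive(data: bytes) -> str:
    """Simulates CPU-bound work"""
    for _ in range(100):
        data = hashlib.sha256(data).digest()
    return data.hex()

async def process_without_blocking(data: bytes) -> str:
    # Inside a coroutine, get_running_loop() is preferred over get_event_loop()
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor; a
    # concurrent.futures.ProcessPoolExecutor can be passed instead
    result = await loop.run_in_executor(None, cpu_intensive, data)
    return result

print(asyncio.run(process_without_blocking(b"example")))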
For primarily CPU-bound applications, consider multiprocessing directly or libraries like concurrent.futures. asyncio adds complexity without benefit when you’re not waiting on I/O.
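Choose asyncio when your application is I/O-bound and you want explicit control over concurrency. Choose threading when you need to call blocking libraries that have no async equivalent and want simpler code; in standard CPython builds the global interpreter lock keeps threads from running Python bytecode in parallel. Choose multiprocessing when you need true CPU parallelism across cores.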