Python asyncio Tasks: Concurrent Coroutines
Key Insights
- Tasks are the primary mechanism for concurrent execution in asyncio—they wrap coroutines and schedule them on the event loop immediately, unlike bare coroutines which remain inert until awaited
- Python 3.11’s TaskGroup provides structured concurrency with automatic cleanup and exception propagation, eliminating the fire-and-forget task problem that plagues older asyncio code
- The choice between gather(), wait(), and as_completed() fundamentally changes your concurrency semantics—gather() for all-or-nothing operations, wait() for flexible completion policies, and as_completed() for processing results as they arrive
Introduction to asyncio Tasks
Coroutines in Python are lazy by nature. When you call an async function, it returns a coroutine object that does nothing until you await it. Tasks change this behavior fundamentally—they’re eager wrappers around coroutines that schedule execution on the event loop immediately.
Think of a coroutine as a recipe and a Task as that recipe being actively cooked. The event loop is your kitchen, and Tasks are the dishes currently on the stove. You can check on them, cancel them, or wait for them to finish, but once created, they’re already running.
```python
import asyncio

async def lazy_coroutine():
    print("This prints only when awaited")
    await asyncio.sleep(1)
    return "Done"

async def main():
    # Creating a coroutine - nothing happens yet
    coro = lazy_coroutine()
    print("Coroutine created, but not executed")
    coro.close()  # Close the unawaited coroutine to avoid a RuntimeWarning

    # Wrapping in a Task - execution starts immediately
    task = asyncio.create_task(lazy_coroutine())
    print("Task created, already running in background")

    await asyncio.sleep(0.5)  # Task is running during this sleep
    result = await task  # Just collecting the result
    print(result)

asyncio.run(main())
```
The critical difference: the Task is scheduled as soon as you create it and begins executing at the next event loop iteration, while the bare coroutine waits patiently until you await it.
Creating and Running Tasks
Python provides two primary functions for creating Tasks: asyncio.create_task() and asyncio.ensure_future(). Use create_task() for application code—it’s clearer and more direct. Reserve ensure_future() for library code where you might receive either a coroutine or an already-created Task.
```python
import asyncio

async def fetch_data(source: str, delay: float):
    await asyncio.sleep(delay)
    return f"Data from {source}"

async def main():
    # Preferred: create_task() for coroutines
    task1 = asyncio.create_task(fetch_data("API-1", 1.0))
    task2 = asyncio.create_task(fetch_data("API-2", 1.5))
    task3 = asyncio.create_task(fetch_data("API-3", 0.5))

    # All three are now running concurrently
    print("All tasks started")

    # Collect the results
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(f"Results: {result1}, {result2}, {result3}")

    # ensure_future() works with both coroutines and futures
    task_or_future = task1  # Already a Task
    wrapped = asyncio.ensure_future(task_or_future)  # Returns the same task
    print(f"Same object: {wrapped is task1}")

asyncio.run(main())
```
The power of Tasks becomes apparent when you create multiple ones before awaiting any. They run concurrently (within the single-threaded event loop), so the 1.0-, 1.5-, and 0.5-second operations above take roughly 1.5 seconds total instead of the 3.0 seconds they would take sequentially.
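A quick way to verify this concurrency claim is to time it yourself. This sketch uses only the standard library, with shortened delays so it finishes quickly:

```python
import asyncio
import time

async def fetch_data(source: str, delay: float):
    await asyncio.sleep(delay)
    return f"Data from {source}"

async def timed_demo():
    start = time.monotonic()
    tasks = [
        asyncio.create_task(fetch_data("API-1", 0.2)),
        asyncio.create_task(fetch_data("API-2", 0.3)),
        asyncio.create_task(fetch_data("API-3", 0.1)),
    ]
    results = [await t for t in tasks]
    elapsed = time.monotonic() - start
    # Concurrent: elapsed is roughly the longest delay (0.3s), not the sum (0.6s)
    return results, elapsed

results, elapsed = asyncio.run(timed_demo())
print(f"Got {len(results)} results in {elapsed:.2f}s")
```

If you replace `create_task` with plain sequential awaits of the coroutines, the elapsed time jumps to the sum of the delays.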
Task Management and Control
Tasks have a lifecycle: a Task is pending (scheduled or actively running) until it is done—with a result, with an exception, or cancelled. You can inspect and control this lifecycle programmatically.
```python
import asyncio

async def long_operation(duration: int):
    try:
        print(f"Starting {duration}s operation")
        await asyncio.sleep(duration)
        return f"Completed {duration}s operation"
    except asyncio.CancelledError:
        print("Operation cancelled after some time")
        # Cleanup code here
        raise  # Re-raise to mark task as cancelled

async def main():
    task = asyncio.create_task(long_operation(10))

    # Check task status
    print(f"Task done: {task.done()}")
    print(f"Task cancelled: {task.cancelled()}")

    # Let it run briefly
    await asyncio.sleep(2)

    # Cancel the task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Caught cancellation in main")

    print(f"Task done: {task.done()}")
    print(f"Task cancelled: {task.cancelled()}")

# Graceful shutdown pattern with timeout
async def graceful_shutdown():
    tasks = [
        asyncio.create_task(long_operation(5)),
        asyncio.create_task(long_operation(3)),
    ]
    try:
        # Give tasks 4 seconds to complete
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=4.0,
        )
    except asyncio.TimeoutError:
        print("Timeout reached, cancelling remaining tasks")
        for task in tasks:
            if not task.done():
                task.cancel()
        # Wait for cancellations to complete
        await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())
```
Always handle CancelledError properly in your coroutines. It’s not an error condition—it’s a control flow mechanism. Clean up resources, then re-raise it to maintain cancellation semantics.
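The flip side of cooperative cancellation: sometimes an inner operation must finish even if the surrounding task is cancelled. asyncio.shield() covers that case. A minimal sketch—commit_to_db and the shared log list are illustrative stand-ins, not real APIs:

```python
import asyncio

async def commit_to_db(log: list):
    # Simulated critical write that should not be interrupted
    await asyncio.sleep(0.2)
    log.append("committed")

async def handler(log: list):
    try:
        # shield() protects the inner coroutine: cancelling handler
        # does not cancel the shielded task
        await asyncio.shield(commit_to_db(log))
    except asyncio.CancelledError:
        log.append("handler cancelled")
        raise  # preserve cancellation semantics

async def main():
    log = []
    task = asyncio.create_task(handler(log))
    await asyncio.sleep(0.05)
    task.cancel()  # handler is cancelled, but the commit keeps running
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.3)  # give the shielded commit time to finish
    return log

log = asyncio.run(main())
print(log)
```

Note that shield() protects the inner work, not the caller: the handler still sees CancelledError and must still clean up and re-raise.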
Gathering and Waiting for Tasks
Three functions handle multiple tasks with different semantics: gather() for concurrent execution with combined results, wait() for flexible completion policies, and as_completed() for processing results as they arrive.
```python
import asyncio
import aiohttp

async def fetch_url(session, url: str):
    async with session.get(url) as response:
        return await response.text()

async def gather_example():
    """gather() waits for all tasks, returns results in order"""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        # All results returned in order, even if completed out of order
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

async def wait_example():
    """wait() with FIRST_COMPLETED for racing tasks"""
    async def check_server(name: str, delay: float):
        await asyncio.sleep(delay)
        return f"{name} responded"

    tasks = {
        asyncio.create_task(check_server("Server-A", 1.5)),
        asyncio.create_task(check_server("Server-B", 1.0)),
        asyncio.create_task(check_server("Server-C", 2.0)),
    }

    # Wait for first completion
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )

    # Get the fastest result
    fastest = done.pop()
    result = await fastest
    print(f"Fastest: {result}")

    # Cancel the rest
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(wait_example())
```
Use gather() when you need all results and want them in a specific order. Use wait() when you need fine-grained control over completion policies (first completed, first exception, or all completed). Use as_completed() when you want to process results immediately as they arrive.
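The third option, as_completed(), hasn't appeared in code yet. Here is a self-contained sketch using sleeps in place of network calls:

```python
import asyncio

async def check_server(name: str, delay: float):
    await asyncio.sleep(delay)
    return f"{name} responded"

async def as_completed_example():
    tasks = [
        asyncio.create_task(check_server("Server-A", 0.3)),
        asyncio.create_task(check_server("Server-B", 0.1)),
        asyncio.create_task(check_server("Server-C", 0.2)),
    ]
    results = []
    # Yields awaitables in completion order, not submission order
    for coro in asyncio.as_completed(tasks):
        result = await coro
        results.append(result)
        print(f"Received: {result}")
    return results

results = asyncio.run(as_completed_example())
```

Server-B finishes first despite being submitted second—exactly the property you want when streaming results to a user as they become available.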
Task Groups (Python 3.11+)
TaskGroup solves the fire-and-forget problem that has plagued asyncio since its inception. It provides structured concurrency—all tasks created within the group are guaranteed to complete (or be cancelled) before the context manager exits.
```python
import asyncio

async def process_item(item: int):
    await asyncio.sleep(1)
    if item == 3:
        raise ValueError(f"Item {item} failed")
    return item * 2

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            # All tasks automatically tracked
            task1 = tg.create_task(process_item(1))
            task2 = tg.create_task(process_item(2))
            task3 = tg.create_task(process_item(3))  # This will raise
            task4 = tg.create_task(process_item(4))
        # This line never executes due to the exception
        print("All tasks completed")
    except* ValueError as eg:
        # Exception group catches all exceptions
        print(f"Caught {len(eg.exceptions)} exceptions")
        for exc in eg.exceptions:
            print(f" - {exc}")

# Python 3.10 and earlier equivalent
async def main_legacy():
    tasks = []
    try:
        for i in range(1, 5):
            tasks.append(asyncio.create_task(process_item(i)))
        results = await asyncio.gather(*tasks)
    except ValueError as e:
        print(f"Error occurred: {e}")
        # Must manually cancel remaining tasks
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())
```
TaskGroup automatically cancels all tasks if any task raises an exception. No leaked tasks, no forgotten cancellations. This is the future of asyncio task management.
Common Patterns and Best Practices
Background tasks require careful management to avoid them being garbage collected before completion. Always keep a reference or use TaskGroup.
```python
import asyncio
from asyncio import Semaphore

# Background task pattern
background_tasks = set()

async def background_work(name: str):
    await asyncio.sleep(2)
    print(f"{name} completed")

async def start_background_task(name: str):
    task = asyncio.create_task(background_work(name))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

# Rate limiting with semaphore
async def rate_limited_fetch(sem: Semaphore, url: str):
    async with sem:  # Only N tasks run concurrently
        print(f"Fetching {url}")
        await asyncio.sleep(1)  # Simulate request
        return f"Data from {url}"

async def main():
    # Limit to 3 concurrent requests
    sem = Semaphore(3)
    urls = [f"https://api.example.com/item/{i}" for i in range(10)]
    tasks = [rate_limited_fetch(sem, url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} items")

asyncio.run(main())
```
The semaphore pattern is essential for rate-limiting external API calls or database connections. It ensures you never exceed your concurrency budget, preventing resource exhaustion.
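An alternative to the semaphore is a fixed pool of worker tasks pulling from an asyncio.Queue—the concurrency limit is simply the number of workers. A sketch under the same illustrative URLs:

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list):
    while True:
        url = await queue.get()
        await asyncio.sleep(0.01)  # Simulate the request
        results.append(f"Data from {url}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    for i in range(10):
        queue.put_nowait(f"https://api.example.com/item/{i}")

    # Three workers -> at most three requests in flight at once
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]

    await queue.join()  # Wait until every queued item is processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(main())
print(f"Fetched {len(results)} items")
```

The queue version shines when work arrives continuously rather than as a fixed batch: producers keep enqueueing while the same small pool of workers drains the queue.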
Performance Considerations and Debugging
Monitoring active tasks helps identify leaks and deadlocks in production systems.
```python
import asyncio
from datetime import datetime

async def monitor_tasks():
    """Periodic task monitoring"""
    while True:
        tasks = asyncio.all_tasks()
        print(f"\n[{datetime.now()}] Active tasks: {len(tasks)}")
        for task in tasks:
            if not task.done():
                # Get task name and coroutine info
                coro = task.get_coro()
                print(f" - {task.get_name()}: {coro.__name__}")
        await asyncio.sleep(5)

async def potentially_stuck_task():
    """Simulate a stuck task"""
    while True:
        await asyncio.sleep(10)

async def main():
    # Start monitoring
    monitor = asyncio.create_task(monitor_tasks())

    # Create some tasks
    task1 = asyncio.create_task(potentially_stuck_task(), name="stuck-task")
    task2 = asyncio.create_task(asyncio.sleep(3), name="short-task")

    await asyncio.sleep(15)

    # Cleanup
    monitor.cancel()
    task1.cancel()
    await asyncio.gather(monitor, task1, task2, return_exceptions=True)

asyncio.run(main())
```
Name your tasks using the name parameter in create_task(). This makes debugging infinitely easier when you’re staring at a list of 50 anonymous coroutines in production.
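Task naming pairs well with asyncio's built-in debug mode, which logs coroutines that were never awaited and callbacks that block the event loop for too long (0.1s by default, via loop.slow_callback_duration). A minimal sketch:

```python
import asyncio
import time

async def slow_step_demo():
    # A blocking call inside a coroutine stalls the whole event loop.
    # In debug mode, asyncio logs a warning that this step took too long.
    time.sleep(0.15)
    return "done"

# debug=True enables slow-callback logging and unawaited-coroutine warnings
result = asyncio.run(slow_step_demo(), debug=True)
print(result)
```

Debug mode adds overhead, so keep it out of hot production paths—but it is invaluable when hunting the one blocking call that is stalling your loop.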
The golden rule: every task you create must eventually be awaited, cancelled, or managed by a TaskGroup. Anything else is a potential leak waiting to cause mysterious production issues at 3 AM.