Python asyncio Tasks: Concurrent Coroutines

Coroutines in Python are lazy by nature. When you call an async function, it returns a coroutine object that does nothing until you await it. Tasks change this behavior fundamentally—they're eager...

Key Insights

  • Tasks are the primary mechanism for concurrent execution in asyncio—they wrap coroutines and schedule them on the event loop immediately, unlike bare coroutines which remain inert until awaited
  • Python 3.11’s TaskGroup provides structured concurrency with automatic cleanup and exception propagation, eliminating the fire-and-forget task problem that plagues older asyncio code
  • The choice between gather(), wait(), and as_completed() fundamentally changes your concurrency semantics—gather for all-or-nothing operations, wait for flexible completion policies, and as_completed for processing results as they arrive

Introduction to asyncio Tasks

Coroutines in Python are lazy by nature. When you call an async function, it returns a coroutine object that does nothing until you await it. Tasks change this behavior fundamentally—they’re eager wrappers around coroutines that schedule execution on the event loop immediately.

Think of a coroutine as a recipe and a Task as that recipe being actively cooked. The event loop is your kitchen, and Tasks are the dishes currently on the stove. You can check on them, cancel them, or wait for them to finish, but once created, they’re already running.

import asyncio

async def lazy_coroutine():
    print("This prints only when awaited")
    await asyncio.sleep(1)
    return "Done"

async def main():
    # Creating a coroutine - nothing happens yet
    coro = lazy_coroutine()
    print("Coroutine created, but not executed")
    
    # Wrapping in a Task - execution starts immediately
    task = asyncio.create_task(lazy_coroutine())
    print("Task created, already running in background")
    
    await asyncio.sleep(0.5)  # Task is running during this sleep
    result = await task  # Just collecting the result
    print(result)

asyncio.run(main())

The critical difference: the Task starts executing as soon as you create it, while the bare coroutine waits patiently until you await it.

Creating and Running Tasks

Python provides two primary functions for creating Tasks: asyncio.create_task() and asyncio.ensure_future(). Use create_task() for application code—it’s clearer and more direct. Reserve ensure_future() for library code where you might receive either a coroutine or an already-created Task.

import asyncio
from asyncio import Future

async def fetch_data(source: str, delay: float):
    await asyncio.sleep(delay)
    return f"Data from {source}"

async def main():
    # Preferred: create_task() for coroutines
    task1 = asyncio.create_task(fetch_data("API-1", 1.0))
    task2 = asyncio.create_task(fetch_data("API-2", 1.5))
    task3 = asyncio.create_task(fetch_data("API-3", 0.5))
    
    # All three are now running concurrently
    print("All tasks started")
    
    # Collect results as they complete
    result1 = await task1
    result2 = await task2
    result3 = await task3
    
    print(f"Results: {result1}, {result2}, {result3}")
    
    # ensure_future() works with both coroutines and futures
    task_or_future = task1  # Already a Task
    wrapped = asyncio.ensure_future(task_or_future)  # Returns the same task
    print(f"Same object: {wrapped is task1}")

asyncio.run(main())

The power of Tasks becomes apparent when you create multiple ones before awaiting any. They run truly concurrently (within the single-threaded event loop), making three 1-second operations take roughly 1.5 seconds total instead of 3.5 seconds.

Task Management and Control

Tasks have a lifecycle: pending, running, done (with result or exception), or cancelled. You can inspect and control this lifecycle programmatically.

import asyncio

async def long_operation(duration: int):
    try:
        print(f"Starting {duration}s operation")
        await asyncio.sleep(duration)
        return f"Completed {duration}s operation"
    except asyncio.CancelledError:
        print(f"Operation cancelled after some time")
        # Cleanup code here
        raise  # Re-raise to mark task as cancelled

async def main():
    task = asyncio.create_task(long_operation(10))
    
    # Check task status
    print(f"Task done: {task.done()}")
    print(f"Task cancelled: {task.cancelled()}")
    
    # Let it run briefly
    await asyncio.sleep(2)
    
    # Cancel the task
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("Caught cancellation in main")
    
    print(f"Task done: {task.done()}")
    print(f"Task cancelled: {task.cancelled()}")

# Graceful shutdown pattern with timeout
async def graceful_shutdown():
    tasks = [
        asyncio.create_task(long_operation(5)),
        asyncio.create_task(long_operation(3)),
    ]
    
    try:
        # Give tasks 4 seconds to complete
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=4.0
        )
    except asyncio.TimeoutError:
        print("Timeout reached, cancelling remaining tasks")
        for task in tasks:
            if not task.done():
                task.cancel()
        
        # Wait for cancellations to complete
        await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())

Always handle CancelledError properly in your coroutines. It’s not an error condition—it’s a control flow mechanism. Clean up resources, then re-raise it to maintain cancellation semantics.

Gathering and Waiting for Tasks

Three functions handle multiple tasks with different semantics: gather() for parallel execution with combined results, wait() for flexible completion policies, and as_completed() for processing results as they arrive.

import asyncio
import aiohttp

async def fetch_url(session, url: str):
    async with session.get(url) as response:
        return await response.text()

async def gather_example():
    """gather() waits for all tasks, returns results in order"""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        # All results returned in order, even if completed out of order
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

async def wait_example():
    """wait() with FIRST_COMPLETED for racing tasks"""
    async def check_server(name: str, delay: float):
        await asyncio.sleep(delay)
        return f"{name} responded"
    
    tasks = {
        asyncio.create_task(check_server("Server-A", 1.5)),
        asyncio.create_task(check_server("Server-B", 1.0)),
        asyncio.create_task(check_server("Server-C", 2.0)),
    }
    
    # Wait for first completion
    done, pending = await asyncio.wait(
        tasks, 
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # Get the fastest result
    fastest = done.pop()
    result = await fastest
    print(f"Fastest: {result}")
    
    # Cancel the rest
    for task in pending:
        task.cancel()
    
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(wait_example())

Use gather() when you need all results and want them in a specific order. Use wait() when you need fine-grained control over completion policies (first completed, first exception, or all completed). Use as_completed() when you want to process results immediately as they arrive.

Task Groups (Python 3.11+)

TaskGroup solves the fire-and-forget problem that has plagued asyncio since its inception. It provides structured concurrency—all tasks created within the group are guaranteed to complete (or be cancelled) before the context manager exits.

import asyncio

async def process_item(item: int):
    await asyncio.sleep(1)
    if item == 3:
        raise ValueError(f"Item {item} failed")
    return item * 2

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            # All tasks automatically tracked
            task1 = tg.create_task(process_item(1))
            task2 = tg.create_task(process_item(2))
            task3 = tg.create_task(process_item(3))  # This will raise
            task4 = tg.create_task(process_item(4))
        
        # This line never executes due to exception
        print("All tasks completed")
    except* ValueError as eg:
        # Exception group catches all exceptions
        print(f"Caught {len(eg.exceptions)} exceptions")
        for exc in eg.exceptions:
            print(f"  - {exc}")

# Python 3.10 and earlier equivalent
async def main_legacy():
    tasks = []
    try:
        for i in range(1, 5):
            tasks.append(asyncio.create_task(process_item(i)))
        
        results = await asyncio.gather(*tasks)
    except ValueError as e:
        print(f"Error occurred: {e}")
        # Must manually cancel remaining tasks
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())

TaskGroup automatically cancels all tasks if any task raises an exception. No leaked tasks, no forgotten cancellations. This is the future of asyncio task management.

Common Patterns and Best Practices

Background tasks require careful management to avoid them being garbage collected before completion. Always keep a reference or use TaskGroup.

import asyncio
from asyncio import Semaphore

# Background task pattern
background_tasks = set()

async def background_work(name: str):
    await asyncio.sleep(2)
    print(f"{name} completed")

async def start_background_task(name: str):
    task = asyncio.create_task(background_work(name))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

# Rate limiting with semaphore
async def rate_limited_fetch(sem: Semaphore, url: str):
    async with sem:  # Only N tasks run concurrently
        print(f"Fetching {url}")
        await asyncio.sleep(1)  # Simulate request
        return f"Data from {url}"

async def main():
    # Limit to 3 concurrent requests
    sem = Semaphore(3)
    
    urls = [f"https://api.example.com/item/{i}" for i in range(10)]
    tasks = [rate_limited_fetch(sem, url) for url in urls]
    
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} items")

asyncio.run(main())

The semaphore pattern is essential for rate-limiting external API calls or database connections. It ensures you never exceed your concurrency budget, preventing resource exhaustion.

Performance Considerations and Debugging

Monitoring active tasks helps identify leaks and deadlocks in production systems.

import asyncio
from datetime import datetime

async def monitor_tasks():
    """Periodic task monitoring"""
    while True:
        tasks = asyncio.all_tasks()
        print(f"\n[{datetime.now()}] Active tasks: {len(tasks)}")
        
        for task in tasks:
            if not task.done():
                # Get task name and coroutine info
                coro = task.get_coro()
                print(f"  - {task.get_name()}: {coro.__name__}")
        
        await asyncio.sleep(5)

async def potentially_stuck_task():
    """Simulate a stuck task"""
    while True:
        await asyncio.sleep(10)

async def main():
    # Start monitoring
    monitor = asyncio.create_task(monitor_tasks())
    
    # Create some tasks
    task1 = asyncio.create_task(potentially_stuck_task(), name="stuck-task")
    task2 = asyncio.create_task(asyncio.sleep(3), name="short-task")
    
    await asyncio.sleep(15)
    
    # Cleanup
    monitor.cancel()
    task1.cancel()
    await asyncio.gather(monitor, task1, task2, return_exceptions=True)

asyncio.run(main())

Name your tasks using the name parameter in create_task(). This makes debugging infinitely easier when you’re staring at a list of 50 anonymous coroutines in production.

The golden rule: every task you create must eventually be awaited, cancelled, or managed by a TaskGroup. Anything else is a potential leak waiting to cause mysterious production issues at 3 AM.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.