Skip to main content

Asyncio TaskGroups: Structured Concurrency in Python

TaskGroups (introduced in Python 3.11) enforce structured concurrency: a pattern where all child tasks must complete or be cancelled before exiting the group's scope. Unlike unstructured task creation with create_task(), TaskGroups guarantee no orphaned tasks, automatic exception propagation, and resource cleanup. This single feature has transformed async Python from error-prone to production-safe.

What is Structured Concurrency?

Structured concurrency means task lifetimes are confined to a lexical scope. When you enter a TaskGroup, you create children; when you exit, all children are awaited or cancelled—no leaks, no dangling references. It mirrors traditional structured programming (if/while/for blocks) but for concurrency.

Before TaskGroups (Python < 3.11), developers created tasks and forgot to await them, causing silent failures:

import asyncio

async def legacy_approach():
"""Unstructured: tasks can outlive their creator."""
async def fetch(url):
await asyncio.sleep(1)
return f"data from {url}"

# Create tasks but don't track them
task1 = asyncio.create_task(fetch("http://a.com"))
task2 = asyncio.create_task(fetch("http://b.com"))

# Return immediately—tasks are orphaned if function exits
# One might fail with an unhandled exception; you won't notice
return "done"

# This works but is fragile; if a task fails, the exception is lost
asyncio.run(legacy_approach())

With TaskGroups, all tasks are bound to the scope:

import asyncio

async def structured_approach():
"""Structured: tasks are guaranteed to complete or be cancelled."""
async def fetch(url):
await asyncio.sleep(1)
return f"data from {url}"

async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch("http://a.com"))
task2 = tg.create_task(fetch("http://b.com"))
# When exiting this block, both tasks are awaited

# Safe: exceptions are propagated, cleanup is guaranteed
return "done"

asyncio.run(structured_approach())

Creating and Managing Tasks in a TaskGroup

A TaskGroup is created with asyncio.TaskGroup() as an async context manager. Inside, call create_task() to add work. All tasks are awaited when exiting the block.

import asyncio

async def fetch_with_delay(name, delay):
print(f"{name} starting")
await asyncio.sleep(delay)
print(f"{name} completed")
return f"result: {name}"

async def main():
print("Starting TaskGroup")

async with asyncio.TaskGroup() as tg:
# Create multiple tasks; they run concurrently
t1 = tg.create_task(fetch_with_delay("task1", 1.0))
t2 = tg.create_task(fetch_with_delay("task2", 0.5))
t3 = tg.create_task(fetch_with_delay("task3", 0.3))

# All tasks have completed; results are accessible
print(f"Task1 result: {t1.result()}")
print(f"Task2 result: {t2.result()}")
print(f"Task3 result: {t3.result()}")
print("All tasks finished")

asyncio.run(main())

Output:

Starting TaskGroup
task1 starting
task2 starting
task3 starting
task3 completed
task2 completed
task1 completed
All tasks finished
Task1 result: result: task1
Task2 result: result: task2
Task3 result: result: task3

Each task completes at its own pace. The async with block waits for all children; when exited, you access results via task.result().

Exception Handling in TaskGroups

TaskGroups collect exceptions from all child tasks and raise them together as an ExceptionGroup. This lets you handle multiple failures atomically.

import asyncio

async def risky_task(name, should_fail):
await asyncio.sleep(0.1)
if should_fail:
raise ValueError(f"{name} failed intentionally")
return f"{name} succeeded"

async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task("task1", False))
tg.create_task(risky_task("task2", True))
tg.create_task(risky_task("task3", True))
except ExceptionGroup as eg:
print(f"Caught ExceptionGroup with {len(eg.exceptions)} exceptions:")
for exc in eg.exceptions:
print(f" - {exc}")

asyncio.run(main())

Output:

Caught ExceptionGroup with 2 exceptions:
- task2 failed intentionally
- task3 failed intentionally

Use except* (Python 3.11+) to handle specific exception types:

import asyncio

async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task("a", True))
tg.create_task(risky_task("b", False))
except* ValueError as eg:
print(f"ValueError occurred: {eg}")
except* TimeoutError as eg:
print(f"Timeout occurred: {eg}")

asyncio.run(main())

The except* syntax matches all exceptions of the type in the group. This is far cleaner than manually iterating eg.exceptions.

Comparison: TaskGroups vs. gather and wait

PatternBest ForException HandlingCancellation
TaskGroupGeneral concurrency, safe task lifetimeAutomatic propagation via ExceptionGroupAll cancelled if any fails
asyncio.gather()Collecting results from known tasksReturns first exception only (unless return_exceptions=True)Manual via loop
asyncio.wait()Complex task coordination, timeoutsManual exception inspectionManual control

TaskGroups are preferred for most new code because they guarantee cleanup and propagate all exceptions by default.

import asyncio

async def example_comparison():
async def work(n):
await asyncio.sleep(n)
if n == 2:
raise RuntimeError("task 2 failed")
return f"done {n}"

# gather: only the first exception bubbles up
print("Using gather:")
try:
results = await asyncio.gather(
work(1), work(2), work(3),
return_exceptions=False # Stop on first exception
)
except RuntimeError as e:
print(f" Caught: {e}")

# TaskGroup: all exceptions collected
print("\nUsing TaskGroup:")
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(work(1))
tg.create_task(work(2))
tg.create_task(work(3))
except ExceptionGroup as eg:
print(f" Caught ExceptionGroup: {eg}")

asyncio.run(example_comparison())

Nested TaskGroups for Complex Workflows

For hierarchical concurrency, nest TaskGroups to organize work at different levels:

import asyncio

async def fetch_user_data(user_id):
"""Simulate fetching user and associated data."""
await asyncio.sleep(0.2)
return f"user {user_id}"

async def fetch_user_posts(user_id):
await asyncio.sleep(0.3)
return f"posts for user {user_id}"

async def process_user(user_id):
"""Fetch user data and posts concurrently."""
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user_data(user_id))
posts_task = tg.create_task(fetch_user_posts(user_id))
return {"user": user_task.result(), "posts": posts_task.result()}

async def main():
"""Process multiple users with nested TaskGroups."""
async with asyncio.TaskGroup() as tg:
results = [
tg.create_task(process_user(i))
for i in range(1, 4)
]

for i, task in enumerate(results):
print(f"User {i+1}: {task.result()}")

asyncio.run(main())

Nesting allows you to express complex parallelism naturally: outer group for user-level work, inner groups for per-user concurrency.

Key Takeaways

  • TaskGroups (Python 3.11+) enforce structured concurrency: tasks are guaranteed to complete or be cancelled when exiting the scope.
  • Create tasks with async with asyncio.TaskGroup() as tg: tg.create_task(coro), ensuring no orphaned tasks and automatic exception propagation.
  • All exceptions from child tasks are collected in an ExceptionGroup and raised as a group; use except* to handle specific types.
  • TaskGroups are safer and cleaner than asyncio.gather() or create_task() for most use cases; prefer them for new code.
  • Nest TaskGroups to express hierarchical concurrency, organizing work at multiple levels cleanly.

Frequently Asked Questions

What if I need to cancel all tasks in a TaskGroup?

Raise a CancelledError or exception within the TaskGroup block, or use asyncio.CancelledError() explicitly. All remaining tasks are cancelled automatically. You can also catch asyncio.CancelledError to clean up resources.

How do I access task results before all tasks finish?

TaskGroups wait for all children before exiting, so you can't get partial results mid-execution. If you need early results, use asyncio.wait() with return_when=asyncio.FIRST_COMPLETED instead.

Can I add tasks to a TaskGroup dynamically during execution?

Yes. Call tg.create_task() anytime within the async block. The group continues until you exit the block, at which point all tasks (new and old) are awaited.

Is TaskGroup available in Python 3.10 or earlier?

No. TaskGroups were added in Python 3.11. For earlier versions, use asyncio.gather() or asyncio.wait() with careful exception handling. Consider upgrading to 3.11+ for production code.

How do TaskGroups compare to asyncio.gather()?

TaskGroups are stricter and safer: they automatically propagate all exceptions and guarantee cleanup. gather() is lower-level and more flexible but requires manual exception handling and task tracking.

Further Reading