Advanced Asyncio Python: Timeouts and Cancellation

Timeouts and cancellation are critical for resilient async systems. A timeout prevents tasks from hanging indefinitely; cancellation allows explicit termination. Together, they enable graceful degradation, resource cleanup, and predictable performance. Python 3.11 introduced asyncio.timeout(), a cleaner API than the older asyncio.wait_for(), though both remain useful.

Using asyncio.timeout for Time Limits

asyncio.timeout(delay) is a context manager (Python 3.11+) that raises TimeoutError if the block doesn't complete within delay seconds. It's the recommended approach for timeouts in modern Python:

import asyncio

async def slow_operation(name, seconds):
    """Simulate a slow operation."""
    print(f"{name} starting, will take {seconds}s")
    await asyncio.sleep(seconds)
    print(f"{name} complete")
    return f"{name} result"

async def demo_timeout():
    try:
        # Timeout after 2 seconds
        async with asyncio.timeout(2):
            result = await slow_operation("task", 5)
            print(f"Result: {result}")
    except TimeoutError:
        print("Operation timed out!")

asyncio.run(demo_timeout())

Output:

task starting, will take 5s
Operation timed out!

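When the timeout expires, asyncio cancels whatever the block is currently awaiting, and the context manager converts the resulting CancelledError into TimeoutError as the block exits. This is much cleaner than manually tracking deadlines.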

Nesting Timeouts and Relative Delays

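You can nest asyncio.timeout() blocks; whichever deadline expires first ends its own block. The context manager also provides reschedule() for adjusting a deadline after the block has been entered. The example below nests per-stage timeouts inside an overall limit: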

import asyncio

async def multi_stage_operation():
    """Multiple stages with individual timeouts."""
    try:
        async with asyncio.timeout(10):  # Overall timeout: 10s
            print("Stage 1: fetching")
            async with asyncio.timeout(3):  # Stage 1 timeout: 3s
                await asyncio.sleep(2)
            print("Stage 1 done")

            print("Stage 2: processing")
            async with asyncio.timeout(2):  # Stage 2 timeout: 2s
                await asyncio.sleep(1)
            print("Stage 2 done")

            print("Stage 3: saving")
            async with asyncio.timeout(5):  # Stage 3 timeout: 5s
                await asyncio.sleep(3)
            print("Stage 3 done")
    except TimeoutError as e:
        print(f"Multi-stage operation timed out: {e}")

asyncio.run(multi_stage_operation())

Nesting enables fine-grained control: each stage has its own deadline, and an overall envelope prevents runaway time.

Cancelling Tasks Explicitly

Use task.cancel() to request task cancellation. The task receives asyncio.CancelledError at the next await point:

import asyncio

async def cancellable_task(name):
    """A task that handles cancellation gracefully."""
    try:
        for i in range(10):
            print(f"{name}: step {i}")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(f"{name}: cancellation received, cleaning up")
        # Clean up resources here
        raise  # Re-raise to propagate cancellation

async def main():
    task = asyncio.create_task(cancellable_task("worker"))

    await asyncio.sleep(2.5)  # Let it run for 2.5 seconds
    print("Cancelling task...")
    task.cancel()

    try:
        await task  # Wait for task to finish (it will raise CancelledError)
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())

Output:

worker: step 0
worker: step 1
worker: step 2
Cancelling task...
worker: cancellation received, cleaning up
Task was cancelled

Always catch CancelledError to clean up resources (close files, release locks), then re-raise it so the cancellation propagates up.
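When the cleanup does not depend on why the coroutine is exiting, a try/finally block or a context manager does the same job without naming CancelledError at all. This is a small sketch under that assumption; hold_lock and the log list are hypothetical names.

```python
import asyncio

async def hold_lock(lock, log):
    """Hypothetical worker that holds a lock while it runs."""
    async with lock:                 # released on any exit, including cancellation
        try:
            await asyncio.sleep(1)
        finally:
            log.append("cleanup ran")  # runs on success, error, or cancellation

async def main():
    lock = asyncio.Lock()
    log = []
    task = asyncio.create_task(hold_lock(lock, log))
    await asyncio.sleep(0.05)        # let the task acquire the lock
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled(), lock.locked()

print(asyncio.run(main()))
# (['cleanup ran', 'cancelled'], True, False)
```

Because nothing catches the CancelledError inside hold_lock, it propagates without an explicit raise, and the task is still reported as cancelled.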

Comparison: timeout vs. wait_for

asyncio.timeout() (Python 3.11+) and wait_for() (all versions) both enforce time limits, but differ subtly:

| Feature | asyncio.timeout() | asyncio.wait_for() |
|---|---|---|
| Python version | 3.11+ | All versions |
| API | Context manager | Function call |
| Nesting | Supports nested deadlines | Less elegant nesting |
| Deadline precision | Relative to entry | Relative to call |
| Cleanup | Clear exception handling | Same cleanup patterns |

For new code targeting Python 3.11+, prefer asyncio.timeout(). For compatibility with older versions, use wait_for():

import asyncio

async def demo_wait_for():
    try:
        # wait_for raises TimeoutError after 2 seconds
        result = await asyncio.wait_for(
            slow_operation("task", 5),
            timeout=2
        )
    except asyncio.TimeoutError:
        print("wait_for timed out")

async def slow_operation(name, seconds):
    await asyncio.sleep(seconds)
    return f"{name} result"

asyncio.run(demo_wait_for())
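Code that must run on both old and new interpreters can hide the choice behind one function. The helper below is a hypothetical sketch, not a library API: run_with_limit is an invented name, and it assumes callers catch asyncio.TimeoutError, which is an alias of the built-in TimeoutError from Python 3.11 onward and a separate class before that.

```python
import asyncio
import sys

async def run_with_limit(coro, seconds):
    """Hypothetical helper: await coro with a time limit on any version."""
    if sys.version_info >= (3, 11):
        async with asyncio.timeout(seconds):
            return await coro
    # Older interpreters: fall back to wait_for()
    return await asyncio.wait_for(coro, timeout=seconds)

async def quick():
    await asyncio.sleep(0.01)
    return "ok"

async def main():
    first = await run_with_limit(quick(), 1)
    try:
        await run_with_limit(asyncio.sleep(1), 0.05)
        second = "finished"
    except asyncio.TimeoutError:  # matches on old and new versions
        second = "timed out"
    return first, second

print(asyncio.run(main()))
# ('ok', 'timed out')
```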

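Deadline Tracking with asyncio.Timeout.when()

For complex operations that need to know how much time remains, bind the timeout context manager with `as` and call its when() method, which returns the absolute deadline on the event loop's clock. Subtracting the loop's current time gives the time remaining:

import asyncio

async def adaptive_operation():
    """Adjust behavior based on time pressure."""
    loop = asyncio.get_running_loop()

    try:
        async with asyncio.timeout(10) as cm:
            for i in range(5):
                # when() is an absolute loop time; subtract "now" to get what is left
                remaining = cm.when() - loop.time()
                print(f"Step {i}: doing work, {remaining:.1f}s remaining")
                await asyncio.sleep(2)
    except TimeoutError:
        print("Out of time; aborting remaining steps")

asyncio.run(adaptive_operation())

Note: avoid underscore-prefixed Task attributes such as task._must_cancel. They are internal API, and _must_cancel is only a flag for a pending cancellation; it does not hold the time remaining. On Python versions without asyncio.timeout(), track your own deadline and compare it against asyncio.get_running_loop().time() instead.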
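Tracking a deadline by hand might look like the sketch below. It assumes each step has a roughly known cost and stops before starting a step that would overrun; process_until and step_cost are hypothetical names, and the number of completed steps depends on timing.

```python
import asyncio

async def process_until(deadline, step_cost=0.05):
    """Run steps until the next one would overrun the absolute deadline."""
    loop = asyncio.get_running_loop()
    completed = 0
    # Compare against the loop's monotonic clock, not wall-clock time
    while loop.time() + step_cost <= deadline:
        await asyncio.sleep(step_cost)   # stand-in for one unit of work
        completed += 1
    return completed

async def main():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 0.2         # absolute deadline, 0.2 s from now
    return await process_until(deadline)

print(f"completed {asyncio.run(main())} step(s) before the deadline")
```

Unlike asyncio.timeout(), this approach never interrupts a step in progress; it only declines to start the next one.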

Safe Timeout Patterns for Servers

In a server, timeouts prevent resource exhaustion. Here's a pattern for handling concurrent client requests, each with its own timeout:

import asyncio

async def handle_request(request_id, processing_time):
    """Simulate request processing."""
    print(f"Request {request_id} starting")
    await asyncio.sleep(processing_time)
    print(f"Request {request_id} done")
    return f"response for {request_id}"

async def request_handler(request_id, processing_time):
    """Handle a request with timeout."""
    try:
        # Individual request timeout: 5 seconds
        async with asyncio.timeout(5):
            result = await handle_request(request_id, processing_time)
            return result
    except TimeoutError:
        print(f"Request {request_id} timed out")
        return None

async def server_main():
    """Simulate a server accepting requests."""
    async with asyncio.TaskGroup() as tg:
        # Requests with varying processing times
        tg.create_task(request_handler("req1", 2))
        tg.create_task(request_handler("req2", 6))  # Will timeout
        tg.create_task(request_handler("req3", 3))

asyncio.run(server_main())

Output:

Request req1 starting
Request req2 starting
Request req3 starting
Request req1 done
Request req3 done
Request req2 timed out

The requests run concurrently, so a slow request never blocks the others, and the per-request timeout caps how long any single request can hold resources.

Key Takeaways

  • Use asyncio.timeout(delay) (Python 3.11+) to enforce time limits cleanly; raises TimeoutError if the block exceeds the deadline.
  • Nest timeouts for multi-stage operations; each stage can have its own deadline, with an overall envelope.
  • Cancel tasks with task.cancel(), which raises CancelledError at the next await point; always catch and re-raise to ensure cleanup.
  • asyncio.wait_for() is the pre-3.11 equivalent; prefer it for older Python versions.
  • Implement per-request timeouts in servers to prevent resource exhaustion and ensure responsiveness.

Frequently Asked Questions

What happens if I don't handle CancelledError?

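It propagates up and the task ends up marked as cancelled, which is the intended outcome. The risk is cleanup that never runs, leaving resources (files, locks) unclosed. Put cleanup in a finally block or a context manager, or catch the exception, clean up, and re-raise: except asyncio.CancelledError: ... raise. Swallowing it without re-raising hides the cancellation from the caller and can break asyncio.timeout() and TaskGroup.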

Can I extend a timeout deadline dynamically?

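Yes. Bind the context manager with async with asyncio.timeout(delay) as cm, then call cm.reschedule(when) inside the block. The when argument is an absolute time on the event loop's clock (for example, loop.time() + 5), not a relative delay; passing None removes the deadline. For lower-level control, catch TimeoutError and retry within a new timeout block.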

How does timeout interact with TaskGroups?

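If a child task lets a TimeoutError escape, the TaskGroup cancels the remaining children and raises an ExceptionGroup containing that TimeoutError; use except* TimeoutError to catch it specifically. Children that were merely cancelled do not appear in the group. If asyncio.timeout() wraps the whole TaskGroup instead, the children are cancelled and a plain TimeoutError is raised from the timeout block.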

Is timeout resolution guaranteed to be exact?

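No. asyncio.timeout() is scheduled on the event loop's clock and can only fire when the loop regains control, so a coroutine that blocks without awaiting delays it. Treat the deadline as a lower bound; for timing-critical code, don't rely on precision better than 10-100 ms.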

Can I timeout multiple operations together?

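Yes. Use asyncio.wait_for(asyncio.gather(...), timeout=N) to apply one timeout to several tasks: if the group has not finished by the deadline, every task still running is cancelled. On Python 3.11+, wrapping an asyncio.TaskGroup in asyncio.timeout(N) has the same effect.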
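A shared deadline over gather() might look like this sketch. The job coroutine and the finished list are hypothetical; the tasks are created explicitly only so their state can be inspected afterwards.

```python
import asyncio

async def job(name, seconds, finished):
    """Hypothetical unit of work that records its own completion."""
    await asyncio.sleep(seconds)
    finished.append(name)
    return name

async def main():
    finished = []
    task_a = asyncio.create_task(job("a", 0.01, finished))
    task_b = asyncio.create_task(job("b", 1.0, finished))
    try:
        # One deadline for the whole group
        await asyncio.wait_for(asyncio.gather(task_a, task_b), timeout=0.1)
    except asyncio.TimeoutError:
        pass
    # "a" finished in time; "b" was cancelled when the deadline passed
    return finished, task_b.cancelled()

print(asyncio.run(main()))
# (['a'], True)
```

Note that the result of the job that did finish is lost when wait_for() raises; keep a reference to the individual tasks if partial results matter.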

Further Reading