Advanced Asyncio Python: Timeouts and Cancellation
Timeouts and cancellation are critical for resilient async systems. A timeout prevents tasks from hanging indefinitely; cancellation allows explicit termination. Together, they enable graceful degradation, resource cleanup, and predictable performance. Python 3.11+ introduced asyncio.timeout(), a cleaner API than the older wait_for(), though both remain useful.
Using asyncio.timeout for Time Limits
asyncio.timeout(delay) is a context manager (Python 3.11+) that raises TimeoutError if the block doesn't complete within delay seconds. It's the recommended approach for timeouts in modern Python:
import asyncio
async def slow_operation(name, seconds):
"""Simulate a slow operation."""
print(f"{name} starting, will take {seconds}s")
await asyncio.sleep(seconds)
print(f"{name} complete")
return f"{name} result"
async def demo_timeout():
try:
# Timeout after 2 seconds
async with asyncio.timeout(2):
result = await slow_operation("task", 5)
print(f"Result: {result}")
except TimeoutError:
print("Operation timed out!")
asyncio.run(demo_timeout())
Output:
task starting, will take 5s
Operation timed out!
The task is cancelled automatically when the timeout expires. This is much cleaner than manually tracking deadlines.
Nesting Timeouts and Relative Delays
You can nest asyncio.timeout() blocks and use .reschedule() to adjust deadlines dynamically:
import asyncio
async def multi_stage_operation():
"""Multiple stages with individual timeouts."""
try:
async with asyncio.timeout(10): # Overall timeout: 10s
print("Stage 1: fetching")
async with asyncio.timeout(3): # Stage 1 timeout: 3s
await asyncio.sleep(2)
print("Stage 1 done")
print("Stage 2: processing")
async with asyncio.timeout(2): # Stage 2 timeout: 2s
await asyncio.sleep(1)
print("Stage 2 done")
print("Stage 3: saving")
async with asyncio.timeout(5): # Stage 3 timeout: 5s
await asyncio.sleep(3)
print("Stage 3 done")
except TimeoutError as e:
print(f"Multi-stage operation timed out: {e}")
asyncio.run(multi_stage_operation())
Nesting enables fine-grained control: each stage has its own deadline, and an overall envelope prevents runaway time.
Cancelling Tasks Explicitly
Use task.cancel() to request task cancellation. The task receives asyncio.CancelledError at the next await point:
import asyncio
async def cancellable_task(name):
"""A task that handles cancellation gracefully."""
try:
for i in range(10):
print(f"{name}: step {i}")
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f"{name}: cancellation received, cleaning up")
# Clean up resources here
raise # Re-raise to propagate cancellation
async def main():
task = asyncio.create_task(cancellable_task("worker"))
await asyncio.sleep(2.5) # Let it run for 2.5 seconds
print("Cancelling task...")
task.cancel()
try:
await task # Wait for task to finish (it will raise CancelledError)
except asyncio.CancelledError:
print("Task was cancelled")
asyncio.run(main())
Output:
worker: step 0
worker: step 1
worker: step 2
Cancelling task...
worker: cancellation received, cleaning up
Task was cancelled
Always catch CancelledError to clean up resources (close files, release locks), then re-raise it so the cancellation propagates up.
Comparison: timeout vs. wait_for
asyncio.timeout() (Python 3.11+) and wait_for() (all versions) both enforce time limits, but differ subtly:
| Feature | asyncio.timeout() | asyncio.wait_for() |
|---|---|---|
| Python version | 3.11+ | All versions |
| API | Context manager | Function call |
| Nesting | Supports nested deadlines | Less elegant nesting |
| Deadline precision | Relative to entry | Relative to call |
| Cleanup | Clear exception handling | Same cleanup patterns |
For new code targeting Python 3.11+, prefer asyncio.timeout(). For compatibility with older versions, use wait_for():
import asyncio
async def demo_wait_for():
try:
# wait_for raises TimeoutError after 2 seconds
result = await asyncio.wait_for(
slow_operation("task", 5),
timeout=2
)
except asyncio.TimeoutError:
print("wait_for timed out")
async def slow_operation(name, seconds):
await asyncio.sleep(seconds)
return f"{name} result"
asyncio.run(demo_wait_for())
Deadline Tracking with asyncio.current_task
For complex operations that need to know time remaining, use asyncio.current_task() to access the current task and inspect its cancellation:
import asyncio
async def adaptive_operation():
"""Adjust behavior based on time pressure."""
task = asyncio.current_task()
try:
async with asyncio.timeout(10):
for i in range(5):
remaining = task._must_cancel # Internal; use with caution
print(f"Step {i}: doing work")
await asyncio.sleep(2)
except TimeoutError:
print("Out of time; aborting remaining steps")
asyncio.run(adaptive_operation())
Note: task._must_cancel is internal API (underscore prefix). For production code, track your own deadline and check against asyncio.get_event_loop().time() instead.
Safe Timeout Patterns for Servers
In a server, timeouts prevent resource exhaustion. Here's a pattern for handling client requests with per-request and overall timeouts:
import asyncio
async def handle_request(request_id, processing_time):
"""Simulate request processing."""
print(f"Request {request_id} starting")
await asyncio.sleep(processing_time)
print(f"Request {request_id} done")
return f"response for {request_id}"
async def request_handler(request_id, processing_time):
"""Handle a request with timeout."""
try:
# Individual request timeout: 5 seconds
async with asyncio.timeout(5):
result = await handle_request(request_id, processing_time)
return result
except asyncio.TimeoutError:
print(f"Request {request_id} timed out")
return None
async def server_main():
"""Simulate a server accepting requests."""
async with asyncio.TaskGroup() as tg:
# Requests with varying processing times
tg.create_task(request_handler("req1", 2))
tg.create_task(request_handler("req2", 6)) # Will timeout
tg.create_task(request_handler("req3", 3))
asyncio.run(server_main())
Output:
Request req1 starting
Request req2 starting
Request req3 starting
Request req1 done
Request req3 done
Request req2 timed out
Each request has its own timeout, preventing one slow request from blocking others.
Key Takeaways
- Use
asyncio.timeout(delay)(Python 3.11+) to enforce time limits cleanly; raisesTimeoutErrorif the block exceeds the deadline. - Nest timeouts for multi-stage operations; each stage can have its own deadline, with an overall envelope.
- Cancel tasks with
task.cancel(), which raisesCancelledErrorat the nextawaitpoint; always catch and re-raise to ensure cleanup. asyncio.wait_for()is the pre-3.11 equivalent; prefer it for older Python versions.- Implement per-request timeouts in servers to prevent resource exhaustion and ensure responsiveness.
Frequently Asked Questions
What happens if I don't handle CancelledError?
If unhandled, it propagates up, potentially leaving resources (files, locks) unclosed. Always catch it, clean up, and re-raise: except asyncio.CancelledError: ... raise.
Can I extend a timeout deadline dynamically?
Yes. If you have access to the timeout context manager, use tg.reschedule(deadline) to adjust. For lower-level control, catch TimeoutError and retry within a new timeout block.
How does timeout interact with TaskGroups?
If a task times out and is cancelled, the ExceptionGroup raised by the TaskGroup includes CancelledError. Handle except* to catch timeouts specifically.
Is timeout resolution guaranteed to be exact?
No. asyncio.timeout() relies on the event loop's timing, which has millisecond-level granularity. For timing-critical code, don't rely on precision better than 10-100ms.
Can I timeout multiple operations together?
Use asyncio.wait_for(asyncio.gather(...), timeout=N) to apply a timeout to multiple tasks: if any exceed the deadline, all are cancelled.