Async context managers: async with explained
Async context managers are used with async with and implement the async context manager protocol: __aenter__ and __aexit__ methods that are awaited. This pattern ensures safe cleanup of async resources (database connections, network sockets, locks, and async iterators) even when exceptions or task cancellations occur. Understanding async context managers is essential for writing robust async code in Python 3.5+.
The async with statement is to async code what with is to synchronous code: it provides deterministic cleanup and exception safety for resources that may have async acquisition or cleanup. The protocol is identical to the synchronous version, but the methods are coroutines (async def) that must be awaited.
Async Context Manager Protocol: __aenter__ and __aexit__
An async context manager implements two coroutine methods: __aenter__ and __aexit__. When you enter an async with block, Python awaits __aenter__; on exit, Python awaits __aexit__. The return value of __aenter__ is bound to the variable after as, and the signature of __aexit__ is identical to the synchronous version.
import asyncio

# Basic async context manager
class AsyncResource:
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"{self.name}: acquiring (async)")
        await asyncio.sleep(0.1)  # Simulate async acquisition
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"{self.name}: releasing (async)")
        await asyncio.sleep(0.1)  # Simulate async cleanup
        return False  # Do not suppress exceptions

async def main_wrong():
    # Incorrect: a plain 'with' looks for __enter__/__exit__, which AsyncResource lacks.
    # Calling this raises TypeError on Python 3.11+ (AttributeError on earlier versions).
    with AsyncResource("Resource1"):
        pass

# Correct: AsyncResource requires 'async with', not 'with'
async def main_correct():
    async with AsyncResource("Resource1") as res:
        print(f"Using: {res.name}")

asyncio.run(main_correct())

# Output:
# Resource1: acquiring (async)
# Using: Resource1
# Resource1: releasing (async)
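To make the protocol concrete, the following sketch expands async with by hand. It is a simplified approximation of what the statement does, not the exact semantics from the language reference; Tracked, events, and manual_async_with are hypothetical names used only for illustration.

```python
import asyncio
import sys

events = []  # hypothetical log, used only to show the call order


class Tracked:
    """Hypothetical resource that records when its protocol methods run."""

    async def __aenter__(self):
        events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        events.append(f"exit({exc_type.__name__ if exc_type else None})")
        return False  # do not suppress exceptions


async def manual_async_with():
    # Roughly equivalent to:
    #     async with Tracked() as res:
    #         events.append("body")
    manager = Tracked()
    res = await manager.__aenter__()
    try:
        events.append("body")
    except BaseException:
        # Hand the active exception to __aexit__; re-raise unless it returns a truthy value.
        if not await manager.__aexit__(*sys.exc_info()):
            raise
    else:
        await manager.__aexit__(None, None, None)


asyncio.run(manual_async_with())
print(events)  # ['enter', 'body', 'exit(None)']
```

Note that __aexit__ is awaited on both paths, which is why the cleanup in the example above runs whether or not the body raises.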
Example 1: Async Database Connection
A realistic example: acquiring an async database connection, executing queries, and releasing it.
import asyncio

class AsyncDatabaseConnection:
    def __init__(self, db_url):
        self.db_url = db_url
        self.connection = None
        self.is_connected = False

    async def __aenter__(self):
        print(f"Connecting to {self.db_url}")
        # Simulate async connection
        await asyncio.sleep(0.2)
        self.connection = f"Connected to {self.db_url}"
        self.is_connected = True
        print(f"Connected: {self.connection}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # The block raised: roll back
            print(f"Error occurred: {exc_type.__name__}")
            print("Rollback in progress...")
            await asyncio.sleep(0.1)
        else:
            # The block finished normally: commit
            print("Commit in progress...")
            await asyncio.sleep(0.1)
        self.is_connected = False
        print("Disconnected")
        return False  # Let any exception propagate to the caller

    async def execute(self, query):
        if not self.is_connected:
            raise RuntimeError("Not connected")
        print(f"Executing: {query}")
        await asyncio.sleep(0.1)
        return f"Result of {query}"

async def example_success():
    print("--- Successful transaction ---")
    async with AsyncDatabaseConnection("postgres://localhost/mydb") as db:
        result = await db.execute("SELECT * FROM users")
        print(f"Result: {result}")

async def example_error():
    print("\n--- Transaction with error ---")
    try:
        async with AsyncDatabaseConnection("postgres://localhost/mydb") as db:
            await db.execute("SELECT * FROM users")
            raise ValueError("Query failed")
    except ValueError as e:
        print(f"Error handled: {e}")

async def main():
    await example_success()
    await example_error()

asyncio.run(main())

# Output:
# --- Successful transaction ---
# Connecting to postgres://localhost/mydb
# Connected: Connected to postgres://localhost/mydb
# Executing: SELECT * FROM users
# Result: Result of SELECT * FROM users
# Commit in progress...
# Disconnected
#
# --- Transaction with error ---
# Connecting to postgres://localhost/mydb
# Connected: Connected to postgres://localhost/mydb
# Executing: SELECT * FROM users
# Error occurred: ValueError
# Rollback in progress...
# Disconnected
# Error handled: Query failed
Example 2: Async Lock Context Manager
Locks are perfect async context managers: asyncio.Lock has __aenter__ and __aexit__ built-in.
import asyncio

async def shared_resource_example():
    lock = asyncio.Lock()
    shared_counter = 0

    async def worker(worker_id):
        nonlocal shared_counter
        for _ in range(3):
            async with lock:  # Acquire the lock
                print(f"Worker {worker_id}: counter = {shared_counter}")
                await asyncio.sleep(0.1)
                shared_counter += 1
                print(f"Worker {worker_id}: incremented to {shared_counter}")
            # Lock is released here

    print("--- Concurrent workers with lock ---")
    tasks = [worker(i) for i in range(2)]
    await asyncio.gather(*tasks)
    print(f"Final counter: {shared_counter}")

asyncio.run(shared_resource_example())

# Output (two workers, three increments each):
# --- Concurrent workers with lock ---
# Worker 0: counter = 0
# Worker 0: incremented to 1
# Worker 1: counter = 1
# Worker 1: incremented to 2
# Worker 0: counter = 2
# Worker 0: incremented to 3
# Worker 1: counter = 3
# Worker 1: incremented to 4
# Worker 0: counter = 4
# Worker 0: incremented to 5
# Worker 1: counter = 5
# Worker 1: incremented to 6
# Final counter: 6
Creating Async Context Managers with asynccontextmanager
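The contextlib.asynccontextmanager decorator (available since Python 3.7) converts an async generator function into an async context manager, just as @contextmanager does for synchronous code. Code before the yield runs on entry, and code after it runs on exit.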
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def async_timer(operation):
    """Async context manager that measures execution time."""
    print(f"Starting {operation}")
    loop = asyncio.get_running_loop()  # Preferred over get_event_loop() inside a coroutine
    start = loop.time()  # Monotonic event-loop clock
    try:
        yield
    finally:
        elapsed = loop.time() - start
        print(f"Finished {operation} in {elapsed:.2f}s")

async def main():
    print("--- Async timer ---")
    async with async_timer("database query"):
        await asyncio.sleep(0.5)
    async with async_timer("file I/O"):
        await asyncio.sleep(0.3)

asyncio.run(main())

# Output (timings are approximate):
# --- Async timer ---
# Starting database query
# Finished database query in 0.50s
# Starting file I/O
# Finished file I/O in 0.30s
Nested Async Context Managers
Async context managers nest just like synchronous ones: left-to-right entry, right-to-left exit.
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource(name):
    print(f"{name}: acquiring")
    await asyncio.sleep(0.1)
    try:
        yield name
    finally:
        print(f"{name}: releasing")
        await asyncio.sleep(0.1)

async def main():
    print("--- Nested async context managers ---")
    async with async_resource("A") as a, \
               async_resource("B") as b, \
               async_resource("C") as c:
        print(f"Using {a}, {b}, {c}")

asyncio.run(main())

# Output:
# --- Nested async context managers ---
# A: acquiring
# B: acquiring
# C: acquiring
# Using A, B, C
# C: releasing
# B: releasing
# A: releasing
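The comma-separated form above behaves like explicitly nested async with blocks. The sketch below, which uses a hypothetical tracked manager and an order list purely for illustration, records the entry and exit order of the nested form so the last-in, first-out behavior can be checked directly.

```python
import asyncio
from contextlib import asynccontextmanager

order = []  # hypothetical log of entry/exit events


@asynccontextmanager
async def tracked(name):
    order.append(f"enter {name}")
    try:
        yield name
    finally:
        order.append(f"exit {name}")


async def main():
    # Same behavior as: async with tracked("A") as a, tracked("B") as b:
    async with tracked("A") as a:
        async with tracked("B") as b:
            order.append(f"body {a}{b}")


asyncio.run(main())
print(order)  # ['enter A', 'enter B', 'body AB', 'exit B', 'exit A']
```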
Exception Handling in Async Context Managers
Exceptions in the async with block are passed to __aexit__ just like in synchronous context managers. You can inspect and suppress them.
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def error_handler(name):
    try:
        yield
    except asyncio.CancelledError:
        print(f"{name}: task was cancelled")
        raise
    except Exception as e:
        print(f"{name}: caught {type(e).__name__}: {e}")
        raise

async def main():
    print("--- Exception in async context ---")
    try:
        async with error_handler("operation"):
            raise ValueError("Something went wrong")
    except ValueError:
        print("Exception propagated to caller")

asyncio.run(main())

# Output:
# --- Exception in async context ---
# operation: caught ValueError: Something went wrong
# Exception propagated to caller
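The example above inspects the exception and re-raises it. Suppression works through the return value of __aexit__: a truthy value tells Python to swallow the exception. The following is a minimal sketch of that mechanism; SuppressAsync is a hypothetical class written for this illustration, not part of the standard library.

```python
import asyncio


class SuppressAsync:
    """Hypothetical manager that swallows one chosen exception type."""

    def __init__(self, exc_class):
        self.exc_class = exc_class
        self.suppressed = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None and issubclass(exc_type, self.exc_class):
            self.suppressed = exc_val
            return True   # truthy: the exception stops here
        return False      # anything else propagates to the caller


async def main():
    async with SuppressAsync(ValueError) as guard:
        raise ValueError("ignored")
    # Execution continues here because the error was suppressed.
    return repr(guard.suppressed)


result = asyncio.run(main())
print(result)  # ValueError('ignored')
```

With @asynccontextmanager the equivalent is to catch the exception around the yield and not re-raise it. Suppressing asyncio.CancelledError this way is generally a mistake, because the caller that requested cancellation expects it to take effect.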
Task Cancellation and Async Context Managers
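A critical consideration in async code: when a task is cancelled, asyncio.CancelledError is raised inside it at the await where it is currently suspended. Since Python 3.8, CancelledError derives from BaseException, so a plain except Exception clause does not catch it. Async context managers must clean up even when cancellation occurs, and should re-raise the error once cleanup is done.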
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def cancellation_safe():
    print("Acquiring resource")
    try:
        yield
    except asyncio.CancelledError:
        print("Task was cancelled, cleaning up...")
        await asyncio.sleep(0.1)  # Cleanup work
        raise

async def work():
    async with cancellation_safe():
        await asyncio.sleep(10)  # Long operation

async def main():
    print("--- Task cancellation ---")
    task = asyncio.create_task(work())
    await asyncio.sleep(0.2)  # Let task start
    task.cancel()  # Cancel the task
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled")

asyncio.run(main())

# Output:
# --- Task cancellation ---
# Acquiring resource
# Task was cancelled, cleaning up...
# Task cancelled
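The example above runs its cleanup only in the except asyncio.CancelledError branch. When the same cleanup should run on every exit path, a try/finally around the yield is a common alternative. The sketch below assumes that is the case; tracked_resource and log are hypothetical names used only for illustration.

```python
import asyncio
from contextlib import asynccontextmanager

log = []  # hypothetical event log


@asynccontextmanager
async def tracked_resource(name):
    log.append(f"{name}: acquired")
    try:
        yield name
    finally:
        # Runs on normal exit, on ordinary exceptions, and on cancellation.
        log.append(f"{name}: released")


async def worker():
    async with tracked_resource("slow"):
        await asyncio.sleep(10)  # cancelled long before this finishes


async def main():
    async with tracked_resource("fast"):
        pass  # normal exit

    task = asyncio.create_task(worker())
    await asyncio.sleep(0.05)  # let the worker enter its block
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("worker cancelled")


asyncio.run(main())
print(log)
# ['fast: acquired', 'fast: released', 'slow: acquired', 'slow: released', 'worker cancelled']
```

Because finally does not catch the exception, CancelledError keeps propagating without an explicit raise. If the cleanup itself awaits, keep in mind that a second cancel() call can interrupt it.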
Async Context Manager Patterns: Multi-Step Cleanup
For complex async resources, you may need multi-step cleanup with different error handling for each step.
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def complex_async_resource(name):
    # Step 1: Connect
    connection = None
    try:
        print(f"{name}: connecting")
        await asyncio.sleep(0.1)
        connection = f"connection_{name}"
        # Step 2: Initialize
        print(f"{name}: initializing")
        await asyncio.sleep(0.1)
        yield connection
    finally:
        # Step 3: Cleanup (always runs)
        if connection:
            try:
                print(f"{name}: disconnecting")
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"{name}: disconnect failed: {e}")

async def main():
    async with complex_async_resource("DB") as conn:
        print(f"Using {conn}")

asyncio.run(main())

# Output:
# DB: connecting
# DB: initializing
# Using connection_DB
# DB: disconnecting
Async Context Managers vs. Synchronous
| Feature | Synchronous | Async |
|---|---|---|
| Entry | __enter__ | async def __aenter__ (awaited) |
| Exit | __exit__ | async def __aexit__ (awaited) |
| Syntax | with ... as | async with ... as |
| Decorator | @contextmanager | @asynccontextmanager |
| Usage | Any function | Inside async def functions only |
| Cancellation | N/A | Cleanup must still run on asyncio.CancelledError |
Key Takeaways
- Async context managers use async with and implement awaitable __aenter__ and __aexit__ methods.
- The protocol is identical to synchronous context managers, but all methods are coroutines.
- Use the @asynccontextmanager decorator to convert async generators into context managers.
- Always account for asyncio.CancelledError in async context managers so that cleanup runs on task cancellation, and re-raise it after cleaning up.
- Nested async context managers follow the same LIFO discipline as synchronous ones.
Frequently Asked Questions
Can I mix async with and regular with?
Yes. You can use both in the same async function. Use async with for async resources and with for synchronous resources. They can be nested: async with ... : with ... :.
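As a small illustration of mixing the two statements, the sketch below nests a plain with inside an async with. The choice of asyncio.Lock and threading.Lock as the two resources is an assumption made for the example; steps is a hypothetical list that records whether each lock is held.

```python
import asyncio
import threading

steps = []  # hypothetical record of (async lock held, thread lock held)


async def main():
    async_lock = asyncio.Lock()     # async resource: needs 'async with'
    thread_lock = threading.Lock()  # synchronous resource: plain 'with'

    async with async_lock:
        with thread_lock:
            # Keep the body of a synchronous lock short and free of awaits;
            # otherwise other threads stay blocked while this task is suspended.
            steps.append((async_lock.locked(), thread_lock.locked()))
    steps.append((async_lock.locked(), thread_lock.locked()))


asyncio.run(main())
print(steps)  # [(True, True), (False, False)]
```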
What if __aenter__ is cancelled?
If __aenter__ is cancelled or raises, __aexit__ is never called, just as __exit__ is skipped when __enter__ raises. You must handle cleanup for partial acquisition inside __aenter__ itself.
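One way to handle partial acquisition is sketched below, assuming a resource that is acquired in two awaited steps. TwoStepResource, its fail_second_step flag, and log are hypothetical and exist only to make the failure path observable.

```python
import asyncio

log = []  # hypothetical event log


class TwoStepResource:
    """Hypothetical resource whose acquisition has two awaited steps."""

    def __init__(self, fail_second_step=False):
        self.fail_second_step = fail_second_step

    async def __aenter__(self):
        log.append("step 1 acquired")
        try:
            await asyncio.sleep(0)  # stand-in for the second async step
            if self.fail_second_step:
                raise ConnectionError("step 2 failed")
            log.append("step 2 acquired")
        except BaseException:
            # __aexit__ will not run, so undo step 1 here.
            # BaseException also covers asyncio.CancelledError.
            log.append("step 1 released")
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        log.append("released both steps")
        return False


async def main():
    try:
        async with TwoStepResource(fail_second_step=True):
            log.append("body")  # never reached
    except ConnectionError:
        log.append("acquisition failed")


asyncio.run(main())
print(log)  # ['step 1 acquired', 'step 1 released', 'acquisition failed']
```

Neither the body nor __aexit__ appears in the log, which confirms that the only chance to release the first step was inside __aenter__.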
Can I await inside __aexit__?
Yes. __aexit__ is a coroutine, so you can await anything. This is essential for async cleanup like flushing buffers or disconnecting from services.
Does asyncio.Lock already implement the async context manager protocol?
Yes. asyncio.Lock has __aenter__ and __aexit__, so async with lock: works immediately. The same is true for asyncio.Semaphore, asyncio.BoundedSemaphore, and asyncio.Condition. asyncio.Event is the exception: it has no acquire/release pair and cannot be used with async with.
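A short sketch of a built-in primitive other than Lock used with async with: asyncio.Semaphore limiting how many tasks run a section at once. The limit of 2, the five workers, and the peak counter are assumptions chosen for the illustration.

```python
import asyncio

peak = 0  # highest number of workers seen inside the guarded section


async def main():
    limit = asyncio.Semaphore(2)  # at most two holders at a time
    active = 0

    async def worker():
        nonlocal active
        global peak
        async with limit:  # acquire on entry, release on exit
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)
            active -= 1

    await asyncio.gather(*(worker() for _ in range(5)))


asyncio.run(main())
print(peak)  # 2
print(hasattr(asyncio.Event, "__aenter__"))  # False
```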
How do I share state across multiple async context managers?
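Store state in enclosing scope variables (using nonlocal if in a nested function), or pass objects that are mutated across context managers. Tasks on the same event loop run in a single thread, so the concern is not thread-safety but interleaving: another task can change shared state whenever your code awaits. Guard multi-step updates with asyncio.Lock if state is accessed from multiple tasks.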