Advanced Asyncio Python: Real-World Patterns and Anti-Patterns
Production asyncio code needs resilience patterns (retries for transient failures, circuit breakers for cascading failures) and coordination patterns such as fanout-fanin for parallel work. Equally important is recognizing anti-patterns: common mistakes that cause subtle bugs, resource leaks, and poor performance. This article codifies lessons from real-world deployments.
Pattern 1: Retry Logic with Exponential Backoff
Transient failures (network hiccups, temporary service unavailability) are common. Retries with exponential backoff recover without user intervention.
```python
import asyncio
import random

async def unreliable_operation(attempt):
    """Simulates an operation that fails occasionally."""
    if attempt < 3 and random.random() < 0.7:  # 70% failure chance on attempts 0-2
        raise ConnectionError(f"Connection failed on attempt {attempt}")
    return f"Success on attempt {attempt}"

async def retry_with_backoff(coro_func, max_retries=5, base_delay=1.0,
                             retry_on=(ConnectionError, TimeoutError)):
    """Retry an async operation with exponential backoff.

    max_retries is the total number of attempts. Only exceptions in retry_on
    are retried; anything else (e.g., a bug raising TypeError) fails immediately.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            return await coro_func(attempt)
        except retry_on as e:
            attempt += 1
            if attempt >= max_retries:
                raise
            # Exponential backoff: 1s, 2s, 4s, 8s, ...
            delay = base_delay * (2 ** (attempt - 1))
            # Add up to 10% jitter to prevent a thundering herd
            delay += random.uniform(0, delay * 0.1)
            print(f"Attempt {attempt} failed ({e}). Retrying in {delay:.2f}s...")
            await asyncio.sleep(delay)

async def demo_retry():
    try:
        result = await retry_with_backoff(unreliable_operation, max_retries=5)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Exhausted retries: {e}")

asyncio.run(demo_retry())
```
Output (example):
```
Attempt 1 failed (Connection failed on attempt 0). Retrying in 1.03s...
Attempt 2 failed (Connection failed on attempt 1). Retrying in 2.08s...
Result: Success on attempt 2
```
Key details: exponential backoff prevents overloading a recovering service, and jitter prevents the "thundering herd" (all clients retrying simultaneously).
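Two refinements are common in practice. The first caps the delay so it stops growing after a few attempts. The second draws each delay uniformly between zero and the current ceiling ("full jitter"), which spreads retries from many clients further apart than a 10% jitter does. The sketch below assumes ConnectionError and TimeoutError are the transient failures worth retrying. `backoff_delay`, `retry`, `max_delay`, and `retry_on` are illustrative names, not a library API.

```python
import asyncio
import random

def backoff_delay(attempt, base_delay=1.0, max_delay=30.0):
    """Capped exponential backoff with "full jitter".

    The ceiling doubles each attempt (base, 2*base, 4*base, ...) but never
    exceeds max_delay; the actual delay is drawn uniformly from [0, ceiling].
    """
    ceiling = min(max_delay, base_delay * (2 ** attempt))
    return random.uniform(0, ceiling)

async def retry(coro_func, *, max_attempts=5, base_delay=1.0, max_delay=30.0,
                retry_on=(ConnectionError, TimeoutError)):
    """Call coro_func() up to max_attempts times, retrying only retry_on errors."""
    for attempt in range(max_attempts):
        try:
            return await coro_func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error
            await asyncio.sleep(backoff_delay(attempt, base_delay, max_delay))
```

With the cap, even a long outage never produces waits longer than `max_delay`. The full jitter means two clients that failed at the same instant are unlikely to retry together.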
Pattern 2: Circuit Breaker for Cascading Failures
A circuit breaker trips after too many failures, failing fast and avoiding hammering a dead service:
```python
import asyncio
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal, accepting requests
    OPEN = "open"            # Failing, rejecting requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # Seconds to stay OPEN before allowing a trial call
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None

    async def call(self, coro_func, *args, **kwargs):
        """Call coro_func(*args, **kwargs) through the circuit breaker.

        Takes the function rather than a coroutine object, so nothing is
        created (and left un-awaited) when the call is rejected.
        """
        if self.state == CircuitState.OPEN:
            # time.monotonic() is immune to wall-clock adjustments
            if time.monotonic() - self.last_failure_time >= self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker is OPEN (service unavailable)")
        try:
            result = await coro_func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.monotonic()
            # A failed trial call in HALF_OPEN reopens the circuit immediately
            if (self.state == CircuitState.HALF_OPEN
                    or self.failures >= self.failure_threshold):
                self.state = CircuitState.OPEN
                print(f"Circuit breaker OPEN after {self.failures} failures")
            raise
        # Success: close the circuit and reset the failure count
        self.state = CircuitState.CLOSED
        self.failures = 0
        return result

async def unstable_service():
    """Simulates an unstable service."""
    await asyncio.sleep(0.1)
    raise ConnectionError("Service temporarily unavailable")

async def demo_circuit_breaker():
    cb = CircuitBreaker(failure_threshold=3, timeout=2.0)
    # Try to call until circuit opens
    for i in range(5):
        try:
            await cb.call(unstable_service)  # Pass the function, not unstable_service()
            print(f"Call {i+1}: success")
        except RuntimeError as e:
            print(f"Call {i+1}: {e} (state: {cb.state.value})")
        except ConnectionError:
            print(f"Call {i+1}: service error (state: {cb.state.value})")
        await asyncio.sleep(0.2)

asyncio.run(demo_circuit_breaker())
```
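Circuit breakers stop callers from waiting on a service that is known to be failing and give it time to recover. They also let the application degrade gracefully (for example, by serving cached data) instead of piling up timeouts.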
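Here is a compact, hypothetical variant that makes graceful degradation concrete. It raises a dedicated CircuitOpenError, times the reset with `time.monotonic()`, and reopens immediately if the half-open trial call fails. It also pairs with a `with_fallback()` helper that returns a stale or default value instead of an error. `Breaker`, `CircuitOpenError`, and `with_fallback` are illustrative names. One limitation: this sketch lets every call through while half-open, so under heavy concurrency you may want to admit only a single trial request.

```python
import asyncio
import time

class CircuitOpenError(Exception):
    """Raised instead of calling the service while the circuit is open."""

class Breaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    async def call(self, func, *args):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; not calling service")
            # Reset timeout elapsed: half-open, let this trial call through
        try:
            result = await func(*args)
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # Open (or reopen after a failed trial)
            raise
        self.failures = 0
        self.opened_at = None  # Success closes the circuit
        return result

async def with_fallback(breaker, func, fallback, *args):
    """Serve `fallback` when the call fails or the circuit is open."""
    try:
        return await breaker.call(func, *args)
    except Exception:  # Includes CircuitOpenError
        return fallback
```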
Pattern 3: Fanout-Fanin for Parallel Work Coordination
Fanout (scatter work to many tasks) and fanin (gather results) is the async equivalent of map-reduce:
```python
import asyncio

async def fetch_resource(resource_id):
    """Fetch a resource by ID."""
    await asyncio.sleep(0.2)  # Simulated I/O
    return f"resource-{resource_id}"

async def fanout_fanin(resource_ids):
    """Fetch all resources in parallel, then aggregate."""
    # Fanout: create tasks for all resources (TaskGroup requires Python 3.11+)
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(fetch_resource(rid))
            for rid in resource_ids
        ]
    # Fanin: exiting the block waited for every task, so results are ready
    results = [task.result() for task in tasks]
    # Aggregate
    return {rid: result for rid, result in zip(resource_ids, results)}

async def demo_fanout_fanin():
    resource_ids = range(1, 6)
    result = await fanout_fanin(resource_ids)
    print(f"Aggregated: {result}")

asyncio.run(demo_fanout_fanin())
```
This pattern is ideal for embarrassingly parallel problems: independent work items that can be processed concurrently.
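TaskGroup is all-or-nothing. If one task raises, the group cancels the remaining tasks and raises an ExceptionGroup. When partial results are acceptable, `asyncio.gather()` with `return_exceptions=True` returns every outcome in input order, exceptions included. In this sketch the simulated `fetch_resource` fails for ID 3 purely for illustration, and `fanout_fanin_partial` is a hypothetical name.

```python
import asyncio

async def fetch_resource(resource_id):
    """Simulated fetch; ID 3 fails so we can see partial results."""
    await asyncio.sleep(0.01)
    if resource_id == 3:
        raise LookupError(f"resource {resource_id} not found")
    return f"resource-{resource_id}"

async def fanout_fanin_partial(resource_ids):
    """Fetch everything concurrently, then split successes from failures."""
    ids = list(resource_ids)  # gather() returns outcomes in this order
    outcomes = await asyncio.gather(
        *(fetch_resource(rid) for rid in ids), return_exceptions=True
    )
    succeeded, failed = {}, {}
    for rid, outcome in zip(ids, outcomes):
        if isinstance(outcome, BaseException):
            failed[rid] = outcome
        else:
            succeeded[rid] = outcome
    return succeeded, failed

if __name__ == "__main__":
    succeeded, failed = asyncio.run(fanout_fanin_partial(range(1, 6)))
    print(f"Succeeded: {succeeded}")
    print(f"Failed: {failed}")
```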
Anti-Pattern 1: Fire-and-Forget Tasks
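Creating a task and neither awaiting it nor keeping a reference to it causes silent failures. Nothing observes its exception. Because the event loop holds only weak references to tasks, an unreferenced task can even be garbage collected before it finishes: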
```python
import asyncio

async def failing_task():
    await asyncio.sleep(0.1)
    raise ValueError("Task failed silently!")

async def bad_fire_and_forget():
    """Bad: nobody ever observes the task's outcome."""
    task = asyncio.create_task(failing_task())
    # Task is orphaned; its exception is never handled. Here asyncio.run()
    # cancels it at shutdown before it even fails. A task that does fail
    # unobserved at best logs "Task exception was never retrieved".
    return "done"

async def good_approach():
    """Good: track and await tasks."""
    task = asyncio.create_task(failing_task())
    try:
        await task
    except ValueError as e:
        print(f"Task failed: {e}")

print("Bad approach (exception lost):")
asyncio.run(bad_fire_and_forget())
print("\nGood approach (exception caught):")
asyncio.run(good_approach())
```
Always await tasks or use TaskGroups to ensure exceptions are propagated.
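Sometimes a task genuinely has to run in the background. For that case, the asyncio documentation recommends keeping a strong reference to the task in a collection and discarding it when it completes. The sketch below follows that approach and also logs any exception so the failure is not silent. `spawn_background` and `_on_task_done` are hypothetical helper names.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Strong references keep pending tasks alive; the loop only holds weak ones.
background_tasks = set()

def _on_task_done(task):
    background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()  # Retrieving it also silences "never retrieved"
    if exc is not None:
        logger.error("Background task %s failed", task.get_name(), exc_info=exc)

def spawn_background(coro):
    """Start coro as a tracked background task (call inside a running loop)."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(_on_task_done)
    return task
```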
Anti-Pattern 2: Forgetting to Await
Mistakenly calling an async function without await is a classic Python gotcha:
```python
import asyncio

async def get_data():
    await asyncio.sleep(0.1)
    return "data"

async def bad_forget_await():
    """Bad: forgot await, so this gets a coroutine object, not data."""
    result = get_data()  # Missing await!
    print(f"Result type: {type(result)}")  # <class 'coroutine'>
    print(f"Result: {result}")  # Prints the coroutine object's repr
    # When result is discarded, Python emits:
    # RuntimeWarning: coroutine 'get_data' was never awaited

async def good_await():
    """Good: properly awaits."""
    result = await get_data()
    print(f"Result: {result}")  # data

print("Bad (forgot await):")
asyncio.run(bad_forget_await())
print("\nGood (with await):")
asyncio.run(good_await())
```
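Use static analysis to catch many missing awaits. mypy reports a coroutine whose result is discarded (its `unused-coroutine` check), and Pyright has an equivalent `reportUnusedCoroutine` rule. Neither catches every case; a coroutine assigned to a variable, as above, can slip through. At runtime, Python emits `RuntimeWarning: coroutine '...' was never awaited` when the unawaited coroutine is garbage collected.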
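The runtime warning is easy to miss in logs. One way to turn it into a test failure is to record warnings while the code runs. The hypothetical `unawaited_coroutine_warnings()` helper below relies on CPython finalizing an orphaned coroutine as soon as its last reference disappears. Separately, asyncio debug mode (`asyncio.run(main(), debug=True)`) adds the traceback of where the coroutine was created to the warning.

```python
import asyncio
import gc
import warnings

async def get_data():
    await asyncio.sleep(0.01)
    return "data"

async def bad_forget_await():
    result = get_data()  # Missing await: result is a coroutine object
    return type(result).__name__

async def good_await():
    return await get_data()

def unawaited_coroutine_warnings(async_func):
    """Run async_func() and return any 'never awaited' RuntimeWarnings it caused."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # Don't let default filters drop repeats
        asyncio.run(async_func())
        gc.collect()  # Finalize any coroutine stuck in a reference cycle
    return [
        w for w in caught
        if issubclass(w.category, RuntimeWarning)
        and "was never awaited" in str(w.message)
    ]
```

In a test suite you would assert that the returned list is empty for each entry point you exercise.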
Anti-Pattern 3: Blocking the Event Loop
Long synchronous operations freeze the event loop:
```python
import asyncio
import time

def blocking_work():
    """Synchronous work that holds its thread for about 1s.

    time.sleep() stands in for any blocking call: a busy loop,
    requests.get(), a synchronous database driver, ...
    """
    time.sleep(1)

async def blocking_loop():
    """Bad: calls blocking code directly, freezing the event loop."""
    for _ in range(2):
        blocking_work()  # No other task can run during this call

async def concurrent_tasks():
    """These can't make progress while blocking_loop holds the loop."""
    for i in range(3):
        await asyncio.sleep(0.1)
        print(f"Task {i}")

async def bad_blocking():
    """Bad: blocking task prevents others from running."""
    start = time.perf_counter()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(blocking_loop())
        tg.create_task(concurrent_tasks())
    # ~2.3s: the other tasks only ran after the 2s of blocking finished
    print(f"Elapsed: {time.perf_counter() - start:.2f}s")

async def good_nonblocking():
    """Good: use an executor to offload blocking work."""
    loop = asyncio.get_running_loop()

    async def offloaded_work():
        for _ in range(2):
            # Runs in the default thread pool; the event loop stays free
            await loop.run_in_executor(None, blocking_work)

    start = time.perf_counter()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(offloaded_work())
        tg.create_task(concurrent_tasks())
    # ~2.0s: the other tasks ran while the blocking work was in a thread
    print(f"Elapsed: {time.perf_counter() - start:.2f}s")

print("Bad (blocking event loop):")
asyncio.run(bad_blocking())
print("\nGood (nonblocking):")
asyncio.run(good_nonblocking())
```
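Always offload blocking code with `loop.run_in_executor()` or `asyncio.to_thread()` (Python 3.9+). A thread pool suits blocking I/O. For CPU-bound work, pass a `concurrent.futures.ProcessPoolExecutor` to `run_in_executor()` instead, because threads share the GIL.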
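Blocking is easy to detect with a heartbeat task. If the loop is healthy, the heartbeat wakes on schedule; if something blocks the loop, the ticks stop. The sketch below counts ticks while a blocking call runs either inline or through `asyncio.to_thread()`. `blocking_call`, `count_ticks`, and `measure_ticks` are hypothetical names. Separately, asyncio debug mode logs any callback that runs longer than `loop.slow_callback_duration` (100 ms by default).

```python
import asyncio
import time

def blocking_call(seconds):
    """Synchronous stand-in for a blocking library call."""
    time.sleep(seconds)

async def count_ticks(stop, interval=0.01):
    """Heartbeat: counts how often the loop lets this task wake up."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks

async def measure_ticks(offload, seconds=0.2):
    """Run blocking_call inline or via a thread and count heartbeat ticks."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # Let the heartbeat start
    if offload:
        await asyncio.to_thread(blocking_call, seconds)  # Loop keeps running
    else:
        blocking_call(seconds)  # Loop is frozen for the whole call
    stop.set()
    return await ticker

if __name__ == "__main__":
    print("inline ticks:", asyncio.run(measure_ticks(offload=False)))
    print("to_thread ticks:", asyncio.run(measure_ticks(offload=True)))
```

Inline, the heartbeat manages at most one tick during the 0.2s call. Offloaded, it ticks roughly every 10 ms throughout.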
Anti-Pattern 4: Shared Mutable State Without Locks
Concurrent modification of shared state without synchronization causes race conditions:
```python
import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment_unsafe(self):
        """Race condition: no lock."""
        temp = self.value
        await asyncio.sleep(0)  # Yield; other tasks read the same stale value
        self.value = temp + 1

    async def increment_safe(self):
        """Safe: lock prevents concurrent modification."""
        async with self.lock:
            temp = self.value
            await asyncio.sleep(0)
            self.value = temp + 1

async def demo_race_condition():
    counter = Counter()
    # Unsafe: race condition
    async with asyncio.TaskGroup() as tg:
        for _ in range(10):
            tg.create_task(counter.increment_unsafe())
    # Prints 1: all ten tasks read 0 before any of them wrote
    print(f"Unsafe increments: {counter.value} (expected 10)")
    # Safe: lock protects
    counter.value = 0
    async with asyncio.TaskGroup() as tg:
        for _ in range(10):
            tg.create_task(counter.increment_safe())
    print(f"Safe increments: {counter.value} (correct: 10)")

asyncio.run(demo_race_condition())
```
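In asyncio, code between two `await`s runs without interruption, so a race can only occur when a read-modify-write spans an `await`, as in `increment_unsafe()`. Protect such sections with an `asyncio.Lock`. Note that `asyncio.Lock` is not thread-safe; state shared with other threads needs `threading.Lock`.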
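A common real-world case is a cache in front of a slow lookup. The check-then-fetch spans an `await`, so without a lock, concurrent callers that miss the cache at the same moment all fetch the same key. This sketch uses one lock per key and re-checks the cache after acquiring it. `AsyncCache` and the `fetch` callable are hypothetical. The per-key lock dictionary is never pruned here, so it grows with the number of distinct keys.

```python
import asyncio

class AsyncCache:
    """Caches results of an async fetch(key), fetching each key at most once."""

    def __init__(self, fetch):
        self._fetch = fetch
        self._values = {}
        self._locks = {}  # One lock per key, so different keys don't block each other

    async def get(self, key):
        if key in self._values:  # Fast path: no await, so no race
            return self._values[key]
        lock = self._locks.get(key)
        if lock is None:
            lock = self._locks[key] = asyncio.Lock()
        async with lock:
            # Re-check: another task may have filled the cache while we waited
            if key not in self._values:
                self._values[key] = await self._fetch(key)
        return self._values[key]
```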
Anti-Pattern 5: Creating Too Many Tasks
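Spawning one task per item for a huge workload (here, a million items) keeps every task and its coroutine in memory at once. It also adds scheduler overhead and can overwhelm downstream services: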
```python
import asyncio

async def light_work():
    await asyncio.sleep(0.01)

async def bad_task_explosion():
    """Bad: creates a million tasks at once."""
    async with asyncio.TaskGroup() as tg:
        for _ in range(1_000_000):  # Too many! All of them live in memory at once
            tg.create_task(light_work())

async def good_bounded_concurrency():
    """Good: at most 100 tasks exist at any moment."""
    semaphore = asyncio.Semaphore(100)

    async def bounded_work():
        try:
            await light_work()
        finally:
            semaphore.release()  # Free the slot for the next task

    async with asyncio.TaskGroup() as tg:
        for _ in range(1_000_000):
            # Acquire *before* creating the task. Acquiring inside the task
            # would limit concurrency but still allocate all 1M tasks up front.
            await semaphore.acquire()
            tg.create_task(bounded_work())

# Uncomment to see the difference:
# asyncio.run(bad_task_explosion())        # Holds ~1M tasks in memory at once
# asyncio.run(good_bounded_concurrency())  # Flat memory; takes ~100s (1M / 100 x 0.01s)
```
Bound concurrency with semaphores or queues to avoid resource exhaustion.
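With a queue instead of a semaphore, a bounded `asyncio.Queue` feeds a fixed pool of worker tasks. This caps both the number of tasks and the number of buffered items. Because `await queue.put()` waits while the queue is full, producers get natural backpressure. `process` and `run_with_workers` are hypothetical names, and results arrive in completion order, not input order.

```python
import asyncio

async def process(item):
    """Hypothetical unit of work."""
    await asyncio.sleep(0.001)
    return item * 2

async def run_with_workers(items, num_workers=10, max_queued=100):
    queue = asyncio.Queue(maxsize=max_queued)
    results, errors = [], []

    async def worker():
        while True:
            item = await queue.get()
            try:
                results.append(await process(item))
            except Exception as exc:  # Record the failure and keep the worker alive
                errors.append((item, exc))
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    try:
        for item in items:
            await queue.put(item)  # Waits while the queue is full (backpressure)
        await queue.join()         # Wait until every item has been processed
    finally:
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    return results, errors
```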
Key Takeaways
- Implement retry logic with exponential backoff and jitter to handle transient failures gracefully.
- Use circuit breakers to fail fast and prevent cascading failures when a service is unavailable.
- Apply fanout-fanin patterns for parallel work coordination and aggregation.
- Avoid fire-and-forget tasks; always await or use TaskGroups to catch exceptions.
- Don't forget `await` on async function calls; use linters and type checkers to catch this.
- Offload blocking code to `run_in_executor()` to prevent event loop starvation.
- Protect shared mutable state with locks to prevent race conditions.
- Bound task creation with semaphores to prevent resource exhaustion.
Frequently Asked Questions
How do I choose between retry backoff strategies?
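Exponential backoff with jitter is the safest default: the retry rate drops quickly, so a recovering service is not overloaded. Linear backoff (e.g., 1s, 2s, 3s) is simpler but backs off more slowly, so a struggling service keeps receiving retries at a higher rate. Use exponential backoff for external services; linear backoff can be acceptable for cheap internal retries. In either case, cap the maximum delay.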
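Listing the delay schedules makes the difference visible. These helper functions are illustrative only, and jitter is omitted so the numbers are exact:

```python
def exponential_delays(base, retries, cap=None):
    """Delays before each retry: base, 2*base, 4*base, ..., optionally capped."""
    delays = [base * 2 ** i for i in range(retries)]
    if cap is not None:
        delays = [min(d, cap) for d in delays]
    return delays

def linear_delays(base, retries):
    """Delays before each retry: base, 2*base, 3*base, ..."""
    return [base * (i + 1) for i in range(retries)]

print(exponential_delays(1.0, 5))            # [1.0, 2.0, 4.0, 8.0, 16.0]
print(linear_delays(1.0, 5))                 # [1.0, 2.0, 3.0, 4.0, 5.0]
print(exponential_delays(1.0, 8, cap=30.0))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0, 30.0]
```

Over five retries, the exponential schedule waits 31s in total versus 15s for the linear one. Its gaps keep widening, so a dependency that stays down sees progressively fewer retries, and the cap keeps the worst-case wait bounded.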
What's a realistic failure threshold for a circuit breaker?
Start with 3-5 failures before opening. Too low a threshold trips the circuit on brief blips; too high a threshold detects real outages slowly. Tune it, along with the reset timeout, based on your observed failure rate and how much service degradation you can accept.
Can I use asyncio patterns with other async frameworks (e.g., Trio)?
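Some patterns are universal (circuit breakers, retry logic), but the syntax differs. Trio's APIs differ from asyncio's (for example, nurseries instead of task groups), and Curio differs further still. Keep the core logic framework-agnostic where possible; libraries such as AnyIO let the same code run on either asyncio or Trio.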
How do I avoid creating too many tasks?
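Use semaphores (to limit concurrency), bounded queues with a fixed pool of workers (to buffer work and apply backpressure), or batch processing. For very large or continuous workloads, consider streams or a message broker (e.g., nats.io, RabbitMQ) instead of one task per item.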
Is it ever safe to not await a task?
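Only if it's a background monitoring task that can fail independently (rare). Even then, keep a reference to the task (the event loop holds only weak references), make sure its exceptions are logged, and cancel it on shutdown. For all application logic, always await or track your tasks.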
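For a long-lived monitor like this, a minimal lifecycle sketch might look like the following. It keeps the task handle, lets the monitor log a failed check instead of dying, and cancels and awaits it on shutdown. `heartbeat()` is a hypothetical placeholder for a real health probe, and the `counter` dict exists only so the example can report how many checks ran.

```python
import asyncio
import contextlib
import logging

logger = logging.getLogger(__name__)

async def heartbeat():
    """Hypothetical health check; replace with a real probe."""
    await asyncio.sleep(0)

async def monitor(interval, counter):
    """Runs until cancelled; a failed check is logged, not fatal."""
    while True:
        try:
            await heartbeat()
            counter["checks"] += 1
        except Exception:  # CancelledError is not an Exception, so it still stops us
            logger.exception("Health check failed")
        await asyncio.sleep(interval)

async def main():
    counter = {"checks": 0}
    monitor_task = asyncio.create_task(monitor(0.01, counter))  # Keep the reference
    try:
        await asyncio.sleep(0.1)  # Stand-in for the application's real work
    finally:
        monitor_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await monitor_task  # Wait for the cancellation to complete
    return counter["checks"], monitor_task.cancelled()
```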