Debugging Async Deadlocks: Tools and Strategies
Deadlocks occur when tasks wait for each other in a circular dependency, preventing forward progress. A task holding lock A waits for lock B while another task holds lock B and waits for lock A—both hang forever. Debugging deadlocks is notoriously difficult because they're timing-sensitive and hard to reproduce. This article covers detection strategies, diagnostic tools, and fixes.
Detecting Deadlocks: Timeouts and Watchdog Tasks
The simplest deadlock detection is a watchdog timer: if an operation doesn't complete within its expected time, it may be deadlocked. A timeout cannot tell a deadlock from a merely slow operation, so treat it as a signal to investigate.
import asyncio

async def potentially_deadlocked():
    """A task that might deadlock."""
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()

    async def task1():
        async with lock1:
            await asyncio.sleep(0.1)
            # Try to acquire lock2 (might be held by task2)
            async with lock2:
                return "task1 done"

    async def task2():
        async with lock2:
            await asyncio.sleep(0.1)
            # Try to acquire lock1 (held by task1)
            async with lock1:
                return "task2 done"

    # Both tasks try to acquire locks in opposite order: deadlock!
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task1())
        tg.create_task(task2())

async def deadlock_watchdog(timeout=5.0):
    """Monitor and detect deadlock by timeout."""
    try:
        async with asyncio.timeout(timeout):
            await potentially_deadlocked()
    except TimeoutError:
        print(f"DEADLOCK DETECTED: Operation timed out after {timeout}s")

asyncio.run(deadlock_watchdog())
Output:
DEADLOCK DETECTED: Operation timed out after 5.0s
Timeouts are the most practical deadlock detection for production: set a reasonable upper bound and alert if exceeded.
Inspecting Event Loop State
To diagnose a hanging task, inspect the event loop's running tasks:
import asyncio

async def hanging_task():
    """Task that will hang."""
    lock = asyncio.Lock()
    async with lock:
        # asyncio.Lock is not reentrant: acquiring it again in the same task deadlocks
        async with lock:
            return "never reached"

async def debug_hanging_task():
    """Start a task and inspect its state."""
    task = asyncio.create_task(hanging_task(), name="hanging_task")
    await asyncio.sleep(1)  # Let it hang

    # Inspect every unfinished task except the one doing the inspecting
    for t in asyncio.all_tasks():
        if t is asyncio.current_task():
            continue
        print(f"Task: {t.get_name()}")
        print(f"  Done: {t.done()}")
        print(f"  Cancelled: {t.cancelled()}")
        print("  Stack (top 5 frames):")
        for frame in t.get_stack()[-5:]:
            print(f"    {frame.f_code.co_filename}:{frame.f_lineno} in {frame.f_code.co_name}")
    task.cancel()

asyncio.run(debug_hanging_task())
Output:
Task: hanging_task
  Done: False
  Cancelled: False
  Stack (top 5 frames):
    /path/to/script.py:8 in hanging_task
For a suspended coroutine, get_stack() returns a single frame: the line in the task's own coroutine where it is waiting. Here that line is the inner async with lock: statement, which narrows the problem to lock acquisition.
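The same inspection can be wrapped in a helper that is called whenever a timeout fires, so every hang leaves a record of where each task was waiting. This is a minimal sketch; `describe_tasks` and the task name `self-deadlock` are hypothetical names chosen for the example.

```python
import asyncio


def describe_tasks():
    """Return (task name, function, line) for each unfinished task except the caller."""
    current = asyncio.current_task()
    report = []
    for task in asyncio.all_tasks():
        if task is current:
            continue
        frames = task.get_stack(limit=1)
        if frames:
            frame = frames[-1]
            report.append((task.get_name(), frame.f_code.co_name, frame.f_lineno))
        else:
            report.append((task.get_name(), None, None))
    return sorted(report, key=lambda row: row[0])


async def self_deadlock():
    lock = asyncio.Lock()
    async with lock:
        async with lock:  # asyncio.Lock is not reentrant, so this never succeeds
            pass


async def demo():
    task = asyncio.create_task(self_deadlock(), name="self-deadlock")
    await asyncio.sleep(0.1)  # Give the task time to get stuck
    report = describe_tasks()
    task.cancel()
    return report


if __name__ == "__main__":
    for name, function, line in asyncio.run(demo()):
        print(f"{name}: waiting in {function} at line {line}")
```

Returning plain tuples rather than printing makes the report easy to send to a logger or attach to an alert.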
Using asyncio Debug Mode
Enable asyncio debug mode to log callbacks that hold the event loop for too long and coroutines that were never awaited:
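import asyncio
import logging
import time

# Debug-mode warnings are emitted through the "asyncio" logger
logging.basicConfig(level=logging.DEBUG)

async def slow_callback():
    # A blocking call: unlike asyncio.sleep(), it holds the event loop for 200 ms
    time.sleep(0.2)

async def debug_mode_demo():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)  # Enable debug mode (or pass debug=True to asyncio.run)
    # Steps slower than 100ms are logged (0.1 is also the default)
    loop.slow_callback_duration = 0.1
    await asyncio.sleep(0)  # Yield once so the next step runs with timing enabled
    await slow_callback()

asyncio.run(debug_mode_demo())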
Debug mode logs warnings for callbacks exceeding slow_callback_duration, helping identify blocking code that starves other tasks.
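In a test suite it can be useful to capture these warnings rather than read them from the console. The sketch below attaches an in-memory handler to the `asyncio` logger; `ListHandler` and `collect_slow_callback_warnings` are hypothetical helpers, and the exact wording of the warning message is an implementation detail that may differ between Python versions.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect log records in memory (hypothetical helper for this demo)."""

    def __init__(self):
        super().__init__(level=logging.WARNING)
        self.records = []

    def emit(self, record):
        self.records.append(record)


async def blocking_step():
    await asyncio.sleep(0)  # Yield so the blocking part runs in its own loop step
    time.sleep(0.2)         # Synchronous sleep: the loop cannot run anything else


def collect_slow_callback_warnings():
    """Run blocking_step in debug mode and return the asyncio warnings it caused."""
    handler = ListHandler()
    asyncio_logger = logging.getLogger("asyncio")
    asyncio_logger.addHandler(handler)
    try:
        asyncio.run(blocking_step(), debug=True)
    finally:
        asyncio_logger.removeHandler(handler)
    return [record.getMessage() for record in handler.records]


if __name__ == "__main__":
    for message in collect_slow_callback_warnings():
        print(message)
```

A test can then assert that no such warnings were collected for code that is supposed to be non-blocking.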
Deadlock Scenario: Lock Ordering
The most common deadlock is inconsistent lock ordering. Fix it by always acquiring locks in a consistent order:
import asyncio

async def deadlock_example():
    """Locks acquired in different orders: deadlock."""
    lock_a = asyncio.Lock()
    lock_b = asyncio.Lock()

    async def task1():
        print("Task1: acquiring lock_a")
        async with lock_a:
            await asyncio.sleep(0.1)
            print("Task1: acquiring lock_b")
            async with lock_b:
                return "task1 done"

    async def task2():
        print("Task2: acquiring lock_b")
        async with lock_b:
            await asyncio.sleep(0.1)
            print("Task2: acquiring lock_a")
            async with lock_a:
                return "task2 done"

    # Deadlock likely (task1 waits for lock_b, task2 waits for lock_a)
    try:
        async with asyncio.timeout(2):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(task1())
                tg.create_task(task2())
    except TimeoutError:
        print("Deadlock detected!")

async def fixed_version():
    """Locks acquired in consistent order: no deadlock."""
    lock_a = asyncio.Lock()
    lock_b = asyncio.Lock()

    async def task1():
        print("Task1: acquiring locks in order a, b")
        async with lock_a:
            async with lock_b:
                await asyncio.sleep(0.1)
                return "task1 done"

    async def task2():
        print("Task2: acquiring locks in order a, b (same as task1)")
        async with lock_a:
            async with lock_b:
                await asyncio.sleep(0.1)
                return "task2 done"

    async with asyncio.TaskGroup() as tg:
        tg.create_task(task1())
        tg.create_task(task2())
    print("Fixed: no deadlock!")

print("=== Deadlock Example ===")
asyncio.run(deadlock_example())
print("\n=== Fixed Version ===")
asyncio.run(fixed_version())
Output:
=== Deadlock Example ===
Task1: acquiring lock_a
Task2: acquiring lock_b
Task1: acquiring lock_b
Task2: acquiring lock_a
Deadlock detected!

=== Fixed Version ===
Task1: acquiring locks in order a, b
Task2: acquiring locks in order a, b (same as task1)
Fixed: no deadlock!
Always document a global lock order and enforce it consistently across all tasks. Use comments or a dedicated module to define it.
Using Task Names for Diagnosis
Name your tasks during creation; the names appear in debuggers and error messages:
import asyncio

async def worker(name):
    lock = asyncio.Lock()
    async with lock:
        print(f"Worker {name} acquired lock")

async def named_tasks_demo():
    tasks = []
    for i in range(3):
        task = asyncio.create_task(
            worker(f"w{i}"),
            name=f"worker-{i}"  # Named task
        )
        tasks.append(task)
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(named_tasks_demo())
Named tasks are easier to track in logs and debugger output. Use descriptive names like "fetch-user-123" or "process-batch-5" to identify tasks quickly.
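Names pay off most when something fails. As an illustration, the sketch below pairs each exception returned by `gather` with the name of the task that raised it; the `fetch` coroutine and its failure for user 2 are made up for the example.

```python
import asyncio


async def fetch(user_id):
    await asyncio.sleep(0.01)
    if user_id == 2:
        raise ValueError(f"user {user_id} not found")
    return user_id


async def run_named():
    tasks = [
        asyncio.create_task(fetch(i), name=f"fetch-user-{i}") for i in range(3)
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Pair each failure with the name of the task that raised it
    return {
        task.get_name(): repr(result)
        for task, result in zip(tasks, results)
        if isinstance(result, Exception)
    }


if __name__ == "__main__":
    print(asyncio.run(run_named()))
```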
Avoiding Deadlocks: Use Timeouts Everywhere
Wrap lock acquisitions with timeouts to prevent indefinite waits:
import asyncio

async def timeout_protected_lock():
    """Acquire lock with timeout."""
    lock = asyncio.Lock()

    async def work():
        try:
            # Give up after 2 seconds. The timeout covers both the wait for
            # the lock and the work done while holding it.
            async with asyncio.timeout(2):
                async with lock:
                    print("Acquired lock")
        except TimeoutError:
            print("Timed out acquiring or holding the lock")

    await work()

asyncio.run(timeout_protected_lock())
Timeouts on lock acquisition prevent silent hangs; the task either acquires the lock or fails explicitly, making the problem visible. Because asyncio.timeout() wraps the whole async with lock: block here, it also bounds the time spent inside the critical section, so size the timeout for both.
Profiling Event Loop for Bottlenecks
Use py-spy or similar to profile event loop performance and identify which tasks consume time:
import asyncio

async def cpu_intensive():
    """Simulate CPU work (blocks event loop)."""
    total = 0
    for i in range(10_000_000):
        total += i
    return total

async def async_work():
    """Non-blocking work."""
    await asyncio.sleep(0.1)
    return "done"

async def profile_demo():
    """Profile mixed workload."""
    # Run cpu_intensive and async_work concurrently
    result = await asyncio.gather(
        cpu_intensive(),
        async_work(),
        async_work()
    )
    print(f"Result: {result}")

# Profile with: py-spy record -o profile.svg -- python script.py
# Then examine profile.svg to see where time is spent
asyncio.run(profile_demo())
Profile your application with py-spy or similar to identify CPU hogs blocking the event loop. Long-running synchronous code is a common cause of apparent deadlocks.
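A profiler shows where the time goes; a complementary in-process check is to measure how long the loop goes without running a trivial heartbeat task. The sketch below compares a blocking call made directly on the loop with the same call moved to a worker thread via `asyncio.to_thread`. The helper names are hypothetical, and `time.sleep` stands in for a blocking call; pure-Python CPU work in a thread still competes for the GIL and may need a process pool instead.

```python
import asyncio
import time


def blocking_call():
    time.sleep(0.3)  # Stand-in for a synchronous call that holds its thread


async def max_heartbeat_gap(work):
    """Run work() while a heartbeat ticks; return the longest gap between ticks."""
    gaps = []

    async def heartbeat():
        last = time.perf_counter()
        while True:
            await asyncio.sleep(0.01)
            now = time.perf_counter()
            gaps.append(now - last)
            last = now

    beat = asyncio.create_task(heartbeat(), name="heartbeat")
    await asyncio.sleep(0.05)  # Let the heartbeat establish a baseline
    await work()
    await asyncio.sleep(0.05)  # Let the heartbeat record the gap caused by work()
    beat.cancel()
    return max(gaps)


async def inline():
    blocking_call()  # Runs on the event loop thread and stalls it


async def offloaded():
    await asyncio.to_thread(blocking_call)  # Runs in a worker thread instead


async def demo():
    return await max_heartbeat_gap(inline), await max_heartbeat_gap(offloaded)


if __name__ == "__main__":
    stalled, smooth = asyncio.run(demo())
    print(f"inline: {stalled:.3f}s, offloaded: {smooth:.3f}s")
```

A large gap in the inline case means every other task was frozen for that long, which from the outside looks the same as a deadlock.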
Key Takeaways
- Detect deadlocks with timeouts: wrap suspicious code in asyncio.timeout() and alert if exceeded.
- Use asyncio.all_tasks() and task.get_stack() to inspect task state and stack traces when debugging.
- Enable debug mode with loop.set_debug(True) to log slow callbacks and identify blocking operations.
- Always acquire locks in a consistent, well-documented global order to prevent circular waits.
- Name tasks during creation for easier diagnosis and tracking in logs.
- Wrap lock acquisitions with timeouts to fail fast if a lock is unreachable.
Frequently Asked Questions
What's the difference between a deadlock and a livelock?
A deadlock is frozen: tasks wait indefinitely and do nothing. A livelock is active: tasks keep running and reacting to each other but make no progress (e.g., two tasks that repeatedly time out, release their locks, and retry in lockstep). Livelocks are rarer in asyncio but still possible.
How do I detect if my code is deadlock-prone?
Use stress tests: run the code with high concurrency and many iterations under different machine loads. Deadlocks are timing-sensitive; they may not appear in light testing.
Can I prevent deadlocks with a lock-free data structure?
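Yes, and in asyncio you can often avoid the lock altogether. Tasks run on a single thread and switch only at await points, so shared state that is read and updated with no await in between cannot be interleaved with another task and needs no lock. For passing work between tasks, asyncio.Queue removes the need for explicit locking. Lock-free structures built on atomic operations (like compare-and-swap) address multi-threaded code; they are complex to get right, so prefer a well-tested library over a hand-rolled implementation.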
What if I have more than 2 locks?
The same lock-ordering rule applies: define a global total order for all locks and acquire them in that order everywhere. Document the order clearly.
How do I debug asyncio code with pdb?
Run python -m pdb script.py or call breakpoint() inside a coroutine. When the debugger pauses inside the running event loop, evaluate asyncio.all_tasks() to list unfinished tasks and inspect their state manually.