Error Handling in Async Database Code: Retry Logic and Deadlocks

Database errors are inevitable: networks fail, queries deadlock, connections drop. Distinguishing errors you can recover from (a temporary network hiccup) from those you cannot (malformed SQL) is the foundation of reliable systems. Async code requires specific patterns: retries must not block the event loop, connection leaks must be prevented with proper cleanup, and exponential backoff with jitter avoids a thundering herd when the database recovers. This guide covers production-grade error handling for asyncpg, transaction retries, and how to instrument your code to diagnose failures.

Master these patterns and your system will be resilient, not fragile.

Classifying Database Errors: Retryable vs. Fatal

Not all errors should trigger a retry. Some are permanent:

| Error Type | Example | Retryable? | Action |
| --- | --- | --- | --- |
| Connection | Network timeout | Yes | Retry with backoff |
| Pool exhausted | TooManyConnectionsError | Yes (short-term) | Retry after delay |
| Transaction conflict | Serialization failure | Yes | Retry transaction |
| Constraint violation | Unique key duplicate | No | Log and reject |
| Invalid query | Syntax error | No | Fix code, don't retry |
| Authorization | Insufficient privilege | No | Check credentials |
| Disk full | Out of space | No (requires manual intervention) | Alert and fail |

Here's how to classify in code:

import asyncio

import asyncpg

# SQLSTATE codes for transient transaction failures
RETRYABLE_SQLSTATES = {
    '40001',  # serialization_failure
    '40P01',  # deadlock_detected
    '40000',  # transaction_rollback
}

def is_retryable(exc: Exception) -> bool:
    """Return True if the error might succeed on retry."""
    if isinstance(exc, asyncpg.TooManyConnectionsError):
        return True

    # asyncpg exposes the five-character SQLSTATE as exc.sqlstate
    if isinstance(exc, asyncpg.PostgresError):
        if exc.sqlstate in RETRYABLE_SQLSTATES:
            return True

    # Network/timeout errors (retryable)
    if isinstance(exc, (asyncio.TimeoutError, ConnectionError, OSError)):
        return True

    return False

async def execute_with_retry(async_func, max_retries=3):
    """Execute a function with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            return await async_func()
        except Exception as exc:
            if not is_retryable(exc):
                raise  # Non-retryable, fail fast

            if attempt == max_retries - 1:
                raise  # Last attempt, give up

            # Exponential backoff: 100ms, then 200ms (doubling each attempt)
            wait_time = 0.1 * (2 ** attempt)
            print(f"Retryable error: {exc}. Retrying in {wait_time:.2f}s...")
            await asyncio.sleep(wait_time)

Usage:

async def fetch_user(user_id):
    """Fetch a user with automatic retry."""
    async def _fetch():
        async with pool.acquire() as conn:
            return await conn.fetchrow(
                'SELECT * FROM users WHERE id = $1',
                user_id
            )

    return await execute_with_retry(_fetch, max_retries=3)
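To see the classify-then-retry flow without a database, the sketch below replaces asyncpg with two hypothetical exception classes that carry a `sqlstate` attribute. `FakeTransientError`, `FakeFatalError`, `make_flaky`, and the simplified `is_retryable` are illustrative assumptions, not part of asyncpg.

```python
import asyncio


class FakeTransientError(Exception):
    """Hypothetical stand-in for a driver error with a retryable SQLSTATE."""
    sqlstate = "40001"  # serialization_failure


class FakeFatalError(Exception):
    """Hypothetical stand-in for a non-retryable error."""
    sqlstate = "42601"  # syntax_error


RETRYABLE_SQLSTATES = {"40001", "40P01"}


def is_retryable(exc: Exception) -> bool:
    # Simplified classifier: looks only at the SQLSTATE attribute.
    return getattr(exc, "sqlstate", None) in RETRYABLE_SQLSTATES


async def execute_with_retry(async_func, max_retries=3, base_delay=0.01):
    for attempt in range(max_retries):
        try:
            return await async_func()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))


def make_flaky(failures: int, exc_type):
    """Return (operation, call log); the operation fails `failures` times, then succeeds."""
    calls = []

    async def op():
        calls.append(1)
        if len(calls) <= failures:
            raise exc_type("simulated failure")
        return "ok"

    return op, calls
```

An operation that raises `FakeTransientError` twice succeeds on the third call, while one that raises `FakeFatalError` propagates immediately after a single call.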

Connection Leaks: Preventing Resource Exhaustion

A leaked connection is never returned to the pool, so it permanently reduces the pool's usable capacity. Always use context managers:

# DANGEROUS: Connection leaks if the query raises
async def bad_query():
    conn = await pool.acquire()
    try:
        result = await conn.fetchval('SELECT 1')
    except Exception:
        raise  # If exception, conn is never released!
    await pool.release(conn)  # Never executed if exception occurs above
    return result

# SAFE: Context manager ensures cleanup
async def good_query():
    async with pool.acquire() as conn:
        result = await conn.fetchval('SELECT 1')
    # Connection auto-released, even on exception
    return result
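When a context manager is not an option, `try`/`finally` gives the same guarantee. The sketch below uses a hypothetical `FakePool` that only counts checked-out connections, so the leak is observable without a database. `FakePool`, `leaky`, `safe`, and `count_leaks` are illustrative names.

```python
import asyncio


class FakePool:
    """Hypothetical in-memory pool that only counts checked-out connections."""

    def __init__(self):
        self.checked_out = 0

    async def acquire(self):
        self.checked_out += 1
        return object()  # stands in for a connection

    async def release(self, conn):
        self.checked_out -= 1


async def leaky(pool):
    conn = await pool.acquire()
    raise RuntimeError("query failed")  # the release below is never reached
    await pool.release(conn)


async def safe(pool):
    conn = await pool.acquire()
    try:
        raise RuntimeError("query failed")
    finally:
        await pool.release(conn)  # runs on success and on failure


async def count_leaks(func):
    """Run func against a fresh pool and report connections still checked out."""
    pool = FakePool()
    try:
        await func(pool)
    except RuntimeError:
        pass
    return pool.checked_out
```

Running `count_leaks(leaky)` leaves one connection checked out, while `count_leaks(safe)` leaves none.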

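In a large codebase, use linters and code review to catch acquisitions that bypass a context manager. Pylint's R1732 (consider-using-with) flags resource-allocating calls it recognizes that are not wrapped in a with block; suppress it only where you deliberately use a different pattern, such as try/finally:

# pylint: disable=R1732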

Implementing Exponential Backoff

Naive retries can hammer the database. Exponential backoff increases wait time:

import asyncio
import random

async def execute_with_exponential_backoff(async_func, max_retries=5):
    """Retry with exponential backoff and jitter."""
    base_delay = 0.1  # 100 ms

    for attempt in range(max_retries):
        try:
            return await async_func()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_retries - 1:
                raise

            # Exponential backoff: 100ms, 200ms, 400ms, 800ms
            # (max_retries=5 means at most four waits)
            delay = base_delay * (2 ** attempt)

            # Add jitter to prevent thundering herd
            # (all clients retrying at exactly the same time)
            jitter = random.uniform(0, delay * 0.1)
            total_delay = delay + jitter

            print(f"Attempt {attempt + 1} failed. Retrying in {total_delay:.2f}s...")
            await asyncio.sleep(total_delay)

The jitter prevents all clients from retrying simultaneously when a database recovers.
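The delay calculation can be isolated into a pure function, which makes it easy to test and to cap. The sketch below is one possible shape: the `max_delay` cap and the injectable `rng` parameter are assumptions added for illustration and do not appear in the code above.

```python
import random


def backoff_delay(attempt, base_delay=0.1, max_delay=5.0,
                  jitter_fraction=0.1, rng=random):
    """Return the wait before the next retry for a zero-based attempt index."""
    # Exponential growth, capped so late attempts do not wait unboundedly.
    delay = min(base_delay * (2 ** attempt), max_delay)
    # Up to 10% extra, so clients that failed together do not retry together.
    return delay + rng.uniform(0, delay * jitter_fraction)
```

Passing a separately seeded `random.Random` per client, as in `backoff_delay(3, rng=random.Random(1))`, gives reproducible but distinct delays, which is convenient in tests.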

Deadlock Detection and Handling

Deadlocks happen when two transactions lock resources in opposite order. PostgreSQL detects this and aborts one transaction. Handle it gracefully:

async def transfer_funds_with_deadlock_handling(from_id, to_id, amount):
    """Transfer funds, handling potential deadlock."""

    async def _transfer():
        async with pool.acquire() as conn:
            async with conn.transaction():
                # Always acquire locks in the same order to prevent deadlock
                # (in this case, update by ID ascending)
                accounts = sorted([from_id, to_id])

                for acc_id in accounts:
                    await conn.execute(
                        'SELECT * FROM accounts WHERE id = $1 FOR UPDATE',
                        acc_id
                    )

                # Now we hold locks, transfer safely
                from_acc = await conn.fetchrow(
                    'SELECT * FROM accounts WHERE id = $1',
                    from_id
                )

                if from_acc['balance'] < amount:
                    raise ValueError('Insufficient funds')

                await conn.execute(
                    'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
                    amount, from_id
                )
                await conn.execute(
                    'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
                    amount, to_id
                )

    return await execute_with_exponential_backoff(_transfer, max_retries=3)

Key insight: always acquire locks in a consistent order (e.g., ascending by ID). This prevents circular wait conditions.
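The ordering rule can be demonstrated without a database by using `asyncio.Lock` objects as stand-ins for row locks. In this sketch, two transfers run concurrently in opposite directions; because both sort the account IDs before locking, neither can hold one lock while waiting for the other in reverse order. The `transfer` and `demo` functions and the in-memory `balances` dict are illustrative assumptions.

```python
import asyncio


async def transfer(locks, balances, from_id, to_id, amount):
    # Assumes from_id != to_id; locking the same account twice would block.
    first, second = sorted((from_id, to_id))  # consistent global lock order
    async with locks[first]:
        await asyncio.sleep(0)  # yield so the other transfer can interleave
        async with locks[second]:
            if balances[from_id] < amount:
                raise ValueError("Insufficient funds")
            balances[from_id] -= amount
            balances[to_id] += amount


async def demo():
    balances = {1: 100, 2: 100}
    locks = {acc_id: asyncio.Lock() for acc_id in balances}
    # Opposite directions: without sorting, each task could grab its own
    # "from" lock first and then wait forever on the other one.
    await asyncio.wait_for(
        asyncio.gather(
            transfer(locks, balances, 1, 2, 30),
            transfer(locks, balances, 2, 1, 10),
        ),
        timeout=1.0,
    )
    return balances
```

The `wait_for` timeout acts as a safety net: if the ordering were wrong and the tasks deadlocked, the demo would fail with a timeout instead of hanging.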

Handling Serialization Failures in SERIALIZABLE Isolation

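With SERIALIZABLE isolation, concurrent transactions may conflict. PostgreSQL aborts one of them with a serialization failure (SQLSTATE 40001), and the application must retry the whole transaction, so retries are mandatory: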

import asyncio

from sqlalchemy.exc import DBAPIError

async def read_modify_write_serializable(user_id):
    """Read-modify-write under SERIALIZABLE isolation."""

    async def _operation():
        async with async_session_factory() as session:
            async with session.begin():
                # Set the isolation level before any other statement runs
                await session.connection(
                    execution_options={'isolation_level': 'SERIALIZABLE'}
                )

                # Read
                user = await session.get(User, user_id)

                # Modify
                user.score += 10

                # Write: session.begin() commits when the block exits

    # SERIALIZABLE frequently conflicts; retry aggressively
    for attempt in range(5):
        try:
            return await _operation()
        except DBAPIError as exc:
            # 40001 = serialization_failure (assumes the driver exposes
            # the SQLSTATE on the wrapped exception, as asyncpg does)
            if getattr(exc.orig, 'sqlstate', None) != '40001':
                raise
            if attempt == 4:
                raise
            await asyncio.sleep(0.05 * (2 ** attempt))
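A retry under SERIALIZABLE must re-run the read as well as the write, because the value read by the aborted attempt is stale. The toy model below shows this with an in-memory counter that rejects a write if anyone else wrote since the read. `VersionedCounter` and `ConflictError` are hypothetical stand-ins for the database and its serialization failure.

```python
import asyncio


class ConflictError(Exception):
    """Hypothetical stand-in for a serialization failure."""
    sqlstate = "40001"


class VersionedCounter:
    """Toy store: a write succeeds only if nobody wrote since our read."""

    def __init__(self):
        self.value = 0
        self.version = 0

    def read(self):
        return self.value, self.version

    def write(self, new_value, read_version):
        if read_version != self.version:
            raise ConflictError("concurrent update")
        self.value = new_value
        self.version += 1


async def add_ten(store, max_attempts=5):
    for attempt in range(max_attempts):
        value, version = store.read()  # re-read on every attempt
        await asyncio.sleep(0)         # let concurrent writers interleave
        try:
            store.write(value + 10, version)
            return
        except ConflictError:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(0.001 * (2 ** attempt))


async def demo():
    store = VersionedCounter()
    await asyncio.gather(add_ten(store), add_ten(store), add_ten(store))
    return store.value
```

If the loop retried only the write with the originally read value, two of the three increments would be lost; re-reading inside the loop is what keeps all three.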

Connection Timeout Handling

Set timeouts to prevent indefinite hangs:

import asyncio

async def query_with_timeout(pool, sql, *args, timeout=5.0):
    """Execute a query with a timeout."""
    try:
        async with asyncio.timeout(timeout):  # Python 3.11+
            async with pool.acquire() as conn:
                return await conn.fetch(sql, *args)
    except asyncio.TimeoutError as exc:
        raise TimeoutError(
            f"Query exceeded {timeout}s timeout. Consider optimizing or increasing timeout."
        ) from exc

# For Python < 3.11
async def query_with_timeout_compat(pool, sql, *args, timeout=5.0):
    """Execute a query with timeout (Python 3.10 compatible)."""
    try:
        return await asyncio.wait_for(
            query_inner(pool, sql, *args),
            timeout=timeout
        )
    except asyncio.TimeoutError as exc:
        raise TimeoutError(f"Query timeout after {timeout}s") from exc

async def query_inner(pool, sql, *args):
    async with pool.acquire() as conn:
        return await conn.fetch(sql, *args)
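A timeout works by cancelling the awaiting task, so the cleanup rules from the connection-leak section still apply: the `async with` block must run its exit handler during cancellation. The sketch below checks this with a hypothetical `FakeConnection` whose `fetch` simply sleeps. All names here are illustrative.

```python
import asyncio


class FakeConnection:
    """Hypothetical connection that records whether it was released."""

    def __init__(self):
        self.released = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.released = True  # runs even when the task is cancelled
        return False

    async def fetch(self, seconds):
        await asyncio.sleep(seconds)  # simulates a slow query
        return ["row"]


async def query_with_timeout(conn_cm, seconds, timeout):
    async def inner():
        async with conn_cm as conn:
            return await conn.fetch(seconds)

    return await asyncio.wait_for(inner(), timeout=timeout)


async def demo():
    slow = FakeConnection()
    try:
        await query_with_timeout(slow, seconds=1.0, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return timed_out, slow.released
```

The demo reports both that the timeout fired and that the connection was released, which is the combination a timeout wrapper needs to guarantee.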

Structured Logging for Debugging

Log errors with context to aid debugging:

import logging
import traceback

logger = logging.getLogger(__name__)

async def execute_with_logging(async_func, context=None):
    """Execute with comprehensive logging."""
    context = context or {}

    try:
        return await async_func()
    except Exception as exc:
        logger.error(
            "Database operation failed",
            extra={
                'error_type': type(exc).__name__,
                'error_message': str(exc),
                'context': context,
                'traceback': traceback.format_exc()
            },
            exc_info=True
        )
        raise

# Usage
async def get_user(user_id):
    return await execute_with_logging(
        # pool.fetchrow acquires and releases a connection internally
        lambda: pool.fetchrow('SELECT * FROM users WHERE id = $1', user_id),
        context={'operation': 'get_user', 'user_id': user_id}
    )
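Fields passed through `extra` are attached to the log record as attributes, but the default formatter does not print them. A small formatter that serializes them makes the context searchable. The sketch below is one minimal way to do that with the standard library; `JsonFormatter`, `build_logger`, and the logger name `db.example` are hypothetical.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per record, including selected extra fields."""

    FIELDS = ("error_type", "error_message", "context")

    def format(self, record):
        payload = {"level": record.levelname, "message": record.getMessage()}
        for field in self.FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        # default=str keeps non-JSON values (e.g. UUIDs) from raising.
        return json.dumps(payload, default=str)


def build_logger(stream):
    """Return a logger that writes JSON lines to the given stream."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    log = logging.getLogger("db.example")  # hypothetical logger name
    log.handlers = [handler]
    log.setLevel(logging.INFO)
    log.propagate = False
    return log
```

With this in place, a call such as `log.error("Database operation failed", extra={...})` produces a single JSON line that a log aggregator can filter by `error_type` or by keys inside `context`.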

Circuit Breaker Pattern

For degraded databases, use a circuit breaker to fail fast:

import enum
from datetime import datetime, timedelta

class CircuitState(enum.Enum):
    CLOSED = 'closed'        # Normal operation
    OPEN = 'open'            # Failing, reject new requests
    HALF_OPEN = 'half_open'  # Testing if recovered

class CircuitBreaker:
    """Simple circuit breaker for database operations."""

    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    async def call(self, async_func):
        """Execute function through circuit breaker."""
        if self.state == CircuitState.OPEN:
            # Check if it's time to try recovery
            if (datetime.now() - self.last_failure_time) > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker is OPEN; database is unavailable")

        try:
            result = await async_func()

            # Success: reset failure count
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                logger.info("Circuit breaker recovered")

            self.failure_count = 0
            return result

        except Exception as exc:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                logger.error(
                    f"Circuit breaker OPEN after {self.failure_count} failures"
                )

            raise

# Usage
breaker = CircuitBreaker()

async def safe_query(user_id):
    return await breaker.call(
        lambda: pool.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
    )
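Testing a circuit breaker is awkward when it reads the wall clock directly. The variant below is a compact sketch of the same closed, open, and half-open cycle with an injectable clock, defaulting to `time.monotonic` so that system clock adjustments do not affect the recovery window. `MonotonicBreaker`, `CircuitOpenError`, and the `clock` parameter are assumptions for illustration, not a replacement for the class above.

```python
import asyncio
import time


class CircuitOpenError(RuntimeError):
    """Raised when the breaker rejects a call without running it."""


class MonotonicBreaker:
    def __init__(self, failure_threshold=2, recovery_timeout=30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    async def call(self, async_func):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit open; call rejected")
            # Recovery window elapsed: allow a trial call (half-open).
        try:
            result = await async_func()
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.opened_at = self.clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and clears the failure history.
        self.failure_count = 0
        self.opened_at = None
        return result
```

In a test, passing `clock=lambda: now[0]` with a mutable `now = [0.0]` lets you advance time past `recovery_timeout` instantly and assert that the next call is allowed through.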

Key Takeaways

  • Classify errors: Distinguish retryable (network timeout) from fatal (constraint violation).
  • Always use context managers: Prevent connection leaks with async with.
  • Exponential backoff: Double the delay after each failed attempt, and add jitter so clients do not retry in lockstep.
  • Prevent deadlocks: Acquire locks in a consistent order.
  • Handle SERIALIZABLE conflicts: Retries are mandatory; re-run the whole transaction with short backoff.
  • Set timeouts: Prevent indefinite hangs.
  • Log comprehensively: Include context, error type, and traceback.
  • Use circuit breakers: Fail fast when the database is degraded.

Frequently Asked Questions

How many retries should I use?

Three to five attempts is a common default. If an operation still fails after about three retries, the cause is usually not transient and further retries rarely help. Use exponential backoff so the total wait time stays around 1–5 seconds.
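The total wait is a geometric sum, so it is easy to check a retry budget before deploying it. The helper below is a small sketch (the name `total_backoff` is hypothetical) that ignores jitter and assumes the delay doubles from `base_delay`, as in the retry functions in this guide.

```python
def total_backoff(base_delay: float, max_attempts: int) -> float:
    """Sum of the waits between attempts; there are max_attempts - 1 of them."""
    return sum(base_delay * (2 ** attempt) for attempt in range(max_attempts - 1))
```

With a 100 ms base delay, 3 attempts wait 0.3 s in total, 5 attempts wait 1.5 s, and 6 attempts wait 3.1 s, not counting the time each attempt itself takes.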

What's the difference between a timeout and a retry?

A timeout aborts waiting after N seconds. A retry repeats the entire operation. Use both: a timeout per-operation (5 seconds), and retries across multiple attempts (3 retries with backoff).
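Combining the two might look like the sketch below: each attempt gets its own timeout through `asyncio.wait_for`, and a timed-out attempt counts as a retryable failure. The function name `retry_with_timeout` and its default values are assumptions chosen for illustration.

```python
import asyncio


async def retry_with_timeout(async_func, attempts=3,
                             per_attempt_timeout=5.0, base_delay=0.1):
    """Run async_func up to `attempts` times, bounding each attempt's duration."""
    for attempt in range(attempts):
        try:
            # The timeout applies to one attempt, not to the whole loop.
            return await asyncio.wait_for(async_func(), timeout=per_attempt_timeout)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == attempts - 1:
                raise  # out of attempts, surface the last error
            await asyncio.sleep(base_delay * (2 ** attempt))
```

The worst-case latency is roughly `attempts * per_attempt_timeout` plus the backoff waits, so with the defaults a caller may wait a little over 15 seconds before seeing an error.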

Should I retry all asyncpg exceptions?

No. Retry only transient errors (TooManyConnectionsError, deadlock, serialization failure, network errors). Don't retry constraint violations, syntax errors, or authorization failures.

Can I use circuit breakers with connection pools?

Yes. Put the circuit breaker around your pool operations. When the breaker opens, all requests fail fast without trying the pool, saving resources.

Further Reading