Monitoring Async Database Performance: Logging and Metrics
You can't improve what you don't measure. Production database issues—slow queries, connection leaks, pool exhaustion—are invisible without monitoring. This article teaches you how to instrument asyncpg with structured logging, expose metrics to Prometheus, and set up alerts. By the end, you'll have a complete observability stack: query latency histograms, pool usage dashboards, and automated notifications when thresholds are breached.
Monitoring is the difference between diagnosing production issues in minutes versus hours.
Structured Logging: Context and Timing
Log every database operation with timing and context:
import logging
import time
from functools import wraps
from typing import Any
logger = logging.getLogger(__name__)
def log_query(func):
"""Decorator to log asyncpg queries with timing."""
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = await func(*args, **kwargs)
elapsed = (time.perf_counter() - start) * 1000 # ms
logger.info(
"query_executed",
extra={
'query': getattr(func, '__name__', 'unknown'),
'elapsed_ms': elapsed,
'args': str(args)[:100], # Log first 100 chars
'success': True
}
)
return result
except Exception as exc:
elapsed = (time.perf_counter() - start) * 1000
logger.error(
"query_failed",
extra={
'query': getattr(func, '__name__', 'unknown'),
'elapsed_ms': elapsed,
'error_type': type(exc).__name__,
'error_message': str(exc)[:200]
},
exc_info=True
)
raise
return wrapper
@log_query
async def get_user(user_id: int):
"""Fetch a user."""
async with pool.acquire() as conn:
return await conn.fetchrow(
'SELECT * FROM users WHERE id = $1',
user_id
)
Log output (with JSON formatter):
{
"timestamp": "2026-06-02T10:30:45.123Z",
"level": "INFO",
"message": "query_executed",
"query": "get_user",
"elapsed_ms": 3.45,
"args": "(1,)",
"success": true
}
Identifying Slow Queries
Track query latencies and flag slow ones:
import logging
logger = logging.getLogger(__name__)
SLOW_QUERY_THRESHOLD_MS = 100 # Alert if query exceeds 100ms
async def query_with_slowness_tracking(pool, sql: str, *args):
"""Execute a query and log if it's slow."""
start = time.perf_counter()
async with pool.acquire() as conn:
result = await conn.fetch(sql, *args)
elapsed_ms = (time.perf_counter() - start) * 1000
if elapsed_ms > SLOW_QUERY_THRESHOLD_MS:
logger.warning(
f"slow_query_detected",
extra={
'sql': sql[:200],
'elapsed_ms': elapsed_ms,
'threshold_ms': SLOW_QUERY_THRESHOLD_MS,
'rows_returned': len(result)
}
)
return result
# Usage
result = await query_with_slowness_tracking(
pool,
'SELECT * FROM users WHERE created_at > $1',
datetime.utcnow() - timedelta(days=30)
)
Use slow query logs to identify performance bottlenecks. For example, if you see many slow queries on a table, consider adding an index.
Prometheus Metrics
Export metrics to Prometheus for dashboarding and alerting:
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
# Create a registry
registry = CollectorRegistry()
# Counter: total queries executed
queries_total = Counter(
'asyncpg_queries_total',
'Total queries executed',
['status'], # 'success' or 'failure'
registry=registry
)
# Histogram: query latency distribution
query_latency = Histogram(
'asyncpg_query_latency_ms',
'Query latency in milliseconds',
buckets=[1, 5, 10, 50, 100, 500, 1000],
registry=registry
)
# Gauge: current pool usage
pool_connections_in_use = Gauge(
'asyncpg_pool_connections_in_use',
'Connections currently in use',
registry=registry
)
pool_connections_idle = Gauge(
'asyncpg_pool_connections_idle',
'Idle connections available',
registry=registry
)
# Counter: connection acquisition waits (pool exhaustion)
pool_exhausted = Counter(
'asyncpg_pool_exhausted_total',
'Times the pool was exhausted',
registry=registry
)
async def query_with_metrics(pool, sql: str, *args):
"""Execute a query and record Prometheus metrics."""
start = time.perf_counter()
try:
# Update pool gauge
pool_connections_in_use.set(pool.get_size() - pool.get_idle_size())
pool_connections_idle.set(pool.get_idle_size())
async with pool.acquire() as conn:
result = await conn.fetch(sql, *args)
# Record latency
elapsed_ms = (time.perf_counter() - start) * 1000
query_latency.observe(elapsed_ms)
queries_total.labels(status='success').inc()
return result
except asyncpg.TooManyConnectionsError:
pool_exhausted.inc()
queries_total.labels(status='failure').inc()
raise
except Exception:
queries_total.labels(status='failure').inc()
raise
Expose metrics via an HTTP endpoint (FastAPI):
from fastapi import FastAPI
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
app = FastAPI()
@app.get('/metrics')
async def metrics():
"""Expose Prometheus metrics."""
return Response(
generate_latest(registry),
media_type=CONTENT_TYPE_LATEST
)
Prometheus scrapes /metrics periodically and stores the data. Use Grafana to visualize.
Connection Leak Detection
Monitor total connections over time. A steady increase indicates leaks:
import asyncio
async def monitor_pool_health(pool, interval_seconds=30):
"""Continuously monitor pool for leaks."""
prev_size = pool.get_size()
while True:
await asyncio.sleep(interval_seconds)
current_size = pool.get_size()
if current_size > prev_size:
logger.warning(
"pool_growth_detected",
extra={
'prev_size': prev_size,
'current_size': current_size,
'idle': pool.get_idle_size(),
'max_size': pool._maxsize
}
)
prev_size = current_size
# Start monitoring in background
asyncio.create_task(monitor_pool_health(pool))
If pool size grows beyond min_size and stays high, investigate for connection leaks (missing .close() or async with statements).
Query Sampling for Production Debugging
In production, log only a sample of queries to reduce noise:
import random
SAMPLE_RATE = 0.01 # Log 1% of queries
async def query_with_sampling(pool, sql: str, *args):
"""Execute a query, logging only a sample."""
should_log = random.random() < SAMPLE_RATE
if should_log:
logger.debug(f"executing_query: {sql[:100]}")
start = time.perf_counter()
async with pool.acquire() as conn:
result = await conn.fetch(sql, *args)
elapsed_ms = (time.perf_counter() - start) * 1000
if should_log:
logger.debug(f"query_completed: {elapsed_ms:.1f}ms")
return result
Adjust SAMPLE_RATE based on traffic. High sample rate = more detail but more logs. Low sample rate = less noise but less visibility.
Dashboard: Pool Usage Over Time
Create a Grafana dashboard with these panels:
- Pool Size (gauge): Current total connections.
- Idle Connections (gauge): Available connections.
- Query Latency (histogram): p50, p95, p99 latency.
- Queries Per Second (counter): Throughput.
- Pool Exhaustion (counter): Times the pool ran out of connections.
Example PromQL queries:
# Current pool utilization percentage
(asyncpg_pool_connections_in_use / (asyncpg_pool_connections_in_use + asyncpg_pool_connections_idle)) * 100
# Query latency p95
histogram_quantile(0.95, asyncpg_query_latency_ms)
# Queries per second
rate(asyncpg_queries_total[1m])
# Pool exhaustion rate
rate(asyncpg_pool_exhausted_total[5m])
Alerting: Critical Thresholds
Configure alerts in Prometheus:
# prometheus.yml
groups:
- name: database
rules:
- alert: HighQueryLatency
expr: histogram_quantile(0.95, asyncpg_query_latency_ms) > 500
for: 5m
annotations:
summary: "Query p95 latency exceeds 500ms"
- alert: PoolExhaustion
expr: rate(asyncpg_pool_exhausted_total[5m]) > 0
for: 1m
annotations:
summary: "Connection pool exhausted in the last 5 minutes"
- alert: HighErrorRate
expr: rate(asyncpg_queries_total{status="failure"}[5m]) > 0.01
for: 5m
annotations:
summary: "Database error rate exceeds 1%"
Send alerts to PagerDuty, Slack, or email via Alertmanager.
Slow Query Log Analysis
Periodically analyze slow query logs to find optimization opportunities:
import json
from collections import defaultdict
from datetime import datetime, timedelta
async def analyze_slow_queries(log_file: str, threshold_ms: int = 100):
"""Analyze slow queries from logs."""
slow_queries = defaultdict(list)
with open(log_file) as f:
for line in f:
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if entry.get('message') == 'slow_query_detected':
sql = entry.get('sql', 'unknown')
elapsed = entry.get('elapsed_ms', 0)
slow_queries[sql].append(elapsed)
# Top 5 slowest unique queries
top_slow = sorted(
slow_queries.items(),
key=lambda x: sum(x[1]) / len(x[1]), # average latency
reverse=True
)[:5]
for sql, latencies in top_slow:
avg_latency = sum(latencies) / len(latencies)
print(f"Query: {sql}")
print(f" Count: {len(latencies)}")
print(f" Avg latency: {avg_latency:.1f}ms")
print(f" Max latency: {max(latencies):.1f}ms")
Use this to identify queries that need optimization (e.g., missing indexes, N+1 problems).
Database Health Check Endpoint
Expose a health endpoint for load balancers:
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get('/health/database')
async def database_health():
"""Check if database is responsive."""
try:
async with asyncio.timeout(2.0): # 2 second timeout
async with pool.acquire() as conn:
await conn.fetchval('SELECT 1')
return {
'status': 'healthy',
'pool_size': pool.get_size(),
'idle': pool.get_idle_size()
}
except asyncio.TimeoutError:
raise HTTPException(status_code=503, detail='Database timeout')
except Exception as exc:
raise HTTPException(status_code=503, detail=str(exc))
Load balancers can route traffic away from unhealthy instances based on this endpoint.
Key Takeaways
- Structured logging: Log every query with timing, status, and context.
- Identify slow queries: Flag queries exceeding threshold (e.g., 100 ms), analyze logs to optimize.
- Prometheus metrics: Export query latency, pool usage, and error rates.
- Monitor for leaks: Track pool size over time; steady growth indicates connection leaks.
- Alerting: Set thresholds for p95 latency, pool exhaustion, error rate.
- Health checks: Expose database health endpoints for load balancers.
Frequently Asked Questions
Should I log every query or sample?
Sample in production (e.g., 1%) to reduce log volume. Always log errors and slow queries. Increase sample rate during debugging.
What's a good query latency threshold?
For OLTP (transactional) queries: threshold ~100 ms. For OLAP (analytical) queries: threshold ~5000 ms. Adjust based on your SLA.
How do I prevent metrics from becoming too high-cardinality?
Avoid including user IDs or dynamic values as labels. Use fixed labels: [status], [query_type], [table_name]. Too many unique label values can explode Prometheus memory.
Can I correlate slow query logs with Prometheus metrics?
Yes. Timestamp logs with UTC seconds precision, then join with Prometheus data using the same time window. Tools like Loki can help correlate logs and metrics.