Error Handling & Observability
A production error costs revenue, whether it is a down API, a failed payment, or a data leak. Without observability you are operating blind. Observability means you can answer new questions about system behavior from the telemetry you already emit, without shipping new code. The building blocks are structured logging (every event recorded as JSON), centralized error tracking (Sentry or similar), and distributed tracing (following a request through all services). This guide sets up production-grade observability in FastAPI using industry-standard tools and patterns.
Structured Logging vs. Printf Logging
Printf-style logging ("User created: user@example.com") is unstructured; searching, filtering, and alerting on it require ad hoc text parsing. Structured logging emits JSON:
{
  "timestamp": "2026-06-02T10:30:45Z",
  "level": "INFO",
  "logger": "app.users",
  "event": "user_created",
  "user_id": 42,
  "tenant_id": 1,
  "email": "user@example.com",
  "duration_ms": 145
}
Structured logs are queryable: "Find all errors in tenant 1 in the last hour" becomes a filter on the level, tenant_id, and timestamp fields instead of a regex over free text. This is standard practice in SaaS.
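To make the "queryable" claim concrete, here is a minimal sketch that filters newline-delimited JSON log lines in plain Python. The sample records and the find_events helper are illustrative assumptions, not part of any library; a real deployment would run the equivalent query inside its log store.

```python
import json
from datetime import datetime, timedelta, timezone

# Hypothetical sample of newline-delimited JSON logs (not real data).
LOG_LINES = """\
{"timestamp": "2026-06-02T10:30:45Z", "level": "INFO", "event": "user_created", "tenant_id": 1}
{"timestamp": "2026-06-02T10:41:10Z", "level": "ERROR", "event": "payment_failed", "tenant_id": 1}
{"timestamp": "2026-06-02T10:42:00Z", "level": "ERROR", "event": "payment_failed", "tenant_id": 2}
{"timestamp": "2026-06-02T08:00:00Z", "level": "ERROR", "event": "db_timeout", "tenant_id": 1}
"""


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp with a trailing 'Z' into an aware datetime."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def find_events(lines: str, *, level: str, tenant_id: int, since: datetime) -> list[dict]:
    """Return records matching level and tenant that occurred at or after `since`."""
    records = (json.loads(line) for line in lines.splitlines() if line.strip())
    return [
        r for r in records
        if r["level"] == level
        and r["tenant_id"] == tenant_id
        and parse_ts(r["timestamp"]) >= since
    ]


# "All errors in tenant 1 in the last hour", with a fixed 'now' for repeatability.
now = datetime(2026, 6, 2, 11, 0, tzinfo=timezone.utc)
recent_errors = find_events(
    LOG_LINES, level="ERROR", tenant_id=1, since=now - timedelta(hours=1)
)
```

The same three conditions are impractical to express reliably against free-text log lines, which is the main argument for emitting JSON in the first place.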
Setting Up Structured Logging with Python
Install logging libraries:
pip install python-json-logger python-dotenv
Create a centralized logging configuration:
# logging_config.py
import logging
import sys
from datetime import datetime, timezone

from pythonjsonlogger import jsonlogger


class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """
    Custom JSON formatter that includes standard fields
    (timestamp, level, logger name) plus custom context.
    """

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        # Add timestamp in ISO 8601 format (UTC), based on when the record was created
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_record["timestamp"] = created.isoformat().replace("+00:00", "Z")
        log_record["level"] = record.levelname
        log_record["logger"] = record.name


def setup_logging():
    """Configure structured JSON logging for all Python loggers."""
    # Console handler (for local development and container logs)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    formatter = CustomJsonFormatter()
    console_handler.setFormatter(formatter)

    # Root logger
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(console_handler)

    # Suppress noisy libraries
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("sqlalchemy").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)


# Initialize on app startup
setup_logging()
logger = logging.getLogger(__name__)
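The mechanism that turns keyword context into JSON keys is the standard library's extra argument: each key becomes an attribute on the LogRecord, and the formatter copies those attributes into the output. The sketch below shows that mechanism with a simplified, stdlib-only stand-in formatter. MiniJsonFormatter is a hypothetical name and is not how python-json-logger is implemented internally.

```python
import io
import json
import logging

# Attribute names present on every LogRecord; anything else came from `extra`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {
    "message",
    "asctime",
}


class MiniJsonFormatter(logging.Formatter):
    """Simplified stand-in for a JSON formatter (illustration only)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Copy caller-supplied `extra` fields into the JSON document.
        payload.update(
            {k: v for k, v in vars(record).items() if k not in _STANDARD_ATTRS}
        )
        return json.dumps(payload, default=str)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(MiniJsonFormatter())

demo_logger = logging.getLogger("app.users.demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo output out of the root handlers
demo_logger.addHandler(handler)

demo_logger.info("user_created", extra={"user_id": 42, "tenant_id": 1})
emitted = json.loads(stream.getvalue())
```

One consequence of this design is that extra keys must not collide with built-in record attributes such as message or name; the logging module raises KeyError when they do.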
Contextual Logging with Correlation IDs
Trace a request through multiple services using a correlation ID:
import contextvars
import logging
import time
import uuid
from typing import Optional

from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

correlation_id_context: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "correlation_id", default=None
)


class CorrelationIdMiddleware(BaseHTTPMiddleware):
    """
    Add a unique correlation ID to each request.
    This ID is passed through all logs, enabling end-to-end tracing.
    """

    async def dispatch(self, request: Request, call_next):
        # Get or generate correlation ID
        correlation_id = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
        # Store in context for all downstream code
        correlation_id_context.set(correlation_id)
        # Log the request
        logger = logging.getLogger(__name__)
        logger.info(
            "request_received",
            extra={
                "correlation_id": correlation_id,
                "method": request.method,
                "path": request.url.path,
                "client_ip": request.client.host if request.client else None,
            },
        )
        # Process request, timing it here rather than relying on a response header
        start = time.perf_counter()
        response = await call_next(request)
        duration_ms = int((time.perf_counter() - start) * 1000)
        # Log the response
        logger.info(
            "request_completed",
            extra={
                "correlation_id": correlation_id,
                "status_code": response.status_code,
                "duration_ms": duration_ms,
            },
        )
        # Return response with correlation ID header (for client debugging)
        response.headers["X-Correlation-ID"] = correlation_id
        return response


app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)
# Log with correlation ID automatically
from typing import Optional


class ContextualLogger:
    """Wrap a standard logger so every call includes the current correlation ID."""

    def __init__(self, logger: logging.Logger):
        self._logger = logger

    def _context(self, extra: Optional[dict]) -> dict:
        context = {"correlation_id": correlation_id_context.get()}
        if extra:
            context.update(extra)
        return context

    def info(self, msg: str, extra: Optional[dict] = None):
        self._logger.info(msg, extra=self._context(extra))

    def warning(self, msg: str, extra: Optional[dict] = None):
        self._logger.warning(msg, extra=self._context(extra))

    def error(self, msg: str, extra: Optional[dict] = None, exc_info=None):
        self._logger.error(msg, extra=self._context(extra), exc_info=exc_info)


def get_logger(name: str) -> ContextualLogger:
    """Create a logger that auto-includes correlation ID in all logs."""
    return ContextualLogger(logging.getLogger(name))
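A context variable is used here instead of a module-level global because concurrent requests must not see each other's IDs. The following stdlib-only sketch illustrates that isolation with asyncio tasks; request_id_var and handle are hypothetical stand-ins for the middleware's context variable and a request handler.

```python
import asyncio
import contextvars
from typing import Optional

# Stand-in for the correlation ID context variable used by the middleware.
request_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "request_id", default=None
)


async def handle(request_id: str, delay: float) -> tuple[str, Optional[str]]:
    """Simulate a request handler: set the ID, yield to other tasks, read it back."""
    request_id_var.set(request_id)
    await asyncio.sleep(delay)  # other "requests" run while this one waits
    return request_id, request_id_var.get()


async def main() -> list[tuple[str, Optional[str]]]:
    # gather() wraps each coroutine in a Task, and each Task runs in its own
    # copy of the context, so the three set() calls do not interfere.
    return await asyncio.gather(
        handle("req-a", 0.02),
        handle("req-b", 0.01),
        handle("req-c", 0.0),
    )


results = asyncio.run(main())
```

Each simulated request reads back the ID it set, even though the tasks interleave, and the value outside the tasks stays at its default.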
Application-Level Error Handling
Create custom exception classes and global error handlers:
from typing import Optional

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR


class SaaSException(Exception):
    """Base exception for SaaS-specific errors."""

    def __init__(self, message: str, code: str, status_code: int = 400,
                 details: Optional[dict] = None):
        super().__init__(message)  # so str(exc) and tracebacks show the message
        self.message = message
        self.code = code
        self.status_code = status_code
        self.details = details or {}


class TenantNotFoundError(SaaSException):
    def __init__(self, tenant_id: int):
        super().__init__(
            f"Tenant {tenant_id} not found",
            "TENANT_NOT_FOUND",
            404,
        )


class InsufficientPermissionsError(SaaSException):
    def __init__(self, resource: str, action: str):
        super().__init__(
            f"User lacks permission to {action} {resource}",
            "INSUFFICIENT_PERMISSIONS",
            403,
        )


class PaymentFailedError(SaaSException):
    def __init__(self, reason: str, transaction_id: str):
        super().__init__(
            f"Payment failed: {reason}",
            "PAYMENT_FAILED",
            402,
            details={"transaction_id": transaction_id},
        )
# Global exception handler
@app.exception_handler(SaaSException)
async def saas_exception_handler(request: Request, exc: SaaSException):
    """Handle custom SaaS exceptions."""
    logger = get_logger(__name__)
    logger.error(
        "saas_error",
        extra={
            "error_code": exc.code,
            # "message" is reserved on LogRecord and raises KeyError inside `extra`
            "error_message": exc.message,
            "status_code": exc.status_code,
            "details": exc.details,
            "path": request.url.path,
            "method": request.method,
        },
    )
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.code,
            "message": exc.message,
            "details": exc.details,
        },
    )


@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Handle unexpected errors (log and return 500)."""
    logger = get_logger(__name__)
    # Log with full traceback; pass the exception itself rather than relying on
    # an active "except" block being present when the handler runs
    logger.error(
        "unhandled_exception",
        extra={
            "path": request.url.path,
            "method": request.method,
            "exception_type": type(exc).__name__,
            "exception_message": str(exc),
        },
        exc_info=exc,
    )
    # Return generic 500 (don't leak internal details)
    return JSONResponse(
        status_code=HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "error": "INTERNAL_SERVER_ERROR",
            "message": "An unexpected error occurred. Our team has been notified.",
        },
    )
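With this pattern, adding a new domain error only takes a subclass; the handler needs no changes because it reads code, status_code, and details from the base class. The sketch below repeats a minimal copy of the base class so it runs on its own. QuotaExceededError and to_payload are hypothetical names introduced for illustration.

```python
from typing import Optional


class SaaSException(Exception):
    """Minimal copy of the base class, repeated so the sketch runs alone."""

    def __init__(self, message: str, code: str, status_code: int = 400,
                 details: Optional[dict] = None):
        super().__init__(message)
        self.message = message
        self.code = code
        self.status_code = status_code
        self.details = details or {}


class QuotaExceededError(SaaSException):
    """Hypothetical domain error: a tenant went over its plan limit."""

    def __init__(self, tenant_id: int, limit: int):
        super().__init__(
            f"Tenant {tenant_id} exceeded its quota of {limit} requests",
            "QUOTA_EXCEEDED",
            429,
            details={"tenant_id": tenant_id, "limit": limit},
        )


def to_payload(exc: SaaSException) -> tuple[int, dict]:
    """Build the (status, body) pair that the global handler would return."""
    return exc.status_code, {
        "error": exc.code,
        "message": exc.message,
        "details": exc.details,
    }


try:
    raise QuotaExceededError(tenant_id=1, limit=1000)
except SaaSException as exc:
    status, body = to_payload(exc)
```

Keeping the machine-readable code separate from the human-readable message lets clients branch on the code while the wording stays free to change.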
Integration with Sentry for Error Tracking
Sentry centralizes error reports and alerts on critical issues:
pip install sentry-sdk
Initialize Sentry in your app:
import os

import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

sentry_sdk.init(
    dsn=os.getenv("SENTRY_DSN"),
    integrations=[
        FastApiIntegration(),
        SqlalchemyIntegration(),
    ],
    traces_sample_rate=0.1,  # Send 10% of transactions (for performance tracing)
    profiles_sample_rate=0.1,  # Send 10% of profiles (CPU usage)
    environment=os.getenv("ENVIRONMENT", "development"),
    release=os.getenv("RELEASE_VERSION", "unknown"),
)
Sentry automatically captures unhandled exceptions, HTTP errors, and database query performance. Send custom events:
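import sentry_sdk
import stripe
from fastapi import Depends


@app.post("/billing/charge")
async def process_charge(request: ChargeRequest, session: Session = Depends(get_db)):
    """Process a payment charge."""
    try:
        result = stripe.Charge.create(...)
        return result
    except stripe.CardError as e:
        # Send to Sentry with context
        sentry_sdk.capture_exception(e, level="warning")
        # Stripe errors expose user_message and request_id; the request ID is
        # used here as the transaction reference (an assumption of this example)
        raise PaymentFailedError(e.user_message or str(e), e.request_id) from e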
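Because Sentry events can include request headers and extra context, it is worth scrubbing sensitive values before they leave the process. sentry_sdk.init accepts a before_send callback that receives the event dictionary and a hint, and returns the event (or None to drop it). The sketch below is one possible scrubber; the SENSITIVE_KEYS list and the sample event shape are assumptions for illustration.

```python
from typing import Any, Optional

# Assumed list of sensitive keys; adjust to your own data model.
SENSITIVE_KEYS = {"authorization", "cookie", "password", "card_number"}


def _scrub(value: Any) -> Any:
    """Recursively replace values stored under sensitive keys (returns a copy)."""
    if isinstance(value, dict):
        return {
            k: "[redacted]" if str(k).lower() in SENSITIVE_KEYS else _scrub(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [_scrub(item) for item in value]
    return value


def scrub_event(event: dict, hint: dict) -> Optional[dict]:
    """before_send hook: return the scrubbed event, or None to drop it."""
    return _scrub(event)


# Wiring (not executed here): sentry_sdk.init(dsn=..., before_send=scrub_event)
sample_event = {
    "request": {"headers": {"Authorization": "Bearer abc", "Accept": "*/*"}},
    "extra": {"payload": {"password": "hunter2", "amount": 500}},
}
clean = scrub_event(sample_event, hint={})
```

Matching on key names is a coarse heuristic: it will not catch secrets embedded in free-text messages, so it complements rather than replaces care at the call site.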
Metrics and Monitoring
Track application metrics (request count, error rate, latency):
pip install prometheus-client
Expose metrics for Prometheus:
import time

from fastapi import Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware

# Define metrics
request_count = Counter(
    "app_requests_total",
    "Total requests",
    ["method", "endpoint", "status"],
)
request_duration = Histogram(
    "app_request_duration_seconds",
    "Request duration in seconds",
    ["method", "endpoint"],
)
errors_total = Counter(
    "app_errors_total",
    "Total errors",
    ["error_type"],
)


class MetricsMiddleware(BaseHTTPMiddleware):
    """Record metrics for every request."""

    async def dispatch(self, request: Request, call_next):
        start_time = time.perf_counter()
        try:
            response = await call_next(request)
        except Exception as exc:
            # Count unhandled errors by exception type, then let them propagate
            errors_total.labels(error_type=type(exc).__name__).inc()
            raise
        # Record metrics
        duration = time.perf_counter() - start_time
        endpoint = request.url.path.split("/")[1]  # First path segment
        request_count.labels(
            method=request.method,
            endpoint=endpoint,
            status=response.status_code,
        ).inc()
        request_duration.labels(
            method=request.method,
            endpoint=endpoint,
        ).observe(duration)
        return response


app.add_middleware(MetricsMiddleware)


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
Configure Prometheus to scrape your app:
# prometheus.yml
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: "saas_backend"
    static_configs:
      - targets: ["localhost:8000"]
    metrics_path: "/metrics"
Then visualize in Grafana: create a dashboard querying Prometheus for latency, error rate, and throughput.
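One design point behind the endpoint label is cardinality: every distinct label value creates a new time series, so raw paths containing IDs should not be used as labels. Taking the first path segment is one way to bound this. If more detail is wanted, a sketch like the following replaces ID-like segments with placeholders instead; normalize_path is a hypothetical helper, not part of prometheus-client.

```python
import re

# Matches a canonical hyphenated UUID, case-insensitively.
_UUID = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.I
)


def normalize_path(path: str) -> str:
    """Replace numeric and UUID path segments with fixed placeholders."""
    parts = []
    for segment in path.split("/"):
        if segment.isdigit():
            parts.append("{id}")
        elif _UUID.match(segment):
            parts.append("{uuid}")
        else:
            parts.append(segment)
    return "/".join(parts) or "/"


labels = [
    normalize_path("/users/42/invoices/7"),
    normalize_path("/users/43/invoices/9"),
    normalize_path("/tenants/123e4567-e89b-12d3-a456-426614174000"),
    normalize_path("/health"),
]
```

Both user paths collapse to the same label value, so the number of series depends on the number of routes rather than the number of users.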
Health Checks
Implement a health check endpoint for load balancers and alerting:
import os

import redis
from fastapi import Depends
from fastapi.responses import JSONResponse
from sqlalchemy import text


@app.get("/health")
async def health_check(session: Session = Depends(get_db)):
    """
    Check API and database health.
    Returns 503 if any dependency is down.
    """
    checks = {}
    # Database check (SQLAlchemy 2.x requires text() for raw SQL strings)
    try:
        session.execute(text("SELECT 1"))
        checks["database"] = "healthy"
    except Exception as e:
        checks["database"] = f"unhealthy: {str(e)}"
    # Redis check (if using Celery)
    try:
        redis_client = redis.from_url(os.getenv("REDIS_URL"))
        redis_client.ping()
        checks["redis"] = "healthy"
    except Exception as e:
        checks["redis"] = f"unhealthy: {str(e)}"
    # Compare exactly: "unhealthy: ..." also contains the substring "healthy"
    status = "healthy" if all(v == "healthy" for v in checks.values()) else "degraded"
    status_code = 200 if status == "healthy" else 503
    return JSONResponse(
        status_code=status_code,
        content={"status": status, "checks": checks},
    )
Load balancers poll /health on a configurable interval (for example, every 10 seconds); when it returns a non-200 status, they remove the instance from the pool until it recovers.
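Since the load balancer is waiting on the response, each dependency check should be bounded by a timeout so that one hung dependency cannot stall the whole health endpoint. The sketch below shows one way to do this with asyncio.wait_for; the three check coroutines are hypothetical stand-ins for real database, cache, and queue pings.

```python
import asyncio
from typing import Awaitable, Callable


async def run_check(
    name: str, check: Callable[[], Awaitable[None]], timeout: float
) -> tuple[str, str]:
    """Run one dependency check and classify the outcome."""
    try:
        await asyncio.wait_for(check(), timeout=timeout)
        return name, "healthy"
    except asyncio.TimeoutError:
        return name, "timeout"
    except Exception:
        return name, "unhealthy"


async def fast_ok() -> None:  # hypothetical stand-in for a database ping
    await asyncio.sleep(0)


async def hangs() -> None:  # hypothetical stand-in for an unresponsive cache
    await asyncio.sleep(10)


async def broken() -> None:  # hypothetical stand-in for a refused connection
    raise ConnectionError("refused")


async def collect(timeout: float = 0.05) -> dict[str, str]:
    """Run all checks concurrently so total time is bounded by one timeout."""
    pairs = await asyncio.gather(
        run_check("database", fast_ok, timeout),
        run_check("cache", hangs, timeout),
        run_check("queue", broken, timeout),
    )
    return dict(pairs)


checks = asyncio.run(collect())
overall = "healthy" if all(v == "healthy" for v in checks.values()) else "degraded"
```

Returning fixed status strings instead of exception text also keeps connection details out of a response that may be reachable without authentication.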
Key Takeaways
- Use structured (JSON) logging with a context variable for correlation IDs; trace requests end-to-end.
- Create custom exception classes for domain-specific errors; use global exception handlers for consistency.
- Integrate Sentry for centralized error reporting and alerting on critical issues.
- Expose Prometheus metrics and visualize in Grafana for operational visibility.
- Implement health check endpoints for load balancer readiness and automatic failover.
Frequently Asked Questions
Should I log sensitive data (passwords, tokens, PII)?
No. Implement a redaction layer: define a list such as PII_FIELDS = ["password", "credit_card", "ssn"] and mask those fields before they are logged. Use json-masked-logger or custom logic.
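As a minimal sketch of the custom-logic route, a logging.Filter attached to a handler can mask PII attributes before any formatter sees them. RedactPIIFilter and ListHandler are hypothetical names; this version only covers top-level extra keys and does not inspect nested dictionaries or the message text.

```python
import logging

PII_FIELDS = ["password", "credit_card", "ssn"]


class RedactPIIFilter(logging.Filter):
    """Mask PII attributes that were attached to a record via `extra`."""

    def filter(self, record: logging.LogRecord) -> bool:
        for field in PII_FIELDS:
            if hasattr(record, field):
                setattr(record, field, "***REDACTED***")
        return True  # never drop the record, only mask it


class ListHandler(logging.Handler):
    """Demo helper that keeps handled records in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


capture = ListHandler()
capture.addFilter(RedactPIIFilter())  # runs before emit() on this handler

audit_logger = logging.getLogger("app.audit.demo")
audit_logger.setLevel(logging.INFO)
audit_logger.propagate = False
audit_logger.addHandler(capture)

audit_logger.info("login_attempt", extra={"user_id": 42, "password": "hunter2"})
record = capture.records[0]
```

Attaching the filter to the handler rather than to individual loggers means records from every logger that reaches that handler are masked.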
How do I reduce log volume in production?
Set the root level to logging.INFO (not DEBUG). Quiet noisy loggers: logging.getLogger("urllib3").setLevel(logging.WARNING). Move old logs out of hot storage: archive them to S3 or send them to a log aggregation service (DataDog, Splunk).
Can I correlate logs across microservices?
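Yes. Pass the correlation ID between services in an HTTP header: X-Correlation-ID. Each service that installs the correlation ID middleware reads the header and includes the ID in its logs. A log platform such as Splunk or DataDog can then group entries by correlation ID.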
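On the calling side, the ID has to be copied from the context variable into the outbound request. A minimal sketch, assuming the same correlation ID context variable as in the middleware; outbound_headers is a hypothetical helper, and the resulting dictionary can be passed to whichever HTTP client you use.

```python
import contextvars
import uuid
from typing import Optional

# Repeated here so the sketch runs on its own.
correlation_id_context: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "correlation_id", default=None
)


def outbound_headers(extra: Optional[dict] = None) -> dict:
    """Headers for a downstream call, carrying the current correlation ID."""
    # Outside a request (e.g. a background job) there is no ID yet, so mint one.
    correlation_id = correlation_id_context.get() or str(uuid.uuid4())
    headers = {"X-Correlation-ID": correlation_id}
    headers.update(extra or {})
    return headers


correlation_id_context.set("req-123")  # normally done by the middleware
headers = outbound_headers({"Accept": "application/json"})
```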
What metrics should I alert on?
- Error rate > 1% (5xx errors)
- Latency p99 > 5s (slow requests)
- Health check failures
- Database connection pool exhaustion
- CPU/memory > 80%
Define the alert rules in Prometheus and use Alertmanager to route the resulting alerts to PagerDuty or Slack.
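To show the arithmetic behind the first two thresholds, here is a plain-Python sketch that evaluates a window of requests. In production these conditions would be expressed as Prometheus alerting rules rather than application code; the function names and sample window below are hypothetical.

```python
import math


def error_rate(status_codes: list[int]) -> float:
    """Fraction of responses with a 5xx status."""
    if not status_codes:
        return 0.0
    return sum(1 for s in status_codes if s >= 500) / len(status_codes)


def percentile(values: list[float], pct: int) -> float:
    """Nearest-rank percentile for a non-empty list; pct is in 1..100."""
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


def breached(status_codes: list[int], latencies_s: list[float]) -> list[str]:
    """Names of the thresholds that this window violates."""
    alerts = []
    if error_rate(status_codes) > 0.01:  # error rate > 1%
        alerts.append("error_rate")
    if latencies_s and percentile(latencies_s, 99) > 5.0:  # p99 > 5s
        alerts.append("latency_p99")
    return alerts


# Hypothetical window: 200 requests, 3 of them 5xx, 4 slow outliers.
statuses = [200] * 197 + [500] * 3
latencies = [0.12] * 196 + [6.5] * 4
alerts = breached(statuses, latencies)
```

Note that p99 only moves once more than 1% of requests are slow, which is why alerting on a high percentile catches tail latency that an average would hide.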
How do I test error handlers?
Mock exceptions and verify the response:
from unittest.mock import patch

import stripe


def test_payment_error_handler(client):
    # Force the Stripe call to fail so the PaymentFailedError path is exercised
    with patch("stripe.Charge.create", side_effect=stripe.CardError(...)):
        response = client.post("/billing/charge", json={...})
    assert response.status_code == 402
    assert response.json()["error"] == "PAYMENT_FAILED"