Handling Concurrent Requests: FastAPI with asyncpg
FastAPI is built on async/await, which makes it a natural match for asyncpg. While one request handler waits on a database query, FastAPI's event loop keeps serving other requests, so hundreds of handlers can be in flight at once without blocking each other. The key is to create a single asyncpg pool at app startup and share it across all endpoints, either directly or through dependency injection. This article shows how to build a production-ready async database layer in FastAPI that holds up under high concurrency.
By the end, you'll have a reusable database module, error handling patterns, and endpoints that execute database queries safely and efficiently.
Setting Up the FastAPI App with a Database Pool
First, create a database module that manages the pool lifecycle:
# database.py
import os
from typing import Optional
import asyncpg
class DatabaseManager:
    """Manages the asyncpg connection pool lifecycle."""
    def __init__(self, db_url: str):
        self.db_url = db_url
        self.pool: Optional[asyncpg.Pool] = None
    async def initialize(self, min_size: int = 10, max_size: int = 50):
        """Create the connection pool on startup."""
        self.pool = await asyncpg.create_pool(
            self.db_url,
            min_size=min_size,
            max_size=max_size
        )
        print("Database pool initialized")
    async def close(self):
        """Close all connections on shutdown."""
        if self.pool is not None:
            await self.pool.close()
            self.pool = None
            print("Database pool closed")
    def _require_pool(self) -> asyncpg.Pool:
        # Fail with a clear message if a query runs before initialize()
        if self.pool is None:
            raise RuntimeError("Database pool is not initialized")
        return self.pool
    async def query(self, sql: str, *args):
        """Execute a query and fetch one row (None if nothing matches)."""
        async with self._require_pool().acquire() as conn:
            return await conn.fetchrow(sql, *args)
    async def fetch(self, sql: str, *args):
        """Execute a query and fetch all rows."""
        async with self._require_pool().acquire() as conn:
            return await conn.fetch(sql, *args)
    async def execute(self, sql: str, *args):
        """Execute a statement (insert/update/delete); returns a status tag such as 'INSERT 0 1'."""
        async with self._require_pool().acquire() as conn:
            return await conn.execute(sql, *args)
# Shared manager used by the FastAPI app. Reading DATABASE_URL from the
# environment is this example's convention; the fallback is a local dev default.
db = DatabaseManager(
    os.environ.get('DATABASE_URL', 'postgresql://postgres:password@localhost/postgres')
)
Now create the FastAPI app. A lifespan handler opens the pool before the app starts serving requests and closes it on shutdown (the older @app.on_event('startup') and @app.on_event('shutdown') hooks still work but are deprecated in favor of lifespan):
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from database import db
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the pool on startup; close it on shutdown."""
    await db.initialize(min_size=10, max_size=50)
    yield
    await db.close()
app = FastAPI(lifespan=lifespan)
class User(BaseModel):
    id: int
    name: str
    email: str
@app.get('/users/{user_id}', response_model=User)
async def get_user(user_id: int):
    """Fetch a user by ID."""
    user = await db.query(
        'SELECT id, name, email FROM users WHERE id = $1',
        user_id
    )
    if not user:
        raise HTTPException(status_code=404, detail='User not found')
    # asyncpg returns a Record; convert it to a plain dict for Pydantic
    return User(**dict(user))
@app.get('/users', response_model=list[User])
async def list_users():
    """Fetch all users."""
    rows = await db.fetch('SELECT id, name, email FROM users')
    return [User(**dict(row)) for row in rows]
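This simple setup already handles concurrent requests. When 100 users hit /users/{user_id} at the same time, FastAPI runs all 100 handlers concurrently on the event loop. Up to max_size (50 here) of them hold a pooled connection at once; the rest wait inside pool.acquire() until a connection is released, then run their queries. No request fails just because the pool is busy, but the waiting adds latency.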
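To make that queuing behavior concrete, here is a toy model that needs no database. It is not asyncpg's implementation: an asyncio.Semaphore with 50 slots stands in for the pool's max_size, and asyncio.sleep stands in for a query. The names simulate and MAX_SIZE are made up for this sketch.

```python
import asyncio

# Toy model of a bounded connection pool, NOT asyncpg's implementation:
# asyncio.Semaphore stands in for pool.acquire(), asyncio.sleep for a query.
MAX_SIZE = 50  # mirrors max_size=50 passed to db.initialize()


async def simulate(requests: int = 100, query_seconds: float = 0.01) -> dict:
    pool = asyncio.Semaphore(MAX_SIZE)
    in_use = 0
    peak = 0
    completed = 0

    async def handler() -> None:
        nonlocal in_use, peak, completed
        async with pool:  # waits here while all "connections" are checked out
            in_use += 1
            peak = max(peak, in_use)
            await asyncio.sleep(query_seconds)  # pretend to run the query
            in_use -= 1
        completed += 1

    # Like 100 simultaneous HTTP requests handled on one event loop
    await asyncio.gather(*(handler() for _ in range(requests)))
    return {'completed': completed, 'peak_in_use': peak}


print(asyncio.run(simulate()))  # → {'completed': 100, 'peak_in_use': 50}
```

All 100 "requests" complete, but never more than 50 hold a connection at the same moment; the other 50 simply wait their turn.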
Using Dependency Injection for Better Code Organization
For larger apps, inject the database connection into handlers using FastAPI's dependency system:
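from fastapi import Depends
# Dependency that lends one pooled connection to each request
async def get_db():
    """Yield a connection; it returns to the pool when the request is done."""
    async with db.pool.acquire() as conn:
        yield conn
@app.post('/users')
async def create_user(
    name: str,
    email: str,
    conn=Depends(get_db)  # Inject the connection
):
    """Create a new user and return its generated id."""
    user_id = await conn.fetchval(
        'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
        name, email
    )
    return {'id': user_id, 'status': 'created'}
Depends(get_db) is not a decorator: it is a marker in the parameter's default value that tells FastAPI to call get_db() for each request and pass the yielded connection in as conn. Because get_db() yields from inside async with db.pool.acquire(), the connection goes back to the pool once the request has been handled, even if the handler raises. This keeps connection management out of your business logic.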
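Why the connection is always released can be shown without FastAPI or a database. The sketch below uses a hypothetical FakePool that only counts acquire/release calls, and a run_request function that roughly mimics how a yield dependency is driven: the generator is wrapped as an async context manager around the handler call. This is an approximation for illustration, not FastAPI's actual internals.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Hypothetical stand-in for an asyncpg pool that counts acquire/release."""

    def __init__(self):
        self.acquired = 0
        self.released = 0

    @asynccontextmanager
    async def acquire(self):
        self.acquired += 1
        try:
            yield object()  # placeholder "connection"
        finally:
            self.released += 1  # runs whether the body succeeded or raised


pool = FakePool()


async def get_db():
    """Same shape as the get_db dependency, but using the fake pool."""
    async with pool.acquire() as conn:
        yield conn


async def run_request(handler):
    # Roughly how a yield dependency is driven: wrap the generator as an
    # async context manager around the handler call.
    async with asynccontextmanager(get_db)() as conn:
        return await handler(conn)


async def ok_handler(conn):
    return 'ok'


async def failing_handler(conn):
    raise ValueError('handler failed')


async def main():
    await run_request(ok_handler)
    try:
        await run_request(failing_handler)
    except ValueError:
        pass  # the error still reaches the caller...
    return pool.acquired, pool.released  # ...but the connection was released


print(asyncio.run(main()))  # → (2, 2)
```

Both requests acquired a connection and both released it, including the one whose handler raised.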
Transaction Safety in Endpoints
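For operations spanning multiple queries, use a transaction so that they succeed or fail together. This example uses SQLAlchemy's async ORM running on the asyncpg driver (note the postgresql+asyncpg URL):
from fastapi import Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
# Product and Order are assumed to be SQLAlchemy ORM models defined elsewhere
# (models.py is a hypothetical module name)
from models import Order, Product
# Set up async SQLAlchemy (2.0+ API)
async_engine = create_async_engine(
    'postgresql+asyncpg://postgres:password@localhost/postgres'
)
async_session_factory = async_sessionmaker(
    async_engine,
    expire_on_commit=False
)
async def get_session():
    """Dependency that yields one AsyncSession per request."""
    async with async_session_factory() as session:
        yield session
@app.post('/orders')
async def place_order(
    product_id: int,
    quantity: int,
    session: AsyncSession = Depends(get_session)
):
    """Place an order (decrement stock atomically)."""
    # Commits when the block exits normally; rolls back if any exception escapes
    async with session.begin():
        # SELECT ... FOR UPDATE: lock the row so concurrent orders can't oversell
        product = await session.get(Product, product_id, with_for_update=True)
        if product is None:
            raise HTTPException(status_code=404, detail='Product not found')
        if product.stock < quantity:
            raise HTTPException(
                status_code=400,
                detail='Insufficient stock'
            )
        product.stock -= quantity
        # Create the order; flush so the database assigns order.id now
        order = Order(product_id=product_id, quantity=quantity)
        session.add(order)
        await session.flush()
    return {'order_id': order.id, 'status': 'success'}
Each request gets its own session from get_session(). The async with session.begin() block commits when it exits normally and rolls back if any exception escapes it, including the HTTPException for a missing product or insufficient stock, so the handler needs no explicit commit() or rollback() and the client still receives the intended 404 or 400. with_for_update=True locks the product row until the transaction ends; without it, two concurrent orders could both read the same stock value and oversell.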
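For comparison, here is a sketch of the same order flow on a raw asyncpg connection, such as one injected by get_db. It relies on asyncpg's conn.transaction() context manager plus SELECT ... FOR UPDATE. The tables products(id, stock) and orders(id, product_id, quantity), the OrderError class, and the place_order_tx name are all hypothetical; adapt them to your schema.

```python
# Sketch: the order flow on a raw asyncpg connection (e.g. one injected via
# Depends(get_db)). Tables products(id, stock) and orders(id, product_id,
# quantity) are hypothetical; adjust to your schema.


class OrderError(Exception):
    """Hypothetical domain error carrying the HTTP status to return."""

    def __init__(self, status_code: int, detail: str):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


async def place_order_tx(conn, product_id: int, quantity: int) -> int:
    """Lock the product row, decrement stock, insert the order; return its id."""
    # conn.transaction() commits when the block exits normally and rolls back
    # if an exception escapes it.
    async with conn.transaction():
        # FOR UPDATE holds a row lock until commit, so concurrent orders for
        # the same product are serialized instead of both reading old stock.
        row = await conn.fetchrow(
            'SELECT stock FROM products WHERE id = $1 FOR UPDATE', product_id
        )
        if row is None:
            raise OrderError(404, 'Product not found')
        if row['stock'] < quantity:
            raise OrderError(400, 'Insufficient stock')
        await conn.execute(
            'UPDATE products SET stock = stock - $1 WHERE id = $2',
            quantity, product_id,
        )
        return await conn.fetchval(
            'INSERT INTO orders (product_id, quantity) VALUES ($1, $2) RETURNING id',
            product_id, quantity,
        )


# Wiring it into an endpoint in place of the SQLAlchemy version
# (assumes app, get_db, Depends and HTTPException from the earlier snippets):
#
# @app.post('/orders')
# async def place_order(product_id: int, quantity: int, conn=Depends(get_db)):
#     try:
#         order_id = await place_order_tx(conn, product_id, quantity)
#     except OrderError as e:
#         raise HTTPException(status_code=e.status_code, detail=e.detail)
#     return {'order_id': order_id, 'status': 'success'}
```

Keeping the transactional logic in a plain function that takes a connection makes it easy to unit-test with a fake connection and keeps HTTP concerns in the endpoint.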
Error Handling: Connection Failures and Timeouts
Wrap database calls in error handling. Distinguish between retryable (network, temporary) and fatal (constraint violation, invalid query) errors:
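import asyncio
from asyncpg import PostgresError, TooManyConnectionsError
# Use this instead of the earlier get_user handler: FastAPI dispatches a path
# to the first route registered for it, so two handlers for one path conflict.
@app.get('/users/{user_id}', response_model=User)
async def get_user_with_retry(user_id: int):
    """Fetch a user, retrying when the database refuses new connections."""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            user = await db.query(
                'SELECT id, name, email FROM users WHERE id = $1',
                user_id
            )
        except (TooManyConnectionsError, OSError):
            # Temporary: server at max_connections, or a network-level failure
            if attempt == max_retries - 1:
                raise HTTPException(
                    status_code=503,
                    detail='Database temporarily unavailable'
                )
            await asyncio.sleep(0.1 * (2 ** attempt))  # Backoff: 0.1 s, then 0.2 s
            continue
        except PostgresError as e:
            # Permanent: bad SQL, constraint violations, etc.; retrying won't help
            raise HTTPException(status_code=500, detail='Database error') from e
        if not user:
            raise HTTPException(status_code=404, detail='User not found')
        return User(**dict(user))
This retries when PostgreSQL refuses new connections (TooManyConnectionsError is raised when the server has reached its max_connections limit) or when connecting fails at the network level (OSError, such as connection refused), but not on query errors such as syntax errors or constraint violations, which would fail the same way on every attempt. Note that exhausting the asyncpg pool itself does not raise an error: pool.acquire() waits for a free connection, indefinitely unless you pass a timeout, in which case it raises asyncio.TimeoutError.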
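The retry loop above can be factored into a reusable helper. This is a minimal sketch: with_retry is a hypothetical name, and it adds random "full jitter" to the exponential backoff, a common refinement that is not part of the original loop, so that many failing requests do not all retry at the same instant.

```python
import asyncio
import random


async def with_retry(operation, retryable=(OSError,), attempts=3, base_delay=0.1):
    """Await operation() and retry it when it raises one of `retryable`.

    `operation` must be a zero-argument callable that returns a NEW awaitable
    on every call (a coroutine object cannot be awaited twice). Non-retryable
    exceptions propagate immediately; the last retryable one is re-raised.
    """
    for attempt in range(attempts):
        try:
            return await operation()
        except retryable:
            if attempt == attempts - 1:
                raise
            # Exponential backoff with "full jitter": sleep a random time up to
            # base_delay * 2**attempt so failing requests don't retry in lockstep.
            await asyncio.sleep(random.uniform(0, base_delay * 2 ** attempt))
```

Inside the handler, the query could then be written as user = await with_retry(lambda: db.query('SELECT id, name, email FROM users WHERE id = $1', user_id), retryable=(TooManyConnectionsError, OSError)), with an except clause around it that turns the final failure into a 503. The lambda matters: it creates a fresh coroutine for each attempt.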
Concurrent Batch Operations
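For endpoints that run several independent queries, use asyncio.gather() to run them concurrently. Each helper below goes through db.query() or db.fetch(), so each acquires its own pooled connection; that matters because a single asyncpg connection can only run one query at a time:
import asyncio
from asyncpg import PostgresError
@app.get('/dashboard/{user_id}')
async def get_dashboard(user_id: int):
    """Fetch user data, recent orders, and account balance concurrently."""
    async def get_user():
        return await db.query('SELECT * FROM users WHERE id = $1', user_id)
    async def get_recent_orders():
        # Compute the 30-day cutoff in SQL (datetime.utcnow() is deprecated
        # since Python 3.12 and returns a naive timestamp)
        return await db.fetch(
            "SELECT * FROM orders WHERE user_id = $1 "
            "AND created_at > now() - interval '30 days'",
            user_id
        )
    async def get_balance():
        row = await db.query('SELECT balance FROM accounts WHERE user_id = $1', user_id)
        return row['balance'] if row else 0
    try:
        # Run all three queries concurrently
        user, orders, balance = await asyncio.gather(
            get_user(),
            get_recent_orders(),
            get_balance()
        )
    except PostgresError as e:
        raise HTTPException(status_code=500, detail='Database error') from e
    if user is None:
        raise HTTPException(status_code=404, detail='User not found')
    return {
        'user': dict(user),
        'recent_orders': [dict(o) for o in orders],
        'balance': balance
    }
If each query takes about 50 ms, running them one after another takes about 150 ms, while running them concurrently takes roughly as long as the slowest query, about 50 ms. The trade-off is that every dashboard request now holds three pool connections at once, so fan-out like this cuts per-request latency at the cost of more pool pressure under load.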
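The latency arithmetic can be checked without a database. In this sketch, fake_query is a hypothetical stand-in that just sleeps for 50 ms; the same three calls are awaited one after another and then together with asyncio.gather().

```python
import asyncio
import time


async def fake_query(seconds: float = 0.05) -> None:
    # Stand-in for a ~50 ms database round trip; no real database involved.
    await asyncio.sleep(seconds)


async def sequential() -> float:
    start = time.perf_counter()
    for _ in range(3):
        await fake_query()  # each waits for the previous one to finish
    return time.perf_counter() - start


async def concurrent() -> float:
    start = time.perf_counter()
    await asyncio.gather(fake_query(), fake_query(), fake_query())  # overlap
    return time.perf_counter() - start


seq = asyncio.run(sequential())
con = asyncio.run(concurrent())
print(f'sequential: {seq * 1000:.0f} ms, concurrent: {con * 1000:.0f} ms')
```

Exact timings vary by machine, but the sequential run lands near 150 ms and the concurrent one near 50 ms.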
Monitoring Pool Health
Expose pool stats via an admin endpoint:
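@app.get('/admin/db-stats')
async def db_stats():
    """Return connection pool statistics (public Pool getters, asyncpg 0.25+)."""
    size = db.pool.get_size()
    idle = db.pool.get_idle_size()
    return {
        'pool_size': size,
        'idle_connections': idle,
        'in_use': size - idle,
        'min_size': db.pool.get_min_size(),
        'max_size': db.pool.get_max_size()
    }
Poll this endpoint (or export the same numbers to your metrics system) to spot trouble. If pool_size sits at max_size with idle_connections at 0, requests are queuing inside pool.acquire(): either raise max_size, keeping the total across workers under PostgreSQL's max_connections, or shorten how long handlers hold connections.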
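A monitoring script could turn that payload into a saturation signal. The assess_pool helper below is hypothetical; it only assumes the pool_size, idle_connections and max_size keys returned by the endpoint.

```python
def assess_pool(stats: dict) -> dict:
    """Interpret a /admin/db-stats payload (hypothetical helper).

    in_use:      connections currently checked out by handlers
    utilization: in_use as a fraction of max_size
    saturated:   the pool is at max_size with nothing idle, so new
                 requests are waiting inside pool.acquire()
    """
    in_use = stats['pool_size'] - stats['idle_connections']
    saturated = (
        stats['pool_size'] >= stats['max_size'] and stats['idle_connections'] == 0
    )
    return {
        'in_use': in_use,
        'utilization': in_use / stats['max_size'],
        'saturated': saturated,
    }


print(assess_pool({'pool_size': 50, 'idle_connections': 0, 'min_size': 10, 'max_size': 50}))
# → {'in_use': 50, 'utilization': 1.0, 'saturated': True}
print(assess_pool({'pool_size': 12, 'idle_connections': 9, 'min_size': 10, 'max_size': 50}))
# → {'in_use': 3, 'utilization': 0.06, 'saturated': False}
```

A single saturated sample is normal during a burst; alerting when saturation persists across several polls avoids noise.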
Key Takeaways
- Create an asyncpg pool at app startup; share it across all endpoints.
- Use dependency injection (Depends(get_db)) to cleanly pass connections to handlers.
- Wrap multi-statement operations in transactions to ensure atomicity.
- Distinguish between retryable errors (connection failures) and fatal errors (constraint violations).
- Use asyncio.gather() to run independent queries concurrently.
- Expose pool statistics to monitor health in production.
- Test under load to ensure pool size matches your concurrency.
Frequently Asked Questions
Should I use asyncpg directly or SQLAlchemy async?
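Use asyncpg directly when you want the lowest overhead and are comfortable writing SQL. Use SQLAlchemy async (which can itself run on the asyncpg driver) for complex or dynamically built queries, ORM convenience, or typed models. For a pure CRUD API, raw asyncpg is often faster because it skips the ORM layer.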
What pool size should I use in FastAPI?
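Size the pool for concurrent database work, not for request rate. By Little's law, the average number of connections in use is roughly requests per second × the time each request holds a connection. A 100-req/s API whose handlers hold a connection for about 50 ms uses about 5 connections on average, so min=10, max=30 leaves reasonable headroom for bursts. Keep max_size × the number of worker processes below PostgreSQL's max_connections, then monitor and adjust based on actual concurrency.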
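The sizing arithmetic can be written down as two small functions. Both names are hypothetical, and the reserved=10 default (connections kept free for migrations, admin sessions and monitoring) is an assumption of this sketch rather than a PostgreSQL setting.

```python
import math


def estimate_connections(requests_per_second: float, hold_ms: float) -> int:
    """Little's law: average connections in use = arrival rate x hold time."""
    return math.ceil(requests_per_second * hold_ms / 1000)


def per_worker_max_size(max_connections: int, workers: int, reserved: int = 10) -> int:
    """Split PostgreSQL's max_connections across one pool per worker process.

    `reserved` (an assumed default) keeps headroom for migrations, admin
    sessions and monitoring that connect outside the app's pools.
    """
    return (max_connections - reserved) // workers


print(estimate_connections(100, 50))  # 100 req/s x 50 ms → 5
print(per_worker_max_size(100, 4))    # (100 - 10) // 4 → 22
```

The first number is an average, so max_size should sit comfortably above it; the second is a hard ceiling per worker so that all pools together cannot exceed the server's limit.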
How do I handle database connection errors gracefully?
Wrap calls in try/except, distinguish retryable errors (TooManyConnectionsError, timeouts) from fatal ones (constraint violations), and return appropriate HTTP status codes (503 for temporary, 400 for client errors, 500 for server errors).
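One possible mapping from PostgreSQL errors to those status codes uses the SQLSTATE code, which asyncpg exposes as the sqlstate attribute of its PostgresError exceptions. The function name and the exact policy below (409 for unique violations, 503 for overload, connection problems and serialization conflicts) are assumptions of this sketch; network errors such as OSError carry no SQLSTATE and should be mapped to 503 separately.

```python
from typing import Optional

# SQLSTATE codes from PostgreSQL's error code appendix. asyncpg exposes the
# code of a server error as the exception's .sqlstate attribute.
RETRYABLE_SQLSTATES = {
    '53300',  # too_many_connections
    '57P01',  # admin_shutdown
    '40001',  # serialization_failure
    '40P01',  # deadlock_detected
}


def status_for_sqlstate(sqlstate: Optional[str]) -> int:
    """Map a PostgreSQL SQLSTATE to an HTTP status (one reasonable policy)."""
    if sqlstate is None:
        return 500
    if sqlstate in RETRYABLE_SQLSTATES or sqlstate.startswith('08'):
        return 503  # temporary: overload, connection exceptions, conflicts
    if sqlstate == '23505':
        return 409  # unique_violation, e.g. a duplicate email
    if sqlstate.startswith('23'):
        return 400  # other integrity constraint violations
    return 500      # anything else is treated as a server-side bug
```

In a handler this would look like except PostgresError as e: raise HTTPException(status_code=status_for_sqlstate(e.sqlstate), detail='Database error') from e.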
Can multiple FastAPI instances share a database pool?
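No. Each FastAPI process has its own pool, so running 4 uvicorn workers gives you 4 pools and up to 4 × max_size connections. To share connections across processes, put an external connection pooler such as PgBouncer between FastAPI and PostgreSQL. If PgBouncer runs in transaction or statement pooling mode, asyncpg's prepared-statement cache can break; the asyncpg documentation recommends creating the pool with statement_cache_size=0 in that setup.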