Skip to main content

FastAPI Database Connection Pooling

Database connection pooling is a performance technique where a pool of reusable connections waits for queries instead of opening a new connection per request. Creating a connection typically takes 50-200ms (handshake, authentication, TLS negotiation). Reusing a connection from a pool takes < 1ms. For an API handling 1,000 requests/second, pooling is the difference between 50-200s of latency per second and near-zero. This guide shows you how to configure SQLAlchemy connection pools for FastAPI, tune pool sizes, and debug pool exhaustion.

I've debugged incidents where an underconfigured pool caused timeouts under load. This article teaches you the mechanics and tuning rules that prevent those failures.

Understanding Connection Pooling

A pool maintains a set of "warm" connections to the database. When a route requests a connection, it gets one from the pool (very fast). When done, it returns the connection to the pool for reuse. If all connections are in use, new requests wait for one to become available.

Key metrics:

  • pool_size: Connections kept open and ready (default: 5). Start with 5-10 per 100 req/sec.
  • max_overflow: Additional connections created if pool_size is exhausted (default: 10). Total capacity = pool_size + max_overflow.
  • pool_recycle: Recycle connections older than N seconds (default: 3600). Databases close idle connections; recycling prevents stale connections.
  • pool_timeout: Seconds to wait for a connection before raising timeout (default: 30).

A typical pool configuration:

from sqlalchemy import create_engine

engine = create_engine(
"postgresql://user:password@localhost/mydb",
pool_size=20, # 20 connections always open
max_overflow=10, # Up to 30 total if needed
pool_recycle=3600, # Recycle after 1 hour
pool_timeout=30, # Wait 30s for a connection
echo=False # Don't log SQL (performance)
)

Sync vs. Async Connection Pools

SQLAlchemy's sync engine has one pool shared across threads. The async engine (via sqlalchemy.ext.asyncio) uses per-task connection management:

# Sync engine (thread-based)
engine = create_engine(
"postgresql://...",
pool_size=20,
max_overflow=10
)

# Async engine (event-loop based)
from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
"postgresql+asyncpg://...",
pool_size=20,
max_overflow=10
)

For FastAPI (async), use the async engine. It avoids blocking threads waiting for database I/O.

Configuring Pool Size

Pool size depends on your load and query time:

ScenarioFormulaExample
Low concurrency (< 100 req/s)5-1010 connections
Medium (100-1000 req/s)requests/sec * avg_query_time_s500 req/s * 0.01s = 5 connections
High (> 1000 req/s)requests/sec * avg_query_time_s + 20%2000 req/s * 0.005s = 10 connections + 2 = 12

Calculate it based on your database latency:

# Average query latency ~10ms (0.01s)
# Expected concurrency: 500 requests/second
# Pool size needed: 500 * 0.01 = 5

# Overhead margin: + 20%
pool_size = max(5, int(500 * 0.01 * 1.2)) # 6

Use environment-specific configs:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
database_url: str
database_pool_size: int = 5 # Development default
database_max_overflow: int = 10
database_pool_recycle: int = 3600

class Config:
env_file = ".env"

settings = Settings()

from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
settings.database_url,
pool_size=settings.database_pool_size,
max_overflow=settings.database_max_overflow,
pool_recycle=settings.database_pool_recycle
)

For production, set pool_size via environment:

DATABASE_POOL_SIZE=20
DATABASE_MAX_OVERFLOW=10

Diagnosing Pool Exhaustion

Pool exhaustion happens when all connections are in use and the queue is full. Routes timeout waiting for a connection. Watch for these errors:

TimeoutError: QueuePool limit exceeded
sqlalchemy.exc.InvalidRequestError: Can't connect to the pool

Debug pool status:

@app.get("/debug/pool-stats")
async def pool_stats():
"""Monitor connection pool status."""
pool = async_engine.pool

return {
"pool_size": pool.size(),
"checked_out": pool.checkedout(),
"overflow": pool.overflow(),
"total": pool.size() + pool.overflow(),
"available": pool.size() - pool.checkedout()
}

Example output when exhausted:

{
"pool_size": 20,
"checked_out": 20,
"overflow": 10,
"total": 30,
"available": 0
}

Zero available means new requests will wait or timeout.

Common Pool Exhaustion Causes

1. Connection leaks: Sessions not closed properly.

# Bad: connection never returned
@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
# If an exception happens, session might not close
users = await db.execute(select(User))
return users.scalars().all()

# Good: use try/finally or context manager
@app.get("/users")
async def get_users():
async with async_sessionmaker() as session:
users = await session.execute(select(User))
return users.scalars().all()

2. Long-running requests: Queries that take too long, holding connections.

# Bad: slow query holds connection
@app.get("/reports/summary")
async def generate_report(db: AsyncSession = Depends(get_db)):
# This takes 5 seconds; connection blocked
result = await db.execute(
select(func.count(User)).where(User.created_at > yesterday)
)
# ...
await asyncio.sleep(5) # Simulate slow processing
return result

# Good: optimize or increase pool size for slow endpoints

3. Synchronous code in async routes:

# Bad: blocking call holds connection and thread
@app.get("/data")
async def get_data(db: AsyncSession = Depends(get_db)):
data = await db.execute(select(Data))
time.sleep(2) # BLOCKS EVENT LOOP AND CONNECTION
return data

# Good: use asyncio.sleep
await asyncio.sleep(2)

Monitoring Pool Health

Log pool usage periodically:

from contextlib import asynccontextmanager
import logging

logger = logging.getLogger(__name__)

async def monitor_pool_health(app: FastAPI, interval: int = 60):
"""Monitor pool health every N seconds."""
while True:
try:
pool = async_engine.pool
logger.info(
"pool_stats",
extra={
"pool_size": pool.size(),
"checked_out": pool.checkedout(),
"overflow": pool.overflow(),
"available": pool.size() - pool.checkedout()
}
)
await asyncio.sleep(interval)
except Exception as e:
logger.error(f"Error monitoring pool: {e}")

@asynccontextmanager
async def lifespan(app: FastAPI):
# Start pool monitor
monitor_task = asyncio.create_task(monitor_pool_health(app))

yield

monitor_task.cancel()

Connection Pooling with Multiple Databases

If you connect to multiple databases, configure pools separately:

# Primary database
primary_engine = create_async_engine(
settings.primary_db_url,
pool_size=20,
max_overflow=10
)

# Analytics database (read-only, less critical)
analytics_engine = create_async_engine(
settings.analytics_db_url,
pool_size=5,
max_overflow=5
)

SessionLocal = async_sessionmaker(primary_engine)
AnalyticsSessionLocal = async_sessionmaker(analytics_engine)

async def get_db():
async with SessionLocal() as session:
yield session

async def get_analytics_db():
async with AnalyticsSessionLocal() as session:
yield session

@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
# Uses primary database pool
return await db.execute(select(User))

@app.get("/metrics")
async def get_metrics(analytics: AsyncSession = Depends(get_analytics_db)):
# Uses analytics database pool
return await analytics.execute(select(Metric))

Testing with Connection Pooling

In tests, use an in-memory SQLite database with no pooling:

import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

@pytest.fixture
async def test_db():
"""Create an in-memory database for testing."""

# SQLite in-memory, no pooling needed
engine = create_async_engine("sqlite+aiosqlite:///:memory:")

async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

SessionLocal = async_sessionmaker(engine, class_=AsyncSession)

yield SessionLocal

await engine.dispose()

@pytest.fixture
def override_get_db(test_db):
async def get_db():
async with test_db() as session:
yield session

return get_db

@pytest.mark.asyncio
async def test_get_users(override_get_db):
app.dependency_overrides[get_db] = override_get_db

client = TestClient(app)
response = client.get("/users")
assert response.status_code == 200

Key Takeaways

  • Connection pools reuse database connections, reducing latency from 50-200ms per request to < 1ms.
  • Configure pool_size based on load and query latency: pool_size = requests/sec * avg_query_time_s.
  • Use async engines for FastAPI; they integrate with the event loop without blocking.
  • Monitor pool health and log exhaustion; set alerts on max utilization.
  • Debug connection leaks and long-running queries when pool exhaustion occurs.
  • Use environment variables to configure pool sizes per deployment.

Frequently Asked Questions

What's a good default pool size?

Start with 5-10 for development, 20 for production. Adjust based on monitoring.

How do I know if my pool size is too small?

Watch for QueuePool limit exceeded errors or timeout errors when under load. Increase pool_size.

Should I set max_overflow to zero?

No. max_overflow allows temporary overload during traffic spikes. Set it to 10-20% of pool_size.

How often should I recycle connections?

Every 3600 seconds (1 hour) by default. Most databases close idle connections after 30 minutes. Match your database's idle timeout.

Can I monitor pool usage in production?

Yes. Create a debug endpoint that returns pool stats, or emit pool metrics to your monitoring system (Datadog, Prometheus).

Further Reading