Skip to main content

FastAPI Lifespan Events: Startup and Shutdown

Lifespan events let you run code when your FastAPI app starts up and shuts down. When your app boots, you might open a database connection pool, load feature flags from a cache, or start background workers. When it stops, you close those resources cleanly. FastAPI's lifespan context manager (introduced in v0.93) replaced the older @app.on_event() decorator with a cleaner, more composable pattern. This guide shows you how to manage application state reliably so your API starts fast and stops gracefully.

I've debugged production outages caused by resources left open after graceful shutdown. This article teaches you patterns that prevent those failures.

Understanding the FastAPI Application Lifecycle

Every FastAPI application has three phases: initialization (code runs immediately on import), startup (runs after the server starts accepting requests), and shutdown (runs when the server stops). Lifespan events hook into startup and shutdown, letting you control application state:

App import

Server starts

Lifespan startup (code before yield)

API accepts requests

Server receives SIGTERM signal

Lifespan shutdown (code after yield)

Exit

Using the Lifespan Context Manager

The recommended pattern in FastAPI 0.93+ is the lifespan context manager:

from contextlib import asynccontextmanager
from fastapi import FastAPI
import logging

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: code before yield
logger.info("Application starting up")

# Initialize database connection pool
app.db_pool = create_connection_pool(
host="localhost",
port=5432,
database="mydb",
min_size=5,
max_size=20
)

# Load configuration from external service
app.config = await load_config_from_service()

logger.info("Application ready to serve requests")

yield # Server runs here, accepting requests

# Shutdown: code after yield
logger.info("Application shutting down")

# Close database pool
await app.db_pool.close()

# Cleanup external resources
await cleanup_external_service()

logger.info("Application shut down cleanly")

app = FastAPI(lifespan=lifespan)

@app.get("/data")
async def get_data():
# Access resources initialized in lifespan
async with app.db_pool.acquire() as conn:
result = await conn.fetch("SELECT * FROM data")
return result

The pattern is clean: initialization happens before yield, the app runs in the middle, and cleanup happens after yield. FastAPI guarantees that the shutdown code runs even if there's an error or the process receives a SIGTERM signal.

Managing Database Connections

A common lifespan task is initializing database connection pools:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("Initializing database")

app.engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/mydb",
echo=False,
pool_size=20,
max_overflow=10,
pool_recycle=3600 # Recycle connections every hour
)

async with app.engine.begin() as conn:
# Create tables if needed (in production, use migrations)
await conn.run_sync(Base.metadata.create_all)

# Create session factory
app.SessionLocal = async_sessionmaker(
app.engine,
class_=AsyncSession,
expire_on_commit=False
)

logger.info("Database initialized")

yield

# Shutdown
logger.info("Closing database connections")
await app.engine.dispose()
logger.info("Database connections closed")

app = FastAPI(lifespan=lifespan)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with app.SessionLocal() as session:
user = await session.get(User, user_id)
return user

Connection pooling is critical for performance. A pool of 20 connections can handle thousands of requests efficiently by reusing connections instead of creating new ones.

Loading External Configuration

Lifespan is ideal for loading configuration from external services:

from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
database_url: str
api_key: str
debug: bool = False

class Config:
env_file = ".env"

@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Loading configuration")

# Load environment variables
app.settings = Settings()

# Fetch feature flags from remote config service
app.feature_flags = await fetch_feature_flags()

# Initialize API client with credentials
app.external_api = ExternalAPIClient(
api_key=app.settings.api_key,
timeout=10
)

logger.info(f"Running in debug mode: {app.settings.debug}")

yield

# Cleanup
logger.info("Closing external API client")
await app.external_api.close()

app = FastAPI(lifespan=lifespan)

@app.get("/feature/{feature_name}")
async def check_feature(feature_name: str):
is_enabled = app.feature_flags.get(feature_name, False)
return {"feature": feature_name, "enabled": is_enabled}

Fetching config at startup (instead of on every request) reduces latency and makes config changes visible only at deploy time, improving predictability.

Starting Background Tasks

Lifespan can start long-running background tasks:

import asyncio

async def background_worker(app: FastAPI):
"""Background task that syncs data every 30 seconds."""
while True:
try:
logger.info("Running background sync")
await app.external_api.sync_data()
await asyncio.sleep(30)
except Exception as e:
logger.error(f"Background sync failed: {e}")
await asyncio.sleep(5) # Retry sooner on error

@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("Starting background worker")

# Create and store background task
app.background_task = asyncio.create_task(background_worker(app))

yield

# Shutdown
logger.info("Stopping background worker")
app.background_task.cancel()

try:
await app.background_task
except asyncio.CancelledError:
logger.info("Background worker stopped")

app = FastAPI(lifespan=lifespan)

The background task runs indefinitely until the app receives a shutdown signal. AsyncIO's create_task() schedules it on the event loop alongside incoming requests.

Composing Multiple Lifespan Contexts

For large applications with many startup/shutdown tasks, compose multiple context managers:

from contextlib import asynccontextmanager, ExitStack

async def init_database(app):
logger.info("Initializing database")
app.engine = create_async_engine("postgresql+asyncpg://...")
yield
await app.engine.dispose()

async def load_cache(app):
logger.info("Loading cache")
app.redis = await aioredis.create_redis_pool("redis://localhost")
yield
app.redis.close()

async def start_scheduler(app):
logger.info("Starting scheduler")
app.scheduler = APScheduler()
app.scheduler.start()
yield
app.scheduler.shutdown()

@asynccontextmanager
async def lifespan(app: FastAPI):
async with ExitStack() as stack:
# Initialize all resources in order
await stack.enter_async_context(init_database(app))
await stack.enter_async_context(load_cache(app))
await stack.enter_async_context(start_scheduler(app))

yield

# Cleanup happens automatically in reverse order

ExitStack ensures resources are cleaned up in reverse order, so dependencies are satisfied (e.g., close the scheduler before closing the database).

Error Handling in Lifespan

Handle errors gracefully during startup and shutdown:

@asynccontextmanager
async def lifespan(app: FastAPI):
try:
logger.info("Starting up")
app.db = create_db_connection()

# Verify database is accessible
async with app.db.pool.acquire() as conn:
await conn.fetchval("SELECT 1")

logger.info("Startup complete")

except Exception as e:
logger.error(f"Startup failed: {e}")
raise # Stop the application if startup fails

yield

try:
logger.info("Shutting down")
await app.db.close()
logger.info("Shutdown complete")

except Exception as e:
logger.error(f"Error during shutdown: {e}")
# Continue shutdown even if there's an error

If startup fails, raise the exception to prevent the app from running. If shutdown fails, log it but don't raise (the process is already stopping).

Testing with Lifespan

Test the lifespan behavior using lifespan as a context manager:

import pytest
from fastapi.testclient import TestClient

@pytest.fixture
async def test_app():
app = FastAPI(lifespan=lifespan)
return app

@pytest.mark.asyncio
async def test_lifespan_startup(test_app):
async with test_app.router.lifespan_context(test_app) as state:
# After startup
assert test_app.db_pool is not None
assert test_app.config is not None

# After shutdown
# Verify cleanup

Or use the TestClient which automatically manages lifespan:

def test_api_with_lifespan():
app = FastAPI(lifespan=lifespan)

@app.get("/test")
async def test_route():
return {"db": app.db_pool is not None}

client = TestClient(app)
response = client.get("/test")
assert response.status_code == 200
assert response.json()["db"] is True

Key Takeaways

  • Lifespan events (startup/shutdown) let you manage application state reliably.
  • Use async context managers with @asynccontextmanager for clean, composable initialization.
  • Database connections, feature flags, and external clients should be initialized at startup.
  • Background tasks can run during the lifespan; cancel them on shutdown.
  • Compose multiple lifespan contexts using ExitStack for complex initialization.
  • Always handle errors gracefully; let startup errors propagate, but log shutdown errors.

Frequently Asked Questions

What if I need to run synchronous code in lifespan?

Wrap sync code using asyncio.to_thread() to prevent blocking the event loop. FastAPI will run it in a thread pool.

Can lifespan access request state?

No. Lifespan runs once per application lifecycle, not per request. Use dependencies for request-scoped state.

How do I reload lifespan on code changes during development?

Set reload=True when running with Uvicorn: uvicorn main:app --reload. Uvicorn restarts the process on code changes, re-running lifespan.

What happens if shutdown takes a long time?

The OS sends SIGTERM after a timeout (often 30 seconds). Graceful shutdown might not complete. Design shutdown to be fast—avoid long database queries or waiting for external services.

Can I access lifespan state in middleware?

Yes. Middleware has access to app, so it can read app.db_pool, app.config, etc.

Further Reading