Authentication & Authorization in FastAPI
Authentication verifies who you are; authorization determines what you are allowed to do. A production SaaS backend uses JWT (JSON Web Tokens) to issue short-lived access tokens after login, refresh tokens to get new access tokens without re-entering credentials, and role-based access control (RBAC) to restrict endpoints. This guide implements a complete auth system in FastAPI: password hashing, token generation, refresh logic, and dependencies that enforce authorization on protected routes.
Why JWT Over Session Cookies?
Session cookies store state on the server (which user is logged in, their permissions), so the server must query a session store on every request. With 10,000 concurrent users, that store becomes a shared dependency for every server in the cluster. JWTs are stateless: the client includes a signed token in every request, and the server verifies the signature and reads the claims without a database lookup. Verification costs one signature check per request, so capacity grows by adding servers: any server in a load-balanced cluster can validate any token as long as it holds the signing key. The trade-off is that a signed token stays valid until it expires, which is why access tokens are kept short-lived.
Setting Up Password Hashing
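Never store raw passwords. Use a password hash that is slow by design, so an attacker brute-forcing a password pays the full hashing cost (on the order of 100 ms, depending on the work factor) for every guess. Argon2 won the Password Hashing Competition (2015); bcrypt is older but still solid, and it is what this guide uses through passlib. Install the dependencies:
pip install fastapi sqlalchemy bcrypt "passlib[bcrypt]" "python-jose[cryptography]" pydantic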
Create a hashing utility:
from passlib.context import CryptContext

# Argon2 is secure but slower; bcrypt is also solid and slightly faster.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    """Hash a plain-text password using bcrypt. Safe to store in a database."""
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    """Compare a plain-text password to its hash. Returns True if they match."""
    return pwd_context.verify(plain, hashed)

# Test:
hashed = hash_password("mySecurePassword123!")
assert verify_password("mySecurePassword123!", hashed)
assert not verify_password("wrongPassword", hashed)
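To see what "slow by design" means in practice, here is a minimal standard-library sketch. It uses PBKDF2-HMAC-SHA256 as a stand-in for bcrypt (a different algorithm, chosen only because it ships with Python). The iteration count, salt size, and the names slow_hash and slow_verify are illustrative assumptions, not values or functions taken from passlib.

```python
import hashlib
import hmac
import os
import time
from typing import Optional

ITERATIONS = 200_000  # assumed work factor; tune until one hash takes roughly 100 ms
SALT_BYTES = 16       # assumed salt size

def slow_hash(password: str, salt: Optional[bytes] = None) -> tuple:
    """Return (salt, digest). A fresh random salt is generated when none is given."""
    if salt is None:
        salt = os.urandom(SALT_BYTES)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return salt, digest

def slow_verify(password: str, salt: bytes, expected: bytes) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    _, digest = slow_hash(password, salt)
    return hmac.compare_digest(digest, expected)

# Time a single hash to see the per-guess cost an attacker would pay.
start = time.perf_counter()
salt, digest = slow_hash("mySecurePassword123!")
elapsed_ms = (time.perf_counter() - start) * 1000
print(f"one hash took {elapsed_ms:.1f} ms")
print(slow_verify("mySecurePassword123!", salt, digest))  # True
print(slow_verify("wrongPassword", salt, digest))         # False
```

Raising ITERATIONS makes every guess more expensive for an attacker at the price of a slower login, which is the same trade-off bcrypt exposes through its cost parameter.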
Issuing and Validating JWT Tokens
Create tokens after login. A JWT contains three parts separated by dots: header.payload.signature. The payload (claims) includes the user ID, expiration, and custom fields like role:
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from pydantic import BaseModel
import os

# The fallback value is for local development only; always set SECRET_KEY in production.
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7

class TokenData(BaseModel):
    user_id: int
    tenant_id: int
    role: str
    exp: datetime

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

def create_access_token(user_id: int, tenant_id: int, role: str, expires_delta: timedelta | None = None) -> str:
    """
    Create a short-lived JWT access token.
    Claims include user_id, tenant_id, and role for authorization checks.
    """
    # Use timezone-aware UTC; datetime.utcnow() is deprecated as of Python 3.12.
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode = {
        "user_id": user_id,
        "tenant_id": tenant_id,
        "role": role,
        "exp": expire
    }
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(user_id: int, tenant_id: int) -> str:
    """
    Create a long-lived refresh token.
    Used only to request a new access token; never used for API calls.
    """
    expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode = {
        "user_id": user_id,
        "tenant_id": tenant_id,
        "exp": expire,
        "type": "refresh"
    }
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str) -> TokenData | None:
    """
    Verify a JWT signature and return claims.
    Returns None if the token is expired, tampered with, or missing required claims.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    user_id = payload.get("user_id")
    tenant_id = payload.get("tenant_id")
    exp = payload.get("exp")
    if user_id is None or tenant_id is None or exp is None:
        return None
    return TokenData(
        user_id=user_id,
        tenant_id=tenant_id,
        role=payload.get("role") or "user",
        # exp is a Unix timestamp; interpret it as UTC rather than local time.
        exp=datetime.fromtimestamp(exp, tz=timezone.utc)
    )
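To make the header.payload.signature structure concrete, the following sketch builds and verifies an HS256 token by hand using only the standard library. It is meant for understanding the format, not as a replacement for python-jose: it skips expiry checks, and the helper names (b64url, sign_hs256, verify_hs256) are invented for this illustration.

```python
import base64
import hashlib
import hmac
import json
from typing import Optional

def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))

def sign_hs256(claims: dict, secret: bytes) -> str:
    """Return header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"

def verify_hs256(token: str, secret: bytes) -> Optional[dict]:
    """Return the claims if the signature matches, else None. Does not check exp."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        signature = b64url_decode(signature_b64)
    except ValueError:  # wrong number of parts or invalid base64
        return None
    expected = hmac.new(
        secret, f"{header_b64}.{payload_b64}".encode("ascii"), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, signature):
        return None
    return json.loads(b64url_decode(payload_b64))

token = sign_hs256({"user_id": 42, "tenant_id": 7, "role": "admin"}, b"demo-secret")
print(token.count("."))                       # 2
print(verify_hs256(token, b"demo-secret"))    # the original claims
print(verify_hs256(token, b"another-secret")) # None
```

Note that the payload is only encoded, not encrypted: anyone holding the token can read the claims with b64url_decode. The signature prevents modification, not disclosure.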
Login Endpoint
The login endpoint validates credentials and returns both access and refresh tokens:
from typing import Annotated
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from pydantic import BaseModel

# get_db (a session dependency) and User (a SQLAlchemy model with id, email,
# password_hash, tenant_id, role, and is_active) are assumed to be defined elsewhere.

app = FastAPI()
security = HTTPBearer()

class LoginRequest(BaseModel):
    email: str
    password: str

class LoginResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

@app.post("/auth/login", response_model=LoginResponse)
async def login(
    request: LoginRequest,
    session: Annotated[Session, Depends(get_db)]
):
    """
    Authenticate a user with email and password.
    Returns access_token (15 min) and refresh_token (7 days).
    """
    # Query user by email
    user = session.query(User).filter(User.email == request.email).first()
    if not user or not verify_password(request.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
            headers={"WWW-Authenticate": "Bearer"}
        )
    # Generate tokens
    access_token = create_access_token(
        user_id=user.id,
        tenant_id=user.tenant_id,
        role=user.role or "user"
    )
    refresh_token = create_refresh_token(
        user_id=user.id,
        tenant_id=user.tenant_id
    )
    return LoginResponse(
        access_token=access_token,
        refresh_token=refresh_token
    )
Protected Routes and Role-Based Access
Extract the token from the Authorization header and validate it:
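from fastapi import Depends
from fastapi.security import HTTPAuthorizationCredentials
from typing import Annotated

async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
) -> TokenData:
    """
    Extract and validate the JWT token from the Authorization header.
    Used as a dependency on protected endpoints.
    """
    token = credentials.credentials
    token_data = decode_token(token)
    # decode_token has already verified the signature, so the claims can be read
    # directly. Reject refresh tokens: they must never authorize API calls.
    if token_data is None or jwt.get_unverified_claims(token).get("type") == "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"}
        )
    return token_data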
async def require_admin(
    current_user: Annotated[TokenData, Depends(get_current_user)]
) -> TokenData:
    """
    Dependency that checks if the user has the 'admin' role.
    Inject this into endpoints that require admin access.
    """
    if current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required"
        )
    return current_user

# Protected endpoint examples:
@app.get("/me")
async def get_current_user_profile(
    current_user: Annotated[TokenData, Depends(get_current_user)]
):
    """Returns the authenticated user's profile. Any authenticated user can access."""
    return {
        "user_id": current_user.user_id,
        "tenant_id": current_user.tenant_id,
        "role": current_user.role
    }

@app.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    current_user: Annotated[TokenData, Depends(require_admin)],
    session: Annotated[Session, Depends(get_db)]
):
    """Delete a user. Only admins can call this endpoint."""
    # Scope the lookup to the caller's tenant so admins cannot delete across tenants.
    user = session.query(User).filter(
        User.id == user_id,
        User.tenant_id == current_user.tenant_id
    ).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    session.delete(user)
    session.commit()
    return {"deleted": user_id}
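If more roles than admin are needed, one possible generalization of the role check is a factory that builds a checker per set of allowed roles. The sketch below is framework-free so it runs on its own: CurrentUser is a hypothetical stand-in for TokenData, PermissionError stands in for the 403 HTTPException, and the "manager" role is invented for the example.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass(frozen=True)
class CurrentUser:
    """Hypothetical stand-in for the token claims (user_id, tenant_id, role)."""
    user_id: int
    tenant_id: int
    role: str

def require_roles(*allowed: str) -> Callable[[CurrentUser], CurrentUser]:
    """Build a checker that returns the user only if their role is allowed."""
    allowed_set = frozenset(allowed)

    def checker(user: CurrentUser) -> CurrentUser:
        if user.role not in allowed_set:
            # In a FastAPI dependency this would raise HTTPException with status 403.
            raise PermissionError(f"requires one of: {', '.join(sorted(allowed_set))}")
        return user

    return checker

require_admin_role = require_roles("admin")
require_staff_role = require_roles("admin", "manager")  # "manager" is an invented role

admin = CurrentUser(user_id=1, tenant_id=10, role="admin")
print(require_staff_role(admin).role)  # admin
```

In the FastAPI code above, the inner checker would take its user from Depends(get_current_user), and each route would depend on the checker returned by the factory.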
Refresh Token Endpoint
When the access token expires (after 15 minutes), the client calls the refresh endpoint to get a new access token without re-entering the password:
from fastapi.security import HTTPAuthorizationCredentials

@app.post("/auth/refresh", response_model=Token)
async def refresh_access_token(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    session: Annotated[Session, Depends(get_db)]
):
    """
    Exchange a refresh token for a new access token.
    The refresh token is long-lived (7 days); access token is short-lived (15 min).
    """
    refresh_token = credentials.credentials
    # Verify the signature and expiry once
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired refresh token"
        )
    # Only refresh tokens may be exchanged; an access token must not work here
    if payload.get("type") != "refresh" or payload.get("user_id") is None:
        raise HTTPException(status_code=401, detail="Not a refresh token")
    # Query the user to get current role (in case it changed)
    user = session.query(User).filter(User.id == payload["user_id"]).first()
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="User not found or inactive")
    # Issue new access token
    new_access_token = create_access_token(
        user_id=user.id,
        tenant_id=user.tenant_id,
        role=user.role or "user"
    )
    return Token(
        access_token=new_access_token,
        refresh_token=refresh_token  # Client can reuse the same refresh token
    )
Password Reset Flow
Implement a secure password reset using a time-limited token:
import secrets

class PasswordResetRequest(BaseModel):
    email: str

class PasswordReset(BaseModel):
    token: str
    new_password: str

# Return the same response whether or not the account exists (prevents user enumeration)
RESET_REQUESTED_RESPONSE = {"message": "If email exists, a reset link has been sent."}

@app.post("/auth/forgot-password")
async def forgot_password(
    request: PasswordResetRequest,
    session: Annotated[Session, Depends(get_db)]
):
    """
    Initiate a password reset. Send a reset token via email to the user.
    In production, store the token in Redis with a 1-hour expiry.
    """
    user = session.query(User).filter(User.email == request.email).first()
    if not user:
        return RESET_REQUESTED_RESPONSE
    # Generate an unguessable, URL-safe reset token
    reset_token = secrets.token_urlsafe(32)
    # In production, store in Redis: redis.setex(f"reset:{reset_token}", 3600, user.id)
    # For now, we'll skip the storage and assume it's validated elsewhere
    # Send email with reset link (pseudo-code):
    # send_email(user.email, f"https://yourapp.com/reset?token={reset_token}")
    return RESET_REQUESTED_RESPONSE

@app.post("/auth/reset-password")
async def reset_password(
    request: PasswordReset,
    session: Annotated[Session, Depends(get_db)]
):
    """
    Complete a password reset using a valid reset token.
    """
    # Verify token (in production: user_id = redis.get(f"reset:{request.token}"))
    user_id = None  # Placeholder
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired reset token"
        )
    user = session.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    user.password_hash = hash_password(request.new_password)
    session.commit()
    # Invalidate the reset token in Redis
    # redis.delete(f"reset:{request.token}")
    return {"message": "Password reset successfully"}
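The token storage left as a placeholder above could look roughly like the following. This is a sketch under stated assumptions: an in-memory dictionary stands in for Redis, the ResetTokenStore class and its method names are hypothetical, and hashing the token before storing it is an extra precaution that the original flow does not mention.

```python
import hashlib
import secrets
import time
from typing import Optional

RESET_TTL_SECONDS = 3600  # the 1-hour expiry mentioned in the docstring above

class ResetTokenStore:
    """In-memory stand-in for Redis: maps sha256(token) to (user_id, expires_at)."""

    def __init__(self) -> None:
        self._entries = {}

    @staticmethod
    def _key(token: str) -> str:
        # Store only a hash so a leaked store does not reveal usable tokens.
        return hashlib.sha256(token.encode("utf-8")).hexdigest()

    def issue(self, user_id: int, now: Optional[float] = None) -> str:
        """Create a token for user_id and remember it until it expires."""
        now = time.time() if now is None else now
        token = secrets.token_urlsafe(32)
        self._entries[self._key(token)] = (user_id, now + RESET_TTL_SECONDS)
        return token

    def consume(self, token: str, now: Optional[float] = None) -> Optional[int]:
        """Return the user_id once, then forget the token. None if unknown or expired."""
        now = time.time() if now is None else now
        entry = self._entries.pop(self._key(token), None)
        if entry is None:
            return None
        user_id, expires_at = entry
        return user_id if now < expires_at else None

store = ResetTokenStore()
token = store.issue(user_id=5)
print(store.consume(token))  # 5
print(store.consume(token))  # None, because the token is single-use
```

Popping the entry on first use makes the token single-use even if the password update fails later, so a retry requires requesting a new link.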
Key Takeaways
- JWT tokens are stateless; verification requires only signature validation, enabling horizontal scaling.
- Access tokens are short-lived (15 min); refresh tokens are long-lived (7 days) and stored securely by the client.
- Role-based access control (RBAC) is enforced via dependencies that check the role claim in the token.
- Passwords are hashed with bcrypt before storage; never store plain passwords.
- Password reset links are time-limited tokens sent via email; the user resets via a secondary endpoint.
Frequently Asked Questions
Can I store the refresh token in an HttpOnly cookie instead of the response body?
Yes. Return the refresh token as an HttpOnly, Secure, SameSite=Strict cookie (response.set_cookie(...)). The client automatically includes it in refresh requests. This prevents JavaScript from stealing it (XSS mitigation).
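As an illustration of what those cookie attributes look like on the wire, this standard-library sketch builds the Set-Cookie header value directly. The function name, the Path restriction, and the Max-Age value are assumptions made for the example; in FastAPI, response.set_cookie would produce a header of the same shape.

```python
from http.cookies import SimpleCookie

REFRESH_MAX_AGE = 7 * 24 * 60 * 60  # 7 days, matching the refresh token lifetime

def refresh_cookie_header(refresh_token: str) -> str:
    """Build the Set-Cookie header value that carries the refresh token."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = refresh_token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True          # not readable from JavaScript
    morsel["secure"] = True            # sent over HTTPS only
    morsel["samesite"] = "Strict"      # not sent on cross-site requests
    morsel["path"] = "/auth/refresh"   # assumed: only the refresh endpoint needs it
    morsel["max-age"] = REFRESH_MAX_AGE
    return morsel.OutputString()

print(refresh_cookie_header("header.payload.signature"))
```

With this approach the refresh endpoint would read the token from the cookie rather than from the Authorization header used earlier in this guide.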
What if the access token is stolen?
An attacker can use it until it expires, at most 15 minutes after it was issued. After that, they need the refresh token to get a new access token. To limit the damage from a stolen refresh token, track refresh tokens server-side (Redis or a database) and check them during refresh. Revoking all of a user's refresh tokens is also how you let users "log out" everywhere.
How do I handle token revocation without a database lookup on every request?
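Give each refresh token a unique ID (for example a jti claim). On logout, add that ID to a revocation set in Redis with ttl = token_exp - now, so the entry disappears at the moment the token would have expired anyway. Check the set only during refresh, not on every request. The cost of skipping the per-request lookup is that an already-issued access token stays usable until it expires, up to 15 minutes after logout.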
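The expiry behaviour can be sketched without Redis. The class below is a hypothetical in-memory stand-in that keeps each revoked token ID only until the token's own expiry time, which mirrors setting ttl = token_exp - now on a Redis key.

```python
import time
from typing import Optional

class RevocationList:
    """In-memory stand-in for a Redis set whose entries expire with the token."""

    def __init__(self) -> None:
        self._revoked = {}  # token ID -> token expiry (Unix time)

    def revoke(self, token_id: str, token_exp: float, now: Optional[float] = None) -> None:
        """Mark a token ID as revoked until the token itself would expire."""
        now = time.time() if now is None else now
        if token_exp > now:  # an already-expired token needs no entry
            self._revoked[token_id] = token_exp

    def is_revoked(self, token_id: str, now: Optional[float] = None) -> bool:
        """Check a token ID; call this during refresh only."""
        now = time.time() if now is None else now
        exp = self._revoked.get(token_id)
        if exp is None:
            return False
        if exp <= now:
            # Equivalent of the Redis TTL firing: drop the stale entry.
            del self._revoked[token_id]
            return False
        return True

revoked = RevocationList()
revoked.revoke("token-id-1", token_exp=time.time() + 60)
print(revoked.is_revoked("token-id-1"))  # True
print(revoked.is_revoked("token-id-2"))  # False
```

Because entries expire together with their tokens, the list stays bounded by the number of tokens revoked within one refresh-token lifetime.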
Should I include the user's full profile in the JWT?
No. Include minimal claims (user_id, tenant_id, role). For other data (email, name), fetch from the database or cache. JWTs are visible to the client; don't include secrets or sensitive fields.
How do I test authentication without valid credentials?
Mock the get_current_user() dependency in tests. Use app.dependency_overrides[get_current_user] = mock_user_dependency. For integration tests, generate a valid token with create_access_token().