Skip to main content

Implementing Refresh Tokens in FastAPI

Short-lived access tokens (15 minutes) limit exposure if stolen, but users shouldn't re-enter passwords every 15 minutes. Refresh tokens solve this: long-lived tokens (7 days to 1 year) stored securely that issue new access tokens without user interaction. When an access token expires, the client uses the refresh token to get a new one. If the refresh token is compromised, it's still easier to revoke than re-authenticate millions of users.

Token Lifecycle: Access and Refresh

  1. User logs in: API issues both an access token (15 min) and refresh token (7 days).
  2. User accesses API: Token included in every request. Works for 15 minutes.
  3. Access token expires: Client catches 401 error, sends refresh token to /refresh endpoint.
  4. API issues new access token: Fresh 15-minute window. User stays logged in without password re-entry.
  5. Refresh token expires: User must log in again.

A compromised access token is useful for only 15 minutes. A compromised refresh token can be rotated or revoked immediately, affecting all derived access tokens within seconds.

Issuing Access and Refresh Tokens

Modify your login endpoint to return both tokens:

# security.py
from jose import jwt
from datetime import datetime, timedelta
from config import SECRET_KEY, ALGORITHM

def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
"""Create short-lived access token (15 min)."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(data: dict, expires_delta: timedelta | None = None) -> str:
"""Create long-lived refresh token (7 days)."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(days=7)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str, expected_type: str = "access") -> dict | None:
"""Decode token; return None if invalid or wrong type."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != expected_type:
return None # Token type mismatch
return payload
except jwt.JWTError:
return None

Update the login endpoint:

# main.py
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from datetime import timedelta
from security import (
hash_password, verify_password, create_access_token,
create_refresh_token
)

app = FastAPI()

class LoginRequest(BaseModel):
username: str
password: str

class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str

fake_users = {
"alice": {
"username": "alice",
"hashed_password": hash_password("secret123"),
"email": "[email protected]"
}
}

@app.post("/login", response_model=TokenResponse)
async def login(credentials: LoginRequest):
"""Authenticate user and issue access + refresh tokens."""
user = fake_users.get(credentials.username)

if not user or not verify_password(credentials.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password"
)

access_token = create_access_token(data={"sub": user["username"]})
refresh_token = create_refresh_token(data={"sub": user["username"]})

return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}

The Refresh Endpoint

Create a /refresh endpoint that accepts a refresh token and returns a new access token:

# main.py (continued)
class RefreshRequest(BaseModel):
refresh_token: str

class AccessTokenResponse(BaseModel):
access_token: str
token_type: str

@app.post("/refresh", response_model=AccessTokenResponse)
async def refresh_access_token(request: RefreshRequest):
"""Issue a new access token using a refresh token."""
payload = decode_token(request.refresh_token, expected_type="refresh")

if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired refresh token"
)

username = payload.get("sub")
new_access_token = create_access_token(data={"sub": username})

return {
"access_token": new_access_token,
"token_type": "bearer"
}

Client-Side Token Handling

The client stores both tokens and refreshes automatically on 401:

// client.js (example)
let accessToken = null;
let refreshToken = null;

async function login(username, password) {
const response = await fetch("/login", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ username, password })
});

const data = await response.json();
accessToken = data.access_token;
refreshToken = data.refresh_token;

// Store in httpOnly cookie or secure storage
localStorage.setItem("refresh_token", refreshToken);
// Don't store access token in localStorage (XSS risk)
}

async function makeAuthenticatedRequest(url, options = {}) {
let response = await fetch(url, {
...options,
headers: {
...options.headers,
"Authorization": `Bearer ${accessToken}`
}
});

// If access token expired, refresh and retry
if (response.status === 401) {
const refreshed = await refreshAccessToken();
if (!refreshed) {
// Refresh failed; user must log in again
window.location.href = "/login";
return;
}

response = await fetch(url, {
...options,
headers: {
...options.headers,
"Authorization": `Bearer ${accessToken}`
}
});
}

return response;
}

async function refreshAccessToken() {
const response = await fetch("/refresh", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ refresh_token: refreshToken })
});

if (response.ok) {
const data = await response.json();
accessToken = data.access_token;
return true;
}

// Refresh failed; clear tokens and log user out
accessToken = null;
refreshToken = null;
localStorage.removeItem("refresh_token");
return false;
}

Storing Refresh Tokens Securely

For production, store issued refresh tokens in a database (with user association and metadata). This enables revocation:

# models.py
from sqlalchemy import Column, String, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import uuid

Base = declarative_base()

class RefreshTokenRecord(Base):
__tablename__ = "refresh_tokens"

id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
username = Column(String, index=True)
token_hash = Column(String, unique=True) # Hash of refresh token
is_revoked = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
expires_at = Column(DateTime)
last_used_at = Column(DateTime, nullable=True)

When issuing a refresh token, hash and store it:

# security.py
from passlib.context import CryptContext
import secrets

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_token(token: str) -> str:
"""Hash a token for storage."""
return pwd_context.hash(token)

# main.py
@app.post("/login", response_model=TokenResponse)
async def login(credentials: LoginRequest, db: Session = Depends(get_db)):
"""Authenticate and issue tokens."""
user = fake_users.get(credentials.username)

if not user or not verify_password(credentials.password, user["hashed_password"]):
raise HTTPException(status_code=401, detail="Invalid credentials")

access_token = create_access_token(data={"sub": user["username"]})
refresh_token = create_refresh_token(data={"sub": user["username"]})

# Store refresh token hash in database
refresh_token_record = RefreshTokenRecord(
username=user["username"],
token_hash=hash_token(refresh_token),
expires_at=datetime.utcnow() + timedelta(days=7)
)
db.add(refresh_token_record)
db.commit()

return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}

@app.post("/refresh", response_model=AccessTokenResponse)
async def refresh_access_token(
request: RefreshRequest,
db: Session = Depends(get_db)
):
"""Issue new access token; validate refresh token against database."""
payload = decode_token(request.refresh_token, expected_type="refresh")

if payload is None:
raise HTTPException(status_code=401, detail="Invalid refresh token")

username = payload.get("sub")

# Find and validate refresh token record
token_record = db.query(RefreshTokenRecord).filter(
RefreshTokenRecord.username == username,
RefreshTokenRecord.is_revoked == False
).first()

if not token_record or not pwd_context.verify(
request.refresh_token,
token_record.token_hash
):
raise HTTPException(status_code=401, detail="Invalid refresh token")

if token_record.expires_at < datetime.utcnow():
raise HTTPException(status_code=401, detail="Refresh token expired")

# Update last used
token_record.last_used_at = datetime.utcnow()
db.commit()

new_access_token = create_access_token(data={"sub": username})
return {"access_token": new_access_token, "token_type": "bearer"}

Logout and Token Revocation

Revoke refresh tokens to log users out across all devices:

@app.post("/logout")
async def logout(
current_user: dict = Depends(get_current_user),
request: RefreshRequest,
db: Session = Depends(get_db)
):
"""Revoke refresh token to log out user."""
token_record = db.query(RefreshTokenRecord).filter(
RefreshTokenRecord.username == current_user["username"]
).all()

for record in token_record:
record.is_revoked = True

db.commit()
return {"message": "Logged out from all devices"}

Rotating Refresh Tokens

For additional security, issue a new refresh token on each refresh (and revoke the old one):

def create_refresh_token_record(
username: str,
db: Session
) -> RefreshTokenRecord:
"""Create a new refresh token record."""
refresh_token = create_refresh_token(data={"sub": username})
token_record = RefreshTokenRecord(
username=username,
token_hash=hash_token(refresh_token),
expires_at=datetime.utcnow() + timedelta(days=7)
)
db.add(token_record)
db.commit()
return refresh_token

@app.post("/refresh", response_model=dict)
async def refresh_access_token(
request: RefreshRequest,
db: Session = Depends(get_db)
):
"""Issue new tokens; rotate refresh token."""
# Validate old refresh token
payload = decode_token(request.refresh_token, expected_type="refresh")

if payload is None:
raise HTTPException(status_code=401, detail="Invalid refresh token")

username = payload.get("sub")

# Revoke old refresh token
old_record = db.query(RefreshTokenRecord).filter(
RefreshTokenRecord.username == username
).first()

if old_record:
old_record.is_revoked = True

# Issue new access token and new refresh token
new_access_token = create_access_token(data={"sub": username})
new_refresh_token = create_refresh_token_record(username, db)

return {
"access_token": new_access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer"
}

Key Takeaways

  • Access tokens are short-lived (15 min); refresh tokens are long-lived (7 days).
  • Clients use refresh tokens to silently renew access without user interaction.
  • Store refresh token hashes in a database for revocation and validation.
  • Logout revokes all refresh tokens; users must re-authenticate on next access.
  • Token rotation (issuing new refresh token on each use) prevents token replay.

Frequently Asked Questions

What if a refresh token is stolen?

An attacker can issue new access tokens for 7 days (or until the refresh token expires). You can revoke it immediately via the database, invalidating all derived access tokens. Detect compromised tokens via unusual access patterns (IP addresses, user agents).

Should I store refresh tokens in httpOnly cookies or localStorage?

httpOnly cookies (cannot be read by JavaScript) are safer but vulnerable to CSRF. localStorage is simpler but vulnerable to XSS. A hybrid: store in httpOnly cookie + CSRF token. For maximum security in SPAs, use httpOnly cookies with SameSite attribute.

Can I use the same token for both access and refresh?

No. The server must distinguish between them (using the type claim). If an access token is leaked, an attacker could misuse it as a refresh token, extending their access indefinitely.

What's a reasonable refresh token lifetime?

7 days is common for web apps; 30 days for mobile apps (users expect to stay logged in). For high-security APIs, 1 day. For internal services, longer (30+ days). Balance convenience with revocation responsiveness.

Further Reading