Authentication in GraphQL APIs
Authentication verifies who a user is; authorization controls what they can access. This article teaches you to implement JWT authentication in Strawberry, protect resolvers based on user identity, and enforce field-level authorization. By the end, you'll have a production-grade API where users log in, receive tokens, and only see their own data.
I learned authorization the hard way: shipping an API without field-level checks, then discovering that changing one line in a GraphQL query revealed other users' private data. This article prevents that mistake by teaching defense-in-depth: authenticate requests, authorize resolvers, and verify field access.
JWT Authentication Flow
JSON Web Tokens (JWT) encode user info (ID, roles) in a signed token. The flow:
- Client sends login credentials (email, password) to a mutation.
- Server verifies credentials, creates a JWT token.
- Client includes the token in subsequent requests (Authorization header:
Bearer <token>). - Server validates the token and extracts the user ID.
Define login and token verification:
import strawberry
from typing import Optional
import jwt
from datetime import datetime, timedelta
SECRET = "your-secret-key" # Use environment variable in production.
@strawberry.type
class AuthPayload:
token: str
user_id: int
@strawberry.input
class LoginInput:
email: str
password: str
@strawberry.type
class Mutation:
@strawberry.mutation
async def login(self, input: LoginInput) -> Optional[AuthPayload]:
"""Authenticate a user and return a JWT token."""
# Fetch user from database and verify password.
user = await db.fetch_user_by_email(input.email)
if not user or not verify_password(input.password, user.password_hash):
return None # Or raise GraphQLError for clearer error messaging.
# Create a JWT token.
token = jwt.encode(
{
'user_id': user.id,
'email': user.email,
'exp': datetime.utcnow() + timedelta(hours=24) # Expires in 24 hours.
},
SECRET,
algorithm='HS256'
)
return AuthPayload(token=token, user_id=user.id)
def verify_password(password: str, hash: str) -> bool:
"""Verify a password against a bcrypt hash."""
import bcrypt
return bcrypt.checkpw(password.encode(), hash.encode())
schema = strawberry.Schema(query=Query, mutation=Mutation)
Client calls:
mutation {
login(input: { email: "[email protected]", password: "secret123" }) {
token
userId
}
}
Response:
{
"data": {
"login": {
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjogMX0...",
"userId": 1
}
}
}
Extracting User from Token in Context
When a client includes the token in requests, extract it from the Authorization header:
from starlette.requests import Request
def get_context(request: Request) -> dict:
"""Extract user from JWT token in the Authorization header."""
user_id = None
auth_header = request.headers.get('Authorization', '')
if auth_header.startswith('Bearer '):
token = auth_header[7:] # Remove 'Bearer ' prefix.
try:
payload = jwt.decode(token, SECRET, algorithms=['HS256'])
user_id = payload['user_id']
except jwt.InvalidTokenError:
pass # Token is invalid; user_id remains None.
return {
'db': db,
'user_id': user_id, # None if not authenticated.
'user': None if not user_id else await db.fetch_user(user_id),
}
In FastAPI:
from fastapi import FastAPI, Depends
from starlette.requests import Request
app = FastAPI()
graphql_app = GraphQL(schema, context_getter=get_context)
app.add_route("/graphql", graphql_app)
FastAPI passes the Request object to the context getter automatically.
Field-Level Authorization
Protect resolvers so only authorized users can access them:
import strawberry
from strawberry.permission import BasePermission
class IsAuthenticated(BasePermission):
"""Only authenticated users (with a valid token) can access this field."""
message = "Not authenticated"
def has_permission(self, source, info: strawberry.Info, **kwargs) -> bool:
return info.context.get('user_id') is not None
class IsOwner(BasePermission):
"""Only the user who created this resource can access it."""
message = "Not authorized"
def has_permission(self, source, info: strawberry.Info, **kwargs) -> bool:
user_id = info.context.get('user_id')
# Assume 'source' (the parent object) has an 'owner_id' field.
return source and source.owner_id == user_id
Apply permissions to resolvers:
@strawberry.type
class Post:
id: int
title: str
owner_id: int
content: str
@strawberry.field(permission_classes=[IsOwner])
def content(self) -> str:
"""Only the post owner can read the full content."""
return self.content
@strawberry.type
class Query:
@strawberry.field
async def me(self, info: strawberry.Info) -> Optional["User"]:
"""Get the current authenticated user."""
user_id = info.context.get('user_id')
if not user_id:
return None
return await db.fetch_user(user_id)
@strawberry.field(permission_classes=[IsAuthenticated])
async def my_posts(self, info: strawberry.Info) -> list[Post]:
"""Fetch the current user's posts."""
user_id = info.context.get('user_id')
return await db.fetch_posts_by_owner(user_id)
A client without a token (no Authorization header) gets an error when accessing my_posts. A client with a token but trying to read a post owned by another user gets an error on the content field.
Role-Based Access Control (RBAC)
Extend permissions to check user roles:
import strawberry
@strawberry.enum
class Role(Enum):
ADMIN = "admin"
MODERATOR = "moderator"
USER = "user"
class HasRole(BasePermission):
"""Allow access if user has one of the specified roles."""
def __init__(self, allowed_roles: list[Role]):
self.allowed_roles = allowed_roles
def has_permission(self, source, info: strawberry.Info, **kwargs) -> bool:
user = info.context.get('user')
return user and user.role in self.allowed_roles
@strawberry.type
class Mutation:
@strawberry.mutation(permission_classes=[HasRole([Role.ADMIN])])
async def delete_user(self, user_id: int) -> bool:
"""Delete a user (admin only)."""
await db.delete_user(user_id)
return True
Only users with the ADMIN role can call delete_user. Others get a permission error.
Refresh Tokens and Token Rotation
JWT tokens expire (we set exp: 24 hours above). Refresh tokens allow clients to get a new token without re-logging in:
@strawberry.type
class AuthPayload:
access_token: str # Short-lived, 1 hour.
refresh_token: str # Long-lived, 30 days.
user_id: int
@strawberry.type
class Mutation:
@strawberry.mutation
async def login(self, input: LoginInput) -> Optional[AuthPayload]:
"""Log in and return access + refresh tokens."""
user = await db.fetch_user_by_email(input.email)
if not user or not verify_password(input.password, user.password_hash):
return None
access_token = jwt.encode(
{
'user_id': user.id,
'exp': datetime.utcnow() + timedelta(hours=1),
'type': 'access'
},
SECRET,
algorithm='HS256'
)
refresh_token = jwt.encode(
{
'user_id': user.id,
'exp': datetime.utcnow() + timedelta(days=30),
'type': 'refresh'
},
SECRET,
algorithm='HS256'
)
return AuthPayload(
access_token=access_token,
refresh_token=refresh_token,
user_id=user.id
)
@strawberry.mutation
async def refresh_token(self, refresh_token: str) -> Optional[AuthPayload]:
"""Exchange a refresh token for a new access token."""
try:
payload = jwt.decode(refresh_token, SECRET, algorithms=['HS256'])
if payload['type'] != 'refresh':
return None
user_id = payload['user_id']
user = await db.fetch_user(user_id)
new_access_token = jwt.encode(
{
'user_id': user_id,
'exp': datetime.utcnow() + timedelta(hours=1),
'type': 'access'
},
SECRET,
algorithm='HS256'
)
return AuthPayload(
access_token=new_access_token,
refresh_token=refresh_token, # Reuse the same refresh token.
user_id=user_id
)
except jwt.InvalidTokenError:
return None
Clients store the refresh token securely (httpOnly cookie) and use it to get a new access token every hour.
Error Handling for Auth
Return clear error messages:
import strawberry
class AuthError(Exception):
"""Base auth error."""
pass
class InvalidCredentials(AuthError):
"""User email/password don't match."""
pass
@strawberry.type
class Mutation:
@strawberry.mutation
async def login(self, input: LoginInput) -> Optional[AuthPayload]:
"""Log in."""
user = await db.fetch_user_by_email(input.email)
if not user:
raise strawberry.errors.GraphQLError(
"Invalid email or password.",
extensions={"code": "INVALID_CREDENTIALS"}
)
if not verify_password(input.password, user.password_hash):
raise strawberry.errors.GraphQLError(
"Invalid email or password.",
extensions={"code": "INVALID_CREDENTIALS"}
)
token = jwt.encode(...)
return AuthPayload(token=token, user_id=user.id)
Never reveal whether an email exists (to prevent account enumeration). Always return "Invalid email or password."
Comparison: Auth Patterns
| Pattern | Use Case | Security |
|---|---|---|
| API Key | Simple public API, read-only access | Low (keys can be leaked) |
| JWT (short-lived) | Web app, mobile app, SPA | High (expires quickly) |
| OAuth2 | Third-party login (Google, GitHub) | High (delegated to provider) |
| Session cookies | Traditional web app | Medium (CSRF risk) |
Key Takeaways
- JWT tokens encode user info and are signed; clients include them in Authorization headers.
- Use context to extract and store the authenticated user ID and object.
- Permissions (BasePermission subclasses) check user identity and roles at the resolver level.
- Refresh tokens allow clients to get new access tokens without re-logging in.
- Always use HTTPS in production to prevent token interception.
- Never reveal whether a user exists (return generic error messages for invalid credentials).
Frequently Asked Questions
Should I store tokens in localStorage or cookies?
localStorage is accessible to JavaScript, increasing XSS risk. httpOnly cookies are more secure but vulnerable to CSRF. Use httpOnly + SameSite=Strict cookies for web apps; use secure storage (Keychain, Keystore) for mobile apps.
What if a token is compromised?
Short-lived tokens (1 hour) limit damage. Users can also revoke all their tokens by changing their password. For extra security, implement a token blacklist (Redis) to revoke tokens immediately.
Can I use GraphQL to check if a token is valid?
Yes, include a @strawberry.field async def verify_token(self) -> bool: in Query. However, don't rely on this for authorization—always verify tokens server-side in resolvers.
How do I handle logout?
Logout is implicit: the client deletes the token. If you want to revoke tokens immediately (e.g., on account deletion), store revoked token IDs in Redis and check them in the context getter.