Authentication in GraphQL APIs

Authentication verifies who a user is; authorization controls what they can access. This article teaches you to implement JWT authentication in Strawberry, protect resolvers based on user identity, and enforce field-level authorization. By the end, you'll have a production-grade API where users log in, receive tokens, and only see their own data.

I learned authorization the hard way: shipping an API without field-level checks, then discovering that changing one line in a GraphQL query revealed other users' private data. This article prevents that mistake by teaching defense-in-depth: authenticate requests, authorize resolvers, and verify field access.

JWT Authentication Flow

JSON Web Tokens (JWT) encode user info (ID, roles) in a signed token. The flow:

  1. Client sends login credentials (email, password) to a mutation.
  2. Server verifies the credentials and creates a signed JWT.
  3. Client includes the token in subsequent requests (Authorization header: Bearer <token>).
  4. Server validates the token and extracts the user ID.
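To make the token format concrete, here is a minimal standard-library sketch of how an HS256 token is assembled and checked. It is an illustration only, not a replacement for a JWT library: the helper names (`sign_hs256`, `verify_hs256`) are hypothetical, and the sketch skips claim validation such as `exp`. It shows that the payload is merely base64url-encoded (readable by anyone) while the signature prevents tampering.

```python
import base64
import hashlib
import hmac
import json
from typing import Optional


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature by hand (illustration only)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode(), signing_input.encode(), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str) -> Optional[dict]:
    """Return the payload if the signature matches, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = hmac.new(
            secret.encode(),
            f"{header_b64}.{payload_b64}".encode(),
            hashlib.sha256,
        ).digest()
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
            return None
        return json.loads(b64url_decode(payload_b64))
    except ValueError:
        return None  # Wrong number of segments or malformed base64/JSON.
```

Because anyone holding the token can decode the payload, keep secrets out of the claims; the signature only proves the server issued them.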

Define login and token verification:

from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
import jwt  # PyJWT
import strawberry

SECRET = "your-secret-key"  # Use an environment variable in production.

# `db` (the data-access layer) and `Query` are assumed to be defined elsewhere.

@strawberry.type
class AuthPayload:
    token: str
    user_id: int

@strawberry.input
class LoginInput:
    email: str
    password: str

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def login(self, input: LoginInput) -> Optional[AuthPayload]:
        """Authenticate a user and return a JWT token."""
        # Fetch user from database and verify password.
        user = await db.fetch_user_by_email(input.email)
        if not user or not verify_password(input.password, user.password_hash):
            return None  # Or raise GraphQLError for clearer error messaging.

        # Create a JWT token.
        token = jwt.encode(
            {
                'user_id': user.id,
                'email': user.email,
                # Timezone-aware expiry (24 hours); datetime.utcnow() is deprecated since Python 3.12.
                'exp': datetime.now(timezone.utc) + timedelta(hours=24),
            },
            SECRET,
            algorithm='HS256',
        )

        return AuthPayload(token=token, user_id=user.id)

def verify_password(password: str, password_hash: str) -> bool:
    """Verify a password against a bcrypt hash."""
    return bcrypt.checkpw(password.encode(), password_hash.encode())

schema = strawberry.Schema(query=Query, mutation=Mutation)

Client calls:

mutation {
  login(input: { email: "user@example.com", password: "secret123" }) {
    token
    userId
  }
}

Response:

{
  "data": {
    "login": {
      "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjogMX0...",
      "userId": 1
    }
  }
}
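The SECRET constant in the login example is hard-coded, with a note to use an environment variable in production. One way that could look is sketched below. The variable name `JWT_SECRET` and the helper `load_jwt_secret` are assumptions for illustration; the 32-character minimum reflects the JWA requirement that HS256 keys be at least 256 bits.

```python
import os


def load_jwt_secret(env_var: str = "JWT_SECRET", min_length: int = 32) -> str:
    """Read the signing secret from the environment; fail fast if missing or short."""
    secret = os.environ.get(env_var, "")
    if len(secret) < min_length:
        raise RuntimeError(
            f"{env_var} must be set to at least {min_length} characters"
        )
    return secret


# At application start-up (hypothetical usage):
# SECRET = load_jwt_secret()
```

Failing at start-up is a deliberate choice here: a missing secret surfaces immediately instead of as signature errors on the first login.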

Extracting User from Token in Context

When a client includes the token in requests, extract it from the Authorization header:

import jwt
from starlette.requests import Request

# SECRET and db are the same objects used in the login example.

async def get_context(request: Request) -> dict:
    """Extract the user from the JWT in the Authorization header."""
    user_id = None

    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        token = auth_header[7:]  # Remove the 'Bearer ' prefix.
        try:
            payload = jwt.decode(token, SECRET, algorithms=['HS256'])
            user_id = payload.get('user_id')
        except jwt.InvalidTokenError:
            pass  # Token is invalid or expired; user_id remains None.

    return {
        'db': db,
        'user_id': user_id,  # None if not authenticated.
        # The function is async so that this lookup can be awaited.
        'user': await db.fetch_user(user_id) if user_id is not None else None,
    }

In FastAPI:

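from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter

app = FastAPI()
# schema and get_context are the objects defined in the earlier examples.
graphql_app = GraphQLRouter(schema, context_getter=get_context)

app.include_router(graphql_app, prefix="/graphql")

Strawberry resolves the context getter as a FastAPI dependency, so the Request object is passed to it automatically.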
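The context getter above slices the header with startswith('Bearer '), which rejects a lowercase scheme and accepts stray whitespace inside the token. A slightly stricter parser might look like the sketch below; `extract_bearer_token` is a hypothetical helper, not part of Strawberry or Starlette, and it only parses the header without verifying the token.

```python
from typing import Optional


def extract_bearer_token(header_value: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not header_value:
        return None
    scheme, _, token = header_value.strip().partition(" ")
    token = token.strip()
    # The auth scheme is case-insensitive (RFC 7235); the token itself
    # must be non-empty and contain no spaces.
    if scheme.lower() != "bearer" or not token or " " in token:
        return None
    return token
```

In the context getter, the result would then be passed to jwt.decode only when it is not None.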

Field-Level Authorization

Protect resolvers so only authorized users can access them:

import strawberry
from strawberry.permission import BasePermission

class IsAuthenticated(BasePermission):
    """Only authenticated users (with a valid token) can access this field."""

    message = "Not authenticated"

    def has_permission(self, source, info: strawberry.Info, **kwargs) -> bool:
        return info.context.get('user_id') is not None

class IsOwner(BasePermission):
    """Only the user who created this resource can access it."""

    message = "Not authorized"

    def has_permission(self, source, info: strawberry.Info, **kwargs) -> bool:
        user_id = info.context.get('user_id')
        # Assume 'source' (the parent object) has an 'owner_id' field.
        # Explicit None checks return a real bool and stop an anonymous user
        # (user_id is None) from matching a resource whose owner_id is None.
        return (
            source is not None
            and user_id is not None
            and source.owner_id == user_id
        )

Apply permissions to resolvers:

@strawberry.type
class Post:
    id: int
    title: str
    owner_id: int
    # Kept out of the schema; a 'content' attribute and a 'content' resolver
    # cannot share one name, so the raw value is exposed only via the resolver.
    raw_content: strawberry.Private[str]

    @strawberry.field(permission_classes=[IsOwner])
    def content(self) -> str:
        """Only the post owner can read the full content."""
        return self.raw_content

@strawberry.type
class Query:
    @strawberry.field
    async def me(self, info: strawberry.Info) -> Optional["User"]:
        """Get the current authenticated user."""
        user_id = info.context.get('user_id')
        if not user_id:
            return None
        return await db.fetch_user(user_id)

    @strawberry.field(permission_classes=[IsAuthenticated])
    async def my_posts(self, info: strawberry.Info) -> list[Post]:
        """Fetch the current user's posts."""
        user_id = info.context.get('user_id')
        return await db.fetch_posts_by_owner(user_id)

A client without a token (no Authorization header) gets an error when accessing my_posts. A client with a token but trying to read a post owned by another user gets an error on the content field.
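One subtle point in ownership checks: in Python, None == None is true, so comparing an anonymous viewer's ID with the owner_id of an unowned resource would grant access unless both sides are checked first. The sketch below isolates that logic as a plain function so it can be unit-tested without a GraphQL server. `PostRecord` and `is_owner` are hypothetical names, not part of Strawberry.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PostRecord:
    """Minimal stand-in for a stored post."""
    id: int
    owner_id: Optional[int]


def is_owner(resource: Optional[PostRecord], viewer_id: Optional[int]) -> bool:
    """True only when a known viewer owns an existing resource."""
    if resource is None or viewer_id is None:
        return False  # Anonymous viewers and missing resources never match.
    return resource.owner_id == viewer_id
```

A permission class can delegate to a function like this, which keeps the rule in one place if the data layer also needs it.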

Role-Based Access Control (RBAC)

Extend permissions to check user roles:

from enum import Enum

import strawberry
from strawberry.permission import BasePermission

@strawberry.enum
class Role(Enum):
    ADMIN = "admin"
    MODERATOR = "moderator"
    USER = "user"

def has_role(*allowed_roles: Role) -> type[BasePermission]:
    """Build a permission class that allows any of the given roles.

    Strawberry instantiates permission classes itself, so the field needs
    a class here rather than a pre-built instance.
    """

    class HasRole(BasePermission):
        """Allow access if user has one of the specified roles."""

        message = "Insufficient role"

        def has_permission(self, source, info: strawberry.Info, **kwargs) -> bool:
            user = info.context.get('user')
            return user is not None and user.role in allowed_roles

    return HasRole

@strawberry.type
class Mutation:
    @strawberry.mutation(permission_classes=[has_role(Role.ADMIN)])
    async def delete_user(self, user_id: int) -> bool:
        """Delete a user (admin only)."""
        await db.delete_user(user_id)
        return True

Only users with the ADMIN role can call delete_user. Others get a permission error.
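If the user record stores the role as a plain string (an assumption; the article does not say how roles are persisted), a direct membership test against enum members silently fails, because "admin" is not equal to Role.ADMIN. The sketch below converts the stored value first and denies unknown roles. `role_allowed` is a hypothetical helper, and the plain Enum mirrors the Role enum without the Strawberry decorator.

```python
from enum import Enum
from typing import Iterable, Optional


class Role(Enum):
    ADMIN = "admin"
    MODERATOR = "moderator"
    USER = "user"


def role_allowed(stored_role: Optional[str], allowed: Iterable[Role]) -> bool:
    """Compare a stored role string against the allowed Role members."""
    if stored_role is None:
        return False
    try:
        role = Role(stored_role)  # Look up the member by its value.
    except ValueError:
        return False  # Unknown role strings are denied, not trusted.
    return role in set(allowed)
```

Denying on an unknown string is the conservative default: a typo or a retired role in the database never widens access.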

Refresh Tokens and Token Rotation

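JWTs expire (the login example above sets exp to 24 hours). Refresh tokens let clients obtain a new access token without logging in again. The example below shortens the access token to 1 hour and hands back the same refresh token on every refresh; full rotation would instead issue a new refresh token each time and invalidate the old one: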

from datetime import datetime, timedelta, timezone

@strawberry.type
class AuthPayload:
    access_token: str  # Short-lived, 1 hour.
    refresh_token: str  # Long-lived, 30 days.
    user_id: int

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def login(self, input: LoginInput) -> Optional[AuthPayload]:
        """Log in and return access + refresh tokens."""
        user = await db.fetch_user_by_email(input.email)
        if not user or not verify_password(input.password, user.password_hash):
            return None

        access_token = jwt.encode(
            {
                'user_id': user.id,
                'exp': datetime.now(timezone.utc) + timedelta(hours=1),
                'type': 'access',
            },
            SECRET,
            algorithm='HS256',
        )

        refresh_token = jwt.encode(
            {
                'user_id': user.id,
                'exp': datetime.now(timezone.utc) + timedelta(days=30),
                'type': 'refresh',
            },
            SECRET,
            algorithm='HS256',
        )

        return AuthPayload(
            access_token=access_token,
            refresh_token=refresh_token,
            user_id=user.id,
        )

    @strawberry.mutation
    async def refresh_token(self, refresh_token: str) -> Optional[AuthPayload]:
        """Exchange a refresh token for a new access token."""
        try:
            payload = jwt.decode(refresh_token, SECRET, algorithms=['HS256'])
            # .get() avoids a KeyError on tokens that lack a 'type' claim.
            if payload.get('type') != 'refresh':
                return None

            user_id = payload['user_id']
            user = await db.fetch_user(user_id)
            if user is None:
                return None  # The account no longer exists.

            new_access_token = jwt.encode(
                {
                    'user_id': user_id,
                    'exp': datetime.now(timezone.utc) + timedelta(hours=1),
                    'type': 'access',
                },
                SECRET,
                algorithm='HS256',
            )

            return AuthPayload(
                access_token=new_access_token,
                refresh_token=refresh_token,  # Reuse the same refresh token.
                user_id=user_id,
            )
        except jwt.InvalidTokenError:
            return None

Clients store the refresh token securely (httpOnly cookie) and use it to get a new access token every hour.
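The refresh mutation above returns the same refresh token each time. Rotation, as named in this section's heading, means each refresh token is single-use: redeeming it invalidates it and issues a replacement. The sketch below shows that idea with an in-memory store and opaque random tokens instead of JWTs. Both are simplifying assumptions, and `RefreshTokenStore` is a hypothetical class standing in for a database or Redis table.

```python
import secrets
import time
from typing import Dict, Optional, Tuple


class RefreshTokenStore:
    """In-memory stand-in for a persistent table of live refresh tokens."""

    def __init__(self, ttl_seconds: int = 30 * 24 * 3600) -> None:
        self.ttl_seconds = ttl_seconds
        # token -> (user_id, expiry as Unix time)
        self._live: Dict[str, Tuple[int, float]] = {}

    def issue(self, user_id: int) -> str:
        """Create and record a new random refresh token for the user."""
        token = secrets.token_urlsafe(32)
        self._live[token] = (user_id, time.time() + self.ttl_seconds)
        return token

    def rotate(self, old_token: str) -> Optional[Tuple[int, str]]:
        """Consume old_token; return (user_id, new_token), or None if it is
        unknown, already used, or expired."""
        entry = self._live.pop(old_token, None)  # pop makes the token single-use
        if entry is None:
            return None
        user_id, expires_at = entry
        if time.time() >= expires_at:
            return None
        return user_id, self.issue(user_id)
```

With this shape, a stolen refresh token stops working as soon as the legitimate client refreshes, and a second attempt to redeem an old token can be treated as a signal of theft.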

Error Handling for Auth

Return clear error messages:

import strawberry
from graphql import GraphQLError  # Provided by graphql-core, a Strawberry dependency.

class AuthError(Exception):
    """Base auth error."""
    pass

class InvalidCredentials(AuthError):
    """User email/password don't match."""
    pass

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def login(self, input: LoginInput) -> Optional[AuthPayload]:
        """Log in."""
        user = await db.fetch_user_by_email(input.email)
        # One branch for both failures, so the response is identical whether
        # the email is unknown or the password is wrong.
        if not user or not verify_password(input.password, user.password_hash):
            raise GraphQLError(
                "Invalid email or password.",
                extensions={"code": "INVALID_CREDENTIALS"},
            )

        token = jwt.encode(...)
        return AuthPayload(token=token, user_id=user.id)

Never reveal whether an email exists (to prevent account enumeration). Always return "Invalid email or password."
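A generic message can still leak information through timing if the server skips the password hash when the email is unknown. The sketch below does comparable work on both failure paths and raises one error for every failure. It uses PBKDF2 from the standard library in place of bcrypt purely to stay dependency-free; the `users` dictionary, `hash_password`, `authenticate`, and the iteration count are all illustrative assumptions.

```python
import hashlib
import hmac
import os
from typing import Dict, Optional, Tuple

ITERATIONS = 100_000  # Illustrative value; tune for your hardware.


def hash_password(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]:
    """Return (salt, digest) using PBKDF2-HMAC-SHA256."""
    salt = salt or os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return salt, digest


# Checked when the email is unknown, so both failure paths hash once.
_DUMMY_SALT, _DUMMY_DIGEST = hash_password("dummy-password")


class InvalidCredentials(Exception):
    """Raised for any login failure; never says which part was wrong."""


def authenticate(
    users: Dict[str, Tuple[int, bytes, bytes]], email: str, password: str
) -> int:
    """Return the user ID, or raise InvalidCredentials with one generic message."""
    record = users.get(email)
    user_id, salt, expected = record if record else (None, _DUMMY_SALT, _DUMMY_DIGEST)
    _, actual = hash_password(password, salt)
    matches = hmac.compare_digest(actual, expected)
    # An unknown email fails even if the dummy hash happens to match.
    if record is None or not matches:
        raise InvalidCredentials("Invalid email or password.")
    return user_id
```

In the login mutation, the caught exception would be translated into the GraphQL error with the INVALID_CREDENTIALS code.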

Comparison: Auth Patterns

Pattern           | Use Case                            | Security
API Key           | Simple public API, read-only access | Low (keys can be leaked)
JWT (short-lived) | Web app, mobile app, SPA            | High (expires quickly)
OAuth2            | Third-party login (Google, GitHub)  | High (delegated to provider)
Session cookies   | Traditional web app                 | Medium (CSRF risk)

Key Takeaways

  • JWT tokens encode user info and are signed; clients include them in Authorization headers.
  • Use context to extract and store the authenticated user ID and object.
  • Permissions (BasePermission subclasses) check user identity and roles at the resolver level.
  • Refresh tokens allow clients to get new access tokens without re-logging in.
  • Always use HTTPS in production to prevent token interception.
  • Never reveal whether a user exists (return generic error messages for invalid credentials).

Frequently Asked Questions

Should I store tokens in localStorage or cookies?

localStorage is accessible to JavaScript, increasing XSS risk. httpOnly cookies are more secure but vulnerable to CSRF. Use httpOnly + SameSite=Strict cookies for web apps; use secure storage (Keychain, Keystore) for mobile apps.
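As a sketch of the cookie attributes mentioned above, the standard library can build the Set-Cookie value directly. The cookie name `refresh_token`, the `/graphql` path, and the helper `build_refresh_cookie` are assumptions for illustration; a web framework's own cookie API would normally be used instead.

```python
from http.cookies import SimpleCookie


def build_refresh_cookie(token: str, max_age_seconds: int = 30 * 24 * 3600) -> str:
    """Return a Set-Cookie header value for a refresh token."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True      # Not readable from JavaScript.
    morsel["secure"] = True        # Sent over HTTPS only.
    morsel["samesite"] = "Strict"  # Not sent on cross-site requests.
    morsel["path"] = "/graphql"    # Limit the cookie to the API endpoint.
    morsel["max-age"] = max_age_seconds
    return morsel.OutputString()
```

The returned string is what the server would send as the value of a Set-Cookie response header.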

What if a token is compromised?

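Short-lived tokens (1 hour) limit the damage. A password change revokes existing tokens only if the server enforces it, for example by rejecting tokens issued before the last password change. For immediate revocation, keep a token blacklist (for example in Redis) and check it on every request.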

Can I use GraphQL to check if a token is valid?

Yes. Add a field such as verify_token(self) -> bool, decorated with @strawberry.field, to Query. Treat it as a convenience for clients only: every protected resolver must still validate the token on the server.

How do I handle logout?

Logout is implicit: the client deletes the token. To revoke tokens immediately (e.g., on account deletion), give each token a unique ID (the jti claim), store revoked IDs in Redis, and check them in the context getter.
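A revocation check of this kind can be sketched with an in-memory dictionary standing in for Redis. It assumes each token carries a unique ID (the standard jti claim) alongside its exp claim; `TokenDenylist` is a hypothetical class. Entries are dropped once the token would have expired anyway, which keeps the list small.

```python
import time
from typing import Dict, Optional


class TokenDenylist:
    """In-memory stand-in for a Redis set of revoked token IDs."""

    def __init__(self) -> None:
        # jti -> expiry of the revoked token (Unix time)
        self._revoked: Dict[str, float] = {}

    def revoke(self, jti: str, expires_at: float) -> None:
        """Mark a token ID as revoked until the token's own expiry."""
        self._revoked[jti] = expires_at

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        """Return True if the token ID is on the list and not yet expired."""
        now = time.time() if now is None else now
        # Prune entries for tokens that signature validation rejects as expired.
        self._revoked = {j: exp for j, exp in self._revoked.items() if exp > now}
        return jti in self._revoked
```

In the context getter, a decoded token whose jti is revoked would be treated the same as an invalid token, leaving user_id as None.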

Further Reading