Skip to main content

API Security Testing and Rate Limiting

No authentication system is secure until tested. Attackers try credential stuffing (testing stolen passwords), token replay (reusing intercepted tokens), permission bypasses (escalating privileges), and brute force (guessing weak tokens). Rate limiting (capping requests per user/IP) slows attackers and protects your API from overload. This article covers practical security testing and rate-limiting strategies.

Security Testing: Common Vulnerabilities

1. Credential Stuffing and Brute Force

Vulnerability: Attackers automate login attempts with leaked credentials or dictionary words.

Test it:

# test_bruteforce.py
import asyncio
import httpx
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

async def test_brute_force_login():
"""Simulate brute force attack on login endpoint."""
passwords = ["password", "123456", "admin", "letmein"]

for password in passwords:
response = client.post(
"/login",
json={"username": "admin", "password": password}
)

if response.status_code == 200:
print(f"Success: password is '{password}'")
return

print("Brute force unsuccessful")

Mitigation: Implement rate limiting on login endpoint (max 5 attempts per IP per minute).

2. Token Expiration and Revocation

Vulnerability: Expired tokens still accepted; revoked tokens still work.

Test it:

# test_tokens.py
from datetime import datetime, timedelta
from security import create_access_token, decode_token
import time

def test_expired_token_rejected():
"""Test that expired tokens are rejected."""
token = create_access_token(
data={"sub": "alice"},
expires_delta=timedelta(seconds=1)
)

# Token is valid now
payload = decode_token(token)
assert payload is not None

# Wait for expiration
time.sleep(2)

# Token should be invalid
payload = decode_token(token)
assert payload is None, "Expired token should be rejected"

def test_revoked_token_invalid():
"""Test that revoked tokens are invalid."""
token = create_access_token(data={"sub": "alice"})

# Token is valid
payload = decode_token(token)
assert payload is not None

# Revoke token (add to blacklist)
revoked_tokens.add(token)

# Token should be invalid
payload = decode_token(token)
assert payload is None, "Revoked token should be rejected"

Mitigation: Always check token expiration (exp claim). Maintain a revocation list (database or Redis) for long-lived tokens.

3. Privilege Escalation

Vulnerability: Users modify JWT claims in transit or manipulate role fields.

Test it:

# test_privilege_escalation.py
def test_user_cannot_elevate_own_privileges():
"""Ensure users can't self-elevate roles."""
# User attempts to modify JWT payload
token = create_access_token(data={"sub": "alice", "roles": ["viewer"]})

# Attacker tries to add "admin" role (JWT is not encrypted, only signed)
# If we check signature properly, this will fail

response = client.get(
"/admin/users",
headers={"Authorization": f"Bearer {fake_admin_token}"}
)

assert response.status_code == 403, "User without admin role should be denied"

def test_scope_enforcement():
"""Ensure scopes are strictly enforced."""
token = create_access_token(
data={"sub": "alice"},
scopes=["users:read"] # Only read, not write
)

response = client.post(
"/users",
headers={"Authorization": f"Bearer {token}"},
json={"username": "bob", "email": "[email protected]"}
)

assert response.status_code == 403, "Should reject write without scope"

Mitigation: Sign tokens with a secret key the client can't know. Verify signatures on every request. Never trust user-supplied role claims.

4. Token Replay Attacks

Vulnerability: Intercepted tokens reused by attackers.

Test it:

# test_replay.py
def test_token_replay_prevention():
"""Test that tokens are tied to specific sessions/devices."""
token = create_access_token(data={"sub": "alice"})

# Valid request from original client
response = client.get(
"/profile",
headers={"Authorization": f"Bearer {token}"},
cookies={"session": "original-session"}
)
assert response.status_code == 200

# Replayed request from different device/IP (should fail or warn)
response = client.get(
"/profile",
headers={"Authorization": f"Bearer {token}"},
cookies={"session": "different-session"}
)
# This might still succeed, but you could log/flag the anomaly

Mitigation: Include context in tokens (session ID, device fingerprint, IP). Use short-lived tokens (5-15 min). Implement device tracking and anomaly detection.

Rate Limiting Implementation

Rate limiting prevents brute force, DoS, and accidental overload.

Simple Rate Limiting with slowapi

pip install slowapi
# main.py
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)

@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request, exc):
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded"}
)

# 5 login attempts per minute per IP
@app.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: LoginRequest):
# ... login logic ...
pass

# 100 requests per hour per user
@app.get("/data")
@limiter.limit("100/hour")
async def get_data(request: Request, current_user: dict = Depends(get_current_user)):
return {"data": "..."}

Advanced Rate Limiting with Redis

For distributed systems, store rate-limit counters in Redis:

# rate_limiter.py
import redis
import time
from typing import Tuple

redis_client = redis.Redis(host="localhost", port=6379)

def rate_limit(
identifier: str,
max_requests: int,
window_seconds: int
) -> Tuple[bool, int]:
"""Check if identifier exceeded rate limit.

Returns: (is_allowed, remaining_requests)
"""
key = f"ratelimit:{identifier}"
current = redis_client.incr(key)

if current == 1:
redis_client.expire(key, window_seconds)

ttl = redis_client.ttl(key)
is_allowed = current <= max_requests
remaining = max(0, max_requests - current)

return is_allowed, remaining

# main.py
@app.post("/login")
async def login(
request: Request,
credentials: LoginRequest
):
"""Rate limit by IP address."""
ip = request.client.host
is_allowed, remaining = rate_limit(f"login:{ip}", max_requests=5, window_seconds=60)

if not is_allowed:
return JSONResponse(
status_code=429,
content={
"detail": "Too many login attempts. Try again later.",
"retry_after": 60
}
)

# ... login logic ...
return {"access_token": token, "token_type": "bearer"}

@app.get("/data")
async def get_data(
request: Request,
current_user: dict = Depends(get_current_user)
):
"""Rate limit per authenticated user."""
user_id = current_user["username"]
is_allowed, remaining = rate_limit(
f"api:{user_id}",
max_requests=1000,
window_seconds=3600
)

if not is_allowed:
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded"}
)

return {
"data": "...",
"X-RateLimit-Remaining": remaining
}

Tiered Rate Limiting

Different limits for different user tiers:

# main.py
RATE_LIMITS = {
"free": {"requests": 100, "window": 3600}, # 100/hour
"pro": {"requests": 10000, "window": 3600}, # 10k/hour
"enterprise": {"requests": 1000000, "window": 3600}
}

@app.get("/api/data")
async def get_data(
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Rate limit based on subscription tier."""
user = db.query(User).filter(User.username == current_user["username"]).first()
tier = user.subscription_tier # "free", "pro", "enterprise"
limit_config = RATE_LIMITS[tier]

is_allowed, remaining = rate_limit(
f"api:{user.id}",
max_requests=limit_config["requests"],
window_seconds=limit_config["window"]
)

if not is_allowed:
return JSONResponse(
status_code=429,
content={"detail": f"Rate limit exceeded for {tier} tier"}
)

return {
"data": "...",
"X-RateLimit-Limit": limit_config["requests"],
"X-RateLimit-Remaining": remaining
}

Comprehensive Security Tests

# test_security.py
import pytest
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

class TestAuthentication:
def test_missing_token_rejected(self):
"""Endpoint without token should 401."""
response = client.get("/protected")
assert response.status_code == 401

def test_invalid_token_rejected(self):
"""Invalid token should 401."""
response = client.get(
"/protected",
headers={"Authorization": "Bearer invalid_token"}
)
assert response.status_code == 401

def test_expired_token_rejected(self):
"""Expired token should 401."""
from security import create_access_token
from datetime import timedelta
import time

token = create_access_token(
data={"sub": "alice"},
expires_delta=timedelta(seconds=1)
)
time.sleep(2)

response = client.get(
"/protected",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 401

class TestAuthorization:
def test_scope_enforcement(self):
"""User without scope denied."""
token = create_access_token(
data={"sub": "alice"},
scopes=["users:read"]
)

response = client.post(
"/users",
headers={"Authorization": f"Bearer {token}"},
json={"username": "bob"}
)
assert response.status_code == 403

def test_role_enforcement(self):
"""User without admin role denied."""
token = create_access_token(
data={"sub": "alice"},
roles=["user"]
)

response = client.get(
"/admin/users",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 403

class TestRateLimiting:
def test_rate_limit_enforced(self):
"""Requests exceeding limit rejected."""
for i in range(10):
response = client.get(
"/public",
headers={"X-Forwarded-For": "192.168.1.1"}
)
if i >= 5: # After 5 requests
assert response.status_code == 429
else:
assert response.status_code == 200

Key Takeaways

  • Test for token expiration, revocation, privilege escalation, and replay attacks before production.
  • Rate limiting (per IP, per user, per tier) prevents brute force and overload.
  • Use short-lived tokens (15 min) and refresh tokens to limit replay damage.
  • Monitor failed authentication attempts and flag anomalies (unusual IP, device, time).
  • Log all security-relevant events (token validation, privilege checks, rate limit hits).

Frequently Asked Questions

How do I detect compromised API keys?

Monitor API key usage: unusual IP addresses, spike in request volume, access to sensitive endpoints, time-of-day anomalies. Alert on immediate revocation. Compare against baseline (geographic, temporal).

Should I rate limit all endpoints equally?

No. Sensitive endpoints (login, password reset) get stricter limits (5/min). Public endpoints (list posts) are more lenient (100/min). Admin endpoints are tightest (user-based, not IP-based).

What should I log for security audits?

Log: timestamp, endpoint, authenticated user/service, status code, scopes/roles checked, rate limit state, errors. Never log passwords or tokens. Use structured logs (JSON) for easy analysis.

How do I respond to a security breach?

Immediately revoke tokens (database blacklist), force password resets, rotate API keys, alert affected users, analyze logs to determine breach scope and timeline. Have a documented incident response plan.

Further Reading