Securing Microservices: Service-to-Service Auth
In a microservice architecture, network calls between APIs replace in-process function calls, so Service A must prove its identity to Service B. Unlike user authentication (password + JWT), service-to-service auth uses certificates (mutual TLS), shared keys, or signed requests. The goal is zero trust: every service verifies every call, and compromising one service doesn't automatically grant access to others.
Service-to-Service Authentication Patterns
| Pattern | Mechanism | Best For | Pros | Cons |
|---|---|---|---|---|
| Mutual TLS | Client & server certs | Internal networks | Transparent, no app code | Complex PKI setup |
| JWT with Private Key | Service signs JWT | Microservices | Self-contained, verifiable | Token revocation hard |
| Shared Secrets | API key between services | Simple microservices | Easy setup | Key rotation complex |
| OAuth2 Client Credentials | Service obtains token from auth server | Distributed systems | Centralized auth, easy revocation | Extra network call |
Mutual TLS (mTLS)
Mutual TLS (mTLS) is TLS in which both client and server present certificates. The server proves its identity as in ordinary TLS, and the client certificate proves the client's identity; each side verifies the other's certificate against a trusted CA.
Generate Certificates
For development:
# Generate CA (certificate authority); -nodes leaves the key unencrypted (development only)
openssl req -new -x509 -nodes -newkey rsa:2048 -days 365 \
  -keyout ca-key.pem -out ca-cert.pem \
  -subj "/CN=internal-ca"
# Generate service key and certificate signing request (CSR)
openssl req -new -nodes -newkey rsa:2048 \
  -keyout service-key.pem -out service.csr \
  -subj "/CN=service-a"
# Sign the CSR with the CA
openssl x509 -req -days 365 -in service.csr \
  -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial \
  -out service-cert.pem
Enforce mTLS in FastAPI
Use Python's ssl module:
# main.py
import ssl

import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get("/internal/data")
async def internal_endpoint():
    """Internal endpoint accessed via mTLS."""
    return {"data": "sensitive"}


if __name__ == "__main__":
    # uvicorn builds its own SSL context from these options, so the
    # client-certificate requirement has to be passed here to take effect.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8443,
        ssl_keyfile="service-key.pem",
        ssl_certfile="service-cert.pem",
        ssl_ca_certs="ca-cert.pem",       # CA used to verify client certificates
        ssl_cert_reqs=ssl.CERT_REQUIRED,  # reject clients without a valid certificate
    )
Service A Calls Service B with mTLS
# service_a.py
import ssl

import httpx


async def call_service_b():
    """Call Service B with client certificate."""
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.load_cert_chain(
        certfile="service-a-cert.pem",
        keyfile="service-a-key.pem"
    )
    ssl_context.load_verify_locations("ca-cert.pem")
    async with httpx.AsyncClient(verify=ssl_context) as client:
        response = await client.get("https://service-b:8443/internal/data")
        return response.json()
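mTLS proves that the caller holds a certificate signed by the internal CA, but every such certificate passes the handshake. A service may also want to check which service the certificate names. The following is a minimal sketch of that check, assuming the certificate is available in the dictionary form returned by Python's `SSLSocket.getpeercert()`. The names `ALLOWED_CALLERS`, `common_name`, and `is_allowed_caller` are hypothetical, and how the peer certificate reaches a request handler depends on the server or proxy in front of the app, which is not covered here.

```python
from typing import Optional

# Hypothetical allowlist: certificate CNs permitted to call this service.
ALLOWED_CALLERS = {"service-a"}


def common_name(peer_cert: dict) -> Optional[str]:
    """Return the subject commonName from a getpeercert()-style dict."""
    # "subject" is a tuple of RDNs; each RDN is a tuple of (key, value) pairs.
    for rdn in peer_cert.get("subject", ()):
        for key, value in rdn:
            if key == "commonName":
                return value
    return None


def is_allowed_caller(peer_cert: dict) -> bool:
    """True only if the certificate's CN is on the allowlist."""
    return common_name(peer_cert) in ALLOWED_CALLERS


# Example input shaped like the result of SSLSocket.getpeercert().
example_cert = {
    "subject": ((("countryName", "US"),), (("commonName", "service-a"),)),
}
print(is_allowed_caller(example_cert))
```

Keeping the allowlist per service means a valid certificate for one service does not automatically open every other service.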
JWT with Asymmetric Signatures
Instead of mTLS, services can sign JWTs with a private key and verify with the public key. This works across network boundaries and is easier to audit.
Generate RSA Key Pair
# Private key (Service keeps secret)
openssl genrsa -out private-key.pem 2048
# Public key (Services share)
openssl rsa -in private-key.pem -pubout -out public-key.pem
Service A Issues a JWT
# service_a.py
from datetime import datetime, timedelta, timezone
from pathlib import Path

import httpx
from jose import jwt

PRIVATE_KEY = Path("private-key.pem").read_text()


def create_service_jwt(service_name: str, audience: str) -> str:
    """Create a short-lived JWT signed with this service's private key."""
    now = datetime.now(timezone.utc)
    payload = {
        "iss": service_name,  # Issuer
        "sub": service_name,  # Subject
        "aud": audience,      # Audience (e.g., "service-b")
        "iat": now,
        "exp": now + timedelta(minutes=5),
    }
    return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")


async def call_service_b():
    """Call Service B with signed JWT."""
    token = create_service_jwt("service-a", "service-b")
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://service-b:443/internal/data",
            headers={"Authorization": f"Bearer {token}"}
        )
        return response.json()
Service B Verifies the JWT
# service_b.py
from pathlib import Path

from fastapi import Depends, FastAPI, Header, HTTPException
from jose import JWTError, jwt
from jose.exceptions import JWTClaimsError

app = FastAPI()
PUBLIC_KEY = Path("public-key.pem").read_text()


async def verify_service_jwt(authorization: str = Header()) -> dict:
    """Verify a service JWT against the issuer's public key."""
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(status_code=401, detail="Invalid token")
    try:
        # Pass audience so the library validates the aud claim along with
        # the signature and expiry
        return jwt.decode(token, PUBLIC_KEY, algorithms=["RS256"], audience="service-b")
    except JWTClaimsError:
        raise HTTPException(status_code=403, detail="Token not for this service")
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")


@app.get("/internal/data")
async def internal_endpoint(token: dict = Depends(verify_service_jwt)):
    """Service-to-service endpoint."""
    return {
        "data": "sensitive",
        "called_by": token.get("iss")
    }
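Signature verification shows that a token is genuine, not that this particular caller is welcome. As a rough sketch of the policy checks a receiving service might apply to claims that have already been verified, the function below checks audience, issuer, expiry, and total lifetime using only the standard library. `EXPECTED_AUDIENCE`, `ALLOWED_ISSUERS`, `MAX_LIFETIME_SECONDS`, and `check_service_claims` are hypothetical names, and the sketch assumes `aud` is a single string and `iat`/`exp` are numeric timestamps, as in the tokens issued above.

```python
from datetime import datetime, timezone

# Hypothetical policy values for the receiving service.
EXPECTED_AUDIENCE = "service-b"
ALLOWED_ISSUERS = {"service-a"}
MAX_LIFETIME_SECONDS = 600


def check_service_claims(payload: dict, now: datetime) -> None:
    """Raise ValueError if already-verified claims violate the policy."""
    if payload.get("aud") != EXPECTED_AUDIENCE:
        raise ValueError("token not intended for this service")
    if payload.get("iss") not in ALLOWED_ISSUERS:
        raise ValueError("issuer is not an allowed caller")
    iat, exp = payload.get("iat"), payload.get("exp")
    if not isinstance(iat, (int, float)) or not isinstance(exp, (int, float)):
        raise ValueError("iat and exp must be numeric timestamps")
    if exp <= now.timestamp():
        raise ValueError("token expired")
    if exp - iat > MAX_LIFETIME_SECONDS:
        raise ValueError("token lifetime too long")


# Usage: a five-minute token from service-a passes without raising.
issued = datetime.now(timezone.utc).timestamp()
check_service_claims(
    {"aud": "service-b", "iss": "service-a", "iat": issued, "exp": issued + 300},
    now=datetime.now(timezone.utc),
)
```

Capping the lifetime limits how long a leaked token stays useful, which matters because individual JWTs are hard to revoke.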
OAuth2 Client Credentials Flow
For centralized auth across many microservices, use the OAuth2 client credentials flow: each service authenticates with an auth server, receives a token, then presents that token when calling other services.
Auth Server Setup
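# auth_server.py
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path

from fastapi import FastAPI, HTTPException
from jose import jwt
from pydantic import BaseModel

app = FastAPI()
TOKEN_TTL_SECONDS = 3600
# Assumption: the auth server signs tokens with its own RSA key;
# the file name is a placeholder
SIGNING_KEY = Path("auth-private-key.pem").read_text()
# Service credentials (placeholders; keep real secrets in a secret manager)
service_credentials = {
    "service-a": "secret-key-a",
    "service-b": "secret-key-b"
}


class ClientCredentialsRequest(BaseModel):
    client_id: str
    client_secret: str
    grant_type: str  # "client_credentials"


def create_access_token(data: dict) -> str:
    """Assumed helper (not shown in the original): sign the claims with an expiry."""
    expires = datetime.now(timezone.utc) + timedelta(seconds=TOKEN_TTL_SECONDS)
    return jwt.encode({**data, "exp": expires}, SIGNING_KEY, algorithm="RS256")


@app.post("/token")
async def get_service_token(request: ClientCredentialsRequest):
    """Issue access token to service."""
    if request.grant_type != "client_credentials":
        raise HTTPException(status_code=400, detail="Invalid grant type")
    expected_secret = service_credentials.get(request.client_id)
    # Constant-time comparison avoids leaking the secret through response timing
    if expected_secret is None or not secrets.compare_digest(
        expected_secret.encode(), request.client_secret.encode()
    ):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = create_access_token(
        data={"sub": request.client_id, "client": True}
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "expires_in": TOKEN_TTL_SECONDS
    }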
Service A Obtains and Uses Token
# service_a.py
import asyncio
from datetime import datetime, timedelta, timezone

import httpx


class ServiceClient:
    def __init__(self, client_id: str, client_secret: str, auth_server_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.auth_server_url = auth_server_url
        self.token = None
        self.token_expiry = None

    async def get_token(self) -> str:
        """Get access token from auth server, reusing a cached one while valid."""
        now = datetime.now(timezone.utc)
        if self.token and self.token_expiry and self.token_expiry > now:
            return self.token  # Reuse unexpired token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.auth_server_url}/token",
                json={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "grant_type": "client_credentials"
                }
            )
        if response.status_code != 200:
            raise RuntimeError("Failed to get service token")
        data = response.json()
        self.token = data["access_token"]
        # Refresh 60 seconds early so the token does not expire mid-request
        self.token_expiry = now + timedelta(seconds=data["expires_in"] - 60)
        return self.token

    async def call_service(self, service_url: str, endpoint: str) -> dict:
        """Call another service with token."""
        token = await self.get_token()
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{service_url}{endpoint}",
                headers={"Authorization": f"Bearer {token}"}
            )
        return response.json()


# Usage
async def main():
    client = ServiceClient(
        client_id="service-a",
        client_secret="secret-key-a",
        auth_server_url="https://auth-server:443"
    )
    data = await client.call_service("https://service-b:443", "/internal/data")
    print(data)


if __name__ == "__main__":
    asyncio.run(main())
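`ServiceClient.get_token` reuses a token until 60 seconds before it expires. That caching rule can be isolated from the HTTP code so it is easy to test. The sketch below is one way to do that with an injectable clock; `TokenCache`, `store`, `get`, and the `clock` parameter are hypothetical names introduced for illustration, not part of any library.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


def _utc_now() -> datetime:
    return datetime.now(timezone.utc)


class TokenCache:
    """Hold one bearer token and report when it is due for refresh."""

    def __init__(
        self,
        refresh_margin: timedelta = timedelta(seconds=60),
        clock: Callable[[], datetime] = _utc_now,
    ):
        self._margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._refresh_at: Optional[datetime] = None

    def store(self, token: str, expires_in: int) -> None:
        """Remember a token and the time after which it should be replaced."""
        self._token = token
        self._refresh_at = self._clock() + timedelta(seconds=expires_in) - self._margin

    def get(self) -> Optional[str]:
        """Return the cached token, or None if absent or due for refresh."""
        if self._token is None or self._refresh_at is None:
            return None
        if self._clock() >= self._refresh_at:
            return None
        return self._token


# Usage: a caller requests a new token only when get() returns None.
cache = TokenCache()
cache.store("example-token", expires_in=3600)
print(cache.get())
```

Passing the clock in as a parameter lets a test advance time without sleeping.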
Request Signing (HMAC)
For simpler setups, sign requests using HMAC:
# service_a.py
from datetime import datetime, timezone
import hashlib
import hmac
import json

import httpx

SHARED_SECRET = b"shared-key-between-services"


def sign_request(body: dict, timestamp: str) -> str:
    """Create HMAC signature for request."""
    # Canonical JSON (sorted keys) followed by the timestamp;
    # Service B rebuilds the same string to verify
    message = f"{json.dumps(body, sort_keys=True)}{timestamp}"
    return hmac.new(SHARED_SECRET, message.encode(), hashlib.sha256).hexdigest()


async def call_service_b_signed(data: dict):
    """Call Service B with signed request."""
    # Naive UTC ISO-8601 timestamp, the format Service B parses
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    signature = sign_request(data, timestamp)
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://service-b:443/process",
            json=data,
            headers={
                "X-Signature": signature,
                "X-Timestamp": timestamp,
                "X-Service": "service-a"
            }
        )
        return response.json()
Service B Verifies Signature
# service_b.py
from datetime import datetime, timezone
import hashlib
import hmac
import json

from fastapi import FastAPI, Header, HTTPException

app = FastAPI()
SHARED_SECRET = b"shared-key-between-services"
MAX_AGE_SECONDS = 60


@app.post("/process")
async def process_request(
    request: dict,
    x_signature: str = Header(),
    x_timestamp: str = Header(),
    x_service: str = Header()
):
    """Verify signed request from another service."""
    # Prevent replay attacks: timestamp must be recent
    try:
        request_time = datetime.fromisoformat(x_timestamp)
    except ValueError:
        raise HTTPException(status_code=401, detail="Invalid timestamp")
    if request_time.tzinfo is None:
        # The sender uses naive UTC timestamps
        request_time = request_time.replace(tzinfo=timezone.utc)
    age = (datetime.now(timezone.utc) - request_time).total_seconds()
    if abs(age) > MAX_AGE_SECONDS:  # also rejects timestamps from the future
        raise HTTPException(status_code=401, detail="Timestamp outside allowed window")
    # Verify signature
    message = f"{json.dumps(request, sort_keys=True)}{x_timestamp}"
    expected_signature = hmac.new(SHARED_SECRET, message.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(x_signature.encode(), expected_signature.encode()):
        raise HTTPException(status_code=401, detail="Invalid signature")
    # X-Service is not covered by the signature, so treat it as informational only
    return {"status": "processed", "from": x_service}
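The scheme above re-serializes the body with `json.dumps` on both sides, so it works only while both services produce identical JSON text. One alternative, sketched here with only the standard library, signs the raw request bytes together with a Unix timestamp, so the receiver verifies exactly what arrived. `SECRET`, `MAX_SKEW_SECONDS`, `sign`, and `verify` are hypothetical names, and the `"<timestamp>.<body>"` message layout is an assumption chosen for illustration.

```python
import hashlib
import hmac
import time

# Hypothetical values for illustration; load the real secret from a secret store.
SECRET = b"example-shared-secret"
MAX_SKEW_SECONDS = 60


def sign(body: bytes, timestamp: int, secret: bytes = SECRET) -> str:
    """Return hex HMAC-SHA256 over b"<timestamp>.<raw body>"."""
    message = str(timestamp).encode() + b"." + body
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify(body: bytes, timestamp: int, signature: str, now: int,
           secret: bytes = SECRET) -> bool:
    """Accept only fresh requests whose signature matches the received bytes."""
    if abs(now - timestamp) > MAX_SKEW_SECONDS:
        return False  # too old, or timestamped in the future
    expected = sign(body, timestamp, secret)
    # Compare as bytes in constant time; header values may not be ASCII.
    return hmac.compare_digest(expected.encode(), signature.encode())


# Usage: the sender signs the exact bytes it puts on the wire.
body = b'{"order_id": 42}'
ts = int(time.time())
sig = sign(body, ts)
print(verify(body, ts, sig, now=ts))
```

Because the timestamp is part of the signed message, an attacker cannot replay an old body with a fresh timestamp.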
Key Takeaways
- Mutual TLS provides transparent, certificate-based service authentication.
- JWT with asymmetric signatures scales better and is easier to audit than shared secrets.
- OAuth2 client credentials centralizes auth in high-scale microservices.
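- HMAC signing is simpler for private networks but more brittle than certificates: every holder of the shared secret can forge requests, and rotating the key means updating all of them together.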
- Always verify the caller's identity on every inter-service call (zero-trust).
Frequently Asked Questions
Should I use mTLS or JWT for microservices?
mTLS is transparent (no app code) but operationally complex (certificate issuance and rotation). JWT is simpler to set up but requires explicit code in each service. Use mTLS if your infrastructure supports it (Kubernetes, service mesh); use JWT for explicit, auditable auth.
How do I rotate service certificates without downtime?
Implement certificate rotation with an overlap period: issue new certificates a week before expiry, allow both old and new certificates, then retire the old ones. Automate with tools like cert-manager in Kubernetes.
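The overlap rule can be expressed as a small check that a rotation job might run on a schedule. This is only a sketch: `RENEW_BEFORE` and `needs_renewal` are hypothetical names, and reading the expiry date from a certificate and issuing the replacement are left out.

```python
from datetime import datetime, timedelta, timezone

# Renewal window taken from the answer above: one week before expiry.
RENEW_BEFORE = timedelta(days=7)


def needs_renewal(not_after: datetime, now: datetime,
                  renew_before: timedelta = RENEW_BEFORE) -> bool:
    """True once the certificate is inside its renewal window or already expired."""
    return now >= not_after - renew_before


# Usage: a certificate expiring in three days is due for renewal.
now = datetime.now(timezone.utc)
print(needs_renewal(now + timedelta(days=3), now))
```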
Can a compromised service access all other services?
In a zero-trust design, no. If Service A is compromised, the attacker holds only Service A's credentials. They can call the services that accept Service A as a caller (say, Service B), but Service C rejects them unless it also authorizes Service A or the attacker separately obtains credentials that Service C accepts. Proper network policies and secret management further limit the blast radius.
What's the difference between service-to-service auth and API authentication?
User authentication proves who the user is (and issues tokens). Service-to-service auth proves which service is calling. Both use similar mechanisms (JWTs, certificates) but for different principals.