Securing Microservices: Service-to-Service Auth

In microservices, API-to-API calls replace function calls. Service A must prove its identity to Service B. Unlike user authentication (password + JWT), service-to-service auth uses certificates (mutual TLS), shared keys, or signed requests. The goal is zero-trust: every service verifies every call, and compromising one service doesn't automatically grant access to others.

Service-to-Service Authentication Patterns

| Pattern | Mechanism | Best For | Pros | Cons |
| --- | --- | --- | --- | --- |
| Mutual TLS | Client & server certs | Internal networks | Transparent, no app code | Complex PKI setup |
| JWT with Private Key | Service signs JWT | Microservices | Self-contained, verifiable | Token revocation hard |
| Shared Secrets | API key between services | Simple microservices | Easy setup | Key rotation complex |
| OAuth2 Client Credentials | Service obtains token from auth server | Distributed systems | Centralized auth, easy revocation | Extra network call |

Mutual TLS (mTLS)

Mutual TLS (mTLS) is TLS where both client and server present certificates. The client certificate proves the client's identity; the server verifies it.

Generate Certificates

For development:

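# Generate CA (certificate authority)
# -nodes leaves the private key unencrypted so the commands run without a
# passphrase prompt; acceptable for development only.
openssl req -new -x509 -nodes -days 365 -keyout ca-key.pem -out ca-cert.pem \
    -subj "/CN=internal-ca"

# Generate service key and certificate signing request
# (repeat with a distinct CN and file names for each service, e.g. service-a-cert.pem)
openssl req -new -nodes -keyout service-key.pem -out service.csr \
    -subj "/CN=service-a"

# Sign certificate with CA
openssl x509 -req -days 365 -in service.csr \
    -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial \
    -out service-cert.pem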

Enforce mTLS in FastAPI

Pass the certificate files to uvicorn and require a client certificate, using the constant from Python's ssl module:

# main.py
import ssl

import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.get("/internal/data")
async def internal_endpoint():
    """Internal endpoint accessed via mTLS."""
    return {"data": "sensitive"}

if __name__ == "__main__":
    # uvicorn builds its own SSL context from these arguments.
    # ssl_cert_reqs=ssl.CERT_REQUIRED makes the handshake fail unless the
    # client presents a certificate signed by the CA in ssl_ca_certs.
    # uvicorn's default (ssl.CERT_NONE) does not ask for a client certificate.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8443,
        ssl_keyfile="service-key.pem",
        ssl_certfile="service-cert.pem",
        ssl_ca_certs="ca-cert.pem",
        ssl_cert_reqs=ssl.CERT_REQUIRED,
    )

Service A Calls Service B with mTLS

# service_a.py
import ssl

import httpx

async def call_service_b():
    """Call Service B with client certificate."""
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.load_cert_chain(
        certfile="service-a-cert.pem",
        keyfile="service-a-key.pem"
    )
    ssl_context.load_verify_locations("ca-cert.pem")

    async with httpx.AsyncClient(verify=ssl_context) as client:
        response = await client.get("https://service-b:8443/internal/data")
        return response.json()
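
The handshake above proves that the caller holds a certificate signed by the internal CA, but it does not by itself tell the application which service is calling. A minimal sketch of reading the caller's name, assuming the application can obtain the dictionary that Python's ssl.SSLSocket.getpeercert() returns (how to reach it from a FastAPI handler depends on the server and is not covered here). The function name peer_common_name and the sample certificate are hypothetical.

```python
from typing import Optional

def peer_common_name(peercert: dict) -> Optional[str]:
    """Return the subject commonName from a getpeercert()-style dict, if any."""
    # "subject" is a tuple of RDNs; each RDN is a tuple of (name, value) pairs.
    for rdn in peercert.get("subject", ()):
        for name, value in rdn:
            if name == "commonName":
                return value
    return None

# Trimmed example of the getpeercert() shape; only the subject field is shown.
example_cert = {"subject": ((("commonName", "service-a"),),)}

print(peer_common_name(example_cert))
```

The returned name matches the CN set with -subj when the certificate was generated, so it can feed an authorization check in the handler.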

JWT with Asymmetric Signatures

Instead of mTLS, services can sign JWTs with a private key and verify with the public key. This works across network boundaries and is easier to audit.

Generate RSA Key Pair

# Private key (Service keeps secret)
openssl genrsa -out private-key.pem 2048

# Public key (Services share)
openssl rsa -in private-key.pem -pubout -out public-key.pem

Service A Issues a JWT

# service_a.py
from datetime import datetime, timedelta, timezone

import httpx
from jose import jwt

with open("private-key.pem") as f:
    PRIVATE_KEY = f.read()

def create_service_jwt(service_name: str, audience: str) -> str:
    """Create a short-lived JWT signed with the service's private key."""
    now = datetime.now(timezone.utc)
    payload = {
        "iss": service_name,  # Issuer
        "sub": service_name,  # Subject
        "aud": audience,      # Audience (e.g., "service-b")
        "iat": now,
        "exp": now + timedelta(minutes=5),
    }

    token = jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")
    return token

async def call_service_b():
    """Call Service B with signed JWT."""
    token = create_service_jwt("service-a", "service-b")

    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://service-b:443/internal/data",
            headers={"Authorization": f"Bearer {token}"}
        )
        return response.json()

Service B Verifies the JWT

# service_b.py
from fastapi import Depends, FastAPI, Header, HTTPException
from jose import JWTError, jwt

app = FastAPI()

with open("public-key.pem") as f:
    PUBLIC_KEY = f.read()

async def verify_service_jwt(authorization: str = Header()) -> dict:
    """Verify a service JWT against the issuer's public key."""
    try:
        scheme, token = authorization.split(" ", 1)
        if scheme.lower() != "bearer":
            raise ValueError("Unsupported authorization scheme")

        # Verify signature, expiry, and audience in one step. python-jose
        # rejects a token that carries an "aud" claim unless the expected
        # audience is passed here; a mismatch raises a JWTError subclass.
        payload = jwt.decode(
            token, PUBLIC_KEY, algorithms=["RS256"], audience="service-b"
        )
        return payload
    except (JWTError, ValueError):
        raise HTTPException(status_code=401, detail="Invalid token")

@app.get("/internal/data")
async def internal_endpoint(token: dict = Depends(verify_service_jwt)):
    """Service-to-service endpoint."""
    return {
        "data": "sensitive",
        "called_by": token.get("iss")
    }
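
A valid signature only shows that the token was signed with a trusted key; the receiving service still has to decide whether the claims are acceptable. The sketch below illustrates that second step on an already-decoded payload, using only the standard library. The names TRUSTED_ISSUERS and check_service_claims are hypothetical, and the allowlist contents are an assumption for illustration.

```python
import time
from typing import Optional

TRUSTED_ISSUERS = {"service-a"}  # hypothetical allowlist kept by Service B

def check_service_claims(claims: dict, audience: str, now: Optional[float] = None) -> bool:
    """Return True only if issuer, audience, and expiry are all acceptable."""
    now = time.time() if now is None else now

    # The issuer must be a service this receiver has agreed to accept.
    if claims.get("iss") not in TRUSTED_ISSUERS:
        return False

    # "aud" may be a single string or a list of strings.
    aud = claims.get("aud")
    if isinstance(aud, str):
        audiences = [aud]
    elif isinstance(aud, list):
        audiences = aud
    else:
        audiences = []
    if audience not in audiences:
        return False

    # "exp" is a Unix timestamp; a missing or past value is rejected.
    exp = claims.get("exp")
    return isinstance(exp, (int, float)) and exp > now

claims = {"iss": "service-a", "sub": "service-a", "aud": "service-b", "exp": 1000}
print(check_service_claims(claims, "service-b", now=900))
```

Keeping the issuer allowlist per receiving service means a token minted by an unexpected service is rejected even if its signature verifies.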

OAuth2 Client Credentials Flow

For centralized auth in large microservices, use OAuth2 client credentials: services authenticate with an auth server, receive tokens, then call other services.

Auth Server Setup

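# auth_server.py
import hmac
from datetime import datetime, timedelta, timezone

from fastapi import FastAPI, HTTPException
from jose import jwt
from pydantic import BaseModel

app = FastAPI()

TOKEN_TTL_SECONDS = 3600

# Service credentials (stored securely; hardcoded here only for illustration)
service_credentials = {
    "service-a": "secret-key-a",
    "service-b": "secret-key-b"
}

# Assumption: the auth server signs tokens with its own RSA private key.
with open("private-key.pem") as f:
    SIGNING_KEY = f.read()

def create_access_token(data: dict) -> str:
    """Assumed helper (not defined in the original): sign the claims as an RS256 JWT."""
    expires = datetime.now(timezone.utc) + timedelta(seconds=TOKEN_TTL_SECONDS)
    return jwt.encode({**data, "exp": expires}, SIGNING_KEY, algorithm="RS256")

# Note: RFC 6749 specifies a form-encoded token request; a JSON body is used
# here to keep the example short.
class ClientCredentialsRequest(BaseModel):
    client_id: str
    client_secret: str
    grant_type: str  # "client_credentials"

@app.post("/token")
async def get_service_token(request: ClientCredentialsRequest):
    """Issue access token to service."""
    if request.grant_type != "client_credentials":
        raise HTTPException(status_code=400, detail="Invalid grant type")

    expected_secret = service_credentials.get(request.client_id)
    # compare_digest avoids leaking the secret through comparison timing
    if not expected_secret or not hmac.compare_digest(
        expected_secret.encode(), request.client_secret.encode()
    ):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = create_access_token(
        data={"sub": request.client_id, "client": True}
    )

    return {
        "access_token": token,
        "token_type": "bearer",
        "expires_in": TOKEN_TTL_SECONDS
    }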

Service A Obtains and Uses Token

# service_a.py
import asyncio
from datetime import datetime, timedelta, timezone

import httpx

class ServiceClient:
    def __init__(self, client_id: str, client_secret: str, auth_server_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.auth_server_url = auth_server_url
        self.token = None
        self.token_expiry = None

    async def get_token(self) -> str:
        """Get access token from auth server, reusing a cached one while valid."""
        now = datetime.now(timezone.utc)
        if self.token and self.token_expiry > now:
            return self.token  # Reuse unexpired token

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.auth_server_url}/token",
                json={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "grant_type": "client_credentials"
                }
            )

        if response.status_code != 200:
            raise RuntimeError("Failed to get service token")

        data = response.json()
        self.token = data["access_token"]
        # Treat the token as expired 60 seconds early to absorb clock skew and latency
        self.token_expiry = now + timedelta(seconds=data["expires_in"] - 60)
        return self.token

    async def call_service(self, service_url: str, endpoint: str) -> dict:
        """Call another service with token."""
        token = await self.get_token()

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{service_url}{endpoint}",
                headers={"Authorization": f"Bearer {token}"}
            )

        return response.json()

# Usage
async def main():
    client = ServiceClient(
        client_id="service-a",
        client_secret="secret-key-a",
        auth_server_url="https://auth-server:443"
    )

    data = await client.call_service("https://service-b:443", "/internal/data")
    print(data)

if __name__ == "__main__":
    asyncio.run(main())
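
When many requests start at once with an empty or expired cache, each of them may ask the auth server for a token. One possible way to avoid that is to refresh under a lock so that concurrent callers share a single fetch. The sketch below uses only the standard library; TokenCache, fake_fetch, and the skew parameter are hypothetical names, and the fetch function stands in for the HTTP call to the auth server.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

class TokenCache:
    """Caches one token and refreshes it under a lock (hypothetical helper)."""

    def __init__(self, fetch: Callable[[], Awaitable[Tuple[str, int]]], skew: int = 60):
        self._fetch = fetch            # returns (access_token, expires_in_seconds)
        self._skew = skew              # refresh this many seconds before expiry
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    async def get(self) -> str:
        # Only one coroutine at a time may check and refresh the token.
        async with self._lock:
            if self._token is None or time.monotonic() >= self._expires_at:
                token, expires_in = await self._fetch()
                self._token = token
                self._expires_at = time.monotonic() + expires_in - self._skew
            return self._token

async def demo() -> int:
    calls = 0

    async def fake_fetch() -> Tuple[str, int]:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)      # stand-in for the HTTP round trip
        return "token-1", 3600

    cache = TokenCache(fake_fetch)
    tokens = await asyncio.gather(*(cache.get() for _ in range(5)))
    assert set(tokens) == {"token-1"}
    return calls                       # number of fetches actually made

print(asyncio.run(demo()))
```

Five concurrent callers receive the same token while the fetch function runs once; the trade-off is that callers wait on the lock during a refresh.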

Request Signing (HMAC)

For simpler setups, sign requests using HMAC:

# service_a.py
import hashlib
import hmac
import json
from datetime import datetime

import httpx

SHARED_SECRET = b"shared-key-between-services"

def sign_request(body: dict, timestamp: str) -> str:
    """Create HMAC signature for request."""
    # sort_keys gives both sides the same serialization of the body
    message = f"{json.dumps(body, sort_keys=True)}{timestamp}"
    signature = hmac.new(SHARED_SECRET, message.encode(), hashlib.sha256).hexdigest()
    return signature

async def call_service_b_signed(data: dict):
    """Call Service B with signed request."""
    # Naive UTC timestamp; Service B parses it with the same convention
    timestamp = datetime.utcnow().isoformat()
    signature = sign_request(data, timestamp)

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://service-b:443/process",
            json=data,
            headers={
                "X-Signature": signature,
                "X-Timestamp": timestamp,
                "X-Service": "service-a"
            }
        )
        return response.json()

Service B Verifies Signature

# service_b.py
import hashlib
import hmac
import json
from datetime import datetime, timezone

from fastapi import FastAPI, Header, HTTPException

app = FastAPI()
SHARED_SECRET = b"shared-key-between-services"

@app.post("/process")
async def process_request(
    request: dict,
    x_signature: str = Header(),
    x_timestamp: str = Header(),
    x_service: str = Header()
):
    """Verify signed request from another service."""
    # Prevent replay attacks: timestamp must be recent
    try:
        request_time = datetime.fromisoformat(x_timestamp)
    except ValueError:
        raise HTTPException(status_code=401, detail="Invalid timestamp")
    if request_time.tzinfo is None:
        # Service A sends naive UTC timestamps
        request_time = request_time.replace(tzinfo=timezone.utc)
    # abs() also rejects timestamps that lie too far in the future
    age = (datetime.now(timezone.utc) - request_time).total_seconds()
    if abs(age) > 60:
        raise HTTPException(status_code=401, detail="Request too old")

    # Verify signature
    message = f"{json.dumps(request, sort_keys=True)}{x_timestamp}"
    expected_signature = hmac.new(SHARED_SECRET, message.encode(), hashlib.sha256).hexdigest()

    if not hmac.compare_digest(x_signature, expected_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    return {"status": "processed", "from": x_service}
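
The signature in the examples above covers the body and the timestamp, so within the 60-second window the same signed payload could in principle be sent to a different endpoint. One possible hardening, sketched here with the standard library only, is to sign the HTTP method and path as well and to hash the raw body bytes rather than re-serialized JSON. The names canonical_message, sign, and verify and the SECRET value are hypothetical.

```python
import hashlib
import hmac

SECRET = b"demo-secret"  # placeholder value; real keys belong in a secret store

def canonical_message(method: str, path: str, timestamp: str, body: bytes) -> bytes:
    """Join the parts the signature should cover, one per line."""
    body_hash = hashlib.sha256(body).hexdigest()
    return "\n".join([method.upper(), path, timestamp, body_hash]).encode()

def sign(method: str, path: str, timestamp: str, body: bytes, secret: bytes = SECRET) -> str:
    """Return the hex HMAC-SHA256 of the canonical message."""
    message = canonical_message(method, path, timestamp, body)
    return hmac.new(secret, message, hashlib.sha256).hexdigest()

def verify(signature: str, method: str, path: str, timestamp: str, body: bytes,
           secret: bytes = SECRET) -> bool:
    """Recompute the signature and compare in constant time."""
    expected = sign(method, path, timestamp, body, secret)
    return hmac.compare_digest(signature, expected)

ts = "2024-01-01T00:00:00"
body = b'{"amount": 10}'
sig = sign("POST", "/process", ts, body)

print(verify(sig, "POST", "/process", ts, body))   # same request
print(verify(sig, "POST", "/other", ts, body))     # replayed against another path
```

The receiver would still apply the timestamp freshness check before accepting the request.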

Key Takeaways

  • Mutual TLS provides transparent, certificate-based service authentication.
  • JWT with asymmetric signatures scales better and is easier to audit than shared secrets.
  • OAuth2 client credentials centralizes auth in high-scale microservices.
  • HMAC signing is simpler for private networks but more brittle than certificates.
  • Always verify the caller's identity on every inter-service call (zero-trust).

Frequently Asked Questions

Should I use mTLS or JWT for microservices?

mTLS is transparent (no app code) but operationally complex (certificate rotation). JWT is simpler to operate but requires explicit code in every service. Use mTLS if your infrastructure supports it (Kubernetes, service mesh); use JWT for explicit, auditable auth.

How do I rotate service certificates without downtime?

Implement certificate rotation with an overlap period: issue new certificates a week before expiry, allow both old and new certificates, then retire the old ones. Automate with tools like cert-manager in Kubernetes.
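
The overlap period can be expressed as two small checks: when to issue the replacement, and whether a given certificate is currently acceptable. This is a minimal sketch of that timing logic only, with hypothetical names (RENEW_BEFORE, should_renew, is_accepted) and made-up dates; real deployments would normally delegate this to tooling such as cert-manager.

```python
from datetime import datetime, timedelta, timezone

RENEW_BEFORE = timedelta(days=7)  # overlap window: issue the new cert a week early

def should_renew(not_after: datetime, now: datetime) -> bool:
    """True once the certificate is inside the renewal window."""
    return now >= not_after - RENEW_BEFORE

def is_accepted(not_before: datetime, not_after: datetime, now: datetime) -> bool:
    """True while the certificate is within its validity period."""
    return not_before <= now <= not_after

utc = timezone.utc
old_cert = (datetime(2024, 1, 1, tzinfo=utc), datetime(2024, 1, 31, tzinfo=utc))
new_cert = (datetime(2024, 1, 24, tzinfo=utc), datetime(2024, 2, 23, tzinfo=utc))

now = datetime(2024, 1, 25, tzinfo=utc)
print(should_renew(old_cert[1], now))                      # old cert is in its last week
print(is_accepted(*old_cert, now), is_accepted(*new_cert, now))  # both valid during overlap
```

During the overlap both certificates verify against the same CA, so services can switch one at a time.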

Can a compromised service access all other services?

In a zero-trust design, no. If Service A is compromised, the attacker holds only Service A's credentials. They can call Service B if Service B authorizes Service A, but Service C rejects them unless it also accepts Service A or the attacker separately steals credentials that Service C trusts. Per-service authorization, network policies, and careful secret management limit the blast radius.
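
The blast-radius argument depends on each service keeping its own list of accepted callers. A minimal sketch of such a policy, assuming the caller's identity has already been authenticated by one of the mechanisms above; ALLOWED_CALLERS, is_call_allowed, and service-d are hypothetical.

```python
# Which callers each target service accepts (illustrative policy only).
ALLOWED_CALLERS = {
    "service-b": {"service-a"},
    "service-c": {"service-d"},
}

def is_call_allowed(caller: str, target: str) -> bool:
    """Deny by default: unknown targets and unlisted callers are rejected."""
    return caller in ALLOWED_CALLERS.get(target, set())

print(is_call_allowed("service-a", "service-b"))  # listed caller
print(is_call_allowed("service-a", "service-c"))  # stolen service-a identity is not enough
```

With a policy like this, stolen Service A credentials reach only the targets that list Service A.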

What's the difference between service-to-service auth and API authentication?

User auth proves who the user is (and issues tokens). Service-to-service auth proves which service is calling (mutual identity). Both use similar mechanisms (JWTs, certificates) but for different principals.

Further Reading