
gRPC Interceptors: Error Handling Step-by-Step

Interceptors are gRPC's middleware mechanism: reusable logic that runs before and after every RPC on both server and client sides. Instead of repeating auth checks, logging, error wrapping, and metric collection in every handler, you define it once in an interceptor and attach it to the server. A single server can have multiple chained interceptors (logging → auth → metrics → handler). This reduces code duplication, enforces cross-cutting concerns consistently, and makes it easy to add observability to all RPCs at once. This guide covers server and client interceptors, exception wrapping, request logging, and integration with tracing systems.

Server-Side Interceptor: Logging and Error Wrapping

A server interceptor wraps every RPC call. Here's a simple logging interceptor:

import grpc
import logging
import time

logger = logging.getLogger(__name__)

class LoggingInterceptor(grpc.ServerInterceptor):
    """
    Log every incoming RPC as it is dispatched.
    """

    def intercept_service(self, continuation, handler_call_details):
        """
        Called once per RPC, when the server looks up the method handler.

        Args:
            continuation: Calls the next interceptor (or the handler lookup) in the chain
            handler_call_details: Method name and invocation metadata for the RPC

        Returns:
            The grpc.RpcMethodHandler for the call, or None if the method is unknown
        """
        # Extract RPC metadata
        method_name = handler_call_details.method
        client_metadata = dict(handler_call_details.invocation_metadata or [])
        request_id = client_metadata.get("x-request-id", "unknown")

        logger.info(f"[{request_id}] RPC started: {method_name}")

        # Ask the next interceptor (or the server) for the method handler.
        # The handler has not run yet at this point, so completion time cannot
        # be logged here; that requires wrapping the handler (see the next example).
        return continuation(handler_call_details)

A more complete example wraps the handler itself, so that timing and error handling cover the actual call:

class LoggingAndErrorInterceptor(grpc.ServerInterceptor):
    """
    Log unary-unary RPCs and convert unexpected exceptions to INTERNAL.
    """

    def intercept_service(self, continuation, handler_call_details):
        method_name = handler_call_details.method
        request_id = dict(handler_call_details.invocation_metadata or []).get("x-request-id", "unknown")

        # Get the actual RPC handler
        handler = continuation(handler_call_details)

        # Pass unknown methods and streaming RPCs through unchanged
        if handler is None or handler.unary_unary is None:
            return handler

        # Wrap the handler's behavior
        def handle_with_logging(request, context):
            logger.info(f"[{request_id}] RPC: {method_name}")
            start_time = time.perf_counter()
            try:
                # Call the actual handler
                response = handler.unary_unary(request, context)
            except Exception as e:
                elapsed = time.perf_counter() - start_time
                # context.abort() in the handler also raises, with a status already set.
                # context.code() is an experimental getter in recent grpcio releases.
                if context.code() not in (None, grpc.StatusCode.OK):
                    logger.error(
                        f"[{request_id}] RPC error: {method_name} - {context.code()} {context.details()} ({elapsed:.3f}s)"
                    )
                    raise
                # Unexpected error; report it to the client as INTERNAL
                logger.exception(
                    f"[{request_id}] Unhandled error in {method_name} ({elapsed:.3f}s): {e}"
                )
                context.abort(
                    grpc.StatusCode.INTERNAL,
                    f"Internal server error: {type(e).__name__}"
                )
            elapsed = time.perf_counter() - start_time
            logger.info(f"[{request_id}] Success: {method_name} ({elapsed:.3f}s)")
            return response

        # Build a new handler instead of mutating the one returned by continuation
        return grpc.unary_unary_rpc_method_handler(
            handle_with_logging,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

Register the interceptor on the server:

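from concurrent import futures

# These interceptors are synchronous, so they need the synchronous server;
# grpc.aio.server expects grpc.aio.ServerInterceptor subclasses instead.
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[LoggingAndErrorInterceptor()]
)
order_pb2_grpc.add_OrderServiceServicer_to_server(OrderServicer(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()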

Every RPC now logs with a request ID, duration, and any errors.

Authentication Interceptor

Enforce JWT or API key auth on all RPCs:

import jwt

# Placeholder; load the real signing key from configuration, not source code
JWT_SECRET = "secret-key"

def _unauthenticated_handler(message):
    """Build a handler that rejects the RPC with UNAUTHENTICATED."""
    def abort(request, context):
        context.abort(grpc.StatusCode.UNAUTHENTICATED, message)
    return grpc.unary_unary_rpc_method_handler(abort)

class AuthInterceptor(grpc.ServerInterceptor):
    """
    Verify JWT tokens in the 'authorization' metadata.
    Reject unauthorized calls before reaching the handler.
    """

    # Exempt endpoints that don't require auth
    EXEMPT_METHODS = {
        "/ecommerce.orders.OrderService/HealthCheck",
        "/ecommerce.orders.OrderService/CreatePublicOrder",
    }

    def intercept_service(self, continuation, handler_call_details):
        method_name = handler_call_details.method

        # Skip auth for exempt methods
        if method_name in self.EXEMPT_METHODS:
            return continuation(handler_call_details)

        # Extract authorization header
        metadata = dict(handler_call_details.invocation_metadata or [])
        auth_header = metadata.get("authorization", "")

        # Validate token
        try:
            if not auth_header.startswith("Bearer "):
                raise ValueError("Missing Bearer token")

            token = auth_header[len("Bearer "):]
            payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
            user_id = payload.get("user_id")

            if not user_id:
                raise ValueError("Invalid token payload")

        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid token: {e}")
            # Reject the RPC: return a handler that aborts instead of the real one
            return _unauthenticated_handler("Invalid token")
        except ValueError as e:
            logger.warning(f"Auth error: {e}")
            return _unauthenticated_handler(str(e))

        # Auth passed; look up the real handler
        handler = continuation(handler_call_details)
        if handler is None or handler.unary_unary is None:
            # Streaming handlers pass through without user_id in this example
            return handler

        def call_with_user(request, context):
            # intercept_service has no context, so user_id is attached here.
            # Setting an attribute on the context is a convention, not a gRPC API.
            context.user_id = user_id
            return handler.unary_unary(request, context)

        return grpc.unary_unary_rpc_method_handler(
            call_with_user,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

In handlers, access the authenticated user:

def CreateOrder(self, request, context):
    user_id = getattr(context, "user_id", None)
    if not user_id:
        context.abort(grpc.StatusCode.UNAUTHENTICATED, "User not authenticated")

    # ... process order for user_id
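
The authorization check depends on separating the scheme from the token. As a minimal sketch, that parsing can be isolated and tested without a server; the helper name extract_bearer_token is hypothetical and not part of gRPC or PyJWT:

```python
def extract_bearer_token(auth_header: str) -> str:
    """Return the token from a 'Bearer <token>' header value.

    Raises ValueError when the scheme is not Bearer or the token is empty.
    """
    # Split on the first space only, so the token itself is never modified
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()
    if scheme != "Bearer" or not token:
        raise ValueError("Missing Bearer token")
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))
```

Splitting once on the first space leaves the token untouched, whereas a global string replacement would also rewrite any later occurrence of the same substring inside an opaque API key.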

Client-Side Interceptor: Adding Request Headers

Client interceptors run on the client side to add headers, retry logic, or request tracing:

import collections
import uuid

class _ClientCallDetails(
    collections.namedtuple(
        "_ClientCallDetails", ("method", "timeout", "metadata", "credentials")
    ),
    grpc.ClientCallDetails,
):
    """Concrete call details; grpc.ClientCallDetails is only an interface."""

class RequestIDInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Add a unique request ID to every unary-unary client RPC for tracing.
    """

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """
        Called before each unary-unary RPC.
        """
        # Add request ID to metadata
        request_id = str(uuid.uuid4())
        metadata = list(client_call_details.metadata or []) + [("x-request-id", request_id)]

        # Build updated call details; the incoming object should not be mutated
        new_details = _ClientCallDetails(
            method=client_call_details.method,
            timeout=client_call_details.timeout,
            metadata=metadata,
            credentials=client_call_details.credentials,
        )

        logger.info(f"Sending RPC {new_details.method} with request_id={request_id}")

        # Call the next interceptor or the actual RPC (call details come first)
        return continuation(new_details, request)

Register on the client:

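with grpc.insecure_channel("localhost:50051") as channel:
    # Synchronous channels take interceptors through grpc.intercept_channel
    intercepted_channel = grpc.intercept_channel(channel, RequestIDInterceptor())
    stub = order_pb2_grpc.OrderServiceStub(intercepted_channel)
    response = stub.CreateOrder(order_pb2.Order(...))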
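
When the caller is itself serving a request that already carries an ID, reusing it keeps a single ID across hops. The following is a minimal sketch of that metadata merge, assuming the same x-request-id key; the helper name with_request_id is hypothetical:

```python
import uuid
from typing import Iterable, List, Optional, Tuple

MetadataItem = Tuple[str, str]


def with_request_id(metadata: Optional[Iterable[MetadataItem]]) -> List[MetadataItem]:
    """Return metadata with an x-request-id entry, keeping an existing one."""
    merged = list(metadata or [])
    # gRPC metadata keys are lowercase, so an exact match is enough here
    if any(key == "x-request-id" for key, _ in merged):
        return merged
    merged.append(("x-request-id", str(uuid.uuid4())))
    return merged


print(with_request_id([("x-request-id", "abc-123")]))
print(with_request_id(None))
```

An interceptor could call this helper in place of unconditionally appending a fresh ID, so that IDs set explicitly by the caller are preserved.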

Chaining Multiple Interceptors

Pass a list of interceptors to the server; they execute in order:

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[
        LoggingAndErrorInterceptor(),  # 1st: Log all calls
        AuthInterceptor(),             # 2nd: Verify JWT
        MetricsInterceptor(),          # 3rd: Collect metrics
    ]
)

Each interceptor wraps the next, creating a pipeline: Request → Logging → Auth → Metrics → Handler → Metrics → Auth → Logging → Response.
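
The nesting can be modelled without gRPC at all. The sketch below uses plain functions as stand-ins for interceptors and the handler (none of these names are gRPC APIs) to show why the first entry in the list sees the request first and the response last:

```python
from typing import Callable, List

Handler = Callable[[str], str]


def make_interceptor(name: str, trace: List[str]) -> Callable[[Handler], Handler]:
    """Return a wrapper that records when it is entered and exited."""
    def wrap(next_handler: Handler) -> Handler:
        def wrapped(request: str) -> str:
            trace.append(f"{name}:before")
            response = next_handler(request)
            trace.append(f"{name}:after")
            return response
        return wrapped
    return wrap


def build_chain(interceptors: List[Callable[[Handler], Handler]], handler: Handler) -> Handler:
    # Wrap from the last interceptor inward so the first one ends up outermost
    for wrap in reversed(interceptors):
        handler = wrap(handler)
    return handler


trace: List[str] = []


def handler(request: str) -> str:
    trace.append("handler")
    return f"ok:{request}"


chain = build_chain(
    [make_interceptor(name, trace) for name in ("logging", "auth", "metrics")],
    handler,
)
result = chain("order-1")
print(result)
print(trace)
```

The recorded trace runs logging, auth, metrics on the way in, then the handler, then metrics, auth, logging on the way out, which mirrors the pipeline described above.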

Metrics and Observability Interceptor

Collect latency, error rates, and request counts:

import prometheus_client

class MetricsInterceptor(grpc.ServerInterceptor):
    """
    Collect Prometheus metrics for unary-unary RPCs.
    """

    def __init__(self):
        self.request_count = prometheus_client.Counter(
            "grpc_requests_total",
            "Total gRPC requests",
            ["method", "status"]
        )
        self.request_duration = prometheus_client.Histogram(
            "grpc_request_duration_seconds",
            "gRPC request latency",
            ["method"]
        )

    def intercept_service(self, continuation, handler_call_details):
        method_name = handler_call_details.method

        handler = continuation(handler_call_details)

        # Pass unknown methods and streaming RPCs through unchanged
        if handler is None or handler.unary_unary is None:
            return handler

        def handle_with_metrics(request, context):
            # Start timing when the handler runs, not when it is looked up
            start_time = time.perf_counter()
            status = "error"
            try:
                response = handler.unary_unary(request, context)
                status = "success"
                return response
            finally:
                elapsed = time.perf_counter() - start_time
                self.request_count.labels(method=method_name, status=status).inc()
                self.request_duration.labels(method=method_name).observe(elapsed)

        # Build a new handler instead of mutating the one returned by continuation
        return grpc.unary_unary_rpc_method_handler(
            handle_with_metrics,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

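Expose metrics at /metrics for Prometheus scraping, for example by calling prometheus_client.start_http_server(8000) at startup (the port is only an example).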

Key Takeaways

  • Interceptors are middleware that wrap every RPC on server or client side, enabling logging, auth, metrics, and error handling without duplicating code.
  • Server interceptors validate and preprocess requests; client interceptors add headers and handle retries.
  • Chain multiple interceptors in order; each wraps the next, creating a processing pipeline.
  • Use interceptors for cross-cutting concerns: logging, authentication, metrics, tracing, rate limiting.
  • Interceptors reduce code duplication and make it easy to add observability to all RPCs at once.

Frequently Asked Questions

Can interceptors short-circuit an RPC (reject it)?

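Yes. In Python, intercept_service receives no context object, so instead of calling continuation the interceptor returns a handler whose body calls context.abort(StatusCode, message). The RPC is rejected before it reaches the real handler. Use this for auth, rate limiting, or validation.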

Do interceptors work with streaming RPCs?

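Yes, but the mechanism is slightly different. On the server, intercept_service is called for every RPC type, so one method handles both streaming and unary RPCs: check which of the handler's unary_unary, unary_stream, stream_unary, or stream_stream attributes is set and wrap that one with the matching grpc.*_rpc_method_handler factory. On the client, each RPC type has its own interceptor base class, such as grpc.UnaryStreamClientInterceptor.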
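
One way to cover every RPC type in a single intercept_service method is to branch on the request_streaming and response_streaming flags that grpc.RpcMethodHandler exposes. A minimal sketch; handler_kind is a hypothetical helper, and SimpleNamespace stands in for a real handler so the snippet runs without a server:

```python
from types import SimpleNamespace


def handler_kind(handler) -> str:
    """Name the RpcMethodHandler attribute that holds the behavior to wrap."""
    if handler.request_streaming and handler.response_streaming:
        return "stream_stream"
    if handler.request_streaming:
        return "stream_unary"
    if handler.response_streaming:
        return "unary_stream"
    return "unary_unary"


# Stand-in for a server-streaming handler; real ones come from continuation(...)
fake_handler = SimpleNamespace(request_streaming=False, response_streaming=True)
kind = handler_kind(fake_handler)
print(kind)
```

The returned name matches both the attribute to read, via getattr(handler, kind), and the factory used to rebuild the handler, such as grpc.unary_stream_rpc_method_handler.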

How do I pass data from an interceptor to the handler?

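Attach it to the context object. The context is not available in intercept_service itself, so do this inside the wrapped handler that the interceptor returns:

# In the interceptor's wrapped handler
def call_with_user(request, context):
    context.user_id = extracted_user_id
    return handler.unary_unary(request, context)

# In handler
def CreateOrder(self, request, context):
    user_id = getattr(context, "user_id", None)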

Can I have per-method interceptors?

Not directly in gRPC. Instead, check the method name inside the interceptor and apply logic conditionally:

if method_name in self.EXEMPT_METHODS:
    return continuation(handler_call_details)

What's the performance impact of interceptors?

Negligible if kept simple (logging, adding headers). Complex interceptors (database lookups, external API calls) can add latency. Always measure. For auth, use an interceptor; for expensive operations, cache results.
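
Caching an expensive lookup can be as small as a dictionary with timestamps. The sketch below is one possible shape, not a gRPC feature: TTLCache and slow_user_lookup are hypothetical names, and the injectable clock exists only to make expiry testable:

```python
import time
from typing import Callable, Dict, Tuple


class TTLCache:
    """Cache results of an expensive lookup for a fixed number of seconds."""

    def __init__(
        self,
        lookup: Callable[[str], str],
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._lookup = lookup
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}

    def get(self, key: str) -> str:
        now = self._clock()
        cached = self._entries.get(key)
        # Serve from cache while the entry is younger than the TTL
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]
        value = self._lookup(key)
        self._entries[key] = (now, value)
        return value


calls = []


def slow_user_lookup(token: str) -> str:
    # Stand-in for a database or external API call
    calls.append(token)
    return f"user-for-{token}"


cache = TTLCache(slow_user_lookup, ttl_seconds=60.0)
cache.get("token-a")
cache.get("token-a")
print(len(calls))
```

This version has no lock and no size limit: under a thread-pool server two threads may occasionally repeat the same lookup, and entries are only replaced, never evicted. Both are worth addressing before relying on it in production.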

Further Reading