Building gRPC Servers: Unary RPC Guide
Unary RPC is the simplest gRPC communication pattern: a client sends a single request and receives a single response. Unlike a typical REST endpoint, a gRPC server multiplexes calls over a long-lived HTTP/2 connection, serializes requests and responses as Protocol Buffers, and passes each handler a per-call context object for cancellation, deadlines, and metadata. This guide teaches you to build robust unary RPC servers in Python, handle errors gracefully, respect client deadlines, and integrate with observability tools.
Anatomy of a Unary RPC Handler
A unary handler is a method on the servicer class that takes a request message and a context object, then returns a response:
import time
import uuid
from concurrent import futures

import grpc
import order_pb2
import order_pb2_grpc


class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    # Handler for CreateOrder RPC
    def CreateOrder(self, request: order_pb2.Order, context: grpc.ServicerContext):
        """
        Process a new order request.

        Args:
            request: Order proto message (contains order_id, items, customer_id)
            context: gRPC context (provides deadline, cancellation, metadata)

        Returns:
            OrderResponse proto message
        """
        # Step 1: Extract request data
        customer_id = request.customer_id
        total = sum(item.price * item.quantity for item in request.items)

        # Step 2: Validate
        if not customer_id or total <= 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid order data")
            # Note: abort() raises an exception; code below doesn't run

        # Step 3: Business logic (database insert, event publish, etc.)
        try:
            order_id = save_order_to_database(request)
            publish_order_created_event(order_id)
        except Exception as e:
            # Report a service error to the client
            context.abort(grpc.StatusCode.INTERNAL, f"Failed to persist order: {e}")

        # Step 4: Return response
        return order_pb2.OrderResponse(
            status="CONFIRMED",
            order_id=order_id,
            confirmed_at=time.time_ns()  # Nanoseconds since the epoch (already an int)
        )


def save_order_to_database(request: order_pb2.Order) -> str:
    # Pseudo-code: insert order, return generated ID
    return f"ORD-{uuid.uuid4().hex[:12].upper()}"


def publish_order_created_event(order_id: str):
    # Pseudo-code: publish to message queue
    pass
Key points:
- The handler receives a proto message and a grpc.ServicerContext object.
- Use context.abort(StatusCode, message) to return errors. This immediately terminates the RPC and sends the error to the client.
- Any other exception raised in a handler becomes a gRPC error with StatusCode.UNKNOWN. Use abort() for controlled error responses.
- The response is a proto message built with field assignments. gRPC serializes it automatically.
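One way to keep a handler like CreateOrder easy to unit test is to move the validation rules into a plain function and leave only the context.abort() call in the handler. The sketch below assumes the price and quantity item fields used above; the function name validate_order and its error strings are illustrative, not part of the generated order_pb2 code.

```python
from types import SimpleNamespace
from typing import Iterable, Optional


def validate_order(customer_id: str, items: Iterable) -> Optional[str]:
    """Return an error message for a bad order, or None if it is valid.

    Hypothetical helper: each item is assumed to expose `price` and `quantity`.
    """
    if not customer_id:
        return "customer_id is required"
    items = list(items)
    if not items:
        return "order must contain at least one item"
    for index, item in enumerate(items):
        if item.quantity <= 0:
            return f"items[{index}].quantity must be positive"
        if item.price < 0:
            return f"items[{index}].price must not be negative"
    return None


# Stand-in objects with the same attributes as the proto items.
good_items = [SimpleNamespace(price=9.99, quantity=2)]
bad_items = [SimpleNamespace(price=9.99, quantity=0)]

print(validate_order("cust-1", good_items))
print(validate_order("", good_items))
print(validate_order("cust-1", bad_items))
```

In the handler this might become: compute error = validate_order(request.customer_id, request.items) and, if it is not None, call context.abort(grpc.StatusCode.INVALID_ARGUMENT, error). The client then gets a specific message instead of a generic "Invalid order data".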
The gRPC Context Object
The context parameter provides access to RPC metadata, deadlines, and cancellation signals:
class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    def CreateOrder(self, request, context: grpc.ServicerContext):
        # Get RPC metadata (headers sent by client)
        metadata = dict(context.invocation_metadata())
        request_id = metadata.get("x-request-id", "unknown")
        user_id = metadata.get("user-id")

        # Check the deadline (when the client gives up waiting).
        # time_remaining() returns the seconds left, or None if no deadline was set.
        remaining_seconds = context.time_remaining()
        if remaining_seconds is not None and remaining_seconds < 0.5:  # Less than 500ms left
            context.abort(
                grpc.StatusCode.DEADLINE_EXCEEDED,
                "Insufficient time to process order"
            )

        # Check if the client cancelled the RPC (is_active() turns False)
        if not context.is_active():
            return  # Clean up and return; client won't receive response

        # Send metadata back to client (response headers)
        context.send_initial_metadata([
            ("server-version", "1.0.0"),
            ("region", "us-west-2")
        ])

        # Process request
        result = process_order(request)

        # Send trailing metadata (response trailers, usually for error details)
        context.set_trailing_metadata([
            ("processing-time-ms", "42")
        ])

        return order_pb2.OrderResponse(status="OK", order_id=result["id"])
Context methods:
- invocation_metadata(): RPC metadata headers (key-value pairs).
- time_remaining(): Seconds left until the client's deadline expires, or None if the client set no deadline.
- is_active(): Returns False once the client has cancelled the RPC or the deadline has passed.
- send_initial_metadata(metadata): Send response headers before the response body.
- set_trailing_metadata(metadata): Send trailers (debugging info, timestamps) after the response.
- abort(code, message): Immediately fail the RPC with a status code and message.
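Metadata arrives as a sequence of (key, value) pairs, and a key may appear more than once. Converting it with dict() keeps only the last value for a repeated key. The sketch below shows one way to group the pairs instead; group_metadata and first_value are hypothetical helper names, and the sample pairs stand in for what context.invocation_metadata() would return.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple, Union

MetadataValue = Union[str, bytes]


def group_metadata(
    pairs: Iterable[Tuple[str, MetadataValue]],
) -> Dict[str, List[MetadataValue]]:
    """Group (key, value) metadata pairs by key, preserving value order."""
    grouped: Dict[str, List[MetadataValue]] = defaultdict(list)
    for key, value in pairs:
        grouped[key.lower()].append(value)
    return dict(grouped)


def first_value(grouped, key, default=None):
    """Return the first value stored for key, or default if the key is absent."""
    values = grouped.get(key.lower())
    return values[0] if values else default


# Sample pairs standing in for context.invocation_metadata().
sample = [("x-request-id", "req-42"), ("x-tag", "a"), ("x-tag", "b")]
grouped = group_metadata(sample)

print(grouped["x-tag"])
print(first_value(grouped, "X-Request-Id"))
print(first_value(grouped, "user-id", "anonymous"))
```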
Handling Errors and Status Codes
gRPC defines 17 standard status codes, numbered 0 through 16. The table below covers the ones you will use most often. Return the correct code to help clients retry intelligently:
| Code | Meaning | Retryable? | Example |
|---|---|---|---|
| OK (0) | Success | No | Order created |
| CANCELLED (1) | Client cancelled | No | User closed connection |
| UNKNOWN (2) | Unknown error | No | Unhandled exception |
| INVALID_ARGUMENT (3) | Bad input | No | Missing required field |
| DEADLINE_EXCEEDED (4) | Timeout | Yes | Request took > 30s |
| NOT_FOUND (5) | Resource missing | No | Order ID doesn't exist |
| ALREADY_EXISTS (6) | Duplicate | No | Order ID already created |
| PERMISSION_DENIED (7) | Not authorized | No | User can't access order |
| RESOURCE_EXHAUSTED (8) | Rate limited | Yes | Too many requests |
| FAILED_PRECONDITION (9) | Invalid state | No | Order already shipped |
| ABORTED (10) | Aborted by system | Yes | Concurrent update conflict; retry after backoff |
| INTERNAL (13) | Server error | No | Database crash |
| UNAVAILABLE (14) | Service down | Yes | Temporarily unreachable |
| UNAUTHENTICATED (16) | Auth missing | No | No JWT token |
def GetOrder(self, request, context):
    order_id = request.id

    # Not found error
    if not order_exists(order_id):
        context.abort(grpc.StatusCode.NOT_FOUND, f"Order {order_id} not found")

    # Permission error
    if not user_can_access_order(current_user(), order_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, "Access denied")

    # Rate limiting, keyed on the caller's address.
    # context.peer() returns a string such as "ipv4:10.0.0.5:51234".
    if too_many_requests_from_ip(context.peer()):
        context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "Rate limit exceeded; retry after 10s")

    # Internal error
    try:
        order = fetch_order(order_id)
    except DatabaseError as e:
        context.abort(grpc.StatusCode.INTERNAL, f"Database error: {e}")

    return order
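When a handler has many failure modes, a small lookup from domain exceptions to status code names can keep the abort calls consistent. The following is a minimal sketch under assumptions: the exception classes OrderNotFound, AccessDenied, and RateLimited are hypothetical, and the codes are kept as plain strings so the snippet runs without grpcio installed.

```python
class OrderNotFound(Exception):
    """Hypothetical domain error: the requested order does not exist."""


class AccessDenied(Exception):
    """Hypothetical domain error: the caller may not see this order."""


class RateLimited(Exception):
    """Hypothetical domain error: the caller sent too many requests."""


# Checked in order, so list more specific exception types first.
STATUS_NAME_BY_EXCEPTION = (
    (OrderNotFound, "NOT_FOUND"),
    (AccessDenied, "PERMISSION_DENIED"),
    (RateLimited, "RESOURCE_EXHAUSTED"),
)


def status_name_for(exc: Exception) -> str:
    """Return the status code name for exc, falling back to INTERNAL."""
    for exc_type, name in STATUS_NAME_BY_EXCEPTION:
        if isinstance(exc, exc_type):
            return name
    return "INTERNAL"


print(status_name_for(OrderNotFound("ORD-123")))
print(status_name_for(ValueError("unexpected")))
```

grpc.StatusCode is an enum, so a handler could translate the name with grpc.StatusCode[status_name_for(exc)] before passing it to context.abort().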
Clients check the status code and decide to retry or fail:
try:
    response = stub.GetOrder(GetOrderRequest(id="ORD-123"))
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
        # Retry with backoff
        retry_after_delay(1.0)
    elif e.code() == grpc.StatusCode.NOT_FOUND:
        # Don't retry; resource doesn't exist
        raise
    elif e.code() == grpc.StatusCode.UNAVAILABLE:
        # Service is temporarily down; retry
        retry_after_delay(5.0)
    else:
        # Any other code: surface the error instead of swallowing it
        raise
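The retry decisions above can be folded into a reusable wrapper. This is a sketch rather than a recommended policy: it treats the four codes marked retryable in the table as retryable, uses exponential backoff with full jitter, and relies only on the error exposing code() with a name attribute, as grpc.RpcError from a stub call does. The names call_with_retry and is_retryable, and the default attempt count and delay, are assumptions.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Codes marked "Yes" in the retryable column of the table.
RETRYABLE_CODE_NAMES = frozenset(
    {"DEADLINE_EXCEEDED", "RESOURCE_EXHAUSTED", "ABORTED", "UNAVAILABLE"}
)


def is_retryable(error: Exception) -> bool:
    """True if error has a code() whose name is in the retryable set."""
    code = getattr(error, "code", None)
    if not callable(code):
        return False
    return getattr(code(), "name", None) in RETRYABLE_CODE_NAMES


def call_with_retry(
    call: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Invoke call(), retrying retryable errors with backoff and jitter."""
    attempt = 1
    while True:
        try:
            return call()
        except Exception as error:
            # Give up on the last attempt or on a non-retryable error.
            if attempt >= max_attempts or not is_retryable(error):
                raise
        # Full jitter: wait a random time up to base_delay * 2^(attempt - 1).
        sleep(random.uniform(0, base_delay * 2 ** (attempt - 1)))
        attempt += 1
```

A call site might look like call_with_retry(lambda: stub.GetOrder(request, timeout=2.0)). Passing sleep as a parameter keeps the helper testable without real waiting.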
Creating and Running a Server
Assemble all handlers into a server and listen on a port:
import asyncio
import logging
from concurrent import futures

import grpc
import order_pb2
import order_pb2_grpc

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    def CreateOrder(self, request, context):
        # ... handler logic
        return order_pb2.OrderResponse(status="OK")


async def serve():
    """Run an async gRPC server."""
    # The thread pool (up to 10 workers) runs sync handlers such as
    # CreateOrder above; async handlers run on the event loop instead
    server = grpc.aio.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            # Max message size: 10 MB (the default receive limit is 4 MB)
            ("grpc.max_send_message_length", 10 * 1024 * 1024),
            ("grpc.max_receive_message_length", 10 * 1024 * 1024),
            # Keep-alive: ping client every 30s if idle
            ("grpc.keepalive_time_ms", 30000),
        ]
    )

    # Register servicer
    order_pb2_grpc.add_OrderServiceServicer_to_server(
        OrderServicer(),
        server
    )

    # Listen on port
    server.add_insecure_port("[::]:50051")
    logger.info("Starting gRPC server on port 50051")
    await server.start()

    # Wait for termination signal
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())
Run it:
python order_service.py
# Output: Starting gRPC server on port 50051
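The server options are plain (name, value) tuples, so they can be assembled by a small helper and reused across services. A sketch, assuming you want the same limit for sending and receiving; build_server_options and its parameters are hypothetical names, while the option keys are the ones used in the server example above.

```python
from typing import List, Tuple


def build_server_options(
    max_message_mb: int = 10,
    keepalive_seconds: int = 30,
) -> List[Tuple[str, int]]:
    """Build the options list for a gRPC server from human-friendly units."""
    if max_message_mb <= 0:
        raise ValueError("max_message_mb must be positive")
    if keepalive_seconds <= 0:
        raise ValueError("keepalive_seconds must be positive")
    max_bytes = max_message_mb * 1024 * 1024
    return [
        ("grpc.max_send_message_length", max_bytes),
        ("grpc.max_receive_message_length", max_bytes),
        ("grpc.keepalive_time_ms", keepalive_seconds * 1000),
    ]


options = build_server_options(max_message_mb=10, keepalive_seconds=30)
for name, value in options:
    print(f"{name} = {value}")
```

The result can be passed as the options argument when the server is created.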
Client-Side Deadline Propagation
Clients can set a deadline (timeout) for each RPC. The server checks this deadline and aborts early if time runs out:
import grpc
import order_pb2
import order_pb2_grpc


def create_order_with_timeout():
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = order_pb2_grpc.OrderServiceStub(channel)
        try:
            # timeout is a duration in seconds, not an absolute timestamp;
            # gRPC turns it into a deadline of "now + 5s" for this RPC
            response = stub.CreateOrder(
                order_pb2.Order(...),
                timeout=5.0
            )
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
                print("Order creation timed out after 5s")
            else:
                raise
The server's context object includes this deadline, and the handler checks it:
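def CreateOrder(self, request, context):
    # time_remaining() returns the seconds left, or None if no deadline was set
    remaining = context.time_remaining()
    if remaining is not None and remaining <= 0:
        context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "Already exceeded deadline")
    # ... continue processing the order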
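A handler that calls other services usually wants to pass on only the time it has left. The sketch below computes a timeout for a downstream call from the seconds remaining on the inbound RPC. It is an illustration under assumptions: downstream_timeout, the 100 ms reserve, and the 5 s ceiling are hypothetical choices, and the remaining argument is expected to be the seconds left on the inbound call, or None when the client set no deadline.

```python
from typing import Optional


def downstream_timeout(
    remaining: Optional[float],
    reserve: float = 0.1,
    ceiling: float = 5.0,
) -> Optional[float]:
    """Return the timeout to give a downstream call, or None if out of time.

    remaining: seconds left on the inbound RPC (None means no deadline).
    reserve:   time held back for building and sending our own response.
    ceiling:   upper bound applied even when the client is very patient.
    """
    if remaining is None:
        return ceiling
    budget = remaining - reserve
    if budget <= 0:
        return None
    return min(budget, ceiling)


print(downstream_timeout(None))   # no client deadline: use the ceiling
print(downstream_timeout(30.0))   # generous deadline: still capped
print(downstream_timeout(0.05))   # too little time left: skip the call
```

When the helper returns None, the handler can abort with DEADLINE_EXCEEDED straight away instead of starting work the client will never see.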
Async Handlers (Recommended)
For high-concurrency services, use async handlers with grpc.aio (async I/O):
import asyncio
from concurrent import futures

import grpc
import order_pb2
import order_pb2_grpc


class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    async def CreateOrder(self, request, context):
        """Async handler using async/await."""
        # Async database operation (doesn't block thread)
        order_id = await async_db.insert_order(request)

        # Async message publish (doesn't block thread)
        await async_queue.publish("order.created", {"id": order_id})

        return order_pb2.OrderResponse(status="OK", order_id=order_id)


async def serve():
    # Use grpc.aio for async servers
    server = grpc.aio.server(
        # Only sync handlers run in this pool, so fewer threads are needed
        futures.ThreadPoolExecutor(max_workers=5)
    )
    order_pb2_grpc.add_OrderServiceServicer_to_server(
        OrderServicer(),
        server
    )
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())
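Async handlers allow thousands of concurrent requests with minimal thread overhead, as long as each handler awaits its I/O; a blocking or CPU-heavy call inside an async handler stalls the event loop for every other request. Prefer grpc.aio for new Python gRPC services.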
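The concurrency claim can be illustrated without gRPC at all. The sketch below uses only asyncio: handle_request is a stand-in for an async handler that awaits 50 ms of simulated I/O, and 1000 of them run concurrently on one thread. The names and the delay are assumptions chosen for illustration; real throughput depends on what your handlers actually await.

```python
import asyncio
import time


async def handle_request(request_id: int, io_delay: float) -> int:
    """Stand-in for an async handler: await simulated I/O, then respond."""
    await asyncio.sleep(io_delay)
    return request_id


async def run_concurrently(count: int, io_delay: float) -> float:
    """Run count simulated handlers at once and return the elapsed seconds."""
    started = time.perf_counter()
    results = await asyncio.gather(
        *(handle_request(i, io_delay) for i in range(count))
    )
    assert len(results) == count
    return time.perf_counter() - started


# Run one after another, 1000 requests of 50 ms each would take about 50 s.
elapsed = asyncio.run(run_concurrently(count=1000, io_delay=0.05))
print(f"1000 simulated requests finished in {elapsed:.2f}s on one thread")
```

Replacing asyncio.sleep with time.sleep in handle_request would serialize the requests, which is the failure mode to watch for in real async handlers.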
Key Takeaways
- Unary handlers receive a proto request and a grpc.ServicerContext, then return a proto response.
- Use context.abort(StatusCode, message) to return errors; return the correct status code so clients know whether to retry.
- Context provides metadata, deadline checks, and cancellation signals. Check the deadline on long-running operations.
- Create servers with grpc.aio.server(), add handlers, and listen on a port.
- Use async handlers with grpc.aio for high-concurrency services (thousands of concurrent requests).
Frequently Asked Questions
How many concurrent requests can a gRPC server handle?
It depends on the handler logic. A sync server with 10 worker threads runs at most 10 handlers at once; additional requests wait for a free worker. An async server can keep 1000+ requests in flight on a single event loop (HTTP/2 multiplexing + asyncio), provided the handlers await I/O instead of blocking. For high volume, prefer async handlers and grpc.aio.
How do I access request headers (like JWT tokens)?
Use context.invocation_metadata():
def CreateOrder(self, request, context):
    metadata = dict(context.invocation_metadata())
    auth_header = metadata.get("authorization", "")
    # Strip the scheme only from the start of the header (Python 3.9+)
    token = auth_header.removeprefix("Bearer ")
    user = verify_jwt(token)
    # ... process order
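Stripping the prefix accepts headers that never had a Bearer scheme in the first place. A stricter parser can reject those before any JWT verification runs. The sketch below is one possible shape; extract_bearer_token is a hypothetical helper name, and treating the scheme as case-insensitive is an assumption about what your clients send.

```python
from typing import Optional


def extract_bearer_token(header_value: Optional[str]) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if malformed."""
    if not header_value:
        return None
    # Split on the first space: scheme on the left, credentials on the right.
    scheme, _, token = header_value.strip().partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
print(extract_bearer_token(""))
```

A handler could call context.abort(grpc.StatusCode.UNAUTHENTICATED, "Missing bearer token") when the helper returns None.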
Can I run sync and async handlers in the same server?
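Yes. grpc.aio.server() accepts both. Async handlers run on the event loop, while sync handlers run in the thread pool passed as the server's first argument (migration_thread_pool), so blocking calls inside them do not stall the loop.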
How do I set a custom deadline on the client?
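Use the timeout parameter, which takes a duration in seconds:
response = stub.CreateOrder(order, timeout=5.0) # 5 seconds
The timeout is always a duration, never a timestamp. If you track an absolute deadline, convert it to the remaining seconds before the call:
deadline = time.time() + 5.0
response = stub.CreateOrder(order, timeout=max(0.0, deadline - time.time()))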
What's the max message size?
The default limit for received messages is 4 MB. Increase it with server options:
server = grpc.aio.server(options=[
    ("grpc.max_receive_message_length", 100 * 1024 * 1024)  # 100 MB
])
For larger data, use streaming RPCs instead.
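If you do move to a streaming RPC, the payload has to be cut into pieces that each fit under the message limit. Here is a minimal sketch of that slicing step, assuming the data is already in memory as bytes; iter_chunks and the 64 KB default are illustrative choices, and wrapping each chunk in a proto message is left out.

```python
from typing import Iterator


def iter_chunks(payload: bytes, chunk_size: int = 64 * 1024) -> Iterator[bytes]:
    """Yield consecutive slices of payload, each at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for offset in range(0, len(payload), chunk_size):
        yield payload[offset:offset + chunk_size]


payload = b"x" * 150_000
chunks = list(iter_chunks(payload))
print([len(chunk) for chunk in chunks])
```

The receiving side can rebuild the payload by joining the chunks in arrival order, since a gRPC stream preserves message order.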