Building gRPC Servers: Unary RPC Guide

Unary RPC is the simplest gRPC communication pattern: a client sends a single request and receives a single response. Unlike REST endpoints that handle each request independently, gRPC servers maintain a persistent connection (HTTP/2), serialize requests and responses as Protocol Buffers, and use a shared context object for cancellation, timeouts, and metadata propagation. This guide teaches you to build robust unary RPC servers in Python, handle errors gracefully, respect client deadlines, and integrate with observability tools.

Anatomy of a Unary RPC Handler

A unary handler is a method on the servicer class that takes a request message and a context object, then returns a response:

import logging
import time
import uuid

import grpc
import order_pb2
import order_pb2_grpc

class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    # Handler for CreateOrder RPC
    def CreateOrder(self, request: order_pb2.Order, context: grpc.ServicerContext):
        """
        Process a new order request.

        Args:
            request: Order proto message (contains order_id, items, customer_id)
            context: gRPC context (provides deadline, cancellation, metadata)

        Returns:
            OrderResponse proto message
        """
        # Step 1: Extract request data
        customer_id = request.customer_id
        total = sum(item.price * item.quantity for item in request.items)

        # Step 2: Validate
        if not customer_id or total <= 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid order data")
            # Note: abort() raises an exception; code below doesn't run

        # Step 3: Business logic (database insert, event publish, etc.)
        try:
            order_id = save_order_to_database(request)
            publish_order_created_event(order_id)
        except Exception as e:
            # Log and return a service error
            logging.exception("Failed to persist order")
            context.abort(grpc.StatusCode.INTERNAL, f"Failed to persist order: {e}")

        # Step 4: Return response
        return order_pb2.OrderResponse(
            status="CONFIRMED",
            order_id=order_id,
            confirmed_at=time.time_ns()  # Nanoseconds since the Unix epoch
        )

def save_order_to_database(request: order_pb2.Order) -> str:
    # Pseudo-code: insert order, return generated ID
    return f"ORD-{uuid.uuid4().hex[:12].upper()}"

def publish_order_created_event(order_id: str):
    # Pseudo-code: publish to message queue
    pass

Key points:

  • Handler receives a proto message and a grpc.ServicerContext object.
  • Use context.abort(StatusCode, message) to return errors. This immediately terminates the RPC and sends the error to the client.
  • Any other exception that escapes a handler reaches the client as StatusCode.UNKNOWN, with no useful detail. Use abort() for controlled error responses.
  • The response is a proto message built with field assignments. gRPC serializes it automatically.
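
One way to keep the exception-to-status mapping in a single place is a small decorator that calls context.abort() for known exception types. The sketch below is an illustration, not part of the grpc library: abort_on, OrderNotFound, and RecordingContext are hypothetical names, and the status code is passed in as a plain string so the example runs without a server. In a real service you would pass grpc.StatusCode members instead.

```python
import functools


class OrderNotFound(Exception):
    """Hypothetical domain error used for illustration."""


def abort_on(mapping):
    """Translate known exception types into context.abort() calls.

    mapping: {exception_type: status_code}. In a real service the values
    would be grpc.StatusCode members such as grpc.StatusCode.NOT_FOUND.
    """
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(self, request, context):
            try:
                return handler(self, request, context)
            except tuple(mapping) as exc:
                # Use the first mapping entry that matches the exception.
                for exc_type, code in mapping.items():
                    if isinstance(exc, exc_type):
                        # abort() raises, so control does not continue here.
                        context.abort(code, str(exc))
                raise
        return wrapper
    return decorator


class RecordingContext:
    """Stand-in for grpc.ServicerContext (assumption: only abort() is needed)."""

    def __init__(self):
        self.aborted_with = None

    def abort(self, code, details):
        self.aborted_with = (code, details)
        raise RuntimeError("RPC aborted")  # the real abort() also raises


class OrderServicer:
    @abort_on({OrderNotFound: "NOT_FOUND"})
    def GetOrder(self, request, context):
        raise OrderNotFound(f"Order {request} not found")
```

Avoid mapping the bare Exception type with a decorator like this, because the exception that abort() itself raises inside a handler would then be caught and remapped.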

The gRPC Context Object

The context parameter provides access to RPC metadata, deadlines, and cancellation signals:

class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    def CreateOrder(self, request, context: grpc.ServicerContext):
        # Get RPC metadata (headers sent by client)
        metadata = dict(context.invocation_metadata())
        request_id = metadata.get("x-request-id", "unknown")
        user_id = metadata.get("user-id")

        # Check the deadline: time_remaining() returns the seconds left
        # before the client gives up, or None if no deadline was set
        remaining_seconds = context.time_remaining()
        if remaining_seconds is not None and remaining_seconds < 0.5:  # Less than 500ms left
            context.abort(
                grpc.StatusCode.DEADLINE_EXCEEDED,
                "Insufficient time to process order"
            )

        # Check if the RPC is still active (False once the client has
        # cancelled or the deadline has passed)
        if not context.is_active():
            # Skip the work; the client won't receive this response
            return order_pb2.OrderResponse()

        # Send metadata back to client (response headers)
        context.send_initial_metadata([
            ("server-version", "1.0.0"),
            ("region", "us-west-2")
        ])

        # Process request (process_order is an application-specific placeholder)
        result = process_order(request)

        # Set trailing metadata (response trailers, usually for error details)
        context.set_trailing_metadata([
            ("processing-time-ms", "42")
        ])

        return order_pb2.OrderResponse(status="OK", order_id=result["id"])

Context methods:

  • invocation_metadata(): RPC metadata headers (key-value pairs).
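  • time_remaining(): Seconds left before the client's deadline expires, or None if the client set no deadline.
  • is_active(): Returns False once the client has cancelled the RPC or the deadline has passed. (Contexts in grpc.aio also provide cancelled().)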
  • send_initial_metadata(metadata): Send response headers before the response body.
  • set_trailing_metadata(metadata): Send trailers (debugging info, timestamps) after the response.
  • abort(code, message): Immediately fail the RPC with a status code and message.
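
The processing-time-ms trailer in the handler above is hard-coded. A measured value could be reported with a small context manager like the one sketched here. timed_trailer and FakeContext are hypothetical names introduced for illustration. The only grpc call assumed is context.set_trailing_metadata(), which takes a sequence of (key, value) string pairs.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_trailer(context, key="processing-time-ms"):
    """Time the wrapped block and report the result as trailing metadata."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = int((time.perf_counter() - start) * 1000)
        # Metadata values are strings, so the number is converted first.
        context.set_trailing_metadata([(key, str(elapsed_ms))])


class FakeContext:
    """Stand-in for grpc.ServicerContext that records the trailers it is given."""

    def __init__(self):
        self.trailers = []

    def set_trailing_metadata(self, trailing_metadata):
        self.trailers = list(trailing_metadata)
```

Inside a handler, the work would be wrapped as `with timed_trailer(context): result = process_order(request)`. Because set_trailing_metadata() sets the whole trailer list, any other trailers would need to be included in the same call.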

Handling Errors and Status Codes

gRPC defines 17 standard status codes: OK plus 16 error codes. The most commonly used ones are listed below. Return the correct code to help clients retry intelligently:

Code                     | Meaning           | Retryable? | Example
-------------------------|-------------------|------------|--------------------------------
OK (0)                   | Success           | No         | Order created
CANCELLED (1)            | Client cancelled  | No         | User closed connection
UNKNOWN (2)              | Unknown error     | No         | Unhandled exception
INVALID_ARGUMENT (3)     | Bad input         | No         | Missing required field
DEADLINE_EXCEEDED (4)    | Timeout           | Yes        | Request took > 30s
NOT_FOUND (5)            | Resource missing  | No         | Order ID doesn't exist
ALREADY_EXISTS (6)       | Duplicate         | No         | Order ID already created
PERMISSION_DENIED (7)    | Not authorized    | No         | User can't access order
RESOURCE_EXHAUSTED (8)   | Rate limited      | Yes        | Too many requests
FAILED_PRECONDITION (9)  | Invalid state     | No         | Order already shipped
ABORTED (10)             | Aborted by system | Yes        | Concurrent transaction conflict
INTERNAL (13)            | Server error      | No         | Database crash
UNAVAILABLE (14)         | Service down      | Yes        | Temporarily unreachable
UNAUTHENTICATED (16)     | Auth missing      | No         | No JWT token

def GetOrder(self, request, context):
    # order_exists, user_can_access_order, too_many_requests_from_ip,
    # fetch_order, and DatabaseError are application-specific placeholders
    order_id = request.id
    metadata = dict(context.invocation_metadata())

    # Not found error
    if not order_exists(order_id):
        context.abort(grpc.StatusCode.NOT_FOUND, f"Order {order_id} not found")

    # Permission error
    if not user_can_access_order(metadata.get("user-id"), order_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, "Access denied")

    # Rate limiting (context.peer() returns the client's address as a string)
    if too_many_requests_from_ip(context.peer()):
        context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "Rate limit exceeded; retry after 10s")

    # Internal error
    try:
        order = fetch_order(order_id)
    except DatabaseError as e:
        context.abort(grpc.StatusCode.INTERNAL, f"Database error: {e}")

    return order

Clients check the status code and decide to retry or fail:

try:
    response = stub.GetOrder(order_pb2.GetOrderRequest(id="ORD-123"))
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
        # Retry with backoff (retry_after_delay is a placeholder helper)
        retry_after_delay(1.0)
    elif e.code() == grpc.StatusCode.NOT_FOUND:
        # Don't retry; resource doesn't exist
        raise
    elif e.code() == grpc.StatusCode.UNAVAILABLE:
        # Service is temporarily down; retry
        retry_after_delay(5.0)
    else:
        # Any other code: surface the error to the caller
        raise
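
The retry decision above can be factored into a reusable helper. This is a minimal sketch of retrying with exponential backoff and jitter. call_with_retry and its parameters are hypothetical names, and the helper is deliberately independent of grpc so the caller decides which errors are retryable.

```python
import random
import time


def call_with_retry(call, is_retryable, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Invoke call() and retry with exponential backoff plus jitter.

    is_retryable(exc) decides whether a failure is worth another attempt.
    The sleep parameter exists so tests can avoid real waiting.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return call()
        except Exception as exc:
            last_attempt = attempt == attempts - 1
            if last_attempt or not is_retryable(exc):
                raise
            # Delay doubles each attempt, scaled by a random factor
            # between 0.5 and 1.0 to spread out retries from many clients.
            delay = base_delay * (2 ** attempt) * random.uniform(0.5, 1.0)
            sleep(delay)
```

With a gRPC stub, call could be `lambda: stub.GetOrder(request, timeout=2.0)` and is_retryable could check `isinstance(e, grpc.RpcError) and e.code() in {grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED}`. Retrying is only safe for RPCs that are idempotent.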

Creating and Running a Server

Assemble all handlers into a server and listen on a port:

import asyncio
import logging
from concurrent import futures

import grpc
import order_pb2
import order_pb2_grpc

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    def CreateOrder(self, request, context):
        # ... handler logic
        return order_pb2.OrderResponse(status="OK")

async def serve():
    """Run an async gRPC server."""
    # Create async server; the thread pool (up to 10 workers) runs
    # handlers that are not coroutines, such as CreateOrder above
    server = grpc.aio.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            # Max message size: 10 MB (the default receive limit is 4 MB)
            ("grpc.max_send_message_length", 10 * 1024 * 1024),
            ("grpc.max_receive_message_length", 10 * 1024 * 1024),
            # Keep-alive: ping client every 30s if idle
            ("grpc.keepalive_time_ms", 30000),
        ]
    )

    # Register servicer
    order_pb2_grpc.add_OrderServiceServicer_to_server(
        OrderServicer(),
        server
    )

    # Listen on port
    server.add_insecure_port("[::]:50051")

    logger.info("Starting gRPC server on port 50051")
    await server.start()

    # Wait for termination signal
    await server.wait_for_termination()

if __name__ == "__main__":
    asyncio.run(serve())

Run it:

python order_service.py
# Output: Starting gRPC server on port 50051

Client-Side Deadline Propagation

Clients can set a deadline (timeout) for each RPC. The server checks this deadline and aborts early if time runs out:

import grpc
import order_pb2
import order_pb2_grpc

def create_order_with_timeout():
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = order_pb2_grpc.OrderServiceStub(channel)

        try:
            # Set a 5-second timeout for this RPC. timeout is a duration in
            # seconds, not an absolute timestamp; gRPC turns it into a deadline
            response = stub.CreateOrder(
                order_pb2.Order(...),
                timeout=5.0
            )
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
                print("Order creation timed out after 5s")
            else:
                raise

The server's context object includes this deadline, and the handler checks it:

def CreateOrder(self, request, context):
    remaining = context.time_remaining()  # None if the client set no deadline
    if remaining is not None and remaining <= 0:
        context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "Already exceeded deadline")
    # ... continue processing
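
In the Python API, the time left on the client's deadline is available from context.time_remaining(), which returns None when no deadline was set. The helpers below sketch two common uses: checking whether enough time is left to start work, and deriving a timeout for a downstream call so the deadline propagates. has_budget, downstream_timeout, and FixedDeadlineContext are hypothetical names, and the reserve and default values are arbitrary assumptions.

```python
def has_budget(context, needed_seconds):
    """Return True if the RPC has at least needed_seconds left.

    A missing deadline (time_remaining() is None) is treated as unlimited.
    """
    remaining = context.time_remaining()
    return remaining is None or remaining >= needed_seconds


def downstream_timeout(context, reserve=0.1, default=5.0):
    """Timeout to pass to a downstream call made on behalf of this RPC.

    Keeps `reserve` seconds back for building the response, and falls back
    to `default` when the client set no deadline.
    """
    remaining = context.time_remaining()
    if remaining is None:
        return default
    return max(0.0, remaining - reserve)


class FixedDeadlineContext:
    """Stand-in for grpc.ServicerContext with a fixed remaining time."""

    def __init__(self, remaining):
        self._remaining = remaining

    def time_remaining(self):
        return self._remaining
```

A handler might abort with grpc.StatusCode.DEADLINE_EXCEEDED when has_budget(context, 0.5) is False, and pass timeout=downstream_timeout(context) to any stub calls it makes.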

For high concurrency, use async handlers with grpc.aio (async I/O):

import asyncio
from concurrent import futures

import grpc
import order_pb2
import order_pb2_grpc

class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    async def CreateOrder(self, request, context):
        """Async handler using async/await."""
        # async_db and async_queue are placeholders for async client libraries

        # Async database operation (doesn't block the event loop)
        order_id = await async_db.insert_order(request)

        # Async message publish (doesn't block the event loop)
        await async_queue.publish("order.created", {"id": order_id})

        return order_pb2.OrderResponse(status="OK", order_id=order_id)

async def serve():
    # Use grpc.aio for async servers
    server = grpc.aio.server(
        futures.ThreadPoolExecutor(max_workers=5)  # Fewer threads needed
    )
    order_pb2_grpc.add_OrderServiceServicer_to_server(
        OrderServicer(),
        server
    )
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()

asyncio.run(serve())

Async handlers let one event loop interleave thousands of in-flight requests with minimal thread overhead, provided each handler awaits its I/O instead of blocking. Prefer grpc.aio for new Python gRPC services that are I/O-bound.

Key Takeaways

  • Unary handlers receive a proto request and a grpc.ServicerContext, then return a proto response.
  • Use context.abort(StatusCode, message) to return errors; return the correct status code so clients know whether to retry.
  • Context provides metadata, deadline checks, and cancellation signals. Check the deadline on long-running operations.
  • Create servers with grpc.aio.server(), add handlers, and listen on a port.
  • Use async handlers with grpc.aio for high-concurrency services (thousands of concurrent requests).

Frequently Asked Questions

How many concurrent requests can a gRPC server handle?

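It depends on the handler logic. A sync server with 10 workers runs at most 10 handlers at a time; further requests wait for a free worker. An async server is not bound by its thread count: HTTP/2 multiplexing plus asyncio lets it keep 1000+ requests in flight, as long as handlers spend their time awaiting I/O rather than computing. For high volumes of I/O-bound requests, use async handlers and grpc.aio.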
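
The difference can be illustrated without gRPC at all. The sketch below simulates handlers that wait on I/O, first with a bounded thread pool and then with asyncio. run_blocking and run_async are hypothetical names, and the sleeps stand in for database or network calls. Actual timings depend on the machine.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def run_blocking(num_requests, workers, io_seconds):
    """Simulate sync handlers: each request holds a worker thread while it waits."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(lambda _: time.sleep(io_seconds), range(num_requests)))
    return time.perf_counter() - start


def run_async(num_requests, io_seconds):
    """Simulate async handlers: waiting requests share one event-loop thread."""
    async def handle():
        await asyncio.sleep(io_seconds)

    async def main():
        await asyncio.gather(*(handle() for _ in range(num_requests)))

    start = time.perf_counter()
    asyncio.run(main())
    return time.perf_counter() - start
```

Calling run_blocking(30, 10, 0.05) needs three rounds of waiting because only 10 requests can wait at once, while run_async(1000, 0.05) lets all 1000 simulated requests wait concurrently on a single thread.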

How do I access request headers (like JWT tokens)?

Use context.invocation_metadata():

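def CreateOrder(self, request, context):
    metadata = dict(context.invocation_metadata())
    auth_header = metadata.get("authorization", "")
    if not auth_header.startswith("Bearer "):
        context.abort(grpc.StatusCode.UNAUTHENTICATED, "Missing bearer token")
    token = auth_header[len("Bearer "):]
    user = verify_jwt(token)  # verify_jwt is an application-specific placeholder
    # ... process order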
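
Header parsing like this is easy to get subtly wrong, so it can help to isolate it in a function that is testable without a server. The sketch below assumes the token arrives in an "authorization" entry using the Bearer scheme. bearer_token is a hypothetical helper name, and it accepts any iterable of (key, value) pairs, which is the shape context.invocation_metadata() yields.

```python
def bearer_token(invocation_metadata):
    """Return the bearer token from RPC metadata, or None if absent or malformed."""
    for key, value in invocation_metadata:
        if key == "authorization":
            # Split "Bearer <token>" into the scheme and the token itself.
            scheme, _, token = value.partition(" ")
            token = token.strip()
            if scheme.lower() == "bearer" and token:
                return token
            return None
    return None
```

A handler could call bearer_token(context.invocation_metadata()) and abort with grpc.StatusCode.UNAUTHENTICATED when the result is None.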

Can I run sync and async handlers in the same server?

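Yes. grpc.aio.server() accepts both. Async handlers run on the event loop, while sync handlers run on the thread pool passed to the server. That pool is intended mainly for migrating existing sync code, so prefer async handlers for new work.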

How do I set a custom deadline on the client?

Use the timeout parameter:

response = stub.CreateOrder(order, timeout=5.0)  # 5 seconds

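The timeout is a duration in seconds, not a timestamp. If you track an absolute deadline, convert it to the time remaining:

deadline = time.time() + 5.0
# ... other work ...
response = stub.CreateOrder(order, timeout=max(0.0, deadline - time.time()))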

What's the max message size?

The default limit for received messages is 4 MB. Increase it with server options:

server = grpc.aio.server(options=[
    ("grpc.max_receive_message_length", 100 * 1024 * 1024)  # 100 MB
])

For larger data, use streaming RPCs instead.
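
As a rough sketch of what that involves on the sending side, a large payload can be split into chunks that each fit well under the message limit, with each chunk then wrapped in a request message for a streaming RPC. iter_chunks is a hypothetical helper name, and the 1 MB chunk size is an arbitrary assumption.

```python
def iter_chunks(payload: bytes, chunk_size: int = 1024 * 1024):
    """Yield consecutive slices of payload, each at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for offset in range(0, len(payload), chunk_size):
        yield payload[offset:offset + chunk_size]
```

The receiving side would concatenate the chunks in order to rebuild the original payload.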

Further Reading