gRPC Streaming: Client and Server Tutorial

Streaming RPCs allow clients and servers to exchange multiple messages over a single connection, enabling real-time data pipelines, bulk uploads, and bidirectional communication. Unlike unary RPCs (one request, one response), gRPC streaming supports three additional patterns: server streaming (one request, many responses), client streaming (many requests, one response), and bidirectional streaming (many in both directions). HTTP/2 multiplexing means multiple streams coexist on one connection without blocking each other. This guide covers all four RPC types, backpressure handling, error propagation, and production patterns like event broadcasting and bulk data import.
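
As a rough mental model, the four patterns differ only in whether each side passes a single value or an iterator. The sketch below uses plain Python functions with integer payloads (all names are illustrative, and no gRPC is involved) to show the four call shapes:

```python
from typing import Iterable, Iterator


def unary(request: int) -> int:
    """One request in, one response out."""
    return request * 2


def server_streaming(request: int) -> Iterator[int]:
    """One request in, a stream of responses out."""
    for i in range(request):
        yield i


def client_streaming(requests: Iterable[int]) -> int:
    """A stream of requests in, one response out."""
    return sum(requests)


def bidirectional(requests: Iterable[int]) -> Iterator[int]:
    """A stream of requests in, a stream of responses out."""
    for r in requests:
        yield r * r


print(unary(21))                              # 42
print(list(server_streaming(3)))              # [0, 1, 2]
print(client_streaming(iter([1, 2, 3])))      # 6
print(list(bidirectional(iter([1, 2, 3]))))   # [1, 4, 9]
```

The Python gRPC handlers later in this guide follow the same shapes: a handler receives either a request or a request iterator, and either returns one response or yields many.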

Proto Definition: Four RPC Types

Define streaming RPCs in your .proto file using the stream keyword:

syntax = "proto3";
package ecommerce.orders;

service OrderService {
  // Unary: one request, one response (covered in article 4)
  rpc CreateOrder (Order) returns (OrderResponse) {}

  // Server streaming: one request, many responses
  rpc GetOrderHistory (UserID) returns (stream OrderEvent) {}

  // Client streaming: many requests, one response
  rpc ProcessBulkOrders (stream Order) returns (BulkProcessResult) {}

  // Bidirectional streaming: many in both directions
  rpc NotifyOrderUpdates (stream OrderID) returns (stream OrderUpdate) {}
}

message Order {
  string order_id = 1;
  string customer_id = 2;
  float total = 3;
}

message OrderEvent {
  string event_type = 1; // "CREATED", "SHIPPED", "DELIVERED"
  string order_id = 2;
  int64 timestamp = 3;
}

message OrderID {
  string id = 1;
}

message OrderUpdate {
  string order_id = 1;
  string status = 2;
  int64 updated_at = 3;
}

message UserID {
  string id = 1;
}

message BulkProcessResult {
  int32 success_count = 1;
  int32 failure_count = 2;
  repeated string error_messages = 3;
}

Server Streaming: Broadcasting to Clients

Server streaming sends multiple messages to a single client. Use cases: event feeds, log tails, real-time dashboards.

Server implementation:

import grpc

import order_pb2
import order_pb2_grpc


class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    def GetOrderHistory(self, request, context):
        """
        Stream all order events for a user.

        Yields:
            OrderEvent messages (one per update in order history)
        """
        user_id = request.id

        # Validate user; abort() raises, so nothing below runs on failure
        if not user_exists(user_id):
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")

        # Fetch order history (paginated to avoid memory issues)
        try:
            events = stream_order_events(user_id)
            for event in events:
                # Check that the RPC is still active (not cancelled or
                # timed out) before sending each message
                if not context.is_active():
                    print(f"Client cancelled GetOrderHistory for {user_id}")
                    return

                # Yield sends one message to the client
                yield order_pb2.OrderEvent(
                    event_type=event["type"],
                    order_id=event["order_id"],
                    timestamp=event["timestamp"],
                )
        except DatabaseError as e:
            context.abort(grpc.StatusCode.INTERNAL, f"Failed to fetch history: {e}")


def stream_order_events(user_id):
    """Pseudo-code: query database with cursor to avoid loading all at once."""
    cursor = db.orders.find({"customer_id": user_id}).sort("created_at", -1)
    for doc in cursor:
        yield {
            "type": "CREATED" if doc["status"] == "pending" else "SHIPPED",
            "order_id": str(doc["_id"]),
            # int64 field: convert float seconds to integer milliseconds
            "timestamp": int(doc["created_at"].timestamp() * 1000),
        }

Client implementation:

def get_order_history(user_id):
    """Consume order event stream from server."""
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = order_pb2_grpc.OrderServiceStub(channel)

        request = order_pb2.UserID(id=user_id)

        # Call returns an iterator; iterate to consume messages
        try:
            for event in stub.GetOrderHistory(request):
                print(f"Order {event.order_id}: {event.event_type}")
                # Process each event in real time
                log_event_to_analytics(event)
        except grpc.RpcError as e:
            print(f"Stream failed: {e.code()} - {e.details()}")

If the stream has 1 million events, the client receives them incrementally without loading all into memory. Each yield sends one message; each loop iteration receives one.
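
The one-at-a-time behavior comes from Python generators themselves. The sketch below uses plain generators (no gRPC; `produce_events` and `consume_events` are illustrative stand-ins for the handler and the client loop) to show sends and receives interleaving instead of the full sequence being built first:

```python
def produce_events(n, log):
    """Stand-in for a server handler: yields one event at a time."""
    for i in range(n):
        log.append(f"send {i}")
        yield i


def consume_events(events, log):
    """Stand-in for the client loop: handles each event as it arrives."""
    for event in events:
        log.append(f"recv {event}")


log = []
consume_events(produce_events(3, log), log)
print(log)
# ['send 0', 'recv 0', 'send 1', 'recv 1', 'send 2', 'recv 2']
```

Over a real connection the interleaving is looser, because the network and HTTP/2 flow-control buffers sit between the two sides, but neither side needs the whole stream in memory.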

Client Streaming: Batch Upload from Client

Client streaming lets a single client send multiple messages to the server and receive one response at the end. It is useful for bulk uploads and batched writes.

Server implementation:

import uuid


class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    def ProcessBulkOrders(self, request_iterator, context):
        """
        Consume a stream of Order messages from client.

        Args:
            request_iterator: Iterator of Order messages (client sends multiple)
            context: gRPC context

        Returns:
            BulkProcessResult (one response after consuming all requests)
        """
        success_count = 0
        error_messages = []

        try:
            for order in request_iterator:  # Iterate over client-sent messages
                try:
                    # Validate and persist each order
                    if not order.customer_id or order.total <= 0:
                        error_messages.append(f"Invalid order: {order.order_id}")
                        continue

                    save_order(order)
                    success_count += 1

                    # Periodically log progress (optional)
                    if success_count % 100 == 0:
                        print(f"Processed {success_count} orders...")

                except Exception as e:
                    # One bad order should not end the whole stream
                    error_messages.append(f"Order {order.order_id}: {e}")

        except Exception as e:
            # Reading from the request stream itself failed
            context.abort(grpc.StatusCode.INTERNAL, f"Stream error: {e}")

        # After consuming all messages, return a single response
        return order_pb2.BulkProcessResult(
            success_count=success_count,
            failure_count=len(error_messages),
            error_messages=error_messages[:10],  # Limit to 10 for brevity
        )


def save_order(order):
    """Pseudo-code: insert order to database."""
    return f"ORD-{uuid.uuid4().hex[:12]}"

Client implementation:

def upload_orders_in_bulk(orders):
    """Send multiple orders to server in a stream."""
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = order_pb2_grpc.OrderServiceStub(channel)

        # Create a request iterator (generator)
        def request_generator():
            for order_data in orders:  # orders: list of dicts
                yield order_pb2.Order(
                    order_id=order_data["id"],
                    customer_id=order_data["customer_id"],
                    total=order_data["total"],
                )

        try:
            # Call with generator; server consumes all messages
            result = stub.ProcessBulkOrders(request_generator())
            print(f"Processed {result.success_count} successfully, {result.failure_count} failed")
            for error in result.error_messages:
                print(f"  - {error}")
        except grpc.RpcError as e:
            print(f"Bulk upload failed: {e.details()}")


# Usage
orders = [
    {"id": "A", "customer_id": "C1", "total": 100.0},
    {"id": "B", "customer_id": "C2", "total": 200.0},
    # ... 1 million more
]
upload_orders_in_bulk(orders)
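
For a genuinely large upload, the source of the orders can be lazy as well. The following is a minimal sketch, assuming the orders live in a CSV file with `id`, `customer_id`, and `total` columns (the column layout and the `read_orders` helper are assumptions, not part of the service above). It yields dicts in the same shape the upload function expects:

```python
import csv
import io
from typing import Dict, Iterator, TextIO


def read_orders(csv_file: TextIO) -> Iterator[Dict[str, object]]:
    """Yield one order dict per CSV row without loading the whole file."""
    for row in csv.DictReader(csv_file):
        yield {
            "id": row["id"],
            "customer_id": row["customer_id"],
            "total": float(row["total"]),
        }


# In-memory stand-in for a large file on disk.
sample = io.StringIO("id,customer_id,total\nA,C1,100.0\nB,C2,200.0\n")
orders = read_orders(sample)
first = next(orders)
print(first)
# {'id': 'A', 'customer_id': 'C1', 'total': 100.0}
```

Because `upload_orders_in_bulk` only iterates over its argument, it should accept this generator in place of a list, so rows are read from the file as the stream is sent.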

Bidirectional Streaming: Real-Time Updates

Bidirectional streaming allows simultaneous send and receive. Use cases: chat, collaborative editing, real-time monitoring.

Server implementation:

import threading
import time


class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    def NotifyOrderUpdates(self, request_iterator, context):
        """
        Client subscribes to updates for multiple orders (sends OrderIDs).
        Server streams back updates as they occur.
        """
        subscriptions = set()  # Order IDs this client has subscribed to
        lock = threading.Lock()  # Guards subscriptions across threads

        def consume_subscriptions():
            """Background thread: read incoming subscription requests from client."""
            try:
                for order_id_msg in request_iterator:
                    with lock:
                        subscriptions.add(order_id_msg.id)
                    print(f"Client subscribed to order {order_id_msg.id}")
            except Exception as e:
                print(f"Subscription stream error: {e}")

        # Read requests in the background so the handler can keep yielding
        threading.Thread(target=consume_subscriptions, daemon=True).start()

        # Handler thread: stream updates back until the RPC ends
        while context.is_active():
            with lock:
                current = list(subscriptions)

            # Poll database for updates to subscribed orders
            for order_id in current:
                update = check_for_order_update(order_id)  # Pseudo-code helper
                if update:
                    yield order_pb2.OrderUpdate(
                        order_id=order_id,
                        status=update["status"],
                        updated_at=update["timestamp"],
                    )

            # Sleep before next poll (to avoid tight loop)
            time.sleep(1.0)

Client implementation (simpler for bidirectional):

import time


def subscribe_to_order_updates(order_ids):
    """Bidirectional: send order IDs, receive updates."""
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = order_pb2_grpc.OrderServiceStub(channel)

        # Create a request generator that sends order IDs
        def request_generator():
            for order_id in order_ids:
                yield order_pb2.OrderID(id=order_id)
                # In a real app, might send more IDs dynamically
                time.sleep(0.5)

        try:
            # Receive updates as server sends them
            for update in stub.NotifyOrderUpdates(request_generator()):
                print(f"Order {update.order_id} status: {update.status}")
        except grpc.RpcError as e:
            print(f"Update stream failed: {e.details()}")


# Usage
subscribe_to_order_updates(["ORD-001", "ORD-002", "ORD-003"])

Handling Backpressure and Flow Control

When a client or server sends messages faster than the other side can consume, backpressure (flow control) prevents memory exhaustion. HTTP/2 handles this automatically using window sizes. However, your code should:

  1. Server side: Don't buffer unlimited messages; consume the iterator as you process.
  2. Client side: If sending, use a generator (not a list) to avoid buffering all in memory.

# BAD: Loads all 1 million orders into memory before sending
def bad_upload(orders):
    requests = [order_pb2.Order(...) for order in orders]  # Huge list!
    stub.ProcessBulkOrders(iter(requests))  # Whole list stays in memory while it is sent


# GOOD: Yields orders one at a time (streaming)
def good_upload(orders):
    def request_generator():
        for order_data in orders:
            yield order_pb2.Order(...)  # One at a time
    stub.ProcessBulkOrders(request_generator())
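
The effect of a flow-control window can be imitated with a bounded queue. The sketch below is an analogy only, not how gRPC or HTTP/2 implement flow control: `WINDOW` is an arbitrary buffer size chosen for illustration, and the producer simply blocks once that many items are waiting.

```python
import queue
import threading
import time

WINDOW = 2  # Hypothetical buffer size, standing in for a flow-control window
buffer = queue.Queue(maxsize=WINDOW)
produced = []
consumed = []


def producer(n):
    for i in range(n):
        buffer.put(i)  # Blocks while the buffer is full
        produced.append(i)
    buffer.put(None)  # Sentinel: no more items


t = threading.Thread(target=producer, args=(5,))
t.start()

# Wait until the producer has filled the buffer and is blocked on put()
while not buffer.full():
    time.sleep(0.01)
time.sleep(0.05)
stalled_at = len(produced)

# Now drain the buffer; each get() frees a slot and unblocks the producer
while True:
    item = buffer.get()
    if item is None:
        break
    consumed.append(item)
t.join()

print(f"producer stalled after {stalled_at} items; consumed {consumed}")
```

The producer cannot get more than `WINDOW` items ahead of the consumer, which is the property that keeps memory bounded when one side is slower than the other.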

Error Propagation in Streams

If an error occurs mid-stream:

Server-side error:

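def GetOrderHistory(self, request, context):
    # The database error surfaces while iterating the cursor, so the
    # try block must wrap the whole loop, not just the yield
    try:
        for event in stream_order_events(request.id):
            yield order_pb2.OrderEvent(
                event_type=event["type"],
                order_id=event["order_id"],
                timestamp=event["timestamp"],
            )
    except DatabaseError:
        # Abort the stream; client receives error
        context.abort(grpc.StatusCode.INTERNAL, "Database offline")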

Client-side error handling:

try:
    for event in stub.GetOrderHistory(request):
        print(event)
except grpc.RpcError as e:
    print(f"Stream terminated: {e.code()} - {e.details()}")
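
One consequence worth spelling out: messages delivered before the failure have already been processed by the time the exception is raised. The sketch below imitates this offline with a plain generator; `StreamError` is a hypothetical stand-in for `grpc.RpcError`, and `flaky_stream` stands in for a stream that is aborted partway through.

```python
class StreamError(Exception):
    """Hypothetical stand-in for grpc.RpcError in this offline sketch."""


def flaky_stream(n, fail_at):
    """Yield events, then fail mid-stream like an aborted RPC."""
    for i in range(n):
        if i == fail_at:
            raise StreamError("Database offline")
        yield i


def consume(stream):
    """Return (events received, error message or None)."""
    received = []
    try:
        for event in stream:
            received.append(event)
    except StreamError as e:
        return received, str(e)
    return received, None


print(consume(flaky_stream(5, fail_at=3)))
# ([0, 1, 2], 'Database offline')
```

If partial results must not be acted on, the client needs to buffer them until the stream completes, or the protocol needs an explicit final "commit" message.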

Key Takeaways

  • Server streaming (one-to-many) is ideal for event feeds and log tails; client receives incremental messages without loading all into memory.
  • Client streaming (many-to-one) is well suited to bulk uploads; the server consumes the requests as they arrive, then returns one summary response.
  • Bidirectional streaming enables real-time communication; both sides send and receive simultaneously.
  • Use generators for client-side requests to avoid buffering entire datasets in memory.
  • HTTP/2 flow control handles backpressure automatically, provided your code consumes and produces messages lazily through iterators and generators rather than buffering them.

Frequently Asked Questions

Can I cancel a stream mid-transmission?

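Yes. The client calls cancel() on the call object returned by the stub method (for example, call = stub.GetOrderHistory(request), then call.cancel()) or closes the channel. On the server, the request iterator stops and context.is_active() returns False; callbacks registered with context.add_callback() also run when the RPC terminates. Clean up resources gracefully in a try/finally or in such a callback.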
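
A server-side cleanup pattern might look like the sketch below. It assumes the synchronous Python API, where the servicer context exposes `is_active()` and `add_callback()`; `FakeContext` is a hypothetical stand-in that implements only those two methods so the example runs without a server, and `stream_events` is an illustrative handler body.

```python
class FakeContext:
    """Hypothetical stand-in exposing the two context methods used here."""

    def __init__(self):
        self._active = True
        self._callbacks = []

    def is_active(self):
        return self._active

    def add_callback(self, callback):
        self._callbacks.append(callback)
        return True

    def terminate(self):
        """Simulate RPC termination (for example, a client cancel)."""
        self._active = False
        for callback in self._callbacks:
            callback()


def stream_events(context, events, release):
    """Yield events while the RPC is active; release resources on termination."""
    context.add_callback(release)
    for event in events:
        if not context.is_active():
            return
        yield event


released = []
ctx = FakeContext()
stream = stream_events(ctx, iter(range(100)), lambda: released.append(True))
first_two = [next(stream), next(stream)]
ctx.terminate()           # Client cancels mid-stream
remaining = list(stream)  # Handler notices and stops
print(first_two, remaining, released)
# [0, 1] [] [True]
```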

What's the max size of a streamed message?

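Same as unary: the default limit on received messages is 4 MB, configurable on both client and server through the grpc.max_receive_message_length and grpc.max_send_message_length channel options. Streaming doesn't bypass this limit; each individual message must fit.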
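
When a single payload is larger than the limit, a common approach is to split it into chunks and send one chunk per streamed message. A minimal sketch, assuming a 1 MiB chunk size (an arbitrary value chosen to sit comfortably under the default) and a hypothetical `chunk_bytes` helper:

```python
from typing import Iterator

MAX_CHUNK = 1024 * 1024  # 1 MiB, an assumed size comfortably under 4 MB


def chunk_bytes(payload: bytes, chunk_size: int = MAX_CHUNK) -> Iterator[bytes]:
    """Yield successive slices of payload, each at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]


payload = b"x" * (2 * MAX_CHUNK + 10)
sizes = [len(chunk) for chunk in chunk_bytes(payload)]
print(sizes)
# [1048576, 1048576, 10]
```

Each chunk would then be wrapped in a message with a bytes field and yielded from the request generator; the proto in this guide does not define such a message, so that part is left out.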

How do I pause a stream from the client?

gRPC has no built-in pause mechanism. If the server is too fast, the client can:

  1. Consume messages slowly (implicit backpressure via flow control).
  2. Close the channel to stop the stream entirely.

For explicit rate limiting, implement it at the application layer (e.g., process every Nth message).
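
Option 1 can be made explicit by wrapping the response iterator in a throttle. The sketch below is one possible application-layer approach, not a gRPC feature: `throttle` is a hypothetical helper, and the `clock` and `sleep` parameters exist only so the timing can be replaced in tests.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def throttle(
    messages: Iterable[T],
    max_per_second: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[T]:
    """Yield messages no faster than max_per_second."""
    interval = 1.0 / max_per_second
    next_allowed = clock()
    for message in messages:
        delay = next_allowed - clock()
        if delay > 0:
            sleep(delay)
        next_allowed = max(next_allowed, clock()) + interval
        yield message


handled = list(throttle(range(3), max_per_second=20))
print(handled)
# [0, 1, 2]
```

In a client this would wrap the call, for example `for event in throttle(stub.GetOrderHistory(request), max_per_second=100)`. Because the client pulls messages more slowly, flow control eventually slows the server as well.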

Can I have multiple bidirectional streams on one connection?

Yes. HTTP/2 multiplexes them; they don't block each other.

How do I handle timeouts in streaming?

Client sets a deadline:

# timeout is a duration in seconds, not an absolute timestamp
for event in stub.GetOrderHistory(request, timeout=60.0):
    ...

Server checks the deadline:

remaining = context.time_remaining()  # Seconds left, or None if no deadline was set
if remaining is not None and remaining <= 0:
    context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "Exceeded deadline")
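
In gRPC Python the client-side `timeout` argument is a duration in seconds, and the server-side context reports time left through `time_remaining()`, which returns None when no deadline was set. If your application tracks absolute deadlines, they need converting first. The helpers below are a sketch with hypothetical names; `deadline_exceeded` accepts any object with a `time_remaining()` method.

```python
def timeout_from_deadline(deadline: float, now: float) -> float:
    """Convert an absolute deadline (epoch seconds) to seconds remaining."""
    return max(0.0, deadline - now)


def deadline_exceeded(context) -> bool:
    """True if the RPC has a deadline and no time is left on it."""
    remaining = context.time_remaining()
    return remaining is not None and remaining <= 0


print(timeout_from_deadline(deadline=1_000_060.0, now=1_000_000.0))
# 60.0
```

A client would pass the result as `timeout=timeout_from_deadline(deadline, time.time())`, and a long-running handler could call `deadline_exceeded(context)` between units of work to stop early.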

Further Reading