
Broadcasting Messages: Efficient Multi-Client Updates

Broadcasting is the core operation in real-time apps: one user sends a message, and it must reach 1,000 connected clients within milliseconds. Naive broadcast loops, which iterate over connections and send to each in turn, become bottlenecks at scale. This article shows how to optimize broadcasting with batching, concurrent sending, and graceful failure handling, along with strategies to minimize latency across thousands of clients.

The Naive Broadcast Loop and Its Limits

A straightforward broadcast looks like this:

async def broadcast(self, message: dict):
    for client_id, user in self.active_connections.items():
        try:
            await user["websocket"].send_json(message)
        except Exception:
            pass  # naive: failures are silently ignored

This is sequential: send to connection 1, wait for it to finish, send to connection 2, wait, and so on. If each send takes 1ms, broadcasting to 1,000 clients takes 1 second, which is unacceptable for real-time apps where messages should arrive within 100ms. Each await can yield control to the event loop, so other tasks (such as handling incoming messages) still run, but the broadcast task itself is serialized: its total time is the sum of every individual send.

Concurrent Broadcasting with asyncio.gather()

Speed up broadcast by sending to multiple clients concurrently:

import asyncio

from fastapi import WebSocket

async def broadcast(self, message: dict):
    if not self.active_connections:
        return

    tasks = []
    for user in self.active_connections.values():
        tasks.append(self._send_safe(user["websocket"], message))

    # Run all sends concurrently and wait for every one to finish
    await asyncio.gather(*tasks, return_exceptions=True)

async def _send_safe(self, websocket: WebSocket, message: dict):
    try:
        await websocket.send_json(message)
    except Exception as e:
        # Log or handle failure; don't crash the broadcast
        print(f"Send failed: {e}")

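With gather(), the 1,000 sends are scheduled as concurrent coroutines on the event loop. They are not parallel: serializing and writing each frame still runs one at a time on a single thread. What changes is the waiting. Whenever a send has to pause (for example, because a socket's write buffer is full), the event loop switches to another send instead of idling, so the waits overlap. The broadcast then takes roughly the CPU cost of producing 1,000 frames plus the longest single wait, rather than the sum of every wait.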
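To see the difference without a real network, here is a small simulation. FakeWebSocket is a hypothetical stand-in whose send_json() only sleeps for delay seconds and does no CPU work. That is the best case for gather(); real sends also spend CPU time serializing and framing, which does not overlap, so expect a smaller gap in production.

```python
import asyncio
import time


class FakeWebSocket:
    """Hypothetical stand-in: each send only waits `delay` seconds (no CPU work)."""

    def __init__(self, delay: float = 0.001):
        self.delay = delay
        self.sent = 0

    async def send_json(self, message: dict) -> None:
        await asyncio.sleep(self.delay)  # simulated network wait
        self.sent += 1


async def sequential_broadcast(sockets, message: dict) -> float:
    start = time.perf_counter()
    for ws in sockets:
        await ws.send_json(message)  # waits add up: about len(sockets) * delay
    return time.perf_counter() - start


async def concurrent_broadcast(sockets, message: dict) -> float:
    start = time.perf_counter()
    # Waits overlap: total is roughly one delay plus task-scheduling overhead
    await asyncio.gather(*(ws.send_json(message) for ws in sockets), return_exceptions=True)
    return time.perf_counter() - start


async def compare(n: int = 300, delay: float = 0.001) -> tuple[float, float]:
    sockets = [FakeWebSocket(delay) for _ in range(n)]
    seq = await sequential_broadcast(sockets, {"text": "hi"})
    con = await concurrent_broadcast(sockets, {"text": "hi"})
    return seq, con


if __name__ == "__main__":
    seq, con = asyncio.run(compare())
    print(f"sequential: {seq * 1000:.0f}ms, concurrent: {con * 1000:.0f}ms")
```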

Handling Disconnections During Broadcast

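Some sends will fail: the network drops, the client closes the connection, and so on. The return_exceptions=True parameter tells gather() to return exceptions as results instead of raising the first one. For that to be useful, the send helper must let exceptions propagate rather than swallow them as the previous _send_safe does. Process them afterward: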

async def broadcast(self, message: dict):
    # Snapshot (client_id, websocket) pairs so results stay aligned with ids
    targets = [
        (client_id, user["websocket"])
        for client_id, user in list(self.active_connections.items())
    ]
    tasks = [self._send_safe(ws, message, client_id) for client_id, ws in targets]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Remove failed connections
    disconnected = [
        client_id for (client_id, _), result in zip(targets, results)
        if isinstance(result, Exception)
    ]
    for client_id in disconnected:
        self.active_connections.pop(client_id, None)

async def _send_safe(self, websocket: WebSocket, message: dict, client_id: str):
    # No try/except: exceptions propagate and gather() returns them as results
    await websocket.send_json(message)
    return client_id  # success indicator

Now a dead connection can't derail the broadcast: every send either completes or fails on its own, and failed connections are removed once all results are in.
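One pitfall of treating every exception as a disconnect: if the message itself can't be serialized, every send raises TypeError and the cleanup loop evicts every client. A sketch that avoids this serializes once with json.dumps before any send, then calls send_text() on each socket. It assumes the socket objects expose an async send_text(str) method (Starlette/FastAPI's WebSocket does); FakeWebSocket and ConnectionManager here are hypothetical stand-ins. Serializing once also saves CPU, since calling send_json() per client encodes the same message once per client.

```python
import asyncio
import json


class FakeWebSocket:
    """Hypothetical stand-in exposing send_text(); a 'dead' socket raises on send."""

    def __init__(self, dead: bool = False):
        self.dead = dead
        self.received: list[str] = []

    async def send_text(self, data: str) -> None:
        await asyncio.sleep(0)  # yield to the event loop like a real send
        if self.dead:
            raise ConnectionError("connection closed")
        self.received.append(data)


class ConnectionManager:
    def __init__(self):
        self.active_connections: dict[str, dict] = {}

    async def broadcast(self, message: dict) -> list[str]:
        # Serialize once: a TypeError here aborts before any client is touched,
        # instead of failing every send and evicting every client.
        payload = json.dumps(message, separators=(",", ":"))
        snapshot = list(self.active_connections.items())
        results = await asyncio.gather(
            *(user["websocket"].send_text(payload) for _, user in snapshot),
            return_exceptions=True,
        )
        # results line up with snapshot, so zip recovers each failed client_id
        dead = [cid for (cid, _), r in zip(snapshot, results) if isinstance(r, Exception)]
        for cid in dead:
            self.active_connections.pop(cid, None)
        return dead
```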

Limiting Concurrency with Semaphore

Broadcasting to 100,000 clients at once means 100,000 sends in flight, each holding its own serialized copy of the message and possibly buffered socket data, which can exhaust memory. Cap the number of in-flight sends with asyncio.Semaphore:

import asyncio

from fastapi import WebSocket

class BroadcastManager:
    def __init__(self, max_concurrent: int = 1000):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_connections = {}

    async def _send_safe(self, websocket: WebSocket, message: dict):
        # Wait for a free slot; at most max_concurrent sends run at once
        async with self.semaphore:
            await websocket.send_json(message)

    async def broadcast(self, message: dict):
        # Snapshot first: the dict may change while gather() is awaiting
        snapshot = list(self.active_connections.items())
        tasks = [self._send_safe(user["websocket"], message) for _, user in snapshot]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Clean up failures; results are in the same order as snapshot
        disconnected = [
            cid for (cid, _), result in zip(snapshot, results)
            if isinstance(result, Exception)
        ]
        for cid in disconnected:
            self.active_connections.pop(cid, None)

The semaphore ensures at most 1,000 concurrent sends. Excess tasks queue and run as slots free up. This prevents resource exhaustion while maintaining high throughput.
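A quick way to check that a semaphore really bounds in-flight sends is to count them. In this sketch, the hypothetical FakeWebSocket increments a shared Tracker while its simulated send is pending; with max_concurrent=10, the observed peak should be exactly 10. Unlike BroadcastManager, the semaphore here is local to one call. The shared semaphore above also caps sends across overlapping broadcasts, which is usually what you want.

```python
import asyncio


class Tracker:
    """Counts sends currently in flight and remembers the highest count seen."""

    def __init__(self):
        self.in_flight = 0
        self.peak = 0


class FakeWebSocket:
    """Hypothetical stand-in; records concurrent sends in a shared Tracker."""

    def __init__(self, tracker: Tracker):
        self.tracker = tracker

    async def send_json(self, message: dict) -> None:
        t = self.tracker
        t.in_flight += 1
        t.peak = max(t.peak, t.in_flight)
        await asyncio.sleep(0.001)  # simulated network wait
        t.in_flight -= 1


async def bounded_broadcast(sockets, message: dict, max_concurrent: int) -> None:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def send_one(ws) -> None:
        async with semaphore:  # waits here while max_concurrent sends are in flight
            await ws.send_json(message)

    await asyncio.gather(*(send_one(ws) for ws in sockets), return_exceptions=True)
```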

Batch Broadcasting for Efficiency

If messages arrive rapidly (e.g., a stock ticker emitting dozens of updates per second), you can batch them into a single broadcast:

import time
from collections import deque

# Subclasses BroadcastManager (defined above), which provides broadcast()
class BatchBroadcaster(BroadcastManager):
    def __init__(self, batch_size: int = 50, batch_interval: float = 0.1, **kwargs):
        super().__init__(**kwargs)
        self.batch_size = batch_size
        self.batch_interval = batch_interval
        self.message_queue = deque()
        self.last_flush = time.monotonic()  # monotonic: unaffected by clock changes

    async def queue_message(self, message: dict):
        self.message_queue.append(message)

        # Flush if batch is full or interval has passed
        if (len(self.message_queue) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.batch_interval):
            await self.flush()

    async def flush(self):
        if not self.message_queue:
            return

        # Copy and clear before awaiting so no message is sent twice
        batch = list(self.message_queue)
        self.message_queue.clear()
        self.last_flush = time.monotonic()

        # Send batch as a single message
        await self.broadcast({"type": "batch", "messages": batch})

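Clients receive a batch message containing up to 50 updates, reducing per-message overhead. The trade-off is latency: a message can wait up to batch_interval (100ms) before it is sent, which is acceptable for non-critical updates. Note that flush() only runs when queue_message() is called, so the last messages of a burst stay queued until another message arrives unless something also flushes on a timer.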
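The time-based flush needs a background timer so the tail of a burst is never stranded. A minimal sketch, assuming a hypothetical send_batch coroutine that delivers a list of messages (for example, by wrapping them in {"type": "batch", ...} and calling broadcast()). PeriodicBatcher flushes when the batch is full, as before, and also every batch_interval seconds:

```python
import asyncio
from collections import deque


class PeriodicBatcher:
    """Hypothetical sketch: flush when the batch is full OR on a timer."""

    def __init__(self, send_batch, batch_size: int = 50, batch_interval: float = 0.1):
        self.send_batch = send_batch  # assumed: async callable taking a list of messages
        self.batch_size = batch_size
        self.batch_interval = batch_interval
        self.message_queue = deque()
        self._task = None  # background flush task, set by start()

    async def queue_message(self, message: dict) -> None:
        self.message_queue.append(message)
        if len(self.message_queue) >= self.batch_size:
            await self.flush()

    async def flush(self) -> None:
        if not self.message_queue:
            return
        # Copy and clear before awaiting so a concurrent flush can't resend these
        batch = list(self.message_queue)
        self.message_queue.clear()
        await self.send_batch(batch)

    async def _flush_periodically(self) -> None:
        while True:
            await asyncio.sleep(self.batch_interval)
            await self.flush()

    def start(self) -> None:
        # Must be called while an event loop is running
        self._task = asyncio.create_task(self._flush_periodically())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        await self.flush()  # deliver whatever is still queued
```

Call start() once the event loop is running (for example, in a FastAPI lifespan handler) and await stop() on shutdown so queued messages are not lost.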

Measuring Broadcast Latency

To monitor performance, measure the time from send initiation to completion:

import asyncio
import time

async def broadcast(self, message: dict):
    users = list(self.active_connections.values())  # snapshot so the count matches
    if not users:
        return  # nothing to send; also avoids dividing by zero below

    start = time.perf_counter()  # monotonic, high-resolution timer
    tasks = [self._send_safe(user["websocket"], message) for user in users]
    await asyncio.gather(*tasks, return_exceptions=True)
    elapsed = time.perf_counter() - start

    num_clients = len(users)
    print(f"Broadcast to {num_clients} clients took {elapsed*1000:.2f}ms "
          f"({elapsed/num_clients*1000:.3f}ms per client on average)")

Track metrics like:

  • Total broadcast time (target: <200ms for 10,000 clients)
  • Average time per client
  • Failed-send rate (should stay below 0.1%)

Monitoring tools such as Prometheus and Datadog can alert when broadcast time spikes, which often points to network or infrastructure issues.
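Rather than printing, you can return the numbers and hand them to whatever metrics client you use. This sketch (BroadcastStats and timed_broadcast() are hypothetical names) times a broadcast with time.perf_counter() and counts failures from the gather() results, giving a failure rate to compare against the 0.1% target.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class BroadcastStats:
    clients: int
    failures: int
    elapsed_ms: float

    @property
    def failure_rate(self) -> float:
        # Fraction of sends that raised; 0.0 when there were no clients
        return self.failures / self.clients if self.clients else 0.0


async def timed_broadcast(sockets: list, message: dict) -> BroadcastStats:
    start = time.perf_counter()  # monotonic, high-resolution interval timer
    results = await asyncio.gather(
        *(ws.send_json(message) for ws in sockets), return_exceptions=True
    )
    elapsed_ms = (time.perf_counter() - start) * 1000
    failures = sum(isinstance(r, Exception) for r in results)
    return BroadcastStats(clients=len(sockets), failures=failures, elapsed_ms=elapsed_ms)
```

In production you might export elapsed_ms as a histogram and failures as a counter; the exporter wiring is left out because it depends on your metrics library.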

Ordered Broadcasting: When Order Matters

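For applications like collaborative editing, messages must arrive in order. A single sequential loop preserves order only if broadcasts never overlap; once sends run concurrently (or several broadcasts are in flight at once), the order in which messages reach a given client's socket is no longer guaranteed. If order is critical, use a sequence number: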

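import asyncio

async def broadcast(self, message: dict):
    # self.sequence_counter starts at 0 in __init__. Reading and incrementing it
    # happens before the first await, so two broadcasts can't get the same number.
    message = {**message, "sequence": self.sequence_counter}  # don't mutate caller's dict
    self.sequence_counter += 1

    tasks = [self._send_safe(user["websocket"], message) for user in self.active_connections.values()]
    await asyncio.gather(*tasks, return_exceptions=True)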

Each client tracks the next sequence number it expects, buffers any message that arrives ahead of it, and applies messages in sequence-number order, ensuring consistency.
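A minimal client-side reorder buffer, written in Python for consistency with the rest of this article (a browser client would implement the same logic in JavaScript). ReorderBuffer is a hypothetical helper. It assumes sequence numbers start at first_expected and increase by one per broadcast, which means a client that connects mid-stream must learn the current counter from the server (an assumption about your protocol).

```python
class ReorderBuffer:
    """Hypothetical client-side helper: releases messages strictly in sequence order."""

    def __init__(self, first_expected: int = 0):
        self.next_seq = first_expected
        self.pending: dict[int, dict] = {}

    def push(self, message: dict) -> list[dict]:
        """Add one message; return every message that is now ready to apply, in order."""
        seq = message["sequence"]
        if seq < self.next_seq:
            return []  # duplicate or already applied; drop it
        self.pending[seq] = message
        ready = []
        # Release the contiguous run that starts at next_seq
        while self.next_seq in self.pending:
            ready.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return ready
```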

Key Takeaways

  • Sequential broadcast is slow; use asyncio.gather() to send concurrently so that time spent waiting on sockets overlaps instead of adding up.
  • Limit concurrency with Semaphore to prevent resource exhaustion at extreme scale.
  • Gracefully handle failed sends during broadcast; don't let one dead connection fail everyone.
  • Batch messages for high-frequency updates to reduce overhead and improve throughput.
  • Measure latency and track failures to detect infrastructure issues early.
  • Add sequence numbers for ordered delivery when consistency is required.

Frequently Asked Questions

What's the maximum clients a single FastAPI server can handle?

Practically, roughly 10,000–50,000 per process, depending on message frequency, client library, and hardware. Each connection holds a file descriptor (the per-process limit, shown by ulimit -n, is often as low as 1,024 by default and must be raised for large deployments) and RAM (~30 KB per idle connection). Load testing your specific application is essential.
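To check the per-process file descriptor limit from Python before committing to a target connection count, the standard-library resource module (Unix only) exposes it as RLIMIT_NOFILE. fd_headroom() is a hypothetical helper, and reserve is an assumed allowance for the listening socket, log files, and other descriptors the process needs.

```python
import resource  # standard library, Unix only


def fd_headroom(expected_connections: int, reserve: int = 100) -> bool:
    """Return True if the soft RLIMIT_NOFILE covers the expected sockets plus a reserve."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return True  # no per-process cap on open files
    return soft >= expected_connections + reserve
```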

Should I use asyncio.gather() or asyncio.create_task()?

gather() waits for every send to finish, so use it when you need to know whether all sends succeeded. create_task() schedules the work and returns immediately (fire-and-forget), which suits non-critical notifications. For critical chat messages, use gather() and handle failures; for analytics, use create_task().
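If you do use create_task(), the Python documentation advises keeping a reference to each task, because the event loop holds only weak references and an unreferenced task can disappear before it finishes. A minimal sketch of that pattern (fire_and_forget and background_tasks are hypothetical names) also reports failures so they are not silently lost:

```python
import asyncio

# Strong references to running fire-and-forget tasks (the event loop keeps only weak ones)
background_tasks: set = set()


def _on_done(task: asyncio.Task) -> None:
    background_tasks.discard(task)  # let the finished task be garbage-collected
    if not task.cancelled() and task.exception() is not None:
        # Retrieve and report the error instead of losing it
        print(f"background send failed: {task.exception()!r}")


def fire_and_forget(coro) -> asyncio.Task:
    """Schedule coro without awaiting it; must be called from a running event loop."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(_on_done)
    return task
```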

How do I broadcast to a subset of clients (e.g., a room)?

Filter before gathering: tasks = [self._send_safe(...) for cid, user in self.active_connections.items() if user.get("room") == room_id]. This is more efficient than broadcasting to all and filtering client-side.
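Scanning every connection to find a room's members costs time proportional to all connected clients. If there are many connections, a sketch that keeps a room-to-client-id index makes each room broadcast proportional to the room's size instead. RoomManager, join(), leave(), and broadcast_to_room() are hypothetical names; each connection entry keeps the same {"websocket": ..., "room": ...} shape as the one-liner above.

```python
import asyncio
from collections import defaultdict


class RoomManager:
    """Hypothetical sketch: index client_ids by room so a room broadcast touches only members."""

    def __init__(self):
        self.active_connections: dict[str, dict] = {}
        self.rooms: defaultdict = defaultdict(set)  # room_id -> set of client_ids

    def join(self, client_id: str, websocket, room_id: str) -> None:
        self.active_connections[client_id] = {"websocket": websocket, "room": room_id}
        self.rooms[room_id].add(client_id)

    def leave(self, client_id: str) -> None:
        user = self.active_connections.pop(client_id, None)
        if user is not None:
            members = self.rooms.get(user["room"])
            if members is not None:
                members.discard(client_id)
                if not members:
                    del self.rooms[user["room"]]  # drop empty rooms

    async def broadcast_to_room(self, room_id: str, message: dict) -> None:
        members = [cid for cid in self.rooms.get(room_id, ()) if cid in self.active_connections]
        results = await asyncio.gather(
            *(self.active_connections[cid]["websocket"].send_json(message) for cid in members),
            return_exceptions=True,
        )
        # results line up with members; remove any client whose send failed
        for cid, result in zip(members, results):
            if isinstance(result, Exception):
                self.leave(cid)
```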

What if a client is slow and backs up the whole broadcast?

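Set a send timeout: await asyncio.wait_for(websocket.send_json(message), timeout=1.0). If a send takes longer than 1 second, wait_for() cancels it and raises asyncio.TimeoutError, which you can catch to skip (or disconnect) that client. Without a timeout, gather() cannot return until the slowest send finishes, so one stalled client delays completion of the entire broadcast.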
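Here is a sketch of that timeout applied to a whole broadcast. send_with_timeout() and broadcast_with_timeout() are hypothetical helpers, connections is assumed to have the same {client_id: {"websocket": ...}} shape used throughout, and clients that time out or raise are removed. Whether a socket is still usable after a cancelled send depends on the server implementation, so this sketch simply drops the client.

```python
import asyncio


async def send_with_timeout(websocket, message: dict, timeout: float = 1.0) -> bool:
    """Return True if the send finished within `timeout` seconds, False otherwise."""
    try:
        await asyncio.wait_for(websocket.send_json(message), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        # wait_for() has already cancelled the pending send
        return False


async def broadcast_with_timeout(connections: dict, message: dict, timeout: float = 1.0) -> list:
    snapshot = list(connections.items())
    results = await asyncio.gather(
        *(send_with_timeout(user["websocket"], message, timeout) for _, user in snapshot),
        return_exceptions=True,
    )
    # Drop clients that timed out (False) or raised (an Exception result)
    dropped = [cid for (cid, _), ok in zip(snapshot, results) if ok is not True]
    for cid in dropped:
        connections.pop(cid, None)
    return dropped
```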

Further Reading