Redis Pub/Sub: Scaling WebSockets Horizontally
A single server can typically hold on the order of 10,000–50,000 WebSocket connections before it hits CPU, memory, or file-descriptor limits. Beyond that, you need horizontal scaling: multiple server instances, each accepting a share of the connections, with a load balancer distributing traffic. The catch is that a message sent to server A must also reach clients connected to server B. Redis pub/sub solves this: every server subscribes to a channel, and when any server publishes, all subscribed servers (including the publisher) receive the message and fan it out to their local clients. This article adds Redis pub/sub to the chat application so that it can scale across servers, up to the throughput of the Redis instance itself.
Architecture: Multi-Server with Shared Message Bus
Before Redis:
- Server 1 holds connections for users A, B, C.
- Server 2 holds connections for users D, E, F.
- User A sends a message; it reaches B and C, but D, E, F don't receive it.
With Redis:
- All servers subscribe to a Redis channel (e.g., chat:#general).
- When user A (on Server 1) sends a message, Server 1 publishes it to Redis.
- Redis delivers the message to all subscribers: Server 1, 2, and any others.
- Server 1 sends to B and C; Server 2 sends to D, E, F. Everyone gets it.
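To make the flow concrete, here is a minimal in-process sketch of the fan-out. InMemoryBus and FakeServer are hypothetical stand-ins for Redis and for a WebSocket server; nothing here talks to a real Redis or a real socket, and each "client" is just a list acting as an inbox.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Hypothetical stand-in for Redis pub/sub: delivers to every subscriber."""

    def __init__(self):
        self.subscribers = defaultdict(list)  # channel -> list of async callbacks

    def subscribe(self, channel, callback):
        self.subscribers[channel].append(callback)

    async def publish(self, channel, message):
        for callback in self.subscribers[channel]:
            await callback(message)


class FakeServer:
    """One server instance holding inboxes for its locally connected users."""

    def __init__(self, name, users, bus):
        self.name = name
        self.bus = bus
        self.inboxes = {user: [] for user in users}  # user -> received messages
        bus.subscribe("chat:#general", self.on_bus_message)

    async def on_bus_message(self, message):
        # Fan the message out to every local client.
        for inbox in self.inboxes.values():
            inbox.append(message)

    async def send(self, text):
        # The server never sends directly; it publishes to the shared bus.
        await self.bus.publish("chat:#general", text)


async def demo():
    bus = InMemoryBus()
    server1 = FakeServer("server-1", ["A", "B", "C"], bus)
    server2 = FakeServer("server-2", ["D", "E", "F"], bus)
    await server1.send("hello from A")
    return server1.inboxes, server2.inboxes


inboxes1, inboxes2 = asyncio.run(demo())
print(inboxes1)
print(inboxes2)
```

In this sketch the sender's own inbox also receives the message, which mirrors the fact that Redis delivers a published message back to the publishing server.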
Redis Client Setup
Install the redis library (redis-py 4.2 and later ship asyncio support as redis.asyncio, so no extra is needed):
pip install redis
Then initialize a Redis connection:
import json
from typing import Optional

import redis.asyncio as redis


class RedisManager:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        self.redis_client: Optional[redis.Redis] = None

    async def init(self):
        # decode_responses=True makes the client return str instead of bytes
        self.redis_client = await redis.from_url(self.redis_url, decode_responses=True)

    async def close(self):
        if self.redis_client:
            # redis-py 5.0.1+ prefers aclose(); close() still works but is deprecated
            await self.redis_client.close()

    async def publish(self, channel: str, message: dict):
        """Publish a message to a Redis channel."""
        await self.redis_client.publish(channel, json.dumps(message))

    async def subscribe(self, channel: str):
        """Subscribe to a channel and return a PubSub object."""
        pubsub = self.redis_client.pubsub()
        await pubsub.subscribe(channel)
        return pubsub


redis_manager = RedisManager()

# `app` is the FastAPI instance from the existing chat application
@app.on_event("startup")
async def startup():
    await redis_manager.init()

@app.on_event("shutdown")
async def shutdown():
    await redis_manager.close()
Hybrid Local and Redis Broadcasting
The chat manager now publishes every message to Redis, and a background listener per room fans incoming messages out to the clients connected to this server:
import asyncio
import json
from datetime import datetime
from typing import Dict

from fastapi import Query, WebSocket
from redis.asyncio.client import PubSub


class HybridChatManager:
    def __init__(self, server_id: str, redis_manager: RedisManager):
        self.server_id = server_id  # Unique identifier for this server
        self.redis_manager = redis_manager
        self.rooms: Dict[str, Dict[str, dict]] = {}
        self.subscribed_channels: Dict[str, PubSub] = {}
        # Hold references to listener tasks so they are not garbage-collected mid-run
        self._tasks: set = set()

    async def connect(self, room_id: str, client_id: str, username: str, websocket: WebSocket):
        self.rooms.setdefault(room_id, {})
        self.rooms[room_id][client_id] = {
            "username": username,
            "websocket": websocket,
            "joined_at": datetime.now()
        }
        await websocket.accept()
        # Subscribe to the Redis channel if this server has not done so yet
        if room_id not in self.subscribed_channels:
            channel = f"chat:{room_id}"
            pubsub = await self.redis_manager.subscribe(channel)
            self.subscribed_channels[room_id] = pubsub
            # Start listening for Redis messages in the background
            task = asyncio.create_task(self._redis_listener(room_id, channel))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
        # Publish join event
        await self.redis_manager.publish(f"chat:{room_id}", {
            "type": "user_joined",
            "username": username,
            "server_id": self.server_id
        })

    async def handle_message(self, room_id: str, client_id: str, text: str):
        if room_id not in self.rooms or client_id not in self.rooms[room_id]:
            return
        user = self.rooms[room_id][client_id]
        message = {
            "type": "chat_message",
            "username": user["username"],
            "text": text,
            "timestamp": datetime.now().isoformat(),
            "server_id": self.server_id
        }
        # Publish to Redis (all servers, including this one)
        await self.redis_manager.publish(f"chat:{room_id}", message)

    async def _redis_listener(self, room_id: str, channel: str):
        """Listen for messages from Redis and broadcast to local clients."""
        pubsub = self.subscribed_channels[room_id]
        async for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations; only "message" carries a payload
            if message["type"] == "message":
                try:
                    data = json.loads(message["data"])
                    # Broadcast to local clients in this room
                    await self._broadcast_local(room_id, data)
                except Exception as e:
                    print(f"Error processing Redis message: {e}")

    async def _broadcast_local(self, room_id: str, message: dict):
        """Send message to all local clients in the room."""
        if room_id not in self.rooms:
            return
        disconnected = []
        # Iterate over a snapshot: the dict can change while we await send_json
        for client_id, user in list(self.rooms[room_id].items()):
            try:
                await user["websocket"].send_json(message)
            except Exception:
                disconnected.append(client_id)
        for client_id in disconnected:
            self.rooms.get(room_id, {}).pop(client_id, None)

    def disconnect(self, room_id: str, client_id: str):
        if room_id in self.rooms and client_id in self.rooms[room_id]:
            user = self.rooms[room_id].pop(client_id)
            return user["username"]
        return None


# Each instance needs its own server_id (e.g., from an environment variable or the hostname)
manager = HybridChatManager(server_id="server-1", redis_manager=redis_manager)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, room: str = Query(...), username: str = Query(...), client_id: str = Query(...)):
    await manager.connect(room, client_id, username, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.handle_message(room, client_id, data)
    except Exception:
        # Covers WebSocketDisconnect as well as unexpected errors
        username = manager.disconnect(room, client_id)
        if username:
            await redis_manager.publish(f"chat:{room}", {
                "type": "user_left",
                "username": username,
                "server_id": manager.server_id
            })
Avoiding Duplicate Messages to the Publisher
As written, handle_message only publishes to Redis and the listener is the single delivery path, so every client gets exactly one copy, but clients on the originating server wait for a Redis round trip. Sending to local clients directly removes that delay and introduces a subtle bug: when Server 1 publishes to Redis, Redis also delivers the message back to Server 1 (it's a subscriber), so its clients receive the message twice, once from the local send and once from the Redis listener. Fix this with a server ID check in the listener:
async def _redis_listener(self, room_id: str, channel: str):
    """Relay Redis messages to local clients, skipping ones already sent locally."""
    pubsub = self.subscribed_channels[room_id]
    async for message in pubsub.listen():
        if message["type"] != "message":
            continue
        try:
            data = json.loads(message["data"])
            # Skip messages this server published after delivering them to its own clients.
            # Messages from other servers always pass through.
            if data.get("skip_local") and data.get("server_id") == self.server_id:
                continue
            await self._broadcast_local(room_id, data)
        except Exception as e:
            print(f"Error processing Redis message: {e}")
The matching handle_message sends locally first, then publishes a copy flagged with skip_local:
async def handle_message(self, room_id: str, client_id: str, text: str):
    if room_id not in self.rooms or client_id not in self.rooms[room_id]:
        return
    user = self.rooms[room_id][client_id]
    message = {
        "type": "chat_message",
        "username": user["username"],
        "text": text,
        "timestamp": datetime.now().isoformat(),
        "server_id": self.server_id
    }
    # Send to local clients first
    await self._broadcast_local(room_id, message)
    # Then publish to other servers; the flag tells our own listener to ignore the echo
    await self.redis_manager.publish(f"chat:{room_id}", {**message, "skip_local": True})
Join and leave events are still published without the flag, so every server, including the originating one, relays them from Redis.
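The skip_local flag only helps if the code that receives messages from Redis actually checks it. A minimal sketch of that check as a standalone predicate follows; should_relay is a hypothetical helper name, and the message fields are the ones used in this article.

```python
def should_relay(data: dict, my_server_id: str) -> bool:
    """Return True if a message received from Redis should go to local clients."""
    # A message is a duplicate only when BOTH hold: it was flagged as already
    # delivered locally, and the server that delivered it is this one.
    already_delivered_here = (
        data.get("skip_local") is True
        and data.get("server_id") == my_server_id
    )
    return not already_delivered_here


own_chat = {"type": "chat_message", "server_id": "server-1", "skip_local": True}
remote_chat = {"type": "chat_message", "server_id": "server-2", "skip_local": True}
own_join = {"type": "user_joined", "server_id": "server-1"}

print(should_relay(own_chat, "server-1"))     # our own echo: drop it
print(should_relay(remote_chat, "server-1"))  # from another server: relay it
print(should_relay(own_join, "server-1"))     # never sent locally: relay it
```

Checking both conditions matters: filtering on server_id alone would drop join and leave events for the originating server's clients, and filtering on skip_local alone would drop chat messages from every other server.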
Handling Room Cleanup and Memory
If a room empties (all users disconnect) but the server stays subscribed to its Redis channel, the PubSub object and its listener stay alive indefinitely and memory leaks. Unsubscribe when the last user leaves:
def disconnect(self, room_id: str, client_id: str):
    if room_id in self.rooms and client_id in self.rooms[room_id]:
        user = self.rooms[room_id].pop(client_id)
        # Clean up if the room is now empty
        if not self.rooms[room_id]:
            del self.rooms[room_id]
            if room_id in self.subscribed_channels:
                # disconnect() is synchronous, so schedule the async cleanup
                asyncio.create_task(self._unsubscribe(room_id))
        return user["username"]
    return None

async def _unsubscribe(self, room_id: str):
    if room_id in self.subscribed_channels:
        pubsub = self.subscribed_channels.pop(room_id)
        await pubsub.unsubscribe(f"chat:{room_id}")
        await pubsub.close()
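A complementary approach, sketched here under the assumption that each room has exactly one background listener task, is to keep the task handle and cancel it explicitly during cleanup. ListenerRegistry and fake_listener are hypothetical names used only for illustration; the fake listener stands in for a loop that reads from Redis.

```python
import asyncio


class ListenerRegistry:
    """Tracks one background listener task per room (hypothetical helper)."""

    def __init__(self):
        self.tasks = {}  # room_id -> asyncio.Task

    def start(self, room_id, listener):
        """Start listener() as a background task unless the room already has one."""
        if room_id not in self.tasks:
            self.tasks[room_id] = asyncio.create_task(listener())

    async def stop(self, room_id):
        """Cancel the room's listener and wait for it to finish."""
        task = self.tasks.pop(room_id, None)
        if task is None:
            return False
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected: the task was cancelled on purpose
        return True


async def fake_listener():
    # Stand-in for a loop that blocks waiting on Redis messages.
    while True:
        await asyncio.sleep(3600)


async def demo():
    registry = ListenerRegistry()
    registry.start("general", fake_listener)
    await asyncio.sleep(0)  # give the task a chance to start
    stopped = await registry.stop("general")
    stopped_again = await registry.stop("general")  # nothing left to stop
    return stopped, stopped_again, registry.tasks


stopped, stopped_again, remaining = asyncio.run(demo())
print(stopped, stopped_again, remaining)
```

Holding the task in a dictionary also keeps a strong reference to it, which asyncio requires for tasks that should not be garbage-collected while running.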
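Redis Streams vs Pub/Sub
Redis pub/sub is fire-and-forget: a message is delivered only to subscribers connected at the moment it is published, and anyone who is not listening misses it for good. For critical messages, write to a Redis Stream as well, so that the message is stored and can be replayed: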
async def publish(self, channel: str, message: dict):
    """Publish to both Pub/Sub (for immediate delivery) and Stream (for replay)."""
    import json
    msg_json = json.dumps(message)
    # Pub/Sub for immediate delivery
    await self.redis_client.publish(channel, msg_json)
    # Stream for persistence and replay. Cap its length so it cannot grow
    # without bound; 1000 is an arbitrary example value.
    await self.redis_client.xadd(f"stream:{channel}", {"data": msg_json}, maxlen=1000, approximate=True)
When a new server starts, it can read the stream to catch up on recent messages:
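async def replay_recent_messages(self, room_id: str, since_seconds: int = 3600):
    """Yield up to 100 of the newest messages from the last `since_seconds`, oldest first."""
    import time
    stream_key = f"stream:chat:{room_id}"
    # Auto-generated stream IDs start with a millisecond timestamp
    min_id = f"{int((time.time() - since_seconds) * 1000)}-0"
    # XREVRANGE returns newest first; reverse to replay in chronological order
    messages = await self.redis_client.xrevrange(stream_key, max="+", min=min_id, count=100)
    for msg_id, msg_data in reversed(messages):
        # Keys are str, not bytes, because the client uses decode_responses=True
        yield json.loads(msg_data["data"])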
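The since_seconds parameter can be turned into a bound for a stream range query, because auto-generated stream IDs have the form <milliseconds>-<sequence>. The helper below is a small sketch of that conversion; stream_min_id is a hypothetical name, and the optional now argument exists only to make the function easy to test.

```python
import time
from typing import Optional


def stream_min_id(since_seconds: int, now: Optional[float] = None) -> str:
    """Return the smallest stream ID that falls inside the last `since_seconds`."""
    if now is None:
        now = time.time()
    # Stream IDs use milliseconds since the Unix epoch; clamp at zero
    # so a very large window does not produce a negative ID.
    min_ms = max(0, int((now - since_seconds) * 1000))
    return f"{min_ms}-0"


print(stream_min_id(3600, now=10_000.0))
print(stream_min_id(10, now=5.0))
```

Note that Redis stamps stream entries with its own clock, so a server whose clock drifts from the Redis host may see a slightly wider or narrower window than requested.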
Key Takeaways
- Redis pub/sub enables multi-server WebSocket scaling: servers subscribe to channels, publish messages globally.
- Each server broadcasts locally to its clients while listening for messages from other servers via Redis.
- Avoid duplicate delivery by having each server skip the messages it published itself and already sent to its local clients.
- Unsubscribe and clean up when rooms empty to prevent memory leaks.
- For mission-critical messages, use Redis Streams for persistence and replay.
- Publish to Redis after sending locally to reduce latency for the originating client.
Frequently Asked Questions
What if Redis goes down?
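The servers continue to serve local clients, but messages don't cross server boundaries. Implement health checks and failover: if Redis is unreachable for 10 seconds, fall back to local-only broadcasting and alert ops. Once Redis recovers, make sure the room subscriptions are re-established and resume normal operation. Messages published during the outage are not redelivered, because pub/sub does not store them.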
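One simple form of this fallback works per message rather than on a 10-second health check: try to publish, and deliver locally if the publish fails or times out. The sketch below assumes the publish-only flow, where local clients normally receive messages through the Redis listener. publish_with_fallback and the callables passed to it are hypothetical; with redis-py you would also catch redis.exceptions.ConnectionError, which is not a subclass of the built-in ConnectionError.

```python
import asyncio


async def publish_with_fallback(publish, broadcast_local, channel, message, timeout=1.0):
    """Publish via Redis; on failure, deliver to local clients only."""
    try:
        await asyncio.wait_for(publish(channel, message), timeout=timeout)
        return "redis"
    except (asyncio.TimeoutError, ConnectionError, OSError):
        # Redis is unreachable: local clients still get the message,
        # but clients on other servers will miss it.
        await broadcast_local(message)
        return "local"


async def demo():
    delivered_locally = []

    async def failing_publish(channel, message):
        raise ConnectionError("redis unreachable")  # simulated outage

    async def working_publish(channel, message):
        return 1  # simulated success

    async def broadcast_local(message):
        delivered_locally.append(message)

    first = await publish_with_fallback(
        failing_publish, broadcast_local, "chat:general", {"text": "hi"}
    )
    second = await publish_with_fallback(
        working_publish, broadcast_local, "chat:general", {"text": "again"}
    )
    return first, second, delivered_locally


first, second, delivered_locally = asyncio.run(demo())
print(first, second, delivered_locally)
```

Returning which path was taken makes it easy to count fallbacks and alert when they start to occur.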
Can I use Redis Cluster instead of a single instance?
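Yes, with caveats. Redis Cluster handles automatic sharding and failover, but redis-py does not accept a comma-separated connection string such as redis://cluster-node1:6379,cluster-node2:6379. Connect with its cluster client (RedisCluster) instead, giving it one or more seed nodes; the client discovers the rest of the topology. Note that in Cluster mode a classic PUBLISH is propagated to every node, so pub/sub traffic is not sharded unless you use sharded pub/sub (Redis 7 and later). Check that your redis-py version supports pub/sub on the async cluster client before relying on it.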
How do I monitor Redis pub/sub latency?
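Publish a test message with a timestamp: {"type": "ping", "sent_at": time.time()}. Each server records the receive time and calculates the latency. Comparing timestamps across machines assumes synchronized clocks (e.g., via NTP); measuring on the publishing server, which also receives its own ping, avoids that dependency. Plot the result on a dashboard to detect network issues.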
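The ping described above can be sketched as two small functions, one that builds the payload and one that computes the latency on receipt. make_ping and latency_ms are hypothetical names, and the now parameters exist only so the arithmetic can be checked without a real clock.

```python
import json
import time
from typing import Optional


def make_ping(server_id: str, now: Optional[float] = None) -> str:
    """Build the JSON payload for a latency probe."""
    sent_at = time.time() if now is None else now
    return json.dumps({"type": "ping", "sent_at": sent_at, "server_id": server_id})


def latency_ms(raw_message: str, now: Optional[float] = None) -> float:
    """Return milliseconds elapsed between sending and receiving a ping."""
    received_at = time.time() if now is None else now
    data = json.loads(raw_message)
    return (received_at - data["sent_at"]) * 1000.0


ping = make_ping("server-1", now=100.0)
print(latency_ms(ping, now=100.25))
```

Including server_id in the payload lets a receiver tell whether the measurement is a same-host round trip or a cross-host comparison that depends on clock sync.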
What's the throughput limit of Redis pub/sub?
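It depends on the workload. Redis executes commands on a single thread, and every published message is copied to each subscriber, so throughput is bound by one CPU core and falls as message size and subscriber count grow. Treat figures such as 100K msg/sec as a rough order of magnitude and benchmark your own setup. For higher throughput, use multiple Redis instances, one per topic, or consider Kafka.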
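Splitting topics across several Redis instances requires every server to agree on which instance owns which room. One way to do that, sketched below, is a stable hash of the room ID. The URLs and the function name redis_url_for_room are hypothetical placeholders.

```python
import zlib

# Hypothetical instance list; every server must use the same list in the same order.
REDIS_URLS = [
    "redis://redis-0:6379",
    "redis://redis-1:6379",
    "redis://redis-2:6379",
]


def redis_url_for_room(room_id: str, urls=REDIS_URLS) -> str:
    """Map a room to one Redis instance using a process-independent hash."""
    # zlib.crc32 is stable across processes and machines, unlike the built-in
    # hash(), which is randomized per process for strings.
    index = zlib.crc32(room_id.encode("utf-8")) % len(urls)
    return urls[index]


print(redis_url_for_room("general"))
print(redis_url_for_room("random"))
```

A plain modulo mapping reshuffles most rooms when the instance list changes, so this works best when the number of instances is fixed; consistent hashing is the usual alternative when it is not.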