Real-Time Chat Server: Full Implementation
A real-time chat server is the canonical WebSocket application: users connect, receive a list of active participants, send and receive messages instantly, and see join/leave notifications. Building one teaches connection lifecycle management, message serialization, and broadcast patterns—skills that apply to dashboards, notifications, and collaborative tools. This article implements a complete, production-ready chat server with a browser client in under 200 lines of Python.
Server Architecture and Data Model
The chat server tracks users by a unique client ID and stores messages with metadata (sender, timestamp, text). Each connection is associated with a user; when a client joins, all peers are notified; when a message arrives, it's broadcast to the room:
from fastapi import FastAPI, WebSocket, Query
from fastapi.responses import HTMLResponse
from datetime import datetime
from typing import List, Set
import json
app = FastAPI()
class User:
def __init__(self, client_id: str, username: str, websocket: WebSocket):
self.client_id = client_id
self.username = username
self.websocket = websocket
self.joined_at = datetime.now()
class ChatManager:
def __init__(self):
self.active_users: dict[str, User] = {} # client_id -> User
self.message_history: List[dict] = []
async def connect(self, client_id: str, username: str, websocket: WebSocket):
await websocket.accept()
user = User(client_id, username, websocket)
self.active_users[client_id] = user
# Notify all users of the join
await self.broadcast({
"type": "user_joined",
"username": username,
"active_users": [u.username for u in self.active_users.values()]
})
def disconnect(self, client_id: str):
if client_id in self.active_users:
user = self.active_users.pop(client_id)
return user.username
return None
async def broadcast(self, message: dict):
disconnected = []
for client_id, user in self.active_users.items():
try:
await user.websocket.send_json(message)
except Exception:
disconnected.append(client_id)
# Clean up failed connections
for client_id in disconnected:
self.active_users.pop(client_id, None)
async def handle_message(self, client_id: str, text: str):
if client_id not in self.active_users:
return
user = self.active_users[client_id]
message = {
"type": "chat_message",
"username": user.username,
"text": text,
"timestamp": datetime.now().isoformat()
}
self.message_history.append(message)
await self.broadcast(message)
manager = ChatManager()
@app.get("/")
async def get():
return HTMLResponse("""
<html>
<body>
<h1>WebSocket Chat</h1>
<div id="messages" style="border: 1px solid black; height: 400px; overflow-y: auto; padding: 10px;"></div>
<input type="text" id="msg" placeholder="Type a message..." />
<button onclick="sendMsg()">Send</button>
<div id="status"></div>
<script>
const params = new URLSearchParams(window.location.search);
const username = params.get('user') || 'User' + Math.floor(Math.random() * 1000);
const ws = new WebSocket(`ws://localhost:8000/ws?username=${encodeURIComponent(username)}&client_id=${Math.random()}`);
ws.onopen = () => {
document.getElementById('status').innerText = `Connected as ${username}`;
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
const div = document.createElement("div");
if (msg.type === 'user_joined') {
div.style.color = 'blue';
div.innerText = `${msg.username} joined. Active: ${msg.active_users.join(', ')}`;
} else if (msg.type === 'chat_message') {
div.innerText = `${msg.username}: ${msg.text}`;
}
document.getElementById('messages').appendChild(div);
document.getElementById('messages').scrollTop = document.getElementById('messages').scrollHeight;
};
function sendMsg() {
const msg = document.getElementById('msg').value;
if (msg) {
ws.send(msg);
document.getElementById('msg').value = '';
}
}
</script>
</body>
</html>
""")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, username: str = Query(...), client_id: str = Query(...)):
await manager.connect(client_id, username, websocket)
try:
while True:
data = await websocket.receive_text()
await manager.handle_message(client_id, data)
except Exception as e:
username = manager.disconnect(client_id)
if username:
await manager.broadcast({
"type": "user_left",
"username": username,
"active_users": [u.username for u in manager.active_users.values()]
})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Message Protocol: JSON for Flexibility
Raw text messages are simple but unstructured. Using JSON allows typed messages: a user_joined event carries different data than a chat_message. The client parses the type field and renders accordingly. This pattern scales to typing indicators, presence updates, and custom events without protocol changes.
User Lifecycle: Connect, Notify, Disconnect
When a user connects, the server creates a User object storing their ID, name, and WebSocket. It then broadcasts a user_joined message to all users (including the new one) with the current active user list. When disconnect happens (client closes, network fails, or exception), the server removes the user and broadcasts user_left. This ensures all clients stay synchronized.
The try/except around receive_text() catches WebSocketDisconnect, which Starlette raises when the client closes. Rather than letting the exception crash, we catch it, clean up, and notify peers.
Testing Multi-User Chat
Open http://localhost:8000?user=Alice in one tab and http://localhost:8000?user=Bob in another. Type a message in one tab; it appears instantly in both. Close a tab; the other sees "Bob left." This confirms the full lifecycle is working.
Avoiding Lost Messages During Broadcast Failures
The broadcast() method wraps each send_json() in try/except because a client might disconnect mid-broadcast (e.g., their connection drops while the server is sending). Rather than failing entirely, we collect disconnected client IDs and remove them after the loop, ensuring other clients receive the message. This is crucial for reliability: one bad connection shouldn't silence everyone.
Scaling Chat to Multiple Rooms
The current implementation is a global room. To add multiple chat rooms (e.g., #general, #random), associate each user with a room ID and broadcast only within that room:
async def connect(self, client_id: str, username: str, websocket: WebSocket, room: str = "general"):
# ... (same as before)
self.active_users[client_id] = user
user.room = room
async def broadcast(self, message: dict, room: str = "general"):
disconnected = []
for client_id, user in self.active_users.items():
if user.room != room:
continue
# ... (same send logic)
Then update the WebSocket endpoint to accept a room query parameter and pass it to connect() and handle_message().
Key Takeaways
- A chat server tracks users by ID and stores their WebSocket connection; disconnect handling is critical for stability.
- JSON-typed messages allow rich, self-documenting protocols without binary serialization overhead.
- Broadcasting to all users except on failure ensures real-time updates reach the majority even if one connection is broken.
- User join/leave notifications keep clients in sync with the active population.
- The pattern is easily extended to rooms, presence, and application-specific events.
Frequently Asked Questions
How do I prevent the same username from joining twice?
Before accept(), check if the username exists in active_users: if any(u.username == username for u in self.active_users.values()): await websocket.close(code=1008). Code 1008 is "policy violation." The client receives the rejection immediately.
Can I persist messages to a database?
Yes. When handle_message() saves to message_history, also insert into a database (PostgreSQL, MongoDB, etc.) within the same transaction. On server restart, load the history from the database and optionally send it to newly connected clients as context.
How do I limit message length to prevent spam?
Before broadcasting, check: if len(text) > 500: await websocket.send_json({"error": "Message too long"}); return. You can also rate-limit per user by tracking timestamps and rejecting if too many messages arrive in a short window.
What if a user is idle for a long time?
Implement a ping/pong mechanism: every 30 seconds, send a ping frame; if the client doesn't respond within 60 seconds, assume it's dead and disconnect. FastAPI's WebSocket handles most of this automatically, but explicitly managing idle timeout with asyncio timers is also possible.