GraphQL Subscriptions Python Guide
Subscriptions are the real-time operations in GraphQL—a client subscribes to events (new message, post updated, user online) and receives a stream of updates over a WebSocket connection. This article teaches you to define subscriptions in Strawberry, emit events from mutations, and handle WebSocket connections. By the end, you'll build a live chat API where clients receive messages in real-time.
I built my first subscription API in 2023 for a collaborative note-taking app. The cost was high—managing WebSocket connections, broadcasting events to thousands of clients, handling disconnects and reconnects—but the user experience was worth it: edits appeared instantly on every collaborator's screen. This article distills those lessons.
What are Subscriptions?
Subscriptions are long-lived operations, usually carried over a WebSocket, in which a client listens for events. Unlike queries (the client requests, the server responds once) and mutations (the client sends data, the server processes it and responds once), a subscription stays open after the initial request, and the server pushes updates to the client over time.
Three GraphQL operation types:
| Operation | Direction | Use Case |
|---|---|---|
| Query | Client → Server request, one response | Fetch data |
| Mutation | Client → Server request, one response | Create/update data |
| Subscription | Client → Server request, then Server → Client stream | Real-time updates |
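The difference in the table can be sketched in plain Python, independent of any GraphQL library: a query or mutation behaves like a coroutine that returns once, while a subscription behaves like an async generator that keeps yielding. The names `fetch_once` and `stream_updates` are illustrative only and are not part of Strawberry.

```python
import asyncio
from typing import AsyncGenerator


async def fetch_once() -> str:
    """Query/mutation shape: one request, one response."""
    return "current value"


async def stream_updates(count: int) -> AsyncGenerator[str, None]:
    """Subscription shape: one request, many responses over time."""
    for n in range(1, count + 1):
        await asyncio.sleep(0)  # Stand-in for waiting on a real event.
        yield f"update {n}"


async def main() -> None:
    print(await fetch_once())
    async for update in stream_updates(3):
        print(update)


if __name__ == "__main__":
    asyncio.run(main())
```

A Strawberry subscription resolver has the second shape, which is why it is annotated as an async generator rather than as a plain return value.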
Defining Subscriptions in Strawberry
Subscriptions are defined on the Subscription type with @strawberry.subscription. Instead of returning a single value, they return an async generator that yields values:
import asyncio
from typing import AsyncGenerator

import strawberry


@strawberry.type
class Message:
    id: int
    text: str
    author: str


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def messages(self) -> AsyncGenerator[Message, None]:
        """Subscribe to new messages in real-time."""
        # This generator runs as long as the WebSocket connection is open.
        # Yield a message each time one arrives.
        # In a real app, listen to a message queue or event stream.
        # For this example, we'll simulate receiving messages.
        message_id = 1
        while True:
            await asyncio.sleep(2)  # Simulate waiting for a new message.
            yield Message(
                id=message_id,
                text=f"Message {message_id}",
                author="User1",
            )
            message_id += 1


@strawberry.type
class Query:
    @strawberry.field
    def hello(self) -> str:
        return "Hello"


schema = strawberry.Schema(query=Query, subscription=Subscription)
To run with WebSocket support, use Strawberry's ASGI handler:
from strawberry.asgi import GraphQL
from fastapi import FastAPI
app = FastAPI()
graphql_app = GraphQL(schema)
app.add_route("/graphql", graphql_app)
app.add_websocket_route("/graphql", graphql_app)
A client (JavaScript, Python, etc.) subscribes:
subscription {
  messages {
    id
    text
    author
  }
}
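Over the WebSocket, the server streams one result per yielded message. The GraphQL payloads look like this (the WebSocket protocol wraps each one in its own message envelope):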
{"data": {"messages": {"id": 1, "text": "Message 1", "author": "User1"}}}
{"data": {"messages": {"id": 2, "text": "Message 2", "author": "User1"}}}
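For readers writing their own client, here is a minimal sketch of that envelope. It assumes the connection negotiated the `graphql-transport-ws` protocol, in which the client sends a `subscribe` message and the server answers with `next` messages that carry the payloads shown above; the legacy `subscriptions-transport-ws` protocol uses `start` and `data` message types instead. The helper names `subscribe_frame` and `extract_data` are hypothetical.

```python
import json
from typing import Any, Optional


def subscribe_frame(operation_id: str, query: str) -> str:
    """Build the client's `subscribe` message for one operation."""
    return json.dumps(
        {"id": operation_id, "type": "subscribe", "payload": {"query": query}}
    )


def extract_data(raw_frame: str, operation_id: str) -> Optional[dict[str, Any]]:
    """Return the GraphQL `data` from a `next` frame, or None for other frames."""
    frame = json.loads(raw_frame)
    if frame.get("type") == "next" and frame.get("id") == operation_id:
        return frame["payload"].get("data")
    return None
```

The `id` lets one WebSocket carry several subscriptions at once: each `next` frame is matched back to the operation that requested it.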
Emitting Events from Mutations
Subscriptions need a way to be notified of events. A common pattern is to have mutations publish each event to in-memory queues that subscriptions read from. An asyncio.Queue hands each item to exactly one consumer, so broadcasting needs one queue per subscriber:
import asyncio
import itertools
from typing import AsyncGenerator

import strawberry

# One queue per active subscriber (in production, use Redis, RabbitMQ, etc.).
subscriber_queues: set[asyncio.Queue] = set()
next_message_id = itertools.count(1)


@strawberry.type
class Message:
    id: int
    text: str
    author: str


@strawberry.type
class Mutation:
    @strawberry.mutation
    async def send_message(self, text: str, author: str) -> Message:
        """Send a message and broadcast it to subscribers."""
        # Create the message with a fresh ID.
        message = Message(id=next(next_message_id), text=text, author=author)
        # Put the event on every subscriber's queue.
        for queue in subscriber_queues:
            queue.put_nowait(message)
        return message


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def messages(self) -> AsyncGenerator[Message, None]:
        """Subscribe to messages."""
        queue: asyncio.Queue = asyncio.Queue()
        subscriber_queues.add(queue)
        try:
            while True:
                # Wait for a message to be sent.
                yield await queue.get()
        finally:
            # Stop receiving events once the client goes away.
            subscriber_queues.discard(queue)


@strawberry.type
class Query:
    @strawberry.field
    def hello(self) -> str:
        return "Hello"


schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)
Now, when a client sends mutation { sendMessage(text: "Hi", author: "Alice") { id text } }, all subscribers to subscription { messages { ... } } receive that message in real-time.
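One property of `asyncio.Queue` is worth checking in isolation: each item is delivered to exactly one consumer, so reaching every subscriber requires a separate queue per subscriber. The sketch below shows both behaviours side by side using only the standard library; `Broadcaster` is a hypothetical helper, not a Strawberry class.

```python
import asyncio


class Broadcaster:
    """Hypothetical helper: fan each event out to every registered queue."""

    def __init__(self) -> None:
        self._queues: set[asyncio.Queue] = set()

    def register(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.add(queue)
        return queue

    def unregister(self, queue: asyncio.Queue) -> None:
        self._queues.discard(queue)

    def publish(self, event: object) -> None:
        # put_nowait cannot fail here because the queues are unbounded.
        for queue in self._queues:
            queue.put_nowait(event)


async def demo() -> None:
    # One shared queue: the first consumer takes the event, nothing is left.
    shared: asyncio.Queue = asyncio.Queue()
    shared.put_nowait("hello")
    print(await shared.get(), "| shared queue now empty:", shared.empty())

    # One queue per subscriber: both subscribers receive the event.
    broadcaster = Broadcaster()
    alice, bob = broadcaster.register(), broadcaster.register()
    broadcaster.publish("hello")
    print(await alice.get(), await bob.get())


if __name__ == "__main__":
    asyncio.run(demo())
```

Unbounded queues keep the sketch short, but a slow subscriber would then accumulate events without limit; a bounded queue with a drop or disconnect policy is one possible refinement.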
Filtered Subscriptions
Subscriptions can accept arguments to filter events:
import asyncio
from typing import AsyncGenerator

import strawberry

# One queue per active subscriber. A single shared queue would hand each event
# to only one subscriber, and a non-matching subscriber would silently drop it.
subscriber_queues: set[asyncio.Queue] = set()


@strawberry.type
class PostCreated:
    post_id: int
    title: str
    author: str


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def posts_by_author(self, author: str) -> AsyncGenerator[PostCreated, None]:
        """Subscribe to posts created by a specific author."""
        queue: asyncio.Queue = asyncio.Queue()
        subscriber_queues.add(queue)
        try:
            while True:
                event = await queue.get()
                # Only yield events from the requested author.
                if event.author == author:
                    yield event
        finally:
            subscriber_queues.discard(queue)


@strawberry.type
class Mutation:
    @strawberry.mutation
    async def create_post(self, title: str, author: str) -> PostCreated:
        """Create a post and broadcast it."""
        # post_id is hard-coded for brevity; a real app would use the stored ID.
        event = PostCreated(post_id=1, title=title, author=author)
        for queue in subscriber_queues:
            queue.put_nowait(event)
        return event


# Query is the same placeholder type defined in the earlier examples.
schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)
Client subscribes to posts from one author:
subscription {
  postsByAuthor(author: "Alice") {
    postId
    title
  }
}
Only events with author == "Alice" are streamed to this subscriber.
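The filtering step itself can be exercised without a GraphQL server. The following is a minimal sketch, assuming the subscriber reads from its own queue; the `None` sentinel exists only so the demo terminates and is not part of the pattern.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator


@dataclass
class PostCreated:
    post_id: int
    title: str
    author: str


async def posts_by_author(
    source: asyncio.Queue, author: str
) -> AsyncGenerator[PostCreated, None]:
    """Yield only the events whose author matches; None ends the stream."""
    while True:
        event = await source.get()
        if event is None:  # Sentinel used only to end this demo.
            return
        if event.author == author:
            yield event


async def demo() -> list[str]:
    source: asyncio.Queue = asyncio.Queue()
    for event in (
        PostCreated(1, "Intro", "Alice"),
        PostCreated(2, "Reply", "Bob"),
        PostCreated(3, "Follow-up", "Alice"),
        None,
    ):
        source.put_nowait(event)
    return [e.title async for e in posts_by_author(source, "Alice")]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Filtering inside the generator is simple but means every subscriber still receives every event internally; with many authors, routing events to per-author queues or channels may be cheaper.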
Multiple Event Types: Using Unions
A subscription might emit different event types:
import asyncio
from typing import Annotated, AsyncGenerator, Union

import strawberry


@strawberry.type
class UserOnline:
    user_id: int


@strawberry.type
class UserOffline:
    user_id: int


@strawberry.type
class MessageSent:
    text: str
    author_id: int


# Union of all possible events (Annotated form used by recent Strawberry releases).
Event = Annotated[
    Union[UserOnline, UserOffline, MessageSent], strawberry.union("Event")
]

# One queue per active subscriber, so every subscriber sees every event.
subscriber_queues: set[asyncio.Queue] = set()


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def room_events(self, room_id: int) -> AsyncGenerator[Event, None]:
        """Subscribe to all events in a room."""
        queue: asyncio.Queue = asyncio.Queue()
        subscriber_queues.add(queue)
        try:
            while True:
                event = await queue.get()
                # This sketch yields every event. To filter by room, add a
                # room_id field to each event type and compare it here.
                yield event
        finally:
            subscriber_queues.discard(queue)


@strawberry.type
class Mutation:
    @strawberry.mutation
    async def go_online(self, user_id: int) -> UserOnline:
        """Mark a user as online."""
        event = UserOnline(user_id=user_id)
        for queue in subscriber_queues:
            queue.put_nowait(event)
        return event


# Query is the same placeholder type defined in the earlier examples.
schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)
Client uses inline fragments to handle each event type:
subscription {
  roomEvents(roomId: 1) {
    ... on UserOnline {
      userId
    }
    ... on UserOffline {
      userId
    }
    ... on MessageSent {
      text
      authorId
    }
  }
}
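On the receiving side, a client usually needs to know which union member arrived. A minimal sketch, assuming the client also adds the standard `__typename` meta-field to the selection above; `describe_event` is a hypothetical helper that works on the decoded `roomEvents` payload.

```python
from typing import Any


def describe_event(event: dict[str, Any]) -> str:
    """Turn one roomEvents payload into a log line, keyed on __typename."""
    kind = event.get("__typename")
    if kind == "UserOnline":
        return f"user {event['userId']} came online"
    if kind == "UserOffline":
        return f"user {event['userId']} went offline"
    if kind == "MessageSent":
        return f"user {event['authorId']} said: {event['text']}"
    # New union members should not crash clients that predate them.
    return f"unhandled event type: {kind}"


if __name__ == "__main__":
    print(describe_event({"__typename": "UserOnline", "userId": 7}))
```

The fallback branch matters because adding a member to a union is a common schema change that older clients should tolerate.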
Production Patterns: Using Redis or RabbitMQ
In production, a single-process in-memory queue doesn't scale. Use Redis or RabbitMQ:
from typing import AsyncGenerator

import strawberry
# The standalone aioredis package is no longer maintained; its asyncio client
# now ships inside redis-py as redis.asyncio.
from redis import asyncio as aioredis


async def get_redis():
    # from_url builds the client; connections are opened on first use.
    return aioredis.from_url("redis://localhost")


@strawberry.type
class Message:
    text: str


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def messages(self, channel: str, info: strawberry.Info) -> AsyncGenerator[Message, None]:
        """Subscribe to messages on a channel via Redis Pub/Sub."""
        # Assumes a custom context getter stored the client under "redis".
        client = info.context["redis"]
        # Subscribe to a Redis channel.
        pubsub = client.pubsub()
        await pubsub.subscribe(channel)
        # Yield messages as they arrive, skipping subscribe confirmations.
        async for item in pubsub.listen():
            if item["type"] == "message":
                # Data arrives as bytes unless the client decodes responses.
                yield Message(text=item["data"].decode())


@strawberry.type
class Mutation:
    @strawberry.mutation
    async def post_message(
        self,
        channel: str,
        text: str,
        info: strawberry.Info,
    ) -> Message:
        """Post a message to a channel."""
        client = info.context["redis"]
        await client.publish(channel, text)
        return Message(text=text)


# Query is the same placeholder type defined in the earlier examples.
schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)
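Redis Pub/Sub is lightweight and well suited to simple broadcast patterns, but it does not store messages: a subscriber that is disconnected when an event is published never sees it. For complex routing (multiple rooms, presence tracking) or when delivery guarantees matter, consider RabbitMQ or Kafka.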
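A broker channel carries bytes or strings, so any event richer than plain text has to be serialized before publishing and parsed again in the subscription. Below is a minimal sketch of that round trip using JSON; `ChatMessage` and both helper functions are hypothetical and not part of Redis or Strawberry.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ChatMessage:
    room_id: int
    author: str
    text: str


def encode_message(message: ChatMessage) -> bytes:
    """Serialize a message to the bytes that would be published."""
    return json.dumps(asdict(message)).encode("utf-8")


def decode_message(payload: bytes) -> ChatMessage:
    """Rebuild the message from the bytes a subscriber receives."""
    return ChatMessage(**json.loads(payload.decode("utf-8")))


if __name__ == "__main__":
    original = ChatMessage(room_id=1, author="Alice", text="Hi")
    print(decode_message(encode_message(original)) == original)
```

JSON keeps the payload readable from other services and languages; a more compact format is an option if message volume becomes a concern.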
Handling Disconnect and Cleanup
When a WebSocket closes, the subscription generator should clean up:
@strawberry.type
class Subscription:
    @strawberry.subscription
    async def messages(self, room_id: int, info: strawberry.Info) -> AsyncGenerator[Message, None]:
        """Subscribe to messages in a room; cleanup on disconnect."""
        # Assumes the context holds a redis.asyncio client (redis-py) and a user ID.
        client = info.context["redis"]
        user_id = info.context.get("user_id")
        presence_key = f"room:{room_id}:users"
        channel = f"room:{room_id}"
        pubsub = client.pubsub()
        # Record user presence.
        await client.sadd(presence_key, user_id)
        try:
            await pubsub.subscribe(channel)
            async for item in pubsub.listen():
                # Skip subscribe confirmations; forward real messages only.
                if item["type"] == "message":
                    yield Message(text=item["data"].decode())
        finally:
            # Cleanup when the client disconnects.
            await client.srem(presence_key, user_id)
            await pubsub.unsubscribe(channel)
The try/finally ensures cleanup runs even if the connection drops unexpectedly.
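The mechanism behind this is ordinary Python: closing an async generator raises `GeneratorExit` at the point where it is suspended, so the `finally` block runs. The sketch below demonstrates that with an in-memory set standing in for the Redis presence set; `presence_stream` and `active_users` are illustrative names.

```python
import asyncio
from typing import AsyncGenerator

active_users: set[str] = set()


async def presence_stream(user_id: str) -> AsyncGenerator[str, None]:
    """Mark the user present while the stream is open."""
    active_users.add(user_id)
    try:
        while True:
            await asyncio.sleep(0)  # Stand-in for waiting on a real event.
            yield f"tick for {user_id}"
    finally:
        # Runs when the consumer closes the generator or the task is cancelled.
        active_users.discard(user_id)


async def demo() -> tuple[bool, bool]:
    stream = presence_stream("alice")
    await stream.__anext__()  # Start the generator and read one event.
    present_while_open = "alice" in active_users
    await stream.aclose()  # What a server does when the client disconnects.
    present_after_close = "alice" in active_users
    return present_while_open, present_after_close


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Cleanup code in `finally` should avoid yielding again, because a generator that yields while being closed raises a `RuntimeError`.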
Comparison: Polling vs Subscriptions
| Aspect | REST Polling | GraphQL Subscriptions |
|---|---|---|
| Latency | Up to one poll interval (for example, 1-5 seconds) | Typically milliseconds after the event |
| Server load | High (many polls per client) | Low (one connection per client) |
| Bandwidth | High (repeated requests) | Low (events only) |
| Complexity | Simple to implement | Requires WebSocket + event system |
| Scalability | Difficult (many short-lived connections) | Better (persistent connections) |
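The server-load row can be made concrete with back-of-the-envelope arithmetic. The numbers in the usage example are illustrative inputs, not measurements, and both function names are hypothetical.

```python
def polling_requests(clients: int, poll_interval_s: float, window_s: float) -> int:
    """Requests the server handles when every client polls on a fixed interval."""
    return int(clients * (window_s // poll_interval_s))


def subscription_pushes(clients: int, events_in_window: int) -> int:
    """Messages the server pushes when every client receives every event."""
    return clients * events_in_window


if __name__ == "__main__":
    # 1,000 clients over one hour: polling every 2 seconds vs. 50 events pushed.
    print(polling_requests(1000, 2, 3600))
    print(subscription_pushes(1000, 50))
```

Polling cost grows with elapsed time whether or not anything changed, while push cost grows with the number of events, which is why the gap is largest for quiet feeds with many listeners.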
Key Takeaways
- Subscriptions are long-lived WebSocket connections where the server streams events to clients.
- Define subscriptions with @strawberry.subscription returning an AsyncGenerator.
- Emit events from mutations by putting them in a queue (asyncio.Queue, Redis Pub/Sub, etc.).
- Subscriptions can be filtered by arguments to deliver only relevant events.
- Use unions for subscriptions that emit multiple event types.
- In production, use Redis Pub/Sub or RabbitMQ instead of in-memory queues.
- Always clean up resources (unsubscribe, close channels) when a WebSocket closes.
Frequently Asked Questions
How many concurrent WebSocket connections can one server handle?
It depends on your infrastructure. A single async Python server can typically hold thousands of concurrent WebSocket connections; memory per connection is roughly 10-100 KB, depending on the server and its buffer settings. At scale, use a message broker (Redis, RabbitMQ) to broadcast events across multiple servers.
Can I combine queries, mutations, and subscriptions in one request?
No. A single GraphQL request is either a query, mutation, or subscription. However, a subscription can exist in the same schema as queries and mutations.
How do I subscribe to past events (history)?
Subscriptions start listening when the client connects; past events are lost. To include history, have the subscription emit recent events before streaming new ones. Or fetch history separately with a query.
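The first option can be sketched as a bounded replay buffer that is drained before the live stream starts. This is a minimal illustration using only the standard library; `History` and `with_history` are hypothetical names, and the `None` sentinel exists only so the demo terminates. A real implementation would also need to register for live events before taking the snapshot and de-duplicate by event ID, which this sketch leaves out.

```python
import asyncio
from collections import deque
from typing import AsyncGenerator, Deque


class History:
    """Keep only the most recent events."""

    def __init__(self, max_events: int) -> None:
        self.recent: Deque[str] = deque(maxlen=max_events)

    def record(self, event: str) -> None:
        self.recent.append(event)


async def with_history(
    history: History, live: asyncio.Queue
) -> AsyncGenerator[str, None]:
    """Replay buffered events first, then stream live ones."""
    # Copy the buffer so events recorded during replay do not disturb iteration.
    for past in list(history.recent):
        yield past
    while True:
        event = await live.get()
        if event is None:  # Sentinel used only to end this demo.
            return
        yield event


async def demo() -> list[str]:
    history = History(max_events=2)
    for event in ("one", "two", "three"):
        history.record(event)  # "one" falls out of the two-slot buffer.
    live: asyncio.Queue = asyncio.Queue()
    live.put_nowait("four")
    live.put_nowait(None)
    return [e async for e in with_history(history, live)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The fixed `maxlen` bounds memory use, at the cost of older events being unavailable to late subscribers; fetching history with a separate query avoids that limit.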
What if a subscription yields errors?
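It depends on where the error occurs. An error raised while resolving a field of a yielded value is reported in the errors list of that result, and the stream continues with the next event. An exception raised inside the subscription generator itself ends that subscription, so the client has to resubscribe. In both cases the client can inspect the errors and decide whether to close the connection or retry.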