GraphQL Subscriptions Python Guide

Subscriptions are the real-time operations in GraphQL—a client subscribes to events (new message, post updated, user online) and receives a stream of updates over a WebSocket connection. This article teaches you to define subscriptions in Strawberry, emit events from mutations, and handle WebSocket connections. By the end, you'll build a live chat API where clients receive messages in real-time.

I built my first subscription API in 2023 for a collaborative note-taking app. The cost was high—managing WebSocket connections, broadcasting events to thousands of clients, handling disconnects and reconnects—but the user experience was worth it: edits appeared instantly on every collaborator's screen. This article distills those lessons.

What are Subscriptions?

Subscriptions are long-lived connections, usually over WebSocket, on which a client listens for events. Unlike queries (client requests, server responds once) and mutations (client sends data, server processes it and responds once), a subscription stays open: the client sends the operation once, and the server pushes an update each time a matching event occurs.

Three GraphQL operation types:

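| Operation    | Direction                                             | Use case           |
|--------------|-------------------------------------------------------|--------------------|
| Query        | Client → Server request, one response                 | Fetch data         |
| Mutation     | Client → Server request, one response                 | Create/update data |
| Subscription | Client → Server request, then Server → Client stream  | Real-time updates  |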

Defining Subscriptions in Strawberry

Subscriptions are defined on the Subscription type with @strawberry.subscription. Instead of returning a single value, they return an async generator that yields values:

import asyncio
from typing import AsyncGenerator

import strawberry

@strawberry.type
class Message:
    id: int
    text: str
    author: str

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def messages(self) -> AsyncGenerator[Message, None]:
        """Subscribe to new messages in real-time."""
        # This generator runs as long as the WebSocket connection is open.
        # Yield a message each time one arrives.

        # In a real app, listen to a message queue or event stream.
        # For this example, we'll simulate receiving messages.
        message_id = 1
        while True:
            await asyncio.sleep(2)  # Simulate waiting for a new message.
            yield Message(
                id=message_id,
                text=f"Message {message_id}",
                author="User1",
            )
            message_id += 1

@strawberry.type
class Query:
    @strawberry.field
    def hello(self) -> str:
        return "Hello"

schema = strawberry.Schema(query=Query, subscription=Subscription)

To run with WebSocket support, use Strawberry's ASGI handler:

from strawberry.asgi import GraphQL
from fastapi import FastAPI

app = FastAPI()
graphql_app = GraphQL(schema)

app.add_route("/graphql", graphql_app)
app.add_websocket_route("/graphql", graphql_app)

A client (JavaScript, Python, etc.) subscribes:

subscription {
  messages {
    id
    text
    author
  }
}

Over the WebSocket, the server streams messages:

{"data": {"messages": {"id": 1, "text": "Message 1", "author": "User1"}}}
{"data": {"messages": {"id": 2, "text": "Message 2", "author": "User1"}}}
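
On the wire, each payload is wrapped in a protocol message rather than sent bare. The sketch below is plain asyncio with no Strawberry involved. It assumes the graphql-transport-ws protocol, in which the server sends one `next` message per event and a final `complete`. The `fake_messages` generator and the `stream_to_client` helper are hypothetical stand-ins for a resolver and the server's send loop.

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_messages(count: int) -> AsyncIterator[dict]:
    """Stand-in for a subscription resolver: yields `count` messages."""
    for message_id in range(1, count + 1):
        await asyncio.sleep(0)  # Pretend to wait for an event.
        yield {"id": message_id, "text": f"Message {message_id}", "author": "User1"}


async def stream_to_client(operation_id: str, source: AsyncIterator[dict]) -> list[str]:
    """Wrap each yielded value in a `next` frame, then finish with `complete`."""
    frames = []
    async for message in source:
        frames.append(json.dumps({
            "type": "next",
            "id": operation_id,
            "payload": {"data": {"messages": message}},
        }))
    frames.append(json.dumps({"type": "complete", "id": operation_id}))
    return frames


frames = asyncio.run(stream_to_client("1", fake_messages(2)))
for frame in frames:
    print(frame)
```

The `id` ties each frame to the subscription operation that produced it, which is what lets one WebSocket carry several subscriptions at once.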

Emitting Events from Mutations

Subscriptions need a way to be notified of events. A common pattern is for mutations to push events into in-process queues, one queue per connected subscriber:

import asyncio
import itertools
from typing import AsyncGenerator

import strawberry

# One queue per connected subscriber (in production, use Redis, RabbitMQ, etc.).
# A single shared asyncio.Queue would hand each message to only one consumer.
subscriber_queues: set[asyncio.Queue] = set()
next_message_id = itertools.count(1)

@strawberry.type
class Message:
    id: int
    text: str
    author: str

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def send_message(self, text: str, author: str) -> Message:
        """Send a message and broadcast it to subscribers."""
        # Create the message with a fresh ID.
        message = Message(id=next(next_message_id), text=text, author=author)

        # Emit it to every subscriber's queue.
        for queue in list(subscriber_queues):
            queue.put_nowait(message)

        return message

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def messages(self) -> AsyncGenerator[Message, None]:
        """Subscribe to messages."""
        queue: asyncio.Queue = asyncio.Queue()
        subscriber_queues.add(queue)
        try:
            while True:
                # Wait for a message to be sent.
                message = await queue.get()
                yield message
        finally:
            # Stop receiving events once the client goes away.
            subscriber_queues.discard(queue)

@strawberry.type
class Query:
    @strawberry.field
    def hello(self) -> str:
        return "Hello"

schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)

Now, when a client sends mutation { sendMessage(text: "Hi", author: "Alice") { id text } }, all subscribers to subscription { messages { ... } } receive that message in real-time.
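
An `asyncio.Queue` hands each item to exactly one consumer, so delivering one message to every subscriber takes one queue per subscriber. The following is a minimal sketch of that fan-out in plain asyncio. `Broadcaster` and its methods are hypothetical names, not part of Strawberry.

```python
import asyncio


class Broadcaster:
    """Hypothetical fan-out helper: one asyncio.Queue per subscriber."""

    def __init__(self) -> None:
        self._queues: set[asyncio.Queue] = set()

    def publish(self, event: str) -> None:
        # Copy the set so a subscriber leaving mid-publish cannot break iteration.
        for queue in list(self._queues):
            queue.put_nowait(event)

    async def subscribe(self):
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.add(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            self._queues.discard(queue)

    @property
    def subscriber_count(self) -> int:
        return len(self._queues)


async def first_event(stream) -> str:
    return await stream.__anext__()


async def demo() -> tuple[list[str], int]:
    broadcaster = Broadcaster()
    streams = [broadcaster.subscribe() for _ in range(2)]
    # Start both subscribers so each registers its queue and waits on it.
    tasks = [asyncio.create_task(first_event(stream)) for stream in streams]
    await asyncio.sleep(0)
    broadcaster.publish("hi")
    received = [await task for task in tasks]
    for stream in streams:
        await stream.aclose()  # Runs each generator's finally block.
    return received, broadcaster.subscriber_count


received, remaining = asyncio.run(demo())
```

Both subscribers receive the same event, and closing each stream removes its queue so publishers stop filling queues nobody reads.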

Filtered Subscriptions

Subscriptions can accept arguments to filter events:

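import asyncio
from typing import AsyncGenerator

import strawberry

# One queue per subscriber, so every subscriber sees every event.
# With a single shared queue, a subscriber filtering for "Alice" would
# consume and drop events meant for other subscribers.
subscriber_queues: set[asyncio.Queue] = set()

@strawberry.type
class PostCreated:
    post_id: int
    title: str
    author: str

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def posts_by_author(self, author: str) -> AsyncGenerator[PostCreated, None]:
        """Subscribe to posts created by a specific author."""
        queue: asyncio.Queue = asyncio.Queue()
        subscriber_queues.add(queue)
        try:
            while True:
                event = await queue.get()
                # Only yield events from the requested author.
                if event.author == author:
                    yield event
        finally:
            subscriber_queues.discard(queue)

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def create_post(self, title: str, author: str) -> PostCreated:
        """Create a post and broadcast it."""
        # post_id is hard-coded for brevity; a real app would use the stored ID.
        event = PostCreated(post_id=1, title=title, author=author)
        for queue in list(subscriber_queues):
            queue.put_nowait(event)
        return event

# Query is the same placeholder type used in the earlier examples.
schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)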

Client subscribes to posts from one author:

subscription {
  postsByAuthor(author: "Alice") {
    postId
    title
  }
}

Only events with author == "Alice" are streamed to this subscriber.
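
The filtering step itself is independent of GraphQL: it is an async generator that forwards only the events matching a predicate. A small sketch in plain asyncio, where `only` and `fake_post_events` are hypothetical helpers used for illustration:

```python
import asyncio
from typing import AsyncIterator, Callable


async def only(
    source: AsyncIterator[dict],
    predicate: Callable[[dict], bool],
) -> AsyncIterator[dict]:
    """Forward only the events for which predicate(event) is true."""
    async for event in source:
        if predicate(event):
            yield event


async def fake_post_events() -> AsyncIterator[dict]:
    """Stand-in event source with posts from two authors."""
    for author, title in [("Alice", "A1"), ("Bob", "B1"), ("Alice", "A2")]:
        yield {"author": author, "title": title}


async def demo() -> list[str]:
    alice_posts = only(fake_post_events(), lambda event: event["author"] == "Alice")
    return [event["title"] async for event in alice_posts]


titles = asyncio.run(demo())
```

Filtering in the resolver is simple, but every subscriber still receives and discards unrelated events. With many authors, routing by channel or topic at the broker is usually cheaper.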

Multiple Event Types: Using Unions

A subscription might emit different event types:

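import asyncio
from typing import Annotated, AsyncGenerator, Union

import strawberry

@strawberry.type
class UserOnline:
    user_id: int

@strawberry.type
class UserOffline:
    user_id: int

@strawberry.type
class MessageSent:
    text: str
    author_id: int

# Union of all possible events. Recent Strawberry versions declare unions
# with Annotated; the older strawberry.union("Event", (...)) form is deprecated.
Event = Annotated[
    Union[UserOnline, UserOffline, MessageSent],
    strawberry.union("Event"),
]

# One queue per subscriber, so every subscriber sees every event.
subscriber_queues: set[asyncio.Queue] = set()

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def room_events(self, room_id: int) -> AsyncGenerator[Event, None]:
        """Subscribe to all events in a room."""
        queue: asyncio.Queue = asyncio.Queue()
        subscriber_queues.add(queue)
        try:
            while True:
                event = await queue.get()
                # Simplification: these events carry no room_id, so nothing is
                # filtered here. In a real app, store room_id on each event and
                # skip events from other rooms.
                yield event
        finally:
            subscriber_queues.discard(queue)

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def go_online(self, user_id: int) -> UserOnline:
        """Mark a user as online."""
        event = UserOnline(user_id=user_id)
        for queue in list(subscriber_queues):
            queue.put_nowait(event)
        return event

# Query is the same placeholder type used in the earlier examples.
schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)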

Client uses inline fragments to handle each event type:

subscription {
  roomEvents(roomId: 1) {
    ... on UserOnline {
      userId
    }
    ... on UserOffline {
      userId
    }
    ... on MessageSent {
      text
      authorId
    }
  }
}
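
On the client, one way to tell the union members apart is to add the standard `__typename` meta-field to the selection and dispatch on it. The handler below is a hypothetical sketch. It assumes each payload arrives as a parsed dict and that the query requested `__typename` alongside the inline fragments.

```python
def describe_event(event: dict) -> str:
    """Return a human-readable line for one roomEvents payload."""
    kind = event.get("__typename")
    if kind == "UserOnline":
        return f"user {event['userId']} came online"
    if kind == "UserOffline":
        return f"user {event['userId']} went offline"
    if kind == "MessageSent":
        return f"user {event['authorId']} said: {event['text']}"
    # New members may be added to the union later; do not crash on them.
    return f"unhandled event type: {kind}"


payloads = [
    {"__typename": "UserOnline", "userId": 7},
    {"__typename": "MessageSent", "text": "Hi", "authorId": 7},
    {"__typename": "UserOffline", "userId": 7},
]
lines = [describe_event(payload) for payload in payloads]
```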

Production Patterns: Using Redis or RabbitMQ

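In production, an in-memory queue is not enough: it only reaches subscribers in the same process, so clients connected to other workers or servers never see the event. Use a shared broker such as Redis or RabbitMQ: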

# Note: this example uses the aioredis 1.x API (create_redis_pool, redis.subscribe).
# aioredis 2.x and redis-py's redis.asyncio expose Pub/Sub through client.pubsub() instead.
import aioredis
import strawberry
from typing import AsyncGenerator

async def get_redis():
    return await aioredis.create_redis_pool("redis://localhost")

@strawberry.type
class Message:
    text: str

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def messages(self, channel: str, info: strawberry.Info) -> AsyncGenerator[Message, None]:
        """Subscribe to messages on a channel via Redis Pub/Sub."""
        # The Redis client is expected to be placed in the GraphQL context.
        redis = info.context['redis']

        # Subscribe to a Redis channel.
        channels = await redis.subscribe(channel)
        ch = channels[0]

        # Yield messages as they arrive.
        while True:
            message = await ch.get()
            if message:
                yield Message(text=message.decode())

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def post_message(
        self,
        channel: str,
        text: str,
        info: strawberry.Info
    ) -> Message:
        """Post a message to a channel."""
        redis = info.context['redis']
        await redis.publish(channel, text.encode())
        return Message(text=text)

schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)

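Redis Pub/Sub is lightweight and well suited to simple broadcast patterns, but it is fire-and-forget: a subscriber that is disconnected when a message is published never receives it. For more complex routing (multiple rooms, presence tracking) or durable delivery, consider RabbitMQ or Kafka.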
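
The example above publishes a bare text string. Brokers such as Redis carry bytes, so an event with several fields has to be serialized on publish and parsed on receipt. A minimal sketch using JSON follows. The `ChatEvent` dataclass is a hypothetical stand-in for a Strawberry type, and JSON is one reasonable choice of encoding among several.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ChatEvent:
    room_id: int
    author: str
    text: str


def encode_event(event: ChatEvent) -> bytes:
    """Serialize an event to the bytes handed to the broker."""
    return json.dumps(asdict(event)).encode("utf-8")


def decode_event(raw: bytes) -> ChatEvent:
    """Rebuild an event from the bytes received from the broker."""
    return ChatEvent(**json.loads(raw.decode("utf-8")))


original = ChatEvent(room_id=1, author="Alice", text="Hi")
wire_bytes = encode_event(original)
restored = decode_event(wire_bytes)
```

In the subscription resolver, the decoded fields would then be copied into the GraphQL type before yielding.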

Handling Disconnect and Cleanup

When a WebSocket closes, the subscription generator should clean up:

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def messages(self, room_id: int, info: strawberry.Info) -> AsyncGenerator[Message, None]:
        """Subscribe to messages in a room; cleanup on disconnect."""
        redis = info.context['redis']

        # Record user presence.
        user_id = info.context.get('user_id')
        await redis.sadd(f"room:{room_id}:users", user_id)

        channels = await redis.subscribe(f"room:{room_id}")
        ch = channels[0]

        try:
            while True:
                message = await ch.get()
                if message:
                    yield Message(text=message.decode())
        finally:
            # Cleanup when the client disconnects.
            await redis.srem(f"room:{room_id}:users", user_id)
            await redis.unsubscribe(f"room:{room_id}")

The try/finally ensures cleanup runs even if the connection drops unexpectedly.
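
The mechanism is ordinary Python: closing an async generator raises `GeneratorExit` at the suspended `yield`, so the `finally` block runs. The sketch below shows this with plain asyncio. The `presence` set and the `room_messages` generator are hypothetical stand-ins for the Redis set and the resolver. The explicit `aclose()` call stands in for whatever the server does when the connection goes away.

```python
import asyncio

presence: set[str] = set()  # Stand-in for the Redis set of users in the room.


async def room_messages(user_id: str, queue: asyncio.Queue):
    presence.add(user_id)
    try:
        while True:
            yield await queue.get()
    finally:
        # Runs when the generator is closed, including on disconnect.
        presence.discard(user_id)


async def demo() -> tuple[str, bool, bool]:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("hello")
    stream = room_messages("alice", queue)
    first = await stream.__anext__()
    present_while_open = "alice" in presence
    await stream.aclose()  # Simulate the connection closing.
    present_after_close = "alice" in presence
    return first, present_while_open, present_after_close


result = asyncio.run(demo())
```

Keep the `finally` block short and tolerant of failure: if the cleanup itself raises, for example because the broker is unreachable, the presence entry can be left behind.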

Comparison: Polling vs Subscriptions

| Aspect      | REST Polling                              | GraphQL Subscriptions             |
|-------------|-------------------------------------------|-----------------------------------|
| Latency     | 1-5 seconds (poll interval)               | Milliseconds (real-time)          |
| Server load | High (many polls per client)              | Low (one connection per client)   |
| Bandwidth   | High (repeated requests)                  | Low (events only)                 |
| Complexity  | Simple to implement                       | Requires WebSocket + event system |
| Scalability | Difficult (many short-lived connections)  | Better (persistent connections)   |
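
To put rough numbers on the server-load row: under polling, the request rate is the number of clients divided by the poll interval, whether or not anything changed. The figures below are illustrative assumptions, not measurements.

```python
def polling_requests_per_second(clients: int, poll_interval_seconds: float) -> float:
    """Requests per second the server handles when every client polls."""
    return clients / poll_interval_seconds


# Assumed workload: 1,000 clients polling every 2 seconds.
rate = polling_requests_per_second(clients=1000, poll_interval_seconds=2.0)
print(rate)  # 500.0
```

With subscriptions, the same 1,000 clients hold 1,000 open connections, and traffic scales with the number of events instead of the poll interval.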

Key Takeaways

  • Subscriptions are long-lived WebSocket connections where the server streams events to clients.
  • Define subscriptions with @strawberry.subscription returning an AsyncGenerator.
  • Emit events from mutations by putting them in a queue (asyncio.Queue, Redis Pub/Sub, etc.).
  • Subscriptions can be filtered by arguments to deliver only relevant events.
  • Use unions for subscriptions that emit multiple event types.
  • In production, use Redis Pub/Sub or RabbitMQ instead of in-memory queues.
  • Always clean up resources (unsubscribe, close channels) when a WebSocket closes.

Frequently Asked Questions

How many concurrent WebSocket connections can one server handle?

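It depends on your infrastructure and on how much work each connection does. A single async Python server can typically hold thousands of concurrent WebSocket connections. As a rough guide, budget on the order of 10-100 KB of memory per connection, then measure under your own workload. To scale beyond one server, use a message broker (Redis, RabbitMQ) to broadcast events across multiple servers.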

Can I combine queries, mutations, and subscriptions in one request?

No. Each GraphQL request executes exactly one operation, which is a query, a mutation, or a subscription. All three operation types can live in the same schema, though, and a client can send queries and mutations while a subscription stays open.

How do I subscribe to past events (history)?

Subscriptions start listening when the client connects; past events are lost. To include history, have the subscription emit recent events before streaming new ones. Or fetch history separately with a query.
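
The first option can be sketched as a generator that replays a bounded history buffer and then switches to live events. This is plain asyncio under stated assumptions: `recent_messages` is a hypothetical in-memory buffer, and the `live` queue is created before the replay starts so that events arriving during the replay are not missed.

```python
import asyncio
from collections import deque
from typing import AsyncIterator

# Hypothetical history buffer holding the most recent 50 messages.
recent_messages: deque = deque(maxlen=50)


async def messages_with_history(live: asyncio.Queue) -> AsyncIterator[str]:
    # Replay a snapshot of the history first.
    for past in list(recent_messages):
        yield past
    # Then stream new messages as they arrive.
    while True:
        yield await live.get()


async def demo() -> list[str]:
    recent_messages.extend(["old-1", "old-2"])
    live: asyncio.Queue = asyncio.Queue()
    live.put_nowait("new-1")
    stream = messages_with_history(live)
    received = [await stream.__anext__() for _ in range(3)]
    await stream.aclose()
    return received


received = asyncio.run(demo())
```

A real implementation also has to handle a message that appears both in the history snapshot and in the live queue, for example by tracking the last delivered message ID.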

What if a subscription yields errors?

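It depends on where the error occurs. If resolving a field of a yielded event fails, the error is reported in the errors list of that event's payload and the stream continues. If an exception escapes the subscription generator itself, that generator cannot yield again, so the subscription ends; the client can inspect the error and decide whether to resubscribe. Catch exceptions inside the generator's loop when a single failure should not end the stream.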
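
One Python-level fact is worth keeping in mind here: once an exception escapes an async generator, that generator is finished and cannot yield again. If a single bad event should not end the stream, one option is to catch the exception inside the loop and move on. A sketch in plain asyncio with hypothetical names (`parse_event`, `resilient_stream`), using a `None` sentinel only so the demo can terminate:

```python
import asyncio
import json
import logging
from typing import AsyncIterator

logger = logging.getLogger("subscriptions")


def parse_event(raw: str) -> dict:
    """Parse one raw event; raises json.JSONDecodeError on bad input."""
    return json.loads(raw)


async def resilient_stream(raw_events: asyncio.Queue) -> AsyncIterator[dict]:
    while True:
        raw = await raw_events.get()
        if raw is None:  # Sentinel used by this sketch to end the stream.
            return
        try:
            event = parse_event(raw)
        except json.JSONDecodeError:
            # Log and skip the bad event instead of ending the subscription.
            logger.warning("Skipping malformed event: %r", raw)
            continue
        yield event


async def demo() -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    for raw in ['{"id": 1}', "not json", '{"id": 2}', None]:
        queue.put_nowait(raw)
    return [event async for event in resilient_stream(queue)]


events = asyncio.run(demo())
```

The `yield` sits outside the `try` block on purpose, so the handler only covers parsing and does not swallow exceptions raised while the generator is suspended.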

Further Reading