
Asyncio Queues: Build Async Producer-Consumer Patterns

An asyncio.Queue is a coroutine-friendly buffer that decouples producer tasks (which add work) from consumer tasks (which process it). It is designed for tasks running on a single event loop and is not thread-safe. Queues are the backbone of many scalable async systems: producers enqueue work quickly, consumers process it at their own pace, and the queue buffers the difference. Because tasks coordinate only through the queue, you can run any number of producers against any number of consumers, usually without extra locks or events.

The Producer-Consumer Pattern

In a producer-consumer system, one or more producers add items to a queue, and one or more consumers retrieve and process them. The queue provides backpressure: if a bounded queue is full, producers wait; if the queue is empty, consumers wait. This decoupling lets each side scale independently.

Classic use case: a web scraper in which producers fetch HTML from URLs and consumers parse it. The two sides run at different speeds, and the queue absorbs bursts so that neither side has to wait on the other for every single item.

import asyncio

async def producer(queue, name, count):
    """Add items to the queue."""
    for i in range(count):
        item = f"{name}-{i}"
        await queue.put(item)  # Waits while the queue is full (backpressure)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)

async def consumer(queue, name):
    """Retrieve and process items from the queue."""
    while True:
        item = await queue.get()  # Waits while the queue is empty
        print(f"{name} processing: {item}")
        await asyncio.sleep(0.3)  # Simulated work
        queue.task_done()  # Signal that this item is fully processed

async def main():
    queue = asyncio.Queue(maxsize=5)  # Buffer up to 5 items

    async with asyncio.TaskGroup() as tg:  # TaskGroup requires Python 3.11+
        # Create 2 producers and 3 consumers
        producers = [
            tg.create_task(producer(queue, "prod1", 10)),
            tg.create_task(producer(queue, "prod2", 10)),
        ]
        consumers = [
            tg.create_task(consumer(queue, f"cons{n}")) for n in range(1, 4)
        ]

        # join() returns as soon as the unfinished count is zero, so let the
        # producers finish enqueueing before waiting on it
        await asyncio.gather(*producers)

        # Wait for all items to be processed
        await queue.join()

        # Consumers loop forever; cancel them so the TaskGroup can exit.
        # (Sending a sentinel value is the alternative.)
        for task in consumers:
            task.cancel()

asyncio.run(main())

Output (abbreviated; the exact interleaving depends on timing):

Produced: prod1-0
Produced: prod2-0
cons1 processing: prod1-0
cons2 processing: prod2-0
...

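Together, the two producers generate roughly 20 items per second while the three consumers process roughly 10, so the queue soon fills to its maxsize of 5. From then on, put() suspends a producer until a consumer frees a slot; this is backpressure. queue.join() waits until every item that was put() has been marked with task_done(). It returns immediately if nothing has been enqueued yet, so call it only after the producers have finished. The consumers never exit on their own and must be cancelled (or sent a sentinel) afterwards.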
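The unfinished-task counter behind join() can be observed in isolation. In this minimal sketch (join_demo and worker are illustrative names), join() returns immediately on a queue that has never received an item, and otherwise waits for one task_done() call per put():

```python
import asyncio

async def join_demo():
    queue = asyncio.Queue()
    # Nothing enqueued yet: the unfinished-task counter is 0, so join() returns at once
    await queue.join()

    for n in range(3):
        queue.put_nowait(n)  # each put increments the unfinished-task counter

    done = []

    async def worker():
        while True:
            item = await queue.get()
            await asyncio.sleep(0.01)  # simulated work
            done.append(item)
            queue.task_done()  # decrement the counter

    task = asyncio.create_task(worker())
    await queue.join()  # returns only after three task_done() calls
    task.cancel()       # the worker would otherwise wait forever on get()
    return done

print(asyncio.run(join_demo()))  # [0, 1, 2]
```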

Queue Types and Sizes

asyncio offers three queue types, distinguished by how items are retrieved:

| Queue type | Retrieval order | Typical use |
| --- | --- | --- |
| asyncio.Queue | FIFO (first in, first out) | Standard producer-consumer |
| asyncio.LifoQueue | LIFO (last in, first out, like a stack) | Task scheduling, backtracking |
| asyncio.PriorityQueue | Lowest-valued entry first | Task prioritization, event processing |

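Each accepts a maxsize parameter; 0 (the default) or any negative value means the queue is unbounded. A PriorityQueue returns entries in sorted order, so entries are usually (priority, data) tuples, and the lowest priority number comes out first: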

import asyncio

async def demo_priority_queue():
    queue = asyncio.PriorityQueue()

    # Items are (priority, data) tuples; lower numbers come out first
    await queue.put((3, "low-priority"))
    await queue.put((1, "high-priority"))
    await queue.put((2, "medium-priority"))

    print(await queue.get())  # (1, 'high-priority')
    print(await queue.get())  # (2, 'medium-priority')
    print(await queue.get())  # (3, 'low-priority')

asyncio.run(demo_priority_queue())

Use PriorityQueue for systems where some work is more urgent than other work.
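One caveat: when two entries have the same priority, the tuples are compared element by element, so the payloads themselves get compared, and payloads that don't support < (dicts, for example) raise TypeError. A common workaround, shown here as a sketch rather than anything asyncio requires, inserts a sequence number from itertools.count() as a tie-breaker, which also keeps equal-priority entries in insertion order (demo_tiebreak is an illustrative name):

```python
import asyncio
import itertools

async def demo_tiebreak():
    queue = asyncio.PriorityQueue()
    counter = itertools.count()  # monotonically increasing tie-breaker

    jobs = [(2, {"task": "resize"}), (1, {"task": "email"}), (2, {"task": "backup"})]
    for priority, payload in jobs:
        # (priority, sequence, payload): equal priorities are ordered by sequence,
        # so the dict payloads are never compared with each other
        await queue.put((priority, next(counter), payload))

    order = []
    while not queue.empty():
        _priority, _seq, payload = await queue.get()
        order.append(payload["task"])
    return order

print(asyncio.run(demo_tiebreak()))  # ['email', 'resize', 'backup']
```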

Handling Queue Overflow and Underflow

When a bounded queue (maxsize > 0) is full, put() waits; when a queue is empty, get() waits. This backpressure is usually what you want, but sometimes you need non-blocking alternatives that fail immediately instead:

import asyncio

async def demo_nonblocking():
    queue = asyncio.Queue(maxsize=2)

    # put_nowait raises QueueFull if the queue is full; put() would wait instead
    queue.put_nowait("item1")
    queue.put_nowait("item2")

    try:
        queue.put_nowait("item3")  # Raises QueueFull
    except asyncio.QueueFull:
        print("Queue is full; item3 rejected")

    # get_nowait raises QueueEmpty if the queue is empty; get() would wait instead
    print(await queue.get())   # item1
    print(queue.get_nowait())  # item2

    try:
        queue.get_nowait()  # Raises QueueEmpty: nothing left
    except asyncio.QueueEmpty:
        print("Queue is empty")

asyncio.run(demo_nonblocking())

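Use put_nowait() and get_nowait() in code that cannot await, such as plain callbacks scheduled with loop.call_soon() or registered with loop.add_signal_handler(). To wait only for a limited time, wrap the awaiting call in a timeout instead, e.g. asyncio.wait_for(queue.get(), timeout=1.0).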
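Bounding how long a consumer waits is often more useful than never waiting. A minimal sketch, assuming a hypothetical helper get_with_timeout() that returns None when nothing arrives in time (so it only suits queues that never carry None as a real item):

```python
import asyncio

async def get_with_timeout(queue, timeout):
    """Hypothetical helper: next item, or None if nothing arrives within `timeout` seconds."""
    try:
        # wait_for cancels the pending get() on timeout, leaving the queue intact
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None

async def demo_timeout():
    queue = asyncio.Queue()
    first = await get_with_timeout(queue, 0.05)   # queue is empty, so this times out
    queue.put_nowait("late item")
    second = await get_with_timeout(queue, 0.05)  # an item is ready immediately
    return first, second

print(asyncio.run(demo_timeout()))  # (None, 'late item')
```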

Practical Example: Web Scraper with Queue-Based Concurrency

Here is a more realistic pattern: fetch URLs concurrently, parse the results, and save them to a database, with a different number of workers per stage (all I/O is simulated with asyncio.sleep()):

import asyncio

async def fetch_url(url):
    """Simulate fetching a URL."""
    await asyncio.sleep(0.5)
    return f"<html>{url}</html>"

async def parse_html(html):
    """Simulate parsing HTML."""
    await asyncio.sleep(0.2)
    return f"parsed: {html[:20]}"

async def save_data(data):
    """Simulate saving to a database."""
    await asyncio.sleep(0.1)
    return f"saved: {data}"

async def fetcher(url_queue, html_queue):
    """Fetch URLs and enqueue the HTML."""
    while True:
        url = await url_queue.get()  # Waits for work instead of polling
        try:
            html = await fetch_url(url)
            await html_queue.put(html)  # Waits if the parsers fall behind
        finally:
            url_queue.task_done()  # Mark done even if fetching fails

async def parser(html_queue, data_queue):
    """Parse HTML and enqueue the data."""
    while True:
        html = await html_queue.get()
        try:
            data = await parse_html(html)
            await data_queue.put(data)  # Waits if the saver falls behind
        finally:
            html_queue.task_done()

async def saver(data_queue):
    """Save data to the database."""
    while True:
        data = await data_queue.get()
        try:
            await save_data(data)
        finally:
            data_queue.task_done()

async def main():
    url_queue = asyncio.Queue()
    html_queue = asyncio.Queue(maxsize=10)
    data_queue = asyncio.Queue(maxsize=5)

    # Populate the URL queue
    urls = [f"http://example.com/{i}" for i in range(20)]
    for url in urls:
        await url_queue.put(url)

    async with asyncio.TaskGroup() as tg:
        # 3 fetchers, 2 parsers, 1 saver
        workers = [tg.create_task(fetcher(url_queue, html_queue)) for _ in range(3)]
        workers += [tg.create_task(parser(html_queue, data_queue)) for _ in range(2)]
        workers.append(tg.create_task(saver(data_queue)))

        # Wait for each stage in order; a worker calls task_done() only after
        # handing its result downstream, so once a stage is finished the next
        # queue already holds all of its work
        await url_queue.join()
        await html_queue.join()
        await data_queue.join()

        # The workers loop forever; cancel them so the TaskGroup can exit
        for task in workers:
            task.cancel()

asyncio.run(main())

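This pipeline runs its stages concurrently: 3 fetchers download pages, 2 parsers process the fetched HTML, and 1 saver writes results to the (simulated) database. The bounded html_queue and data_queue apply backpressure between stages: if the parsers or the saver fall behind, put() suspends the upstream workers instead of letting unprocessed data pile up in memory. Joining the queues in stage order is safe because each worker calls task_done() only after handing its result to the next queue.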
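The three worker functions in the pipeline differ only in the coroutine they call and where they send the result. One way to factor that out, sketched here with a hypothetical stage_worker() helper and a toy double() stage standing in for real I/O:

```python
import asyncio

async def stage_worker(in_queue, out_queue, func):
    """Hypothetical generic stage: apply func to each item and forward the result."""
    while True:
        item = await in_queue.get()
        try:
            result = await func(item)
            if out_queue is not None:  # the last stage has nowhere to forward to
                await out_queue.put(result)
        finally:
            in_queue.task_done()

async def double(x):
    await asyncio.sleep(0.01)  # simulated work
    return x * 2

async def run_pipeline():
    source = asyncio.Queue()
    middle = asyncio.Queue(maxsize=2)  # bounded: backpressure on the doublers
    results = []

    async def collect(x):
        results.append(x)

    for n in range(5):
        source.put_nowait(n)

    workers = [asyncio.create_task(stage_worker(source, middle, double)) for _ in range(3)]
    workers.append(asyncio.create_task(stage_worker(middle, None, collect)))

    await source.join()  # every item has been doubled and forwarded
    await middle.join()  # every doubled item has been collected
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)  # let cancellations finish
    return sorted(results)

print(asyncio.run(run_pipeline()))  # [0, 2, 4, 6, 8]
```

Adding a stage then means adding a queue and a few stage_worker() tasks rather than writing another near-identical loop.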

Sentinel Values for Clean Shutdown

Consumers that loop forever need a way to stop. A sentinel value is a special marker, enqueued after the real work, that tells a consumer "no more work":

import asyncio

async def producer(queue, num_consumers):
    for i in range(5):
        await queue.put(f"item-{i}")
    # Signal end of work: one sentinel per consumer, because each
    # consumer stops after reading a single None
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:  # Sentinel value
            print(f"{name} exiting")
            queue.task_done()
            break
        print(f"{name}: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, num_consumers=2))
        tg.create_task(consumer(queue, "cons1"))
        tg.create_task(consumer(queue, "cons2"))

asyncio.run(main())

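Each consumer exits after reading one sentinel, so the producer must enqueue one sentinel per consumer; with only a single None, the second consumer would wait forever and the TaskGroup would never exit. Because the sentinels are enqueued after all real items, FIFO ordering guarantees that every real item is consumed first. If None could be a legitimate item, use a dedicated marker such as SENTINEL = object() instead.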
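With several producers, no single producer knows when all the work is done. A sketch of one arrangement, assuming a coordinating coroutine (run_with_sentinels, a hypothetical name) that waits for every producer before enqueueing one unique SENTINEL object per consumer:

```python
import asyncio

SENTINEL = object()  # unique marker; cannot collide with real items, even None

async def producer(queue, name, count):
    for i in range(count):
        await queue.put(f"{name}-{i}")

async def consumer(queue, results):
    while True:
        item = await queue.get()
        try:
            if item is SENTINEL:
                return  # the finally clause still calls task_done()
            results.append(item)
        finally:
            queue.task_done()

async def run_with_sentinels():
    queue = asyncio.Queue(maxsize=3)
    results = []
    consumers = [asyncio.create_task(consumer(queue, results)) for _ in range(2)]
    # Wait for every producer first, so no sentinel overtakes real work
    await asyncio.gather(producer(queue, "a", 4), producer(queue, "b", 4))
    for _ in consumers:
        await queue.put(SENTINEL)  # one per consumer
    await asyncio.gather(*consumers)  # both consumers exit on their own
    return sorted(results)

print(asyncio.run(run_with_sentinels()))
```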

Key Takeaways

  • Queues decouple producers from consumers, allowing flexible concurrency ratios; bounded queues add backpressure. asyncio.Queue is not thread-safe, so use it from a single event loop.
  • asyncio.Queue is FIFO; LifoQueue is LIFO; PriorityQueue orders by priority—choose based on task semantics.
  • put() and get() wait (without blocking the event loop) when the queue is full/empty; put_nowait() and get_nowait() raise QueueFull/QueueEmpty instead.
  • queue.join() waits until all items have been processed (via task_done()); essential for coordination.
  • Use sentinel values (e.g., None), one per consumer, to signal shutdown and guarantee clean termination; otherwise cancel the consumers once queue.join() returns.

Frequently Asked Questions

How do I know when a queue is empty or full?

Check queue.empty() or queue.full(), but note these are snapshots—conditions may change immediately. For safe non-blocking operations, use get_nowait() and put_nowait() with exception handling.

Can multiple producers and consumers share one queue?

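Yes. Queues are designed for this. Each item is delivered to exactly one consumer (a queue is not a broadcast channel), and task_done() decrements a single unfinished-task counter shared by all of them. All of these tasks must run on the same event loop, because asyncio.Queue is not thread-safe.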
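Because asyncio.Queue is not thread-safe, a producer running in another thread should not call its methods directly. One common approach, sketched here under the assumption that the producer is ordinary blocking code (blocking_source and collect_from_thread are illustrative names), is to hand each put_nowait() call to the event loop with loop.call_soon_threadsafe():

```python
import asyncio

def blocking_source(loop, queue, count):
    """Runs in a worker thread; never touches the queue directly."""
    for i in range(count):
        # Schedule put_nowait() to run on the event loop's own thread
        loop.call_soon_threadsafe(queue.put_nowait, i)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: no more items

async def collect_from_thread():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()  # unbounded, so put_nowait() never raises QueueFull
    # asyncio.to_thread() runs the blocking function in the default executor
    producer = asyncio.create_task(asyncio.to_thread(blocking_source, loop, queue, 5))
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    await producer
    return received

print(asyncio.run(collect_from_thread()))  # [0, 1, 2, 3, 4]
```

The queue is unbounded here because a put_nowait() on a full queue would raise QueueFull inside the loop's callback, where the producing thread never sees it.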

What if a consumer crashes before calling task_done()?

queue.join() will wait forever, because the count of unfinished items never reaches zero. Wrap the processing code in try/finally so that task_done() is called even when processing fails.
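A minimal sketch of the try/finally pattern, where a simulated ValueError stands in for a real processing failure (consumer and run_with_failures are illustrative names):

```python
import asyncio

async def consumer(queue, processed, failed):
    while True:
        item = await queue.get()
        try:
            if item < 0:
                raise ValueError(f"bad item: {item}")  # simulated failure
            processed.append(item)
        except ValueError:
            failed.append(item)  # handle or log the error; the worker keeps running
        finally:
            queue.task_done()  # runs on success and on failure

async def run_with_failures():
    queue = asyncio.Queue()
    processed, failed = [], []
    for item in [1, -2, 3]:
        queue.put_nowait(item)
    worker = asyncio.create_task(consumer(queue, processed, failed))
    # Without the finally clause, the failed item would never be marked done
    # and this join() would time out
    await asyncio.wait_for(queue.join(), timeout=1)
    worker.cancel()
    return processed, failed

print(asyncio.run(run_with_failures()))  # ([1, 3], [-2])
```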

How large should maxsize be?

It depends on memory and desired latency. Small (e.g., 5-10) keeps memory low and catches bottlenecks early. Large (e.g., 1000) buffers bursts but may hide slow consumers. Experiment with your workload.
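A quick way to see what maxsize buys you is to measure how far a fast producer gets ahead of a slow consumer. In this sketch (measure_peak and the two coroutines are illustrative names), the peak queue length equals maxsize for a bounded queue and grows to the full item count for an unbounded one:

```python
import asyncio

async def fast_producer(queue, count):
    peak = 0
    for i in range(count):
        await queue.put(i)  # suspends only while a bounded queue is full
        peak = max(peak, queue.qsize())
    await queue.put(None)  # sentinel for the single consumer
    return peak

async def slow_consumer(queue):
    while (await queue.get()) is not None:
        await asyncio.sleep(0.001)  # simulated slow processing

async def measure_peak(maxsize, count=50):
    queue = asyncio.Queue(maxsize=maxsize)
    peak, _ = await asyncio.gather(fast_producer(queue, count), slow_consumer(queue))
    return peak

print(asyncio.run(measure_peak(5)), asyncio.run(measure_peak(0)))  # 5 50
```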

Can I access queue contents without removing them?

No. Queues are designed for FIFO/LIFO/priority ordering, not random access. If you need to inspect items, use a different data structure or dequeue and re-enqueue (inefficient).

Further Reading