
Asyncio Semaphores: Control Concurrency with Rate Limits

An asyncio.Semaphore is a synchronization primitive that limits the number of tasks that can acquire it simultaneously. Semaphores are essential for rate limiting: preventing your code from overwhelming a server, database, or API with unbounded concurrent requests. A semaphore with value N allows at most N tasks to proceed; additional tasks wait until one releases the semaphore.

How Semaphores Work

A semaphore maintains an internal counter. When a task acquires the semaphore (via async with semaphore: or await semaphore.acquire()), the counter decrements. When it releases, the counter increments. If the counter is already zero, acquire() suspends the calling task and queues it until another task releases. This is the foundation of bounded concurrency.

Think of a semaphore as a parking garage with N spots. Each task is a car. When spots are available, cars enter immediately. When full, cars wait outside until one leaves.

import asyncio

async def fetch_data(semaphore, name, delay):
    """Acquire the semaphore, do work, then release it."""
    async with semaphore:
        print(f"{name} acquired semaphore")
        await asyncio.sleep(delay)  # Simulated work
        print(f"{name} releasing semaphore")
    return f"{name} done"

async def main():
    # Limit to 2 concurrent tasks
    semaphore = asyncio.Semaphore(2)

    # asyncio.TaskGroup requires Python 3.11+
    async with asyncio.TaskGroup() as tg:
        for i in range(5):
            tg.create_task(fetch_data(semaphore, f"task{i}", 1.0))

asyncio.run(main())

Typical output (the interleaving of lines within a wave can vary slightly):

task0 acquired semaphore
task1 acquired semaphore
task0 releasing semaphore (after ~1s)
task1 releasing semaphore
task2 acquired semaphore
task3 acquired semaphore
task2 releasing semaphore (after ~2s)
task3 releasing semaphore
task4 acquired semaphore
task4 releasing semaphore (after ~3s)

The first two tasks acquire immediately. task2, task3, and task4 queue up, acquiring as earlier ones finish. Total execution time is roughly 3 seconds (three 1-second waves of two, two, and one task, plus overhead), not 5 seconds (linear).
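To check the bound rather than read it off the log, the following sketch counts how many workers hold the semaphore at once and times the whole run. The names worker, measure, and stats are illustrative assumptions, not part of the original example, and the delay is shortened to 0.2 seconds so the run finishes quickly.

```python
import asyncio
import time

async def worker(semaphore, stats, delay):
    """Do simulated work while counting how many workers hold the semaphore."""
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        try:
            await asyncio.sleep(delay)  # Simulated work
        finally:
            stats["active"] -= 1

async def measure(limit, n_tasks, delay):
    """Run n_tasks workers under one semaphore; return (peak, elapsed seconds)."""
    semaphore = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    start = time.perf_counter()
    # gather() is used here so the sketch also runs before Python 3.11
    await asyncio.gather(*(worker(semaphore, stats, delay) for _ in range(n_tasks)))
    return stats["peak"], time.perf_counter() - start

peak, elapsed = asyncio.run(measure(limit=2, n_tasks=5, delay=0.2))
print(f"peak concurrency: {peak}, elapsed: {elapsed:.2f}s")
```

With a limit of 2 and five 0.2-second tasks, the peak should be 2 and the elapsed time close to 0.6 seconds, which is the same three-wave pattern as above at a smaller scale.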

Practical Example: Rate-Limited API Calls

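Semaphores are a good fit for capping how many API requests are in flight at once. Many APIs allow only a fixed number of concurrent connections; a semaphore enforces that kind of limit directly, whereas a requests-per-second quota needs a time-based limiter such as a token bucket. Here's a realistic pattern: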

import asyncio
import aiohttp
import time

async def fetch_url(session, semaphore, url):
    """Fetch a URL while holding a semaphore slot; return None on timeout."""
    async with semaphore:
        try:
            # aiohttp expects a ClientTimeout object rather than a bare number
            timeout = aiohttp.ClientTimeout(total=5)
            async with session.get(url, timeout=timeout) as resp:
                return await resp.text()
        except asyncio.TimeoutError:
            print(f"Timeout fetching {url}")
            return None

async def batch_fetch(urls, max_concurrent=3):
    """Fetch multiple URLs with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        # Any other exception (e.g. aiohttp.ClientError) cancels the whole group
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_url(session, semaphore, url))
                for url in urls
            ]

    return [t.result() for t in tasks]

# Usage: fetch 10 URLs, at most 3 concurrently
urls = ["https://httpbin.org/delay/1" for _ in range(10)]
start = time.perf_counter()
results = asyncio.run(batch_fetch(urls, max_concurrent=3))
elapsed = time.perf_counter() - start
print(f"Fetched {len(results)} URLs in {elapsed:.1f}s (max 3 concurrent)")

Expected runtime: roughly 4 seconds or a little more (10 requests at 3 concurrent is ceil(10 / 3) = 4 batches of about 1 second each, plus network latency). Without the semaphore, all 10 would run concurrently, potentially overwhelming the server.
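The batch arithmetic can be checked without touching the network. The sketch below is an assumption-laden stand-in: fake_request replaces the HTTP call with asyncio.sleep, and estimated_runtime and timed_batch are hypothetical helper names. It compares the estimate (number of batches times per-request delay) with a measured run.

```python
import asyncio
import math
import time

def estimated_runtime(n_requests, max_concurrent, delay):
    """Number of batches (rounded up) times the per-request delay."""
    return math.ceil(n_requests / max_concurrent) * delay

async def fake_request(semaphore, delay):
    """Stand-in for an HTTP call: hold a slot for `delay` seconds."""
    async with semaphore:
        await asyncio.sleep(delay)

async def timed_batch(n_requests, max_concurrent, delay):
    """Run the fake requests under a semaphore and return elapsed seconds."""
    semaphore = asyncio.Semaphore(max_concurrent)
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(semaphore, delay) for _ in range(n_requests)))
    return time.perf_counter() - start

estimate = estimated_runtime(10, 3, 0.1)
actual = asyncio.run(timed_batch(10, 3, 0.1))
print(f"estimated {estimate:.2f}s, measured {actual:.2f}s")
```

The estimate only holds when every request takes about the same time. With uneven response times a freed slot is reused immediately, so real runs do not proceed in strict batches.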

BoundedSemaphore: Preventing Over-Release

asyncio.BoundedSemaphore is a variant that never exceeds its initial value. If a task calls release() too many times, it raises ValueError. This prevents accidental semaphore corruption:

import asyncio

async def demo_bounded():
    # Semaphore with value 2
    sem = asyncio.BoundedSemaphore(2)

    await sem.acquire()
    await sem.acquire()
    # _value is a private attribute, read here for illustration only
    print(f"Semaphore value: {sem._value}")  # Internal counter

    sem.release()
    print(f"After one release: {sem._value}")

    sem.release()
    print(f"After second release: {sem._value}")

    # This raises ValueError because value would exceed 2
    try:
        sem.release()
    except ValueError as e:
        print(f"Error: {e}")

asyncio.run(demo_bounded())

Output:

Semaphore value: 0
After one release: 1
After second release: 2
Error: BoundedSemaphore released too many times

Use BoundedSemaphore in production code to catch bugs where a task releases more often than it acquires.
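For contrast, here is a small sketch of what the same bug does to a plain asyncio.Semaphore: the extra release() raises no error and the limit silently grows. The function name over_release_demo is hypothetical.

```python
import asyncio

async def over_release_demo():
    """Show that a plain Semaphore accepts an unmatched release()."""
    plain = asyncio.Semaphore(1)
    plain.release()  # Bug: release without a matching acquire; no error raised

    # The limit has silently grown from 1 to 2: both acquires finish at once.
    # wait_for() would raise TimeoutError if either acquire had to wait.
    await asyncio.wait_for(plain.acquire(), timeout=0.1)
    await asyncio.wait_for(plain.acquire(), timeout=0.1)
    two_slots_taken = plain.locked()  # No slot is left after two acquires

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()  # Same bug, but detected
        bounded_raised = False
    except ValueError:
        bounded_raised = True

    return two_slots_taken, bounded_raised

print(asyncio.run(over_release_demo()))
```

Both values in the returned tuple should be True: the plain semaphore handed out two slots although it was created with one, while the bounded variant rejected the stray release.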

Semaphore vs. Other Rate-Limiting Approaches

| Approach | Best For | Pros | Cons |
|---|---|---|---|
| Semaphore | Limiting concurrent operations | Simple, fair FIFO queueing | Per-instance only |
| Token bucket | Rate limiting per time window | Flexible, allows burst traffic | More complex implementation |
| RateLimiter library (like limits) | Complex policies (sliding window, jitter) | Standards-based algorithms | Overhead for simple cases |
| Connection pool | Limiting database/HTTP connections | Built into libraries | Not general-purpose |

For most scenarios, a semaphore is sufficient and easier to understand.
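When the limit is expressed per unit of time rather than as a number of concurrent operations, the token-bucket row of the table applies. The following is a minimal sketch only, not a production limiter: the TokenBucket class, its rate and capacity parameters, and the helper names are all assumptions made for illustration.

```python
import asyncio
import time

class TokenBucket:
    """Hypothetical minimal token bucket: `rate` tokens/second, burst of `capacity`."""

    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()  # Serializes waiters

    async def acquire(self):
        """Wait until one token is available, then consume it."""
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill according to elapsed time, capped at capacity
                refill = (now - self.updated) * self.rate
                self.tokens = min(self.capacity, self.tokens + refill)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Sleep roughly until the next whole token has accumulated
                await asyncio.sleep((1 - self.tokens) / self.rate)

async def limited_call(bucket, start, stamps):
    """Stand-in for a rate-limited operation: record when it was allowed to run."""
    await bucket.acquire()
    stamps.append(time.monotonic() - start)

async def demo(n_calls=6):
    bucket = TokenBucket(rate=20, capacity=2)  # Created inside the running loop
    start = time.monotonic()
    stamps = []
    await asyncio.gather(*(limited_call(bucket, start, stamps) for _ in range(n_calls)))
    return stamps

print([f"{t:.2f}" for t in asyncio.run(demo())])
```

Unlike a semaphore, the bucket does not care how long each operation runs. It lets a burst of up to capacity calls through at once and then admits further calls at the configured rate, so the later timestamps are spaced about 1 / rate seconds apart.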

Combining Semaphores with Timeouts

To prevent tasks from waiting indefinitely, put a timeout on the acquire, using asyncio.timeout() (Python 3.11+) or asyncio.wait_for():

import asyncio

async def work_with_timeout(semaphore, name, delay):
    """Wait at most 3 seconds for the semaphore, then do the work."""
    try:
        # The timeout covers only the acquire, not the work that follows
        async with asyncio.timeout(3):  # Python 3.11+
            await semaphore.acquire()
    except asyncio.TimeoutError:
        print(f"{name} timed out waiting for semaphore")
        return
    try:
        print(f"{name} got semaphore after waiting")
        await asyncio.sleep(delay)
    finally:
        semaphore.release()  # Always give the slot back

async def main():
    semaphore = asyncio.Semaphore(1)

    async with asyncio.TaskGroup() as tg:
        # First task holds the semaphore for 2s
        tg.create_task(work_with_timeout(semaphore, "task1", 2.0))
        await asyncio.sleep(0.1)  # Ensure task1 starts first
        # Second task waits ~1.9s, within its 3s timeout
        tg.create_task(work_with_timeout(semaphore, "task2", 0.5))
        # Third task waits ~2.4s, also within its 3s timeout
        tg.create_task(work_with_timeout(semaphore, "task3", 0.5))

asyncio.run(main())

A timeout around the acquire keeps a task from waiting indefinitely when the tasks holding the semaphore are slow to release it.
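On Python versions without asyncio.timeout(), the same idea can be written with asyncio.wait_for(). The sketch below assumes a hypothetical helper named try_acquire that reports whether a slot was obtained; the caller remains responsible for releasing it.

```python
import asyncio

async def try_acquire(semaphore, timeout):
    """Hypothetical helper: True if a slot was acquired within `timeout` seconds."""
    try:
        await asyncio.wait_for(semaphore.acquire(), timeout)
        return True
    except asyncio.TimeoutError:
        return False

async def demo():
    semaphore = asyncio.Semaphore(1)
    await semaphore.acquire()  # Take the only slot

    first = await try_acquire(semaphore, 0.05)  # Slot is busy, so this times out

    semaphore.release()  # Free the slot
    second = await try_acquire(semaphore, 0.05)  # Now succeeds immediately
    if second:
        semaphore.release()  # The caller must release what it acquired

    return first, second

print(asyncio.run(demo()))
```

Returning a boolean instead of raising lets the caller decide what a missed slot means, for example skipping the work, retrying later, or reporting an overload.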

Key Takeaways

  • Semaphores limit the number of concurrent tasks by maintaining a counter; acquiring decrements, releasing increments.
  • Use semaphores to control API request rates, limit database connections, or throttle any bounded resource.
  • asyncio.Semaphore(N) allows up to N concurrent acquisitions; BoundedSemaphore prevents over-release.
  • Combine semaphores with asyncio.timeout() or asyncio.wait_for() to prevent indefinite waits.
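  • In CPython's implementation, waiting tasks are woken in FIFO order, so a waiter is not starved by later arrivals; the asyncio documentation does not spell this out as a guarantee for Semaphore.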

Frequently Asked Questions

How do I set a per-task timeout for acquiring a semaphore?

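Wrap the acquire call in a timeout: async with asyncio.timeout(N): await semaphore.acquire(), followed by a try/finally that releases the semaphore. If acquisition takes longer than N seconds, asyncio.TimeoutError is raised. Placing the timeout around the whole async with semaphore: block also works, but then the N seconds cover the work as well as the wait.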

Can I change a semaphore's value after creation?

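Not through a public API: asyncio.Semaphore has no method for resizing. With a plain Semaphore, an extra release() without a matching acquire raises the limit by one, and acquiring a slot without ever releasing it lowers the limit by one; BoundedSemaphore rejects the extra release() by design. Otherwise, create a new semaphore and migrate tasks gradually (advanced pattern).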

What's the difference between semaphore value and max_concurrent in libraries?

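In asyncio, you set the initial value directly: Semaphore(5) allows 5 concurrent holders. In other libraries, a parameter such as max_concurrent=5 usually expresses the same kind of limit under a different name; check the library's documentation for the exact semantics.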

Do semaphores guarantee fairness (FIFO)?

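In practice, yes. CPython's implementation keeps waiters in a queue and wakes the longest-waiting task first when the semaphore is released, which prevents starvation. The asyncio documentation states this fairness explicitly for Lock but not for Semaphore, so treat it as implementation behavior rather than a documented guarantee.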
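The ordering can be observed with a small experiment. This sketch, with hypothetical names waiter and acquisition_order, starts tasks one at a time against a single-slot semaphore and records the order in which they get in. It reflects the behavior of the CPython version it runs on, not a documented contract.

```python
import asyncio

async def waiter(semaphore, index, order):
    """Record the moment this task gets the semaphore, then hold it briefly."""
    async with semaphore:
        order.append(index)
        await asyncio.sleep(0.01)

async def acquisition_order(n_tasks):
    semaphore = asyncio.Semaphore(1)
    order = []
    tasks = []
    for i in range(n_tasks):
        tasks.append(asyncio.create_task(waiter(semaphore, i, order)))
        # Yield once so task i reaches the semaphore before the next is created
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)
    return order

print(asyncio.run(acquisition_order(5)))
```

On current CPython releases the recorded order matches the order in which the tasks started waiting.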

Can I use a semaphore with multiple event loops?

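No. asyncio semaphores are not thread-safe and belong to a single event loop. For multi-loop scenarios, use thread-safe primitives like threading.Semaphore or multiprocessing.Semaphore. Acquiring those directly blocks the calling thread, so inside a coroutine offload the call, for example with asyncio.to_thread(), instead of calling it inline.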

Further Reading