Asyncio Semaphores: Control Concurrency with Rate Limits
An asyncio.Semaphore is a synchronization primitive that limits how many tasks can hold it at the same time. Semaphores are a standard tool for throttling: they keep your code from overwhelming a server, database, or API with unbounded concurrent requests. A semaphore with value N lets at most N tasks proceed; additional tasks wait until a holder releases it.
How Semaphores Work
A semaphore maintains an internal counter. When a task acquires the semaphore (via async with semaphore: or await semaphore.acquire()), the counter decrements. When the task releases it, the counter increments. While the counter is zero, any task that tries to acquire is queued until another task releases. This is the foundation of bounded concurrency.
Think of a semaphore as a parking garage with N spots. Each task is a car. When spots are available, cars enter immediately. When full, cars wait outside until one leaves.
import asyncio

async def fetch_data(semaphore, name, delay):
    """Acquire the semaphore, do work, then release it."""
    async with semaphore:
        print(f"{name} acquired semaphore")
        await asyncio.sleep(delay)  # Simulated work
        print(f"{name} releasing semaphore")
    return f"{name} done"

async def main():
    # Limit to 2 concurrent tasks
    semaphore = asyncio.Semaphore(2)
    # asyncio.TaskGroup requires Python 3.11+
    async with asyncio.TaskGroup() as tg:
        for i in range(5):
            tg.create_task(fetch_data(semaphore, f"task{i}", 1.0))

asyncio.run(main())
Output (typical; the order within a wave can vary slightly):
task0 acquired semaphore
task1 acquired semaphore
task0 releasing semaphore (after ~1s)
task1 releasing semaphore
task2 acquired semaphore
task3 acquired semaphore
task2 releasing semaphore (after ~2s)
task3 releasing semaphore
task4 acquired semaphore
task4 releasing semaphore (after ~3s)
The first two tasks acquire immediately. task2, task3, and task4 queue up and acquire as earlier tasks release. Total execution time is roughly 3 seconds (three 1-second waves of 2, 2, and 1 tasks, plus overhead), not the 5 seconds that running the tasks one after another would take.
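One way to check that the limit is respected is to count how many tasks are inside the semaphore at once. The sketch below is illustrative only: worker and measure_peak are hypothetical names, the delay is shortened to keep the run fast, and asyncio.gather() is used so the snippet also runs on Python versions older than 3.11.

```python
import asyncio


async def worker(semaphore, state, delay):
    """Hold the semaphore for `delay` seconds and record peak concurrency."""
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(delay)  # Simulated work
        state["active"] -= 1


async def measure_peak(n_tasks, limit, delay=0.01):
    """Run n_tasks workers behind Semaphore(limit); return the peak observed."""
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(
        *(worker(semaphore, state, delay) for _ in range(n_tasks))
    )
    return state["peak"]


peak = asyncio.run(measure_peak(n_tasks=5, limit=2))
print(f"peak concurrency: {peak}")  # prints "peak concurrency: 2"
```

With five tasks and a limit of 2, the counter never rises above 2, no matter how the tasks interleave.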
Practical Example: Rate-Limited API Calls
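Semaphores are a good fit for capping the number of in-flight API requests. Many APIs limit concurrent connections; a limit expressed in requests per second needs a time-based limiter instead, because a semaphore bounds concurrency rather than rate. Here's a realistic pattern: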
import asyncio
import time

import aiohttp

async def fetch_url(session, semaphore, url):
    """Fetch a URL while holding one slot of the semaphore."""
    async with semaphore:
        try:
            # ClientTimeout is the documented way to set a timeout in aiohttp
            timeout = aiohttp.ClientTimeout(total=5)
            async with session.get(url, timeout=timeout) as resp:
                return await resp.text()
        except asyncio.TimeoutError:
            print(f"Timeout fetching {url}")
            return None

async def batch_fetch(urls, max_concurrent=3):
    """Fetch multiple URLs with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_url(session, semaphore, url))
                for url in urls
            ]
        # All tasks have finished once the TaskGroup block exits
        return [t.result() for t in tasks]

# Usage: fetch 10 URLs, at most 3 concurrently
urls = ["https://httpbin.org/delay/1" for _ in range(10)]
start = time.perf_counter()
results = asyncio.run(batch_fetch(urls, max_concurrent=3))
elapsed = time.perf_counter() - start
print(f"Fetched {len(results)} URLs in {elapsed:.1f}s (max 3 concurrent)")
Expected runtime: roughly 4 seconds plus network latency (10 requests at 3 concurrent means ceil(10 / 3) = 4 waves of about 1 second each). Without the semaphore, all 10 requests would run concurrently, potentially overwhelming the server.
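The runtime estimate can be checked without touching the network. In the sketch below, simulated_fetch, timed_batch, and estimated_seconds are hypothetical names, and asyncio.sleep() stands in for the HTTP call with the 1-second response scaled down to 0.05 seconds.

```python
import asyncio
import math
import time


async def simulated_fetch(semaphore, delay):
    """Stand-in for an HTTP request: hold one slot for `delay` seconds."""
    async with semaphore:
        await asyncio.sleep(delay)


async def timed_batch(n_requests, max_concurrent, delay):
    """Return the wall-clock time needed to run the whole batch."""
    semaphore = asyncio.Semaphore(max_concurrent)
    start = time.perf_counter()
    await asyncio.gather(
        *(simulated_fetch(semaphore, delay) for _ in range(n_requests))
    )
    return time.perf_counter() - start


def estimated_seconds(n_requests, max_concurrent, delay):
    """Waves of at most max_concurrent requests, each lasting `delay`."""
    return math.ceil(n_requests / max_concurrent) * delay


estimate = estimated_seconds(10, 3, 0.05)
measured = asyncio.run(timed_batch(10, 3, 0.05))
print(f"estimated {estimate:.2f}s, measured {measured:.2f}s")
```

The measured value should land slightly above the estimate, since scheduling overhead is added on top of the four waves.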
BoundedSemaphore: Preventing Over-Release
asyncio.BoundedSemaphore is a variant that never exceeds its initial value. If a task calls release() too many times, it raises ValueError. This prevents accidental semaphore corruption:
import asyncio

async def demo_bounded():
    # Semaphore with value 2
    sem = asyncio.BoundedSemaphore(2)
    await sem.acquire()
    await sem.acquire()
    # _value is a private attribute, read here only to illustrate the counter
    print(f"Semaphore value: {sem._value}")
    sem.release()
    print(f"After one release: {sem._value}")
    sem.release()
    print(f"After second release: {sem._value}")
    # This raises ValueError because value would exceed 2
    try:
        sem.release()
    except ValueError as e:
        print(f"Error: {e}")

asyncio.run(demo_bounded())
Output:
Semaphore value: 0
After one release: 1
After second release: 2
Error: BoundedSemaphore released too many times
Use BoundedSemaphore in production code to catch bugs where a task releases more than once.
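To see why this matters, it helps to compare what a stray release() does to each type. The following is a minimal sketch with a hypothetical compare_over_release helper; it uses only the public acquire() and release() methods.

```python
import asyncio


async def compare_over_release():
    """Show what one stray release() does to each semaphore type."""
    plain = asyncio.Semaphore(1)
    plain.release()  # Simulated bug: release without a matching acquire

    # The plain semaphore now hands out two permits instead of one.
    await plain.acquire()
    try:
        await asyncio.wait_for(plain.acquire(), timeout=0.1)
        extra_permit_granted = True
    except asyncio.TimeoutError:
        extra_permit_granted = False

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()  # Same bug, but this time it is detected
        bug_detected = False
    except ValueError:
        bug_detected = True

    return extra_permit_granted, bug_detected


extra_permit_granted, bug_detected = asyncio.run(compare_over_release())
print(f"plain semaphore granted an extra permit: {extra_permit_granted}")
print(f"bounded semaphore raised ValueError: {bug_detected}")
```

Both lines print True: the plain semaphore silently raises its own limit, while the bounded one surfaces the bug at the point of the extra release.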
Semaphore vs. Other Rate-Limiting Approaches
| Approach | Best For | Pros | Cons |
|---|---|---|---|
| Semaphore | Limiting concurrent operations | Simple, fair FIFO queueing | Per-instance only |
| Token bucket | Rate limiting per time window | Flexible, allows burst traffic | More complex implementation |
| RateLimiter library (like limits) | Complex policies (sliding window, jitter) | Standards-based algorithms | Overhead for simple cases |
| Connection pool | Limiting database/HTTP connections | Built into libraries | Not general-purpose |
For most scenarios, a semaphore is sufficient and easier to understand.
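When the limit is expressed per time window rather than as a concurrency cap, the token bucket row of the table applies. The sketch below shows the idea in its simplest form. TokenBucket and timed_calls are hypothetical names written for this illustration, not part of asyncio or any library, and the design (one lock, waiters served one at a time) is an assumption chosen for brevity.

```python
import asyncio
import time


class TokenBucket:
    """Illustrative token bucket: `rate` tokens per second, up to `capacity`."""

    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)  # Start full, so a burst is allowed
        self._updated = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self):
        """Add the tokens earned since the last update, capped at capacity."""
        now = time.monotonic()
        earned = (now - self._updated) * self.rate
        self._tokens = min(self.capacity, self._tokens + earned)
        self._updated = now

    async def acquire(self):
        """Wait until one token is available, then consume it."""
        async with self._lock:
            self._refill()
            while self._tokens < 1:
                # Sleep just long enough for the missing fraction to refill
                await asyncio.sleep((1 - self._tokens) / self.rate)
                self._refill()
            self._tokens -= 1


async def timed_calls(n_calls, rate, capacity):
    """Return how long n_calls take when throttled by the bucket."""
    bucket = TokenBucket(rate, capacity)  # Created inside the running loop
    start = time.monotonic()
    for _ in range(n_calls):
        await bucket.acquire()
    return time.monotonic() - start


elapsed = asyncio.run(timed_calls(n_calls=6, rate=20, capacity=2))
print(f"6 calls at 20/s with a burst of 2 took {elapsed:.2f}s")
```

The first two calls pass immediately thanks to the burst capacity; the remaining four are spaced by the refill rate, so the run takes about 0.2 seconds. Unlike a semaphore, this places no limit on how many calls are in flight at once.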
Combining Semaphores with Timeouts
To prevent tasks from waiting indefinitely, combine semaphores with asyncio.timeout() (Python 3.11+) or, on older versions, asyncio.wait_for():
import asyncio

async def work_with_timeout(semaphore, name, delay):
    """Acquire the semaphore and do the work within a 3-second budget."""
    try:
        # The 3 seconds cover waiting for the semaphore plus the work itself
        async with asyncio.timeout(3):  # Python 3.11+
            async with semaphore:
                print(f"{name} got semaphore after waiting")
                await asyncio.sleep(delay)
    except asyncio.TimeoutError:
        print(f"{name} timed out")

async def main():
    semaphore = asyncio.Semaphore(1)
    async with asyncio.TaskGroup() as tg:
        # First task holds the semaphore for 2s
        tg.create_task(work_with_timeout(semaphore, "task1", 2.0))
        await asyncio.sleep(0.1)  # Ensure task1 starts first
        # Second task waits ~1.9s, then works 0.5s: inside its 3s budget
        tg.create_task(work_with_timeout(semaphore, "task2", 0.5))
        # Third task waits ~2.4s, then works 0.5s: just inside its 3s budget
        tg.create_task(work_with_timeout(semaphore, "task3", 0.5))

asyncio.run(main())
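Because asyncio.timeout() wraps the whole async with semaphore: block here, the 3-second budget covers the wait for the semaphore plus the work done while holding it, so a task stuck behind a starved semaphore cannot wait indefinitely.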
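If only the wait for the semaphore should be time-limited, and not the work done while holding it, the timeout can wrap acquire() alone, with release() moved into a finally block. The following is a sketch under that assumption; acquire_or_give_up and its parameters are hypothetical names, and asyncio.wait_for() is used so the snippet also runs before Python 3.11.

```python
import asyncio


async def acquire_or_give_up(semaphore, name, wait_limit, work_time, log):
    """Time only the acquire() call; the work itself is not time-limited."""
    try:
        await asyncio.wait_for(semaphore.acquire(), timeout=wait_limit)
    except asyncio.TimeoutError:
        log.append(f"{name} gave up")
        return
    try:
        await asyncio.sleep(work_time)  # Simulated work
        log.append(f"{name} finished")
    finally:
        semaphore.release()  # Always hand the permit back


async def main():
    semaphore = asyncio.Semaphore(1)
    log = []
    await asyncio.gather(
        # "slow" gets the permit first and holds it longer than wait_limit
        acquire_or_give_up(semaphore, "slow", 0.05, 0.3, log),
        # "impatient" waits at most 0.05s for the permit, then gives up
        acquire_or_give_up(semaphore, "impatient", 0.05, 0.01, log),
    )
    return log


log = asyncio.run(main())
print(log)  # prints ['impatient gave up', 'slow finished']
```

Note that "slow" is not interrupted even though its work takes far longer than wait_limit, which is the difference from wrapping the whole block.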
Key Takeaways
- Semaphores limit the number of concurrent tasks by maintaining a counter; acquiring decrements, releasing increments.
- Use semaphores to control API request rates, limit database connections, or throttle any bounded resource.
- asyncio.Semaphore(N) allows up to N concurrent acquisitions; BoundedSemaphore prevents over-release.
- Combine semaphores with asyncio.timeout() or asyncio.wait_for() to prevent indefinite waits.
- Semaphores are FIFO fair: waiting tasks acquire in the order they waited (no starvation).
Frequently Asked Questions
How do I set a per-task timeout for acquiring a semaphore?
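Wrap the acquisition in async with asyncio.timeout(N): (Python 3.11+) or use await asyncio.wait_for(semaphore.acquire(), N). If the semaphore is not acquired within N seconds, asyncio.TimeoutError is raised (an alias of the built-in TimeoutError since Python 3.11). Note that a timeout placed around the whole async with semaphore: block also limits the work done while holding the semaphore.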
Can I change a semaphore's value after creation?
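Not through a public API: asyncio.Semaphore has no method for resizing. Because a plain Semaphore is unbounded, an extra release() raises the limit by one and an acquire() that is never released lowers it, but that bookkeeping is easy to get wrong. A simpler option is to create a new semaphore with the desired value and have newly started tasks use it (advanced pattern).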
What's the difference between semaphore value and max_concurrent in libraries?
In asyncio, the value passed to the constructor is the limit: Semaphore(5) allows 5 concurrent holders. A max_concurrent=5 parameter in a higher-level library usually expresses the same kind of limit.
Do semaphores guarantee fairness (FIFO)?
In practice, yes. CPython's implementation keeps waiting tasks in a queue and wakes them in the order they started waiting, so the longest-waiting task is next to acquire when the semaphore is released. This prevents starvation.
Can I use a semaphore with multiple event loops?
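No. asyncio semaphores are not thread-safe and must be used within a single event loop. To share a limit between loops running in different threads, use threading.Semaphore; across processes, use multiprocessing.Semaphore. Their acquire() calls block, so run them off the event loop thread, for example with asyncio.to_thread().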
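As a rough illustration of the multi-loop case, the sketch below runs two event loops in two threads and shares one threading.Semaphore between them. The run_loop_in_thread helper and the state dictionary are hypothetical, and the blocking acquire() is pushed to a worker thread with asyncio.to_thread() (Python 3.9+) so that neither loop stalls.

```python
import asyncio
import threading


def run_loop_in_thread(shared_sem, state, state_lock, n_jobs):
    """Start a separate event loop in the calling thread and run jobs on it."""

    async def job():
        # acquire() blocks, so run it in a worker thread instead of the loop
        await asyncio.to_thread(shared_sem.acquire)
        try:
            with state_lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            await asyncio.sleep(0.02)  # Simulated work
            with state_lock:
                state["active"] -= 1
        finally:
            shared_sem.release()

    async def main():
        await asyncio.gather(*(job() for _ in range(n_jobs)))

    asyncio.run(main())


shared_sem = threading.Semaphore(2)  # One limit shared by both loops
state = {"active": 0, "peak": 0}
state_lock = threading.Lock()  # Protects `state` across threads
threads = [
    threading.Thread(
        target=run_loop_in_thread,
        args=(shared_sem, state, state_lock, 4),
    )
    for _ in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"peak concurrency across both loops: {state['peak']}")
```

Eight jobs run across the two loops, yet the shared semaphore keeps the combined peak at no more than 2.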