Asyncio Locks and Events: Synchronization Techniques
Locks and events are synchronization primitives that coordinate access to shared state and signal events across async tasks. A lock ensures only one task accesses protected code at a time; an event allows tasks to wait for a flag to be set by another task. Together with semaphores (covered earlier), they form asyncio's toolkit for safe concurrency.
Understanding Locks: Preventing Data Races
A lock is a binary gate: only one task can hold it at a time. A task that tries to acquire a held lock is suspended until the holder releases it. This serializes access to shared state across await points, which prevents lost updates in the protected section.
asyncio provides a single lock type, Lock (mutual exclusion). Unlike the threading module, it has no RLock: asyncio.Lock is not reentrant, so a task that tries to acquire a lock it already holds waits forever.
import asyncio

counter = 0

async def increment_unsafe():
    """Increment without a lock (race condition)."""
    global counter
    # Read, modify, write are separate operations
    temp = counter
    await asyncio.sleep(0.001)  # Simulate other work; other tasks run here
    counter = temp + 1

async def increment_safe(lock):
    """Increment with a lock (safe across tasks)."""
    global counter
    async with lock:  # Acquire lock
        temp = counter
        await asyncio.sleep(0.001)
        counter = temp + 1
    # Lock automatically released when exiting block

async def demo_race_condition():
    global counter
    print("Without lock:")
    counter = 0
    async with asyncio.TaskGroup() as tg:
        # 10 x 10 = 100 tasks, each incrementing the counter once
        for _ in range(10):
            for _ in range(10):
                tg.create_task(increment_unsafe())
    print(f"Counter: {counter} (expected 100, got race condition)")

    print("\nWith lock:")
    counter = 0
    lock = asyncio.Lock()
    async with asyncio.TaskGroup() as tg:
        for _ in range(10):
            for _ in range(10):
                tg.create_task(increment_safe(lock))
    print(f"Counter: {counter} (correct: 100)")

asyncio.run(demo_race_condition())
Output:
Without lock:
Counter: 1 (expected 100, got race condition)

With lock:
Counter: 100 (correct: 100)
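Without a lock, tasks interleave at the await between the read and the write, so each task may write back a stale value and updates are lost. With a lock, the read-modify-write sequence runs as a unit: no other task can enter the block until the holder leaves it.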
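In asyncio, a task can only be interrupted at an await, so a lock is needed only when a read and its matching write are separated by one. The sketch below is a minimal illustration of that point; increment_no_await and demo_no_await are names made up for this example.

```python
import asyncio

counter = 0

async def increment_no_await():
    """Increment with no await between the read and the write."""
    global counter
    counter += 1  # No await here, so no other task can run mid-update

async def demo_no_await():
    global counter
    counter = 0
    await asyncio.gather(*(increment_no_await() for _ in range(100)))
    return counter

print(asyncio.run(demo_no_await()))  # 100
```

As soon as an await is introduced between reading and writing the shared value, the update needs a lock again.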
Comparing Lock Types
| Lock Type | Behavior | Use Case |
|---|---|---|
| asyncio.Lock | Mutual exclusion; not reentrant, so the holding task cannot re-acquire it | General shared state in async code |
| threading.RLock | Reentrant, but for threads only; asyncio has no equivalent | Nested calls on shared state in threaded code |
Because asyncio has no RLock, a task that holds a lock and then calls another function that acquires the same lock deadlocks. A common fix is to acquire the lock once in the outer function and have the inner helper assume it is already held:
import asyncio

async def outer_function(lock):
    """Acquire the lock once, then call a helper that relies on it."""
    async with lock:
        print("Outer: acquired lock")
        await inner_locked()
        print("Outer: still have lock")

async def inner_locked():
    """Caller must already hold the lock; do not acquire it again here."""
    # Re-acquiring the same asyncio.Lock here would deadlock:
    # the task would be waiting for itself.
    print("Inner: running under caller's lock")

async def demo_nested():
    lock = asyncio.Lock()
    await outer_function(lock)

asyncio.run(demo_nested())
Output:
Outer: acquired lock
Inner: running under caller's lock
Outer: still have lock
If inner_locked tried to acquire the same asyncio.Lock, the task would wait for itself forever. Acquiring once at the outer level removes the need for re-entrance.
Using Events to Signal Across Tasks
An Event is a flag that one task sets and others wait for. Unlike locks (for mutual exclusion), events broadcast signals.
import asyncio

async def waiter(event, name):
    """Wait for event to be set."""
    print(f"{name}: waiting for event")
    await event.wait()  # Blocks until event is set
    print(f"{name}: event fired!")

async def setter(event, delay):
    """Set the event after a delay."""
    await asyncio.sleep(delay)
    print("Setter: setting event")
    event.set()

async def demo_event():
    event = asyncio.Event()
    async with asyncio.TaskGroup() as tg:
        # Multiple tasks wait for the same event
        tg.create_task(waiter(event, "waiter1"))
        tg.create_task(waiter(event, "waiter2"))
        tg.create_task(waiter(event, "waiter3"))
        tg.create_task(setter(event, 2.0))

asyncio.run(demo_event())
Output:
waiter1: waiting for event
waiter2: waiting for event
waiter3: waiting for event
Setter: setting event
waiter1: event fired!
waiter2: event fired!
waiter3: event fired!
All three waiters are woken when the event is set and resume one after another on the event loop. This is broadcast synchronization: one event, many waiters.
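An Event stays set until clear() is called, so a task that calls wait() after set() does not suspend at all. The following minimal sketch shows both states; late_waiter and demo_event_state are illustrative names, not part of the example above.

```python
import asyncio

async def late_waiter(event):
    """Wait on an event that may already be set."""
    await event.wait()  # Returns immediately if the flag is already set
    return "passed"

async def demo_event_state():
    event = asyncio.Event()
    observed = []

    event.set()
    observed.append(event.is_set())            # True: the flag stays set
    observed.append(await late_waiter(event))  # "passed": no suspension needed

    event.clear()                              # Reset the flag for reuse
    observed.append(event.is_set())            # False: new waiters will suspend
    try:
        # With the flag cleared, wait() suspends until the timeout expires.
        await asyncio.wait_for(event.wait(), timeout=0.05)
    except asyncio.TimeoutError:
        observed.append("timed out")
    return observed

print(asyncio.run(demo_event_state()))
# [True, 'passed', False, 'timed out']
```

This is why an Event suits one-time transitions such as "startup finished": late arrivals see the flag and continue without waiting.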
Condition Variables for Complex Coordination
A Condition combines a lock and an event: tasks can wait for a condition and notify waiting tasks when it changes.
import asyncio

buffer = []
condition = asyncio.Condition()
MAX_SIZE = 3

async def producer(name):
    """Produce items; wait if buffer is full."""
    for i in range(5):
        async with condition:
            # Wait until buffer has space
            while len(buffer) >= MAX_SIZE:
                print(f"{name}: buffer full, waiting")
                await condition.wait()
            buffer.append(f"{name}-{i}")
            print(f"{name}: produced {name}-{i} (buffer: {len(buffer)})")
            condition.notify_all()  # Wake waiters
        await asyncio.sleep(0.1)

async def consumer(name):
    """Consume items; wait if buffer is empty."""
    for _ in range(5):
        async with condition:
            # Wait until buffer has items
            while len(buffer) == 0:
                print(f"{name}: buffer empty, waiting")
                await condition.wait()
            item = buffer.pop(0)
            print(f"{name}: consumed {item} (buffer: {len(buffer)})")
            condition.notify_all()  # Wake waiters
        await asyncio.sleep(0.15)

async def demo_condition():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer("prod1"))
        tg.create_task(consumer("cons1"))

asyncio.run(demo_condition())
Output:
prod1: produced prod1-0 (buffer: 1)
cons1: consumed prod1-0 (buffer: 0)
prod1: produced prod1-1 (buffer: 1)
cons1: consumed prod1-1 (buffer: 0)
...
Condition variables allow complex coordination: the producer waits while the buffer is full, and the consumer waits while it is empty. Each waits inside a while loop so the check is repeated after every wake-up, and both call notify_all() to wake blocked tasks.
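The "check in a loop, then wait" pattern can also be written with Condition.wait_for(), which takes a predicate and repeats the check after each notification. The sketch below is one way to use it; wait_for_items, add_items, and demo_wait_for are hypothetical helper names.

```python
import asyncio

async def wait_for_items(condition, buffer, needed):
    """Suspend until the buffer holds at least `needed` items."""
    async with condition:
        # wait_for() re-checks the predicate each time the task is notified.
        await condition.wait_for(lambda: len(buffer) >= needed)
        return list(buffer)  # Snapshot taken while the lock is held

async def add_items(condition, buffer, items):
    """Append items one at a time, notifying waiters after each."""
    for item in items:
        async with condition:
            buffer.append(item)
            condition.notify_all()
        await asyncio.sleep(0)  # Yield so waiters can re-check the predicate

async def demo_wait_for():
    condition = asyncio.Condition()
    buffer = []
    waiter = asyncio.create_task(wait_for_items(condition, buffer, needed=3))
    await add_items(condition, buffer, ["a", "b", "c", "d"])
    return await waiter

result = asyncio.run(demo_wait_for())
print(result)
```

The waiter returns only once its predicate holds, so the snapshot always contains at least three items.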
Practical Example: Rate-Limited Logger with Lock
Protect shared I/O resources (like file handles) with a lock:
import asyncio
import time

class RateLimitedLogger:
    def __init__(self, max_logs_per_second=5):
        self.lock = asyncio.Lock()
        self.max_logs = max_logs_per_second
        self.log_times = []

    async def log(self, message):
        async with self.lock:
            while True:
                now = time.time()
                # Remove old entries (older than 1 second)
                self.log_times = [t for t in self.log_times if now - t < 1.0]
                if len(self.log_times) < self.max_logs:
                    break
                # Rate limit exceeded: sleep until the oldest entry expires,
                # then re-read the clock so the recorded time is accurate
                wait_time = 1.0 - (now - self.log_times[0])
                print(f"Rate limit: waiting {wait_time:.2f}s")
                await asyncio.sleep(wait_time)
            self.log_times.append(now)
            print(f"[{now:.2f}] {message}")

async def logger_test():
    logger = RateLimitedLogger(max_logs_per_second=3)

    async def worker(name):
        for i in range(5):
            await logger.log(f"{name}: message {i}")
            await asyncio.sleep(0.1)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("worker1"))
        tg.create_task(worker("worker2"))

asyncio.run(logger_test())
The lock ensures only one task logs at a time, and the rate limiter throttles output. Multiple workers can call log() safely.
Key Takeaways
- Lock provides mutual exclusion: only one task at a time can acquire it, protecting shared state from concurrent modification.
- asyncio has no RLock, and asyncio.Lock is not reentrant; for nested calls, acquire the lock once and have inner helpers assume it is held.
- Event broadcasts a signal: one task sets it, multiple tasks wake up; use for one-time events or state transitions.
- Condition combines a lock with wait/notify semantics for complex coordination (producer-consumer patterns).
- Protect shared state that is modified across await points with a lock; always use async with to ensure release even on exceptions.
Frequently Asked Questions
Why not use threading locks (threading.Lock) in asyncio?
threading.Lock.acquire() blocks the calling thread. In asyncio that thread is the one running the event loop, so every other task stalls until the lock is released. asyncio.Lock suspends only the waiting task and lets the loop run other tasks in the meantime.
What if a task holding a lock is cancelled?
If the task is cancelled while inside an async with block, CancelledError propagates out of the block and async with releases the lock on the way out. Use try-finally if you need to clean up additional resources.
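The release-on-cancel behavior can be checked with lock.locked(). This is a minimal sketch; hold_lock and demo_cancel_release are names invented for the illustration.

```python
import asyncio

async def hold_lock(lock, started):
    """Acquire the lock, then wait long enough to be cancelled."""
    async with lock:
        started.set()
        await asyncio.sleep(10)  # CancelledError is raised here

async def demo_cancel_release():
    lock = asyncio.Lock()
    started = asyncio.Event()
    task = asyncio.create_task(hold_lock(lock, started))

    await started.wait()
    held_before = lock.locked()  # True: the task is inside the block

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    held_after = lock.locked()   # False: async with released the lock
    return held_before, held_after

print(asyncio.run(demo_cancel_release()))  # (True, False)
```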
Can I acquire multiple locks safely?
Yes, but always acquire them in the same order to avoid deadlock. If task A acquires lock1 then lock2 while task B acquires lock2 then lock1, each can end up holding one lock and waiting forever for the other.
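A minimal sketch of consistent ordering follows, assuming two tasks that both need the same pair of locks. The transfer and demo_lock_order names are hypothetical.

```python
import asyncio

async def transfer(first, second, log, name):
    """Acquire two locks in the order given by the caller."""
    async with first:
        await asyncio.sleep(0)  # Give the other task a chance to run
        async with second:
            log.append(name)

async def demo_lock_order():
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()
    log = []
    # Both tasks take lock1 before lock2. If one of them took lock2 first,
    # each could hold one lock while waiting for the other: a deadlock.
    await asyncio.gather(
        transfer(lock1, lock2, log, "task A"),
        transfer(lock1, lock2, log, "task B"),
    )
    return log

completed = asyncio.run(demo_lock_order())
print(completed)
```

Both tasks finish because whichever one holds lock1 is always able to obtain lock2.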
How do I timeout waiting for a lock?
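Use asyncio.wait_for(lock.acquire(), timeout=N). If the lock isn't acquired within N seconds, it raises TimeoutError (asyncio.TimeoutError before Python 3.11). Because this bypasses async with, call lock.release() yourself when you are done, ideally in a finally block.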
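One way to wrap this in a helper is sketched below. try_acquire and demo_timeout are illustrative names, and the 0.05 second timeout is an arbitrary value chosen to keep the demo fast.

```python
import asyncio

async def try_acquire(lock, timeout):
    """Return True if the lock was acquired within `timeout` seconds."""
    try:
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True

async def demo_timeout():
    lock = asyncio.Lock()
    await lock.acquire()                     # Hold the lock so the next attempt waits
    first_try = await try_acquire(lock, 0.05)   # False: the lock is still held
    lock.release()

    second_try = await try_acquire(lock, 0.05)  # True: the lock is free now
    if second_try:
        lock.release()  # Acquired outside async with, so release manually
    return first_try, second_try

print(asyncio.run(demo_timeout()))  # (False, True)
```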
Are asyncio locks thread-safe?
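No. asyncio primitives are designed for a single event loop in one thread. For multi-threaded code, use threading.Lock; across processes, use multiprocessing.Lock. To signal the loop from another thread, schedule the call with loop.call_soon_threadsafe() instead of touching the primitive directly.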
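When another thread does need to signal async code, a common approach is to hand the call to the loop with loop.call_soon_threadsafe(). The sketch below assumes a single worker thread; worker_thread and demo_cross_thread are made-up names.

```python
import asyncio
import threading

def worker_thread(loop, event):
    """Runs in a separate thread; must not call event.set() directly."""
    # Schedule event.set() to run inside the event loop's own thread.
    loop.call_soon_threadsafe(event.set)

async def demo_cross_thread():
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    thread = threading.Thread(target=worker_thread, args=(loop, event))
    thread.start()
    # The timeout only guards the demo against hanging.
    await asyncio.wait_for(event.wait(), timeout=1.0)
    thread.join()  # The thread has nothing left to do, so this returns quickly
    return event.is_set()

print(asyncio.run(demo_cross_thread()))  # True
```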