Asyncio Locks and Events: Synchronization Techniques

Locks and events are synchronization primitives that coordinate access to shared state and signal events across async tasks. A lock ensures only one task accesses protected code at a time; an event allows tasks to wait for a flag to be set by another task. Together with semaphores (covered earlier), they form asyncio's toolkit for safe concurrency.

Understanding Locks: Preventing Data Races

A lock is a binary gate: one task at a time can acquire it. If another task tries to acquire while it's held, it waits. This prevents concurrent modification of shared state, eliminating race conditions.

asyncio provides a single lock type, asyncio.Lock, for mutual exclusion. It is not reentrant: a task that already holds the lock and tries to acquire it again waits forever. Unlike the threading module, asyncio has no RLock.

import asyncio

counter = 0

async def increment_unsafe():
    """Increment without a lock: race condition."""
    global counter
    # Read, modify, write are separate steps with a suspension point between them
    temp = counter
    await asyncio.sleep(0.001)  # Simulate other work; other tasks run here
    counter = temp + 1

async def increment_safe(lock):
    """Increment with a lock: safe across tasks."""
    global counter
    async with lock:  # Acquire lock
        temp = counter
        await asyncio.sleep(0.001)
        counter = temp + 1
    # Lock automatically released when exiting block

async def demo_race_condition():
    global counter

    print("Without lock:")
    counter = 0
    async with asyncio.TaskGroup() as tg:
        # 100 tasks, each incrementing the counter once
        for _ in range(100):
            tg.create_task(increment_unsafe())
    print(f"Counter: {counter} (expected 100, got race condition)")

    print("\nWith lock:")
    counter = 0
    lock = asyncio.Lock()
    async with asyncio.TaskGroup() as tg:
        for _ in range(100):
            tg.create_task(increment_safe(lock))
    print(f"Counter: {counter} (correct: 100)")

asyncio.run(demo_race_condition())

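Output:

Without lock:
Counter: 1 (expected 100, got race condition)

With lock:
Counter: 100 (correct: 100)

Without a lock, all 100 tasks read counter (0) before any of them resumes from its sleep to write, so each writes 1 and 99 updates are lost. With a lock, each read-modify-write sequence finishes before the next task starts, so no update is lost.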
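The race depends on the await that sits between the read and the write. As a minimal sketch (increment_no_await and main are illustrative names, not part of the example above), the same read-modify-write with no suspension point cannot be interleaved by other tasks on the same event loop:

```python
import asyncio

counter = 0

async def increment_no_await():
    """Read-modify-write with no await in between; cannot be interleaved."""
    global counter
    temp = counter
    counter = temp + 1

async def main():
    global counter
    counter = 0
    # gather() runs all 100 coroutines as concurrent tasks
    await asyncio.gather(*(increment_no_await() for _ in range(100)))
    return counter

print(asyncio.run(main()))  # 100
```

A lock becomes necessary once any await appears between reading shared state and writing it back.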

Comparing Lock Types

Lock Type | Behavior | Use Case
asyncio.Lock | Mutual exclusion; not reentrant, so the holding task cannot re-acquire it | Shared state in async code
threading.RLock | Reentrant, but ownership is tracked per thread and acquire() blocks the event loop | Threaded code only; not a substitute inside asyncio

asyncio has no RLock. When a task holds a lock and calls another function that touches the same state, acquire the lock once in the outer function and have the inner function assume it is already held:

import asyncio

async def outer_function(lock):
    """Acquire the lock once, then call a helper that assumes it is held."""
    async with lock:
        print("Outer: acquired lock")
        await inner_function_locked()
        print("Outer: still have lock")

async def inner_function_locked():
    """Caller must already hold the lock; never re-acquire it here."""
    # Re-acquiring asyncio.Lock here would deadlock: the task would wait on itself.
    print("Inner: running under outer's lock")

async def demo_nested():
    lock = asyncio.Lock()
    await outer_function(lock)

asyncio.run(demo_nested())

Output:

Outer: acquired lock
Inner: running under outer's lock
Outer: still have lock

Re-acquiring a held asyncio.Lock from the same task deadlocks, so keep lock acquisition at one level of the call chain.
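asyncio.Lock does not track which task owns it, so a second acquire from the task that already holds it simply waits. The sketch below shows this with a timeout so it reports the outcome instead of hanging; try_reacquire is a hypothetical helper name.

```python
import asyncio

async def try_reacquire():
    """Attempt a nested acquire on a held lock and report what happens."""
    lock = asyncio.Lock()
    async with lock:
        try:
            # This task already holds the lock, so the nested acquire never completes
            await asyncio.wait_for(lock.acquire(), timeout=0.05)
        except asyncio.TimeoutError:
            return "timed out"
        lock.release()  # Only reached if the nested acquire had succeeded
        return "reacquired"

print(asyncio.run(try_reacquire()))  # timed out
```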

Using Events to Signal Across Tasks

An Event is a flag that one task sets and others wait for. Unlike locks (for mutual exclusion), events broadcast signals.

import asyncio

async def waiter(event, name):
    """Wait for event to be set."""
    print(f"{name}: waiting for event")
    await event.wait()  # Suspends this task until the event is set
    print(f"{name}: event fired!")

async def setter(event, delay):
    """Set the event after a delay."""
    await asyncio.sleep(delay)
    print("Setter: setting event")
    event.set()

async def demo_event():
    event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        # Multiple tasks wait for the same event
        tg.create_task(waiter(event, "waiter1"))
        tg.create_task(waiter(event, "waiter2"))
        tg.create_task(waiter(event, "waiter3"))
        tg.create_task(setter(event, 2.0))

asyncio.run(demo_event())

Output:

waiter1: waiting for event
waiter2: waiting for event
waiter3: waiting for event
Setter: setting event
waiter1: event fired!
waiter2: event fired!
waiter3: event fired!

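All three waiters are released by the single set() call and resume one after another on the next pass of the event loop. This is broadcast synchronization: one event, many waiters. The flag stays set until clear() is called, so a task that calls wait() afterwards returns immediately.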
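An Event can also be reset and waited on with a deadline. The following sketch, with illustrative names only, shows wait() returning immediately on a set flag, clear() resetting it, and asyncio.wait_for bounding how long a task waits:

```python
import asyncio

async def main():
    event = asyncio.Event()
    outcomes = []

    event.set()
    await event.wait()               # Flag already set: returns immediately
    outcomes.append(event.is_set())

    event.clear()                    # Reset the flag so the event can be reused
    try:
        await asyncio.wait_for(event.wait(), timeout=0.05)
    except asyncio.TimeoutError:
        outcomes.append("timed out")  # Nobody set the event in time
    return outcomes

print(asyncio.run(main()))  # [True, 'timed out']
```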

Condition Variables for Complex Coordination

A Condition combines a lock and an event: tasks can wait for a condition and notify waiting tasks when it changes.

import asyncio

buffer = []
condition = asyncio.Condition()
MAX_SIZE = 3

async def producer(name):
    """Produce items; wait if buffer is full."""
    for i in range(5):
        async with condition:
            # Wait until buffer has space
            while len(buffer) >= MAX_SIZE:
                print(f"{name}: buffer full, waiting")
                await condition.wait()

            buffer.append(f"{name}-{i}")
            print(f"{name}: produced {name}-{i} (buffer: {len(buffer)})")
            condition.notify_all()  # Wake waiters

        await asyncio.sleep(0.1)

async def consumer(name):
    """Consume items; wait if buffer is empty."""
    for _ in range(5):
        async with condition:
            # Wait until buffer has items
            while len(buffer) == 0:
                print(f"{name}: buffer empty, waiting")
                await condition.wait()

            item = buffer.pop(0)
            print(f"{name}: consumed {item} (buffer: {len(buffer)})")
            condition.notify_all()  # Wake waiters

        await asyncio.sleep(0.15)

async def demo_condition():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer("prod1"))
        tg.create_task(consumer("cons1"))

asyncio.run(demo_condition())

Output:

prod1: produced prod1-0 (buffer: 1)
cons1: consumed prod1-0 (buffer: 0)
prod1: produced prod1-1 (buffer: 1)
cons1: consumed prod1-1 (buffer: 0)
...

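Condition variables allow complex coordination: the producer waits while the buffer is full, and the consumer waits while it is empty. Each wait sits in a while loop because a woken task must re-check its condition after re-acquiring the lock. Both sides call notify_all() to wake blocked tasks; any task whose condition is still false goes back to waiting.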
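The explicit while loop can be replaced by Condition.wait_for(), which re-checks a predicate each time the task is woken. This is a sketch of a bounded buffer under that approach; the function signatures and the use of a deque are illustrative choices, not taken from the example above.

```python
import asyncio
from collections import deque

async def producer(cond, buffer, max_size, items):
    for item in items:
        async with cond:
            # Suspends until the predicate is true, re-checking after each notify
            await cond.wait_for(lambda: len(buffer) < max_size)
            buffer.append(item)
            cond.notify_all()

async def consumer(cond, buffer, count):
    received = []
    for _ in range(count):
        async with cond:
            await cond.wait_for(lambda: len(buffer) > 0)
            received.append(buffer.popleft())
            cond.notify_all()
    return received

async def main():
    cond = asyncio.Condition()
    buffer = deque()
    items = list(range(6))
    _, received = await asyncio.gather(
        producer(cond, buffer, 2, items),
        consumer(cond, buffer, len(items)),
    )
    return received

print(asyncio.run(main()))  # [0, 1, 2, 3, 4, 5]
```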

Practical Example: Rate-Limited Logger with Lock

Protect shared I/O resources (like file handles) with a lock:

import asyncio
import time

class RateLimitedLogger:
    def __init__(self, max_logs_per_second=5):
        self.lock = asyncio.Lock()
        self.max_logs = max_logs_per_second
        self.log_times = []

    async def log(self, message):
        async with self.lock:
            now = time.time()

            # Remove old entries (older than 1 second)
            self.log_times = [t for t in self.log_times if now - t < 1.0]

            if len(self.log_times) >= self.max_logs:
                # Rate limit exceeded: wait until the oldest entry leaves the window
                wait_time = 1.0 - (now - self.log_times[0])
                print(f"Rate limit: waiting {wait_time:.2f}s")
                await asyncio.sleep(wait_time)
                # Refresh the clock and the window; the pre-sleep timestamp is stale
                now = time.time()
                self.log_times = [t for t in self.log_times if now - t < 1.0]

            self.log_times.append(now)
            print(f"[{now:.2f}] {message}")

async def logger_test():
    logger = RateLimitedLogger(max_logs_per_second=3)

    async def worker(name):
        for i in range(5):
            await logger.log(f"{name}: message {i}")
            await asyncio.sleep(0.1)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("worker1"))
        tg.create_task(worker("worker2"))

asyncio.run(logger_test())

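The lock ensures only one task logs at a time, and the rate limiter throttles output. Because the lock stays held during the throttling sleep, other callers queue behind it rather than bypassing the limit. Multiple workers can call log() safely.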
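The same pattern applies to a shared stream. In this sketch, io.StringIO stands in for a file handle and write_record is a hypothetical helper. Each record is written in two steps with a suspension point between them, and the lock keeps another task from writing in the middle of a record.

```python
import asyncio
import io

async def write_record(stream, lock, name):
    """Write a two-line record; the lock keeps the two lines adjacent."""
    async with lock:
        stream.write(f"{name}: begin\n")
        await asyncio.sleep(0)  # Suspension point where another task could run
        stream.write(f"{name}: end\n")

async def main():
    stream = io.StringIO()  # Stand-in for a shared file handle
    lock = asyncio.Lock()
    await asyncio.gather(*(write_record(stream, lock, f"w{i}") for i in range(3)))
    return stream.getvalue().splitlines()

for line in asyncio.run(main()):
    print(line)
```

Without the lock, the three "begin" lines would be written before any "end" line.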

Key Takeaways

  • Lock provides mutual exclusion: only one task at a time can acquire it, protecting shared state from concurrent modification.
  • asyncio has no RLock: asyncio.Lock is not reentrant, so a task that re-acquires a lock it already holds deadlocks; acquire once and have nested helpers assume the lock is held.
  • Event broadcasts a signal: one task sets it, multiple tasks wake up; use for one-time events or state transitions.
  • Condition combines a lock with wait/notify semantics for complex coordination (producer-consumer patterns).
  • Protect shared state with a lock whenever a read-modify-write sequence spans an await; always use async with to ensure release even on exceptions.

Frequently Asked Questions

Why not use threading locks (threading.Lock) in asyncio?

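threading.Lock.acquire() blocks the entire thread, which stalls the event loop and every task on it; if the lock's holder is another task on the same loop, the program deadlocks. asyncio.Lock suspends only the waiting task, so the loop keeps running other tasks.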

What if a task holding a lock is cancelled?

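If the task is cancelled inside an async with block, CancelledError propagates out of the block and the lock is released on exit. If it is cancelled while still waiting to acquire, it never takes the lock. Use try-finally if you need to clean up additional resources.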
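A small sketch of this behavior, with hypothetical names holder and main: a task is cancelled while it holds the lock, and the lock is free again afterwards.

```python
import asyncio

async def holder(lock, started):
    async with lock:
        started.set()            # Signal that the lock is now held
        await asyncio.sleep(10)  # Cancelled while suspended here

async def main():
    lock = asyncio.Lock()
    started = asyncio.Event()
    task = asyncio.create_task(holder(lock, started))
    await started.wait()
    held_before = lock.locked()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass                     # Expected: the task was cancelled
    held_after = lock.locked()
    return held_before, held_after

print(asyncio.run(main()))  # (True, False)
```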

Can I acquire multiple locks safely?

Yes, but always acquire them in the same order to avoid deadlock. If task A acquires lock1 then lock2 while task B acquires lock2 then lock1, each can end up holding one lock and waiting forever for the other.
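One way to enforce a consistent order is to sort the locks by a stable key before acquiring them. The Account class and transfer function below are hypothetical, and sorting by name is just one possible ordering rule.

```python
import asyncio

class Account:
    def __init__(self, name, balance):
        self.name = name
        self.balance = balance
        self.lock = asyncio.Lock()

async def transfer(src, dst, amount):
    # Lock the account with the smaller name first, whatever the transfer direction
    first, second = sorted((src, dst), key=lambda account: account.name)
    async with first.lock:
        await asyncio.sleep(0)  # Give the other transfer a chance to interleave
        async with second.lock:
            src.balance -= amount
            dst.balance += amount

async def main():
    a = Account("a", 100)
    b = Account("b", 100)
    # Opposite directions would risk deadlock if each locked its source first
    await asyncio.wait_for(
        asyncio.gather(transfer(a, b, 30), transfer(b, a, 10)),
        timeout=1,
    )
    return a.balance, b.balance

print(asyncio.run(main()))  # (80, 120)
```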

How do I timeout waiting for a lock?

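Use asyncio.wait_for(lock.acquire(), timeout=N). If the lock isn't acquired within N seconds, it raises TimeoutError (asyncio.TimeoutError before Python 3.11). Because this bypasses async with, release the lock yourself in a try-finally once it is acquired.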
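A minimal sketch of that pattern follows. run_with_lock_timeout is a hypothetical helper, and the sleep stands in for the real critical section.

```python
import asyncio

async def run_with_lock_timeout(lock, timeout):
    """Return True if the critical section ran, False if acquiring timed out."""
    try:
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
    except asyncio.TimeoutError:
        return False  # Lock was never acquired, so there is nothing to release
    try:
        await asyncio.sleep(0)  # Placeholder for the critical section
        return True
    finally:
        lock.release()

async def main():
    lock = asyncio.Lock()
    when_free = await run_with_lock_timeout(lock, 0.05)
    async with lock:  # Hold the lock so the next attempt must time out
        when_held = await run_with_lock_timeout(lock, 0.05)
    return when_free, when_held

print(asyncio.run(main()))  # (True, False)
```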

Are asyncio locks thread-safe?

No. asyncio primitives are designed for a single event loop in one thread. For multi-threaded code, use threading.Lock; for multi-process code, use multiprocessing.Lock.
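When another thread needs to touch state guarded by an asyncio.Lock, one option is to hand a coroutine to the loop with asyncio.run_coroutine_threadsafe, so the lock is only ever used on the loop's own thread. This is a sketch under that assumption; guarded_append and worker are hypothetical names.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    lock = asyncio.Lock()
    results = []

    async def guarded_append(value):
        # Runs on the event loop thread, where asyncio.Lock is safe to use
        async with lock:
            results.append(value)

    def worker():
        # Runs in a separate thread: submit the coroutine to the loop
        future = asyncio.run_coroutine_threadsafe(guarded_append("from thread"), loop)
        future.result(timeout=1)  # Block only this worker thread until done

    # run_in_executor runs worker() in a thread pool without blocking the loop
    await loop.run_in_executor(None, worker)
    return results

print(asyncio.run(main()))  # ['from thread']
```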

Further Reading