Synchronization and Locks: Preventing Race Conditions
Race conditions occur when multiple processes modify shared data simultaneously without synchronization, causing unpredictable results. Python provides synchronization primitives—Lock, RLock, Semaphore, and Event—to coordinate access. This article covers each mechanism, shows race conditions in action, demonstrates fixes, and outlines deadlock prevention strategies for production-grade multiprocessing systems.
What Is a Race Condition?
A race condition happens when two or more processes access shared data, and the outcome depends on their execution timing.
Example: Two processes increment a shared counter:
import multiprocessing
import ctypes
import time
def increment(counter):
"""UNSAFE: reads and writes without synchronization."""
for _ in range(100_000):
# Read-modify-write is NOT atomic
current = counter.value
time.sleep(0) # Yield to allow interleaving
counter.value = current + 1
if __name__ == "__main__":
counter = multiprocessing.Value(ctypes.c_int, 0)
p1 = multiprocessing.Process(target=increment, args=(counter,))
p2 = multiprocessing.Process(target=increment, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Expected: 200,000, Actual: {counter.value}")
# Actual: often ~150,000 (race condition caused lost updates)
Both processes read the counter at the same time, then write back; one increment is lost. Repeated runs yield different results—hallmark of a race condition.
Lock: Mutual Exclusion
Lock ensures only one process holds it at a time. Other processes wait.
import multiprocessing
import ctypes
def safe_increment(counter, lock):
"""SAFE: lock protects read-modify-write."""
for _ in range(100_000):
with lock:
counter.value += 1
if __name__ == "__main__":
counter = multiprocessing.Value(ctypes.c_int, 0)
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=safe_increment, args=(counter, lock))
p2 = multiprocessing.Process(target=safe_increment, args=(counter, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"Result: {counter.value}") # Always 200,000
How it works:
lock.acquire()(orwith lock:) – blocks until lock is available.- Only one process holds the lock; others wait in queue.
lock.release()(or exitwithblock) – release lock; next waiter acquires it.
Lock methods:
acquire(blocking=True, timeout=None)– acquire lock; return True if successful.release()– release lock.with lock:– context manager; automatic release.
RLock: Reentrant Lock
Regular Lock causes deadlock if a process tries to acquire it twice (it's already holding it). RLock allows re-entry:
import multiprocessing
import ctypes
def outer_func(value, lock):
"""Outer function acquires lock."""
with lock:
value.value += 1
inner_func(value, lock)
def inner_func(value, lock):
"""Inner function also needs lock."""
# With Lock: DEADLOCK (process already holds lock)
# With RLock: OK (process can re-acquire)
with lock:
value.value += 1
if __name__ == "__main__":
value = multiprocessing.Value(ctypes.c_int, 0)
lock = multiprocessing.RLock() # Use RLock
p = multiprocessing.Process(target=outer_func, args=(value, lock))
p.start()
p.join()
print(f"Value: {value.value}") # 2
Use RLock when: A function holds a lock and calls another function that also needs the lock.
Use Lock otherwise: Simpler, slightly faster.
Semaphore: Limiting Concurrent Access
Semaphore(N) allows at most N processes to enter a critical section. Useful for rate-limiting or resource quotas.
import multiprocessing
import time
def limited_resource(semaphore, process_id):
"""Access a resource limited to 2 concurrent users."""
print(f"Process {process_id} waiting...")
with semaphore:
print(f"Process {process_id} entered (time: {time.time():.1f})")
time.sleep(1)
print(f"Process {process_id} exiting")
if __name__ == "__main__":
# Semaphore with initial count 2
semaphore = multiprocessing.Semaphore(2)
processes = [
multiprocessing.Process(target=limited_resource, args=(semaphore, i))
for i in range(5)
]
for p in processes:
p.start()
for p in processes:
p.join()
Output:
Process 0 waiting...
Process 1 waiting...
Process 2 waiting...
Process 3 waiting...
Process 4 waiting...
Process 0 entered (time: 1234567890.1)
Process 1 entered (time: 1234567890.1)
Process 2 waiting...
Process 3 waiting...
Process 4 waiting...
Process 0 exiting
Process 2 entered (time: 1234567891.1)
Process 1 exiting
Process 3 entered (time: 1234567891.1)
... (continues)
Process 0 and 1 enter immediately. Process 2 waits. When 0 exits, 2 enters.
Event: Process Signaling
Event is a one-time signal from one process to others.
import multiprocessing
import time
def waiter(event, process_id):
"""Wait for event signal."""
print(f"Process {process_id} waiting for event...")
event.wait() # Blocks until event.set() is called
print(f"Process {process_id} received signal!")
def signaler(event, delay):
"""Signal event after delay."""
time.sleep(delay)
print("Event signaling now...")
event.set()
if __name__ == "__main__":
event = multiprocessing.Event()
waiters = [
multiprocessing.Process(target=waiter, args=(event, i))
for i in range(3)
]
signaler_process = multiprocessing.Process(target=signaler, args=(event, 2))
for p in waiters:
p.start()
signaler_process.start()
for p in waiters + [signaler_process]:
p.join()
Output:
Process 0 waiting for event...
Process 1 waiting for event...
Process 2 waiting for event...
(2 second pause)
Event signaling now...
Process 0 received signal!
Process 1 received signal!
Process 2 received signal!
Event methods:
wait(timeout=None)– block until set() or timeout.set()– signal all waiters (one-time; set remains true).clear()– reset the event (rarely used).is_set()– check if event is signaled.
Condition: Wait-Notify Synchronization
Condition combines a lock with fine-grained signaling (like Java's notify()).
import multiprocessing
import time
def producer(condition, shared_list):
"""Produce items and notify consumers."""
for i in range(5):
time.sleep(0.5)
with condition:
shared_list.append(i)
print(f"Produced: {i}")
condition.notify_all() # Wake all waiters
def consumer(condition, shared_list, consumer_id):
"""Consume items when available."""
while True:
with condition:
condition.wait_for(lambda: len(shared_list) > 0)
if shared_list:
item = shared_list.pop(0)
print(f"Consumer {consumer_id} consumed: {item}")
if item == 4: # Last item
break
if __name__ == "__main__":
manager = multiprocessing.Manager()
shared_list = manager.list()
condition = multiprocessing.Condition()
prod = multiprocessing.Process(target=producer, args=(condition, shared_list))
cons1 = multiprocessing.Process(target=consumer, args=(condition, shared_list, 1))
cons2 = multiprocessing.Process(target=consumer, args=(condition, shared_list, 2))
prod.start()
cons1.start()
cons2.start()
prod.join()
cons1.join()
cons2.join()
Deadlock: The Trap
Deadlock occurs when processes wait for locks they can never acquire. Classic scenario:
import multiprocessing
import ctypes
import time
lock_a = multiprocessing.Lock()
lock_b = multiprocessing.Lock()
def process1(val_a, val_b):
"""Acquire A, then B."""
with lock_a:
print("P1 acquired A")
time.sleep(1) # Let P2 run
with lock_b:
print("P1 acquired B")
val_a.value += 1
def process2(val_a, val_b):
"""Acquire B, then A."""
with lock_b:
print("P2 acquired B")
time.sleep(1) # Let P1 run
with lock_a:
print("P2 acquired A")
val_b.value += 1
if __name__ == "__main__":
val_a = multiprocessing.Value(ctypes.c_int, 0)
val_b = multiprocessing.Value(ctypes.c_int, 0)
p1 = multiprocessing.Process(target=process1, args=(val_a, val_b))
p2 = multiprocessing.Process(target=process2, args=(val_a, val_b))
p1.start()
p2.start()
p1.join(timeout=5) # Timeout because they're deadlocked
p2.join(timeout=5)
if p1.is_alive() or p2.is_alive():
print("DEADLOCK DETECTED!")
p1.terminate()
p2.terminate()
Deadlock trace:
- P1 acquires lock_a.
- P2 acquires lock_b.
- P1 waits for lock_b (held by P2).
- P2 waits for lock_a (held by P1).
- Both wait forever → deadlock.
Deadlock Prevention
Strategy 1: Always acquire locks in the same order (lock ordering)
def process1(val_a, val_b):
"""Always acquire A before B."""
with lock_a:
with lock_b:
val_a.value += 1
def process2(val_a, val_b):
"""Also acquire A before B."""
with lock_a:
with lock_b:
val_b.value += 1
Strategy 2: Use try_acquire with timeout and retry
def process1(val_a, val_b):
"""Try to acquire with timeout; retry if fails."""
while True:
if lock_a.acquire(timeout=1):
try:
if lock_b.acquire(timeout=1):
try:
val_a.value += 1
break
finally:
lock_b.release()
finally:
lock_a.release()
# Retry
Strategy 3: Minimize lock scope and complexity
The simpler your locking, the less likely deadlock. Prefer one lock over many.
Real-World Example: Thread-Safe Counter Service
import multiprocessing
import ctypes
class CounterService:
def __init__(self):
self.counter = multiprocessing.Value(ctypes.c_int, 0)
self.lock = multiprocessing.Lock()
def increment(self):
with self.lock:
self.counter.value += 1
return self.counter.value
def get(self):
with self.lock:
return self.counter.value
def worker(service, worker_id):
"""Increment counter 1000 times."""
for _ in range(1000):
service.increment()
print(f"Worker {worker_id} done")
if __name__ == "__main__":
service = CounterService()
workers = [
multiprocessing.Process(target=worker, args=(service, i))
for i in range(4)
]
for w in workers:
w.start()
for w in workers:
w.join()
print(f"Final counter: {service.get()}") # Always 4000
Key Takeaways
- Use
Lockfor mutual exclusion; ensures only one process modifies shared data at a time. - Use
RLockwhen a process must acquire the same lock twice (recursive functions). - Use
Semaphore(N)to limit concurrent access to N resources. - Use
Eventfor one-time signaling;Conditionfor repeated producer-consumer notifications. - Deadlock occurs when processes wait for locks they can never acquire; prevent with lock ordering.
- Always use context managers (
with lock:) to guarantee release, even on exceptions. - Test with artificial delays to expose race conditions; add
time.sleep(0)to increase chance of interleaving.
Frequently Asked Questions
What happens if a process crashes while holding a lock?
The lock is released automatically when the process exits. Acquired locks are tied to process lifetime, not just the code scope.
Can I use threading.Lock in a multiprocessing.Process?
No. threading.Lock uses OS-level thread synchronization; it doesn't work across processes. Use multiprocessing.Lock.
How do I timeout on a Lock acquisition?
Use acquire(timeout=5). If the lock isn't available after 5 seconds, return False (don't use with statement; use try-finally).
What's the performance cost of locks?
On modern hardware, lock acquisition is ~1 microsecond. If you're holding a lock for milliseconds, contention is negligible. Profile to measure impact in your workload.
Can I use locks in a Pool worker function?
Yes, pass the lock as an argument or store it globally (with caution). Pool workers are just regular processes.
Further Reading
- multiprocessing synchronization primitives documentation — official reference.
- Deadlock prevention strategies — deeper theory and prevention techniques.
- Shared memory and locks — practical examples with Value and Array.
- Testing for race conditions — tools and techniques for detecting concurrency bugs.