Advanced Patterns: Fine-Grained Locking in Python
As you scale free-threaded Python to 16+ cores, coarse-grained locks (one lock protecting entire data structures) become bottlenecks. Fine-grained locking—locks at the object or field level—scales better but is error-prone. I've implemented and debugged fine-grained locking in production; this article teaches the patterns that avoid deadlocks while preserving correctness.
The goal is to shorten critical sections (the code that runs while a lock is held) so threads spend less time waiting. At extreme scale (32 cores), even 100-nanosecond locks matter.
Coarse-Grained Locking: The Baseline
Start here for clarity; optimize only if profiling shows lock contention.
import threading

class Counter:
    """Coarse-grained: one lock protects all state."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    def decrement(self):
        with self._lock:
            self._value -= 1

    def get(self):
        with self._lock:
            return self._value

# Use it
counter = Counter()
threads = []

def worker():
    for _ in range(1000):
        counter.increment()

for _ in range(4):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Counter: {counter.get()}")  # 4000
On 4 cores, threads contend on a single lock. Only one thread increments at a time; the others wait. Measurement: ~0.5 ms per increment (1 µs lock overhead + ~0.5 ms waiting).
Fine-Grained Locking: Multiple Locks
Partition state into independent sections, each with its own lock. Threads can modify different sections concurrently.
import threading

class ShardedCounter:
    """Fine-grained: multiple locks, one per shard."""

    def __init__(self, num_shards=4):
        self.num_shards = num_shards
        self._values = [0] * num_shards
        self._locks = [threading.Lock() for _ in range(num_shards)]

    def _get_shard(self, thread_id):
        """Assign threads to shards consistently."""
        return thread_id % self.num_shards

    def increment(self, thread_id=0):
        shard = self._get_shard(thread_id)
        with self._locks[shard]:
            self._values[shard] += 1

    def get_total(self):
        """Hold every shard lock while summing to get a consistent snapshot."""
        # Always acquire in index order so two callers cannot deadlock.
        for lock in self._locks:
            lock.acquire()
        try:
            return sum(self._values)
        finally:
            # Release in reverse order of acquisition.
            for lock in reversed(self._locks):
                lock.release()

# Use it
counter = ShardedCounter(num_shards=4)
threads = []

def worker(thread_id):
    for _ in range(1000):
        counter.increment(thread_id)

for i in range(4):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Counter: {counter.get_total()}")  # 4000
On 4 cores with 4 shards, each thread accesses a different shard; no contention. Measurement: ~0.1 µs per increment (lock overhead only, no waiting).
Caveat: Getting a consistent total snapshot requires acquiring all locks in order (to prevent deadlock). This is expensive; use it sparingly.
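One way to check figures like these on your own hardware is a small timing harness. The sketch below is illustrative only: `time_increments` and the two compact counter classes are stand-ins defined for the example, and the numbers it prints depend on the interpreter build (free-threaded or not), the core count, and machine load.

```python
import threading
import time


class CoarseCounter:
    """One lock guards the single value."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self, thread_id=0):
        with self._lock:
            self._value += 1

    def get_total(self):
        with self._lock:
            return self._value


class ShardedCounter:
    """One lock per shard; threads map to shards by thread_id."""

    def __init__(self, num_shards=4):
        self._values = [0] * num_shards
        self._locks = [threading.Lock() for _ in range(num_shards)]

    def increment(self, thread_id=0):
        shard = thread_id % len(self._locks)
        with self._locks[shard]:
            self._values[shard] += 1

    def get_total(self):
        # Acquire in index order, release in reverse.
        for lock in self._locks:
            lock.acquire()
        try:
            return sum(self._values)
        finally:
            for lock in reversed(self._locks):
                lock.release()


def time_increments(counter, num_threads=4, iterations=10_000):
    """Return (wall-clock seconds / total increments, final total)."""
    def worker(thread_id):
        for _ in range(iterations):
            counter.increment(thread_id)

    threads = [threading.Thread(target=worker, args=(i,))
               for i in range(num_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    elapsed = time.perf_counter() - start
    return elapsed / (num_threads * iterations), counter.get_total()


for name, counter in [("coarse", CoarseCounter()),
                      ("sharded", ShardedCounter(4))]:
    per_op, total = time_increments(counter)
    print(f"{name}: {per_op * 1e9:.0f} ns per increment, total={total}")
```

The figure reported is wall-clock time divided by total increments, so it reflects throughput rather than the latency of any single call. Run it several times before drawing conclusions.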
Read-Write Locking for Scalable Caching
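For read-heavy workloads, read-write locks allow multiple readers concurrently. The standard library's threading module does not provide one (there is no threading.RWLock), so the example defines a minimal RWLock on top of threading.Condition. This version favors readers, so a steady stream of readers can starve writers; for production, prefer a well-tested implementation:
import threading
from contextlib import contextmanager

class RWLock:
    """Minimal reader-preferring read-write lock (writers can starve)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0       # Number of active readers
        self._writer = False    # True while a writer holds the lock

    @contextmanager
    def read_lock(self):
        with self._cond:
            while self._writer:
                self._cond.wait()
            self._readers += 1
        try:
            yield
        finally:
            with self._cond:
                self._readers -= 1
                if self._readers == 0:
                    self._cond.notify_all()  # Wake waiting writers

    @contextmanager
    def write_lock(self):
        with self._cond:
            while self._writer or self._readers:
                self._cond.wait()
            self._writer = True
        try:
            yield
        finally:
            with self._cond:
                self._writer = False
                self._cond.notify_all()  # Wake waiting readers and writers

class CachedData:
    """Read-write lock: many readers, exclusive writer."""

    def __init__(self):
        self._data = {}
        self._rw_lock = RWLock()

    def get(self, key, default=None):
        """Read operation: multiple threads can do this concurrently."""
        with self._rw_lock.read_lock():
            return self._data.get(key, default)

    def set(self, key, value):
        """Write operation: exclusive access."""
        with self._rw_lock.write_lock():
            self._data[key] = value

    def clear(self):
        """Write operation: exclusive access."""
        with self._rw_lock.write_lock():
            self._data.clear()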
# Use it
cache = CachedData()
cache.set("key1", "value1")
threads = []

def reader():
    for _ in range(1000):
        value = cache.get("key1")

def writer():
    for i in range(10):
        cache.set("key2", f"value{i}")

for _ in range(8):
    t = threading.Thread(target=reader)
    threads.append(t)
    t.start()

for _ in range(2):
    t = threading.Thread(target=writer)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Done")
Eight reader threads don't block each other; two writer threads serialize. On a system with 10 cores, readers scale to 8 threads, writers to 1. Measurement: readers ~0.1 µs, writers ~1 µs.
Lock-Free Data Structures Using compare_exchange
For extreme performance, use atomic operations instead of locks. Python 3.13 has no public atomic or compare-and-swap (CAS) API, so pure-Python code can only simulate one with threading.Lock().
The pure-Python version below is conceptual only. For production, use ctypes or compiled extensions (Cython, Rust):
# Lock-free counter (conceptual; not truly lock-free in pure Python)
import threading

class LockFreeCounter:
    """Counter built on a simulated CAS (compare-and-swap)."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()  # Simulate CAS with a lock

    def _compare_and_swap(self, expected, new):
        """Set _value to new only if it still equals expected."""
        with self._lock:
            if self._value != expected:
                return False  # Another thread changed _value first
            self._value = new
            return True

    def increment(self):
        """Read, compute, then retry until the CAS succeeds."""
        while True:
            old_value = self._value
            if self._compare_and_swap(old_value, old_value + 1):
                return

    def get(self):
        with self._lock:
            return self._value

# Python 3.13 doesn't expose CAS directly; use Cython for true lock-free
For production lock-free code, compile with Cython or use Rust FFI:
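# counter.pyx (Cython)
# distutils: language = c++
# Sketch that assumes Cython 3's bundled libcpp.atomic wrapper for std::atomic.
from libcpp.atomic cimport atomic

cdef class LockFreeCounter:
    # A plain "cdef long" with "self.value += 1" would not be atomic;
    # it would only be serialized by the GIL on default builds.
    cdef atomic[long] value

    def __init__(self):
        self.value.store(0)

    def increment(self):
        """Atomic increment via std::atomic<long>::fetch_add."""
        # On x86-64 this typically compiles to a lock-prefixed add.
        self.value.fetch_add(1)

    def get(self):
        return self.value.load()
Compile: cythonize -i counter.pyx (the -i flag builds the extension in place; without a build flag, cythonize only generates the C/C++ source). The resulting .so/.pyd is much faster than pure Python.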
Optimistic Locking: Detect Conflicts Instead of Preventing Them
When conflicts are rare, optimistic locking keeps locks off the read path. Threads read and compute without locking, take a lock only for the brief commit step, and retry if a version check shows that another thread committed first.
import threading
import time

class OptimisticDict:
    """Dictionary with optimistic locking (version numbers)."""

    def __init__(self):
        # key -> (value, version); one lookup returns a consistent pair
        self._entries = {}
        self._global_version = 0
        self._lock = threading.Lock()

    def get(self, key):
        """Read without locking; returns (value, version) or (None, None)."""
        return self._entries.get(key, (None, None))

    def compare_and_set(self, key, value, expected_version):
        """Commit only if the key's version still equals expected_version."""
        with self._lock:
            _, current_version = self._entries.get(key, (None, None))
            if current_version != expected_version:
                return False  # Conflict: value changed since we read it
            self._global_version += 1
            self._entries[key] = (value, self._global_version)
            return True

    def update(self, key, value, max_retries=10):
        """Update with optimistic locking: re-read and retry on conflict."""
        for attempt in range(max_retries):
            _, version = self.get(key)
            if self.compare_and_set(key, value, version):
                return True
            time.sleep(0.001 * (2 ** attempt))  # Exponential backoff
        raise RuntimeError(f"Failed to update {key} after {max_retries} retries")
# Use it
data = OptimisticDict()
data.update("key", "value1")
threads = []

def writer(value):
    for n in range(10):
        data.update("key", f"{value}_{n}")

for i in range(4):
    t = threading.Thread(target=writer, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final: {data.get('key')}")
Optimistic locking works well when conflicts are rare. If conflicts are frequent, retries become expensive; switch to pessimistic locking.
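To see how retry cost grows with contention, it can help to count retries directly. The following is a minimal sketch, not the article's OptimisticDict: `VersionedCell` and `optimistic_apply` are hypothetical helpers that apply a read-modify-write function to a single value and report how many attempts were wasted.

```python
import threading


class VersionedCell:
    """A single value with a version number (hypothetical helper)."""

    def __init__(self, value):
        self._state = (value, 0)  # (value, version), replaced as one tuple
        self._lock = threading.Lock()

    def read(self):
        """Lock-free read: one attribute load returns a consistent pair."""
        return self._state

    def compare_and_set(self, new_value, expected_version):
        """Commit only if nobody committed since expected_version."""
        with self._lock:
            _, version = self._state
            if version != expected_version:
                return False
            self._state = (new_value, version + 1)
            return True


def optimistic_apply(cell, fn):
    """Apply fn to the cell's value, retrying on conflict. Returns retries."""
    retries = 0
    while True:
        value, version = cell.read()
        if cell.compare_and_set(fn(value), version):
            return retries
        retries += 1


cell = VersionedCell(0)
retry_counts = []  # list.append is safe to call from several threads


def worker():
    wasted = 0
    for _ in range(1000):
        wasted += optimistic_apply(cell, lambda v: v + 1)
    retry_counts.append(wasted)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

value, version = cell.read()
print(f"value={value}, commits={version}, retries={sum(retry_counts)}")
```

Every successful commit is based on the value read at the matching version, so no increment is lost. If the retry count approaches the commit count under your workload, that is a sign the pessimistic version would do less wasted work.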
Pattern: Striped Locks with Hashed Keys
For dictionaries or caches that grow dynamically, use a fixed-size array of locks (stripes) and hash each key to one of them:
import threading
import hashlib

class StripedDict:
    """Dictionary with striped locks for scalability."""

    def __init__(self, num_stripes=16):
        self.num_stripes = num_stripes
        self._data = [{} for _ in range(num_stripes)]
        self._locks = [threading.Lock() for _ in range(num_stripes)]

    def _get_stripe(self, key):
        """Hash key to a stripe."""
        hash_val = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
        return hash_val % self.num_stripes

    def get(self, key, default=None):
        stripe = self._get_stripe(key)
        with self._locks[stripe]:
            return self._data[stripe].get(key, default)

    def set(self, key, value):
        stripe = self._get_stripe(key)
        with self._locks[stripe]:
            self._data[stripe][key] = value

    def delete(self, key):
        stripe = self._get_stripe(key)
        with self._locks[stripe]:
            del self._data[stripe][key]

# Use it
cache = StripedDict(num_stripes=16)
threads = []

def worker(thread_id):
    for i in range(1000):
        key = f"key_{i % 100}"
        cache.set(key, thread_id)
        value = cache.get(key)

for i in range(8):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Done")
With 16 stripes and 8 threads, there are on average 0.5 threads per stripe, so most acquisitions are uncontended as long as keys spread evenly across stripes. Increase the stripe count if you have more cores.
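Whether the stripes actually share the load depends on how the keys hash. A quick check, sketched here with a hypothetical `stripe_histogram` helper and the same MD5-modulo mapping as `StripedDict._get_stripe`, counts how many of the worker's 100 keys land on each stripe.

```python
import hashlib
from collections import Counter


def get_stripe(key, num_stripes):
    """Same mapping as StripedDict._get_stripe: MD5 of str(key), modulo stripes."""
    hash_val = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    return hash_val % num_stripes


def stripe_histogram(keys, num_stripes):
    """Return a list where entry i is the number of keys on stripe i."""
    counts = Counter(get_stripe(k, num_stripes) for k in keys)
    return [counts.get(i, 0) for i in range(num_stripes)]


keys = [f"key_{i}" for i in range(100)]
hist = stripe_histogram(keys, num_stripes=16)
print("keys per stripe:", hist)
print("busiest stripe holds", max(hist), "of", len(keys), "keys")
```

A heavily skewed histogram means a few locks carry most of the traffic, and adding stripes will help less than the thread-to-stripe ratio suggests.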
Measurement: When to Optimize
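Don't move to fine-grained locking without profiling first. Use py-spy or cProfile to find where threads spend their time, and a small lock wrapper to record how long they wait on a specific lock: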
import threading
import time

class ProfiledLock:
    """Lock wrapper that records contention."""

    def __init__(self, name):
        self.name = name
        self._lock = threading.Lock()
        self.acquisitions = 0
        self.wait_time = 0.0

    def __enter__(self):
        start = time.perf_counter()
        acquired = self._lock.acquire(timeout=0.001)
        if not acquired:
            self._lock.acquire()  # Wait
        # The counters below are updated while the lock is held.
        self.acquisitions += 1
        wait = time.perf_counter() - start
        if wait > 0.001:  # Only count waits longer than 1 ms as contention
            self.wait_time += wait
        return self

    def __exit__(self, *args):
        self._lock.release()

# Use it in place of threading.Lock()
# After benchmarking, print contention stats
Rule of thumb: if lock wait time exceeds 10% of total execution time, optimize.
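The 10% rule needs a ratio to compare against. The sketch below is one possible way to compute it, under two assumptions that are mine rather than the article's: "total execution time" is taken as the sum of each thread's run time, and every acquisition's wait is counted, including uncontended ones. `TimedLock` and `contention_ratio` are hypothetical names.

```python
import threading
import time


class TimedLock:
    """Lock wrapper that accumulates time spent waiting to acquire."""

    def __init__(self):
        self._lock = threading.Lock()
        self.acquisitions = 0
        self.wait_time = 0.0

    def __enter__(self):
        start = time.perf_counter()
        self._lock.acquire()
        # Counters are updated while holding the lock, so they stay consistent.
        self.wait_time += time.perf_counter() - start
        self.acquisitions += 1
        return self

    def __exit__(self, *exc):
        self._lock.release()


def contention_ratio(lock, total_thread_seconds):
    """Fraction of summed thread time that was spent waiting on the lock."""
    if total_thread_seconds <= 0:
        return 0.0
    return lock.wait_time / total_thread_seconds


lock = TimedLock()
shared = {"count": 0}
thread_seconds = []  # one entry per worker thread


def worker():
    start = time.perf_counter()
    for _ in range(2000):
        with lock:
            shared["count"] += 1
    thread_seconds.append(time.perf_counter() - start)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

ratio = contention_ratio(lock, sum(thread_seconds))
print(f"{lock.acquisitions} acquisitions, {ratio:.1%} of thread time waiting")
if ratio > 0.10:
    print("Lock wait exceeds 10%: a candidate for finer-grained locking")
```

The timing calls add overhead of their own, so treat the ratio as a rough signal and remove the wrapper once the measurement is done.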
Key Takeaways
- Start with coarse-grained locking (simple, correct).
- Measure contention with profiling; optimize only if needed.
- Fine-grained locking: multiple locks, one per shard or object.
- Read-write locks: for read-heavy workloads (readers don't block each other).
- Optimistic locking: for low-contention scenarios; retry on conflict.
- Striped locks: for dictionaries and caches (fixed-size lock array, hashed keys).
Frequently Asked Questions
Is fine-grained locking worth the complexity?
Only if profiling shows lock contention exceeds 10% of execution time. For most applications, coarse-grained locking is sufficient. Premature optimization is the root of all evil.
How many shards/stripes should I use?
Rule of thumb: 2-4x the expected number of threads. For 8 cores, use 16-32 shards. Too few shards = high contention; too many = overhead and wasted memory.
What about lock-free algorithms in Python?
Pure Python doesn't expose atomic operations (CAS). For true lock-free, use Cython, Rust FFI, or compiled C extensions. Most Python code doesn't need lock-free; sharded locks are fast enough.
Can I use a global lock to coordinate?
Yes, but it becomes a bottleneck. If you need a global lock for correctness, reconsider the design. Typically, global locks are for initialization (once at startup) or rare operations (not in hot paths).
How do I avoid deadlock with fine-grained locking?
Acquire locks in a consistent global order. If you have locks A, B, C, always acquire in the order A→B→C (never C→A→B). Use a code review checklist to enforce this.
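A checklist can be backed by code that makes the order hard to get wrong. The sketch below assumes each lockable object carries a fixed rank assigned at creation; `Account`, `acquire_in_order`, and `transfer` are hypothetical names used for illustration. Two threads transfer in opposite directions, which would deadlock if each simply locked its source first.

```python
import threading
from contextlib import ExitStack, contextmanager


class Account:
    """Hypothetical resource with its own lock and a fixed ordering rank."""

    _next_rank = 0
    _rank_lock = threading.Lock()

    def __init__(self, balance):
        with Account._rank_lock:
            self.rank = Account._next_rank
            Account._next_rank += 1
        self.balance = balance
        self.lock = threading.Lock()


@contextmanager
def acquire_in_order(*accounts):
    """Acquire each account's lock in ascending rank; release in reverse.

    Callers must pass distinct accounts: threading.Lock is not reentrant.
    """
    with ExitStack() as stack:
        for account in sorted(accounts, key=lambda a: a.rank):
            stack.enter_context(account.lock)
        yield


def transfer(src, dst, amount):
    with acquire_in_order(src, dst):
        src.balance -= amount
        dst.balance += amount


a, b = Account(1000), Account(1000)


def worker(src, dst):
    for _ in range(1000):
        transfer(src, dst, 1)


threads = [threading.Thread(target=worker, args=(a, b)),
           threading.Thread(target=worker, args=(b, a))]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(a.balance, b.balance)  # Opposing transfers cancel out: 1000 1000
```

Because both threads sort by rank before locking, they always take a's lock before b's regardless of transfer direction, so neither can hold one lock while waiting for the other.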