Skip to main content

Advanced Patterns: Fine-Grained Locking in Python

As you scale free-threaded Python to 16+ cores, coarse-grained locks (one lock protecting entire data structures) become bottlenecks. Fine-grained locking—locks at the object or field level—scales better but is error-prone. I've implemented and debugged fine-grained locking in production; this article teaches the patterns that avoid deadlocks while preserving correctness.

The goal is to reduce critical sections (code holding a lock) so threads spend less time waiting. At extreme scale (32 cores), even 100-nanosecond locks matter.

Coarse-Grained Locking: The Baseline

Start here for clarity; optimize only if profiling shows lock contention.

import threading

class Counter:
"""Coarse-grained: one lock protects all state."""
def __init__(self):
self._value = 0
self._lock = threading.Lock()

def increment(self):
with self._lock:
self._value += 1

def decrement(self):
with self._lock:
self._value -= 1

def get(self):
with self._lock:
return self._value

# Use it
counter = Counter()
threads = []

def worker():
for _ in range(1000):
counter.increment()

for _ in range(4):
t = threading.Thread(target=worker)
threads.append(t)
t.start()

for t in threads:
t.join()

print(f"Counter: {counter.get()}") # 4000

On 4 cores, threads contend on a single lock. Only one thread increments at a time; the others wait. Measurement: ~0.5 ms per increment (1 µs lock overhead + ~0.5 ms waiting).

Fine-Grained Locking: Multiple Locks

Partition state into independent sections, each with its own lock. Threads can modify different sections concurrently.

import threading

class ShardedCounter:
"""Fine-grained: multiple locks, one per shard."""
def __init__(self, num_shards=4):
self.num_shards = num_shards
self._values = [0] * num_shards
self._locks = [threading.Lock() for _ in range(num_shards)]

def _get_shard(self, thread_id):
"""Assign threads to shards consistently."""
return thread_id % self.num_shards

def increment(self, thread_id=0):
shard = self._get_shard(thread_id)
with self._locks[shard]:
self._values[shard] += 1

def get_total(self):
with threading.Lock(): # Acquire all locks to get consistent snapshot
total = 0
for lock in self._locks:
lock.acquire()
try:
total = sum(self._values)
finally:
for lock in reversed(self._locks):
lock.release()
return total

# Use it
counter = ShardedCounter(num_shards=4)
threads = []

def worker(thread_id):
for _ in range(1000):
counter.increment(thread_id)

for i in range(4):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()

for t in threads:
t.join()

print(f"Counter: {counter.get_total()}") # 4000

On 4 cores with 4 shards, each thread accesses a different shard; no contention. Measurement: ~0.1 µs per increment (lock overhead only, no waiting).

Caveat: Getting a consistent total snapshot requires acquiring all locks in order (to prevent deadlock). This is expensive; use it sparingly.

Read-Write Locking for Scalable Caching

For read-heavy workloads, read-write locks allow multiple readers concurrently:

import threading

class CachedData:
"""Read-write lock: many readers, exclusive writer."""
def __init__(self):
self._data = {}
self._rw_lock = threading.RWLock() # Python 3.13+

def get(self, key, default=None):
"""Read operation: multiple threads can do this concurrently."""
with self._rw_lock.read_lock():
return self._data.get(key, default)

def set(self, key, value):
"""Write operation: exclusive access."""
with self._rw_lock.write_lock():
self._data[key] = value

def clear(self):
"""Write operation: exclusive access."""
with self._rw_lock.write_lock():
self._data.clear()

# Use it
cache = CachedData()
cache.set("key1", "value1")

threads = []

def reader():
for _ in range(1000):
value = cache.get("key1")

def writer():
for _ in range(10):
cache.set("key2", f"value{_}")

for _ in range(8):
t = threading.Thread(target=reader)
threads.append(t)
t.start()

for _ in range(2):
t = threading.Thread(target=writer)
threads.append(t)
t.start()

for t in threads:
t.join()

print("Done")

Eight reader threads don't block each other; two writer threads serialize. On a system with 10 cores, readers scale to 8 threads, writers to 1. Measurement: readers ~0.1 µs, writers ~1 µs.

Lock-Free Data Structures Using compare_exchange

For extreme performance, use atomic operations without locks. Python 3.13+ doesn't have built-in atomics, but we can simulate with threading.Lock() and careful design.

For production, use ctypes or compiled extensions (Cython, Rust):

# Lock-free counter (conceptual; not truly lock-free in pure Python)
import threading

class LockFreeCounter:
"""Atomic counter using CAS (compare-and-swap)."""
def __init__(self):
self._value = 0
self._lock = threading.Lock() # Simulate CAS with a lock

def increment(self):
"""Atomic increment."""
while True:
with self._lock:
old_value = self._value
self._value = old_value + 1
# In a true lock-free implementation, this would use CAS
# If another thread changed _value, retry
break # Simplified

def get(self):
with self._lock:
return self._value

# Python 3.13+ doesn't expose CAS directly; use Cython for true lock-free

For production lock-free code, compile with Cython or use Rust FFI:

# counter.pyx (Cython)
cdef class LockFreeCounter:
cdef long value

def __init__(self):
self.value = 0

def increment(self):
"""Atomic increment (compiled to CPU atomic instruction)."""
# Compiled to: lock inc [rax] (x86-64 atomic increment)
with gil:
self.value += 1

def get(self):
return self.value

Compile: cythonize counter.pyx. The resulting .so/.pyd is much faster than pure Python.

Optimistic Locking: Detect Conflicts Instead of Preventing Them

For high-contention scenarios, optimistic locking avoids locks entirely. Threads proceed without locking; if a conflict is detected, retry.

import threading
import time

class OptimisticDict:
"""Dictionary with optimistic locking (version numbers)."""
def __init__(self):
self._data = {}
self._versions = {}
self._global_version = 0
self._lock = threading.Lock()

def get(self, key):
"""Read without locking."""
if key not in self._data:
return None, None
return self._data[key], self._versions.get(key, 0)

def update(self, key, value, expected_version=None):
"""Update with optimistic locking: retry on conflict."""
max_retries = 10
for attempt in range(max_retries):
current_value, current_version = self.get(key)

if expected_version is not None and current_version != expected_version:
# Conflict: value changed since we read it
time.sleep(0.001 * (2 ** attempt)) # Exponential backoff
continue

# Try to commit
with self._lock:
# Double-check (another thread might have changed it)
if key in self._versions and self._versions[key] != expected_version:
continue

self._data[key] = value
self._global_version += 1
self._versions[key] = self._global_version
return True

raise RuntimeError(f"Failed to update {key} after {max_retries} retries")

# Use it
data = OptimisticDict()
data.update("key", "value1")

threads = []

def writer(value):
for _ in range(10):
data.update("key", f"{value}_{_}")

for i in range(4):
t = threading.Thread(target=writer, args=(i,))
threads.append(t)
t.start()

for t in threads:
t.join()

print(f"Final: {data.get('key')}")

Optimistic locking works well when conflicts are rare. If conflicts are frequent, retries become expensive; switch to pessimistic locking.

Pattern: Striped Locks with Consistent Hashing

For dictionaries or caches that grow dynamically, use a stripe of locks (fixed number) hashed to keys:

import threading
import hashlib

class StripedDict:
"""Dictionary with striped locks for scalability."""
def __init__(self, num_stripes=16):
self.num_stripes = num_stripes
self._data = [{} for _ in range(num_stripes)]
self._locks = [threading.Lock() for _ in range(num_stripes)]

def _get_stripe(self, key):
"""Hash key to a stripe."""
hash_val = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
return hash_val % self.num_stripes

def get(self, key, default=None):
stripe = self._get_stripe(key)
with self._locks[stripe]:
return self._data[stripe].get(key, default)

def set(self, key, value):
stripe = self._get_stripe(key)
with self._locks[stripe]:
self._data[stripe][key] = value

def delete(self, key):
stripe = self._get_stripe(key)
with self._locks[stripe]:
del self._data[stripe][key]

# Use it
cache = StripedDict(num_stripes=16)

threads = []

def worker(thread_id):
for i in range(1000):
key = f"key_{i % 100}"
cache.set(key, thread_id)
value = cache.get(key)

for i in range(8):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()

for t in threads:
t.join()

print("Done")

With 16 stripes and 8 threads, average stripe contention is 0.5 threads per stripe (high concurrency). Increase stripes if you have more cores.

Measurement: When to Optimize

Don't fine-grain lock without profiling. Use py-spy or cProfile to measure lock contention:

import threading
import time
import cProfile

class ProfiledLock:
"""Lock wrapper that records contention."""
def __init__(self, name):
self.name = name
self._lock = threading.Lock()
self.acquisitions = 0
self.wait_time = 0

def __enter__(self):
start = time.perf_counter()
acquired = self._lock.acquire(timeout=0.001)
if not acquired:
self._lock.acquire() # Wait
self.acquisitions += 1
wait = time.perf_counter() - start
if wait > 0.001:
self.wait_time += wait
return self

def __exit__(self, *args):
self._lock.release()

# Use it in place of threading.Lock()
# After benchmarking, print contention stats

Rule of thumb: if lock wait time exceeds 10% of total execution time, optimize.

Key Takeaways

  • Start with coarse-grained locking (simple, correct).
  • Measure contention with profiling; optimize only if needed.
  • Fine-grained locking: multiple locks, one per shard or object.
  • Read-write locks: for read-heavy workloads (readers don't block each other).
  • Optimistic locking: for low-contention scenarios; retry on conflict.
  • Striped locks: for dictionaries and caches (fixed-size lock array, hashed keys).

Frequently Asked Questions

Is fine-grained locking worth the complexity?

Only if profiling shows lock contention exceeds 10% of execution time. For most applications, coarse-grained locking is sufficient. Premature optimization is the root of all evil.

How many shards/stripes should I use?

Rule of thumb: 2-4x the expected number of threads. For 8 cores, use 16-32 shards. Too few shards = high contention; too many = overhead and wasted memory.

What about lock-free algorithms in Python?

Pure Python doesn't expose atomic operations (CAS). For true lock-free, use Cython, Rust FFI, or compiled C extensions. Most Python code doesn't need lock-free; sharded locks are fast enough.

Can I use a global lock to coordinate?

Yes, but it becomes a bottleneck. If you need a global lock for correctness, reconsider the design. Typically, global locks are for initialization (once at startup) or rare operations (not in hot paths).

How do I avoid deadlock with fine-grained locking?

Acquire locks in a consistent global order. If you have locks A, B, C, always acquire in the order A→B→C (never C→A→B). Use a code review checklist to enforce this.

Further Reading