Avoiding Common Threading Pitfalls (2026)
Common threading mistakes silently corrupt data, leak resources, or cause mysterious hangs. Uncaught exceptions in threads are swallowed (not raised in the main thread), daemon threads are abruptly terminated, and shared mutable state is modified without locks. This article catalogs the pitfalls and shows you the correct patterns, saving you weeks of debugging.
I once released a service with a daemon thread that was supposed to flush data to disk before shutdown. It was terminated mid-flush, corrupting the database. A simple flag and graceful shutdown pattern would have prevented it entirely.
Pitfall 1: Uncaught Exceptions in Threads
When an exception is raised in a thread, it is caught by the thread's internal exception handler and logged to stderr—it does NOT propagate to the main thread. The thread silently exits, and the main program may continue, not realizing work was lost:
import threading
import time
def buggy_worker():
"""A worker that crashes without the main thread knowing."""
print("Worker starting")
time.sleep(1)
raise ValueError("Oops! Worker crashed")
print("This line never executes")
# Start the worker
thread = threading.Thread(target=buggy_worker)
thread.start()
# Main thread continues normally
print("Main thread continues...")
time.sleep(3)
print("Main thread done")
# Output:
# Worker starting
# Main thread continues...
# Exception in thread Thread-1:
# Traceback (most recent call last):
# ...
# ValueError: Oops! Worker crashed
# Main thread done
The exception is printed but the main thread doesn't know the worker failed. The correct pattern is to catch exceptions in the worker, store them, and check them in the main thread:
import threading
import time
class WorkerThread(threading.Thread):
"""A thread that captures exceptions."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.exception = None
def run(self):
try:
super().run()
except Exception as e:
self.exception = e
def worker():
print("Worker starting")
time.sleep(1)
raise ValueError("Oops! Worker crashed")
thread = WorkerThread(target=worker)
thread.start()
thread.join()
# Check if the worker failed
if thread.exception:
print(f"Worker failed with exception: {thread.exception}")
else:
print("Worker succeeded")
# Output: Worker failed with exception: Oops! Worker crashed
For ThreadPoolExecutor, exceptions are captured in the Future and re-raised when you call result(), so you get automatic handling.
Pitfall 2: Daemon Thread Resource Leaks
Daemon threads are terminated abruptly when the main program exits. If a daemon thread is in the middle of updating a file or database, data can be corrupted:
import threading
import time
def buggy_daemon():
"""A daemon that performs important cleanup."""
while True:
# Simulate a critical operation
print("Daemon: updating important data...")
time.sleep(1)
# If the program exits here, the update is incomplete
daemon = threading.Thread(target=buggy_daemon, daemon=True)
daemon.start()
print("Main program running...")
time.sleep(2.5)
print("Main program exiting")
# Output:
# Main program running...
# Daemon: updating important data...
# Daemon: updating important data...
# Main program exiting
# (daemon is terminated mid-operation)
The correct pattern is to use non-daemon threads for critical operations and ensure graceful shutdown:
import threading
import time
class GracefulDaemon(threading.Thread):
"""A daemon that shuts down cleanly."""
def __init__(self):
super().__init__(daemon=True)
self.stop_event = threading.Event()
def run(self):
while not self.stop_event.is_set():
print("Daemon: updating important data...")
# Do work here
time.sleep(1)
# Check for stop signal
if self.stop_event.is_set():
print("Daemon: cleanup complete, exiting")
break
def stop(self):
self.stop_event.set()
daemon = GracefulDaemon()
daemon.start()
print("Main program running...")
time.sleep(2.5)
print("Main program requesting daemon stop...")
daemon.stop()
daemon.join(timeout=5) # Wait for graceful shutdown
if daemon.is_alive():
print("Warning: daemon did not shut down cleanly")
else:
print("Daemon shut down successfully")
# Output:
# Main program running...
# Daemon: updating important data...
# Daemon: updating important data...
# Main program requesting daemon stop...
# Daemon: cleanup complete, exiting
# Daemon shut down successfully
Pitfall 3: Shared Mutable State Without Locks
Modifying shared data from multiple threads without synchronization causes race conditions:
import threading
shared_list = []
def buggy_append(item):
"""Append to a shared list without synchronization."""
# This is NOT atomic; multiple threads can interleave
shared_list.append(item)
threads = [
threading.Thread(target=buggy_append, args=(i,))
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Length: {len(shared_list)}") # Usually 10, but order varies
Although list.append() appears atomic, it's not thread-safe for complex operations. The correct pattern is to protect shared state with a lock:
import threading
shared_list = []
list_lock = threading.Lock()
def safe_append(item):
"""Append to a shared list with synchronization."""
with list_lock:
shared_list.append(item)
threads = [
threading.Thread(target=safe_append, args=(i,))
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Length: {len(shared_list)}") # Always 10, consistent
Pitfall 4: Lock Ordering and Deadlock
Acquiring locks in different orders from different threads can cause deadlock:
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1_wrong():
with lock_a:
print("Thread 1: acquired A")
time.sleep(0.1) # Give thread 2 time to acquire B
with lock_b:
print("Thread 1: acquired A and B")
def thread_2_wrong():
with lock_b:
print("Thread 2: acquired B")
time.sleep(0.1)
with lock_a:
print("Thread 2: acquired B and A")
# This will DEADLOCK
t1 = threading.Thread(target=thread_1_wrong)
t2 = threading.Thread(target=thread_2_wrong)
t1.start()
t2.start()
# Deadlock: T1 waits for B (held by T2); T2 waits for A (held by T1)
t1.join(timeout=2)
t2.join(timeout=2)
if t1.is_alive() or t2.is_alive():
print("DEADLOCK DETECTED")
The correct pattern is to always acquire locks in the same order:
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1_correct():
with lock_a:
print("Thread 1: acquired A")
time.sleep(0.1)
with lock_b:
print("Thread 1: acquired A and B")
def thread_2_correct():
# ALWAYS acquire in the same order: A, then B
with lock_a:
print("Thread 2: acquired A")
time.sleep(0.1)
with lock_b:
print("Thread 2: acquired A and B")
t1 = threading.Thread(target=thread_1_correct)
t2 = threading.Thread(target=thread_2_correct)
t1.start()
t2.start()
t1.join(timeout=2)
t2.join(timeout=2)
if t1.is_alive() or t2.is_alive():
print("DEADLOCK DETECTED")
else:
print("No deadlock: threads completed successfully")
Pitfall 5: Incomplete Shutdown
Not properly cleaning up threads can leave them running after the program "exits":
import threading
import time
def background_worker():
while True:
print("Worker: doing work...")
time.sleep(1)
# Buggy: start a non-daemon thread and don't join it
thread = threading.Thread(target=background_worker) # daemon=False (default)
thread.start()
print("Main: leaving main()")
# The program will hang here because thread is still running
The correct pattern is to join all non-daemon threads and send stop signals to daemon threads:
import threading
import time
def background_worker(stop_event):
while not stop_event.is_set():
print("Worker: doing work...")
time.sleep(1)
stop_event = threading.Event()
thread = threading.Thread(target=background_worker, args=(stop_event,), daemon=True)
thread.start()
print("Main: running...")
time.sleep(3)
print("Main: stopping worker...")
stop_event.set()
thread.join(timeout=5)
print("Main: exiting normally")
Pitfall 6: Holding Locks During I/O
Holding a lock while performing I/O (network, file, database) blocks other threads unnecessarily:
import threading
import time
data_lock = threading.Lock()
data = {}
def buggy_update(key, value):
"""WRONG: holds lock while doing I/O."""
with data_lock:
print(f"Fetching data for {key}...")
time.sleep(2) # Simulate I/O; all other threads blocked!
data[key] = value
# Only one thread can do I/O at a time; very slow
threads = [
threading.Thread(target=buggy_update, args=(i, i*10))
for i in range(4)
]
start = time.perf_counter()
for t in threads:
t.start()
for t in threads:
t.join()
elapsed = time.perf_counter() - start
print(f"Buggy: {elapsed:.1f}s (4 threads * 2s each = serial execution)")
The correct pattern is to minimize the critical section:
import threading
import time
data_lock = threading.Lock()
data = {}
def correct_update(key, value):
"""RIGHT: does I/O outside the lock."""
print(f"Fetching data for {key}...")
result = time.sleep(2) # I/O happens WITHOUT the lock
# Only hold the lock while updating shared state
with data_lock:
data[key] = value
threads = [
threading.Thread(target=correct_update, args=(i, i*10))
for i in range(4)
]
start = time.perf_counter()
for t in threads:
t.start()
for t in threads:
t.join()
elapsed = time.perf_counter() - start
print(f"Correct: {elapsed:.1f}s (I/O overlaps; ~2s total)")
Comparison Table: Anti-Patterns vs. Fixes
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Uncaught exceptions | Worker failure silent | Capture in thread subclass, check exception |
| Daemon threads with cleanup | Data corruption | Use stop_event, graceful shutdown |
| No locks on shared state | Race conditions, data corruption | Use Lock or Queue |
| Lock ordering inconsistency | Deadlock | Always acquire locks in the same order |
Missing join() on threads | Program hangs | Always join() or use with ThreadPoolExecutor |
| I/O inside critical section | Contention, slow | Do I/O outside lock, then acquire for update |
Key Takeaways
- Capture exceptions in threads and check them after
join(). - Use stop signals (
Event) for graceful daemon shutdown, not abrupt termination. - Always protect shared mutable state with locks.
- Acquire locks in a consistent order to prevent deadlock.
- Minimize critical sections; do I/O outside locks.
- Always
join()non-daemon threads or useThreadPoolExecutorfor automatic cleanup.
Frequently Asked Questions
How do I know if a thread exited due to an exception?
Use a custom Thread subclass that captures exceptions, or check the exception attribute after join().
Can I force-kill a stuck thread?
No, Python doesn't provide a way to forcefully kill a thread. Use stop_event flags and timeout on join().
What's the best way to pass data from a worker thread back to the main thread?
Use a queue.Queue (thread-safe) or a protected variable with a lock. Queue is generally cleaner.
How do I test if my threading code has race conditions?
Run the same test thousands of times in a tight loop and look for inconsistent results. Use stress testing with many threads and random sleep() calls to vary timing.
Is it safe to modify a list from multiple threads?
list.append() is atomic in CPython, but more complex operations (read-modify-write) are not. Always use locks for shared mutable state unless you're sure the operation is atomic.