Skip to main content

Avoiding Common Threading Pitfalls (2026)

Common threading mistakes silently corrupt data, leak resources, or cause mysterious hangs. Uncaught exceptions in threads are swallowed (not raised in the main thread), daemon threads are abruptly terminated, and shared mutable state is modified without locks. This article catalogs the pitfalls and shows you the correct patterns, saving you weeks of debugging.

I once released a service with a daemon thread that was supposed to flush data to disk before shutdown. It was terminated mid-flush, corrupting the database. A simple flag and graceful shutdown pattern would have prevented it entirely.

Pitfall 1: Uncaught Exceptions in Threads

When an exception is raised in a thread, it is caught by the thread's internal exception handler and logged to stderr—it does NOT propagate to the main thread. The thread silently exits, and the main program may continue, not realizing work was lost:

import threading
import time

def buggy_worker():
"""A worker that crashes without the main thread knowing."""
print("Worker starting")
time.sleep(1)
raise ValueError("Oops! Worker crashed")
print("This line never executes")

# Start the worker
thread = threading.Thread(target=buggy_worker)
thread.start()

# Main thread continues normally
print("Main thread continues...")
time.sleep(3)
print("Main thread done")

# Output:
# Worker starting
# Main thread continues...
# Exception in thread Thread-1:
# Traceback (most recent call last):
# ...
# ValueError: Oops! Worker crashed
# Main thread done

The exception is printed but the main thread doesn't know the worker failed. The correct pattern is to catch exceptions in the worker, store them, and check them in the main thread:

import threading
import time

class WorkerThread(threading.Thread):
"""A thread that captures exceptions."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.exception = None

def run(self):
try:
super().run()
except Exception as e:
self.exception = e

def worker():
print("Worker starting")
time.sleep(1)
raise ValueError("Oops! Worker crashed")

thread = WorkerThread(target=worker)
thread.start()
thread.join()

# Check if the worker failed
if thread.exception:
print(f"Worker failed with exception: {thread.exception}")
else:
print("Worker succeeded")

# Output: Worker failed with exception: Oops! Worker crashed

For ThreadPoolExecutor, exceptions are captured in the Future and re-raised when you call result(), so you get automatic handling.

Pitfall 2: Daemon Thread Resource Leaks

Daemon threads are terminated abruptly when the main program exits. If a daemon thread is in the middle of updating a file or database, data can be corrupted:

import threading
import time

def buggy_daemon():
"""A daemon that performs important cleanup."""
while True:
# Simulate a critical operation
print("Daemon: updating important data...")
time.sleep(1)
# If the program exits here, the update is incomplete

daemon = threading.Thread(target=buggy_daemon, daemon=True)
daemon.start()

print("Main program running...")
time.sleep(2.5)
print("Main program exiting")
# Output:
# Main program running...
# Daemon: updating important data...
# Daemon: updating important data...
# Main program exiting
# (daemon is terminated mid-operation)

The correct pattern is to use non-daemon threads for critical operations and ensure graceful shutdown:

import threading
import time

class GracefulDaemon(threading.Thread):
"""A daemon that shuts down cleanly."""
def __init__(self):
super().__init__(daemon=True)
self.stop_event = threading.Event()

def run(self):
while not self.stop_event.is_set():
print("Daemon: updating important data...")
# Do work here
time.sleep(1)
# Check for stop signal
if self.stop_event.is_set():
print("Daemon: cleanup complete, exiting")
break

def stop(self):
self.stop_event.set()

daemon = GracefulDaemon()
daemon.start()

print("Main program running...")
time.sleep(2.5)

print("Main program requesting daemon stop...")
daemon.stop()
daemon.join(timeout=5) # Wait for graceful shutdown

if daemon.is_alive():
print("Warning: daemon did not shut down cleanly")
else:
print("Daemon shut down successfully")

# Output:
# Main program running...
# Daemon: updating important data...
# Daemon: updating important data...
# Main program requesting daemon stop...
# Daemon: cleanup complete, exiting
# Daemon shut down successfully

Pitfall 3: Shared Mutable State Without Locks

Modifying shared data from multiple threads without synchronization causes race conditions:

import threading

shared_list = []

def buggy_append(item):
"""Append to a shared list without synchronization."""
# This is NOT atomic; multiple threads can interleave
shared_list.append(item)

threads = [
threading.Thread(target=buggy_append, args=(i,))
for i in range(10)
]

for t in threads:
t.start()
for t in threads:
t.join()

print(f"Length: {len(shared_list)}") # Usually 10, but order varies

Although list.append() appears atomic, it's not thread-safe for complex operations. The correct pattern is to protect shared state with a lock:

import threading

shared_list = []
list_lock = threading.Lock()

def safe_append(item):
"""Append to a shared list with synchronization."""
with list_lock:
shared_list.append(item)

threads = [
threading.Thread(target=safe_append, args=(i,))
for i in range(10)
]

for t in threads:
t.start()
for t in threads:
t.join()

print(f"Length: {len(shared_list)}") # Always 10, consistent

Pitfall 4: Lock Ordering and Deadlock

Acquiring locks in different orders from different threads can cause deadlock:

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_1_wrong():
with lock_a:
print("Thread 1: acquired A")
time.sleep(0.1) # Give thread 2 time to acquire B
with lock_b:
print("Thread 1: acquired A and B")

def thread_2_wrong():
with lock_b:
print("Thread 2: acquired B")
time.sleep(0.1)
with lock_a:
print("Thread 2: acquired B and A")

# This will DEADLOCK
t1 = threading.Thread(target=thread_1_wrong)
t2 = threading.Thread(target=thread_2_wrong)

t1.start()
t2.start()

# Deadlock: T1 waits for B (held by T2); T2 waits for A (held by T1)
t1.join(timeout=2)
t2.join(timeout=2)

if t1.is_alive() or t2.is_alive():
print("DEADLOCK DETECTED")

The correct pattern is to always acquire locks in the same order:

import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_1_correct():
with lock_a:
print("Thread 1: acquired A")
time.sleep(0.1)
with lock_b:
print("Thread 1: acquired A and B")

def thread_2_correct():
# ALWAYS acquire in the same order: A, then B
with lock_a:
print("Thread 2: acquired A")
time.sleep(0.1)
with lock_b:
print("Thread 2: acquired A and B")

t1 = threading.Thread(target=thread_1_correct)
t2 = threading.Thread(target=thread_2_correct)

t1.start()
t2.start()

t1.join(timeout=2)
t2.join(timeout=2)

if t1.is_alive() or t2.is_alive():
print("DEADLOCK DETECTED")
else:
print("No deadlock: threads completed successfully")

Pitfall 5: Incomplete Shutdown

Not properly cleaning up threads can leave them running after the program "exits":

import threading
import time

def background_worker():
while True:
print("Worker: doing work...")
time.sleep(1)

# Buggy: start a non-daemon thread and don't join it
thread = threading.Thread(target=background_worker) # daemon=False (default)
thread.start()

print("Main: leaving main()")
# The program will hang here because thread is still running

The correct pattern is to join all non-daemon threads and send stop signals to daemon threads:

import threading
import time

def background_worker(stop_event):
while not stop_event.is_set():
print("Worker: doing work...")
time.sleep(1)

stop_event = threading.Event()
thread = threading.Thread(target=background_worker, args=(stop_event,), daemon=True)
thread.start()

print("Main: running...")
time.sleep(3)

print("Main: stopping worker...")
stop_event.set()
thread.join(timeout=5)

print("Main: exiting normally")

Pitfall 6: Holding Locks During I/O

Holding a lock while performing I/O (network, file, database) blocks other threads unnecessarily:

import threading
import time

data_lock = threading.Lock()
data = {}

def buggy_update(key, value):
"""WRONG: holds lock while doing I/O."""
with data_lock:
print(f"Fetching data for {key}...")
time.sleep(2) # Simulate I/O; all other threads blocked!
data[key] = value

# Only one thread can do I/O at a time; very slow
threads = [
threading.Thread(target=buggy_update, args=(i, i*10))
for i in range(4)
]

start = time.perf_counter()
for t in threads:
t.start()
for t in threads:
t.join()
elapsed = time.perf_counter() - start
print(f"Buggy: {elapsed:.1f}s (4 threads * 2s each = serial execution)")

The correct pattern is to minimize the critical section:

import threading
import time

data_lock = threading.Lock()
data = {}

def correct_update(key, value):
"""RIGHT: does I/O outside the lock."""
print(f"Fetching data for {key}...")
result = time.sleep(2) # I/O happens WITHOUT the lock

# Only hold the lock while updating shared state
with data_lock:
data[key] = value

threads = [
threading.Thread(target=correct_update, args=(i, i*10))
for i in range(4)
]

start = time.perf_counter()
for t in threads:
t.start()
for t in threads:
t.join()
elapsed = time.perf_counter() - start
print(f"Correct: {elapsed:.1f}s (I/O overlaps; ~2s total)")

Comparison Table: Anti-Patterns vs. Fixes

Anti-PatternProblemFix
Uncaught exceptionsWorker failure silentCapture in thread subclass, check exception
Daemon threads with cleanupData corruptionUse stop_event, graceful shutdown
No locks on shared stateRace conditions, data corruptionUse Lock or Queue
Lock ordering inconsistencyDeadlockAlways acquire locks in the same order
Missing join() on threadsProgram hangsAlways join() or use with ThreadPoolExecutor
I/O inside critical sectionContention, slowDo I/O outside lock, then acquire for update

Key Takeaways

  • Capture exceptions in threads and check them after join().
  • Use stop signals (Event) for graceful daemon shutdown, not abrupt termination.
  • Always protect shared mutable state with locks.
  • Acquire locks in a consistent order to prevent deadlock.
  • Minimize critical sections; do I/O outside locks.
  • Always join() non-daemon threads or use ThreadPoolExecutor for automatic cleanup.

Frequently Asked Questions

How do I know if a thread exited due to an exception?

Use a custom Thread subclass that captures exceptions, or check the exception attribute after join().

Can I force-kill a stuck thread?

No, Python doesn't provide a way to forcefully kill a thread. Use stop_event flags and timeout on join().

What's the best way to pass data from a worker thread back to the main thread?

Use a queue.Queue (thread-safe) or a protected variable with a lock. Queue is generally cleaner.

How do I test if my threading code has race conditions?

Run the same test thousands of times in a tight loop and look for inconsistent results. Use stress testing with many threads and random sleep() calls to vary timing.

Is it safe to modify a list from multiple threads?

list.append() is atomic in CPython, but more complex operations (read-modify-write) are not. Always use locks for shared mutable state unless you're sure the operation is atomic.

Further Reading