Context Variables and Thread-Local Storage (2026)

Thread-local storage (TLS) allows each thread to maintain its own isolated copy of a variable. Context variables extend this pattern to work across threads and async tasks, enabling clean propagation of request-specific state (user ID, request ID, etc.) without passing parameters through every function call. This article covers both patterns and shows when to use each.

I once debugged a web server where database connections were being used by the wrong user due to improper state management. Thread-local storage fixed it: each thread got its own connection, which eliminated the cross-thread contamination. This article teaches you that pattern and more.

Thread-Local Storage: The Basics

threading.local() creates an object where each thread sees a separate value:

import threading
import time

# Shared object that looks the same to all threads
thread_local_storage = threading.local()

def worker(worker_id):
    """Each thread gets its own thread_local_storage.value."""
    # Set value in this thread's storage
    thread_local_storage.value = worker_id
    print(f"Worker {worker_id}: set value = {thread_local_storage.value}")

    time.sleep(1)

    # The value is still there, isolated from other threads
    print(f"Worker {worker_id}: value is still {thread_local_storage.value}")

threads = [
    threading.Thread(target=worker, args=(i,))
    for i in range(3)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

# Output (line order may vary between runs):
# Worker 0: set value = 0
# Worker 1: set value = 1
# Worker 2: set value = 2
# Worker 0: value is still 0
# Worker 1: value is still 1
# Worker 2: value is still 2

Each thread sees its own value attribute. If thread 1 sets value = 1 and thread 2 sets value = 2, they don't interfere with each other—each thread's value is isolated.
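The other side of that isolation is worth seeing too: an attribute assigned in a worker thread does not exist in the main thread at all. The following is a minimal sketch; the names `storage` and `seen_in_worker` are made up for this illustration.

```python
import threading

storage = threading.local()   # hypothetical name for this sketch
seen_in_worker = []

def worker():
    # This assignment is visible only inside the worker thread.
    storage.value = "set by worker"
    seen_in_worker.append(storage.value)

t = threading.Thread(target=worker)
t.start()
t.join()

# The main thread never assigned storage.value, so the attribute is missing here.
main_has_value = hasattr(storage, "value")
print(seen_in_worker)    # ['set by worker']
print(main_has_value)    # False
```

This is why code that reads a thread-local attribute usually guards the access with hasattr() or getattr() and a fallback.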

Use Case: Database Connections Per Thread

A classic application is maintaining a separate database connection per thread:

import threading
import time

class FakeConnection:
    """Simulate a database connection."""
    def __init__(self, conn_id):
        self.id = conn_id

    def execute(self, query):
        return f"Connection {self.id} executed: {query}"

# Thread-local storage for connections
thread_local = threading.local()
connection_counter = 0
counter_lock = threading.Lock()

def get_connection():
    """Get (or create) a per-thread connection."""
    global connection_counter

    # Check if this thread already has a connection
    if hasattr(thread_local, 'connection'):
        return thread_local.connection

    # Create a new connection for this thread
    with counter_lock:
        connection_counter += 1
        conn_id = connection_counter

    conn = FakeConnection(conn_id)
    thread_local.connection = conn
    print(f"Thread {threading.current_thread().name}: created connection {conn_id}")
    return conn

def database_task(query):
    """Execute a query using the thread's connection."""
    conn = get_connection()
    result = conn.execute(query)
    print(result)

threads = [
    threading.Thread(target=database_task, args=("SELECT * FROM users",), name=f"Worker-{i}")
    for i in range(3)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

# Example output (lines from different threads may interleave differently):
# Thread Worker-0: created connection 1
# Thread Worker-1: created connection 2
# Thread Worker-2: created connection 3
# Connection 1 executed: SELECT * FROM users
# Connection 2 executed: SELECT * FROM users
# Connection 3 executed: SELECT * FROM users

Each thread automatically gets its own connection, so there's no contention or cross-thread interference.
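The pattern pays off most when threads are reused, for example in a thread pool, because a connection created for one task is still there for the next task on the same thread. Here is a rough sketch of that, assuming a pool of two workers and using a plain object() as a stand-in for a real connection; the names `created` and `task` are illustrative only.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

thread_local = threading.local()
created = []                      # records every connection object created
created_lock = threading.Lock()

def get_connection():
    """Return this thread's connection, creating it on first use."""
    conn = getattr(thread_local, "connection", None)
    if conn is None:
        conn = object()           # stand-in for a real connection
        thread_local.connection = conn
        with created_lock:
            created.append(conn)
    return conn

def task(_):
    # Two lookups in the same thread must return the same object.
    return get_connection() is get_connection()

with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(task, range(10)))

print(all(results))        # True
print(len(created) <= 2)   # True: at most one connection per worker thread
```

Ten tasks run, but no more than two connections are created, one for each worker thread that actually picked up work.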

Context Variables: Advanced State Management

Context variables are similar to thread-local storage but work correctly with both threads and async tasks. The contextvars module has been part of the standard library since Python 3.7 and is the usual choice for request-scoped state:

import contextvars

# Define a context variable
request_id = contextvars.ContextVar('request_id', default=None)
user_id = contextvars.ContextVar('user_id', default=None)

def handle_request(req_id, usr_id):
    """Set context variables and call downstream code."""
    token1 = request_id.set(req_id)
    token2 = user_id.set(usr_id)

    print(f"Request {request_id.get()}: User {user_id.get()}")

    # Each piece of code can access these without parameters
    log_action("User logged in")

    # Reset (optional, for cleanup)
    request_id.reset(token1)
    user_id.reset(token2)

def log_action(action):
    """Nested function can access context without parameters."""
    req = request_id.get()
    usr = user_id.get()
    print(f"[Request {req}] [User {usr}] {action}")

# In a web server, each request would set context
handle_request("req-001", "user-42")
handle_request("req-002", "user-99")

Output:

Request req-001: User user-42
[Request req-001] [User user-42] User logged in
Request req-002: User user-99
[Request req-002] [User user-99] User logged in

Context variables are cleaner than passing state through function parameters and work correctly with async code (unlike plain thread-local storage).
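To make the "unlike plain thread-local storage" point concrete, here is a small sketch that stores the same request ID in both a ContextVar and a threading.local() from two concurrent asyncio tasks. The handler name `handle` and the `tls` object are invented for this example. Both tasks run on one thread, so the thread-local value is overwritten by whichever task wrote last, while each task keeps its own ContextVar value.

```python
import asyncio
import contextvars
import threading

request_id = contextvars.ContextVar("request_id", default=None)
tls = threading.local()

async def handle(req_id):
    request_id.set(req_id)      # stored in this task's own context
    tls.request_id = req_id     # stored per thread, so all tasks share it
    await asyncio.sleep(0.01)   # yield so the other task runs in between
    return req_id, request_id.get(), tls.request_id

async def main():
    return await asyncio.gather(handle("req-A"), handle("req-B"))

results = asyncio.run(main())
for expected, from_contextvar, from_tls in results:
    print(expected, from_contextvar, from_tls)
# req-A req-A req-B   <- the thread-local value was overwritten by the other task
# req-B req-B req-B
```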

Context Variables with Threading

Used from plain threads, context variables behave much like thread-local storage, because each thread runs in its own context:

import contextvars
import threading

# Define context variables
request_id = contextvars.ContextVar('request_id')
user_id = contextvars.ContextVar('user_id')

def process_request(req_id, usr_id):
    """Each thread has its own context."""
    request_id.set(req_id)
    user_id.set(usr_id)

    print(f"Thread {threading.current_thread().name}: "
          f"Request {request_id.get()}, User {user_id.get()}")

    # Call nested function; it can access context
    audit_log("Processing started")

def audit_log(message):
    """Access context without parameters."""
    print(f"[{request_id.get()}] {message}")

threads = [
    threading.Thread(target=process_request, args=("req-1", "alice"), name="Worker-1"),
    threading.Thread(target=process_request, args=("req-2", "bob"), name="Worker-2"),
]

for t in threads:
    t.start()
for t in threads:
    t.join()

Each thread maintains its own context, so request_id.get() returns the correct value in each thread.

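Context Copying: Run Threads with Parent Context

By default, a new thread does NOT inherit the context of the thread that started it; it begins with an empty context. (Async tasks differ: asyncio copies the current context when a task is created.) To give a thread access to the parent's values, copy the context explicitly and run the thread's code inside the copy: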

import contextvars
import threading

request_id = contextvars.ContextVar('request_id')

def parent_code():
    """Parent thread sets a context variable."""
    request_id.set("parent-request-001")

    # WRONG: spawn a thread; the child won't see the context
    def child_without_copy():
        # get() accepts its fallback only as a positional argument
        print(f"Child without copy: {request_id.get('NOT SET')}")

    # RIGHT: copy context to child
    def child_with_copy(ctx):
        ctx.run(lambda: print(f"Child with copy: {request_id.get()}"))

    child_ctx = contextvars.copy_context()

    t1 = threading.Thread(target=child_without_copy)
    t2 = threading.Thread(target=child_with_copy, args=(child_ctx,))

    t1.start()
    t2.start()
    t1.join()
    t2.join()

parent_code()

# Output (the two lines may appear in either order):
# Child without copy: NOT SET
# Child with copy: parent-request-001

Use contextvars.copy_context() when spawning threads that need access to parent context.
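The same idea carries over to thread pools. The sketch below assumes a ThreadPoolExecutor with a single worker and a hypothetical `read_request_id` function; it submits the function once directly and once through `Context.run` on a snapshot of the caller's context.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

request_id = contextvars.ContextVar("request_id", default="NOT SET")

def read_request_id():
    return request_id.get()

request_id.set("parent-request-001")

with ThreadPoolExecutor(max_workers=1) as pool:
    # Plain submit: the worker thread runs with its own context,
    # so on a default CPython build it typically sees only the default.
    without_copy = pool.submit(read_request_id).result()

    # Snapshot the caller's context and run the function inside it.
    ctx = contextvars.copy_context()
    with_copy = pool.submit(ctx.run, read_request_id).result()

print(without_copy)
print(with_copy)      # parent-request-001
```

Taking the snapshot at submit time matters: the worker sees the values as they were when copy_context() was called, not any later changes in the parent.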

Comparison Table: State Management Patterns

Pattern                | Scope                    | Use Case                  | Thread-Safe       | Async-Safe
Global variable        | All threads              | Shared state (needs lock) | No (without lock) | No
threading.local()      | One thread               | Per-thread state          | Yes               | No
contextvars.ContextVar | One thread or async task | Request context, logging  | Yes               | Yes
Function parameters    | One call                 | Explicit passing          | Yes               | Yes
Instance attributes    | Object instance          | OOP state                 | Depends on usage  | Depends on usage

Advanced: Cleanup with ContextVar

Sometimes you need to clean up resources when a request's context ends. ContextVar has no callback that fires on reset, so call the cleanup code explicitly, ideally in a try/finally block just before reset():

import contextvars

# ContextVar has no reset hook, so cleanup is an explicit call
request_id = contextvars.ContextVar('request_id')

def cleanup_handler():
    """Release per-request resources while the value is still set."""
    print(f"Cleaning up context for {request_id.get('unknown')}")

def handle_request(req_id):
    token = request_id.set(req_id)
    print(f"Started request {req_id}")
    try:
        pass  # Do work
    finally:
        # Clean up first, then restore the previous value
        cleanup_handler()
        request_id.reset(token)
        print("Reset context")

handle_request("req-1")
handle_request("req-2")

Context variables are designed for clean state isolation in both threaded and async code.
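One way to avoid forgetting the reset is to wrap set() and reset() in a context manager. The helper below, called `bound` here, is a hypothetical name and not part of the contextvars module; it is only a sketch of the idea.

```python
import contextvars
from contextlib import contextmanager

request_id = contextvars.ContextVar("request_id", default=None)

@contextmanager
def bound(var, value):
    """Hypothetical helper: set var for the block, restore it afterwards."""
    token = var.set(value)
    try:
        yield
    finally:
        # Runs even if the block raises, so the old value always comes back.
        var.reset(token)

with bound(request_id, "req-1"):
    inside = request_id.get()
    with bound(request_id, "req-1-retry"):
        nested = request_id.get()
    after_nested = request_id.get()
after = request_id.get()

print(inside, nested, after_nested, after)  # req-1 req-1-retry req-1 None
```

Because each token remembers the value that was in place before its set(), nested blocks unwind in the right order without extra bookkeeping.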

Key Takeaways

  • threading.local() provides per-thread storage; each thread sees a separate value.
  • Context variables extend this pattern and work correctly with both threads and async code.
  • Use thread-local storage for per-thread resources (database connections, state machines).
  • Use context variables for request-scoped data (user ID, request ID) that needs to propagate through function calls.
  • When spawning threads, use contextvars.copy_context() to inherit parent context.
  • Context variables are the modern approach; prefer them over threading.local() for new code.

Frequently Asked Questions

What happens if I access a ContextVar that hasn't been set?

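By default, it raises LookupError. You can provide a default value using the default parameter to ContextVar(), or pass a fallback as a positional argument to get(), as in get(value). The argument to get() is positional-only, so get(default=value) raises TypeError.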
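A short sketch of the three cases, using made-up variable names; note that the fallback is handed to get() as a positional argument:

```python
import contextvars

no_default = contextvars.ContextVar("no_default")
with_default = contextvars.ContextVar("with_default", default="anonymous")

# Case 1: no default declared and no value set.
try:
    no_default.get()
    outcome = "returned a value"
except LookupError:
    outcome = "LookupError"

# Case 2: fallback supplied at the call site (positional argument).
fallback = no_default.get("fallback")

# Case 3: default declared when the variable was created.
declared = with_default.get()

print(outcome)    # LookupError
print(fallback)   # fallback
print(declared)   # anonymous
```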

Can I share a ContextVar value between threads?

No, each thread/task sees its own value. If you want to share state, use a lock-protected variable or a queue.

How does ContextVar work with async/await?

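A task created with asyncio.create_task() runs in a copy of the context that was current when the task was created, so it sees the values its parent had set at that point. Changes the task makes stay in its own copy and do not leak back to the parent. This is different from threads, where context is NOT inherited by default unless explicitly copied.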
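The following sketch illustrates both halves of that behavior with a hypothetical `parent`/`child` pair: the child task reads the value the parent set, and the child's own set() does not affect the parent.

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default=None)

async def child():
    inherited = request_id.get()          # value copied from the parent task
    request_id.set("changed-in-child")    # affects only the child's copy
    return inherited

async def parent():
    request_id.set("req-001")
    inherited = await asyncio.create_task(child())
    return inherited, request_id.get()

inherited, parent_value = asyncio.run(parent())
print(inherited)      # req-001
print(parent_value)   # req-001
```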

Should I use threading.local() or contextvars.ContextVar?

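Prefer contextvars.ContextVar for new code that carries request-scoped state. It works with both threads and async code and has been available since Python 3.7. threading.local() is still a reasonable choice for resources tied to a specific thread, such as the per-thread database connection shown earlier, or when you must support Python versions older than 3.7.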

How do I debug context variables?

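A Context behaves like a read-only mapping, so iterate over contextvars.copy_context().items() to list every variable and its value in the current context (printing the Context object itself shows only its repr). To experiment with a temporary value, keep the token returned by ContextVar.set() and pass it to reset() afterwards.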
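As a rough sketch of such a debugging aid, the helper below (the name `dump_context` is an assumption for this example, not a library function) turns the current context into a plain dictionary keyed by variable name:

```python
import contextvars

request_id = contextvars.ContextVar("request_id")
user_id = contextvars.ContextVar("user_id")

def dump_context():
    """Hypothetical helper: map variable names to their current values."""
    ctx = contextvars.copy_context()
    return {var.name: value for var, value in ctx.items()}

def demo():
    request_id.set("req-001")
    user_id.set("user-42")
    return dump_context()

# Run the demo in a copy of the context so the values set inside it
# do not linger in the caller's context.
snapshot = contextvars.copy_context().run(demo)
print(snapshot["request_id"], snapshot["user_id"])  # req-001 user-42
```

Only variables that have actually been set appear in the mapping; a variable that merely declares a default is not listed until set() is called.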

Further Reading