ThreadPoolExecutor: Parallel Task Execution (2026)
ThreadPoolExecutor from the concurrent.futures module manages a pool of worker threads and distributes tasks to them. Instead of manually creating threads and managing queues, you submit tasks to the executor and get back Future objects representing eventual results. This pattern is ideal for parallel I/O-bound workloads: fetching multiple URLs, processing files, or querying databases concurrently.
In a previous role, I refactored a system that manually spawned 20 threads per request, causing resource exhaustion. Switching to a fixed-size ThreadPoolExecutor with 8 workers eliminated the per-request thread creation overhead and the resource leaks, and the code became simpler and faster. This article covers the core patterns you need to use ThreadPoolExecutor correctly.
Basic ThreadPoolExecutor Usage
The simplest pattern is to submit individual tasks and wait for results:
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url):
    """Simulate fetching a URL and returning the response size."""
    time.sleep(1)  # Simulate network latency
    return len(url) * 100  # Fake: URL length * 100 as "response size"

# Create a thread pool with 5 workers
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit multiple tasks
    urls = ["https://example.com/page1", "https://example.com/page2", "https://example.com/page3"]
    futures = [executor.submit(fetch_url, url) for url in urls]

    # Collect results in submission order
    for future in futures:
        result = future.result()  # Blocks until the task finishes
        print(f"Response size: {result}")

print("All downloads complete")
Key points:
- ThreadPoolExecutor(max_workers=N) creates a pool with N worker threads.
- executor.submit(function, *args, **kwargs) returns a Future object immediately (non-blocking).
- future.result() blocks until the task finishes and returns the result. If the task raised an exception, result() re-raises it.
- Use with to ensure the executor shuts down cleanly, waiting for all tasks.
Pattern 1: Map for Parallel Processing
For applying the same function to many inputs, executor.map() is cleaner than manual submit:
from concurrent.futures import ThreadPoolExecutor
import time

def process_item(item):
    """Process a single item and return the result."""
    time.sleep(0.5)
    return item * 2

items = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(process_item, items)
    # results is an iterator; we can consume it lazily
    for result in results:
        print(f"Processed: {result}")
executor.map() returns results in the same order as the input, even if tasks complete out of order. This is useful for bulk processing where order matters.
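The ordering guarantee can be checked with a minimal sketch. The slow_square function and its sleep times are hypothetical, chosen so that later inputs finish first:

```python
from concurrent.futures import ThreadPoolExecutor
import time

finished = []  # records the order in which tasks actually complete

def slow_square(n):
    """Hypothetical task: smaller inputs sleep longer, so they finish last."""
    time.sleep((4 - n) * 0.1)
    finished.append(n)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    squares = list(executor.map(slow_square, [1, 2, 3]))

print(squares)   # results follow the input order
print(finished)  # completion order, which may differ from the input order
```

Note that if a task raises, the exception surfaces when its result is reached during iteration, not when map() is called.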
Pattern 2: As Completed for Results in Completion Order
When you want results as soon as they're available (not in submission order), use as_completed():
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def download_file(filename):
    """Simulate downloading a file with random latency."""
    latency = random.uniform(0.5, 3)
    time.sleep(latency)
    return f"{filename} downloaded in {latency:.2f}s"

files = ["file1.zip", "file2.pdf", "file3.iso", "file4.tar.gz"]
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(download_file, f): f for f in files}

    # Process results as they complete, not in order
    for future in as_completed(futures):
        filename = futures[future]
        try:
            result = future.result()
            print(result)
        except Exception as e:
            print(f"{filename} failed: {e}")
as_completed() yields futures as soon as they finish. The total runtime is the same as waiting for each future in order, but you can start handling the first results sooner, which matters most when task duration varies widely.
Pattern 3: Timeout and Error Handling
Always wrap future.result() in error handling and consider timeouts:
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
import time

def flaky_task(task_id):
    """A task that might fail or hang."""
    time.sleep(2)
    if task_id == 2:
        raise ValueError("Task 2 encountered an error")
    return f"Task {task_id} result"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(flaky_task, i) for i in range(4)]

    for i, future in enumerate(futures):
        try:
            # Wait up to 3 seconds for result
            result = future.result(timeout=3)
            print(result)
        except FutureTimeoutError:
            print(f"Task {i} timed out")
        except Exception as e:
            print(f"Task {i} failed: {e}")
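future.result(timeout=seconds) raises concurrent.futures.TimeoutError (an alias of the built-in TimeoutError since Python 3.11) if the task doesn't finish within the timeout. The timeout only ends the wait; the task itself keeps running in its worker thread. Always catch and handle exceptions from submitted tasks.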
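One detail worth seeing in isolation is that a timed-out wait does not stop the underlying task. The following is a minimal sketch; slow_task and its durations are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
import time

def slow_task():
    """Hypothetical task that takes longer than the first wait allows."""
    time.sleep(0.5)
    return "finished anyway"

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.1)  # gives up waiting after 0.1 seconds
        timed_out = False
    except FutureTimeoutError:
        timed_out = True

    # The task was never interrupted, so a second wait can still succeed
    late_result = future.result()

print(timed_out, late_result)
```

If a task must actually stop early, it needs its own cooperative mechanism, such as checking a flag or using a timeout on the underlying I/O call.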
Pattern 4: Shutdown and Resource Cleanup
ThreadPoolExecutor maintains threads even after all tasks complete. Use shutdown() to clean up:
from concurrent.futures import ThreadPoolExecutor

def task(x):
    return x * 2

executor = ThreadPoolExecutor(max_workers=4)

# Submit tasks
futures = [executor.submit(task, i) for i in range(10)]

# Wait for all to complete
executor.shutdown(wait=True)  # Blocks until all tasks finish

# After shutdown(), no new tasks can be submitted
try:
    executor.submit(task, 100)
except RuntimeError as e:
    print(f"Cannot submit after shutdown: {e}")
Exiting a with block calls shutdown(wait=True) for you. Always shut down explicitly or use with to ensure threads are released.
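Since Python 3.9, shutdown() also accepts cancel_futures=True, which cancels tasks that are still queued while letting running tasks finish. A minimal sketch, assuming a hypothetical slow() task and a single worker so that most tasks are still waiting in the queue:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow(x):
    """Hypothetical task that keeps the single worker busy."""
    time.sleep(0.3)
    return x

executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(slow, i) for i in range(5)]

time.sleep(0.1)  # give the first task time to start running

# Running tasks finish; tasks still waiting in the queue are cancelled
executor.shutdown(wait=True, cancel_futures=True)

cancelled = [f.cancelled() for f in futures]
print(cancelled)
```

This is useful when an application is stopping and the remaining queued work is no longer needed.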
Comparison Table: Task Submission Patterns
| Pattern | Best For | Order | Result Handling |
|---|---|---|---|
| submit() + result() | Single tasks, custom logic | Manual control | One result at a time |
| executor.map() | Bulk processing, preserves order | Input order | Lazy iteration |
| as_completed() | Variable latency, progressive results | Completion order | Iterate as available |
| wait() with FIRST_COMPLETED | Racing tasks, first-to-finish | First done | Custom completion handling |
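The last row of the table is the only pattern without an example elsewhere in this article. Here is a minimal sketch of wait() with FIRST_COMPLETED, assuming a hypothetical query_mirror function and made-up delays:

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def query_mirror(name, delay):
    """Hypothetical request to one of several equivalent servers."""
    time.sleep(delay)
    return name

mirrors = [("slow", 0.5), ("fast", 0.1), ("medium", 0.3)]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(query_mirror, name, delay) for name, delay in mirrors]

    # Returns as soon as at least one future has finished
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    winner = next(iter(done)).result()

    # cancel() only takes effect for tasks that have not started yet
    for future in not_done:
        future.cancel()

print(f"First response came from: {winner}")
```

Because the losing tasks here are already running, they cannot be cancelled, and the with block still waits for them on exit.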
Advanced: max_workers and System Resources
The optimal max_workers depends on your workload:
from concurrent.futures import ThreadPoolExecutor
import os

# os.cpu_count() can return None, so fall back to 1
cpu_count = os.cpu_count() or 1

# For I/O-bound work, use more threads than CPU count
# Each blocked thread uses little CPU, so we can have many waiting on I/O
io_bound_workers = cpu_count * 4  # 4 threads per core for I/O

# For CPU-bound work, use roughly the CPU count (limited by GIL)
cpu_bound_workers = cpu_count

# Create with adaptive sizing
with ThreadPoolExecutor(max_workers=io_bound_workers) as executor:
    # Your tasks here
    pass
I/O-bound workloads benefit from more threads (each thread can block independently). CPU-bound workloads are limited by the GIL: in standard CPython only one thread executes Python bytecode at a time, so extra threads don't help.
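The effect of pool size on I/O-bound work can be measured with a rough sketch. Here fake_io is a hypothetical stand-in that sleeps instead of doing real I/O, and the exact timings will vary by machine:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fake_io(_):
    """Hypothetical I/O call: the thread just waits, using almost no CPU."""
    time.sleep(0.1)

def timed_run(workers, n_tasks=8):
    """Run n_tasks fake I/O calls on a pool of the given size; return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(fake_io, range(n_tasks)))
    return time.perf_counter() - start

two_workers = timed_run(2)    # tasks run in four waves of two
eight_workers = timed_run(8)  # all tasks wait at the same time

print(f"2 workers: {two_workers:.2f}s, 8 workers: {eight_workers:.2f}s")
```

Measuring with your real workload is more reliable than any fixed multiplier, since the right size also depends on what the remote service or disk can handle.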
Complete Example: Parallel Web Scraping
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def scrape_page(page_id):
    """Simulate scraping a web page."""
    latency = random.uniform(0.5, 2)
    time.sleep(latency)
    if random.random() < 0.1:
        raise Exception(f"Page {page_id} scrape failed")
    return {"page": page_id, "items": random.randint(1, 100)}

pages = list(range(1, 21))  # 20 pages to scrape
results = []
errors = []

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(scrape_page, page): page for page in pages}
    completed = 0

    for future in as_completed(futures):
        page = futures[future]
        try:
            result = future.result(timeout=5)
            results.append(result)
            completed += 1
            print(f"[{completed}/{len(pages)}] Scraped page {page}: {result['items']} items")
        except Exception as e:
            errors.append((page, str(e)))
            print(f"Error scraping page {page}: {e}")

print(f"\nSummary: {len(results)} successful, {len(errors)} failed")
total_items = sum(r["items"] for r in results)
print(f"Total items scraped: {total_items}")
This example demonstrates:
- Creating a thread pool for I/O-bound work.
- Progress tracking with counted completion.
- Error handling per task.
- Aggregating results across all tasks.
Key Takeaways
- ThreadPoolExecutor eliminates manual thread creation and management.
- Use submit() for individual tasks, map() for bulk processing, and as_completed() for streaming results.
- Set max_workers based on workload: 4–8 per CPU for I/O, equal to CPU count for CPU-bound.
- Always handle exceptions from future.result() and use timeouts to avoid indefinite waits.
- The with statement ensures proper shutdown; never forget to clean up the executor.
Frequently Asked Questions
How do I cancel a running task?
Use future.cancel() before it starts. If the task is already running, cancel() returns False and the task continues. There's no way to forcefully interrupt a running thread in Python.
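A minimal sketch of both outcomes, using a single worker so that the second task is guaranteed to be waiting in the queue. The blocked_task function and the two events are hypothetical helpers for the demonstration:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

started = threading.Event()  # set once the first task is actually running
release = threading.Event()  # lets the running task finish

def blocked_task():
    """Hypothetical task that runs until it is told to finish."""
    started.set()
    release.wait(timeout=5)
    return "done"

with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(blocked_task)
    started.wait(timeout=5)                  # make sure the first task has begun
    queued = executor.submit(blocked_task)   # waits behind the running task

    cancel_running = running.cancel()  # False: already running
    cancel_queued = queued.cancel()    # True: it had not started yet

    release.set()  # allow the running task to complete

print(cancel_running, cancel_queued, running.result())
```

To stop a task that is already running, the task has to cooperate, for example by checking a threading.Event between units of work.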
Can I submit tasks from within a task?
Yes, but be careful: submitting nested tasks can cause deadlock if all worker threads are blocked waiting for child tasks to complete while the child tasks are queued behind other work.
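The deadlock is easiest to see with a pool of one worker. The sketch below uses hypothetical parent() and child() functions, and adds a timeout so the example terminates instead of hanging:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError

executor = ThreadPoolExecutor(max_workers=1)

def child():
    return "child result"

def parent():
    # The only worker is busy running parent(), so child() stays queued
    inner = executor.submit(child)
    try:
        return inner.result(timeout=0.5)
    except FutureTimeoutError:
        # Without the timeout, this wait would never end
        return "parent gave up waiting"

outcome = executor.submit(parent).result()
executor.shutdown(wait=True)  # child() runs once parent() has returned

print(outcome)
```

Common ways to avoid the problem are to use a separate executor for child tasks or to return the child work to the caller instead of waiting for it inside the worker.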
What's the difference between ThreadPoolExecutor and ProcessPoolExecutor?
ThreadPoolExecutor uses threads (shared memory, affected by GIL); ProcessPoolExecutor uses processes (separate memory, true parallelism for CPU-bound work but higher overhead). Use ProcessPoolExecutor for CPU-bound workloads and ThreadPoolExecutor for I/O-bound.
Can I reuse an executor across multiple requests?
Yes, that's the whole idea. Create a single executor at application startup and use it throughout the lifetime of your application. This amortizes the cost of thread creation.
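One way this might look in practice is a module-level executor shared by every request handler. The names SHARED_EXECUTOR, load_item, and handle_request are hypothetical, and the pool size is illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

# One pool for the whole application, created once at startup
SHARED_EXECUTOR = ThreadPoolExecutor(max_workers=8, thread_name_prefix="app-worker")

def load_item(item_id):
    """Hypothetical I/O-bound lookup; stands in for a database or HTTP call."""
    return {"id": item_id}

def handle_request(item_ids):
    """Hypothetical request handler that reuses the shared pool."""
    return list(SHARED_EXECUTOR.map(load_item, item_ids))

# Two separate "requests" served by the same worker threads
first = handle_request([1, 2])
second = handle_request([3])
print(first, second)

# At application shutdown, release the worker threads
SHARED_EXECUTOR.shutdown(wait=True)
```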
What happens if I don't call shutdown()?
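The executor remains alive, holding thread resources. Python never forcibly stops threads, so running tasks always run to completion, and the interpreter joins the pool's worker threads before it exits, which means a forgotten executor with long-running tasks can delay program exit. Always use with or explicit shutdown().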