
ThreadPoolExecutor: Parallel Task Execution (2026)

ThreadPoolExecutor from the concurrent.futures module manages a pool of worker threads and distributes tasks to them. Instead of manually creating threads and managing queues, you submit tasks to the executor and get back Future objects representing eventual results. This pattern is ideal for parallel I/O-bound workloads: fetching multiple URLs, processing files, or querying databases concurrently.

In a previous role, I refactored a system that manually spawned 20 threads per request, causing resource exhaustion. Switching to a fixed-size ThreadPoolExecutor with 8 workers eliminated thread creation overhead and resource leaks—the code became simpler and faster. This article covers every pattern you need to use ThreadPoolExecutor correctly.

Basic ThreadPoolExecutor Usage

The simplest pattern is to submit individual tasks and wait for results:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url):
    """Simulate fetching a URL and returning the response size."""
    time.sleep(1)  # Simulate network latency
    return len(url) * 100  # Fake: URL length * 100 as "response size"

# Create a thread pool with 5 workers
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit multiple tasks; each submit() returns a Future immediately
    urls = ["https://example.com/page1", "https://example.com/page2", "https://example.com/page3"]
    futures = [executor.submit(fetch_url, url) for url in urls]

    # Collect results in submission order
    for future in futures:
        result = future.result()  # Blocks until this task finishes
        print(f"Response size: {result}")

print("All downloads complete")
```

Key points:

  • ThreadPoolExecutor(max_workers=N) creates a pool with N worker threads.
  • executor.submit(function, *args, **kwargs) returns a Future object immediately (non-blocking).
  • future.result() blocks until the task finishes and returns the result. If the task raised an exception, result() re-raises it.
  • Use a with block so the executor shuts down cleanly, waiting for all submitted tasks to finish.
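To make the exception behavior concrete, here is a small sketch. The parse_int task is a hypothetical stand-in; the point is that a failure inside a worker thread surfaces in the caller either as the return value of future.exception() or as a re-raised exception from future.result().

```python
from concurrent.futures import ThreadPoolExecutor

def parse_int(text):
    """Hypothetical task: convert text to int, raising ValueError on bad input."""
    return int(text)

with ThreadPoolExecutor(max_workers=2) as executor:
    good = executor.submit(parse_int, "42")
    bad = executor.submit(parse_int, "forty-two")

    print(good.result())  # 42

    # exception() waits for the task, then returns the exception object (or None)
    err = bad.exception()
    print(type(err).__name__)  # ValueError

    # result() re-raises the same exception in the calling thread
    try:
        bad.result()
    except ValueError as e:
        print(f"Caught from worker: {e}")
```

exception() is handy when you want to inspect failures without a try/except around every result() call.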

Pattern 1: Map for Parallel Processing

For applying the same function to many inputs, executor.map() is cleaner than manual submit:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def process_item(item):
    """Process a single item and return the result."""
    time.sleep(0.5)
    return item * 2

items = [1, 2, 3, 4, 5]

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(process_item, items)
    # results is an iterator; we can consume it lazily
    for result in results:
        print(f"Processed: {result}")
```

executor.map() returns results in the same order as the input, even if tasks complete out of order. This is useful for bulk processing where order matters.
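One consequence of ordered results is worth seeing: if a task raises, map() re-raises that exception when the iterator reaches it, and you lose access to the results after it. The sketch below uses a hypothetical invert task that fails on 0.

```python
from concurrent.futures import ThreadPoolExecutor

def invert(x):
    """Hypothetical task: return 1/x, raising ZeroDivisionError when x == 0."""
    return 1 / x

collected = []
with ThreadPoolExecutor(max_workers=3) as executor:
    try:
        for value in executor.map(invert, [1, 2, 0, 4]):
            collected.append(value)
    except ZeroDivisionError:
        # The third result raised, so iteration stops here
        print(f"Stopped after {len(collected)} results")  # Stopped after 2 results

print(collected)  # [1.0, 0.5]
```

All four calls are still submitted up front, so invert(4) runs even though its result is never read. If you need every outcome, use submit() and handle each future individually.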

Pattern 2: As Completed for Results in Completion Order

When you want results as soon as they're available (not in submission order), use as_completed():

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def download_file(filename):
    """Simulate downloading a file with random latency."""
    latency = random.uniform(0.5, 3)
    time.sleep(latency)
    return f"{filename} downloaded in {latency:.2f}s"

files = ["file1.zip", "file2.pdf", "file3.iso", "file4.tar.gz"]

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(download_file, f): f for f in files}

    # Process results as they complete, not in order
    for future in as_completed(futures):
        filename = futures[future]
        try:
            result = future.result()
            print(result)
        except Exception as e:
            print(f"{filename} failed: {e}")
```

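as_completed() yields each future as soon as it finishes, so you can handle fast results immediately instead of blocking behind a slow task that happened to be submitted first. The total runtime is the same; what improves is how soon each individual result becomes available, which matters most when task durations vary widely.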
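A deterministic version of the same idea makes the ordering visible. The sleepy task and its delays are illustrative assumptions: tasks are submitted slowest first, but as_completed() hands them back fastest first.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def sleepy(name, delay):
    """Hypothetical task: sleep for `delay` seconds, then return its name."""
    time.sleep(delay)
    return name

jobs = [("slow", 0.6), ("fast", 0.1), ("medium", 0.3)]
order = []

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(sleepy, name, delay) for name, delay in jobs]
    for future in as_completed(futures):
        order.append(future.result())

print(order)  # ['fast', 'medium', 'slow']
```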

Pattern 3: Timeout and Error Handling

Always wrap future.result() in error handling and consider timeouts:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
import time

def flaky_task(task_id):
    """A slow task that fails when task_id == 2."""
    time.sleep(2)
    if task_id == 2:
        raise ValueError("Task 2 encountered an error")
    return f"Task {task_id} result"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(flaky_task, i) for i in range(4)]

    for i, future in enumerate(futures):
        try:
            # Wait up to 3 seconds for this result
            result = future.result(timeout=3)
            print(result)
        except FutureTimeoutError:
            print(f"Task {i} timed out")
        except Exception as e:
            print(f"Task {i} failed: {e}")
```

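future.result(timeout=seconds) raises TimeoutError if the task doesn't finish within the timeout. The timeout only limits how long the caller waits: the task keeps running on its worker thread, and leaving the with block still waits for it to finish. Always catch and handle exceptions from submitted tasks.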
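The following sketch shows that a timeout abandons the wait, not the work. slow_task is a hypothetical one-second job; after result(timeout=0.1) gives up, the same future still completes and a second result() call returns its value.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
import time

def slow_task():
    """Hypothetical task that takes about 1 second."""
    time.sleep(1)
    return "finished anyway"

timed_out = False
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.1)  # Stop waiting after 0.1 s
    except FutureTimeoutError:
        timed_out = True
        print("Gave up waiting; still running:", future.running())

    # The task was never interrupted, so a longer wait still gets its result
    print(future.result())  # finished anyway
```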

Pattern 4: Shutdown and Resource Cleanup

ThreadPoolExecutor maintains threads even after all tasks complete. Use shutdown() to clean up:

```python
from concurrent.futures import ThreadPoolExecutor

def task(x):
    return x * 2

executor = ThreadPoolExecutor(max_workers=4)

# Submit tasks
futures = [executor.submit(task, i) for i in range(10)]

# Wait for all to complete
executor.shutdown(wait=True)  # Blocks until all tasks finish

# After shutdown(), no new tasks can be submitted
try:
    executor.submit(task, 100)
except RuntimeError as e:
    print(f"Cannot submit after shutdown: {e}")
```

Exiting a with block calls shutdown(wait=True) for you, and wait=True is also shutdown()'s default. If you don't use with, call shutdown() explicitly so the worker threads are released.
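shutdown() also accepts cancel_futures (added in Python 3.9), which drops tasks still waiting in the queue instead of running them. In this sketch, long_job and quick_job are hypothetical; with a single worker, the three quick jobs are still queued when shutdown is called, so they are cancelled while the running job completes.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

started = threading.Event()

def long_job():
    """Hypothetical job that signals when it starts, then works for 0.5 s."""
    started.set()
    time.sleep(0.5)
    return "long job done"

def quick_job(n):
    return n * 2

executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(long_job)
queued = [executor.submit(quick_job, n) for n in range(3)]

started.wait()  # Make sure long_job is running before shutting down
# Python 3.9+: cancel queued tasks, then wait for the running one
executor.shutdown(wait=True, cancel_futures=True)

print(running.result())                 # long job done
print([f.cancelled() for f in queued])  # [True, True, True]
```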

Comparison Table: Task Submission Patterns

| Pattern | Best for | Result order | Result handling |
|---|---|---|---|
| submit() + result() | Single tasks, custom logic | Manual control | One result at a time |
| executor.map() | Bulk processing, preserves order | Input order | Lazy iteration |
| as_completed() | Variable latency, progressive results | Completion order | Iterate as available |
| wait() with FIRST_COMPLETED | Racing tasks, acting on the first to finish | First done | Custom completion handling |
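wait() with return_when=FIRST_COMPLETED is the one pattern in the table without an example elsewhere. A common use is querying several redundant sources and keeping whichever answers first. The mirror names and delays here are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def query_mirror(name, delay):
    """Hypothetical mirror lookup: sleep `delay` seconds, then answer."""
    time.sleep(delay)
    return f"answer from {name}"

mirrors = {"mirror-a": 0.5, "mirror-b": 0.1, "mirror-c": 0.3}

with ThreadPoolExecutor(max_workers=len(mirrors)) as executor:
    futures = [executor.submit(query_mirror, name, delay) for name, delay in mirrors.items()]
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)

    winner = next(iter(done)).result()
    # cancel() only stops tasks that haven't started; these are already running
    for f in not_done:
        f.cancel()

print(winner)  # answer from mirror-b
```

Because the losing tasks are already running, cancel() cannot stop them, and the with block still waits for them before exiting. If that delay matters, keep the executor alive and let them finish in the background.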

Advanced: max_workers and System Resources

The optimal max_workers depends on your workload:

```python
from concurrent.futures import ThreadPoolExecutor
import os

# os.cpu_count() can return None, so fall back to 1
cpu_count = os.cpu_count() or 1

# For I/O-bound work, use more threads than CPUs:
# a thread blocked on I/O uses almost no CPU, so many can wait at once
io_bound_workers = cpu_count * 4  # 4 threads per core for I/O (a starting point)

# For CPU-bound pure-Python work, threads don't run in parallel (GIL);
# size a ProcessPoolExecutor to the CPU count instead
cpu_bound_workers = cpu_count

# Create with adaptive sizing
with ThreadPoolExecutor(max_workers=io_bound_workers) as executor:
    # Your tasks here
    pass
```

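I/O-bound workloads benefit from more threads because each thread can block on I/O independently. For CPU-bound pure-Python code, the GIL lets only one thread execute Python bytecode at a time, so extra threads add overhead without speeding anything up; use ProcessPoolExecutor for that kind of work. If you omit max_workers, Python 3.8+ defaults to min(32, CPU count + 4).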
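A quick way to check that a workload really is I/O-bound is to time it both ways. This sketch uses time.sleep() as a stand-in for a blocking network call (an assumption: sleep releases the GIL the way real blocking I/O does), so eight 0.2-second calls take roughly 1.6 s sequentially and roughly 0.2 s with eight threads.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fake_io(_):
    """Stand-in for a blocking network call; sleeping releases the GIL."""
    time.sleep(0.2)

start = time.perf_counter()
for i in range(8):
    fake_io(i)
sequential = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    list(executor.map(fake_io, range(8)))  # list() drains the iterator
threaded = time.perf_counter() - start

print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

Swapping fake_io for a CPU-heavy loop would show little or no gain from the threaded version, which is the GIL limitation described above.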

Complete Example: Parallel Web Scraping

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def scrape_page(page_id):
    """Simulate scraping a web page; roughly 10% of calls fail."""
    latency = random.uniform(0.5, 2)
    time.sleep(latency)
    if random.random() < 0.1:
        raise RuntimeError(f"Page {page_id} scrape failed")
    return {"page": page_id, "items": random.randint(1, 100)}

pages = list(range(1, 21))  # 20 pages to scrape
results = []
errors = []

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(scrape_page, page): page for page in pages}

    completed = 0
    for future in as_completed(futures):
        page = futures[future]
        completed += 1  # Count every finished task, success or failure
        try:
            # as_completed() only yields finished futures, so this returns (or raises) immediately
            result = future.result()
            results.append(result)
            print(f"[{completed}/{len(pages)}] Scraped page {page}: {result['items']} items")
        except Exception as e:
            errors.append((page, str(e)))
            print(f"[{completed}/{len(pages)}] Error scraping page {page}: {e}")

print(f"\nSummary: {len(results)} successful, {len(errors)} failed")
total_items = sum(r["items"] for r in results)
print(f"Total items scraped: {total_items}")
```

This example demonstrates:

  • Creating a thread pool for I/O-bound work.
  • Progress tracking with counted completion.
  • Error handling per task.
  • Aggregating results across all tasks.
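Because as_completed() yields only futures that are already done, a timeout passed to result() inside its loop never triggers. To put a deadline on the whole batch, pass timeout to as_completed() itself; it raises TimeoutError if the remaining futures aren't done in time, counted from the call. The fetch task and delays below are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError as FutureTimeoutError
import time

def fetch(delay):
    """Hypothetical fetch that takes `delay` seconds."""
    time.sleep(delay)
    return delay

finished = []
deadline_hit = False
executor = ThreadPoolExecutor(max_workers=3)
futures = [executor.submit(fetch, d) for d in (0.1, 0.2, 1.5)]
try:
    # One deadline for the whole batch, counted from this call
    for future in as_completed(futures, timeout=0.6):
        finished.append(future.result())
except FutureTimeoutError:
    deadline_hit = True
    print(f"Deadline hit: {len(finished)} of {len(futures)} finished")
finally:
    # Python 3.9+: don't wait for stragglers; cancel anything still queued
    executor.shutdown(wait=False, cancel_futures=True)

print(finished)  # [0.1, 0.2]
```

The 1.5-second task is already running, so it is not cancelled; it finishes in the background, and Python waits for it at interpreter exit.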

Key Takeaways

  • ThreadPoolExecutor eliminates manual thread creation and management.
  • Use submit() for individual tasks, map() for bulk processing, and as_completed() for streaming results.
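  • Set max_workers based on workload: 4–8 threads per CPU as a starting point for I/O-bound work; for CPU-bound pure-Python work, threads won't help, so use ProcessPoolExecutor sized to the CPU count.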
  • Always handle exceptions from future.result() and use timeouts to avoid indefinite waits.
  • The with statement ensures proper shutdown; never forget to clean up the executor.

Frequently Asked Questions

How do I cancel a running task?

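Call future.cancel(). It succeeds (returns True) only while the task is still waiting in the queue. If the task is already running or finished, cancel() returns False and the task continues. Python has no way to forcibly interrupt a running thread, so a long task has to check a flag, such as a threading.Event, and stop itself.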
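Here is a sketch of both cases, using a hypothetical worker_loop that checks a threading.Event between small units of work. With one worker, the second task is still queued, so cancel() removes it; the first is running, so cancel() fails and the Event is what actually stops it.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

stop = threading.Event()

def worker_loop(max_steps):
    """Hypothetical long task that checks a stop flag between small units of work."""
    steps = 0
    while steps < max_steps and not stop.is_set():
        time.sleep(0.05)  # One small unit of work
        steps += 1
    return steps

with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(worker_loop, 1000)
    queued = executor.submit(worker_loop, 1000)  # Waits behind `running` (only 1 worker)
    time.sleep(0.3)

    queued_cancelled = queued.cancel()    # True: it never started
    running_cancelled = running.cancel()  # False: already running
    stop.set()                            # Cooperative stop at the next check

print(queued_cancelled, running_cancelled)  # True False
print(f"Running task stopped after {running.result()} of 1000 steps")
```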

Can I submit tasks from within a task?

Yes, but be careful: nested submission can deadlock. If every worker thread is running a parent task that blocks waiting for a child task, the children sit in the queue with no free worker to run them, and nothing makes progress.
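The smallest reproduction is a pool with one worker. In this sketch, parent and child are hypothetical tasks; a timeout on the inner wait turns what would be a silent hang into a detectable condition.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError

def child():
    return "child result"

def parent(executor):
    """Hypothetical task that submits a child to the same pool and waits for it."""
    inner = executor.submit(child)
    try:
        # With one worker, `inner` can't start until this task returns
        return inner.result(timeout=0.5)
    except FutureTimeoutError:
        return "would have deadlocked"

with ThreadPoolExecutor(max_workers=1) as executor:
    outcome = executor.submit(parent, executor).result()

print(outcome)  # would have deadlocked
```

Without the timeout, this program would hang forever. Common fixes are a separate executor for child tasks, or restructuring so parents return their child futures instead of blocking on them.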

What's the difference between ThreadPoolExecutor and ProcessPoolExecutor?

ThreadPoolExecutor uses threads (shared memory, affected by GIL); ProcessPoolExecutor uses processes (separate memory, true parallelism for CPU-bound work but higher overhead). Use ProcessPoolExecutor for CPU-bound workloads and ThreadPoolExecutor for I/O-bound.

Can I reuse an executor across multiple requests?

Yes, that's the whole idea. Create a single executor at application startup and use it throughout the lifetime of your application. This amortizes the cost of thread creation.
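A minimal sketch of that setup, assuming a hypothetical lookup function standing in for a database or HTTP call and a hypothetical handle_request entry point: the pool is created once at module level, shared by every request, and shut down when the interpreter exits.

```python
from concurrent.futures import ThreadPoolExecutor
import atexit

# One pool for the whole application, created once at startup
_EXECUTOR = ThreadPoolExecutor(max_workers=8, thread_name_prefix="app-io")
atexit.register(_EXECUTOR.shutdown)  # Explicit shutdown when the interpreter exits

def lookup(key):
    """Hypothetical blocking lookup (stands in for a database or HTTP call)."""
    return key.upper()

def handle_request(keys):
    """Hypothetical request handler that fans work out to the shared pool."""
    return list(_EXECUTOR.map(lookup, keys))

print(handle_request(["a", "b"]))  # ['A', 'B']
print(handle_request(["c"]))       # ['C']
```

thread_name_prefix is optional, but named threads make the pool easy to spot in logs and debuggers.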

What happens if I don't call shutdown()?

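The worker threads stay alive, idle, until the executor is shut down or garbage-collected. Python never stops them abruptly: after the executor is garbage-collected, idle workers exit on their own, and at interpreter exit Python waits for outstanding tasks to finish, which can make a program appear to hang. Always use with or an explicit shutdown() so cleanup happens at a point you control.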

Further Reading