Threading in Real Applications: Complete Examples (2026)
Threading patterns learned in isolation don't always translate to real applications. This article combines all previous concepts into three complete, production-ready examples: a parallel web scraper with error handling and progress tracking, a concurrent file processor with bounded resource use, and a responsive CLI application that keeps the interface snappy while processing in the background.
While building the web scraper below, I initially created 100 threads for 100 URLs and the machine became unresponsive. Switching to a thread pool with 8 workers made it faster and more stable. That lesson—think about resource limits, not just concurrency—is the key to real-world threading success.
Example 1: Parallel Web Scraper with Thread Pool
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import sys
class WebScraperStats:
    """Aggregate scrape outcomes reported concurrently by worker threads.

    Every read and write of the counters happens while holding
    ``self.lock``, so the record_* methods may be called from any thread.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.successful, self.failed, self.total_bytes = 0, 0, 0
        # Reference point for the elapsed time reported by get_summary().
        self.start_time = time.time()

    def record_success(self, url, bytes_fetched):
        """Count one successful fetch of *bytes_fetched* bytes (*url* is not stored)."""
        with self.lock:
            self.successful += 1
            self.total_bytes += bytes_fetched

    def record_failure(self, url, error):
        """Count one failed fetch (*url* and *error* are accepted but not stored)."""
        with self.lock:
            self.failed += 1

    def get_summary(self):
        """Return a snapshot dict of the counters plus elapsed seconds and bytes/sec."""
        with self.lock:
            elapsed = time.time() - self.start_time
            throughput = 0
            if elapsed > 0:
                throughput = self.total_bytes / elapsed
            return dict(
                successful=self.successful,
                failed=self.failed,
                total_bytes=self.total_bytes,
                elapsed=elapsed,
                bytes_per_sec=throughput,
            )
def simulate_fetch_url(url, failure_rate=0.05, latency_range=(0.5, 3),
                       size_range=(10_000, 100_000)):
    """Simulate fetching a URL with realistic latency and occasional failures.

    Args:
        url: URL to "fetch"; only used in the error message.
        failure_rate: probability in [0, 1] of raising ConnectionError
            (default 5%). 1.0 always fails, 0.0 never does.
        latency_range: (low, high) seconds to sleep, drawn uniformly.
        size_range: inclusive (low, high) byte count to return.

    Returns:
        Simulated response size in bytes.

    Raises:
        ConnectionError: with probability *failure_rate*; no sleep happens then.
    """
    if random.random() < failure_rate:
        raise ConnectionError(f"Failed to connect to {url}")
    latency = random.uniform(*latency_range)
    time.sleep(latency)  # GIL released while sleeping; other threads proceed
    return random.randint(*size_range)
def scrape_urls(urls, max_workers=8, fetch=None):
    """Scrape multiple URLs concurrently with per-URL error handling.

    Args:
        urls: any iterable of URLs (materialized into a list once).
        max_workers: upper bound on concurrent fetches.
        fetch: callable taking one URL and returning the number of bytes
            fetched. Defaults to simulate_fetch_url. Any exception it raises
            is recorded as a failure for that URL instead of aborting the batch.

    Returns:
        dict mapping each URL to {'status': 'success', 'bytes': n} or
        {'status': 'failed', 'error': message}.
    """
    if fetch is None:
        fetch = simulate_fetch_url
    urls = list(urls)
    total = len(urls)
    stats = WebScraperStats()
    results = {}

    print(f"Scraping {total} URLs with {max_workers} workers...")
    print()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_url = {executor.submit(fetch, url): url for url in urls}

        # as_completed() yields a future only once it is done, so result()
        # below never blocks; a timeout passed to it could never fire.
        # Per-request timeouts belong inside `fetch` itself.
        for completed, future in enumerate(as_completed(future_to_url), start=1):
            url = future_to_url[future]
            try:
                bytes_fetched = future.result()
            except Exception as e:
                # Broad on purpose: one failing URL must not stop the others.
                stats.record_failure(url, str(e))
                results[url] = {'status': 'failed', 'error': str(e)}
                print(f"[{completed}/{total}] {url}: ERROR - {e}")
            else:
                stats.record_success(url, bytes_fetched)
                results[url] = {'status': 'success', 'bytes': bytes_fetched}
                print(f"[{completed}/{total}] {url}: {bytes_fetched} bytes")

    # Print summary
    print()
    summary = stats.get_summary()
    print("Summary:")
    print(f"  Successful: {summary['successful']}/{total}")
    print(f"  Failed: {summary['failed']}/{total}")
    print(f"  Total bytes: {summary['total_bytes']:,}")
    print(f"  Elapsed: {summary['elapsed']:.2f}s")
    print(f"  Throughput: {summary['bytes_per_sec'] / 1024:.1f} KB/s")

    return results
# Example usage: scrape 20 simulated pages with an 8-worker pool.
if __name__ == "__main__":
    urls = ["https://example.com/page" + str(i) for i in range(20)]
    results = scrape_urls(urls, max_workers=8)
This example demonstrates:
- Thread-safe statistics using a lock.
- as_completed() for processing results in completion order.
- Progress tracking (completed count).
- Error handling per task.
- Resource-bounded execution (8 workers max).
Example 2: Concurrent File Processor
import threading
import queue
import time
import hashlib
from pathlib import Path
class FileProcessor:
    """Process files concurrently: read, hash, and store results.

    A fixed pool of worker threads pulls paths from ``work_queue``, computes
    each file's SHA-256, and puts one result dict per path on
    ``result_queue``. The counters in ``stats`` are guarded by ``lock``.
    """

    # Files are hashed in chunks of this many bytes, so memory use per
    # worker stays bounded regardless of file size.
    CHUNK_SIZE = 1 << 16

    def __init__(self, num_workers=4):
        if num_workers < 1:
            # With no workers, process_files() would block forever on
            # work_queue.join().
            raise ValueError(f"num_workers must be >= 1, got {num_workers}")
        self.num_workers = num_workers
        self.work_queue = queue.Queue()
        self.result_queue = queue.Queue()
        # Optional external abort hook checked by workers between items.
        # Normal shutdown uses the None sentinels queued by process_files().
        # NOTE: if this is set while items are still queued, process_files()
        # blocks in work_queue.join(), because workers exit without draining.
        self.stop_event = threading.Event()
        self.lock = threading.Lock()
        self.stats = {'processed': 0, 'failed': 0, 'total_size': 0}

    def _hash_file(self, filepath):
        """Return (sha256 hex digest, byte count) of *filepath*, read in chunks."""
        digest = hashlib.sha256()
        size = 0
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                digest.update(chunk)
                size += len(chunk)
        # The size is counted from the bytes actually hashed. A separate
        # stat() call could disagree if the file changes in between.
        return digest.hexdigest(), size

    def worker(self):
        """Worker thread: process files from the queue until a None sentinel arrives."""
        while not self.stop_event.is_set():
            try:
                # Timeout lets the loop re-check stop_event periodically.
                filepath = self.work_queue.get(timeout=1)
            except queue.Empty:
                continue

            if filepath is None:  # Sentinel: stop signal
                self.work_queue.task_done()
                break

            try:
                file_hash, file_size = self._hash_file(filepath)
            except Exception as e:
                # Broad on purpose: if an exception escaped here, this thread
                # would die and the remaining items would never be processed.
                self.result_queue.put({
                    'filepath': filepath,
                    'error': str(e),
                    'status': 'failed',
                })
                with self.lock:
                    self.stats['failed'] += 1
            else:
                self.result_queue.put({
                    'filepath': filepath,
                    'hash': file_hash,
                    'size': file_size,
                    'status': 'success',
                })
                with self.lock:
                    self.stats['processed'] += 1
                    self.stats['total_size'] += file_size
            finally:
                # Must run exactly once per real item so work_queue.join()
                # in process_files() can return.
                self.work_queue.task_done()

    def process_files(self, file_paths):
        """Hash every path in *file_paths* concurrently.

        Returns:
            (results, stats): results is a list of per-file dicts in
            completion order. stats is a snapshot copy of the counters, which
            accumulate across calls on the same instance.
        """
        workers = [
            threading.Thread(target=self.worker, daemon=True)
            for _ in range(self.num_workers)
        ]
        for w in workers:
            w.start()

        for filepath in file_paths:
            self.work_queue.put(Path(filepath))

        # A worker calls task_done() for an item only after queuing its
        # result, so every result is available once join() returns.
        self.work_queue.join()
        for _ in workers:
            self.work_queue.put(None)  # One sentinel per worker

        results = []
        while True:
            try:
                results.append(self.result_queue.get_nowait())
            except queue.Empty:
                break

        for w in workers:
            w.join(timeout=5)

        with self.lock:
            stats = dict(self.stats)
        return results, stats
# Example usage
if __name__ == "__main__":
    import tempfile

    processor = FileProcessor(num_workers=4)

    # Create test files in the platform's temp directory. A hard-coded
    # "/tmp" does not exist on every OS (e.g. Windows).
    test_dir = Path(tempfile.gettempdir()) / "test_files"
    test_dir.mkdir(parents=True, exist_ok=True)
    for i in range(10):
        (test_dir / f"file{i}.txt").write_text(f"Content {i}" * 1000)

    # Process files
    file_paths = list(test_dir.glob("file*.txt"))
    results, stats = processor.process_files(file_paths)

    print(f"Processed: {stats['processed']}, Failed: {stats['failed']}")
    print(f"Total size: {stats['total_size']} bytes")
    # Results arrive in completion order, so sort them for stable output.
    # Failed entries carry 'error' instead of 'hash'.
    for result in sorted(results, key=lambda r: r['filepath'].name)[:3]:  # Show first 3
        if result['status'] == 'success':
            print(f" {result['filepath'].name}: {result['hash'][:16]}...")
        else:
            print(f" {result['filepath'].name}: ERROR - {result['error']}")
This example shows:
- Queue-based work distribution.
- Graceful shutdown with sentinels.
- Thread-safe statistics collection.
- Proper resource cleanup.
Example 3: Responsive CLI Application
import threading
import time
import sys
from queue import Queue
class ResponsiveCLI:
    """A CLI that stays responsive while processing in background.

    The main thread reads commands with input(). A single daemon worker
    thread takes task names from ``tasks_queue``, simulates 2 seconds of
    work per task, and puts a "Completed: ..." message on ``results_queue``.
    The main loop prints every finished result before each prompt.
    """

    def __init__(self):
        self.tasks_queue = Queue()
        self.results_queue = Queue()
        # Set on shutdown; the worker re-checks it between tasks.
        self.stop_event = threading.Event()

    def background_worker(self):
        """Background thread: process tasks and store results until stop_event is set."""
        from queue import Empty

        while not self.stop_event.is_set():
            try:
                # Short timeout so the loop notices stop_event promptly.
                task = self.tasks_queue.get(timeout=0.5)
            except Empty:
                # Only "nothing queued yet" is expected here. A bare except
                # would also swallow KeyboardInterrupt, SystemExit and bugs.
                continue
            try:
                time.sleep(2)  # Simulate work
                self.results_queue.put(f"Completed: {task}")
            finally:
                self.tasks_queue.task_done()

    def _print_completed(self):
        """Print every result currently in results_queue; return how many were shown."""
        shown = 0
        # The main thread is the only consumer of results_queue, so a
        # non-empty check followed by get() cannot block.
        while not self.results_queue.empty():
            print(f"\n>>> {self.results_queue.get()}")
            shown += 1
        return shown

    def run(self):
        """Interactive CLI loop; returns after 'quit', end of input, or Ctrl-C."""
        worker = threading.Thread(target=self.background_worker, daemon=True)
        worker.start()

        print("Responsive CLI - Type 'submit <task>' to submit, 'results' to see results, 'quit' to exit")

        try:
            while True:
                # Show everything that finished while we were blocked in
                # input(), not just one result per prompt.
                self._print_completed()

                try:
                    user_input = input("\n> ").strip()
                except (EOFError, KeyboardInterrupt):
                    break

                if not user_input:
                    continue
                if user_input == 'quit':
                    break
                if user_input == 'results':
                    self._print_completed()
                    # qsize() counts tasks not yet picked up by the worker.
                    # A task currently being processed is not included.
                    queue_size = self.tasks_queue.qsize()
                    print(f"Pending tasks: {queue_size}")
                elif user_input.startswith('submit '):
                    task = user_input[7:]
                    self.tasks_queue.put(task)
                    print(f"Submitted: {task}")
                else:
                    print("Unknown command")
        finally:
            self.stop_event.set()
            # The worker may be mid-task. Wait at most 2 s; it is a daemon
            # thread, so it will not keep the process alive.
            worker.join(timeout=2)
            print("\nGoodbye!")
# Script entry point: start the interactive loop.
if __name__ == "__main__":
    ResponsiveCLI().run()
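Example session (finished results are printed when the prompt is redrawn, so press Enter to see them):
> submit task1
Submitted: task1
> submit task2
Submitted: task2
> results
Pending tasks: 1
> (press Enter after about 2 seconds)
>>> Completed: task1
> (press Enter after about 2 more seconds)
>>> Completed: task2
> quit
Goodbye!
"Pending tasks" is 1, not 2, because the worker has already taken task1 off the queue and is processing it.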
This example shows:
- Main thread (CLI) remains responsive.
- Background thread processes tasks concurrently with the input loop.
- Clean separation of concerns.
- Graceful shutdown.
Best Practices Summary
| Practice | Why | Example |
|---|---|---|
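| Use ThreadPoolExecutor | Avoids manual thread management | with ThreadPoolExecutor(max_workers=8) |
| Set reasonable max_workers | Prevents resource exhaustion | 4–8 threads for I/O-bound work; for CPU-bound work use ProcessPoolExecutor with about CPU-count workers (the GIL keeps threads from running Python code in parallel) |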
| Handle exceptions | Prevent silent failures | try-except in task, check future.result() |
| Use queues for communication | Thread-safe hand-off without explicit locks | queue.Queue, as_completed() |
| Track progress | User visibility | Counter with lock, print periodically |
| Graceful shutdown | Clean resource release | Stop event, join() with timeout |
| Lock shared state | Prevent race conditions | Use lock for statistics, counters |
Key Takeaways
- Thread pools eliminate manual thread management; use ThreadPoolExecutor for most workloads.
- Always estimate resource limits (max workers, memory per thread) before deploying.
- Separate concerns: I/O thread vs. UI thread vs. computation thread.
- Use queues for inter-thread communication rather than shared variables.
- Include error handling, progress tracking, and graceful shutdown from the start.
- Test under load; threading bugs often manifest only at scale.
Frequently Asked Questions
How do I know if my thread pool size is optimal?
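Benchmark with different pool sizes and measure throughput. For I/O-bound work, try 2–8x the CPU count (ThreadPoolExecutor's default is roughly min(32, CPU count + 4)). For CPU-bound work, threads cannot run Python code in parallel under the GIL, so use a process pool sized to the CPU count instead. Monitor CPU and memory usage to avoid exhaustion.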
What happens if a worker thread crashes?
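If using ThreadPoolExecutor.submit(), the exception is captured in the Future and re-raised when you call result(). If using raw threads, the thread simply terminates. threading.excepthook prints the traceback to stderr, but nothing else in the program is notified and the thread's remaining work is lost. Always handle exceptions explicitly and check results.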
How do I gracefully shut down a service with background threads?
Use a stop event or flag that threads check periodically. Call set() on the event, then join() all threads with a timeout. If timeout expires, log a warning but proceed with shutdown.
Can I use threading for a web API that handles thousands of requests?
Threading is slower than asyncio for high-concurrency scenarios (1000+ concurrent connections), but it works for moderate load (100–500 concurrent). For very high concurrency, use asyncio or a production web framework (FastAPI, Django with async).
How do I profile threading overhead?
Use threading.settrace() to track function calls, or use a profiler like py-spy or cProfile. Measure wall-clock time vs. CPU time—large differences indicate I/O waits (good for threading).