Skip to main content

Threading in Real Applications: Complete Examples (2026)

Threading patterns learned in isolation don't always translate to real applications. This article combines the previous concepts into three complete, realistic examples. Network access and workloads are simulated so every example runs anywhere. The three are:

- a parallel web scraper with error handling and progress tracking;
- a concurrent file processor that uses a fixed number of worker threads;
- a responsive CLI application that keeps the interface snappy while processing in the background.

When I first built the web scraper that follows, I created 100 threads for 100 URLs, and the machine became unresponsive. Switching to a thread pool with 8 workers made it both faster and more stable. That lesson—think about resource limits, not just concurrency—is the key to real-world threading success.

Example 1: Parallel Web Scraper with Thread Pool

import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import sys

class WebScraperStats:
    """Collect scrape counters that many worker threads may update at once.

    Every read and write of the counters happens while holding ``self.lock``,
    so concurrent ``record_*`` calls never lose an increment and
    ``get_summary`` always sees a consistent set of values.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.successful = 0
        self.failed = 0
        self.total_bytes = 0
        # Wall-clock reference point for the elapsed-time and throughput figures.
        self.start_time = time.time()

    def record_success(self, url, bytes_fetched):
        """Count one successful fetch that returned *bytes_fetched* bytes.

        *url* is accepted for interface symmetry but is not stored.
        """
        with self.lock:
            self.total_bytes += bytes_fetched
            self.successful += 1

    def record_failure(self, url, error):
        """Count one failed fetch; *url* and *error* are accepted but not stored."""
        with self.lock:
            self.failed += 1

    def get_summary(self):
        """Return a snapshot dict of the counters plus elapsed time and throughput.

        Keys: successful, failed, total_bytes, elapsed (seconds since
        construction), bytes_per_sec (0 if no time has elapsed).
        """
        with self.lock:
            elapsed = time.time() - self.start_time
            throughput = self.total_bytes / elapsed if elapsed > 0 else 0
            return dict(
                successful=self.successful,
                failed=self.failed,
                total_bytes=self.total_bytes,
                elapsed=elapsed,
                bytes_per_sec=throughput,
            )

def simulate_fetch_url(url, failure_rate=0.05, latency_range=(0.5, 3),
                       size_range=(10_000, 100_000)):
    """Pretend to fetch *url*: maybe fail, otherwise sleep and return a size.

    Args:
        url: The URL being "fetched"; only used in the error message.
        failure_rate: Probability of raising ``ConnectionError`` instead of
            sleeping. Defaults to 0.05 (5%); 0.0 never fails, 1.0 always fails.
        latency_range: ``(low, high)`` seconds passed to ``random.uniform``
            for the simulated network delay. Defaults to 0.5-3 s.
        size_range: Inclusive ``(low, high)`` byte bounds passed to
            ``random.randint`` for the simulated response size
            (10 KB to 100 KB by default).

    Returns:
        The simulated response size in bytes.

    Raises:
        ConnectionError: With probability *failure_rate*.
    """
    if random.random() < failure_rate:
        raise ConnectionError(f"Failed to connect to {url}")

    # time.sleep releases the GIL, so other worker threads keep running.
    time.sleep(random.uniform(*latency_range))

    return random.randint(*size_range)

def scrape_urls(urls, max_workers=8, fetch=None):
    """Fetch *urls* concurrently on a bounded thread pool, printing progress.

    Args:
        urls: Iterable of URLs to fetch.
        max_workers: Upper bound on the number of fetches running at once.
        fetch: Callable ``fetch(url) -> int`` returning the number of bytes
            fetched. Defaults to ``simulate_fetch_url``. Any exception it
            raises is recorded as a failure for that URL, not propagated.

    Returns:
        Dict mapping each URL to ``{'status': 'success', 'bytes': n}`` or
        ``{'status': 'failed', 'error': message}``. Duplicate URLs share one
        entry (whichever finishes last wins).
    """
    if fetch is None:
        fetch = simulate_fetch_url
    urls = list(urls)
    total = len(urls)
    stats = WebScraperStats()
    results = {}

    print(f"Scraping {total} URLs with {max_workers} workers...")
    print()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(fetch, url): url for url in urls}

        # as_completed() only yields futures that are already done, so
        # result() below never blocks; a timeout argument there would never
        # fire. Per-request timeouts must be enforced inside fetch itself
        # (e.g. a socket/HTTP client timeout).
        for completed, future in enumerate(as_completed(future_to_url), start=1):
            url = future_to_url[future]
            try:
                bytes_fetched = future.result()
            except Exception as e:  # per-task boundary: record and keep going
                stats.record_failure(url, str(e))
                results[url] = {'status': 'failed', 'error': str(e)}
                print(f"[{completed}/{total}] {url}: ERROR - {e}")
            else:
                stats.record_success(url, bytes_fetched)
                results[url] = {'status': 'success', 'bytes': bytes_fetched}
                print(f"[{completed}/{total}] {url}: {bytes_fetched} bytes")

    # The with-block has waited for every task, so the stats are final.
    print()
    summary = stats.get_summary()
    print("Summary:")
    print(f" Successful: {summary['successful']}/{total}")
    print(f" Failed: {summary['failed']}/{total}")
    print(f" Total bytes: {summary['total_bytes']:,}")
    print(f" Elapsed: {summary['elapsed']:.2f}s")
    print(f" Throughput: {summary['bytes_per_sec'] / 1024:.1f} KB/s")

    return results

# Example usage: scrape 20 simulated pages with at most 8 concurrent workers.
if __name__ == "__main__":
    page_count = 20
    urls = [f"https://example.com/page{i}" for i in range(page_count)]
    results = scrape_urls(urls, max_workers=8)

This example demonstrates:

  • Thread-safe statistics using a lock.
  • as_completed() for processing results in completion order.
  • Progress tracking (completed count).
  • Error handling per task.
  • Resource-bounded execution (8 workers max).

Example 2: Concurrent File Processor

import threading
import queue
import time
import hashlib
from pathlib import Path

class FileProcessor:
    """Hash files concurrently with a fixed-size pool of worker threads.

    Paths are distributed through ``work_queue``; each worker pushes exactly
    one result dict per path onto ``result_queue``. Files are hashed in
    ``chunk_size`` pieces rather than read whole, so memory per worker stays
    bounded regardless of file size.
    """

    def __init__(self, num_workers=4, chunk_size=1 << 20):
        """
        Args:
            num_workers: Worker threads started per ``process_files`` call
                (must be >= 1, otherwise ``work_queue.join()`` would never return).
            chunk_size: Bytes read per ``read()`` call while hashing (1 MiB default).

        Raises:
            ValueError: If *num_workers* < 1 or *chunk_size* <= 0.
        """
        if num_workers < 1:
            raise ValueError("num_workers must be at least 1")
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        self.num_workers = num_workers
        self.chunk_size = chunk_size
        self.work_queue = queue.Queue()
        self.result_queue = queue.Queue()
        # Never set inside this class; workers poll it so an outside caller
        # can ask them to stop early.
        self.stop_event = threading.Event()
        # Guards self.stats, which every worker updates.
        self.lock = threading.Lock()
        # Cumulative across all process_files() calls on this instance.
        self.stats = {'processed': 0, 'failed': 0, 'total_size': 0}

    def _hash_file(self, filepath):
        """Return ``(sha256_hexdigest, bytes_read)`` for *filepath*, read in chunks."""
        digest = hashlib.sha256()
        size = 0
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(self.chunk_size), b''):
                digest.update(chunk)
                size += len(chunk)
        # Size is counted from the bytes actually hashed, so it always agrees
        # with the digest (a separate stat() could race with a writer).
        return digest.hexdigest(), size

    def worker(self):
        """Consume paths from work_queue until a None sentinel or stop_event.

        Each path yields one result dict on result_queue: status 'success'
        with 'hash' and 'size', or status 'failed' with 'error'.
        """
        while not self.stop_event.is_set():
            try:
                filepath = self.work_queue.get(timeout=1)
            except queue.Empty:
                continue  # re-check stop_event periodically

            if filepath is None:  # sentinel: no more work for this worker
                self.work_queue.task_done()
                break

            try:
                file_hash, file_size = self._hash_file(filepath)
            except Exception as e:  # any per-file failure is reported, not raised
                self.result_queue.put({
                    'filepath': filepath,
                    'error': str(e),
                    'status': 'failed',
                })
                with self.lock:
                    self.stats['failed'] += 1
            else:
                self.result_queue.put({
                    'filepath': filepath,
                    'hash': file_hash,
                    'size': file_size,
                    'status': 'success',
                })
                with self.lock:
                    self.stats['processed'] += 1
                    self.stats['total_size'] += file_size
            finally:
                # Must run once per get() or work_queue.join() never returns.
                self.work_queue.task_done()

    def process_files(self, file_paths):
        """Hash every path in *file_paths* and wait for the workers to finish.

        Returns:
            ``(results, stats)``: a list of per-file result dicts in completion
            order, and a snapshot copy of the (cumulative) counters.
        """
        workers = [
            threading.Thread(target=self.worker, daemon=True)
            for _ in range(self.num_workers)
        ]
        for w in workers:
            w.start()

        for filepath in file_paths:
            self.work_queue.put(Path(filepath))

        # Block until every submitted path has been task_done()'d.
        self.work_queue.join()

        # One sentinel per worker so each one leaves its loop.
        for _ in workers:
            self.work_queue.put(None)

        # Every result was put() before its task_done(), and join() returned,
        # so the queue now holds all results for this batch.
        results = []
        while True:
            try:
                results.append(self.result_queue.get_nowait())
            except queue.Empty:
                break

        for w in workers:
            w.join(timeout=5)

        with self.lock:
            stats = dict(self.stats)
        return results, stats

# Example usage
if __name__ == "__main__":
    import tempfile

    processor = FileProcessor(num_workers=4)

    # Create test files under the platform's temp directory instead of a
    # hard-coded /tmp, which does not exist on Windows.
    test_dir = Path(tempfile.gettempdir()) / "test_files"
    test_dir.mkdir(parents=True, exist_ok=True)
    for i in range(10):
        (test_dir / f"file{i}.txt").write_text(f"Content {i}" * 1000)

    # Sorted so the submission order is deterministic (glob order is not).
    file_paths = sorted(test_dir.glob("file*.txt"))
    results, stats = processor.process_files(file_paths)

    print(f"Processed: {stats['processed']}, Failed: {stats['failed']}")
    print(f"Total size: {stats['total_size']} bytes")

    for result in results[:3]:  # Show first 3
        name = result['filepath'].name
        # Failed results carry 'error' instead of 'hash'.
        if result['status'] == 'success':
            print(f" {name}: {result['hash'][:16]}...")
        else:
            print(f" {name}: FAILED - {result['error']}")

This example shows:

  • Queue-based work distribution.
  • Graceful shutdown with sentinels.
  • Thread-safe statistics collection.
  • Proper resource cleanup.

Example 3: Responsive CLI Application

import threading
import time
import sys
from queue import Queue

class ResponsiveCLI:
    """A CLI whose prompt stays usable while tasks run on a background thread.

    The main thread only reads input and prints; the single background worker
    only consumes ``tasks_queue`` and produces ``results_queue``. The two
    threads communicate exclusively through those queues.
    """

    def __init__(self, work_seconds=2):
        """
        Args:
            work_seconds: Simulated processing time per task (2 s by default).
        """
        self.tasks_queue = Queue()
        self.results_queue = Queue()
        self.stop_event = threading.Event()
        self.work_seconds = work_seconds

    def background_worker(self):
        """Process submitted tasks one at a time until stop_event is set."""
        from queue import Empty  # the example's module imports only Queue

        while not self.stop_event.is_set():
            try:
                # Short timeout so stop_event is re-checked about twice a second.
                task = self.tasks_queue.get(timeout=0.5)
            except Empty:
                continue

            try:
                time.sleep(self.work_seconds)  # simulate work
                self.results_queue.put(f"Completed: {task}")
            finally:
                # Always acknowledge the task so tasks_queue.join() can return.
                self.tasks_queue.task_done()

    def _print_completed(self):
        """Print every result that has finished since the last check."""
        # The main thread is the only consumer of results_queue, so a
        # non-empty check followed by get() cannot block.
        while not self.results_queue.empty():
            print(f"\n>>> {self.results_queue.get()}")

    def run(self):
        """Run the interactive loop until 'quit', EOF, or an exception."""
        worker = threading.Thread(target=self.background_worker, daemon=True)
        worker.start()

        print("Responsive CLI - Type 'submit <task>' to submit, 'results' to see results, 'quit' to exit")

        try:
            while True:
                # Show everything that finished while the user was typing.
                self._print_completed()

                try:
                    user_input = input("\n> ").strip()
                except EOFError:
                    break

                if not user_input:
                    continue
                elif user_input == 'quit':
                    break
                elif user_input == 'results':
                    self._print_completed()
                    # qsize() counts queued tasks only, not the one in progress.
                    queue_size = self.tasks_queue.qsize()
                    print(f"Pending tasks: {queue_size}")
                elif user_input.startswith('submit '):
                    task = user_input[7:]
                    self.tasks_queue.put(task)
                    print(f"Submitted: {task}")
                else:
                    print("Unknown command")

        finally:
            self.stop_event.set()
            worker.join(timeout=2)
            print("\nGoodbye!")

if __name__ == "__main__":
    # Build the CLI and hand control to its interactive loop.
    ResponsiveCLI().run()

Example session:

> submit task1
Submitted: task1
> submit task2
Submitted: task2
> results
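Pending tasks: 1
> # ... task1 was already running, so only task2 was still queued; with one worker at 2 s per task, both finish about 4 seconds after submission ...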
>>> Completed: task1
>>> Completed: task2
> quit
Goodbye!

This example shows:

  • Main thread (CLI) remains responsive.
  • Background thread processes tasks concurrently with user input.
  • Clean separation of concerns.
  • Graceful shutdown.

Best Practices Summary

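| Practice | Why | Example |
|---|---|---|
| Use `ThreadPoolExecutor` | Avoids manual thread management | `with ThreadPoolExecutor(max_workers=8)` |
| Set reasonable `max_workers` | Prevents resource exhaustion | Start around 4–8 for I/O-bound work and benchmark (see FAQ); for CPU-bound Python code use `ProcessPoolExecutor` sized to the CPU count, because the GIL serializes bytecode across threads |
| Handle exceptions | Prevents silent failures | `try`/`except` in the task; check `future.result()` |
| Use queues for communication | Thread-safe without explicit locks in your code (`queue.Queue` locks internally) | `queue.Queue`, `as_completed()` |
| Track progress | Gives users visibility | Counter protected by a lock, printed periodically |
| Graceful shutdown | Releases resources cleanly | Stop event or sentinel values, `join()` with timeout |
| Lock shared state | Prevents race conditions | Use a lock around statistics and counters |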

Key Takeaways

  • Thread pools eliminate manual thread management; use ThreadPoolExecutor for most workloads.
  • Always estimate resource limits (max workers, memory per thread) before deploying.
  • Separate concerns: I/O thread vs. UI thread vs. computation thread.
  • Use queues for inter-thread communication rather than shared variables.
  • Include error handling, progress tracking, and graceful shutdown from the start.
  • Test under load; threading bugs often manifest only at scale.

Frequently Asked Questions

How do I know if my thread pool size is optimal?

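Benchmark with different pool sizes and measure throughput. For I/O-bound work, try 2–8x the CPU count. For CPU-bound pure-Python work, threads will not speed things up under the GIL. Use a process pool (`ProcessPoolExecutor`) sized to the CPU count instead. Monitor CPU and memory usage to avoid exhaustion.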

What happens if a worker thread crashes?

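If you use `ThreadPoolExecutor.submit()`, the exception is captured in the Future and re-raised when you call `future.result()`. If you use raw `threading.Thread` objects, the thread simply dies. Its traceback is printed to stderr by `threading.excepthook`, but nothing propagates to the main thread, and your program carries on unaware. Always handle exceptions explicitly and check results.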

How do I gracefully shut down a service with background threads?

Use a stop event or flag that threads check periodically. Call set() on the event, then join() all threads with a timeout. If timeout expires, log a warning but proceed with shutdown.

Can I use threading for a web API that handles thousands of requests?

Threads scale less well than asyncio at very high concurrency (1000+ concurrent connections), because each thread carries its own stack and OS scheduling overhead. They work well for moderate load (roughly 100–500 concurrent connections). For very high concurrency, use asyncio or an async web framework (FastAPI, or Django with async views).

How do I profile threading overhead?

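Use `threading.settrace()` to track function calls, or use a profiler such as py-spy or cProfile. By default, cProfile only records the thread in which it was enabled, while py-spy samples all threads. Measure wall-clock time against CPU time: a large gap indicates time spent waiting on I/O, which is exactly where threading helps.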

Further Reading