Building a Scalable Image Processing Pipeline
Building a production image processing pipeline requires combining Pool-based parallelism, error handling, progress tracking, shared metrics, and graceful shutdown. This article walks through a complete, real-world example: a service that resizes thousands of images in parallel, logs errors, tracks throughput, and avoids resource leaks. The patterns demonstrated apply to any batch processing workload: video transcoding, data ETL, scientific simulation, or machine learning inference.
Pipeline Architecture
The target system has four components:
- Input Queue: Tasks to process (image paths, target sizes).
- Worker Pool: Processes images in parallel.
- Metrics & Logging: Track progress, errors, throughput.
- Graceful Shutdown: Drain queue, wait for workers, clean up resources.
Step 1: Define the Worker Function
from PIL import Image
import os
def resize_image(task):
    """
    Resize a single image to fit inside a bounding box and save the result.

    Per-image failures never raise: any exception from reading, resizing or
    writing is caught and reported through the returned status tuple, so one
    bad file cannot abort the batch.

    Args:
        task: (input_path, output_dir, target_size, quality)
            input_path: path of the source image.
            output_dir: directory the result is written into, under the same
                base name as the input (the directory is not created here).
            target_size: (max_width, max_height) handed to Image.thumbnail.
            quality: forwarded to Image.save as the ``quality`` option.

    Returns:
        (status, input_path, output_path, error_msg, bytes_saved)
        Success: status is "OK", error_msg is None, and bytes_saved is the
        input file size minus the output file size (negative when the output
        is larger).
        Failure: status is "ERROR", output_path is None, error_msg is
        str(exception), and bytes_saved is 0.
    """
    input_path, output_dir, target_size, quality = task
    try:
        original_size = os.path.getsize(input_path)
        output_path = os.path.join(output_dir, os.path.basename(input_path))
        # Image.open is lazy and keeps the source file open. The context
        # manager closes it as soon as the resized copy is written, so a
        # long-lived worker does not accumulate open file handles.
        with Image.open(input_path) as img:
            # thumbnail() resizes in place and preserves the aspect ratio.
            img.thumbnail(target_size, Image.Resampling.LANCZOS)
            img.save(output_path, quality=quality)
        final_size = os.path.getsize(output_path)
        return ("OK", input_path, output_path, None, original_size - final_size)
    except Exception as e:
        # Deliberately broad: the caller expects a status tuple for every task.
        return ("ERROR", input_path, None, str(e), 0)
Step 2: Main Coordinator with Metrics
import multiprocessing
import ctypes
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
class PipelineMetrics:
    """Counters for processed images, errors and bytes saved.

    The counters are allocated with ``multiprocessing.Value`` and every
    read or update happens while holding ``self.lock``, so a multi-field
    update (processed + bytes saved) is seen as one unit by ``get_stats``.
    """

    def __init__(self, num_workers):
        """
        Args:
            num_workers: Worker count; stored on the instance but not used
                by the counters themselves.
        """
        self.total_processed = multiprocessing.Value(ctypes.c_int, 0)
        self.total_errors = multiprocessing.Value(ctypes.c_int, 0)
        # 64-bit on purpose: a byte total exceeds the 32-bit c_int range after
        # roughly 2 GiB, and ctypes integers wrap silently instead of raising.
        self.total_bytes_saved = multiprocessing.Value(ctypes.c_longlong, 0)
        self.lock = multiprocessing.Lock()
        self.num_workers = num_workers

    def record_success(self, bytes_saved):
        """Count one processed image and add ``bytes_saved`` to the running total.

        Args:
            bytes_saved: Size difference in bytes; may be negative.
        """
        with self.lock:
            self.total_processed.value += 1
            self.total_bytes_saved.value += bytes_saved

    def record_error(self):
        """Count one failed image."""
        with self.lock:
            self.total_errors.value += 1

    def get_stats(self):
        """Return a snapshot of the counters.

        Returns:
            dict with integer values under the keys 'processed', 'errors'
            and 'bytes_saved'.
        """
        with self.lock:
            return {
                'processed': self.total_processed.value,
                'errors': self.total_errors.value,
                'bytes_saved': self.total_bytes_saved.value,
            }
class ImageProcessingPipeline:
    """Manages image resizing with parallelism and monitoring."""

    def __init__(self, num_workers=4, target_size=(800, 600), quality=85):
        """
        Args:
            num_workers: Number of worker processes in the pool.
            target_size: (max_width, max_height) passed to every resize task.
            quality: Save quality passed to every resize task.
        """
        self.num_workers = num_workers
        self.target_size = target_size
        self.quality = quality
        self.metrics = PipelineMetrics(num_workers)
        # Set when process_directory starts the clock; None until then.
        self.start_time = None

    def process_directory(self, input_dir, output_dir, batch_size=None):
        """
        Resize all images in input_dir, save to output_dir.

        Only regular files directly inside input_dir whose names end in
        .jpg, .jpeg, .png or .gif (case-insensitive) are processed.
        Prints "No images found" and returns without starting a pool when
        nothing matches.

        Args:
            input_dir: Source directory
            output_dir: Destination directory (created if missing)
            batch_size: Images per batch (None = auto). The value is computed
                and printed, but tasks are submitted as one future each, so
                it does not currently influence scheduling.

        Raises:
            TimeoutError: from as_completed() when not every task has
                finished within 300 seconds of the start of result collection.
        """
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Enumerate image files. The isfile() check keeps a directory that
        # merely has an image-like name from being submitted as a task.
        image_files = [
            f for f in os.listdir(input_dir)
            if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif'))
            and os.path.isfile(os.path.join(input_dir, f))
        ]
        if not image_files:
            print("No images found")
            return

        # Prepare tasks
        tasks = [
            (
                os.path.join(input_dir, f),
                output_dir,
                self.target_size,
                self.quality,
            )
            for f in image_files
        ]
        total_tasks = len(tasks)

        # Auto batch size: about four batches per worker, never below one.
        if batch_size is None:
            batch_size = max(1, total_tasks // (self.num_workers * 4))

        print(f"Processing {total_tasks} images with {self.num_workers} workers")
        print(f"Batch size: {batch_size}")

        self.start_time = time.perf_counter()
        completed = 0
        last_report = 0
        # Report progress every 10% or 10 tasks, whichever is larger.
        report_every = max(10, total_tasks // 10)

        # Process with executor
        with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            futures = [
                executor.submit(resize_image, task) for task in tasks
            ]
            for future in as_completed(futures, timeout=300):
                try:
                    status, input_path, _output_path, error_msg, bytes_saved = future.result()
                except Exception as e:
                    # The worker call itself failed (e.g. the process died or
                    # returned something unexpected); count it as an error.
                    print(f"Exception in future: {e}")
                    self.metrics.record_error()
                else:
                    if status == "OK":
                        self.metrics.record_success(bytes_saved)
                    else:
                        self.metrics.record_error()
                        print(f"ERROR: {os.path.basename(input_path)}: {error_msg}")

                # A failed future is still a finished task; counting it here
                # keeps the completed/total figures accurate.
                completed += 1
                if completed - last_report >= report_every:
                    self._report_progress(completed, total_tasks)
                    last_report = completed

        # Final report
        self._report_progress(completed, total_tasks, final=True)

    def _report_progress(self, completed, total, final=False):
        """Print progress report.

        Args:
            completed: Number of tasks finished so far (successes and failures).
            total: Total number of tasks; must be non-zero.
            final: True for the closing "DONE" line, False for "IN PROGRESS".
        """
        elapsed = time.perf_counter() - self.start_time
        stats = self.metrics.get_stats()
        # Throughput counts successfully processed images only.
        throughput = stats['processed'] / elapsed if elapsed > 0 else 0
        pct = (completed / total) * 100
        mb_saved = stats['bytes_saved'] / 1024 / 1024
        status = "DONE" if final else "IN PROGRESS"
        print(
            f"[{status}] {completed}/{total} ({pct:.1f}%) | "
            f"{throughput:.1f} img/sec | "
            f"{mb_saved:.1f} MB saved | "
            f"Errors: {stats['errors']}"
        )
def main():
    """Create ten grey sample JPEGs under /tmp and run the pipeline on them."""
    src_dir, dst_dir = "/tmp/images_input", "/tmp/images_output"
    os.makedirs(src_dir, exist_ok=True)

    # Sample generation needs Pillow; without it there is nothing to process.
    try:
        from PIL import Image

        for index, level in enumerate(range(0, 200, 20)):
            sample = Image.new('RGB', (1600, 1200), color=(level, level, level))
            sample.save(f"{src_dir}/image_{index:03d}.jpg")
    except ImportError:
        print("Pillow not installed; skipping sample image generation")
        return

    # Four workers, 800x600 bounding box, quality 85.
    runner = ImageProcessingPipeline(
        num_workers=4,
        target_size=(800, 600),
        quality=85,
    )
    runner.process_directory(src_dir, dst_dir)


if __name__ == "__main__":
    main()
Step 3: Error Handling and Retries
For transient failures (network timeouts, temporary file locks), implement retry logic:
import time
def resize_image_with_retries(task, max_retries=3):
    """Run resize_image on ``task``, retrying failed attempts with backoff.

    resize_image reports failures through an "ERROR" status tuple instead of
    raising, so the status of each result is inspected; relying on an
    exception alone would never trigger a retry.

    Args:
        task: (input_path, output_dir, target_size, quality), passed through
            to resize_image unchanged.
        max_retries: Maximum number of attempts. Between attempts the wrapper
            sleeps 2 ** attempt seconds (1 s, 2 s, ...); there is no sleep
            after the last attempt.

    Returns:
        The "OK" tuple from the first successful attempt, otherwise
        ("ERROR", input_path, None, last_error, 0) where last_error is the
        message from the final failed attempt (None if no attempt was made).
    """
    input_path, output_dir, target_size, quality = task
    last_error = None
    for attempt in range(max_retries):
        try:
            result = resize_image(task)
        except Exception as e:
            # Kept for safety in case the worker function does raise.
            last_error = str(e)
        else:
            if result[0] == "OK":
                return result
            last_error = result[3]
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)  # Exponential backoff
    return ("ERROR", input_path, None, last_error, 0)
Submit with retry wrapper:
# Submit every task through the retry wrapper, one future per task.
futures = [executor.submit(resize_image_with_retries, job) for job in tasks]
Step 4: Resource Cleanup and Graceful Shutdown
class PipelineContext:
    """Wrap an ImageProcessingPipeline for use in a ``with`` statement."""

    def __init__(self, input_dir, output_dir, num_workers=4):
        """Remember the two directories and build the wrapped pipeline."""
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.pipeline = ImageProcessingPipeline(num_workers=num_workers)

    def __enter__(self):
        """Hand the wrapped pipeline to the body of the ``with`` block."""
        return self.pipeline

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Report an in-flight exception (if any), announce cleanup, suppress nothing."""
        messages = ["Cleaning up resources..."]
        if exc_type:
            messages.insert(
                0, f"Pipeline exited with exception: {exc_type.__name__}: {exc_val}"
            )
        print("\n".join(messages))
        # Additional cleanup (delete temp files, close connections, etc.)
        # Returning False lets any exception continue to propagate.
        return False
# Usage: the same two directories go to the context and to the run itself.
source_dir, dest_dir = "/tmp/images", "/tmp/output"
with PipelineContext(source_dir, dest_dir) as pipeline:
    pipeline.process_directory(source_dir, dest_dir)
Step 5: Performance Tuning and Benchmarking
import time
def benchmark_pipeline(num_images=100, num_workers_list=(2, 4, 8)):
    """Benchmark different worker counts.

    Generates ``num_images`` identical 1600x1200 JPEGs in a fresh temporary
    directory, runs the pipeline once per worker count, and prints one
    summary line per run. Inputs and outputs live under a single temporary
    root that is removed on return, so files left over from an earlier run
    can never be processed and inflate the measured work.

    Args:
        num_images: Number of test images to generate and process.
        num_workers_list: Iterable of worker counts to try, in order.

    Returns:
        dict mapping each worker count to
        {'elapsed': seconds, 'throughput': images per second}.
    """
    import tempfile  # local import so the snippet needs no extra file-level import

    results = {}
    with tempfile.TemporaryDirectory(prefix="pipeline_bench_") as work_dir:
        # Generate test images
        test_dir = os.path.join(work_dir, "test_images")
        os.makedirs(test_dir, exist_ok=True)
        for i in range(num_images):
            img = Image.new('RGB', (1600, 1200), color=(50, 50, 50))
            img.save(os.path.join(test_dir, f"test_{i:04d}.jpg"))

        for num_workers in num_workers_list:
            output_dir = os.path.join(work_dir, f"output_{num_workers}")
            pipeline = ImageProcessingPipeline(num_workers=num_workers)

            # The timed region includes pool start-up and shutdown.
            start = time.perf_counter()
            pipeline.process_directory(test_dir, output_dir)
            elapsed = time.perf_counter() - start

            throughput = num_images / elapsed if elapsed > 0 else 0.0
            results[num_workers] = {
                'elapsed': elapsed,
                'throughput': throughput,
            }
            print(f"Workers={num_workers}: {elapsed:.2f}s, {throughput:.1f} img/sec")
    return results


if __name__ == "__main__":
    # Benchmark
    benchmark_pipeline(num_images=100, num_workers_list=[2, 4, 8])
Key Patterns Demonstrated
- Metrics via Shared Memory: Use `Value` and `Lock` for counters that are safe to update across processes.
- Progress Reporting: Report at intervals (10% or every N tasks) to avoid log spam.
- Error Handling: Catch per-task exceptions; record and continue.
- Resource Cleanup: Use context managers to guarantee cleanup.
- Adaptive Chunking: Auto-calculate batch size based on workload size and worker count.
- Graceful Degradation: If some tasks fail, pipeline continues and reports final error count.
Production Considerations
Logging to file:
import logging
# Send INFO-and-above records to pipeline.log with a timestamped format.
logging.basicConfig(
    filename='pipeline.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# In worker or main
# Pass the values as arguments rather than pre-formatting: logging builds the
# message only when the record is actually emitted. The logged text is unchanged.
logging.error("Failed to process %s: %s", input_path, error_msg)
Monitoring with metrics export:
import json
# Take one snapshot of the counters and write it out as a JSON document.
stats = pipeline.metrics.get_stats()
with open('metrics.json', 'w') as metrics_file:
    metrics_file.write(json.dumps(stats))
Rate limiting (avoid overloading disk I/O):
import time
time.sleep(0.01) # Throttle: pause for 10 ms (0.01 s)
Key Takeaways
- Use `ProcessPoolExecutor` with `as_completed()` for real-time progress tracking.
- Track metrics via `multiprocessing.Value` + `Lock` for safe cross-process updates.
- Auto-calculate `chunksize` based on total workload and worker count.
- Implement retry logic with exponential backoff for transient failures.
- Use context managers and try-finally to guarantee resource cleanup.
- Report progress periodically (not per-task) to avoid log bottlenecks.
- Test and benchmark with realistic data sizes before production deployment.
Frequently Asked Questions
How do I handle processing very large images that exceed memory?
Use streaming/chunked processing: load image in blocks, process, write output. Or use memory-mapped files with NumPy. This is beyond multiprocessing scope but good to consider.
Can I pause and resume a pipeline?
Yes, but complex. Save processed file list to disk; on restart, skip already-processed files. Or use a database to track state.
What if I need to process >100,000 images?
Chunking (batch size) becomes critical. Use imap_unordered() for better adaptive scheduling. Consider a queue-based architecture where a feeder thread adds tasks as workers finish.
How do I profile which step is the bottleneck?
Time each phase inside resize_image() separately (reading the input, resizing, and writing the output) so you can compare I/O time against actual processing time. Use cProfile on a subset of the images.
Can I add a pre-processing or post-processing stage?
Yes, make it a pipeline of stages. Use Queues between stages, or call process functions sequentially within a single worker.
Further Reading
- Concurrent.futures ProcessPoolExecutor documentation — API reference.
- Pillow Image processing library — image manipulation API.
- Logging best practices — structured logging in Python.
- Production-grade multiprocessing patterns — IPC queue-based architectures.