Process Pools for Parallel Computing: Distribute Work
Process pools eliminate manual process spawning by managing a fixed set of worker processes that consume tasks from an internal queue. You submit work via map(), imap(), or apply_async(), and the pool distributes it to the workers, collects the results, and handles cleanup. Pools suit batch processing (image resizing, data aggregation, scientific simulation) because they reuse worker processes, amortizing startup overhead across many tasks. This article covers pool creation, task submission strategies, result handling, and tuning for throughput.
What Is a Process Pool?
A multiprocessing.Pool maintains N worker processes that loop internally, waiting for tasks. You submit tasks (a function plus its arguments) to the pool's queue. Workers dequeue tasks, execute them, and return results. When the queue is empty, workers block until new work arrives. This is far more efficient than spawning a fresh process for each task, because process startup (on the order of 100 ms, depending on platform and start method) is paid once per worker and then amortized over hundreds or thousands of tasks.
Typical architecture:
Main Process
    |
    +---> Pool Queue (tasks to do)
              |
              v
          Worker-1 (sub-process)
          Worker-2 (sub-process)
          Worker-N (sub-process)
              |
              +---> Result Queue (completed results)
                        ^
                        |
          Worker processes write results back
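To make the reuse concrete, the following minimal sketch runs 100 trivial tasks through a 4-worker pool and counts how many distinct processes served them. The helper names report_pid and count_distinct_workers are illustrative and not part of the library.

```python
import multiprocessing
import os


def report_pid(_):
    """Return the PID of the worker process that ran this task."""
    return os.getpid()


def count_distinct_workers(n_tasks=100, n_workers=4):
    """Run n_tasks trivial tasks and count how many processes served them."""
    with multiprocessing.Pool(processes=n_workers) as pool:
        pids = pool.map(report_pid, range(n_tasks))
    return len(set(pids))


if __name__ == "__main__":
    # The count never exceeds the pool size, even though 100 tasks ran.
    print(count_distinct_workers())
```

The count can be lower than the pool size if some workers never pick up a chunk, but it cannot exceed it.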
Creating and Configuring a Pool
import multiprocessing

if __name__ == "__main__":
    # Create a pool with 4 workers (explicit count)
    pool = multiprocessing.Pool(processes=4)
    pool.terminate()  # Release the workers before creating another pool

    # Create a pool matching the CPU count (default)
    pool = multiprocessing.Pool()  # Usually a good fit for CPU-bound work
    pool.terminate()

    # Preferred: let a context manager clean up
    with multiprocessing.Pool(processes=4) as pool:
        # Use pool here
        pass
    # On exit, the with block calls pool.terminate() and stops the workers
Pool size guidance:
- processes=None (default): uses os.cpu_count(), usually a good fit for CPU-bound tasks.
- For I/O-bound tasks, use 2–4x the CPU count, since workers spend much of their time blocked on I/O.
- For memory-constrained systems, reduce the count; each worker costs roughly 10–50 MB, and more once it imports large libraries.
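The rules of thumb above can be sketched as a small helper. The function suggest_pool_size, its workload labels ("cpu" and "io"), and the max_workers cap are hypothetical names chosen for this example; the multipliers are starting points to measure against, not guarantees.

```python
import os


def suggest_pool_size(workload, cpu_count=None, io_multiplier=2, max_workers=None):
    """Suggest a worker count from the sizing guidance above.

    workload: "cpu" or "io" (labels invented for this sketch).
    max_workers: optional cap, e.g. derived from available memory.
    """
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    if workload == "cpu":
        size = cpu_count
    elif workload == "io":
        size = cpu_count * io_multiplier
    else:
        raise ValueError(f"unknown workload type: {workload!r}")
    if max_workers is not None:
        size = min(size, max_workers)
    return max(1, size)


if __name__ == "__main__":
    print(suggest_pool_size("cpu"))
    print(suggest_pool_size("io", io_multiplier=4))
```

Pass the result as the processes argument when creating the pool, then adjust after profiling the real workload.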
Task Submission: map() vs. imap() vs. apply_async()
map(): Simple and Blocking
pool.map() applies a function to every item in an iterable, blocking until all results are ready. It returns a list of results in the same order as input.
import math
import multiprocessing

def compute_sqrt(x):
    """Compute square root."""
    return math.sqrt(x)

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        numbers = [1, 4, 9, 16, 25, 36, 49, 64]
        # Block until all results are ready
        results = pool.map(compute_sqrt, numbers)
        print(results)  # [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
When to use: Simple batch jobs where you have all inputs upfront and can wait for all outputs.
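The ordering guarantee holds even when tasks finish out of order. In this small sketch, earlier inputs deliberately sleep longer than later ones, yet map() still returns results in input order. The function names are illustrative, and chunksize=1 is set so that each item is dispatched as its own task.

```python
import multiprocessing
import time


def delayed_identity(x):
    """Sleep longer for smaller inputs so later items tend to finish first."""
    time.sleep(0.05 * (4 - x))
    return x


def run_ordered_map():
    with multiprocessing.Pool(processes=4) as pool:
        # chunksize=1 sends each item to the pool as a separate task
        return pool.map(delayed_identity, [0, 1, 2, 3], chunksize=1)


if __name__ == "__main__":
    print(run_ordered_map())  # [0, 1, 2, 3]
```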
imap(): Non-Blocking Iterator
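pool.imap() returns an iterator immediately and yields each result, in input order, as soon as it is available, so you can process partial results without waiting for every task. If only completion order matters, imap_unordered() yields results as they finish.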
import multiprocessing
import time

def slow_square(x):
    time.sleep(1)  # Simulate slow task
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        numbers = range(10)
        # Return immediately; iterate as results arrive
        result_iterator = pool.imap(slow_square, numbers)
        for i, result in enumerate(result_iterator):
            print(f"Task {i} result: {result}")
            # Can process each result as soon as it's ready
When to use: Long-running batch jobs where you want to process partial results or monitor progress in real-time.
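The difference between imap() and imap_unordered() is easiest to see side by side. In this sketch (function names are illustrative), the first input is made much slower than the rest: imap() still yields it first, while imap_unordered() is free to yield the faster results ahead of it.

```python
import multiprocessing
import time


def delayed_identity(x):
    """Make input 0 noticeably slower than the others."""
    time.sleep(0.3 if x == 0 else 0.01)
    return x


def compare_orderings():
    items = [0, 1, 2, 3]
    with multiprocessing.Pool(processes=4) as pool:
        # imap(): results come back in input order
        ordered = list(pool.imap(delayed_identity, items))
        # imap_unordered(): results come back in completion order
        unordered = list(pool.imap_unordered(delayed_identity, items))
    return ordered, unordered


if __name__ == "__main__":
    ordered, unordered = compare_orderings()
    print("imap:", ordered)
    print("imap_unordered:", unordered)
```

Both calls return the same set of values; only the order of arrival can differ, and the unordered sequence may vary between runs.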
apply_async(): Flexible, Returns Future-like Object
apply_async() submits a single task and returns an AsyncResult object. You can check completion with ready() or wait with get().
import multiprocessing
import time

def process_item(item_id):
    time.sleep(2)
    return f"Processed {item_id}"

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # Submit one task
        async_result = pool.apply_async(process_item, args=(42,))
        # Check if done without blocking
        if not async_result.ready():
            print("Task is running...")
            time.sleep(1)
        # Block and get result with optional timeout
        result = async_result.get(timeout=5)
        print(result)
When to use: Dynamic workloads where you submit tasks incrementally or need fine-grained control over individual task completion.
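For more than one task, a common pattern is to keep the AsyncResult handle for each submission and collect them afterwards. The sketch below assumes a hypothetical cube function and a 10-second timeout per result; both are placeholders.

```python
import multiprocessing


def cube(x):
    return x ** 3


def submit_incrementally(values):
    with multiprocessing.Pool(processes=2) as pool:
        # Submit tasks one at a time, keeping a handle for each
        pending = [pool.apply_async(cube, args=(v,)) for v in values]
        # Collect inside the with block: leaving it terminates the pool
        return [handle.get(timeout=10) for handle in pending]


if __name__ == "__main__":
    print(submit_incrementally([1, 2, 3]))  # [1, 8, 27]
```

Because the handles are collected in submission order, the output order matches the input order regardless of which task finishes first.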
Comparison: map vs. imap vs. apply_async
| Method | Blocking | Returns | Use Case |
|---|---|---|---|
| map() | Yes, waits for all | List (ordered) | Simple batch; all inputs ready upfront |
| imap() | No, yields in input order as results become ready | Iterator | Long jobs; process partial results incrementally |
| apply_async() | No, returns immediately | AsyncResult object | Dynamic workloads; submit one task at a time |
Practical Example: Batch Image Resizing
Here's a real-world scenario: resize 100 images in parallel.
import multiprocessing
import os

from PIL import Image

def resize_image(input_path):
    """Shrink an image to fit within 800x600 and save it next to the original."""
    try:
        with Image.open(input_path) as img:
            # thumbnail() resizes in place and preserves the aspect ratio
            img.thumbnail((800, 600))
            # Build the output name from the extension, not a substring match
            root, ext = os.path.splitext(input_path)
            output_path = f"{root}_resized{ext}"
            img.save(output_path, quality=85)
        return f"OK: {os.path.basename(input_path)}"
    except Exception as e:
        return f"FAIL: {os.path.basename(input_path)}: {e}"

if __name__ == "__main__":
    image_files = [
        "photo1.jpg", "photo2.jpg", "photo3.jpg",
        # ... up to 100 images
    ]
    # 4 workers; decoding and resizing are CPU-heavy, so stay near the core count
    with multiprocessing.Pool(processes=4) as pool:
        # imap() yields results in input order as they become available
        for result in pool.imap(resize_image, image_files):
            print(result)
Handling Exceptions in Worker Tasks
If a worker task raises an exception, pool.map() re-raises it in the parent process and the results of the other tasks are discarded. To keep partial results, step through imap_unordered() by hand: the exception is raised by next(), so next() is the call the try block must wrap. A try block around the loop body alone never sees it.
import multiprocessing

def risky_task(x):
    if x == 5:
        raise ValueError("Cannot process 5")
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # map(): one failure propagates and every result is lost
        try:
            results = pool.map(risky_task, range(10))
        except ValueError as e:
            print(f"Task failed: {e}")

        # Better: catch per result and keep iterating after a failure
        result_iter = pool.imap_unordered(risky_task, range(10))
        while True:
            try:
                result = next(result_iter)
            except StopIteration:
                break
            except ValueError as e:
                print(f"A task failed: {e}")
            else:
                print(f"Result: {result}")
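An alternative that keeps track of which input failed is to catch the exception inside the worker and return an outcome record. This is a sketch of that pattern; the wrapper safe_risky_task and the (input, ok, payload) tuple layout are conventions invented for this example.

```python
import multiprocessing


def risky_task(x):
    if x == 5:
        raise ValueError("Cannot process 5")
    return x ** 2


def safe_risky_task(x):
    """Run risky_task and report the outcome instead of raising."""
    try:
        return (x, True, risky_task(x))
    except Exception as exc:
        # Return a string so the record is always picklable
        return (x, False, repr(exc))


def run_all(inputs):
    with multiprocessing.Pool(processes=4) as pool:
        outcomes = list(pool.imap_unordered(safe_risky_task, inputs))
    # Inputs are unique here, so sorting compares only the first field
    return sorted(outcomes)


if __name__ == "__main__":
    for x, ok, payload in run_all(range(10)):
        status = "ok" if ok else "failed"
        print(f"Task {x} {status}: {payload}")
```

Since each record carries its own input, results can be matched to tasks even though imap_unordered() returns them in completion order.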
Timeout and Cleanup
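Always use context managers (with statement) to guarantee cleanup. Leaving the with block calls terminate(), so collect every result before the block ends. If you can't use context managers, explicitly call close() and join():
import multiprocessing

def some_function(x):  # Placeholder task
    return x * 2

if __name__ == "__main__":
    items = range(10)  # Placeholder inputs
    pool = multiprocessing.Pool(processes=4)
    try:
        results = pool.map(some_function, items)
    finally:
        pool.close()  # No new tasks accepted
        pool.join()   # Wait for workers to finish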
map() has no timeout parameter. To bound the wait for each result, call the next(timeout) method of the iterator returned by imap(); a plain for loop over the iterator waits indefinitely:
import multiprocessing
import time

def task(x):
    time.sleep(2)
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        result_iter = pool.imap(task, range(10), chunksize=1)
        for i in range(10):
            try:
                # Wait at most 5 seconds for result i
                result = result_iter.next(timeout=5)
                print(f"Result {i}: {result}")
            except multiprocessing.TimeoutError:
                print(f"Task {i} timed out")
                break
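When a single deadline for the whole batch is enough, map_async() returns an AsyncResult whose get() accepts a timeout. The helper map_with_deadline below is a hypothetical wrapper written for this example; returning None on timeout is one possible convention, not a library behavior.

```python
import multiprocessing
import time


def slow_task(x):
    time.sleep(x)
    return x


def map_with_deadline(func, items, timeout):
    """Return the mapped results, or None if they are not ready in time."""
    with multiprocessing.Pool(processes=2) as pool:
        async_result = pool.map_async(func, items)
        try:
            return async_result.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            # Leaving the with block terminates the still-running workers
            return None


if __name__ == "__main__":
    print(map_with_deadline(slow_task, [0, 0], timeout=10))
    print(map_with_deadline(slow_task, [5, 5], timeout=0.5))
```

The timeout covers the whole batch, so a single slow task discards the results of the fast ones; the per-result approach is a better fit when partial results matter.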
Chunksize: Optimizing Task Scheduling
By default, map() splits the iterable into roughly processes * 4 chunks, so each worker receives a batch of items at a time. imap() and imap_unordered() default to chunksize=1, which costs one inter-process round trip per item. For very fast tasks (< 1 ms each), pass a larger chunksize to reduce that overhead:
import multiprocessing

def fast_task(x):
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        items = range(1_000_000)
        # map() with the automatic chunksize: about 16 large chunks
        results = pool.map(fast_task, items)
        # imap() defaults to chunksize=1; batch explicitly to cut per-item overhead
        results = list(pool.imap(fast_task, items, chunksize=10_000))
For slow tasks (> 100 ms each), a small chunksize is better: per-task overhead is negligible by comparison, and small chunks keep the load balanced so no worker sits idle while another works through a large batch. See Article 7 for detailed chunking strategies.
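As a rough guide to what the automatic setting produces, the sketch below approximates the chunksize that map() chooses when none is given. It follows the calculation in CPython's pool implementation as commonly described (ceiling of items divided by four times the worker count); this is an implementation detail rather than a documented contract, and the function name is invented for this example.

```python
def default_map_chunksize(n_items, n_workers):
    """Approximate the chunksize Pool.map() picks when chunksize is None."""
    if n_items == 0:
        return 0
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so every item lands in a chunk
    return chunksize


if __name__ == "__main__":
    print(default_map_chunksize(1_000_000, 4))  # 62500
    print(default_map_chunksize(100, 4))        # 7
```

For one million items on four workers the automatic value is already large, which is why an explicit chunksize matters most for imap() and imap_unordered().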
Key Takeaways
- Pool maintains N worker processes, reusing them across many tasks to amortize startup overhead.
- Use pool.map() for simple batch jobs where all inputs are ready upfront; it blocks until done.
- Use pool.imap() for long-running workloads where you want to process partial results incrementally.
- Use pool.apply_async() for dynamic workloads submitted one task at a time.
- Set processes to os.cpu_count() for CPU-bound tasks; 2–4x CPU count for I/O-bound tasks.
- Always use context managers (with statement) to guarantee cleanup.
- Tune chunksize: large chunks (10,000+) for fast tasks, small chunks (1–10) for slow tasks.
Frequently Asked Questions
How many workers should I use?
For CPU-bound tasks, os.cpu_count() is usually optimal. For I/O-bound tasks, use 2–4x the CPU count. Memory limits may force you lower on small machines.
Can I dynamically add tasks while the pool is running?
Yes. map(), imap(), and apply_async() can be called multiple times on the same pool until you call close(). Tasks are dispatched roughly in submission order, though they may complete in a different order.
What happens if a worker dies unexpectedly?
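The pool starts a replacement when a worker exits, but the task that worker was running is lost: its result never arrives, so a blocking map() or get() can wait forever. Guard against this with get(timeout=...), and wrap worker code in try-except so that ordinary errors come back as exceptions instead of killing the process. If you need crashes reported explicitly, concurrent.futures.ProcessPoolExecutor raises BrokenProcessPool when a worker dies.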
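The following sketch shows the timeout guard in action. It simulates a hard crash with os._exit(1), which is an assumption made for the demonstration (a real crash might be a segfault or an out-of-memory kill), and the function names are illustrative. The behavior shown is what CPython's Pool is generally observed to do and may vary between versions.

```python
import multiprocessing
import os


def crash(_):
    """Simulate a hard crash: the process exits without raising."""
    os._exit(1)


def detect_lost_task(timeout=2):
    with multiprocessing.Pool(processes=2) as pool:
        handle = pool.apply_async(crash, args=(None,))
        try:
            handle.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            # The result never arrived because the worker died mid-task
            return "lost"
    return "completed"


if __name__ == "__main__":
    print(detect_lost_task())
```

Without the timeout, the get() call in this example would be expected to block indefinitely.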
Is Pool thread-safe?
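In practice, yes: you can call pool.map() and pool.apply_async() from multiple threads in the process that created the pool, because the pool's task queue is thread-safe. This is uncommon, though; usually one thread submits all work. Note that the documentation says pool methods should only be used by the process that created the pool, so do not share a pool across processes.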
How do I limit the total number of queued tasks?
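Use pool.apply_async() and throttle submission yourself, for example with a semaphore that is released as each task completes. map() converts its input to a list and queues every task at once, which can exhaust memory for very large iterables. imap() avoids building the full result list, but it applies no backpressure on the input, so pair it with your own throttle if the input itself is huge.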
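One way to throttle submission is a bounded semaphore that is acquired before each apply_async() call and released from the completion callbacks. This is a minimal sketch under that assumption; run_throttled, max_pending, and square are names made up for the example.

```python
import multiprocessing
import threading


def square(x):
    return x * x


def run_throttled(items, max_pending=8, n_workers=4):
    """Submit items with at most max_pending tasks queued or running."""
    slots = threading.BoundedSemaphore(max_pending)
    handles = []

    def release_slot(_):
        # Runs in the parent process when a task succeeds or fails
        slots.release()

    with multiprocessing.Pool(processes=n_workers) as pool:
        for item in items:
            slots.acquire()  # Blocks while max_pending tasks are in flight
            handles.append(
                pool.apply_async(
                    square,
                    (item,),
                    callback=release_slot,
                    error_callback=release_slot,
                )
            )
        return [handle.get() for handle in handles]


if __name__ == "__main__":
    print(run_throttled(range(20), max_pending=4))
```

The sketch still keeps every result handle in memory; for very large workloads, consume each result inside the callback instead of collecting the handles.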
Further Reading
- multiprocessing.Pool documentation — official API reference.
- Real Python: Python Multiprocessing — detailed comparison of Pool methods.
- ProcessPoolExecutor in concurrent.futures — alternative Executor interface (Article 4).
- Chunking strategies guide — optimize chunk sizes for your workload.