
Process Pools for Parallel Computing: Distribute Work

Process pools eliminate manual process spawning by managing a fixed set of worker processes that consume tasks from an internal queue. You submit work via map(), imap(), or apply_async(). The pool distributes it to the workers, collects the results, and handles cleanup. Pools suit batch processing (image resizing, data aggregation, scientific simulation) because they reuse worker processes, which amortizes startup overhead across many tasks. This article covers pool creation, task submission strategies, result handling, and tuning for throughput.

What Is a Process Pool?

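A multiprocessing.Pool maintains N worker processes, and each one runs a loop that waits for tasks. You submit tasks (a function plus its arguments) to the pool's task queue. Workers take tasks off the queue, execute them, and send the results back to the main process. When the queue is empty, idle workers block until new work arrives. This is far more efficient than spawning a fresh process for each task. Process startup can take tens to hundreds of milliseconds, especially with the spawn start method. A pool pays that cost once per worker and then amortizes it over hundreds or thousands of tasks. Tasks and results cross process boundaries, so the function, its arguments, and its return value must all be picklable.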

Typical architecture:

```text
Main Process
    |
    +---> Pool Queue (tasks to do)
              |
              v
          Worker-1 (subprocess)
          Worker-2 (subprocess)
          Worker-N (subprocess)
              |
              |  worker processes write results back
              v
    +<--- Result Queue (completed results)
    |
Main Process
```
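The following minimal sketch shows the worker reuse described above. Each task returns the PID of the worker process that ran it. With 4 workers and 20 tasks, at most 4 distinct PIDs should appear. `which_worker` is a hypothetical helper used only for illustration.

```python
import multiprocessing
import os

def which_worker(_):
    """Hypothetical helper: report which worker process ran this task."""
    return os.getpid()

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # chunksize=1 sends each item to a worker as its own task
        pids = pool.map(which_worker, range(20), chunksize=1)

    # 20 tasks, but only the pool's (at most) 4 worker processes ran them
    print(f"{len(pids)} tasks ran in {len(set(pids))} distinct processes")
```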

Creating and Configuring a Pool

```python
import multiprocessing

if __name__ == "__main__":
    # Create a pool with 4 workers (explicit count)
    pool = multiprocessing.Pool(processes=4)
    pool.close()  # Without a with block, clean up manually
    pool.join()

    # Create a pool matching CPU count (processes=None uses os.cpu_count())
    pool = multiprocessing.Pool()  # Usually a good default
    pool.close()
    pool.join()

    # Preferred: a context manager guarantees cleanup
    with multiprocessing.Pool(processes=4) as pool:
        pass  # Use pool here
    # On exit, the pool calls terminate(), which stops the workers immediately
```

Pool size guidance:

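  • processes=None (default): uses os.cpu_count(). This is usually a good choice for CPU-bound tasks.
  • For I/O-bound tasks, 2–4x the CPU count can help, because workers spend much of their time blocked on I/O. For purely I/O-bound work, a thread pool (multiprocessing.pool.ThreadPool) is often cheaper.
  • On memory-constrained systems, reduce the count. Each worker is a separate Python process and typically costs tens of megabytes (roughly 10–50 MB, more if tasks load large data).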
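The rules of thumb above can be collected into a small helper. This is a minimal sketch: `pool_size` is a hypothetical function, and the default I/O multiplier of 2 and the memory figures are assumptions you would tune for your own workload.

```python
import os

def pool_size(io_bound=False, io_multiplier=2, per_worker_mb=None, memory_budget_mb=None):
    """Hypothetical helper applying the pool-size rules of thumb."""
    cpus = os.cpu_count() or 1  # os.cpu_count() can return None
    size = cpus * io_multiplier if io_bound else cpus
    if per_worker_mb and memory_budget_mb:
        # Never start more workers than the memory budget allows (but at least 1)
        size = min(size, max(1, memory_budget_mb // per_worker_mb))
    return size

if __name__ == "__main__":
    print("CPU-bound:", pool_size())
    print("I/O-bound:", pool_size(io_bound=True))
    print("I/O-bound, 500 MB budget at 50 MB/worker:",
          pool_size(io_bound=True, per_worker_mb=50, memory_budget_mb=500))
```

The result can be passed straight to `multiprocessing.Pool(processes=...)`.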

Task Submission: map() vs. imap() vs. apply_async()

map(): Simple and Blocking

pool.map() applies a function to every item in an iterable, blocking until all results are ready. It returns a list of results in the same order as input.

```python
import multiprocessing
import math

def compute_sqrt(x):
    """Compute square root."""
    return math.sqrt(x)

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        numbers = [1, 4, 9, 16, 25, 36, 49, 64]

        # Block until all results are ready
        results = pool.map(compute_sqrt, numbers)

        print(results)  # [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
```

When to use: Simple batch jobs where you have all inputs upfront and can wait for all outputs.
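map() passes exactly one argument to the function for each item. When the function takes several arguments, Pool.starmap() unpacks each tuple in the iterable into positional arguments. When an argument is the same for every call, functools.partial can fix it in advance. The sketch below shows both approaches; `scale` is a hypothetical two-argument function.

```python
import functools
import multiprocessing

def scale(x, factor):
    """Hypothetical two-argument task."""
    return x * factor

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # starmap(): each tuple is unpacked into scale(x, factor)
        pairs = [(1, 10), (2, 20), (3, 30)]
        varying = pool.starmap(scale, pairs)

        # functools.partial: fix factor=100 for every call, then use plain map()
        fixed = pool.map(functools.partial(scale, factor=100), [1, 2, 3])

    print(varying)  # [10, 40, 90]
    print(fixed)    # [100, 200, 300]
```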

imap(): Lazy, Ordered Iterator

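pool.imap() returns an iterator immediately. Iterating over it yields results in input order, each one as soon as it is available. This lets you process early results without waiting for the whole batch. Iteration still blocks while the next result in order is being computed. Use imap_unordered() if you want results in completion order instead.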

```python
import multiprocessing
import time

def slow_square(x):
    time.sleep(1)  # Simulate slow task
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        numbers = range(10)

        # Returns immediately; results are yielded in input order
        result_iterator = pool.imap(slow_square, numbers)

        for i, result in enumerate(result_iterator):
            print(f"Task {i} result: {result}")
            # Each result can be processed as soon as it's ready
```

When to use: Long-running batch jobs where you want to process partial results or monitor progress in real-time.
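imap() always yields results in input order, so one slow early task holds back results that finished later. If order doesn't matter, imap_unordered() yields each result as soon as any worker finishes it. Because the results arrive out of order, a common pattern is to return the input together with the output. The sketch below uses a hypothetical `tagged_square` task with random delays and reports progress as each result arrives.

```python
import multiprocessing
import random
import time

def tagged_square(x):
    """Hypothetical task: sleep a random amount, return (input, output)."""
    time.sleep(random.uniform(0.0, 0.2))
    return x, x ** 2

if __name__ == "__main__":
    results = {}
    with multiprocessing.Pool(processes=4) as pool:
        stream = pool.imap_unordered(tagged_square, range(10))
        for done, (x, square) in enumerate(stream, start=1):
            results[x] = square
            # Completion order, not input order
            print(f"[{done}/10] input {x} -> {square}")
```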

apply_async(): Flexible, Returns Future-like Object

apply_async() submits a single task and returns an AsyncResult object. You can check completion with ready() or wait with get().

```python
import multiprocessing
import time

def process_item(item_id):
    time.sleep(2)
    return f"Processed {item_id}"

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # Submit one task
        async_result = pool.apply_async(process_item, args=(42,))

        # Check if done without blocking
        if not async_result.ready():
            print("Task is running...")
            time.sleep(1)

        # Block and get result with optional timeout
        result = async_result.get(timeout=5)
        print(result)
```

When to use: Dynamic workloads where you submit tasks incrementally or need fine-grained control over individual task completion.
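apply_async() also accepts `callback` and `error_callback` arguments. The pool calls them in the main process, on a background result-handling thread, when a task succeeds or fails. The Python docs advise that callbacks return quickly, because they block the thread that handles all results. The sketch below uses a hypothetical `process_item` variant that fails on one input.

```python
import multiprocessing

def process_item(item_id):
    """Hypothetical task that fails for one input."""
    if item_id == 3:
        raise RuntimeError(f"bad item {item_id}")
    return f"Processed {item_id}"

if __name__ == "__main__":
    done, failed = [], []
    with multiprocessing.Pool(processes=4) as pool:
        for item_id in range(6):
            pool.apply_async(
                process_item,
                args=(item_id,),
                callback=done.append,          # receives the return value
                error_callback=failed.append,  # receives the exception instance
            )
        pool.close()  # no more tasks
        pool.join()   # wait for every task (and its callback) to finish

    print(sorted(done))
    print([str(e) for e in failed])  # ['bad item 3']
```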

Comparison: map vs. imap vs. apply_async

| Method | Blocking | Returns | Use case |
|---|---|---|---|
| map() | Yes, waits for all results | List (input order) | Simple batch; all inputs ready upfront |
| imap() | Returns immediately; iteration waits for each next result | Iterator (input order) | Long jobs; process partial results incrementally |
| apply_async() | No, returns immediately | AsyncResult object | Dynamic workloads; submit one task at a time |

Practical Example: Batch Image Resizing

Here's a real-world scenario: resize 100 images in parallel.

```python
import multiprocessing
import os
from PIL import Image  # Requires Pillow

def resize_image(input_path):
    """Shrink an image to fit within 800x600 (aspect ratio kept) and save a copy."""
    try:
        with Image.open(input_path) as img:
            img.thumbnail((800, 600))  # In place; never enlarges

            # Save next to the original as <name>_resized<ext>
            root, ext = os.path.splitext(input_path)
            output_path = f"{root}_resized{ext}"
            img.save(output_path, quality=85)

        return f"OK: {os.path.basename(input_path)}"
    except Exception as e:
        # Report the failure instead of raising, so one bad file doesn't stop the batch
        return f"FAIL: {os.path.basename(input_path)}: {e}"

if __name__ == "__main__":
    image_files = [
        "photo1.jpg", "photo2.jpg", "photo3.jpg",
        # ... up to 100 images
    ]

    # 4 workers; decoding and resampling are mostly CPU-bound
    with multiprocessing.Pool(processes=4) as pool:
        # imap yields results in input order as they become available
        for result in pool.imap(resize_image, image_files):
            print(result)
```

Handling Exceptions in Worker Tasks

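If a worker task raises an exception, the pool sends the exception back to the main process. pool.map() re-raises the first such exception once all tasks have finished, and all other results are lost. To handle failures per task, submit tasks with apply_async() and wrap each get() call in try/except. A for loop over imap() or imap_unordered() does not work for this purpose: the exception is raised by the iterator itself, and that ends the loop.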

```python
import multiprocessing

def risky_task(x):
    if x == 5:
        raise ValueError("Cannot process 5")
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # All-or-nothing: map() re-raises, so no results come back
        try:
            results = pool.map(risky_task, range(10))
        except ValueError as e:
            print(f"Task failed: {e}")

        # Better: submit tasks individually and catch per result
        async_results = [pool.apply_async(risky_task, (x,)) for x in range(10)]
        for x, async_result in enumerate(async_results):
            try:
                print(f"Task {x}: {async_result.get()}")
            except ValueError as e:
                print(f"Task {x} failed: {e}")
```
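Another approach keeps the convenience of imap_unordered() by catching the exception inside the worker and returning it as data, so the iterator never raises. The sketch below uses a hypothetical `safe_call` wrapper, and only the exception's type and message travel back, as a string.

```python
import functools
import multiprocessing

def risky_task(x):
    if x == 5:
        raise ValueError("Cannot process 5")
    return x ** 2

def safe_call(func, x):
    """Hypothetical wrapper: run func(x) and return (x, ok, value_or_error)."""
    try:
        return x, True, func(x)
    except Exception as e:  # Report the failure instead of raising it
        return x, False, f"{type(e).__name__}: {e}"

if __name__ == "__main__":
    successes, failures = {}, {}
    with multiprocessing.Pool(processes=4) as pool:
        wrapped = functools.partial(safe_call, risky_task)
        for x, ok, value in pool.imap_unordered(wrapped, range(10)):
            if ok:
                successes[x] = value
            else:
                failures[x] = value
                print(f"Task {x} failed: {value}")

    print(f"{len(successes)} succeeded, {len(failures)} failed")
```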

Timeout and Cleanup

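Always use a context manager (with statement) to guarantee cleanup. Leaving the with block calls terminate(), which stops the workers immediately, so collect every result you need before the block ends. If you can't use a context manager, call close() and join() explicitly: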

```python
import multiprocessing

def some_function(x):
    return x * x  # Stand-in for real work

if __name__ == "__main__":
    items = range(10)
    pool = multiprocessing.Pool(processes=4)

    try:
        results = pool.map(some_function, items)
    finally:
        pool.close()  # No new tasks accepted
        pool.join()   # Wait for workers to finish and exit
```
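Leaving a with block calls terminate(), so tasks submitted with apply_async() that are still running at that point are killed. One safe pattern is to call close() and join() inside the block, so every task finishes before the block exits. A minimal sketch with a hypothetical `slow_double` task:

```python
import multiprocessing
import time

def slow_double(x):
    """Hypothetical task that takes a moment."""
    time.sleep(0.1)
    return 2 * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        pending = [pool.apply_async(slow_double, (x,)) for x in range(8)]
        pool.close()  # Stop accepting new tasks
        pool.join()   # Block until all submitted tasks have finished
    # The with block has now called terminate(), but every task already
    # completed, so the AsyncResult objects still hold their values
    results = [p.get() for p in pending]
    print(results)  # [0, 2, 4, 6, 8, 10, 12, 14]
```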

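map() has no timeout parameter. To bound the wait for a whole batch, call map_async() and pass a timeout to get(). For a per-result timeout, use imap() and call the iterator's next(timeout) method. It raises multiprocessing.TimeoutError if the next result isn't ready in time. A timeout does not cancel the task: it keeps running until the pool is terminated.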

```python
import multiprocessing
import time

def task(x):
    time.sleep(2)
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        result_iter = pool.imap(task, range(10), chunksize=1)

        for i in range(10):
            try:
                # Raises multiprocessing.TimeoutError if the next
                # in-order result isn't ready within 3 seconds
                result = result_iter.next(timeout=3)
                print(f"Result {i}: {result}")
            except multiprocessing.TimeoutError:
                print(f"Task {i} timed out")
                break
```
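map() itself takes no timeout, but map_async() returns an AsyncResult whose get(timeout) raises multiprocessing.TimeoutError if the whole batch isn't done in time. The following is a minimal sketch with a hypothetical one-second `slow_task`, where 4 tasks on 2 workers need roughly 2 seconds but the deadline is 0.5 seconds.

```python
import multiprocessing
import time

def slow_task(x):
    """Hypothetical task that sleeps for 1 second."""
    time.sleep(1)
    return x ** 2

if __name__ == "__main__":
    timed_out = False
    with multiprocessing.Pool(processes=2) as pool:
        async_result = pool.map_async(slow_task, range(4))
        try:
            # Whole-batch deadline, far shorter than the ~2 s the batch needs
            results = async_result.get(timeout=0.5)
        except multiprocessing.TimeoutError:
            timed_out = True
            print("Batch did not finish within 0.5 s")
    # Leaving the with block terminates the pool, stopping the unfinished tasks
    print("timed out:", timed_out)
```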

Chunksize: Optimizing Task Scheduling

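In CPython, when chunksize is omitted, map() picks a value that splits the iterable into roughly four chunks per worker. For 1,000,000 items and 4 workers, that is 62,500 items per chunk. imap() and imap_unordered() instead default to chunksize=1, which sends every item to a worker separately. For very fast tasks (< 1 ms each), set a larger chunksize. The per-task communication overhead is then paid once per chunk instead of once per item: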

```python
import multiprocessing

def fast_task(x):
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        items = range(1_000_000)

        # map() with automatic chunksize: ~4 chunks per worker (62,500 items each here)
        results = pool.map(fast_task, items)

        # imap() defaults to chunksize=1 (one round trip per item); batch explicitly
        results = list(pool.imap(fast_task, items, chunksize=10_000))
```

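For slow tasks (> 100 ms each), a small chunksize (1–10) usually works better. The communication overhead is negligible next to the work itself, and small chunks keep the load balanced. That way, no single worker is left finishing a large chunk while the others sit idle. See Article 7 for detailed chunking strategies.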
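To make the default concrete, the hypothetical helper below mirrors how CPython's Pool.map() computes chunksize when it is omitted: ceil(n_items / (4 * n_workers)). Treat it as a sketch of current CPython behavior, not a documented guarantee. The second part times imap() with its default chunksize=1 against chunksize=1000. Absolute timings depend on the machine.

```python
import multiprocessing
import time

def default_map_chunksize(n_items, n_workers):
    """Hypothetical helper mirroring CPython's default Pool.map() chunksize."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

def fast_task(x):
    return x ** 2

if __name__ == "__main__":
    print(default_map_chunksize(1_000_000, 4))  # 62500
    print(default_map_chunksize(10, 4))         # 1

    items = range(10_000)
    timings = {}
    with multiprocessing.Pool(processes=4) as pool:
        for chunksize in (1, 1_000):
            start = time.perf_counter()
            out = list(pool.imap(fast_task, items, chunksize=chunksize))
            timings[chunksize] = time.perf_counter() - start
    for chunksize, seconds in timings.items():
        print(f"imap chunksize={chunksize}: {seconds:.3f} s")
```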

Key Takeaways

  • Pool maintains N worker processes, reusing them across many tasks to amortize startup overhead.
  • Use pool.map() for simple batch jobs where all inputs are ready upfront; it blocks until done.
  • Use pool.imap() for long-running workloads where you want to process partial results incrementally.
  • Use pool.apply_async() for dynamic workloads submitted one task at a time.
  • Set processes to os.cpu_count() for CPU-bound tasks; 2–4x CPU count for I/O-bound tasks.
  • Always use context managers (with statement) to guarantee cleanup.
  • Tune chunksize: large chunks (10,000+) for fast tasks, small chunks (1–10) for slow tasks.

Frequently Asked Questions

How many workers should I use?

For CPU-bound tasks, os.cpu_count() is usually optimal. For I/O-bound tasks, use 2–4x the CPU count. Memory limits may force you lower on small machines.

Can I dynamically add tasks while the pool is running?

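Yes. You can call map(), imap(), and apply_async() as many times as you like on the same pool until you call close(). Tasks are dispatched to workers roughly in submission order, but they can finish in any order. Only map() and imap() return their own results in input order.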

What happens if a worker dies unexpectedly?

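Pool automatically replaces worker processes that exit; this is also how the maxtasksperchild option works. However, a worker can die abruptly while running a task, for example when the operating system's out-of-memory killer stops it or a C extension segfaults. In that case the task's result is lost, and map() or a get() call without a timeout can wait forever. Use timeouts when retrieving results, or consider concurrent.futures.ProcessPoolExecutor, which raises BrokenProcessPool in this situation. An ordinary Python exception raised inside a task does not kill the worker: the exception is sent back and re-raised when you retrieve that result.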
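The maxtasksperchild option shows the pool's automatic worker replacement in action. It makes each worker exit after a fixed number of tasks, and the pool starts a fresh worker in its place. In the following minimal sketch (`worker_pid` is hypothetical), maxtasksperchild=1 and chunksize=1 mean each task runs in a new process. More distinct PIDs therefore appear than the pool's 2 workers.

```python
import multiprocessing
import os

def worker_pid(_):
    """Hypothetical task: report the PID of the worker that ran it."""
    return os.getpid()

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2, maxtasksperchild=1) as pool:
        # chunksize=1 so each item counts as one task toward maxtasksperchild
        pids = pool.map(worker_pid, range(6), chunksize=1)

    # Only 2 workers exist at any moment, yet replacements ran the later tasks
    print(f"6 tasks, {len(set(pids))} distinct worker PIDs")
```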

Is Pool thread-safe?

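Generally yes, within the process that created it. The pool's internal task queue is thread-safe, so several threads of the main process can call pool.map() or pool.apply_async() concurrently. However, the Python documentation says pool methods should only be called by the process that created the pool, so don't hand a Pool to child processes. In practice, one thread usually submits all the work.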

How do I limit the total number of queued tasks?

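map() converts its input to a list and queues every task at once, which can exhaust memory for very large iterables. imap() avoids building that list, but it provides no backpressure. The pool keeps pulling input and computing results as fast as the workers allow, and unconsumed results pile up in memory if your loop falls behind. To cap the number of queued tasks, submit with pool.apply_async() and throttle submission manually.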
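One way to throttle submission uses a threading.BoundedSemaphore. The main loop acquires the semaphore before each apply_async() call, and the task's callback or error_callback releases it. As a result, the loop pauses whenever MAX_IN_FLIGHT tasks are queued or running, and the input is read lazily. This is a minimal sketch: `work`, the generator input, and the limit of 8 are illustrative assumptions.

```python
import multiprocessing
import threading

MAX_IN_FLIGHT = 8  # Assumption: tune to your memory budget

def work(x):
    """Hypothetical task."""
    return x * x

if __name__ == "__main__":
    slots = threading.BoundedSemaphore(MAX_IN_FLIGHT)
    results, errors = [], []

    def on_success(value):
        results.append(value)
        slots.release()  # Free a slot for the next submission

    def on_error(exc):
        errors.append(exc)
        slots.release()

    huge_input = (x for x in range(1_000))  # Stands in for a very large lazy source
    with multiprocessing.Pool(processes=4) as pool:
        for x in huge_input:
            slots.acquire()  # Blocks while MAX_IN_FLIGHT tasks are pending
            pool.apply_async(work, (x,), callback=on_success, error_callback=on_error)
        pool.close()
        pool.join()  # Wait for all tasks and their callbacks

    print(len(results), "results,", len(errors), "errors")
```

Callbacks are never pickled because they run in the main process, so nested functions like `on_success` are fine here.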

Further Reading