
Inter-Process Communication: Sharing Data

Inter-process communication (IPC) is how separate processes exchange data safely. Python's multiprocessing module offers three main IPC mechanisms: Queue (process- and thread-safe, task-based), Pipe (two connected endpoints, lower-level), and Value/Array (shared memory, fast but easy to misuse). Manager proxies are a fourth, higher-level option. Choosing the right mechanism depends on your data size, concurrency pattern, and latency requirements. This article covers each mechanism with working examples, performance trade-offs, and a side-by-side comparison for production use.

Why IPC Matters

Processes have isolated memory spaces—objects created in one process are invisible to others. To share data (results, configuration, work items), you must serialize it, send it through an OS-level channel, and deserialize in the receiving process. This adds latency and requires careful synchronization to prevent race conditions.
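To make the copy semantics concrete, here is a minimal sketch that performs the serialize/deserialize round trip inside a single process, with no real channel involved. The `config` dictionary is a made-up payload. The point is only that the receiver ends up with an equal but separate object, so changes on one side are not seen on the other.

```python
import pickle

# Hypothetical payload that a parent might hand to a worker.
config = {"retries": 3, "hosts": ["a", "b"]}

# What a channel actually carries: a byte string, not the object itself.
wire_bytes = pickle.dumps(config)

# What the receiving side reconstructs: an equal but independent copy.
received = pickle.loads(wire_bytes)
received["hosts"].append("c")   # a change made on the "receiving" side

print(received == config)       # False: the two copies have diverged
print(config["hosts"])          # ['a', 'b']: the sender's object is untouched
```

If the receiver's change needs to reach the sender, it has to be sent back explicitly or kept in shared memory.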

IPC mechanisms in Python:

  1. Queue – task-oriented, thread-safe, automatic locking.
  2. Pipe – low-latency bidirectional communication, less overhead than Queue.
  3. Value/Array – shared memory directly accessible across processes; fastest, but compound updates such as += still need an explicit lock.
  4. Manager – high-level proxy objects, works across network sockets.
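Queue, Pipe, and Value/Array each get their own section. As a quick illustration of the fourth option, the sketch below shares a dictionary through a Manager. The `record` helper and the squared values are hypothetical; only `multiprocessing.Manager()` and its `dict()` proxy come from the standard library.

```python
import multiprocessing

def record(shared, key, value):
    """Store one result; `shared` may be a Manager dict proxy or a plain dict."""
    shared[key] = value

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        # The real dict lives in the manager's server process; this is a proxy.
        results = manager.dict()
        procs = [
            multiprocessing.Process(target=record, args=(results, n, n * n))
            for n in range(3)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy the contents out before the manager shuts down.
        print(dict(results))
```

Every access to the proxy is a round trip to the manager process, so this is convenient rather than fast.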

multiprocessing.Queue: Safe Task Passing

Queue is FIFO (first-in, first-out), thread-safe, and ideal for passing tasks and results between producer (main process) and consumers (workers).

import multiprocessing
import time

def worker(task_queue, result_queue):
    """Worker that processes tasks from queue."""
    while True:
        # Block until a task arrives
        task = task_queue.get()

        # Sentinel: poison pill to signal worker to exit
        if task is None:
            break

        # Process task
        task_id, value = task
        result = value ** 2

        # Send result back
        result_queue.put((task_id, result))

if __name__ == "__main__":
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    # Start 4 workers
    workers = [
        multiprocessing.Process(target=worker, args=(task_queue, result_queue))
        for _ in range(4)
    ]
    for w in workers:
        w.start()

    # Send tasks
    for task_id, value in enumerate([1, 2, 3, 4, 5]):
        task_queue.put((task_id, value))

    # Collect results
    results = []
    for _ in range(5):
        results.append(result_queue.get(timeout=5))

    # Signal workers to stop
    for _ in range(4):
        task_queue.put(None)

    for w in workers:
        w.join()

    # Results arrive in completion order, so sort by task_id before printing
    print(sorted(results))  # [(0, 1), (1, 4), (2, 9), (3, 16), (4, 25)]

Key methods:

  • put(obj, block=True, timeout=None) – add object to queue (blocks if full).
  • get(block=True, timeout=None) – remove and return object (blocks if empty).
  • qsize() – approximate size (unreliable; use only for monitoring, and note that it raises NotImplementedError on macOS).
  • empty() / full() – state checks (race-condition-prone).
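The blocking behaviour and timeouts of put() and get() can be seen without starting any worker. The sketch below uses a deliberately tiny queue (maxsize=1, an arbitrary choice for illustration) and relies on the fact that multiprocessing.Queue signals timeouts with the Full and Empty exceptions from the standard queue module.

```python
import multiprocessing
import queue  # Full and Empty are defined in the standard queue module

q = multiprocessing.Queue(maxsize=1)   # bounded: holds at most one item

q.put("first")
try:
    q.put("second", timeout=0.1)       # queue is full, so this times out
    overflowed = False
except queue.Full:
    overflowed = True

item = q.get()                         # blocks until "first" is available
try:
    q.get(timeout=0.1)                 # nothing left, so this times out
    drained = False
except queue.Empty:
    drained = True

print(item, overflowed, drained)       # first True True
```

Catching these exceptions is usually a more dependable way to react to a full or empty queue than calling full() or empty() first.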

multiprocessing.Pipe: Bidirectional Channels

Pipe() creates two endpoints (conn1, conn2) for bidirectional communication. It's lower-level than Queue but faster for point-to-point messaging.

import multiprocessing

def parent_process(conn):
    """Parent sends messages to child."""
    for i in range(5):
        message = f"Message {i}"
        conn.send(message)
        print(f"Parent sent: {message}")

        # Receive response
        response = conn.recv()
        print(f"Parent received: {response}")

    conn.send(None)  # Signal done
    conn.close()

def child_process(conn):
    """Child receives and responds."""
    while True:
        message = conn.recv()
        if message is None:
            break

        response = f"Echo: {message}"
        conn.send(response)

    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()

    child = multiprocessing.Process(target=child_process, args=(child_conn,))
    child.start()

    parent_process(parent_conn)

    child.join()

Key methods:

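  • send(obj) – pickle and send an object; raises an OSError such as BrokenPipeError if the other end is closed.
  • recv() – block until an object arrives; raises EOFError if the other end is closed and nothing is left to read.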
  • send_bytes(buf) / recv_bytes() – send raw bytes, no pickle overhead.
  • close() – close endpoint.
  • poll(timeout=0) – non-blocking check if data is available.
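A short sketch of poll(), send_bytes()/recv_bytes(), and the end-of-stream behaviour follows. Both endpoints are used from one process purely to keep it short, and the 16-byte record layout packed with struct is an assumption made for the example, not something the Pipe API prescribes.

```python
import multiprocessing
import struct

# Both ends live in one process here only to keep the sketch compact.
left, right = multiprocessing.Pipe()

print(right.poll())               # False: nothing has been sent yet

# Assumed wire format: two little-endian int32 values and one float64.
RECORD = struct.Struct("<iid")
left.send_bytes(RECORD.pack(7, 42, 3.5))

print(right.poll(1.0))            # True: data is waiting
task_id, count, score = RECORD.unpack(right.recv_bytes())
print(task_id, count, score)      # 7 42 3.5

# After the sender closes and the buffer is empty, recv() raises EOFError.
left.close()
try:
    right.recv()
    saw_eof = False
except EOFError:
    saw_eof = True
print(saw_eof)                    # True
```

Because send_bytes() skips pickle, both sides have to agree on the byte layout themselves.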

Queue vs. Pipe: Side-by-Side Comparison

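Feature             | Queue                                                 | Pipe
--------------------|-------------------------------------------------------|---------------------------------------------------
Topology            | Many-to-many (any process can put/get)                | One-to-one (two endpoints)
Latency             | Higher: a feeder thread, locks, and a pipe underneath | Lower: direct send/receive on a connection
Use case            | Task distribution; producer-consumer                  | Point-to-point; request-response
Thread-safety       | Yes (internal locks)                                  | No (one thread or process per endpoint at a time)
Blocking operations | Yes: put(), get()                                     | Yes: send(), recv()
Non-blocking check  | empty() (unreliable)                                  | poll(timeout=0) (reliable)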

Choose Queue for: task pools and any setup with many producers or consumers. Note that multiprocessing has no built-in priority queue; items always come out in FIFO order.

Choose Pipe for: parent-child communication, low-latency messaging, request-response.

Shared Memory: Ctypes Arrays and Values

For performance-critical scenarios where you need to avoid serialization, use shared memory. multiprocessing.Value and multiprocessing.Array allocate ctypes objects in shared memory, so processes can read and write the same memory region directly.

import multiprocessing
import ctypes

def worker(shared_array, shared_counter):
    """Modify shared data."""
    # Access array
    for i in range(len(shared_array)):
        shared_array[i] *= 2

    # Increment counter
    shared_counter.value += 10

if __name__ == "__main__":
    # Shared array of integers
    shared_array = multiprocessing.Array(ctypes.c_int, [1, 2, 3, 4, 5])

    # Shared integer counter
    shared_counter = multiprocessing.Value(ctypes.c_int, 0)

    # Inspect the initial values from the main process
    print(f"Before: array={shared_array[:]}, counter={shared_counter.value}")

    # Start worker
    p = multiprocessing.Process(target=worker, args=(shared_array, shared_counter))
    p.start()
    p.join()

    print(f"After: array={shared_array[:]}, counter={shared_counter.value}")
    # Output: array=[2, 4, 6, 8, 10], counter=10

Supported ctypes:

  • c_int, c_float, c_double, c_bool – scalar values.
  • Array(typecode, size) – arrays of fixed type.
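Besides ctypes classes, Value and Array also accept the one-character type codes used by the array module. The sketch below shows both forms of the Array constructor and the lock=False variant; the variable names are illustrative only.

```python
import multiprocessing

# Type codes follow the array module: "i" = signed int, "d" = double.
counter = multiprocessing.Value("i", 7)
samples = multiprocessing.Array("d", 4)          # size given: zero-filled
flags = multiprocessing.Array("i", [1, 0, 1])    # initializer given: copied in

samples[0] = 2.5
print(counter.value, samples[:], flags[:])
# 7 [2.5, 0.0, 0.0, 0.0] [1, 0, 1]

# lock=False returns the bare ctypes object without the synchronized wrapper.
raw = multiprocessing.Value("i", 0, lock=False)
print(hasattr(counter, "get_lock"), hasattr(raw, "get_lock"))   # True False
```

The size and element type are fixed at creation, so these objects suit numeric buffers rather than arbitrary Python data.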

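Critical limitation: compound updates are not atomic. By default (lock=True), Value and Array wrap each individual read or write in a lock, but counter.value += 1 is a separate read followed by a write, so two processes can interleave and lose updates. Hold a lock around the whole operation, using either multiprocessing.Lock() or the object's own get_lock(), to prevent race conditions: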

import ctypes
import multiprocessing

def unsafe_increment(counter):
    """UNSAFE: no lock around the read-modify-write, so updates can be lost."""
    for _ in range(1_000_000):
        counter.value += 1

def safe_increment(counter, lock):
    """SAFE: lock protects shared value."""
    for _ in range(1_000_000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value(ctypes.c_int, 0)
    lock = multiprocessing.Lock()

    p1 = multiprocessing.Process(target=safe_increment, args=(counter, lock))
    p2 = multiprocessing.Process(target=safe_increment, args=(counter, lock))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(f"Final counter: {counter.value}")  # Should be 2,000,000
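A Value created with the default lock=True already carries a lock, reachable through get_lock(), so a separate Lock object is optional. The following is a minimal sketch of that variant; the `add_many` helper and the iteration count are made up for the example.

```python
import multiprocessing

def add_many(counter, n):
    """Increment a shared counter n times under its built-in lock."""
    for _ in range(n):
        # get_lock() returns the lock created by Value(..., lock=True).
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)   # lock=True is the default
    procs = [
        multiprocessing.Process(target=add_many, args=(counter, 100_000))
        for _ in range(2)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)   # 200000
```

Using the built-in lock keeps the counter and its lock together, which avoids accidentally pairing a value with the wrong lock.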

Serialization Overhead: Pickle Performance

Queue, Pipe.send(), and Manager proxies serialize objects via pickle; only shared memory and send_bytes() avoid it. Large objects are slow to pickle:

import multiprocessing
import pickle
import time

# Define a large object
large_data = {"array": list(range(1_000_000))}

# Measure pickle time
start = time.perf_counter()
serialized = pickle.dumps(large_data)
elapsed = time.perf_counter() - start

print(f"Pickle 1M integers: {elapsed*1000:.1f} ms")
print(f"Serialized size: {len(serialized) / 1024 / 1024:.1f} MB")

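Pickling time depends heavily on the object's structure and on your hardware, so run the measurement above on your own data rather than relying on a rule of thumb. A list of a million small objects typically costs tens of milliseconds, while a single bytes object of the same size is close to a plain memory copy. For high-throughput IPC, prefer shared memory or send_bytes() on Pipes.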
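One way to see why raw bytes help is to compare pickling a list with exporting the same numbers from a packed array. The sketch below is only an illustration: it assumes the data is purely numeric and fits in a C int, and it measures the serialization step alone, not the transfer.

```python
import array
import pickle
import time

n = 1_000_000
as_list = list(range(n))
as_array = array.array("i", as_list)    # packed C ints, no per-item objects

start = time.perf_counter()
pickled = pickle.dumps(as_list, protocol=pickle.HIGHEST_PROTOCOL)
pickle_ms = (time.perf_counter() - start) * 1000

start = time.perf_counter()
raw = as_array.tobytes()                # a buffer suitable for send_bytes()
raw_ms = (time.perf_counter() - start) * 1000

print(f"pickle:  {pickle_ms:.1f} ms, {len(pickled) / 1e6:.1f} MB")
print(f"tobytes: {raw_ms:.1f} ms, {len(raw) / 1e6:.1f} MB")

# Receiving side: rebuild the array from the raw bytes.
restored = array.array("i")
restored.frombytes(raw)
```

The raw form only works when sender and receiver agree on the element type and byte order, which is the price of skipping pickle.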

Real-World Example: Producer-Consumer Pipeline

Here's a multi-stage pipeline: producer creates items, stage 1 processes, stage 2 aggregates, consumer outputs.

import multiprocessing
import time

def producer(queue, count):
    """Generate items."""
    for i in range(count):
        queue.put(i)
        time.sleep(0.01)
    queue.put(None)  # Signal end

def stage1(in_queue, out_queue):
    """Transform: square each number."""
    while True:
        item = in_queue.get()
        if item is None:
            out_queue.put(None)
            break
        out_queue.put(item ** 2)

def stage2(in_queue, out_queue):
    """Aggregate: sum every 3 items, plus any remainder at the end."""
    buffer = []
    while True:
        item = in_queue.get()
        if item is None:
            # Flush the final partial batch so trailing items are not dropped
            if buffer:
                out_queue.put(sum(buffer))
            out_queue.put(None)
            break
        buffer.append(item)
        if len(buffer) == 3:
            out_queue.put(sum(buffer))
            buffer = []

def consumer(queue):
    """Consume results."""
    while True:
        result = queue.get()
        if result is None:
            break
        print(f"Final result: {result}")

if __name__ == "__main__":
    queue_1 = multiprocessing.Queue()
    queue_2 = multiprocessing.Queue()
    queue_3 = multiprocessing.Queue()

    p_producer = multiprocessing.Process(target=producer, args=(queue_1, 10))
    p_stage1 = multiprocessing.Process(target=stage1, args=(queue_1, queue_2))
    p_stage2 = multiprocessing.Process(target=stage2, args=(queue_2, queue_3))
    p_consumer = multiprocessing.Process(target=consumer, args=(queue_3,))

    for p in [p_producer, p_stage1, p_stage2, p_consumer]:
        p.start()

    for p in [p_producer, p_stage1, p_stage2, p_consumer]:
        p.join()

Key Takeaways

  • Queue: Thread-safe, FIFO, ideal for task distribution and producer-consumer patterns.
  • Pipe: Bidirectional, low-latency, best for parent-child or request-response communication.
  • Shared Memory (Value/Array): Direct memory access, fastest, but read-modify-write updates need an explicit lock to prevent race conditions.
  • Serialization via pickle is often the bottleneck for Queue and Pipe; for large objects, use shared memory or send_bytes().
  • Use "poison pill" (None) pattern to signal worker shutdown.
  • Always use locks when modifying shared values; unsynchronized access causes race conditions.

Frequently Asked Questions

Can I pass a Queue to a child process?

Yes, Queue is designed for this. Pass it as an argument; the child receives a connection to the same queue created in the parent.

What happens if I put an unpicklable object in a Queue?

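Pickling fails with a TypeError, PicklingError, or AttributeError, depending on the object. With Queue, the pickling happens in a background feeder thread, so put() itself may return normally while the traceback is printed from that thread and the item never arrives. See Article 8 for pickling pitfalls and how to fix them.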
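A cheap safeguard is to try pickling an object yourself before handing it to a Queue or Pipe. The `is_picklable` helper below is a hypothetical convenience function, not part of multiprocessing; it simply reports whether pickle.dumps() succeeds.

```python
import pickle
import threading

def is_picklable(obj):
    """Return True if obj survives pickle.dumps(), False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

print(is_picklable({"id": 1, "tags": ["a", "b"]}))   # True
print(is_picklable(lambda x: x + 1))                 # False: lambdas have no importable name
print(is_picklable(threading.Lock()))                # False: locks cannot be pickled
```

The check costs one extra serialization, so it fits better in tests or debug paths than in a hot loop.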

Is Pipe thread-safe?

No. One thread per endpoint. If multiple threads need to send on the same endpoint, wrap it in a Lock.
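As a sketch of that advice, the example below lets four threads share one sending endpoint behind a threading.Lock. The `report` function, thread count, and message format are made up for illustration, and both ends of the pipe stay in one process to keep it short.

```python
import multiprocessing
import threading

# duplex=False returns a receive-only end and a send-only end, in that order.
recv_end, send_end = multiprocessing.Pipe(duplex=False)
send_lock = threading.Lock()      # guards the shared sending endpoint

def report(worker_id, count):
    """Send `count` messages, one complete message per lock acquisition."""
    for i in range(count):
        with send_lock:
            send_end.send((worker_id, i))

threads = [threading.Thread(target=report, args=(w, 5)) for w in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

received = [recv_end.recv() for _ in range(4 * 5)]
print(len(received))              # 20
```

Messages from different threads may interleave in any order, but each one arrives intact because only one thread writes at a time.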

Can I use Pipe across network sockets?

No, not directly. For network IPC, use multiprocessing.managers.BaseManager (advanced) or third-party RPC libraries.
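The standard library also ships multiprocessing.connection.Listener and Client, which give the same send()/recv() interface over a TCP socket. Below is a minimal sketch under several assumptions: it runs on the loopback address only, the server runs in a thread of the same process, and `AUTHKEY` is a placeholder secret chosen for the example.

```python
import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b"demo-key"   # placeholder shared secret; use a real one in practice

def serve_once(listener):
    """Accept one connection and echo a single request."""
    with listener.accept() as conn:
        request = conn.recv()
        conn.send(f"Echo: {request}")

def ask(address, message):
    """Connect, send one message, and return the reply."""
    with Client(address, authkey=AUTHKEY) as conn:
        conn.send(message)
        return conn.recv()

def demo():
    # Port 0 asks the OS for any free port; listener.address reports it.
    with Listener(("127.0.0.1", 0), authkey=AUTHKEY) as listener:
        server = threading.Thread(target=serve_once, args=(listener,))
        server.start()
        reply = ask(listener.address, "ping")
        server.join()
    return reply

reply = demo()
print(reply)            # Echo: ping
```

Objects still travel as pickles, so only connect to peers you trust and always set an authkey.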

How do I prevent deadlock in a producer-consumer system?

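Use blocking calls with timeouts, such as queue.get(timeout=5), so a stalled peer raises queue.Empty instead of hanging forever. Avoid circular queue dependencies (A waits for B, B waits for A). Also drain a queue before joining the process that fills it: a child that still has buffered items waiting to be flushed will not exit, so join() can block indefinitely.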
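The drain-then-join ordering can be sketched as follows. The `produce` and `drain` helpers are hypothetical names, and the 5-second timeout is an arbitrary value; the idea is to read until a sentinel arrives, give up if the producer stalls, and only then join.

```python
import multiprocessing
import queue

def produce(out_queue, count):
    """Put `count` integers on the queue, then a None sentinel."""
    for i in range(count):
        out_queue.put(i)
    out_queue.put(None)

def drain(in_queue, timeout=5.0):
    """Collect items until the sentinel arrives or the timeout expires."""
    items = []
    while True:
        try:
            item = in_queue.get(timeout=timeout)
        except queue.Empty:
            break           # producer stalled or died: stop instead of hanging
        if item is None:
            break
        items.append(item)
    return items

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=produce, args=(q, 10_000))
    p.start()
    items = drain(q)        # drain first ...
    p.join()                # ... then join; the reverse order can deadlock
    print(len(items))
```

A caller that needs to distinguish a clean finish from a timeout could return a flag alongside the items.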

Further Reading