Introduction
The multiprocessing module provides advanced process management including Pool, Manager, Queue, and Pipe for building robust parallel applications.
Pool
import multiprocessing
def process_data(n):
return n ** 2
if __name__ == "__main__":
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(process_data, range(10))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Async version
async_result = pool.map_async(process_data, range(10))
print(async_result.get())
# Apply function
results = pool.apply(process_data, (5,))
Manager
import multiprocessing
def worker(shared_dict, shared_list, lock):
with lock:
shared_dict["count"] = shared_dict.get("count", 0) + 1
shared_list.append(shared_dict["count"])
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
lock = manager.Lock()
with multiprocessing.Pool(4) as pool:
pool.starmap(worker, [(shared_dict, shared_list, lock)] * 10)
print(dict(shared_dict))
print(list(shared_list))
Queue
import multiprocessing
def producer(queue):
for i in range(10):
queue.put(i)
queue.put(None)
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"Consumed: {item}")
if __name__ == "__main__":
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
Pipe
import multiprocessing
def worker(conn):
conn.send("Hello from worker")
response = conn.recv()
print(f"Worker received: {response}")
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=worker, args=(child_conn,))
p.start()
print(parent_conn.recv())
parent_conn.send("Thanks!")
p.join()
Process
import multiprocessing
def worker(name, queue):
result = f"Processed by {name}"
queue.put(result)
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = []
for i in range(4):
p = multiprocessing.Process(target=worker, args=(f"Worker-{i}", queue))
processes.append(p)
p.start()
for p in processes:
p.join()
while not queue.empty():
print(queue.get())
Practice Problems
- Use multiprocessing.Pool for parallel computation
- Share state with Manager
- Implement producer-consumer with Queue
- Use Pipe for process communication
- Create multiple Process instances