04-多进程编程
Python 3.11+
概念铺垫
进程与多进程
实际场景: 需要处理 1,000,000 张图片,每张图片处理需要 0.1 秒。如果单线程处理,需要 100,000 秒(约 28 小时)。如何利用多核 CPU 加速处理?
问题:如何创建和管理进程?
进程 vs 线程
┌─────────────────────────────────────────────────────────────┐
│ 进程 vs 线程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 进程: │
│ • 独立的内存空间 │
│ • 独立的 Python 解释器 │
│ • 不受 GIL 限制 │
│ • 创建开销大 │
│ • 适合 CPU 密集型任务 │
│ │
│ 线程: │
│ • 共享内存空间 │
│ • 同一 Python 解释器 │
│ • 受 GIL 限制 │
│ • 创建开销小 │
│ • 适合 I/O 密集型任务 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 进程 A 进程 B │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Python 解释器│ │ Python 解释器│ │ │
│ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ │
│ │ │ │ 线程 1 │ │ │ │ 线程 1 │ │ │ │
│ │ │ │ 线程 2 │ │ │ │ 线程 2 │ │ │ │
│ │ │ └─────────┘ │ │ └─────────┘ │ │ │
│ │ │ GIL │ │ │ GIL │ │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ 独立内存空间 独立内存空间 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘分层学习
L1 理解层:会用
创建进程
python
from __future__ import annotations
import multiprocessing
import time
def worker(name: str, delay: float) -> None:
"""工作函数"""
for i in range(3):
time.sleep(delay)
print(f"{name} 执行第 {i+1} 次, PID: {multiprocessing.current_process().pid}")
p1: multiprocessing.Process = multiprocessing.Process(target=worker, args=("进程 1", 1))
p2: multiprocessing.Process = multiprocessing.Process(target=worker, args=("进程 2", 1.5))
p1.start()
p2.start()
p1.join()
p2.join()
print("所有进程完成")守护进程
python
from __future__ import annotations
import multiprocessing
import time
def background_task() -> None:
while True:
print("后台任务运行中...")
time.sleep(1)
if __name__ == '__main__':
p: multiprocessing.Process = multiprocessing.Process(target=background_task)
p.daemon = True
p.start()
time.sleep(3)
print("主进程结束")
# 守护进程随之终止进程间通信 — Queue 队列
python
from __future__ import annotations
import multiprocessing
def producer(queue: multiprocessing.Queue) -> None:
for i in range(5):
queue.put(f"消息-{i}")
print(f"生产: 消息-{i}")
def consumer(queue: multiprocessing.Queue) -> None:
while True:
item: str = queue.get()
if item is None: # 结束信号
break
print(f"消费: {item}")
if __name__ == '__main__':
queue: multiprocessing.Queue = multiprocessing.Queue()
p1: multiprocessing.Process = multiprocessing.Process(target=producer, args=(queue,))
p2: multiprocessing.Process = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
queue.put(None) # 发送结束信号
p2.join()进程间通信 — Pipe 管道
python
from __future__ import annotations
import multiprocessing
def sender(conn: multiprocessing.connection.Connection) -> None:
conn.send("Hello from sender!")
conn.close()
def receiver(conn: multiprocessing.connection.Connection) -> None:
msg: str = conn.recv()
print(f"收到: {msg}")
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p1: multiprocessing.Process = multiprocessing.Process(target=sender, args=(parent_conn,))
p2: multiprocessing.Process = multiprocessing.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()共享内存
python
from __future__ import annotations
import multiprocessing
def worker(value: multiprocessing.Value, array: multiprocessing.Array) -> None:
value.value = 100
for i in range(len(array)):
array[i] = i * 2
if __name__ == '__main__':
value: multiprocessing.Value = multiprocessing.Value('i', 0) # 整数
array: multiprocessing.Array = multiprocessing.Array('i', [0] * 5) # 整数数组
p: multiprocessing.Process = multiprocessing.Process(target=worker, args=(value, array))
p.start()
p.join()
print(f"Value: {value.value}") # 100
print(f"Array: {list(array)}") # [0, 2, 4, 6, 8]Manager
python
from __future__ import annotations
import multiprocessing
def worker(d: dict[str, str], l: list[int]) -> None:
d['key'] = 'value'
l.append(1)
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
shared_dict: dict[str, str] = manager.dict()
shared_list: list[int] = manager.list()
p: multiprocessing.Process = multiprocessing.Process(target=worker, args=(shared_dict, shared_list))
p.start()
p.join()
print(f"Dict: {shared_dict}") # {'key': 'value'}
print(f"List: {shared_list}") # [1]进程池 — ProcessPoolExecutor
python
from __future__ import annotations
from concurrent.futures import ProcessPoolExecutor
import time
def task(n: int) -> int:
time.sleep(1)
return n * n
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
future = executor.submit(task, 5)
print(future.result()) # 25
results: list[int] = list(executor.map(task, range(1, 6)))
print(results) # [1, 4, 9, 16, 25]进程池 — multiprocessing.Pool
python
from __future__ import annotations
import multiprocessing
import time
def task(n: int) -> int:
time.sleep(1)
return n * n
def add(a: int, b: int) -> int:
return a + b
if __name__ == '__main__':
with multiprocessing.Pool(processes=4) as pool:
result = pool.apply_async(task, (5,))
print(result.get()) # 25
results: list[int] = pool.map(task, range(1, 6))
print(results) # [1, 4, 9, 16, 25]
result = pool.map_async(task, range(1, 6))
print(result.get()) # [1, 4, 9, 16, 25]
results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
print(results) # [3, 7, 11]CPU 密集型任务示例
python
from __future__ import annotations
import multiprocessing
import time
def cpu_bound_task(n: int) -> int:
"""CPU 密集型任务:计算 n 以内的素数个数"""
count: int = 0
for i in range(2, n):
is_prime: bool = True
for j in range(2, int(i ** 0.5) + 1):
if i % j == 0:
is_prime = False
break
if is_prime:
count += 1
return count
if __name__ == '__main__':
numbers: list[int] = [100000, 100000, 100000, 100000]
start: float = time.time()
results: list[int] = [cpu_bound_task(n) for n in numbers]
print(f"单进程: {time.time() - start:.2f}s")
start = time.time()
with multiprocessing.Pool() as pool:
results = pool.map(cpu_bound_task, numbers)
print(f"多进程: {time.time() - start:.2f}s")
# 多进程可以利用多核,速度提升明显性能对比实验
实验:对比单线程、多线程、多进程处理 CPU 密集型任务
python
import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_task(n: int) -> int:
"""CPU 密集型:计算n以内素数个数"""
count = 0
for i in range(2, n):
for j in range(2, int(i**0.5) + 1):
if i % j == 0:
break
else:
count += 1
return count
NUMBERS = [50000] * 8 # 8个相同任务
if __name__ == '__main__':
# 1. 单线程(基准)
start = time.time()
results = [cpu_task(n) for n in NUMBERS]
print(f"单线程: {time.time() - start:.2f}s")
# 2. 多线程(受GIL限制,无加速)
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
results = list(executor.map(cpu_task, NUMBERS))
print(f"多线程: {time.time() - start:.2f}s") # 可能更慢!
# 3. 多进程(真正并行)
start = time.time()
with ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_task, NUMBERS))
print(f"多进程: {time.time() - start:.2f}s") # 显著加速
# 结果示例(4核CPU):
# 单线程: 12.5s
# 多线程: 12.8s (甚至更慢,因为线程切换开销)
# 多进程: 3.2s (约4倍加速)实战:并行处理大文件
需求:处理一个大日志文件,分割给多个进程并行分析。
python
import multiprocessing
import os
def process_chunk(file_path: str, start: int, end: int) -> dict:
"""处理文件的指定区间"""
count_errors = 0
with open(file_path, 'r') as f:
f.seek(start)
# 确保从行首开始(处理边界)
if start > 0:
f.readline() # 跳过可能的不完整行
while f.tell() < end:
line = f.readline()
if "ERROR" in line:
count_errors += 1
return {"errors": count_errors, "range": (start, end)}
def parallel_file_process(file_path: str, num_processes: int = 4) -> int:
"""多进程处理大文件"""
file_size = os.path.getsize(file_path)
chunk_size = file_size // num_processes
ranges = []
for i in range(num_processes):
start = i * chunk_size
end = (i + 1) * chunk_size if i < num_processes - 1 else file_size
ranges.append((file_path, start, end))
with multiprocessing.Pool(num_processes) as pool:
results = pool.starmap(process_chunk, ranges)
total_errors = sum(r["errors"] for r in results)
return total_errors
if __name__ == '__main__':
pass关键代码说明:
| 代码 | 含义 | 为什么这样写 |
|---|---|---|
chunk_size = file_size // num_processes | 按进程数均分文件字节数 | 字节级分割无需解析内容,各进程独立 seek 到各自起点,避免加载整个文件到内存 |
if start > 0: f.readline() | 跳过起点处可能的不完整行 | 字节切割可能从行中间开始,readline() 跳到下一个完整行起点,保证逐行统计不出错 |
while f.tell() < end: line = f.readline() | 用 tell() 控制读取范围 | tell() 返回当前文件指针位置,一旦超过分配的字节范围就停止,防止多个进程重复统计同一行 |
pool.starmap(process_chunk, ranges) | 将参数列表以 *args 形式分发给进程池 | starmap 接受 [(arg1, arg2, ...), ...] 格式,比 map 更适合多参数函数 |
L2 实践层:用好
推荐做法表
| 推荐做法 | 说明 | 为什么 |
|---|---|---|
| 进程数 = CPU 核数 | CPU 密集型场景 | 超过核数会产生上下文切换开销,反而降低吞吐 |
必须使用 if __name__ == '__main__' | 保护入口点 | spawn 模式下模块会被重新导入,缺少保护会导致递归创建进程 |
| 小数据直接传参 | 简单易用,无需序列化开销 | 进程间通信开销大,直接传参最快 |
| 大数据使用共享内存 | Value/Array 或 mmap | 避免 pickling 大对象,延迟 ~10ns vs ~100μs |
| 使用进程池复用进程 | ProcessPoolExecutor 或 Pool | 进程启动开销大(~10-50ms),池化可避免频繁启动 |
| 避免嵌套创建子进程 | 进程树扁平化 | 子进程再创建子进程容易导致管理混乱和资源泄漏 |
反模式对比
| ❌ 反模式 | ✅ 正确做法 | 说明 |
|---|---|---|
忘记 __name__ == '__main__' | 始终加保护 | Windows/macOS spawn 模式会崩溃 |
| 用多进程做 I/O 密集型任务 | I/O 型用线程或 asyncio | 进程启动开销抵消 I/O 并发收益 |
| 通过 Queue 传递大对象 | 使用共享内存或文件 | ~100μs + pickle 开销,大数据极慢 |
| 进程数远超 CPU 核数 | 进程数 ≈ CPU 核数 | 上下文切换开销超过并行收益 |
| 无限制创建进程 | 使用 Pool 或 ProcessPoolExecutor | 每个进程占用独立内存(~几十MB),超量导致 OOM |
适用场景表
| 场景 | 推荐 | 进程数建议 | 原因 |
|---|---|---|---|
| CPU 数值计算(素数、加密) | ✅ ProcessPoolExecutor | CPU 核数 | GIL 不限制,真正并行 |
| 图像/视频批处理 | ✅ multiprocessing.Pool | CPU 核数 | 每帧独立处理,天然可分割 |
| 大文件并行分析 | ✅ Pool.starmap | 磁盘 I/O 上限 | 文件分段处理,注意边界对齐 |
| 并行机器学习推理 | ✅ ProcessPoolExecutor | GPU 数或 CPU 核数 | 模型复制到每个进程 |
| 网络请求并发 | ❌ | — | I/O 密集型,用线程或 asyncio |
| 少量快速任务 | ❌ | — | 进程启动开销 > 计算时间 |
L3 专家层:深入
进程启动方式原理图:fork vs spawn vs forkserver
┌─────────────────────────────────────────────────────────────┐
│ 进程启动方式对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ fork(Linux/macOS 默认): │
│ ┌──────────────────────────────────────────────────┐ │
│ │ • 复制父进程的内存空间(写时复制 COW) │ │
│ │ • 启动速度快 │ │
│ │ • 继承文件描述符、全局变量 │ │
│ │ • 风险:可能复制不需要的状态 │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ spawn(Windows 默认,macOS Python 3.8+): │
│ ┌──────────────────────────────────────────────────┐ │
│ │ • 启动全新 Python 解释器 │ │
│ │ • 通过 pickle 序列化传递数据 │ │
│ │ • 更安全,不会继承不需要的状态 │ │
│ │ • 启动速度慢 │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ forkserver(可选): │
│ ┌──────────────────────────────────────────────────┐ │
│ │ • 先启动一个"服务器进程" │ │
│ │ • 后续子进程通过 fork 服务器进程创建 │ │
│ │ • 平衡速度和安全性 │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘python
# spawn_vs_fork_demo.py
"""
演示进程启动方式。
Python 3.8+ macOS 默认使用 spawn,
所以需要 if __name__ == '__main__' 保护。
在 Linux 上可以切换为 spawn:
multiprocessing.set_start_method('spawn')
"""
import multiprocessing
import os
def worker() -> None:
print(f"子进程 PID: {os.getpid()}")
print(f"启动方式: {multiprocessing.get_start_method()}")
if __name__ == "__main__":
print(f"主进程 PID: {os.getpid()}")
p = multiprocessing.Process(target=worker)
p.start()
p.join()IPC 序列化性能
进程间通信(IPC)性能对比:
| 方式 | 延迟 | 吞吐量 | 适用场景 |
|---|---|---|---|
Queue | ~100μs | 中等 | 通用 IPC,生产者-消费者 |
Pipe | ~50μs | 高 | 点对点通信 |
Value/Array | ~10ns | 最高 | 共享数值/数组 |
Manager | ~500μs | 低 | 共享 dict/list 等复杂对象 |
| 文件/内存映射 | ~1μs | 极高 | 大数据量交换 |
序列化成本:
python
# ipc_serialization_demo.py
"""
进程间通信需要序列化数据(pickle),这有性能开销。
大对象通过 Queue 传递时:
1. pickle.dumps() 序列化
2. 通过 OS 管道传输
3. pickle.loads() 反序列化
避免在进程间传递大对象!
"""
import multiprocessing
import time
def worker(queue: multiprocessing.Queue) -> None:
data = queue.get() # 自动反序列化
print(f"收到 {len(data)} 字节数据")
if __name__ == "__main__":
queue = multiprocessing.Queue()
large_data = b"x" * 10_000_000 # 10MB
start = time.time()
queue.put(large_data) # 自动序列化
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(f"传输 10MB 耗时: {time.time() - start:.3f}s")性能考量表
| 操作 | 时间 | 说明 |
|---|---|---|
Process.start() | ~10-50ms | 包括解释器启动 |
spawn 方式启动 | ~50-200ms | 比 fork 慢 |
fork 方式启动 | ~1-10ms | 写时复制 |
Queue.put() | ~100μs + 序列化 | 数据越大越慢 |
Value 读写 | ~10ns | 直接内存访问 |
| 进程上下文切换 | ~1-10μs | 比线程慢 10 倍 |
设计动机
| Python 设计选择 | 原因 |
|---|---|
| Windows 必须用 spawn | Windows 没有 fork 系统调用 |
| macOS 3.8+ 改用 spawn | 防止 fork 后多线程崩溃 |
if __name__ == '__main__' | spawn 模式下模块会被重新导入 |
提供 Manager | 简化 IPC,牺牲性能换易用性 |
知识关联图
多进程知识关联:
┌───────────────────┐
│ multiprocessing │
│ .Process │
└────────┬──────────┘
│
┌────┴────┐
▼ ▼
┌────────┐ ┌───────────┐
│ Pool │ │ Queue │
│ 池化 │ │ IPC 队列 │
└────┬───┘ └─────┬─────┘
│ │
▼ ▼
┌───────────────────┐
│ ProcessPoolExecutor│ ← 与 ThreadPoolExecutor 统一接口
└───────────────────┘
│
▼
┌───────────────────┐
│ 共享内存 │
│ Value/Array │
│ mmap │
└───────────────────┘本章小结
┌─────────────────────────────────────────────────────────────┐
│ 多进程编程 知识要点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 进程特点: │
│ ✓ 独立内存空间,不受 GIL 限制 │
│ ✓ 适合 CPU 密集型任务 │
│ ✓ 创建开销比线程大 │
│ │
│ 创建进程: │
│ ✓ multiprocessing.Process(target=func, args=...) │
│ ✓ start() 启动,join() 等待 │
│ │
│ 进程间通信: │
│ ✓ Queue:队列通信 │
│ ✓ Pipe:管道通信 │
│ ✓ Value/Array:共享内存 │
│ ✓ Manager:共享对象 │
│ │
│ 进程池: │
│ ✓ ProcessPoolExecutor │
│ ✓ multiprocessing.Pool │
│ │
│ 注意事项: │
│ ✓ 必须在 if __name__ == '__main__' 中创建进程 │
│ ✓ Windows 上尤其重要 │
│ │
└─────────────────────────────────────────────────────────────┘