Skip to content

04-多进程编程

Python 3.11+

概念铺垫

进程与多进程

实际场景: 需要处理 1,000,000 张图片,每张图片处理需要 0.1 秒。如果单线程处理,需要 100,000 秒(约 28 小时)。如何利用多核 CPU 加速处理?

问题:如何创建和管理进程?

进程 vs 线程

┌─────────────────────────────────────────────────────────────┐
│              进程 vs 线程                                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   进程:                                                     │
│   • 独立的内存空间                                          │
│   • 独立的 Python 解释器                                    │
│   • 不受 GIL 限制                                           │
│   • 创建开销大                                              │
│   • 适合 CPU 密集型任务                                     │
│                                                             │
│   线程:                                                     │
│   • 共享内存空间                                            │
│   • 同一 Python 解释器                                      │
│   • 受 GIL 限制                                             │
│   • 创建开销小                                              │
│   • 适合 I/O 密集型任务                                     │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │  进程 A                    进程 B                    │   │
│   │  ┌─────────────┐          ┌─────────────┐          │   │
│   │  │ Python 解释器│          │ Python 解释器│          │   │
│   │  │ ┌─────────┐ │          │ ┌─────────┐ │          │   │
│   │  │ │ 线程 1  │ │          │ │ 线程 1  │ │          │   │
│   │  │ │ 线程 2  │ │          │ │ 线程 2  │ │          │   │
│   │  │ └─────────┘ │          │ └─────────┘ │          │   │
│   │  │   GIL      │ │          │   GIL      │ │          │   │
│   │  └─────────────┘          └─────────────┘          │   │
│   │  独立内存空间              独立内存空间              │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

分层学习

L1 理解层:会用

创建进程

python
from __future__ import annotations

import multiprocessing
import time

def worker(name: str, delay: float) -> None:
    """工作函数"""
    for i in range(3):
        time.sleep(delay)
        print(f"{name} 执行第 {i+1} 次, PID: {multiprocessing.current_process().pid}")

p1: multiprocessing.Process = multiprocessing.Process(target=worker, args=("进程 1", 1))
p2: multiprocessing.Process = multiprocessing.Process(target=worker, args=("进程 2", 1.5))

p1.start()
p2.start()

p1.join()
p2.join()

print("所有进程完成")

守护进程

python
from __future__ import annotations

import multiprocessing
import time

def background_task() -> None:
    while True:
        print("后台任务运行中...")
        time.sleep(1)

if __name__ == '__main__':
    p: multiprocessing.Process = multiprocessing.Process(target=background_task)
    p.daemon = True
    p.start()
    
    time.sleep(3)
    print("主进程结束")
    # 守护进程随之终止

进程间通信 — Queue 队列

python
from __future__ import annotations

import multiprocessing

def producer(queue: multiprocessing.Queue) -> None:
    for i in range(5):
        queue.put(f"消息-{i}")
        print(f"生产: 消息-{i}")

def consumer(queue: multiprocessing.Queue) -> None:
    while True:
        item: str = queue.get()
        if item is None:  # 结束信号
            break
        print(f"消费: {item}")

if __name__ == '__main__':
    queue: multiprocessing.Queue = multiprocessing.Queue()
    
    p1: multiprocessing.Process = multiprocessing.Process(target=producer, args=(queue,))
    p2: multiprocessing.Process = multiprocessing.Process(target=consumer, args=(queue,))
    
    p1.start()
    p2.start()
    
    p1.join()
    queue.put(None)  # 发送结束信号
    p2.join()

进程间通信 — Pipe 管道

python
from __future__ import annotations

import multiprocessing

def sender(conn: multiprocessing.connection.Connection) -> None:
    conn.send("Hello from sender!")
    conn.close()

def receiver(conn: multiprocessing.connection.Connection) -> None:
    msg: str = conn.recv()
    print(f"收到: {msg}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    
    p1: multiprocessing.Process = multiprocessing.Process(target=sender, args=(parent_conn,))
    p2: multiprocessing.Process = multiprocessing.Process(target=receiver, args=(child_conn,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

共享内存

python
from __future__ import annotations

import multiprocessing

def worker(value: multiprocessing.Value, array: multiprocessing.Array) -> None:
    value.value = 100
    for i in range(len(array)):
        array[i] = i * 2

if __name__ == '__main__':
    value: multiprocessing.Value = multiprocessing.Value('i', 0)  # 整数
    
    array: multiprocessing.Array = multiprocessing.Array('i', [0] * 5)  # 整数数组
    
    p: multiprocessing.Process = multiprocessing.Process(target=worker, args=(value, array))
    p.start()
    p.join()
    
    print(f"Value: {value.value}")    # 100
    print(f"Array: {list(array)}")    # [0, 2, 4, 6, 8]

Manager

python
from __future__ import annotations

import multiprocessing

def worker(d: dict[str, str], l: list[int]) -> None:
    d['key'] = 'value'
    l.append(1)

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        shared_dict: dict[str, str] = manager.dict()
        shared_list: list[int] = manager.list()
        
        p: multiprocessing.Process = multiprocessing.Process(target=worker, args=(shared_dict, shared_list))
        p.start()
        p.join()
        
        print(f"Dict: {shared_dict}")  # {'key': 'value'}
        print(f"List: {shared_list}")  # [1]

进程池 — ProcessPoolExecutor

python
from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor
import time

def task(n: int) -> int:
    time.sleep(1)
    return n * n

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        future = executor.submit(task, 5)
        print(future.result())  # 25
        
        results: list[int] = list(executor.map(task, range(1, 6)))
        print(results)  # [1, 4, 9, 16, 25]

进程池 — multiprocessing.Pool

python
from __future__ import annotations

import multiprocessing
import time

def task(n: int) -> int:
    time.sleep(1)
    return n * n

def add(a: int, b: int) -> int:
    return a + b

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        result = pool.apply_async(task, (5,))
        print(result.get())  # 25
        
        results: list[int] = pool.map(task, range(1, 6))
        print(results)  # [1, 4, 9, 16, 25]
        
        result = pool.map_async(task, range(1, 6))
        print(result.get())  # [1, 4, 9, 16, 25]
        
        results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
        print(results)  # [3, 7, 11]

CPU 密集型任务示例

python
from __future__ import annotations

import multiprocessing
import time

def cpu_bound_task(n: int) -> int:
    """CPU 密集型任务:计算 n 以内的素数个数"""
    count: int = 0
    for i in range(2, n):
        is_prime: bool = True
        for j in range(2, int(i ** 0.5) + 1):
            if i % j == 0:
                is_prime = False
                break
        if is_prime:
            count += 1
    return count

if __name__ == '__main__':
    numbers: list[int] = [100000, 100000, 100000, 100000]
    
    start: float = time.time()
    results: list[int] = [cpu_bound_task(n) for n in numbers]
    print(f"单进程: {time.time() - start:.2f}s")
    
    start = time.time()
    with multiprocessing.Pool() as pool:
        results = pool.map(cpu_bound_task, numbers)
    print(f"多进程: {time.time() - start:.2f}s")
    
    # 多进程可以利用多核,速度提升明显

性能对比实验

实验:对比单线程、多线程、多进程处理 CPU 密集型任务

python
import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_task(n: int) -> int:
    """CPU 密集型:计算n以内素数个数"""
    count = 0
    for i in range(2, n):
        for j in range(2, int(i**0.5) + 1):
            if i % j == 0:
                break
        else:
            count += 1
    return count

NUMBERS = [50000] * 8  # 8个相同任务

if __name__ == '__main__':
    # 1. 单线程(基准)
    start = time.time()
    results = [cpu_task(n) for n in NUMBERS]
    print(f"单线程: {time.time() - start:.2f}s")
    
    # 2. 多线程(受GIL限制,无加速)
    start = time.time()
    with ThreadPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(cpu_task, NUMBERS))
    print(f"多线程: {time.time() - start:.2f}s")  # 可能更慢!
    
    # 3. 多进程(真正并行)
    start = time.time()
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_task, NUMBERS))
    print(f"多进程: {time.time() - start:.2f}s")  # 显著加速
    
    # 结果示例(4核CPU):
    # 单线程: 12.5s
    # 多线程: 12.8s (甚至更慢,因为线程切换开销)
    # 多进程: 3.2s  (约4倍加速)

实战:并行处理大文件

需求:处理一个大日志文件,分割给多个进程并行分析。

python
import multiprocessing
import os

def process_chunk(file_path: str, start: int, end: int) -> dict:
    """处理文件的指定区间"""
    count_errors = 0
    with open(file_path, 'r') as f:
        f.seek(start)
        # 确保从行首开始(处理边界)
        if start > 0:
            f.readline()  # 跳过可能的不完整行
        
        while f.tell() < end:
            line = f.readline()
            if "ERROR" in line:
                count_errors += 1
    
    return {"errors": count_errors, "range": (start, end)}

def parallel_file_process(file_path: str, num_processes: int = 4) -> int:
    """多进程处理大文件"""
    file_size = os.path.getsize(file_path)
    chunk_size = file_size // num_processes
    
    ranges = []
    for i in range(num_processes):
        start = i * chunk_size
        end = (i + 1) * chunk_size if i < num_processes - 1 else file_size
        ranges.append((file_path, start, end))
    
    with multiprocessing.Pool(num_processes) as pool:
        results = pool.starmap(process_chunk, ranges)
    
    total_errors = sum(r["errors"] for r in results)
    return total_errors

if __name__ == '__main__':
    pass

关键代码说明:

代码含义为什么这样写
chunk_size = file_size // num_processes按进程数均分文件字节数字节级分割无需解析内容,各进程独立 seek 到各自起点,避免加载整个文件到内存
if start > 0: f.readline()跳过起点处可能的不完整行字节切割可能从行中间开始,readline() 跳到下一个完整行起点,保证逐行统计不出错
while f.tell() < end: line = f.readline()tell() 控制读取范围tell() 返回当前文件指针位置,一旦超过分配的字节范围就停止,防止多个进程重复统计同一行
pool.starmap(process_chunk, ranges)将参数列表以 *args 形式分发给进程池starmap 接受 [(arg1, arg2, ...), ...] 格式,比 map 更适合多参数函数

L2 实践层:用好

推荐做法表

推荐做法说明为什么
进程数 = CPU 核数CPU 密集型场景超过核数会产生上下文切换开销,反而降低吞吐
必须使用 if __name__ == '__main__'保护入口点spawn 模式下模块会被重新导入,缺少保护会导致递归创建进程
小数据直接传参简单易用,无需序列化开销进程间通信开销大,直接传参最快
大数据使用共享内存Value/Arraymmap避免 pickling 大对象,延迟 ~10ns vs ~100μs
使用进程池复用进程ProcessPoolExecutorPool进程启动开销大(~10-50ms),池化可避免频繁启动
避免嵌套创建子进程进程树扁平化子进程再创建子进程容易导致管理混乱和资源泄漏

反模式对比

❌ 反模式✅ 正确做法说明
忘记 __name__ == '__main__'始终加保护Windows/macOS spawn 模式会崩溃
用多进程做 I/O 密集型任务I/O 型用线程或 asyncio进程启动开销抵消 I/O 并发收益
通过 Queue 传递大对象使用共享内存或文件~100μs + pickle 开销,大数据极慢
进程数远超 CPU 核数进程数 ≈ CPU 核数上下文切换开销超过并行收益
无限制创建进程使用 PoolProcessPoolExecutor每个进程占用独立内存(~几十MB),超量导致 OOM

适用场景表

场景推荐进程数建议原因
CPU 数值计算(素数、加密)✅ ProcessPoolExecutorCPU 核数GIL 不限制,真正并行
图像/视频批处理✅ multiprocessing.PoolCPU 核数每帧独立处理,天然可分割
大文件并行分析✅ Pool.starmap磁盘 I/O 上限文件分段处理,注意边界对齐
并行机器学习推理✅ ProcessPoolExecutorGPU 数或 CPU 核数模型复制到每个进程
网络请求并发I/O 密集型,用线程或 asyncio
少量快速任务进程启动开销 > 计算时间

L3 专家层:深入

进程启动方式原理图:fork vs spawn vs forkserver

┌─────────────────────────────────────────────────────────────┐
│              进程启动方式对比                                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   fork(Linux/macOS 默认):                                 │
│   ┌──────────────────────────────────────────────────┐      │
│   │  • 复制父进程的内存空间(写时复制 COW)           │      │
│   │  • 启动速度快                                     │      │
│   │  • 继承文件描述符、全局变量                       │      │
│   │  • 风险:可能复制不需要的状态                     │      │
│   └──────────────────────────────────────────────────┘      │
│                                                             │
│   spawn(Windows 默认,macOS Python 3.8+):                 │
│   ┌──────────────────────────────────────────────────┐      │
│   │  • 启动全新 Python 解释器                         │      │
│   │  • 通过 pickle 序列化传递数据                     │      │
│   │  • 更安全,不会继承不需要的状态                   │      │
│   │  • 启动速度慢                                     │      │
│   └──────────────────────────────────────────────────┘      │
│                                                             │
│   forkserver(可选):                                       │
│   ┌──────────────────────────────────────────────────┐      │
│   │  • 先启动一个"服务器进程"                         │      │
│   │  • 后续子进程通过 fork 服务器进程创建             │      │
│   │  • 平衡速度和安全性                              │      │
│   └──────────────────────────────────────────────────┘      │
│                                                             │
└─────────────────────────────────────────────────────────────┘
python
# spawn_vs_fork_demo.py
"""
演示进程启动方式。

Python 3.8+ macOS 默认使用 spawn,
所以需要 if __name__ == '__main__' 保护。

在 Linux 上可以切换为 spawn:
    multiprocessing.set_start_method('spawn')
"""
import multiprocessing
import os

def worker() -> None:
    print(f"子进程 PID: {os.getpid()}")
    print(f"启动方式: {multiprocessing.get_start_method()}")

if __name__ == "__main__":
    print(f"主进程 PID: {os.getpid()}")
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

IPC 序列化性能

进程间通信(IPC)性能对比:

方式延迟吞吐量适用场景
Queue~100μs中等通用 IPC,生产者-消费者
Pipe~50μs点对点通信
Value/Array~10ns最高共享数值/数组
Manager~500μs共享 dict/list 等复杂对象
文件/内存映射~1μs极高大数据量交换

序列化成本:

python
# ipc_serialization_demo.py
"""
进程间通信需要序列化数据(pickle),这有性能开销。

大对象通过 Queue 传递时:
1. pickle.dumps() 序列化
2. 通过 OS 管道传输
3. pickle.loads() 反序列化

避免在进程间传递大对象!
"""
import multiprocessing
import time

def worker(queue: multiprocessing.Queue) -> None:
    data = queue.get()  # 自动反序列化
    print(f"收到 {len(data)} 字节数据")

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    large_data = b"x" * 10_000_000  # 10MB

    start = time.time()
    queue.put(large_data)  # 自动序列化
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(f"传输 10MB 耗时: {time.time() - start:.3f}s")

性能考量表

操作时间说明
Process.start()~10-50ms包括解释器启动
spawn 方式启动~50-200ms比 fork 慢
fork 方式启动~1-10ms写时复制
Queue.put()~100μs + 序列化数据越大越慢
Value 读写~10ns直接内存访问
进程上下文切换~1-10μs比线程慢 10 倍

设计动机

Python 设计选择原因
Windows 必须用 spawnWindows 没有 fork 系统调用
macOS 3.8+ 改用 spawn防止 fork 后多线程崩溃
if __name__ == '__main__'spawn 模式下模块会被重新导入
提供 Manager简化 IPC,牺牲性能换易用性

知识关联图

多进程知识关联:
┌───────────────────┐
│  multiprocessing  │
│  .Process         │
└────────┬──────────┘

    ┌────┴────┐
    ▼         ▼
┌────────┐ ┌───────────┐
│ Pool   │ │ Queue     │
│ 池化   │ │ IPC 队列  │
└────┬───┘ └─────┬─────┘
     │           │
     ▼           ▼
┌───────────────────┐
│ ProcessPoolExecutor│ ← 与 ThreadPoolExecutor 统一接口
└───────────────────┘


┌───────────────────┐
│ 共享内存          │
│ Value/Array       │
│ mmap              │
└───────────────────┘

本章小结

┌─────────────────────────────────────────────────────────────┐
│                      多进程编程 知识要点                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   进程特点:                                                 │
│   ✓ 独立内存空间,不受 GIL 限制                             │
│   ✓ 适合 CPU 密集型任务                                     │
│   ✓ 创建开销比线程大                                        │
│                                                             │
│   创建进程:                                                 │
│   ✓ multiprocessing.Process(target=func, args=...)          │
│   ✓ start() 启动,join() 等待                               │
│                                                             │
│   进程间通信:                                               │
│   ✓ Queue:队列通信                                         │
│   ✓ Pipe:管道通信                                          │
│   ✓ Value/Array:共享内存                                   │
│   ✓ Manager:共享对象                                       │
│                                                             │
│   进程池:                                                   │
│   ✓ ProcessPoolExecutor                                     │
│   ✓ multiprocessing.Pool                                    │
│                                                             │
│   注意事项:                                                 │
│   ✓ 必须在 if __name__ == '__main__' 中创建进程             │
│   ✓ Windows 上尤其重要                                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘