Skip to content

06-并发模型选择

Python 3.11+

概念铺垫

三种并发模型对比

实际场景

一个系统有多种任务:Web API 调用(I/O 密集型)、图像处理(CPU 密集型)、数据库查询(I/O 密集型)。不同任务需要不同的并发模型。如何选择?

问题:如何根据任务特点选择合适的并发模型?

┌─────────────────────────────────────────────────────────────┐
│              三种并发模型对比                                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                    多线程                            │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │ 优点:                                               │   │
│   │ • 创建开销小                                         │   │
│   │ • 共享内存方便                                       │   │
│   │ • 适合 I/O 密集型                                    │   │
│   │                                                      │   │
│   │ 缺点:                                               │   │
│   │ • 受 GIL 限制                                        │   │
│   │ • 需要同步机制                                       │   │
│   │ • 调试困难                                           │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                    多进程                            │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │ 优点:                                               │   │
│   │ • 不受 GIL 限制                                      │   │
│   │ • 利用多核 CPU                                       │   │
│   │ • 进程隔离,稳定性高                                 │   │
│   │                                                      │   │
│   │ 缺点:                                               │   │
│   │ • 创建开销大                                         │   │
│   │ • 进程间通信复杂                                     │   │
│   │ • 内存占用高                                         │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                    asyncio                           │   │
│   ├─────────────────────────────────────────────────────┤   │
│   │ 优点:                                               │   │
│   │ • 单线程,无同步问题                                 │   │
│   │ • 高并发支持                                         │   │
│   │ • 代码清晰(async/await)                            │   │
│   │                                                      │   │
│   │ 缺点:                                               │   │
│   │ • 需要异步库支持                                     │   │
│   │ • 不能混合同步代码                                   │   │
│   │ • 学习曲线较陡                                       │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

并发模型选择决策树

┌─────────────────────────────────────────────────────────────┐
│              并发模型选择决策树                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   任务类型是什么?                                          │
│   │                                                        │
│   ├── CPU 密集型(计算、图像处理、加密)                    │
│   │   │                                                    │
│   │   └── 选择:多进程                                     │
│   │       multiprocessing / ProcessPoolExecutor            │
│   │                                                        │
│   ├── I/O 密集型(网络、文件、数据库)                      │
│   │   │                                                    │
│   │   ├── 需要高并发?                                     │
│   │   │   │                                                │
│   │   │   ├── 是 → 选择:asyncio                           │
│   │   │   │      适合:Web 服务器、API 客户端              │
│   │   │   │                                                │
│   │   │   └── 否 → 选择:多线程                            │
│   │   │          threading / ThreadPoolExecutor            │
│   │   │                                                    │
│   │   └── 需要使用同步库?                                 │
│   │       │                                                │
│   │       ├── 是 → 选择:多线程                            │
│   │       │      可以直接使用 requests 等                  │
│   │       │                                                │
│   │       └── 否 → 选择:asyncio                           │
│   │              使用 aiohttp 等异步库                     │
│   │                                                        │
│   └── 混合型                                                │
│       │                                                    │
│       └── 选择:多进程 + 多线程/asyncio                     │
│           进程处理计算,线程/协程处理 I/O                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

分层学习

L1 理解层:会用

场景 1:Web 爬虫

python
from __future__ import annotations

# 推荐:asyncio(高并发网络请求)
import asyncio
import aiohttp

async def crawl(urls: list[str]) -> list[str]:
    async with aiohttp.ClientSession() as session:
        tasks: list[asyncio.Task[str]] = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

# 如果必须使用同步库(如 requests),用多线程
from concurrent.futures import ThreadPoolExecutor
import requests

def crawl_sync(urls: list[str]) -> list[requests.Response]:
    with ThreadPoolExecutor(max_workers=10) as executor:
        return list(executor.map(requests.get, urls))

场景 2:数据处理

python
from __future__ import annotations

# CPU 密集型:多进程
from concurrent.futures import ProcessPoolExecutor

def process_data(data: range) -> int:
    return sum(x * x for x in data)

def main() -> None:
    data_chunks: list[range] = [range(1000000) for _ in range(8)]
    
    with ProcessPoolExecutor() as executor:
        results: list[int] = list(executor.map(process_data, data_chunks))
    
    print(results)

if __name__ == '__main__':
    main()

场景 3:Web 服务器

python
from __future__ import annotations

# 推荐:asyncio(FastAPI / Sanic)
from fastapi import FastAPI

app: FastAPI = FastAPI()

@app.get("/")
async def root() -> dict[str, str]:
    return {"message": "Hello"}

# Flask(多线程模式)
from flask import Flask

app_flask: Flask = Flask(__name__)

@app_flask.route("/")
def root_flask() -> dict[str, str]:
    return {"message": "Hello"}

场景 4:数据库操作

python
from __future__ import annotations

# asyncio + 异步数据库驱动
import asyncio
import asyncpg

async def query_db() -> list[asyncpg.Record]:
    conn: asyncpg.Connection = await asyncpg.connect("postgresql://user:pass@localhost/db")
    rows: list[asyncpg.Record] = await conn.fetch("SELECT * FROM users")
    await conn.close()
    return rows

# 多线程 + 同步数据库驱动
from concurrent.futures import ThreadPoolExecutor
import psycopg2

def query_db_sync() -> list[tuple]:
    conn: psycopg2.extensions.connection = psycopg2.connect("postgresql://user:pass@localhost/db")
    cur: psycopg2.extensions.cursor = conn.cursor()
    cur.execute("SELECT * FROM users")
    rows: list[tuple] = cur.fetchall()
    conn.close()
    return rows

L2 实践层:用好

推荐做法表

推荐做法说明为什么
识别任务类型I/O 密集型 → 多线程 / asyncio;CPU 密集型 → 多进程错误选择导致性能下降甚至更慢
选择合适的粒度任务太少:并发开销可能大于收益;任务太多:注意资源限制找到并发数与开销的平衡点
使用高级接口ThreadPoolExecutor / ProcessPoolExecutor / asyncio.gather避免手动管理线程/进程生命周期
处理异常捕获并处理异常,使用超时机制并发代码的异常容易被吞噬
使用 with 语句管理线程池、进程池、异步资源自动清理资源,防止泄漏
测试和监控测试并发正确性,监控性能指标竞态条件难以复现,需要充分测试

反模式对比

❌ 反模式✅ 正确做法说明
CPU 密集用多线程改用多进程GIL 导致多线程反而更慢
I/O 高并发用多线程改用 asyncio线程内存开销大,asyncio 万级并发只需几十 MB
混合项目中只用一种模型按任务类型分别选择CPU 部分用进程,I/O 部分用协程
不用 with 管理资源始终用 with 语句忘记关闭连接/释放资源
不设置超时timeout 参数单个任务卡死导致整个程序挂起

适用场景表

场景推荐模型原因示例技术栈
CPU 密集型 — 数值计算、图像处理多进程不受 GIL 限制,真正并行ProcessPoolExecutor
CPU 密集型 — 机器学习推理多进程模型加载到每个进程独立内存multiprocessing.Pool
I/O 密集型 — 高并发(>1000 连接)asyncio单线程管理万级连接,内存占用低FastAPI + aiohttp
I/O 密集型 — 低并发(<100 连接)多线程开发简单,同步库直接可用ThreadPoolExecutor + requests
I/O 密集型 — 需用同步库多线程无法替换为异步库run_in_executorThreadPoolExecutor
混合型 — CPU + I/O多进程 + asyncio进程处理计算,协程处理 I/OProcessPoolExecutor + asyncio

并发编程最佳实践

┌─────────────────────────────────────────────────────────────┐
│              并发编程最佳实践                                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   1. 识别任务类型                                           │
│      • I/O 密集型 → 多线程 / asyncio                        │
│      • CPU 密集型 → 多进程                                  │
│                                                             │
│   2. 选择合适的粒度                                         │
│      • 任务太少:并发开销可能大于收益                       │
│      • 任务太多:注意资源限制                               │
│                                                             │
│   3. 使用高级接口                                           │
│      • ThreadPoolExecutor / ProcessPoolExecutor             │
│      • asyncio.gather / create_task                         │
│                                                             │
│   4. 处理异常                                               │
│      • 捕获并处理异常                                       │
│      • 使用超时机制                                         │
│                                                             │
│   5. 资源管理                                               │
│      • 使用 with 语句                                       │
│      • 及时关闭连接、释放资源                               │
│                                                             │
│   6. 测试和监控                                             │
│      • 测试并发正确性                                       │
│      • 监控性能指标                                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

实测性能对比

以下为典型场景的实测数据(4 核 CPU,Python 3.12):

场景 1:100 个 I/O 任务(每个延迟 0.1 秒)

方式耗时加速比
串行~10.0s
多线程 (10 workers)~1.1s
asyncio~0.15s67×

场景 2:8 个 CPU 任务(每个计算 50000 以内素数)

方式耗时加速比
串行~12.5s
多线程 (8 workers)~12.8s0.98×(更慢!)
多进程 (4 workers)~3.2s3.9×

场景 3:混合型(4 个 CPU + 20 个 I/O)

方式耗时说明
纯多线程~8.5sCPU 部分被 GIL 限制
多进程 + asyncio~4.0s进程处理 CPU,协程处理 I/O

性能基准测试代码

python
import time
import threading
import multiprocessing

def io_task() -> None:
    time.sleep(0.1)

async def async_io_task() -> None:
    await asyncio.sleep(0.1)

def cpu_task() -> int:
    return sum(range(100000))

def test_threading() -> float:
    start: float = time.time()
    threads: list[threading.Thread] = [threading.Thread(target=io_task) for _ in range(100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

def test_multiprocessing() -> float:
    start: float = time.time()
    processes: list[multiprocessing.Process] = [multiprocessing.Process(target=cpu_task) for _ in range(8)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return time.time() - start

async def test_asyncio() -> float:
    start: float = time.time()
    await asyncio.gather(*[async_io_task() for _ in range(100)])
    return time.time() - start

asyncio vs 多线程性能差异原因

┌─────────────────────────────────────────────────────────────┐
│              asyncio vs 多线程性能差异原因                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   线程切换开销:                                              │
│   • OS 调度器介入(上下文切换 ~1-10μs)                      │
│   • CPU 寄存器保存/恢复                                      │
│   • 线程栈切换                                               │
│                                                             │
│   协程切换开销:                                              │
│   • 用户态切换(~0.5μs)                                    │
│   • 只需保存/恢复栈指针和指令指针                            │
│   • 无需 OS 介入                                             │
│                                                             │
│   内存开销:                                                  │
│   • 线程:每线程 ~1-8MB 栈空间                              │
│   • 协程:每协程 ~几 KB                                     │
│                                                             │
│   锁竞争:                                                    │
│   • 多线程:需要 Lock 保护共享数据                           │
│   • asyncio:单线程,无锁                                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

知识关联图

并发模型全景:
┌───────────────────────────────────────────────┐
│                                               │
│  任务类型?                                    │
│  ├── I/O 密集型                                │
│  │   ├── 高并发(>1000) → asyncio            │
│  │   └── 低并发(<100) → ThreadPoolExecutor  │
│  │                                             │
│  ├── CPU 密集型                                │
│  │   └── 多核并行 → ProcessPoolExecutor       │
│  │                                             │
│  └── 混合型                                    │
│      └── 多进程(CPU) + asyncio(I/O)           │
│                                               │
└───────────────────────────────────────────────┘

本章小结

┌─────────────────────────────────────────────────────────────┐
│                      并发模型选择 知识要点                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   选择依据:                                                 │
│   ✓ 任务类型:I/O 密集型 vs CPU 密集型                      │
│   ✓ 并发需求:高并发 vs 低并发                              │
│   ✓ 库支持:同步库 vs 异步库                                │
│                                                             │
│   推荐选择:                                                 │
│   ✓ CPU 密集型 → 多进程                                     │
│   ✓ I/O 密集型 + 高并发 → asyncio                           │
│   ✓ I/O 密集型 + 同步库 → 多线程                            │
│   ✓ 混合型 → 多进程 + 多线程/asyncio                        │
│                                                             │
│   注意事项:                                                 │
│   ✓ 评估并发开销                                            │
│   ✓ 处理异常和超时                                          │
│   ✓ 管理资源                                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘