06-并发模型选择
Python 3.11+
概念铺垫
三种并发模型对比
实际场景
一个系统有多种任务:Web API 调用(I/O 密集型)、图像处理(CPU 密集型)、数据库查询(I/O 密集型)。不同任务需要不同的并发模型。如何选择?
问题:如何根据任务特点选择合适的并发模型?
┌─────────────────────────────────────────────────────────────┐
│ 三种并发模型对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 多线程 │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ 优点: │ │
│ │ • 创建开销小 │ │
│ │ • 共享内存方便 │ │
│ │ • 适合 I/O 密集型 │ │
│ │ │ │
│ │ 缺点: │ │
│ │ • 受 GIL 限制 │ │
│ │ • 需要同步机制 │ │
│ │ • 调试困难 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 多进程 │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ 优点: │ │
│ │ • 不受 GIL 限制 │ │
│ │ • 利用多核 CPU │ │
│ │ • 进程隔离,稳定性高 │ │
│ │ │ │
│ │ 缺点: │ │
│ │ • 创建开销大 │ │
│ │ • 进程间通信复杂 │ │
│ │ • 内存占用高 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ asyncio │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ 优点: │ │
│ │ • 单线程,无同步问题 │ │
│ │ • 高并发支持 │ │
│ │ • 代码清晰(async/await) │ │
│ │ │ │
│ │ 缺点: │ │
│ │ • 需要异步库支持 │ │
│ │ • 不能混合同步代码 │ │
│ │ • 学习曲线较陡 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘并发模型选择决策树
┌─────────────────────────────────────────────────────────────┐
│ 并发模型选择决策树 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 任务类型是什么? │
│ │ │
│ ├── CPU 密集型(计算、图像处理、加密) │
│ │ │ │
│ │ └── 选择:多进程 │
│ │ multiprocessing / ProcessPoolExecutor │
│ │ │
│ ├── I/O 密集型(网络、文件、数据库) │
│ │ │ │
│ │ ├── 需要高并发? │
│ │ │ │ │
│ │ │ ├── 是 → 选择:asyncio │
│ │ │ │ 适合:Web 服务器、API 客户端 │
│ │ │ │ │
│ │ │ └── 否 → 选择:多线程 │
│ │ │ threading / ThreadPoolExecutor │
│ │ │ │
│ │ └── 需要使用同步库? │
│ │ │ │
│ │ ├── 是 → 选择:多线程 │
│ │ │ 可以直接使用 requests 等 │
│ │ │ │
│ │ └── 否 → 选择:asyncio │
│ │ 使用 aiohttp 等异步库 │
│ │ │
│ └── 混合型 │
│ │ │
│ └── 选择:多进程 + 多线程/asyncio │
│ 进程处理计算,线程/协程处理 I/O │
│ │
└─────────────────────────────────────────────────────────────┘分层学习
L1 理解层:会用
场景 1:Web 爬虫
python
from __future__ import annotations
# 推荐:asyncio(高并发网络请求)
import asyncio
import aiohttp
async def crawl(urls: list[str]) -> list[str]:
async with aiohttp.ClientSession() as session:
tasks: list[asyncio.Task[str]] = [fetch(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def fetch(session: aiohttp.ClientSession, url: str) -> str:
async with session.get(url) as response:
return await response.text()
# 如果必须使用同步库(如 requests),用多线程
from concurrent.futures import ThreadPoolExecutor
import requests
def crawl_sync(urls: list[str]) -> list[requests.Response]:
with ThreadPoolExecutor(max_workers=10) as executor:
return list(executor.map(requests.get, urls))场景 2:数据处理
python
from __future__ import annotations
# CPU 密集型:多进程
from concurrent.futures import ProcessPoolExecutor
def process_data(data: range) -> int:
return sum(x * x for x in data)
def main() -> None:
data_chunks: list[range] = [range(1000000) for _ in range(8)]
with ProcessPoolExecutor() as executor:
results: list[int] = list(executor.map(process_data, data_chunks))
print(results)
if __name__ == '__main__':
main()场景 3:Web 服务器
python
from __future__ import annotations
# 推荐:asyncio(FastAPI / Sanic)
from fastapi import FastAPI
app: FastAPI = FastAPI()
@app.get("/")
async def root() -> dict[str, str]:
return {"message": "Hello"}
# Flask(多线程模式)
from flask import Flask
app_flask: Flask = Flask(__name__)
@app_flask.route("/")
def root_flask() -> dict[str, str]:
return {"message": "Hello"}场景 4:数据库操作
python
from __future__ import annotations
# asyncio + 异步数据库驱动
import asyncio
import asyncpg
async def query_db() -> list[asyncpg.Record]:
conn: asyncpg.Connection = await asyncpg.connect("postgresql://user:pass@localhost/db")
rows: list[asyncpg.Record] = await conn.fetch("SELECT * FROM users")
await conn.close()
return rows
# 多线程 + 同步数据库驱动
from concurrent.futures import ThreadPoolExecutor
import psycopg2
def query_db_sync() -> list[tuple]:
conn: psycopg2.extensions.connection = psycopg2.connect("postgresql://user:pass@localhost/db")
cur: psycopg2.extensions.cursor = conn.cursor()
cur.execute("SELECT * FROM users")
rows: list[tuple] = cur.fetchall()
conn.close()
return rowsL2 实践层:用好
推荐做法表
| 推荐做法 | 说明 | 为什么 |
|---|---|---|
| 识别任务类型 | I/O 密集型 → 多线程 / asyncio;CPU 密集型 → 多进程 | 错误选择导致性能下降甚至更慢 |
| 选择合适的粒度 | 任务太少:并发开销可能大于收益;任务太多:注意资源限制 | 找到并发数与开销的平衡点 |
| 使用高级接口 | ThreadPoolExecutor / ProcessPoolExecutor / asyncio.gather | 避免手动管理线程/进程生命周期 |
| 处理异常 | 捕获并处理异常,使用超时机制 | 并发代码的异常容易被吞噬 |
使用 with 语句 | 管理线程池、进程池、异步资源 | 自动清理资源,防止泄漏 |
| 测试和监控 | 测试并发正确性,监控性能指标 | 竞态条件难以复现,需要充分测试 |
反模式对比
| ❌ 反模式 | ✅ 正确做法 | 说明 |
|---|---|---|
| CPU 密集用多线程 | 改用多进程 | GIL 导致多线程反而更慢 |
| I/O 高并发用多线程 | 改用 asyncio | 线程内存开销大,asyncio 万级并发只需几十 MB |
| 混合项目中只用一种模型 | 按任务类型分别选择 | CPU 部分用进程,I/O 部分用协程 |
不用 with 管理资源 | 始终用 with 语句 | 忘记关闭连接/释放资源 |
| 不设置超时 | 加 timeout 参数 | 单个任务卡死导致整个程序挂起 |
适用场景表
| 场景 | 推荐模型 | 原因 | 示例技术栈 |
|---|---|---|---|
| CPU 密集型 — 数值计算、图像处理 | 多进程 | 不受 GIL 限制,真正并行 | ProcessPoolExecutor |
| CPU 密集型 — 机器学习推理 | 多进程 | 模型加载到每个进程独立内存 | multiprocessing.Pool |
| I/O 密集型 — 高并发(>1000 连接) | asyncio | 单线程管理万级连接,内存占用低 | FastAPI + aiohttp |
| I/O 密集型 — 低并发(<100 连接) | 多线程 | 开发简单,同步库直接可用 | ThreadPoolExecutor + requests |
| I/O 密集型 — 需用同步库 | 多线程 | 无法替换为异步库 | run_in_executor 或 ThreadPoolExecutor |
| 混合型 — CPU + I/O | 多进程 + asyncio | 进程处理计算,协程处理 I/O | ProcessPoolExecutor + asyncio |
并发编程最佳实践
┌─────────────────────────────────────────────────────────────┐
│ 并发编程最佳实践 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 识别任务类型 │
│ • I/O 密集型 → 多线程 / asyncio │
│ • CPU 密集型 → 多进程 │
│ │
│ 2. 选择合适的粒度 │
│ • 任务太少:并发开销可能大于收益 │
│ • 任务太多:注意资源限制 │
│ │
│ 3. 使用高级接口 │
│ • ThreadPoolExecutor / ProcessPoolExecutor │
│ • asyncio.gather / create_task │
│ │
│ 4. 处理异常 │
│ • 捕获并处理异常 │
│ • 使用超时机制 │
│ │
│ 5. 资源管理 │
│ • 使用 with 语句 │
│ • 及时关闭连接、释放资源 │
│ │
│ 6. 测试和监控 │
│ • 测试并发正确性 │
│ • 监控性能指标 │
│ │
└─────────────────────────────────────────────────────────────┘实测性能对比
以下为典型场景的实测数据(4 核 CPU,Python 3.12):
场景 1:100 个 I/O 任务(每个延迟 0.1 秒)
| 方式 | 耗时 | 加速比 |
|---|---|---|
| 串行 | ~10.0s | 1× |
| 多线程 (10 workers) | ~1.1s | 9× |
| asyncio | ~0.15s | 67× |
场景 2:8 个 CPU 任务(每个计算 50000 以内素数)
| 方式 | 耗时 | 加速比 |
|---|---|---|
| 串行 | ~12.5s | 1× |
| 多线程 (8 workers) | ~12.8s | 0.98×(更慢!) |
| 多进程 (4 workers) | ~3.2s | 3.9× |
场景 3:混合型(4 个 CPU + 20 个 I/O)
| 方式 | 耗时 | 说明 |
|---|---|---|
| 纯多线程 | ~8.5s | CPU 部分被 GIL 限制 |
| 多进程 + asyncio | ~4.0s | 进程处理 CPU,协程处理 I/O |
性能基准测试代码
python
import time
import threading
import multiprocessing
def io_task() -> None:
time.sleep(0.1)
async def async_io_task() -> None:
await asyncio.sleep(0.1)
def cpu_task() -> int:
return sum(range(100000))
def test_threading() -> float:
start: float = time.time()
threads: list[threading.Thread] = [threading.Thread(target=io_task) for _ in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
return time.time() - start
def test_multiprocessing() -> float:
start: float = time.time()
processes: list[multiprocessing.Process] = [multiprocessing.Process(target=cpu_task) for _ in range(8)]
for p in processes:
p.start()
for p in processes:
p.join()
return time.time() - start
async def test_asyncio() -> float:
start: float = time.time()
await asyncio.gather(*[async_io_task() for _ in range(100)])
return time.time() - startasyncio vs 多线程性能差异原因
┌─────────────────────────────────────────────────────────────┐
│ asyncio vs 多线程性能差异原因 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 线程切换开销: │
│ • OS 调度器介入(上下文切换 ~1-10μs) │
│ • CPU 寄存器保存/恢复 │
│ • 线程栈切换 │
│ │
│ 协程切换开销: │
│ • 用户态切换(~0.5μs) │
│ • 只需保存/恢复栈指针和指令指针 │
│ • 无需 OS 介入 │
│ │
│ 内存开销: │
│ • 线程:每线程 ~1-8MB 栈空间 │
│ • 协程:每协程 ~几 KB │
│ │
│ 锁竞争: │
│ • 多线程:需要 Lock 保护共享数据 │
│ • asyncio:单线程,无锁 │
│ │
└─────────────────────────────────────────────────────────────┘知识关联图
并发模型全景:
┌───────────────────────────────────────────────┐
│ │
│ 任务类型? │
│ ├── I/O 密集型 │
│ │ ├── 高并发(>1000) → asyncio │
│ │ └── 低并发(<100) → ThreadPoolExecutor │
│ │ │
│ ├── CPU 密集型 │
│ │ └── 多核并行 → ProcessPoolExecutor │
│ │ │
│ └── 混合型 │
│ └── 多进程(CPU) + asyncio(I/O) │
│ │
└───────────────────────────────────────────────┘本章小结
┌─────────────────────────────────────────────────────────────┐
│ 并发模型选择 知识要点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 选择依据: │
│ ✓ 任务类型:I/O 密集型 vs CPU 密集型 │
│ ✓ 并发需求:高并发 vs 低并发 │
│ ✓ 库支持:同步库 vs 异步库 │
│ │
│ 推荐选择: │
│ ✓ CPU 密集型 → 多进程 │
│ ✓ I/O 密集型 + 高并发 → asyncio │
│ ✓ I/O 密集型 + 同步库 → 多线程 │
│ ✓ 混合型 → 多进程 + 多线程/asyncio │
│ │
│ 注意事项: │
│ ✓ 评估并发开销 │
│ ✓ 处理异常和超时 │
│ ✓ 管理资源 │
│ │
└─────────────────────────────────────────────────────────────┘