02-多线程编程
Python 3.11+
概念铺垫
线程与多线程
多线程允许在同一进程中并发执行多个任务。Python 的 threading 模块提供了创建和管理线程的完整接口。
线程生命周期
┌─────────────────────────────────────────────────────────────┐
│ 线程生命周期 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 主线程 │
│ │ │
│ ├── 创建 Thread 对象 │
│ │ │ │
│ │ ▼ │
│ │ 线程(未启动) │
│ │ │
│ ├── start() │
│ │ │ │
│ │ ▼ │
│ │ 线程(运行中)← 操作系统调度 │
│ │ │
│ └── join() │
│ │ │
│ ▼ │
│ 等待线程结束 │
│ │ │
│ ▼ │
│ 线程(已结束) │
│ │
└─────────────────────────────────────────────────────────────┘守护线程 vs 普通线程
┌─────────────────────────────────────────────────────────────┐
│ 守护线程 vs 普通线程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 普通线程: │
│ • 主线程会等待所有普通线程结束 │
│ • 程序在所有普通线程结束后才退出 │
│ │
│ 守护线程: │
│ • 主线程结束时自动终止 │
│ • 适合后台任务(日志、监控等) │
│ • 不能阻塞程序退出 │
│ │
└─────────────────────────────────────────────────────────────┘分层学习
L1 理解层:会用
创建线程 — 基本用法
实际场景: 需要同时下载 10 个文件,每个文件下载需要 2 秒。如果逐个下载,总共需要 20 秒。如何使用多线程并行下载?
问题:如何创建和管理线程?
python
# basic_thread_demo.py
from __future__ import annotations
import threading
import time
def worker(name: str, delay: float) -> None:
"""工作函数"""
for i in range(3):
time.sleep(delay)
print(f"{name} 执行第 {i+1} 次")
t1: threading.Thread = threading.Thread(target=worker, args=("线程 1", 1))
t2: threading.Thread = threading.Thread(target=worker, args=("线程 2", 1.5))
t1.start()
t2.start()
t1.join()
t2.join()
print("所有线程完成")线程参数
python
from __future__ import annotations
import threading
def greet(name: str, greeting: str = "Hello", times: int = 1) -> None:
for _ in range(times):
print(f"{greeting}, {name}!")
t1: threading.Thread = threading.Thread(target=greet, args=("Alice",))
t2: threading.Thread = threading.Thread(target=greet, kwargs={
"name": "Bob",
"greeting": "Hi",
"times": 3
})
t3: threading.Thread = threading.Thread(
target=greet,
args=("Carol",),
kwargs={"greeting": "Hey"}
)
t1.start()
t2.start()
t3.start()Thread 类常用属性和方法
python
from __future__ import annotations
import threading
import time
def worker() -> None:
time.sleep(2)
print("工作完成")
t: threading.Thread = threading.Thread(target=worker)
print(t.name) # 线程名
print(t.daemon) # 是否为守护线程
print(t.ident) # 线程标识符(启动前为 None)
t.start()
print(t.is_alive()) # 是否存活
t.join(timeout=1) # 等待线程结束(最多等待 1 秒)守护线程
python
from __future__ import annotations
import threading
import time
def background_task() -> None:
while True:
print("后台任务运行中...")
time.sleep(1)
t: threading.Thread = threading.Thread(target=background_task)
t.daemon = True
t.start()
time.sleep(3)
print("主线程结束")
# 守护线程随之终止继承 Thread 类
python
from __future__ import annotations
import threading
import time
class WorkerThread(threading.Thread):
"""自定义线程类"""
def __init__(self, name: str, count: int) -> None:
super().__init__()
self.name = name
self.count = count
self.result: str | None = None
def run(self) -> None:
"""线程执行的方法"""
for i in range(self.count):
time.sleep(0.5)
print(f"{self.name}: 执行 {i+1}/{self.count}")
self.result = f"{self.name} 完成"
threads: list[WorkerThread] = []
for i in range(3):
t: WorkerThread = WorkerThread(f"Worker-{i}", 3)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"结果: {t.result}")线程池 — ThreadPoolExecutor
python
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor
import time
def task(n: int) -> int:
time.sleep(1)
return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(task, 5)
print(future.result()) # 25
results: list[int] = list(executor.map(task, range(1, 6)))
print(results) # [1, 4, 9, 16, 25]Future 对象
python
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def task(n: int) -> int:
time.sleep(1)
return n * 2
with ThreadPoolExecutor(max_workers=3) as executor:
futures: list[Future] = [executor.submit(task, i) for i in range(5)]
for future in futures:
print(future.result())
futures = [executor.submit(task, i) for i in range(5)]
for future in as_completed(futures):
print(future.result())回调函数
python
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, Future
def task(n: int) -> int:
return n * 2
def on_complete(future: Future) -> None:
"""任务完成回调"""
print(f"任务完成,结果: {future.result()}")
with ThreadPoolExecutor(max_workers=2) as executor:
future: Future = executor.submit(task, 10)
future.add_done_callback(on_complete)threading 模块常用函数
python
from __future__ import annotations
import threading
current: threading.Thread = threading.current_thread()
print(current.name) # MainThread
print(threading.active_count())
for t in threading.enumerate():
print(t.name)
main: threading.Thread = threading.main_thread()
print(main.name) # MainThread
local_data: threading.local = threading.local()
local_data.value = 10实战:并发下载网页
需求:同时下载 10 个网页,对比单线程和多线程的性能差异。
python
import time
import threading
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/3",
"https://httpbin.org/delay/1",
]
def download(url: str) -> tuple[str, float]:
start = time.time()
resp = requests.get(url)
elapsed = time.time() - start
return url, elapsed
def single_thread():
start = time.time()
for url in urls:
download(url)
return time.time() - start
def multi_thread(max_workers: int = 5):
start = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(download, url) for url in urls]
for future in as_completed(futures):
url, elapsed = future.result()
print(f"完成: {url} (耗时 {elapsed:.2f}s)")
return time.time() - start
print(f"单线程耗时: {single_thread():.2f}s")
print(f"多线程耗时: {multi_thread():.2f}s")
# 结果:单线程 ~8s,多线程 ~3s(取决于延迟最大的那个)实战:日志监控器
需求:启动一个后台线程持续监控日志文件,主线程继续处理其他业务。
python
import threading
import time
class LogMonitor:
def __init__(self, log_file: str):
self.log_file = log_file
self.running = False
self.thread = None
def start(self) -> None:
self.running = True
self.thread = threading.Thread(target=self._monitor, daemon=True)
self.thread.start()
def stop(self) -> None:
self.running = False
if self.thread:
self.thread.join(timeout=2)
def _monitor(self) -> None:
while self.running:
print("监控日志...")
time.sleep(1)
monitor = LogMonitor("app.log")
monitor.start()
print("主线程继续处理业务...")
time.sleep(5)
monitor.stop()
print("监控已停止")关键代码说明:
| 代码 | 含义 | 为什么这样写 |
|---|---|---|
futures = [executor.submit(download, url) for url in urls] | 批量提交任务并保存 Future 列表 | submit 立即返回 Future 对象,任务在后台并发运行,列表用于后续收集结果 |
for future in as_completed(futures) | 按完成顺序迭代 Future | as_completed 不等最慢的,哪个先完成先处理哪个,比 map 响应更及时 |
threading.Thread(target=self._monitor, daemon=True) | 创建守护线程 | daemon=True 确保主程序退出时后台监控线程自动终止,不需要手动停止 |
self.running = False 配合 while self.running | 协作式停止标志 | 通过标志位告知线程退出,比强制 kill 线程更安全,保证当前循环体执行完 |
self.thread.join(timeout=2) | 等待线程结束,最多 2 秒 | 设置超时防止 stop() 永久阻塞;若线程已停止,join 立即返回 |
L2 实践层:用好
推荐做法表
| 推荐做法 | 说明 | 为什么 |
|---|---|---|
| 线程数设置 | I/O 密集型:线程数 = CPU 核数 × 2 ~ 5 | 充分利用 I/O 等待期间的空闲 CPU |
使用 with 管理线程池 | ThreadPoolExecutor + with 语句 | 自动调用 shutdown(wait=True),防止线程泄漏 |
使用 daemon=True 处理后台任务 | 日志监控、心跳检测等 | 主程序退出时自动终止,不阻塞进程退出 |
| 每个任务内部捕获异常 | Future.result() 会 re-raise | 未捕获的异常在 result() 调用时抛出,方便集中处理 |
| 避免共享数据 | 优先使用 threading.local 或不可变数据 | 共享数据引入锁竞争和死锁风险 |
| 验证并发正确性 | 对共享数据写单元测试 | 竞态条件是概率性的,测试需要反复运行捕获 |
反模式对比
| ❌ 反模式 | ✅ 正确做法 | 说明 |
|---|---|---|
忘记 join() | executor.shutdown(wait=True) 或手动 join() | 主线程退出,子线程可能未完成 |
| 线程泄漏 | 使用 ThreadPoolExecutor + with | 创建大量线程不关闭,耗尽系统资源 |
| 死锁 | 减少锁的数量,使用 RLock | 多个锁交叉等待 |
| 竞态条件 | 使用 Lock 或 threading.local | 共享变量未加锁 |
| 无限制创建线程 | 使用线程池限制并发数 | 过多线程导致切换开销超过收益 |
适用场景表
| 场景 | 推荐 | 线程数建议 | 原因 |
|---|---|---|---|
| 批量网络请求 (100+ URL) | ✅ ThreadPoolExecutor | 10-20 | I/O 密集,等待时间长 |
| 后台日志监控 | ✅ 守护线程 | 1 | 长期运行的轻量任务 |
| CPU 数值计算 | ❌ | — | GIL 限制,改用多进程 |
| 少量大文件下载 (5个) | ✅ 简单 Thread | 5 | 开销小,不需要线程池 |
| GUI 事件处理 | ✅ 后台线程 | 1-2 | 避免阻塞 UI 主线程 |
L3 专家层:深入
线程的 OS 级实现原理图
Python 线程是对 操作系统原生线程 的封装:
┌─────────────────────────────────────────────────────────────┐
│ Python 线程与 OS 线程的映射 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Python threading 模块 │
│ ┌──────────────────────────────────────────────┐ │
│ │ Thread(target=func) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ _thread.start_new_thread() │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ C 层 pthread_create() (POSIX) │ │
│ │ 或 CreateThread() (Windows) │ │
│ └──────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 操作系统 │
│ ┌──────────────────────────────────────────────┐ │
│ │ Linux: pthread (1:1 模型) │ │
│ │ macOS: pthread (1:1 模型) │ │
│ │ Windows: Win32 Thread (1:1 模型) │ │
│ └──────────────────────────────────────────────┘ │
│ │
│ Python 使用 1:1 线程模型: │
│ • 1 个 Python Thread = 1 个 OS 线程 │
│ • 由 OS 调度器决定何时执行 │
│ • 线程切换开销约 1-10μs │
│ │
└─────────────────────────────────────────────────────────────┘GIL 与线程调度的交互
python
# gil_thread_interaction_demo.py
"""
演示 GIL 如何影响线程调度。
线程在以下情况释放 GIL:
1. 执行 time.sleep()
2. 执行 I/O 操作
3. 时间片到期(默认 5ms)
"""
import threading
import time
active_threads: list[str] = []
lock = threading.Lock()
def worker(name: str, iterations: int) -> None:
for i in range(iterations):
with lock:
active_threads.append(f"{name}-{i}")
time.sleep(0) # 会释放 GIL,允许其他线程运行
t1 = threading.Thread(target=worker, args=("A", 100))
t2 = threading.Thread(target=worker, args=("B", 100))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"活跃线程记录数: {len(active_threads)}")性能考量表
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
Thread.start() | O(1) | 委托给 OS 创建线程 |
Thread.join() | O(n) | n 为线程执行时间 |
| 线程切换 | ~1-10μs | OS 调度器决定 |
ThreadPoolExecutor.submit() | O(1) | 任务入队,立即返回 |
| 线程创建开销 | ~1-5ms | 包括栈分配、OS 注册 |
线程数设置经验法则:
I/O 密集型最佳线程数 ≈ CPU 核数 × (1 + 等待时间 / 计算时间)
示例:
• CPU 核数 = 4
• 等待时间(网络延迟)= 100ms
• 计算时间 = 10ms
• 最佳线程数 ≈ 4 × (1 + 100/10) = 44设计动机
| Python 设计选择 | 原因 |
|---|---|
| 使用 OS 原生线程(1:1) | 利用 OS 调度器,代码简单 |
| 提供 ThreadPoolExecutor | 避免手动管理线程生命周期 |
daemon 线程自动退出 | 防止后台线程阻止程序退出 |
知识关联图
线程知识关联:
┌───────────────────┐ ┌───────────────────┐
│ threading.Thread │────→│ ThreadPoolExecutor│
│ 手动管理 │ │ 池化复用 │
└────────┬──────────┘ └────────┬──────────┘
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ 守护线程 │ │ Future 对象 │
│ daemon=True │ │ 结果/异常/取消 │
└───────────────────┘ └───────────────────┘
│
▼
┌───────────────────┐
│ 线程同步 │
│ Lock/RLock │
│ Semaphore/Event │
└───────────────────┘本章小结
┌─────────────────────────────────────────────────────────────┐
│ 多线程编程 知识要点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 创建线程: │
│ ✓ threading.Thread(target=func, args=...) │
│ ✓ start() 启动,join() 等待 │
│ ✓ 守护线程:daemon = True │
│ │
│ Thread 类: │
│ ✓ name、ident、daemon 属性 │
│ ✓ is_alive()、join() 方法 │
│ ✓ 可继承重写 run() 方法 │
│ │
│ 线程池: │
│ ✓ ThreadPoolExecutor(max_workers=n) │
│ ✓ submit() 提交单个任务 │
│ ✓ map() 批量提交 │
│ ✓ Future 对象获取结果 │
│ │
│ 适用场景: │
│ ✓ I/O 密集型任务 │
│ ✓ 网络请求、文件读写 │
│ │
└─────────────────────────────────────────────────────────────┘