Skip to content

02-多线程编程

Python 3.11+

概念铺垫

线程与多线程

多线程允许在同一进程中并发执行多个任务。Python 的 threading 模块提供了创建和管理线程的完整接口。

线程生命周期

┌─────────────────────────────────────────────────────────────┐
│              线程生命周期                                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   主线程                                                    │
│     │                                                       │
│     ├── 创建 Thread 对象                                    │
│     │      │                                                │
│     │      ▼                                                │
│     │   线程(未启动)                                       │
│     │                                                       │
│     ├── start()                                             │
│     │      │                                                │
│     │      ▼                                                │
│     │   线程(运行中)← 操作系统调度                         │
│     │                                                       │
│     └── join()                                              │
│            │                                                │
│            ▼                                                │
│         等待线程结束                                         │
│            │                                                │
│            ▼                                                │
│         线程(已结束)                                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

守护线程 vs 普通线程

┌─────────────────────────────────────────────────────────────┐
│              守护线程 vs 普通线程                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   普通线程:                                                 │
│   • 主线程会等待所有普通线程结束                             │
│   • 程序在所有普通线程结束后才退出                           │
│                                                             │
│   守护线程:                                                 │
│   • 主线程结束时自动终止                                     │
│   • 适合后台任务(日志、监控等)                             │
│   • 不能阻塞程序退出                                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

分层学习

L1 理解层:会用

创建线程 — 基本用法

实际场景: 需要同时下载 10 个文件,每个文件下载需要 2 秒。如果逐个下载,总共需要 20 秒。如何使用多线程并行下载?

问题:如何创建和管理线程?

python
# basic_thread_demo.py
from __future__ import annotations

import threading
import time

def worker(name: str, delay: float) -> None:
    """工作函数"""
    for i in range(3):
        time.sleep(delay)
        print(f"{name} 执行第 {i+1} 次")

t1: threading.Thread = threading.Thread(target=worker, args=("线程 1", 1))
t2: threading.Thread = threading.Thread(target=worker, args=("线程 2", 1.5))

t1.start()
t2.start()

t1.join()
t2.join()

print("所有线程完成")

线程参数

python
from __future__ import annotations

import threading

def greet(name: str, greeting: str = "Hello", times: int = 1) -> None:
    for _ in range(times):
        print(f"{greeting}, {name}!")

t1: threading.Thread = threading.Thread(target=greet, args=("Alice",))

t2: threading.Thread = threading.Thread(target=greet, kwargs={
    "name": "Bob",
    "greeting": "Hi",
    "times": 3
})

t3: threading.Thread = threading.Thread(
    target=greet,
    args=("Carol",),
    kwargs={"greeting": "Hey"}
)

t1.start()
t2.start()
t3.start()

Thread 类常用属性和方法

python
from __future__ import annotations

import threading
import time

def worker() -> None:
    time.sleep(2)
    print("工作完成")

t: threading.Thread = threading.Thread(target=worker)

print(t.name)      # 线程名
print(t.daemon)    # 是否为守护线程
print(t.ident)     # 线程标识符(启动前为 None)

t.start()

print(t.is_alive())  # 是否存活
t.join(timeout=1)    # 等待线程结束(最多等待 1 秒)

守护线程

python
from __future__ import annotations

import threading
import time

def background_task() -> None:
    while True:
        print("后台任务运行中...")
        time.sleep(1)

t: threading.Thread = threading.Thread(target=background_task)
t.daemon = True
t.start()

time.sleep(3)
print("主线程结束")
# 守护线程随之终止

继承 Thread 类

python
from __future__ import annotations

import threading
import time

class WorkerThread(threading.Thread):
    """自定义线程类"""
    
    def __init__(self, name: str, count: int) -> None:
        super().__init__()
        self.name = name
        self.count = count
        self.result: str | None = None
    
    def run(self) -> None:
        """线程执行的方法"""
        for i in range(self.count):
            time.sleep(0.5)
            print(f"{self.name}: 执行 {i+1}/{self.count}")
        self.result = f"{self.name} 完成"

threads: list[WorkerThread] = []
for i in range(3):
    t: WorkerThread = WorkerThread(f"Worker-{i}", 3)
    threads.append(t)
    t.start()

for t in threads:
    t.join()
    print(f"结果: {t.result}")

线程池 — ThreadPoolExecutor

python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
import time

def task(n: int) -> int:
    time.sleep(1)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 5)
    print(future.result())  # 25
    
    results: list[int] = list(executor.map(task, range(1, 6)))
    print(results)  # [1, 4, 9, 16, 25]

Future 对象

python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(n: int) -> int:
    time.sleep(1)
    return n * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    futures: list[Future] = [executor.submit(task, i) for i in range(5)]
    
    for future in futures:
        print(future.result())
    
    futures = [executor.submit(task, i) for i in range(5)]
    for future in as_completed(futures):
        print(future.result())

回调函数

python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, Future

def task(n: int) -> int:
    return n * 2

def on_complete(future: Future) -> None:
    """任务完成回调"""
    print(f"任务完成,结果: {future.result()}")

with ThreadPoolExecutor(max_workers=2) as executor:
    future: Future = executor.submit(task, 10)
    future.add_done_callback(on_complete)

threading 模块常用函数

python
from __future__ import annotations

import threading

current: threading.Thread = threading.current_thread()
print(current.name)  # MainThread

print(threading.active_count())

for t in threading.enumerate():
    print(t.name)

main: threading.Thread = threading.main_thread()
print(main.name)  # MainThread

local_data: threading.local = threading.local()
local_data.value = 10

实战:并发下载网页

需求:同时下载 10 个网页,对比单线程和多线程的性能差异。

python
import time
import threading
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/1",
]

def download(url: str) -> tuple[str, float]:
    start = time.time()
    resp = requests.get(url)
    elapsed = time.time() - start
    return url, elapsed

def single_thread():
    start = time.time()
    for url in urls:
        download(url)
    return time.time() - start

def multi_thread(max_workers: int = 5):
    start = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(download, url) for url in urls]
        for future in as_completed(futures):
            url, elapsed = future.result()
            print(f"完成: {url} (耗时 {elapsed:.2f}s)")
    return time.time() - start

print(f"单线程耗时: {single_thread():.2f}s")
print(f"多线程耗时: {multi_thread():.2f}s")
# 结果:单线程 ~8s,多线程 ~3s(取决于延迟最大的那个)

实战:日志监控器

需求:启动一个后台线程持续监控日志文件,主线程继续处理其他业务。

python
import threading
import time

class LogMonitor:
    def __init__(self, log_file: str):
        self.log_file = log_file
        self.running = False
        self.thread = None
    
    def start(self) -> None:
        self.running = True
        self.thread = threading.Thread(target=self._monitor, daemon=True)
        self.thread.start()
    
    def stop(self) -> None:
        self.running = False
        if self.thread:
            self.thread.join(timeout=2)
    
    def _monitor(self) -> None:
        while self.running:
            print("监控日志...")
            time.sleep(1)

monitor = LogMonitor("app.log")
monitor.start()

print("主线程继续处理业务...")
time.sleep(5)

monitor.stop()
print("监控已停止")

关键代码说明:

代码含义为什么这样写
futures = [executor.submit(download, url) for url in urls]批量提交任务并保存 Future 列表submit 立即返回 Future 对象,任务在后台并发运行,列表用于后续收集结果
for future in as_completed(futures)按完成顺序迭代 Futureas_completed 不等最慢的,哪个先完成先处理哪个,比 map 响应更及时
threading.Thread(target=self._monitor, daemon=True)创建守护线程daemon=True 确保主程序退出时后台监控线程自动终止,不需要手动停止
self.running = False 配合 while self.running协作式停止标志通过标志位告知线程退出,比强制 kill 线程更安全,保证当前循环体执行完
self.thread.join(timeout=2)等待线程结束,最多 2 秒设置超时防止 stop() 永久阻塞;若线程已停止,join 立即返回

L2 实践层:用好

推荐做法表

推荐做法说明为什么
线程数设置I/O 密集型:线程数 = CPU 核数 × 2 ~ 5充分利用 I/O 等待期间的空闲 CPU
使用 with 管理线程池ThreadPoolExecutor + with 语句自动调用 shutdown(wait=True),防止线程泄漏
使用 daemon=True 处理后台任务日志监控、心跳检测等主程序退出时自动终止,不阻塞进程退出
每个任务内部捕获异常Future.result() 会 re-raise未捕获的异常在 result() 调用时抛出,方便集中处理
避免共享数据优先使用 threading.local 或不可变数据共享数据引入锁竞争和死锁风险
验证并发正确性对共享数据写单元测试竞态条件是概率性的,测试需要反复运行捕获

反模式对比

❌ 反模式✅ 正确做法说明
忘记 join()executor.shutdown(wait=True) 或手动 join()主线程退出,子线程可能未完成
线程泄漏使用 ThreadPoolExecutor + with创建大量线程不关闭,耗尽系统资源
死锁减少锁的数量,使用 RLock多个锁交叉等待
竞态条件使用 Lockthreading.local共享变量未加锁
无限制创建线程使用线程池限制并发数过多线程导致切换开销超过收益

适用场景表

场景推荐线程数建议原因
批量网络请求 (100+ URL)✅ ThreadPoolExecutor10-20I/O 密集,等待时间长
后台日志监控✅ 守护线程1长期运行的轻量任务
CPU 数值计算GIL 限制,改用多进程
少量大文件下载 (5个)✅ 简单 Thread5开销小,不需要线程池
GUI 事件处理✅ 后台线程1-2避免阻塞 UI 主线程

L3 专家层:深入

线程的 OS 级实现原理图

Python 线程是对 操作系统原生线程 的封装:

┌─────────────────────────────────────────────────────────────┐
│              Python 线程与 OS 线程的映射                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Python threading 模块                                      │
│   ┌──────────────────────────────────────────────┐          │
│   │  Thread(target=func)                          │          │
│   │    │                                          │          │
│   │    ▼                                          │          │
│   │  _thread.start_new_thread()                   │          │
│   │    │                                          │          │
│   │    ▼                                          │          │
│   │  C 层 pthread_create() (POSIX)                │          │
│   │  或 CreateThread() (Windows)                  │          │
│   └──────────────────────────────────────────────┘          │
│                          │                                  │
│                          ▼                                  │
│   操作系统                                                      │
│   ┌──────────────────────────────────────────────┐          │
│   │  Linux: pthread (1:1 模型)                    │          │
│   │  macOS: pthread (1:1 模型)                    │          │
│   │  Windows: Win32 Thread (1:1 模型)             │          │
│   └──────────────────────────────────────────────┘          │
│                                                             │
│   Python 使用 1:1 线程模型:                                  │
│   • 1 个 Python Thread = 1 个 OS 线程                        │
│   • 由 OS 调度器决定何时执行                                 │
│   • 线程切换开销约 1-10μs                                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

GIL 与线程调度的交互

python
# gil_thread_interaction_demo.py
"""
演示 GIL 如何影响线程调度。

线程在以下情况释放 GIL:
1. 执行 time.sleep()
2. 执行 I/O 操作
3. 时间片到期(默认 5ms)
"""
import threading
import time

active_threads: list[str] = []
lock = threading.Lock()

def worker(name: str, iterations: int) -> None:
    for i in range(iterations):
        with lock:
            active_threads.append(f"{name}-{i}")
        time.sleep(0)  # 会释放 GIL,允许其他线程运行

t1 = threading.Thread(target=worker, args=("A", 100))
t2 = threading.Thread(target=worker, args=("B", 100))
t1.start()
t2.start()
t1.join()
t2.join()

print(f"活跃线程记录数: {len(active_threads)}")

性能考量表

操作时间复杂度说明
Thread.start()O(1)委托给 OS 创建线程
Thread.join()O(n)n 为线程执行时间
线程切换~1-10μsOS 调度器决定
ThreadPoolExecutor.submit()O(1)任务入队,立即返回
线程创建开销~1-5ms包括栈分配、OS 注册

线程数设置经验法则:

I/O 密集型最佳线程数 ≈ CPU 核数 × (1 + 等待时间 / 计算时间)

示例:
• CPU 核数 = 4
• 等待时间(网络延迟)= 100ms
• 计算时间 = 10ms
• 最佳线程数 ≈ 4 × (1 + 100/10) = 44

设计动机

Python 设计选择原因
使用 OS 原生线程(1:1)利用 OS 调度器,代码简单
提供 ThreadPoolExecutor避免手动管理线程生命周期
daemon 线程自动退出防止后台线程阻止程序退出

知识关联图

线程知识关联:
┌───────────────────┐     ┌───────────────────┐
│  threading.Thread │────→│  ThreadPoolExecutor│
│  手动管理         │     │  池化复用         │
└────────┬──────────┘     └────────┬──────────┘
         │                         │
         ▼                         ▼
┌───────────────────┐     ┌───────────────────┐
│  守护线程         │     │  Future 对象      │
│  daemon=True      │     │  结果/异常/取消   │
└───────────────────┘     └───────────────────┘


┌───────────────────┐
│  线程同步          │
│  Lock/RLock       │
│  Semaphore/Event  │
└───────────────────┘

本章小结

┌─────────────────────────────────────────────────────────────┐
│                      多线程编程 知识要点                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   创建线程:                                                 │
│   ✓ threading.Thread(target=func, args=...)                 │
│   ✓ start() 启动,join() 等待                               │
│   ✓ 守护线程:daemon = True                                 │
│                                                             │
│   Thread 类:                                                │
│   ✓ name、ident、daemon 属性                                │
│   ✓ is_alive()、join() 方法                                 │
│   ✓ 可继承重写 run() 方法                                   │
│                                                             │
│   线程池:                                                   │
│   ✓ ThreadPoolExecutor(max_workers=n)                       │
│   ✓ submit() 提交单个任务                                   │
│   ✓ map() 批量提交                                          │
│   ✓ Future 对象获取结果                                     │
│                                                             │
│   适用场景:                                                 │
│   ✓ I/O 密集型任务                                          │
│   ✓ 网络请求、文件读写                                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘