03-线程同步
Python 3.11+
概念铺垫
线程同步的必要性
实际场景
10 个线程同时访问一个共享计数器,每个线程对计数器加 100,000 次。预期结果是 1,000,000,但实际结果往往小于预期。这是为什么?
问题:多线程访问共享资源时会出现什么问题?
竞态条件
python
# race_condition_demo.py
from __future__ import annotations
import threading
counter: int = 0
def increment() -> None:
global counter
for _ in range(100000):
counter += 1 # 不是原子操作!
threads: list[threading.Thread] = []
for _ in range(10):
t: threading.Thread = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"预期: 1000000, 实际: {counter}")
# 实际结果可能小于 1000000,因为竞态条件竞态条件示意
┌─────────────────────────────────────────────────────────────┐
│ 竞态条件示意 │
├─────────────────────────────────────────────────────────────┤
│ │
│ counter += 1 的执行过程: │
│ 1. 读取 counter 的值 │
│ 2. 加 1 │
│ 3. 写回 counter │
│ │
│ 两个线程同时执行: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 时间 │ 线程 A │ 线程 B │ │ │
│ ├──────┼─────────────────────┼─────────────────────┤ │ │
│ │ T1 │ 读取 counter = 0 │ │ │ │
│ │ T2 │ │ 读取 counter = 0 │ │ │
│ │ T3 │ 计算 0 + 1 = 1 │ │ │ │
│ │ T4 │ │ 计算 0 + 1 = 1 │ │ │
│ │ T5 │ 写回 counter = 1 │ │ │ │
│ │ T6 │ │ 写回 counter = 1 │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 结果:两次加 1,但 counter 只增加了 1 │
│ │
└─────────────────────────────────────────────────────────────┘分层学习
L1 理解层:会用
Lock 互斥锁
Lock: 互斥锁,保证同一时间只有一个线程访问共享资源。
Lock语法:
┌─────────────────────────────────────────────────────────────┐
│ Lock 锁的使用 │
│ │
│ 创建锁: │
│ lock = threading.Lock() │
│ │
│ 使用锁(推荐 with 语句): │
│ with lock: │
│ # 临界区代码 │
│ counter += 1 │
│ │
│ ⚠️ 执行流程: │
│ 1. with lock → 获取锁 │
│ 2. 执行临界区代码 │
│ 3. 退出 with → 自动释放锁 │
│ │
│ ⚠️ 锁的作用: │
│ • 保证同一时间只有一个线程在临界区 │
│ • 防止竞态条件 │
│ • 确保数据一致性 │
└─────────────────────────────────────────────────────────────┘python
import threading
counter: int = 0
lock: threading.Lock = threading.Lock()
def increment() -> None:
global counter
for _ in range(100000):
with lock: # 获取锁,自动释放
counter += 1
threads = []
for _ in range(10):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"结果: {counter}") # 正确:1000000Lock 手动获取和释放
Lock手动操作:
┌─────────────────────────────────────────────────────────────┐
│ Lock 的手动获取和释放 │
│ │
│ 方式1:with语句(推荐) │
│ ───────────────────────────── │
│ with lock: │
│ # 临界区代码 │
│ # 自动释放锁 │
│ │
│ 方式2:手动获取释放 │
│ ───────────────────────────── │
│ lock.acquire() # 获取锁 │
│ try: │
│ # 临界区代码 │
│ finally: │
│ lock.release() # 必须释放 │
│ │
│ 方式3:非阻塞获取 │
│ ───────────────────────────── │
│ if lock.acquire(blocking=False): │
│ try: │
│ # 临界区代码 │
│ finally: │
│ lock.release() │
│ else: │
│ print("获取锁失败") │
│ │
│ ⚠️ 推荐:使用 with 语句,自动处理异常和释放 │
└─────────────────────────────────────────────────────────────┘python
import threading
lock = threading.Lock()
# 方式 1:with 语句(推荐)
with lock:
pass # 临界区代码
# 方式 2:手动获取释放
lock.acquire()
try:
pass # 临界区代码
finally:
lock.release()
# 方式 3:非阻塞获取
if lock.acquire(blocking=False):
try:
pass
finally:
lock.release()
else:
print("获取锁失败")| 方法 | 含义 | 说明 |
|---|---|---|
acquire() | 获取锁 | 阻塞直到获取成功 |
acquire(False) | 非阻塞获取 | 立即返回 True/False |
release() | 释放锁 | 必须由持有者调用 |
RLock 可重入锁
RLock 允许同一个线程多次获取锁,但必须释放相同次数。
python
from __future__ import annotations
import threading
rlock: threading.RLock = threading.RLock()
def outer() -> None:
with rlock:
print("outer 获取锁")
inner()
def inner() -> None:
with rlock: # 同一线程可以再次获取
print("inner 获取锁")
outer() # 正常工作
lock: threading.Lock = threading.Lock()
def outer_deadlock() -> None:
with lock:
inner_deadlock() # 死锁!
def inner_deadlock() -> None:
with lock: # 等待自己释放锁 → 死锁
pass┌─────────────────────────────────────────────────────────────┐
│ Lock vs RLock │
├─────────────────────────────────────────────────────────────┤
│ │
│ Lock: │
│ • 同一线程不能重复获取 │
│ • 重复获取会死锁 │
│ • 性能略高 │
│ │
│ RLock: │
│ • 同一线程可重复获取 │
│ • 需要释放相同次数 │
│ • 适合递归调用 │
│ │
└─────────────────────────────────────────────────────────────┘Semaphore 信号量
控制同时访问资源的线程数量。
python
from __future__ import annotations
import threading
import time
semaphore: threading.Semaphore = threading.Semaphore(3)
def limited_resource(name: str) -> None:
with semaphore:
print(f"{name} 获取资源")
time.sleep(1)
print(f"{name} 释放资源")
threads: list[threading.Thread] = []
for i in range(6):
t: threading.Thread = threading.Thread(target=limited_resource, args=(f"Thread-{i}",))
threads.append(t)
t.start()
for t in threads:
t.join()
# 输出:每次最多 3 个线程同时执行应用场景:
python
from __future__ import annotations
import threading
import time
# 限制数据库连接数
db_semaphore: threading.Semaphore = threading.Semaphore(5)
def query_database(query_id: int) -> None:
with db_semaphore:
print(f"查询 {query_id} 开始")
time.sleep(0.5)
print(f"查询 {query_id} 完成")
# 限制并发请求数
request_semaphore: threading.Semaphore = threading.Semaphore(10)
def make_request(url: str) -> None:
with request_semaphore:
pass # 发送请求Event 事件
Event 用于线程间通信,一个线程发出信号,其他线程等待。
python
from __future__ import annotations
import threading
import time
event: threading.Event = threading.Event()
def waiter(name: str) -> None:
print(f"{name} 等待事件...")
event.wait() # 阻塞,直到事件被设置
print(f"{name} 收到事件,继续执行")
def setter() -> None:
time.sleep(2)
print("设置事件")
event.set() # 设置事件,唤醒所有等待的线程
threads: list[threading.Thread] = [
threading.Thread(target=waiter, args=("A",)),
threading.Thread(target=waiter, args=("B",)),
threading.Thread(target=setter)
]
for t in threads:
t.start()
for t in threads:
t.join()Event 方法:
python
from __future__ import annotations
import threading
event: threading.Event = threading.Event()
event.set() # 设置事件
event.clear() # 清除事件
event.wait() # 等待事件(阻塞)
event.wait(timeout=5) # 等待事件(带超时)
print(event.is_set()) # 检查事件是否被设置Condition 条件变量
Condition 用于复杂的线程同步,结合了 Lock 和 Event 的功能。
python
from __future__ import annotations
import threading
import time
import random
buffer: list[str] = []
buffer_size: int = 5
condition: threading.Condition = threading.Condition()
def producer() -> None:
for i in range(10):
with condition:
while len(buffer) >= buffer_size:
print("缓冲区满,生产者等待")
condition.wait() # 等待消费者消费
item: str = f"商品-{i}"
buffer.append(item)
print(f"生产: {item}")
condition.notify_all() # 通知消费者
time.sleep(random.random())
def consumer() -> None:
for _ in range(10):
with condition:
while not buffer:
print("缓冲区空,消费者等待")
condition.wait() # 等待生产者生产
item: str = buffer.pop(0)
print(f"消费: {item}")
condition.notify_all() # 通知生产者
time.sleep(random.random())
t1: threading.Thread = threading.Thread(target=producer)
t2: threading.Thread = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()关键代码说明:
| 代码 | 含义 | 为什么这样写 |
|---|---|---|
condition: threading.Condition = threading.Condition() | 条件变量(内含隐式锁) | Condition = Lock + 等待/通知机制,比单独用 Lock + Event 更适合生产者-消费者模型 |
with condition: | 进入临界区并自动加锁 | Condition 支持上下文管理器,进入 with 块自动获取内部锁,退出时释放 |
while len(buffer) >= buffer_size: condition.wait() | 循环等待直到有空位 | 用 while 而不是 if:wait() 返回后必须重新检查条件(防止虚假唤醒) |
condition.wait() | 释放锁并挂起,直到被通知 | wait() 原子地释放锁让其他线程进入临界区,被唤醒后重新获取锁再继续 |
condition.notify_all() | 唤醒所有等待的线程 | 生产者/消费者操作后通知对方,notify_all 确保所有等待者都能重新竞争锁 |
Barrier 屏障
Barrier 让多个线程在某个点同步,等待所有线程到达后再继续。
python
from __future__ import annotations
import threading
import time
import random
def worker(barrier: threading.Barrier, name: str) -> None:
print(f"{name} 开始工作")
time.sleep(random.random())
print(f"{name} 到达屏障,等待其他线程")
barrier.wait() # 等待所有线程到达
print(f"{name} 继续执行")
barrier: threading.Barrier = threading.Barrier(3)
threads: list[threading.Thread] = []
for i in range(3):
t: threading.Thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i}"))
threads.append(t)
t.start()
for t in threads:
t.join()L2 实践层:用好
推荐做法表
| 场景 | 推荐原语 | 原因 |
|---|---|---|
| 保护单个共享变量 | Lock | 最简单,性能最好 |
| 递归调用需要重入 | RLock | 允许同一线程多次获取 |
| 限制并发数量 | Semaphore | 控制资源池大小 |
| 等待信号再继续 | Event | 一对一或一对多通知 |
| 生产者-消费者 | Condition | 等待+通知的组合 |
| 多线程等待同一时间点 | Barrier | 同步点 |
| 推荐做法 | 说明 | 为什么 |
|---|---|---|
始终用 with lock: | 自动释放锁 | 防止忘记 release() 导致永久阻塞 |
用 while 检查条件而非 if | Condition 等待后重新检查 | 防止虚假唤醒导致条件不满足时继续执行 |
| 缩小临界区 | 只锁共享数据,计算不放锁里 | 减少锁竞争,提高并发性能 |
| 按固定顺序获取多把锁 | 统一 A→B 顺序 | 破坏循环等待条件,防止死锁 |
使用 RLock 处理递归调用 | 避免自己锁死自己 | Lock 不可重入,递归获取会死锁 |
反模式对比
| ❌ 反模式 | ✅ 正确做法 | 说明 |
|---|---|---|
| 忘记释放锁 | 始终用 with lock: | 其他线程永久阻塞 |
| 死锁 | 锁的顺序一致,或用超时 | 两个线程互相等待 |
| 活锁 | 引入随机退避 | 线程不断重试但永远不成功 |
| 过度加锁 | 缩小临界区,减少锁粒度 | 串行化整个程序 |
适用场景表
| 场景 | 推荐原语 | 配置建议 |
|---|---|---|
| 共享计数器自增 | Lock | 单锁即可 |
| 递归数据结构遍历 | RLock | 允许同一线程多次进入 |
| 数据库连接池 (5 连接) | Semaphore(5) | 并发上限 = 连接池大小 |
| 启动信号同步 | Event | 主线程 set,工作线程 wait |
| 消息队列 (生产者/消费者) | Condition | while + wait/notify |
| 并行计算阶段同步 | Barrier | 等待所有 worker 完成当前阶段 |
锁粒度对比
python
# coarse_grained.py — 粗粒度锁(性能差)
import threading
class Counter:
def __init__(self) -> None:
self.value: int = 0
self.lock = threading.Lock()
def increment_and_double(self) -> None:
with self.lock: # 整个操作加锁
self.value += 1
self.value *= 2 # 计算也被串行化了
# fine_grained.py — 细粒度锁(性能好)
import threading
class Counter:
def __init__(self) -> None:
self.value: int = 0
self.lock = threading.Lock()
def increment_and_double(self) -> None:
with self.lock: # 只保护共享数据
self.value += 1
doubled = self.value * 2 # 纯计算不需要锁
with self.lock:
self.value = doubledL3 专家层:深入
Lock 的底层实现原理图
Python 的 threading.Lock 是对操作系统级互斥锁的封装:
┌─────────────────────────────────────────────────────────────┐
│ Lock 内部实现 │
├─────────────────────────────────────────────────────────────┤
│ │
│ threading.Lock 类型: │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Linux/macOS: _thread.LockType │ │
│ │ → 底层: pthread_mutex_t (POSIX 互斥锁) │ │
│ │ │ │
│ │ Windows: _thread.lock │ │
│ │ → 底层: CRITICAL_SECTION / Slim Reader/Writer │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ 获取流程: │
│ 1. 尝试原子操作(CAS - Compare And Swap) │
│ 2. 如果失败 → 进入 OS 等待队列 │
│ 3. OS 调度器唤醒等待线程 │
│ 4. 线程获取锁,继续执行 │
│ │
└─────────────────────────────────────────────────────────────┘虚假唤醒(Spurious Wakeup)
python
# spurious_wakeup_demo.py
"""
虚假唤醒:线程可能在没有被 notify() 的情况下被唤醒。
这是 OS 级别的已知行为,所以条件等待必须用 while 循环,不能用 if。
"""
import threading
import time
condition = threading.Condition()
buffer: list[str] = []
def consumer() -> None:
with condition:
# ✅ 正确:while 循环重新检查条件
while not buffer:
condition.wait()
# ❌ 错误:if 可能因虚假唤醒而继续执行
# if not buffer:
# condition.wait()
# # 此时 buffer 可能仍然为空!
item = buffer.pop(0)
print(f"消费: {item}")为什么会有虚假唤醒?
| 原因 | 说明 |
|---|---|
| OS 调度器优化 | 批量唤醒等待线程以减少系统调用 |
| 信号中断 | 进程收到信号后可能意外唤醒 |
| POSIX 规范 | 明确允许 pthread_cond_wait() 虚假唤醒 |
死锁分析与预防
┌─────────────────────────────────────────────────────────────┐
│ 死锁的四个必要条件 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 互斥:资源一次只能被一个线程持有 │
│ 2. 持有并等待:线程持有资源同时等待其他资源 │
│ 3. 不可抢占:资源不能被强制从持有线程夺走 │
│ 4. 循环等待:线程 A 等 B,B 等 C,C 等 A │
│ │
│ 预防策略:破坏其中任意一个条件 │
│ • 破坏 2:一次申请所有资源 │
│ • 破坏 3:使用 try_lock + 超时 │
│ • 破坏 4:按固定顺序获取锁(最常用) │
│ │
└─────────────────────────────────────────────────────────────┘python
# deadlock_prevention_demo.py
"""
死锁预防:按固定顺序获取锁。
两个线程需要同时获取 lock_a 和 lock_b 时,
如果线程 1 先取 A 后取 B,线程 2 先取 B 后取 A,
就会发生死锁。
"""
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
# ✅ 正确:两个线程都按 A→B 顺序获取
def safe_thread1() -> None:
with lock_a:
with lock_b:
print("线程 1 安全执行")
def safe_thread2() -> None:
with lock_a: # 与线程 1 相同顺序
with lock_b:
print("线程 2 安全执行")
# ❌ 错误:交叉顺序导致死锁
# def unsafe_thread1():
# with lock_a:
# with lock_b: ...
#
# def unsafe_thread2():
# with lock_b: # 先取 B
# with lock_a: ... # 然后等 A ← 死锁性能考量表
| 操作 | 时间 | 说明 |
|---|---|---|
Lock.acquire() 无竞争 | ~50ns | 原子操作 |
Lock.acquire() 有竞争 | ~1-5μs | 需要 OS 调度 |
RLock.acquire() | ~100ns | 比 Lock 稍慢(维护计数器) |
Semaphore.acquire() | ~100ns | 与 RLock 相近 |
Event.wait() 无等待 | ~50ns | 事件已设置 |
Event.wait() 有等待 | 取决于通知时间 | 挂起直到 set() |
设计动机
| Python 设计选择 | 原因 |
|---|---|
with lock: 自动释放 | 防止忘记 release() 导致永久阻塞 |
RLock 支持重入 | 递归函数和嵌套调用需要可重入锁 |
Condition.wait() 原子释放+挂起 | 防止通知丢失(先释放锁再挂起会丢失通知) |
知识关联图
同步原语关联:
┌───────────────────┐
│ Lock │ ← 最基础的同步原语
└────────┬──────────┘
│
┌────┴────┐
▼ ▼
┌────────┐ ┌───────────┐
│ RLock │ │ Semaphore │
│ 可重入 │ │ 计数锁 │
└────────┘ └─────┬─────┘
│
▼
┌───────────┐
│ Condition │ ← Lock + 等待/通知
│ + Event │
└─────┬─────┘
│
▼
┌───────────┐
│ Barrier │ ← 多路同步点
└───────────┘本章小结
┌─────────────────────────────────────────────────────────────┐
│ 线程同步 知识要点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 竞态条件: │
│ ✓ 多线程访问共享资源导致数据不一致 │
│ ✓ 需要同步机制保护临界区 │
│ │
│ Lock 锁: │
│ ✓ 保护临界区,确保原子性 │
│ ✓ with lock: 自动获取释放 │
│ │
│ RLock 可重入锁: │
│ ✓ 同一线程可多次获取 │
│ ✓ 适合递归调用 │
│ │
│ Semaphore 信号量: │
│ ✓ 控制并发数量 │
│ ✓ 限制资源访问 │
│ │
│ Event 事件: │
│ ✓ 线程间通信 │
│ ✓ set()、wait()、clear() │
│ │
│ Condition 条件变量: │
│ ✓ 复杂同步场景 │
│ ✓ 生产者-消费者模型 │
│ │
│ Barrier 屏障: │
│ ✓ 多线程同步点 │
│ ✓ 等待所有线程到达 │
│ │
└─────────────────────────────────────────────────────────────┘