Skip to content

03-线程同步

Python 3.11+

概念铺垫

线程同步的必要性

实际场景

10 个线程同时访问一个共享计数器,每个线程对计数器加 100,000 次。预期结果是 1,000,000,但实际结果往往小于预期。这是为什么?

问题:多线程访问共享资源时会出现什么问题?

竞态条件

python
# race_condition_demo.py
from __future__ import annotations

import threading

counter: int = 0

def increment() -> None:
    global counter
    for _ in range(100000):
        counter += 1  # 不是原子操作!

threads: list[threading.Thread] = []
for _ in range(10):
    t: threading.Thread = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"预期: 1000000, 实际: {counter}")
# 实际结果可能小于 1000000,因为竞态条件

竞态条件示意

┌─────────────────────────────────────────────────────────────┐
│              竞态条件示意                                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   counter += 1 的执行过程:                                 │
│   1. 读取 counter 的值                                      │
│   2. 加 1                                                   │
│   3. 写回 counter                                           │
│                                                             │
│   两个线程同时执行:                                         │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ 时间 │ 线程 A              │ 线程 B              │   │   │
│   ├──────┼─────────────────────┼─────────────────────┤   │   │
│   │ T1   │ 读取 counter = 0    │                     │   │   │
│   │ T2   │                     │ 读取 counter = 0    │   │   │
│   │ T3   │ 计算 0 + 1 = 1      │                     │   │   │
│   │ T4   │                     │ 计算 0 + 1 = 1      │   │   │
│   │ T5   │ 写回 counter = 1    │                     │   │   │
│   │ T6   │                     │ 写回 counter = 1    │   │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
│   结果:两次加 1,但 counter 只增加了 1                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

分层学习

L1 理解层:会用

Lock 互斥锁

Lock: 互斥锁,保证同一时间只有一个线程访问共享资源。

Lock语法:
┌─────────────────────────────────────────────────────────────┐
│  Lock 锁的使用                                                 │
│                                                              │
│  创建锁:                                                     │
│  lock = threading.Lock()                                      │
│                                                              │
│  使用锁(推荐 with 语句):                                    │
│  with lock:                                                   │
│      # 临界区代码                                             │
│      counter += 1                                             │
│                                                              │
│  ⚠️ 执行流程:                                                 │
│  1. with lock → 获取锁                                        │
│  2. 执行临界区代码                                            │
│  3. 退出 with → 自动释放锁                                    │
│                                                              │
│  ⚠️ 锁的作用:                                                 │
│  • 保证同一时间只有一个线程在临界区                            │
│  • 防止竞态条件                                               │
│  • 确保数据一致性                                             │
└─────────────────────────────────────────────────────────────┘
python
import threading

counter: int = 0
lock: threading.Lock = threading.Lock()

def increment() -> None:
    global counter
    for _ in range(100000):
        with lock:  # 获取锁,自动释放
            counter += 1

threads = []
for _ in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"结果: {counter}")  # 正确:1000000

Lock 手动获取和释放

Lock手动操作:
┌─────────────────────────────────────────────────────────────┐
│  Lock 的手动获取和释放                                         │
│                                                              │
│  方式1:with语句(推荐)                                       │
│  ─────────────────────────────                               │
│  with lock:                                                   │
│      # 临界区代码                                             │
│  # 自动释放锁                                                 │
│                                                              │
│  方式2:手动获取释放                                           │
│  ─────────────────────────────                               │
│  lock.acquire()    # 获取锁                                   │
│  try:                                                         │
│      # 临界区代码                                             │
│  finally:                                                     │
│      lock.release()  # 必须释放                               │
│                                                              │
│  方式3:非阻塞获取                                             │
│  ─────────────────────────────                               │
│  if lock.acquire(blocking=False):                            │
│      try:                                                     │
│          # 临界区代码                                         │
│      finally:                                                 │
│          lock.release()                                       │
│  else:                                                        │
│      print("获取锁失败")                                       │
│                                                              │
│  ⚠️ 推荐:使用 with 语句,自动处理异常和释放                   │
└─────────────────────────────────────────────────────────────┘
python
import threading

lock = threading.Lock()

# 方式 1:with 语句(推荐)
with lock:
    pass  # 临界区代码

# 方式 2:手动获取释放
lock.acquire()
try:
    pass  # 临界区代码
finally:
    lock.release()

# 方式 3:非阻塞获取
if lock.acquire(blocking=False):
    try:
        pass
    finally:
        lock.release()
else:
    print("获取锁失败")
方法含义说明
acquire()获取锁阻塞直到获取成功
acquire(False)非阻塞获取立即返回 True/False
release()释放锁必须由持有者调用

RLock 可重入锁

RLock 允许同一个线程多次获取锁,但必须释放相同次数。

python
from __future__ import annotations

import threading

rlock: threading.RLock = threading.RLock()

def outer() -> None:
    with rlock:
        print("outer 获取锁")
        inner()

def inner() -> None:
    with rlock:  # 同一线程可以再次获取
        print("inner 获取锁")

outer()  # 正常工作

lock: threading.Lock = threading.Lock()
def outer_deadlock() -> None:
    with lock:
        inner_deadlock()  # 死锁!

def inner_deadlock() -> None:
    with lock:  # 等待自己释放锁 → 死锁
        pass
┌─────────────────────────────────────────────────────────────┐
│              Lock vs RLock                                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Lock:                                                     │
│   • 同一线程不能重复获取                                    │
│   • 重复获取会死锁                                          │
│   • 性能略高                                                │
│                                                             │
│   RLock:                                                    │
│   • 同一线程可重复获取                                      │
│   • 需要释放相同次数                                        │
│   • 适合递归调用                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Semaphore 信号量

控制同时访问资源的线程数量。

python
from __future__ import annotations

import threading
import time

semaphore: threading.Semaphore = threading.Semaphore(3)

def limited_resource(name: str) -> None:
    with semaphore:
        print(f"{name} 获取资源")
        time.sleep(1)
        print(f"{name} 释放资源")

threads: list[threading.Thread] = []
for i in range(6):
    t: threading.Thread = threading.Thread(target=limited_resource, args=(f"Thread-{i}",))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# 输出:每次最多 3 个线程同时执行

应用场景:

python
from __future__ import annotations

import threading
import time

# 限制数据库连接数
db_semaphore: threading.Semaphore = threading.Semaphore(5)

def query_database(query_id: int) -> None:
    with db_semaphore:
        print(f"查询 {query_id} 开始")
        time.sleep(0.5)
        print(f"查询 {query_id} 完成")

# 限制并发请求数
request_semaphore: threading.Semaphore = threading.Semaphore(10)

def make_request(url: str) -> None:
    with request_semaphore:
        pass  # 发送请求

Event 事件

Event 用于线程间通信,一个线程发出信号,其他线程等待。

python
from __future__ import annotations

import threading
import time

event: threading.Event = threading.Event()

def waiter(name: str) -> None:
    print(f"{name} 等待事件...")
    event.wait()  # 阻塞,直到事件被设置
    print(f"{name} 收到事件,继续执行")

def setter() -> None:
    time.sleep(2)
    print("设置事件")
    event.set()  # 设置事件,唤醒所有等待的线程

threads: list[threading.Thread] = [
    threading.Thread(target=waiter, args=("A",)),
    threading.Thread(target=waiter, args=("B",)),
    threading.Thread(target=setter)
]

for t in threads:
    t.start()

for t in threads:
    t.join()

Event 方法:

python
from __future__ import annotations

import threading

event: threading.Event = threading.Event()

event.set()            # 设置事件
event.clear()          # 清除事件
event.wait()           # 等待事件(阻塞)
event.wait(timeout=5)  # 等待事件(带超时)
print(event.is_set())  # 检查事件是否被设置

Condition 条件变量

Condition 用于复杂的线程同步,结合了 Lock 和 Event 的功能。

python
from __future__ import annotations

import threading
import time
import random

buffer: list[str] = []
buffer_size: int = 5
condition: threading.Condition = threading.Condition()

def producer() -> None:
    for i in range(10):
        with condition:
            while len(buffer) >= buffer_size:
                print("缓冲区满,生产者等待")
                condition.wait()  # 等待消费者消费
            
            item: str = f"商品-{i}"
            buffer.append(item)
            print(f"生产: {item}")
            
            condition.notify_all()  # 通知消费者
        time.sleep(random.random())

def consumer() -> None:
    for _ in range(10):
        with condition:
            while not buffer:
                print("缓冲区空,消费者等待")
                condition.wait()  # 等待生产者生产
            
            item: str = buffer.pop(0)
            print(f"消费: {item}")
            
            condition.notify_all()  # 通知生产者
        time.sleep(random.random())

t1: threading.Thread = threading.Thread(target=producer)
t2: threading.Thread = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

关键代码说明:

代码含义为什么这样写
condition: threading.Condition = threading.Condition()条件变量(内含隐式锁)Condition = Lock + 等待/通知机制,比单独用 Lock + Event 更适合生产者-消费者模型
with condition:进入临界区并自动加锁Condition 支持上下文管理器,进入 with 块自动获取内部锁,退出时释放
while len(buffer) >= buffer_size: condition.wait()循环等待直到有空位while 而不是 ifwait() 返回后必须重新检查条件(防止虚假唤醒)
condition.wait()释放锁并挂起,直到被通知wait() 原子地释放锁让其他线程进入临界区,被唤醒后重新获取锁再继续
condition.notify_all()唤醒所有等待的线程生产者/消费者操作后通知对方,notify_all 确保所有等待者都能重新竞争锁

Barrier 屏障

Barrier 让多个线程在某个点同步,等待所有线程到达后再继续。

python
from __future__ import annotations

import threading
import time
import random

def worker(barrier: threading.Barrier, name: str) -> None:
    print(f"{name} 开始工作")
    time.sleep(random.random())
    print(f"{name} 到达屏障,等待其他线程")
    
    barrier.wait()  # 等待所有线程到达
    
    print(f"{name} 继续执行")

barrier: threading.Barrier = threading.Barrier(3)

threads: list[threading.Thread] = []
for i in range(3):
    t: threading.Thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i}"))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

L2 实践层:用好

推荐做法表

场景推荐原语原因
保护单个共享变量Lock最简单,性能最好
递归调用需要重入RLock允许同一线程多次获取
限制并发数量Semaphore控制资源池大小
等待信号再继续Event一对一或一对多通知
生产者-消费者Condition等待+通知的组合
多线程等待同一时间点Barrier同步点
推荐做法说明为什么
始终用 with lock:自动释放锁防止忘记 release() 导致永久阻塞
while 检查条件而非 ifCondition 等待后重新检查防止虚假唤醒导致条件不满足时继续执行
缩小临界区只锁共享数据,计算不放锁里减少锁竞争,提高并发性能
按固定顺序获取多把锁统一 A→B 顺序破坏循环等待条件,防止死锁
使用 RLock 处理递归调用避免自己锁死自己Lock 不可重入,递归获取会死锁

反模式对比

❌ 反模式✅ 正确做法说明
忘记释放锁始终用 with lock:其他线程永久阻塞
死锁锁的顺序一致,或用超时两个线程互相等待
活锁引入随机退避线程不断重试但永远不成功
过度加锁缩小临界区,减少锁粒度串行化整个程序

适用场景表

场景推荐原语配置建议
共享计数器自增Lock单锁即可
递归数据结构遍历RLock允许同一线程多次进入
数据库连接池 (5 连接)Semaphore(5)并发上限 = 连接池大小
启动信号同步Event主线程 set,工作线程 wait
消息队列 (生产者/消费者)Conditionwhile + wait/notify
并行计算阶段同步Barrier等待所有 worker 完成当前阶段

锁粒度对比

python
# coarse_grained.py — 粗粒度锁(性能差)
import threading

class Counter:
    def __init__(self) -> None:
        self.value: int = 0
        self.lock = threading.Lock()

    def increment_and_double(self) -> None:
        with self.lock:  # 整个操作加锁
            self.value += 1
            self.value *= 2  # 计算也被串行化了

# fine_grained.py — 细粒度锁(性能好)
import threading

class Counter:
    def __init__(self) -> None:
        self.value: int = 0
        self.lock = threading.Lock()

    def increment_and_double(self) -> None:
        with self.lock:  # 只保护共享数据
            self.value += 1
        doubled = self.value * 2  # 纯计算不需要锁
        with self.lock:
            self.value = doubled

L3 专家层:深入

Lock 的底层实现原理图

Python 的 threading.Lock 是对操作系统级互斥锁的封装:

┌─────────────────────────────────────────────────────────────┐
│              Lock 内部实现                                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   threading.Lock 类型:                                      │
│   ┌──────────────────────────────────────────────────┐      │
│   │  Linux/macOS: _thread.LockType                    │      │
│   │    → 底层: pthread_mutex_t (POSIX 互斥锁)         │      │
│   │                                                   │      │
│   │  Windows: _thread.lock                            │      │
│   │    → 底层: CRITICAL_SECTION / Slim Reader/Writer  │      │
│   └──────────────────────────────────────────────────┘      │
│                                                             │
│   获取流程:                                                  │
│   1. 尝试原子操作(CAS - Compare And Swap)                  │
│   2. 如果失败 → 进入 OS 等待队列                             │
│   3. OS 调度器唤醒等待线程                                   │
│   4. 线程获取锁,继续执行                                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

虚假唤醒(Spurious Wakeup)

python
# spurious_wakeup_demo.py
"""
虚假唤醒:线程可能在没有被 notify() 的情况下被唤醒。

这是 OS 级别的已知行为,所以条件等待必须用 while 循环,不能用 if。
"""
import threading
import time

condition = threading.Condition()
buffer: list[str] = []

def consumer() -> None:
    with condition:
        # ✅ 正确:while 循环重新检查条件
        while not buffer:
            condition.wait()
        # ❌ 错误:if 可能因虚假唤醒而继续执行
        # if not buffer:
        #     condition.wait()
        # # 此时 buffer 可能仍然为空!
        item = buffer.pop(0)
        print(f"消费: {item}")

为什么会有虚假唤醒?

原因说明
OS 调度器优化批量唤醒等待线程以减少系统调用
信号中断进程收到信号后可能意外唤醒
POSIX 规范明确允许 pthread_cond_wait() 虚假唤醒

死锁分析与预防

┌─────────────────────────────────────────────────────────────┐
│              死锁的四个必要条件                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   1. 互斥:资源一次只能被一个线程持有                        │
│   2. 持有并等待:线程持有资源同时等待其他资源                 │
│   3. 不可抢占:资源不能被强制从持有线程夺走                  │
│   4. 循环等待:线程 A 等 B,B 等 C,C 等 A                  │
│                                                             │
│   预防策略:破坏其中任意一个条件                              │
│   • 破坏 2:一次申请所有资源                                 │
│   • 破坏 3:使用 try_lock + 超时                             │
│   • 破坏 4:按固定顺序获取锁(最常用)                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
python
# deadlock_prevention_demo.py
"""
死锁预防:按固定顺序获取锁。

两个线程需要同时获取 lock_a 和 lock_b 时,
如果线程 1 先取 A 后取 B,线程 2 先取 B 后取 A,
就会发生死锁。
"""
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# ✅ 正确:两个线程都按 A→B 顺序获取
def safe_thread1() -> None:
    with lock_a:
        with lock_b:
            print("线程 1 安全执行")

def safe_thread2() -> None:
    with lock_a:  # 与线程 1 相同顺序
        with lock_b:
            print("线程 2 安全执行")

# ❌ 错误:交叉顺序导致死锁
# def unsafe_thread1():
#     with lock_a:
#         with lock_b: ...
#
# def unsafe_thread2():
#     with lock_b:  # 先取 B
#         with lock_a: ...  # 然后等 A ← 死锁

性能考量表

操作时间说明
Lock.acquire() 无竞争~50ns原子操作
Lock.acquire() 有竞争~1-5μs需要 OS 调度
RLock.acquire()~100ns比 Lock 稍慢(维护计数器)
Semaphore.acquire()~100ns与 RLock 相近
Event.wait() 无等待~50ns事件已设置
Event.wait() 有等待取决于通知时间挂起直到 set()

设计动机

Python 设计选择原因
with lock: 自动释放防止忘记 release() 导致永久阻塞
RLock 支持重入递归函数和嵌套调用需要可重入锁
Condition.wait() 原子释放+挂起防止通知丢失(先释放锁再挂起会丢失通知)

知识关联图

同步原语关联:
┌───────────────────┐
│    Lock           │ ← 最基础的同步原语
└────────┬──────────┘

    ┌────┴────┐
    ▼         ▼
┌────────┐ ┌───────────┐
│ RLock  │ │ Semaphore │
│ 可重入  │ │ 计数锁   │
└────────┘ └─────┬─────┘


          ┌───────────┐
          │ Condition │ ← Lock + 等待/通知
          │ + Event   │
          └─────┬─────┘


          ┌───────────┐
          │ Barrier   │ ← 多路同步点
          └───────────┘

本章小结

┌─────────────────────────────────────────────────────────────┐
│                      线程同步 知识要点                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   竞态条件:                                                 │
│   ✓ 多线程访问共享资源导致数据不一致                        │
│   ✓ 需要同步机制保护临界区                                  │
│                                                             │
│   Lock 锁:                                                  │
│   ✓ 保护临界区,确保原子性                                  │
│   ✓ with lock: 自动获取释放                                 │
│                                                             │
│   RLock 可重入锁:                                           │
│   ✓ 同一线程可多次获取                                      │
│   ✓ 适合递归调用                                            │
│                                                             │
│   Semaphore 信号量:                                         │
│   ✓ 控制并发数量                                            │
│   ✓ 限制资源访问                                            │
│                                                             │
│   Event 事件:                                               │
│   ✓ 线程间通信                                              │
│   ✓ set()、wait()、clear()                                  │
│                                                             │
│   Condition 条件变量:                                       │
│   ✓ 复杂同步场景                                            │
│   ✓ 生产者-消费者模型                                       │
│                                                             │
│   Barrier 屏障:                                             │
│   ✓ 多线程同步点                                            │
│   ✓ 等待所有线程到达                                        │
│                                                             │
└─────────────────────────────────────────────────────────────┘