04-生成器高级特性
Python 版本要求:Python 3.11+
贯穿项目:日志分析系统 本节目标:用 yield from 合并多个日志源,实现日志统一处理
概念铺垫
为什么需要生成器高级特性?
问题场景:合并多个日志源
继续日志分析系统,现在需要从多个来源收集日志:
- 本地日志文件(app.log)
- 远程服务器日志(API 获取)
- 数据库日志表
python
# ❌ 方案一:逐个处理(代码重复)
def process_all_logs():
for line in open("app.log"):
if "ERROR" in line:
print(line)
for line in fetch_remote_logs():
if "ERROR" in line:
print(line)
for record in query_db_logs():
if "ERROR" in record["message"]:
print(record)
# ✅ 方案二:yield from 统一合并
def all_log_sources():
yield from read_local_log("app.log")
yield from fetch_remote_logs()
yield from query_db_logs()
for log in all_log_sources():
if "ERROR" in log:
print(log) # 统一处理问题:如何优雅地合并多个数据源?yield from 是什么?
yield from:接力棒传递
生活类比
把 yield from 想象成接力赛:
┌─────────────────────────────────────────────────────────────┐
│ yield from = 接力棒传递 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 普通生成器:运动员独自跑 │
│ ───────────────────────── │
│ def solo_run(): │
│ yield "跑完第1圈" │
│ yield "跑完第2圈" │
│ yield "跑完第3圈" │
│ │
│ yield from:接力赛 │
│ ───────────────────────── │
│ def relay_race(): │
│ yield "准备接力" │
│ yield from runner_a() # 接力棒交给运动员A │
│ yield from runner_b() # 接力棒交给运动员B │
│ yield "接力结束" │
│ │
│ 效果: │
│ ───────────────────────── │
│ for step in relay_race(): │
│ print(step) │
│ # 准备接力 │
│ # A跑完第1圈 │
│ # A跑完第2圈 │
│ # B跑完第1圈 │
│ # B跑完第2圈 │
│ # 接力结束 │
│ │
│ 关键: │
│ ───────────────────── │
│ • 主生成器不亲自 yield,委托给子生成器 │
│ • 所有值自动传递给调用方 │
│ • 子生成器跑完,主生成器继续 │
│ │
└─────────────────────────────────────────────────────────────┘一句话:
yield from= 把接力棒交给子生成器,让它帮你产出所有值
yield from 语法
python
from collections.abc import Iterator
def sub_generator() -> Iterator[int]:
"""子生成器"""
yield 1
yield 2
yield 3
def main_generator() -> Iterator[int]:
"""主生成器"""
yield "开始"
yield from sub_generator() # 委托给子生成器
yield "结束"
for value in main_generator():
print(value)
# 开始
# 1
# 2
# 3
# 结束yield from 执行时间线
┌─────────────────────────────────────────────────────────────┐
│ yield from 执行时间线 │
│ │
│ 代码: │
│ yield from sub_generator() │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 时间线 │ │
│ │ │ │
│ │ 1. next(main_gen) ─→ 执行到 yield "开始" │ │
│ │ 返回 "开始" │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 2. next(main_gen) ─→ 执行到 yield from │ │
│ │ 创建子生成器 sub_gen │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 3. 自动调用 next(sub_gen) │ │
│ │ 返回 1(通过主生成器传递给调用方) │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 4. next(main_gen) ─→ 自动传递到 next(sub_gen) │ │
│ │ 返回 2 │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 5. next(main_gen) ─→ 自动传递到 next(sub_gen) │ │
│ │ 返回 3 │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 6. sub_gen 结束 ─→ StopIteration │ │
│ │ yield from 捕获,继续执行主生成器 │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 7. 执行到 yield "结束" │ │
│ │ 返回 "结束" │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 8. main_gen 结束 ─→ StopIteration │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ⚠️ 关键:调用方只和主生成器交互 │
│ ⚠️ 关键:yield from 自动处理子生成器的所有值 │
│ │
└─────────────────────────────────────────────────────────────┘L1 理解层:会用
yield from 实用技巧
展平嵌套结构
python
from collections.abc import Iterator
from typing import Any
def flatten(nested: list[Any]) -> Iterator[Any]:
"""递归展平嵌套列表"""
for item in nested:
if isinstance(item, list):
yield from flatten(item) # 递归委托
else:
yield item
nested = [1, [2, 3], [4, [5, 6]], 7]
print(list(flatten(nested)))
# [1, 2, 3, 4, 5, 6, 7]获取子生成器返回值
python
from collections.abc import Iterator
def sub_gen() -> Iterator[int]:
"""子生成器带返回值"""
yield 1
yield 2
return "子生成器完成" # 返回值
def main_gen() -> Iterator[str | int]:
"""主生成器接收返回值"""
result: str = yield from sub_gen()
yield f"收到返回值:{result}"
gen = main_gen()
print(next(gen)) # 1
print(next(gen)) # 2
print(next(gen)) # 收到返回值:子生成器完成贯穿实战:合并多源日志
场景说明
日志分析系统需要从三个来源收集日志:
┌─────────────────────────────────────────────────────────────┐
│ 日志来源架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ 本地日志 │ app.log │
│ │ 文件 │ │
│ └────┬────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────┐ │
│ │ 远程服务器 │ API: /api/logs │
│ │ 日志 │ │
│ └────┬────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────┐ │
│ │ 数据库日志 │ SELECT * FROM logs │
│ │ 表 │ │
│ └────┬────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────┐ │
│ │ unified_logs│ yield from 合并 │
│ │ () │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘实现代码
python
from collections.abc import Iterator
from datetime import datetime
from typing import Any
def read_local_log(filename: str) -> Iterator[dict[str, Any]]:
"""读取本地日志文件
贯穿项目:日志分析系统
功能:逐行解析本地日志文件
"""
with open(filename, "r", encoding="utf-8") as f:
for line in f:
if not line.strip():
continue
yield {
"source": "local",
"content": line.strip(),
"timestamp": datetime.now()
}
def fetch_remote_logs(api_url: str) -> Iterator[dict[str, Any]]:
"""模拟远程日志获取
贯穿项目:日志分析系统
功能:从 API 获取远程服务器日志
"""
# 模拟 API 返回数据
mock_logs = [
"[2024-01-19 10:00:00] ERROR remote_server - 连接失败",
"[2024-01-19 10:05:00] WARN remote_server - 超时警告",
"[2024-01-19 10:10:00] INFO remote_server - 重连成功",
]
for line in mock_logs:
yield {
"source": "remote",
"content": line,
"timestamp": datetime.now()
}
def query_db_logs(query: str) -> Iterator[dict[str, Any]]:
"""模拟数据库日志查询
贯穿项目:日志分析系统
功能:从数据库查询日志记录
"""
# 模拟数据库返回
mock_records = [
{"id": 1, "message": "用户登录失败", "level": "ERROR"},
{"id": 2, "message": "订单创建成功", "level": "INFO"},
]
for record in mock_records:
yield {
"source": "database",
"content": f"[{record['level']}] {record['message']}",
"timestamp": datetime.now(),
"db_id": record["id"]
}
def unified_logs(
local_file: str = "app.log",
remote_api: str = "/api/logs",
db_query: str = "SELECT * FROM logs"
) -> Iterator[dict[str, Any]]:
"""合并所有日志源
贯穿项目:日志分析系统
核心功能:用 yield from 统一多个日志来源
"""
yield from read_local_log(local_file)
yield from fetch_remote_logs(remote_api)
yield from query_db_logs(db_query)
# 使用:统一处理所有日志源
for log_entry in unified_logs():
if "ERROR" in log_entry["content"]:
print(f"[{log_entry['source']}] {log_entry['content']}")关键代码说明:
| 代码 | 含义 | 为什么这样写 |
|---|---|---|
yield from read_local_log() | 委托给本地日志生成器 | 自动传递所有值,无需手动循环 |
yield {"source": "local", ...} | 标记日志来源 | 便于区分不同来源的日志 |
def unified_logs() | 统一入口 | 所有日志源通过一个函数访问 |
| 三个 yield from 连续使用 | 顺序合并 | 先本地,后远程,最后数据库 |
统计错误数量
python
from collections import Counter
def count_errors_by_source(logs: Iterator[dict[str, Any]]) -> dict[str, int]:
"""按来源统计错误数量"""
error_counts: Counter[str] = Counter()
for log in logs:
if "ERROR" in log["content"]:
error_counts[log["source"]] += 1
return dict(error_counts)
# 使用
all_logs = unified_logs()
result = count_errors_by_source(all_logs)
print(result)
# {'local': 5, 'remote': 1, 'database': 1}生成器的其他高级方法
send():双向通信
send() 可以向生成器发送值,实现双向数据流:
python
from collections.abc import Generator
def accumulator() -> Generator[int, int, None]:
"""累加器:接收外部值并累加"""
total: int = 0
while True:
value: int = yield total # yield 返回累计值,接收外部值
total += value
acc = accumulator()
next(acc) # 启动生成器,返回 0
print(acc.send(10)) # 发送 10,返回 10
print(acc.send(20)) # 发送 20,返回 30
print(acc.send(5)) # 发送 5,返回 35执行流程:
┌─────────────────────────────────────────────────────────────┐
│ send() 执行流程 │
│ │
│ value = yield total │
│ │ │ │
│ │ └── 返回给调用方的值 │
│ └── send() 传入的值 │
│ │
│ 步骤: │
│ 1. next(acc) ─→ 执行到 yield total │
│ 返回 0,暂停在 yield 处 │
│ │
│ 2. acc.send(10) ─→ value = 10 │
│ total = 0 + 10 = 10 │
│ 继续到 yield total │
│ 返回 10,暂停 │
│ │
│ ⚠️ 注意:首次必须用 next() 或 send(None) 启动 │
│ │
└─────────────────────────────────────────────────────────────┘throw():注入异常
向生成器内部抛出异常,可用于错误恢复:
python
from collections.abc import Generator
def resilient_gen() -> Generator[int, None, None]:
"""可恢复的生成器"""
try:
yield 1
yield 2
yield 3
except ValueError as e:
print(f"恢复:{e}")
yield -1 # 错误恢复后继续
gen = resilient_gen()
print(next(gen)) # 1
gen.throw(ValueError("测试"))
# 恢复:测试
print(next(gen)) # -1close():关闭生成器
提前关闭生成器,触发 finally 块:
python
from collections.abc import Generator
def resource_gen() -> Generator[int, None, None]:
"""带资源清理的生成器"""
try:
for i in range(10):
yield i
finally:
print("清理资源")
gen = resource_gen()
print(next(gen)) # 0
print(next(gen)) # 1
gen.close() # 清理资源
# next(gen) 会抛出 StopIterationL2 实践层:用好
推荐做法与反模式
推荐做法
| 做法 | 原因 | 示例 |
|---|---|---|
| 用 yield from 合并数据源 | 避免重复代码 | yield from read_file() |
| yield from 接收返回值 | 获取子生成器状态 | result = yield from sub() |
| send() 前先启动 | 防止 TypeError | next(gen) 或 gen.send(None) |
| finally 包裹资源 | 确保 close() 清理 | try: yield x; finally: cleanup() |
反模式:不要这样做
python
# ❌ 错误:send() 未先启动
def accumulator():
total = 0
while True:
value = yield total
total += value
acc = accumulator()
acc.send(10) # TypeError: can't send non-None to just-started generator
# ✅ 正确:先启动
acc = accumulator()
next(acc) # 启动生成器
acc.send(10) # 正常工作python
# ❌ 错误:GeneratorExit 后继续 yield
def bad_gen():
try:
yield 1
except GeneratorExit:
yield 2 # RuntimeError: generator ignored GeneratorExit
# ✅ 正确:GeneratorExit 后直接退出
def good_gen():
try:
yield 1
except GeneratorExit:
print("清理") # 只做清理,不 yield适用场景
| 场景 | 推荐 | 方法 |
|---|---|---|
| 合并多个数据源 | ✅ | yield from |
| 展平嵌套结构 | ✅ | yield from 递归 |
| 生产者-消费者 | ✅ | send() 双向通信 |
| 错误恢复 | ✅ | throw() + try/except |
| 资源清理 | ✅ | close() + finally |
| 简单遍历 | ❌ | 普通生成器足够 |
L3 专家层:深入
yield from 的底层委托机制
yield from 的 CPython 实现流程:
主生成器 main_gen:
┌──────────────────────────────────────┐
│ for item in main_gen: │
│ ... │
│ yield from sub_gen() │
│ ... │
└──────────────────────────────────────┘
│
▼ yield from 执行:
┌──────────────────────────────────────┐
│ ① 创建子生成器 sub = sub_gen() │
│ ② 首次 next() 由主生成器转发 │
│ ③ sub_gen 每次 yield → 值直达调用方│
│ ④ sub_gen return X → 值赋给 result │
│ result = yield from sub_gen() │
│ ⑤ sub_gen 的 StopIteration 被捕获 │
│ ⑥ 主生成器继续执行 │
└──────────────────────────────────────┘
关键字节码:
YIELD_FROM → 委托给子生成器
GET_YIELD_FROM_ITER → 获取子生成器的迭代器
verify:
import dis
def demo():
yield from range(3)
dis.dis(demo)
# 包含 LOAD_GLOBAL range, CALL, GET_YIELD_FROM_ITER, YIELD_FROMsend/throw/close 三方协议
生成器的三方通信协议:
调用方 生成器内部
────── ──────────
next(gen) → 执行到 yield,暂停,返回值
gen.send(value) → 恢复执行,yield 表达式接收 value
gen.throw(exc) → 向 yield 处注入异常
gen.close() → 向 yield 处注入 GeneratorExit
生成器状态机:
┌──────────┐ next()/send() ┌──────────────┐
│GEN_CREATED│ ───────────────→ │GEN_SUSPENDED │
└──────────┘ └──────────────┘
│ ↑
next()/send()/throw│ │
│ │
↓ │
┌──────────────┐
│ 执行中... │
│ (GEN_RUNNING)│
└──────────────┘
│
yield/return/异常
│
↓
┌──────────────┐
│GEN_SUSPENDED │ ← 回到暂停
└──────────────┘
│
close()/耗尽
│
↓
┌──────────────┐
│ GEN_CLOSED │
└──────────────┘协程演进路线
yield → yield from → async/await 的演化:
Python 2.5 (PEP 342):
┌──────────────────────────────────────┐
│ yield 表达式 + send() │
│ 生成器从"产出"变成"协程"雏形 │
│ def coro(): │
│ data = yield # 接收外部数据 │
└──────────────────────────────────────┘
│
▼
Python 3.3 (PEP 380):
┌──────────────────────────────────────┐
│ yield from 委托语法 │
│ 支持子生成器透明代理 │
│ 支持返回值传递 return value │
└──────────────────────────────────────┘
│
▼
Python 3.5 (PEP 492):
┌──────────────────────────────────────┐
│ async def / await 正式协程 │
│ async def → 原生协程 (Coroutine) │
│ await → 等待异步结果 │
│ │
│ 关键区别: │
│ • 生成器:yield 是基于迭代器的协程 │
│ • 原生协程:async/await 是事件循环驱动│
│ • 不能混用(async def 不能用 yield) │
└──────────────────────────────────────┘send() 启动问题的底层原因
python
# 为什么 send(None) 等价于 next()?
# CPython gen_send_ex() 中:
# 如果生成器状态是 GEN_CREATED(刚创建):
# - send(None) → 执行到第一个 yield,等同于 next()
# - send(非None) → TypeError 报错
# 原因:生成器创建时,指令指针在函数开头
# 没有 yield 表达式可以接收 send 的值
# 必须先执行到第一个 yield,才能接收值
def echo():
received = yield "ready" # 这次 yield 后才准备好接收
yield f"got: {received}"
g = echo()
print(g.send(None)) # OK: "ready"(等价于 next(g))
print(g.send("hi")) # OK: "got: hi"性能考量
| 操作 | 时间复杂度 | 空间复杂度 | 说明 |
|---|---|---|---|
| yield from 委托 | O(1) 建立 | O(200字节) | 创建一个子生成器对象 |
| send() 双向通信 | O(1) | O(1) | 帧上下文切换,常数时间 |
| throw() 异常注入 | O(1) | O(1) | 异常对象传递 |
| close() 关闭 | O(1) | O(1) | GeneratorExit 注入 |
知识关联
生成器高级特性知识关联:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ yield │────→│ send() │────→│ throw() │
│ 单向产出 │ │ 双向通信 │ │ 异常注入 │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ yield from │────→│ 子生成器委托│────→│ close() │
│ 代理语法 │ │ 返回值传递 │ │ 资源清理 │
└──────────────┘ └──────────────┘ └──────────────┘
│
↓
┌──────────────┐ ┌──────────────┐
│ async/await │ │ 事件循环 │
│ 原生协程 │────→│ 异步驱动 │
└──────────────┘ └──────────────┘自检清单
回答以下问题,检查你是否掌握了核心概念:
yield from和直接yield有什么区别?- 如何获取子生成器的返回值?
send()为什么必须先启动生成器?close()和throw()有什么区别?- 为什么用
yield from合并日志源更好?
答案:
yield from委托子生成器产出所有值,yield只产出一个值result = yield from sub_gen(),子生成器的 return 值赋给 result- 生成器启动时停在第一个 yield 前,还没有 yield 表达式接收值
close()发送 GeneratorExit 用于关闭,throw()发送普通异常用于恢复- 避免重复处理逻辑,统一入口,代码更简洁
本章能力清单
学完本章,你能够:
- [x] 用
yield from合并多个日志数据源 - [x] 用
yield from展平嵌套结构 - [x] 获取子生成器的返回值
- [x] 使用
send()实现双向通信 - [x] 使用
close()确保资源清理
前置知识检查:
- 你是否掌握了生成器基础(yield 机制)? ← 第3节
下一步学习:
- itertools 模块(高效批量处理) → 第5节
- 异步生成器(async yield) → 第6节
导航: