Skip to content

04-生成器高级特性

Python 版本要求:Python 3.11+

贯穿项目:日志分析系统 本节目标:用 yield from 合并多个日志源,实现日志统一处理


概念铺垫

为什么需要生成器高级特性?

问题场景:合并多个日志源

继续日志分析系统,现在需要从多个来源收集日志:

  • 本地日志文件(app.log)
  • 远程服务器日志(API 获取)
  • 数据库日志表
python
# ❌ 方案一:逐个处理(代码重复)
def process_all_logs():
    for line in open("app.log"):
        if "ERROR" in line:
            print(line)
    for line in fetch_remote_logs():
        if "ERROR" in line:
            print(line)
    for record in query_db_logs():
        if "ERROR" in record["message"]:
            print(record)

# ✅ 方案二:yield from 统一合并
def all_log_sources():
    yield from read_local_log("app.log")
    yield from fetch_remote_logs()
    yield from query_db_logs()

for log in all_log_sources():
    if "ERROR" in log:
        print(log)  # 统一处理

问题:如何优雅地合并多个数据源?yield from 是什么?


yield from:接力棒传递

生活类比

yield from 想象成接力赛:

┌─────────────────────────────────────────────────────────────┐
│  yield from = 接力棒传递                                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  普通生成器:运动员独自跑                                     │
│  ─────────────────────────                                  │
│  def solo_run():                                            │
│      yield "跑完第1圈"                                       │
│      yield "跑完第2圈"                                       │
│      yield "跑完第3圈"                                       │
│                                                             │
│  yield from:接力赛                                          │
│  ─────────────────────────                                  │
│  def relay_race():                                          │
│      yield "准备接力"                                        │
│      yield from runner_a()  # 接力棒交给运动员A             │
│      yield from runner_b()  # 接力棒交给运动员B             │
│      yield "接力结束"                                        │
│                                                             │
│  效果:                                                      │
│  ─────────────────────────                                  │
│  for step in relay_race():                                  │
│      print(step)                                            │
│  # 准备接力                                                  │
│  # A跑完第1圈                                                │
│  # A跑完第2圈                                                │
│  # B跑完第1圈                                                │
│  # B跑完第2圈                                                │
│  # 接力结束                                                  │
│                                                             │
│  关键:                                                      │
│  ─────────────────────                                      │
│  • 主生成器不亲自 yield,委托给子生成器                       │
│  • 所有值自动传递给调用方                                    │
│  • 子生成器跑完,主生成器继续                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

一句话:

yield from = 把接力棒交给子生成器,让它帮你产出所有值

yield from 语法

python
from collections.abc import Iterator

def sub_generator() -> Iterator[int]:
    """子生成器"""
    yield 1
    yield 2
    yield 3

def main_generator() -> Iterator[int]:
    """主生成器"""
    yield "开始"
    yield from sub_generator()  # 委托给子生成器
    yield "结束"

for value in main_generator():
    print(value)
# 开始
# 1
# 2
# 3
# 结束

yield from 执行时间线

┌─────────────────────────────────────────────────────────────┐
│  yield from 执行时间线                                        │
│                                                             │
│  代码:                                      │
│      yield from sub_generator()                             │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 时间线                                               │   │
│  │                                                      │   │
│  │  1. next(main_gen) ─→ 执行到 yield "开始"           │   │
│  │     返回 "开始"                                      │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  2. next(main_gen) ─→ 执行到 yield from             │   │
│  │     创建子生成器 sub_gen                             │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  3. 自动调用 next(sub_gen)                          │   │
│  │     返回 1(通过主生成器传递给调用方)               │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  4. next(main_gen) ─→ 自动传递到 next(sub_gen)      │   │
│  │     返回 2                                            │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  5. next(main_gen) ─→ 自动传递到 next(sub_gen)      │   │
│  │     返回 3                                            │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  6. sub_gen 结束 ─→ StopIteration                   │   │
│  │     yield from 捕获,继续执行主生成器               │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  7. 执行到 yield "结束"                              │   │
│  │     返回 "结束"                                      │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  8. main_gen 结束 ─→ StopIteration                  │   │
│  │                                                      │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  ⚠️ 关键:调用方只和主生成器交互                              │
│  ⚠️ 关键:yield from 自动处理子生成器的所有值                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

L1 理解层:会用

yield from 实用技巧

展平嵌套结构

python
from collections.abc import Iterator
from typing import Any

def flatten(nested: list[Any]) -> Iterator[Any]:
    """递归展平嵌套列表"""
    for item in nested:
        if isinstance(item, list):
            yield from flatten(item)  # 递归委托
        else:
            yield item

nested = [1, [2, 3], [4, [5, 6]], 7]
print(list(flatten(nested)))
# [1, 2, 3, 4, 5, 6, 7]

获取子生成器返回值

python
from collections.abc import Iterator

def sub_gen() -> Iterator[int]:
    """子生成器带返回值"""
    yield 1
    yield 2
    return "子生成器完成"  # 返回值

def main_gen() -> Iterator[str | int]:
    """主生成器接收返回值"""
    result: str = yield from sub_gen()
    yield f"收到返回值:{result}"

gen = main_gen()
print(next(gen))  # 1
print(next(gen))  # 2
print(next(gen))  # 收到返回值:子生成器完成

贯穿实战:合并多源日志

场景说明

日志分析系统需要从三个来源收集日志:

┌─────────────────────────────────────────────────────────────┐
│  日志来源架构                                                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐                                            │
│  │ 本地日志    │ app.log                                    │
│  │ 文件        │                                            │
│  └────┬────────┘                                            │
│       │                                                     │
│       ↓                                                     │
│  ┌─────────────┐                                            │
│  │ 远程服务器  │ API: /api/logs                             │
│  │ 日志        │                                            │
│  └────┬────────┘                                            │
│       │                                                     │
│       ↓                                                     │
│  ┌─────────────┐                                            │
│  │ 数据库日志  │ SELECT * FROM logs                         │
│  │ 表          │                                            │
│  └────┬────────┘                                            │
│       │                                                     │
│       ↓                                                     │
│  ┌─────────────┐                                            │
│  │ unified_logs│ yield from 合并                           │
│  │ ()          │                                            │
│  └─────────────┘                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

实现代码

python
from collections.abc import Iterator
from datetime import datetime
from typing import Any

def read_local_log(filename: str) -> Iterator[dict[str, Any]]:
    """读取本地日志文件
    
    贯穿项目:日志分析系统
    功能:逐行解析本地日志文件
    """
    with open(filename, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            yield {
                "source": "local",
                "content": line.strip(),
                "timestamp": datetime.now()
            }

def fetch_remote_logs(api_url: str) -> Iterator[dict[str, Any]]:
    """模拟远程日志获取
    
    贯穿项目:日志分析系统
    功能:从 API 获取远程服务器日志
    """
    # 模拟 API 返回数据
    mock_logs = [
        "[2024-01-19 10:00:00] ERROR remote_server - 连接失败",
        "[2024-01-19 10:05:00] WARN  remote_server - 超时警告",
        "[2024-01-19 10:10:00] INFO  remote_server - 重连成功",
    ]
    for line in mock_logs:
        yield {
            "source": "remote",
            "content": line,
            "timestamp": datetime.now()
        }

def query_db_logs(query: str) -> Iterator[dict[str, Any]]:
    """模拟数据库日志查询
    
    贯穿项目:日志分析系统
    功能:从数据库查询日志记录
    """
    # 模拟数据库返回
    mock_records = [
        {"id": 1, "message": "用户登录失败", "level": "ERROR"},
        {"id": 2, "message": "订单创建成功", "level": "INFO"},
    ]
    for record in mock_records:
        yield {
            "source": "database",
            "content": f"[{record['level']}] {record['message']}",
            "timestamp": datetime.now(),
            "db_id": record["id"]
        }

def unified_logs(
    local_file: str = "app.log",
    remote_api: str = "/api/logs",
    db_query: str = "SELECT * FROM logs"
) -> Iterator[dict[str, Any]]:
    """合并所有日志源
    
    贯穿项目:日志分析系统
    核心功能:用 yield from 统一多个日志来源
    """
    yield from read_local_log(local_file)
    yield from fetch_remote_logs(remote_api)
    yield from query_db_logs(db_query)

# 使用:统一处理所有日志源
for log_entry in unified_logs():
    if "ERROR" in log_entry["content"]:
        print(f"[{log_entry['source']}] {log_entry['content']}")

关键代码说明:

代码含义为什么这样写
yield from read_local_log()委托给本地日志生成器自动传递所有值,无需手动循环
yield {"source": "local", ...}标记日志来源便于区分不同来源的日志
def unified_logs()统一入口所有日志源通过一个函数访问
三个 yield from 连续使用顺序合并先本地,后远程,最后数据库

统计错误数量

python
from collections import Counter

def count_errors_by_source(logs: Iterator[dict[str, Any]]) -> dict[str, int]:
    """按来源统计错误数量"""
    error_counts: Counter[str] = Counter()
    
    for log in logs:
        if "ERROR" in log["content"]:
            error_counts[log["source"]] += 1
    
    return dict(error_counts)

# 使用
all_logs = unified_logs()
result = count_errors_by_source(all_logs)
print(result)
# {'local': 5, 'remote': 1, 'database': 1}

生成器的其他高级方法

send():双向通信

send() 可以向生成器发送值,实现双向数据流:

python
from collections.abc import Generator

def accumulator() -> Generator[int, int, None]:
    """累加器:接收外部值并累加"""
    total: int = 0
    while True:
        value: int = yield total  # yield 返回累计值,接收外部值
        total += value

acc = accumulator()
next(acc)           # 启动生成器,返回 0
print(acc.send(10)) # 发送 10,返回 10
print(acc.send(20)) # 发送 20,返回 30
print(acc.send(5))  # 发送 5,返回 35

执行流程:

┌─────────────────────────────────────────────────────────────┐
│  send() 执行流程                                              │
│                                                             │
│  value = yield total                                        │
│           │        │                                        │
│           │        └── 返回给调用方的值                       │
│           └── send() 传入的值                                │
│                                                             │
│  步骤:                                                      │
│  1. next(acc) ─→ 执行到 yield total                         │
│     返回 0,暂停在 yield 处                                  │
│                                                             │
│  2. acc.send(10) ─→ value = 10                             │
│     total = 0 + 10 = 10                                     │
│     继续到 yield total                                       │
│     返回 10,暂停                                            │
│                                                             │
│  ⚠️ 注意:首次必须用 next() 或 send(None) 启动               │
│                                                             │
└─────────────────────────────────────────────────────────────┘

throw():注入异常

向生成器内部抛出异常,可用于错误恢复:

python
from collections.abc import Generator

def resilient_gen() -> Generator[int, None, None]:
    """可恢复的生成器"""
    try:
        yield 1
        yield 2
        yield 3
    except ValueError as e:
        print(f"恢复:{e}")
        yield -1  # 错误恢复后继续

gen = resilient_gen()
print(next(gen))  # 1
gen.throw(ValueError("测试"))
# 恢复:测试
print(next(gen))  # -1

close():关闭生成器

提前关闭生成器,触发 finally 块:

python
from collections.abc import Generator

def resource_gen() -> Generator[int, None, None]:
    """带资源清理的生成器"""
    try:
        for i in range(10):
            yield i
    finally:
        print("清理资源")

gen = resource_gen()
print(next(gen))  # 0
print(next(gen))  # 1
gen.close()       # 清理资源
# next(gen) 会抛出 StopIteration

L2 实践层:用好

推荐做法与反模式

推荐做法

做法原因示例
用 yield from 合并数据源避免重复代码yield from read_file()
yield from 接收返回值获取子生成器状态result = yield from sub()
send() 前先启动防止 TypeErrornext(gen)gen.send(None)
finally 包裹资源确保 close() 清理try: yield x; finally: cleanup()

反模式:不要这样做

python
# ❌ 错误:send() 未先启动
def accumulator():
    total = 0
    while True:
        value = yield total
        total += value

acc = accumulator()
acc.send(10)  # TypeError: can't send non-None to just-started generator

# ✅ 正确:先启动
acc = accumulator()
next(acc)     # 启动生成器
acc.send(10)  # 正常工作
python
# ❌ 错误:GeneratorExit 后继续 yield
def bad_gen():
    try:
        yield 1
    except GeneratorExit:
        yield 2  # RuntimeError: generator ignored GeneratorExit

# ✅ 正确:GeneratorExit 后直接退出
def good_gen():
    try:
        yield 1
    except GeneratorExit:
        print("清理")  # 只做清理,不 yield

适用场景

场景推荐方法
合并多个数据源yield from
展平嵌套结构yield from 递归
生产者-消费者send() 双向通信
错误恢复throw() + try/except
资源清理close() + finally
简单遍历普通生成器足够

L3 专家层:深入

yield from 的底层委托机制

yield from 的 CPython 实现流程:

  主生成器 main_gen:
  ┌──────────────────────────────────────┐
  │ for item in main_gen:               │
  │     ...                             │
  │     yield from sub_gen()            │
  │     ...                             │
  └──────────────────────────────────────┘

              ▼ yield from 执行:
  ┌──────────────────────────────────────┐
  │ ① 创建子生成器 sub = sub_gen()     │
  │ ② 首次 next() 由主生成器转发       │
  │ ③ sub_gen 每次 yield → 值直达调用方│
  │ ④ sub_gen return X → 值赋给 result │
  │    result = yield from sub_gen()    │
  │ ⑤ sub_gen 的 StopIteration 被捕获  │
  │ ⑥ 主生成器继续执行                  │
  └──────────────────────────────────────┘

关键字节码:
  YIELD_FROM   → 委托给子生成器
  GET_YIELD_FROM_ITER → 获取子生成器的迭代器

verify:
import dis
def demo():
    yield from range(3)
dis.dis(demo)
# 包含 LOAD_GLOBAL range, CALL, GET_YIELD_FROM_ITER, YIELD_FROM

send/throw/close 三方协议

生成器的三方通信协议:

  调用方                  生成器内部
  ──────                  ──────────
  
  next(gen)         →   执行到 yield,暂停,返回值
  gen.send(value)   →   恢复执行,yield 表达式接收 value
  gen.throw(exc)    →   向 yield 处注入异常
  gen.close()       →   向 yield 处注入 GeneratorExit

  生成器状态机:
  ┌──────────┐  next()/send()   ┌──────────────┐
  │GEN_CREATED│ ───────────────→ │GEN_SUSPENDED │
  └──────────┘                   └──────────────┘
                                       │    ↑
                    next()/send()/throw│    │
                                       │    │
                                       ↓    │
                                  ┌──────────────┐
                                  │  执行中...    │
                                  │  (GEN_RUNNING)│
                                  └──────────────┘

                              yield/return/异常


                                  ┌──────────────┐
                                  │GEN_SUSPENDED │ ← 回到暂停
                                  └──────────────┘

                              close()/耗尽


                                  ┌──────────────┐
                                  │ GEN_CLOSED   │
                                  └──────────────┘

协程演进路线

  yield → yield from → async/await 的演化:

  Python 2.5 (PEP 342):
  ┌──────────────────────────────────────┐
  │ yield 表达式 + send()               │
  │ 生成器从"产出"变成"协程"雏形       │
  │ def coro():                         │
  │     data = yield  # 接收外部数据    │
  └──────────────────────────────────────┘


  Python 3.3 (PEP 380):
  ┌──────────────────────────────────────┐
  │ yield from 委托语法                  │
  │ 支持子生成器透明代理                │
  │ 支持返回值传递 return value         │
  └──────────────────────────────────────┘


  Python 3.5 (PEP 492):
  ┌──────────────────────────────────────┐
  │ async def / await 正式协程          │
  │ async def → 原生协程 (Coroutine)    │
  │ await    → 等待异步结果             │
  │                                     │
  │ 关键区别:                          │
  │ • 生成器:yield 是基于迭代器的协程   │
  │ • 原生协程:async/await 是事件循环驱动│
  │ • 不能混用(async def 不能用 yield) │
  └──────────────────────────────────────┘

send() 启动问题的底层原因

python
# 为什么 send(None) 等价于 next()?
# CPython gen_send_ex() 中:
# 如果生成器状态是 GEN_CREATED(刚创建):
#   - send(None) → 执行到第一个 yield,等同于 next()
#   - send(非None) → TypeError 报错

# 原因:生成器创建时,指令指针在函数开头
# 没有 yield 表达式可以接收 send 的值
# 必须先执行到第一个 yield,才能接收值

def echo():
    received = yield "ready"  # 这次 yield 后才准备好接收
    yield f"got: {received}"

g = echo()
print(g.send(None))  # OK: "ready"(等价于 next(g))
print(g.send("hi"))  # OK: "got: hi"

性能考量

操作时间复杂度空间复杂度说明
yield from 委托O(1) 建立O(200字节)创建一个子生成器对象
send() 双向通信O(1)O(1)帧上下文切换,常数时间
throw() 异常注入O(1)O(1)异常对象传递
close() 关闭O(1)O(1)GeneratorExit 注入

知识关联

  生成器高级特性知识关联:

  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
  │  yield        │────→│  send()      │────→│  throw()     │
  │  单向产出     │     │  双向通信     │     │  异常注入     │
  └──────────────┘     └──────────────┘     └──────────────┘
         │                    │                    │
         ↓                    ↓                    ↓
  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
  │  yield from  │────→│  子生成器委托│────→│  close()     │
  │  代理语法     │     │  返回值传递   │     │  资源清理     │
  └──────────────┘     └──────────────┘     └──────────────┘


  ┌──────────────┐     ┌──────────────┐
  │  async/await │     │  事件循环     │
  │  原生协程     │────→│  异步驱动     │
  └──────────────┘     └──────────────┘

自检清单

回答以下问题,检查你是否掌握了核心概念:

  1. yield from 和直接 yield 有什么区别?
  2. 如何获取子生成器的返回值?
  3. send() 为什么必须先启动生成器?
  4. close()throw() 有什么区别?
  5. 为什么用 yield from 合并日志源更好?

答案

  1. yield from 委托子生成器产出所有值,yield 只产出一个值
  2. result = yield from sub_gen(),子生成器的 return 值赋给 result
  3. 生成器启动时停在第一个 yield 前,还没有 yield 表达式接收值
  4. close() 发送 GeneratorExit 用于关闭,throw() 发送普通异常用于恢复
  5. 避免重复处理逻辑,统一入口,代码更简洁

本章能力清单

学完本章,你能够:

  • [x] 用 yield from 合并多个日志数据源
  • [x] 用 yield from 展平嵌套结构
  • [x] 获取子生成器的返回值
  • [x] 使用 send() 实现双向通信
  • [x] 使用 close() 确保资源清理

前置知识检查

  • 你是否掌握了生成器基础(yield 机制)? ← 第3节

下一步学习

  • itertools 模块(高效批量处理) → 第5节
  • 异步生成器(async yield) → 第6节

导航