Skip to content

06-异步生成器

Python 版本要求:Python 3.11+

贯穿项目:日志分析系统 本节目标:用 async yield 异步收集多服务器日志


概念铺垫

为什么需要异步生成器?

问题场景:收集多服务器日志

继续日志分析系统,现在需要从多台服务器收集日志:

  • server1.log、server2.log、server3.log
  • 需要同时获取,不要排队等待
python
# ❌ 方案一:同步生成器(排队等待)
def fetch_logs_sync(servers: list[str]) -> Iterator[str]:
    for server in servers:
        # 每台服务器都要等待网络响应
        response = requests.get(f"http://{server}/logs")  # 阻塞!
        yield response.text

# 串行获取:server1 → server2 → server3
# 总时间 = server1 + server2 + server3

# ✅ 方案二:异步生成器(并行获取)
async def fetch_logs_async(servers: list[str]) -> AsyncGenerator[str, None]:
    async with aiohttp.ClientSession() as session:
        for server in servers:
            async with session.get(f"http://{server}/logs") as response:
                yield await response.text()  # 异步等待

# 并行获取:可以同时向多台服务器请求
# 总时间 ≈ 最慢的那台

问题:如何让生成器支持异步操作?async yield 是什么?


异步生成器:多窗口同时取餐

生活类比

把异步生成器想象成多窗口取餐:

┌─────────────────────────────────────────────────────────────┐
│  异步生成器 = 多窗口同时取餐                                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  同步生成器 = 单窗口排队                                       │
│  ─────────────────────────                                  │
│  • 顾客A取餐 → 等待 → 完成                                   │
│  • 顾客B取餐 → 等待 → 完成                                   │
│  • 顾客C取餐 → 等待 → 完成                                   │
│  • 总时间 = A + B + C                                        │
│                                                             │
│  异步生成器 = 多窗口并行                                       │
│  ─────────────────────────                                  │
│  • 顾客A去窗口1 → 等待(不阻塞别人)                          │
│  • 顾客B去窗口2 → 等待(同时进行)                            │
│  • 顾客C去窗口3 → 等待(同时进行)                            │
│  • 总时间 ≈ 最慢的那个人                                      │
│                                                             │
│  关键:                                                      │
│  ─────────────────────                                      │
│  • async yield = 异步窗口取餐                                │
│  • async for = 异步等待结果                                  │
│  • await = 等待某个窗口完成                                  │
│                                                             │
│  效果:                                                      │
│  ─────────────────────                                      │
│  • 同时向多台服务器请求                                       │
│  • 不用排队等每个服务器                                       │
│  • 大幅减少总等待时间                                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

一句话:

async yield = 异步产出值,可以同时等待多个数据源

异步生成器语法

┌─────────────────────────────────────────────────────────────┐
│  异步生成器语法                                                │
│                                                             │
│  定义:                                                      │
│  async def async_gen() -> AsyncGenerator[T, None]:         │
│      await async_operation()                               │
│      yield value                                           │
│                                                             │
│  遍历:                                                      │
│  async for item in async_gen():                            │
│      await process(item)                                   │
│                                                             │
│  类型:                                                      │
│  AsyncGenerator[YieldType, SendType]                       │
│  • YieldType → yield 产出的值类型                           │
│  • SendType → asend() 发送的值类型(通常 None)             │
│                                                             │
│  关键:                                                      │
│  1. async def + yield = 异步生成器                         │
│  2. 必须用 async for 遍历                                   │
│  3. yield 前可以 await                                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

L1 理解层:会用

贯穿实战:异步收集多服务器日志

async for 执行流程

┌─────────────────────────────────────────────────────────────┐
│  async for 执行流程                                           │
│                                                             │
│  代码:                                      │
│      async for log in fetch_logs_async(servers):            │
│          print(log)                                         │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 时间线                                               │   │
│  │                                                      │   │
│  │  1. 获取异步迭代器                                   │   │
│  │     agen = fetch_logs_async(servers)                │   │
│  │     iterator = agen.__aiter__()                     │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  2. 调用 __anext__                                  │   │
│  │     awaitable = iterator.__anext__()                │   │
│  │     开始异步请求 server1                             │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  3. await 等待结果                                  │   │
│  │     log = await awaitable                           │   │
│  │     yield 产出 server1 的日志                        │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  4. 执行循环体                                       │   │
│  │     print(log)                                       │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  5. 再次 __anext__                                  │   │
│  │     开始异步请求 server2                             │   │
│  │        │                                             │   │
│  │        ↓                                             │   │
│  │  ... 继续直到 StopAsyncIteration                    │   │
│  │                                                      │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  ⚠️ 关键:每个 yield 前可以 await 异步操作                    │
│  ⚠️ 关键:必须用 async for,不能用普通 for                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

实现异步日志收集

python
import asyncio
import aiohttp
from collections.abc import AsyncGenerator
from datetime import datetime
from typing import Any

async def fetch_server_log(
    session: aiohttp.ClientSession,
    server: str
) -> dict[str, Any]:
    """从单台服务器获取日志
    
    贯穿项目:日志分析系统
    功能:异步 HTTP 请求获取日志
    """
    url: str = f"http://{server}/api/logs"
    
    async with session.get(url) as response:
        data: dict[str, Any] = await response.json()
        return {
            "server": server,
            "logs": data.get("logs", []),
            "timestamp": datetime.now(),
            "status": "success"
        }

async def collect_logs_from_servers(
    servers: list[str]
) -> AsyncGenerator[dict[str, Any], None]:
    """异步收集多服务器日志
    
    贯穿项目:日志分析系统
    核心功能:用 async yield 产出各服务器日志
    """
    async with aiohttp.ClientSession() as session:
        for server in servers:
            try:
                log_data: dict[str, Any] = await fetch_server_log(session, server)
                yield log_data
            except aiohttp.ClientError as e:
                yield {
                    "server": server,
                    "error": str(e),
                    "timestamp": datetime.now(),
                    "status": "failed"
                }

async def analyze_multi_server_logs(servers: list[str]) -> dict[str, int]:
    """分析多服务器日志,统计错误数量
    
    贯穿项目:日志分析系统
    功能:汇总各服务器错误数
    """
    total_errors: dict[str, int] = {}
    
    async for server_data in collect_logs_from_servers(servers):
        server_name: str = server_data["server"]
        
        if server_data["status"] == "success":
            logs: list[str] = server_data["logs"]
            error_count: int = sum(1 for log in logs if "ERROR" in log)
            total_errors[server_name] = error_count
            print(f"{server_name}: {len(logs)} 条日志, {error_count} 个错误")
        else:
            print(f"{server_name}: 获取失败 - {server_data['error']}")
            total_errors[server_name] = -1
    
    return total_errors

# 使用示例(模拟)
async def demo_async_logs() -> None:
    """演示异步日志收集"""
    # 模拟服务器列表
    servers: list[str] = ["server1.local", "server2.local", "server3.local"]
    
    # 实际使用时需要真实服务器,这里用模拟数据演示
    async def mock_collect_logs() -> AsyncGenerator[dict[str, Any], None]:
        """模拟异步日志收集"""
        for server in servers:
            await asyncio.sleep(0.1)  # 模拟网络延迟
            yield {
                "server": server,
                "logs": [
                    "[ERROR] Connection timeout",
                    "[INFO] Request processed",
                    "[ERROR] Database error"
                ],
                "status": "success"
            }
    
    async for data in mock_collect_logs():
        print(f"收到 {data['server']} 的日志")

asyncio.run(demo_async_logs())

关键代码说明:

代码含义为什么这样写
async def collect_logs_from_servers()定义异步生成器使用 async def + yield
async with aiohttp.ClientSession() as session异步 HTTP 会话管理连接池,复用连接
await fetch_server_log(session, server)异步请求不阻塞其他操作
yield log_data产出结果async yield 异步产出
async for server_data in collect_logs_from_servers()异步遍历必须用 async for

异步生成器基础

最简示例

python
import asyncio
from collections.abc import AsyncGenerator

async def async_counter(n: int) -> AsyncGenerator[int, None]:
    """简单的异步计数器"""
    for i in range(n):
        await asyncio.sleep(0.1)  # 模拟异步操作
        yield i

async def main() -> None:
    async for num in async_counter(5):
        print(num)

asyncio.run(main())
# 0, 1, 2, 3, 4

异步迭代器协议

python
from collections.abc import AsyncIterator

class AsyncRange(AsyncIterator[int]):
    """自定义异步迭代器"""
    
    def __init__(self, start: int, end: int) -> None:
        self.current: int = start
        self.end: int = end
    
    def __aiter__(self) -> "AsyncRange":
        return self
    
    async def __anext__(self) -> int:
        if self.current >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.01)  # 模拟异步操作
        result: int = self.current
        self.current += 1
        return result

async def main() -> None:
    async for num in AsyncRange(1, 4):
        print(num)

asyncio.run(main())
# 1, 2, 3

L2 实践层:用好

推荐做法与反模式

推荐做法

做法原因示例
用 AsyncGenerator 类型提示明确类型-> AsyncGenerator[dict, None]
yield 前做异步操作利用异步优势await fetch(); yield data
用 async for 遍历必须的语法async for item in agen
批量并发获取加速处理asyncio.gather() + 异步生成器

反模式:不要这样做

python
# ❌ 错误:用普通 for 遍历异步生成器
async def async_gen() -> AsyncGenerator[int, None]:
    yield 1

async def main() -> None:
    for item in async_gen():  # TypeError!
        print(item)

# ✅ 正确:用 async for
async def main() -> None:
    async for item in async_gen():
        print(item)
python
# ❌ 错误:忘记 await
async def fetch_data() -> dict:
    await asyncio.sleep(0.1)
    return {"data": "value"}

async def async_gen() -> AsyncGenerator[dict, None]:
    data = fetch_data()  # 没有 await,返回协程对象
    yield data

# ✅ 正确:使用 await
async def async_gen() -> AsyncGenerator[dict, None]:
    data = await fetch_data()
    yield data

适用场景

场景推荐原因
多服务器日志收集异步并行获取
分页 API 数据流惰性 + 异步
大文件异步读取逐行异步处理
实时数据流按需处理
本地小数据普通生成器足够

L3 专家层:深入

异步生成器的底层实现

异步生成器 vs 普通生成器 内部结构:

  普通生成器 (generator):            异步生成器 (async_generator):
  ┌──────────────────────┐         ┌──────────────────────────┐
  │ gi_code    → 代码对象│         │ ag_code    → 代码对象     │
  │ gi_frame   → 帧对象  │         │ ag_frame   → 帧对象       │
  │ gi_running → 运行状态│         │ ag_running → 运行状态     │
  │ gi_yieldfrom → 委托  │         │ ag_finalizer → 清理钩子  │
  └──────────────────────┘         │ ag_hooks_inited → 标记   │
                                   └──────────────────────────┘

关键区别:
• 异步生成器有 ag_finalizer — 用于事件循环关闭时的清理
• 异步生成器通过 __anext__() 返回 awaitable,而非直接返回值
• 异步生成器与事件循环紧密耦合

验证:

python
import asyncio

async def agen():
    yield 1
    await asyncio.sleep(0.1)
    yield 2

ag = agen()
print(type(ag))          # <class 'async_generator'>
print(ag.ag_code)        # <code object agen>
print(ag.ag_frame)       # None(未启动时为 None)

# 获取 __anext__ 返回的 awaitable
ait = ag.__anext__()
print(type(ait))         # <class 'coroutine'> — 本质是协程!

# 执行后才能看到帧
async def run():
    print(await ait)     # 1
    print(ag.ag_frame)   # <frame at ...>(暂停后帧对象存在)

asyncio.run(run())

事件循环集成机制

异步生成器与事件循环的协作:

  async for item in agen():


  ┌──────────────────────────────────────┐
  │  事件循环 (Event Loop)               │
  │                                      │
  │  ① 调用 agen.__aiter__() → 返回 self │
  │  ② 调用 agen.__anext__() → 返回协程  │
  │  ③ 将协程提交到事件循环:            │
  │     ┌──────────────────────┐         │
  │     │ 等待协程完成或 yield  │         │
  │     │ 如果 yield → 返回值给调用方   │
  │     │ 如果 await → 注册等待          │
  │     └──────────────────────┘         │
  │  ④ 协程恢复时继续执行               │
  │  ⑤ 重复 ②-④                          │
  │  ⑥ StopAsyncIteration → 循环结束    │
  └──────────────────────────────────────┘

关键机制:
• __anext__() 返回的协程由事件循环调度
• yield 暂停协程 → 控制权交回事件循环
• await 注册等待 → 事件循环可以调度其他任务
• 异步生成器关闭:事件循环调用 aclose() → GeneratorExit

异步迭代器协议

python
from collections.abc import AsyncIterator

# 协议要求
class AsyncIterable:
    def __aiter__(self) -> AsyncIterator:
        """返回异步迭代器"""
        ...

class AsyncIterator(AsyncIterable):
    async def __anext__(self):
        """返回下一个值,或 raise StopAsyncIteration"""
        ...

    async def aclose(self):
        """可选:清理资源"""
        ...

# 等价转换
# async for x in async_iter:
#     body(x)
#
# 等价于:
it = async_iter.__aiter__()
while True:
    try:
        x = await it.__anext__()
    except StopAsyncIteration:
        break
    body(x)

异步生成器的清理机制

python
# 事件循环关闭时的清理流程:
# 
# 1. 事件循环检测到未关闭的异步生成器
# 2. 调用异步生成器的 aclose() 方法
# 3. aclose() 向暂停的 yield 处注入 GeneratorExit
# 4. 生成器的 finally 块执行清理
# 5. 如果 finally 中有 await,事件循环等待完成

import asyncio

async def cleanup_demo():
    try:
        while True:
            await asyncio.sleep(1)
            yield "data"
    finally:
        await asyncio.sleep(0.1)  # 异步清理
        print("资源已清理")

async def main():
    async for data in cleanup_demo():
        print(data)
        break  # 提前退出,触发清理

asyncio.run(main())
# 输出: data\n资源已清理

性能考量

操作时间复杂度空间复杂度说明
创建异步生成器O(1)O(~200字节)类似普通生成器 + finalizer 钩子
anext 调用O(1)O(1)返回协程对象
yield 暂停/恢复O(1)O(1)事件循环上下文切换
aclose() 关闭O(1)O(1)GeneratorExit 注入

知识关联

  异步生成器知识关联:

  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
  │  普通生成器   │────→│  async def   │────→│  事件循环     │
  │  yield 暂停   │     │  + yield     │     │  调度恢复     │
  └──────────────┘     └──────────────┘     └──────────────┘
         │                    │                    │
         ↓                    ↓                    ↓
  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
  │  __iter__    │     │  __aiter__   │     │  asyncio.gather│
  │  __next__    │     │  __anext__   │     │  并行组合     │
  └──────────────┘     └──────────────┘     └──────────────┘
         │                    │
         ↓                    ↓
  ┌──────────────┐     ┌──────────────┐
  │ StopIteration│     │StopAsyncIter │
  │  同步终止     │     │  异步终止     │
  └──────────────┘     └──────────────┘

自检清单

回答以下问题,检查你是否掌握了核心概念:

  1. 异步生成器如何定义?用哪些关键字?
  2. 为什么不能用普通 for 遍历异步生成器?
  3. AsyncGenerator[int, None] 中两个类型参数是什么?
  4. 异步迭代器协议包含哪些方法?
  5. 异步生成器和普通生成器的主要区别?

答案

  1. async def + yield,用 async for 遍历
  2. 异步生成器返回 Awaitable,普通 for 无法处理
  3. 第一个是 yield 值类型,第二个是 asend 发送值类型(通常 None)
  4. __aiter____anext____anext__ 是 async 方法
  5. 异步生成器可以 await 异步操作,用 async for 遍历,抛出 StopAsyncIteration

本章能力清单

学完本章,你能够:

  • [x] 定义异步生成器(async def + yield)
  • [x] 用 async for 遍历异步生成器
  • [x] 异步收集多服务器日志
  • [x] 理解异步迭代器协议(aiteranext
  • [x] 区分同步生成器和异步生成器的使用场景

前置知识检查

  • 你是否掌握了生成器基础(yield 机制)? ← 第3节
  • 你是否了解 asyncio 基础(async/await)? ← 异步编程章节

下一步学习

  • 异步编程深入(事件循环、并发控制)
  • asyncio 高级应用(gather、create_task)

导航