06-异步生成器
Python 版本要求:Python 3.11+
贯穿项目:日志分析系统 本节目标:用 async yield 异步收集多服务器日志
概念铺垫
为什么需要异步生成器?
问题场景:收集多服务器日志
继续日志分析系统,现在需要从多台服务器收集日志:
- server1.log、server2.log、server3.log
- 需要同时获取,不要排队等待
python
# ❌ 方案一:同步生成器(排队等待)
def fetch_logs_sync(servers: list[str]) -> Iterator[str]:
for server in servers:
# 每台服务器都要等待网络响应
response = requests.get(f"http://{server}/logs") # 阻塞!
yield response.text
# 串行获取:server1 → server2 → server3
# 总时间 = server1 + server2 + server3
# ✅ 方案二:异步生成器(并行获取)
async def fetch_logs_async(servers: list[str]) -> AsyncGenerator[str, None]:
async with aiohttp.ClientSession() as session:
for server in servers:
async with session.get(f"http://{server}/logs") as response:
yield await response.text() # 异步等待
# 并行获取:可以同时向多台服务器请求
# 总时间 ≈ 最慢的那台问题:如何让生成器支持异步操作?async yield 是什么?
异步生成器:多窗口同时取餐
生活类比
把异步生成器想象成多窗口取餐:
┌─────────────────────────────────────────────────────────────┐
│ 异步生成器 = 多窗口同时取餐 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 同步生成器 = 单窗口排队 │
│ ───────────────────────── │
│ • 顾客A取餐 → 等待 → 完成 │
│ • 顾客B取餐 → 等待 → 完成 │
│ • 顾客C取餐 → 等待 → 完成 │
│ • 总时间 = A + B + C │
│ │
│ 异步生成器 = 多窗口并行 │
│ ───────────────────────── │
│ • 顾客A去窗口1 → 等待(不阻塞别人) │
│ • 顾客B去窗口2 → 等待(同时进行) │
│ • 顾客C去窗口3 → 等待(同时进行) │
│ • 总时间 ≈ 最慢的那个人 │
│ │
│ 关键: │
│ ───────────────────── │
│ • async yield = 异步窗口取餐 │
│ • async for = 异步等待结果 │
│ • await = 等待某个窗口完成 │
│ │
│ 效果: │
│ ───────────────────── │
│ • 同时向多台服务器请求 │
│ • 不用排队等每个服务器 │
│ • 大幅减少总等待时间 │
│ │
└─────────────────────────────────────────────────────────────┘一句话:
async yield= 异步产出值,可以同时等待多个数据源
异步生成器语法
┌─────────────────────────────────────────────────────────────┐
│ 异步生成器语法 │
│ │
│ 定义: │
│ async def async_gen() -> AsyncGenerator[T, None]: │
│ await async_operation() │
│ yield value │
│ │
│ 遍历: │
│ async for item in async_gen(): │
│ await process(item) │
│ │
│ 类型: │
│ AsyncGenerator[YieldType, SendType] │
│ • YieldType → yield 产出的值类型 │
│ • SendType → asend() 发送的值类型(通常 None) │
│ │
│ 关键: │
│ 1. async def + yield = 异步生成器 │
│ 2. 必须用 async for 遍历 │
│ 3. yield 前可以 await │
│ │
└─────────────────────────────────────────────────────────────┘L1 理解层:会用
贯穿实战:异步收集多服务器日志
async for 执行流程
┌─────────────────────────────────────────────────────────────┐
│ async for 执行流程 │
│ │
│ 代码: │
│ async for log in fetch_logs_async(servers): │
│ print(log) │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 时间线 │ │
│ │ │ │
│ │ 1. 获取异步迭代器 │ │
│ │ agen = fetch_logs_async(servers) │ │
│ │ iterator = agen.__aiter__() │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 2. 调用 __anext__ │ │
│ │ awaitable = iterator.__anext__() │ │
│ │ 开始异步请求 server1 │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 3. await 等待结果 │ │
│ │ log = await awaitable │ │
│ │ yield 产出 server1 的日志 │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 4. 执行循环体 │ │
│ │ print(log) │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ 5. 再次 __anext__ │ │
│ │ 开始异步请求 server2 │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ ... 继续直到 StopAsyncIteration │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ⚠️ 关键:每个 yield 前可以 await 异步操作 │
│ ⚠️ 关键:必须用 async for,不能用普通 for │
│ │
└─────────────────────────────────────────────────────────────┘实现异步日志收集
python
import asyncio
import aiohttp
from collections.abc import AsyncGenerator
from datetime import datetime
from typing import Any
async def fetch_server_log(
session: aiohttp.ClientSession,
server: str
) -> dict[str, Any]:
"""从单台服务器获取日志
贯穿项目:日志分析系统
功能:异步 HTTP 请求获取日志
"""
url: str = f"http://{server}/api/logs"
async with session.get(url) as response:
data: dict[str, Any] = await response.json()
return {
"server": server,
"logs": data.get("logs", []),
"timestamp": datetime.now(),
"status": "success"
}
async def collect_logs_from_servers(
servers: list[str]
) -> AsyncGenerator[dict[str, Any], None]:
"""异步收集多服务器日志
贯穿项目:日志分析系统
核心功能:用 async yield 产出各服务器日志
"""
async with aiohttp.ClientSession() as session:
for server in servers:
try:
log_data: dict[str, Any] = await fetch_server_log(session, server)
yield log_data
except aiohttp.ClientError as e:
yield {
"server": server,
"error": str(e),
"timestamp": datetime.now(),
"status": "failed"
}
async def analyze_multi_server_logs(servers: list[str]) -> dict[str, int]:
"""分析多服务器日志,统计错误数量
贯穿项目:日志分析系统
功能:汇总各服务器错误数
"""
total_errors: dict[str, int] = {}
async for server_data in collect_logs_from_servers(servers):
server_name: str = server_data["server"]
if server_data["status"] == "success":
logs: list[str] = server_data["logs"]
error_count: int = sum(1 for log in logs if "ERROR" in log)
total_errors[server_name] = error_count
print(f"{server_name}: {len(logs)} 条日志, {error_count} 个错误")
else:
print(f"{server_name}: 获取失败 - {server_data['error']}")
total_errors[server_name] = -1
return total_errors
# 使用示例(模拟)
async def demo_async_logs() -> None:
"""演示异步日志收集"""
# 模拟服务器列表
servers: list[str] = ["server1.local", "server2.local", "server3.local"]
# 实际使用时需要真实服务器,这里用模拟数据演示
async def mock_collect_logs() -> AsyncGenerator[dict[str, Any], None]:
"""模拟异步日志收集"""
for server in servers:
await asyncio.sleep(0.1) # 模拟网络延迟
yield {
"server": server,
"logs": [
"[ERROR] Connection timeout",
"[INFO] Request processed",
"[ERROR] Database error"
],
"status": "success"
}
async for data in mock_collect_logs():
print(f"收到 {data['server']} 的日志")
asyncio.run(demo_async_logs())关键代码说明:
| 代码 | 含义 | 为什么这样写 |
|---|---|---|
async def collect_logs_from_servers() | 定义异步生成器 | 使用 async def + yield |
async with aiohttp.ClientSession() as session | 异步 HTTP 会话 | 管理连接池,复用连接 |
await fetch_server_log(session, server) | 异步请求 | 不阻塞其他操作 |
yield log_data | 产出结果 | async yield 异步产出 |
async for server_data in collect_logs_from_servers() | 异步遍历 | 必须用 async for |
异步生成器基础
最简示例
python
import asyncio
from collections.abc import AsyncGenerator
async def async_counter(n: int) -> AsyncGenerator[int, None]:
"""简单的异步计数器"""
for i in range(n):
await asyncio.sleep(0.1) # 模拟异步操作
yield i
async def main() -> None:
async for num in async_counter(5):
print(num)
asyncio.run(main())
# 0, 1, 2, 3, 4异步迭代器协议
python
from collections.abc import AsyncIterator
class AsyncRange(AsyncIterator[int]):
"""自定义异步迭代器"""
def __init__(self, start: int, end: int) -> None:
self.current: int = start
self.end: int = end
def __aiter__(self) -> "AsyncRange":
return self
async def __anext__(self) -> int:
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.01) # 模拟异步操作
result: int = self.current
self.current += 1
return result
async def main() -> None:
async for num in AsyncRange(1, 4):
print(num)
asyncio.run(main())
# 1, 2, 3L2 实践层:用好
推荐做法与反模式
推荐做法
| 做法 | 原因 | 示例 |
|---|---|---|
| 用 AsyncGenerator 类型提示 | 明确类型 | -> AsyncGenerator[dict, None] |
| yield 前做异步操作 | 利用异步优势 | await fetch(); yield data |
| 用 async for 遍历 | 必须的语法 | async for item in agen |
| 批量并发获取 | 加速处理 | asyncio.gather() + 异步生成器 |
反模式:不要这样做
python
# ❌ 错误:用普通 for 遍历异步生成器
async def async_gen() -> AsyncGenerator[int, None]:
yield 1
async def main() -> None:
for item in async_gen(): # TypeError!
print(item)
# ✅ 正确:用 async for
async def main() -> None:
async for item in async_gen():
print(item)python
# ❌ 错误:忘记 await
async def fetch_data() -> dict:
await asyncio.sleep(0.1)
return {"data": "value"}
async def async_gen() -> AsyncGenerator[dict, None]:
data = fetch_data() # 没有 await,返回协程对象
yield data
# ✅ 正确:使用 await
async def async_gen() -> AsyncGenerator[dict, None]:
data = await fetch_data()
yield data适用场景
| 场景 | 推荐 | 原因 |
|---|---|---|
| 多服务器日志收集 | ✅ | 异步并行获取 |
| 分页 API 数据流 | ✅ | 惰性 + 异步 |
| 大文件异步读取 | ✅ | 逐行异步处理 |
| 实时数据流 | ✅ | 按需处理 |
| 本地小数据 | ❌ | 普通生成器足够 |
L3 专家层:深入
异步生成器的底层实现
异步生成器 vs 普通生成器 内部结构:
普通生成器 (generator): 异步生成器 (async_generator):
┌──────────────────────┐ ┌──────────────────────────┐
│ gi_code → 代码对象│ │ ag_code → 代码对象 │
│ gi_frame → 帧对象 │ │ ag_frame → 帧对象 │
│ gi_running → 运行状态│ │ ag_running → 运行状态 │
│ gi_yieldfrom → 委托 │ │ ag_finalizer → 清理钩子 │
└──────────────────────┘ │ ag_hooks_inited → 标记 │
└──────────────────────────┘
关键区别:
• 异步生成器有 ag_finalizer — 用于事件循环关闭时的清理
• 异步生成器通过 __anext__() 返回 awaitable,而非直接返回值
• 异步生成器与事件循环紧密耦合验证:
python
import asyncio
async def agen():
yield 1
await asyncio.sleep(0.1)
yield 2
ag = agen()
print(type(ag)) # <class 'async_generator'>
print(ag.ag_code) # <code object agen>
print(ag.ag_frame) # None(未启动时为 None)
# 获取 __anext__ 返回的 awaitable
ait = ag.__anext__()
print(type(ait)) # <class 'coroutine'> — 本质是协程!
# 执行后才能看到帧
async def run():
print(await ait) # 1
print(ag.ag_frame) # <frame at ...>(暂停后帧对象存在)
asyncio.run(run())事件循环集成机制
异步生成器与事件循环的协作:
async for item in agen():
│
▼
┌──────────────────────────────────────┐
│ 事件循环 (Event Loop) │
│ │
│ ① 调用 agen.__aiter__() → 返回 self │
│ ② 调用 agen.__anext__() → 返回协程 │
│ ③ 将协程提交到事件循环: │
│ ┌──────────────────────┐ │
│ │ 等待协程完成或 yield │ │
│ │ 如果 yield → 返回值给调用方 │
│ │ 如果 await → 注册等待 │
│ └──────────────────────┘ │
│ ④ 协程恢复时继续执行 │
│ ⑤ 重复 ②-④ │
│ ⑥ StopAsyncIteration → 循环结束 │
└──────────────────────────────────────┘
关键机制:
• __anext__() 返回的协程由事件循环调度
• yield 暂停协程 → 控制权交回事件循环
• await 注册等待 → 事件循环可以调度其他任务
• 异步生成器关闭:事件循环调用 aclose() → GeneratorExit异步迭代器协议
python
from collections.abc import AsyncIterator
# 协议要求
class AsyncIterable:
def __aiter__(self) -> AsyncIterator:
"""返回异步迭代器"""
...
class AsyncIterator(AsyncIterable):
async def __anext__(self):
"""返回下一个值,或 raise StopAsyncIteration"""
...
async def aclose(self):
"""可选:清理资源"""
...
# 等价转换
# async for x in async_iter:
# body(x)
#
# 等价于:
it = async_iter.__aiter__()
while True:
try:
x = await it.__anext__()
except StopAsyncIteration:
break
body(x)异步生成器的清理机制
python
# 事件循环关闭时的清理流程:
#
# 1. 事件循环检测到未关闭的异步生成器
# 2. 调用异步生成器的 aclose() 方法
# 3. aclose() 向暂停的 yield 处注入 GeneratorExit
# 4. 生成器的 finally 块执行清理
# 5. 如果 finally 中有 await,事件循环等待完成
import asyncio
async def cleanup_demo():
try:
while True:
await asyncio.sleep(1)
yield "data"
finally:
await asyncio.sleep(0.1) # 异步清理
print("资源已清理")
async def main():
async for data in cleanup_demo():
print(data)
break # 提前退出,触发清理
asyncio.run(main())
# 输出: data\n资源已清理性能考量
| 操作 | 时间复杂度 | 空间复杂度 | 说明 |
|---|---|---|---|
| 创建异步生成器 | O(1) | O(~200字节) | 类似普通生成器 + finalizer 钩子 |
| anext 调用 | O(1) | O(1) | 返回协程对象 |
| yield 暂停/恢复 | O(1) | O(1) | 事件循环上下文切换 |
| aclose() 关闭 | O(1) | O(1) | GeneratorExit 注入 |
知识关联
异步生成器知识关联:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 普通生成器 │────→│ async def │────→│ 事件循环 │
│ yield 暂停 │ │ + yield │ │ 调度恢复 │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ __iter__ │ │ __aiter__ │ │ asyncio.gather│
│ __next__ │ │ __anext__ │ │ 并行组合 │
└──────────────┘ └──────────────┘ └──────────────┘
│ │
↓ ↓
┌──────────────┐ ┌──────────────┐
│ StopIteration│ │StopAsyncIter │
│ 同步终止 │ │ 异步终止 │
└──────────────┘ └──────────────┘自检清单
回答以下问题,检查你是否掌握了核心概念:
- 异步生成器如何定义?用哪些关键字?
- 为什么不能用普通
for遍历异步生成器? AsyncGenerator[int, None]中两个类型参数是什么?- 异步迭代器协议包含哪些方法?
- 异步生成器和普通生成器的主要区别?
答案:
async def+yield,用async for遍历- 异步生成器返回 Awaitable,普通 for 无法处理
- 第一个是 yield 值类型,第二个是 asend 发送值类型(通常 None)
__aiter__和__anext__,__anext__是 async 方法- 异步生成器可以 await 异步操作,用 async for 遍历,抛出 StopAsyncIteration
本章能力清单
学完本章,你能够:
- [x] 定义异步生成器(async def + yield)
- [x] 用 async for 遍历异步生成器
- [x] 异步收集多服务器日志
- [x] 理解异步迭代器协议(aiter、anext)
- [x] 区分同步生成器和异步生成器的使用场景
前置知识检查:
- 你是否掌握了生成器基础(yield 机制)? ← 第3节
- 你是否了解 asyncio 基础(async/await)? ← 异步编程章节
下一步学习:
- 异步编程深入(事件循环、并发控制)
- asyncio 高级应用(gather、create_task)
导航: