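09 - Background Tasks
Python 3.11+
This chapter covers background task processing in FastAPI.
Part 1: BackgroundTasks
1.1 Scenario
Slow operations such as sending email or generating reports should not block the HTTP response; they should run in the background.
Question: how do you run work after the response has been sent?
1.2 Basic usage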
```python
from fastapi import BackgroundTasks, FastAPI

app: FastAPI = FastAPI()


def write_log(message: str) -> None:
    # Append one line to the log file; runs after the response is sent.
    with open("log.txt", "a") as f:
        f.write(f"{message}\n")


@app.post("/send-email/{email}")
async def send_email(email: str, background_tasks: BackgroundTasks) -> dict[str, str]:
    # add_task only registers the call; it runs once the response is out.
    background_tasks.add_task(write_log, f"Email sent to {email}")
    return {"message": "Email is being sent"}
```

1.3 Tasks with arguments
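```python
# Continues the app from section 1.2 (same `app` and imports).
def process_data(data: str, count: int) -> None:
    for i in range(count):
        print(f"Processing {data}: {i + 1}/{count}")


@app.post("/process")
async def process(
    data: str,
    background_tasks: BackgroundTasks,  # injected by FastAPI; no default needed
    count: int = 5,
) -> dict[str, str]:
    # Extra positional arguments to add_task are forwarded to process_data.
    background_tasks.add_task(process_data, data, count)
    return {"message": "Processing started"}
```

Part 2: Celery Integration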
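2.1 Scenario
When background work needs persistence, retries, or distributed execution, BackgroundTasks is not enough.
Question: how do you integrate a dedicated task queue?
2.2 Installation
```bash
pip install celery redis
```

2.3 Celery configuration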
```python
# celery_config.py
import time

from celery import Celery

celery_app: Celery = Celery(
    "tasks",
    broker="redis://localhost:6379/0",   # queue of pending tasks
    backend="redis://localhost:6379/1",  # storage for task results
)


@celery_app.task
def send_email_task(to: str, subject: str, body: str) -> dict[str, str]:
    time.sleep(2)  # simulate a slow mail-server call
    print(f"Email sent to {to}")
    return {"status": "sent", "to": to}


@celery_app.task
def process_data_task(data: list[int]) -> dict[str, int]:
    result: int = sum(data)
    return {"result": result, "count": len(data)}
```

2.4 Using it from FastAPI
```python
from typing import Any

from fastapi import FastAPI

from celery_config import celery_app, process_data_task, send_email_task

app: FastAPI = FastAPI()


@app.post("/send-email")
async def send_email(email: str, subject: str, body: str) -> dict[str, str]:
    # delay() enqueues the task on the broker and returns immediately.
    task = send_email_task.delay(email, subject, body)
    return {"task_id": task.id, "status": "queued"}


@app.post("/process")
async def process_data(numbers: list[int]) -> dict[str, str]:
    # `numbers` is read from the request body as a JSON array.
    task = process_data_task.delay(numbers)
    return {"task_id": task.id, "status": "queued"}


@app.get("/task/{task_id}")
async def get_task_status(task_id: str) -> dict[str, Any]:
    task = celery_app.AsyncResult(task_id)
    return {
        "task_id": task.id,
        "status": task.state,
        # On failure task.result holds the exception, so expose it only on success.
        "result": task.result if task.successful() else None,
    }
```

Part 3: Complete Example
```python
import time

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

app: FastAPI = FastAPI()


# ==================== Task functions ====================
def write_notification(email: str, message: str) -> None:
    """Append a notification entry to the log file."""
    with open("notifications.txt", "a") as f:
        f.write(f"Email: {email}, Message: {message}\n")


def send_email_async(email: str, subject: str, body: str) -> None:
    """Simulate sending an email (a plain function, despite the name)."""
    time.sleep(2)  # simulate a slow operation
    print(f"Email sent: {subject} to {email}")


def process_batch(items: list[dict]) -> list[dict]:
    """Process the items one by one."""
    results: list[dict] = []
    for item in items:
        time.sleep(0.5)
        results.append({"processed": item})
    return results  # discarded when run through BackgroundTasks


# ==================== Routes ====================
class EmailRequest(BaseModel):
    email: str
    subject: str
    body: str


class ProcessRequest(BaseModel):
    items: list[dict]


@app.post("/notify")
async def notify(email: str, message: str, background_tasks: BackgroundTasks) -> dict[str, str]:
    background_tasks.add_task(write_notification, email, message)
    return {"message": "Notification queued"}


@app.post("/email")
async def send_email(request: EmailRequest, background_tasks: BackgroundTasks) -> dict[str, str]:
    background_tasks.add_task(
        send_email_async,
        request.email,
        request.subject,
        request.body,
    )
    return {"message": "Email sending in background"}


@app.post("/batch")
async def process_batch_items(request: ProcessRequest, background_tasks: BackgroundTasks) -> dict[str, str]:
    # add_task returns None, so there is no task handle or id to hand back.
    background_tasks.add_task(process_batch, request.items)
    return {"message": "Batch processing started"}
```

Part 4: L3 Expert Level
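4.1 How BackgroundTasks is scheduled on the event loop
FastAPI's BackgroundTasks builds on Starlette's BackgroundTask class. Its core principle is that tasks run after the response has been sent but before the request's ASGI call returns, and so before the connection is closed.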
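The ordering can be checked without any framework. The sketch below is a hand-written ASGI callable, not Starlette's code; `app`, `audit_log` and the recording `send` are hypothetical names chosen for illustration. It sends the two response messages and only then runs the deferred work, so the recorded event list shows where a background task sits relative to the response.

```python
import asyncio

events: list[str] = []


async def audit_log() -> None:
    """Hypothetical background job; stands in for writing a log entry."""
    events.append("background task ran")


async def app(scope: dict, receive, send) -> None:
    """Bare ASGI app: send the response first, then run the deferred work."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})
    # The client already has the full response at this point.
    await audit_log()


async def main() -> None:
    async def receive() -> dict:
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message: dict) -> None:
        # A real server would write to the socket; here we only record the order.
        events.append(message["type"])

    await app({"type": "http", "method": "POST", "path": "/"}, receive, send)


asyncio.run(main())
print(events)
```

Running it lists the two response messages first and the background entry last.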
Underlying flow:
```
              HTTP Request
                   │
                   ▼
┌─────────────────────────────────────┐
│          ASGI Application           │
│        (FastAPI / Starlette)        │
│                                     │
│  1. Route matching                  │
│  2. Dependency injection            │
│  3. Run the endpoint                │
│     └─> tasks.append()              │ ◄── background_tasks.add_task()
└──────────────────┬──────────────────┘
                   │
                   ▼
┌─────────────────────────────────────┐
│         Response Streaming          │
│     send the HTTP response body     │
└──────────────────┬──────────────────┘
                   │
                   ▼
┌─────────────────────────────────────┐
│  BackgroundTasks.__call__()         │ ◄── triggered once the response is fully sent
│  ─────────────────────────────────  │
│  for task in tasks:                 │
│    if is_async(task):               │
│      await task()                   │
│    else:                            │
│      await run_in_threadpool(task)  │ ◄── sync tasks go to a worker thread
└──────────────────┬──────────────────┘
                   │
                   ▼
            Connection Close
```

The key implementation lives in Starlette's Response class, whose __call__ method awaits the background attribute as its last step:
```python
# Simplified sketch of the Starlette logic (not the actual source)
from starlette.background import BackgroundTask


class Response:
    def __init__(self, background: BackgroundTask | None = None) -> None:
        self.background = background

    async def __call__(self, scope, receive, send) -> None:
        await send({"type": "http.response.start"})  # plus status and headers
        await send({"type": "http.response.body"})   # plus the body bytes
        if self.background is not None:
            await self.background()  # ◄── background tasks run here
```

Position in the event loop:
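```
Event loop iteration
─────────────────────────────────────────
  │
  ├── I/O callbacks (sockets ready)
  │     └── read the request body, write the response body
  │
  └── Ready coroutines (the task handling this request)
        ├── endpoint runs
        ├── response is sent
        └── background tasks run  ◄── tail of the same coroutine, after the last send()
```

The loop has no separate background phase: the tasks are simply the tail of the coroutine that handled the request. A task declared with async def runs on the event loop, so any blocking call inside it (time.sleep, synchronous I/O) stalls every other request. A task declared with plain def is handed to a threadpool and does not block the loop, but it occupies a worker thread for as long as it runs. Keep both kinds light (writing a log line, for example); heavy or long-running work belongs in an external queue.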
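A rough stdlib model of how Starlette dispatches the two kinds of task (async def awaited on the loop, plain def sent to a threadpool), assuming nothing beyond asyncio. `MiniBackgroundTasks` and the job functions are hypothetical stand-ins, and `asyncio.to_thread` plays the role of Starlette's threadpool helper. A heartbeat coroutine ticks while the task runs, so the tick count indicates whether the loop stayed responsive.

```python
import asyncio
import inspect
import time
from collections.abc import Callable
from typing import Any


class MiniBackgroundTasks:
    """Hypothetical stand-in that mimics the add_task / run-later pattern."""

    def __init__(self) -> None:
        self._tasks: list[tuple[Callable[..., Any], tuple, dict]] = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Only record the call; nothing runs yet.
        self._tasks.append((func, args, kwargs))

    async def run(self) -> None:
        for func, args, kwargs in self._tasks:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)  # runs on the event loop
            else:
                await asyncio.to_thread(func, *args, **kwargs)  # worker thread


def sync_job(seconds: float) -> None:
    time.sleep(seconds)  # blocking, but off the loop thread


async def bad_async_job(seconds: float) -> None:
    time.sleep(seconds)  # blocking call inside async def: stalls the loop


async def count_ticks(stop: asyncio.Event) -> int:
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def measure(job: Callable[..., Any]) -> int:
    tasks = MiniBackgroundTasks()
    tasks.add_task(job, 0.3)
    stop = asyncio.Event()
    heartbeat = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the heartbeat start
    await tasks.run()
    stop.set()
    return await heartbeat


ticks_sync = asyncio.run(measure(sync_job))
ticks_async = asyncio.run(measure(bad_async_job))
print(f"heartbeat ticks with sync task in a thread: {ticks_sync}")
print(f"heartbeat ticks with blocking async task:   {ticks_async}")
```

The first count should be well above the second (exact numbers vary with machine load), because the heartbeat cannot tick while a blocking call sits inside an async def task.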
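4.2 Celery vs FastAPI BackgroundTasks
| Dimension | BackgroundTasks | Celery |
|---|---|---|
| Where it runs | Same process as the app (event loop for async tasks, threadpool for sync ones) | Separate worker processes (can be distributed) |
| Persistence | None; tasks are lost when the process restarts | Tasks are held in the broker (Redis/RabbitMQ) |
| Retries | Not supported | Built-in retry mechanism (max_retries) |
| Monitoring | Nothing built in | Flower / Admin dashboards |
| Delayed execution | Not supported | apply_async(countdown=...) |
| Task priority | Not supported | Priority queues |
| Typical use | Writing logs, sending notifications (seconds) | Data imports, report generation (minutes to hours) |
| Operational cost | No extra dependencies | Requires deploying a broker and workers |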
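To make the retries row concrete: BackgroundTasks runs a task once and never re-runs it on failure, whereas a queue such as Celery can re-schedule it for you. If a light task only needs a couple of extra attempts, a hand-rolled wrapper is one option. The sketch below is such a wrapper under stated assumptions; `run_with_retry`, `flaky_send` and the delay values are hypothetical and not part of FastAPI or Celery.

```python
import time
from collections.abc import Callable
from typing import Any


def run_with_retry(
    func: Callable[..., Any],
    *args: Any,
    max_retries: int = 3,
    base_delay: float = 0.01,
    **kwargs: Any,
) -> Any:
    """Call func, retrying on failure with exponential backoff.

    Makes at most max_retries + 1 attempts and re-raises the last error.
    """
    for attempt in range(max_retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(base_delay * 2**attempt)  # delay doubles after each failure


attempts: list[int] = []


def flaky_send(to: str) -> str:
    """Hypothetical task that fails twice before succeeding."""
    attempts.append(len(attempts) + 1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return f"sent to {to}"


result = run_with_retry(flaky_send, "user@example.com")
print(result, "after", len(attempts), "attempts")
```

Such a wrapper could be registered with background_tasks.add_task(run_with_retry, flaky_send, email). The retries still vanish if the process restarts, which is the gap a broker-backed queue closes.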
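Decision tree:
```
Must the task survive a process restart?
├── Yes ──> Celery
│
└── No ──> Does the task finish in under 5 seconds?
           ├── Yes ──> BackgroundTasks
           │
           └── No ──> Does it need retries or monitoring?
                      ├── Yes ──> Celery
                      │
                      └── No ──> asyncio.create_task() (keep a reference to the returned task)
```

4.3 Task persistence and monitoring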
In production, BackgroundTasks alone does not give you the observability that tasks need. Common persistence approaches:
Option 1: database + BackgroundTasks
```python
import asyncio
from collections.abc import Coroutine
from typing import Any

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

app: FastAPI = FastAPI()


class TaskRecord(BaseModel):
    id: int
    status: str  # "pending" | "running" | "completed" | "failed"
    result: str | None = None


async def execute_with_tracking(
    task_id: int,
    coro: Coroutine[Any, Any, Any],
) -> None:
    """Run a coroutine and record each state transition."""
    # 1. Mark the task as running
    await update_task_status(task_id, "running")
    try:
        result = await coro
        # 2. Mark the task as completed
        await update_task_status(task_id, "completed", result=str(result))
    except Exception as e:
        # 3. Mark the task as failed
        await update_task_status(task_id, "failed", error=str(e))


async def update_task_status(
    task_id: int,
    status: str,
    result: str | None = None,
    error: str | None = None,
) -> None:
    """Stand-in for a database update."""
    print(f"Task {task_id}: {status} | result={result} | error={error}")


@app.post("/tracked-task/{task_id}")
async def create_tracked_task(
    task_id: int,
    background_tasks: BackgroundTasks,
) -> dict[str, int | str]:
    background_tasks.add_task(
        execute_with_tracking,
        task_id,
        asyncio.sleep(2),  # simulated async job
    )
    return {"task_id": task_id, "status": "pending"}


@app.get("/task/{task_id}")
async def get_task(task_id: int) -> dict[str, str | int]:
    # Placeholder: a real handler would look the status up in the database.
    return {"task_id": task_id, "status": "completed", "result": "done"}
```

Monitoring metrics:
```
┌──────────────────────────────────────────────┐
│               Task Monitoring                │
├──────────────────────────────────────────────┤
│                                              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │  Queue   │  │ Execution│  │  Result  │    │
│  │  Length  │  │ Duration │  │  Status  │    │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘    │
│       │             │             │          │
│       ▼             ▼             ▼          │
│    Backlog       Timeout       Failure       │
│     alert         alert         alert        │
│    (>1000)       (>5min)      (rate>5%)      │
│                                              │
└──────────────────────────────────────────────┘
```

Summary
| Topic | Description |
|---|---|
| BackgroundTasks | Lightweight background tasks |
| add_task | Registers a task |
| Celery | Production-grade task queue |
| AsyncResult | Task status lookup |