
09 - Background Tasks

Python 3.11+

This chapter covers background task processing in FastAPI.


Part 1: BackgroundTasks

1.1 Real-World Scenario

Slow operations such as sending emails or generating reports should not block the HTTP response. They should run in the background.

Question: how do we run a task after the response has been sent?

1.2 Basic Usage

python
from fastapi import FastAPI, BackgroundTasks

app: FastAPI = FastAPI()


def write_log(message: str) -> None:
    with open("log.txt", "a") as f:
        f.write(f"{message}\n")


@app.post("/send-email/{email}")
async def send_email(email: str, background_tasks: BackgroundTasks) -> dict[str, str]:
    background_tasks.add_task(write_log, f"Email sent to {email}")
    return {"message": "Email is being sent"}

1.3 Tasks with Arguments

python
def process_data(data: str, count: int) -> None:
    for i in range(count):
        print(f"Processing {data}: {i+1}/{count}")


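@app.post("/process")
async def process(
    data: str,
    background_tasks: BackgroundTasks,  # injected by type and needs no default, so declare it before defaulted params
    count: int = 5,
) -> dict[str, str]:
    # add_task(func, *args, **kwargs): the extra arguments are passed to func when it runs
    background_tasks.add_task(process_data, data, count)
    return {"message": "Processing started"}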

Part 2: Celery Integration

2.1 Real-World Scenario

When background tasks need persistence, retries, or distributed execution, BackgroundTasks is not enough.

Question: how do we integrate a dedicated task queue?

2.2 Installation

bash
pip install celery redis

2.3 Celery Configuration

python
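# celery_config.py
import time

from celery import Celery

celery_app: Celery = Celery(
    'tasks',
    broker='redis://localhost:6379/0',   # Redis db 0: message broker (the task queue)
    backend='redis://localhost:6379/1'   # Redis db 1: result backend (task states and results)
)


@celery_app.task
def send_email_task(to: str, subject: str, body: str) -> dict[str, str]:
    time.sleep(2)  # simulate a slow SMTP call
    print(f"Email sent to {to}")
    return {"status": "sent", "to": to}


@celery_app.task
def process_data_task(data: list[int]) -> dict[str, int]:
    result: int = sum(data)
    return {"result": result, "count": len(data)}

Start a worker in a separate terminal with celery -A celery_config worker --loglevel=INFO. Without a running worker, queued tasks are never executed and stay PENDING.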

2.4 Using Celery from FastAPI

python
from typing import Any

from fastapi import FastAPI
from celery_config import celery_app, send_email_task, process_data_task

app: FastAPI = FastAPI()


@app.post("/send-email")
async def send_email(email: str, subject: str, body: str) -> dict[str, str]:
    # .delay() only publishes a message to the broker and returns an AsyncResult immediately
    task = send_email_task.delay(email, subject, body)
    return {"task_id": task.id, "status": "queued"}


@app.post("/process")
async def process_data(numbers: list[int]) -> dict[str, str]:
    # list[int] is read from the JSON request body, e.g. [1, 2, 3]
    task = process_data_task.delay(numbers)
    return {"task_id": task.id, "status": "queued"}


@app.get("/task/{task_id}")
async def get_task_status(task_id: str) -> dict[str, Any]:
    task = celery_app.AsyncResult(task_id)
    result: Any = None
    if task.successful():
        result = task.result
    elif task.failed():
        result = str(task.result)  # for a failed task, .result holds the exception object
    return {"task_id": task.id, "status": task.state, "result": result}

Part 3: Complete Example

python
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import time

app: FastAPI = FastAPI()


# ==================== Task functions ====================
def write_notification(email: str, message: str) -> None:
    """Append a notification entry to the log file"""
    with open("notifications.txt", "a") as f:
        f.write(f"Email: {email}, Message: {message}\n")


def send_email_async(email: str, subject: str, body: str) -> None:
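    """Simulate sending an email (a plain def, so Starlette runs it in a worker thread)"""
    time.sleep(2)  # simulate a slow operation; acceptable here because this is not an async def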
    print(f"Email sent: {subject} to {email}")


def process_batch(items: list[dict]) -> list[dict]:
    """Batch processing (when run as a background task, the return value is discarded)"""
    results: list[dict] = []
    for item in items:
        time.sleep(0.5)
        results.append({"processed": item})
    return results


# ==================== Request models and routes ====================
class EmailRequest(BaseModel):
    email: str
    subject: str
    body: str


class ProcessRequest(BaseModel):
    items: list[dict]


@app.post("/notify")
async def notify(email: str, message: str, background_tasks: BackgroundTasks) -> dict[str, str]:
    background_tasks.add_task(write_notification, email, message)
    return {"message": "Notification queued"}


@app.post("/email")
async def send_email(request: EmailRequest, background_tasks: BackgroundTasks) -> dict[str, str]:
    background_tasks.add_task(
        send_email_async,
        request.email,
        request.subject,
        request.body
    )
    return {"message": "Email sending in background"}


@app.post("/batch")
async def process_batch_items(request: ProcessRequest, background_tasks: BackgroundTasks) -> dict[str, str | int]:
    # add_task() returns None: BackgroundTasks provides no task ID or status to poll
    background_tasks.add_task(
        process_batch,
        request.items
    )
    return {"message": "Batch processing started", "item_count": len(request.items)}

Part 4: L3 Expert Level

4.1 How BackgroundTasks Are Scheduled on the Event Loop

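FastAPI's BackgroundTasks builds on Starlette's BackgroundTasks class. The core idea is that tasks run after the response has been sent, but before the ASGI application call for that request returns.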

Underlying flow:

HTTP Request
     │
     ▼
┌───────────────────────────────┐
│   ASGI Application            │
│   (FastAPI / Starlette)       │
│                               │
│  1. Route matching            │
│  2. Dependency injection      │
│  3. Run the endpoint          │
│     └─> tasks.append()        │  ◄── background_tasks.add_task()
└───────────────┬───────────────┘
                │
                ▼
┌───────────────────────────────┐
│   Response sending            │
│   send the HTTP response body │
└───────────────┬───────────────┘
                │
                ▼
┌───────────────────────────────┐
│   BackgroundTasks.__call__()  │  ◄── triggered once the response is sent
│   ─────────────────────────── │
│   for task in tasks:          │
│       if task.is_async:       │
│           await func()        │  ◄── async def: awaited on the event loop
│       else:                   │
│           await run_in_       │
│             threadpool(func)  │  ◄── def: runs in a worker thread
└───────────────┬───────────────┘
                │
                ▼
   ASGI call for the request returns

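The key logic lives in Starlette's Response class. FastAPI attaches the request's BackgroundTasks object to the response as its background attribute, and Response awaits it at the very end of its __call__ method: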

python
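# Simplified sketch of Starlette's Response logic (not the verbatim source)
from starlette.background import BackgroundTask
from starlette.types import Receive, Scope, Send


class Response:
    def __init__(self, content: bytes = b"", status_code: int = 200, background: BackgroundTask | None = None) -> None:
        self.body = content
        self.status_code = status_code
        self.background = background

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        await send({"type": "http.response.start", "status": self.status_code, "headers": []})
        await send({"type": "http.response.body", "body": self.body})
        if self.background is not None:
            await self.background()  # ◄── runs here, after the body has been sent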
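To check the ordering end to end without a server, here is a stdlib-only toy model of the same flow. MiniBackgroundTasks and fake_asgi_call are hypothetical names. asyncio.to_thread stands in for Starlette's run_in_threadpool, and inspect.iscoroutinefunction approximates Starlette's own async-callable check. The model mirrors the behavior described above: add_task only records the call, and the recorded tasks run one by one, in insertion order, after the response body has been sent.

```python
import asyncio
import inspect
from collections.abc import Callable
from typing import Any


class MiniBackgroundTasks:
    """Toy model of Starlette's BackgroundTasks (hypothetical, not the real class)."""

    def __init__(self) -> None:
        self.tasks: list[tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]] = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Only records the call; nothing runs yet and nothing is returned
        self.tasks.append((func, args, kwargs))

    async def __call__(self) -> None:
        # Run tasks one after another, in the order they were added
        for func, args, kwargs in self.tasks:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)  # async def: awaited on the event loop
            else:
                # def: stand-in for starlette.concurrency.run_in_threadpool
                await asyncio.to_thread(func, *args, **kwargs)


events: list[str] = []


def sync_log(msg: str) -> None:
    events.append(f"sync:{msg}")


async def async_log(msg: str) -> None:
    events.append(f"async:{msg}")


async def fake_asgi_call(background: MiniBackgroundTasks) -> None:
    # Mirrors Response.__call__: start, body, then the background work
    events.append("http.response.start")
    events.append("http.response.body")
    await background()


tasks = MiniBackgroundTasks()
tasks.add_task(sync_log, "a")
tasks.add_task(async_log, msg="b")
asyncio.run(fake_asgi_call(tasks))
print(events)  # ['http.response.start', 'http.response.body', 'sync:a', 'async:b']
```

One consequence of this sequential loop is that if a task raises, the tasks queued after it never run.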

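Where this sits in the event loop:

asyncio has no dedicated "background task" stage in a loop iteration. The background work is simply the remainder of the same request-handling coroutine, which continues once the await send(...) for the response body returns:

Request coroutine (one per request)
─────────────────────────────────────────
  │
  ├── await endpoint(...)               run the route handler
  │
  ├── await send(http.response.start)
  ├── await send(http.response.body)    client now has the response
  │
  └── await background()                BackgroundTasks.__call__()
        ├── async def task ──> awaited directly on the event loop
        └── def task       ──> run_in_threadpool (worker thread)

At every await, the loop may switch to other requests' coroutines. Background tasks therefore interleave with normal request handling instead of running in a separate phase.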

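async def tasks run directly on the event loop, so any blocking call inside them (time.sleep, a synchronous database driver, a CPU-heavy loop) blocks the event loop and stalls every other request. Plain def tasks are dispatched to Starlette's threadpool via run_in_threadpool. They do not block the loop, but each one occupies a thread from a limited pool. Keep background tasks light (for example, writing logs), and move CPU-heavy or long-running work to an external queue such as Celery.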
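The following asyncio-only experiment shows the difference. A heartbeat coroutine ticks every 10 ms while a 0.2 s sleep runs. The first run calls time.sleep inside an async def, which blocks the loop. The second runs the same sleep through asyncio.to_thread, used here as a stand-in for Starlette's run_in_threadpool. count_heartbeats and the two task functions are hypothetical names.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def count_heartbeats(job: Callable[[], Awaitable[None]]) -> int:
    """Run `job` while a heartbeat coroutine ticks every 10 ms; return the tick count."""
    ticks = 0
    stop = False

    async def heartbeat() -> None:
        nonlocal ticks
        while not stop:
            ticks += 1
            await asyncio.sleep(0.01)

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat take its first tick
    await job()
    stop = True
    await hb
    return ticks


async def blocking_async_task() -> None:
    # async def + time.sleep: the whole event loop is frozen for 0.2 s
    time.sleep(0.2)


async def sync_task_in_thread() -> None:
    # The same sleep in a worker thread: the event loop stays responsive
    await asyncio.to_thread(time.sleep, 0.2)


blocked = asyncio.run(count_heartbeats(blocking_async_task))
threaded = asyncio.run(count_heartbeats(sync_task_in_thread))
print(f"heartbeats while blocking: {blocked}, while in a thread: {threaded}")
```

Expect the first count to stay at about 1 (only the tick taken before the sleep) and the second to be much larger. Exact numbers depend on the machine.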

4.2 Celery vs FastAPI BackgroundTasks: A Comparison

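| Aspect | BackgroundTasks | Celery |
| --- | --- | --- |
| Where it runs | Same process as the app (async tasks on the event loop, sync tasks in the threadpool) | Separate worker processes (can be distributed) |
| Persistence | None; lost when the process restarts | Messages persisted in the broker (Redis/RabbitMQ) |
| Retries | Not supported | Built-in retry mechanism (max_retries) |
| Monitoring | No built-in monitoring | Visual monitoring via Flower or admin dashboards |
| Delayed execution | Not supported | apply_async(countdown=...) |
| Priority | Not supported | Priority queues |
| Typical use | Writing logs, sending notifications (seconds) | Data imports, report generation (minutes to hours) |
| Operational cost | No extra dependencies | Requires deploying a broker and workers |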
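The retries row is the biggest practical difference. As a rough mental model, and not Celery's implementation (Celery re-publishes the task message to the broker rather than looping in-process), a retried task is run again after a countdown until it either succeeds or has used up max_retries retries. Celery's own default for max_retries is 3. run_with_retries, MaxRetriesExceeded, flaky_send and always_fails are hypothetical names, and the countdown is shortened for the demo.

```python
import time
from collections.abc import Callable
from typing import Any


class MaxRetriesExceeded(Exception):
    """Hypothetical error raised once all retries are used up."""


def run_with_retries(func: Callable[[], Any], max_retries: int = 3, countdown: float = 0.01) -> Any:
    """Toy retry loop: 1 initial attempt plus up to `max_retries` retries."""
    retries = 0
    while True:
        try:
            return func()
        except Exception as exc:
            if retries >= max_retries:
                raise MaxRetriesExceeded(f"gave up after {retries + 1} attempts") from exc
            retries += 1
            time.sleep(countdown)  # wait before the next attempt, like retry(countdown=...)


calls: dict[str, int] = {"n": 0}


def flaky_send() -> str:
    """Fails twice (e.g. the SMTP server is briefly down), then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("SMTP server unavailable")
    return "sent"


def always_fails() -> None:
    raise ConnectionError("still down")


print(run_with_retries(flaky_send), calls["n"])  # sent 3

give_up_message = ""
try:
    run_with_retries(always_fails, max_retries=2, countdown=0)
except MaxRetriesExceeded as err:
    give_up_message = str(err)
print(give_up_message)  # gave up after 3 attempts
```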

Decision tree:

Must the task survive a process restart?
    ├── Yes ──> Celery
    │
    └── No ──> Does the task finish in under 5 seconds?
                 ├── Yes ──> BackgroundTasks
                 │
                 └── No ──> Does it need retries or monitoring?
                              ├── Yes ──> Celery
                              │
                              └── No ──> asyncio.create_task()

4.3 Task Persistence and Monitoring

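In production, BackgroundTasks alone cannot provide task observability: there is no record of what ran, when, or whether it failed. Common persistence approaches: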

Option 1: Database + BackgroundTasks

python
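import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel

app: FastAPI = FastAPI()


class TaskRecord(BaseModel):
    id: int
    status: str  # "pending" | "running" | "completed" | "failed"
    result: str | None = None
    error: str | None = None


# In-memory stand-in for a database table (lost on restart; use a real database in production)
task_db: dict[int, TaskRecord] = {}


async def update_task_status(
    task_id: int,
    status: str,
    result: str | None = None,
    error: str | None = None,
) -> None:
    """Simulate updating the task's database row"""
    task_db[task_id] = TaskRecord(id=task_id, status=status, result=result, error=error)
    print(f"Task {task_id}: {status} | result={result} | error={error}")


async def execute_with_tracking(
    task_id: int,
    job: Callable[[], Awaitable[Any]],
) -> None:
    """Task runner with status tracking"""
    # 1. Mark the task as running
    await update_task_status(task_id, "running")
    try:
        result = await job()
        # 2. Mark the task as completed
        await update_task_status(task_id, "completed", result=str(result))
    except Exception as e:
        # 3. Mark the task as failed
        await update_task_status(task_id, "failed", error=str(e))


@app.post("/tracked-task/{task_id}")
async def create_tracked_task(
    task_id: int,
    background_tasks: BackgroundTasks,
) -> dict[str, int | str]:
    # Record "pending" before responding so the status endpoint can see the task immediately
    await update_task_status(task_id, "pending")
    background_tasks.add_task(
        execute_with_tracking,
        task_id,
        lambda: asyncio.sleep(2, result="done"),  # simulated async job, created only when it runs
    )
    return {"task_id": task_id, "status": "pending"}


@app.get("/task/{task_id}")
async def get_task(task_id: int) -> TaskRecord:
    record = task_db.get(task_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return record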

Monitoring metrics:

┌──────────────────────────────────────────────┐
│              Task Monitoring                 │
├──────────────────────────────────────────────┤
│                                              │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐  │
│  │ Queue    │   │ Execution│   │ Result   │  │
│  │ Length   │   │ Duration │   │ Status   │  │
│  └────┬─────┘   └────┬─────┘   └────┬─────┘  │
│       │              │              │        │
│       ▼              ▼              ▼        │
│  Backlog alert  Timeout alert  Failure alert │
│  (>1000)        (>5min)        (rate>5%)     │
│                                              │
└──────────────────────────────────────────────┘
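The three alert rules in the diagram reduce to simple threshold checks. Here is a sketch that assumes the metrics have already been collected, for example from the task table in Option 1 or from Celery monitoring. TaskMetrics and check_alerts are hypothetical names, and the thresholds are the example values from the diagram, not recommendations.

```python
from dataclasses import dataclass

# Example thresholds from the diagram above; tune them for your own workload
MAX_QUEUE_LENGTH = 1000
MAX_DURATION_SECONDS = 5 * 60
MAX_FAILURE_RATE = 0.05


@dataclass
class TaskMetrics:
    queue_length: int        # tasks waiting to be executed
    durations: list[float]   # run times (seconds) of recently finished tasks
    succeeded: int           # tasks finished successfully in the window
    failed: int              # tasks finished with an error in the window


def check_alerts(m: TaskMetrics) -> list[str]:
    """Return one message per alert rule that fires."""
    alerts: list[str] = []
    if m.queue_length > MAX_QUEUE_LENGTH:
        alerts.append(f"backlog: {m.queue_length} tasks queued")
    slow = [d for d in m.durations if d > MAX_DURATION_SECONDS]
    if slow:
        alerts.append(f"timeout: {len(slow)} task(s) ran longer than {MAX_DURATION_SECONDS} s")
    finished = m.succeeded + m.failed
    if finished and m.failed / finished > MAX_FAILURE_RATE:
        alerts.append(f"failure rate: {m.failed / finished:.1%}")
    return alerts


sample = TaskMetrics(queue_length=1500, durations=[12.0, 420.0], succeeded=90, failed=10)
print(check_alerts(sample))
# ['backlog: 1500 tasks queued', 'timeout: 1 task(s) ran longer than 300 s', 'failure rate: 10.0%']
```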

Summary

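| Topic | Description |
| --- | --- |
| BackgroundTasks | Lightweight background tasks |
| add_task | Adds a task |
| Celery | Production-grade task queue |
| AsyncResult | Task status |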