Skip to content

10-缓存与异步任务

Python 3.11+

本章讲解 Flask 缓存机制和 Celery 异步任务。


第一部分:Flask-Caching

1.1 实际场景

文章列表页面查询数据库耗时较长,每次请求都查询会影响性能。这些数据不需要每次都实时更新。

问题:如何缓存数据减少数据库查询?

1.2 安装与配置

bash
uv add flask-caching redis
python
from flask import Flask
from flask_caching import Cache

app: Flask = Flask(__name__)

# 配置缓存后端
app.config["CACHE_TYPE"] = "redis"
app.config["CACHE_REDIS_HOST"] = "localhost"
app.config["CACHE_REDIS_PORT"] = 6379
app.config["CACHE_REDIS_DB"] = 0

cache: Cache = Cache(app)

缓存后端选项:

类型配置说明
simple默认内存缓存,单进程
redisCACHE_REDIS_HOSTRedis,生产推荐
memcachedCACHE_MEMCACHED_SERVERSMemcached
filesystemCACHE_DIR文件系统
null-无缓存(用于禁用)

第二部分:视图缓存

2.1 实际场景

首页文章列表每 5 分钟更新一次,可以缓存视图响应减少数据库压力。

问题:如何缓存整个视图的响应?

2.2 缓存视图

使用 @cache.cached 装饰器缓存整个视图的响应。

python
from flask_caching import cached
from typing import Any

# 缓存视图 5 分钟
@app.route("/api/articles")
@cached(timeout=300)
def get_articles() -> dict[str, Any]:
    # 模拟耗时操作
    articles: list[Article] = Article.query.all()
    return {"articles": [a.to_dict() for a in articles]}


# 带参数的缓存
@app.route("/api/articles/<int:article_id>")
@cached(timeout=600)
def get_article(article_id: int) -> dict[str, Any]:
    article: Article = Article.query.get_or_404(article_id)
    return article.to_dict()


# 自定义缓存键
@app.route("/api/user/<int:user_id>")
@cached(timeout=300, key_prefix="user_data")
def get_user(user_id: int) -> dict[str, Any]:
    user: User = User.query.get_or_404(user_id)
    return user.to_dict()

第三部分:数据缓存

3.1 实际场景

计算统计数据需要查询大量数据,结果可以缓存一段时间。

问题:如何手动缓存任意数据?

3.2 手动缓存

使用 cache.get()cache.set() 手动缓存任意数据。

python
from flask_caching import Cache
from typing import Any

cache: Cache = Cache()


def get_expensive_data() -> Any:
    """缓存计算结果"""
    # 尝试从缓存获取
    data: Any = cache.get("expensive_data")

    if data is None:
        # 缓存未命中,执行耗时操作
        data = compute_expensive_data()
        # 存入缓存,10 分钟过期
        cache.set("expensive_data", data, timeout=600)

    return data


# 缓存删除
@app.route("/api/cache/clear", methods=["POST"])
def clear_cache() -> dict[str, str]:
    cache.clear()
    return {"message": "缓存已清除"}

第四部分:Celery 集成

4.1 实际场景

发送邮件、处理图片等耗时操作会阻塞请求,用户体验差。

问题:如何异步执行耗时任务?

4.2 安装

bash
uv add celery redis

4.3 项目结构

app/
├── celery_app.py      # Celery 配置
├── tasks.py           # 任务定义
└── routes.py          # 视图路由

4.4 Celery 配置

python
# celery_app.py
from celery import Celery
from flask import Flask


def make_celery(app: Flask) -> Celery:
    celery: Celery = Celery(
        app.import_name,
        broker=app.config["CELERY_BROKER_URL"],
        backend=app.config["CELERY_RESULT_BACKEND"]
    )

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs) -> Any:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

4.5 任务定义

python
# tasks.py
from celery import current_task
import time
from typing import Any

celery: Celery = Celery()


@celery.task
def send_email(to: str, subject: str, body: str) -> bool:
    """发送邮件任务"""
    time.sleep(2)  # 模拟发送延迟
    print(f"发送邮件到 {to}: {subject}")
    return True


@celery.task
def process_image(image_path: str) -> str:
    """处理图片任务"""
    time.sleep(5)
    return f"处理完成:{image_path}"


@celery.task(bind=True)
def long_running_task(self, data: Any) -> str:
    """带进度追踪的长任务"""
    for i in range(10):
        time.sleep(1)
        # 更新任务状态
        self.update_state(state="PROGRESS", meta={"current": i + 1, "total": 10})
    return "完成"

第五部分:任务队列使用

5.1 实际场景

用户提交邮件发送请求后,立即返回响应,邮件在后台发送。

问题:如何异步执行任务并追踪状态?

5.2 异步执行任务

python
from flask import Blueprint, request, jsonify
from typing import Any

api_bp: Blueprint = Blueprint("api", __name__)


@api_bp.route("/api/send-email", methods=["POST"])
def queue_email() -> tuple[dict[str, Any], int]:
    """异步发送邮件"""
    data: dict[str, Any] = request.get_json()

    # 异步执行,立即返回
    task = send_email.delay(
        to=data["to"],
        subject=data["subject"],
        body=data["body"]
    )

    return {
        "message": "邮件已加入队列",
        "task_id": task.id
    }, 202


@api_bp.route("/api/task-status/<task_id>")
def task_status(task_id: str) -> tuple[dict[str, Any], int]:
    """查询任务状态"""
    from celery.result import AsyncResult

    task: AsyncResult = AsyncResult(task_id)

    if task.state == "PENDING":
        response: dict[str, Any] = {"state": task.state, "status": "任务等待执行"}
    elif task.state == "PROGRESS":
        response = {
            "state": task.state,
            "progress": task.info.get("current", 0),
            "total": task.info.get("total", 0)
        }
    elif task.state == "SUCCESS":
        response = {"state": task.state, "result": task.result}
    else:
        response = {"state": task.state, "status": str(task.info)}

    status_code: int = 200 if task.state == "SUCCESS" else 202
    return jsonify(response), status_code

第六部分:L3 专家层

6.1 缓存策略:LRU / LFU / 随机

缓存的核心问题是 淘汰策略 — 当缓存空间满时,决定移除哪些数据。

淘汰算法对比

+-------------------------------------------------------------------+
|                        缓存淘汰策略                                 |
+-------------------------------------------------------------------+
|                                                                   |
|  LRU (Least Recently Used)    最近最少使用                         |
|  ┌───┐  ┌───┐  ┌───┐  ┌───┐                                     |
|  | A |->| B |->| C |->| D |  ← 最近访问                           |
|  └───┘  └───┘  └───┘  └───┘                                     |
|   ↑ 淘汰最久未访问的                                                |
|                                                                   |
|  LFU (Least Frequently Used)  最少频率使用                         |
|  ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐                          |
|  |A:1次 |  |B:5次  |  |C:3次  |  |D:10次 |                         |
|  └──────┘  └──────┘  └──────┘  └──────┘                          |
|   ↑ 淘汰访问次数最少的                                              |
|                                                                   |
|  Random                      随机淘汰                              |
|  ┌───┐  ┌───┐  ┌───┐  ┌───┐                                     |
|  | A |  | B |  | C |  | D |  → 随机选一个移除                       |
|  └───┘  └───┘  └───┘  └───┘                                     |
|                                                                   |
+-------------------------------------------------------------------+
python
from collections import OrderedDict
from typing import TypeVar, Generic

T = TypeVar("T")

class LRUCache(Generic[T]):
    """基于 OrderedDict 的 LRU 缓存实现"""

    def __init__(self, capacity: int) -> None:
        self._cache: OrderedDict[str, T] = OrderedDict()
        self._capacity: int = capacity

    def get(self, key: str) -> T | None:
        if key not in self._cache:
            return None
        # 访问时移到末尾(最近使用)
        self._cache.move_to_end(key)
        return self._cache[key]

    def put(self, key: str, value: T) -> None:
        if key in self._cache:
            self._cache.move_to_end(key)
        self._cache[key] = value
        if len(self._cache) > self._capacity:
            # 淘汰最久未使用的(队首)
            self._cache.popitem(last=False)

性能考量

策略查找复杂度插入复杂度淘汰复杂度适用场景
LRUO(1)O(1)O(1)时间局部性强(最近访问 ≈ 未来访问)
LFUO(1)O(1)O(n) 或 O(log n)频率局部性强(热点数据长期稳定)
RandomO(1)O(1)O(1)简单场景,无明确访问模式

Redis 实际使用 近似 LRU(采样少量 key 比较访问时间),以 O(1) 复杂度近似精确 LRU 效果。

设计动机

策略选择为什么
LRU 最常用实现简单,对大多数 Web 场景效果好
LFU 用于热点防止"扫描污染"(一次性大量访问淘汰掉真正的热点)
Random 保底无需维护额外数据结构,内存开销最小

6.2 Celery 的消息队列架构

Celery 的核心是三组件架构:Broker → Worker → Result Backend

架构图

+------------------+     +-------------------+     +---------------------+
|   Flask App      |     |   Message Broker  |     |   Celery Worker     |
|   (Producer)     |     |   (Redis/RabbitMQ)|     |   (Consumer)        |
|                  |     |                   |     |                     |
|  task.delay()    |---->|  ┌─────────────┐  |---->|  worker.process()   |
|  → 消息序列化     |     |  │ Queue: celery│  |     |  反序列化任务        |
|  → 推送到 Broker  |     |  │ - msg_1      │  |     |  执行函数            |
+------------------+     |  │ - msg_2      │  |     |  写回结果            |
                         |  └─────────────┘  |     +----------+----------+
                         +-------------------+                |
                                                              |
                                                    +---------v----------+
                                                    |  Result Backend    |
                                                    |  (Redis/DB/RPC)    |
                                                    |                    |
                                                    |  task_id → result  |
                                                    |  status → SUCCESS  |
                                                    +--------------------+
python
from celery import Celery, Task
from typing import Any

# Broker 负责存储待执行任务的消息
# Result Backend 负责存储任务执行结果
celery_app: Celery = Celery(
    "worker",
    broker="redis://localhost:6379/0",        # 消息队列
    backend="redis://localhost:6379/1",       # 结果存储
)

# 消息体结构(简化)
# {
#   "id": "uuid-xxx",
#   "task": "tasks.send_email",
#   "args": ["user@example.com", "Hello", "Body"],
#   "kwargs": {},
#   "retries": 0,
#   "eta": null,                           # 预计执行时间(定时任务)
#   "expires": null,                       # 过期时间
#   "queue": "celery",                     # 队列名称
# }

多队列路由

python
# 不同任务路由到不同队列
celery_app.conf.task_routes = {
    "tasks.send_email":    {"queue": "email"},
    "tasks.process_image": {"queue": "image"},
    "tasks.export_report": {"queue": "report"},
}

# 启动时指定消费的队列
# celery -A app worker -Q email,image

设计动机

组件为什么分离
Broker解耦生产者与消费者,提供消息持久化和可靠性
Worker独立进程,可水平扩展,失败不影响主应用
Result Backend查询结果不需要与 Worker 直接通信
多队列不同任务优先级和资源需求不同,可独立调度

6.3 异步任务的可靠性保证

生产环境中,任务可能失败、超时、或 Worker 崩溃。Celery 提供了多层可靠性机制。

可靠性机制

+---------------------------------------------------------------+
|                    任务可靠性保障层级                            |
+---------------------------------------------------------------+
|                                                               |
|  1. 重试 (Retry)                                              |
|     ┌──────────┐    fail    ┌──────────┐    fail    ┌───────┐|
|     │ 第1次执行 │───────────>│ 第2次执行 │───────────>│ 第3次 │|
|     └──────────┘  backoff   └──────────┘  backoff   └───┬───┘|
|                                                         |    |
|  2. 死信队列 (Dead Letter Queue)                         |    |
|     超过最大重试次数 ──────────────────────────────────>│    |
|                                                        \|/   |
|                                               ┌──────────────┐|
|                                               │ Dead Letter  │|
|                                               │ Queue        │|
|                                               │ (人工干预)    │|
|                                               └──────────────┘|
|                                                               |
|  3. ACK 机制 (Acknowledgment)                                 |
|     Worker 取走任务 → 执行成功 → 发送 ACK → Broker 删除消息    |
|     Worker 崩溃未 ACK → Broker 重新投递到其他 Worker           |
|                                                               |
+---------------------------------------------------------------+
python
from celery import Celery
from celery.exceptions import Retry
from typing import Any

celery_app: Celery = Celery("worker", broker="redis://localhost:6379/0")

@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,           # 默认重试间隔 60 秒
    acks_late=True,                   # 任务完成后才 ACK(防止 Worker 崩溃丢失)
    reject_on_worker_lost=True,       # Worker 丢失时重新入队
)
def send_notification(self: Task, user_id: int, message: str) -> bool:
    """带可靠性保障的通知发送任务"""
    try:
        result: bool = _send_to_provider(user_id, message)
        return result
    except ConnectionError as exc:
        # 指数退避重试:60s → 120s → 240s
        countdown: int = self.default_retry_delay * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=countdown)
    except ValueError as exc:
        # 参数错误不重试,直接失败
        raise exc


@celery_app.task(bind=True, max_retries=5)
def process_payment(self: Task, order_id: str, amount: float) -> dict[str, Any]:
    """支付处理 — 使用指数退避 + 抖动"""
    import random
    try:
        return _execute_payment(order_id, amount)
    except TimeoutError as exc:
        # 指数退避 + 随机抖动(避免重试风暴)
        base_delay: int = 60
        jitter: float = random.uniform(0.5, 1.5)
        countdown: float = base_delay * (2 ** self.request.retries) * jitter
        raise self.retry(exc=exc, countdown=int(countdown))

性能考量

机制开销效果适用场景
重试额外消息 + Worker 负载临时故障自动恢复网络超时、第三方服务不稳
ACK Late消息在队列停留更久防止 Worker 崩溃丢失关键业务(支付、通知)
死信队列需要监控和人工处理失败任务不丢失、可追溯所有生产任务
指数退避延迟完成避免重试风暴下游服务恢复中

设计动机

设计解决的问题
acks_late=True默认 ACK 在任务执行前,Worker 崩溃会导致任务丢失
reject_on_worker_lost区分"任务失败"和"Worker 崩溃",后者应重新投递
死信队列防止无限重试消耗资源,提供人工介入入口
指数退避 + 抖动防止大量任务同时重试造成下游服务雪崩

6.4 知识关联

+---------------------+          +------------------------+          +--------------------+
| 缓存淘汰策略         |          | Celery 消息架构         |          | 任务可靠性          |
+---------------------+          +------------------------+          +--------------------+
|  LRU: 时间局部性    |          |  Broker: 消息持久化     |          |  Retry: 自动恢复    |
|  LFU: 频率局部性    |--------->|  Worker: 隔离执行       |--------->|  ACK: 消息确认      |
|  Random: 简单保底   |          |  Backend: 结果存储      |          |  DLQ: 人工介入      |
|  Redis 近似 LRU    |          |  多队列路由             |          |  指数退避 + 抖动    |
+---------------------+          +------------------------+          +--------------------+
         |                                 |                                 |
         v                                 v                                 v
+---------------------------------------------------------------------------+
|                          设计目标                                          |
|  降低延迟 · 提高吞吐 · 容错恢复 · 资源可控 · 可观测性                      |
+---------------------------------------------------------------------------+

总结

知识点说明
Flask-Caching缓存扩展
@cached视图缓存装饰器
cache.set/get手动缓存
Celery异步任务队列
task.delay()异步执行任务
AsyncResult任务状态查询