10-缓存与异步任务
Python 3.11+
本章讲解 Flask 缓存机制和 Celery 异步任务。
第一部分:Flask-Caching
1.1 实际场景
文章列表页面查询数据库耗时较长,每次请求都查询会影响性能。这些数据不需要每次都实时更新。
问题:如何缓存数据减少数据库查询?
1.2 安装与配置
bash
uv add flask-caching redispython
from flask import Flask
from flask_caching import Cache
app: Flask = Flask(__name__)
# 配置缓存后端
app.config["CACHE_TYPE"] = "redis"
app.config["CACHE_REDIS_HOST"] = "localhost"
app.config["CACHE_REDIS_PORT"] = 6379
app.config["CACHE_REDIS_DB"] = 0
cache: Cache = Cache(app)缓存后端选项:
| 类型 | 配置 | 说明 |
|---|---|---|
simple | 默认 | 内存缓存,单进程 |
redis | CACHE_REDIS_HOST | Redis,生产推荐 |
memcached | CACHE_MEMCACHED_SERVERS | Memcached |
filesystem | CACHE_DIR | 文件系统 |
null | - | 无缓存(用于禁用) |
第二部分:视图缓存
2.1 实际场景
首页文章列表每 5 分钟更新一次,可以缓存视图响应减少数据库压力。
问题:如何缓存整个视图的响应?
2.2 缓存视图
使用 @cache.cached 装饰器缓存整个视图的响应。
python
from flask_caching import cached
from typing import Any
# 缓存视图 5 分钟
@app.route("/api/articles")
@cached(timeout=300)
def get_articles() -> dict[str, Any]:
# 模拟耗时操作
articles: list[Article] = Article.query.all()
return {"articles": [a.to_dict() for a in articles]}
# 带参数的缓存
@app.route("/api/articles/<int:article_id>")
@cached(timeout=600)
def get_article(article_id: int) -> dict[str, Any]:
article: Article = Article.query.get_or_404(article_id)
return article.to_dict()
# 自定义缓存键
@app.route("/api/user/<int:user_id>")
@cached(timeout=300, key_prefix="user_data")
def get_user(user_id: int) -> dict[str, Any]:
user: User = User.query.get_or_404(user_id)
return user.to_dict()第三部分:数据缓存
3.1 实际场景
计算统计数据需要查询大量数据,结果可以缓存一段时间。
问题:如何手动缓存任意数据?
3.2 手动缓存
使用 cache.get() 和 cache.set() 手动缓存任意数据。
python
from flask_caching import Cache
from typing import Any
cache: Cache = Cache()
def get_expensive_data() -> Any:
"""缓存计算结果"""
# 尝试从缓存获取
data: Any = cache.get("expensive_data")
if data is None:
# 缓存未命中,执行耗时操作
data = compute_expensive_data()
# 存入缓存,10 分钟过期
cache.set("expensive_data", data, timeout=600)
return data
# 缓存删除
@app.route("/api/cache/clear", methods=["POST"])
def clear_cache() -> dict[str, str]:
cache.clear()
return {"message": "缓存已清除"}第四部分:Celery 集成
4.1 实际场景
发送邮件、处理图片等耗时操作会阻塞请求,用户体验差。
问题:如何异步执行耗时任务?
4.2 安装
bash
uv add celery redis4.3 项目结构
app/
├── celery_app.py # Celery 配置
├── tasks.py # 任务定义
└── routes.py # 视图路由4.4 Celery 配置
python
# celery_app.py
from celery import Celery
from flask import Flask
def make_celery(app: Flask) -> Celery:
celery: Celery = Celery(
app.import_name,
broker=app.config["CELERY_BROKER_URL"],
backend=app.config["CELERY_RESULT_BACKEND"]
)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs) -> Any:
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery4.5 任务定义
python
# tasks.py
from celery import current_task
import time
from typing import Any
celery: Celery = Celery()
@celery.task
def send_email(to: str, subject: str, body: str) -> bool:
"""发送邮件任务"""
time.sleep(2) # 模拟发送延迟
print(f"发送邮件到 {to}: {subject}")
return True
@celery.task
def process_image(image_path: str) -> str:
"""处理图片任务"""
time.sleep(5)
return f"处理完成:{image_path}"
@celery.task(bind=True)
def long_running_task(self, data: Any) -> str:
"""带进度追踪的长任务"""
for i in range(10):
time.sleep(1)
# 更新任务状态
self.update_state(state="PROGRESS", meta={"current": i + 1, "total": 10})
return "完成"第五部分:任务队列使用
5.1 实际场景
用户提交邮件发送请求后,立即返回响应,邮件在后台发送。
问题:如何异步执行任务并追踪状态?
5.2 异步执行任务
python
from flask import Blueprint, request, jsonify
from typing import Any
api_bp: Blueprint = Blueprint("api", __name__)
@api_bp.route("/api/send-email", methods=["POST"])
def queue_email() -> tuple[dict[str, Any], int]:
"""异步发送邮件"""
data: dict[str, Any] = request.get_json()
# 异步执行,立即返回
task = send_email.delay(
to=data["to"],
subject=data["subject"],
body=data["body"]
)
return {
"message": "邮件已加入队列",
"task_id": task.id
}, 202
@api_bp.route("/api/task-status/<task_id>")
def task_status(task_id: str) -> tuple[dict[str, Any], int]:
"""查询任务状态"""
from celery.result import AsyncResult
task: AsyncResult = AsyncResult(task_id)
if task.state == "PENDING":
response: dict[str, Any] = {"state": task.state, "status": "任务等待执行"}
elif task.state == "PROGRESS":
response = {
"state": task.state,
"progress": task.info.get("current", 0),
"total": task.info.get("total", 0)
}
elif task.state == "SUCCESS":
response = {"state": task.state, "result": task.result}
else:
response = {"state": task.state, "status": str(task.info)}
status_code: int = 200 if task.state == "SUCCESS" else 202
return jsonify(response), status_code第六部分:L3 专家层
6.1 缓存策略:LRU / LFU / 随机
缓存的核心问题是 淘汰策略 — 当缓存空间满时,决定移除哪些数据。
淘汰算法对比
+-------------------------------------------------------------------+
| 缓存淘汰策略 |
+-------------------------------------------------------------------+
| |
| LRU (Least Recently Used) 最近最少使用 |
| ┌───┐ ┌───┐ ┌───┐ ┌───┐ |
| | A |->| B |->| C |->| D | ← 最近访问 |
| └───┘ └───┘ └───┘ └───┘ |
| ↑ 淘汰最久未访问的 |
| |
| LFU (Least Frequently Used) 最少频率使用 |
| ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ |
| |A:1次 | |B:5次 | |C:3次 | |D:10次 | |
| └──────┘ └──────┘ └──────┘ └──────┘ |
| ↑ 淘汰访问次数最少的 |
| |
| Random 随机淘汰 |
| ┌───┐ ┌───┐ ┌───┐ ┌───┐ |
| | A | | B | | C | | D | → 随机选一个移除 |
| └───┘ └───┘ └───┘ └───┘ |
| |
+-------------------------------------------------------------------+python
from collections import OrderedDict
from typing import TypeVar, Generic
T = TypeVar("T")
class LRUCache(Generic[T]):
"""基于 OrderedDict 的 LRU 缓存实现"""
def __init__(self, capacity: int) -> None:
self._cache: OrderedDict[str, T] = OrderedDict()
self._capacity: int = capacity
def get(self, key: str) -> T | None:
if key not in self._cache:
return None
# 访问时移到末尾(最近使用)
self._cache.move_to_end(key)
return self._cache[key]
def put(self, key: str, value: T) -> None:
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = value
if len(self._cache) > self._capacity:
# 淘汰最久未使用的(队首)
self._cache.popitem(last=False)性能考量
| 策略 | 查找复杂度 | 插入复杂度 | 淘汰复杂度 | 适用场景 |
|---|---|---|---|---|
| LRU | O(1) | O(1) | O(1) | 时间局部性强(最近访问 ≈ 未来访问) |
| LFU | O(1) | O(1) | O(n) 或 O(log n) | 频率局部性强(热点数据长期稳定) |
| Random | O(1) | O(1) | O(1) | 简单场景,无明确访问模式 |
Redis 实际使用 近似 LRU(采样少量 key 比较访问时间),以 O(1) 复杂度近似精确 LRU 效果。
设计动机
| 策略选择 | 为什么 |
|---|---|
| LRU 最常用 | 实现简单,对大多数 Web 场景效果好 |
| LFU 用于热点 | 防止"扫描污染"(一次性大量访问淘汰掉真正的热点) |
| Random 保底 | 无需维护额外数据结构,内存开销最小 |
6.2 Celery 的消息队列架构
Celery 的核心是三组件架构:Broker → Worker → Result Backend。
架构图
+------------------+ +-------------------+ +---------------------+
| Flask App | | Message Broker | | Celery Worker |
| (Producer) | | (Redis/RabbitMQ)| | (Consumer) |
| | | | | |
| task.delay() |---->| ┌─────────────┐ |---->| worker.process() |
| → 消息序列化 | | │ Queue: celery│ | | 反序列化任务 |
| → 推送到 Broker | | │ - msg_1 │ | | 执行函数 |
+------------------+ | │ - msg_2 │ | | 写回结果 |
| └─────────────┘ | +----------+----------+
+-------------------+ |
|
+---------v----------+
| Result Backend |
| (Redis/DB/RPC) |
| |
| task_id → result |
| status → SUCCESS |
+--------------------+python
from celery import Celery, Task
from typing import Any
# Broker 负责存储待执行任务的消息
# Result Backend 负责存储任务执行结果
celery_app: Celery = Celery(
"worker",
broker="redis://localhost:6379/0", # 消息队列
backend="redis://localhost:6379/1", # 结果存储
)
# 消息体结构(简化)
# {
# "id": "uuid-xxx",
# "task": "tasks.send_email",
# "args": ["user@example.com", "Hello", "Body"],
# "kwargs": {},
# "retries": 0,
# "eta": null, # 预计执行时间(定时任务)
# "expires": null, # 过期时间
# "queue": "celery", # 队列名称
# }多队列路由
python
# 不同任务路由到不同队列
celery_app.conf.task_routes = {
"tasks.send_email": {"queue": "email"},
"tasks.process_image": {"queue": "image"},
"tasks.export_report": {"queue": "report"},
}
# 启动时指定消费的队列
# celery -A app worker -Q email,image设计动机
| 组件 | 为什么分离 |
|---|---|
| Broker | 解耦生产者与消费者,提供消息持久化和可靠性 |
| Worker | 独立进程,可水平扩展,失败不影响主应用 |
| Result Backend | 查询结果不需要与 Worker 直接通信 |
| 多队列 | 不同任务优先级和资源需求不同,可独立调度 |
6.3 异步任务的可靠性保证
生产环境中,任务可能失败、超时、或 Worker 崩溃。Celery 提供了多层可靠性机制。
可靠性机制
+---------------------------------------------------------------+
| 任务可靠性保障层级 |
+---------------------------------------------------------------+
| |
| 1. 重试 (Retry) |
| ┌──────────┐ fail ┌──────────┐ fail ┌───────┐|
| │ 第1次执行 │───────────>│ 第2次执行 │───────────>│ 第3次 │|
| └──────────┘ backoff └──────────┘ backoff └───┬───┘|
| | |
| 2. 死信队列 (Dead Letter Queue) | |
| 超过最大重试次数 ──────────────────────────────────>│ |
| \|/ |
| ┌──────────────┐|
| │ Dead Letter │|
| │ Queue │|
| │ (人工干预) │|
| └──────────────┘|
| |
| 3. ACK 机制 (Acknowledgment) |
| Worker 取走任务 → 执行成功 → 发送 ACK → Broker 删除消息 |
| Worker 崩溃未 ACK → Broker 重新投递到其他 Worker |
| |
+---------------------------------------------------------------+python
from celery import Celery
from celery.exceptions import Retry
from typing import Any
celery_app: Celery = Celery("worker", broker="redis://localhost:6379/0")
@celery_app.task(
bind=True,
max_retries=3,
default_retry_delay=60, # 默认重试间隔 60 秒
acks_late=True, # 任务完成后才 ACK(防止 Worker 崩溃丢失)
reject_on_worker_lost=True, # Worker 丢失时重新入队
)
def send_notification(self: Task, user_id: int, message: str) -> bool:
"""带可靠性保障的通知发送任务"""
try:
result: bool = _send_to_provider(user_id, message)
return result
except ConnectionError as exc:
# 指数退避重试:60s → 120s → 240s
countdown: int = self.default_retry_delay * (2 ** self.request.retries)
raise self.retry(exc=exc, countdown=countdown)
except ValueError as exc:
# 参数错误不重试,直接失败
raise exc
@celery_app.task(bind=True, max_retries=5)
def process_payment(self: Task, order_id: str, amount: float) -> dict[str, Any]:
"""支付处理 — 使用指数退避 + 抖动"""
import random
try:
return _execute_payment(order_id, amount)
except TimeoutError as exc:
# 指数退避 + 随机抖动(避免重试风暴)
base_delay: int = 60
jitter: float = random.uniform(0.5, 1.5)
countdown: float = base_delay * (2 ** self.request.retries) * jitter
raise self.retry(exc=exc, countdown=int(countdown))性能考量
| 机制 | 开销 | 效果 | 适用场景 |
|---|---|---|---|
| 重试 | 额外消息 + Worker 负载 | 临时故障自动恢复 | 网络超时、第三方服务不稳 |
| ACK Late | 消息在队列停留更久 | 防止 Worker 崩溃丢失 | 关键业务(支付、通知) |
| 死信队列 | 需要监控和人工处理 | 失败任务不丢失、可追溯 | 所有生产任务 |
| 指数退避 | 延迟完成 | 避免重试风暴 | 下游服务恢复中 |
设计动机
| 设计 | 解决的问题 |
|---|---|
acks_late=True | 默认 ACK 在任务执行前,Worker 崩溃会导致任务丢失 |
reject_on_worker_lost | 区分"任务失败"和"Worker 崩溃",后者应重新投递 |
| 死信队列 | 防止无限重试消耗资源,提供人工介入入口 |
| 指数退避 + 抖动 | 防止大量任务同时重试造成下游服务雪崩 |
6.4 知识关联
+---------------------+ +------------------------+ +--------------------+
| 缓存淘汰策略 | | Celery 消息架构 | | 任务可靠性 |
+---------------------+ +------------------------+ +--------------------+
| LRU: 时间局部性 | | Broker: 消息持久化 | | Retry: 自动恢复 |
| LFU: 频率局部性 |--------->| Worker: 隔离执行 |--------->| ACK: 消息确认 |
| Random: 简单保底 | | Backend: 结果存储 | | DLQ: 人工介入 |
| Redis 近似 LRU | | 多队列路由 | | 指数退避 + 抖动 |
+---------------------+ +------------------------+ +--------------------+
| | |
v v v
+---------------------------------------------------------------------------+
| 设计目标 |
| 降低延迟 · 提高吞吐 · 容错恢复 · 资源可控 · 可观测性 |
+---------------------------------------------------------------------------+总结
| 知识点 | 说明 |
|---|---|
| Flask-Caching | 缓存扩展 |
| @cached | 视图缓存装饰器 |
| cache.set/get | 手动缓存 |
| Celery | 异步任务队列 |
| task.delay() | 异步执行任务 |
| AsyncResult | 任务状态查询 |