05-认证授权
Python 3.11+
本章讲解 FastAPI 用户认证和权限管理。
第一部分:JWT 认证
1.1 实际场景
API 需要用户登录认证,使用 JWT Token 方式进行无状态认证。
问题:如何实现 JWT Token 认证?
1.2 安装
bash
pip install python-jose passlib[bcrypt]1.3 配置
python
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
app: FastAPI = FastAPI()
# 配置
SECRET_KEY: str = "your-secret-key"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# 密码哈希
pwd_context: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2
oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="token")
# 工具函数
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict) -> str:
to_encode: dict = data.copy()
expire: datetime = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)第二部分:用户模型
2.1 实际场景
用户注册时需要用户名、密码、邮箱,响应时不返回密码。
问题:如何定义用户相关的 Pydantic 模型?
2.2 Pydantic 模型
python
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
username: str
email: EmailStr
class UserCreate(UserBase):
password: str
class UserOut(UserBase):
id: int
is_active: bool
model_config = ConfigDict(from_attributes=True)
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None第三部分:认证流程
3.1 实际场景
用户登录后获取 Token,后续请求携带 Token 访问受保护资源。
问题:如何实现登录和 Token 验证?
3.2 登录接口
python
from fastapi import HTTPException
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> Token:
user: dict | None = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token: str = create_access_token(data={"sub": user["username"]})
return Token(access_token=access_token, token_type="bearer")
def authenticate_user(username: str, password: str) -> dict | None:
user: dict | None = get_user(username)
if not user:
return None
if not verify_password(password, user["hashed_password"]):
return None
return user3.3 受保护路由
python
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload: dict = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str | None = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user: dict | None = get_user(username)
if user is None:
raise credentials_exception
return user
@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)) -> dict:
return current_user第四部分:角色权限
4.1 实际场景
管理员可以访问所有用户列表,普通用户不能访问。
问题:如何实现基于角色的权限控制?
4.2 角色检查
python
class RoleChecker:
def __init__(self, allowed_roles: list[str]):
self.allowed_roles: list[str] = allowed_roles
def __call__(self, current_user: dict = Depends(get_current_user)) -> dict:
if current_user.get("role") not in self.allowed_roles:
raise HTTPException(status_code=403, detail="Operation not permitted")
return current_user
# 使用
@app.get("/admin/users")
async def get_all_users(current_user: dict = Depends(RoleChecker(["admin"]))) -> list[dict]:
return list(users_db.values())第五部分:完整示例
python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Any
app: FastAPI = FastAPI()
# 配置
SECRET_KEY: str = "secret-key-change-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
pwd_context: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="token")
# 模拟数据库
users_db: dict[str, dict[str, Any]] = {
"john": {
"username": "john",
"email": "john@example.com",
"hashed_password": pwd_context.hash("secret123"),
"is_active": True,
"role": "user"
},
"admin": {
"username": "admin",
"email": "admin@example.com",
"hashed_password": pwd_context.hash("admin123"),
"is_active": True,
"role": "admin"
}
}
# 工具函数
def get_user(username: str) -> dict | None:
return users_db.get(username)
def create_access_token(data: dict) -> str:
to_encode: dict = data.copy()
expire: datetime = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
try:
payload: dict = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str | None = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
user: dict | None = get_user(username)
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
# 路由
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict[str, str]:
user: dict | None = get_user(form_data.username)
if not user or not pwd_context.verify(form_data.password, user["hashed_password"]):
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token: str = create_access_token(data={"sub": user["username"], "role": user["role"]})
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)) -> dict:
return current_user
@app.get("/protected")
async def protected_route(current_user: dict = Depends(get_current_user)) -> dict[str, str]:
return {"message": f"Hello {current_user['username']}", "role": current_user["role"]}
@app.get("/admin-only")
async def admin_only(current_user: dict = Depends(get_current_user)) -> dict[str, str]:
if current_user["role"] != "admin":
raise HTTPException(status_code=403, detail="Admin access required")
return {"message": "Welcome admin"}第六部分:L3 专家层
6.1 OAuth2 授权码流程 vs 密码流程
FastAPI 的 OAuth2PasswordBearer 实现的是 OAuth2 密码流程(Resource Owner Password Credentials Grant),适用于前后端一体或可信客户端。现代 SPA/移动端应使用 授权码流程(Authorization Code Grant + PKCE)。
┌──────────┐ 密码流程 ┌──────────┐
│ Client │──POST /token───────────────→│ Server │
│ │ (username + password) │ │
│ │←── access_token ────────────│ │
└──────────┘ └──────────┘
┌──────────┐ 授权码流程 ┌──────────┐
│ Client │──GET /authorize────────────→│ Server │
│ │←── code ───────────────────│ │
│ │──POST /token───────────────→│ │
│ │ (code + PKCE verifier) │ │
│ │←── access_token + refresh ──│ │
└──────────┘ └──────────┘| 对比维度 | 密码流程(Password Grant) | 授权码流程 + PKCE |
|---|---|---|
| 密码暴露 | 客户端直接持有用户密码 | 密码永不离开认证服务器 |
| 适用场景 | 内部系统、CLI 工具 | SPA、移动端、第三方应用 |
| Token 刷新 | 无 refresh_token | 支持 refresh_token 轮换 |
| 安全性 | 低(密码明文传输风险) | 高(PKCE 防中间人拦截) |
| FastAPI 内置 | OAuth2PasswordBearer | 需自行实现或用 Authlib |
6.2 JWT 签名算法对比:HS256 vs RS256
HS256(对称签名) RS256(非对称签名)
┌─────────────┐ ┌─────────────┐
│ Secret Key │ │ Private Key │→ 签名
│ (共享密钥) │→ 签名 + 验证 │ (服务方持有) │
│ │ └─────────────┘
│ 性能:高 │ ┌─────────────┐
│ 密钥管理:难 │ │ Public Key │→ 验证
│ 适用:单服务 │ │ (公开分发) │
└─────────────┘ └─────────────┘| 对比维度 | HS256 | RS256 |
|---|---|---|
| 密钥类型 | 对称密钥(单一 secret) | 非对称密钥对(公私钥) |
| 签名速度 | 快(HMAC-SHA256) | 较慢(RSA 签名) |
| 验证速度 | 快 | 中等 |
| 密钥分发 | 所有验证方共享同一密钥 | 公钥可公开分发,私钥仅签发方持有 |
| 适用场景 | 单体应用、内部服务 | 微服务、第三方验证、SSO |
python-jose 用法 | algorithm="HS256" | algorithm="RS256" |
python
from typing import Tuple
import jwt # PyJWT, 替代 python-jose
# RS256 示例
def generate_keypair() -> Tuple[str, str]:
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode()
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
return private_pem, public_pem
def create_rs256_token(data: dict, private_key: str) -> str:
return jwt.encode(data, private_key, algorithm="RS256")
def verify_rs256_token(token: str, public_key: str) -> dict:
return jwt.decode(token, public_key, algorithms=["RS256"])6.3 Bearer Token 传递机制
HTTP 规范(RFC 6750)规定 Bearer Token 通过 Authorization 请求头传递:
GET /users/me HTTP/1.1
Host: api.example.com
Authorization: Bearer <token>
Content-Type: application/jsonHTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="api"| 传递方式 | 格式 | 安全性 |
|---|---|---|
| Authorization Header | Authorization: Bearer <token> | 推荐,HTTP 标准 |
| Query Parameter | ?access_token=<token> | 不推荐(URL 会被日志/代理记录) |
| Request Body | {"access_token": "<token>"} | 仅限 POST,不通用 |
OAuth2PasswordBearer 源码核心逻辑:
python
# fastapi/security/oauth2.py 简化版
class OAuth2PasswordBearer(OAuth2):
async def __call__(self, request: Request) -> str:
authorization: str | None = request.headers.get("Authorization")
if not authorization:
raise HTTPException(status_code=403, detail="Not authenticated")
scheme, _, token = authorization.partition(" ")
if scheme.lower() != "bearer":
raise HTTPException(status_code=401, detail="Invalid authentication scheme")
return token6.4 性能考量
| 操作 | 耗时级别 | 说明 |
|---|---|---|
| HS256 编码 | ~0.01ms | 纯内存 HMAC 运算 |
| RS256 编码 | ~0.5ms | RSA 签名计算 |
| bcrypt 哈希 | ~100-300ms | 故意设计为慢(防暴力破解) |
| JWT 解码 | ~0.001ms | 仅 Base64 + JSON 解析 |
| JWT 验证(HS256) | ~0.01ms | HMAC 校验 |
| JWT 验证(RS256) | ~0.1ms | RSA 公钥验证 |
6.5 设计动机
| 设计选择 | 动机 |
|---|---|
FastAPI 用 Depends() 注入认证 | 利用依赖注入系统实现认证逻辑的可复用和可测试 |
| OAuth2PasswordBearer 返回 token 字符串而非用户对象 | 解耦 token 解析与用户查询,用户由 get_current_user 负责 |
| JWT 无状态设计 | 避免服务端存储 session,水平扩展无需 session 同步 |
| bcrypt 而非 MD5/SHA | bcrypt 内置 salt 和 work factor,抗彩虹表和 GPU 暴力破解 |
6.6 知识关联
认证授权
├── OAuth2 协议 ──→ 密码流程(OAuth2PasswordBearer)
│ └── 授权码流程 + PKCE(SPA/移动端)
│
├── JWT (JSON Web Token)
│ ├── 结构:Header.Payload.Signature
│ ├── HS256:对称签名(单体应用)
│ └── RS256:非对称签名(微服务/SSO)
│
├── 密码安全
│ ├── bcrypt:慢哈希 + 内置 salt
│ ├── argon2:更现代的替代方案
│ └── 永远不存储明文密码
│
├── 权限模型
│ ├── RBAC:基于角色(Role-Based)
│ ├── ABAC:基于属性(Attribute-Based)
│ └── FastAPI 实现:callable class + Depends()
│
└── Token 生命周期
├── access_token:短期(15-30min)
├── refresh_token:长期(7-30天)
└── Token 吊销:黑名单 / 短有效期 + 频繁刷新总结
| 知识点 | 说明 |
|---|---|
| JWT | Token 认证 |
| OAuth2PasswordBearer | OAuth2 流程 |
| 密码哈希 | bcrypt |
| 角色权限 | RBAC |
| 依赖注入 | 认证依赖 |