Skip to content

05-认证授权

Python 3.11+

本章讲解 FastAPI 用户认证和权限管理。


第一部分:JWT 认证

1.1 实际场景

API 需要用户登录认证,使用 JWT Token 方式进行无状态认证。

问题:如何实现 JWT Token 认证?

1.2 安装

bash
pip install python-jose passlib[bcrypt]

1.3 配置

python
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta

app: FastAPI = FastAPI()

# 配置
SECRET_KEY: str = "your-secret-key"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

# 密码哈希
pwd_context: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2
oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="token")


# 工具函数
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)


def create_access_token(data: dict) -> str:
    to_encode: dict = data.copy()
    expire: datetime = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

第二部分:用户模型

2.1 实际场景

用户注册时需要用户名、密码、邮箱,响应时不返回密码。

问题:如何定义用户相关的 Pydantic 模型?

2.2 Pydantic 模型

python
from pydantic import BaseModel, EmailStr


class UserBase(BaseModel):
    username: str
    email: EmailStr


class UserCreate(UserBase):
    password: str


class UserOut(UserBase):
    id: int
    is_active: bool
    
    model_config = ConfigDict(from_attributes=True)


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None

第三部分:认证流程

3.1 实际场景

用户登录后获取 Token,后续请求携带 Token 访问受保护资源。

问题:如何实现登录和 Token 验证?

3.2 登录接口

python
from fastapi import HTTPException


@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> Token:
    user: dict | None = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    
    access_token: str = create_access_token(data={"sub": user["username"]})
    return Token(access_token=access_token, token_type="bearer")


def authenticate_user(username: str, password: str) -> dict | None:
    user: dict | None = get_user(username)
    if not user:
        return None
    if not verify_password(password, user["hashed_password"]):
        return None
    return user

3.3 受保护路由

python
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload: dict = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str | None = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    
    user: dict | None = get_user(username)
    if user is None:
        raise credentials_exception
    return user


@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)) -> dict:
    return current_user

第四部分:角色权限

4.1 实际场景

管理员可以访问所有用户列表,普通用户不能访问。

问题:如何实现基于角色的权限控制?

4.2 角色检查

python
class RoleChecker:
    def __init__(self, allowed_roles: list[str]):
        self.allowed_roles: list[str] = allowed_roles
    
    def __call__(self, current_user: dict = Depends(get_current_user)) -> dict:
        if current_user.get("role") not in self.allowed_roles:
            raise HTTPException(status_code=403, detail="Operation not permitted")
        return current_user


# 使用
@app.get("/admin/users")
async def get_all_users(current_user: dict = Depends(RoleChecker(["admin"]))) -> list[dict]:
    return list(users_db.values())

第五部分:完整示例

python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Any

app: FastAPI = FastAPI()

# 配置
SECRET_KEY: str = "secret-key-change-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

pwd_context: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="token")

# 模拟数据库
users_db: dict[str, dict[str, Any]] = {
    "john": {
        "username": "john",
        "email": "john@example.com",
        "hashed_password": pwd_context.hash("secret123"),
        "is_active": True,
        "role": "user"
    },
    "admin": {
        "username": "admin",
        "email": "admin@example.com",
        "hashed_password": pwd_context.hash("admin123"),
        "is_active": True,
        "role": "admin"
    }
}


# 工具函数
def get_user(username: str) -> dict | None:
    return users_db.get(username)


def create_access_token(data: dict) -> str:
    to_encode: dict = data.copy()
    expire: datetime = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    try:
        payload: dict = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str | None = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid token")
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    
    user: dict | None = get_user(username)
    if user is None:
        raise HTTPException(status_code=401, detail="User not found")
    return user


# 路由
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict[str, str]:
    user: dict | None = get_user(form_data.username)
    if not user or not pwd_context.verify(form_data.password, user["hashed_password"]):
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    
    access_token: str = create_access_token(data={"sub": user["username"], "role": user["role"]})
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)) -> dict:
    return current_user


@app.get("/protected")
async def protected_route(current_user: dict = Depends(get_current_user)) -> dict[str, str]:
    return {"message": f"Hello {current_user['username']}", "role": current_user["role"]}


@app.get("/admin-only")
async def admin_only(current_user: dict = Depends(get_current_user)) -> dict[str, str]:
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    return {"message": "Welcome admin"}

第六部分:L3 专家层

6.1 OAuth2 授权码流程 vs 密码流程

FastAPI 的 OAuth2PasswordBearer 实现的是 OAuth2 密码流程(Resource Owner Password Credentials Grant),适用于前后端一体或可信客户端。现代 SPA/移动端应使用 授权码流程(Authorization Code Grant + PKCE)

┌──────────┐  密码流程                    ┌──────────┐
│  Client  │──POST /token───────────────→│  Server  │
│          │  (username + password)       │          │
│          │←── access_token ────────────│          │
└──────────┘                            └──────────┘

┌──────────┐  授权码流程                  ┌──────────┐
│  Client  │──GET /authorize────────────→│  Server  │
│          │←── code ───────────────────│          │
│          │──POST /token───────────────→│          │
│          │  (code + PKCE verifier)      │          │
│          │←── access_token + refresh ──│          │
└──────────┘                            └──────────┘
对比维度密码流程(Password Grant)授权码流程 + PKCE
密码暴露客户端直接持有用户密码密码永不离开认证服务器
适用场景内部系统、CLI 工具SPA、移动端、第三方应用
Token 刷新无 refresh_token支持 refresh_token 轮换
安全性低(密码明文传输风险)高(PKCE 防中间人拦截)
FastAPI 内置OAuth2PasswordBearer需自行实现或用 Authlib

6.2 JWT 签名算法对比:HS256 vs RS256

HS256(对称签名)                     RS256(非对称签名)
┌─────────────┐                       ┌─────────────┐
│ Secret Key  │                       │ Private Key │→ 签名
│  (共享密钥)  │→ 签名 + 验证          │ (服务方持有) │
│             │                       └─────────────┘
│ 性能:高     │                       ┌─────────────┐
│ 密钥管理:难 │                       │ Public Key  │→ 验证
│ 适用:单服务 │                       │ (公开分发)   │
└─────────────┘                       └─────────────┘
对比维度HS256RS256
密钥类型对称密钥(单一 secret)非对称密钥对(公私钥)
签名速度快(HMAC-SHA256)较慢(RSA 签名)
验证速度中等
密钥分发所有验证方共享同一密钥公钥可公开分发,私钥仅签发方持有
适用场景单体应用、内部服务微服务、第三方验证、SSO
python-jose 用法algorithm="HS256"algorithm="RS256"
python
from typing import Tuple
import jwt  # PyJWT, 替代 python-jose

# RS256 示例
def generate_keypair() -> Tuple[str, str]:
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import serialization

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()

    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()

    return private_pem, public_pem

def create_rs256_token(data: dict, private_key: str) -> str:
    return jwt.encode(data, private_key, algorithm="RS256")

def verify_rs256_token(token: str, public_key: str) -> dict:
    return jwt.decode(token, public_key, algorithms=["RS256"])

6.3 Bearer Token 传递机制

HTTP 规范(RFC 6750)规定 Bearer Token 通过 Authorization 请求头传递:

GET /users/me HTTP/1.1
Host: api.example.com
Authorization: Bearer <token>
Content-Type: application/json
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="api"
传递方式格式安全性
Authorization HeaderAuthorization: Bearer <token>推荐,HTTP 标准
Query Parameter?access_token=<token>不推荐(URL 会被日志/代理记录)
Request Body{"access_token": "<token>"}仅限 POST,不通用

OAuth2PasswordBearer 源码核心逻辑:

python
# fastapi/security/oauth2.py 简化版
class OAuth2PasswordBearer(OAuth2):
    async def __call__(self, request: Request) -> str:
        authorization: str | None = request.headers.get("Authorization")
        if not authorization:
            raise HTTPException(status_code=403, detail="Not authenticated")
        scheme, _, token = authorization.partition(" ")
        if scheme.lower() != "bearer":
            raise HTTPException(status_code=401, detail="Invalid authentication scheme")
        return token

6.4 性能考量

操作耗时级别说明
HS256 编码~0.01ms纯内存 HMAC 运算
RS256 编码~0.5msRSA 签名计算
bcrypt 哈希~100-300ms故意设计为慢(防暴力破解)
JWT 解码~0.001ms仅 Base64 + JSON 解析
JWT 验证(HS256)~0.01msHMAC 校验
JWT 验证(RS256)~0.1msRSA 公钥验证

6.5 设计动机

设计选择动机
FastAPI 用 Depends() 注入认证利用依赖注入系统实现认证逻辑的可复用和可测试
OAuth2PasswordBearer 返回 token 字符串而非用户对象解耦 token 解析与用户查询,用户由 get_current_user 负责
JWT 无状态设计避免服务端存储 session,水平扩展无需 session 同步
bcrypt 而非 MD5/SHAbcrypt 内置 salt 和 work factor,抗彩虹表和 GPU 暴力破解

6.6 知识关联

认证授权
├── OAuth2 协议 ──→ 密码流程(OAuth2PasswordBearer)
│                   └── 授权码流程 + PKCE(SPA/移动端)

├── JWT (JSON Web Token)
│   ├── 结构:Header.Payload.Signature
│   ├── HS256:对称签名(单体应用)
│   └── RS256:非对称签名(微服务/SSO)

├── 密码安全
│   ├── bcrypt:慢哈希 + 内置 salt
│   ├── argon2:更现代的替代方案
│   └── 永远不存储明文密码

├── 权限模型
│   ├── RBAC:基于角色(Role-Based)
│   ├── ABAC:基于属性(Attribute-Based)
│   └── FastAPI 实现:callable class + Depends()

└── Token 生命周期
    ├── access_token:短期(15-30min)
    ├── refresh_token:长期(7-30天)
    └── Token 吊销:黑名单 / 短有效期 + 频繁刷新

总结

知识点说明
JWTToken 认证
OAuth2PasswordBearerOAuth2 流程
密码哈希bcrypt
角色权限RBAC
依赖注入认证依赖