08-认证授权
Python 3.11+
本章讲解 Flask 用户认证和权限管理。
第一部分:Flask-Login
1.1 实际场景
用户登录后需要记住登录状态,访问个人中心时需要检查是否已登录。
问题:如何实现用户登录和会话管理?
1.2 安装
bash
pip install flask-login flask-bcrypt1.3 配置
python
from flask import Flask
from flask_login import LoginManager
app: Flask = Flask(__name__)
app.config["SECRET_KEY"] = "secret-key"
login_manager: LoginManager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"1.4 用户模型
python
from flask_login import UserMixin
from flask_sqlalchemy import SQLAlchemy
db: SQLAlchemy = SQLAlchemy()
class User(UserMixin, db.Model):
id: int = db.Column(db.Integer, primary_key=True)
username: str = db.Column(db.String(80), unique=True)
email: str = db.Column(db.String(120), unique=True)
password_hash: str = db.Column(db.String(200))
is_active: bool = db.Column(db.Boolean, default=True)
def set_password(self, password: str) -> None:
from flask_bcrypt import Bcrypt
self.password_hash = Bcrypt().generate_password_hash(password).decode()
def check_password(self, password: str) -> bool:
from flask_bcrypt import Bcrypt
return Bcrypt().check_password_hash(self.password_hash, password)1.5 加载用户
python
@login_manager.user_loader
def load_user(user_id: str) -> User | None:
return User.query.get(int(user_id))第二部分:登录流程
2.1 实际场景
用户在登录页面输入用户名密码,验证成功后跳转到首页。
问题:如何实现登录和登出功能?
2.2 登录视图
python
from flask_login import login_user, logout_user, login_required, current_user
from flask import request, redirect, url_for, flash, render_template
@app.route("/login", methods=["GET", "POST"])
def login() -> str:
if request.method == "POST":
username: str = request.form.get("username", "")
password: str = request.form.get("password", "")
user: User | None = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
return redirect(url_for("dashboard"))
else:
flash("用户名或密码错误")
return render_template("auth/login.html")
@app.route("/logout")
@login_required
def logout() -> str:
logout_user()
return redirect(url_for("login"))2.3 受保护路由
python
@app.route("/dashboard")
@login_required
def dashboard() -> str:
return f"欢迎, {current_user.username}!"第三部分:JWT 认证
3.1 实际场景
移动端应用需要 API 认证,不适合使用 session,需要使用 Token 认证。
问题:如何实现 JWT Token 认证?
3.2 安装
bash
pip install flask-jwt-extended3.3 配置
python
from flask_jwt_extended import JWTManager
app.config["JWT_SECRET_KEY"] = "jwt-secret"
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = 3600
jwt: JWTManager = JWTManager(app)3.4 使用
python
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
from typing import Any
@app.route("/login", methods=["POST"])
def login() -> dict[str, str] | tuple[dict[str, str], int]:
data: dict[str, Any] = request.get_json()
user: User | None = User.query.filter_by(username=data["username"]).first()
if user and user.check_password(data["password"]):
token: str = create_access_token(identity=user.id)
return {"token": token}
return {"error": "Invalid credentials"}, 401
@app.route("/protected")
@jwt_required()
def protected() -> dict[str, str]:
user_id: int = get_jwt_identity()
user: User | None = User.query.get(user_id)
return {"user": user.username if user else "Unknown"}第四部分:权限管理
4.1 实际场景
管理员可以访问后台管理页面,普通用户没有权限。
问题:如何实现基于角色的权限控制?
4.2 角色定义
python
class Role(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
name: str = db.Column(db.String(50), unique=True)
users = db.relationship("User", backref="role", lazy="dynamic")
class User(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
username: str = db.Column(db.String(80))
role_id: int = db.Column(db.Integer, db.ForeignKey("role.id"))4.3 权限装饰器
python
from functools import wraps
from flask import abort
from flask_login import current_user
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
abort(401)
if current_user.role.name != "admin":
abort(403)
return f(*args, **kwargs)
return decorated_function
@app.route("/admin")
@admin_required
def admin_panel() -> str:
return "Admin Panel"第五部分:完整示例
python
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_bcrypt import Bcrypt
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
from typing import Any
app: Flask = Flask(__name__)
app.config["SECRET_KEY"] = "secret"
app.config["JWT_SECRET_KEY"] = "jwt-secret"
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///app.db"
db: SQLAlchemy = SQLAlchemy(app)
bcrypt: Bcrypt = Bcrypt(app)
login_manager: LoginManager = LoginManager(app)
jwt: JWTManager = JWTManager(app)
# 模型
class User(UserMixin, db.Model):
id: int = db.Column(db.Integer, primary_key=True)
username: str = db.Column(db.String(80), unique=True)
email: str = db.Column(db.String(120), unique=True)
password_hash: str = db.Column(db.String(200))
role: str = db.Column(db.String(20), default="user")
def set_password(self, password: str) -> None:
self.password_hash = bcrypt.generate_password_hash(password).decode()
def check_password(self, password: str) -> bool:
return bcrypt.check_password_hash(self.password_hash, password)
@login_manager.user_loader
def load_user(user_id: str) -> User | None:
return User.query.get(int(user_id))
# 路由
@app.route("/register", methods=["POST"])
def register() -> tuple[dict[str, str], int]:
data: dict[str, Any] = request.get_json()
user: User = User(username=data["username"], email=data["email"])
user.set_password(data["password"])
db.session.add(user)
db.session.commit()
return {"message": "Created"}, 201
@app.route("/login", methods=["POST"])
def login() -> dict[str, str] | tuple[dict[str, str], int]:
data: dict[str, Any] = request.get_json()
user: User | None = User.query.filter_by(username=data["username"]).first()
if user and user.check_password(data["password"]):
token: str = create_access_token(identity=user.id)
return {"token": token}
return {"error": "Invalid"}, 401
@app.route("/profile")
@jwt_required()
def profile() -> dict[str, str]:
user_id: int = get_jwt_identity()
user: User | None = User.query.get(user_id)
return {"username": user.username, "email": user.email} if user else {"error": "Not found"}
if __name__ == "__main__":
with app.app_context():
db.create_all()
app.run(debug=True)第六部分:L3 专家层
6.1 JWT 的签名与验证流程(Header.Payload.Signature)
JWT(JSON Web Token)由三部分组成,每部分使用 Base64Url 编码,以 . 分隔。其安全性依赖于 HMAC 或 RSA 签名。
JWT 结构:
Header.Payload.Signature
+------------------+------------------+------------------+
| Header | Payload | Signature |
| (算法+类型) | (声明+数据) | (HMAC/RSA 签名) |
+------------------+------------------+------------------+
| Base64Url | Base64Url | Base64Url |
+------------------+------------------+------------------+签名生成流程:
+-------------+ Base64Url +------------------+
| Header | --------------> | encodedHeader |
| {"alg":"HS256",| +--------+---------+
| "typ":"JWT"} | |
+-------------+ |
v
+------------------+
| encodedHeader + |
| "." + |
| encodedPayload |
+--------+---------+
|
v
+-------------+ +------------------+
| SECRET_KEY |-------------->| HMAC-SHA256 |
+-------------+ | (签名计算) |
+--------+---------+
|
+-------------+ |
| Payload | Base64Url |
| {"sub":"1", | --------------> |
| "exp":...} | |
+-------------+ v
+------------------+
| Signature |
+------------------+python
import hmac
import hashlib
import base64
import json
import time
from typing import Any
def base64url_encode(data: bytes) -> str:
"""Base64Url 编码(无填充)"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
def base64url_decode(s: str) -> bytes:
"""Base64Url 解码"""
s_padded: str = s + "=" * (4 - len(s) % 4)
return base64.urlsafe_b64decode(s_padded)
def create_jwt(payload: dict[str, Any], secret: str, algorithm: str = "HS256") -> str:
"""创建 JWT Token"""
# Header
header: dict[str, str] = {"alg": algorithm, "typ": "JWT"}
encoded_header: str = base64url_encode(json.dumps(header, separators=(",", ":")).encode())
# Payload
encoded_payload: str = base64url_encode(json.dumps(payload, separators=(",", ":")).encode())
# Signature
signing_input: str = f"{encoded_header}.{encoded_payload}"
if algorithm == "HS256":
signature: bytes = hmac.new(
secret.encode("utf-8"),
signing_input.encode("utf-8"),
hashlib.sha256
).digest()
else:
raise ValueError(f"Unsupported algorithm: {algorithm}")
encoded_signature: str = base64url_encode(signature)
return f"{encoded_header}.{encoded_payload}.{encoded_signature}"
def verify_jwt(token: str, secret: str) -> dict[str, Any] | None:
"""验证并解析 JWT Token"""
try:
header_b64, payload_b64, signature_b64 = token.split(".")
# 验证签名
signing_input: str = f"{header_b64}.{payload_b64}"
expected_sig: bytes = hmac.new(
secret.encode("utf-8"),
signing_input.encode("utf-8"),
hashlib.sha256
).digest()
actual_sig: bytes = base64url_decode(signature_b64)
if not hmac.compare_digest(expected_sig, actual_sig):
return None
# 解析 Payload
payload: dict[str, Any] = json.loads(base64url_decode(payload_b64))
# 检查过期时间
if "exp" in payload and payload["exp"] < time.time():
return None
return payload
except (ValueError, json.JSONDecodeError):
return None关键安全要点:
| 安全项 | 说明 | 风险 |
|---|---|---|
alg: none 攻击 | 攻击者将 Header 算法设为 none 绕过签名 | 必须拒绝 alg=none |
| 密钥泄露 | 服务端 SECRET_KEY 泄露 = 可伪造任意 Token | 使用强随机密钥,定期轮换 |
| Token 过期 | 未设置 exp 的 Token 永久有效 | 设置合理的过期时间 |
| 敏感信息 | Payload 仅 Base64 编码,可被任何人读取 | 不要在 Payload 存密码等敏感数据 |
6.2 Flask-Login 的 session 管理机制
Flask-Login 基于 Flask 的 session(默认使用客户端签名 cookie)实现用户状态管理,核心在于 session 中存储 user_id。
Session 管理流程:
登录请求 后续请求
| |
v v
+-------------+ +-------------+
| 验证用户凭证 | | 读取 Cookie |
+-------------+ +-------------+
| |
v v
+-------------+ +-------------+
| login_user()| | session 中 |
| 设置 session | | 提取 user_id |
| {'user_id': | +-------------+
| '123'} | |
+-------------+ v
| +-------------+
v | load_user() |
+-------------+ | (回调函数) |
| Set-Cookie | +-------------+
| (签名后的 | |
| session) | v
+-------------+ +-------------+
| current_user|
| 代理对象 |
+-------------+python
from flask import session, request, g
from typing import Any, Callable
from functools import wraps
class SimpleLoginManager:
"""Flask-Login 核心机制简化演示"""
def __init__(self) -> None:
self._user_loader: Callable | None = None
def user_loader(self, callback: Callable) -> Callable:
"""注册用户加载回调"""
self._user_loader = callback
return callback
def login_user(self, user: Any, remember: bool = False) -> bool:
"""登录用户(设置 session)"""
user_id: str = str(user.get_id())
session["user_id"] = user_id
if remember:
session.permanent = True # 设置长期有效
return True
def logout_user(self) -> None:
"""登出用户(清除 session)"""
session.pop("user_id", None)
def get_current_user(self) -> Any | None:
"""获取当前用户(每次请求调用)"""
user_id: str | None = session.get("user_id")
if user_id is None:
return None
if self._user_loader is not None:
return self._user_loader(user_id)
return None
def login_required(self, f: Callable) -> Callable:
"""登录保护装饰器"""
@wraps(f)
def decorated_function(*args: Any, **kwargs: Any) -> Any:
if self.get_current_user() is None:
from flask import redirect, url_for
return redirect(url_for("login"))
return f(*args, **kwargs)
return decorated_functionSession 安全机制:
| 机制 | 说明 | Flask 默认行为 |
|---|---|---|
| 签名(Signature) | 防止客户端篡改 session 数据 | 使用 SECRET_KEY 签名 |
| 加密(Encryption) | 隐藏 session 内容 | 默认不加密(仅签名) |
| HttpOnly | 阻止 JavaScript 读取 Cookie | Flask session Cookie 默认启用 |
| Secure | 仅 HTTPS 传输 | 需手动设置 SESSION_COOKIE_SECURE = True |
| SameSite | 防止 CSRF 攻击 | 默认 Lax |
6.3 基于角色的访问控制(RBAC)模型
RBAC(Role-Based Access Control)通过角色作为中介,将用户与权限解耦,实现灵活的权限管理。
RBAC 数据模型:
+----------+ +----------+ +----------+
| User | | Role | |Permission|
+----------+ +----------+ +----------+
| id | M:N | id | M:N | id |
| username |<----->| name |<----->| name |
| email | | 描述 | | resource |
| ... | | ... | | action |
+----------+ +----------+ +----------+
关系表: 关系表:
user_roles role_permissions
+--------+-------+ +-----------+-------------+
|user_id |role_id| |role_id |permission_id|
+--------+-------+ +-----------+-------------+python
from enum import Enum
from typing import Callable, Any
from functools import wraps
from flask import abort
class Action(Enum):
"""权限动作"""
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
class PermissionChecker:
"""RBAC 权限检查器"""
def __init__(self) -> None:
# role -> set of permissions
self._role_permissions: dict[str, set[Action]] = {
"admin": {Action.READ, Action.WRITE, Action.DELETE, Action.ADMIN},
"editor": {Action.READ, Action.WRITE},
"viewer": {Action.READ},
}
def has_permission(self, role: str, required: Action) -> bool:
"""检查角色是否有指定权限"""
permissions: set[Action] = self._role_permissions.get(role, set())
return required in permissions
def require_permission(self, action: Action) -> Callable:
"""权限装饰器工厂"""
def decorator(f: Callable) -> Callable:
@wraps(f)
def wrapper(*args: Any, **kwargs: Any) -> Any:
from flask_login import current_user
if not current_user.is_authenticated:
abort(401)
if not self.has_permission(current_user.role, action):
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
# 使用示例
checker: PermissionChecker = PermissionChecker()
# @checker.require_permission(Action.DELETE)
# def delete_post(post_id: int) -> str:
# ...设计动机:
| RBAC 优势 | 说明 | 对比直接授权 |
|---|---|---|
| 可维护性 | 权限变更只需修改角色,不影响用户 | 直接授权需逐个修改用户 |
| 可扩展性 | 新增角色即可,无需改代码 | 硬编码权限难以扩展 |
| 最小权限 | 按角色分配最小必要权限 | 容易出现过度授权 |
| 审计友好 | 角色-权限关系清晰可审计 | 用户-权限关系散乱 |
6.4 知识关联
认证授权知识体系
|
+----------------+----------------+
| | |
认证层 会话层 授权层
| | |
+----+----+ +----+----+ +----+----+
| 密码哈希 | | Session | | 角色 |
| JWT | | Cookie | | 权限 |
+----+----+ +----+----+ +----+----+
| | |
v v v
+---------+ +---------+ +---------+
| bcrypt | | 签名机制 | | RBAC |
| HMAC | | HttpOnly| | 装饰器 |
| SHA256 | | SameSite| | 拦截器 |
+---------+ +---------+ +---------+
|
v
+-----+-----+
| Token 管理 |
| 过期刷新 |
+-----+-----+
|
+-----+-----+ +---------+
| 访问令牌 |----->| 刷新令牌 |
| (短期) | | (长期) |
+---------+ +---------+| 知识点 | 说明 |
|---|---|
| Flask-Login | 会话认证 |
| UserMixin | 用户模型混入 |
| bcrypt | 密码哈希 |
| JWT | Token 认证 |
| 权限装饰器 | 角色权限 |