Skip to content

08-认证授权

Python 3.11+

本章讲解 Flask 用户认证和权限管理。


第一部分:Flask-Login

1.1 实际场景

用户登录后需要记住登录状态,访问个人中心时需要检查是否已登录。

问题:如何实现用户登录和会话管理?

1.2 安装

bash
pip install flask-login flask-bcrypt

1.3 配置

python
from flask import Flask
from flask_login import LoginManager

app: Flask = Flask(__name__)
app.config["SECRET_KEY"] = "secret-key"

login_manager: LoginManager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"

1.4 用户模型

python
from flask_login import UserMixin
from flask_sqlalchemy import SQLAlchemy

db: SQLAlchemy = SQLAlchemy()

class User(UserMixin, db.Model):
    id: int = db.Column(db.Integer, primary_key=True)
    username: str = db.Column(db.String(80), unique=True)
    email: str = db.Column(db.String(120), unique=True)
    password_hash: str = db.Column(db.String(200))
    is_active: bool = db.Column(db.Boolean, default=True)
    
    def set_password(self, password: str) -> None:
        from flask_bcrypt import Bcrypt
        self.password_hash = Bcrypt().generate_password_hash(password).decode()
    
    def check_password(self, password: str) -> bool:
        from flask_bcrypt import Bcrypt
        return Bcrypt().check_password_hash(self.password_hash, password)

1.5 加载用户

python
@login_manager.user_loader
def load_user(user_id: str) -> User | None:
    return User.query.get(int(user_id))

第二部分:登录流程

2.1 实际场景

用户在登录页面输入用户名密码,验证成功后跳转到首页。

问题:如何实现登录和登出功能?

2.2 登录视图

python
from flask_login import login_user, logout_user, login_required, current_user
from flask import request, redirect, url_for, flash, render_template

@app.route("/login", methods=["GET", "POST"])
def login() -> str:
    if request.method == "POST":
        username: str = request.form.get("username", "")
        password: str = request.form.get("password", "")
        
        user: User | None = User.query.filter_by(username=username).first()
        
        if user and user.check_password(password):
            login_user(user)
            return redirect(url_for("dashboard"))
        else:
            flash("用户名或密码错误")
    
    return render_template("auth/login.html")

@app.route("/logout")
@login_required
def logout() -> str:
    logout_user()
    return redirect(url_for("login"))

2.3 受保护路由

python
@app.route("/dashboard")
@login_required
def dashboard() -> str:
    return f"欢迎, {current_user.username}!"

第三部分:JWT 认证

3.1 实际场景

移动端应用需要 API 认证,不适合使用 session,需要使用 Token 认证。

问题:如何实现 JWT Token 认证?

3.2 安装

bash
pip install flask-jwt-extended

3.3 配置

python
from flask_jwt_extended import JWTManager

app.config["JWT_SECRET_KEY"] = "jwt-secret"
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = 3600

jwt: JWTManager = JWTManager(app)

3.4 使用

python
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
from typing import Any

@app.route("/login", methods=["POST"])
def login() -> dict[str, str] | tuple[dict[str, str], int]:
    data: dict[str, Any] = request.get_json()
    user: User | None = User.query.filter_by(username=data["username"]).first()
    
    if user and user.check_password(data["password"]):
        token: str = create_access_token(identity=user.id)
        return {"token": token}
    
    return {"error": "Invalid credentials"}, 401

@app.route("/protected")
@jwt_required()
def protected() -> dict[str, str]:
    user_id: int = get_jwt_identity()
    user: User | None = User.query.get(user_id)
    return {"user": user.username if user else "Unknown"}

第四部分:权限管理

4.1 实际场景

管理员可以访问后台管理页面,普通用户没有权限。

问题:如何实现基于角色的权限控制?

4.2 角色定义

python
class Role(db.Model):
    id: int = db.Column(db.Integer, primary_key=True)
    name: str = db.Column(db.String(50), unique=True)
    users = db.relationship("User", backref="role", lazy="dynamic")

class User(db.Model):
    id: int = db.Column(db.Integer, primary_key=True)
    username: str = db.Column(db.String(80))
    role_id: int = db.Column(db.Integer, db.ForeignKey("role.id"))

4.3 权限装饰器

python
from functools import wraps
from flask import abort
from flask_login import current_user

def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            abort(401)
        if current_user.role.name != "admin":
            abort(403)
        return f(*args, **kwargs)
    return decorated_function

@app.route("/admin")
@admin_required
def admin_panel() -> str:
    return "Admin Panel"

第五部分:完整示例

python
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_bcrypt import Bcrypt
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
from typing import Any

app: Flask = Flask(__name__)
app.config["SECRET_KEY"] = "secret"
app.config["JWT_SECRET_KEY"] = "jwt-secret"
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///app.db"

db: SQLAlchemy = SQLAlchemy(app)
bcrypt: Bcrypt = Bcrypt(app)
login_manager: LoginManager = LoginManager(app)
jwt: JWTManager = JWTManager(app)

# 模型
class User(UserMixin, db.Model):
    id: int = db.Column(db.Integer, primary_key=True)
    username: str = db.Column(db.String(80), unique=True)
    email: str = db.Column(db.String(120), unique=True)
    password_hash: str = db.Column(db.String(200))
    role: str = db.Column(db.String(20), default="user")
    
    def set_password(self, password: str) -> None:
        self.password_hash = bcrypt.generate_password_hash(password).decode()
    
    def check_password(self, password: str) -> bool:
        return bcrypt.check_password_hash(self.password_hash, password)

@login_manager.user_loader
def load_user(user_id: str) -> User | None:
    return User.query.get(int(user_id))

# 路由
@app.route("/register", methods=["POST"])
def register() -> tuple[dict[str, str], int]:
    data: dict[str, Any] = request.get_json()
    user: User = User(username=data["username"], email=data["email"])
    user.set_password(data["password"])
    db.session.add(user)
    db.session.commit()
    return {"message": "Created"}, 201

@app.route("/login", methods=["POST"])
def login() -> dict[str, str] | tuple[dict[str, str], int]:
    data: dict[str, Any] = request.get_json()
    user: User | None = User.query.filter_by(username=data["username"]).first()
    
    if user and user.check_password(data["password"]):
        token: str = create_access_token(identity=user.id)
        return {"token": token}
    
    return {"error": "Invalid"}, 401

@app.route("/profile")
@jwt_required()
def profile() -> dict[str, str]:
    user_id: int = get_jwt_identity()
    user: User | None = User.query.get(user_id)
    return {"username": user.username, "email": user.email} if user else {"error": "Not found"}

if __name__ == "__main__":
    with app.app_context():
        db.create_all()
    app.run(debug=True)

第六部分:L3 专家层

6.1 JWT 的签名与验证流程(Header.Payload.Signature)

JWT(JSON Web Token)由三部分组成,每部分使用 Base64Url 编码,以 . 分隔。其安全性依赖于 HMAC 或 RSA 签名。

JWT 结构:

  Header.Payload.Signature
  +------------------+------------------+------------------+
  |     Header       |     Payload      |    Signature     |
  | (算法+类型)       | (声明+数据)       | (HMAC/RSA 签名)  |
  +------------------+------------------+------------------+
  | Base64Url        | Base64Url        | Base64Url        |
  +------------------+------------------+------------------+

签名生成流程:

  +-------------+    Base64Url    +------------------+
  |   Header    | --------------> |  encodedHeader   |
  | {"alg":"HS256",|              +--------+---------+
  |  "typ":"JWT"} |                       |
  +-------------+                       |
                                        v
                               +------------------+
                               |  encodedHeader + |
                               |  "." +           |
                               |  encodedPayload  |
                               +--------+---------+
                                        |
                                        v
  +-------------+               +------------------+
  |  SECRET_KEY  |-------------->| HMAC-SHA256      |
  +-------------+               | (签名计算)        |
                                +--------+---------+
                                         |
  +-------------+                        |
  |  Payload    |    Base64Url           |
  | {"sub":"1", | -------------->       |
  |  "exp":...} |                       |
  +-------------+                       v
                               +------------------+
                               |    Signature     |
                               +------------------+
python
import hmac
import hashlib
import base64
import json
import time
from typing import Any

def base64url_encode(data: bytes) -> str:
    """Base64Url 编码(无填充)"""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")

def base64url_decode(s: str) -> bytes:
    """Base64Url 解码"""
    s_padded: str = s + "=" * (4 - len(s) % 4)
    return base64.urlsafe_b64decode(s_padded)

def create_jwt(payload: dict[str, Any], secret: str, algorithm: str = "HS256") -> str:
    """创建 JWT Token"""
    # Header
    header: dict[str, str] = {"alg": algorithm, "typ": "JWT"}
    encoded_header: str = base64url_encode(json.dumps(header, separators=(",", ":")).encode())
    
    # Payload
    encoded_payload: str = base64url_encode(json.dumps(payload, separators=(",", ":")).encode())
    
    # Signature
    signing_input: str = f"{encoded_header}.{encoded_payload}"
    if algorithm == "HS256":
        signature: bytes = hmac.new(
            secret.encode("utf-8"),
            signing_input.encode("utf-8"),
            hashlib.sha256
        ).digest()
    else:
        raise ValueError(f"Unsupported algorithm: {algorithm}")
    
    encoded_signature: str = base64url_encode(signature)
    
    return f"{encoded_header}.{encoded_payload}.{encoded_signature}"

def verify_jwt(token: str, secret: str) -> dict[str, Any] | None:
    """验证并解析 JWT Token"""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        
        # 验证签名
        signing_input: str = f"{header_b64}.{payload_b64}"
        expected_sig: bytes = hmac.new(
            secret.encode("utf-8"),
            signing_input.encode("utf-8"),
            hashlib.sha256
        ).digest()
        
        actual_sig: bytes = base64url_decode(signature_b64)
        if not hmac.compare_digest(expected_sig, actual_sig):
            return None
        
        # 解析 Payload
        payload: dict[str, Any] = json.loads(base64url_decode(payload_b64))
        
        # 检查过期时间
        if "exp" in payload and payload["exp"] < time.time():
            return None
        
        return payload
    except (ValueError, json.JSONDecodeError):
        return None

关键安全要点:

安全项说明风险
alg: none 攻击攻击者将 Header 算法设为 none 绕过签名必须拒绝 alg=none
密钥泄露服务端 SECRET_KEY 泄露 = 可伪造任意 Token使用强随机密钥,定期轮换
Token 过期未设置 exp 的 Token 永久有效设置合理的过期时间
敏感信息Payload 仅 Base64 编码,可被任何人读取不要在 Payload 存密码等敏感数据

6.2 Flask-Login 的 session 管理机制

Flask-Login 基于 Flask 的 session(默认使用客户端签名 cookie)实现用户状态管理,核心在于 session 中存储 user_id

Session 管理流程:

  登录请求                           后续请求
     |                                 |
     v                                 v
  +-------------+              +-------------+
  | 验证用户凭证 |              | 读取 Cookie  |
  +-------------+              +-------------+
     |                                 |
     v                                 v
  +-------------+              +-------------+
  | login_user()|              | session 中   |
  | 设置 session |              | 提取 user_id |
  | {'user_id':  |              +-------------+
  |   '123'}     |                       |
  +-------------+                       v
     |                          +-------------+
     v                          | load_user() |
  +-------------+               | (回调函数)   |
  | Set-Cookie  |               +-------------+
  | (签名后的    |                       |
  |  session)   |                       v
  +-------------+               +-------------+
                                | current_user|
                                | 代理对象     |
                                +-------------+
python
from flask import session, request, g
from typing import Any, Callable
from functools import wraps

class SimpleLoginManager:
    """Flask-Login 核心机制简化演示"""
    
    def __init__(self) -> None:
        self._user_loader: Callable | None = None
    
    def user_loader(self, callback: Callable) -> Callable:
        """注册用户加载回调"""
        self._user_loader = callback
        return callback
    
    def login_user(self, user: Any, remember: bool = False) -> bool:
        """登录用户(设置 session)"""
        user_id: str = str(user.get_id())
        session["user_id"] = user_id
        if remember:
            session.permanent = True  # 设置长期有效
        return True
    
    def logout_user(self) -> None:
        """登出用户(清除 session)"""
        session.pop("user_id", None)
    
    def get_current_user(self) -> Any | None:
        """获取当前用户(每次请求调用)"""
        user_id: str | None = session.get("user_id")
        if user_id is None:
            return None
        if self._user_loader is not None:
            return self._user_loader(user_id)
        return None
    
    def login_required(self, f: Callable) -> Callable:
        """登录保护装饰器"""
        @wraps(f)
        def decorated_function(*args: Any, **kwargs: Any) -> Any:
            if self.get_current_user() is None:
                from flask import redirect, url_for
                return redirect(url_for("login"))
            return f(*args, **kwargs)
        return decorated_function

Session 安全机制:

机制说明Flask 默认行为
签名(Signature)防止客户端篡改 session 数据使用 SECRET_KEY 签名
加密(Encryption)隐藏 session 内容默认不加密(仅签名)
HttpOnly阻止 JavaScript 读取 CookieFlask session Cookie 默认启用
Secure仅 HTTPS 传输需手动设置 SESSION_COOKIE_SECURE = True
SameSite防止 CSRF 攻击默认 Lax

6.3 基于角色的访问控制(RBAC)模型

RBAC(Role-Based Access Control)通过角色作为中介,将用户与权限解耦,实现灵活的权限管理。

RBAC 数据模型:

  +----------+       +----------+       +----------+
  |   User   |       |   Role   |       |Permission|
  +----------+       +----------+       +----------+
  | id       |  M:N  | id       |  M:N  | id       |
  | username |<----->| name     |<----->| name     |
  | email    |       | 描述      |       | resource |
  | ...      |       | ...      |       | action   |
  +----------+       +----------+       +----------+
  
  关系表:                    关系表:
  user_roles                role_permissions
  +--------+-------+        +-----------+-------------+
  |user_id |role_id|        |role_id    |permission_id|
  +--------+-------+        +-----------+-------------+
python
from enum import Enum
from typing import Callable, Any
from functools import wraps
from flask import abort

class Action(Enum):
    """权限动作"""
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

class PermissionChecker:
    """RBAC 权限检查器"""
    
    def __init__(self) -> None:
        # role -> set of permissions
        self._role_permissions: dict[str, set[Action]] = {
            "admin": {Action.READ, Action.WRITE, Action.DELETE, Action.ADMIN},
            "editor": {Action.READ, Action.WRITE},
            "viewer": {Action.READ},
        }
    
    def has_permission(self, role: str, required: Action) -> bool:
        """检查角色是否有指定权限"""
        permissions: set[Action] = self._role_permissions.get(role, set())
        return required in permissions
    
    def require_permission(self, action: Action) -> Callable:
        """权限装饰器工厂"""
        def decorator(f: Callable) -> Callable:
            @wraps(f)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                from flask_login import current_user
                if not current_user.is_authenticated:
                    abort(401)
                if not self.has_permission(current_user.role, action):
                    abort(403)
                return f(*args, **kwargs)
            return wrapper
        return decorator

# 使用示例
checker: PermissionChecker = PermissionChecker()

# @checker.require_permission(Action.DELETE)
# def delete_post(post_id: int) -> str:
#     ...

设计动机:

RBAC 优势说明对比直接授权
可维护性权限变更只需修改角色,不影响用户直接授权需逐个修改用户
可扩展性新增角色即可,无需改代码硬编码权限难以扩展
最小权限按角色分配最小必要权限容易出现过度授权
审计友好角色-权限关系清晰可审计用户-权限关系散乱

6.4 知识关联

                    认证授权知识体系
                         |
        +----------------+----------------+
        |                |                |
     认证层          会话层          授权层
        |                |                |
   +----+----+      +----+----+      +----+----+
   | 密码哈希 |      | Session  |      |  角色   |
   | JWT     |      | Cookie   |      |  权限   |
   +----+----+      +----+----+      +----+----+
        |                |                |
        v                v                v
   +---------+      +---------+      +---------+
   | bcrypt  |      | 签名机制 |      | RBAC   |
   | HMAC    |      | HttpOnly|      | 装饰器  |
   | SHA256  |      | SameSite|      | 拦截器  |
   +---------+      +---------+      +---------+
                         |
                         v
                   +-----+-----+
                   | Token 管理 |
                   | 过期刷新   |
                   +-----+-----+
                         |
                   +-----+-----+      +---------+
                   | 访问令牌  |----->| 刷新令牌 |
                   | (短期)   |      | (长期)   |
                   +---------+      +---------+
知识点说明
Flask-Login会话认证
UserMixin用户模型混入
bcrypt密码哈希
JWTToken 认证
权限装饰器角色权限