Skip to content

06-文件上传下载

Python 3.11+

本章讲解 Flask 文件上传、下载和安全管理。


第一部分:基础文件上传

1.1 实际场景

用户需要在网站上上传头像图片,上传后显示上传成功的消息。

问题:如何实现最简单的文件上传功能?

1.2 简单上传

python
import os
from flask import Flask, request, render_template_string

# Application instance and the upload settings read by the handler below.
app: Flask = Flask(__name__)
app.config["UPLOAD_FOLDER"] = "uploads"
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024  # 16MB

# Create the upload directory up front; exist_ok tolerates an existing directory.
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)

HTML: str = """
<!DOCTYPE html>
<html>
<body>
    <h1>文件上传</h1>
    <form method="post" enctype="multipart/form-data">
        <input type="file" name="file">
        <button type="submit">上传</button>
    </form>
    {% if filename %}
    <p>上传成功: {{ filename }}</p>
    {% endif %}
</body>
</html>
"""

@app.route("/", methods=["GET", "POST"])
def upload_file() -> str:
    """Render the upload form and, on POST, store the submitted file.

    Returns:
        The rendered HTML page. After a successful upload the page also
        shows the name the file was stored under.
    """
    if request.method == "POST":
        file = request.files.get("file")
        if file:
            # Function-scope import: this snippet's import block does not
            # provide secure_filename.
            from werkzeug.utils import secure_filename

            # file.filename is chosen by the client. Strip directory parts
            # (e.g. "../../etc/passwd") before it reaches the filesystem.
            filename: str = secure_filename(file.filename)
            if filename:  # sanitising can leave nothing usable
                file.save(os.path.join(app.config["UPLOAD_FOLDER"], filename))
                return render_template_string(HTML, filename=filename)

    return render_template_string(HTML)

第二部分:安全上传

2.1 实际场景

用户上传文件时,可能上传恶意文件名如 ../../../etc/passwd,或上传病毒文件。

问题:如何安全地处理文件上传?

2.2 安全配置

python
import os
from werkzeug.utils import secure_filename
from flask import Flask, request

app: Flask = Flask(__name__)

# File extensions (lower-case, without the dot) that may be uploaded.
ALLOWED_EXTENSIONS: set[str] = {"txt", "pdf", "png", "jpg", "jpeg", "gif", "doc", "docx"}

def allowed_file(filename: str) -> bool:
    """Tell whether the text after the last dot of *filename* is an allowed extension.

    The comparison ignores case. Names that contain no dot are rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS

@app.route("/upload", methods=["GET", "POST"])
def upload_file() -> dict[str, str] | tuple[dict[str, str], int]:
    """Validate and store one uploaded file.

    Returns:
        On POST success, a dict with a message and the stored filename.
        On GET, a dict with a usage hint.
        When the upload is missing or rejected, an ``(error_dict, 400)`` tuple.
    """
    if request.method != "POST":
        return {"message": "POST 文件到此处"}

    file = request.files.get("file")
    if not file:
        return {"error": "没有选择文件"}, 400

    # Sanitise first, then validate the sanitised name. secure_filename() can
    # drop characters (for example a non-ASCII stem), so a raw name that
    # passes the extension check may end up stored without its extension.
    safe_name: str = secure_filename(file.filename)
    if not allowed_file(safe_name):
        return {"error": "不支持的文件类型"}, 400

    # Prefix a timestamp so same-named uploads do not overwrite each other.
    # Microseconds are included because a per-second stamp collides easily.
    from datetime import datetime
    timestamp: str = datetime.now().strftime("%Y%m%d%H%M%S%f_")
    filename: str = timestamp + safe_name

    # This snippet never sets UPLOAD_FOLDER, so fall back to "uploads"
    # instead of failing with KeyError.
    upload_folder: str = app.config.get("UPLOAD_FOLDER", "uploads")
    os.makedirs(upload_folder, exist_ok=True)
    file.save(os.path.join(upload_folder, filename))

    return {"message": "上传成功", "filename": filename}

2.3 文件大小限制

python
from flask import Flask, request

app: Flask = Flask(__name__)

# Global request-size limit
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024  # 16MB

@app.route("/upload", methods=["POST"])
def upload() -> dict[str, str] | tuple[dict[str, str], int]:
    """Store an uploaded file after enforcing a per-file limit of 10 MB.

    Returns:
        A success dict, or an ``(error_dict, 400)`` tuple when no file was
        sent, the file is too large, or its name is unusable.
    """
    # Function-scope imports: this snippet's import block provides neither name.
    import os
    from werkzeug.utils import secure_filename

    file = request.files.get("file")
    if not file:
        # Without this guard a missing field fails with AttributeError on None.
        return {"error": "没有选择文件"}, 400

    # Measure the size by seeking to the end, then rewind so that save()
    # writes from the first byte.
    file.seek(0, os.SEEK_END)
    size: int = file.tell()
    file.seek(0)

    if size > 10 * 1024 * 1024:  # 10MB
        return {"error": "文件太大"}, 400

    # Never interpolate the client-supplied name into a path unsanitised.
    filename: str = secure_filename(file.filename)
    if not filename:
        return {"error": "无效的文件名"}, 400

    file.save(os.path.join("uploads", filename))
    return {"message": "上传成功"}

第三部分:多文件上传

3.1 实际场景

用户需要批量上传多张照片,一次选择多个文件。

问题:如何实现多文件上传?

3.2 批量上传

python
@app.route("/upload/multiple", methods=["POST"])
def upload_multiple() -> dict[str, list[str] | int]:
    """Store every acceptable file submitted under the ``files`` field.

    Returns:
        A dict with the stored names (``uploaded``), the original names of
        rejected entries (``errors``) and the number stored (``total``).
    """
    uploaded: list[str] = []
    errors: list[str] = []

    for file in request.files.getlist("files"):
        # Validate the sanitised name, because that is the name actually
        # written to disk. Sanitising may remove the dot or the extension.
        safe_name: str = secure_filename(file.filename) if file else ""
        if not allowed_file(safe_name):
            errors.append(file.filename if file else "unknown")
            continue

        file.save(os.path.join(app.config["UPLOAD_FOLDER"], safe_name))
        uploaded.append(safe_name)

    return {
        "uploaded": uploaded,
        "errors": errors,
        "total": len(uploaded)
    }

3.3 前端多文件

html
<form method="post" enctype="multipart/form-data">
    <input type="file" name="files" multiple>
    <button type="submit">上传多个文件</button>
</form>

第四部分:文件下载

4.1 实际场景

用户上传的文件需要提供下载链接,或者动态生成 CSV 文件供用户下载。

问题:如何实现文件下载功能?

4.2 直接下载

python
from flask import Response, send_from_directory

@app.route("/download/<filename>")
def download_file(filename: str) -> Response:
    """Send *filename* from the upload folder as an attachment.

    Args:
        filename: Name taken from the URL.

    Returns:
        The response built by ``send_from_directory``. It is a ``Response``
        object, not a string.
    """
    return send_from_directory(
        app.config["UPLOAD_FOLDER"],
        filename,
        as_attachment=True,
    )

4.3 动态生成文件

python
import io
from flask import Response

@app.route("/export/csv")
def export_csv() -> Response:
    """Build a small CSV document in memory and return it as a download."""
    rows = (
        "Name,Email\n",
        "John,john@example.com\n",
        "Jane,jane@example.com\n",
    )
    buffer = io.StringIO()
    buffer.writelines(rows)

    # The attachment disposition asks the client to save the body as users.csv.
    disposition = {"Content-Disposition": "attachment;filename=users.csv"}
    return Response(buffer.getvalue(), mimetype="text/csv", headers=disposition)

第五部分:完整上传管理类

5.1 实际场景

文件上传逻辑在多个地方使用,需要封装成一个可复用的类。

问题:如何封装文件上传管理功能?

5.2 上传管理器

python
import os
import hashlib
from datetime import datetime
from werkzeug.utils import secure_filename
from typing import Any

class FileUploader:
    def __init__(self, upload_folder: str, allowed_extensions: set[str] | None = None) -> None:
        self.upload_folder: str = upload_folder
        self.allowed_extensions: set[str] = allowed_extensions or {"txt", "pdf", "png", "jpg", "jpeg", "gif"}
        os.makedirs(upload_folder, exist_ok=True)
    
    def save(self, file: Any, custom_name: str | None = None) -> dict[str, str | int]:
        """保存文件"""
        if not file:
            raise ValueError("No file provided")
        
        # 验证文件类型
        filename: str = secure_filename(file.filename)
        ext: str = filename.rsplit(".", 1)[1].lower() if "." in filename else ""
        
        if ext not in self.allowed_extensions:
            raise ValueError(f"File type not allowed: {ext}")
        
        # 生成唯一文件名
        if custom_name:
            filename = f"{custom_name}_{datetime.now().strftime('%Y%m%d%H%M%S')}.{ext}"
        else:
            # 使用哈希避免文件名冲突
            file_hash: str = hashlib.md5(file.read()).hexdigest()[:8]
            file.seek(0)
            filename = f"{file_hash}_{datetime.now().strftime('%Y%m%d%H%M%S')}.{ext}"
        
        # 保存文件
        filepath: str = os.path.join(self.upload_folder, filename)
        file.save(filepath)
        
        return {
            "filename": filename,
            "filepath": filepath,
            "size": os.path.getsize(filepath)
        }
    
    def delete(self, filename: str) -> bool:
        """删除文件"""
        filepath: str = os.path.join(self.upload_folder, filename)
        if os.path.exists(filepath):
            os.remove(filepath)
            return True
        return False
    
    def list_files(self) -> list[dict[str, str | int | datetime]]:
        """列出所有文件"""
        files: list[dict[str, str | int | datetime]] = []
        for filename in os.listdir(self.upload_folder):
            filepath: str = os.path.join(self.upload_folder, filename)
            if os.path.isfile(filepath):
                files.append({
                    "filename": filename,
                    "size": os.path.getsize(filepath),
                    "modified": datetime.fromtimestamp(os.path.getmtime(filepath))
                })
        return files

# Usage: an uploader for the "uploads" folder that accepts three image extensions.
uploader: FileUploader = FileUploader("uploads", {"png", "jpg", "gif"})

第六部分:完整示例

python
import os

from flask import Flask, Response, jsonify, render_template, request, send_from_directory

# Application instance and upload settings for the complete example.
app: Flask = Flask(__name__)
app.config["UPLOAD_FOLDER"] = "uploads"
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024  # 16 * 1024 * 1024 bytes

# Make sure the upload directory exists before any request is handled.
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)

# Uploader used by the routes below, limited to image and document extensions.
uploader: FileUploader = FileUploader(
    app.config["UPLOAD_FOLDER"],
    {"png", "jpg", "jpeg", "gif", "pdf", "doc", "docx"}
)

@app.route("/")
def index() -> str:
    """Render ``files.html`` with the uploader's current file listing."""
    return render_template("files.html", files=uploader.list_files())

@app.route("/upload", methods=["POST"])
def upload() -> Response | tuple[Response, int]:
    """Store the uploaded file through the uploader and report the result as JSON.

    Returns:
        On success, a JSON response with ``success``, ``filename`` and ``size``.
        When no file was sent or the uploader rejects it, a
        ``(json_response, 400)`` tuple.
    """
    file = request.files.get("file")
    if not file:
        return jsonify({"error": "No file provided"}), 400

    # Only uploader.save() signals rejection with ValueError, so the try
    # body is limited to that call.
    try:
        result: dict[str, Any] = uploader.save(file)
    except ValueError as e:
        return jsonify({"error": str(e)}), 400

    return jsonify({
        "success": True,
        "filename": result["filename"],
        "size": result["size"]
    })

@app.route("/download/<filename>")
def download(filename: str) -> Response:
    """Send *filename* from the upload folder as an attachment.

    Returns:
        The response built by ``send_from_directory``. It is a ``Response``
        object, not a string.
    """
    return send_from_directory(
        app.config["UPLOAD_FOLDER"],
        filename,
        as_attachment=True
    )

@app.route("/delete/<filename>", methods=["POST"])
def delete(filename: str) -> Response | tuple[Response, int]:
    """Delete *filename* through the uploader and report the result as JSON.

    Returns:
        A JSON success response, or a ``(json_response, 404)`` tuple when
        the uploader reports that nothing was deleted.
    """
    if not uploader.delete(filename):
        return jsonify({"error": "File not found"}), 404
    return jsonify({"success": True})

第七部分:L3 专家层

7.1 MIME 类型检测原理(magic bytes vs 扩展名)

仅依赖文件扩展名判断文件类型是不安全的,攻击者可以将恶意脚本重命名为 .jpg 上传。可靠的 MIME 类型检测需要读取文件头部的"魔数"(magic bytes)。

常见文件格式的魔数:

  文件头(前 N 字节)
  +------------------+
  | FF D8 FF E0 ...  |  → JPEG  (魔数: FF D8 FF)
  +------------------+
  | 89 50 4E 47 ...  |  → PNG   (魔数: 89 50 4E 47 = \x89PNG)
  +------------------+
  | 25 50 44 46 ...  |  → PDF   (魔数: 25 50 44 46 = %PDF)
  +------------------+
  | 50 4B 03 04 ...  |  → ZIP   (魔数: 50 4B = PK)
  +------------------+
  | 47 49 46 38 ...  |  → GIF   (魔数: 47 49 46 = GIF)
  +------------------+
python
import struct
from enum import Enum
from typing import BinaryIO

class FileType(Enum):
    """Supported file types; each member's value is ``(magic_bytes, mime_type)``."""
    # magic_bytes is the byte prefix that detect_mime_type() compares with
    # the start of a file.
    JPEG = (b"\xFF\xD8\xFF", "image/jpeg")
    PNG  = (b"\x89PNG\r\n\x1a\n", "image/png")
    GIF  = (b"GIF8", "image/gif")
    PDF  = (b"%PDF-", "application/pdf")
    ZIP  = (b"PK\x03\x04", "application/zip")

def detect_mime_type(file_stream: BinaryIO, num_bytes: int = 32) -> tuple[str, str]:
    """Identify a stream's type by matching its leading bytes against known signatures.

    Reads up to *num_bytes* from the start of the stream, then rewinds it to
    offset 0.

    Returns:
        ``(type_name, mime_type)`` for the first ``FileType`` member whose
        signature is a prefix of the header, or
        ``("unknown", "application/octet-stream")`` when none matches.
    """
    file_stream.seek(0)
    head: bytes = file_stream.read(num_bytes)
    file_stream.seek(0)  # leave the stream ready for the next reader

    return next(
        (
            (kind.name.lower(), kind.value[1])
            for kind in FileType
            if head.startswith(kind.value[0])
        ),
        ("unknown", "application/octet-stream"),
    )

def validate_file_safety(file_stream: BinaryIO, allowed_types: set[str]) -> bool:
    """Return True when the stream's magic-byte type is in *allowed_types*.

    Only the magic-byte check is implemented. The extension cross-check
    sketched below is not performed, because this function receives no
    filename.
    """
    # Sketch of the intended double check. It is not active: it needs a
    # filename and a get_type_from_extension helper that is not defined here.
    # ext_type: str = get_type_from_extension(filename)
    # magic_type: str = detect_mime_type(file_stream)[0]
    # return ext_type == magic_type and magic_type in allowed_types
    detected: str = detect_mime_type(file_stream)[0]
    return detected in allowed_types

两种检测方式对比:

| 检测方式 | 原理 | 准确性 | 性能 | 可绕过 |
| --- | --- | --- | --- | --- |
| 扩展名检查 | 文件名后缀匹配 | 低 | O(1) | 极易(重命名即可) |
| 魔数检测 | 读取文件头字节 | 高 | O(1)(固定读取) | 困难(需构造合法文件头) |
| 完整解析 | 解析整个文件结构 | 最高 | O(file_size) | 极难 |

7.2 流式下载(send_file 底层)

Flask 的 send_file 和 send_from_directory 底层使用 WSGI 的 file_wrapper 实现流式传输,避免将整个文件加载到内存。

send_file 核心流程:

  请求文件下载
       |
       v
  +-------------+
  | 路径安全检查 |  ← 防止路径遍历
  +-------------+
       |
       v
  +-------------+
  | 获取文件信息 |  ← os.stat() 获取 size, mtime
  +-------------+
       |
       v
  +-------------+     是     +-------------+
  | Range 请求? |--------->| 部分响应 206 |
  +-------------+          +-------------+
       | 否
       v
  +-------------+
  | 检测 MIME   |  ← mimetypes.guess_type()
  +-------------+
       |
       v
  +-------------+
  | 流式响应    |  ← Response(app.wsgi_app.file_wrapper)
  +-------------+
python
import os
import mimetypes
from typing import Generator
from werkzeug.utils import safe_join
from werkzeug.wrappers import Response

def stream_file_streaming(filepath: str, chunk_size: int = 8192) -> Generator[bytes, None, None]:
    """Yield the contents of *filepath* as binary chunks of at most *chunk_size* bytes."""
    with open(filepath, "rb") as stream:
        # iter() with a sentinel keeps calling read() until it returns b"" (end of file).
        yield from iter(lambda: stream.read(chunk_size), b"")

def safe_send_file(directory: str, filename: str) -> Response:
    """Simplified ``send_from_directory``: stream a file from *directory* as an attachment.

    Args:
        directory: Base directory that files may be served from.
        filename: Requested path relative to *directory*. Treated as untrusted.

    Returns:
        A streaming response with Content-Length and Content-Disposition set.

    Raises:
        NotFound: Raised via ``abort(404)`` when the path is unsafe or is not
            a regular file.
    """
    from flask import abort

    # Path safety: safe_join returns None for inputs such as ../../etc/passwd.
    safe_path: str | None = safe_join(directory, filename)
    if safe_path is None or not os.path.isfile(safe_path):
        abort(404)

    # Guess the MIME type from the file name.
    mime_type: str | None = mimetypes.guess_type(safe_path)[0]

    # Build the streaming response.
    response: Response = Response(
        stream_file_streaming(safe_path),
        mimetype=mime_type or "application/octet-stream",
        direct_passthrough=True,  # hand the iterator to the WSGI server as-is
    )
    response.headers["Content-Length"] = str(os.path.getsize(safe_path))

    # Let Werkzeug assemble the header so that the name is quoted and encoded.
    # Interpolating the raw request value breaks on spaces, quotes and
    # non-ASCII names, and would expose sub-directory parts.
    response.headers.set(
        "Content-Disposition",
        "attachment",
        filename=os.path.basename(safe_path),
    )

    return response

性能考量:

| 传输方式 | 内存占用 | 适用文件大小 | TTFB(首字节时间) |
| --- | --- | --- | --- |
| 全量读入内存 | O(file_size) | < 10MB | 高(需完全读取) |
| 流式传输 | O(chunk_size) | 任意大小 | 低(立即开始) |
| X-Sendfile | O(0)(内核态) | 任意大小 | 最低(Nginx/Apache 处理) |

7.3 路径遍历攻击的原理与防御

路径遍历(Path Traversal)攻击通过 ../ 序列跳出预期目录,访问服务器任意文件。

攻击向量:

  预期路径: uploads/avatar.jpg
  
  攻击输入:
    ../../../etc/passwd          → Linux 系统文件
    ..\..\..\windows\system32\config\sam  → Windows 系统文件
    ....//....//....//etc/passwd  → 绕过简单过滤
    %2e%2e%2f%2e%2e%2fetc/passwd  → URL 编码绕过
  
  危险路径解析:
    uploads/../../../etc/passwd
    = /Users/project/uploads/../../../etc/passwd
    = /etc/passwd  ← 跳出 uploads 目录!

多层防御策略:

python
import os
from pathlib import Path
from werkzeug.utils import safe_join

def is_safe_path(base_dir: str, target_path: str) -> bool:
    """Check whether *target_path*, joined onto *base_dir*, stays inside *base_dir*.

    Both paths are resolved with ``os.path.realpath`` first, so ``..``
    segments and symbolic links are compared by their real location.

    Args:
        base_dir: Directory that access must stay within.
        target_path: Path to test, interpreted relative to *base_dir*.

    Returns:
        True if the resolved target is *base_dir* itself or lies below it.
    """
    base_real: str = os.path.realpath(base_dir)
    target_real: str = os.path.realpath(os.path.join(base_dir, target_path))
    try:
        # commonpath compares whole path components, so "/srv/uploads_evil"
        # is not mistaken for a child of "/srv/uploads". It also works when
        # base_dir is the filesystem root, where a "base + os.sep" prefix
        # test fails.
        return os.path.commonpath([base_real, target_real]) == base_real
    except ValueError:
        # Raised for paths that cannot be compared, e.g. different drives on Windows.
        return False

def is_safe_path_modern(base_dir: str, target_path: str) -> bool:
    """Python 3.9+ variant of the containment check, built on ``pathlib``.

    Returns True when *target_path*, joined onto *base_dir* and resolved, is
    *base_dir* itself or lies below it.
    """
    root: Path = Path(base_dir).resolve()
    candidate: Path = Path(base_dir).joinpath(target_path).resolve()
    return candidate.is_relative_to(root)

def secure_download(directory: str, filename: str) -> bytes | None:
    """Read a file from *directory* after layered path checks.

    Args:
        directory: Base directory that downloads are restricted to.
        filename: Requested path relative to *directory*. Treated as untrusted.

    Returns:
        The file's bytes, or None when any check fails.
    """
    # Layer 1: Werkzeug's safe_join rejects ".." segments and absolute paths.
    safe_path: str | None = safe_join(directory, filename)
    if safe_path is None:
        return None

    # Layer 2: realpath containment check. Pass *filename*, not safe_path:
    # is_safe_path() joins its argument onto *directory* itself, so giving it
    # the already-joined path would validate "<directory>/<directory>/<filename>"
    # whenever *directory* is relative.
    if not is_safe_path(directory, filename):
        return None

    # Layer 3: only regular files are read.
    if not os.path.isfile(safe_path):
        return None

    with open(safe_path, "rb") as f:
        return f.read()

设计动机:

| 防御层 | 防御对象 | 原理 |
| --- | --- | --- |
| secure_filename() | 恶意文件名(含路径分隔符) | 剥离路径信息,仅保留文件名 |
| safe_join() | ../ 路径穿越 | 规范化路径后检查是否越界 |
| realpath 验证 | 符号链接攻击 | 解析真实物理路径再比较 |
| 白名单扩展名 | 任意文件读取 | 仅允许下载特定类型文件 |

7.4 知识关联

                    文件上传下载知识体系
                          |
         +----------------+----------------+
         |                |                |
       上传层          存储层          下载层
         |                |                |
    +----+----+      +----+----+      +----+----+
    |multipart|      | 文件系统 |      | 流式传输 |
    | 解析    |      | 安全路径 |      | Range    |
    +----+----+      +----+----+      +----+----+
         |                |                |
         v                v                v
    +---------+      +---------+      +---------+
    | 魔数检测 |      | secure  |      | X-Send |
    | 扩展名   |      |_filename|      | file   |
    +---------+      +---------+      +---------+
                          |
                          v
                    +-----+-----+
                    | 路径遍历  |
                    | 防御体系  |
                    +-----+-----+
                          |
                    +-----+-----+      +---------+
                    | safe_join |---->|realpath |
                    | realpath |      | 验证    |
                    +----------+      +---------+
| 知识点 | 说明 |
| --- | --- |
| request.files | 获取上传文件 |
| secure_filename | 安全文件名 |
| allowed_extensions | 文件类型限制 |
| send_from_directory | 文件下载 |
| MAX_CONTENT_LENGTH | 文件大小限制 |