04-数据库集成
Python 3.11+
本章讲解 Flask-SQLAlchemy 数据库集成,包括模型定义、CRUD 操作和关系映射。
第一部分:安装和配置
1.1 实际场景
你正在开发一个博客系统,需要存储用户、文章、评论等数据。你选择了 SQLite 作为开发数据库。
问题:如何在 Flask 中配置数据库连接?
1.2 安装依赖
bash
pip install flask-sqlalchemy sqlalchemy pymysql1.3 基本配置
python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app: Flask = Flask(__name__)
# SQLite 配置
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///blog.db"
# MySQL 配置
app.config["SQLALCHEMY_DATABASE_URI"] = \
"mysql+pymysql://username:password@localhost:3306/dbname"
# PostgreSQL 配置
app.config["SQLALCHEMY_DATABASE_URI"] = \
"postgresql://username:password@localhost:5432/dbname"
# 其他配置
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False # 关闭修改追踪
app.config["SQLALCHEMY_ECHO"] = True # 打印 SQL 语句(调试用)
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
"pool_size": 10,
"pool_recycle": 3600,
"pool_pre_ping": True
}
db: SQLAlchemy = SQLAlchemy(app)第二部分:数据模型
2.1 实际场景
博客系统需要用户表存储用户信息,每个用户有 ID、用户名、邮箱、创建时间等字段。
问题:如何用 SQLAlchemy 定义数据库表结构?
2.2 定义模型
python
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
db: SQLAlchemy = SQLAlchemy()
class User(db.Model):
# 表名(可选,默认使用类名小写)
__tablename__ = "users"
# 字段定义
id: int = db.Column(db.Integer, primary_key=True)
username: str = db.Column(db.String(80), unique=True, nullable=False)
email: str = db.Column(db.String(120), unique=True, nullable=False)
password_hash: str = db.Column(db.String(200), nullable=False)
created_at: datetime = db.Column(db.DateTime, default=datetime.utcnow)
updated_at: datetime = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 字符串表示
def __repr__(self) -> str:
return f"<User {self.username}>"
# 字典表示
def to_dict(self) -> dict[str, str | int | None]:
return {
"id": self.id,
"username": self.username,
"email": self.email,
"created_at": self.created_at.isoformat() if self.created_at else None
}2.3 字段类型
| SQLAlchemy 类型 | Python 类型 | 说明 |
|---|---|---|
| Integer | int | 整数 |
| BigInteger | int | 大整数 |
| Float | float | 浮点数 |
| String | str | 字符串 |
| Text | str | 长文本 |
| Boolean | bool | 布尔值 |
| DateTime | datetime | 日期时间 |
| Date | date | 日期 |
| Time | time | 时间 |
| LargeBinary | bytes | 二进制数据 |
| JSON | dict | JSON 数据 |
2.4 字段参数
python
class User(db.Model):
# 主键
id: int = db.Column(db.Integer, primary_key=True)
# 索引
username: str = db.Column(db.String(80), index=True)
email: str = db.Column(db.String(120), unique=True, index=True)
# 约束
name: str = db.Column(db.String(100), nullable=False) # 非空
code: str = db.Column(db.String(10), unique=True) # 唯一
status: str = db.Column(db.String(20), default="active") # 默认值第三部分:关系映射
3.1 实际场景
一个作者可以写多本书,一本书属于一个作者。博客文章可以有多个标签,一个标签可以对应多篇文章。
问题:如何在 SQLAlchemy 中定义表之间的关系?
3.2 一对多关系
python
class Author(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
name: str = db.Column(db.String(100), nullable=False)
# 关系定义
books: list = db.relationship("Book", back_populates="author", lazy="dynamic")
class Book(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
title: str = db.Column(db.String(200), nullable=False)
author_id: int = db.Column(db.Integer, db.ForeignKey("author.id"), nullable=False)
# 反向引用
author: Author = db.relationship("Author", back_populates="books")
# 使用
author: Author = Author.query.first()
for book in author.books:
print(book.title)
book: Book = Book.query.first()
print(book.author.name)3.3 多对多关系
python
# 关联表
association_table = db.Table(
"post_tags",
db.Column("post_id", db.Integer, db.ForeignKey("post.id")),
db.Column("tag_id", db.Integer, db.ForeignKey("tag.id"))
)
class Post(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
title: str = db.Column(db.String(200))
tags: list = db.relationship("Tag", secondary=association_table, back_populates="posts")
class Tag(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
name: str = db.Column(db.String(50))
posts: list = db.relationship("Post", secondary=association_table, back_populates="tags")
# 使用
post: Post = Post.query.first()
for tag in post.tags:
print(tag.name)
tag: Tag = Tag.query.first()
for post in tag.posts:
print(post.title)3.4 一对一关系
python
class User(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
username: str = db.Column(db.String(80))
profile: UserProfile = db.relationship("UserProfile", back_populates="user", uselist=False)
class UserProfile(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
user_id: int = db.Column(db.Integer, db.ForeignKey("user.id"), unique=True)
bio: str = db.Column(db.Text)
user: User = db.relationship("User", back_populates="profile")第四部分:CRUD 操作
4.1 实际场景
用户注册时需要创建用户记录,用户登录时需要查询用户,用户修改信息时需要更新,用户注销时需要删除。
问题:如何在 Flask 中执行数据库的增删改查操作?
4.2 创建数据
python
# 创建单条记录
user: User = User(username="john", email="john@example.com")
db.session.add(user)
db.session.commit()
# 创建并返回
user: User = User(username="john", email="john@example.com")
db.session.add(user)
db.session.flush() # 获取 ID 但不提交
print(user.id)
# 批量创建
users: list[User] = [
User(username="user1", email="user1@example.com"),
User(username="user2", email="user2@example.com"),
User(username="user3", email="user3@example.com"),
]
db.session.add_all(users)
db.session.commit()4.3 读取数据
python
# 根据主键查询
user: User | None = User.query.get(1)
# 根据条件查询
user: User | None = User.query.filter_by(username="john").first()
users: list[User] = User.query.filter(User.email.like("%@example.com")).all()
# 排序
users: list[User] = User.query.order_by(User.created_at.desc()).all()
# 分页
page = User.query.paginate(page=1, per_page=20)
users: list[User] = page.items
total: int = page.total
# 计数
count: int = User.query.count()4.4 更新数据
python
# 更新单条记录
user: User | None = User.query.get(1)
if user:
user.email = "newemail@example.com"
db.session.commit()
# 批量更新
User.query.filter_by(is_active=False).update({"status": "inactive"})
db.session.commit()4.5 删除数据
python
# 删除单条记录
user: User | None = User.query.get(1)
if user:
db.session.delete(user)
db.session.commit()
# 批量删除
User.query.filter_by(is_active=False).delete()
db.session.commit()第五部分:查询进阶
5.1 实际场景
你需要查询 2024 年以后注册的活跃用户,或者查询用户名包含"admin"或邮箱是"admin@example.com"的用户。
问题:如何构建复杂的查询条件?
5.2 复杂查询
python
from sqlalchemy import and_, or_, not_
from datetime import datetime
# AND 条件
users: list[User] = User.query.filter(
and_(
User.is_active == True,
User.created_at >= datetime(2024, 1, 1)
)
).all()
# OR 条件
users: list[User] = User.query.filter(
or_(
User.username == "admin",
User.email == "admin@example.com"
)
).all()
# NOT 条件
users: list[User] = User.query.filter(
not_(User.is_active == True)
).all()5.3 联表查询
python
# 联表查询
results: list = db.session.query(User, Book).join(
Book, User.id == Book.author_id
).all()
for user, book in results:
print(f"{user.username} - {book.title}")5.4 聚合查询
python
from sqlalchemy import func
# 计数
count: int = db.session.query(func.count(User.id)).scalar()
# 求和
total: float = db.session.query(func.sum(Order.amount)).scalar()
# 平均值
avg_price: float = db.session.query(func.avg(Product.price)).scalar()
# 分组统计
results: list = db.session.query(
User.status,
func.count(User.id)
).group_by(User.status).all()
for status, count in results:
print(f"{status}: {count}")第六部分:迁移管理
6.1 实际场景
你修改了 User 模型,添加了 phone 字段,需要更新数据库表结构。
问题:如何管理数据库表结构的变更?
6.2 使用 Flask-Migrate
bash
pip install flask-migrate
# 初始化
flask db init
# 创建迁移
flask db migrate -m "add users table"
# 执行迁移
flask db upgrade
# 回滚
flask db downgrade6.3 配置
python
from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
app: Flask = Flask(__name__)
db: SQLAlchemy = SQLAlchemy(app)
migrate: Migrate = Migrate()
def create_app() -> Flask:
app = Flask(__name__)
db.init_app(app)
migrate.init_app(app, db)
return app第七部分:完整示例
python
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
from typing import Any
app: Flask = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///blog.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db: SQLAlchemy = SQLAlchemy(app)
class User(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
username: str = db.Column(db.String(80), unique=True, nullable=False)
email: str = db.Column(db.String(120), unique=True, nullable=False)
created_at: datetime = db.Column(db.DateTime, default=datetime.utcnow)
posts: list = db.relationship("Post", back_populates="author", lazy="dynamic")
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"username": self.username,
"email": self.email,
"created_at": self.created_at.isoformat()
}
class Post(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
title: str = db.Column(db.String(200), nullable=False)
content: str = db.Column(db.Text, nullable=False)
published: bool = db.Column(db.Boolean, default=False)
created_at: datetime = db.Column(db.DateTime, default=datetime.utcnow)
author_id: int = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
author: User = db.relationship("User", back_populates="posts")
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"title": self.title,
"content": self.content,
"published": self.published,
"author": self.author.username,
"created_at": self.created_at.isoformat()
}
@app.route("/users", methods=["GET"])
def get_users() -> list[dict[str, Any]]:
users: list[User] = User.query.all()
return jsonify([u.to_dict() for u in users])
@app.route("/users", methods=["POST"])
def create_user() -> tuple[dict[str, Any], int]:
data: dict[str, Any] = request.get_json()
user: User = User(username=data["username"], email=data["email"])
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict()), 201
with app.app_context():
db.create_all()
if __name__ == "__main__":
app.run(debug=True)第七部分:L3 专家层 — 底层原理
7.1 SQLAlchemy 的 Unit of Work 模式
Unit of Work(工作单元)是 SQLAlchemy 核心设计模式,跟踪所有对象变更并在 commit() 时批量同步到数据库。
┌─────────────────────────────────────────────────────────────────┐
│ Unit of Work 生命周期 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ db.session.add(user) │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Identity Map(身份映射) │ │
│ │ 记录每个已加载对象的 (类, 主键) → 对象引用 │ │
│ │ 作用:同一主键在同一 session 内只返回一个对象实例 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 状态追踪(状态机) │ │
│ │ │ │
│ │ transient ──add()──→ pending ──flush()──→ persistent │ │
│ │ │ │ │ │
│ │ │ 自动分配主键 │ 属性修改自动追踪 │ │
│ │ ▼ ▼ │ │
│ │ deleted ◄──delete()── persistent ◄── modified │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ detached ◄──commit()/rollback() │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ db.session.commit() │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ flush() 阶段: │ │
│ │ 1. 收集所有 pending 对象 → INSERT 队列 │ │
│ │ 2. 收集所有 modified 对象 → UPDATE 队列 │ │
│ │ 3. 收集所有 deleted 对象 → DELETE 队列 │ │
│ │ 4. 按外键依赖排序队列 │ │
│ │ 5. 批量执行 SQL(减少 round-trip) │ │
│ │ │ │
│ │ commit() 阶段: │ │
│ │ 1. 数据库事务提交 │ │
│ │ 2. 状态迁移:persistent → detached │ │
│ │ 3. 清空 Unit of Work 状态 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘python
# Unit of Work 实际演示
from flask_sqlalchemy import SQLAlchemy
db: SQLAlchemy = SQLAlchemy()
# 创建对象 —— 状态: pending
user: User = User(username="alice", email="alice@example.com")
db.session.add(user)
print(db.session.new) # IdentitySet([User])
# 修改对象 —— 状态: pending → modified(flush 后)
user.email = "new@example.com"
db.session.flush() # 执行 SQL 但不提交事务
print(user.id) # 现在有 ID 了(INSERT 已执行)
# 批量操作 —— Unit of Work 自动排序
user2: User = User(username="bob", email="bob@example.com")
db.session.add(user2)
db.session.delete(user) # 标记删除
# 一次 commit 处理所有变更
db.session.commit() # INSERT bob, DELETE alice(按依赖排序)7.2 连接池原理(QueuePool vs StaticPool)
SQLAlchemy 的连接池管理数据库连接的创建、复用和回收。
┌─────────────────────────────────────────────────────────────────┐
│ 连接池架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ QueuePool(默认,适用于多进程/多线程): │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ QueuePool │ │
│ │ ├── pool_size: 5 # 常驻连接数 │ │
│ │ ├── max_overflow: 10 # 峰值时可额外创建的连接数 │ │
│ │ ├── pool_recycle: 3600 # 连接回收时间(秒) │ │
│ │ └── pool_pre_ping: True # 使用前检查连接是否存活 │ │
│ │ │ │
│ │ 连接获取流程: │ │
│ │ 1. 检查空闲队列 → 有可用连接?→ 复用 │ │
│ │ 2. 无空闲连接 + 未达 pool_size → 创建新连接 │ │
│ │ 3. 无空闲连接 + 已达 pool_size + 可溢出 → 创建溢出连接 │ │
│ │ 4. 无空闲连接 + 已达上限 → 等待(timeout=30s)→ 抛异常 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ StaticPool(适用于单线程,如 SQLite 内存数据库): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ StaticPool │ │
│ │ ├── 仅维护一个连接 │ │
│ │ ├── 每次 checkout 返回同一连接 │ │
│ │ └── checkin 时不关闭,放回池重用 │ │
│ │ 适用场景:SQLite :memory:、测试环境 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ NullPool(不池化,每次创建新连接): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ NullPool │ │
│ │ ├── 每次 checkout 创建新连接 │ │
│ │ ├── checkin 时立即关闭连接 │ │
│ │ └── 适用场景:短生命周期进程、连接数不确定的场景 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘python
# 配置连接池
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
# QueuePool 配置(生产环境推荐)
"pool_size": 10, # 基础连接数
"max_overflow": 20, # 最大连接 = 10 + 20 = 30
"pool_timeout": 30, # 等待超时(秒)
"pool_recycle": 1800, # 30 分钟回收(应对 MySQL 8 小时超时)
"pool_pre_ping": True, # 使用前检测连接存活
}
# SQLite 内存数据库必须用 StaticPool
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
"poolclass": StaticPool, # 需要 from sqlalchemy.pool import StaticPool
"connect_args": {"check_same_thread": False},
}7.3 N+1 查询问题及 eager loading 的 SQL 差异
┌─────────────────────────────────────────────────────────────────┐
│ N+1 查询问题与解决方案 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 问题场景:查询所有作者及其书籍 │
│ │
│ ❌ Lazy Loading(产生 N+1 查询): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ SELECT * FROM author; -- 1 条 SQL │ │
│ │ -- 遍历每个作者时: │ │
│ │ SELECT * FROM book WHERE author_id = 1; -- N 条 SQL │ │
│ │ SELECT * FROM book WHERE author_id = 2; │ │
│ │ SELECT * FROM book WHERE author_id = 3; │ │
│ │ ... │ │
│ │ │ │
│ │ 总 SQL 数: 1 + N │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ✅ joinedload(JOIN 预加载): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ SELECT author.*, book.* │ │
│ │ FROM author │ │
│ │ LEFT OUTER JOIN book ON author.id = book.author_id; │ │
│ │ │ │
│ │ 总 SQL 数: 1 │ │
│ │ 适用场景: 一对多、多对一,关系数据量不大 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ✅ subqueryload(子查询预加载): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ SELECT * FROM author; -- 第 1 条 │ │
│ │ SELECT book.* FROM book │ │
│ │ WHERE book.author_id IN (1, 2, 3, ..., N); -- 第 2 条 │ │
│ │ │ │
│ │ 总 SQL 数: 2 │ │
│ │ 适用场景: 一对多且子表数据量大,避免 JOIN 结果集膨胀 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ✅ selectinload(IN 查询预加载,SQLAlchemy 1.4+ 推荐): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ SELECT * FROM author; -- 第 1 条 │ │
│ │ SELECT * FROM book │ │
│ │ WHERE book.author_id IN (1, 2, 3, ..., N); -- 第 2 条 │ │
│ │ │ │
│ │ 总 SQL 数: 2(大批量时分批 IN 查询) │ │
│ │ 适用场景: 通用推荐,兼顾性能和内存 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘python
from sqlalchemy.orm import joinedload, subqueryload, selectinload
# Lazy Loading(默认,产生 N+1)
authors: list[Author] = Author.query.all()
for author in authors:
print(author.books) # 每个 author 触发 1 条 SQL
# joinedload —— 单次 JOIN 查询
authors: list[Author] = (
Author.query
.options(joinedload(Author.books))
.all()
)
# SQL: SELECT author.*, book.* FROM author LEFT OUTER JOIN book ...
# subqueryload —— 两条 SQL(主查询 + 子查询)
authors: list[Author] = (
Author.query
.options(subqueryload(Author.books))
.all()
)
# SQL 1: SELECT * FROM author
# SQL 2: SELECT * FROM book WHERE author_id IN (1, 2, 3)
# selectinload —— 推荐方式(SQLAlchemy 1.4+)
authors: list[Author] = (
Author.query
.options(selectinload(Author.books))
.all()
)
# SQL 1: SELECT * FROM author
# SQL 2: SELECT * FROM book WHERE author_id IN (1, 2, 3)
# (大量数据时自动分批,避免 IN 列表过长)7.4 性能考量
| 操作 | 耗时 | 说明 |
|---|---|---|
| 连接池获取连接(命中) | ~0.01ms | 从空闲队列直接返回 |
| 连接池获取连接(新建) | ~5ms | TCP 握手 + 数据库认证 |
| 简单 INSERT(1 条) | ~2ms | 含 flush + commit |
| 批量 INSERT(1000 条) | ~50ms | bulk_insert_mappings 可降至 ~15ms |
| 简单 SELECT(含索引) | ~1ms | O(log n) B-Tree 查找 |
| 简单 SELECT(无索引) | ~100ms | O(n) 全表扫描 |
| N+1 查询(100 条主记录 × 10 条关联) | ~1000ms | 101 次 round-trip |
| joinedload(同等数据量) | ~15ms | 单次 JOIN 查询 |
| selectinload(同等数据量) | ~20ms | 2 次查询,结果集更小 |
扩展性: 连接池 pool_size 应与数据库 max_connections 和 worker 数量匹配。公式:pool_size × worker_count < max_connections × 0.8。
7.5 设计动机
| 设计决策 | 动机 | 权衡 |
|---|---|---|
| Unit of Work 模式 | 自动追踪变更,减少手动 SQL,保证事务一致性 | 隐性 SQL 生成,复杂场景难以精确控制 |
| Identity Map | 避免同一对象在 session 内多份实例 | 大结果集占用内存,需用 yield_per() 分批 |
| lazy="dynamic" | 关系属性返回 Query 对象,支持链式过滤 | 仍需额外 SQL,不适合小数据量场景 |
| 声明式模型(db.Model) | 将表定义与 ORM 映射合并,代码简洁 | 模型类与数据库耦合,难以拆分 |
| Flask-SQLAlchemy 封装 | 自动绑定 app 和 session,简化配置 | 隐藏了 SQLAlchemy 的部分高级特性 |
7.6 知识关联
Flask 应用
│
▼
┌───────────────────┐
│ Flask-SQLAlchemy │ ← 桥接层
│ (db.session) │
└────────┬──────────┘
│
▼
┌───────────────────┐
│ SQLAlchemy │
│ ┌─────────────┐ │
│ │ ORM 层 │ │ ← 声明式模型、关系映射
│ │ (Mapper) │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Unit of Work │ │ ← 状态追踪、变更管理
│ │ (Session) │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ SQL 表达式层 │ │ ← Query, select(), join()
│ │ (Core) │ │
│ └──────┬──────┘ │
└─────────┼─────────┘
│
▼
┌───────────────────┐
│ 连接池 │ ← QueuePool / StaticPool
│ (Connection Pool)│
└────────┬──────────┘
│
▼
┌───────────────────┐
│ DBAPI 驱动 │ ← pymysql / psycopg2 / sqlite3
│ (pymysql 等) │
└────────┬──────────┘
│
▼
数据库服务器总结
| 知识点 | 说明 |
|---|---|
| 配置 | 数据库连接配置 |
| 模型 | 表结构定义 |
| 关系 | 一对多、多对多、自引用 |
| CRUD | 增删改查操作 |
| 查询 | 过滤、排序、分页 |
| 迁移 | Flask-Migrate |