04-数据库集成
Python 3.11+
本章讲解 FastAPI 与 SQLAlchemy 数据库集成。
第一部分:SQLAlchemy 基础
1.1 实际场景
API 需要持久化存储用户数据、文章数据等,选择 SQLAlchemy 作为 ORM。
问题:如何配置数据库连接和会话管理?
1.2 安装
bash
pip install sqlalchemy databases asyncpg1.3 配置
python
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
DATABASE_URL: str = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL)
SessionLocal: sessionmaker[Session] = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()第二部分:定义模型
2.1 实际场景
需要定义用户表和文章表,用户表有 id、用户名、邮箱等字段。
问题:如何用 SQLAlchemy 定义数据模型?
2.2 基本模型
python
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from datetime import datetime
class User(Base):
__tablename__ = "users"
id: int = Column(Integer, primary_key=True, index=True)
username: str = Column(String(50), unique=True, index=True)
email: str = Column(String(120), unique=True, index=True)
is_active: bool = Column(Boolean, default=True)
created_at: datetime = Column(DateTime, default=datetime.utcnow)
# 创建表
Base.metadata.create_all(bind=engine)2.3 关系定义
python
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Author(Base):
__tablename__ = "authors"
id: int = Column(Integer, primary_key=True)
name: str = Column(String(100), nullable=False)
books: list = relationship("Book", back_populates="author")
class Book(Base):
__tablename__ = "books"
id: int = Column(Integer, primary_key=True)
title: str = Column(String(200), nullable=False)
author_id: int = Column(Integer, ForeignKey("authors.id"))
author: Author = relationship("Author", back_populates="books")第三部分:CRUD 操作
3.1 实际场景
API 需要创建用户、查询用户、更新用户、删除用户等操作。
问题:如何实现数据库的增删改查?
3.2 创建
python
from sqlalchemy.orm import Session
from pydantic import BaseModel
class UserCreate(BaseModel):
username: str
email: str
def create_user(db: Session, user: UserCreate) -> User:
db_user: User = User(**user.model_dump())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user3.3 读取
python
def get_user(db: Session, user_id: int) -> User | None:
return db.query(User).filter(User.id == user_id).first()
def get_users(db: Session, skip: int = 0, limit: int = 10) -> list[User]:
return db.query(User).offset(skip).limit(limit).all()3.4 更新
python
def update_user(db: Session, user_id: int, user_update: dict) -> User | None:
db_user: User | None = db.query(User).filter(User.id == user_id).first()
if db_user:
for key, value in user_update.items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user3.5 删除
python
def delete_user(db: Session, user_id: int) -> User | None:
db_user: User | None = db.query(User).filter(User.id == user_id).first()
if db_user:
db.delete(db_user)
db.commit()
return db_user第四部分:异步数据库
4.1 实际场景
高并发场景下,同步数据库操作会成为瓶颈,需要异步数据库支持。
问题:如何配置异步数据库连接?
4.2 异步配置
python
from databases import Database
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
DATABASE_URL: str = "sqlite+aiosqlite:///./test.db"
database: Database = Database(DATABASE_URL)
engine = create_async_engine(DATABASE_URL)
async_session: sessionmaker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)4.3 异步 CRUD
python
async def get_users() -> list[dict]:
query: str = "SELECT * FROM users"
return await database.fetch_all(query)
async def create_user(user_data: dict) -> int:
query: str = "INSERT INTO users (username, email) VALUES (:username, :email)"
return await database.execute(query, user_data)第五部分:完整示例
python
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
from datetime import datetime
from typing import Any
app: FastAPI = FastAPI()
# ==================== 数据库配置 ====================
DATABASE_URL: str = "sqlite:///./app.db"
engine = create_engine(DATABASE_URL)
SessionLocal: sessionmaker[Session] = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# ==================== 模型 ====================
class User(Base):
__tablename__ = "users"
id: int = Column(Integer, primary_key=True, index=True)
username: str = Column(String(50), unique=True, index=True)
email: str = Column(String(120), unique=True, index=True)
is_active: bool = Column(Boolean, default=True)
created_at: datetime = Column(DateTime, default=datetime.utcnow)
Base.metadata.create_all(bind=engine)
# ==================== Pydantic 模型 ====================
class UserCreate(BaseModel):
username: str
email: str
class UserResponse(BaseModel):
id: int
username: str
email: str
is_active: bool
created_at: datetime
model_config = ConfigDict(from_attributes=True)
# ==================== 依赖 ====================
def get_db() -> Session:
db: Session = SessionLocal()
try:
yield db
finally:
db.close()
# ==================== 路由 ====================
@app.post("/users", response_model=UserResponse, status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)) -> User:
db_user: User = User(**user.model_dump())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users", response_model=list[UserResponse])
def get_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)) -> list[User]:
users: list[User] = db.query(User).offset(skip).limit(limit).all()
return users
@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)) -> User:
user: User | None = db.query(User).filter(User.id == user_id).first()
if not user:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="User not found")
return user
@app.delete("/users/{user_id}", status_code=204)
def delete_user(user_id: int, db: Session = Depends(get_db)) -> None:
user: User | None = db.query(User).filter(User.id == user_id).first()
if user:
db.delete(user)
db.commit()
return None第六部分:L3 专家层
6.1 SQLAlchemy 2.0 的 async 引擎(asyncpg/aiosqlite)
SQLAlchemy 2.0 引入了原生 async 支持,通过 sqlalchemy.ext.asyncio 模块提供异步引擎和会话。
异步引擎架构:
AsyncSession
│
▼
┌──────────────────────────────────┐
│ AsyncEngine (SQLAlchemy) │
│ ┌────────────────────────────┐ │
│ │ AsyncAdaptedQueue │ │
│ │ (async 连接池队列) │ │
│ └────────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────┐ │
│ │ AsyncAdaptedConnection │ │
│ │ (包装底层 async 驱动) │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ asyncpg (PostgreSQL) │ │ │
│ │ │ aiosqlite (SQLite) │ │ │
│ │ │ aiomysql (MySQL) │ │ │
│ │ └──────────────────────┘ │ │
│ └────────────────────────────┘ │
└──────────────────────────────────┘驱动对比:
python
# PostgreSQL + asyncpg(生产推荐)
from sqlalchemy.ext.asyncio import create_async_engine
engine: AsyncEngine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/dbname",
echo=True,
)
# SQLite + aiosqlite(开发/测试)
engine: AsyncEngine = create_async_engine(
"sqlite+aiosqlite:///./test.db",
echo=True,
)核心区别:
asyncpg:基于 PostgreSQL 协议直接实现,性能最高,不支持 SQLiteaiosqlite:在线程池中运行同步sqlite3,通过anyio.to_thread桥接echo=True:输出 SQL 日志,生产环境应关闭
6.2 连接池的 async 适配
SQLAlchemy 的连接池在 async 环境下通过 AsyncAdaptedQueue 实现。
连接池工作流程:
AsyncEngine
│
▼
┌─────────────────┐
│ Connection │
│ Pool │
│ │
│ pool_size=5 │ ← 常驻连接数
│ max_overflow=10 │ ← 最大溢出连接
│ pool_timeout=30 │ ← 获取超时(秒)
│ pool_recycle= │ ← 连接回收时间
│ 3600 │
└────────┬────────┘
│
┌────────┴────────┐
│ AsyncAdapted │
│ Queue │
│ (FIFO async) │
└────────┬────────┘
│
┌──────────┼──────────┐
▼ ▼ ▼
conn1 conn2 conn3
(asyncpg (asyncpg (asyncpg
connect) connect) connect)关键参数说明:
| 参数 | 默认值 | 说明 |
|---|---|---|
pool_size | 5 | 池中保持的常驻连接数 |
max_overflow | 10 | 超出 pool_size 后可临时创建的连接数 |
pool_timeout | 30 | 获取连接的最大等待时间(秒) |
pool_recycle | -1(不回收) | 连接回收间隔(秒),防止数据库断开空闲连接 |
pool_pre_ping | False | 每次获取连接前测试连通性 |
6.3 事务隔离级别
SQLAlchemy 支持多种事务隔离级别,直接影响并发读写的一致性。
隔离级别对比:
隔离级别 (由弱到强):
READ UNCOMMITTED ← 可能读到其他事务未提交的数据(脏读)
│
▼
READ COMMITTED ← 只读到已提交的数据,但同一事务内两次读取可能不同(不可重复读)
│
▼
REPEATABLE READ ← 同一事务内多次读取结果一致,但可能出现幻读
│
▼
SERIALIZABLE ← 完全隔离,事务串行执行,性能最低SQLAlchemy 中设置:
python
from sqlalchemy import create_engine
# 引擎级别设置
engine = create_engine(
"postgresql+asyncpg://user:pass@localhost/db",
isolation_level="REPEATABLE READ", # PostgreSQL
)
# SQLite 仅支持两种模式
engine = create_engine(
"sqlite+aiosqlite:///./test.db",
connect_args={"check_same_thread": False},
)
# SQLite 通过 PRAGMA 设置: PRAGMA journal_mode=WAL;
# 会话级别设置(覆盖引擎级别)
async with async_session() as session:
await session.connection(
execution_options={"isolation_level": "SERIALIZABLE"}
)
# 在此连接上的操作使用 SERIALIZABLE 隔离级别各数据库支持的隔离级别:
| 数据库 | READ UNCOMMITTED | READ COMMITTED | REPEATABLE READ | SERIALIZABLE |
|---|---|---|---|---|
| PostgreSQL | ❌ (映射为 READ COMMITTED) | ✅ | ✅ | ✅ |
| MySQL | ✅ | ✅ | ✅ (默认) | ✅ |
| SQLite | ❌ | ✅ (默认) | ❌ | ✅ |
6.4 性能考量
| 维度 | 说明 | 建议 |
|---|---|---|
| asyncpg | 直接协议实现,零 Python 对象转换 | 生产环境首选 PostgreSQL 驱动 |
| aiosqlite | 线程池桥接,有上下文切换开销 | 仅用于开发/测试 |
| 连接池大小 | 过大浪费资源,过小导致等待 | 根据并发量调整,通常 5-20 |
| pool_pre_ping | 每次获取连接前 ping | 数据库不稳定时开启 |
expire_on_commit | 默认 True,commit 后属性过期 | async 场景设为 False 避免额外查询 |
6.5 设计动机
| 设计选择 | 原因 |
|---|---|
| async 引擎 | FastAPI 是异步框架,同步 DB 调用会阻塞事件循环 |
| 连接池 | 数据库连接建立开销大,复用可显著提升性能 |
| 事务隔离 | 不同业务场景对一致性和性能的需求不同 |
| ORM + Core 并存 | ORM 适合业务逻辑,Core 适合复杂查询和批量操作 |
6.6 知识关联
数据库集成
│
┌────────────────┼────────────────┐
▼ ▼ ▼
SQLAlchemy 2.0 连接池管理 事务隔离
(async engine) (pool config) (isolation level)
│ │ │
▼ ▼ ▼
asyncpg/aiosqlite pool_size READ COMMITTED
AsyncSession max_overflow REPEATABLE READ
AsyncEngine pool_recycle SERIALIZABLE
│ │ │
└────────────────┼────────────────┘
▼
01-FastAPI 入门
02-Pydantic 模型
03-依赖注入 (DB Session)总结
| 知识点 | 说明 |
|---|---|
| SQLAlchemy | ORM 框架 |
| Session | 数据库会话 |
| CRUD | 增删改查 |
| 关系 | 一对多、多对多 |
| 异步 | async/await |