Skip to content

04-数据库集成

Python 3.11+

本章讲解 FastAPI 与 SQLAlchemy 数据库集成。


第一部分:SQLAlchemy 基础

1.1 实际场景

API 需要持久化存储用户数据、文章数据等,选择 SQLAlchemy 作为 ORM。

问题:如何配置数据库连接和会话管理?

1.2 安装

bash
pip install sqlalchemy databases asyncpg

1.3 配置

python
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

DATABASE_URL: str = "sqlite:///./test.db"

engine = create_engine(DATABASE_URL)
SessionLocal: sessionmaker[Session] = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

第二部分:定义模型

2.1 实际场景

需要定义用户表和文章表,用户表有 id、用户名、邮箱等字段。

问题:如何用 SQLAlchemy 定义数据模型?

2.2 基本模型

python
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from datetime import datetime


class User(Base):
    __tablename__ = "users"
    
    id: int = Column(Integer, primary_key=True, index=True)
    username: str = Column(String(50), unique=True, index=True)
    email: str = Column(String(120), unique=True, index=True)
    is_active: bool = Column(Boolean, default=True)
    created_at: datetime = Column(DateTime, default=datetime.utcnow)


# 创建表
Base.metadata.create_all(bind=engine)

2.3 关系定义

python
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship


class Author(Base):
    __tablename__ = "authors"
    
    id: int = Column(Integer, primary_key=True)
    name: str = Column(String(100), nullable=False)
    
    books: list = relationship("Book", back_populates="author")


class Book(Base):
    __tablename__ = "books"
    
    id: int = Column(Integer, primary_key=True)
    title: str = Column(String(200), nullable=False)
    author_id: int = Column(Integer, ForeignKey("authors.id"))
    
    author: Author = relationship("Author", back_populates="books")

第三部分:CRUD 操作

3.1 实际场景

API 需要创建用户、查询用户、更新用户、删除用户等操作。

问题:如何实现数据库的增删改查?

3.2 创建

python
from sqlalchemy.orm import Session
from pydantic import BaseModel


class UserCreate(BaseModel):
    username: str
    email: str


def create_user(db: Session, user: UserCreate) -> User:
    db_user: User = User(**user.model_dump())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

3.3 读取

python
def get_user(db: Session, user_id: int) -> User | None:
    return db.query(User).filter(User.id == user_id).first()


def get_users(db: Session, skip: int = 0, limit: int = 10) -> list[User]:
    return db.query(User).offset(skip).limit(limit).all()

3.4 更新

python
def update_user(db: Session, user_id: int, user_update: dict) -> User | None:
    db_user: User | None = db.query(User).filter(User.id == user_id).first()
    if db_user:
        for key, value in user_update.items():
            setattr(db_user, key, value)
        db.commit()
        db.refresh(db_user)
    return db_user

3.5 删除

python
def delete_user(db: Session, user_id: int) -> User | None:
    db_user: User | None = db.query(User).filter(User.id == user_id).first()
    if db_user:
        db.delete(db_user)
        db.commit()
    return db_user

第四部分:异步数据库

4.1 实际场景

高并发场景下,同步数据库操作会成为瓶颈,需要异步数据库支持。

问题:如何配置异步数据库连接?

4.2 异步配置

python
from databases import Database
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

DATABASE_URL: str = "sqlite+aiosqlite:///./test.db"

database: Database = Database(DATABASE_URL)
engine = create_async_engine(DATABASE_URL)
async_session: sessionmaker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

4.3 异步 CRUD

python
async def get_users() -> list[dict]:
    query: str = "SELECT * FROM users"
    return await database.fetch_all(query)


async def create_user(user_data: dict) -> int:
    query: str = "INSERT INTO users (username, email) VALUES (:username, :email)"
    return await database.execute(query, user_data)

第五部分:完整示例

python
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
from datetime import datetime
from typing import Any

app: FastAPI = FastAPI()

# ==================== 数据库配置 ====================
DATABASE_URL: str = "sqlite:///./app.db"
engine = create_engine(DATABASE_URL)
SessionLocal: sessionmaker[Session] = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# ==================== 模型 ====================
class User(Base):
    __tablename__ = "users"
    
    id: int = Column(Integer, primary_key=True, index=True)
    username: str = Column(String(50), unique=True, index=True)
    email: str = Column(String(120), unique=True, index=True)
    is_active: bool = Column(Boolean, default=True)
    created_at: datetime = Column(DateTime, default=datetime.utcnow)

Base.metadata.create_all(bind=engine)

# ==================== Pydantic 模型 ====================
class UserCreate(BaseModel):
    username: str
    email: str


class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    is_active: bool
    created_at: datetime
    
    model_config = ConfigDict(from_attributes=True)

# ==================== 依赖 ====================
def get_db() -> Session:
    db: Session = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# ==================== 路由 ====================
@app.post("/users", response_model=UserResponse, status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)) -> User:
    db_user: User = User(**user.model_dump())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


@app.get("/users", response_model=list[UserResponse])
def get_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)) -> list[User]:
    users: list[User] = db.query(User).offset(skip).limit(limit).all()
    return users


@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)) -> User:
    user: User | None = db.query(User).filter(User.id == user_id).first()
    if not user:
        from fastapi import HTTPException
        raise HTTPException(status_code=404, detail="User not found")
    return user


@app.delete("/users/{user_id}", status_code=204)
def delete_user(user_id: int, db: Session = Depends(get_db)) -> None:
    user: User | None = db.query(User).filter(User.id == user_id).first()
    if user:
        db.delete(user)
        db.commit()
    return None

第六部分:L3 专家层

6.1 SQLAlchemy 2.0 的 async 引擎(asyncpg/aiosqlite)

SQLAlchemy 2.0 引入了原生 async 支持,通过 sqlalchemy.ext.asyncio 模块提供异步引擎和会话。

异步引擎架构:

AsyncSession


┌──────────────────────────────────┐
│     AsyncEngine (SQLAlchemy)      │
│  ┌────────────────────────────┐  │
│  │  AsyncAdaptedQueue          │  │
│  │  (async 连接池队列)          │  │
│  └────────────┬───────────────┘  │
│               │                   │
│               ▼                   │
│  ┌────────────────────────────┐  │
│  │  AsyncAdaptedConnection     │  │
│  │  (包装底层 async 驱动)       │  │
│  │  ┌──────────────────────┐  │  │
│  │  │ asyncpg (PostgreSQL) │  │  │
│  │  │ aiosqlite (SQLite)   │  │  │
│  │  │ aiomysql (MySQL)     │  │  │
│  │  └──────────────────────┘  │  │
│  └────────────────────────────┘  │
└──────────────────────────────────┘

驱动对比:

python
# PostgreSQL + asyncpg(生产推荐)
from sqlalchemy.ext.asyncio import create_async_engine

engine: AsyncEngine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/dbname",
    echo=True,
)

# SQLite + aiosqlite(开发/测试)
engine: AsyncEngine = create_async_engine(
    "sqlite+aiosqlite:///./test.db",
    echo=True,
)

核心区别:

  • asyncpg:基于 PostgreSQL 协议直接实现,性能最高,不支持 SQLite
  • aiosqlite:在线程池中运行同步 sqlite3,通过 anyio.to_thread 桥接
  • echo=True:输出 SQL 日志,生产环境应关闭

6.2 连接池的 async 适配

SQLAlchemy 的连接池在 async 环境下通过 AsyncAdaptedQueue 实现。

连接池工作流程:

                   AsyncEngine


              ┌─────────────────┐
              │   Connection     │
              │     Pool         │
              │                  │
              │  pool_size=5     │  ← 常驻连接数
              │  max_overflow=10 │  ← 最大溢出连接
              │  pool_timeout=30 │  ← 获取超时(秒)
              │  pool_recycle=   │  ← 连接回收时间
              │    3600          │
              └────────┬────────┘

              ┌────────┴────────┐
              │  AsyncAdapted   │
              │    Queue        │
              │  (FIFO async)   │
              └────────┬────────┘

            ┌──────────┼──────────┐
            ▼          ▼          ▼
         conn1      conn2      conn3
         (asyncpg  (asyncpg  (asyncpg
          connect)  connect)  connect)

关键参数说明:

参数默认值说明
pool_size5池中保持的常驻连接数
max_overflow10超出 pool_size 后可临时创建的连接数
pool_timeout30获取连接的最大等待时间(秒)
pool_recycle-1(不回收)连接回收间隔(秒),防止数据库断开空闲连接
pool_pre_pingFalse每次获取连接前测试连通性

6.3 事务隔离级别

SQLAlchemy 支持多种事务隔离级别,直接影响并发读写的一致性。

隔离级别对比:

隔离级别 (由弱到强):

  READ UNCOMMITTED    ← 可能读到其他事务未提交的数据(脏读)


  READ COMMITTED      ← 只读到已提交的数据,但同一事务内两次读取可能不同(不可重复读)


  REPEATABLE READ     ← 同一事务内多次读取结果一致,但可能出现幻读


  SERIALIZABLE        ← 完全隔离,事务串行执行,性能最低

SQLAlchemy 中设置:

python
from sqlalchemy import create_engine

# 引擎级别设置
engine = create_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    isolation_level="REPEATABLE READ",  # PostgreSQL
)

# SQLite 仅支持两种模式
engine = create_engine(
    "sqlite+aiosqlite:///./test.db",
    connect_args={"check_same_thread": False},
)
# SQLite 通过 PRAGMA 设置: PRAGMA journal_mode=WAL;

# 会话级别设置(覆盖引擎级别)
async with async_session() as session:
    await session.connection(
        execution_options={"isolation_level": "SERIALIZABLE"}
    )
    # 在此连接上的操作使用 SERIALIZABLE 隔离级别

各数据库支持的隔离级别:

数据库READ UNCOMMITTEDREAD COMMITTEDREPEATABLE READSERIALIZABLE
PostgreSQL❌ (映射为 READ COMMITTED)
MySQL✅ (默认)
SQLite✅ (默认)

6.4 性能考量

维度说明建议
asyncpg直接协议实现,零 Python 对象转换生产环境首选 PostgreSQL 驱动
aiosqlite线程池桥接,有上下文切换开销仅用于开发/测试
连接池大小过大浪费资源,过小导致等待根据并发量调整,通常 5-20
pool_pre_ping每次获取连接前 ping数据库不稳定时开启
expire_on_commit默认 True,commit 后属性过期async 场景设为 False 避免额外查询

6.5 设计动机

设计选择原因
async 引擎FastAPI 是异步框架,同步 DB 调用会阻塞事件循环
连接池数据库连接建立开销大,复用可显著提升性能
事务隔离不同业务场景对一致性和性能的需求不同
ORM + Core 并存ORM 适合业务逻辑,Core 适合复杂查询和批量操作

6.6 知识关联

                    数据库集成

        ┌────────────────┼────────────────┐
        ▼                ▼                ▼
   SQLAlchemy 2.0    连接池管理      事务隔离
   (async engine)   (pool config)   (isolation level)
        │                │                │
        ▼                ▼                ▼
    asyncpg/aiosqlite  pool_size     READ COMMITTED
    AsyncSession      max_overflow   REPEATABLE READ
    AsyncEngine       pool_recycle   SERIALIZABLE
        │                │                │
        └────────────────┼────────────────┘

                  01-FastAPI 入门
                  02-Pydantic 模型
                  03-依赖注入 (DB Session)

总结

知识点说明
SQLAlchemyORM 框架
Session数据库会话
CRUD增删改查
关系一对多、多对多
异步async/await