database
Compare original and translation side by side
🇺🇸
Original
English
🇨🇳
Translation
Chinese
Database Integration (SQLAlchemy Async + Alembic)
数据库集成(SQLAlchemy 异步 + Alembic)
Overview
概述
Use SQLAlchemy 2.0+ with async support as the database toolkit for FastAPI applications. SQLAlchemy provides both ORM (Object-Relational Mapping) and Core (SQL expression) layers. Pair with Alembic for schema migrations.
Key packages:
bash
uv add "sqlalchemy[asyncio]" alembic asyncpg # PostgreSQL
使用支持异步的SQLAlchemy 2.0+作为FastAPI应用的数据库工具包。SQLAlchemy同时提供ORM(对象关系映射)和Core(SQL表达式)两层。搭配Alembic进行Schema迁移。
关键包:
bash
uv add "sqlalchemy[asyncio]" alembic asyncpg # PostgreSQL
or
or
uv add "sqlalchemy[asyncio]" alembic aiosqlite # SQLite
- `sqlalchemy[asyncio]` -- async engine and session support
- `asyncpg` -- high-performance async PostgreSQL driver
- `aiosqlite` -- async SQLite driver
- `alembic` -- database migration tool
uv add "sqlalchemy[asyncio]" alembic aiosqlite # SQLite
- `sqlalchemy[asyncio]` -- 异步引擎和会话支持
- `asyncpg` -- 高性能异步PostgreSQL驱动
- `aiosqlite` -- 异步SQLite驱动
- `alembic` -- 数据库迁移工具
Async Engine and Session
异步引擎与会话
Engine Setup
引擎配置
python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/mydb"
python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/mydb"
For SQLite: "sqlite+aiosqlite:///./app.db"
For SQLite: "sqlite+aiosqlite:///./app.db"
engine = create_async_engine(
DATABASE_URL,
echo=False, # Set True for SQL query logging
pool_size=5, # Number of persistent connections
max_overflow=10, # Additional connections allowed beyond pool_size
pool_pre_ping=True, # Verify connections before use
pool_recycle=3600, # Recycle connections after 1 hour
)
async_session = async_sessionmaker(
engine,
expire_on_commit=False, # Prevent lazy-load after commit in async
)
engine = create_async_engine(
DATABASE_URL,
echo=False, # 设置为True可开启SQL查询日志
pool_size=5, # 持久化连接数量
max_overflow=10, # 超出pool_size允许的额外连接数
pool_pre_ping=True, # 使用前验证连接有效性
pool_recycle=3600, # 1小时后回收连接
)
async_session = async_sessionmaker(
engine,
expire_on_commit=False, # 异步环境下提交后避免延迟加载
)
FastAPI Integration with Lifespan
与FastAPI生命周期集成
python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# Engine is created at module level; dispose on shutdown
yield
await engine.dispose()
app = FastAPI(lifespan=lifespan)
async def get_db() -> AsyncIterator[AsyncSession]:
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Use `Depends(get_db)` in route functions to inject a session per request.
python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# 引擎在模块级别创建;关闭时释放资源
yield
await engine.dispose()
app = FastAPI(lifespan=lifespan)
async def get_db() -> AsyncIterator[AsyncSession]:
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
在路由函数中使用 `Depends(get_db)`,为每个请求注入一个会话。
Model Definition
模型定义
Declarative Base
声明式基类
python
from datetime import datetime
from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
relationship,
)
class Base(DeclarativeBase):
pass
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(),
onupdate=func.now(),
)
python
from datetime import datetime
from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
relationship,
)
class Base(DeclarativeBase):
pass
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(),
onupdate=func.now(),
)
Model Examples
模型示例
python
class User(TimestampMixin, Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
name: Mapped[str] = mapped_column(String(100))
hashed_password: Mapped[str] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(default=True)
# Relationships
posts: Mapped[list["Post"]] = relationship(
back_populates="author",
cascade="all, delete-orphan",
)
class Post(TimestampMixin, Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(Text)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship(back_populates="posts")
Use `Mapped[type]` with `mapped_column()` for all column definitions (SQLAlchemy 2.0 style). Avoid the legacy `Column()` syntax.
python
class User(TimestampMixin, Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
name: Mapped[str] = mapped_column(String(100))
hashed_password: Mapped[str] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(default=True)
# 关联关系
posts: Mapped[list["Post"]] = relationship(
back_populates="author",
cascade="all, delete-orphan",
)
class Post(TimestampMixin, Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(Text)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship(back_populates="posts")
所有列定义请使用 `Mapped[type]` 搭配 `mapped_column()`(SQLAlchemy 2.0风格)。避免使用旧版 `Column()` 语法。
CRUD Operations
CRUD操作
Repository Pattern
仓库模式
python
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
class UserRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def get_by_id(self, user_id: int) -> User | None:
return await self.session.get(User, user_id)
async def get_by_email(self, email: str) -> User | None:
stmt = select(User).where(User.email == email)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def list_users(
self, *, offset: int = 0, limit: int = 20
) -> list[User]:
stmt = select(User).offset(offset).limit(limit).order_by(User.id)
result = await self.session.execute(stmt)
return list(result.scalars().all())
async def create(self, **kwargs) -> User:
user = User(**kwargs)
self.session.add(user)
await self.session.flush() # Assign ID without committing
return user
async def update(self, user: User, **kwargs) -> User:
for key, value in kwargs.items():
setattr(user, key, value)
await self.session.flush()
return user
async def delete(self, user: User) -> None:
await self.session.delete(user)
async def count(self) -> int:
stmt = select(func.count()).select_from(User)
result = await self.session.execute(stmt)
return result.scalar_one()
python
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
class UserRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def get_by_id(self, user_id: int) -> User | None:
return await self.session.get(User, user_id)
async def get_by_email(self, email: str) -> User | None:
stmt = select(User).where(User.email == email)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def list_users(
self, *, offset: int = 0, limit: int = 20
) -> list[User]:
stmt = select(User).offset(offset).limit(limit).order_by(User.id)
result = await self.session.execute(stmt)
return list(result.scalars().all())
async def create(self, **kwargs) -> User:
user = User(**kwargs)
self.session.add(user)
await self.session.flush() # 分配ID但不提交
return user
async def update(self, user: User, **kwargs) -> User:
for key, value in kwargs.items():
setattr(user, key, value)
await self.session.flush()
return user
async def delete(self, user: User) -> None:
await self.session.delete(user)
async def count(self) -> int:
stmt = select(func.count()).select_from(User)
result = await self.session.execute(stmt)
return result.scalar_one()
Using in FastAPI Routes
在FastAPI路由中使用
python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
repo = UserRepository(db)
user = await repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.post("/", status_code=201)
async def create_user(
data: UserCreate,
db: AsyncSession = Depends(get_db),
):
repo = UserRepository(db)
existing = await repo.get_by_email(data.email)
if existing:
raise HTTPException(status_code=409, detail="Email already registered")
user = await repo.create(**data.model_dump())
return user
python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
repo = UserRepository(db)
user = await repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
@router.post("/", status_code=201)
async def create_user(
data: UserCreate,
db: AsyncSession = Depends(get_db),
):
repo = UserRepository(db)
existing = await repo.get_by_email(data.email)
if existing:
raise HTTPException(status_code=409, detail="该邮箱已注册")
user = await repo.create(**data.model_dump())
return user
Query Patterns
查询模式
Eager Loading (Avoid N+1)
预加载(避免N+1问题)
python
from sqlalchemy.orm import selectinload, joinedload
python
from sqlalchemy.orm import selectinload, joinedload
selectinload: separate IN query (best for collections)
selectinload: 单独的IN查询(适合集合类型关联)
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
joinedload: LEFT JOIN (best for single relationships)
joinedload: LEFT JOIN(适合单个关联关系)
stmt = select(Post).options(joinedload(Post.author)).where(Post.id == post_id)
Always use eager loading when accessing relationships in async contexts. Lazy loading raises errors under async sessions.stmt = select(Post).options(joinedload(Post.author)).where(Post.id == post_id)
在异步环境中访问关联关系时,请始终使用预加载。异步会话下延迟加载会报错。Filtering and Ordering
过滤与排序
python
from sqlalchemy import and_, or_, desc
python
from sqlalchemy import and_, or_, desc
Complex filters
复杂过滤条件
stmt = (
select(User)
.where(
and_(
User.is_active == True,
or_(
User.name.ilike(f"%{query}%"),
User.email.ilike(f"%{query}%"),
),
)
)
.order_by(desc(User.created_at))
.offset(offset)
.limit(limit)
)
stmt = (
select(User)
.where(
and_(
User.is_active == True,
or_(
User.name.ilike(f"%{query}%"),
User.email.ilike(f"%{query}%"),
),
)
)
.order_by(desc(User.created_at))
.offset(offset)
.limit(limit)
)
Pagination
分页
python
from pydantic import BaseModel
class PaginatedResponse[T](BaseModel):
items: list[T]
total: int
offset: int
limit: int
@property
def has_more(self) -> bool:
return self.offset + self.limit < self.total
async def paginate(
session: AsyncSession,
stmt,
*,
offset: int = 0,
limit: int = 20,
) -> tuple[list, int]:
# Count query
count_stmt = select(func.count()).select_from(stmt.subquery())
total = (await session.execute(count_stmt)).scalar_one()
# Data query
result = await session.execute(stmt.offset(offset).limit(limit))
items = list(result.scalars().all())
return items, total
python
from pydantic import BaseModel
class PaginatedResponse[T](BaseModel):
items: list[T]
total: int
offset: int
limit: int
@property
def has_more(self) -> bool:
return self.offset + self.limit < self.total
async def paginate(
session: AsyncSession,
stmt,
*,
offset: int = 0,
limit: int = 20,
) -> tuple[list, int]:
# 计数查询
count_stmt = select(func.count()).select_from(stmt.subquery())
total = (await session.execute(count_stmt)).scalar_one()
# 数据查询
result = await session.execute(stmt.offset(offset).limit(limit))
items = list(result.scalars().all())
return items, total
Alembic Migrations
Alembic迁移
Setup
初始化
bash
bash
Initialize Alembic
初始化Alembic
uv run alembic init alembic
uv run alembic init alembic
For async support, use the async template
如需异步支持,使用异步模板
uv run alembic init -t async alembic
Configure `alembic/env.py`:
```python
uv run alembic init -t async alembic
配置`alembic/env.py`:
```python
alembic/env.py
alembic/env.py
from app.database import Base, DATABASE_URL
from app.models import User, Post # Import all models
config = context.config
config.set_main_option("sqlalchemy.url", DATABASE_URL.replace("+asyncpg", ""))
target_metadata = Base.metadata
For the async template, update `run_async_migrations()` in `env.py` to use your async engine.
from app.database import Base, DATABASE_URL
from app.models import User, Post # 导入所有模型
config = context.config
config.set_main_option("sqlalchemy.url", DATABASE_URL.replace("+asyncpg", ""))
target_metadata = Base.metadata
如果使用异步模板,请更新`env.py`中的`run_async_migrations()`以使用你的异步引擎。
Migration Commands
迁移命令
bash
bash
Generate a migration from model changes
根据模型变更生成迁移脚本
uv run alembic revision --autogenerate -m "add users table"
uv run alembic revision --autogenerate -m "add users table"
Apply all pending migrations
应用所有待执行的迁移
uv run alembic upgrade head
uv run alembic upgrade head
Rollback one migration
回滚一个迁移
uv run alembic downgrade -1
uv run alembic downgrade -1
Show current migration status
查看当前迁移状态
uv run alembic current
uv run alembic current
Show migration history
查看迁移历史
uv run alembic history
uv run alembic history
Migration Best Practices
迁移最佳实践
- Always review auto-generated migrations before applying -- autogenerate cannot detect all changes (renamed columns, data migrations).
- Test migrations both ways -- run `upgrade` and `downgrade` to verify reversibility.
- Use descriptive revision messages -- `add_users_table`, not `update`.
- Never edit applied migrations -- create new migrations instead.
- Include migrations in version control -- the `alembic/versions/` directory should be committed.
- 应用前务必检查自动生成的迁移脚本——自动生成无法检测所有变更(如列重命名、数据迁移)。
- 双向测试迁移——运行 `upgrade` 和 `downgrade` 验证可逆性。
- 使用描述性的版本信息——比如 `add users table` 而非 `update`。
- 切勿修改已应用的迁移脚本——如需调整请创建新的迁移。
- 将迁移纳入版本控制——`alembic/versions/` 目录需提交至代码仓库。
Connection Pool Tuning
连接池调优
| Parameter | Default | Description |
|---|---|---|
| `pool_size` | 5 | Number of persistent connections in the pool |
| `max_overflow` | 10 | Extra connections allowed beyond `pool_size` |
| `pool_timeout` | 30 | Seconds to wait for a connection before error |
| `pool_recycle` | -1 | Seconds before a connection is recycled (set for PG) |
| `pool_pre_ping` | False | Test connections before checkout (set True for prod) |
Production recommendation for a 4-worker FastAPI app:
python
engine = create_async_engine(
DATABASE_URL,
pool_size=5, # 5 per worker = 20 total connections
max_overflow=10, # Burst to 15 per worker
pool_pre_ping=True, # Handle dropped connections
pool_recycle=3600, # Recycle hourly
)
Total max connections = `workers * (pool_size + max_overflow)`. Ensure the database `max_connections` setting accommodates this.
| 参数 | 默认值 | 描述 |
|---|---|---|
| `pool_size` | 5 | 连接池中的持久化连接数量 |
| `max_overflow` | 10 | 超出pool_size允许的额外临时连接数 |
| `pool_timeout` | 30 | 获取连接超时前等待的秒数 |
| `pool_recycle` | -1 | 连接回收的时间间隔(PostgreSQL需设置) |
| `pool_pre_ping` | False | 取出连接前验证有效性(生产环境建议设为True) |
针对4个Worker的FastAPI生产环境建议配置:
python
engine = create_async_engine(
DATABASE_URL,
pool_size=5, # 每个Worker 5个连接 = 总计20个连接
max_overflow=10, # 每个Worker可额外扩展15个连接
pool_pre_ping=True, # 处理断开的连接
pool_recycle=3600, # 每小时回收连接
)
最大总连接数 = `workers * (pool_size + max_overflow)`。需确保数据库的 `max_connections` 设置能容纳此数量。
Testing with Database
数据库测试
In-Memory SQLite for Unit Tests
单元测试使用内存SQLite
python
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from app.database import Base, get_db
from app.main import create_app
@pytest.fixture
async def db_engine():
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest.fixture
async def db_session(db_engine):
session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
async with session_factory() as session:
yield session
await session.rollback()
@pytest.fixture
def app(db_session):
app = create_app()
app.dependency_overrides[get_db] = lambda: db_session
yield app
app.dependency_overrides.clear()
python
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from app.database import Base, get_db
from app.main import create_app
@pytest.fixture
async def db_engine():
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest.fixture
async def db_session(db_engine):
session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
async with session_factory() as session:
yield session
await session.rollback()
@pytest.fixture
def app(db_session):
app = create_app()
app.dependency_overrides[get_db] = lambda: db_session
yield app
app.dependency_overrides.clear()
Transaction Rollback Pattern
事务回滚模式
Wrap each test in a transaction that always rolls back for isolation:
python
@pytest.fixture
async def db_session(db_engine):
async with db_engine.connect() as conn:
trans = await conn.begin()
session = AsyncSession(bind=conn, expire_on_commit=False)
yield session
await trans.rollback()
将每个测试包裹在事务中,测试结束后始终回滚以保证隔离性:
python
@pytest.fixture
async def db_session(db_engine):
async with db_engine.connect() as conn:
trans = await conn.begin()
session = AsyncSession(bind=conn, expire_on_commit=False)
yield session
await trans.rollback()
Cross-References
交叉引用
- For Pydantic request/response models, consult the `pydantic` skill.
- For FastAPI routing and dependency injection, consult the `app-scaffolding` skill.
- For async patterns and error handling, consult the `async-patterns` skill.
- For Docker Compose with PostgreSQL, consult the `docker-build` skill.
- For test fixtures and pytest patterns, consult the `test-runner` skill.
- 如需了解Pydantic请求/响应模型,请参考 `pydantic` 技能。
- 如需了解FastAPI路由和依赖注入,请参考 `app-scaffolding` 技能。
- 如需了解异步模式和错误处理,请参考 `async-patterns` 技能。
- 如需了解搭配Docker Compose使用PostgreSQL,请参考 `docker-build` 技能。
- 如需了解测试夹具和pytest模式,请参考 `test-runner` 技能。