sqlalchemy-orm
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseSQLAlchemy ORM Skill
SQLAlchemy ORM 技能文档
progressive_disclosure: entry_point: summary: "Python SQL toolkit and ORM with powerful query builder and relationship mapping" when_to_use: - "When building Python applications with databases" - "When needing complex SQL queries with type safety" - "When working with FastAPI/Flask/Django" - "When needing database migrations (Alembic)" quick_start: - "pip install sqlalchemy" - "Define models with declarative base" - "Create engine and session" - "Query with select() and commit()" token_estimate: entry: 70-85 full: 4500-5500
progressive_disclosure: entry_point: summary: "Python SQL工具包与ORM框架,具备强大的查询构建器和关系映射功能" when_to_use: - "在构建使用数据库的Python应用程序时" - "需要具备类型安全性的复杂SQL查询时" - "使用FastAPI/Flask/Django开发时" - "需要通过Alembic实现数据库迁移时" quick_start: - "pip install sqlalchemy" - "使用声明式基类定义模型" - "创建引擎和会话" - "使用select()和commit()执行查询" token_estimate: entry: 70-85 full: 4500-5500
Core Concepts
核心概念
SQLAlchemy 2.0 Modern API
SQLAlchemy 2.0 现代API
SQLAlchemy 2.0 introduced modern patterns with better type hints, improved query syntax, and async support.
Key Changes from 1.x:
- instead of
select()Query - and
Mapped[T]for type hintsmapped_column() - Explicit for queries
Session.execute() - Better async support with
AsyncSession
SQLAlchemy 2.0引入了现代编程模式,提供了更完善的类型提示、改进的查询语法以及异步支持。
与1.x版本的主要变化:
- 使用替代
select()Query - 采用和
Mapped[T]实现类型提示mapped_column() - 显式使用执行查询
Session.execute() - 通过提供更优的异步支持
AsyncSession
Installation
安装
bash
undefinedbash
undefinedCore SQLAlchemy
核心SQLAlchemy
pip install sqlalchemy
pip install sqlalchemy
With async support
带异步支持
pip install sqlalchemy[asyncio] aiosqlite # SQLite
pip install sqlalchemy[asyncio] asyncpg # PostgreSQL
pip install sqlalchemy[asyncio] aiosqlite # SQLite
pip install sqlalchemy[asyncio] asyncpg # PostgreSQL
With Alembic for migrations
搭配Alembic实现迁移
pip install alembic
pip install alembic
FastAPI integration
FastAPI集成
pip install fastapi sqlalchemy
undefinedpip install fastapi sqlalchemy
undefinedDeclarative Models (SQLAlchemy 2.0)
声明式模型(SQLAlchemy 2.0)
Basic Model Definition
基础模型定义
python
from datetime import datetime
from typing import Optional
from sqlalchemy import String, DateTime, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationshippython
from datetime import datetime
from typing import Optional
from sqlalchemy import String, DateTime, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationshipBase class for all models
所有模型的基类
class Base(DeclarativeBase):
pass
class Base(DeclarativeBase):
pass
User model with type hints
带类型提示的用户模型
class User(Base):
tablename = "users"
# Primary key
id: Mapped[int] = mapped_column(primary_key=True)
# Required fields
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))
# Optional fields
full_name: Mapped[Optional[str]] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)
# Timestamps with server defaults
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now()
)
# Relationships
posts: Mapped[list["Post"]] = relationship(back_populates="author")
def __repr__(self) -> str:
return f"User(id={self.id}, email={self.email})"undefinedclass User(Base):
tablename = "users"
# 主键
id: Mapped[int] = mapped_column(primary_key=True)
# 必填字段
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))
# 可选字段
full_name: Mapped[Optional[str]] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)
# 带服务器默认值的时间戳
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now()
)
# 关系映射
posts: Mapped[list["Post"]] = relationship(back_populates="author")
def __repr__(self) -> str:
return f"User(id={self.id}, email={self.email})"undefinedRelationships
关系映射
One-to-Many:
python
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str]
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# Relationship with back_populates
author: Mapped["User"] = relationship(back_populates="posts")
tags: Mapped[list["Tag"]] = relationship(
secondary="post_tags",
back_populates="posts"
)Many-to-Many:
python
from sqlalchemy import Table, Column, Integer, ForeignKey一对多关系:
python
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str]
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# 反向关联关系
author: Mapped["User"] = relationship(back_populates="posts")
tags: Mapped[list["Tag"]] = relationship(
secondary="post_tags",
back_populates="posts"
)多对多关系:
python
from sqlalchemy import Table, Column, Integer, ForeignKeyAssociation table
关联表
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True)
)
class Tag(Base):
tablename = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[list["Post"]] = relationship(
secondary=post_tags,
back_populates="tags"
)undefinedpost_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True)
)
class Tag(Base):
tablename = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[list["Post"]] = relationship(
secondary=post_tags,
back_populates="tags"
)undefinedDatabase Setup
数据库配置
Engine and Session Configuration
引擎与会话配置
python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePoolpython
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePoolDatabase URL formats
数据库URL格式
SQLite: sqlite:///./database.db
SQLite: sqlite:///./database.db
PostgreSQL: postgresql://user:pass@localhost/dbname
PostgreSQL: postgresql://user:pass@localhost/dbname
MySQL: mysql+pymysql://user:pass@localhost/dbname
MySQL: mysql+pymysql://user:pass@localhost/dbname
DATABASE_URL = "postgresql://user:pass@localhost/mydb"
DATABASE_URL = "postgresql://user:pass@localhost/mydb"
Create engine with connection pooling
创建带连接池的引擎
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # Check connection before using
echo=False # Set True for SQL logging
)
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # 使用前检查连接有效性
echo=False # 设置为True可开启SQL日志
)
Session factory
会话工厂
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
expire_on_commit=False
)
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
expire_on_commit=False
)
Create tables
创建数据表
Base.metadata.create_all(bind=engine)
undefinedBase.metadata.create_all(bind=engine)
undefinedDependency Injection (FastAPI Pattern)
依赖注入(FastAPI模式)
python
from typing import Generator
def get_db() -> Generator[Session, None, None]:
"""Database session dependency for FastAPI."""
db = SessionLocal()
try:
yield db
finally:
db.close()python
from typing import Generator
def get_db() -> Generator[Session, None, None]:
"""FastAPI的数据库会话依赖。"""
db = SessionLocal()
try:
yield db
finally:
db.close()Usage in FastAPI
在FastAPI中使用
from fastapi import Depends
@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
return db.execute(
select(User).where(User.id == user_id)
).scalar_one_or_none()
undefinedfrom fastapi import Depends
@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
return db.execute(
select(User).where(User.id == user_id)
).scalar_one_or_none()
undefinedQuery Patterns (SQLAlchemy 2.0)
查询模式(SQLAlchemy 2.0)
Select Queries
查询语句
python
from sqlalchemy import select, and_, or_, desc, funcpython
from sqlalchemy import select, and_, or_, desc, funcBasic select
基础查询
stmt = select(User).where(User.email == "user@example.com")
user = session.execute(stmt).scalar_one_or_none()
stmt = select(User).where(User.email == "user@example.com")
user = session.execute(stmt).scalar_one_or_none()
Multiple conditions
多条件查询
stmt = select(User).where(
and_(
User.is_active == True,
User.created_at > datetime(2024, 1, 1)
)
)
users = session.execute(stmt).scalars().all()
stmt = select(User).where(
and_(
User.is_active == True,
User.created_at > datetime(2024, 1, 1)
)
)
users = session.execute(stmt).scalars().all()
OR conditions
或条件查询
stmt = select(User).where(
or_(
User.email.like("%@gmail.com"),
User.email.like("%@yahoo.com")
)
)
stmt = select(User).where(
or_(
User.email.like("%@gmail.com"),
User.email.like("%@yahoo.com")
)
)
Ordering and limiting
排序与分页
stmt = (
select(User)
.where(User.is_active == True)
.order_by(desc(User.created_at))
.limit(10)
.offset(20)
)
stmt = (
select(User)
.where(User.is_active == True)
.order_by(desc(User.created_at))
.limit(10)
.offset(20)
)
Counting
统计数量
stmt = select(func.count()).select_from(User)
count = session.execute(stmt).scalar()
undefinedstmt = select(func.count()).select_from(User)
count = session.execute(stmt).scalar()
undefinedJoins
连接查询
python
undefinedpython
undefinedInner join
内连接
stmt = (
select(Post, User)
.join(User, Post.user_id == User.id)
.where(User.is_active == True)
)
results = session.execute(stmt).all()
stmt = (
select(Post, User)
.join(User, Post.user_id == User.id)
.where(User.is_active == True)
)
results = session.execute(stmt).all()
Left outer join
左外连接
stmt = (
select(User, func.count(Post.id).label("post_count"))
.outerjoin(Post)
.group_by(User.id)
)
stmt = (
select(User, func.count(Post.id).label("post_count"))
.outerjoin(Post)
.group_by(User.id)
)
Multiple joins
多表连接
stmt = (
select(Post)
.join(Post.author)
.join(Post.tags)
.where(Tag.name == "python")
)
undefinedstmt = (
select(Post)
.join(Post.author)
.join(Post.tags)
.where(Tag.name == "python")
)
undefinedEager Loading (Solve N+1 Problem)
预加载(解决N+1查询问题)
python
from sqlalchemy.orm import selectinload, joinedloadpython
from sqlalchemy.orm import selectinload, joinedloadselectinload - separate query (better for collections)
selectinload - 单独查询(适用于集合类型关联)
stmt = select(User).options(selectinload(User.posts))
users = session.execute(stmt).scalars().all()
stmt = select(User).options(selectinload(User.posts))
users = session.execute(stmt).scalars().all()
Now users[0].posts won't trigger additional queries
此时访问users[0].posts不会触发额外查询
joinedload - single query with join (better for one-to-one)
joinedload - 单查询带连接(适用于一对一关联)
stmt = select(Post).options(joinedload(Post.author))
posts = session.execute(stmt).unique().scalars().all()
stmt = select(Post).options(joinedload(Post.author))
posts = session.execute(stmt).unique().scalars().all()
Nested eager loading
嵌套预加载
stmt = select(User).options(
selectinload(User.posts).selectinload(Post.tags)
)
stmt = select(User).options(
selectinload(User.posts).selectinload(Post.tags)
)
Load only specific columns
仅加载指定列
from sqlalchemy.orm import load_only
stmt = select(User).options(load_only(User.id, User.email))
undefinedfrom sqlalchemy.orm import load_only
stmt = select(User).options(load_only(User.id, User.email))
undefinedCRUD Operations
CRUD操作
Create
创建
python
def create_user(db: Session, email: str, username: str, password: str):
"""Create new user."""
user = User(
email=email,
username=username,
hashed_password=hash_password(password)
)
db.add(user)
db.commit()
db.refresh(user) # Get updated fields (id, timestamps)
return userpython
def create_user(db: Session, email: str, username: str, password: str):
"""创建新用户。"""
user = User(
email=email,
username=username,
hashed_password=hash_password(password)
)
db.add(user)
db.commit()
db.refresh(user) # 获取更新后的字段(如id、时间戳)
return userBulk insert
批量插入
users = [
User(email=f"user{i}@example.com", username=f"user{i}")
for i in range(100)
]
db.add_all(users)
db.commit()
undefinedusers = [
User(email=f"user{i}@example.com", username=f"user{i}")
for i in range(100)
]
db.add_all(users)
db.commit()
undefinedRead
读取
python
def get_user_by_email(db: Session, email: str) -> Optional[User]:
"""Get user by email."""
stmt = select(User).where(User.email == email)
return db.execute(stmt).scalar_one_or_none()
def get_users(
db: Session,
skip: int = 0,
limit: int = 100
) -> list[User]:
"""Get paginated users."""
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
return db.execute(stmt).scalars().all()python
def get_user_by_email(db: Session, email: str) -> Optional[User]:
"""通过邮箱查询用户。"""
stmt = select(User).where(User.email == email)
return db.execute(stmt).scalar_one_or_none()
def get_users(
db: Session,
skip: int = 0,
limit: int = 100
) -> list[User]:
"""分页查询用户。"""
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
return db.execute(stmt).scalars().all()Update
更新
python
def update_user(db: Session, user_id: int, **kwargs):
"""Update user fields."""
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
return None
for key, value in kwargs.items():
setattr(user, key, value)
db.commit()
db.refresh(user)
return userpython
def update_user(db: Session, user_id: int, **kwargs):
"""更新用户字段。"""
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
return None
for key, value in kwargs.items():
setattr(user, key, value)
db.commit()
db.refresh(user)
return userBulk update
批量更新
from sqlalchemy import update
stmt = (
update(User)
.where(User.is_active == False)
.values(deleted_at=datetime.utcnow())
)
db.execute(stmt)
db.commit()
undefinedfrom sqlalchemy import update
stmt = (
update(User)
.where(User.is_active == False)
.values(deleted_at=datetime.utcnow())
)
db.execute(stmt)
db.commit()
undefinedDelete
删除
python
def delete_user(db: Session, user_id: int) -> bool:
"""Delete user."""
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
return False
db.delete(user)
db.commit()
return Truepython
def delete_user(db: Session, user_id: int) -> bool:
"""删除用户。"""
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
return False
db.delete(user)
db.commit()
return TrueBulk delete
批量删除
from sqlalchemy import delete
stmt = delete(User).where(User.is_active == False)
db.execute(stmt)
db.commit()
undefinedfrom sqlalchemy import delete
stmt = delete(User).where(User.is_active == False)
db.execute(stmt)
db.commit()
undefinedTransactions and Session Management
事务与会话管理
Context Manager Pattern
上下文管理器模式
python
from contextlib import contextmanager
@contextmanager
def get_db_session():
"""Session context manager."""
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()python
from contextlib import contextmanager
@contextmanager
def get_db_session():
"""会话上下文管理器。"""
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()Usage
使用示例
with get_db_session() as db:
user = create_user(db, "test@example.com", "testuser", "password")
# Auto-commits on success, rollback on exception
undefinedwith get_db_session() as db:
user = create_user(db, "test@example.com", "testuser", "password")
# 成功时自动提交,异常时自动回滚
undefinedManual Transaction Control
手动事务控制
python
def transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float):
"""Transfer money between users with transaction."""
try:
# Begin nested transaction
with db.begin_nested():
# Deduct from sender
stmt = select(User).where(User.id == from_user_id).with_for_update()
sender = db.execute(stmt).scalar_one()
sender.balance -= amount
# Add to receiver
stmt = select(User).where(User.id == to_user_id).with_for_update()
receiver = db.execute(stmt).scalar_one()
receiver.balance += amount
db.commit()
except Exception as e:
db.rollback()
raisepython
def transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float):
"""通过事务实现用户间转账。"""
try:
# 开启嵌套事务
with db.begin_nested():
# 从转出方扣除金额
stmt = select(User).where(User.id == from_user_id).with_for_update()
sender = db.execute(stmt).scalar_one()
sender.balance -= amount
# 给转入方增加金额
stmt = select(User).where(User.id == to_user_id).with_for_update()
receiver = db.execute(stmt).scalar_one()
receiver.balance += amount
db.commit()
except Exception as e:
db.rollback()
raiseAsync SQLAlchemy
异步SQLAlchemy
Async Setup
异步配置
python
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)python
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)Async engine (note: asyncpg for PostgreSQL, aiosqlite for SQLite)
异步引擎(注意:PostgreSQL使用asyncpg,SQLite使用aiosqlite)
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
async_engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=5,
max_overflow=10
)
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
async_engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=5,
max_overflow=10
)
Async session factory
异步会话工厂
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)
Create tables
创建数据表
async def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
undefinedasync def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
undefinedAsync CRUD Operations
异步CRUD操作
python
async def get_user_async(user_id: int) -> Optional[User]:
"""Get user asynchronously."""
async with AsyncSessionLocal() as session:
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def create_user_async(email: str, username: str) -> User:
"""Create user asynchronously."""
async with AsyncSessionLocal() as session:
user = User(email=email, username=username)
session.add(user)
await session.commit()
await session.refresh(user)
return userpython
async def get_user_async(user_id: int) -> Optional[User]:
"""异步查询用户。"""
async with AsyncSessionLocal() as session:
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def create_user_async(email: str, username: str) -> User:
"""异步创建用户。"""
async with AsyncSessionLocal() as session:
user = User(email=email, username=username)
session.add(user)
await session.commit()
await session.refresh(user)
return userFastAPI async dependency
FastAPI异步依赖
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def get_user_endpoint(
user_id: int,
db: AsyncSession = Depends(get_async_db)
):
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
return result.scalar_one_or_none()
undefinedasync def get_async_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def get_user_endpoint(
user_id: int,
db: AsyncSession = Depends(get_async_db)
):
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
return result.scalar_one_or_none()
undefinedAlembic Migrations
Alembic数据库迁移
Setup Alembic
初始化Alembic
bash
undefinedbash
undefinedInitialize Alembic
初始化Alembic
alembic init alembic
alembic init alembic
Edit alembic.ini - set database URL
编辑alembic.ini - 设置数据库URL
sqlalchemy.url = postgresql://user:pass@localhost/mydb
sqlalchemy.url = postgresql://user:pass@localhost/mydb
Or use env variable in alembic/env.py
或在alembic/env.py中使用环境变量
undefinedundefinedConfigure Alembic
配置Alembic
python
undefinedpython
undefinedalembic/env.py
alembic/env.py
from sqlalchemy import engine_from_config, pool
from alembic import context
from myapp.models import Base # Import your Base
from sqlalchemy import engine_from_config, pool
from alembic import context
from myapp.models import Base # 导入你的基类
Add your model's MetaData
添加模型的元数据
target_metadata = Base.metadata
def run_migrations_online():
"""Run migrations in 'online' mode."""
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")
connectable = engine_from_config(
configuration,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()undefinedtarget_metadata = Base.metadata
def run_migrations_online():
"""在线模式下运行迁移。"""
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")
connectable = engine_from_config(
configuration,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()undefinedCreate and Apply Migrations
创建与应用迁移
bash
undefinedbash
undefinedAuto-generate migration from model changes
根据模型变更自动生成迁移脚本
alembic revision --autogenerate -m "Add users table"
alembic revision --autogenerate -m "Add users table"
Review generated migration in alembic/versions/
查看alembic/versions/目录下生成的迁移脚本
Apply migration
应用迁移
alembic upgrade head
alembic upgrade head
Rollback one version
回滚一个版本
alembic downgrade -1
alembic downgrade -1
Show current version
查看当前版本
alembic current
alembic current
Show migration history
查看迁移历史
alembic history
undefinedalembic history
undefinedManual Migration Example
手动迁移示例
python
undefinedpython
undefinedalembic/versions/xxx_add_users.py
alembic/versions/xxx_add_users.py
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('username', sa.String(50), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_users_email', 'users', ['email'], unique=True)
def downgrade():
op.drop_index('ix_users_email', table_name='users')
op.drop_table('users')
undefinedfrom alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('username', sa.String(50), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_users_email', 'users', ['email'], unique=True)
def downgrade():
op.drop_index('ix_users_email', table_name='users')
op.drop_table('users')
undefinedFastAPI Integration
FastAPI集成
Complete FastAPI Example
完整FastAPI示例
python
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from typing import List
app = FastAPI()python
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from typing import List
app = FastAPI()Pydantic schemas
Pydantic模型
class UserBase(BaseModel):
email: EmailStr
username: str
class UserCreate(UserBase):
password: str
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
class Config:
from_attributes = True # SQLAlchemy 2.0 (was orm_mode)class UserBase(BaseModel):
email: EmailStr
username: str
class UserCreate(UserBase):
password: str
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
class Config:
from_attributes = True # SQLAlchemy 2.0版本(原orm_mode)CRUD operations
CRUD操作
@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)):
# Check if user exists
stmt = select(User).where(User.email == user.email)
if db.execute(stmt).scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create user
db_user = User(
email=user.email,
username=user.username,
hashed_password=hash_password(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user@app.get("/users/{user_id}", response_model=UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user@app.get("/users/", response_model=List[UserResponse])
def list_users(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
stmt = (
select(User)
.where(User.is_active == True)
.offset(skip)
.limit(limit)
)
return db.execute(stmt).scalars().all()
@app.put("/users/{user_id}", response_model=UserResponse)
def update_user(
user_id: int,
user_update: UserBase,
db: Session = Depends(get_db)
):
stmt = select(User).where(User.id == user_id)
db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
db_user.email = user_update.email
db_user.username = user_update.username
db.commit()
db.refresh(db_user)
return db_user@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: Session = Depends(get_db)):
stmt = select(User).where(User.id == user_id)
db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
db.delete(db_user)
db.commit()undefined@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)):
# 检查用户是否已存在
stmt = select(User).where(User.email == user.email)
if db.execute(stmt).scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="邮箱已注册"
)
# 创建用户
db_user = User(
email=user.email,
username=user.username,
hashed_password=hash_password(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user@app.get("/users/{user_id}", response_model=UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
return user@app.get("/users/", response_model=List[UserResponse])
def list_users(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
stmt = (
select(User)
.where(User.is_active == True)
.offset(skip)
.limit(limit)
)
return db.execute(stmt).scalars().all()
@app.put("/users/{user_id}", response_model=UserResponse)
def update_user(
user_id: int,
user_update: UserBase,
db: Session = Depends(get_db)
):
stmt = select(User).where(User.id == user_id)
db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
db_user.email = user_update.email
db_user.username = user_update.username
db.commit()
db.refresh(db_user)
return db_user@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: Session = Depends(get_db)):
stmt = select(User).where(User.id == user_id)
db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
db.delete(db_user)
db.commit()undefinedTesting with Pytest
Pytest测试
Test Database Setup
测试数据库配置
python
import pytest
from sqlalchemy import create_engine, StaticPool
from sqlalchemy.orm import sessionmakerpython
import pytest
from sqlalchemy import create_engine, StaticPool
from sqlalchemy.orm import sessionmakerIn-memory SQLite for testing
用于测试的内存SQLite数据库
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
@pytest.fixture(scope="function")
def db_session():
"""Create test database session."""
engine = create_engine(
SQLALCHEMY_TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
# Create tables
Base.metadata.create_all(bind=engine)
TestingSessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine
)
session = TestingSessionLocal()
try:
yield session
finally:
session.close()
Base.metadata.drop_all(bind=engine)@pytest.fixture(scope="function")
def test_user(db_session):
"""Create test user."""
user = User(
email="test@example.com",
username="testuser",
hashed_password="hashed"
)
db_session.add(user)
db_session.commit()
db_session.refresh(user)
return user
undefinedSQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
@pytest.fixture(scope="function")
def db_session():
"""创建测试数据库会话。"""
engine = create_engine(
SQLALCHEMY_TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
# 创建数据表
Base.metadata.create_all(bind=engine)
TestingSessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine
)
session = TestingSessionLocal()
try:
yield session
finally:
session.close()
Base.metadata.drop_all(bind=engine)@pytest.fixture(scope="function")
def test_user(db_session):
"""创建测试用户。"""
user = User(
email="test@example.com",
username="testuser",
hashed_password="hashed"
)
db_session.add(user)
db_session.commit()
db_session.refresh(user)
return user
undefinedTest Examples
测试示例
python
def test_create_user(db_session):
"""Test user creation."""
user = User(email="new@example.com", username="newuser")
db_session.add(user)
db_session.commit()
assert user.id is not None
assert user.email == "new@example.com"
assert user.created_at is not None
def test_query_user(db_session, test_user):
"""Test user query."""
stmt = select(User).where(User.email == "test@example.com")
found_user = db_session.execute(stmt).scalar_one()
assert found_user.id == test_user.id
assert found_user.username == test_user.username
def test_update_user(db_session, test_user):
"""Test user update."""
test_user.username = "updated"
db_session.commit()
stmt = select(User).where(User.id == test_user.id)
updated_user = db_session.execute(stmt).scalar_one()
assert updated_user.username == "updated"
def test_delete_user(db_session, test_user):
"""Test user deletion."""
user_id = test_user.id
db_session.delete(test_user)
db_session.commit()
stmt = select(User).where(User.id == user_id)
assert db_session.execute(stmt).scalar_one_or_none() is Nonepython
def test_create_user(db_session):
"""测试用户创建。"""
user = User(email="new@example.com", username="newuser")
db_session.add(user)
db_session.commit()
assert user.id is not None
assert user.email == "new@example.com"
assert user.created_at is not None
def test_query_user(db_session, test_user):
"""测试用户查询。"""
stmt = select(User).where(User.email == "test@example.com")
found_user = db_session.execute(stmt).scalar_one()
assert found_user.id == test_user.id
assert found_user.username == test_user.username
def test_update_user(db_session, test_user):
"""测试用户更新。"""
test_user.username = "updated"
db_session.commit()
stmt = select(User).where(User.id == test_user.id)
updated_user = db_session.execute(stmt).scalar_one()
assert updated_user.username == "updated"
def test_delete_user(db_session, test_user):
"""测试用户删除。"""
user_id = test_user.id
db_session.delete(test_user)
db_session.commit()
stmt = select(User).where(User.id == user_id)
assert db_session.execute(stmt).scalar_one_or_none() is NonePerformance Optimization
性能优化
Query Optimization
查询优化
python
undefinedpython
undefinedUse indexes
使用索引
class User(Base):
tablename = "users"
email: Mapped[str] = mapped_column(String(255), index=True, unique=True)
created_at: Mapped[datetime] = mapped_column(index=True)
# Composite index
__table_args__ = (
Index('ix_user_email_active', 'email', 'is_active'),
)class User(Base):
tablename = "users"
email: Mapped[str] = mapped_column(String(255), index=True, unique=True)
created_at: Mapped[datetime] = mapped_column(index=True)
# 复合索引
__table_args__ = (
Index('ix_user_email_active', 'email', 'is_active'),
)Use select_from for complex queries
复杂查询使用select_from
stmt = (
select(func.count(Post.id))
.select_from(User)
.join(Post)
.where(User.is_active == True)
)
stmt = (
select(func.count(Post.id))
.select_from(User)
.join(Post)
.where(User.is_active == True)
)
Use contains_eager for joined loads
使用contains_eager优化关联加载
from sqlalchemy.orm import contains_eager
stmt = (
select(Post)
.join(Post.author)
.options(contains_eager(Post.author))
.where(User.is_active == True)
)
undefinedfrom sqlalchemy.orm import contains_eager
stmt = (
select(Post)
.join(Post.author)
.options(contains_eager(Post.author))
.where(User.is_active == True)
)
undefinedConnection Pooling
连接池配置
python
undefinedpython
undefinedConfigure pool
配置连接池
engine = create_engine(
DATABASE_URL,
pool_size=20, # Number of connections to keep
max_overflow=10, # Additional connections when pool full
pool_timeout=30, # Seconds to wait for connection
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True # Verify connections before use
)
engine = create_engine(
DATABASE_URL,
pool_size=20, # 保持的连接数
max_overflow=10, # 连接池满时可额外创建的连接数
pool_timeout=30, # 获取连接的超时时间(秒)
pool_recycle=3600, # 连接回收周期(1小时)
pool_pre_ping=True # 使用前验证连接
)
Monitor pool
监控连接池
from sqlalchemy import event
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
print("New connection established")
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
print("Connection checked out from pool")
undefinedfrom sqlalchemy import event
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
print("新连接已建立")
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
print("连接已从连接池取出")
undefinedBatch Operations
批量操作
python
undefinedpython
undefinedBulk insert with executemany
使用executemany批量插入
from sqlalchemy import insert
data = [
{"email": f"user{i}@example.com", "username": f"user{i}"}
for i in range(1000)
]
stmt = insert(User)
db.execute(stmt, data)
db.commit()
from sqlalchemy import insert
data = [
{"email": f"user{i}@example.com", "username": f"user{i}"}
for i in range(1000)
]
stmt = insert(User)
db.execute(stmt, data)
db.commit()
Bulk update
批量更新
from sqlalchemy import update
stmt = (
update(User)
.where(User.is_active == False)
.values(deleted_at=func.now())
)
db.execute(stmt)
undefinedfrom sqlalchemy import update
stmt = (
update(User)
.where(User.is_active == False)
.values(deleted_at=func.now())
)
db.execute(stmt)
undefinedBest Practices
最佳实践
- Use SQLAlchemy 2.0 Syntax: Modern API with better type hints
- Type Annotations: Use and
Mapped[T]mapped_column() - Eager Loading: Solve N+1 queries with /
selectinloadjoinedload - Session Management: Use dependency injection pattern
- Migrations: Always use Alembic for schema changes
- Indexes: Add indexes for frequently queried columns
- Connection Pooling: Configure appropriate pool settings
- Testing: Use in-memory SQLite for fast tests
- Async: Use for async frameworks
AsyncSession - Error Handling: Always handle and
NoResultFoundMultipleResultsFound
- 使用SQLAlchemy 2.0语法:具备更优类型提示的现代API
- 类型注解:使用和
Mapped[T]mapped_column() - 预加载:使用/
selectinload解决N+1查询问题joinedload - 会话管理:采用依赖注入模式
- 迁移管理:始终使用Alembic处理数据库 schema 变更
- 索引优化:为频繁查询的字段添加索引
- 连接池配置:根据业务需求配置合适的连接池参数
- 测试:使用内存SQLite进行快速测试
- 异步支持:为异步框架使用
AsyncSession - 错误处理:妥善处理和
NoResultFound异常MultipleResultsFound
Common Patterns
常见模式
Repository Pattern
仓库模式
python
from typing import Generic, TypeVar, Type
from sqlalchemy.orm import Session
T = TypeVar('T', bound=Base)
class BaseRepository(Generic[T]):
def __init__(self, model: Type[T], db: Session):
self.model = model
self.db = db
def get(self, id: int) -> Optional[T]:
stmt = select(self.model).where(self.model.id == id)
return self.db.execute(stmt).scalar_one_or_none()
def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
stmt = select(self.model).offset(skip).limit(limit)
return self.db.execute(stmt).scalars().all()
def create(self, obj: T) -> T:
self.db.add(obj)
self.db.commit()
self.db.refresh(obj)
return obj
def delete(self, id: int) -> bool:
obj = self.get(id)
if obj:
self.db.delete(obj)
self.db.commit()
return True
return Falsepython
from typing import Generic, TypeVar, Type
from sqlalchemy.orm import Session
T = TypeVar('T', bound=Base)
class BaseRepository(Generic[T]):
def __init__(self, model: Type[T], db: Session):
self.model = model
self.db = db
def get(self, id: int) -> Optional[T]:
stmt = select(self.model).where(self.model.id == id)
return self.db.execute(stmt).scalar_one_or_none()
def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
stmt = select(self.model).offset(skip).limit(limit)
return self.db.execute(stmt).scalars().all()
def create(self, obj: T) -> T:
self.db.add(obj)
self.db.commit()
self.db.refresh(obj)
return obj
def delete(self, id: int) -> bool:
obj = self.get(id)
if obj:
self.db.delete(obj)
self.db.commit()
return True
return FalseUsage
使用示例
user_repo = BaseRepository(User, db)
user = user_repo.get(1)
undefineduser_repo = BaseRepository(User, db)
user = user_repo.get(1)
undefinedSoft Delete Pattern
软删除模式
python
class SoftDeleteMixin:
deleted_at: Mapped[Optional[datetime]] = mapped_column(default=None)
@property
def is_deleted(self) -> bool:
return self.deleted_at is not None
class User(Base, SoftDeleteMixin):
__tablename__ = "users"
# ... fieldspython
class SoftDeleteMixin:
deleted_at: Mapped[Optional[datetime]] = mapped_column(default=None)
@property
def is_deleted(self) -> bool:
return self.deleted_at is not None
class User(Base, SoftDeleteMixin):
__tablename__ = "users"
# ... 字段定义Query only active records
查询仅未删除的记录
stmt = select(User).where(User.deleted_at.is_(None))
stmt = select(User).where(User.deleted_at.is_(None))
Soft delete
软删除
user.deleted_at = datetime.utcnow()
db.commit()
undefineduser.deleted_at = datetime.utcnow()
db.commit()
undefinedAudit Trail Pattern
审计追踪模式
python
class AuditMixin:
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now()
)
created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
class Post(Base, AuditMixin):
__tablename__ = "posts"
# ... fieldspython
class AuditMixin:
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now()
)
created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
class Post(Base, AuditMixin):
__tablename__ = "posts"
# ... 字段定义Resources
参考资源
Related Skills
相关技能
When using Sqlalchemy, these skills enhance your workflow:
- django: Django ORM patterns and migration strategies for comparison
- test-driven-development: TDD patterns for database models and queries
- fastapi-local-dev: FastAPI + SQLAlchemy integration patterns
- systematic-debugging: Advanced debugging for ORM query issues and N+1 problems
[Full documentation available in these skills if deployed in your bundle]
使用Sqlalchemy时,以下技能可提升你的开发效率:
- django:Django ORM模式与迁移策略(用于对比参考)
- test-driven-development:数据库模型与查询的TDD模式
- fastapi-local-dev:FastAPI + SQLAlchemy集成模式
- systematic-debugging:ORM查询问题与N+1问题的高级调试
[若你的技能包中包含这些技能,可查看完整文档]