sqlalchemy-orm

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

SQLAlchemy ORM Skill

SQLAlchemy ORM 技能文档


progressive_disclosure: entry_point: summary: "Python SQL toolkit and ORM with powerful query builder and relationship mapping" when_to_use: - "When building Python applications with databases" - "When needing complex SQL queries with type safety" - "When working with FastAPI/Flask/Django" - "When needing database migrations (Alembic)" quick_start: - "pip install sqlalchemy" - "Define models with declarative base" - "Create engine and session" - "Query with select() and commit()" token_estimate: entry: 70-85 full: 4500-5500


progressive_disclosure: entry_point: summary: "Python SQL工具包与ORM框架,具备强大的查询构建器和关系映射功能" when_to_use: - "在构建使用数据库的Python应用程序时" - "需要具备类型安全性的复杂SQL查询时" - "使用FastAPI/Flask/Django开发时" - "需要通过Alembic实现数据库迁移时" quick_start: - "pip install sqlalchemy" - "使用声明式基类定义模型" - "创建引擎和会话" - "使用select()和commit()执行查询" token_estimate: entry: 70-85 full: 4500-5500

Core Concepts

核心概念

SQLAlchemy 2.0 Modern API

SQLAlchemy 2.0 现代API

SQLAlchemy 2.0 introduced modern patterns with better type hints, improved query syntax, and async support.
Key Changes from 1.x:
  • select()
    instead of
    Query
  • Mapped[T]
    and
    mapped_column()
    for type hints
  • Explicit
    Session.execute()
    for queries
  • Better async support with
    AsyncSession
SQLAlchemy 2.0引入了现代编程模式,提供了更完善的类型提示、改进的查询语法以及异步支持。
与1.x版本的主要变化:
  • 使用
    select()
    替代
    Query
  • 采用
    Mapped[T]
    mapped_column()
    实现类型提示
  • 显式使用
    Session.execute()
    执行查询
  • 通过
    AsyncSession
    提供更优的异步支持

Installation

安装

bash
undefined
bash
undefined

Core SQLAlchemy

核心SQLAlchemy

pip install sqlalchemy
pip install sqlalchemy

With async support

带异步支持

pip install sqlalchemy[asyncio] aiosqlite # SQLite pip install sqlalchemy[asyncio] asyncpg # PostgreSQL
pip install sqlalchemy[asyncio] aiosqlite # SQLite pip install sqlalchemy[asyncio] asyncpg # PostgreSQL

With Alembic for migrations

搭配Alembic实现迁移

pip install alembic
pip install alembic

FastAPI integration

FastAPI集成

pip install fastapi sqlalchemy
undefined
pip install fastapi sqlalchemy
undefined

Declarative Models (SQLAlchemy 2.0)

声明式模型(SQLAlchemy 2.0)

Basic Model Definition

基础模型定义

python
from datetime import datetime
from typing import Optional
from sqlalchemy import String, DateTime, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
python
from datetime import datetime
from typing import Optional
from sqlalchemy import String, DateTime, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

Base class for all models

所有模型的基类

class Base(DeclarativeBase): pass
class Base(DeclarativeBase): pass

User model with type hints

带类型提示的用户模型

class User(Base): tablename = "users"
# Primary key
id: Mapped[int] = mapped_column(primary_key=True)

# Required fields
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))

# Optional fields
full_name: Mapped[Optional[str]] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)

# Timestamps with server defaults
created_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    server_default=func.now(),
    onupdate=func.now()
)

# Relationships
posts: Mapped[list["Post"]] = relationship(back_populates="author")

def __repr__(self) -> str:
    return f"User(id={self.id}, email={self.email})"
undefined
class User(Base): tablename = "users"
# 主键
id: Mapped[int] = mapped_column(primary_key=True)

# 必填字段
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))

# 可选字段
full_name: Mapped[Optional[str]] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)

# 带服务器默认值的时间戳
created_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    server_default=func.now(),
    onupdate=func.now()
)

# 关系映射
posts: Mapped[list["Post"]] = relationship(back_populates="author")

def __repr__(self) -> str:
    return f"User(id={self.id}, email={self.email})"
undefined

Relationships

关系映射

One-to-Many:
python
class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    # Relationship with back_populates
    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[list["Tag"]] = relationship(
        secondary="post_tags",
        back_populates="posts"
    )
Many-to-Many:
python
from sqlalchemy import Table, Column, Integer, ForeignKey
一对多关系:
python
class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    # 反向关联关系
    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[list["Tag"]] = relationship(
        secondary="post_tags",
        back_populates="posts"
    )
多对多关系:
python
from sqlalchemy import Table, Column, Integer, ForeignKey

Association table

关联表

post_tags = Table( "post_tags", Base.metadata, Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True), Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True) )
class Tag(Base): tablename = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)

posts: Mapped[list["Post"]] = relationship(
    secondary=post_tags,
    back_populates="tags"
)
undefined
post_tags = Table( "post_tags", Base.metadata, Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True), Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True) )
class Tag(Base): tablename = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)

posts: Mapped[list["Post"]] = relationship(
    secondary=post_tags,
    back_populates="tags"
)
undefined

Database Setup

数据库配置

Engine and Session Configuration

引擎与会话配置

python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool

Database URL formats

数据库URL格式

SQLite: sqlite:///./database.db

SQLite: sqlite:///./database.db

PostgreSQL: postgresql://user:pass@localhost/dbname

PostgreSQL: postgresql://user:pass@localhost/dbname

MySQL: mysql+pymysql://user:pass@localhost/dbname

MySQL: mysql+pymysql://user:pass@localhost/dbname

DATABASE_URL = "postgresql://user:pass@localhost/mydb"
DATABASE_URL = "postgresql://user:pass@localhost/mydb"

Create engine with connection pooling

创建带连接池的引擎

engine = create_engine( DATABASE_URL, poolclass=QueuePool, pool_size=5, max_overflow=10, pool_pre_ping=True, # Check connection before using echo=False # Set True for SQL logging )
engine = create_engine( DATABASE_URL, poolclass=QueuePool, pool_size=5, max_overflow=10, pool_pre_ping=True, # 使用前检查连接有效性 echo=False # 设置为True可开启SQL日志 )

Session factory

会话工厂

SessionLocal = sessionmaker( bind=engine, autocommit=False, autoflush=False, expire_on_commit=False )
SessionLocal = sessionmaker( bind=engine, autocommit=False, autoflush=False, expire_on_commit=False )

Create tables

创建数据表

Base.metadata.create_all(bind=engine)
undefined
Base.metadata.create_all(bind=engine)
undefined

Dependency Injection (FastAPI Pattern)

依赖注入(FastAPI模式)

python
from typing import Generator

def get_db() -> Generator[Session, None, None]:
    """Database session dependency for FastAPI."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
python
from typing import Generator

def get_db() -> Generator[Session, None, None]:
    """FastAPI的数据库会话依赖。"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Usage in FastAPI

在FastAPI中使用

from fastapi import Depends
@app.get("/users/{user_id}") def get_user(user_id: int, db: Session = Depends(get_db)): return db.execute( select(User).where(User.id == user_id) ).scalar_one_or_none()
undefined
from fastapi import Depends
@app.get("/users/{user_id}") def get_user(user_id: int, db: Session = Depends(get_db)): return db.execute( select(User).where(User.id == user_id) ).scalar_one_or_none()
undefined

Query Patterns (SQLAlchemy 2.0)

查询模式(SQLAlchemy 2.0)

Select Queries

查询语句

python
from sqlalchemy import select, and_, or_, desc, func
python
from sqlalchemy import select, and_, or_, desc, func

Basic select

基础查询

stmt = select(User).where(User.email == "user@example.com") user = session.execute(stmt).scalar_one_or_none()
stmt = select(User).where(User.email == "user@example.com") user = session.execute(stmt).scalar_one_or_none()

Multiple conditions

多条件查询

stmt = select(User).where( and_( User.is_active == True, User.created_at > datetime(2024, 1, 1) ) ) users = session.execute(stmt).scalars().all()
stmt = select(User).where( and_( User.is_active == True, User.created_at > datetime(2024, 1, 1) ) ) users = session.execute(stmt).scalars().all()

OR conditions

或条件查询

stmt = select(User).where( or_( User.email.like("%@gmail.com"), User.email.like("%@yahoo.com") ) )
stmt = select(User).where( or_( User.email.like("%@gmail.com"), User.email.like("%@yahoo.com") ) )

Ordering and limiting

排序与分页

stmt = ( select(User) .where(User.is_active == True) .order_by(desc(User.created_at)) .limit(10) .offset(20) )
stmt = ( select(User) .where(User.is_active == True) .order_by(desc(User.created_at)) .limit(10) .offset(20) )

Counting

统计数量

stmt = select(func.count()).select_from(User) count = session.execute(stmt).scalar()
undefined
stmt = select(func.count()).select_from(User) count = session.execute(stmt).scalar()
undefined

Joins

连接查询

python
undefined
python
undefined

Inner join

内连接

stmt = ( select(Post, User) .join(User, Post.user_id == User.id) .where(User.is_active == True) ) results = session.execute(stmt).all()
stmt = ( select(Post, User) .join(User, Post.user_id == User.id) .where(User.is_active == True) ) results = session.execute(stmt).all()

Left outer join

左外连接

stmt = ( select(User, func.count(Post.id).label("post_count")) .outerjoin(Post) .group_by(User.id) )
stmt = ( select(User, func.count(Post.id).label("post_count")) .outerjoin(Post) .group_by(User.id) )

Multiple joins

多表连接

stmt = ( select(Post) .join(Post.author) .join(Post.tags) .where(Tag.name == "python") )
undefined
stmt = ( select(Post) .join(Post.author) .join(Post.tags) .where(Tag.name == "python") )
undefined

Eager Loading (Solve N+1 Problem)

预加载(解决N+1查询问题)

python
from sqlalchemy.orm import selectinload, joinedload
python
from sqlalchemy.orm import selectinload, joinedload

selectinload - separate query (better for collections)

selectinload - 单独查询(适用于集合类型关联)

stmt = select(User).options(selectinload(User.posts)) users = session.execute(stmt).scalars().all()
stmt = select(User).options(selectinload(User.posts)) users = session.execute(stmt).scalars().all()

Now users[0].posts won't trigger additional queries

此时访问users[0].posts不会触发额外查询

joinedload - single query with join (better for one-to-one)

joinedload - 单查询带连接(适用于一对一关联)

stmt = select(Post).options(joinedload(Post.author)) posts = session.execute(stmt).unique().scalars().all()
stmt = select(Post).options(joinedload(Post.author)) posts = session.execute(stmt).unique().scalars().all()

Nested eager loading

嵌套预加载

stmt = select(User).options( selectinload(User.posts).selectinload(Post.tags) )
stmt = select(User).options( selectinload(User.posts).selectinload(Post.tags) )

Load only specific columns

仅加载指定列

from sqlalchemy.orm import load_only stmt = select(User).options(load_only(User.id, User.email))
undefined
from sqlalchemy.orm import load_only stmt = select(User).options(load_only(User.id, User.email))
undefined

CRUD Operations

CRUD操作

Create

创建

python
def create_user(db: Session, email: str, username: str, password: str):
    """Create new user."""
    user = User(
        email=email,
        username=username,
        hashed_password=hash_password(password)
    )
    db.add(user)
    db.commit()
    db.refresh(user)  # Get updated fields (id, timestamps)
    return user
python
def create_user(db: Session, email: str, username: str, password: str):
    """创建新用户。"""
    user = User(
        email=email,
        username=username,
        hashed_password=hash_password(password)
    )
    db.add(user)
    db.commit()
    db.refresh(user)  # 获取更新后的字段(如id、时间戳)
    return user

Bulk insert

批量插入

users = [ User(email=f"user{i}@example.com", username=f"user{i}") for i in range(100) ] db.add_all(users) db.commit()
undefined
users = [ User(email=f"user{i}@example.com", username=f"user{i}") for i in range(100) ] db.add_all(users) db.commit()
undefined

Read

读取

python
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Get user by email."""
    stmt = select(User).where(User.email == email)
    return db.execute(stmt).scalar_one_or_none()

def get_users(
    db: Session,
    skip: int = 0,
    limit: int = 100
) -> list[User]:
    """Get paginated users."""
    stmt = (
        select(User)
        .where(User.is_active == True)
        .order_by(User.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    return db.execute(stmt).scalars().all()
python
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """通过邮箱查询用户。"""
    stmt = select(User).where(User.email == email)
    return db.execute(stmt).scalar_one_or_none()

def get_users(
    db: Session,
    skip: int = 0,
    limit: int = 100
) -> list[User]:
    """分页查询用户。"""
    stmt = (
        select(User)
        .where(User.is_active == True)
        .order_by(User.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    return db.execute(stmt).scalars().all()

Update

更新

python
def update_user(db: Session, user_id: int, **kwargs):
    """Update user fields."""
    stmt = select(User).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()

    if not user:
        return None

    for key, value in kwargs.items():
        setattr(user, key, value)

    db.commit()
    db.refresh(user)
    return user
python
def update_user(db: Session, user_id: int, **kwargs):
    """更新用户字段。"""
    stmt = select(User).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()

    if not user:
        return None

    for key, value in kwargs.items():
        setattr(user, key, value)

    db.commit()
    db.refresh(user)
    return user

Bulk update

批量更新

from sqlalchemy import update
stmt = ( update(User) .where(User.is_active == False) .values(deleted_at=datetime.utcnow()) ) db.execute(stmt) db.commit()
undefined
from sqlalchemy import update
stmt = ( update(User) .where(User.is_active == False) .values(deleted_at=datetime.utcnow()) ) db.execute(stmt) db.commit()
undefined

Delete

删除

python
def delete_user(db: Session, user_id: int) -> bool:
    """Delete user."""
    stmt = select(User).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()

    if not user:
        return False

    db.delete(user)
    db.commit()
    return True
python
def delete_user(db: Session, user_id: int) -> bool:
    """删除用户。"""
    stmt = select(User).where(User.id == user_id)
    user = db.execute(stmt).scalar_one_or_none()

    if not user:
        return False

    db.delete(user)
    db.commit()
    return True

Bulk delete

批量删除

from sqlalchemy import delete
stmt = delete(User).where(User.is_active == False) db.execute(stmt) db.commit()
undefined
from sqlalchemy import delete
stmt = delete(User).where(User.is_active == False) db.execute(stmt) db.commit()
undefined

Transactions and Session Management

事务与会话管理

Context Manager Pattern

上下文管理器模式

python
from contextlib import contextmanager

@contextmanager
def get_db_session():
    """Session context manager."""
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
python
from contextlib import contextmanager

@contextmanager
def get_db_session():
    """会话上下文管理器。"""
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

Usage

使用示例

with get_db_session() as db: user = create_user(db, "test@example.com", "testuser", "password") # Auto-commits on success, rollback on exception
undefined
with get_db_session() as db: user = create_user(db, "test@example.com", "testuser", "password") # 成功时自动提交,异常时自动回滚
undefined

Manual Transaction Control

手动事务控制

python
def transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float):
    """Transfer money between users with transaction."""
    try:
        # Begin nested transaction
        with db.begin_nested():
            # Deduct from sender
            stmt = select(User).where(User.id == from_user_id).with_for_update()
            sender = db.execute(stmt).scalar_one()
            sender.balance -= amount

            # Add to receiver
            stmt = select(User).where(User.id == to_user_id).with_for_update()
            receiver = db.execute(stmt).scalar_one()
            receiver.balance += amount

        db.commit()
    except Exception as e:
        db.rollback()
        raise
python
def transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float):
    """通过事务实现用户间转账。"""
    try:
        # 开启嵌套事务
        with db.begin_nested():
            # 从转出方扣除金额
            stmt = select(User).where(User.id == from_user_id).with_for_update()
            sender = db.execute(stmt).scalar_one()
            sender.balance -= amount

            # 给转入方增加金额
            stmt = select(User).where(User.id == to_user_id).with_for_update()
            receiver = db.execute(stmt).scalar_one()
            receiver.balance += amount

        db.commit()
    except Exception as e:
        db.rollback()
        raise

Async SQLAlchemy

异步SQLAlchemy

Async Setup

异步配置

python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)
python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)

Async engine (note: asyncpg for PostgreSQL, aiosqlite for SQLite)

异步引擎(注意:PostgreSQL使用asyncpg,SQLite使用aiosqlite)

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
async_engine = create_async_engine( DATABASE_URL, echo=False, pool_size=5, max_overflow=10 )
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
async_engine = create_async_engine( DATABASE_URL, echo=False, pool_size=5, max_overflow=10 )

Async session factory

异步会话工厂

AsyncSessionLocal = async_sessionmaker( async_engine, class_=AsyncSession, expire_on_commit=False )
AsyncSessionLocal = async_sessionmaker( async_engine, class_=AsyncSession, expire_on_commit=False )

Create tables

创建数据表

async def init_db(): async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)
undefined
async def init_db(): async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)
undefined

Async CRUD Operations

异步CRUD操作

python
async def get_user_async(user_id: int) -> Optional[User]:
    """Get user asynchronously."""
    async with AsyncSessionLocal() as session:
        stmt = select(User).where(User.id == user_id)
        result = await session.execute(stmt)
        return result.scalar_one_or_none()

async def create_user_async(email: str, username: str) -> User:
    """Create user asynchronously."""
    async with AsyncSessionLocal() as session:
        user = User(email=email, username=username)
        session.add(user)
        await session.commit()
        await session.refresh(user)
        return user
python
async def get_user_async(user_id: int) -> Optional[User]:
    """异步查询用户。"""
    async with AsyncSessionLocal() as session:
        stmt = select(User).where(User.id == user_id)
        result = await session.execute(stmt)
        return result.scalar_one_or_none()

async def create_user_async(email: str, username: str) -> User:
    """异步创建用户。"""
    async with AsyncSessionLocal() as session:
        user = User(email=email, username=username)
        session.add(user)
        await session.commit()
        await session.refresh(user)
        return user

FastAPI async dependency

FastAPI异步依赖

async def get_async_db(): async with AsyncSessionLocal() as session: yield session
@app.get("/users/{user_id}") async def get_user_endpoint( user_id: int, db: AsyncSession = Depends(get_async_db) ): stmt = select(User).where(User.id == user_id) result = await db.execute(stmt) return result.scalar_one_or_none()
undefined
async def get_async_db(): async with AsyncSessionLocal() as session: yield session
@app.get("/users/{user_id}") async def get_user_endpoint( user_id: int, db: AsyncSession = Depends(get_async_db) ): stmt = select(User).where(User.id == user_id) result = await db.execute(stmt) return result.scalar_one_or_none()
undefined

Alembic Migrations

Alembic数据库迁移

Setup Alembic

初始化Alembic

bash
undefined
bash
undefined

Initialize Alembic

初始化Alembic

alembic init alembic
alembic init alembic

Edit alembic.ini - set database URL

编辑alembic.ini - 设置数据库URL

sqlalchemy.url = postgresql://user:pass@localhost/mydb

sqlalchemy.url = postgresql://user:pass@localhost/mydb

Or use env variable in alembic/env.py

或在alembic/env.py中使用环境变量

undefined
undefined

Configure Alembic

配置Alembic

python
undefined
python
undefined

alembic/env.py

alembic/env.py

from sqlalchemy import engine_from_config, pool from alembic import context from myapp.models import Base # Import your Base
from sqlalchemy import engine_from_config, pool from alembic import context from myapp.models import Base # 导入你的基类

Add your model's MetaData

添加模型的元数据

target_metadata = Base.metadata
def run_migrations_online(): """Run migrations in 'online' mode.""" configuration = config.get_section(config.config_ini_section) configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")
connectable = engine_from_config(
    configuration,
    prefix="sqlalchemy.",
    poolclass=pool.NullPool,
)

with connectable.connect() as connection:
    context.configure(
        connection=connection,
        target_metadata=target_metadata
    )

    with context.begin_transaction():
        context.run_migrations()
undefined
target_metadata = Base.metadata
def run_migrations_online(): """在线模式下运行迁移。""" configuration = config.get_section(config.config_ini_section) configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")
connectable = engine_from_config(
    configuration,
    prefix="sqlalchemy.",
    poolclass=pool.NullPool,
)

with connectable.connect() as connection:
    context.configure(
        connection=connection,
        target_metadata=target_metadata
    )

    with context.begin_transaction():
        context.run_migrations()
undefined

Create and Apply Migrations

创建与应用迁移

bash
undefined
bash
undefined

Auto-generate migration from model changes

根据模型变更自动生成迁移脚本

alembic revision --autogenerate -m "Add users table"
alembic revision --autogenerate -m "Add users table"

Review generated migration in alembic/versions/

查看alembic/versions/目录下生成的迁移脚本

Apply migration

应用迁移

alembic upgrade head
alembic upgrade head

Rollback one version

回滚一个版本

alembic downgrade -1
alembic downgrade -1

Show current version

查看当前版本

alembic current
alembic current

Show migration history

查看迁移历史

alembic history
undefined
alembic history
undefined

Manual Migration Example

手动迁移示例

python
undefined
python
undefined

alembic/versions/xxx_add_users.py

alembic/versions/xxx_add_users.py

from alembic import op import sqlalchemy as sa
def upgrade(): op.create_table( 'users', sa.Column('id', sa.Integer(), nullable=False), sa.Column('email', sa.String(255), nullable=False), sa.Column('username', sa.String(50), nullable=False), sa.PrimaryKeyConstraint('id') ) op.create_index('ix_users_email', 'users', ['email'], unique=True)
def downgrade(): op.drop_index('ix_users_email', table_name='users') op.drop_table('users')
undefined
from alembic import op import sqlalchemy as sa
def upgrade(): op.create_table( 'users', sa.Column('id', sa.Integer(), nullable=False), sa.Column('email', sa.String(255), nullable=False), sa.Column('username', sa.String(50), nullable=False), sa.PrimaryKeyConstraint('id') ) op.create_index('ix_users_email', 'users', ['email'], unique=True)
def downgrade(): op.drop_index('ix_users_email', table_name='users') op.drop_table('users')
undefined

FastAPI Integration

FastAPI集成

Complete FastAPI Example

完整FastAPI示例

python
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from typing import List

app = FastAPI()
python
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from typing import List

app = FastAPI()

Pydantic schemas

Pydantic模型

class UserBase(BaseModel): email: EmailStr username: str
class UserCreate(UserBase): password: str
class UserResponse(UserBase): id: int is_active: bool created_at: datetime
class Config:
    from_attributes = True  # SQLAlchemy 2.0 (was orm_mode)
class UserBase(BaseModel): email: EmailStr username: str
class UserCreate(UserBase): password: str
class UserResponse(UserBase): id: int is_active: bool created_at: datetime
class Config:
    from_attributes = True  # SQLAlchemy 2.0版本(原orm_mode)

CRUD operations

CRUD操作

@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)): # Check if user exists stmt = select(User).where(User.email == user.email) if db.execute(stmt).scalar_one_or_none(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" )
# Create user
db_user = User(
    email=user.email,
    username=user.username,
    hashed_password=hash_password(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users/{user_id}", response_model=UserResponse) def read_user(user_id: int, db: Session = Depends(get_db)): stmt = select(User).where(User.id == user_id) user = db.execute(stmt).scalar_one_or_none()
if not user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )
return user
@app.get("/users/", response_model=List[UserResponse]) def list_users( skip: int = 0, limit: int = 100, db: Session = Depends(get_db) ): stmt = ( select(User) .where(User.is_active == True) .offset(skip) .limit(limit) ) return db.execute(stmt).scalars().all()
@app.put("/users/{user_id}", response_model=UserResponse) def update_user( user_id: int, user_update: UserBase, db: Session = Depends(get_db) ): stmt = select(User).where(User.id == user_id) db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )

db_user.email = user_update.email
db_user.username = user_update.username
db.commit()
db.refresh(db_user)
return db_user
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_user(user_id: int, db: Session = Depends(get_db)): stmt = select(User).where(User.id == user_id) db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )

db.delete(db_user)
db.commit()
undefined
@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)): # 检查用户是否已存在 stmt = select(User).where(User.email == user.email) if db.execute(stmt).scalar_one_or_none(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="邮箱已注册" )
# 创建用户
db_user = User(
    email=user.email,
    username=user.username,
    hashed_password=hash_password(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users/{user_id}", response_model=UserResponse) def read_user(user_id: int, db: Session = Depends(get_db)): stmt = select(User).where(User.id == user_id) user = db.execute(stmt).scalar_one_or_none()
if not user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="用户不存在"
    )
return user
@app.get("/users/", response_model=List[UserResponse]) def list_users( skip: int = 0, limit: int = 100, db: Session = Depends(get_db) ): stmt = ( select(User) .where(User.is_active == True) .offset(skip) .limit(limit) ) return db.execute(stmt).scalars().all()
@app.put("/users/{user_id}", response_model=UserResponse) def update_user( user_id: int, user_update: UserBase, db: Session = Depends(get_db) ): stmt = select(User).where(User.id == user_id) db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="用户不存在"
    )

db_user.email = user_update.email
db_user.username = user_update.username
db.commit()
db.refresh(db_user)
return db_user
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_user(user_id: int, db: Session = Depends(get_db)): stmt = select(User).where(User.id == user_id) db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="用户不存在"
    )

db.delete(db_user)
db.commit()
undefined

Testing with Pytest

Pytest测试

Test Database Setup

测试数据库配置

python
import pytest
from sqlalchemy import create_engine, StaticPool
from sqlalchemy.orm import sessionmaker
python
import pytest
from sqlalchemy import create_engine, StaticPool
from sqlalchemy.orm import sessionmaker

In-memory SQLite for testing

用于测试的内存SQLite数据库

SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
@pytest.fixture(scope="function") def db_session(): """Create test database session.""" engine = create_engine( SQLALCHEMY_TEST_DATABASE_URL, connect_args={"check_same_thread": False}, poolclass=StaticPool, )
# Create tables
Base.metadata.create_all(bind=engine)

TestingSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
)

session = TestingSessionLocal()
try:
    yield session
finally:
    session.close()
    Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function") def test_user(db_session): """Create test user.""" user = User( email="test@example.com", username="testuser", hashed_password="hashed" ) db_session.add(user) db_session.commit() db_session.refresh(user) return user
undefined
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
@pytest.fixture(scope="function") def db_session(): """创建测试数据库会话。""" engine = create_engine( SQLALCHEMY_TEST_DATABASE_URL, connect_args={"check_same_thread": False}, poolclass=StaticPool, )
# 创建数据表
Base.metadata.create_all(bind=engine)

TestingSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
)

session = TestingSessionLocal()
try:
    yield session
finally:
    session.close()
    Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function") def test_user(db_session): """创建测试用户。""" user = User( email="test@example.com", username="testuser", hashed_password="hashed" ) db_session.add(user) db_session.commit() db_session.refresh(user) return user
undefined

Test Examples

测试示例

python
def test_create_user(db_session):
    """Test user creation."""
    user = User(email="new@example.com", username="newuser")
    db_session.add(user)
    db_session.commit()

    assert user.id is not None
    assert user.email == "new@example.com"
    assert user.created_at is not None

def test_query_user(db_session, test_user):
    """Test user query."""
    stmt = select(User).where(User.email == "test@example.com")
    found_user = db_session.execute(stmt).scalar_one()

    assert found_user.id == test_user.id
    assert found_user.username == test_user.username

def test_update_user(db_session, test_user):
    """Test user update."""
    test_user.username = "updated"
    db_session.commit()

    stmt = select(User).where(User.id == test_user.id)
    updated_user = db_session.execute(stmt).scalar_one()
    assert updated_user.username == "updated"

def test_delete_user(db_session, test_user):
    """Test user deletion."""
    user_id = test_user.id
    db_session.delete(test_user)
    db_session.commit()

    stmt = select(User).where(User.id == user_id)
    assert db_session.execute(stmt).scalar_one_or_none() is None
python
def test_create_user(db_session):
    """测试用户创建。"""
    user = User(email="new@example.com", username="newuser")
    db_session.add(user)
    db_session.commit()

    assert user.id is not None
    assert user.email == "new@example.com"
    assert user.created_at is not None

def test_query_user(db_session, test_user):
    """测试用户查询。"""
    stmt = select(User).where(User.email == "test@example.com")
    found_user = db_session.execute(stmt).scalar_one()

    assert found_user.id == test_user.id
    assert found_user.username == test_user.username

def test_update_user(db_session, test_user):
    """测试用户更新。"""
    test_user.username = "updated"
    db_session.commit()

    stmt = select(User).where(User.id == test_user.id)
    updated_user = db_session.execute(stmt).scalar_one()
    assert updated_user.username == "updated"

def test_delete_user(db_session, test_user):
    """测试用户删除。"""
    user_id = test_user.id
    db_session.delete(test_user)
    db_session.commit()

    stmt = select(User).where(User.id == user_id)
    assert db_session.execute(stmt).scalar_one_or_none() is None

Performance Optimization

性能优化

Query Optimization

查询优化

python
undefined
python
undefined

Use indexes

使用索引

class User(Base): tablename = "users"
email: Mapped[str] = mapped_column(String(255), index=True, unique=True)
created_at: Mapped[datetime] = mapped_column(index=True)

# Composite index
__table_args__ = (
    Index('ix_user_email_active', 'email', 'is_active'),
)
class User(Base): tablename = "users"
email: Mapped[str] = mapped_column(String(255), index=True, unique=True)
created_at: Mapped[datetime] = mapped_column(index=True)

# 复合索引
__table_args__ = (
    Index('ix_user_email_active', 'email', 'is_active'),
)

Use select_from for complex queries

复杂查询使用select_from

stmt = ( select(func.count(Post.id)) .select_from(User) .join(Post) .where(User.is_active == True) )
stmt = ( select(func.count(Post.id)) .select_from(User) .join(Post) .where(User.is_active == True) )

Use contains_eager for joined loads

使用contains_eager优化关联加载

from sqlalchemy.orm import contains_eager
stmt = ( select(Post) .join(Post.author) .options(contains_eager(Post.author)) .where(User.is_active == True) )
undefined
from sqlalchemy.orm import contains_eager
stmt = ( select(Post) .join(Post.author) .options(contains_eager(Post.author)) .where(User.is_active == True) )
undefined

Connection Pooling

连接池配置

python
undefined
python
undefined

Configure pool

配置连接池

engine = create_engine( DATABASE_URL, pool_size=20, # Number of connections to keep max_overflow=10, # Additional connections when pool full pool_timeout=30, # Seconds to wait for connection pool_recycle=3600, # Recycle connections after 1 hour pool_pre_ping=True # Verify connections before use )
engine = create_engine( DATABASE_URL, pool_size=20, # 保持的连接数 max_overflow=10, # 连接池满时可额外创建的连接数 pool_timeout=30, # 获取连接的超时时间(秒) pool_recycle=3600, # 连接回收周期(1小时) pool_pre_ping=True # 使用前验证连接 )

Monitor pool

监控连接池

from sqlalchemy import event
@event.listens_for(engine, "connect") def receive_connect(dbapi_conn, connection_record): print("New connection established")
@event.listens_for(engine, "checkout") def receive_checkout(dbapi_conn, connection_record, connection_proxy): print("Connection checked out from pool")
undefined
from sqlalchemy import event
@event.listens_for(engine, "connect") def receive_connect(dbapi_conn, connection_record): print("新连接已建立")
@event.listens_for(engine, "checkout") def receive_checkout(dbapi_conn, connection_record, connection_proxy): print("连接已从连接池取出")
undefined

Batch Operations

批量操作

python
undefined
python
undefined

Bulk insert with executemany

使用executemany批量插入

from sqlalchemy import insert
data = [ {"email": f"user{i}@example.com", "username": f"user{i}"} for i in range(1000) ]
stmt = insert(User) db.execute(stmt, data) db.commit()
from sqlalchemy import insert
data = [ {"email": f"user{i}@example.com", "username": f"user{i}"} for i in range(1000) ]
stmt = insert(User) db.execute(stmt, data) db.commit()

Bulk update

批量更新

from sqlalchemy import update
stmt = ( update(User) .where(User.is_active == False) .values(deleted_at=func.now()) ) db.execute(stmt)
undefined
from sqlalchemy import update
stmt = ( update(User) .where(User.is_active == False) .values(deleted_at=func.now()) ) db.execute(stmt)
undefined

Best Practices

最佳实践

  1. Use SQLAlchemy 2.0 Syntax: Modern API with better type hints
  2. Type Annotations: Use
    Mapped[T]
    and
    mapped_column()
  3. Eager Loading: Solve N+1 queries with
    selectinload
    /
    joinedload
  4. Session Management: Use dependency injection pattern
  5. Migrations: Always use Alembic for schema changes
  6. Indexes: Add indexes for frequently queried columns
  7. Connection Pooling: Configure appropriate pool settings
  8. Testing: Use in-memory SQLite for fast tests
  9. Async: Use
    AsyncSession
    for async frameworks
  10. Error Handling: Always handle
    NoResultFound
    and
    MultipleResultsFound
  1. 使用SQLAlchemy 2.0语法:具备更优类型提示的现代API
  2. 类型注解:使用
    Mapped[T]
    mapped_column()
  3. 预加载:使用
    selectinload
    /
    joinedload
    解决N+1查询问题
  4. 会话管理:采用依赖注入模式
  5. 迁移管理:始终使用Alembic处理数据库 schema 变更
  6. 索引优化:为频繁查询的字段添加索引
  7. 连接池配置:根据业务需求配置合适的连接池参数
  8. 测试:使用内存SQLite进行快速测试
  9. 异步支持:为异步框架使用
    AsyncSession
  10. 错误处理:妥善处理
    NoResultFound
    MultipleResultsFound
    异常

Common Patterns

常见模式

Repository Pattern

仓库模式

python
from typing import Generic, TypeVar, Type
from sqlalchemy.orm import Session

T = TypeVar('T', bound=Base)

class BaseRepository(Generic[T]):
    def __init__(self, model: Type[T], db: Session):
        self.model = model
        self.db = db

    def get(self, id: int) -> Optional[T]:
        stmt = select(self.model).where(self.model.id == id)
        return self.db.execute(stmt).scalar_one_or_none()

    def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
        stmt = select(self.model).offset(skip).limit(limit)
        return self.db.execute(stmt).scalars().all()

    def create(self, obj: T) -> T:
        self.db.add(obj)
        self.db.commit()
        self.db.refresh(obj)
        return obj

    def delete(self, id: int) -> bool:
        obj = self.get(id)
        if obj:
            self.db.delete(obj)
            self.db.commit()
            return True
        return False
python
from typing import Generic, TypeVar, Type
from sqlalchemy.orm import Session

T = TypeVar('T', bound=Base)

class BaseRepository(Generic[T]):
    def __init__(self, model: Type[T], db: Session):
        self.model = model
        self.db = db

    def get(self, id: int) -> Optional[T]:
        stmt = select(self.model).where(self.model.id == id)
        return self.db.execute(stmt).scalar_one_or_none()

    def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
        stmt = select(self.model).offset(skip).limit(limit)
        return self.db.execute(stmt).scalars().all()

    def create(self, obj: T) -> T:
        self.db.add(obj)
        self.db.commit()
        self.db.refresh(obj)
        return obj

    def delete(self, id: int) -> bool:
        obj = self.get(id)
        if obj:
            self.db.delete(obj)
            self.db.commit()
            return True
        return False

Usage

使用示例

user_repo = BaseRepository(User, db) user = user_repo.get(1)
undefined
user_repo = BaseRepository(User, db) user = user_repo.get(1)
undefined

Soft Delete Pattern

软删除模式

python
class SoftDeleteMixin:
    deleted_at: Mapped[Optional[datetime]] = mapped_column(default=None)

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None

class User(Base, SoftDeleteMixin):
    __tablename__ = "users"
    # ... fields
python
class SoftDeleteMixin:
    deleted_at: Mapped[Optional[datetime]] = mapped_column(default=None)

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None

class User(Base, SoftDeleteMixin):
    __tablename__ = "users"
    # ... 字段定义

Query only active records

查询仅未删除的记录

stmt = select(User).where(User.deleted_at.is_(None))
stmt = select(User).where(User.deleted_at.is_(None))

Soft delete

软删除

user.deleted_at = datetime.utcnow() db.commit()
undefined
user.deleted_at = datetime.utcnow() db.commit()
undefined

Audit Trail Pattern

审计追踪模式

python
class AuditMixin:
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )
    created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
    updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))

class Post(Base, AuditMixin):
    __tablename__ = "posts"
    # ... fields
python
class AuditMixin:
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )
    created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
    updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))

class Post(Base, AuditMixin):
    __tablename__ = "posts"
    # ... 字段定义

Resources

参考资源

Related Skills

相关技能

When using Sqlalchemy, these skills enhance your workflow:
  • django: Django ORM patterns and migration strategies for comparison
  • test-driven-development: TDD patterns for database models and queries
  • fastapi-local-dev: FastAPI + SQLAlchemy integration patterns
  • systematic-debugging: Advanced debugging for ORM query issues and N+1 problems
[Full documentation available in these skills if deployed in your bundle]
使用Sqlalchemy时,以下技能可提升你的开发效率:
  • django:Django ORM模式与迁移策略(用于对比参考)
  • test-driven-development:数据库模型与查询的TDD模式
  • fastapi-local-dev:FastAPI + SQLAlchemy集成模式
  • systematic-debugging:ORM查询问题与N+1问题的高级调试
[若你的技能包中包含这些技能,可查看完整文档]