Loading...
Loading...
Compare original and translation side by side
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
)from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
)undefinedundefinedfrom typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Dependency that provides async database session."""
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raisefrom typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Dependency that provides async database session."""
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raiseundefinedundefinedfrom sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from datetime import datetime, timezone
import uuid
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))
# Relationship with explicit lazy loading strategy
orders: Mapped[list["Order"]] = relationship(
back_populates="user",
lazy="raise", # Prevent accidental lazy loads - MUST use selectinload
)
class Order(Base):
__tablename__ = "orders"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
total: Mapped[int]
user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from datetime import datetime, timezone
import uuid
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))
# Relationship with explicit lazy loading strategy
orders: Mapped[list["Order"]] = relationship(
back_populates="user",
lazy="raise", # Prevent accidental lazy loads - MUST use selectinload
)
class Order(Base):
__tablename__ = "orders"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
total: Mapped[int]
user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy import select
async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
"""Load user with orders in single query - NO N+1."""
result = await db.execute(
select(User)
.options(selectinload(User.orders)) # Eager load orders
.where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
"""Load multiple users with orders efficiently."""
result = await db.execute(
select(User)
.options(selectinload(User.orders))
.limit(limit)
)
return list(result.scalars().all())from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy import select
async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
"""Load user with orders in single query - NO N+1."""
result = await db.execute(
select(User)
.options(selectinload(User.orders)) # Eager load orders
.where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
"""Load multiple users with orders efficiently."""
result = await db.execute(
select(User)
.options(selectinload(User.orders))
.limit(limit)
)
return list(result.scalars().all())async def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
"""Efficient bulk insert - SQLAlchemy 2.0 uses multi-value INSERT."""
# SQLAlchemy 2.0 automatically batches as single INSERT with multiple VALUES
users = [User(**data) for data in users_data]
db.add_all(users)
await db.flush() # Get IDs without committing
return len(users)
async def bulk_insert_chunked(
db: AsyncSession,
items: list[dict],
chunk_size: int = 1000,
) -> int:
"""Insert large datasets in chunks to manage memory."""
total = 0
for i in range(0, len(items), chunk_size):
chunk = items[i:i + chunk_size]
db.add_all([Item(**data) for data in chunk])
await db.flush()
total += len(chunk)
return totalasync def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
"""Efficient bulk insert - SQLAlchemy 2.0 uses multi-value INSERT."""
# SQLAlchemy 2.0 automatically batches as single INSERT with multiple VALUES
users = [User(**data) for data in users_data]
db.add_all(users)
await db.flush() # Get IDs without committing
return len(users)
async def bulk_insert_chunked(
db: AsyncSession,
items: list[dict],
chunk_size: int = 1000,
) -> int:
"""Insert large datasets in chunks to manage memory."""
total = 0
for i in range(0, len(items), chunk_size):
chunk = items[i:i + chunk_size]
db.add_all([Item(**data) for data in chunk])
await db.flush()
total += len(chunk)
return totalfrom typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar("T", bound=Base)
class AsyncRepository(Generic[T]):
"""Generic async repository for CRUD operations."""
def __init__(self, session: AsyncSession, model: type[T]):
self.session = session
self.model = model
async def get(self, id: UUID) -> T | None:
return await self.session.get(self.model, id)
async def get_many(self, ids: list[UUID]) -> list[T]:
result = await self.session.execute(
select(self.model).where(self.model.id.in_(ids))
)
return list(result.scalars().all())
async def create(self, **kwargs) -> T:
instance = self.model(**kwargs)
self.session.add(instance)
await self.session.flush()
return instance
async def update(self, instance: T, **kwargs) -> T:
for key, value in kwargs.items():
setattr(instance, key, value)
await self.session.flush()
return instance
async def delete(self, instance: T) -> None:
await self.session.delete(instance)
await self.session.flush()from typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar("T", bound=Base)
class AsyncRepository(Generic[T]):
"""Generic async repository for CRUD operations."""
def __init__(self, session: AsyncSession, model: type[T]):
self.session = session
self.model = model
async def get(self, id: UUID) -> T | None:
return await self.session.get(self.model, id)
async def get_many(self, ids: list[UUID]) -> list[T]:
result = await self.session.execute(
select(self.model).where(self.model.id.in_(ids))
)
return list(result.scalars().all())
async def create(self, **kwargs) -> T:
instance = self.model(**kwargs)
self.session.add(instance)
await self.session.flush()
return instance
async def update(self, instance: T, **kwargs) -> T:
for key, value in kwargs.items():
setattr(instance, key, value)
await self.session.flush()
return instance
async def delete(self, instance: T) -> None:
await self.session.delete(instance)
await self.session.flush()import asyncio
async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
"""Run multiple queries concurrently - same session is NOT thread-safe."""
# WRONG: Don't share AsyncSession across tasks
# async with asyncio.TaskGroup() as tg:
# tg.create_task(db.execute(...)) # NOT SAFE
# CORRECT: Sequential queries with same session
user = await db.get(User, user_id)
orders_result = await db.execute(
select(Order).where(Order.user_id == user_id).limit(10)
)
stats_result = await db.execute(
select(func.count(Order.id)).where(Order.user_id == user_id)
)
return {
"user": user,
"recent_orders": list(orders_result.scalars().all()),
"total_orders": stats_result.scalar(),
}
async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
"""Concurrent queries - each task gets its own session."""
async def fetch_user(user_id: UUID) -> dict:
async with async_session_factory() as session:
user = await session.get(User, user_id)
return {"id": user_id, "email": user.email if user else None}
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]
return [t.result() for t in tasks]import asyncio
async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
"""Run multiple queries concurrently - same session is NOT thread-safe."""
# WRONG: Don't share AsyncSession across tasks
# async with asyncio.TaskGroup() as tg:
# tg.create_task(db.execute(...)) # NOT SAFE
# CORRECT: Sequential queries with same session
user = await db.get(User, user_id)
orders_result = await db.execute(
select(Order).where(Order.user_id == user_id).limit(10)
)
stats_result = await db.execute(
select(func.count(Order.id)).where(Order.user_id == user_id)
)
return {
"user": user,
"recent_orders": list(orders_result.scalars().all()),
"total_orders": stats_result.scalar(),
}
async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
"""Concurrent queries - each task gets its own session."""
async def fetch_user(user_id: UUID) -> dict:
async with async_session_factory() as session:
user = await session.get(User, user_id)
return {"id": user_id, "email": user.email if user else None}
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]
return [t.result() for t in tasks]| Decision | Recommendation | Rationale |
|---|---|---|
| Session scope | One AsyncSession per task/request | SQLAlchemy docs: "AsyncSession per task" |
| Scoped sessions | Avoid for async | Maintainers discourage for async code |
| Lazy loading | Use | Prevents accidental N+1 in async |
| Eager loading | | Better than joinedload for async |
| expire_on_commit | Set to | Prevents lazy load errors after commit |
| Connection pool | | Validates connections before use |
| Bulk inserts | Chunk 1000-10000 rows | Memory management for large inserts |
| 决策 | 推荐方案 | 理由 |
|---|---|---|
| 会话作用域 | 每个任务/请求对应一个AsyncSession | SQLAlchemy文档:“每个任务对应一个AsyncSession” |
| 作用域会话 | 异步场景下请避免使用 | 维护者不建议在异步代码中使用 |
| 延迟加载 | 使用 | 防止异步环境中意外出现N+1查询 |
| 预加载 | 集合类型使用 | 在异步场景下表现优于joinedload |
| commit后过期设置 | 设置为 | 避免commit后出现延迟加载错误 |
| 连接池 | 设置 | 在使用前验证连接有效性 |
| 批量插入 | 按1000-10000行分块 | 对大数量插入进行内存管理 |
undefinedundefinedundefinedundefinedasyncio-advancedalembic-migrationsfastapi-advanceddatabase-schema-designerasyncio-advancedalembic-migrationsfastapi-advanceddatabase-schema-designer