python-backend

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Python Backend

Python后端

Patterns for building production Python backends with asyncio, FastAPI, SQLAlchemy 2.0, and connection pooling. Each category has individual rule files in
rules/
loaded on-demand.
用于构建基于asyncio、FastAPI、SQLAlchemy 2.0和连接池的生产级Python后端的模式。每个分类在
rules/
目录下都有独立的规则文件,可按需加载。

Quick Reference

快速参考

CategoryRulesImpactWhen to Use
Asyncio3HIGHTaskGroup, structured concurrency, cancellation handling
FastAPI3HIGHDependencies, middleware, background tasks
SQLAlchemy3HIGHAsync sessions, relationships, migrations
Pooling3MEDIUMDatabase pools, HTTP sessions, tuning
Total: 12 rules across 4 categories
分类规则数量影响级别使用场景
Asyncio3TaskGroup、结构化并发、取消处理
FastAPI3依赖项、中间件、后台任务
SQLAlchemy3异步会话、关系映射、迁移
Pooling3数据库池、HTTP会话、调优
总计:4个分类共12条规则

Quick Start

快速开始

python
undefined
python
undefined

FastAPI + SQLAlchemy async session

FastAPI + SQLAlchemy 异步会话

async def get_db() -> AsyncGenerator[AsyncSession, None]: async with async_session_factory() as session: try: yield session await session.commit() except Exception: await session.rollback() raise
@router.get("/users/{user_id}") async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)): result = await db.execute(select(User).where(User.id == user_id)) return result.scalar_one_or_none()

```python
async def get_db() -> AsyncGenerator[AsyncSession, None]: async with async_session_factory() as session: try: yield session await session.commit() except Exception: await session.rollback() raise
@router.get("/users/{user_id}") async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)): result = await db.execute(select(User).where(User.id == user_id)) return result.scalar_one_or_none()

```python

Asyncio TaskGroup with timeout

带超时的Asyncio TaskGroup

async def fetch_all(urls: list[str]) -> list[dict]: async with asyncio.timeout(30): async with asyncio.TaskGroup() as tg: tasks = [tg.create_task(fetch_url(url)) for url in urls] return [t.result() for t in tasks]
undefined
async def fetch_all(urls: list[str]) -> list[dict]: async with asyncio.timeout(30): async with asyncio.TaskGroup() as tg: tasks = [tg.create_task(fetch_url(url)) for url in urls] return [t.result() for t in tasks]
undefined

Asyncio

Asyncio

Modern Python asyncio patterns using structured concurrency, TaskGroup, and Python 3.11+ features.
基于Python 3.11+特性的现代asyncio模式,使用结构化并发、TaskGroup等。

Key Patterns

核心模式

  • TaskGroup replaces
    gather()
    with structured concurrency and auto-cancellation
  • asyncio.timeout()
    context manager for composable timeouts
  • Semaphore for concurrency limiting (rate-limit HTTP requests)
  • except*
    with ExceptionGroup for handling multiple task failures
  • asyncio.to_thread()
    for bridging sync code to async
  • TaskGroup 替代
    gather()
    ,支持结构化并发和自动取消
  • asyncio.timeout()
    上下文管理器,用于可组合的超时控制
  • Semaphore 用于并发限制(如HTTP请求限流)
  • except*
    结合ExceptionGroup处理多任务失败
  • asyncio.to_thread()
    用于同步代码到异步代码的桥接

Key Decisions

核心决策

DecisionRecommendation
Task spawningTaskGroup not gather()
Timeoutsasyncio.timeout() context manager
Concurrency limitasyncio.Semaphore
Sync bridgeasyncio.to_thread()
CancellationAlways re-raise CancelledError
决策项推荐方案
任务生成使用TaskGroup而非gather()
超时控制使用asyncio.timeout()上下文管理器
并发限制使用asyncio.Semaphore
同步桥接使用asyncio.to_thread()
取消处理始终重新抛出CancelledError

FastAPI

FastAPI

Production-ready FastAPI patterns for lifespan, dependencies, middleware, and settings.
用于生产环境的FastAPI模式,涵盖生命周期、依赖项、中间件和配置。

Key Patterns

核心模式

  • Lifespan with
    asynccontextmanager
    for startup/shutdown resource management
  • Dependency injection with class-based services and
    Depends()
  • Middleware stack: CORS -> RequestID -> Timing -> Logging
  • Pydantic Settings with
    .env
    and field validation
  • Exception handlers with RFC 7807 Problem Details
  • Lifespan 结合
    asynccontextmanager
    用于启动/关闭资源管理
  • 依赖注入 基于类的服务和
    Depends()
  • 中间件栈:CORS -> RequestID -> 计时 -> 日志
  • Pydantic Settings 结合
    .env
    文件和字段验证
  • 异常处理器 遵循RFC 7807问题详情规范

Key Decisions

核心决策

DecisionRecommendation
Lifespanasynccontextmanager (not events)
DependenciesClass-based services with DI
SettingsPydantic Settings with .env
ResponseORJSONResponse for performance
HealthCheck all critical dependencies
决策项推荐方案
生命周期使用asynccontextmanager(而非事件)
依赖项基于类的服务结合DI
配置使用Pydantic Settings和.env文件
响应使用ORJSONResponse提升性能
健康检查检查所有关键依赖项

SQLAlchemy

SQLAlchemy

Async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.
基于SQLAlchemy 2.0、AsyncSession并与FastAPI集成的异步数据库模式。

Key Patterns

核心模式

  • One AsyncSession per request with
    expire_on_commit=False
  • lazy="raise"
    on relationships to prevent accidental N+1 queries
  • selectinload
    for eager loading collections
  • Repository pattern with generic async CRUD
  • Bulk inserts chunked 1000-10000 rows for memory management
  • 每个请求一个AsyncSession,设置
    expire_on_commit=False
  • 关系映射使用**
    lazy="raise"
    **,防止意外的N+1查询
  • selectinload
    用于预加载集合
  • 仓库模式 结合通用异步CRUD
  • 批量插入 按1000-10000行分块,优化内存管理

Key Decisions

核心决策

DecisionRecommendation
Session scopeOne AsyncSession per request
Lazy loadinglazy="raise" + explicit loads
Eager loadingselectinload for collections
expire_on_commitFalse (prevents lazy load errors)
Poolpool_pre_ping=True
决策项推荐方案
会话作用域每个请求一个AsyncSession
延迟加载lazy="raise" + 显式加载
预加载集合使用selectinload
expire_on_commit设置为False(避免延迟加载错误)
连接池设置pool_pre_ping=True

Pooling

连接池

Database and HTTP connection pooling for high-performance async Python applications.
用于高性能异步Python应用的数据库和HTTP连接池。

Key Patterns

核心模式

  • SQLAlchemy pool with
    pool_size
    ,
    max_overflow
    ,
    pool_pre_ping
  • Direct asyncpg pool with
    min_size
    /
    max_size
    and connection lifecycle
  • aiohttp session with
    TCPConnector
    limits and DNS caching
  • FastAPI lifespan creating and closing pools at startup/shutdown
  • Pool monitoring with Prometheus metrics
  • SQLAlchemy连接池 配置
    pool_size
    max_overflow
    pool_pre_ping
  • 直接使用asyncpg连接池 配置
    min_size
    /
    max_size
    和连接生命周期
  • aiohttp会话 结合
    TCPConnector
    限制和DNS缓存
  • FastAPI Lifespan 在启动时创建连接池,关闭时销毁
  • 连接池监控 使用Prometheus指标

Pool Sizing Formula

连接池大小计算公式

pool_size = (concurrent_requests / avg_queries_per_request) * 1.5
pool_size = (并发请求数 / 平均每个请求的查询数) * 1.5

Anti-Patterns (FORBIDDEN)

反模式(禁止使用)

python
undefined
python
undefined

NEVER use gather() for new code - no structured concurrency

新代码中绝对不要使用gather() - 无结构化并发

NEVER swallow CancelledError - breaks TaskGroup and timeout

绝对不要吞掉CancelledError - 会破坏TaskGroup和超时机制

NEVER block the event loop with sync calls (time.sleep, requests.get)

绝对不要用同步调用阻塞事件循环(如time.sleep、requests.get)

NEVER use global mutable state for db sessions

绝对不要用全局可变状态存储数据库会话

NEVER skip dependency injection (create sessions in routes)

绝对不要跳过依赖注入(在路由中直接创建会话)

NEVER share AsyncSession across tasks (race condition)

绝对不要在多任务间共享AsyncSession(会出现竞态条件)

NEVER use sync Session in async code (blocks event loop)

绝对不要在异步代码中使用同步Session(阻塞事件循环)

NEVER create engine/pool per request

绝对不要每个请求创建引擎/连接池

NEVER forget to close pools on shutdown

绝对不要在关闭时忘记销毁连接池

undefined
undefined

Related Skills

相关技能

  • architecture-patterns
    - Clean architecture and layer separation
  • async-jobs
    - Celery/ARQ for background processing
  • streaming-api-patterns
    - SSE/WebSocket async patterns
  • database-patterns
    - Database schema design
  • architecture-patterns
    - 整洁架构和分层设计
  • async-jobs
    - 用于后台处理的Celery/ARQ
  • streaming-api-patterns
    - SSE/WebSocket异步模式
  • database-patterns
    - 数据库 schema 设计