cqrs

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

CQRS Implementation

CQRS 实现

Command Query Responsibility Segregation for scalable architectures.
面向可扩展架构的命令查询职责分离。

Architecture

架构

              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Commands   │          │   Queries   │
       │    API      │          │    API      │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │   Write     │─────────►│    Read     │
       │   Model     │  Events  │   Model     │
       └─────────────┘          └─────────────┘
              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Commands   │          │   Queries   │
       │    API      │          │    API      │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │   Write     │─────────►│    Read     │
       │   Model     │  Events  │   Model     │
       └─────────────┘          └─────────────┘

Command Infrastructure

命令基础设施

python
@dataclass
class Command:
    command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)

@dataclass
class CreateOrder(Command):
    customer_id: str
    items: list
    shipping_address: dict

class CommandHandler(ABC, Generic[T]):
    @abstractmethod
    async def handle(self, command: T) -> Any:
        pass

class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type, handler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        return await handler.handle(command)
python
@dataclass
class Command:
    command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)

@dataclass
class CreateOrder(Command):
    customer_id: str
    items: list
    shipping_address: dict

class CommandHandler(ABC, Generic[T]):
    @abstractmethod
    async def handle(self, command: T) -> Any:
        pass

class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type, handler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        return await handler.handle(command)

Query Infrastructure

查询基础设施

python
@dataclass
class GetOrderById(Query):
    order_id: str

@dataclass
class OrderView:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    created_at: datetime

class GetOrderByIdHandler(QueryHandler[GetOrderById, OrderView]):
    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        row = await self.read_db.fetchrow(
            "SELECT * FROM order_views WHERE order_id = $1",
            query.order_id
        )
        return OrderView(**dict(row)) if row else None
python
@dataclass
class GetOrderById(Query):
    order_id: str

@dataclass
class OrderView:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    created_at: datetime

class GetOrderByIdHandler(QueryHandler[GetOrderById, OrderView]):
    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        row = await self.read_db.fetchrow(
            "SELECT * FROM order_views WHERE order_id = $1",
            query.order_id
        )
        return OrderView(**dict(row)) if row else None

FastAPI Integration

FastAPI 集成

python
undefined
python
undefined

Command endpoints (POST, PUT, DELETE)

Command endpoints (POST, PUT, DELETE)

@app.post("/orders") async def create_order(request: CreateOrderRequest, command_bus: CommandBus = Depends()): command = CreateOrder( customer_id=request.customer_id, items=request.items ) order_id = await command_bus.dispatch(command) return {"order_id": order_id}
@app.post("/orders") async def create_order(request: CreateOrderRequest, command_bus: CommandBus = Depends()): command = CreateOrder( customer_id=request.customer_id, items=request.items ) order_id = await command_bus.dispatch(command) return {"order_id": order_id}

Query endpoints (GET)

Query endpoints (GET)

@app.get("/orders/{order_id}") async def get_order(order_id: str, query_bus: QueryBus = Depends()): query = GetOrderById(order_id=order_id) return await query_bus.dispatch(query)
undefined
@app.get("/orders/{order_id}") async def get_order(order_id: str, query_bus: QueryBus = Depends()): query = GetOrderById(order_id=order_id) return await query_bus.dispatch(query)
undefined

Read Model Synchronization

读模型同步

python
class ReadModelSynchronizer:
    async def sync_projection(self, projection: Projection):
        checkpoint = await self._get_checkpoint(projection.name)
        events = await self.event_store.read_all(from_position=checkpoint)

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)
            await self._save_checkpoint(projection.name, event.position)

    async def rebuild_projection(self, projection_name: str):
        projection = self.projections[projection_name]
        await projection.clear()
        await self._save_checkpoint(projection_name, 0)
        # Rebuild from beginning
python
class ReadModelSynchronizer:
    async def sync_projection(self, projection: Projection):
        checkpoint = await self._get_checkpoint(projection.name)
        events = await self.event_store.read_all(from_position=checkpoint)

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)
            await self._save_checkpoint(projection.name, event.position)

    async def rebuild_projection(self, projection_name: str):
        projection = self.projections[projection_name]
        await projection.clear()
        await self._save_checkpoint(projection_name, 0)
        # Rebuild from beginning

Eventual Consistency

最终一致性

python
async def query_after_command(self, query, expected_version, stream_id, timeout=5.0):
    """Read-your-writes consistency."""
    start = time.time()
    while time.time() - start < timeout:
        projection_version = await self._get_projection_version(stream_id)
        if projection_version >= expected_version:
            return await self.execute_query(query)
        await asyncio.sleep(0.1)

    return {"data": await self.execute_query(query), "_warning": "May be stale"}
python
async def query_after_command(self, query, expected_version, stream_id, timeout=5.0):
    """Read-your-writes consistency."""
    start = time.time()
    while time.time() - start < timeout:
        projection_version = await self._get_projection_version(stream_id)
        if projection_version >= expected_version:
            return await self.execute_query(query)
        await asyncio.sleep(0.1)

    return {"data": await self.execute_query(query), "_warning": "May be stale"}

Best Practices

最佳实践

  1. Separate command and query models - Different optimization needs
  2. Accept eventual consistency - Define acceptable lag
  3. Validate in command handlers - Before state change
  4. Denormalize read models - Optimize for queries
  5. Version your events - For schema evolution
  1. 分离命令与查询模型 - 满足不同的优化需求
  2. 接受最终一致性 - 定义可接受的延迟
  3. 在命令处理器中验证 - 状态变更前执行
  4. 对读模型进行反规范化 - 为查询做优化
  5. 为事件添加版本 - 适配 schema 演进

When to Use CQRS

何时使用 CQRS

Good for:
  • Different read/write scaling needs
  • Complex query requirements
  • Event-sourced systems
  • High-performance reporting
Not for:
  • Simple CRUD applications
  • No scaling requirements
  • Small data sets
适用场景:
  • 读写扩展需求不同的场景
  • 复杂查询需求
  • 事件溯源系统
  • 高性能报表系统
不适用场景:
  • 简单 CRUD 应用
  • 无扩展需求的场景
  • 小型数据集