cqrs-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

CQRS Patterns

CQRS 模式

Separate read and write concerns for optimized data access.
分离读写关注点,优化数据访问。

Overview

概述

  • Read-heavy workloads with complex queries
  • Different scaling requirements for reads vs writes
  • Event sourcing implementations
  • Multiple read model representations of same data
  • Complex domain models with simple read requirements
  • 带有复杂查询的读密集型工作负载
  • 读操作与写操作有不同的扩展需求
  • 事件溯源的实现场景
  • 同一数据的多种读模型表示
  • 领域模型复杂但读需求简单的场景

When NOT to Use

不适用于以下场景

  • Simple CRUD applications
  • Strong consistency requirements everywhere
  • Small datasets with simple queries
  • 简单的CRUD应用
  • 所有场景都要求强一致性
  • 数据集小且查询简单的系统

Architecture Overview

架构概述

┌─────────────────┐         ┌─────────────────┐
│   Write Side    │         │   Read Side     │
├─────────────────┤         ├─────────────────┤
│  ┌───────────┐  │         │  ┌───────────┐  │
│  │ Commands  │  │         │  │  Queries  │  │
│  └─────┬─────┘  │         │  └─────┬─────┘  │
│  ┌─────▼─────┐  │         │  ┌─────▼─────┐  │
│  │ Aggregate │  │         │  │Read Model │  │
│  └─────┬─────┘  │         │  └───────────┘  │
│  ┌─────▼─────┐  │         │        ▲        │
│  │  Events   │──┼─────────┼────────┘        │
│  └───────────┘  │ Publish │   Project       │
└─────────────────┘         └─────────────────┘
┌─────────────────┐         ┌─────────────────┐
│   Write Side    │         │   Read Side     │
├─────────────────┤         ├─────────────────┤
│  ┌───────────┐  │         │  ┌───────────┐  │
│  │ Commands  │  │         │  │  Queries  │  │
│  └─────┬─────┘  │         │  └─────┬─────┘  │
│  ┌─────▼─────┐  │         │  ┌─────▼─────┐  │
│  │ Aggregate │  │         │  │Read Model │  │
│  └─────┬─────┘  │         │  └───────────┘  │
│  ┌─────▼─────┐  │         │        ▲        │
│  │  Events   │──┼─────────┼────────┘        │
│  └───────────┘  │ Publish │   Project       │
└─────────────────┘         └─────────────────┘

Command Side (Write Model)

命令端(写模型)

Command and Handler

命令与处理器

python
from pydantic import BaseModel, Field
from uuid import UUID, uuid4
from datetime import datetime, timezone
from abc import ABC, abstractmethod

class Command(BaseModel):
    """Base command with metadata."""
    command_id: UUID = Field(default_factory=uuid4)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    user_id: UUID | None = None

class CreateOrder(Command):
    customer_id: UUID
    items: list[OrderItem]
    shipping_address: Address

class CommandHandler(ABC):
    @abstractmethod
    async def handle(self, command: Command) -> list["DomainEvent"]:
        pass

class CreateOrderHandler(CommandHandler):
    def __init__(self, order_repo, inventory_service):
        self.order_repo = order_repo
        self.inventory = inventory_service

    async def handle(self, command: CreateOrder) -> list[DomainEvent]:
        for item in command.items:
            if not await self.inventory.check_availability(item.product_id, item.quantity):
                raise InsufficientInventoryError(item.product_id)

        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )
        await self.order_repo.save(order)
        return order.pending_events

class CommandBus:
    def __init__(self):
        self._handlers: dict[type, CommandHandler] = {}

    def register(self, command_type: type, handler: CommandHandler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> list[DomainEvent]:
        handler = self._handlers.get(type(command))
        if not handler:
            raise NoHandlerFoundError(type(command))
        events = await handler.handle(command)
        for event in events:
            await self.event_publisher.publish(event)
        return events
python
from pydantic import BaseModel, Field
from uuid import UUID, uuid4
from datetime import datetime, timezone
from abc import ABC, abstractmethod

class Command(BaseModel):
    """Base command with metadata."""
    command_id: UUID = Field(default_factory=uuid4)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    user_id: UUID | None = None

class CreateOrder(Command):
    customer_id: UUID
    items: list[OrderItem]
    shipping_address: Address

class CommandHandler(ABC):
    @abstractmethod
    async def handle(self, command: Command) -> list["DomainEvent"]:
        pass

class CreateOrderHandler(CommandHandler):
    def __init__(self, order_repo, inventory_service):
        self.order_repo = order_repo
        self.inventory = inventory_service

    async def handle(self, command: CreateOrder) -> list[DomainEvent]:
        for item in command.items:
            if not await self.inventory.check_availability(item.product_id, item.quantity):
                raise InsufficientInventoryError(item.product_id)

        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )
        await self.order_repo.save(order)
        return order.pending_events

class CommandBus:
    def __init__(self):
        self._handlers: dict[type, CommandHandler] = {}

    def register(self, command_type: type, handler: CommandHandler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> list[DomainEvent]:
        handler = self._handlers.get(type(command))
        if not handler:
            raise NoHandlerFoundError(type(command))
        events = await handler.handle(command)
        for event in events:
            await self.event_publisher.publish(event)
        return events

Write Model (Aggregate)

写模型(聚合根)

python
from dataclasses import dataclass, field

@dataclass
class Order:
    id: UUID
    customer_id: UUID
    items: list[OrderItem]
    status: OrderStatus
    _pending_events: list[DomainEvent] = field(default_factory=list)

    @classmethod
    def create(cls, customer_id: UUID, items: list, shipping_address: Address) -> "Order":
        order = cls(id=uuid4(), customer_id=customer_id, items=[], status=OrderStatus.PENDING)
        for item in items:
            order.items.append(item)
            order._raise_event(OrderItemAdded(order_id=order.id, product_id=item.product_id))
        order._raise_event(OrderCreated(order_id=order.id, customer_id=customer_id))
        return order

    def cancel(self, reason: str):
        if self.status == OrderStatus.SHIPPED:
            raise InvalidOperationError("Cannot cancel shipped order")
        self.status = OrderStatus.CANCELLED
        self._raise_event(OrderCancelled(order_id=self.id, reason=reason))

    def _raise_event(self, event: DomainEvent):
        self._pending_events.append(event)

    @property
    def pending_events(self) -> list[DomainEvent]:
        events = self._pending_events.copy()
        self._pending_events.clear()
        return events
python
from dataclasses import dataclass, field

@dataclass
class Order:
    id: UUID
    customer_id: UUID
    items: list[OrderItem]
    status: OrderStatus
    _pending_events: list[DomainEvent] = field(default_factory=list)

    @classmethod
    def create(cls, customer_id: UUID, items: list, shipping_address: Address) -> "Order":
        order = cls(id=uuid4(), customer_id=customer_id, items=[], status=OrderStatus.PENDING)
        for item in items:
            order.items.append(item)
            order._raise_event(OrderItemAdded(order_id=order.id, product_id=item.product_id))
        order._raise_event(OrderCreated(order_id=order.id, customer_id=customer_id))
        return order

    def cancel(self, reason: str):
        if self.status == OrderStatus.SHIPPED:
            raise InvalidOperationError("Cannot cancel shipped order")
        self.status = OrderStatus.CANCELLED
        self._raise_event(OrderCancelled(order_id=self.id, reason=reason))

    def _raise_event(self, event: DomainEvent):
        self._pending_events.append(event)

    @property
    def pending_events(self) -> list[DomainEvent]:
        events = self._pending_events.copy()
        self._pending_events.clear()
        return events

Query Side (Read Model)

查询端(读模型)

Query Handler

查询处理器

python
class Query(BaseModel):
    pass

class GetOrderById(Query):
    order_id: UUID

class GetOrdersByCustomer(Query):
    customer_id: UUID
    status: OrderStatus | None = None
    page: int = 1
    page_size: int = 20

class GetOrderByIdHandler:
    def __init__(self, read_db):
        self.db = read_db

    async def handle(self, query: GetOrderById) -> OrderView | None:
        row = await self.db.fetchrow(
            "SELECT * FROM order_summary WHERE id = $1", query.order_id
        )
        return OrderView(**row) if row else None

class OrderView(BaseModel):
    """Denormalized read model for orders."""
    id: UUID
    customer_id: UUID
    customer_name: str  # Denormalized
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
python
class Query(BaseModel):
    pass

class GetOrderById(Query):
    order_id: UUID

class GetOrdersByCustomer(Query):
    customer_id: UUID
    status: OrderStatus | None = None
    page: int = 1
    page_size: int = 20

class GetOrderByIdHandler:
    def __init__(self, read_db):
        self.db = read_db

    async def handle(self, query: GetOrderById) -> OrderView | None:
        row = await self.db.fetchrow(
            "SELECT * FROM order_summary WHERE id = $1", query.order_id
        )
        return OrderView(**row) if row else None

class OrderView(BaseModel):
    """Denormalized read model for orders."""
    id: UUID
    customer_id: UUID
    customer_name: str  # Denormalized
    status: str
    total_amount: float
    item_count: int
    created_at: datetime

Projections

投影

python
class OrderProjection:
    """Projects events to read models."""
    def __init__(self, read_db, customer_service):
        self.db = read_db
        self.customers = customer_service

    async def handle(self, event: DomainEvent):
        match event:
            case OrderCreated():
                await self._on_order_created(event)
            case OrderItemAdded():
                await self._on_item_added(event)
            case OrderCancelled():
                await self._on_order_cancelled(event)

    async def _on_order_created(self, event: OrderCreated):
        customer = await self.customers.get(event.customer_id)
        await self.db.execute(
            """INSERT INTO order_summary (id, customer_id, customer_name, status, total_amount, item_count, created_at)
               VALUES ($1, $2, $3, 'pending', 0.0, 0, $4)
               ON CONFLICT (id) DO UPDATE SET customer_name = $3""",
            event.order_id, event.customer_id, customer.name, event.timestamp,
        )

    async def _on_item_added(self, event: OrderItemAdded):
        subtotal = event.quantity * event.unit_price
        await self.db.execute(
            "UPDATE order_summary SET total_amount = total_amount + $1, item_count = item_count + 1 WHERE id = $2",
            subtotal, event.order_id,
        )

    async def _on_order_cancelled(self, event: OrderCancelled):
        await self.db.execute(
            "UPDATE order_summary SET status = 'cancelled' WHERE id = $1", event.order_id
        )
python
class OrderProjection:
    """Projects events to read models."""
    def __init__(self, read_db, customer_service):
        self.db = read_db
        self.customers = customer_service

    async def handle(self, event: DomainEvent):
        match event:
            case OrderCreated():
                await self._on_order_created(event)
            case OrderItemAdded():
                await self._on_item_added(event)
            case OrderCancelled():
                await self._on_order_cancelled(event)

    async def _on_order_created(self, event: OrderCreated):
        customer = await self.customers.get(event.customer_id)
        await self.db.execute(
            """INSERT INTO order_summary (id, customer_id, customer_name, status, total_amount, item_count, created_at)
               VALUES ($1, $2, $3, 'pending', 0.0, 0, $4)
               ON CONFLICT (id) DO UPDATE SET customer_name = $3""",
            event.order_id, event.customer_id, customer.name, event.timestamp,
        )

    async def _on_item_added(self, event: OrderItemAdded):
        subtotal = event.quantity * event.unit_price
        await self.db.execute(
            "UPDATE order_summary SET total_amount = total_amount + $1, item_count = item_count + 1 WHERE id = $2",
            subtotal, event.order_id,
        )

    async def _on_order_cancelled(self, event: OrderCancelled):
        await self.db.execute(
            "UPDATE order_summary SET status = 'cancelled' WHERE id = $1", event.order_id
        )

FastAPI Integration

FastAPI 集成

python
from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

@app.post("/api/v1/orders", status_code=201)
async def create_order(request: CreateOrderRequest, bus: CommandBus = Depends(get_command_bus)):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        shipping_address=request.shipping_address,
    )
    try:
        events = await bus.dispatch(command)
        return {"order_id": events[0].order_id}
    except InsufficientInventoryError as e:
        raise HTTPException(400, f"Insufficient inventory: {e}")

@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: UUID, bus: QueryBus = Depends(get_query_bus)):
    order = await bus.dispatch(GetOrderById(order_id=order_id))
    if not order:
        raise HTTPException(404, "Order not found")
    return order
python
from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

@app.post("/api/v1/orders", status_code=201)
async def create_order(request: CreateOrderRequest, bus: CommandBus = Depends(get_command_bus)):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        shipping_address=request.shipping_address,
    )
    try:
        events = await bus.dispatch(command)
        return {"order_id": events[0].order_id}
    except InsufficientInventoryError as e:
        raise HTTPException(400, f"Insufficient inventory: {e}")

@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: UUID, bus: QueryBus = Depends(get_query_bus)):
    order = await bus.dispatch(GetOrderById(order_id=order_id))
    if not order:
        raise HTTPException(404, "Order not found")
    return order

Key Decisions

关键决策

DecisionRecommendation
ConsistencyEventual consistency between write and read models
Event storageEvent store for write side, denormalized tables for read
Projection lagMonitor and alert on projection delay
Read model countStart with one, add more for specific query needs
Rebuild strategyAbility to rebuild projections from events
决策建议
一致性写模型与读模型之间采用最终一致性
事件存储写端使用事件存储,读端使用非规范化表
投影延迟监控并告警投影延迟情况
读模型数量从一个读模型开始,根据特定查询需求逐步添加
重建策略具备从事件重建投影的能力

Anti-Patterns (FORBIDDEN)

反模式(禁止使用)

python
undefined
python
undefined

NEVER query write model for reads

绝不要查询写模型来获取读数据

order = await aggregate_repo.get(order_id) # WRONG
order = await aggregate_repo.get(order_id) # 错误做法

CORRECT: Use read model

正确做法:使用读模型

order = await query_bus.dispatch(GetOrderById(order_id=order_id))
order = await query_bus.dispatch(GetOrderById(order_id=order_id))

NEVER modify read model directly

绝不要直接修改读模型

await read_db.execute("UPDATE orders SET status = $1", status) # WRONG
await read_db.execute("UPDATE orders SET status = $1", status) # 错误做法

CORRECT: Dispatch command, let projection update

正确做法:分发命令,由投影完成更新

await bus.dispatch(UpdateOrderStatus(order_id=order_id, status=status))
await bus.dispatch(UpdateOrderStatus(order_id=order_id, status=status))

NEVER skip projection idempotency - use UPSERT

绝不要忽略投影的幂等性 - 使用UPSERT

await self.db.execute("INSERT INTO ... ON CONFLICT (id) DO UPDATE SET ...")
undefined
await self.db.execute("INSERT INTO ... ON CONFLICT (id) DO UPDATE SET ...")
undefined

Related Skills

相关技能

  • event-sourcing
    - Event-sourced write models
  • saga-patterns
    - Cross-aggregate transactions
  • database-schema-designer
    - Read model schema design
  • event-sourcing
    - 基于事件溯源的写模型
  • saga-patterns
    - 跨聚合根事务
  • database-schema-designer
    - 读模型 schema 设计