cqrs
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCQRS Implementation
CQRS 实现
Command Query Responsibility Segregation for scalable architectures.
面向可扩展架构的命令查询职责分离。
Architecture
架构
┌────────────┴────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Commands │ │ Queries │
│ API │ │ API │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Write │─────────►│ Read │
│ Model │ Events │ Model │
└─────────────┘ └─────────────┘ ┌────────────┴────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Commands │ │ Queries │
│ API │ │ API │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Write │─────────►│ Read │
│ Model │ Events │ Model │
└─────────────┘ └─────────────┘Command Infrastructure
命令基础设施
python
@dataclass
class Command:
command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class CreateOrder(Command):
customer_id: str
items: list
shipping_address: dict
class CommandHandler(ABC, Generic[T]):
@abstractmethod
async def handle(self, command: T) -> Any:
pass
class CommandBus:
def __init__(self):
self._handlers: Dict[Type[Command], CommandHandler] = {}
def register(self, command_type, handler):
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> Any:
handler = self._handlers.get(type(command))
return await handler.handle(command)python
@dataclass
class Command:
command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class CreateOrder(Command):
customer_id: str
items: list
shipping_address: dict
class CommandHandler(ABC, Generic[T]):
@abstractmethod
async def handle(self, command: T) -> Any:
pass
class CommandBus:
def __init__(self):
self._handlers: Dict[Type[Command], CommandHandler] = {}
def register(self, command_type, handler):
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> Any:
handler = self._handlers.get(type(command))
return await handler.handle(command)Query Infrastructure
查询基础设施
python
@dataclass
class GetOrderById(Query):
order_id: str
@dataclass
class OrderView:
order_id: str
customer_id: str
status: str
total_amount: float
created_at: datetime
class GetOrderByIdHandler(QueryHandler[GetOrderById, OrderView]):
async def handle(self, query: GetOrderById) -> Optional[OrderView]:
row = await self.read_db.fetchrow(
"SELECT * FROM order_views WHERE order_id = $1",
query.order_id
)
return OrderView(**dict(row)) if row else Nonepython
@dataclass
class GetOrderById(Query):
order_id: str
@dataclass
class OrderView:
order_id: str
customer_id: str
status: str
total_amount: float
created_at: datetime
class GetOrderByIdHandler(QueryHandler[GetOrderById, OrderView]):
async def handle(self, query: GetOrderById) -> Optional[OrderView]:
row = await self.read_db.fetchrow(
"SELECT * FROM order_views WHERE order_id = $1",
query.order_id
)
return OrderView(**dict(row)) if row else NoneFastAPI Integration
FastAPI 集成
python
undefinedpython
undefinedCommand endpoints (POST, PUT, DELETE)
Command endpoints (POST, PUT, DELETE)
@app.post("/orders")
async def create_order(request: CreateOrderRequest, command_bus: CommandBus = Depends()):
command = CreateOrder(
customer_id=request.customer_id,
items=request.items
)
order_id = await command_bus.dispatch(command)
return {"order_id": order_id}
@app.post("/orders")
async def create_order(request: CreateOrderRequest, command_bus: CommandBus = Depends()):
command = CreateOrder(
customer_id=request.customer_id,
items=request.items
)
order_id = await command_bus.dispatch(command)
return {"order_id": order_id}
Query endpoints (GET)
Query endpoints (GET)
@app.get("/orders/{order_id}")
async def get_order(order_id: str, query_bus: QueryBus = Depends()):
query = GetOrderById(order_id=order_id)
return await query_bus.dispatch(query)
undefined@app.get("/orders/{order_id}")
async def get_order(order_id: str, query_bus: QueryBus = Depends()):
query = GetOrderById(order_id=order_id)
return await query_bus.dispatch(query)
undefinedRead Model Synchronization
读模型同步
python
class ReadModelSynchronizer:
async def sync_projection(self, projection: Projection):
checkpoint = await self._get_checkpoint(projection.name)
events = await self.event_store.read_all(from_position=checkpoint)
for event in events:
if event.event_type in projection.handles():
await projection.apply(event)
await self._save_checkpoint(projection.name, event.position)
async def rebuild_projection(self, projection_name: str):
projection = self.projections[projection_name]
await projection.clear()
await self._save_checkpoint(projection_name, 0)
# Rebuild from beginningpython
class ReadModelSynchronizer:
async def sync_projection(self, projection: Projection):
checkpoint = await self._get_checkpoint(projection.name)
events = await self.event_store.read_all(from_position=checkpoint)
for event in events:
if event.event_type in projection.handles():
await projection.apply(event)
await self._save_checkpoint(projection.name, event.position)
async def rebuild_projection(self, projection_name: str):
projection = self.projections[projection_name]
await projection.clear()
await self._save_checkpoint(projection_name, 0)
# Rebuild from beginningEventual Consistency
最终一致性
python
async def query_after_command(self, query, expected_version, stream_id, timeout=5.0):
"""Read-your-writes consistency."""
start = time.time()
while time.time() - start < timeout:
projection_version = await self._get_projection_version(stream_id)
if projection_version >= expected_version:
return await self.execute_query(query)
await asyncio.sleep(0.1)
return {"data": await self.execute_query(query), "_warning": "May be stale"}python
async def query_after_command(self, query, expected_version, stream_id, timeout=5.0):
"""Read-your-writes consistency."""
start = time.time()
while time.time() - start < timeout:
projection_version = await self._get_projection_version(stream_id)
if projection_version >= expected_version:
return await self.execute_query(query)
await asyncio.sleep(0.1)
return {"data": await self.execute_query(query), "_warning": "May be stale"}Best Practices
最佳实践
- Separate command and query models - Different optimization needs
- Accept eventual consistency - Define acceptable lag
- Validate in command handlers - Before state change
- Denormalize read models - Optimize for queries
- Version your events - For schema evolution
- 分离命令与查询模型 - 满足不同的优化需求
- 接受最终一致性 - 定义可接受的延迟
- 在命令处理器中验证 - 状态变更前执行
- 对读模型进行反规范化 - 为查询做优化
- 为事件添加版本 - 适配 schema 演进
When to Use CQRS
何时使用 CQRS
Good for:
- Different read/write scaling needs
- Complex query requirements
- Event-sourced systems
- High-performance reporting
Not for:
- Simple CRUD applications
- No scaling requirements
- Small data sets
适用场景:
- 读写扩展需求不同的场景
- 复杂查询需求
- 事件溯源系统
- 高性能报表系统
不适用场景:
- 简单 CRUD 应用
- 无扩展需求的场景
- 小型数据集