saga-orchestration
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseSaga Orchestration
Saga编排
Patterns for managing distributed transactions and long-running business processes.
用于管理分布式事务和长期运行业务流程的模式。
Saga Types
Saga模式类型
Choreography
Choreography
┌─────┐ ┌─────┐ ┌─────┐
│Svc A│─►│Svc B│─►│Svc C│
└─────┘ └─────┘ └─────┘
│ │ │
▼ ▼ ▼
Event Event Event- Services react to events
- Decentralized control
- Good for simple flows
┌─────┐ ┌─────┐ ┌─────┐
│Svc A│─►│Svc B│─►│Svc C│
└─────┘ └─────┘ └─────┘
│ │ │
▼ ▼ ▼
Event Event Event- 服务响应事件
- 去中心化控制
- 适用于简单流程
Orchestration
Orchestration
┌─────────────┐
│ Orchestrator│
└──────┬──────┘
│
┌─────┼─────┐
▼ ▼ ▼
┌────┐┌────┐┌────┐
│Svc1││Svc2││Svc3│
└────┘└────┘└────┘- Central coordinator
- Explicit control flow
- Better for complex flows
┌─────────────┐
│ Orchestrator│
└──────┬──────┘
│
┌─────┼─────┐
▼ ▼ ▼
┌────┐┌────┐┌────┐
│Svc1││Svc2││Svc3│
└────┘└────┘└────┘- 中央协调器
- 显式控制流
- 更适用于复杂流程
Saga States
Saga状态
| State | Description |
|---|---|
| Started | Saga initiated |
| Pending | Waiting for step |
| Compensating | Rolling back |
| Completed | All steps succeeded |
| Failed | Failed after compensation |
| 状态 | 描述 |
|---|---|
| Started | Saga已启动 |
| Pending | 等待步骤执行 |
| Compensating | 正在回滚 |
| Completed | 所有步骤执行成功 |
| Failed | 补偿后仍失败 |
Orchestrator Implementation
协调器实现
python
@dataclass
class SagaStep:
name: str
action: str
compensation: str
status: str = "pending"
class SagaOrchestrator:
async def execute(self, data: dict) -> SagaResult:
completed_steps = []
context = {"data": data}
for step in self.steps:
result = await step.action(context)
if not result.success:
await self.compensate(completed_steps, context)
return SagaResult(status="failed", error=result.error)
completed_steps.append(step)
context.update(result.data)
return SagaResult(status="completed", data=context)
async def compensate(self, completed_steps, context):
for step in reversed(completed_steps):
await step.compensation(context)python
@dataclass
class SagaStep:
name: str
action: str
compensation: str
status: str = "pending"
class SagaOrchestrator:
async def execute(self, data: dict) -> SagaResult:
completed_steps = []
context = {"data": data}
for step in self.steps:
result = await step.action(context)
if not result.success:
await self.compensate(completed_steps, context)
return SagaResult(status="failed", error=result.error)
completed_steps.append(step)
context.update(result.data)
return SagaResult(status="completed", data=context)
async def compensate(self, completed_steps, context):
for step in reversed(completed_steps):
await step.compensation(context)Order Fulfillment Saga Example
订单履约Saga示例
python
class OrderFulfillmentSaga(SagaOrchestrator):
def define_steps(self, data):
return [
SagaStep("reserve_inventory",
action=self.reserve_inventory,
compensation=self.release_inventory),
SagaStep("process_payment",
action=self.process_payment,
compensation=self.refund_payment),
SagaStep("create_shipment",
action=self.create_shipment,
compensation=self.cancel_shipment),
]python
class OrderFulfillmentSaga(SagaOrchestrator):
def define_steps(self, data):
return [
SagaStep("reserve_inventory",
action=self.reserve_inventory,
compensation=self.release_inventory),
SagaStep("process_payment",
action=self.process_payment,
compensation=self.refund_payment),
SagaStep("create_shipment",
action=self.create_shipment,
compensation=self.cancel_shipment),
]Choreography Example
Choreography示例
python
class OrderChoreographySaga:
def __init__(self, event_bus):
self.event_bus = event_bus
event_bus.subscribe("OrderCreated", self._on_order_created)
event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
event_bus.subscribe("PaymentFailed", self._on_payment_failed)
async def _on_order_created(self, event):
await self.event_bus.publish("ReserveInventory", {
"order_id": event["order_id"],
"items": event["items"]
})
async def _on_payment_failed(self, event):
# Compensation
await self.event_bus.publish("ReleaseInventory", {
"reservation_id": event["reservation_id"]
})python
class OrderChoreographySaga:
def __init__(self, event_bus):
self.event_bus = event_bus
event_bus.subscribe("OrderCreated", self._on_order_created)
event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
event_bus.subscribe("PaymentFailed", self._on_payment_failed)
async def _on_order_created(self, event):
await self.event_bus.publish("ReserveInventory", {
"order_id": event["order_id"],
"items": event["items"]
})
async def _on_payment_failed(self, event):
# 补偿操作
await self.event_bus.publish("ReleaseInventory", {
"reservation_id": event["reservation_id"]
})Best Practices
最佳实践
- Make steps idempotent - Safe to retry
- Design compensations carefully - Must always work
- Use correlation IDs - For tracing across services
- Implement timeouts - Don't wait forever
- Log everything - For debugging failures
- 确保步骤幂等 - 可安全重试
- 谨慎设计补偿逻辑 - 必须确保始终可用
- 使用关联ID - 用于跨服务追踪
- 实现超时机制 - 避免无限等待
- 记录所有操作 - 便于调试故障
When to Use
使用场景
Orchestration:
- Complex multi-step workflows
- Need visibility into saga state
- Central error handling
Choreography:
- Simple event flows
- Loose coupling required
- Independent service teams
Orchestration(编排):
- 复杂的多步骤工作流
- 需要监控Saga状态
- 集中式错误处理
Choreography:
- 简单的事件驱动流程
- 需要松耦合架构
- 服务团队独立运作