saga-orchestration

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Saga Orchestration

Saga编排

Patterns for managing distributed transactions and long-running business processes.
用于管理分布式事务和长期运行业务流程的模式。

Saga Types

Saga模式类型

Choreography

Choreography

┌─────┐  ┌─────┐  ┌─────┐
│Svc A│─►│Svc B│─►│Svc C│
└─────┘  └─────┘  └─────┘
   │        │        │
   ▼        ▼        ▼
 Event    Event    Event
  • Services react to events
  • Decentralized control
  • Good for simple flows
┌─────┐  ┌─────┐  ┌─────┐
│Svc A│─►│Svc B│─►│Svc C│
└─────┘  └─────┘  └─────┘
   │        │        │
   ▼        ▼        ▼
 Event    Event    Event
  • 服务响应事件
  • 去中心化控制
  • 适用于简单流程

Orchestration

Orchestration

     ┌─────────────┐
     │ Orchestrator│
     └──────┬──────┘
      ┌─────┼─────┐
      ▼     ▼     ▼
   ┌────┐┌────┐┌────┐
   │Svc1││Svc2││Svc3│
   └────┘└────┘└────┘
  • Central coordinator
  • Explicit control flow
  • Better for complex flows
     ┌─────────────┐
     │ Orchestrator│
     └──────┬──────┘
      ┌─────┼─────┐
      ▼     ▼     ▼
   ┌────┐┌────┐┌────┐
   │Svc1││Svc2││Svc3│
   └────┘└────┘└────┘
  • 中央协调器
  • 显式控制流
  • 更适用于复杂流程

Saga States

Saga状态

StateDescription
StartedSaga initiated
PendingWaiting for step
CompensatingRolling back
CompletedAll steps succeeded
FailedFailed after compensation
状态描述
StartedSaga已启动
Pending等待步骤执行
Compensating正在回滚
Completed所有步骤执行成功
Failed补偿后仍失败

Orchestrator Implementation

协调器实现

python
@dataclass
class SagaStep:
    name: str
    action: str
    compensation: str
    status: str = "pending"

class SagaOrchestrator:
    async def execute(self, data: dict) -> SagaResult:
        completed_steps = []
        context = {"data": data}

        for step in self.steps:
            result = await step.action(context)
            if not result.success:
                await self.compensate(completed_steps, context)
                return SagaResult(status="failed", error=result.error)

            completed_steps.append(step)
            context.update(result.data)

        return SagaResult(status="completed", data=context)

    async def compensate(self, completed_steps, context):
        for step in reversed(completed_steps):
            await step.compensation(context)
python
@dataclass
class SagaStep:
    name: str
    action: str
    compensation: str
    status: str = "pending"

class SagaOrchestrator:
    async def execute(self, data: dict) -> SagaResult:
        completed_steps = []
        context = {"data": data}

        for step in self.steps:
            result = await step.action(context)
            if not result.success:
                await self.compensate(completed_steps, context)
                return SagaResult(status="failed", error=result.error)

            completed_steps.append(step)
            context.update(result.data)

        return SagaResult(status="completed", data=context)

    async def compensate(self, completed_steps, context):
        for step in reversed(completed_steps):
            await step.compensation(context)

Order Fulfillment Saga Example

订单履约Saga示例

python
class OrderFulfillmentSaga(SagaOrchestrator):
    def define_steps(self, data):
        return [
            SagaStep("reserve_inventory",
                     action=self.reserve_inventory,
                     compensation=self.release_inventory),
            SagaStep("process_payment",
                     action=self.process_payment,
                     compensation=self.refund_payment),
            SagaStep("create_shipment",
                     action=self.create_shipment,
                     compensation=self.cancel_shipment),
        ]
python
class OrderFulfillmentSaga(SagaOrchestrator):
    def define_steps(self, data):
        return [
            SagaStep("reserve_inventory",
                     action=self.reserve_inventory,
                     compensation=self.release_inventory),
            SagaStep("process_payment",
                     action=self.process_payment,
                     compensation=self.refund_payment),
            SagaStep("create_shipment",
                     action=self.create_shipment,
                     compensation=self.cancel_shipment),
        ]

Choreography Example

Choreography示例

python
class OrderChoreographySaga:
    def __init__(self, event_bus):
        self.event_bus = event_bus
        event_bus.subscribe("OrderCreated", self._on_order_created)
        event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        event_bus.subscribe("PaymentFailed", self._on_payment_failed)

    async def _on_order_created(self, event):
        await self.event_bus.publish("ReserveInventory", {
            "order_id": event["order_id"],
            "items": event["items"]
        })

    async def _on_payment_failed(self, event):
        # Compensation
        await self.event_bus.publish("ReleaseInventory", {
            "reservation_id": event["reservation_id"]
        })
python
class OrderChoreographySaga:
    def __init__(self, event_bus):
        self.event_bus = event_bus
        event_bus.subscribe("OrderCreated", self._on_order_created)
        event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        event_bus.subscribe("PaymentFailed", self._on_payment_failed)

    async def _on_order_created(self, event):
        await self.event_bus.publish("ReserveInventory", {
            "order_id": event["order_id"],
            "items": event["items"]
        })

    async def _on_payment_failed(self, event):
        # 补偿操作
        await self.event_bus.publish("ReleaseInventory", {
            "reservation_id": event["reservation_id"]
        })

Best Practices

最佳实践

  1. Make steps idempotent - Safe to retry
  2. Design compensations carefully - Must always work
  3. Use correlation IDs - For tracing across services
  4. Implement timeouts - Don't wait forever
  5. Log everything - For debugging failures
  1. 确保步骤幂等 - 可安全重试
  2. 谨慎设计补偿逻辑 - 必须确保始终可用
  3. 使用关联ID - 用于跨服务追踪
  4. 实现超时机制 - 避免无限等待
  5. 记录所有操作 - 便于调试故障

When to Use

使用场景

Orchestration:
  • Complex multi-step workflows
  • Need visibility into saga state
  • Central error handling
Choreography:
  • Simple event flows
  • Loose coupling required
  • Independent service teams
Orchestration(编排):
  • 复杂的多步骤工作流
  • 需要监控Saga状态
  • 集中式错误处理
Choreography:
  • 简单的事件驱动流程
  • 需要松耦合架构
  • 服务团队独立运作