saga-orchestration

实现分布式事务与跨聚合工作流的Saga模式。适用于协调多步骤业务流程、处理补偿性事务或管理长时间运行的工作流场景。

查看详情
name: saga-orchestration
description: Implement saga patterns for distributed transactions and cross-aggregate workflows. Use when coordinating multi-step business processes, handling compensating transactions, or managing long-running workflows.

Saga Orchestration

Patterns for managing distributed transactions and long-running business processes.

Do not use this skill when

  • The task is unrelated to saga orchestration

  • You need a different domain or tool outside this scope
Instructions

  • Clarify goals, constraints, and required inputs.

  • Apply relevant best practices and validate outcomes.

  • Provide actionable steps and verification.

  • If detailed examples are required, open resources/implementation-playbook.md.
Use this skill when

  • Coordinating multi-service transactions

  • Implementing compensating transactions

  • Managing long-running business workflows

  • Handling failures in distributed systems

  • Building order fulfillment processes

  • Implementing approval workflows
Core Concepts

    1. Saga Types

    Choreography                      Orchestration
    ┌─────┐  ┌─────┐  ┌─────┐        ┌─────────────┐
    │Svc A│─►│Svc B│─►│Svc C│        │ Orchestrator│
    └─────┘  └─────┘  └─────┘        └──────┬──────┘
       │        │        │                  │
       ▼        ▼        ▼            ┌─────┼─────┐
     Event    Event    Event          ▼     ▼     ▼
                                    ┌────┐┌────┐┌────┐
                                    │Svc1││Svc2││Svc3│
                                    └────┘└────┘└────┘

    2. Saga Execution States

    | State        | Description                    |
    |--------------|--------------------------------|
    | Started      | Saga initiated                 |
    | Pending      | Waiting for step completion    |
    | Compensating | Rolling back due to failure    |
    | Completed    | All steps succeeded            |
    | Failed       | Saga failed after compensation |

    Templates

    Template 1: Saga Orchestrator Base

    import uuid
    from abc import ABC, abstractmethod
    from dataclasses import dataclass, field
    from datetime import datetime, timedelta
    from enum import Enum
    from typing import List, Dict, Any, Optional

    class SagaState(Enum):
        """Lifecycle states of a saga; each value is the lowercase state name."""

        STARTED = "started"            # saga created, first step being dispatched
        PENDING = "pending"            # waiting for the next step to complete
        COMPENSATING = "compensating"  # rolling back completed steps after a failure
        COMPLETED = "completed"        # every step succeeded
        FAILED = "failed"              # saga ended in failure after compensation


    @dataclass
    class SagaStep:
        """One step of a saga: a command to run plus the command that undoes it.

        ``status`` moves through string values set by the orchestrator code
        ("pending", "executing", "completed", "failed", "compensating",
        "compensated").
        """

        name: str                                # step identifier used in participant replies
        action: str                              # command/event name published to run the step
        compensation: str                        # command/event name published to undo the step
        status: str = "pending"
        result: Optional[Dict] = None            # payload reported by the participant on success
        error: Optional[str] = None              # error message reported on failure
        executed_at: Optional[datetime] = None   # when the step was recorded as completed
        compensated_at: Optional[datetime] = None
        # Deadline used by timeout-aware orchestrators. Declared here (instead of
        # being attached ad hoc) so it appears in dataclass fields/asdict/repr and
        # therefore survives field-based serialization of the saga.
        timeout_at: Optional[datetime] = None


    @dataclass
    class Saga:
        """Persisted state of a single saga instance."""

        saga_id: str                  # unique id of this saga instance
        saga_type: str                # identifies which orchestrator owns the saga
        state: SagaState              # current lifecycle state
        data: Dict[str, Any]          # business payload forwarded with each command
        steps: List[SagaStep]         # ordered steps to execute
        current_step: int = 0         # index into ``steps`` of the step in progress
        created_at: datetime = field(default_factory=lambda: datetime.utcnow())
        updated_at: datetime = field(default_factory=lambda: datetime.utcnow())


    class SagaOrchestrator(ABC):
        """Base class for saga orchestrators.

        A subclass supplies ``saga_type`` and ``define_steps``. This class then
        runs the steps one at a time. If a step fails, it compensates the steps
        that already completed, newest first.

        Collaborators (interfaces inferred from the calls made here):
          * ``saga_store`` - awaitable ``save(saga)`` and ``get(saga_id)``.
          * ``event_publisher`` - awaitable ``publish(name, payload)``.

        Participants report back through ``handle_step_completed``,
        ``handle_step_failed`` and ``handle_compensation_completed``. These
        handlers ignore replies that do not match the saga's current state, so
        redelivered or late messages cannot corrupt its progress.
        """

        def __init__(self, saga_store, event_publisher):
            self.saga_store = saga_store
            self.event_publisher = event_publisher

        @abstractmethod
        def define_steps(self, data: Dict) -> List[SagaStep]:
            """Return the ordered steps for a saga started with ``data``."""
            pass

        @property
        @abstractmethod
        def saga_type(self) -> str:
            """Unique saga type identifier; also prefixes the outcome event names."""
            pass

        async def start(self, data: Dict) -> Saga:
            """Create and persist a new saga for ``data``, then dispatch its first step.

            If ``define_steps`` returns no steps, no participant would ever
            reply. In that case the saga is completed immediately.
            """
            saga = Saga(
                saga_id=str(uuid.uuid4()),
                saga_type=self.saga_type,
                state=SagaState.STARTED,
                data=data,
                steps=self.define_steps(data),
            )
            await self.saga_store.save(saga)

            if not saga.steps:
                saga.state = SagaState.COMPLETED
                saga.updated_at = datetime.utcnow()
                await self.saga_store.save(saga)
                await self._on_saga_completed(saga)
            else:
                await self._execute_next_step(saga)
            return saga

        async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
            """Record a successful step and advance the saga.

            Only a reply for the step currently ``executing`` is accepted.
            Without this guard, a redelivered message would increment
            ``current_step`` twice and silently skip a step. A reply arriving
            after the step was already failed (e.g. by a timeout) would also
            resume a saga that is compensating.
            """
            saga = await self.saga_store.get(saga_id)

            step = self._current_step(saga)
            if step is None or step.name != step_name or step.status != "executing":
                return

            step.status = "completed"
            step.result = result
            step.executed_at = datetime.utcnow()

            saga.current_step += 1
            saga.updated_at = datetime.utcnow()

            if saga.current_step >= len(saga.steps):
                saga.state = SagaState.COMPLETED
                await self.saga_store.save(saga)
                await self._on_saga_completed(saga)
            else:
                saga.state = SagaState.PENDING
                await self.saga_store.save(saga)
                await self._execute_next_step(saga)

        async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
            """Mark the executing step as failed and start compensation.

            Failure reports for a step that is not currently executing
            (duplicates, or a report racing a timeout) are ignored. This keeps
            compensation from being started twice.
            """
            saga = await self.saga_store.get(saga_id)

            step = self._current_step(saga)
            if step is None or step.name != step_name or step.status != "executing":
                return

            step.status = "failed"
            step.error = error

            saga.state = SagaState.COMPENSATING
            saga.updated_at = datetime.utcnow()
            await self.saga_store.save(saga)

            await self._compensate(saga)

        async def _execute_next_step(self, saga: Saga):
            """Mark the current step as executing, persist it, then publish its command.

            The payload is the saga's business data plus ``saga_id`` and
            ``step_name`` for correlation. The correlation keys are written
            last, so a same-named key in ``saga.data`` cannot overwrite them.
            """
            if saga.current_step >= len(saga.steps):
                return

            step = saga.steps[saga.current_step]
            step.status = "executing"
            await self.saga_store.save(saga)

            await self.event_publisher.publish(
                step.action,
                {**saga.data, "saga_id": saga.saga_id, "step_name": step.name},
            )

        async def _compensate(self, saga: Saga):
            """Publish compensation commands for every completed step, newest first.

            All affected steps are marked ``compensating`` and persisted in one
            save *before* any command is published. Otherwise a fast
            compensation reply could be overwritten by a later save of this
            (by then stale) in-memory saga.

            If nothing needs compensating (e.g. the first step failed), no reply
            will ever arrive. The saga is then finalized as FAILED right away
            instead of staying COMPENSATING forever.
            """
            to_undo = [
                step
                for step in reversed(saga.steps[: saga.current_step])
                if step.status == "completed"
            ]
            if not to_undo:
                await self._fail_saga(saga)
                return

            for step in to_undo:
                step.status = "compensating"
            await self.saga_store.save(saga)

            for step in to_undo:
                await self.event_publisher.publish(
                    step.compensation,
                    {
                        **saga.data,
                        "saga_id": saga.saga_id,
                        "step_name": step.name,
                        "original_result": step.result,
                    },
                )

        async def handle_compensation_completed(self, saga_id: str, step_name: str):
            """Record that ``step_name`` was compensated; fail the saga once all are.

            Replies for steps that are not currently ``compensating`` (e.g.
            duplicates) are ignored.
            """
            saga = await self.saga_store.get(saga_id)

            step = self._find_step(saga, step_name)
            if step is None or step.status != "compensating":
                return

            step.status = "compensated"
            step.compensated_at = datetime.utcnow()
            saga.updated_at = datetime.utcnow()

            # "pending" steps never ran. The "failed" step is assumed not to
            # have taken effect. Neither needs undoing.
            if all(s.status in ("compensated", "pending", "failed") for s in saga.steps):
                await self._fail_saga(saga)
            else:
                await self.saga_store.save(saga)

        async def _fail_saga(self, saga: Saga):
            """Mark the saga FAILED, persist it, then publish the failure event."""
            saga.state = SagaState.FAILED
            saga.updated_at = datetime.utcnow()
            await self.saga_store.save(saga)
            await self._on_saga_failed(saga)

        async def _on_saga_completed(self, saga: Saga):
            """Publish ``<saga_type>Completed`` carrying the saga data and id."""
            await self.event_publisher.publish(
                f"{self.saga_type}Completed",
                {**saga.data, "saga_id": saga.saga_id},
            )

        async def _on_saga_failed(self, saga: Saga):
            """Publish ``<saga_type>Failed`` carrying the saga data, id and an error."""
            await self.event_publisher.publish(
                f"{self.saga_type}Failed",
                {**saga.data, "saga_id": saga.saga_id, "error": "Saga failed"},
            )

        @staticmethod
        def _current_step(saga: Saga) -> Optional[SagaStep]:
            """Return the step at ``saga.current_step``, or None when past the end."""
            if 0 <= saga.current_step < len(saga.steps):
                return saga.steps[saga.current_step]
            return None

        @staticmethod
        def _find_step(saga: Saga, step_name: str) -> Optional[SagaStep]:
            """Return the first step named ``step_name``, or None if absent."""
            return next((s for s in saga.steps if s.name == step_name), None)

    Template 2: Order Fulfillment Saga

    class OrderFulfillmentSaga(SagaOrchestrator):
        """Order fulfillment: reserve stock, take payment, ship, then confirm.

        Every step pairs a service command with the command that undoes it.
        """

        def define_steps(self, data: Dict) -> List[SagaStep]:
            # (step name, action command, compensating command), in execution order.
            plan = (
                ("reserve_inventory",
                 "InventoryService.ReserveItems",
                 "InventoryService.ReleaseReservation"),
                ("process_payment",
                 "PaymentService.ProcessPayment",
                 "PaymentService.RefundPayment"),
                ("create_shipment",
                 "ShippingService.CreateShipment",
                 "ShippingService.CancelShipment"),
                ("send_confirmation",
                 "NotificationService.SendOrderConfirmation",
                 "NotificationService.SendCancellationNotice"),
            )
            return [
                SagaStep(name=step_name, action=command, compensation=undo)
                for step_name, command, undo in plan
            ]

        @property
        def saga_type(self) -> str:
            return "OrderFulfillment"


    Usage


    async def create_order(order_data: Dict):
        """Start an order-fulfillment saga for ``order_data`` and return it.

        Only the fields used by the saga steps are copied into the saga
        payload. A missing field raises KeyError before the saga is started.
        """
        orchestrator = OrderFulfillmentSaga(saga_store, event_publisher)
        payload = {
            key: order_data[key]
            for key in (
                "order_id",
                "customer_id",
                "items",
                "payment_method",
                "shipping_address",
            )
        }
        return await orchestrator.start(payload)


    Event handlers in each service


    class InventoryService:
        """Inventory participant in the order-fulfillment saga.

        Handles the reserve command and its compensating release command, and
        reports each outcome back to the orchestrator as an event.
        """

        async def handle_reserve_items(self, command: Dict):
            """Reserve ``command["items"]`` for ``command["order_id"]``.

            Publishes SagaStepCompleted with the reservation id on success, or
            SagaStepFailed when InsufficientInventoryError is raised. Any other
            exception propagates to the caller.
            """
            try:
                reservation = await self.reserve(command["items"], command["order_id"])
                success = {
                    "saga_id": command["saga_id"],
                    "step_name": "reserve_inventory",
                    "result": {"reservation_id": reservation.id},
                }
                await self.event_publisher.publish("SagaStepCompleted", success)
            except InsufficientInventoryError as exc:
                failure = {
                    "saga_id": command["saga_id"],
                    "step_name": "reserve_inventory",
                    "error": str(exc),
                }
                await self.event_publisher.publish("SagaStepFailed", failure)

        async def handle_release_reservation(self, command: Dict):
            """Compensating action: release the reservation made by the reserve step."""
            reservation_id = command["original_result"]["reservation_id"]
            await self.release_reservation(reservation_id)
            await self.event_publisher.publish(
                "SagaCompensationCompleted",
                {"saga_id": command["saga_id"], "step_name": "reserve_inventory"},
            )

    Template 3: Choreography-Based Saga

    from dataclasses import dataclass
    from typing import Dict, Any
    import asyncio

    @dataclass
    class SagaContext:
        """State carried along with the events of a choreographed saga."""

        saga_id: str              # correlation id shared by the saga's events
        step: int                 # presumably the index of the current step — confirm with producers
        data: Dict[str, Any]      # business payload
        completed_steps: list     # steps finished so far (element type not specified here)


    class OrderChoreographySaga:
        """Choreography-based order saga driven entirely by events.

        Each handler reacts to one event and publishes the next command. No
        central state is kept. The order id doubles as the saga id (see
        ``_on_order_created``). ``event_bus`` must provide ``subscribe(name,
        handler)`` and an awaitable ``publish(name, payload)``.
        """

        def __init__(self, event_bus):
            self.event_bus = event_bus
            self._register_handlers()

        def _register_handlers(self):
            """Subscribe the forward-path and compensation handlers."""
            self.event_bus.subscribe("OrderCreated", self._on_order_created)
            self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
            self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
            self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)

            # Compensation handlers
            self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
            self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

        async def _on_order_created(self, event: Dict):
            """Step 1: order created, so reserve inventory (saga id = order id)."""
            await self.event_bus.publish("ReserveInventory", {
                "saga_id": event["order_id"],
                "order_id": event["order_id"],
                "items": event["items"],
            })

        async def _on_inventory_reserved(self, event: Dict):
            """Step 2: inventory reserved, so process the payment."""
            await self.event_bus.publish("ProcessPayment", {
                "saga_id": event["saga_id"],
                "order_id": event["order_id"],
                "amount": event["total_amount"],
                "reservation_id": event["reservation_id"],
            })

        async def _on_payment_processed(self, event: Dict):
            """Step 3: payment done, so create the shipment."""
            await self.event_bus.publish("CreateShipment", {
                "saga_id": event["saga_id"],
                "order_id": event["order_id"],
                "payment_id": event["payment_id"],
            })

        async def _on_shipment_created(self, event: Dict):
            """Step 4: shipment created, so mark the order fulfilled."""
            await self.event_bus.publish("OrderFulfilled", {
                "saga_id": event["saga_id"],
                "order_id": event["order_id"],
                "tracking_number": event["tracking_number"],
            })

        # Compensation handlers
        async def _on_payment_failed(self, event: Dict):
            """Payment failed: release the inventory, then fail the order."""
            await self.event_bus.publish("ReleaseInventory", {
                "saga_id": event["saga_id"],
                "reservation_id": event["reservation_id"],
            })
            await self.event_bus.publish("OrderFailed", {
                "saga_id": event["saga_id"],  # correlation id for tracing
                "order_id": event["order_id"],
                "reason": "Payment failed",
            })

        async def _on_shipment_failed(self, event: Dict):
            """Shipment failed: refund, release inventory, then fail the order.

            Compensations run in reverse order of the forward steps. The order
            is then marked failed, as in ``_on_payment_failed``. Previously no
            OrderFailed event was emitted here, so the order was never resolved.
            """
            await self.event_bus.publish("RefundPayment", {
                "saga_id": event["saga_id"],
                "payment_id": event["payment_id"],
            })
            await self.event_bus.publish("ReleaseInventory", {
                "saga_id": event["saga_id"],
                "reservation_id": event["reservation_id"],
            })
            await self.event_bus.publish("OrderFailed", {
                "saga_id": event["saga_id"],
                # The saga id is the order id in this choreography, so fall back
                # to it if the failure event does not repeat the order id.
                "order_id": event.get("order_id", event["saga_id"]),
                "reason": "Shipment failed",
            })

    Template 4: Saga with Timeouts

    class TimeoutSagaOrchestrator(SagaOrchestrator):
        """Saga orchestrator that fails a step if no reply arrives in time.

        Each dispatched step gets a deadline. A job registered with
        ``scheduler`` re-checks the step at that deadline. If the step is still
        executing, the job reports it as failed so compensation starts.

        ``scheduler`` must provide an awaitable
        ``schedule(job_id, callback, payload, run_at=...)``. ``step_timeout``
        (default five minutes) sets how long each step may run.
        """

        def __init__(self, saga_store, event_publisher, scheduler,
                     step_timeout: timedelta = timedelta(minutes=5)):
            super().__init__(saga_store, event_publisher)
            self.scheduler = scheduler
            self.step_timeout = step_timeout

        async def _execute_next_step(self, saga: Saga):
            """Dispatch the current step and schedule its timeout check."""
            if saga.current_step >= len(saga.steps):
                return

            step = saga.steps[saga.current_step]
            step.status = "executing"
            # NOTE(review): if SagaStep does not declare ``timeout_at``, this is
            # a plain instance attribute and may not be persisted by saga_store.
            step.timeout_at = datetime.utcnow() + self.step_timeout
            await self.saga_store.save(saga)

            # Schedule the timeout check before publishing the command.
            await self.scheduler.schedule(
                f"saga_timeout_{saga.saga_id}_{step.name}",
                self._check_timeout,
                {"saga_id": saga.saga_id, "step_name": step.name},
                run_at=step.timeout_at,
            )

            # Correlation keys go last so saga.data cannot overwrite them.
            await self.event_publisher.publish(
                step.action,
                {**saga.data, "saga_id": saga.saga_id, "step_name": step.name},
            )

        async def _check_timeout(self, data: Dict):
            """Scheduler callback: fail the step if it is still executing.

            ``data`` is the payload registered in ``_execute_next_step``
            (``saga_id`` and ``step_name``). If the store returns no saga, or
            the step is not found, the check is a no-op. Previously, a missing
            step raised StopIteration out of the scheduler job.
            """
            saga = await self.saga_store.get(data["saga_id"])
            if saga is None:
                return

            step = next((s for s in saga.steps if s.name == data["step_name"]), None)
            if step is not None and step.status == "executing":
                await self.handle_step_failed(
                    data["saga_id"],
                    data["step_name"],
                    "Step timed out",
                )

    Best Practices

    Do's

  • Make steps idempotent - Safe to retry

  • Design compensations carefully - They must work

  • Use correlation IDs - For tracing across services

  • Implement timeouts - Don't wait forever

  • Log everything - For debugging failures
Don'ts

  • Don't assume instant completion - Sagas take time

  • Don't skip compensation testing - Most critical part

  • Don't couple services - Use async messaging

  • Don't ignore partial failures - Handle gracefully
Resources

  • Saga Pattern

  • Designing Data-Intensive Applications

    1. saga-orchestration - Agent Skills