Saga patterns for distributed transactions with orchestration and choreography approaches. Use when implementing multi-service transactions, handling partial failures, or building systems requiring eventual consistency with compensation.
Implements saga patterns for distributed transactions with orchestration and compensation.
/plugin marketplace add yonatangross/skillforge-claude-plugin/plugin install skillforge-complete@skillforgeThis skill inherits all available tools. When active, it can use any tool Claude has access to.
checklists/compensation-checklist.mdchecklists/saga-design-checklist.mdchecklists/saga-implementation-checklist.mdexamples/ecommerce-order-saga.mdexamples/order-fulfillment-saga.pyexamples/travel-booking-saga.pyreferences/choreography-deep-dive.mdreferences/compensation-strategies.mdreferences/orchestration-deep-dive.mdreferences/state-machine-saga.mdtemplates/event-router-template.pytemplates/saga-orchestrator-template.pytemplates/saga-step-template.pyMaintain consistency across microservices without distributed locks.
| Aspect | Orchestration | Choreography |
|---|---|---|
| Control | Central orchestrator | Distributed events |
| Coupling | Services depend on orchestrator | Loosely coupled |
| Visibility | Single point of observation | Requires distributed tracing |
| Best for | Complex, ordered workflows | Simple, parallel flows |
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any
from datetime import datetime, timezone
class SagaStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
COMPENSATING = "compensating"
COMPENSATED = "compensated"
FAILED = "failed"
@dataclass
class SagaStep:
name: str
action: Callable
compensation: Callable
status: SagaStatus = SagaStatus.PENDING
result: Any = None
@dataclass
class SagaContext:
saga_id: str
data: dict = field(default_factory=dict)
steps: list[SagaStep] = field(default_factory=list)
status: SagaStatus = SagaStatus.PENDING
current_step: int = 0
class SagaOrchestrator:
def __init__(self, saga_repository, event_publisher):
self.repo = saga_repository
self.publisher = event_publisher
async def execute(self, saga: SagaContext) -> SagaContext:
saga.status = SagaStatus.RUNNING
await self.repo.save(saga)
for i, step in enumerate(saga.steps):
saga.current_step = i
try:
step.result = await step.action(saga.data)
saga.data.update(step.result or {})
step.status = SagaStatus.COMPLETED
except Exception:
step.status = SagaStatus.FAILED
await self._compensate(saga, i)
return saga
saga.status = SagaStatus.COMPLETED
await self.repo.save(saga)
return saga
async def _compensate(self, saga: SagaContext, failed_step: int):
saga.status = SagaStatus.COMPENSATING
for i in range(failed_step - 1, -1, -1):
step = saga.steps[i]
if step.status == SagaStatus.COMPLETED:
try:
await step.compensation(saga.data)
step.status = SagaStatus.COMPENSATED
except Exception as e:
step.error = f"Compensation failed: {e}"
saga.status = SagaStatus.COMPENSATED
await self.repo.save(saga)
class OrderSaga:
def __init__(self, payment_service, inventory_service, shipping_service):
self.payment = payment_service
self.inventory = inventory_service
self.shipping = shipping_service
def create_saga(self, order: Order) -> SagaContext:
return SagaContext(
saga_id=f"order-{order.id}",
data={"order": order.dict()},
steps=[
SagaStep("reserve_inventory", self._reserve_inventory, self._release_inventory),
SagaStep("process_payment", self._process_payment, self._refund_payment),
SagaStep("create_shipment", self._create_shipment, self._cancel_shipment),
],
)
async def _reserve_inventory(self, data: dict) -> dict:
reservation = await self.inventory.reserve(items=data["order"]["items"])
return {"reservation_id": reservation.id}
async def _release_inventory(self, data: dict):
await self.inventory.release(data["reservation_id"])
async def _process_payment(self, data: dict) -> dict:
payment = await self.payment.charge(amount=data["order"]["total"])
return {"payment_id": payment.id}
async def _refund_payment(self, data: dict):
await self.payment.refund(data["payment_id"])
async def _create_shipment(self, data: dict) -> dict:
shipment = await self.shipping.create(order_id=data["order"]["id"])
return {"shipment_id": shipment.id}
async def _cancel_shipment(self, data: dict):
if "shipment_id" in data:
await self.shipping.cancel(data["shipment_id"])
class OrderChoreography:
"""Event handlers for order saga choreography."""
def __init__(self, event_bus, order_repo):
self.bus = event_bus
self.repo = order_repo
async def handle_order_created(self, event):
await self.bus.publish("inventory.reserve.requested", {
"saga_id": event.saga_id,
"items": event.payload["order"]["items"],
})
async def handle_inventory_reserved(self, event):
await self.bus.publish("payment.charge.requested", {
"saga_id": event.saga_id,
"amount": event.payload["amount"],
})
async def handle_payment_failed(self, event):
# Compensation: release inventory
await self.bus.publish("inventory.release.requested", {
"saga_id": event.saga_id,
"reservation_id": event.payload["reservation_id"],
})
async def handle_shipment_created(self, event):
order = await self.repo.get(event.payload["order_id"])
order.status = "shipped"
await self.repo.save(order)
from datetime import timedelta
import asyncio
class SagaRecovery:
def __init__(self, saga_repo, orchestrator):
self.repo = saga_repo
self.orchestrator = orchestrator
async def recover_stuck_sagas(self, timeout: timedelta = timedelta(hours=1)):
cutoff = datetime.now(timezone.utc) - timeout
stuck_sagas = await self.repo.find_by_status_and_age(SagaStatus.RUNNING, cutoff)
for saga in stuck_sagas:
try:
await self.orchestrator.resume(saga)
except Exception:
await self.orchestrator._compensate(saga, saga.current_step)
async def retry_failed_step(self, saga_id: str, max_retries: int = 3):
saga = await self.repo.get(saga_id)
failed_step = saga.steps[saga.current_step]
for attempt in range(max_retries):
try:
failed_step.result = await failed_step.action(saga.data)
failed_step.status = SagaStatus.COMPLETED
await self.orchestrator.resume(saga, from_step=saga.current_step + 1)
return
except Exception:
await asyncio.sleep(2 ** attempt)
await self.orchestrator._compensate(saga, saga.current_step)
class IdempotentSagaStep:
def __init__(self, step_name: str, idempotency_store):
self.step_name = step_name
self.store = idempotency_store
async def execute(self, saga_id: str, action: Callable, *args, **kwargs):
idempotency_key = f"{saga_id}:{self.step_name}"
existing = await self.store.get(idempotency_key)
if existing:
return existing["result"]
result = await action(*args, **kwargs)
await self.store.set(idempotency_key, {"result": result}, ttl=timedelta(days=7))
return result
| Decision | Recommendation |
|---|---|
| Pattern choice | Orchestration for complex flows, Choreography for simple |
| State storage | Persistent store (PostgreSQL) for saga state |
| Idempotency | Required for all saga steps |
| Timeouts | Per-step timeouts with recovery |
| Compensation | Always implement, test thoroughly |
| Observability | Trace saga ID across all services |
# NEVER skip compensation logic
async def _process_payment(self, data: dict):
return await self.payment.charge(data) # Missing compensation!
# NEVER rely on synchronous calls across services
async def _reserve_and_pay(self, data: dict):
await self.inventory.reserve(data)
await self.payment.charge(data) # If fails, inventory stuck!
# NEVER ignore idempotency
async def _create_order(self, data: dict):
return await self.db.insert(Order(**data)) # Duplicate on retry!
# NEVER use in-memory saga state
sagas = {} # Lost on restart!
# ALWAYS persist saga state and test compensation paths
temporal-io - Durable workflow executionevent-sourcing - Event-driven state managementidempotency-patterns - Idempotent operationsActivates when the user asks about AI prompts, needs prompt templates, wants to search for prompts, or mentions prompts.chat. Use for discovering, retrieving, and improving prompts.
Activates when the user asks about Agent Skills, wants to find reusable AI capabilities, needs to install skills, or mentions skills for Claude. Use for discovering, retrieving, and installing skills.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.