From backend-development
Implements saga orchestration and choreography for distributed microservice transactions without two-phase commit (2PC). Defines steps, compensating actions, timeouts, and monitoring for workflows such as orders or bookings.
npx claudepluginhub arogyareddy/https-github.com-wshobson-agents --plugin backend-development

This skill uses the workspace's default tool permissions.
Patterns for managing distributed transactions and long-running business processes without two-phase commit.
```text
Choreography                  Orchestration
┌─────┐  ┌─────┐  ┌─────┐     ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│     │ Orchestrator│
└─────┘  └─────┘  └─────┘     └──────┬──────┘
   │        │        │               │
   ▼        ▼        ▼         ┌─────┼─────┐
 Event    Event    Event       ▼     ▼     ▼
                             ┌────┐┌────┐┌────┐
Each service reacts to the   │Svc1││Svc2││Svc3│
previous service's event.    └────┘└────┘└────┘
No central coordinator.      Central coordinator sends
                             commands and tracks state.
```
Choose orchestration when: You need explicit step tracking, retries, and centralized visibility. Easier to debug.
Choose choreography when: You want loose coupling and services that can evolve independently. Harder to trace.
| State | Description |
|---|---|
| Started | Saga initiated, first step dispatched |
| Pending | Waiting for a step reply from a participant |
| Compensating | A step failed; rolling back completed steps |
| Completed | All forward steps succeeded |
| Failed | Saga failed and all compensations have finished |
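The state table can be read as a small state machine. The sketch below encodes one plausible set of allowed transitions; the table does not spell out the edges, so ALLOWED, InvalidTransition, and transition() are illustrative assumptions rather than part of the referenced SagaOrchestrator.

```python
from enum import Enum
from typing import Dict, Set


class SagaState(Enum):
    STARTED = "Started"
    PENDING = "Pending"
    COMPENSATING = "Compensating"
    COMPLETED = "Completed"
    FAILED = "Failed"


# Assumed transitions, inferred from the state descriptions in the table.
ALLOWED: Dict[SagaState, Set[SagaState]] = {
    SagaState.STARTED: {SagaState.PENDING, SagaState.COMPENSATING},
    SagaState.PENDING: {SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING},
    SagaState.COMPENSATING: {SagaState.FAILED},
    SagaState.COMPLETED: set(),  # terminal
    SagaState.FAILED: set(),     # terminal
}


class InvalidTransition(Exception):
    """Raised when a saga tries to move between states that are not linked."""


def transition(current: SagaState, new: SagaState) -> SagaState:
    """Validate a state change and return the new state."""
    if new not in ALLOWED[current]:
        raise InvalidTransition(f"{current.value} -> {new.value}")
    return new
```

Rejecting illegal moves (for example, Completed back to Compensating) at a single choke point makes a corrupted saga record fail loudly instead of silently.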
| Situation | Handling |
|---|---|
| Step never started | No compensation needed (skip) |
| Step completed successfully | Run compensation command |
| Step failed before completion | No compensation needed; mark failed |
| Compensation itself fails | Retry with backoff → DLQ → manual intervention alert |
| Step result no longer exists | Treat compensation as success (idempotency) |
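The last-resort path for a failing compensation (retry with backoff, then DLQ, then a manual-intervention alert) might look like the following sketch. compensate_with_retry is a hypothetical helper, and a plain list stands in for a real dead-letter queue; with a broker you would publish the failed message to its DLQ instead.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def compensate_with_retry(
    compensation: Callable[[], Awaitable[None]],
    message: Dict[str, Any],
    dead_letter_queue: List[Dict[str, Any]],
    max_attempts: int = 3,
    base_delay: float = 0.1,
) -> bool:
    """Run a compensation, retrying with exponential backoff.

    Returns True on success. After max_attempts failures the message is
    appended to dead_letter_queue and False is returned, so the caller
    can raise a manual-intervention alert.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            await compensation()
            return True
        except Exception as exc:  # any ordinary failure is retried
            if attempt == max_attempts:
                dead_letter_queue.append(
                    {**message, "error": str(exc), "attempts": attempt}
                )
                return False
            # Backoff grows as base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False
```

Only Exception subclasses are retried, so task cancellation (asyncio.CancelledError, a BaseException) still propagates. The loop is safe only because compensations are idempotent, as the last table row requires.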
Concrete subclass of the base orchestrator. Defines four steps spanning inventory, payment, shipping, and notification. See references/advanced-patterns.md for the full abstract SagaOrchestrator base class.
```python
from typing import Dict, List

from saga_orchestrator import SagaOrchestrator, SagaStep


class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfillment across four participant services."""

    @property
    def saga_type(self) -> str:
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        return [
            SagaStep(
                name="reserve_inventory",
                action="InventoryService.ReserveItems",
                compensation="InventoryService.ReleaseReservation",
            ),
            SagaStep(
                name="process_payment",
                action="PaymentService.ProcessPayment",
                compensation="PaymentService.RefundPayment",
            ),
            SagaStep(
                name="create_shipment",
                action="ShippingService.CreateShipment",
                compensation="ShippingService.CancelShipment",
            ),
            SagaStep(
                name="send_confirmation",
                action="NotificationService.SendOrderConfirmation",
                compensation="NotificationService.SendCancellationNotice",
            ),
        ]


# Start a saga
async def create_order(order_data: Dict, saga_store, event_publisher):
    saga = OrderFulfillmentSaga(saga_store, event_publisher)
    return await saga.start({
        "order_id": order_data["order_id"],
        "customer_id": order_data["customer_id"],
        "items": order_data["items"],
        "payment_method": order_data["payment_method"],
        "shipping_address": order_data["shipping_address"],
    })
```
```python
from typing import Dict


class InsufficientInventoryError(Exception):
    """Placeholder domain error: not enough stock to reserve."""


class ReservationNotFoundError(Exception):
    """Placeholder domain error: the reservation no longer exists."""


# Participant service: handles commands and publishes replies.
# reserve() and release_reservation() are the service's own domain
# operations and are omitted here.
class InventoryService:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher

    async def handle_reserve_items(self, command: Dict):
        try:
            reservation = await self.reserve(command["items"], command["order_id"])
            await self.event_publisher.publish("SagaStepCompleted", {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory",
                "result": {"reservation_id": reservation.id},
            })
        except InsufficientInventoryError as e:
            await self.event_publisher.publish("SagaStepFailed", {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory",
                "error": str(e),
            })

    async def handle_release_reservation(self, command: Dict):
        """Compensation: idempotent, always publishes completion."""
        try:
            await self.release_reservation(
                command["original_result"]["reservation_id"]
            )
        except ReservationNotFoundError:
            pass  # Already released: treat as success
        await self.event_publisher.publish("SagaCompensationCompleted", {
            "saga_id": command["saga_id"],
            "step_name": "reserve_inventory",
        })
```
Each service listens for the previous service's event and reacts. No central coordinator. Compensation is triggered by failure events propagating backward.
```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SagaContext:
    """Carried through all events in a choreographed saga."""
    saga_id: str
    step: int
    data: Dict[str, Any]
    completed_steps: List[str] = field(default_factory=list)


class OrderChoreographySaga:
    """Choreography-based saga: services react to each other's events."""

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        # Forward path
        self.event_bus.subscribe("OrderCreated", self._on_order_created)
        self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)
        # Compensation path
        self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],  # the order ID doubles as the saga ID
            "order_id": event["order_id"],
            "items": event["items"],
        })

    async def _on_inventory_reserved(self, event: Dict):
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"],
        })

    async def _on_payment_processed(self, event: Dict):
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
        })

    async def _on_shipment_created(self, event: Dict):
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"],
        })

    # Compensation handlers. Failure events must echo the IDs
    # (reservation_id, payment_id) that the compensations need.
    async def _on_payment_failed(self, event: Dict):
        """Payment failed: release inventory and mark the order failed."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "reason": "Payment failed",
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed: refund payment, release inventory, mark the order failed."""
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"],
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "reason": "Shipment failed",
        })
```
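Handlers like these can be exercised without a broker. The sketch below is a hypothetical in-process bus exposing the subscribe() and async publish() methods that OrderChoreographySaga calls. The payment participant in demo() is invented for illustration and always declines, so the failure propagates backward to a ReleaseInventory command, mirroring _on_payment_failed.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Toy bus with the subscribe/publish shape the choreography code uses.

    Not durable and not concurrent: each publish awaits its handlers in
    subscription order, so events cascade depth-first inside one process.
    """

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}
        self.published: List[Tuple[str, Dict[str, Any]]] = []  # audit trail

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        self.published.append((event_type, payload))
        for handler in self._handlers.get(event_type, []):
            await handler(payload)


async def demo() -> List[str]:
    bus = InMemoryEventBus()

    # Hypothetical payment participant that always declines.
    async def payment_service(cmd: Dict[str, Any]) -> None:
        await bus.publish("PaymentFailed", {
            "saga_id": cmd["saga_id"],
            "order_id": cmd["order_id"],
            "reservation_id": cmd["reservation_id"],
        })

    # Compensation reaction: release the inventory reservation.
    async def on_payment_failed(event: Dict[str, Any]) -> None:
        await bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })

    bus.subscribe("ProcessPayment", payment_service)
    bus.subscribe("PaymentFailed", on_payment_failed)
    await bus.publish("ProcessPayment", {
        "saga_id": "order-1", "order_id": "order-1", "reservation_id": "res-9",
    })
    return [event_type for event_type, _ in bus.published]


print(asyncio.run(demo()))  # ['ProcessPayment', 'PaymentFailed', 'ReleaseInventory']
```

Because handlers run depth-first in one process, this bus only suits unit tests; it provides none of the durability or redelivery a real broker must offer.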
Every participant must guard against duplicate command delivery. Record an idempotency key together with the step's result and, when the same command is replayed, republish the stored result instead of executing the step again.
```python
async def handle_reserve_items(self, command: Dict):
    """Idempotency-guarded reservation step."""
    idempotency_key = f"reserve-{command['order_id']}"
    existing = await self.reservation_store.find_by_key(idempotency_key)
    if existing:
        # Already executed: republish the previous result without side effects
        await self.event_publisher.publish("SagaStepCompleted", {
            "saga_id": command["saga_id"],
            "step_name": "reserve_inventory",
            "result": {"reservation_id": existing.id},
        })
        return
    # First execution
    reservation = await self.reserve(
        items=command["items"],
        order_id=command["order_id"],
        idempotency_key=idempotency_key,
    )
    await self.event_publisher.publish("SagaStepCompleted", {
        "saga_id": command["saga_id"],
        "step_name": "reserve_inventory",
        "result": {"reservation_id": reservation.id},
    })
```
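The same guard can be factored out of individual handlers. This sketch assumes an in-memory store (InMemoryIdempotencyStore and run_once are hypothetical names); a real participant would keep results in a durable table with a unique constraint on the key, so that the check and the write stay atomic across processes.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class InMemoryIdempotencyStore:
    """Hypothetical stand-in for a durable idempotency table.

    Results live only as long as the process; a production version would
    persist them and enforce key uniqueness in the database.
    """

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def run_once(self, key: str, action: Callable[[], Awaitable[Any]]) -> Any:
        """Run action at most once per key; later calls get the cached result."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        async with self._locks[key]:  # serialize concurrent duplicates of one key
            if key in self._results:
                return self._results[key]
            result = await action()  # if this raises, nothing is recorded
            self._results[key] = result
            return result
```

Only successful results are cached, so a step that failed can be retried under the same key.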
- saga_id must flow through every event and log.
- Log saga_id, step_name, and old_state → new_state on every state change.

A saga enters compensation but never reaches FAILED. This usually means a compensation handler is throwing an unhandled exception and never publishing SagaCompensationCompleted. Add dead-letter queue (DLQ) handling to compensation consumers, and make every compensation action publish a result event even when the underlying operation was already rolled back.
```python
async def handle_release_reservation(self, command: Dict):
    try:
        await self.release_reservation(command["original_result"]["reservation_id"])
    except ReservationNotFoundError:
        pass  # Already released: treat as success
    # Always publish completion, regardless of outcome
    await self.event_publisher.publish("SagaCompensationCompleted", {
        "saga_id": command["saga_id"],
        "step_name": "reserve_inventory",
    })
```
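For the logging guideline (saga_id, step_name, and old_state → new_state on every change), Python's standard logging module can carry the fields as record attributes through the extra argument. The logger name "saga", log_transition(), and JsonFormatter are illustrative choices, not an established API.

```python
import json
import logging

logger = logging.getLogger("saga")

TRANSITION_FIELDS = ("saga_id", "step_name", "old_state", "new_state")


def log_transition(saga_id: str, step_name: str, old_state: str, new_state: str) -> None:
    """Emit one record per state change; the fields ride along via `extra`."""
    logger.info(
        "saga %s step %s: %s -> %s",
        saga_id, step_name, old_state, new_state,
        extra={"saga_id": saga_id, "step_name": step_name,
               "old_state": old_state, "new_state": new_state},
    )


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object so saga_id is queryable."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        for name in TRANSITION_FIELDS:
            if hasattr(record, name):
                payload[name] = getattr(record, name)
        return json.dumps(payload)


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    log_transition("order-1", "reserve_inventory", "Pending", "Compensating")
```

With one JSON line per transition, a saga stuck in Compensating shows up as a saga_id whose latest new_state never becomes Failed.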
If your orchestrator service restarts mid-saga, it may replay events and re-execute steps that already completed. Guard every step action with an idempotency key, as in the idempotency-guarded handle_reserve_items handler above.
In a choreography-based saga, a downstream service may miss an event if it was offline when published. Use a durable message broker (Kafka with replication, RabbitMQ with persistence) and store the current saga state in a dedicated saga_log table so you can replay from the last known good step.
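One way a saga_log table could support replay is sketched below with the standard-library sqlite3 module. The column layout and the helpers open_saga_log(), record_step(), and resume_point() are assumptions for illustration; a production system would more likely use the service's main database.

```python
import sqlite3

# Hypothetical layout: one row per (saga, step), updated as the step finishes.
SCHEMA = """
CREATE TABLE IF NOT EXISTS saga_log (
    saga_id     TEXT    NOT NULL,
    step_index  INTEGER NOT NULL,
    step_name   TEXT    NOT NULL,
    status      TEXT    NOT NULL,  -- 'completed' or 'failed'
    PRIMARY KEY (saga_id, step_index)
)
"""


def open_saga_log(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def record_step(conn: sqlite3.Connection, saga_id: str, step_index: int,
                step_name: str, status: str) -> None:
    # INSERT OR REPLACE keeps one row per step, so redelivered events are harmless.
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO saga_log (saga_id, step_index, step_name, status) "
            "VALUES (?, ?, ?, ?)",
            (saga_id, step_index, step_name, status),
        )


def resume_point(conn: sqlite3.Connection, saga_id: str) -> int:
    """Index of the first step after the last contiguous completed step."""
    rows = conn.execute(
        "SELECT step_index FROM saga_log "
        "WHERE saga_id = ? AND status = 'completed' ORDER BY step_index",
        (saga_id,),
    ).fetchall()
    next_index = 0
    for (idx,) in rows:
        if idx != next_index:
            break  # gap: never skip a step whose predecessor is unrecorded
        next_index += 1
    return next_index
```

After a restart, a consumer can call resume_point() and re-dispatch from that step, relying on the participants' idempotency guards to absorb any duplicate.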
A step like create_shipment might take up to 15 minutes during peak load, but if your global timeout is 5 minutes, the saga triggers spurious compensation. Make step timeouts configurable per step type; see references/advanced-patterns.md for the TimeoutSagaOrchestrator implementation and the STEP_TIMEOUTS dict pattern.
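Per-step deadlines can be expressed with asyncio.wait_for. This is a minimal sketch, not the TimeoutSagaOrchestrator from references/advanced-patterns.md; the values in this STEP_TIMEOUTS dict and the DEFAULT_TIMEOUT fallback are made-up numbers chosen to match the create_shipment example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

# Hypothetical per-step deadlines in seconds.
STEP_TIMEOUTS: Dict[str, float] = {
    "reserve_inventory": 30.0,
    "process_payment": 60.0,
    "create_shipment": 15 * 60.0,  # tolerate slow peak-hour shipment creation
}
DEFAULT_TIMEOUT = 5 * 60.0  # fallback for steps without an explicit entry


async def run_step_with_timeout(
    step_name: str,
    action: Callable[[], Awaitable[Any]],
    timeouts: Optional[Dict[str, float]] = None,
) -> Any:
    """Await action() under the step's own deadline.

    Raises asyncio.TimeoutError when the deadline passes; the caller is
    expected to treat that as a step failure and start compensation.
    """
    table = STEP_TIMEOUTS if timeouts is None else timeouts
    timeout = table.get(step_name, DEFAULT_TIMEOUT)
    return await asyncio.wait_for(action(), timeout=timeout)
```

wait_for cancels the awaited coroutine on timeout, but that does not undo any side effect the participant already performed, so the timed-out step may still need its own compensation.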
When several steps have completed before a failure is detected, compensation must run in strict reverse order; otherwise data can be left in an inconsistent state. Verify that _compensate() iterates from current_step - 1 down to 0, and add an integration test that deliberately fails at each step index to confirm the rollback order.
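The reverse-order rule and the fail-at-every-index test can be sketched with a toy synchronous orchestrator. MiniOrchestrator and Step are hypothetical stand-ins for the abstract base class in references/advanced-patterns.md; they keep only the current_step bookkeeping and the _compensate() loop.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Step:
    name: str
    action: Callable[[], None]
    compensation: Callable[[], None]


class MiniOrchestrator:
    """Toy synchronous orchestrator; real ones are async and persist state."""

    def __init__(self, steps: List[Step]) -> None:
        self.steps = steps
        self.current_step = 0

    def run(self) -> bool:
        for index, step in enumerate(self.steps):
            self.current_step = index
            try:
                step.action()
            except Exception:
                self._compensate()
                return False
        return True

    def _compensate(self) -> None:
        # Step current_step failed, so only steps 0..current_step-1 completed.
        # Undo them newest-first: current_step - 1 down to 0.
        for index in range(self.current_step - 1, -1, -1):
            self.steps[index].compensation()


def trace_for_failure_at(fail_index: int, names: List[str]) -> List[str]:
    """Run a saga whose step at fail_index raises; return the do/undo log."""
    log: List[str] = []

    def make_action(i: int, name: str) -> Callable[[], None]:
        def action() -> None:
            if i == fail_index:
                raise RuntimeError(f"{name} failed")
            log.append(f"do:{name}")
        return action

    def make_compensation(name: str) -> Callable[[], None]:
        return lambda: log.append(f"undo:{name}")

    steps = [Step(n, make_action(i, n), make_compensation(n)) for i, n in enumerate(names)]
    MiniOrchestrator(steps).run()
    return log


# Fail at each step index in turn and check the rollback order.
NAMES = ["reserve_inventory", "process_payment", "create_shipment", "send_confirmation"]
for fail_index in range(len(NAMES)):
    completed = NAMES[:fail_index]
    expected = [f"do:{n}" for n in completed] + [f"undo:{n}" for n in reversed(completed)]
    assert trace_for_failure_at(fail_index, NAMES) == expected
```

The loop at the bottom is the integration test in miniature: a failure at index 0 must trigger no compensations, and a failure at the last index must undo every earlier step newest-first.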
The references/ directory contains production-grade implementations not needed for most sagas:
- references/advanced-patterns.md: Full SagaOrchestrator abstract base class, TimeoutSagaOrchestrator with per-step deadlines, a detailed bank-transfer compensating transaction chain, Prometheus instrumentation, PromQL alerts for stuck sagas, and a DLQ recovery worker.

Related skills:

- cqrs-implementation: Pair sagas with CQRS to update read models after each step completes.
- event-store-design: Store saga events in an event store for a full audit trail and replay capability.
- workflow-orchestration-patterns: Higher-level workflow engines (Temporal, Conductor) that build on saga concepts.