From backend-development
Implements saga orchestration and choreography for distributed microservices transactions without 2PC. Defines steps, compensating actions, timeouts, and monitoring for workflows like orders or bookings.
npx claudepluginhub sumeet138/qwen-code-agents --plugin backend-development
This skill uses the workspace's default tool permissions.
Patterns for managing distributed transactions and long-running business processes without two-phase commit.
What you provide:
What this skill produces:
Choreography                       Orchestration
┌─────┐  ┌─────┐  ┌─────┐          ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│          │ Orchestrator│
└─────┘  └─────┘  └─────┘          └──────┬──────┘
   │        │        │                    │
   ▼        ▼        ▼              ┌─────┼─────┐
 Event    Event    Event            ▼     ▼     ▼
                                  ┌────┐┌────┐┌────┐
Each service reacts to the        │Svc1││Svc2││Svc3│
previous service's event.         └────┘└────┘└────┘
No central coordinator.           Central coordinator sends
                                  commands and tracks state.
Choose orchestration when: You need explicit step tracking, retries, and centralized visibility. Easier to debug.
Choose choreography when: You want loose coupling and services that can evolve independently. Harder to trace.
| State | Description |
|---|---|
| Started | Saga initiated, first step dispatched |
| Pending | Waiting for a step reply from a participant |
| Compensating | A step failed; rolling back completed steps |
| Completed | All forward steps succeeded |
| Failed | Saga failed and all compensations have finished |
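
The states in the table can be encoded as an enum with an explicit transition check. The sketch below is illustrative only: the table lists the states but not the allowed edges, so `ALLOWED_TRANSITIONS` and the `transition` helper are assumptions, not part of the SagaOrchestrator base class.

```python
from enum import Enum


class SagaState(Enum):
    STARTED = "Started"
    PENDING = "Pending"
    COMPENSATING = "Compensating"
    COMPLETED = "Completed"
    FAILED = "Failed"


# Assumed edges: the source table names the states but not the legal moves.
ALLOWED_TRANSITIONS = {
    SagaState.STARTED: {SagaState.PENDING},
    SagaState.PENDING: {
        SagaState.PENDING,       # next step dispatched, waiting again
        SagaState.COMPLETED,     # last forward step succeeded
        SagaState.COMPENSATING,  # a step failed
    },
    SagaState.COMPENSATING: {SagaState.FAILED},
    SagaState.COMPLETED: set(),  # terminal
    SagaState.FAILED: set(),     # terminal
}


def transition(current: SagaState, new: SagaState) -> SagaState:
    """Return the new state, or raise if the move is not in the assumed map."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Illegal saga transition: {current.value} -> {new.value}")
    return new
```

Rejecting illegal moves at one choke point makes a saga that jumps from Completed back into Compensating fail loudly instead of corrupting state.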
| Situation | Handling |
|---|---|
| Step never started | No compensation needed (skip) |
| Step completed successfully | Run compensation command |
| Step failed before completion | No compensation needed; mark failed |
| Compensation itself fails | Retry with backoff → DLQ → manual intervention alert |
| Step result no longer exists | Treat compensation as success (idempotency) |
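
The rules in the table can be sketched in two small helpers: one that selects which steps need compensation, and one that retries a failing compensation with backoff before parking the command for manual follow-up. All names here (`StepStatus`, `plan_compensations`, `compensate_with_retry`, the `dead_letters` list) are hypothetical, and a plain list stands in for a real DLQ.

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Tuple


class StepStatus(Enum):
    NOT_STARTED = "not_started"
    COMPLETED = "completed"
    FAILED = "failed"


def plan_compensations(steps: List[Tuple[str, StepStatus]]) -> List[str]:
    """Only completed steps are compensated, most recent first."""
    return [name for name, status in reversed(steps) if status is StepStatus.COMPLETED]


async def compensate_with_retry(
    run: Callable[[Dict[str, Any]], Awaitable[None]],
    command: Dict[str, Any],
    dead_letters: List[Dict[str, Any]],
    max_attempts: int = 3,
    base_delay: float = 0.1,
) -> bool:
    """Retry with exponential backoff; on exhaustion, park the command in the DLQ stand-in."""
    for attempt in range(max_attempts):
        try:
            await run(command)
            return True
        except Exception:
            if attempt < max_attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    dead_letters.append(command)  # a real system would also raise an alert here
    return False
```

The "step result no longer exists" row is handled inside the compensation itself, by treating a not-found error as success.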
Concrete subclass of the base orchestrator. Defines four steps spanning inventory, payment, shipping, and notification. See references/advanced-patterns.md for the full abstract SagaOrchestrator base class.
from saga_orchestrator import SagaOrchestrator, SagaStep
from typing import Dict, List


class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfillment across four participant services."""

    @property
    def saga_type(self) -> str:
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        return [
            SagaStep(
                name="reserve_inventory",
                action="InventoryService.ReserveItems",
                compensation="InventoryService.ReleaseReservation"
            ),
            SagaStep(
                name="process_payment",
                action="PaymentService.ProcessPayment",
                compensation="PaymentService.RefundPayment"
            ),
            SagaStep(
                name="create_shipment",
                action="ShippingService.CreateShipment",
                compensation="ShippingService.CancelShipment"
            ),
            SagaStep(
                name="send_confirmation",
                action="NotificationService.SendOrderConfirmation",
                compensation="NotificationService.SendCancellationNotice"
            ),
        ]


# Start a saga
async def create_order(order_data: Dict, saga_store, event_publisher):
    saga = OrderFulfillmentSaga(saga_store, event_publisher)
    return await saga.start({
        "order_id": order_data["order_id"],
        "customer_id": order_data["customer_id"],
        "items": order_data["items"],
        "payment_method": order_data["payment_method"],
        "shipping_address": order_data["shipping_address"],
    })
# Participant service: handles a command and publishes a reply.
# self.event_publisher, self.reserve(), self.release_reservation(), and the
# two exception types are assumed to be provided by the service itself.
class InventoryService:
    async def handle_reserve_items(self, command: Dict):
        try:
            reservation = await self.reserve(command["items"], command["order_id"])
            await self.event_publisher.publish("SagaStepCompleted", {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory",
                "result": {"reservation_id": reservation.id}
            })
        except InsufficientInventoryError as e:
            await self.event_publisher.publish("SagaStepFailed", {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory",
                "error": str(e)
            })

    async def handle_release_reservation(self, command: Dict):
        """Compensation: idempotent, always publishes completion."""
        try:
            await self.release_reservation(
                command["original_result"]["reservation_id"]
            )
        except ReservationNotFoundError:
            pass  # Already released: treat as success
        await self.event_publisher.publish("SagaCompensationCompleted", {
            "saga_id": command["saga_id"],
            "step_name": "reserve_inventory"
        })
Each service listens for the previous service's event and reacts. No central coordinator. Compensation is triggered by failure events propagating backward.
from dataclasses import dataclass
from typing import Dict, Any


@dataclass
class SagaContext:
    """Carried through all events in a choreographed saga."""
    saga_id: str
    step: int
    data: Dict[str, Any]
    completed_steps: list


class OrderChoreographySaga:
    """Choreography-based saga: services react to each other's events."""

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        # Forward path
        self.event_bus.subscribe("OrderCreated", self._on_order_created)
        self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)
        # Compensation path
        self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],
            "order_id": event["order_id"],
            "items": event["items"],
        })

    async def _on_inventory_reserved(self, event: Dict):
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"],
        })

    async def _on_payment_processed(self, event: Dict):
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
        })

    async def _on_shipment_created(self, event: Dict):
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"],
        })

    # Compensation handlers
    async def _on_payment_failed(self, event: Dict):
        """Payment failed: release inventory and mark order failed."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "Payment failed",
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed: refund payment and release inventory."""
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"],
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
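
The choreography class above only needs an `event_bus` exposing a synchronous `subscribe(name, handler)` and an awaitable `publish(name, payload)`. For local experiments, a minimal in-memory stand-in might look like the sketch below. `InMemoryEventBus` and `demo` are hypothetical names; a production system would use a durable broker instead.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical test double: delivers events in-process, with no durability."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}
        # (event_name, payload) pairs in publish order, useful for assertions
        self.published: List[Tuple[str, Dict[str, Any]]] = []

    def subscribe(self, event_name: str, handler: Handler) -> None:
        self._handlers.setdefault(event_name, []).append(handler)

    async def publish(self, event_name: str, payload: Dict[str, Any]) -> None:
        self.published.append((event_name, payload))
        for handler in list(self._handlers.get(event_name, [])):
            await handler(payload)


async def demo() -> List[str]:
    """Publish a failure event and return the names of everything published."""
    bus = InMemoryEventBus()

    async def on_payment_failed(event: Dict[str, Any]) -> None:
        # Mirrors the compensation path: a failure event triggers a release command.
        await bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })

    bus.subscribe("PaymentFailed", on_payment_failed)
    await bus.publish("PaymentFailed", {"saga_id": "s1", "reservation_id": "r1"})
    return [name for name, _ in bus.published]
```

Recording every published event makes it straightforward to assert in a test that a failure event triggers the expected compensation commands.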
Every participant must guard against duplicate command delivery. Store an idempotency key before executing and return the cached result on replay.
async def handle_reserve_items(self, command: Dict):
    """Idempotency-guarded reservation step."""
    idempotency_key = f"reserve-{command['order_id']}"
    existing = await self.reservation_store.find_by_key(idempotency_key)
    if existing:
        # Already executed: return the previous result without side effects
        await self.event_publisher.publish("SagaStepCompleted", {
            "saga_id": command["saga_id"],
            "step_name": "reserve_inventory",
            "result": {"reservation_id": existing.id}
        })
        return

    # First execution
    reservation = await self.reserve(
        items=command["items"],
        order_id=command["order_id"],
        idempotency_key=idempotency_key
    )
    await self.event_publisher.publish("SagaStepCompleted", {
        "saga_id": command["saga_id"],
        "step_name": "reserve_inventory",
        "result": {"reservation_id": reservation.id}
    })
- saga_id must flow through every event and log.
- Log saga_id, step_name, and old_state → new_state on every change.

A saga enters compensation but never reaches FAILED. This usually means a compensation handler is throwing an unhandled exception and never publishing SagaCompensationCompleted. Add dead-letter queue (DLQ) handling to compensation consumers, and ensure every compensation action publishes a result event even when the underlying operation was already rolled back.
async def handle_release_reservation(self, command: Dict):
    try:
        await self.release_reservation(command["original_result"]["reservation_id"])
    except ReservationNotFoundError:
        pass  # Already released: treat as success
    # Always publish completion, regardless of outcome
    await self.event_publisher.publish("SagaCompensationCompleted", {
        "saga_id": command["saga_id"],
        "step_name": "reserve_inventory"
    })
If your orchestrator service restarts mid-saga, it may replay events and re-execute already-completed steps. Guard every step action with an idempotency key — see Template 3 above.
In a choreography-based saga, a downstream service may miss an event if it was offline when published. Use a durable message broker (Kafka with replication, RabbitMQ with persistence) and store the current saga state in a dedicated saga_log table so you can replay from the last known good step.
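
One way to picture the saga_log table is an append-only log that a recovery job queries for the last completed step. The sketch below uses the standard-library `sqlite3` module purely for illustration. The column names and the `open_saga_log`, `record_step`, and `last_completed_step` helpers are assumptions, and a real deployment would normally keep the log in the service's own database.

```python
import sqlite3
from typing import Optional


def open_saga_log(path: str = ":memory:") -> sqlite3.Connection:
    """Create the (assumed) saga_log schema if it does not exist yet."""
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS saga_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            saga_id TEXT NOT NULL,
            step_name TEXT NOT NULL,
            status TEXT NOT NULL
        )
        """
    )
    return conn


def record_step(conn: sqlite3.Connection, saga_id: str, step_name: str, status: str) -> None:
    """Append one step outcome; the connection context manager commits on success."""
    with conn:
        conn.execute(
            "INSERT INTO saga_log (saga_id, step_name, status) VALUES (?, ?, ?)",
            (saga_id, step_name, status),
        )


def last_completed_step(conn: sqlite3.Connection, saga_id: str) -> Optional[str]:
    """Return the most recently completed step, i.e. the point to replay from."""
    row = conn.execute(
        "SELECT step_name FROM saga_log "
        "WHERE saga_id = ? AND status = 'completed' "
        "ORDER BY id DESC LIMIT 1",
        (saga_id,),
    ).fetchone()
    return row[0] if row else None
```

A recovery worker could call `last_completed_step` for each unfinished saga and re-publish the event that follows that step.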
A step like create_shipment might take up to 15 minutes during peak load but your global timeout is 5 minutes, causing spurious compensation. Make step timeouts configurable per step type — see references/advanced-patterns.md for the TimeoutSagaOrchestrator implementation and the STEP_TIMEOUTS dict pattern.
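
A per-step timeout lookup can be sketched with `asyncio.wait_for`. This is not the TimeoutSagaOrchestrator from the references file: the timeout values, `DEFAULT_STEP_TIMEOUT`, `timeout_for`, and `run_step` are illustrative assumptions.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical per-step limits in seconds; tune these to observed latencies.
STEP_TIMEOUTS = {
    "reserve_inventory": 30.0,
    "process_payment": 60.0,
    "create_shipment": 900.0,  # 15 minutes, matching the peak-load example
}
DEFAULT_STEP_TIMEOUT = 300.0  # 5-minute fallback for steps not listed above


def timeout_for(step_name: str) -> float:
    """Look up the step's own limit, falling back to the global default."""
    return STEP_TIMEOUTS.get(step_name, DEFAULT_STEP_TIMEOUT)


async def run_step(
    step_name: str,
    action: Callable[[], Awaitable[None]],
    timeout: Optional[float] = None,
) -> str:
    """Run one step under its timeout; the caller decides whether to compensate."""
    limit = timeout_for(step_name) if timeout is None else timeout
    try:
        await asyncio.wait_for(action(), timeout=limit)
        return "completed"
    except asyncio.TimeoutError:
        return "timed_out"
```

Keeping the limits in one dict means a slow step can be given more headroom without loosening the deadline for every other step.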
When two steps both complete before a failure is detected, compensation must run in strict reverse order or you leave data in an inconsistent state. Verify that _compensate() iterates from current_step - 1 down to 0, and add an integration test that deliberately fails at each step index to confirm correct rollback order.
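
The reverse-order rule and the fail-at-each-index test can be sketched without any infrastructure. The synchronous `run_saga` below is a simplified stand-in for the orchestrator's `_compensate()` logic, not the base class itself, and `rollback_order_when_failing_at` is a hypothetical test helper.

```python
from typing import Callable, List, Tuple

# (name, action, compensation)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    current_step = 0
    try:
        for _, action, _ in steps:
            action()
            current_step += 1
        return True
    except Exception:
        # Undo from current_step - 1 down to 0; the failed step itself is skipped.
        for index in range(current_step - 1, -1, -1):
            steps[index][2]()
        return False


def rollback_order_when_failing_at(fail_index: int, names: List[str]) -> List[str]:
    """Build a saga whose step at fail_index raises; return the compensation order."""
    compensated: List[str] = []

    def make_step(index: int, name: str) -> Step:
        def action() -> None:
            if index == fail_index:
                raise RuntimeError(f"{name} failed")

        def compensation() -> None:
            compensated.append(name)

        return (name, action, compensation)

    run_saga([make_step(i, n) for i, n in enumerate(names)])
    return compensated
```

Looping `fail_index` over every step and comparing the result with the reversed prefix of step names gives the integration-style check described above.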
The references/ directory contains production-grade implementations not needed for most sagas:
- references/advanced-patterns.md: Full SagaOrchestrator abstract base class, TimeoutSagaOrchestrator with per-step deadlines, detailed bank transfer compensating transaction chain, Prometheus instrumentation, stuck saga PromQL alerts, and DLQ recovery worker.

- cqrs-implementation: Pair sagas with CQRS for read-model updates after each step completes
- event-store-design: Store saga events in an event store for full audit trail and replay capability
- workflow-orchestration-patterns: Higher-level workflow engines (Temporal, Conductor) that build on saga concepts