saga-orchestration
Patterns for managing distributed transactions and long-running business processes.
Best use case
saga-orchestration is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Patterns for managing distributed transactions and long-running business processes.
Teams using saga-orchestration should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/saga-orchestration/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How saga-orchestration Compares
| Feature / Agent | saga-orchestration | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Patterns for managing distributed transactions and long-running business processes.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Saga Orchestration
Patterns for managing distributed transactions and long-running business processes.
## Do not use this skill when
- The task is unrelated to saga orchestration
- You need a different domain or tool outside this scope
## Instructions
- Clarify goals, constraints, and required inputs.
- Apply relevant best practices and validate outcomes.
- Provide actionable steps and verification.
- If detailed examples are required, open `resources/implementation-playbook.md`.
## Use this skill when
- Coordinating multi-service transactions
- Implementing compensating transactions
- Managing long-running business workflows
- Handling failures in distributed systems
- Building order fulfillment processes
- Implementing approval workflows
## Core Concepts
### 1. Saga Types
```
Choreography Orchestration
┌─────┐ ┌─────┐ ┌─────┐ ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│ │ Orchestrator│
└─────┘ └─────┘ └─────┘ └──────┬──────┘
│ │ │ │
▼ ▼ ▼ ┌─────┼─────┐
Event Event Event ▼ ▼ ▼
┌────┐┌────┐┌────┐
│Svc1││Svc2││Svc3│
└────┘└────┘└────┘
```
### 2. Saga Execution States
| State | Description |
| ---------------- | ------------------------------ |
| **Started** | Saga initiated |
| **Pending** | Waiting for step completion |
| **Compensating** | Rolling back due to failure |
| **Completed** | All steps succeeded |
| **Failed** | Saga failed after compensation |
## Templates
### Template 1: Saga Orchestrator Base
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid
class SagaState(Enum):
STARTED = "started"
PENDING = "pending"
COMPENSATING = "compensating"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class SagaStep:
name: str
action: str
compensation: str
status: str = "pending"
result: Optional[Dict] = None
error: Optional[str] = None
executed_at: Optional[datetime] = None
compensated_at: Optional[datetime] = None
@dataclass
class Saga:
saga_id: str
saga_type: str
state: SagaState
data: Dict[str, Any]
steps: List[SagaStep]
current_step: int = 0
created_at: datetime = field(default_factory=datetime.utcnow)
updated_at: datetime = field(default_factory=datetime.utcnow)
class SagaOrchestrator(ABC):
"""Base class for saga orchestrators."""
def __init__(self, saga_store, event_publisher):
self.saga_store = saga_store
self.event_publisher = event_publisher
@abstractmethod
def define_steps(self, data: Dict) -> List[SagaStep]:
"""Define the saga steps."""
pass
@property
@abstractmethod
def saga_type(self) -> str:
"""Unique saga type identifier."""
pass
async def start(self, data: Dict) -> Saga:
"""Start a new saga."""
saga = Saga(
saga_id=str(uuid.uuid4()),
saga_type=self.saga_type,
state=SagaState.STARTED,
data=data,
steps=self.define_steps(data)
)
await self.saga_store.save(saga)
await self._execute_next_step(saga)
return saga
async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
"""Handle successful step completion."""
saga = await self.saga_store.get(saga_id)
# Update step
for step in saga.steps:
if step.name == step_name:
step.status = "completed"
step.result = result
step.executed_at = datetime.utcnow()
break
saga.current_step += 1
saga.updated_at = datetime.utcnow()
# Check if saga is complete
if saga.current_step >= len(saga.steps):
saga.state = SagaState.COMPLETED
await self.saga_store.save(saga)
await self._on_saga_completed(saga)
else:
saga.state = SagaState.PENDING
await self.saga_store.save(saga)
await self._execute_next_step(saga)
async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
"""Handle step failure - start compensation."""
saga = await self.saga_store.get(saga_id)
# Mark step as failed
for step in saga.steps:
if step.name == step_name:
step.status = "failed"
step.error = error
break
saga.state = SagaState.COMPENSATING
saga.updated_at = datetime.utcnow()
await self.saga_store.save(saga)
# Start compensation from current step backwards
await self._compensate(saga)
async def _execute_next_step(self, saga: Saga):
"""Execute the next step in the saga."""
if saga.current_step >= len(saga.steps):
return
step = saga.steps[saga.current_step]
step.status = "executing"
await self.saga_store.save(saga)
# Publish command to execute step
await self.event_publisher.publish(
step.action,
{
"saga_id": saga.saga_id,
"step_name": step.name,
**saga.data
}
)
async def _compensate(self, saga: Saga):
"""Execute compensation for completed steps."""
# Compensate in reverse order
for i in range(saga.current_step - 1, -1, -1):
step = saga.steps[i]
if step.status == "completed":
step.status = "compensating"
await self.saga_store.save(saga)
await self.event_publisher.publish(
step.compensation,
{
"saga_id": saga.saga_id,
"step_name": step.name,
"original_result": step.result,
**saga.data
}
)
async def handle_compensation_completed(self, saga_id: str, step_name: str):
"""Handle compensation completion."""
saga = await self.saga_store.get(saga_id)
for step in saga.steps:
if step.name == step_name:
step.status = "compensated"
step.compensated_at = datetime.utcnow()
break
# Check if all compensations complete
all_compensated = all(
s.status in ("compensated", "pending", "failed")
for s in saga.steps
)
if all_compensated:
saga.state = SagaState.FAILED
await self._on_saga_failed(saga)
await self.saga_store.save(saga)
async def _on_saga_completed(self, saga: Saga):
"""Called when saga completes successfully."""
await self.event_publisher.publish(
f"{self.saga_type}Completed",
{"saga_id": saga.saga_id, **saga.data}
)
async def _on_saga_failed(self, saga: Saga):
"""Called when saga fails after compensation."""
await self.event_publisher.publish(
f"{self.saga_type}Failed",
{"saga_id": saga.saga_id, "error": "Saga failed", **saga.data}
)
```
### Template 2: Order Fulfillment Saga
```python
class OrderFulfillmentSaga(SagaOrchestrator):
"""Orchestrates order fulfillment across services."""
@property
def saga_type(self) -> str:
return "OrderFulfillment"
def define_steps(self, data: Dict) -> List[SagaStep]:
return [
SagaStep(
name="reserve_inventory",
action="InventoryService.ReserveItems",
compensation="InventoryService.ReleaseReservation"
),
SagaStep(
name="process_payment",
action="PaymentService.ProcessPayment",
compensation="PaymentService.RefundPayment"
),
SagaStep(
name="create_shipment",
action="ShippingService.CreateShipment",
compensation="ShippingService.CancelShipment"
),
SagaStep(
name="send_confirmation",
action="NotificationService.SendOrderConfirmation",
compensation="NotificationService.SendCancellationNotice"
)
]
# Usage
async def create_order(order_data: Dict):
saga = OrderFulfillmentSaga(saga_store, event_publisher)
return await saga.start({
"order_id": order_data["order_id"],
"customer_id": order_data["customer_id"],
"items": order_data["items"],
"payment_method": order_data["payment_method"],
"shipping_address": order_data["shipping_address"]
})
# Event handlers in each service
class InventoryService:
async def handle_reserve_items(self, command: Dict):
try:
# Reserve inventory
reservation = await self.reserve(
command["items"],
command["order_id"]
)
# Report success
await self.event_publisher.publish(
"SagaStepCompleted",
{
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"result": {"reservation_id": reservation.id}
}
)
except InsufficientInventoryError as e:
await self.event_publisher.publish(
"SagaStepFailed",
{
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"error": str(e)
}
)
async def handle_release_reservation(self, command: Dict):
# Compensating action
await self.release_reservation(
command["original_result"]["reservation_id"]
)
await self.event_publisher.publish(
"SagaCompensationCompleted",
{
"saga_id": command["saga_id"],
"step_name": "reserve_inventory"
}
)
```
### Template 3: Choreography-Based Saga
```python
from dataclasses import dataclass
from typing import Dict, Any
import asyncio
@dataclass
class SagaContext:
"""Passed through choreographed saga events."""
saga_id: str
step: int
data: Dict[str, Any]
completed_steps: list
class OrderChoreographySaga:
"""Choreography-based saga using events."""
def __init__(self, event_bus):
self.event_bus = event_bus
self._register_handlers()
def _register_handlers(self):
self.event_bus.subscribe("OrderCreated", self._on_order_created)
self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)
# Compensation handlers
self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)
async def _on_order_created(self, event: Dict):
"""Step 1: Order created, reserve inventory."""
await self.event_bus.publish("ReserveInventory", {
"saga_id": event["order_id"],
"order_id": event["order_id"],
"items": event["items"]
})
async def _on_inventory_reserved(self, event: Dict):
"""Step 2: Inventory reserved, process payment."""
await self.event_bus.publish("ProcessPayment", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"amount": event["total_amount"],
"reservation_id": event["reservation_id"]
})
async def _on_payment_processed(self, event: Dict):
"""Step 3: Payment done, create shipment."""
await self.event_bus.publish("CreateShipment", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"payment_id": event["payment_id"]
})
async def _on_shipment_created(self, event: Dict):
"""Step 4: Complete - send confirmation."""
await self.event_bus.publish("OrderFulfilled", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"tracking_number": event["tracking_number"]
})
# Compensation handlers
async def _on_payment_failed(self, event: Dict):
"""Payment failed - release inventory."""
await self.event_bus.publish("ReleaseInventory", {
"saga_id": event["saga_id"],
"reservation_id": event["reservation_id"]
})
await self.event_bus.publish("OrderFailed", {
"order_id": event["order_id"],
"reason": "Payment failed"
})
async def _on_shipment_failed(self, event: Dict):
"""Shipment failed - refund payment and release inventory."""
await self.event_bus.publish("RefundPayment", {
"saga_id": event["saga_id"],
"payment_id": event["payment_id"]
})
await self.event_bus.publish("ReleaseInventory", {
"saga_id": event["saga_id"],
"reservation_id": event["reservation_id"]
})
```
### Template 4: Saga with Timeouts
```python
class TimeoutSagaOrchestrator(SagaOrchestrator):
"""Saga orchestrator with step timeouts."""
def __init__(self, saga_store, event_publisher, scheduler):
super().__init__(saga_store, event_publisher)
self.scheduler = scheduler
async def _execute_next_step(self, saga: Saga):
if saga.current_step >= len(saga.steps):
return
step = saga.steps[saga.current_step]
step.status = "executing"
step.timeout_at = datetime.utcnow() + timedelta(minutes=5)
await self.saga_store.save(saga)
# Schedule timeout check
await self.scheduler.schedule(
f"saga_timeout_{saga.saga_id}_{step.name}",
self._check_timeout,
{"saga_id": saga.saga_id, "step_name": step.name},
run_at=step.timeout_at
)
await self.event_publisher.publish(
step.action,
{"saga_id": saga.saga_id, "step_name": step.name, **saga.data}
)
async def _check_timeout(self, data: Dict):
"""Check if step has timed out."""
saga = await self.saga_store.get(data["saga_id"])
step = next(s for s in saga.steps if s.name == data["step_name"])
if step.status == "executing":
# Step timed out - fail it
await self.handle_step_failed(
data["saga_id"],
data["step_name"],
"Step timed out"
)
```
## Durable Execution Alternative
The templates above build saga infrastructure from scratch — saga stores, event publishers, compensation tracking. **Durable execution frameworks** (like DBOS) eliminate much of this boilerplate: the workflow runtime automatically persists state to a database, retries failed steps, and resumes from the last checkpoint after crashes. Instead of building a `SagaOrchestrator` base class, you write a workflow function with steps — the framework handles persistence, crash recovery, and exactly-once execution semantics. Consider durable execution when you want saga-like reliability without managing the coordination infrastructure yourself.
## Best Practices
### Do's
- **Make steps idempotent** - Safe to retry
- **Design compensations carefully** - They must work
- **Use correlation IDs** - For tracing across services
- **Implement timeouts** - Don't wait forever
- **Log everything** - For debugging failures
### Don'ts
- **Don't assume instant completion** - Sagas take time
- **Don't skip compensation testing** - Most critical part
- **Don't couple services** - Use async messaging
- **Don't ignore partial failures** - Handle gracefully
## Related Skills
Works well with: `event-sourcing-architect`, `workflow-automation`, `dbos-*`
## Resources
- [Saga Pattern](https://microservices.io/patterns/data/saga.html)
- [Designing Data-Intensive Applications](https://dataintensive.net/)
## Limitations
- Use this skill only when the task clearly matches the scope described above.
- Do not treat the output as a substitute for environment-specific validation, testing, or expert review.
- Stop and ask for clarification if required inputs, permissions, safety boundaries, or success criteria are missing.Related Skills
workflow-orchestration-patterns
Master workflow orchestration architecture with Temporal, covering fundamental design decisions, resilience patterns, and best practices for building reliable distributed systems.
full-stack-orchestration-full-stack-feature
Use when working with full stack orchestration full stack feature
design-orchestration
Orchestrates design workflows by routing work through brainstorming, multi-agent review, and execution readiness in the correct order.
agent-orchestration-multi-agent-optimize
Optimize multi-agent systems with coordinated profiling, workload distribution, and cost-aware orchestration. Use when improving agent performance, throughput, or reliability.
agent-orchestration-improve-agent
Systematic improvement of existing agents through performance analysis, prompt engineering, and continuous iteration.
find-skills
Helps users discover and install agent skills when they ask questions like "how do I do X", "find a skill for X", "is there a skill that can...", or express interest in extending capabilities. This skill should be used when the user is looking for functionality that might exist as an installable skill.
vercel-cli-with-tokens
Deploy and manage projects on Vercel using token-based authentication. Use when working with Vercel CLI using access tokens rather than interactive login — e.g. "deploy to vercel", "set up vercel", "add environment variables to vercel".
vercel-react-view-transitions
Guide for implementing smooth, native-feeling animations using React's View Transition API (`<ViewTransition>` component, `addTransitionType`, and CSS view transition pseudo-elements). Use this skill whenever the user wants to add page transitions, animate route changes, create shared element animations, animate enter/exit of components, animate list reorder, implement directional (forward/back) navigation animations, or integrate view transitions in Next.js. Also use when the user mentions view transitions, `startViewTransition`, `ViewTransition`, transition types, or asks about animating between UI states in React without third-party animation libraries.
vercel-react-native-skills
React Native and Expo best practices for building performant mobile apps. Use when building React Native components, optimizing list performance, implementing animations, or working with native modules. Triggers on tasks involving React Native, Expo, mobile performance, or native platform APIs.
deploy-to-vercel
Deploy applications and websites to Vercel. Use when the user requests deployment actions like "deploy my app", "deploy and give me the link", "push this live", or "create a preview deployment".
vercel-composition-patterns
React composition patterns that scale. Use when refactoring components with boolean prop proliferation, building flexible component libraries, or designing reusable APIs. Triggers on tasks involving compound components, render props, context providers, or component architecture. Includes React 19 API changes.
vercel-deploy
Deploy applications and websites to Vercel. Use this skill when the user requests deployment actions such as "Deploy my app", "Deploy this to production", "Create a preview deployment", "Deploy and give me the link", or "Push this live". No authentication required - returns preview URL and claimable deployment link.