microservices-patterns

7 stars

Best use case

microservices-patterns is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using microservices-patterns should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/microservices-patterns/SKILL.md --create-dirs "https://raw.githubusercontent.com/wpank/ai/main/skills/backend/microservices-patterns/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/microservices-patterns/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How microservices-patterns Compares

Feature / Agentmicroservices-patternsStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

This skill provides specific capabilities for your AI agent. See the About section for full details.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Microservices Patterns

## WHAT
Patterns for building distributed systems: service decomposition, inter-service communication, data management, and resilience. Helps you avoid the "distributed monolith" anti-pattern.

## WHEN
- Decomposing a monolith into microservices
- Designing service boundaries and contracts
- Implementing inter-service communication
- Managing distributed transactions
- Building resilient distributed systems

## KEYWORDS
microservices, service mesh, event-driven, saga, circuit breaker, API gateway, service discovery, distributed transactions, eventual consistency, CQRS


## Installation

### OpenClaw / Moltbot / Clawbot

```bash
npx clawhub@latest install microservices-patterns
```


---

## Decision Framework: When to Use Microservices

| If you have... | Then... |
|----------------|---------|
| Small team (<5 devs), simple domain | Start with monolith |
| Need independent deployment/scaling | Consider microservices |
| Multiple teams, clear domain boundaries | Microservices work well |
| Tight deadlines, unknown requirements | Monolith first, extract later |

**Rule of thumb**: If you can't define clear service boundaries, you're not ready for microservices.

---

## Service Decomposition Patterns

### Pattern 1: By Business Capability

Organize services around business functions, not technical layers.

```
E-commerce Example:
├── order-service       # Order lifecycle
├── payment-service     # Payment processing
├── inventory-service   # Stock management
├── shipping-service    # Fulfillment
└── notification-service # Emails, SMS
```

### Pattern 2: Strangler Fig (Monolith Migration)

Gradually extract from monolith without big-bang rewrites.

```
1. Identify bounded context to extract
2. Create new microservice
3. Route new traffic to microservice
4. Gradually migrate existing functionality
5. Remove from monolith when complete
```

```python
# API Gateway routing during migration
async def route_orders(request):
    if request.path.startswith("/api/orders/v2"):
        return await new_order_service.forward(request)
    else:
        return await legacy_monolith.forward(request)
```

---

## Communication Patterns

### Pattern 1: Synchronous (REST/gRPC)

Use for: Queries, when you need immediate response.

```python
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class ServiceClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=5.0)
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def get(self, path: str):
        """GET with automatic retries."""
        response = await self.client.get(f"{self.base_url}{path}")
        response.raise_for_status()
        return response.json()

# Usage
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.get(f"/payments/{payment_id}")
```

### Pattern 2: Asynchronous (Events)

Use for: Commands, when eventual consistency is acceptable.

```python
from aiokafka import AIOKafkaProducer
import json

@dataclass
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    occurred_at: datetime
    data: dict

class EventBus:
    def __init__(self, bootstrap_servers: List[str]):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
    
    async def publish(self, event: DomainEvent):
        await self.producer.send_and_wait(
            event.event_type,  # Topic = event type
            value=asdict(event),
            key=event.aggregate_id.encode()
        )

# Order service publishes
await event_bus.publish(DomainEvent(
    event_id=str(uuid.uuid4()),
    event_type="OrderCreated",
    aggregate_id=order.id,
    occurred_at=datetime.now(),
    data={"order_id": order.id, "customer_id": order.customer_id}
))

# Inventory service subscribes and reacts
async def handle_order_created(event_data: dict):
    order_id = event_data["data"]["order_id"]
    items = event_data["data"]["items"]
    await reserve_inventory(order_id, items)
```

### When to Use Each

| Synchronous | Asynchronous |
|-------------|--------------|
| Need immediate response | Fire-and-forget |
| Simple query/response | Long-running operations |
| Low latency required | Decoupling is priority |
| Tight coupling acceptable | Eventual consistency OK |

---

## Data Patterns

### Database Per Service

Each service owns its data. **No shared databases.**

```
order-service     → orders_db (PostgreSQL)
payment-service   → payments_db (PostgreSQL)
product-service   → products_db (MongoDB)
analytics-service → analytics_db (ClickHouse)
```

### Saga Pattern (Distributed Transactions)

For operations spanning multiple services that need rollback capability.

```python
class SagaStep:
    def __init__(self, name: str, action: Callable, compensation: Callable):
        self.name = name
        self.action = action
        self.compensation = compensation

class OrderFulfillmentSaga:
    def __init__(self):
        self.steps = [
            SagaStep("create_order", self.create_order, self.cancel_order),
            SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory),
            SagaStep("process_payment", self.process_payment, self.refund_payment),
            SagaStep("confirm_order", self.confirm_order, self.cancel_confirmation),
        ]
    
    async def execute(self, order_data: dict) -> SagaResult:
        completed_steps = []
        context = {"order_data": order_data}
        
        for step in self.steps:
            try:
                result = await step.action(context)
                if not result.success:
                    await self.compensate(completed_steps, context)
                    return SagaResult(status="failed", error=result.error)
                completed_steps.append(step)
                context.update(result.data)
            except Exception as e:
                await self.compensate(completed_steps, context)
                return SagaResult(status="failed", error=str(e))
        
        return SagaResult(status="completed", data=context)
    
    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """Execute compensating actions in reverse order."""
        for step in reversed(completed_steps):
            try:
                await step.compensation(context)
            except Exception as e:
                # Log but continue compensating
                logger.error(f"Compensation failed for {step.name}: {e}")
```

---

## Resilience Patterns

### Circuit Breaker

Fail fast when a service is down. Prevents cascade failures.

```python
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open" # Testing recovery

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None
    
    async def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen("Service unavailable")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0
    
    def _on_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()
    
    def _should_attempt_reset(self) -> bool:
        return datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout)

# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

async def call_payment_service(data: dict):
    return await breaker.call(payment_client.post, "/payments", json=data)
```

### Retry with Exponential Backoff

For transient failures.

```python
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError))
)
async def fetch_user(user_id: str):
    response = await client.get(f"/users/{user_id}")
    response.raise_for_status()
    return response.json()
```

### Bulkhead

Isolate resources to limit impact of failures.

```python
import asyncio

class Bulkhead:
    def __init__(self, max_concurrent: int):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def call(self, func: Callable, *args, **kwargs):
        async with self.semaphore:
            return await func(*args, **kwargs)

# Limit concurrent calls to each service
payment_bulkhead = Bulkhead(max_concurrent=10)
inventory_bulkhead = Bulkhead(max_concurrent=20)

result = await payment_bulkhead.call(payment_service.charge, amount)
```

---

## API Gateway Pattern

Single entry point for all clients.

```python
from fastapi import FastAPI, Depends, HTTPException
from circuitbreaker import circuit

app = FastAPI()

class APIGateway:
    def __init__(self):
        self.clients = {
            "orders": httpx.AsyncClient(base_url="http://order-service:8000"),
            "payments": httpx.AsyncClient(base_url="http://payment-service:8001"),
            "inventory": httpx.AsyncClient(base_url="http://inventory-service:8002"),
        }
    
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def forward(self, service: str, path: str, **kwargs):
        client = self.clients[service]
        response = await client.request(**kwargs, url=path)
        response.raise_for_status()
        return response.json()
    
    async def aggregate(self, order_id: str) -> dict:
        """Aggregate data from multiple services."""
        results = await asyncio.gather(
            self.forward("orders", f"/orders/{order_id}", method="GET"),
            self.forward("payments", f"/payments/order/{order_id}", method="GET"),
            self.forward("inventory", f"/reservations/order/{order_id}", method="GET"),
            return_exceptions=True
        )
        
        return {
            "order": results[0] if not isinstance(results[0], Exception) else None,
            "payment": results[1] if not isinstance(results[1], Exception) else None,
            "inventory": results[2] if not isinstance(results[2], Exception) else None,
        }

gateway = APIGateway()

@app.get("/api/orders/{order_id}")
async def get_order_aggregate(order_id: str):
    return await gateway.aggregate(order_id)
```

---

## Health Checks

Every service needs liveness and readiness probes.

```python
@app.get("/health/live")
async def liveness():
    """Is the process running?"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Can we serve traffic?"""
    checks = {
        "database": await check_database(),
        "cache": await check_redis(),
    }
    
    all_healthy = all(checks.values())
    status = "ready" if all_healthy else "not_ready"
    
    return {"status": status, "checks": checks}
```

---

## NEVER

- **Shared Databases**: Creates tight coupling, defeats the purpose
- **Synchronous Chains**: A → B → C → D = fragile, slow
- **No Circuit Breakers**: One service down takes everything down
- **Distributed Monolith**: Services that must deploy together
- **Ignoring Network Failures**: Assume the network WILL fail
- **No Compensation Logic**: Can't undo failed distributed transactions
- **Starting with Microservices**: Always start with a well-structured monolith
- **Chatty Services**: Too many inter-service calls = latency death

Related Skills

testing-patterns

7
from wpank/ai

Unit, integration, and E2E testing patterns with framework-specific guidance. Use when asked to "write tests", "add test coverage", "testing strategy", "test this function", "create test suite", "fix flaky tests", or "improve test quality".

e2e-testing-patterns

7
from wpank/ai

Build reliable, fast E2E test suites with Playwright and Cypress. Critical user journey coverage, flaky test elimination, CI/CD integration.

websocket-hub-patterns

7
from wpank/ai

Horizontally-scalable WebSocket hub pattern with lazy Redis subscriptions, connection registry, and graceful shutdown. Use when building real-time WebSocket servers that scale across multiple instances. Triggers on WebSocket hub, WebSocket scaling, connection registry, Redis WebSocket, real-time gateway, horizontal scaling.

workflow-patterns

7
from wpank/ai

Systematic task implementation using TDD, phase checkpoints, and structured commits. Ensures quality through red-green-refactor cycles, 80% coverage gates, and verification protocols before proceeding.

estimation-patterns

7
from wpank/ai

Practical estimation techniques for software tasks — methods comparison, decomposition, complexity multipliers, buffer calculation, bias awareness, and communication strategies. Use when estimating features, sprint planning, or presenting timelines to stakeholders.

10x-patterns

7
from wpank/ai

Patterns and practices that dramatically accelerate development velocity. Covers parallel execution, automation, feedback loops, workflow optimization, and anti-pattern avoidance. Use when starting projects, planning sprints, optimizing workflows, or onboarding developers.

react-composition-patterns

7
from wpank/ai

No description provided.

loading-state-patterns

7
from wpank/ai

Patterns for skeleton loaders, shimmer effects, and loading states that match design system aesthetics. Covers skeleton components, shimmer animations, and progressive loading. Use when building polished loading experiences. Triggers on skeleton, loading state, shimmer, placeholder, loading animation.

design-system-patterns

7
from wpank/ai

Foundational design system architecture — token hierarchies, theming infrastructure, token pipelines, and governance. Use when creating design tokens, implementing theme switching, setting up Style Dictionary, or establishing multi-brand theming. Triggers on design tokens, theme provider, Style Dictionary, token pipeline, multi-brand theming, CSS custom properties architecture.

nodejs-patterns

7
from wpank/ai

WHAT: Production-ready Node.js backend patterns - Express/Fastify setup, layered architecture, middleware, error handling, validation, database integration, authentication, and caching. WHEN: User is building REST APIs, setting up Node.js servers, implementing authentication, integrating databases, adding validation/caching, or structuring backend applications. KEYWORDS: nodejs, node, express, fastify, typescript, api, rest, middleware, authentication, jwt, validation, zod, postgres, mongodb, redis, caching, rate limiting, error handling

architecture-patterns

7
from wpank/ai

No description provided.

auth-patterns

7
from wpank/ai

Authentication and authorization patterns — JWT, OAuth 2.0, sessions, RBAC/ABAC, password security, MFA, and vulnerability prevention. Use when implementing login flows, protecting routes, managing tokens, or auditing auth security.