event-store

Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, implementing event persistence, projections, snapshotting, or CQRS patterns.

Best use case

event-store is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using event-store should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/event-store/SKILL.md --create-dirs "https://raw.githubusercontent.com/wpank/ai/main/skills/backend/event-store/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/event-store/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How event-store Compares

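| Feature / Agent         | event-store   | Standard Approach |
| ----------------------- | ------------- | ----------------- |
| Platform Support        | Not specified | Limited / Varies  |
| Context Awareness       | High          | Baseline          |
| Installation Complexity | Unknown       | N/A               |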

Frequently Asked Questions

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Event Store

Guide to designing event stores for event-sourced applications — covering event schemas, projections, snapshotting, and CQRS integration.


## Installation

### OpenClaw / Moltbot / Clawbot

```bash
npx clawhub@latest install event-store
```


## When to Use This Skill

- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Building projections from event streams
- Adding snapshotting for aggregate performance
- Integrating CQRS with event sourcing

## Core Concepts

### Event Store Architecture

```
┌─────────────────────────────────────────────────────┐
│                    Event Store                       │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │   Stream 1   │  │   Stream 2   │  │   Stream 3   │ │
│  │ (Aggregate)  │  │ (Aggregate)  │  │ (Aggregate)  │ │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤ │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │ │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │ │
│  │ Event 3     │  │ ...         │  │ Event 3     │ │
│  │ ...         │  │             │  │ Event 4     │ │
│  └─────────────┘  └─────────────┘  └─────────────┘ │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...     │
└─────────────────────────────────────────────────────┘
```

### Event Store Requirements

| Requirement       | Description                        |
| ----------------- | ---------------------------------- |
| **Append-only**   | Events are immutable, only appends |
| **Ordered**       | Per-stream and global ordering     |
| **Versioned**     | Optimistic concurrency control     |
| **Subscriptions** | Real-time event notifications      |
| **Idempotent**    | Handle duplicate writes safely     |

### Technology Comparison

| Technology       | Best For                | Limitations                      |
| ---------------- | ----------------------- | -------------------------------- |
| **EventStoreDB** | Pure event sourcing     | Single-purpose                   |
| **PostgreSQL**   | Existing Postgres stack | Manual implementation            |
| **Kafka**        | High-throughput streams | Not ideal for per-stream queries |
| **DynamoDB**     | Serverless, AWS-native  | Query limitations                |

## Event Schema Design

Events are the source of truth. Well-designed schemas ensure long-term evolvability.

### Event Envelope Structure

```json
{
  "event_id": "uuid",
  "stream_id": "Order-abc123",
  "event_type": "OrderPlaced",
  "version": 1,
  "schema_version": 1,
  "data": {
    "customer_id": "cust-1",
    "total_cents": 5000
  },
  "metadata": {
    "correlation_id": "req-xyz",
    "causation_id": "evt-prev",
    "user_id": "user-1",
    "timestamp": "2025-01-15T10:30:00Z"
  },
  "global_position": 42
}
```

### Schema Evolution Rules

1. **Add fields freely** — new optional fields are always safe
2. **Never remove or rename fields** — introduce a new event type instead
3. **Version event types** — `OrderPlacedV2` when the schema changes materially
4. **Upcast on read** — transform old versions to the current shape in the deserializer
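
Rule 4 can be sketched as a chain of small upgrade functions applied when an event is deserialized. The schema history below (a `currency` field added in version 2, a `discount_cents` field in version 3) and the names `UPCASTERS` and `upcast` are assumptions made up for this example.

```python
from typing import Any, Callable

Payload = dict[str, Any]


# Hypothetical history of OrderPlaced:
#   v1 -> v2: adds optional "currency" (older events are assumed to be USD)
#   v2 -> v3: adds optional "discount_cents" (older events had no discount)
def _order_placed_v1_to_v2(data: Payload) -> Payload:
    return {**data, "currency": data.get("currency", "USD")}


def _order_placed_v2_to_v3(data: Payload) -> Payload:
    return {**data, "discount_cents": data.get("discount_cents", 0)}


# (event_type, schema_version) -> function producing the next schema version
UPCASTERS: dict[tuple[str, int], Callable[[Payload], Payload]] = {
    ("OrderPlaced", 1): _order_placed_v1_to_v2,
    ("OrderPlaced", 2): _order_placed_v2_to_v3,
}


def upcast(event_type: str, schema_version: int,
           data: Payload) -> tuple[int, Payload]:
    """Apply upcasters one step at a time until none is registered."""
    while (event_type, schema_version) in UPCASTERS:
        data = UPCASTERS[(event_type, schema_version)](data)
        schema_version += 1
    return schema_version, data
```

Stored events are never rewritten; only the in-memory shape handed to aggregates and projections changes, so handlers need to understand just the latest version.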

## PostgreSQL Event Store Schema

```sql
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- The UNIQUE (stream_id, version) constraint already creates an index that
-- serves per-stream reads, so no separate stream index is needed.
CREATE INDEX idx_events_global ON events(global_position);
CREATE INDEX idx_events_type ON events(event_type);

CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```

## Event Store Implementation

```python
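import json
from dataclasses import dataclass, field
from uuid import UUID, uuid4

import asyncpg


class ConcurrencyError(Exception):
    """Raised when a stream is not at the version the writer expected."""


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: int | None = None          # set by the store on append
    global_position: int | None = None  # set by the store on append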

class EventStore:  # backed by PostgreSQL schema above
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append(self, stream_id: str, stream_type: str,
                     events: list[Event],
                     expected_version: int | None = None) -> list[Event]:
        """Append events with optimistic concurrency control."""
        async with self.pool.acquire() as conn:
            try:
                async with conn.transaction():
                    # Current stream version; 0 means the stream is empty
                    current = await conn.fetchval(
                        "SELECT COALESCE(MAX(version), 0) FROM events "
                        "WHERE stream_id = $1", stream_id
                    )
                    if (expected_version is not None
                            and current != expected_version):
                        raise ConcurrencyError(
                            f"Expected {expected_version}, got {current}"
                        )

                    for i, evt in enumerate(events, start=1):
                        evt.version = current + i
                        row = await conn.fetchrow(
                            "INSERT INTO events (id, stream_id, stream_type, "
                            "event_type, event_data, metadata, version) "
                            "VALUES ($1,$2,$3,$4,$5,$6,$7) "
                            "RETURNING global_position",
                            evt.event_id, stream_id, stream_type,
                            evt.event_type, json.dumps(evt.data),
                            json.dumps(evt.metadata), evt.version,
                        )
                        evt.global_position = row["global_position"]
            except asyncpg.UniqueViolationError as exc:
                # A concurrent writer inserted the same (stream_id, version)
                # between the check and the insert, or an event ID was reused.
                raise ConcurrencyError(
                    f"Conflicting write to stream {stream_id}"
                ) from exc
            return events

    async def read_stream(self, stream_id: str,
                          from_version: int = 0) -> list[Event]:
        """Read events for a single stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE stream_id = $1 "
                "AND version >= $2 ORDER BY version",
                stream_id, from_version,
            )
            return [self._to_event(r) for r in rows]

    async def read_all(self, from_position: int = 0,
                       limit: int = 1000) -> list[Event]:
        """Read global event stream for projections / subscriptions."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM events WHERE global_position > $1 "
                "ORDER BY global_position LIMIT $2",
                from_position, limit,
            )
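            return [self._to_event(r) for r in rows]

    @staticmethod
    def _to_event(row: asyncpg.Record) -> Event:
        """Map a row back to an Event (asyncpg returns JSONB as text by default)."""
        return Event(
            stream_id=row["stream_id"],
            event_type=row["event_type"],
            data=json.loads(row["event_data"]),
            metadata=json.loads(row["metadata"] or "{}"),
            event_id=row["id"],
            version=row["version"],
            global_position=row["global_position"],
        )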
```

## Projections

Projections build read-optimised views by replaying events. They are the "Q" side of CQRS.

### Projection Lifecycle

1. **Start from checkpoint** — resume from last processed global position
2. **Apply events** — update the read model for each relevant event type
3. **Save checkpoint** — persist the new position atomically with the read model
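
One way to make step 3 atomic is to write the read model and the checkpoint in the same database transaction. The sketch below uses an in-memory SQLite database so that it runs without a server; the table layout and the helper names (`init_read_db`, `load_checkpoint`, `process_batch`) are assumptions for illustration, and events are plain dicts.

```python
import sqlite3


def init_read_db(conn: sqlite3.Connection) -> None:
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS order_summaries (
            id TEXT PRIMARY KEY, customer TEXT, total INTEGER, status TEXT);
        CREATE TABLE IF NOT EXISTS checkpoints (
            subscription_id TEXT PRIMARY KEY, last_position INTEGER NOT NULL);
    """)


def load_checkpoint(conn: sqlite3.Connection, subscription_id: str) -> int:
    row = conn.execute(
        "SELECT last_position FROM checkpoints WHERE subscription_id = ?",
        (subscription_id,)).fetchone()
    return row[0] if row else 0


def process_batch(conn: sqlite3.Connection, subscription_id: str,
                  events: list[dict]) -> int:
    """Apply events past the checkpoint; commit model and checkpoint together."""
    position = load_checkpoint(conn, subscription_id)
    with conn:  # one transaction: commit on success, roll back on any error
        for evt in events:
            if evt["global_position"] <= position:
                continue  # already processed before a restart
            data = evt["data"]
            if evt["event_type"] == "OrderPlaced":
                conn.execute(
                    "INSERT INTO order_summaries (id, customer, total, status) "
                    "VALUES (?, ?, ?, 'placed')",
                    (data["order_id"], data["customer_id"], data["total_cents"]))
            elif evt["event_type"] == "OrderShipped":
                conn.execute(
                    "UPDATE order_summaries SET status = 'shipped' WHERE id = ?",
                    (data["order_id"],))
            position = evt["global_position"]
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints (subscription_id, last_position) "
            "VALUES (?, ?)", (subscription_id, position))
    return position
```

If a handler fails midway, the whole batch rolls back and the checkpoint does not move, so the next run resumes from the same position. This only works when the read model and the checkpoint live in the same database.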

### Projection Example

```python
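import asyncio


class OrderSummaryProjection:
    def __init__(self, db, event_store: EventStore):
        self.db = db
        self.store = event_store

    async def run(self, batch_size: int = 100):
        # _load_checkpoint / _save_checkpoint (not shown) read and write the
        # subscription_checkpoints table.
        position = await self._load_checkpoint()
        while True:
            events = await self.store.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # caught up: poll again shortly
                continue
            for evt in events:
                await self._apply(evt)
                position = evt.global_position
            # Saved once per batch, separately from the handlers' writes: a
            # crash can replay part of a batch, so handlers must be idempotent.
            await self._save_checkpoint(position)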

    async def _apply(self, event: Event):
        match event.event_type:
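            case "OrderPlaced":
                # ON CONFLICT keeps the insert idempotent when an event is
                # replayed (assumes id is the table's primary key)
                await self.db.execute(
                    "INSERT INTO order_summaries (id, customer, total, status) "
                    "VALUES ($1,$2,$3,'placed') ON CONFLICT (id) DO NOTHING",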
                    event.data["order_id"], event.data["customer_id"],
                    event.data["total_cents"],
                )
            case "OrderShipped":
                await self.db.execute(
                    "UPDATE order_summaries SET status='shipped' "
                    "WHERE id=$1", event.data["order_id"],
                )
```

### Projection Design Rules

- **Idempotent handlers** — replaying the same event twice must not corrupt state
- **One projection per read model** — keep projections focused
- **Rebuild from scratch** — projections should be deletable and fully replayable
- **Separate storage** — projections can live in different databases (Postgres, Elasticsearch, Redis)
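
The first and third rules can be checked with a small in-memory projection: a handler that sets state by key is safe to replay, and the read model can be thrown away and rebuilt from the event log. The dict-based events and the `apply` / `rebuild` names are hypothetical.

```python
def apply(model: dict, event: dict) -> None:
    """Idempotent handler: writes state by key instead of accumulating."""
    data = event["data"]
    if event["event_type"] == "OrderPlaced":
        # setdefault: a replayed OrderPlaced never overwrites later state
        model.setdefault(data["order_id"], {
            "customer": data["customer_id"],
            "total": data["total_cents"],
            "status": "placed",
        })
    elif event["event_type"] == "OrderShipped":
        if data["order_id"] in model:
            model[data["order_id"]]["status"] = "shipped"


def rebuild(events: list[dict]) -> dict:
    """Discard the read model and replay the log in global order."""
    model: dict = {}
    for event in sorted(events, key=lambda e: e["global_position"]):
        apply(model, event)
    return model
```

Feeding the same log through `rebuild` once or twice over yields the same read model, which is the property a replay after a crash depends on.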

## Snapshotting

Snapshots accelerate aggregate rehydration by caching state at a known version.

Take snapshots when streams routinely exceed about 100 events or when rehydrating an aggregate is expensive. A common approach is a fixed cadence, for example one snapshot every 50 events.

### Snapshot Flow

```python
class SnapshottedRepository:
    def __init__(self, event_store: EventStore, pool):
        self.store = event_store
        self.pool = pool

    async def load(self, stream_id: str) -> Aggregate:
        # 1. Try loading snapshot
        snap = await self._load_snapshot(stream_id)
        from_version = 0
        aggregate = Aggregate(stream_id)

        if snap:
            aggregate.restore(snap["data"])
            from_version = snap["version"] + 1

        # 2. Replay events after snapshot
        events = await self.store.read_stream(stream_id, from_version)
        for evt in events:
            aggregate.apply(evt)

        # 3. Snapshot if too many events replayed
        if len(events) > 50:
            await self._save_snapshot(
                stream_id, aggregate.snapshot(), aggregate.version
            )

        return aggregate
```
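
The same flow can be exercised without a database. In this toy version the aggregate is a hypothetical `Account` whose state is a running balance, events are dicts with `version` and `amount`, and snapshots live in a plain dict. All of these are assumptions chosen to keep the example runnable.

```python
from dataclasses import dataclass

SNAPSHOT_EVERY = 50  # snapshot once more than this many events were replayed


@dataclass
class Account:
    balance: int = 0
    version: int = 0

    def apply(self, event: dict) -> None:
        self.balance += event["amount"]
        self.version = event["version"]


def load(events: list[dict], snapshots: dict,
         stream_id: str) -> tuple[Account, int]:
    """Rehydrate an account; returns (aggregate, number of events replayed)."""
    snap = snapshots.get(stream_id)
    # 1. Start from the snapshot if there is one
    account = Account(**snap) if snap else Account()
    # 2. Replay only the events recorded after the snapshot
    tail = [e for e in events if e["version"] > account.version]
    for event in tail:
        account.apply(event)
    # 3. Refresh the snapshot when the replay was long
    if len(tail) > SNAPSHOT_EVERY:
        snapshots[stream_id] = {"balance": account.balance,
                                "version": account.version}
    return account, len(tail)
```

The first load of a 120-event stream replays all 120 events and writes a snapshot; later loads replay only what was appended after it.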

## CQRS Integration

CQRS separates the write model (commands → events) from the read model (projections).

```
Commands ──► Aggregate ──► Event Store ──► Projections ──► Query API
 (write)     (domain)      (append)        (build)        (read)
```

### Key Principles

1. **Write side** validates commands, emits events, enforces invariants
2. **Read side** subscribes to events, builds optimised query models
3. **Eventual consistency** — reads may lag behind writes by milliseconds to seconds
4. **Independent scaling** — scale reads and writes separately

### Command Handler Pattern

```python
class PlaceOrderHandler:
    def __init__(self, event_store: EventStore):
        self.store = event_store

    async def handle(self, cmd: PlaceOrderCommand):
        # Load aggregate from events
        events = await self.store.read_stream(f"Order-{cmd.order_id}")
        order = Order.reconstitute(events)

        # Execute command — validates and produces new events
        new_events = order.place(cmd.customer_id, cmd.items)

        # Persist with concurrency check
        await self.store.append(
            f"Order-{cmd.order_id}", "Order", new_events,
            expected_version=order.version,
        )
```

## EventStoreDB Integration

```python
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json

client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode()
        )
        for event in events
    ]
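    # None disables the version check; -1 means "the stream must not exist yet"
    state = (StreamState.ANY if expected_revision is None
             else StreamState.NO_STREAM if expected_revision == -1
             else expected_revision)
    # current_version and events are passed as keyword arguments
    return client.append_to_stream(stream_name, current_version=state,
                                   events=new_events)

def read_stream(stream_name: str, from_revision: int = 0,
                resolve_links: bool = False):
    return [
        {'type': e.type, 'data': json.loads(e.data),
         'stream_position': e.stream_position}
        for e in client.get_stream(stream_name, stream_position=from_revision,
                                   resolve_links=resolve_links)
    ]

# Category projection: read all events for Order-* streams.
# $ce-<category> streams hold link events and exist only when the server's
# system projections are enabled, so links are resolved to the original events.
def read_category(category: str):
    return read_stream(f"$ce-{category}", resolve_links=True)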
```

## DynamoDB Event Store

```python
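import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
from datetime import datetime, timezone
import json, uuid

class ConcurrencyError(Exception):
    pass

class DynamoEventStore:
    def __init__(self, table_name: str):
        self.table = boto3.resource('dynamodb').Table(table_name)

    def append(self, stream_id: str, events: list, expected_version: int = 0):
        # batch_writer cannot attach conditions, so each event is written with
        # a conditional put that fails if that version already exists.
        # A multi-event append is not atomic here; that would need a
        # transactional write.
        for i, event in enumerate(events):
            version = expected_version + i + 1
            try:
                self.table.put_item(
                    Item={
                        'PK': f"STREAM#{stream_id}",
                        'SK': f"VERSION#{version:020d}",
                        'GSI1PK': 'EVENTS',
                        'GSI1SK': datetime.now(timezone.utc).isoformat(),
                        'event_id': str(uuid.uuid4()),
                        'event_type': event['type'],
                        'event_data': json.dumps(event['data']),
                        'version': version,
                    },
                    ConditionExpression='attribute_not_exists(PK)',
                )
            except ClientError as exc:
                code = exc.response['Error']['Code']
                if code == 'ConditionalCheckFailedException':
                    raise ConcurrencyError(
                        f"{stream_id} already has version {version}"
                    ) from exc
                raise

    def read_stream(self, stream_id: str, from_version: int = 0):
        query = {
            'KeyConditionExpression':
                Key('PK').eq(f"STREAM#{stream_id}") &
                Key('SK').gte(f"VERSION#{from_version:020d}")
        }
        items = []
        while True:  # follow pagination: one query returns at most 1 MB
            resp = self.table.query(**query)
            items.extend(resp['Items'])
            if 'LastEvaluatedKey' not in resp:
                break
            query['ExclusiveStartKey'] = resp['LastEvaluatedKey']
        return [
            {'event_type': item['event_type'],
             'data': json.loads(item['event_data']),
             'version': int(item['version'])}  # numbers come back as Decimal
            for item in items
        ]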
```

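**DynamoDB table design:** PK=`STREAM#{id}`, SK=`VERSION#{version}` (zero-padded so that string order matches numeric order), GSI1 (`GSI1PK`=`EVENTS`, `GSI1SK`=timestamp) for approximate global ordering by write time.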
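
The `:020d` padding in the sort key matters because DynamoDB compares string sort keys by their bytes, not numerically. A short illustration, with `stream_pk` and `version_sk` as hypothetical helper names:

```python
def stream_pk(stream_id: str) -> str:
    return f"STREAM#{stream_id}"


def version_sk(version: int) -> str:
    # Fixed width of 20 digits so string order equals numeric order
    return f"VERSION#{version:020d}"


versions = (2, 10, 1)

# Without padding, "10" sorts before "2" because "1" < "2" as characters
unpadded = sorted(f"VERSION#{v}" for v in versions)

# With padding, string order matches version order
padded = sorted(version_sk(v) for v in versions)
padded_as_numbers = [int(key.split("#")[1]) for key in padded]
```

Here `unpadded` comes out as version 1, 10, 2, while `padded_as_numbers` is `[1, 2, 10]`, which is the order a stream read needs.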

## Best Practices

### Do

- **Name streams `{Type}-{id}`** — e.g., `Order-abc123`
- **Include correlation / causation IDs** in metadata for tracing
- **Version event schemas from day one** — plan for evolution
- **Implement idempotent writes** — use event IDs for deduplication
- **Index for your query patterns** — stream, global position, event type

### Don't

- **Mutate or delete events** — they are immutable facts
- **Store large payloads** — keep events small; reference blobs externally
- **Skip optimistic concurrency** — prevents data corruption
- **Ignore backpressure** — handle slow consumers gracefully
- **Couple projections to the write model** — projections should be independently deployable
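
For the backpressure point, one simple in-process technique is a bounded queue between the reader and the consumer: when the consumer falls behind, the producer is suspended instead of buffering without limit. The sketch below uses `asyncio.Queue`; `run_pipeline` and the integer "events" are made up for the example.

```python
import asyncio


async def run_pipeline(n_events: int, max_buffer: int) -> tuple[list[int], int]:
    """Push n_events through a bounded buffer; returns (processed, peak size)."""
    queue = asyncio.Queue(maxsize=max_buffer)
    processed: list[int] = []
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for position in range(1, n_events + 1):
            await queue.put(position)  # suspends while the buffer is full
            peak = max(peak, queue.qsize())
        await queue.put(None)          # sentinel: no more events

    async def consumer() -> None:
        while (position := await queue.get()) is not None:
            await asyncio.sleep(0)     # stand-in for slow projection work
            processed.append(position)

    await asyncio.gather(producer(), consumer())
    return processed, peak
```

Calling `asyncio.run(run_pipeline(20, 5))` processes all 20 positions in order while the buffer never holds more than 5 items. Across processes the same idea usually takes the form of pull-based reads with a batch limit, as in `read_all(from_position, limit)`.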

## NEVER Do

- **NEVER update or delete events** — Events are immutable historical facts; create compensating events instead
- **NEVER skip version checks on append** — Optimistic concurrency prevents lost updates and corruption
- **NEVER embed large blobs in events** — Store blobs externally, reference by ID in the event
- **NEVER use random UUIDs for event IDs without idempotency checks** — Retries create duplicates
- **NEVER read projections for command validation** — Use the event stream as the source of truth
- **NEVER couple projections to the write transaction** — Projections must be rebuildable independently
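
One way to satisfy the event ID rule is to derive IDs deterministically from the command that caused them, so a retried command produces the same IDs and the store can reject the duplicates. This sketch uses `uuid.uuid5`; the namespace URL, the `command_id` parameter, and the function name are assumptions.

```python
import uuid

# Hypothetical namespace; any fixed UUID works as long as it never changes
EVENT_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL,
                             "https://example.invalid/event-ids")


def deterministic_event_id(stream_id: str, command_id: str,
                           index: int) -> uuid.UUID:
    """Same stream, command, and index always yield the same event ID."""
    return uuid.uuid5(EVENT_NAMESPACE, f"{stream_id}/{command_id}/{index}")


first_try = deterministic_event_id("Order-abc123", "cmd-42", 0)
retry = deterministic_event_id("Order-abc123", "cmd-42", 0)
next_event = deterministic_event_id("Order-abc123", "cmd-42", 1)
```

With the PostgreSQL schema in this guide, `id` is the primary key, so inserting `retry` after `first_try` would fail on the key instead of storing the event twice.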
