event-store
Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, implementing event persistence, projections, snapshotting, or CQRS patterns.
Best use case
event-store is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, implementing event persistence, projections, snapshotting, or CQRS patterns.
Teams using event-store should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/event-store/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How event-store Compares
| Feature / Agent | event-store | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, implementing event persistence, projections, snapshotting, or CQRS patterns.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Event Store
Guide to designing event stores for event-sourced applications — covering event schemas, projections, snapshotting, and CQRS integration.
## Installation
### OpenClaw / Moltbot / Clawbot
```bash
npx clawhub@latest install event-store
```
## When to Use This Skill
- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Building projections from event streams
- Adding snapshotting for aggregate performance
- Integrating CQRS with event sourcing
## Core Concepts
### Event Store Architecture
```
┌─────────────────────────────────────────────────────┐
│ Event Store │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │
│ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Event 1 │ │ Event 1 │ │ Event 1 │ │
│ │ Event 2 │ │ Event 2 │ │ Event 2 │ │
│ │ Event 3 │ │ ... │ │ Event 3 │ │
│ │ ... │ │ │ │ Event 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘
```
### Event Store Requirements
| Requirement | Description |
| ----------------- | ---------------------------------- |
| **Append-only** | Events are immutable, only appends |
| **Ordered** | Per-stream and global ordering |
| **Versioned** | Optimistic concurrency control |
| **Subscriptions** | Real-time event notifications |
| **Idempotent** | Handle duplicate writes safely |
### Technology Comparison
| Technology | Best For | Limitations |
| ---------------- | ----------------------- | -------------------------------- |
| **EventStoreDB** | Pure event sourcing | Single-purpose |
| **PostgreSQL** | Existing Postgres stack | Manual implementation |
| **Kafka** | High-throughput streams | Not ideal for per-stream queries |
| **DynamoDB** | Serverless, AWS-native | Query limitations |
## Event Schema Design
Events are the source of truth. Well-designed schemas ensure long-term evolvability.
### Event Envelope Structure
```json
{
"event_id": "uuid",
"stream_id": "Order-abc123",
"event_type": "OrderPlaced",
"version": 1,
"schema_version": 1,
"data": {
"customer_id": "cust-1",
"total_cents": 5000
},
"metadata": {
"correlation_id": "req-xyz",
"causation_id": "evt-prev",
"user_id": "user-1",
"timestamp": "2025-01-15T10:30:00Z"
},
"global_position": 42
}
```
### Schema Evolution Rules
1. **Add fields freely** — new optional fields are always safe
2. **Never remove or rename fields** — introduce a new event type instead
3. **Version event types** — `OrderPlacedV2` when the schema changes materially
4. **Upcast on read** — transform old versions to the current shape in the deserializer
## PostgreSQL Event Store Schema
```sql
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version BIGINT NOT NULL,
global_position BIGSERIAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);
CREATE INDEX idx_events_stream ON events(stream_id, version);
CREATE INDEX idx_events_global ON events(global_position);
CREATE INDEX idx_events_type ON events(event_type);
CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
stream_type VARCHAR(255) NOT NULL,
snapshot_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE subscription_checkpoints (
subscription_id VARCHAR(255) PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
## Event Store Implementation
```python
@dataclass
class Event:
stream_id: str
event_type: str
data: dict
metadata: dict = field(default_factory=dict)
event_id: UUID = field(default_factory=uuid4)
version: int | None = None
global_position: int | None = None
class EventStore: # backed by PostgreSQL schema above
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def append(self, stream_id: str, stream_type: str,
events: list[Event],
expected_version: int | None = None) -> list[Event]:
"""Append events with optimistic concurrency control."""
async with self.pool.acquire() as conn:
async with conn.transaction():
if expected_version is not None:
current = await conn.fetchval(
"SELECT MAX(version) FROM events "
"WHERE stream_id = $1", stream_id
) or 0
if current != expected_version:
raise ConcurrencyError(
f"Expected {expected_version}, got {current}"
)
start = await conn.fetchval(
"SELECT COALESCE(MAX(version), 0) + 1 "
"FROM events WHERE stream_id = $1", stream_id
)
for i, evt in enumerate(events):
evt.version = start + i
row = await conn.fetchrow(
"INSERT INTO events (id, stream_id, stream_type, "
"event_type, event_data, metadata, version) "
"VALUES ($1,$2,$3,$4,$5,$6,$7) "
"RETURNING global_position",
evt.event_id, stream_id, stream_type,
evt.event_type, json.dumps(evt.data),
json.dumps(evt.metadata), evt.version,
)
evt.global_position = row["global_position"]
return events
async def read_stream(self, stream_id: str,
from_version: int = 0) -> list[Event]:
"""Read events for a single stream."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"SELECT * FROM events WHERE stream_id = $1 "
"AND version >= $2 ORDER BY version",
stream_id, from_version,
)
return [self._to_event(r) for r in rows]
async def read_all(self, from_position: int = 0,
limit: int = 1000) -> list[Event]:
"""Read global event stream for projections / subscriptions."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"SELECT * FROM events WHERE global_position > $1 "
"ORDER BY global_position LIMIT $2",
from_position, limit,
)
return [self._to_event(r) for r in rows]
```
## Projections
Projections build read-optimised views by replaying events. They are the "Q" side of CQRS.
### Projection Lifecycle
1. **Start from checkpoint** — resume from last processed global position
2. **Apply events** — update the read model for each relevant event type
3. **Save checkpoint** — persist the new position atomically with the read model
### Projection Example
```python
class OrderSummaryProjection:
def __init__(self, db, event_store: EventStore):
self.db = db
self.store = event_store
async def run(self, batch_size: int = 100):
position = await self._load_checkpoint()
while True:
events = await self.store.read_all(position, batch_size)
if not events:
await asyncio.sleep(1)
continue
for evt in events:
await self._apply(evt)
position = evt.global_position
await self._save_checkpoint(position)
async def _apply(self, event: Event):
match event.event_type:
case "OrderPlaced":
await self.db.execute(
"INSERT INTO order_summaries (id, customer, total, status) "
"VALUES ($1,$2,$3,'placed')",
event.data["order_id"], event.data["customer_id"],
event.data["total_cents"],
)
case "OrderShipped":
await self.db.execute(
"UPDATE order_summaries SET status='shipped' "
"WHERE id=$1", event.data["order_id"],
)
```
### Projection Design Rules
- **Idempotent handlers** — replaying the same event twice must not corrupt state
- **One projection per read model** — keep projections focused
- **Rebuild from scratch** — projections should be deletable and fully replayable
- **Separate storage** — projections can live in different databases (Postgres, Elasticsearch, Redis)
## Snapshotting
Snapshots accelerate aggregate rehydration by caching state at a known version.
Use when streams exceed ~100 events, aggregates have expensive rehydration, or on a cadence (e.g., every 50 events).
### Snapshot Flow
```python
class SnapshottedRepository:
def __init__(self, event_store: EventStore, pool):
self.store = event_store
self.pool = pool
async def load(self, stream_id: str) -> Aggregate:
# 1. Try loading snapshot
snap = await self._load_snapshot(stream_id)
from_version = 0
aggregate = Aggregate(stream_id)
if snap:
aggregate.restore(snap["data"])
from_version = snap["version"] + 1
# 2. Replay events after snapshot
events = await self.store.read_stream(stream_id, from_version)
for evt in events:
aggregate.apply(evt)
# 3. Snapshot if too many events replayed
if len(events) > 50:
await self._save_snapshot(
stream_id, aggregate.snapshot(), aggregate.version
)
return aggregate
```
## CQRS Integration
CQRS separates the write model (commands → events) from the read model (projections).
```
Commands ──► Aggregate ──► Event Store ──► Projections ──► Query API
(write) (domain) (append) (build) (read)
```
### Key Principles
1. **Write side** validates commands, emits events, enforces invariants
2. **Read side** subscribes to events, builds optimised query models
3. **Eventual consistency** — reads may lag behind writes by milliseconds to seconds
4. **Independent scaling** — scale reads and writes separately
### Command Handler Pattern
```python
class PlaceOrderHandler:
def __init__(self, event_store: EventStore):
self.store = event_store
async def handle(self, cmd: PlaceOrderCommand):
# Load aggregate from events
events = await self.store.read_stream(f"Order-{cmd.order_id}")
order = Order.reconstitute(events)
# Execute command — validates and produces new events
new_events = order.place(cmd.customer_id, cmd.items)
# Persist with concurrency check
await self.store.append(
f"Order-{cmd.order_id}", "Order", new_events,
expected_version=order.version,
)
```
## EventStoreDB Integration
```python
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")
def append_events(stream_name: str, events: list, expected_revision=None):
new_events = [
NewEvent(
type=event['type'],
data=json.dumps(event['data']).encode(),
metadata=json.dumps(event.get('metadata', {})).encode()
)
for event in events
]
state = (StreamState.ANY if expected_revision is None
else StreamState.NO_STREAM if expected_revision == -1
else expected_revision)
return client.append_to_stream(stream_name, new_events, current_version=state)
def read_stream(stream_name: str, from_revision: int = 0):
return [
{'type': e.type, 'data': json.loads(e.data),
'stream_position': e.stream_position}
for e in client.get_stream(stream_name, stream_position=from_revision)
]
# Category projection: read all events for Order-* streams
def read_category(category: str):
return read_stream(f"$ce-{category}")
```
## DynamoDB Event Store
```python
import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime
import json, uuid
class DynamoEventStore:
def __init__(self, table_name: str):
self.table = boto3.resource('dynamodb').Table(table_name)
def append(self, stream_id: str, events: list, expected_version: int = 0):
with self.table.batch_writer() as batch:
for i, event in enumerate(events):
version = expected_version + i + 1
batch.put_item(Item={
'PK': f"STREAM#{stream_id}",
'SK': f"VERSION#{version:020d}",
'GSI1PK': 'EVENTS',
'GSI1SK': datetime.utcnow().isoformat(),
'event_id': str(uuid.uuid4()),
'event_type': event['type'],
'event_data': json.dumps(event['data']),
'version': version,
})
def read_stream(self, stream_id: str, from_version: int = 0):
resp = self.table.query(
KeyConditionExpression=
Key('PK').eq(f"STREAM#{stream_id}") &
Key('SK').gte(f"VERSION#{from_version:020d}")
)
return [
{'event_type': item['event_type'],
'data': json.loads(item['event_data']),
'version': item['version']}
for item in resp['Items']
]
```
**DynamoDB table design:** PK=`STREAM#{id}`, SK=`VERSION#{version}`, GSI1 for global ordering.
## Best Practices
### Do
- **Name streams `{Type}-{id}`** — e.g., `Order-abc123`
- **Include correlation / causation IDs** in metadata for tracing
- **Version event schemas from day one** — plan for evolution
- **Implement idempotent writes** — use event IDs for deduplication
- **Index for your query patterns** — stream, global position, event type
### Don't
- **Mutate or delete events** — they are immutable facts
- **Store large payloads** — keep events small; reference blobs externally
- **Skip optimistic concurrency** — prevents data corruption
- **Ignore backpressure** — handle slow consumers gracefully
- **Couple projections to the write model** — projections should be independently deployable
## NEVER Do
- **NEVER update or delete events** — Events are immutable historical facts; create compensating events instead
- **NEVER skip version checks on append** — Optimistic concurrency prevents lost updates and corruption
- **NEVER embed large blobs in events** — Store blobs externally, reference by ID in the event
- **NEVER use random UUIDs for event IDs without idempotency checks** — Retries create duplicates
- **NEVER read projections for command validation** — Use the event stream as the source of truth
- **NEVER couple projections to the write transaction** — Projections must be rebuildable independentlyRelated Skills
schema-markup
Add, fix, or optimize schema markup and structured data. Use when the user mentions schema markup, structured data, JSON-LD, rich snippets, schema.org, FAQ schema, product schema, review schema, or breadcrumb schema.
prompt-engineering
Master advanced prompt engineering techniques to maximize LLM performance, reliability, and controllability in production. Use when optimizing prompts, improving LLM outputs, designing production prompt templates, or building AI-powered features.
professional-communication
Write effective professional messages for software teams. Use when drafting emails, Slack/Teams messages, meeting agendas, status updates, or translating technical concepts for non-technical audiences. Triggers on email, slack, teams, message, meeting agenda, status update, stakeholder communication, escalation, jargon translation.
persona-docs
Create persona documentation for a product or codebase. Use when asked to create persona docs, document target users, define user journeys, document onboarding flows, or when starting a new product and needing to define its audience. Persona docs should be the first documentation created for any product.
mermaid-diagrams
Create software diagrams using Mermaid syntax. Use when users need to create, visualize, or document software through diagrams including class diagrams, sequence diagrams, flowcharts, ERDs, C4 architecture diagrams, state diagrams, git graphs, and other diagram types. Triggers include requests to diagram, visualize, model, map out, or show the flow of a system.
game-changing-features
Find 10x product opportunities and high-leverage improvements. Use when the user wants strategic product thinking, mentions 10x, wants to find high-impact features, or asks what would make a product dramatically more valuable.
clear-writing
Write clear, concise prose for humans — documentation, READMEs, API docs, commit messages, error messages, UI text, reports, and explanations. Combines Strunk's rules for clearer prose with technical documentation patterns, structure templates, and review checklists.
brainstorming
Explore ideas before implementation through collaborative dialogue. Use before any creative work — creating features, building components, adding functionality, or modifying behavior. Turns ideas into fully formed designs and specs through structured conversation.
Article Illustrator
When the user wants to add illustrations to an article or blog post. Triggers on: "illustrate article", "add images to article", "generate illustrations", "article images", or requests to visually enhance written content. Analyzes article structure, identifies positions for visual aids, and generates illustrations using a Type x Style two-dimension approach.
subagent-driven-development
Execute implementation plans by dispatching a fresh subagent per task with two-stage review (spec compliance then code quality). Use when you have an implementation plan with mostly independent tasks and want high-quality, fast iteration within a single session.
skill-judge
Evaluate Agent Skill quality against official specifications. Use when reviewing SKILL.md files, auditing skill packages, improving skill design, or checking if a skill follows best practices. Provides 8-dimension scoring (120 points) with actionable improvements. Triggers on review skill, evaluate skill, audit skill, improve skill, skill quality, SKILL.md review.
skill-creator
WHAT: Guide for creating effective AI agent skills - modular packages that extend Claude's capabilities with specialized knowledge, workflows, and tools. WHEN: User wants to create, write, author, or update a skill. User asks about skill structure, SKILL.md format, or how to package domain knowledge for AI agents. KEYWORDS: "create a skill", "make a skill", "new skill", "skill template", "SKILL.md", "agent skill", "write a skill", "skill structure", "package a skill"