event-store-design

Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.

31,392 stars
Complexity: easy

About this skill

This skill gives an AI agent the knowledge to guide users through designing and implementing event stores for event-sourced applications. It covers building event sourcing infrastructure, evaluating and selecting event store technologies (e.g., EventStoreDB, Kafka, relational databases), and applying event persistence patterns. With it, the agent can give specific advice on architectural decisions that affect scalability, consistency, and maintainability.

Best use case

Assisting developers and architects in designing event stores for new or existing event-sourced applications, selecting optimal event store technologies, implementing advanced event persistence patterns, and building resilient event sourcing infrastructure.

The user will receive actionable, validated steps and best practices for designing their event store, including recommendations for technology choices, implementation patterns, and a clear understanding of how to build robust event sourcing infrastructure. The outcome will be a structured design strategy tailored to their specific event sourcing goals.

Practical example

Example input

I'm building a new microservices application with event sourcing for an e-commerce platform. Can you help me design the event store, considering high availability, scalability, and data consistency for order processing? We are primarily looking at AWS services.

Example output

Certainly! To design an optimal event store for your AWS-based e-commerce microservices, let's clarify your specific goals and constraints. 

1.  **Clarify Goals & Constraints**: What are your projected event volumes for order processing (e.g., events/second), required read/write latency, consistency requirements (e.g., eventual consistency for product catalog vs. strong consistency for payment authorization), and any specific AWS services you've already considered (e.g., DynamoDB, Aurora, Kinesis)?
2.  **Apply Best Practices & Validate**: Based on your requirements, we can explore suitable AWS services. For instance, Amazon DynamoDB could serve as a highly scalable and available event log, while Amazon Kinesis might be used for real-time event streaming and processing. We'll discuss schema evolution, idempotent event handling, and resilience patterns.
3.  **Provide Actionable Steps**: I will outline a phased approach, starting with defining your core aggregates (e.g., Order, Customer) and their associated event schemas. Next, we'll look at infrastructure setup, including initial DynamoDB table design or Kinesis stream configuration, and then implement basic event publishing and subscription mechanisms.
4.  **Detailed Examples**: If specific code examples or architectural diagrams are needed, I can reference `resources/implementation-playbook.md` to illustrate patterns like snapshotting or read model projections.

To begin, can you elaborate on your projected event volume for peak order processing and your specific consistency requirements for critical business operations?

When to use this skill

  • Use this skill when an AI agent needs to provide expert guidance on designing event stores for event-sourced systems, advise on selecting suitable event store technologies (e.g., EventStoreDB, Kafka, Amazon DynamoDB), implement various event persistence patterns (e.g., snapshotting, projections), or develop overall event sourcing infrastructure.

When not to use this skill

  • Do not use this skill if the task is unrelated to the design or implementation of event stores, or if the requirement involves a different domain, tool, or technical aspect outside the scope of event-sourced system persistence.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/event-store-design/SKILL.md --create-dirs "https://raw.githubusercontent.com/sickn33/antigravity-awesome-skills/main/plugins/antigravity-awesome-skills-claude/skills/event-store-design/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/event-store-design/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How event-store-design Compares

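| Feature / Agent         | event-store-design | Standard Approach |
| ----------------------- | ------------------ | ----------------- |
| Platform Support        | Claude             | Limited / Varies  |
| Context Awareness       | High               | Baseline          |
| Installation Complexity | easy               | N/A               |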

Frequently Asked Questions

What does this skill do?

Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.

Which AI agents support this skill?

This skill is designed for Claude.

How difficult is it to install?

The installation complexity is rated as easy. You can find the installation instructions above.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Event Store Design

Comprehensive guide to designing event stores for event-sourced applications.

## Do not use this skill when

- The task is unrelated to event store design
- You need a different domain or tool outside this scope

## Instructions

- Clarify goals, constraints, and required inputs.
- Apply relevant best practices and validate outcomes.
- Provide actionable steps and verification.
- If detailed examples are required, open `resources/implementation-playbook.md`.

## Use this skill when

- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling

## Core Concepts

### 1. Event Store Architecture

```
┌─────────────────────────────────────────────────────┐
│                    Event Store                       │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │   Stream 1   │  │   Stream 2   │  │   Stream 3   │ │
│  │ (Aggregate)  │  │ (Aggregate)  │  │ (Aggregate)  │ │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤ │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │ │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │ │
│  │ Event 3     │  │ ...         │  │ Event 3     │ │
│  │ ...         │  │             │  │ Event 4     │ │
│  └─────────────┘  └─────────────┘  └─────────────┘ │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...     │
└─────────────────────────────────────────────────────┘
```

### 2. Event Store Requirements

| Requirement       | Description                        |
| ----------------- | ---------------------------------- |
| **Append-only**   | Events are immutable; append only  |
| **Ordered**       | Per-stream and global ordering     |
| **Versioned**     | Optimistic concurrency control     |
| **Subscriptions** | Real-time event notifications      |
| **Idempotent**    | Handle duplicate writes safely     |
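
The requirements above can be illustrated with a small in-memory store. This is only a sketch for reasoning about the contract, not a production design: the names `InMemoryEventStore`, `StoredEvent`, and `append` are hypothetical, and it assumes a single process with no persistence. It shows per-stream versions, a global position, an `expected_version` check for optimistic concurrency, and deduplication by event ID.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


class ConcurrencyError(Exception):
    """Raised when the stream is not at the version the writer expected."""


@dataclass(frozen=True)
class StoredEvent:
    event_id: str
    stream_id: str
    event_type: str
    data: dict
    version: int          # position within the stream, starting at 1
    global_position: int  # position across all streams, starting at 1


@dataclass
class InMemoryEventStore:
    """Hypothetical single-process store used only to illustrate the contract."""
    _log: List[StoredEvent] = field(default_factory=list)
    _streams: Dict[str, List[StoredEvent]] = field(default_factory=dict)
    _seen_ids: Set[str] = field(default_factory=set)

    def append(
        self,
        stream_id: str,
        event_id: str,
        event_type: str,
        data: dict,
        expected_version: Optional[int] = None,
    ) -> Optional[StoredEvent]:
        # Idempotent: a retried write with a known event ID is ignored.
        if event_id in self._seen_ids:
            return None
        stream = self._streams.setdefault(stream_id, [])
        # Versioned: reject the write if another writer got there first.
        if expected_version is not None and len(stream) != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, got {len(stream)}"
            )
        # Append-only and ordered: events are never updated or removed.
        event = StoredEvent(
            event_id=event_id,
            stream_id=stream_id,
            event_type=event_type,
            data=data,
            version=len(stream) + 1,
            global_position=len(self._log) + 1,
        )
        stream.append(event)
        self._log.append(event)
        self._seen_ids.add(event_id)
        return event

    def read_stream(self, stream_id: str) -> List[StoredEvent]:
        return list(self._streams.get(stream_id, []))

    def read_all(self, after_position: int = 0) -> List[StoredEvent]:
        # Global positions are 1-based, so slicing skips everything already seen.
        return self._log[after_position:]
```

A subscriber in this model would poll `read_all(last_seen_position)`, which mirrors the checkpoint-based subscription used in the PostgreSQL template.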

## Technology Comparison

| Technology       | Best For                  | Limitations                      |
| ---------------- | ------------------------- | -------------------------------- |
| **EventStoreDB** | Pure event sourcing       | Single-purpose                   |
| **PostgreSQL**   | Existing Postgres stack   | Manual implementation            |
| **Kafka**        | High-throughput streaming | Not ideal for per-stream queries |
| **DynamoDB**     | Serverless, AWS-native    | Query limitations                |
| **Marten**       | .NET ecosystems           | .NET specific                    |

## Templates

### Template 1: PostgreSQL Event Store Schema

```sql
-- Events table
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Stream queries use the index that backs unique_stream_version
-- (stream_id, version), so no separate index is needed.

-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table
CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Subscriptions checkpoint table
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
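
The `snapshots` table stores an aggregate's state as of a given `version`, so a reader only needs to replay events newer than the snapshot. A minimal sketch of that rehydration step follows. The function names (`rehydrate`, `apply_order_event`) and the order-shaped state are assumptions made for illustration, and events are represented as plain dicts with `version`, `event_type`, and `event_data` keys matching the column names above.

```python
from typing import Callable, Iterable, Optional, Tuple

State = dict
EventRow = dict  # {"version": int, "event_type": str, "event_data": dict}


def apply_order_event(state: State, event: EventRow) -> State:
    """Hypothetical reducer for an order aggregate."""
    new_state = dict(state)
    if event["event_type"] == "ItemAdded":
        new_state["total"] = state.get("total", 0) + event["event_data"]["price"]
    elif event["event_type"] == "OrderShipped":
        new_state["status"] = "shipped"
    return new_state


def rehydrate(
    snapshot: Optional[Tuple[int, State]],
    events: Iterable[EventRow],
    apply: Callable[[State, EventRow], State],
    initial: State,
) -> Tuple[int, State]:
    """Rebuild state from an optional (version, state) snapshot plus events."""
    if snapshot is not None:
        version, state = snapshot[0], dict(snapshot[1])
    else:
        version, state = 0, dict(initial)
    for event in sorted(events, key=lambda e: e["version"]):
        if event["version"] <= version:
            continue  # already reflected in the snapshot
        state = apply(state, event)
        version = event["version"]
    return version, state
```

In practice the caller would fetch only events with `version` greater than the snapshot's version; the skip inside the loop just keeps the function safe if older events are passed in.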

### Template 2: Python Event Store Implementation

```python
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, List
from uuid import UUID, uuid4

import asyncpg


def _utc_now() -> datetime:
    """Timezone-aware UTC timestamp; a naive utcnow() value would be
    interpreted as local time when written to a TIMESTAMPTZ column."""
    return datetime.now(timezone.utc)


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    created_at: datetime = field(default_factory=_utc_now)


class EventStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Check expected version
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events WHERE stream_id = $1",
                        stream_id
                    )
                    current = current or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected version {expected_version}, got {current}"
                        )

                # Get starting version
                start_version = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
                    stream_id
                )

                # Insert events
                saved_events = []
                for i, event in enumerate(events):
                    event.version = start_version + i
                    row = await conn.fetchrow(
                        """
                        INSERT INTO events (id, stream_id, stream_type, event_type,
                                          event_data, metadata, version, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        RETURNING global_position
                        """,
                        event.event_id,
                        stream_id,
                        stream_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.version,
                        event.created_at
                    )
                    event.global_position = row['global_position']
                    saved_events.append(event)

                return saved_events

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read events from a stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read all events globally."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler,
        from_position: int = 0,
        batch_size: int = 100
    ):
        """Subscribe to all events from a position."""
        # Get checkpoint
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id
            )
            position = checkpoint or from_position

        while True:
            events = await self.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # Poll interval
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Save checkpoint
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position
                )

    def _row_to_event(self, row) -> Event:
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=json.loads(row['event_data']),
            metadata=json.loads(row['metadata']),
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at']
        )


class ConcurrencyError(Exception):
    """Raised when optimistic concurrency check fails."""
    pass
```

### Template 3: EventStoreDB Usage

```python
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json

# Connect
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

# Append events
def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode()
        )
        for event in events
    ]

    if expected_revision is None:
        state = StreamState.ANY
    elif expected_revision == -1:
        state = StreamState.NO_STREAM
    else:
        state = expected_revision

    return client.append_to_stream(
        stream_name=stream_name,
        events=new_events,
        current_version=state
    )

# Read stream
def read_stream(stream_name: str, from_revision: int = 0):
    events = client.get_stream(
        stream_name=stream_name,
        stream_position=from_revision
    )
    return [
        {
            'type': event.type,
            'data': json.loads(event.data),
            'metadata': json.loads(event.metadata) if event.metadata else {},
            'stream_position': event.stream_position,
            'commit_position': event.commit_position
        }
        for event in events
    ]

# Subscribe to all
def subscribe_to_all(handler, from_position=None):
    # The synchronous client returns a blocking iterator, so use a plain
    # for loop. Pass None to start from the beginning of the log.
    subscription = client.subscribe_to_all(commit_position=from_position)
    for event in subscription:
        handler({
            'type': event.type,
            'data': json.loads(event.data),
            'stream_id': event.stream_name,
            'position': event.commit_position
        })

# Category projection ($ce-Category)
def read_category(category: str):
    """Read all events for a category using system projection."""
    return read_stream(f"$ce-{category}")
```

### Template 4: DynamoDB Event Store

```python
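import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
from datetime import datetime, timezone
import json
import uuid

class DynamoEventStore:
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list, expected_version: int = None):
        """Append events with a conditional write per event for concurrency.

        batch_writer() cannot attach conditions, so each event is written with
        put_item and fails if that stream version already exists. The writes
        are not atomic as a group.
        """
        for i, event in enumerate(events):
            version = (expected_version or 0) + i + 1
            now = datetime.now(timezone.utc).isoformat()
            item = {
                'PK': f"STREAM#{stream_id}",
                'SK': f"VERSION#{version:020d}",
                'GSI1PK': 'EVENTS',
                'GSI1SK': now,
                'event_id': str(uuid.uuid4()),
                'stream_id': stream_id,
                'event_type': event['type'],
                'event_data': json.dumps(event['data']),
                'version': version,
                'created_at': now
            }
            try:
                # Reject the write if this (PK, SK) pair is already taken.
                self.table.put_item(
                    Item=item,
                    ConditionExpression='attribute_not_exists(PK)'
                )
            except ClientError as exc:
                if exc.response['Error']['Code'] == 'ConditionalCheckFailedException':
                    raise RuntimeError(
                        f"Stream {stream_id} already has version {version}"
                    ) from exc
                raise
        return events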

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Read events from a stream."""
        response = self.table.query(
            KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
                                  Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {
                'event_type': item['event_type'],
                'data': json.loads(item['event_data']),
                'version': item['version']
            }
            for item in response['Items']
        ]

# Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
  - PK (Partition Key): String
  - SK (Sort Key): String
  - GSI1PK, GSI1SK for global ordering

Capacity: On-demand or provisioned based on throughput needs
"""
```

## Best Practices

### Do's

- **Use stream IDs that include aggregate type** - `Order-{uuid}`
- **Include correlation/causation IDs** - For tracing
- **Version events from day one** - Plan for schema evolution
- **Implement idempotency** - Use event IDs for deduplication
- **Index appropriately** - For your query patterns

### Don'ts

- **Don't update or delete events** - They're immutable facts
- **Don't store large payloads** - Keep events small
- **Don't skip optimistic concurrency** - Prevents data corruption
- **Don't ignore backpressure** - Handle slow consumers
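
Backpressure can be sketched with a bounded `asyncio.Queue`: when the consumer falls behind, `put` suspends the producer instead of letting unprocessed events pile up in memory. The function names and the `max_in_flight` parameter are hypothetical, and the `asyncio.sleep(0)` call merely stands in for real handler work.

```python
import asyncio
from typing import List, Optional


async def producer(queue: asyncio.Queue, events: List[int]) -> None:
    for event in events:
        # Suspends here while the queue is full, slowing the producer down.
        await queue.put(event)
    await queue.put(None)  # sentinel: no more events


async def consumer(queue: asyncio.Queue, handled: List[int]) -> None:
    while True:
        event: Optional[int] = await queue.get()
        if event is None:
            break
        await asyncio.sleep(0)  # stand-in for a slow handler
        handled.append(event)


async def run_pipeline(events: List[int], max_in_flight: int = 2) -> List[int]:
    # The bound on the queue is what creates backpressure.
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_in_flight)
    handled: List[int] = []
    await asyncio.gather(producer(queue, events), consumer(queue, handled))
    return handled
```

Calling `asyncio.run(run_pipeline(list(range(10))))` processes the events in order while never holding more than `max_in_flight` items in the queue.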

## Resources

- [EventStoreDB](https://www.eventstore.com/)
- [Marten Events](https://martendb.io/events/)
- [Event Sourcing Pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/event-sourcing)
