add-outbox-pattern

Add transactional outbox pattern for reliable event publishing with RavenDB (project)

181 stars

Best use case

add-outbox-pattern is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Add transactional outbox pattern for reliable event publishing with RavenDB (project)

Teams using add-outbox-pattern should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/add-outbox-pattern/SKILL.md --create-dirs "https://raw.githubusercontent.com/majiayu000/claude-skill-registry/main/skills/data/add-outbox-pattern/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/add-outbox-pattern/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How add-outbox-pattern Compares

Feature / Agentadd-outbox-patternStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Add transactional outbox pattern for reliable event publishing with RavenDB (project)

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Add Outbox Pattern Skill

Implement the transactional outbox pattern for reliable event publishing in NovaTune using RavenDB.

## Overview

The outbox pattern ensures exactly-once event publishing by:
1. Writing events to an `OutboxMessages` collection in the same transaction as domain changes
2. A background processor reads and publishes events, then marks them as processed
3. Guarantees no lost events even if Kafka/Redpanda is temporarily unavailable

## Steps

### 1. Create OutboxMessage Model

Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Models/OutboxMessage.cs`

```csharp
namespace NovaTuneApp.ApiService.Models;

/// <summary>
/// Represents an event pending publication to the message broker.
/// </summary>
public sealed class OutboxMessage
{
    /// <summary>
    /// RavenDB document ID (e.g., "OutboxMessages/01HXK...")
    /// </summary>
    public string Id { get; init; } = string.Empty;

    /// <summary>
    /// Event type name for deserialization/routing.
    /// </summary>
    public required string EventType { get; init; }

    /// <summary>
    /// JSON-serialized event payload.
    /// </summary>
    public required string Payload { get; init; }

    /// <summary>
    /// Kafka partition key for ordering guarantees.
    /// </summary>
    public required string PartitionKey { get; init; }

    /// <summary>
    /// Target topic name (without prefix).
    /// </summary>
    public string? Topic { get; init; }

    /// <summary>
    /// When the outbox message was created.
    /// </summary>
    public required DateTimeOffset CreatedAt { get; init; }

    /// <summary>
    /// When the message was published (null if pending).
    /// </summary>
    public DateTimeOffset? ProcessedAt { get; set; }

    /// <summary>
    /// Number of publication attempts.
    /// </summary>
    public int Attempts { get; set; }

    /// <summary>
    /// Last error message if publication failed.
    /// </summary>
    public string? LastError { get; set; }
}
```

### 2. Create RavenDB Index for Pending Messages

Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Indexes/OutboxMessages_ByPending.cs`

```csharp
using Raven.Client.Documents.Indexes;
using NovaTuneApp.ApiService.Models;

namespace NovaTuneApp.ApiService.Infrastructure.Indexes;

public class OutboxMessages_ByPending : AbstractIndexCreationTask<OutboxMessage>
{
    public OutboxMessages_ByPending()
    {
        Map = messages => from msg in messages
                          where msg.ProcessedAt == null
                          select new
                          {
                              msg.CreatedAt,
                              msg.Attempts
                          };
    }
}
```

### 3. Create Outbox Service Interface

Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Services/IOutboxService.cs`

```csharp
namespace NovaTuneApp.ApiService.Services;

/// <summary>
/// Service for writing events to the outbox.
/// </summary>
public interface IOutboxService
{
    /// <summary>
    /// Writes an event to the outbox within the current session.
    /// Must be called before SaveChangesAsync().
    /// </summary>
    Task WriteAsync<TEvent>(
        TEvent @event,
        string partitionKey,
        string? topic = null,
        CancellationToken ct = default) where TEvent : class;
}
```

### 4. Implement Outbox Service

Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Services/OutboxService.cs`

```csharp
using System.Text.Json;
using Raven.Client.Documents.Session;
using NovaTuneApp.ApiService.Models;

namespace NovaTuneApp.ApiService.Services;

public class OutboxService : IOutboxService
{
    private readonly IAsyncDocumentSession _session;
    private readonly ILogger<OutboxService> _logger;

    public OutboxService(
        IAsyncDocumentSession session,
        ILogger<OutboxService> logger)
    {
        _session = session;
        _logger = logger;
    }

    public async Task WriteAsync<TEvent>(
        TEvent @event,
        string partitionKey,
        string? topic = null,
        CancellationToken ct = default) where TEvent : class
    {
        var eventType = typeof(TEvent).Name;
        var outboxMessage = new OutboxMessage
        {
            Id = $"OutboxMessages/{Ulid.NewUlid()}",
            EventType = eventType,
            Payload = JsonSerializer.Serialize(@event),
            PartitionKey = partitionKey,
            Topic = topic,
            CreatedAt = DateTimeOffset.UtcNow
        };

        await _session.StoreAsync(outboxMessage, ct);

        _logger.LogDebug(
            "Queued {EventType} for partition {PartitionKey}",
            eventType, partitionKey);
    }
}
```

### 5. Create Outbox Processor Background Service

Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Services/OutboxProcessorService.cs`

```csharp
using System.Text.Json;
using KafkaFlow.Producers;
using Microsoft.Extensions.Options;
using Raven.Client.Documents;
using Raven.Client.Documents.Session;
using NovaTuneApp.ApiService.Configuration;
using NovaTuneApp.ApiService.Models;
using NovaTuneApp.ApiService.Infrastructure.Indexes;

namespace NovaTuneApp.ApiService.Infrastructure.Services;

public class OutboxProcessorService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IOptions<OutboxOptions> _options;
    private readonly IOptions<NovaTuneOptions> _novatuneOptions;
    private readonly ILogger<OutboxProcessorService> _logger;

    public OutboxProcessorService(
        IServiceProvider serviceProvider,
        IOptions<OutboxOptions> options,
        IOptions<NovaTuneOptions> novatuneOptions,
        ILogger<OutboxProcessorService> logger)
    {
        _serviceProvider = serviceProvider;
        _options = options;
        _novatuneOptions = novatuneOptions;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_options.Value.Enabled)
        {
            _logger.LogInformation("Outbox processor is disabled");
            return;
        }

        _logger.LogInformation(
            "Outbox processor starting with {Interval} interval",
            _options.Value.PollingInterval);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessBatchAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing outbox");
            }

            await Task.Delay(_options.Value.PollingInterval, stoppingToken);
        }
    }

    private async Task ProcessBatchAsync(CancellationToken ct)
    {
        using var scope = _serviceProvider.CreateScope();
        var store = scope.ServiceProvider.GetRequiredService<IDocumentStore>();
        var producerAccessor = scope.ServiceProvider.GetRequiredService<IProducerAccessor>();

        using var session = store.OpenAsyncSession();
        var pendingMessages = await session
            .Query<OutboxMessage, OutboxMessages_ByPending>()
            .Where(m => m.ProcessedAt == null && m.Attempts < _options.Value.MaxAttempts)
            .OrderBy(m => m.CreatedAt)
            .Take(_options.Value.BatchSize)
            .ToListAsync(ct);

        if (pendingMessages.Count == 0) return;

        _logger.LogDebug("Processing {Count} outbox messages", pendingMessages.Count);

        var topicPrefix = _novatuneOptions.Value.TopicPrefix;

        foreach (var message in pendingMessages)
        {
            try
            {
                var topic = message.Topic ?? GetDefaultTopic(message.EventType);
                var fullTopic = $"{topicPrefix}-{topic}";

                var producer = producerAccessor.GetProducer("default");
                await producer.ProduceAsync(
                    fullTopic,
                    message.PartitionKey,
                    message.Payload);

                message.ProcessedAt = DateTimeOffset.UtcNow;
                _logger.LogDebug(
                    "Published {EventType} to {Topic}",
                    message.EventType, fullTopic);
            }
            catch (Exception ex)
            {
                message.Attempts++;
                message.LastError = ex.Message;
                _logger.LogWarning(
                    ex,
                    "Failed to publish {EventType} (attempt {Attempt})",
                    message.EventType, message.Attempts);
            }
        }

        await session.SaveChangesAsync(ct);
    }

    private static string GetDefaultTopic(string eventType) => eventType switch
    {
        nameof(TrackDeletedEvent) => "track-deletions",
        nameof(AudioUploadedEvent) => "audio-events",
        _ => "events"
    };
}
```

### 6. Add Configuration Options

Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Configuration/OutboxOptions.cs`

```csharp
namespace NovaTuneApp.ApiService.Configuration;

public class OutboxOptions
{
    public const string SectionName = "Outbox";

    /// <summary>
    /// Polling interval for outbox processor.
    /// Default: 1 second.
    /// </summary>
    public TimeSpan PollingInterval { get; set; } = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Maximum messages per batch.
    /// Default: 100.
    /// </summary>
    public int BatchSize { get; set; } = 100;

    /// <summary>
    /// Maximum publication attempts before giving up.
    /// Default: 5.
    /// </summary>
    public int MaxAttempts { get; set; } = 5;

    /// <summary>
    /// Whether outbox processing is enabled.
    /// Default: true.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Retention period for processed messages.
    /// Default: 7 days.
    /// </summary>
    public TimeSpan RetentionPeriod { get; set; } = TimeSpan.FromDays(7);
}
```

### 7. Register Services in Program.cs

```csharp
// Configuration
builder.Services.Configure<OutboxOptions>(
    builder.Configuration.GetSection(OutboxOptions.SectionName));

// Services
builder.Services.AddScoped<IOutboxService, OutboxService>();

// Background processor
builder.Services.AddHostedService<OutboxProcessorService>();
```

### 8. Add Configuration to appsettings.json

```json
{
  "Outbox": {
    "PollingInterval": "00:00:01",
    "BatchSize": 100,
    "MaxAttempts": 5,
    "Enabled": true,
    "RetentionPeriod": "7.00:00:00"
  }
}
```

## Usage Example

```csharp
public class TrackManagementService : ITrackManagementService
{
    private readonly IAsyncDocumentSession _session;
    private readonly IOutboxService _outboxService;

    public async Task DeleteTrackAsync(string trackId, string userId, CancellationToken ct)
    {
        var track = await _session.LoadAsync<Track>($"Tracks/{trackId}", ct);
        // ... validation ...

        // Soft-delete track
        track.Status = TrackStatus.Deleted;
        track.DeletedAt = DateTimeOffset.UtcNow;
        track.ScheduledDeletionAt = track.DeletedAt.Value.AddDays(30);

        // Write event to outbox (same transaction)
        var evt = new TrackDeletedEvent
        {
            TrackId = trackId,
            UserId = userId,
            ObjectKey = track.ObjectKey,
            // ... other fields
        };

        await _outboxService.WriteAsync(evt, partitionKey: trackId, ct: ct);

        // Both track update and outbox message saved atomically
        await _session.SaveChangesAsync(ct);
    }
}
```

## Benefits

- **Exactly-once delivery**: Events stored atomically with domain changes
- **Resilience**: Events published even if broker temporarily unavailable
- **Ordering**: Partition key ensures order within entity
- **Retries**: Failed messages retried with exponential backoff
- **Observability**: Failed messages visible in RavenDB

## Cleanup

Add a scheduled task to delete processed messages older than retention period:

```csharp
// In OutboxProcessorService or separate cleanup service
var cutoff = DateTimeOffset.UtcNow - _options.Value.RetentionPeriod;
var oldMessages = await session
    .Query<OutboxMessage>()
    .Where(m => m.ProcessedAt != null && m.ProcessedAt < cutoff)
    .Take(1000)
    .ToListAsync(ct);

foreach (var msg in oldMessages)
    session.Delete(msg);

await session.SaveChangesAsync(ct);
```

Related Skills

advanced-patterns

181
from majiayu000/claude-skill-registry

Advanced T-SQL patterns and techniques for SQL Server. Use this skill when: (1) User needs help with CTEs or recursive queries, (2) User asks about APPLY operator, (3) User wants MERGE or OUTPUT clause help, (4) User works with temporal tables, (5) User needs In-Memory OLTP guidance, (6) User asks about advanced grouping (ROLLUP, CUBE, GROUPING SETS).

advanced-js-mocking-patterns

181
from majiayu000/claude-skill-registry

Advanced mocking patterns for Jest and Vitest including module mocking, spies, and fake timers. PROACTIVELY activate for: (1) Module mocking, (2) Partial mocking with spies, (3) Mock lifecycle management, (4) Fake timers for time-dependent code, (5) Complex mock implementations. Triggers: "jest.mock", "vi.mock", "spyOn", "fakeTimers", "mockImplementation", "mockReturnValue", "mock lifecycle"

Advanced GetX Patterns

181
from majiayu000/claude-skill-registry

Advanced GetX features including Workers, GetxService, SmartManagement, GetConnect, GetSocket, bindings composition, and testing patterns

patterns/adapter

181
from majiayu000/claude-skill-registry

Adapter (Wrapper) Pattern pattern for C development

ActiveRecord Query Patterns

181
from majiayu000/claude-skill-registry

Complete guide to ActiveRecord query optimization, associations, scopes, and PostgreSQL-specific patterns. Use this skill when writing database queries, designing model associations, creating migrations, optimizing query performance, or debugging N+1 queries and grouping errors.

actions-pattern

181
from majiayu000/claude-skill-registry

Garante que novas Actions sigam o padrão de classes actions reutilizáveis do Easy Budget.

Action Pattern Conventions

181
from majiayu000/claude-skill-registry

This skill should be used when the user asks about "Laravel action pattern", "action class naming", "how to structure actions", "React component patterns", "Node.js service structure", "framework-specific conventions", or discusses creating reusable, focused classes following action pattern conventions in Laravel, Symfony, React, Vue, or Node.js projects.

Action Cable & WebSocket Patterns

181
from majiayu000/claude-skill-registry

Real-time WebSocket features with Action Cable in Rails. Use when: (1) Building real-time chat, (2) Live notifications/presence, (3) Broadcasting model updates, (4) WebSocket authorization. Trigger keywords: Action Cable, WebSocket, real-time, channels, broadcasting, stream, subscriptions, presence, cable

ace-pattern-learning

181
from majiayu000/claude-skill-registry

Search ACE playbook before implementing, building, fixing, debugging, or refactoring code. Capture patterns after completing substantial coding work.

accessibility-patterns

181
from majiayu000/claude-skill-registry

Build inclusive web experiences following WCAG guidelines. Covers semantic HTML, ARIA, keyboard navigation, color contrast, and testing strategies. Triggers on accessibility, a11y, WCAG, screen readers, or inclusive design requests.

access-control-patterns

181
from majiayu000/claude-skill-registry

[STUB - Not implemented] Access control auditing with IDOR detection, RBAC/ABAC patterns, and privilege escalation prevention. PROACTIVELY activate for: [TODO: Define on implementation]. Triggers: [TODO: Define on implementation]

acc-stability-patterns-knowledge

181
from majiayu000/claude-skill-registry

Stability Patterns knowledge base. Provides patterns, antipatterns, and PHP-specific guidelines for Circuit Breaker, Retry, Rate Limiter, Bulkhead, and resilience audits.