add-kafka-consumer
Add KafkaFlow consumer handlers for processing Kafka/Redpanda messages (project)
Best use case
add-kafka-consumer is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Teams using add-kafka-consumer should expect more consistent output, faster repeated execution, and less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in `.claude/skills/add-kafka-consumer/SKILL.md` inside your project
- Restart your AI agent; it will auto-discover the skill
How add-kafka-consumer Compares
| Feature / Agent | add-kafka-consumer | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
SKILL.md Source
# Add Kafka Consumer Skill
Add KafkaFlow consumer handlers for processing Kafka/Redpanda messages in NovaTune.
## Project Context
- Handlers location: `src/NovaTuneApp/NovaTuneApp.Workers.{Name}/Handlers/`
- Message types: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Messaging/Messages/`
- Topic naming: `{prefix}-{topic-name}` (e.g., `dev-track-deletions`)
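The naming convention above can be captured in a small helper so that registration code and tests agree on topic names. This is only a sketch: `TopicNames` and `Build` are hypothetical names introduced for illustration and are not part of NovaTune or KafkaFlow.

```csharp
using System;

// Hypothetical helper (not part of NovaTune or KafkaFlow): builds topic names
// following the {prefix}-{topic-name} convention.
public static class TopicNames
{
    public static string Build(string prefix, string topicName)
    {
        if (string.IsNullOrWhiteSpace(prefix))
            throw new ArgumentException("Prefix must not be empty.", nameof(prefix));
        if (string.IsNullOrWhiteSpace(topicName))
            throw new ArgumentException("Topic name must not be empty.", nameof(topicName));

        return $"{prefix}-{topicName}";
    }
}
```

For example, `TopicNames.Build("dev", "track-deletions")` yields `dev-track-deletions`, matching the example in the list above.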
## Steps
### 1. Create Message Type (if needed)
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Messaging/Messages/{EventName}.cs`
```csharp
namespace NovaTuneApp.ApiService.Infrastructure.Messaging.Messages;

/// <summary>
/// Event published when a track is soft-deleted.
/// </summary>
public record TrackDeletedEvent
{
    public int SchemaVersion { get; init; } = 2;
    public required string TrackId { get; init; }
    public required string UserId { get; init; }
    public required string ObjectKey { get; init; }
    public string? WaveformObjectKey { get; init; }
    public required long FileSizeBytes { get; init; }
    public required DateTimeOffset DeletedAt { get; init; }
    public required DateTimeOffset ScheduledDeletionAt { get; init; }
    public required string CorrelationId { get; init; }
    public required DateTimeOffset Timestamp { get; init; }
}
```
### 2. Create Handler Class
Location: `src/NovaTuneApp/NovaTuneApp.Workers.{Name}/Handlers/{EventName}Handler.cs`
```csharp
using KafkaFlow;
using Microsoft.Extensions.DependencyInjection; // CreateScope, GetRequiredService
using Microsoft.Extensions.Logging;
using NovaTuneApp.ApiService.Infrastructure.Messaging.Messages;

namespace NovaTuneApp.Workers.Lifecycle.Handlers;

/// <summary>
/// Handles TrackDeletedEvent messages for immediate cache invalidation.
/// </summary>
public class TrackDeletedHandler : IMessageHandler<TrackDeletedEvent>
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<TrackDeletedHandler> _logger;

    public TrackDeletedHandler(
        IServiceProvider serviceProvider,
        ILogger<TrackDeletedHandler> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    public async Task Handle(IMessageContext context, TrackDeletedEvent message)
    {
        // One DI scope per message so scoped services are not shared across messages
        using var scope = _serviceProvider.CreateScope();

        _logger.LogInformation(
            "Processing TrackDeletedEvent for track {TrackId}, user {UserId}",
            message.TrackId,
            message.UserId);

        try
        {
            // Get services from scoped container
            var cacheService = scope.ServiceProvider.GetRequiredService<ICacheService>();

            // Perform idempotent operations; the token is signalled when the worker stops
            await cacheService.InvalidateTrackCacheAsync(
                message.TrackId,
                message.UserId,
                context.ConsumerContext.WorkerStopped);

            _logger.LogDebug(
                "Successfully processed TrackDeletedEvent for {TrackId}, scheduled deletion at {ScheduledAt}",
                message.TrackId,
                message.ScheduledDeletionAt);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to process TrackDeletedEvent for track {TrackId}",
                message.TrackId);

            // Re-throw so that retry/DLQ middleware, if configured for this consumer, can act on the failure
            throw;
        }
    }
}
```
### 3. Register Consumer in Program.cs
```csharp
using Confluent.Kafka;      // ConsumerConfig
using KafkaFlow;
using KafkaFlow.Serializer; // JsonCoreDeserializer

var topicPrefix = builder.Configuration["NovaTune:TopicPrefix"] ?? "dev";
var bootstrapServers = builder.Configuration.GetConnectionString("messaging")
    ?? "localhost:9092";

builder.Services.AddKafka(kafka => kafka
    .UseMicrosoftLog()
    .AddCluster(cluster =>
    {
        cluster.WithBrokers([bootstrapServers]);

        // Register consumer for track deletions
        cluster.AddConsumer(consumer => consumer
            .Topic($"{topicPrefix}-track-deletions")
            .WithGroupId($"{topicPrefix}-lifecycle-worker")
            .WithBufferSize(100)
            .WithWorkersCount(2)
            // Fully qualified: Confluent.Kafka also defines an AutoOffsetReset enum
            .WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Earliest)
            .WithConsumerConfig(new ConsumerConfig
            {
                SessionTimeoutMs = 45000,
                SocketTimeoutMs = 30000,
                ReconnectBackoffMs = 1000
            })
            .AddMiddlewares(m => m
                .AddDeserializer<JsonCoreDeserializer>()
                .AddTypedHandlers(h => h.AddHandler<TrackDeletedHandler>())
            )
        );
    })
);

// Register handler in DI
builder.Services.AddTransient<TrackDeletedHandler>();
```
### 4. Add KafkaFlow Hosted Service
```csharp
builder.Services.AddHostedService<KafkaFlowHostedService>();
```
The hosted service manages the KafkaFlow bus lifecycle:
```csharp
using KafkaFlow;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

internal class KafkaFlowHostedService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<KafkaFlowHostedService> _logger;
    private IKafkaBus? _kafkaBus;

    private const int MaxRetries = 30;
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);

    public KafkaFlowHostedService(
        IServiceProvider serviceProvider,
        ILogger<KafkaFlowHostedService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting KafkaFlow bus...");

        // Short initial delay before the first connection attempt
        await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);

        for (var attempt = 1; attempt <= MaxRetries; attempt++)
        {
            try
            {
                _kafkaBus = _serviceProvider.CreateKafkaBus();
                await _kafkaBus.StartAsync(stoppingToken);
                _logger.LogInformation("KafkaFlow bus started on attempt {Attempt}", attempt);

                // Keep the service alive until shutdown is requested
                await Task.Delay(Timeout.Infinite, stoppingToken);
                return;
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("KafkaFlow bus stopping due to cancellation");
                return;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Failed to start KafkaFlow bus (attempt {Attempt}/{Max})",
                    attempt, MaxRetries);

                if (attempt < MaxRetries)
                    await Task.Delay(RetryDelay, stoppingToken);
                else
                    _logger.LogError(ex, "Failed after {Max} attempts", MaxRetries);
            }
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_kafkaBus is not null)
        {
            _logger.LogInformation("Stopping KafkaFlow bus...");
            await _kafkaBus.StopAsync();
        }

        await base.StopAsync(cancellationToken);
    }
}
```
## Consumer Configuration Options
| Option | Description | Suggested value |
|--------|-------------|---------|
| `WithBufferSize` | Internal message buffer size | 100 |
| `WithWorkersCount` | Parallel message processors | 1-4 |
| `WithAutoOffsetReset` | Starting position for new consumers | `Earliest` |
| `SessionTimeoutMs` | Consumer session timeout | 45000 |
| `SocketTimeoutMs` | Socket timeout | 30000 |
## Handler Patterns
### Simple Handler
```csharp
public class SimpleHandler : IMessageHandler<MyEvent>
{
    public Task Handle(IMessageContext context, MyEvent message)
    {
        // Process message
        return Task.CompletedTask;
    }
}
```
### Handler with Scoped Services
```csharp
public class ScopedHandler : IMessageHandler<MyEvent>
{
    private readonly IServiceProvider _serviceProvider;

    public ScopedHandler(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    public async Task Handle(IMessageContext context, MyEvent message)
    {
        using var scope = _serviceProvider.CreateScope();
        var dbSession = scope.ServiceProvider.GetRequiredService<IAsyncDocumentSession>();

        // Use scoped services
        await dbSession.SaveChangesAsync(context.ConsumerContext.WorkerStopped);
    }
}
```
### Handler with Retry/DLQ
```csharp
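// TransientException and PermanentException stand in for application-defined
// exception types; _logger is the handler's injected ILogger field.
public async Task Handle(IMessageContext context, MyEvent message)
{
    try
    {
        // Process message
    }
    catch (TransientException)
    {
        // Re-throw so that retry middleware, if configured for this consumer, can retry
        throw;
    }
    catch (PermanentException ex)
    {
        // Log and swallow - don't retry
        _logger.LogError(ex, "Permanent failure for message");
    }
}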
```
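Whether a re-thrown exception is actually retried depends on the middleware configured for the consumer. Where no retry middleware is in place, one option is a bounded retry inside the handler itself. The sketch below is an assumption-laden illustration, not KafkaFlow API: `RetryHelper` and its `RunAsync` method are hypothetical, and `TransientException` is defined here only so the example is self-contained.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical exception type marking failures that are worth retrying.
public sealed class TransientException : Exception
{
    public TransientException(string message) : base(message) { }
}

// Hypothetical helper, not part of KafkaFlow.
public static class RetryHelper
{
    // Retries the action on TransientException up to maxAttempts times,
    // waiting `delay` between attempts. Any other exception, or the
    // TransientException from the final attempt, propagates to the caller.
    public static async Task RunAsync(
        Func<Task> action,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken ct = default)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (TransientException) when (attempt < maxAttempts)
            {
                await Task.Delay(delay, ct);
            }
        }
    }
}
```

Inside a handler this could wrap the processing call, passing `context.ConsumerContext.WorkerStopped` as the cancellation token so that waiting between attempts stops when the worker shuts down.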
## Best Practices
1. **Make handlers idempotent** - Messages may be delivered more than once
2. **Use scoped services** - Create scope for each message
3. **Handle cancellation** - Use `context.ConsumerContext.WorkerStopped`
4. **Log appropriately** - Info for processing, Debug for success, Error for failures
5. **Re-throw for retries** - Only swallow permanent failures
6. **Keep handlers focused** - One handler per message type
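
The first practice can be sketched as a guard that remembers which message keys have already been handled and skips duplicates. This is a minimal illustration under stated assumptions: `IdempotencyGuard` and `RunOnceAsync` are hypothetical names, and the in-memory dictionary stands in for whatever durable store a real worker would need to survive restarts and to be shared across instances.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory guard (illustration only): processed keys are lost
// on restart and are not shared between worker instances.
public sealed class IdempotencyGuard
{
    private readonly ConcurrentDictionary<string, byte> _processed = new();

    // Runs the action only the first time a key is seen.
    // Returns true if the action ran, false for a duplicate delivery.
    public async Task<bool> RunOnceAsync(string key, Func<Task> action)
    {
        if (!_processed.TryAdd(key, 0))
            return false; // duplicate delivery: skip the side effect

        try
        {
            await action();
            return true;
        }
        catch
        {
            // Forget the key so a later redelivery can try again.
            _processed.TryRemove(key, out _);
            throw;
        }
    }
}
```

A handler could derive the key from fields that identify the event, for instance a combination of `TrackId` and `CorrelationId`; which fields uniquely identify a message is an assumption that depends on how producers populate them.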
## Testing
```csharp
// _logger, _mockContext, and BuildServiceProvider are assumed to be defined
// elsewhere in the test class (fixture setup is not shown here).
[Fact]
public async Task Handler_Should_InvalidateCache_OnTrackDeleted()
{
    // Arrange
    var cacheService = Substitute.For<ICacheService>();
    var serviceProvider = BuildServiceProvider(cacheService);
    var handler = new TrackDeletedHandler(serviceProvider, _logger);
    var message = new TrackDeletedEvent
    {
        TrackId = "01HXK...",
        UserId = "user123",
        ObjectKey = "tracks/01HXK...",
        FileSizeBytes = 1024,
        DeletedAt = DateTimeOffset.UtcNow,
        ScheduledDeletionAt = DateTimeOffset.UtcNow.AddDays(30),
        CorrelationId = Guid.NewGuid().ToString(),
        Timestamp = DateTimeOffset.UtcNow
    };

    // Act
    await handler.Handle(_mockContext, message);

    // Assert
    await cacheService.Received(1)
        .InvalidateTrackCacheAsync(message.TrackId, message.UserId, Arg.Any<CancellationToken>());
}
```
## Topic Naming Convention
| Topic | Purpose | Producer | Consumer |
|-------|---------|----------|----------|
| `{prefix}-audio-events` | Audio upload notifications | Upload flow | Audio processor |
| `{prefix}-track-deletions` | Track deletion events | API service | Lifecycle worker |
| `{prefix}-minio-events` | MinIO bucket events | MinIO | Upload ingestor |