kafka-patterns
Topic design, partition strategies, consumer group patterns, exactly-once processing, and dead letter queue handling.
Best use case
kafka-patterns is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Topic design, partition strategies, consumer group patterns, exactly-once processing, and dead letter queue handling.
Teams using kafka-patterns should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/kafka-patterns/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How kafka-patterns Compares
| Feature / Agent | kafka-patterns | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Topic design, partition strategies, consumer group patterns, exactly-once processing, and dead letter queue handling.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Kafka Patterns
Event streaming patterns for Apache Kafka in distributed systems.
## Topic Design
```yaml
# Topic naming convention: <domain>.<entity>.<event-type>
# Examples:
# orders.order.created
# payments.payment.completed
# inventory.stock.updated
# Topic configuration
topics:
orders.order.created:
partitions: 12 # Match expected consumer parallelism
replication-factor: 3 # Survive 2 broker failures
retention.ms: 604800000 # 7 days
cleanup.policy: delete
orders.order.changelog:
partitions: 12
replication-factor: 3
retention.ms: -1 # Infinite retention (compacted)
cleanup.policy: compact # Keep latest value per key
min.compaction.lag.ms: 3600000 # 1h before compacting
```
## Producer Patterns
```typescript
import { Kafka, Partitioners, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'order-service',
brokers: process.env.KAFKA_BROKERS!.split(','),
})
const producer = kafka.producer({
idempotent: true, // Exactly-once producer
maxInFlightRequests: 5, // Max parallel requests
createPartitioner: Partitioners.DefaultPartitioner,
})
await producer.connect()
// Key-based partitioning: same key always goes to same partition (ordering)
async function publishOrderEvent(order: Order, eventType: string): Promise<void> {
await producer.send({
topic: `orders.order.${eventType}`,
compression: CompressionTypes.LZ4,
messages: [{
key: order.id, // Orders for same ID → same partition → ordered
value: JSON.stringify({
eventId: crypto.randomUUID(), // Idempotency key
eventType,
timestamp: new Date().toISOString(),
data: order,
}),
headers: {
'content-type': 'application/json',
'source': 'order-service',
'correlation-id': order.correlationId,
},
}],
})
}
// Batch publishing for throughput
async function publishBatch(events: OrderEvent[]): Promise<void> {
await producer.sendBatch({
topicMessages: [{
topic: 'orders.order.created',
messages: events.map(e => ({
key: e.orderId,
value: JSON.stringify(e),
})),
}],
})
}
```
## Consumer Group Patterns
```typescript
const consumer = kafka.consumer({
groupId: 'payment-processor', // Consumer group: shared topic consumption
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB per partition per fetch
retry: { retries: 5 },
})
await consumer.connect()
await consumer.subscribe({
topics: ['orders.order.created'],
fromBeginning: false, // Start from latest offset
})
await consumer.run({
autoCommit: false, // Manual commit for exactly-once
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, heartbeat }) => {
for (const message of batch.messages) {
try {
const event = JSON.parse(message.value!.toString())
// Idempotency check: skip already processed events
if (await isAlreadyProcessed(event.eventId)) {
resolveOffset(message.offset)
continue
}
await processOrderPayment(event.data)
await markAsProcessed(event.eventId)
resolveOffset(message.offset)
await commitOffsetsIfNecessary()
await heartbeat()
} catch (err) {
console.error(`Failed to process message at offset ${message.offset}:`, err)
// Send to DLQ instead of blocking the partition
await sendToDeadLetterQueue(message, err as Error)
resolveOffset(message.offset)
}
}
},
})
```
## Dead Letter Queue (DLQ)
```typescript
const DLQ_TOPIC = 'orders.order.created.dlq'
async function sendToDeadLetterQueue(
originalMessage: KafkaMessage,
error: Error
): Promise<void> {
await producer.send({
topic: DLQ_TOPIC,
messages: [{
key: originalMessage.key,
value: originalMessage.value,
headers: {
...originalMessage.headers,
'dlq-reason': error.message,
'dlq-timestamp': new Date().toISOString(),
'dlq-original-topic': 'orders.order.created',
'dlq-retry-count': '0',
},
}],
})
}
// DLQ consumer: retry or alert
async function processDLQ(): Promise<void> {
const dlqConsumer = kafka.consumer({ groupId: 'dlq-processor' })
await dlqConsumer.subscribe({ topics: [DLQ_TOPIC] })
await dlqConsumer.run({
eachMessage: async ({ message }) => {
const retryCount = parseInt(
message.headers?.['dlq-retry-count']?.toString() ?? '0'
)
if (retryCount >= 3) {
// Max retries exceeded: alert ops team
await alertOps({
topic: DLQ_TOPIC,
key: message.key?.toString(),
reason: message.headers?.['dlq-reason']?.toString(),
retries: retryCount,
})
return
}
// Retry with incremented count
try {
const event = JSON.parse(message.value!.toString())
await processOrderPayment(event.data)
} catch (err) {
// Re-enqueue with incremented retry count
await producer.send({
topic: DLQ_TOPIC,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'dlq-retry-count': String(retryCount + 1),
},
}],
})
}
},
})
}
```
## Partition Strategy
```typescript
// Custom partitioner: route by region for data locality
const regionalPartitioner = () => ({
partition: ({ topic, partitionMetadata, message }) => {
const region = message.headers?.['region']?.toString() ?? 'default'
const regionMap: Record<string, number> = {
'us-east': 0, 'us-west': 1,
'eu-west': 2, 'eu-east': 3,
'ap-southeast': 4,
}
const partition = regionMap[region]
if (partition !== undefined && partition < partitionMetadata.length) {
return partition
}
// Fallback: hash the key
const numPartitions = partitionMetadata.length
const hash = murmurHash(message.key?.toString() ?? '')
return Math.abs(hash) % numPartitions
}
})
```
## Checklist
- [ ] Idempotent producer enabled (exactly-once semantics)
- [ ] Key-based partitioning for ordered processing per entity
- [ ] Manual offset commit (not auto-commit) for at-least-once guarantee
- [ ] Dead Letter Queue for failed messages (max 3 retries then alert)
- [ ] Consumer idempotency check before processing (eventId dedup)
- [ ] Heartbeat during long-running batch processing
- [ ] Replication factor >= 3 for production topics
- [ ] Monitoring: consumer lag, throughput, error rate per consumer group
## Anti-Patterns
- Auto-commit offsets: message loss if consumer crashes before processing
- Single partition topics: no parallelism, bottleneck
- Unbounded retry: infinite retry loop blocks partition processing
- Large messages (>1MB): use claim-check pattern (store in S3, send reference)
- Skipping idempotency: duplicate processing on consumer restart
- Global ordering requirement: use single partition only when truly neededRelated Skills
websocket-patterns
Connection management, room patterns, reconnection strategies, message buffering, and binary protocol design.
vector-db-patterns
Embedding strategies, ANN algorithms, hybrid search, RAG chunking strategies, and reranking for semantic search and retrieval.
tracing-patterns
OpenTelemetry setup, span context propagation, sampling strategies, Jaeger queries
terraform-patterns
Module composition, state management, workspace strategy, provider versioning, and infrastructure-as-code best practices.
swift-patterns
SwiftUI view composition, @Observable patterns, async/await concurrency, TCA architecture, and Combine reactive streams.
springboot-patterns
Spring Boot architecture patterns, REST API design, layered services, data access, caching, async processing, and logging. Use for Java Spring Boot backend work.
seo-patterns
Meta tag patterns, structured data (JSON-LD), Core Web Vitals optimization, and SSR/SSG strategies for search visibility.
secret-patterns
30+ service-specific secret detection regex patterns, entropy-based detection, PEM/JWT/Base64 identification, and false positive filtering.
saas-payment-patterns
Payment provider abstraction, webhook security, subscription lifecycle, dunning flows, pricing models, invoicing, tax handling, and refund patterns for SaaS applications.
saas-auth-patterns
SaaS authentication and authorization patterns including JWT vs session strategies, multi-tenant isolation, RBAC, API key management, passwordless flows, MFA, and secure session handling.
saas-analytics-patterns
SaaS analytics event taxonomy, metric formulas (MRR, churn, LTV), provider-agnostic tracking, funnel analysis, cohort setup, and privacy-respecting instrumentation.
revenuecat-patterns
RevenueCat SDK entegrasyon pattern'leri. iOS (Swift), Android (Kotlin), React Native ve Flutter icin setup, offerings, entitlement checking, webhook integration, StoreKit 2 migration ve sandbox testing.