kafka-patterns

Topic design, partition strategies, consumer group patterns, exactly-once processing, and dead letter queue handling.

422 stars

Best use case

kafka-patterns is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Topic design, partition strategies, consumer group patterns, exactly-once processing, and dead letter queue handling.

Teams using kafka-patterns should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/kafka-patterns/SKILL.md --create-dirs "https://raw.githubusercontent.com/vibeeval/vibecosystem/main/skills/kafka-patterns/skill.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/kafka-patterns/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How kafka-patterns Compares

Feature / Agentkafka-patternsStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Topic design, partition strategies, consumer group patterns, exactly-once processing, and dead letter queue handling.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Kafka Patterns

Event streaming patterns for Apache Kafka in distributed systems.

## Topic Design

```yaml
# Topic naming convention: <domain>.<entity>.<event-type>
# Examples:
#   orders.order.created
#   payments.payment.completed
#   inventory.stock.updated

# Topic configuration
topics:
  orders.order.created:
    partitions: 12          # Match expected consumer parallelism
    replication-factor: 3   # Survive 2 broker failures
    retention.ms: 604800000 # 7 days
    cleanup.policy: delete

  orders.order.changelog:
    partitions: 12
    replication-factor: 3
    retention.ms: -1        # Infinite retention (compacted)
    cleanup.policy: compact # Keep latest value per key
    min.compaction.lag.ms: 3600000  # 1h before compacting
```

## Producer Patterns

```typescript
import { Kafka, Partitioners, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: process.env.KAFKA_BROKERS!.split(','),
})

const producer = kafka.producer({
  idempotent: true,                                    // Exactly-once producer
  maxInFlightRequests: 5,                              // Max parallel requests
  createPartitioner: Partitioners.DefaultPartitioner,
})

await producer.connect()

// Key-based partitioning: same key always goes to same partition (ordering)
async function publishOrderEvent(order: Order, eventType: string): Promise<void> {
  await producer.send({
    topic: `orders.order.${eventType}`,
    compression: CompressionTypes.LZ4,
    messages: [{
      key: order.id,                    // Orders for same ID → same partition → ordered
      value: JSON.stringify({
        eventId: crypto.randomUUID(),   // Idempotency key
        eventType,
        timestamp: new Date().toISOString(),
        data: order,
      }),
      headers: {
        'content-type': 'application/json',
        'source': 'order-service',
        'correlation-id': order.correlationId,
      },
    }],
  })
}

// Batch publishing for throughput
async function publishBatch(events: OrderEvent[]): Promise<void> {
  await producer.sendBatch({
    topicMessages: [{
      topic: 'orders.order.created',
      messages: events.map(e => ({
        key: e.orderId,
        value: JSON.stringify(e),
      })),
    }],
  })
}
```

## Consumer Group Patterns

```typescript
const consumer = kafka.consumer({
  groupId: 'payment-processor',     // Consumer group: shared topic consumption
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576,    // 1MB per partition per fetch
  retry: { retries: 5 },
})

await consumer.connect()
await consumer.subscribe({
  topics: ['orders.order.created'],
  fromBeginning: false,              // Start from latest offset
})

await consumer.run({
  autoCommit: false,                 // Manual commit for exactly-once
  eachBatchAutoResolve: false,

  eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, heartbeat }) => {
    for (const message of batch.messages) {
      try {
        const event = JSON.parse(message.value!.toString())

        // Idempotency check: skip already processed events
        if (await isAlreadyProcessed(event.eventId)) {
          resolveOffset(message.offset)
          continue
        }

        await processOrderPayment(event.data)
        await markAsProcessed(event.eventId)

        resolveOffset(message.offset)
        await commitOffsetsIfNecessary()
        await heartbeat()
      } catch (err) {
        console.error(`Failed to process message at offset ${message.offset}:`, err)
        // Send to DLQ instead of blocking the partition
        await sendToDeadLetterQueue(message, err as Error)
        resolveOffset(message.offset)
      }
    }
  },
})
```

## Dead Letter Queue (DLQ)

```typescript
const DLQ_TOPIC = 'orders.order.created.dlq'

async function sendToDeadLetterQueue(
  originalMessage: KafkaMessage,
  error: Error
): Promise<void> {
  await producer.send({
    topic: DLQ_TOPIC,
    messages: [{
      key: originalMessage.key,
      value: originalMessage.value,
      headers: {
        ...originalMessage.headers,
        'dlq-reason': error.message,
        'dlq-timestamp': new Date().toISOString(),
        'dlq-original-topic': 'orders.order.created',
        'dlq-retry-count': '0',
      },
    }],
  })
}

// DLQ consumer: retry or alert
async function processDLQ(): Promise<void> {
  const dlqConsumer = kafka.consumer({ groupId: 'dlq-processor' })
  await dlqConsumer.subscribe({ topics: [DLQ_TOPIC] })

  await dlqConsumer.run({
    eachMessage: async ({ message }) => {
      const retryCount = parseInt(
        message.headers?.['dlq-retry-count']?.toString() ?? '0'
      )

      if (retryCount >= 3) {
        // Max retries exceeded: alert ops team
        await alertOps({
          topic: DLQ_TOPIC,
          key: message.key?.toString(),
          reason: message.headers?.['dlq-reason']?.toString(),
          retries: retryCount,
        })
        return
      }

      // Retry with incremented count
      try {
        const event = JSON.parse(message.value!.toString())
        await processOrderPayment(event.data)
      } catch (err) {
        // Re-enqueue with incremented retry count
        await producer.send({
          topic: DLQ_TOPIC,
          messages: [{
            key: message.key,
            value: message.value,
            headers: {
              ...message.headers,
              'dlq-retry-count': String(retryCount + 1),
            },
          }],
        })
      }
    },
  })
}
```

## Partition Strategy

```typescript
// Custom partitioner: route by region for data locality
const regionalPartitioner = () => ({
  partition: ({ topic, partitionMetadata, message }) => {
    const region = message.headers?.['region']?.toString() ?? 'default'
    const regionMap: Record<string, number> = {
      'us-east': 0, 'us-west': 1,
      'eu-west': 2, 'eu-east': 3,
      'ap-southeast': 4,
    }
    const partition = regionMap[region]
    if (partition !== undefined && partition < partitionMetadata.length) {
      return partition
    }
    // Fallback: hash the key
    const numPartitions = partitionMetadata.length
    const hash = murmurHash(message.key?.toString() ?? '')
    return Math.abs(hash) % numPartitions
  }
})
```

## Checklist

- [ ] Idempotent producer enabled (exactly-once semantics)
- [ ] Key-based partitioning for ordered processing per entity
- [ ] Manual offset commit (not auto-commit) for at-least-once guarantee
- [ ] Dead Letter Queue for failed messages (max 3 retries then alert)
- [ ] Consumer idempotency check before processing (eventId dedup)
- [ ] Heartbeat during long-running batch processing
- [ ] Replication factor >= 3 for production topics
- [ ] Monitoring: consumer lag, throughput, error rate per consumer group

## Anti-Patterns

- Auto-commit offsets: message loss if consumer crashes before processing
- Single partition topics: no parallelism, bottleneck
- Unbounded retry: infinite retry loop blocks partition processing
- Large messages (>1MB): use claim-check pattern (store in S3, send reference)
- Skipping idempotency: duplicate processing on consumer restart
- Global ordering requirement: use single partition only when truly needed

Related Skills

websocket-patterns

422
from vibeeval/vibecosystem

Connection management, room patterns, reconnection strategies, message buffering, and binary protocol design.

vector-db-patterns

422
from vibeeval/vibecosystem

Embedding strategies, ANN algorithms, hybrid search, RAG chunking strategies, and reranking for semantic search and retrieval.

tracing-patterns

422
from vibeeval/vibecosystem

OpenTelemetry setup, span context propagation, sampling strategies, Jaeger queries

terraform-patterns

422
from vibeeval/vibecosystem

Module composition, state management, workspace strategy, provider versioning, and infrastructure-as-code best practices.

swift-patterns

422
from vibeeval/vibecosystem

SwiftUI view composition, @Observable patterns, async/await concurrency, TCA architecture, and Combine reactive streams.

springboot-patterns

422
from vibeeval/vibecosystem

Spring Boot architecture patterns, REST API design, layered services, data access, caching, async processing, and logging. Use for Java Spring Boot backend work.

seo-patterns

422
from vibeeval/vibecosystem

Meta tag patterns, structured data (JSON-LD), Core Web Vitals optimization, and SSR/SSG strategies for search visibility.

secret-patterns

422
from vibeeval/vibecosystem

30+ service-specific secret detection regex patterns, entropy-based detection, PEM/JWT/Base64 identification, and false positive filtering.

saas-payment-patterns

422
from vibeeval/vibecosystem

Payment provider abstraction, webhook security, subscription lifecycle, dunning flows, pricing models, invoicing, tax handling, and refund patterns for SaaS applications.

saas-auth-patterns

422
from vibeeval/vibecosystem

SaaS authentication and authorization patterns including JWT vs session strategies, multi-tenant isolation, RBAC, API key management, passwordless flows, MFA, and secure session handling.

saas-analytics-patterns

422
from vibeeval/vibecosystem

SaaS analytics event taxonomy, metric formulas (MRR, churn, LTV), provider-agnostic tracking, funnel analysis, cohort setup, and privacy-respecting instrumentation.

revenuecat-patterns

422
from vibeeval/vibecosystem

RevenueCat SDK entegrasyon pattern'leri. iOS (Swift), Android (Kotlin), React Native ve Flutter icin setup, offerings, entitlement checking, webhook integration, StoreKit 2 migration ve sandbox testing.