clickhouse-webhooks-events

Ingest data into ClickHouse from webhooks, Kafka, and streaming sources with batching, dedup, and exactly-once patterns. Use when building data ingestion pipelines, consuming webhook payloads, or integrating Kafka topics into ClickHouse. Trigger: "clickhouse ingestion", "clickhouse webhook", "clickhouse Kafka", "stream data to clickhouse", "clickhouse data pipeline".

Best use case

clickhouse-webhooks-events is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using clickhouse-webhooks-events should expect more consistent output, faster repeated runs, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/clickhouse-webhooks-events/SKILL.md --create-dirs "https://raw.githubusercontent.com/jeremylongshore/claude-code-plugins-plus-skills/main/plugins/saas-packs/clickhouse-pack/skills/clickhouse-webhooks-events/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/clickhouse-webhooks-events/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How clickhouse-webhooks-events Compares

| Feature / Agent | clickhouse-webhooks-events | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |

Frequently Asked Questions

What does this skill do?

It guides an AI coding agent through building ClickHouse ingestion pipelines: batched webhook receivers, Kafka table engines with materialized views, ClickPipes, HTTP bulk inserts, ReplacingMergeTree deduplication, and insert monitoring.

Where can I find the source code?

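The source lives in the jeremylongshore/claude-code-plugins-plus-skills repository on GitHub, at plugins/saas-packs/clickhouse-pack/skills/clickhouse-webhooks-events/SKILL.md (the same file the installation command downloads).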

SKILL.md Source

# ClickHouse Data Ingestion

## Overview

Build data ingestion pipelines into ClickHouse from HTTP webhooks, Kafka,
and streaming sources with proper batching, deduplication, and error handling.

## Prerequisites

- ClickHouse table with appropriate engine (see `clickhouse-core-workflow-a`)
- `@clickhouse/client` installed and configured to connect to your ClickHouse server

## Instructions

### Step 1: Webhook Receiver with Batched Inserts

```typescript
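import express from 'express';
import { createClient } from '@clickhouse/client';

const client = createClient({ url: process.env.CLICKHOUSE_HOST! });
const app = express();
// express.json() rejects bodies over 100kb by default; raise it for batched webhooks
app.use(express.json({ limit: '5mb' }));

// Buffer for batching: ClickHouse performs poorly with one-row-at-a-time inserts
let buffer: Record<string, unknown>[] = [];
let flushing = false;
const BATCH_SIZE = 5_000;
const FLUSH_INTERVAL_MS = 5_000;

async function flushBuffer() {
  // Skip if empty, or if a flush (timer- or request-triggered) is already running
  if (flushing || buffer.length === 0) return;
  flushing = true;
  const batch = buffer;
  buffer = [];

  try {
    await client.insert({
      table: 'analytics.events',
      values: batch,
      format: 'JSONEachRow',
    });
    console.log(`Flushed ${batch.length} events to ClickHouse`);
  } catch (err) {
    console.error('Insert failed, re-queuing:', (err as Error).message);
    // Put the batch back in front of newer rows for the next flush. concat avoids
    // the argument-count limit of unshift(...batch) on very large batches.
    // A timed-out insert may still have been written, so retries can duplicate rows.
    buffer = batch.concat(buffer);
  } finally {
    flushing = false;
  }
}

// Flush periodically
setInterval(flushBuffer, FLUSH_INTERVAL_MS);

// Webhook endpoint
app.post('/ingest', async (req, res) => {
  const events = Array.isArray(req.body) ? req.body : [req.body];

  for (const event of events) {
    buffer.push({
      event_type: event.type ?? 'unknown',
      user_id: event.userId ?? 0,
      properties: JSON.stringify(event.properties ?? {}),
      // 'YYYY-MM-DD hh:mm:ss' in UTC; ClickHouse reads it in the column/server time zone
      created_at: new Date().toISOString().replace('T', ' ').slice(0, 19),
    });
  }

  if (buffer.length >= BATCH_SIZE) {
    await flushBuffer();
  }

  res.status(202).json({ queued: events.length, buffer_size: buffer.length });
});

// Start the HTTP server (the original snippet never called listen)
app.listen(Number(process.env.PORT ?? 3000), () => {
  console.log('Webhook receiver listening');
});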
```
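The receiver above mixes the batching logic with Express and the ClickHouse client. As a minimal, dependency-free sketch of the same size-or-time batching pattern, the class below takes an injectable `sink` so the retry behaviour can be exercised without a server. `InsertBatcher`, `Sink`, and the `maxBuffered` cap are hypothetical names introduced here; the cap is an added backpressure guard that the Step 1 code does not have.

```typescript
// Minimal sketch of size-or-time batching with retry and backpressure.
// Hypothetical names: InsertBatcher, Sink, maxBuffered. `sink` stands in for
// client.insert({ table, values, format: 'JSONEachRow' }).
type Row = Record<string, unknown>;
type Sink = (rows: Row[]) => Promise<void>;

class InsertBatcher {
  private buffer: Row[] = [];
  private inflight: Promise<void> | undefined;
  private timer: ReturnType<typeof setInterval> | undefined;

  constructor(
    private readonly sink: Sink,
    private readonly batchSize = 5_000,
    private readonly maxBuffered = 100_000, // hypothetical backpressure cap
  ) {}

  get size(): number {
    return this.buffer.length;
  }

  /** Time-based trigger: flush every intervalMs. */
  start(intervalMs: number): void {
    this.timer = setInterval(() => void this.flush(), intervalMs);
  }

  /** Stop the timer, wait for any in-flight flush, then send what is left. */
  async stop(): Promise<void> {
    if (this.timer) clearInterval(this.timer);
    await this.flush();
    await this.flush();
  }

  /** Size-based trigger. Returns false (buffering nothing) if the cap would be exceeded. */
  add(rows: Row[]): boolean {
    if (this.buffer.length + rows.length > this.maxBuffered) return false;
    for (const row of rows) this.buffer.push(row);
    if (this.buffer.length >= this.batchSize) void this.flush();
    return true;
  }

  /** At most one insert runs at a time; concurrent callers share the same promise. */
  flush(): Promise<void> {
    if (this.inflight) return this.inflight;
    if (this.buffer.length === 0) return Promise.resolve();
    this.inflight = this.send().finally(() => {
      this.inflight = undefined;
    });
    return this.inflight;
  }

  private async send(): Promise<void> {
    const batch = this.buffer;
    this.buffer = [];
    try {
      await this.sink(batch);
    } catch {
      // Re-queue in front of rows that arrived meanwhile, preserving order.
      this.buffer = batch.concat(this.buffer);
    }
  }
}
```

In an Express handler, `if (!batcher.add(rows)) return res.status(503).end();` would shed load instead of letting memory grow while ClickHouse is down, and `await batcher.stop()` in a `SIGTERM` handler drains the buffer before exit.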

### Step 2: Kafka Table Engine (Server-Side Ingestion)

```sql
-- Create a Kafka engine table (consumes messages automatically)
CREATE TABLE analytics.events_kafka (
    event_type  String,
    user_id     UInt64,
    properties  String,
    timestamp   DateTime
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 2,        -- keep at or below the topic's partition count
    kafka_max_block_size = 65536;

-- Materialized view pipes Kafka → MergeTree automatically
CREATE MATERIALIZED VIEW analytics.events_kafka_mv
TO analytics.events
AS SELECT
    event_type,
    user_id,
    properties,
    timestamp AS created_at
FROM analytics.events_kafka;

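-- Once the materialized view is attached, ClickHouse consumes from Kafka
-- continuously in the background and writes into analytics.events.
-- Inspect consumer state (assignments, offsets, recent exceptions; recent versions only).
-- This does not report lag directly: compare offsets with the broker's end offsets.
SELECT * FROM system.kafka_consumers;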
```
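The Kafka engine table only parses messages whose JSON keys match its columns. A minimal sketch of how a producer-side webhook handler might shape each payload for the `events` topic; `toKafkaMessage` and `IncomingEvent` are hypothetical, the input field names mirror Step 1, and the producer call is left out because it depends on your Kafka client.

```typescript
// Sketch: shape a webhook payload into one JSONEachRow message for
// analytics.events_kafka (columns: event_type, user_id, properties, timestamp).
// Hypothetical helper; input fields mirror the Step 1 webhook payload.
interface IncomingEvent {
  type?: string;
  userId?: number;
  properties?: Record<string, unknown>;
}

/** 'YYYY-MM-DD hh:mm:ss' in UTC, a layout DateTime parses by default. */
function toClickHouseDateTime(d: Date): string {
  return d.toISOString().replace('T', ' ').slice(0, 19);
}

/** One JSON object per message; keys must match the Kafka table's columns. */
function toKafkaMessage(event: IncomingEvent, receivedAt: Date = new Date()): string {
  return JSON.stringify({
    event_type: event.type ?? 'unknown',
    user_id: event.userId ?? 0,
    // `properties` is a String column, so nested JSON is stored as a string.
    properties: JSON.stringify(event.properties ?? {}),
    timestamp: toClickHouseDateTime(receivedAt),
  });
}
```

With kafkajs, for example, the string would go in `producer.send({ topic: 'events', messages: [{ value: toKafkaMessage(event) }] })`. The timestamp carries no zone, so ClickHouse reads it in the column's (or server's) time zone; the sketch assumes that zone is UTC.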

### Step 3: ClickPipes (ClickHouse Cloud Managed Ingestion)

ClickHouse Cloud offers **ClickPipes** — a managed ingestion service that
connects to Kafka, Confluent, Amazon MSK, S3, and GCS without code.

```
ClickPipes Configuration (Cloud Console):
1. Source: Amazon MSK / Confluent Cloud / Apache Kafka
2. Topic: events
3. Format: JSONEachRow
4. Target: analytics.events
5. Scaling: 2 consumers (auto-scales)
```

### Step 4: HTTP Interface Bulk Insert

```bash
# Insert from CSV file via HTTP (no client library needed;
# add -u user:password if the server requires authentication)
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+CSVWithNames' \
  --data-binary @events.csv

# Insert from NDJSON file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+JSONEachRow' \
  --data-binary @events.ndjson

# Insert from Parquet file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+Parquet' \
  --data-binary @events.parquet
```

```sql
-- These are SQL statements: run them in clickhouse-client or send them over HTTP.
-- SELECT * assumes the source columns match analytics.events in order.

-- Insert from a remote URL (the ClickHouse server fetches it)
INSERT INTO analytics.events
SELECT * FROM url('https://data.example.com/events.csv', 'CSVWithNames');

-- Insert from S3 (placeholder keys; avoid hardcoding real credentials)
INSERT INTO analytics.events
SELECT * FROM s3(
    'https://my-bucket.s3.amazonaws.com/events/*.parquet',
    'ACCESS_KEY', 'SECRET_KEY',
    'Parquet'
);
```
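The curl uploads above map directly onto a `fetch` call from TypeScript. A minimal sketch, assuming Node 18+ (built-in `fetch`) and credentials sent through the HTTP interface's `X-ClickHouse-User` / `X-ClickHouse-Key` headers; `buildInsertUrl`, `toNdjson`, and `httpInsert` are hypothetical helper names.

```typescript
// Sketch: NDJSON bulk insert over the ClickHouse HTTP interface with fetch (Node 18+).
// Hypothetical helpers; the table name is interpolated into SQL, so it must come
// from trusted configuration, never from request input.
function buildInsertUrl(baseUrl: string, table: string, format: string): string {
  const url = new URL(baseUrl);
  // URLSearchParams does the encoding the curl examples do by hand with '+'.
  url.searchParams.set('query', `INSERT INTO ${table} FORMAT ${format}`);
  return url.toString();
}

/** One JSON object per line, each line newline-terminated. */
function toNdjson(rows: Record<string, unknown>[]): string {
  return rows.map((row) => JSON.stringify(row) + '\n').join('');
}

async function httpInsert(
  baseUrl: string,
  table: string,
  rows: Record<string, unknown>[],
  user = 'default',
  password = '',
): Promise<void> {
  const res = await fetch(buildInsertUrl(baseUrl, table, 'JSONEachRow'), {
    method: 'POST',
    headers: { 'X-ClickHouse-User': user, 'X-ClickHouse-Key': password },
    body: toNdjson(rows),
  });
  // On failure ClickHouse answers with a non-2xx status and the error text in the body.
  if (!res.ok) throw new Error(`Insert failed (${res.status}): ${await res.text()}`);
}
```

For example, `await httpInsert('http://localhost:8123', 'analytics.events', rows)` would be the programmatic equivalent of the NDJSON curl command.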

### Step 5: Deduplication with ReplacingMergeTree

```sql
-- For idempotent ingestion (webhook retries, Kafka reprocessing)
CREATE TABLE analytics.events_dedup (
    event_id    String,           -- Unique event identifier
    event_type  LowCardinality(String),
    user_id     UInt64,
    properties  String,
    created_at  DateTime,
    _version    UInt64 DEFAULT toUnixTimestamp(now())
)
ENGINE = ReplacingMergeTree(_version)
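ORDER BY event_id;               -- Dedup key (sorting key)

-- Rows sharing an event_id collapse to the one with the highest _version
-- (the last inserted on a tie), but only when background merges run, so
-- duplicates stay visible until then.
-- Query with FINAL for deduplicated results at read time. Because the table is
-- ordered by event_id, this created_at filter cannot use the primary index.
SELECT * FROM analytics.events_dedup FINAL
WHERE created_at >= today() - 7;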
```
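ReplacingMergeTree only deduplicates when a retried delivery produces the same `event_id`. A minimal sketch of one way to derive a stable ID, plus an in-memory model of what `FINAL` returns, for intuition. Both functions are hypothetical: `deriveEventId` assumes the sender supplies a delivery ID (header names differ by provider) and falls back to hashing the raw body, which only matches retries that resend identical bytes; `replacingFinal` models the documented rule (highest `_version` wins, last insert wins on a tie), not ClickHouse's merge code.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical: derive a deterministic event_id for the ReplacingMergeTree key.
function deriveEventId(deliveryId: string | undefined, rawBody: string): string {
  // Prefix the input kind so a delivery ID can never collide with a body.
  const source = deliveryId !== undefined ? `delivery:${deliveryId}` : `body:${rawBody}`;
  return createHash('sha256').update(source).digest('hex');
}

interface VersionedRow {
  event_id: string;
  _version: number;
  [key: string]: unknown;
}

// In-memory model of FINAL: per event_id keep the row with the highest _version;
// on equal versions the later row in insertion order wins.
function replacingFinal(rows: VersionedRow[]): VersionedRow[] {
  const latest = new Map<string, VersionedRow>();
  for (const row of rows) {
    const prev = latest.get(row.event_id);
    if (!prev || row._version >= prev._version) latest.set(row.event_id, row);
  }
  return [...latest.values()];
}
```

Note that with the table's `_version DEFAULT toUnixTimestamp(now())`, a retry inserted later gets an equal or higher version, so the retried copy is the one kept.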

### Step 6: Insert Monitoring

```sql
-- Track insert throughput
SELECT
    toStartOfMinute(event_time) AS minute,
    count() AS inserts,
    sum(written_rows) AS rows_inserted,
    formatReadableSize(sum(written_bytes)) AS bytes_inserted
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_kind = 'Insert'
  AND event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;

-- Check for insert errors (failures can be logged under either exception type)
SELECT event_time, type, exception, substring(query, 1, 200) AS query_head
FROM system.query_log
WHERE type IN ('ExceptionBeforeStart', 'ExceptionWhileProcessing')
  AND query_kind = 'Insert'
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY event_time DESC;
```
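The throughput query also gives average rows per INSERT (`rows_inserted / inserts`), an early warning for `Too many parts`. A minimal sketch of that check over rows fetched as JSON; it assumes the default JSON output, where UInt64 columns such as `count()` arrive as quoted strings (hence the `Number()` conversions), and the 1,000-row threshold and `findSmallBatchMinutes` name are hypothetical.

```typescript
// Sketch: flag minutes whose average INSERT carried too few rows.
// Input rows mirror the throughput query's columns; UInt64 values may be strings.
interface ThroughputRow {
  minute: string;
  inserts: string | number;
  rows_inserted: string | number;
}

interface SmallBatchAlert {
  minute: string;
  avgRowsPerInsert: number;
}

function findSmallBatchMinutes(rows: ThroughputRow[], minAvgRows = 1_000): SmallBatchAlert[] {
  const alerts: SmallBatchAlert[] = [];
  for (const row of rows) {
    const inserts = Number(row.inserts);
    if (inserts === 0) continue; // nothing to average
    const avg = Number(row.rows_inserted) / inserts;
    if (avg < minAvgRows) {
      alerts.push({ minute: row.minute, avgRowsPerInsert: Math.round(avg) });
    }
  }
  return alerts;
}
```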

## Insert Best Practices

| Practice | Why |
|----------|-----|
| Batch 10K-100K rows per INSERT | Fewer parts, faster merges |
| Buffer 1-5 seconds for real-time | Balances latency vs throughput |
| Use `JSONEachRow` format | Client handles serialization |
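| Compress request bodies (HTTP `Content-Encoding`, e.g. `gzip` or `zstd`) | Reduces network transfer |
| Use `ReplacingMergeTree` for retries | Handles duplicate delivery |
| Use `async_insert=1` (with `wait_for_async_insert=1`) for small batches | Server-side batching; insert errors still reach the client |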
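
Two rows of the table, request compression and `async_insert`, can be combined in one HTTP request. A minimal sketch, assuming the HTTP interface on port 8123: query settings are passed as URL parameters, and the body is gzip-compressed with a matching `Content-Encoding` header. `buildAsyncInsertRequest` is a hypothetical helper; the result could be sent with `fetch(req.url, { method: 'POST', headers: req.headers, body: req.body })`.

```typescript
import { gzipSync } from 'node:zlib';

// Hypothetical helper: build (but do not send) an async, gzip-compressed insert.
interface InsertRequest {
  url: string;
  headers: Record<string, string>;
  body: Buffer;
}

function buildAsyncInsertRequest(
  baseUrl: string,
  table: string, // trusted config only: interpolated into SQL
  rows: Record<string, unknown>[],
): InsertRequest {
  const url = new URL(baseUrl);
  url.searchParams.set('query', `INSERT INTO ${table} FORMAT JSONEachRow`);
  // Let the server buffer small inserts and write them as larger parts.
  url.searchParams.set('async_insert', '1');
  // Acknowledge only after the buffered data is written, so errors reach the caller.
  url.searchParams.set('wait_for_async_insert', '1');
  const ndjson = rows.map((row) => JSON.stringify(row) + '\n').join('');
  return {
    url: url.toString(),
    headers: { 'Content-Encoding': 'gzip' },
    body: gzipSync(ndjson),
  };
}
```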

## Error Handling

| Error | Cause | Solution |
|-------|-------|----------|
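| `Too many parts` | Many small inserts (e.g. one row per INSERT) create parts faster than merges combine them | Batch inserts (10K+ rows) or use `async_insert` |
| `Cannot parse input` | Data does not match the declared format or column types | Match the format and field types to the table schema |
| Timeout on large insert | Slow network or oversized batch | Enable compression, split the batch |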
| Duplicate events | Webhook retries | Use ReplacingMergeTree + event_id |
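
The table implies different responses per error: retry transient failures such as `Too many parts` and timeouts, but stop retrying `Cannot parse input`, since the same data fails every time. A minimal sketch of that policy, assuming errors surface as `Error` objects whose messages contain the ClickHouse text; substring matching is a heuristic, and `classifyInsertError`, `insertWithRetry`, the attempt limit, and the delays are hypothetical. Because a retried insert may already have landed, it pairs with the ReplacingMergeTree table from Step 5.

```typescript
// Sketch: retry transient insert failures with exponential backoff,
// dead-letter malformed data. Message matching is a heuristic.
type InsertAction = 'retry' | 'dead-letter';

function classifyInsertError(message: string): InsertAction {
  // Malformed data fails identically on every attempt: park it for inspection.
  if (/cannot parse input/i.test(message)) return 'dead-letter';
  // Too many parts, timeouts and connection resets are usually transient.
  return 'retry';
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function insertWithRetry(
  insert: () => Promise<void>,
  onDeadLetter: (err: Error) => Promise<void>,
  maxAttempts = 5,
  baseDelayMs = 500,
): Promise<void> {
  for (let attempt = 1; ; attempt++) {
    try {
      await insert();
      return;
    } catch (e) {
      const err = e instanceof Error ? e : new Error(String(e));
      if (classifyInsertError(err.message) === 'dead-letter' || attempt >= maxAttempts) {
        await onDeadLetter(err);
        return;
      }
      // Backing off gives background merges time to catch up.
      await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
}
```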

## Resources

- [Kafka Integration](https://clickhouse.com/docs/integrations/kafka)
- [ClickPipes](https://clickhouse.com/cloud/clickpipes)
- [HTTP Interface](https://clickhouse.com/docs/interfaces/http)
- [S3 Table Function](https://clickhouse.com/docs/sql-reference/table-functions/s3)

## Next Steps

For query and server performance, see `clickhouse-performance-tuning`.

Related Skills

workhuman-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Workhuman webhooks events for employee recognition and rewards API. Use when integrating Workhuman Social Recognition, or building recognition workflows with HRIS systems. Trigger: "workhuman webhooks events".

wispr-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Wispr Flow webhooks events for voice-to-text API integration. Use when integrating Wispr Flow dictation, WebSocket streaming, or building voice-powered applications. Trigger: "wispr webhooks events".

windsurf-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Build Windsurf extensions and integrate with VS Code extension API events. Use when building custom Windsurf extensions, tracking editor events, or integrating Windsurf with external tools via extension development. Trigger with phrases like "windsurf extension", "windsurf events", "windsurf plugin", "build windsurf extension", "windsurf API".

webflow-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Implement Webflow webhook registration, signature verification, and event handling for form_submission, site_publish, ecomm_new_order, page_created, and more. Use when setting up webhook endpoints, implementing event-driven workflows, or handling Webflow notifications. Trigger with phrases like "webflow webhook", "webflow events", "webflow webhook signature", "handle webflow events", "webflow notifications".

vercel-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Implement Vercel webhook handling with signature verification and event processing. Use when setting up webhook endpoints, processing deployment events, or building integrations that react to Vercel deployment lifecycle. Trigger with phrases like "vercel webhook", "vercel events", "vercel deployment.ready", "handle vercel events", "vercel webhook signature".

veeva-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Veeva Vault webhooks events for REST API and clinical operations. Use when working with Veeva Vault document management and CRM. Trigger: "veeva webhooks events".

vastai-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Build event-driven workflows around Vast.ai instance lifecycle events. Use when monitoring instance status changes, implementing auto-recovery, or building event-driven GPU orchestration. Trigger with phrases like "vastai events", "vastai instance monitoring", "vastai status changes", "vastai lifecycle events".

twinmind-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Handle TwinMind meeting events including transcription completion, action item extraction, and calendar sync notifications. Use when implementing webhooks events, or managing TwinMind meeting AI operations. Trigger with phrases like "twinmind webhooks events".

together-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Together AI webhooks events for inference, fine-tuning, and model deployment. Use when working with Together AI's OpenAI-compatible API. Trigger: "together webhooks events".

techsmith-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

TechSmith webhooks events for Snagit COM API and Camtasia automation. Use when working with TechSmith screen capture and video editing automation. Trigger: "techsmith webhooks events".

supabase-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

Implement Supabase database webhooks, pg_net async HTTP, LISTEN/NOTIFY, and Edge Function event handlers with signature verification. Use when setting up database webhooks for INSERT/UPDATE/DELETE events, sending HTTP requests from PostgreSQL triggers, handling Realtime postgres_changes as an event source, or building event-driven architectures. Trigger with phrases like "supabase webhook", "database events", "pg_net trigger", "supabase LISTEN NOTIFY", "webhook signature verify", "supabase event-driven", "supabase_functions.http_request".

stackblitz-webhooks-events

from jeremylongshore/claude-code-plugins-plus-skills

WebContainer lifecycle events: server-ready, port changes, error handling. Use when working with WebContainers or StackBlitz SDK. Trigger: "webcontainer events".