clickhouse-webhooks-events
Ingest data into ClickHouse from webhooks, Kafka, and streaming sources with batching, dedup, and exactly-once patterns. Use when building data ingestion pipelines, consuming webhook payloads, or integrating Kafka topics into ClickHouse. Trigger: "clickhouse ingestion", "clickhouse webhook", "clickhouse Kafka", "stream data to clickhouse", "clickhouse data pipeline".
Best use case
clickhouse-webhooks-events is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Teams using clickhouse-webhooks-events should expect more consistent output, faster repeated execution, and less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in `.claude/skills/clickhouse-webhooks-events/SKILL.md` inside your project
- Restart your AI agent; it will auto-discover the skill
Frequently Asked Questions
What does this skill do?
It provides a reusable workflow for ingesting data into ClickHouse from webhooks, Kafka, and other streaming sources, covering batching, deduplication, and exactly-once patterns.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# ClickHouse Data Ingestion
## Overview
Build data ingestion pipelines into ClickHouse from HTTP webhooks, Kafka,
and streaming sources with proper batching, deduplication, and error handling.
## Prerequisites
- ClickHouse table with appropriate engine (see `clickhouse-core-workflow-a`)
- `@clickhouse/client` connected
## Instructions
### Step 1: Webhook Receiver with Batched Inserts
```typescript
import express from 'express';
import { createClient } from '@clickhouse/client';
const client = createClient({ url: process.env.CLICKHOUSE_HOST! });
const app = express();
app.use(express.json());
// Buffer for batching — ClickHouse hates one-row-at-a-time inserts
const buffer: Record<string, unknown>[] = [];
const BATCH_SIZE = 5_000;
const FLUSH_INTERVAL_MS = 5_000;
async function flushBuffer() {
if (buffer.length === 0) return;
const batch = buffer.splice(0, buffer.length);
try {
await client.insert({
table: 'analytics.events',
values: batch,
format: 'JSONEachRow',
});
console.log(`Flushed ${batch.length} events to ClickHouse`);
} catch (err) {
console.error('Insert failed, re-queuing:', (err as Error).message);
buffer.unshift(...batch); // Put back at front for retry
}
}
// Flush periodically
setInterval(flushBuffer, FLUSH_INTERVAL_MS);
// Webhook endpoint
app.post('/ingest', async (req, res) => {
const events = Array.isArray(req.body) ? req.body : [req.body];
for (const event of events) {
buffer.push({
event_type: event.type ?? 'unknown',
user_id: event.userId ?? 0,
properties: JSON.stringify(event.properties ?? {}),
created_at: new Date().toISOString().replace('T', ' ').slice(0, 19),
});
}
if (buffer.length >= BATCH_SIZE) {
await flushBuffer();
}
res.status(202).json({ queued: events.length, buffer_size: buffer.length });
});
app.listen(3000, () => console.log('Webhook receiver listening on :3000')); // start the server (port is an example)
```
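The interval-based flush leaves up to `FLUSH_INTERVAL_MS` of buffered events in memory if the process dies. A minimal shutdown hook, assuming the `client` and `flushBuffer` from the snippet above are in scope, drains the buffer first:

```typescript
// Drain the in-memory buffer before exit so queued events are not lost.
async function shutdown(signal: string) {
  console.log(`${signal} received, flushing remaining events...`);
  await flushBuffer();   // final drain of the buffer
  await client.close();  // close the ClickHouse connection pool
  process.exit(0);
}

process.on('SIGTERM', () => void shutdown('SIGTERM'));
process.on('SIGINT', () => void shutdown('SIGINT'));
```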
### Step 2: Kafka Table Engine (Server-Side Ingestion)
```sql
-- Create a Kafka engine table (consumes messages automatically)
CREATE TABLE analytics.events_kafka (
event_type String,
user_id UInt64,
properties String,
timestamp DateTime
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'events',
kafka_group_name = 'clickhouse_consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 2,
kafka_max_block_size = 65536;
-- Materialized view pipes Kafka → MergeTree automatically
CREATE MATERIALIZED VIEW analytics.events_kafka_mv
TO analytics.events
AS SELECT
event_type,
user_id,
properties,
timestamp AS created_at
FROM analytics.events_kafka;
-- ClickHouse now consumes from Kafka continuously!
-- Check lag:
SELECT * FROM system.kafka_consumers;
```
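The same lag check can run from the application side, for example in a health probe. A sketch using `@clickhouse/client`; the 60-second idle threshold is an arbitrary example, and the column names should be verified against your server version:

```typescript
// Health check: flag Kafka engine consumers that have not polled recently.
// Columns follow system.kafka_consumers; timestamps are assumed to be UTC.
async function checkKafkaConsumers() {
  const result = await client.query({
    query: `SELECT last_poll_time, num_messages_read
            FROM system.kafka_consumers
            WHERE database = 'analytics' AND \`table\` = 'events_kafka'`,
    format: 'JSONEachRow',
  });
  const rows = await result.json<{ last_poll_time: string; num_messages_read: string }>();
  for (const row of rows) {
    const idleMs = Date.now() - new Date(row.last_poll_time.replace(' ', 'T') + 'Z').getTime();
    if (idleMs > 60_000) {
      console.warn(`Consumer idle ${Math.round(idleMs / 1000)}s, messages read: ${row.num_messages_read}`);
    }
  }
}
```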
### Step 3: ClickPipes (ClickHouse Cloud Managed Ingestion)
ClickHouse Cloud offers **ClickPipes** — a managed ingestion service that
connects to Kafka, Confluent, Amazon MSK, S3, and GCS without code.
```
ClickPipes Configuration (Cloud Console):
1. Source: Amazon MSK / Confluent Cloud / Apache Kafka
2. Topic: events
3. Format: JSONEachRow
4. Target: analytics.events
5. Scaling: 2 consumers (auto-scales)
```
### Step 4: HTTP Interface Bulk Insert
```bash
# Insert from CSV file via HTTP (no client needed)
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+CSVWithNames' \
--data-binary @events.csv
# Insert from NDJSON file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+JSONEachRow' \
--data-binary @events.ndjson
# Insert from Parquet file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+Parquet' \
--data-binary @events.parquet
```

The `url()` and `s3()` table functions are SQL, not shell commands, so they run in a client session rather than through curl:

```sql
-- Insert from a remote URL (ClickHouse fetches it)
INSERT INTO analytics.events
SELECT * FROM url('https://data.example.com/events.csv', CSVWithNames);

-- Insert from S3
INSERT INTO analytics.events
SELECT * FROM s3(
  'https://my-bucket.s3.amazonaws.com/events/*.parquet',
  'ACCESS_KEY', 'SECRET_KEY',
  'Parquet'
);
```
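If you prefer the Node client over curl, `@clickhouse/client` also accepts a readable stream as `values`, so a file can be piped in without buffering it in memory. A minimal sketch, assuming a local `events.ndjson` file:

```typescript
import fs from 'node:fs';
import { createClient } from '@clickhouse/client';

const client = createClient({ url: process.env.CLICKHOUSE_HOST! });

// Stream an NDJSON file straight into the table without loading it into memory.
await client.insert({
  table: 'analytics.events',
  values: fs.createReadStream('./events.ndjson'),
  format: 'JSONEachRow',
});
```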
### Step 5: Deduplication with ReplacingMergeTree
```sql
-- For idempotent ingestion (webhook retries, Kafka reprocessing)
CREATE TABLE analytics.events_dedup (
event_id String, -- Unique event identifier
event_type LowCardinality(String),
user_id UInt64,
properties String,
created_at DateTime,
_version UInt64 DEFAULT toUnixTimestamp(now())
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY event_id; -- Dedup key
-- Insert duplicate-safe: same event_id keeps latest _version
-- Query with FINAL for deduplicated results
SELECT * FROM analytics.events_dedup FINAL
WHERE created_at >= today() - 7;
```
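This only dedupes if `event_id` is stable across retries, i.e. derived from the payload or a sender-supplied delivery ID rather than generated per attempt. A sketch assuming the `client` from Step 1; the delivery-ID header is hypothetical:

```typescript
import { createHash } from 'node:crypto';

// Derive a stable event_id so a webhook retry produces the same row,
// which ReplacingMergeTree(_version) later collapses into one copy.
function stableEventId(payload: unknown, deliveryId?: string): string {
  // Prefer a sender-provided delivery ID (e.g. a hypothetical X-Delivery-Id header);
  // fall back to a content hash of the payload.
  return deliveryId ?? createHash('sha256').update(JSON.stringify(payload)).digest('hex');
}

async function insertDeduped(
  payload: { type?: string; userId?: number; properties?: object },
  deliveryId?: string,
) {
  await client.insert({
    table: 'analytics.events_dedup',
    values: [{
      event_id: stableEventId(payload, deliveryId),
      event_type: payload.type ?? 'unknown',
      user_id: payload.userId ?? 0,
      properties: JSON.stringify(payload.properties ?? {}),
      created_at: new Date().toISOString().replace('T', ' ').slice(0, 19),
      // _version is omitted, so its DEFAULT (insert time) applies
    }],
    format: 'JSONEachRow',
  });
}
```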
### Step 6: Insert Monitoring
```sql
-- Track insert throughput
SELECT
toStartOfMinute(event_time) AS minute,
count() AS inserts,
sum(written_rows) AS rows_inserted,
formatReadableSize(sum(written_bytes)) AS bytes_inserted
FROM system.query_log
WHERE type = 'QueryFinish'
AND query_kind = 'Insert'
AND event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
-- Check for insert errors
SELECT event_time, exception, substring(query, 1, 200)
FROM system.query_log
WHERE type = 'ExceptionWhileProcessing'
AND query_kind = 'Insert'
AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY event_time DESC;
```
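These queries can also back a simple alerting loop. A sketch that polls for failed inserts using the `client` from Step 1; the 15-minute window and one-minute poll interval are example values:

```typescript
// Poll system.query_log for recent failed INSERTs and surface them.
async function checkInsertErrors() {
  const result = await client.query({
    query: `
      SELECT event_time, exception, substring(query, 1, 200) AS query_head
      FROM system.query_log
      WHERE type = 'ExceptionWhileProcessing'
        AND query_kind = 'Insert'
        AND event_time >= now() - INTERVAL 15 MINUTE
      ORDER BY event_time DESC
      LIMIT 20
    `,
    format: 'JSONEachRow',
  });
  const errors = await result.json<{ event_time: string; exception: string }>();
  for (const e of errors) {
    console.error(`[${e.event_time}] insert failed: ${e.exception}`);
  }
}

setInterval(() => void checkInsertErrors(), 60_000); // poll every minute
```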
## Insert Best Practices
| Practice | Why |
|----------|-----|
| Batch 10K-100K rows per INSERT | Fewer parts, faster merges |
| Buffer 1-5 seconds for real-time | Balances latency vs throughput |
| Use `JSONEachRow` format | Client handles serialization |
| Compress with `ZSTD` on wire | Reduces network transfer |
| Use `ReplacingMergeTree` for retries | Handles duplicate delivery |
| Use `async_insert=1` for small batches | Server-side batching (see the sketch below) |
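For the `async_insert` row, the Node client can pass the setting per insert via `clickhouse_settings`. A sketch, assuming the `client` from Step 1 and a hypothetical single-row payload:

```typescript
// Server-side batching for trickle inserts: async_insert buffers rows on the
// server; wait_for_async_insert=1 acks only after the buffer is flushed.
const singleEvent = { event_type: 'click', user_id: 42, properties: '{}', created_at: '2024-01-01 00:00:00' }; // example row

await client.insert({
  table: 'analytics.events',
  values: [singleEvent],
  format: 'JSONEachRow',
  clickhouse_settings: { async_insert: 1, wait_for_async_insert: 1 },
});
```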
## Error Handling
| Error | Cause | Solution |
|-------|-------|----------|
| `Too many parts` | Single-row inserts | Batch inserts (10K+ rows) |
| `Cannot parse input` | Wrong format | Match format to data structure |
| `TIMEOUT` on large insert | Slow network | Enable compression, split the batch (sketch below) |
| Duplicate events | Webhook retries | Use ReplacingMergeTree + event_id |
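For the timeout row, one common recovery is to halve the batch and retry each piece. A sketch assuming the `client` from Step 1:

```typescript
// Recursively halve a failing batch: a timeout on a huge insert often
// succeeds once the payload is split into smaller pieces.
async function insertWithSplit(rows: Record<string, unknown>[]): Promise<void> {
  try {
    await client.insert({ table: 'analytics.events', values: rows, format: 'JSONEachRow' });
  } catch (err) {
    if (rows.length <= 1) throw err; // cannot split further; surface the error
    const mid = Math.floor(rows.length / 2);
    await insertWithSplit(rows.slice(0, mid));
    await insertWithSplit(rows.slice(mid));
  }
}
```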
## Resources
- [Kafka Integration](https://clickhouse.com/docs/integrations/kafka)
- [ClickPipes](https://clickhouse.com/cloud/clickpipes)
- [HTTP Interface](https://clickhouse.com/docs/interfaces/http)
- [S3 Table Function](https://clickhouse.com/docs/sql-reference/table-functions/s3)
## Next Steps
For query and server performance, see `clickhouse-performance-tuning`.
Related Skills
server-sent-events-setup
Server Sent Events Setup - Auto-activating skill for API Integration. Triggers on: "server sent events setup". Part of the API Integration skill category.
exa-webhooks-events
Build event-driven integrations with Exa using scheduled monitors and content alerts. Use when building content monitoring, competitive intelligence pipelines, or scheduled search automation with Exa. Trigger with phrases like "exa monitor", "exa content alerts", "exa scheduled search", "exa event-driven", "exa notifications".
evernote-webhooks-events
Implement Evernote webhook notifications and sync events. Use when handling note changes, implementing real-time sync, or processing Evernote notifications. Trigger with phrases like "evernote webhook", "evernote events", "evernote sync", "evernote notifications".
emitting-api-events
Build event-driven APIs with webhooks, Server-Sent Events, and real-time notifications. Use when building event-driven API architectures. Trigger with phrases like "add webhooks", "implement events", or "create event-driven API".
elevenlabs-webhooks-events
Implement ElevenLabs webhook HMAC signature verification and event handling. Use when setting up webhook endpoints for transcription completion, call recording, or agent conversation events from ElevenLabs. Trigger: "elevenlabs webhook", "elevenlabs events", "elevenlabs webhook signature", "handle elevenlabs notifications", "elevenlabs post-call webhook", "elevenlabs transcription webhook".
documenso-webhooks-events
Implement Documenso webhook configuration and event handling. Use when setting up webhook endpoints, handling document events, or implementing real-time notifications for document signing. Trigger with phrases like "documenso webhook", "documenso events", "document completed webhook", "signing notification".
deepgram-webhooks-events
Implement Deepgram callback and webhook handling for async transcription. Use when implementing callback URLs, processing async transcription results, or handling Deepgram event notifications. Trigger: "deepgram callback", "deepgram webhook", "async transcription", "deepgram events", "deepgram notifications", "deepgram async".
databricks-webhooks-events
Configure Databricks job notifications, webhooks, and event handling. Use when setting up Slack/Teams notifications, configuring alerts, or integrating Databricks events with external systems. Trigger with phrases like "databricks webhook", "databricks notifications", "databricks alerts", "job failure notification", "databricks slack".
customerio-webhooks-events
Implement Customer.io webhook and reporting event handling. Use when processing email delivery events, click/open tracking, bounce handling, or streaming to a data warehouse. Trigger: "customer.io webhook", "customer.io events", "customer.io delivery status", "customer.io bounces", "customer.io open tracking".
coreweave-webhooks-events
Monitor CoreWeave cluster events and GPU workload status. Use when tracking pod lifecycle events, monitoring GPU utilization, or alerting on inference service health changes. Trigger with phrases like "coreweave events", "coreweave monitoring", "coreweave pod alerts", "coreweave gpu monitoring".
cohere-webhooks-events
Implement Cohere streaming event handling, SSE patterns, and connector webhooks. Use when building streaming UIs, handling chat/tool events, or registering Cohere connectors for RAG. Trigger with phrases like "cohere streaming", "cohere events", "cohere SSE", "cohere connectors", "cohere webhook".
coderabbit-webhooks-events
Implement CodeRabbit webhook signature validation and event handling. Use when setting up webhook endpoints, implementing signature verification, or handling CodeRabbit event notifications securely. Trigger with phrases like "coderabbit webhook", "coderabbit events", "coderabbit webhook signature", "handle coderabbit events", "coderabbit notifications".