From clickhouse-pack
Ingests data into ClickHouse from HTTP webhooks (via Node.js/Express), Kafka table engines, and streaming sources, building pipelines with batching, deduplication, and error handling.
Install: `npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin clickhouse-pack`
Related skills in this pack:
- Insert data at scale and run analytical SQL queries in ClickHouse using @clickhouse/client, covering aggregations, window functions, funnels, retention, and materialized views.
- ClickHouse patterns for MergeTree table design, query optimization, aggregations, data ingestion, and analytics; useful for OLAP workloads, schema design, performance tuning, and migrations from PostgreSQL/MySQL.
Build data ingestion pipelines into ClickHouse from HTTP webhooks, Kafka, and streaming sources with proper batching, deduplication, and error handling.
Workflow (a): webhook ingestion with a connected @clickhouse/client, batched through an in-memory buffer:

import express from 'express';
import { createClient } from '@clickhouse/client';

const client = createClient({ url: process.env.CLICKHOUSE_HOST! });
const app = express();
app.use(express.json());

// Buffer for batching: ClickHouse hates one-row-at-a-time inserts
const buffer: Record<string, unknown>[] = [];
const BATCH_SIZE = 5_000;
const FLUSH_INTERVAL_MS = 5_000;

async function flushBuffer() {
  if (buffer.length === 0) return;
  const batch = buffer.splice(0, buffer.length);
  try {
    await client.insert({
      table: 'analytics.events',
      values: batch,
      format: 'JSONEachRow',
    });
    console.log(`Flushed ${batch.length} events to ClickHouse`);
  } catch (err) {
    console.error('Insert failed, re-queuing:', (err as Error).message);
    buffer.unshift(...batch); // Put back at front for retry
  }
}

// Flush periodically
setInterval(flushBuffer, FLUSH_INTERVAL_MS);

// Webhook endpoint
app.post('/ingest', async (req, res) => {
  const events = Array.isArray(req.body) ? req.body : [req.body];
  for (const event of events) {
    buffer.push({
      event_type: event.type ?? 'unknown',
      user_id: event.userId ?? 0,
      properties: JSON.stringify(event.properties ?? {}),
      created_at: new Date().toISOString().replace('T', ' ').slice(0, 19),
    });
  }
  if (buffer.length >= BATCH_SIZE) {
    await flushBuffer();
  }
  res.status(202).json({ queued: events.length, buffer_size: buffer.length });
});

const server = app.listen(3000, () => console.log('Ingest server listening on :3000'));
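One gap in the buffer pattern above: the interval timer can leave events queued when the process exits. A minimal shutdown hook, assuming the `server` handle from `app.listen` above (the SIGTERM choice is an assumption; match it to whatever signal your platform sends):

```typescript
// Drain the buffer on shutdown so the tail of the stream is not lost.
// SIGTERM is an assumption; adjust to your process manager's signals.
process.on('SIGTERM', async () => {
  server.close();        // stop accepting new webhook requests
  await flushBuffer();   // flush whatever is still buffered
  await client.close();  // release the ClickHouse connection
  process.exit(0);
});
```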
-- Create a Kafka engine table (consumes messages automatically)
CREATE TABLE analytics.events_kafka (
    event_type String,
    user_id UInt64,
    properties String,
    timestamp DateTime
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 2,
    kafka_max_block_size = 65536;

-- Materialized view pipes Kafka → MergeTree automatically
CREATE MATERIALIZED VIEW analytics.events_kafka_mv
TO analytics.events
AS SELECT
    event_type,
    user_id,
    properties,
    timestamp AS created_at
FROM analytics.events_kafka;

-- ClickHouse now consumes from Kafka continuously!
-- Check lag:
SELECT * FROM system.kafka_consumers;
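To exercise this pipeline end to end, something has to publish JSON lines to the `events` topic. A hedged sketch using kafkajs (the library choice, client id, and broker address are assumptions, not part of the pack):

```typescript
import { Kafka } from 'kafkajs';

// Hypothetical test producer; any client that writes one JSON object per
// message to the 'events' topic will feed the Kafka engine table above.
const kafka = new Kafka({ clientId: 'test-producer', brokers: ['kafka:9092'] });
const producer = kafka.producer();

await producer.connect();
await producer.send({
  topic: 'events',
  messages: [
    {
      value: JSON.stringify({
        event_type: 'page_view',
        user_id: 42,
        properties: '{"path":"/home"}',
        timestamp: '2024-01-01 00:00:00', // matches the DateTime column
      }),
    },
  ],
});
await producer.disconnect();
```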
ClickHouse Cloud offers ClickPipes — a managed ingestion service that connects to Kafka, Confluent, Amazon MSK, S3, and GCS without code.
ClickPipes Configuration (Cloud Console):
1. Source: Amazon MSK / Confluent Cloud / Apache Kafka
2. Topic: events
3. Format: JSONEachRow
4. Target: analytics.events
5. Scaling: 2 consumers (auto-scales)
# Insert from CSV file via HTTP (no client needed)
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+CSVWithNames' \
--data-binary @events.csv
# Insert from NDJSON file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+JSONEachRow' \
--data-binary @events.ndjson
# Insert from Parquet file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+Parquet' \
--data-binary @events.parquet
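The same file loads work from Node.js without shelling out to curl; a sketch streaming a CSV file through @clickhouse/client (the file path is a placeholder, and this assumes the `client` created earlier in this skill):

```typescript
import { createReadStream } from 'node:fs';

// Stream the file body straight into the insert; the client sends it as
// the HTTP request body, so the file never has to fit in memory.
await client.insert({
  table: 'analytics.events',
  values: createReadStream('./events.csv'), // placeholder path
  format: 'CSVWithNames',
});
```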
-- Insert from remote URL (ClickHouse fetches it)
INSERT INTO analytics.events
SELECT * FROM url('https://data.example.com/events.csv', CSVWithNames);

-- Insert from S3
INSERT INTO analytics.events
SELECT * FROM s3(
    'https://my-bucket.s3.amazonaws.com/events/*.parquet',
    'ACCESS_KEY', 'SECRET_KEY',
    'Parquet'
);
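These server-side fetches can also be issued from the ingestion service; a sketch using `client.command` (the bucket URL and credentials remain placeholders):

```typescript
// ClickHouse pulls the Parquet files directly from S3; Node only submits
// the statement, so no data flows through the app server.
await client.command({
  query: `
    INSERT INTO analytics.events
    SELECT * FROM s3(
      'https://my-bucket.s3.amazonaws.com/events/*.parquet',
      'ACCESS_KEY', 'SECRET_KEY',
      'Parquet'
    )
  `,
});
```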
-- For idempotent ingestion (webhook retries, Kafka reprocessing)
CREATE TABLE analytics.events_dedup (
    event_id String,  -- Unique event identifier
    event_type LowCardinality(String),
    user_id UInt64,
    properties String,
    created_at DateTime,
    _version UInt64 DEFAULT toUnixTimestamp(now())
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY event_id;  -- Dedup key

-- Insert duplicate-safe: same event_id keeps latest _version
-- Query with FINAL for deduplicated results
SELECT * FROM analytics.events_dedup FINAL
WHERE created_at >= today() - 7;
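Deduplication only works if every retry carries the same `event_id`. A sketch of deriving a stable id in the webhook handler when the producer does not send one (`stableEventId` and the SHA-256 choice are illustrative assumptions):

```typescript
import { createHash } from 'node:crypto';

// Prefer an id the producer sends; otherwise hash the raw payload so a
// retried webhook delivery maps to the same event_id and is collapsed by
// ReplacingMergeTree. Assumes retries resend byte-identical bodies.
function stableEventId(event: { id?: string }, rawBody: string): string {
  if (event.id) return String(event.id);
  return createHash('sha256').update(rawBody).digest('hex');
}

// Inside the /ingest handler:
// buffer.push({ event_id: stableEventId(event, JSON.stringify(event)), ... });
```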
-- Track insert throughput
SELECT
    toStartOfMinute(event_time) AS minute,
    count() AS inserts,
    sum(written_rows) AS rows_inserted,
    formatReadableSize(sum(written_bytes)) AS bytes_inserted
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_kind = 'Insert'
  AND event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;

-- Check for insert errors
SELECT event_time, exception, substring(query, 1, 200)
FROM system.query_log
WHERE type = 'ExceptionWhileProcessing'
  AND query_kind = 'Insert'
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY event_time DESC;
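The error query above can double as a health check inside the Node service; a sketch with `client.query` (the one-hour window and log-only reaction are assumptions):

```typescript
// Poll query_log for recent failed inserts and surface them in app logs.
const rs = await client.query({
  query: `
    SELECT event_time, exception
    FROM system.query_log
    WHERE type = 'ExceptionWhileProcessing'
      AND query_kind = 'Insert'
      AND event_time >= now() - INTERVAL 1 HOUR
    ORDER BY event_time DESC
    LIMIT 10
  `,
  format: 'JSONEachRow',
});
const failures = (await rs.json()) as { event_time: string; exception: string }[];
if (failures.length > 0) {
  console.warn(`${failures.length} insert failures in the last hour`, failures[0]);
}
```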
| Practice | Why |
|---|---|
| Batch 10K-100K rows per INSERT | Fewer parts, faster merges |
| Buffer 1-5 seconds for real-time | Balances latency vs throughput |
| Use JSONEachRow format | Client handles serialization |
| Compress with ZSTD on wire | Reduces network transfer |
| Use ReplacingMergeTree for retries | Handles duplicate delivery |
| Use async_insert=1 for small batches | Server-side batching (see sketch below) |
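When producers are too scattered to batch client-side, the last row of the table applies; a sketch enabling server-side batching per insert (`wait_for_async_insert: 1` trades latency for a durable ack, and is an assumption here):

```typescript
// async_insert makes ClickHouse buffer small inserts server-side and
// flush them as one part; wait_for_async_insert: 1 acks after the flush.
await client.insert({
  table: 'analytics.events',
  values: [{ event_type: 'click', user_id: 1, properties: '{}', created_at: '2024-01-01 00:00:00' }],
  format: 'JSONEachRow',
  clickhouse_settings: {
    async_insert: 1,
    wait_for_async_insert: 1,
  },
});
```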
| Error | Cause | Solution |
|---|---|---|
| Too many parts | Single-row inserts | Batch inserts (10K+ rows) |
| Cannot parse input | Wrong format | Match format to data structure |
| TIMEOUT on large insert | Slow network | Enable compression, split batch |
| Duplicate events | Webhook retries | Use ReplacingMergeTree + event_id |
For query and server performance, see clickhouse-performance-tuning.