From clickhouse-pack
Provides patterns for @clickhouse/client: typed queries, streaming inserts with backpressure, batch inserts with retry, error handling. For robust ClickHouse Node.js integrations.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin clickhouse-pack
Production patterns for `@clickhouse/client`: typed queries, streaming inserts,
error handling, and connection lifecycle management.
Prerequisite: `@clickhouse/client` installed (see clickhouse-install-auth).

import { createClient } from '@clickhouse/client';

const client = createClient({
  url: process.env.CLICKHOUSE_HOST!,
  username: process.env.CLICKHOUSE_USER ?? 'default',
  password: process.env.CLICKHOUSE_PASSWORD ?? '',
});

// Generic typed query: returns parsed JSON rows
async function query<T>(sql: string, params?: Record<string, unknown>): Promise<T[]> {
  const rs = await client.query({
    query: sql,
    query_params: params,
    format: 'JSONEachRow',
  });
  return rs.json<T>();
}

// Usage
interface EventCount {
  event_type: string;
  cnt: string; // count() is UInt64; 64-bit integers are quoted as strings in JSON output by default
}

const rows = await query<EventCount>(
  'SELECT event_type, count() AS cnt FROM events WHERE user_id = {user_id:UInt64} GROUP BY event_type',
  { user_id: 42 }
);
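
Because `cnt` arrives as a string, callers usually need to convert it before doing arithmetic. The following is a minimal sketch, assuming hypothetical helpers `toSafeNumber` and `parseCounts` (neither is part of `@clickhouse/client`). It rejects values outside JavaScript's safe integer range instead of silently rounding them.

```typescript
// Hypothetical helper, not part of @clickhouse/client: convert an integer that
// arrived as a decimal string into a JS number, refusing values that would
// lose precision.
function toSafeNumber(value: string): number {
  if (!/^-?\d+$/.test(value)) {
    throw new TypeError(`Not an integer string: ${value}`);
  }
  const n = Number(value);
  if (!Number.isSafeInteger(n)) {
    throw new RangeError(`Value ${value} is outside the safe integer range`);
  }
  return n;
}

// Same shape as the EventCount rows in the example above.
interface EventCountRow {
  event_type: string;
  cnt: string;
}

// Build an event_type -> count map with numeric counts.
function parseCounts(rows: EventCountRow[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const row of rows) {
    counts.set(row.event_type, toSafeNumber(row.cnt));
  }
  return counts;
}
```

For counts that may legitimately exceed 2^53 - 1, keeping the value as a string or converting it with `BigInt(value)` is the safer choice.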
Note on parameterized queries: ClickHouse uses {name:Type} syntax for parameters,
not $1 or ?. Always use typed parameters to prevent SQL injection.
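
To illustrate the `{name:Type}` syntax with more than one parameter, the sketch below builds a query object without executing it. The helper name `eventsForUser` and the `event_types` parameter are hypothetical. The `events` table and its columns come from the example above.

```typescript
// Shape mirrors the fields passed to client.query elsewhere in this document.
interface ParameterizedQuery {
  query: string;
  query_params: Record<string, unknown>;
  format: 'JSONEachRow';
}

// Hypothetical helper: values travel in query_params, never in the SQL text.
function eventsForUser(userId: number, eventTypes: string[]): ParameterizedQuery {
  return {
    query: [
      'SELECT event_type, count() AS cnt',
      'FROM events',
      'WHERE user_id = {user_id:UInt64}',
      '  AND has({event_types:Array(String)}, event_type)',
      'GROUP BY event_type',
    ].join('\n'),
    query_params: { user_id: userId, event_types: eventTypes },
    format: 'JSONEachRow',
  };
}
```

The returned object carries the same `query`, `query_params`, and `format` fields used by the `client.query` calls in this document, so it should be usable as the argument to `client.query`.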
import { createClient } from '@clickhouse/client';
import { Readable } from 'stream';

// For large inserts, stream data instead of buffering in memory
async function streamInsert(rows: AsyncIterable<Record<string, unknown>>) {
  // Readable.from() pulls the next row only when the consumer asks for more,
  // so backpressure is handled by the stream itself. (A Readable never emits
  // 'drain', so waiting for that event after push() returns false would hang.)
  const stream = Readable.from(rows, { objectMode: true });

  await client.insert({
    table: 'events',
    values: stream,
    format: 'JSONEachRow',
  });
}
async function batchInsert<T extends Record<string, unknown>>(
  table: string,
  rows: T[],
  batchSize = 10_000,
  maxRetries = 3,
): Promise<{ inserted: number; errors: Error[] }> {
  let inserted = 0;
  const errors: Error[] = [];

  for (let i = 0; i < rows.length; i += batchSize) {
    const batch = rows.slice(i, i + batchSize);
    let attempt = 0;

    while (attempt < maxRetries) {
      try {
        await client.insert({
          table,
          values: batch,
          format: 'JSONEachRow',
        });
        inserted += batch.length;
        break;
      } catch (err) {
        attempt++;
        if (attempt === maxRetries) {
          errors.push(err as Error);
        } else {
          await new Promise((r) => setTimeout(r, 1000 * Math.pow(2, attempt)));
        }
      }
    }
  }

  return { inserted, errors };
}
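
With the defaults above, a failing batch is attempted three times, waiting 2 s and then 4 s between attempts, because `attempt` is incremented before the delay is computed. The slicing and delay logic can be pulled out into small pure functions that are easy to unit-test. This is a sketch only: `chunk`, `backoffDelayMs`, and the 30-second cap are hypothetical and not part of `@clickhouse/client`.

```typescript
/** Split rows into consecutive batches of at most `size` items. */
function chunk<T>(rows: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size));
  }
  return batches;
}

/**
 * Exponential backoff: baseMs * 2^attempt, capped at capMs.
 * The cap is an assumption added here; the loop above has no upper bound.
 */
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30_000): number {
  return Math.min(capMs, baseMs * Math.pow(2, attempt));
}
```

A cap keeps the delay bounded if `maxRetries` is raised. Adding random jitter is a common further refinement when many workers retry at once.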
// For large result sets, stream rows instead of loading all into memory
async function* streamQuery<T>(sql: string): AsyncGenerator<T> {
  const rs = await client.query({ query: sql, format: 'JSONEachRow' });
  const stream = rs.stream();

  for await (const rows of stream) {
    // Each chunk is an array of rows (typically ~8KB worth)
    for (const row of rows) {
      yield (row as { json: () => T }).json();
    }
  }
}

// Usage
for await (const event of streamQuery<{ event_type: string }>('SELECT * FROM events')) {
  process.stdout.write(`${event.event_type}\n`);
}
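
Streaming pays off when each chunk is reduced as it arrives rather than collected into an array. The sketch below folds one chunk of rows into a running tally, so memory use depends on the number of distinct event types rather than the number of rows. `RowLike` and `tallyChunk` are hypothetical names. The only thing assumed about a row is the `json()` accessor already used in `streamQuery` above.

```typescript
// Minimal structural type for the row objects in a streamed chunk.
interface RowLike<T> {
  json(): T;
}

// Fold one chunk into the running counts; call once per chunk from the stream loop.
function tallyChunk(
  counts: Map<string, number>,
  rows: ReadonlyArray<RowLike<{ event_type: string }>>,
): void {
  for (const row of rows) {
    const { event_type } = row.json();
    counts.set(event_type, (counts.get(event_type) ?? 0) + 1);
  }
}
```

In practice this kind of aggregation is usually cheaper to do in SQL with `GROUP BY`. Client-side folding is mainly useful for logic that is awkward to express in a query.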
import { ClickHouseError } from '@clickhouse/client';

async function safeQuery<T>(sql: string): Promise<{ data: T[] | null; error: string | null }> {
  try {
    const rs = await client.query({ query: sql, format: 'JSONEachRow' });
    return { data: await rs.json<T>(), error: null };
  } catch (err) {
    if (err instanceof ClickHouseError) {
      // ClickHouse server-side error (syntax, permissions, etc.)
      console.error(`ClickHouse error ${err.code}: ${err.message}`);
      return { data: null, error: `CH-${err.code}: ${err.message}` };
    }
    // Network or client-side error
    console.error('Client error:', (err as Error).message);
    return { data: null, error: (err as Error).message };
  }
}
// Graceful shutdown: important for flushing pending inserts
process.on('SIGTERM', async () => {
  console.log('Closing ClickHouse connection...');
  await client.close();
  process.exit(0);
});

// Health check
async function isHealthy(): Promise<boolean> {
  try {
    const { success } = await client.ping();
    return success;
  } catch {
    return false;
  }
}
// Override server settings for specific queries
const rs = await client.query({
  query: 'SELECT * FROM huge_table',
  format: 'JSONEachRow',
  clickhouse_settings: {
    max_threads: 4,                 // Limit parallelism
    max_memory_usage: '1000000000', // 1 GB memory limit (UInt64 settings are typed as strings)
    max_execution_time: 30,         // 30 s timeout
    max_result_rows: '100000',      // Error past 100k rows (result_overflow_mode defaults to 'throw')
  },
});
| Format | Use Case | Streaming |
|---|---|---|
| JSONEachRow | Standard JSON rows (NDJSON) | Yes |
| JSONCompactEachRow | Arrays instead of objects (smaller) | Yes |
| CSV | Export/import | Yes |
| TabSeparated | CLI-compatible output | Yes |
| Parquet | Analytics interchange | Yes |
| Native | Fastest binary format | Yes |
| Error Code | Meaning | Action |
|---|---|---|
| SYNTAX_ERROR (62) | Bad SQL | Fix query syntax |
| UNKNOWN_TABLE (60) | Table doesn't exist | Check table name, database |
| TOO_MANY_SIMULTANEOUS_QUERIES (202) | Connection overload | Reduce concurrency or pool |
| MEMORY_LIMIT_EXCEEDED (241) | Query uses too much RAM | Add filters, use streaming |
| TIMEOUT_EXCEEDED (159) | Query too slow | Optimize ORDER BY, add indexes |
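
The error table can also be expressed as a lookup that retry logic consults before trying again. This is a sketch under stated assumptions: `ERROR_ADVICE`, `adviceFor`, and `shouldRetry` are hypothetical names, and the choice to mark only code 202 as retryable is a policy read off the "Action" column, not something the client defines. Codes are normalized with `String()` so the lookup works whether the code arrives as a string or a number.

```typescript
interface ErrorAdvice {
  name: string;
  retry: boolean; // Assumption: retry only when the same query may succeed unchanged
  action: string;
}

// Codes, names, and actions are taken from the table above.
const ERROR_ADVICE: Record<string, ErrorAdvice> = {
  '62': { name: 'SYNTAX_ERROR', retry: false, action: 'Fix query syntax' },
  '60': { name: 'UNKNOWN_TABLE', retry: false, action: 'Check table name, database' },
  '202': { name: 'TOO_MANY_SIMULTANEOUS_QUERIES', retry: true, action: 'Reduce concurrency or pool' },
  '241': { name: 'MEMORY_LIMIT_EXCEEDED', retry: false, action: 'Add filters, use streaming' },
  '159': { name: 'TIMEOUT_EXCEEDED', retry: false, action: 'Optimize ORDER BY, add indexes' },
};

function adviceFor(code: string | number): ErrorAdvice | undefined {
  return ERROR_ADVICE[String(code)];
}

// Unknown codes are treated as non-retryable.
function shouldRetry(code: string | number): boolean {
  return adviceFor(code)?.retry ?? false;
}
```

A `catch` block like the one in `safeQuery` could call `shouldRetry(err.code)` to decide between backing off and failing fast.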
Apply these patterns in clickhouse-core-workflow-a for real data modeling.