Skill

clickhouse-io

Install
1
Install the plugin
$
npx claudepluginhub xu-xiang/everything-claude-code-zh

Want just this skill?

Add to a custom plugin, then install with one command.

Description

ClickHouse 数据库模式、查询优化、分析以及高性能分析负载的数据工程最佳实践。

Tool Access

This skill uses the workspace's default tool permissions.

Skill Content

ClickHouse 分析模式 (ClickHouse Analytics Patterns)

针对高性能分析和数据工程的 ClickHouse 特定模式。

何时激活 (When to Activate)

  • 设计 ClickHouse 表结构(MergeTree 引擎选择)
  • 编写分析查询(聚合、窗口函数、连接)
  • 优化查询性能(分区剪枝、投影、物化视图)
  • 摄取海量数据(批量插入、Kafka 集成)
  • 将分析业务从 PostgreSQL/MySQL 迁移到 ClickHouse
  • 实现实时仪表盘或时间序列分析

概述 (Overview)

ClickHouse 是一款用于联机分析处理(OLAP)的列式数据库管理系统(DBMS)。它针对大型数据集的高速分析查询进行了优化。

核心特性:

  • 列式存储
  • 数据压缩
  • 并行查询执行
  • 分布式查询
  • 实时分析

表设计模式 (Table Design Patterns)

MergeTree 引擎(最常用)

CREATE TABLE markets_analytics (
    date Date,
    market_id String,
    market_name String,
    volume UInt64,
    trades UInt32,
    unique_traders UInt32,
    avg_trade_size Float64,
    created_at DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;

ReplacingMergeTree(去重)

-- 针对可能存在重复的数据(例如来自多个源)
CREATE TABLE user_events (
    event_id String,
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
) ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id, timestamp)
PRIMARY KEY (user_id, event_id);

AggregatingMergeTree(预聚合)

-- 用于维护聚合指标
CREATE TABLE market_stats_hourly (
    hour DateTime,
    market_id String,
    total_volume AggregateFunction(sum, UInt64),
    total_trades AggregateFunction(count, UInt32),
    unique_users AggregateFunction(uniq, String)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);

-- 查询聚合数据
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;

查询优化模式 (Query Optimization Patterns)

高效过滤

-- ✅ 推荐:优先使用索引列
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123'
  AND volume > 1000
ORDER BY date DESC
LIMIT 100;

-- ❌ 不推荐:先对非索引列进行过滤
SELECT *
FROM markets_analytics
WHERE volume > 1000
  AND market_name LIKE '%election%'
  AND date >= '2025-01-01';

聚合 (Aggregations)

-- ✅ 推荐:使用 ClickHouse 特有的聚合函数
SELECT
    toStartOfDay(created_at) AS day,
    market_id,
    sum(volume) AS total_volume,
    count() AS total_trades,
    uniq(trader_id) AS unique_traders,
    avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;

-- ✅ 使用 quantile 计算百分位数(比 percentile 更高效)
SELECT
    quantile(0.50)(trade_size) AS median,
    quantile(0.95)(trade_size) AS p95,
    quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;

窗口函数 (Window Functions)

-- 计算累计总量
SELECT
    date,
    market_id,
    volume,
    sum(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;

数据插入模式 (Data Insertion Patterns)

批量插入(推荐)

import { ClickHouse } from 'clickhouse'

const clickhouse = new ClickHouse({
  url: process.env.CLICKHOUSE_URL,
  port: 8123,
  basicAuth: {
    username: process.env.CLICKHOUSE_USER,
    password: process.env.CLICKHOUSE_PASSWORD
  }
})

// ✅ 批量插入(高效)
async function bulkInsertTrades(trades: Trade[]) {
  const values = trades.map(trade => `(
    '${trade.id}',
    '${trade.market_id}',
    '${trade.user_id}',
    ${trade.amount},
    '${trade.timestamp.toISOString()}'
  )`).join(',')

  await clickhouse.query(`
    INSERT INTO trades (id, market_id, user_id, amount, timestamp)
    VALUES ${values}
  `).toPromise()
}

// ❌ 逐条插入(缓慢)
async function insertTrade(trade: Trade) {
  // 切勿在循环中这样做!
  await clickhouse.query(`
    INSERT INTO trades VALUES ('${trade.id}', ...)
  `).toPromise()
}

流式插入 (Streaming Insert)

// 用于持续的数据摄取
import { createWriteStream } from 'fs'
import { pipeline } from 'stream/promises'

async function streamInserts() {
  const stream = clickhouse.insert('trades').stream()

  for await (const batch of dataSource) {
    stream.write(batch)
  }

  await stream.end()
}

物化视图 (Materialized Views)

实时聚合

-- 为每小时统计创建物化视图
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;

-- 查询该物化视图
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;

性能监控 (Performance Monitoring)

查询性能

-- 检查慢查询
SELECT
    query_id,
    user,
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_duration_ms > 1000
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;

表统计信息

-- 检查表大小
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows,
    max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;

常用分析查询 (Common Analytics Queries)

时间序列分析

-- 日活跃用户 (DAU)
SELECT
    toDate(timestamp) AS date,
    uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;

-- 留存分析
SELECT
    signup_date,
    countIf(days_since_signup = 0) AS day_0,
    countIf(days_since_signup = 1) AS day_1,
    countIf(days_since_signup = 7) AS day_7,
    countIf(days_since_signup = 30) AS day_30
FROM (
    SELECT
        user_id,
        min(toDate(timestamp)) AS signup_date,
        toDate(timestamp) AS activity_date,
        dateDiff('day', signup_date, activity_date) AS days_since_signup
    FROM events
    GROUP BY user_id, activity_date
)
GROUP BY signup_date
ORDER BY signup_date DESC;

漏斗分析 (Funnel Analysis)

-- 转化漏斗
SELECT
    countIf(step = 'viewed_market') AS viewed,
    countIf(step = 'clicked_trade') AS clicked,
    countIf(step = 'completed_trade') AS completed,
    round(clicked / viewed * 100, 2) AS view_to_click_rate,
    round(completed / clicked * 100, 2) AS click_to_completion_rate
FROM (
    SELECT
        user_id,
        session_id,
        event_type AS step
    FROM events
    WHERE event_date = today()
)
GROUP BY session_id;

同期群分析 (Cohort Analysis)

-- 按注册月份划分的用户同期群
SELECT
    toStartOfMonth(signup_date) AS cohort,
    toStartOfMonth(activity_date) AS month,
    dateDiff('month', cohort, month) AS months_since_signup,
    count(DISTINCT user_id) AS active_users
FROM (
    SELECT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date
    FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;

数据管道模式 (Data Pipeline Patterns)

ETL 模式

// 提取 (Extract), 转换 (Transform), 加载 (Load)
async function etlPipeline() {
  // 1. 从源端提取
  const rawData = await extractFromPostgres()

  // 2. 转换
  const transformed = rawData.map(row => ({
    date: new Date(row.created_at).toISOString().split('T')[0],
    market_id: row.market_slug,
    volume: parseFloat(row.total_volume),
    trades: parseInt(row.trade_count)
  }))

  // 3. 加载到 ClickHouse
  await bulkInsertToClickHouse(transformed)
}

// 定期运行
setInterval(etlPipeline, 60 * 60 * 1000)  // 每小时一次

变更数据捕获 (CDC)

// 监听 PostgreSQL 变更并同步到 ClickHouse
import { Client } from 'pg'

const pgClient = new Client({ connectionString: process.env.DATABASE_URL })

pgClient.query('LISTEN market_updates')

pgClient.on('notification', async (msg) => {
  const update = JSON.parse(msg.payload)

  await clickhouse.insert('market_updates', [
    {
      market_id: update.id,
      event_type: update.operation,  // INSERT, UPDATE, DELETE
      timestamp: new Date(),
      data: JSON.stringify(update.new_data)
    }
  ])
})

最佳实践 (Best Practices)

1. 分区策略 (Partitioning Strategy)

  • 按时间分区(通常为月或日)
  • 避免过多分区(会影响性能)
  • 使用 DATE 类型作为分区键

2. 排序键 (Ordering Key)

  • 将最常过滤的列放在前面
  • 考虑基数(高基数列在前)
  • 排序方式会影响压缩效果

3. 数据类型 (Data Types)

  • 使用最小且合适的类型(UInt32 vs UInt64)
  • 对重复出现的字符串使用 LowCardinality
  • 对分类数据使用 Enum

4. 避免 (Avoid)

  • 使用 SELECT *(应指定具体列)
  • 使用 FINAL(应在查询前合并数据)
  • 过多的 JOIN(分析场景应进行反规范化)
  • 频繁的小批量插入(应改用批量插入)

5. 监控 (Monitoring)

  • 跟踪查询性能
  • 监控磁盘使用情况
  • 检查合并操作 (merge operations)
  • 查看慢查询日志

记住:ClickHouse 在分析型负载方面表现卓越。请根据你的查询模式设计表结构,采用批量插入,并利用物化视图进行实时聚合。

Stats
Stars75
Forks15
Last CommitMar 5, 2026
Actions

Similar Skills