Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse. TRIGGERS - data pipeline, polars vs pandas, arrow format, clickhouse ml, efficient loading, zero-copy, memory optimization.
From devops-toolsnpx claudepluginhub terrylica/cc-skills --plugin devops-toolsThis skill is limited to using the following tools:
references/evolution-log.mdSearches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Guides Next.js 16+ Turbopack for faster dev via incremental bundling, FS caching, and HMR; covers webpack comparison, bundle analysis, and production builds.
Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.
ADR: 2026-01-22-polars-preference-hook (efficiency preferences framework)
Note: A PreToolUse hook enforces Polars preference. To use Pandas, add
# polars-exception: <reason>at file top.
Self-Evolving Skill: This skill improves through use. If instructions are wrong, parameters drifted, or a workaround was needed — fix this file immediately, don't defer. Only update for real, reproducible issues.
Use this skill when:
Dataset size?
├─ < 1M rows → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows → Use Polars (required for memory efficiency)
Operations?
├─ Simple transforms → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins → Polars with lazy evaluation
└─ Streaming/chunked → Polars scan_* functions
Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source → Arrow stream → Polars (optimal)
# BAD: 3 memory copies
df = pd.read_sql(query, conn) # Copy 1: DB → pandas
X = df[features].values # Copy 2: pandas → numpy
tensor = torch.from_numpy(X) # Copy 3: numpy → tensor
# Peak memory: 3x data size
# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch
client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars") # Arrow in DB memory
df = pl.from_arrow(arrow_table) # Zero-copy view
X = df.select(features).to_numpy() # Single allocation
tensor = torch.from_numpy(X) # View (no copy)
# Peak memory: 1.2x data size
def query_arrow(client, query: str) -> pl.DataFrame:
"""ClickHouse → Arrow → Polars (zero-copy chain)."""
arrow_table = client.query_arrow(f"{query} FORMAT ArrowStream")
return pl.from_arrow(arrow_table)
# Usage
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")
# Polars has native ClickHouse support (see pola.rs for version requirements)
df = pl.read_database_uri(
query="SELECT * FROM bars",
uri="clickhouse://user:pass@host/db"
)
# For reproducible batch processing
client.query("SELECT * FROM bars INTO OUTFILE 'data.parquet' FORMAT Parquet")
df = pl.scan_parquet("data.parquet") # Lazy, memory-mapped
from torch.utils.data import TensorDataset, DataLoader
# Accept both pandas and polars
def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]:
if isinstance(df, pd.DataFrame):
df = pl.from_pandas(df)
X = df.select(features).to_numpy()
y = df.select(target).to_numpy()
return (
torch.from_numpy(X).float(),
torch.from_numpy(y).float()
)
X, y = prepare_data(df)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)
class PolarsDataset(torch.utils.data.Dataset):
"""Memory-efficient dataset from Polars DataFrame."""
def __init__(self, df: pl.DataFrame, features: list[str], target: str):
self.arrow = df.to_arrow() # Arrow backing for zero-copy slicing
self.features = features
self.target = target
def __len__(self) -> int:
return self.arrow.num_rows
def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
row = self.arrow.slice(idx, 1)
x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
return x, y
# Define transformations lazily (no computation yet)
pipeline = (
pl.scan_parquet("raw_data.parquet")
.filter(pl.col("timestamp") >= start_date)
.with_columns([
(pl.col("close").pct_change()).alias("returns"),
(pl.col("volume").log()).alias("log_volume"),
])
.select(features + [target])
)
# Execute only when needed
train_df = pipeline.filter(pl.col("timestamp") < split_date).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()
# Process file in chunks (never loads full file)
def process_large_file(path: str, chunk_size: int = 100_000):
reader = pl.scan_parquet(path)
for batch in reader.iter_batches(n_rows=chunk_size):
# Process each chunk
features = compute_features(batch)
yield features.to_numpy()
from pydantic import BaseModel, field_validator
class FeatureConfig(BaseModel):
features: list[str]
target: str
seq_len: int = 15
@field_validator("features")
@classmethod
def validate_features(cls, v):
required = {"returns_vs", "momentum_z", "atr_pct"}
missing = required - set(v)
if missing:
raise ValueError(f"Missing required features: {missing}")
return v
def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
"""Fail-fast schema validation."""
missing = [c for c in required if c not in df.columns]
if missing:
raise ValueError(
f"[{stage}] Missing columns: {missing}\n"
f"Available: {sorted(df.columns)}"
)
| Operation | Pandas | Polars | Speedup |
|---|---|---|---|
| Read CSV (1GB) | 45s | 4s | 11x |
| Filter rows | 2.1s | 0.4s | 5x |
| Group-by agg | 3.8s | 0.3s | 13x |
| Sort | 5.2s | 0.4s | 13x |
| Memory peak | 10GB | 2.5GB | 4x |
Benchmark: 50M rows, 20 columns, MacBook M2
polars = "<version>" to dependencies (see PyPI)query_arrow() in data clientpl.from_pandas() wrapper at trainer entryprepare_sequences() to accept both typespl.scan_*.collect() only before .to_numpy()# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas() # Why?
# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect() # Full load
filtered = df.filter(...) # After the fact
# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()
# BAD: Loads entire file
df = pl.read_parquet("huge_file.parquet")
# GOOD: Stream in chunks
for batch in pl.scan_parquet("huge_file.parquet").iter_batches():
process(batch)
| Issue | Cause | Solution |
|---|---|---|
| Memory spike during load | Collecting too early | Use lazy evaluation, call collect() only when needed |
| Arrow conversion fails | Unsupported data type | Check for object columns, convert to native types |
| ClickHouse connection error | Wrong port or credentials | Verify host:8123 (HTTP) or host:9000 (native) |
| Zero-copy not working | Intermediate pandas conversion | Remove to_pandas() calls, stay in Arrow/Polars |
| Polars hook blocking code | Pandas used without exception | Add # polars-exception: reason comment at file top |
| Slow group-by operations | Using pandas for large datasets | Migrate to Polars for 5-10x speedup |
| Schema validation failure | Column names case-sensitive | Verify exact column names from source |
| PyTorch DataLoader OOM | Loading full dataset into memory | Use PolarsDataset with Arrow backing for lazy access |
| Parquet scan performance | Not using predicate pushdown | Add filters before collect() for lazy evaluation |
| Type mismatch in tensor | Float64 vs Float32 mismatch | Explicitly cast with .cast(pl.Float32) before numpy |
After this skill completes, reflect before closing the task:
Do NOT defer. The next invocation inherits whatever you leave behind.