From clari-pack
Optimizes Clari API exports with parallel multi-period processing, result caching, and incremental SQL warehouse loads. Use for slow exports, data pipeline bottlenecks, or reducing API calls.
Slash command: /clari-pack:clari-performance-tuning
Optimize Clari export pipelines: reduce export times, cache forecast data, and parallelize multi-period exports.
```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallel_export(
    client,
    forecast_name: str,
    periods: list[str],
    max_workers: int = 3,
) -> dict[str, list[dict]]:
    """Export several periods concurrently and return entries keyed by period."""
    results = {}

    def export_period(period: str) -> tuple[str, list[dict]]:
        data = client.export_and_download(forecast_name, period)
        return period, data.get("entries", [])

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(export_period, p): p for p in periods
        }
        # Collect results in completion order, not submission order.
        for future in as_completed(futures):
            period, entries = future.result()
            results[period] = entries
            print(f"  {period}: {len(entries)} entries")

    return results
```
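In `parallel_export`, `future.result()` re-raises any exception from a worker, so one failed period discards the results of the others. A minimal sketch of a variant that records failures per period instead follows. `parallel_export_tolerant` and `FakeClient` are hypothetical names for illustration, the period strings are made up, and `export_and_download` is assumed to return a dict with an `entries` key as in the function above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallel_export_tolerant(client, forecast_name, periods, max_workers=3):
    """Export periods in parallel; return (results, errors) instead of raising."""
    results: dict[str, list[dict]] = {}
    errors: dict[str, Exception] = {}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(client.export_and_download, forecast_name, p): p
            for p in periods
        }
        for future in as_completed(futures):
            period = futures[future]
            try:
                results[period] = future.result().get("entries", [])
            except Exception as exc:
                # Keep going; the caller decides whether to retry this period.
                errors[period] = exc

    return results, errors


class FakeClient:
    """Hypothetical stand-in for the real client, used only to exercise the sketch."""

    def export_and_download(self, forecast_name, period):
        if period == "2025_Q2":
            raise RuntimeError("simulated export failure")
        return {"entries": [{"period": period, "forecast": forecast_name}]}


results, errors = parallel_export_tolerant(
    FakeClient(), "forecast", ["2025_Q1", "2025_Q2", "2025_Q3"]
)
for period, exc in errors.items():
    print(f"  {period} failed: {exc}")
```

Returning the errors rather than swallowing them lets the caller retry only the failed periods.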
```python
import json
import hashlib
from pathlib import Path
from datetime import datetime, timedelta


class ExportCache:
    """File-based cache for export results, one JSON file per (forecast, period)."""

    def __init__(self, cache_dir: str = ".cache/clari", ttl_hours: int = 4):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = timedelta(hours=ttl_hours)

    def _key(self, forecast: str, period: str) -> str:
        # MD5 is used only to derive a filesystem-safe name, not for security.
        return hashlib.md5(f"{forecast}:{period}".encode()).hexdigest()

    def get(self, forecast: str, period: str) -> list[dict] | None:
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        if not path.exists():
            return None
        meta = json.loads(path.read_text())
        cached_at = datetime.fromisoformat(meta["cached_at"])
        # Entries older than the TTL count as a miss.
        if datetime.utcnow() - cached_at > self.ttl:
            return None
        return meta["entries"]

    def set(self, forecast: str, period: str, entries: list[dict]):
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        path.write_text(json.dumps({
            "cached_at": datetime.utcnow().isoformat(),
            "entries": entries,
        }))
```
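The cache and the parallel export can be combined so that only cache misses reach the API. The sketch below shows one way to do that. `export_with_cache`, `MemoryCache`, and `CountingClient` are hypothetical names introduced here; `MemoryCache` is an in-memory stand-in that mirrors the `get`/`set` interface of `ExportCache`, so the example runs without touching disk or the network.

```python
from concurrent.futures import ThreadPoolExecutor


def export_with_cache(client, cache, forecast_name, periods, max_workers=3):
    """Serve cached periods from the cache and fetch only the missing ones in parallel."""
    results = {}
    missing = []
    for period in periods:
        cached = cache.get(forecast_name, period)
        if cached is None:  # an empty list is a valid cached value
            missing.append(period)
        else:
            results[period] = cached

    def fetch(period):
        data = client.export_and_download(forecast_name, period)
        return data.get("entries", [])

    if missing:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map yields results in the same order as `missing`.
            for period, entries in zip(missing, executor.map(fetch, missing)):
                cache.set(forecast_name, period, entries)
                results[period] = entries
    return results


class MemoryCache:
    """Hypothetical in-memory stand-in with the same get/set interface as ExportCache."""

    def __init__(self):
        self._data = {}

    def get(self, forecast, period):
        return self._data.get((forecast, period))

    def set(self, forecast, period, entries):
        self._data[(forecast, period)] = entries


class CountingClient:
    """Hypothetical client that records which periods were actually requested."""

    def __init__(self):
        self.calls = []

    def export_and_download(self, forecast_name, period):
        self.calls.append(period)
        return {"entries": [{"period": period}]}


client, cache = CountingClient(), MemoryCache()
first = export_with_cache(client, cache, "forecast", ["2025_Q1", "2025_Q2"])
second = export_with_cache(client, cache, "forecast", ["2025_Q1", "2025_Q2", "2025_Q3"])
```

On the second call only the new period is requested; the other two come from the cache.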
```sql
-- Use MERGE for incremental updates instead of full reload
MERGE INTO clari_forecasts AS target
USING staging_clari AS source
    ON target.owner_email = source.owner_email
    AND target.time_period = source.time_period
    AND target.forecast_name = source.forecast_name
WHEN MATCHED THEN UPDATE SET
    forecast_amount = source.forecast_amount,
    quota_amount = source.quota_amount,
    crm_total = source.crm_total,
    crm_closed = source.crm_closed,
    adjustment_amount = source.adjustment_amount,
    exported_at = source.exported_at
-- INSERT without a column list relies on the table's column order
WHEN NOT MATCHED THEN INSERT VALUES (
    source.owner_name, source.owner_email, source.forecast_amount,
    source.quota_amount, source.crm_total, source.crm_closed,
    source.adjustment_amount, source.time_period,
    source.exported_at, source.forecast_name
);
```
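To try the incremental pattern locally without a warehouse, here is a minimal sketch using Python's built-in `sqlite3`. SQLite has no `MERGE`, so this swaps in `INSERT ... ON CONFLICT DO UPDATE` (available from SQLite 3.24), which gives the same update-or-insert behavior on the three key columns. The table is trimmed to a few columns, and `load_batch` and the sample rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE clari_forecasts (
        owner_email     TEXT,
        time_period     TEXT,
        forecast_name   TEXT,
        forecast_amount REAL,
        exported_at     TEXT,
        PRIMARY KEY (owner_email, time_period, forecast_name)
    )
""")

# Stand-in for MERGE: insert new keys, update existing ones in place.
UPSERT = """
    INSERT INTO clari_forecasts
        (owner_email, time_period, forecast_name, forecast_amount, exported_at)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (owner_email, time_period, forecast_name) DO UPDATE SET
        forecast_amount = excluded.forecast_amount,
        exported_at     = excluded.exported_at
"""


def load_batch(rows):
    """Apply one export batch incrementally (hypothetical helper)."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT, rows)


# First export: two owners.
load_batch([
    ("a@example.com", "2025_Q1", "forecast", 100.0, "2025-01-01T00:00:00"),
    ("b@example.com", "2025_Q1", "forecast", 200.0, "2025-01-01T00:00:00"),
])
# Second export: one changed row and one new row; the untouched row stays as is.
load_batch([
    ("a@example.com", "2025_Q1", "forecast", 150.0, "2025-01-02T00:00:00"),
    ("c@example.com", "2025_Q1", "forecast", 300.0, "2025-01-02T00:00:00"),
])

rows = conn.execute(
    "SELECT owner_email, forecast_amount FROM clari_forecasts ORDER BY owner_email"
).fetchall()
```

After both batches the table holds three rows, with the first owner's amount updated rather than duplicated.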
| Optimization | Before | After |
|---|---|---|
| Sequential 4-period export | 2 min | 40s (parallel) |
| Cache hit | 5-10s API call | <1ms |
| Full table reload | 30s | 5s (MERGE) |
For cost optimization, see clari-cost-tuning.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin clari-pack