From clari-pack
Optimizes Clari API exports with parallel multi-period processing, result caching, and incremental SQL warehouse loads. Use for slow exports, data pipeline bottlenecks, or reducing API calls.
Install:

```
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin clari-pack
```
Optimize Clari export pipelines: reduce export times, cache forecast data, and parallelize multi-period exports.
Export multiple periods concurrently instead of one at a time; the small default worker pool keeps the request rate modest:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallel_export(
    client,
    forecast_name: str,
    periods: list[str],
    max_workers: int = 3,
) -> dict[str, list[dict]]:
    """Export several forecast periods concurrently, keyed by period."""
    results = {}

    def export_period(period: str) -> tuple[str, list[dict]]:
        data = client.export_and_download(forecast_name, period)
        return period, data.get("entries", [])

    # Submit one export per period and collect results as they finish.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(export_period, p): p for p in periods}
        for future in as_completed(futures):
            period, entries = future.result()
            results[period] = entries
            print(f"  {period}: {len(entries)} entries")

    return results
```
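A minimal usage sketch. `StubClient`, the forecast name, and the period labels below are made up for illustration; the function above only assumes an object with an `export_and_download(forecast_name, period)` method:

```python
class StubClient:
    # Stand-in for a real Clari API client; returns canned data so the
    # sketch runs without credentials.
    def export_and_download(self, forecast_name: str, period: str) -> dict:
        return {"entries": [{"period": period, "forecast_amount": 100_000}]}


periods = ["2024_Q1", "2024_Q2", "2024_Q3", "2024_Q4"]
by_period = parallel_export(StubClient(), "Net New ARR", periods)
print(sum(len(e) for e in by_period.values()), "entries total")
```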
Cache downloaded exports on disk with a TTL so repeated runs skip the API entirely:

```python
import json
import hashlib
from pathlib import Path
from datetime import datetime, timedelta, timezone


class ExportCache:
    """File-backed cache for Clari export results, expiring after a TTL."""

    def __init__(self, cache_dir: str = ".cache/clari", ttl_hours: int = 4):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = timedelta(hours=ttl_hours)

    def _key(self, forecast: str, period: str) -> str:
        # Hash the forecast/period pair into a stable filename.
        return hashlib.md5(f"{forecast}:{period}".encode()).hexdigest()

    def get(self, forecast: str, period: str) -> list[dict] | None:
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        if not path.exists():
            return None
        meta = json.loads(path.read_text())
        cached_at = datetime.fromisoformat(meta["cached_at"])
        if datetime.now(timezone.utc) - cached_at > self.ttl:
            return None  # entry has expired
        return meta["entries"]

    def set(self, forecast: str, period: str, entries: list[dict]):
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        path.write_text(json.dumps({
            "cached_at": datetime.now(timezone.utc).isoformat(),
            "entries": entries,
        }))
```
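One way to wire the cache into the export path is a cache-aside helper; `cached_export` is a name introduced here for illustration, not part of the skill:

```python
def cached_export(client, cache: ExportCache, forecast: str, period: str) -> list[dict]:
    # Cache-aside: serve from disk while fresh, otherwise call the API
    # and store the result for the next run.
    entries = cache.get(forecast, period)
    if entries is not None:
        return entries
    data = client.export_and_download(forecast, period)
    entries = data.get("entries", [])
    cache.set(forecast, period, entries)
    return entries
```

With the default 4-hour TTL, reruns within a working session hit the disk cache instead of triggering new exports.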
For warehouse loads, MERGE the staged export into the target table instead of reloading it from scratch:

```sql
-- Use MERGE for incremental updates instead of a full reload.
MERGE INTO clari_forecasts AS target
USING staging_clari AS source
   ON target.owner_email   = source.owner_email
  AND target.time_period   = source.time_period
  AND target.forecast_name = source.forecast_name
WHEN MATCHED THEN UPDATE SET
    forecast_amount = source.forecast_amount,
    quota_amount    = source.quota_amount,
    crm_total       = source.crm_total,
    crm_closed      = source.crm_closed,
    exported_at     = source.exported_at
WHEN NOT MATCHED THEN INSERT (
    owner_name, owner_email, forecast_amount, quota_amount,
    crm_total, crm_closed, adjustment_amount, time_period,
    exported_at, forecast_name
) VALUES (
    source.owner_name, source.owner_email, source.forecast_amount,
    source.quota_amount, source.crm_total, source.crm_closed,
    source.adjustment_amount, source.time_period,
    source.exported_at, source.forecast_name
);
```
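A sketch of driving that statement from Python against PostgreSQL (15+, where MERGE is available). The DSN, the positional staging INSERT, and the `merge_clari_forecasts.sql` filename are assumptions for illustration:

```python
import psycopg2  # assumes a PostgreSQL 15+ target, which supports MERGE

# The MERGE statement above, saved to a file (hypothetical filename).
MERGE_SQL = open("merge_clari_forecasts.sql").read()

STAGE_SQL = (
    "INSERT INTO staging_clari VALUES "
    "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"  # one placeholder per column
)


def incremental_load(dsn: str, rows: list[tuple]) -> None:
    # Clear staging, load the fresh export (rows must match the staging
    # table's column order), then MERGE into the main table.
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute("TRUNCATE staging_clari")
        cur.executemany(STAGE_SQL, rows)
        cur.execute(MERGE_SQL)
    # The connection context manager commits on clean exit.
```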
Typical gains:

| Optimization | Before | After |
|---|---|---|
| Sequential 4-period export | 2 min | 40s (parallel) |
| Cache hit | 5-10s API call | <1ms |
| Full table reload | 30s | 5s (MERGE) |
For cost optimization, see `clari-cost-tuning`.