From langfuse-pack
Implements Langfuse SDK batching, exponential backoff retries, and concurrent limits to handle rate limits in high-volume LLM trace ingestion.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin langfuse-packThis skill is limited to using the following tools:
Handle Langfuse API rate limits with optimized SDK batching, exponential backoff with jitter, concurrent request limiting, and configurable sampling for ultra-high-volume workloads.
Optimizes Langfuse/OTel tracing in Node.js apps: benchmark overhead, tune batch flushing, reduce latency/memory for high-throughput production.
Provides expertise in Langfuse LLM observability: tracing, prompt management, evaluations, datasets. Integrates with LangChain, LlamaIndex, OpenAI for debugging and monitoring production LLM apps.
Implements LangChain rate limiting with retries, exponential backoff, concurrency control, provider fallbacks, and custom token bucket limiters for API quotas and 429 errors.
Share bugs, ideas, or general feedback.
Handle Langfuse API rate limits with optimized SDK batching, exponential backoff with jitter, concurrent request limiting, and configurable sampling for ultra-high-volume workloads.
The Langfuse SDK batches events internally before sending. Tuning batch settings is the first defense against rate limits.
// v3 Legacy: Direct configuration
import { Langfuse } from "langfuse";
const langfuse = new Langfuse({
flushAt: 50, // Events per batch (default: 15, max ~200)
flushInterval: 10000, // Milliseconds between flushes (default: 10000)
requestTimeout: 30000, // Timeout per batch request
});
// v4+: Configure via OTel span processor
import { LangfuseSpanProcessor } from "@langfuse/otel";
import { NodeSDK } from "@opentelemetry/sdk-node";
const processor = new LangfuseSpanProcessor({
exportIntervalMillis: 10000, // Flush interval
maxExportBatchSize: 50, // Events per batch
});
const sdk = new NodeSDK({ spanProcessors: [processor] });
sdk.start();
For custom API calls (scores, datasets, prompts) that hit rate limits:
async function withRetry<T>(
fn: () => Promise<T>,
options: { maxRetries?: number; baseDelayMs?: number; maxDelayMs?: number } = {}
): Promise<T> {
const { maxRetries = 5, baseDelayMs = 1000, maxDelayMs = 30000 } = options;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (error: any) {
const status = error?.status || error?.response?.status;
// Only retry on rate limits (429) and server errors (5xx)
if (attempt === maxRetries || (status && status < 429)) {
throw error;
}
// Honor Retry-After header if present
const retryAfter = error?.response?.headers?.["retry-after"];
let delay: number;
if (retryAfter) {
delay = parseInt(retryAfter, 10) * 1000;
} else {
// Exponential backoff with jitter
delay = Math.min(baseDelayMs * Math.pow(2, attempt), maxDelayMs);
delay += Math.random() * 500; // Jitter
}
console.warn(`Rate limited. Retry ${attempt + 1}/${maxRetries} in ${Math.round(delay)}ms`);
await new Promise((r) => setTimeout(r, delay));
}
}
throw new Error("Unreachable");
}
// Usage with Langfuse client operations
const langfuse = new LangfuseClient();
await withRetry(() =>
langfuse.score.create({
traceId: "trace-123",
name: "quality",
value: 0.95,
dataType: "NUMERIC",
})
);
Use p-queue to cap concurrent Langfuse API calls:
import PQueue from "p-queue";
import { LangfuseClient } from "@langfuse/client";
const langfuse = new LangfuseClient();
// Max 10 concurrent API calls, 50 per second
const queue = new PQueue({
concurrency: 10,
interval: 1000,
intervalCap: 50,
});
// Queue score submissions
async function queueScore(params: {
traceId: string;
name: string;
value: number;
}) {
return queue.add(() =>
langfuse.score.create({
...params,
dataType: "NUMERIC",
})
);
}
// Queue dataset item creation
async function queueDatasetItem(datasetName: string, item: any) {
return queue.add(() =>
langfuse.api.datasetItems.create({
datasetName,
input: item.input,
expectedOutput: item.expectedOutput,
})
);
}
// Monitor queue health
setInterval(() => {
console.log(`Queue: ${queue.pending} pending, ${queue.size} queued`);
}, 10000);
When tracing volume exceeds rate limits, sample traces instead of dropping them:
import { observe, updateActiveObservation, startActiveObservation } from "@langfuse/tracing";
class TraceSampler {
private rate: number;
private windowCounts: number[] = [];
private windowMs = 60000; // 1 minute window
private maxPerWindow: number;
constructor(sampleRate: number, maxPerMinute: number) {
this.rate = sampleRate;
this.maxPerWindow = maxPerMinute;
}
shouldSample(tags?: string[]): boolean {
// Always sample errors
if (tags?.includes("error") || tags?.includes("critical")) {
return true;
}
// Check window limit
const now = Date.now();
this.windowCounts = this.windowCounts.filter((t) => t > now - this.windowMs);
if (this.windowCounts.length >= this.maxPerWindow) {
return false;
}
// Probabilistic sampling
if (Math.random() > this.rate) {
return false;
}
this.windowCounts.push(now);
return true;
}
}
// 10% sampling, max 1000 traces/minute
const sampler = new TraceSampler(0.1, 1000);
async function sampledOperation(name: string, fn: () => Promise<any>) {
if (!sampler.shouldSample()) {
return fn(); // Run without tracing
}
return startActiveObservation(name, async () => {
updateActiveObservation({ metadata: { sampled: true } });
return fn();
});
}
| Tier | Traces/min | Batch Size | Strategy |
|---|---|---|---|
| Hobby | ~500 | 15 | Default settings |
| Pro | ~5,000 | 50 | Increase flushAt |
| Team | ~10,000 | 100 | + Queue-based limiting |
| Enterprise | Custom | Custom | + Sampling |
| Error | Response | Action |
|---|---|---|
429 Too Many Requests | Retry-After: N | Backoff for N seconds |
503 Service Unavailable | Server overloaded | Backoff 30s+ |
| Flush timeout | Large batch | Reduce flushAt, increase requestTimeout |
| Memory growth | Queue backup | Add maxSize to PQueue |