From langfuse-pack
Provides Langfuse reference architecture patterns for production LLM observability: singleton SDKs, AsyncLocalStorage context propagation, OpenTelemetry integration, and scale tiers up to 1M+ traces/day.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin langfuse-packThis skill is limited to using the following tools:
Production-grade architecture patterns for Langfuse LLM observability: singleton SDK, context propagation with `AsyncLocalStorage`, cross-service trace correlation, multi-environment configurations, and scale strategies.
Provides Langfuse SDK patterns for singleton clients, observe wrappers, nested traces, session tracking, and OTel integration for LLM observability in Node.js apps.
Provides expertise in Langfuse LLM observability: tracing, prompt management, evaluations, datasets. Integrates with LangChain, LlamaIndex, OpenAI for debugging and monitoring production LLM apps.
Provides expertise on Langfuse open-source LLM observability platform for tracing, prompt management, evaluation, datasets, and integrations with LangChain, LlamaIndex, OpenAI. For debugging and monitoring LLM apps.
Share bugs, ideas, or general feedback.
Production-grade architecture patterns for Langfuse LLM observability: singleton SDK, context propagation with AsyncLocalStorage, cross-service trace correlation, multi-environment configurations, and scale strategies.
@langfuse/tracing, @langfuse/otel, @opentelemetry/sdk-node| Tier | Scale | Architecture | Langfuse Host |
|---|---|---|---|
| Starter | < 100K traces/day | Direct SDK, Cloud | Langfuse Cloud |
| Growth | 100K-1M traces/day | Singleton + batching | Cloud or Self-hosted |
| Enterprise | 1M+ traces/day | Queue-buffered + sampling | Self-hosted (HA) |
// src/lib/tracing.ts -- Single module for all tracing
import { LangfuseClient } from "@langfuse/client";
import { LangfuseSpanProcessor } from "@langfuse/otel";
import { NodeSDK } from "@opentelemetry/sdk-node";
import { AsyncLocalStorage } from "async_hooks";
// Singleton OTel SDK
let sdk: NodeSDK | null = null;
export function initTracing() {
if (sdk) return sdk;
sdk = new NodeSDK({
spanProcessors: [
new LangfuseSpanProcessor({
exportIntervalMillis: 5000,
maxExportBatchSize: 50,
}),
],
});
sdk.start();
// Graceful shutdown
for (const signal of ["SIGTERM", "SIGINT"]) {
process.on(signal, async () => {
console.log(`Received ${signal}, flushing traces...`);
await sdk?.shutdown();
process.exit(0);
});
}
return sdk;
}
// Singleton client for non-tracing operations
let client: LangfuseClient | null = null;
export function getLangfuseClient(): LangfuseClient {
if (!client) client = new LangfuseClient();
return client;
}
// Request context for user/session tracking
interface RequestContext {
userId?: string;
sessionId?: string;
requestId: string;
}
const requestStore = new AsyncLocalStorage<RequestContext>();
export function getRequestContext(): RequestContext | undefined {
return requestStore.getStore();
}
export function runWithContext<T>(ctx: RequestContext, fn: () => T): T {
return requestStore.run(ctx, fn);
}
// src/middleware/tracing.ts
import { startActiveObservation, updateActiveObservation } from "@langfuse/tracing";
import { runWithContext, getRequestContext } from "../lib/tracing";
import { randomUUID } from "crypto";
import type { Request, Response, NextFunction } from "express";
export function langfuseMiddleware() {
return (req: Request, res: Response, next: NextFunction) => {
const ctx = {
requestId: req.headers["x-request-id"]?.toString() || randomUUID(),
userId: req.headers["x-user-id"]?.toString(),
sessionId: req.headers["x-session-id"]?.toString(),
};
runWithContext(ctx, () => {
startActiveObservation(`${req.method} ${req.path}`, async () => {
updateActiveObservation({
input: {
method: req.method,
path: req.path,
query: req.query,
},
metadata: {
userId: ctx.userId,
sessionId: ctx.sessionId,
requestId: ctx.requestId,
},
});
// Capture response
const originalEnd = res.end.bind(res);
res.end = function (...args: any[]) {
updateActiveObservation({
output: { statusCode: res.statusCode },
});
return originalEnd(...args);
} as any;
next();
}).catch(next);
});
};
}
// Usage
import express from "express";
import { initTracing } from "./lib/tracing";
import { langfuseMiddleware } from "./middleware/tracing";
initTracing();
const app = express();
app.use(langfuseMiddleware());
For microservices, propagate trace context via HTTP headers:
// Service A: Inject trace context into outbound requests
import { context, propagation } from "@opentelemetry/api";
async function callServiceB(data: any) {
const headers: Record<string, string> = {};
// OTel propagation injects traceparent header automatically
propagation.inject(context.active(), headers);
const response = await fetch("https://service-b.internal/api/process", {
method: "POST",
headers: {
"Content-Type": "application/json",
...headers, // Includes traceparent, tracestate
},
body: JSON.stringify(data),
});
return response.json();
}
// Service B: Extract and continue trace context
import { context, propagation } from "@opentelemetry/api";
import { startActiveObservation, updateActiveObservation } from "@langfuse/tracing";
app.post("/api/process", async (req, res) => {
// OTel automatically extracts context from incoming headers
// when using standard HTTP instrumentation.
// Any startActiveObservation call will be a child of the extracted trace.
await startActiveObservation("service-b-process", async () => {
updateActiveObservation({ input: req.body });
const result = await processData(req.body);
updateActiveObservation({ output: result });
res.json(result);
});
});
// src/config/langfuse.ts
type Environment = "development" | "staging" | "production";
const configs: Record<Environment, {
exportIntervalMillis: number;
maxExportBatchSize: number;
sampleRate: number;
}> = {
development: {
exportIntervalMillis: 1000, // Immediate visibility
maxExportBatchSize: 1,
sampleRate: 1.0, // Trace everything
},
staging: {
exportIntervalMillis: 5000,
maxExportBatchSize: 25,
sampleRate: 0.5, // 50% sampling
},
production: {
exportIntervalMillis: 10000,
maxExportBatchSize: 100,
sampleRate: 0.1, // 10% sampling
},
};
export function getTracingConfig() {
const env = (process.env.NODE_ENV || "development") as Environment;
return configs[env] || configs.development;
}
When Langfuse is unavailable, the app must keep running:
// The v4+ SDK with OTel handles this gracefully:
// - Failed exports are logged but don't throw
// - Events are buffered in the queue
// - Queue drops oldest events when maxQueueSize is exceeded
//
// For additional safety at the application level:
import { observe, updateActiveObservation } from "@langfuse/tracing";
let tracingHealthy = true;
let consecutiveFailures = 0;
const MAX_FAILURES = 10;
export function safeTrace<T extends (...args: any[]) => Promise<any>>(
name: string,
fn: T
): T {
return (async (...args: Parameters<T>) => {
if (!tracingHealthy) {
return fn(...args); // Circuit breaker open
}
try {
const result = await observe({ name }, async () => {
updateActiveObservation({ input: args });
const r = await fn(...args);
updateActiveObservation({ output: r });
return r;
})();
consecutiveFailures = 0;
return result;
} catch (error) {
consecutiveFailures++;
if (consecutiveFailures >= MAX_FAILURES) {
tracingHealthy = false;
console.error("Langfuse tracing disabled (circuit breaker open)");
// Re-enable after 5 minutes
setTimeout(() => { tracingHealthy = true; consecutiveFailures = 0; }, 300000);
}
return fn(...args);
}
}) as T;
}
| Decision | Starter | Growth | Enterprise |
|---|---|---|---|
| Langfuse host | Cloud | Cloud or Self-hosted | Self-hosted (HA) |
| SDK version | v4+ | v4+ | v4+ with custom processor |
| Sampling | 100% | 50-100% | 5-20% + error always |
| Context propagation | Not needed | AsyncLocalStorage | OTel + HTTP headers |
| Queue buffer | SDK internal | SDK internal | External (SQS/Kafka) |
| Failover | None | Log-and-continue | Circuit breaker |
| Issue | Cause | Solution |
|---|---|---|
| Multiple SDK instances | No singleton | Centralize in tracing.ts module |
| Lost traces on deploy | No SIGTERM handler | Register shutdown handler |
| Cross-service trace gaps | No context propagation | Inject OTel traceparent header |
| Scale bottleneck | Direct SDK at high volume | Add queue buffer or increase sampling |