From langfuse-pack

Implements Langfuse tracing for LLM calls via OpenAI wrappers, manual spans in RAG pipelines, streaming responses, and LangChain integration.

npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin langfuse-pack

Skills in this pack:

- End-to-end tracing of LLM calls, chains, and agents: the OpenAI drop-in wrapper, manual tracing with `startActiveObservation`, RAG pipeline instrumentation, streaming response tracking, and LangChain integration.
- Langfuse SDK patterns for singleton clients, observe wrappers, nested traces, session tracking, and OTel integration for LLM observability in Node.js apps.
- Expertise on the Langfuse open-source LLM observability platform: tracing, prompt management, evaluation, datasets, and integrations with LangChain, LlamaIndex, and OpenAI for debugging and monitoring production LLM apps.
Setup: complete the `langfuse-install-auth` setup, then install `openai` plus the Langfuse packages: `@langfuse/openai`, `@langfuse/tracing`, `@langfuse/otel`, and `@opentelemetry/sdk-node`.

OpenAI drop-in wrapper:

```typescript
import OpenAI from "openai";
import { observeOpenAI } from "@langfuse/openai";

// Wrap the OpenAI client -- all calls are now traced automatically
const openai = observeOpenAI(new OpenAI());

// Every call captures: model, input, output, tokens, latency, cost
const response = await openai.chat.completions.create({
  model: "gpt-4o",
  messages: [
    { role: "system", content: "You are a helpful assistant." },
    { role: "user", content: "What is Langfuse?" },
  ],
});

// Add metadata to traces
const res = await observeOpenAI(new OpenAI(), {
  generationName: "product-description",
  generationMetadata: { feature: "onboarding" },
  sessionId: "session-abc",
  userId: "user-123",
  tags: ["production", "onboarding"],
}).chat.completions.create({
  model: "gpt-4o-mini",
  messages: [{ role: "user", content: "Describe this product" }],
});
```
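In SDK v4 the wrapper emits OpenTelemetry spans, so a span processor has to be registered once at startup; without it traces stay flat or never arrive (see the troubleshooting table below). A minimal bootstrap sketch, assuming credentials are supplied via the standard `LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, and `LANGFUSE_BASEURL` environment variables:

```typescript
// instrumentation.ts -- import this before any traced code runs
import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseSpanProcessor } from "@langfuse/otel";

// The processor reads Langfuse credentials from the environment
// and exports finished spans to your Langfuse project.
const sdk = new NodeSDK({
  spanProcessors: [new LangfuseSpanProcessor()],
});

sdk.start();

// In short-lived scripts, call `await sdk.shutdown()` before exit
// so buffered spans are flushed.
```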
Manual tracing with `startActiveObservation` gives you nested spans for multi-step pipelines; each observation auto-ends when its callback returns or throws. Note that `asType` goes in the trailing options argument:

```typescript
import { startActiveObservation, updateActiveObservation } from "@langfuse/tracing";

async function ragPipeline(query: string) {
  return await startActiveObservation("rag-pipeline", async () => {
    updateActiveObservation({ input: { query }, metadata: { pipeline: "rag-v2" } });

    // Span: Query embedding
    const embedding = await startActiveObservation("embed-query", async () => {
      updateActiveObservation({ input: { text: query } });
      const vector = await embedText(query);
      updateActiveObservation({
        output: { dimensions: vector.length },
        metadata: { model: "text-embedding-3-small" },
      });
      return vector;
    });

    // Span: Vector search
    const documents = await startActiveObservation("vector-search", async () => {
      updateActiveObservation({ input: { dimensions: embedding.length } });
      const docs = await searchVectorDB(embedding);
      updateActiveObservation({
        output: { documentCount: docs.length, topScore: docs[0]?.score },
      });
      return docs;
    });

    // Generation: LLM call with context
    const answer = await startActiveObservation(
      "generate-answer",
      async () => {
        updateActiveObservation({
          model: "gpt-4o",
          input: { query, context: documents.map((d) => d.content) },
        });
        const result = await generateAnswer(query, documents);
        updateActiveObservation({
          output: result.content,
          usage: {
            promptTokens: result.usage.prompt_tokens,
            completionTokens: result.usage.completion_tokens,
          },
        });
        return result.content;
      },
      { asType: "generation" }
    );

    updateActiveObservation({ output: { answer } });
    return answer;
  });
}
```
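Because both APIs share the same OpenTelemetry context, calls made through a wrapped OpenAI client inside `startActiveObservation` nest under the active span automatically. A sketch of the `generateAnswer` helper assumed above, written that way (the helper name and shape are illustrative, not part of the SDK):

```typescript
import OpenAI from "openai";
import { observeOpenAI } from "@langfuse/openai";

const openai = observeOpenAI(new OpenAI());

// Called inside the "rag-pipeline" observation, so the traced OpenAI
// call attaches to the active span as a child generation.
async function generateAnswer(query: string, documents: { content: string }[]) {
  const completion = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [
      {
        role: "system",
        content: `Answer using this context:\n${documents.map((d) => d.content).join("\n")}`,
      },
      { role: "user", content: query },
    ],
  });
  return {
    content: completion.choices[0].message.content ?? "",
    usage: completion.usage!,
  };
}
```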
The same pipeline with the legacy `langfuse` SDK (v2/v3), where the trace tree is built and ended explicitly:

```typescript
import { Langfuse } from "langfuse";

const langfuse = new Langfuse();

async function ragPipeline(query: string) {
  const trace = langfuse.trace({
    name: "rag-pipeline",
    input: { query },
    metadata: { pipeline: "rag-v1" },
  });

  const embedSpan = trace.span({ name: "embed-query", input: { text: query } });
  const embedding = await embedText(query);
  embedSpan.end({ output: { dimensions: embedding.length } });

  const searchSpan = trace.span({ name: "vector-search" });
  const documents = await searchVectorDB(embedding);
  searchSpan.end({ output: { count: documents.length, topScore: documents[0]?.score } });

  const generation = trace.generation({
    name: "generate-answer",
    model: "gpt-4o",
    modelParameters: { temperature: 0.7, maxTokens: 500 },
    input: { query, context: documents.map((d) => d.content) },
  });
  const answer = await generateAnswer(query, documents);
  generation.end({
    output: answer.content,
    usage: {
      promptTokens: answer.usage.prompt_tokens,
      completionTokens: answer.usage.completion_tokens,
      totalTokens: answer.usage.total_tokens,
    },
  });

  trace.update({ output: { answer: answer.content } });
  await langfuse.flushAsync();
  return answer.content;
}
```
Streaming responses through the wrapper:

```typescript
import OpenAI from "openai";
import { observeOpenAI } from "@langfuse/openai";

// The wrapper handles streaming automatically
const openai = observeOpenAI(new OpenAI());

const stream = await openai.chat.completions.create({
  model: "gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }],
  stream: true,
  stream_options: { include_usage: true }, // Required for token tracking
});

let fullContent = "";
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || "";
  fullContent += content;
  process.stdout.write(content);
}
// Token usage and latency are captured automatically by the wrapper
```
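For clients you cannot wrap, a stream can also be traced by hand with the legacy SDK shown earlier. A hedged sketch, assuming the v2 `completionStartTime` generation attribute to capture time-to-first-token:

```typescript
import OpenAI from "openai";
import { Langfuse } from "langfuse";

const langfuse = new Langfuse();
const openai = new OpenAI(); // unwrapped client

const trace = langfuse.trace({ name: "stream-story" });
const generation = trace.generation({
  name: "stream-story",
  model: "gpt-4o",
  input: [{ role: "user", content: "Tell me a story" }],
});

const stream = await openai.chat.completions.create({
  model: "gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }],
  stream: true,
});

let output = "";
let first = true;
for await (const chunk of stream) {
  if (first) {
    // Time-to-first-token: mark when the first chunk arrives
    generation.update({ completionStartTime: new Date() });
    first = false;
  }
  output += chunk.choices[0]?.delta?.content ?? "";
}
generation.end({ output });
await langfuse.flushAsync();
```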
Providers without a drop-in wrapper, such as Anthropic, can be traced as generations with the same manual API:

```typescript
import Anthropic from "@anthropic-ai/sdk";
import { startActiveObservation, updateActiveObservation } from "@langfuse/tracing";

const anthropic = new Anthropic();

async function callClaude(prompt: string) {
  return await startActiveObservation(
    "claude-call",
    async () => {
      updateActiveObservation({
        model: "claude-sonnet-4-20250514",
        input: [{ role: "user", content: prompt }],
      });
      const response = await anthropic.messages.create({
        model: "claude-sonnet-4-20250514",
        max_tokens: 1024,
        messages: [{ role: "user", content: prompt }],
      });
      // Narrow the content block union before reading .text
      const block = response.content[0];
      const text = block.type === "text" ? block.text : "";
      updateActiveObservation({
        output: text,
        usage: {
          promptTokens: response.usage.input_tokens,
          completionTokens: response.usage.output_tokens,
        },
      });
      return text;
    },
    { asType: "generation" }
  );
}
```
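Error handling is worth noting: `startActiveObservation` ends the observation even when the callback throws, so failures are not orphaned. Recording the error on the observation makes it visible in the trace view. A sketch, reusing `callClaude` from above and assuming the `level`/`statusMessage` observation attributes:

```typescript
import { startActiveObservation, updateActiveObservation } from "@langfuse/tracing";

await startActiveObservation("risky-claude-call", async () => {
  try {
    return await callClaude("Summarize the incident report");
  } catch (err) {
    // Mark the observation as failed before re-throwing;
    // the span still ends and stays attached to the trace.
    updateActiveObservation({
      level: "ERROR",
      statusMessage: err instanceof Error ? err.message : String(err),
    });
    throw err;
  }
});
```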
LangChain integration (Python) -- pass the Langfuse callback handler and the whole chain is traced:

```python
from langfuse.callback import CallbackHandler  # SDK v2 path; v3 moved it to langfuse.langchain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

langfuse_handler = CallbackHandler()

llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
])
chain = prompt | llm

# All LangChain operations are automatically traced
result = chain.invoke(
    {"input": "What is Langfuse?"},
    config={"callbacks": [langfuse_handler]},
)
```
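The Node.js equivalent uses the `CallbackHandler` from the `langfuse-langchain` package (v2 SDK era); a hedged sketch:

```typescript
import { CallbackHandler } from "langfuse-langchain";
import { ChatOpenAI } from "@langchain/openai";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const langfuseHandler = new CallbackHandler();

const llm = new ChatOpenAI({ model: "gpt-4o" });
const prompt = ChatPromptTemplate.fromMessages([
  ["system", "You are a helpful assistant."],
  ["human", "{input}"],
]);
const chain = prompt.pipe(llm);

// Pass the handler per invocation; every chain step is traced
const result = await chain.invoke(
  { input: "What is Langfuse?" },
  { callbacks: [langfuseHandler] }
);
```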
Troubleshooting:

| Issue | Cause | Solution |
|---|---|---|
| Missing generations | OpenAI wrapper not applied | Use `observeOpenAI()` from `@langfuse/openai` |
| Orphaned spans | Missing end or callback finish | Use `startActiveObservation` (auto-ends) or call `.end()` in a `finally` block |
| No token usage on stream | Stream usage not requested | Add `stream_options: { include_usage: true }` |
| Flat trace (no nesting) | Missing OTel context | Ensure the `NodeSDK` is started with a `LangfuseSpanProcessor` |
For evaluation and scoring workflows, see `langfuse-core-workflow-b`.