From langchain-pack
Implements LangChain callback handlers for webhook event dispatching, SSE streaming, WebSocket integration, and tracing during chain/LLM/tool execution.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin langchain-pack
This skill is limited to using the following tools:
Event-driven patterns for LangChain: custom callback handlers for lifecycle hooks, webhook dispatching, Server-Sent Events (SSE) for streaming, WebSocket integration, and event aggregation for tracing.
Implements Langfuse tracing for LLM calls via OpenAI wrappers, manual spans in RAG pipelines, streaming responses, and LangChain integration.
Handles SSE streaming events and async batch processing for Anthropic Claude API using Python. For real-time integrations and bulk result processing.
Designs LLM applications using LangChain 1.x and LangGraph for agents, state management, memory, tool integration, and workflows.
Share bugs, ideas, or general feedback.
Event-driven patterns for LangChain: custom callback handlers for lifecycle hooks, webhook dispatching, Server-Sent Events (SSE) for streaming, WebSocket integration, and event aggregation for tracing.
LangChain emits events at every stage of chain/agent execution. Custom handlers can observe, log, stream, or dispatch these events.
chain.invoke()
├── handleChainStart()
│   ├── handleLLMStart()
│   │   ├── handleLLMNewToken()   // streaming
│   │   └── handleLLMEnd()
│   ├── handleToolStart()
│   │   └── handleToolEnd()
│   └── handleRetrieverStart()
│       └── handleRetrieverEnd()
└── handleChainEnd()
import { BaseCallbackHandler } from "@langchain/core/callbacks/base";
/**
 * Callback handler that forwards LLM and tool lifecycle events to a webhook.
 *
 * Every event is POSTed as JSON `{ event, data, timestamp }` to `webhookUrl`.
 * Delivery is best-effort: network failures and non-2xx responses are logged
 * with `console.warn` and never rethrown, so a broken webhook cannot fail the
 * run that triggered the event.
 */
class WebhookHandler extends BaseCallbackHandler {
  name = "WebhookHandler";

  /** @param webhookUrl Endpoint that receives the POSTed event payloads. */
  constructor(private webhookUrl: string) {
    super();
  }

  /** Reports the model identifier (third entry of `llm.id`) and prompt count. */
  async handleLLMStart(llm: any, prompts: string[]) {
    await this.send("llm_start", {
      model: llm?.id?.[2],
      promptCount: prompts.length,
    });
  }

  /** Reports token usage when the provider included it in `llmOutput`. */
  async handleLLMEnd(output: any) {
    await this.send("llm_end", {
      tokenUsage: output?.llmOutput?.tokenUsage,
    });
  }

  /** Reports the error message of a failed LLM call. */
  async handleLLMError(error: Error) {
    await this.send("llm_error", {
      error: error.message,
    });
  }

  /** Reports the first 200 characters of the tool input. */
  async handleToolStart(_tool: any, input: string) {
    await this.send("tool_start", { input: WebhookHandler.preview(input) });
  }

  /**
   * Reports the first 200 characters of the tool output.
   * The output is not assumed to be a string: non-string values are
   * serialised first instead of throwing on a missing `.slice`.
   */
  async handleToolEnd(output: unknown) {
    await this.send("tool_end", { output: WebhookHandler.preview(output) });
  }

  /** Renders any value as text and truncates it to `limit` characters. */
  private static preview(value: unknown, limit = 200): string {
    let text: string;
    if (typeof value === "string") {
      text = value;
    } else {
      try {
        // JSON.stringify returns undefined for e.g. `undefined` or functions.
        text = JSON.stringify(value) ?? String(value);
      } catch {
        // Circular structures and BigInt values cannot be serialised.
        text = String(value);
      }
    }
    return text.slice(0, limit);
  }

  /**
   * POSTs one event to the webhook with a 5 second timeout.
   * Never throws; every failure mode ends in a `console.warn`.
   */
  private async send(event: string, data: Record<string, any>) {
    try {
      const response = await fetch(this.webhookUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          event,
          data,
          timestamp: new Date().toISOString(),
        }),
        signal: AbortSignal.timeout(5000),
      });
      // fetch only rejects on network errors; HTTP error statuses resolve
      // normally and would otherwise pass as a successful delivery.
      if (!response.ok) {
        console.warn(`Webhook error: ${event} rejected with HTTP ${response.status}`);
      }
    } catch (e) {
      // Don't let webhook failures break the chain
      console.warn(`Webhook error: ${e}`);
    }
  }
}
// Register the webhook handler on the model so its LLM calls emit events.
const model = new ChatOpenAI({
  callbacks: [new WebhookHandler("https://api.example.com/webhook")],
  model: "gpt-4o-mini",
});
import express from "express";
import { ChatOpenAI } from "@langchain/openai";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";
// Prompt -> streaming chat model -> plain-string chunks.
const ssePrompt = ChatPromptTemplate.fromTemplate("{input}");
const sseModel = new ChatOpenAI({ model: "gpt-4o-mini", streaming: true });
const chain = ssePrompt.pipe(sseModel).pipe(new StringOutputParser());

// Express app with JSON body parsing for the POST endpoint.
const app = express();
app.use(express.json());
/**
 * POST /api/chat/stream — streams the chain's output as Server-Sent Events.
 *
 * Expects a JSON body `{ input: string }`. Emits one `data:` frame per chunk
 * (`{ text }`), then `{ done: true }`, or `{ error }` if the chain fails.
 * A body without a string `input` is rejected with HTTP 400 before the
 * response is committed to the event-stream content type.
 */
app.post("/api/chat/stream", async (req, res) => {
  const input: unknown = req.body?.input;
  if (typeof input !== "string") {
    res.status(400).json({ error: "Request body must contain a string `input` field" });
    return;
  }

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // Writes one SSE frame, skipping the write once the client has gone away.
  const sendEvent = (payload: Record<string, unknown>) => {
    if (!res.destroyed) {
      res.write(`data: ${JSON.stringify(payload)}\n\n`);
    }
  };

  try {
    const stream = await chain.stream({ input });
    for await (const chunk of stream) {
      if (res.destroyed) break; // client disconnected
      sendEvent({ text: chunk });
    }
    sendEvent({ done: true });
  } catch (error: unknown) {
    // Thrown values are not guaranteed to be Error instances.
    sendEvent({ error: error instanceof Error ? error.message : String(error) });
  }
  res.end();
});
// Browser JavaScript
/**
 * Sends `input` to `/api/chat/stream` and appends each streamed text chunk to
 * the element with id `output`.
 *
 * Network chunks do not align with SSE lines, so incoming bytes are decoded
 * in streaming mode and buffered; only complete lines are parsed. Resolves
 * when a `{ done: true }` frame arrives or the response body ends.
 *
 * @param input Prompt text sent as the `input` field of the JSON body.
 * @throws Error when the response status is not 2xx or it has no body.
 */
async function streamChat(input: string) {
  const response = await fetch("/api/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ input }),
  });
  if (!response.ok || !response.body) {
    throw new Error(`Stream request failed with HTTP ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let pending = ""; // trailing partial line carried over to the next read

  while (true) {
    const { done, value } = await reader.read();
    // `stream: true` keeps an incomplete multi-byte character inside the
    // decoder until its remaining bytes arrive; the final call flushes it.
    pending += done ? decoder.decode() : decoder.decode(value, { stream: true });

    const lines = pending.split("\n");
    // The last element is an unterminated line unless the stream has ended.
    pending = done ? "" : lines.pop() ?? "";

    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const data = JSON.parse(line.slice(6));
      if (data.done) return;
      if (data.text) document.getElementById("output")!.textContent += data.text;
    }
    if (done) break;
  }
}
import { WebSocketServer } from "ws";
import { ChatOpenAI } from "@langchain/openai";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";
// Prompt -> streaming chat model -> plain-string chunks.
const wsPrompt = ChatPromptTemplate.fromTemplate("{input}");
const wsModel = new ChatOpenAI({ model: "gpt-4o-mini", streaming: true });
const chain = wsPrompt.pipe(wsModel).pipe(new StringOutputParser());

// WebSocket server listening on port 8080.
const wss = new WebSocketServer({ port: 8080 });
/**
 * For each connection, treats every incoming message as JSON `{ input }` and
 * streams the chain's output back as `{ type: "token", text }` frames, followed
 * by `{ type: "done" }`. Any failure, including a malformed message, is
 * reported to the client as `{ type: "error", message }`.
 */
wss.on("connection", (ws) => {
  // Sends one JSON frame, skipping it if the socket is no longer open.
  const sendFrame = (payload: Record<string, unknown>) => {
    if (ws.readyState === ws.OPEN) {
      ws.send(JSON.stringify(payload));
    }
  };

  ws.on("message", async (raw) => {
    try {
      // Parsing happens inside the try: a throw in this async listener would
      // otherwise surface as an unhandled promise rejection.
      const parsed: unknown = JSON.parse(raw.toString());
      const input = (parsed as { input?: unknown } | null)?.input;
      if (typeof input !== "string") {
        throw new Error("Message must be JSON with a string `input` field");
      }

      const stream = await chain.stream({ input });
      for await (const chunk of stream) {
        if (ws.readyState !== ws.OPEN) break;
        sendFrame({ type: "token", text: chunk });
      }
      sendFrame({ type: "done" });
    } catch (error: unknown) {
      // Thrown values are not guaranteed to be Error instances.
      sendFrame({
        type: "error",
        message: error instanceof Error ? error.message : String(error),
      });
    }
  });
});
/** One recorded callback event. */
interface TraceEvent {
  /** Identifier of the run that emitted the event. */
  runId: string;
  /** Event label, e.g. "chain_start" or "llm_end". */
  event: string;
  /** Numeric time at which the event was recorded. */
  timestamp: number;
  /** Event-specific details. */
  data: Record<string, any>;
}
/**
 * Callback handler that collects chain and LLM lifecycle events in memory.
 *
 * Events are appended to `events` in arrival order, each stamped with
 * `Date.now()`. `getTrace()` summarises what has been collected so far.
 */
class TraceAggregator extends BaseCallbackHandler {
  name = "TraceAggregator";
  events: TraceEvent[] = [];

  /** Records the keys of the chain's inputs. */
  handleChainStart(_chain: any, inputs: any, runId: string) {
    const inputKeys = Object.keys(inputs);
    this.record("chain_start", runId, { inputKeys });
  }

  /** Records the keys of the chain's outputs. */
  handleChainEnd(outputs: any, runId: string) {
    const outputKeys = Object.keys(outputs);
    this.record("chain_end", runId, { outputKeys });
  }

  /** Records the model identifier (third entry of `llm.id`). */
  handleLLMStart(llm: any, _prompts: string[], runId: string) {
    const model = llm?.id?.[2];
    this.record("llm_start", runId, { model });
  }

  /** Records the total token count when the provider reported one. */
  handleLLMEnd(output: any, runId: string) {
    const tokens = output.llmOutput?.tokenUsage?.totalTokens;
    this.record("llm_end", runId, { tokens });
  }

  /** Appends one event stamped with the current time. */
  private record(event: string, runId: string, data: Record<string, any>) {
    const entry: TraceEvent = { event, timestamp: Date.now(), data, runId };
    this.events.push(entry);
  }

  /**
   * Returns the collected events, their count, and the time between the
   * first and last event in milliseconds (0 with fewer than two events).
   */
  getTrace() {
    const totalEvents = this.events.length;
    let durationMs = 0;
    if (totalEvents > 1) {
      const first = this.events[0];
      const last = this.events[totalEvents - 1];
      durationMs = last.timestamp - first.timestamp;
    }
    return { events: this.events, totalEvents, durationMs };
  }
}
// Usage: pass the aggregator per call, then print the collected trace.
const tracer = new TraceAggregator();
const runOptions = { callbacks: [tracer] };
await chain.invoke({ input: "test" }, runOptions);
console.log(tracer.getTrace());
| Error | Cause | Fix |
|---|---|---|
| Webhook timeout | Slow endpoint | Bound each request with AbortSignal.timeout() and send without blocking the chain |
| WebSocket disconnect | Client closed | Check ws.readyState before sending |
| SSE connection reset | Proxy timeout | Add keep-alive pings every 15s |
| Events not captured | Callback not passed | Add to callbacks array in invoke() |
Use langchain-observability for comprehensive production monitoring.