Implement LangChain callback and event handling for webhooks. Use when integrating with external systems, implementing streaming, or building event-driven LangChain applications. Trigger with phrases like "langchain callbacks", "langchain webhooks", "langchain events", "langchain streaming", "callback handler".
/plugin marketplace add jeremylongshore/claude-code-plugins-plus-skills
/plugin install langchain-pack@claude-code-plugins-plus
This skill is limited to using the following tools:
Implement callback handlers and event-driven patterns for LangChain applications including streaming, webhooks, and real-time updates.
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
class WebhookCallbackHandler(BaseCallbackHandler):
    """Send LangChain lifecycle events to an external webhook.

    Every supported callback POSTs a JSON document of the form
    ``{"event": ..., "data": ..., "timestamp": ...}`` to ``webhook_url``.
    Delivery failures (transport errors, timeouts, non-2xx responses) are
    printed and do not propagate to the caller.
    """

    def __init__(self, webhook_url: str):
        """Create the handler.

        Args:
            webhook_url: Endpoint that receives the event POST requests.
        """
        self.webhook_url = webhook_url
        # One synchronous client reused for all events; the 10 s timeout bounds
        # how long a slow endpoint can hold up a callback.
        self.client = httpx.Client(timeout=10.0)

    def close(self) -> None:
        """Close the HTTP client once the handler is no longer needed."""
        self.client.close()

    @staticmethod
    def _component_name(serialized, kwargs):
        """Return the component name from ``serialized``, else from ``kwargs``.

        NOTE(review): recent langchain-core releases appear to pass
        ``serialized=None`` for runnables and supply the name as a ``name``
        keyword argument instead - confirm against the installed version.
        """
        name = (serialized or {}).get("name")
        return name if name is not None else kwargs.get("name")

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs
    ) -> None:
        """Called when LLM starts; reports the model name and prompt count."""
        self._send_event("llm_start", {
            "model": self._component_name(serialized, kwargs),
            "prompt_count": len(prompts),
        })

    def on_llm_end(self, response, **kwargs) -> None:
        """Called when LLM completes; reports generation count and token usage."""
        llm_output = response.llm_output
        self._send_event("llm_end", {
            "generations": len(response.generations),
            # llm_output may be missing/empty, in which case usage is unknown.
            "token_usage": llm_output.get("token_usage") if llm_output else None,
        })

    def on_llm_error(self, error: Exception, **kwargs) -> None:
        """Called on LLM error; reports the exception type and message."""
        self._send_event("llm_error", {
            "error_type": type(error).__name__,
            "message": str(error),
        })

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        **kwargs
    ) -> None:
        """Called when chain starts; reports the chain name and input keys."""
        self._send_event("chain_start", {
            "chain": self._component_name(serialized, kwargs),
            "input_keys": list(inputs.keys()),
        })

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> None:
        """Called when chain completes; reports only the output keys."""
        self._send_event("chain_end", {
            "output_keys": list(outputs.keys()),
        })

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        **kwargs
    ) -> None:
        """Called when tool starts; reports the tool name and input length."""
        self._send_event("tool_start", {
            "tool": self._component_name(serialized, kwargs),
            "input_length": len(input_str),
        })

    def _send_event(self, event_type: str, data: Dict[str, Any]) -> None:
        """POST one event to the webhook.

        Args:
            event_type: Event label placed under the ``"event"`` key.
            data: Event-specific payload placed under the ``"data"`` key.
        """
        # Built outside the try block so a programming error in payload
        # construction surfaces instead of being reported as a webhook failure.
        payload = {
            "event": event_type,
            "data": data,
            "timestamp": datetime.now().isoformat(),
        }
        try:
            # raise_for_status() turns 4xx/5xx replies into reported failures
            # rather than silently treating them as delivered.
            self.client.post(self.webhook_url, json=payload).raise_for_status()
        except httpx.HTTPError as e:
            print(f"Webhook error: {e}")
from langchain_core.callbacks import StreamingStdOutCallbackHandler
import asyncio
from typing import AsyncIterator
class StreamingWebSocketHandler(BaseCallbackHandler):
    """Stream tokens to WebSocket clients.

    Tokens produced by the LLM are pushed onto an asyncio queue and read back
    through :meth:`stream_tokens`. ``None`` is used as the end-of-stream
    marker. An instance owns a single queue, so it carries one stream at a
    time.
    """

    def __init__(self, websocket):
        """Create the handler.

        Args:
            websocket: The client connection this handler is associated with.
                It is stored but not used by the handler itself.
        """
        self.websocket = websocket
        self.queue = asyncio.Queue()

    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Called for each new token; enqueue it for the consumer."""
        await self.queue.put(token)

    async def on_llm_end(self, response, **kwargs) -> None:
        """Signal end of stream."""
        await self.queue.put(None)

    async def on_llm_error(self, error: BaseException, **kwargs) -> None:
        """Signal end of stream when the LLM call fails.

        Without this, a failed generation would never enqueue the end marker
        and :meth:`stream_tokens` would wait forever.
        """
        await self.queue.put(None)

    async def stream_tokens(self) -> AsyncIterator[str]:
        """Iterate over streamed tokens until the end-of-stream marker."""
        while True:
            token = await self.queue.get()
            if token is None:
                break
            yield token
# FastAPI WebSocket endpoint
from fastapi import WebSocket
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """Chat over a WebSocket, relaying model tokens as they are generated.

    For every JSON message ``{"message": ...}`` received, the module-level
    ``chain`` is invoked in a background task and each token is sent to the
    client as ``{"token": ...}``. The loop ends quietly when the client
    disconnects; an exception raised by the chain is re-raised here.

    NOTE(review): ``chain`` is expected to be defined elsewhere and built on a
    streaming-enabled model; the handler is attached per call through
    ``config`` only, so tokens are not delivered twice.
    """
    # Local import: the surrounding imports only bring in `WebSocket`.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()

            # Fresh handler (and queue) per message so leftovers from a failed
            # or interrupted run cannot leak into the next response.
            handler = StreamingWebSocketHandler(websocket)

            # Keep a reference to the task: it must not be garbage-collected
            # mid-run and its exception has to be retrieved.
            task = asyncio.create_task(chain.ainvoke(
                {"input": data["message"]},
                config={"callbacks": [handler]},
            ))

            def _release_consumer(done, queue=handler.queue):
                # If the run dies without the handler signalling the end of the
                # stream, unblock the token loop below.
                if done.cancelled() or done.exception() is not None:
                    queue.put_nowait(None)

            task.add_done_callback(_release_consumer)

            try:
                # Stream tokens to client
                async for token in handler.stream_tokens():
                    await websocket.send_json({"token": token})
                # Surface chain failures instead of dropping them.
                await task
            finally:
                # No-op when the task already finished; stops the model run if
                # the client went away mid-stream.
                task.cancel()
    except WebSocketDisconnect:
        # Client closed the connection; nothing left to send.
        pass
from fastapi import Request
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
@app.get("/chat/stream")
async def stream_chat(request: Request, message: str):
    """Stream response using Server-Sent Events.

    Args:
        request: Incoming request, polled to detect a client disconnect.
        message: User message, passed to the model as the prompt input.

    Returns:
        A ``text/event-stream`` response with one event per non-empty model
        chunk, followed by a final ``[DONE]`` event.
    """
    # Local import: the endpoint needs it but the surrounding imports omit it.
    from langchain_core.prompts import ChatPromptTemplate

    def sse_data(text: str) -> str:
        """Frame ``text`` as one SSE event.

        An SSE event ends at the first blank line, so text containing newlines
        must be sent as several ``data:`` lines; the client joins them back
        together with ``\\n``.
        """
        lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        return "".join(f"data: {line}\n" for line in lines) + "\n"

    async def event_generator():
        llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
        prompt = ChatPromptTemplate.from_template("{input}")
        chain = prompt | llm
        async for chunk in chain.astream({"input": message}):
            # Stop generating once nobody is listening.
            if await request.is_disconnected():
                break
            # Empty chunks carry no text; an empty data event would be ignored
            # by the browser anyway.
            if chunk.content:
                yield sse_data(str(chunk.content))
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
from dataclasses import dataclass, field
from datetime import datetime
from typing import List
@dataclass
class ChainEvent:
    """One timestamped event recorded while a chain runs."""

    # Label of the event, e.g. "chain_start" or "chain_end".
    event_type: str
    # Moment the event was recorded.
    timestamp: datetime
    # Event-specific payload.
    data: Dict[str, Any]
@dataclass
class ChainTrace:
    """All events recorded for one chain run, plus its start and end times."""

    # Identifier of the run this trace belongs to.
    chain_id: str
    # Events in the order they were recorded; a fresh list per instance.
    events: List[ChainEvent] = field(default_factory=list)
    # None until the corresponding moment has been recorded.
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
class TraceAggregator(BaseCallbackHandler):
    """Aggregate all events for a chain execution.

    Traces are kept in memory in ``self.traces``, keyed by ``str(run_id)``.
    Each ``on_chain_start`` opens a new trace; ``on_chain_end`` and
    ``on_chain_error`` record a final event and stamp the end time.
    """

    def __init__(self):
        self.traces: Dict[str, ChainTrace] = {}

    def on_chain_start(self, serialized, inputs, run_id, **kwargs):
        """Open a trace for ``run_id`` and record the chain inputs."""
        key = str(run_id)
        self.traces[key] = ChainTrace(chain_id=key, start_time=datetime.now())
        self._add_event(run_id, "chain_start", {"inputs": inputs})

    def on_chain_end(self, outputs, run_id, **kwargs):
        """Record the chain outputs and close the trace."""
        self._add_event(run_id, "chain_end", {"outputs": outputs})
        self._close_trace(run_id)

    def on_chain_error(self, error, run_id, **kwargs):
        """Record the failure and close the trace.

        Without this, a failed run would keep ``end_time`` unset forever.
        """
        self._add_event(run_id, "chain_error", {
            "error_type": type(error).__name__,
            "message": str(error),
        })
        self._close_trace(run_id)

    def _close_trace(self, run_id):
        """Stamp the end time on the trace for ``run_id``, if one exists."""
        trace = self.traces.get(str(run_id))
        if trace is not None:
            trace.end_time = datetime.now()

    def _add_event(self, run_id, event_type, data):
        """Append a timestamped event to the trace for ``run_id``.

        Events for a run that has no trace are dropped.
        """
        trace = self.traces.get(str(run_id))
        if trace is not None:
            trace.events.append(ChainEvent(
                event_type=event_type,
                timestamp=datetime.now(),
                data=data,
            ))

    def get_trace(self, run_id: str) -> Optional[ChainTrace]:
        """Return the trace for ``run_id``, or None if none was recorded.

        ``run_id`` is normalised with ``str()``, so the UUID object handed to
        the callbacks works as well as its string form.
        """
        return self.traces.get(str(run_id))
from langchain_openai import ChatOpenAI
# Attach the webhook handler when the model is constructed so that calls made
# through this model instance report their events to the endpoint.
webhook_handler = WebhookCallbackHandler("https://api.example.com/webhook")
llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[webhook_handler])

# All LLM calls will trigger webhook events
response = llm.invoke("Hello!")
// JavaScript client
const eventSource = new EventSource('/chat/stream?message=Hello');

eventSource.onmessage = (event) => {
  // The server sends a literal "[DONE]" event once the answer is complete.
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  document.getElementById('output').textContent += event.data;
};

// EventSource reconnects automatically after an error, which would re-issue
// the request and append the answer a second time; close the stream instead.
eventSource.onerror = () => {
  eventSource.close();
};
| Error | Cause | Solution |
|---|---|---|
| Webhook Timeout | Slow endpoint | Increase timeout, use async |
| WebSocket Disconnect | Client closed | Handle disconnect gracefully |
| Event Queue Full | Too many events | Add queue size limit |
| SSE Timeout | Long response | Add keep-alive pings |
Use langchain-observability for comprehensive monitoring.
This skill should be used when the user asks to "create a hookify rule", "write a hook rule", "configure hookify", "add a hookify rule", or needs guidance on hookify rule syntax and patterns.
Create distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.