From cohere-pack
Handles Cohere streaming chat events via SSE for text deltas, tool calls, citations; registers RAG data connectors. For streaming UIs and event processing.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin cohere-packThis skill is limited to using the following tools:
Handle Cohere's streaming chat events (SSE), tool-call events, citation events, and register data connectors for RAG. Cohere does not use traditional webhooks — its event model is streaming-based.
Builds multi-step tool-using agents with Cohere Chat API v2, including TypeScript examples for function definitions, executors, and chat loops.
Handles SSE streaming events and async batch processing for Anthropic Claude API using Python. For real-time integrations and bulk result processing.
Implements LangChain callback handlers for webhook event dispatching, SSE streaming, WebSocket integration, and tracing during chain/LLM/tool execution.
Share bugs, ideas, or general feedback.
Handle Cohere's streaming chat events (SSE), tool-call events, citation events, and register data connectors for RAG. Cohere does not use traditional webhooks — its event model is streaming-based.
cohere-ai SDK v7+Cohere's chatStream returns a stream of typed events:
import { CohereClientV2 } from 'cohere-ai';
const cohere = new CohereClientV2();
async function handleStream(userMessage: string) {
const stream = await cohere.chatStream({
model: 'command-a-03-2025',
messages: [{ role: 'user', content: userMessage }],
});
for await (const event of stream) {
switch (event.type) {
// Text content streaming
case 'content-start':
console.log('--- Generation started ---');
break;
case 'content-delta':
const text = event.delta?.message?.content?.text ?? '';
process.stdout.write(text);
break;
case 'content-end':
console.log('\n--- Generation complete ---');
break;
// Citation events (when using documents)
case 'citation-start':
console.log('Citation:', event.delta?.message?.citations);
break;
// Tool call events (when using tools)
case 'tool-call-start':
console.log('Tool call started:', event.delta?.message?.toolCalls?.function?.name);
break;
case 'tool-call-delta':
// Streaming tool arguments
break;
case 'tool-call-end':
console.log('Tool call complete');
break;
// Message lifecycle
case 'message-start':
console.log('Message ID:', event.id);
break;
case 'message-end':
console.log('Finish reason:', event.delta?.finishReason);
console.log('Usage:', event.delta?.usage);
break;
}
}
}
async function streamRAG(query: string, docs: string[]) {
const stream = await cohere.chatStream({
model: 'command-a-03-2025',
messages: [{ role: 'user', content: query }],
documents: docs.map((text, i) => ({
id: `doc-${i}`,
data: { text },
})),
});
let fullText = '';
const citations: Array<{ start: number; end: number; text: string; sources: string[] }> = [];
for await (const event of stream) {
if (event.type === 'content-delta') {
const chunk = event.delta?.message?.content?.text ?? '';
fullText += chunk;
process.stdout.write(chunk);
}
if (event.type === 'citation-start') {
const cite = event.delta?.message?.citations;
if (cite) {
citations.push({
start: cite.start,
end: cite.end,
text: cite.text,
sources: cite.sources?.map((s: any) => s.id) ?? [],
});
}
}
}
return { fullText, citations };
}
const tools = [{
type: 'function' as const,
function: {
name: 'get_price',
description: 'Get stock price',
parameters: {
type: 'object' as const,
properties: { ticker: { type: 'string' } },
required: ['ticker'],
},
},
}];
async function streamToolUse(query: string) {
const stream = await cohere.chatStream({
model: 'command-a-03-2025',
messages: [{ role: 'user', content: query }],
tools,
});
let currentToolName = '';
let currentToolArgs = '';
for await (const event of stream) {
switch (event.type) {
case 'tool-call-start':
currentToolName = event.delta?.message?.toolCalls?.function?.name ?? '';
currentToolArgs = '';
console.log(`Calling tool: ${currentToolName}`);
break;
case 'tool-call-delta':
currentToolArgs += event.delta?.message?.toolCalls?.function?.arguments ?? '';
break;
case 'tool-call-end':
console.log(`Tool args: ${currentToolArgs}`);
// Execute tool here, then send results back
break;
case 'content-delta':
process.stdout.write(event.delta?.message?.content?.text ?? '');
break;
}
}
}
// Express endpoint that proxies Cohere stream as SSE
import express from 'express';
const app = express();
app.use(express.json());
app.post('/api/chat/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const cohere = new CohereClientV2();
try {
const stream = await cohere.chatStream({
model: 'command-a-03-2025',
messages: req.body.messages,
});
for await (const event of stream) {
if (event.type === 'content-delta') {
const text = event.delta?.message?.content?.text ?? '';
res.write(`data: ${JSON.stringify({ type: 'text', text })}\n\n`);
}
if (event.type === 'citation-start') {
res.write(`data: ${JSON.stringify({ type: 'citation', data: event.delta })}\n\n`);
}
if (event.type === 'message-end') {
res.write(`data: ${JSON.stringify({ type: 'done', usage: event.delta?.usage })}\n\n`);
}
}
res.write('data: [DONE]\n\n');
res.end();
} catch (err) {
res.write(`data: ${JSON.stringify({ type: 'error', message: String(err) })}\n\n`);
res.end();
}
});
// Register a custom data source for RAG queries
// Connectors allow Cohere to fetch documents from your APIs
// Create a connector
const connector = await cohere.connectors.create({
name: 'internal-docs',
url: 'https://api.yourapp.com/search',
description: 'Internal documentation search',
});
// Use connector in chat for automatic retrieval
const response = await cohere.chat({
model: 'command-a-03-2025',
messages: [{ role: 'user', content: 'How do I reset my password?' }],
connectors: [{ id: connector.connector.id }],
});
// List registered connectors
const connectors = await cohere.connectors.list();
console.log('Registered connectors:', connectors.connectors.length);
Connector endpoint contract: Your URL receives POST { query: string } and must return { results: [{ id, text, title?, url? }] }.
| Event | When | Contains |
|---|---|---|
message-start | Stream begins | Message ID |
content-start | Text generation starts | Content index |
content-delta | Each text token | Text chunk |
content-end | Text generation ends | - |
citation-start | Citation found | Source, position |
tool-call-start | Tool call begins | Tool name |
tool-call-delta | Tool args streaming | Argument chunk |
tool-call-end | Tool call complete | - |
message-end | Stream ends | Finish reason, usage |
| Issue | Cause | Solution |
|---|---|---|
| Stream drops mid-response | Network timeout | Set higher timeout, add reconnect |
| No citation events | No documents passed | Include documents param |
| Tool events but no content | Tool call in progress | Wait for tool results, re-stream |
| Connector returns empty | Bad search endpoint | Test endpoint with curl |
For performance optimization, see cohere-performance-tuning.