Implements OpenRouter streaming with the OpenAI SDK in Python and TypeScript for real-time chat UIs via SSE, reducing TTFT and capturing usage metrics.
Install:

```bash
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin openrouter-pack
```
OpenRouter supports Server-Sent Events (SSE) streaming via `stream: true`, compatible with the OpenAI SDK. Streaming returns tokens as they're generated, reducing time-to-first-token (TTFT) from seconds to milliseconds. Usage stats are available via `stream_options: {include_usage: true}` in the final chunk. This skill covers Python and TypeScript streaming, SSE forwarding to browsers, and error recovery.
```python
import os

from openai import OpenAI

client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
    default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
)

# Stream with usage stats
stream = client.chat.completions.create(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Explain how HTTP streaming works"}],
    max_tokens=500,
    stream=True,
    stream_options={"include_usage": True},  # Get token counts in final chunk
)

full_content = []
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        token = chunk.choices[0].delta.content
        print(token, end="", flush=True)
        full_content.append(token)
    # Final chunk contains usage stats
    if chunk.usage:
        print(f"\n---\nTokens: {chunk.usage.prompt_tokens} in + {chunk.usage.completion_tokens} out")

result = "".join(full_content)
```
```python
import time

def stream_with_metrics(messages, model="anthropic/claude-3.5-sonnet", **kwargs):
    """Stream response and capture performance metrics."""
    start = time.monotonic()
    first_token_time = None
    chunks = []
    usage = None
    stream = client.chat.completions.create(
        model=model, messages=messages, stream=True,
        stream_options={"include_usage": True},
        **kwargs,
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            if first_token_time is None:
                first_token_time = (time.monotonic() - start) * 1000
            chunks.append(token)
            yield token  # Yield each token as it arrives
        if chunk.usage:
            usage = {
                "prompt_tokens": chunk.usage.prompt_tokens,
                "completion_tokens": chunk.usage.completion_tokens,
            }
    total_time = (time.monotonic() - start) * 1000
    # Metrics available after the generator is exhausted
    stream_with_metrics.last_metrics = {
        "ttft_ms": round(first_token_time or 0),
        "total_ms": round(total_time),
        "usage": usage,
        "model": model,
    }

# Usage
for token in stream_with_metrics(
    [{"role": "user", "content": "Hello"}],
    model="openai/gpt-4o-mini",
    max_tokens=200,
):
    print(token, end="", flush=True)
print(f"\nMetrics: {stream_with_metrics.last_metrics}")
```
```typescript
import OpenAI from "openai";

const client = new OpenAI({
  baseURL: "https://openrouter.ai/api/v1",
  apiKey: process.env.OPENROUTER_API_KEY,
  defaultHeaders: { "HTTP-Referer": "https://my-app.com", "X-Title": "my-app" },
});

async function streamCompletion(prompt: string, model = "openai/gpt-4o-mini") {
  const stream = await client.chat.completions.create({
    model,
    messages: [{ role: "user", content: prompt }],
    max_tokens: 500,
    stream: true,
  });
  const chunks: string[] = [];
  for await (const chunk of stream) {
    const token = chunk.choices[0]?.delta?.content;
    if (token) {
      process.stdout.write(token);
      chunks.push(token);
    }
  }
  return chunks.join("");
}
```
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class StreamRequest(BaseModel):
    prompt: str
    model: str = "openai/gpt-4o-mini"

@app.post("/v1/stream")
async def stream_endpoint(req: StreamRequest):
    """Forward OpenRouter SSE stream to browser."""
    # Read prompt/model from the JSON body (matching the fetch() client below),
    # not query parameters.
    def generate():  # sync generator; FastAPI iterates it in a threadpool
        stream = client.chat.completions.create(
            model=req.model,
            messages=[{"role": "user", "content": req.prompt}],
            max_tokens=1024,
            stream=True,
        )
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
```
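A quick server-side smoke test for the endpoint above, as a hedged sketch: it assumes the app is running locally on port 8000 and uses `httpx` streaming (not part of the skill's required dependencies) to read the SSE lines.

```python
# Hypothetical smoke test: consume the /v1/stream SSE endpoint from Python.
# Assumes the FastAPI app above is serving on localhost:8000.
import json

import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/v1/stream",
    json={"prompt": "Hello", "model": "openai/gpt-4o-mini"},
    timeout=None,  # streams can outlive the default timeout
) as response:
    for line in response.iter_lines():
        if line.startswith("data: ") and line != "data: [DONE]":
            print(json.loads(line[len("data: "):])["token"], end="", flush=True)
```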
```javascript
// Consume SSE stream from your backend
async function streamChat(prompt) {
  const response = await fetch("/v1/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prompt }),
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = ""; // holds partial lines split across network chunks
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop(); // keep the last (possibly incomplete) line
    for (const line of lines) {
      if (line.startsWith("data: ") && line !== "data: [DONE]") {
        const data = JSON.parse(line.slice(6));
        document.getElementById("output").textContent += data.token;
      }
    }
  }
}
```
```python
from openai import AsyncOpenAI

aclient = AsyncOpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
    default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
)

async def async_stream(messages, model="openai/gpt-4o-mini", **kwargs):
    """Async streaming for use in async web frameworks."""
    stream = await aclient.chat.completions.create(
        model=model, messages=messages, stream=True, **kwargs,
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```
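A minimal way to consume `async_stream` outside a web framework; the `asyncio.run` wrapper is only needed for this standalone example:

```python
import asyncio

async def main():
    # Iterate the async generator exactly as a framework handler would
    async for token in async_stream(
        [{"role": "user", "content": "Hello"}], max_tokens=100
    ):
        print(token, end="", flush=True)

asyncio.run(main())
```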
| Error | Cause | Fix |
|---|---|---|
| Stream cuts off mid-response | Network timeout or provider error | Save partial content; retry from the last position (see the sketch below) |
| Missing usage in stream | Didn't set `stream_options` | Add `stream_options: {"include_usage": True}` |
| Empty delta chunks | Keep-alive pings | Skip chunks where `chunk.choices[0].delta.content` is `None` |
| `finish_reason: "length"` | Hit `max_tokens` limit | Increase `max_tokens` or continue with a follow-up request (see the sketch below) |
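One hedged sketch of the recovery pattern from the first and last rows: keep the partial output, then resend it as an assistant turn and ask the model to continue. The `stream_with_resume` helper and its "continue where you left off" prompt are illustrative conventions, not an OpenRouter API feature.

```python
def stream_with_resume(messages, model="openai/gpt-4o-mini", max_retries=2, **kwargs):
    """Sketch: resume a stream that dropped or stopped at max_tokens."""
    parts = []
    for _attempt in range(max_retries + 1):
        resume_messages = list(messages)
        if parts:
            # Feed the partial answer back so the model can pick up mid-response
            resume_messages.append({"role": "assistant", "content": "".join(parts)})
            resume_messages.append(
                {"role": "user", "content": "Continue exactly where you left off."}
            )
        finish_reason = None
        try:
            stream = client.chat.completions.create(
                model=model, messages=resume_messages, stream=True, **kwargs,
            )
            for chunk in stream:
                if chunk.choices:
                    if chunk.choices[0].delta.content:
                        parts.append(chunk.choices[0].delta.content)
                    finish_reason = chunk.choices[0].finish_reason or finish_reason
        except Exception:
            continue  # network drop: retry, keeping the partial content
        if finish_reason != "length":
            break  # completed normally (or stopped for another reason)
    return "".join(parts)
```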
Always set `stream_options: {"include_usage": True}` to get token counts for cost tracking.