Stream LLM responses chunk-by-chunk for real-time display. Use when building chat interfaces, showing incremental LLM output, or streaming from OpenAI, Anthropic, Google, or Ollama models - the same async iteration and usage tracking work across all providers.
/plugin marketplace add juanre/llmring
/plugin install llmring@juanre-ai-tools

This skill inherits all available tools. When active, it can use any tool Claude has access to.
# With uv (recommended)
uv add llmring

# With pip
pip install llmring

Provider SDKs (install what you need):

uv add "openai>=1.0"      # OpenAI
uv add "anthropic>=0.67"  # Anthropic
uv add google-genai       # Google Gemini
uv add "ollama>=0.4"      # Ollama
This skill covers:
- `LLMRing.chat_stream()` - Stream response chunks
- `StreamChunk` - Individual chunk structure

First, create your lockfile (see the llmring:lockfile skill):
llmring lock init
llmring bind chatbot anthropic:claude-3-5-haiku-20241022
Then use streaming:
from llmring import LLMRing, LLMRequest, Message
from llmring.schemas import StreamChunk  # Optional: for type hints

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # YOUR alias from llmring.lock
        messages=[Message(role="user", content="Count to 10")]
    )

    # Stream response
    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)

    print()  # Newline after streaming
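The snippets in this skill assume they run inside an async context. To run one standalone, wrap it in a coroutine and pass it to asyncio.run(), for example:

import asyncio

from llmring import LLMRing, LLMRequest, Message

async def main() -> None:
    async with LLMRing() as service:
        request = LLMRequest(
            model="chatbot",  # YOUR alias from llmring.lock
            messages=[Message(role="user", content="Count to 10")]
        )
        async for chunk in service.chat_stream(request):
            print(chunk.delta, end="", flush=True)
        print()

asyncio.run(main())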
Stream a chat completion response as chunks.
Signature:
async def chat_stream(
    request: LLMRequest,
    profile: Optional[str] = None
) -> AsyncIterator[StreamChunk]
Parameters:
- `request` (LLMRequest): Request configuration with messages and parameters
- `profile` (str, optional): Profile name for environment-specific configuration

Returns:
- `AsyncIterator[StreamChunk]`: Async iterator yielding response chunks

Raises:
- `ProviderNotFoundError`: If provider is not configured
- `ModelNotFoundError`: If model is not available
- `ProviderAuthenticationError`: If API key is invalid
- `ProviderRateLimitError`: If rate limit exceeded

Example:
from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Write a haiku")]
    )

    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)
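The `profile` argument from the signature above selects an environment-specific configuration. A minimal sketch, assuming a profile named "prod" is defined in your lockfile:

from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Write a haiku")]
    )

    # "prod" is an assumed profile name; use whatever profiles your lockfile defines
    async for chunk in service.chat_stream(request, profile="prod"):
        print(chunk.delta, end="", flush=True)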
A chunk of a streaming response.
Attributes:
- `delta` (str): Text content in this chunk
- `model` (str): Model identifier (present in all chunks)
- `finish_reason` (str, optional): Why generation stopped (only in final chunk)
- `usage` (dict, optional): Token usage statistics (only in final chunk)
- `tool_calls` (list, optional): Tool calls being constructed (incremental)

Example:
async for chunk in service.chat_stream(request):
    print(f"Delta: '{chunk.delta}'")

    if chunk.model:
        print(f"Model: {chunk.model}")

    if chunk.finish_reason:
        print(f"Finished: {chunk.finish_reason}")

    if chunk.usage:
        print(f"Tokens: {chunk.usage}")
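When the model decides to call tools, `tool_calls` arrives incrementally across chunks. A minimal sketch of observing it, continuing with the request above (the exact structure of each entry is provider-dependent; full tool handling is covered in the llmring-tools skill):

async for chunk in service.chat_stream(request):
    if chunk.delta:
        print(chunk.delta, end="", flush=True)

    # tool_calls is populated incrementally while the model assembles a call
    if chunk.tool_calls:
        print(f"\n[partial tool calls: {chunk.tool_calls}]")

    if chunk.finish_reason == "tool_calls":
        print("\n[model requested tool execution]")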
from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Tell me a joke")]
    )

    # Print each chunk immediately
    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)

    print()  # Newline when done
The final chunk contains usage statistics. Capture them:
from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Explain quantum computing")]
    )

    accumulated_usage = None
    full_response = ""

    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)
        full_response += chunk.delta

        # Capture usage from final chunk
        if chunk.usage:
            accumulated_usage = chunk.usage

    print()  # Newline

    if accumulated_usage:
        print(f"\nTokens used: {accumulated_usage.get('total_tokens', 0)}")
        print(f"Prompt tokens: {accumulated_usage.get('prompt_tokens', 0)}")
        print(f"Completion tokens: {accumulated_usage.get('completion_tokens', 0)}")
from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Write a story")]
    )

    chunks = []
    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)
        chunks.append(chunk.delta)

    # Reconstruct complete response
    full_response = "".join(chunks)
    print(f"\n\nFull response length: {len(full_response)} characters")
from llmring import LLMRing, LLMRequest, Message
import sys

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Describe the ocean")]
    )

    word_count = 0
    async for chunk in service.chat_stream(request):
        # Custom processing per chunk
        sys.stdout.write(chunk.delta)
        sys.stdout.flush()

        # Count words in real-time (approximate: a word may span chunk boundaries)
        word_count += len(chunk.delta.split())

    print(f"\n\nTotal words: {word_count}")
from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    # Higher temperature for creative streaming
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Write a creative story")],
        temperature=1.2
    )

    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)
from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    messages = [
        Message(role="system", content="You are a helpful assistant."),
        Message(role="user", content="What is Python?")
    ]

    # First streaming response
    request = LLMRequest(model="chatbot", messages=messages)  # "chatbot" is your streaming alias
    response_text = ""

    print("Assistant: ", end="")
    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)
        response_text += chunk.delta
    print()

    # Add to history
    messages.append(Message(role="assistant", content=response_text))

    # Second turn
    messages.append(Message(role="user", content="Give me an example"))
    request = LLMRequest(model="chatbot", messages=messages)  # "chatbot" is your streaming alias

    response_text = ""
    print("Assistant: ", end="")
    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)
        response_text += chunk.delta
    print()
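The same pattern extends to an interactive loop. A minimal terminal sketch (it reads user input with input(), which is fine for a demo but blocks the event loop in a real async application):

import asyncio

from llmring import LLMRing, LLMRequest, Message

async def chat_loop() -> None:
    """Terminal chat demo: stream each reply and keep the history. Illustrative only."""
    async with LLMRing() as service:
        messages = [Message(role="system", content="You are a helpful assistant.")]

        while True:
            user_input = input("You: ")
            if user_input.strip().lower() in {"quit", "exit"}:
                break
            messages.append(Message(role="user", content=user_input))

            print("Assistant: ", end="")
            response_text = ""
            request = LLMRequest(model="chatbot", messages=messages)  # your streaming alias
            async for chunk in service.chat_stream(request):
                print(chunk.delta, end="", flush=True)
                response_text += chunk.delta
            print()

            messages.append(Message(role="assistant", content=response_text))

asyncio.run(chat_loop())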
from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    # Limit streaming response length
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Write a long essay")],
        max_tokens=50  # Stop after 50 tokens
    )

    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)

        # Check finish_reason in final chunk
        if chunk.finish_reason == "length":
            print("\n[Response truncated due to max_tokens]")
from llmring import LLMRing, LLMRequest, Message

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Hello")]
    )

    async for chunk in service.chat_stream(request):
        print(chunk.delta, end="", flush=True)

        # Final chunk has finish_reason
        if chunk.finish_reason:
            print(f"\nStream ended: {chunk.finish_reason}")

# finish_reason values:
# - "stop": Natural completion
# - "length": Hit max_tokens limit
# - "tool_calls": Model wants to call tools
from llmring import LLMRing, LLMRequest, Message
from llmring.exceptions import (
    ProviderAuthenticationError,
    ModelNotFoundError,
    ProviderRateLimitError,
    ProviderTimeoutError
)

async with LLMRing() as service:
    try:
        request = LLMRequest(
            model="chatbot",  # Your streaming alias
            messages=[Message(role="user", content="Hello")]
        )

        async for chunk in service.chat_stream(request):
            print(chunk.delta, end="", flush=True)

    except ProviderAuthenticationError:
        print("\nInvalid API key")
    except ModelNotFoundError as e:
        print(f"\nModel not available: {e}")
    except ProviderRateLimitError as e:
        print(f"\nRate limited - retry after {e.retry_after}s")
    except ProviderTimeoutError:
        print("\nRequest timed out")
    except Exception as e:
        print(f"\nStream error: {e}")
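Rate limits are usually transient. A minimal retry sketch built on the exceptions above (the helper and backoff values are illustrative, not part of llmring; retry_after is used when the provider reports it):

import asyncio

from llmring import LLMRing, LLMRequest
from llmring.exceptions import ProviderRateLimitError

async def stream_with_retry(service: LLMRing, request: LLMRequest, max_attempts: int = 3) -> str:
    """Stream a response, retrying on rate limits. Illustrative helper."""
    for attempt in range(1, max_attempts + 1):
        try:
            text = ""
            # Note: a retry restarts the stream, so already-printed text is printed again
            async for chunk in service.chat_stream(request):
                print(chunk.delta, end="", flush=True)
                text += chunk.delta
            return text
        except ProviderRateLimitError as e:
            if attempt == max_attempts:
                raise
            # Prefer the provider's hint; fall back to simple exponential backoff
            delay = e.retry_after or (2 ** attempt)
            print(f"\nRate limited, retrying in {delay}s...")
            await asyncio.sleep(delay)

Usage: full_text = await stream_with_retry(service, request).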
If updating UI, buffer chunks to avoid excessive redraws:
from llmring import LLMRing, LLMRequest, Message
import asyncio

async with LLMRing() as service:
    request = LLMRequest(
        model="chatbot",  # Your streaming alias
        messages=[Message(role="user", content="Write a paragraph")]
    )

    buffer = ""
    last_update = asyncio.get_running_loop().time()
    UPDATE_INTERVAL = 0.05  # Update UI every 50ms

    async for chunk in service.chat_stream(request):
        buffer += chunk.delta

        # Update UI at intervals, not every chunk
        now = asyncio.get_running_loop().time()
        if now - last_update >= UPDATE_INTERVAL or chunk.finish_reason:
            print(buffer, end="", flush=True)
            buffer = ""
            last_update = now
# DON'T DO THIS - output buffered, appears all at once
async for chunk in service.chat_stream(request):
    print(chunk.delta, end="")  # No flush!
Right: Always Flush
# DO THIS - see output in real-time
async for chunk in service.chat_stream(request):
    print(chunk.delta, end="", flush=True)
# DON'T DO THIS - usage only in final chunk
async for chunk in service.chat_stream(request):
    if chunk.usage:  # Only true once!
        tokens = chunk.usage["total_tokens"]
Right: Accumulate Then Check
# DO THIS - capture usage from final chunk
accumulated_usage = None
async for chunk in service.chat_stream(request):
    print(chunk.delta, end="", flush=True)
    if chunk.usage:
        accumulated_usage = chunk.usage

# Use usage after streaming completes
if accumulated_usage:
    print(f"\nTokens: {accumulated_usage['total_tokens']}")
# DON'T DO THIS - loses full response for history
async for chunk in service.chat_stream(request):
    print(chunk.delta, end="", flush=True)
# Can't add to conversation history!
Right: Accumulate for History
# DO THIS - keep full response for multi-turn
response_text = ""
async for chunk in service.chat_stream(request):
    print(chunk.delta, end="", flush=True)
    response_text += chunk.delta

# Now can add to history
messages.append(Message(role="assistant", content=response_text))
All providers support streaming with the same API:
| Provider | Streaming | Usage Stats | Notes |
|---|---|---|---|
| OpenAI | Yes | Final chunk | Fast, reliable |
| Anthropic | Yes | Final chunk | Large context support |
| Google | Yes | Final chunk | 2M+ token context |
| Ollama | Yes | Final chunk | Local models |
No code changes needed to switch between providers - same streaming API works for all.
Related skills:
- llmring-chat - Basic non-streaming chat
- llmring-tools - Streaming with tool calls
- llmring-structured - Streaming structured output
- llmring-lockfile - Configure model aliases
- llmring-providers - Provider-specific optimizations

Use streaming when:
- Building real-time chat interfaces
- Displaying incremental output as it is generated

Use regular chat when:
- You need the complete response before processing it