From openrouter-pack
Distributes OpenRouter requests across multiple API keys and models using round-robin, health checks, and circuit breakers to scale beyond rate limits.
How this skill is triggered — by the user, by Claude, or both
Slash command
/openrouter-pack:openrouter-load-balancing
This skill is limited to the following tools:
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
A single OpenRouter API key has rate limits (requests/minute and tokens/minute). To scale beyond those limits, distribute requests across multiple keys. OpenRouter also provides server-side load balancing via provider routing and the `:nitro` variant for low-latency inference. This skill covers multi-key rotation, health-based routing, circuit breakers, and concurrent request patterns.
A single OpenRouter API key has rate limits (requests/minute and tokens/minute). To scale beyond those limits, distribute requests across multiple keys. OpenRouter also provides server-side load balancing via provider routing and the `:nitro` variant for low-latency inference. This skill covers multi-key rotation, health-based routing, circuit breakers, and concurrent request patterns.
import os, itertools, time, logging
from openai import OpenAI, RateLimitError
from dataclasses import dataclass, field
log = logging.getLogger("openrouter.lb")


@dataclass
class KeyPool:
    """Round-robin API key pool with health tracking.

    Keys are handed out in rotation. A key that accumulates ``max_errors``
    reported errors without an intervening success is marked unhealthy and is
    skipped until ``cooldown`` seconds have passed since its last error.

    Attributes:
        keys: API keys to rotate through, in rotation order.
        cooldown: Seconds an unhealthy key is skipped before being retried.
        max_errors: Error count at which a key is marked unhealthy.
    """

    keys: list[str]
    cooldown: float = 60.0
    max_errors: int = 3
    _cycle: itertools.cycle = field(init=False, repr=False)
    _health: dict[str, dict] = field(init=False, default_factory=dict)

    def __post_init__(self):
        """Start the rotation and give every key a clean health record."""
        self._cycle = itertools.cycle(self.keys)
        self._health = {
            k: {"errors": 0, "last_error": 0, "healthy": True} for k in self.keys
        }

    def next_key(self) -> str:
        """Return the next healthy key in rotation.

        An unhealthy key whose cooldown has elapsed is restored before it is
        considered. If every key is unhealthy, the next key in rotation is
        returned anyway so the caller can still attempt the request.

        Raises:
            RuntimeError: If the pool contains no keys.
        """
        if not self.keys:
            # Without this guard, next() on an empty cycle leaks a bare
            # StopIteration, which is easy to swallow silently upstream.
            raise RuntimeError("KeyPool has no API keys configured")

        # One full lap is enough to visit every key once.
        for _ in range(len(self.keys)):
            key = next(self._cycle)
            h = self._health[key]
            # Recover once the cooldown since the last error has elapsed.
            if not h["healthy"] and time.time() - h["last_error"] > self.cooldown:
                h["healthy"] = True
                h["errors"] = 0
            if h["healthy"]:
                return key

        # All keys unhealthy -- return any and hope for the best.
        return next(self._cycle)

    def mark_error(self, key: str):
        """Record a failed request for ``key``; trip the breaker at the threshold."""
        h = self._health[key]
        h["errors"] += 1
        h["last_error"] = time.time()
        if h["errors"] >= self.max_errors:  # Circuit breaker: too many errors
            h["healthy"] = False
            log.warning(
                "Key %s... marked unhealthy after %s errors", key[:12], h["errors"]
            )

    def mark_success(self, key: str):
        """Record a successful request for ``key``, clearing its error state."""
        h = self._health[key]
        h["errors"] = 0
        h["healthy"] = True
# Shared pool built from the environment. Unset or blank variables are skipped
# so an empty string never enters the rotation as if it were a real key (it
# would otherwise send a share of the requests out with no credentials).
pool = KeyPool(keys=[
    key
    for key in (
        os.environ.get(name, "").strip()
        for name in ("OPENROUTER_KEY_1", "OPENROUTER_KEY_2", "OPENROUTER_KEY_3")
    )
    if key
])
def balanced_completion(messages, model="anthropic/claude-3.5-sonnet", *,
                        max_attempts=None, **kwargs):
    """Send a chat completion using the next healthy key from the pool.

    On a rate-limit error the key is reported to the pool and the request is
    retried with the next key. Retries are bounded so that a fully throttled
    pool surfaces the error instead of retrying without end.

    Args:
        messages: Chat messages passed through to the completions API.
        model: OpenRouter model identifier.
        max_attempts: Maximum number of keys to try. Defaults to one attempt
            per key in the pool (at least one).
        **kwargs: Extra arguments forwarded to ``chat.completions.create``.

    Returns:
        The completion response from the first successful attempt.

    Raises:
        RateLimitError: If every attempt was rate-limited (the last error).
    """
    if max_attempts is None:
        max_attempts = len(pool.keys)
    attempts = max(1, max_attempts)

    last_error = None
    for _ in range(attempts):
        key = pool.next_key()
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=key,
            default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
        )
        try:
            response = client.chat.completions.create(
                model=model, messages=messages, **kwargs
            )
        except RateLimitError as exc:
            pool.mark_error(key)
            last_error = exc
            continue  # Retry with next key
        pool.mark_success(key)
        return response

    # Every attempt was rate-limited; let the caller decide how to back off.
    raise last_error
import asyncio
from openai import AsyncOpenAI
async def parallel_completions(prompts: list[str], model="openai/gpt-4o-mini",
                               max_concurrent=5, **kwargs):
    """Process multiple prompts concurrently with a cap on in-flight requests.

    Args:
        prompts: User prompts; each is sent as a single-message chat request.
        model: OpenRouter model identifier.
        max_concurrent: Maximum number of requests in flight at once (>= 1).
        **kwargs: Extra arguments forwarded to ``chat.completions.create``.

    Returns:
        The message content of each response, in the same order as ``prompts``.

    Raises:
        ValueError: If ``max_concurrent`` is less than 1.
    """
    if max_concurrent < 1:
        # Semaphore(0) would never admit a request and the call would hang.
        raise ValueError("max_concurrent must be at least 1")

    semaphore = asyncio.Semaphore(max_concurrent)
    client = AsyncOpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=os.environ["OPENROUTER_API_KEY"],
        default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
    )

    async def process_one(prompt: str):
        async with semaphore:
            response = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **kwargs,
            )
        return response.choices[0].message.content

    tasks = [asyncio.ensure_future(process_one(p)) for p in prompts]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        # One request failed (or we were cancelled): stop the remaining
        # requests and wait for them to unwind before the client is closed.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    finally:
        # Release the client's HTTP connections.
        await client.close()
# Usage: run three prompts with at most three requests in flight; max_tokens
# is forwarded to every completion call.
results = asyncio.run(
    parallel_completions(
        prompts=["Summarize X", "Translate Y", "Analyze Z"],
        max_concurrent=3,
        max_tokens=500,
    )
)
# OpenRouter can distribute across providers for the same model.
# This snippet needs its own synchronous client; none exists at module scope.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
    default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
)
response = client.chat.completions.create(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Hello"}],
    max_tokens=200,
    extra_body={
        "provider": {
            # Providers are tried in this priority order; with fallbacks
            # allowed, OpenRouter may route elsewhere if none is available.
            # NOTE(review): confirm these names match OpenRouter's current
            # provider identifiers.
            "order": ["Anthropic", "AWS Bedrock", "GCP Vertex"],
            "allow_fallbacks": True,
        },
    },
)
import requests
def check_rate_limits(api_key: str, timeout: float = 10.0) -> dict:
    """Check current rate limit status for a key.

    Args:
        api_key: The OpenRouter API key to inspect.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A dict with ``requests_limit`` and ``interval`` (from the response's
        ``rate_limit`` object), ``credits_used`` (its ``usage`` field) and
        ``credits_limit`` (its ``limit`` field, or ``None`` when absent).

    Raises:
        requests.HTTPError: If the API answers with an error status, e.g. for
            an invalid key.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    resp = requests.get(
        "https://openrouter.ai/api/v1/auth/key",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=timeout,  # without a timeout, requests can block indefinitely
    )
    # Fail loudly on 4xx/5xx instead of hitting a KeyError on the error body.
    resp.raise_for_status()
    data = resp.json()["data"]
    rate_limit = data["rate_limit"]
    return {
        "requests_limit": rate_limit["requests"],
        "interval": rate_limit["interval"],
        "credits_used": data["usage"],
        "credits_limit": data.get("limit"),
    }
# Report the rate-limit status of every key in the pool. Only the first 12
# characters of each key are printed.
for key in pool.keys:
    limits = check_rate_limits(api_key=key)
    print(f"Key {key[:12]}...: {limits}")
| Error | Cause | Fix |
|---|---|---|
| 429 on all keys | All keys rate-limited simultaneously | Add more keys; implement request queuing |
| Uneven load distribution | Round-robin not accounting for in-flight requests | Use weighted distribution based on current load |
| Key health false positive | Transient error marked key unhealthy | Use sliding window (3 errors in 60s) before marking unhealthy |
| Concurrent request failures | Too many parallel requests | Reduce semaphore limit; add backoff |
Use `asyncio.Semaphore` to control concurrency and prevent overwhelming the API.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin openrouter-pack
Check OpenRouter rate limits via API queries and headers; implement retries with OpenAI SDK. Use for 429 errors, throttling, or high-volume API usage.
Invoke OpenRouter CLI for chat completions, embeddings, rerank, video generation, API key management, model browsing, credits checks, and scripted LLM calls with stable JSON output from shell, scripts, and agents.
Creates, edits, and optimizes skills for Claude Code, including drafting, evaluating with test prompts, iterating on performance, and improving skill descriptions for better triggering accuracy.