Cost optimization patterns for LLM API usage — model routing by task complexity, budget tracking, retry logic, and prompt caching.
From the clarc plugin (install: `npx claudepluginhub marvinrichter/clarc --plugin clarc`). This skill uses the workspace's default tool permissions.
Designs and optimizes AI agent action spaces, tool definitions, observation formats, error recovery, and context for higher task completion rates.
Enables AI agents to execute x402 payments with per-task budgets, spending controls, and non-custodial wallets via MCP tools. Use when agents pay for APIs, services, or other agents.
Compares coding agents like Claude Code and Aider on custom YAML-defined codebase tasks using git worktrees, measuring pass rate, cost, time, and consistency.
Patterns for controlling LLM API costs while maintaining quality. Combines model routing, budget tracking, retry logic, and prompt caching into a composable pipeline.
Automatically select cheaper models for simple tasks, reserving expensive models for complex ones.
MODEL_SONNET = "claude-sonnet-latest" # Balanced tier — check anthropic.com/api for current ID
MODEL_HAIKU = "claude-haiku-latest" # Fast/lightweight tier — check anthropic.com/api for current ID
_SONNET_TEXT_THRESHOLD = 10_000 # chars
_SONNET_ITEM_THRESHOLD = 30 # items
def select_model(
text_length: int,
item_count: int,
force_model: str | None = None,
) -> str:
"""Select model based on task complexity."""
if force_model is not None:
return force_model
if text_length >= _SONNET_TEXT_THRESHOLD or item_count >= _SONNET_ITEM_THRESHOLD:
return MODEL_SONNET # Complex task
return MODEL_HAIKU # Simple task (3-4x cheaper)
Track cumulative spend with frozen dataclasses. Each API call returns a new tracker — never mutates state.
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class CostRecord:
model: str
input_tokens: int
output_tokens: int
cost_usd: float
@dataclass(frozen=True, slots=True)
class CostTracker:
budget_limit: float = 1.00
records: tuple[CostRecord, ...] = ()
def add(self, record: CostRecord) -> "CostTracker":
"""Return new tracker with added record (never mutates self)."""
return CostTracker(
budget_limit=self.budget_limit,
records=(*self.records, record),
)
@property
def total_cost(self) -> float:
return sum(r.cost_usd for r in self.records)
@property
def over_budget(self) -> bool:
return self.total_cost > self.budget_limit
Retry only on transient errors. Fail fast on authentication or bad request errors.
import time

from anthropic import (
    APIConnectionError,
    InternalServerError,
    RateLimitError,
)
# Transient error classes worth retrying; all others fail fast.
_RETRYABLE_ERRORS = (APIConnectionError, RateLimitError, InternalServerError)
_MAX_RETRIES = 3


def call_with_retry(func, *, max_retries: int = _MAX_RETRIES):
    """Call *func*, retrying only on transient API errors.

    Retries connection, rate-limit, and internal-server errors with
    exponential backoff (1s, 2s, 4s, ...). Any other exception
    (AuthenticationError, BadRequestError, etc.) propagates immediately,
    as does the last transient error once retries are exhausted.

    Args:
        func: Zero-argument callable performing the API request.
        max_retries: Total number of attempts; must be >= 1.

    Returns:
        Whatever *func* returns on the first successful attempt.

    Raises:
        ValueError: If max_retries is less than 1 (the original loop
            would silently return None in that case).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")
    for attempt in range(max_retries):
        try:
            return func()
        except _RETRYABLE_ERRORS:
            if attempt == max_retries - 1:
                raise  # retries exhausted — surface the transient error
            time.sleep(2 ** attempt)  # exponential backoff
Cache long system prompts to avoid resending them on every request.
Requirements: the cached prefix must be byte-identical across requests and must meet the model's minimum cacheable length (see Anthropic's prompt-caching documentation).
What to cache: stable content that repeats across requests — system prompts, few-shot examples, tool definitions.
What NOT to cache: content that varies per request, such as the user's input.
# Python SDK — mark stable sections with cache_control.
# NOTE(review): system_prompt, few_shot_examples, user_input, client, and
# model are assumed to be defined by the surrounding application.
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": system_prompt,  # stable — cache this
                "cache_control": {"type": "ephemeral"},
            },
            {
                "type": "text",
                "text": few_shot_examples,  # stable — cache this too
                "cache_control": {"type": "ephemeral"},
            },
            {
                "type": "text",
                "text": user_input,  # variable — do NOT cache
            },
        ],
    }
]

# Verify caching worked — check response usage
response = client.messages.create(model=model, messages=messages, max_tokens=1024)
usage = response.usage
# getattr defaults to 0 for SDK versions whose usage object lacks these fields
cache_read = getattr(usage, 'cache_read_input_tokens', 0)
cache_write = getattr(usage, 'cache_creation_input_tokens', 0)
print(f"Cache hit: {cache_read} tokens | Cache write: {cache_write} tokens")
// TypeScript SDK — same pattern
// NOTE(review): systemPrompt, userInput, client, and model come from the
// surrounding application.
const response = await client.messages.create({
  model,
  max_tokens: 1024,
  messages: [{
    role: 'user',
    content: [
      // Stable prefix — marked cacheable.
      { type: 'text', text: systemPrompt, cache_control: { type: 'ephemeral' } },
      { type: 'text', text: userInput }, // variable — no cache
    ],
  }],
});
// Usage fields report how many input tokens were read from / written to cache.
const { cache_read_input_tokens, cache_creation_input_tokens } = response.usage;
Multi-turn conversations — cache the growing history:
# Cache all previous turns; only the latest user message is variable
def build_cached_conversation(history: list[dict], new_user_message: str) -> list[dict]:
    """Build a message list that caches all prior conversation turns.

    Marks the final message of *history* with cache_control so the whole
    conversation prefix can be served from the prompt cache; only the new
    user message is left uncached.

    Args:
        history: Prior turns as ``{"role": ..., "content": ...}`` dicts,
            where content is either a plain string or a list of content
            blocks.
        new_user_message: The next user turn (always variable, never cached).

    Returns:
        A new message list; *history* and its messages are not mutated.
    """
    if not history:
        return [{"role": "user", "content": new_user_message}]

    last = history[-1]
    if isinstance(last["content"], str):
        # Promote string content to a block list so it can carry cache_control.
        content = [{
            "type": "text",
            "text": last["content"],
            "cache_control": {"type": "ephemeral"},
        }]
    else:
        # Already block-structured: mark the final block as the cache
        # breakpoint. (The original skipped this case entirely, so
        # list-content turns were never cached.)
        blocks = list(last["content"])
        if blocks:
            blocks[-1] = {**blocks[-1], "cache_control": {"type": "ephemeral"}}
        content = blocks

    cached_history = history[:-1] + [{**last, "content": content}]
    return cached_history + [{"role": "user", "content": new_user_message}]
Expected savings for a 2000-token system prompt at 1000 requests/day:
| Scenario | Daily input tokens | Daily cost (Sonnet) |
|---|---|---|
| No caching | 2,000,000 | $6.00 |
| With caching (90% hit rate) | 1.8M cache reads + 200K cache writes | ~$0.90 |
| Savings | — | ~$5.10/day |
Combine all four techniques in a single pipeline function:
def process(text: str, config: Config, tracker: CostTracker) -> tuple[Result, CostTracker]:
    """Run one text through the full cost-optimized pipeline.

    Combines the four techniques: model routing, budget enforcement,
    retry-wrapped calling with prompt caching, and immutable cost tracking.

    NOTE(review): illustrative pseudo-code — Config, Result,
    BudgetExceededError, estimated_items, client, system_prompt,
    build_cached_messages, parse_result, and the literal ``...`` record
    fields must all be supplied by the real implementation.
    """
    # 1. Route model
    model = select_model(len(text), estimated_items, config.force_model)
    # 2. Check budget
    if tracker.over_budget:
        raise BudgetExceededError(tracker.total_cost, tracker.budget_limit)
    # 3. Call with retry + caching
    response = call_with_retry(lambda: client.messages.create(
        model=model,
        messages=build_cached_messages(system_prompt, text),
    ))
    # 4. Track cost (immutable)
    record = CostRecord(model=model, input_tokens=..., output_tokens=..., cost_usd=...)
    tracker = tracker.add(record)
    return parse_result(response), tracker
| Model | Input ($/1M tokens) | Output ($/1M tokens) | Relative Cost |
|---|---|---|---|
| Claude Haiku (fast tier) | $0.80 | $4.00 | 1x |
| Claude Sonnet (balanced tier) | $3.00 | $15.00 | ~4x |
| Claude Opus (most capable tier) | $15.00 | $75.00 | ~19x |