Implement Mistral AI rate limiting, backoff, and request management. Use when handling rate limit errors, implementing retry logic, or optimizing API request throughput for Mistral AI. Trigger with phrases like "mistral rate limit", "mistral throttling", "mistral 429", "mistral retry", "mistral backoff".
From mistral-pack. Install with: `npx claudepluginhub nickloveinvesting/nick-love-plugins --plugin mistral-pack`. This skill is limited to using the following tools:
Guides Next.js Cache Components and Partial Prerendering (PPR) with cacheComponents enabled. Implements 'use cache', cacheLife(), cacheTag(), revalidateTag(), static/dynamic optimization, and cache debugging.
Migrates code, prompts, and API calls from Claude Sonnet 4.0/4.5 or Opus 4.1 to Opus 4.5, updating model strings on Anthropic, AWS, GCP, Azure platforms.
Details PluginEval's skill quality evaluation: 3 layers (static, LLM judge), 10 dimensions, rubrics, formulas, anti-patterns, badges. Use to interpret scores, improve triggering, calibrate thresholds.
Rate limit management for Mistral AI API. Mistral enforces per-minute request and token limits that vary by model tier and subscription plan.
| Model | Requests/min | Tokens/min | Tokens/month |
|---|---|---|---|
| mistral-small | 120 | 500,000 | Varies by plan |
| mistral-medium | 60 | 500,000 | Varies by plan |
| mistral-large | 30 | 200,000 | Varies by plan |
| mistral-embed | 300 | 1,000,000 | Varies by plan |
Mistral limits both requests and tokens per minute. Track both.
import time, threading
class MistralRateLimiter:
    """Sliding-window limiter tracking both requests/min and tokens/min.

    Mistral enforces separate RPM and TPM quotas per model tier, so both
    windows are tracked. Thread-safe via a single lock. Note that
    wait_if_needed sleeps while holding the lock, which serializes
    concurrent callers — acceptable here since they would have to wait
    for the same window to clear anyway.
    """

    def __init__(self, rpm: int = 60, tpm: int = 200000):
        self.rpm = rpm  # max requests per 60-second window
        self.tpm = tpm  # max tokens per 60-second window
        self.request_times: list[float] = []          # timestamps of recent requests
        self.token_usage: list[tuple[float, int]] = []  # (timestamp, tokens) records
        self.lock = threading.Lock()

    def wait_if_needed(self, estimated_tokens: int = 1000) -> None:
        """Block until a request of ~estimated_tokens fits both RPM and TPM limits.

        Args:
            estimated_tokens: expected token cost of the upcoming request,
                used against the TPM window before the real count is known.
        """
        with self.lock:
            now = time.time()
            cutoff = now - 60  # only events inside the last minute count
            self.request_times = [t for t in self.request_times if t > cutoff]
            self.token_usage = [(t, n) for t, n in self.token_usage if t > cutoff]
            if len(self.request_times) >= self.rpm:
                # Sleep until the oldest request ages out of the window.
                time.sleep(self.request_times[0] - cutoff + 0.1)
            current_tpm = sum(n for _, n in self.token_usage)
            if self.token_usage and current_tpm + estimated_tokens > self.tpm:
                # Sleep until the oldest token record expires. The truthiness
                # guard avoids an IndexError when there is no token history
                # but estimated_tokens alone exceeds tpm.
                time.sleep(self.token_usage[0][0] - cutoff + 0.1)
            self.request_times.append(time.time())

    def has_capacity(self, estimated_tokens: int = 1000) -> bool:
        """Return True if a request would proceed without waiting.

        Needed by ModelRouter.get_available_model; non-destructive (does not
        prune or append to the tracked windows).
        """
        with self.lock:
            cutoff = time.time() - 60
            recent_requests = sum(1 for t in self.request_times if t > cutoff)
            recent_tokens = sum(n for t, n in self.token_usage if t > cutoff)
            return (recent_requests < self.rpm
                    and recent_tokens + estimated_tokens <= self.tpm)

    def record_usage(self, tokens: int) -> None:
        """Record the actual token count reported by a completed request."""
        with self.lock:
            self.token_usage.append((time.time(), tokens))
# Usage: module-level shared limiter sized for mistral-large (30 RPM / 200K TPM)
limiter = MistralRateLimiter(rpm=30, tpm=200000)
def rate_limited_chat(client, messages, model="mistral-large-latest"):
    """Send a chat completion through the shared module-level rate limiter.

    Estimates the token cost (~4 characters per token, per message) before
    the call, then records the true count reported by the API afterwards.
    """
    approx_tokens = 0
    for msg in messages:
        approx_tokens += len(msg["content"]) // 4
    limiter.wait_if_needed(approx_tokens)
    result = client.chat.complete(model=model, messages=messages)
    limiter.record_usage(result.usage.total_tokens)
    return result
import time
def chat_with_retry(client, messages, model, max_retries=5):
    """Call chat.complete, retrying HTTP 429 with capped exponential backoff.

    Args:
        client: Mistral SDK client exposing ``chat.complete``.
        messages: chat messages to send.
        model: model name string.
        max_retries: total attempts before giving up.

    Returns:
        The SDK response from the first successful attempt.

    Raises:
        Exception: "Max retries exceeded", chained to the last 429 error.
        Any non-429 error is re-raised immediately.
    """
    last_exc = None
    for attempt in range(max_retries):
        try:
            return client.chat.complete(model=model, messages=messages)
        except Exception as e:
            # Mistral SDK errors carry status_code; 429 = Too Many Requests.
            if getattr(e, "status_code", None) == 429:
                last_exc = e
                if attempt == max_retries - 1:
                    break  # no point sleeping before the final failure
                wait = min(2 ** attempt + 1, 60)  # 2s, 3s, 5s, 9s, ... capped at 60s
                print(f"Rate limited, waiting {wait}s (attempt {attempt+1})")
                time.sleep(wait)
            else:
                raise
    # Preserve the causal chain so callers can inspect the underlying 429.
    raise Exception("Max retries exceeded") from last_exc
Route requests to cheaper models when premium capacity is exhausted.
class ModelRouter:
    """Route requests to a cheaper model when the premium tier is saturated."""

    def __init__(self):
        # Per-model limiters matching Mistral's published tier limits.
        self.limiters = {
            "mistral-large-latest": MistralRateLimiter(rpm=30, tpm=200000),
            "mistral-small-latest": MistralRateLimiter(rpm=120, tpm=500000),
        }

    def _has_rpm_headroom(self, limiter) -> bool:
        # Computed from the limiter's window directly: the original code
        # called limiter.has_capacity(), which MistralRateLimiter as written
        # in this document does not define (AttributeError at runtime).
        with limiter.lock:
            cutoff = time.time() - 60
            recent = sum(1 for t in limiter.request_times if t > cutoff)
            return recent < limiter.rpm

    def get_available_model(self, preferred: str = "mistral-large-latest") -> str:
        """Return ``preferred`` if it has RPM headroom, else the small model."""
        if self._has_rpm_headroom(self.limiters[preferred]):
            return preferred
        # Fall back to smaller model with more capacity.
        return "mistral-small-latest"
def batch_embed(client, texts: list[str], batch_size: int = 32, limiter=None):
    """Embed ``texts`` in batches through the mistral-embed endpoint.

    Args:
        client: Mistral SDK client exposing ``embeddings.create``.
        texts: strings to embed.
        batch_size: items per API call; reduce it if batches exceed token limits.
        limiter: optional shared MistralRateLimiter. By default a fresh limiter
            is created with mistral-embed's tier limits (300 RPM / 1M TPM);
            pass a shared instance so rate history persists across calls
            (the original created one per call, losing the window each time).

    Returns:
        List of embedding vectors, in input order; empty list for empty input.
    """
    if limiter is None:
        limiter = MistralRateLimiter(rpm=300, tpm=1000000)
    all_embeddings = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        # Rough estimate: ~4 characters per token.
        estimated_tokens = sum(len(t) // 4 for t in batch)
        limiter.wait_if_needed(estimated_tokens)
        response = client.embeddings.create(model="mistral-embed", inputs=batch)
        all_embeddings.extend(d.embedding for d in response.data)
        limiter.record_usage(response.usage.total_tokens)
    return all_embeddings
| Issue | Cause | Solution |
|---|---|---|
| 429 errors | Exceeded RPM or TPM | Use rate limiter, exponential backoff |
| Inconsistent limits | Different limits per model | Configure limiter per model tier |
| Batch embedding failures | Too many tokens per batch | Reduce batch size |
| Spike traffic blocked | No smoothing | Queue requests, spread over time |
# Monitoring snapshot of the shared limiter. NOTE(review): these reads take no
# lock and do not prune entries older than 60s, so counts can be slightly
# stale/overstated under load — acceptable for dashboards, not for gating.
status = {
    "rpm_used": len(limiter.request_times),            # requests in window (unpruned)
    "rpm_limit": limiter.rpm,
    "tpm_used": sum(n for _, n in limiter.token_usage),  # tokens in window (unpruned)
    "tpm_limit": limiter.tpm,
    "utilization_pct": len(limiter.request_times) / limiter.rpm * 100,
}