Deploy RAG systems to production with proper architecture, monitoring, and operations. Use this skill when deploying RAG to production, scaling retrieval systems, implementing RAG monitoring, or building enterprise RAG infrastructure. Activate when: production RAG, RAG deployment, enterprise RAG, RAG architecture, RAG monitoring, RAG scaling, RAG infrastructure, RAG ops.
Install: `npx claudepluginhub latestaiagents/agent-skills --plugin skills-authoring`

This skill uses the workspace's default tool permissions.
**Everything you need to deploy RAG systems that are reliable, scalable, and maintainable.**
Implements retrieval-augmented generation (RAG) systems for knowledge-intensive apps, document search, Q&A, and grounding LLMs in external data. Covers embeddings, vector stores, retrieval pipelines, and evaluation, with cost and prerequisite checks.
Designs end-to-end RAG architectures for use cases like customer support chatbots, documentation Q&A, legal search, and code assistance, covering ingestion pipelines, retrieval strategies, quality metrics, and scaling.
Before deploying, confirm the core pieces are in place (a configuration sketch follows this checklist):

- Document pipeline defined
- Index refresh strategy chosen
- Data quality checks in place
- Retrieval strategy selected
- Embedding model chosen
- Vector store provisioned
- LLM configuration set
- Prompt engineering reviewed
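As a rough illustration of how these checklist items translate into configuration, here is a minimal sketch of a deployment config object. Every field name and default below is an assumption for illustration, not part of the skill itself.

```python
from dataclasses import dataclass, field

@dataclass
class RAGConfig:
    """Hypothetical deployment config covering the checklist items above."""
    # Document pipeline and index refresh
    ingest_sources: list[str] = field(default_factory=list)  # e.g. S3 prefixes, wiki exports
    index_refresh_cron: str = "0 2 * * *"                    # nightly re-index (assumed default)
    min_doc_quality_score: float = 0.5                       # drop low-quality chunks

    # Retrieval
    retrieval_strategy: str = "hybrid"                        # "dense", "sparse", or "hybrid"
    top_k: int = 5

    # Embeddings and vector store
    embedding_model: str = "text-embedding-3-small"           # example model name
    vector_store_url: str = "http://vector-store:6333"        # placeholder endpoint

    # LLM and prompting
    llm_model: str = "gpt-4o-mini"                             # example model name
    llm_temperature: float = 0.0
    prompt_template: str = "Context: {context}\n\nQuestion: {question}"
```

Keeping all of these in one versioned config makes it easy to review what changed between deployments.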
                     ┌───────────────────────────────────────┐
                     │             Load Balancer             │
                     └───────────────────────────────────────┘
                                         │
                     ┌───────────────────┼───────────────────┐
                     ▼                   ▼                   ▼
            ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
            │   API Server    │ │   API Server    │ │   API Server    │
            │   (Stateless)   │ │   (Stateless)   │ │   (Stateless)   │
            └─────────────────┘ └─────────────────┘ └─────────────────┘
                     │                   │                   │
                     └───────────────────┼───────────────────┘
                                         │
                     ┌───────────────────┼───────────────────┐
                     ▼                   ▼                   ▼
            ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
            │   Redis Cache   │ │  Vector Store   │ │   LLM Gateway   │
            │  (Query Cache)  │ │   (Retrieval)   │ │  (Rate Limit)   │
            └─────────────────┘ └─────────────────┘ └─────────────────┘
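To make the stateless API tier concrete, here is a minimal sketch of one API server from the diagram, fronted by a query cache and delegating to a retriever and an LLM gateway. FastAPI is used purely for illustration; the cache, retriever, and gateway below are in-process stand-ins for the Redis, vector store, and gateway services in the diagram (a Redis-backed cache is sketched in the caching section further down).

```python
from fastapi import FastAPI
from pydantic import BaseModel

class QueryRequest(BaseModel):
    question: str

class QueryResponse(BaseModel):
    answer: str
    sources: list[str] = []

# Placeholder dependencies; in production these are the Redis query cache,
# the vector store client, and the rate-limited LLM gateway.
class InMemoryCache:
    def __init__(self):
        self._store: dict[str, dict] = {}
    async def get(self, key: str) -> dict | None:
        return self._store.get(key)
    async def set(self, key: str, value: dict) -> None:
        self._store[key] = value

async def retrieve(question: str) -> list[str]:
    return []  # stand-in for vector store retrieval

async def generate(question: str, docs: list[str]) -> str:
    return "stub answer"  # stand-in for the LLM gateway call

app = FastAPI()
cache = InMemoryCache()

@app.post("/query", response_model=QueryResponse)
async def query(req: QueryRequest) -> QueryResponse:
    # Servers are stateless: all shared state lives in the cache,
    # vector store, and gateway, so any replica can serve any request.
    cached = await cache.get(req.question)
    if cached:
        return QueryResponse(**cached)
    docs = await retrieve(req.question)
    answer = await generate(req.question, docs)
    result = QueryResponse(answer=answer, sources=docs)
    await cache.set(req.question, result.model_dump())
    return result
```

Because the servers hold no per-request state, the load balancer can scale them horizontally and drain them freely during deploys.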
from abc import ABC, abstractmethod

class LLMProvider(ABC):
    @abstractmethod
    async def generate(self, prompt: str, **kwargs) -> str:
        pass

class OpenAIProvider(LLMProvider):
    async def generate(self, prompt: str, **kwargs) -> str:
        # OpenAI implementation
        pass

class AnthropicProvider(LLMProvider):
    async def generate(self, prompt: str, **kwargs) -> str:
        # Anthropic implementation
        pass

class RAGService:
    def __init__(self, llm_provider: LLMProvider, retriever):
        self.llm = llm_provider
        self.retriever = retriever

    async def query(self, question: str) -> str:
        docs = await self.retriever.ainvoke(question)
        context = self._format_context(docs)
        return await self.llm.generate(
            f"Context: {context}\n\nQuestion: {question}"
        )

    def _format_context(self, docs) -> str:
        # Join retrieved documents into one context block
        # (assumes LangChain-style documents with .page_content).
        return "\n\n".join(d.page_content for d in docs)
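A brief wiring sketch for the abstraction above. `StaticRetriever` is a toy stand-in, and since the provider classes above are left as stubs, this only demonstrates the point of the pattern: vendors are swapped at the constructor, never inside `RAGService`.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Doc:
    page_content: str

class StaticRetriever:
    """Toy retriever used only to exercise the abstraction."""
    async def ainvoke(self, query: str) -> list[Doc]:
        return [Doc(page_content="Re-index nightly via the ingestion cron job.")]

async def main() -> None:
    # Swapping vendors is a constructor change, not a RAGService change.
    service = RAGService(llm_provider=OpenAIProvider(), retriever=StaticRetriever())
    print(await service.query("How often is the index refreshed?"))

    service = RAGService(llm_provider=AnthropicProvider(), retriever=StaticRetriever())

asyncio.run(main())
```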
import hashlib
import json
from typing import Awaitable, Callable

import redis

class RAGCache:
    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        self.redis = redis_client
        self.ttl = ttl

    def _cache_key(self, query: str) -> str:
        """Generate cache key from query."""
        return f"rag:query:{hashlib.md5(query.encode()).hexdigest()}"

    async def get_or_compute(
        self,
        query: str,
        compute_fn: Callable[[str], Awaitable[dict]]
    ) -> dict:
        """Get from cache or compute and store."""
        key = self._cache_key(query)

        # Try cache
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Compute
        result = await compute_fn(query)

        # Cache
        self.redis.setex(key, self.ttl, json.dumps(result))
        return result

class EmbeddingCache:
    """Cache embeddings to avoid recomputation."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def get_embedding(self, text: str) -> list[float] | None:
        key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"
        cached = self.redis.get(key)
        return json.loads(cached) if cached else None

    def set_embedding(self, text: str, embedding: list[float]):
        key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"
        self.redis.setex(key, 86400 * 7, json.dumps(embedding))  # 7 days
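A short wiring sketch showing how the two caches above might sit in front of a query pipeline and an embedding call. `run_rag_pipeline` and the placeholder embedding are hypothetical stand-ins for your actual pipeline and model.

```python
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
query_cache = RAGCache(r, ttl=3600)
emb_cache = EmbeddingCache(r)

async def embed_text(text: str) -> list[float]:
    # Check the embedding cache before calling the (hypothetical) embedding model.
    cached = emb_cache.get_embedding(text)
    if cached is not None:
        return cached
    embedding = [0.0] * 1536  # placeholder for a real embedding API call
    emb_cache.set_embedding(text, embedding)
    return embedding

async def answer(question: str) -> dict:
    # Whole-query caching: identical questions within the TTL skip retrieval and generation.
    async def run_rag_pipeline(q: str) -> dict:
        return {"answer": "stub", "sources": []}  # placeholder pipeline
    return await query_cache.get_or_compute(question, run_rag_pipeline)
```

Whole-query caching saves the most money but is sensitive to query phrasing; embedding caching pays off during re-ingestion, when many chunks are unchanged.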
import asyncio
import logging

logger = logging.getLogger(__name__)  # or the structlog logger from the monitoring section below

class ResilientRAG:
    """RAG with comprehensive error handling."""

    def __init__(self, primary_llm, fallback_llm, retriever):
        self.primary_llm = primary_llm
        self.fallback_llm = fallback_llm
        self.retriever = retriever

    def _format_context(self, docs) -> str:
        # Assumes LangChain-style documents with .page_content
        return "\n\n".join(d.page_content for d in docs)

    async def query(self, question: str) -> dict:
        try:
            # Retrieval with timeout
            docs = await asyncio.wait_for(
                self.retriever.ainvoke(question),
                timeout=5.0
            )
        except asyncio.TimeoutError:
            logger.warning("Retrieval timeout, using empty context")
            docs = []
        except Exception as e:
            logger.error(f"Retrieval error: {e}")
            docs = []

        if not docs:
            return {
                "answer": "I couldn't find relevant information to answer your question.",
                "sources": [],
                "status": "no_results"
            }

        context = self._format_context(docs)

        # Try primary LLM
        try:
            answer = await asyncio.wait_for(
                self.primary_llm.generate(question, context),
                timeout=30.0
            )
            return {"answer": answer, "sources": docs, "status": "success"}
        except Exception as e:
            logger.warning(f"Primary LLM failed: {e}, trying fallback")

        # Try fallback LLM
        try:
            answer = await self.fallback_llm.generate(question, context)
            return {"answer": answer, "sources": docs, "status": "fallback"}
        except Exception as e:
            logger.error(f"Fallback LLM failed: {e}")
            return {
                "answer": "I'm having trouble generating a response. Please try again.",
                "sources": docs,
                "status": "error"
            }
import time
import uuid

import structlog

logger = structlog.get_logger()

async def monitored_query(rag, query: str, user_id: str) -> dict:
    """Query with comprehensive logging."""
    request_id = str(uuid.uuid4())

    logger.info(
        "rag_query_started",
        request_id=request_id,
        user_id=user_id,
        query_length=len(query)
    )

    start = time.perf_counter()
    try:
        # Retrieval
        retrieval_start = time.perf_counter()
        docs = await rag.retriever.ainvoke(query)
        retrieval_ms = (time.perf_counter() - retrieval_start) * 1000

        logger.info(
            "rag_retrieval_complete",
            request_id=request_id,
            num_docs=len(docs),
            latency_ms=retrieval_ms
        )

        # Generation
        gen_start = time.perf_counter()
        answer = await rag.generate(query, docs)
        gen_ms = (time.perf_counter() - gen_start) * 1000

        total_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "rag_query_complete",
            request_id=request_id,
            retrieval_ms=retrieval_ms,
            generation_ms=gen_ms,
            total_ms=total_ms,
            answer_length=len(answer)
        )
        return {"answer": answer, "request_id": request_id}
    except Exception as e:
        logger.error(
            "rag_query_failed",
            request_id=request_id,
            error=str(e),
            error_type=type(e).__name__
        )
        raise
# Prometheus metrics example
from prometheus_client import Counter, Histogram, Gauge

# Counters
rag_queries_total = Counter(
    'rag_queries_total',
    'Total RAG queries',
    ['status']  # success, no_results, error
)

# Histograms
rag_latency = Histogram(
    'rag_latency_seconds',
    'RAG query latency',
    ['stage'],  # retrieval, generation, total
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)

# Gauges
rag_index_size = Gauge(
    'rag_index_document_count',
    'Number of documents in RAG index'
)
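To show how the metrics above are meant to be used, here is a small sketch that wraps a query path with the counter and histogram; `run_query` is a hypothetical stand-in for the RAG service call.

```python
import time

async def run_query(question: str) -> dict:
    """Placeholder for the real RAG service call."""
    return {"answer": "stub", "status": "success"}

async def instrumented_query(question: str) -> dict:
    start = time.perf_counter()
    try:
        result = await run_query(question)
    except Exception:
        rag_queries_total.labels(status="error").inc()
        rag_latency.labels(stage="total").observe(time.perf_counter() - start)
        raise
    rag_queries_total.labels(status=result.get("status", "success")).inc()
    rag_latency.labels(stage="total").observe(time.perf_counter() - start)
    return result

# Update the index-size gauge whenever the index is rebuilt, e.g.:
# rag_index_size.set(document_count)
```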
| Metric | Alert Threshold |
|---|---|
| P95 latency | >5 seconds |
| Error rate | >5% |
| Cache hit rate | <50% |
| Empty results rate | >20% |
| LLM fallback rate | >10% |
Security essentials (an input-validation sketch follows this list):

- Input validation
- Data protection
- API security
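As one concrete example of the input-validation item, here is a minimal sketch using Pydantic to bound query length and reject obviously malformed input before it reaches retrieval. The specific limits and pattern are assumptions to adapt, not recommendations from this skill.

```python
from pydantic import BaseModel, Field, field_validator

class RAGQuery(BaseModel):
    question: str = Field(min_length=3, max_length=2000)
    user_id: str = Field(pattern=r"^[A-Za-z0-9_-]{1,64}$")

    @field_validator("question")
    @classmethod
    def strip_control_chars(cls, v: str) -> str:
        # Drop control characters that sometimes arrive from copy-pasted documents.
        return "".join(ch for ch in v if ch.isprintable() or ch in "\n\t").strip()
```

Data protection (PII handling, encryption at rest) and API security (auth, rate limiting) typically sit at the gateway and storage layers rather than in this application code.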
class CostAwareRAG:
    """RAG with cost controls."""

    def __init__(self, rag, monthly_budget: float):
        # `rag` is expected to expose query() and a cheaper query_cheap() path.
        self.rag = rag
        self.budget = monthly_budget
        self.month_spend = 0.0

    async def query(self, question: str) -> dict:
        # Check budget
        if self.month_spend >= self.budget:
            return {"error": "Monthly budget exceeded"}

        # Estimate cost before query
        estimated_cost = self._estimate_cost(question)
        if self.month_spend + estimated_cost > self.budget:
            # Use cheaper model
            return await self.rag.query_cheap(question)

        result = await self.rag.query(question)
        self.month_spend += self._actual_cost(result)
        return result

    def _estimate_cost(self, question: str) -> float:
        # Estimate based on typical token usage; the per-token rates below are
        # placeholders to replace with your providers' actual pricing.
        input_tokens = len(question.split()) * 1.3
        output_tokens = 500  # Assume average
        return (input_tokens * 0.00001) + (output_tokens * 0.00003)

    def _actual_cost(self, result: dict) -> float:
        # Prefer real usage metadata when the provider returns it;
        # otherwise fall back to the average-output assumption above.
        usage = result.get("usage", {})
        input_tokens = usage.get("input_tokens", 0)
        output_tokens = usage.get("output_tokens", 500)
        return (input_tokens * 0.00001) + (output_tokens * 0.00003)
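A quick usage sketch for the cost wrapper above; `_StubRAG` is a toy stand-in for a real RAG service that exposes `query()` and `query_cheap()`.

```python
class _StubRAG:
    """Toy stand-in for a RAG service exposing query() and query_cheap()."""
    async def query(self, q: str) -> dict:
        return {"answer": "full-model answer", "usage": {"input_tokens": 40, "output_tokens": 300}}
    async def query_cheap(self, q: str) -> dict:
        return {"answer": "cheap-model answer", "usage": {"input_tokens": 40, "output_tokens": 300}}

cost_rag = CostAwareRAG(rag=_StubRAG(), monthly_budget=200.0)

# Note: the wrapper never resets month_spend on its own; reset it on a schedule
# (e.g. a first-of-month job) so a new billing period starts from zero.
```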