Deploy production recommendation systems with feature stores, caching, and A/B testing. Use for personalization APIs, low-latency serving, or when encountering cache-invalidation, experiment-tracking, or quality-monitoring issues.
```
/plugin marketplace add secondsky/claude-skills
/plugin install recommendation-system@claude-skills
```

This skill inherits all available tools. When active, it can use any tool Claude has access to.
Reference files:

- references/ab-testing-framework.md
- references/caching-strategies.md
- references/monitoring-alerting.md
- references/production-architecture.md

Production-ready architecture for scalable recommendation systems with feature stores, multi-tier caching, A/B testing, and comprehensive monitoring.
Load this skill when:

- Building personalization APIs
- Serving recommendations under low-latency requirements
- Debugging cache invalidation, experiment tracking, or quality-monitoring issues

Quick start:
```bash
# 1. Install dependencies
pip install fastapi==0.109.0 redis==5.0.0 prometheus-client==0.19.0

# 2. Start Redis (for caching and feature store)
docker run -d -p 6379:6379 redis:alpine

# 3. Create recommendation service: app.py
cat > app.py << 'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import redis
import json

app = FastAPI()
cache = redis.Redis(host='localhost', port=6379, decode_responses=True)

class RecommendationResponse(BaseModel):
    user_id: str
    items: List[str]
    cached: bool

@app.post("/recommendations", response_model=RecommendationResponse)
async def get_recommendations(user_id: str, n: int = 10):
    # Check cache
    cache_key = f"recs:{user_id}:{n}"
    cached = cache.get(cache_key)
    if cached:
        return RecommendationResponse(
            user_id=user_id,
            items=json.loads(cached),
            cached=True
        )

    # Generate recommendations (simplified)
    items = [f"item_{i}" for i in range(n)]

    # Cache for 5 minutes
    cache.setex(cache_key, 300, json.dumps(items))
    return RecommendationResponse(
        user_id=user_id,
        items=items,
        cached=False
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}
EOF

# 4. Run API
uvicorn app:app --host 0.0.0.0 --port 8000

# 5. Test
curl -X POST "http://localhost:8000/recommendations?user_id=user_123&n=10"
```
Result: Working recommendation API with caching in under 5 minutes.
Architecture overview:

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ User Events │────▶│   Feature   │────▶│    Model    │
│  (clicks,   │     │    Store    │     │   Serving   │
│  purchases) │     │   (Redis)   │     │             │
└─────────────┘     └─────────────┘     └─────────────┘
       │                                       │
       ▼                                       ▼
┌─────────────┐                         ┌─────────────┐
│  Training   │                         │     API     │
│  Pipeline   │                         │  (FastAPI)  │
└─────────────┘                         └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │ Monitoring  │
                                        │ (Prometheus)│
                                        └─────────────┘
```
Centralized storage for user and item features:
```python
import redis
import json

class FeatureStore:
    """Fast feature access with Redis caching."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # 1 hour

    def get_user_features(self, user_id: str) -> dict:
        cache_key = f"user_features:{user_id}"
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Fetch from database (fetch_from_db is your data-access layer)
        features = fetch_from_db(user_id)

        # Cache
        self.redis.setex(cache_key, self.ttl, json.dumps(features))
        return features
```
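The production-architecture reference also covers batch fetching; the core idea is a single `MGET` round trip instead of N `GET`s. A minimal sketch, where `get_user_features_batch` is a hypothetical helper reusing the key scheme above:

```python
def get_user_features_batch(store: FeatureStore, user_ids: list) -> dict:
    keys = [f"user_features:{uid}" for uid in user_ids]
    raw_values = store.redis.mget(keys)  # one round trip for all users
    # None entries are cache misses the caller should backfill from the database
    return {
        uid: json.loads(raw) if raw else None
        for uid, raw in zip(user_ids, raw_values)
    }
```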
Serve multiple models for A/B testing:
```python
class ModelServing:
    """Serve multiple recommendation models."""

    def __init__(self):
        self.models = {}
        self.default_model = None  # set by the first is_default registration

    def register_model(self, name: str, model, is_default: bool = False):
        self.models[name] = model
        if is_default:
            self.default_model = name

    def predict(self, user_features: dict, item_features: list, model_name: str = None):
        model = self.models.get(model_name or self.default_model)
        return model.predict(user_features, item_features)
```
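Registering a baseline and a challenger then looks like this (the model objects and names are illustrative):

```python
serving = ModelServing()
serving.register_model('main', baseline_model, is_default=True)
serving.register_model('experimental', challenger_model)

scores = serving.predict(user_features, candidates)  # default model
exp_scores = serving.predict(user_features, candidates, model_name='experimental')
```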
Multi-tier caching for low latency:
```python
import json

class TieredCache:
    """L1 (memory) -> L2 (Redis) -> L3 (database)."""

    def __init__(self, redis_client):
        self.l1_cache = {}         # In-memory; bound with an LRU in production
        self.redis = redis_client  # L2

    def get(self, key: str):
        # L1: In-memory (fastest)
        if key in self.l1_cache:
            return self.l1_cache[key]

        # L2: Redis
        cached = self.redis.get(key)
        if cached:
            value = json.loads(cached)
            self.l1_cache[key] = value  # Promote to L1
            return value

        # L3: Miss (caller fetches from the database)
        return None
```
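The `RecommendationService` example later calls `cache.set(...)`; a matching write path might look like this sketch, writing through both tiers with the TTL enforced only in Redis:

```python
    def set(self, key: str, value, ttl: int = 300):
        # Write-through: L1 holds the value until evicted, Redis owns expiry
        self.l1_cache[key] = value
        self.redis.setex(key, ttl, json.dumps(value))
```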
Key metrics to track:

| Metric | Description | Target |
|---|---|---|
| CTR | Click-through rate | >5% |
| Conversion Rate | Purchases from recs | >2% |
| P95 Latency | 95th percentile response time | <200ms |
| Cache Hit Rate | % served from cache | >80% |
| Coverage | % of catalog recommended | >50% |
| Diversity | Variety in recommendations | >0.7 |
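As a rough illustration, coverage and diversity can be computed offline from served lists. A minimal sketch, assuming items carry a `category` field and using distinct-category ratio as the diversity measure (one of several reasonable choices):

```python
def coverage(served_items: set, catalog_size: int) -> float:
    """Fraction of the catalog that appeared in at least one recommendation list."""
    return len(served_items) / catalog_size

def intra_list_diversity(rec_lists: list) -> float:
    """Mean fraction of distinct categories per list (1.0 = fully diverse)."""
    ratios = [
        len({item['category'] for item in recs}) / len(recs)
        for recs in rec_lists if recs
    ]
    return sum(ratios) / len(ratios) if ratios else 0.0
```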
Problem: Users with no history get no recommendations, so their first experience is poor.
Solution: Use a popularity-based fallback:

```python
def get_recommendations(user_id: str, n: int = 10):
    user_features = feature_store.get_user_features(user_id)

    # Check if new user (no purchase history)
    if user_features.get('total_purchases', 0) == 0:
        # Fallback to popular items
        return get_popular_items(n)

    # Personalized recommendations
    return generate_personalized_recs(user_id, n)
```
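`get_popular_items` is left undefined above; one simple implementation keeps interaction counts in a Redis sorted set (a sketch; the `popular_items` key and `record_interaction` hook are illustrative):

```python
def record_interaction(item_id: str):
    # Called from the event pipeline on clicks/purchases
    redis_client.zincrby('popular_items', 1, item_id)

def get_popular_items(n: int = 10) -> list:
    # Highest-count items first
    return redis_client.zrevrange('popular_items', 0, n - 1)
```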
Problem: A user makes a purchase, but the cache still shows the purchased item in their recommendations.
Solution: Invalidate the cache on relevant actions:

```python
INVALIDATING_ACTIONS = {'purchase', 'rating', 'add_to_cart'}

def on_user_action(user_id: str, action: str):
    if action in INVALIDATING_ACTIONS:
        # Redis DEL does not expand globs, so scan for matching keys first
        keys = list(redis_client.scan_iter(match=f"recs:{user_id}:*"))
        if keys:
            redis_client.delete(*keys)
        logger.info(f"Invalidated cache for {user_id} due to {action}")
```
Problem: Many users' caches expire at the same moment, overloading the database and model servers.
Solution: Add random jitter to the TTL:

```python
import json
import random

def set_cache(key: str, value: dict, base_ttl: int = 300):
    # Add ±10% jitter so keys written together do not all expire together
    jitter = random.uniform(-0.1, 0.1) * base_ttl
    ttl = int(base_ttl + jitter)
    redis_client.setex(key, ttl, json.dumps(value))
```
Problem: Recommendations are too similar; users see only a single category.
Solution: Implement a diversity constraint:

```python
def rank_with_diversity(items: list, scores: list, n: int = 10):
    selected = []
    category_counts = {}

    for item, score in sorted(zip(items, scores), key=lambda x: -x[1]):
        category = item['category']

        # Limit 3 items per category
        if category_counts.get(category, 0) >= 3:
            continue

        selected.append(item)
        category_counts[category] = category_counts.get(category, 0) + 1

        if len(selected) >= n:
            break

    return selected
```
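For example, with illustrative scored candidates:

```python
items = [
    {'id': 'b1', 'category': 'books'},
    {'id': 'b2', 'category': 'books'},
    {'id': 'g1', 'category': 'games'},
]
scores = [0.9, 0.8, 0.7]

top = rank_with_diversity(items, scores, n=2)
# Highest scores win, but no category exceeds 3 slots
```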
Problem: Recommendation quality drops and nobody notices until users complain.
Solution: Continuous monitoring with alerts:

```python
import time
from prometheus_client import Counter, Histogram

recommendation_clicks = Counter(
    'recommendation_clicks_total', 'Clicks on recommended items'
)
recommendation_latency = Histogram(
    'recommendation_latency_seconds', 'Recommendation request latency'
)

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    start = time.time()
    recs = generate_recs(user_id)
    recommendation_latency.observe(time.time() - start)
    return recs

@app.post("/track/click")
async def track_click(user_id: str, item_id: str):
    recommendation_clicks.inc()
    # Alert (via Prometheus rules) if computed CTR drops below 3%
    return {"status": "tracked"}
```
Problem: User preferences change but cached features don't update, so recommendations become irrelevant.
Solution: Set appropriate TTLs and update triggers:

```python
class FeatureStore:
    def __init__(self, redis_client):
        self.redis = redis_client
        # Shorter TTL for frequently changing features
        self.user_ttl = 300   # 5 minutes
        self.item_ttl = 3600  # 1 hour

    def update_on_event(self, user_id: str, event: str):
        # Invalidate on important events
        if event in ['purchase', 'rating']:
            self.redis.delete(f"user_features:{user_id}")
            logger.info(f"Refreshed features for {user_id}")
```
Problem: A winner is declared too early, before results are statistically significant.
Solution: Calculate the required sample size first:

```python
import math
from scipy import stats

def calculate_sample_size(
    baseline_rate: float,
    min_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8
) -> int:
    """Calculate required sample size per variant (two-sided test, relative lift)."""
    z_alpha = stats.norm.ppf(1 - alpha / 2)
    z_beta = stats.norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + min_detectable_effect)
    p_avg = (p1 + p2) / 2
    n = (
        (z_alpha + z_beta) ** 2 * 2 * p_avg * (1 - p_avg) /
        (p2 - p1) ** 2
    )
    return math.ceil(n)  # round up so the test is not underpowered

# Example: detect 10% lift with baseline CTR=5%
n_required = calculate_sample_size(
    baseline_rate=0.05,
    min_detectable_effect=0.10
)
print(f"Required sample size: {n_required} per variant")
# Wait until both variants reach this size before concluding
```
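Once both variants reach `n_required`, the comparison itself is a separate step. A minimal frequentist sketch using a two-sided two-proportion z-test (the reference file also covers Bayesian testing):

```python
import math
from scipy import stats

def two_proportion_z_test(conv_a: int, n_a: int, conv_b: int, n_b: int) -> float:
    """Two-sided p-value for the difference in conversion rates."""
    p_a, p_b = conv_a / n_a, conv_b / n_b
    p_pool = (conv_a + conv_b) / (n_a + n_b)
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / se
    return 2 * (1 - stats.norm.cdf(abs(z)))

# Conclude only when p < alpha AND both variants have reached n_required
```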
Load reference files for detailed production implementations:

- Production Architecture: Load references/production-architecture.md for complete FeatureStore, ModelServing, and RecommendationService implementations with batch fetching, caching integration, and FastAPI deployment patterns.
- Caching Strategies: Load references/caching-strategies.md when implementing multi-tier caching (L1/L2/L3), cache warming, invalidation strategies, probabilistic refresh, or thundering herd prevention.
- A/B Testing Framework: Load references/ab-testing-framework.md for deterministic variant assignment, Thompson sampling (multi-armed bandits), Bayesian and frequentist significance testing, and experiment tracking; a minimal Thompson sampling sketch follows this list.
- Monitoring & Alerting: Load references/monitoring-alerting.md for Prometheus metrics integration, dashboard endpoints, alert rules, and quality monitoring (diversity, coverage).
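As a taste of what the A/B testing reference covers, a minimal Thompson sampling sketch for a Beta-Bernoulli bandit, assuming per-variant click and impression counts are tracked elsewhere:

```python
import random

def pick_variant(variant_stats: dict) -> str:
    """variant_stats: {name: {'clicks': int, 'impressions': int}}."""
    best_name, best_sample = None, -1.0
    for name, s in variant_stats.items():
        # Sample a plausible CTR from the Beta posterior (uniform prior)
        sample = random.betavariate(
            s['clicks'] + 1, s['impressions'] - s['clicks'] + 1
        )
        if sample > best_sample:
            best_name, best_sample = name, sample
    return best_name  # exploits good variants while still exploring uncertain ones
```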
Putting the pieces together, the end-to-end service looks like this:

```python
class RecommendationService:
    def __init__(self, feature_store, model_serving, cache):
        self.feature_store = feature_store
        self.model_serving = model_serving
        self.cache = cache

    def get_recommendations(self, user_id: str, n: int = 10):
        # 1. Check cache
        cached = self.cache.get(f"recs:{user_id}:{n}")
        if cached:
            return cached

        # 2. Get features
        user_features = self.feature_store.get_user_features(user_id)
        candidates = self.get_candidates(user_id)

        # 3. Score candidates
        scores = self.model_serving.predict(user_features, candidates)

        # 4. Rank with diversity
        recommendations = self.rank_with_diversity(candidates, scores, n)

        # 5. Cache
        self.cache.set(f"recs:{user_id}:{n}", recommendations, ttl=300)
        return recommendations
```
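`get_candidates` is not shown; a cheap first cut reuses the popularity sorted set from the cold-start section and filters out items the user already bought (the `purchased:{user_id}` set is a hypothetical key):

```python
    def get_candidates(self, user_id: str, limit: int = 500) -> list:
        redis = self.feature_store.redis
        popular = redis.zrevrange('popular_items', 0, limit - 1)
        purchased = redis.smembers(f"purchased:{user_id}")
        return [item for item in popular if item not in purchased]
```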
Deterministic variant assignment keeps each user in the same bucket for the life of an experiment:

```python
import hashlib

def assign_variant(user_id: str, experiment_id: str) -> str:
    """Deterministic assignment - same user always gets same variant."""
    hash_input = f"{user_id}:{experiment_id}"
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
    # 50/50 split
    return 'control' if hash_value % 2 == 0 else 'treatment'

# Usage
user_id = 'user_123'
variant = assign_variant(user_id, 'rec_algo_v2')
model_name = 'main' if variant == 'control' else 'experimental'
recs = get_recommendations(user_id, model_name=model_name)
```
Instrument the API so request volume, errors, and latency are measured on every call:

```python
from prometheus_client import Counter, Histogram

# Both metric types require a help string as the second argument
requests_total = Counter(
    'recommendation_requests_total', 'Recommendation requests', ['status']
)
latency_seconds = Histogram(
    'recommendation_latency_seconds', 'Recommendation request latency'
)

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    with latency_seconds.time():
        try:
            recs = generate_recs(user_id)
            requests_total.labels(status='success').inc()
            return recs
        except Exception:
            requests_total.labels(status='error').inc()
            raise
```
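These metrics still need to be exposed for scraping; `prometheus_client` ships an ASGI app that FastAPI can mount (the `/metrics` path is conventional):

```python
from prometheus_client import make_asgi_app

# Expose all registered metrics for the Prometheus scraper
app.mount("/metrics", make_asgi_app())
```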