Designs and implements caching strategies to dramatically improve performance through intelligent data caching at multiple layers.
Designs and implements multi-layer caching strategies to improve application performance through intelligent data caching.
/plugin marketplace add avovello/cc-plugins
/plugin install optimize@cc-plugins
✅ DOES:
❌ DOES NOT:
Best For:
Example:
import json
import redis
from functools import wraps

redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cache_result(key_prefix, ttl=300):
    """Decorator that caches a function's JSON-serializable result in Redis.

    Args:
        key_prefix: Namespace prefix for the cache key.
        ttl: Time-to-live in seconds (default 300 = 5 minutes).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Sort kwargs so the same call always yields the same key
            # regardless of keyword-argument order at the call site.
            kwarg_part = ",".join(f"{k}={v}" for k, v in sorted(kwargs.items()))
            cache_key = f"{key_prefix}:{args}:{kwarg_part}"
            cached = redis_client.get(cache_key)
            # Compare against None explicitly: only a missing key is a miss.
            if cached is not None:
                return json.loads(cached)
            result = func(*args, **kwargs)
            redis_client.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator
@cache_result('user_profile', ttl=300)
def get_user_profile(user_id):
    """Return the user's profile dict, caching the expensive DB lookup."""
    user = User.query.get(user_id)
    return user.to_dict()
Impact: 90-99% faster for cache hits
Best For:
Example:
from flask import make_response

@app.route('/api/products')
def get_products():
    """Serve the product list with HTTP cache headers for browser/CDN reuse."""
    products = Product.query.all()
    resp = make_response(jsonify(products))
    # ETag lets clients revalidate cheaply after the max-age window.
    resp.headers['ETag'] = generate_etag(products)
    # Shared caches (CDN) and browsers may store the response for 300 s.
    resp.headers['Cache-Control'] = 'public, max-age=300'
    return resp
Impact: Eliminates server requests entirely for cached responses
Best For:
Example:
from functools import lru_cache

# In-process memoization: up to 1000 distinct category IDs are kept.
# NOTE(review): this caches ORM instances beyond their session's lifetime —
# confirm detached-instance access is safe in this app.
@lru_cache(maxsize=1000)
def get_category_by_id(category_id):
    return Category.query.get(category_id)
# Cache invalidation
def update_category(category_id, data):
    """Persist category changes and drop the stale memoized entries."""
    record = Category.query.get(category_id)
    record.update(data)
    db.session.commit()
    # lru_cache has no per-key eviction, so the whole cache is flushed.
    get_category_by_id.cache_clear()
Impact: Microsecond access time vs milliseconds for database
Best For:
Example:
-- PostgreSQL materialized view: precomputes the per-day sales rollup so
-- reads avoid re-aggregating the orders table.
CREATE MATERIALIZED VIEW daily_sales_summary AS
SELECT
DATE(created_at) as date,
COUNT(*) as order_count,
SUM(total) as revenue
FROM orders
GROUP BY DATE(created_at);
-- Refresh periodically (e.g. from cron); data is stale between refreshes.
REFRESH MATERIALIZED VIEW daily_sales_summary;
Impact: 95-99% faster for complex aggregations
Criteria for Good Cache Candidates:
Example Analysis:
## Caching Opportunity: User Profile API
**Current Performance**: 250ms per request
**Request Volume**: 2,400 requests/hour
**Breakdown**:
- Database query: 180ms
- Business logic: 50ms
- Serialization: 20ms
**Read/Write Ratio**: 1000:1
- Reads: 2,400/hour
- Writes: 2-3/hour (profile updates rare)
**Staleness Tolerance**: 5 minutes acceptable
**Cache Potential**: 95% hit rate possible
**Expected Performance**: 5ms (cache hit) vs 250ms (cache miss)
**Improvement**: 98% faster for cached requests
| Data Type | Volume | Staleness | Best Cache |
|-----------|--------|-----------|------------|
| User profiles | Med | 5 min OK | Redis |
| Product catalog | High | 1 hour OK | Redis + CDN |
| Static assets | High | 1 day OK | CDN |
| Session data | High | None | Redis |
| Config data | Low | 1 hour OK | In-memory |
| Analytics | Low | 1 day OK | Materialized view |
Short TTL (1-5 minutes):
Medium TTL (1-6 hours):
Long TTL (1-7 days):
No Expiration (manual invalidation):
Time-Based (TTL):
# Expire after fixed time; Redis evicts the key automatically when the TTL lapses.
redis_client.setex('user:123', 300, user_data) # 300 s TTL (5 minutes)
Write-Through:
def update_user(user_id, data):
    """Apply a user update, then invalidate the cached entry immediately."""
    # Write the database first so a subsequent cache miss re-reads fresh data.
    user = User.query.get(user_id)
    user.update(data)
    db.session.commit()
    # Drop the entry; the next read repopulates the cache (cache-aside).
    redis_client.delete(f'user:{user_id}')
Event-Based:
@app.event('user.updated')
def invalidate_user_cache(user_id):
    """Purge every cache entry derived from this user when it changes."""
    for key in (f'user:{user_id}',
                f'user:profile:{user_id}',
                f'user:permissions:{user_id}'):
        redis_client.delete(key)
Cache Tagging:
def cache_with_tags(key, value, tags, ttl=300):
    """Cache *value* under *key* and index the key under each tag set."""
    redis_client.setex(key, ttl, value)
    # Tag sets record which cached keys belong to each group.
    for tag in tags:
        redis_client.sadd(f'tag:{tag}', key)

def invalidate_by_tag(tag):
    """Delete every cached key registered under *tag*, then the tag set itself."""
    members = redis_client.smembers(f'tag:{tag}')
    if members:
        redis_client.delete(*members)
        redis_client.delete(f'tag:{tag}')

# Usage
cache_with_tags('product:123', product_data, ['products', 'category:electronics'])
invalidate_by_tag('products') # Invalidates all product caches
# Pre-populate cache on startup/deploy
def warm_cache():
    """Pre-load hot keys so the first requests after a deploy hit the cache."""
    print("Warming cache...")
    # Featured products — serialize to plain dicts first: SQLAlchemy model
    # instances are not JSON-serializable, so json.dumps() on the raw list raises.
    products = Product.query.filter_by(featured=True).all()
    redis_client.setex('featured_products', 3600,
                       json.dumps([p.to_dict() for p in products]))
    # Popular categories — same serialization fix as above.
    categories = Category.query.filter_by(popular=True).all()
    redis_client.setex('popular_categories', 3600,
                       json.dumps([c.to_dict() for c in categories]))
    # Active users (from recent activity)
    active_users = get_recently_active_users(limit=1000)
    for user in active_users:
        redis_client.setex(f'user:{user.id}', 300, json.dumps(user.to_dict()))
    print(f"Cache warmed: {len(active_users)} users cached")
# Run on application startup
# NOTE(review): before_first_request was removed in Flask 2.3 — confirm the
# target Flask version, or call warm_cache() from the app factory instead.
@app.before_first_request
def startup():
    warm_cache()
import redis
import json
from functools import wraps
from flask import request

# decode_responses=True makes redis-py return str instead of bytes, so cached
# payloads can be passed straight to json.loads().
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
class CacheManager:
    """Helpers for cache-key generation and Redis-backed response caching."""

    @staticmethod
    def generate_key(prefix, *args, **kwargs):
        """Generate cache key from arguments"""
        parts = [prefix]
        parts.extend(str(a) for a in args)
        # Sorting keyword names keeps the key stable across call sites.
        for name, val in sorted(kwargs.items()):
            parts.append(f"{name}:{val}")
        return ":".join(parts)

    @staticmethod
    def cache_response(key_prefix, ttl=300):
        """Decorator to cache function results"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                cache_key = CacheManager.generate_key(key_prefix, *args, **kwargs)
                # Serve from Redis when present; otherwise compute and store.
                cached = redis_client.get(cache_key)
                if cached:
                    return json.loads(cached)
                result = func(*args, **kwargs)
                redis_client.setex(cache_key, ttl, json.dumps(result))
                return result
            return wrapper
        return decorator
# Usage
@app.route('/api/products')
def get_products():
    """Route entry point: reads the query string, then delegates to a cached
    helper. The original decorated the zero-argument view directly, so the
    cache key never included ``category`` and every category shared one entry.
    """
    return _cached_products(request.args.get('category'))

@CacheManager.cache_response('products', ttl=3600)
def _cached_products(category):
    # ``category`` now participates in the cache key via the decorator's *args.
    products = Product.query.filter_by(category=category).all()
    return [p.to_dict() for p in products]
class MultiLayerCache:
    """Two-layer cache: process-local dict (L1) in front of Redis (L2).

    Fix over the naive version: L1 entries now carry an expiry timestamp, so
    the in-memory layer honors TTLs instead of serving stale data forever
    after the Redis key has expired.
    """

    # Bound on how long an L1 entry repopulated from Redis may live; Redis
    # still owns the authoritative TTL.
    _L1_REFRESH_SECONDS = 60

    def __init__(self):
        self.memory_cache = {}  # L1: key -> (value, expires_at monotonic time)
        self.redis_client = redis.Redis()  # L2: shared across processes

    def get(self, key):
        """Return the cached value, consulting L1 then L2; None on a miss."""
        entry = self.memory_cache.get(key)
        if entry is not None:
            value, expires_at = entry
            if time.monotonic() < expires_at:
                return value
            # Expired in L1 — drop it and fall through to Redis.
            del self.memory_cache[key]
        raw = self.redis_client.get(key)
        if raw:
            value = json.loads(raw)
            self.memory_cache[key] = (value, time.monotonic() + self._L1_REFRESH_SECONDS)
            return value
        return None

    def set(self, key, value, ttl=300):
        """Store the value in both layers with the same TTL."""
        self.memory_cache[key] = (value, time.monotonic() + ttl)
        self.redis_client.setex(key, ttl, json.dumps(value))

    def delete(self, key):
        """Invalidate both layers."""
        self.memory_cache.pop(key, None)
        self.redis_client.delete(key)
# Usage
cache = MultiLayerCache()

def get_user_profile(user_id):
    """Fetch a user's profile dict via the multi-layer cache (cache-aside)."""
    key = f'user:{user_id}'
    cached = cache.get(key)
    if cached:
        return cached
    # Miss: load from the database, then cache the serializable dict.
    profile = User.query.get(user_id).to_dict()
    cache.set(key, profile, ttl=300)
    return profile
# Cache Strategy Report
**Date**: 2025-01-15
**Target**: API performance optimization
**Caching Opportunities Identified**: 8
## Summary
- **Cacheable Endpoints**: 8
- **Expected Cache Hit Rate**: 92%
- **Expected Performance Improvement**: 85% faster (avg)
- **Cache Infrastructure**: Redis + CDN
## Caching Strategy
### High-Priority Caches (Immediate Impact)
#### 1. User Profile Cache
**Endpoint**: GET /api/users/:id
**Current Performance**: 250ms
**Request Volume**: 2,400/hour
**Read/Write Ratio**: 1000:1
**Cache Strategy**:
- **Layer**: Redis
- **TTL**: 5 minutes
- **Invalidation**: Write-through on user updates
- **Warming**: Top 1,000 active users on deploy
**Expected Performance**:
- Cache hit (92%): 5ms (98% faster)
- Cache miss (8%): 250ms (fetch + cache)
- Average: 25ms (90% faster)
**Implementation**:
```python
# Redis-backed cache: serialized profile dict kept for 5 minutes.
@cache_response('user_profile', ttl=300)
def get_user_profile(user_id):
    return User.query.get(user_id).to_dict()
Endpoint: GET /api/products
**Current Performance**: 450ms
**Request Volume**: 5,600/hour
**Read/Write Ratio**: 5000:1
Cache Strategy:
Expected Performance:
Implementation:
# Cached for one hour; the category argument participates in the cache key.
@cache_response('products', ttl=3600)
def get_products(category=None):
    # Build the query lazily; filter only when a category was requested.
    query = Product.query
    if category:
        query = query.filter_by(category=category)
    return [p.to_dict() for p in query.all()]
# Add HTTP headers so browsers/CDNs can cache the response too.
response.headers['Cache-Control'] = 'public, max-age=300'
# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    # Cap memory at 2 GB and evict least-recently-used keys when full.
    command: redis-server --maxmemory 2gb --maxmemory-policy allkeys-lru
Configuration:
Cache Rules:
// Cache API responses with proper headers
if (request.url.includes('/api/')) {
  if (response.headers.get('Cache-Control')) {
    // Respect cache headers from application
    return response;
  }
}

// Cache static assets aggressively: one year, fingerprinted filenames assumed.
// NOTE(review): fragment — presumably followed by a return; confirm in the
// full CDN worker script.
if (request.url.match(/\.(js|css|jpg|png|svg|woff2)$/)) {
  response.headers.set('Cache-Control', 'public, max-age=31536000');
}
def update_product(product_id, data):
    """Persist a product update and invalidate every affected cache entry."""
    product = Product.query.get(product_id)
    previous_category = product.category
    product.update(data)
    db.session.commit()
    # Invalidate the per-product entry plus the category listings that may
    # now be stale (the old one, and the new one if the product moved).
    stale_keys = [
        f'product:{product_id}',
        f'products:category:{previous_category}',
    ]
    if data.get('category') != previous_category:
        stale_keys.append(f'products:category:{data["category"]}')
    stale_keys.append('products:featured')
    for key in stale_keys:
        cache.delete(key)
from flask import Flask
from flask_caching import Cache

app = Flask(__name__)
cache = Cache(app)

@app.event('product.updated')
def invalidate_product_caches(product_id, old_data, new_data):
    """Event hook: drop every cache key affected by a product change."""
    keys_to_invalidate = [
        f'product:{product_id}',
        f'products:category:{old_data["category"]}',
    ]
    if new_data['category'] != old_data['category']:
        keys_to_invalidate.append(f'products:category:{new_data["category"]}')
    for key in keys_to_invalidate:
        # Bug fix: Cache.delete_memoized() expects the memoized *function*,
        # not a string key; Cache.delete() is the key-based API.
        cache.delete(key)
class CacheMetrics:
    """In-process hit/miss/error counters for cache instrumentation."""

    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.errors = 0

    def record_hit(self):
        self.hits += 1

    def record_miss(self):
        self.misses += 1

    def record_error(self):
        # Fix: self.errors was initialized but nothing ever incremented it.
        self.errors += 1

    def hit_rate(self):
        """Hit percentage over all lookups; 0 when nothing was recorded yet."""
        total = self.hits + self.misses
        return (self.hits / total * 100) if total > 0 else 0

    def report(self):
        """Snapshot of all counters for dashboards/logging."""
        return {
            'hits': self.hits,
            'misses': self.misses,
            'errors': self.errors,
            'hit_rate': f'{self.hit_rate():.2f}%',
            'total_requests': self.hits + self.misses
        }
# Monitor cache health
@app.route('/metrics/cache')
def cache_metrics():
    """Expose Redis cache stats as JSON.

    Fix: a freshly started Redis reports zero keyspace hits and misses, so
    the original hit-rate expression raised ZeroDivisionError.
    """
    info = redis_client.info('stats')
    lookups = info['keyspace_hits'] + info['keyspace_misses']
    hit_rate = (info['keyspace_hits'] / lookups * 100) if lookups else 0.0
    return {
        'hit_rate': f'{hit_rate:.2f}%',
        'memory_used': info['used_memory_human'],
        'keys': redis_client.dbsize()
    }
Use this agent when analyzing conversation transcripts to find behaviors worth preventing with hooks. Examples: <example>Context: User is running /hookify command without arguments user: "/hookify" assistant: "I'll analyze the conversation to find behaviors you want to prevent" <commentary>The /hookify command without arguments triggers conversation analysis to find unwanted behaviors.</commentary></example><example>Context: User wants to create hooks from recent frustrations user: "Can you look back at this conversation and help me create hooks for the mistakes you made?" assistant: "I'll use the conversation-analyzer agent to identify the issues and suggest hooks." <commentary>User explicitly asks to analyze conversation for mistakes that should be prevented.</commentary></example>