Implement multi-tier database caching with Redis, in-memory, and CDN layers...
Implements multi-tier database caching with Redis and in-memory layers to reduce query latency.
/plugin marketplace add jeremylongshore/claude-code-plugins-plus
/plugin install database-cache-layer@claude-code-plugins-plus

Implement production-grade multi-tier caching architecture for databases using Redis (distributed cache), in-memory caching (L1), and CDN (static assets) to reduce database load by 80-95%, improve query latency from 50ms to 1-5ms, and support horizontal scaling with cache-aside, write-through, and read-through patterns.
Use /caching when you need to:
DON'T use this when:
This command implements multi-tier caching with intelligent invalidation because:
Alternative considered: Read-through caching
Alternative considered: Database query result caching (pg_stat_statements)
Before running this command:
1. Define hierarchical cache keys for easy invalidation (e.g., user:profile:123).
2. Check the cache first, query the database on a miss, and populate the cache with the result (cache-aside; a write-through variant is sketched after this list).
3. Set appropriate TTLs based on data freshness requirements and memory limits.
4. Invalidate the cache on data updates using event listeners or explicit invalidation.
5. Track hit rate, miss rate, latency, and memory usage with Prometheus/Grafana.
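The generated module below implements the cache-aside path; as referenced in step 2, a write-through update keeps the cache in sync at write time instead. A minimal sketch, assuming the `cache` instance and key layout from the generated `caching/redis_client.py`, a placeholder DSN, and a hypothetical `users.display_name` column:

```python
import psycopg2

# Module path assumed; adjust to wherever MultiTierCache/cache are defined
from caching.redis_client import cache

def update_display_name_write_through(user_id: int, display_name: str) -> None:
    """Write-through sketch: update the database row and refresh both cache
    tiers in the same code path, so the next read never sees a stale entry."""
    conn = psycopg2.connect("postgresql://...")  # placeholder DSN
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE users SET display_name = %s WHERE id = %s RETURNING *",
                (display_name, user_id),
            )
            row = cur.fetchone()
        conn.commit()
    finally:
        conn.close()
    # Refresh the cache instead of deleting it and paying a miss on the next read
    cache.set(cache._generate_key("user:profile", user_id), row, l2_ttl=1800)
```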
The command generates:
- caching/redis_client.py - Redis connection pool and wrapper
- caching/cache_decorator.py - Python decorator for automatic caching
- caching/cache_invalidation.js - Event-driven invalidation logic
- caching/cache_monitoring.yml - Prometheus metrics and alerts
- caching/cache_warming.sql - SQL queries for cache preloading

#!/usr/bin/env python3
"""
Production-ready multi-tier caching system with L1 (in-memory) and
L2 (Redis) caches, automatic invalidation, and performance monitoring.
"""
import redis
import pickle
from typing import Optional, Callable, Any
from functools import wraps
from datetime import timedelta
import time
import logging
from cachetools import TTLCache
import hashlib
import json
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MultiTierCache:
"""
Two-tier caching system with L1 (in-memory) and L2 (Redis).
L1: Fast in-memory cache (1-5ms) for hot data
L2: Distributed Redis cache (5-10ms) shared across servers
"""
def __init__(
self,
redis_url: str = "redis://localhost:6379/0",
l1_max_size: int = 1000,
l1_ttl_seconds: int = 60,
l2_ttl_seconds: int = 3600,
enabled: bool = True
):
"""
Initialize multi-tier cache.
Args:
redis_url: Redis connection URL
l1_max_size: Max entries in L1 cache
l1_ttl_seconds: L1 cache TTL (default: 1 minute)
l2_ttl_seconds: L2 cache TTL (default: 1 hour)
enabled: Enable/disable caching (useful for debugging)
"""
self.enabled = enabled
if not enabled:
logger.warning("Caching is disabled")
return
# L1: In-memory cache (per server)
self.l1_cache = TTLCache(maxsize=l1_max_size, ttl=l1_ttl_seconds)
self.l1_ttl = l1_ttl_seconds
# L2: Redis cache (distributed)
self.redis_client = redis.from_url(
redis_url,
decode_responses=False, # Store binary data
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True
)
self.l2_ttl = l2_ttl_seconds
# Metrics
self.metrics = {
'l1_hits': 0,
'l1_misses': 0,
'l2_hits': 0,
'l2_misses': 0,
'db_queries': 0,
'errors': 0
}
def _generate_key(self, prefix: str, *args, **kwargs) -> str:
"""
Generate cache key from function arguments.
Args:
prefix: Cache key prefix (e.g., 'user:profile')
args: Positional arguments
kwargs: Keyword arguments
Returns:
Cache key string
"""
# Create deterministic key from arguments
key_parts = [str(arg) for arg in args]
key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
        key_suffix = hashlib.md5(
            "|".join(key_parts).encode()
        ).hexdigest()[:8]
        # Keep the readable arguments in the key (e.g. 'user:profile:123:a1b2c3d4')
        # so a single entity can be invalidated with a pattern like 'user:profile:123:*'
        if key_parts:
            return f"{prefix}:{':'.join(key_parts)}:{key_suffix}"
        return f"{prefix}:{key_suffix}"
def get(self, key: str) -> Optional[Any]:
"""
Get value from cache (checks L1 then L2).
Args:
key: Cache key
Returns:
Cached value or None if not found
"""
if not self.enabled:
return None
# Try L1 cache first
if key in self.l1_cache:
self.metrics['l1_hits'] += 1
logger.debug(f"L1 cache hit: {key}")
return self.l1_cache[key]
self.metrics['l1_misses'] += 1
# Try L2 cache (Redis)
try:
cached_data = self.redis_client.get(key)
if cached_data:
self.metrics['l2_hits'] += 1
logger.debug(f"L2 cache hit: {key}")
# Deserialize and populate L1 cache
value = pickle.loads(cached_data)
self.l1_cache[key] = value
return value
self.metrics['l2_misses'] += 1
return None
except redis.RedisError as e:
logger.error(f"Redis error: {e}")
self.metrics['errors'] += 1
return None
def set(
self,
key: str,
value: Any,
l1_ttl: Optional[int] = None,
l2_ttl: Optional[int] = None
) -> bool:
"""
Set value in both cache layers.
Args:
key: Cache key
value: Value to cache
l1_ttl: L1 TTL override (seconds)
l2_ttl: L2 TTL override (seconds)
Returns:
True if successful
"""
if not self.enabled:
return False
try:
# Store in L1 cache
self.l1_cache[key] = value
# Store in L2 cache (Redis)
serialized = pickle.dumps(value)
ttl = l2_ttl or self.l2_ttl
self.redis_client.setex(key, ttl, serialized)
logger.debug(f"Cached: {key} (TTL: {ttl}s)")
return True
except redis.RedisError as e:
logger.error(f"Failed to cache {key}: {e}")
self.metrics['errors'] += 1
return False
def delete(self, key: str) -> bool:
"""
Delete key from both cache layers.
Args:
key: Cache key to delete
Returns:
True if successful
"""
if not self.enabled:
return False
try:
# Delete from L1
self.l1_cache.pop(key, None)
# Delete from L2
self.redis_client.delete(key)
logger.info(f"Invalidated cache: {key}")
return True
except redis.RedisError as e:
logger.error(f"Failed to delete {key}: {e}")
self.metrics['errors'] += 1
return False
def delete_pattern(self, pattern: str) -> int:
"""
Delete all keys matching pattern (L2 only).
Args:
pattern: Redis key pattern (e.g., 'user:123:*')
Returns:
Number of keys deleted
"""
if not self.enabled:
return 0
try:
# Scan and delete matching keys
cursor = 0
deleted_count = 0
while True:
cursor, keys = self.redis_client.scan(
cursor,
match=pattern,
count=100
)
if keys:
deleted_count += self.redis_client.delete(*keys)
if cursor == 0:
break
# Clear L1 cache (simpler than pattern matching)
self.l1_cache.clear()
logger.info(f"Invalidated {deleted_count} keys matching: {pattern}")
return deleted_count
except redis.RedisError as e:
logger.error(f"Failed to delete pattern {pattern}: {e}")
self.metrics['errors'] += 1
return 0
def get_metrics(self) -> dict:
"""
Get cache performance metrics.
Returns:
Dictionary with hit rates and counts
"""
total_l1 = self.metrics['l1_hits'] + self.metrics['l1_misses']
total_l2 = self.metrics['l2_hits'] + self.metrics['l2_misses']
l1_hit_rate = (
self.metrics['l1_hits'] / total_l1 * 100
if total_l1 > 0 else 0
)
l2_hit_rate = (
self.metrics['l2_hits'] / total_l2 * 100
if total_l2 > 0 else 0
)
overall_hit_rate = (
(self.metrics['l1_hits'] + self.metrics['l2_hits']) /
(total_l1 + total_l2) * 100
if (total_l1 + total_l2) > 0 else 0
)
return {
'l1_hits': self.metrics['l1_hits'],
'l1_misses': self.metrics['l1_misses'],
'l1_hit_rate': round(l1_hit_rate, 2),
'l2_hits': self.metrics['l2_hits'],
'l2_misses': self.metrics['l2_misses'],
'l2_hit_rate': round(l2_hit_rate, 2),
'overall_hit_rate': round(overall_hit_rate, 2),
'db_queries': self.metrics['db_queries'],
'errors': self.metrics['errors']
}
# Global cache instance
cache = MultiTierCache()
def cached(
prefix: str,
l2_ttl: int = 3600,
invalidate_on_update: bool = False
):
"""
Decorator to automatically cache function results.
Args:
prefix: Cache key prefix
l2_ttl: Redis cache TTL (seconds)
invalidate_on_update: Auto-invalidate on data updates
Usage:
@cached('user:profile', l2_ttl=1800)
def get_user_profile(user_id: int):
return db.query(...).fetchone()
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
# Generate cache key
cache_key = cache._generate_key(prefix, *args, **kwargs)
# Try to get from cache
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
# Cache miss - call function
            # Metrics are only initialized for an enabled cache, so guard the counter
            if cache.enabled:
                cache.metrics['db_queries'] += 1
result = func(*args, **kwargs)
# Cache result
cache.set(cache_key, result, l2_ttl=l2_ttl)
return result
return wrapper
return decorator
# Example usage with database queries
@cached('user:profile', l2_ttl=1800)
def get_user_profile(user_id: int):
"""
Get user profile with automatic caching.
First call: Database query (50ms)
Subsequent calls: L1 cache (1ms) or L2 cache (5ms)
"""
import psycopg2
conn = psycopg2.connect("postgresql://...")
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
return cur.fetchone()
@cached('user:orders', l2_ttl=600)
def get_user_orders(user_id: int, limit: int = 10):
"""Get user orders with caching."""
import psycopg2
conn = psycopg2.connect("postgresql://...")
with conn.cursor() as cur:
cur.execute(
"SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC LIMIT %s",
(user_id, limit)
)
return cur.fetchall()
def invalidate_user_cache(user_id: int):
"""
Invalidate all cached data for a user.
Call this after updating user data:
- User profile updates
- User orders/transactions
- User preferences
"""
cache.delete_pattern(f"user:{user_id}:*")
# Example: Invalidate cache on database update
def update_user_profile(user_id: int, **updates):
"""Update user profile and invalidate cache."""
import psycopg2
conn = psycopg2.connect("postgresql://...")
with conn.cursor() as cur:
# Update database
set_clause = ", ".join(f"{k} = %s" for k in updates.keys())
cur.execute(
f"UPDATE users SET {set_clause} WHERE id = %s",
(*updates.values(), user_id)
)
conn.commit()
# Invalidate cached data
invalidate_user_cache(user_id)
logger.info(f"Updated and invalidated cache for user {user_id}")
if __name__ == "__main__":
# Test caching performance
print("Testing cache performance...")
# First call (cache miss - database query)
start = time.time()
profile1 = get_user_profile(123)
db_time = (time.time() - start) * 1000
print(f"Database query: {db_time:.2f}ms")
# Second call (L1 cache hit)
start = time.time()
profile2 = get_user_profile(123)
cache_time = (time.time() - start) * 1000
print(f"L1 cache hit: {cache_time:.2f}ms")
print(f"Speedup: {db_time / cache_time:.1f}x")
# Print metrics
print("\nCache metrics:")
print(json.dumps(cache.get_metrics(), indent=2))
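The module above invalidates explicitly from `update_user_profile`; the generated `caching/cache_invalidation.js` takes an event-driven route instead. As a rough Python illustration of the same idea (not the generated file), assuming a PostgreSQL trigger that calls `pg_notify('user_changed', NEW.id::text)` on updates to `users`:

```python
import select

import psycopg2
import psycopg2.extensions

# Module path assumed; adjust to wherever the shared cache instance lives
from caching.redis_client import cache

def listen_for_user_changes(dsn: str = "postgresql://...") -> None:
    """Invalidate cached user data whenever the database reports a change."""
    conn = psycopg2.connect(dsn)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    with conn.cursor() as cur:
        cur.execute("LISTEN user_changed;")
    while True:
        # Block (up to 5s) until PostgreSQL has a notification for us
        if select.select([conn], [], [], 5) == ([], [], []):
            continue
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            user_id = notify.payload
            cache.delete_pattern(f"user:profile:{user_id}:*")
            cache.delete_pattern(f"user:orders:{user_id}:*")
```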
#!/usr/bin/env python3
"""
Cache warming strategy to preload hot data before traffic hits.
Reduces cold start latency and improves cache hit rate.
"""
import psycopg2
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import logging

# MultiTierCache lives in the generated caching module
# (module path assumed; adjust to match your project layout)
from caching.redis_client import MultiTierCache
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CacheWarmer:
"""
Preload cache with frequently accessed data.
"""
def __init__(self, cache: MultiTierCache, db_conn_string: str):
"""
Initialize cache warmer.
Args:
cache: MultiTierCache instance
db_conn_string: Database connection string
"""
self.cache = cache
self.db_conn_string = db_conn_string
def warm_user_profiles(self, user_ids: list[int]) -> dict:
"""
Preload user profiles for given IDs.
Args:
user_ids: List of user IDs to warm
Returns:
Statistics (count, duration, errors)
"""
start_time = time.time()
stats = {'loaded': 0, 'errors': 0}
logger.info(f"Warming cache for {len(user_ids)} user profiles...")
with psycopg2.connect(self.db_conn_string) as conn:
with conn.cursor() as cur:
for user_id in user_ids:
try:
# Query user profile
cur.execute(
"SELECT * FROM users WHERE id = %s",
(user_id,)
)
profile = cur.fetchone()
if profile:
                            # Cache under the same key the @cached decorator generates,
                            # so warmed entries are actually hit by get_user_profile()
                            cache_key = self.cache._generate_key(
                                "user:profile", user_id
                            )
self.cache.set(cache_key, profile, l2_ttl=1800)
stats['loaded'] += 1
except Exception as e:
logger.error(f"Error warming user {user_id}: {e}")
stats['errors'] += 1
duration = time.time() - start_time
stats['duration_seconds'] = duration
logger.info(
f"Cache warming complete: {stats['loaded']} profiles loaded "
f"in {duration:.2f}s ({stats['errors']} errors)"
)
return stats
def warm_top_products(self, limit: int = 100) -> dict:
"""
Preload most popular products.
Args:
limit: Number of top products to warm
Returns:
Statistics
"""
start_time = time.time()
stats = {'loaded': 0, 'errors': 0}
logger.info(f"Warming cache for top {limit} products...")
with psycopg2.connect(self.db_conn_string) as conn:
with conn.cursor() as cur:
# Get top products by view count
cur.execute("""
SELECT p.*
FROM products p
JOIN product_analytics a ON a.product_id = p.id
ORDER BY a.view_count DESC
LIMIT %s
""", (limit,))
products = cur.fetchall()
for product in products:
try:
product_id = product[0] # Assuming ID is first column
cache_key = f"product:detail:{product_id}"
self.cache.set(cache_key, product, l2_ttl=3600)
stats['loaded'] += 1
except Exception as e:
logger.error(f"Error warming product: {e}")
stats['errors'] += 1
duration = time.time() - start_time
stats['duration_seconds'] = duration
logger.info(
f"Product cache warming complete: {stats['loaded']} products loaded "
f"in {duration:.2f}s"
)
return stats
def warm_all_hot_data(self) -> dict:
"""
Warm all hot data concurrently.
Returns:
Combined statistics
"""
logger.info("Starting full cache warm...")
# Identify hot data (most accessed in last 24 hours)
with psycopg2.connect(self.db_conn_string) as conn:
with conn.cursor() as cur:
# Get hot user IDs
cur.execute("""
SELECT DISTINCT user_id
FROM access_logs
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY user_id
ORDER BY COUNT(*) DESC
LIMIT 1000
""")
hot_user_ids = [row[0] for row in cur.fetchall()]
# Warm caches concurrently
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(self.warm_user_profiles, hot_user_ids): 'users',
executor.submit(self.warm_top_products, 100): 'products'
}
results = {}
for future in as_completed(futures):
cache_type = futures[future]
try:
results[cache_type] = future.result()
except Exception as e:
logger.error(f"Error warming {cache_type}: {e}")
return results
# Scheduled cache warming (run via cron or scheduler)
if __name__ == "__main__":
    # Shared cache instance from the generated caching module
    # (module path assumed; adjust to match your project layout)
    from caching.redis_client import cache
warmer = CacheWarmer(
cache=cache,
db_conn_string="postgresql://user:pass@localhost/db"
)
# Warm cache (run every 30 minutes)
results = warmer.warm_all_hot_data()
print(f"Cache warm complete: {results}")
| Error | Cause | Solution |
|---|---|---|
| "Redis connection refused" | Redis server down or unreachable | Implement graceful degradation (bypass cache, query database directly) |
| "Out of memory" (Redis) | Cache size exceeds max memory | Configure eviction policy (maxmemory-policy allkeys-lru) or increase memory |
| "Pickle deserialization error" | Cached object structure changed | Version cache keys when data models change, invalidate old caches |
| "Cache stampede" | Many requests miss cache simultaneously | Use locking or probabilistic early expiration to prevent thundering herd |
| "Stale data returned" | TTL too long or invalidation missed | Reduce TTL, implement event-driven invalidation on updates |
Caching Patterns
Eviction Policies (Redis)
TTL Strategies
DO:
DON'T:
- /database-connection-pooler - Optimize connections when cache is unavailable
- /database-health-monitor - Monitor cache hit rate and database load
- /sql-query-optimizer - Optimize queries that are cache misses
- /database-security-scanner - Audit sensitive data in cache