Skill

advanced-patterns

Install
1
Install the plugin
$
npx claudepluginhub jsamuelsen11/claude-config --plugin ccfg-redis

Want just this skill?

Add to a custom plugin, then install with one command.

Description

This skill should be used when implementing caching strategies, rate limiting, distributed locks, session management, job queues, or other Redis-based patterns.

Tool Access

This skill uses the workspace's default tool permissions.

Skill Content

Advanced Redis Patterns

These are comprehensive patterns for Redis-based architectural components, covering caching strategies, rate limiting algorithms, distributed lock implementations, session management, job queues, leaderboards, and other common Redis patterns. Following these patterns ensures correctness, fault tolerance, and production-grade reliability.

Existing Repository Compatibility

When implementing Redis patterns in existing projects, always respect established patterns and library choices before applying these preferences.

  • Audit existing patterns: Review how the project currently uses Redis before suggesting new patterns. An existing cache-aside implementation may have specific invalidation logic that must be preserved.
  • Library compatibility: If the project uses BullMQ for job queues, do not suggest replacing it with a custom stream-based queue. Work within the existing library's conventions.
  • Migration path: When improving an existing pattern (e.g., adding stampede protection to cache-aside), provide a migration path that does not break existing consumers.
  • Testing: All pattern changes must be testable. Provide test scenarios for success cases, failure cases, and race conditions.

These patterns apply primarily to new implementations and significant refactors. For existing systems, propose changes through proper change management processes.

Cache-Aside Pattern (Lazy Loading)

The most common caching pattern. Application code manages the cache explicitly: read from cache first, fetch from database on miss, write result to cache.

Implementation

# CORRECT: Cache-aside with TTL and error handling
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id: int) -> dict:
    cache_key = f"myapp:cache:product:{product_id}"

    # Step 1: Check cache
    cached = r.get(cache_key)
    if cached is not None:
        return json.loads(cached)

    # Step 2: Cache miss - query database
    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    if product is None:
        # Cache negative result with short TTL to prevent cache penetration
        r.set(cache_key, json.dumps(None), ex=60)
        return None

    # Step 3: Store in cache with TTL
    r.set(cache_key, json.dumps(product), ex=3600)
    return product

def invalidate_product(product_id: int) -> None:
    """Invalidate cache on write (do not update cache directly)."""
    r.unlink(f"myapp:cache:product:{product_id}")
# WRONG: Cache-aside without TTL
def get_product_bad(product_id: int) -> dict:
    cached = r.get(f"myapp:cache:product:{product_id}")
    if cached:
        return json.loads(cached)
    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    r.set(f"myapp:cache:product:{product_id}", json.dumps(product))  # No TTL
    return product
    # Problem: Stale data lives forever, memory grows unbounded

# WRONG: Updating cache instead of invalidating on write
def update_product_bad(product_id: int, data: dict) -> None:
    db.execute("UPDATE products SET ... WHERE id = %s", product_id)
    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    r.set(f"myapp:cache:product:{product_id}", json.dumps(product), ex=3600)
    # Problem: Race condition between UPDATE and SELECT
    # Another process may UPDATE between our two queries, caching stale data

Cache Stampede Prevention

When a popular cache key expires, many concurrent requests may hit the database simultaneously.

# CORRECT: Lock-based stampede prevention
def get_with_lock(key: str, ttl: int, lock_ttl: int = 10) -> dict:
    """Cache-aside with lock-based stampede prevention."""
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)

    lock_key = f"myapp:lock:cache:{key}"
    acquired = r.set(lock_key, "1", nx=True, ex=lock_ttl)

    if acquired:
        try:
            value = expensive_database_query()
            r.set(key, json.dumps(value), ex=ttl)
            return value
        finally:
            r.unlink(lock_key)
    else:
        # Wait briefly and retry from cache
        time.sleep(0.1)
        cached = r.get(key)
        if cached is not None:
            return json.loads(cached)
        # Fallback: query database (lock holder may have failed)
        return expensive_database_query()
# CORRECT: Probabilistic early recomputation (PER)
def get_with_per(key: str, ttl: int, beta: float = 1.0) -> dict:
    """Cache with early recomputation before expiry."""
    cached = r.get(key)
    remaining = r.ttl(key)

    if cached is not None and remaining > 0:
        # As TTL approaches 0, probability of recomputation increases
        delta = ttl - remaining
        if delta > 0 and delta < beta * ttl * (-math.log(random.random())):
            return json.loads(cached)  # Not yet time to recompute

    value = expensive_database_query()
    r.set(key, json.dumps(value), ex=ttl)
    return value
# WRONG: No stampede protection
def get_stampede_vulnerable(key: str) -> dict:
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    # 1000 concurrent requests all arrive here at once
    result = expensive_database_query()  # DB hit 1000 times
    r.set(key, json.dumps(result), ex=3600)
    return result

Write-Through Pattern

Every write goes to both cache and database synchronously.

# CORRECT: Write-through with database-first ordering
def create_user(user_data: dict) -> dict:
    # Step 1: Write to database first (source of truth)
    user = db.execute(
        "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING *",
        user_data['name'], user_data['email']
    )

    # Step 2: Write to cache after successful DB write
    cache_key = f"myapp:user:{user['id']}:profile"
    r.hset(cache_key, mapping={
        'id': str(user['id']),
        'name': user['name'],
        'email': user['email'],
    })
    r.expire(cache_key, 3600)
    return user
# WRONG: Write to cache before database
def create_user_bad(user_data: dict) -> dict:
    r.hset(f"myapp:user:new:profile", mapping=user_data)
    user = db.execute("INSERT INTO users ...", user_data)  # May fail!
    # Problem: Cache has phantom data if DB write fails
    return user

Write-Behind (Write-Back) Pattern

Writes go to cache immediately and are asynchronously persisted to the database.

# CORRECT: Write-behind with stream-based persistence queue
def update_inventory(product_id: int, quantity: int) -> None:
    # Step 1: Update cache immediately (fast path)
    r.hset(f"myapp:inventory:{product_id}", 'quantity', quantity)
    r.expire(f"myapp:inventory:{product_id}", 86400)

    # Step 2: Queue async write to database
    r.xadd('myapp:stream:db_writes', {
        'table': 'inventory',
        'product_id': str(product_id),
        'quantity': str(quantity),
        'timestamp': str(int(time.time())),
    }, maxlen=100000)

# Background worker persists writes
def process_db_writes():
    while True:
        entries = r.xreadgroup(
            'db_writers', 'worker-1',
            {'myapp:stream:db_writes': '>'},
            count=100, block=5000
        )
        for stream, messages in entries:
            for msg_id, data in messages:
                try:
                    db.execute("UPDATE inventory SET quantity = %s WHERE product_id = %s",
                               data['quantity'], data['product_id'])
                    r.xack('myapp:stream:db_writes', 'db_writers', msg_id)
                except Exception as e:
                    log.error(f"Write-behind persistence failed: {e}")
# WRONG: Write-behind without persistence queue
def update_inventory_bad(product_id: int, quantity: int) -> None:
    r.hset(f"myapp:inventory:{product_id}", 'quantity', quantity)
    # Problem: No persistence mechanism, data lost on Redis restart

Fixed Window Rate Limiter

Counts requests in fixed time windows. Simple but allows burst at window boundaries.

-- CORRECT: Atomic fixed window rate limiter (Lua script)
-- KEYS[1] = rate limit key
-- ARGV[1] = limit, ARGV[2] = window in seconds
local current = redis.call('INCR', KEYS[1])
if current == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[2])
end
if current > tonumber(ARGV[1]) then
    return 0  -- Rate limited
end
return 1  -- Allowed
# CORRECT: Python wrapper
def check_rate_limit_fixed(client_id: str, limit: int, window: int) -> bool:
    window_num = int(time.time() // window)
    key = f"myapp:rate:{client_id}:{window_num}"
    pipe = r.pipeline()
    pipe.incr(key)
    pipe.expire(key, window + 1)
    results = pipe.execute()
    return results[0] <= limit
# WRONG: Non-atomic rate limiter
def check_rate_bad(client_id: str, limit: int) -> bool:
    key = f"myapp:rate:{client_id}"
    current = r.get(key)           # Read
    if current and int(current) >= limit:
        return False
    r.incr(key)                     # Write
    return True
    # Problem: Race condition between read and write

Sliding Window Rate Limiter

Uses a sorted set to track individual request timestamps. Accurate without boundary burst.

# CORRECT: Sliding window with sorted set
def check_rate_limit_sliding(client_id: str, limit: int, window: int) -> bool:
    key = f"myapp:rate:sliding:{client_id}"
    now = time.time()
    window_start = now - window

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # Remove expired
    pipe.zcard(key)                               # Count remaining
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex[:8]}": now})  # Add current
    pipe.expire(key, window + 1)                  # Set TTL
    results = pipe.execute()

    current_count = results[1]
    if current_count >= limit:
        r.zrem(key, f"{now}:{uuid.uuid4().hex[:8]}")  # Remove the one we just added
        return False
    return True
# WRONG: Sliding window without cleanup
def check_rate_bad_sliding(client_id: str, limit: int) -> bool:
    key = f"myapp:rate:sliding:{client_id}"
    r.zadd(key, {str(time.time()): time.time()})
    count = r.zcard(key)  # Counts ALL entries including expired
    return count <= limit
    # Problem: Never removes old entries, sorted set grows forever

Token Bucket Rate Limiter

Allows controlled burst traffic up to bucket capacity while maintaining a steady average rate.

-- CORRECT: Token bucket (Lua script for atomicity)
-- KEYS[1] = bucket key
-- ARGV[1] = capacity, ARGV[2] = refill rate, ARGV[3] = now, ARGV[4] = tokens requested
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now

local elapsed = now - last_refill
tokens = math.min(capacity, tokens + elapsed * refill_rate)

if tokens >= requested then
    tokens = tokens - requested
    redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
    redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
    return 1  -- Allowed
else
    redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
    redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
    return 0  -- Rate limited
end
Rate limiter comparison:
  Fixed window:    Simple, low memory, boundary burst possible
  Sliding window:  Accurate, higher memory (sorted set per client), no burst
  Token bucket:    Controlled burst, low memory, most flexible
  Recommendation:  Token bucket for API gateways, fixed window for per-user limits

Distributed Lock with SET NX EX

Single-Instance Lock

# CORRECT: Lock with owner verification and atomic release
class RedisLock:
    def __init__(self, redis_client, name: str, ttl: int = 30):
        self.redis = redis_client
        self.name = f"myapp:lock:{name}"
        self.ttl = ttl
        self.owner = str(uuid.uuid4())

    def acquire(self, timeout: int = 10) -> bool:
        deadline = time.time() + timeout
        while time.time() < deadline:
            if self.redis.set(self.name, self.owner, nx=True, ex=self.ttl):
                return True
            time.sleep(0.1)
        return False

    def release(self) -> bool:
        # Atomic: verify owner then delete
        script = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        end
        return 0
        """
        return self.redis.eval(script, 1, self.name, self.owner) == 1

    def extend(self, additional_ttl: int = None) -> bool:
        ttl = additional_ttl or self.ttl
        script = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('EXPIRE', KEYS[1], ARGV[2])
        end
        return 0
        """
        return self.redis.eval(script, 1, self.name, self.owner, ttl) == 1

# Usage
lock = RedisLock(r, "order:5001:process")
if lock.acquire():
    try:
        process_order(5001)
    finally:
        lock.release()
# WRONG: Lock without owner verification on release
def release_bad(name: str):
    r.delete(f"lock:{name}")
    # Problem: Any process can release any lock
    # Process A acquires, times out, Process B acquires, Process A releases B's lock

# WRONG: Lock without TTL
def acquire_bad(name: str) -> bool:
    return r.setnx(f"lock:{name}", "1")
    # Problem: If process crashes, lock held forever (deadlock)

# WRONG: Non-atomic check-and-delete
def release_race(name: str, owner: str):
    if r.get(f"lock:{name}") == owner:
        r.delete(f"lock:{name}")
    # Problem: Race between GET and DELETE

Redlock Algorithm

Distributed lock across multiple independent Redis instances for higher fault tolerance.

Redlock steps:
1. Record current time
2. Try to acquire lock on N instances (minimum 3, recommended 5)
3. Lock is acquired if majority (N/2 + 1) succeed AND elapsed time < TTL
4. Effective TTL = original TTL - elapsed time - clock drift
5. If failed: release lock on all instances

Requirements:
- N independent Redis instances (not replicas)
- Clock drift factor: 1% of TTL
- Retry with random delay to prevent split-brain

Fencing Tokens

Prevent stale lock holders from performing writes after expiry.

# CORRECT: Fencing token with monotonic counter
class FencedLock:
    def __init__(self, redis_client, name: str, ttl: int = 30):
        self.redis = redis_client
        self.name = f"myapp:lock:{name}"
        self.fence_key = f"myapp:fence:{name}"
        self.ttl = ttl
        self.owner = str(uuid.uuid4())

    def acquire(self) -> int | None:
        if self.redis.set(self.name, self.owner, nx=True, ex=self.ttl):
            return self.redis.incr(self.fence_key)
        return None

# Storage validates: reject writes with fence_token <= last_seen_token
Fencing flow:
1. Client A acquires lock, gets fence_token=33
2. Client A starts processing (slow), lock expires
3. Client B acquires lock, gets fence_token=34
4. Client B writes with token=34 -> Accepted
5. Client A writes with token=33 -> Rejected (33 < 34)

Pub/Sub vs Streams

Use Pub/Sub when:
  - Fire-and-forget messaging is acceptable
  - All subscribers must receive all messages (fan-out)
  - No message persistence needed
  - Cache invalidation, config updates, real-time notifications

Use Streams when:
  - Messages must be delivered reliably (at-least-once)
  - Consumer groups for load-balanced processing
  - Message acknowledgment and retry needed
  - Consumers may disconnect and resume later
  - Event sourcing, job queues, audit logs
# CORRECT: Pub/Sub for cache invalidation (fire-and-forget)
def invalidate_cache(entity_type: str, entity_id: int):
    r.publish(f"myapp:invalidate:{entity_type}", json.dumps({'id': entity_id}))

# WRONG: Pub/Sub for critical message delivery
r.publish("myapp:critical:payment", json.dumps({'order_id': 5001}))
# Problem: If no subscriber connected, message is lost forever
# CORRECT: Streams for reliable processing
processor = OrderEventProcessor(r, "processors", "worker-1")
processor.publish({'type': 'order_created', 'order_id': '5001'})

# Consumer loop with acknowledgment
for stream, messages in processor.consume():
    for msg_id, data in messages:
        try:
            handle_event(data)
            processor.ack(msg_id)
        except Exception:
            pass  # Stays in pending list for retry/claim

Job Queue Patterns

Stream-Based Queue

# CORRECT: Job queue with streams, consumer groups, and dead letter
class StreamQueue:
    def __init__(self, redis_client, name: str, group: str, consumer: str):
        self.redis = redis_client
        self.stream = f"myapp:queue:stream:{name}"
        self.group = group
        self.consumer = consumer
        try:
            self.redis.xgroup_create(self.stream, self.group, id='0', mkstream=True)
        except redis.ResponseError as e:
            if 'BUSYGROUP' not in str(e):
                raise

    def enqueue(self, job_type: str, data: dict) -> str:
        return self.redis.xadd(self.stream, {
            'type': job_type,
            'data': json.dumps(data),
        }, maxlen=500000)

    def dequeue(self, count: int = 1, block_ms: int = 5000) -> list:
        return self.redis.xreadgroup(
            self.group, self.consumer,
            {self.stream: '>'}, count=count, block=block_ms
        ) or []

    def ack(self, msg_id: str):
        self.redis.xack(self.stream, self.group, msg_id)

    def claim_stale(self, min_idle_ms: int = 60000):
        return self.redis.xautoclaim(
            self.stream, self.group, self.consumer,
            min_idle_time=min_idle_ms, start_id='0-0', count=10
        )

Priority Queue with Sorted Set

# CORRECT: Priority queue using sorted set
class PriorityQueue:
    def __init__(self, redis_client, name: str):
        self.redis = redis_client
        self.key = f"myapp:queue:priority:{name}"

    def enqueue(self, job_id: str, priority: int = 0):
        self.redis.zadd(self.key, {job_id: priority})

    def dequeue(self) -> str | None:
        result = self.redis.zpopmin(self.key)
        return result[0][0] if result else None

Session Storage Pattern

# CORRECT: Hash-based session with sliding expiry
class SessionStore:
    def __init__(self, redis_client, ttl: int = 1800):
        self.redis = redis_client
        self.prefix = "myapp:session"
        self.ttl = ttl

    def create(self, user_id: int, metadata: dict = None) -> str:
        import secrets
        token = secrets.token_urlsafe(32)
        key = f"{self.prefix}:{token}"
        pipe = self.redis.pipeline()
        pipe.hset(key, mapping={
            'user_id': str(user_id),
            'created_at': str(int(time.time())),
            'ip': metadata.get('ip', '') if metadata else '',
        })
        pipe.expire(key, self.ttl)
        pipe.sadd(f"{self.prefix}:user:{user_id}:active", token)
        pipe.execute()
        return token

    def get(self, token: str) -> dict | None:
        key = f"{self.prefix}:{token}"
        session = self.redis.hgetall(key)
        if not session:
            return None
        # Sliding expiry: renew TTL on access
        self.redis.expire(key, self.ttl)
        return session

    def destroy(self, token: str):
        key = f"{self.prefix}:{token}"
        session = self.redis.hgetall(key)
        if session:
            pipe = self.redis.pipeline()
            pipe.unlink(key)
            if 'user_id' in session:
                pipe.srem(f"{self.prefix}:user:{session['user_id']}:active", token)
            pipe.execute()
# WRONG: Session as JSON string without sliding expiry
def create_session_bad(user_id: int) -> str:
    token = secrets.token_urlsafe(32)
    r.set(f"session:{token}", json.dumps({'user_id': user_id}))
    # Problem: No TTL, no sliding expiry, full deserialize for any read
    return token

Leaderboard Pattern

# CORRECT: Real-time leaderboard with sorted sets
class Leaderboard:
    def __init__(self, redis_client, name: str):
        self.redis = redis_client
        self.key = f"myapp:leaderboard:{name}"

    def add_score(self, player: str, score: float):
        self.redis.zadd(self.key, {player: score})

    def increment(self, player: str, delta: float) -> float:
        return self.redis.zincrby(self.key, delta, player)

    def rank(self, player: str) -> int | None:
        return self.redis.zrevrank(self.key, player)

    def top(self, count: int = 10) -> list:
        return self.redis.zrevrange(self.key, 0, count - 1, withscores=True)

    def around(self, player: str, count: int = 5) -> list:
        rank = self.redis.zrevrank(self.key, player)
        if rank is None:
            return []
        return self.redis.zrevrange(
            self.key, max(0, rank - count), rank + count, withscores=True
        )
# WRONG: String keys per player
def add_score_bad(player: str, score: float):
    r.set(f"score:{player}", score)
    # Problem: No sorting, ranking requires scanning all keys

Circuit Breaker Pattern

# CORRECT: Circuit breaker with Redis state
class CircuitBreaker:
    CLOSED, OPEN, HALF_OPEN = "closed", "open", "half_open"

    def __init__(self, redis_client, service: str,
                 failure_threshold: int = 5, recovery_timeout: int = 30):
        self.redis = redis_client
        self.prefix = f"myapp:circuit:{service}"
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout

    def can_execute(self) -> bool:
        state = self.redis.get(f"{self.prefix}:state") or self.CLOSED
        if state == self.CLOSED:
            return True
        if state == self.OPEN:
            opened_at = float(self.redis.get(f"{self.prefix}:opened_at") or 0)
            if time.time() - opened_at >= self.recovery_timeout:
                self.redis.set(f"{self.prefix}:state", self.HALF_OPEN)
                return True
            return False
        return True  # HALF_OPEN allows testing

    def record_success(self):
        state = self.redis.get(f"{self.prefix}:state") or self.CLOSED
        if state == self.HALF_OPEN:
            self.redis.set(f"{self.prefix}:state", self.CLOSED)
            self.redis.delete(f"{self.prefix}:failures")

    def record_failure(self):
        state = self.redis.get(f"{self.prefix}:state") or self.CLOSED
        if state == self.HALF_OPEN:
            self.redis.set(f"{self.prefix}:state", self.OPEN)
            self.redis.set(f"{self.prefix}:opened_at", time.time())
        elif state == self.CLOSED:
            failures = self.redis.incr(f"{self.prefix}:failures")
            self.redis.expire(f"{self.prefix}:failures", 60)
            if failures >= self.failure_threshold:
                self.redis.set(f"{self.prefix}:state", self.OPEN)
                self.redis.set(f"{self.prefix}:opened_at", time.time())

Anti-Pattern Summary

Pattern             | Anti-Pattern                        | Fix
--------------------|-------------------------------------|-----------------------------------
Cache-aside         | No TTL on cache keys                | Always SET ... EX
Cache-aside         | Update cache on write               | Invalidate (UNLINK), let read repop
Cache-aside         | No stampede protection              | Lock or PER-based repopulation
Write-through       | Cache before database               | Database first, then cache
Write-behind        | No persistence queue                | Use Streams for async DB writes
Rate limiting       | Non-atomic read + increment         | Lua script or pipeline
Rate limiting       | No TTL on rate keys                 | EXPIRE on every counter
Locks               | No owner verification on release    | Lua: compare owner then DEL
Locks               | No TTL on lock                      | Always SET NX EX
Locks               | Non-atomic release                  | Lua for atomic check + delete
Sessions            | No sliding expiry                   | EXPIRE on every access
Sessions            | JSON string instead of hash         | HSET for partial field updates
Queues              | No dead letter handling             | DLQ after max retries
Queues              | No acknowledgment                   | XACK with consumer groups
Leaderboards        | String keys per player              | Single sorted set
Pub/Sub             | Reliable delivery expectation       | Use Streams instead

Bloom Filter Pattern

Use bloom filters for probabilistic membership testing where false negatives are impossible and false positives are acceptable at a known rate.

# CORRECT: Bloom filter for duplicate detection (requires Redis Bloom module)
class BloomFilter:
    def __init__(self, redis_client, name: str):
        self.redis = redis_client
        self.key = f"myapp:bloom:{name}"

    def create(self, error_rate: float = 0.01, capacity: int = 1000000):
        try:
            self.redis.execute_command('BF.RESERVE', self.key, error_rate, capacity)
        except redis.ResponseError:
            pass  # Filter already exists

    def add(self, item: str) -> bool:
        return bool(self.redis.execute_command('BF.ADD', self.key, item))

    def exists(self, item: str) -> bool:
        return bool(self.redis.execute_command('BF.EXISTS', self.key, item))

# Usage: Prevent duplicate email sends
bloom = BloomFilter(r, "sent_emails")
bloom.create(error_rate=0.001, capacity=10000000)

def should_send_email(email_id: str) -> bool:
    if bloom.exists(email_id):
        return False  # Probably already sent
    bloom.add(email_id)
    return True
Bloom filter properties:
- False negatives: IMPOSSIBLE (if bloom says "not exists", it definitely does not)
- False positives: Possible at configured rate (e.g., 0.1% with 0.001 error rate)
- Memory: ~1.2 MB per 1M items at 1% error rate, ~1.8 MB at 0.1%
- Use cases: Email dedup, URL filtering, username availability pre-check
# WRONG: Set for massive duplicate detection
def check_sent_bad(email_id: str) -> bool:
    return r.sismember("myapp:sent_emails", email_id)
    # Problem: Set with 10M members uses 400MB+; bloom filter uses ~12MB

HyperLogLog Counting Pattern

Use HyperLogLog for approximate cardinality estimation with constant 12 KB memory per key.

# CORRECT: Unique visitor counting with daily rollup
class UniqueCounter:
    def __init__(self, redis_client, prefix: str = "myapp:visitors"):
        self.redis = redis_client
        self.prefix = prefix

    def track(self, visitor_id: str, date: str = None):
        if date is None:
            date = time.strftime('%Y-%m-%d')
        self.redis.pfadd(f"{self.prefix}:daily:{date}", visitor_id)
        self.redis.expire(f"{self.prefix}:daily:{date}", 86400 * 90)

    def daily_count(self, date: str) -> int:
        return self.redis.pfcount(f"{self.prefix}:daily:{date}")

    def weekly_count(self, start_date: str) -> int:
        from datetime import datetime, timedelta
        start = datetime.strptime(start_date, '%Y-%m-%d')
        keys = [
            f"{self.prefix}:daily:{(start + timedelta(days=i)).strftime('%Y-%m-%d')}"
            for i in range(7)
        ]
        weekly_key = f"{self.prefix}:weekly:{start_date}"
        self.redis.pfmerge(weekly_key, *keys)
        self.redis.expire(weekly_key, 86400 * 7)
        return self.redis.pfcount(weekly_key)

# Usage
counter = UniqueCounter(r)
counter.track("user:1001")
counter.track("user:1002")
counter.track("user:1001")  # Duplicate, not counted
daily = counter.daily_count("2024-01-31")  # ~2 (0.81% standard error)
# WRONG: Set for counting millions of unique items
def track_visitor_bad(visitor_id: str):
    r.sadd("myapp:all_visitors", visitor_id)
    # Problem: 10M members use 400MB+ of memory
    # HyperLogLog uses 12KB regardless of cardinality

Geospatial Query Pattern

# CORRECT: Location-based search
class GeoService:
    def __init__(self, redis_client, category: str):
        self.redis = redis_client
        self.key = f"myapp:geo:{category}"

    def add(self, name: str, lon: float, lat: float):
        self.redis.geoadd(self.key, (lon, lat, name))

    def nearby(self, lon: float, lat: float, radius: float,
               unit: str = 'km', count: int = 20) -> list:
        return self.redis.geosearch(
            self.key, longitude=lon, latitude=lat,
            radius=radius, unit=unit, sort='ASC', count=count,
            withcoord=True, withdist=True)

    def distance(self, a: str, b: str, unit: str = 'km') -> float:
        return self.redis.geodist(self.key, a, b, unit=unit)

# Usage
geo = GeoService(r, "restaurants")
geo.add("joes-pizza", -73.9857, 40.7484)
nearby = geo.nearby(-73.9830, 40.7500, 1, 'km')
-- WRONG: Storing coordinates in separate string keys
SET myapp:store:nyc:lat "40.748817"
SET myapp:store:nyc:lon "-73.985428"
-- Problem: Cannot do radius queries, requires application-level distance math

Cache Invalidation Strategies

Event-Driven Invalidation via Pub/Sub

# CORRECT: Invalidate cache across all app instances
def on_product_update(product_id: int):
    """Called after database update."""
    r.unlink(f"myapp:cache:product:{product_id}")
    r.publish("myapp:invalidate:product", json.dumps({'id': product_id}))

# Listener in each app instance
def invalidation_listener():
    pubsub = r.pubsub()
    pubsub.subscribe("myapp:invalidate:product")
    for msg in pubsub.listen():
        if msg['type'] == 'message':
            data = json.loads(msg['data'])
            local_cache.delete(f"product:{data['id']}")

Tag-Based Invalidation

# CORRECT: Tag-based invalidation for related cache entries
def cache_with_tags(key: str, value: str, tags: list, ttl: int = 3600):
    pipe = r.pipeline()
    pipe.set(key, value, ex=ttl)
    for tag in tags:
        pipe.sadd(f"myapp:tag:{tag}", key)
        pipe.expire(f"myapp:tag:{tag}", ttl + 60)
    pipe.execute()

def invalidate_by_tag(tag: str):
    keys = r.smembers(f"myapp:tag:{tag}")
    if keys:
        r.unlink(*keys)
    r.unlink(f"myapp:tag:{tag}")

# Usage
cache_with_tags("myapp:cache:product:5001", data,
                tags=["category:electronics", "brand:acme"])
invalidate_by_tag("category:electronics")  # Clears all electronics cache

Pattern Selection Guide

Need                          | Pattern                    | Key Redis Feature
------------------------------|----------------------------|----------------------
Fast reads, occasional writes | Cache-aside                | String GET/SET EX
Always-fresh cache            | Write-through              | HSET + EXPIRE
Low write latency             | Write-behind               | XADD (async queue)
API rate limiting             | Token bucket               | HMSET + Lua script
Per-user quotas               | Fixed window               | INCR + EXPIRE
Strict rate compliance        | Sliding window             | Sorted Set + ZCARD
Mutual exclusion              | SET NX EX lock             | SET NX EX + Lua release
HA mutual exclusion           | Redlock                    | Multi-instance SET NX
User sessions                 | Hash + sliding expiry      | HSET + EXPIRE on access
Real-time rankings            | Sorted Set leaderboard     | ZADD + ZREVRANGE
Reliable messaging            | Streams + consumer groups  | XREADGROUP + XACK
Fire-and-forget notifications | Pub/Sub                    | PUBLISH + SUBSCRIBE
Background jobs               | Stream queue               | XADD + XREADGROUP
Priority jobs                 | Sorted Set + List          | ZADD + ZPOPMIN
Deduplication at scale        | Bloom filter               | BF.ADD + BF.EXISTS
Unique counting               | HyperLogLog                | PFADD + PFCOUNT
Location search               | Geospatial                 | GEOADD + GEOSEARCH
Service resilience            | Circuit breaker            | Hash state machine
Stats
Stars0
Forks0
Last CommitFeb 10, 2026
Actions

Similar Skills