Master Redis Pub/Sub - messaging patterns, Streams with consumer groups, event-driven architecture, and real-time data streaming
Master Redis Pub/Sub for fire-and-forget messaging and Streams for persistent, exactly-once processing with consumer groups. Build event-driven architectures with message replay, fault tolerance, and real-time data streaming.
/plugin marketplace add pluginagentmarketplace/custom-plugin-redis/plugin install pluginagentmarketplace-developer-roadmap-interactive@pluginagentmarketplace/custom-plugin-redissonnetProduction-grade agent for Redis messaging systems. Master Pub/Sub for fire-and-forget messaging and Streams for persistent, exactly-once message processing with consumer groups.
┌─────────────────────────────────────────────────────────────────────┐
│ MESSAGING DECISION TREE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Do you need message persistence? │
│ │ │
│ ├── NO ────────────────────────────────> PUB/SUB │
│ │ ├── Real-time notifications │
│ │ ├── Chat messages │
│ │ └── Live updates │
│ │ │
│ └── YES ───────────────────────────────> STREAMS │
│ │ │
│ ├── Need consumer groups? │
│ │ ├── YES ─────> STREAMS + XREADGROUP │
│ │ └── NO ──────> STREAMS + XREAD │
│ │ │
│ └── Need exactly-once processing? │
│ └── YES ─────> STREAMS + XACK + Idempotency │
│ │
└─────────────────────────────────────────────────────────────────────┘
| Feature | Pub/Sub | Streams |
|---|---|---|
| Message Persistence | ❌ | ✅ |
| Consumer Groups | ❌ | ✅ |
| Message Replay | ❌ | ✅ |
| At-least-once | ❌ | ✅ |
| Blocking reads | ❌ | ✅ |
| Pattern subscriptions | ✅ | ❌ |
| Cluster support | ⚠️ Limited | ✅ Full |
| Memory efficiency | ✅ Higher | Medium |
# Subscriber (run first)
redis-cli SUBSCRIBE channel:news
# Publisher
redis-cli PUBLISH channel:news "Breaking news!"
# Pattern subscription
redis-cli PSUBSCRIBE channel:*
redis-cli PSUBSCRIBE user:*:notifications
# Channel management
PUBSUB CHANNELS # List active channels
PUBSUB NUMSUB channel:news # Subscriber count
PUBSUB NUMPAT # Pattern subscription count
# Subscription
SUBSCRIBE ch1 ch2 ch3 # Multiple channels
UNSUBSCRIBE ch1 # Leave channel
PSUBSCRIBE news:* # Pattern match
PUNSUBSCRIBE news:* # Leave pattern
| Concern | Solution |
|---|---|
| No persistence | Use Streams if messages must survive restarts |
| Slow subscribers | Messages dropped if buffer fills |
| No ACK | Implement application-level confirmation |
| Cluster sharding | All nodes receive, app filters |
# Add entries (auto-generated ID)
XADD events * action "click" user_id "123" page "/home"
# Returns: 1704067200000-0
# Add with custom ID
XADD events 1704067200000-0 action "view" user_id "456"
# Read entries
XREAD COUNT 10 STREAMS events 0 # From beginning
XREAD COUNT 10 STREAMS events $ # Only new entries
XREAD BLOCK 5000 STREAMS events $ # Block 5 seconds for new
# Create consumer group
XGROUP CREATE events mygroup $ MKSTREAM
# Read as consumer (blocking)
XREADGROUP GROUP mygroup consumer1 \
COUNT 10 BLOCK 5000 STREAMS events >
# Acknowledge processed messages
XACK events mygroup 1704067200000-0
# Check pending (unacked) messages
XPENDING events mygroup
# Claim old pending messages (stale consumer recovery)
XCLAIM events mygroup consumer2 60000 1704067200000-0
# Takes messages idle > 60 seconds
# Auto-claim (Redis 6.2+)
XAUTOCLAIM events mygroup consumer2 60000 0-0 COUNT 10
┌─────────────────────────────────────────────────────────────────────┐
│ CONSUMER GROUP ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Producer 1 ──┐ │
│ Producer 2 ──┼──> XADD ──> [Stream: events] ──┬──> Consumer 1 │
│ Producer 3 ──┘ │ (mygroup) │
│ ├──> Consumer 2 │
│ │ (mygroup) │
│ └──> Consumer 3 │
│ (mygroup) │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Message Flow: │ │
│ │ 1. XADD → Stream │ │
│ │ 2. XREADGROUP → Consumer (exclusive delivery) │ │
│ │ 3. Process message │ │
│ │ 4. XACK → Remove from pending │ │
│ │ 5. On failure: XCLAIM by another consumer │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
| Command | Purpose | Complexity |
|---|---|---|
| XADD | Add entry | O(1) |
| XREAD | Read entries | O(N) |
| XREADGROUP | Consumer group read | O(M) |
| XACK | Acknowledge | O(1) |
| XPENDING | Check pending | O(N) |
| XCLAIM | Claim message | O(log N) |
| XAUTOCLAIM | Auto-claim stale | O(1) |
| XTRIM | Limit stream size | O(N) |
| XLEN | Stream length | O(1) |
| XINFO GROUPS | Group info | O(1) |
def process_stream():
while True:
# Read with consumer group
messages = r.xreadgroup(
'mygroup', 'consumer1',
{'events': '>'},
count=10, block=5000
)
for stream, entries in messages:
for msg_id, data in entries:
try:
# Idempotency check
if r.sismember('processed', msg_id):
r.xack('events', 'mygroup', msg_id)
continue
# Process message
process(data)
# Mark as processed (atomic)
pipe = r.pipeline()
pipe.sadd('processed', msg_id)
pipe.xack('events', 'mygroup', msg_id)
pipe.execute()
except Exception as e:
# Message stays pending, will be retried
log.error(f"Failed: {msg_id}", e)
# By max length (approximate)
XADD events MAXLEN ~ 10000 * field value
# By min ID (exact)
XTRIM events MINID = 1704067200000-0
# Capped + LIMIT (Redis 6.2+)
XTRIM events MAXLEN ~ 10000 LIMIT 100
redis-transactions - Atomic operationsredis-cluster - Distributed streamsNOGROUP No such key 'mystream' or consumer group 'mygroup'
Fix:
# Create stream + group
XGROUP CREATE mystream mygroup $ MKSTREAM
# Or create on existing stream
XGROUP CREATE mystream mygroup 0
Diagnosis:
# Check pending entries
XPENDING events mygroup
# Detailed pending info
XPENDING events mygroup - + 10 consumer1
Causes & Fixes:
| Cause | Fix |
|---|---|
| Consumer crashed | XCLAIM or XAUTOCLAIM |
| No XACK sent | Add XACK after processing |
| Consumer blocked | Check consumer health |
# Check stream length
XLEN events
# Memory used
MEMORY USAGE events
Fix:
# Cap at creation
XADD events MAXLEN ~ 100000 * field value
# Manual trim
XTRIM events MAXLEN ~ 100000
Causes:
Mitigation:
# Check subscriber count before publish
PUBSUB NUMSUB channel:critical
# If 0, queue in Stream instead
□ Consumer group exists?
□ Using correct stream/group names?
□ XACK sent after processing?
□ Handling pending messages on restart?
□ Stream size capped with MAXLEN?
□ Consumer claiming stale messages?
□ Idempotency for exactly-once?
| Log Pattern | Meaning | Action |
|---|---|---|
| Pending entries growing | Consumers failing | Check consumer health |
| Memory increasing | Unbounded stream | Add MAXLEN |
| Zero deliveries | No new messages | Normal if idle |
| High claim rate | Consumer failures | Investigate crashes |
| Code | Name | Description | Recovery |
|---|---|---|---|
| E301 | NOGROUP | Group doesn't exist | XGROUP CREATE |
| E302 | BUSYGROUP | Group already exists | Ignore or use different name |
| E303 | XREAD_TIMEOUT | No messages in timeout | Normal, retry |
| E304 | STREAM_FULL | Reached MAXLEN | Trim or increase limit |
| E305 | CONSUMER_STALE | Consumer not responding | XAUTOCLAIM |
| Throughput | Consumers | COUNT | BLOCK |
|---|---|---|---|
| Low (<1K/s) | 1-2 | 100 | 5000ms |
| Medium (<10K/s) | 3-5 | 500 | 2000ms |
| High (<100K/s) | 10+ | 1000 | 1000ms |
| Metric | Pub/Sub | Streams |
|---|---|---|
| Publish latency | ~0.1ms | ~0.2ms |
| Memory per message | 0 (transient) | ~100 bytes |
| Max throughput | 1M+/s | 500K/s |
| Consumer scaling | Broadcast | Partitioned |
You are an elite AI agent architect specializing in crafting high-performance agent configurations. Your expertise lies in translating user requirements into precisely-tuned agent specifications that maximize effectiveness and reliability.