From harness-claude
Implements Redis pub/sub channels, pattern subscriptions, and keyspace notifications for low-latency real-time messaging, cache invalidation, presence, and key events.
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claudeThis skill uses the workspace's default tool permissions.
> Use Redis pub/sub channels and keyspace notifications for lightweight real-time messaging.
Provides TypeScript patterns for Redis: cache-aside/invalidation, sliding window rate limiting, pub/sub, and consumer group streams for events.
Guides Redis system design: data structures for caching, queues, leaderboards, sessions; caching strategies, pub/sub, streams, clustering, memory optimization, Lua scripting.
Provides expert Redis guidance on caching strategies, pub/sub, data structures, rate limiting, distributed locks, sessions, leaderboards, and message queues. Grounds responses in patterns.md, sharp_edges.md, and validations.md references.
Share bugs, ideas, or general feedback.
Use Redis pub/sub channels and keyspace notifications for lightweight real-time messaging.
Basic pub/sub with ioredis:
import Redis from 'ioredis';
const publisher = new Redis({ host: 'localhost', port: 6379 });
const subscriber = new Redis({ host: 'localhost', port: 6379 });
// Note: subscriber connection CANNOT be used for other commands
// Subscribe to channels
await subscriber.subscribe('cache.invalidation', 'user.presence');
subscriber.on('message', (channel: string, message: string) => {
console.log(`[${channel}] ${message}`);
if (channel === 'cache.invalidation') {
const { key } = JSON.parse(message);
localCache.delete(key);
}
if (channel === 'user.presence') {
const { userId, status } = JSON.parse(message);
updatePresence(userId, status);
}
});
// Publish from anywhere (different connection)
async function invalidateCache(key: string): Promise<void> {
await publisher.publish('cache.invalidation', JSON.stringify({ key, timestamp: Date.now() }));
}
async function updateUserPresence(userId: string, status: 'online' | 'offline'): Promise<void> {
await publisher.publish('user.presence', JSON.stringify({ userId, status }));
}
Pattern subscriptions (wildcard channels):
// Subscribe to all channels matching a pattern
await subscriber.psubscribe('order.*');
subscriber.on('pmessage', (pattern: string, channel: string, message: string) => {
console.log(`Pattern: ${pattern}, Channel: ${channel}, Message: ${message}`);
// pattern = 'order.*', channel = 'order.created', message = '{"orderId":"..."}'
const eventType = channel.split('.')[1]; // 'created', 'shipped', etc.
handleOrderEvent(eventType, JSON.parse(message));
});
// Publish to specific channels — matched by pattern
await publisher.publish('order.created', JSON.stringify({ orderId: '123' }));
await publisher.publish('order.shipped', JSON.stringify({ orderId: '123', tracking: 'UPS123' }));
Keyspace notifications (react to Redis key events):
// Enable keyspace notifications in Redis config (or via command)
// notify-keyspace-events "Ex" — expired events
// notify-keyspace-events "KEx" — keyspace + expired events
const adminClient = new Redis({ host: 'localhost', port: 6379 });
await adminClient.config('SET', 'notify-keyspace-events', 'Ex'); // enable expired events
const notifSub = new Redis({ host: 'localhost', port: 6379 });
await notifSub.psubscribe('__keyevent@0__:expired'); // channel for db 0 key expirations
notifSub.on('pmessage', (_pattern, _channel, key) => {
console.log(`Key expired: ${key}`);
// e.g., session expired → log out user
if (key.startsWith('session:')) {
const userId = key.replace('session:', '');
handleSessionExpiry(userId);
}
});
// Set a key with expiry — triggers the notification when it expires
await adminClient.set('session:user-123', 'active', 'EX', 3600);
Typed channel wrapper:
type Channels = {
'cache.invalidation': { key: string; timestamp: number };
'user.presence': { userId: string; status: 'online' | 'offline' };
};
class TypedRedisChannel<C extends Record<string, unknown>> {
constructor(
private readonly pub: Redis,
private readonly sub: Redis
) {}
async publish<K extends keyof C & string>(channel: K, data: C[K]): Promise<void> {
await this.pub.publish(channel, JSON.stringify(data));
}
subscribe<K extends keyof C & string>(channel: K, handler: (data: C[K]) => void): () => void {
this.sub.subscribe(channel);
const listener = (ch: string, msg: string) => {
if (ch === channel) handler(JSON.parse(msg) as C[K]);
};
this.sub.on('message', listener);
return () => {
this.sub.unsubscribe(channel);
this.sub.off('message', listener);
};
}
}
const channels = new TypedRedisChannel<Channels>(publisher, subscriber);
channels.subscribe('cache.invalidation', ({ key }) => localCache.delete(key));
await channels.publish('cache.invalidation', { key: 'user:123', timestamp: Date.now() });
Redis pub/sub is at-most-once: If a subscriber is down when a message is published, the message is lost. For critical notifications, use a message queue or Redis Streams instead.
Redis Streams vs. pub/sub:
| Feature | Pub/Sub | Streams |
|---|---|---|
| Persistence | No | Yes |
| Consumer groups | No | Yes |
| Message history | No | Yes (by ID) |
| Delivery | At-most-once | At-least-once |
Use Streams when you need durability. Use pub/sub for lightweight real-time signals.
Anti-patterns:
connect eventReconnection handling:
subscriber.on('reconnecting', () => {
console.log('Reconnecting to Redis...');
});
subscriber.on('ready', async () => {
// Re-subscribe after reconnect
await subscriber.subscribe('cache.invalidation', 'user.presence');
});
redis.io/docs/latest/develop/interact/pubsub/