From harness-claude
Implements in-process TypeScript pub/sub with typed topics, wildcards, pattern matching, and fan-out delivery for decoupling publishers/subscribers. Use for domain events or notifications without guaranteed delivery.
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claudeThis skill uses the workspace's default tool permissions.
> Implement publisher-subscriber communication with topic-based routing and fan-out delivery.
Guides use of Node.js EventEmitter for typed pub-sub communication, memory leak prevention, and event-driven patterns in decoupled modules and TypeScript apps.
Builds event-driven APIs with webhooks, Server-Sent Events, message brokers, event schemas, subscribers, retries, and dead-letter queues.
Guides implementation of event-driven architecture using domain events, event sourcing, CQRS, outbox patterns, and message brokers for decoupled, reliable systems.
Share bugs, ideas, or general feedback.
Implement publisher-subscriber communication with topic-based routing and fan-out delivery.
In-process typed pub/sub:
type Listener<T> = (payload: T) => void | Promise<void>;
class PubSub {
private topics = new Map<string, Set<Listener<unknown>>>();
subscribe<T>(topic: string, listener: Listener<T>): () => void {
if (!this.topics.has(topic)) this.topics.set(topic, new Set());
this.topics.get(topic)!.add(listener as Listener<unknown>);
return () => this.topics.get(topic)?.delete(listener as Listener<unknown>);
}
async publish<T>(topic: string, payload: T): Promise<void> {
const listeners = this.topics.get(topic) ?? new Set();
await Promise.all([...listeners].map((l) => l(payload)));
}
}
// Define topics with types
interface AppTopics {
'user.created': { userId: string; email: string; createdAt: Date };
'order.placed': { orderId: string; userId: string; total: number };
'payment.processed': { orderId: string; status: 'success' | 'failed' };
}
// Type-safe wrapper
class TypedPubSub<Topics extends Record<string, unknown>> {
private bus = new PubSub();
subscribe<K extends keyof Topics & string>(topic: K, listener: Listener<Topics[K]>): () => void {
return this.bus.subscribe(topic, listener);
}
async publish<K extends keyof Topics & string>(topic: K, payload: Topics[K]): Promise<void> {
await this.bus.publish(topic, payload);
}
}
const pubsub = new TypedPubSub<AppTopics>();
// Subscribers register independently
pubsub.subscribe('user.created', async ({ userId, email }) => {
await sendWelcomeEmail(email);
});
pubsub.subscribe('user.created', async ({ userId }) => {
await createDefaultProfile(userId);
});
pubsub.subscribe('order.placed', async ({ orderId, total }) => {
await reserveInventory(orderId);
await chargeTax(total);
});
// Publisher doesn't know about subscribers
await pubsub.publish('user.created', {
userId: 'u-123',
email: 'alice@example.com',
createdAt: new Date(),
});
Topic wildcards and pattern matching:
class WildcardPubSub {
private subscriptions: { pattern: RegExp; listener: Listener<unknown> }[] = [];
subscribe<T>(pattern: string, listener: Listener<T>): () => void {
// Convert glob to regex: 'order.*' → /^order\..+$/
const regex = new RegExp('^' + pattern.replace(/\./g, '\\.').replace(/\*/g, '.+') + '$');
const entry = { pattern: regex, listener: listener as Listener<unknown> };
this.subscriptions.push(entry);
return () => {
this.subscriptions = this.subscriptions.filter((s) => s !== entry);
};
}
async publish<T>(topic: string, payload: T): Promise<void> {
const matching = this.subscriptions.filter((s) => s.pattern.test(topic));
await Promise.all(matching.map((s) => s.listener(payload)));
}
}
const bus = new WildcardPubSub();
bus.subscribe('order.*', (payload) => console.log('Any order event:', payload));
bus.subscribe('*.created', (payload) => console.log('Any created event:', payload));
await bus.publish('order.created', { orderId: '123' }); // matches both
Pub/Sub vs. Message Queue:
| Pub/Sub | Message Queue | |
|---|---|---|
| Delivery | At-most-once (fire & forget) | At-least-once (persisted) |
| Consumers | Many (fan-out) | Competing (one wins) |
| History | No replay | Replay/DLQ |
| Use case | Real-time notifications | Reliable task processing |
Use pub/sub for notifications and fan-out. Use a queue when you need guaranteed processing.
At-most-once delivery: If a subscriber crashes or throws, the event is lost. For critical events (payments, order state changes), use a message queue or transactional outbox instead.
Anti-patterns:
Error isolation:
async publish<T>(topic: string, payload: T): Promise<void> {
const listeners = this.topics.get(topic) ?? new Set();
const results = await Promise.allSettled([...listeners].map(l => l(payload)));
const failures = results.filter(r => r.status === 'rejected');
if (failures.length > 0) {
console.error(`${failures.length} subscriber(s) failed for topic: ${topic}`);
}
}
microservices.io/patterns/data/event-sourcing.html