From harness-claude
Sets up message queues for reliable at-least-once async delivery with competing consumers and dead letter queues using BullMQ (Redis/Node.js) and RabbitMQ.
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claudeThis skill uses the workspace's default tool permissions.
> Use message queues for reliable async delivery with competing consumers and dead letter queues.
Guides message queue and job processing setup with Kafka, RabbitMQ, SQS, BullMQ, Celery, Sidekiq. Covers architecture, retries, DLQs, idempotency, priorities, backpressure, and scaling.
Provides BullMQ expertise for Redis-backed job queues, background processing, delayed/repeatable jobs, rate limiting, and worker patterns in Node.js/TypeScript apps.
Provides BullMQ expertise for Redis-backed job queues, background processing, scheduling, workers, and reliable async execution in Node.js/TypeScript apps.
Share bugs, ideas, or general feedback.
Use message queues for reliable async delivery with competing consumers and dead letter queues.
BullMQ (Redis-backed queue — recommended for Node.js):
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
const connection = new Redis({ host: 'localhost', port: 6379 });
// Define job data types
interface SendEmailJobData {
to: string;
subject: string;
body: string;
templateId?: string;
}
// Producer — enqueue jobs
const emailQueue = new Queue<SendEmailJobData>('email', { connection });
async function scheduleWelcomeEmail(userId: string, email: string): Promise<void> {
await emailQueue.add(
'welcome',
{ to: email, subject: 'Welcome!', body: 'Thanks for joining.', templateId: 'welcome-v2' },
{
delay: 5_000, // wait 5s before processing
attempts: 3, // retry up to 3 times
backoff: { type: 'exponential', delay: 2_000 }, // 2s, 4s, 8s
removeOnComplete: { count: 100 }, // keep last 100 completed jobs
removeOnFail: { count: 50 }, // keep last 50 failed jobs
}
);
}
// Consumer — process jobs
const emailWorker = new Worker<SendEmailJobData>(
'email',
async (job: Job<SendEmailJobData>) => {
console.log(`Processing job ${job.id}: send email to ${job.data.to}`);
await sendEmail(job.data.to, job.data.subject, job.data.body);
// Returning from the processor marks the job complete
},
{
connection,
concurrency: 5, // process 5 jobs in parallel
}
);
emailWorker.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
emailWorker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed after all retries:`, err.message);
// At this point, job moves to the failed set (acts as DLQ)
});
RabbitMQ with amqplib:
import amqp from 'amqplib';
const QUEUE = 'order.processing';
const DLQ = 'order.processing.dlq';
async function setupQueue(): Promise<void> {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel();
// Dead letter queue
await channel.assertQueue(DLQ, { durable: true });
// Main queue with DLQ routing
await channel.assertQueue(QUEUE, {
durable: true,
arguments: {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': DLQ,
'x-message-ttl': 3_600_000, // messages expire after 1h
},
});
channel.prefetch(1); // process one message at a time per consumer
return channel;
}
// Producer
async function publishOrder(orderId: string, amount: number): Promise<void> {
const channel = await setupQueue();
const message = JSON.stringify({ orderId, amount, timestamp: new Date() });
channel.sendToQueue(QUEUE, Buffer.from(message), { persistent: true });
}
// Consumer
async function startConsumer(): Promise<void> {
const channel = await setupQueue();
await channel.consume(QUEUE, async (msg) => {
if (!msg) return;
try {
const data = JSON.parse(msg.content.toString());
await processOrder(data.orderId, data.amount);
channel.ack(msg); // acknowledge on success
} catch (err) {
console.error('Processing failed:', err);
// nack with requeue=false — sends to DLQ after retries exhausted
channel.nack(msg, false, false);
}
});
}
Competing consumers pattern:
// Run multiple workers — each message processed by exactly ONE worker
// Scale by adding more worker instances
// Worker 1 (process 1)
const worker1 = new Worker('orders', processOrder, { connection, concurrency: 3 });
// Worker 2 (process 2 or separate machine)
const worker2 = new Worker('orders', processOrder, { connection, concurrency: 3 });
// BullMQ guarantees only one worker processes each job
Delivery guarantees:
Dead letter queue strategy:
Anti-patterns:
Monitoring queues:
// BullMQ — check queue health
const counts = await emailQueue.getJobCounts('waiting', 'active', 'completed', 'failed', 'delayed');
console.log(counts);
// Expose as /metrics endpoint for Prometheus
microservices.io/patterns/communication-style/messaging.html