From harness-claude
Implements dead letter queues for repeatedly failing messages after retries, enabling safe inspection, alerting, and reprocessing in event-driven systems.
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claude
This skill uses the workspace's default tool permissions.
> Handle permanently failing messages with dead letter queues for safe inspection, alerting, and reprocessing
// queues/dead-letter.ts
import { randomUUID } from 'node:crypto';

interface DeadLetterEntry<T> {
  id: string;
  originalQueue: string;
  payload: T;
  error: string;
  attempts: number;
  firstFailedAt: string;
  lastFailedAt: string;
  metadata: Record<string, unknown>;
}

// In-memory DLQ: entries are lost on restart, so use a durable store in production.
export class DeadLetterQueue<T> {
  private entries: Map<string, DeadLetterEntry<T>> = new Map();

  constructor(
    private readonly name: string,
    private readonly onDeadLetter?: (entry: DeadLetterEntry<T>) => void
  ) {}

  // Stores a failed message, assigns its id and lastFailedAt, and fires the optional callback
  add(entry: Omit<DeadLetterEntry<T>, 'id' | 'lastFailedAt'>): void {
    const id = randomUUID();
    const deadLetter: DeadLetterEntry<T> = {
      ...entry,
      id,
      lastFailedAt: new Date().toISOString(),
    };
    this.entries.set(id, deadLetter);
    this.onDeadLetter?.(deadLetter);
    console.error(`[DLQ:${this.name}] Message dead-lettered: ${entry.error}`, {
      id,
      attempts: entry.attempts,
    });
  }

  list(): DeadLetterEntry<T>[] {
    return Array.from(this.entries.values());
  }

  get(id: string): DeadLetterEntry<T> | undefined {
    return this.entries.get(id);
  }

  remove(id: string): boolean {
    return this.entries.delete(id);
  }

  // Removes the entry and returns its payload so the caller can process it again
  reprocess(id: string): T | undefined {
    const entry = this.entries.get(id);
    if (entry) {
      this.entries.delete(id);
      return entry.payload;
    }
    return undefined;
  }

  get count(): number {
    return this.entries.size;
  }
}
// workers/order-processor.ts
import { DeadLetterQueue } from '../queues/dead-letter';

// `alerting` and `orderService` are application-specific dependencies assumed to be in scope.

interface OrderMessage {
  orderId: string;
  items: Array<{ productId: string; qty: number }>;
}

const dlq = new DeadLetterQueue<OrderMessage>('orders', (entry) => {
  // Alert on dead letter
  alerting.send({
    severity: 'warning',
    message: `Order processing failed: ${entry.error}`,
    context: { orderId: entry.payload.orderId, attempts: entry.attempts },
  });
});

const MAX_RETRIES = 3;

// Promise-based sleep used for the backoff between attempts
const delay = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms));

async function processMessage(
  message: OrderMessage,
  attempt = 1,
  firstFailedAt?: string
): Promise<void> {
  try {
    await orderService.process(message);
  } catch (error) {
    // Carry the time of the first failure through the retries so the DLQ entry
    // records when the message started failing, not when it was dead-lettered
    const failedSince = firstFailedAt ?? new Date().toISOString();
    if (attempt >= MAX_RETRIES) {
      dlq.add({
        originalQueue: 'orders',
        payload: message,
        error: error instanceof Error ? error.message : String(error),
        attempts: attempt,
        firstFailedAt: failedSince,
        metadata: { lastAttemptError: String(error) },
      });
      return; // Do not rethrow: the message is safely in the DLQ
    }
    // Retry with exponential backoff
    await delay(1000 * Math.pow(2, attempt));
    return processMessage(message, attempt + 1, failedSince);
  }
}
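The backoff expression in `processMessage` is easy to misread, so here is a small sketch that spells out the waits it produces. It assumes the same `MAX_RETRIES = 3` and the same `1000 * 2^attempt` formula; `backoffDelayMs` and `retrySchedule` are illustrative names that do not appear in the worker.

```typescript
const MAX_RETRIES = 3;

// Same formula as the worker: the exponent is the attempt number that just failed.
function backoffDelayMs(attempt: number): number {
  return 1000 * Math.pow(2, attempt);
}

// A delay follows every failed attempt except the last one,
// because the last failure goes to the DLQ instead of being retried.
function retrySchedule(maxRetries: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt < maxRetries; attempt++) {
    delays.push(backoffDelayMs(attempt));
  }
  return delays;
}

const schedule = retrySchedule(MAX_RETRIES); // [2000, 4000]
const totalWaitMs = schedule.reduce((sum, ms) => sum + ms, 0); // 6000
```

Because `attempt` starts at 1, the first wait is 2 seconds rather than 1, and a message that fails all three attempts spends about 6 seconds in backoff before it is dead-lettered.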
Cloud provider DLQs:
- AWS SQS: RedrivePolicy with maxReceiveCount and deadLetterTargetArn
- Google Cloud Pub/Sub: deadLetterPolicy on the subscription
- RabbitMQ: x-dead-letter-exchange on the queue
BullMQ (Node.js) dead letter pattern:
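import { Queue, Worker } from 'bullmq';

// Assumption: Redis on localhost; adjust the connection for your environment.
const connection = { host: 'localhost', port: 6379 };

const queue = new Queue('orders', { connection });
// Separate queue that holds dead-lettered jobs (the queue name is an example)
const dlqQueue = new Queue('orders-dlq', { connection });

// processOrder is the job handler, assumed to be defined elsewhere
const worker = new Worker('orders', processOrder, {
  connection,
  settings: {
    // Older BullMQ releases accept a `backoffStrategies` map; newer ones take a single
    // `backoffStrategy` function, so match this to the installed version.
    backoffStrategies: { custom: (attemptsMade: number) => Math.pow(2, attemptsMade) * 1000 },
  },
});

worker.on('failed', async (job, err) => {
  // `attempts` defaults to 1 when the job was added without retry options
  if (job && job.attemptsMade >= (job.opts.attempts ?? 1)) {
    // Retries exhausted: move to DLQ
    await dlqQueue.add('dead-letter', {
      originalJob: job.data,
      error: err.message,
      attempts: job.attemptsMade,
    });
  }
});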
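For comparison with the application-level pattern, the `RedrivePolicy` setting listed under cloud provider DLQs is an SQS queue attribute whose value is a JSON string. The sketch below only builds that attribute and makes no AWS call; `buildRedriveAttributes` is a hypothetical helper and the ARN is a placeholder.

```typescript
interface RedrivePolicy {
  deadLetterTargetArn: string; // ARN of the queue that receives dead letters
  maxReceiveCount: number;     // receives allowed before SQS moves the message
}

// Returns an attributes map in the shape SQS expects: the policy itself is
// serialized to a JSON string under the "RedrivePolicy" key.
function buildRedriveAttributes(
  dlqArn: string,
  maxReceiveCount: number
): { RedrivePolicy: string } {
  if (!Number.isInteger(maxReceiveCount) || maxReceiveCount < 1) {
    throw new RangeError('maxReceiveCount must be a positive integer');
  }
  const policy: RedrivePolicy = { deadLetterTargetArn: dlqArn, maxReceiveCount };
  return { RedrivePolicy: JSON.stringify(policy) };
}

// Placeholder ARN for illustration only.
const attributes = buildRedriveAttributes(
  'arn:aws:sqs:us-east-1:123456789012:orders-dlq',
  3
);
```

The resulting map would be passed as the queue attributes when creating or updating the source queue. With this in place the broker does the retry counting, so the consumer only has to fail (not delete) the message.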
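Reprocessing strategy: Once the underlying issue is fixed, pull each entry out of the DLQ with reprocess(id) and feed its payload back through the original processor.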
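One way a replay pass could look, as a sketch rather than a prescribed procedure: take a bounded batch, remove each entry via `reprocess(id)`, run it through the handler, and put it back if it still fails. `ReplayableDlq`, `replayDeadLetters`, and the `limit` parameter are assumptions introduced here; the `Entry` shape mirrors `DeadLetterEntry<T>`, so the `DeadLetterQueue` class satisfies the interface structurally.

```typescript
interface Entry<T> {
  id: string;
  originalQueue: string;
  payload: T;
  error: string;
  attempts: number;
  firstFailedAt: string;
  lastFailedAt: string;
  metadata: Record<string, unknown>;
}

// The minimal DLQ surface that a replay needs.
interface ReplayableDlq<T> {
  list(): Entry<T>[];
  reprocess(id: string): T | undefined;
  add(entry: Omit<Entry<T>, 'id' | 'lastFailedAt'>): void;
}

async function replayDeadLetters<T>(
  dlq: ReplayableDlq<T>,
  handler: (payload: T) => Promise<void>,
  limit = 10
): Promise<{ succeeded: number; failed: number }> {
  const result = { succeeded: 0, failed: 0 };
  // Snapshot the batch first so entries re-added on failure are not retried in this run.
  const batch = dlq.list().slice(0, limit);
  for (const entry of batch) {
    const payload = dlq.reprocess(entry.id); // removes the entry from the DLQ
    if (payload === undefined) continue; // already removed elsewhere
    try {
      await handler(payload);
      result.succeeded++;
    } catch (error) {
      // Still failing: put it back, keeping the original first-failure time.
      dlq.add({
        originalQueue: entry.originalQueue,
        payload,
        error: error instanceof Error ? error.message : String(error),
        attempts: entry.attempts + 1,
        firstFailedAt: entry.firstFailedAt,
        metadata: entry.metadata,
      });
      result.failed++;
    }
  }
  return result;
}
```

Replaying in small batches keeps a fix that turns out to be incomplete from flooding the downstream service. Note that re-adding a failed entry through `add` fires the `onDeadLetter` callback again, so alerts may repeat during a replay.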
DLQ metrics to track: message count (should trend toward zero), age of the oldest message, inflow rate (new messages per hour), and error categories.
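These metrics can be derived from the stored entries themselves. The sketch below is one possible calculation, assuming age and inflow are measured from `lastFailedAt` (the time the entry was dead-lettered) and that the error message serves as the category unless a `categorize` function is supplied; `computeDlqMetrics` and `DlqMetrics` are illustrative names.

```typescript
interface DeadLetterTimes {
  error: string;
  lastFailedAt: string; // ISO timestamp set when the entry was dead-lettered
}

interface DlqMetrics {
  count: number;
  oldestAgeMs: number; // 0 when the queue is empty
  inflowLastHour: number;
  byCategory: Record<string, number>;
}

function computeDlqMetrics(
  entries: DeadLetterTimes[],
  now: Date = new Date(),
  categorize: (error: string) => string = (error) => error
): DlqMetrics {
  const HOUR_MS = 60 * 60 * 1000;
  const nowMs = now.getTime();
  let oldestAgeMs = 0;
  let inflowLastHour = 0;
  const byCategory: Record<string, number> = {};

  for (const entry of entries) {
    const ageMs = nowMs - Date.parse(entry.lastFailedAt);
    oldestAgeMs = Math.max(oldestAgeMs, ageMs);
    if (ageMs <= HOUR_MS) inflowLastHour++; // dead-lettered within the last hour
    const category = categorize(entry.error);
    byCategory[category] = (byCategory[category] ?? 0) + 1;
  }

  return { count: entries.length, oldestAgeMs, inflowLastHour, byCategory };
}
```

With the in-memory queue this could be called as `computeDlqMetrics(dlq.list())` on a timer and the result pushed to whatever monitoring system is in use. A growing `oldestAgeMs` alongside a flat `count` usually means nobody is draining the queue.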