From ts-dev-kit
BullMQ reference for Node.js Redis-backed job queues: create queues/workers, add delayed/prioritized/repeatable/deduplicated jobs, FlowProducer hierarchies, retries/rate limiting/concurrency, cron schedulers, production setup, stalled job debugging.
npx claudepluginhub jgamaraalv/ts-dev-kit --plugin ts-dev-kit
This skill uses the workspace's default tool permissions.
Redis-backed queue system for Node.js. Four core classes: `Queue`, `Worker`, `QueueEvents`, `FlowProducer`.
Install with `yarn add bullmq`. BullMQ requires Redis 5.0+ configured with `maxmemory-policy=noeviction`.
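The eviction-policy requirement is easy to miss on managed Redis instances, so a startup check can help. The following is only a sketch: `assertNoEviction` is a hypothetical helper, and it assumes the reply to `CONFIG GET maxmemory-policy` arrives as a flat `[name, value]` array.

```typescript
// Hypothetical helper (not part of BullMQ): validates the reply of
// `CONFIG GET maxmemory-policy`.
// Assumption: the reply is a flat [name, value] array of strings.
function assertNoEviction(configReply: readonly string[]): void {
  const index = configReply.indexOf("maxmemory-policy");
  const policy = index >= 0 ? configReply[index + 1] : undefined;
  if (policy !== "noeviction") {
    throw new Error(
      `maxmemory-policy is "${policy ?? "unknown"}"; BullMQ needs "noeviction"`,
    );
  }
}

// With an ioredis client this might be wired up roughly as (not executed here):
//   assertNoEviction(await redis.config("GET", "maxmemory-policy"));
assertNoEviction(["maxmemory-policy", "noeviction"]); // passes silently
```

Failing fast at startup is usually preferable to discovering evicted job keys later.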
<quick_reference>
import { Queue, Worker, QueueEvents } from "bullmq";

// --- Producer ---
const queue = new Queue("my-queue", {
  connection: { host: "localhost", port: 6379 },
});
await queue.add("job-name", { foo: "bar" });

// --- Consumer ---
const worker = new Worker(
  "my-queue",
  async (job) => {
    // process job
    await job.updateProgress(50);
    return { result: "done" };
  },
  { connection: { host: "localhost", port: 6379 } },
);
worker.on("completed", (job, returnvalue) => {
  console.log(`${job.id} completed with`, returnvalue);
});
// `job` may be undefined in the failed handler, so guard the access
worker.on("failed", (job, err) => {
  console.error(`${job?.id} failed with`, err.message);
});
// IMPORTANT: always attach an error handler
worker.on("error", (err) => {
  console.error(err);
});

// --- Global event listener (all workers) ---
const queueEvents = new QueueEvents("my-queue", {
  connection: { host: "localhost", port: 6379 },
});
queueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed with`, returnvalue);
});
queueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed: ${failedReason}`);
});
add() → wait / prioritized / delayed
          ↓
        active → completed
          ↓
        failed → (retry) → wait / delayed
With `FlowProducer`, jobs can also sit in the `waiting-children` state until all of their children complete.
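The lifecycle above can be read as a small state machine. The sketch below is a simplified model of that diagram, not BullMQ's internal implementation. The `JobState` type, the `transitions` table, and `canTransition` are illustrative names. It assumes that delayed jobs are promoted to `wait` or `prioritized` before becoming active.

```typescript
// Simplified model of the lifecycle diagram (illustrative only).
type JobState =
  | "wait"
  | "prioritized"
  | "delayed"
  | "waiting-children"
  | "active"
  | "completed"
  | "failed";

const transitions: Record<JobState, readonly JobState[]> = {
  wait: ["active"],
  prioritized: ["active"],
  // Assumption: a delayed job is promoted once its delay has elapsed.
  delayed: ["wait", "prioritized"],
  // A parent job leaves this state once all of its children complete.
  "waiting-children": ["wait", "prioritized", "delayed"],
  active: ["completed", "failed"],
  // Retry path from the diagram: failed → (retry) → wait / delayed.
  failed: ["wait", "delayed"],
  completed: [],
};

function canTransition(from: JobState, to: JobState): boolean {
  return transitions[from].includes(to);
}

console.log(canTransition("active", "completed")); // true
console.log(canTransition("completed", "active")); // false
```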
</quick_reference>
BullMQ uses ioredis internally. Pass connection options or an existing ioredis instance.
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";

// Option 1: connection config (new connection per instance)
const queue = new Queue("q", {
  connection: { host: "redis.example.com", port: 6379 },
});

// Option 2: reuse an ioredis instance (multiple Queue instances can share it)
const connection = new Redis();
const q1 = new Queue("q1", { connection });
const q2 = new Queue("q2", { connection });

// Option 3: reuse for Workers (BullMQ internally duplicates it for blocking commands)
const workerConn = new Redis({ maxRetriesPerRequest: null });
const w1 = new Worker("q1", async (job) => {}, { connection: workerConn });
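Option 3 sets `maxRetriesPerRequest: null` by hand, which is easy to forget. A small guard can apply it consistently and reject the ioredis `keyPrefix` option, which this reference advises against. The sketch below is dependency-free: `RedisOptionsSketch` and `toWorkerConnection` are hypothetical names, not part of BullMQ or ioredis. The interface covers only the fields discussed in this section.

```typescript
// Hypothetical, minimal option shape (a real app would use ioredis' RedisOptions).
interface RedisOptionsSketch {
  host?: string;
  port?: number;
  keyPrefix?: string;
  maxRetriesPerRequest?: number | null;
}

// Hypothetical helper: returns options suitable for a Worker connection.
function toWorkerConnection(base: RedisOptionsSketch): RedisOptionsSketch {
  if (base.keyPrefix !== undefined) {
    throw new Error("Use BullMQ's `prefix` option instead of ioredis `keyPrefix`");
  }
  // Workers need maxRetriesPerRequest: null, so force it regardless of input.
  return { ...base, maxRetriesPerRequest: null };
}

const workerOptions = toWorkerConnection({ host: "localhost", port: 6379 });
console.log(workerOptions.maxRetriesPerRequest); // null
```

The returned object could then be passed to `new Redis(...)` or directly as a `connection` option.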
Critical rules:
- Workers require `maxRetriesPerRequest: null` on the ioredis instance. BullMQ enforces this and will warn/throw if it is not set.
- Do not use the ioredis `keyPrefix` option; use BullMQ's `prefix` option instead.
- `QueueEvents` cannot share connections (it uses blocking Redis commands).
- Redis must run with `maxmemory-policy=noeviction`.

const queue = new Queue("paint", { connection });
// Add a job
await queue.add("job-name", { color: "red" });

// Add with options
await queue.add(
  "job-name",
  { color: "blue" },
  {
    delay: 5000, // wait 5s before processing
    priority: 1, // lower = higher priority (0 is highest, max 2^21)
    attempts: 3, // up to 3 attempts in total (first run + 2 retries)
    backoff: { type: "exponential", delay: 1000 },
    removeOnComplete: true, // or { count: 100 } to keep last 100
    removeOnFail: 1000, // keep last 1000 failed jobs
  },
);

// Add bulk
await queue.addBulk([
  { name: "job1", data: { x: 1 } },
  { name: "job2", data: { x: 2 }, opts: { priority: 1 } },
]);

// Queue operations
await queue.pause();
await queue.resume();
await queue.obliterate({ force: true }); // remove all data
await queue.close();
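The `backoff: { type: "exponential", delay: 1000 }` option shown above spaces retries out over time. As a rough illustration, assuming the built-in exponential strategy waits `delay * 2^(attemptsMade - 1)` milliseconds and no jitter is configured, the retry delays can be computed as follows. `exponentialBackoff` and `retrySchedule` are illustrative names, not BullMQ APIs.

```typescript
// Sketch of the exponential backoff formula: delay * 2^(attemptsMade - 1).
function exponentialBackoff(baseDelayMs: number, attemptsMade: number): number {
  return Math.round(baseDelayMs * Math.pow(2, attemptsMade - 1));
}

// Delays before each retry, where `attempts` counts the first run as well.
function retrySchedule(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(exponentialBackoff(baseDelayMs, attemptsMade));
  }
  return delays;
}

// attempts: 3 with a 1000 ms base delay gives two retries.
console.log(retrySchedule(3, 1000)); // [ 1000, 2000 ]
```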
const worker = new Worker<MyData, MyReturn>(
  "paint",
  async (job) => {
    await job.updateProgress(42);
    return { cost: 100 };
  },
  {
    connection,
    concurrency: 5, // process 5 jobs concurrently
    autorun: false, // don't start immediately
  },
);
worker.run(); // start when ready

// Update concurrency at runtime
worker.concurrency = 10;
The processor receives three arguments: `(job, token?, signal?)`. `signal` is an `AbortSignal` for cancellation support.
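A processor can honor the signal by checking it between units of work. The sketch below is one possible shape, written without importing BullMQ so it can run anywhere: `JobLike` is a hypothetical stand-in for the parts of a job the processor touches, and `paintProcessor` and the `coats` field are invented for illustration.

```typescript
// Hypothetical stand-in for the parts of a BullMQ job this sketch uses.
interface JobLike<T> {
  data: T;
  updateProgress(progress: number): Promise<void>;
}

// Illustrative processor: applies `coats` layers and checks for
// cancellation before starting each one.
async function paintProcessor(
  job: JobLike<{ coats: number }>,
  _token?: string,
  signal?: AbortSignal,
): Promise<{ coatsApplied: number }> {
  let coatsApplied = 0;
  for (let i = 0; i < job.data.coats; i++) {
    if (signal?.aborted) {
      // Throwing makes the job fail instead of silently continuing.
      throw new Error(`Cancelled after ${coatsApplied} coats`);
    }
    coatsApplied += 1;
    await job.updateProgress(Math.round((coatsApplied / job.data.coats) * 100));
  }
  return { coatsApplied };
}
```

Checking `signal.aborted` at loop boundaries keeps cancellation cooperative: work already in flight finishes, and no new step starts.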
interface JobData {
  color: string;
}
interface JobReturn {
  cost: number;
}
// `connection` is a connection config or ioredis instance, as in the earlier examples
const queue = new Queue<JobData, JobReturn>("paint", { connection });
const worker = new Worker<JobData, JobReturn>(
  "paint",
  async (job) => {
    // job.data is typed as JobData
    return { cost: 100 }; // must match JobReturn
  },
  { connection },
);
Worker events (local to that worker instance):
| Event | Callback signature |
|---|---|
| `completed` | `(job, returnvalue)` |
| `failed` | `(job \| undefined, error, prev)` |
| `progress` | `(job, progress: number \| object)` |
| `drained` | `()` (queue is empty) |
| `error` | `(error)` (MUST attach this handler) |
QueueEvents (global, all workers, uses Redis Streams):
| Event | Callback signature |
|---|---|
| `completed` | `({ jobId, returnvalue })` |
| `failed` | `({ jobId, failedReason })` |
| `progress` | `({ jobId, data })` |
| `waiting` | `({ jobId })` |
| `active` | `({ jobId, prev })` |
| `delayed` | `({ jobId, delay })` |
| `deduplicated` | `({ jobId, deduplicationId, deduplicatedJobId })` |
The event stream is auto-trimmed (~10,000 events). Configure the limit via `streams.events.maxLen`.
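Because `QueueEvents` payloads are plain objects, handlers can be written and tested without Redis. The sketch below tallies outcomes using the `completed` and `failed` payload shapes from the table. `createOutcomeTracker` is a hypothetical helper, and the payload interfaces are simplified assumptions rather than BullMQ's exported types.

```typescript
// Simplified payload shapes, following the QueueEvents table.
interface CompletedEvent {
  jobId: string;
  returnvalue: unknown;
}
interface FailedEvent {
  jobId: string;
  failedReason: string;
}

// Hypothetical helper: keeps running totals of job outcomes.
function createOutcomeTracker() {
  const totals = { completed: 0, failed: 0, lastFailure: "" };
  return {
    totals,
    onCompleted(_event: CompletedEvent): void {
      totals.completed += 1;
    },
    onFailed({ jobId, failedReason }: FailedEvent): void {
      totals.failed += 1;
      totals.lastFailure = `${jobId}: ${failedReason}`;
    },
  };
}

const tracker = createOutcomeTracker();
// With a real QueueEvents instance this might be wired up roughly as:
//   queueEvents.on("completed", tracker.onCompleted);
//   queueEvents.on("failed", tracker.onFailed);
tracker.onCompleted({ jobId: "1", returnvalue: { cost: 100 } });
tracker.onFailed({ jobId: "2", failedReason: "out of paint" });
console.log(tracker.totals);
// { completed: 1, failed: 1, lastFailure: '2: out of paint' }
```

The handlers close over `totals` rather than using `this`, so they can be passed directly as callbacks.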