From bunqueue
Implements job queues in Bun apps with bunqueue: embedded/TCP modes, Workers, simple Bunqueue, cron, retries, DLQ, middleware, routes.
npx claudepluginhub egeominotti/bunqueueThis skill is limited to using the following tools:
You are helping a developer use **bunqueue**, a high-performance job queue for Bun with SQLite persistence.
Guides message queue and job processing setup with Kafka, RabbitMQ, SQS, BullMQ, Celery, Sidekiq. Covers architecture, retries, DLQs, idempotency, priorities, backpressure, and scaling.
Sets up Cloudflare Queues for producers/consumers, batch processing, retries, dead letter queues, and resolves errors like timeouts, retries, and backlogs.
Implements Upstash QStash for serverless message queues, scheduled cron jobs, delayed HTTP calls, and reliable webhook delivery without infrastructure management.
Share bugs, ideas, or general feedback.
You are helping a developer use bunqueue, a high-performance job queue for Bun with SQLite persistence.
bun add bunqueue
Bunqueue): All-in-one wrapper. Best for getting started fast.Simple Mode gives you a Queue and a Worker in a single object. Add jobs, process them, add middleware, schedule crons — all from one place. Use Bunqueue when producer and consumer are in the same process. For distributed systems, use Queue + Worker separately.
For full API details, see reference.md
new Bunqueue('emails', opts)
│
├── this.queue = new Queue('emails', ...)
├── this.worker = new Worker('emails', ...)
│
└── Subsystems (all optional):
├── RetryEngine — jitter, fibonacci, exponential, custom
├── CircuitBreaker — pauses worker after N failures
├── BatchAccumulator — groups N jobs into one call
├── TriggerManager — "on complete → create job B"
├── TtlChecker — rejects expired jobs
├── PriorityAger — boosts old jobs' priority
├── CancellationManager — AbortController per job
└── DedupDebounceMerger — deduplication & debounce defaults
Processing pipeline per job: Job → Circuit Breaker → TTL check → AbortController → Retry → Middleware → Processor
import { Bunqueue } from 'bunqueue/client';
const app = new Bunqueue('emails', {
embedded: true,
processor: async (job) => {
console.log(`Sending to ${job.data.to}`);
return { sent: true };
},
});
await app.add('send', { to: 'alice@example.com' });
const app = new Bunqueue('notifications', {
embedded: true,
routes: {
'send-email': async (job) => {
await sendEmail(job.data.to);
return { channel: 'email' };
},
'send-sms': async (job) => {
await sendSMS(job.data.to);
return { channel: 'sms' };
},
},
});
await app.add('send-email', { to: 'alice' });
await app.add('send-sms', { to: 'bob' });
Use one of
processor,routes, orbatch. Passing multiple or none throws an error.
// Timing middleware
app.use(async (job, next) => {
const start = Date.now();
const result = await next();
console.log(`${job.name}: ${Date.now() - start}ms`);
return result;
});
// Error recovery middleware
app.use(async (job, next) => {
try {
return await next();
} catch (err) {
return { recovered: true, error: err.message };
}
});
Execution order: mw1 → mw2 → processor → mw2 → mw1. Zero overhead when no middleware.
const app = new Bunqueue('db-inserts', {
embedded: true,
batch: {
size: 50, // flush every 50 jobs
timeout: 2000, // or every 2 seconds
processor: async (jobs) => {
const rows = jobs.map(j => j.data.row);
await db.insertMany('table', rows);
return jobs.map(() => ({ inserted: true }));
},
},
});
const app = new Bunqueue('api-calls', {
embedded: true,
processor: async (job) => {
const res = await fetch(job.data.url);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return { status: res.status };
},
retry: {
maxAttempts: 5,
delay: 1000,
strategy: 'jitter', // 'fixed' | 'exponential' | 'jitter' | 'fibonacci' | 'custom'
retryIf: (error) => error.message.includes('503'),
},
});
Strategies: fixed (constant delay), exponential (delay × 2^attempt), jitter (exponential × random 0.5-1.0), fibonacci (delay × fib(attempt)), custom (customBackoff(attempt, error) → ms). This is in-process retry — the job stays active.
const app = new Bunqueue('encoding', {
embedded: true,
processor: async (job) => {
const signal = app.getSignal(job.id);
for (const chunk of chunks) {
if (signal?.aborted) throw new Error('Cancelled');
await encode(chunk);
}
return { done: true };
},
});
const job = await app.add('video', { file: 'big.mp4' });
app.cancel(job.id); // cancel immediately
app.cancel(job.id, 5000); // cancel after 5s grace period
Works with fetch too: await fetch(url, { signal }).
Pauses the worker after too many consecutive failures: CLOSED → OPEN (paused) → HALF-OPEN → CLOSED
const app = new Bunqueue('payments', {
embedded: true,
processor: async (job) => paymentGateway.charge(job.data),
circuitBreaker: {
threshold: 5, // open after 5 failures
resetTimeout: 30000, // try again after 30s
onOpen: () => alert('Gateway down!'),
onClose: () => alert('Gateway recovered'),
},
});
app.getCircuitState(); // 'closed' | 'open' | 'half-open'
app.resetCircuit(); // force close + resume worker
const app = new Bunqueue('orders', {
embedded: true,
routes: {
'place-order': async (job) => ({ orderId: job.data.id, total: 99 }),
'send-receipt': async (job) => ({ sent: true }),
'fraud-alert': async (job) => ({ alerted: true }),
},
});
app.trigger({ on: 'place-order', create: 'send-receipt', data: (result, job) => ({ id: job.data.id }) });
app.trigger({ on: 'place-order', create: 'fraud-alert', data: (r) => ({ amount: r.total }), condition: (r) => r.total > 1000 });
// Chain triggers
app.trigger({ on: 'step-1', create: 'step-2', data: (r) => r })
.trigger({ on: 'step-2', create: 'step-3', data: (r) => r });
const app = new Bunqueue('otp', {
embedded: true,
processor: async (job) => verifyOTP(job.data.code),
ttl: {
defaultTtl: 300000, // 5 minutes for all jobs
perName: { 'verify-otp': 60000 }, // 1 minute for OTP
},
});
Resolution: perName[job.name] → defaultTtl → 0 (no TTL).
Automatically boosts priority of old waiting jobs to prevent starvation:
const app = new Bunqueue('tasks', {
embedded: true,
processor: async (job) => ({ done: true }),
priorityAging: {
interval: 60000, // check every 60s
minAge: 300000, // boost after 5 minutes
boost: 2, // +2 priority per tick
maxPriority: 100, // cap
},
});
const app = new Bunqueue('webhooks', {
embedded: true,
processor: async (job) => processWebhook(job.data),
deduplication: { ttl: 60000 },
});
await app.add('hook', { event: 'user.created', userId: '123' });
await app.add('hook', { event: 'user.created', userId: '123' }); // deduplicated!
const app = new Bunqueue('search', {
embedded: true,
processor: async (job) => executeSearch(job.data.query),
debounce: { ttl: 500 },
});
await app.add('search', { query: 'h' });
await app.add('search', { query: 'he' });
await app.add('search', { query: 'hello' }); // only this one processes
const app = new Bunqueue('api', {
embedded: true,
processor: async (job) => callExternalAPI(job.data),
rateLimit: { max: 100, duration: 1000 },
});
// Per-group rate limiting
const app2 = new Bunqueue('api', {
embedded: true,
processor: async (job) => callAPI(job.data),
rateLimit: { max: 10, duration: 1000, groupKey: 'customerId' },
});
const app = new Bunqueue('critical', {
embedded: true,
processor: async (job) => riskyOperation(job.data),
dlq: {
autoRetry: true,
autoRetryInterval: 3600000,
maxAutoRetries: 3,
maxAge: 604800000,
maxEntries: 10000,
},
});
app.getDlq(); // all entries
app.getDlqStats(); // { total, byReason, ... }
app.retryDlq(); // retry all
app.purgeDlq(); // clear all
await app.cron('daily-report', '0 9 * * *', { type: 'report' });
await app.cron('eu-digest', '0 8 * * 1', { type: 'weekly' }, { timezone: 'Europe/Rome' });
await app.every('healthcheck', 30000, { type: 'ping' });
app.on('completed', (job, result) => {});
app.on('failed', (job, error) => {});
app.on('active', (job) => {});
app.on('stalled', (jobId, reason) => {});
app.on('drained', () => {});
import { Bunqueue, shutdownManager } from 'bunqueue/client';
const app = new Bunqueue('my-app', {
embedded: true,
routes: {
'process': async (job) => ({ id: job.data.payload, status: 'done' }),
'notify': async (job) => ({ sent: true }),
'alert': async (job) => ({ alerted: true }),
},
concurrency: 10,
retry: { maxAttempts: 3, delay: 1000, strategy: 'jitter' },
circuitBreaker: { threshold: 5, resetTimeout: 30000 },
ttl: { defaultTtl: 600000, perName: { 'verify-otp': 60000 } },
priorityAging: { interval: 60000, minAge: 300000, boost: 1 },
deduplication: { ttl: 5000 },
rateLimit: { max: 100, duration: 1000 },
dlq: { autoRetry: true, maxAge: 604800000 },
});
app.use(async (job, next) => {
const start = Date.now();
const result = await next();
console.log(`${job.name}: ${Date.now() - start}ms`);
return result;
});
app.trigger({ on: 'process', create: 'notify', data: (r) => ({ payload: r.id }) })
.trigger({ on: 'process', event: 'failed', create: 'alert', data: (_, j) => j.data });
await app.cron('cleanup', '0 2 * * *', { payload: 'nightly' });
await app.add('process', { payload: 'ORD-001' });
process.on('SIGINT', async () => {
await app.close();
shutdownManager();
});
import { Queue, Worker } from 'bunqueue/client';
// Embedded mode
const queue = new Queue('emails', {
embedded: true,
dataPath: './data/myapp.db',
});
// TCP mode (requires bunqueue server running)
// const queue = new Queue('emails', { connection: { port: 6789 } });
// Add jobs
await queue.add('welcome', { userId: 123 });
await queue.add('urgent', { alert: true }, { priority: 10 });
await queue.add('later', { data: 1 }, { delay: 60000 });
await queue.add('critical', { data: 1 }, { durable: true }); // Immediate disk write
// Bulk add
await queue.addBulk([
{ name: 'task1', data: { x: 1 } },
{ name: 'task2', data: { x: 2 }, opts: { priority: 5 } },
]);
// Worker
const worker = new Worker('emails', async (job) => {
await job.updateProgress(50);
await job.log('Processing...');
return { sent: true };
}, {
concurrency: 5,
heartbeatInterval: 10000,
});
worker.on('completed', (job, result) => {});
worker.on('failed', (job, err) => {});
// Graceful shutdown
process.on('SIGTERM', async () => {
await worker.close();
await queue.close();
});
priority: Higher number = processed soonerdelay: Milliseconds before job becomes availableattempts: Max retries (default: 3)backoff: Retry delay in ms (default: 1000)timeout: Processing timeout in msjobId: Custom ID for idempotency/deduplicationdurable: Bypass write buffer, immediate disk writeremoveOnComplete / removeOnFail: Auto-cleanupconcurrency: Parallel jobs (default: 1)heartbeatInterval: Stall detection interval (default: 10000ms, 0=disabled)batchSize: Jobs to pull per batch (default: 10, max: 1000)pollTimeout: Long poll timeout (default: 0, max: 30000ms)queue.setDlqConfig({ autoRetry: true, maxAge: 604800000, maxEntries: 10000 });
const entries = queue.getDlq({ reason: 'timeout' });
queue.retryDlq(); // Retry all DLQ jobs
queue.purgeDlq(); // Delete all DLQ jobs
queue.setStallConfig({ stallInterval: 30000, maxStalls: 3, gracePeriod: 5000 });
import { FlowProducer } from 'bunqueue/client';
const flow = new FlowProducer({ embedded: true });
await flow.add({
name: 'parent-job',
queueName: 'pipeline',
data: { step: 'final' },
children: [
{ name: 'child-1', queueName: 'pipeline', data: { step: 1 } },
{ name: 'child-2', queueName: 'pipeline', data: { step: 2 } },
],
});
// Parent waits for all children to complete before running
queue.pause(); // Stop processing
queue.resume(); // Resume processing
queue.drain(); // Remove all waiting jobs
queue.obliterate(); // Delete everything
queue.clean('completed', 3600000); // Clean old completed jobs
Transparent batching for TCP mode. Sequential adds have zero overhead; concurrent adds get ~3x speedup.
const queue = new Queue('jobs', {
connection: { port: 6789 },
autoBatch: { maxSize: 50, maxDelayMs: 5 }, // defaults, enabled by default in TCP
});
// Sequential: same speed as without batching
for (const item of items) {
await queue.add('task', item);
}
// Concurrent: batched into single PUSHB round-trip (~3x faster)
await Promise.all(tasks.map(t => queue.add('process', t)));
// Durable jobs bypass the batcher
await queue.add('critical', data, { durable: true });
import { QueueGroup } from 'bunqueue/client';
const billing = new QueueGroup('billing');
const invoices = billing.getQueue('invoices'); // → "billing:invoices"
const payments = billing.getQueue('payments'); // → "billing:payments"
const worker = billing.getWorker('invoices', async (job) => {
return await generateInvoice(job.data);
}, { concurrency: 5 });
await invoices.add('monthly', { customerId: '123' });
billing.pauseAll(); // pause all billing:* queues
billing.resumeAll();
billing.drainAll();
billing.obliterateAll();
Receive HTTP notifications on job events. SSRF-protected, with HMAC signing and retry.
// Via SDK (TCP mode)
await queue.addWebhook({
url: 'https://api.example.com/hooks/jobs',
events: ['job.completed', 'job.failed'],
queue: 'emails', // null = all queues
secret: 'hmac-secret', // optional, enables X-Webhook-Signature header
});
// Via CLI
// bunqueue webhook add https://api.example.com/hooks --events job.completed,job.failed
// Via MCP
// bunqueue_add_webhook
Events: job.pushed, job.started, job.completed, job.failed, job.progress, job.stalled
Features:
X-Webhook-Signature header) when secret is setBuilt-in automatic backup to S3-compatible storage.
S3_BACKUP_ENABLED=1
S3_BUCKET=my-bunqueue-backups
S3_ACCESS_KEY_ID=AKIA...
S3_SECRET_ACCESS_KEY=...
S3_REGION=us-east-1
S3_BACKUP_INTERVAL=21600000 # every 6 hours
S3_BACKUP_RETENTION=7 # keep 7 days
# Optional: custom endpoint (MinIO, R2, etc.)
S3_ENDPOINT=https://s3.custom.endpoint
# Start server
bunqueue start --tcp-port 6789 --data-path ./data/queue.db
# Or with env vars
TCP_PORT=6789 BUNQUEUE_DATA_PATH=./data/queue.db bunqueue start
bunqueue includes a native MCP server with 73 tools, 5 resources, and 3 diagnostic prompts. AI agents can manage queues, add/pull jobs, monitor stats, and auto-process jobs via HTTP handlers.
For full setup and tool list, see mcp.md
{
"mcpServers": {
"bunqueue": {
"command": "npx",
"args": ["bunqueue-mcp"],
"env": { "BUNQUEUE_MODE": "embedded" }
}
}
}
bunqueue is largely API-compatible with BullMQ. Replace the import and connection config:
// Before (BullMQ + Redis)
import { Queue, Worker } from 'bullmq';
const queue = new Queue('tasks', { connection: { host: 'redis', port: 6379 } });
// After (bunqueue, no Redis needed)
import { Queue, Worker } from 'bunqueue/client';
const queue = new Queue('tasks', { embedded: true });
Same API: add(), addBulk(), Worker(name, processor, opts), FlowProducer.add(), events, job options. For full migration guide, see examples.md
| Mode | Throughput | Data Loss Risk |
|---|---|---|
| Buffered (default) | ~100k jobs/sec | Up to 10ms |
| Durable | ~10k jobs/sec | None |
| Auto-batch (TCP) | ~145k ops/s concurrent | None |