# api-batch-processor

Implements batch processing infrastructure with Bull/BullMQ job queues, progress tracking, transaction management, error recovery, and RESTful API endpoints for bulk operations.

```
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin api-batch-processor
```

<!-- DESIGN DECISIONS -->
<!-- Batch processing enables efficient handling of large-scale operations that would
otherwise overwhelm synchronous APIs. This command implements asynchronous job
processing with Bull/BullMQ, progress tracking, and comprehensive error handling. -->
<!-- ALTERNATIVES CONSIDERED -->
<!-- Synchronous batch processing: Rejected due to timeout issues with large batches
Simple array iteration: Rejected as it lacks progress tracking and failure recovery
Database-only bulk operations: Rejected as they don't handle business logic validation -->
- `/batch-implement` - Implements multiple features sequentially from a file or GitHub issues (`--issues <nums>`), or resumes a batch (`--resume <id>`). Tracks progress with persistent state for crash recovery.
- `/queue-setup` - Runs an interactive wizard to set up Cloudflare Queues: creates the queue via wrangler, configures producer/consumer bindings and a DLQ in the Worker, and generates example code.
- `/implement-events` - Generates event-driven API architecture with publishers, subscribers, message broker integrations, and async patterns for microservices and distributed systems.
- `/queue` - Designs, builds, and debugs message queue and job processing systems (Kafka, RabbitMQ, SQS, BullMQ, Celery) with retries, DLQs, workers, scaling, and monitoring. Outputs configs, workers, and retry policies.
- `/create-webhook-handler` - Generates secure webhook endpoints with signature verification, idempotency, retry handling, event processors, middleware, routes, schemas, config, and tests.
- `/implement-throttling` - Implements API throttling and quotas with token bucket and sliding window algorithms, generating middleware, services, usage models, config, a management API, and metrics.
Creates high-performance batch API processing infrastructure for handling bulk operations efficiently. Implements job queues with Bull/BullMQ, real-time progress tracking, transaction management, and intelligent error recovery. Scales to millions of records by chunking work and streaming large exports, keeping resource usage bounded.
Use this command when:
Do NOT use this command for:
Before running this command, ensure:
The command:

1. Examines your data processing needs
2. Sets up Bull/BullMQ for reliable job processing
3. Implements RESTful endpoints for batch operations
4. Creates efficient batch processing workflows
5. Integrates comprehensive monitoring
The command generates a complete batch processing system:
```
batch-processing/
├── src/
│   ├── queues/
│   │   ├── batch-queue.js
│   │   ├── workers/
│   │   │   ├── batch-processor.js
│   │   │   └── chunk-worker.js
│   │   └── jobs/
│   │       ├── import-job.js
│   │       └── export-job.js
│   ├── api/
│   │   ├── batch-controller.js
│   │   └── batch-routes.js
│   ├── services/
│   │   ├── batch-service.js
│   │   ├── validation-service.js
│   │   └── transaction-manager.js
│   └── utils/
│       ├── chunking.js
│       └── progress-tracker.js
├── config/
│   └── queue-config.js
├── tests/
│   └── batch-processing.test.js
└── docs/
    └── batch-api.md
```
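The generated `config/queue-config.js` is not reproduced in this section; a minimal sketch of its likely shape, based on the job options used in the generated implementation below, might look like this (the default values are assumptions):

```javascript
// config/queue-config.js — hypothetical shape; the generated file may differ
export default {
  redis: {
    host: process.env.REDIS_HOST || '127.0.0.1',
    port: Number(process.env.REDIS_PORT) || 6379
  },
  defaultJobOptions: {
    removeOnComplete: 100, // keep the last 100 completed jobs for inspection
    removeOnFail: 500,     // keep more failed jobs around for debugging
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 }
  }
};
```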
Scenario: Import 100,000 users from CSV with validation and deduplication
Generated Implementation:
```javascript
// queues/batch-queue.js
import Queue from 'bull';

const batchQueue = new Queue('batch-processing', {
  redis: {
    host: process.env.REDIS_HOST,
    port: Number(process.env.REDIS_PORT) || 6379
  },
  defaultJobOptions: {
    removeOnComplete: 100,
    removeOnFail: 500,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  }
});

export default batchQueue;
```
```javascript
// api/batch-controller.js
import { v4 as uuidv4 } from 'uuid';
import batchQueue from '../queues/batch-queue.js';

class BatchController {
  async createBatchJob(req, res) {
    const { type, data, options = {} } = req.body;

    // Validate batch request
    if (!this.validateBatchRequest(type, data)) {
      return res.status(400).json({
        error: 'Invalid batch request'
      });
    }

    // Create job with unique ID
    const jobId = `${type}-${Date.now()}-${uuidv4()}`;
    const job = await batchQueue.add(type, {
      data,
      userId: req.user.id,
      options: {
        chunkSize: options.chunkSize || 1000,
        validateBeforeProcess: options.validate !== false,
        stopOnError: options.stopOnError || false,
        ...options
      }
    }, {
      jobId,
      priority: options.priority || 0
    });

    // Return job information
    return res.status(202).json({
      jobId: job.id,
      status: 'queued',
      estimatedTime: this.estimateProcessingTime(data.length),
      statusUrl: `/api/batch/jobs/${job.id}`,
      resultsUrl: `/api/batch/jobs/${job.id}/results`
    });
  }

  async getJobStatus(req, res) {
    const { jobId } = req.params;
    const job = await batchQueue.getJob(jobId);

    if (!job) {
      return res.status(404).json({ error: 'Job not found' });
    }

    const state = await job.getState();
    const progress = job.progress();

    return res.json({
      jobId: job.id,
      status: state,
      progress: {
        percentage: progress.percentage || 0,
        processed: progress.processed || 0,
        total: progress.total || 0,
        successful: progress.successful || 0,
        failed: progress.failed || 0,
        currentChunk: progress.currentChunk || 0,
        totalChunks: progress.totalChunks || 0
      },
      startedAt: job.processedOn,
      completedAt: job.finishedOn,
      error: job.failedReason,
      result: state === 'completed' ? job.returnvalue : null
    });
  }
}
```
```javascript
// workers/batch-processor.js
import batchQueue from '../queues/batch-queue.js';
import db from '../db.js'; // assumed: a configured Knex instance

class BatchProcessor {
  constructor() {
    this.initializeWorker();
  }

  initializeWorker() {
    batchQueue.process('user-import', async (job) => {
      const { data, options } = job.data;
      const chunks = this.chunkArray(data, options.chunkSize);
      const results = {
        successful: [],
        failed: [],
        skipped: []
      };

      // Update initial progress
      await job.progress({
        percentage: 0,
        total: data.length,
        totalChunks: chunks.length,
        processed: 0,
        successful: 0,
        failed: 0
      });

      // Process chunks sequentially
      for (let i = 0; i < chunks.length; i++) {
        const chunk = chunks[i];
        try {
          // Process chunk in transaction
          const chunkResults = await this.processChunk(chunk, options, job);
          results.successful.push(...chunkResults.successful);
          results.failed.push(...chunkResults.failed);
          results.skipped.push(...chunkResults.skipped);

          // Update progress
          const processed = (i + 1) * options.chunkSize;
          await job.progress({
            percentage: Math.min(100, (processed / data.length) * 100),
            processed: Math.min(processed, data.length),
            total: data.length,
            successful: results.successful.length,
            failed: results.failed.length,
            currentChunk: i + 1,
            totalChunks: chunks.length
          });

          // Check if should stop on error
          if (options.stopOnError && results.failed.length > 0) {
            break;
          }
        } catch (error) {
          console.error(`Chunk ${i} failed:`, error);
          if (options.stopOnError) {
            throw error;
          }
          // Mark entire chunk as failed
          chunk.forEach(item => {
            results.failed.push({
              data: item,
              error: error.message
            });
          });
        }
      }

      // Store results for retrieval
      await this.storeResults(job.id, results);

      return {
        summary: {
          total: data.length,
          successful: results.successful.length,
          failed: results.failed.length,
          skipped: results.skipped.length
        },
        resultsId: job.id
      };
    });
  }

  async processChunk(chunk, options, job) {
    const results = {
      successful: [],
      failed: [],
      skipped: []
    };

    // Start database transaction
    const trx = await db.transaction();
    try {
      for (const item of chunk) {
        try {
          // Validate if required
          if (options.validateBeforeProcess) {
            const validation = await this.validateUser(item);
            if (!validation.valid) {
              results.failed.push({
                data: item,
                errors: validation.errors
              });
              continue;
            }
          }

          // Check for duplicates
          const existing = await trx('users')
            .where('email', item.email)
            .first();
          if (existing) {
            if (options.skipDuplicates) {
              results.skipped.push({
                data: item,
                reason: 'Duplicate email'
              });
              continue;
            } else if (options.updateDuplicates) {
              await trx('users')
                .where('email', item.email)
                .update(item);
              results.successful.push({
                action: 'updated',
                id: existing.id,
                data: item
              });
              continue;
            }
          }

          // Insert new user
          const [userId] = await trx('users').insert({
            ...item,
            created_at: new Date(),
            batch_job_id: job.id
          });
          results.successful.push({
            action: 'created',
            id: userId,
            data: item
          });
        } catch (error) {
          results.failed.push({
            data: item,
            error: error.message
          });
        }
      }

      // Commit transaction
      await trx.commit();
    } catch (error) {
      await trx.rollback();
      throw error;
    }

    return results;
  }

  chunkArray(array, size) {
    const chunks = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }
}
```
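The 202 response above hands back a `statusUrl` to poll. A minimal client sketch; the POST path is an assumption inferred from the `statusUrl` format, and the status endpoint shape comes straight from `getJobStatus`:

```javascript
// Submit a batch import and poll until the job settles.
async function runBatchImport(users) {
  const res = await fetch('/api/batch/jobs', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ type: 'user-import', data: users, options: { chunkSize: 1000 } })
  });
  const { statusUrl } = await res.json();

  // Poll the status endpoint until the job completes or fails.
  for (;;) {
    const status = await (await fetch(statusUrl)).json();
    if (status.status === 'completed' || status.status === 'failed') {
      return status;
    }
    await new Promise(resolve => setTimeout(resolve, 2000)); // wait between polls
  }
}
```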
Scenario: Export millions of records with streaming and compression
Generated Streaming Export:
```javascript
// services/export-service.js
import fs from 'fs';
import zlib from 'zlib';
import { Transform } from 'stream';
import batchQueue from '../queues/batch-queue.js';
import db from '../db.js'; // assumed: a configured Knex instance

class ExportService {
  async createExportJob(query, format, options) {
    const job = await batchQueue.add('data-export', {
      query,
      format,
      options
    });
    return job;
  }

  async processExportJob(job) {
    const { query, format, options } = job.data;

    // Create export stream
    const exportStream = this.createExportStream(query, format);
    const outputPath = `/tmp/exports/${job.id}.${format}.gz`;

    // Create compression stream
    const gzip = zlib.createGzip();
    const writeStream = fs.createWriteStream(outputPath);

    let recordCount = 0;
    let errorCount = 0;

    return new Promise((resolve, reject) => {
      exportStream
        .pipe(new Transform({
          transform(chunk, encoding, callback) {
            recordCount++;
            // Update progress every 1000 records
            if (recordCount % 1000 === 0) {
              job.progress({
                processed: recordCount,
                percentage: Math.min(100, (recordCount / options.estimatedTotal) * 100)
              });
            }
            callback(null, chunk);
          }
        }))
        .pipe(gzip)
        .pipe(writeStream)
        .on('finish', async () => {
          // Upload to storage
          const url = await this.uploadToStorage(outputPath, job.id);
          resolve({
            recordCount,
            errorCount,
            downloadUrl: url,
            expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000)
          });
        })
        .on('error', reject);
    });
  }

  createExportStream(query, format) {
    const stream = db.raw(query).stream();
    switch (format) {
      case 'csv':
        return stream.pipe(this.createCSVTransform());
      case 'json':
        return stream.pipe(this.createJSONTransform());
      case 'ndjson':
        return stream.pipe(this.createNDJSONTransform());
      default:
        throw new Error(`Unsupported format: ${format}`);
    }
  }
}
```
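`createCSVTransform` is referenced above but not shown. A minimal sketch, assuming each row arrives as a flat object from the Knex stream:

```javascript
import { Transform } from 'stream';

// Rows (objects) in, CSV text out. Escaping follows RFC 4180.
function createCSVTransform() {
  let headerWritten = false;
  return new Transform({
    writableObjectMode: true, // accept row objects; emit strings
    transform(row, _encoding, callback) {
      const keys = Object.keys(row);
      let out = '';
      if (!headerWritten) {
        out += keys.join(',') + '\n'; // header from the first row's keys
        headerWritten = true;
      }
      out += keys
        .map(key => `"${String(row[key] ?? '').replace(/"/g, '""')}"`)
        .join(',') + '\n';
      callback(null, out);
    }
  });
}
```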
Scenario: Process API calls with rate limiting and retry logic
Generated Rate-Limited Processor:
```javascript
// workers/rate-limited-processor.js
import Bottleneck from 'bottleneck';

class RateLimitedProcessor {
  constructor() {
    // Configure rate limiter: 10 requests per second
    this.limiter = new Bottleneck({
      maxConcurrent: 5,
      minTime: 100 // 100ms between requests
    });
  }

  async processBatch(job) {
    const { items, apiEndpoint, options } = job.data;

    // Process items with rate limiting
    const promises = items.map((item, index) =>
      this.limiter.schedule(async () => {
        try {
          const result = await this.callAPI(apiEndpoint, item);
          // Update progress
          await job.progress({
            processed: index + 1,
            total: items.length,
            percentage: ((index + 1) / items.length) * 100
          });
          return { success: true, data: result };
        } catch (error) {
          return {
            success: false,
            error: error.message,
            item
          };
        }
      })
    );

    const results = await Promise.all(promises);
    return {
      successful: results.filter(r => r.success).length,
      failed: results.filter(r => !r.success),
      total: items.length
    };
  }
}
```
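The scenario calls for retry logic, which the class above does not yet include. Bottleneck exposes a `failed` event for this: returning a number of milliseconds from the handler schedules a retry. A sketch, with the retry cap and delays as assumptions:

```javascript
// Inside the constructor: retry failed calls with exponential backoff.
// Returning a millisecond delay from the 'failed' handler re-queues the job.
this.limiter.on('failed', (error, jobInfo) => {
  const MAX_RETRIES = 3; // assumed cap; tune for the target API
  if (jobInfo.retryCount < MAX_RETRIES) {
    return 1000 * 2 ** jobInfo.retryCount; // 1s, 2s, 4s
  }
  // Returning nothing lets the job fail permanently.
});
```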
**Symptoms:** Jobs not processing, Redis connection errors
**Cause:** Redis server unavailable or misconfigured
**Solution:**

```javascript
batchQueue.on('error', (error) => {
  console.error('Queue error:', error);
  // Implement fallback or alerting
});
```

**Prevention:** Implement Redis Sentinel or a Redis cluster for high availability, as sketched below.
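Bull passes its `redis` options through to ioredis, which understands Sentinel natively. A sketch; the hostnames and master group name are placeholders:

```javascript
// queues/batch-queue.js with Sentinel-aware connection settings
const batchQueue = new Queue('batch-processing', {
  redis: {
    sentinels: [
      { host: 'sentinel-1', port: 26379 },
      { host: 'sentinel-2', port: 26379 }
    ],
    name: 'mymaster' // master group name configured in Sentinel
  }
});
```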
**Symptoms:** Process crashes with heap out-of-memory errors
**Cause:** Processing chunks too large for available memory
**Solution:** Reduce the chunk size and implement streaming, as sketched below.
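One way to keep memory bounded is to stream records off disk and flush fixed-size chunks instead of materializing the whole dataset in an array; a sketch, assuming one record per input line:

```javascript
import fs from 'fs';
import readline from 'readline';

// Yield fixed-size chunks from a file without loading it all into memory;
// peak memory is proportional to chunkSize, not file size.
async function* streamChunks(filePath, chunkSize = 500) {
  const lines = readline.createInterface({ input: fs.createReadStream(filePath) });
  let chunk = [];
  for await (const line of lines) {
    chunk.push(line); // parse the record here as needed
    if (chunk.length >= chunkSize) {
      yield chunk;
      chunk = [];
    }
  }
  if (chunk.length > 0) yield chunk;
}
```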
**Symptoms:** Batch processing hangs or fails with deadlock errors
**Cause:** Concurrent transactions competing for the same resources
**Solution:** Implement retry logic with exponential backoff, as sketched below.
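A sketch of such a retry wrapper around `processChunk`; the error code shown is MySQL's deadlock code, so adjust it for your database:

```javascript
// Retry a transactional operation when the database reports a deadlock.
async function withDeadlockRetry(operation, maxAttempts = 3) {
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      const isDeadlock = error.code === 'ER_LOCK_DEADLOCK'; // MySQL-specific
      if (!isDeadlock || attempt >= maxAttempts) throw error;
      await new Promise(resolve => setTimeout(resolve, 100 * 2 ** attempt)); // back off
    }
  }
}

// Usage: await withDeadlockRetry(() => processor.processChunk(chunk, options, job));
```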
| Flag | Example |
| --- | --- |
| `--chunk-size` | `/batch --chunk-size 500` |
| `--concurrency` | `/batch --concurrency 10` |
| `--retry-attempts` | `/batch --retry-attempts 5` |

✅ DO:
❌ DON'T:
💡 TIPS:
- `/api-rate-limiter` - Implement API rate limiting
- `/api-event-emitter` - Event-driven processing
- `/api-monitoring-dashboard` - Monitor batch jobs
- `/database-bulk-operations` - Database-level batch operations

⚠️ Security Considerations:
Solution: Check worker processes and Redis connectivity
Solution: Increase chunk size and worker concurrency
Solution: Review validation logic and add retry mechanisms
Last updated: 2025-10-11 · Quality score: 9.5/10 · Tested with: Bull 4.x, BullMQ 3.x, Redis 7.0