BullMQ is a powerful job queue system for Node.js applications, and nself provides first-class support for building and deploying BullMQ workers. With automatic Redis configuration, built-in monitoring, and scalable worker templates, you can handle background processing, scheduled jobs, and distributed workflows with ease.
BullMQ workers in nself provide automatic Redis configuration, built-in monitoring, and scalable worker templates.
Add BullMQ workers to your `.env.local`:
# Enable BullMQ workers
BULLMQ_WORKERS=email-worker,image-processor,data-sync
# Optional: Configure Redis connection
BULLMQ_REDIS_HOST=redis
BULLMQ_REDIS_PORT=6379
BULLMQ_REDIS_DB=1
# Optional: Configure concurrency
BULLMQ_EMAIL_CONCURRENCY=5
BULLMQ_IMAGE_CONCURRENCY=3
BULLMQ_DATA_SYNC_CONCURRENCY=1
# Generate BullMQ workers
nself build
# Start all services including workers
nself up
This creates the following structure:
workers/
├── email-worker/
│ ├── src/
│ │ ├── processors/
│ │ │ └── email.processor.js
│ │ ├── jobs/
│ │ │ └── send-email.job.js
│ │ └── index.js
│ ├── package.json
│ └── Dockerfile
├── image-processor/
└── data-sync/
// workers/email-worker/src/processors/email.processor.js
import { Worker } from 'bullmq';
import nodemailer from 'nodemailer';
// SMTP transport shared by every job this worker handles. All settings are
// read from the environment. Note: the nodemailer factory is `createTransport`
// (there is no `createTransporter`).
const transporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: process.env.SMTP_PORT,
  secure: process.env.SMTP_SECURE === 'true',
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS,
  },
});

/**
 * Worker consuming `email-queue`.
 *
 * Each job's data is expected to carry `{ to, subject, html, attachments }`,
 * which is forwarded to `transporter.sendMail` with `SMTP_FROM` as sender.
 *
 * The processor resolves with `{ messageId, status: 'sent' }` and rethrows any
 * send error after logging it, so the failure is reported back to the queue.
 */
const emailWorker = new Worker(
  'email-queue',
  async (job) => {
    const { to, subject, html, attachments } = job.data;

    try {
      const info = await transporter.sendMail({
        from: process.env.SMTP_FROM,
        to,
        subject,
        html,
        attachments,
      });

      console.log('Email sent:', info.messageId);
      return { messageId: info.messageId, status: 'sent' };
    } catch (error) {
      console.error('Email failed:', error);
      // Rethrow the original error so the job is marked as failed.
      throw error;
    }
  },
  {
    connection: {
      host: process.env.REDIS_HOST,
      port: process.env.REDIS_PORT,
    },
    concurrency: 5,
  },
);

emailWorker.on('completed', (job, result) => {
  // `result` is passed as a separate argument so it is actually printed
  // instead of being part of the message text.
  console.log(`Email job ${job.id} completed:`, result);
});

emailWorker.on('failed', (job, err) => {
  // The `failed` event can be emitted without a job instance, so guard the
  // property access rather than throwing inside the listener.
  console.error(`Email job ${job?.id} failed:`, err);
});

export default emailWorker;
// workers/image-processor/src/processors/image.processor.js
import { Worker } from 'bullmq';
import sharp from 'sharp';
import AWS from 'aws-sdk';
// S3-compatible client pointed at the MinIO endpoint from the environment.
const s3 = new AWS.S3({
  endpoint: process.env.MINIO_ENDPOINT,
  accessKeyId: process.env.MINIO_ACCESS_KEY,
  secretAccessKey: process.env.MINIO_SECRET_KEY,
  s3ForcePathStyle: true,
});

/**
 * Worker consuming `image-queue`.
 *
 * Job data: `{ imageUrl, sizes, bucket }`, where every entry of `sizes` is
 * `{ width, height, suffix }`. The source image is downloaded once, resized to
 * each requested size as a JPEG, and uploaded to `bucket`.
 *
 * Resolves with `{ processedImages: [{ size, url }, ...] }`.
 * Throws if the download does not return a successful HTTP status, or if any
 * resize/upload step rejects.
 */
const imageWorker = new Worker(
  'image-queue',
  async (job) => {
    const { imageUrl, sizes, bucket } = job.data;

    // Download original image
    const response = await fetch(imageUrl);
    if (!response.ok) {
      // Fail early instead of handing an error page to the image pipeline.
      throw new Error(`Failed to download ${imageUrl}: HTTP ${response.status}`);
    }
    // The standard fetch Response has no `.buffer()`; convert the ArrayBuffer.
    const imageBuffer = Buffer.from(await response.arrayBuffer());

    const results = [];

    // Process each size sequentially so only one resized buffer is held at a time
    for (const { width, height, suffix } of sizes) {
      // Resize image
      const processedBuffer = await sharp(imageBuffer)
        .resize(width, height, { fit: 'cover' })
        .jpeg({ quality: 85 })
        .toBuffer();

      // Upload to storage
      const key = `images/${Date.now()}-${suffix}.jpg`;
      await s3
        .upload({
          Bucket: bucket,
          Key: key,
          Body: processedBuffer,
          ContentType: 'image/jpeg',
        })
        .promise();

      results.push({
        size: `${width}x${height}`,
        url: `/storage/${bucket}/${key}`,
      });
    }

    return { processedImages: results };
  },
  {
    connection: {
      host: process.env.REDIS_HOST,
      port: process.env.REDIS_PORT,
    },
    concurrency: 3,
  },
);

export default imageWorker;
// In your NestJS service or API
import { Queue } from 'bullmq';
// Producer-side handle for `email-queue`; it must point at the same Redis
// instance the email worker consumes from.
const emailQueue = new Queue('email-queue', {
  connection: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
  },
});

// Add immediate job
await emailQueue.add('send-welcome-email', {
  to: 'user@example.com',
  subject: 'Welcome to our platform!',
  html: '<h1>Welcome!</h1><p>Thanks for signing up.</p>',
});

// Add delayed job
await emailQueue.add(
  'send-reminder',
  {
    to: 'user@example.com',
    subject: 'Don\'t forget to complete your profile',
    html: '<p>Complete your profile to get started.</p>',
  },
  {
    delay: 24 * 60 * 60 * 1000, // 24 hours, in milliseconds
  },
);

// Add recurring job
await emailQueue.add(
  'daily-digest',
  {
    to: 'user@example.com',
    subject: 'Your daily digest',
    html: '<p>Here\'s what happened today...</p>',
  },
  {
    // Current BullMQ releases take the cron expression under `pattern`;
    // the older `cron` key is no longer recognised.
    repeat: { pattern: '0 9 * * *' }, // Every day at 9 AM
  },
);
// High priority job
// (`data` here stands for whatever payload the job needs.)
await emailQueue.add('urgent-notification', data, {
priority: 1, // 1 is the highest priority; larger numbers mean lower priority
});
// Job with retry configuration: up to 3 attempts, with an exponentially
// growing wait that starts from a 2000 ms base delay
await imageQueue.add('process-image', data, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
});
// Job with timeout
// NOTE(review): `timeout` does not appear among BullMQ's documented job
// options — confirm it has an effect in the installed version.
await dataQueue.add('sync-data', data, {
timeout: 30000, // 30 seconds
});
nself automatically configures Bull Dashboard for monitoring:
http://localhost:3001/bull
// workers/shared/health.js
import { Queue } from 'bullmq';
/**
 * Reports basic health information for a single BullMQ queue.
 */
export class HealthChecker {
  /**
   * @param {string} queueName - Name of the queue to inspect.
   */
  constructor(queueName) {
    this.queue = new Queue(queueName, {
      connection: {
        host: process.env.REDIS_HOST,
        port: process.env.REDIS_PORT,
      },
    });
  }

  /**
   * Collects the number of waiting, active and failed jobs.
   *
   * Counts are requested in a single call rather than loading every job just
   * to read the list length.
   *
   * @returns {Promise<{status: 'healthy', stats: {waiting: number, active: number, failed: number}} | {status: 'unhealthy', error: string}>}
   *   Never rejects: any error while querying the queue is reported as an
   *   `unhealthy` result carrying the error message.
   */
  async check() {
    try {
      const { waiting, active, failed } = await this.queue.getJobCounts(
        'waiting',
        'active',
        'failed',
      );

      return {
        status: 'healthy',
        stats: { waiting, active, failed },
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        error: error.message,
      };
    }
  }

  /**
   * Closes the underlying queue so its connection is not left open once the
   * checker is no longer needed.
   *
   * @returns {Promise<void>}
   */
  async close() {
    await this.queue.close();
  }
}
// Chain jobs together
/**
 * Worker consuming `process-data`.
 *
 * Job data: `{ userId, data }`. After processing the payload it enqueues a
 * follow-up notification on `emailQueue`, then resolves with the processed
 * data. If enqueueing the follow-up rejects, the rejection propagates out of
 * the processor.
 */
const processDataWorker = new Worker(
  'process-data',
  async (job) => {
    const { userId, data } = job.data;

    // Process the data
    const processedData = await processUserData(data);

    // Chain to next job
    const recipient = await getUserEmail(userId);
    await emailQueue.add('send-processing-complete', {
      to: recipient,
      subject: 'Data processing complete',
      html: `<p>Your data has been processed successfully.</p>`,
    });

    return processedData;
  },
  {
    // Use the same Redis settings as the other workers and queues instead of
    // relying on the library's default host.
    connection: {
      host: process.env.REDIS_HOST,
      port: process.env.REDIS_PORT,
    },
  },
);
// Build one `send-newsletter` job per user, then enqueue the whole batch
// with a single `addBulk` call.
const jobs = [];
for (const recipient of users) {
  const payload = {
    to: recipient.email,
    subject: 'Weekly Newsletter',
    html: generateNewsletterHTML(recipient),
  };
  jobs.push({ name: 'send-newsletter', data: payload });
}

await emailQueue.addBulk(jobs);
import { FlowProducer } from 'bullmq';
// Flow producer bound to the Redis instance named in the environment.
const flowProducer = new FlowProducer({
  connection: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
  },
});

// Order used throughout this example flow.
const orderId = 123;

// [job name, queue name] for each child step of the order flow.
const childSteps = [
  ['validate-payment', 'payment-validation'],
  ['update-inventory', 'inventory-management'],
  ['send-confirmation', 'email-queue'],
];

// Create job flow: one `process-order` parent with a child job per step,
// each carrying the same order id.
await flowProducer.add({
  name: 'process-order',
  queueName: 'order-processing',
  data: { orderId },
  children: childSteps.map(([name, queueName]) => ({
    name,
    queueName,
    data: { orderId },
  })),
});
# Worker-specific configuration
BULLMQ_EMAIL_CONCURRENCY=10
BULLMQ_IMAGE_CONCURRENCY=5
BULLMQ_DATA_CONCURRENCY=2
# Redis configuration
BULLMQ_REDIS_HOST=redis
BULLMQ_REDIS_PORT=6379
BULLMQ_REDIS_PASSWORD=secure_password
BULLMQ_REDIS_DB=1
# Monitoring
BULLMQ_DASHBOARD_ENABLED=true
BULLMQ_DASHBOARD_PORT=3001
BULLMQ_DASHBOARD_AUTH=admin:password123
# Job retention
BULLMQ_COMPLETED_RETENTION=100
BULLMQ_FAILED_RETENTION=50
# Scale workers independently
BULLMQ_EMAIL_REPLICAS=3
BULLMQ_IMAGE_REPLICAS=2
BULLMQ_DATA_REPLICAS=1
# Rebuild and restart
nself build && nself restart
# Generate new worker
nself generate bullmq --name pdf-generator
# View worker logs
nself logs bullmq --worker email-worker --follow
# Restart specific worker
nself restart bullmq --worker image-processor
# Scale worker
nself scale bullmq --worker email-worker --replicas 5
# View queue status
nself bullmq status --queue email-queue
# Retry failed jobs
nself bullmq retry --queue email-queue --failed
# Clear completed jobs
nself bullmq clean --queue email-queue --completed
# Add test job
nself bullmq add --queue email-queue --data '{"to":"test@example.com"}'
Now that you understand BullMQ workers:
BullMQ workers provide a robust foundation for asynchronous processing and background jobs in your nself applications.