BullMQ Workers


BullMQ is a powerful job queue system for Node.js applications, and nself provides first-class support for building and deploying BullMQ workers. With automatic Redis configuration, built-in monitoring, and scalable worker templates, you can handle background processing, scheduled jobs, and distributed workflows with ease.

Overview

BullMQ workers in nself provide:

Key Features

  • Distributed Job Processing: Scale workers across multiple containers
  • Job Persistence: Jobs survive crashes and restarts
  • Retry Logic: Automatic retries with exponential backoff
  • Job Prioritization: Handle critical tasks first
  • Scheduled Jobs: Cron-like scheduling and delayed execution
  • Monitoring Dashboard: Built-in UI for job monitoring
  • Error Handling: Comprehensive error tracking and alerting

Common Use Cases

  • Email Processing: Send transactional emails asynchronously
  • Image Processing: Resize, compress, and optimize images
  • Data Import/Export: Handle large CSV imports and exports
  • Report Generation: Create PDFs and analytics reports
  • Webhook Processing: Handle external webhook events
  • Cleanup Tasks: Database cleanup and maintenance jobs

Getting Started

Enable BullMQ Workers

Add BullMQ workers to your .env.local:

# Enable BullMQ workers
BULLMQ_WORKERS=email-worker,image-processor,data-sync

# Optional: Configure Redis connection
BULLMQ_REDIS_HOST=redis
BULLMQ_REDIS_PORT=6379
BULLMQ_REDIS_DB=1

# Optional: Configure concurrency
BULLMQ_EMAIL_CONCURRENCY=5
BULLMQ_IMAGE_CONCURRENCY=3
BULLMQ_DATA_SYNC_CONCURRENCY=1

Generate Worker Structure

# Generate BullMQ workers
nself build

# Start all services including workers
nself up

This creates the following structure:

workers/
├── email-worker/
│   ├── src/
│   │   ├── processors/
│   │   │   └── email.processor.js
│   │   ├── jobs/
│   │   │   └── send-email.job.js
│   │   └── index.js
│   ├── package.json
│   └── Dockerfile
├── image-processor/
└── data-sync/

Worker Examples

Email Worker

// workers/email-worker/src/processors/email.processor.js
import { Worker } from 'bullmq';
import nodemailer from 'nodemailer';

const transporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: process.env.SMTP_PORT,
  secure: process.env.SMTP_SECURE === 'true',
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS,
  },
});

const emailWorker = new Worker('email-queue', async (job) => {
  const { to, subject, html, attachments } = job.data;
  
  try {
    const info = await transporter.sendMail({
      from: process.env.SMTP_FROM,
      to,
      subject,
      html,
      attachments,
    });
    
    console.log('Email sent:', info.messageId);
    return { messageId: info.messageId, status: 'sent' };
  } catch (error) {
    console.error('Email failed:', error);
    throw error;
  }
}, {
  connection: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
  },
  concurrency: Number(process.env.BULLMQ_EMAIL_CONCURRENCY ?? 5),
});

emailWorker.on('completed', (job, result) => {
  console.log(`Email job ${job.id} completed:`, result);
});

emailWorker.on('failed', (job, err) => {
  console.error(`Email job ${job.id} failed:`, err);
});

export default emailWorker;
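
Workers should also shut down gracefully so active jobs are not cut off mid-send when a container is stopped or redeployed. A minimal sketch — registerShutdown is a hypothetical helper, and worker.close() is BullMQ's built-in method that waits for in-flight jobs to finish:

```javascript
// Graceful shutdown helper. `closable` is anything with an async close(),
// e.g. the emailWorker above; BullMQ's Worker.close() drains active jobs.
function registerShutdown(closable, { signals = ['SIGTERM', 'SIGINT'] } = {}) {
  for (const signal of signals) {
    process.once(signal, async () => {
      console.log(`${signal} received, closing worker...`);
      await closable.close(); // waits for in-flight jobs to finish
      process.exit(0);
    });
  }
}

// registerShutdown(emailWorker);
```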

Image Processing Worker

// workers/image-processor/src/processors/image.processor.js
import { Worker } from 'bullmq';
import sharp from 'sharp';
import AWS from 'aws-sdk';

const s3 = new AWS.S3({
  endpoint: process.env.MINIO_ENDPOINT,
  accessKeyId: process.env.MINIO_ACCESS_KEY,
  secretAccessKey: process.env.MINIO_SECRET_KEY,
  s3ForcePathStyle: true,
});

const imageWorker = new Worker('image-queue', async (job) => {
  const { imageUrl, sizes, bucket } = job.data;
  
  // Download the original image (responses from the global fetch API
  // have no .buffer() method, so convert via arrayBuffer)
  const response = await fetch(imageUrl);
  const imageBuffer = Buffer.from(await response.arrayBuffer());
  
  const results = [];
  
  // Process each size
  for (const size of sizes) {
    const { width, height, suffix } = size;
    
    // Resize image
    const processedBuffer = await sharp(imageBuffer)
      .resize(width, height, { fit: 'cover' })
      .jpeg({ quality: 85 })
      .toBuffer();
    
    // Upload to storage
    const key = `images/${Date.now()}-${suffix}.jpg`;
    await s3.upload({
      Bucket: bucket,
      Key: key,
      Body: processedBuffer,
      ContentType: 'image/jpeg',
    }).promise();
    
    results.push({
      size: `${width}x${height}`,
      url: `/storage/${bucket}/${key}`,
    });
  }
  
  return { processedImages: results };
}, {
  connection: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
  },
  concurrency: Number(process.env.BULLMQ_IMAGE_CONCURRENCY ?? 3),
});

export default imageWorker;

Job Scheduling

Adding Jobs from Your Application

// In your NestJS service or API
import { Queue } from 'bullmq';

const emailQueue = new Queue('email-queue', {
  connection: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
  },
});

// Add immediate job
await emailQueue.add('send-welcome-email', {
  to: 'user@example.com',
  subject: 'Welcome to our platform!',
  html: '<h1>Welcome!</h1><p>Thanks for signing up.</p>',
});

// Add delayed job
await emailQueue.add('send-reminder', {
  to: 'user@example.com',
  subject: 'Don\'t forget to complete your profile',
  html: '<p>Complete your profile to get started.</p>',
}, {
  delay: 24 * 60 * 60 * 1000, // 24 hours
});

// Add recurring job
await emailQueue.add('daily-digest', {
  to: 'user@example.com',
  subject: 'Your daily digest',
  html: '<p>Here\'s what happened today...</p>',
}, {
  repeat: { pattern: '0 9 * * *' }, // Every day at 9 AM ('cron' was renamed to 'pattern' in BullMQ v3)
});

Priority and Advanced Options

// High priority job
await emailQueue.add('urgent-notification', data, {
  priority: 1, // Lower number = higher priority (1 is processed first)
});

// Job with retry configuration
await imageQueue.add('process-image', data, {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 2000,
  },
});

// Job with automatic cleanup (BullMQ has no per-job timeout option;
// enforce time limits inside the processor instead)
await dataQueue.add('sync-data', data, {
  removeOnComplete: 100, // keep only the last 100 completed jobs
  removeOnFail: 500,
});
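
With the built-in exponential strategy, BullMQ waits roughly delay * 2^(n-1) milliseconds before retry n. A quick sketch of the schedule the retry configuration above produces (backoffDelays is an illustrative helper, not a BullMQ API):

```javascript
// Retry delays produced by exponential backoff: delay * 2 ** (n - 1)
// for retry n. With `attempts` total attempts there are attempts - 1 retries.
function backoffDelays(attempts, baseDelay) {
  const delays = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelay * 2 ** (retry - 1));
  }
  return delays;
}

// For the configuration above (attempts: 3, delay: 2000):
console.log(backoffDelays(3, 2000)); // → [ 2000, 4000 ]
```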

Monitoring and Management

Built-in Dashboard

nself automatically configures Bull Dashboard for monitoring:

  • Access: http://localhost:3001/bull
  • Features: View jobs, retry failed jobs, monitor queues
  • Real-time: Live updates of job status

Health Checks

// workers/shared/health.js
import { Queue } from 'bullmq';

export class HealthChecker {
  constructor(queueName) {
    this.queue = new Queue(queueName, {
      connection: {
        host: process.env.REDIS_HOST,
        port: process.env.REDIS_PORT,
      },
    });
  }

  async check() {
    try {
      const waiting = await this.queue.getWaiting();
      const active = await this.queue.getActive();
      const failed = await this.queue.getFailed();
      
      return {
        status: 'healthy',
        stats: {
          waiting: waiting.length,
          active: active.length,
          failed: failed.length,
        },
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        error: error.message,
      };
    }
  }
}
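
The checker can back a liveness endpoint for your orchestrator. A sketch — summarize is a hypothetical helper that grades the stats (thresholds are illustrative), with the HTTP wiring shown in comments:

```javascript
// Grade queue stats into a coarse health status. Thresholds are
// illustrative defaults; tune them to your queue's normal load.
function summarize(stats, { maxFailed = 25, maxWaiting = 1000 } = {}) {
  if (stats.failed > maxFailed) return 'degraded';
  if (stats.waiting > maxWaiting) return 'backlogged';
  return 'healthy';
}

// Wiring into a plain Node HTTP server (sketch):
// import http from 'node:http';
// const checker = new HealthChecker('email-queue');
// http.createServer(async (req, res) => {
//   const result = await checker.check();
//   res.writeHead(result.status === 'healthy' ? 200 : 503);
//   res.end(JSON.stringify(result));
// }).listen(8080);
```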

Advanced Patterns

Job Chaining

// Chain jobs together
const processDataWorker = new Worker('process-data', async (job) => {
  const { userId, data } = job.data;
  
  // Process the data
  const processedData = await processUserData(data);
  
  // Chain to next job
  await emailQueue.add('send-processing-complete', {
    to: await getUserEmail(userId),
    subject: 'Data processing complete',
    html: `<p>Your data has been processed successfully.</p>`,
  });
  
  return processedData;
}, {
  connection: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
  },
});

Bulk Job Processing

// Add multiple jobs at once
const jobs = users.map(user => ({
  name: 'send-newsletter',
  data: {
    to: user.email,
    subject: 'Weekly Newsletter',
    html: generateNewsletterHTML(user),
  },
}));

await emailQueue.addBulk(jobs);
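
For very large recipient lists, it can help to split the addBulk call into batches so no single Redis pipeline grows unbounded. A sketch, where chunk is a hypothetical helper:

```javascript
// Split an array into fixed-size chunks so each addBulk call stays small.
function chunk(items, size) {
  const out = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// for (const batch of chunk(jobs, 500)) {
//   await emailQueue.addBulk(batch);
// }
```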

Flow Control

import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({
  connection: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
  },
});

// Create job flow
await flowProducer.add({
  name: 'process-order',
  queueName: 'order-processing',
  data: { orderId: 123 },
  children: [
    {
      name: 'validate-payment',
      queueName: 'payment-validation',
      data: { orderId: 123 },
    },
    {
      name: 'update-inventory',
      queueName: 'inventory-management',
      data: { orderId: 123 },
    },
    {
      name: 'send-confirmation',
      queueName: 'email-queue',
      data: { orderId: 123 },
    },
  ],
});
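
In a flow, the children run first and the parent job only starts once all of them succeed; the parent's processor can read the children's return values with job.getChildrenValues(), which resolves to a map of child job keys to returned data. A sketch — mergeChildValues is a hypothetical helper that flattens that map:

```javascript
// Flatten the map returned by job.getChildrenValues()
// (child job key → returned data) into a single summary object.
function mergeChildValues(childValues) {
  return Object.values(childValues).reduce(
    (acc, value) => ({ ...acc, ...value }),
    {},
  );
}

// Inside the 'order-processing' worker for the flow above (sketch):
// const childValues = await job.getChildrenValues();
// const summary = mergeChildValues(childValues);
// return { orderId: job.data.orderId, ...summary };
```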

Configuration and Scaling

Environment Configuration

# Worker-specific configuration
BULLMQ_EMAIL_CONCURRENCY=10
BULLMQ_IMAGE_CONCURRENCY=5
BULLMQ_DATA_CONCURRENCY=2

# Redis configuration
BULLMQ_REDIS_HOST=redis
BULLMQ_REDIS_PORT=6379
BULLMQ_REDIS_PASSWORD=secure_password
BULLMQ_REDIS_DB=1

# Monitoring
BULLMQ_DASHBOARD_ENABLED=true
BULLMQ_DASHBOARD_PORT=3001
BULLMQ_DASHBOARD_AUTH=admin:password123

# Job retention
BULLMQ_COMPLETED_RETENTION=100
BULLMQ_FAILED_RETENTION=50

Horizontal Scaling

# Scale workers independently
BULLMQ_EMAIL_REPLICAS=3
BULLMQ_IMAGE_REPLICAS=2
BULLMQ_DATA_REPLICAS=1

# Rebuild and restart
nself build && nself restart

CLI Commands

Worker Management

# Generate new worker
nself generate bullmq --name pdf-generator

# View worker logs
nself logs bullmq --worker email-worker --follow

# Restart specific worker
nself restart bullmq --worker image-processor

# Scale worker
nself scale bullmq --worker email-worker --replicas 5

Job Management

# View queue status
nself bullmq status --queue email-queue

# Retry failed jobs
nself bullmq retry --queue email-queue --failed

# Clear completed jobs
nself bullmq clean --queue email-queue --completed

# Add test job
nself bullmq add --queue email-queue --data '{"to":"test@example.com"}'

Best Practices

Job Design

  • Idempotent Jobs: Jobs should be safe to retry
  • Small Payloads: Keep job data minimal
  • Timeout Handling: Set appropriate timeouts
  • Progress Updates: Report progress for long-running jobs
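
One simple way to make enqueuing idempotent is a deterministic jobId: BullMQ treats an add() whose jobId already exists in the queue as a no-op, so duplicate producers cannot create duplicate jobs. A sketch, assuming a welcome email keyed by user id (welcomeEmailJobId is a hypothetical helper):

```javascript
import { createHash } from 'node:crypto';

// Deterministic job id: the same logical job always maps to the same id,
// so re-adding it while it is still in the queue is a no-op.
function welcomeEmailJobId(userId) {
  const digest = createHash('sha256').update(String(userId)).digest('hex');
  return `welcome-email:${digest.slice(0, 16)}`;
}

// await emailQueue.add('send-welcome-email', { to, subject, html }, {
//   jobId: welcomeEmailJobId(user.id), // duplicate adds are ignored
// });
```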

Error Handling

  • Graceful Failures: Handle errors without crashing
  • Retry Strategies: Use appropriate retry logic
  • Dead Letter Queues: Handle permanently failed jobs
  • Monitoring: Set up alerts for job failures
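
BullMQ has no built-in dead-letter queue, but one can be sketched on top of the 'failed' event. Note the event fires on every attempt, so only act once retries are exhausted — isFinalFailure below is a hypothetical helper, and deadLetterQueue an assumed ordinary Queue:

```javascript
// Decide whether a failed job has exhausted its configured retries.
// attemptsMade and opts.attempts are standard BullMQ job properties.
function isFinalFailure(job) {
  const maxAttempts = job.opts?.attempts ?? 1;
  return job.attemptsMade >= maxAttempts;
}

// emailWorker.on('failed', async (job, err) => {
//   if (job && isFinalFailure(job)) {
//     await deadLetterQueue.add('email-dead-letter', {
//       original: job.data,
//       error: err.message,
//     });
//   }
// });
```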

Next Steps

Now that you understand BullMQ workers, you can generate additional workers, tune concurrency per queue, and scale replicas as your job volume grows.

BullMQ workers provide a robust foundation for asynchronous processing and background jobs in your nself applications.