Bỏ qua đến nội dung chính
background jobsqueueworkerBullMQRedis

Background Jobs: Queue, Worker, Retry — Hướng Dẫn Đầy Đủ

Background jobs queue worker thực chiến: BullMQ, retry với exponential backoff, dead letter queue, scheduled job. Code Node.js production-ready.

Xuất bản 9 phút đọc

Mọi backend production đều cần background jobs — gửi email, generate PDF, retry webhook, scheduled task. Bài này đi qua đầy đủ pattern + code BullMQ production-ready.

Vì sao cần job queue?

3 lý do chính:

  • Latency: HTTP request không nên > 1-2 giây. Generate PDF 10 giây → enqueue + trả 202 Accepted ngay.
  • Retry: external API (Stripe, SendGrid) có thể fail tạm thời. Queue retry với backoff giúp 99% case eventually success.
  • Rate limit: SendGrid limit 100 email/giây. Queue chia đều, không spike.

Setup BullMQ

npm install bullmq ioredis
// queue.js
import { Queue, Worker } from 'bullmq'
import IORedis from 'ioredis'

const connection = new IORedis(process.env.REDIS_URL, {
  maxRetriesPerRequest: null,  // BullMQ yêu cầu null
})

// Queue producer (gọi từ HTTP handler)
export const emailQueue = new Queue('email', { connection })

// Worker consumer (chạy process riêng)
export const emailWorker = new Worker('email', async (job) => {
  await sendEmail(job.data.to, job.data.subject, job.data.body)
}, {
  connection,
  concurrency: 10,                          // 10 job song song
  limiter: { max: 100, duration: 1000 },    // rate limit 100/giây
})

Enqueue từ HTTP handler

app.post('/users/:id/welcome', async (req, res) => {
  const user = await db.users.findById(req.params.id)

  await emailQueue.add('welcome', {
    to: user.email,
    subject: 'Chào mừng đến Alodev',
    body: renderWelcomeTemplate(user),
  }, {
    attempts: 5,                                  // retry tối đa 5 lần
    backoff: { type: 'exponential', delay: 2000 }, // 2s, 4s, 8s, 16s, 32s
    removeOnComplete: { count: 1000 },             // giữ 1000 job done
    removeOnFail: { count: 5000 },                 // giữ 5000 job fail
  })

  res.status(202).json({ status: 'queued' })
})

Worker với error handling

const worker = new Worker('email', async (job) => {
  // Idempotency: check trạng thái trước
  const sent = await db.emailLog.findByJobId(job.id)
  if (sent) return { status: 'already_sent', id: sent.id }

  try {
    const result = await sendgrid.send({
      to: job.data.to,
      from: 'no-reply@alodev.vn',
      subject: job.data.subject,
      html: job.data.body,
    })
    await db.emailLog.insert({ job_id: job.id, sg_id: result[0].headers['x-message-id'] })
    return { status: 'sent', id: result[0].headers['x-message-id'] }
  } catch (err) {
    if (err.code === 429) {
      // Rate limit từ SendGrid — throw để retry
      throw err
    }
    if (err.code === 400) {
      // Email invalid — không retry, log permanent fail
      job.discard()
      throw new Error(`InvalidEmail: ${err.message}`)
    }
    throw err
  }
}, { connection })

worker.on('completed', (job) => logger.info({ jobId: job.id }, 'done'))
worker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'failed'))

Retry strategy: exponential + jitter

BullMQ backoff: { type: 'exponential', delay: 2000 } cho: 2s, 4s, 8s, 16s, 32s. Đủ cho transient error.

Custom với jitter để tránh thundering herd:

await emailQueue.add('welcome', data, {
  attempts: 6,
  backoff: {
    type: 'custom',
  },
})

// Trong Worker setup:
const worker = new Worker('email', handler, {
  settings: {
    backoffStrategy: (attemptsMade) => {
      const base = 2000 * Math.pow(2, attemptsMade)
      const jitter = Math.random() * base * 0.3
      return Math.min(3600_000, base + jitter)  // cap 1h
    },
  },
})

Dead Letter Queue

Sau attempts exhausted, BullMQ giữ job ở failed state. Anh có thể list, replay manual:

// Move failed jobs sang DLQ riêng
const dlq = new Queue('email-dlq', { connection })
worker.on('failed', async (job, err) => {
  if (job.attemptsMade >= job.opts.attempts) {
    await dlq.add('failed', { ...job.data, original_id: job.id, error: err.message })
  }
})

// Admin UI: list failed jobs
const failed = await emailQueue.getJobs(['failed'], 0, 100)
// Manual retry sau khi fix bug
for (const j of failed) await j.retry()

Scheduled job (delayed)

// Gửi reminder sau 24h
await reminderQueue.add('reminder', { user_id: 42 }, {
  delay: 24 * 3600_000,
})

// Cron repeating
await reportQueue.add('daily-report', {}, {
  repeat: { pattern: '0 6 * * *', tz: 'Asia/Ho_Chi_Minh' },  // 6h sáng mỗi ngày
})

Lưu ý cron: nếu worker xuống lúc 6h, job mất chạy hôm đó. BullMQ > 5.x có repeat strategy "missed" → catch-up. Hoặc dùng @nestjs/schedule + lock pattern.

Priority + concurrency

// Job priority — số nhỏ chạy trước
await queue.add('vip-job', data, { priority: 1 })       // VIP
await queue.add('normal-job', data, { priority: 100 })  // thường

// Concurrency: số job cùng lúc
const worker = new Worker('email', handler, {
  connection,
  concurrency: 50,        // 50 job parallel trên 1 worker
})

// Scale ngang: chạy nhiều process worker — tổng concurrency = process × per-process concurrency

Monitoring với Bull Board

npm install @bull-board/api @bull-board/express
import { createBullBoard } from '@bull-board/api'
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter.js'
import { ExpressAdapter } from '@bull-board/express'

const expressAdapter = new ExpressAdapter()
expressAdapter.setBasePath('/admin/queues')
createBullBoard({
  queues: [new BullMQAdapter(emailQueue), new BullMQAdapter(reminderQueue)],
  serverAdapter: expressAdapter,
})

app.use('/admin/queues', requireAdmin, expressAdapter.getRouter())

Dashboard show: pending, active, completed, failed counts; retry, remove, view payload; chart QPS.

5 pitfall thường gặp

  • Long-running job block worker: chia nhỏ + checkpoint. Job > 5 phút thì split.
  • Memory leak trong handler: stream lớn, không close connection — Node process phình. Restart worker định kỳ (PM2 max-memory-restart).
  • Không idempotent: retry = duplicate action. Mọi handler phải idempotent (xem Idempotency keys).
  • Không monitor: queue backlog 100k mà không biết → user complain trước anh thấy. Alert khi queue depth > threshold.
  • Redis là single point of failure: Sentinel/Cluster + AOF persistence là minimum.

Kết luận

Background jobs là backbone của mọi backend production. BullMQ + Redis là combo chuẩn cho Node.js, đủ chạy hàng triệu job/ngày. Bắt đầu đơn giản, thêm DLQ + monitoring khi growth — đừng over-engineer ngày đầu. Tham khảo Observability để theo dõi job health đúng cách trong production.

Zalo