Every production backend needs background jobs: sending email, generating PDFs, retrying webhooks, running scheduled tasks. This article walks through the common patterns with production-oriented BullMQ code.
Why use a job queue?
Three main reasons:
- Latency: an HTTP request should not take longer than 1-2 seconds. If generating a PDF takes 10 seconds, enqueue the work and return 202 Accepted immediately.
- Retry: external APIs (Stripe, SendGrid) can fail temporarily. Retrying from the queue with backoff lets the vast majority of transient failures eventually succeed.
- Rate limiting: if a provider such as SendGrid caps you at, say, 100 emails/second, the queue spreads sends evenly instead of spiking.
Setting up BullMQ
npm install bullmq ioredis
// queue.js
import { Queue, Worker } from 'bullmq'
import IORedis from 'ioredis'
const connection = new IORedis(process.env.REDIS_URL, {
  maxRetriesPerRequest: null, // BullMQ workers require this to be null
})
// Queue producer (called from the HTTP handler)
export const emailQueue = new Queue('email', { connection })
// Worker consumer (run it in a separate process; in a real project, keep it in
// its own module so that importing the queue does not also start a worker)
export const emailWorker = new Worker('email', async (job) => {
  // sendEmail is your own mail helper; it is not defined in this snippet
  await sendEmail(job.data.to, job.data.subject, job.data.body)
}, {
  connection,
  concurrency: 10, // up to 10 jobs in parallel
  limiter: { max: 100, duration: 1000 }, // rate limit: 100 jobs per second
})
Enqueueing from an HTTP handler
app.post('/users/:id/welcome', async (req, res) => {
  const user = await db.users.findById(req.params.id)
  if (!user) return res.status(404).json({ error: 'user not found' })
  await emailQueue.add('welcome', {
    to: user.email,
    subject: 'Chào mừng đến Alodev',
    body: renderWelcomeTemplate(user),
  }, {
    attempts: 5, // 1 initial try + up to 4 retries
    backoff: { type: 'exponential', delay: 2000 }, // retry delays: 2s, 4s, 8s, 16s
    removeOnComplete: { count: 1000 }, // keep the latest 1000 completed jobs
    removeOnFail: { count: 5000 }, // keep the latest 5000 failed jobs
  })
  res.status(202).json({ status: 'queued' })
})
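A double-click or a client retry can hit this endpoint twice and enqueue two welcome emails. One possible guard is to pass a deterministic jobId in the job options, since BullMQ ignores an add whose id already exists in the queue. The sketch below only builds such an id. welcomeJobId is a hypothetical helper name, and it avoids ':' because BullMQ uses that character as its Redis key separator.

```javascript
// Hypothetical helper: build a stable job id so the same logical job
// is not enqueued again while the earlier one still exists in the queue.
function welcomeJobId(userId) {
  // Replace anything outside [A-Za-z0-9_-]; in particular ':' is avoided
  // because BullMQ uses it as the Redis key separator.
  const safe = String(userId).replace(/[^A-Za-z0-9_-]/g, '_')
  return `welcome-${safe}`
}

// Usage inside the handler (sketch, payload as in the handler above):
// await emailQueue.add('welcome', payload, { jobId: welcomeJobId(user.id), attempts: 5 })
console.log(welcomeJobId(42)) // welcome-42
```

Once the earlier job has been removed (for example by removeOnComplete), the same id can be enqueued again, so this complements an idempotent worker rather than replacing it.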
Worker with error handling
import { Worker, UnrecoverableError } from 'bullmq'
// sendgrid, db, logger and connection are assumed to be set up elsewhere
const worker = new Worker('email', async (job) => {
  // Idempotency: skip the send if this job was already logged as sent
  const sent = await db.emailLog.findByJobId(job.id)
  if (sent) return { status: 'already_sent', id: sent.id }
  try {
    const result = await sendgrid.send({
      to: job.data.to,
      from: 'no-reply@alodev.vn',
      subject: job.data.subject,
      html: job.data.body,
    })
    const messageId = result[0].headers['x-message-id']
    await db.emailLog.insert({ job_id: job.id, sg_id: messageId })
    return { status: 'sent', id: messageId }
  } catch (err) {
    if (err.code === 429) {
      // Rate limited by SendGrid: rethrow so BullMQ retries with backoff
      throw err
    }
    if (err.code === 400) {
      // Invalid email: retrying cannot help, so fail permanently.
      // UnrecoverableError tells BullMQ to skip the remaining attempts.
      throw new UnrecoverableError(`InvalidEmail: ${err.message}`)
    }
    throw err // anything else: treat as transient and retry
  }
}, { connection })
worker.on('completed', (job) => logger.info({ jobId: job.id }, 'done'))
worker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'failed'))
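The retry-or-give-up decision in the handler can be pulled into a small pure function, which makes it easy to unit test. This is a minimal sketch, assuming err.code carries the HTTP status as in the handler above. classifySendError is a hypothetical name, and treating every 4xx other than 429 as permanent is an assumption that goes beyond the single 400 case in the handler.

```javascript
// Map a mail-provider error to a retry decision.
// Assumption: err.code holds the HTTP status code when there is one.
function classifySendError(err) {
  const status = Number(err && err.code)
  if (status === 429) return 'retry' // rate limited: back off and retry
  if (status >= 500) return 'retry' // provider-side failure
  if (status >= 400 && status < 500) return 'permanent' // bad request: retrying cannot help
  return 'retry' // network errors, timeouts, unknown codes
}

console.log(classifySendError({ code: 429 })) // retry
console.log(classifySendError({ code: 400 })) // permanent
console.log(classifySendError({ code: 'ECONNRESET' })) // retry
```

In the worker, a 'permanent' result would lead to a non-retryable failure and a 'retry' result would simply rethrow the error.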
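Retry strategy: exponential + jitter
The built-in backoff: { type: 'exponential', delay: 2000 } yields retry delays of 2s, 4s, 8s, 16s, 32s, and so on, one per retry (so attempts: 6 produces those five delays). That is enough for most transient errors.
Use a custom strategy with jitter to avoid a thundering herd:
await emailQueue.add('welcome', data, {
  attempts: 6,
  backoff: {
    type: 'custom', // delegate to the worker's backoffStrategy
  },
})
// In the Worker setup:
const worker = new Worker('email', handler, {
  connection,
  settings: {
    // attemptsMade is 1 on the first retry
    backoffStrategy: (attemptsMade) => {
      const base = 2000 * Math.pow(2, attemptsMade - 1) // 2s, 4s, 8s, ...
      const jitter = Math.random() * base * 0.3 // up to +30%
      return Math.min(3600_000, base + jitter) // cap at 1 hour
    },
  },
})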
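To see what schedule a jittered strategy produces, the same arithmetic can be written as a standalone function with an injectable random source. This is a sketch under one assumption: attemptsMade is 1 on the first retry, so the exponent is attemptsMade - 1 and the first delay is 2s. backoffWithJitter is a hypothetical name.

```javascript
// Delay in ms before retry number `attemptsMade` (1 = first retry).
// `random` is injectable so the function can be tested deterministically.
function backoffWithJitter(attemptsMade, random = Math.random) {
  const base = 2000 * 2 ** (attemptsMade - 1) // 2s, 4s, 8s, ...
  const jitter = random() * base * 0.3 // between 0 and +30% of base
  return Math.min(3_600_000, base + jitter) // never wait longer than 1 hour
}

// With jitter disabled, the schedule matches plain exponential backoff.
const noJitter = () => 0
const schedule = [1, 2, 3, 4, 5].map((n) => backoffWithJitter(n, noJitter))
console.log(schedule) // [ 2000, 4000, 8000, 16000, 32000 ]
```

Because each job draws its own jitter, jobs that failed at the same moment retry at slightly different times instead of hitting the provider together.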
Dead Letter Queue
Once attempts are exhausted, BullMQ keeps the job in the failed state. You can list those jobs and replay them manually:
// Copy jobs that used up their attempts into a separate DLQ
const dlq = new Queue('email-dlq', { connection })
worker.on('failed', async (job, err) => {
  if (!job) return // the event may fire without a job object
  const maxAttempts = job.opts.attempts ?? 1
  // Note: jobs that fail permanently before using all attempts do not pass this check
  if (job.attemptsMade >= maxAttempts) {
    await dlq.add('failed', { ...job.data, original_id: job.id, error: err.message })
  }
})
// Admin UI: list failed jobs
const failed = await emailQueue.getJobs(['failed'], 0, 100)
// Manual retry after the bug is fixed
for (const j of failed) await j.retry()
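The dead-letter decision and the DLQ payload can be kept as two small pure functions so they are testable without Redis. This is a sketch: shouldDeadLetter and toDlqPayload are hypothetical names, the job objects only mimic the fields used here (id, data, opts.attempts, attemptsMade), and the attempts_made field is an illustrative addition.

```javascript
// True when the job has used up all of its configured attempts.
function shouldDeadLetter(job) {
  if (!job) return false // the failed event may arrive without a job
  const maxAttempts = (job.opts && job.opts.attempts) || 1 // no attempts option means a single try
  return job.attemptsMade >= maxAttempts
}

// Payload stored in the DLQ: original data plus enough context to debug and replay.
function toDlqPayload(job, err) {
  return {
    ...job.data,
    original_id: job.id,
    error: err.message,
    attempts_made: job.attemptsMade, // illustrative extra field
  }
}

const exhausted = { id: '7', data: { to: 'a@example.com' }, opts: { attempts: 5 }, attemptsMade: 5 }
console.log(shouldDeadLetter(exhausted)) // true
console.log(toDlqPayload(exhausted, new Error('timeout')).original_id) // 7
```

Inside the failed listener this would read: if (shouldDeadLetter(job)) await dlq.add('failed', toDlqPayload(job, err)).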
Scheduled (delayed) jobs
// Send a reminder after 24 hours
await reminderQueue.add('reminder', { user_id: 42 }, {
  delay: 24 * 3600_000,
})
// Repeating cron job
await reportQueue.add('daily-report', {}, {
  repeat: { pattern: '0 6 * * *', tz: 'Asia/Ho_Chi_Minh' }, // 06:00 every day
})
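A note on cron: the scheduled run waits in Redis as a delayed job, so if no worker is up at 06:00 it does not run on time; it runs late, once a worker comes back. Runs skipped during a longer outage are not replayed automatically. If every scheduled run must happen, record the last successful run and catch up yourself, or use a scheduler such as @nestjs/schedule with a lock pattern. Check the BullMQ documentation for your version before relying on any built-in catch-up behavior.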
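If you implement catch-up yourself, one approach is to store the scheduled time of the last successful run and, at the start of each run, count how many schedule slots went by without one. A minimal sketch, assuming a fixed 24-hour period (reasonable for Asia/Ho_Chi_Minh, which has no daylight saving time). missedRuns is a hypothetical helper, and where lastRunAt is stored is up to you.

```javascript
const DAY_MS = 24 * 3600_000

// Number of schedule slots that passed without a run.
// Call it at the start of the current run: the current run covers the most
// recent slot, so that one is not counted as missed.
// lastRunAt and now are epoch milliseconds.
function missedRuns(lastRunAt, now, periodMs = DAY_MS) {
  if (now <= lastRunAt) return 0
  const elapsedSlots = Math.floor((now - lastRunAt) / periodMs)
  return Math.max(0, elapsedSlots - 1)
}

// Last run on day 0 at 06:00; worker comes back on day 3 at 09:00.
console.log(missedRuns(0, 3 * DAY_MS + 3 * 3600_000)) // 2
```

The handler could then generate the reports for the missed days before producing the current one.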
Priority + concurrency
// Job priority: lower numbers run first (jobs without a priority run before prioritized ones)
await queue.add('vip-job', data, { priority: 1 }) // VIP
await queue.add('normal-job', data, { priority: 100 }) // normal
// Concurrency: number of jobs processed at the same time
const worker = new Worker('email', handler, {
  connection,
  concurrency: 50, // 50 jobs in parallel on one worker
})
// Horizontal scaling: run several worker processes; total concurrency = processes × per-process concurrency
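The concurrency formula gives a quick way to estimate a throughput ceiling before adding processes. This is a rough sketch, not a benchmark: maxThroughput is a hypothetical helper, avgJobMs is a number you would measure yourself, and it assumes the limiter applies across all workers of the queue, which is how BullMQ documents its rate limiter.

```javascript
// Rough upper bound on jobs per second for one queue.
function maxThroughput({ processes, concurrency, avgJobMs, limiter }) {
  const slots = processes * concurrency // total jobs in flight
  const byConcurrency = slots / (avgJobMs / 1000) // jobs/second if every slot stays busy
  if (!limiter) return byConcurrency
  const byLimiter = limiter.max / (limiter.duration / 1000) // jobs/second allowed by the limiter
  return Math.min(byConcurrency, byLimiter)
}

console.log(maxThroughput({
  processes: 4,
  concurrency: 50,
  avgJobMs: 500,
  limiter: { max: 100, duration: 1000 },
})) // 100
```

In this example the limiter, not the concurrency, is the bottleneck, so adding more worker processes would not increase throughput.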
Monitoring with Bull Board
npm install @bull-board/api @bull-board/express
import { createBullBoard } from '@bull-board/api'
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter.js'
import { ExpressAdapter } from '@bull-board/express'
const expressAdapter = new ExpressAdapter()
expressAdapter.setBasePath('/admin/queues')
createBullBoard({
  queues: [new BullMQAdapter(emailQueue), new BullMQAdapter(reminderQueue)],
  serverAdapter: expressAdapter,
})
// requireAdmin is your own auth middleware; never expose the dashboard publicly
app.use('/admin/queues', requireAdmin, expressAdapter.getRouter())
The dashboard shows pending, active, completed, and failed counts; lets you retry, remove, and inspect job payloads; and charts throughput (QPS).
5 common pitfalls
- Long-running jobs block the worker: split the work into smaller jobs with checkpoints. If a job takes more than 5 minutes, split it.
- Memory leaks in the handler: large streams or connections that are never closed make the Node process balloon. Restart workers periodically or on a memory threshold (PM2 max_memory_restart).
- Non-idempotent handlers: a retry becomes a duplicate action. Every handler must be idempotent (see Idempotency keys).
- No monitoring: the backlog reaches 100k jobs without anyone noticing, and users complain before you see it. Alert when queue depth exceeds a threshold.
- Redis is a single point of failure: Sentinel or Cluster plus AOF persistence is the minimum.
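The queue-depth alert from the monitoring pitfall can start as a threshold check over the job counts. A minimal sketch: backlogAlerts is a hypothetical helper, the thresholds are illustrative, and counts is assumed to have the shape returned by BullMQ's queue.getJobCounts(), an object keyed by job state.

```javascript
// Compare job counts against thresholds and return the alerts to raise.
// counts: e.g. { waiting: 12, delayed: 0, failed: 3 }
// thresholds: e.g. { waiting: 10000, failed: 100 }
function backlogAlerts(counts, thresholds) {
  const alerts = []
  for (const [state, limit] of Object.entries(thresholds)) {
    const value = counts[state] ?? 0 // a missing state counts as zero
    if (value > limit) alerts.push({ state, value, limit })
  }
  return alerts
}

// Usage (sketch), polled on an interval from a monitoring process:
// const counts = await emailQueue.getJobCounts('waiting', 'delayed', 'failed')
// const alerts = backlogAlerts(counts, { waiting: 10_000, failed: 100 })
// if (alerts.length) notifyOnCall(alerts) // notifyOnCall is hypothetical
console.log(backlogAlerts({ waiting: 120_000, failed: 3 }, { waiting: 10_000, failed: 100 }))
```

Keeping the comparison separate from the Redis call means the thresholds can be tuned and tested without a running queue.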
Conclusion
Background jobs are the backbone of every production backend. BullMQ + Redis is the standard combination for Node.js and can handle millions of jobs per day. Start simple, then add a DLQ and monitoring as you grow; do not over-engineer on day one. See Observability for how to track job health properly in production.