bullmq
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseBullMQ
BullMQ
Redis-backed queue system for Node.js. Four core classes: , , , .
QueueWorkerQueueEventsFlowProducer基于Redis的Node.js队列系统。包含四个核心类:、、、。
QueueWorkerQueueEventsFlowProducerTable of Contents
目录
Install
安装
yarn add bullmqmaxmemory-policy=noevictionyarn add bullmqmaxmemory-policy=noevictionQuick Start
快速开始
ts
import { Queue, Worker, QueueEvents } from "bullmq";
// --- Producer ---
const queue = new Queue("my-queue", {
connection: { host: "localhost", port: 6379 },
});
await queue.add("job-name", { foo: "bar" });
// --- Consumer ---
const worker = new Worker(
"my-queue",
async (job) => {
// process job
await job.updateProgress(50);
return { result: "done" };
},
{ connection: { host: "localhost", port: 6379 } },
);
worker.on("completed", (job, returnvalue) => {
console.log(`${job.id} completed with`, returnvalue);
});
worker.on("failed", (job, err) => {
console.error(`${job.id} failed with`, err.message);
});
// IMPORTANT: always attach an error handler
worker.on("error", (err) => {
console.error(err);
});
// --- Global event listener (all workers) ---
const queueEvents = new QueueEvents("my-queue", {
connection: { host: "localhost", port: 6379 },
});
queueEvents.on("completed", ({ jobId, returnvalue }) => {
console.log(`Job ${jobId} completed`);
});
queueEvents.on("failed", ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed: ${failedReason}`);
});ts
import { Queue, Worker, QueueEvents } from "bullmq";
// --- 生产者 ---
const queue = new Queue("my-queue", {
connection: { host: "localhost", port: 6379 },
});
await queue.add("job-name", { foo: "bar" });
// --- 消费者 ---
const worker = new Worker(
"my-queue",
async (job) => {
// 处理作业
await job.updateProgress(50);
return { result: "done" };
},
{ connection: { host: "localhost", port: 6379 } },
);
worker.on("completed", (job, returnvalue) => {
console.log(`${job.id} 处理完成,结果为`, returnvalue);
});
worker.on("failed", (job, err) => {
console.error(`${job.id} 处理失败,错误为`, err.message);
});
// 重要提示:务必添加错误处理程序
worker.on("error", (err) => {
console.error(err);
});
// --- 全局事件监听器(所有工作器) ---
const queueEvents = new QueueEvents("my-queue", {
connection: { host: "localhost", port: 6379 },
});
queueEvents.on("completed", ({ jobId, returnvalue }) => {
console.log(`作业 ${jobId} 处理完成`);
});
queueEvents.on("failed", ({ jobId, failedReason }) => {
console.error(`作业 ${jobId} 处理失败:${failedReason}`);
});Connections
连接
BullMQ uses ioredis internally. Pass options or an existing ioredis instance.
connectionts
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";
// Option 1: connection config (new connection per instance)
const queue = new Queue("q", {
connection: { host: "redis.example.com", port: 6379 },
});
// Option 2: reuse ioredis instance (Queue and multiple Queues can share)
const connection = new Redis();
const q1 = new Queue("q1", { connection });
const q2 = new Queue("q2", { connection });
// Option 3: reuse for Workers (BullMQ internally duplicates for blocking)
const workerConn = new Redis({ maxRetriesPerRequest: null });
const w1 = new Worker("q1", async (job) => {}, { connection: workerConn });Critical rules:
- Workers REQUIRE on the ioredis instance. BullMQ enforces this and will warn/throw if not set.
maxRetriesPerRequest: null - Do NOT use ioredis option — use BullMQ's
keyPrefixoption instead.prefix - cannot share connections (uses blocking Redis commands).
QueueEvents - Redis MUST have .
maxmemory-policy=noeviction
BullMQ内部使用ioredis。可以传入选项或已有的ioredis实例。
connectionts
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";
// 选项1:连接配置(每个实例创建新连接)
const queue = new Queue("q", {
connection: { host: "redis.example.com", port: 6379 },
});
// 选项2:复用ioredis实例(Queue和多个Queue可以共享)
const connection = new Redis();
const q1 = new Queue("q1", { connection });
const q2 = new Queue("q2", { connection });
// 选项3:为Workers复用连接(BullMQ内部会为阻塞操作复制连接)
const workerConn = new Redis({ maxRetriesPerRequest: null });
const w1 = new Worker("q1", async (job) => {}, { connection: workerConn });关键规则:
- Workers的ioredis实例必须设置。BullMQ会强制执行此设置,若未设置会发出警告或抛出错误。
maxRetriesPerRequest: null - 不要使用ioredis的选项 — 请改用BullMQ的
keyPrefix选项。prefix - 无法共享连接(使用Redis阻塞命令)。
QueueEvents - Redis必须设置。
maxmemory-policy=noeviction
Queue
队列
ts
const queue = new Queue("paint", { connection });
// Add a job
await queue.add("job-name", { color: "red" });
// Add with options
await queue.add(
"job-name",
{ color: "blue" },
{
delay: 5000, // wait 5s before processing
priority: 1, // lower = higher priority (0 is highest, max 2^21)
attempts: 3, // retry up to 3 times
backoff: { type: "exponential", delay: 1000 },
removeOnComplete: true, // or { count: 100 } to keep last 100
removeOnFail: 1000, // keep last 1000 failed jobs
},
);
// Add bulk
await queue.addBulk([
{ name: "job1", data: { x: 1 } },
{ name: "job2", data: { x: 2 }, opts: { priority: 1 } },
]);
// Queue operations
await queue.pause();
await queue.resume();
await queue.obliterate({ force: true }); // remove all data
await queue.close();ts
const queue = new Queue("paint", { connection });
// 添加作业
await queue.add("job-name", { color: "red" });
// 带选项添加作业
await queue.add(
"job-name",
{ color: "blue" },
{
delay: 5000, // 等待5秒后处理
priority: 1, // 值越小优先级越高(0为最高,最大值2^21)
attempts: 3, // 最多重试3次
backoff: { type: "exponential", delay: 1000 },
removeOnComplete: true, // 或设置{ count: 100 }保留最后100个完成的作业
removeOnFail: 1000, // 保留最后1000个失败的作业
},
);
// 批量添加作业
await queue.addBulk([
{ name: "job1", data: { x: 1 } },
{ name: "job2", data: { x: 2 }, opts: { priority: 1 } },
]);
// 队列操作
await queue.pause();
await queue.resume();
await queue.obliterate({ force: true }); // 删除所有数据
await queue.close();Worker
工作器
ts
const worker = new Worker<MyData, MyReturn>(
"paint",
async (job) => {
await job.updateProgress(42);
return { cost: 100 };
},
{
connection,
concurrency: 5, // process 5 jobs concurrently
autorun: false, // don't start immediately
},
);
worker.run(); // start when ready
// Update concurrency at runtime
worker.concurrency = 10;Processor receives 3 args: — signal is an for cancellation support.
(job, token?, signal?)AbortSignalts
const worker = new Worker<MyData, MyReturn>(
"paint",
async (job) => {
await job.updateProgress(42);
return { cost: 100 };
},
{
connection,
concurrency: 5, // 同时处理5个作业
autorun: false, // 不立即启动
},
);
worker.run(); // 准备就绪后启动
// 运行时更新并发数
worker.concurrency = 10;处理器接收3个参数: — signal是用于取消支持的。
(job, token?, signal?)AbortSignalTypeScript Generics
TypeScript泛型
ts
interface JobData {
color: string;
}
interface JobReturn {
cost: number;
}
const queue = new Queue<JobData, JobReturn>("paint");
const worker = new Worker<JobData, JobReturn>("paint", async (job) => {
// job.data is typed as JobData
return { cost: 100 }; // must match JobReturn
});ts
interface JobData {
color: string;
}
interface JobReturn {
cost: number;
}
const queue = new Queue<JobData, JobReturn>("paint");
const worker = new Worker<JobData, JobReturn>("paint", async (job) => {
// job.data会被推断为JobData类型
return { cost: 100 }; // 必须匹配JobReturn类型
});Events
事件
Worker events (local to that worker instance):
| Event | Callback signature |
|---|---|
| |
| |
| |
| |
| |
QueueEvents (global, all workers, uses Redis Streams):
| Event | Callback signature |
|---|---|
| |
| |
| |
| |
| |
| |
| |
Event stream is auto-trimmed (~10,000 events). Configure via .
streams.events.maxLenWorker事件(仅针对当前Worker实例):
| 事件名称 | 回调签名 |
|---|---|
| |
| |
| |
| |
| |
QueueEvents(全局事件,针对所有Workers,使用Redis Streams):
| 事件名称 | 回调签名 |
|---|---|
| |
| |
| |
| |
| |
| |
| |
事件流会自动修剪(约保留10,000个事件)。可通过配置。
streams.events.maxLenJob Lifecycle States
作业生命周期状态
add() → wait / prioritized / delayed
↓
active → completed
↓
failed → (retry) → wait/delayedWith FlowProducer: jobs can also be in state until all children complete.
waiting-childrenadd() → 等待 / 优先级队列 / 延迟
↓
活跃 → 完成
↓
失败 → (重试) → 等待/延迟使用FlowProducer时:作业还会处于状态,直到所有子作业完成。
waiting-childrenAdvanced Topics
高级主题
- Job types and options (delayed, prioritized, deduplication, repeatable): See references/job-types-and-options.md
- Flows and schedulers (FlowProducer, parent-child, job schedulers, cron): See references/flows-and-schedulers.md
- Patterns (step jobs, idempotent, throttle, manual rate-limit): See references/patterns.md
- Production (shutdown, Redis config, retries, backoff, monitoring): See references/production.md
- 作业类型与选项(延迟、优先级、去重、可重复):详见 references/job-types-and-options.md
- 流程与调度器(FlowProducer、父子作业、作业调度器、cron):详见 references/flows-and-schedulers.md
- 设计模式(分步作业、幂等、节流、手动速率限制):详见 references/patterns.md
- 生产环境配置(停机、Redis配置、重试、退避、监控):详见 references/production.md