bullmq

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

BullMQ

BullMQ

Redis-backed queue system for Node.js. Four core classes:
Queue
,
Worker
,
QueueEvents
,
FlowProducer
.
基于Redis的Node.js队列系统。包含四个核心类:
Queue
Worker
QueueEvents
FlowProducer

Table of Contents

目录

Install

安装

yarn add bullmq
— requires Redis 5.0+ with
maxmemory-policy=noeviction
.
yarn add bullmq
— 要求Redis 5.0+版本,且
maxmemory-policy=noeviction

Quick Start

快速开始

ts
import { Queue, Worker, QueueEvents } from "bullmq";

// --- Producer ---
const queue = new Queue("my-queue", {
  connection: { host: "localhost", port: 6379 },
});

await queue.add("job-name", { foo: "bar" });

// --- Consumer ---
const worker = new Worker(
  "my-queue",
  async (job) => {
    // process job
    await job.updateProgress(50);
    return { result: "done" };
  },
  { connection: { host: "localhost", port: 6379 } },
);

worker.on("completed", (job, returnvalue) => {
  console.log(`${job.id} completed with`, returnvalue);
});

worker.on("failed", (job, err) => {
  console.error(`${job.id} failed with`, err.message);
});

// IMPORTANT: always attach an error handler
worker.on("error", (err) => {
  console.error(err);
});

// --- Global event listener (all workers) ---
const queueEvents = new QueueEvents("my-queue", {
  connection: { host: "localhost", port: 6379 },
});

queueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed`);
});

queueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed: ${failedReason}`);
});
ts
import { Queue, Worker, QueueEvents } from "bullmq";

// --- 生产者 ---
const queue = new Queue("my-queue", {
  connection: { host: "localhost", port: 6379 },
});

await queue.add("job-name", { foo: "bar" });

// --- 消费者 ---
const worker = new Worker(
  "my-queue",
  async (job) => {
    // 处理作业
    await job.updateProgress(50);
    return { result: "done" };
  },
  { connection: { host: "localhost", port: 6379 } },
);

worker.on("completed", (job, returnvalue) => {
  console.log(`${job.id} 处理完成,结果为`, returnvalue);
});

worker.on("failed", (job, err) => {
  console.error(`${job.id} 处理失败,错误为`, err.message);
});

// 重要提示:务必添加错误处理程序
worker.on("error", (err) => {
  console.error(err);
});

// --- 全局事件监听器(所有工作器) ---
const queueEvents = new QueueEvents("my-queue", {
  connection: { host: "localhost", port: 6379 },
});

queueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`作业 ${jobId} 处理完成`);
});

queueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`作业 ${jobId} 处理失败:${failedReason}`);
});

Connections

连接

BullMQ uses ioredis internally. Pass
connection
options or an existing ioredis instance.
ts
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";

// Option 1: connection config (new connection per instance)
const queue = new Queue("q", {
  connection: { host: "redis.example.com", port: 6379 },
});

// Option 2: reuse ioredis instance (Queue and multiple Queues can share)
const connection = new Redis();
const q1 = new Queue("q1", { connection });
const q2 = new Queue("q2", { connection });

// Option 3: reuse for Workers (BullMQ internally duplicates for blocking)
const workerConn = new Redis({ maxRetriesPerRequest: null });
const w1 = new Worker("q1", async (job) => {}, { connection: workerConn });
Critical rules:
  • Workers REQUIRE
    maxRetriesPerRequest: null
    on the ioredis instance. BullMQ enforces this and will warn/throw if not set.
  • Do NOT use ioredis
    keyPrefix
    option — use BullMQ's
    prefix
    option instead.
  • QueueEvents
    cannot share connections (uses blocking Redis commands).
  • Redis MUST have
    maxmemory-policy=noeviction
    .
BullMQ内部使用ioredis。可以传入
connection
选项或已有的ioredis实例。
ts
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";

// 选项1:连接配置(每个实例创建新连接)
const queue = new Queue("q", {
  connection: { host: "redis.example.com", port: 6379 },
});

// 选项2:复用ioredis实例(Queue和多个Queue可以共享)
const connection = new Redis();
const q1 = new Queue("q1", { connection });
const q2 = new Queue("q2", { connection });

// 选项3:为Workers复用连接(BullMQ内部会为阻塞操作复制连接)
const workerConn = new Redis({ maxRetriesPerRequest: null });
const w1 = new Worker("q1", async (job) => {}, { connection: workerConn });
关键规则:
  • Workers的ioredis实例必须设置
    maxRetriesPerRequest: null
    。BullMQ会强制执行此设置,若未设置会发出警告或抛出错误。
  • 不要使用ioredis的
    keyPrefix
    选项 — 请改用BullMQ的
    prefix
    选项。
  • QueueEvents
    无法共享连接(使用Redis阻塞命令)。
  • Redis必须设置
    maxmemory-policy=noeviction

Queue

队列

ts
const queue = new Queue("paint", { connection });

// Add a job
await queue.add("job-name", { color: "red" });

// Add with options
await queue.add(
  "job-name",
  { color: "blue" },
  {
    delay: 5000, // wait 5s before processing
    priority: 1, // lower = higher priority (0 is highest, max 2^21)
    attempts: 3, // retry up to 3 times
    backoff: { type: "exponential", delay: 1000 },
    removeOnComplete: true, // or { count: 100 } to keep last 100
    removeOnFail: 1000, // keep last 1000 failed jobs
  },
);

// Add bulk
await queue.addBulk([
  { name: "job1", data: { x: 1 } },
  { name: "job2", data: { x: 2 }, opts: { priority: 1 } },
]);

// Queue operations
await queue.pause();
await queue.resume();
await queue.obliterate({ force: true }); // remove all data
await queue.close();
ts
const queue = new Queue("paint", { connection });

// 添加作业
await queue.add("job-name", { color: "red" });

// 带选项添加作业
await queue.add(
  "job-name",
  { color: "blue" },
  {
    delay: 5000, // 等待5秒后处理
    priority: 1, // 值越小优先级越高(0为最高,最大值2^21)
    attempts: 3, // 最多重试3次
    backoff: { type: "exponential", delay: 1000 },
    removeOnComplete: true, // 或设置{ count: 100 }保留最后100个完成的作业
    removeOnFail: 1000, // 保留最后1000个失败的作业
  },
);

// 批量添加作业
await queue.addBulk([
  { name: "job1", data: { x: 1 } },
  { name: "job2", data: { x: 2 }, opts: { priority: 1 } },
]);

// 队列操作
await queue.pause();
await queue.resume();
await queue.obliterate({ force: true }); // 删除所有数据
await queue.close();

Worker

工作器

ts
const worker = new Worker<MyData, MyReturn>(
  "paint",
  async (job) => {
    await job.updateProgress(42);
    return { cost: 100 };
  },
  {
    connection,
    concurrency: 5, // process 5 jobs concurrently
    autorun: false, // don't start immediately
  },
);

worker.run(); // start when ready

// Update concurrency at runtime
worker.concurrency = 10;
Processor receives 3 args:
(job, token?, signal?)
— signal is an
AbortSignal
for cancellation support.
ts
const worker = new Worker<MyData, MyReturn>(
  "paint",
  async (job) => {
    await job.updateProgress(42);
    return { cost: 100 };
  },
  {
    connection,
    concurrency: 5, // 同时处理5个作业
    autorun: false, // 不立即启动
  },
);

worker.run(); // 准备就绪后启动

// 运行时更新并发数
worker.concurrency = 10;
处理器接收3个参数:
(job, token?, signal?)
— signal是用于取消支持的
AbortSignal

TypeScript Generics

TypeScript泛型

ts
interface JobData {
  color: string;
}
interface JobReturn {
  cost: number;
}

const queue = new Queue<JobData, JobReturn>("paint");
const worker = new Worker<JobData, JobReturn>("paint", async (job) => {
  // job.data is typed as JobData
  return { cost: 100 }; // must match JobReturn
});
ts
interface JobData {
  color: string;
}
interface JobReturn {
  cost: number;
}

const queue = new Queue<JobData, JobReturn>("paint");
const worker = new Worker<JobData, JobReturn>("paint", async (job) => {
  // job.data会被推断为JobData类型
  return { cost: 100 }; // 必须匹配JobReturn类型
});

Events

事件

Worker events (local to that worker instance):
EventCallback signature
completed
(job, returnvalue)
failed
(job | undefined, error, prev)
progress
(job, progress: number | object)
drained
()
— queue is empty
error
(error)
— MUST attach this handler
QueueEvents (global, all workers, uses Redis Streams):
EventCallback signature
completed
({ jobId, returnvalue })
failed
({ jobId, failedReason })
progress
({ jobId, data })
waiting
({ jobId })
active
({ jobId, prev })
delayed
({ jobId, delay })
deduplicated
({ jobId, deduplicationId, deduplicatedJobId })
Event stream is auto-trimmed (~10,000 events). Configure via
streams.events.maxLen
.
Worker事件(仅针对当前Worker实例):
事件名称回调签名
completed
(job, returnvalue)
failed
(job | undefined, error, prev)
progress
(job, progress: number | object)
drained
()
— 队列为空时触发
error
(error)
— 必须添加此事件处理程序
QueueEvents(全局事件,针对所有Workers,使用Redis Streams):
事件名称回调签名
completed
({ jobId, returnvalue })
failed
({ jobId, failedReason })
progress
({ jobId, data })
waiting
({ jobId })
active
({ jobId, prev })
delayed
({ jobId, delay })
deduplicated
({ jobId, deduplicationId, deduplicatedJobId })
事件流会自动修剪(约保留10,000个事件)。可通过
streams.events.maxLen
配置。

Job Lifecycle States

作业生命周期状态

add() → wait / prioritized / delayed
       active → completed
       failed → (retry) → wait/delayed
With FlowProducer: jobs can also be in
waiting-children
state until all children complete.
add() → 等待 / 优先级队列 / 延迟
       活跃 → 完成
       失败 → (重试) → 等待/延迟
使用FlowProducer时:作业还会处于
waiting-children
状态,直到所有子作业完成。

Advanced Topics

高级主题

  • Job types and options (delayed, prioritized, deduplication, repeatable): See references/job-types-and-options.md
  • Flows and schedulers (FlowProducer, parent-child, job schedulers, cron): See references/flows-and-schedulers.md
  • Patterns (step jobs, idempotent, throttle, manual rate-limit): See references/patterns.md
  • Production (shutdown, Redis config, retries, backoff, monitoring): See references/production.md
  • 作业类型与选项(延迟、优先级、去重、可重复):详见 references/job-types-and-options.md
  • 流程与调度器(FlowProducer、父子作业、作业调度器、cron):详见 references/flows-and-schedulers.md
  • 设计模式(分步作业、幂等、节流、手动速率限制):详见 references/patterns.md
  • 生产环境配置(停机、Redis配置、重试、退避、监控):详见 references/production.md