
Cloudflare Queues


Build reliable asynchronous message processing on Cloudflare Workers using Queues for background tasks, batch operations, and retry handling.

When to Use


  • Background Tasks - Offload non-critical work from request handlers
  • Batch Processing - Accumulate messages and process in batches to reduce upstream API calls
  • Retry Handling - Automatic retries with configurable delays for transient failures
  • Decoupling - Separate producers from consumers for scalability
  • Rate Limiting Upstream - Control the rate of requests to external APIs
  • Dead Letter Queues - Capture and inspect failed messages for debugging

Quick Reference


  • Send single message - env.QUEUE_BINDING.send(payload)
  • Send batch - env.QUEUE_BINDING.sendBatch([{ body: msg1 }, { body: msg2 }])
  • Define consumer - async queue(batch: MessageBatch, env: Env) { ... }
  • Access message body - batch.messages.map(msg => msg.body)
  • Acknowledge message - messages auto-ack unless the handler throws
  • Retry message - throw new Error() in the queue handler
  • Get batch size - batch.messages.length

FIRST: wrangler.jsonc Configuration


Queues require both producer and consumer configuration:
jsonc
{
  "name": "request-logger-consumer",
  "main": "src/index.ts",
  "compatibility_date": "2025-02-11",
  "queues": {
    "producers": [{
      "name": "request-queue",
      "binding": "REQUEST_QUEUE"
    }],
    "consumers": [{
      "name": "request-queue",
      "dead_letter_queue": "request-queue-dlq",
      "retry_delay": 300,
      "max_batch_size": 100,
      "max_batch_timeout": 30,
      "max_retries": 3
    }]
  },
  "vars": {
    "UPSTREAM_API_URL": "https://api.example.com/batch-logs",
    "UPSTREAM_API_KEY": ""
  }
}
Consumer Options:
  • dead_letter_queue - Queue name for failed messages (optional)
  • retry_delay - Seconds to wait before retrying (default: 0)
  • max_batch_size - Max messages per batch (default: 10, max: 100)
  • max_batch_timeout - Max seconds to wait for a batch to fill (default: 5, max: 30)
  • max_retries - Max retry attempts (default: 3)

Producer and Consumer Pattern


Complete example showing how to produce and consume messages:
typescript
// src/index.ts
interface Env {
  REQUEST_QUEUE: Queue;
  UPSTREAM_API_URL: string;
  UPSTREAM_API_KEY: string;
}

export default {
  // Producer: Send messages to queue
  async fetch(request: Request, env: Env) {
    const info = {
      timestamp: new Date().toISOString(),
      method: request.method,
      url: request.url,
      headers: Object.fromEntries(request.headers),
    };
    
    await env.REQUEST_QUEUE.send(info);

    return Response.json({
      message: 'Request logged',
      requestId: crypto.randomUUID()
    });
  },

  // Consumer: Process messages in batches
  async queue(batch: MessageBatch<any>, env: Env) {
    const requests = batch.messages.map(msg => msg.body);

    const response = await fetch(env.UPSTREAM_API_URL, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${env.UPSTREAM_API_KEY}`
      },
      body: JSON.stringify({
        timestamp: new Date().toISOString(),
        batchSize: requests.length,
        requests
      })
    });

    if (!response.ok) {
      // Throwing will retry the entire batch
      throw new Error(`Upstream API error: ${response.status}`);
    }
  }
};

Batch Message Types


Send messages with different formats:
typescript
// Send simple JSON payload
await env.QUEUE.send({ userId: 123, action: "login" });

// Send a batch of messages; each entry wraps its payload in `body`
await env.QUEUE.sendBatch([
  { body: { userId: 123, action: "login" } },
  { body: { userId: 456, action: "logout" } },
  { body: { userId: 789, action: "purchase" } }
]);

// Send with a typed body: declare the binding as Queue<UserEvent> in Env;
// send() takes its type from the binding, not from a type parameter
interface UserEvent {
  userId: number;
  action: string;
  timestamp: string;
}

await env.QUEUE.send({
  userId: 123,
  action: "login",
  timestamp: new Date().toISOString()
});
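The 100-message cap on sendBatch means a large backlog has to be split before sending. A small helper along these lines (hypothetical, not part of the Queues API) does the chunking; the defaults mirror the documented caps, and the JSON length is only an approximation of the serialized message size:

```typescript
// Hypothetical helper: split payloads into sendBatch-sized chunks.
interface SendRequest<T> {
  body: T;
}

function chunkForSendBatch<T>(
  bodies: T[],
  maxMessages = 100,
  maxBytes = 256 * 1024,
): SendRequest<T>[][] {
  const chunks: SendRequest<T>[][] = [];
  let current: SendRequest<T>[] = [];
  let currentBytes = 0;
  for (const body of bodies) {
    const size = JSON.stringify(body).length;
    // Start a new chunk when either cap would be exceeded.
    if (current.length >= maxMessages || (current.length > 0 && currentBytes + size > maxBytes)) {
      chunks.push(current);
      current = [];
      currentBytes = 0;
    }
    current.push({ body });
    currentBytes += size;
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}

// Usage: for (const chunk of chunkForSendBatch(bodies)) await env.QUEUE.sendBatch(chunk);
```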

Retry and Dead Letter Queues


Configure automatic retries and capture failed messages:
jsonc
{
  "queues": {
    "consumers": [{
      "name": "main-queue",
      "dead_letter_queue": "main-queue-dlq",
      "retry_delay": 300,  // 5 minutes
      "max_retries": 3
    }]
  }
}
Retry Behavior:
  1. Handler throws error → message is retried after retry_delay seconds
  2. After max_retries attempts → message moves to the dead letter queue
  3. No DLQ configured → message is discarded after max retries
  4. Handler succeeds → message is acknowledged and removed
Processing Dead Letter Queue:
typescript
export default {
  // Main consumer
  async queue(batch: MessageBatch<any>, env: Env) {
    for (const message of batch.messages) {
      try {
        await processMessage(message.body);
      } catch (error) {
        console.error('Processing failed:', error);
        throw error; // Trigger retry
      }
    }
  }
};

// Separate worker for DLQ
export default {
  async queue(batch: MessageBatch<any>, env: Env) {
    // Log failed messages for debugging
    for (const message of batch.messages) {
      console.error('Dead letter message:', {
        body: message.body,
        attempts: message.attempts,
        timestamp: message.timestamp
      });
      
      // Optionally store in KV/D1 for inspection
      await env.FAILED_MESSAGES.put(
        message.id,
        JSON.stringify(message),
        { expirationTtl: 86400 * 7 } // 7 days
      );
    }
  }
};

Batch Processing Patterns


Pattern 1: All-or-Nothing Batch


Process entire batch as transaction—if any message fails, retry all:
typescript
async queue(batch: MessageBatch<any>, env: Env) {
  // Throwing retries entire batch
  const response = await fetch(env.UPSTREAM_API_URL, {
    method: 'POST',
    body: JSON.stringify(batch.messages.map(m => m.body))
  });
  
  if (!response.ok) {
    throw new Error(`Batch failed: ${response.status}`);
  }
}

Pattern 2: Individual Message Handling


Process messages individually with partial success:
typescript
async queue(batch: MessageBatch<any>, env: Env) {
  const results = await Promise.allSettled(
    batch.messages.map(msg => processMessage(msg.body))
  );
  
  const failures = results.filter(r => r.status === 'rejected');
  
  if (failures.length > 0) {
    console.error(`${failures.length}/${batch.messages.length} messages failed`);
    // Throwing here retries the entire batch
    // Consider sending failed messages to a separate queue instead
  }
}

Pattern 3: Partial Retry with Requeue


Requeue only failed messages:
typescript
async queue(batch: MessageBatch<any>, env: Env) {
  const failedBodies: any[] = [];
  
  for (const message of batch.messages) {
    try {
      await processMessage(message.body);
    } catch (error) {
      failedBodies.push(message.body);
    }
  }
  
  // Requeue only the failures; sendBatch entries wrap each payload in `body`
  if (failedBodies.length > 0) {
    await env.RETRY_QUEUE.sendBatch(failedBodies.map(body => ({ body })));
  }
  
  // Don't throw - successfully processed messages won't be retried
}
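Queues also supports settling messages explicitly via message.ack() and message.retry(), which makes the requeue above unnecessary: only the failed messages are redelivered to the same queue. A sketch, where QueueMessage is a minimal stand-in for the runtime's message type so it runs outside the Workers runtime:

```typescript
// Explicit per-message settlement: ack successes, retry failures.
interface QueueMessage<T> {
  body: T;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

async function handleIndividually<T>(
  messages: QueueMessage<T>[],
  process: (body: T) => Promise<void>,
): Promise<void> {
  for (const message of messages) {
    try {
      await process(message.body);
      message.ack();                        // settle this message only
    } catch {
      message.retry({ delaySeconds: 60 });  // retry this message only
    }
  }
}
```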

Message Size Limits


  • Max message size: 128 KB per message
  • Max batch size: 100 messages per batch (configurable)
  • Max total batch size: 256 KB per sendBatch call
typescript
// Handle large payloads
async function sendLargePayload(data: any, env: Env) {
  const serialized = JSON.stringify(data);
  
  if (serialized.length > 100_000) { // ~100KB
    // Option 1: Store in R2/KV, send reference
    const key = crypto.randomUUID();
    await env.LARGE_PAYLOADS.put(key, serialized);
    await env.QUEUE.send({ type: 'large', key });
  } else {
    await env.QUEUE.send(data);
  }
}

Environment Interface


Type your queue bindings:
typescript
interface Env {
  // Producer bindings
  REQUEST_QUEUE: Queue<RequestInfo>;
  EMAIL_QUEUE: Queue<EmailPayload>;
  
  // Environment variables
  UPSTREAM_API_URL: string;
  UPSTREAM_API_KEY: string;
  
  // Other bindings
  KV: KVNamespace;
  DB: D1Database;
}

interface RequestInfo {
  timestamp: string;
  method: string;
  url: string;
  headers: Record<string, string>;
}

interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}

Detailed References


  • references/patterns.md - Advanced patterns: fan-out, priority queues, rate limiting
  • references/error-handling.md - Retry strategies, DLQ management, monitoring
  • references/limits.md - Message size, batch limits, retention, CPU constraints
  • references/testing.md - Vitest integration, createMessageBatch, getQueueResult, testing handlers

Best Practices


  1. Use batch processing: Reduce upstream API calls by processing messages in batches
  2. Configure retry_delay: Set appropriate delays to avoid overwhelming failing services
  3. Always configure DLQ: Capture failed messages for debugging and replay
  4. Type your messages: Use generics for type-safe message bodies
  5. Monitor batch timeouts: Adjust max_batch_timeout based on processing time
  6. Handle partial failures: Don't throw on single message failure if others succeeded
  7. Size payloads appropriately: Keep messages under 100KB; use R2/KV for large data
  8. Use separate queues for priorities: Different queues for high/low priority messages
  9. Log DLQ messages: Always log or store DLQ messages for later analysis
  10. Don't block responses on send(): enqueueing is fast, but a bare unawaited promise may be cancelled when the handler returns; pass it to ctx.waitUntil() instead
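Practice 10 can be sketched with minimal stand-in types (QueueLike and CtxLike are hypothetical; in a real Worker they are the queue binding and the ExecutionContext the fetch handler receives as its third argument):

```typescript
// Keep send() off the response's critical path via waitUntil().
interface QueueLike<T> {
  send(body: T): Promise<void>;
}
interface CtxLike {
  waitUntil(promise: Promise<unknown>): void;
}

function handleLog(queue: QueueLike<{ url: string }>, ctx: CtxLike, url: string): { status: number } {
  // Not awaited, so the response is not delayed by the enqueue;
  // waitUntil keeps the promise alive until it settles.
  ctx.waitUntil(queue.send({ url }));
  return { status: 202 };
}
```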