Cloudflare Queues
Build reliable asynchronous message processing on Cloudflare Workers using Queues for background tasks, batch operations, and retry handling.
When to Use
- Background Tasks - Offload non-critical work from request handlers
- Batch Processing - Accumulate messages and process in batches to reduce upstream API calls
- Retry Handling - Automatic retries with configurable delays for transient failures
- Decoupling - Separate producers from consumers for scalability
- Rate Limiting Upstream - Control the rate of requests to external APIs
- Dead Letter Queues - Capture and inspect failed messages for debugging
Quick Reference
| Task | API |
|---|---|
| Send single message | `env.QUEUE.send(body)` |
| Send batch | `env.QUEUE.sendBatch(messages)` |
| Define consumer | `async queue(batch, env)` handler in the Worker's default export |
| Access message body | `message.body` |
| Acknowledge message | Messages auto-ack unless handler throws |
| Retry message | Throw in the `queue()` handler |
| Get batch size | `batch.messages.length` |
FIRST: wrangler.jsonc Configuration
Queues require both producer and consumer configuration:
```jsonc
{
  "name": "request-logger-consumer",
  "main": "src/index.ts",
  "compatibility_date": "2025-02-11",
  "queues": {
    "producers": [{
      "name": "request-queue",
      "binding": "REQUEST_QUEUE"
    }],
    "consumers": [{
      "name": "request-queue",
      "dead_letter_queue": "request-queue-dlq",
      "retry_delay": 300,
      "max_batch_size": 100,
      "max_batch_timeout": 30,
      "max_retries": 3
    }]
  },
  "vars": {
    "UPSTREAM_API_URL": "https://api.example.com/batch-logs",
    "UPSTREAM_API_KEY": ""
  }
}
```

Consumer Options:
- `dead_letter_queue` - Queue name for failed messages (optional)
- `retry_delay` - Seconds to wait before retrying (default: 0)
- `max_batch_size` - Max messages per batch (default: 10, max: 100)
- `max_batch_timeout` - Max seconds to wait for a full batch (default: 5, max: 30)
- `max_retries` - Max retry attempts (default: 3)
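The queues named in this configuration must exist before the Worker is deployed. With a recent wrangler CLI they can be created like this (a sketch; treat exact command output and flags as subject to change):

```shell
# Create the main queue and its dead letter queue
# (requires an authenticated wrangler session)
npx wrangler queues create request-queue
npx wrangler queues create request-queue-dlq
```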
Producer and Consumer Pattern
Complete example showing how to produce and consume messages:
```typescript
// src/index.ts
interface Env {
  REQUEST_QUEUE: Queue;
  UPSTREAM_API_URL: string;
  UPSTREAM_API_KEY: string;
}

export default {
  // Producer: Send messages to queue
  async fetch(request: Request, env: Env) {
    const info = {
      timestamp: new Date().toISOString(),
      method: request.method,
      url: request.url,
      headers: Object.fromEntries(request.headers),
    };
    await env.REQUEST_QUEUE.send(info);
    return Response.json({
      message: 'Request logged',
      requestId: crypto.randomUUID()
    });
  },

  // Consumer: Process messages in batches
  async queue(batch: MessageBatch<any>, env: Env) {
    const requests = batch.messages.map(msg => msg.body);
    const response = await fetch(env.UPSTREAM_API_URL, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${env.UPSTREAM_API_KEY}`
      },
      body: JSON.stringify({
        timestamp: new Date().toISOString(),
        batchSize: requests.length,
        requests
      })
    });
    if (!response.ok) {
      // Throwing will retry the entire batch
      throw new Error(`Upstream API error: ${response.status}`);
    }
  }
};
```
Batch Message Types
Send messages with different formats:
```typescript
// Send simple JSON payload
await env.QUEUE.send({ userId: 123, action: "login" });

// Send batch of messages (sendBatch takes entries that wrap each payload in a `body` field)
await env.QUEUE.sendBatch([
  { body: { userId: 123, action: "login" } },
  { body: { userId: 456, action: "logout" } },
  { body: { userId: 789, action: "purchase" } }
]);

// Send with typed body (declare the binding as Queue<UserEvent> in Env
// for compile-time checking of send() and sendBatch())
interface UserEvent {
  userId: number;
  action: string;
  timestamp: string;
}
await env.QUEUE.send({
  userId: 123,
  action: "login",
  timestamp: new Date().toISOString()
} satisfies UserEvent);
```
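`sendBatch` is capped at 100 messages per call (see Message Size Limits below). A small helper (hypothetical, not part of the Queues API) can split larger arrays into compliant batches:

```typescript
// Split message bodies into chunks of at most 100, each entry wrapped
// as a { body } object ready for sendBatch().
function chunkMessages<T>(bodies: T[], maxPerBatch = 100): { body: T }[][] {
  const batches: { body: T }[][] = [];
  for (let i = 0; i < bodies.length; i += maxPerBatch) {
    batches.push(bodies.slice(i, i + maxPerBatch).map(body => ({ body })));
  }
  return batches;
}

// 250 bodies become 3 batches of 100, 100, and 50
const batches = chunkMessages(Array.from({ length: 250 }, (_, i) => ({ id: i })));
console.log(batches.length, batches[0].length, batches[2].length); // 3 100 50
```

Each chunk can then be passed to `env.QUEUE.sendBatch(chunk)` in turn.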
Retry and Dead Letter Queues
Configure automatic retries and capture failed messages:
```jsonc
{
  "queues": {
    "consumers": [{
      "name": "main-queue",
      "dead_letter_queue": "main-queue-dlq",
      "retry_delay": 300, // 5 minutes
      "max_retries": 3
    }]
  }
}
```

Retry Behavior:
- Handler throws an error → the message is retried after `retry_delay` seconds
- After `max_retries` attempts → the message moves to the dead letter queue
- No DLQ configured → the message is discarded after max retries
- Handler succeeds → the message is acknowledged and removed
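`retry_delay` is a fixed per-queue setting. For per-message backoff, a consumer can call `message.retry({ delaySeconds })` with a delay derived from `message.attempts`. A sketch of one possible delay curve (the helper name and constants are illustrative):

```typescript
// Exponential backoff with a cap: 30s, 60s, 120s, ... up to 1 hour.
function backoffSeconds(attempts: number, base = 30, cap = 3600): number {
  return Math.min(cap, base * 2 ** (attempts - 1));
}

console.log(backoffSeconds(1), backoffSeconds(3), backoffSeconds(10)); // 30 120 3600
// Inside a consumer (sketch):
//   message.retry({ delaySeconds: backoffSeconds(message.attempts) });
```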
Processing Dead Letter Queue:
```typescript
// Main consumer Worker (processMessage is application-specific)
export default {
  async queue(batch: MessageBatch<any>, env: Env) {
    for (const message of batch.messages) {
      try {
        await processMessage(message.body);
      } catch (error) {
        console.error('Processing failed:', error);
        throw error; // Trigger retry
      }
    }
  }
};

// Separate Worker consuming the DLQ (its Env needs a FAILED_MESSAGES KV binding)
export default {
  async queue(batch: MessageBatch<any>, env: Env) {
    // Log failed messages for debugging
    for (const message of batch.messages) {
      console.error('Dead letter message:', {
        body: message.body,
        attempts: message.attempts,
        timestamp: message.timestamp
      });
      // Optionally store in KV/D1 for inspection
      await env.FAILED_MESSAGES.put(
        message.id,
        JSON.stringify(message),
        { expirationTtl: 86400 * 7 } // 7 days
      );
    }
  }
};
```
Batch Processing Patterns
Pattern 1: All-or-Nothing Batch
Process the entire batch as a transaction: if any message fails, the whole batch is retried:
```typescript
async queue(batch: MessageBatch<any>, env: Env) {
  // Throwing retries the entire batch
  const response = await fetch(env.UPSTREAM_API_URL, {
    method: 'POST',
    body: JSON.stringify(batch.messages.map(m => m.body))
  });
  if (!response.ok) {
    throw new Error(`Batch failed: ${response.status}`);
  }
}
```
Pattern 2: Individual Message Handling
Process messages individually with partial success:
```typescript
async queue(batch: MessageBatch<any>, env: Env) {
  const results = await Promise.allSettled(
    batch.messages.map(msg => processMessage(msg.body))
  );
  const failures = results.filter(r => r.status === 'rejected');
  if (failures.length > 0) {
    console.error(`${failures.length}/${batch.messages.length} messages failed`);
    // Throwing here retries the entire batch.
    // Alternatives: call message.ack() on successes so only failures are
    // redelivered, or send failed messages to a separate queue instead.
  }
}
```
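The `allSettled` split above can be factored into a reusable helper. This standalone sketch (names are illustrative, not part of any Queues API) shows the partition logic on plain promises:

```typescript
// Partition settled promises into fulfilled values and rejection reasons,
// mirroring how the consumer above separates successes from failures.
async function partition<T>(promises: Promise<T>[]): Promise<{ ok: T[]; failed: unknown[] }> {
  const results = await Promise.allSettled(promises);
  const ok: T[] = [];
  const failed: unknown[] = [];
  for (const r of results) {
    if (r.status === 'fulfilled') ok.push(r.value);
    else failed.push(r.reason);
  }
  return { ok, failed };
}

const { ok, failed } = await partition([
  Promise.resolve(1),
  Promise.reject(new Error('boom')),
  Promise.resolve(3),
]);
console.log(ok, failed.length); // ok = [1, 3], one failure
```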
Pattern 3: Partial Retry with Requeue
Requeue only failed messages:
```typescript
async queue(batch: MessageBatch<any>, env: Env) {
  const failedMessages: { body: any }[] = [];
  for (const message of batch.messages) {
    try {
      await processMessage(message.body);
    } catch (error) {
      failedMessages.push({ body: message.body });
    }
  }
  // Requeue only failures (sendBatch expects { body } entries)
  if (failedMessages.length > 0) {
    await env.RETRY_QUEUE.sendBatch(failedMessages);
  }
  // Don't throw: successfully processed messages won't be retried
}
```
Message Size Limits
- Max message size: 128 KB per message
- Max batch size: 100 messages per batch (configurable)
- Max total batch size: 256 KB
```typescript
// Handle large payloads. The 128 KB limit applies to serialized bytes,
// so measure with TextEncoder rather than string length.
async function sendLargePayload(data: any, env: Env) {
  const serialized = JSON.stringify(data);
  const bytes = new TextEncoder().encode(serialized).byteLength;
  if (bytes > 100_000) { // ~100KB, leaving headroom under the limit
    // Option 1: Store in R2/KV, send reference
    const key = crypto.randomUUID();
    await env.LARGE_PAYLOADS.put(key, serialized);
    await env.QUEUE.send({ type: 'large', key });
  } else {
    await env.QUEUE.send(data);
  }
}
```
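Note that `String.length` counts UTF-16 code units, not bytes, so payloads with non-ASCII characters serialize to more bytes than their string length suggests. A quick demonstration:

```typescript
// Byte size vs. code-unit length for a payload with non-ASCII characters.
const json = JSON.stringify({ note: "héllo ✓" });
const bytes = new TextEncoder().encode(json).byteLength;
console.log(json.length, bytes); // bytes exceeds json.length: é is 2 bytes, ✓ is 3
```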
Environment Interface
Type your queue bindings:
```typescript
interface Env {
  // Producer bindings
  REQUEST_QUEUE: Queue<RequestInfo>;
  EMAIL_QUEUE: Queue<EmailPayload>;
  // Environment variables
  UPSTREAM_API_URL: string;
  UPSTREAM_API_KEY: string;
  // Other bindings
  KV: KVNamespace;
  DB: D1Database;
}

interface RequestInfo {
  timestamp: string;
  method: string;
  url: string;
  headers: Record<string, string>;
}

interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}
```
Detailed References
- references/patterns.md - Advanced patterns: fan-out, priority queues, rate limiting
- references/error-handling.md - Retry strategies, DLQ management, monitoring
- references/limits.md - Message size, batch limits, retention, CPU constraints
- references/testing.md - Vitest integration, createMessageBatch, getQueueResult, testing handlers
Best Practices
- Use batch processing: Reduce upstream API calls by processing messages in batches
- Configure retry_delay: Set appropriate delays to avoid overwhelming failing services
- Always configure DLQ: Capture failed messages for debugging and replay
- Type your messages: Use generics for type-safe message bodies
- Monitor batch timeouts: Adjust `max_batch_timeout` based on processing time
- Handle partial failures: Don't throw on a single message failure if others succeeded
- Size payloads appropriately: Keep messages under 100KB; use R2/KV for large data
- Use separate queues for priorities: Different queues for high/low priority messages
- Log DLQ messages: Always log or store DLQ messages for later analysis
- Don't await send() in hot paths: queue operations are fast, and when the result isn't needed you can hand the promise to `ctx.waitUntil()` instead of blocking the response