routing-strategies

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Celery Task Routing Strategies Skill

Celery任务路由策略Skill

This skill provides comprehensive templates, scripts, and patterns for implementing advanced task routing and queue management in Celery applications, including priority queues, topic-based routing, and worker-specific queue assignments.
本Skill为Celery应用提供了实现高级任务路由与队列管理的全面模板、脚本和模式,包括优先级队列、基于主题的路由以及特定Worker的队列分配。

Overview

概述

Effective task routing is crucial for:
  1. Performance Optimization - Route compute-intensive tasks to dedicated workers
  2. Priority Management - High-priority tasks bypass slower queues
  3. Resource Isolation - Separate critical operations from background jobs
  4. Scalability - Independent scaling of different task types
This skill covers routing with RabbitMQ, Redis, and custom broker configurations.
有效的任务路由对于以下方面至关重要:
  1. 性能优化 - 将计算密集型任务路由到专用Worker
  2. 优先级管理 - 高优先级任务绕过较慢的队列
  3. 资源隔离 - 将关键操作与后台作业分离
  4. 可扩展性 - 独立扩展不同类型的任务
本Skill涵盖了基于RabbitMQ、Redis和自定义中间件配置的路由方案。

Available Scripts

可用脚本

1. Test Routing Configuration

1. 测试路由配置

Script:
scripts/test-routing.sh <config-file>
Purpose: Validates routing configuration and tests queue connectivity
Checks:
  • Broker connectivity (RabbitMQ/Redis)
  • Queue declarations
  • Exchange configurations
  • Routing key patterns
  • Worker queue bindings
  • Priority queue setup
Usage:
bash
undefined
脚本:
scripts/test-routing.sh <config-file>
用途: 验证路由配置并测试队列连通性
检查项:
  • 中间件连通性(RabbitMQ/Redis)
  • 队列声明
  • 交换机配置
  • 路由键模式
  • Worker队列绑定
  • 优先级队列设置
使用方法:
bash
undefined

Test routing configuration

测试路由配置

./scripts/test-routing.sh ./celery_config.py
./scripts/test-routing.sh ./celery_config.py

Test with custom broker URL

使用自定义中间件URL测试

BROKER_URL=amqp://user:password@localhost:5672// ./scripts/test-routing.sh ./celery_config.py
BROKER_URL=amqp://user:password@localhost:5672// ./scripts/test-routing.sh ./celery_config.py

Verbose output

详细输出

VERBOSE=1 ./scripts/test-routing.sh ./celery_config.py

**Exit Codes**:
- `0`: All routing tests passed
- `1`: Configuration errors detected
- `2`: Broker connection failed
VERBOSE=1 ./scripts/test-routing.sh ./celery_config.py

**退出码**:
- `0`: 所有路由测试通过
- `1`: 检测到配置错误
- `2`: 中间件连接失败

2. Validate Queue Configuration

2. 验证队列配置

Script:
scripts/validate-queues.sh <project-dir>
Purpose: Validates queue setup across application code
Checks:
  • Task decorators use valid queues
  • No hardcoded queue names (use config)
  • All queues defined in routing configuration
  • Priority settings are valid (0-255)
  • Exchange types match routing patterns
  • Worker configurations reference valid queues
Usage:
bash
undefined
脚本:
scripts/validate-queues.sh <project-dir>
用途: 验证应用代码中的队列设置
检查项:
  • 任务装饰器使用有效的队列
  • 没有硬编码的队列名称(使用配置)
  • 所有队列都在路由配置中定义
  • 优先级设置有效(0-255)
  • 交换机类型与路由模式匹配
  • Worker配置引用有效的队列
使用方法:
bash
undefined

Validate current project

验证当前项目

./scripts/validate-queues.sh .
./scripts/validate-queues.sh .

Validate specific directory

验证特定目录

./scripts/validate-queues.sh /path/to/celery-app
./scripts/validate-queues.sh /path/to/celery-app

Generate detailed report

生成详细报告

REPORT=1 ./scripts/validate-queues.sh . > queue-validation-report.md

**Exit Codes**:
- `0`: Validation passed
- `1`: Validation failed (must fix issues)
REPORT=1 ./scripts/validate-queues.sh . > queue-validation-report.md

**退出码**:
- `0`: 验证通过
- `1`: 验证失败(必须修复问题)

Available Templates

可用模板

1. Basic Queue Configuration

1. 基础队列配置

Template:
templates/queue-config.py
Features:
  • Default queue setup
  • Named queues for different task types
  • Queue-to-exchange bindings
  • Priority settings
  • Worker routing configuration
Usage:
python
from celery import Celery
from templates.queue_config import CELERY_ROUTES, CELERY_QUEUES

app = Celery('myapp')
app.conf.task_routes = CELERY_ROUTES
app.conf.task_queues = CELERY_QUEUES
Configuration Example:
python
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('default'), routing_key='high'),
    Queue('low_priority', Exchange('default'), routing_key='low'),
    Queue('emails', Exchange('emails'), routing_key='email.*'),
    Queue('reports', Exchange('reports'), routing_key='report.*'),
)

CELERY_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails', 'routing_key': 'email.send'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'routing_key': 'report.generate'},
    'myapp.tasks.urgent_task': {'queue': 'high_priority', 'priority': 9},
}
模板:
templates/queue-config.py
特性:
  • 默认队列设置
  • 针对不同任务类型的命名队列
  • 队列到交换机的绑定
  • 优先级设置
  • Worker路由配置
使用方法:
python
from celery import Celery
from templates.queue_config import CELERY_ROUTES, CELERY_QUEUES

app = Celery('myapp')
app.conf.task_routes = CELERY_ROUTES
app.conf.task_queues = CELERY_QUEUES
配置示例:
python
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('default'), routing_key='high'),
    Queue('low_priority', Exchange('default'), routing_key='low'),
    Queue('emails', Exchange('emails'), routing_key='email.*'),
    Queue('reports', Exchange('reports'), routing_key='report.*'),
)

CELERY_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails', 'routing_key': 'email.send'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'routing_key': 'report.generate'},
    'myapp.tasks.urgent_task': {'queue': 'high_priority', 'priority': 9},
}

2. Dynamic Routing Rules

2. 动态路由规则

Template:
templates/routing-rules.py
Features:
  • Pattern-based routing
  • Conditional routing logic
  • Dynamic queue selection
  • Routing by task name patterns
  • Routing by task arguments
Key Functions:
python
def route_task(name, args, kwargs, options, task=None, **kw):
    """
    Dynamic routing based on task name or arguments
    """
    if name.startswith('urgent.'):
        return {'queue': 'high_priority', 'priority': 9}

    if 'priority' in kwargs and kwargs['priority'] == 'high':
        return {'queue': 'high_priority'}

    if name.startswith('email.'):
        return {'queue': 'emails', 'exchange': 'emails'}

    return {'queue': 'default'}

app.conf.task_routes = (route_task,)
模板:
templates/routing-rules.py
特性:
  • 基于模式的路由
  • 条件路由逻辑
  • 动态队列选择
  • 按任务名称模式路由
  • 按任务参数路由
核心函数:
python
def route_task(name, args, kwargs, options, task=None, **kw):
    """
    基于任务名称或参数的动态路由
    """
    if name.startswith('urgent.'):
        return {'queue': 'high_priority', 'priority': 9}

    if 'priority' in kwargs and kwargs['priority'] == 'high':
        return {'queue': 'high_priority'}

    if name.startswith('email.'):
        return {'queue': 'emails', 'exchange': 'emails'}

    return {'queue': 'default'}

app.conf.task_routes = (route_task,)

3. Priority Queue Setup

3. 优先级队列设置

Template:
templates/priority-queues.py
Features:
  • Multi-level priority queues (0-255)
  • Priority inheritance
  • Default priority configuration
  • Queue priority enforcement
Priority Levels:
python
undefined
模板:
templates/priority-queues.py
特性:
  • 多级优先级队列(0-255)
  • 优先级继承
  • 默认优先级配置
  • 队列优先级强制
优先级级别:
python
undefined

Priority queue configuration

优先级队列配置

CELERY_QUEUES = ( Queue('critical', Exchange('tasks'), routing_key='critical', queue_arguments={'x-max-priority': 10}), Queue('high', Exchange('tasks'), routing_key='high', queue_arguments={'x-max-priority': 10}), Queue('normal', Exchange('tasks'), routing_key='normal', queue_arguments={'x-max-priority': 10}), Queue('low', Exchange('tasks'), routing_key='low', queue_arguments={'x-max-priority': 10}), )
CELERY_QUEUES = ( Queue('critical', Exchange('tasks'), routing_key='critical', queue_arguments={'x-max-priority': 10}), Queue('high', Exchange('tasks'), routing_key='high', queue_arguments={'x-max-priority': 10}), Queue('normal', Exchange('tasks'), routing_key='normal', queue_arguments={'x-max-priority': 10}), Queue('low', Exchange('tasks'), routing_key='low', queue_arguments={'x-max-priority': 10}), )

Task priority mapping

任务优先级映射

PRIORITY_LEVELS = { 'critical': 10, # Highest priority 'high': 7, 'normal': 5, 'low': 2, }
PRIORITY_LEVELS = { 'critical': 10, # 最高优先级 'high': 7, 'normal': 5, 'low': 2, }

Apply priority to task

为任务设置优先级

@app.task(priority=PRIORITY_LEVELS['high']) def urgent_processing(): pass
undefined
@app.task(priority=PRIORITY_LEVELS['high']) def urgent_processing(): pass
undefined

4. Topic Exchange Routing

4. 主题交换机路由

Template:
templates/topic-exchange.py
Features:
  • Topic-based routing patterns
  • Wildcard routing keys
  • Multi-queue routing
  • Pattern matching
Topic Patterns:
python
from kombu import Exchange, Queue
模板:
templates/topic-exchange.py
特性:
  • 基于主题的路由模式
  • 通配符路由键
  • 多队列路由
  • 模式匹配
主题模式:
python
from kombu import Exchange, Queue

Topic exchange setup

主题交换机设置

task_exchange = Exchange('tasks', type='topic', durable=True)
CELERY_QUEUES = ( # Match specific patterns Queue('user.notifications', exchange=task_exchange, routing_key='user.notification.*'),
# Match all email types
Queue('emails', exchange=task_exchange,
      routing_key='email.#'),

# Match processing tasks
Queue('processing', exchange=task_exchange,
      routing_key='*.processing.*'),

# Match all reports
Queue('reports', exchange=task_exchange,
      routing_key='report.*'),
)
task_exchange = Exchange('tasks', type='topic', durable=True)
CELERY_QUEUES = ( # 匹配特定模式 Queue('user.notifications', exchange=task_exchange, routing_key='user.notification.*'),
# 匹配所有邮件类型
Queue('emails', exchange=task_exchange,
      routing_key='email.#'),

# 匹配处理任务
Queue('processing', exchange=task_exchange,
      routing_key='*.processing.*'),

# 匹配所有报告
Queue('reports', exchange=task_exchange,
      routing_key='report.*'),
)

Routing configuration

路由配置

CELERY_ROUTES = { 'myapp.tasks.send_welcome_email': { 'exchange': 'tasks', 'routing_key': 'email.welcome.send' }, 'myapp.tasks.notify_user': { 'exchange': 'tasks', 'routing_key': 'user.notification.send' }, }
undefined
CELERY_ROUTES = { 'myapp.tasks.send_welcome_email': { 'exchange': 'tasks', 'routing_key': 'email.welcome.send' }, 'myapp.tasks.notify_user': { 'exchange': 'tasks', 'routing_key': 'user.notification.send' }, }
undefined

5. Worker-Specific Routing

5. 特定Worker路由

Template:
templates/worker-routing.py
Features:
  • Dedicated worker pools
  • Worker-specific queues
  • CPU vs I/O task separation
  • Geographic routing
  • Resource-based routing
Worker Configuration:
python
undefined
模板:
templates/worker-routing.py
特性:
  • 专用Worker池
  • 特定Worker的队列
  • CPU密集型与I/O密集型任务分离
  • 地域路由
  • 基于资源的路由
Worker配置:
python
undefined

Worker pool definitions

Worker池定义

WORKER_POOLS = { 'cpu_intensive': { 'queues': ['ml_training', 'video_processing', 'data_analysis'], 'concurrency': 4, 'prefetch_multiplier': 1, }, 'io_intensive': { 'queues': ['api_calls', 'file_uploads', 'emails'], 'concurrency': 50, 'prefetch_multiplier': 10, }, 'general': { 'queues': ['default', 'background'], 'concurrency': 10, 'prefetch_multiplier': 4, }, }
WORKER_POOLS = { 'cpu_intensive': { 'queues': ['ml_training', 'video_processing', 'data_analysis'], 'concurrency': 4, 'prefetch_multiplier': 1, }, 'io_intensive': { 'queues': ['api_calls', 'file_uploads', 'emails'], 'concurrency': 50, 'prefetch_multiplier': 10, }, 'general': { 'queues': ['default', 'background'], 'concurrency': 10, 'prefetch_multiplier': 4, }, }

Start workers

启动Worker

celery -A myapp worker --queues=ml_training,video_processing -c 4 -n cpu_worker@%h

celery -A myapp worker --queues=ml_training,video_processing -c 4 -n cpu_worker@%h

celery -A myapp worker --queues=api_calls,file_uploads -c 50 -n io_worker@%h

celery -A myapp worker --queues=api_calls,file_uploads -c 50 -n io_worker@%h

undefined
undefined

Available Examples

可用示例

1. Priority Queue Setup Guide

1. 优先级队列设置指南

Example:
examples/priority-queue-setup.md
Demonstrates:
  • Configuring RabbitMQ priority queues
  • Setting task priorities
  • Priority inheritance patterns
  • Testing priority routing
  • Monitoring priority queue performance
Key Concepts:
  • Priority range: 0 (lowest) to 255 (highest)
  • RabbitMQ
    x-max-priority
    argument
  • Priority at task definition vs runtime
  • Queue argument configuration
示例:
examples/priority-queue-setup.md
演示内容:
  • 配置RabbitMQ优先级队列
  • 设置任务优先级
  • 优先级继承模式
  • 测试优先级路由
  • 监控优先级队列性能
核心概念:
  • 优先级范围:0(最低)到255(最高)
  • RabbitMQ的
    x-max-priority
    参数
  • 任务定义时与运行时的优先级
  • 队列参数配置

2. Topic-Based Routing Implementation

2. 基于主题的路由实现

Example:
examples/topic-routing.md
Demonstrates:
  • Topic exchange setup
  • Routing key patterns (* and # wildcards)
  • Multi-queue routing
  • Pattern matching strategies
  • Consumer binding patterns
Routing Key Patterns:
  • *
    - Matches exactly one word
  • #
    - Matches zero or more words
  • Example:
    email.*.send
    matches
    email.welcome.send
    ,
    email.notification.send
  • Example:
    user.#
    matches
    user.create
    ,
    user.update.profile
示例:
examples/topic-routing.md
演示内容:
  • 主题交换机设置
  • 路由键模式(*和#通配符)
  • 多队列路由
  • 模式匹配策略
  • 消费者绑定模式
路由键模式:
  • *
    - 精确匹配一个单词
  • #
    - 匹配零个或多个单词
  • 示例:
    email.*.send
    匹配
    email.welcome.send
    email.notification.send
  • 示例:
    user.#
    匹配
    user.create
    user.update.profile

3. Worker Queue Assignment Strategy

3. Worker队列分配策略

Example:
examples/worker-queue-assignment.md
Demonstrates:
  • CPU-bound vs I/O-bound task separation
  • Worker pool configuration
  • Queue-to-worker mapping
  • Scaling strategies per worker type
  • Resource allocation patterns
Worker Types:
bash
undefined
示例:
examples/worker-queue-assignment.md
演示内容:
  • CPU密集型与I/O密集型任务分离
  • Worker池配置
  • 队列到Worker的映射
  • 按Worker类型的扩展策略
  • 资源分配模式
Worker类型:
bash
undefined

CPU-intensive workers (low concurrency)

CPU密集型Worker(低并发)

celery -A myapp worker -Q ml_training,video_processing -c 4 -n cpu@%h
celery -A myapp worker -Q ml_training,video_processing -c 4 -n cpu@%h

I/O-intensive workers (high concurrency)

I/O密集型Worker(高并发)

celery -A myapp worker -Q api_calls,emails -c 50 -n io@%h
celery -A myapp worker -Q api_calls,emails -c 50 -n io@%h

General purpose workers

通用Worker

celery -A myapp worker -Q default,background -c 10 -n general@%h
undefined
celery -A myapp worker -Q default,background -c 10 -n general@%h
undefined

Routing Strategies Comparison

路由策略对比

1. Direct Exchange (Default)

1. 直连交换机(默认)

  • Use Case: Simple queue-to-task mapping
  • Pros: Simple, predictable, fast
  • Cons: Limited flexibility
  • Example: Each task type goes to one specific queue
  • 适用场景: 简单的队列到任务映射
  • 优点: 简单、可预测、快速
  • 缺点: 灵活性有限
  • 示例: 每种任务类型发送到一个特定队列

2. Topic Exchange

2. 主题交换机

  • Use Case: Pattern-based routing, hierarchical task categories
  • Pros: Flexible, supports wildcards, multi-queue routing
  • Cons: More complex configuration
  • Example:
    email.*.send
    routes all email types to email queue
  • 适用场景: 基于模式的路由、分层任务分类
  • 优点: 灵活、支持通配符、多队列路由
  • 缺点: 配置更复杂
  • 示例:
    email.*.send
    将所有邮件类型路由到邮件队列

3. Fanout Exchange

3. 扇出交换机

  • Use Case: Broadcasting tasks to multiple queues
  • Pros: Simple broadcast mechanism
  • Cons: No routing logic, all queues receive all messages
  • Example: Notifications sent to multiple consumer types
  • 适用场景: 将任务广播到多个队列
  • 优点: 简单的广播机制
  • 缺点: 无路由逻辑,所有队列接收所有消息
  • 示例: 通知发送到多种类型的消费者

4. Headers Exchange

4. 头交换机

  • Use Case: Complex routing based on message headers
  • Pros: Very flexible, metadata-based routing
  • Cons: Performance overhead, complex configuration
  • Example: Route by
    priority=high
    and
    region=us-east
  • 适用场景: 基于消息头的复杂路由
  • 优点: 非常灵活、基于元数据的路由
  • 缺点: 性能开销、配置复杂
  • 示例: 按
    priority=high
    region=us-east
    路由

Performance Considerations

性能考量

1. Queue Configuration

1. 队列配置

  • Durable queues: Messages persist across broker restarts (use for critical tasks)
  • Transient queues: Faster but messages lost on restart (use for disposable tasks)
  • Queue length limits: Prevent memory issues with
    x-max-length
  • 持久化队列: 消息在中间件重启后保留(用于关键任务)
  • 临时队列: 速度更快但重启后消息丢失(用于可丢弃任务)
  • 队列长度限制: 使用
    x-max-length
    防止内存问题

2. Prefetch Settings

2. 预取设置

  • CPU-bound tasks: Low prefetch (1-2) to prevent blocking
  • I/O-bound tasks: High prefetch (10+) to keep workers busy
  • Configure per worker:
    celery worker --prefetch-multiplier=4
  • CPU密集型任务: 低预取(1-2)以避免阻塞
  • I/O密集型任务: 高预取(10+)以保持Worker忙碌
  • 按Worker配置:
    celery worker --prefetch-multiplier=4

3. Priority Queue Performance

3. 优先级队列性能

  • RabbitMQ: Native priority support, efficient
  • Redis: Priority emulation via separate queues, less efficient
  • Trade-off: Priority checking adds overhead, use only when needed
  • RabbitMQ: 原生支持优先级,效率高
  • Redis: 通过独立队列模拟优先级,效率较低
  • 权衡: 优先级检查会增加开销,仅在需要时使用

Security Compliance

安全合规

This skill follows strict security rules:
  • All code examples use placeholder values only
  • No real broker credentials, passwords, or secrets
  • Environment variable references in all code (BROKER_URL, BACKEND_URL)
  • .gitignore
    protection documented
  • Broker URLs use placeholder format:
    amqp://user:password@localhost:5672//
Never hardcode:
python
undefined
本Skill遵循严格的安全规则:
  • 所有代码示例仅使用占位符值
  • 无真实的中间件凭据、密码或密钥
  • 所有代码中使用环境变量引用(BROKER_URL、BACKEND_URL)
  • 文档中包含
    .gitignore
    保护说明
  • 中间件URL使用占位符格式:
    amqp://user:password@localhost:5672//
切勿硬编码:
python
undefined

❌ WRONG

❌ 错误写法

BROKER_URL = 'amqp://myuser:secretpass123@rabbitmq.example.com:5672//'
BROKER_URL = 'amqp://myuser:secretpass123@rabbitmq.example.com:5672//'

✅ CORRECT

✅ 正确写法

import os BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@localhost:5672//')
undefined
import os BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@localhost:5672//')
undefined

Best Practices

最佳实践

  1. Use Environment-Based Configuration - Different queues for dev/staging/prod
  2. Separate Critical Tasks - High-priority queue for time-sensitive operations
  3. Match Worker to Task Type - CPU workers for compute, I/O workers for network/disk
  4. Monitor Queue Lengths - Alert on queue buildup indicating bottlenecks
  5. Use Topic Exchanges for Hierarchical Tasks - Cleaner routing than multiple direct exchanges
  6. Test Routing Configuration - Validate routes before deploying to production
  7. Document Routing Logic - Especially for complex pattern-based routing
  8. Use Priority Sparingly - Overuse defeats the purpose and adds overhead
  9. Configure Prefetch Per Worker Type - Optimize based on task characteristics
  10. Plan for Scaling - Design routing to allow independent queue scaling
  1. 使用基于环境的配置 - 开发/预发布/生产环境使用不同队列
  2. 分离关键任务 - 为时间敏感操作设置高优先级队列
  3. 匹配Worker与任务类型 - CPU密集型任务用CPU Worker,I/O密集型用I/O Worker
  4. 监控队列长度 - 队列堆积时触发警报,指示瓶颈
  5. 对分层任务使用主题交换机 - 比多个直连交换机的路由更简洁
  6. 测试路由配置 - 部署到生产环境前验证路由
  7. 记录路由逻辑 - 尤其是复杂的基于模式的路由
  8. 谨慎使用优先级 - 过度使用会失去意义并增加开销
  9. 按Worker类型配置预取 - 根据任务特性优化
  10. 规划扩展方案 - 设计路由以支持独立的队列扩展

Requirements

要求

  • Celery 5.0+
  • Message broker (RabbitMQ 3.8+ or Redis 6.0+)
  • Python 3.8+
  • kombu library (included with Celery)
  • Environment variables:
    • CELERY_BROKER_URL
      (required)
    • CELERY_RESULT_BACKEND
      (optional)
  • For RabbitMQ priority queues: RabbitMQ 3.5+
  • For testing scripts: netcat/telnet for connectivity checks
  • Celery 5.0+
  • 消息中间件(RabbitMQ 3.8+ 或 Redis 6.0+)
  • Python 3.8+
  • kombu库(随Celery附带)
  • 环境变量:
    • CELERY_BROKER_URL
      (必填)
    • CELERY_RESULT_BACKEND
      (可选)
  • 对于RabbitMQ优先级队列:RabbitMQ 3.5+
  • 对于测试脚本:netcat/telnet用于连通性检查

Progressive Disclosure

进阶内容

For advanced routing patterns, see:
  • examples/priority-queue-setup.md
    - Priority queue implementation
  • examples/topic-routing.md
    - Topic exchange patterns
  • examples/worker-queue-assignment.md
    - Worker pool strategies
如需了解高级路由模式,请查看:
  • examples/priority-queue-setup.md
    - 优先级队列实现
  • examples/topic-routing.md
    - 主题交换机模式
  • examples/worker-queue-assignment.md
    - Worker池策略

Troubleshooting

故障排除

Queue Not Receiving Tasks

队列未接收任务

  1. Check routing configuration matches task name
  2. Verify queue declaration in CELERY_QUEUES
  3. Ensure workers are listening to correct queues
  4. Check broker connectivity with test script
  1. 检查路由配置与任务名称是否匹配
  2. 验证CELERY_QUEUES中是否声明了该队列
  3. 确保Worker正在监听正确的队列
  4. 使用测试脚本检查中间件连通性

Priority Not Working

优先级不生效

  1. Verify
    x-max-priority
    set on queue (RabbitMQ only)
  2. Check tasks are setting priority correctly
  3. Confirm workers consuming from priority queue
  4. Redis: Implement separate high/low priority queues
  1. 验证队列是否设置了
    x-max-priority
    (仅RabbitMQ)
  2. 检查任务是否正确设置了优先级
  3. 确认Worker正在消费优先级队列
  4. Redis:通过独立的高/低优先级队列实现

Worker Not Processing Tasks

Worker未处理任务

  1. Verify worker queue list matches routed queues
  2. Check prefetch_multiplier isn't too low
  3. Ensure no task failures blocking queue
  4. Monitor worker logs for errors

Plugin: celery Version: 1.0.0 Last Updated: 2025-11-16
  1. 验证Worker队列列表与路由队列是否匹配
  2. 检查prefetch_multiplier是否过低
  3. 确保没有任务失败阻塞队列
  4. 监控Worker日志查找错误

Plugin: celery Version: 1.0.0 Last Updated: 2025-11-16