celery-advanced


Advanced Celery Patterns

Enterprise-grade task orchestration beyond basic background jobs.

Overview

  • Complex multi-step task workflows (ETL pipelines, order processing)
  • Priority-based task processing (premium vs standard users)
  • Rate-limited external API calls (API quotas, throttling)
  • Multi-queue routing (dedicated workers per task type)
  • Production monitoring and observability
  • Task result aggregation and fan-out patterns

Canvas Workflows

Signatures (Task Invocation)

```python
from celery import signature, chain, group, chord

# Create a reusable task signature
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})

# Immutable signature (won't receive results from previous task)
sig = process_order.si(order_id)

# Partial signature (curry arguments)
partial_sig = send_email.s(subject="Order Update")
# Later: partial_sig.delay(to="user@example.com", body="...")
```

Chains (Sequential Execution)

```python
from celery import chain

# Tasks execute sequentially, passing results
workflow = chain(
    extract_data.s(source_id),     # Returns raw_data
    transform_data.s(),            # Receives raw_data, returns clean_data
    load_data.s(destination_id),   # Receives clean_data
)
result = workflow.apply_async()

# Access intermediate results
chain_result = result.get()          # Final result
parent_result = result.parent.get()  # Previous task result

# Error handling in chains
@celery_app.task(bind=True)
def transform_data(self, raw_data):
    try:
        return do_transform(raw_data)
    except TransformError as exc:
        # Chain stops here, no subsequent tasks run
        raise self.retry(exc=exc, countdown=60)
```

Groups (Parallel Execution)

```python
from celery import group

# Execute tasks in parallel
parallel = group(process_chunk.s(chunk) for chunk in chunks)
group_result = parallel.apply_async()

# Wait for all to complete
results = group_result.get()  # List of results

# Check completion status
group_result.ready()       # All completed?
group_result.successful()  # All succeeded?
group_result.failed()      # Any failed?

# Iterate as they complete
for result in group_result:
    if result.ready():
        print(f"Completed: {result.get()}")
```

Chords (Parallel + Callback)

```python
from celery import chord

# Parallel execution with callback when all complete
workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s(),  # Receives list of all results
)
result = workflow.apply_async()

# Chord with header and body
header = group(fetch_data.s(url) for url in urls)
body = combine_data.s()
workflow = chord(header, body)

# Error handling: if any header task fails, body won't run
@celery_app.task(bind=True)
def aggregate_results(self, results):
    # results = [result1, result2, ...]
    return sum(results)
```

Map and Starmap

```python
# Map: apply same task to each item
workflow = process_item.map([item1, item2, item3])

# Starmap: unpack args for each call
workflow = send_email.starmap([
    ("user1@example.com", "Subject 1"),
    ("user2@example.com", "Subject 2"),
])

# Chunks: split large list into batches of 100
workflow = process_item.chunks(items, 100)
```

Priority Queues

Queue Configuration

```python
# celery_config.py
from kombu import Queue

celery_app.conf.task_queues = (
    Queue("high", routing_key="high"),
    Queue("default", routing_key="default"),
    Queue("low", routing_key="low"),
)
celery_app.conf.task_default_queue = "default"
celery_app.conf.task_default_routing_key = "default"

# Priority within queue (requires Redis 5+)
celery_app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),  # 0-9 priority levels
    "sep": ":",
    "queue_order_strategy": "priority",
}
```

Task Routing

```python
# Route by task name (glob patterns supported)
celery_app.conf.task_routes = {
    "tasks.critical_task": {"queue": "high"},
    "tasks.bulk_*": {"queue": "low"},
    "tasks.default_*": {"queue": "default"},
}

# Route dynamically at call time
critical_task.apply_async(args=[data], queue="high", priority=9)
bulk_task.apply_async(args=[data], queue="low", priority=1)

# Route by task attribute
@celery_app.task(queue="high", priority=8)
def premium_user_task(user_id):
    pass
```

Worker Configuration

```bash
# Start workers for specific queues
celery -A app worker -Q high -c 4 --prefetch-multiplier=1
celery -A app worker -Q default -c 8
celery -A app worker -Q low -c 2 --prefetch-multiplier=4
```

Rate Limiting

Per-Task Rate Limits

```python
import requests

@celery_app.task(rate_limit="100/m")  # 100 per minute
def call_external_api(endpoint):
    return requests.get(endpoint, timeout=30)

@celery_app.task(rate_limit="10/s")   # 10 per second
def send_notification(user_id):
    pass

@celery_app.task(rate_limit="1000/h") # 1000 per hour
def bulk_email(batch):
    pass
```

Dynamic Rate Limiting

```python
from celery import current_app

# Change rate limit at runtime
current_app.control.rate_limit(
    "tasks.call_external_api",
    "50/m",  # Reduce during high load
    destination=["worker1@hostname"],
)

# Custom rate limiter using a fixed one-second Redis window
class RateLimitedTask(celery_app.Task):
    _rate_limit_key = "api:rate_limit"

    def __call__(self, *args, **kwargs):
        if not self._acquire_token():
            raise self.retry(countdown=self._get_backoff())
        return super().__call__(*args, **kwargs)

    def _acquire_token(self):
        # SET NX EX succeeds at most once per window
        return redis_client.set(
            self._rate_limit_key,
            "1",
            nx=True,
            ex=1,  # 1 second window
        )

    def _get_backoff(self):
        return 1  # retry once the window has expired
```
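The fixed one-second window above admits a full burst at every window boundary. A token bucket smooths that out; below is a minimal, process-local sketch in plain Python (no Celery or Redis dependency) — the `TokenBucket` name and parameters are illustrative, not a Celery API:

```python
import time

class TokenBucket:
    """Allow `rate` calls per second with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def acquire(self) -> bool:
        now = time.monotonic()
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=10, capacity=5)  # 10 req/s, bursts of 5
results = [bucket.acquire() for _ in range(8)]
# The burst capacity admits the first 5; the rest must wait for refill
```

In a task, a failed `acquire()` would map to `self.retry(countdown=...)` as in the `RateLimitedTask` sketch; for coordination across workers the bucket state would have to live in Redis rather than process memory.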

Multi-Queue Routing

Router Classes

```python
class TaskRouter:
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith("tasks.premium"):
            return {"queue": "premium", "priority": 8}
        elif task.startswith("tasks.analytics"):
            return {"queue": "analytics"}
        elif kwargs and kwargs.get("urgent"):
            return {"queue": "high"}
        return {"queue": "default"}

celery_app.conf.task_routes = (TaskRouter(),)
```

Content-Based Routing

```python
@celery_app.task(bind=True)
def process_order(self, order):
    # Route based on order value
    if order["total"] > 1000:
        self.update_state(state="ROUTING", meta={"queue": "premium"})
        return chain(
            verify_inventory.s(order).set(queue="high"),
            process_payment.s().set(queue="high"),
            notify_customer.s().set(queue="notifications"),
        ).apply_async()
    else:
        return standard_workflow(order)
```

Production Monitoring

Flower Dashboard

```bash
# Install and run Flower
pip install flower
celery -A app flower --port=5555 --basic_auth=admin:password

# With persistent storage
celery -A app flower --persistent=True --db=flower.db
```

Custom Events

```python
from celery import signals

@signals.task_prerun.connect
def on_task_start(sender, task_id, task, args, kwargs, **_):
    metrics.counter("task_started", tags={"task": task.name})

@signals.task_postrun.connect
def on_task_complete(sender, task_id, task, args, kwargs, retval, state, **_):
    metrics.counter("task_completed", tags={"task": task.name, "state": state})

@signals.task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **_):
    alerting.send_alert(
        f"Task {sender.name} failed: {exception}",
        severity="error"
    )
```

Health Checks

```python
from celery import current_app

def celery_health_check():
    try:
        # Check broker connection
        conn = current_app.connection()
        conn.ensure_connection(max_retries=3)

        # Check workers responding
        inspector = current_app.control.inspect()
        active_workers = inspector.active()

        if not active_workers:
            return {"status": "unhealthy", "reason": "No active workers"}

        return {
            "status": "healthy",
            "workers": list(active_workers.keys()),
            "active_tasks": sum(len(tasks) for tasks in active_workers.values()),
        }
    except Exception as e:
        return {"status": "unhealthy", "reason": str(e)}
```

Custom Task States

```python
from celery import states

# Define custom states
VALIDATING = "VALIDATING"
PROCESSING = "PROCESSING"
UPLOADING = "UPLOADING"

@celery_app.task(bind=True)
def long_running_task(self, data):
    self.update_state(state=VALIDATING, meta={"step": 1, "total": 3})
    validate(data)

    self.update_state(state=PROCESSING, meta={"step": 2, "total": 3})
    result = process(data)

    self.update_state(state=UPLOADING, meta={"step": 3, "total": 3})
    upload(result)

    return {"status": "complete", "url": result.url}

# Query task progress
from celery.result import AsyncResult

result = AsyncResult(task_id)
if result.state == PROCESSING:
    print(f"Step {result.info['step']}/{result.info['total']}")
```

Base Tasks and Inheritance

```python
import requests
from celery import Task

class DatabaseTask(Task):
    """Base task with database session management."""
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = create_session()
        return self._db

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self._db:
            self._db.close()
            self._db = None

class RetryableTask(Task):
    """Base task with exponential backoff retry."""
    autoretry_for = (ConnectionError, TimeoutError)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

@celery_app.task(base=DatabaseTask)
def query_database(query):
    return query_database.db.execute(query)

@celery_app.task(base=RetryableTask)
def call_flaky_api(endpoint):
    return requests.get(endpoint, timeout=30)
```

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Workflow type | Chain for sequential, Group for parallel, Chord for fan-in |
| Priority queues | 3 queues (high/default/low) for most use cases |
| Rate limiting | Per-task `rate_limit` for simple, token bucket for complex |
| Monitoring | Flower + custom signals for production |
| Task routing | Content-based router for dynamic routing needs |
| Worker scaling | Separate workers per queue, autoscale with HPA |
| Error handling | Base task with retry + dead letter queue |
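The table recommends retry plus a dead letter queue for error handling. A broker-agnostic sketch of that idea in plain Python — with Celery, exhausted tasks would typically be routed to a dedicated queue or handled in a `task_failure` signal; the names here (`run_with_retries`, `flaky_handler`) are illustrative:

```python
import queue

dead_letter = queue.Queue()  # stand-in for a real dead-letter queue

def run_with_retries(fn, payload, max_retries=3):
    """Try `fn` up to `max_retries` times; park exhausted failures on the DLQ."""
    last_exc = None
    for _ in range(max_retries):
        try:
            return fn(payload)
        except Exception as exc:
            last_exc = exc
    dead_letter.put({"payload": payload, "error": repr(last_exc)})
    return None

def flaky_handler(payload):
    raise ConnectionError("upstream unavailable")

run_with_retries(flaky_handler, {"order_id": 42})
# Retries exhausted: the payload is parked for later inspection or replay
```

The point of the dead letter is that failed payloads are never silently dropped — an operator (or a scheduled task) can inspect and re-enqueue them.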

Anti-Patterns (FORBIDDEN)

```python
# NEVER block on results in tasks
@celery_app.task
def bad_task():
    result = other_task.delay()
    return result.get()  # Blocks worker, causes deadlock!

# NEVER use synchronous I/O without timeout
requests.get(url)  # Can hang forever

# NEVER ignore task acknowledgment
celery_app.conf.task_acks_late = False  # Default loses tasks on crash

# NEVER skip idempotency for retried tasks
@celery_app.task(max_retries=3)
def create_order(order):
    Order.create(order)  # Creates duplicates on retry!

# ALWAYS use immutable signatures in chords
chord([task.s(x) for x in items], callback.si())  # si() prevents arg pollution
```
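To make a retried task like `create_order` safe, derive a deterministic idempotency key from the payload and short-circuit repeats. A minimal sketch, using an in-memory dict as a stand-in for a durable store such as a DB unique index or Redis `SET NX`:

```python
import hashlib
import json

_processed = {}  # stand-in for a durable store shared by all workers

def idempotency_key(payload: dict) -> str:
    # Same payload always yields the same key, across retries and workers
    canonical = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()

def create_order(order: dict) -> dict:
    key = idempotency_key(order)
    if key in _processed:
        return _processed[key]  # retried delivery: return prior result, no duplicate
    result = {"order_id": key[:8], **order}  # ...actual creation work...
    _processed[key] = result
    return result

first = create_order({"sku": "A1", "qty": 2})
again = create_order({"sku": "A1", "qty": 2})  # simulated retry
assert first is again and len(_processed) == 1
```

With this shape, `max_retries=3` redelivering the same message performs the side effect exactly once.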

References

For detailed implementation patterns, see:
  • references/canvas-workflows.md
    - Deep dive on chain/group/chord with error handling
  • references/priority-queue-setup.md
    - Redis priority queue configuration
  • references/rate-limiting-patterns.md
    - Per-task and dynamic rate limiting
  • references/celery-beat-scheduling.md
    - Periodic task configuration

Templates

Production-ready code templates:
  • scripts/celery-config-template.py
    - Complete production Celery configuration
  • scripts/canvas-workflow-template.py
    - ETL pipeline using canvas patterns
  • scripts/priority-worker-template.py
    - Multi-queue worker with per-user rate limiting

Checklists

  • checklists/celery-production-checklist.md
    - Production deployment verification

Examples

  • examples/order-processing-pipeline.md
    - Real-world e-commerce order processing

Related Skills

  • background-jobs
    - Basic Celery and ARQ patterns
  • message-queues
    - RabbitMQ/Kafka integration
  • resilience-patterns
    - Circuit breakers, retries
  • observability-monitoring
    - Metrics and alerting

Capability Details

canvas-workflows

Keywords: chain, group, chord, signature, canvas, workflow
Solves:
  • Complex multi-step task pipelines
  • Parallel task execution with aggregation
  • Sequential task dependencies

priority-queues

Keywords: priority, queue, routing, high priority, low priority
Solves:
  • Premium user task prioritization
  • Urgent vs batch task handling
  • Multi-queue worker deployment

rate-limiting

Keywords: rate limit, throttle, quota, api limit
Solves:
  • External API rate limiting
  • Per-task execution limits
  • Dynamic rate adjustment

task-monitoring

Keywords: flower, monitoring, health check, task state
Solves:
  • Production task monitoring
  • Worker health checks
  • Custom task state tracking