airflow-dag-patterns


Apache Airflow DAG Patterns


Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

When to Use This Skill


  • Creating data pipeline orchestration with Airflow
  • Designing DAG structures and dependencies
  • Implementing custom operators and sensors
  • Testing Airflow DAGs locally
  • Setting up Airflow in production
  • Debugging failed DAG runs

Core Concepts


1. DAG Design Principles


| Principle | Description |
|---|---|
| Idempotent | Running twice produces the same result |
| Atomic | Tasks succeed or fail completely |
| Incremental | Process only new or changed data |
| Observable | Logs, metrics, and alerts at every step |
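As a minimal, framework-free sketch of the idempotency and incrementality principles (the `warehouse` dict and `load_partition` helper are hypothetical stand-ins for a real table or object store), a task that overwrites its output partition keyed by the logical date can be retried safely:

```python
# Hypothetical sketch: keying writes by the logical date (`ds`) and
# overwriting, never appending, makes a load idempotent - retries and
# backfills replace a partition instead of duplicating rows.
warehouse: dict = {}  # stand-in for a real table or object store

def load_partition(ds: str, rows: list) -> int:
    """Write the rows for one logical date; safe to run any number of times."""
    warehouse[ds] = rows  # overwrite the whole partition
    return len(warehouse[ds])

load_partition('2024-01-01', [{'id': 1}, {'id': 2}])
count = load_partition('2024-01-01', [{'id': 1}, {'id': 2}])  # simulated retry
print(count)  # 2 - the retry replaced, not doubled, the partition
```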

2. Task Dependencies


Linear

```python
task1 >> task2 >> task3
```

Fan-out

```python
task1 >> [task2, task3, task4]
```

Fan-in

```python
[task1, task2, task3] >> task4
```

Complex

```python
task1 >> task2 >> task4
task1 >> task3 >> task4
```

Quick Start


dags/example_dag.py

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:
    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
```

Patterns


Pattern 1: TaskFlow API (Airflow 2.0+)


dags/taskflow_example.py

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using the TaskFlow API."""

    @task()
    def extract(source: str, ds: str = None) -> dict:
        """Extract data from source."""
        import pandas as pd

        # Declaring `ds` in the signature makes Airflow inject the logical
        # date; Jinja templates like {{ ds }} are not rendered inside a
        # Python function body.
        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data."""
        import pandas as pd

        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str, ds: str = None) -> int:
        """Load data to target."""
        import pandas as pd

        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification."""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)


# Instantiate the DAG
taskflow_etl()
```

Pattern 2: Dynamic DAG Generation


dags/dynamic_dag_factory.py

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]


def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config."""
    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag


# Generate DAGs
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
```

Pattern 3: Branching and Conditional Logic


dags/branching_example.py

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics."""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute."""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')

        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join


branching_pipeline()
```

Pattern 4: Sensors and External Dependencies


dags/sensor_patterns.py

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Wait for a file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,  # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',  # Free up the worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same logical date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using the @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability."""
        import requests

        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200

        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
```

Pattern 5: Error Handling and Alerts


dags/error_handling.py

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def task_failure_callback(context):
    """Callback on task failure."""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc.
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)


def dag_failure_callback(context):
    """Callback on DAG failure."""
    # Aggregate failures, send a summary
    pass


with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:
    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures."""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream tasks succeeded."""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
```

Pattern 6: Testing DAGs


tests/test_dags.py

```python
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle


@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)


def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors."""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"


def test_dag_structure(dagbag):
    """Test specific DAG structure."""
    dag = dagbag.get_dag('example_etl')

    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'


def test_task_dependencies(dagbag):
    """Test task dependencies are correct."""
    dag = dagbag.get_dag('example_etl')

    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]


def test_dag_integrity(dagbag):
    """Test that no DAG contains a cycle."""
    for dag_id, dag in dagbag.dags.items():
        check_cycle(dag)  # Raises AirflowDagCycleException if a cycle exists


# Test individual task logic
def test_extract_function():
    """Unit test for extract function."""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
```

Project Structure


```
airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py    # Custom operators
│   │   ├── sensors.py      # Custom sensors
│   │   └── callbacks.py    # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt
```

Best Practices


Do's


  • Use the TaskFlow API - cleaner code, automatic XCom passing
  • Set timeouts - prevent zombie tasks
  • Use `mode='reschedule'` for sensors - frees up worker slots while waiting
  • Test DAGs - unit tests and integration tests
  • Keep tasks idempotent - safe to retry
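The timeout and `mode='reschedule'` advice both come down to the poke contract sketched below in plain Python (a hedged illustration, not Airflow's implementation): check cheaply, pause between pokes, and give up at a deadline. Airflow's reschedule mode additionally returns the worker slot to the pool between pokes instead of sleeping on it.

```python
import time

def wait_until(check, poke_interval: float, timeout: float) -> bool:
    """Poll `check` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(poke_interval)  # reschedule mode frees the slot here instead
    return False

# Simulate a condition that becomes true on the third poke
attempts = iter([False, False, True])
ok = wait_until(lambda: next(attempts), poke_interval=0.01, timeout=1.0)
print(ok)  # True
```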

Don'ts


  • Don't use `depends_on_past=True` - creates bottlenecks
  • Don't hardcode dates - use `{{ ds }}` macros
  • Don't use global state - tasks should be stateless
  • Don't skip catchup blindly - understand the implications
  • Don't put heavy logic in the DAG file - import from modules
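To illustrate the hardcoded-dates point: deriving every path from the logical date makes one task body serve scheduled runs and backfills alike. The `build_input_path` helper is hypothetical; in a real task the date string comes from the `{{ ds }}` macro or the `ds` context key.

```python
from datetime import date, timedelta

def build_input_path(ds: str) -> str:
    """Build a date-partitioned path from the logical date string."""
    return f's3://bucket/raw/{ds}/data.csv'

# The same code covers a three-day backfill with no edits
paths = [
    build_input_path((date(2024, 1, 1) + timedelta(days=d)).isoformat())
    for d in range(3)
]
print(paths[0])  # s3://bucket/raw/2024-01-01/data.csv
```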

Resources
