# Apache Airflow Skill
Master Apache Airflow for workflow orchestration, data pipeline automation, and scheduled task management. This skill covers DAG authoring, operators, sensors, hooks, XComs, variables, connections, and deployment patterns.
## When to Use This Skill
USE when:
- Building complex data pipelines with task dependencies
- Orchestrating ETL/ELT workflows
- Scheduling recurring batch jobs
- Managing workflows with retries and error handling
- Coordinating tasks across multiple systems
- Need visibility into workflow execution history
- Requiring audit trails and lineage tracking
- Building ML pipeline orchestration
DON'T USE when:
- Real-time streaming data (use Kafka, Flink)
- Simple cron jobs (use systemd timers, crontab)
- CI/CD pipelines (use GitHub Actions, Jenkins)
- Low-latency requirements (Airflow has scheduler overhead)
- Simple single-task automation (overkill)
- Need visual workflow design for non-developers (use n8n)
## Prerequisites
### Installation Options
**Option 1: pip (Development)**

```bash
# Create virtual environment
python -m venv airflow-env
source airflow-env/bin/activate

# Set Airflow home
export AIRFLOW_HOME=~/airflow

# Install Airflow with constraints
AIRFLOW_VERSION=2.8.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Initialize database
airflow db init

# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

# Start services
airflow webserver --port 8080 &
airflow scheduler &
```
**Option 2: Docker Compose (Recommended)**
```bash
# Download official docker-compose file (from the Airflow docs)
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'

# Create required directories
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize
docker compose up airflow-init

# Start services
docker compose up -d

# Access UI at http://localhost:8080 (airflow/airflow)
```
**Option 3: Kubernetes with Helm**
```bash
# Add Airflow Helm repo
helm repo add apache-airflow https://airflow.apache.org
helm repo update

# Install Airflow
helm install airflow apache-airflow/airflow \
    --namespace airflow \
    --create-namespace \
    --set executor=KubernetesExecutor

# Get the webserver secret key
kubectl get secret --namespace airflow airflow-webserver-secret -o jsonpath="{.data.webserver-secret-key}" | base64 --decode
```

### Development Setup
```bash
# Install development dependencies
pip install "apache-airflow[dev,postgres,celery,kubernetes]"

# Install testing tools
pip install pytest pytest-airflow

# Install linting
pip install ruff
```
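With these tools in place, a useful first test is a DAG-integrity check that fails CI whenever a DAG file does not import cleanly. A minimal pytest sketch, assuming your DAGs live in a local `dags/` folder:

```python
# tests/test_dag_integrity.py - fail fast on broken DAG imports
# (sketch; assumes DAG files live in ./dags)
from airflow.models import DagBag


def test_no_import_errors():
    """Every DAG file must parse without raising."""
    dag_bag = DagBag(dag_folder='dags', include_examples=False)
    assert not dag_bag.import_errors, f"DAG import errors: {dag_bag.import_errors}"


def test_dags_have_tags():
    """Convention check: every DAG carries at least one tag."""
    dag_bag = DagBag(dag_folder='dags', include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"DAG {dag_id} is missing tags"
```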
## Core Capabilities

### 1. Basic DAG Structure

```python
# dags/basic_dag.py
"""
Basic DAG demonstrating core Airflow concepts.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
"""
Basic DAG demonstrating core Airflow concepts.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
Default arguments for all tasks
Default arguments for all tasks
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=30),
'execution_timeout': timedelta(hours=2),
}
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=30),
'execution_timeout': timedelta(hours=2),
}
DAG definition
DAG definition
with DAG(
dag_id='basic_etl_pipeline',
default_args=default_args,
description='Basic ETL pipeline demonstrating core patterns',
schedule_interval='0 6 * * *', # Daily at 6 AM
start_date=datetime(2026, 1, 1),
catchup=False,
max_active_runs=1,
tags=['etl', 'production'],
doc_md="""
## Basic ETL Pipeline
This DAG demonstrates:
- Task dependencies
- Python and Bash operators
- Error handling with retries
- Task documentation
**Owner**: data-team
**Schedule**: Daily at 6 AM UTC
""",) as dag:
# Start marker
start = EmptyOperator(
task_id='start',
doc='Pipeline start marker',
)
# Extract task
def extract_data(**context):
"""Extract data from source systems."""
import logging
logger = logging.getLogger(__name__)
# Access execution context
execution_date = context['ds']
logger.info(f"Extracting data for {execution_date}")
# Simulated extraction
data = {
'records': 1000,
'source': 'database',
'execution_date': execution_date,
}
# Return value available via XCom
return data
extract = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
doc='Extract data from source database',
)
# Transform task
def transform_data(**context):
"""Transform extracted data."""
import logging
logger = logging.getLogger(__name__)
# Pull data from previous task via XCom
ti = context['ti']
extracted_data = ti.xcom_pull(task_ids='extract_data')
logger.info(f"Transforming {extracted_data['records']} records")
# Simulated transformation
transformed = {
**extracted_data,
'records_transformed': extracted_data['records'],
'quality_score': 0.95,
}
return transformed
transform = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
doc='Apply transformations to extracted data',
)
# Load task using Bash
load = BashOperator(
task_id='load_data',
bash_command='''
echo "Loading data to warehouse"
echo "Execution date: {{ ds }}"
echo "Previous task output: {{ ti.xcom_pull(task_ids='transform_data') }}"
''',
doc='Load transformed data to data warehouse',
)
# End marker
end = EmptyOperator(
task_id='end',
doc='Pipeline end marker',
trigger_rule='all_success',
)
# Define dependencies
start >> extract >> transform >> load >> endundefinedwith DAG(
dag_id='basic_etl_pipeline',
default_args=default_args,
description='Basic ETL pipeline demonstrating core patterns',
schedule_interval='0 6 * * *', # Daily at 6 AM
start_date=datetime(2026, 1, 1),
catchup=False,
max_active_runs=1,
tags=['etl', 'production'],
doc_md="""
## Basic ETL Pipeline
This DAG demonstrates:
- Task dependencies
- Python and Bash operators
- Error handling with retries
- Task documentation
**Owner**: data-team
**Schedule**: Daily at 6 AM UTC
""",) as dag:
# Start marker
start = EmptyOperator(
task_id='start',
doc='Pipeline start marker',
)
# Extract task
def extract_data(**context):
"""Extract data from source systems."""
import logging
logger = logging.getLogger(__name__)
# Access execution context
execution_date = context['ds']
logger.info(f"Extracting data for {execution_date}")
# Simulated extraction
data = {
'records': 1000,
'source': 'database',
'execution_date': execution_date,
}
# Return value available via XCom
return data
extract = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
doc='Extract data from source database',
)
# Transform task
def transform_data(**context):
"""Transform extracted data."""
import logging
logger = logging.getLogger(__name__)
# Pull data from previous task via XCom
ti = context['ti']
extracted_data = ti.xcom_pull(task_ids='extract_data')
logger.info(f"Transforming {extracted_data['records']} records")
# Simulated transformation
transformed = {
**extracted_data,
'records_transformed': extracted_data['records'],
'quality_score': 0.95,
}
return transformed
transform = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
doc='Apply transformations to extracted data',
)
# Load task using Bash
load = BashOperator(
task_id='load_data',
bash_command='''
echo "Loading data to warehouse"
echo "Execution date: {{ ds }}"
echo "Previous task output: {{ ti.xcom_pull(task_ids='transform_data') }}"
''',
doc='Load transformed data to data warehouse',
)
# End marker
end = EmptyOperator(
task_id='end',
doc='Pipeline end marker',
trigger_rule='all_success',
)
# Define dependencies
start >> extract >> transform >> load >> endundefined2. Advanced Operators
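The same pipeline can be expressed more compactly with the TaskFlow API (Airflow 2.0+), where plain function calls wire up dependencies and return values travel between tasks as implicit XComs. A sketch of the equivalent structure, not a drop-in replacement for the operator-based DAG above:

```python
# dags/basic_etl_taskflow.py - TaskFlow sketch of the same ETL pattern
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='basic_etl_pipeline_taskflow',
    schedule_interval='0 6 * * *',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def basic_etl_taskflow():
    @task
    def extract() -> dict:
        # Return value is pushed to XCom automatically
        return {'records': 1000, 'source': 'database'}

    @task
    def transform(data: dict) -> dict:
        # Arguments are pulled from upstream XComs automatically
        return {**data, 'quality_score': 0.95}

    @task
    def load(data: dict):
        print(f"Loading {data['records']} records")

    # Function composition defines the task dependencies
    load(transform(extract()))


basic_etl_taskflow()
```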
### 2. Advanced Operators

```python
# dags/advanced_operators.py
"""
DAG demonstrating advanced operator patterns.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='advanced_operators_demo',
default_args=default_args,
schedule_interval=None, # Manual trigger only
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['demo', 'advanced'],
) as dag:
# BranchPythonOperator for conditional logic
def choose_branch(**context):
"""Decide which branch to execute based on data."""
import random
# Simulated decision logic
data_volume = random.randint(0, 1000)
context['ti'].xcom_push(key='data_volume', value=data_volume)
if data_volume > 500:
return 'process_large_dataset'
else:
return 'process_small_dataset'
branch_task = BranchPythonOperator(
task_id='branch_on_data_volume',
python_callable=choose_branch,
)
process_large = PythonOperator(
task_id='process_large_dataset',
python_callable=lambda: print("Processing large dataset with parallel workers"),
)
process_small = PythonOperator(
task_id='process_small_dataset',
python_callable=lambda: print("Processing small dataset directly"),
)
# Join branches - triggered when any upstream succeeds
join = EmptyOperator(
task_id='join_branches',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
# TaskGroup for logical grouping
from airflow.utils.task_group import TaskGroup
with TaskGroup(group_id='validation_tasks') as validation_group:
validate_schema = PythonOperator(
task_id='validate_schema',
python_callable=lambda: print("Validating data schema"),
)
validate_quality = PythonOperator(
task_id='validate_quality',
python_callable=lambda: print("Validating data quality"),
)
validate_completeness = PythonOperator(
task_id='validate_completeness',
python_callable=lambda: print("Validating data completeness"),
)
# Parallel validation tasks
[validate_schema, validate_quality, validate_completeness]
# Dynamic task mapping (Airflow 2.3+)
def generate_partitions(**context):
"""Generate partition list for dynamic mapping."""
return ['partition_a', 'partition_b', 'partition_c', 'partition_d']
get_partitions = PythonOperator(
task_id='get_partitions',
python_callable=generate_partitions,
)
def process_partition(partition: str):
"""Process a single partition."""
print(f"Processing {partition}")
return f"Processed {partition}"
# Map over partitions dynamically
process_partitions = PythonOperator.partial(
task_id='process_partition',
).expand(op_args=get_partitions.output.map(lambda x: [x]))
# Final aggregation
def aggregate_results(**context):
"""Aggregate results from all partitions."""
ti = context['ti']
results = ti.xcom_pull(task_ids='process_partition', key='return_value')
print(f"Aggregated {len(results)} partition results")
aggregate = PythonOperator(
task_id='aggregate_results',
python_callable=aggregate_results,
)
# Dependencies
branch_task >> [process_large, process_small] >> join
join >> validation_group >> get_partitions >> process_partitions >> aggregateundefined"""
DAG demonstrating advanced operator patterns.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='advanced_operators_demo',
default_args=default_args,
schedule_interval=None, # Manual trigger only
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['demo', 'advanced'],
) as dag:
# BranchPythonOperator for conditional logic
def choose_branch(**context):
"""Decide which branch to execute based on data."""
import random
# Simulated decision logic
data_volume = random.randint(0, 1000)
context['ti'].xcom_push(key='data_volume', value=data_volume)
if data_volume > 500:
return 'process_large_dataset'
else:
return 'process_small_dataset'
branch_task = BranchPythonOperator(
task_id='branch_on_data_volume',
python_callable=choose_branch,
)
process_large = PythonOperator(
task_id='process_large_dataset',
python_callable=lambda: print("Processing large dataset with parallel workers"),
)
process_small = PythonOperator(
task_id='process_small_dataset',
python_callable=lambda: print("Processing small dataset directly"),
)
# Join branches - triggered when any upstream succeeds
join = EmptyOperator(
task_id='join_branches',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
# TaskGroup for logical grouping
from airflow.utils.task_group import TaskGroup
with TaskGroup(group_id='validation_tasks') as validation_group:
validate_schema = PythonOperator(
task_id='validate_schema',
python_callable=lambda: print("Validating data schema"),
)
validate_quality = PythonOperator(
task_id='validate_quality',
python_callable=lambda: print("Validating data quality"),
)
validate_completeness = PythonOperator(
task_id='validate_completeness',
python_callable=lambda: print("Validating data completeness"),
)
# Parallel validation tasks
[validate_schema, validate_quality, validate_completeness]
# Dynamic task mapping (Airflow 2.3+)
def generate_partitions(**context):
"""Generate partition list for dynamic mapping."""
return ['partition_a', 'partition_b', 'partition_c', 'partition_d']
get_partitions = PythonOperator(
task_id='get_partitions',
python_callable=generate_partitions,
)
def process_partition(partition: str):
"""Process a single partition."""
print(f"Processing {partition}")
return f"Processed {partition}"
# Map over partitions dynamically
process_partitions = PythonOperator.partial(
task_id='process_partition',
).expand(op_args=get_partitions.output.map(lambda x: [x]))
# Final aggregation
def aggregate_results(**context):
"""Aggregate results from all partitions."""
ti = context['ti']
results = ti.xcom_pull(task_ids='process_partition', key='return_value')
print(f"Aggregated {len(results)} partition results")
aggregate = PythonOperator(
task_id='aggregate_results',
python_callable=aggregate_results,
)
# Dependencies
branch_task >> [process_large, process_small] >> join
join >> validation_group >> get_partitions >> process_partitions >> aggregateundefined3. Sensors for Event-Driven Workflows
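Dynamic task mapping also reads more naturally with `@task` functions, where `.expand()` maps a task over a list returned at runtime. A sketch under the same Airflow 2.3+ assumption (to be placed inside a DAG definition):

```python
# Dynamic task mapping with the TaskFlow API (sketch)
from airflow.decorators import task


@task
def get_partitions() -> list:
    return ['partition_a', 'partition_b', 'partition_c', 'partition_d']


@task
def process_partition(partition: str) -> str:
    print(f"Processing {partition}")
    return f"Processed {partition}"


@task
def aggregate(results: list):
    print(f"Aggregated {len(results)} partition results")


# One mapped task instance is created per partition at runtime
aggregate(process_partition.expand(partition=get_partitions()))
```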
### 3. Sensors for Event-Driven Workflows

```python
# dags/sensor_patterns.py
"""
DAG demonstrating sensor patterns for event-driven workflows.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.python import PythonSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
default_args = {
'owner': 'data-team',
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
with DAG(
dag_id='sensor_patterns_demo',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['sensors', 'event-driven'],
) as dag:
# FileSensor - Wait for file to appear
wait_for_file = FileSensor(
task_id='wait_for_data_file',
filepath='/data/incoming/{{ ds }}/data.csv',
poke_interval=60, # Check every 60 seconds
timeout=3600, # Timeout after 1 hour
mode='poke', # 'poke' or 'reschedule'
soft_fail=False, # Fail task if timeout
)
# S3KeySensor - Wait for S3 object
wait_for_s3 = S3KeySensor(
task_id='wait_for_s3_file',
bucket_name='my-data-bucket',
bucket_key='incoming/{{ ds }}/data.parquet',
aws_conn_id='aws_default',
poke_interval=120,
timeout=7200,
mode='reschedule', # Release worker while waiting
)
# HttpSensor - Wait for API endpoint
wait_for_api = HttpSensor(
task_id='wait_for_api_ready',
http_conn_id='api_connection',
endpoint='/health',
request_params={},
response_check=lambda response: response.json().get('status') == 'ready',
poke_interval=30,
timeout=600,
)
# ExternalTaskSensor - Wait for another DAG
wait_for_upstream_dag = ExternalTaskSensor(
task_id='wait_for_upstream_dag',
external_dag_id='upstream_data_pipeline',
external_task_id='final_task',
execution_delta=timedelta(hours=0), # Same execution date
poke_interval=60,
timeout=3600,
mode='reschedule',
allowed_states=['success'],
failed_states=['failed', 'skipped'],
)
# PythonSensor - Custom condition
def check_data_quality(**context):
"""Custom sensor logic to check data quality."""
import random
# Simulated quality check
quality_score = random.random()
print(f"Quality score: {quality_score}")
return quality_score > 0.8 # Return True when condition met
wait_for_quality = PythonSensor(
task_id='wait_for_data_quality',
python_callable=check_data_quality,
poke_interval=120,
timeout=1800,
mode='poke',
)
# Process after all sensors pass
def process_data(**context):
"""Process data after all conditions are met."""
print("All sensors passed, processing data...")
process = PythonOperator(
task_id='process_data',
python_callable=process_data,
)
# Dependencies - all sensors must pass
[wait_for_file, wait_for_s3, wait_for_api, wait_for_upstream_dag, wait_for_quality] >> processundefined"""
DAG demonstrating sensor patterns for event-driven workflows.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.python import PythonSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
default_args = {
'owner': 'data-team',
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
with DAG(
dag_id='sensor_patterns_demo',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['sensors', 'event-driven'],
) as dag:
# FileSensor - Wait for file to appear
wait_for_file = FileSensor(
task_id='wait_for_data_file',
filepath='/data/incoming/{{ ds }}/data.csv',
poke_interval=60, # Check every 60 seconds
timeout=3600, # Timeout after 1 hour
mode='poke', # 'poke' or 'reschedule'
soft_fail=False, # Fail task if timeout
)
# S3KeySensor - Wait for S3 object
wait_for_s3 = S3KeySensor(
task_id='wait_for_s3_file',
bucket_name='my-data-bucket',
bucket_key='incoming/{{ ds }}/data.parquet',
aws_conn_id='aws_default',
poke_interval=120,
timeout=7200,
mode='reschedule', # Release worker while waiting
)
# HttpSensor - Wait for API endpoint
wait_for_api = HttpSensor(
task_id='wait_for_api_ready',
http_conn_id='api_connection',
endpoint='/health',
request_params={},
response_check=lambda response: response.json().get('status') == 'ready',
poke_interval=30,
timeout=600,
)
# ExternalTaskSensor - Wait for another DAG
wait_for_upstream_dag = ExternalTaskSensor(
task_id='wait_for_upstream_dag',
external_dag_id='upstream_data_pipeline',
external_task_id='final_task',
execution_delta=timedelta(hours=0), # Same execution date
poke_interval=60,
timeout=3600,
mode='reschedule',
allowed_states=['success'],
failed_states=['failed', 'skipped'],
)
# PythonSensor - Custom condition
def check_data_quality(**context):
"""Custom sensor logic to check data quality."""
import random
# Simulated quality check
quality_score = random.random()
print(f"Quality score: {quality_score}")
return quality_score > 0.8 # Return True when condition met
wait_for_quality = PythonSensor(
task_id='wait_for_data_quality',
python_callable=check_data_quality,
poke_interval=120,
timeout=1800,
mode='poke',
)
# Process after all sensors pass
def process_data(**context):
"""Process data after all conditions are met."""
print("All sensors passed, processing data...")
process = PythonOperator(
task_id='process_data',
python_callable=process_data,
)
# Dependencies - all sensors must pass
[wait_for_file, wait_for_s3, wait_for_api, wait_for_upstream_dag, wait_for_quality] >> processundefined4. Hooks for External System Integration
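When none of the built-in sensors fit, subclassing `BaseSensorOperator` yields a reusable custom sensor: implement `poke()` to return `True` once the condition holds. A minimal sketch; the row-count condition and `postgres_warehouse` connection id are illustrative assumptions:

```python
# plugins/sensors/row_count_sensor.py - custom sensor sketch
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


class RowCountSensor(BaseSensorOperator):
    """Waits until a table holds at least `min_rows` rows (illustrative check)."""

    def __init__(self, table: str, min_rows: int,
                 conn_id: str = 'postgres_warehouse', **kwargs):
        super().__init__(**kwargs)
        self.table = table
        self.min_rows = min_rows
        self.conn_id = conn_id

    def poke(self, context) -> bool:
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        count = hook.get_first(f"SELECT COUNT(*) FROM {self.table}")[0]
        self.log.info("Table %s has %s rows (need %s)", self.table, count, self.min_rows)
        return count >= self.min_rows
```

It accepts the usual sensor arguments (`poke_interval`, `timeout`, `mode`) through `**kwargs`.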
### 4. Hooks for External System Integration

```python
# dags/hooks_demo.py
"""
DAG demonstrating hook patterns for external system integration.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='hooks_integration_demo',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['hooks', 'integration'],
) as dag:
# PostgresHook for database operations
def extract_from_postgres(**context):
"""Extract data from PostgreSQL using hook."""
hook = PostgresHook(postgres_conn_id='postgres_warehouse')
# Execute query and fetch results
sql = """
SELECT id, name, value, created_at
FROM source_table
WHERE created_at >= '{{ ds }}'
AND created_at < '{{ next_ds }}'
"""
# Get connection and execute
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute(sql)
results = cursor.fetchall()
# Or use pandas
df = hook.get_pandas_df(sql)
print(f"Extracted {len(df)} rows")
# Store record count
context['ti'].xcom_push(key='record_count', value=len(df))
return df.to_dict()
extract_postgres = PythonOperator(
task_id='extract_from_postgres',
python_callable=extract_from_postgres,
)
# S3Hook for file operations
def upload_to_s3(**context):
"""Upload processed data to S3."""
import json
from io import BytesIO
hook = S3Hook(aws_conn_id='aws_default')
# Pull data from previous task
ti = context['ti']
data = ti.xcom_pull(task_ids='extract_from_postgres')
# Convert to JSON and upload
json_data = json.dumps(data)
key = f"processed/{{ ds }}/data.json"
hook.load_string(
string_data=json_data,
key=key,
bucket_name='my-data-bucket',
replace=True,
)
print(f"Uploaded to s3://my-data-bucket/{key}")
return key
upload_s3 = PythonOperator(
task_id='upload_to_s3',
python_callable=upload_to_s3,
)
# HttpHook for API calls
def call_external_api(**context):
"""Make API call using HTTP hook."""
hook = HttpHook(http_conn_id='api_connection', method='POST')
# Prepare payload
payload = {
'execution_date': context['ds'],
'dag_id': context['dag'].dag_id,
'task_id': context['task'].task_id,
}
# Make request
response = hook.run(
endpoint='/api/v1/notify',
data=json.dumps(payload),
headers={'Content-Type': 'application/json'},
)
return response.json()
api_call = PythonOperator(
task_id='call_external_api',
python_callable=call_external_api,
)
# SlackWebhookHook for notifications
def send_slack_notification(**context):
"""Send completion notification to Slack."""
hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
ti = context['ti']
record_count = ti.xcom_pull(task_ids='extract_from_postgres', key='record_count')
message = f"""
:white_check_mark: *Pipeline Completed Successfully*
*DAG*: {context['dag'].dag_id}
*Execution Date*: {context['ds']}
*Records Processed*: {record_count}
"""
hook.send(text=message)
notify_slack = PythonOperator(
task_id='send_slack_notification',
python_callable=send_slack_notification,
)
# Dependencies
extract_postgres >> upload_s3 >> api_call >> notify_slackundefined"""
DAG demonstrating hook patterns for external system integration.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='hooks_integration_demo',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['hooks', 'integration'],
) as dag:
# PostgresHook for database operations
def extract_from_postgres(**context):
"""Extract data from PostgreSQL using hook."""
hook = PostgresHook(postgres_conn_id='postgres_warehouse')
# Execute query and fetch results
sql = """
SELECT id, name, value, created_at
FROM source_table
WHERE created_at >= '{{ ds }}'
AND created_at < '{{ next_ds }}'
"""
# Get connection and execute
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute(sql)
results = cursor.fetchall()
# Or use pandas
df = hook.get_pandas_df(sql)
print(f"Extracted {len(df)} rows")
# Store record count
context['ti'].xcom_push(key='record_count', value=len(df))
return df.to_dict()
extract_postgres = PythonOperator(
task_id='extract_from_postgres',
python_callable=extract_from_postgres,
)
# S3Hook for file operations
def upload_to_s3(**context):
"""Upload processed data to S3."""
import json
from io import BytesIO
hook = S3Hook(aws_conn_id='aws_default')
# Pull data from previous task
ti = context['ti']
data = ti.xcom_pull(task_ids='extract_from_postgres')
# Convert to JSON and upload
json_data = json.dumps(data)
key = f"processed/{{ ds }}/data.json"
hook.load_string(
string_data=json_data,
key=key,
bucket_name='my-data-bucket',
replace=True,
)
print(f"Uploaded to s3://my-data-bucket/{key}")
return key
upload_s3 = PythonOperator(
task_id='upload_to_s3',
python_callable=upload_to_s3,
)
# HttpHook for API calls
def call_external_api(**context):
"""Make API call using HTTP hook."""
hook = HttpHook(http_conn_id='api_connection', method='POST')
# Prepare payload
payload = {
'execution_date': context['ds'],
'dag_id': context['dag'].dag_id,
'task_id': context['task'].task_id,
}
# Make request
response = hook.run(
endpoint='/api/v1/notify',
data=json.dumps(payload),
headers={'Content-Type': 'application/json'},
)
return response.json()
api_call = PythonOperator(
task_id='call_external_api',
python_callable=call_external_api,
)
# SlackWebhookHook for notifications
def send_slack_notification(**context):
"""Send completion notification to Slack."""
hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
ti = context['ti']
record_count = ti.xcom_pull(task_ids='extract_from_postgres', key='record_count')
message = f"""
:white_check_mark: *Pipeline Completed Successfully*
*DAG*: {context['dag'].dag_id}
*Execution Date*: {context['ds']}
*Records Processed*: {record_count}
"""
hook.send(text=message)
notify_slack = PythonOperator(
task_id='send_slack_notification',
python_callable=send_slack_notification,
)
# Dependencies
extract_postgres >> upload_s3 >> api_call >> notify_slackundefined5. XCom for Task Communication
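The same idea extends to services without an official provider: subclass `BaseHook`, read credentials from an Airflow Connection, and keep the client logic in one place. A minimal sketch; the `my_service` connection id and `/v1/data` endpoint are assumptions:

```python
# plugins/hooks/my_service_hook.py - custom hook sketch
import requests

from airflow.hooks.base import BaseHook


class MyServiceHook(BaseHook):
    """Thin client for a hypothetical HTTP service configured as a Connection."""

    def __init__(self, conn_id: str = 'my_service'):
        super().__init__()
        self.conn_id = conn_id

    def get_data(self, path: str = '/v1/data') -> dict:
        conn = self.get_connection(self.conn_id)
        base_url = f"http://{conn.host}:{conn.port or 80}"
        response = requests.get(
            f"{base_url}{path}",
            auth=(conn.login, conn.password),  # credentials stay out of logs
            timeout=30,
        )
        response.raise_for_status()
        return response.json()
```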
### 5. XCom for Task Communication

```python
# dags/xcom_patterns.py
"""
DAG demonstrating XCom patterns for inter-task communication.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import XCom
default_args = {
'owner': 'data-team',
'retries': 1,
}
with DAG(
dag_id='xcom_patterns_demo',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['xcom', 'communication'],
) as dag:
# Basic XCom push via return value
def produce_data(**context):
"""Produce data - return value auto-pushed to XCom."""
data = {
'records': [
{'id': 1, 'value': 100},
{'id': 2, 'value': 200},
{'id': 3, 'value': 300},
],
'metadata': {
'source': 'api',
'timestamp': str(datetime.now()),
}
}
return data # Automatically pushed to XCom
producer = PythonOperator(
task_id='produce_data',
python_callable=produce_data,
)
# Consume XCom from previous task
def consume_data(**context):
"""Consume data from previous task."""
ti = context['ti']
# Pull return value (default key)
data = ti.xcom_pull(task_ids='produce_data')
print(f"Received {len(data['records'])} records")
# Process and return
total = sum(r['value'] for r in data['records'])
return {'total': total, 'count': len(data['records'])}
consumer = PythonOperator(
task_id='consume_data',
python_callable=consume_data,
)
# Multiple XCom values with custom keys
def produce_multiple(**context):
"""Push multiple XCom values with different keys."""
ti = context['ti']
# Push individual values
ti.xcom_push(key='status', value='success')
ti.xcom_push(key='row_count', value=1000)
ti.xcom_push(key='quality_score', value=0.95)
ti.xcom_push(key='metadata', value={
'source': 'database',
'table': 'transactions',
})
multi_producer = PythonOperator(
task_id='produce_multiple_xcoms',
python_callable=produce_multiple,
)
def consume_multiple(**context):
"""Pull multiple XCom values."""
ti = context['ti']
# Pull specific keys
status = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='status')
row_count = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='row_count')
quality = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='quality_score')
metadata = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='metadata')
print(f"Status: {status}")
print(f"Rows: {row_count}, Quality: {quality}")
print(f"Source: {metadata['source']}")
multi_consumer = PythonOperator(
task_id='consume_multiple_xcoms',
python_callable=consume_multiple,
)
# Cross-DAG XCom (use with caution)
def cross_dag_pull(**context):
"""Pull XCom from another DAG run."""
ti = context['ti']
# Pull from specific DAG
value = ti.xcom_pull(
dag_id='other_dag_id',
task_ids='other_task_id',
key='shared_value',
include_prior_dates=True, # Look at previous runs
)
print(f"Value from other DAG: {value}")
# Template-based XCom access
template_task = PythonOperator(
task_id='template_xcom_access',
python_callable=lambda **ctx: print(ctx['templates_dict']),
templates_dict={
'data': "{{ ti.xcom_pull(task_ids='produce_data') }}",
'status': "{{ ti.xcom_pull(task_ids='produce_multiple_xcoms', key='status') }}",
},
)
# Dependencies
producer >> consumer
multi_producer >> multi_consumer
[consumer, multi_consumer] >> template_taskundefined"""
DAG demonstrating XCom patterns for inter-task communication.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import XCom
default_args = {
'owner': 'data-team',
'retries': 1,
}
with DAG(
dag_id='xcom_patterns_demo',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['xcom', 'communication'],
) as dag:
# Basic XCom push via return value
def produce_data(**context):
"""Produce data - return value auto-pushed to XCom."""
data = {
'records': [
{'id': 1, 'value': 100},
{'id': 2, 'value': 200},
{'id': 3, 'value': 300},
],
'metadata': {
'source': 'api',
'timestamp': str(datetime.now()),
}
}
return data # Automatically pushed to XCom
producer = PythonOperator(
task_id='produce_data',
python_callable=produce_data,
)
# Consume XCom from previous task
def consume_data(**context):
"""Consume data from previous task."""
ti = context['ti']
# Pull return value (default key)
data = ti.xcom_pull(task_ids='produce_data')
print(f"Received {len(data['records'])} records")
# Process and return
total = sum(r['value'] for r in data['records'])
return {'total': total, 'count': len(data['records'])}
consumer = PythonOperator(
task_id='consume_data',
python_callable=consume_data,
)
# Multiple XCom values with custom keys
def produce_multiple(**context):
"""Push multiple XCom values with different keys."""
ti = context['ti']
# Push individual values
ti.xcom_push(key='status', value='success')
ti.xcom_push(key='row_count', value=1000)
ti.xcom_push(key='quality_score', value=0.95)
ti.xcom_push(key='metadata', value={
'source': 'database',
'table': 'transactions',
})
multi_producer = PythonOperator(
task_id='produce_multiple_xcoms',
python_callable=produce_multiple,
)
def consume_multiple(**context):
"""Pull multiple XCom values."""
ti = context['ti']
# Pull specific keys
status = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='status')
row_count = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='row_count')
quality = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='quality_score')
metadata = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='metadata')
print(f"Status: {status}")
print(f"Rows: {row_count}, Quality: {quality}")
print(f"Source: {metadata['source']}")
multi_consumer = PythonOperator(
task_id='consume_multiple_xcoms',
python_callable=consume_multiple,
)
# Cross-DAG XCom (use with caution)
def cross_dag_pull(**context):
"""Pull XCom from another DAG run."""
ti = context['ti']
# Pull from specific DAG
value = ti.xcom_pull(
dag_id='other_dag_id',
task_ids='other_task_id',
key='shared_value',
include_prior_dates=True, # Look at previous runs
)
print(f"Value from other DAG: {value}")
# Template-based XCom access
template_task = PythonOperator(
task_id='template_xcom_access',
python_callable=lambda **ctx: print(ctx['templates_dict']),
templates_dict={
'data': "{{ ti.xcom_pull(task_ids='produce_data') }}",
'status': "{{ ti.xcom_pull(task_ids='produce_multiple_xcoms', key='status') }}",
},
)
# Dependencies
producer >> consumer
multi_producer >> multi_consumer
[consumer, multi_consumer] >> template_taskundefined6. Variables and Connections
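With the TaskFlow API, `multiple_outputs=True` achieves the same 'many keys from one task' effect: each key of the returned dict is pushed as its own XCom entry. A sketch (to be placed inside a DAG definition):

```python
# Multiple XComs via TaskFlow (sketch)
from airflow.decorators import task


@task(multiple_outputs=True)
def produce_stats() -> dict:
    # Each dict key becomes a separate XCom entry
    return {'status': 'success', 'row_count': 1000, 'quality_score': 0.95}


@task
def report(status: str, row_count: int):
    print(f"Status: {status}, rows: {row_count}")


# Subscripting the returned XComArg pulls individual keys downstream
stats = produce_stats()
report(status=stats['status'], row_count=stats['row_count'])
```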
### 6. Variables and Connections

```python
# dags/config_management.py
"""
DAG demonstrating Variables and Connections for configuration.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.base import BaseHook
default_args = {
'owner': 'data-team',
'retries': 1,
}
with DAG(
dag_id='config_management_demo',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['config', 'variables'],
) as dag:
# Using Airflow Variables
def use_variables(**context):
"""Access Airflow Variables."""
import json
# Get simple variable
environment = Variable.get('environment', default_var='development')
print(f"Environment: {environment}")
# Get JSON variable (auto-deserialize)
config = Variable.get('pipeline_config', deserialize_json=True)
print(f"Config: {config}")
# Set variable programmatically
Variable.set('last_run', str(datetime.now()))
# Get with default
threshold = Variable.get('quality_threshold', default_var='0.9')
return {
'environment': environment,
'config': config,
'threshold': float(threshold),
}
var_task = PythonOperator(
task_id='use_variables',
python_callable=use_variables,
)
# Using Connections
def use_connections(**context):
"""Access Airflow Connections."""
# Get connection object
conn = BaseHook.get_connection('postgres_warehouse')
print(f"Host: {conn.host}")
print(f"Port: {conn.port}")
print(f"Schema: {conn.schema}")
print(f"Login: {conn.login}")
# Password accessible but don't log it: conn.password
# Get extra fields (JSON)
extra = conn.extra_dejson
print(f"Extra config: {extra}")
# Build connection URI
uri = conn.get_uri()
return {
'host': conn.host,
'schema': conn.schema,
}
conn_task = PythonOperator(
task_id='use_connections',
python_callable=use_connections,
)
# Template-based variable access
def template_vars(**context):
"""Access variables via templates."""
# Variables accessible in templates
print(context['var']['value'])
print(context['var']['json'])
template_task = PythonOperator(
task_id='template_variable_access',
python_callable=template_vars,
op_kwargs={
# Access variable in template
'config': "{{ var.json.pipeline_config }}",
'env': "{{ var.value.environment }}",
},
)
# Environment-specific configuration pattern
def environment_config(**context):
"""Load environment-specific configuration."""
import json
env = Variable.get('environment', default_var='development')
# Load environment-specific config
config_key = f'config_{env}'
config = Variable.get(config_key, deserialize_json=True, default_var={})
# Merge with defaults
defaults = {
'batch_size': 1000,
'timeout': 300,
'retry_count': 3,
}
final_config = {**defaults, **config}
print(f"Final config for {env}: {final_config}")
return final_config
env_config = PythonOperator(
task_id='load_environment_config',
python_callable=environment_config,
)
# Dependencies
[var_task, conn_task] >> template_task >> env_configundefined"""
DAG demonstrating Variables and Connections for configuration.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.base import BaseHook
default_args = {
'owner': 'data-team',
'retries': 1,
}
with DAG(
dag_id='config_management_demo',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['config', 'variables'],
) as dag:
# Using Airflow Variables
def use_variables(**context):
"""Access Airflow Variables."""
import json
# Get simple variable
environment = Variable.get('environment', default_var='development')
print(f"Environment: {environment}")
# Get JSON variable (auto-deserialize)
config = Variable.get('pipeline_config', deserialize_json=True)
print(f"Config: {config}")
# Set variable programmatically
Variable.set('last_run', str(datetime.now()))
# Get with default
threshold = Variable.get('quality_threshold', default_var='0.9')
return {
'environment': environment,
'config': config,
'threshold': float(threshold),
}
var_task = PythonOperator(
task_id='use_variables',
python_callable=use_variables,
)
# Using Connections
def use_connections(**context):
"""Access Airflow Connections."""
# Get connection object
conn = BaseHook.get_connection('postgres_warehouse')
print(f"Host: {conn.host}")
print(f"Port: {conn.port}")
print(f"Schema: {conn.schema}")
print(f"Login: {conn.login}")
# Password accessible but don't log it: conn.password
# Get extra fields (JSON)
extra = conn.extra_dejson
print(f"Extra config: {extra}")
# Build connection URI
uri = conn.get_uri()
return {
'host': conn.host,
'schema': conn.schema,
}
conn_task = PythonOperator(
task_id='use_connections',
python_callable=use_connections,
)
# Template-based variable access
def template_vars(**context):
"""Access variables via templates."""
# Variables accessible in templates
print(context['var']['value'])
print(context['var']['json'])
template_task = PythonOperator(
task_id='template_variable_access',
python_callable=template_vars,
op_kwargs={
# Access variable in template
'config': "{{ var.json.pipeline_config }}",
'env': "{{ var.value.environment }}",
},
)
# Environment-specific configuration pattern
def environment_config(**context):
"""Load environment-specific configuration."""
import json
env = Variable.get('environment', default_var='development')
# Load environment-specific config
config_key = f'config_{env}'
config = Variable.get(config_key, deserialize_json=True, default_var={})
# Merge with defaults
defaults = {
'batch_size': 1000,
'timeout': 300,
'retry_count': 3,
}
final_config = {**defaults, **config}
print(f"Final config for {env}: {final_config}")
return final_config
env_config = PythonOperator(
task_id='load_environment_config',
python_callable=environment_config,
)
# Dependencies
[var_task, conn_task] >> template_task >> env_configundefined7. Error Handling and Callbacks
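Variables and Connections do not have to live in the metadata database: Airflow also resolves them from `AIRFLOW_VAR_<NAME>` and `AIRFLOW_CONN_<CONN_ID>` environment variables, which suits containerized deployments. A sketch with example names (in practice you would set these in the deployment environment, not in code):

```python
# Environment-variable backed configuration (sketch; names are examples)
import os

# Resolved by Variable.get('environment') without touching the metadata DB
os.environ['AIRFLOW_VAR_ENVIRONMENT'] = 'production'

# Resolved by BaseHook.get_connection('my_api');
# URI format is scheme://login:password@host:port/schema
os.environ['AIRFLOW_CONN_MY_API'] = 'http://user:secret@api.example.com:443'

from airflow.models import Variable
print(Variable.get('environment'))  # -> production
```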
### 7. Error Handling and Callbacks

```python
# dags/error_handling.py
"""
DAG demonstrating error handling and callback patterns.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
def on_success_callback(context):
"""Callback executed on task success."""
print(f"Task {context['task_instance'].task_id} succeeded!")
# Send success notification, update metrics, etc.
def on_failure_callback(context):
"""Callback executed on task failure."""
import logging
logger = logging.getLogger(name)
ti = context['task_instance']
dag_id = ti.dag_id
task_id = ti.task_id
execution_date = context['execution_date']
exception = context.get('exception')
error_msg = f"""
Task Failed!
DAG: {dag_id}
Task: {task_id}
Execution Date: {execution_date}
Exception: {exception}
"""
logger.error(error_msg)
# Send alert (Slack, PagerDuty, email, etc.)
# slack_hook.send(text=error_msg)def on_retry_callback(context):
"""Callback executed on task retry."""
ti = context['task_instance']
print(f"Task {ti.task_id} retrying (attempt {ti.try_number})")
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Callback when SLA is missed."""
print(f"SLA missed for tasks: {task_list}")
# Send SLA alert
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=30),
'on_success_callback': on_success_callback,
'on_failure_callback': on_failure_callback,
'on_retry_callback': on_retry_callback,
}
with DAG(
dag_id='error_handling_demo',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
sla_miss_callback=sla_miss_callback,
tags=['error-handling', 'callbacks'],
) as dag:
# Task with potential failure
def potentially_failing_task(**context):
"""Task that might fail."""
import random
if random.random() < 0.3:
raise ValueError("Random failure for demonstration")
return "Success!"
risky_task = PythonOperator(
task_id='risky_task',
python_callable=potentially_failing_task,
sla=timedelta(minutes=30), # SLA deadline
)
# Cleanup task - always runs
cleanup = BashOperator(
task_id='cleanup',
bash_command='echo "Cleaning up resources..."',
trigger_rule=TriggerRule.ALL_DONE, # Run regardless of upstream status
)
# Error handler task - only runs on upstream failure
def handle_error(**context):
"""Handle upstream failures."""
ti = context['ti']
upstream_tasks = ti.get_dagrun().get_task_instances()
failed_tasks = [t for t in upstream_tasks if t.state == 'failed']
if failed_tasks:
print(f"Failed tasks: {[t.task_id for t in failed_tasks]}")
# Implement recovery logic
error_handler = PythonOperator(
task_id='handle_errors',
python_callable=handle_error,
trigger_rule=TriggerRule.ONE_FAILED, # Run if any upstream fails
)
# Success notification - only runs if all succeeded
success_notify = BashOperator(
task_id='success_notification',
bash_command='echo "All tasks completed successfully!"',
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# Dependencies
risky_task >> [cleanup, error_handler, success_notify]undefined"""
DAG demonstrating error handling and callback patterns.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
def on_success_callback(context):
"""Callback executed on task success."""
print(f"Task {context['task_instance'].task_id} succeeded!")
# Send success notification, update metrics, etc.
def on_failure_callback(context):
"""Callback executed on task failure."""
import logging
logger = logging.getLogger(name)
ti = context['task_instance']
dag_id = ti.dag_id
task_id = ti.task_id
execution_date = context['execution_date']
exception = context.get('exception')
error_msg = f"""
Task Failed!
DAG: {dag_id}
Task: {task_id}
Execution Date: {execution_date}
Exception: {exception}
"""
logger.error(error_msg)
# Send alert (Slack, PagerDuty, email, etc.)
# slack_hook.send(text=error_msg)def on_retry_callback(context):
"""Callback executed on task retry."""
ti = context['task_instance']
print(f"Task {ti.task_id} retrying (attempt {ti.try_number})")
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Callback when SLA is missed."""
print(f"SLA missed for tasks: {task_list}")
# Send SLA alert
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=30),
'on_success_callback': on_success_callback,
'on_failure_callback': on_failure_callback,
'on_retry_callback': on_retry_callback,
}
with DAG(
dag_id='error_handling_demo',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
sla_miss_callback=sla_miss_callback,
tags=['error-handling', 'callbacks'],
) as dag:
# Task with potential failure
def potentially_failing_task(**context):
"""Task that might fail."""
import random
if random.random() < 0.3:
raise ValueError("Random failure for demonstration")
return "Success!"
risky_task = PythonOperator(
task_id='risky_task',
python_callable=potentially_failing_task,
sla=timedelta(minutes=30), # SLA deadline
)
# Cleanup task - always runs
cleanup = BashOperator(
task_id='cleanup',
bash_command='echo "Cleaning up resources..."',
trigger_rule=TriggerRule.ALL_DONE, # Run regardless of upstream status
)
# Error handler task - only runs on upstream failure
def handle_error(**context):
"""Handle upstream failures."""
ti = context['ti']
upstream_tasks = ti.get_dagrun().get_task_instances()
failed_tasks = [t for t in upstream_tasks if t.state == 'failed']
if failed_tasks:
print(f"Failed tasks: {[t.task_id for t in failed_tasks]}")
# Implement recovery logic
error_handler = PythonOperator(
task_id='handle_errors',
python_callable=handle_error,
trigger_rule=TriggerRule.ONE_FAILED, # Run if any upstream fails
)
# Success notification - only runs if all succeeded
success_notify = BashOperator(
task_id='success_notification',
bash_command='echo "All tasks completed successfully!"',
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# Dependencies
risky_task >> [cleanup, error_handler, success_notify]undefined8. Docker and Kubernetes Deployment
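Inside a callable you can also steer the task outcome explicitly: `AirflowSkipException` marks the task as skipped, while `AirflowFailException` fails it immediately without consuming the remaining retries. A short sketch; `fetch_record_count` is a hypothetical stand-in for a real source check:

```python
# Controlling task outcome from inside the callable (sketch)
from airflow.exceptions import AirflowFailException, AirflowSkipException


def fetch_record_count() -> int:
    """Hypothetical helper; replace with a real source check."""
    return 0


def guarded_task(**context):
    records = fetch_record_count()
    if records == 0:
        # Nothing to do for this interval: skipped, not failed
        raise AirflowSkipException("No records for this interval")
    if records < 0:
        # Unrecoverable input: fail now instead of burning retries
        raise AirflowFailException("Source returned an invalid count")
    return records
```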
### 8. Docker and Kubernetes Deployment

```yaml
# docker-compose.yml - Production-ready Airflow deployment
version: '3.8'

x-airflow-common:
  &airflow-common
  image: apache/airflow:2.8.1
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # Production settings
    AIRFLOW__CORE__PARALLELISM: 32
    AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 16
    AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 3
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
    AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./config:/opt/airflow/config
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    image: redis:7
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        airflow db init
        airflow users create \
          --username admin \
          --password admin \
          --firstname Admin \
          --lastname User \
          --role Admin \
          --email admin@example.com
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
    user: "0:0"

volumes:
  postgres-db-volume:
```
```yaml
# kubernetes/values.yaml - Helm chart values for Kubernetes deployment
executor: KubernetesExecutor

# Airflow configuration
config:
  core:
    dags_are_paused_at_creation: 'true'
    load_examples: 'false'
    parallelism: 32
    max_active_tasks_per_dag: 16
  scheduler:
    parsing_processes: 4
  kubernetes:
    delete_worker_pods: 'true'
    delete_worker_pods_on_failure: 'false'

# Web server
webserver:
  replicas: 2
  resources:
    requests:
      memory: "1Gi"
      cpu: "500m"
    limits:
      memory: "2Gi"
      cpu: "1000m"

# Scheduler
scheduler:
  replicas: 2
  resources:
    requests:
      memory: "2Gi"
      cpu: "1000m"
    limits:
      memory: "4Gi"
      cpu: "2000m"

# Worker pod template
workers:
  resources:
    requests:
      memory: "1Gi"
      cpu: "500m"
    limits:
      memory: "2Gi"
      cpu: "1000m"

# DAGs configuration
dags:
  persistence:
    enabled: true
    size: 10Gi
  gitSync:
    enabled: true
    repo: https://github.com/org/airflow-dags.git
    branch: main
    wait: 60

# Logs
logs:
  persistence:
    enabled: true
    size: 50Gi

# Database
postgresql:
  enabled: true
  persistence:
    enabled: true
    size: 20Gi

# Redis (only needed if using CeleryExecutor)
redis:
  enabled: false

# Ingress
ingress:
  enabled: true
  web:
    annotations:
      kubernetes.io/ingress.class: nginx
      cert-manager.io/cluster-issuer: letsencrypt-prod
    hosts:
      - name: airflow.example.com
        tls:
          enabled: true
          secretName: airflow-tls
```
Integration Examples
Integration with AWS Services
```python
# dags/aws_integration.py
"""
DAG integrating with AWS services.
"""
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

default_args = {
    'owner': 'data-team',
    'retries': 2,
}

with DAG(
    dag_id='aws_integration_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['aws', 'integration'],
) as dag:
    # Upload to S3
    upload_to_s3 = LocalFilesystemToS3Operator(
        task_id='upload_to_s3',
        filename='/data/output/{{ ds }}/data.parquet',
        dest_key='raw/{{ ds }}/data.parquet',
        dest_bucket='my-data-lake',
        aws_conn_id='aws_default',
        replace=True,
    )

    # Run Glue ETL job
    run_glue_job = GlueJobOperator(
        task_id='run_glue_etl',
        job_name='my-etl-job',
        script_args={
            '--input_path': 's3://my-data-lake/raw/{{ ds }}/',
            '--output_path': 's3://my-data-lake/processed/{{ ds }}/',
        },
        aws_conn_id='aws_default',
        wait_for_completion=True,
    )

    # Query with Athena
    run_athena_query = AthenaOperator(
        task_id='run_athena_analysis',
        query="""
            SELECT date, COUNT(*) AS count, SUM(value) AS total
            FROM processed_data
            WHERE partition_date = '{{ ds }}'
            GROUP BY date
        """,
        database='analytics',
        output_location='s3://my-data-lake/athena-results/',
        aws_conn_id='aws_default',
    )

    # Load to Redshift
    load_to_redshift = S3ToRedshiftOperator(
        task_id='load_to_redshift',
        schema='public',
        table='fact_daily_metrics',
        s3_bucket='my-data-lake',
        s3_key='processed/{{ ds }}/',
        redshift_conn_id='redshift_warehouse',
        aws_conn_id='aws_default',
        copy_options=['FORMAT AS PARQUET'],
    )

    upload_to_s3 >> run_glue_job >> run_athena_query >> load_to_redshift
```

Best Practices
1. DAG Design Principles
```python
# Use meaningful DAG and task IDs
dag_id='sales_daily_etl_pipeline'  # Good
dag_id='dag1'                      # Bad

# Set appropriate concurrency limits (DAG arguments)
max_active_runs=1    # For data pipelines with dependencies
max_active_tasks=16  # Limit resource usage

# Use tags for organization
tags=['production', 'etl', 'sales']

# Always set catchup=False unless backfill is needed
catchup=False

# Use execution_timeout to prevent stuck tasks
execution_timeout=timedelta(hours=2)
```
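Taken together, these arguments fit into a single `DAG(...)` call; a minimal sketch (the DAG and task names are illustrative):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id='sales_daily_etl_pipeline',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,                 # No historical backfill
    max_active_runs=1,             # One run at a time for dependent data
    max_active_tasks=16,           # Cap concurrent tasks in this DAG
    tags=['production', 'etl', 'sales'],
    default_args={'execution_timeout': timedelta(hours=2)},
) as dag:
    start = EmptyOperator(task_id='start')
```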
2. Task Best Practices
```python
# Keep tasks atomic and idempotent
def process_partition(partition_date: str):
    """Idempotent: can be safely re-run."""
    # Delete existing data for this partition
    delete_partition(partition_date)
    # Process and insert new data
    insert_data(partition_date)

# Use retries with exponential backoff
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
}

# Avoid heavy processing in sensors:
# Bad:  sensor does complex computation on every poke
# Good: sensor checks a simple condition; processing happens in a separate task
```
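As an example of the sensor split, here is a sketch (inside a DAG definition) using `PythonSensor` for a cheap readiness check, with the heavy work in a downstream task; the flag path and callables are hypothetical:

```python
import os

from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor

def _file_ready() -> bool:
    # Cheap check only: test for existence, do not read or parse the file
    return os.path.exists('/data/incoming/ready.flag')

def _process_file(**context):
    # All heavy computation happens here, not in the sensor
    ...

wait_for_file = PythonSensor(
    task_id='wait_for_file',
    python_callable=_file_ready,
    poke_interval=60,
    mode='reschedule',  # Frees the worker slot between pokes
    timeout=60 * 60,
)

process_file = PythonOperator(
    task_id='process_file',
    python_callable=_process_file,
)

wait_for_file >> process_file
```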
3. Configuration Management
```python
from airflow.hooks.base import BaseHook
from airflow.models import Variable

# Use Variables for configuration, not hardcoded values
batch_size = Variable.get('batch_size', default_var=1000)

# Use Connections for credentials
conn = BaseHook.get_connection('my_database')

# Environment-specific configuration
env = Variable.get('environment')
config = Variable.get(f'config_{env}', deserialize_json=True)
```
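As a usage sketch, assuming a JSON Variable named `config_prod` was created beforehand (e.g. with `airflow variables set config_prod '{"batch_size": 5000, "bucket": "prod-data"}'`):

```python
from airflow.models import Variable

env = Variable.get('environment', default_var='prod')
config = Variable.get(f'config_{env}', deserialize_json=True)

batch_size = config['batch_size']  # 5000, per the hypothetical Variable above
bucket = config['bucket']          # 'prod-data'
```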
4. Testing DAGs
```python
# tests/test_dags.py
from airflow.models import DagBag


def test_dag_loads():
    """Test that DAGs load without errors."""
    dagbag = DagBag()
    assert len(dagbag.import_errors) == 0


def test_dag_structure():
    """Test DAG has expected structure."""
    dagbag = DagBag()
    dag = dagbag.get_dag('my_pipeline')
    assert dag is not None
    assert len(dag.tasks) == 5
    assert dag.schedule_interval == '@daily'
```
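It can also pay to assert the task wiring itself; a sketch assuming the same `my_pipeline` DAG, with hypothetical `extract` and `transform` task IDs:

```python
from airflow.models import DagBag


def test_task_dependencies():
    """Assert tasks are wired in the expected order."""
    dag = DagBag().get_dag('my_pipeline')
    extract = dag.get_task('extract')  # Hypothetical task_id
    assert 'transform' in extract.downstream_task_ids
```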
Troubleshooting
Common Issues
**Issue: DAG not appearing in UI**

```bash
# Check for import errors
airflow dags list-import-errors

# Validate DAG file
python dags/my_dag.py

# Check scheduler logs
docker logs airflow-scheduler-1 | grep -i error
```
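The same import check is scriptable from Python; a sketch assuming your DAG folder is `dags/`:

```python
from airflow.models import DagBag

# Surface import errors without starting the scheduler
dagbag = DagBag(dag_folder='dags/', include_examples=False)
for path, error in dagbag.import_errors.items():
    print(f"{path}:\n{error}")
```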
**Issue: Tasks stuck in queued state**

```bash
# Check worker status
airflow celery status

# Verify executor configuration
airflow config get-value core executor

# Check for resource constraints
kubectl top pods -n airflow
```
**Issue: XCom size limits**

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Use external storage for large data
def store_large_result(large_data: str, **context):
    # Store in S3 instead of XCom
    s3_hook = S3Hook(aws_conn_id='aws_default')
    s3_hook.load_string(large_data, key='results/data.json', bucket_name='my-bucket')
    return 's3://my-bucket/results/data.json'  # Return reference only
```
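A downstream task then pulls only the reference and reads the payload itself; a sketch against the same hypothetical bucket:

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def consume_large_result(ti, **context):
    # Pull the S3 URI reference from XCom rather than the payload itself
    uri = ti.xcom_pull(task_ids='store_large_result')  # 's3://my-bucket/results/data.json'
    bucket, key = uri.removeprefix('s3://').split('/', 1)
    data = S3Hook(aws_conn_id='aws_default').read_key(key=key, bucket_name=bucket)
    return len(data)
```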
**Issue: Scheduler performance**
```yaml
# Tune scheduler settings
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 30
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 60
```
Debugging Tips
```python
# Add detailed logging
import logging

logger = logging.getLogger(__name__)

def my_task(**context):
    logger.info(f"Starting task with context: {context}")
    result = ...  # ... task logic goes here
    logger.debug(f"Intermediate result: {result}")
```
```bash
# Test specific task
airflow tasks test my_dag my_task 2026-01-15

# Clear task state for re-run
airflow tasks clear my_dag -t my_task -s 2026-01-15 -e 2026-01-15

# Trigger DAG run
airflow dags trigger my_dag --conf '{"key": "value"}'
```
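For local iteration, Airflow 2.5+ also supports running a whole DAG in-process with `dag.test()`, which makes debuggers and print statements usable; a minimal sketch (`my_dag` is illustrative):

```python
# debug_dag.py - run with `python debug_dag.py`
from airflow.models import DagBag

if __name__ == '__main__':
    dag = DagBag().get_dag('my_dag')
    dag.test()  # Executes all tasks in a single process, no scheduler needed
```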
Version History

| Version | Date | Changes |
|---|---|---|
| 1.0.0 | 2026-01-17 | Initial release with comprehensive workflow patterns |
Resources
- Apache Airflow Documentation
- Airflow Best Practices
- Airflow Helm Chart
- Astronomer Guides
- Airflow Providers
This skill provides production-ready patterns for Apache Airflow workflow orchestration, tested across enterprise data pipelines.