migrating-airflow-2-to-3


Airflow 2 to 3 Migration


This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).
Important: Before migrating to Airflow 3, we strongly recommend upgrading to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1). Other upgrade paths make rollbacks impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Additionally, early 3.0 versions have many bugs; 3.1 provides a much better experience.

Migration at a Glance


  1. Run Ruff's Airflow migration rules to auto-fix detectable issues (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312).
    • ruff check --preview --select AIR --fix --unsafe-fixes .
  2. Scan for remaining issues using the manual search checklist in reference/migration-checklist.md.
    • Focus on: direct metadata DB access, legacy imports, scheduling/context keys, XCom pickling, datasets-to-assets, REST API/auth, plugins, and file paths.
    • Hard behavior/config gotchas to explicitly review:
      • Cron scheduling semantics: consider `AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True` if you need Airflow 2-style cron data intervals.
      • `.airflowignore` syntax changed from regexp to glob; set `AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp` if you must keep regexp behavior.
      • OAuth callback URLs add an `/auth/` prefix (e.g. `/auth/oauth-authorized/google`).
  3. Plan changes per file and issue type:
    • Fix imports - update operators/hooks/providers - refactor metadata access to use the Airflow client instead of direct DB access - fix uses of outdated context variables - fix scheduling logic.
  4. Implement changes incrementally, re-running Ruff and code searches after each major change.
  5. Explain changes to the user and caution them to test any updated logic, such as refactored metadata access, scheduling logic, and use of the Airflow context.

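The `.airflowignore` syntax change above can be illustrated with a hypothetical entry (the `scratch` filename pattern is made up):

```text
# Airflow 2 (regexp syntax): ignore any DAG file whose path contains "scratch"
.*scratch.*

# Airflow 3 (glob syntax): the equivalent entry
*scratch*
```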

Architecture & Metadata DB Access


Airflow 3 changes how components talk to the metadata database:
  • Workers no longer connect directly to the metadata DB.
  • Task code runs via the Task Execution API exposed by the API server.
  • The DAG processor runs as an independent process separate from the scheduler.
  • The Triggerer uses the task execution mechanism via an in-process API server.
Trigger implementation gotcha: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via `sync_to_async(...)` (or otherwise ensure hook calls are async-safe).
Key code impact: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:

```text
RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x
```
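The off-loading pattern for synchronous hook calls inside a trigger's event loop can be sketched with the stdlib alone — here `asyncio.to_thread` stands in for asgiref's `sync_to_async`, and `fetch_records_sync` is a made-up placeholder for a real hook call:

```python
import asyncio
import time

def fetch_records_sync():
    # Stand-in for a blocking, synchronous hook call (e.g. hook.get_records(...)).
    time.sleep(0.1)
    return ["row1", "row2"]

async def run_trigger():
    # Off-load the blocking call to a worker thread so the Triggerer's
    # event loop stays responsive; sync_to_async(...) plays the same role
    # inside a real trigger's async run() method.
    records = await asyncio.to_thread(fetch_records_sync)
    return {"count": len(records)}

event = asyncio.run(run_trigger())
print(event)  # → {'count': 2}
```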

Patterns to search for


When scanning DAGs, custom operators, and `@task` functions, look for:
  • Session helpers: `provide_session`, `create_session`, `@provide_session`
  • Sessions from settings: `from airflow.settings import Session`
  • Engine access: `from airflow.settings import engine`
  • ORM usage with models: `session.query(DagModel)...`, `session.query(DagRun)...`
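A quick repo scan for the patterns above might look like this (the `dags/` and `plugins/` paths are assumptions — adjust to your layout; `|| true` keeps grep's no-match exit code from failing a CI script):

```shell
# Session helpers and decorators
grep -rnE "provide_session|create_session" dags/ plugins/ || true
# Sessions and engine imported from settings
grep -rn "from airflow.settings import" dags/ plugins/ || true
# Direct ORM queries against metadata models
grep -rnE "session\.query\((DagModel|DagRun)\)" dags/ plugins/ || true
```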

Replacement: Airflow Python client


Preferred for rich metadata access patterns. Add to `requirements.txt`:

```text
apache-airflow-client==<your-airflow-runtime-version>
```

Example usage:

```python
import os
from airflow.sdk import BaseOperator
import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

class ListDagsOperator(BaseOperator):
    def execute(self, context):
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))
```

Replacement: Direct REST API calls


For simple cases, call the REST API directly using `requests`:

```python
from airflow.sdk import task
import os
import requests

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10},
    )
    response.raise_for_status()
    print(response.json())
```


Ruff Airflow Migration Rules


Use Ruff's Airflow rules to detect and fix many breaking changes automatically.
  • AIR30 / AIR301 / AIR302: Removed code and imports in Airflow 3 - must be fixed.
  • AIR31 / AIR311 / AIR312: Deprecated code and imports - still work but will be removed in future versions; should be fixed.
Commands to run (via `uv`) against the project root:

```bash
# Auto-fix all detectable Airflow issues (safe + unsafe fixes)
ruff check --preview --select AIR --fix --unsafe-fixes .

# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .
```

---

Reference Files


For detailed code examples and migration patterns, see:
  • reference/migration-patterns.md - Detailed code examples for:
    • Removed modules and import reorganizations
    • Task SDK and Param usage
    • SubDAGs, SLAs, and removed features
    • Scheduling and context changes
    • XCom pickling removal
    • Datasets to Assets migration
    • DAG bundles and file paths
  • reference/migration-checklist.md - Manual search checklist with:
    • Search patterns for each issue type
    • Recommended fixes
    • FAB plugin warnings
    • Callback and behavior changes


Quick Reference Tables


Key Import Changes


| Airflow 2.x | Airflow 3 |
| --- | --- |
| `airflow.operators.dummy_operator.DummyOperator` | `airflow.providers.standard.operators.empty.EmptyOperator` |
| `airflow.operators.bash.BashOperator` | `airflow.providers.standard.operators.bash.BashOperator` |
| `airflow.operators.python.PythonOperator` | `airflow.providers.standard.operators.python.PythonOperator` |
| `airflow.decorators.dag` | `airflow.sdk.dag` |
| `airflow.decorators.task` | `airflow.sdk.task` |
| `airflow.datasets.Dataset` | `airflow.sdk.Asset` |

Context Key Changes


| Removed Key | Replacement |
| --- | --- |
| `execution_date` | `context["dag_run"].logical_date` |
| `tomorrow_ds` / `yesterday_ds` | Use `ds` with date math: `macros.ds_add(ds, 1)` / `macros.ds_add(ds, -1)` |
| `prev_ds` / `next_ds` | `prev_start_date_success` or timetable API |
| `triggering_dataset_events` | `triggering_asset_events` |
| `templates_dict` | `context["params"]` |

Asset-triggered runs: `logical_date` may be `None`; use `context["dag_run"].logical_date` defensively.
Cannot trigger with a future `logical_date`: use `logical_date=None` and rely on `run_id` instead.
Cron note: for scheduled runs using cron, `logical_date` semantics differ under `CronTriggerTimetable` (aligning `logical_date` with `run_after`). If you need Airflow 2-style cron data intervals, consider `AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True`.

Default Behavior Changes


| Setting | Airflow 2 Default | Airflow 3 Default |
| --- | --- | --- |
| `schedule` | `timedelta(days=1)` | `None` |
| `catchup` | `True` | `False` |

Callback Behavior Changes


  • `on_success_callback` no longer runs on skip; use `on_skipped_callback` if needed.
  • `@teardown` with `TriggerRule.ALWAYS` is not allowed; teardowns now execute even if the DAG run terminated early.

Resources


Related Skills


  • testing-dags: For testing DAGs after migration
  • debugging-dags: For troubleshooting migration issues