airflow-plugins

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Airflow 3 Plugins

Airflow 3 插件

Airflow 3 plugins let you embed FastAPI apps, React UIs, middleware, macros, operator buttons, and custom timetables directly into the Airflow process. No sidecar, no extra server.
CRITICAL: Plugin components (fastapi_apps, react_apps, external_views) require Airflow 3.1+. NEVER import
flask
,
flask_appbuilder
, or use
appbuilder_views
/
flask_blueprints
— these are Airflow 2 patterns and will not work in Airflow 3. If existing code uses them, rewrite the entire registration block using FastAPI.
Security: FastAPI plugin endpoints are not automatically protected by Airflow auth. If your endpoints need to be private, implement authentication explicitly using FastAPI's security utilities.
Restart required: Changes to Python plugin files require restarting the API server. Static file changes (HTML, JS, CSS) are picked up immediately. Set
AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False
during development to load plugins at startup rather than lazily.
Relative paths always: In
external_views
,
href
must have no leading slash. In HTML and JavaScript, use relative paths for all assets and
fetch()
calls. Absolute paths break behind reverse proxies.
Airflow 3 插件可让你将FastAPI应用、React UI、中间件、宏、Operator按钮和自定义时间表直接嵌入Airflow进程中,无需边车容器或额外服务器。
重要提示:插件组件(fastapi_apps、react_apps、external_views)需要 Airflow 3.1+切勿导入
flask
flask_appbuilder
,或使用
appbuilder_views
/
flask_blueprints
—— 这些是Airflow 2的模式,在Airflow 3中无法工作。如果现有代码使用了这些模式,请使用FastAPI重写整个注册块。
安全说明:FastAPI插件端点不会自动受Airflow认证保护。如果你的端点需要私有访问,请使用FastAPI的安全工具显式实现认证。
需要重启:修改Python插件文件后需要重启API服务器。静态文件(HTML、JS、CSS)的更改会立即生效。开发期间设置
AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False
,以便在启动时加载插件而非延迟加载。
始终使用相对路径:在
external_views
中,
href
不得有前导斜杠。在HTML和JavaScript中,所有资源和
fetch()
调用都要使用相对路径。绝对路径在反向代理环境下会失效。

Before writing any code, verify

编写代码前请确认

  1. Am I using
    fastapi_apps
    / FastAPI — not
    appbuilder_views
    / Flask?
  2. Are all HTML/JS asset paths and
    fetch()
    calls relative (no leading slash)?
  3. Are all synchronous SDK or SQLAlchemy calls wrapped in
    asyncio.to_thread()
    ?
  4. Do the
    static/
    and
    assets/
    directories exist before the FastAPI app mounts them?
  5. If the endpoint must be private, did I add explicit FastAPI authentication?

  1. 我是否在使用
    fastapi_apps
    /FastAPI —— 而非
    appbuilder_views
    /Flask?
  2. 所有HTML/JS资源路径和
    fetch()
    调用是否均为相对路径(无前导斜杠)?
  3. 所有同步SDK或SQLAlchemy调用是否都用
    asyncio.to_thread()
    包裹?
  4. 在FastAPI应用挂载
    static/
    assets/
    目录前,这些目录是否已存在?
  5. 如果端点需要私有访问,我是否已添加显式的FastAPI认证?

Step 1: Choose plugin components

步骤1:选择插件组件

A single plugin class can register multiple component types at once.
ComponentWhat it doesField
Custom API endpointsFastAPI app mounted in Airflow process
fastapi_apps
Nav / page linkEmbeds a URL as an iframe or links out
external_views
React componentCustom React app embedded in Airflow UI
react_apps
API middlewareIntercepts all Airflow API requests/responses
fastapi_root_middlewares
Jinja macrosReusable Python functions in DAG templates
macros
Task instance buttonExtra link button in task Detail view
operator_extra_links
/
global_operator_extra_links
Custom timetableCustom scheduling logic
timetables
Event hooksListener callbacks for Airflow events
listeners

单个插件类可同时注册多种组件类型。
组件功能字段
自定义API端点挂载到Airflow进程中的FastAPI应用
fastapi_apps
导航/页面链接将URL以iframe形式嵌入或跳转至外部
external_views
React组件嵌入Airflow UI的自定义React应用
react_apps
API中间件拦截所有Airflow API请求/响应
fastapi_root_middlewares
Jinja宏DAG模板中可复用的Python函数
macros
任务实例按钮任务详情视图中的额外链接按钮
operator_extra_links
/
global_operator_extra_links
自定义时间表自定义调度逻辑
timetables
事件钩子Airflow事件的监听器回调
listeners

Step 2: Plugin registration skeleton

步骤2:插件注册骨架

Project file structure

项目文件结构

Give each plugin its own subdirectory under
plugins/
— this keeps the Python file, static assets, and templates together and makes multi-plugin projects manageable:
plugins/
  my-plugin/
    plugin.py       # AirflowPlugin subclass — auto-discovered by Airflow
    static/
      index.html
      app.js
    assets/
      icon.svg
BASE_DIR = Path(__file__).parent
in
plugin.py
resolves to
plugins/my-plugin/
— static and asset paths will be correct relative to that. Create the subdirectory and any static/assets folders before starting Airflow, or
StaticFiles
will raise on import.
python
from pathlib import Path
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

BASE_DIR = Path(__file__).parent

app = FastAPI(title="My Plugin")
plugins/
下为每个插件创建独立子目录 —— 这样可将Python文件、静态资源和模板放在一起,便于管理多插件项目:
plugins/
  my-plugin/
    plugin.py       # AirflowPlugin子类 —— Airflow会自动发现
    static/
      index.html
      app.js
    assets/
      icon.svg
plugin.py
中的
BASE_DIR = Path(__file__).parent
会解析为
plugins/my-plugin/
—— 静态资源和资产路径相对于该目录会是正确的。在启动Airflow前创建子目录和所有静态/资产文件夹,否则
StaticFiles
会在导入时抛出错误。
python
from pathlib import Path
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

BASE_DIR = Path(__file__).parent

app = FastAPI(title="My Plugin")

Both directories must exist before Airflow starts or FastAPI raises on import

Airflow启动前必须存在这两个目录,否则FastAPI会在导入时抛出错误

app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static") app.mount("/assets", StaticFiles(directory=BASE_DIR / "assets"), name="assets")
class MyPlugin(AirflowPlugin): name = "my_plugin"
fastapi_apps = [
    {
        "app": app,
        "url_prefix": "/my-plugin",   # plugin available at {AIRFLOW_HOST}/my-plugin/
        "name": "My Plugin",
    }
]

external_views = [
    {
        "name": "My Plugin",
        "href": "my-plugin/ui",              # NO leading slash — breaks on Astro and reverse proxies
        "destination": "nav",                # see locations table below
        "category": "browse",                # nav bar category (nav destination only)
        "url_route": "my-plugin",            # unique route name (required for React apps)
        "icon": "/my-plugin/static/icon.svg" # DOES use a leading slash — served by FastAPI
    }
]
undefined
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static") app.mount("/assets", StaticFiles(directory=BASE_DIR / "assets"), name="assets")
class MyPlugin(AirflowPlugin): name = "my_plugin"
fastapi_apps = [
    {
        "app": app,
        "url_prefix": "/my-plugin",   # 插件可通过 {AIRFLOW_HOST}/my-plugin/ 访问
        "name": "My Plugin",
    }
]

external_views = [
    {
        "name": "My Plugin",
        "href": "my-plugin/ui",              # 不要加前导斜杠 —— 在Astro和反向代理环境下会失效
        "destination": "nav",                # 见下方位置表格
        "category": "browse",                # 导航栏分类(仅当destination为nav时需要)
        "url_route": "my-plugin",            # 唯一路由名称(React应用必填)
        "icon": "/my-plugin/static/icon.svg" # 这里需要加前导斜杠 —— 由FastAPI提供服务
    }
]
undefined

External view locations

外部视图位置

destination
Where it appears
"nav"
Left navigation bar (also set
category
)
"dag"
Extra tab on every Dag page
"dag_run"
Extra tab on every Dag run page
"task"
Extra tab on every task page
"task_instance"
Extra tab on every task instance page
destination
显示位置
"nav"
左侧导航栏(需同时设置
category
"dag"
每个DAG页面的额外标签页
"dag_run"
每个DAG运行页面的额外标签页
"task"
每个任务页面的额外标签页
"task_instance"
每个任务实例页面的额外标签页

Nav bar categories (
destination: "nav"
)

导航栏分类(
destination: "nav"

Set
"category"
to place the link under a specific nav group:
"browse"
,
"admin"
, or omit for top-level.
设置
"category"
可将链接放在指定导航组下:
"browse"
"admin"
,或留空放在顶层。

External URLs and minimal plugins

外部URL与极简插件

href
can be a relative path to an internal endpoint (
"my-plugin/ui"
) or a full external URL. A plugin with only
external_views
and no
fastapi_apps
is valid — no backend needed for a simple link or tab:
python
from airflow.plugins_manager import AirflowPlugin

class LearnViewPlugin(AirflowPlugin):
    name = "learn_view_plugin"

    external_views = [
        {
            "name": "Learn Airflow 3",
            "href": "https://www.astronomer.io/docs/learn",
            "destination": "dag",   # adds a tab to every Dag page
            "url_route": "learn"
        }
    ]
The no-leading-slash rule applies to internal paths only — full
https://
URLs are fine.

href
可以是内部端点的相对路径(
"my-plugin/ui"
)或完整外部URL。仅包含
external_views
而无
fastapi_apps
的插件是有效的 —— 简单链接或标签页无需后端支持:
python
from airflow.plugins_manager import AirflowPlugin

class LearnViewPlugin(AirflowPlugin):
    name = "learn_view_plugin"

    external_views = [
        {
            "name": "Learn Airflow 3",
            "href": "https://www.astronomer.io/docs/learn",
            "destination": "dag",   # 为每个DAG页面添加标签页
            "url_route": "learn"
        }
    ]
无前导斜杠规则仅适用于内部路径 —— 完整的
https://
URL不受限制。

Step 3: Serve the UI entry point

步骤3:提供UI入口

python
@app.get("/ui", response_class=FileResponse)
async def serve_ui():
    return FileResponse(BASE_DIR / "static" / "index.html")
In HTML, always use relative paths. Absolute paths break when Airflow is mounted at a sub-path:
html
<!-- correct -->
<link rel="stylesheet" href="static/app.css" />
<script src="static/app.js?v=20240315"></script>

<!-- breaks behind a reverse proxy -->
<script src="/my-plugin/static/app.js"></script>
Same rule in JavaScript:
javascript
fetch('api/dags')           // correct — relative to current page
fetch('/my-plugin/api/dags') // breaks on Astro and sub-path deploys

python
@app.get("/ui", response_class=FileResponse)
async def serve_ui():
    return FileResponse(BASE_DIR / "static" / "index.html")
在HTML中,始终使用相对路径。当Airflow挂载在子路径下时,绝对路径会失效:
html
<!-- 正确写法 -->
<link rel="stylesheet" href="static/app.css" />
<script src="static/app.js?v=20240315"></script>

<!-- 在反向代理环境下会失效 -->
<script src="/my-plugin/static/app.js"></script>
JavaScript中同样遵循此规则:
javascript
fetch('api/dags')           // 正确 —— 相对于当前页面
fetch('/my-plugin/api/dags') // 在Astro和子路径部署环境下会失效

Step 4: Call the Airflow API from your plugin

步骤4:在插件中调用Airflow API

Only needed if your plugin calls the Airflow REST API. Plugins that only serve static files, register
external_views
, or use direct DB access do not need this step — skip to Step 5 or Step 6.
仅当插件需要调用Airflow REST API时才需要此步骤。仅提供静态文件、注册
external_views
或直接访问数据库的插件无需此步骤 —— 跳至步骤5或步骤6。

Add the dependency

添加依赖

Only if REST API communication is being implemented: add
apache-airflow-client
to the project's dependencies. Check which file exists and act accordingly:
File foundAction
requirements.txt
Append
apache-airflow-client
pyproject.toml
(uv / poetry)
uv add apache-airflow-client
or
poetry add apache-airflow-client
None of the aboveTell the user: "Add
apache-airflow-client
to your dependencies before running the plugin."
Use
apache-airflow-client
to talk to Airflow's own REST API. The SDK is synchronous but FastAPI routes are async — never call blocking SDK methods directly inside
async def
or you will stall the event loop and freeze all concurrent requests.
仅当需要实现REST API通信时:将
apache-airflow-client
添加到项目依赖中。根据找到的文件执行相应操作:
找到的文件操作
requirements.txt
添加
apache-airflow-client
到末尾
pyproject.toml
(uv / poetry)
执行
uv add apache-airflow-client
poetry add apache-airflow-client
以上都没有告知用户:"在运行插件前,请将
apache-airflow-client
添加到你的依赖中。"
使用
apache-airflow-client
与Airflow自身的REST API通信。该SDK是同步的,但FastAPI路由是异步的 —— 切勿在
async def
中直接调用阻塞性SDK方法,否则会阻塞事件循环并冻结所有并发请求。

JWT token management

JWT令牌管理

Cache one token per process. Refresh 5 minutes before the 1-hour expiry. Use double-checked locking so multiple concurrent requests don't all race to refresh simultaneously:
Replace
MYPLUGIN_
with a short uppercase prefix derived from the plugin name (e.g. if the plugin is called "Trip Analyzer", use
TRIP_ANALYZER_
). If no plugin name has been given yet, ask the user before writing env var names.
python
import asyncio
import os
import threading
import time
import airflow_client.client as airflow_sdk
import requests

AIRFLOW_HOST  = os.environ.get("MYPLUGIN_HOST",     "http://localhost:8080")
AIRFLOW_USER  = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS  = os.environ.get("MYPLUGIN_PASSWORD", "admin")
AIRFLOW_TOKEN = os.environ.get("MYPLUGIN_TOKEN")    # Astronomer Astro: Deployment API token

_cached_token: str | None = None
_token_expires_at: float  = 0.0
_token_lock = threading.Lock()


def _fetch_fresh_token() -> str:
    """Exchange username/password for a JWT via Airflow's auth endpoint."""
    response = requests.post(
        f"{AIRFLOW_HOST}/auth/token",
        json={"username": AIRFLOW_USER, "password": AIRFLOW_PASS},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def _get_token() -> str:
    # Astronomer Astro production: use static Deployment API token directly
    if AIRFLOW_TOKEN:
        return AIRFLOW_TOKEN
    global _cached_token, _token_expires_at
    now = time.monotonic()
    # Fast path — no lock if still valid
    if _cached_token and now < _token_expires_at:
        return _cached_token
    # Slow path — one thread refreshes, others wait
    with _token_lock:
        if _cached_token and now < _token_expires_at:
            return _cached_token
        _cached_token = _fetch_fresh_token()
        _token_expires_at = now + 55 * 60  # refresh 5 min before 1-hour expiry
    return _cached_token


def _make_config() -> airflow_sdk.Configuration:
    config = airflow_sdk.Configuration(host=AIRFLOW_HOST)
    config.access_token = _get_token()
    return config
After implementing auth, tell the user:
  • Local development: set
    MYPLUGIN_USERNAME
    and
    MYPLUGIN_PASSWORD
    in
    .env
    — JWT exchange happens automatically.
  • Astronomer Astro (production): create a Deployment API token and set it as
    MYPLUGIN_TOKEN
    — the JWT exchange is skipped entirely:
    1. Astro UI → open the Deployment → AccessAPI Tokens+ Deployment API Token
    2. Copy the token value (shown only once)
    3. astro deployment variable create MYPLUGIN_TOKEN=<token>
    MYPLUGIN_USERNAME
    and
    MYPLUGIN_PASSWORD
    are not needed on Astro.
为每个进程缓存一个令牌。在1小时有效期前5分钟刷新令牌。使用双重检查锁,避免多个并发请求同时触发刷新:
MYPLUGIN_
替换为插件名称衍生的短大写前缀(例如,如果插件名为"Trip Analyzer",则使用
TRIP_ANALYZER_
)。如果尚未确定插件名称,请先询问用户再编写环境变量名。
python
import asyncio
import os
import threading
import time
import airflow_client.client as airflow_sdk
import requests

AIRFLOW_HOST  = os.environ.get("MYPLUGIN_HOST",     "http://localhost:8080")
AIRFLOW_USER  = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS  = os.environ.get("MYPLUGIN_PASSWORD", "admin")
AIRFLOW_TOKEN = os.environ.get("MYPLUGIN_TOKEN")    # Astronomer Astro:部署API令牌

_cached_token: str | None = None
_token_expires_at: float  = 0.0
_token_lock = threading.Lock()


def _fetch_fresh_token() -> str:
    """通过Airflow的认证端点,将用户名/密码交换为JWT令牌。"""
    response = requests.post(
        f"{AIRFLOW_HOST}/auth/token",
        json={"username": AIRFLOW_USER, "password": AIRFLOW_PASS},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def _get_token() -> str:
    # Astronomer Astro生产环境:直接使用静态部署API令牌
    if AIRFLOW_TOKEN:
        return AIRFLOW_TOKEN
    global _cached_token, _token_expires_at
    now = time.monotonic()
    # 快速路径 —— 令牌仍有效时无需加锁
    if _cached_token and now < _token_expires_at:
        return _cached_token
    # 慢速路径 —— 一个线程负责刷新,其他线程等待
    with _token_lock:
        if _cached_token and now < _token_expires_at:
            return _cached_token
        _cached_token = _fetch_fresh_token()
        _token_expires_at = now + 55 * 60  # 在1小时有效期前5分钟刷新
    return _cached_token


def _make_config() -> airflow_sdk.Configuration:
    config = airflow_sdk.Configuration(host=AIRFLOW_HOST)
    config.access_token = _get_token()
    return config
实现认证后,告知用户:
  • 本地开发:在
    .env
    中设置
    MYPLUGIN_USERNAME
    MYPLUGIN_PASSWORD
    —— JWT交换会自动进行。
  • Astronomer Astro(生产环境):创建部署API令牌并将其设置为
    MYPLUGIN_TOKEN
    —— 可跳过JWT交换:
    1. Astro UI → 打开部署 → 访问API令牌+ 部署API令牌
    2. 复制令牌值(仅显示一次)
    3. 执行
      astro deployment variable create MYPLUGIN_TOKEN=<token>
    在Astro环境下无需
    MYPLUGIN_USERNAME
    MYPLUGIN_PASSWORD

Wrapping SDK calls with asyncio.to_thread

使用asyncio.to_thread包裹SDK调用

python
from fastapi import HTTPException
from airflow_client.client.api import DAGApi

@app.get("/api/dags")
async def list_dags():
    try:
        def _fetch():
            with airflow_sdk.ApiClient(_make_config()) as client:
                return DAGApi(client).get_dags(limit=100).dags
        dags = await asyncio.to_thread(_fetch)
        return [{"dag_id": d.dag_id, "is_paused": d.is_paused, "timetable_summary": d.timetable_summary} for d in dags]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
API field names: Never guess response field names — verify against the REST API reference. Key
DAGResponse
fields:
dag_id
,
dag_display_name
,
description
,
is_paused
,
timetable_summary
,
timetable_description
,
fileloc
,
owners
,
tags
.
The pattern is always: define a plain inner
def _fetch()
with all SDK logic, then
await asyncio.to_thread(_fetch)
.
python
from fastapi import HTTPException
from airflow_client.client.api import DAGApi

@app.get("/api/dags")
async def list_dags():
    try:
        def _fetch():
            with airflow_sdk.ApiClient(_make_config()) as client:
                return DAGApi(client).get_dags(limit=100).dags
        dags = await asyncio.to_thread(_fetch)
        return [{"dag_id": d.dag_id, "is_paused": d.is_paused, "timetable_summary": d.timetable_summary} for d in dags]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
API字段名称:切勿猜测响应字段名称 —— 请对照REST API参考文档进行验证。
DAGResponse
的关键字段包括:
dag_id
dag_display_name
description
is_paused
timetable_summary
timetable_description
fileloc
owners
tags
模式始终是:定义一个纯内部
def _fetch()
包含所有SDK逻辑,然后调用
await asyncio.to_thread(_fetch)

Alternative: Direct database access

替代方案:直接访问数据库

Warning — use with caution and tell the user. The Airflow metadb is not a public interface. Direct writes or poorly-formed queries can corrupt scheduler state. Whenever you use this pattern, explicitly tell the user: "This accesses Airflow's internal database directly. The internal models are not part of the public API, can change between Airflow versions, and incorrect queries can cause issues in the metadb. Prefer
apache-airflow-client
unless the operation is not exposed via the REST API."
Since FastAPI plugin endpoints run inside the API server process (not in a task worker), they have direct access to Airflow's internal SQLAlchemy models — no HTTP round-trip or JWT needed. Use only for read operations not exposed via the REST API, or when the extra HTTP overhead genuinely matters. Always wrap DB calls in
asyncio.to_thread()
— SQLAlchemy queries are blocking.
python
from airflow.models import DagBag, DagModel
from airflow.utils.db import provide_session

@app.get("/api/dags/status")
async def dag_status():
    def _fetch():
        @provide_session
        def _query(session=None):
            dagbag = DagBag()
            paused = sum(
                1 for dag_id in dagbag.dags
                if (m := session.query(DagModel).filter(DagModel.dag_id == dag_id).first())
                and m.is_paused
            )
            return {"total": len(dagbag.dags), "paused": paused}
        return _query()
    return await asyncio.to_thread(_fetch)

警告 —— 谨慎使用并告知用户。Airflow元数据库并非公共接口。直接写入或格式不当的查询可能会破坏调度器状态。无论何时使用此模式,都要明确告知用户:"此方式直接访问Airflow的内部数据库。内部模型不属于公共API,在不同Airflow版本间可能会变化,错误的查询可能会导致元数据库出现问题。除非操作未通过REST API暴露,否则优先使用
apache-airflow-client
。"
由于FastAPI插件端点运行在API服务器进程中(而非任务工作者),因此可直接访问Airflow的内部SQLAlchemy模型 —— 无需HTTP往返或JWT。仅用于REST API未暴露的读取操作,或当额外HTTP开销确实影响性能时使用。始终用
asyncio.to_thread()
包裹数据库调用 —— SQLAlchemy查询是阻塞性的。
python
from airflow.models import DagBag, DagModel
from airflow.utils.db import provide_session

@app.get("/api/dags/status")
async def dag_status():
    def _fetch():
        @provide_session
        def _query(session=None):
            dagbag = DagBag()
            paused = sum(
                1 for dag_id in dagbag.dags
                if (m := session.query(DagModel).filter(DagModel.dag_id == dag_id).first())
                and m.is_paused
            )
            return {"total": len(dagbag.dags), "paused": paused}
        return _query()
    return await asyncio.to_thread(_fetch)

Step 5: Common API endpoint patterns

步骤5:常见API端点模式

If you need an SDK method or field not shown in the examples below, verify it before generating code — do not guess. Either run
python3 -c "from airflow_client.client.api import <Class>; print([m for m in dir(<Class>) if not m.startswith('_')])"
in any environment where the SDK is installed, or search the
apache/airflow-client-python
repo for the class definition.
python
from airflow_client.client.api import DAGApi, DagRunApi
from airflow_client.client.models import TriggerDAGRunPostBody, DAGPatchBody


@app.post("/api/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            return DagRunApi(client).trigger_dag_run(dag_id, TriggerDAGRunPostBody())
    result = await asyncio.to_thread(_run)
    return {"run_id": result.dag_run_id, "state": normalize_state(result.state)}


@app.patch("/api/dags/{dag_id}/pause")
async def toggle_pause(dag_id: str, is_paused: bool):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            DAGApi(client).patch_dag(dag_id, DAGPatchBody(is_paused=is_paused))
    await asyncio.to_thread(_run)
    return {"dag_id": dag_id, "is_paused": is_paused}


@app.delete("/api/dags/{dag_id}")
async def delete_dag(dag_id: str):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            DAGApi(client).delete_dag(dag_id)
    await asyncio.to_thread(_run)
    return {"deleted": dag_id}


def normalize_state(raw) -> str:
    """Convert SDK enum objects to plain strings before sending to the frontend."""
    if raw is None:
        return "never_run"
    return str(raw).lower()
如果需要示例中未展示的SDK方法或字段,请在生成代码前进行验证 —— 切勿猜测。可在任何安装了SDK的环境中执行
python3 -c "from airflow_client.client.api import <Class>; print([m for m in dir(<Class>) if not m.startswith('_')])"
,或在
apache/airflow-client-python
仓库中搜索类定义。
python
from airflow_client.client.api import DAGApi, DagRunApi
from airflow_client.client.models import TriggerDAGRunPostBody, DAGPatchBody


@app.post("/api/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            return DagRunApi(client).trigger_dag_run(dag_id, TriggerDAGRunPostBody())
    result = await asyncio.to_thread(_run)
    return {"run_id": result.dag_run_id, "state": normalize_state(result.state)}


@app.patch("/api/dags/{dag_id}/pause")
async def toggle_pause(dag_id: str, is_paused: bool):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            DAGApi(client).patch_dag(dag_id, DAGPatchBody(is_paused=is_paused))
    await asyncio.to_thread(_run)
    return {"dag_id": dag_id, "is_paused": is_paused}


@app.delete("/api/dags/{dag_id}")
async def delete_dag(dag_id: str):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            DAGApi(client).delete_dag(dag_id)
    await asyncio.to_thread(_run)
    return {"deleted": dag_id}


def normalize_state(raw) -> str:
    """在发送到前端前,将SDK枚举对象转换为普通字符串。"""
    if raw is None:
        return "never_run"
    return str(raw).lower()

DAG runs, task instances, and logs

DAG运行、任务实例与日志

These are the most common calls beyond basic DAG CRUD. For anything not shown here, consult the REST API reference for available endpoints and the matching Python SDK class/method names.
python
from airflow_client.client.api import DagRunApi, TaskInstanceApi
这些是基础DAG增删改查之外最常用的调用。对于此处未展示的内容,请查阅REST API参考文档获取可用端点及匹配的Python SDK类/方法名称。
python
from airflow_client.client.api import DagRunApi, TaskInstanceApi

Latest run for a DAG

DAG的最新运行记录

@app.get("/api/dags/{dag_id}/runs/latest") async def latest_run(dag_id: str): def _fetch(): with airflow_sdk.ApiClient(_make_config()) as client: runs = DagRunApi(client).get_dag_runs(dag_id, limit=1, order_by="-start_date").dag_runs return runs[0] if runs else None run = await asyncio.to_thread(_fetch) if not run: return {"state": "never_run"} return {"run_id": run.dag_run_id, "state": normalize_state(run.state)}
@app.get("/api/dags/{dag_id}/runs/latest") async def latest_run(dag_id: str): def _fetch(): with airflow_sdk.ApiClient(_make_config()) as client: runs = DagRunApi(client).get_dag_runs(dag_id, limit=1, order_by="-start_date").dag_runs return runs[0] if runs else None run = await asyncio.to_thread(_fetch) if not run: return {"state": "never_run"} return {"run_id": run.dag_run_id, "state": normalize_state(run.state)}

Task instances for a specific run

特定运行记录的任务实例

@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks") async def task_instances(dag_id: str, run_id: str): def _fetch(): with airflow_sdk.ApiClient(_make_config()) as client: return TaskInstanceApi(client).get_task_instances(dag_id, run_id).task_instances tasks = await asyncio.to_thread(_fetch) return [{"task_id": t.task_id, "state": normalize_state(t.state)} for t in tasks]
@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks") async def task_instances(dag_id: str, run_id: str): def _fetch(): with airflow_sdk.ApiClient(_make_config()) as client: return TaskInstanceApi(client).get_task_instances(dag_id, run_id).task_instances tasks = await asyncio.to_thread(_fetch) return [{"task_id": t.task_id, "state": normalize_state(t.state)} for t in tasks]

Task log (try_number starts at 1)

任务日志(try_number从1开始)

@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks/{task_id}/logs/{try_number}") async def task_log(dag_id: str, run_id: str, task_id: str, try_number: int): def _fetch(): with airflow_sdk.ApiClient(_make_config()) as client: return TaskInstanceApi(client).get_log( dag_id, run_id, task_id, try_number, map_index=-1 ) result = await asyncio.to_thread(_fetch) return {"log": result.content if hasattr(result, "content") else str(result)}
undefined
@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks/{task_id}/logs/{try_number}") async def task_log(dag_id: str, run_id: str, task_id: str, try_number: int): def _fetch(): with airflow_sdk.ApiClient(_make_config()) as client: return TaskInstanceApi(client).get_log( dag_id, run_id, task_id, try_number, map_index=-1 ) result = await asyncio.to_thread(_fetch) return {"log": result.content if hasattr(result, "content") else str(result)}
undefined

Streaming proxy

流代理

Use
StreamingResponse
to proxy binary content from an external URL through the plugin — useful when the browser can't fetch the resource directly (CORS, auth, etc.):
python
import requests
from starlette.responses import StreamingResponse

@app.get("/api/files/{filename}")
async def proxy_file(filename: str):
    def _stream():
        r = requests.get(f"https://files.example.com/{filename}", stream=True)
        r.raise_for_status()
        return r
    response = await asyncio.to_thread(_stream)
    return StreamingResponse(
        response.iter_content(chunk_size=8192),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
Note that
requests.get()
is blocking — fetch in
asyncio.to_thread
so the event loop isn't stalled while waiting for the remote server.

使用
StreamingResponse
将外部URL的二进制内容通过插件代理 —— 当浏览器无法直接获取资源时(如跨域、认证等问题)非常有用:
python
import requests
from starlette.responses import StreamingResponse

@app.get("/api/files/{filename}")
async def proxy_file(filename: str):
    def _stream():
        r = requests.get(f"https://files.example.com/{filename}", stream=True)
        r.raise_for_status()
        return r
    response = await asyncio.to_thread(_stream)
    return StreamingResponse(
        response.iter_content(chunk_size=8192),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
注意
requests.get()
是阻塞性的 —— 请在
asyncio.to_thread
中执行,避免等待远程服务器响应时阻塞事件循环。

Step 6: Other plugin component types

步骤6:其他插件组件类型

Macros

Macros are loaded by the scheduler (and DAG processor), not the API server. Restart the scheduler after changes.
python
from airflow.plugins_manager import AirflowPlugin

def format_confidence(confidence: float) -> str:
    return f"{confidence * 100:.2f}%"

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    macros = [format_confidence]
Use in any templated field — including with XCom:
{{ macros.my_plugin.format_confidence(0.95) }}

{{ macros.my_plugin.format_confidence(ti.xcom_pull(task_ids='score_task')['confidence']) }}
The naming pattern is always
macros.{plugin_name}.{function_name}
.
宏由调度器(和DAG处理器)加载,而非API服务器。修改后需要重启调度器。
python
from airflow.plugins_manager import AirflowPlugin

def format_confidence(confidence: float) -> str:
    return f"{confidence * 100:.2f}%"

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    macros = [format_confidence]
可在任何模板化字段中使用 —— 包括XCom:
{{ macros.my_plugin.format_confidence(0.95) }}

{{ macros.my_plugin.format_confidence(ti.xcom_pull(task_ids='score_task')['confidence']) }}
命名模式始终为
macros.{plugin_name}.{function_name}

Middleware

中间件

Middleware applies to all Airflow API requests, including the built-in REST API and any FastAPI plugins. Use sparingly and filter requests explicitly if needed:
python
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, Response

class AuditMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        # runs before every request to the Airflow API server
        response = await call_next(request)
        return response

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    fastapi_root_middlewares = [
        {"middleware": AuditMiddleware, "args": [], "kwargs": {}, "name": "Audit"}
    ]
中间件会应用于所有Airflow API请求,包括内置REST API和所有FastAPI插件。请谨慎使用,必要时显式过滤请求:
python
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, Response

class AuditMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        # 在每个Airflow API服务器请求前运行
        response = await call_next(request)
        return response

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    fastapi_root_middlewares = [
        {"middleware": AuditMiddleware, "args": [], "kwargs": {}, "name": "Audit"}
    ]

Operator extra links

Operator额外链接

python
from airflow.sdk.bases.operatorlink import BaseOperatorLink

class MyDashboardLink(BaseOperatorLink):
    name = "Open in Dashboard"

    def get_link(self, operator, *, ti_key, **context) -> str:
        return f"https://my-dashboard.example.com/tasks/{ti_key.task_id}"

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    global_operator_extra_links = [MyDashboardLink()]  # appears on every task
    # operator_extra_links = [MyDashboardLink()]       # attach to specific operator instead
python
from airflow.sdk.bases.operatorlink import BaseOperatorLink

class MyDashboardLink(BaseOperatorLink):
    name = "在仪表板中打开"

    def get_link(self, operator, *, ti_key, **context) -> str:
        return f"https://my-dashboard.example.com/tasks/{ti_key.task_id}"

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    global_operator_extra_links = [MyDashboardLink()]  # 显示在所有任务上
    # operator_extra_links = [MyDashboardLink()]       # 仅附加到特定Operator

React apps

React应用

React apps are embedded as JavaScript bundles served via FastAPI. The bundle must expose itself as a global variable matching the plugin name:
javascript
// In your bundle (e.g. my-app.js)
globalThis['My Plugin'] = MyComponent;   // matches plugin name
globalThis.AirflowPlugin = MyComponent;  // fallback Airflow looks for
python
class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    fastapi_apps = [{"app": app, "url_prefix": "/my-plugin", "name": "My Plugin"}]
    react_apps = [
        {
            "name": "My Plugin",
            "bundle_url": "/my-plugin/my-app.js",
            "destination": "nav",
            "category": "browse",
            "url_route": "my-plugin",
        }
    ]
The same bundle can be registered to multiple destinations by adding multiple entries — each needs a unique
url_route
:
python
react_apps = [
    {"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "nav",  "url_route": "my-widget-nav"},
    {"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "dag",  "url_route": "my-widget-dag"},
]
React app integration is experimental in Airflow 3.1. Interfaces may change in future releases.

React应用以JavaScript包的形式通过FastAPI提供服务。该包必须将自身暴露为与插件名称匹配的全局变量:
javascript
// 在你的包中(如my-app.js)
globalThis['My Plugin'] = MyComponent;   # 与插件名称匹配
globalThis.AirflowPlugin = MyComponent;  # Airflow会寻找的回退变量
python
class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    fastapi_apps = [{"app": app, "url_prefix": "/my-plugin", "name": "My Plugin"}]
    react_apps = [
        {
            "name": "My Plugin",
            "bundle_url": "/my-plugin/my-app.js",
            "destination": "nav",
            "category": "browse",
            "url_route": "my-plugin",
        }
    ]
同一包可通过添加多个条目注册到多个位置 —— 每个条目需要唯一的
url_route
python
react_apps = [
    {"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "nav",  "url_route": "my-widget-nav"},
    {"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "dag",  "url_route": "my-widget-dag"},
]
React应用集成在Airflow 3.1中属于实验性功能。接口在未来版本中可能会变化。

Step 7: Environment variables and deployment

步骤7:环境变量与部署

Never hardcode credentials:
python
AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST",     "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")
Local Astro CLI:
undefined
切勿硬编码凭证:
python
AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST",     "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")
本地Astro CLI:
undefined

.env

.env

MYPLUGIN_HOST=http://localhost:8080 MYPLUGIN_USERNAME=admin MYPLUGIN_PASSWORD=admin

```bash
astro dev restart              # required after any Python plugin change
MYPLUGIN_HOST=http://localhost:8080 MYPLUGIN_USERNAME=admin MYPLUGIN_PASSWORD=admin

```bash
astro dev restart              # 修改Python插件后必须执行

Check logs by component (Astro CLI):

按组件查看日志(Astro CLI):

astro dev logs --api-server # FastAPI apps, external_views — plugin import errors show here astro dev logs --scheduler # macros, timetables, listeners, operator links astro dev logs --dag-processor # DAG parsing errors
astro dev logs --api-server # FastAPI应用、external_views —— 插件导入错误会显示在这里 astro dev logs --scheduler # 宏、时间表、监听器、Operator链接 astro dev logs --dag-processor # DAG解析错误

Non-Astro:

非Astro环境:

airflow plugins # CLI — lists all loaded plugins

**Production Astronomer:**
```bash
astro deployment variable create --deployment-id <id> MYPLUGIN_HOST=https://airflow.example.com
Auto-reload during development (skips lazy loading):
AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False
Cache busting for static files after deploy:
html
<script src="static/app.js?v=20240315-1"></script>
Verify the plugin loaded: open Admin > Plugins in the Airflow UI.
OpenAPI docs are auto-generated for FastAPI plugins:
  • Swagger UI:
    {AIRFLOW_HOST}/{url_prefix}/docs
  • OpenAPI JSON:
    {AIRFLOW_HOST}/{url_prefix}/openapi.json

airflow plugins # CLI —— 列出所有已加载的插件

**生产环境Astronomer:**
```bash
astro deployment variable create --deployment-id <id> MYPLUGIN_HOST=https://airflow.example.com
开发期间自动重载(跳过延迟加载):
AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False
部署后静态文件缓存清除
html
<script src="static/app.js?v=20240315-1"></script>
验证插件是否加载:在Airflow UI中打开 Admin > Plugins
OpenAPI文档会为FastAPI插件自动生成:
  • Swagger UI:
    {AIRFLOW_HOST}/{url_prefix}/docs
  • OpenAPI JSON:
    {AIRFLOW_HOST}/{url_prefix}/openapi.json

Common pitfalls

常见问题

ProblemCauseFix
Nav link goes to 404Leading
/
in
href
"my-plugin/ui"
not
"/my-plugin/ui"
Nav icon not showingMissing
/
in
icon
icon
takes an absolute path:
"/my-plugin/static/icon.svg"
Event loop freezes under loadSync SDK called directly in
async def
Wrap with
asyncio.to_thread()
401 errors after 1 hourJWT expires with no refreshUse the 5-minute pre-expiry refresh pattern
StaticFiles
raises on startup
Directory missingCreate
assets/
and
static/
before starting
Plugin not showing upPython file changed without restart
astro dev restart
Endpoints accessible without loginFastAPI apps are not auto-authenticatedAdd FastAPI security (e.g. OAuth2, API key) if endpoints must be private
Middleware affecting wrong routesMiddleware applies to all API trafficFilter by
request.url.path
inside
dispatch()
JS
fetch()
breaks on Astro
Absolute path in
fetch()
Always use relative paths:
fetch('api/dags')

问题原因解决方法
导航链接跳转到404
href
中有前导斜杠
使用
"my-plugin/ui"
而非
"/my-plugin/ui"
导航图标不显示
icon
中缺少前导斜杠
icon
使用绝对路径:
"/my-plugin/static/icon.svg"
高负载下事件循环冻结
async def
中直接调用同步SDK
asyncio.to_thread()
包裹
1小时后出现401错误JWT过期且未刷新使用提前5分钟刷新的模式
启动时
StaticFiles
抛出错误
目录不存在在启动前创建
assets/
static/
目录
插件未显示修改Python文件后未重启执行
astro dev restart
端点无需登录即可访问FastAPI应用未自动认证如果端点需要私有访问,请添加FastAPI安全机制(如OAuth2、API密钥)
中间件影响错误的路由中间件应用于所有API流量
dispatch()
中通过
request.url.path
过滤请求
JS
fetch()
在Astro环境下失效
fetch()
使用了绝对路径
始终使用相对路径:
fetch('api/dags')

References

参考资料