airflow-plugins
Airflow 3 Plugins
Airflow 3 plugins let you embed FastAPI apps, React UIs, middleware, macros, operator buttons, and custom timetables directly into the Airflow process. No sidecar, no extra server.
CRITICAL: Plugin components (`fastapi_apps`, `react_apps`, `external_views`) require Airflow 3.1+. NEVER import `flask` or `flask_appbuilder`, or use `appbuilder_views` / `flask_blueprints`: these are Airflow 2 patterns and will not work in Airflow 3. If existing code uses them, rewrite the entire registration block using FastAPI.

Security: FastAPI plugin endpoints are not automatically protected by Airflow auth. If your endpoints need to be private, implement authentication explicitly using FastAPI's security utilities.

Restart required: Changes to Python plugin files require restarting the API server. Static file changes (HTML, JS, CSS) are picked up immediately. Set `AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False` during development to load plugins at startup rather than lazily.

Relative paths always: In `external_views`, `href` must have no leading slash. In HTML and JavaScript, use relative paths for all assets and `fetch()` calls. Absolute paths break behind reverse proxies.
Before writing any code, verify
- Am I using `fastapi_apps` / FastAPI, not `appbuilder_views` / Flask?
- Are all HTML/JS asset paths and `fetch()` calls relative (no leading slash)?
- Are all synchronous SDK or SQLAlchemy calls wrapped in `asyncio.to_thread()`?
- Do the `static/` and `assets/` directories exist before the FastAPI app mounts them?
- If the endpoint must be private, did I add explicit FastAPI authentication?
Step 1: Choose plugin components
A single plugin class can register multiple component types at once.
| Component | What it does | Field |
|---|---|---|
| Custom API endpoints | FastAPI app mounted in Airflow process | `fastapi_apps` |
| Nav / page link | Embeds a URL as an iframe or links out | `external_views` |
| React component | Custom React app embedded in Airflow UI | `react_apps` |
| API middleware | Intercepts all Airflow API requests/responses | `fastapi_root_middlewares` |
| Jinja macros | Reusable Python functions in DAG templates | `macros` |
| Task instance button | Extra link button in task Detail view | `operator_extra_links` / `global_operator_extra_links` |
| Custom timetable | Custom scheduling logic | `timetables` |
| Event hooks | Listener callbacks for Airflow events | `listeners` |
Step 2: Plugin registration skeleton
Project file structure

Give each plugin its own subdirectory under `plugins/`. This keeps the Python file, static assets, and templates together and makes multi-plugin projects manageable:

```
plugins/
  my-plugin/
    plugin.py       # AirflowPlugin subclass, auto-discovered by Airflow
    static/
      index.html
      app.js
    assets/
      icon.svg
```

`BASE_DIR = Path(__file__).parent` in `plugin.py` resolves to `plugins/my-plugin/`; use it to build the paths passed to `StaticFiles`.

```python
from pathlib import Path

from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

BASE_DIR = Path(__file__).parent

app = FastAPI(title="My Plugin")

# Both directories must exist before Airflow starts or FastAPI raises on import
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
app.mount("/assets", StaticFiles(directory=BASE_DIR / "assets"), name="assets")


class MyPlugin(AirflowPlugin):
    name = "my_plugin"

    fastapi_apps = [
        {
            "app": app,
            "url_prefix": "/my-plugin",   # plugin available at {AIRFLOW_HOST}/my-plugin/
            "name": "My Plugin",
        }
    ]

    external_views = [
        {
            "name": "My Plugin",
            "href": "my-plugin/ui",       # NO leading slash: breaks on Astro and reverse proxies
            "destination": "nav",         # see locations table below
            "category": "browse",         # nav bar category (nav destination only)
            "url_route": "my-plugin",     # unique route name (required for React apps)
            "icon": "/my-plugin/static/icon.svg",  # DOES use a leading slash: served by FastAPI
        }
    ]
```
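The requirement that `static/` and `assets/` exist before the app mounts them can be handled defensively. The following is a minimal sketch, not part of Airflow or FastAPI: `ensure_plugin_dirs` is a hypothetical helper that creates the directories if they are missing, assuming it is acceptable for the plugin to create empty directories on import.

```python
from pathlib import Path


def ensure_plugin_dirs(base_dir: Path, names=("static", "assets")) -> list[Path]:
    """Create the directories the plugin mounts, if they do not exist yet."""
    paths = []
    for name in names:
        path = base_dir / name
        # exist_ok=True makes repeated calls (e.g. on every restart) harmless
        path.mkdir(parents=True, exist_ok=True)
        paths.append(path)
    return paths
```

In `plugin.py` this would be called as `ensure_plugin_dirs(BASE_DIR)` just before the `app.mount(...)` lines.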
External view locations
| `destination` | Where it appears |
|---|---|
| `"nav"` | Left navigation bar (also set `category`) |
| `"dag"` | Extra tab on every Dag page |
| `"dag_run"` | Extra tab on every Dag run page |
| `"task"` | Extra tab on every task page |
| `"task_instance"` | Extra tab on every task instance page |
Nav bar categories (`destination: "nav"`)

Set `"category"` to place the link under a specific nav group: `"browse"`, `"admin"`, or omit for top-level.

External URLs and minimal plugins

`href` can be a full external URL instead of an internal path such as `"my-plugin/ui"`, and a plugin that registers only `external_views` (no `fastapi_apps`) is valid:

```python
from airflow.plugins_manager import AirflowPlugin


class LearnViewPlugin(AirflowPlugin):
    name = "learn_view_plugin"

    external_views = [
        {
            "name": "Learn Airflow 3",
            "href": "https://www.astronomer.io/docs/learn",
            "destination": "dag",   # adds a tab to every Dag page
            "url_route": "learn",
        }
    ]
```

The no-leading-slash rule applies to internal paths only; full `https://` URLs are fine.
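The `href` rule can be checked mechanically before a plugin is registered. This is a minimal sketch using only the standard library; `check_external_view_href` is a hypothetical helper, not an Airflow API, and it assumes that only `http` and `https` URLs count as external.

```python
from urllib.parse import urlparse


def check_external_view_href(href: str) -> str:
    """Return href unchanged if it is a full URL or a relative internal path."""
    parsed = urlparse(href)
    if parsed.scheme in ("http", "https") and parsed.netloc:
        return href  # full external URL: the leading-slash rule does not apply
    if href.startswith("/"):
        raise ValueError(f"internal href must not start with '/': {href!r}")
    return href
```

Calling it on each `external_views` entry while building the list surfaces a bad path at import time rather than as a 404 in the UI.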
Step 3: Serve the UI entry point
```python
@app.get("/ui", response_class=FileResponse)
async def serve_ui():
    return FileResponse(BASE_DIR / "static" / "index.html")
```

In HTML, always use relative paths. Absolute paths break when Airflow is mounted at a sub-path:

```html
<!-- correct -->
<link rel="stylesheet" href="static/app.css" />
<script src="static/app.js?v=20240315"></script>

<!-- breaks behind a reverse proxy -->
<script src="/my-plugin/static/app.js"></script>
```

Same rule in JavaScript:

```javascript
fetch('api/dags')            // correct: relative to current page
fetch('/my-plugin/api/dags') // breaks on Astro and sub-path deploys
```
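Why the relative form survives a sub-path deployment can be seen by resolving both forms against the page URL. The sketch below uses `urllib.parse.urljoin`, which follows the same RFC 3986 resolution rules browsers apply; the host and the `/team-a/` sub-path are made-up values for illustration.

```python
from urllib.parse import urljoin

# Hypothetical deployment: Airflow is served under the sub-path /team-a/
page_url = "https://airflow.example.com/team-a/my-plugin/ui"

relative = urljoin(page_url, "api/dags")             # resolved against the page
absolute = urljoin(page_url, "/my-plugin/api/dags")  # resolved against the host root

print(relative)  # https://airflow.example.com/team-a/my-plugin/api/dags
print(absolute)  # https://airflow.example.com/my-plugin/api/dags
```

The absolute form drops the `/team-a/` prefix, so the request never reaches the Airflow instance behind the proxy.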
Step 4: Call the Airflow API from your plugin
Only needed if your plugin calls the Airflow REST API. Plugins that only serve static files, register `external_views`, or use direct DB access do not need this step; skip to Step 5 or Step 6.
Add the dependency
Only if REST API communication is being implemented: add `apache-airflow-client` to the project's dependencies. Check which file exists and act accordingly:

| File found | Action |
|---|---|
| | Append `apache-airflow-client` |
| | |
| None of the above | Tell the user: "Add `apache-airflow-client` to your project's dependencies before running the plugin." |

Use `apache-airflow-client` to talk to Airflow's own REST API. The SDK is synchronous but FastAPI routes are async. Never call blocking SDK methods directly inside `async def`, or you will stall the event loop and freeze all concurrent requests.

JWT token management
Cache one token per process. Refresh 5 minutes before the 1-hour expiry. Use double-checked locking so multiple concurrent requests don't all race to refresh simultaneously:
Replace `MYPLUGIN_` with a short uppercase prefix derived from the plugin name (e.g. if the plugin is called "Trip Analyzer", use `TRIP_ANALYZER_`). If no plugin name has been given yet, ask the user before writing env var names.
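One way to derive the prefix consistently is a small helper. This is only a sketch of the naming convention described above: `env_prefix` is a hypothetical function, and it assumes every run of letters or digits in the plugin name becomes one uppercase word.

```python
import re


def env_prefix(plugin_name: str) -> str:
    """Turn a display name such as 'Trip Analyzer' into 'TRIP_ANALYZER_'."""
    words = re.findall(r"[A-Za-z0-9]+", plugin_name)
    if not words:
        raise ValueError("plugin name must contain at least one letter or digit")
    return "_".join(word.upper() for word in words) + "_"


print(env_prefix("Trip Analyzer"))  # TRIP_ANALYZER_
```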
```python
import asyncio
import os
import threading
import time

import airflow_client.client as airflow_sdk
import requests

AIRFLOW_HOST  = os.environ.get("MYPLUGIN_HOST", "http://localhost:8080")
AIRFLOW_USER  = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS  = os.environ.get("MYPLUGIN_PASSWORD", "admin")
AIRFLOW_TOKEN = os.environ.get("MYPLUGIN_TOKEN")  # Astronomer Astro: Deployment API token

_cached_token: str | None = None
_token_expires_at: float = 0.0
_token_lock = threading.Lock()


def _fetch_fresh_token() -> str:
    """Exchange username/password for a JWT via Airflow's auth endpoint."""
    response = requests.post(
        f"{AIRFLOW_HOST}/auth/token",
        json={"username": AIRFLOW_USER, "password": AIRFLOW_PASS},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def _get_token() -> str:
    # Astronomer Astro production: use static Deployment API token directly
    if AIRFLOW_TOKEN:
        return AIRFLOW_TOKEN
    global _cached_token, _token_expires_at
    # Fast path: no lock if still valid
    if _cached_token and time.monotonic() < _token_expires_at:
        return _cached_token
    # Slow path: one thread refreshes, others wait
    with _token_lock:
        now = time.monotonic()  # re-read the clock after waiting for the lock
        if _cached_token and now < _token_expires_at:
            return _cached_token
        _cached_token = _fetch_fresh_token()
        _token_expires_at = now + 55 * 60  # refresh 5 min before 1-hour expiry
        return _cached_token


def _make_config() -> airflow_sdk.Configuration:
    config = airflow_sdk.Configuration(host=AIRFLOW_HOST)
    config.access_token = _get_token()
    return config
```

After implementing auth, tell the user:

- Local development: set `MYPLUGIN_USERNAME` and `MYPLUGIN_PASSWORD` in `.env`. JWT exchange happens automatically.
- Astronomer Astro (production): create a Deployment API token and set it as `MYPLUGIN_TOKEN`. The JWT exchange is skipped entirely:
  - Astro UI → open the Deployment → Access → API Tokens → + Deployment API Token
  - Copy the token value (shown only once)
  - `astro deployment variable create MYPLUGIN_TOKEN=<token>`

  `MYPLUGIN_USERNAME` and `MYPLUGIN_PASSWORD` are not needed on Astro.
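The double-checked locking idea can be exercised without a running Airflow instance. The sketch below is an illustrative variant, not the plugin code itself: `TokenCache` and `fake_fetch` are hypothetical names, and the fetcher and clock are injected so the behavior can be observed in isolation.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TokenCache:
    """Cache one token and refresh it once the configured lifetime has passed."""

    def __init__(self, fetch, ttl_seconds=55 * 60, clock=time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Fast path: no lock while the cached token is still valid
        if self._token is not None and self._clock() < self._expires_at:
            return self._token
        # Slow path: one thread refreshes, the others wait and then re-check
        with self._lock:
            now = self._clock()
            if self._token is not None and now < self._expires_at:
                return self._token
            self._token = self._fetch()
            self._expires_at = now + self._ttl
            return self._token


calls = []


def fake_fetch() -> str:
    # Stand-in for the real HTTP exchange; records every invocation
    time.sleep(0.05)
    calls.append(1)
    return f"token-{len(calls)}"


cache = TokenCache(fake_fetch)
with ThreadPoolExecutor(max_workers=8) as pool:
    tokens = list(pool.map(lambda _: cache.get(), range(8)))

print(len(calls), set(tokens))  # 1 {'token-1'}
```

Eight concurrent callers trigger a single fetch because every thread that waited on the lock re-checks the cache before refreshing.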
Wrapping SDK calls with asyncio.to_thread
```python
from fastapi import HTTPException
from airflow_client.client.api import DAGApi


@app.get("/api/dags")
async def list_dags():
    try:
        def _fetch():
            # All blocking SDK work stays inside this plain function
            with airflow_sdk.ApiClient(_make_config()) as client:
                return DAGApi(client).get_dags(limit=100).dags

        dags = await asyncio.to_thread(_fetch)
        return [
            {"dag_id": d.dag_id, "is_paused": d.is_paused, "timetable_summary": d.timetable_summary}
            for d in dags
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```

API field names: Never guess response field names; verify against the REST API reference. Key `DAGResponse` fields: `dag_id`, `dag_display_name`, `description`, `is_paused`, `timetable_summary`, `timetable_description`, `fileloc`, `owners`, `tags`.

The pattern is always: define a plain inner `def _fetch()` with all SDK logic, then `await asyncio.to_thread(_fetch)`.
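The effect of `asyncio.to_thread` can be seen in isolation with a stand-in for the SDK call. In this sketch `blocking_call` is a hypothetical placeholder that sleeps instead of doing HTTP; it also reports whether it ran on the main thread, which is where the event loop lives here.

```python
import asyncio
import threading
import time


def blocking_call(n: int) -> tuple[int, bool]:
    """Stand-in for a synchronous SDK call."""
    time.sleep(0.1)
    on_main_thread = threading.current_thread() is threading.main_thread()
    return n * 2, on_main_thread


async def handler() -> list[tuple[int, bool]]:
    # Each call runs in a worker thread, so the event loop stays free
    return await asyncio.gather(*(asyncio.to_thread(blocking_call, n) for n in range(3)))


results = asyncio.run(handler())
print(results)  # [(0, False), (2, False), (4, False)]
```

Every result carries `False`: none of the blocking work ran on the event loop's thread.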
Alternative: Direct database access
Warning: use with caution and tell the user. The Airflow metadb is not a public interface. Direct writes or poorly-formed queries can corrupt scheduler state. Whenever you use this pattern, explicitly tell the user: "This accesses Airflow's internal database directly. The internal models are not part of the public API, can change between Airflow versions, and incorrect queries can cause issues in the metadb. Prefer `apache-airflow-client` unless the operation is not exposed via the REST API."

Since FastAPI plugin endpoints run inside the API server process (not in a task worker), they have direct access to Airflow's internal SQLAlchemy models, with no HTTP round-trip or JWT needed. Use only for read operations not exposed via the REST API, or when the extra HTTP overhead genuinely matters. Always wrap DB calls in `asyncio.to_thread()`, because SQLAlchemy queries are blocking.

```python
from airflow.models import DagBag, DagModel
from airflow.utils.session import provide_session  # module where provide_session is defined


@app.get("/api/dags/status")
async def dag_status():
    def _fetch():
        @provide_session
        def _query(session=None):
            dagbag = DagBag()
            # Count parsed DAGs whose metadb row is marked paused
            paused = sum(
                1 for dag_id in dagbag.dags
                if (m := session.query(DagModel).filter(DagModel.dag_id == dag_id).first())
                and m.is_paused
            )
            return {"total": len(dagbag.dags), "paused": paused}
        return _query()
    return await asyncio.to_thread(_fetch)
```
Step 5: Common API endpoint patterns
If you need an SDK method or field not shown in the examples below, verify it before generating code; do not guess. Either run `python3 -c "from airflow_client.client.api import <Class>; print([m for m in dir(<Class>) if not m.startswith('_')])"` in any environment where the SDK is installed, or search the `apache/airflow-client-python` repo for the class definition.

```python
from airflow_client.client.api import DAGApi, DagRunApi
from airflow_client.client.models import TriggerDAGRunPostBody, DAGPatchBody


@app.post("/api/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            return DagRunApi(client).trigger_dag_run(dag_id, TriggerDAGRunPostBody())
    result = await asyncio.to_thread(_run)
    return {"run_id": result.dag_run_id, "state": normalize_state(result.state)}


@app.patch("/api/dags/{dag_id}/pause")
async def toggle_pause(dag_id: str, is_paused: bool):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            DAGApi(client).patch_dag(dag_id, DAGPatchBody(is_paused=is_paused))
    await asyncio.to_thread(_run)
    return {"dag_id": dag_id, "is_paused": is_paused}


@app.delete("/api/dags/{dag_id}")
async def delete_dag(dag_id: str):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            DAGApi(client).delete_dag(dag_id)
    await asyncio.to_thread(_run)
    return {"deleted": dag_id}


def normalize_state(raw) -> str:
    """Convert SDK enum objects to plain strings before sending to the frontend."""
    if raw is None:
        return "never_run"
    return str(raw).lower()
```
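Depending on how an enum class is defined, `str()` on a member can yield a qualified name such as `FakeRunState.SUCCESS` rather than the bare value. The sketch below is a more defensive variant of `normalize_state` that prefers `.value` when it receives an enum. `FakeRunState` is a made-up stand-in, and whether the SDK's state objects behave like it is an assumption to verify against the installed client.

```python
from enum import Enum


class FakeRunState(str, Enum):
    """Hypothetical stand-in for an SDK state enum."""
    SUCCESS = "success"
    FAILED = "failed"


def normalize_state_safe(raw) -> str:
    """Return a lowercase plain string for enums, strings, or None."""
    if raw is None:
        return "never_run"
    # Enum members carry the wire value in .value; plain strings pass through
    value = raw.value if isinstance(raw, Enum) else raw
    return str(value).lower()


print(normalize_state_safe(FakeRunState.SUCCESS))  # success
```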
DAG runs, task instances, and logs
These are the most common calls beyond basic DAG CRUD. For anything not shown here, consult the REST API reference for available endpoints and the matching Python SDK class/method names.
```python
from airflow_client.client.api import DagRunApi, TaskInstanceApi


# Latest run for a DAG
@app.get("/api/dags/{dag_id}/runs/latest")
async def latest_run(dag_id: str):
    def _fetch():
        with airflow_sdk.ApiClient(_make_config()) as client:
            runs = DagRunApi(client).get_dag_runs(dag_id, limit=1, order_by="-start_date").dag_runs
            return runs[0] if runs else None
    run = await asyncio.to_thread(_fetch)
    if not run:
        return {"state": "never_run"}
    return {"run_id": run.dag_run_id, "state": normalize_state(run.state)}


# Task instances for a specific run
@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks")
async def task_instances(dag_id: str, run_id: str):
    def _fetch():
        with airflow_sdk.ApiClient(_make_config()) as client:
            return TaskInstanceApi(client).get_task_instances(dag_id, run_id).task_instances
    tasks = await asyncio.to_thread(_fetch)
    return [{"task_id": t.task_id, "state": normalize_state(t.state)} for t in tasks]


# Task log (try_number starts at 1)
@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks/{task_id}/logs/{try_number}")
async def task_log(dag_id: str, run_id: str, task_id: str, try_number: int):
    def _fetch():
        with airflow_sdk.ApiClient(_make_config()) as client:
            return TaskInstanceApi(client).get_log(
                dag_id, run_id, task_id, try_number, map_index=-1
            )
    result = await asyncio.to_thread(_fetch)
    return {"log": result.content if hasattr(result, "content") else str(result)}
```

Streaming proxy
Use `StreamingResponse` to proxy binary content from an external URL through the plugin. This is useful when the browser can't fetch the resource directly (CORS, auth, etc.):

```python
import requests
from starlette.responses import StreamingResponse


@app.get("/api/files/{filename}")
async def proxy_file(filename: str):
    def _stream():
        # stream=True defers the body download; timeout avoids hanging on a dead upstream
        r = requests.get(f"https://files.example.com/{filename}", stream=True, timeout=10)
        r.raise_for_status()
        return r
    response = await asyncio.to_thread(_stream)
    return StreamingResponse(
        response.iter_content(chunk_size=8192),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
```

Note that `requests.get()` is blocking. Fetch in `asyncio.to_thread` so the event loop isn't stalled while waiting for the remote server.
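The proxy endpoint interpolates `filename` into both the upstream URL and a response header, so a small allow-list check before use is one way to reduce risk. This is a sketch under the assumption that only plain file names are expected; `is_safe_filename` is a hypothetical helper and the exact character set is a choice, not a requirement of FastAPI.

```python
import re

# Letters, digits, dot, underscore, hyphen; must start with a letter or digit
_SAFE_NAME = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]{0,254}")


def is_safe_filename(name: str) -> bool:
    """Accept plain file names only: no slashes, quotes, or parent references."""
    return _SAFE_NAME.fullmatch(name) is not None and ".." not in name


print(is_safe_filename("report.csv"))  # True
print(is_safe_filename("../secret"))   # False
```

In the endpoint, a failed check would typically map to an `HTTPException` with status 400 before any upstream request is made.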
Step 6: Other plugin component types
Macros
Macros are loaded by the scheduler (and DAG processor), not the API server. Restart the scheduler after changes.

```python
from airflow.plugins_manager import AirflowPlugin


def format_confidence(confidence: float) -> str:
    return f"{confidence * 100:.2f}%"


class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    macros = [format_confidence]
```

Use in any templated field, including with XCom:

```jinja
{{ macros.my_plugin.format_confidence(0.95) }}
{{ macros.my_plugin.format_confidence(ti.xcom_pull(task_ids='score_task')['confidence']) }}
```

The naming pattern is always `macros.{plugin_name}.{function_name}`.
Middleware
Middleware applies to all Airflow API requests, including the built-in REST API and any FastAPI plugins. Use sparingly and filter requests explicitly if needed:
```python
from airflow.plugins_manager import AirflowPlugin
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware


class AuditMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        # runs before every request to the Airflow API server
        response = await call_next(request)
        return response


class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    fastapi_root_middlewares = [
        {"middleware": AuditMiddleware, "args": [], "kwargs": {}, "name": "Audit"}
    ]
```
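Filtering explicitly can be as simple as checking the request path before doing any work. The sketch below uses a pure ASGI middleware so it runs without Starlette installed; `PathFilteredMiddleware` and its `prefix` argument are hypothetical, and passing `prefix` through the `kwargs` entry of `fastapi_root_middlewares` is an assumption about how Airflow forwards those values.

```python
import asyncio


class PathFilteredMiddleware:
    """Pure ASGI middleware that only acts on paths under a given prefix."""

    def __init__(self, app, prefix: str = "/my-plugin"):
        self.app = app
        self.prefix = prefix
        self.seen = []  # paths this middleware acted on (for illustration only)

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http" and scope["path"].startswith(self.prefix):
            self.seen.append(scope["path"])  # plugin-specific logic would go here
        # Every request, matched or not, is passed on unchanged
        await self.app(scope, receive, send)


async def _demo() -> list[str]:
    async def inner_app(scope, receive, send):
        pass  # stand-in for the wrapped ASGI application

    middleware = PathFilteredMiddleware(inner_app, prefix="/my-plugin")
    for path in ("/my-plugin/api/dags", "/api/v2/dags"):
        await middleware({"type": "http", "path": path}, None, None)
    return middleware.seen


seen_paths = asyncio.run(_demo())
print(seen_paths)  # ['/my-plugin/api/dags']
```

Only the plugin's own route is recorded; the built-in API path passes through untouched.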
Operator extra links
```python
from airflow.plugins_manager import AirflowPlugin
from airflow.sdk.bases.operatorlink import BaseOperatorLink


class MyDashboardLink(BaseOperatorLink):
    name = "Open in Dashboard"

    def get_link(self, operator, *, ti_key, **context) -> str:
        return f"https://my-dashboard.example.com/tasks/{ti_key.task_id}"


class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    global_operator_extra_links = [MyDashboardLink()]  # appears on every task
    # operator_extra_links = [MyDashboardLink()]       # attach to specific operator instead
```
React apps
React apps are embedded as JavaScript bundles served via FastAPI. The bundle must expose itself as a global variable matching the plugin name:
```javascript
// In your bundle (e.g. my-app.js)
globalThis['My Plugin'] = MyComponent;  // matches plugin name
globalThis.AirflowPlugin = MyComponent; // fallback Airflow looks for
```

```python
class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    fastapi_apps = [{"app": app, "url_prefix": "/my-plugin", "name": "My Plugin"}]
    react_apps = [
        {
            "name": "My Plugin",
            "bundle_url": "/my-plugin/my-app.js",
            "destination": "nav",
            "category": "browse",
            "url_route": "my-plugin",
        }
    ]
```

The same bundle can be registered to multiple destinations by adding multiple entries; each needs a unique `url_route`:

```python
react_apps = [
    {"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "nav", "url_route": "my-widget-nav"},
    {"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "dag", "url_route": "my-widget-dag"},
]
```

React app integration is experimental in Airflow 3.1. Interfaces may change in future releases.
Step 7: Environment variables and deployment
Never hardcode credentials:

```python
AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST", "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")
```

**Local Astro CLI:**

```
# .env
MYPLUGIN_HOST=http://localhost:8080
MYPLUGIN_USERNAME=admin
MYPLUGIN_PASSWORD=admin
```

```bash
astro dev restart   # required after any Python plugin change
```

**Check logs by component (Astro CLI):**

```bash
astro dev logs --api-server     # FastAPI apps, external_views; plugin import errors show here
astro dev logs --scheduler      # macros, timetables, listeners, operator links
astro dev logs --dag-processor  # DAG parsing errors
```

**Non-Astro:**

```bash
airflow plugins   # CLI: lists all loaded plugins
```

**Production Astronomer:**

```bash
astro deployment variable create --deployment-id <id> MYPLUGIN_HOST=https://airflow.example.com
```

**Load plugins at startup during development** (skips lazy loading):

```
AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False
```

**Cache busting for static files** after deploy:

```html
<script src="static/app.js?v=20240315-1"></script>
```

**Verify the plugin loaded:** open Admin > Plugins in the Airflow UI.
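For the cache-busting query string, a content hash is one alternative to a hand-maintained date stamp. The following is a minimal sketch using only the standard library; `asset_version` and `versioned_src` are hypothetical helpers, and wiring the result into `index.html` (for example through a template) is left open.

```python
import hashlib
from pathlib import Path


def asset_version(path: Path, length: int = 8) -> str:
    """Short content hash suitable for a ?v= query string."""
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    return digest[:length]


def versioned_src(relative_url: str, file_path: Path) -> str:
    """Append the content hash of file_path to a relative asset URL."""
    return f"{relative_url}?v={asset_version(file_path)}"
```

Because the version changes only when the file content changes, browsers keep their cached copy across deploys that did not touch the asset.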
OpenAPI docs are auto-generated for FastAPI plugins:
- Swagger UI: `{AIRFLOW_HOST}/{url_prefix}/docs`
- OpenAPI JSON: `{AIRFLOW_HOST}/{url_prefix}/openapi.json`
Common pitfalls
| Problem | Cause | Fix |
|---|---|---|
| Nav link goes to 404 | Leading `/` in `href` | Use `"my-plugin/ui"`, not `"/my-plugin/ui"` |
| Nav icon not showing | Missing leading `/` in `icon` | `"icon": "/my-plugin/static/icon.svg"` |
| Event loop freezes under load | Sync SDK called directly in `async def` | Wrap with `asyncio.to_thread()` |
| 401 errors after 1 hour | JWT expires with no refresh | Use the 5-minute pre-expiry refresh pattern |
| FastAPI raises on startup | Directory missing | Create `static/` and `assets/` before starting |
| Plugin not showing up | Python file changed without restart | `astro dev restart` |
| Endpoints accessible without login | FastAPI apps are not auto-authenticated | Add FastAPI security (e.g. OAuth2, API key) if endpoints must be private |
| Middleware affecting wrong routes | Middleware applies to all API traffic | Filter by `request.url.path` inside `dispatch()` |
| JS `fetch()` calls fail on sub-path deploys | Absolute path in `fetch()` | Always use relative paths: `fetch('api/dags')` |
References
- Airflow plugins documentation
- Airflow REST API reference — full endpoint list with SDK class/method names
- Astronomer: Using Airflow plugins