Loading...
Loading...
Compare original and translation side by side
Package:on PyPI Repo: https://github.com/astronomer/blueprint Requires: Python 3.10+, Airflow 2.5+, Blueprint 0.2.0+airflow-blueprint
包:PyPI上的代码仓库:https://github.com/astronomer/blueprint 依赖要求:Python 3.10+、Airflow 2.5+、Blueprint 0.2.0+airflow-blueprint
| User Request | Action |
|---|---|
| "Create a blueprint" / "Define a template" | Go to Creating Blueprints |
| "Create a DAG from YAML" / "Compose steps" | Go to Composing DAGs in YAML |
| "Customize DAG args" / "Add tags to DAG" | Go to Customizing DAG-Level Configuration |
| "Override config at runtime" / "Trigger with params" | Go to Runtime Parameter Overrides |
| "Post-process DAGs" / "Add callback" | Go to Post-Build Callbacks |
| "Validate my YAML" / "Lint blueprint" | Go to Validation Commands |
| "Set up blueprint in my project" | Go to Project Setup |
| "Version my blueprint" | Go to Versioning |
| "Generate schema" / "Astro IDE setup" | Go to Schema Generation |
| Blueprint errors / troubleshooting | Go to Troubleshooting |
| 用户请求 | 对应操作 |
|---|---|
| "创建一个蓝图" / "定义模板" | 前往 创建蓝图 章节 |
| "通过YAML创建DAG" / "编排步骤" | 前往 通过YAML编排DAG 章节 |
| "自定义DAG参数" / "给DAG添加标签" | 前往 自定义DAG层级配置 章节 |
| "运行时覆盖配置" / "带参数触发" | 前往 运行时参数覆盖 章节 |
| "DAG后置处理" / "添加回调" | 前往 构建后回调 章节 |
| "校验我的YAML" / "检查蓝图语法" | 前往 校验命令 章节 |
| "在我的项目中设置蓝图" | 前往 项目设置 章节 |
| "给我的蓝图做版本管理" | 前往 版本管理 章节 |
| "生成schema" / "Astro IDE设置" | 前往 Schema生成 章节 |
| 蓝图报错 / 故障排查 | 前往 故障排查 章节 |
undefinedundefinedundefinedundefineddags/loader.pyfrom blueprint import build_all
build_all()BlueprintDagArgsdags/loader.pyfrom blueprint import build_all
build_all()BlueprintDagArgsuvx --from airflow-blueprint blueprint listuvx --from airflow-blueprint blueprint listundefinedundefineddef render(self, config: MyConfig) -> TaskGroup:
with TaskGroup(group_id=self.step_id) as group:
BashOperator(
task_id="my_task",
bash_command=f"echo '{config.source_table}'"
)
return groupundefineddef render(self, config: MyConfig) -> TaskGroup:
with TaskGroup(group_id=self.step_id) as group:
BashOperator(
task_id="my_task",
bash_command=f"echo '{config.source_table}'"
)
return groupundefined| Element | Requirement |
|---|---|
| Config class | Must inherit from |
| Blueprint class | Must inherit from |
| Must return |
| Task IDs | Use |
| 元素 | 要求 |
|---|---|
| 配置类 | 必须继承自 |
| 蓝图类 | 必须继承自 |
| 必须返回 |
| 任务ID | 分组/任务ID请使用 |
extra="forbid"from pydantic import ConfigDict
class MyConfig(BaseModel):
model_config = ConfigDict(extra="forbid")
# fields...extra="forbid"from pydantic import ConfigDict
class MyConfig(BaseModel):
model_config = ConfigDict(extra="forbid")
# 其他字段...undefinedundefined
By default, only `schedule` and `description` are supported as DAG-level fields (via the built-in `DefaultDagArgs`). For other fields like `tags`, `default_args`, `catchup`, etc., see **Customizing DAG-Level Configuration**.
默认情况下,仅支持`schedule`和`description`作为DAG层级字段(通过内置的`DefaultDagArgs`实现)。如果需要其他字段比如`tags`、`default_args`、`catchup`等,参考 **自定义DAG层级配置** 章节。| Key | Purpose |
|---|---|
| Template name (required) |
| List of upstream step names |
| Pin to specific blueprint version |
| 关键字 | 用途 |
|---|---|
| 模板名称(必填) |
| 上游步骤名称列表 |
| 绑定到指定的蓝图版本 |
dag_id: "{{ env.get('ENV', 'dev') }}_pipeline"
schedule: "{{ var.value.schedule | default('@daily') }}"
steps:
extract:
blueprint: extract
output_path: "/data/{{ context.ds_nodash }}/output.csv"
run_id: "{{ context.dag_run.run_id }}"envvarconncontextcontext.ds_nodashcontext.dag_run.confcontext.task_instance.xcom_pull(...)dag_id: "{{ env.get('ENV', 'dev') }}_pipeline"
schedule: "{{ var.value.schedule | default('@daily') }}"
steps:
extract:
blueprint: extract
output_path: "/data/{{ context.ds_nodash }}/output.csv"
run_id: "{{ context.dag_run.run_id }}"envvarconncontextcontext.ds_nodashcontext.dag_run.confcontext.task_instance.xcom_pull(...)scheduledescriptionBlueprintDagArgsscheduledescriptionBlueprintDagArgstagsdefault_argscatchupstart_datetagsdefault_argscatchupstart_dateundefinedundefined
Then in YAML, the extra fields are validated by the config model:
```yaml
dag_id: my_pipeline
schedule: "@daily"
tags: [etl, production]
owner: data-team
retries: 3
steps:
extract:
blueprint: extract
source_table: raw.data
之后在YAML中,新增的字段会被配置模型校验:
```yaml
dag_id: my_pipeline
schedule: "@daily"
tags: [etl, production]
owner: data-team
retries: 3
steps:
extract:
blueprint: extract
source_table: raw.dataBlueprintDagArgsMultipleDagArgsErrorrender()DAG()DefaultDagArgsscheduledescriptionBlueprintDagArgsMultipleDagArgsErrorrender()DAG()DefaultDagArgsscheduledescriptionself.param()self.param()self.param("field")class ExtractConfig(BaseModel):
query: str = Field(description="SQL query to run")
batch_size: int = Field(default=1000, ge=1)
class Extract(Blueprint[ExtractConfig]):
def render(self, config: ExtractConfig) -> TaskGroup:
with TaskGroup(group_id=self.step_id) as group:
BashOperator(
task_id="run_query",
bash_command=f"run-etl --query {self.param('query')} --batch {self.param('batch_size')}"
)
return groupself.param("字段名")class ExtractConfig(BaseModel):
query: str = Field(description="要执行的SQL查询")
batch_size: int = Field(default=1000, ge=1)
class Extract(Blueprint[ExtractConfig]):
def render(self, config: ExtractConfig) -> TaskGroup:
with TaskGroup(group_id=self.step_id) as group:
BashOperator(
task_id="run_query",
bash_command=f"run-etl --query {self.param('query')} --batch {self.param('batch_size')}"
)
return groupself.resolve_config()self.resolve_config()@taskPythonOperatorself.resolve_config()class Extract(Blueprint[ExtractConfig]):
def render(self, config: ExtractConfig) -> TaskGroup:
bp = self # capture reference for closure
@task(task_id="run_query")
def run_query(**context):
resolved = bp.resolve_config(config, context)
# resolved.query has the runtime override if one was provided
execute(resolved.query, resolved.batch_size)
with TaskGroup(group_id=self.step_id) as group:
run_query()
return group@taskPythonOperatorself.resolve_config()class Extract(Blueprint[ExtractConfig]):
def render(self, config: ExtractConfig) -> TaskGroup:
bp = self # 捕获引用供闭包使用
@task(task_id="run_query")
def run_query(**context):
resolved = bp.resolve_config(config, context)
# 如果提供了运行时覆盖,resolved.query会使用覆盖后的值
execute(resolved.query, resolved.batch_size)
with TaskGroup(group_id=self.step_id) as group:
run_query()
return groupstep_name__fieldValidationErrorstep_name__fieldValidationErroron_dag_builtfrom pathlib import Path
from blueprint import build_all
def add_audit_tags(dag, yaml_path: Path) -> None:
dag.tags.append("managed-by-blueprint")
dag.tags.append(f"source:{yaml_path.name}")
build_all(on_dag_built=add_audit_tags)dagDAGyaml_pathPathon_dag_builtfrom pathlib import Path
from blueprint import build_all
def add_audit_tags(dag, yaml_path: Path) -> None:
dag.tags.append("managed-by-blueprint")
dag.tags.append(f"source:{yaml_path.name}")
build_all(on_dag_built=add_audit_tags)dagDAGyaml_pathPathuvx --from airflow-blueprint blueprint <command>| Command | When to Use |
|---|---|
| Show available blueprints |
| Show config schema for a blueprint |
| Show schema for specific version |
| Validate all |
| Validate specific file |
| Generate JSON schema |
| Interactive DAG YAML creation |
uvx --from airflow-blueprint blueprint <command>| 命令 | 使用场景 |
|---|---|
| 展示可用的蓝图 |
| 展示某个蓝图的配置schema |
| 展示指定版本的schema |
| 校验所有 |
| 校验指定文件 |
| 生成JSON schema |
| 交互式创建DAG YAML |
undefinedundefined
---
---MyBlueprintMyBlueprintV2MyBlueprintV3undefinedMyBlueprintMyBlueprintV2MyBlueprintV3undefinedundefinedundefinednameversionclass MyCustomExtractor(Blueprint[ExtractV3Config]):
name = "extract"
version = 3
def render(self, config): ...NameV{N}nameversionclass MyCustomExtractor(Blueprint[ExtractV3Config]):
name = "extract"
version = 3
def render(self, config): ...名称V{N}steps:
# Pin to v1
legacy_extract:
blueprint: extract
version: 1
source_table: raw.data
# Use latest (v2)
new_extract:
blueprint: extract
sources: [{table: orders}]steps:
# 绑定到v1
legacy_extract:
blueprint: extract
version: 1
source_table: raw.data
# 使用最新版本(v2)
new_extract:
blueprint: extract
sources: [{table: orders}]undefinedundefinedundefinedundefined.astro/astro dev initmkdir -p blueprint/generated-schemas.astro/astro dev initmkdir -p blueprint/generated-schemasblueprint listblueprint list
The Astro IDE reads `blueprint/generated-schemas/` to render configuration forms. Keeping schemas in sync ensures the visual builder always reflects the latest blueprint configs.
If you cannot determine whether the project is an Astro project, ask the user once and remember for the rest of the session.
---
Astro IDE会读取`blueprint/generated-schemas/`目录来渲染配置表单。保持schema同步可以保证可视化构建器始终展示最新的蓝图配置。
如果你无法确定项目是否是Astro项目,询问用户一次并在会话剩余时间记住该结果。
-----template-dirblueprint list --template-dir dags/templates/--template-dirblueprint list --template-dir dags/templates/extra="forbid"blueprint describe <name>extra="forbid"blueprint describe <名称>dags/loader.pybuild_all()from blueprint import build_all
build_all()dags/loader.pybuild_all()from blueprint import build_all
build_all()blueprint lintblueprint describeblueprint lintblueprint describedepends_ondepends_onBlueprintDagArgsBlueprintDagArgsBlueprintDagArgsBlueprintDagArgsblueprint_step_configblueprint_step_codeblueprint_step_configblueprint_step_codeblueprint listblueprint lintdags/loader.pybuild_all()blueprint listblueprint lintdags/loader.pybuild_all()