zenml-pipeline-authoring
Author ZenML Pipelines
This skill guides pipeline authoring: steps, artifacts, configuration, Docker settings, materializers, metadata, secrets, and visualizations.
Start Here: Interview the User
Do not rush to code. Before writing a single line, thoroughly understand what the user wants to build. The interview is the most important step — a well-scoped pipeline that does 3 things well beats a sprawling one that does 10 things poorly.
For complex or multi-pipeline projects: If the user describes something ambitious (e.g., "build me an end-to-end ML platform with data ingestion, feature engineering, training, evaluation, deployment, monitoring, and retraining"), or if they mention multiple pipelines, invoke the zenml-scoping skill first. It runs a deeper architectural interview that decomposes the system into pipeline units, identifies what doesn't belong in a pipeline at all, and produces a spec (pipeline_architecture.md). Once that's done, come back here to build each pipeline one at a time.
For single, focused pipelines: If the user's request is clearly one pipeline (e.g., "build a training pipeline for my CSV data"), proceed with the questions below. If the answers are obvious from context, infer them and proceed. Only ask when genuinely ambiguous.
Q1: Static or dynamic pipeline?
Most pipelines are static (fixed DAG). Use dynamic (`@pipeline(dynamic=True)`) only when the number of steps or their wiring depends on runtime values (e.g., "process N documents where N comes from a query"). See Dynamic Pipelines and references/dynamic-pipelines.md.
Q2: Local or remote orchestrator?
If remote (Kubernetes, Vertex AI, SageMaker, AzureML), the Artifact Golden Rule is critical, and you will need Docker Settings. If local-only for now, you can defer those concerns. Ask whether the user already has a stack set up — if not, point them to the ZenML docs for stack setup (this skill does not cover stack creation).
Q3: Any custom Python types?
If steps produce or consume types beyond builtins, pandas, numpy, or Pydantic models, you likely need a custom materializer. Note: Pydantic `BaseModel` subclasses have a built-in materializer — often the simplest alternative to writing a custom materializer.
Q4: Where should the project live?
Ask the user where to create the project — a new subfolder, or the current directory. If the current directory is not empty, suggest a new subfolder.
Q5: What are the data sources?
Understand where data comes from: local CSV/Parquet files, a database (Snowflake, PostgreSQL), an API, cloud storage? This determines the first step's implementation and whether secrets are needed. If credentials are involved, always use ZenML Secrets — never pass passwords as CLI arguments or in config files.
Q6: Does the user want a small-data development mode?
Many users want to iterate quickly with a fraction of the dataset. Plan for a `--sample-size` or `--small` CLI flag in `run.py`.
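Inside the data-loading step, that flag can be honored with a small slice helper. A minimal sketch, assuming pandas; `maybe_sample` is a hypothetical name:

```python
from typing import Optional

import pandas as pd


def maybe_sample(df: pd.DataFrame, sample_size: Optional[int]) -> pd.DataFrame:
    """Slice the dataset for quick local iteration; pass None for a full run."""
    return df if sample_size is None else df.head(sample_size)


df = pd.DataFrame({"x": range(100)})
small = maybe_sample(df, sample_size=10)   # dev mode: first 10 rows
full = maybe_sample(df, sample_size=None)  # full run: all 100 rows
```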
Core Anatomy
Defining steps
A step is a Python function decorated with `@step`. Type hints on inputs and outputs are required — they control serialization, caching, and dashboard display.

```python
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression
from zenml import step

@step
def train_model(X_train: pd.DataFrame, lr: float = 0.01) -> BaseEstimator:
    """lr is a parameter (literal value); X_train is an artifact (from upstream step)."""
    model = LogisticRegression(C=1/lr).fit(X_train.drop("target", axis=1), X_train["target"])
    return model
```

Parameters vs artifacts: If a step input comes from another step's output, it is an artifact. If it is a literal value passed directly (JSON-serializable), it is a parameter. ZenML handles them differently.
Named and multi-output steps
Use `Annotated` to give outputs stable names. Use `Tuple` for multiple outputs:

```python
from typing import Annotated, Tuple
from zenml import step
import pandas as pd

@step
def split_data(df: pd.DataFrame, ratio: float = 0.8) -> Tuple[
    Annotated[pd.DataFrame, "train"],
    Annotated[pd.DataFrame, "test"],
]:
    idx = int(len(df) * ratio)
    return df.iloc[:idx], df.iloc[idx:]
```
Wiring a pipeline
```python
from zenml import pipeline

@pipeline
def training_pipeline(dataset_path: str = "data.csv", lr: float = 0.01) -> None:
    df = load_data(path=dataset_path)
    train, test = split_data(df=df)
    model = train_model(X_train=train, lr=lr)
    evaluate(model=model, X_test=test)

if __name__ == "__main__":
    training_pipeline()
```

Pipeline parameters (like `dataset_path`) can be overridden at runtime or via YAML config.
Step invocation IDs
When you call a step multiple times in one pipeline, ZenML auto-suffixes the name (`scale`, `scale_2`). Override with `my_step(id="custom_id")`.

Project structure
Every pipeline project MUST follow this layout. This is non-negotiable — it produces clean, maintainable projects:
my_pipeline_project/
├── steps/ # One file per step
│ ├── load_data.py
│ ├── preprocess.py
│ ├── train_model.py
│ └── evaluate.py
├── pipelines/
│ └── training.py # Pipeline definition(s)
├── materializers/ # Custom materializers (if any)
│ └── my_data_materializer.py
├── visualizations/ # HTML/CSS templates for dashboard visualizations
│ └── metrics_report.html
├── configs/ # One YAML config per environment
│ ├── dev.yaml
│ ├── staging.yaml
│ └── prod.yaml
├── run.py # CLI entry point (argparse, not click)
├── README.md # How to run, what stacks to use, etc.
└── pyproject.toml       # Dependencies — always pyproject.toml, not requirements.txt

Key rules:
- One step per file in a `steps/` directory — not all steps in one `steps.py`.
- Separate pipeline definition from execution — pipeline in `pipelines/`, execution in `run.py`.
- Always create a `README.md` (not `summary.md`) explaining how to run the pipeline, what stacks it supports, and any setup needed. Link to the relevant ZenML docs pages (e.g., dynamic pipelines docs) rather than embedding lengthy explanations. Do NOT include stack registration or setup instructions — just say "assumes you have a ZenML stack configured" and link to https://docs.zenml.io for stack setup.
- Always use `pyproject.toml` for dependency declarations. Do NOT create `requirements.txt` alongside it — use one or the other, and `pyproject.toml` is the right choice.
- `run.py` uses `argparse` (not `click`) — click can conflict with ZenML's own click dependency.
- Run `zenml init` at the project root to set the source root explicitly — this prevents import failures when code runs inside containers.
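The layout above can be scaffolded in one go; a sketch assuming a POSIX shell with brace expansion (bash/zsh):

```shell
mkdir -p my_pipeline_project/{steps,pipelines,materializers,visualizations,configs}
touch my_pipeline_project/run.py \
      my_pipeline_project/README.md \
      my_pipeline_project/pyproject.toml
```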
pyproject.toml template
Always use this as the starting point for `pyproject.toml`:

```toml
[project]
name = "my-pipeline-project"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "zenml>=0.93",
    "pandas>=2.0",
    # Add pipeline-specific dependencies here
]

[project.optional-dependencies]
dev = [
    "pytest",
    "ruff",
    "mypy",
]
```

Key version constraints:
- Python >= 3.12 — ZenML's modern features and type annotations benefit from 3.12+.
- ZenML >= 0.93 — this is the minimum for current features. For dynamic pipelines, require >= 0.91 at absolute minimum (but 0.93 is safer).
- Don't pin dev tool versions (pytest, ruff, mypy) — just list them without version constraints so users get the latest.
- Prefer `uv` in README instructions: `uv pip install -e ".[dev]"` for faster and more reliable resolution. If `uv` is unavailable in the user's environment, use `pip install -e ".[dev]"`.
README template notes
The README should include:
- How to install: Prefer `uv pip install -e ".[dev]"` and `zenml integration install <name> --uv` when supported. If `uv` is unavailable, use pip equivalents. Omit `-y` so users can review prompts.
- How to run: `python run.py --config configs/dev.yaml`
- What stacks it supports (just name them, don't explain how to register them)
- Link to specific orchestrator docs — not just generic https://docs.zenml.io. For example, if targeting Vertex AI, link to the Vertex AI orchestrator page and the GCP service connector page. Encourage users to use service connectors for authentication rather than manual credential management.
- A simple ASCII DAG visualization of the pipeline flow is a nice touch:
load_data --> preprocess --> train_model --> evaluate
run.py CLI template

Every `run.py` should offer these flags:

```python
import argparse

from pipelines.training import training_pipeline

def main():
    parser = argparse.ArgumentParser(description="Run the training pipeline")
    parser.add_argument("--config", default="configs/dev.yaml", help="Path to YAML config")
    parser.add_argument("--no-cache", action="store_true", help="Disable caching")
    parser.add_argument("--sample-size", type=int, default=None,
                        help="Use only N rows (for quick local iteration)")
    args = parser.parse_args()
    pipeline_instance = training_pipeline.with_options(
        config_path=args.config,
        enable_cache=not args.no_cache,
    )
    pipeline_instance(sample_size=args.sample_size)

if __name__ == "__main__":
    main()
```

The `sample_size` parameter is passed as a pipeline parameter so the data-loading step can slice the dataset.
The Artifact Golden Rule
Data must enter and move through the pipeline as artifacts, not as local file paths.
This is the single most important concept for cloud portability. When running on a remote orchestrator, each step runs in a separate container on a separate machine. There is no shared filesystem between steps.
What goes wrong
```python
# ANTI-PATTERN: works locally, fails on cloud
@step
def preprocess(input_path: str) -> str:
    df = pd.read_csv(input_path)  # Reads from local disk
    output_path = "/tmp/processed.csv"
    df.to_csv(output_path)
    return output_path  # Next step can't access /tmp on a different pod

@step
def train(data_path: str) -> None:
    df = pd.read_csv(data_path)  # FileNotFoundError on cloud!
```
The correct pattern
```python
# CORRECT: data flows as artifacts
@step
def preprocess(input_path: str) -> pd.DataFrame:
    return pd.read_csv(input_path)  # ZenML serializes the DataFrame to the artifact store

@step
def train(data: pd.DataFrame) -> None:
    ...  # ZenML loads it from the artifact store — works everywhere
```

The first step in a pipeline is typically the one that bridges external data into the artifact world. All downstream steps receive artifacts, never file paths.

---
Dynamic Pipelines
Use dynamic pipelines when the DAG shape depends on runtime values. They are experimental and have restricted orchestrator support (Local, LocalDocker, Kubernetes, Vertex, SageMaker, AzureML). Always link to the dynamic pipelines documentation (https://docs.zenml.io/how-to/steps-pipelines/dynamic-pipelines) in the README since these APIs can be tricky to get right.
Minimal example
```python
from zenml import pipeline, step

@step
def get_count() -> int:
    return 3

@step
def process(index: int) -> None:
    print(f"Processing {index}")

@pipeline(dynamic=True)
def my_dynamic_pipeline() -> None:
    count = get_count()
    count_data = count.load()  # .load() gets actual Python value
    for idx in range(count_data):
        process(index=idx)
```
The critical distinction: .load() vs .chunk()
| Method | Returns | Use for |
|---|---|---|
| `.load()` | Actual Python data | Decisions, control flow, iteration |
| `.chunk()` | A DAG edge reference | Wiring to downstream steps |

You typically need both: `.load()` to iterate/decide, `.chunk()` to wire the DAG:

```python
items = produce_list()
for i, val in enumerate(items.load()):  # load to iterate
    if val > threshold:
        chunk = items.chunk(index=i)  # chunk to wire
        process(chunk)
```
process(chunk)| 方法 | 返回值 | 适用场景 |
|---|---|---|
| 实际Python数据 | 决策、控制流、迭代 |
| DAG边引用 | 连接至下游步骤 |
通常需要同时使用两者:用于迭代/决策,用于连接DAG:
.load().chunk()python
items = produce_list()
for i, val in enumerate(items.load()): # load用于迭代
if val > threshold:
chunk = items.chunk(index=i) # chunk用于连接
process(chunk)Fan-out with .map()
and parallel execution with .submit()
.map().submit()使用.map()
实现扇出,使用.submit()
实现并行执行
.map().submit()For map/reduce patterns, fans out over a collection. For explicit parallelism, returns a future.
.map().submit()See references/dynamic-pipelines.md for the complete API: , , , , , runtime modes, orchestrator support table, and limitations.
.map().product().submit()unmapped().unpack()对于map/reduce模式,可对集合进行扇出。对于显式并行,返回一个future。
.map().submit()请参阅references/dynamic-pipelines.md以获取完整API:、、、、、运行时模式、编排器支持表以及限制条件。
Injecting External Data
When data originates outside the pipeline (a local file, a database, an API), you need to bridge it into the artifact system.
Pattern A: ExternalArtifact(value=...)

Upload data inline when defining the pipeline. Simple but disables caching for the consuming step:

```python
from zenml import ExternalArtifact, pipeline, step
import pandas as pd

@step
def train(data: pd.DataFrame) -> None:
    ...

@pipeline
def my_pipeline() -> None:
    df = pd.read_csv("local_data.csv")
    train(data=ExternalArtifact(value=df))
```
Pattern B: Pre-upload + UUID reference (for remote orchestrators)
For dynamic pipelines on remote orchestrators, the pipeline function runs inside the orchestrator pod — it cannot read your local filesystem. Pre-upload the data, then reference it by UUID:
```python
# run.py (client-side, runs on your machine)
from zenml.artifacts.utils import save_artifact
import pandas as pd

df = pd.read_csv("local_data.csv")
art = save_artifact(data=df, name="my_dataset")
print(art.id)  # Pass this UUID to the pipeline
```
```python
# pipeline.py (runs inside the orchestrator pod)
from zenml.client import Client
from uuid import UUID

@pipeline
def my_pipeline(dataset_id: str) -> None:
    artifact = Client().get_artifact_version(UUID(dataset_id))
    train(data=artifact)
```
**Important**: `ExternalArtifact(id=...)` is rejected by the current validator. Use `Client().get_artifact_version()` instead.
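The UUID printed by the client-side script travels to the pipeline as a plain string; converting it back is a stdlib round-trip (the id below is illustrative, not a real artifact):

```python
from uuid import UUID

dataset_id = "12345678-1234-5678-1234-567812345678"  # illustrative; use the id printed by save_artifact
parsed = UUID(dataset_id)  # what the pipeline does before calling Client().get_artifact_version(...)
roundtrip = str(parsed)
```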
See [references/external-data.md](references/external-data.md) for additional patterns including `register_artifact()`.
---

YAML Configuration
Separate environment-specific settings from pipeline code using YAML config files.
Minimal example
```yaml
# configs/dev.yaml
enable_cache: false
parameters:
  dataset_path: "data/small.csv"
  lr: 0.05
steps:
  train_model:
    settings:
      resources:
        cpu_count: 2
```

```python
training_pipeline.with_options(config_path="configs/dev.yaml")()
```

Configuration precedence (highest to lowest): Runtime Python code > Step-level YAML > Pipeline-level YAML > Defaults.

Always use separate config files per environment (`configs/dev.yaml`, `configs/staging.yaml`, `configs/prod.yaml`) — never a single `config.yaml`. Generate a template with `zenml pipeline build-configuration my_pipeline > config_template.yaml`.

Prefer `with_options()` (returns a copy) over `configure()` (mutates in place).

See references/yaml-config.md for the complete YAML schema and multi-env pattern.
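For contrast, a prod config might enable caching and request more resources. A sketch with illustrative values, following the same layout as the dev example:

```yaml
# configs/prod.yaml (illustrative)
enable_cache: true
parameters:
  dataset_path: "gs://my-bucket/data/full.parquet"
  lr: 0.01
steps:
  train_model:
    settings:
      resources:
        cpu_count: 8
        memory: "16GB"
```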
Docker Settings
When running on remote orchestrators, ZenML builds Docker images for each step. Use `DockerSettings` to control what goes into those images.

Common patterns
```python
from zenml.config import DockerSettings

# Install pip packages, apt packages, and env vars into the step images
docker = DockerSettings(
    requirements=["scikit-learn>=1.0", "pandas>=2.0"],
    apt_packages=["libgomp1"],
    environment={"PYTHONUNBUFFERED": "1"},
)

@pipeline(settings={"docker": docker})
def my_pipeline() -> None:
    ...
```
Step-level overrides
Apply different settings per step with `@step(settings={"docker": DockerSettings(parent_image="pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime", requirements=["transformers"])})`.

Key fields: `requirements` (pip packages), `required_integrations` (ZenML integrations), `apt_packages` (system packages), `environment` (build-time env vars), `runtime_environment` (runtime env vars), `parent_image` (custom base image), `python_package_installer` (`"uv"` default or `"pip"`), `prevent_build_reuse` (force fresh builds for debugging).

See references/docker-settings.md for the full field catalog and YAML equivalents.
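One possible YAML equivalent for the pip/apt fields, sketched with illustrative values (check the docker-settings reference for the authoritative schema):

```yaml
settings:
  docker:
    requirements:
      - scikit-learn>=1.0
      - pandas>=2.0
    apt_packages:
      - libgomp1
    environment:
      PYTHONUNBUFFERED: "1"
    python_package_installer: uv
```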
Metadata Logging
Log metadata from within steps to track metrics, parameters, and results in the ZenML dashboard. This is essential for production pipelines — every training step, evaluation step, and data quality step should log relevant metadata.
Basic usage (inside a step)
```python
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression
from zenml import step
from zenml.utils.metadata_utils import log_metadata

@step
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> BaseEstimator:
    model = LogisticRegression().fit(X_train, y_train)
    log_metadata({"accuracy": model.score(X_train, y_train), "n_samples": len(X_train)})
    return model
```

When called with only `metadata` inside a step, `log_metadata` automatically attaches to the current step.
Nested metadata (creates separate cards in the dashboard)
Use nested dicts — each top-level key becomes its own card: `log_metadata({"model_metrics": {"accuracy": 0.95, "f1": 0.90}, "data_stats": {"n_samples": 5000}})`.

Log metadata in every data loading step (row count, source), preprocessing step (rows removed), training step (all metrics, hyperparams), and evaluation step (test metrics, thresholds).
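The nested payload is just a plain dict; sketching its shape (numbers illustrative):

```python
# Each top-level key becomes its own dashboard card when passed to log_metadata().
metadata = {
    "model_metrics": {"accuracy": 0.95, "f1": 0.90},
    "data_stats": {"n_samples": 5000},
}
card_names = sorted(metadata)  # one card per top-level key
```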
Secrets Management
When a pipeline needs credentials (database passwords, API keys, cloud tokens), never pass them as CLI arguments, config file values, or environment variables in code. Use the ZenML secret store:
```bash
# One-time setup (CLI)
zenml secret create db_credentials --host=db.example.com --username=admin --password=secret123
```

```python
import pandas as pd
from zenml import step
from zenml.client import Client

@step
def load_from_database(query: str) -> pd.DataFrame:
    secret = Client().get_secret("db_credentials")
    host = secret.secret_values["host"]
    username = secret.secret_values["username"]
    password = secret.secret_values["password"]
    # Use credentials to connect...
```

This works on any orchestrator — the secret store is centralized in the ZenML server.
Custom Types and Materializers
When do you need a custom materializer?
| Situation | Materializer needed? |
|---|---|
| Step returns a pandas `DataFrame` | No — built-in |
| Step returns a numpy array | No — built-in |
| Step returns a Pydantic `BaseModel` | No — built-in. Prefer this path for custom types |
| Step returns a builtin type (`int`, `str`, `dict`, `list`) | No — built-in |
| Step returns a custom dataclass | Convert to a Pydantic `BaseModel` |
| Step returns a complex domain object | Yes |
| You want a stable format (JSON/Parquet) instead of cloudpickle | Yes |
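For example, a custom result type can ride the built-in Pydantic materializer simply by subclassing `BaseModel`; `EvalResult` here is a hypothetical type (assumes Pydantic v2):

```python
from pydantic import BaseModel

class EvalResult(BaseModel):
    accuracy: float
    f1: float
    n_samples: int

# A step annotated `-> EvalResult` needs no custom materializer:
# ZenML's built-in Pydantic materializer handles (de)serialization.
result = EvalResult(accuracy=0.95, f1=0.90, n_samples=5000)
payload = result.model_dump()
```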
Zero-effort visualizations
Return special string types for instant dashboard visualizations — no materializer needed:
```python
from pathlib import Path

from zenml import step
from zenml.types import HTMLString, MarkdownString, CSVString

@step
def report(metrics: dict) -> HTMLString:
    # For polished HTML, load from a separate template file rather than inline strings
    html_path = Path(__file__).parent.parent / "visualizations" / "metrics_report.html"
    template = html_path.read_text()
    return HTMLString(template.format(**metrics))
```

Visualization quality: Keep `.html` templates in a `visualizations/` directory as separate files (optionally with embedded CSS). This makes them easy to edit and preview in a browser. Avoid writing raw HTML strings inside Python — it produces ugly, hard-to-maintain visualizations. Include proper CSS styling for a polished dashboard appearance.
visualizations/.html返回特殊的字符串类型即可实现即时仪表板可视化——无需materializer:
python
from zenml.types import HTMLString, MarkdownString, CSVString
@step
def report(metrics: dict) -> HTMLString:
# 为了生成精美的HTML,请从单独的模板文件加载,而非内联字符串
html_path = Path(__file__).parent.parent / "visualizations" / "metrics_report.html"
template = html_path.read_text()
return HTMLString(template.format(**metrics))可视化质量:将HTML模板保存在目录中,作为单独的文件(可嵌入CSS)。这使得它们易于编辑和在浏览器中预览。避免在Python中编写原始HTML字符串——这会生成丑陋且难以维护的可视化。请包含适当的CSS样式以实现精美的仪表板外观。
visualizations/.htmlMinimal custom materializer
```python
import json
import os
from typing import Any, Type

from zenml.enums import ArtifactType
from zenml.materializers.base_materializer import BaseMaterializer

class MyDataMaterializer(BaseMaterializer):
    ASSOCIATED_TYPES = (MyData,)
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def save(self, data: MyData) -> None:
        path = os.path.join(self.uri, "data.json")
        with self.artifact_store.open(path, "w") as f:  # Always use self.artifact_store.open()
            json.dump(data.to_dict(), f)

    def load(self, data_type: Type[Any]) -> MyData:
        path = os.path.join(self.uri, "data.json")
        with self.artifact_store.open(path, "r") as f:
            return MyData(**json.load(f))
```

Use `self.artifact_store.open()` (not plain `open()`) so it works with S3, GCS, and Azure Blob — not just local filesystems.

Assign it: `@step(output_materializers=MyDataMaterializer)`.

See references/materializers.md for visualizations, metadata extraction, and registration options.
Logging
ZenML automatically captures and standard module output from within steps — no special setup needed. Logs are stored in the artifact store and visible in the dashboard.
print()loggingpython
import logging
from zenml import step
logger = logging.getLogger(__name__)
@step
def train_model(data: pd.DataFrame) -> Model:
logger.info(f"Training on {len(data)} samples") # Captured by ZenML
print("Starting training...") # Also captured
...Key points:
- Only logs from the main thread are captured. If you spawn threads, use `contextvars.copy_context()` to propagate ZenML's logging context.
- For remote runs, set logging verbosity via DockerSettings: `DockerSettings(environment={"ZENML_LOGGING_VERBOSITY": "DEBUG"})`.
- Disable step log storage with `@step(enable_step_logs=False)` or `ZENML_DISABLE_STEP_LOGS_STORAGE=true`.
- To view logs in the dashboard with a remote artifact store, you need a configured service connector.
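The thread-propagation point can be sketched with plain stdlib code: `ctx.run(...)` executes the worker inside a copy of the main thread's context, which is how context-local state (such as ZenML's log-capture handle) becomes visible to the thread. Names here are illustrative:

```python
import contextvars
import logging
import threading

logger = logging.getLogger("pipeline")
results = []


def work() -> None:
    # Runs inside the copied context, so context-local state
    # set up in the main thread is visible here.
    logger.info("processing shard")
    results.append("done")


# Copy the main thread's context and run the worker inside it.
ctx = contextvars.copy_context()
t = threading.Thread(target=lambda: ctx.run(work))
t.start()
t.join()
assert results == ["done"]
```

Without the `ctx.run(...)` wrapper the thread would run in a fresh context and the worker's log lines would bypass ZenML's capture.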
Retry Configuration
For flaky external services or transient cloud errors, use `StepRetryConfig(max_retries=3, delay=10, backoff=2)` on `@step(retry=...)` or `@pipeline(retry=...)`. The YAML equivalent uses `retry:` at step or top level. See the ZenML docs for hooks (`on_failure`, `on_success`).
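Assuming the usual exponential-backoff interpretation of these fields (the exact timing is ZenML's internal behavior, so treat this as a sketch), retry *n* waits `delay * backoff**n` seconds:

```python
# Sketch: delays implied by StepRetryConfig(max_retries=3, delay=10, backoff=2),
# assuming retry n (0-based) waits delay * backoff**n seconds.
def retry_delays(max_retries: int, delay: float, backoff: float) -> list[float]:
    return [delay * backoff**n for n in range(max_retries)]


print(retry_delays(3, 10, 2))  # [10, 20, 40]
```

In other words, the example config above gives up after roughly 70 seconds of cumulative waiting, which is worth keeping in mind when tuning it against a slow external service.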
Post-Creation Enhancements
After the pipeline is scaffolded and working, offer these enhancements. Each is optional — ask the user which they want before adding them.
Tags (add by default)
Always add tags to pipelines and key artifacts — they make filtering and organization much easier in the dashboard. Use `@pipeline(tags=["training", "v1"])` and `ArtifactConfig(tags=["dataset"])` on step outputs. Cascade tags (`Tag(name="experiment-42", cascade=True)`) automatically propagate to all artifact versions created during the run.

Model Control Plane
Track the pipeline's artifacts under a named model for versioning, promotion, and cross-pipeline artifact sharing. Use `@pipeline(model=Model(name="my_model", tags=["classification"]))`. This enables model promotion (`staging` → `production`) and artifact exchange between training and inference pipelines. See references/post-creation.md for full patterns.

Scheduling
Run the pipeline on a recurring schedule. Use `Schedule(cron_expression="0 2 * * *")` or `Schedule(interval_second=3600)` with `pipeline.with_options(schedule=schedule)`. Not all orchestrators support scheduling — Kubernetes, Vertex AI, SageMaker, AzureML, Airflow, and Kubeflow do; Local and SkyPilot do not. See references/post-creation.md for the orchestrator support table and management commands.

Pipeline Deployment (HTTP serving)
For real-time inference or agent workflows, pipelines can be deployed as persistent HTTP services using `pipeline.deploy(deployment_name="my_service")`. This replaces the deprecated model deployer components. For advanced deployment patterns and production configuration, see the deployment docs.

See references/post-creation.md for detailed patterns for all of the above.
Common Anti-Patterns
| Anti-pattern | Symptom | Fix |
|---|---|---|
| Pass local file paths between steps | `FileNotFoundError` on cloud runs | Return data as artifacts, not paths |
| Write to `/tmp` or other local disk | Works locally, fails on K8s | Use artifact outputs |
| Missing type hints on steps | Silent failures, no caching | Add type annotations to all inputs/outputs |
| Pickle for production artifacts | Breaks across Python versions | Write a custom materializer |
| Missing requirements in DockerSettings | `ModuleNotFoundError` in the container | Add them to `DockerSettings(requirements=...)` |
| Imports inside pipeline function body | Fails in container if module not available | Import at module level |
| No `zenml init` at the project root | Import errors in remote steps | Run `zenml init` in the repository root |
| Passwords/secrets in CLI args or config | Security risk, visible in logs | Use the ZenML secret store |
| All steps in one file | Hard to maintain, test, review | One file per step in a `steps/` directory |
| No metadata logging in train/eval steps | No metrics visible in dashboard | Add `log_metadata()` calls |
| Single config file for all environments | Config drift, manual editing | Separate configs per environment |
| Installing integrations with plain `pip install` | Version conflicts with ZenML | Use `zenml integration install` |
| Missing or wrong dependency file | Unclear deps, no Python version pin, no dev deps | Use `pyproject.toml` |
| Inline HTML strings in Python | Ugly visualizations, hard to edit | Use separate HTML template files |
| Defaulting to `pip` in docs/README | Slower installs and weaker dependency resolution in many environments | Prefer `uv` |
| Inline prompts buried in code | User can't review prompts; easier to make mistakes | Prefer separate prompt files where supported |
| Stack registration instructions in README | Users have different stacks; instructions become stale | Just say "assumes a configured ZenML stack" and link to docs |
| Plain custom classes as step outputs | No built-in materializer, requires custom code | Use Pydantic `BaseModel` |
| No minimum ZenML version in deps | Breaks on older versions missing features | Pin a minimum ZenML version |
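Several rows above (one file per step, per-environment configs, `pyproject.toml`) point at the same project shape; a minimal layout, with illustrative file names, might look like:

```text
my_pipeline/
├── pipeline.py          # @pipeline definition wiring the steps together
├── steps/               # one file per step
│   ├── load_data.py
│   ├── train.py
│   └── evaluate.py
├── configs/             # one YAML config per environment
│   ├── dev.yaml
│   └── prod.yaml
└── pyproject.toml       # deps, Python version pin, minimum ZenML version
```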
Resources
Reference files (detailed guides)
- references/dynamic-pipelines.md — Complete dynamic pipeline API
- references/external-data.md — Data injection patterns
- references/docker-settings.md — Full DockerSettings field catalog
- references/materializers.md — Materializer authoring guide
- references/yaml-config.md — Complete YAML config schema
- references/post-creation.md — Tags, Model Control Plane, scheduling, deployment
ZenML documentation
For topics not covered here (stack setup, experiment tracking, advanced deployment configuration), query the ZenML docs at https://docs.zenml.io.
When linking to docs in generated READMEs, link to specific pages rather than the generic homepage. Common links to include:
- Dynamic pipelines: https://docs.zenml.io/how-to/steps-pipelines/dynamic-pipelines
- Service connectors (for cloud auth): https://docs.zenml.io/how-to/infrastructure-deployment/auth-management
- Orchestrator-specific pages (e.g., Vertex AI, Kubernetes, SageMaker) — search docs for the specific orchestrator name
- Encourage service connectors over manual credential management for cloud stacks