Loading...
Loading...
Author ZenML pipelines: @step/@pipeline decorators, type hints, multi-output steps, dynamic vs static pipelines, artifact data flow, ExternalArtifact, YAML configuration, DockerSettings for remote execution, custom materializers, metadata logging, secrets management, and custom visualizations. Use this skill whenever asked to write a ZenML pipeline, create ZenML steps, make a pipeline work on Kubernetes/Vertex/SageMaker, add Docker settings, write a materializer, create a custom visualization, handle "works locally but fails on cloud" issues, or configure pipeline YAML files. Even if the user doesn't explicitly mention "pipeline authoring", use this skill when they ask to build an ML workflow, data pipeline, or training pipeline with ZenML.
npx skill4agent add zenml-io/skills zenml-pipeline-authoring

(See also: the `zenml-scoping` skill, `pipeline_architecture.md`, `@pipeline(dynamic=True)`, `BaseModel`, `--sample-size`, `--small`, `run.py`.)

## Steps: the `@step` decorator

from zenml import step
@step
def train_model(X_train: pd.DataFrame, lr: float = 0.01) -> sklearn.base.BaseEstimator:
"""lr is a parameter (literal value); X_train is an artifact (from upstream step)."""
model = LogisticRegression(C=1/lr).fit(X_train.drop("target", axis=1), X_train["target"])
return model

Multi-output steps name each output with `Annotated` inside a `Tuple`:

from typing import Annotated, Tuple
from zenml import step
import pandas as pd
@step
def split_data(df: pd.DataFrame, ratio: float = 0.8) -> Tuple[
Annotated[pd.DataFrame, "train"],
Annotated[pd.DataFrame, "test"],
]:
idx = int(len(df) * ratio)
return df.iloc[:idx], df.iloc[idx:]

Pipelines compose steps with the `@pipeline` decorator:

from zenml import pipeline
@pipeline
def training_pipeline(dataset_path: str = "data.csv", lr: float = 0.01) -> None:
df = load_data(path=dataset_path)
train, test = split_data(df=df)
model = train_model(X_train=train, lr=lr)
evaluate(model=model, X_test=test)
if __name__ == "__main__":
training_pipeline()

Pipeline parameters (e.g. `dataset_path`) are literal values. Repeated step invocations get numbered suffixes (`scale`, `scale_2`) unless you set an explicit id with `my_step(id="custom_id")`.

Recommended project layout:

my_pipeline_project/
├── steps/ # One file per step
│ ├── load_data.py
│ ├── preprocess.py
│ ├── train_model.py
│ └── evaluate.py
├── pipelines/
│ └── training.py # Pipeline definition(s)
├── materializers/ # Custom materializers (if any)
│ └── my_data_materializer.py
├── visualizations/ # HTML/CSS templates for dashboard visualizations
│ └── metrics_report.html
├── configs/ # One YAML config per environment
│ ├── dev.yaml
│ ├── staging.yaml
│ └── prod.yaml
├── run.py # CLI entry point (argparse, not click)
├── README.md # How to run, what stacks to use, etc.
└── pyproject.toml # Dependencies — always pyproject.toml, not requirements.txt

Layout notes: one file per step under `steps/` (not a single `steps.py`); pipeline definitions under `pipelines/`; `run.py` as the entry point; a `README.md` (not `summary.md`); and a `pyproject.toml` instead of `requirements.txt`. `run.py` uses `argparse`, not `click`. Run `zenml init` at the project root.

Example `pyproject.toml`:

[project]
name = "my-pipeline-project"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"zenml>=0.93",
"pandas>=2.0",
# Add pipeline-specific dependencies here
]
[project.optional-dependencies]
dev = [
"pytest",
"ruff",
"mypy",
]

Install with `uv` when available — `uv pip install -e ".[dev]"` — otherwise fall back to `pip install -e ".[dev]"`. For ZenML integrations, use `zenml integration install <name> --uv` (add `-y` to skip the confirmation prompt).

Run the pipeline with: python run.py --config configs/dev.yaml

Pipeline DAG: load_data --> preprocess --> train_model --> evaluate

Example `run.py`:

import argparse
from pipelines.training import training_pipeline
def main():
parser = argparse.ArgumentParser(description="Run the training pipeline")
parser.add_argument("--config", default="configs/dev.yaml", help="Path to YAML config")
parser.add_argument("--no-cache", action="store_true", help="Disable caching")
parser.add_argument("--sample-size", type=int, default=None,
help="Use only N rows (for quick local iteration)")
args = parser.parse_args()
pipeline_instance = training_pipeline.with_options(
config_path=args.config,
enable_cache=not args.no_cache,
)
pipeline_instance(sample_size=args.sample_size)
if __name__ == "__main__":
main()

The `sample_size` flag enables quick local iteration on a data subset.

## Artifacts vs. file paths

Data must enter and move through the pipeline as artifacts, not as local file paths.
# ANTI-PATTERN: works locally, fails on cloud
@step
def preprocess(input_path: str) -> str:
df = pd.read_csv(input_path) # Reads from local disk
output_path = "/tmp/processed.csv"
df.to_csv(output_path)
return output_path # Next step can't access /tmp on a different pod
@step
def train(data_path: str) -> None:
df = pd.read_csv(data_path) # FileNotFoundError on cloud!

# CORRECT: data flows as artifacts
@step
def preprocess(input_path: str) -> pd.DataFrame:
return pd.read_csv(input_path) # ZenML serializes the DataFrame to the artifact store
@step
def train(data: pd.DataFrame) -> None:
... # ZenML loads it from the artifact store — works everywhere

Dynamic pipelines (`@pipeline(dynamic=True)`) allow runtime control flow:

from zenml import pipeline, step
@step
def get_count() -> int:
return 3
@step
def process(index: int) -> None:
print(f"Processing {index}")
@pipeline(dynamic=True)
def my_dynamic_pipeline() -> None:
count = get_count()
count_data = count.load() # .load() gets actual Python value
for idx in range(count_data):
process(index=idx)

| Method | Returns | Use for |
|---|---|---|
| `.load()` | Actual Python data | Decisions, control flow, iteration |
| `.chunk()` | A DAG edge reference | Wiring to downstream steps |

Combine `.load()` (to iterate) with `.chunk()` (to wire):

items = produce_list()
for i, val in enumerate(items.load()): # load to iterate
if val > threshold:
chunk = items.chunk(index=i) # chunk to wire
process(chunk)

Related fan-out APIs: `.map()`, `.submit()`, `.product()`, `unmapped()`, `.unpack()`.

## External data

Small local data can be passed in with `ExternalArtifact(value=...)`:

from zenml import ExternalArtifact, pipeline, step
import pandas as pd
@step
def train(data: pd.DataFrame) -> None:
...
@pipeline
def my_pipeline() -> None:
df = pd.read_csv("local_data.csv")
train(data=ExternalArtifact(value=df))

For larger data, save an artifact up front and pass its ID into the pipeline:

# run.py (client-side, runs on your machine)
from zenml.artifacts.utils import save_artifact
import pandas as pd
df = pd.read_csv("local_data.csv")
art = save_artifact(data=df, name="my_dataset")
print(art.id) # Pass this UUID to the pipeline
# pipeline.py (runs inside the orchestrator pod)
from zenml.client import Client
from uuid import UUID
@pipeline
def my_pipeline(dataset_id: str) -> None:
artifact = Client().get_artifact_version(UUID(dataset_id))
train(data=artifact)

Alternatives: `ExternalArtifact(id=...)`, `Client().get_artifact_version()`, or `register_artifact()`.

## YAML configuration

# configs/dev.yaml
enable_cache: false
parameters:
dataset_path: "data/small.csv"
lr: 0.05
steps:
train_model:
settings:
resources:
cpu_count: 2

Apply a config with `training_pipeline.with_options(config_path="configs/dev.yaml")()`. Keep one file per environment (`configs/dev.yaml`, `configs/staging.yaml`, `configs/prod.yaml`) rather than a single `config.yaml`. Generate a template with `zenml pipeline build-configuration my_pipeline > config_template.yaml`. Prefer `with_options()` over `configure()`.

## Docker settings (`DockerSettings`)

from zenml.config import DockerSettings
# Install pip packages
docker = DockerSettings(
requirements=["scikit-learn>=1.0", "pandas>=2.0"],
apt_packages=["libgomp1"],
environment={"PYTHONUNBUFFERED": "1"},
)
@pipeline(settings={"docker": docker})
def my_pipeline() -> None:
...

Per-step override: @step(settings={"docker": DockerSettings(parent_image="pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime", requirements=["transformers"])})

Key `DockerSettings` fields: `requirements`, `required_integrations`, `apt_packages`, `environment`, `runtime_environment`, `parent_image`, `python_package_installer` (`"uv"` or `"pip"`), `prevent_build_reuse`.

## Metadata logging

from zenml import step
from zenml.utils.metadata_utils import log_metadata
@step
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> sklearn.base.BaseEstimator:
model = LogisticRegression().fit(X_train, y_train)
log_metadata({"accuracy": model.score(X_train, y_train), "n_samples": len(X_train)})
return model

Grouped `metadata` example: log_metadata({"model_metrics": {"accuracy": 0.95, "f1": 0.90}, "data_stats": {"n_samples": 5000}})

## Secrets management

# One-time setup (CLI)
zenml secret create db_credentials --host=db.example.com --username=admin --password=secret123

Access secrets inside a step:

from zenml import step
from zenml.client import Client
@step
def load_from_database(query: str) -> pd.DataFrame:
secret = Client().get_secret("db_credentials")
host = secret.secret_values["host"]
username = secret.secret_values["username"]
password = secret.secret_values["password"]
# Use credentials to connect...

## When do you need a custom materializer?

| Situation | Materializer needed? |
|---|---|
| Step returns a built-in type (`int`, `str`, `dict`, ...) | No — built-in |
| Step returns a `pd.DataFrame` / `np.ndarray` | No — built-in |
| Step returns a Pydantic `BaseModel` | No — built-in. Prefer this path for custom types |
| Step returns a `pathlib.Path` | No — built-in |
| Step returns a custom dataclass | Convert to Pydantic |
| Step returns a complex domain object | Yes |
| You want a stable format (JSON/Parquet) instead of cloudpickle | Yes |
from zenml.types import HTMLString, MarkdownString, CSVString
@step
def report(metrics: dict) -> HTMLString:
# For polished HTML, load from a separate template file rather than inline strings
html_path = Path(__file__).parent.parent / "visualizations" / "metrics_report.html"
template = html_path.read_text()
return HTMLString(template.format(**metrics))

Keep templates in `visualizations/` as separate `.html` files.

## Custom materializers

import json, os
from typing import Any, Type
from zenml.enums import ArtifactType
from zenml.materializers.base_materializer import BaseMaterializer
class MyDataMaterializer(BaseMaterializer):
ASSOCIATED_TYPES = (MyData,)
ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
def save(self, data: MyData) -> None:
path = os.path.join(self.uri, "data.json")
with self.artifact_store.open(path, "w") as f: # Always use self.artifact_store.open()
json.dump(data.to_dict(), f)
def load(self, data_type: Type[Any]) -> MyData:
path = os.path.join(self.uri, "data.json")
with self.artifact_store.open(path, "r") as f:
return MyData(**json.load(f))

Always use `self.artifact_store.open()`, never the built-in `open()`, so the materializer works with remote artifact stores. Attach it with `@step(output_materializers=MyDataMaterializer)`.

## Logging: both `print()` and `logging` output are captured

import logging
from zenml import step
logger = logging.getLogger(__name__)
@step
def train_model(data: pd.DataFrame) -> Model:
logger.info(f"Training on {len(data)} samples") # Captured by ZenML
print("Starting training...") # Also captured
...

Logging details: step logs propagate via `contextvars.copy_context()`; raise verbosity with `DockerSettings(environment={"ZENML_LOGGING_VERBOSITY": "DEBUG"})`; disable per-step log capture with `@step(enable_step_logs=False)` or `ZENML_DISABLE_STEP_LOGS_STORAGE=true`.

Retries: `StepRetryConfig(max_retries=3, delay=10, backoff=2)` via `@step(retry=...)` or `@pipeline(retry=...)` (or a `retry:` block in YAML). Hooks: `on_failure`, `on_success`.

Tags and models: `@pipeline(tags=["training", "v1"])`, `ArtifactConfig(tags=["dataset"])`, `Tag(name="experiment-42", cascade=True)`, `@pipeline(model=Model(name="my_model", tags=["classification"]))`; promote model versions through `staging` / `production` stages.

Scheduling and deployment: `Schedule(cron_expression="0 2 * * *")` or `Schedule(interval_second=3600)`, attached with `pipeline.with_options(schedule=schedule)`; deploy with `pipeline.deploy(deployment_name="my_service")`.

## Anti-patterns

| Anti-pattern | Symptom | Fix |
|---|---|---|
| Pass local file path between steps | `FileNotFoundError` on cloud | Return data as artifact, not path |
| Write to `/tmp` between steps | Works locally, fails on K8s | Use artifact outputs |
| Missing type hints on step | Silent failures, no caching | Add type annotations to all inputs/outputs |
| Rely on default `cloudpickle` serialization | Breaks across Python versions | Write a custom materializer |
| | Use |
Missing | | Add to |
| Imports inside pipeline function body | Fails in container if module not available | Import at module level |
| No `zenml init` at project root | Import errors in remote steps | Run `zenml init` |
| Passwords/secrets in CLI args or config | Security risk, visible in logs | Use `zenml secret` + `Client().get_secret()` |
| All steps in one `steps.py` | Hard to maintain, test, review | One file per step in `steps/` |
| No metadata logging in train/eval steps | No metrics visible in dashboard | Add `log_metadata()` calls |
| Single `config.yaml` for all environments | Config drift, manual editing | Separate `dev`/`staging`/`prod` YAMLs |
| Using raw `pip install` for integrations | Version conflicts with ZenML | Use `zenml integration install` |
| Missing or wrong dep file (`requirements.txt`) | Unclear deps, no Python version pin, no dev deps | Use `pyproject.toml` |
| Inline HTML strings in Python | Ugly visualizations, hard to edit | Use separate `.html` template files |
| Defaulting to `pip` | Slower installs and weaker dependency resolution in many environments | Prefer `uv` |
| | User can't review prompts; easier to make mistakes | Prefer |
| Stack registration instructions in README | Users have different stacks; instructions become stale | Just say "assumes a configured ZenML stack" and link to docs |
| Using plain custom classes for step outputs | No built-in materializer, requires custom code | Use Pydantic `BaseModel` |
| No minimum ZenML version in deps | Breaks on older versions missing features | Pin `zenml>=0.93` |
References:
- https://docs.zenml.io/how-to/steps-pipelines/dynamic-pipelines
- https://docs.zenml.io/how-to/infrastructure-deployment/auth-management