From zenml-io-skills
Author ZenML pipelines: @step/@pipeline decorators, type hints, multi-output steps, dynamic vs static pipelines, artifact data flow, ExternalArtifact, YAML configuration, DockerSettings for remote execution, custom materializers, metadata logging, secrets management, and custom visualizations. Use this skill whenever asked to write a ZenML pipeline, create ZenML steps, make a pipeline work on Kubernetes/Vertex/SageMaker, add Docker settings, write a materializer, create a custom visualization, handle "works locally but fails on cloud" issues, or configure pipeline YAML files. Even if the user doesn't explicitly mention "pipeline authoring", use this skill when they ask to build an ML workflow, data pipeline, or training pipeline with ZenML.
npx claudepluginhub joshuarweaver/cascade-ai-ml-engineering --plugin zenml-io-skillsThis skill uses the workspace's default tool permissions.
This skill guides pipeline authoring: steps, artifacts, configuration, Docker settings, materializers, metadata, secrets, and visualizations.
Creates isolated Git worktrees for feature branches with prioritized directory selection, gitignore safety checks, auto project setup for Node/Python/Rust/Go, and baseline verification.
Executes implementation plans in current session by dispatching fresh subagents per independent task, with two-stage reviews: spec compliance then code quality.
Dispatches parallel agents to independently tackle 2+ tasks like separate test failures or subsystems without shared state or dependencies.
This skill guides pipeline authoring: steps, artifacts, configuration, Docker settings, materializers, metadata, secrets, and visualizations.
Do not rush to code. Before writing a single line, thoroughly understand what the user wants to build. The interview is the most important step — a well-scoped pipeline that does 3 things well beats a sprawling one that does 10 things poorly.
For complex or multi-pipeline projects: If the user describes something ambitious (e.g., "build me an end-to-end ML platform with data ingestion, feature engineering, training, evaluation, deployment, monitoring, and retraining"), or if they mention multiple pipelines, invoke the zenml-scoping skill first. It runs a deeper architectural interview that decomposes the system into pipeline units, identifies what doesn't belong in a pipeline at all, and produces a pipeline_architecture.md spec. Once that's done, come back here to build each pipeline one at a time.
For single, focused pipelines: If the user's request is clearly one pipeline (e.g., "build a training pipeline for my CSV data"), proceed with the questions below. If the answers are obvious from context, infer them and proceed. Only ask when genuinely ambiguous.
Q1: Static or dynamic pipeline?
Most pipelines are static (fixed DAG). Use dynamic (@pipeline(dynamic=True)) only when the number of steps or their wiring depends on runtime values (e.g., "process N documents where N comes from a query"). See Dynamic Pipelines and references/dynamic-pipelines.md.
Q2: Local or remote orchestrator? If remote (Kubernetes, Vertex AI, SageMaker, AzureML), the Artifact Golden Rule is critical, and you will need Docker Settings. If local-only for now, you can defer those concerns. Ask whether the user already has a stack set up — if not, point them to the ZenML docs for stack setup (this skill does not cover stack creation).
Q3: Any custom Python types?
If steps produce or consume types beyond builtins, pandas, numpy, or Pydantic models, you likely need a custom materializer. Note: Pydantic BaseModel subclasses have a built-in materializer — often the simplest alternative to writing a custom materializer.
Q4: Where should the project live? Ask the user where to create the project — a new subfolder, or the current directory. If the current directory is not empty, suggest a new subfolder.
Q5: What are the data sources? Understand where data comes from: local CSV/Parquet files, a database (Snowflake, PostgreSQL), an API, cloud storage? This determines the first step's implementation and whether secrets are needed. If credentials are involved, always use ZenML Secrets — never pass passwords as CLI arguments or in config files.
Q6: Does the user want a small-data development mode?
Many users want to iterate quickly with a fraction of the dataset. Plan for a --sample-size or --small CLI flag in run.py.
A step is a Python function decorated with @step. Type hints on inputs and outputs are required — they control serialization, caching, and dashboard display.
from zenml import step
@step
def train_model(X_train: pd.DataFrame, lr: float = 0.01) -> sklearn.base.BaseEstimator:
"""lr is a parameter (literal value); X_train is an artifact (from upstream step)."""
model = LogisticRegression(C=1/lr).fit(X_train.drop("target", axis=1), X_train["target"])
return model
Parameters vs artifacts: If a step input comes from another step's output, it is an artifact. If it is a literal value passed directly (JSON-serializable), it is a parameter. ZenML handles them differently.
Use Annotated to give outputs stable names. Use Tuple for multiple outputs:
from typing import Annotated, Tuple
from zenml import step
import pandas as pd
@step
def split_data(df: pd.DataFrame, ratio: float = 0.8) -> Tuple[
Annotated[pd.DataFrame, "train"],
Annotated[pd.DataFrame, "test"],
]:
idx = int(len(df) * ratio)
return df.iloc[:idx], df.iloc[idx:]
from zenml import pipeline
@pipeline
def training_pipeline(dataset_path: str = "data.csv", lr: float = 0.01) -> None:
df = load_data(path=dataset_path)
train, test = split_data(df=df)
model = train_model(X_train=train, lr=lr)
evaluate(model=model, X_test=test)
if __name__ == "__main__":
training_pipeline()
Pipeline parameters (like dataset_path) can be overridden at runtime or via YAML config.
When you call a step multiple times in one pipeline, ZenML auto-suffixes the name (scale, scale_2). Override with my_step(id="custom_id").
Every pipeline project MUST follow this layout. This is non-negotiable — it produces clean, maintainable projects:
my_pipeline_project/
├── steps/ # One file per step
│ ├── load_data.py
│ ├── preprocess.py
│ ├── train_model.py
│ └── evaluate.py
├── pipelines/
│ └── training.py # Pipeline definition(s)
├── materializers/ # Custom materializers (if any)
│ └── my_data_materializer.py
├── visualizations/ # HTML/CSS templates for dashboard visualizations
│ └── metrics_report.html
├── configs/ # One YAML config per environment
│ ├── dev.yaml
│ ├── staging.yaml
│ └── prod.yaml
├── run.py # CLI entry point (argparse, not click)
├── README.md # How to run, what stacks to use, etc.
└── pyproject.toml # Dependencies — always pyproject.toml, not requirements.txt
Key rules:
steps/ directory — not all steps in one steps.py.pipelines/, execution in run.py.README.md (not summary.md) explaining how to run the pipeline, what stacks it supports, and any setup needed. Link to the relevant ZenML docs pages (e.g., dynamic pipelines docs) rather than embedding lengthy explanations. Do NOT include stack registration or setup instructions — just say "assumes you have a ZenML stack configured" and link to https://docs.zenml.io for stack setup.pyproject.toml for dependency declarations. Do NOT create requirements.txt alongside it — use one or the other, and pyproject.toml is the right choice.run.py uses argparse (not click) — click can conflict with ZenML's own click dependency.zenml init at the project root to set the source root explicitly — this prevents import failures when code runs inside containers.Always use this as the starting point for pyproject.toml:
[project]
name = "my-pipeline-project"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"zenml>=0.93",
"pandas>=2.0",
# Add pipeline-specific dependencies here
]
[project.optional-dependencies]
dev = [
"pytest",
"ruff",
"mypy",
]
Key version constraints:
uv in README instructions: uv pip install -e ".[dev]" for faster and more reliable resolution. If uv is unavailable in the user's environment, use pip install -e ".[dev]".The README should include:
uv pip install -e ".[dev]" and zenml integration install <name> --uv when supported. If uv is unavailable, use pip equivalents. Omit -y so users can review prompts.python run.py --config configs/dev.yamlload_data --> preprocess --> train_model --> evaluate
run.py CLI templateEvery run.py should offer these flags:
import argparse
from pipelines.training import training_pipeline
def main():
parser = argparse.ArgumentParser(description="Run the training pipeline")
parser.add_argument("--config", default="configs/dev.yaml", help="Path to YAML config")
parser.add_argument("--no-cache", action="store_true", help="Disable caching")
parser.add_argument("--sample-size", type=int, default=None,
help="Use only N rows (for quick local iteration)")
args = parser.parse_args()
pipeline_instance = training_pipeline.with_options(
config_path=args.config,
enable_cache=not args.no_cache,
)
pipeline_instance(sample_size=args.sample_size)
if __name__ == "__main__":
main()
The sample_size parameter is passed as a pipeline parameter so the data-loading step can slice the dataset.
Data must enter and move through the pipeline as artifacts, not as local file paths.
This is the single most important concept for cloud portability. When running on a remote orchestrator, each step runs in a separate container on a separate machine. There is no shared filesystem between steps.
# ANTI-PATTERN: works locally, fails on cloud
@step
def preprocess(input_path: str) -> str:
df = pd.read_csv(input_path) # Reads from local disk
output_path = "/tmp/processed.csv"
df.to_csv(output_path)
return output_path # Next step can't access /tmp on a different pod
@step
def train(data_path: str) -> None:
df = pd.read_csv(data_path) # FileNotFoundError on cloud!
# CORRECT: data flows as artifacts
@step
def preprocess(input_path: str) -> pd.DataFrame:
return pd.read_csv(input_path) # ZenML serializes the DataFrame to the artifact store
@step
def train(data: pd.DataFrame) -> None:
... # ZenML loads it from the artifact store — works everywhere
The first step in a pipeline is typically the one that bridges external data into the artifact world. All downstream steps receive artifacts, never file paths.
Use dynamic pipelines when the DAG shape depends on runtime values. They are experimental and have restricted orchestrator support (Local, LocalDocker, Kubernetes, Vertex, SageMaker, AzureML). Always link to the dynamic pipelines documentation (https://docs.zenml.io/how-to/steps-pipelines/dynamic-pipelines) in the README since these APIs can be tricky to get right.
from zenml import pipeline, step
@step
def get_count() -> int:
return 3
@step
def process(index: int) -> None:
print(f"Processing {index}")
@pipeline(dynamic=True)
def my_dynamic_pipeline() -> None:
count = get_count()
count_data = count.load() # .load() gets actual Python value
for idx in range(count_data):
process(index=idx)
.load() vs .chunk()| Method | Returns | Use for |
|---|---|---|
.load() | Actual Python data | Decisions, control flow, iteration |
.chunk(index=i) | A DAG edge reference | Wiring to downstream steps |
You typically need both: .load() to iterate/decide, .chunk() to wire the DAG:
items = produce_list()
for i, val in enumerate(items.load()): # load to iterate
if val > threshold:
chunk = items.chunk(index=i) # chunk to wire
process(chunk)
.map() and parallel execution with .submit()For map/reduce patterns, .map() fans out over a collection. For explicit parallelism, .submit() returns a future.
See references/dynamic-pipelines.md for the complete API: .map(), .product(), .submit(), unmapped(), .unpack(), runtime modes, orchestrator support table, and limitations.
When data originates outside the pipeline (a local file, a database, an API), you need to bridge it into the artifact system.
ExternalArtifact(value=...)Upload data inline when defining the pipeline. Simple but disables caching for the consuming step:
from zenml import ExternalArtifact, pipeline, step
import pandas as pd
@step
def train(data: pd.DataFrame) -> None:
...
@pipeline
def my_pipeline() -> None:
df = pd.read_csv("local_data.csv")
train(data=ExternalArtifact(value=df))
For dynamic pipelines on remote orchestrators, the pipeline function runs inside the orchestrator pod — it cannot read your local filesystem. Pre-upload the data, then reference it by UUID:
# run.py (client-side, runs on your machine)
from zenml.artifacts.utils import save_artifact
import pandas as pd
df = pd.read_csv("local_data.csv")
art = save_artifact(data=df, name="my_dataset")
print(art.id) # Pass this UUID to the pipeline
# pipeline.py (runs inside the orchestrator pod)
from zenml.client import Client
from uuid import UUID
@pipeline
def my_pipeline(dataset_id: str) -> None:
artifact = Client().get_artifact_version(UUID(dataset_id))
train(data=artifact)
Important: ExternalArtifact(id=...) is rejected by the current validator. Use Client().get_artifact_version() instead.
See references/external-data.md for additional patterns including register_artifact().
Separate environment-specific settings from pipeline code using YAML config files.
# configs/dev.yaml
enable_cache: false
parameters:
dataset_path: "data/small.csv"
lr: 0.05
steps:
train_model:
settings:
resources:
cpu_count: 2
training_pipeline.with_options(config_path="configs/dev.yaml")()
Configuration precedence (highest to lowest): Runtime Python code > Step-level YAML > Pipeline-level YAML > Defaults.
Always use separate config files per environment (configs/dev.yaml, configs/staging.yaml, configs/prod.yaml) — never a single config.yaml. Generate a template with zenml pipeline build-configuration my_pipeline > config_template.yaml.
Prefer with_options() (returns a copy) over configure() (mutates in place).
See references/yaml-config.md for the complete YAML schema and multi-env pattern.
When running on remote orchestrators, ZenML builds Docker images for each step. Use DockerSettings to control what goes into those images.
from zenml.config import DockerSettings
# Install pip packages
docker = DockerSettings(
requirements=["scikit-learn>=1.0", "pandas>=2.0"],
apt_packages=["libgomp1"],
environment={"PYTHONUNBUFFERED": "1"},
)
@pipeline(settings={"docker": docker})
def my_pipeline() -> None:
...
Apply different settings per step with @step(settings={"docker": DockerSettings(parent_image="pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime", requirements=["transformers"])}).
Key fields: requirements (pip packages), required_integrations (ZenML integrations), apt_packages (system packages), environment (build-time env vars), runtime_environment (runtime env vars), parent_image (custom base image), python_package_installer ("uv" default or "pip"), prevent_build_reuse (force fresh builds for debugging).
See references/docker-settings.md for the full field catalog and YAML equivalents.
Log metadata from within steps to track metrics, parameters, and results in the ZenML dashboard. This is essential for production pipelines — every training step, evaluation step, and data quality step should log relevant metadata.
from zenml import step
from zenml.utils.metadata_utils import log_metadata
@step
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> sklearn.base.BaseEstimator:
model = LogisticRegression().fit(X_train, y_train)
log_metadata({"accuracy": model.score(X_train, y_train), "n_samples": len(X_train)})
return model
When called with only metadata inside a step, it automatically attaches to the current step.
Use nested dicts — each top-level key becomes its own card: log_metadata({"model_metrics": {"accuracy": 0.95, "f1": 0.90}, "data_stats": {"n_samples": 5000}}).
Log metadata in every data loading step (row count, source), preprocessing step (rows removed), training step (all metrics, hyperparams), and evaluation step (test metrics, thresholds).
When a pipeline needs credentials (database passwords, API keys, cloud tokens), never pass them as CLI arguments, config file values, or environment variables in code. Use the ZenML secret store:
# One-time setup (CLI)
zenml secret create db_credentials --host=db.example.com --username=admin --password=secret123
from zenml import step
from zenml.client import Client
@step
def load_from_database(query: str) -> pd.DataFrame:
secret = Client().get_secret("db_credentials")
host = secret.secret_values["host"]
username = secret.secret_values["username"]
password = secret.secret_values["password"]
# Use credentials to connect...
This works on any orchestrator — the secret store is centralized in the ZenML server.
| Situation | Materializer needed? |
|---|---|
Step returns int, str, float, bool, dict, list | No — built-in |
Step returns pd.DataFrame, np.ndarray | No — built-in |
Step returns a Pydantic BaseModel | No — built-in. Prefer this path for custom types |
Step returns a pathlib.Path (file or directory) | No — built-in PathMaterializer handles it. Archives directories as .tar.gz, copies single files. Great for model checkpoints, data folders, or any file-based artifact |
| Step returns a custom dataclass | Convert to Pydantic BaseModel — it has a built-in materializer, validation, and JSON serialization. Only write a custom materializer if you specifically need a non-JSON format |
| Step returns a complex domain object | Yes |
| You want a stable format (JSON/Parquet) instead of cloudpickle | Yes |
Return special string types for instant dashboard visualizations — no materializer needed:
from zenml.types import HTMLString, MarkdownString, CSVString
@step
def report(metrics: dict) -> HTMLString:
# For polished HTML, load from a separate template file rather than inline strings
html_path = Path(__file__).parent.parent / "visualizations" / "metrics_report.html"
template = html_path.read_text()
return HTMLString(template.format(**metrics))
Visualization quality: Keep HTML templates in a visualizations/ directory as separate .html files (optionally with embedded CSS). This makes them easy to edit and preview in a browser. Avoid writing raw HTML strings inside Python — it produces ugly, hard-to-maintain visualizations. Include proper CSS styling for a polished dashboard appearance.
import json, os
from typing import Any, Type
from zenml.enums import ArtifactType
from zenml.materializers.base_materializer import BaseMaterializer
class MyDataMaterializer(BaseMaterializer):
ASSOCIATED_TYPES = (MyData,)
ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
def save(self, data: MyData) -> None:
path = os.path.join(self.uri, "data.json")
with self.artifact_store.open(path, "w") as f: # Always use self.artifact_store.open()
json.dump(data.to_dict(), f)
def load(self, data_type: Type[Any]) -> MyData:
path = os.path.join(self.uri, "data.json")
with self.artifact_store.open(path, "r") as f:
return MyData(**json.load(f))
Use self.artifact_store.open() (not plain open()) so it works with S3, GCS, and Azure Blob — not just local filesystems.
Assign it: @step(output_materializers=MyDataMaterializer).
See references/materializers.md for visualizations, metadata extraction, and registration options.
ZenML automatically captures print() and standard logging module output from within steps — no special setup needed. Logs are stored in the artifact store and visible in the dashboard.
import logging
from zenml import step
logger = logging.getLogger(__name__)
@step
def train_model(data: pd.DataFrame) -> Model:
logger.info(f"Training on {len(data)} samples") # Captured by ZenML
print("Starting training...") # Also captured
...
Key points:
contextvars.copy_context() to propagate ZenML's logging context.DockerSettings(environment={"ZENML_LOGGING_VERBOSITY": "DEBUG"}).@step(enable_step_logs=False) or ZENML_DISABLE_STEP_LOGS_STORAGE=true.For flaky external services or transient cloud errors, use StepRetryConfig(max_retries=3, delay=10, backoff=2) on @step(retry=...) or @pipeline(retry=...). YAML equivalent uses retry: at step or top level. See ZenML docs for hooks (on_failure, on_success).
After the pipeline is scaffolded and working, offer these enhancements. Each is optional — ask the user which they want before adding them.
Always add tags to pipelines and key artifacts — they make filtering and organization much easier in the dashboard. Use @pipeline(tags=["training", "v1"]) and ArtifactConfig(tags=["dataset"]) on step outputs. Cascade tags (Tag(name="experiment-42", cascade=True)) automatically propagate to all artifact versions created during the run.
Track the pipeline's artifacts under a named model for versioning, promotion, and cross-pipeline artifact sharing. Use @pipeline(model=Model(name="my_model", tags=["classification"])). This enables model promotion (staging → production) and artifact exchange between training and inference pipelines. See references/post-creation.md for full patterns.
Run the pipeline on a recurring schedule. Use Schedule(cron_expression="0 2 * * *") or Schedule(interval_second=3600) with pipeline.with_options(schedule=schedule). Not all orchestrators support scheduling — Kubernetes, Vertex AI, SageMaker, AzureML, Airflow, and Kubeflow do; Local and SkyPilot do not. See references/post-creation.md for the orchestrator support table and management commands.
For real-time inference or agent workflows, pipelines can be deployed as persistent HTTP services using pipeline.deploy(deployment_name="my_service"). This replaces the deprecated model deployer components. For advanced deployment patterns and production configuration, see the deployment docs.
See references/post-creation.md for detailed patterns for all of the above.
| Anti-pattern | Symptom | Fix |
|---|---|---|
| Pass local file path between steps | FileNotFoundError on cloud | Return data as artifact, not path |
Write to /tmp and read in next step | Works locally, fails on K8s | Use artifact outputs |
| Missing type hints on step | Silent failures, no caching | Add type annotations to all inputs/outputs |
cloudpickle for production artifacts | Breaks across Python versions | Write a custom BaseMaterializer |
ExternalArtifact(id=...) | ValueError from validator | Use Client().get_artifact_version() |
Missing DockerSettings deps | ModuleNotFoundError in container | Add to requirements or required_integrations |
| Imports inside pipeline function body | Fails in container if module not available | Import at module level |
No zenml init at project root | Import errors in remote steps | Run zenml init to set source root |
| Passwords/secrets in CLI args or config | Security risk, visible in logs | Use Client().get_secret() from ZenML secret store |
All steps in one steps.py | Hard to maintain, test, review | One file per step in steps/ directory |
| No metadata logging in train/eval steps | No metrics visible in dashboard | Add log_metadata() calls |
Single config.yaml for all environments | Config drift, manual editing | Separate configs/dev.yaml, configs/prod.yaml |
Using click for run.py CLI | Version conflicts with ZenML | Use argparse instead |
Missing or wrong dep file (requirements.txt instead of / alongside pyproject.toml) | Unclear deps, no Python version pin, no dev deps | Use pyproject.toml only with zenml>=0.93, requires-python>=3.12, and [project.optional-dependencies] for dev tools |
| Inline HTML strings in Python | Ugly visualizations, hard to edit | Use separate .html template files |
Defaulting to pip install when uv is available | Slower installs and weaker dependency resolution in many environments | Prefer uv pip install in docs/README; fall back to pip when uv is unavailable |
zenml integration install X -y | User can't review prompts; easier to make mistakes | Prefer zenml integration install X --uv (no -y) when available; otherwise run without -y using default installer |
| Stack registration instructions in README | Users have different stacks; instructions become stale | Just say "assumes a configured ZenML stack" and link to docs |
Using dataclass for step outputs | No built-in materializer, requires custom code | Use Pydantic BaseModel — has built-in materializer |
| No minimum ZenML version in deps | Breaks on older versions missing features | Pin zenml>=0.93 (or >=0.91 minimum for dynamic) |
For topics not covered here (stack setup, experiment tracking, advanced deployment configuration), query the ZenML docs at https://docs.zenml.io.
When linking to docs in generated READMEs, link to specific pages rather than the generic homepage. Common links to include:
https://docs.zenml.io/how-to/steps-pipelines/dynamic-pipelineshttps://docs.zenml.io/how-to/infrastructure-deployment/auth-management