Migrate Apache Airflow DAGs, operators, and workflows to idiomatic ZenML pipelines. Handles concept mapping (DAG→pipeline, operator→step, XCom→artifact), code translation, scheduling, retry config, Docker settings, and flags unsupported patterns (trigger rules, sensors, dynamic task mapping) for human review. Use this skill whenever the user mentions Airflow migration, converting Airflow DAGs, porting workflows from Airflow, replacing Airflow with ZenML, or asks how an Airflow concept maps to ZenML — even if they don't explicitly say "migrate". Also use when they paste Airflow code and ask to make it work with ZenML, or when they describe a workflow using Airflow terminology (DAG, operator, XCom, sensor, task group) in a ZenML context. If the user just asks a quick conceptual question ("what's the ZenML equivalent of XCom?"), answer it directly from the concept map — no need to run the full migration workflow.
Install: `npx claudepluginhub joshuarweaver/cascade-ai-ml-engineering --plugin zenml-io-skills`

This skill uses the workspace's default tool permissions.
This skill translates Apache Airflow DAGs into idiomatic ZenML pipelines. It handles the full migration workflow: analyzing Airflow code, classifying each pattern, translating what maps cleanly, flagging what needs redesign, and producing a working ZenML project.
Airflow and ZenML look similar on the surface — DAG maps to pipeline, operator maps to step, XCom maps to artifact — but their execution models are fundamentally different. Airflow is built around a scheduler-backed, database-persisted task-instance state machine. ZenML is built around artifact lineage, stack-driven infrastructure abstraction, and Python-first pipeline composition.
This means migration is not a rename-the-primitives exercise. Some patterns translate directly, some need approximation, and some require genuine redesign. The skill's job is to be honest about which is which.
Every Airflow concept falls into one of these categories:
| Type | Meaning | Action |
|---|---|---|
| Direct | Clean 1:1 mapping exists | Translate automatically |
| Approximate | Conceptual equivalent exists but semantics differ | Translate with caveats noted in migration report |
| Absent | No ZenML equivalent | Flag for human review with redesign suggestions |
See references/concept-map.md for the full mapping tables.
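To make the three-way classification concrete, it can be pictured as a small lookup table. The entries below are a hypothetical excerpt drawn from the mappings in this document, not the actual contents of references/concept-map.md:

```python
# Hypothetical excerpt of the concept map: Airflow concept ->
# (ZenML equivalent, mapping type). The real tables live in
# references/concept-map.md.
CONCEPT_MAP = {
    "PythonOperator": ("@step", "direct"),
    "retries/retry_delay": ("StepRetryConfig", "direct"),
    "BashOperator": ("@step + subprocess.run()", "approximate"),
    "schedule_interval": ("Schedule", "approximate"),
    "trigger_rule != all_success": (None, "absent"),
    "sensor (reschedule/deferrable)": (None, "absent"),
}


def classify(concept: str) -> str:
    """Return 'direct', 'approximate', or 'absent' for an Airflow concept."""
    _equivalent, mapping_type = CONCEPT_MAP.get(concept, (None, "absent"))
    return mapping_type
```

Anything not in the map defaults to "absent", which errs on the side of flagging for human review rather than silently mistranslating.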
Ask the user for their Airflow DAG file(s). Read the code thoroughly before doing anything else. For each DAG, identify:
- Operator types in use (`PythonOperator`, `BashOperator`, `KubernetesPodOperator`, sensors, custom operators, TaskFlow `@task`)
- Dependency wiring (`>>`, `set_upstream`, TaskFlow data passing)
- Branching (`BranchPythonOperator`), short-circuiting, or trigger rules beyond `all_success`?
- `expand()` / dynamic task mapping?
- Scheduling: `schedule_interval`, cron presets, timetables, catchup settings?

For each component identified in Phase 1, classify it using the mapping type (direct / approximate / absent). Use the decision logic below and the full tables in references/concept-map.md.
Direct translations (translate automatically):
- `PythonOperator` / `@task` → `@step`
- `retries` / `retry_delay` → `StepRetryConfig`
- `on_success_callback` / `on_failure_callback` → step hooks (`on_success`, `on_failure`)

Approximate translations (translate with caveats):
- `BashOperator` → `@step` with `subprocess.run()` (containerization differs on remote stacks)
- `BranchPythonOperator` → conditional pipeline logic, but only if branching depends on pipeline parameters, not upstream step outputs
- `params` / `dag_run.conf` → pipeline parameters / run configuration
- `schedule_interval` / cron presets → `Schedule` object (depends on orchestrator support)
- `retry_exponential_backoff` → `StepRetryConfig(backoff=2)` (boolean → numeric factor)
- `KubernetesPodOperator` → `@step(step_operator="kubernetes")` (ZenML containerization, not arbitrary container commands)

Absent / needs redesign (flag for human review):
- Trigger rules (`all_done`, `one_failed`, `none_skipped`, etc.)
- Dynamic task mapping (`expand()` over runtime data)
- Sensors in `reschedule` mode or deferrable operators
- SLAs (`sla`, `sla_miss_callback`)

Before writing any code, present a summary to the user:
"Here's what I found in your Airflow DAG:
- Direct translations (will migrate cleanly): [list]
- Approximate translations (will work but with noted caveats): [list]
- Needs redesign (cannot auto-migrate): [list with brief explanation]
Shall I proceed with the migration?"
If there are HIGH-severity flags, explain each one concretely: what the Airflow code does, why ZenML can't replicate it directly, and what the recommended redesign looks like.
Translate the Airflow DAG into a ZenML project. Follow these conventions strictly.
Every migrated project MUST use this layout:
migrated_pipeline/
├── steps/ # One file per step
│ ├── extract.py
│ ├── transform.py
│ └── load.py
├── pipelines/
│ └── my_pipeline.py # Pipeline definition
├── materializers/ # Custom materializers (if needed)
├── configs/
│ ├── dev.yaml
│ └── prod.yaml
├── run.py # CLI entry point (argparse, not click)
├── README.md
└── pyproject.toml
This matches the zenml-pipeline-authoring skill's conventions. Key rules:
- One file per step in `steps/`
- `run.py` uses argparse (click conflicts with ZenML)
- `pyproject.toml` with `zenml>=0.94.1` and `requires-python = ">=3.12"`
- Run `zenml init` at project root

For each Airflow task, apply the appropriate translation. See references/code-patterns.md for detailed side-by-side examples covering all major patterns.
The core translation rule: Move the task's callable body into a @step function. Type-hint all inputs and outputs. Wire steps by passing outputs to inputs in the pipeline function.
```python
# Airflow
from airflow.operators.python import PythonOperator

def extract() -> list[int]:
    return [1, 2, 3]

t = PythonOperator(task_id="extract", python_callable=extract)
```

```python
# ZenML
from zenml import step

@step
def extract() -> list[int]:
    return [1, 2, 3]
```
XCom → Artifact passing: Replace all ti.xcom_pull() / ti.xcom_push() with direct function-call wiring:
```python
# Airflow: explicit XCom pull via templating
sum_ = PythonOperator(
    task_id="sum",
    python_callable=sum_numbers,
    op_kwargs={"numbers": "{{ ti.xcom_pull(task_ids='extract') }}"},
)
```

```python
# ZenML: data flows naturally through function calls
@pipeline
def my_pipeline() -> None:
    numbers = extract()
    total = sum_numbers(numbers)  # Artifact passed directly
```
Retries: Map retries + retry_delay + retry_exponential_backoff to StepRetryConfig:
```python
# Airflow
from datetime import timedelta

default_args = {"retries": 3, "retry_delay": timedelta(seconds=10), "retry_exponential_backoff": True}
```

```python
# ZenML
from zenml import step
from zenml.config.retry_config import StepRetryConfig

@step(retry=StepRetryConfig(max_retries=3, delay=10, backoff=2))
def my_step() -> None: ...
```
Callbacks → Hooks: Map on_failure_callback / on_success_callback to ZenML hooks. For chat notifications, use ZenML's standard alerter hooks:
```python
from zenml import step
from zenml.hooks import alerter_failure_hook, alerter_success_hook

@step(on_failure=alerter_failure_hook, on_success=alerter_success_hook)
def my_step() -> None: ...
```
Scheduling: Map schedule_interval or cron presets to Schedule:
```python
from zenml.config.schedule import Schedule

schedule = Schedule(cron_expression="0 2 * * *")  # Was schedule="0 2 * * *" (use "0 0 * * *" for "@daily")
my_pipeline.with_options(schedule=schedule)()
```
Not all orchestrators support scheduling. Check references/concept-map.md for the orchestrator support table.
When translating approximate patterns, always add a comment in the generated code explaining the semantic difference. This helps the user understand what changed and why.
```python
import subprocess

from zenml import step

@step
def run_shell_command(cmd: str) -> str:
    # Migration note: Airflow's BashOperator ran in the Airflow worker environment.
    # This step runs inside a container on the active orchestrator. Working directory
    # and available system tools may differ. Verify the command works in your target
    # stack's container environment.
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
    return result.stdout
```
For patterns that have no ZenML equivalent, do NOT silently approximate them. Instead:
Add a `# TODO(migration)` comment in the generated code:

```python
# TODO(migration): UNSUPPORTED — Airflow trigger rule 'all_done' on this step.
# ZenML does not support trigger rules. This step previously ran regardless of
# upstream success/failure. Consider: (a) splitting into separate pipelines with
# independent failure domains, or (b) wrapping upstream steps in try/except and
# using a status artifact to communicate outcome.
@step
def join_step(upstream_status: str) -> None:
    ...
```
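The status-artifact redesign of option (b) can be sketched as follows. The functions are shown undecorated to keep the sketch self-contained; in a generated project they would be `@step` functions wired inside a `@pipeline`, and `fragile_work`'s body is a stand-in for the real task:

```python
# Option (b) sketch: swallow upstream failures inside the step and emit a
# status artifact so the downstream join step always runs, approximating
# Airflow's trigger_rule='all_done' without trigger-rule support.

def fragile_work(fail: bool = False) -> str:
    try:
        if fail:  # stand-in for the real, possibly-failing task body
            raise RuntimeError("upstream task failed")
        return "success"
    except Exception as exc:
        # The failure is recorded rather than raised, so downstream still runs.
        return f"failed: {exc}"


def join_step(upstream_status: str) -> str:
    # Previously ran under trigger_rule='all_done'; now it always receives
    # an explicit status artifact instead of relying on trigger rules.
    return f"upstream finished with status: {upstream_status}"
```

The trade-off: failures no longer fail the pipeline run, so the status artifact must be checked explicitly wherever the outcome matters.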
After generating the ZenML project, produce a MIGRATION_REPORT.md in the project root. This is the user's map of everything that changed, approximated, or needs attention.
# Migration Report: [DAG Name] → [Pipeline Name]
## Summary
- **Source**: Airflow DAG `[dag_id]`
- **Target**: ZenML pipeline `[pipeline_name]`
- **Tasks migrated**: X direct, Y approximate, Z flagged
## Direct Translations
| Airflow Task | ZenML Step | Notes |
|---|---|---|
| extract (PythonOperator) | steps/extract.py | Clean translation |
## Approximate Translations
| Airflow Task | ZenML Step | What Changed |
|---|---|---|
| run_cmd (BashOperator) | steps/run_cmd.py | Now runs in container; verify command works in target environment |
## Flagged for Review
| Airflow Pattern | Severity | Issue | Suggested Redesign |
|---|---|---|---|
| trigger_rule='all_done' on join_step | HIGH | No ZenML equivalent | Split into independent pipelines or use status artifacts |
## Scheduling
- **Original**: `schedule='@daily'`, catchup=False
- **Migrated**: `Schedule(cron_expression='0 0 * * *')` — requires orchestrator with scheduling support
## What's NOT Migrated
[List any Airflow infrastructure that lives outside the DAG: connections, variables, pools, etc., with guidance on the ZenML equivalent]
## What You Get for Free After Migration
ZenML provides capabilities that Airflow does not have natively:
- **Artifact versioning and lineage** — every step output is versioned and traceable
- **Step caching** — skip re-execution when code and inputs haven't changed
- **Stack abstraction** — same pipeline code runs on local, K8s, Vertex, SageMaker by switching stacks
- **Model Control Plane** — track ML models with versioning and promotion stages
- **Service connectors** — unified cloud auth with automatic token refresh
## Recommended Next Steps
1. Run the `zenml-quick-wins` skill for metadata logging, experiment tracking, and alerters
2. Install the ZenML docs MCP server: `claude mcp add zenmldocs --transport http https://docs.zenml.io/~gitbook/mcp`
3. [Specific links to docs for each flagged pattern]
4. For Docker settings, YAML config, or deployment: use the `zenml-pipeline-authoring` skill
After migration is complete, always include a "Recommended Next Steps" section in the migration report AND communicate it to the user. This section should cover three things:
**The `zenml-quick-wins` skill.** Always suggest this as the immediate next step. The quick-wins skill adds production-readiness features that complement the migration: metadata logging, experiment tracking, alerter setup, secrets management, and Model Control Plane configuration. Tell the user:
"Now that the migration is done, I'd recommend running the `zenml-quick-wins` skill to add metadata logging, experiment tracking, and other production features to your pipeline."
**Documentation links for flagged patterns.** For every flagged pattern in the migration report, include a link to the relevant ZenML documentation page. Don't just say "set up a trigger" — link to the specific docs page. Common links to include:
- Scheduling a pipeline: https://docs.zenml.io/how-to/steps-pipelines/schedule-a-pipeline
- Auth management: https://docs.zenml.io/how-to/infrastructure-deployment/auth-management
- Dynamic pipelines: https://docs.zenml.io/how-to/steps-pipelines/dynamic-pipelines
- Orchestrators: https://docs.zenml.io/stacks/stack-components/orchestrators
- Triggering a pipeline: https://docs.zenml.io/how-to/steps-pipelines/trigger-a-pipeline

**The ZenML docs MCP server.** ZenML has a documentation MCP server that provides real-time lookups from the docs. This is especially valuable post-migration when the user needs to look up ZenML-specific patterns. Suggest:
"For easier access to ZenML documentation while you work, you can install the ZenML docs MCP server:
`claude mcp add zenmldocs --transport http https://docs.zenml.io/~gitbook/mcp`"
When the migration has HIGH-severity flags — patterns that couldn't be directly migrated — offer to help the user get support from the ZenML community. This is important because the ZenML engineering team often has workarounds, opinions, or may even build features to support missing patterns.
When there are 2+ HIGH-severity flags, generate a pre-made Slack message that the user can post to the ZenML community Slack (zenml.io/slack). The message should include the Airflow patterns involved, each flagged issue with a brief code snippet, and the workarounds the skill attempted.
Format it as a fenced code block the user can copy-paste:
```
**Airflow → ZenML Migration Help**

I'm migrating an Airflow DAG that uses [patterns]. The migration skill flagged these as needing redesign:

1. **[Pattern]**: [brief description + code snippet]
2. **[Pattern]**: [brief description + code snippet]

The suggested workarounds are [X], but I'm wondering if there's a better approach or upcoming feature that could help.
```
Optionally, offer to create an unlisted GitHub gist (gh gist create --public=false) containing the original Airflow code and the migration report, so the community has full context.
When the migration reveals a genuine missing feature in ZenML (not just a "this works differently" situation, but a real capability gap that multiple users would benefit from), offer to open a GitHub issue on zenml-io/zenml using gh issue create. Include the Airflow pattern, the attempted workaround, and why the feature would be valuable.
**`/simplify` to clean up the migrated code.** After migration is complete, always suggest running the `/simplify` skill on the generated code. Migration often produces verbose comments, redundant patterns, and opportunities for consolidation. `/simplify` will review the code for reuse opportunities, quality issues, and efficiency improvements.
"The migration is done. I'd recommend running `/simplify` on the generated code to clean up migration comments, reduce duplication, and ensure the code follows ZenML best practices."
**`zenml-pipeline-authoring`.** The `zenml-pipeline-authoring` skill handles deeper customization: Docker settings, YAML run configuration, and deployment.
These are the most common sources of confusion after migration. Always mention the relevant ones in the migration report.
Airflow XCom is lightweight message passing (often DB-backed, small values). ZenML artifacts are first-class persisted objects stored in the artifact store. This changes how data moves between steps: every value is serialized to and loaded from the artifact store, outputs are versioned rather than transient, and large or non-standard types may need custom materializers.
Airflow tasks run in worker processes managed by a central scheduler. ZenML steps run in containers managed by the orchestrator (Kubernetes pods, Vertex AI jobs, etc.). This means there is no Airflow runtime context inside a step (`context["ti"]`, `context["dag_run"]`, etc.), and each step's environment is defined by its container image rather than the worker host.

Airflow owns scheduling natively (the scheduler is a core component). ZenML delegates scheduling to the orchestrator — not all orchestrators support it, and the scheduling semantics depend on the orchestrator. Always check orchestrator compatibility.
| Anti-pattern | Why it's wrong | What to do instead |
|---|---|---|
| Keeping `ti.xcom_pull()` calls | ZenML has no task instance context | Wire data through step inputs/outputs |
| Passing file paths between steps | Works locally, breaks on remote orchestrators | Pass data as artifacts (DataFrames, dicts, etc.) |
| Translating `BranchPythonOperator` that branches on task outputs | ZenML can't branch on artifact values at graph construction | Redesign: run all branches but no-op when condition is false, or split into separate pipelines |
| Mapping `expand()` over upstream output to a simple loop | Loses Airflow's task-level retry/observability per item | Use ZenML dynamic pipelines (if cardinality known) or multi-run pattern (if runtime-determined) |
| Ignoring trigger rules during migration | Silently changes pipeline behavior | Always flag non-default trigger rules; never drop them without user awareness |
| Translating sensors to `time.sleep()` loops | Consumes a compute slot for the entire wait | Consider orchestrator scheduling, external triggers, or polling steps with timeouts |
| Replicating Airflow's `params` override behavior | `dag_run.conf` override semantics are Airflow-specific | Use ZenML pipeline parameters with explicit precedence (YAML config > defaults) |
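The branching redesign from the table can be sketched as below. Function names are illustrative and shown undecorated; in a generated project they would be `@step` functions, with the boolean passed as a pipeline parameter:

```python
# BranchPythonOperator redesign sketch. ZenML cannot branch the graph on an
# artifact value, so either (a) branch on a pipeline parameter known at
# graph-construction time, or (b) run every branch and no-op when disabled.

def heavy_branch(data: list[int], enabled: bool) -> list[int]:
    # Option (b): the step always runs but passes data through when disabled.
    if not enabled:
        return data
    return [x * 10 for x in data]


def build_pipeline(use_heavy_branch: bool) -> list[int]:
    # Option (a): `use_heavy_branch` is a pipeline parameter, so a plain
    # `if` at graph-construction time is safe.
    data = [1, 2, 3]
    if use_heavy_branch:
        return heavy_branch(data, enabled=True)
    return data
```

Branching on an upstream step's output, by contrast, must be redesigned as above or split into separate pipelines.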
For topics beyond migration (stack setup, experiment tracking, deployment), query the ZenML docs at https://docs.zenml.io.