Authors Apache Airflow DAGs declaratively with dag-factory YAML configs. Guides project setup, templates, defaults, dynamic/mapped tasks, datasets, callbacks, custom operators, and validation.
npx claudepluginhub astronomer/agents --plugin astronomer-data

This skill uses the workspace's default tool permissions.
You are helping a user build Apache Airflow DAGs declaratively with **dag-factory**, a library that turns YAML configuration files into Airflow DAGs. Execute steps in order and prefer the simplest configuration that meets the user's needs.
Package: dag-factory on PyPI
Repo: https://github.com/astronomer/dag-factory
Docs: https://astronomer.github.io/dag-factory/latest/
Targets: dag-factory v1.0+ only. For pre-1.0 projects, see reference/migration.md before applying any guidance from this skill.
Requires: Python 3.10+, Airflow 2.4+ (Airflow 3 supported)
Confirm with the user:
| User Request | Action |
|---|---|
| "Create a YAML DAG" / "Convert this Python DAG to YAML" | Go to Defining a DAG in YAML |
| "Set up dag-factory in my project" | Go to Project Setup |
| "Share defaults across DAGs" / "Set start_date once" | Go to Defaults |
| "Use a custom operator" / "Use KPO / Slack / Snowflake" | Go to Custom & Provider Operators |
| "Dynamic / mapped tasks" / "expand / partial" | Go to Dynamic Task Mapping |
| "Schedule on dataset" / "Outlets and inlets" | Go to Datasets |
| "Add a callback" / "Slack on failure" | Go to Callbacks |
| "Use a timetable" / "datetime in YAML" / "timedelta in YAML" | Go to Custom Python Objects (__type__) |
| "Lint my YAML" / "Validate" | Go to Validation Commands |
| "Convert Airflow 2 YAML to Airflow 3" | Go to Validation Commands (dagfactory convert) |
| "Migrate from dag-factory <1.0" | See reference/migration.md |
| dag-factory errors / troubleshooting | Go to Troubleshooting |
Add to requirements.txt:
dag-factory>=1.0.0
dag-factory does not install Airflow providers automatically. Install any provider packages your YAML references (e.g., apache-airflow-providers-slack, apache-airflow-providers-cncf-kubernetes).
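For example, a project whose YAML uses the Slack notifier and the KubernetesPodOperator (both shown later in this skill) might list something like the following in requirements.txt; adjust the provider list to whatever your configs actually reference:

dag-factory>=1.0.0
apache-airflow-providers-slack
apache-airflow-providers-cncf-kubernetes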
Create dags/load_dags.py so Airflow's DAG processor will pick it up:
import os
from pathlib import Path
from dagfactory import load_yaml_dags
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", "/usr/local/airflow/dags/"))
# Option A: load every *.yml / *.yaml under a folder
load_yaml_dags(globals_dict=globals(), dags_folder=str(CONFIG_ROOT_DIR))
# Option B: load a single file
# load_yaml_dags(globals_dict=globals(), config_filepath=str(CONFIG_ROOT_DIR / "my_dag.yml"))
# Option C: load from an in-Python dict
# load_yaml_dags(globals_dict=globals(), config_dict={...})
globals_dict=globals() is required so generated DAG objects are registered into the module namespace where Airflow can discover them.
Confirm the install:

dagfactory --version
Each top-level YAML key (other than default) defines a DAG. The key becomes the dag_id. Use the list format for tasks and task_groups — it is the recommended format since v1.0.0.
# dags/example_dag_factory.yml
default:
  default_args:
    start_date: 2024-11-11

basic_example_dag:
  default_args:
    owner: "custom_owner"
  description: "this is an example dag"
  schedule: "0 3 * * *"
  catchup: false
  task_groups:
    - group_name: "example_task_group"
      tooltip: "this is an example task group"
      dependencies: [task_1]
  tasks:
    - task_id: "task_1"
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 1"
    - task_id: "task_2"
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    - task_id: "task_3"
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 3"
      dependencies: [task_1]
      task_group_name: "example_task_group"
| Field | Where | Purpose |
|---|---|---|
| default | top-level | Shared DAG-level args applied to every DAG in this file |
| default_args | DAG or default block | Standard Airflow default_args (owner, retries, start_date, ...) |
| schedule | DAG | Cron expression, preset (@daily), Dataset list, or __type__ timetable |
| catchup / description / tags | DAG | Standard Airflow DAG kwargs |
| tasks | DAG | List of task dicts; each requires task_id and operator |
| operator | task | Full import path to operator class (e.g. airflow.operators.bash.BashOperator) |
| dependencies | task / task_group | List of upstream task_ids or group_names |
| task_groups | DAG | List of group dicts; each requires group_name |
| task_group_name | task | Assigns a task to a task group |
Tasks do not need to be ordered by dependency in the YAML — dag-factory resolves the DAG topology.
Pre-1.0 dictionary format (where tasks is a dict keyed by task_id) still works for backward compatibility, but prefer the list format for new code.
There are four ways to set defaults, in precedence order (highest first):
1. default_args / DAG-level keys inside an individual DAG
2. default: block in the same YAML file
3. defaults_config_dict= argument to load_yaml_dags
4. defaults.yml (or defaults.yaml) file via defaults_config_path= (or auto-detected next to the DAG YAML)

Note: loader argument names and several other field names changed in v1.0.0. See reference/migration.md if you're working on an older project.
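A minimal sketch of how the two loader-level options (3 and 4) look in load_dags.py; the default values shown here are purely illustrative:

import os
from pathlib import Path

from dagfactory import load_yaml_dags

CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", "/usr/local/airflow/dags/"))

# Option 3: shared defaults supplied as an in-Python dict (example values only).
shared_defaults = {
    "catchup": False,
    "default_args": {"owner": "data-team", "retries": 2},
}

load_yaml_dags(
    globals_dict=globals(),
    dags_folder=str(CONFIG_ROOT_DIR),
    defaults_config_dict=shared_defaults,
    # Option 4: point at a folder whose defaults.yml files should be merged.
    # defaults_config_path=str(CONFIG_ROOT_DIR),
)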
default Block in the Same File

Powerful for templating multiple DAGs from one file:
default:
  default_args:
    owner: "data-team"
    start_date: 2025-01-01
    retries: 2
  catchup: false
  schedule: "@daily"

dag_one:
  description: "first DAG"
  tasks:
    - task_id: t1
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo one"

dag_two:
  description: "second DAG"
  tasks:
    - task_id: t1
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo two"
defaults.yml File

Place a defaults.yml next to the DAG YAML, or point defaults_config_path at a parent directory. dag-factory merges all defaults.yml files walking up the directory tree, with the file closest to the DAG YAML winning. DAG-level args (e.g. schedule, catchup) go at the root of defaults.yml; per-task defaults go under default_args.
# defaults.yml
schedule: 0 1 * * *
catchup: false
default_args:
  start_date: '2024-12-31'
  owner: data-team
Reference any operator by its full Python import path. dag-factory passes all other task keys as kwargs to that operator.
tasks:
  - task_id: begin
    operator: airflow.providers.standard.operators.empty.EmptyOperator
  - task_id: make_bread
    operator: customized.operators.breakfast_operators.MakeBreadOperator
    bread_type: 'Sourdough'
The operator's package must be installed and importable. For Airflow 3, prefer airflow.providers.standard.operators.* over the legacy airflow.operators.* paths — the dagfactory convert CLI rewrites these automatically.
Specify the operator path and pass kwargs directly. As of v1.0, dag-factory no longer does legacy type casting — use __type__ for nested k8s objects.
tasks:
  - task_id: hello-world-pod
    operator: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator
    image: "python:3.12-slim"
    cmds: ["python", "-c"]
    arguments: ["print('hi')"]
    name: example-pod
    namespace: default
    container_resources:
      __type__: kubernetes.client.models.V1ResourceRequirements
      limits: {cpu: "1", memory: "1024Mi"}
      requests: {cpu: "0.5", memory: "512Mi"}
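For orientation, the container_resources block above resolves to roughly this Python object (a sketch, assuming the kubernetes client library that ships with the CNCF provider):

from kubernetes.client.models import V1ResourceRequirements

# Equivalent of the __type__ block: dag-factory instantiates the class with the
# remaining keys as keyword arguments.
container_resources = V1ResourceRequirements(
    limits={"cpu": "1", "memory": "1024Mi"},
    requests={"cpu": "0.5", "memory": "512Mi"},
)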
Use expand and partial keys on a task to map dynamically. dag-factory has two distinct ways to reference an upstream task's output:
- task_id.output: an XCom-style reference, used inside expand op_args / op_kwargs (and the equivalent kwargs of other operators).
- +task_id: a bare value reference, used when the value sits directly under expand (e.g. expand: {number: +numbers_list}) or as a TaskFlow decorator argument.

Don't mix them: +request won't resolve inside op_args, and request.output won't resolve as a bare expand value.
dynamic_task_map:
  default_args:
    start_date: 2025-01-01
  schedule: "0 3 * * *"
  tasks:
    - task_id: request
      operator: airflow.providers.standard.operators.python.PythonOperator
      python_callable_name: make_list
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
    - task_id: process
      operator: airflow.providers.standard.operators.python.PythonOperator
      python_callable_name: consume_value
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
      partial:
        op_kwargs:
          fixed_param: "test"
      expand:
        op_args: request.output # XCom-style — used inside op_args / op_kwargs
      dependencies: [request]
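The YAML above assumes an expand_tasks.py next to your configs defining the two callables; a minimal sketch (the function bodies are illustrative):

# expand_tasks.py (referenced via python_callable_file above)

def make_list():
    # Upstream task: each element of the returned list becomes the op_args of
    # one mapped instance of the downstream task.
    return [[1], [2], [3], [4]]


def consume_value(value, fixed_param=None):
    # One mapped task instance runs per element; fixed_param comes from partial.
    print(fixed_param, value)
    return value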
Bare-value form (TaskFlow decorator tasks, or any non-op_args mapping):
tasks:
  - task_id: numbers_list
    decorator: airflow.sdk.definitions.decorators.task
    python_callable: sample.build_numbers_list
  - task_id: double_number
    decorator: airflow.sdk.definitions.decorators.task
    python_callable: sample.double
    expand:
      number: +numbers_list # + resolves to upstream task `numbers_list`'s XComArg
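Similarly, the decorator example assumes a sample.py module with the two callables; a sketch:

# sample.py (referenced via python_callable above)

def build_numbers_list():
    # Upstream @task: its return value is what +numbers_list expands over.
    return [1, 2, 3]


def double(number):
    # One mapped instance runs per element of the upstream list.
    return number * 2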
For named map indices (Airflow 2.9+), set map_index_template: "{{ task.custom_mapping_key }}" and have the callable assign context["custom_mapping_key"].
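A sketch of a mapped callable that assigns the custom mapping key (names and message are illustrative; the callable receives Airflow's context as keyword arguments):

def consume_value(value, **context):
    # map_index_template is rendered after the task runs, so whatever the
    # callable stores under this key becomes the visible map index name.
    context["custom_mapping_key"] = f"processing-{value}"
    return value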
Tested patterns: simple mapping, task-generated mapping, repeated mapping, partial, multiple-parameter mapping, map_index_template.
Unsupported / untested: mapping over task groups, zipping, transforming expanding data.
Use inlets / outlets on tasks to declare dataset producers, and a list of dataset URIs as schedule to consume them.
producer_dag:
  default_args:
    start_date: '2024-01-01'
  schedule: "0 5 * * *"
  catchup: false
  tasks:
    - task_id: task_1
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 1"
      outlets: ['s3://bucket_example/raw/dataset1.json']

consumer_dag:
  default_args:
    start_date: '2024-01-01'
  schedule: ['s3://bucket_example/raw/dataset1.json']
  catchup: false
  tasks:
    - task_id: task_1
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 'consumer'"
Conditional scheduling nests the logical operators __and__ / __or__ under a datasets key:
schedule:
  datasets:
    __or__:
      - __and__:
          - s3://bucket-cjmm/raw/dataset_custom_1
          - s3://bucket-cjmm/raw/dataset_custom_2
      - s3://bucket-cjmm/raw/dataset_custom_3
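For reference, this is roughly the same condition expressed in Python with Airflow 2.9+ dataset logical operators:

from airflow.datasets import Dataset

# (dataset_custom_1 AND dataset_custom_2) OR dataset_custom_3
schedule = (
    Dataset("s3://bucket-cjmm/raw/dataset_custom_1")
    & Dataset("s3://bucket-cjmm/raw/dataset_custom_2")
) | Dataset("s3://bucket-cjmm/raw/dataset_custom_3")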
Callbacks come in three styles, all valid at the DAG, TaskGroup, or task level (or under default_args):
- task_id: task_1
  operator: airflow.operators.bash.BashOperator
  bash_command: "echo task_1"
  on_failure_callback: include.custom_callbacks.output_standard_message
With kwargs:
- task_id: task_2
  operator: airflow.operators.bash.BashOperator
  bash_command: "echo task_2"
  on_success_callback:
    callback: include.custom_callbacks.output_custom_message
    param1: "Task status"
    param2: "Successful!"
- task_id: task_3
  operator: airflow.operators.bash.BashOperator
  bash_command: "echo task_3"
  on_retry_callback_name: output_standard_message
  on_retry_callback_file: /usr/local/airflow/include/custom_callbacks.py
- task_id: task_4
  operator: airflow.operators.bash.BashOperator
  bash_command: "echo task_4"
  on_failure_callback:
    callback: airflow.providers.slack.notifications.slack.send_slack_notification
    slack_conn_id: slack_conn_id
    text: ":red_circle: Task Failed."
    channel: "#channel"
The provider package must be installed.
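The custom examples above assume an include/custom_callbacks.py module; a minimal sketch of what those callables could look like (message text is illustrative):

# include/custom_callbacks.py

def output_standard_message(context):
    # Airflow passes the task instance's context dict to callbacks.
    ti = context["task_instance"]
    print(f"Task {ti.task_id} finished with state {ti.state}.")


def output_custom_message(context, param1, param2):
    # Extra kwargs come from the YAML (param1 / param2 above).
    print(f"{param1}: {param2}")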
Custom Python Objects (__type__)

For anything that isn't a simple scalar (datetime, timedelta, Asset, timetables, k8s objects), use the generalized object syntax:
start_date:
  __type__: datetime.datetime
  year: 2025
  month: 1
  day: 1

execution_timeout:
  __type__: datetime.timedelta
  hours: 1

schedule:
  __type__: airflow.timetables.trigger.CronTriggerTimetable
  cron: "0 1 * * 3"
  timezone: UTC
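Each block instantiates the named class with the remaining keys as keyword arguments; in plain Python the three examples above correspond roughly to:

from datetime import datetime, timedelta

from airflow.timetables.trigger import CronTriggerTimetable

start_date = datetime(year=2025, month=1, day=1)
execution_timeout = timedelta(hours=1)
schedule = CronTriggerTimetable(cron="0 1 * * 3", timezone="UTC")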
- __type__ is the full import path to the class
- __args__ is a list of positional arguments
- Lists are built with __type__: builtins.list and an items: key

Don't use these YAML keys for your own data; dag-factory reserves them: __type__, __args__, __join__, __and__, __or__. The key items is also reserved when used inside a __type__: builtins.list block, so don't add a custom field named items to a typed list construction.
After installing, the dagfactory CLI is on PATH:
| Command | When to Use |
|---|---|
| dagfactory --version | Confirm install / version |
| dagfactory lint <path> | Validate YAML syntax for a file or directory |
| dagfactory lint <path> --verbose | Show a per-file table of results |
| dagfactory convert <path> | Show diffs to migrate Airflow 2 → 3 import paths |
| dagfactory convert <path> --override | Apply the conversions in place |
# 1. Lint YAML
dagfactory lint dags/
# 2. Have Airflow parse to catch operator/import errors
# (Astro CLI users)
astro dev parse
dagfactory lint only checks YAML syntax — operator import errors and missing kwargs surface at Airflow parse time.
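If you're not using the Astro CLI, one common way to reproduce what Airflow's parser would catch is a small DagBag check (a sketch; assumes Airflow is importable and configured in the current environment):

from airflow.models import DagBag

# Parses everything under dags/, including the YAML DAGs generated by load_dags.py.
dag_bag = DagBag(dag_folder="dags/", include_examples=False)

assert not dag_bag.import_errors, dag_bag.import_errors
print(f"Parsed {len(dag_bag.dags)} DAGs with no import errors.")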
ModuleNotFoundError

Cause: Provider package not installed, or wrong import path.
Fix: Install the provider (pip install apache-airflow-providers-...) and verify the path. For Airflow 3, run dagfactory convert to update legacy airflow.operators.* paths to airflow.providers.standard.operators.*.
DAGs not appearing in Airflow

Cause: Loader file missing or globals_dict=globals() not passed.
Fix: Ensure a Python file in dags/ calls load_yaml_dags(globals_dict=globals(), ...). Check astro dev parse (or airflow dags list-import-errors) for parse errors.
Type errors on fields that expect Python objects

Cause: A scalar string is being passed where a Python object is expected (e.g. start_date: "2025-01-01" for a field that needs a datetime).
Fix: Use __type__: datetime.datetime (or datetime.timedelta etc.) per Custom Python Objects.
Dataset conditions (__and__ / __or__) not working

Cause: Airflow <2.9, dag-factory <0.22, or using legacy !and/!or keys.
Fix: Upgrade and rename to __and__ / __or__.
defaults.yml not merging as expected

Cause: defaults_config_path not pointing at a parent directory of the DAG YAML.
Fix: Set defaults_config_path to the highest ancestor folder you want included; dag-factory walks the tree from DAG file → ancestor and merges in that order, with files closer to the DAG winning.
Before finishing, verify with the user:
- dagfactory lint dags/ passes
- A loader file in dags/ calls load_yaml_dags(globals_dict=globals(), ...)
- dag-factory>=1.0.0 is listed in requirements.txt

When YAML can't express what you need, fall back to authoring the DAG in Python with af CLI validation.