Migrates Apache Airflow 2.x DAG code to 3.x using Ruff AIR rules for auto-fixes on imports, operators, hooks, context; refactors metadata DB access via Task Execution API and fixes scheduling/config gotchas.
npx claudepluginhub astronomer/agents --plugin astronomer-data

This skill uses the workspace's default tool permissions.
This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).
Important: Before migrating to Airflow 3, strongly recommend upgrading to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1); other upgrade paths make rollbacks impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Additionally, early 3.0 releases have many bugs; 3.1 provides a much better experience.
Other migration gotchas to keep in mind:

- Run `ruff check --preview --select AIR --fix --unsafe-fixes .` to auto-fix many breaking changes (details below).
- Set `AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True` if you need Airflow 2-style cron data intervals.
- `.airflowignore` syntax changed from regexp to glob; set `AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp` if you must keep regexp behavior.
- Auth-related URLs now use the `/auth/` prefix (e.g. `/auth/oauth-authorized/google`).
- Imports like `import common` from `dags/common/` no longer work on Astro. Use fully qualified imports: `import dags.common`.

Airflow 3 also changes how components talk to the metadata database: task and trigger code no longer connects to it directly; access goes through the Task Execution API.
Trigger implementation gotcha: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via sync_to_async(...) (or otherwise ensure hook calls are async-safe).
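A minimal sketch of that pattern, assuming asgiref is available and the HTTP provider is installed; the trigger class, connection id, and endpoint are illustrative:

```python
from asgiref.sync import sync_to_async

from airflow.providers.http.hooks.http import HttpHook
from airflow.triggers.base import BaseTrigger, TriggerEvent


class ExampleHookTrigger(BaseTrigger):
    """Illustrative trigger that calls a synchronous hook safely."""

    def serialize(self):
        return ("my_dags.triggers.ExampleHookTrigger", {})

    async def run(self):
        hook = HttpHook(method="GET", http_conn_id="my_api")
        # HttpHook.run() is synchronous; sync_to_async runs it in a worker
        # thread so the triggerer's asyncio event loop is never blocked.
        response = await sync_to_async(hook.run)(endpoint="health")
        yield TriggerEvent({"status": response.status_code})
```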
Key code impact: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:
RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x
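For illustration, a hedged sketch of the Airflow 2-era pattern that now fails at runtime (the task and query are hypothetical):

```python
from airflow.decorators import task
from airflow.models import DagRun
from airflow.utils.session import create_session


@task
def count_runs():
    # Works in Airflow 2; fails in Airflow 3 with the RuntimeError above,
    # because task code may no longer open an ORM session against the
    # metadata database.
    with create_session() as session:
        return session.query(DagRun).count()
```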
When scanning DAGs, custom operators, and @task functions, look for:
- `provide_session`, `create_session`, `@provide_session`
- `from airflow.settings import Session`
- `from airflow.settings import engine`
- `session.query(DagModel)...`, `session.query(DagRun)...`

For rich metadata access patterns, the Airflow API Python client (`apache-airflow-client`) is preferred. Add to requirements.txt:
apache-airflow-client==<your-airflow-runtime-version>
Example usage:
import os

import airflow_client.client
from airflow.sdk import BaseOperator
from airflow_client.client.api.dag_api import DAGApi

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")


class ListDagsOperator(BaseOperator):
    def execute(self, context):
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))
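A brief sketch of wiring that operator into a DAG (the DAG id is illustrative; it assumes DEPLOYMENT_API_TOKEN is set on the deployment):

```python
from airflow.sdk import dag

# ListDagsOperator is the custom operator defined above.


@dag(schedule=None, catchup=False)
def list_dags_demo():
    ListDagsOperator(task_id="list_dags")


list_dags_demo()
```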
For simple cases, call the REST API directly using requests:
import os

import requests
from airflow.sdk import task

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")


@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10},
    )
    response.raise_for_status()
    print(response.json())
Use Ruff's Airflow rules to detect and fix many breaking changes automatically.
Commands to run (via uv) against the project root:
# Auto-fix all detectable Airflow issues (safe + unsafe)
ruff check --preview --select AIR --fix --unsafe-fixes .
# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .
For detailed code examples and migration patterns, see:
- airflow.cfg section moves, renames, and removals

Common import path changes:

| Airflow 2.x | Airflow 3 |
|---|---|
| airflow.operators.dummy_operator.DummyOperator | airflow.providers.standard.operators.empty.EmptyOperator |
| airflow.operators.bash.BashOperator | airflow.providers.standard.operators.bash.BashOperator |
| airflow.operators.python.PythonOperator | airflow.providers.standard.operators.python.PythonOperator |
| airflow.decorators.dag | airflow.sdk.dag |
| airflow.decorators.task | airflow.sdk.task |
| airflow.datasets.Dataset | airflow.sdk.Asset |
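As a quick illustration of the table above, migrated imports end up looking like this (only the paths listed above are shown):

```python
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Asset, dag, task
```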
| Removed Key | Replacement |
|---|---|
| execution_date | context["dag_run"].logical_date |
| tomorrow_ds / yesterday_ds | Use ds with date math: macros.ds_add(ds, 1) / macros.ds_add(ds, -1) |
| prev_ds / next_ds | prev_start_date_success or timetable API |
| triggering_dataset_events | triggering_asset_events |
| templates_dict | context["params"] |
Asset-triggered runs: logical_date may be None; use context["dag_run"].logical_date defensively.
Cannot trigger with future logical_date: Use logical_date=None and rely on run_id instead.
Cron note: for scheduled runs using cron, logical_date semantics differ under CronTriggerTimetable (aligning logical_date with run_after). If you need Airflow 2-style cron data intervals, consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True.
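Tying the notes above together, a minimal sketch (hypothetical DAG) of reading logical_date defensively in task code:

```python
from airflow.sdk import dag, task


@dag(schedule=None, catchup=False)
def logical_date_example():
    @task
    def report_run(**context):
        # Asset-triggered runs can have logical_date=None, so never assume a
        # timestamp is present; fall back to run_id for identification.
        logical_date = context["dag_run"].logical_date
        if logical_date is None:
            print(f"No logical_date; run_id={context['run_id']}")
        else:
            print(f"logical_date={logical_date.isoformat()}")

    report_run()


logical_date_example()
```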
| Setting | Airflow 2 Default | Airflow 3 Default |
|---|---|---|
| schedule | timedelta(days=1) | None |
| catchup | True | False |
on_success_callback no longer runs on skipped tasks; use on_skipped_callback if needed (see the sketch below).

@teardown with TriggerRule.ALWAYS is no longer allowed; teardowns now execute even if the DAG run terminated early.
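For the callback change, a hedged sketch (hypothetical DAG, task, and callback) of using on_skipped_callback:

```python
from airflow.exceptions import AirflowSkipException
from airflow.sdk import dag, task


def notify_skip(context):
    # Hypothetical handler; replace with real alerting.
    print(f"{context['task_instance'].task_id} was skipped")


@dag(schedule=None, catchup=False)
def skip_callback_example():
    @task(on_skipped_callback=notify_skip)
    def maybe_skip():
        # In Airflow 3, skipping no longer fires on_success_callback;
        # on_skipped_callback is invoked instead.
        raise AirflowSkipException("nothing to do today")

    maybe_skip()


skip_callback_example()
```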