From astronomer-data
Traces upstream data lineage for Airflow tables, columns, and DAGs via CLI commands, source code, and the UI. Identifies SQL sources and external systems such as Postgres and Salesforce.
```
npx claudepluginhub astronomer/agents --plugin astronomer-data
```

This skill uses the workspace's default tool permissions.
Trace the origins of data - answer "Where does this data come from?"
Determine what we're tracing:
Tables are typically populated by Airflow DAGs. Find the connection:
- Search DAGs by name: use `af dags list` and look for DAG names matching the table name, e.g. `load_customers` -> `customers` table, `etl_daily_orders` -> `orders` table
- Explore DAG source code: use `af dags source <dag_id>` to read the DAG definition
- Check DAG tasks: use `af tasks list <dag_id>` to see what operations the DAG performs
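The name-matching step above can be sketched in Python. `match_dag_to_table` is a hypothetical helper, not part of the `af` CLI; it ranks DAG ids (e.g. the output of `af dags list`) by how closely they match a table name:

```python
import difflib

def match_dag_to_table(table: str, dag_ids: list[str]) -> list[str]:
    """Rank DAG ids by similarity to a table name (e.g. load_customers -> customers)."""
    table_l = table.lower()
    # Exact substring hits first (load_customers contains "customers")
    exact = [d for d in dag_ids if table_l in d.lower()]
    # Then fuzzy matches for renamed or abbreviated DAGs
    fuzzy = difflib.get_close_matches(table_l, dag_ids, n=3, cutoff=0.4)
    seen, ranked = set(), []
    for d in exact + fuzzy:          # preserve order, drop duplicates
        if d not in seen:
            seen.add(d)
            ranked.append(d)
    return ranked

print(match_dag_to_table("customers", ["load_customers", "etl_daily_orders", "cleanup"]))
```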
If you're running on Astro, the Lineage tab in the Astro UI provides visual lineage exploration across DAGs and datasets. Use it to quickly trace upstream dependencies without manually searching DAG source code.
If you're running open-source Airflow, use DAG source code and task logs to trace lineage (there is no built-in cross-DAG lineage UI).
From the DAG code, identify source tables and systems:
SQL Sources (look for FROM clauses):
```
# In DAG code:
SELECT * FROM source_schema.source_table  # <- This is an upstream source
```
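Scanning SQL for upstream tables can be sketched with a regex (the function name and pattern here are illustrative; real SQL with CTEs or subqueries needs a proper parser such as sqlglot):

```python
import re

# Pull table references out of FROM/JOIN clauses in SQL embedded in DAG code.
TABLE_REF = re.compile(r"\b(?:FROM|JOIN)\s+([a-zA-Z_][\w.]*)", re.IGNORECASE)

def upstream_tables(sql: str) -> set[str]:
    return set(TABLE_REF.findall(sql))

sql = """
SELECT o.id, c.name
FROM raw.orders o
JOIN dim.customers c ON o.customer_id = c.id
"""
print(sorted(upstream_tables(sql)))  # ['dim.customers', 'raw.orders']
```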
External Sources (look for connection references):
- `S3Operator` -> S3 bucket source
- `PostgresOperator` -> Postgres database source
- `SalesforceOperator` -> Salesforce API source
- `HttpOperator` -> REST API source

File Sources:
Recursively trace each source:
```
TARGET: analytics.orders_daily
  ^
  +-- DAG: etl_daily_orders
        ^
        +-- SOURCE: raw.orders (table)
        |     ^
        |     +-- DAG: ingest_orders
        |           ^
        |           +-- SOURCE: Salesforce API (external)
        |
        +-- SOURCE: dim.customers (table)
              ^
              +-- DAG: load_customers
                    ^
                    +-- SOURCE: PostgreSQL (external DB)
```
For each upstream source:
`af dags stats`

When tracing a specific column:
- `source.col AS target_col`
- `COALESCE(a.col, b.col) AS target_col`
- `SUM(detail.amount) AS total_amount`

One-line answer: "This table is populated by DAG X from sources Y and Z"
```
[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
           |                                                 |
    DAG: ingest_sfdc                               DAG: transform_sales
```
| Source | Type | Connection | Freshness | Owner |
|---|---|---|---|---|
| raw.orders | Table | Internal | 2h ago | data-team |
| Salesforce | API | salesforce_conn | Real-time | sales-ops |
Describe how data flows and transforms:
- `raw.orders` is loaded via Salesforce API sync
- `transform_orders` cleans and dedupes into `stg.orders`
- `build_order_facts` joins with dimensions into `fct.orders`