Apache Airflow, dbt, Prefect, Dagster, and modern data orchestration for production data pipelines
Build production data pipelines with Apache Airflow, dbt, and orchestration tools. Use when creating ETL workflows, data quality checks, or integrating SQL transformations with Python tasks.
/plugin marketplace add pluginagentmarketplace/custom-plugin-data-engineer
/plugin install data-engineer-development-assistant@pluginagentmarketplace-data-engineer

This skill inherits all available tools. When active, it can use any tool Claude has access to.
assets/config.yaml
assets/schema.json
references/GUIDE.md
references/PATTERNS.md
scripts/validate.py

Production-grade data pipeline development with Apache Airflow, dbt, and modern orchestration patterns.
# Apache Airflow 2.8+ TaskFlow API
from datetime import datetime, timedelta

import pandas as pd
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["alerts@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}


@dag(
    dag_id="etl_pipeline_v2",
    schedule="0 2 * * *",  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["production", "etl"],
    default_args=default_args,
    doc_md="""
    ## Daily Sales ETL Pipeline
    Extracts from PostgreSQL, transforms, loads to S3.
    ### Data Quality Checks
    - Row count validation
    - Schema validation
    - Freshness check
    """,
)
def etl_pipeline():
    @task
    def extract_sales(ds: str = None) -> dict:
        """Extract daily sales from PostgreSQL (ds is injected from the Airflow context)."""
        hook = PostgresHook(postgres_conn_id="postgres_prod")
        query = """
            SELECT order_id, customer_id, product_id,
                   quantity, unit_price, order_date
            FROM orders
            WHERE order_date = %(date)s
        """
        df = hook.get_pandas_df(query, parameters={"date": ds})
        if df.empty:
            raise ValueError(f"No data for {ds}")
        output_path = f"/tmp/extract_{ds}.parquet"
        df.to_parquet(output_path, index=False)
        return {"path": output_path, "count": len(df)}

    @task
    def transform_sales(extract_result: dict) -> dict:
        """Apply business transformations."""
        df = pd.read_parquet(extract_result["path"])
        # Business logic
        df["total_amount"] = df["quantity"] * df["unit_price"]
        df["discount_tier"] = pd.cut(
            df["total_amount"],
            bins=[0, 100, 500, float("inf")],
            labels=["small", "medium", "large"],
        )
        output_path = extract_result["path"].replace("extract", "transform")
        df.to_parquet(output_path, index=False)
        return {"path": output_path, "count": len(df)}

    @task
    def load_to_s3(transform_result: dict, ds: str = None) -> str:
        """Load to S3 with Hive-style date partitioning."""
        s3_hook = S3Hook(aws_conn_id="aws_prod")
        s3_key = f"sales/year={ds[:4]}/month={ds[5:7]}/day={ds[8:10]}/data.parquet"
        s3_hook.load_file(
            filename=transform_result["path"],
            key=s3_key,
            bucket_name="data-lake-prod",
            replace=True,
        )
        return f"s3://data-lake-prod/{s3_key}"

    @task
    def validate_load(s3_path: str) -> bool:
        """Validate data was loaded correctly."""
        s3_hook = S3Hook(aws_conn_id="aws_prod")
        # Check the object exists and has content
        key = s3_path.replace("s3://data-lake-prod/", "")
        obj = s3_hook.get_key(key, bucket_name="data-lake-prod")
        if obj.content_length < 100:
            raise ValueError(f"File too small: {obj.content_length} bytes")
        return True

    # DAG flow
    extracted = extract_sales()
    transformed = transform_sales(extracted)
    loaded = load_to_s3(transformed)
    validate_load(loaded)


# Instantiate the DAG
etl_pipeline()
┌────────────────────────────────────────────────────────────┐
│                    Airflow Architecture                    │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │  Scheduler   │───▶│   Executor   │───▶│   Workers    │  │
│  │              │    │ (Celery/K8s) │    │              │  │
│  └──────────────┘    └──────────────┘    └──────────────┘  │
│         │                                       │          │
│         ▼                                       ▼          │
│  ┌──────────────┐                        ┌──────────────┐  │
│  │   Metadata   │                        │     Logs     │  │
│  │   Database   │                        │   Storage    │  │
│  │  (Postgres)  │                        │     (S3)     │  │
│  └──────────────┘                        └──────────────┘  │
│         │                                                  │
│         ▼                                                  │
│  ┌──────────────┐                                          │
│  │  Webserver   │  ← UI for monitoring                     │
│  └──────────────┘                                          │
│                                                            │
└────────────────────────────────────────────────────────────┘
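The components in the diagram are wired together through Airflow configuration. As a minimal sketch (the AIRFLOW__SECTION__KEY environment-variable convention is standard Airflow; hostnames, bucket names, and connection IDs below are placeholders), a Celery-based deployment with a Postgres metadata database and S3 log storage might set:

```bash
# Illustrative only: connect the scheduler/executor/workers to the metadata DB
# and ship task logs to S3. Values are placeholders.
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:***@metadata-db:5432/airflow
export AIRFLOW__LOGGING__REMOTE_LOGGING=True
export AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://airflow-logs-bucket/
export AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=aws_prod
```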
from airflow.decorators import dag, task
from airflow.providers.common.sql.sensors.sql import SqlSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.http.sensors.http import HttpSensor


@dag(...)
def sensor_pipeline():
    # Wait for upstream data
    wait_for_source = SqlSensor(
        task_id="wait_for_source_data",
        conn_id="postgres_prod",
        sql="""
            SELECT COUNT(*) > 0
            FROM source_table
            WHERE date = '{{ ds }}'
        """,
        mode="reschedule",        # Release the worker slot while waiting
        poke_interval=300,        # Check every 5 minutes
        timeout=3600 * 6,         # 6 hour timeout
        exponential_backoff=True,
    )

    # Wait for file in S3
    wait_for_file = S3KeySensor(
        task_id="wait_for_s3_file",
        bucket_name="source-bucket",
        bucket_key="data/{{ ds }}/complete.flag",
        aws_conn_id="aws_prod",
        mode="reschedule",
        poke_interval=60,
        timeout=3600,
    )

    # Wait for API to be healthy
    check_api = HttpSensor(
        task_id="check_api_health",
        http_conn_id="api_conn",
        endpoint="/health",
        response_check=lambda response: response.json()["status"] == "healthy",
        mode="poke",
        poke_interval=30,
        timeout=300,
    )

    @task
    def process_data():
        """Placeholder for the downstream processing logic."""
        ...

    [wait_for_source, wait_for_file, check_api] >> process_data()
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup


@dag(...)
def dynamic_pipeline():
    @task
    def get_partitions() -> list:
        """Dynamically determine partitions to process."""
        return ["us", "eu", "apac"]

    @task
    def process_partition(partition: str) -> dict:
        """Process a single partition."""
        # Processing logic
        return {"partition": partition, "status": "success"}

    @task
    def aggregate_results(results: list) -> None:
        """Combine results from all partitions."""
        for result in results:
            print(f"Partition {result['partition']}: {result['status']}")

    partitions = get_partitions()

    # Dynamic task mapping (Airflow 2.3+): one mapped task instance per partition
    processed = process_partition.expand(partition=partitions)
    aggregate_results(processed)


# Alternative: Task Groups for organization
# (extract/transform/load below are placeholder task factories)
@dag(...)
def grouped_pipeline():
    with TaskGroup("extraction") as extract_group:
        extract_users = extract("users")
        extract_orders = extract("orders")
        extract_products = extract("products")

    with TaskGroup("transformation") as transform_group:
        transform_all = transform()

    with TaskGroup("loading") as load_group:
        load_warehouse = load()

    extract_group >> transform_group >> load_group
-- models/staging/stg_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        on_schema_change='sync_all_columns'
    )
}}

WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
    {% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

cleaned AS (
    SELECT
        order_id,
        customer_id,
        COALESCE(product_id, 'UNKNOWN') AS product_id,
        quantity,
        unit_price,
        quantity * unit_price AS total_amount,
        order_date,
        updated_at
    FROM source
    WHERE order_id IS NOT NULL
)

SELECT * FROM cleaned
# dbt_project.yml
name: 'data_warehouse'
version: '1.0.0'
profile: 'production'

model-paths: ["models"]
test-paths: ["tests"]
macro-paths: ["macros"]

models:
  data_warehouse:
    staging:
      +materialized: view
      +schema: staging
    marts:
      +materialized: table
      +schema: analytics

vars:
  start_date: '2024-01-01'
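The project declares test-paths, and dbt column tests usually sit next to the model in a schema file. A minimal sketch for the staging model above (hypothetical path models/staging/stg_orders.yml, using dbt's built-in generic tests):

```yaml
# models/staging/stg_orders.yml -- illustrative sketch
version: 2

models:
  - name: stg_orders
    description: "Cleaned, incremental staging model for orders"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
```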
# Airflow + dbt integration via Astronomer Cosmos
from airflow.decorators import dag
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig


@dag(...)
def dbt_pipeline():
    dbt_transform = DbtTaskGroup(
        group_id="dbt_transform",
        project_config=ProjectConfig(
            dbt_project_path="/opt/dbt/project",
        ),
        profile_config=ProfileConfig(
            profile_name="production",
            target_name="prod",
            # ProfileConfig also expects profiles_yml_filepath or a profile_mapping
            profiles_yml_filepath="/opt/dbt/profiles.yml",
        ),
        default_args={"retries": 2},
    )

    # extract() and notify() are placeholder tasks defined elsewhere
    extract() >> dbt_transform >> notify()
from airflow.decorators import dag, task
import great_expectations as gx


@dag(...)
def quality_pipeline():
    @task
    def validate_data(dataset_path: str) -> dict:
        """Run Great Expectations validation."""
        context = gx.get_context()

        # Define expectations against the dataset
        validator = context.sources.pandas_default.read_csv(dataset_path)
        validator.expect_column_to_exist("order_id")
        validator.expect_column_values_to_not_be_null("order_id")
        validator.expect_column_values_to_be_unique("order_id")
        validator.expect_column_values_to_be_between(
            "quantity", min_value=1, max_value=1000
        )
        validator.expect_column_values_to_be_in_set(
            "status", ["pending", "completed", "cancelled"]
        )

        results = validator.validate()
        if not results.success:
            raise ValueError(f"Data quality check failed: {results}")
        return {"success": True, "stats": results.statistics}

    @task.branch
    def check_quality_result(result: dict) -> str:
        """Branch based on quality results."""
        if result.get("success"):
            return "proceed_to_load"
        return "alert_and_stop"
| Tool | Purpose | Version (2025) |
|---|---|---|
| Apache Airflow | Workflow orchestration | 2.8+ |
| dbt Core | SQL transformation | 1.7+ |
| Prefect | Modern orchestration | 2.14+ |
| Dagster | Data-aware orchestration | 1.5+ |
| Great Expectations | Data quality | 0.18+ |
| Airbyte | Data integration | 0.55+ |
| Fivetran | Managed EL | Latest |
| Apache NiFi | Data flow automation | 2.0+ |
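Prefect and Dagster appear in the table but not in the examples above. For orientation, here is a minimal Prefect 2.x flow (a sketch only; flow name, retry values, and data are illustrative):

```python
# Minimal Prefect 2.x sketch -- names and settings are illustrative
from prefect import flow, task


@task(retries=3, retry_delay_seconds=300)
def extract() -> list[dict]:
    """Pretend extraction returning a couple of rows."""
    return [{"order_id": 1, "amount": 100}, {"order_id": 2, "amount": 200}]


@task
def load(rows: list[dict]) -> None:
    """Pretend load step."""
    print(f"Loaded {len(rows)} rows")


@flow(name="daily-sales")
def daily_sales_flow():
    rows = extract()
    load(rows)


if __name__ == "__main__":
    daily_sales_flow()
```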
Week 1: ETL vs ELT concepts, batch vs streaming
Week 2: Airflow basics, DAGs, operators
Week 3: Connections, variables, XComs
Week 4: TaskFlow API, dynamic tasks
Week 5: Sensors, triggers, callbacks
Week 6: dbt fundamentals, models, tests
Week 7: dbt macros, packages, documentation
Week 8: Data quality frameworks
Week 9: Airflow + dbt integration (Cosmos)
Week 10: Custom operators, plugins
Week 11: Performance tuning, parallelism
Week 12: CI/CD for pipelines (see the CI sketch after this list)
Week 13: Monitoring, alerting, SLAs
Week 14: Multi-environment deployment
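Weeks 12–13 cover CI/CD and monitoring. As a sketch of the CI side only (assuming GitHub Actions, a requirements.txt, and that the DAG-integrity tests shown later in this guide live under tests/):

```yaml
# .github/workflows/pipeline-ci.yml -- illustrative sketch
name: pipeline-ci
on: [pull_request]

jobs:
  validate-dags:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: pip install -r requirements.txt pytest
      - name: Run DAG integrity tests
        run: pytest tests/
```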
@task
def load_data_idempotent(data: dict, execution_date: str) -> None:
    """
    Idempotent load: can be safely re-run without duplicates.
    """
    hook = PostgresHook(postgres_conn_id="postgres")

    # Delete existing data for this run
    hook.run(
        "DELETE FROM fact_sales WHERE load_date = %(date)s",
        parameters={"date": execution_date},
    )

    # Insert new data
    hook.insert_rows(
        table="fact_sales",
        rows=data["rows"],
        target_fields=["order_id", "amount", "load_date"],
    )
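Delete-then-insert is one idempotency strategy; an upsert keyed on the natural key is another. A rough sketch, assuming PostgreSQL, a unique constraint on order_id, and that data["rows"] holds dicts (the exact shape of data is an assumption):

```python
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def load_data_upsert(data: dict) -> None:
    """Illustrative alternative: upsert rows so re-runs overwrite instead of duplicating."""
    hook = PostgresHook(postgres_conn_id="postgres")
    sql = """
        INSERT INTO fact_sales (order_id, amount, load_date)
        VALUES (%(order_id)s, %(amount)s, %(load_date)s)
        ON CONFLICT (order_id) DO UPDATE
        SET amount = EXCLUDED.amount, load_date = EXCLUDED.load_date
    """
    # Row-by-row for clarity; batch execution would be preferable at scale
    for row in data["rows"]:
        hook.run(sql, parameters=row)
```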
from datetime import timedelta

from airflow.decorators import dag, task
from airflow.models import Variable


def alert_on_failure(context):
    """on_failure_callback: build and send an alert when a task fails."""
    task_instance = context["task_instance"]
    exception = context["exception"]
    slack_webhook = Variable.get("slack_webhook")
    message = f"""
    :red_circle: Pipeline Failed
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Error: {str(exception)[:500]}
    """
    # Send to Slack/PagerDuty


@dag(
    sla_miss_callback=sla_alert_callback,  # placeholder callback defined elsewhere
    default_args={
        "sla": timedelta(hours=4),  # Pipeline-wide SLA
        "on_failure_callback": alert_on_failure,
    },
)
def sla_pipeline():
    @task(sla=timedelta(hours=1))  # Task-level SLA
    def critical_transform():
        pass
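The callback above stops at building the message. A minimal sketch of actually delivering it (assumes the requests library is installed and the slack_webhook Variable holds a Slack incoming-webhook URL):

```python
import requests


def send_slack_alert(webhook_url: str, message: str) -> None:
    """Post a plain-text alert to a Slack incoming webhook."""
    # Slack incoming webhooks accept a JSON payload with a "text" field
    response = requests.post(webhook_url, json={"text": message}, timeout=10)
    response.raise_for_status()
```

Inside alert_on_failure this would be called as send_slack_alert(slack_webhook, message); PagerDuty or email could be wired in the same place.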
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Task Stuck | Task in "queued" state | No available workers | Scale workers, check executor |
| DAG Not Found | DAG missing in UI | Parse error, wrong folder | Check logs, fix syntax |
| Connection Error | Task fails on connect | Wrong credentials, network | Verify connection in UI |
| XCom Too Large | Task fails after success | Returning large data | Use external storage |
| Zombie Tasks | Tasks never complete | Worker died mid-task | Enable heartbeat, set timeout |
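Several fixes in the table map to standard Airflow configuration options. An illustrative sketch (option names are real Airflow settings; the values are arbitrary and should be tuned per deployment):

```bash
# Illustrative tuning related to the issues above
export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=300  # seconds without heartbeat before a task is treated as a zombie
export AIRFLOW__CELERY__WORKER_CONCURRENCY=16                   # more worker slots if tasks sit in "queued"
export AIRFLOW__CORE__PARALLELISM=32                            # global cap on concurrently running task instances
```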
# 1. Check DAG parse errors
airflow dags list-import-errors

# 2. Test DAG syntax
python /path/to/dag.py

# 3. Test a specific task
airflow tasks test dag_id task_id 2024-01-01

# 4. Task logs: view them in the UI, or on disk under the configured
#    base_log_folder ($AIRFLOW_HOME/logs by default)

# 5. Clear failed tasks for retry
airflow tasks clear dag_id -s 2024-01-01 -e 2024-01-01

# 6. Check scheduler health
airflow jobs check --job-type SchedulerJob --limit 1

# 7. List task instance states for a DAG run
airflow tasks states-for-dag-run dag_id 2024-01-01
# Common log patterns and meanings
# ✅ Success
# [2024-01-01 02:00:00] INFO - Task completed successfully
# ⚠️ Retry
# [2024-01-01 02:00:00] WARNING - Retry 1/3: Connection refused
# [2024-01-01 02:05:00] INFO - Task completed on retry 2
# ❌ Failure after retries
# [2024-01-01 02:15:00] ERROR - Task failed after 3 retries
# [2024-01-01 02:15:00] ERROR - Exception: ConnectionError(...)
# 🔍 Resource issue
# [2024-01-01 02:00:00] WARNING - Celery worker memory: 95%
# [2024-01-01 02:00:00] ERROR - Worker killed by OOM
import pandas as pd
import pytest
from airflow.models import DagBag
from unittest.mock import patch


class TestDAGIntegrity:
    """Test DAG structure and configuration."""

    @pytest.fixture
    def dagbag(self):
        return DagBag(dag_folder="dags/", include_examples=False)

    def test_no_import_errors(self, dagbag):
        assert len(dagbag.import_errors) == 0, f"Import errors: {dagbag.import_errors}"

    def test_dag_has_required_tags(self, dagbag):
        for dag_id, dag in dagbag.dags.items():
            assert "production" in dag.tags or "development" in dag.tags

    def test_dag_has_owner(self, dagbag):
        for dag_id, dag in dagbag.dags.items():
            assert dag.default_args.get("owner") is not None

    def test_dag_has_retries(self, dagbag):
        for dag_id, dag in dagbag.dags.items():
            assert dag.default_args.get("retries", 0) >= 2


class TestTaskLogic:
    """Test individual task logic."""

    @patch("dags.etl_pipeline.PostgresHook")
    def test_extract_returns_data(self, mock_hook):
        from dags.etl_pipeline import extract_sales

        # Arrange
        mock_hook.return_value.get_pandas_df.return_value = pd.DataFrame({
            "order_id": [1, 2, 3],
            "amount": [100, 200, 300],
        })

        # Act: call the plain Python callable behind the @task decorator
        result = extract_sales.function(ds="2024-01-01")

        # Assert
        assert result["count"] == 3
        assert "path" in result

    @patch("dags.etl_pipeline.PostgresHook")
    def test_extract_raises_on_empty(self, mock_hook):
        from dags.etl_pipeline import extract_sales

        mock_hook.return_value.get_pandas_df.return_value = pd.DataFrame()
        with pytest.raises(ValueError, match="No data"):
            extract_sales.function(ds="2024-01-01")
# ✅ DO: Make tasks atomic and idempotent
@task
def process_chunk(chunk_id: str, execution_date: str):
    # Can be re-run safely (clear_existing / process_and_insert are placeholders)
    clear_existing(chunk_id, execution_date)
    process_and_insert(chunk_id, execution_date)


# ✅ DO: Use meaningful task IDs
extract_customer_data = ...  # Good
task1 = ...                  # Bad

# ✅ DO: Keep DAGs simple, split complex pipelines
# Instead of one 50-task DAG, create multiple focused DAGs

# ❌ DON'T: Put business logic in the DAG file
# Keep the DAG definition separate from processing code

# ❌ DON'T: Return large data via XCom
@task
def bad_practice():
    return huge_dataframe  # Don't do this


@task
def good_practice():
    save_to_s3(huge_dataframe)
    return {"s3_path": "s3://bucket/data.parquet"}


# ✅ DO: Use appropriate retry configuration
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=60),
}

# ✅ DO: Add failure callbacks
@task(on_failure_callback=alert_team)
def critical_task():
    pass


# ✅ DO: Set reasonable timeouts
@task(execution_timeout=timedelta(hours=2))
def long_running_task():
    pass
After mastering ETL Tools:

- big-data - Scale with Spark
- data-warehousing - Design data models
- mlops - Orchestrate ML pipelines
- monitoring-observability - Production observability

Skill Certification Checklist: