Analyzes, validates, and optimizes Apache Airflow DAGs for reliability, performance, and best practices adherence.
This skill examines Apache Airflow DAG definitions to identify performance bottlenecks, reliability issues, and best practice violations. It provides recommendations for task dependency optimization, parallelism configuration, error handling, and resource management.
{
  "dagCode": {
    "type": "string",
    "description": "The Python DAG definition code",
    "required": true
  },
  "dagId": {
    "type": "string",
    "description": "The DAG identifier"
  },
  "executionHistory": {
    "type": "object",
    "description": "Historical execution metrics",
    "properties": {
      "runs": {
        "type": "array",
        "items": {
          "dagRunId": "string",
          "executionDate": "string",
          "duration": "number",
          "state": "string",
          "taskDurations": "object"
        }
      }
    }
  },
  "clusterConfig": {
    "type": "object",
    "properties": {
      "workerCount": "number",
      "executorType": "string",
      "poolConfigs": "object",
      "airflowVersion": "string"
    }
  },
  "analysisScope": {
    "type": "array",
    "items": {
      "type": "string",
      "enum": ["structure", "performance", "reliability", "resources", "security"]
    },
    "default": ["structure", "performance", "reliability"]
  }
}
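A request matching the input schema above can be assembled and checked before it is sent. The sketch below is only an illustration, not part of the skill: `build_request` is a hypothetical helper, and it enforces nothing beyond what the schema states (`dagCode` is required; `analysisScope` values come from the enum and default to structure, performance, and reliability).

```python
# Hypothetical helper; only the schema field names come from the document.
ALLOWED_SCOPES = {"structure", "performance", "reliability", "resources", "security"}
DEFAULT_SCOPE = ["structure", "performance", "reliability"]


def build_request(dag_code, dag_id=None, analysis_scope=None):
    """Build an input payload, enforcing the required field and the scope enum."""
    if not isinstance(dag_code, str) or not dag_code.strip():
        raise ValueError("dagCode is required and must be a non-empty string")
    scope = list(analysis_scope) if analysis_scope is not None else list(DEFAULT_SCOPE)
    unknown = sorted(set(scope) - ALLOWED_SCOPES)
    if unknown:
        raise ValueError(f"unknown analysisScope values: {unknown}")
    request = {"dagCode": dag_code, "analysisScope": scope}
    if dag_id is not None:
        request["dagId"] = dag_id
    return request


request = build_request("from airflow import DAG\n...", dag_id="daily_etl_pipeline")
print(request["analysisScope"])
```

Rejecting unknown scope names early keeps a typo such as `"perfomance"` from silently narrowing the analysis.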
{
  "validationResults": {
    "errors": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "error"
      }
    },
    "warnings": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "warning"
      }
    }
  },
  "optimizations": {
    "type": "array",
    "items": {
      "category": "string",
      "current": "string",
      "recommended": "string",
      "impact": "high|medium|low",
      "effort": "string",
      "codeChange": "string"
    }
  },
  "recommendedConfig": {
    "type": "object",
    "properties": {
      "poolSize": "number",
      "maxActiveRuns": "number",
      "concurrency": "number",
      "defaultRetries": "number",
      "executionTimeout": "string"
    }
  },
  "dependencyGraph": {
    "type": "object",
    "properties": {
      "nodes": "array",
      "edges": "array",
      "criticalPath": "array",
      "parallelGroups": "array"
    }
  },
  "metrics": {
    "taskCount": "number",
    "maxDepth": "number",
    "parallelizationRatio": "number",
    "estimatedDuration": "string"
  },
  "securityFindings": {
    "type": "array",
    "items": {
      "severity": "high|medium|low",
      "finding": "string",
      "recommendation": "string"
    }
  }
}
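One way a caller might consume a result shaped like the output schema above is to block on validation errors and then work through optimizations from highest to lowest impact. This is a sketch under assumptions: the helper names are hypothetical, and `sample` is made-up data that only mirrors the schema's field names.

```python
IMPACT_ORDER = {"high": 0, "medium": 1, "low": 2}


def rank_optimizations(result):
    """Return optimizations sorted from high to low impact; unknown values go last."""
    return sorted(
        result.get("optimizations", []),
        key=lambda item: IMPACT_ORDER.get(item.get("impact"), len(IMPACT_ORDER)),
    )


def has_blocking_errors(result):
    """True when validationResults.errors contains at least one entry."""
    return bool(result.get("validationResults", {}).get("errors"))


# Illustrative response only; not real analyzer output.
sample = {
    "validationResults": {"errors": [], "warnings": []},
    "optimizations": [
        {"category": "sensors", "impact": "low"},
        {"category": "parallelism", "impact": "high"},
        {"category": "retries", "impact": "medium"},
    ],
}

ranked = [item["category"] for item in rank_optimizations(sample)]
print(has_blocking_errors(sample), ranked)
```

Because `sorted` is stable, optimizations with the same impact keep the order in which the analyzer returned them.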
{
  "dagCode": "from airflow import DAG\nfrom airflow.operators.python import PythonOperator\n...",
  "dagId": "daily_etl_pipeline"
}
{
  "dagCode": "...",
  "dagId": "daily_etl_pipeline",
  "executionHistory": {
    "runs": [
      {
        "dagRunId": "manual__2024-01-15",
        "duration": 3600,
        "state": "success",
        "taskDurations": {
          "extract": 600,
          "transform": 1800,
          "load": 1200
        }
      }
    ]
  }
}
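The execution history in this example already hints at where the time goes. The sketch below works through the arithmetic on the run shown above; `duration_shares` is a hypothetical helper, and the durations are assumed to be in seconds, which the schema does not state.

```python
run = {
    "dagRunId": "manual__2024-01-15",
    "duration": 3600,
    "state": "success",
    "taskDurations": {"extract": 600, "transform": 1800, "load": 1200},
}


def duration_shares(run):
    """Return each task's fraction of the summed task time for one run."""
    total = sum(run["taskDurations"].values())
    return {task: seconds / total for task, seconds in run["taskDurations"].items()}


shares = duration_shares(run)
bottleneck = max(shares, key=shares.get)
# Summed task time minus wall-clock duration: 0 means nothing ran in parallel.
overlap = sum(run["taskDurations"].values()) - run["duration"]
print(bottleneck, shares[bottleneck], overlap)  # prints: transform 0.5 0
```

Here the task durations add up to exactly the run duration (600 + 1800 + 1200 = 3600), so the three tasks ran back to back, and `transform` accounts for half of the run.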
{
  "dagCode": "...",
  "dagId": "complex_ml_pipeline",
  "clusterConfig": {
    "workerCount": 8,
    "executorType": "KubernetesExecutor",
    "poolConfigs": {
      "default_pool": {"slots": 128},
      "ml_pool": {"slots": 32}
    },
    "airflowVersion": "2.8.0"
  },
  "analysisScope": ["structure", "performance", "reliability", "resources", "security"]
}
| Rule | Severity | Description |
|---|---|---|
| DAG-001 | Error | Missing DAG default_args |
| DAG-002 | Error | Invalid schedule_interval |
| DAG-003 | Warning | Catchup enabled for long-running DAG |
| DAG-004 | Warning | No email on failure configured |
| DAG-005 | Info | Consider using @dag decorator |
| Rule | Severity | Description |
|---|---|---|
| TSK-001 | Error | Task has no upstream or downstream |
| TSK-002 | Warning | Task missing retries configuration |
| TSK-003 | Warning | Execution timeout not set |
| TSK-004 | Warning | PythonOperator with no pool |
| TSK-005 | Info | Consider TaskGroup for related tasks |
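To make rules such as TSK-002 and TSK-003 concrete, the sketch below shows one way a static check could look. It is an assumption about how such a rule might be implemented, not the skill's actual code: `find_tasks_missing_kwargs` is a hypothetical function that parses the DAG source with Python's `ast` module and treats any call with a `task_id` keyword as a task. It does not account for values inherited from `default_args`, which a real check would have to merge in.

```python
import ast


def find_tasks_missing_kwargs(dag_code, required=("retries", "execution_timeout")):
    """Return (task_id, line, missing kwargs) for calls that pass a task_id keyword."""
    findings = []
    for node in ast.walk(ast.parse(dag_code)):
        if not isinstance(node, ast.Call):
            continue
        kwargs = {kw.arg: kw.value for kw in node.keywords if kw.arg}
        if "task_id" not in kwargs:
            continue  # not a task definition under this heuristic
        task_id = kwargs["task_id"]
        label = task_id.value if isinstance(task_id, ast.Constant) else "<dynamic>"
        missing = [name for name in required if name not in kwargs]
        if missing:
            findings.append((label, node.lineno, missing))
    return sorted(findings, key=lambda finding: finding[1])


# The source is only parsed, never executed, so Airflow need not be installed.
sample = '''
from airflow.operators.python import PythonOperator

extract = PythonOperator(task_id="extract", python_callable=run_extract)
load = PythonOperator(task_id="load", python_callable=run_load, retries=2)
'''

findings = find_tasks_missing_kwargs(sample)
for task_id, line, missing in findings:
    print(f"line {line}: task {task_id!r} is missing {', '.join(missing)}")
```

Working on the syntax tree rather than importing the DAG file keeps the check free of side effects, at the cost of missing settings that are only resolved at runtime.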
| Rule | Severity | Description |
|---|---|---|
| SEN-001 | Warning | Sensor in poke mode (use reschedule) |
| SEN-002 | Warning | Sensor missing timeout |
| SEN-003 | Info | Consider deferrable operator |
| SEN-004 | Warning | External sensor without soft_fail |
| Rule | Severity | Description |
|---|---|---|
| SEC-001 | Error | Hardcoded credentials |
| SEC-002 | Warning | Using Variable.get without default |
| SEC-003 | Warning | Connection ID not parameterized |
| SEC-004 | Info | Consider Secrets Backend |
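A rule like SEC-001 could be approximated by looking for string literals bound to secret-looking names. The sketch below is a rough heuristic under stated assumptions: the name pattern and the function `find_hardcoded_credentials` are hypothetical, and it will both miss secrets with unusual names and flag harmless placeholders.

```python
import ast
import re

# Hypothetical pattern for names that usually hold secrets.
SECRET_NAME = re.compile(r"(password|passwd|secret|token|api_key)", re.IGNORECASE)


def find_hardcoded_credentials(dag_code):
    """Return (name, line) pairs where a secret-looking name gets a string literal."""
    findings = []
    for node in ast.walk(ast.parse(dag_code)):
        if isinstance(node, ast.Assign):
            # Plain assignments such as: db_password = "..."
            names = [t.id for t in node.targets if isinstance(t, ast.Name)]
            value = node.value
        elif isinstance(node, ast.keyword) and node.arg:
            # Keyword arguments such as: SomeHook(api_key="...")
            names = [node.arg]
            value = node.value
        else:
            continue
        is_literal = isinstance(value, ast.Constant) and isinstance(value.value, str)
        if is_literal and value.value:
            findings.extend((n, value.lineno) for n in names if SECRET_NAME.search(n))
    return sorted(findings, key=lambda finding: finding[1])


# SomeHook is a placeholder name; the code is parsed, not executed.
sample = '''
db_password = "hunter2"
conn_id = "warehouse"
hook = SomeHook(api_key="abc123", retries=3)
'''

credential_findings = find_hardcoded_credentials(sample)
print(credential_findings)
```

Values read from Airflow Connections, Variables, or a secrets backend never appear as literals in the source, so they pass this check.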
# Before: Sequential execution
task1 >> task2 >> task3 >> task4
# After: Parallel execution where possible
task1 >> [task2, task3] >> task4
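The gain from this rewrite can be estimated by comparing the longest path through each version of the graph. The sketch below is illustrative: the task durations are invented, `critical_path` is a hypothetical helper, and it assumes enough free worker slots for `task2` and `task3` to actually run at the same time.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def critical_path(durations, upstream):
    """Return (total_seconds, path) for the longest-duration path through the graph."""
    finish = {}      # earliest finish time of each task
    best_prev = {}   # upstream task that determines each task's start
    for task in TopologicalSorter(upstream).static_order():
        deps = upstream.get(task, ())
        prev = max(deps, key=finish.__getitem__, default=None)
        best_prev[task] = prev
        finish[task] = durations[task] + (finish[prev] if prev is not None else 0)
    node = max(finish, key=finish.get)
    total, path = finish[node], []
    while node is not None:
        path.append(node)
        node = best_prev[node]
    return total, path[::-1]


# Made-up durations in seconds, for illustration only.
durations = {"task1": 600, "task2": 900, "task3": 300, "task4": 600}
sequential = {"task1": [], "task2": ["task1"], "task3": ["task2"], "task4": ["task3"]}
parallel = {"task1": [], "task2": ["task1"], "task3": ["task1"], "task4": ["task2", "task3"]}

seq_total, seq_path = critical_path(durations, sequential)
par_total, par_path = critical_path(durations, parallel)
print(seq_total, par_total, par_path)
```

With these numbers the sequential chain takes 2400 seconds and the parallel layout 2100, because `task3` no longer sits on the critical path. The rewrite is only valid when `task3` does not depend on the output of `task2`.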
from airflow.sensors.filesystem import FileSensor

# Before: Poke mode (occupies a worker slot for the entire wait)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='poke',  # Bad
)

# After: Reschedule mode (frees the worker slot between checks)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='reschedule',  # Good
    poke_interval=300,  # seconds between checks
)

# Best: Deferrable. Deferrable operators arrived in Airflow 2.2, but the
# `deferrable` flag on FileSensor needs a newer release; check your version.
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    deferrable=True,
)
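from airflow.utils.task_group import TaskGroup

# Before: Flat task structure
extract_orders >> transform_orders >> load_orders
extract_products >> transform_products >> load_products

# After: TaskGroups for organization.
# extract, transform, and load stand for tasks created inside each group;
# by default their task IDs are prefixed with the group ID (e.g. orders.extract).
with TaskGroup('orders') as orders_group:
    extract >> transform >> load
with TaskGroup('products') as products_group:
    extract >> transform >> load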
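from airflow.decorators import task

# Before: Static task generation (task count fixed when the DAG file is parsed)
for i in range(10):
    PythonOperator(task_id=f'process_{i}', ...)

# After: Dynamic task mapping (Airflow 2.3+), one mapped task instance per item
@task
def process_item(item):
    return item * 2

process_item.expand(item=[1, 2, 3, 4, 5])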
from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=1),
}
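The retry settings above imply a worst-case runtime that is worth checking against the timeout and SLA values. The sketch below uses an idealized model in which the wait simply doubles on each retry and is capped at `max_retry_delay`; Airflow applies its own variation to the computed delay, so treat the numbers as a rough estimate. `idealized_retry_delays` is a hypothetical helper.

```python
from datetime import timedelta


def idealized_retry_delays(retries, retry_delay, max_retry_delay):
    """Wait before each retry if the delay doubles every time, capped at the maximum."""
    return [min(retry_delay * (2 ** attempt), max_retry_delay) for attempt in range(retries)]


retries = 3
delays = idealized_retry_delays(retries, timedelta(minutes=5), timedelta(minutes=30))
execution_timeout = timedelta(hours=2)

# Worst case: the first attempt and every retry all run until the timeout.
worst_case = (retries + 1) * execution_timeout + sum(delays, timedelta())
print([int(d.total_seconds() // 60) for d in delays], worst_case)
# prints: [5, 10, 20] 8:35:00
```

Under this model a task that keeps timing out can hold up its downstream tasks for more than eight hours, far beyond the one-hour `sla`, which is a reason to tune `retries` and `execution_timeout` together rather than separately.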
| Workload Type | Recommended Pool Size |
|---|---|
| Heavy compute | 2-4 per worker |
| I/O bound | 8-16 per worker |
| API calls | Rate limit based |
| Sensors | Separate pool, high slots |
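The per-worker guidance in the table can be turned into concrete slot counts once the worker count is known. The sketch below applies it to the eight-worker cluster from the full-analysis example; the helper names are hypothetical, and classifying `ml_pool` as heavy compute and `default_pool` as I/O bound is an assumption made for illustration.

```python
# Per-worker slot ranges taken from the table above.
PER_WORKER_SLOTS = {"heavy_compute": (2, 4), "io_bound": (8, 16)}


def recommended_pool_range(workload, worker_count):
    """Scale the per-worker range to a total (low, high) slot count."""
    low, high = PER_WORKER_SLOTS[workload]
    return low * worker_count, high * worker_count


def check_pool(slots, workload, worker_count):
    """Classify a pool size as 'below', 'within', or 'above' the recommended range."""
    low, high = recommended_pool_range(workload, worker_count)
    if slots < low:
        return "below"
    if slots > high:
        return "above"
    return "within"


worker_count = 8
print(recommended_pool_range("heavy_compute", worker_count))  # prints: (16, 32)
print(check_pool(32, "heavy_compute", worker_count))          # ml_pool
print(check_pool(128, "io_bound", worker_count))              # default_pool
```

Both pools from the example sit at the upper edge of their ranges: 32 slots is 4 per worker and 128 slots is 16 per worker.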
Related files: etl-elt-pipeline.js, ab-testing-pipeline.js, pipeline-migration.js, data-quality-framework.js