Plan and coordinate multi-step ETL pipelines with dependency management.
Designs and coordinates multi-step ETL pipelines with dependency analysis and parallel execution. Use it to plan complex data workflows, manage step dependencies, and orchestrate pipeline runs with failure handling and state tracking.
```
/plugin marketplace add majesticlabs-dev/majestic-marketplace
/plugin install majestic-data@majestic-marketplace
```

Autonomous agent that designs and coordinates data pipeline execution.
```python
from dataclasses import dataclass
from typing import Set


@dataclass
class PipelineStep:
    name: str
    inputs: Set[str]          # Tables/files this step reads
    outputs: Set[str]         # Tables/files this step produces
    estimated_duration: int   # seconds
    can_parallelize: bool = True


def build_execution_plan(steps: list[PipelineStep]) -> list[list[str]]:
    """Build execution waves based on dependencies."""
    completed = set()
    waves = []
    remaining = {s.name: s for s in steps}
    while remaining:
        # Find steps whose inputs are all satisfied
        ready = []
        for name, step in remaining.items():
            if step.inputs <= completed:
                ready.append(name)
        if not ready:
            raise ValueError("Circular dependency detected")
        waves.append(ready)
        # Mark outputs as completed
        for name in ready:
            completed.update(remaining[name].outputs)
            del remaining[name]
    return waves
```
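As a rough usage sketch, the step definitions below are illustrative and mirror the daily_sales_etl plan further down; extracts are modeled with empty inputs since their sources live outside the pipeline:

```python
steps = [
    PipelineStep("extract_orders", inputs=set(), outputs={"staging/orders.parquet"},
                 estimated_duration=120),
    PipelineStep("extract_customers", inputs=set(), outputs={"staging/customers.parquet"},
                 estimated_duration=60),
    PipelineStep("extract_products", inputs=set(), outputs={"staging/products.parquet"},
                 estimated_duration=30),
    PipelineStep("transform_orders",
                 inputs={"staging/orders.parquet", "staging/customers.parquet"},
                 outputs={"transformed/orders_enriched.parquet"}, estimated_duration=180),
    PipelineStep("aggregate_sales",
                 inputs={"transformed/orders_enriched.parquet", "staging/products.parquet"},
                 outputs={"marts/daily_sales.parquet"}, estimated_duration=90),
    PipelineStep("load_warehouse",
                 inputs={"marts/daily_sales.parquet"},
                 outputs={"snowflake.sales.daily_summary"}, estimated_duration=60),
]

print(build_execution_plan(steps))
# [['extract_orders', 'extract_customers', 'extract_products'],
#  ['transform_orders'], ['aggregate_sales'], ['load_warehouse']]
```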
```yaml
pipeline: daily_sales_etl
generated: 2024-01-15T10:00:00Z

steps:
  - name: extract_orders
    type: extract
    source: postgres.orders
    target: staging/orders.parquet
    estimated_duration: 120s

  - name: extract_customers
    type: extract
    source: postgres.customers
    target: staging/customers.parquet
    estimated_duration: 60s

  - name: extract_products
    type: extract
    source: postgres.products
    target: staging/products.parquet
    estimated_duration: 30s

  - name: transform_orders
    type: transform
    inputs: [staging/orders.parquet, staging/customers.parquet]
    output: transformed/orders_enriched.parquet
    depends_on: [extract_orders, extract_customers]
    estimated_duration: 180s

  - name: aggregate_sales
    type: transform
    inputs: [transformed/orders_enriched.parquet, staging/products.parquet]
    output: marts/daily_sales.parquet
    depends_on: [transform_orders, extract_products]
    estimated_duration: 90s

  - name: load_warehouse
    type: load
    source: marts/daily_sales.parquet
    target: snowflake.sales.daily_summary
    depends_on: [aggregate_sales]
    estimated_duration: 60s

execution_waves:
  - wave: 1
    parallel: true
    steps: [extract_orders, extract_customers, extract_products]
    estimated_duration: 120s
  - wave: 2
    parallel: false
    steps: [transform_orders]
    estimated_duration: 180s
  - wave: 3
    parallel: false
    steps: [aggregate_sales]
    estimated_duration: 90s
  - wave: 4
    parallel: false
    steps: [load_warehouse]
    estimated_duration: 60s

total_estimated_duration: 450s
critical_path: [extract_orders, transform_orders, aggregate_sales, load_warehouse]
```
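The wave timings in the plan follow directly from the step estimates: each wave takes as long as its slowest step, and the total is the sum of the wave maxima. A minimal sketch of that arithmetic, reusing the `steps` list from the earlier example:

```python
def estimate_total_duration(steps: list[PipelineStep], waves: list[list[str]]) -> int:
    """Sum the slowest step of each wave, in seconds."""
    by_name = {s.name: s for s in steps}
    return sum(max(by_name[name].estimated_duration for name in wave) for wave in waves)


waves = build_execution_plan(steps)
print(estimate_total_duration(steps, waves))  # 120 + 180 + 90 + 60 = 450
```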
```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum


class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class PipelineRun:
    run_id: str
    pipeline_name: str
    started_at: datetime
    status: str
    steps: dict[str, dict]  # step_name -> status info

    def to_checkpoint(self) -> dict:
        """Serialize for checkpoint file."""
        return {
            'run_id': self.run_id,
            'pipeline': self.pipeline_name,
            'started_at': self.started_at.isoformat(),
            'status': self.status,
            'steps': self.steps
        }

    @classmethod
    def from_checkpoint(cls, data: dict) -> 'PipelineRun':
        """Resume from checkpoint file."""
        return cls(
            run_id=data['run_id'],
            pipeline_name=data['pipeline'],
            started_at=datetime.fromisoformat(data['started_at']),
            status=data['status'],
            steps=data['steps']
        )
```
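A minimal checkpoint round-trip, assuming checkpoints are written as JSON files; the path and step payload here are illustrative:

```python
import json
from pathlib import Path

run = PipelineRun(
    run_id="run_20240115_100000",
    pipeline_name="daily_sales_etl",
    started_at=datetime(2024, 1, 15, 10, 0, 0),
    status="running",
    steps={"extract_orders": {"status": "running"}},
)

# Persist after each state change...
checkpoint_path = Path("checkpoints/run_20240115_100000.json")
checkpoint_path.write_text(json.dumps(run.to_checkpoint(), indent=2))

# ...and resume later from the same file.
resumed = PipelineRun.from_checkpoint(json.loads(checkpoint_path.read_text()))
```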
```yaml
failure_policies:
  # Retry configuration
  retry:
    max_attempts: 3
    backoff: exponential
    initial_delay: 30s
    max_delay: 300s

  # What to do when a step fails
  on_failure:
    extract_orders:
      action: retry
      fallback: skip_downstream
    transform_orders:
      action: retry
      fallback: fail_pipeline
    load_warehouse:
      action: retry
      fallback: alert_and_continue

  # Alerting
  alerts:
    - on: step_failure
      channels: [slack]
    - on: pipeline_failure
      channels: [pagerduty, email]
```
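A sketch of how the retry block above might be applied around a single step; `run_step` is an assumed callable that raises on failure, and the defaults mirror the policy values:

```python
import time


def run_with_retries(run_step, *, max_attempts=3, initial_delay=30, max_delay=300):
    """Retry a failing step with exponential backoff, per the retry policy above."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return run_step()
        except Exception:
            if attempt == max_attempts:
                raise  # exhausted: the step's on_failure fallback decides what happens next
            time.sleep(delay)
            delay = min(delay * 2, max_delay)  # exponential backoff, capped at max_delay
```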
# Pipeline Execution: daily_sales_etl
**Run ID:** run_20240115_100000
**Started:** 2024-01-15 10:00:00 UTC
**Status:** 🟢 SUCCESS
## Execution Timeline
```
10:00:00 ─┬─ extract_orders ─────────────────────────────── 10:02:05 ✅
          ├─ extract_customers ─────────── 10:00:58 ✅
          └─ extract_products ─── 10:00:28 ✅
10:02:05 ─── transform_orders ─────────────────────────────────────── 10:05:12 ✅
10:05:12 ─── aggregate_sales ─────────────────── 10:06:45 ✅
10:06:45 ─── load_warehouse ─────────── 10:07:42 ✅
```
## Step Details
| Step | Duration | Rows | Status |
|------|----------|------|--------|
| extract_orders | 2m 5s | 52,340 | ✅ |
| extract_customers | 58s | 12,450 | ✅ |
| extract_products | 28s | 3,200 | ✅ |
| transform_orders | 3m 7s | 52,340 | ✅ |
| aggregate_sales | 1m 33s | 365 | ✅ |
| load_warehouse | 57s | 365 | ✅ |
**Total Duration:** 7m 42s
**Total Rows Processed:** 120,695
1. Receive pipeline request
2. Build execution plan
3. Execute waves (sketched below)
4. Report results
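A sketch of the execute-waves step, assuming a `run_step(name)` callable that executes one step and raises on failure; the thread pool is illustrative, any executor works:

```python
from concurrent.futures import ThreadPoolExecutor


def execute_waves(waves: list[list[str]], run_step) -> None:
    """Run each wave's steps in parallel; a wave must finish before the next starts."""
    for wave in waves:
        with ThreadPoolExecutor(max_workers=len(wave)) as pool:
            futures = [pool.submit(run_step, name) for name in wave]
            for future in futures:
                future.result()  # re-raises any step failure, halting later waves
```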
```
Task(subagent_type="majestic-data:research:source-analyzer",
     prompt="Analyze source: postgres.orders for extraction planning")

Task(subagent_type="majestic-data:qa:data-validator",
     prompt="Validate output: marts/daily_sales.parquet")

Task(subagent_type="majestic-data:qa:drift-detector",
     prompt="Compare today's output to yesterday's baseline")
```