harness-claude
Verifies ETL/ELT pipeline quality, data contracts, idempotency, and test coverage across dbt, Airflow, Dagster, and Prefect. Analyzes DAG structure, transformations, and data quality checks for PR reviews and audits.
Install: npx claudepluginhub intense-visions/harness-engineering --plugin harness-claude
This skill uses the workspace's default tool permissions.
Verify ETL/ELT pipeline quality, data contracts, idempotency, and test coverage. Analyzes DAG structure, transformation logic, and data quality checks across dbt, Airflow, Dagster, and Prefect pipelines.
Resolve project root. Use provided path or cwd.
Detect pipeline framework. Scan for framework indicators:
- dbt: dbt_project.yml, profiles.yml, models/ with .sql files, macros/
- Airflow: dags/ directory, files importing from airflow, airflow.cfg
- Dagster: dagster/ directory, files importing from dagster, workspace.yaml
- Prefect: files importing from prefect, prefect.yaml, flows/
- Custom: pipelines/, etl/, src/**/transforms/** without known framework markers

Map DAG structure. For the detected framework:
- dbt: ref() and source() calls to build the model dependency graph
- Airflow: >> operators and set_downstream/set_upstream calls to build task dependencies
- Dagster: @asset decorators and deps parameters to build the asset graph
- Prefect: @flow and @task decorators to build the flow graph

Identify data sources and sinks. Catalog:
Detect configuration. Read pipeline configuration for:
Report detection summary:
Framework: dbt 1.7 + Airflow 2.8
Models: 45 dbt models (12 staging, 18 intermediate, 15 mart)
DAGs: 3 Airflow DAGs (daily-etl, hourly-metrics, weekly-reports)
Sources: 2 PostgreSQL databases, 1 S3 bucket, 1 Stripe API
Sinks: BigQuery (analytics warehouse)
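The dbt branch of the DAG-mapping step can be sketched as a regex pass over raw model SQL. This is a simplification for illustration; a real audit should prefer dbt's compiled manifest.json, which resolves refs authoritatively:

```python
import re
from collections import defaultdict

# {{ ref('model') }} and {{ source('schema', 'table') }} patterns in dbt SQL.
REF_RE = re.compile(r"\{\{\s*ref\(\s*['\"]([\w.]+)['\"]\s*\)\s*\}\}")
SOURCE_RE = re.compile(
    r"\{\{\s*source\(\s*['\"]([\w.]+)['\"]\s*,\s*['\"]([\w.]+)['\"]\s*\)\s*\}\}"
)

def parse_model_deps(models: dict) -> dict:
    """Map each model name to the set of upstream models/sources its SQL references."""
    graph = defaultdict(set)
    for name, sql in models.items():
        graph[name].update(REF_RE.findall(sql))
        graph[name].update(f"source.{s}.{t}" for s, t in SOURCE_RE.findall(sql))
    return dict(graph)
```

Inverting this upstream mapping yields the downstream graph used later for lineage rendering and impact analysis.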
Check idempotency. For each pipeline/model:
- Does the model use INSERT operations without corresponding DELETE or MERGE logic?
- Does incremental materialization declare a proper unique_key?

Check error handling. Evaluate:
- Are retry policies configured? (Airflow: retries, retry_delay; Prefect: retries, retry_delay_seconds)

Check data contracts. Verify schema enforcement:
- At source boundaries (dbt: source tests; custom: schema validation)
- At output boundaries (dbt: contracts; custom: schema assertions)

Check pipeline dependencies. Analyze the DAG for:
Check freshness and SLAs. Evaluate:
- Are freshness checks defined for sources? (dbt: loaded_at_field, warn_after, error_after)
- Are SLAs defined? (Airflow: sla parameter)

Classify findings by severity:
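The INSERT/unique_key idempotency checks above lend themselves to a simple static heuristic. A minimal sketch (the pattern list is illustrative, not exhaustive; dynamic SQL and macros will evade it):

```python
import re

def check_idempotency(sql: str) -> list:
    """Heuristic idempotency findings for a single model or script (a sketch)."""
    findings = []
    has_insert = re.search(r"\binsert\s+into\b", sql, re.I)
    has_merge = re.search(r"\bmerge\s+into\b", sql, re.I)
    has_delete = re.search(r"\bdelete\s+from\b", sql, re.I)
    if has_insert and not (has_merge or has_delete):
        findings.append("INSERT without MERGE or DELETE+INSERT -- re-runs will duplicate rows")
    # Normalize quote style before checking the dbt incremental config.
    normalized = sql.replace('"', "'")
    if "materialized='incremental'" in normalized and "unique_key" not in normalized:
        findings.append("incremental materialization without unique_key")
    return findings
```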
Audit existing data tests. For each framework:
- dbt: schema and data tests (unique, not_null, accepted_values, relationships, custom)
- Dagster: @asset_check decorators and check_specs

Calculate test coverage. Measure:
Check for missing critical tests. Flag models that should have specific tests:
- Models with WHERE clauses: must have tests verifying the filter logic

Validate pipeline testability. Assess:
Check for data quality patterns:
Generate pipeline lineage report. Produce a text-based lineage visualization:
source.stripe.payments
-> stg_payments (staging, view)
-> int_payments_enriched (intermediate, table)
-> mart_revenue_daily (mart, incremental)
-> [exposed to: Looker dashboard, finance API]
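A lineage block like the one above can be produced by a depth-first walk over the downstream graph; a minimal sketch:

```python
def render_lineage(node: str, downstream: dict, depth: int = 0) -> list:
    """Render a node and its downstream consumers as indented '->' lines."""
    prefix = "  " * depth + ("-> " if depth else "")
    lines = [prefix + node]
    for child in downstream.get(node, []):
        lines.extend(render_lineage(child, downstream, depth + 1))
    return lines
```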
Generate quality check report. Summarize test coverage and findings:
Pipeline Quality Report: [PASS/NEEDS_ATTENTION/FAIL]
Models: 45 total
Test coverage: 78% (35/45 models have tests)
Critical gaps: 3 models with zero tests (mart_revenue_daily, stg_users, int_orders)
Data contracts: 12/15 mart models have contracts
Freshness checks: 4/6 sources have freshness monitoring
ERRORS:
[DP-ERR-001] models/marts/mart_revenue_daily.sql
Non-idempotent: uses INSERT without MERGE or DELETE+INSERT pattern
[DP-ERR-002] dags/daily_etl.py
No retry policy: tasks will not retry on transient failures
WARNINGS:
[DP-WARN-001] models/staging/stg_users.sql
Zero tests: no data quality checks on user staging model
[DP-WARN-002] sources.yml
Missing freshness: stripe.payments source has no freshness check
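The overall PASS/NEEDS_ATTENTION/FAIL status can be rolled up from finding counts. The thresholds below are inferred from the sample reports in this document, not a documented specification:

```python
def classify_report(errors: int, warnings: int) -> str:
    """Roll findings up into an overall pipeline quality status (assumed thresholds)."""
    if errors > 1:
        return "FAIL"  # multiple errors require immediate attention
    if errors == 1 or warnings > 0:
        return "NEEDS_ATTENTION"
    return "PASS"
```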
Generate missing documentation. For undocumented models:
- Generate schema.yml entries with inferred column descriptions

Produce remediation checklist. Prioritized list of actions:
Priority 1 (errors):
[ ] Fix mart_revenue_daily to use MERGE for idempotency
[ ] Add retry policy to daily_etl DAG tasks
Priority 2 (warnings):
[ ] Add not_null and unique tests to stg_users
[ ] Add freshness check to stripe.payments source
Priority 3 (info):
[ ] Add column descriptions to 12 undocumented models
[ ] Document the weekly-reports DAG purpose and schedule
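The Priority-1 idempotency fix might take this shape as a dbt incremental model (column names and the lookback window are illustrative; on adapters that support it, incremental_strategy='merge' with a unique_key compiles to a MERGE statement):

```sql
-- models/marts/mart_revenue_daily.sql (illustrative sketch)
{{ config(
    materialized='incremental',
    unique_key='revenue_date',
    incremental_strategy='merge'
) }}

select
    date(created_at) as revenue_date,
    sum(amount)      as total_revenue
from {{ ref('stg_payments') }}
{% if is_incremental() %}
  -- reprocess only recent days; MERGE on revenue_date makes re-runs safe
  where date(created_at) >= date_sub(current_date(), interval 3 day)
{% endif %}
group by 1
```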
- harness skill run harness-data-pipeline -- Primary command for pipeline quality auditing.
- harness validate -- Run after applying pipeline changes to verify project health.
- Glob -- Used to locate DAG files, model definitions, configuration files, and test specifications.
- Grep -- Used to find ref() calls, source() references, operator chains, and test definitions.
- Read -- Used to read pipeline definitions, SQL models, configuration files, and test results.
- Write -- Used to generate documentation stubs, schema.yml entries, and quality reports.
- Bash -- Used to run dbt ls, dbt test --dry-run, or parse DAG structures.
- emit_interaction -- Used to present the quality report and confirm remediation priorities.

Phase 1: DETECT
Framework: dbt 1.7.4 (dbt-bigquery adapter)
Models: 52 (15 staging, 22 intermediate, 15 mart)
Sources: 3 (PostgreSQL replica, Stripe API via Fivetran, Google Sheets)
Target: BigQuery dataset `analytics`
Phase 2: ANALYZE
[DP-ERR-001] models/marts/mart_subscriptions.sql
Incremental model missing unique_key -- will create duplicates on re-run
[DP-WARN-001] 4 sources missing freshness checks
[DP-WARN-002] No retry configuration in dbt Cloud job settings
Phase 3: VALIDATE
Test coverage: 71% (37/52 models)
Critical gaps: mart_revenue (no tests), mart_subscriptions (no uniqueness test)
Primary key coverage: 80% (missing on 3 intermediate models)
Phase 4: DOCUMENT
Generated: lineage report for all 52 models
Generated: schema.yml stubs for 8 undocumented models
Quality Report: NEEDS_ATTENTION (1 error, 4 warnings)
Phase 1: DETECT
Framework: Apache Airflow 2.8.1
DAGs: 5 (s3_ingest_daily, transform_orders, aggregate_metrics, export_reports, cleanup)
Sources: S3 buckets (raw-events, partner-feeds), PostgreSQL
Sinks: Snowflake (ANALYTICS schema), S3 (processed-exports)
Phase 2: ANALYZE
[DP-ERR-001] dags/s3_ingest_daily.py
S3KeySensor has no timeout -- will block the scheduler indefinitely
[DP-ERR-002] dags/transform_orders.py
PythonOperator writes to Snowflake without transaction -- partial writes on failure
[DP-WARN-001] dags/cleanup.py
No SLA defined -- cleanup failures could go unnoticed for days
[DP-INFO-001] All DAGs use default_args but 2 override retries to 0
Phase 3: VALIDATE
DAG validation: all 5 parse without errors
Data validation tasks: present in 3/5 DAGs
Missing: no validation in s3_ingest_daily (raw data accepted without checks)
Phase 4: DOCUMENT
Generated: DAG dependency diagram
Generated: runbook for each DAG with schedule, dependencies, and failure recovery
Quality Report: FAIL (2 errors requiring immediate attention)
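Findings like [DP-INFO-001] above (DAGs overriding retries to 0) can be caught statically before deployment. A minimal sketch using Python's ast module, which inspects DAG source without importing it:

```python
import ast

def find_zero_retries(source: str) -> list:
    """Return line numbers of dict literals that set 'retries' to 0 (e.g. default_args)."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Dict):
            for key, value in zip(node.keys, node.values):
                if (isinstance(key, ast.Constant) and key.value == "retries"
                        and isinstance(value, ast.Constant) and value.value == 0):
                    hits.append(node.lineno)
    return hits
```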
| Rationalization | Reality |
|---|---|
| "The pipeline failed halfway through — we'll just re-run it and it'll pick up where it left off." | A non-idempotent pipeline that is re-run from the middle writes duplicate records for the portion that succeeded before failure. The correct fix is to make the pipeline idempotent (MERGE, upsert, or delete-then-insert) so re-runs are always safe, not to assume partial re-runs are harmless. |
| "The model has no dbt tests yet, but it's only used in one dashboard — low risk." | Every untested model is a silent data quality failure waiting to reach a stakeholder. Revenue and user-facing models require test coverage regardless of how few consumers they have today. The number of consumers grows; the coverage does not add itself retroactively. |
| "We're still figuring out the schema — we'll add data contracts once the model stabilizes." | Contracts are most valuable during schema evolution, not after it. An unstable schema without a contract lets breaking changes propagate undetected to downstream consumers. Add the contract as the model is defined; update it explicitly as the schema changes. That explicitness is the value. |
| "Circular dependency detection is handled by the orchestrator — I don't need to check for it during design." | Orchestrators detect circular dependencies at runtime, after the DAG has been deployed. Static analysis during design catches them before deployment, before the pipeline fails at 3am, and before engineers have to diagnose a graph cycle under pressure. Detect them early. |
| "The freshness check is too strict — it keeps alerting because the upstream source is occasionally delayed. I'll just remove it." | A freshness check that fires too often has the wrong threshold. Removing it means stale data reaches analysts silently. Adjust the warn_after and error_after thresholds to match the source's actual SLA, and escalate if the source cannot meet its own SLA. |
Example edge-case findings:
"Model references {{ var('target_table') }}, which resolves at runtime. Manual lineage documentation is required."
"stripe.payments has 15% null customer_id values. This is a source data quality issue -- coordinate with the data provider."