From majestic-data
Orchestrates production ETL patterns by routing to reliability features (idempotency, checkpointing, retries) and incremental strategies (timestamp loads, CDC, backfills).
Install with:

`npx claudepluginhub majesticlabs-dev/majestic-marketplace --plugin majestic-data`

This skill is limited to using the following tools:
Orchestrator for production-grade Extract-Transform-Load patterns.
Implements core ETL reliability patterns in Python: idempotency (delete-insert, UPSERT, hashing), checkpointing, error handling with failed-record capture, chunking, retries, and logging. For robust data pipelines.
Guides ETL vs ELT choices for data pipelines with comparisons, modern stacks including dbt, transformation patterns, and data quality handling. Use for pipeline design.
Designs data pipelines and ETL processes covering extraction, transformation, loading, data quality checks, orchestration, and patterns for batch, streaming, CDC, ELT. Useful for building pipelines, data flows, syncing, or moving data between systems.
| Need | Skill | Content |
|---|---|---|
| Reliability patterns | etl-core-patterns | Idempotency, checkpointing, error handling, chunking, retry, logging |
| Load strategies | etl-incremental-patterns | Backfill, timestamp-based, CDC, pipeline orchestration |
| Need | Pattern | Skill |
|---|---|---|
| Repeatable runs | Idempotency | etl-core-patterns |
| Resume after failure | Checkpointing | etl-core-patterns |
| Handle bad records | Error handling + DLQ | etl-core-patterns |
| Memory management | Chunked processing | etl-core-patterns |
| Network resilience | Retry with backoff | etl-core-patterns |
| Observability | Structured logging | etl-core-patterns |
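Retry with backoff, listed in the table above, is worth seeing concretely. A minimal sketch of the pattern; the function name and parameters here are illustrative, not the skill's actual API:

```python
import time

def retry_with_backoff(fn, max_attempts=3, base_delay=0.1):
    """Call fn, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # exhausted: surface the error to the caller
            time.sleep(base_delay * 2 ** (attempt - 1))

# A flaky extract that succeeds on the third call.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(retry_with_backoff(flaky))  # "ok" after two retried failures
```

In production you would typically retry only known-transient exception types rather than bare `Exception`, and add jitter to the delay.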
| Scenario | Pattern | Skill |
|---|---|---|
| Small tables (<100K rows) | Full refresh | etl-incremental-patterns |
| Large tables | Timestamp incremental | etl-incremental-patterns |
| Real-time sync | CDC events | etl-incremental-patterns |
| Historical migration | Parallel backfill | etl-incremental-patterns |
| Zero-downtime refresh | Swap pattern | etl-incremental-patterns |
| Multi-step pipelines | Pipeline orchestration | etl-incremental-patterns |
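The swap pattern from the table above can be sketched with SQLite; table and column names are illustrative. Readers always see either the old or the new table, never a half-loaded one:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reports (v INTEGER)")
conn.execute("INSERT INTO reports VALUES (1)")

# Zero-downtime refresh: build the new data in a temp table, then swap names.
conn.execute("CREATE TABLE reports_tmp (v INTEGER)")
conn.execute("INSERT INTO reports_tmp VALUES (2)")
conn.execute("ALTER TABLE reports RENAME TO reports_old")
conn.execute("ALTER TABLE reports_tmp RENAME TO reports")
conn.execute("DROP TABLE reports_old")

print(conn.execute("SELECT v FROM reports").fetchone())  # (2,)
```

On databases with transactional DDL (e.g. PostgreSQL), both renames can go in one transaction so the swap is atomic.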
Idempotency strategy by scenario:

- Small datasets: delete-then-insert
- Large datasets: UPSERT on conflict
- Change detection: row hash comparison
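Row hash comparison, the change-detection strategy above, can be sketched as follows; the helper name and the hashing choices (JSON canonicalization, SHA-256) are assumptions, not the skill's exact implementation:

```python
import hashlib
import json

def row_hash(record: dict) -> str:
    """Stable hash of a record for change detection: identical content
    always hashes the same, so only changed rows need an UPSERT."""
    canonical = json.dumps(record, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()

# Compare an incoming row against the hash stored from the last load.
existing = {"id-1": row_hash({"id": 1, "name": "Ada"})}
incoming = {"id": 1, "name": "Ada Lovelace"}
changed = existing["id-1"] != row_hash(incoming)
print(changed)  # True: the row differs, so it should be written
```

Hash only the business columns (exclude load timestamps and audit fields), otherwise every row looks changed on every run.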
- Is table < 100K rows? → Full refresh
- Has reliable timestamp column? → Timestamp incremental
- Source supports CDC? → CDC event processing
- Need zero downtime? → Swap pattern (temp table → rename)
- One-time historical load? → Parallel backfill with date ranges
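Timestamp incremental, the most common branch above, boils down to "fetch only rows newer than the last watermark". A minimal SQLite-based sketch; the real helper in the skill may have a different signature:

```python
import sqlite3

def incremental_by_timestamp(conn, table, ts_column, watermark):
    """Fetch only rows modified after the last recorded watermark.
    Table/column names are trusted identifiers here, not user input."""
    cur = conn.execute(
        f"SELECT * FROM {table} WHERE {ts_column} > ? ORDER BY {ts_column}",
        (watermark,),
    )
    return cur.fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(1, "2024-01-01"), (2, "2024-02-01"), (3, "2024-03-01")],
)
rows = incremental_by_timestamp(conn, "events", "updated_at", "2024-01-15")
print(rows)  # only the two rows newer than the watermark
```

After a successful load, the new watermark is the maximum timestamp seen, which is exactly what the checkpoint step below records.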
```python
import pandas as pd
# Checkpoint, ETLProcessor, incremental_by_timestamp, and upsert_records
# are the helpers defined by etl-core-patterns / etl-incremental-patterns.

# 1. Setup
checkpoint = Checkpoint('.etl_checkpoint.json')
processor = ETLProcessor()

# 2. Extract (with incremental)
df = incremental_by_timestamp(source_table, 'updated_at')

# 3. Transform (with error handling)
transformed = processor.process_batch(df.to_dict('records'))

# 4. Load (with idempotency)
upsert_records(pd.DataFrame(transformed))

# 5. Checkpoint
checkpoint.set_last_processed('sync', df['updated_at'].max())

# 6. Handle failures
processor.save_failures('failures/')
```
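The pipeline above depends on a `Checkpoint` helper; the real one ships with etl-core-patterns, but a minimal file-backed version might look like this (the class body is an assumption that mirrors the method names in the snippet):

```python
import json
import os
import tempfile

class Checkpoint:
    """Minimal file-backed checkpoint store: persists watermarks to JSON
    so a restarted job can resume where the last run left off."""

    def __init__(self, path):
        self.path = path
        self.state = {}
        if os.path.exists(path):
            with open(path) as f:
                self.state = json.load(f)

    def get_last_processed(self, key, default=None):
        return self.state.get(key, default)

    def set_last_processed(self, key, value):
        self.state[key] = value
        with open(self.path, "w") as f:
            json.dump(self.state, f)  # persist immediately on every update

# Write a watermark, then read it back through a fresh instance,
# simulating a job restart.
path = os.path.join(tempfile.mkdtemp(), ".etl_checkpoint.json")
cp = Checkpoint(path)
cp.set_last_processed("sync", "2024-03-01T00:00:00")
print(Checkpoint(path).get_last_processed("sync"))  # "2024-03-01T00:00:00"
```

A production version would write to a temp file and rename it, so a crash mid-write cannot corrupt the checkpoint.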
- data-validation - Validate data quality during ETL
- data-quality - Monitor data quality metrics
- pandas-coder - DataFrame transformations