Skill

etl-core-patterns

Install
1
Install the plugin
$
npx claudepluginhub majesticlabs-dev/majestic-marketplace --plugin majestic-data

Want just this skill?

Add to a custom plugin, then install with one command.

Description

Core ETL reliability patterns including idempotency, checkpointing, error handling, chunking, retry logic, and logging.

Tool Access

This skill is limited to using the following tools:

Read, Write, Edit, Grep, Glob, Bash
Skill Content

ETL Core Patterns

Reliability patterns for production data pipelines.

Idempotency Patterns

# Pattern 1: Delete-then-insert (simple, works for small datasets)
def load_daily_data(date: str, df: pd.DataFrame) -> None:
    """Replace the ``daily_metrics`` rows for ``date`` with the rows of ``df``.

    Both the delete and the append run on the connection yielded by
    ``engine.begin()``.

    Args:
        date: Value bound to the ``:date`` parameter of the delete.
        df: Rows to append to ``daily_metrics``.
    """
    with engine.begin() as conn:
        # Remove whatever is already stored for this date so that re-running
        # the load does not pile up duplicate rows.
        purge_stmt = text("DELETE FROM daily_metrics WHERE date = :date")
        purge_params = {"date": date}
        conn.execute(purge_stmt, purge_params)

        df.to_sql('daily_metrics', conn, if_exists='append', index=False)

# Pattern 2: UPSERT (better for large datasets)
def upsert_records(df: pd.DataFrame) -> None:
    """Insert the rows of ``df`` into ``MyTable`` in batches of 1000.

    A row whose ``id`` already exists has the columns listed in
    ``update_cols`` overwritten with the incoming values.

    NOTE(review): nothing is committed here; presumably the caller owns the
    session's transaction - confirm.
    """
    rows = df.to_dict('records')
    for batch in chunked(rows, 1000):
        insert_stmt = insert(MyTable).values(batch)
        # ``excluded`` refers to the row that was proposed for insertion.
        overwrite = {col: insert_stmt.excluded[col] for col in update_cols}
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=['id'],
            set_=overwrite,
        )
        session.execute(upsert_stmt)

# Pattern 3: Source hash for change detection
def extract_with_hash(df: pd.DataFrame) -> pd.DataFrame:
    hash_cols = ['id', 'name', 'value', 'updated_at']
    df['_row_hash'] = pd.util.hash_pandas_object(df[hash_cols])
    return df

Checkpointing

import json
from pathlib import Path

class Checkpoint:
    def __init__(self, path: str):
        self.path = Path(path)
        self.state = self._load()

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def save(self) -> None:
        self.path.write_text(json.dumps(self.state, default=str))

    def get_last_processed(self, key: str) -> str | None:
        return self.state.get(key)

    def set_last_processed(self, key: str, value: str) -> None:
        self.state[key] = value
        self.save()

# Usage
# Load any progress saved by an earlier run from the checkpoint file.
checkpoint = Checkpoint('.etl_checkpoint.json')
# None when nothing has been recorded under 'users_sync' yet.
last_id = checkpoint.get_last_processed('users_sync')

for batch in fetch_users_since(last_id):
    process(batch)
    # Record the id of the batch's last record only after process() returned.
    # NOTE(review): batch[-1] assumes fetch_users_since never yields an empty
    # batch - confirm against its implementation.
    checkpoint.set_last_processed('users_sync', batch[-1]['id'])

Error Handling

from dataclasses import dataclass

@dataclass
class FailedRecord:
    """One record that could not be processed, kept for later inspection."""
    source_id: str  # id of the failing record
    error: str  # text of the error that was raised
    raw_data: dict  # the record as it was received
    timestamp: datetime  # when the failure was captured

class ETLProcessor:
    """Transforms records one at a time and collects the ones that fail.

    ``self.transform`` is not defined in this class; it has to be supplied
    elsewhere (presumably by a subclass - verify).
    """

    def __init__(self):
        self.failed_records: list[FailedRecord] = []

    def process_batch(self, records: list[dict]) -> list[dict]:
        """Transform every record, returning only the successful results.

        A record whose transform raises is recorded in ``self.failed_records``
        and skipped, so one bad record does not abort the rest of the batch.
        """
        succeeded = []
        for rec in records:
            try:
                outcome = self.transform(rec)
            except Exception as exc:
                failure = FailedRecord(
                    source_id=rec.get('id', 'unknown'),
                    error=str(exc),
                    raw_data=rec,
                    timestamp=datetime.now(),
                )
                self.failed_records.append(failure)
            else:
                succeeded.append(outcome)
        return succeeded

    def save_failures(self, path: str) -> None:
        """Write the collected failures to a timestamped parquet file under ``path``.

        Does nothing when no failures were collected.
        """
        if not self.failed_records:
            return
        rows = [vars(failure) for failure in self.failed_records]
        failures_df = pd.DataFrame(rows)
        target = f"{path}/failures_{datetime.now():%Y%m%d_%H%M%S}.parquet"
        failures_df.to_parquet(target)

# Dead letter queue pattern
def process_with_dlq(records: list[dict], dlq_table: str) -> None:
    """Process each record, sending any that raise to the dead letter queue.

    Args:
        records: Records to hand to ``process`` one by one.
        dlq_table: Passed to ``save_to_dlq`` as the destination for failures.
    """
    for item in records:
        try:
            process(item)
        except Exception as exc:
            # Keep going: the failed record is parked together with the error
            # text instead of aborting the remaining records.
            failure_reason = str(exc)
            save_to_dlq(dlq_table, item, failure_reason)

Chunked Processing

from typing import Iterator, TypeVar

T = TypeVar('T')

def chunked(iterable: Iterator[T], size: int) -> Iterator[list[T]]:
    """Group the items of ``iterable`` into consecutive lists.

    A list is emitted as soon as it holds at least ``size`` items; whatever is
    left over when the input runs out is emitted as a final, shorter list.
    An empty input yields nothing.
    """
    pending: list[T] = []
    for element in iterable:
        pending.append(element)
        if len(pending) < size:
            continue
        yield pending
        # Start a fresh list so the one just handed out is never mutated.
        pending = []
    if pending:
        yield pending

# Memory-efficient file processing
def process_large_csv(path: str, chunk_size: int = 50_000) -> None:
    """Run the CSV at ``path`` through transform and load, chunk by chunk.

    Args:
        path: CSV file to read.
        chunk_size: Number of rows pandas reads per chunk.
    """
    reader = pd.read_csv(path, chunksize=chunk_size)
    for chunk_no, frame in enumerate(reader):
        print(f"Processing chunk {chunk_no}: {len(frame)} rows")
        result = transform(frame)
        load(result, mode='append')
        # Drop both frames and force a collection before reading the next
        # chunk.
        del frame, result
        gc.collect()

Retry Logic

import time
from functools import wraps

def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple = (Exception,),
):
    """Decorator for retrying failed operations.

    Args:
        max_attempts: Total number of calls to make before giving up. Must be
            at least 1.
        delay: Seconds to sleep after the first failure.
        backoff: Factor the sleep time is multiplied by after each failure.
        exceptions: Exception types that trigger a retry. Anything else
            propagates immediately. Defaults to every ``Exception``.

    Returns:
        A decorator whose wrapped function returns the first successful
        result, or re-raises the exception of the final attempt.

    Raises:
        ValueError: If ``max_attempts`` is below 1. Without this check the
            wrapped function would never be called and the wrapper would end
            in ``raise None``, which surfaces as an unrelated ``TypeError``.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        # Out of attempts: surface the last failure with its
                        # original traceback.
                        raise
                    print(f"Attempt {attempt} failed: {e}. Retrying in {current_delay}s")
                    time.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator

@retry(max_attempts=3, delay=1.0, backoff=2.0)
def fetch_from_api(url: str) -> dict:
    """GET ``url`` with a 30 second timeout and return the decoded JSON body.

    ``raise_for_status`` is called on the response before decoding, so an
    error status raises instead of returning a body. Any exception is handled
    by the ``retry`` decorator.
    """
    reply = requests.get(url, timeout=30)
    reply.raise_for_status()
    payload = reply.json()
    return payload

Logging Best Practices

import structlog

logger = structlog.get_logger()

def process_with_logging(batch_id: str, records: list[dict]) -> None:
    """Process ``records`` and emit start, completion and failure log events.

    Every event carries ``batch_id`` and ``record_count`` through the bound
    logger. A failure is logged and then re-raised to the caller.
    """
    record_total = len(records)
    log = logger.bind(batch_id=batch_id, record_count=record_total)
    log.info("batch_started")

    try:
        outcome = process(records)
        # Logged inside the try so that a failure while reading the result is
        # reported as "batch_failed" as well.
        log.info(
            "batch_completed",
            processed=outcome.processed_count,
            failed=outcome.failed_count,
        )
    except Exception as exc:
        log.error("batch_failed", error=str(exc))
        raise

Data Migration Scripts

Create production-safe migration scripts with rollback and validation.

Migration Template (PostgreSQL)

-- Migration: 20240115_add_customer_tier
-- Description: Add column and backfill from order history
--
-- Run this script in autocommit mode (for example plain psql, without
-- --single-transaction). The backfill DO block commits after every batch, and
-- PostgreSQL rejects COMMIT inside a DO block that was started within an
-- explicit transaction block.

-- PRE-MIGRATION CHECKS
-- Abort early when there is nothing to migrate.
DO $$
BEGIN
    IF (SELECT COUNT(*) FROM customers) = 0 THEN
        RAISE EXCEPTION 'No customers found - aborting migration';
    END IF;
END $$;

-- Snapshot used by the post-migration validation at the end of this script.
CREATE TEMP TABLE migration_baseline AS
SELECT COUNT(*) as total_customers, NOW() as snapshot_time
FROM customers;

-- FORWARD MIGRATION

-- Step 1: add the column as nullable; values are filled in by the backfill.
ALTER TABLE customers ADD COLUMN IF NOT EXISTS tier VARCHAR(20);

-- Step 2: batch backfill, one transaction per batch.
DO $$
DECLARE
    batch_size INT := 10000;
    affected INT;
BEGIN
    LOOP
        -- order_counts LEFT JOINs from the batch so that customers without
        -- any orders still get a row (total_orders = 0) and are set to
        -- 'bronze'. An inner join would leave them NULL forever, and a batch
        -- made up only of such customers would update 0 rows and end the
        -- loop while other customers were still waiting for a tier.
        WITH batch AS (
            SELECT id FROM customers
            WHERE tier IS NULL
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED
        ),
        order_counts AS (
            SELECT b.id, COUNT(o.customer_id) AS total_orders
            FROM batch b
            LEFT JOIN orders o ON o.customer_id = b.id
            GROUP BY b.id
        )
        UPDATE customers c
        SET tier = CASE
            WHEN oc.total_orders >= 100 THEN 'platinum'
            WHEN oc.total_orders >= 50 THEN 'gold'
            WHEN oc.total_orders >= 10 THEN 'silver'
            ELSE 'bronze'
        END
        FROM order_counts oc
        WHERE c.id = oc.id;

        GET DIAGNOSTICS affected = ROW_COUNT;
        EXIT WHEN affected = 0;
        -- End this batch's transaction. PL/pgSQL starts the next one
        -- automatically; "BEGIN;" is not a transaction statement here.
        COMMIT;
    END LOOP;
END $$;

-- Step 3: default and constraint, applied together in one transaction.
BEGIN;
ALTER TABLE customers ALTER COLUMN tier SET DEFAULT 'bronze';
ALTER TABLE customers ADD CONSTRAINT chk_tier
    CHECK (tier IN ('bronze', 'silver', 'gold', 'platinum'));
COMMIT;

-- POST-MIGRATION VALIDATION
DO $$
DECLARE
    before_count INT;
    after_count INT;
    missing_tier INT;
BEGIN
    SELECT total_customers INTO before_count FROM migration_baseline;
    SELECT COUNT(*) INTO after_count FROM customers;
    IF before_count != after_count THEN
        RAISE EXCEPTION 'Row count mismatch: before=%, after=%', before_count, after_count;
    END IF;

    -- Rows skipped by SKIP LOCKED during the backfill would still be NULL,
    -- and the CHECK constraint does not reject NULL, so look for them here.
    SELECT COUNT(*) INTO missing_tier FROM customers WHERE tier IS NULL;
    IF missing_tier > 0 THEN
        RAISE EXCEPTION 'Backfill incomplete: % customers have no tier', missing_tier;
    END IF;
END $$;

Rollback Script

-- Removes the chk_tier constraint, the tier default and the tier column from
-- customers, all inside one transaction.
BEGIN;
-- IF EXISTS lets this statement succeed even when the constraint is absent.
ALTER TABLE customers DROP CONSTRAINT IF EXISTS chk_tier;
ALTER TABLE customers ALTER COLUMN tier DROP DEFAULT;
-- Dropping the column discards every value stored in it.
ALTER TABLE customers DROP COLUMN IF EXISTS tier;
COMMIT;

Python Migration Framework

@dataclass
class Migration:
    """A migration: its identity plus the callables that apply, revert and check it."""
    id: str
    description: str
    up: Callable  # applies the migration; MigrationRunner.run calls up(conn)
    down: Callable  # reverts it; MigrationRunner.run calls down(conn) on failure
    validate: Callable  # called as validate(conn, baseline); a falsy result fails the run

class MigrationRunner:
    """Applies a ``Migration`` on a connection obtained from ``engine.begin()``.

    ``_capture_baseline`` is not defined in this class; it has to be provided
    elsewhere (presumably by a subclass - verify).
    """

    def __init__(self, engine):
        self.engine = engine

    def run(self, migration: Migration, dry_run: bool = False) -> bool:
        """Apply ``migration`` and validate it, reverting on any failure.

        Args:
            migration: The migration to apply.
            dry_run: When true, only the baseline is captured and ``True`` is
                returned without calling ``up``.

        Returns:
            ``True`` when the run completed (or was a dry run).

        Raises:
            ValueError: If ``migration.validate`` returns a falsy value.
            Exception: Whatever ``up`` or ``validate`` raised, after ``down``
                has been called.
        """
        with self.engine.begin() as conn:
            baseline = self._capture_baseline(conn)
            if dry_run:
                return True

            try:
                migration.up(conn)
                validated = migration.validate(conn, baseline)
                if validated:
                    return True
                raise ValueError("Validation failed")
            except Exception:
                # NOTE(review): down() runs on the same connection as up(),
                # and the re-raise below leaves the engine.begin() block with
                # an exception - confirm how that interacts with the
                # transaction on the target database.
                migration.down(conn)
                raise

Safe Migration Patterns

-- In the templates below, t, TYPE, NEWTYPE, compute_value() and the column
-- names are placeholders to be replaced with real identifiers.

-- Adding a column: nullable first, backfill, then constraint
ALTER TABLE t ADD COLUMN new_col TYPE;
UPDATE t SET new_col = compute_value();
-- NOTE(review): expected to fail if any row still has NULL in new_col;
-- confirm the backfill covered every row first.
ALTER TABLE t ALTER COLUMN new_col SET NOT NULL;

-- Renaming a column: add new, copy, drop old
ALTER TABLE t ADD COLUMN new_name TYPE;
UPDATE t SET new_name = old_name;
-- NOTE(review): presumably readers and writers must already use new_name
-- before the old column is dropped - confirm with the application rollout.
ALTER TABLE t DROP COLUMN old_name;

-- Changing column type: add new, migrate, swap
ALTER TABLE t ADD COLUMN col_new NEWTYPE;
-- Each existing value is cast to the new type while being copied.
UPDATE t SET col_new = col::NEWTYPE;
ALTER TABLE t DROP COLUMN col;
ALTER TABLE t RENAME COLUMN col_new TO col;

-- Large table batch updates
-- Run outside an explicit transaction block: the per-batch COMMIT below is
-- only permitted when the DO block is invoked at the top level.
-- The SET list (left as "..." here) must make updated rows stop matching
-- "needs_update = true"; otherwise the same rows are picked again and the
-- loop never terminates.
DO $$
DECLARE
    batch_size INT := 10000;
    rows_affected INT;  -- was used below without being declared
    total_updated INT := 0;
BEGIN
    LOOP
        WITH batch AS (
            SELECT id FROM large_table
            WHERE needs_update = true
            LIMIT batch_size
        )
        UPDATE large_table SET ...
        WHERE id IN (SELECT id FROM batch);
        GET DIAGNOSTICS rows_affected = ROW_COUNT;
        total_updated := total_updated + rows_affected;
        EXIT WHEN rows_affected = 0;
        -- Commit each batch so its row locks are released before the pause.
        -- Without this the whole loop runs as a single transaction.
        COMMIT;
        PERFORM pg_sleep(0.1);
    END LOOP;
    RAISE NOTICE 'Updated % rows in total', total_updated;
END $$;

Migration Safety Checklist

Before: tested on staging, rollback tested, backup taken, maintenance window scheduled. After: validation queries passed, application health checked, performance normal.

Stats
Stars: 30
Forks: 6
Last Commit: Mar 15, 2026
Actions

Similar Skills