Install
1
Install the plugin:
$ npx claudepluginhub majesticlabs-dev/majestic-marketplace --plugin majestic-data
Want just this skill?
Add to a custom plugin, then install with one command.
Description
Incremental data loading patterns including backfill strategies, CDC, timestamp-based loads, and pipeline orchestration.
Tool Access
This skill is limited to using the following tools:
Read Write Edit Grep Glob Bash
Skill Content
ETL Incremental Patterns
Patterns for incremental data loading and backfill operations.
Backfill Strategy
from datetime import date, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
def backfill_date_range(
    start: date,
    end: date,
    process_fn: callable,
    parallel: int = 4
) -> None:
    """Run ``process_fn`` once for every day from ``start`` through ``end``.

    Both endpoints are included. Days are submitted to a thread pool with at
    most ``parallel`` workers. A day whose call raises is reported on stdout
    and does not prevent the remaining days from being processed.

    Args:
        start: First day to process.
        end: Last day to process (inclusive).
        process_fn: Callable invoked with a single day.
        parallel: Maximum number of worker threads.
    """
    # An inverted range (start after end) yields a non-positive count, and
    # range() of a non-positive number is empty, so no day is scheduled.
    day_count = (end - start).days + 1
    days = [start + timedelta(days=offset) for offset in range(day_count)]

    with ThreadPoolExecutor(max_workers=parallel) as pool:
        # Map each future back to its day so results can be labelled.
        pending = {}
        for day in days:
            pending[pool.submit(process_fn, day)] = day

        # Report in completion order, not in calendar order.
        for finished in as_completed(pending):
            day = pending[finished]
            try:
                finished.result()  # re-raises whatever process_fn raised
                print(f"Completed: {day}")
            except Exception as exc:
                print(f"Failed: {day} - {exc}")
# Usage: backfill every day from 2024-01-01 through 2024-03-31 (both ends
# included), with at most four days being processed concurrently.
# NOTE(review): process_daily_data is not defined in this snippet; presumably
# it is a callable that accepts a single date. Confirm against the caller.
backfill_date_range(
    start=date(2024, 1, 1),
    end=date(2024, 3, 31),
    process_fn=process_daily_data,
    parallel=4
)
Incremental Load Patterns
Timestamp-Based Incremental
def incremental_by_timestamp(table: str, timestamp_col: str) -> pd.DataFrame:
    """Load the rows of ``table`` whose ``timestamp_col`` is newer than the last run.

    The previous watermark is read with ``get_last_run_timestamp(table)``.
    When rows are returned, the watermark is advanced to the largest
    ``timestamp_col`` value in the result via ``set_last_run_timestamp``.

    Args:
        table: Name of the source table. Interpolated into the SQL text.
        timestamp_col: Name of the change-timestamp column. Interpolated into
            the SQL text.

    Returns:
        The new rows ordered by ``timestamp_col``; empty when nothing changed.
    """
    # Function-scope import so the block does not depend on the (unseen)
    # top-of-file imports; the file already uses SQLAlchemy's text() elsewhere.
    from sqlalchemy import text

    last_run = get_last_run_timestamp(table)
    # NOTE(review): if the helper returns None before the first run, the
    # comparison against NULL matches no rows. Confirm it returns an initial
    # watermark.

    # Identifiers cannot be bound parameters, so table and column names are
    # interpolated. They must come from trusted configuration, not user input.
    #
    # Wrapping the statement in text() makes the ":last_run" named bind work
    # on every driver. A plain string is handed to the DBAPI driver as-is, and
    # the driver's own paramstyle (e.g. "%(name)s" for psycopg2) would apply.
    #
    # NOTE(review): the strict ">" skips rows that share the watermark
    # timestamp but are written after this load. Confirm that is acceptable.
    query = text(f"""
        SELECT * FROM {table}
        WHERE {timestamp_col} > :last_run
        ORDER BY {timestamp_col}
    """)
    df = pd.read_sql(query, engine, params={'last_run': last_run})

    # Only move the watermark when something was actually loaded.
    if not df.empty:
        set_last_run_timestamp(table, df[timestamp_col].max())
    return df
Change Data Capture (CDC)
def process_cdc_events(events: list[dict]) -> None:
    """Apply a batch of change-data-capture events in the order given.

    Each event is a dict with an ``'operation'`` key (INSERT, UPDATE or
    DELETE) and a ``'data'`` key holding the row payload. A DELETE is passed
    to ``soft_delete`` using the payload's ``'id'``. Every other operation is
    passed to ``upsert`` with the whole payload.
    """
    for change in events:
        # Read both keys up front so a malformed event fails before any write.
        operation = change['operation']
        payload = change['data']

        if operation != 'DELETE':
            upsert(payload)
            continue

        soft_delete(payload['id'])
Full Refresh with Swap
def full_refresh_with_swap(df: pd.DataFrame, table: str) -> None:
    """Replace the contents of ``table`` with ``df`` by swapping table names.

    ``df`` is first written to ``<table>_temp`` (replacing any existing table
    of that name). Then, inside a single ``engine.begin()`` block, the current
    table is renamed to ``<table>_old``, the temp table is renamed to
    ``table``, and the old table is dropped.

    NOTE(review): the first rename presumably fails if ``table`` does not
    exist yet. Confirm the target is always created before the first refresh.
    """
    staging_name = f"{table}_temp"
    retired_name = f"{table}_old"

    # Load the new data outside the swap so the rename step stays short.
    df.to_sql(staging_name, engine, if_exists='replace', index=False)

    # Order matters: clear any leftover *_old, retire the live table, promote
    # the staging table, then discard the retired copy.
    swap_statements = (
        f"DROP TABLE IF EXISTS {retired_name}",
        f"ALTER TABLE {table} RENAME TO {retired_name}",
        f"ALTER TABLE {staging_name} RENAME TO {table}",
        f"DROP TABLE {retired_name}",
    )
    with engine.begin() as conn:
        for statement in swap_statements:
            conn.execute(text(statement))
Pipeline Orchestration
from enum import Enum
from dataclasses import dataclass, field
class StepStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class PipelineStep:
name: str
func: callable
dependencies: list[str] = field(default_factory=list)
status: StepStatus = StepStatus.PENDING
error: str | None = None
class Pipeline:
def __init__(self, name: str):
self.name = name
self.steps: dict[str, PipelineStep] = {}
def add_step(self, name: str, func: callable, depends_on: list[str] = None):
self.steps[name] = PipelineStep(name, func, depends_on or [])
def run(self) -> bool:
for step in self._topological_sort():
# Skip if dependencies failed
if any(self.steps[d].status == StepStatus.FAILED for d in step.dependencies):
step.status = StepStatus.SKIPPED
continue
step.status = StepStatus.RUNNING
try:
step.func()
step.status = StepStatus.SUCCESS
except Exception as e:
step.status = StepStatus.FAILED
step.error = str(e)
return all(s.status == StepStatus.SUCCESS for s in self.steps.values())
def _topological_sort(self) -> list[PipelineStep]:
# Implementation of topological sort for dependency ordering
...
Load Strategy Decision Matrix
| Scenario | Pattern | When to Use |
|---|---|---|
| Small tables (<100K rows) | Full refresh | Daily/hourly loads |
| Large tables with timestamps | Timestamp incremental | Continuous sync |
| Source supports CDC | CDC events | Real-time updates |
| One-time historical load | Parallel backfill | Initial migration |
| Critical tables | Swap pattern | Zero-downtime refresh |
Stats
Stars: 30
Forks: 6
Last Commit: Feb 15, 2026
Actions