From claude-code-toolkit

Provides Python ETL pipeline patterns: extract-transform-load classes, PySpark jobs for processing S3 data into analytics tables, data quality checks, and a star-schema warehouse layout.

Install: `npx claudepluginhub rohitg00/awesome-claude-code-toolkit`

This skill uses the workspace's default tool permissions.
```python
from datetime import datetime
from dataclasses import dataclass


@dataclass
class PipelineResult:
    records_extracted: int
    records_transformed: int
    records_loaded: int
    errors: list[str]
    duration_seconds: float


def get_fiscal_quarter(ts: datetime) -> str:
    # Calendar-aligned quarter, e.g. "2024-Q2"; shift months if your
    # fiscal year is offset from the calendar year.
    return f"{ts.year}-Q{(ts.month - 1) // 3 + 1}"


class OrderPipeline:
    def __init__(self, source_db, warehouse_db):
        self.source = source_db
        self.warehouse = warehouse_db

    def extract(self, since: datetime) -> list[dict]:
        # Incremental extract: only rows updated after the watermark.
        query = """
            SELECT o.*, c.name AS customer_name, c.segment
            FROM orders o
            JOIN customers c ON o.customer_id = c.id
            WHERE o.updated_at > %s
        """
        return self.source.fetch_all(query, [since])

    def transform(self, records: list[dict]) -> list[dict]:
        transformed = []
        for record in records:
            transformed.append({
                "order_id": record["id"],
                "customer_name": record["customer_name"],
                "segment": record["segment"].upper(),
                "total_amount": float(record["total"]),
                "order_date": record["created_at"].date(),
                "fiscal_quarter": get_fiscal_quarter(record["created_at"]),
                "is_high_value": float(record["total"]) > 1000,
                "loaded_at": datetime.utcnow(),
            })
        return transformed

    def load(self, records: list[dict]) -> int:
        # Idempotent upsert keyed on order_id, so reruns don't duplicate rows.
        return self.warehouse.upsert_batch(
            table="fact_orders",
            records=records,
            conflict_keys=["order_id"],
            batch_size=5000,
        )

    def run(self, since: datetime) -> PipelineResult:
        start = datetime.utcnow()
        raw = self.extract(since)
        clean = self.transform(raw)
        loaded = self.load(clean)
        return PipelineResult(
            records_extracted=len(raw),
            records_transformed=len(clean),
            records_loaded=loaded,
            errors=[],
            duration_seconds=(datetime.utcnow() - start).total_seconds(),
        )
```
The same analytics tables can be built at scale with PySpark. This job aggregates completed orders from the S3 data lake into daily revenue per product category, with a trailing 7-day average:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("sales-analytics") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

orders = spark.read.parquet("s3://data-lake/orders/")
customers = spark.read.parquet("s3://data-lake/customers/")

daily_revenue = (
    orders
    .filter(F.col("status") == "completed")
    .withColumn("order_date", F.to_date("created_at"))
    .groupBy("order_date", "product_category")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.count("id").alias("order_count"),
        F.avg("total_amount").alias("avg_order_value"),
    )
    # Rows-based trailing window: current row plus the 6 preceding rows per
    # category. After the groupBy there is one row per category per day, so
    # this approximates a 7-day average; days with no orders are skipped.
    .withColumn(
        "revenue_7d_avg",
        F.avg("revenue").over(
            Window.partitionBy("product_category")
            .orderBy("order_date")
            .rowsBetween(-6, 0)
        )
    )
)

daily_revenue.write \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .parquet("s3://data-warehouse/daily_revenue/")
```
Quality checks run against the warehouse after each load. Each check is a scalar query compared to a threshold; critical failures abort the run:

```python
from dataclasses import dataclass


class DataQualityError(Exception):
    """Raised when a critical data quality check fails."""


@dataclass
class QualityCheck:
    name: str
    query: str
    threshold: float
    severity: str


CHECKS = [
    QualityCheck(
        name="null_customer_ids",
        query="SELECT COUNT(*) FROM fact_orders WHERE customer_id IS NULL",
        threshold=0,
        severity="critical",
    ),
    QualityCheck(
        name="negative_amounts",
        query="SELECT COUNT(*) FROM fact_orders WHERE total_amount < 0",
        threshold=0,
        severity="critical",
    ),
    QualityCheck(
        name="duplicate_orders",
        query="SELECT COUNT(*) - COUNT(DISTINCT order_id) FROM fact_orders",
        threshold=0,
        severity="warning",
    ),
    QualityCheck(
        # Hours since the most recent load; flags a stale table.
        name="freshness",
        query="SELECT EXTRACT(EPOCH FROM NOW() - MAX(loaded_at))/3600 FROM fact_orders",
        threshold=2.0,
        severity="warning",
    ),
]


def run_quality_checks(db, checks: list[QualityCheck]) -> list[dict]:
    results = []
    for check in checks:
        value = db.fetch_scalar(check.query)
        passed = value <= check.threshold
        results.append({
            "name": check.name,
            "value": value,
            "threshold": check.threshold,
            "passed": passed,
            "severity": check.severity,
        })
        if not passed and check.severity == "critical":
            raise DataQualityError(f"Critical check failed: {check.name} = {value}")
    return results
```
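The checks slot in directly after a pipeline run. A sketch, assuming the warehouse client exposes the `fetch_scalar` method used above and that `last_watermark` is tracked externally (both assumptions):

```python
result = pipeline.run(since=last_watermark)  # last_watermark: assumed external state
report = run_quality_checks(pipeline.warehouse, CHECKS)  # raises on critical failures

for check in report:
    status = "PASS" if check["passed"] else "FAIL"
    print(f"[{status}] {check['name']}: {check['value']} (threshold {check['threshold']})")
```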
The target warehouse uses a star schema: two dimension tables and one fact table. `dim_customers` is modeled as a Type 2 slowly changing dimension, so a segment or country change creates a new row instead of overwriting history:

```sql
CREATE TABLE dim_customers (
    customer_key BIGINT PRIMARY KEY,
    customer_id  VARCHAR(50) NOT NULL,
    name         VARCHAR(200),
    segment      VARCHAR(50),
    country      VARCHAR(100),
    valid_from   TIMESTAMP NOT NULL,
    valid_to     TIMESTAMP,            -- NULL while the row is current
    is_current   BOOLEAN DEFAULT TRUE
);

CREATE TABLE dim_products (
    product_key BIGINT PRIMARY KEY,
    product_id  VARCHAR(50) NOT NULL,
    name        VARCHAR(200),
    category    VARCHAR(100),
    subcategory VARCHAR(100)
);

CREATE TABLE fact_orders (
    order_key      BIGINT PRIMARY KEY,
    order_id       VARCHAR(50) UNIQUE NOT NULL,
    customer_key   BIGINT REFERENCES dim_customers(customer_key),
    product_key    BIGINT REFERENCES dim_products(product_key),
    order_date_key INT,
    quantity       INT,
    unit_price     DECIMAL(10,2),
    total_amount   DECIMAL(12,2),
    loaded_at      TIMESTAMP DEFAULT NOW()
);
```
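Loading `dim_customers` as a Type 2 dimension is a two-step flow: expire the current row, then insert the new version. A minimal sketch against the schema above, assuming a psycopg-style parameter syntax and a warehouse client whose `execute` method is hypothetical here:

```python
from datetime import datetime


def upsert_scd2_customer(warehouse, customer: dict) -> None:
    """Close out the current dimension row, then insert the new version.

    Sketch only: `warehouse.execute` is an assumed method, and surrogate
    key generation for customer_key is left to the caller or a sequence.
    """
    now = datetime.utcnow()

    # Step 1: expire the currently active row for this business key.
    warehouse.execute(
        """
        UPDATE dim_customers
        SET valid_to = %s, is_current = FALSE
        WHERE customer_id = %s AND is_current
        """,
        [now, customer["customer_id"]],
    )

    # Step 2: insert the new version as the current row.
    warehouse.execute(
        """
        INSERT INTO dim_customers
            (customer_key, customer_id, name, segment, country, valid_from, is_current)
        VALUES (%s, %s, %s, %s, %s, %s, TRUE)
        """,
        [
            customer["customer_key"],
            customer["customer_id"],
            customer["name"],
            customer["segment"],
            customer["country"],
            now,
        ],
    )
```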