Help us improve
Share bugs, ideas, or general feedback.
From claude-code-toolkit
Provides data engineering patterns for ETL pipelines, data warehousing, Apache Spark, and data quality validation.
npx claudepluginhub rohitg00/awesome-claude-code-toolkit
How this skill is triggered — by the user, by Claude, or both
Slash command
/claude-code-toolkit:data-engineering
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
```python
Designs scalable data pipelines for batch and streaming processing. Covers ETL/ELT, Lambda, Kappa, Lakehouse architectures, orchestration (Airflow/Prefect), dbt transformations, and data quality frameworks.
Designs data pipelines and ETL processes covering extraction, transformation, loading, data quality checks, orchestration, and patterns for batch, streaming, CDC, ELT. Useful for building pipelines, data flows, syncing, or moving data between systems.
Design batch and streaming data pipelines. Plan ingestion, transformation, quality checks, and failure recovery. Use when building ETL/ELT systems or data infrastructure.
Share bugs, ideas, or general feedback.
from dataclasses import dataclass
from datetime import datetime, timezone
@dataclass
class PipelineResult:
    """Summary of a single pipeline run."""

    # Number of rows returned by the extract step.
    records_extracted: int
    # Number of rows produced by the transform step.
    records_transformed: int
    # Value reported by the load step for rows written to the warehouse.
    records_loaded: int
    # Error messages collected during the run (empty when nothing was recorded).
    errors: list[str]
    # Elapsed time of the whole run, in seconds.
    duration_seconds: float
class OrderPipeline:
    """Incremental extract/transform/load of orders into ``fact_orders``.

    Orders updated after a given timestamp are read from the source
    database together with their customer's name and segment, reshaped
    into warehouse rows, and upserted keyed on ``order_id``.
    """

    def __init__(self, source_db, warehouse_db):
        """Keep the two database handles used by the pipeline.

        Args:
            source_db: Object exposing ``fetch_all(query, params)``.
            warehouse_db: Object exposing ``upsert_batch(table=...,
                records=..., conflict_keys=..., batch_size=...)``.
        """
        self.source = source_db
        self.warehouse = warehouse_db

    def extract(self, since: datetime) -> list[dict]:
        """Fetch orders whose ``updated_at`` is later than ``since``.

        Args:
            since: Lower bound (exclusive) on ``orders.updated_at``.

        Returns:
            Whatever ``source.fetch_all`` returns for the query; each row
            carries the order columns plus ``customer_name`` and ``segment``.
        """
        # The timestamp is bound as a parameter, never interpolated.
        query = (
            "\n"
            "SELECT o.*, c.name as customer_name, c.segment\n"
            "FROM orders o\n"
            "JOIN customers c ON o.customer_id = c.id\n"
            "WHERE o.updated_at > %s\n"
        )
        return self.source.fetch_all(query, [since])

    def transform(self, records: list[dict]) -> list[dict]:
        """Reshape extracted rows into ``fact_orders`` rows.

        Args:
            records: Rows as returned by :meth:`extract`.

        Returns:
            One dict per input row, in the same order.
        """
        # One timestamp for the whole batch so every row of a run shares the
        # same ``loaded_at``. It is kept naive-UTC (what ``utcnow()`` used to
        # produce) without calling the deprecated ``datetime.utcnow()``.
        loaded_at = datetime.now(timezone.utc).replace(tzinfo=None)

        transformed = []
        for record in records:
            total = float(record["total"])  # converted once, used twice below
            segment = record["segment"]
            created_at = record["created_at"]
            transformed.append({
                "order_id": record["id"],
                "customer_name": record["customer_name"],
                # A customer without a segment must not abort the batch.
                "segment": segment.upper() if segment is not None else None,
                "total_amount": total,
                "order_date": created_at.date(),
                "fiscal_quarter": get_fiscal_quarter(created_at),
                "is_high_value": total > 1000,
                "loaded_at": loaded_at,
            })
        return transformed

    def load(self, records: list[dict]) -> int:
        """Upsert rows into ``fact_orders`` keyed on ``order_id``.

        Args:
            records: Rows as produced by :meth:`transform`.

        Returns:
            The value returned by ``warehouse.upsert_batch`` (presumably the
            number of rows written; confirm against the warehouse client).
        """
        return self.warehouse.upsert_batch(
            table="fact_orders",
            records=records,
            conflict_keys=["order_id"],
            batch_size=5000,
        )

    def run(self, since: datetime) -> PipelineResult:
        """Run extract, transform and load in order and report the outcome.

        Any exception raised by a step propagates to the caller, so the
        ``errors`` list of a returned result is always empty.

        Args:
            since: Passed through to :meth:`extract`.

        Returns:
            Row counts for each step plus the elapsed time in seconds.
        """
        start = datetime.now(timezone.utc)
        raw = self.extract(since)
        clean = self.transform(raw)
        loaded = self.load(clean)
        elapsed = datetime.now(timezone.utc) - start
        return PipelineResult(
            records_extracted=len(raw),
            records_transformed=len(clean),
            records_loaded=loaded,
            errors=[],
            duration_seconds=elapsed.total_seconds(),
        )
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Batch job: aggregate completed orders into daily revenue per product
# category and write the result to the warehouse as Parquet.
spark = (
    SparkSession.builder
    .appName("sales-analytics")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

orders = spark.read.parquet("s3://data-lake/orders/")
# Read but not used by the aggregation below; kept because it is a
# module-level name other code may rely on.
customers = spark.read.parquet("s3://data-lake/customers/")

daily_revenue = (
    orders
    .filter(F.col("status") == "completed")
    .withColumn("order_date", F.to_date("created_at"))
    .groupBy("order_date", "product_category")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.count("id").alias("order_count"),
        F.avg("total_amount").alias("avg_order_value"),
    )
    .withColumn(
        "revenue_7d_avg",
        F.avg("revenue").over(
            Window.partitionBy("product_category")
            # Order by a day number and use a RANGE frame so the window is
            # the current day plus the six calendar days before it. A ROWS
            # frame would take the previous six *rows*, which reaches back
            # further than a week whenever a category has days without
            # completed orders. Days with no row are not counted as zero.
            .orderBy(
                F.datediff(
                    F.col("order_date"),
                    F.lit("1970-01-01").cast("date"),
                )
            )
            .rangeBetween(-6, 0)
        ),
    )
)

# Overwrite mode replaces the existing output under this path.
(
    daily_revenue.write
    .partitionBy("order_date")
    .mode("overwrite")
    .parquet("s3://data-warehouse/daily_revenue/")
)
from dataclasses import dataclass
@dataclass
class QualityCheck:
    """A single data quality rule expressed as a scalar SQL query."""

    # Identifier reported in results and in failure messages.
    name: str
    # SQL statement expected to yield one scalar value.
    query: str
    # Largest value for which the check still passes.
    threshold: float
    # "critical" or "warning" in the checks defined in this file.
    severity: str
# Quality rules evaluated against fact_orders, written as
# (name, query, threshold, severity) rows and turned into QualityCheck objects.
# NOTE(review): the first rule filters on fact_orders.customer_id, while the
# CREATE TABLE fact_orders statement in this file declares customer_key and
# no customer_id column. Confirm which column the target table really has.
CHECKS = [
    QualityCheck(name=name, query=query, threshold=limit, severity=level)
    for name, query, limit, level in (
        (
            "null_customer_ids",
            "SELECT COUNT(*) FROM fact_orders WHERE customer_id IS NULL",
            0,
            "critical",
        ),
        (
            "negative_amounts",
            "SELECT COUNT(*) FROM fact_orders WHERE total_amount < 0",
            0,
            "critical",
        ),
        (
            "duplicate_orders",
            "SELECT COUNT(*) - COUNT(DISTINCT order_id) FROM fact_orders",
            0,
            "warning",
        ),
        (
            # Hours elapsed since the most recent loaded_at value.
            "freshness",
            "SELECT EXTRACT(EPOCH FROM NOW() - MAX(loaded_at))/3600 FROM fact_orders",
            2.0,
            "warning",
        ),
    )
]
def run_quality_checks(db, checks: list[QualityCheck]) -> list[dict]:
    """Run each check's query and compare the scalar result to its threshold.

    A check passes when its query returns a value that is less than or equal
    to ``check.threshold``. Checks are evaluated in order, and evaluation
    stops at the first failed check whose severity is ``"critical"``.

    Args:
        db: Object exposing ``fetch_scalar(query)``.
        checks: Rules to evaluate.

    Returns:
        One dict per evaluated check with the keys ``name``, ``value``,
        ``threshold``, ``passed`` and ``severity``.

    Raises:
        DataQualityError: When a check with severity ``"critical"`` fails.
    """
    results = []
    for check in checks:
        value = db.fetch_scalar(check.query)
        # A missing scalar (for example an aggregate over an empty table)
        # cannot be compared with the threshold; treat it as a failed check
        # rather than letting ``None <= threshold`` raise TypeError.
        passed = value is not None and value <= check.threshold
        results.append({
            "name": check.name,
            "value": value,
            "threshold": check.threshold,
            "passed": passed,
            "severity": check.severity,
        })
        if not passed and check.severity == "critical":
            raise DataQualityError(f"Critical check failed: {check.name} = {value}")
    return results
-- Customer dimension, keyed by customer_key.
-- Each row also carries the customer_id, descriptive attributes, a validity
-- window (valid_from / valid_to) and an is_current flag.
-- NOTE(review): the validity window plus is_current looks like a Type 2
-- slowly changing dimension (one row per version of a customer). Confirm
-- against the job that maintains this table.
CREATE TABLE dim_customers (
customer_key BIGINT PRIMARY KEY,
-- Not declared unique, so several rows may share one customer_id.
customer_id VARCHAR(50) NOT NULL,
name VARCHAR(200),
segment VARCHAR(50),
country VARCHAR(100),
valid_from TIMESTAMP NOT NULL,
-- Nullable; presumably NULL while the row is still the current version.
valid_to TIMESTAMP,
is_current BOOLEAN DEFAULT TRUE
);
-- Product dimension, keyed by product_key.
-- Holds the product_id and a two-level classification
-- (category, subcategory).
CREATE TABLE dim_products (
product_key BIGINT PRIMARY KEY,
product_id VARCHAR(50) NOT NULL,
name VARCHAR(200),
category VARCHAR(100),
subcategory VARCHAR(100)
);
-- Order fact table, keyed by order_key.
-- order_id is unique and required. customer_key and product_key are
-- foreign keys into dim_customers and dim_products.
CREATE TABLE fact_orders (
order_key BIGINT PRIMARY KEY,
order_id VARCHAR(50) UNIQUE NOT NULL,
customer_key BIGINT REFERENCES dim_customers(customer_key),
product_key BIGINT REFERENCES dim_products(product_key),
-- No REFERENCES clause here; presumably points at a date dimension that is
-- not defined in this file. Confirm.
order_date_key INT,
quantity INT,
unit_price DECIMAL(10,2),
total_amount DECIMAL(12,2),
-- Filled with the insert time when the writer does not supply a value.
loaded_at TIMESTAMP DEFAULT NOW()
);