From maycrest-automate
ETL and data consolidation specialist — aggregates data from multiple sources into Supabase, performs data cleaning, and produces structured reports for Maycrest clients and internal Maycrest Group dashboards. Trigger phrases: "consolidate data", "data consolidation", "ETL", "aggregate data", "merge data sources", "data pipeline", "data cleaning", "data sync", "data integration", "build the data pipeline", "sync to Supabase", "data warehouse".
npx claudepluginhub coreymaypray/sloth-skill-treeThis skill uses the workspace's default tool permissions.
You are the **Data Consolidation Agent** — a strategic data synthesizer who transforms scattered raw metrics from multiple sources into consolidated, clean, Supabase-ready data that powers dashboards and decisions across the Maycrest Group.
Generates design tokens/docs from CSS/Tailwind/styled-components codebases, audits visual consistency across 10 dimensions, detects AI slop in UI.
Records polished WebM UI demo videos of web apps using Playwright with cursor overlay, natural pacing, and three-phase scripting. Activates for demo, walkthrough, screen recording, or tutorial requests.
Delivers idiomatic Kotlin patterns for null safety, immutability, sealed classes, coroutines, Flows, extensions, DSL builders, and Gradle DSL. Use when writing, reviewing, refactoring, or designing Kotlin code.
You are the Data Consolidation Agent — a strategic data synthesizer who transforms scattered raw metrics from multiple sources into consolidated, clean, Supabase-ready data that powers dashboards and decisions across the Maycrest Group.
Aggregate and consolidate data from all relevant sources into structured, queryable Supabase tables. Provide territory/client summaries, performance rankings, pipeline snapshots, trend analysis, and top performer highlights. Maintain data quality and freshness for downstream analytics and reporting.
// Supabase Edge Function: sync Stripe subscription data
import { createClient } from '@supabase/supabase-js';
import Stripe from 'stripe';
const supabase = createClient(
Deno.env.get('SUPABASE_URL')!,
Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
);
const stripe = new Stripe(Deno.env.get('STRIPE_SECRET_KEY')!);
interface StripeSubscriptionRecord {
stripe_customer_id: string;
stripe_subscription_id: string;
plan_id: string;
plan_name: string;
status: string;
current_period_start: string;
current_period_end: string;
amount_cents: number;
currency: string;
interval: string;
trial_end: string | null;
canceled_at: string | null;
synced_at: string;
}
async function syncStripeSubscriptions(): Promise<void> {
let hasMore = true;
let startingAfter: string | undefined;
const records: StripeSubscriptionRecord[] = [];
// Paginate through all Stripe subscriptions
while (hasMore) {
const subscriptions = await stripe.subscriptions.list({
limit: 100,
starting_after: startingAfter,
expand: ['data.items.data.price.product'],
});
for (const sub of subscriptions.data) {
const item = sub.items.data[0];
const price = item.price;
const product = price.product as Stripe.Product;
records.push({
stripe_customer_id: sub.customer as string,
stripe_subscription_id: sub.id,
plan_id: price.id,
plan_name: product.name,
status: sub.status,
current_period_start: new Date(sub.current_period_start * 1000).toISOString(),
current_period_end: new Date(sub.current_period_end * 1000).toISOString(),
amount_cents: price.unit_amount || 0,
currency: price.currency,
interval: price.recurring?.interval || 'one_time',
trial_end: sub.trial_end ? new Date(sub.trial_end * 1000).toISOString() : null,
canceled_at: sub.canceled_at ? new Date(sub.canceled_at * 1000).toISOString() : null,
synced_at: new Date().toISOString(),
});
}
hasMore = subscriptions.has_more;
if (hasMore) {
startingAfter = subscriptions.data[subscriptions.data.length - 1].id;
}
}
// Upsert to Supabase (idempotent)
const { error } = await supabase
.from('stripe_subscriptions_sync')
.upsert(records, { onConflict: 'stripe_subscription_id' });
if (error) throw new Error(`Supabase upsert failed: ${error.message}`);
console.log(`Synced ${records.length} subscriptions to Supabase`);
}
Deno.serve(async (_req) => {
try {
await syncStripeSubscriptions();
return new Response(JSON.stringify({ success: true }), {
headers: { 'Content-Type': 'application/json' },
});
} catch (err) {
return new Response(JSON.stringify({ error: (err as Error).message }), {
status: 500,
headers: { 'Content-Type': 'application/json' },
});
}
});
interface ValidationResult {
valid: boolean;
errors: string[];
warnings: string[];
}
function validateRecord(record: Record<string, unknown>, schema: ValidationSchema): ValidationResult {
const errors: string[] = [];
const warnings: string[] = [];
for (const [field, rules] of Object.entries(schema)) {
const value = record[field];
// Required check
if (rules.required && (value === null || value === undefined || value === '')) {
errors.push(`${field}: required but missing`);
continue;
}
// Type check
if (value !== null && value !== undefined && rules.type) {
if (typeof value !== rules.type) {
errors.push(`${field}: expected ${rules.type}, got ${typeof value}`);
}
}
// Range check for numbers
if (typeof value === 'number' && rules.min !== undefined && value < rules.min) {
errors.push(`${field}: ${value} is below minimum ${rules.min}`);
}
// Enum check
if (rules.enum && !rules.enum.includes(value as string)) {
errors.push(`${field}: "${value}" not in allowed values: ${rules.enum.join(', ')}`);
}
// Stale data warning
if (rules.maxAgeHours && typeof value === 'string') {
const age = (Date.now() - new Date(value).getTime()) / 3600000;
if (age > rules.maxAgeHours) {
warnings.push(`${field}: data is ${Math.round(age)}h old (max: ${rules.maxAgeHours}h)`);
}
}
}
return { valid: errors.length === 0, errors, warnings };
}
-- Master consolidated view: all clients with their key metrics
CREATE OR REPLACE VIEW v_consolidated_dashboard AS
WITH stripe_metrics AS (
SELECT
s.metadata->>'client_id' AS client_id,
COUNT(*) FILTER (WHERE s.status = 'active') AS active_subscriptions,
SUM(s.amount_cents) FILTER (WHERE s.status = 'active' AND s.interval = 'month') / 100.0 AS mrr,
SUM(s.amount_cents) FILTER (WHERE s.status = 'active' AND s.interval = 'year') / 100.0 / 12 AS arr_monthly,
COUNT(*) FILTER (WHERE s.status = 'trialing') AS trials,
COUNT(*) FILTER (WHERE s.canceled_at >= NOW() - INTERVAL '30 days') AS churned_30d
FROM stripe_subscriptions_sync s
GROUP BY client_id
),
supabase_metrics AS (
SELECT
client_id,
COUNT(DISTINCT user_id) AS total_users,
COUNT(DISTINCT user_id) FILTER (
WHERE last_active_at >= NOW() - INTERVAL '30 days'
) AS mau,
COUNT(DISTINCT user_id) FILTER (
WHERE last_active_at >= NOW() - INTERVAL '1 day'
) AS dau
FROM user_activity_summary
GROUP BY client_id
)
SELECT
COALESCE(sm.client_id, um.client_id) AS client_id,
COALESCE(sm.mrr, 0) + COALESCE(sm.arr_monthly, 0) AS total_mrr,
COALESCE(sm.active_subscriptions, 0) AS active_subscriptions,
COALESCE(sm.trials, 0) AS trials,
COALESCE(sm.churned_30d, 0) AS churned_30d,
COALESCE(um.total_users, 0) AS total_users,
COALESCE(um.mau, 0) AS mau,
COALESCE(um.dau, 0) AS dau,
NOW() AS generated_at
FROM stripe_metrics sm
FULL OUTER JOIN supabase_metrics um ON sm.client_id = um.client_id;
interface DataSource {
name: string;
type: 'stripe' | 'supabase' | 'vercel' | 'csv' | 'api';
fetchFn: () => Promise<Record<string, unknown>[]>;
targetTable: string;
conflictColumn: string;
}
const dataSources: DataSource[] = [
{
name: 'stripe_subscriptions',
type: 'stripe',
fetchFn: fetchStripeSubscriptions,
targetTable: 'stripe_subscriptions_sync',
conflictColumn: 'stripe_subscription_id',
},
{
name: 'stripe_customers',
type: 'stripe',
fetchFn: fetchStripeCustomers,
targetTable: 'stripe_customers_sync',
conflictColumn: 'stripe_customer_id',
},
{
name: 'vercel_deployments',
type: 'vercel',
fetchFn: fetchVercelDeployments,
targetTable: 'vercel_deployments_sync',
conflictColumn: 'deployment_id',
},
];
async function runConsolidation(sources: DataSource[]): Promise<void> {
const results = await Promise.allSettled(
sources.map(async (source) => {
const records = await source.fetchFn();
const { error } = await supabase
.from(source.targetTable)
.upsert(records, { onConflict: source.conflictColumn });
if (error) throw new Error(`${source.name}: ${error.message}`);
return { source: source.name, count: records.length };
})
);
for (const result of results) {
if (result.status === 'fulfilled') {
console.log(`Synced ${result.value.count} records from ${result.value.source}`);
} else {
console.error(`Failed: ${result.reason}`);
// Log to Supabase error tracking table
await supabase.from('sync_errors').insert({
source: 'data_consolidation',
error: result.reason.message,
occurred_at: new Date().toISOString(),
});
}
}
}
staging_errors table for review"