Complete CDP implementation with workflow generation, deployment, and execution
Orchestrates a complete CDP implementation with automated workflow generation, deployment, and execution across all pipeline phases.
/plugin marketplace add treasure-data/aps_claude_tools
/plugin install cdp-orchestrator@treasure-data/aps_claude_tools
I'll orchestrate your complete CDP implementation with automated deployment and execution across all phases:
Phase 1: Ingestion → Generate → Deploy → Execute → Monitor → Validate ✓
Phase 2: Hist-Union → Generate → Deploy → Execute → Monitor → Validate ✓
Phase 3: Staging → Generate → Deploy → Execute → Monitor → Validate ✓
Phase 4: Unification → Generate → Deploy → Execute → Monitor → Validate ✓
Each phase MUST complete successfully before proceeding to the next phase.
This ensures data dependencies are satisfied.

Before execution, I'll collect the following configuration:
TD API Credentials:
TD_API_KEY: Your Treasure Data API key (required for deployment & execution)
Example: 12345/abcdef1234567890abcdef1234567890abcdef12
TD_ENDPOINT: Your TD regional endpoint
https://api.treasuredata.com (default)
https://api-cdp.eu01.treasuredata.com
https://api-cdp.treasuredata.co.jp
https://api-cdp.ap02.treasuredata.com
Client Information:
Client Name: Short client identifier used as a prefix (e.g., mck, acme, client_name)
Data Source:
Source Name: Name of the data source (e.g., shopify, salesforce, klaviyo)
Connector Type: TD connector type (e.g., rest, salesforce, bigquery)
Objects: Comma-separated objects to ingest (e.g., orders,customers,products)
Ingestion Settings:
Mode: Choose one:
incremental - Ongoing data sync only
historical - One-time historical backfill only
both - Separate historical and incremental workflows (recommended)
Incremental Field: Field for updates (e.g., updated_at, modified_date)
Default Start Date: Initial load date (format: 2023-09-01T00:00:00.000000)
Target:
Target Database: {client}_src
Project Name: ingestion
Authentication:
Tables to Combine:
Format: database.table_name or table_name (uses default database)
Example: shopify_orders, shopify_customers
Settings:
Project Name: hist_union
Source Database: {client}_src (default)
Target Database: {client}_src (overwrites with combined data)
Transformation Settings:
SQL Engine: Choose one:
presto - Presto SQL (recommended for most cases)
hive - Hive SQL (for legacy compatibility)
Tables to Transform:
Settings:
Project Name: staging
Target Database: {client}_stg
Unification Settings:
Unification Name: Project name (e.g., customer_360, unified_profile)
ID Method: Choose one:
persistent_id - Stable IDs across updates (recommended)
canonical_id - Traditional merge approach
Update Strategy: Choose one:
incremental - Process new/updated records only (recommended)
full - Reprocess all data each time
Tables for Unification:
Format: database.table_name
Example: acme_stg.shopify_orders, acme_stg.shopify_customers
Settings:
Project Name: unification
I'll launch the cdp-pipeline-orchestrator agent to execute:
1. Configuration Collection
2. Phase Execution Loop
For each phase (Ingestion → Hist-Union → Staging → Unification):
┌─────────── PHASE EXECUTION ───────────┐
│                                       │
│  [1] GENERATE Workflows               │
│      → Invoke plugin slash command    │
│      → Wait for file generation       │
│      → Verify files created           │
│                                       │
│  [2] DEPLOY Workflows                 │
│      → Navigate to project directory  │
│      → Execute: td wf push            │
│      → Parse deployment result        │
│      → Auto-fix errors if possible    │
│      → Retry up to 3 times            │
│                                       │
│  [3] EXECUTE Workflows                │
│      → Execute: td wf start           │
│      → Capture session ID             │
│      → Log start time                 │
│                                       │
│  [4] MONITOR Execution                │
│      → Poll status every 30 seconds   │
│      → Show real-time progress        │
│      → Calculate elapsed time         │
│      → Wait for completion            │
│                                       │
│  [5] VALIDATE Output                  │
│      → Query TD for created tables    │
│      → Verify row counts > 0          │
│      → Check schema expectations      │
│                                       │
│  [6] PROCEED to Next Phase            │
│      → Only if validation passes      │
│      → Update progress tracker        │
│                                       │
└───────────────────────────────────────┘
3. Final Report
Tool: SlashCommand
Actions:
Phase 1 → /cdp-ingestion:ingest-new
Phase 2 → /cdp-histunion:histunion-batch
Phase 3 → /cdp-staging:transform-batch
Phase 4 → /cdp-unification:unify-setup
Output: Complete workflow files (.dig, .yml, .sql)
Verification: Check files exist using Glob tool
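For reference, a minimal shell equivalent of that existence check (the agent itself uses the Glob tool; the ingestion/ layout assumed here is the one listed under Deliverables below):

```bash
# Hypothetical spot check: fail fast if generation produced no workflow files
ls ingestion/*.dig ingestion/config/*.yml > /dev/null 2>&1 \
  || { echo "Workflow generation incomplete"; exit 1; }
```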
Tool: Bash
Command:
cd {project_directory}
td -k {TD_API_KEY} -e {TD_ENDPOINT} wf push {project_name}
Success Indicators:
Error Handling:
Syntax Error:
Error: syntax error in shopify_ingest_inc.dig:15
→ Read file to identify issue
→ Fix using Edit tool
→ Retry deployment
Validation Error:
Error: database 'acme_src' not found
→ Check database exists
→ Create if needed
→ Update configuration
→ Retry deployment
Authentication Error:
Error: authentication failed
→ Verify TD_API_KEY
→ Check endpoint URL
→ Ask user to provide correct credentials
→ Retry deployment
Retry Logic: Up to 3 attempts with auto-fixes
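A sketch of that retry loop in shell form (the real agent interleaves the auto-fixes described above between attempts):

```bash
# Up to 3 deploy attempts; auto-fix hooks would run between failures
for attempt in 1 2 3; do
  if td -k "$TD_API_KEY" -e "$TD_ENDPOINT" wf push "$project_name"; then
    echo "Deployed on attempt $attempt"
    break
  fi
  echo "Attempt $attempt failed - applying fixes before retry..."
done
```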
Tool: Bash
Command:
td -k {TD_API_KEY} -e {TD_ENDPOINT} wf start {project} {workflow} --session now
Output Parsing:
session id: 123456789
attempt id: 987654321
Captured: session ID, attempt ID, and start time (see the sketch below)
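A minimal sketch of that capture step, assuming the "session id: NNN" output format shown above:

```bash
# Start the workflow and pull the session id out of the CLI output
start_output=$(td -k "$TD_API_KEY" -e "$TD_ENDPOINT" wf start "$project" "$workflow" --session now)
session_id=$(echo "$start_output" | awk '/session id:/ {print $3}')
echo "Started session $session_id at $(date)"
```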
Tool: Bash (polling loop)
Pattern:
# Check 1 (immediately)
td -k {TD_API_KEY} -e {TD_ENDPOINT} wf session {session_id}
# Output: {"status": "running"}
# → Show: ⏳ Status: running (0:00 elapsed)
# Wait 30 seconds
sleep 30
# Check 2 (after 30s)
td -k {TD_API_KEY} -e {TD_ENDPOINT} wf session {session_id}
# Output: {"status": "running"}
# → Show: ⏳ Status: running (0:30 elapsed)
# Continue until status changes...
# Final check
td -k {TD_API_KEY} -e {TD_ENDPOINT} wf session {session_id}
# Output: {"status": "success"}
# → Show: ✓ Execution completed (15:30 elapsed)
Status Handling:
Status: running → continue polling
Status: success → proceed to validation
Status: error → fetch logs: td wf log {session_id}
Status: killed → treat as failure and prompt for next action
Maximum Wait: 2 hours (240 checks)
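Put together, the monitoring loop looks roughly like this (a sketch assuming the JSON-style status output shown above; adjust the parsing if your td version prints a different format):

```bash
# Poll every 30s until a terminal status, giving up after 2 hours (240 checks)
elapsed=0
while [ "$elapsed" -lt 7200 ]; do
  status=$(td -k "$TD_API_KEY" -e "$TD_ENDPOINT" wf session "$session_id" \
    | grep -o '"status": *"[a-z]*"' | cut -d'"' -f4)
  printf 'Status: %s (%d:%02d elapsed)\n' "$status" $((elapsed / 60)) $((elapsed % 60))
  case "$status" in
    success|error|killed) break ;;
  esac
  sleep 30
  elapsed=$((elapsed + 30))
done
```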
Tool: mcp__mcc_treasuredata__query
Validation Queries:
Ingestion Phase:
-- Check tables created
SELECT table_name, row_count
FROM information_schema.tables
WHERE database_name = '{target_database}'
AND table_name LIKE '{source_name}%'
-- Expected: {source}_orders, {source}_customers, etc.
-- Verify: row_count > 0 for each table
Hist-Union Phase:
-- Check hist-union tables
SELECT table_name, row_count
FROM information_schema.tables
WHERE database_name = '{target_database}'
AND table_name LIKE '%_hist_union'
-- Verify: row_count >= source table counts
Staging Phase:
-- Check staging tables
SELECT table_name, row_count
FROM information_schema.tables
WHERE database_name = '{target_database}'
AND table_name LIKE '%_stg_%'
-- Check for transformed columns
DESCRIBE {database}.{table_name}
Unification Phase:
-- Check unification tables
SELECT table_name, row_count
FROM information_schema.tables
WHERE database_name = '{target_database}'
AND (table_name LIKE '%_prep'
OR table_name LIKE '%unified_id%'
OR table_name LIKE '%enriched%')
-- Verify: unified_id_lookup exists
-- Verify: enriched tables have unified_id column
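For a quick manual spot check outside the agent, the same row-count validation can be run from the TD CLI (a sketch; the table and database names are the example values from this walkthrough):

```bash
# Row-count sanity check for one ingested table (example names)
td -k "$TD_API_KEY" -e "$TD_ENDPOINT" query -w -T presto -d acme_src \
  "SELECT COUNT(*) AS row_count FROM shopify_orders"
```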
Validation Results:
Common Issues & Auto-Fixes:
1. YAML Syntax Error
Error: syntax error at line 15: missing colon
→ Auto-fix: Add missing colon
→ Retry: Automatic
2. Missing Database
Error: database 'acme_src' does not exist
→ Check: Query information_schema
→ Create: If user approves
→ Retry: Automatic
3. Secret Not Found
Error: secret 'shopify_api_key' not found
→ Prompt: User to upload credentials
→ Wait: For user confirmation
→ Retry: After user uploads
Retry Strategy:
Common Issues:
1. Table Not Found
Error: Table 'acme_src.shopify_orders' does not exist
Diagnosis:
- Previous phase (Ingestion) may have failed
- Table name mismatch in configuration
- Database permissions issue
Options:
1. Retry - Run workflow again
2. Check Previous Phase - Verify ingestion completed
3. Skip - Skip this phase (NOT RECOMMENDED)
4. Abort - Stop entire pipeline
Your choice:
2. Query Timeout
Error: Query exceeded timeout limit
Diagnosis:
- Data volume too large
- Query not optimized
- Warehouse too small
Options:
1. Retry - Attempt again
2. Increase Timeout - Update workflow configuration
3. Abort - Stop for investigation
Your choice:
3. Authentication Failed
Error: Authentication failed for data source
Diagnosis:
- Credentials expired
- Invalid API key
- Permissions changed
Options:
1. Update Credentials - Upload new credentials
2. Retry - Try again with existing credentials
3. Abort - Stop for manual fix
Your choice:
At each failure, I'll present:
┌─────────────────────────────────────────┐
│  ⚠ Phase X Failed                       │
├─────────────────────────────────────────┤
│  Workflow: {workflow_name}              │
│  Session ID: {session_id}               │
│  Error: {error_message}                 │
│                                         │
│  Possible Causes:                       │
│  1. {cause_1}                           │
│  2. {cause_2}                           │
│  3. {cause_3}                           │
│                                         │
│  Options:                               │
│  1. Retry - Run again with same config  │
│  2. Fix - Let me help fix the issue     │
│  3. Skip - Skip this phase (DANGEROUS)  │
│  4. Abort - Stop entire pipeline        │
│                                         │
│  Your choice (1-4):                     │
└─────────────────────────────────────────┘
Choice Handling:
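In shell terms, the handling is a simple dispatch on the chosen number (a sketch; the real agent performs these actions through its tools):

```bash
# Dispatch on the user's 1-4 selection from the failure prompt
read -r -p "Your choice (1-4): " choice
case "$choice" in
  1) echo "Retrying phase with the same configuration..." ;;
  2) echo "Entering interactive fix mode..." ;;
  3) echo "Skipping phase (dangerous - downstream phases may fail)..." ;;
  4) echo "Aborting pipeline."; exit 1 ;;
  *) echo "Invalid choice." ;;
esac
```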
I'll use TodoWrite to show real-time progress:
✓ Pre-Flight: Configuration gathered
→ Phase 1: Ingestion
✓ Generate workflows
✓ Deploy workflows
→ Execute workflows (session: 123456789)
⏳ Monitor execution... (5:30 elapsed)
⏳ Validate output...
□ Phase 2: Hist-Union
□ Phase 3: Staging
□ Phase 4: Unification
□ Final: Report generation
Status Indicators:
Typical Execution Times (varies by data volume):
| Phase | Generation | Deployment | Execution | Validation | Total |
|---|---|---|---|---|---|
| Ingestion | 2-5 min | 30 sec | 15-60 min | 1 min | ~1 hour |
| Hist-Union | 1-2 min | 30 sec | 10-30 min | 1 min | ~30 min |
| Staging | 2-5 min | 30 sec | 20-45 min | 1 min | ~45 min |
| Unification | 3-5 min | 30 sec | 30-90 min | 2 min | ~1.5 hours |
| TOTAL | ~10 min | ~2 min | ~2-3 hours | ~5 min | ~3-4 hours |
Actual times depend on data volume, complexity, and TD warehouse size
Upon successful completion, you'll receive:
Ingestion (ingestion/):
{source}_ingest_inc.dig - Incremental ingestion workflow
{source}_ingest_hist.dig - Historical backfill workflow (if mode=both)
config/{source}_datasources.yml - Datasource configuration
config/{source}_{object}_load.yml - Per-object load configs
Hist-Union (hist_union/):
{source}_hist_union.dig - Main hist-union workflow
queries/{table}_union.sql - Union SQL per table
Staging (staging/):
transform_{source}.dig - Transformation workflow
queries/{table}_transform.sql - Transform SQL per table
Unification (unification/):
unif_runner.dig - Main orchestration workflow
dynmic_prep_creation.dig - Prep table creation
id_unification.dig - ID unification via API
enrich_runner.dig - Enrichment workflow
config/unify.yml - Unification configuration
config/environment.yml - Client environment
config/src_prep_params.yml - Prep parameters
config/stage_enrich.yml - Enrichment config
queries/ - 15+ SQL query files
Logs (pipeline_logs/{date}/)
Before starting, ensure:
✅ TD Toolbelt Installed:
# Check version
td --version
# If not installed:
# macOS: brew install td
# Linux: https://toolbelt.treasuredata.com/
✅ Valid TD API Key:
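A quick way to confirm the key and endpoint work before starting (assumes the key has at least database read access):

```bash
# Listing databases is a cheap end-to-end credential check
td -k "$TD_API_KEY" -e "$TD_ENDPOINT" db:list
```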
✅ Network Access:
✅ Sufficient Permissions:
✅ Source System Credentials:
Ready to begin?
Please provide the following information:
TD_API_KEY: 12345/abcd...
TD_ENDPOINT: https://api.treasuredata.com
Client Name: acme
Source Name: shopify
Connector Type: rest
Objects: orders,customers,products
Mode: both
Incremental Field: updated_at
Start Date: 2023-09-01T00:00:00.000000
Target Database: acme_src
Tables: shopify_orders,shopify_customers,shopify_products
SQL Engine: presto
Tables: (will use hist-union output)
Target Database: acme_stg
Unification Name: customer_360
ID Method: persistent_id
Update Strategy: incremental
Tables: (will use staging output)
Alternatively, provide all at once in YAML format:
global:
  td_api_key: "12345/abcd..."
  td_endpoint: "https://api.treasuredata.com"
  client: "acme"

ingestion:
  source_name: "shopify"
  connector: "rest"
  objects: ["orders", "customers", "products"]
  mode: "both"
  incremental_field: "updated_at"
  start_date: "2023-09-01T00:00:00.000000"
  target_database: "acme_src"

hist_union:
  tables: ["shopify_orders", "shopify_customers", "shopify_products"]

staging:
  engine: "presto"
  target_database: "acme_stg"

unification:
  name: "customer_360"
  id_method: "persistent_id"
  update_strategy: "incremental"
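If you keep this YAML in a file, the credential values can be exported for the td commands above; a hypothetical sketch using the jq-style yq tool (not part of this plugin, shown only for illustration):

```bash
# Hypothetical: pull credentials out of pipeline_config.yml with yq
export TD_API_KEY=$(yq -r '.global.td_api_key' pipeline_config.yml)
export TD_ENDPOINT=$(yq -r '.global.td_endpoint' pipeline_config.yml)
```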
I'll orchestrate the complete CDP implementation from start to finish!