Create hist-union workflows for multiple tables in batch with parallel processing
Creates batch hist-union workflows for multiple tables with parallel processing and schema validation.
/plugin marketplace add treasure-data/aps_claude_tools
/plugin install cdp-histunion@treasure-data/aps_claude_tools

I'll help you create hist-union workflows for multiple tables at once, with proper schema validation for each table.
First, let me gather the required information using a quick questionnaire:
Use the AskUserQuestion tool to gather the required information.
Questions to ask:
Question 1: "How would you like to provide the list of tables?"
Question 2: "What is your lookup/config database for the inc_log watermark table?"
After gathering this information, I will parse the table list, which can be provided in any of the following formats:
Option A - Base names (comma-separated):
client_src.klaviyo_events, client_src.shopify_products, client_src.onetrust_profiles
Option B - Hist names (one per line):
client_src.klaviyo_events_hist
client_src.shopify_products_hist
client_src.onetrust_profiles_hist
Option C - Mixed formats:
client_src.klaviyo_events, client_src.shopify_products_hist, client_src.onetrust_profiles
Option D - List format:
- client_src.klaviyo_events
- client_src.shopify_products
- client_src.onetrust_profiles
I will parse and normalize all table names:
For each table in the list:
1. Extract the database and base name
2. Remove the _hist or _histunion suffix if present
3. Derive:
   - Inc table: {database}.{base_name}
   - Hist table: {database}.{base_name}_hist
   - Target table: {database}.{base_name}_histunion
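For reference, the normalization rule can be illustrated with a small Presto query. This is illustrative only (the parsing happens in the assistant, not in SQL), and the sample names are hypothetical:

```sql
-- Illustration only: how a provided name maps to inc / hist / target names.
-- The sample names below are hypothetical; real input comes from Question 1.
SELECT
  name                                                            AS provided_name,
  regexp_replace(name, '_(hist|histunion)$', '')                  AS inc_table,
  regexp_replace(name, '_(hist|histunion)$', '') || '_hist'       AS hist_table,
  regexp_replace(name, '_(hist|histunion)$', '') || '_histunion'  AS target_table
FROM (VALUES 'klaviyo_events', 'shopify_products_hist', 'onetrust_profiles_histunion') AS t(name)
```

Database prefixes (e.g. client_src.) are split off before this rule is applied and re-attached to all three derived names.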
CRITICAL: I will get exact schemas for EVERY table:
For each table:
1. Call mcp__mcc_treasuredata__describe_table for inc table
   - Get complete column list
   - Get exact column order
   - Get data types
2. Call mcp__mcc_treasuredata__describe_table for hist table
   - Get complete column list
   - Get exact column order
   - Get data types
3. Compare schemas:
   - Document column differences
   - Note any extra columns in inc vs hist
   - Record exact column order
Note: This may require multiple MCP calls. I'll process them efficiently.
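If you want to sanity-check a schema diff yourself, a query along the following lines works on Presto/Trino engines that expose information_schema; the MCP describe_table output remains the source of truth, and the table names below are placeholders:

```sql
-- Hypothetical manual cross-check: columns present in the inc table but missing from hist.
-- Replace client_src, klaviyo_events, and klaviyo_events_hist with your own names.
SELECT inc.column_name, inc.data_type
FROM information_schema.columns AS inc
LEFT JOIN information_schema.columns AS hist
  ON  hist.table_schema = 'client_src'
  AND hist.table_name   = 'klaviyo_events_hist'
  AND hist.column_name  = inc.column_name
WHERE inc.table_schema = 'client_src'
  AND inc.table_name   = 'klaviyo_events'
  AND hist.column_name IS NULL
ORDER BY inc.ordinal_position
```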
I will check each table against the full-load list:
For each table:
  IF table_name IN ('klaviyo_lists', 'klaviyo_metric_data'):
    template[table] = 'FULL_LOAD'       # Case 3
  ELSE:
    IF inc_schema == hist_schema:
      template[table] = 'IDENTICAL'     # Case 1
    ELSE:
      template[table] = 'EXTRA_COLUMNS' # Case 2
I will create a SQL file for each table in ONE response:
For each table, create: hist_union/queries/{base_name}.sql
With the correct template based on the schema analysis (an illustrative sketch follows this list):
- Case 1: Identical schemas
- Case 2: Inc has extra columns
- Case 3: Full load
All files created in parallel using multiple Write tool calls
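For orientation, here is a minimal sketch of the shape of a Case 1 query. This is NOT the plugin's actual template: the production templates write the result into the _histunion target and maintain the inc_log watermarks, and the table names and `time` watermark column below are placeholder assumptions.

```sql
-- Illustrative Case 1 sketch (identical schemas); the real file comes from the
-- plugin's production-tested template. Table names and the `time` watermark
-- column are placeholder assumptions.
SELECT *
FROM client_src.klaviyo_events_hist            -- frozen historical snapshot
UNION ALL
SELECT *
FROM client_src.klaviyo_events                 -- incremental rows beyond the last watermark
WHERE time > (
  SELECT COALESCE(MAX(inc_value), 0)
  FROM ${lkup_db}.inc_log
  WHERE table_name   = 'klaviyo_events'
    AND project_name = 'hist_union'
)
-- Case 2 differs only in the hist branch: inc-only columns are padded with
-- NULL AS column_name so the column list and order match the inc table exactly.
-- Case 3 (full load) rebuilds the table from scratch, with no watermark WHERE clause.
```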
I will update the workflow with all tables:
File: hist_union/hist_union_runner.dig
Structure:
+hist_union_tasks:
  _parallel: true

  +{table1_name}_histunion:
    td>: queries/{table1_name}.sql

  +{table2_name}_histunion:
    td>: queries/{table2_name}.sql

  +{table3_name}_histunion:
    td>: queries/{table3_name}.sql

  ... (all tables)
Before delivering, I will verify for EACH table:
For each table:
✅ MCP tool used for both inc and hist schemas
✅ Schema differences identified
✅ Correct template selected
✅ All inc columns present in exact order
✅ NULL handling correct for missing columns
✅ Watermarks included for both hist and inc
✅ Parallel execution configured
1. Collect all table names first
2. Make MCP calls for all inc tables
3. Make MCP calls for all hist tables
4. Compare all schemas in batch
5. Generate all SQL files in ONE response
6. Update workflow once with all tasks
I will use multiple Write tool calls in a SINGLE response:
Single Response Contains:
- Write: hist_union/queries/table1.sql
- Write: hist_union/queries/table2.sql
- Write: hist_union/queries/table3.sql
- ... (all tables)
- Edit: hist_union/hist_union_runner.dig (add all tasks)
I will generate:
timezone: UTC

_export:
  td:
    database: {database}
  lkup_db: {lkup_db}

+create_inc_log_table:
  td>:
  query: |
    CREATE TABLE IF NOT EXISTS ${lkup_db}.inc_log (
      table_name varchar,
      project_name varchar,
      inc_value bigint
    )

+hist_union_tasks:
  _parallel: true

  +table1_histunion:
    td>: queries/table1.sql

  +table2_histunion:
    td>: queries/table2.sql

  +table3_histunion:
    td>: queries/table3.sql

  # ... all tables processed in parallel
During processing, I will report:
Parsing table names...
✅ Found 5 tables to process:
1. client_src.klaviyo_events
2. client_src.shopify_products
3. client_src.onetrust_profiles
4. client_src.klaviyo_lists (FULL LOAD)
5. client_src.users
Retrieving schemas via MCP tool...
✅ Got schema for client_src.klaviyo_events (inc)
✅ Got schema for client_src.klaviyo_events_hist (hist)
✅ Got schema for client_src.shopify_products (inc)
✅ Got schema for client_src.shopify_products_hist (hist)
... (all tables)
Analyzing schemas...
✅ Table 1: Identical schemas - Use Case 1
✅ Table 2: Inc has extra 'incremental_date' - Use Case 2
✅ Table 3: Identical schemas - Use Case 1
✅ Table 4: FULL LOAD - Use Case 3
✅ Table 5: Identical schemas - Use Case 1
Generating all files...
✅ Created hist_union/queries/klaviyo_events.sql
✅ Created hist_union/queries/shopify_products.sql
✅ Created hist_union/queries/onetrust_profiles.sql
✅ Created hist_union/queries/klaviyo_lists.sql (FULL LOAD)
✅ Created hist_union/queries/users.sql
✅ Updated hist_union/hist_union_runner.dig with 5 parallel tasks
If tables are from different databases:
✅ Supported - Each SQL file uses correct database
✅ Workflow uses primary database in _export
✅ Individual tasks can override if needed
If a table requires full load:
✅ Automatically detected (klaviyo_lists, klaviyo_metric_data)
✅ Uses Case 3 template (DROP + CREATE, no WHERE)
✅ Still updates watermarks
✅ Processed in parallel with other tables
If tables have different schemas:
✅ Each table analyzed independently
✅ NULL handling applied only where needed
✅ Exact column order maintained per table
✅ Template selection per table based on schema
Sequential Processing:
Table 1: 10 min
Table 2: 10 min
Table 3: 10 min
Total: 30 minutes
Parallel Processing:
All tables: ~10 minutes (bounded by the slowest table)
Review All Generated Files:
ls -la hist_union/queries/
cat hist_union/hist_union_runner.dig
Verify Workflow Syntax:
cd hist_union
td wf check hist_union_runner.dig
Run Batch Workflow:
td wf run hist_union_runner.dig
Monitor Progress:
td wf logs hist_union_runner.dig
Verify All Results:
-- Check watermarks for all tables
SELECT * FROM {lkup_db}.inc_log
WHERE project_name = 'hist_union'
ORDER BY table_name;
-- Check row counts for all histunion tables
SELECT
'{table1}_histunion' as table_name,
COUNT(*) as row_count
FROM {database}.{table1}_histunion
UNION ALL
SELECT
'{table2}_histunion',
COUNT(*)
FROM {database}.{table2}_histunion
-- ... (for all tables)
Example request:
Create hist-union for these tables:
- client_src.klaviyo_events
- client_src.shopify_products_hist
- client_src.onetrust_profiles
- client_src.klaviyo_lists
Example result:
✅ Processed 4 tables:
1. klaviyo_events (Incremental - Case 1: Identical schemas)
- Inc: client_src.klaviyo_events
- Hist: client_src.klaviyo_events_hist
- Target: client_src.klaviyo_events_histunion
2. shopify_products (Incremental - Case 2: Inc has extra columns)
- Inc: client_src.shopify_products
- Hist: client_src.shopify_products_hist
- Target: client_src.shopify_products_histunion
- Extra columns in inc: incremental_date
3. onetrust_profiles (Incremental - Case 1: Identical schemas)
- Inc: client_src.onetrust_profiles
- Hist: client_src.onetrust_profiles_hist
- Target: client_src.onetrust_profiles_histunion
4. klaviyo_lists (FULL LOAD - Case 3)
- Inc: client_src.klaviyo_lists
- Hist: client_src.klaviyo_lists_hist
- Target: client_src.klaviyo_lists_histunion
Created 4 SQL files + 1 workflow file
All tasks configured for parallel execution
All generated code will use exact schemas from the MCP tool and production-tested templates.

Ready to proceed? Please provide your list of tables and I'll generate complete hist-union workflows for all of them.