Execute Snowflake ID unification workflow with convergence detection and monitoring
/plugin marketplace add treasure-data/aps_claude_tools
/plugin install treasure-data-cdp-hybrid-idu-plugins-cdp-hybrid-idu@treasure-data/aps_claude_tools
Execute your generated Snowflake SQL workflow with intelligent convergence detection, real-time monitoring, and interactive error handling. This command orchestrates the complete unification process, from graph creation to master table generation.
- SQL directory path (e.g., snowflake_sql/unify/)
- Snowflake account name (e.g., myaccount from myaccount.snowflakecomputing.com)
- Warehouse name (e.g., COMPUTE_WH)
Option 1: Password
- SNOWFLAKE_PASSWORD via environment file (.env)
- SNOWFLAKE_PASSWORD environment variable
Option 2: SSO (externalbrowser)
Option 3: Key-Pair
- SNOWFLAKE_PRIVATE_KEY_PATH
- SNOWFLAKE_PRIVATE_KEY_PASSPHRASE
Display execution plan with:
I'll call the snowflake-workflow-executor agent to:
Intelligent Loop Execution:
Iteration 1:
✓ Execute unify SQL
• Check convergence: 1500 records updated
→ Continue to iteration 2
Iteration 2:
✓ Execute unify SQL
• Check convergence: 450 records updated
→ Continue to iteration 3
Iteration 3:
✓ Execute unify SQL
• Check convergence: 0 records updated
✓ CONVERGED! Stop loop
Features:
Provide:
/cdp-hybrid-idu:hybrid-execute-snowflake
I'll prompt you for:
- SQL directory path
- Snowflake account name
- Username
- Database and schema
- Warehouse name
- Authentication method
Provide all parameters upfront:
SQL directory: snowflake_sql/unify/
Account: myaccount
User: myuser
Database: my_database
Schema: my_schema
Warehouse: COMPUTE_WH
Password: (will prompt if not in environment)
Algorithm:
SELECT COUNT(*) as updated_count FROM (
    SELECT leader_ns, leader_id, follower_ns, follower_id
    FROM current_iteration
    EXCEPT
    SELECT leader_ns, leader_id, follower_ns, follower_id
    FROM previous_iteration
) diff
Stops when: updated_count = 0
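The convergence check above can be mimicked in plain Python to see why the loop stops. This is only a sketch, assuming each graph row is the four-column tuple compared by the EXCEPT query. The `propagate_leaders` step and the `max_iterations` default are hypothetical stand-ins for illustration and are not taken from the plugin's SQL.

```python
from typing import Callable, Set, Tuple

# One row of the graph: (leader_ns, leader_id, follower_ns, follower_id).
Edge = Tuple[str, str, str, str]


def count_updated(current: Set[Edge], previous: Set[Edge]) -> int:
    """Count rows in the current iteration that are absent from the previous one.

    Set difference plays the role of SQL EXCEPT here.
    """
    return len(current - previous)


def propagate_leaders(edges: Set[Edge]) -> Set[Edge]:
    """Toy unify step (an assumption): point each follower at its leader's leader."""
    leader_of = {(f_ns, f_id): (l_ns, l_id) for l_ns, l_id, f_ns, f_id in edges}
    result: Set[Edge] = set()
    for l_ns, l_id, f_ns, f_id in edges:
        new_ns, new_id = leader_of.get((l_ns, l_id), (l_ns, l_id))
        result.add((new_ns, new_id, f_ns, f_id))
    return result


def run_until_converged(
    step: Callable[[Set[Edge]], Set[Edge]],
    initial: Set[Edge],
    max_iterations: int = 10,  # hypothetical default
) -> Tuple[Set[Edge], int]:
    """Apply `step` until an iteration produces no new rows (updated_count = 0)."""
    previous = initial
    for iteration in range(1, max_iterations + 1):
        current = step(previous)
        if count_updated(current, previous) == 0:
            return current, iteration
        previous = current
    raise RuntimeError(f"No convergence after {max_iterations} iterations")


# Chain a <- b <- c <- d, where "a" leads itself.
chain = {
    ("email", "a", "email", "a"),
    ("email", "a", "email", "b"),
    ("email", "b", "email", "c"),
    ("email", "c", "email", "d"),
}
final_graph, iterations = run_until_converged(propagate_leaders, chain)
```

With this toy chain, every follower ends up pointing at "a", and the last iteration is the one that changes nothing, which mirrors the stop condition above.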
If an error occurs:
✗ File: 04_unify_loop_iteration_01.sql
Error: Table not found: source_table
Continue with remaining files? (y/n):
You can choose to:
Track progress with:
After convergence, creates:
CREATE OR REPLACE TABLE database.schema.unified_id_graph_unify_loop_final
AS SELECT * FROM database.schema.unified_id_graph_unify_loop_3
This allows downstream SQL to reference loop_final regardless of actual iteration count.
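As a rough illustration of how the alias statement could be derived from the iteration at which the loop converged, the sketch below builds the same SQL text as a string. The function name `final_alias_sql` and the identifier check are assumptions made for this example, not the executor's actual code.

```python
import re

# Unquoted identifier pattern: letter or underscore first, then letters,
# digits, underscores, or dollar signs.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def final_alias_sql(
    database: str,
    schema: str,
    last_iteration: int,
    base: str = "unified_id_graph_unify_loop",
) -> str:
    """Build the CREATE OR REPLACE statement that aliases the last loop table."""
    for name in (database, schema, base):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"Not a plain identifier: {name!r}")
    if last_iteration < 1:
        raise ValueError("last_iteration must be 1 or greater")
    prefix = f"{database}.{schema}.{base}"
    return (
        f"CREATE OR REPLACE TABLE {prefix}_final\n"
        f"AS SELECT * FROM {prefix}_{last_iteration}"
    )


statement = final_alias_sql("database", "schema", 3)
```

Rejecting anything that is not a plain identifier keeps the string formatting from producing unintended SQL. Quoted identifiers would need separate handling.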
The agent executes:
python3 scripts/snowflake/snowflake_sql_executor.py \
    snowflake_sql/unify/ \
    --account myaccount \
    --user myuser \
    --database my_database \
    --schema my_schema \
    --warehouse COMPUTE_WH
Setup Phase (01-03):
Unification Loop (04):
Canonicalization (05):
Statistics (06):
Enrichment (10-19):
Master Tables (20-29):
Metadata (30-39):
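The phase ranges above suggest that files are grouped by their two-digit numeric prefix. The following sketch shows one way such a mapping could look. The `phase_for` helper and the rule that unmatched files return `None` are assumptions for illustration, not the executor's documented behavior.

```python
import re
from typing import Optional

# Numeric prefix ranges taken from the phase list above.
PHASES = [
    (range(1, 4), "Setup"),
    (range(4, 5), "Unification Loop"),
    (range(5, 6), "Canonicalization"),
    (range(6, 7), "Statistics"),
    (range(10, 20), "Enrichment"),
    (range(20, 30), "Master Tables"),
    (range(30, 40), "Metadata"),
]


def phase_for(filename: str) -> Optional[str]:
    """Return the phase name for a file such as '05_canonicalize.sql'."""
    match = re.match(r"(\d{2})_", filename)
    if match is None:
        return None  # no two-digit prefix
    prefix = int(match.group(1))
    for numbers, name in PHASES:
        if prefix in numbers:
            return name
    return None  # prefix falls outside every listed range


for name in ("01_create_graph.sql", "04_unify_loop_iteration_01.sql", "05_canonicalize.sql"):
    print(name, "->", phase_for(name))
```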
SQL directory: snowflake_sql/unify/
Account: myorg-myaccount
User: analytics_user
Database: customer_data
Schema: id_unification
Warehouse: LARGE_WH
✓ Connected to Snowflake: myorg-myaccount
• Using database: customer_data, schema: id_unification
Starting Snowflake SQL Execution
• Database: customer_data
• Schema: id_unification
Executing: 01_create_graph.sql
✓ 01_create_graph.sql: Executed successfully
Executing: 02_extract_merge.sql
✓ 02_extract_merge.sql: Executed successfully
• Rows affected: 125000
Executing: 03_source_key_stats.sql
✓ 03_source_key_stats.sql: Executed successfully
Executing Unify Loop Before Canonicalization
--- Iteration 1 ---
✓ Iteration 1 completed
• Rows processed: 125000
• Updated records: 1500
--- Iteration 2 ---
✓ Iteration 2 completed
• Rows processed: 125000
• Updated records: 450
--- Iteration 3 ---
✓ Iteration 3 completed
• Rows processed: 125000
• Updated records: 0
✓ Loop converged after 3 iterations
• Creating alias table for final iteration
✓ Alias table 'unified_id_graph_unify_loop_final' created
Executing: 05_canonicalize.sql
✓ 05_canonicalize.sql: Executed successfully
[... continues with enrichment, master tables, metadata ...]
Execution Complete
• Files processed: 18/18
• Final unified_id_lookup rows: 98,500
• Disconnected from Snowflake
During execution, you can monitor:
Issue: Connection timeout
Solution: Check network access, verify credentials, ensure warehouse is running
Issue: Table not found
Solution: Verify database/schema permissions, check source table names in YAML
Issue: Loop doesn't converge
Solution: Check data quality, increase max_iterations, review key validation rules
Issue: Warehouse suspended
Solution: Ensure auto-resume is enabled, manually resume warehouse if needed
Issue: Permission denied
Solution: Verify database/schema permissions, check role assignments
DO NOT RUN THESE VALIDATION QUERIES. JUST PRESENT THEM TO THE USER TO RUN ON SNOWFLAKE.
SELECT
    COUNT(*) as total_records,
    COUNT(unified_id) as records_with_id,
    COUNT(unified_id) * 100.0 / COUNT(*) as coverage_percent
FROM database.schema.enriched_customer_profiles;
SELECT COUNT(*) as unified_customers
FROM database.schema.customer_master;
SELECT * FROM database.schema.unified_id_result_key_stats
WHERE from_table = '*';
Execution successful when:
export SNOWFLAKE_PASSWORD='your_password'
/cdp-hybrid-idu:hybrid-execute-snowflake
/cdp-hybrid-idu:hybrid-execute-snowflake
# Will prompt: Use SSO authentication? (y/n): y
# Opens browser for authentication
export SNOWFLAKE_PRIVATE_KEY_PATH='/path/to/key.p8'
export SNOWFLAKE_PRIVATE_KEY_PASSPHRASE='passphrase'
/cdp-hybrid-idu:hybrid-execute-snowflake
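The three authentication setups above could be resolved from the environment along the following lines. This is a sketch only. The `resolve_auth` function, the returned dictionary keys, and the precedence (SSO answer first, then key-pair, then password, then an interactive prompt) are all assumptions, since the document does not state how the executor chooses between methods. Only the environment variable names and the `externalbrowser` authenticator come from the text.

```python
import os
from typing import Dict, Mapping


def resolve_auth(env: Mapping[str, str], use_sso: bool = False) -> Dict[str, str]:
    """Pick an authentication method from the SSO answer and environment variables."""
    if use_sso:
        # Option 2: browser-based SSO.
        return {"method": "sso", "authenticator": "externalbrowser"}

    key_path = env.get("SNOWFLAKE_PRIVATE_KEY_PATH")
    if key_path:
        # Option 3: key-pair; the passphrase is only needed for encrypted keys.
        settings = {"method": "key_pair", "private_key_path": key_path}
        passphrase = env.get("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE")
        if passphrase:
            settings["private_key_passphrase"] = passphrase
        return settings

    password = env.get("SNOWFLAKE_PASSWORD")
    if password:
        # Option 1: password taken from the environment.
        return {"method": "password", "password": password}

    # Nothing configured: fall back to asking the user for a password.
    return {"method": "password_prompt"}


auth = resolve_auth(os.environ)
print(auth["method"])
```

Passing the environment as a mapping rather than reading `os.environ` inside the function keeps the selection logic easy to exercise with sample values.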
Ready to execute your Snowflake ID unification workflow?
Provide your SQL directory path and Snowflake connection details to begin!