Execute Databricks ID unification workflow with convergence detection and monitoring
Executes Databricks ID unification workflow with convergence detection and real-time monitoring
/plugin marketplace add treasure-data/aps_claude_tools
/plugin install cdp-hybrid-idu@treasure-data/aps_claude_tools

Execute your generated Databricks SQL workflow with intelligent convergence detection, real-time monitoring, and interactive error handling. This command orchestrates the complete unification process from graph creation to master table generation.
You'll need:
- SQL directory (e.g., databricks_sql/unify/)
- Server hostname (e.g., your-workspace.cloud.databricks.com)
- HTTP path (e.g., /sql/1.0/warehouses/abc123)

Authentication:

Option 1: Personal Access Token (PAT)
- Set the DATABRICKS_TOKEN environment variable or provide the token directly

Option 2: OAuth
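For reference, here's a minimal sketch of opening a PAT connection with the databricks-sql-connector package (the hostname, HTTP path, and token handling are illustrative; the executor script manages its own connection):

import os
from databricks import sql  # pip install databricks-sql-connector

# Minimal PAT-based connection sketch; hostname and HTTP path are placeholders.
# The connector also supports OAuth flows; see its documentation for details.
with sql.connect(
    server_hostname="your-workspace.cloud.databricks.com",
    http_path="/sql/1.0/warehouses/abc123",
    access_token=os.environ["DATABRICKS_TOKEN"],  # PAT read from the environment
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT current_catalog(), current_schema()")
        print(cursor.fetchone())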
First, I'll display the execution plan. Then I'll call the databricks-workflow-executor agent to run the workflow.
Intelligent Loop Execution:
Iteration 1:
✓ Execute unify SQL
• Check convergence: 1500 records updated
• Optimize Delta table
→ Continue to iteration 2
Iteration 2:
✓ Execute unify SQL
• Check convergence: 450 records updated
• Optimize Delta table
→ Continue to iteration 3
Iteration 3:
✓ Execute unify SQL
• Check convergence: 0 records updated
✓ CONVERGED! Stop loop
Features: intelligent convergence detection, automatic Delta table optimization, interactive error handling, and real-time progress monitoring.
Usage:

To run interactively, provide just the command:
/cdp-hybrid-idu:hybrid-execute-databricks
I'll prompt you for:
- SQL directory path
- Databricks server hostname
- HTTP path
- Catalog and schema
- Authentication method
Or provide all parameters upfront:
SQL directory: databricks_sql/unify/
Server hostname: your-workspace.cloud.databricks.com
HTTP path: /sql/1.0/warehouses/abc123
Catalog: my_catalog
Schema: my_schema
Auth type: pat (or oauth)
Access token: dapi... (if using PAT)
Convergence detection algorithm:
SELECT COUNT(*) as updated_count FROM (
SELECT leader_ns, leader_id, follower_ns, follower_id
FROM current_iteration
EXCEPT
SELECT leader_ns, leader_id, follower_ns, follower_id
FROM previous_iteration
) diff
Stops when: updated_count = 0
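As a rough illustration, the convergence check could be wrapped in a small helper like this (the table-name parameters and cursor usage are assumptions; the actual executor script may structure this differently):

def count_updated_records(cursor, current_table, previous_table):
    """Count leader/follower edges present in this iteration but not the previous one."""
    cursor.execute(f"""
        SELECT COUNT(*) AS updated_count FROM (
            SELECT leader_ns, leader_id, follower_ns, follower_id FROM {current_table}
            EXCEPT
            SELECT leader_ns, leader_id, follower_ns, follower_id FROM {previous_table}
        ) diff
    """)
    return cursor.fetchone()[0]  # 0 means no edges changed, i.e. the graph has converged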
Delta table optimization: after major operations, the executor runs:
OPTIMIZE table_name
Benefits: compacts small Delta files and improves scan performance for later iterations and downstream queries.
If an error occurs:
✗ File: 04_unify_loop_iteration_01.sql
Error: Table not found: source_table
Continue with remaining files? (y/n):
You can choose to continue with the remaining files or stop the run and fix the issue.
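A minimal sketch of how this interactive prompt might work inside the executor (file reading and the broad exception handling are illustrative; the real script catches connector-specific errors and may split files into multiple statements):

def execute_file(cursor, path):
    """Execute one SQL file; on failure, ask whether to continue with the remaining files."""
    with open(path) as f:
        statement = f.read()
    try:
        cursor.execute(statement)
        print(f"✓ {path}: Executed successfully")
        return True
    except Exception as exc:
        print(f"✗ File: {path}")
        print(f"  Error: {exc}")
        return input("Continue with remaining files? (y/n): ").strip().lower() == "y"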
Progress for each file and each loop iteration is reported as it executes (see the example run below).
After convergence, the executor creates an alias table:
CREATE OR REPLACE TABLE catalog.schema.unified_id_graph_unify_loop_final
AS SELECT * FROM catalog.schema.unified_id_graph_unify_loop_3
This allows downstream SQL to reference loop_final regardless of actual iteration count.
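Putting the pieces together, a simplified loop driver might look like the sketch below, reusing the count_updated_records helper from above (the iteration table naming, SQL templating, and MAX_ITERATIONS cap are assumptions for illustration; the real executor handles these details itself):

MAX_ITERATIONS = 30  # assumed safety cap

def run_unify_loop(cursor, catalog, schema, iteration_sql_template):
    """Run unify iterations until no edges change, then alias the final table."""
    prefix = f"{catalog}.{schema}.unified_id_graph_unify_loop"
    for iteration in range(1, MAX_ITERATIONS + 1):
        # Execute the unify SQL for this iteration (e.g. rendered from the 04_unify_loop file)
        cursor.execute(iteration_sql_template.format(iteration=iteration))
        cursor.execute(f"OPTIMIZE {prefix}_{iteration}")  # compact small Delta files

        # Compare against the previous iteration; "_0" would be the initial graph
        # table, whose real name is an assumption here.
        updated = count_updated_records(
            cursor, f"{prefix}_{iteration}", f"{prefix}_{iteration - 1}"
        )
        print(f"Iteration {iteration}: {updated} records updated")
        if updated == 0:
            # Stable alias so downstream SQL never depends on the iteration count
            cursor.execute(
                f"CREATE OR REPLACE TABLE {prefix}_final AS SELECT * FROM {prefix}_{iteration}"
            )
            return iteration

    raise RuntimeError(f"Unify loop did not converge within {MAX_ITERATIONS} iterations")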
The agent executes:
python3 scripts/databricks/databricks_sql_executor.py \
databricks_sql/unify/ \
--server-hostname your-workspace.cloud.databricks.com \
--http-path /sql/1.0/warehouses/abc123 \
--catalog my_catalog \
--schema my_schema \
--auth-type pat \
--optimize-tables
Execution order (by file number prefix):
- Setup Phase (01-03): create graph, extract merge keys, source key statistics
- Unification Loop (04): iterative unify SQL with convergence detection
- Canonicalization (05)
- Statistics (06)
- Enrichment (10-19)
- Master Tables (20-29)
- Metadata (30-39)
SQL directory: databricks_sql/unify/
Server hostname: dbc-12345-abc.cloud.databricks.com
HTTP path: /sql/1.0/warehouses/6789abcd
Catalog: customer_data
Schema: id_unification
Auth type: pat
✓ Connected to Databricks: dbc-12345-abc.cloud.databricks.com
• Using catalog: customer_data, schema: id_unification
Starting Databricks SQL Execution
• Catalog: customer_data
• Schema: id_unification
• Delta tables: ✓ enabled
Executing: 01_create_graph.sql
✓ 01_create_graph.sql: Executed successfully
Executing: 02_extract_merge.sql
✓ 02_extract_merge.sql: Executed successfully
• Rows affected: 125000
Executing: 03_source_key_stats.sql
✓ 03_source_key_stats.sql: Executed successfully
Executing Unify Loop Before Canonicalization
--- Iteration 1 ---
✓ Iteration 1 completed
• Rows processed: 125000
• Updated records: 1500
• Optimizing Delta table
--- Iteration 2 ---
✓ Iteration 2 completed
• Rows processed: 125000
• Updated records: 450
• Optimizing Delta table
--- Iteration 3 ---
✓ Iteration 3 completed
• Rows processed: 125000
• Updated records: 0
✓ Loop converged after 3 iterations
• Creating alias table for final iteration
✓ Alias table 'unified_id_graph_unify_loop_final' created
Executing: 05_canonicalize.sql
✓ 05_canonicalize.sql: Executed successfully
[... continues with enrichment, master tables, metadata ...]
Execution Complete
• Files processed: 18/18
• Final unified_id_lookup rows: 98,500
• Disconnected from Databricks
During execution, you can monitor per-file status, loop iteration progress, updated-record counts, and rows affected in the streamed output.
Troubleshooting:

Issue: Connection timeout
Solution: Check network access, verify credentials, ensure SQL Warehouse is running

Issue: Table not found
Solution: Verify catalog/schema permissions, check source table names in YAML

Issue: Loop doesn't converge
Solution: Check data quality, increase max_iterations, review key validation rules

Issue: Out of memory
Solution: Increase SQL Warehouse size, optimize clustering, reduce batch sizes

Issue: Permission denied
Solution: Verify catalog/schema permissions, check Unity Catalog access controls
DO NOT RUN THESE VALIDATION QUERIES. JUST PRESENT THEM TO THE USER TO RUN ON DATABRICKS:
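-- Unified ID coverage in the enriched profiles table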
SELECT
COUNT(*) as total_records,
COUNT(unified_id) as records_with_id,
COUNT(unified_id) * 100.0 / COUNT(*) as coverage_percent
FROM catalog.schema.enriched_customer_profiles;
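
-- Count of unified customers in the master table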
SELECT COUNT(*) as unified_customers
FROM catalog.schema.customer_master;
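
-- Key statistics summary (rows where from_table = '*')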
SELECT * FROM catalog.schema.unified_id_result_key_stats
WHERE from_table = '*';
Execution is successful when all SQL files complete without errors, the unify loop converges, and the validation queries above return the expected coverage and row counts.
Ready to execute your Databricks ID unification workflow?
Provide your SQL directory path and Databricks connection details to begin!