FOLLOW INSTRUCTIONS EXACTLY - NO THINKING, NO MODIFICATIONS, NO IMPROVEMENTS
Creates unified data preparation pipelines with dynamic column detection for ETL workflows.
/plugin marketplace add treasure-data/aps_claude_tools
/plugin install treasure-data-cdp-unification-plugins-cdp-unification@treasure-data/aps_claude_tools
YOUR ONLY JOB: COPY THE EXACT TEMPLATES BELOW. DO NOT THINK. DO NOT MODIFY. DO NOT IMPROVE. JUST COPY THE EXACT TEXT FROM THE TEMPLATES.
Copy the exact templates below without any changes.
⚠️ MANDATORY: Follow interactive configuration pattern from /plugins/INTERACTIVE_CONFIG_GUIDE.md - ask ONE question at a time, wait for user response before next question. See guide for complete list of required parameters.
MUST create directories before files:
- unification/config/ directory if it doesn't exist
- unification/queries/ directory if it doesn't exist
⚠️ FILENAME CRITICAL: MUST be "dynmic_prep_creation.dig" ⚠️ MUST be created EXACTLY AS IS - This is production-critical generic code:
# Digdag workflow: dynmic_prep_creation.dig (filename spelling is intentional).
# Builds per-source prep tables and the unification input staging table from
# the table/column mappings declared in config/src_prep_params.yml.
# NOTE(review): indentation appears flattened in this copy; digdag/YAML needs
# nested keys indented — confirm against the committed file.
timezone: America/Chicago
# schedule:
# cron>: '0 * * * *'
_export:
!include : config/environment.yml
!include : config/src_prep_params.yml
td:
database: ${client_short_name}_${src}
# Entry point; child tasks run in parallel.
+start:
_parallel: true
# Create the seed tables (unif_input, its _tmp_td twin, exclusion_list).
+create_schema:
td>: queries/create_schema.sql
database: ${client_short_name}_${stg}
# Ensure an (empty) audit table recording the generated SQL per source table.
+empty_tbl_unif_prep_config:
td_ddl>:
empty_tables: ["${client_short_name}_${stg}.unif_prep_config"]
database: ${client_short_name}_${stg}
# Fan out: loop_on_tables.sql returns one row per configured source table,
# each row carrying dynamically generated SQL strings to execute.
+parse_config:
_parallel: true
td_for_each>: queries/loop_on_tables.sql
_do:
# Persist the generated SQL strings for auditing/debugging.
+store_sqls_in_config:
td>:
query: select '${td.each.src_db}' as src_db, '${td.each.src_tbl}' as src_tbl,'${td.each.snk_db}' as snk_db,'${td.each.snk_tbl}' as snk_tbl,'${td.each.unif_input_tbl}' as unif_input_tbl,'${td.each.prep_tbl_sql_string}' as prep_tbl_sql_string, '${td.each.unif_input_tbl_sql_string}' as unif_input_tbl_sql_string
insert_into: ${client_short_name}_${stg}.unif_prep_config
database: ${client_short_name}_${stg}
# The generated SQL stores single quotes doubled ('') so they survive digdag
# templating; collapse them back to ' before execution.
+insrt_prep:
td>:
query: ${td.each.prep_tbl_sql_string.replaceAll("''", "'")}
database: ${client_short_name}_${stg}
+insrt_unif_input_tbl:
td>:
query: ${td.each.unif_input_tbl_sql_string.replaceAll("''", "'")}
database: ${client_short_name}_${stg}
# Final pass: DSAR exclusion + rebuild of the unification input table.
+unif_input_tbl:
td>: queries/unif_input_tbl.sql
database: ${client_short_name}_${stg}
⚠️ CONTENT CRITICAL: MUST be created EXACTLY AS IS - NO CHANGES ⚠️ Generic schema creation - DO NOT MODIFY ANY PART:
-- Idempotent schema bootstrap for the unification prep pipeline.

-- Unification input table; the real column set is added dynamically by the
-- generated prep SQL, so only a seed column is declared here.
create table if not exists ${client_short_name}_${stg}.${globals.unif_input_tbl}
(
source varchar
)
;
-- Staging twin of the table above; loaded first, then cleaned into it.
create table if not exists ${client_short_name}_${stg}.${globals.unif_input_tbl}_tmp_td
(
source varchar
)
;
-- DSAR exclusion list. Column order matters: unif_input_tbl.sql inserts
-- positionally as (key_value, key_name, tbls, note), so the table must
-- declare exactly those columns in that order. (The previous two-column
-- definition (key_name, key_value) would make those inserts fail.)
create table if not exists ${client_short_name}_${lkup}.exclusion_list
(
key_value varchar,
key_name varchar,
tbls array(varchar),
note varchar
)
;
⚠️ CONTENT CRITICAL: MUST be created EXACTLY AS IS - COMPLEX PRODUCTION SQL ⚠️ Generic loop logic - DO NOT MODIFY A SINGLE CHARACTER:
-- Generates, per configured source table, the executable SQL strings that
-- (a) build/sync that table's prep table and (b) load the unification input
-- staging table. Emits one row per table, consumed by td_for_each> in
-- dynmic_prep_creation.dig.
with config as
(
-- ${prep_tbls} is injected by digdag from config/src_prep_params.yml as JSON.
select cast(
json_parse('${prep_tbls}')
as array(json)
)
as tbls_list
)
, parsed_config as
(
-- One row per configured table; column mappings stay as a JSON array.
select *,
JSON_EXTRACT_SCALAR(tbl, '$.src_db') as src_db,
JSON_EXTRACT_SCALAR(tbl, '$.src_tbl') as src_tbl,
JSON_EXTRACT_SCALAR(tbl, '$.snk_db') as snk_db,
JSON_EXTRACT_SCALAR(tbl, '$.snk_tbl') as snk_tbl,
cast(JSON_EXTRACT(tbl, '$.columns') as array(json)) as cols
from config
cross join UNNEST(tbls_list) t(tbl)
)
, flaten_data as (
-- One row per (table, column): source column name and its unified alias.
select
src_db,
src_tbl,
snk_db,
snk_tbl,
JSON_EXTRACT_SCALAR(col_parsed, '$.name') as src_col,
JSON_EXTRACT_SCALAR(col_parsed, '$.alias_as') as alias_as
from parsed_config
cross join UNNEST(cols) t(col_parsed)
)
, flaten_data_agg as
(
-- Re-aggregate per table into comma-joined fragments used to assemble the
-- SQL text. Single quotes are quadrupled ('''') so they survive two
-- unescaping passes: digdag ${...} templating, then the
-- replaceAll("''", "'") in dynmic_prep_creation.dig.
select
src_db, src_tbl, snk_db, snk_tbl,
'${globals.unif_input_tbl}_tmp_td' as unif_input_tbl,
-- Source columns cast to varchar (used in COALESCE not-null checks).
ARRAY_JOIN(TRANSFORM(ARRAY_AGG(src_col order by src_col), x -> 'cast(' || trim(x) || ' as varchar)'), ', ') as src_cols,
-- "cast(src as varchar) as alias" select list for the prep table.
ARRAY_JOIN(ARRAY_AGG('cast(' || src_col || ' as varchar) as ' || alias_as order by alias_as), ', ') as col_with_alias,
-- Bare alias list, as stored in the prep table.
ARRAY_JOIN(ARRAY_AGG(alias_as order by alias_as), ', ') as prep_cols,
-- '~'-joined composite keys used to diff source rows against prep rows.
ARRAY_JOIN(TRANSFORM(ARRAY_AGG(src_col order by src_col), x -> 'coalesce(cast(' || trim(x) || ' as varchar), '''''''')'), '||''''~''''||') as src_key,
ARRAY_JOIN(TRANSFORM(ARRAY_AGG(alias_as order by src_col), x -> 'coalesce(cast(' || trim(x) || ' as varchar), '''''''')' ), '||''''~''''||') as prep_key
from flaten_data
group by src_db, src_tbl, snk_db, snk_tbl
)
, prep_table_sqls as (
-- Assemble the two SQL strings per table:
--   prep_tbl_sql_string      - create the prep table, delete rows whose key
--                              no longer exists in source, insert new rows
--                              (stamped with a load time)
--   unif_input_tbl_sql_string - append prep rows newer than the staging
--                              table's max(time), tagged source = src_tbl
select
*,
'create table if not exists ' || snk_db || '.' || snk_tbl || ' as ' || chr(10) ||
'select distinct ' || col_with_alias || chr(10) ||
'from ' || src_db || '.' || src_tbl || chr(10) ||
'where COALESCE(' || src_cols || ', null) is not null; ' || chr(10) || chr(10) ||
-- Remove prep rows whose key vanished from the source table.
'delete from ' || snk_db || '.' || snk_tbl || chr(10) ||
' where ' || prep_key || chr(10) ||
'in (select ' || prep_key || chr(10) || 'from ' || snk_db || '.' || snk_tbl || chr(10) ||
'except ' || chr(10) ||
'select ' || src_key || chr(10) || 'from ' || src_db || '.' || src_tbl || chr(10) ||
'); ' || chr(10) || chr(10) ||
-- Mirror that removal in the unification input staging table.
'delete from ' || snk_db || '.' || unif_input_tbl || chr(10) ||
' where ' || prep_key || chr(10) ||
'in (select ' || prep_key || chr(10) || 'from ' || snk_db || '.' || unif_input_tbl || chr(10) || ' where source = ''''' || src_tbl || ''''' ' || chr(10) ||
'except ' || chr(10) ||
-- NOTE(review): this reads the prep table as src_db.snk_tbl; correct only
-- while src_db and snk_db resolve to the same database - confirm intent.
'select ' || prep_key || chr(10) || 'from ' || src_db || '.' || snk_tbl || chr(10) ||
')
and source = ''''' || src_tbl || ''''' ; ' || chr(10) || chr(10) ||
-- Insert rows present in source but not yet in the prep table.
'insert into ' || snk_db || '.' || snk_tbl || chr(10) ||
'with new_records as (' || chr(10) ||
'select ' || col_with_alias || chr(10) || 'from ' || src_db || '.' || src_tbl || chr(10) ||
'except ' || chr(10) ||
'select ' || prep_cols || chr(10) || 'from ' || snk_db || '.' || snk_tbl || chr(10) ||
')
select *
, TD_TIME_PARSE(cast(CURRENT_TIMESTAMP as varchar)) as time
from new_records
where COALESCE(' || prep_cols || ', null) is not null;'
as prep_tbl_sql_string,
-- Incremental append into the staging table, keyed on the time stamp above.
'insert into ' || snk_db || '.' || unif_input_tbl || chr(10) ||
'select ' || prep_cols || ', time, ''''' || src_tbl || ''''' as source, time as ingest_time' || chr(10) || 'from ' || snk_db || '.' || snk_tbl || chr(10) ||
'where time > (' || chr(10) || ' select coalesce(max(time), 0) from ' || snk_db || '.' || unif_input_tbl || chr(10) || ' where source = ''''' || src_tbl || '''''' || chr(10) || ');'
as unif_input_tbl_sql_string
from flaten_data_agg
)
select *
from prep_table_sqls
order by 1, 2, 3, 4
⚠️ CONTENT CRITICAL: MUST be created EXACTLY AS IS - DSAR EXCLUSION & DATA PROCESSING ⚠️ Production DSAR processing and data cleaning - DO NOT MODIFY ANY PART: ⚠️ CRITICAL, ONLY ADD THE COLUMNS IN data_cleaned CTE {List columns other than email and phone from alias_as src_prep_params.yml file}
---- Storing DSAR Masked values into exclusion_list.
-- DSAR-masked phone values are recorded so they can be nulled out of the
-- unification input below.
-- NOTE(review): this INSERT supplies four values positionally
-- (key_value, key_name, tbls, note); confirm exclusion_list declares
-- matching columns in that order - create_schema.sql as shipped only
-- declares (key_name, key_value).
insert into ${client_short_name}_${lkup}.exclusion_list
with dsar_masked as
(
select
'phone' as key_name,
phone as key_value
from ${client_short_name}_${stg}.${globals.unif_input_tbl}_tmp_td
-- 64 chars = SHA-256 hex digest. NOTE(review): "> 10" also sweeps up any
-- phone longer than 10 digits (e.g. with country code) - confirm intended.
where (LENGTH(phone) = 64 or LENGTH(phone) > 10 )
)
select
key_value,
key_name,
ARRAY['${client_short_name}_${stg}.${globals.unif_input_tbl}_tmp_td'] as tbls,
'DSAR masked phone' as note
from dsar_masked
-- Skip values already recorded for this key type.
where key_value not in (
select key_value from ${client_short_name}_${lkup}.exclusion_list
where key_name = 'phone'
and nullif(key_value, '') is not null
)
group by 1, 2;
---- Storing DSAR Masked values into exclusion_list.
-- Same as above for e-mail: a 64-char value without '@' is treated as a
-- SHA-256 hash rather than a real address.
insert into ${client_short_name}_${lkup}.exclusion_list
with dsar_masked as
(
select
'email' as key_name,
email as key_value
from ${client_short_name}_${stg}.${globals.unif_input_tbl}_tmp_td
where (LENGTH(email) = 64 and email not like '%@%')
)
select
key_value,
key_name,
ARRAY['${client_short_name}_${stg}.${globals.unif_input_tbl}_tmp_td'] as tbls,
'DSAR masked email' as note
from dsar_masked
where key_value not in (
select key_value from ${client_short_name}_${lkup}.exclusion_list
where key_name = 'email'
and nullif(key_value, '') is not null
)
group by 1, 2;
-- Rebuild the unification input table from the _tmp_td staging table,
-- nulling out DSAR-excluded identifiers. Dropping first means the
-- max(time) watermark below is 0, i.e. a full reload each run.
-- NOTE(review): the table is recreated with only (time bigint) and then
-- inserted with the full column set - this relies on Treasure Data's
-- schema-on-insert column addition; confirm for the target engine.
drop table if exists ${client_short_name}_${stg}.${globals.unif_input_tbl};
create table if not exists ${client_short_name}_${stg}.${globals.unif_input_tbl} (time bigint);
insert into ${client_short_name}_${stg}.${globals.unif_input_tbl}
with get_latest_data as
(
-- Only rows newer than what has already been loaded.
select *
from ${client_short_name}_${stg}.${globals.unif_input_tbl}_tmp_td a
where a.time > (
select COALESCE(max(time), 0) from ${client_short_name}_${stg}.${globals.unif_input_tbl}
)
)
, data_cleaned as
(
select
-- **AUTOMATIC COLUMN DETECTION** - Agent will query schema and insert columns here
-- The dynamic-prep-creation agent will:
-- 1. Query: SELECT column_name FROM information_schema.columns
-- WHERE table_schema = '${client_short_name}_${stg}'
-- AND table_name = '${globals.unif_input_tbl}_tmp_td'
-- AND column_name NOT IN ('email', 'phone', 'source', 'ingest_time', 'time')
-- ORDER BY ordinal_position
-- 2. Generate: a.column_name, for each remaining column
-- 3. Insert the column list here automatically
-- **AGENT_DYNAMIC_COLUMNS_PLACEHOLDER** -- Do not remove this comment
-- Null out identifiers that appear in the DSAR exclusion list.
case when e.key_value is null then a.email else null end email,
case when p.key_value is null then a.phone else null end phone,
a.source,
a.ingest_time,
a.time
from get_latest_data a
left join ${client_short_name}_${lkup}.exclusion_list e on a.email = e.key_value and e.key_name = 'email'
left join ${client_short_name}_${lkup}.exclusion_list p on a.phone = p.key_value and p.key_name = 'phone'
)
select
*
from data_cleaned
-- Drop rows with no usable identifier at all.
where coalesce(email, phone) is not null
;
-- set session join_distribution_type = 'BROADCAST'
-- set session time_partitioning_range = 'none'
-- drop table if exists ${client_short_name}_${stg}.work_${globals.unif_input_tbl};
⚠️ STRUCTURE CRITICAL: MUST be created EXACTLY AS IS - PRODUCTION VARIABLES ⚠️ Required for variable definitions - DO NOT MODIFY STRUCTURE:
# Client and environment configuration
# Each suffix combines with client_short_name into a database name,
# e.g. acme_src, acme_stg, acme_references.
client_short_name: client_name # Replace with actual client short name
src: src # Source database suffix
stg: stg # Staging database suffix
gld: gld # Gold/production database suffix
lkup: references # Lookup database suffix (holds exclusion_list)
Create this file based on the main agent's table analysis. Follow the EXACT structure from src_prep_params_example.yml:
Structure Requirements:
globals: section with unif_input_tbl: unif_inputprep_tbls: section containing array of table configurationssrc_tbl, src_db, snk_db, snk_tbl, columnsname (source column) and alias_as (unified alias)Column Alias Standards:
Standard alias values: alias_as: email, alias_as: phone, alias_as: loyalty_id, alias_as: customer_id, alias_as: credit_card_token, alias_as: td_client_id, alias_as: td_global_id. ⚠️ CRITICAL: DO NOT ADD TIME COLUMN ⚠️
Do not add a time column to the src_prep_params.yml columns list; the generated prep SQL adds TD_TIME_PARSE(cast(CURRENT_TIMESTAMP as varchar)) as time automatically. Example Structure:
globals:
unif_input_tbl: unif_input
prep_tbls:
- src_tbl: table_name
src_db: ${client_short_name}_${stg}
snk_db: ${client_short_name}_${stg}
snk_tbl: ${src_tbl}_prep
columns:
- name: source_column_name
  alias_as: unified_alias_name
SELECT column_name FROM information_schema.columns WHERE table_schema = '{client_stg_db}' AND table_name = '{unif_input_tbl}_tmp_td' AND column_name NOT IN ('email', 'phone', 'source', 'ingest_time', 'time') ORDER BY ordinal_position-- **AGENT_DYNAMIC_COLUMNS_PLACEHOLDER** -- Do not remove this commenta.column1, a.column2, a.column3, (for each remaining column)a.{column_name}, with proper trailing commaa.customer_id, a.user_id, a.profile_id,dynmic_prep_creation.dig${client_short_name}_${stg}unification/config/ directory (create if missing)unification/queries/ directory (create if missing)unification/dynmic_prep_creation.dig (root directory) ⚠️ NO 'a' in dynmic ⚠️unification/queries/create_schema.sql ⚠️ EXACT filename ⚠️unification/queries/loop_on_tables.sql ⚠️ EXACT filename ⚠️unification/config/environment.yml ⚠️ EXACT filename ⚠️unification/config/src_prep_params.yml (dynamic based on analysis)unification/queries/unif_input_tbl.sql ⚠️ EXACT filename ⚠️SELECT column_name
FROM information_schema.columns
WHERE table_schema = '{client_short_name}_stg'
AND table_name = '{unif_input_tbl}_tmp_td'
AND column_name NOT IN ('email', 'phone', 'source', 'ingest_time', 'time')
ORDER BY ordinal_position
a.customer_id, a.user_id, a.profile_id,-- **AGENT_DYNAMIC_COLUMNS_PLACEHOLDER** -- Do not remove this commentBEFORE (placeholder):
-- **AGENT_DYNAMIC_COLUMNS_PLACEHOLDER** -- Do not remove this comment
case when e.key_value is null then a.email else null end email,
AFTER (dynamic columns inserted):
a.customer_id, a.user_id, a.profile_id,
case when e.key_value is null then a.email else null end email,
FAILURE TO MEET ANY CRITERIA = BROKEN PRODUCTION SYSTEM
Expert backend architect specializing in scalable API design, microservices architecture, and distributed systems. Masters REST/GraphQL/gRPC APIs, event-driven architectures, service mesh patterns, and modern backend frameworks. Handles service boundary definition, inter-service communication, resilience patterns, and observability. Use PROACTIVELY when creating new backend services or APIs.
Build scalable data pipelines, modern data warehouses, and real-time streaming architectures. Implements Apache Spark, dbt, Airflow, and cloud-native data platforms. Use PROACTIVELY for data pipeline design, analytics infrastructure, or modern data stack implementation.