Develops and troubleshoots dbt incremental models: selects strategies (append, merge, insert_overwrite), designs unique keys, optimizes performance, and fixes merge errors, broken partition pruning, and schema drift.
```bash
npx claudepluginhub altimateai/data-engineering-skills --plugin dbt-skills
```
**Choose the right strategy. Design the unique_key carefully. Handle edge cases.**
| Scenario | Recommendation |
|---|---|
| Source data < 10M rows | Use table (simpler, full refresh is fast) |
| Source data > 10M rows | Consider incremental |
| Source data updated in place | Use incremental with merge strategy |
| Append-only source (logs, events) | Use incremental with append strategy |
| Partitioned warehouse data | Use insert_overwrite if supported |
Default to table unless you have a clear performance reason for incremental.
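For comparison, the simpler default is a one-line config. A minimal sketch, assuming the same hypothetical source:

```sql
-- Simple full-refresh table: rebuilt from scratch on every run
{{ config(materialized='table') }}

select * from {{ source('schema', 'table') }}
```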
Always test with `--full-refresh` first before relying on incremental logic.

```bash
# Check source table size
dbt show --inline "select count(*) from {{ source('schema', 'table') }}"
```
If count < 10 million, consider using table instead. Incremental adds complexity.
Before choosing a strategy, check whether the source has a reliable timestamp column to filter on:

```bash
# Check for timestamp column
dbt show --inline "
select
    min(updated_at) as earliest,
    max(updated_at) as latest,
    count(distinct date(updated_at)) as days_of_data
from {{ source('schema', 'table') }}
"
```
| Strategy | Use When | How It Works |
|---|---|---|
| `append` | Data is append-only, no updates | INSERT only, no deduplication |
| `merge` | Data can be updated | MERGE/UPSERT by unique_key |
| `delete+insert` | Data updated in batches | DELETE matching rows, then INSERT |
| `insert_overwrite` | Partitioned tables (BigQuery, Spark) | Replace entire partitions |
Default: `merge` is safest for most use cases.
Note: Strategy availability varies by adapter. Check the dbt incremental strategy docs for your specific warehouse.
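To see what a strategy actually executes on your adapter, inspect the SQL dbt writes under target/run after a run. A sketch; the exact path depends on your project name and layout:

```bash
# The file under target/run contains the actual MERGE/INSERT statement dbt generated
dbt run --select <model_name>
cat target/run/<project_name>/models/<model_name>.sql
```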
**CRITICAL:** `unique_key` must be truly unique in your data.
```bash
# Verify uniqueness BEFORE creating model
dbt show --inline "
select {{ unique_key_column }}, count(*)
from {{ source('schema', 'table') }}
group by 1
having count(*) > 1
limit 10
"
```
If duplicates exist, either deduplicate in the model (see the fix below) or use `delete+insert` instead of `merge`.

A standard incremental model looks like this. The strategy can also be `append` or `delete+insert`; the `unique_key` must be truly unique, and `on_schema_change='append_new_columns'` handles new source columns:

```sql
{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='id',
        on_schema_change='append_new_columns'
    )
}}

select
    id,
    column_a,
    column_b,
    updated_at
from {{ source('schema', 'table') }}

{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```
**ALWAYS** verify with a full refresh before trusting incremental logic.
```bash
# First run: full refresh to establish baseline
dbt build --select <model_name> --full-refresh

# Verify output
dbt show --select <model_name> --limit 10
dbt show --inline "select count(*) from {{ ref('model_name') }}"

# Run incrementally (no --full-refresh)
dbt build --select <model_name>

# Verify row count changed appropriately
dbt show --inline "select count(*) from {{ ref('model_name') }}"
```
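As an optional guardrail, declare a `unique` test on the key column in the model's YAML (not shown here) and run it after each build:

```bash
# Assumes a `unique` test is defined on the model's key column
dbt test --select <model_name>
```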
Set `on_schema_change` based on your needs:
| Setting | Behavior |
|---|---|
| `ignore` (default) | New columns in source are ignored |
| `append_new_columns` | New columns added to target |
| `sync_all_columns` | Target schema matches source exactly |
| `fail` | Error if schema changes |
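For strict pipelines, failing fast is often safer than silently ignoring drift. A sketch using the same hypothetical model:

```sql
-- Fail the run whenever the source schema changes, so drift is caught explicitly
{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='id',
        on_schema_change='fail'
    )
}}
```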
Symptom: "Cannot MERGE with duplicate values"
Cause: Multiple rows with same unique_key in source or target.
Fix:
```sql
-- Add deduplication using a CTE (cross-database compatible)
with deduplicated as (
    select *,
        row_number() over (partition by id order by updated_at desc) as rn
    from {{ source('schema', 'table') }}
    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}
)

select * from deduplicated where rn = 1
```
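Alternatively, when rows are only unique per combination of columns, a composite key can replace deduplication. List-form `unique_key` is supported by most adapters for `merge` (verify for yours); the `source_system` column here is hypothetical:

```sql
{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=['id', 'source_system']
    )
}}
```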
**Symptom:** Incremental runs take as long as full refresh.

**Cause:** Dynamic date filter prevents partition pruning.

**Fix:**
```sql
{% if is_incremental() %}
-- Use static date instead of subquery for partition pruning
where updated_at >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
    and updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```
**Symptom:** Some records never appear in the incremental model.

**Cause:** Filtering by `max(updated_at)` misses late arrivals.

**Fix:** Use a lookback window with a fixed offset from the current date; combined with the `merge` strategy and a `unique_key`, reprocessed rows are upserted rather than duplicated:
```sql
{% if is_incremental() %}
-- Lookback 3 days to catch late-arriving data
where updated_at >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
{% endif %}
```
Alternatively, use a variable for the lookback period:
```sql
{% set lookback_days = 3 %}

{% if is_incremental() %}
where updated_at >= {{ dbt.dateadd('day', -lookback_days, dbt.current_timestamp()) }}
{% endif %}
```
Symptom: "Column X not found" after source adds column.
Fix: Set on_schema_change='append_new_columns' in config.
**Symptom:** Counts diverge between incremental and full refresh.

**Fix:** Schedule a periodic full refresh:

```bash
# Weekly full refresh
dbt build --select <model_name> --full-refresh
```
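A quick drift check between full refreshes, assuming a 1:1 row relationship between source and model (adjust if the model filters or dedups):

```bash
# Compare model vs. source row counts to spot drift early
dbt show --inline "
select
    (select count(*) from {{ ref('model_name') }}) as model_rows,
    (select count(*) from {{ source('schema', 'table') }}) as source_rows
"
```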
Example configurations for common scenarios follow.

Append-only events:

```sql
{{ config(materialized='incremental', incremental_strategy='append') }}

select * from {{ source('events', 'raw') }}

{% if is_incremental() %}
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
```
Updatable records (e.g., CRM contacts):

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id'
) }}

select * from {{ source('crm', 'contacts') }}

{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```
Batch-updated data with a rolling reload window:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='id'
) }}

select * from {{ source('orders', 'raw') }}

{% if is_incremental() %}
where order_date >= {{ dbt.dateadd('day', -7, dbt.current_timestamp()) }}
{% endif %}
```
Partitioned events (BigQuery, Spark):

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'event_date', 'data_type': 'date'}
) }}

select * from {{ source('events', 'raw') }}

{% if is_incremental() %}
where event_date >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
{% endif %}
```