Develops and troubleshoots dbt incremental models. Use when working with incremental materialization for: (1) creating new incremental models (choosing strategy, `unique_key`, partition); (2) tasks that mention "incremental", "append", "merge", "upsert", or "late arriving data"; (3) troubleshooting incremental failures (merge errors, partition pruning, schema drift); (4) optimizing incremental performance or deciding between table and incremental. Guides strategy selection and handles common incremental gotchas.

```bash
npx skill4agent add altimateai/data-engineering-skills developing-incremental-models
```

| Scenario | Recommendation |
|---|---|
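| Source data < 10M rows | Use `table` |
| Source data > 10M rows | Consider `incremental` |
| Source data updated in place | Use `incremental` with `merge` |
| Append-only source (logs, events) | Use `incremental` with `append` |
| Partitioned warehouse data | Use `incremental` with `insert_overwrite` |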

Default to `table`; an incremental model can still be rebuilt from scratch with `--full-refresh`.

```bash
# Check source table size
dbt show --inline "select count(*) from {{ source('schema', 'table') }}"
```

If the source is small (under 10M rows), `table` is usually enough.

```bash
# Check for a timestamp column
dbt show --inline "
select
    min(updated_at) as earliest,
    max(updated_at) as latest,
    count(distinct date(updated_at)) as days_of_data
from {{ source('schema', 'table') }}
"
```

| Strategy | Use When | How It Works |
|---|---|---|
| `append` | Data is append-only, no updates | INSERT only, no deduplication |
| `merge` | Data can be updated | MERGE/UPSERT by `unique_key` |
| `delete+insert` | Data updated in batches | DELETE matching rows, then INSERT |
| `insert_overwrite` | Partitioned tables (BigQuery, Spark) | Replace entire partitions |
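
The "How It Works" column can be made concrete with a small in-memory model. The following is a minimal Python sketch, not how dbt or any warehouse implements these strategies: the helper names (`apply_append`, `apply_merge`, `apply_delete_insert`, `apply_insert_overwrite`) and the sample columns `id`, `day`, and `v` are hypothetical, and rows are plain dictionaries.

```python
from typing import Any

Row = dict[str, Any]


def apply_append(target: list[Row], batch: list[Row]) -> list[Row]:
    """append: INSERT only, so duplicates in the batch are kept."""
    return target + batch


def apply_merge(target: list[Row], batch: list[Row], unique_key: str) -> list[Row]:
    """merge: update rows whose key already exists, insert the rest.

    Simplification: if the batch repeats a key, the last row wins here.
    """
    merged = {row[unique_key]: row for row in target}
    for row in batch:
        merged[row[unique_key]] = row
    return list(merged.values())


def apply_delete_insert(target: list[Row], batch: list[Row], unique_key: str) -> list[Row]:
    """delete+insert: delete target rows matched by key, then insert the batch."""
    batch_keys = {row[unique_key] for row in batch}
    kept = [row for row in target if row[unique_key] not in batch_keys]
    return kept + batch


def apply_insert_overwrite(target: list[Row], batch: list[Row], partition_field: str) -> list[Row]:
    """insert_overwrite: replace every partition that appears in the batch."""
    touched = {row[partition_field] for row in batch}
    kept = [row for row in target if row[partition_field] not in touched]
    return kept + batch


if __name__ == "__main__":
    # Hypothetical sample data: one existing row is updated, one row is new.
    target = [
        {"id": 1, "day": "2024-01-01", "v": "a"},
        {"id": 2, "day": "2024-01-02", "v": "b"},
    ]
    batch = [
        {"id": 2, "day": "2024-01-02", "v": "b2"},
        {"id": 3, "day": "2024-01-02", "v": "c"},
    ]
    print(len(apply_append(target, batch)))
    print(apply_merge(target, batch, "id"))
    print(apply_delete_insert(target, batch, "id"))
    print(apply_insert_overwrite(target, batch, "day"))
```

With this sample data, `append` keeps both versions of `id = 2`, while the other three strategies end with one row per `id`. The sketch also shows why a non-unique key matters: `apply_delete_insert` inserts every batch row, so duplicate keys inside a single batch survive into the target.
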

For `merge`, confirm that the key is actually unique in the source:

```bash
# Verify uniqueness BEFORE creating the model
# (replace <unique_key_column> with the candidate key column)
dbt show --inline "
select <unique_key_column>, count(*)
from {{ source('schema', 'table') }}
group by 1
having count(*) > 1
limit 10
"
```

Both `delete+insert` and `merge` match rows on `unique_key`:

```sql
{#
  incremental_strategy: 'merge', or 'append' / 'delete+insert'
  unique_key: MUST be unique
  on_schema_change: 'append_new_columns' handles new columns
  (notes live here because SQL "--" comments are not valid inside a Jinja expression)
#}
{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='id',
        on_schema_change='append_new_columns'
    )
}}

select
    id,
    column_a,
    column_b,
    updated_at
from {{ source('schema', 'table') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```

```bash
# First run: full refresh to establish a baseline
dbt build --select <model_name> --full-refresh

# Verify output
dbt show --select <model_name> --limit 10
dbt show --inline "select count(*) from {{ ref('model_name') }}"
```

```bash
# Run incrementally (no --full-refresh)
dbt build --select <model_name>

# Verify the row count changed appropriately
dbt show --inline "select count(*) from {{ ref('model_name') }}"
```

The `on_schema_change` setting controls what happens when the source schema changes:

| Setting | Behavior |
|---|---|
| `ignore` (default) | New columns in source are ignored |
| `append_new_columns` | New columns added to target |
| `sync_all_columns` | Target schema matches source exactly |
| `fail` | Error if schema changes |
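
As a rough illustration of the four `on_schema_change` behaviors (`ignore`, `append_new_columns`, `sync_all_columns`, `fail`), the Python sketch below computes the target column list after a run. It is a simplified model under stated assumptions, not dbt's implementation: `resolve_schema` is a hypothetical helper, columns are plain strings, and column order and data types are ignored.

```python
def resolve_schema(
    target_cols: list[str],
    source_cols: list[str],
    on_schema_change: str = "ignore",
) -> list[str]:
    """Return the target's columns after applying an on_schema_change setting."""
    new_cols = [c for c in source_cols if c not in target_cols]
    removed_cols = [c for c in target_cols if c not in source_cols]

    if on_schema_change == "ignore":
        # New source columns are not added to the target.
        return list(target_cols)
    if on_schema_change == "append_new_columns":
        # New columns are added; nothing is dropped from the target.
        return list(target_cols) + new_cols
    if on_schema_change == "sync_all_columns":
        # Target ends up with exactly the source's columns.
        return list(source_cols)
    if on_schema_change == "fail":
        if new_cols or removed_cols:
            raise ValueError(
                f"schema changed: added={new_cols}, removed={removed_cols}"
            )
        return list(target_cols)
    raise ValueError(f"unknown on_schema_change setting: {on_schema_change!r}")


if __name__ == "__main__":
    # Hypothetical columns: the source gained column_c and lost column_b.
    target = ["id", "column_a", "column_b"]
    source = ["id", "column_a", "column_c"]
    for setting in ("ignore", "append_new_columns", "sync_all_columns"):
        print(setting, resolve_schema(target, source, setting))
```

The sketch makes one difference easy to see: only `sync_all_columns` drops a column that has disappeared from the source, while `append_new_columns` only ever grows the target.
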

```sql
-- Add deduplication using a CTE (cross-database compatible)
with deduplicated as (
    select
        *,
        row_number() over (partition by id order by updated_at desc) as rn
    from {{ source('schema', 'table') }}
    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}
)

select * from deduplicated where rn = 1
```

```sql
{% if is_incremental() %}
-- Use a static date bound, not only a subquery, so partition pruning can apply
where updated_at >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
  and updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```

```sql
{% if is_incremental() %}
-- Look back 3 days to catch late-arriving data
where updated_at >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
{% endif %}
```

```sql
{% set lookback_days = 3 %}
{% if is_incremental() %}
where updated_at >= {{ dbt.dateadd('day', -lookback_days, dbt.current_timestamp()) }}
{% endif %}
```

To absorb schema drift, set `on_schema_change='append_new_columns'`, or schedule a periodic full refresh:

```bash
# Weekly full refresh
dbt build --select <model_name> --full-refresh
```

Append-only events:

```sql
{{ config(materialized='incremental', incremental_strategy='append') }}

select * from {{ source('events', 'raw') }}
{% if is_incremental() %}
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
```

Merge (upsert) by key:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id'
) }}

select * from {{ source('crm', 'contacts') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```

Delete+insert for data updated in batches:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='id'
) }}

select * from {{ source('orders', 'raw') }}
{% if is_incremental() %}
where order_date >= {{ dbt.dateadd('day', -7, dbt.current_timestamp()) }}
{% endif %}
```

Insert overwrite by partition:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'event_date', 'data_type': 'date'}
) }}

select * from {{ source('events', 'raw') }}
{% if is_incremental() %}
where event_date >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
{% endif %}
```

Any of these models can be rebuilt from scratch with `--full-refresh`.
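
The lookback filter and the `row_number()` deduplication used in the models above can be tried outside a warehouse. The sketch below runs the same query shape against an in-memory SQLite database from Python. It assumes a SQLite build with window function support (3.25 or later); the table `source_rows`, the helper `latest_rows`, and the fixed `cutoff` string are hypothetical stand-ins for the dbt source and the `dbt.dateadd(...)` expression.

```python
import sqlite3

# Same shape as the dbt model: filter by a lookback cutoff, then keep the
# most recent row per id.
DEDUP_SQL = """
with deduplicated as (
    select
        id,
        val,
        updated_at,
        row_number() over (partition by id order by updated_at desc) as rn
    from source_rows
    where updated_at >= :cutoff
)
select id, val, updated_at
from deduplicated
where rn = 1
order by id
"""


def latest_rows(rows: list[tuple[int, str, str]], cutoff: str) -> list[tuple]:
    """Load rows into SQLite and return the newest row per id at or after cutoff."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "create table source_rows (id integer, val text, updated_at text)"
        )
        conn.executemany("insert into source_rows values (?, ?, ?)", rows)
        return conn.execute(DEDUP_SQL, {"cutoff": cutoff}).fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    # Hypothetical sample: id 1 has two versions, id 3 is older than the cutoff.
    sample = [
        (1, "old", "2024-01-01"),
        (1, "new", "2024-01-05"),
        (2, "only", "2024-01-04"),
        (3, "stale", "2023-12-01"),
    ]
    for row in latest_rows(sample, cutoff="2024-01-02"):
        print(row)
```

Dates are stored as ISO 8601 strings so that string comparison matches chronological order. Moving `cutoff` earlier widens the lookback window and picks up more late-arriving rows, at the cost of scanning more data.
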