developing-incremental-models
# dbt Incremental Model Development
Choose the right strategy. Design the `unique_key` carefully. Handle edge cases.
## When to Use Incremental
| Scenario | Recommendation |
|---|---|
| Source data < 10M rows | Use `table` |
| Source data > 10M rows | Consider `incremental` |
| Source data updated in place | Use `merge` with a `unique_key` |
| Append-only source (logs, events) | Use `append` |
| Partitioned warehouse data | Use `insert_overwrite` if supported |

Default to `table` unless you have a clear performance reason for incremental.
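The "default to `table`" advice can be encoded project-wide and overridden only where volume demands it. A minimal sketch of the config hierarchy (project and folder names here are hypothetical):

```yaml
# dbt_project.yml -- hypothetical project layout
models:
  my_project:
    +materialized: table          # safe default for all models
    marts:
      events:
        +materialized: incremental  # opt in only for the high-volume folder
```

Per-model `config()` blocks still take precedence over these folder-level settings, so individual models can opt out either way.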
## Critical Rules

- ALWAYS test with `--full-refresh` first before relying on incremental logic
- ALWAYS verify `unique_key` is truly unique in both source and target
- If merge fails 3+ times, check `unique_key` for duplicates
- Run full refresh periodically to prevent data drift
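The uniqueness rule can also be enforced continuously with schema tests, so `dbt build` fails before a bad merge runs. A minimal sketch (the model and column names are hypothetical):

```yaml
# models/schema.yml -- hypothetical model name
models:
  - name: my_incremental_model
    columns:
      - name: id            # the column used as unique_key
        tests:
          - unique          # fails the build if duplicates appear
          - not_null
```

With these tests in place, `dbt build` runs them after each incremental load, catching duplicate-key drift as soon as it enters the model.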
## Workflow
### 1. Confirm Incremental is Needed

```bash
# Check source table size
dbt show --inline "select count(*) from {{ source('schema', 'table') }}"
```

If the count is under 10 million, consider using `table` instead. Incremental adds complexity.

### 2. Understand the Source Data Pattern
Before choosing a strategy, answer:

- Is data append-only? (new rows added, never updated)
- Are existing rows updated? (need merge/upsert)
- Is there a reliable timestamp? (for filtering new data)
- What's the unique identifier? (for merge matching)

```bash
# Check for timestamp column
dbt show --inline "
select
    min(updated_at) as earliest,
    max(updated_at) as latest,
    count(distinct date(updated_at)) as days_of_data
from {{ source('schema', 'table') }}
"
```

### 3. Choose the Right Strategy
| Strategy | Use When | How It Works |
|---|---|---|
| `append` | Data is append-only, no updates | INSERT only, no deduplication |
| `merge` | Data can be updated | MERGE/UPSERT by `unique_key` |
| `delete+insert` | Data updated in batches | DELETE matching rows, then INSERT |
| `insert_overwrite` | Partitioned tables (BigQuery, Spark) | Replace entire partitions |

Default: `merge` is safest for most use cases.

Note: Strategy availability varies by adapter. Check the dbt incremental strategy docs for your specific warehouse.
### 4. Design the Unique Key

CRITICAL: `unique_key` must be truly unique in your data.

```bash
# Verify uniqueness BEFORE creating model
dbt show --inline "
select {{ unique_key_column }}, count(*)
from {{ source('schema', 'table') }}
group by 1
having count(*) > 1
limit 10
"
```

If duplicates exist:

- Add more columns to make a composite key
- Add deduplication logic in the model
- Use `delete+insert` instead of `merge`

### 5. Write the Incremental Model
```sql
{{
  config(
    materialized='incremental',
    incremental_strategy='merge',          -- or append, delete+insert
    unique_key='id',                       -- MUST be unique
    on_schema_change='append_new_columns'  -- handle new columns
  )
}}

select
    id,
    column_a,
    column_b,
    updated_at
from {{ source('schema', 'table') }}

{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```

### 6. Build with Full Refresh First
ALWAYS verify with full refresh before trusting incremental logic.

```bash
# First run: full refresh to establish baseline
dbt build --select <model_name> --full-refresh

# Verify output
dbt show --select <model_name> --limit 10
dbt show --inline "select count(*) from {{ ref('model_name') }}"
```

### 7. Test Incremental Logic
```bash
# Run incrementally (no --full-refresh)
dbt build --select <model_name>

# Verify row count changed appropriately
dbt show --inline "select count(*) from {{ ref('model_name') }}"
```

### 8. Handle Schema Changes
Set `on_schema_change` based on your needs:

| Setting | Behavior |
|---|---|
| `ignore` (default) | New columns in source are ignored |
| `append_new_columns` | New columns added to target |
| `sync_all_columns` | Target schema matches source exactly |
| `fail` | Error if schema changes |
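For example, to keep the target schema matched to the source, the setting goes in the model's config block alongside the other incremental options (a sketch; the model body is illustrative):

```sql
{{
  config(
    materialized='incremental',
    unique_key='id',
    on_schema_change='sync_all_columns'  -- add new columns, drop removed ones
  )
}}

select * from {{ source('schema', 'table') }}
```

Note that `sync_all_columns` aligns the column list going forward; it does not backfill values for newly added columns in rows loaded earlier.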
## Common Incremental Problems
### Problem: Merge Fails with Duplicate Key

**Symptom:** "Cannot MERGE with duplicate values"

**Cause:** Multiple rows with the same `unique_key` in source or target.

**Fix:**

```sql
-- Add deduplication using a CTE (cross-database compatible)
with deduplicated as (
    select *,
        row_number() over (partition by id order by updated_at desc) as rn
    from {{ source('schema', 'table') }}
    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}
)
select * from deduplicated where rn = 1
```

### Problem: No Partition Pruning (Full Table Scan)
**Symptom:** Incremental runs take as long as full refresh.

**Cause:** A dynamic date filter (subquery) prevents partition pruning.

**Fix:**

```sql
{% if is_incremental() %}
-- Use a static date bound alongside the subquery so partition pruning can apply
where updated_at >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
  and updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```

### Problem: Late-Arriving Data is Missed
**Symptom:** Some records never appear in the incremental model.

**Cause:** Filtering by `max(updated_at)` misses late arrivals.

**Fix:** Use a lookback window with a fixed offset from the current date:

```sql
{% if is_incremental() %}
-- Look back 3 days to catch late-arriving data
where updated_at >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
{% endif %}
```

Alternatively, use a variable for the lookback period:

```sql
{% set lookback_days = 3 %}
{% if is_incremental() %}
where updated_at >= {{ dbt.dateadd('day', -lookback_days, dbt.current_timestamp()) }}
{% endif %}
```

### Problem: Schema Drift Causes Errors
**Symptom:** "Column X not found" after the source adds a column.

**Fix:** Set `on_schema_change='append_new_columns'` in the config.

### Problem: Data Drift Over Time
**Symptom:** Counts diverge between incremental and full refresh.

**Fix:** Schedule a periodic full refresh:

```bash
# Weekly full refresh
dbt build --select <model_name> --full-refresh
```

## Incremental Strategy Reference
### Append (Simplest)

```sql
{{ config(materialized='incremental', incremental_strategy='append') }}

select * from {{ source('events', 'raw') }}

{% if is_incremental() %}
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
```

- No `unique_key` needed
- Fastest performance
- Only use for append-only data (logs, events, immutable records)
### Merge (Default)

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id'
) }}

select * from {{ source('crm', 'contacts') }}

{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```

- Requires `unique_key`
- Handles updates and inserts
- Most common strategy
### Delete+Insert (Batch Updates)

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='id'
) }}

select * from {{ source('orders', 'raw') }}

{% if is_incremental() %}
where order_date >= {{ dbt.dateadd('day', -7, dbt.current_timestamp()) }}
{% endif %}
```

- Deletes all matching rows first
- Good for reprocessing batches
- Use when merge has duplicate key issues
### Insert Overwrite (Partitioned)

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'event_date', 'data_type': 'date'}
) }}

select * from {{ source('events', 'raw') }}

{% if is_incremental() %}
where event_date >= {{ dbt.dateadd('day', -3, dbt.current_timestamp()) }}
{% endif %}
```

- Replaces entire partitions
- Best for partitioned tables in BigQuery/Spark
- No `unique_key` needed (operates on partitions)
## Anti-Patterns

- Using incremental for small tables (< 10M rows)
- Not testing with `--full-refresh` first
- Using the append strategy when data can be updated
- Not verifying `unique_key` uniqueness
- Relying on exact timestamp match without a lookback window
- Never running a full refresh (causes data drift)
- Using merge with non-unique keys
## Testing Checklist

- Model runs with `--full-refresh`
- Model runs incrementally (without the flag)
- `unique_key` verified as truly unique
- Row counts reasonable after incremental run
- Late-arriving data handled (lookback window)
- Schema changes handled (`on_schema_change` set)
- Periodic full refresh scheduled