dbt-coder
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
Chinesedbt-Coder
dbt-Coder
Patterns for dbt (data build tool) transform layer development.
用于dbt(data build tool)转换层开发的模式
Project Structure
项目结构
my_dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/ # 1:1 with sources, light transforms
│ │ ├── stg_orders.sql
│ │ └── _staging.yml
│ ├── intermediate/ # Joins, business logic
│ │ └── int_orders_enriched.sql
│ └── marts/ # Final consumption layer
│ ├── finance/
│ │ └── fct_revenue.sql
│ └── marketing/
│ └── dim_customers.sql
├── seeds/ # Static lookup data
├── snapshots/ # SCD Type 2
├── macros/ # Reusable SQL
└── tests/ # Custom testsmy_dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/ # 与源数据一一对应,轻量转换
│ │ ├── stg_orders.sql
│ │ └── _staging.yml
│ ├── intermediate/ # 关联操作、业务逻辑实现
│ │ └── int_orders_enriched.sql
│ └── marts/ # 最终消费层
│ ├── finance/
│ │ └── fct_revenue.sql
│ └── marketing/
│ └── dim_customers.sql
├── seeds/ # 静态 lookup 数据
├── snapshots/ # 缓慢变化维度类型2(SCD Type 2)
├── macros/ # 可复用SQL
└── tests/ # 自定义测试dbt_project.yml
dbt_project.yml
yaml
name: 'my_project'
version: '1.0.0'
config-version: 2
profile: 'my_project'
model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
models:
my_project:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: ephemeral
marts:
+materialized: table
+schema: martsyaml
name: 'my_project'
version: '1.0.0'
config-version: 2
profile: 'my_project'
model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
models:
my_project:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: ephemeral
marts:
+materialized: table
+schema: martsStaging Models
Staging 模型
sql
-- models/staging/stg_orders.sql
-- Naming: stg_<source>_<entity>
with source as (
select * from {{ source('raw', 'orders') }}
),
renamed as (
select
-- Rename to consistent naming
id as order_id,
customer_id,
order_date,
total_amount as order_total,
-- Type casting
cast(status as varchar(50)) as order_status,
-- Timestamps
created_at,
updated_at
from source
)
select * from renamedsql
-- models/staging/stg_orders.sql
-- 命名规则:stg_<数据源>_<实体>
with source as (
select * from {{ source('raw', 'orders') }}
),
renamed as (
select
-- 重命名为统一命名规范
id as order_id,
customer_id,
order_date,
total_amount as order_total,
-- 类型转换
cast(status as varchar(50)) as order_status,
-- 时间戳字段
created_at,
updated_at
from source
)
select * from renamedSource Definition
源数据定义
yaml
undefinedyaml
undefinedmodels/staging/_sources.yml
models/staging/_sources.yml
version: 2
sources:
- name: raw
database: raw_db
schema: public
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
tables:
- name: orders
identifier: orders_table
columns:
- name: id
tests:
- unique
- not_null
- name: id
tests:
- name: customers
- name: orders
identifier: orders_table
columns:
undefinedversion: 2
sources:
- name: raw
database: raw_db
schema: public
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
tables:
- name: orders
identifier: orders_table
columns:
- name: id
tests:
- unique
- not_null
- name: id
tests:
- name: customers
- name: orders
identifier: orders_table
columns:
undefinedIntermediate Models
Intermediate 模型
sql
-- models/intermediate/int_orders_enriched.sql
-- Join staging models, apply business logic
with orders as (
select * from {{ ref('stg_orders') }}
),
customers as (
select * from {{ ref('stg_customers') }}
),
products as (
select * from {{ ref('stg_products') }}
)
select
o.order_id,
o.order_date,
o.order_total,
c.customer_id,
c.customer_name,
c.customer_segment,
-- Business logic
case
when o.order_total >= 1000 then 'high_value'
when o.order_total >= 100 then 'medium_value'
else 'low_value'
end as order_tier
from orders o
left join customers c on o.customer_id = c.customer_idsql
-- models/intermediate/int_orders_enriched.sql
-- 关联staging模型,实现业务逻辑
with orders as (
select * from {{ ref('stg_orders') }}
),
customers as (
select * from {{ ref('stg_customers') }}
),
products as (
select * from {{ ref('stg_products') }}
)
select
o.order_id,
o.order_date,
o.order_total,
c.customer_id,
c.customer_name,
c.customer_segment,
-- 业务逻辑处理
case
when o.order_total >= 1000 then 'high_value'
when o.order_total >= 100 then 'medium_value'
else 'low_value'
end as order_tier
from orders o
left join customers c on o.customer_id = c.customer_idMart Models
Mart 模型
sql
-- models/marts/finance/fct_revenue.sql
-- Final aggregated fact table
{{ config(
materialized='table',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "month"
}
) }}
with orders as (
select * from {{ ref('int_orders_enriched') }}
)
select
date_trunc('day', order_date) as revenue_date,
customer_segment,
order_tier,
count(*) as order_count,
sum(order_total) as total_revenue,
avg(order_total) as avg_order_value
from orders
group by 1, 2, 3sql
-- models/marts/finance/fct_revenue.sql
-- 最终聚合事实表
{{ config(
materialized='table',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "month"
}
) }}
with orders as (
select * from {{ ref('int_orders_enriched') }}
)
select
date_trunc('day', order_date) as revenue_date,
customer_segment,
order_tier,
count(*) as order_count,
sum(order_total) as total_revenue,
avg(order_total) as avg_order_value
from orders
group by 1, 2, 3Incremental Models
增量模型
sql
-- models/marts/fct_events.sql
{{ config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge' -- or 'delete+insert', 'append'
) }}
select
event_id,
user_id,
event_type,
event_timestamp,
properties
from {{ source('raw', 'events') }}
{% if is_incremental() %}
-- Only new/updated rows since last run
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}sql
-- models/marts/fct_events.sql
{{ config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge' -- 可选值:'delete+insert'、'append'
) }}
select
event_id,
user_id,
event_type,
event_timestamp,
properties
from {{ source('raw', 'events') }}
{% if is_incremental() %}
-- 仅加载上次运行后新增/更新的数据
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}Snapshots (SCD Type 2)
快照(SCD Type 2)
sql
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
)
}}
select * from {{ source('raw', 'customers') }}
{% endsnapshot %}sql
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
)
}}
select * from {{ source('raw', 'customers') }}
{% endsnapshot %}Tests
测试
yaml
undefinedyaml
undefinedmodels/marts/_schema.yml
models/marts/_schema.yml
version: 2
models:
-
name: fct_revenue description: Daily revenue aggregations columns:
- name: revenue_date
tests:
- not_null
- name: total_revenue
tests:
- not_null
- dbt_utils.accepted_range: min_value: 0
tests:Model-level tests
- dbt_utils.unique_combination_of_columns: combination_of_columns: - revenue_date - customer_segment - order_tier
- name: revenue_date
tests:
undefinedversion: 2
models:
-
name: fct_revenue description: 每日收入聚合表 columns:
- name: revenue_date
tests:
- not_null
- name: total_revenue
tests:
- not_null
- dbt_utils.accepted_range: min_value: 0
tests:模型级测试
- dbt_utils.unique_combination_of_columns: combination_of_columns: - revenue_date - customer_segment - order_tier
- name: revenue_date
tests:
undefinedCustom Tests
自定义测试
sql
-- tests/assert_positive_revenue.sql
-- Returns rows that fail the test
select
revenue_date,
total_revenue
from {{ ref('fct_revenue') }}
where total_revenue < 0sql
-- tests/assert_positive_revenue.sql
-- 返回测试不通过的行数据
select
revenue_date,
total_revenue
from {{ ref('fct_revenue') }}
where total_revenue < 0Macros
宏(Macros)
sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
round({{ column_name }} / 100.0, 2)
{% endmacro %}
-- Usage in model:
-- select {{ cents_to_dollars('amount_cents') }} as amount_dollarssql
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
{% if custom_schema_name %}
{{ custom_schema_name }}
{% else %}
{{ target.schema }}
{% endif %}
{% endmacro %}sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
round({{ column_name }} / 100.0, 2)
{% endmacro %}
-- 在模型中使用:
-- select {{ cents_to_dollars('amount_cents') }} as amount_dollarssql
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
{% if custom_schema_name %}
{{ custom_schema_name }}
{% else %}
{{ target.schema }}
{% endif %}
{% endmacro %}dbt Commands
dbt 常用命令
bash
undefinedbash
undefinedRun all models
运行所有模型
dbt run
dbt run
Run specific model and dependencies
运行指定模型及其依赖
dbt run --select fct_revenue+
dbt run --select fct_revenue+
Run models with tag
运行带有指定标签的模型
dbt run --select tag:finance
dbt run --select tag:finance
Test all
执行所有测试
dbt test
dbt test
Generate docs
生成文档
dbt docs generate
dbt docs serve
dbt docs generate
dbt docs serve
Freshness check
检查源数据新鲜度
dbt source freshness
dbt source freshness
Full refresh of incremental
全量刷新增量模型
dbt run --full-refresh --select fct_events
dbt run --full-refresh --select fct_events
Build (run + test)
构建(运行模型+执行测试)
dbt build
undefineddbt build
undefinedBest Practices
最佳实践
yaml
undefinedyaml
undefined1. Use ref() for model references
1. 使用ref()引用模型
BAD: select * from schema.stg_orders
错误写法: select * from schema.stg_orders
GOOD: select * from {{ ref('stg_orders') }}
正确写法: select * from {{ ref('stg_orders') }}
2. Use source() for raw tables
2. 使用source()引用原始表
BAD: select * from raw_db.orders
错误写法: select * from raw_db.orders
GOOD: select * from {{ source('raw', 'orders') }}
正确写法: select * from {{ source('raw', 'orders') }}
3. Document models
3. 为模型添加文档
models:
- name: fct_revenue description: | Daily revenue by segment. Grain: one row per day/segment/tier. Updated daily by the finance_dag. meta: owner: data-team pii: false
undefinedmodels:
- name: fct_revenue description: | 按客户细分统计的每日收入数据。粒度:每行对应一个日期/客户细分/订单层级。 由finance_dag每日更新。 meta: owner: data-team pii: false
undefinedPackages
依赖包
yaml
undefinedyaml
undefinedpackages.yml
packages.yml
packages:
- package: dbt-labs/dbt_utils version: 1.1.1
- package: dbt-labs/codegen version: 0.12.1
- package: calogica/dbt_expectations version: 0.10.1
```bashpackages:
- package: dbt-labs/dbt_utils version: 1.1.1
- package: dbt-labs/codegen version: 0.12.1
- package: calogica/dbt_expectations version: 0.10.1
```bashInstall packages
安装依赖包
dbt deps
undefineddbt deps
undefined