dbt-coder

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

dbt-Coder

dbt-Coder

Patterns for dbt (data build tool) transform layer development.
用于dbt(data build tool)转换层开发的模式

Project Structure

项目结构

my_dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/          # 1:1 with sources, light transforms
│   │   ├── stg_orders.sql
│   │   └── _staging.yml
│   ├── intermediate/     # Joins, business logic
│   │   └── int_orders_enriched.sql
│   └── marts/            # Final consumption layer
│       ├── finance/
│       │   └── fct_revenue.sql
│       └── marketing/
│           └── dim_customers.sql
├── seeds/                # Static lookup data
├── snapshots/            # SCD Type 2
├── macros/               # Reusable SQL
└── tests/                # Custom tests
my_dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/          # 与源数据一一对应,轻量转换
│   │   ├── stg_orders.sql
│   │   └── _staging.yml
│   ├── intermediate/     # 关联操作、业务逻辑实现
│   │   └── int_orders_enriched.sql
│   └── marts/            # 最终消费层
│       ├── finance/
│       │   └── fct_revenue.sql
│       └── marketing/
│           └── dim_customers.sql
├── seeds/                # 静态 lookup 数据
├── snapshots/            # 缓慢变化维度类型2(SCD Type 2)
├── macros/               # 可复用SQL
└── tests/                # 自定义测试

dbt_project.yml

dbt_project.yml

yaml
name: 'my_project'
version: '1.0.0'
config-version: 2

profile: 'my_project'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

models:
  my_project:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: marts
yaml
name: 'my_project'
version: '1.0.0'
config-version: 2

profile: 'my_project'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

models:
  my_project:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: marts

Staging Models

Staging 模型

sql
-- models/staging/stg_orders.sql
-- Naming: stg_<source>_<entity>

with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        -- Rename to consistent naming
        id as order_id,
        customer_id,
        order_date,
        total_amount as order_total,

        -- Type casting
        cast(status as varchar(50)) as order_status,

        -- Timestamps
        created_at,
        updated_at
    from source
)

select * from renamed
sql
-- models/staging/stg_orders.sql
-- 命名规则:stg_<数据源>_<实体>

with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        -- 重命名为统一命名规范
        id as order_id,
        customer_id,
        order_date,
        total_amount as order_total,

        -- 类型转换
        cast(status as varchar(50)) as order_status,

        -- 时间戳字段
        created_at,
        updated_at
    from source
)

select * from renamed

Source Definition

源数据定义

yaml
undefined
yaml
undefined

models/staging/_sources.yml

models/staging/_sources.yml

version: 2
sources:
  • name: raw database: raw_db schema: public freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} tables:
    • name: orders identifier: orders_table columns:
      • name: id tests:
        • unique
        • not_null
    • name: customers
undefined
version: 2
sources:
  • name: raw database: raw_db schema: public freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} tables:
    • name: orders identifier: orders_table columns:
      • name: id tests:
        • unique
        • not_null
    • name: customers
undefined

Intermediate Models

Intermediate 模型

sql
-- models/intermediate/int_orders_enriched.sql
-- Join staging models, apply business logic

with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('stg_customers') }}
),

products as (
    select * from {{ ref('stg_products') }}
)

select
    o.order_id,
    o.order_date,
    o.order_total,

    c.customer_id,
    c.customer_name,
    c.customer_segment,

    -- Business logic
    case
        when o.order_total >= 1000 then 'high_value'
        when o.order_total >= 100 then 'medium_value'
        else 'low_value'
    end as order_tier

from orders o
left join customers c on o.customer_id = c.customer_id
sql
-- models/intermediate/int_orders_enriched.sql
-- 关联staging模型,实现业务逻辑

with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('stg_customers') }}
),

products as (
    select * from {{ ref('stg_products') }}
)

select
    o.order_id,
    o.order_date,
    o.order_total,

    c.customer_id,
    c.customer_name,
    c.customer_segment,

    -- 业务逻辑处理
    case
        when o.order_total >= 1000 then 'high_value'
        when o.order_total >= 100 then 'medium_value'
        else 'low_value'
    end as order_tier

from orders o
left join customers c on o.customer_id = c.customer_id

Mart Models

Mart 模型

sql
-- models/marts/finance/fct_revenue.sql
-- Final aggregated fact table

{{ config(
    materialized='table',
    partition_by={
      "field": "order_date",
      "data_type": "date",
      "granularity": "month"
    }
) }}

with orders as (
    select * from {{ ref('int_orders_enriched') }}
)

select
    date_trunc('day', order_date) as revenue_date,
    customer_segment,
    order_tier,
    count(*) as order_count,
    sum(order_total) as total_revenue,
    avg(order_total) as avg_order_value
from orders
group by 1, 2, 3
sql
-- models/marts/finance/fct_revenue.sql
-- 最终聚合事实表

{{ config(
    materialized='table',
    partition_by={
      "field": "order_date",
      "data_type": "date",
      "granularity": "month"
    }
) }}

with orders as (
    select * from {{ ref('int_orders_enriched') }}
)

select
    date_trunc('day', order_date) as revenue_date,
    customer_segment,
    order_tier,
    count(*) as order_count,
    sum(order_total) as total_revenue,
    avg(order_total) as avg_order_value
from orders
group by 1, 2, 3

Incremental Models

增量模型

sql
-- models/marts/fct_events.sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge'  -- or 'delete+insert', 'append'
) }}

select
    event_id,
    user_id,
    event_type,
    event_timestamp,
    properties
from {{ source('raw', 'events') }}

{% if is_incremental() %}
    -- Only new/updated rows since last run
    where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
sql
-- models/marts/fct_events.sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge'  -- 可选值:'delete+insert'、'append'
) }}

select
    event_id,
    user_id,
    event_type,
    event_timestamp,
    properties
from {{ source('raw', 'events') }}

{% if is_incremental() %}
    -- 仅加载上次运行后新增/更新的数据
    where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}

Snapshots (SCD Type 2)

快照(SCD Type 2)

sql
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}

{{
    config(
      target_schema='snapshots',
      unique_key='customer_id',
      strategy='timestamp',
      updated_at='updated_at',
    )
}}

select * from {{ source('raw', 'customers') }}

{% endsnapshot %}
sql
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}

{{
    config(
      target_schema='snapshots',
      unique_key='customer_id',
      strategy='timestamp',
      updated_at='updated_at',
    )
}}

select * from {{ source('raw', 'customers') }}

{% endsnapshot %}

Tests

测试

yaml
undefined
yaml
undefined

models/marts/_schema.yml

models/marts/_schema.yml

version: 2
models:
  • name: fct_revenue description: Daily revenue aggregations columns:
    • name: revenue_date tests:
      • not_null
    • name: total_revenue tests:
      • not_null
      • dbt_utils.accepted_range: min_value: 0
    tests:

    Model-level tests

    • dbt_utils.unique_combination_of_columns: combination_of_columns: - revenue_date - customer_segment - order_tier
undefined
version: 2
models:
  • name: fct_revenue description: 每日收入聚合表 columns:
    • name: revenue_date tests:
      • not_null
    • name: total_revenue tests:
      • not_null
      • dbt_utils.accepted_range: min_value: 0
    tests:

    模型级测试

    • dbt_utils.unique_combination_of_columns: combination_of_columns: - revenue_date - customer_segment - order_tier
undefined

Custom Tests

自定义测试

sql
-- tests/assert_positive_revenue.sql
-- Returns rows that fail the test

select
    revenue_date,
    total_revenue
from {{ ref('fct_revenue') }}
where total_revenue < 0
sql
-- tests/assert_positive_revenue.sql
-- 返回测试不通过的行数据

select
    revenue_date,
    total_revenue
from {{ ref('fct_revenue') }}
where total_revenue < 0

Macros

宏(Macros)

sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    round({{ column_name }} / 100.0, 2)
{% endmacro %}

-- Usage in model:
-- select {{ cents_to_dollars('amount_cents') }} as amount_dollars
sql
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
    {% if custom_schema_name %}
        {{ custom_schema_name }}
    {% else %}
        {{ target.schema }}
    {% endif %}
{% endmacro %}
sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    round({{ column_name }} / 100.0, 2)
{% endmacro %}

-- 在模型中使用:
-- select {{ cents_to_dollars('amount_cents') }} as amount_dollars
sql
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
    {% if custom_schema_name %}
        {{ custom_schema_name }}
    {% else %}
        {{ target.schema }}
    {% endif %}
{% endmacro %}

dbt Commands

dbt 常用命令

bash
undefined
bash
undefined

Run all models

运行所有模型

dbt run
dbt run

Run specific model and dependencies

运行指定模型及其依赖

dbt run --select fct_revenue+
dbt run --select fct_revenue+

Run models with tag

运行带有指定标签的模型

dbt run --select tag:finance
dbt run --select tag:finance

Test all

执行所有测试

dbt test
dbt test

Generate docs

生成文档

dbt docs generate dbt docs serve
dbt docs generate dbt docs serve

Freshness check

检查源数据新鲜度

dbt source freshness
dbt source freshness

Full refresh of incremental

全量刷新增量模型

dbt run --full-refresh --select fct_events
dbt run --full-refresh --select fct_events

Build (run + test)

构建(运行模型+执行测试)

dbt build
undefined
dbt build
undefined

Best Practices

最佳实践

yaml
undefined
yaml
undefined

1. Use ref() for model references

1. 使用ref()引用模型

BAD: select * from schema.stg_orders

错误写法: select * from schema.stg_orders

GOOD: select * from {{ ref('stg_orders') }}

正确写法: select * from {{ ref('stg_orders') }}

2. Use source() for raw tables

2. 使用source()引用原始表

BAD: select * from raw_db.orders

错误写法: select * from raw_db.orders

GOOD: select * from {{ source('raw', 'orders') }}

正确写法: select * from {{ source('raw', 'orders') }}

3. Document models

3. 为模型添加文档

models:
  • name: fct_revenue description: | Daily revenue by segment. Grain: one row per day/segment/tier. Updated daily by the finance_dag. meta: owner: data-team pii: false
undefined
models:
  • name: fct_revenue description: | 按客户细分统计的每日收入数据。粒度:每行对应一个日期/客户细分/订单层级。 由finance_dag每日更新。 meta: owner: data-team pii: false
undefined

Packages

依赖包

yaml
undefined
yaml
undefined

packages.yml

packages.yml

packages:
  • package: dbt-labs/dbt_utils version: 1.1.1
  • package: dbt-labs/codegen version: 0.12.1
  • package: calogica/dbt_expectations version: 0.10.1

```bash
packages:
  • package: dbt-labs/dbt_utils version: 1.1.1
  • package: dbt-labs/codegen version: 0.12.1
  • package: calogica/dbt_expectations version: 0.10.1

```bash

Install packages

安装依赖包

dbt deps
undefined
dbt deps
undefined