dbt-data-transformation


dbt Data Transformation


A comprehensive skill for mastering dbt (data build tool) for analytics engineering. This skill covers model development, testing strategies, documentation practices, incremental builds, Jinja templating, macro development, package management, and production deployment workflows.

When to Use This Skill


Use this skill when:
  • Building data transformation pipelines for analytics and business intelligence
  • Creating a data warehouse with modular, testable SQL transformations
  • Implementing ELT (Extract, Load, Transform) workflows
  • Developing dimensional models (facts, dimensions) for analytics
  • Managing complex SQL dependencies and data lineage
  • Creating reusable data transformation logic across projects
  • Testing data quality and implementing data contracts
  • Documenting data models and business logic
  • Building incremental models for large datasets
  • Orchestrating dbt with tools like Airflow, Dagster, or dbt Cloud
  • Migrating legacy ETL processes to modern ELT architecture
  • Implementing DataOps practices for analytics teams

Core Concepts


What is dbt?


dbt (data build tool) enables analytics engineers to transform data in their warehouse more effectively. It's a development framework that brings software engineering best practices to data transformation:
  • Version Control: SQL transformations as code in Git
  • Testing: Built-in data quality testing framework
  • Documentation: Auto-generated, searchable data dictionary
  • Modularity: Reusable SQL through refs and macros
  • Lineage: Automatic dependency resolution and visualization
  • Deployment: CI/CD for data transformations

The dbt Workflow


1. Develop: Write SQL SELECT statements as models
2. Test: Define data quality tests
3. Document: Add descriptions and metadata
4. Build: dbt run compiles and executes models
5. Test: dbt test validates data quality
6. Deploy: CI/CD pipelines deploy to production

Key dbt Entities


  1. Models: SQL SELECT statements that define data transformations
  2. Sources: Raw data tables in your warehouse
  3. Seeds: CSV files loaded into your warehouse
  4. Tests: Data quality assertions
  5. Macros: Reusable Jinja-SQL functions
  6. Snapshots: Type 2 slowly changing dimension captures
  7. Exposures: Downstream uses of dbt models (dashboards, ML models)
  8. Metrics: Business metric definitions

Model Development


Basic Model Structure


A dbt model is a SELECT statement saved as a `.sql` file:

```sql
-- models/staging/stg_orders.sql

with source as (
    select * from {{ source('jaffle_shop', 'orders') }}
),

renamed as (
    select
        id as order_id,
        user_id as customer_id,
        order_date,
        status,
        _etl_loaded_at
    from source
)

select * from renamed
```

**Key Points:**
- Models are SELECT statements only (no DDL)
- Use CTEs (Common Table Expressions) for readability
- Reference sources with `{{ source() }}`
- dbt handles CREATE/INSERT logic based on materialization

The ref() Function


Reference other models using `{{ ref() }}`:

```sql
-- models/marts/fct_orders.sql

with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('stg_customers') }}
),

joined as (
    select
        orders.order_id,
        orders.order_date,
        customers.customer_name,
        orders.status
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
)

select * from joined
```

**Benefits of ref():**
- Builds the dependency graph automatically
- Resolves to the correct schema/database
- Enables testing in dev without affecting prod
- Powers lineage visualization
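At compile time, dbt replaces each `{{ ref() }}` call with the fully qualified relation name for the current target. A minimal sketch of that substitution using the `jinja2` package directly — the `ref` resolver and the `analytics.dbt_dev` schema here are hypothetical stand-ins, not dbt's actual resolver (which also records the dependency edge in the project DAG):

```python
from jinja2 import Environment

# Hypothetical resolver: dbt's real ref() also registers the
# dependency edge in the DAG before returning the relation name.
def ref(model_name):
    return f'"analytics"."dbt_dev"."{model_name}"'

env = Environment()
env.globals["ref"] = ref

compiled = env.from_string("select * from {{ ref('stg_orders') }}").render()
print(compiled)
# select * from "analytics"."dbt_dev"."stg_orders"
```

Because the resolver decides the schema at compile time, the same model file can build against a dev schema locally and the prod schema in deployment.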

The source() Function


Define and reference raw data sources:

```yaml
# models/staging/sources.yml

version: 2

sources:
  - name: jaffle_shop
    description: Raw data from the Jaffle Shop application
    database: raw
    schema: jaffle_shop
    tables:
      - name: orders
        description: One record per order
        columns:
          - name: id
            description: Primary key for orders
            tests:
              - unique
              - not_null
          - name: user_id
            description: Foreign key to customers
          - name: order_date
            description: Date order was placed
          - name: status
            description: Order status (completed, pending, cancelled)
```

```sql
-- Reference the source
select * from {{ source('jaffle_shop', 'orders') }}
```

**Source Features:**
- Document raw data tables
- Test source data quality
- Track freshness with the `freshness` config
- Separate source definitions from transformations

Model Organization


Recommended project structure:

```
models/
├── staging/           # One-to-one with source tables
│   ├── jaffle_shop/
│   │   ├── _jaffle_shop__sources.yml
│   │   ├── _jaffle_shop__models.yml
│   │   ├── stg_jaffle_shop__orders.sql
│   │   └── stg_jaffle_shop__customers.sql
│   └── stripe/
│       ├── _stripe__sources.yml
│       ├── _stripe__models.yml
│       └── stg_stripe__payments.sql
├── intermediate/      # Purpose-built transformations
│   └── int_orders_joined.sql
└── marts/             # Business-defined entities
    ├── core/
    │   ├── _core__models.yml
    │   ├── dim_customers.sql
    │   └── fct_orders.sql
    └── marketing/
        └── fct_customer_sessions.sql
```

**Naming Conventions:**
- `stg_`: Staging models (one-to-one with sources)
- `int_`: Intermediate models (not exposed to end users)
- `fct_`: Fact tables
- `dim_`: Dimension tables

Materializations


Materializations determine how dbt builds models in your warehouse:

1. View (Default)


```sql
{{ config(materialized='view') }}

select * from {{ ref('base_model') }}
```

**Characteristics:**
- Lightweight; no data stored
- The query runs each time the view is accessed
- Best for: small datasets, models queried infrequently
- Fast to build, slower to query

2. Table


```sql
{{ config(materialized='table') }}

select * from {{ ref('base_model') }}
```

**Characteristics:**
- Full table rebuild on each run
- Data physically stored
- Best for: small to medium datasets, heavily queried models
- Slower to build, faster to query

3. Incremental


```sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='fail'
) }}

select * from {{ source('jaffle_shop', 'orders') }}

{% if is_incremental() %}
    -- Only process new/updated records
    where order_date > (select max(order_date) from {{ this }})
{% endif %}
```

**Characteristics:**
- Only processes new data on subsequent runs
- First run builds the full table
- Best for: large datasets, event/time-series data
- Fast incremental builds; maintains historical data

**Incremental Strategies:**

```sql
-- Append (default): Add new rows only
{{ config(
    materialized='incremental',
    incremental_strategy='append'
) }}

-- Merge: Upsert based on unique_key
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

-- Delete+Insert: Delete matching records, insert new
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='delete+insert'
) }}
```

4. Ephemeral


```sql
{{ config(materialized='ephemeral') }}

select * from {{ ref('base_model') }}
```

**Characteristics:**
- Not built in the warehouse
- Interpolated as a CTE in dependent models
- Best for: lightweight transformations, avoiding view proliferation
- No storage; compiled into downstream models

Configuration Comparison


| Materialization | Build Speed | Query Speed | Storage | Use Case |
|-----------------|-------------|-------------|---------|----------|
| View | Fast | Slow | None | Small datasets, infrequent queries |
| Table | Slow | Fast | High | Medium datasets, frequent queries |
| Incremental | Fast\* | Fast | High | Large datasets, time-series data |
| Ephemeral | N/A | Varies | None | Intermediate logic, CTEs |

\*After the initial full build

Testing


Schema Tests


Built-in generic tests defined in YAML:

```yaml
# models/staging/stg_orders.yml

version: 2

models:
  - name: stg_orders
    description: Staged order data
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Foreign key to customers
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: status
        description: Order status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'returned', 'cancelled']
      - name: order_total
        description: Total order amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
```

**Built-in Tests:**
- `unique`: No duplicate values
- `not_null`: No null values
- `accepted_values`: Value in specified list
- `relationships`: Foreign key validation

Custom Data Tests


Create custom tests in the `tests/` directory:

```sql
-- tests/assert_positive_order_totals.sql

select
    order_id,
    order_total
from {{ ref('fct_orders') }}
where order_total < 0
```

**How it works:**
- The test fails if the query returns any rows
- The query should return the failing records
- Can use any SQL logic

Advanced Testing Patterns


```sql
-- Test for data freshness
-- tests/assert_orders_are_fresh.sql

with latest_order as (
    select max(order_date) as max_date
    from {{ ref('fct_orders') }}
)

select max_date
from latest_order
where max_date < current_date - interval '1 day'
```

```sql
-- Test for referential integrity across time
-- tests/assert_no_orphaned_orders.sql

select
    o.order_id,
    o.customer_id
from {{ ref('fct_orders') }} o
left join {{ ref('dim_customers') }} c
    on o.customer_id = c.customer_id
where c.customer_id is null
```

Testing with dbt_utils


Requires the `dbt-utils` package:

```yaml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          # Test for uniqueness across multiple columns
          - dbt_utils.unique_combination_of_columns:
              combination_of_columns:
                - order_id
                - order_date

          # Test for sequential values
          - dbt_utils.sequential_values:
              interval: 1

          # Test that at least 95% of values are not null
          - dbt_utils.not_null_proportion:
              at_least: 0.95
```

Test Severity Levels


```yaml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique:
              severity: error  # Fail build (default)
          - not_null:
              severity: warn   # Warning only
```

Documentation


Model Documentation


```yaml
# models/marts/core/_core__models.yml

version: 2

models:
  - name: fct_orders
    description: |
      Order fact table containing one row per order with associated
      customer and payment information. This is the primary table
      for order analytics and reporting.

      Grain: One row per order
      Refresh: Incremental, updates daily at 2 AM UTC

      Notes:
      - Includes cancelled orders (filter with status column)
      - Payment info joined from Stripe data
      - Customer info joined from application database
    columns:
      - name: order_id
        description: Primary key for orders table
        tests:
          - unique
          - not_null
      - name: customer_id
        description: |
          Foreign key to dim_customers. Links to the customer who
          placed the order. Note: May be null for guest checkout orders.
      - name: order_date
        description: Date order was placed (UTC timezone)
      - name: status
        description: |
          Current order status. Possible values:
          - placed: Order received, not yet processed
          - shipped: Order shipped to customer
          - completed: Order delivered and confirmed
          - returned: Order returned by customer
          - cancelled: Order cancelled before shipment
      - name: order_total
        description: Total order amount in USD including tax and shipping
```

Documentation Blocks


Create reusable documentation:

```markdown
<!-- models/docs.md -->

{% docs order_status %}

Order status indicates the current state of an order in our fulfillment pipeline.

| Status | Description | Next Steps |
|--------|-------------|------------|
| placed | Order received | Inventory check |
| shipped | En route to customer | Track shipment |
| completed | Delivered successfully | Request feedback |
| returned | Customer return initiated | Process refund |
| cancelled | Order cancelled | Update inventory |

{% enddocs %}

{% docs customer_id %}

Unique identifier for customers. This ID is:
- Generated by the application on account creation
- Persistent across orders
- Used to track customer lifetime value
- **Note:** NULL for guest checkouts

{% enddocs %}
```

Reference documentation blocks:

```yaml
models:
  - name: fct_orders
    columns:
      - name: status
        description: "{{ doc('order_status') }}"

      - name: customer_id
        description: "{{ doc('customer_id') }}"
```

Generating Documentation


```bash
# Generate documentation site
dbt docs generate

# Serve documentation locally
dbt docs serve --port 8001

# View in browser at http://localhost:8001
```


**Documentation Features:**
- Interactive lineage graph (DAG visualization)
- Searchable model catalog
- Column-level documentation
- Source freshness tracking
- Test coverage visibility
- Compiled SQL preview


Documentation Best Practices


  1. Document at all levels: Project, models, columns, sources
  2. Explain business logic: Why transformations exist
  3. Define grain explicitly: One row represents...
  4. Note refresh schedules: How often data updates
  5. Document assumptions: Edge cases, known issues
  6. Link to external resources: Confluence, wiki, dashboards

Incremental Models


Basic Incremental Pattern


```sql
{{ config(
    materialized='incremental',
    unique_key='event_id'
) }}

with source as (
    select
        event_id,
        user_id,
        event_timestamp,
        event_type,
        event_properties
    from {{ source('analytics', 'events') }}

    {% if is_incremental() %}
        -- Only process events newer than existing data
        where event_timestamp > (select max(event_timestamp) from {{ this }})
    {% endif %}
)

select * from source
```

**Key Components:**
- `is_incremental()`: True after the first run
- `{{ this }}`: References the current model's table
- `unique_key`: Column(s) used for deduplication
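The effect of `is_incremental()` is easiest to see in the compiled SQL: the `where` clause appears only once the model already exists. A sketch using the `jinja2` package with stand-ins for dbt's `is_incremental()` and `{{ this }}` context (the relation name is a hypothetical example, not dbt internals):

```python
from jinja2 import Environment

MODEL_SQL = """\
select * from raw_events
{% if is_incremental() %}
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}"""

def compile_model(incremental):
    # Stand-ins for dbt's compilation context
    env = Environment()
    env.globals["is_incremental"] = lambda: incremental
    env.globals["this"] = "analytics.fct_events"  # hypothetical relation
    return env.from_string(MODEL_SQL).render()

print(compile_model(incremental=False))  # first run: full scan, no filter
print(compile_model(incremental=True))   # later runs: filtered scan
```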

Incremental with Merge Strategy


```sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    merge_update_columns=['status', 'updated_at'],
    merge_exclude_columns=['created_at']
) }}

with orders as (
    select
        order_id,
        customer_id,
        order_date,
        status,
        order_total,
        current_timestamp() as updated_at,
        case
            when status = 'placed' then current_timestamp()
            else null
        end as created_at
    from {{ source('ecommerce', 'orders') }}

    {% if is_incremental() %}
        -- Look back 3 days to catch late-arriving updates
        where order_date >= (select max(order_date) - interval '3 days' from {{ this }})
    {% endif %}
)

select * from orders
```

**Merge Strategy Features:**
- Updates existing records based on `unique_key`
- Inserts new records
- Optional: specify which columns to update/exclude
- Best for: slowly changing data, updates to historical records

Incremental with Delete+Insert


```sql
{{ config(
    materialized='incremental',
    unique_key=['date', 'customer_id'],
    incremental_strategy='delete+insert'
) }}

with daily_metrics as (
    select
        date_trunc('day', order_timestamp) as date,
        customer_id,
        count(*) as order_count,
        sum(order_total) as total_revenue
    from {{ ref('fct_orders') }}

    {% if is_incremental() %}
        where date_trunc('day', order_timestamp) >= (
            select max(date) - interval '7 days' from {{ this }}
        )
    {% endif %}

    group by 1, 2
)

select * from daily_metrics
```

**Delete+Insert Strategy:**
- Deletes all rows matching `unique_key`
- Inserts the new rows
- Best for: aggregated data, full partition replacement
- More efficient than merge for bulk updates

Handling Late-Arriving Data


```sql
{{ config(
    materialized='incremental',
    unique_key='order_id'
) }}

select
    order_id,
    customer_id,
    order_date,
    status,
    _loaded_at
from {{ source('ecommerce', 'orders') }}

{% if is_incremental() %}
    -- Use _loaded_at instead of order_date to catch updates
    where _loaded_at > (select max(_loaded_at) from {{ this }})

    -- OR use a lookback window
    -- where order_date > (select max(order_date) - interval '3 days' from {{ this }})
{% endif %}
```

Incremental with Partitioning


```sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    partition_by={
        'field': 'event_date',
        'data_type': 'date',
        'granularity': 'day'
    },
    cluster_by=['user_id', 'event_type']
) }}

select
    event_id,
    user_id,
    event_type,
    event_timestamp,
    date(event_timestamp) as event_date
from {{ source('analytics', 'raw_events') }}

{% if is_incremental() %}
    where date(event_timestamp) > (select max(event_date) from {{ this }})
{% endif %}
```

**Partition Benefits:**
- Improved query performance
- Cost optimization (scan less data)
- Efficient incremental processing
- Better for time-series data

Full Refresh Capability


```bash
# Force full rebuild of incremental models
dbt run --full-refresh

# Full refresh a specific model
dbt run --select my_incremental_model --full-refresh
```

Macros & Jinja


Basic Macro Structure


```sql
-- macros/cents_to_dollars.sql

{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
```

Usage:

```sql
select
    order_id,
    {{ cents_to_dollars('amount_cents') }} as amount_dollars
from {{ ref('stg_orders') }}
```
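Since dbt macros are ordinary Jinja macros, the expansion can be reproduced with the `jinja2` package alone — a handy way to reason about what dbt will compile (a sketch, not dbt's own rendering pipeline):

```python
from jinja2 import Environment

# The macro body, written inline without newlines so the compiled
# SQL comes out as a single clean line.
MACRO = (
    "{% macro cents_to_dollars(column_name, precision=2) %}"
    "round({{ column_name }} / 100.0, {{ precision }})"
    "{% endmacro %}"
)

env = Environment()
compiled = env.from_string(
    MACRO + "select {{ cents_to_dollars('amount_cents') }} as amount_dollars"
).render()

print(compiled)
# select round(amount_cents / 100.0, 2) as amount_dollars
```

Note that the macro manipulates SQL text, not values: `amount_cents` is pasted in as a column reference, and the division happens in the warehouse at query time.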

Reusable Data Quality Macros


```sql
-- macros/test_not_negative.sql

{% macro test_not_negative(model, column_name) %}

select
    {{ column_name }}
from {{ model }}
where {{ column_name }} < 0

{% endmacro %}
```

Date Spine Macro


```sql
-- macros/date_spine.sql

{% macro date_spine(start_date, end_date) %}

with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('" ~ start_date ~ "' as date)",
        end_date="cast('" ~ end_date ~ "' as date)"
    ) }}
)

select
    date_day
from date_spine

{% endmacro %}
```

Dynamic SQL Generation


```sql
-- macros/pivot_metric.sql

{% macro pivot_metric(metric_column, group_by_column, values) %}

select
    {{ group_by_column }},
    {% for value in values %}
        sum(case when status = '{{ value }}' then {{ metric_column }} else 0 end)
            as {{ value }}_{{ metric_column }}
        {% if not loop.last %},{% endif %}
    {% endfor %}
from {{ ref('fct_orders') }}
group by 1

{% endmacro %}
```

Usage:

```sql
{{ pivot_metric(
    metric_column='order_total',
    group_by_column='customer_id',
    values=['completed', 'pending', 'cancelled']
) }}
```
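Rendering the macro outside dbt shows how the `for` loop fans out into one aggregate column per value. A sketch with the `jinja2` package and a stubbed `ref()` (the `analytics.` prefix is a hypothetical resolution, not dbt's):

```python
from jinja2 import Environment

TEMPLATE = """\
{% macro pivot_metric(metric_column, group_by_column, values) %}
select
    {{ group_by_column }},
    {% for value in values %}
    sum(case when status = '{{ value }}' then {{ metric_column }} else 0 end)
        as {{ value }}_{{ metric_column }}{% if not loop.last %},{% endif %}
    {% endfor %}
from {{ ref('fct_orders') }}
group by 1
{% endmacro %}
{{ pivot_metric('order_total', 'customer_id', ['completed', 'cancelled']) }}"""

# trim_blocks/lstrip_blocks keep the compiled SQL readable
env = Environment(trim_blocks=True, lstrip_blocks=True)
env.globals["ref"] = lambda name: f"analytics.{name}"  # stubbed resolver
compiled = env.from_string(TEMPLATE).render()
print(compiled)  # one sum(...) column per status value
```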

Grant Permissions Macro


sql
-- macros/grant_select.sql

{% macro grant_select(schema, role) %}

{% set sql %}
    grant select on all tables in schema {{ schema }} to {{ role }};
{% endset %}

{% do run_query(sql) %}
{% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}

{% endmacro %}
Usage in hooks:
yaml
sql
-- macros/grant_select.sql

{% macro grant_select(schema, role) %}

{% set sql %}
    grant select on all tables in schema {{ schema }} to {{ role }};
{% endset %}

{% do run_query(sql) %}
{% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}

{% endmacro %}
在钩子中使用:
yaml

dbt_project.yml

dbt_project.yml

on-run-end:
  - "{{ grant_select(target.schema, 'analyst_role') }}"

on-run-end:
  - "{{ grant_select(target.schema, 'analyst_role') }}"

Environment-Specific Logic

环境特定逻辑

sql
-- macros/generate_schema_name.sql

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}

    {%- if target.name == 'prod' -%}
        {%- if custom_schema_name is not none -%}
            {{ custom_schema_name | trim }}
        {%- else -%}
            {{ default_schema }}
        {%- endif -%}

    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}

{%- endmacro %}
sql
-- macros/generate_schema_name.sql

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}

    {%- if target.name == 'prod' -%}
        {%- if custom_schema_name is not none -%}
            {{ custom_schema_name | trim }}
        {%- else -%}
            {{ default_schema }}
        {%- endif -%}

    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}

{%- endmacro %}
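The macro's branching is easy to sanity-check outside dbt. Below is a minimal Python sketch of the same resolution rule (the function name, and the explicit guard for a missing custom schema in the dev branch, are ours for illustration, not part of dbt's API):

```python
def resolve_schema(target_name, default_schema, custom_schema_name=None):
    """Mirrors the generate_schema_name macro: prod honors the custom
    schema; every other target is prefixed with the developer's schema."""
    if target_name == 'prod':
        if custom_schema_name is not None:
            return custom_schema_name.strip()
        return default_schema
    # Dev/CI targets: prefix so developers don't collide in shared schemas
    if custom_schema_name is not None:
        return f"{default_schema}_{custom_schema_name.strip()}"
    return default_schema
```

With this rule, a model configured with a custom schema of `marketing` lands in `marketing` on prod, but in `dbt_alice_marketing` for a developer whose target schema is `dbt_alice`.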

Audit Column Macro

审计列宏

sql
-- macros/add_audit_columns.sql

{% macro add_audit_columns() %}

    current_timestamp() as dbt_updated_at,
    current_timestamp() as dbt_created_at,
    '{{ var("dbt_user") }}' as dbt_updated_by

{% endmacro %}
Usage:
sql
select
    order_id,
    customer_id,
    order_total,
    {{ add_audit_columns() }}
from {{ ref('stg_orders') }}
sql
-- macros/add_audit_columns.sql

{% macro add_audit_columns() %}

    current_timestamp() as dbt_updated_at,
    current_timestamp() as dbt_created_at,
    '{{ var("dbt_user") }}' as dbt_updated_by

{% endmacro %}
使用方式:
sql
select
    order_id,
    customer_id,
    order_total,
    {{ add_audit_columns() }}
from {{ ref('stg_orders') }}

Jinja Control Structures

Jinja控制结构

sql
-- Conditionals
{% if target.name == 'prod' %}
    from {{ source('production', 'orders') }}
{% else %}
    from {{ source('development', 'orders') }}
{% endif %}

-- Loops
{% for status in ['placed', 'shipped', 'completed'] %}
    sum(case when status = '{{ status }}' then 1 else 0 end) as {{ status }}_count
    {% if not loop.last %},{% endif %}
{% endfor %}

-- Set variables
{% set payment_methods = ['credit_card', 'paypal', 'bank_transfer'] %}

{% for method in payment_methods %}
    count(distinct case when payment_method = '{{ method }}'
        then customer_id end) as {{ method }}_customers
    {% if not loop.last %},{% endif %}
{% endfor %}
sql
-- 条件判断
{% if target.name == 'prod' %}
    from {{ source('production', 'orders') }}
{% else %}
    from {{ source('development', 'orders') }}
{% endif %}

-- 循环
{% for status in ['placed', 'shipped', 'completed'] %}
    sum(case when status = '{{ status }}' then 1 else 0 end) as {{ status }}_count
    {% if not loop.last %},{% endif %}
{% endfor %}

-- 设置变量
{% set payment_methods = ['credit_card', 'paypal', 'bank_transfer'] %}

{% for method in payment_methods %}
    count(distinct case when payment_method = '{{ method }}'
        then customer_id end) as {{ method }}_customers
    {% if not loop.last %},{% endif %}
{% endfor %}
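The `loop.last` guard is what keeps the generated column list free of a trailing comma. A small Python sketch of the same rendering logic (plain string-building standing in for Jinja; the function name is illustrative):

```python
def render_status_counts(statuses):
    """Emit one aggregate column per status, comma-separating
    every line except the last -- mirroring {% if not loop.last %}."""
    lines = []
    for i, status in enumerate(statuses):
        line = (f"sum(case when status = '{status}' then 1 else 0 end) "
                f"as {status}_count")
        if i < len(statuses) - 1:
            line += ","
        lines.append(line)
    return "\n".join(lines)
```

Dropping the guard would leave a comma after the final column and produce invalid SQL.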

Package Management

包管理

Installing Packages

安装包

yaml
yaml

packages.yml

packages.yml

packages:
  # dbt-utils: Essential utility macros
  - package: dbt-labs/dbt_utils
    version: 1.1.1

  # Audit helper: Compare datasets
  - package: dbt-labs/audit_helper
    version: 0.9.0

  # Codegen: Code generation utilities
  - package: dbt-labs/codegen
    version: 0.11.0

  # Custom package from Git

  # Local package
  - local: ../dbt-shared-macros

Install packages:

bash
dbt deps
packages:
  # dbt-utils:核心工具宏
  - package: dbt-labs/dbt_utils
    version: 1.1.1

  # Audit helper:数据集对比
  - package: dbt-labs/audit_helper
    version: 0.9.0

  # Codegen:代码生成工具
  - package: dbt-labs/codegen
    version: 0.11.0

  # 来自Git的自定义包

  # 本地包
  - local: ../dbt-shared-macros

安装包:

bash
dbt deps

Using dbt_utils

使用dbt_utils

sql
-- Surrogate key generation
select
    {{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} as order_line_id,
    order_id,
    line_item_id
from {{ ref('stg_order_lines') }}

-- Union multiple tables
{{ dbt_utils.union_relations(
    relations=[
        ref('orders_2022'),
        ref('orders_2023'),
        ref('orders_2024')
    ]
) }}

-- Get column values as list
{% set statuses = dbt_utils.get_column_values(
    table=ref('stg_orders'),
    column='status'
) %}

-- Pivot table
{{ dbt_utils.pivot(
    column='metric_name',
    values=dbt_utils.get_column_values(table=ref('metrics'), column='metric_name'),
    agg='sum',
    then_value='metric_value',
    else_value=0,
    prefix='',
    suffix='_total'
) }}
sql
-- 生成代理键
select
    {{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} as order_line_id,
    order_id,
    line_item_id
from {{ ref('stg_order_lines') }}

-- 合并多个表
{{ dbt_utils.union_relations(
    relations=[
        ref('orders_2022'),
        ref('orders_2023'),
        ref('orders_2024')
    ]
) }}

-- 获取列值列表
{% set statuses = dbt_utils.get_column_values(
    table=ref('stg_orders'),
    column='status'
) %}

-- 透视表
{{ dbt_utils.pivot(
    column='metric_name',
    values=dbt_utils.get_column_values(table=ref('metrics'), column='metric_name'),
    agg='sum',
    then_value='metric_value',
    else_value=0,
    prefix='',
    suffix='_total'
) }}
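For intuition, `generate_surrogate_key` hashes the delimiter-joined, null-coalesced string casts of its inputs, so the same inputs always yield the same key. A rough Python approximation (the `-` delimiter and null sentinel shown here are illustrative; check the compiled SQL for your adapter's exact behavior):

```python
import hashlib

def surrogate_key(*values, null_token="_dbt_utils_surrogate_key_null_"):
    """Approximate dbt_utils.generate_surrogate_key: md5 over the
    '-'-joined, null-coalesced string casts of the input columns."""
    parts = [null_token if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode()).hexdigest()
```

Because the hash is deterministic, rebuilding the model reproduces the same keys, and column order matters: `(order_id, line_item_id)` and `(line_item_id, order_id)` hash differently.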

Creating Custom Packages

创建自定义包

Project structure for a package:
dbt-custom-package/
├── dbt_project.yml
├── macros/
│   ├── custom_test.sql
│   └── custom_macro.sql
├── models/
│   └── example_model.sql
└── README.md
yaml
自定义包的项目结构:
dbt-custom-package/
├── dbt_project.yml
├── macros/
│   ├── custom_test.sql
│   └── custom_macro.sql
├── models/
│   └── example_model.sql
└── README.md
yaml

dbt_project.yml for custom package

自定义包的dbt_project.yml

name: 'custom_package'
version: '1.0.0'
config-version: 2
require-dbt-version: [">=1.0.0", "<2.0.0"]

name: 'custom_package'
version: '1.0.0'
config-version: 2
require-dbt-version: [">=1.0.0", "<2.0.0"]

Package Versioning

包版本控制

yaml
yaml

Semantic versioning

语义版本控制

packages:
  # Semantic versioning
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]  # Any 1.x version

  # Exact version
  - package: dbt-labs/dbt_utils
    version: 1.1.1

  # Git branch/tag: latest from branch
packages:
  # 语义版本控制
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]  # 任意1.x版本

  # 精确版本
  - package: dbt-labs/dbt_utils
    version: 1.1.1

  # Git分支/标签:分支最新版本

Production Workflows

生产工作流

CI/CD Pipeline (GitHub Actions)

CI/CD管道(GitHub Actions)

yaml
yaml

.github/workflows/dbt_ci.yml

.github/workflows/dbt_ci.yml

name: dbt CI

on:
  pull_request:
    branches: [main]

jobs:
  dbt_run:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dbt
        run: |
          pip install dbt-core dbt-snowflake

      - name: Install dbt packages
        run: dbt deps

      - name: Run dbt debug
        run: dbt debug
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}

      - name: Run dbt models (modified only)
        run: dbt run --select state:modified+ --state ./prod_manifest
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}

      - name: Run dbt tests
        run: dbt test --select state:modified+
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
name: dbt CI

on:
  pull_request:
    branches: [main]

jobs:
  dbt_run:
    runs-on: ubuntu-latest
    steps:
      - name: 拉取代码
        uses: actions/checkout@v3

      - name: 设置Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: 安装dbt
        run: |
          pip install dbt-core dbt-snowflake

      - name: 安装dbt包
        run: dbt deps

      - name: 运行dbt debug
        run: dbt debug
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}

      - name: 运行dbt模型(仅修改的模型)
        run: dbt run --select state:modified+ --state ./prod_manifest
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}

      - name: 运行dbt测试
        run: dbt test --select state:modified+
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}

Slim CI (Test Changed Models Only)

轻量CI(仅测试修改的模型)

bash
bash

Store production manifest

存储生产清单

dbt compile --target prod
cp target/manifest.json ./prod_manifest/

dbt compile --target prod
cp target/manifest.json ./prod_manifest/

In CI: Test only changed models and downstream dependencies

在CI中:仅测试修改的模型和下游依赖

dbt test --select state:modified+ --state ./prod_manifest
dbt test --select state:modified+ --state ./prod_manifest

Production Deployment

生产部署

yaml
yaml

.github/workflows/dbt_prod.yml

.github/workflows/dbt_prod.yml

name: dbt Production Deploy

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dbt
        run: pip install dbt-core dbt-snowflake

      - name: Install packages
        run: dbt deps

      - name: Run dbt seed
        run: dbt seed --target prod

      - name: Run dbt run
        run: dbt run --target prod

      - name: Run dbt test
        run: dbt test --target prod

      - name: Generate docs
        run: dbt docs generate --target prod

      - name: Upload docs to S3
        run: |
          aws s3 sync target/ s3://dbt-docs-bucket/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
name: dbt生产部署

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: 设置Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: 安装dbt
        run: pip install dbt-core dbt-snowflake

      - name: 安装包
        run: dbt deps

      - name: 运行dbt seed
        run: dbt seed --target prod

      - name: 运行dbt模型
        run: dbt run --target prod

      - name: 运行dbt测试
        run: dbt test --target prod

      - name: 生成文档
        run: dbt docs generate --target prod

      - name: 上传文档至S3
        run: |
          aws s3 sync target/ s3://dbt-docs-bucket/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

Orchestration with Airflow

使用Airflow编排

python
python

dags/dbt_dag.py

dags/dbt_dag.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'dbt_production',
    default_args=default_args,
    description='Run dbt models in production',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dbt', 'analytics'],
) as dag:

    dbt_deps = BashOperator(
        task_id='dbt_deps',
        bash_command='cd /opt/dbt && dbt deps',
    )

    dbt_seed = BashOperator(
        task_id='dbt_seed',
        bash_command='cd /opt/dbt && dbt seed --target prod',
    )

    dbt_run_staging = BashOperator(
        task_id='dbt_run_staging',
        bash_command='cd /opt/dbt && dbt run --select staging.* --target prod',
    )

    dbt_run_marts = BashOperator(
        task_id='dbt_run_marts',
        bash_command='cd /opt/dbt && dbt run --select marts.* --target prod',
    )

    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --target prod',
    )

    dbt_docs = BashOperator(
        task_id='dbt_docs',
        bash_command='cd /opt/dbt && dbt docs generate --target prod',
    )

    # Define task dependencies
    dbt_deps >> dbt_seed >> dbt_run_staging >> dbt_run_marts >> dbt_test >> dbt_docs
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'dbt_production',
    default_args=default_args,
    description='在生产环境中运行dbt模型',
    schedule_interval='0 2 * * *',  # 每天UTC时间2点
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dbt', 'analytics'],
) as dag:

    dbt_deps = BashOperator(
        task_id='dbt_deps',
        bash_command='cd /opt/dbt && dbt deps',
    )

    dbt_seed = BashOperator(
        task_id='dbt_seed',
        bash_command='cd /opt/dbt && dbt seed --target prod',
    )

    dbt_run_staging = BashOperator(
        task_id='dbt_run_staging',
        bash_command='cd /opt/dbt && dbt run --select staging.* --target prod',
    )

    dbt_run_marts = BashOperator(
        task_id='dbt_run_marts',
        bash_command='cd /opt/dbt && dbt run --select marts.* --target prod',
    )

    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --target prod',
    )

    dbt_docs = BashOperator(
        task_id='dbt_docs',
        bash_command='cd /opt/dbt && dbt docs generate --target prod',
    )

    # 定义任务依赖
    dbt_deps >> dbt_seed >> dbt_run_staging >> dbt_run_marts >> dbt_test >> dbt_docs

dbt Cloud Integration

dbt Cloud集成

yaml
yaml

dbt_cloud.yml

dbt_cloud.yml

Environment configuration

环境配置

environments:
  - name: Production
    dbt_version: 1.7.latest
    type: deployment
  - name: Development
    dbt_version: 1.7.latest
    type: development

environments:
  - name: Production
    dbt_version: 1.7.latest
    type: deployment
  - name: Development
    dbt_version: 1.7.latest
    type: development

Job configuration

任务配置

jobs:
  - name: Production Run
    environment: Production
    triggers:
      schedule:
        cron: "0 2 * * *"  # 2 AM daily
    commands:
      - dbt deps
      - dbt seed
      - dbt run
      - dbt test

  - name: CI Check
    environment: Development
    triggers:
      github_webhook: true
    commands:
      - dbt deps
      - dbt run --select state:modified+
      - dbt test --select state:modified+

jobs:
  - name: 生产运行
    environment: Production
    triggers:
      schedule:
        cron: "0 2 * * *"  # 每天UTC时间2点
    commands:
      - dbt deps
      - dbt seed
      - dbt run
      - dbt test

  - name: CI检查
    environment: Development
    triggers:
      github_webhook: true
    commands:
      - dbt deps
      - dbt run --select state:modified+
      - dbt test --select state:modified+

Monitoring & Alerting

监控与告警

sql
-- macros/post_hook_monitoring.sql

{% macro monitor_row_count(threshold=0) %}

{% if execute %}
    {% set row_count_query %}
        select count(*) as row_count from {{ this }}
    {% endset %}

    {% set results = run_query(row_count_query) %}
    {% set row_count = results.columns[0].values()[0] %}

    {% if row_count < threshold %}
        {{ exceptions.raise_compiler_error("Row count " ~ row_count ~ " below threshold " ~ threshold) }}
    {% endif %}

    {{ log("Model " ~ this ~ " has " ~ row_count ~ " rows", info=True) }}
{% endif %}

{% endmacro %}
Usage:
sql
{{ config(
    post_hook="{{ monitor_row_count(threshold=1000) }}"
) }}

select * from {{ ref('stg_orders') }}
sql
-- macros/post_hook_monitoring.sql

{% macro monitor_row_count(threshold=0) %}

{% if execute %}
    {% set row_count_query %}
        select count(*) as row_count from {{ this }}
    {% endset %}

    {% set results = run_query(row_count_query) %}
    {% set row_count = results.columns[0].values()[0] %}

    {% if row_count < threshold %}
        {{ exceptions.raise_compiler_error("Row count " ~ row_count ~ " below threshold " ~ threshold) }}
    {% endif %}

    {{ log("Model " ~ this ~ " has " ~ row_count ~ " rows", info=True) }}
{% endif %}

{% endmacro %}
使用方式:
sql
{{ config(
    post_hook="{{ monitor_row_count(threshold=1000) }}"
) }}

select * from {{ ref('stg_orders') }}
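Stripped of Jinja, the macro's control flow is a plain threshold check: query the row count, raise when it falls below the floor, otherwise log. A Python sketch of that logic (function and parameter names are illustrative):

```python
def check_row_count(row_count, threshold=0, model="fct_orders"):
    """Mirror of monitor_row_count's post-hook behavior: fail the
    build when a freshly built model has fewer rows than expected."""
    if row_count < threshold:
        raise ValueError(
            f"Row count {row_count} below threshold {threshold} for {model}"
        )
    return f"Model {model} has {row_count} rows"
```

Raising (rather than logging a warning) is what makes the hook stop the run, so downstream models never build on top of a suspiciously empty table.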

Best Practices

最佳实践

Naming Conventions

命名规范

Models:
stg_[source]__[entity].sql        # Staging: stg_stripe__payments.sql
int_[entity]_[verb].sql            # Intermediate: int_orders_joined.sql
fct_[entity].sql                   # Fact: fct_orders.sql
dim_[entity].sql                   # Dimension: dim_customers.sql
Tests:
assert_[description].sql           # assert_positive_order_totals.sql
Macros:
[verb]_[noun].sql                  # generate_surrogate_key.sql
模型:
stg_[source]__[entity].sql        # Staging模型:stg_stripe__payments.sql
int_[entity]_[verb].sql            # 中间模型:int_orders_joined.sql
fct_[entity].sql                   # 事实表:fct_orders.sql
dim_[entity].sql                   # 维度表:dim_customers.sql
测试:
assert_[description].sql           # assert_positive_order_totals.sql
宏:
[verb]_[noun].sql                  # generate_surrogate_key.sql
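Conventions like these are easy to enforce mechanically. A hypothetical pre-commit-style check in Python (the patterns encode the model conventions above; adjust them to your project's layers):

```python
import re

# One pattern per layer: staging, intermediate, marts facts/dimensions
MODEL_PATTERNS = [
    re.compile(r"^stg_[a-z0-9_]+__[a-z0-9_]+\.sql$"),  # stg_[source]__[entity]
    re.compile(r"^int_[a-z0-9_]+\.sql$"),              # int_[entity]_[verb]
    re.compile(r"^(fct|dim)_[a-z0-9_]+\.sql$"),        # fct_/dim_[entity]
]

def is_valid_model_name(filename):
    """True when the filename matches any of the agreed layer patterns."""
    return any(p.match(filename) for p in MODEL_PATTERNS)
```

Running such a check in CI catches stray names like `orders_final_v2.sql` before they reach the warehouse.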

SQL Style Guide

SQL风格指南

sql
-- ✓ Good: Clear CTEs, proper formatting
with orders as (
    select
        order_id,
        customer_id,
        order_date,
        status
    from {{ ref('stg_orders') }}
    where status != 'cancelled'
),

customers as (
    select
        customer_id,
        customer_name,
        customer_email
    from {{ ref('dim_customers') }}
),

final as (
    select
        orders.order_id,
        orders.order_date,
        customers.customer_name,
        orders.status
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
)

select * from final

-- ✗ Bad: Nested subqueries, poor formatting
select o.order_id, o.order_date, c.customer_name, o.status from (
select order_id, customer_id, order_date, status from {{ ref('stg_orders') }}
where status != 'cancelled') o left join (select customer_id, customer_name from
{{ ref('dim_customers') }}) c on o.customer_id = c.customer_id
sql
-- ✓ 良好:清晰的CTE、格式规范
with orders as (
    select
        order_id,
        customer_id,
        order_date,
        status
    from {{ ref('stg_orders') }}
    where status != 'cancelled'
),

customers as (
    select
        customer_id,
        customer_name,
        customer_email
    from {{ ref('dim_customers') }}
),

final as (
    select
        orders.order_id,
        orders.order_date,
        customers.customer_name,
        orders.status
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
)

select * from final

-- ✗ 不良:嵌套子查询、格式混乱
select o.order_id, o.order_date, c.customer_name, o.status from (
select order_id, customer_id, order_date, status from {{ ref('stg_orders') }}
where status != 'cancelled') o left join (select customer_id, customer_name from
{{ ref('dim_customers') }}) c on o.customer_id = c.customer_id

Performance Optimization

性能优化

1. Use Incremental Models for Large Tables
sql
-- Process only new data
{{ config(materialized='incremental') }}

select * from {{ source('events', 'page_views') }}
{% if is_incremental() %}
    where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
2. Leverage Clustering and Partitioning
sql
{{ config(
    materialized='table',
    partition_by={'field': 'order_date', 'data_type': 'date'},
    cluster_by=['customer_id', 'status']
) }}
3. Reduce Data Scanned
sql
-- ✓ Good: Filter early
with source as (
    select *
    from {{ source('app', 'events') }}
    where event_date >= '2024-01-01'  -- Filter in source CTE
)

-- ✗ Bad: Filter late
with source as (
    select * from {{ source('app', 'events') }}
)

select * from source
where event_date >= '2024-01-01'  -- Filtering after full scan
4. Use Ephemeral for Simple Transformations
sql
-- Avoid creating unnecessary views
{{ config(materialized='ephemeral') }}

select
    order_id,
    lower(trim(status)) as status_clean
from {{ ref('stg_orders') }}
1. 对大表使用增量模型
sql
-- 仅处理新数据
{{ config(materialized='incremental') }}

select * from {{ source('events', 'page_views') }}
{% if is_incremental() %}
    where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
2. 利用聚类和分区
sql
{{ config(
    materialized='table',
    partition_by={'field': 'order_date', 'data_type': 'date'},
    cluster_by=['customer_id', 'status']
) }}
3. 减少扫描数据量
sql
-- ✓ 良好:尽早过滤
with source as (
    select *
    from {{ source('app', 'events') }}
    where event_date >= '2024-01-01'  -- 在源CTE中过滤
)

-- ✗ 不良:延迟过滤
with source as (
    select * from {{ source('app', 'events') }}
)

select * from source
where event_date >= '2024-01-01'  -- 全表扫描后过滤
4. 对简单转换使用临时模型
sql
-- 避免创建不必要的视图
{{ config(materialized='ephemeral') }}

select
    order_id,
    lower(trim(status)) as status_clean
from {{ ref('stg_orders') }}

Project Structure Best Practices

项目结构最佳实践

1. Layer Your Transformations
Staging → Intermediate → Marts
  ↓           ↓            ↓
 1:1      Purpose-built  Business
Sources    Logic        Entities
2. Modularize Complex Logic
sql
-- Instead of one massive model, break it down:

-- intermediate/int_order_items_aggregated.sql
-- intermediate/int_customer_lifetime_value.sql
-- intermediate/int_payment_summaries.sql

-- marts/fct_orders.sql (combines intermediate models)
3. Use Consistent File Organization
models/
├── staging/
│   └── [source]/
│       ├── _[source]__sources.yml
│       ├── _[source]__models.yml
│       └── stg_[source]__[table].sql
├── intermediate/
│   └── int_[purpose].sql
└── marts/
    └── [business_area]/
        ├── _[area]__models.yml
        └── [model_type]_[entity].sql
1. 分层转换
Staging → Intermediate → Marts
  ↓           ↓            ↓
  与源表一一对应   特定用途逻辑   业务实体
2. 拆分复杂逻辑
sql
-- 不要使用一个庞大的模型,而是拆分为多个:

-- intermediate/int_order_items_aggregated.sql
-- intermediate/int_customer_lifetime_value.sql
-- intermediate/int_payment_summaries.sql

-- marts/fct_orders.sql(组合中间模型)
3. 一致的文件组织
models/
├── staging/
│   └── [source]/
│       ├── _[source]__sources.yml
│       ├── _[source]__models.yml
│       └── stg_[source]__[table].sql
├── intermediate/
│   └── int_[purpose].sql
└── marts/
    └── [business_area]/
        ├── _[area]__models.yml
        └── [model_type]_[entity].sql

Testing Strategy

测试策略

1. Test at Multiple Levels
yaml
1. 多级别测试
yaml

Source tests: Data quality at ingestion

源测试:数据摄入阶段的数据质量

sources:
  - name: raw_data
    tables:
      - name: orders
        columns:
          - name: id
            tests: [unique, not_null]

sources:
  - name: raw_data
    tables:
      - name: orders
        columns:
          - name: id
            tests: [unique, not_null]

Model tests: Transformation logic

模型测试:转换逻辑

models:
  - name: fct_orders
    tests:
      - dbt_utils.expression_is_true:
          expression: "order_total >= 0"
    columns:
      - name: order_id
        tests: [unique, not_null]

models:
  - name: fct_orders
    tests:
      - dbt_utils.expression_is_true:
          expression: "order_total >= 0"
    columns:
      - name: order_id
        tests: [unique, not_null]

Custom tests: Business logic

自定义测试:业务逻辑

tests/assert_revenue_reconciliation.sql

tests/assert_revenue_reconciliation.sql


2. Use Appropriate Test Severity

2. 使用合适的测试严重级别

yaml

Critical tests: error (fail build)

关键测试:error(构建失败)

Nice-to-have tests: warn (log but don't fail)

可选测试:warn(仅记录不失败)

tests:
  - unique:
      severity: error
  - dbt_utils.not_null_proportion:
      at_least: 0.95
      severity: warn

3. Test Coverage Goals

  • 100% of primary keys: unique + not_null
  • 100% of foreign keys: relationships tests
  • All business logic: custom data tests
  • Critical calculations: expression tests

tests:
  - unique:
      severity: error
  - dbt_utils.not_null_proportion:
      at_least: 0.95
      severity: warn

3. 测试覆盖率目标

  • 100%的主键:unique + not_null
  • 100%的外键:relationships测试
  • 所有业务逻辑:自定义数据测试
  • 关键计算:expression测试

Documentation Standards

文档标准

1. Document Every Model
yaml
models:
  - name: fct_orders
    description: |
      **Purpose:** [Why this model exists]
      **Grain:** [One row represents...]
      **Refresh:** [When and how often]
      **Consumers:** [Who uses this]
2. Document Complex Logic
sql
-- Use comments for complex business rules
select
    order_id,
    -- Revenue recognition: only count completed orders more than
    -- 30 days old, past the cancellation window (per finance policy 2024-03)
    case
        when status = 'completed'
            and datediff('day', order_date, current_date) > 30
        then order_total
        else 0
    end as recognized_revenue
from {{ ref('stg_orders') }}
3. Keep Docs Updated
  • Update docs when logic changes
  • Review docs during code reviews
  • Generate docs regularly:
    dbt docs generate
1. 为所有模型编写文档
yaml
models:
  - name: fct_orders
    description: |
      **用途**:[该模型存在的原因]
      **粒度**:[每条记录代表什么]
      **刷新频率**:[更新时间和频率]
      **使用者**:[谁使用该模型]
2. 记录复杂逻辑
sql
-- 对复杂业务规则使用注释
select
    order_id,
    -- 收入确认:仅统计完成且超过30天的订单(根据2024-03的财务政策)
    case
        when status = 'completed'
            and datediff('day', order_date, current_date) > 30
        then order_total
        else 0
    end as recognized_revenue
from {{ ref('stg_orders') }}
3. 保持文档更新
  • 逻辑变更时更新文档
  • 代码审查时检查文档
  • 定期生成文档:
    dbt docs generate

20 Detailed Examples

20个详细示例

Example 1: Basic Staging Model

示例1:基础Staging模型

sql
-- models/staging/jaffle_shop/stg_jaffle_shop__customers.sql

with source as (
    select * from {{ source('jaffle_shop', 'customers') }}
),

renamed as (
    select
        id as customer_id,
        first_name,
        last_name,
        first_name || ' ' || last_name as customer_name,
        email,
        _loaded_at
    from source
)

select * from renamed
sql
-- models/staging/jaffle_shop/stg_jaffle_shop__customers.sql

with source as (
    select * from {{ source('jaffle_shop', 'customers') }}
),

renamed as (
    select
        id as customer_id,
        first_name,
        last_name,
        first_name || ' ' || last_name as customer_name,
        email,
        _loaded_at
    from source
)

select * from renamed

Example 2: Fact Table with Multiple Joins

示例2:多关联的事实表

sql
-- models/marts/core/fct_orders.sql

{{ config(
    materialized='table',
    tags=['core', 'daily']
) }}

with orders as (
    select * from {{ ref('stg_jaffle_shop__orders') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

payments as (
    select
        order_id,
        sum(amount) as total_payment_amount
    from {{ ref('stg_stripe__payments') }}
    where status = 'success'
    group by 1
),

final as (
    select
        orders.order_id,
        orders.customer_id,
        customers.customer_name,
        orders.order_date,
        orders.status,
        coalesce(payments.total_payment_amount, 0) as order_total,
        {{ add_audit_columns() }}
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
    left join payments
        on orders.order_id = payments.order_id
)

select * from final
sql
-- models/marts/core/fct_orders.sql

{{ config(
    materialized='table',
    tags=['core', 'daily']
) }}

with orders as (
    select * from {{ ref('stg_jaffle_shop__orders') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

payments as (
    select
        order_id,
        sum(amount) as total_payment_amount
    from {{ ref('stg_stripe__payments') }}
    where status = 'success'
    group by 1
),

final as (
    select
        orders.order_id,
        orders.customer_id,
        customers.customer_name,
        orders.order_date,
        orders.status,
        coalesce(payments.total_payment_amount, 0) as order_total,
        {{ add_audit_columns() }}
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
    left join payments
        on orders.order_id = payments.order_id
)

select * from final

Example 3: Incremental Event Table

示例3:增量事件表

sql
-- models/marts/analytics/fct_page_views.sql

{{ config(
    materialized='incremental',
    unique_key='page_view_id',
    partition_by={
        'field': 'event_date',
        'data_type': 'date',
        'granularity': 'day'
    },
    cluster_by=['user_id', 'page_path']
) }}

with events as (
    select
        event_id as page_view_id,
        user_id,
        session_id,
        event_timestamp,
        date(event_timestamp) as event_date,
        event_properties:page_path::string as page_path,
        event_properties:referrer::string as referrer,
        _loaded_at
    from {{ source('analytics', 'raw_events') }}
    where event_type = 'page_view'

    {% if is_incremental() %}
        -- Use _loaded_at to catch late-arriving data
        and _loaded_at > (select max(_loaded_at) from {{ this }})
    {% endif %}
),

enriched as (
    select
        page_view_id,
        user_id,
        session_id,
        event_timestamp,
        event_date,
        page_path,
        referrer,
        -- Parse URL components
        split_part(page_path, '?', 1) as page_path_clean,
        case
            when referrer like '%google%' then 'Google'
            when referrer like '%facebook%' then 'Facebook'
            when referrer is null then 'Direct'
            else 'Other'
        end as referrer_source,
        _loaded_at
    from events
)

select * from enriched
sql
-- models/marts/analytics/fct_page_views.sql

{{ config(
    materialized='incremental',
    unique_key='page_view_id',
    partition_by={
        'field': 'event_date',
        'data_type': 'date',
        'granularity': 'day'
    },
    cluster_by=['user_id', 'page_path']
) }}

with events as (
    select
        event_id as page_view_id,
        user_id,
        session_id,
        event_timestamp,
        date(event_timestamp) as event_date,
        event_properties:page_path::string as page_path,
        event_properties:referrer::string as referrer,
        _loaded_at
    from {{ source('analytics', 'raw_events') }}
    where event_type = 'page_view'

    {% if is_incremental() %}
        -- 使用_loaded_at捕获延迟到达的数据
        and _loaded_at > (select max(_loaded_at) from {{ this }})
    {% endif %}
),

enriched as (
    select
        page_view_id,
        user_id,
        session_id,
        event_timestamp,
        event_date,
        page_path,
        referrer,
        -- 解析URL组件
        split_part(page_path, '?', 1) as page_path_clean,
        case
            when referrer like '%google%' then 'Google'
            when referrer like '%facebook%' then 'Facebook'
            when referrer is null then 'Direct'
            else 'Other'
        end as referrer_source,
        _loaded_at
    from events
)

select * from enriched

Example 4: Customer Dimension with SCD Type 2

示例4:Type 2缓慢变化维度的客户维度表

sql
-- models/marts/core/dim_customers.sql

{{ config(
    materialized='table',
    unique_key='customer_key'
) }}

with customers as (
    select * from {{ ref('stg_jaffle_shop__customers') }}
),

customer_orders as (
    select
        customer_id,
        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as total_orders
    from {{ ref('fct_orders') }}
    group by 1
),

final as (
    select
        {{ dbt_utils.generate_surrogate_key(['customers.customer_id', 'customers._loaded_at']) }}
            as customer_key,
        customers.customer_id,
        customers.customer_name,
        customers.email,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        customer_orders.total_orders,
        case
            when customer_orders.total_orders >= 10 then 'VIP'
            when customer_orders.total_orders >= 5 then 'Regular'
            when customer_orders.total_orders >= 1 then 'New'
            else 'Prospect'
        end as customer_segment,
        customers._loaded_at as effective_from,
        null as effective_to,
        true as is_current
    from customers
    left join customer_orders
        on customers.customer_id = customer_orders.customer_id
)

select * from final
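
A companion schema.yml usually declares tests for a dimension like this. A minimal sketch (the file path is hypothetical; column names and accepted values are taken from the model above):

```yaml
# models/marts/core/_core__models.yml  (hypothetical path)
version: 2

models:
  - name: dim_customers
    columns:
      - name: customer_key
        tests:
          - unique
          - not_null
      - name: customer_segment
        tests:
          - accepted_values:
              values: ['VIP', 'Regular', 'New', 'Prospect']
```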

Example 5: Aggregated Metrics Table

示例5:聚合指标表

sql
-- models/marts/analytics/daily_order_metrics.sql

{{ config(
    materialized='incremental',
    unique_key=['metric_date', 'status'],
    incremental_strategy='delete+insert'
) }}

with orders as (
    select * from {{ ref('fct_orders') }}

    {% if is_incremental() %}
        where order_date >= (select max(metric_date) - interval '7 days' from {{ this }})
    {% endif %}
),

daily_metrics as (
    select
        date_trunc('day', order_date) as metric_date,
        status,
        count(distinct order_id) as order_count,
        count(distinct customer_id) as unique_customers,
        sum(order_total) as total_revenue,
        avg(order_total) as avg_order_value,
        min(order_total) as min_order_value,
        max(order_total) as max_order_value,
        percentile_cont(0.5) within group (order by order_total) as median_order_value
    from orders
    group by 1, 2
)

select * from daily_metrics

Example 6: Pivoted Metrics Using Macro

示例6:使用宏的透视指标

sql
-- models/marts/analytics/customer_order_status_summary.sql

with orders as (
    select
        customer_id,
        status,
        order_total
    from {{ ref('fct_orders') }}
)

select
    customer_id,
    {% for status in ['placed', 'shipped', 'completed', 'returned', 'cancelled'] %}
        sum(case when status = '{{ status }}' then 1 else 0 end)
            as {{ status }}_count,
        sum(case when status = '{{ status }}' then order_total else 0 end)
            as {{ status }}_revenue
        {% if not loop.last %},{% endif %}
    {% endfor %}
from orders
group by 1
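
For reference, dbt renders the Jinja loop above into plain SQL at compile time. Abridged to the first two statuses, the compiled model looks roughly like:

```sql
select
    customer_id,
    sum(case when status = 'placed' then 1 else 0 end) as placed_count,
    sum(case when status = 'placed' then order_total else 0 end) as placed_revenue,
    sum(case when status = 'shipped' then 1 else 0 end) as shipped_count,
    sum(case when status = 'shipped' then order_total else 0 end) as shipped_revenue
    -- ...and so on for the remaining statuses
from orders
group by 1
```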

Example 7: Snapshot for SCD Type 2

示例7:Type 2缓慢变化维度的快照

sql
-- snapshots/customers_snapshot.sql

{% snapshot customers_snapshot %}

{{
    config(
        target_schema='snapshots',
        target_database='analytics',
        unique_key='customer_id',
        strategy='timestamp',
        updated_at='updated_at',
        invalidate_hard_deletes=True
    )
}}

select
    customer_id,
    customer_name,
    email,
    customer_segment,
    updated_at
from {{ source('jaffle_shop', 'customers') }}

{% endsnapshot %}
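
dbt adds dbt_valid_from / dbt_valid_to metadata columns to every snapshot row, which is what makes this a Type 2 dimension. Current and point-in-time versions can then be read back like this (database and schema names follow the config above):

```sql
-- current version of each customer
select * from analytics.snapshots.customers_snapshot
where dbt_valid_to is null;

-- state as of a point in time
select * from analytics.snapshots.customers_snapshot
where dbt_valid_from <= '2024-01-01'
  and (dbt_valid_to > '2024-01-01' or dbt_valid_to is null);
```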

Example 8: Funnel Analysis Model

示例8:漏斗分析模型

sql
-- models/marts/analytics/conversion_funnel.sql

with page_views as (
    select
        user_id,
        session_id,
        min(event_timestamp) as session_start
    from {{ ref('fct_page_views') }}
    where event_date >= current_date - interval '30 days'
    group by 1, 2
),

product_views as (
    select distinct
        user_id,
        session_id
    from {{ ref('fct_page_views') }}
    where page_path like '/product/%'
        and event_date >= current_date - interval '30 days'
),

add_to_cart as (
    select distinct
        user_id,
        session_id
    from {{ ref('fct_events') }}
    where event_type = 'add_to_cart'
        and event_date >= current_date - interval '30 days'
),

checkout_started as (
    select distinct
        user_id,
        session_id
    from {{ ref('fct_events') }}
    where event_type = 'checkout_started'
        and event_date >= current_date - interval '30 days'
),

orders as (
    select distinct
        customer_id as user_id,
        session_id
    from {{ ref('fct_orders') }}
    where order_date >= current_date - interval '30 days'
        and status = 'completed'
),

funnel as (
    select
        count(distinct page_views.session_id) as sessions,
        count(distinct product_views.session_id) as product_views,
        count(distinct add_to_cart.session_id) as add_to_cart,
        count(distinct checkout_started.session_id) as checkout_started,
        count(distinct orders.session_id) as completed_orders
    from page_views
    left join product_views using (session_id)
    left join add_to_cart using (session_id)
    left join checkout_started using (session_id)
    left join orders using (session_id)
),

funnel_metrics as (
    select
        sessions,
        product_views,
        round(100.0 * product_views / nullif(sessions, 0), 2) as pct_product_views,
        add_to_cart,
        round(100.0 * add_to_cart / nullif(product_views, 0), 2) as pct_add_to_cart,
        checkout_started,
        round(100.0 * checkout_started / nullif(add_to_cart, 0), 2) as pct_checkout_started,
        completed_orders,
        round(100.0 * completed_orders / nullif(checkout_started, 0), 2) as pct_completed_orders,
        round(100.0 * completed_orders / nullif(sessions, 0), 2) as overall_conversion_rate
    from funnel
)

select * from funnel_metrics

Example 9: Cohort Retention Analysis

示例9:同期群留存分析

sql
-- models/marts/analytics/cohort_retention.sql

with customer_orders as (
    select
        customer_id,
        date_trunc('month', order_date) as order_month
    from {{ ref('fct_orders') }}
    where status = 'completed'
),

first_order as (
    select
        customer_id,
        min(order_month) as cohort_month
    from customer_orders
    group by 1
),

cohort_data as (
    select
        f.cohort_month,
        c.order_month,
        datediff('month', f.cohort_month, c.order_month) as months_since_first_order,
        count(distinct c.customer_id) as customer_count
    from first_order f
    join customer_orders c
        on f.customer_id = c.customer_id
    group by 1, 2, 3
),

cohort_size as (
    select
        cohort_month,
        customer_count as cohort_size
    from cohort_data
    where months_since_first_order = 0
),

retention as (
    select
        cohort_data.cohort_month,
        cohort_data.months_since_first_order,
        cohort_data.customer_count,
        cohort_size.cohort_size,
        round(100.0 * cohort_data.customer_count / cohort_size.cohort_size, 2) as retention_pct
    from cohort_data
    join cohort_size
        on cohort_data.cohort_month = cohort_size.cohort_month
)

select * from retention
order by cohort_month, months_since_first_order

Example 10: Revenue Attribution Model

示例10:收入归因模型

sql
-- models/marts/analytics/revenue_attribution.sql

with touchpoints as (
    select
        user_id,
        session_id,
        event_timestamp,
        case
            when referrer like '%google%' then 'Google'
            when referrer like '%facebook%' then 'Facebook'
            when referrer like '%email%' then 'Email'
            when referrer is null then 'Direct'
            else 'Other'
        end as channel
    from {{ ref('fct_page_views') }}
),

customer_journeys as (
    select
        t.user_id,
        o.order_id,
        o.order_total,
        t.channel,
        t.event_timestamp,
        o.order_date,
        row_number() over (
            partition by o.order_id
            order by t.event_timestamp
        ) as touchpoint_number,
        count(*) over (partition by o.order_id) as total_touchpoints
    from touchpoints t
    join {{ ref('fct_orders') }} o
        on t.user_id = o.customer_id
        and t.event_timestamp <= o.order_date
        and t.event_timestamp >= dateadd('day', -30, o.order_date)
),

attributed_revenue as (
    select
        order_id,
        channel,
        order_total,
        -- First touch attribution
        case when touchpoint_number = 1
            then order_total else 0 end as first_touch_revenue,
        -- Last touch attribution
        case when touchpoint_number = total_touchpoints
            then order_total else 0 end as last_touch_revenue,
        -- Linear attribution
        order_total / total_touchpoints as linear_revenue,
        -- Time decay (more recent touchpoints get more credit)
        order_total * (power(2, touchpoint_number - 1) /
            (power(2, total_touchpoints) - 1)) as time_decay_revenue
    from customer_journeys
)

select
    channel,
    count(distinct order_id) as orders,
    sum(first_touch_revenue) as first_touch_revenue,
    sum(last_touch_revenue) as last_touch_revenue,
    sum(linear_revenue) as linear_revenue,
    sum(time_decay_revenue) as time_decay_revenue
from attributed_revenue
group by 1
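
The time-decay weights above form a geometric series: touchpoint i of n receives 2^(i-1) / (2^n - 1) of the revenue, so the weights always sum to 1 and attribution preserves order_total. A quick standalone check in Python (illustrative only, not part of the dbt project):

```python
def time_decay_weights(total_touchpoints: int) -> list:
    """Weight for touchpoint i (1-based): 2**(i-1) / (2**n - 1)."""
    denom = 2 ** total_touchpoints - 1
    return [2 ** (i - 1) / denom for i in range(1, total_touchpoints + 1)]

weights = time_decay_weights(4)
assert weights == sorted(weights)       # later touchpoints get more credit
assert abs(sum(weights) - 1.0) < 1e-9   # weights sum to 1, preserving order_total
```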

Example 11: Data Quality Test Suite

示例11:数据质量测试套件

sql
-- tests/assert_fct_orders_quality.sql

-- Test multiple data quality rules in one test

with order_quality_checks as (
    select
        order_id,
        customer_id,
        order_date,
        order_total,
        status,

        -- Check 1: Order total should be positive
        case when order_total < 0
            then 'Negative order total' end as check_1,

        -- Check 2: Order date should not be in future
        case when order_date > current_date
            then 'Future order date' end as check_2,

        -- Check 3: Customer ID should exist
        case when customer_id is null
            then 'Missing customer ID' end as check_3,

        -- Check 4: Status should be valid
        case when status not in ('placed', 'shipped', 'completed', 'returned', 'cancelled')
            then 'Invalid status' end as check_4

    from {{ ref('fct_orders') }}
),

failed_checks as (
    select
        order_id,
        check_1,
        check_2,
        check_3,
        check_4
    from order_quality_checks
    where check_1 is not null
        or check_2 is not null
        or check_3 is not null
        or check_4 is not null
)

select * from failed_checks
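
A singular test like this passes only when the query returns zero rows. To keep the offending rows around for debugging, the test can opt into store_failures, which has dbt persist failures to an audit table in the warehouse (a sketch):

```sql
-- at the top of tests/assert_fct_orders_quality.sql
{{ config(store_failures=true) }}
```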

Example 12: Slowly Changing Dimension Merge

示例12:缓慢变化维度的合并

sql
-- models/marts/core/dim_products_scd.sql

{{ config(
    materialized='incremental',
    unique_key='product_key',
    merge_update_columns=['product_name', 'category', 'price', 'effective_to', 'is_current']
) }}

with source_data as (
    select
        product_id,
        product_name,
        category,
        price,
        updated_at
    from {{ source('ecommerce', 'products') }}

    {% if is_incremental() %}
        where updated_at > (select max(updated_at) from {{ this }} where is_current = true)
    {% endif %}
),

{% if is_incremental() %}
existing_records as (
    select *
    from {{ this }}
    where is_current = true
),

changed_records as (
    select
        s.product_id,
        s.product_name,
        s.category,
        s.price,
        s.updated_at
    from source_data s
    left join existing_records e
        on s.product_id = e.product_id
    -- brand-new products (no current row yet) or changed attributes;
    -- use IS DISTINCT FROM instead of != if these columns are nullable
    where e.product_id is null
        or s.product_name != e.product_name
        or s.category != e.category
        or s.price != e.price
),

expire_old_records as (
    select
        e.product_key,
        e.product_id,
        e.product_name,
        e.category,
        e.price,
        e.effective_from,
        c.updated_at as effective_to,
        false as is_current,
        e.updated_at
    from existing_records e
    join changed_records c
        on e.product_id = c.product_id
),

new_versions as (
    select
        {{ dbt_utils.generate_surrogate_key(['c.product_id', 'c.updated_at']) }} as product_key,
        c.product_id,
        c.product_name,
        c.category,
        c.price,
        c.updated_at as effective_from,
        null::timestamp as effective_to,
        true as is_current,
        c.updated_at
    from changed_records c
),

combined as (
    select * from expire_old_records
    union all
    select * from new_versions
)

select * from combined

{% else %}

-- First load: all records are current
select
    {{ dbt_utils.generate_surrogate_key(['product_id', 'updated_at']) }} as product_key,
    product_id,
    product_name,
    category,
    price,
    updated_at as effective_from,
    null::timestamp as effective_to,
    true as is_current,
    updated_at
from source_data

{% endif %}

Example 13: Window Functions for Rankings

示例13:用于排名的窗口函数

sql
-- models/marts/analytics/customer_rfm_score.sql

with customer_metrics as (
    select
        customer_id,
        max(order_date) as last_order_date,
        count(order_id) as total_orders,
        sum(order_total) as total_revenue
    from {{ ref('fct_orders') }}
    where status = 'completed'
    group by 1
),

rfm_calculations as (
    select
        customer_id,
        -- Recency: Days since last order
        datediff('day', last_order_date, current_date) as recency_days,
        -- Frequency: Total orders
        total_orders as frequency,
        -- Monetary: Total revenue
        total_revenue as monetary,

        -- Recency score (1-5, lower days = higher score)
        ntile(5) over (order by datediff('day', last_order_date, current_date) desc) as recency_score,
        -- Frequency score (1-5, more orders = higher score)
        ntile(5) over (order by total_orders) as frequency_score,
        -- Monetary score (1-5, more revenue = higher score)
        ntile(5) over (order by total_revenue) as monetary_score
    from customer_metrics
),

rfm_segments as (
    select
        customer_id,
        recency_days,
        frequency,
        monetary,
        recency_score,
        frequency_score,
        monetary_score,
        recency_score * 100 + frequency_score * 10 + monetary_score as rfm_score,
        case
            when recency_score >= 4 and frequency_score >= 4 and monetary_score >= 4
                then 'Champions'
            when recency_score >= 3 and frequency_score >= 3 and monetary_score >= 3
                then 'Loyal Customers'
            when recency_score >= 4 and frequency_score <= 2 and monetary_score <= 2
                then 'Promising'
            when recency_score >= 3 and frequency_score <= 2 and monetary_score <= 2
                then 'Potential Loyalists'
            when recency_score <= 2 and frequency_score >= 3 and monetary_score >= 3
                then 'At Risk'
            when recency_score <= 2 and frequency_score <= 2 and monetary_score <= 2
                then 'Hibernating'
            when recency_score <= 1
                then 'Lost'
            else 'Need Attention'
        end as customer_segment
    from rfm_calculations
)

select * from rfm_segments
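
The composite rfm_score simply packs the three 1-5 scores into a three-digit code: recency in the hundreds place, frequency in the tens, monetary in the ones. A minimal sketch in Python:

```python
def rfm_code(recency: int, frequency: int, monetary: int) -> int:
    """Pack three 1-5 scores into a single three-digit code."""
    assert all(1 <= s <= 5 for s in (recency, frequency, monetary))
    return recency * 100 + frequency * 10 + monetary

assert rfm_code(5, 4, 3) == 543   # recent, fairly frequent, mid-spend customer
assert rfm_code(1, 1, 1) == 111   # lowest possible score ('Lost' territory)
```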

Example 14: Union Multiple Sources

示例14:合并多个数据源

sql
-- models/staging/stg_all_events.sql

{{ config(
    materialized='view'
) }}

-- Union events from multiple sources using dbt_utils.
-- exclude drops source-specific columns; source_column_name adds a
-- column recording which relation each row came from. (SQL -- comments
-- are not valid inside a Jinja expression, so they belong out here.)

{{
    dbt_utils.union_relations(
        relations=[
            ref('stg_web_events'),
            ref('stg_mobile_events'),
            ref('stg_api_events')
        ],
        exclude=['_loaded_at'],
        source_column_name='event_source'
    )
}}

Example 15: Surrogate Key Generation

示例15:代理键生成

sql
-- models/marts/core/fct_order_lines.sql

with order_lines as (
    select
        order_id,
        line_number,
        product_id,
        quantity,
        unit_price,
        quantity * unit_price as line_total
    from {{ source('ecommerce', 'order_lines') }}
)

select
    {{ dbt_utils.generate_surrogate_key(['order_id', 'line_number']) }} as order_line_key,
    {{ dbt_utils.generate_surrogate_key(['order_id']) }} as order_key,
    {{ dbt_utils.generate_surrogate_key(['product_id']) }} as product_key,
    order_id,
    line_number,
    product_id,
    quantity,
    unit_price,
    line_total
from order_lines
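
Under the hood, dbt_utils.generate_surrogate_key compiles to an md5 hash of the null-safe, delimiter-joined input columns. A rough Python analogue (the exact null sentinel and separator vary by dbt_utils version, so treat this as a sketch):

```python
import hashlib

def surrogate_key(*fields) -> str:
    # Join fields with a separator, mapping NULLs to a sentinel so that
    # (None, 'a') and ('a', None) hash differently, then md5 the result.
    joined = "-".join("_null_" if f is None else str(f) for f in fields)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

assert surrogate_key(42, 1) == surrogate_key(42, 1)   # deterministic
assert surrogate_key(42, 1) != surrogate_key(42, 2)   # key changes with input
assert surrogate_key(None, "a") != surrogate_key("a", None)
```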

Example 16: Date Spine for Time Series

示例16:时间序列的日期序列

sql
-- models/marts/analytics/daily_revenue_complete.sql

-- Generate complete date spine to ensure no missing dates

with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('2020-01-01' as date)",
        end_date="cast(current_date as date)"
    ) }}
),

daily_revenue as (
    select
        date_trunc('day', order_date) as order_date,
        sum(order_total) as revenue
    from {{ ref('fct_orders') }}
    where status = 'completed'
    group by 1
),

complete_series as (
    select
        date_spine.date_day,
        coalesce(daily_revenue.revenue, 0) as revenue,
        -- 7-day moving average
        avg(coalesce(daily_revenue.revenue, 0)) over (
            order by date_spine.date_day
            rows between 6 preceding and current row
        ) as revenue_7d_ma,
        -- Month-to-date revenue
        sum(coalesce(daily_revenue.revenue, 0)) over (
            partition by date_trunc('month', date_spine.date_day)
            order by date_spine.date_day
        ) as revenue_mtd
    from date_spine
    left join daily_revenue
        on date_spine.date_day = daily_revenue.order_date
)

select * from complete_series
sql
-- models/marts/analytics/daily_revenue_complete.sql

-- 生成完整的日期序列以确保无缺失日期

with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('2020-01-01' as date)",
        end_date="cast(current_date as date)"
    ) }}
),

daily_revenue as (
    select
        date_trunc('day', order_date) as order_date,
        sum(order_total) as revenue
    from {{ ref('fct_orders') }}
    where status = 'completed'
    group by 1
),

complete_series as (
    select
        date_spine.date_day,
        coalesce(daily_revenue.revenue, 0) as revenue,
        -- 7天移动平均
        avg(coalesce(daily_revenue.revenue, 0)) over (
            order by date_spine.date_day
            rows between 6 preceding and current row
        ) as revenue_7d_ma,
        -- 当月累计收入
        sum(coalesce(daily_revenue.revenue, 0)) over (
            partition by date_trunc('month', date_spine.date_day)
            order by date_spine.date_day
        ) as revenue_mtd
    from date_spine
    left join daily_revenue
        on date_spine.date_day = daily_revenue.order_date
)

select * from complete_series
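The spine-then-left-join pattern matters because a plain `group by order_date` silently drops days with no orders, which skews moving averages. A small Python sketch of the same logic (the `complete_series` helper is illustrative; like SQL's windowed `avg`, the moving average divides by the number of rows actually in the frame, so the first six days use a shorter window):

```python
from datetime import date, timedelta

def complete_series(daily_revenue, start, end):
    """Fill missing dates with 0 revenue and compute a 7-day moving average."""
    spine = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    revenue = [daily_revenue.get(d, 0.0) for d in spine]
    # "rows between 6 preceding and current row": frame shrinks at the start
    ma7 = [sum(revenue[max(0, i - 6):i + 1]) / min(i + 1, 7)
           for i in range(len(revenue))]
    return list(zip(spine, revenue, ma7))

rows = complete_series(
    {date(2024, 1, 1): 70.0, date(2024, 1, 3): 140.0},
    start=date(2024, 1, 1),
    end=date(2024, 1, 4),
)
# Jan 2 and Jan 4 appear with zero revenue instead of being missing entirely
```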

Example 17: Custom Schema Macro Override

示例17:自定义模式宏覆盖

sql
-- macros/generate_schema_name.sql

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}

    {%- if target.name == 'prod' -%}
        -- Production: Use custom schema names directly
        {%- if custom_schema_name is not none -%}
            {{ custom_schema_name | trim }}
        {%- else -%}
            {{ default_schema }}
        {%- endif -%}

    {%- elif target.name == 'dev' -%}
        -- Development: Prefix with dev_username
        {%- if custom_schema_name is not none -%}
            dev_{{ env_var('DBT_USER', 'unknown') }}_{{ custom_schema_name | trim }}
        {%- else -%}
            dev_{{ env_var('DBT_USER', 'unknown') }}
        {%- endif -%}

    {%- else -%}
        -- Default: Concatenate target schema with custom schema
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}

{%- endmacro %}
sql
-- macros/generate_schema_name.sql

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}

    {%- if target.name == 'prod' -%}
        -- 生产环境:直接使用自定义模式名
        {%- if custom_schema_name is not none -%}
            {{ custom_schema_name | trim }}
        {%- else -%}
            {{ default_schema }}
        {%- endif -%}

    {%- elif target.name == 'dev' -%}
        -- 开发环境:前缀为dev_username
        {%- if custom_schema_name is not none -%}
            dev_{{ env_var('DBT_USER', 'unknown') }}_{{ custom_schema_name | trim }}
        {%- else -%}
            dev_{{ env_var('DBT_USER', 'unknown') }}
        {%- endif -%}

    {%- else -%}
        -- 默认:拼接目标模式和自定义模式
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}

{%- endmacro %}
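The macro's branching is easier to verify outside Jinja. This hypothetical Python mirror of `generate_schema_name` (the `resolve_schema` helper is not part of dbt) shows the three outcomes: prod uses the custom schema verbatim, dev prefixes everything with the developer's name, and any other target falls back to dbt's default `<target_schema>_<custom_schema>` concatenation:

```python
import os

def resolve_schema(target_name, target_schema, custom_schema=None, user=None):
    """Mirror the generate_schema_name macro's branching (illustrative only)."""
    user = user or os.environ.get("DBT_USER", "unknown")
    if target_name == "prod":
        # Production: custom schema name used directly
        return custom_schema.strip() if custom_schema else target_schema
    if target_name == "dev":
        # Development: isolate each developer under a dev_<username> prefix
        return (f"dev_{user}_{custom_schema.strip()}" if custom_schema
                else f"dev_{user}")
    # Default dbt behaviour: concatenate target schema with custom schema
    return (f"{target_schema}_{custom_schema.strip()}" if custom_schema
            else target_schema)
```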

Example 18: Cross-Database Query Macro

示例18:跨数据库查询宏

sql
-- macros/cross_db_concat.sql

-- Handle database-specific concat syntax

{% macro concat(fields) -%}
    {{ return(adapter.dispatch('concat')(fields)) }}
{%- endmacro %}

{% macro default__concat(fields) -%}
    concat({{ fields|join(', ') }})
{%- endmacro %}

{% macro snowflake__concat(fields) -%}
    {{ fields|join(' || ') }}
{%- endmacro %}

{% macro bigquery__concat(fields) -%}
    concat({{ fields|join(', ') }})
{%- endmacro %}

{% macro redshift__concat(fields) -%}
    {{ fields|join(' || ') }}
{%- endmacro %}
Usage:
sql
select
    {{ concat(['first_name', "' '", 'last_name']) }} as full_name
from {{ ref('stg_customers') }}
sql
-- macros/cross_db_concat.sql

-- 处理数据库特定的拼接语法

{% macro concat(fields) -%}
    {{ return(adapter.dispatch('concat')(fields)) }}
{%- endmacro %}

{% macro default__concat(fields) -%}
    concat({{ fields|join(', ') }})
{%- endmacro %}

{% macro snowflake__concat(fields) -%}
    {{ fields|join(' || ') }}
{%- endmacro %}

{% macro bigquery__concat(fields) -%}
    concat({{ fields|join(', ') }})
{%- endmacro %}

{% macro redshift__concat(fields) -%}
    {{ fields|join(' || ') }}
{%- endmacro %}
使用方式:
sql
select
    {{ concat(['first_name', "' '", 'last_name']) }} as full_name
from {{ ref('stg_customers') }}
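To see what each adapter implementation compiles to, here is a Python sketch of the rendering step (the `render_concat` helper is illustrative): Snowflake and Redshift join fields with the `||` operator, while the default and BigQuery implementations wrap them in a `concat(...)` call.

```python
def render_concat(fields, adapter):
    """Mirror the adapter-specific concat macros above (illustrative)."""
    if adapter in ("snowflake", "redshift"):
        return " || ".join(fields)
    return f"concat({', '.join(fields)})"

fields = ["first_name", "' '", "last_name"]
snow = render_concat(fields, "snowflake")  # first_name || ' ' || last_name
bq = render_concat(fields, "bigquery")     # concat(first_name, ' ', last_name)
```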

Example 19: Pre-Hook and Post-Hook Configuration

示例19:预钩子和后钩子配置

sql
-- models/marts/core/fct_orders.sql

{{
    config(
        materialized='incremental',
        unique_key='order_id',
        pre_hook=[
            "delete from {{ this }} where order_date < dateadd('year', -3, current_date)",
            "{{ log('Starting incremental load for fct_orders', info=True) }}"
        ],
        post_hook=[
            "create index if not exists idx_fct_orders_customer_id on {{ this }}(customer_id)",
            "create index if not exists idx_fct_orders_order_date on {{ this }}(order_date)",
            "{{ grant_select(this, 'analyst_role') }}",
            "{{ log('Completed incremental load for fct_orders', info=True) }}"
        ],
        tags=['core', 'incremental']
    )
}}

select * from {{ ref('stg_orders') }}

{% if is_incremental() %}
    where order_date > (select max(order_date) from {{ this }})
{% endif %}
sql
-- models/marts/core/fct_orders.sql

{{
    config(
        materialized='incremental',
        unique_key='order_id',
        pre_hook=[
            "delete from {{ this }} where order_date < dateadd('year', -3, current_date)",
            "{{ log('Starting incremental load for fct_orders', info=True) }}"
        ],
        post_hook=[
            "create index if not exists idx_fct_orders_customer_id on {{ this }}(customer_id)",
            "create index if not exists idx_fct_orders_order_date on {{ this }}(order_date)",
            "{{ grant_select(this, 'analyst_role') }}",
            "{{ log('Completed incremental load for fct_orders', info=True) }}"
        ],
        tags=['core', 'incremental']
    )
}}

select * from {{ ref('stg_orders') }}

{% if is_incremental() %}
    where order_date > (select max(order_date) from {{ this }})
{% endif %}
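The `is_incremental()` filter implements a high-watermark load: only source rows newer than the maximum date already in the target are pulled in. A minimal Python sketch of that pattern (the `incremental_load` helper is illustrative, not dbt internals; note that a strict `>` comparison skips late-arriving rows that share the watermark date, which is why some teams subtract a lookback window):

```python
def incremental_load(existing_rows, source_rows, date_key="order_date"):
    """High-watermark incremental load (illustrative sketch)."""
    if not existing_rows:  # first run, or dbt run --full-refresh
        return list(source_rows)
    watermark = max(r[date_key] for r in existing_rows)
    return [r for r in source_rows if r[date_key] > watermark]

target = [{"order_id": 1, "order_date": "2024-01-01"}]
source = [
    {"order_id": 1, "order_date": "2024-01-01"},  # at the watermark: skipped
    {"order_id": 2, "order_date": "2024-01-02"},  # newer: loaded
]
new_rows = incremental_load(target, source)
```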

Example 20: Exposure Definition

示例20:暴露对象定义

yaml
# models/exposures.yml

version: 2

exposures:
  - name: customer_dashboard
    type: dashboard
    description: |
      Executive dashboard showing customer metrics
    owner:
      name: Analytics Team
      email: analytics@company.com
    depends_on:
      - ref('fct_orders')
      - ref('dim_customers')
      - ref('customer_rfm_score')
      - ref('cohort_retention')
    tags: ['executive', 'customer-analytics']

  - name: revenue_forecast_model
    type: ml
    maturity: medium
    url: https://mlflow.company.com/models/revenue-forecast
    description: |
      Machine learning model for revenue forecasting.
      Uses historical order data to predict future revenue.
    owner:
      name: Data Science Team
      email: datascience@company.com
    depends_on:
      - ref('fct_orders')
      - ref('daily_revenue_complete')
    tags: ['ml', 'forecasting']
yaml
# models/exposures.yml

version: 2

exposures:
  - name: customer_dashboard
    type: dashboard
    description: |
      高管仪表盘,显示客户指标
    owner:
      name: Analytics Team
      email: analytics@company.com
    depends_on:
      - ref('fct_orders')
      - ref('dim_customers')
      - ref('customer_rfm_score')
      - ref('cohort_retention')
    tags: ['executive', 'customer-analytics']

  - name: revenue_forecast_model
    type: ml
    maturity: medium
    url: https://mlflow.company.com/models/revenue-forecast
    description: |
      收入预测的机器学习模型。
      使用历史订单数据预测未来收入。
    owner:
      name: Data Science Team
      email: datascience@company.com
    depends_on:
      - ref('fct_orders')
      - ref('daily_revenue_complete')
    tags: ['ml', 'forecasting']

Quick Reference Commands

快速参考命令

Essential dbt Commands

核心dbt命令

bash
bash

Install dependencies

安装依赖

dbt deps
dbt deps

Compile project (check for errors)

编译项目(检查错误)

dbt compile
dbt compile

Run all models

运行所有模型

dbt run
dbt run

Run specific model

运行特定模型

dbt run --select fct_orders
dbt run --select fct_orders

Run model and downstream dependencies

运行模型和下游依赖

dbt run --select fct_orders+
dbt run --select fct_orders+

Run model and upstream dependencies

运行模型和上游依赖

dbt run --select +fct_orders
dbt run --select +fct_orders

Run model and all dependencies

运行模型和所有依赖

dbt run --select +fct_orders+
dbt run --select +fct_orders+

Run all models in a directory

运行目录中的所有模型

dbt run --select staging.*
dbt run --select staging.*

Run models with specific tag

运行带有特定标签的模型

dbt run --select tag:daily
dbt run --select tag:daily

Run models, exclude specific ones

运行模型,排除特定模型

dbt run --exclude staging.*
dbt run --exclude staging.*

Run with full refresh (incremental models)

全量刷新运行(增量模型)

dbt run --full-refresh
dbt run --full-refresh

Test all models

测试所有模型

dbt test
dbt test

Test specific model

测试特定模型

dbt test --select fct_orders
dbt test --select fct_orders

Generate documentation

生成文档

dbt docs generate
dbt docs generate

Serve documentation

启动文档服务

dbt docs serve
dbt docs serve

Debug connection

调试连接

dbt debug
dbt debug

Clean compiled files

清理编译文件

dbt clean
dbt clean

Seed CSV files

加载CSV种子数据

dbt seed
dbt seed

Snapshot models

快照模型

dbt snapshot
dbt snapshot

List resources

列出资源

dbt ls --select staging.*
dbt ls --select staging.*

Show compiled SQL

显示编译后的SQL

dbt show --select fct_orders
dbt show --select fct_orders

Parse project

解析项目

dbt parse
dbt parse

Model Selection Syntax

模型选择语法

bash
bash

By name

按名称

--select model_name
--select model_name

By path

按路径

--select staging.jaffle_shop.*
--select staging.jaffle_shop.*

By tag

按标签

--select tag:daily
--select tag:daily

By resource type

按资源类型

--select resource_type:model
--select resource_type:model

By package

按包

--select package:dbt_utils
--select package:dbt_utils

By status (modified, new)

按状态(修改、新增)

--select state:modified+ --state ./prod_manifest
--select state:modified+ --state ./prod_manifest

Combinations (union)

组合(并集)

--select model_a model_b
--select model_a model_b

Intersections

交集

--select tag:daily,staging.*
--select tag:daily,staging.*

Graph operators

图操作符

--select +model_name    # Upstream dependencies
--select model_name+    # Downstream dependencies
--select +model_name+   # All dependencies
--select @model_name    # Model + children/parents to nth degree
--select +model_name    # 上游依赖
--select model_name+    # 下游依赖
--select +model_name+   # 所有依赖
--select @model_name    # 模型及其n级子/父依赖

Resources

资源


Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Data Engineering, Analytics Engineering, Data Transformation
Compatible With: dbt Core 1.0+, dbt Cloud, Snowflake, BigQuery, Redshift, Postgres, Databricks

技能版本:1.0.0
最后更新:2025年10月
技能分类:数据工程、分析工程、数据转换
兼容版本:dbt Core 1.0+, dbt Cloud, Snowflake, BigQuery, Redshift, Postgres, Databricks