# dbt Expert Skill — Expert-level dbt (data build tool): models, tests,
# documentation, incremental models, macros, and Jinja templating.
# Install: npx skill4agent add personamanagmentlayer/pcl dbt-expert

# dbt_project.yml
name: 'analytics'
version: '1.0.0'
config-version: 2
profile: 'analytics'

# Directory layout
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

# Directories removed by `dbt clean`
clean-targets:
  - "target"
  - "dbt_packages"

# Model configuration tree; keys prefixed with `+` are dbt configs.
# (The original file declared `models:` twice — a duplicate top-level key —
# so the staging persist_docs config is merged into this single tree.)
models:
  analytics:
    # Staging models (source system copies)
    staging:
      +materialized: view
      +schema: staging
      +tags: ["staging"]
      # Persist model/column descriptions into the warehouse
      +persist_docs:
        relation: true
        columns: true
    # Intermediate models (business logic)
    intermediate:
      +materialized: ephemeral
      +schema: intermediate
      +tags: ["intermediate"]
    # Mart models (final tables for BI)
    marts:
      +materialized: table
      +schema: marts
      +tags: ["marts"]
      finance:
        +schema: finance
      marketing:
        +schema: marketing

vars:
  # Global variables, read via {{ var('...') }}
  start_date: '2024-01-01'
  exclude_test_data: true

# Hooks run at the start/end of every dbt invocation
on-run-start:
  - "{{ log('Starting dbt run...', info=true) }}"
on-run-end:
  - "{{ log('dbt run completed!', info=true) }}"

# profiles.yml
analytics:
target: dev
outputs:
dev:
type: postgres
host: localhost
port: 5432
user: "{{ env_var('DBT_USER') }}"
password: "{{ env_var('DBT_PASSWORD') }}"
dbname: analytics_dev
schema: dbt_{{ env_var('USER') }}
threads: 4
keepalives_idle: 0
prod:
type: postgres
host: prod-db.company.com
port: 5432
user: "{{ env_var('DBT_PROD_USER') }}"
password: "{{ env_var('DBT_PROD_PASSWORD') }}"
dbname: analytics_prod
schema: analytics
threads: 8
keepalives_idle: 0
snowflake:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
role: transformer
database: analytics
warehouse: transforming
schema: dbt_{{ env_var('USER') }}
threads: 8version: 2
sources:
  - name: raw_postgres
    description: Raw data from production PostgreSQL database
    database: production
    schema: public
    tables:
      - name: users
        description: User account information
        # Source freshness: warn after 12h, error after 24h since last sync,
        # measured against the _synced_at timestamp
        loaded_at_field: _synced_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
          - name: email
            description: User email address
            tests:
              - unique
              - not_null
          - name: created_at
            description: Account creation timestamp
            tests:
              - not_null
      - name: orders
        description: Order transactions
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: user_id
            description: Foreign key to users
            tests:
              - not_null
              - relationships:
                  to: source('raw_postgres', 'users')
                  field: id
          - name: total_amount
            tests:
              - not_null
          - name: status
            tests:
              - accepted_values:
                  values: ['pending', 'completed', 'cancelled', 'refunded']
  - name: raw_s3
    description: Raw data files from S3
    meta:
      external_location: 's3://company-data/raw/'
    tables:
      - name: events
        description: Event tracking data
        external:
          location: 's3://company-data/raw/events/'
          file_format: parquet

-- models/staging/stg_users.sql
{{
    config(
        materialized='view',
        tags=['daily']
    )
}}

-- Staging model for users: light renames plus soft-delete filtering.
with source as (
    select * from {{ source('raw_postgres', 'users') }}
),

renamed as (
    select
        -- Primary key
        id as user_id,

        -- Attributes
        email,
        first_name,
        last_name,
        {{ dbt_utils.generate_surrogate_key(['email']) }} as user_key,

        -- Flags
        is_active,
        is_deleted,

        -- Timestamps
        created_at,
        updated_at,
        deleted_at,

        -- Metadata
        _synced_at as dbt_loaded_at

    from source
    -- Exclude soft-deleted rows: a record counts as deleted if either the
    -- flag is set or a deletion timestamp exists. The original used OR,
    -- which kept rows flagged is_deleted whose deleted_at was never set.
    where not is_deleted and deleted_at is null
)

select * from renamed
-- models/staging/stg_orders.sql
{{
    config(
        materialized='view'
    )
}}

-- Staging model for orders: column renames and a derived subtotal.
with raw_orders as (
    select * from {{ source('raw_postgres', 'orders') }}
),

staged as (
    select
        -- Primary key
        id as order_id,

        -- Foreign keys
        user_id,

        -- Metrics
        total_amount,
        tax_amount,
        shipping_amount,
        -- Derived: order value net of tax and shipping
        total_amount - tax_amount - shipping_amount as subtotal,

        -- Dimensions
        status,
        payment_method,

        -- Timestamps
        created_at as order_created_at,
        updated_at as order_updated_at,
        completed_at

    from raw_orders
)

select * from staged

-- models/intermediate/int_order_items_joined.sql
{{
    config(
        materialized='ephemeral'
    )
}}

-- Grain: one row per order item, enriched with order and product context
-- so downstream marts can aggregate without repeating these joins.
with orders as (
    select * from {{ ref('stg_orders') }}
),

order_items as (
    select * from {{ ref('stg_order_items') }}
),

products as (
    select * from {{ ref('stg_products') }}
),

order_item_details as (
    select
        orders.order_id,
        orders.user_id,
        orders.order_created_at,
        order_items.order_item_id,
        order_items.quantity,
        order_items.unit_price,
        products.product_id,
        products.product_name,
        products.category,
        -- Extended line amount
        order_items.quantity * order_items.unit_price as line_total
    from orders
    inner join order_items
        on orders.order_id = order_items.order_id
    inner join products
        on order_items.product_id = products.product_id
)

select * from order_item_details

-- models/marts/fct_orders.sql
{{
    config(
        materialized='table',
        tags=['fact']
    )
}}

-- Fact table: one row per order. Item-level aggregates are coalesced to 0
-- for orders with no order_items rows (the left join below would otherwise
-- leave them NULL), matching the coalesce convention in dim_customers.
with orders as (
    select * from {{ ref('stg_orders') }}
),

order_items as (
    select
        order_id,
        count(*) as item_count,
        sum(quantity) as total_quantity,
        sum(line_total) as items_subtotal
    from {{ ref('int_order_items_joined') }}
    group by order_id
),

final as (
    select
        -- Primary key
        orders.order_id,

        -- Foreign keys
        orders.user_id,

        -- Metrics
        orders.total_amount,
        orders.subtotal,
        orders.tax_amount,
        orders.shipping_amount,
        coalesce(order_items.item_count, 0) as item_count,
        coalesce(order_items.total_quantity, 0) as total_quantity,

        -- Dimensions
        orders.status,
        orders.payment_method,

        -- Timestamps
        orders.order_created_at,
        orders.completed_at,

        -- Metadata
        current_timestamp() as dbt_updated_at
    from orders
    left join order_items
        on orders.order_id = order_items.order_id
)

select * from final
-- models/marts/dim_customers.sql
{{
    config(
        materialized='table',
        tags=['dimension']
    )
}}

-- Customer dimension: one row per active user, with lifetime order metrics
-- and a value-based segment. Users with no completed orders get zeroed
-- counts and fall into the 'No Orders' segment.
with users as (
    select * from {{ ref('stg_users') }}
),

orders as (
    select * from {{ ref('fct_orders') }}
),

order_stats as (
    -- Lifetime metrics over completed orders only
    select
        user_id,
        count(*) as lifetime_orders,
        sum(total_amount) as lifetime_value,
        avg(total_amount) as avg_order_value,
        min(order_created_at) as first_order_at,
        max(order_created_at) as last_order_at,
        max(completed_at) as last_completed_at
    from orders
    where status = 'completed'
    group by user_id
),

final as (
    select
        -- Primary key
        users.user_id,
        users.user_key,

        -- Attributes
        users.email,
        users.first_name,
        users.last_name,
        users.first_name || ' ' || users.last_name as full_name,

        -- Customer metrics (zeroed when the user has no completed orders)
        coalesce(order_stats.lifetime_orders, 0) as lifetime_orders,
        coalesce(order_stats.lifetime_value, 0) as lifetime_value,
        order_stats.avg_order_value,

        -- Segmentation by lifetime value
        case
            when order_stats.lifetime_value >= 10000 then 'VIP'
            when order_stats.lifetime_value >= 5000 then 'High Value'
            when order_stats.lifetime_value >= 1000 then 'Medium Value'
            when order_stats.lifetime_value > 0 then 'Low Value'
            else 'No Orders'
        end as customer_segment,

        -- Timestamps
        users.created_at as user_created_at,
        order_stats.first_order_at,
        order_stats.last_order_at,

        -- Metadata
        current_timestamp() as dbt_updated_at
    from users
    left join order_stats
        on users.user_id = order_stats.user_id
    -- Inactive users are excluded from the dimension
    where users.is_active
)

select * from final

-- models/marts/fct_events.sql
-- Incremental event fact: merged on event_id; on incremental runs only
-- rows newer than this table's current max(event_timestamp) are loaded.
-- NOTE(review): a strict `>` high-watermark can drop late-arriving events
-- that share the current max timestamp — confirm whether a lookback
-- window is needed for this event source.
{{
config(
materialized='incremental',
unique_key='event_id',
on_schema_change='fail',
incremental_strategy='merge'
)
}}
with events as (
select * from {{ ref('stg_events') }}
{% if is_incremental() %}
-- Only load new events
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
),
enriched as (
select
event_id,
user_id,
event_type,
event_timestamp,
-- Surrogate key over (user_id, event_timestamp); distinct from event_id
{{ dbt_utils.generate_surrogate_key(['user_id', 'event_timestamp']) }} as event_key,
properties,
current_timestamp() as dbt_loaded_at
from events
)
select * from enriched
-- Incremental with delete + insert
{{
    config(
        materialized='incremental',
        unique_key='date',
        incremental_strategy='delete+insert'
    )
}}

-- Daily order metrics. On incremental runs the trailing 7-day window is
-- deleted and rebuilt, so late order updates inside that window are
-- picked up on the next run.
with daily_metrics as (
    select
        date_trunc('day', order_created_at) as date,
        count(*) as order_count,
        sum(total_amount) as revenue
    from {{ ref('fct_orders') }}
    {% if is_incremental() %}
    -- Rebuild only the trailing 7 days
    where date_trunc('day', order_created_at) >= date_trunc('day', current_date - interval '7 days')
    {% endif %}
    group by 1
)

select * from daily_metrics

# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: Order transactions fact table
    columns:
      - name: order_id
        description: Unique order identifier
        tests:
          - unique
          - not_null
      - name: user_id
        description: Customer identifier
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: user_id
      - name: total_amount
        description: Order total amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']
  - name: dim_customers
    description: Customer dimension table
    tests:
      # Table-level test: (user_id, email) must be unique together
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - user_id
            - email

-- tests/assert_positive_revenue.sql
-- This test fails (returns rows) if any daily revenue is negative.
-- fct_orders has no `date` column, so the day is derived from
-- order_created_at (the original grouped by a nonexistent `date`).
select
    date_trunc('day', order_created_at) as order_date,
    sum(total_amount) as revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
having sum(total_amount) < 0
-- tests/assert_order_counts_match.sql
-- Singular test: fails when fct_orders and completed-order events disagree
-- on the total order count.
-- NOTE(review): assumes fct_events exposes an order_id column for
-- 'order_completed' events — confirm against the events schema.
with orders_table as (
    select count(*) as order_count
    from {{ ref('fct_orders') }}
),

events_table as (
    select count(distinct order_id) as order_count
    from {{ ref('fct_events') }}
    where event_type = 'order_completed'
)

select *
from orders_table
cross join events_table
where orders_table.order_count != events_table.order_count

-- tests/generic/test_valid_percentage.sql
{% test valid_percentage(model, column_name) %}
-- Generic test: fails on rows whose value lies outside [0, 1].
-- NULLs pass (SQL three-valued logic); pair with not_null if required.
select *
from {{ model }}
where {{ column_name }} not between 0 and 1
{% endtest %}
-- Usage in schema.yml
# - name: conversion_rate
# tests:
#       - valid_percentage

-- macros/cents_to_dollars.sql
{#
  Render SQL converting an integer cents amount to dollars as
  numeric(16, scale). `column_name` may be a column name or expression;
  `scale` is the number of decimal places (default 2).
#}
{% macro cents_to_dollars(column_name, scale=2) %}
({{ column_name }} / 100.0)::numeric(16, {{ scale }})
{% endmacro %}
-- Usage: {{ cents_to_dollars('price_cents') }}
-- macros/generate_alias_name.sql
{#
  Override of dbt's built-in generate_alias_name: relations are named
  after the model file (node.name) unless an `alias` config is set, in
  which case the trimmed alias is used.
#}
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
{%- if custom_alias_name is none -%}
{{ node.name }}
{%- else -%}
{{ custom_alias_name | trim }}
{%- endif -%}
{%- endmacro %}
-- macros/date_spine.sql
{#
  Render a select producing one `date_day` row per day between start_date
  and end_date, passed as 'YYYY-MM-DD' strings and cast to date in the
  generated SQL. NOTE(review): dbt_utils.date_spine treats end_date as
  exclusive — confirm against the installed dbt_utils version.
#}
{% macro date_spine(start_date, end_date) %}
with date_spine as (
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('" ~ start_date ~ "' as date)",
end_date="cast('" ~ end_date ~ "' as date)"
) }}
)
select date_day
from date_spine
{% endmacro %}
-- macros/grant_select.sql
{#
  Grant SELECT on every table in `schema` to `role`, executing the grant
  immediately via run_query — intended for use in on-run-end hooks.
  NOTE(review): schema/role are interpolated raw into the SQL statement;
  only call with trusted, project-controlled values.
#}
{% macro grant_select(schema, role) %}
{% set sql %}
grant select on all tables in schema {{ schema }} to {{ role }};
{% endset %}
{% do run_query(sql) %}
{% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}
{% endmacro %}
-- Usage in on-run-end hook
-- {{ grant_select('analytics', 'analyst') }}

-- macros/pivot_metrics.sql
{#
  Render one pivoted aggregate per entry in `values`:
  sum(metric) where column = value, aliased to the lower-cased,
  underscore-joined value. A comma is emitted between entries but not
  after the last, so the output drops directly into a select list.
#}
{% macro pivot_metrics(column, metric, values) %}
{% for value in values %}
sum(case when {{ column }} = '{{ value }}' then {{ metric }} else 0 end)
as {{ value | replace(' ', '_') | lower }}
{%- if not loop.last -%},{%- endif %}
{% endfor %}
{% endmacro %}
-- Usage:
-- select
-- date,
-- {{ pivot_metrics('status', 'total_amount', ['pending', 'completed', 'cancelled']) }}
-- from orders
-- group by date
-- macros/generate_schema_name.sql
{#
  Schema routing override:
    - no custom schema        -> the target schema unchanged
    - prod + custom schema    -> the custom schema as-is (e.g. marts)
    - non-prod + custom       -> <target schema>_<custom> (dev namespacing)
  Fix: the original fell through to the concatenation branch whenever
  target was not prod, so a model with no custom schema rendered as
  "<schema>_None".
#}
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- elif target.name == 'prod' -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}

-- snapshots/orders_snapshot.sql
{% snapshot orders_snapshot %}

{#
  Type-2 history of raw orders. Timestamp strategy: a new version is
  captured whenever updated_at advances; rows that disappear from the
  source are invalidated (hard deletes).
#}
{{
    config(
        target_schema='snapshots',
        unique_key='order_id',
        strategy='timestamp',
        updated_at='updated_at',
        invalidate_hard_deletes=True
    )
}}

select * from {{ source('raw_postgres', 'orders') }}

{% endsnapshot %}

-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}

{#
  Type-2 history of raw customers. Check strategy: a new version is
  captured whenever email, status, or plan_type changes; vanished source
  rows are invalidated (hard deletes).
#}
{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='check',
        check_cols=['email', 'status', 'plan_type'],
        invalidate_hard_deletes=True
    )
}}

select * from {{ source('raw_postgres', 'customers') }}

{% endsnapshot %}

# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: |
      # Order Transactions Fact Table

      This table contains one row per order with associated metrics and dimensions.

      ## Grain
      One row per order

      ## Freshness
      Updated hourly via incremental load

      ## Usage
      Primary table for order analysis and reporting
    columns:
      - name: order_id
        description: Unique order identifier (PK)
        tests:
          - unique
          - not_null
      - name: total_amount
        description: |
          Total order amount including tax and shipping.
          Formula: `subtotal + tax_amount + shipping_amount`
      - name: customer_segment
        description: Customer value segment
        meta:
          dimension:
            type: category
            label: Customer Segment

<!-- docs/overview.md -->
{% docs __overview__ %}
# Analytics dbt Project
This dbt project transforms raw data from our production systems into
analytics-ready models for BI and data science use cases.
## Data Sources
- PostgreSQL (production database)
- S3 (event tracking)
- Snowflake (external data)
## Model Layers
1. **Staging**: Light transformations, renaming
2. **Intermediate**: Business logic, joins
3. **Marts**: Final tables for consumption
{% enddocs %}

# packages.yml
packages:
- package: dbt-labs/dbt_utils
version: 1.1.1
- package: calogica/dbt_expectations
version: 0.10.0
- package: dbt-labs/codegen
version: 0.12.1
- git: "https://github.com/dbt-labs/dbt-audit-helper.git"
revision: 0.9.0-- Using dbt_utils
select
{{ dbt_utils.generate_surrogate_key(['user_id', 'order_id']) }} as order_key,
{{ dbt_utils.safe_divide('revenue', 'orders') }} as avg_order_value,
{{ dbt_utils.star(from=ref('stg_orders'), except=['_synced_at']) }}
from {{ ref('stg_orders') }}
-- Using dbt_expectations
tests:
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
          max_value: 100

-- Bad: Many nested CTEs
with cte1 as (...), cte2 as (...), cte3 as (...)
-- 20 more CTEs
select * from cte23
-- Good: Break into intermediate models
select * from {{ ref('int_cleaned_data') }}

-- Bad: Direct table reference
select * from analytics.staging.stg_orders
-- Good: Use ref
select * from {{ ref('stg_orders') }}

-- Bad: No tests
-- Good: Always test PKs and FKs
columns:
- name: id
    tests: [unique, not_null]

-- Bad: Hardcoded date
where created_at >= '2024-01-01'
-- Good: Use variables
where created_at >= '{{ var("start_date") }}'