data-pipelines

You are building data pipelines using a DuckDB-centric stack. The tools, in typical execution order: dlt (extract + load) → sqlmesh (transform) → DuckDB/MotherDuck (query engine) → polars (DataFrame work) → marimo (notebooks/apps). uv manages Python projects and dependencies.

Language Preference


SQL first (DuckDB dialect), then Python, then bash. Use the simplest language that gets the job done.

uv — Project Management


Never use pip directly. All Python work goes through uv.
```bash
uv init my-project                    # New project
uv add "dlt[duckdb]" sqlmesh polars   # Add dependencies
uv sync                               # Install into .venv
uv run python pipeline.py             # Run in project venv
uv run --with requests script.py      # Ad-hoc dependency
```
Inline script dependencies (PEP 723) for standalone scripts:
```python
# /// script
# dependencies = ["dlt[duckdb]", "polars"]
# requires-python = ">=3.12"
# ///
```


Run with `uv run script.py` — deps are resolved automatically.
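
For example, a complete standalone script (the polars-only dependency list and the computation are illustrative):

```python
# /// script
# dependencies = ["polars"]
# requires-python = ">=3.12"
# ///
import polars as pl

# `uv run script.py` resolves and installs polars before executing
print(pl.DataFrame({"x": [1, 2, 3]}).select(pl.col("x").sum()))
```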

Always commit `uv.lock`. Use `pyproject.toml` for dependency declarations, never `requirements.txt`.
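
`uv add` maintains the dependency table for you; a minimal `pyproject.toml` looks roughly like this (project name and version are illustrative):

```toml
[project]
name = "my-project"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = ["dlt[duckdb]", "sqlmesh", "polars"]
```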


dlt — Extract + Load


dlt handles ingestion: API calls, pagination, schema inference, incremental loading, and state management.

Scaffold and Run


```bash
dlt init rest_api duckdb             # Scaffold pipeline
uv run python pipeline.py            # Run extraction
dlt pipeline <name> info             # Inspect state
dlt pipeline <name> schema           # View inferred schema
```

Pipeline Patterns


Minimal pipeline:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="raw",
)
# `data` can be any iterable of dicts, a dlt source, or a resource
info = pipeline.run(data, table_name="events")
```
Incremental loading:

```python
import dlt

@dlt.resource(write_disposition="merge", primary_key="id")
def users(updated_at=dlt.sources.incremental("updated_at")):
    # `fetch_users` stands in for your own API client;
    # dlt persists last_value between runs
    yield from fetch_users(since=updated_at.last_value)
```
REST API source (declarative):

```python
from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {"base_url": "https://api.example.com/v1"},
    "resource_defaults": {"primary_key": "id", "write_disposition": "merge"},
    "resources": [
        "users",
        {
            "name": "events",
            "write_disposition": "append",
            "endpoint": {
                "path": "events",
                "incremental": {"cursor_path": "created_at", "initial_value": "2024-01-01"},
            },
        },
    ],
})
```
Write dispositions:

| Disposition | Behavior | Use For |
| --- | --- | --- |
| `append` | Insert rows (default) | Immutable events, logs |
| `replace` | Drop and recreate | Small lookup tables |
| `merge` | Upsert by `primary_key` | Mutable records |
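
The `merge` and `append` dispositions appear in the examples above; as a sketch of `replace`, a small lookup table rebuilt wholesale on each run (resource name and rows are hypothetical):

```python
import dlt

@dlt.resource(write_disposition="replace")
def currencies():
    # Static lookup data: the table is dropped and recreated on every load
    yield [{"code": "USD", "name": "US Dollar"}, {"code": "EUR", "name": "Euro"}]
```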
Destinations: `duckdb` (local file), `motherduck` (cloud). Set the `motherduck_token` env var or configure in `.dlt/secrets.toml`.
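
Pointing the same pipeline at MotherDuck is a one-line change; a sketch assuming `motherduck_token` is exported:

```python
import dlt

# The motherduck destination reads motherduck_token from the environment
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="motherduck",
    dataset_name="raw",
)
```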

Project Structure


```
.dlt/
  config.toml          # Pipeline config
  secrets.toml         # Credentials (gitignored)
<source>_pipeline.py
```

sqlmesh — Transform


SQL-first transformation framework. Models are SQL files with a header block. Plan/apply workflow — no accidental production changes.

Scaffold and Run


```bash
sqlmesh init duckdb                              # New project
sqlmesh init -t dlt --dlt-pipeline <name>        # From dlt schema
sqlmesh plan                                     # Preview + apply (dev)
sqlmesh plan prod                                # Promote to production
sqlmesh fetchdf "SELECT * FROM analytics.users"  # Ad-hoc query
sqlmesh test                                     # Run unit tests
sqlmesh ui                                       # Web interface
```

Model Kinds


| Kind | Behavior | Use For |
| --- | --- | --- |
| `FULL` | Rewrite entire table | Small dimension tables |
| `INCREMENTAL_BY_TIME_RANGE` | Process new time intervals | Facts, events, logs |
| `INCREMENTAL_BY_UNIQUE_KEY` | Upsert by key | Mutable dimensions |
| `SEED` | Static CSV data | Reference/lookup data |
| `VIEW` | SQL view | Simple pass-throughs |
| `SCD_TYPE_2` | Slowly changing dimensions | Historical tracking |

Model Example


```sql
MODEL (
    name analytics.stg_events,
    kind INCREMENTAL_BY_TIME_RANGE (time_column event_date),
    cron '@daily',
    grain (event_id),
    audits (NOT_NULL(columns=[event_id]))
);

SELECT
    event_id,
    user_id,
    event_type,
    event_date
FROM raw.events
WHERE event_date BETWEEN @start_date AND @end_date
```
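
A matching unit test for `sqlmesh test` could look like the sketch below, assuming SQLMesh's YAML test format (row values and the `tests/test_stg_events.yaml` path are illustrative):

```yaml
test_stg_events:
  model: analytics.stg_events
  vars:
    start: 2024-01-01
    end: 2024-01-02
  inputs:
    raw.events:
      rows:
        - event_id: 1
          user_id: 10
          event_type: click
          event_date: 2024-01-01
  outputs:
    query:
      rows:
        - event_id: 1
          user_id: 10
          event_type: click
          event_date: 2024-01-01
```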

Config (`config.yaml`)

```yaml
gateways:
  local:
    connection:
      type: duckdb
      database: db.duckdb
default_gateway: local
model_defaults:
  dialect: duckdb
```

dlt Integration


`sqlmesh init -t dlt` auto-generates external models and incremental staging models from dlt's inferred schema. Schema changes from dlt are detected by `sqlmesh plan`.

DuckDB — Query Engine


DuckDB is the shared SQL engine across the entire stack. Use DuckDB-specific syntax freely.

CLI


```bash
duckdb                              # In-memory
duckdb my_data.db                   # Persistent local
duckdb md:my_db                     # MotherDuck
duckdb -c "SELECT 42"               # One-shot
```

DuckDB SQL Syntax


Friendly SQL:

```sql
FROM my_table;                                          -- Implicit SELECT *
FROM my_table SELECT col1, col2 WHERE col3 > 5;         -- FROM-first
SELECT * EXCLUDE (internal_id) FROM events;             -- Drop columns
SELECT * REPLACE (amount / 100.0 AS amount) FROM txns;  -- Transform in-place
SELECT category, SUM(amount) FROM sales GROUP BY ALL;   -- Infer GROUP BY
```

Read files directly (no import step):

```sql
SELECT * FROM 'data.parquet';
SELECT * FROM read_csv('data.csv', header=true);
SELECT * FROM 's3://bucket/path/*.parquet';
COPY (SELECT * FROM events) TO 'output.parquet' (FORMAT PARQUET);
```

Nested types:

```sql
SELECT {'name': 'Alice', 'age': 30} AS person;
SELECT [1, 2, 3] AS nums;
SELECT list_filter([1, 2, 3, 4], x -> x > 2);
```

Useful commands:

```sql
DESCRIBE SELECT * FROM events;
SUMMARIZE events;
```

MotherDuck


```sql
ATTACH 'md:';              -- All databases
ATTACH 'md:my_db';         -- Specific database
```

Auth via the `motherduck_token` env var. Once attached, cross-database queries work: `SELECT * FROM local_db.main.t1 JOIN cloud_db.main.t2 USING (id)`.
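
A fuller hybrid-query sketch (database aliases, table names, and columns are hypothetical):

```sql
ATTACH 'my_data.db' AS local_db;   -- local DuckDB file
ATTACH 'md:cloud_db';              -- MotherDuck database

SELECT t1.id, t1.amount, t2.region
FROM local_db.main.t1
JOIN cloud_db.main.t2 USING (id);
```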

polars — DataFrames


Use polars when Python logic is needed — complex string transforms, ML features, row-level conditionals. For joins, aggregations, and window functions, prefer SQL.
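For instance, the row-level conditional idiom (`when/then/otherwise`; column names and threshold are illustrative):

```python
import polars as pl

df = pl.DataFrame({"user_id": [1, 2, 3], "amount": [50, 150, 300]})

# Tag each row by spend tier
df = df.with_columns(
    pl.when(pl.col("amount") > 100)
    .then(pl.lit("high"))
    .otherwise(pl.lit("low"))
    .alias("tier")
)
```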

Key Patterns


**Lazy evaluation (always prefer for production):**

```python
import polars as pl

lf = pl.scan_parquet("events/*.parquet")
result = (
    lf.filter(pl.col("event_date") >= "2024-01-01")
    .group_by("user_id")
    .agg(pl.col("amount").sum().alias("total_spend"))
    .sort("total_spend", descending=True)
    .collect()
)
```

**Three contexts:**

```python
df.select(...)         # Pick/transform columns (output has ONLY these)
df.with_columns(...)   # Add/overwrite columns (keeps all originals)
df.filter(...)         # Keep rows matching condition
```

**DuckDB interop (zero-copy via Arrow):**

```python
import duckdb
result = duckdb.sql("SELECT * FROM df WHERE amount > 100").pl()
```

marimo — Notebooks


Reactive Python notebooks stored as plain `.py` files. Cells auto-re-execute when dependencies change.

```bash
marimo edit notebook.py                  # Create/edit
marimo run notebook.py                   # Serve as app
marimo convert notebook.ipynb -o out.py  # From Jupyter
```

SQL cells use DuckDB by default and return polars DataFrames:

```python
result = mo.sql(f"""
    SELECT * FROM events
    WHERE event_date >= '{start_date}'
""")
```

Python variables and polars DataFrames are queryable from SQL cells and vice versa.
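
A sketch of that two-way interop (dataframe and column names are illustrative):

```python
import marimo as mo
import polars as pl

df = pl.DataFrame({"user_id": [1, 2], "amount": [10.0, 25.0]})

# A Python-defined DataFrame is queryable from SQL...
totals = mo.sql("SELECT user_id, SUM(amount) AS total FROM df GROUP BY ALL")

# ...and the result comes back as a polars DataFrame
print(totals.sort("total", descending=True))
```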

Typical Pipeline Flow


1. `uv init` + `uv add "dlt[duckdb]" "sqlmesh[duckdb]" polars marimo`
2. `dlt init rest_api duckdb` — scaffold extraction
3. `uv run python pipeline.py` — dlt loads raw data into DuckDB
4. `sqlmesh init -t dlt --dlt-pipeline <name>` — generate transform models
5. Write SQL models → `sqlmesh plan` — transform raw into analytics (steps 1-5 are shown as one shell session below)
6. `marimo edit analysis.py` — explore with SQL cells and polars
7. For production: swap destination to `motherduck`, `sqlmesh plan prod`
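
A sketch of steps 1-5 as a single shell session (`<name>` is the pipeline_name from the dlt scaffold; `rest_api_pipeline.py` follows the `<source>_pipeline.py` convention above):

```bash
uv init my-project && cd my-project
uv add "dlt[duckdb]" "sqlmesh[duckdb]" polars marimo
uv run dlt init rest_api duckdb                    # scaffold extraction
uv run python rest_api_pipeline.py                 # load raw data into DuckDB
uv run sqlmesh init -t dlt --dlt-pipeline <name>   # generate transform models
uv run sqlmesh plan                                # apply transformations (dev)
```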