# dlt-skill

## dlt Pipeline Creator

Choose a pipeline type with the decision tree below, then follow the Core Workflow.

Quick start: 1) Use the decision tree. 2) Follow the Core Workflow. 3) Use patterns and references as needed.

## Pipeline Type Decision Tree

When a user requests a dlt pipeline, determine which type to create:
```
START: User wants to create a dlt pipeline
├─→ Is there a dlt verified source available for this platform?
│   (Check: https://dlthub.com/docs/dlt-ecosystem/verified-sources)
│   │
│   YES → Use VERIFIED SOURCE approach
│   │     Examples: Salesforce, GitHub, Stripe, HubSpot, Slack
│   │     Action: Guide user through `dlt init <source> <destination>`
│   │
│   NO → Continue to next question
├─→ Is this a REST API with standard patterns?
│   (Standard auth, pagination, JSON responses)
│   │
│   YES → Use DECLARATIVE REST API approach
│   │     Examples: Pokemon API, simple REST APIs with clear endpoints
│   │     Action: Create config-based pipeline with rest_api_source
│   │
│   NO → Continue to next question
└─→ Does this require custom logic or Python packages?
    YES → Use CUSTOM PYTHON approach
          Examples: Python packages (simple-salesforce), complex transformations,
                   non-standard APIs, custom data sources
          Action: Create custom source with @dlt.source and @dlt.resource decorators
```

## Core Workflow

### 1. Understand Requirements

Ask clarifying questions:
  • Source: What is the data source? (API URL, platform name, database, etc.)
  • Source type: Does this match a verified source, REST API, or require custom code?
  • Destination: Where should data be loaded? (DuckDB, BigQuery, Snowflake, etc.)
  • Resources: What specific data/endpoints are needed?
  • Incremental: Should the pipeline load incrementally or do full refreshes?
  • Authentication: What credentials are required?

### 2. Choose Pipeline Approach

Based on the decision tree above, select:
  • Verified source - Pre-built, tested connector
  • Declarative REST API - Config-based REST API pipeline
  • Custom Python - Full control with Python code

### 3. Initialize or Create Pipeline

#### Verified source

```bash
dlt init <source_name> <destination_name>
```

Examples:
  • dlt init salesforce bigquery
  • dlt init github duckdb
  • dlt init stripe snowflake

#### Declarative REST API or Custom Python

Use templates from this skill's assets/templates/ (copy into the project if needed):
  • declarative_rest_pipeline.py
    - For REST APIs
  • custom_python_pipeline.py
    - For custom sources

### 4. Install Required Packages

Recommended: Use the helper script (detects pip/uv/poetry):

```bash
python scripts/install_packages.py --destination <destination_name>
```

Manual: `pip install "dlt[<destination>,workspace]"` (e.g. `bigquery`, `snowflake`). For DuckDB, use `dlt[workspace]` only. The `workspace` extra is required for `dlt pipeline <name> show` and the dashboard.

### 5. Configure Credentials

Create or update `.dlt/secrets.toml`. Structure:

```toml
[sources.<source_name>]
# Source credentials here

[destination.<destination_name>]
# Destination credentials here
```

Use the template: [assets/templates/.dlt/secrets.toml](assets/templates/.dlt/secrets.toml)

**Important**: Remind user to add `.dlt/secrets.toml` to `.gitignore`!

**Note for DuckDB**: DuckDB doesn't require credentials in secrets.toml. Just specify the database file path in the pipeline or config.toml.

### 6. Configure Pipeline Settings

Create or update `.dlt/config.toml` for non-sensitive settings:

```toml
[sources.<source_name>]
base_url = "https://api.example.com"
timeout = 30

[destination.<destination_name>]
location = "US"
```

Use the template: assets/templates/.dlt/config.toml

### 7. Implement Pipeline Logic

Flesh out the pipeline code based on requirements:

For verified sources:
  • Customize resource selection with `.with_resources()`
  • Configure incremental loading with `.apply_hints()`
  • See: references/verified-sources.md

For Declarative REST API:
  • Define client configuration (base_url, auth)
  • Configure resources and endpoints
  • Set up pagination and incremental loading
  • Resource-level options (e.g. `max_table_nesting`, `table_name`) are set in the resource dict in the config; see the Resource configuration section of references/rest-api-source.md.
  • See: references/rest-api-source.md

For Custom Python:
  • Implement `@dlt.source` and `@dlt.resource` functions
  • Use generators and yield patterns
  • Configure write dispositions and primary keys
  • See: references/custom-sources.md
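As a sketch of the resource-level options mentioned for the declarative REST API approach: they live alongside `name` and `endpoint` in the same resource dict. The endpoint name, path, and option values below are hypothetical.

```python
# Hypothetical resource dict for a declarative REST API config.
# Resource-level options such as table_name and max_table_nesting
# sit next to "name" and "endpoint" in the same dict.
resource = {
    "name": "orders",                 # hypothetical endpoint name
    "endpoint": {"path": "orders"},   # hypothetical path
    "table_name": "shop_orders",      # override the destination table name
    "max_table_nesting": 1,           # limit unpacking of nested JSON
    "write_disposition": "merge",
    "primary_key": "id",
}

config = {
    "client": {"base_url": "https://api.example.com/"},
    "resources": [resource],
}
```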

### 8. Configure Incremental Loading (If Needed)

For pipelines that should load only new/changed data:
  • Identify a cursor field (timestamp, ID)
  • Set the write disposition to `merge`
  • Define a primary key for deduplication
  • Configure incremental parameters

See: references/incremental-loading.md
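Conceptually, the steps above amount to filtering on a cursor and upserting on a primary key. A plain-Python sketch of those semantics (this models the idea only; dlt's actual incremental API is covered in the reference, and the dates here are illustrative):

```python
# Merge-style incremental load: keep only records newer than the
# stored cursor, then upsert them into the table by primary key.
last_cursor = "2024-01-01"
table = {1: {"id": 1, "updated_at": "2023-12-31", "status": "old"}}

new_batch = [
    {"id": 1, "updated_at": "2024-02-01", "status": "updated"},  # changed row
    {"id": 2, "updated_at": "2024-03-01", "status": "new"},      # new row
    {"id": 3, "updated_at": "2023-11-01", "status": "stale"},    # before cursor: skipped
]

fresh = [r for r in new_batch if r["updated_at"] > last_cursor]
for record in fresh:                   # merge = upsert on primary key
    table[record["id"]] = record
last_cursor = max(r["updated_at"] for r in fresh)  # advance the cursor
```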

### 9. Test and Run Pipeline

```bash
python <pipeline_file>.py
```

Check for errors and verify data is loaded correctly.

### 10. Inspect Results

Prerequisite: Ensure `dlt[workspace]` is installed (included by default when using `install_packages.py`).

Open the dlt dashboard to inspect loaded data:

```bash
dlt pipeline <pipeline_name> show
```

Or use the helper script:

```bash
python scripts/open_dashboard.py <pipeline_name>
```

## Pipeline Patterns

### Pattern 1: Verified source — Select specific resources

```python
import dlt
from salesforce import salesforce_source

source = salesforce_source()
pipeline = dlt.pipeline(
    pipeline_name='salesforce_pipeline',
    destination='bigquery',
    dataset_name='salesforce_data'
)

# Load only specific Salesforce objects
pipeline.run(source.with_resources("Account", "Opportunity", "Contact"))
```

### Pattern 2: Declarative REST API - Simple Endpoints

```python
import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {
        "base_url": "https://pokeapi.co/api/v2/",
    },
    "resources": [
        "pokemon",
        {
            "name": "pokemon_details",
            "endpoint": "pokemon/{name}",
            "write_disposition": "merge",
            "primary_key": "id"
        }
    ]
}

pipeline = dlt.pipeline(
    pipeline_name="pokemon",
    destination="duckdb",
    dataset_name="pokemon_data"
)
pipeline.run(rest_api_source(config))
```

### Pattern 3: Custom Python - Using Python Package

```python
import dlt
from simple_salesforce import Salesforce

@dlt.source
def salesforce_custom(username=dlt.secrets.value, password=dlt.secrets.value):
    sf = Salesforce(username=username, password=password)

    @dlt.resource(write_disposition='merge', primary_key='Id')
    def accounts():
        records = sf.query_all("SELECT Id, Name FROM Account")
        yield records['records']

    return accounts

pipeline = dlt.pipeline(
    pipeline_name='salesforce_custom',
    destination='duckdb',
    dataset_name='salesforce'
)
pipeline.run(salesforce_custom())
```

### Pattern 4: Incremental Loading with REST API

```python
import dlt

config = {
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
        "auth": {"token": dlt.secrets["github_token"]}
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    "state": "all",
                    "since": "{incremental.start_value}"
                }
            },
            "incremental": {
                "cursor_path": "updated_at",
                "initial_value": "2024-01-01T00:00:00Z"
            },
            "write_disposition": "merge",
            "primary_key": "id"
        }
    ]
}
```

### Pattern 5: Non-endpoint resources for REST API sources (e.g. database-seeded or file-seeded parameters)

Use non-endpoint resources (e.g. database-seeded or file-seeded parameters) to drive REST API calls from a database, file, or other non-API source. Pre-fetch data outside the dlt pipeline context to avoid `dlt.attach()` / context conflicts. The seed resource must yield a list of dicts so each row drives one API request.

```python
import duckdb
import dlt
from dlt.sources.rest_api import rest_api_source

# 1. Pre-fetch data from database (outside dlt context)
def get_locations():
    conn = duckdb.connect("locations.duckdb", read_only=True)
    result = conn.execute("SELECT id, lat, lng FROM locations").fetchall()
    conn.close()
    return [{"id": r[0], "lat": r[1], "lng": r[2]} for r in result]

# 2. Create seed resource
@dlt.resource(selected=False)
def locations():
    yield get_locations()  # Yield as LIST

# 3. Configure REST API with resolve
config = {
    "client": {"base_url": "https://api.weather.com/"},
    "resources": [
        locations(),
        {
            "name": "weather",
            "endpoint": {
                "path": "forecast",
                "params": {
                    "lat": "{resources.locations.lat}",
                    "lng": "{resources.locations.lng}"
                },
                "data_selector": "$",
                "paginator": "single_page"
            },
            "include_from_parent": ["id"],
            "primary_key": "_locations_id"
        }
    ]
}

source = rest_api_source(config)
pipeline = dlt.pipeline(
    pipeline_name="weather",
    destination="duckdb",
    dataset_name="weather_data"
)
pipeline.run(source)
```

See: [references/rest-api-source.md](references/rest-api-source.md) (Non-REST Endpoint Resources, Query/Path Params, Single-Object Responses, include_from_parent).

## Best Practices (Data Engineering)

  • Secrets: Use `.dlt/secrets.toml`; never hardcode; add to `.gitignore`
  • Primary keys: Set for merge operations and deduplication
  • Write dispositions: `append` (events), `merge` (stateful), `replace` (snapshots)
  • Performance: Yield pages, not rows; use incremental loading when possible

See references/performance-tuning.md, references/incremental-loading.md, and references/troubleshooting.md for more.
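The "yield pages, not rows" advice can be sketched with a plain generator; each yield hands dlt one batch instead of one record. The data and page size here are illustrative.

```python
# Yielding whole pages (lists) instead of single rows hands the
# extract step a batch per yield, cutting per-item overhead.
def fetch_pages(total_rows=10, page_size=4):
    rows = [{"id": i} for i in range(total_rows)]  # stand-in for API data
    for start in range(0, total_rows, page_size):
        yield rows[start:start + page_size]        # one page per yield

pages = list(fetch_pages())
```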

## Common Challenges and Solutions

Auth (OAuth2): In the REST config use `"auth": {"type": "oauth2_client_credentials", ...}`. For custom Python, use `dlt.sources.helpers.rest_client.auth.OAuth2ClientCredentials` with `paginate()`. See references/rest-api-source.md.

Custom pagination / nested data / performance: See references/rest-api-source.md, references/custom-sources.md, and references/performance-tuning.md.
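A minimal sketch of what the OAuth2 client block from the config approach above might look like; the URLs and placeholder credentials are hypothetical, and real secret values would come from `.dlt/secrets.toml` rather than literals:

```python
# Hypothetical "client" section of a rest_api_source config using
# OAuth2 client-credentials auth. Real secrets should come from
# dlt.secrets / .dlt/secrets.toml, not hardcoded strings.
client = {
    "base_url": "https://api.example.com/v1/",
    "auth": {
        "type": "oauth2_client_credentials",
        "access_token_url": "https://auth.example.com/oauth/token",
        "client_id": "<client_id>",
        "client_secret": "<client_secret>",
    },
}
```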

## Reference Documentation — When to Read What

  • Full workflow / step-by-step example: references/examples.md
  • Verified source: references/verified-sources.md
  • Declarative REST API: references/rest-api-source.md
  • Custom Python source: references/custom-sources.md
  • Incremental loading: references/incremental-loading.md
  • Performance: references/performance-tuning.md
  • Errors / debugging: references/troubleshooting.md
  • dlt basics: references/core-concepts.md

## Templates and Scripts

### Templates (assets/templates/)

  • custom_python_pipeline.py - Custom Python pipeline skeleton
  • verified_source_pipeline.py - Verified source pipeline skeleton
  • declarative_rest_pipeline.py - Declarative REST API pipeline skeleton
  • .dlt/config.toml - Configuration file template
  • .dlt/secrets.toml - Secrets file template
  • .gitignore - Git ignore template for dlt projects

### Scripts (scripts/)

  • install_packages.py - Install dlt + destination extras (includes `workspace`). Run when setting up a new project or adding a destination.
  • open_dashboard.py - Open the pipeline dashboard (`dlt pipeline <name> show`). Run after a pipeline run to inspect loaded data.

## Key Reminders

  • Always ask about the destination - don't assume
  • Security first - never commit secrets; use `.dlt/secrets.toml` and provide a `.gitignore`
  • Start simple - use verified sources when available; test incrementally
  • Read references - load detailed docs only when needed