# dlt Pipeline Creator
Choose a pipeline type with the decision tree below, then follow the Core Workflow.

Quick start: 1) Use the decision tree. 2) Follow the Core Workflow. 3) Use patterns and references as needed.
## Pipeline Type Decision Tree
When a user requests a dlt pipeline, determine which type to create:

```
START: User wants to create a dlt pipeline
│
├─→ Is there a dlt verified source available for this platform?
│     (Check: https://dlthub.com/docs/dlt-ecosystem/verified-sources)
│   │
│   YES → Use VERIFIED SOURCE approach
│   │     Examples: Salesforce, GitHub, Stripe, HubSpot, Slack
│   │     Action: Guide user through `dlt init <source> <destination>`
│   │
│   NO → Continue to next question
│
├─→ Is this a REST API with standard patterns?
│     (Standard auth, pagination, JSON responses)
│   │
│   YES → Use DECLARATIVE REST API approach
│   │     Examples: Pokemon API, simple REST APIs with clear endpoints
│   │     Action: Create config-based pipeline with rest_api_source
│   │
│   NO → Continue to next question
│
└─→ Does this require custom logic or Python packages?
    │
    YES → Use CUSTOM PYTHON approach
          Examples: Python packages (simple-salesforce), complex transformations,
                    non-standard APIs, custom data sources
          Action: Create custom source with @dlt.source and @dlt.resource decorators
```
## Core Workflow
### 1. Understand Requirements

Ask clarifying questions:

- Source: What is the data source? (API URL, platform name, database, etc.)
- Source type: Does this match a verified source, a REST API, or does it require custom code?
- Destination: Where should data be loaded? (DuckDB, BigQuery, Snowflake, etc.)
- Resources: What specific data/endpoints are needed?
- Incremental: Should the pipeline load incrementally or do full refreshes?
- Authentication: What credentials are required?
### 2. Choose Pipeline Approach

Based on the decision tree above, select:

- **Verified source** - Pre-built, tested connector
- **Declarative REST API** - Config-based REST API pipeline
- **Custom Python** - Full control with Python code
### 3. Initialize or Create Pipeline

**Verified source**

```bash
dlt init <source_name> <destination_name>
```

Examples:

```bash
dlt init salesforce bigquery
dlt init github duckdb
dlt init stripe snowflake
```

**Declarative REST API or Custom Python**

Use templates from this skill's assets/templates/ (copy into the project if needed):

- `declarative_rest_pipeline.py` - For REST APIs
- `custom_python_pipeline.py` - For custom sources
### 4. Install Required Packages

Recommended: Use the helper script (detects pip/uv/poetry):

```bash
python scripts/install_packages.py --destination <destination_name>
```

Manual: `pip install "dlt[<destination>,workspace]"` (e.g. `bigquery`, `snowflake`). For DuckDB use `dlt[workspace]` only. The `workspace` extra is required for `dlt pipeline <name> show` and the dashboard.

### 5. Configure Credentials
Create or update `.dlt/secrets.toml`. Structure:

```toml
[sources.<source_name>]
# Source credentials here

[destination.<destination_name>]
# Destination credentials here
```
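A hedged, filled-in example for illustration (the platform names and keys below are placeholders, not values prescribed by this skill; check each destination's docs for the exact fields):

```toml
[sources.salesforce]
username = "user@example.com"
password = "<password>"
security_token = "<token>"

[destination.bigquery.credentials]
project_id = "<project_id>"
private_key = "<private_key>"
client_email = "<service_account_email>"
```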
Use the template: [assets/templates/.dlt/secrets.toml](assets/templates/.dlt/secrets.toml)
**Important**: Remind user to add `.dlt/secrets.toml` to `.gitignore`!
**Note for DuckDB**: DuckDB doesn't require credentials in secrets.toml. Just specify the database file path in the pipeline or config.toml.
### 6. Configure Pipeline Settings
Create or update `.dlt/config.toml` for non-sensitive settings:

```toml
[sources.<source_name>]
base_url = "https://api.example.com"
timeout = 30

[destination.<destination_name>]
location = "US"
```

Use the template: assets/templates/.dlt/config.toml
### 7. Implement Pipeline Logic

Flesh out the pipeline code based on requirements:

For verified sources:

- Customize resource selection with `.with_resources()`
- Configure incremental loading with `.apply_hints()`
- See: references/verified-sources.md

For Declarative REST API:

- Define client configuration (base_url, auth)
- Configure resources and endpoints
- Set up pagination and incremental loading
- Resource-level options (e.g. `table_name`, `max_table_nesting`) are set in the resource dict in the config; see references/rest-api-source.md Resource configuration
- See: references/rest-api-source.md

For Custom Python:

- Implement `@dlt.source` and `@dlt.resource` functions
- Use generators and yield patterns
- Configure write dispositions and primary keys
- See: references/custom-sources.md
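When an API's pagination is not auto-detected, the paginator can be set explicitly in the client config. A minimal sketch (the base_url, resource name, and response fields here are hypothetical, not from this skill):

```python
# Explicit offset-based paginator in a rest_api_source config.
# dlt's rest_api source also supports types such as "page_number",
# "cursor", "header_link", "json_link", and "single_page".
config = {
    "client": {
        "base_url": "https://api.example.com/",
        "paginator": {
            "type": "offset",
            "limit": 100,              # page size sent to the API
            "offset_param": "offset",  # query param carrying the offset
            "limit_param": "limit",    # query param carrying the page size
            "total_path": "total",     # JSON path to the total record count
        },
    },
    "resources": ["items"],
}
```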
### 8. Configure Incremental Loading (If Needed)

For pipelines that should load only new/changed data:

- Identify a cursor field (timestamp, ID)
- Set write disposition to `merge`
- Define a primary key for deduplication
- Configure incremental parameters

See: references/incremental-loading.md
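Conceptually, an incremental cursor remembers the highest cursor value seen and emits only records past it on the next run. The sketch below is plain Python (no dlt) purely to illustrate the mechanism; in real pipelines dlt manages this state for you (e.g. via `dlt.sources.incremental` in custom resources):

```python
# Toy model of incremental-loading state: filter records newer than the
# stored cursor, then advance the cursor to the newest value emitted.
def take_new(records, state, cursor_field="updated_at"):
    last = state.get("last_value")
    fresh = [r for r in records if last is None or r[cursor_field] > last]
    if fresh:
        state["last_value"] = max(r[cursor_field] for r in fresh)
    return fresh

state = {}  # dlt persists the equivalent of this between pipeline runs
run1 = take_new(
    [{"id": 1, "updated_at": "2024-01-02"}, {"id": 2, "updated_at": "2024-01-05"}],
    state,
)
run2 = take_new(
    [{"id": 2, "updated_at": "2024-01-05"}, {"id": 3, "updated_at": "2024-01-09"}],
    state,
)
# run1 emits both records; run2 emits only id 3, since id 2 is not past the cursor
```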
### 9. Test and Run Pipeline

```bash
python <pipeline_file>.py
```

Check for errors and verify data is loaded correctly.
### 10. Inspect Results

Prerequisite: Ensure `dlt[workspace]` is installed (included by default when using `install_packages.py`).

Open the dlt dashboard to inspect loaded data:

```bash
dlt pipeline <pipeline_name> show
```

Or use the helper script:

```bash
python scripts/open_dashboard.py <pipeline_name>
```

## Pipeline Patterns
### Pattern 1: Verified source - Select specific resources

```python
import dlt
from salesforce import salesforce_source

source = salesforce_source()

pipeline = dlt.pipeline(
    pipeline_name='salesforce_pipeline',
    destination='bigquery',
    dataset_name='salesforce_data'
)

# Load only specific Salesforce objects
pipeline.run(source.with_resources("Account", "Opportunity", "Contact"))
```

### Pattern 2: Declarative REST API - Simple Endpoints
```python
import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {
        "base_url": "https://pokeapi.co/api/v2/",
    },
    "resources": [
        "pokemon",
        {
            "name": "pokemon_details",
            "endpoint": "pokemon/{name}",
            "write_disposition": "merge",
            "primary_key": "id"
        }
    ]
}

pipeline = dlt.pipeline(
    pipeline_name="pokemon",
    destination="duckdb",
    dataset_name="pokemon_data"
)
pipeline.run(rest_api_source(config))
```

### Pattern 3: Custom Python - Using a Python Package
```python
import dlt
from simple_salesforce import Salesforce

@dlt.source
def salesforce_custom(username=dlt.secrets.value, password=dlt.secrets.value):
    sf = Salesforce(username=username, password=password)

    @dlt.resource(write_disposition='merge', primary_key='Id')
    def accounts():
        records = sf.query_all("SELECT Id, Name FROM Account")
        yield records['records']

    return accounts

pipeline = dlt.pipeline(
    pipeline_name='salesforce_custom',
    destination='duckdb',
    dataset_name='salesforce'
)
pipeline.run(salesforce_custom())
```

### Pattern 4: Incremental Loading with REST API
```python
import dlt

config = {
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
        "auth": {"token": dlt.secrets["github_token"]}
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    "state": "all",
                    "since": "{incremental.start_value}"
                },
                # incremental is part of the endpoint configuration
                "incremental": {
                    "cursor_path": "updated_at",
                    "initial_value": "2024-01-01T00:00:00Z"
                }
            },
            "write_disposition": "merge",
            "primary_key": "id"
        }
    ]
}
```

### Pattern 5: Non-endpoint resources for REST API sources (e.g. database-seeded or file-seeded parameters)
Use non-endpoint resources (e.g. database-seeded or file-seeded parameters) to drive REST API calls from a database, file, or other non-API source. Pre-fetch data outside the dlt pipeline context to avoid `dlt.attach()`/context conflicts. The seed resource must yield a list of dicts so each row drives one API request.

```python
import duckdb
import dlt
from dlt.sources.rest_api import rest_api_source

# 1. Pre-fetch data from the database (outside the dlt context)
def get_locations():
    conn = duckdb.connect("locations.duckdb", read_only=True)
    result = conn.execute("SELECT id, lat, lng FROM locations").fetchall()
    conn.close()
    return [{"id": r[0], "lat": r[1], "lng": r[2]} for r in result]

# 2. Create the seed resource
@dlt.resource(selected=False)
def locations():
    yield get_locations()  # Yield as a LIST

# 3. Configure the REST API source to resolve params from the seed resource
config = {
    "client": {"base_url": "https://api.weather.com/"},
    "resources": [
        locations(),
        {
            "name": "weather",
            "endpoint": {
                "path": "forecast",
                "params": {
                    "lat": "{resources.locations.lat}",
                    "lng": "{resources.locations.lng}"
                },
                "data_selector": "$",
                "paginator": "single_page"
            },
            "include_from_parent": ["id"],
            "primary_key": "_locations_id"
        }
    ]
}

source = rest_api_source(config)
pipeline = dlt.pipeline(
    pipeline_name="weather",
    destination="duckdb",
    dataset_name="weather_data"
)
pipeline.run(source)
```

See: [references/rest-api-source.md](references/rest-api-source.md) (Non-REST Endpoint Resources, Query/Path Params, Single-Object Responses, include_from_parent).

## Best Practices (Data Engineering)
- Secrets: Use `.dlt/secrets.toml`; never hardcode; add to `.gitignore`
- Primary keys: Set for merge operations and deduplication
- Write dispositions: `append` (events), `merge` (stateful), `replace` (snapshots)
- Performance: Yield pages, not rows; use incremental loading when possible

See references/performance-tuning.md, references/incremental-loading.md, and references/troubleshooting.md for more.
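The "yield pages, not rows" practice can be sketched in plain Python; `fetch_page` below is a hypothetical stand-in for an API call, not part of this skill:

```python
# Stand-in for an API call: returns up to 2 fake records per page, then [].
def fetch_page(page_num):
    data = [{"id": i} for i in range(1, 6)]
    return data[(page_num - 1) * 2 : page_num * 2]

def rows_resource():
    # Anti-pattern: one yield per record - more generator overhead per item.
    page = 1
    while chunk := fetch_page(page):
        for record in chunk:
            yield record
        page += 1

def pages_resource():
    # Preferred: one yield per page - dlt processes records in chunks.
    page = 1
    while chunk := fetch_page(page):
        yield chunk
        page += 1
```

Both resources load the same records; the page-wise version simply hands dlt fewer, larger items.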
## Common Challenges and Solutions

Auth (OAuth2): In the REST config use `"auth": {"type": "oauth2_client_credentials", ...}`. For custom Python use `dlt.sources.helpers.rest_client.auth.OAuth2ClientCredentials` with `paginate()`. See references/rest-api-source.md.

Custom pagination / nested data / performance: See references/rest-api-source.md, references/custom-sources.md, references/performance-tuning.md.
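A sketch of what that auth block might look like in a client config. The URLs and placeholder values are illustrative, and exact field names may vary by dlt version; verify against references/rest-api-source.md:

```python
# Hypothetical OAuth2 client-credentials auth block for a rest_api_source
# client config. In practice, pull client_id/client_secret from dlt.secrets
# rather than hardcoding them.
auth_config = {
    "type": "oauth2_client_credentials",
    "access_token_url": "https://auth.example.com/oauth/token",
    "client_id": "<client_id>",
    "client_secret": "<client_secret>",
}

client = {
    "base_url": "https://api.example.com/",
    "auth": auth_config,
}
```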
## Reference Documentation - When to Read What
- Full workflow / step-by-step example → references/examples.md
- Verified source → references/verified-sources.md
- Declarative REST API → references/rest-api-source.md
- Custom Python source → references/custom-sources.md
- Incremental loading → references/incremental-loading.md
- Performance → references/performance-tuning.md
- Errors / debugging → references/troubleshooting.md
- dlt basics → references/core-concepts.md
## Templates and Scripts

### Templates (assets/templates/)
- custom_python_pipeline.py - Custom Python pipeline skeleton
- verified_source_pipeline.py - Verified source pipeline skeleton
- declarative_rest_pipeline.py - Declarative REST API pipeline skeleton
- .dlt/config.toml - Configuration file template
- .dlt/secrets.toml - Secrets file template
- .gitignore - Git ignore template for dlt projects
### Scripts (scripts/)

- install_packages.py - Install dlt + destination extras (includes `workspace`). Run when setting up a new project or adding a destination.
- open_dashboard.py - Open the pipeline dashboard (`dlt pipeline <name> show`). Run after a pipeline run to inspect loaded data.
## Key Reminders

- Always ask about the destination - Don't assume
- Security first - Never commit secrets; use `.dlt/secrets.toml` and provide a `.gitignore`
- Start simple - Use verified sources when available; test incrementally
- Read references - Load detailed docs only when needed