<!-- NOTE(review): the page title/header failed to extract — the scrape captured only "Loading..." placeholders here. Restore the document title from the original source. -->
Compare original and translation side by side
<!-- NOTE(review): the inline-code tokens in these tables were stripped during extraction (they survive only as a concatenated run of identifiers further down). Cells reconstructed from that residue — verify against the original document. Two rows were lost entirely and could not be recovered. -->
| If you're writing... | Check this section/reference |
|---|---|
| Assets (`@dg.asset`) | Assets or `references/assets.md` |
| Resources (`ConfigurableResource`) | Resources or `references/resources.md` |
| Automation (`dg.define_asset_job()`, `dg.ScheduleDefinition`) | Automation or `references/automation.md` |
| Sensors (`@dg.sensor`) | Sensors or `references/automation.md` |
| Partitions (`PartitionsDefinition`) | Partitions or `references/etl-patterns.md` |
| Tests with `dg.materialize()` | Testing or `references/testing.md` |
| dbt Integration | dbt Integration or `references/etl-patterns.md` |

| 如果你正在编写... | 查看此章节/参考文档 |
|---|---|
| 资产(`@dg.asset`) | 资产 或 `references/assets.md` |
| 资源(`ConfigurableResource`) | 资源 或 `references/resources.md` |
| 自动化(`dg.define_asset_job()`、`dg.ScheduleDefinition`) | 自动化 或 `references/automation.md` |
| 传感器(`@dg.sensor`) | 传感器 或 `references/automation.md` |
| 分区(`PartitionsDefinition`) | 分区 或 `references/etl-patterns.md` |
| 使用 `dg.materialize()` 的测试 | 测试 或 `references/testing.md` |
| dbt集成 | dbt集成 或 `references/etl-patterns.md` |
<!-- extraction residue (stray table tokens, kept for reference): @dg.asset ConfigurableResource dg.define_asset_job() dg.ScheduleDefinition @dg.sensor PartitionsDefinition (repeated twice) -->

```python
import dagster as dg

@dg.asset
def my_asset() -> None:
    """Asset description appears in the UI."""
    # Your computation logic here
    pass
```

```python
import dagster as dg

@dg.asset
def my_asset() -> None:
    """资产描述会显示在UI中。"""
    # 此处编写你的计算逻辑
    pass
```

@dg.asset
<!-- NOTE(review): the `@dg.asset` decorator for this first snippet sits fused at the end of the preceding line. -->
```python
def downstream_asset(upstream_asset) -> dict:
    """Depends on upstream_asset by naming it as a parameter."""
    return {"processed": upstream_asset}
```

```python
@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """通过将上游资产作为参数传入来建立依赖。"""
    return {"processed": upstream_asset}
```

@dg.asset(
<!-- NOTE(review): the opening `@dg.asset(` of this first snippet sits fused at the end of the preceding line. -->
```python
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="Cleaned customer data",
)
def customers() -> None:
    pass
```

<!-- extraction residue (stray inline-code tokens): customers daily_revenue load_customers -->

```python
@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="清洗后的客户数据",
)
def customers() -> None:
    pass
```

<!-- extraction residue (stray inline-code tokens): customers daily_revenue load_customers -->

from dagster import ConfigurableResource
<!-- NOTE(review): the `from dagster import ConfigurableResource` import for this first snippet sits fused at the end of the preceding line. -->
```python
class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        # Implementation here
        pass
```

```python
from dagster import ConfigurableResource

class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        # 此处编写实现逻辑
        pass
```

```python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")
```

```python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")
```

```python
dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)
```

```python
dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)
```

import dagster as dg
<!-- NOTE(review): the `import dagster as dg` line for this first snippet sits fused at the end of the preceding line. -->
```python
from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)
```

```python
import dagster as dg
from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # 每日午夜执行
)
```

| Pattern | Meaning |
<!-- NOTE(review): the cron expressions in the first column were stripped during extraction; reconstructed from the row meanings — verify against the original document. -->
|---|---|
| `0 * * * *` | Every hour |
| `0 0 * * *` | Daily at midnight |
| `0 0 * * 1` | Weekly on Monday |
| `0 0 1 * *` | Monthly on the 1st |
| `0 0 5 * *` | Monthly on the 5th |

| 表达式 | 含义 |
|---|---|
| `0 * * * *` | 每小时执行一次 |
| `0 0 * * *` | 每日午夜执行 |
| `0 0 * * 1` | 每周一执行 |
| `0 0 1 * *` | 每月1日执行 |
| `0 0 5 * *` | 每月5日执行 |
@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
# 1. Read cursor (previous state)
previous_state = json.loads(context.cursor) if context.cursor else {}
current_state = {}
runs_to_request = []
# 2. Check for changes
for item in get_items_to_check():
current_state[item.id] = item.modified_at
if item.id not in previous_state or previous_state[item.id] != item.modified_at:
runs_to_request.append(dg.RunRequest(
run_key=f"run_{item.id}_{item.modified_at}",
run_config={...}
))
# 3. Return result with updated cursor
return dg.SensorResult(
run_requests=runs_to_request,
cursor=json.dumps(current_state)
)@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
# 1. 读取游标(之前的状态)
previous_state = json.loads(context.cursor) if context.cursor else {}
current_state = {}
runs_to_request = []
# 2. 检查变更
for item in get_items_to_check():
current_state[item.id] = item.modified_at
if item.id not in previous_state or previous_state[item.id] != item.modified_at:
runs_to_request.append(dg.RunRequest(
run_key=f"run_{item.id}_{item.modified_at}",
run_config={...}
))
# 3. 返回更新游标后的结果
return dg.SensorResult(
run_requests=runs_to_request,
cursor=json.dumps(current_state)
)weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")
@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
partition_key = context.partition_key # e.g., "2023-01-01"
# Process data for this partitionweekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")
@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
partition_key = context.partition_key # 示例:"2023-01-01"
# 处理该分区的数据region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])
@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
region = context.partition_keyregion_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])
@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
region = context.partition_key| Type | Use Case |
<!-- NOTE(review): the partition class names in the first column were stripped during extraction; reconstructed from the use-case descriptions and the Dagster API — verify against the original document. -->
|---|---|
| `DailyPartitionsDefinition` | One partition per day |
| `WeeklyPartitionsDefinition` | One partition per week |
| `MonthlyPartitionsDefinition` | One partition per month |
| `StaticPartitionsDefinition` | Fixed set of partitions |
| `MultiPartitionsDefinition` | Combine multiple partition dimensions |

| 类型 | 使用场景 |
|---|---|
| `DailyPartitionsDefinition` | 按天划分分区 |
| `WeeklyPartitionsDefinition` | 按周划分分区 |
| `MonthlyPartitionsDefinition` | 按月划分分区 |
| `StaticPartitionsDefinition` | 固定集合的分区 |
| `MultiPartitionsDefinition` | 组合多个分区维度 |
def test_my_asset():
result = my_asset()
assert result == expected_valuedef test_my_asset():
result = my_asset()
assert result == expected_valuedef test_asset_graph():
result = dg.materialize(
assets=[asset_a, asset_b],
resources={"database": mock_database},
)
assert result.success
assert result.output_for_node("asset_b") == expecteddef test_asset_graph():
result = dg.materialize(
assets=[asset_a, asset_b],
resources={"database": mock_database},
)
assert result.success
assert result.output_for_node("asset_b") == expectedfrom unittest.mock import Mock
def test_with_mocked_resource():
mocked_resource = Mock()
mocked_resource.query.return_value = [{"id": 1}]
result = dg.materialize(
assets=[my_asset],
resources={"database": mocked_resource},
)
assert result.successfrom unittest.mock import Mock
def test_with_mocked_resource():
mocked_resource = Mock()
mocked_resource.query.return_value = [{"id": 1}]
result = dg.materialize(
assets=[my_asset],
resources={"database": mocked_resource},
)
assert result.success@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
return dg.AssetCheckResult(
passed=len(my_asset) > 0,
metadata={"row_count": len(my_asset)},
)@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
return dg.AssetCheckResult(
passed=len(my_asset) > 0,
metadata={"row_count": len(my_asset)},
)dbt-developmentdbt-developmentfrom dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path
dbt_project_dir = Path(__file__).parent / "dbt_project"
@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path
dbt_project_dir = Path(__file__).parent / "dbt_project"
@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()dg.Definitions(
assets=[my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)dg.Definitions(
assets=[my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)references/assets.mdreferences/assets.mdreferences/resources.mdreferences/resources.mdConfigurableResourceConfigurableResourcereferences/automation.mdreferences/automation.mdreferences/testing.mdreferences/testing.mddg.materialize()dg.materialize()references/etl-patterns.mdreferences/etl-patterns.mdreferences/project-structure.mdreferences/project-structure.mdDefinitionsdgDefinitionsdgmy_project/
├── pyproject.toml
├── src/
│ └── my_project/
│ ├── definitions.py # Main Definitions
│ └── defs/
│ ├── assets/
│ │ ├── __init__.py
│ │ └── my_assets.py
│ ├── jobs.py
│ ├── schedules.py
│ ├── sensors.py
│ └── resources.py
└── tests/
└── test_assets.pymy_project/
├── pyproject.toml
├── src/
│ └── my_project/
│ ├── definitions.py # 主Definitions文件
│ └── defs/
│ ├── assets/
│ │ ├── __init__.py
│ │ └── my_assets.py
│ ├── jobs.py
│ ├── schedules.py
│ ├── sensors.py
│ └── resources.py
└── tests/
    └── test_assets.py
---
---trip_update_job = dg.define_asset_job(
name="trip_update_job",
selection=["taxi_trips", "taxi_zones"],
)trip_update_job = dg.define_asset_job(
name="trip_update_job",
selection=["taxi_trips", "taxi_zones"],
)from dagster import Config
class MyAssetConfig(Config):
filename: str
limit: int = 100
@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
print(f"Processing {config.filename} with limit {config.limit}")from dagster import Config
class MyAssetConfig(Config):
filename: str
limit: int = 100
@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
print(f"Processing {config.filename} with limit {config.limit}")@dg.asset(deps=["external_table"])
def derived_asset() -> None:
"""Depends on external_table which isn't managed by Dagster."""
pass@dg.asset(deps=["external_table"])
def derived_asset() -> None:
"""依赖于未被Dagster管理的external_table。"""
pass| Anti-Pattern | Better Approach |
<!-- NOTE(review): inline-code tokens in these cells were stripped during extraction; reconstructed — verify against the original document. -->
|---|---|
| Hardcoding credentials in assets | Use `ConfigurableResource` with environment variables |
| Giant assets that do everything | Split into focused, composable assets |
| Ignoring asset return types | Use type annotations for clarity |
| Skipping tests for assets | Test assets like regular Python functions |
| Not using partitions for time-series | Use `DailyPartitionsDefinition` or another `PartitionsDefinition` |
| Putting all assets in one file | Organize by domain in separate modules |

| 反模式 | 更佳方案 |
|---|---|
| 在资产中硬编码凭证 | 使用带环境变量的 `ConfigurableResource` |
| 包揽所有功能的巨型资产 | 拆分为专注、可组合的资产 |
| 忽略资产返回类型 | 使用类型注解提升清晰度 |
| 不为资产编写测试 | 像测试普通Python函数一样测试资产 |
| 不为时间序列数据使用分区 | 使用 `DailyPartitionsDefinition` 或其他 `PartitionsDefinition` |
| 将所有资产放在一个文件中 | 按领域划分到不同模块 |
---

<!-- NOTE(review): the closing section was a list of reference-file links whose link text was stripped during extraction ("undefined" residue). Reconstructed from the surviving paths: -->
- `references/assets.md`
- `references/resources.md`
- `references/automation.md`
- `references/testing.md`
- `references/etl-patterns.md`
- `references/project-structure.md`