dagster-development
Original:🇺🇸 English
Translated
Expert guidance for Dagster data orchestration including assets, resources, schedules, sensors, partitions, testing, and ETL patterns. Use when building or extending Dagster projects, writing assets, configuring automation, or integrating with dbt/dlt/Sling.
6installs
Added on
NPX Install
npx skill4agent add c00ldudenoonan/economic-data-project dagster-developmentTags
Translated version includes tags in frontmatterSKILL.md Content
View Translation Comparison →Dagster Development Expert
Quick Reference
| If you're writing... | Check this section/reference |
|---|---|
| Assets or |
| Resources or |
| Automation or |
| Sensors or |
| Partitions or |
Tests with | Testing or |
| |
| |
| dbt Integration or |
| |
Core Concepts
Asset: A persistent object (table, file, model) that your pipeline produces. Define with .
@dg.assetResource: External services/tools (databases, APIs) shared across assets. Define with .
ConfigurableResourceJob: A selection of assets to execute together. Create with .
dg.define_asset_job()Schedule: Time-based automation for jobs. Create with .
dg.ScheduleDefinitionSensor: Event-driven automation that watches for changes. Define with .
@dg.sensorPartition: Logical divisions of data (by date, category). Define with .
PartitionsDefinitionDefinitions: The container for all Dagster objects in a code location.
Assets Quick Reference
Basic Asset
python
import dagster as dg
@dg.asset
def my_asset() -> None:
"""Asset description appears in the UI."""
# Your computation logic here
passAsset with Dependencies
python
@dg.asset
def downstream_asset(upstream_asset) -> dict:
"""Depends on upstream_asset by naming it as a parameter."""
return {"processed": upstream_asset}Asset with Metadata
python
@dg.asset(
group_name="analytics",
key_prefix=["warehouse", "staging"],
description="Cleaned customer data",
)
def customers() -> None:
passNaming: Use nouns describing what is produced (, ), not verbs ().
customersdaily_revenueload_customersResources Quick Reference
Define a Resource
python
from dagster import ConfigurableResource
class DatabaseResource(ConfigurableResource):
connection_string: str
def query(self, sql: str) -> list:
# Implementation here
passUse in Assets
python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
results = database.query("SELECT * FROM table")Register in Definitions
python
dg.Definitions(
assets=[my_asset],
resources={"database": DatabaseResource(connection_string="...")},
)Automation Quick Reference
Schedule
python
import dagster as dg
from my_project.defs.jobs import my_job
my_schedule = dg.ScheduleDefinition(
job=my_job,
cron_schedule="0 0 * * *", # Daily at midnight
)Common Cron Patterns
| Pattern | Meaning |
|---|---|
| Every hour |
| Daily at midnight |
| Weekly on Monday |
| Monthly on the 1st |
| Monthly on the 5th |
Sensors Quick Reference
Basic Sensor Pattern
python
@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
# 1. Read cursor (previous state)
previous_state = json.loads(context.cursor) if context.cursor else {}
current_state = {}
runs_to_request = []
# 2. Check for changes
for item in get_items_to_check():
current_state[item.id] = item.modified_at
if item.id not in previous_state or previous_state[item.id] != item.modified_at:
runs_to_request.append(dg.RunRequest(
run_key=f"run_{item.id}_{item.modified_at}",
run_config={...}
))
# 3. Return result with updated cursor
return dg.SensorResult(
run_requests=runs_to_request,
cursor=json.dumps(current_state)
)Key: Use cursors to track state between sensor evaluations.
Partitions Quick Reference
Time-Based Partition
python
weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")
@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
partition_key = context.partition_key # e.g., "2023-01-01"
# Process data for this partitionStatic Partition
python
region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])
@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
region = context.partition_keyPartition Types
| Type | Use Case |
|---|---|
| One partition per day |
| One partition per week |
| One partition per month |
| Fixed set of partitions |
| Combine multiple partition dimensions |
Testing Quick Reference
Direct Function Testing
python
def test_my_asset():
result = my_asset()
assert result == expected_valueTesting with Materialization
python
def test_asset_graph():
result = dg.materialize(
assets=[asset_a, asset_b],
resources={"database": mock_database},
)
assert result.success
assert result.output_for_node("asset_b") == expectedMocking Resources
python
from unittest.mock import Mock
def test_with_mocked_resource():
mocked_resource = Mock()
mocked_resource.query.return_value = [{"id": 1}]
result = dg.materialize(
assets=[my_asset],
resources={"database": mocked_resource},
)
assert result.successAsset Checks
python
@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
return dg.AssetCheckResult(
passed=len(my_asset) > 0,
metadata={"row_count": len(my_asset)},
)dbt Integration
For dbt integration, use the minimal pattern below. For comprehensive dbt patterns, see the skill.
dbt-developmentBasic dbt Assets
python
from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path
dbt_project_dir = Path(__file__).parent / "dbt_project"
@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()dbt Resource
python
dg.Definitions(
assets=[my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)Full patterns: See Dagster dbt docs
When to Load References
Load references/assets.md
when:
references/assets.md- Defining complex asset dependencies
- Adding metadata, groups, or key prefixes
- Working with asset factories
- Understanding asset materialization patterns
Load references/resources.md
when:
references/resources.md- Creating custom classes
ConfigurableResource - Integrating with databases, APIs, or cloud services
- Understanding resource scoping and lifecycle
Load references/automation.md
when:
references/automation.md- Creating schedules with complex cron patterns
- Building sensors with cursors and state management
- Implementing partitions and backfills
- Automating dbt or other integration runs
Load references/testing.md
when:
references/testing.md- Writing unit tests for assets
- Mocking resources and dependencies
- Using for integration tests
dg.materialize() - Creating asset checks for data validation
Load references/etl-patterns.md
when:
references/etl-patterns.md- Using dlt for embedded ETL
- Using Sling for database replication
- Loading data from files or APIs
- Integrating external ETL tools
Load references/project-structure.md
when:
references/project-structure.md- Setting up a new Dagster project
- Configuring and code locations
Definitions - Using CLI for scaffolding
dg - Organizing large projects with Components
Project Structure
Recommended Layout
my_project/
├── pyproject.toml
├── src/
│ └── my_project/
│ ├── definitions.py # Main Definitions
│ └── defs/
│ ├── assets/
│ │ ├── __init__.py
│ │ └── my_assets.py
│ ├── jobs.py
│ ├── schedules.py
│ ├── sensors.py
│ └── resources.py
└── tests/
└── test_assets.pyDefinitions Pattern (Modern)
python
# src/my_project/definitions.py
from pathlib import Path
from dagster import definitions, load_from_defs_folder
@definitions
def defs():
return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent)Scaffolding with dg CLI
bash
# Create new project
uvx create-dagster my_project
# Scaffold new asset file
dg scaffold defs dagster.asset assets/new_asset.py
# Scaffold schedule
dg scaffold defs dagster.schedule schedules.py
# Scaffold sensor
dg scaffold defs dagster.sensor sensors.py
# Validate definitions
dg check defsCommon Patterns
Job Definition
python
trip_update_job = dg.define_asset_job(
name="trip_update_job",
selection=["taxi_trips", "taxi_zones"],
)Run Configuration
python
from dagster import Config
class MyAssetConfig(Config):
filename: str
limit: int = 100
@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
print(f"Processing {config.filename} with limit {config.limit}")Asset Dependencies with External Sources
python
@dg.asset(deps=["external_table"])
def derived_asset() -> None:
"""Depends on external_table which isn't managed by Dagster."""
passAnti-Patterns to Avoid
| Anti-Pattern | Better Approach |
|---|---|
| Hardcoding credentials in assets | Use |
| Giant assets that do everything | Split into focused, composable assets |
| Ignoring asset return types | Use type annotations for clarity |
| Skipping tests for assets | Test assets like regular Python functions |
| Not using partitions for time-series | Use |
| Putting all assets in one file | Organize by domain in separate modules |
CLI Quick Reference
bash
# Development
dg dev # Start Dagster UI
dg check defs # Validate definitions
# Scaffolding
dg scaffold defs dagster.asset assets/file.py
dg scaffold defs dagster.schedule schedules.py
dg scaffold defs dagster.sensor sensors.py
# Production
dagster job execute -j my_job # Execute a job
dagster asset materialize -a my_asset # Materialize an assetReferences
- Assets: - Detailed asset patterns
references/assets.md - Resources: - Resource configuration
references/resources.md - Automation: - Schedules, sensors, partitions
references/automation.md - Testing: - Testing patterns and asset checks
references/testing.md - ETL Patterns: - dlt, Sling, file/API ingestion
references/etl-patterns.md - Project Structure: - Definitions, Components
references/project-structure.md - Official Docs: https://docs.dagster.io
- API Reference: https://docs.dagster.io/api/dagster