databricks-pipelines


Lakeflow Spark Declarative Pipelines Development


FIRST: Use the parent `databricks-core` skill for CLI basics, authentication, profile selection, and data discovery commands.

Decision Tree


Use this tree to determine which dataset type and features to use. Multiple features can apply to the same dataset — e.g., a Streaming Table can use Auto Loader for ingestion, Append Flows for fan-in, and Expectations for data quality. Choose the dataset type first, then layer on applicable features.
User request → What kind of output?
├── Intermediate/reusable logic (not persisted) → Temporary View
│   ├── Preprocessing/filtering before Auto CDC → Temporary View feeding CDC flow
│   ├── Shared intermediate streaming logic reused by multiple downstream tables
│   ├── Pipeline-private helper logic (not published to catalog)
│   └── Published to UC for external queries → Persistent View (SQL only)
├── Persisted dataset
│   ├── Source is streaming/incremental/continuously growing → Streaming Table
│   │   ├── File ingestion (cloud storage, Volumes) → Auto Loader
│   │   ├── Message bus (Kafka, Kinesis, Pub/Sub, Pulsar, Event Hubs) → streaming source read
│   │   ├── Existing streaming/Delta table → streaming read from table
│   │   ├── CDC / upserts / track changes / keep latest per key / SCD Type 1 or 2 → Auto CDC
│   │   ├── Multiple sources into one table → Append Flows (NOT union)
│   │   ├── Historical backfill + live stream → one-time Append Flow + regular flow
│   │   └── Windowed aggregation with watermark → stateful streaming
│   └── Source is batch/historical/full scan → Materialized View
│       ├── Aggregation/join across full dataset (GROUP BY, SUM, COUNT, etc.)
│       ├── Gold layer aggregation from streaming table → MV with batch read (spark.read / no STREAM)
│       ├── JDBC/Federation/external batch sources
│       └── Small static file load (reference data, no streaming read)
├── Output to external system (Python only) → Sink
│   ├── Existing external table not managed by this pipeline → Sink with format="delta"
│   │   (prefer fully-qualified dataset names if the pipeline should own the table — see Publishing Modes)
│   ├── Kafka / Event Hubs → Sink with format="kafka" + @dp.append_flow(target="sink_name")
│   ├── Custom destination not natively supported → Sink with custom format
│   ├── Custom merge/upsert logic per batch → ForEachBatch Sink (Public Preview)
│   └── Multiple destinations per batch → ForEachBatch Sink (Public Preview)
└── Data quality constraints → Expectations (on any dataset type)
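As a quick illustration of the most common path through this tree (file ingestion → Streaming Table via Auto Loader, then Gold aggregation → Materialized View with a batch read), here is a minimal Python sketch. Table names, the Volume path, and column names are placeholders, and `spark` is assumed to be provided by the pipeline runtime; read the linked reference skills before writing real code.

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Bronze: Streaming Table ingesting files incrementally with Auto Loader.
@dp.table()
def orders_bronze():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/main/raw/orders/")  # placeholder path
    )

# Gold: Materialized View aggregating over the full table -- note the
# batch read (spark.read.table, no readStream), per the decision tree.
@dp.materialized_view()
def orders_daily_gold():
    return (
        spark.read.table("orders_bronze")
        .groupBy("order_date")
        .agg(F.sum("amount").alias("total_amount"))
    )
```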

Common Traps


  • "Create a table" without specifying type → ask whether the source is streaming or batch
  • Materialized View from streaming source is an error → use a Streaming Table instead, or switch to a batch read
  • Streaming Table from batch source is an error → use a Materialized View instead, or switch to a streaming read
  • Aggregation over streaming table → use a Materialized View with batch read (`spark.read.table` / `SELECT FROM` without `STREAM`), NOT a Streaming Table. This is the correct pattern for Gold layer aggregation.
  • Aggregation over batch/historical data → use a Materialized View, not a Streaming Table. MVs recompute or incrementally refresh aggregates to stay correct; STs are append-only and don't recompute when source data changes.
  • Preprocessing before Auto CDC → use a Temporary View to filter/transform the source before feeding into the CDC flow. SQL: the CDC flow reads from the view via `STREAM(view_name)`. Python: use `spark.readStream.table("view_name")`.
  • Intermediate logic → default to a Temporary View, even when the logic is reused by multiple downstream tables. Only consider a Private MV/ST (`private=True` / `CREATE PRIVATE ...`) when the computation is expensive and materializing once would save significant reprocessing.
  • View vs Temporary View → Persistent Views publish to Unity Catalog (SQL only); Temporary Views are pipeline-private.
  • Union of streams → use multiple Append Flows. Do NOT present UNION as an alternative; it is an anti-pattern for streaming sources.
  • Changing dataset type → cannot change ST→MV or MV→ST without manually dropping the existing table first. Full refresh does NOT help. Rename the new dataset instead.
  • SQL `OR REFRESH` → prefer `CREATE OR REFRESH` over bare `CREATE` for SQL dataset definitions. Both work identically, but `OR REFRESH` is the idiomatic convention. For PRIVATE datasets: `CREATE OR REFRESH PRIVATE STREAMING TABLE` / `CREATE OR REFRESH PRIVATE MATERIALIZED VIEW`.
  • Kafka/Event Hubs sink serialization → the `value` column is mandatory. Use `to_json(struct(*)) AS value` to serialize the entire row as JSON. Read the sink skill for details.
  • Multi-column sequencing in Auto CDC → SQL: `SEQUENCE BY STRUCT(col1, col2)`. Python: `sequence_by=struct("col1", "col2")`. Read the auto-cdc skill for details.
  • Auto CDC supports TRUNCATE (SCD Type 1 only) → SQL: `APPLY AS TRUNCATE WHEN condition`. Python: `apply_as_truncates=expr("condition")`. Do NOT say truncate is unsupported.
  • Python-only features → Sinks, ForEachBatch Sinks, CDC from snapshots, and custom data sources are Python-only. When the user is working in SQL, explicitly clarify this and suggest switching to Python.
  • MV incremental refresh → Materialized Views on serverless pipelines support automatic incremental refresh for aggregations. Mention the serverless requirement when discussing incremental refresh.
  • Recommend ONE clear approach → present a single recommended approach. Do NOT present anti-patterns or significantly inferior alternatives; it confuses users. Only mention alternatives if they are genuinely viable for different trade-offs.
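The fan-in trap above (multiple sources into one table) can be sketched as follows: instead of unioning streams, declare the target table once and attach one append flow per source. The table and source names are illustrative; see the streaming-table skill for the full set of options.

```python
from pyspark import pipelines as dp

# Declare the target streaming table once, with no query attached.
dp.create_streaming_table("events_all")

# One append flow per source -- the supported fan-in pattern.
# Do NOT union the streams inside a single query.
@dp.append_flow(target="events_all")
def events_from_web():
    return spark.readStream.table("events_web")

@dp.append_flow(target="events_all")
def events_from_mobile():
    return spark.readStream.table("events_mobile")
```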

Publishing Modes


Pipelines use a default catalog and schema configured in the pipeline settings. All datasets are published there unless overridden.
  • Fully-qualified names: use `catalog.schema.table` in the dataset name to write to a different catalog/schema than the pipeline default. The pipeline creates the dataset there directly; no Sink needed.
  • USE CATALOG / USE SCHEMA: SQL commands that change the current catalog/schema for all subsequent definitions in the same file.
  • LIVE prefix: deprecated. Ignored in the default publishing mode.
  • When reading or defining datasets within the pipeline, use the dataset name only; do NOT use fully-qualified names unless the pipeline already does so or the user explicitly requests a different target catalog/schema.
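A minimal sketch of overriding the default publishing location from Python, assuming the `name` parameter accepts a fully-qualified identifier as described above (the catalog and schema names are placeholders):

```python
from pyspark import pipelines as dp

# Published to the pipeline's default catalog/schema:
@dp.materialized_view()
def sales_summary():
    return spark.read.table("orders")

# Published to an explicit catalog/schema, overriding the default.
# The pipeline still owns this dataset -- no Sink required.
@dp.materialized_view(name="analytics.reporting.sales_summary")
def sales_summary_published():
    return spark.read.table("orders")
```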

Comprehensive API Reference


MANDATORY: Before implementing, editing, or suggesting any code for a feature, you MUST read the linked reference file for that feature. NO exceptions — always look up the reference before writing code.
Some features require reading multiple skills together:
  • Auto Loader → also read the streaming-table skill (Auto Loader produces a streaming DataFrame, so the target is a streaming table) and look up format-specific options for the file format being loaded
  • Auto CDC → also read the streaming-table skill (Auto CDC always targets a streaming table)
  • Sinks → also read the streaming-table skill (sinks use streaming append flows)
  • Expectations → also read the corresponding dataset definition skill to ensure constraints are correctly placed

Dataset Definition APIs


| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|---|
| Streaming Table | `@dp.table()` returning streaming DF | `@dlt.table()` returning streaming DF | `CREATE OR REFRESH STREAMING TABLE` | `CREATE STREAMING LIVE TABLE` | streaming-table-python | streaming-table-sql |
| Materialized View | `@dp.materialized_view()` | `@dlt.table()` returning batch DF | `CREATE OR REFRESH MATERIALIZED VIEW` | `CREATE LIVE TABLE` (batch) | materialized-view-python | materialized-view-sql |
| Temporary View | `@dp.temporary_view()` | `@dlt.view()`, `@dp.view()` | `CREATE TEMPORARY VIEW` | `CREATE TEMPORARY LIVE VIEW` | temporary-view-python | temporary-view-sql |
| Persistent View (UC) | N/A — SQL only | | `CREATE VIEW` | | | view-sql |
| Streaming Table (explicit) | `dp.create_streaming_table()` | `dlt.create_streaming_table()` | `CREATE OR REFRESH STREAMING TABLE` (no AS) | | streaming-table-python | streaming-table-sql |

Flow and Sink APIs


| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|---|
| Append Flow | `@dp.append_flow()` | `@dlt.append_flow()` | `CREATE FLOW ... INSERT INTO` | | streaming-table-python | streaming-table-sql |
| Backfill Flow | `@dp.append_flow(once=True)` | `@dlt.append_flow(once=True)` | `CREATE FLOW ... INSERT INTO ... ONCE` | | streaming-table-python | streaming-table-sql |
| Sink (Delta/Kafka/EH/custom) | `dp.create_sink()` | `dlt.create_sink()` | N/A — Python only | | sink-python | |
| ForEachBatch Sink | `@dp.foreach_batch_sink()` | | N/A — Python only | | foreach-batch-sink-python | |
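A hedged sketch of the Sink pattern from the table above, wiring a streaming table to a Kafka sink via an append flow. Broker, topic, and table names are placeholders, and the exact option keys should be confirmed in the sink-python skill before use:

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Declare the external sink (Python only).
dp.create_sink(
    name="orders_kafka_sink",
    format="kafka",
    options={
        "kafka.bootstrap.servers": "broker:9092",  # placeholder
        "topic": "orders_out",                     # placeholder
    },
)

# Feed the sink with an append flow. Kafka requires a 'value' column;
# here the entire row is serialized as JSON.
@dp.append_flow(target="orders_kafka_sink")
def orders_to_kafka():
    return (
        spark.readStream.table("orders_silver")
        .select(F.to_json(F.struct("*")).alias("value"))
    )
```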

CDC APIs


| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|---|
| Auto CDC (streaming source) | `dp.create_auto_cdc_flow()` | `dlt.apply_changes()`, `dp.apply_changes()` | `AUTO CDC INTO ... FROM STREAM` | `APPLY CHANGES INTO ... FROM STREAM` | auto-cdc-python | auto-cdc-sql |
| Auto CDC (periodic snapshot) | `dp.create_auto_cdc_from_snapshot_flow()` | `dlt.apply_changes_from_snapshot()` | N/A — Python only | | auto-cdc-python | |
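A rough sketch of `dp.create_auto_cdc_flow()` for an SCD Type 1 flow, assuming its parameters mirror the older `dlt.apply_changes()` (keys, sequence column, delete predicate). Source, key, and column names are placeholders; verify the exact signature in the auto-cdc-python skill before writing real code:

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

# The target streaming table must exist before the CDC flow targets it.
dp.create_streaming_table("customers")

# SCD Type 1: keep the latest row per key, ordered by sequence_num.
dp.create_auto_cdc_flow(
    target="customers",
    source="customers_cdc_feed",        # placeholder CDC feed
    keys=["customer_id"],
    sequence_by=col("sequence_num"),
    apply_as_deletes=expr("op = 'DELETE'"),
    stored_as_scd_type=1,
)
```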

Data Quality APIs


| Feature | Python (current) | Python (deprecated) | SQL (current) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|
| Expect (warn) | `@dp.expect()` | `@dlt.expect()` | `CONSTRAINT ... EXPECT (...)` | expectations-python | expectations-sql |
| Expect or drop | `@dp.expect_or_drop()` | `@dlt.expect_or_drop()` | `CONSTRAINT ... EXPECT (...) ON VIOLATION DROP ROW` | expectations-python | expectations-sql |
| Expect or fail | `@dp.expect_or_fail()` | `@dlt.expect_or_fail()` | `CONSTRAINT ... EXPECT (...) ON VIOLATION FAIL UPDATE` | expectations-python | expectations-sql |
| Expect all (warn) | `@dp.expect_all({})` | `@dlt.expect_all({})` | Multiple `CONSTRAINT` clauses | expectations-python | expectations-sql |
| Expect all or drop | `@dp.expect_all_or_drop({})` | `@dlt.expect_all_or_drop({})` | Multiple constraints with `DROP ROW` | expectations-python | expectations-sql |
| Expect all or fail | `@dp.expect_all_or_fail({})` | `@dlt.expect_all_or_fail({})` | Multiple constraints with `FAIL UPDATE` | expectations-python | expectations-sql |
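A short sketch combining the three violation policies from the table above on one dataset. Constraint names and expressions are illustrative; see the expectations-python skill for placement rules:

```python
from pyspark import pipelines as dp

@dp.table()
# Warn only: violations are recorded in pipeline metrics.
@dp.expect("valid_timestamp", "event_ts IS NOT NULL")
# Drop offending rows.
@dp.expect_or_drop("valid_id", "user_id IS NOT NULL")
# Fail the update on any violation.
@dp.expect_all_or_fail({
    "positive_amount": "amount > 0",
    "known_currency": "currency IN ('USD', 'EUR')",
})
def payments_silver():
    return spark.readStream.table("payments_bronze")
```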

Reading Data APIs


| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|---|
| Batch read (pipeline dataset) | `spark.read.table("name")` | `dp.read("name")`, `dlt.read("name")` | `SELECT ... FROM name` | `SELECT ... FROM LIVE.name` | | |
| Streaming read (pipeline dataset) | `spark.readStream.table("name")` | `dp.read_stream("name")`, `dlt.read_stream("name")` | `SELECT ... FROM STREAM name` | `SELECT ... FROM STREAM LIVE.name` | | |
| Auto Loader (cloud files) | `spark.readStream.format("cloudFiles")` | | `STREAM read_files(...)` | | auto-loader-python | auto-loader-sql |
| Kafka source | `spark.readStream.format("kafka")` | | `STREAM read_kafka(...)` | | | |
| Kinesis source | `spark.readStream.format("kinesis")` | | `STREAM read_kinesis(...)` | | | |
| Pub/Sub source | `spark.readStream.format("pubsub")` | | `STREAM read_pubsub(...)` | | | |
| Pulsar source | `spark.readStream.format("pulsar")` | | `STREAM read_pulsar(...)` | | | |
| Event Hubs source | `spark.readStream.format("kafka")` + EH config | | `STREAM read_kafka(...)` + EH config | | | |
| JDBC / Lakehouse Federation | `spark.read.format("postgresql")` etc. | | Direct table ref via federation catalog | | | |
| Custom data source | `spark.read[Stream].format("custom")` | | N/A — Python only | | | |
| Static file read (batch) | `spark.read.format("json"\|"csv"\|...).load()` | | `read_files(...)` (no STREAM) | | | |
| Skip upstream change commits | `.option("skipChangeCommits", "true")` | | `read_stream("name", skipChangeCommits => true)` | | streaming-table-python | streaming-table-sql |
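As an example of a message-bus read from the table above, a Streaming Table fed from Kafka might look like the following sketch. The broker address and topic are placeholders; consult the streaming-table skill and your connector's documentation for the full option set:

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.table()
def clicks_bronze():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
        .option("subscribe", "clicks")                     # placeholder topic
        .load()
        # Kafka delivers key/value as binary; cast before parsing.
        .select(F.col("value").cast("string").alias("raw_json"))
    )
```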

Table/Schema Feature APIs


| Feature | Python (current) | SQL (current) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|
| Liquid clustering | `cluster_by=[...]` | `CLUSTER BY (col1, col2)` | materialized-view-python | materialized-view-sql |
| Auto liquid clustering | `cluster_by_auto=True` | `CLUSTER BY AUTO` | materialized-view-python | materialized-view-sql |
| Partition columns | `partition_cols=[...]` | `PARTITIONED BY (col1, col2)` | materialized-view-python | materialized-view-sql |
| Table properties | `table_properties={...}` | `TBLPROPERTIES (...)` | materialized-view-python | materialized-view-sql |
| Explicit schema | `schema="col1 TYPE, ..."` | `(col1 TYPE, ...) AS` | materialized-view-python | materialized-view-sql |
| Generated columns | `schema="..., col TYPE GENERATED ALWAYS AS (expr)"` | `col TYPE GENERATED ALWAYS AS (expr)` | materialized-view-python | materialized-view-sql |
| Row filter (Public Preview) | `row_filter="ROW FILTER fn ON (col)"` | `WITH ROW FILTER fn ON (col)` | materialized-view-python | materialized-view-sql |
| Column mask (Public Preview) | `schema="..., col TYPE MASK fn USING COLUMNS (col2)"` | `col TYPE MASK fn USING COLUMNS (col2)` | materialized-view-python | materialized-view-sql |
| Private dataset | `private=True` | `CREATE PRIVATE ...` | materialized-view-python | materialized-view-sql |
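A sketch combining several of the table-level features above (liquid clustering, table properties, an explicit schema with a generated column) on one dataset. Column names and property values are placeholders; check the materialized-view-python skill for exact parameter semantics:

```python
from pyspark import pipelines as dp

@dp.table(
    cluster_by=["event_date", "region"],     # liquid clustering columns
    table_properties={"quality": "silver"},  # arbitrary key/value tags
    schema="""
        event_id STRING,
        event_date DATE,
        region STRING,
        amount DOUBLE,
        amount_cents BIGINT GENERATED ALWAYS AS (CAST(amount * 100 AS BIGINT))
    """,
)
def events_silver():
    return spark.readStream.table("events_bronze")
```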

Import / Module APIs


| Current | Deprecated | Notes |
|---|---|---|
| `from pyspark import pipelines as dp` | `import dlt` | Both work. Prefer `dp`. Do NOT change existing `dlt` imports. |
| `spark.read.table()` / `spark.readStream.table()` | `dp.read()` / `dp.read_stream()` / `dlt.read()` / `dlt.read_stream()` | Deprecated reads still work. Prefer `spark.*`. |
| | `LIVE.` prefix | Fully deprecated. NEVER use. Causes errors in newer pipelines. |
| | `CREATE LIVE TABLE` / `CREATE LIVE VIEW` | Fully deprecated. Use `CREATE STREAMING TABLE` / `CREATE MATERIALIZED VIEW` / `CREATE TEMPORARY VIEW`. |

Language-specific guides


Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables / DLT) is a framework for building batch and streaming data pipelines.

Scaffolding a New Pipeline Project


Use `databricks bundle init` with a config file to scaffold non-interactively. This creates a project in the `<project_name>/` directory:

```bash
databricks bundle init lakeflow-pipelines --config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') --profile <PROFILE> < /dev/null
```

  • `project_name`: letters, numbers, underscores only
  • `language`: `python` or `sql`. Ask the user which they prefer:
    • SQL: recommended for straightforward transformations (filters, joins, aggregations)
    • Python: recommended for complex logic (custom UDFs, ML, advanced processing)

After scaffolding, create `CLAUDE.md` and `AGENTS.md` in the project directory. These files are essential to provide agents with guidance on how to work with the project. Use this content:

Declarative Automation Bundles Project


This project uses Declarative Automation Bundles (formerly Databricks Asset Bundles) for deployment.

Prerequisites


Install the Databricks CLI (>= v0.288.0) if not already installed:
  • macOS: `brew tap databricks/tap && brew install databricks`
  • Linux: `curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh`
  • Windows: `winget install Databricks.DatabricksCLI`

Verify: `databricks -v`

For AI Agents


Read the `databricks-core` skill for CLI basics, authentication, and deployment workflow. Read the `databricks-pipelines` skill for pipeline-specific guidance.

If skills are not available, install them: `databricks experimental aitools skills install`

Pipeline Structure


  • Follow the medallion architecture pattern (Bronze → Silver → Gold) unless the user specifies otherwise
  • Use the convention of 1 dataset per file, named after the dataset
  • Place transformation files in a `src/` or `transformations/` folder
my-pipeline-project/
├── databricks.yml                        # Bundle configuration
├── resources/
│   ├── my_pipeline.pipeline.yml          # Pipeline definition
│   └── my_pipeline_job.job.yml           # Scheduling job (optional)
└── src/
    ├── my_table.py (or .sql)             # One dataset per file
    ├── another_table.py (or .sql)
    └── ...

Scheduling Pipelines


To schedule a pipeline, add a job that triggers it in `resources/<name>.job.yml`:

```yaml
resources:
  jobs:
    my_pipeline_job:
      trigger:
        periodic:
          interval: 1
          unit: DAYS
      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.my_pipeline.id}
```

Running Pipelines


You must deploy before running. In local development, code changes only take effect after `databricks bundle deploy`. Always deploy before any run, dry run, or selective refresh.
  • Selective refresh is preferred when you only need to run one table. For selective refresh it is important that dependencies are already materialized.
  • Full refresh is the most expensive and dangerous option and can lead to data loss, so it should be used only when really necessary. Always present it as a follow-up option that the user must explicitly select.

Development Workflow


  1. Validate: `databricks bundle validate --profile <profile>`
  2. Deploy: `databricks bundle deploy -t dev --profile <profile>`
  3. Run pipeline: `databricks bundle run <pipeline_name> -t dev --profile <profile>`
  4. Check status: `databricks pipelines get --pipeline-id <id> --profile <profile>`

Pipeline API Reference


Detailed reference guides for each pipeline API. Read the relevant guide before writing pipeline code.
  • Write Spark Declarative Pipelines — Core syntax and rules (Python, SQL)
  • Streaming Tables — Continuous data stream processing (Python, SQL)
  • Materialized Views — Physically stored query results with incremental refresh (Python, SQL)
  • Views — Reusable query logic published to Unity Catalog (SQL)
  • Temporary Views — Pipeline-private views (Python, SQL)
  • Auto Loader — Incrementally ingest files from cloud storage (Python, SQL)
  • Auto CDC — Process Change Data Capture feeds, SCD Type 1 & 2 (Python, SQL)
  • Expectations — Define and enforce data quality constraints (Python, SQL)
  • Sinks — Write to Kafka, Event Hubs, external Delta tables (Python)
  • ForEachBatch Sinks — Custom streaming sink with per-batch Python logic (Python)