spark-declarative-pipelines


Lakeflow Spark Declarative Pipelines (SDP)


IMPORTANT: If this is a new pipeline (one does not already exist), see Quick Start. Be sure to use only the language the user has specified (Python or SQL). Be sure to use Databricks Asset Bundles for new projects.


Critical Rules (always follow)


  • MUST confirm language as Python or SQL. Stick with that language unless told otherwise.
  • MUST use the Quick Start below if not modifying an existing pipeline.
  • MUST create serverless pipelines by default. Only use classic clusters if the user explicitly requires R language, Spark RDD APIs, or JAR libraries.

Required Steps


Copy this checklist and verify each item:
- [ ] Language selected: Python or SQL
- [ ] Compute type decided: serverless or classic compute
- [ ] Decide on multiple catalogs or schemas vs. all in one default schema
- [ ] Consider what should be parameterized at the pipeline level to make deployment easy.
- [ ] Consider [Multi-Schema Patterns](#multi-schema-patterns) below, ask if unclear on best choices.
- [ ] Consider [Modern Defaults](#modern-defaults) below, ask if unclear on best choices.

Quick Start: Initialize New Pipeline Project


RECOMMENDED: Use `databricks pipelines init` to create production-ready Asset Bundle projects with multi-environment support.

When to Use Bundle Initialization


Use bundle initialization for new pipeline projects to get a professional structure from the start.
Use the manual workflow for:
  • Quick prototyping without multi-environment needs
  • Existing manual projects you want to continue
  • Learning/experimentation

Step 1: Initialize Project


I will automatically run this command when you request a new pipeline:

```bash
databricks pipelines init
```

Interactive Prompts:
  • Project name: e.g., `customer_orders_pipeline`
  • Initial catalog: Unity Catalog name (e.g., `main`, `prod_catalog`)
  • Personal schema per user?: `yes` for dev (each user gets their own schema), `no` for prod
  • Language: SQL or Python (auto-detected from your request - see language detection below)

Generated Structure:

```
my_pipeline/
├── databricks.yml              # Multi-environment config (dev/prod)
├── resources/
│   └── *_etl.pipeline.yml      # Pipeline resource definition
└── src/
    └── *_etl/
        ├── explorations/       # Exploratory code in .ipynb
        └── transformations/    # Your .sql or .py files here
```

Step 2: Customize Transformations

步骤2:自定义转换逻辑

Replace the example code created by the init process with custom transformation files in `src/transformations/`, based on the provided requirements and the best-practice guidance from this skill.
For Python pipelines using cloudFiles: Ask the user where to store Auto Loader schema metadata. Recommend:
`/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas`

Step 3: Deploy and Run


```bash
# Deploy to workspace (dev by default)
databricks bundle deploy

# Run pipeline
databricks bundle run my_pipeline_etl

# Deploy to production
databricks bundle deploy --target prod
```

Quick Reference


| Concept | Details |
| --- | --- |
| Names | SDP = Spark Declarative Pipelines = LDP = Lakeflow Declarative Pipelines = Lakeflow Pipelines (all interchangeable) |
| Python Import | `from pyspark import pipelines as dp` |
| Primary Decorators | `@dp.table()`, `@dp.materialized_view()`, `@dp.temporary_view()` |
| Temporary Views | `@dp.temporary_view()` creates in-pipeline temporary views (no catalog/schema, no cluster_by). Useful for intermediate logic before AUTO CDC or when a view needs multiple references without persistence. |
| Replaces | Delta Live Tables (DLT) with `import dlt` |
| Based On | Apache Spark 4.1+ (Databricks' modern data pipeline framework) |
| Docs | https://docs.databricks.com/aws/en/ldp/developer/python-dev |


Detailed guides


Ingestion patterns: Use 1-ingestion-patterns.md when planning how to get new data into your Lakeflow pipeline —- covers file formats, batch/streaming options, and tips for incremental and full loads. (Keywords: Auto Loader, Kafka, Event Hub, Kinesis, file formats)
Streaming pipeline patterns: See 2-streaming-patterns.md for designing pipelines with streaming data sources, change data detection, triggers, and windowing. (Keywords: deduplication, windowing, stateful operations, joins)
SCD query patterns: See 3-scd-query-patterns.md for querying Slowly Changing Dimensions Type 2 history tables, including current state queries, point-in-time analysis, temporal joins, and change tracking. (Keywords: SCD Type 2 history tables, temporal joins, querying historical data)
Performance tuning: Use 4-performance-tuning.md for optimizing pipelines with Liquid Clustering, state management, and best practices for high-performance streaming workloads. (Keywords: Liquid Clustering, optimization, state management)
Python API reference: See 5-python-api.md for the modern `pyspark.pipelines` (dp) API reference and migration from legacy `dlt` API patterns. (Keywords: dp API, dlt API comparison)
DLT migration: Use 6-dlt-migration.md when migrating existing Delta Live Tables (DLT) pipelines to Spark Declarative Pipelines (SDP). (Keywords: migrating DLT pipelines to SDP)
Advanced configuration: See 7-advanced-configuration.md for advanced pipeline settings including development mode, continuous execution, notifications, Python dependencies, and custom cluster configurations. (Keywords: extra_settings parameter reference, examples)
Project initialization: Use 8-project-initialization.md for setting up new pipeline projects with `databricks pipelines init`, Asset Bundles, multi-environment deployments, and language detection logic. (Keywords: databricks pipelines init, Asset Bundles, language detection, migration guides)
AUTO CDC patterns: Use 9-auto_cdc.md for implementing Change Data Capture with AUTO CDC, including Slowly Changing Dimensions (SCD Type 1 and Type 2) for tracking changes and deduplication. (Keywords: AUTO CDC, Slowly Changing Dimension, SCD, SCD Type 1, SCD Type 2, change data capture, deduplication)


Workflow


  1. Determine the task type:
    • Setting up new project? → Read 8-project-initialization.md first
    • Creating new pipeline? → Read 1-ingestion-patterns.md
    • Creating stream table? → Read 2-streaming-patterns.md
    • Querying SCD history tables? → Read 3-scd-query-patterns.md
    • Implementing AUTO CDC or SCD? → Read 9-auto_cdc.md
    • Performance issues? → Read 4-performance-tuning.md
    • Using Python API? → Read 5-python-api.md
    • Migrating from DLT? → Read 6-dlt-migration.md
    • Advanced configuration? → Read 7-advanced-configuration.md
    • Validating? → Read validation-checklist.md
  2. Follow the instructions in the relevant guide
  3. Repeat for next task type


Official Documentation


Medallion Architecture Pattern


Bronze Layer (Raw)
  • Raw data ingested from sources in original format
  • Minimal transformations (append-only; add metadata like `_ingested_at`, `_source_file`)
  • Single source of truth preserving data lineage
Silver Layer (Validated)
  • Cleaned and validated data
  • Deduplication with auto_cdc is possible here, but when feasible it is often deferred to the final step
  • Business logic applied (type casting, quality checks, filtering invalid records)
  • Enterprise view of key business entities
  • Enables self-service analytics and ML
Gold Layer (Business-Ready)
  • Aggregated, denormalized, project-specific tables
  • Optimized for consumption (reporting, dashboards, BI tools)
  • Fewer joins, read-optimized data models
  • Kimball star schema tables - dim_<entity_name>, fact_<entity_name>
  • Deduplication often happens here via Slowly Changing Dimensions (SCD), using auto_cdc. Sometimes it happens upstream in silver instead, such as when joining multiple tables or when business users plan to query the silver table directly.
Typical Flow (can vary):
  • Bronze: read_files() or spark.readStream.format("cloudFiles") → streaming table
  • Silver: read bronze → filter/clean/validate → streaming table
  • Gold: read silver → aggregate/denormalize → auto_cdc or materialized view
Source layout:
For medallion architecture (bronze/silver/gold), two approaches work:
  • Flat with naming (template default): `bronze_*.sql`, `silver_*.sql`, `gold_*.sql`
  • Subdirectories: `bronze/orders.sql`, `silver/cleaned.sql`, `gold/summary.sql`
Both work with the `transformations/**` glob pattern. Choose based on preference.
See 8-project-initialization.md for complete details on bundle initialization, migration, and troubleshooting.
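The claim that both layouts satisfy the same glob can be checked locally with nothing but the Python standard library (the file names below are invented for the demo):

```python
import glob
import os
import tempfile

# Sketch: both the flat-with-prefix layout and the subdirectory layout are
# picked up by a recursive transformations/** glob, as the bundle template uses.
with tempfile.TemporaryDirectory() as root:
    for rel in (
        "transformations/bronze_orders.sql",   # flat with naming prefix
        "transformations/silver/cleaned.sql",  # subdirectory layout
    ):
        path = os.path.join(root, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()
    matches = glob.glob(os.path.join(root, "transformations", "**"), recursive=True)
    found = sorted(os.path.relpath(p, root) for p in matches if os.path.isfile(p))

print(found)
```

Both files appear in `found`, confirming one glob covers either convention.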


General SDP development guidance


Step 1: Write Pipeline Files Locally


Create `.sql` or `.py` files in a local folder:

```
my_pipeline/
├── bronze/
│   ├── ingest_orders.sql       # SQL (default for most cases)
│   └── ingest_events.py        # Python (for complex logic)
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql
```

SQL Example (`bronze/ingest_orders.sql`):

```sql
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
```

Python Example (`bronze/ingest_events.py`):

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

# Get schema location from pipeline configuration
schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )
```

**IMPORTANT for Python Pipelines**: When using `spark.readStream.format("cloudFiles")` for cloud storage ingestion, with schema inference (no schema specified), you **must specify a schema location**.

**Always ask the user** where to store Auto Loader schema metadata. Recommend:
/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas

Example: `/Volumes/my_catalog/pipeline_metadata/orders_pipeline_metadata/schemas`

**Never use the source data volume** - this causes permission conflicts. The schema location should be configured in the pipeline settings and accessed via `spark.conf.get("schema_location_base")`.
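For illustration, composing the recommended path can be captured in a small helper (`schema_location` is a hypothetical name, not an SDP API):

```python
def schema_location(catalog: str, schema: str, pipeline_name: str) -> str:
    # Recommended layout: /Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas
    return f"/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas"

print(schema_location("my_catalog", "pipeline_metadata", "orders_pipeline"))
# → /Volumes/my_catalog/pipeline_metadata/orders_pipeline_metadata/schemas
```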

**Language Selection:**

**CRITICAL RULE**: If the user explicitly mentions "Python" in their request (e.g., "Python Spark Declarative Pipeline", "Python SDP", "use Python"), **ALWAYS use Python without asking**. The same applies to SQL - if they say "SQL pipeline", use SQL.

- **Explicit language request**: User says "Python" → Use Python. User says "SQL" → Use SQL. **Do not ask for clarification.**
- **Auto-detection** (only when no explicit language mentioned):
  - **SQL indicators**: "sql files", "simple transformations", "aggregations", "materialized view", "CREATE OR REFRESH"
  - **Python indicators**: ".py files", "UDF", "complex logic", "ML inference", "external API", "@dp.table", "pandas", "decorator"
- **Prompt for clarification** only when language intent is truly ambiguous (no explicit mention, mixed signals)
- **Default to SQL** only when ambiguous AND no Python indicators present

See **[8-project-initialization.md](8-project-initialization.md)** for detailed language detection logic.
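The selection order above can be sketched as plain Python (an illustrative approximation, not the skill's actual detection code; the hint list is abridged):

```python
# Python indicators checked only when no language is mentioned explicitly.
PY_HINTS = (".py files", "udf", "complex logic", "ml inference",
            "external api", "@dp.table", "pandas", "decorator")

def detect_language(request: str) -> str:
    text = request.lower()
    if "python" in text:                        # explicit mention wins, never ask
        return "python"
    if "sql" in text:                           # explicit mention wins, never ask
        return "sql"
    if any(hint in text for hint in PY_HINTS):  # auto-detection via indicators
        return "python"
    return "sql"                                # default when ambiguous, no Python signals

print(detect_language("Python SDP for orders"))           # → python
print(detect_language("pipeline with a pandas UDF"))      # → python
print(detect_language("aggregate into daily summaries"))  # → sql
```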

Option 1: Pipelines with DABs:


Use asset bundles and pipeline CLI. See Quick Start and 8-project-initialization.md for complete details.

Option 2: Manual Workflow (Advanced)


For rapid prototyping, experimentation, or when you prefer direct control without Asset Bundles, use the manual workflow with MCP tools.
Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is `create_or_update_pipeline`, which handles the entire lifecycle.
IMPORTANT: Always create serverless pipelines (default). Only use classic clusters if the user explicitly asks for classic, pro, or advanced compute, or requires R language, Spark RDD APIs, or JAR libraries.
See 10-mcp-approach.md for detailed guide.

Best Practices (2026)


Project Structure


  • Default to `databricks pipelines init` for new projects (creates an Asset Bundle)
  • Use Asset Bundles for multi-environment deployments (dev/staging/prod)
  • Manual structure only for quick prototypes or legacy migration
  • Medallion architecture: two approaches work with Asset Bundles:
    • Flat structure (template default): `bronze_*.sql`, `silver_*.sql`, `gold_*.sql` in `transformations/`
    • Subdirectories: `transformations/bronze/`, `transformations/silver/`, `transformations/gold/`
    • Both work with the `transformations/**` glob pattern - choose based on team preference
  • See 8-project-initialization.md for project setup details

Minimal pipeline config pointers


  • Define parameters in your pipeline’s configuration and access them in code with spark.conf.get("key").
  • In Databricks Asset Bundles, set these under resources.pipelines.<pipeline>.configuration; validate with databricks bundle validate.
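To show the access pattern off-cluster, here is a dict-backed stand-in (`FakeConf` is invented for illustration; in a pipeline you call `spark.conf.get` directly):

```python
class FakeConf:
    """Local stand-in for spark.conf, just to illustrate the access pattern."""
    def __init__(self, settings):
        self._settings = settings

    def get(self, key, default=None):
        value = self._settings.get(key, default)
        if value is None:
            raise KeyError(f"missing pipeline parameter: {key}")
        return value

# Parameters would be set under resources.pipelines.<pipeline>.configuration
conf = FakeConf({"source_catalog": "main"})
print(conf.get("source_catalog"))          # → main
print(conf.get("source_schema", "sales"))  # → sales (falls back to the default)
```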

Modern Defaults


  • CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
  • Raw `.sql`/`.py` files, not notebooks
  • Serverless compute ONLY - do not use classic clusters unless explicitly required
  • Unity Catalog (required for serverless)
  • read_files() when using SQL for cloud storage ingestion - see 1-ingestion-patterns.md

Multi-Schema Patterns


Default: Single target schema per pipeline. Each pipeline has one target `catalog` and `schema` where all tables are written.

Option 1: Single Pipeline, Single Schema with Prefixes (Recommended)

Use one schema with table name prefixes to distinguish layers:

```python
# All tables write to: catalog.schema.bronze_*, silver_*, gold_*
@dp.table(name="bronze_orders")   # → catalog.schema.bronze_orders
@dp.table(name="silver_orders")   # → catalog.schema.silver_orders
@dp.table(name="gold_summary")    # → catalog.schema.gold_summary
```

**Advantages:**
- Simpler configuration (one pipeline)
- All tables in one schema for easy discovery
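The prefix convention can be made explicit in a tiny helper (hypothetical code, for illustration only):

```python
LAYERS = ("bronze", "silver", "gold")

def prefixed_table(layer: str, table: str) -> str:
    """Encode the medallion layer in the table name within a single schema."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    return f"{layer}_{table}"

print([prefixed_table(layer, "orders") for layer in LAYERS])
# → ['bronze_orders', 'silver_orders', 'gold_orders']
```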

Option 2: Separate Catalog/Schema per Layer

Use variables to specify a separate catalog and/or schema for different steps.
Below are Python SDP examples that source variables from pipeline configuration via spark.conf.get and use the default catalog/schema for bronze.
Same catalog, separate schemas; bronze uses pipeline defaults
  • Set your pipeline's default catalog and default schema to the bronze layer (for example, catalog=my_catalog, schema=bronze). When you omit catalog/schema in code, reads/writes go to these defaults.
  • Use pipeline parameters for the other schemas and any source schema/path, retrieved in code with spark.conf.get(...).

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Pull variables from pipeline configuration parameters
silver_schema = spark.conf.get("silver_schema")    # e.g., "silver"
gold_schema = spark.conf.get("gold_schema")        # e.g., "gold"
landing_schema = spark.conf.get("landing_schema")  # e.g., "landing"

# Bronze → uses default catalog/schema (set to bronze in pipeline settings)
@dp.table(name="orders_bronze")
def orders_bronze():
    # Read from another schema in the same default catalog
    return spark.readStream.table(f"{landing_schema}.orders_raw")

# Silver → same catalog, schema from parameter
@dp.table(name=f"{silver_schema}.orders_clean")
def orders_clean():
    return (spark.read.table("orders_bronze")  # unqualified = default catalog/schema
            .filter(col("order_id").isNotNull()))

# Gold → same catalog, schema from parameter
@dp.materialized_view(name=f"{gold_schema}.orders_by_date")
def orders_by_date():
    return (spark.read.table(f"{silver_schema}.orders_clean")
            .groupBy("order_date")
            .count().withColumnRenamed("count", "order_count"))
```

- Using unqualified names for bronze ensures it lands in the pipeline's default catalog/schema; silver/gold are explicitly schema-qualified within the same catalog.

---
Custom catalog/schema per layer; bronze still uses pipeline defaults
  • Keep bronze in the pipeline defaults (default catalog/schema set to your bronze layer). For silver/gold, use fully-qualified names with catalog and schema variables from pipeline configuration.

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Pull variables from pipeline configuration parameters
silver_catalog = spark.conf.get("silver_catalog")    # e.g., "my_catalog"
silver_schema = spark.conf.get("silver_schema")      # e.g., "silver"
gold_catalog = spark.conf.get("gold_catalog")        # e.g., "my_catalog"
gold_schema = spark.conf.get("gold_schema")          # e.g., "gold"
landing_catalog = spark.conf.get("landing_catalog")  # optional, if source is in another catalog
landing_schema = spark.conf.get("landing_schema")

# Bronze → uses default catalog/schema (set to bronze)
@dp.table(name="orders_bronze")
def orders_bronze():
    # If source is in a specified catalog/schema:
    return spark.readStream.table(f"{landing_catalog}.{landing_schema}.orders_raw")

# Silver → custom catalog + schema via parameters
@dp.table(name=f"{silver_catalog}.{silver_schema}.orders_clean")
def orders_clean():
    # Read bronze by its unqualified name (defaults), or fully qualify if preferred
    return (spark.read.table("orders_bronze")
            .filter(col("order_id").isNotNull()))

# Gold → custom catalog + schema via parameters
@dp.materialized_view(name=f"{gold_catalog}.{gold_schema}.orders_by_date")
def orders_by_date():
    return (spark.read.table(f"{silver_catalog}.{silver_schema}.orders_clean")
            .groupBy("order_date")
            .count().withColumnRenamed("count", "order_count"))
```

- Multipart names in the decorator's name argument let you publish to explicit catalog.schema targets within one pipeline.
- Unqualified reads/writes use the pipeline defaults; use fully-qualified names when crossing catalogs or when you need explicit namespace control.

---

**Note:** The `@dp.table()` decorator does not currently support separate `schema=` or `catalog=` parameters. The `name` parameter is a string of the form catalog.schema.table_name; the catalog and/or schema parts can be omitted to use the pipeline's configured default target schema.

Reading Tables in Python

Modern SDP Best Practice:
  • Use `spark.read.table()` for batch reads
  • Use `spark.readStream.table()` for streaming reads
  • Don't use `dp.read()` or `dp.read_stream()` (old syntax, no longer documented)
  • Don't use `dlt.read()` or `dlt.read_stream()` (legacy DLT API)
Key Point: SDP automatically tracks table dependencies from standard Spark DataFrame operations. No special read APIs are needed.

Three-Tier Identifier Resolution

SDP supports three levels of table name qualification:

| Level | Syntax | When to Use |
| --- | --- | --- |
| Unqualified | `spark.read.table("my_table")` | Reading tables within the same pipeline's target catalog/schema (recommended) |
| Partially-qualified | `spark.read.table("other_schema.my_table")` | Reading from a different schema in the same catalog |
| Fully-qualified | `spark.read.table("other_catalog.other_schema.my_table")` | Reading from external catalogs/schemas |
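The three levels can be mimicked with a toy resolver (illustrative only; the default names are placeholders, not SDP's actual resolution logic):

```python
def resolve_table_name(name, default_catalog="dev_catalog", default_schema="dev_schema"):
    """Toy resolver mirroring the three qualification levels."""
    parts = name.split(".")
    if len(parts) == 1:    # unqualified → pipeline target catalog/schema
        return (default_catalog, default_schema, parts[0])
    if len(parts) == 2:    # partially-qualified → same catalog, other schema
        return (default_catalog, parts[0], parts[1])
    return tuple(parts)    # fully-qualified → taken as-is

print(resolve_table_name("my_table"))
print(resolve_table_name("other_schema.my_table"))
print(resolve_table_name("other_catalog.other_schema.my_table"))
```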

Option 1: Unqualified Names (Recommended for Pipeline Tables)

Best practice for tables within the same pipeline. SDP resolves unqualified names to the pipeline's configured target catalog and schema. This makes code portable across environments (dev/prod).

```python
@dp.table(name="silver_clean")
def silver_clean():
    # Reads from pipeline's target catalog/schema (e.g., dev_catalog.dev_schema.bronze_raw)
    return (
        spark.read.table("bronze_raw")
        .filter(F.col("valid") == True)
    )

@dp.table(name="silver_events")
def silver_events():
    # Streaming read from same pipeline's bronze_events table
    return (
        spark.readStream.table("bronze_events")
        .withColumn("processed_at", F.current_timestamp())
    )
```

Option 2: Pipeline Parameters (For External Sources)

选项2:管道参数(用于外部源)

Use `spark.conf.get()` to parameterize external catalog/schema references. Define parameters in pipeline configuration, then reference them at the module level.

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Get parameterized values at module level (evaluated once at pipeline start)
source_catalog = spark.conf.get("source_catalog")
source_schema = spark.conf.get("source_schema", "sales")  # with default

@dp.table(name="transaction_summary")
def transaction_summary():
    return (
        spark.read.table(f"{source_catalog}.{source_schema}.transactions")
        .groupBy("account_id")
        .agg(
            F.count("txn_id").alias("txn_count"),
            F.sum("txn_amount").alias("account_revenue")
        )
    )
```

**Configure parameters in pipeline settings:**
- **Asset Bundles**: Add to `pipeline.yml` under `configuration:`
- **Manual/MCP**: Pass via `extra_settings.configuration` dict

```yaml
# In resources/my_pipeline.pipeline.yml
configuration:
  source_catalog: "shared_catalog"
  source_schema: "sales"
```

使用 `spark.conf.get()` 参数化外部catalog/schema引用。在管道配置中定义参数,然后在模块级别引用。

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# 在模块级别获取参数值(管道启动时仅计算一次)
source_catalog = spark.conf.get("source_catalog")
source_schema = spark.conf.get("source_schema", "sales")  # 带默认值

@dp.table(name="transaction_summary")
def transaction_summary():
    return (
        spark.read.table(f"{source_catalog}.{source_schema}.transactions")
        .groupBy("account_id")
        .agg(
            F.count("txn_id").alias("txn_count"),
            F.sum("txn_amount").alias("account_revenue")
        )
    )
```

**在管道设置中配置参数:**
- **Asset Bundles**:添加到`pipeline.yml`的`configuration:`下
- **手动/MCP**:通过`extra_settings.configuration`字典传递

```yaml
# 在resources/my_pipeline.pipeline.yml中
configuration:
  source_catalog: "shared_catalog"
  source_schema: "sales"
```
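For multi-environment deployments, the same parameters can be overridden per bundle target. A sketch, assuming a bundle whose pipeline resource key is `my_pipeline` (adjust names to your project):

```yaml
# databricks.yml (sketch): per-target overrides of pipeline parameters
targets:
  dev:
    resources:
      pipelines:
        my_pipeline:
          configuration:
            source_catalog: "dev_catalog"
  prod:
    resources:
      pipelines:
        my_pipeline:
          configuration:
            source_catalog: "shared_catalog"
```

The pipeline source code stays unchanged; only the target selected at `databricks bundle deploy -t <target>` time determines which catalog is read.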

Option 3: Fully-Qualified Names (For Fixed External References)

选项3:完全限定名称(用于固定外部引用)

Use when referencing specific external tables that don't change across environments:
```python
@dp.table(name="enriched_orders")
def enriched_orders():
    # Pipeline-internal table (unqualified)
    orders = spark.read.table("bronze_orders")

    # External reference table (fully-qualified)
    products = spark.read.table("shared_catalog.reference.products")

    return orders.join(products, "product_id")
```
当引用跨环境位置固定的共享/参考表时使用:
```python
@dp.table(name="enriched_orders")
def enriched_orders():
    # 管道内表(非限定名称)
    orders = spark.read.table("bronze_orders")

    # 外部参考表(完全限定名称)
    products = spark.read.table("shared_catalog.reference.products")

    return orders.join(products, "product_id")
```

Choosing the Right Approach

选择合适的方式

| Scenario | Recommended Approach |
| --- | --- |
| Reading tables created in same pipeline | Unqualified names - portable, uses target catalog/schema |
| Reading from external source that varies by environment | Pipeline parameters - configurable per deployment |
| Reading from shared/reference tables with fixed location | Fully-qualified names - explicit and clear |
| Mixed pipeline (some internal, some external) | Combine approaches - unqualified for internal, parameters for external |

| 场景 | 推荐方式 |
| --- | --- |
| 读取同一管道内创建的表 | 非限定名称 - 可移植,使用目标catalog/schema |
| 读取跨环境变化的外部源 | 管道参数 - 可按部署环境配置 |
| 读取位置固定的共享/参考表 | 完全限定名称 - 明确清晰 |
| 混合管道(部分内部表,部分外部表) | 组合方式 - 内部表使用非限定名称,外部源使用参数 |

Common Issues

常见问题

| Issue | Solution |
| --- | --- |
| Empty output tables | Use `get_table_details` to verify; check upstream sources |
| Pipeline stuck INITIALIZING | Normal for serverless; wait a few minutes |
| "Column not found" | Check that `schemaHints` match the actual data |
| Streaming reads fail | For file ingestion in a streaming table, you must use the `STREAM` keyword with `read_files`: `FROM STREAM read_files(...)`. For table streams use `FROM stream(table)`. See read_files — Usage in streaming tables. |
| Timeout during run | Increase `timeout`, or use `wait_for_completion=False` and poll with `get_update` |
| MV doesn't refresh | Enable row tracking on source tables |
| SCD2: query column not found | Lakeflow uses `__START_AT` and `__END_AT` (double underscore), not `START_AT`/`END_AT`. Use `WHERE __END_AT IS NULL` for current rows. See 3-scd-patterns.md. |
| AUTO CDC parse error at APPLY/SEQUENCE | Put `APPLY AS DELETE WHEN` before `SEQUENCE BY`. Only list columns in `COLUMNS * EXCEPT (...)` that exist in the source (omit `_rescued_data` unless bronze uses rescue data). Omit `TRACK HISTORY ON *` if it causes "end of input" errors; the default is equivalent. See 2-streaming-patterns.md. |
| "Cannot create streaming table from batch query" | In a streaming table query, use `FROM STREAM read_files(...)` so `read_files` leverages Auto Loader; `FROM read_files(...)` alone is batch. See 1-ingestion-patterns.md and read_files — Usage in streaming tables. |

For detailed errors, the `result["message"]` from `create_or_update_pipeline` includes suggested next steps. Use `get_pipeline_events(pipeline_id=...)` for full stack traces.

| 问题 | 解决方案 |
| --- | --- |
| 输出表为空 | 使用 `get_table_details` 验证,检查上游数据源 |
| 管道卡在INITIALIZING状态 | 无服务器管道的正常现象,请等待几分钟 |
| "列未找到" | 检查 `schemaHints` 与实际数据是否匹配 |
| 流读取失败 | 对于流表中的文件摄入,必须在 `read_files` 中使用 `STREAM` 关键字:`FROM STREAM read_files(...)`。对于表流,使用 `FROM stream(table)`。参考read_files — 流表中的用法 |
| 运行时超时 | 增加 `timeout`,或使用 `wait_for_completion=False` 并通过 `get_update` 轮询 |
| 物化视图不刷新 | 启用源表的行跟踪功能 |
| SCD2:查询列未找到 | Lakeflow使用 `__START_AT` 和 `__END_AT`(双下划线),而非 `START_AT`/`END_AT`。使用 `WHERE __END_AT IS NULL` 查询当前行。参考3-scd-patterns.md |
| AUTO CDC在APPLY/SEQUENCE处解析错误 | 将 `APPLY AS DELETE WHEN` 放在 `SEQUENCE BY` 之前。`COLUMNS * EXCEPT (...)` 中仅列出源表中存在的列(除非青铜层使用了rescue data,否则省略 `_rescued_data`)。如果 `TRACK HISTORY ON *` 导致"输入结束"错误,则省略该语句;默认配置与之等效。参考2-streaming-patterns.md |
| "无法从批处理查询创建流表" | 在流表查询中,使用 `FROM STREAM read_files(...)` 使 `read_files` 利用Auto Loader;仅 `FROM read_files(...)` 是批处理操作。参考1-ingestion-patterns.md和read_files — 流表中的用法 |

如需详细错误信息,`create_or_update_pipeline` 返回的 `result["message"]` 包含建议的下一步操作。使用 `get_pipeline_events(pipeline_id=...)` 获取完整堆栈跟踪。

Advanced Pipeline Configuration

高级管道配置

For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.

有关高级配置选项(开发模式、持续管道、自定义集群、通知、Python依赖等),请参考**7-advanced-configuration.md**。

Platform Constraints

平台限制

Serverless Pipeline Requirements (Default)

无服务器管道要求(默认)

| Requirement | Details |
| --- | --- |
| Unity Catalog | Required - serverless pipelines always use UC |
| Workspace Region | Must be in a serverless-enabled region |
| Serverless Terms | Must accept serverless terms of use |
| CDC Features | Requires serverless (or Pro/Advanced with classic clusters) |

| 要求 | 详情 |
| --- | --- |
| Unity Catalog | 必需 - 无服务器管道始终使用UC |
| 工作区区域 | 必须在支持无服务器的区域 |
| 无服务器条款 | 必须接受无服务器服务条款 |
| CDC功能 | 需要无服务器(或使用经典集群的专业/高级版) |

Serverless Limitations (When Classic Clusters Required)

无服务器限制(需要使用经典集群的场景)

| Limitation | Workaround |
| --- | --- |
| R language | Not supported - use classic clusters if required |
| Spark RDD APIs | Not supported - use classic clusters if required |
| JAR libraries | Not supported - use classic clusters if required |
| Maven coordinates | Not supported - use classic clusters if required |
| DBFS root access | Limited - must use Unity Catalog external locations |
| Global temp views | Not supported |

| 限制 | 替代方案 |
| --- | --- |
| R语言 | 不支持 - 若需要则使用经典集群 |
| Spark RDD API | 不支持 - 若需要则使用经典集群 |
| JAR库 | 不支持 - 若需要则使用经典集群 |
| Maven坐标 | 不支持 - 若需要则使用经典集群 |
| DBFS根目录访问 | 受限 - 必须使用Unity Catalog外部位置 |
| 全局临时视图 | 不支持 |

General Constraints

通用限制

| Constraint | Details |
| --- | --- |
| Schema Evolution | Streaming tables require full refresh for incompatible changes |
| SQL Limitations | PIVOT clause unsupported |
| Sinks | Python only, streaming only, append flows only |

Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.

| 限制 | 详情 |
| --- | --- |
| Schema演化 | 流表的不兼容变更需要全量刷新 |
| SQL限制 | 不支持PIVOT子句 |
| 输出端 | 仅支持Python、仅支持流处理、仅支持追加流 |

默认使用无服务器管道,除非用户明确要求使用R语言、RDD API或JAR库。