databricks-python-sdk


Databricks Development Guide


This skill provides guidance for the Databricks SDK, Databricks Connect, the Databricks CLI, and the REST API.


Environment Setup


  • Use the existing virtual environment at `.venv`, or use `uv` to create one
  • For Spark operations: `uv pip install databricks-connect`
  • For SDK operations: `uv pip install databricks-sdk`
  • Databricks CLI version should be 0.278.0 or higher

Configuration


  • Default profile name: `DEFAULT`
  • Config file: `~/.databrickscfg`
  • Environment variables: `DATABRICKS_HOST`, `DATABRICKS_TOKEN`
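A minimal `~/.databrickscfg` matching the defaults above might look like this (host and token values are placeholders):

```ini
[DEFAULT]
host  = https://your-workspace.cloud.databricks.com
token = dapi...
```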

Databricks Connect (Spark Operations)


Use `databricks-connect` for running Spark code locally against a Databricks cluster.

```python
from databricks.connect import DatabricksSession

# Auto-detects the 'DEFAULT' profile from ~/.databrickscfg
spark = DatabricksSession.builder.getOrCreate()

# With an explicit profile
spark = DatabricksSession.builder.profile("MY_PROFILE").getOrCreate()

# Use spark as normal
df = spark.sql("SELECT * FROM catalog.schema.table")
df.show()
```

**IMPORTANT:** Do NOT set `.master("local[*]")` - this will cause issues with Databricks Connect.

---

Direct REST API Access


For operations not yet in the SDK, or overly complex via the SDK, use the REST API directly:

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Direct API call using the authenticated client
response = w.api_client.do(method="GET", path="/api/2.0/clusters/list")

# POST with a body
response = w.api_client.do(method="POST", path="/api/2.0/jobs/run-now", body={"job_id": 123})
```

**When to use:** Prefer SDK methods when available. Use `api_client.do` for:
- New API endpoints not yet in the SDK
- Complex operations where the SDK abstraction is problematic
- Debugging/testing raw API responses

---

Databricks CLI


```bash
# Check version (should be >= 0.278.0)
databricks --version

# Use a specific profile
databricks --profile MY_PROFILE clusters list

# Common commands
databricks clusters list
databricks jobs list
databricks workspace ls /Users/me
```

---

SDK Documentation Architecture


The SDK documentation follows a predictable URL pattern:
Base: https://databricks-sdk-py.readthedocs.io/en/latest/

Workspace APIs:  /workspace/{category}/{service}.html
Account APIs:    /account/{category}/{service}.html
Authentication:  /authentication.html
DBUtils:         /dbutils.html
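The URL pattern above can be sketched as a tiny helper (`doc_url` is an illustrative name, not part of the SDK):

```python
BASE = "https://databricks-sdk-py.readthedocs.io/en/latest"

def doc_url(category: str, service: str, scope: str = "workspace") -> str:
    """Build a readthedocs URL for an SDK service page."""
    return f"{BASE}/{scope}/{category}/{service}.html"

# e.g. the clusters API docs
print(doc_url("compute", "clusters"))
# https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html
```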

Workspace API Categories


| Category | Services |
|---|---|
| compute | clusters, cluster_policies, command_execution, instance_pools, libraries |
| catalog | catalogs, schemas, tables, volumes, functions, storage_credentials, external_locations |
| jobs | jobs |
| sql | warehouses, statement_execution, queries, alerts, dashboards |
| serving | serving_endpoints |
| vectorsearch | vector_search_indexes, vector_search_endpoints |
| pipelines | pipelines |
| workspace | repos, secrets, workspace, git_credentials |
| files | files, dbfs |
| ml | experiments, model_registry |

Authentication


Environment Variables


```bash
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
DATABRICKS_TOKEN=dapi...  # Personal Access Token
```

Code Patterns


```python
from databricks.sdk import WorkspaceClient

# Auto-detect credentials from the environment
w = WorkspaceClient()

# Explicit token auth
w = WorkspaceClient(
    host="https://your-workspace.cloud.databricks.com",
    token="dapi..."
)

# Azure Service Principal
w = WorkspaceClient(
    host="https://adb-xxx.azuredatabricks.net",
    azure_workspace_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.Databricks/workspaces/...",
    azure_tenant_id="tenant-id",
    azure_client_id="client-id",
    azure_client_secret="secret"
)

# Use a named profile from ~/.databrickscfg
w = WorkspaceClient(profile="MY_PROFILE")
```

---

Core API Reference


Clusters API


```python
from datetime import timedelta

# List all clusters
for cluster in w.clusters.list():
    print(f"{cluster.cluster_name}: {cluster.state}")

# Get cluster details
cluster = w.clusters.get(cluster_id="0123-456789-abcdef")

# Create a cluster (returns a Wait object)
wait = w.clusters.create(
    cluster_name="my-cluster",
    spark_version=w.clusters.select_spark_version(latest=True),
    node_type_id=w.clusters.select_node_type(local_disk=True),
    num_workers=2
)
cluster = wait.result()  # Wait for the cluster to be running

# Or use create_and_wait for a blocking call
cluster = w.clusters.create_and_wait(
    cluster_name="my-cluster",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    timeout=timedelta(minutes=30)
)

# Start/stop/delete
w.clusters.start(cluster_id="...").result()
w.clusters.stop(cluster_id="...")
w.clusters.delete(cluster_id="...")
```

Jobs API


```python
from databricks.sdk.service.jobs import Task, NotebookTask

# List jobs
for job in w.jobs.list():
    print(f"{job.job_id}: {job.settings.name}")

# Create a job
created = w.jobs.create(
    name="my-job",
    tasks=[
        Task(
            task_key="main",
            notebook_task=NotebookTask(notebook_path="/Users/me/notebook"),
            existing_cluster_id="0123-456789-abcdef"
        )
    ]
)

# Run a job now
run = w.jobs.run_now_and_wait(job_id=created.job_id)
print(f"Run completed: {run.state.result_state}")

# Get run output
output = w.jobs.get_run_output(run_id=run.run_id)
```

SQL Statement Execution


```python
from databricks.sdk.service.sql import StatementState

# Execute a SQL query
response = w.statement_execution.execute_statement(
    warehouse_id="abc123",
    statement="SELECT * FROM catalog.schema.table LIMIT 10",
    wait_timeout="30s"
)

# Check status and get results
if response.status.state == StatementState.SUCCEEDED:
    for row in response.result.data_array:
        print(row)

# For large results, fetch chunks
chunk = w.statement_execution.get_statement_result_chunk_n(
    statement_id=response.statement_id,
    chunk_index=0
)
```

SQL Warehouses


```python
# List warehouses
for wh in w.warehouses.list():
    print(f"{wh.name}: {wh.state}")

# Get a warehouse
warehouse = w.warehouses.get(id="abc123")

# Create a warehouse
created = w.warehouses.create_and_wait(
    name="my-warehouse",
    cluster_size="Small",
    max_num_clusters=1,
    auto_stop_mins=15
)

# Start/stop
w.warehouses.start(id="abc123").result()
w.warehouses.stop(id="abc123").result()
```

Unity Catalog - Tables


```python
# List tables in a schema
for table in w.tables.list(catalog_name="main", schema_name="default"):
    print(f"{table.full_name}: {table.table_type}")

# Get table info
table = w.tables.get(full_name="main.default.my_table")
print(f"Columns: {[c.name for c in table.columns]}")

# Check whether a table exists
exists = w.tables.exists(full_name="main.default.my_table")
```

Unity Catalog - Catalogs & Schemas


```python
# List catalogs
for catalog in w.catalogs.list():
    print(catalog.name)

# Create a catalog
w.catalogs.create(name="my_catalog", comment="Description")

# List schemas
for schema in w.schemas.list(catalog_name="main"):
    print(schema.name)

# Create a schema
w.schemas.create(name="my_schema", catalog_name="main")
```

Volumes

```python
from databricks.sdk.service.catalog import VolumeType

# List volumes
for vol in w.volumes.list(catalog_name="main", schema_name="default"):
    print(f"{vol.full_name}: {vol.volume_type}")

# Create a managed volume
w.volumes.create(
    catalog_name="main",
    schema_name="default",
    name="my_volume",
    volume_type=VolumeType.MANAGED
)

# Read volume info
vol = w.volumes.read(name="main.default.my_volume")
```

Files API


```python
# Upload a file to a volume
w.files.upload(
    file_path="/Volumes/main/default/my_volume/data.csv",
    contents=open("local_file.csv", "rb")
)

# Download a file
with w.files.download(file_path="/Volumes/main/default/my_volume/data.csv") as f:
    content = f.read()

# List directory contents
for entry in w.files.list_directory_contents("/Volumes/main/default/my_volume/"):
    print(f"{entry.name}: {entry.is_directory}")

# Upload/download with progress (parallel)
w.files.upload_from(
    file_path="/Volumes/main/default/my_volume/large.parquet",
    source_path="/local/path/large.parquet",
    use_parallel=True
)
w.files.download_to(
    file_path="/Volumes/main/default/my_volume/large.parquet",
    destination="/local/output/",
    use_parallel=True
)
```

Serving Endpoints (Model Serving)


```python
# List endpoints
for ep in w.serving_endpoints.list():
    print(f"{ep.name}: {ep.state}")

# Get an endpoint
endpoint = w.serving_endpoints.get(name="my-endpoint")

# Query an endpoint
response = w.serving_endpoints.query(
    name="my-endpoint",
    inputs={"prompt": "Hello, world!"}
)

# For chat/completions endpoints
response = w.serving_endpoints.query(
    name="my-chat-endpoint",
    messages=[{"role": "user", "content": "Hello!"}]
)

# Get an OpenAI-compatible client
openai_client = w.serving_endpoints.get_open_ai_client()
```

Vector Search


```python
# List vector search indexes
for idx in w.vector_search_indexes.list_indexes(endpoint_name="my-vs-endpoint"):
    print(idx.name)

# Query an index
results = w.vector_search_indexes.query_index(
    index_name="main.default.my_index",
    columns=["id", "text", "embedding"],
    query_text="search query",
    num_results=10
)
for doc in results.result.data_array:
    print(doc)
```

Pipelines (Delta Live Tables)


```python
# List pipelines
for pipeline in w.pipelines.list_pipelines():
    print(f"{pipeline.name}: {pipeline.state}")

# Get a pipeline
pipeline = w.pipelines.get(pipeline_id="abc123")

# Start a pipeline update
w.pipelines.start_update(pipeline_id="abc123")

# Stop a pipeline
w.pipelines.stop_and_wait(pipeline_id="abc123")
```

Secrets


```python
# List secret scopes
for scope in w.secrets.list_scopes():
    print(scope.name)

# Create a scope
w.secrets.create_scope(scope="my-scope")

# Put a secret
w.secrets.put_secret(scope="my-scope", key="api-key", string_value="secret123")

# Get a secret (returns GetSecretResponse with the value)
secret = w.secrets.get_secret(scope="my-scope", key="api-key")

# List secrets in a scope (metadata only, not values)
for s in w.secrets.list_secrets(scope="my-scope"):
    print(s.key)
```

DBUtils


```python
# Access dbutils through the WorkspaceClient
dbutils = w.dbutils

# File system operations
files = dbutils.fs.ls("/")
dbutils.fs.cp("dbfs:/source", "dbfs:/dest")
dbutils.fs.rm("dbfs:/path", recurse=True)

# Secrets (same as w.secrets, but via the dbutils interface)
value = dbutils.secrets.get(scope="my-scope", key="my-key")
```

---

Common Patterns


CRITICAL: Async Applications (FastAPI, etc.)


The Databricks SDK is fully synchronous. All calls block the thread. In async applications (FastAPI, asyncio), you MUST wrap SDK calls with `asyncio.to_thread()` to avoid blocking the event loop.

```python
import asyncio
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# WRONG - blocks the event loop
async def get_clusters_bad():
    return list(w.clusters.list())  # BLOCKS!

# CORRECT - runs in a thread pool
async def get_clusters_good():
    return await asyncio.to_thread(lambda: list(w.clusters.list()))

# CORRECT - for simple calls
async def get_cluster(cluster_id: str):
    return await asyncio.to_thread(w.clusters.get, cluster_id)

# CORRECT - FastAPI endpoints
from fastapi import FastAPI

app = FastAPI()

@app.get("/clusters")
async def list_clusters():
    clusters = await asyncio.to_thread(lambda: list(w.clusters.list()))
    return [{"id": c.cluster_id, "name": c.cluster_name} for c in clusters]

@app.post("/query")
async def run_query(sql: str, warehouse_id: str):
    # Wrap the blocking SDK call
    response = await asyncio.to_thread(
        w.statement_execution.execute_statement,
        statement=sql,
        warehouse_id=warehouse_id,
        wait_timeout="30s"
    )
    return response.result.data_array
```

**Note:** `WorkspaceClient().config.host` is NOT a network call - it just reads config. No need to wrap property access.

---

Wait for Long-Running Operations


```python
from datetime import timedelta

# Pattern 1: Use *_and_wait methods
cluster = w.clusters.create_and_wait(
    cluster_name="test",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    timeout=timedelta(minutes=30)
)

# Pattern 2: Use the Wait object
wait = w.clusters.create(...)
cluster = wait.result()  # Blocks until ready

# Pattern 3: Manual polling with a callback
def progress(cluster):
    print(f"State: {cluster.state}")

cluster = w.clusters.wait_get_cluster_running(
    cluster_id="...",
    timeout=timedelta(minutes=30),
    callback=progress
)
```

Pagination


```python
# All list methods return iterators that handle pagination automatically
for job in w.jobs.list():  # Fetches all pages
    print(job.settings.name)

# For manual control over page size
response = w.jobs.list(limit=10)
for job in response:
    print(job)
```

Error Handling


```python
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceAlreadyExists

try:
    cluster = w.clusters.get(cluster_id="invalid-id")
except NotFound:
    print("Cluster not found")
except PermissionDenied:
    print("Access denied")
```

When Uncertain


If I'm unsure about a method, I should:
  1. Check the documentation URL pattern:
    • https://databricks-sdk-py.readthedocs.io/en/latest/workspace/{category}/{service}.html
  2. Common categories:
    • Clusters: /workspace/compute/clusters.html
    • Jobs: /workspace/jobs/jobs.html
    • Tables: /workspace/catalog/tables.html
    • Warehouses: /workspace/sql/warehouses.html
    • Serving: /workspace/serving/serving_endpoints.html
  3. Fetch and verify before providing guidance on parameters or return types.

Quick Reference Links
