# databricks-python-sdk

Databricks Development Guide
This skill provides guidance for Databricks SDK, Databricks Connect, CLI, and REST API.
SDK Documentation: https://databricks-sdk-py.readthedocs.io/en/latest/
GitHub Repository: https://github.com/databricks/databricks-sdk-py
## Environment Setup
- Use the existing virtual environment at `.venv`, or use `uv` to create one
- For Spark operations: `uv pip install databricks-connect`
- For SDK operations: `uv pip install databricks-sdk`
- Databricks CLI version should be 0.278.0 or higher
## Configuration
- Default profile name: `DEFAULT`
- Config file: `~/.databrickscfg`
- Environment variables: `DATABRICKS_HOST`, `DATABRICKS_TOKEN`
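For reference, a minimal `~/.databrickscfg` is a small INI file; the host and token values below are placeholders:

```ini
[DEFAULT]
host  = https://your-workspace.cloud.databricks.com
token = dapi...

[MY_PROFILE]
host  = https://another-workspace.cloud.databricks.com
token = dapi...
```

Profiles other than `DEFAULT` are selected with `profile="MY_PROFILE"` in the SDK or `--profile MY_PROFILE` on the CLI.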
## Databricks Connect (Spark Operations)
Use `databricks-connect` for running Spark code locally against a Databricks cluster.

```python
from databricks.connect import DatabricksSession

# Auto-detects the 'DEFAULT' profile from ~/.databrickscfg
spark = DatabricksSession.builder.getOrCreate()

# With an explicit profile
spark = DatabricksSession.builder.profile("MY_PROFILE").getOrCreate()

# Use spark as normal
df = spark.sql("SELECT * FROM catalog.schema.table")
df.show()
```

**IMPORTANT:** Do NOT set `.master("local[*]")` - this will cause issues with Databricks Connect.
---

## Direct REST API Access
For operations not yet in the SDK, or overly complex via the SDK, use the REST API directly:

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Direct API call using the authenticated client
response = w.api_client.do(
    method="GET",
    path="/api/2.0/clusters/list"
)

# POST with a body
response = w.api_client.do(
    method="POST",
    path="/api/2.0/jobs/run-now",
    body={"job_id": 123}
)
```

**When to use:** Prefer SDK methods when available. Use `api_client.do` for:

- New API endpoints not yet in the SDK
- Complex operations where the SDK abstraction is problematic
- Debugging/testing raw API responses
---

## Databricks CLI
```bash
# Check version (should be >= 0.278.0)
databricks --version

# Use a specific profile
databricks --profile MY_PROFILE clusters list

# Common commands
databricks clusters list
databricks jobs list
databricks workspace ls /Users/me
```
---

## SDK Documentation Architecture
The SDK documentation follows a predictable URL pattern:

- Base: https://databricks-sdk-py.readthedocs.io/en/latest/
- Workspace APIs: `/workspace/{category}/{service}.html`
- Account APIs: `/account/{category}/{service}.html`
- Authentication: `/authentication.html`
- DBUtils: `/dbutils.html`

### Workspace API Categories
| Category | Services |
|---|---|
| `compute` | clusters, cluster_policies, command_execution, instance_pools, libraries |
| `catalog` | catalogs, schemas, tables, volumes, functions, storage_credentials, external_locations |
| `jobs` | jobs |
| `sql` | warehouses, statement_execution, queries, alerts, dashboards |
| `serving` | serving_endpoints |
| `vectorsearch` | vector_search_indexes, vector_search_endpoints |
| `pipelines` | pipelines |
| `workspace` | repos, secrets, workspace, git_credentials |
| `files` | files, dbfs |
| `ml` | experiments, model_registry |
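Because the pattern is mechanical, the doc URL for any service can be derived with a small helper (a sketch; `workspace_doc_url` and `account_doc_url` are hypothetical functions, not part of the SDK):

```python
# Derive databricks-sdk-py readthedocs URLs from category and service names.
BASE = "https://databricks-sdk-py.readthedocs.io/en/latest"

def workspace_doc_url(category: str, service: str) -> str:
    """URL of a workspace-level API page, e.g. ('compute', 'clusters')."""
    return f"{BASE}/workspace/{category}/{service}.html"

def account_doc_url(category: str, service: str) -> str:
    """URL of an account-level API page."""
    return f"{BASE}/account/{category}/{service}.html"

print(workspace_doc_url("compute", "clusters"))
```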
## Authentication

### Environment Variables
```bash
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
DATABRICKS_TOKEN=dapi...  # Personal Access Token
```

### Code Patterns
```python
from databricks.sdk import WorkspaceClient

# Auto-detect credentials from the environment
w = WorkspaceClient()

# Explicit token auth
w = WorkspaceClient(
    host="https://your-workspace.cloud.databricks.com",
    token="dapi..."
)

# Azure Service Principal
w = WorkspaceClient(
    host="https://adb-xxx.azuredatabricks.net",
    azure_workspace_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.Databricks/workspaces/...",
    azure_tenant_id="tenant-id",
    azure_client_id="client-id",
    azure_client_secret="secret"
)

# Use a named profile from ~/.databrickscfg
w = WorkspaceClient(profile="MY_PROFILE")
```
---

## Core API Reference

### Clusters API
```python
from datetime import timedelta

# List all clusters
for cluster in w.clusters.list():
    print(f"{cluster.cluster_name}: {cluster.state}")

# Get cluster details
cluster = w.clusters.get(cluster_id="0123-456789-abcdef")

# Create a cluster (returns a Wait object)
wait = w.clusters.create(
    cluster_name="my-cluster",
    spark_version=w.clusters.select_spark_version(latest=True),
    node_type_id=w.clusters.select_node_type(local_disk=True),
    num_workers=2
)
cluster = wait.result()  # Wait for the cluster to be running

# Or use create_and_wait for a blocking call
cluster = w.clusters.create_and_wait(
    cluster_name="my-cluster",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    timeout=timedelta(minutes=30)
)

# Start/stop/delete
w.clusters.start(cluster_id="...").result()
w.clusters.stop(cluster_id="...")
w.clusters.delete(cluster_id="...")
```
### Jobs API

```python
from databricks.sdk.service.jobs import Task, NotebookTask

# List jobs
for job in w.jobs.list():
    print(f"{job.job_id}: {job.settings.name}")

# Create a job
created = w.jobs.create(
    name="my-job",
    tasks=[
        Task(
            task_key="main",
            notebook_task=NotebookTask(notebook_path="/Users/me/notebook"),
            existing_cluster_id="0123-456789-abcdef"
        )
    ]
)

# Run the job now
run = w.jobs.run_now_and_wait(job_id=created.job_id)
print(f"Run completed: {run.state.result_state}")

# Get run output
output = w.jobs.get_run_output(run_id=run.run_id)
```
### SQL Statement Execution

```python
from databricks.sdk.service.sql import StatementState

# Execute a SQL query
response = w.statement_execution.execute_statement(
    warehouse_id="abc123",
    statement="SELECT * FROM catalog.schema.table LIMIT 10",
    wait_timeout="30s"
)

# Check status and get results
if response.status.state == StatementState.SUCCEEDED:
    for row in response.result.data_array:
        print(row)

# For large results, fetch chunks
chunk = w.statement_execution.get_statement_result_chunk_n(
    statement_id=response.statement_id,
    chunk_index=0
)
```
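`data_array` rows come back as plain lists of values. Pairing them with the column names from the response manifest gives dicts; `rows_to_dicts` here is a hypothetical helper, not an SDK method:

```python
def rows_to_dicts(column_names, data_array):
    """Zip each raw result row with its column names."""
    return [dict(zip(column_names, row)) for row in data_array]

# With a real response you would pass:
#   column_names = [c.name for c in response.manifest.schema.columns]
#   data_array   = response.result.data_array
rows = rows_to_dicts(["id", "name"], [["1", "alpha"], ["2", "beta"]])
```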
### SQL Warehouses

```python
# List warehouses
for wh in w.warehouses.list():
    print(f"{wh.name}: {wh.state}")

# Get a warehouse
warehouse = w.warehouses.get(id="abc123")

# Create a warehouse
created = w.warehouses.create_and_wait(
    name="my-warehouse",
    cluster_size="Small",
    max_num_clusters=1,
    auto_stop_mins=15
)

# Start/stop
w.warehouses.start(id="abc123").result()
w.warehouses.stop(id="abc123").result()
```
### Unity Catalog - Tables

```python
# List tables in a schema
for table in w.tables.list(catalog_name="main", schema_name="default"):
    print(f"{table.full_name}: {table.table_type}")

# Get table info
table = w.tables.get(full_name="main.default.my_table")
print(f"Columns: {[c.name for c in table.columns]}")

# Check if a table exists
exists = w.tables.exists(full_name="main.default.my_table")
```
### Unity Catalog - Catalogs & Schemas

Doc (Catalogs): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html
Doc (Schemas): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html

```python
# List catalogs
for catalog in w.catalogs.list():
    print(catalog.name)

# Create a catalog
w.catalogs.create(name="my_catalog", comment="Description")

# List schemas
for schema in w.schemas.list(catalog_name="main"):
    print(schema.name)

# Create a schema
w.schemas.create(name="my_schema", catalog_name="main")
```
### Volumes

```python
from databricks.sdk.service.catalog import VolumeType

# List volumes
for vol in w.volumes.list(catalog_name="main", schema_name="default"):
    print(f"{vol.full_name}: {vol.volume_type}")

# Create a managed volume
w.volumes.create(
    catalog_name="main",
    schema_name="default",
    name="my_volume",
    volume_type=VolumeType.MANAGED
)

# Read volume info
vol = w.volumes.read(name="main.default.my_volume")
```
### Files API

```python
# Upload a file to a volume
w.files.upload(
    file_path="/Volumes/main/default/my_volume/data.csv",
    contents=open("local_file.csv", "rb")
)

# Download a file (DownloadResponse exposes the stream as .contents)
response = w.files.download(file_path="/Volumes/main/default/my_volume/data.csv")
content = response.contents.read()

# List directory contents
for entry in w.files.list_directory_contents("/Volumes/main/default/my_volume/"):
    print(f"{entry.name}: {entry.is_directory}")

# Upload/download with progress (parallel)
w.files.upload_from(
    file_path="/Volumes/main/default/my_volume/large.parquet",
    source_path="/local/path/large.parquet",
    use_parallel=True
)
w.files.download_to(
    file_path="/Volumes/main/default/my_volume/large.parquet",
    destination="/local/output/",
    use_parallel=True
)
```
### Serving Endpoints (Model Serving)

```python
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole

# List endpoints
for ep in w.serving_endpoints.list():
    print(f"{ep.name}: {ep.state}")

# Get an endpoint
endpoint = w.serving_endpoints.get(name="my-endpoint")

# Query an endpoint
response = w.serving_endpoints.query(
    name="my-endpoint",
    inputs={"prompt": "Hello, world!"}
)

# For chat/completions endpoints
response = w.serving_endpoints.query(
    name="my-chat-endpoint",
    messages=[ChatMessage(role=ChatMessageRole.USER, content="Hello!")]
)

# Get an OpenAI-compatible client
openai_client = w.serving_endpoints.get_open_ai_client()
```
### Vector Search

```python
# List vector search indexes
for idx in w.vector_search_indexes.list_indexes(endpoint_name="my-vs-endpoint"):
    print(idx.name)

# Query an index
results = w.vector_search_indexes.query_index(
    index_name="main.default.my_index",
    columns=["id", "text", "embedding"],
    query_text="search query",
    num_results=10
)
for doc in results.result.data_array:
    print(doc)
```
### Pipelines (Delta Live Tables)

```python
# List pipelines
for pipeline in w.pipelines.list_pipelines():
    print(f"{pipeline.name}: {pipeline.state}")

# Get a pipeline
pipeline = w.pipelines.get(pipeline_id="abc123")

# Start a pipeline update
w.pipelines.start_update(pipeline_id="abc123")

# Stop a pipeline
w.pipelines.stop_and_wait(pipeline_id="abc123")
```
### Secrets

```python
# List secret scopes
for scope in w.secrets.list_scopes():
    print(scope.name)

# Create a scope
w.secrets.create_scope(scope="my-scope")

# Put a secret
w.secrets.put_secret(scope="my-scope", key="api-key", string_value="secret123")

# Get a secret (returns GetSecretResponse with the value)
secret = w.secrets.get_secret(scope="my-scope", key="api-key")

# List secrets in a scope (metadata only, not values)
for s in w.secrets.list_secrets(scope="my-scope"):
    print(s.key)
```
### DBUtils

```python
# Access dbutils through WorkspaceClient
dbutils = w.dbutils

# File system operations
files = dbutils.fs.ls("/")
dbutils.fs.cp("dbfs:/source", "dbfs:/dest")
dbutils.fs.rm("dbfs:/path", recurse=True)

# Secrets (same as w.secrets, but via the dbutils interface)
value = dbutils.secrets.get(scope="my-scope", key="my-key")
```
---

## Common Patterns

### CRITICAL: Async Applications (FastAPI, etc.)
The Databricks SDK is fully synchronous; all calls block the thread. In async applications (FastAPI, asyncio), you MUST wrap SDK calls with `asyncio.to_thread()` to avoid blocking the event loop.

```python
import asyncio

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# WRONG - blocks the event loop
async def get_clusters_bad():
    return list(w.clusters.list())  # BLOCKS!

# CORRECT - runs in a thread pool
async def get_clusters_good():
    return await asyncio.to_thread(lambda: list(w.clusters.list()))

# CORRECT - for simple calls
async def get_cluster(cluster_id: str):
    return await asyncio.to_thread(w.clusters.get, cluster_id)

# CORRECT - FastAPI endpoints
from fastapi import FastAPI

app = FastAPI()

@app.get("/clusters")
async def list_clusters():
    clusters = await asyncio.to_thread(lambda: list(w.clusters.list()))
    return [{"id": c.cluster_id, "name": c.cluster_name} for c in clusters]

@app.post("/query")
async def run_query(sql: str, warehouse_id: str):
    # Wrap the blocking SDK call
    response = await asyncio.to_thread(
        w.statement_execution.execute_statement,
        statement=sql,
        warehouse_id=warehouse_id,
        wait_timeout="30s"
    )
    return response.result.data_array
```

**Note:** `WorkspaceClient().config.host` is NOT a network call - it just reads config. No need to wrap property access.
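Since `asyncio.to_thread` runs each call in a worker thread, independent SDK calls can also be overlapped with `asyncio.gather`. A sketch with stand-in functions in place of real SDK calls (`fetch_clusters` and `fetch_warehouses` are hypothetical):

```python
import asyncio
import time

def fetch_clusters():
    # Stand-in for a blocking SDK call, e.g. list(w.clusters.list())
    time.sleep(0.1)
    return ["cluster-a"]

def fetch_warehouses():
    # Stand-in for a blocking SDK call, e.g. list(w.warehouses.list())
    time.sleep(0.1)
    return ["warehouse-a"]

async def fetch_all():
    # Both blocking calls run concurrently in the default thread pool
    return await asyncio.gather(
        asyncio.to_thread(fetch_clusters),
        asyncio.to_thread(fetch_warehouses),
    )

clusters, warehouses = asyncio.run(fetch_all())
```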
---

### Wait for Long-Running Operations
```python
from datetime import timedelta

# Pattern 1: Use *_and_wait methods
cluster = w.clusters.create_and_wait(
    cluster_name="test",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    timeout=timedelta(minutes=30)
)

# Pattern 2: Use the Wait object
wait = w.clusters.create(...)
cluster = wait.result()  # Blocks until ready

# Pattern 3: Manual polling with a callback
def progress(cluster):
    print(f"State: {cluster.state}")

cluster = w.clusters.wait_get_cluster_running(
    cluster_id="...",
    timeout=timedelta(minutes=30),
    callback=progress
)
```
### Pagination

```python
# All list methods return iterators that handle pagination automatically
for job in w.jobs.list():  # Fetches all pages
    print(job.settings.name)

# For manual control
response = w.jobs.list(limit=10)
for job in response:
    print(job)
```
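Because these are ordinary Python iterators, stdlib tools apply directly. For example, `itertools.islice` stops consuming after N items, so later pages are never fetched (shown with a stand-in generator in place of `w.jobs.list()`):

```python
from itertools import islice

def fake_jobs_list():
    # Stand-in for the lazily paginated iterator returned by w.jobs.list()
    for i in range(1000):
        yield f"job-{i}"

# Take only the first 5 items; the generator is never exhausted
first_five = list(islice(fake_jobs_list(), 5))
```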
### Error Handling

```python
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceAlreadyExists

try:
    cluster = w.clusters.get(cluster_id="invalid-id")
except NotFound:
    print("Cluster not found")
except PermissionDenied:
    print("Access denied")
```
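For transient failures (rate limits, a cluster still starting), a small retry wrapper can be layered on top. This is a generic sketch, not an SDK feature; pass whichever SDK exception types you consider retryable, and note the demo uses a stand-in `flaky` function rather than a real call:

```python
import time

def with_retries(fn, retryable, attempts=3, base_delay=0.1):
    """Call fn(), retrying on the given exception types with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except retryable:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)

# Demo: fails twice, then succeeds
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient")
    return "ok"

result = with_retries(flaky, retryable=(TimeoutError,))
```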
## When Uncertain
If I'm unsure about a method, I should:

1. Check the documentation URL pattern:
   `https://databricks-sdk-py.readthedocs.io/en/latest/workspace/{category}/{service}.html`
2. Check the common categories:
   - Clusters: `/workspace/compute/clusters.html`
   - Jobs: `/workspace/jobs/jobs.html`
   - Tables: `/workspace/catalog/tables.html`
   - Warehouses: `/workspace/sql/warehouses.html`
   - Serving: `/workspace/serving/serving_endpoints.html`
3. Fetch and verify before providing guidance on parameters or return types.