databricks-sdk-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Databricks SDK Patterns

Databricks SDK Patterns

Overview

概述

Production-ready patterns for Databricks SDK usage in Python.
适用于 Python 的 Databricks SDK 生产级使用模式。

Prerequisites

前提条件

  • Completed
    databricks-install-auth
    setup
  • Familiarity with async/await patterns
  • Understanding of error handling best practices
  • 已完成
    databricks-install-auth
    配置
  • 熟悉async/await模式
  • 了解错误处理最佳实践

Instructions

操作步骤

Step 1: Implement Singleton Pattern

步骤1:实现单例模式

python
undefined
python
undefined

src/databricks/client.py

src/databricks/client.py

from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os


@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    """Return a singleton WorkspaceClient instance.

    NOTE: maxsize=1 means calling with a *different* profile evicts the
    previously cached client; for multi-profile processes raise maxsize
    or use the per-workspace factory below.
    """
    return WorkspaceClient(profile=profile)
from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os


@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    """Return a singleton WorkspaceClient instance.

    NOTE: maxsize=1 means calling with a *different* profile evicts the
    previously cached client; for multi-profile processes raise maxsize
    or use the per-workspace factory below.
    """
    return WorkspaceClient(profile=profile)

Multi-workspace support

Multi-workspace support

# Cache of one client per named workspace.
_clients: dict[str, WorkspaceClient] = {}


def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
    """Get or create a client for a specific workspace (cached by name)."""
    if workspace_name not in _clients:
        config = get_workspace_config(workspace_name)
        _clients[workspace_name] = WorkspaceClient(
            host=config["host"],
            token=config["token"],
        )
    return _clients[workspace_name]


def get_workspace_config(name: str) -> dict:
    """Load workspace config from environment variables.

    Expects DATABRICKS_<NAME>_HOST and DATABRICKS_<NAME>_TOKEN to be set.

    Raises:
        KeyError: if either variable is missing from the environment.
    """
    # Bug fix: the rendered text had lost the underscores around the
    # workspace name (markdown italics), yielding DATABRICKS_<NAME>HOST
    # and DATABRICKS<NAME>_TOKEN. The intended, symmetric names are:
    return {
        "host": os.environ[f"DATABRICKS_{name.upper()}_HOST"],
        "token": os.environ[f"DATABRICKS_{name.upper()}_TOKEN"],
    }
undefined
# Cache of one client per named workspace.
_clients: dict[str, WorkspaceClient] = {}


def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
    """Get or create a client for a specific workspace (cached by name)."""
    if workspace_name not in _clients:
        config = get_workspace_config(workspace_name)
        _clients[workspace_name] = WorkspaceClient(
            host=config["host"],
            token=config["token"],
        )
    return _clients[workspace_name]


def get_workspace_config(name: str) -> dict:
    """Load workspace config from environment variables.

    Expects DATABRICKS_<NAME>_HOST and DATABRICKS_<NAME>_TOKEN to be set.

    Raises:
        KeyError: if either variable is missing from the environment.
    """
    # Bug fix: the rendered text had lost the underscores around the
    # workspace name (markdown italics), yielding DATABRICKS_<NAME>HOST
    # and DATABRICKS<NAME>_TOKEN. The intended, symmetric names are:
    return {
        "host": os.environ[f"DATABRICKS_{name.upper()}_HOST"],
        "token": os.environ[f"DATABRICKS_{name.upper()}_TOKEN"],
    }
undefined

Step 2: Add Error Handling Wrapper

步骤2:添加错误处理包装器

python
undefined
python
undefined

src/databricks/errors.py

src/databricks/errors.py

from databricks.sdk.errors import (
    DatabricksError,
    NotFound,
    ResourceAlreadyExists,
    PermissionDenied,
    ResourceConflict,
    BadRequest,
)
from dataclasses import dataclass
from functools import wraps
from typing import TypeVar, Generic, Optional
import logging

# Bug fix: the rendered text read `logging.getLogger(name)` -- the
# double underscores of __name__ were stripped by markdown rendering.
logger = logging.getLogger(__name__)

T = TypeVar("T")


@dataclass
class Result(Generic[T]):
    """Result wrapper for API operations.

    Exactly one of `data` / `error` is meaningful: `error is None`
    signals success.
    """
    data: Optional[T] = None
    error: Optional[Exception] = None

    @property
    def is_success(self) -> bool:
        return self.error is None

    def unwrap(self) -> T:
        """Return the payload, re-raising the stored error on failure."""
        if self.error:
            raise self.error
        return self.data


def safe_databricks_call(operation):
    """Decorator wrapping a Databricks API call in a Result.

    Known SDK exceptions are logged and captured as Result.error instead
    of propagating; any other exception type still raises.
    """
    @wraps(operation)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs) -> Result:
        try:
            result = operation(*args, **kwargs)
            return Result(data=result)
        except NotFound as e:
            logger.warning(f"Resource not found: {e}")
            return Result(error=e)
        except ResourceAlreadyExists as e:
            logger.warning(f"Resource already exists: {e}")
            return Result(error=e)
        except PermissionDenied as e:
            logger.error(f"Permission denied: {e}")
            return Result(error=e)
        except DatabricksError as e:
            # Catch-all for other SDK errors (must come after subclasses).
            logger.error(f"Databricks error: {e.error_code} - {e.message}")
            return Result(error=e)
    return wrapper
undefined
from databricks.sdk.errors import (
    DatabricksError,
    NotFound,
    ResourceAlreadyExists,
    PermissionDenied,
    ResourceConflict,
    BadRequest,
)
from dataclasses import dataclass
from functools import wraps
from typing import TypeVar, Generic, Optional
import logging

# Bug fix: the rendered text read `logging.getLogger(name)` -- the
# double underscores of __name__ were stripped by markdown rendering.
logger = logging.getLogger(__name__)

T = TypeVar("T")


@dataclass
class Result(Generic[T]):
    """Result wrapper for API operations.

    Exactly one of `data` / `error` is meaningful: `error is None`
    signals success.
    """
    data: Optional[T] = None
    error: Optional[Exception] = None

    @property
    def is_success(self) -> bool:
        return self.error is None

    def unwrap(self) -> T:
        """Return the payload, re-raising the stored error on failure."""
        if self.error:
            raise self.error
        return self.data


def safe_databricks_call(operation):
    """Decorator wrapping a Databricks API call in a Result.

    Known SDK exceptions are logged and captured as Result.error instead
    of propagating; any other exception type still raises.
    """
    @wraps(operation)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs) -> Result:
        try:
            result = operation(*args, **kwargs)
            return Result(data=result)
        except NotFound as e:
            logger.warning(f"Resource not found: {e}")
            return Result(error=e)
        except ResourceAlreadyExists as e:
            logger.warning(f"Resource already exists: {e}")
            return Result(error=e)
        except PermissionDenied as e:
            logger.error(f"Permission denied: {e}")
            return Result(error=e)
        except DatabricksError as e:
            # Catch-all for other SDK errors (must come after subclasses).
            logger.error(f"Databricks error: {e.error_code} - {e.message}")
            return Result(error=e)
    return wrapper
undefined

Step 3: Implement Retry Logic with Backoff

步骤3:实现带退避的重试逻辑

python
undefined
python
undefined

src/databricks/retry.py

src/databricks/retry.py

import random
import time
from typing import Callable, TypeVar

from databricks.sdk.errors import (
    DatabricksError,
    TemporarilyUnavailable,
    TooManyRequests,
)

T = TypeVar("T")


def with_retry(
    operation: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: float = 0.5,
) -> T:
    """Execute operation with exponential backoff retry.

    Args:
        operation: zero-argument callable to invoke.
        max_retries: retries *after* the first attempt.
        base_delay: first backoff delay in seconds (doubles per attempt).
        max_delay: cap on the backoff delay.
        jitter: random 0..jitter seconds added to throttling backoffs.

    Raises:
        The last exception when retries are exhausted, or immediately
        for non-retryable client errors.
    """
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (TemporarilyUnavailable, TooManyRequests) as e:
            # Throttling / transient availability: always worth retrying.
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, jitter)  # avoid thundering herd
            print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            time.sleep(delay)
        except DatabricksError as e:
            # Don't retry client errors
            # (NOTE(review): assumes numeric "4xx"-style error codes --
            # Databricks codes are often symbolic strings; confirm.)
            if e.error_code and e.error_code.startswith("4"):
                raise
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay)
    # Every loop path either returns, raises, or sleeps-and-continues,
    # so falling out of the loop is impossible.
    raise RuntimeError("Unreachable")
undefined
import random
import time
from typing import Callable, TypeVar

from databricks.sdk.errors import (
    DatabricksError,
    TemporarilyUnavailable,
    TooManyRequests,
)

T = TypeVar("T")


def with_retry(
    operation: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: float = 0.5,
) -> T:
    """Execute operation with exponential backoff retry.

    Args:
        operation: zero-argument callable to invoke.
        max_retries: retries *after* the first attempt.
        base_delay: first backoff delay in seconds (doubles per attempt).
        max_delay: cap on the backoff delay.
        jitter: random 0..jitter seconds added to throttling backoffs.

    Raises:
        The last exception when retries are exhausted, or immediately
        for non-retryable client errors.
    """
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (TemporarilyUnavailable, TooManyRequests) as e:
            # Throttling / transient availability: always worth retrying.
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, jitter)  # avoid thundering herd
            print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            time.sleep(delay)
        except DatabricksError as e:
            # Don't retry client errors
            # (NOTE(review): assumes numeric "4xx"-style error codes --
            # Databricks codes are often symbolic strings; confirm.)
            if e.error_code and e.error_code.startswith("4"):
                raise
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay)
    # Every loop path either returns, raises, or sleeps-and-continues,
    # so falling out of the loop is impossible.
    raise RuntimeError("Unreachable")
undefined

Step 4: Context Manager for Clusters

步骤4:用于集群的上下文管理器

python
undefined
python
undefined

src/databricks/cluster.py

src/databricks/cluster.py

from contextlib import contextmanager
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State
import time


@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
    """Context manager for an existing cluster's lifecycle.

    Ensures the cluster is RUNNING before yielding its id; the cluster
    is deliberately left running on exit (best-effort reuse).
    """
    try:
        # Start cluster if not running
        cluster = w.clusters.get(cluster_id)
        if cluster.state != State.RUNNING:
            print(f"Starting cluster {cluster_id}...")
            w.clusters.start(cluster_id)
            w.clusters.wait_get_cluster_running(cluster_id)
        yield cluster_id
    finally:
        # Optionally terminate after use
        pass  # Or: w.clusters.delete(cluster_id)


@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
    """Create a temporary cluster that is permanently deleted on exit.

    `spec` is passed straight through as keyword arguments to
    clusters.create_and_wait.
    """
    cluster = w.clusters.create_and_wait(**spec)
    try:
        yield cluster.cluster_id
    finally:
        # Guaranteed cleanup even if the body raised.
        w.clusters.permanent_delete(cluster.cluster_id)
undefined
from contextlib import contextmanager
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State
import time


@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
    """Context manager for an existing cluster's lifecycle.

    Ensures the cluster is RUNNING before yielding its id; the cluster
    is deliberately left running on exit (best-effort reuse).
    """
    try:
        # Start cluster if not running
        cluster = w.clusters.get(cluster_id)
        if cluster.state != State.RUNNING:
            print(f"Starting cluster {cluster_id}...")
            w.clusters.start(cluster_id)
            w.clusters.wait_get_cluster_running(cluster_id)
        yield cluster_id
    finally:
        # Optionally terminate after use
        pass  # Or: w.clusters.delete(cluster_id)


@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
    """Create a temporary cluster that is permanently deleted on exit.

    `spec` is passed straight through as keyword arguments to
    clusters.create_and_wait.
    """
    cluster = w.clusters.create_and_wait(**spec)
    try:
        yield cluster.cluster_id
    finally:
        # Guaranteed cleanup even if the body raised.
        w.clusters.permanent_delete(cluster.cluster_id)
undefined

Step 5: Type-Safe Job Builders

步骤5:类型安全的Job构建器

python
undefined
python
undefined

src/databricks/jobs.py

src/databricks/jobs.py

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task,
    NotebookTask,
    PythonWheelTask,
    JobCluster,
    JobSettings,
    CreateJob,
)
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class JobBuilder:
    """Fluent builder for Databricks jobs."""
    # Display name of the job.
    name: str
    # Task definitions accumulated by the add_* methods.
    tasks: list[Task] = field(default_factory=list)
    # Shared job-cluster specs referenced by job_cluster_key.
    job_clusters: list[JobCluster] = field(default_factory=list)
    # Custom tags applied to the job.
    tags: dict[str, str] = field(default_factory=dict)

    def add_notebook_task(
        self,
        task_key: str,
        notebook_path: str,
        cluster_key: str,
        parameters: Optional[dict] = None,
    ) -> "JobBuilder":
        """Add a notebook task; returns self for chaining."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            notebook_task=NotebookTask(
                notebook_path=notebook_path,
                base_parameters=parameters or {},
            ),
        ))
        return self

    def add_wheel_task(
        self,
        task_key: str,
        package_name: str,
        entry_point: str,
        cluster_key: str,
        parameters: Optional[list[str]] = None,
    ) -> "JobBuilder":
        """Add a Python wheel task; returns self for chaining."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            python_wheel_task=PythonWheelTask(
                package_name=package_name,
                entry_point=entry_point,
                parameters=parameters or [],
            ),
        ))
        return self

    def create(self, w: WorkspaceClient) -> int:
        """Create the job in the workspace and return its job_id."""
        job = w.jobs.create(
            name=self.name,
            tasks=self.tasks,
            job_clusters=self.job_clusters,
            tags=self.tags,
        )
        return job.job_id
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task,
    NotebookTask,
    PythonWheelTask,
    JobCluster,
    JobSettings,
    CreateJob,
)
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class JobBuilder:
    """Fluent builder for Databricks jobs."""
    # Display name of the job.
    name: str
    # Task definitions accumulated by the add_* methods.
    tasks: list[Task] = field(default_factory=list)
    # Shared job-cluster specs referenced by job_cluster_key.
    job_clusters: list[JobCluster] = field(default_factory=list)
    # Custom tags applied to the job.
    tags: dict[str, str] = field(default_factory=dict)

    def add_notebook_task(
        self,
        task_key: str,
        notebook_path: str,
        cluster_key: str,
        parameters: Optional[dict] = None,
    ) -> "JobBuilder":
        """Add a notebook task; returns self for chaining."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            notebook_task=NotebookTask(
                notebook_path=notebook_path,
                base_parameters=parameters or {},
            ),
        ))
        return self

    def add_wheel_task(
        self,
        task_key: str,
        package_name: str,
        entry_point: str,
        cluster_key: str,
        parameters: Optional[list[str]] = None,
    ) -> "JobBuilder":
        """Add a Python wheel task; returns self for chaining."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            python_wheel_task=PythonWheelTask(
                package_name=package_name,
                entry_point=entry_point,
                parameters=parameters or [],
            ),
        ))
        return self

    def create(self, w: WorkspaceClient) -> int:
        """Create the job in the workspace and return its job_id."""
        job = w.jobs.create(
            name=self.name,
            tasks=self.tasks,
            job_clusters=self.job_clusters,
            tags=self.tags,
        )
        return job.job_id

Usage

Usage

# Build a two-stage notebook pipeline and create it in one fluent chain.
job_id = (
    JobBuilder("etl-pipeline")
    .add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
    .add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
    .create(w)
)
undefined
# Build a two-stage notebook pipeline and create it in one fluent chain.
job_id = (
    JobBuilder("etl-pipeline")
    .add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
    .add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
    .create(w)
)
undefined

Output

输出结果

  • Type-safe client singleton
  • Robust error handling with structured logging
  • Automatic retry with exponential backoff
  • Fluent job builder pattern
  • 类型安全的客户端单例
  • 带有结构化日志的健壮错误处理
  • 带指数退避的自动重试
  • 流畅的Job构建器模式

Error Handling

错误处理

| Pattern          | Use Case           | Benefit                  |
|------------------|--------------------|--------------------------|
| Result wrapper   | All API calls      | Type-safe error handling |
| Retry logic      | Transient failures | Improves reliability     |
| Context managers | Cluster lifecycle  | Resource cleanup         |
| Builders         | Job creation       | Type safety and fluency  |
| 模式 | 使用场景 | 优势 |
|------|----------|------|
| 结果包装器 | 所有API调用 | 类型安全的错误处理 |
| 重试逻辑 | 临时故障 | 提升可靠性 |
| 上下文管理器 | 集群生命周期 | 资源清理 |
| 构建器 | Job创建 | 类型安全与流畅性 |

Examples

示例

Factory Pattern (Multi-Tenant)

工厂模式(多租户)

python
from functools import lru_cache

@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
    """Get workspace client for tenant.

    One client is cached per tenant_id; with maxsize=100 the
    least-recently-used tenant's client is evicted beyond 100 tenants.
    """
    # NOTE(review): get_tenant_config is defined elsewhere; assumed to
    # return an object with workspace_url/token attributes -- confirm.
    config = get_tenant_config(tenant_id)
    return WorkspaceClient(
        host=config.workspace_url,
        token=config.token,
    )
python
from functools import lru_cache

@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
    """Get workspace client for tenant.

    One client is cached per tenant_id; with maxsize=100 the
    least-recently-used tenant's client is evicted beyond 100 tenants.
    """
    # NOTE(review): get_tenant_config is defined elsewhere; assumed to
    # return an object with workspace_url/token attributes -- confirm.
    config = get_tenant_config(tenant_id)
    return WorkspaceClient(
        host=config.workspace_url,
        token=config.token,
    )

Async Operations

异步操作

python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def async_list_clusters(w: WorkspaceClient):
    """Async wrapper for cluster listing.

    Runs the blocking SDK call in a worker thread so the event loop
    stays responsive.
    """
    # get_running_loop() is the correct call from inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        clusters = await loop.run_in_executor(
            executor,
            lambda: list(w.clusters.list())
        )
    return clusters
python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def async_list_clusters(w: WorkspaceClient):
    """Async wrapper for cluster listing.

    Runs the blocking SDK call in a worker thread so the event loop
    stays responsive.
    """
    # get_running_loop() is the correct call from inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        clusters = await loop.run_in_executor(
            executor,
            lambda: list(w.clusters.list())
        )
    return clusters

Resources

参考资源

Next Steps

后续步骤

Apply patterns in
databricks-core-workflow-a
for Delta Lake ETL.
在 databricks-core-workflow-a
中应用这些模式以实现 Delta Lake ETL。