azure-cosmos-db-py
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCosmos DB Service Implementation
Cosmos DB 服务实现
Build production-grade Azure Cosmos DB NoSQL services following clean code, security best practices, and TDD principles.
遵循整洁代码、安全最佳实践和TDD原则,构建生产级Azure Cosmos DB NoSQL服务。
Installation
安装
bash
pip install azure-cosmos azure-identitybash
pip install azure-cosmos azure-identityEnvironment Variables
环境变量
bash
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE_NAME=<database-name>
COSMOS_CONTAINER_ID=<container-id>bash
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE_NAME=<database-name>
COSMOS_CONTAINER_ID=<container-id>For emulator only (not production)
仅适用于模拟器(生产环境不使用)
COSMOS_KEY=<emulator-key>
undefinedCOSMOS_KEY=<emulator-key>
undefinedAuthentication
认证方式
DefaultAzureCredential (preferred):
python
from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential
client = CosmosClient(
url=os.environ["COSMOS_ENDPOINT"],
credential=DefaultAzureCredential()
)Emulator (local development):
python
from azure.cosmos import CosmosClient
client = CosmosClient(
url="https://localhost:8081",
credential=os.environ["COSMOS_KEY"],
connection_verify=False
)DefaultAzureCredential(推荐):
python
from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential
client = CosmosClient(
url=os.environ["COSMOS_ENDPOINT"],
credential=DefaultAzureCredential()
)模拟器(本地开发):
python
from azure.cosmos import CosmosClient
client = CosmosClient(
url="https://localhost:8081",
credential=os.environ["COSMOS_KEY"],
connection_verify=False
)Architecture Overview
架构概述
┌─────────────────────────────────────────────────────────────────┐
│ FastAPI Router │
│ - Auth dependencies (get_current_user, get_current_user_required)
│ - HTTP error responses (HTTPException) │
└──────────────────────────────┬──────────────────────────────────┘
│
┌──────────────────────────────▼──────────────────────────────────┐
│ Service Layer │
│ - Business logic and validation │
│ - Document ↔ Model conversion │
│ - Graceful degradation when Cosmos unavailable │
└──────────────────────────────┬──────────────────────────────────┘
│
┌──────────────────────────────▼──────────────────────────────────┐
│ Cosmos DB Client Module │
│ - Singleton container initialization │
│ - Dual auth: DefaultAzureCredential (Azure) / Key (emulator) │
│ - Async wrapper via run_in_threadpool │
└─────────────────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────────────────┐
│ FastAPI 路由层 │
│ - 认证依赖项 (get_current_user, get_current_user_required)
│ - HTTP错误响应 (HTTPException) │
└──────────────────────────────┬──────────────────────────────────┘
│
┌──────────────────────────────▼──────────────────────────────────┐
│ 服务层 │
│ - 业务逻辑与验证 │
│ - 文档 ↔ 模型转换 │
│ - Cosmos不可用时的优雅降级 │
└──────────────────────────────┬──────────────────────────────────┘
│
┌──────────────────────────────▼──────────────────────────────────┐
│ Cosmos DB 客户端模块 │
│ - 单例容器初始化 │
│ - 双重认证:DefaultAzureCredential(Azure环境)/ 密钥(模拟器)│
│ - 通过run_in_threadpool实现异步包装 │
└─────────────────────────────────────────────────────────────────┘Quick Start
快速开始
1. Client Module Setup
1. 客户端模块配置
Create a singleton Cosmos client with dual authentication:
python
undefined创建支持双重认证的单例Cosmos客户端:
python
undefineddb/cosmos.py
db/cosmos.py
from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential
from starlette.concurrency import run_in_threadpool
_cosmos_container = None
def _is_emulator_endpoint(endpoint: str) -> bool:
return "localhost" in endpoint or "127.0.0.1" in endpoint
async def get_container():
global _cosmos_container
if _cosmos_container is None:
if _is_emulator_endpoint(settings.cosmos_endpoint):
client = CosmosClient(
url=settings.cosmos_endpoint,
credential=settings.cosmos_key,
connection_verify=False
)
else:
client = CosmosClient(
url=settings.cosmos_endpoint,
credential=DefaultAzureCredential()
)
db = client.get_database_client(settings.cosmos_database_name)
_cosmos_container = db.get_container_client(settings.cosmos_container_id)
return _cosmos_container
**Full implementation**: See [references/client-setup.md](references/client-setup.md)from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential
from starlette.concurrency import run_in_threadpool
_cosmos_container = None
def _is_emulator_endpoint(endpoint: str) -> bool:
return "localhost" in endpoint or "127.0.0.1" in endpoint
async def get_container():
global _cosmos_container
if _cosmos_container is None:
if _is_emulator_endpoint(settings.cosmos_endpoint):
client = CosmosClient(
url=settings.cosmos_endpoint,
credential=settings.cosmos_key,
connection_verify=False
)
else:
client = CosmosClient(
url=settings.cosmos_endpoint,
credential=DefaultAzureCredential()
)
db = client.get_database_client(settings.cosmos_database_name)
_cosmos_container = db.get_container_client(settings.cosmos_container_id)
return _cosmos_container
**完整实现**:查看 [references/client-setup.md](references/client-setup.md)2. Pydantic Model Hierarchy
2. Pydantic 模型层级
Use five-tier model pattern for clean separation:
python
class ProjectBase(BaseModel): # Shared fields
name: str = Field(..., min_length=1, max_length=200)
class ProjectCreate(ProjectBase): # Creation request
workspace_id: str = Field(..., alias="workspaceId")
class ProjectUpdate(BaseModel): # Partial updates (all optional)
name: Optional[str] = Field(None, min_length=1)
class Project(ProjectBase): # API response
id: str
created_at: datetime = Field(..., alias="createdAt")
class ProjectInDB(Project): # Internal with docType
doc_type: str = "project"使用五层模型模式实现清晰的职责分离:
python
class ProjectBase(BaseModel): # 共享字段
name: str = Field(..., min_length=1, max_length=200)
class ProjectCreate(ProjectBase): # 创建请求模型
workspace_id: str = Field(..., alias="workspaceId")
class ProjectUpdate(BaseModel): # 部分更新模型(所有字段可选)
name: Optional[str] = Field(None, min_length=1)
class Project(ProjectBase): # API响应模型
id: str
created_at: datetime = Field(..., alias="createdAt")
class ProjectInDB(Project): # 内部数据库模型(含docType)
doc_type: str = "project"3. Service Layer Pattern
3. 服务层模式
python
class ProjectService:
def _use_cosmos(self) -> bool:
return get_container() is not None
async def get_by_id(self, project_id: str, workspace_id: str) -> Project | None:
if not self._use_cosmos():
return None
doc = await get_document(project_id, partition_key=workspace_id)
if doc is None:
return None
return self._doc_to_model(doc)Full patterns: See references/service-layer.md
python
class ProjectService:
def _use_cosmos(self) -> bool:
return get_container() is not None
async def get_by_id(self, project_id: str, workspace_id: str) -> Project | None:
if not self._use_cosmos():
return None
doc = await get_document(project_id, partition_key=workspace_id)
if doc is None:
return None
return self._doc_to_model(doc)完整模式:查看 references/service-layer.md
Core Principles
核心原则
Security Requirements
安全要求
- RBAC Authentication: Use in Azure — never store keys in code
DefaultAzureCredential - Emulator-Only Keys: Hardcode the well-known emulator key only for local development
- Parameterized Queries: Always use syntax — never string concatenation
@parameter - Partition Key Validation: Validate partition key access matches user authorization
- RBAC认证:在Azure环境中使用——绝不在代码中存储密钥
DefaultAzureCredential - 仅模拟器使用密钥:仅在本地开发时使用众所周知的模拟器密钥
- 参数化查询:始终使用语法——绝不使用字符串拼接
@parameter - 分区键验证:验证分区键访问权限与用户授权匹配
Clean Code Conventions
整洁代码规范
- Single Responsibility: Client module handles connection; services handle business logic
- Graceful Degradation: Services return /
Nonewhen Cosmos unavailable[] - Consistent Naming: ,
_doc_to_model(),_model_to_doc()_use_cosmos() - Type Hints: Full typing on all public methods
- CamelCase Aliases: Use for JSON serialization
Field(alias="camelCase")
- 单一职责:客户端模块处理连接;服务层处理业务逻辑
- 优雅降级:Cosmos不可用时,服务返回/
None[] - 命名一致:、
_doc_to_model()、_model_to_doc()_use_cosmos() - 类型提示:所有公共方法都添加完整类型注解
- 驼峰别名:使用实现JSON序列化
Field(alias="camelCase")
TDD Requirements
TDD 要求
Write tests BEFORE implementation using these patterns:
python
@pytest.fixture
def mock_cosmos_container(mocker):
container = mocker.MagicMock()
mocker.patch("app.db.cosmos.get_container", return_value=container)
return container
@pytest.mark.asyncio
async def test_get_project_by_id_returns_project(mock_cosmos_container):
# Arrange
mock_cosmos_container.read_item.return_value = {"id": "123", "name": "Test"}
# Act
result = await project_service.get_by_id("123", "workspace-1")
# Assert
assert result.id == "123"
assert result.name == "Test"Full testing guide: See references/testing.md
在实现前编写测试,遵循以下模式:
python
@pytest.fixture
def mock_cosmos_container(mocker):
container = mocker.MagicMock()
mocker.patch("app.db.cosmos.get_container", return_value=container)
return container
@pytest.mark.asyncio
async def test_get_project_by_id_returns_project(mock_cosmos_container):
# 准备
mock_cosmos_container.read_item.return_value = {"id": "123", "name": "Test"}
# 执行
result = await project_service.get_by_id("123", "workspace-1")
# 断言
assert result.id == "123"
assert result.name == "Test"完整测试指南:查看 references/testing.md
Reference Files
参考文档
| File | When to Read |
|---|---|
| references/client-setup.md | Setting up Cosmos client with dual auth, SSL config, singleton pattern |
| references/service-layer.md | Implementing full service class with CRUD, conversions, graceful degradation |
| references/testing.md | Writing pytest tests, mocking Cosmos, integration test setup |
| references/partitioning.md | Choosing partition keys, cross-partition queries, move operations |
| references/error-handling.md | Handling CosmosResourceNotFoundError, logging, HTTP error mapping |
| 文件 | 阅读场景 |
|---|---|
| references/client-setup.md | 配置带有双重认证、SSL设置、单例模式的Cosmos客户端 |
| references/service-layer.md | 实现包含CRUD、转换逻辑、优雅降级的完整服务类 |
| references/testing.md | 编写pytest测试、模拟Cosmos、集成测试配置 |
| references/partitioning.md | 选择分区键、跨分区查询、数据迁移操作 |
| references/error-handling.md | 处理CosmosResourceNotFoundError、日志记录、HTTP错误映射 |
Template Files
模板文件
| File | Purpose |
|---|---|
| assets/cosmos_client_template.py | Ready-to-use client module |
| assets/service_template.py | Service class skeleton |
| assets/conftest_template.py | pytest fixtures for Cosmos mocking |
| 文件 | 用途 |
|---|---|
| assets/cosmos_client_template.py | 可直接使用的客户端模块 |
| assets/service_template.py | 服务类骨架 |
| assets/conftest_template.py | 用于Cosmos模拟的pytest夹具 |
Quality Attributes (NFRs)
质量属性(非功能需求)
Reliability
可靠性
- Graceful degradation when Cosmos unavailable
- Retry logic with exponential backoff for transient failures
- Connection pooling via singleton pattern
- Cosmos不可用时的优雅降级
- 针对瞬时故障的指数退避重试逻辑
- 通过单例模式实现连接池
Security
安全性
- Zero secrets in code (RBAC via DefaultAzureCredential)
- Parameterized queries prevent injection
- Partition key isolation enforces data boundaries
- 代码中无敏感信息(通过DefaultAzureCredential实现RBAC)
- 参数化查询防止注入攻击
- 分区键隔离强化数据边界
Maintainability
可维护性
- Five-tier model pattern enables schema evolution
- Service layer decouples business logic from storage
- Consistent patterns across all entity services
- 五层模型模式支持 schema 演进
- 服务层解耦业务逻辑与存储实现
- 所有实体服务遵循一致模式
Testability
可测试性
- Dependency injection via
get_container() - Easy mocking with module-level globals
- Clear separation enables unit testing without Cosmos
- 通过实现依赖注入
get_container() - 模块级全局变量便于模拟
- 清晰的职责分离无需Cosmos即可进行单元测试
Performance
性能
- Partition key queries avoid cross-partition scans
- Async wrapping prevents blocking FastAPI event loop
- Minimal document conversion overhead
- 分区键查询避免跨分区扫描
- 异步包装避免阻塞FastAPI事件循环
- 最小化文档转换开销