implement-repository-pattern
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseWorks with Python files in domain/repositories/ and infrastructure/ directories.
适用于domain/repositories/和infrastructure/目录下的Python文件。
Implement Repository Pattern
实现Repository模式
Table of Contents
目录
Core Sections
核心章节
- Purpose
- Core responsibility: Create repositories with Protocol/Implementation separation
- Quick Start
- Fastest path: Create repository from user request to working implementation
- Instructions
- Examples
Patterns & Best Practices
模式与最佳实践
- Common Patterns
- Red Flags - STOP
- Critical issues to watch for in repository implementation
- Success Checklist
- Complete validation before marking repository done
- 通用模式
- 警示信号 - 立即停止
- 仓库实现中需要警惕的关键问题
- 完成检查清单
- 标记仓库完成前的完整验证项
Supporting Resources
支持资源
- Requirements
- Dependencies, project structure, and setup requirements
- See Also
- templates/protocol-template.py - Repository protocol skeleton
- templates/implementation-template.py - Neo4j implementation skeleton
- templates/test-template.py - Test suite skeleton
- references/pattern-guide.md - Complete pattern catalog
- references/troubleshooting.md - Common issues and solutions
- scripts/analyze_queries.py - Analyze Cypher queries in repository implementations
- scripts/generate_repository.py - Generate repository pattern files with domain protocol and implementation
- scripts/validate_repository_patterns.py - Validate repository pattern compliance across the codebase
- 依赖要求
- 依赖项、项目结构和设置要求
- 相关链接
- templates/protocol-template.py - 仓库Protocol骨架
- templates/implementation-template.py - Neo4j实现骨架
- templates/test-template.py - 测试套件骨架
- references/pattern-guide.md - 完整模式目录
- references/troubleshooting.md - 常见问题与解决方案
- scripts/analyze_queries.py - 分析仓库实现中的Cypher查询
- scripts/generate_repository.py - 生成包含领域Protocol与实现的仓库模式文件
- scripts/validate_repository_patterns.py - 验证整个代码库中的仓库模式合规性
Purpose
目的
Create repositories following Clean Architecture principles with Protocol (domain layer) and Implementation (infrastructure layer) separation. Ensures proper dependency inversion, ServiceResult return types, and resource lifecycle management.
按照整洁架构原则创建仓库,分离Protocol(领域层)与实现(基础设施层)。确保正确的依赖倒置、ServiceResult返回类型和资源生命周期管理。
When to Use
使用场景
Use this skill when:
- Adding new data access layer - Creating persistence for domain models
- Creating database interaction - Implementing queries and commands against data stores
- Implementing persistence - Storing and retrieving domain entities
- Need to store/retrieve domain models - Data access abstraction required
Trigger phrases:
- "Create a repository for X"
- "Implement data access for Y"
- "Add persistence layer for Z"
- "Store/retrieve domain model X"
在以下场景使用本技能:
- 添加新的数据访问层 - 为领域模型创建持久化机制
- 创建数据库交互 - 实现针对数据存储的查询与命令
- 实现持久化 - 存储与检索领域实体
- 需要存储/检索领域模型 - 需抽象数据访问逻辑
触发短语:
- "为X创建仓库"
- "为Y实现数据访问"
- "为Z添加持久化层"
- "存储/检索领域模型X"
Quick Start
快速开始
User: "Create a repository for storing search history"
What happens:
- Create Protocol interface in
domain/repositories/search_history_repository.py - Create Neo4j implementation in
infrastructure/neo4j/search_history_repository.py - Implement ManagedResource for lifecycle
- Use ServiceResult for all operations
- Add required Cypher queries
Result: ✅ Repository with Protocol + Implementation ready for dependency injection
用户请求: "创建一个用于存储搜索历史的仓库"
执行流程:
- 在中创建Protocol接口
domain/repositories/search_history_repository.py - 在中创建Neo4j实现
infrastructure/neo4j/search_history_repository.py - 实现ManagedResource用于生命周期管理
- 所有操作使用ServiceResult
- 添加所需的Cypher查询语句
结果: ✅ 具备Protocol + 实现的仓库,可直接用于依赖注入
Instructions
操作步骤
Step 1: Create Domain Protocol (Interface)
步骤1:创建领域Protocol(接口)
Location:
src/project_watch_mcp/domain/repositories/{name}_repository.pyPattern:
python
from abc import ABC, abstractmethod
from project_watch_mcp.domain.common import ServiceResult
class {Name}Repository(ABC):
"""Port for {purpose} storage and retrieval.
This interface defines the contract for {operations}.
Concrete implementations will be provided in the infrastructure layer.
"""
@abstractmethod
async def {operation}(self, param: Type) -> ServiceResult[ReturnType]:
"""Brief description of operation.
Args:
param: Description
Returns:
ServiceResult[ReturnType]: Success with data or Failure on errors
"""
passKey Requirements:
- Inherit from
ABC - Use decorator
@abstractmethod - Return for all operations
ServiceResult[T] - Document expected behavior in docstrings
- No implementation details (pure interface)
位置:
src/project_watch_mcp/domain/repositories/{name}_repository.py模式:
python
from abc import ABC, abstractmethod
from project_watch_mcp.domain.common import ServiceResult
class {Name}Repository(ABC):
"""用于{purpose}存储与检索的端口。
此接口定义了{operations}的契约。
具体实现将在基础设施层提供。
"""
@abstractmethod
async def {operation}(self, param: Type) -> ServiceResult[ReturnType]:
"""操作的简要描述。
参数:
param: 参数描述
返回:
ServiceResult[ReturnType]: 成功时返回数据,失败时返回错误信息
"""
pass关键要求:
- 继承自
ABC - 使用装饰器
@abstractmethod - 所有操作返回
ServiceResult[T] - 在文档字符串中说明预期行为
- 不包含任何实现细节(纯接口)
Step 2: Create Infrastructure Implementation
步骤2:创建基础设施层实现
Location:
src/project_watch_mcp/infrastructure/neo4j/{name}_repository.pyPattern:
python
from neo4j import AsyncDriver, RoutingControl
from project_watch_mcp.config.settings import Settings
from project_watch_mcp.domain.common import ServiceResult
from project_watch_mcp.domain.repositories.{name}_repository import {Name}Repository
from project_watch_mcp.domain.services.resource_manager import ManagedResource
class Neo4j{Name}Repository({Name}Repository, ManagedResource):
"""Neo4j adapter implementing {Name}Repository interface."""
def __init__(self, driver: AsyncDriver, settings: Settings):
if not driver:
raise ValueError("Neo4j driver is required")
if not settings:
raise ValueError("Settings is required")
self.driver = driver
self.settings = settings
self.database = settings.neo4j.database_name
async def _execute_with_retry(
self,
query: str,
parameters: dict[str, Any] | None = None,
routing: RoutingControl = RoutingControl.WRITE,
) -> ServiceResult[list[dict]]:
"""Execute query with parameter validation and retry logic."""
# Validate before executing
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"Validation failed: {validation_result.error}")
validated_query = validation_result.data
try:
records, _, _ = await self.driver.execute_query(
cast(LiteralString, validated_query.query),
validated_query.parameters,
database_=self.database,
routing_=routing,
)
return ServiceResult.ok([dict(record) for record in records])
except Neo4jError as e:
return ServiceResult.fail(f"Database error: {str(e)}")
async def close(self) -> None:
"""Close and cleanup resources (ManagedResource protocol)."""
# Repository-specific cleanup if needed
passKey Requirements:
- Implement Protocol interface
- Inherit from
ManagedResource - Required constructor params:
driver: AsyncDriver, settings: Settings - Validate parameters in constructor ()
if not driver: raise ValueError - Use for all database operations
_execute_with_retry() - Implement for resource cleanup
close() - All operations return
ServiceResult[T]
位置:
src/project_watch_mcp/infrastructure/neo4j/{name}_repository.py模式:
python
from neo4j import AsyncDriver, RoutingControl
from project_watch_mcp.config.settings import Settings
from project_watch_mcp.domain.common import ServiceResult
from project_watch_mcp.domain.repositories.{name}_repository import {Name}Repository
from project_watch_mcp.domain.services.resource_manager import ManagedResource
class Neo4j{Name}Repository({Name}Repository, ManagedResource):
"""实现{Name}Repository接口的Neo4j适配器。"""
def __init__(self, driver: AsyncDriver, settings: Settings):
if not driver:
raise ValueError("Neo4j driver是必填项")
if not settings:
raise ValueError("Settings是必填项")
self.driver = driver
self.settings = settings
self.database = settings.neo4j.database_name
async def _execute_with_retry(
self,
query: str,
parameters: dict[str, Any] | None = None,
routing: RoutingControl = RoutingControl.WRITE,
) -> ServiceResult[list[dict]]:
"""带参数验证与重试逻辑的查询执行方法。"""
# 执行前验证
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"验证失败: {validation_result.error}")
validated_query = validation_result.data
try:
records, _, _ = await self.driver.execute_query(
cast(LiteralString, validated_query.query),
validated_query.parameters,
database_=self.database,
routing_=routing,
)
return ServiceResult.ok([dict(record) for record in records])
except Neo4jError as e:
return ServiceResult.fail(f"数据库错误: {str(e)}")
async def close(self) -> None:
"""关闭并清理资源(遵循ManagedResource协议)。"""
# 如有需要,添加仓库特定的清理逻辑
pass关键要求:
- 实现Protocol接口
- 继承自
ManagedResource - 构造函数必填参数:
driver: AsyncDriver, settings: Settings - 在构造函数中验证参数()
if not driver: raise ValueError - 所有数据库操作使用
_execute_with_retry() - 实现用于资源清理
close() - 所有操作返回
ServiceResult[T]
Step 3: Add Cypher Queries
步骤3:添加Cypher查询语句
Location:
src/project_watch_mcp/infrastructure/neo4j/queries.pyPattern:
python
class CypherQueries:
# Existing queries...
# {Name}Repository Queries
GET_{ENTITY} = """
MATCH (e:{Label} {project_name: $project_name, id: $id})
RETURN e
"""
SAVE_{ENTITY} = """
MERGE (e:{Label} {project_name: $project_name, id: $id})
SET e += $properties
SET e.updated_at = datetime()
RETURN e
"""Key Requirements:
- Group queries by repository
- Use parameterized queries (prevent injection)
- Use MERGE for upsert operations
- Include timestamp management
- Document query purpose
See: references/query-patterns.md for common patterns
位置:
src/project_watch_mcp/infrastructure/neo4j/queries.py模式:
python
class CypherQueries:
# 已有的查询语句...
# {Name}Repository 查询语句
GET_{ENTITY} = """
MATCH (e:{Label} {project_name: $project_name, id: $id})
RETURN e
"""
SAVE_{ENTITY} = """
MERGE (e:{Label} {project_name: $project_name, id: $id})
SET e += $properties
SET e.updated_at = datetime()
RETURN e
"""关键要求:
- 按仓库分组查询语句
- 使用参数化查询(防止注入攻击)
- 对于插入更新操作使用MERGE
- 包含时间戳管理
- 说明查询语句的用途
参考: references/query-patterns.md 中的通用模式
Step 4: Register in Container
步骤4:在容器中注册
Location:
src/project_watch_mcp/infrastructure/container.pyPattern:
python
async def {name}_repository(self) -> {Name}Repository:
"""Provide {Name}Repository implementation."""
driver = await self.neo4j_driver()
settings = await self.settings()
return Neo4j{Name}Repository(driver, settings)Key Requirements:
- Return type is Protocol (not implementation)
- Inject dependencies (driver, settings)
- Use async/await for resource initialization
- Follow naming convention:
{name}_repository()
位置:
src/project_watch_mcp/infrastructure/container.py模式:
python
async def {name}_repository(self) -> {Name}Repository:
"""提供{Name}Repository的实现。"""
driver = await self.neo4j_driver()
settings = await self.settings()
return Neo4j{Name}Repository(driver, settings)关键要求:
- 返回类型为Protocol(而非具体实现)
- 注入依赖项(driver, settings)
- 使用async/await进行资源初始化
- 遵循命名规范:
{name}_repository()
Step 5: Create Tests
步骤5:创建测试用例
Unit Tests:
Integration Tests:
tests/unit/infrastructure/neo4j/test_{name}_repository.pytests/integration/infrastructure/neo4j/test_{name}_repository.pyPattern:
python
@pytest.mark.asyncio
async def test_save_{entity}_success(mock_driver, settings):
"""Test successful {entity} save operation."""
# Arrange
repo = Neo4j{Name}Repository(mock_driver, settings)
mock_driver.execute_query.return_value = (
[{"e": {"id": "test", "name": "Test"}}],
None,
None,
)
# Act
result = await repo.save_{entity}(entity_data)
# Assert
assert result.is_success
assert result.data is not NoneKey Requirements:
- Test both success and failure cases
- Mock driver.execute_query for unit tests
- Test parameter validation
- Test ServiceResult.ok() and ServiceResult.fail() paths
- Integration tests use real Neo4j instance
单元测试:
集成测试:
tests/unit/infrastructure/neo4j/test_{name}_repository.pytests/integration/infrastructure/neo4j/test_{name}_repository.py模式:
python
@pytest.mark.asyncio
async def test_save_{entity}_success(mock_driver, settings):
"""测试{entity}保存操作的成功场景。"""
# 准备
repo = Neo4j{Name}Repository(mock_driver, settings)
mock_driver.execute_query.return_value = (
[{"e": {"id": "test", "name": "Test"}}],
None,
None,
)
# 执行
result = await repo.save_{entity}(entity_data)
# 断言
assert result.is_success
assert result.data is not None关键要求:
- 测试成功与失败两种场景
- 单元测试中mock driver.execute_query
- 测试参数验证逻辑
- 测试ServiceResult.ok()和ServiceResult.fail()分支
- 集成测试使用真实的Neo4j实例
Examples
示例
Example 1: Simple CRUD Repository
示例1:简单CRUD仓库
Protocol:
python
class SearchHistoryRepository(ABC):
@abstractmethod
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
pass
@abstractmethod
async def get_recent_queries(self, user_id: str, limit: int) -> ServiceResult[list[str]]:
passImplementation:
python
class Neo4jSearchHistoryRepository(SearchHistoryRepository, ManagedResource):
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
cypher = """
CREATE (q:SearchQuery {query: $query, user_id: $user_id, timestamp: datetime()})
"""
result = await self._execute_with_retry(cypher, {"query": query, "user_id": user_id})
return ServiceResult.ok(None) if result.is_success else resultProtocol:
python
class SearchHistoryRepository(ABC):
@abstractmethod
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
pass
@abstractmethod
async def get_recent_queries(self, user_id: str, limit: int) -> ServiceResult[list[str]]:
pass实现:
python
class Neo4jSearchHistoryRepository(SearchHistoryRepository, ManagedResource):
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
cypher = """
CREATE (q:SearchQuery {query: $query, user_id: $user_id, timestamp: datetime()})
"""
result = await self._execute_with_retry(cypher, {"query": query, "user_id": user_id})
return ServiceResult.ok(None) if result.is_success else resultExample 2: Repository with Complex Domain Model
示例2:包含复杂领域模型的仓库
For advanced patterns, see references/pattern-guide.md:
- Converting Neo4j records to domain models
- Handling nested relationships
- Batch operations
- Transaction management
进阶模式请参考:references/pattern-guide.md
- 将Neo4j记录转换为领域模型
- 处理嵌套关系
- 批量操作
- 事务管理
Example 3: Repository with Pagination
示例3:带分页的仓库
For pagination patterns, see references/pattern-guide.md:
- Cursor-based pagination
- Offset-based pagination
- Performance considerations
分页模式请参考:references/pattern-guide.md
- 基于游标分页
- 基于偏移量分页
- 性能考量
Requirements
依赖要求
Dependencies:
- - Async driver
neo4j>=5.0.0 - - ServiceResult
project_watch_mcp.domain.common - - ManagedResource
project_watch_mcp.domain.services.resource_manager - - Settings injection
project_watch_mcp.config.settings
Project Structure:
src/project_watch_mcp/
├── domain/
│ └── repositories/
│ └── {name}_repository.py # Protocol (ABC)
├── infrastructure/
│ └── neo4j/
│ ├── {name}_repository.py # Implementation
│ └── queries.py # Cypher queries
└── tests/
├── unit/infrastructure/neo4j/
│ └── test_{name}_repository.py
└── integration/infrastructure/neo4j/
└── test_{name}_repository.py依赖项:
- - 异步驱动
neo4j>=5.0.0 - - ServiceResult
project_watch_mcp.domain.common - - ManagedResource
project_watch_mcp.domain.services.resource_manager - - Settings注入
project_watch_mcp.config.settings
项目结构:
src/project_watch_mcp/
├── domain/
│ └── repositories/
│ └── {name}_repository.py # Protocol (ABC)
├── infrastructure/
│ └── neo4j/
│ ├── {name}_repository.py # 实现
│ └── queries.py # Cypher查询语句
└── tests/
├── unit/infrastructure/neo4j/
│ └── test_{name}_repository.py
└── integration/infrastructure/neo4j/
└── test_{name}_repository.pyCommon Patterns
通用模式
Pattern 1: Query Parameter Validation
模式1:查询参数验证
Always validate query parameters before execution:
python
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"Validation failed: {validation_result.error}")执行前务必验证查询参数:
python
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"验证失败: {validation_result.error}")Pattern 2: ServiceResult Propagation
模式2:ServiceResult传播
Chain ServiceResult operations:
python
result = await self._execute_with_retry(query, params)
if result.is_failure:
return ServiceResult.fail(f"Failed to save: {result.error}")链式调用ServiceResult操作:
python
result = await self._execute_with_retry(query, params)
if result.is_failure:
return ServiceResult.fail(f"保存失败: {result.error}")Transform data and return success
转换数据并返回成功结果
return ServiceResult.ok(transformed_data)
undefinedreturn ServiceResult.ok(transformed_data)
undefinedPattern 3: Resource Lifecycle
模式3:资源生命周期
Implement ManagedResource for proper cleanup:
python
async def close(self) -> None:
"""Cleanup repository-specific resources."""
# Driver is managed externally by container
# Only cleanup repository-specific resources here
logger.debug(f"{self.__class__.__name__} cleanup complete")See: references/pattern-guide.md for complete pattern catalog
实现ManagedResource以保证正确的清理:
python
async def close(self) -> None:
"""清理仓库特定资源。"""
# Driver由容器外部管理
# 仅在此处清理仓库特定的资源
logger.debug(f"{self.__class__.__name__} 清理完成")参考: references/pattern-guide.md 中的完整模式目录
Red Flags - STOP
警示信号 - 立即停止
If you see any of these, investigate immediately:
- ❌ Protocol in infrastructure layer → Must be in domain
- ❌ Return on error → Use
NoneServiceResult.fail() - ❌ Optional parameter → Must be required
settings - ❌ Direct driver usage → Use
_execute_with_retry() - ❌ Missing parameter validation → Validate in constructor
- ❌ Raw Cypher in methods → Use class
CypherQueries - ❌ Synchronous methods → All methods must be
async - ❌ Missing → Required for lifecycle
ManagedResource - ❌ Return domain models from Neo4j layer → Convert in repository
- ❌ Missing tests → Must have unit + integration tests
如果出现以下情况,请立即排查:
- ❌ Protocol位于基础设施层 → 必须放在领域层
- ❌ 错误时返回→ 应使用
NoneServiceResult.fail() - ❌ 为可选参数 → 必须设为必填
settings - ❌ 直接使用driver → 应使用
_execute_with_retry() - ❌ 缺少参数验证 → 需在构造函数中验证
- ❌ 方法中使用原始Cypher语句 → 应使用类
CypherQueries - ❌ 同步方法 → 所有方法必须为
async - ❌ 未实现→ 生命周期管理是必填项
ManagedResource - ❌ Neo4j层返回领域模型 → 应在仓库中转换
- ❌ 缺少测试用例 → 必须包含单元 + 集成测试
Success Checklist
完成检查清单
Before marking repository complete:
- Protocol exists in
domain/repositories/ - Implementation exists in
infrastructure/neo4j/ - Constructor validates and
driversettings - All methods return
ServiceResult[T] - Implements with
ManagedResourceclose() - Uses for all operations
_execute_with_retry() - Queries defined in
CypherQueries - Registered in container
- Unit tests passing (mocked driver)
- Integration tests passing (real Neo4j)
- Quality gates pass ()
./scripts/check_all.sh - Documentation updated (if new pattern)
标记仓库完成前,请确认以下项:
- Protocol存在于目录
domain/repositories/ - 实现存在于目录
infrastructure/neo4j/ - 构造函数验证了和
driversettings - 所有方法返回
ServiceResult[T] - 实现了并包含
ManagedResource方法close() - 所有操作使用
_execute_with_retry() - 查询语句定义在中
CypherQueries - 已在容器中注册
- 单元测试通过(使用mock driver)
- 集成测试通过(使用真实Neo4j)
- 质量门禁通过()
./scripts/check_all.sh - 文档已更新(若涉及新模式)
See Also
相关链接
- templates/protocol-template.py - Repository protocol skeleton
- templates/implementation-template.py - Neo4j implementation skeleton
- templates/test-template.py - Test suite skeleton
- references/pattern-guide.md - Complete pattern catalog
- references/troubleshooting.md - Common issues and solutions
Related Skills:
- - For container registration
implement-dependency-injection - - For architecture compliance
validate-layer-boundaries - - For validation before commit
run-quality-gates
Last Updated: 2025-10-18
- templates/protocol-template.py - 仓库Protocol骨架
- templates/implementation-template.py - Neo4j实现骨架
- templates/test-template.py - 测试套件骨架
- references/pattern-guide.md - 完整模式目录
- references/troubleshooting.md - 常见问题与解决方案
相关技能:
- - 用于容器注册
implement-dependency-injection - - 用于架构合规性检查
validate-layer-boundaries - - 用于提交前的验证
run-quality-gates
最后更新: 2025-10-18