implement-repository-pattern

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese
Works with Python files in domain/repositories/ and infrastructure/ directories.
适用于domain/repositories/和infrastructure/目录下的Python文件。

Implement Repository Pattern

实现Repository模式

Table of Contents

目录

Core Sections

核心章节

Patterns & Best Practices

模式与最佳实践

Supporting Resources

支持资源

  • Requirements
    • Dependencies, project structure, and setup requirements
  • See Also
    • templates/protocol-template.py - Repository protocol skeleton
    • templates/implementation-template.py - Neo4j implementation skeleton
    • templates/test-template.py - Test suite skeleton
    • references/pattern-guide.md - Complete pattern catalog
    • references/troubleshooting.md - Common issues and solutions
    • scripts/analyze_queries.py - Analyze Cypher queries in repository implementations
    • scripts/generate_repository.py - Generate repository pattern files with domain protocol and implementation
    • scripts/validate_repository_patterns.py - Validate repository pattern compliance across the codebase
  • 依赖要求
    • 依赖项、项目结构和设置要求
  • 相关链接
    • templates/protocol-template.py - 仓库Protocol骨架
    • templates/implementation-template.py - Neo4j实现骨架
    • templates/test-template.py - 测试套件骨架
    • references/pattern-guide.md - 完整模式目录
    • references/troubleshooting.md - 常见问题与解决方案
    • scripts/analyze_queries.py - 分析仓库实现中的Cypher查询
    • scripts/generate_repository.py - 生成包含领域Protocol与实现的仓库模式文件
    • scripts/validate_repository_patterns.py - 验证整个代码库中的仓库模式合规性

Purpose

目的

Create repositories following Clean Architecture principles with Protocol (domain layer) and Implementation (infrastructure layer) separation. Ensures proper dependency inversion, ServiceResult return types, and resource lifecycle management.
按照整洁架构原则创建仓库,分离Protocol(领域层)与实现(基础设施层)。确保正确的依赖倒置、ServiceResult返回类型和资源生命周期管理。

When to Use

使用场景

Use this skill when:
  • Adding new data access layer - Creating persistence for domain models
  • Creating database interaction - Implementing queries and commands against data stores
  • Implementing persistence - Storing and retrieving domain entities
  • Need to store/retrieve domain models - Data access abstraction required
Trigger phrases:
  • "Create a repository for X"
  • "Implement data access for Y"
  • "Add persistence layer for Z"
  • "Store/retrieve domain model X"
在以下场景使用本技能:
  • 添加新的数据访问层 - 为领域模型创建持久化机制
  • 创建数据库交互 - 实现针对数据存储的查询与命令
  • 实现持久化 - 存储与检索领域实体
  • 需要存储/检索领域模型 - 需抽象数据访问逻辑
触发短语:
  • "为X创建仓库"
  • "为Y实现数据访问"
  • "为Z添加持久化层"
  • "存储/检索领域模型X"

Quick Start

快速开始

User: "Create a repository for storing search history"
What happens:
  1. Create Protocol interface in
    domain/repositories/search_history_repository.py
  2. Create Neo4j implementation in
    infrastructure/neo4j/search_history_repository.py
  3. Implement ManagedResource for lifecycle
  4. Use ServiceResult for all operations
  5. Add required Cypher queries
Result: ✅ Repository with Protocol + Implementation ready for dependency injection
用户请求: "创建一个用于存储搜索历史的仓库"
执行流程:
  1. domain/repositories/search_history_repository.py
    中创建Protocol接口
  2. infrastructure/neo4j/search_history_repository.py
    中创建Neo4j实现
  3. 实现ManagedResource用于生命周期管理
  4. 所有操作使用ServiceResult
  5. 添加所需的Cypher查询语句
结果: ✅ 具备Protocol + 实现的仓库,可直接用于依赖注入

Instructions

操作步骤

Step 1: Create Domain Protocol (Interface)

步骤1:创建领域Protocol(接口)

Location:
src/project_watch_mcp/domain/repositories/{name}_repository.py
Pattern:
python
from abc import ABC, abstractmethod
from project_watch_mcp.domain.common import ServiceResult

class {Name}Repository(ABC):
    """Port for {purpose} storage and retrieval.

    This interface defines the contract for {operations}.
    Concrete implementations will be provided in the infrastructure layer.
    """

    @abstractmethod
    async def {operation}(self, param: Type) -> ServiceResult[ReturnType]:
        """Brief description of operation.

        Args:
            param: Description

        Returns:
            ServiceResult[ReturnType]: Success with data or Failure on errors
        """
        pass
Key Requirements:
  • Inherit from
    ABC
  • Use
    @abstractmethod
    decorator
  • Return
    ServiceResult[T]
    for all operations
  • Document expected behavior in docstrings
  • No implementation details (pure interface)
位置:
src/project_watch_mcp/domain/repositories/{name}_repository.py
模式:
python
from abc import ABC, abstractmethod
from project_watch_mcp.domain.common import ServiceResult

class {Name}Repository(ABC):
    """用于{purpose}存储与检索的端口。

    此接口定义了{operations}的契约。
    具体实现将在基础设施层提供。
    """

    @abstractmethod
    async def {operation}(self, param: Type) -> ServiceResult[ReturnType]:
        """操作的简要描述。

        参数:
            param: 参数描述

        返回:
            ServiceResult[ReturnType]: 成功时返回数据,失败时返回错误信息
        """
        pass
关键要求:
  • 继承自
    ABC
  • 使用
    @abstractmethod
    装饰器
  • 所有操作返回
    ServiceResult[T]
  • 在文档字符串中说明预期行为
  • 不包含任何实现细节(纯接口)

Step 2: Create Infrastructure Implementation

步骤2:创建基础设施层实现

Location:
src/project_watch_mcp/infrastructure/neo4j/{name}_repository.py
Pattern:
python
from neo4j import AsyncDriver, RoutingControl
from project_watch_mcp.config.settings import Settings
from project_watch_mcp.domain.common import ServiceResult
from project_watch_mcp.domain.repositories.{name}_repository import {Name}Repository
from project_watch_mcp.domain.services.resource_manager import ManagedResource

class Neo4j{Name}Repository({Name}Repository, ManagedResource):
    """Neo4j adapter implementing {Name}Repository interface."""

    def __init__(self, driver: AsyncDriver, settings: Settings):
        if not driver:
            raise ValueError("Neo4j driver is required")
        if not settings:
            raise ValueError("Settings is required")

        self.driver = driver
        self.settings = settings
        self.database = settings.neo4j.database_name

    async def _execute_with_retry(
        self,
        query: str,
        parameters: dict[str, Any] | None = None,
        routing: RoutingControl = RoutingControl.WRITE,
    ) -> ServiceResult[list[dict]]:
        """Execute query with parameter validation and retry logic."""
        # Validate before executing
        validation_result = validate_and_build_query(query, parameters, strict=True)
        if validation_result.is_failure:
            return ServiceResult.fail(f"Validation failed: {validation_result.error}")

        validated_query = validation_result.data

        try:
            records, _, _ = await self.driver.execute_query(
                cast(LiteralString, validated_query.query),
                validated_query.parameters,
                database_=self.database,
                routing_=routing,
            )
            return ServiceResult.ok([dict(record) for record in records])
        except Neo4jError as e:
            return ServiceResult.fail(f"Database error: {str(e)}")

    async def close(self) -> None:
        """Close and cleanup resources (ManagedResource protocol)."""
        # Repository-specific cleanup if needed
        pass
Key Requirements:
  • Implement Protocol interface
  • Inherit from
    ManagedResource
  • Required constructor params:
    driver: AsyncDriver, settings: Settings
  • Validate parameters in constructor (
    if not driver: raise ValueError
    )
  • Use
    _execute_with_retry()
    for all database operations
  • Implement
    close()
    for resource cleanup
  • All operations return
    ServiceResult[T]
位置:
src/project_watch_mcp/infrastructure/neo4j/{name}_repository.py
模式:
python
from neo4j import AsyncDriver, RoutingControl
from project_watch_mcp.config.settings import Settings
from project_watch_mcp.domain.common import ServiceResult
from project_watch_mcp.domain.repositories.{name}_repository import {Name}Repository
from project_watch_mcp.domain.services.resource_manager import ManagedResource

class Neo4j{Name}Repository({Name}Repository, ManagedResource):
    """实现{Name}Repository接口的Neo4j适配器。"""

    def __init__(self, driver: AsyncDriver, settings: Settings):
        if not driver:
            raise ValueError("Neo4j driver是必填项")
        if not settings:
            raise ValueError("Settings是必填项")

        self.driver = driver
        self.settings = settings
        self.database = settings.neo4j.database_name

    async def _execute_with_retry(
        self,
        query: str,
        parameters: dict[str, Any] | None = None,
        routing: RoutingControl = RoutingControl.WRITE,
    ) -> ServiceResult[list[dict]]:
        """带参数验证与重试逻辑的查询执行方法。"""
        # 执行前验证
        validation_result = validate_and_build_query(query, parameters, strict=True)
        if validation_result.is_failure:
            return ServiceResult.fail(f"验证失败: {validation_result.error}")

        validated_query = validation_result.data

        try:
            records, _, _ = await self.driver.execute_query(
                cast(LiteralString, validated_query.query),
                validated_query.parameters,
                database_=self.database,
                routing_=routing,
            )
            return ServiceResult.ok([dict(record) for record in records])
        except Neo4jError as e:
            return ServiceResult.fail(f"数据库错误: {str(e)}")

    async def close(self) -> None:
        """关闭并清理资源(遵循ManagedResource协议)。"""
        # 如有需要,添加仓库特定的清理逻辑
        pass
关键要求:
  • 实现Protocol接口
  • 继承自
    ManagedResource
  • 构造函数必填参数:
    driver: AsyncDriver, settings: Settings
  • 在构造函数中验证参数(
    if not driver: raise ValueError
  • 所有数据库操作使用
    _execute_with_retry()
  • 实现
    close()
    用于资源清理
  • 所有操作返回
    ServiceResult[T]

Step 3: Add Cypher Queries

步骤3:添加Cypher查询语句

Location:
src/project_watch_mcp/infrastructure/neo4j/queries.py
Pattern:
python
class CypherQueries:
    # Existing queries...

    # {Name}Repository Queries
    GET_{ENTITY} = """
    MATCH (e:{Label} {project_name: $project_name, id: $id})
    RETURN e
    """

    SAVE_{ENTITY} = """
    MERGE (e:{Label} {project_name: $project_name, id: $id})
    SET e += $properties
    SET e.updated_at = datetime()
    RETURN e
    """
Key Requirements:
  • Group queries by repository
  • Use parameterized queries (prevent injection)
  • Use MERGE for upsert operations
  • Include timestamp management
  • Document query purpose
See: references/query-patterns.md for common patterns
位置:
src/project_watch_mcp/infrastructure/neo4j/queries.py
模式:
python
class CypherQueries:
    # 已有的查询语句...

    # {Name}Repository 查询语句
    GET_{ENTITY} = """
    MATCH (e:{Label} {project_name: $project_name, id: $id})
    RETURN e
    """

    SAVE_{ENTITY} = """
    MERGE (e:{Label} {project_name: $project_name, id: $id})
    SET e += $properties
    SET e.updated_at = datetime()
    RETURN e
    """
关键要求:
  • 按仓库分组查询语句
  • 使用参数化查询(防止注入攻击)
  • 对于插入更新操作使用MERGE
  • 包含时间戳管理
  • 说明查询语句的用途
参考: references/query-patterns.md 中的通用模式

Step 4: Register in Container

步骤4:在容器中注册

Location:
src/project_watch_mcp/infrastructure/container.py
Pattern:
python
async def {name}_repository(self) -> {Name}Repository:
    """Provide {Name}Repository implementation."""
    driver = await self.neo4j_driver()
    settings = await self.settings()
    return Neo4j{Name}Repository(driver, settings)
Key Requirements:
  • Return type is Protocol (not implementation)
  • Inject dependencies (driver, settings)
  • Use async/await for resource initialization
  • Follow naming convention:
    {name}_repository()
位置:
src/project_watch_mcp/infrastructure/container.py
模式:
python
async def {name}_repository(self) -> {Name}Repository:
    """提供{Name}Repository的实现。"""
    driver = await self.neo4j_driver()
    settings = await self.settings()
    return Neo4j{Name}Repository(driver, settings)
关键要求:
  • 返回类型为Protocol(而非具体实现)
  • 注入依赖项(driver, settings)
  • 使用async/await进行资源初始化
  • 遵循命名规范:
    {name}_repository()

Step 5: Create Tests

步骤5:创建测试用例

Unit Tests:
tests/unit/infrastructure/neo4j/test_{name}_repository.py
Integration Tests:
tests/integration/infrastructure/neo4j/test_{name}_repository.py
Pattern:
python
@pytest.mark.asyncio
async def test_save_{entity}_success(mock_driver, settings):
    """Test successful {entity} save operation."""
    # Arrange
    repo = Neo4j{Name}Repository(mock_driver, settings)
    mock_driver.execute_query.return_value = (
        [{"e": {"id": "test", "name": "Test"}}],
        None,
        None,
    )

    # Act
    result = await repo.save_{entity}(entity_data)

    # Assert
    assert result.is_success
    assert result.data is not None
Key Requirements:
  • Test both success and failure cases
  • Mock driver.execute_query for unit tests
  • Test parameter validation
  • Test ServiceResult.ok() and ServiceResult.fail() paths
  • Integration tests use real Neo4j instance
单元测试:
tests/unit/infrastructure/neo4j/test_{name}_repository.py
集成测试:
tests/integration/infrastructure/neo4j/test_{name}_repository.py
模式:
python
@pytest.mark.asyncio
async def test_save_{entity}_success(mock_driver, settings):
    """测试{entity}保存操作的成功场景。"""
    # 准备
    repo = Neo4j{Name}Repository(mock_driver, settings)
    mock_driver.execute_query.return_value = (
        [{"e": {"id": "test", "name": "Test"}}],
        None,
        None,
    )

    # 执行
    result = await repo.save_{entity}(entity_data)

    # 断言
    assert result.is_success
    assert result.data is not None
关键要求:
  • 测试成功与失败两种场景
  • 单元测试中mock driver.execute_query
  • 测试参数验证逻辑
  • 测试ServiceResult.ok()和ServiceResult.fail()分支
  • 集成测试使用真实的Neo4j实例

Examples

示例

Example 1: Simple CRUD Repository

示例1:简单CRUD仓库

Protocol:
python
class SearchHistoryRepository(ABC):
    @abstractmethod
    async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
        pass

    @abstractmethod
    async def get_recent_queries(self, user_id: str, limit: int) -> ServiceResult[list[str]]:
        pass
Implementation:
python
class Neo4jSearchHistoryRepository(SearchHistoryRepository, ManagedResource):
    async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
        cypher = """
        CREATE (q:SearchQuery {query: $query, user_id: $user_id, timestamp: datetime()})
        """
        result = await self._execute_with_retry(cypher, {"query": query, "user_id": user_id})
        return ServiceResult.ok(None) if result.is_success else result
Protocol:
python
class SearchHistoryRepository(ABC):
    @abstractmethod
    async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
        pass

    @abstractmethod
    async def get_recent_queries(self, user_id: str, limit: int) -> ServiceResult[list[str]]:
        pass
实现:
python
class Neo4jSearchHistoryRepository(SearchHistoryRepository, ManagedResource):
    async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
        cypher = """
        CREATE (q:SearchQuery {query: $query, user_id: $user_id, timestamp: datetime()})
        """
        result = await self._execute_with_retry(cypher, {"query": query, "user_id": user_id})
        return ServiceResult.ok(None) if result.is_success else result

Example 2: Repository with Complex Domain Model

示例2:包含复杂领域模型的仓库

For advanced patterns, see references/pattern-guide.md:
  • Converting Neo4j records to domain models
  • Handling nested relationships
  • Batch operations
  • Transaction management
进阶模式请参考:references/pattern-guide.md
  • 将Neo4j记录转换为领域模型
  • 处理嵌套关系
  • 批量操作
  • 事务管理

Example 3: Repository with Pagination

示例3:带分页的仓库

For pagination patterns, see references/pattern-guide.md:
  • Cursor-based pagination
  • Offset-based pagination
  • Performance considerations
分页模式请参考:references/pattern-guide.md
  • 基于游标分页
  • 基于偏移量分页
  • 性能考量

Requirements

依赖要求

Dependencies:
  • neo4j>=5.0.0
    - Async driver
  • project_watch_mcp.domain.common
    - ServiceResult
  • project_watch_mcp.domain.services.resource_manager
    - ManagedResource
  • project_watch_mcp.config.settings
    - Settings injection
Project Structure:
src/project_watch_mcp/
├── domain/
│   └── repositories/
│       └── {name}_repository.py     # Protocol (ABC)
├── infrastructure/
│   └── neo4j/
│       ├── {name}_repository.py     # Implementation
│       └── queries.py               # Cypher queries
└── tests/
    ├── unit/infrastructure/neo4j/
    │   └── test_{name}_repository.py
    └── integration/infrastructure/neo4j/
        └── test_{name}_repository.py
依赖项:
  • neo4j>=5.0.0
    - 异步驱动
  • project_watch_mcp.domain.common
    - ServiceResult
  • project_watch_mcp.domain.services.resource_manager
    - ManagedResource
  • project_watch_mcp.config.settings
    - Settings注入
项目结构:
src/project_watch_mcp/
├── domain/
│   └── repositories/
│       └── {name}_repository.py     # Protocol (ABC)
├── infrastructure/
│   └── neo4j/
│       ├── {name}_repository.py     # 实现
│       └── queries.py               # Cypher查询语句
└── tests/
    ├── unit/infrastructure/neo4j/
    │   └── test_{name}_repository.py
    └── integration/infrastructure/neo4j/
        └── test_{name}_repository.py

Common Patterns

通用模式

Pattern 1: Query Parameter Validation

模式1:查询参数验证

Always validate query parameters before execution:
python
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
    return ServiceResult.fail(f"Validation failed: {validation_result.error}")
执行前务必验证查询参数:
python
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
    return ServiceResult.fail(f"验证失败: {validation_result.error}")

Pattern 2: ServiceResult Propagation

模式2:ServiceResult传播

Chain ServiceResult operations:
python
result = await self._execute_with_retry(query, params)
if result.is_failure:
    return ServiceResult.fail(f"Failed to save: {result.error}")
链式调用ServiceResult操作:
python
result = await self._execute_with_retry(query, params)
if result.is_failure:
    return ServiceResult.fail(f"保存失败: {result.error}")

Transform data and return success

转换数据并返回成功结果

return ServiceResult.ok(transformed_data)
undefined
return ServiceResult.ok(transformed_data)
undefined

Pattern 3: Resource Lifecycle

模式3:资源生命周期

Implement ManagedResource for proper cleanup:
python
async def close(self) -> None:
    """Cleanup repository-specific resources."""
    # Driver is managed externally by container
    # Only cleanup repository-specific resources here
    logger.debug(f"{self.__class__.__name__} cleanup complete")
See: references/pattern-guide.md for complete pattern catalog
实现ManagedResource以保证正确的清理:
python
async def close(self) -> None:
    """清理仓库特定资源。"""
    # Driver由容器外部管理
    # 仅在此处清理仓库特定的资源
    logger.debug(f"{self.__class__.__name__} 清理完成")
参考: references/pattern-guide.md 中的完整模式目录

Red Flags - STOP

警示信号 - 立即停止

If you see any of these, investigate immediately:
  1. ❌ Protocol in infrastructure layer → Must be in domain
  2. ❌ Return
    None
    on error → Use
    ServiceResult.fail()
  3. ❌ Optional
    settings
    parameter → Must be required
  4. ❌ Direct driver usage → Use
    _execute_with_retry()
  5. ❌ Missing parameter validation → Validate in constructor
  6. ❌ Raw Cypher in methods → Use
    CypherQueries
    class
  7. ❌ Synchronous methods → All methods must be
    async
  8. ❌ Missing
    ManagedResource
    → Required for lifecycle
  9. ❌ Return domain models from Neo4j layer → Convert in repository
  10. ❌ Missing tests → Must have unit + integration tests
如果出现以下情况,请立即排查:
  1. ❌ Protocol位于基础设施层 → 必须放在领域层
  2. ❌ 错误时返回
    None
    → 应使用
    ServiceResult.fail()
  3. settings
    为可选参数 → 必须设为必填
  4. ❌ 直接使用driver → 应使用
    _execute_with_retry()
  5. ❌ 缺少参数验证 → 需在构造函数中验证
  6. ❌ 方法中使用原始Cypher语句 → 应使用
    CypherQueries
  7. ❌ 同步方法 → 所有方法必须为
    async
  8. ❌ 未实现
    ManagedResource
    → 生命周期管理是必填项
  9. ❌ Neo4j层返回领域模型 → 应在仓库中转换
  10. ❌ 缺少测试用例 → 必须包含单元 + 集成测试

Success Checklist

完成检查清单

Before marking repository complete:
  • Protocol exists in
    domain/repositories/
  • Implementation exists in
    infrastructure/neo4j/
  • Constructor validates
    driver
    and
    settings
  • All methods return
    ServiceResult[T]
  • Implements
    ManagedResource
    with
    close()
  • Uses
    _execute_with_retry()
    for all operations
  • Queries defined in
    CypherQueries
  • Registered in container
  • Unit tests passing (mocked driver)
  • Integration tests passing (real Neo4j)
  • Quality gates pass (
    ./scripts/check_all.sh
    )
  • Documentation updated (if new pattern)
标记仓库完成前,请确认以下项:
  • Protocol存在于
    domain/repositories/
    目录
  • 实现存在于
    infrastructure/neo4j/
    目录
  • 构造函数验证了
    driver
    settings
  • 所有方法返回
    ServiceResult[T]
  • 实现了
    ManagedResource
    并包含
    close()
    方法
  • 所有操作使用
    _execute_with_retry()
  • 查询语句定义在
    CypherQueries
  • 已在容器中注册
  • 单元测试通过(使用mock driver)
  • 集成测试通过(使用真实Neo4j)
  • 质量门禁通过(
    ./scripts/check_all.sh
  • 文档已更新(若涉及新模式)

See Also

相关链接

  • templates/protocol-template.py - Repository protocol skeleton
  • templates/implementation-template.py - Neo4j implementation skeleton
  • templates/test-template.py - Test suite skeleton
  • references/pattern-guide.md - Complete pattern catalog
  • references/troubleshooting.md - Common issues and solutions
Related Skills:
  • implement-dependency-injection
    - For container registration
  • validate-layer-boundaries
    - For architecture compliance
  • run-quality-gates
    - For validation before commit
Last Updated: 2025-10-18
  • templates/protocol-template.py - 仓库Protocol骨架
  • templates/implementation-template.py - Neo4j实现骨架
  • templates/test-template.py - 测试套件骨架
  • references/pattern-guide.md - 完整模式目录
  • references/troubleshooting.md - 常见问题与解决方案
相关技能:
  • implement-dependency-injection
    - 用于容器注册
  • validate-layer-boundaries
    - 用于架构合规性检查
  • run-quality-gates
    - 用于提交前的验证
最后更新: 2025-10-18