frappe-service

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Frappe Service Layer Design

Frappe服务层设计

Create well-structured service layer classes that encapsulate business logic, coordinate between repositories, and provide clean interfaces for controllers and APIs.
创建结构清晰的服务层类,封装业务逻辑,协调各仓库(repository)之间的交互,并为控制器和API提供简洁的接口。

When to Use

适用场景

  • Implementing complex business logic
  • Coordinating operations across multiple DocTypes
  • Creating reusable business operations
  • Separating concerns between controllers and data access
  • Building transaction-aware operations
  • 实现复杂业务逻辑
  • 协调多个DocType之间的操作
  • 创建可复用的业务操作
  • 分离控制器与数据访问的关注点
  • 构建支持事务的操作

Arguments

参数说明

/frappe-service <service_name> [--doctype <doctype>] [--operations <op1,op2>]
Examples:
/frappe-service OrderProcessing --doctype "Sales Order"
/frappe-service InventoryManagement --operations allocate,release,transfer
/frappe-service PaymentGateway
/frappe-service <service_name> [--doctype <doctype>] [--operations <op1,op2>]
示例:
/frappe-service OrderProcessing --doctype "Sales Order"
/frappe-service InventoryManagement --operations allocate,release,transfer
/frappe-service PaymentGateway

Procedure

实施步骤

Step 1: Gather Service Requirements

步骤1:收集服务需求

Ask the user for:
  1. Service Name (PascalCase, e.g.,
    OrderProcessingService
    )
  2. Primary DocType (if applicable)
  3. Key Operations to implement
  4. External Integrations (APIs, payment gateways, etc.)
  5. Transaction Requirements (atomic operations, rollback needs)
向用户确认以下信息:
  1. 服务名称(采用大驼峰命名,例如
    OrderProcessingService
  2. 主DocType(如适用)
  3. 需实现的核心操作
  4. 外部集成(API、支付网关等)
  5. 事务需求(原子操作、回滚要求)

Step 2: Design Service Architecture

步骤2:设计服务架构

Determine the service pattern:
PatternUse CaseExample
CRUD ServiceBasic DocType operations
CustomerService
Workflow ServiceState transitions, approvals
ApprovalService
Integration ServiceExternal API calls
PaymentGatewayService
Orchestration ServiceMulti-DocType coordination
OrderFulfillmentService
Batch ServiceBulk operations
BulkImportService
确定服务模式:
模式适用场景示例
CRUD服务基础DocType操作
CustomerService
工作流服务状态转换、审批流程
ApprovalService
集成服务外部API调用
PaymentGatewayService
编排服务多DocType协调操作
OrderFulfillmentService
批量服务批量操作
BulkImportService

Step 3: Generate Service Class

步骤3:生成服务类

Create
<app>/<module>/services/<service_name>.py
:
python
"""
<Service Name> Service

<Detailed description of what this service handles>

Responsibilities:
    - <Responsibility 1>
    - <Responsibility 2>
    - <Responsibility 3>

Usage:
    from <app>.<module>.services.<service_name> import <ServiceName>Service

    service = <ServiceName>Service()
    result = service.process_order(order_data)
"""

import frappe
from frappe import _
from frappe.utils import now, today, flt, cint, cstr
from typing import TYPE_CHECKING, Optional, Any, Callable
from contextlib import contextmanager
from functools import wraps

from <app>.<module>.services.base import BaseService
from <app>.<module>.repositories.<doctype>_repository import <DocType>Repository

if TYPE_CHECKING:
    from frappe.model.document import Document
创建文件
<app>/<module>/services/<service_name>.py
python
"""
<Service Name> Service

<该服务处理内容的详细描述>

职责:
    - <职责1>
    - <职责2>
    - <职责3>

使用方式:
    from <app>.<module>.services.<service_name> import <ServiceName>Service

    service = <ServiceName>Service()
    result = service.process_order(order_data)
"""

import frappe
from frappe import _
from frappe.utils import now, today, flt, cint, cstr
from typing import TYPE_CHECKING, Optional, Any, Callable
from contextlib import contextmanager
from functools import wraps

from <app>.<module>.services.base import BaseService
from <app>.<module>.repositories.<doctype>_repository import <DocType>Repository

if TYPE_CHECKING:
    from frappe.model.document import Document

──────────────────────────────────────────────────────────────────────────────

──────────────────────────────────────────────────────────────────────────────

Decorators

装饰器

──────────────────────────────────────────────────────────────────────────────

──────────────────────────────────────────────────────────────────────────────

def require_permission(doctype: str, ptype: str = "read"): """Decorator to check permission before method execution.""" def decorator(func: Callable): @wraps(func) def wrapper(self, *args, **kwargs): if not frappe.has_permission(doctype, ptype): frappe.throw( _("Permission denied: {0} {1}").format(ptype, doctype), frappe.PermissionError ) return func(self, *args, **kwargs) return wrapper return decorator
def with_transaction(func: Callable): """Decorator to wrap method in database transaction.""" @wraps(func) def wrapper(self, *args, **kwargs): try: result = func(self, *args, **kwargs) frappe.db.commit() return result except Exception: frappe.db.rollback() raise return wrapper
def log_operation(operation_name: str): """Decorator to log service operation.""" def decorator(func: Callable): @wraps(func) def wrapper(self, *args, **kwargs): frappe.logger().info(f"[{operation_name}] Starting...") try: result = func(self, *args, **kwargs) frappe.logger().info(f"[{operation_name}] Completed successfully") return result except Exception as e: frappe.logger().error(f"[{operation_name}] Failed: {str(e)}") raise return wrapper return decorator
def require_permission(doctype: str, ptype: str = "read"): """用于在方法执行前检查权限的装饰器。""" def decorator(func: Callable): @wraps(func) def wrapper(self, *args, **kwargs): if not frappe.has_permission(doctype, ptype): frappe.throw( _("权限不足:{0} {1}").format(ptype, doctype), frappe.PermissionError ) return func(self, *args, **kwargs) return wrapper return decorator
def with_transaction(func: Callable): """将方法包装在数据库事务中的装饰器。""" @wraps(func) def wrapper(self, *args, **kwargs): try: result = func(self, *args, **kwargs) frappe.db.commit() return result except Exception: frappe.db.rollback() raise return wrapper
def log_operation(operation_name: str): """记录服务操作的装饰器。""" def decorator(func: Callable): @wraps(func) def wrapper(self, *args, **kwargs): frappe.logger().info(f"[{operation_name}] 开始执行...") try: result = func(self, *args, **kwargs) frappe.logger().info(f"[{operation_name}] 执行成功") return result except Exception as e: frappe.logger().error(f"[{operation_name}] 执行失败:{str(e)}") raise return wrapper return decorator

──────────────────────────────────────────────────────────────────────────────

──────────────────────────────────────────────────────────────────────────────

Service Implementation

服务实现

──────────────────────────────────────────────────────────────────────────────

──────────────────────────────────────────────────────────────────────────────

class <ServiceName>Service(BaseService): """ Service for <description>.
This service handles:
    - <Operation 1>
    - <Operation 2>
    - <Operation 3>

Architecture:
    Controller/API → Service → Repository → Database

Example:
    service = <ServiceName>Service()
    order = service.create_order(customer="CUST-001", items=[...])
    service.submit_order(order.name)
"""

def __init__(self, user: Optional[str] = None):
    super().__init__(user)
    self.repo = <DocType>Repository()
    # Initialize other repositories as needed
    # self.item_repo = ItemRepository()
    # self.customer_repo = CustomerRepository()

# ──────────────────────────────────────────────────────────────────────────
# Public Operations (Business Logic)
# ──────────────────────────────────────────────────────────────────────────

@require_permission("<DocType>", "create")
@with_transaction
@log_operation("create_<doctype>")
def create(self, data: dict) -> dict:
    """
    Create a new <DocType>.

    Args:
        data: Document data containing:
            - title (str): Required title
            - date (str): Date in YYYY-MM-DD format
            - description (str): Optional description

    Returns:
        Created document summary

    Raises:
        frappe.ValidationError: If validation fails
        frappe.PermissionError: If user lacks permission

    Example:
        service.create({
            "title": "New Order",
            "date": "2024-01-15"
        })
    """
    # 1. Validate input
    self._validate_create_data(data)

    # 2. Apply business rules
    data = self._apply_defaults(data)
    data = self._apply_business_rules(data)

    # 3. Create via repository
    doc = self.repo.create(data)

    # 4. Post-creation actions
    self._on_create(doc)

    # 5. Return summary
    return doc.get_summary()

@require_permission("<DocType>", "write")
@with_transaction
def update(self, name: str, data: dict) -> dict:
    """
    Update existing <DocType>.

    Args:
        name: Document name
        data: Fields to update

    Returns:
        Updated document summary
    """
    doc = self.repo.get_or_throw(name, for_update=True)

    # Validate update is allowed
    self._validate_can_update(doc)

    # Apply update
    doc.update(data)
    doc.save()

    return doc.get_summary()

@require_permission("<DocType>", "submit")
@with_transaction
@log_operation("submit_<doctype>")
def submit(self, name: str) -> dict:
    """
    Submit document for processing.

    This triggers:
        1. Pre-submission validation
        2. Document submission
        3. Post-submission actions (e.g., stock updates, GL entries)

    Args:
        name: Document name

    Returns:
        Submitted document summary

    Raises:
        frappe.ValidationError: If submission requirements not met
    """
    doc = self.repo.get_or_throw(name, for_update=True)

    # Pre-submission checks
    self._validate_submission(doc)

    # Submit
    doc.submit()

    # Post-submission processing
    self._on_submit(doc)

    return doc.get_summary()

@require_permission("<DocType>", "cancel")
@with_transaction
@log_operation("cancel_<doctype>")
def cancel(self, name: str, reason: Optional[str] = None) -> dict:
    """
    Cancel submitted document.

    Args:
        name: Document name
        reason: Cancellation reason (recommended)

    Returns:
        Cancelled document summary
    """
    doc = self.repo.get_or_throw(name, for_update=True)

    # Validate cancellation
    self._validate_cancellation(doc)

    # Store reason
    if reason:
        doc.db_set("cancellation_reason", reason, update_modified=False)

    # Cancel
    doc.cancel()

    # Post-cancellation processing
    self._on_cancel(doc)

    return doc.get_summary()

# ──────────────────────────────────────────────────────────────────────────
# Complex Business Operations
# ──────────────────────────────────────────────────────────────────────────

@with_transaction
def process_workflow(
    self,
    name: str,
    action: str,
    comment: Optional[str] = None
) -> dict:
    """
    Process workflow action on document.

    Args:
        name: Document name
        action: Workflow action (e.g., "Approve", "Reject")
        comment: Optional comment for the action

    Returns:
        Updated document with new workflow state
    """
    doc = self.repo.get_or_throw(name, for_update=True)

    # Validate action is allowed
    allowed_actions = self._get_allowed_workflow_actions(doc)
    if action not in allowed_actions:
        frappe.throw(
            _("Action '{0}' not allowed. Allowed: {1}").format(
                action, ", ".join(allowed_actions)
            )
        )

    # Apply workflow action
    from frappe.model.workflow import apply_workflow
    apply_workflow(doc, action)

    # Add comment
    if comment:
        doc.add_comment("Workflow", f"{action}: {comment}")

    return doc.get_summary()

def calculate_totals(self, name: str) -> dict:
    """
    Calculate and update document totals.

    Args:
        name: Document name

    Returns:
        Calculated totals
    """
    doc = self.repo.get_or_throw(name)

    subtotal = sum(
        flt(item.qty) * flt(item.rate)
        for item in doc.get("items", [])
    )

    tax_amount = flt(subtotal) * flt(doc.tax_rate or 0) / 100
    grand_total = flt(subtotal) + flt(tax_amount)

    return {
        "subtotal": subtotal,
        "tax_amount": tax_amount,
        "grand_total": grand_total
    }

def bulk_operation(
    self,
    names: list[str],
    operation: str,
    **kwargs
) -> dict:
    """
    Perform bulk operation on multiple documents.

    Args:
        names: List of document names
        operation: Operation to perform (update_status, submit, cancel)
        **kwargs: Operation-specific arguments

    Returns:
        Results summary
    """
    results = {"success": [], "failed": []}

    for name in names:
        try:
            if operation == "update_status":
                self.update(name, {"status": kwargs.get("status")})
            elif operation == "submit":
                self.submit(name)
            elif operation == "cancel":
                self.cancel(name, kwargs.get("reason"))

            results["success"].append(name)
        except Exception as e:
            results["failed"].append({
                "name": name,
                "error": str(e)
            })

    return results

# ──────────────────────────────────────────────────────────────────────────
# Query Methods
# ──────────────────────────────────────────────────────────────────────────

def get_pending_items(self, limit: int = 50) -> list[dict]:
    """Get items pending action."""
    return self.repo.get_list(
        filters={"status": "Pending", "docstatus": 0},
        fields=["name", "title", "date", "owner", "creation"],
        order_by="creation asc",
        limit=limit
    )

def get_statistics(self, period: str = "month") -> dict:
    """
    Get statistics for dashboard.

    Args:
        period: Time period (day, week, month, year)

    Returns:
        Statistics dict
    """
    from frappe.utils import add_days, add_months, get_first_day

    today_date = today()

    if period == "day":
        from_date = today_date
    elif period == "week":
        from_date = add_days(today_date, -7)
    elif period == "month":
        from_date = get_first_day(today_date)
    else:  # year
        from_date = add_months(get_first_day(today_date), -12)

    return {
        "total": self.repo.get_count(),
        "period_total": self.repo.get_count(
            {"creation": [">=", from_date]}
        ),
        "by_status": self._get_counts_by_status(),
        "period": period,
        "from_date": from_date
    }

# ──────────────────────────────────────────────────────────────────────────
# Private Methods (Internal Logic)
# ──────────────────────────────────────────────────────────────────────────

def _validate_create_data(self, data: dict) -> None:
    """Validate data for document creation."""
    self.validate_mandatory(data, ["title"])

    # Custom validations
    if data.get("date") and data["date"] < today():
        frappe.throw(_("Date cannot be in the past"))

def _validate_can_update(self, doc: "Document") -> None:
    """Validate document can be updated."""
    if doc.docstatus == 2:
        frappe.throw(_("Cannot update cancelled document"))

    if doc.status == "Completed":
        frappe.throw(_("Cannot update completed document"))

def _validate_submission(self, doc: "Document") -> None:
    """Validate all requirements for submission."""
    if doc.docstatus != 0:
        frappe.throw(_("Document is not in draft state"))

    # Add more validations as needed
    # if not doc.get("items"):
    #     frappe.throw(_("Cannot submit without items"))

def _validate_cancellation(self, doc: "Document") -> None:
    """Validate document can be cancelled."""
    if doc.docstatus != 1:
        frappe.throw(_("Only submitted documents can be cancelled"))

    # Check for linked documents
    # linked = self._get_linked_submitted_docs(doc.name)
    # if linked:
    #     frappe.throw(_("Cannot cancel. Linked documents exist: {0}").format(linked))

def _apply_defaults(self, data: dict) -> dict:
    """Apply default values to data."""
    if not data.get("date"):
        data["date"] = today()

    if not data.get("status"):
        data["status"] = "Draft"

    return data

def _apply_business_rules(self, data: dict) -> dict:
    """Apply business rules to data."""
    # Example: Set posting date to today if not specified
    # Example: Calculate derived fields
    return data

def _on_create(self, doc: "Document") -> None:
    """Post-creation hook for additional processing."""
    # Send notification
    # frappe.publish_realtime("new_document", {"name": doc.name})
    pass

def _on_submit(self, doc: "Document") -> None:
    """Post-submission processing."""
    # Create linked records (GL entries, stock ledger, etc.)
    # Update inventory
    # Send notifications
    pass

def _on_cancel(self, doc: "Document") -> None:
    """Post-cancellation processing."""
    # Reverse linked records
    # Update inventory
    pass

def _get_allowed_workflow_actions(self, doc: "Document") -> list[str]:
    """Get allowed workflow actions for document."""
    from frappe.model.workflow import get_transitions
    return [t.action for t in get_transitions(doc)]

def _get_counts_by_status(self) -> dict:
    """Get document counts grouped by status."""
    result = frappe.db.sql("""
        SELECT status, COUNT(*) as count
        FROM `tab<DocType>`
        WHERE docstatus < 2
        GROUP BY status
    """, as_dict=True)

    return {row.status: row.count for row in result}
class <ServiceName>Service(BaseService): """ 用于<描述>的服务。
该服务处理以下内容:
    - <操作1>
    - <操作2>
    - <操作3>

架构:
    控制器/API → 服务 → 仓库 → 数据库

示例:
    service = <ServiceName>Service()
    order = service.create_order(customer="CUST-001", items=[...])
    service.submit_order(order.name)
"""

def __init__(self, user: Optional[str] = None):
    super().__init__(user)
    self.repo = <DocType>Repository()
    # 根据需要初始化其他仓库
    # self.item_repo = ItemRepository()
    # self.customer_repo = CustomerRepository()

# ──────────────────────────────────────────────────────────────────────────
# 公开操作(业务逻辑)
# ──────────────────────────────────────────────────────────────────────────

@require_permission("<DocType>", "create")
@with_transaction
@log_operation("create_<doctype>")
def create(self, data: dict) -> dict:
    """
    创建新的<DocType>记录。

    参数:
        data: 包含以下内容的文档数据:
            - title (str): 必填标题
            - date (str): 日期格式为YYYY-MM-DD
            - description (str): 可选描述

    返回:
        创建完成的文档摘要

    异常:
        frappe.ValidationError: 验证失败时抛出
        frappe.PermissionError: 用户权限不足时抛出

    示例:
        service.create({
            "title": "新订单",
            "date": "2024-01-15"
        })
    """
    # 1. 验证输入数据
    self._validate_create_data(data)

    # 2. 应用业务规则
    data = self._apply_defaults(data)
    data = self._apply_business_rules(data)

    # 3. 通过仓库创建记录
    doc = self.repo.create(data)

    # 4. 创建完成后的后续操作
    self._on_create(doc)

    # 5. 返回摘要
    return doc.get_summary()

@require_permission("<DocType>", "write")
@with_transaction
def update(self, name: str, data: dict) -> dict:
    """
    更新已有的<DocType>记录。

    参数:
        name: 文档名称
        data: 需要更新的字段

    返回:
        更新完成的文档摘要
    """
    doc = self.repo.get_or_throw(name, for_update=True)

    # 验证是否允许更新
    self._validate_can_update(doc)

    # 应用更新
    doc.update(data)
    doc.save()

    return doc.get_summary()

@require_permission("<DocType>", "submit")
@with_transaction
@log_operation("submit_<doctype>")
def submit(self, name: str) -> dict:
    """
    提交文档进行处理。

    此操作会触发:
        1. 提交前验证
        2. 文档提交
        3. 提交后操作(例如库存更新、总账分录)

    参数:
        name: 文档名称

    返回:
        提交完成的文档摘要

    异常:
        frappe.ValidationError: 提交条件不满足时抛出
    """
    doc = self.repo.get_or_throw(name, for_update=True)

    # 提交前检查
    self._validate_submission(doc)

    # 提交文档
    doc.submit()

    # 提交后的处理
    self._on_submit(doc)

    return doc.get_summary()

@require_permission("<DocType>", "cancel")
@with_transaction
@log_operation("cancel_<doctype>")
def cancel(self, name: str, reason: Optional[str] = None) -> dict:
    """
    取消已提交的文档。

    参数:
        name: 文档名称
        reason: 取消原因(建议填写)

    返回:
        取消完成的文档摘要
    """
    doc = self.repo.get_or_throw(name, for_update=True)

    # 验证是否允许取消
    self._validate_cancellation(doc)

    存储取消原因
    if reason:
        doc.db_set("cancellation_reason", reason, update_modified=False)

    # 取消文档
    doc.cancel()

    # 取消后的处理
    self._on_cancel(doc)

    return doc.get_summary()

# ──────────────────────────────────────────────────────────────────────────
# 复杂业务操作
# ──────────────────────────────────────────────────────────────────────────

@with_transaction
def process_workflow(
    self,
    name: str,
    action: str,
    comment: Optional[str] = None
) -> dict:
    """
    对文档执行工作流操作。

    参数:
        name: 文档名称
        action: 工作流操作(例如"Approve"、"Reject")
        comment: 操作的可选备注

    返回:
        更新后的文档,包含新的工作流状态
    """
    doc = self.repo.get_or_throw(name, for_update=True)

    # 验证操作是否被允许
    allowed_actions = self._get_allowed_workflow_actions(doc)
    if action not in allowed_actions:
        frappe.throw(
            _("操作'{0}'不被允许。允许的操作:{1}").format(
                action, ", ".join(allowed_actions)
            )
        )

    # 执行工作流操作
    from frappe.model.workflow import apply_workflow
    apply_workflow(doc, action)

    # 添加备注
    if comment:
        doc.add_comment("Workflow", f"{action}: {comment}")

    return doc.get_summary()

def calculate_totals(self, name: str) -> dict:
    """
    计算并更新文档的总计金额。

    参数:
        name: 文档名称

    返回:
        计算得到的总计金额
    """
    doc = self.repo.get_or_throw(name)

    subtotal = sum(
        flt(item.qty) * flt(item.rate)
        for item in doc.get("items", [])
    )

    tax_amount = flt(subtotal) * flt(doc.tax_rate or 0) / 100
    grand_total = flt(subtotal) + flt(tax_amount)

    return {
        "subtotal": subtotal,
        "tax_amount": tax_amount,
        "grand_total": grand_total
    }

def bulk_operation(
    self,
    names: list[str],
    operation: str,
    **kwargs
) -> dict:
    """
    对多个文档执行批量操作。

    参数:
        names: 文档名称列表
        operation: 要执行的操作(update_status、submit、cancel)
        **kwargs: 操作的特定参数

    返回:
        操作结果汇总
    """
    results = {"success": [], "failed": []}

    for name in names:
        try:
            if operation == "update_status":
                self.update(name, {"status": kwargs.get("status")})
            elif operation == "submit":
                self.submit(name)
            elif operation == "cancel":
                self.cancel(name, kwargs.get("reason"))

            results["success"].append(name)
        except Exception as e:
            results["failed"].append({
                "name": name,
                "error": str(e)
            })

    return results

# ──────────────────────────────────────────────────────────────────────────
# 查询方法
# ──────────────────────────────────────────────────────────────────────────

def get_pending_items(self, limit: int = 50) -> list[dict]:
    """获取待处理的条目。"""
    return self.repo.get_list(
        filters={"status": "Pending", "docstatus": 0},
        fields=["name", "title", "date", "owner", "creation"],
        order_by="creation asc",
        limit=limit
    )

def get_statistics(self, period: str = "month") -> dict:
    """
    获取用于仪表盘的统计数据。

    参数:
        period: 时间周期(day、week、month、year)

    返回:
        统计数据字典
    """
    from frappe.utils import add_days, add_months, get_first_day

    today_date = today()

    if period == "day":
        from_date = today_date
    elif period == "week":
        from_date = add_days(today_date, -7)
    elif period == "month":
        from_date = get_first_day(today_date)
    else:  # year
        from_date = add_months(get_first_day(today_date), -12)

    return {
        "total": self.repo.get_count(),
        "period_total": self.repo.get_count(
            {"creation": [">=", from_date]}
        ),
        "by_status": self._get_counts_by_status(),
        "period": period,
        "from_date": from_date
    }

# ──────────────────────────────────────────────────────────────────────────
# 私有方法(内部逻辑)
# ──────────────────────────────────────────────────────────────────────────

def _validate_create_data(self, data: dict) -> None:
    """验证文档创建的数据。"""
    self.validate_mandatory(data, ["title"])

    # 自定义验证规则
    if data.get("date") and data["date"] < today():
        frappe.throw(_("日期不能早于今天"))

def _validate_can_update(self, doc: "Document") -> None:
    """验证文档是否可以更新。"""
    if doc.docstatus == 2:
        frappe.throw(_("无法更新已取消的文档"))

    if doc.status == "Completed":
        frappe.throw(_("无法更新已完成的文档"))

def _validate_submission(self, doc: "Document") -> None:
    """验证文档提交的所有条件。"""
    if doc.docstatus != 0:
        frappe.throw(_("文档必须处于草稿状态才能提交"))

    # 根据需要添加更多验证规则
    # if not doc.get("items"):
    #     frappe.throw(_("无条目无法提交"))

def _validate_cancellation(self, doc: "Document") -> None:
    """验证文档是否可以取消。"""
    if doc.docstatus != 1:
        frappe.throw(_("只有已提交的文档才能被取消"))

    # 检查关联文档
    # linked = self._get_linked_submitted_docs(doc.name)
    # if linked:
    #     frappe.throw(_("无法取消,存在关联文档:{0}").format(linked))

def _apply_defaults(self, data: dict) -> dict:
    """为数据设置默认值。"""
    if not data.get("date"):
        data["date"] = today()

    if not data.get("status"):
        data["status"] = "Draft"

    return data

def _apply_business_rules(self, data: dict) -> dict:
    """为数据应用业务规则。"""
    # 示例:如果未指定过账日期,则设置为今天
    # 示例:计算派生字段
    return data

def _on_create(self, doc: "Document") -> None:
    """文档创建完成后的钩子方法,用于后续处理。"""
    # 发送通知
    # frappe.publish_realtime("new_document", {"name": doc.name})
    pass

def _on_submit(self, doc: "Document") -> None:
    """文档提交完成后的处理。"""
    # 创建关联记录(总账分录、库存台账等)
    # 更新库存
    # 发送通知
    pass

def _on_cancel(self, doc: "Document") -> None:
    """文档取消完成后的处理。"""
    # 撤销关联记录
    # 更新库存
    pass

def _get_allowed_workflow_actions(self, doc: "Document") -> list[str]:
    """获取文档允许的工作流操作。"""
    from frappe.model.workflow import get_transitions
    return [t.action for t in get_transitions(doc)]

def _get_counts_by_status(self) -> dict:
    """按状态分组获取文档数量。"""
    result = frappe.db.sql("""
        SELECT status, COUNT(*) as count
        FROM `tab<DocType>`
        WHERE docstatus < 2
        GROUP BY status
    """, as_dict=True)

    return {row.status: row.count for row in result}

──────────────────────────────────────────────────────────────────────────────

──────────────────────────────────────────────────────────────────────────────

Service Factory (for dependency injection)

服务工厂(用于依赖注入)

──────────────────────────────────────────────────────────────────────────────

──────────────────────────────────────────────────────────────────────────────

def get_<service_name>_service(user: Optional[str] = None) -> <ServiceName>Service: """ Factory function for <ServiceName>Service.
Use this instead of direct instantiation for easier testing/mocking.

Args:
    user: Optional user context

Returns:
    Service instance
"""
return <ServiceName>Service(user=user)
undefined
def get_<service_name>_service(user: Optional[str] = None) -> <ServiceName>Service: """ <ServiceName>Service的工厂函数。
为了便于测试和模拟,建议使用此函数而非直接实例化服务。

参数:
    user: 可选的用户上下文

返回:
    服务实例
"""
return <ServiceName>Service(user=user)
undefined

Step 4: Generate Integration Service Pattern (Optional)

步骤4:生成集成服务模式(可选)

For services that integrate with external APIs:
python
"""
External Integration Service

Handles communication with external APIs with retry logic,
error handling, and response normalization.
"""

import frappe
from frappe import _
from typing import Optional, Any
import requests
from tenacity import retry, stop_after_attempt, wait_exponential


class <Integration>Service:
    """
    Service for integrating with <External Service>.

    Configuration:
        - API Key: System Settings > <Integration> API Key
        - Base URL: System Settings > <Integration> Base URL
    """

    def __init__(self):
        self.api_key = frappe.db.get_single_value("System Settings", "<integration>_api_key")
        self.base_url = frappe.db.get_single_value("System Settings", "<integration>_base_url")

        if not self.api_key:
            frappe.throw(_("<Integration> API key not configured"))

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def _make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[dict] = None
    ) -> dict:
        """
        Make HTTP request with retry logic.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint
            data: Request payload

        Returns:
            Response data

        Raises:
            frappe.ValidationError: On API error
        """
        url = f"{self.base_url}/{endpoint}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.request(
                method=method,
                url=url,
                json=data,
                headers=headers,
                timeout=30
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.Timeout:
            frappe.throw(_("Request timed out. Please try again."))

        except requests.exceptions.HTTPError as e:
            error_msg = self._parse_error_response(e.response)
            frappe.throw(_("API Error: {0}").format(error_msg))

        except requests.exceptions.RequestException as e:
            frappe.throw(_("Connection error: {0}").format(str(e)))

    def _parse_error_response(self, response) -> str:
        """Parse error message from API response."""
        try:
            data = response.json()
            return data.get("message") or data.get("error") or response.text
        except Exception:
            return response.text

    # Public API methods
    def create_external_record(self, data: dict) -> dict:
        """Create record in external system."""
        return self._make_request("POST", "records", data)

    def get_external_record(self, external_id: str) -> dict:
        """Get record from external system."""
        return self._make_request("GET", f"records/{external_id}")

    def sync_records(self) -> dict:
        """Sync records with external system."""
        # Implementation
        pass
针对需要与外部API集成的服务:
python
"""
外部集成服务

处理与外部API的通信,包含重试逻辑、错误处理和响应标准化。
"""

import frappe
from frappe import _
from typing import Optional, Any
import requests
from tenacity import retry, stop_after_attempt, wait_exponential


class <Integration>Service:
    """
    用于与<外部服务>集成的服务。

    配置信息:
        - API密钥:系统设置 > <Integration> API密钥
        - 基础URL:系统设置 > <Integration> 基础URL
    """

    def __init__(self):
        self.api_key = frappe.db.get_single_value("System Settings", "<integration>_api_key")
        self.base_url = frappe.db.get_single_value("System Settings", "<integration>_base_url")

        if not self.api_key:
            frappe.throw(_("<Integration> API密钥未配置"))

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def _make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[dict] = None
    ) -> dict:
        """
        发送HTTP请求,包含重试逻辑。

        参数:
            method: HTTP方法(GET、POST、PUT、DELETE)
            endpoint: API端点
            data: 请求负载

        返回:
            响应数据

        异常:
            frappe.ValidationError: API调用出错时抛出
        """
        url = f"{self.base_url}/{endpoint}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.request(
                method=method,
                url=url,
                json=data,
                headers=headers,
                timeout=30
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.Timeout:
            frappe.throw(_("请求超时,请重试。"))

        except requests.exceptions.HTTPError as e:
            error_msg = self._parse_error_response(e.response)
            frappe.throw(_("API错误:{0}").format(error_msg))

        except requests.exceptions.RequestException as e:
            frappe.throw(_("连接错误:{0}").format(str(e)))

    def _parse_error_response(self, response) -> str:
        """解析API响应中的错误信息。"""
        try:
            data = response.json()
            return data.get("message") or data.get("error") or response.text
        except Exception:
            return response.text

    # 公开API方法
    def create_external_record(self, data: dict) -> dict:
        """在外部系统中创建记录。"""
        return self._make_request("POST", "records", data)

    def get_external_record(self, external_id: str) -> dict:
        """从外部系统获取记录。"""
        return self._make_request("GET", f"records/{external_id}")

    def sync_records(self) -> dict:
        """与外部系统同步记录。"""
        # 实现逻辑
        pass

Step 5: Show Service Design and Confirm

步骤5:展示服务设计并确认

undefined
undefined

Service Layer Preview

服务层预览

Service: <ServiceName>Service Module: <app>.<module>.services.<service_name>
服务: <ServiceName>Service 模块: <app>.<module>.services.<service_name>

Architecture:

架构:

┌─────────────────────┐
│   Controller/API    │
└──────────┬──────────┘
┌─────────────────────┐
│   <ServiceName>     │ ← Business Logic
│      Service        │
└──────────┬──────────┘
┌─────────────────────┐
│   <DocType>         │ ← Data Access
│   Repository        │
└──────────┬──────────┘
┌─────────────────────┐
│     Database        │
└─────────────────────┘
┌─────────────────────┐
│   控制器/API        │
└──────────┬──────────┘
┌─────────────────────┐
│   <ServiceName>     │ ← 业务逻辑
│      服务           │
└──────────┬──────────┘
┌─────────────────────┐
│   <DocType>         │ ← 数据访问
│   仓库              │
└──────────┬──────────┘
┌─────────────────────┐
│     数据库          │
└─────────────────────┘

Operations:

操作列表:

MethodPermissionDescription
create()createCreate new document
update()writeUpdate document
submit()submitSubmit for processing
cancel()cancelCancel document
process_workflow()writeExecute workflow action
get_statistics()readDashboard stats
方法权限描述
create()create创建新文档
update()write更新文档
submit()submit提交文档进行处理
cancel()cancel取消文档
process_workflow()write执行工作流操作
get_statistics()read获取仪表盘统计数据

Features:

特性:

  • ✅ Permission decorators
  • ✅ Transaction management
  • ✅ Operation logging
  • ✅ Validation layer
  • ✅ Business rules separation
  • ✅ Factory function for DI

Create this service?
undefined
  • ✅ 权限装饰器
  • ✅ 事务管理
  • ✅ 操作日志
  • ✅ 验证层
  • ✅ 业务规则分离
  • ✅ 用于依赖注入的工厂函数

是否创建此服务?
undefined

Step 6: Execute and Verify

步骤6:执行并验证

After approval, create service file and run tests.
获得用户确认后,创建服务文件并运行测试。

Output Format

输出格式

undefined
undefined

Service Created

服务已创建

Name: <ServiceName>Service Path: <app>/<module>/services/<service_name>.py
名称: <ServiceName>Service 路径: <app>/<module>/services/<service_name>.py

Features:

特性:

  • ✅ Base service inheritance
  • ✅ Repository integration
  • ✅ Permission checking
  • ✅ Transaction management
  • ✅ Business logic methods
  • ✅ Factory function
  • ✅ 继承基础服务
  • ✅ 集成仓库
  • ✅ 权限检查
  • ✅ 事务管理
  • ✅ 业务逻辑方法
  • ✅ 工厂函数

Usage:

使用方式:

python
from <app>.<module>.services.<service_name> import <ServiceName>Service

service = <ServiceName>Service()
python
from <app>.<module>.services.<service_name> import <ServiceName>Service

service = <ServiceName>Service()

Create

创建记录

result = service.create({"title": "New Record"})
result = service.create({"title": "新记录"})

Submit

提交记录

service.submit(result["name"])
service.submit(result["name"])

Get statistics

获取统计数据

stats = service.get_statistics(period="month")
undefined
stats = service.get_statistics(period="month")
undefined

Rules

规则

  1. Single Responsibility — Each service handles one domain/aggregate
  2. Use Repositories — Services call repositories for data access; repositories handle
    frappe.db
    /
    frappe.get_doc
  3. Transaction Awareness — Frappe auto-commits on success; use
    @with_transaction
    only for explicit rollback needs
  4. Permission Checks — Always check permissions at service boundary
  5. Validation First — Validate before any business logic
  6. Factory Pattern — Use factory function for easier testing/mocking
  7. ALWAYS Confirm — Never create files without explicit user approval
  1. 单一职责 — 每个服务仅处理一个领域/聚合
  2. 使用仓库 — 服务调用仓库进行数据访问;仓库处理
    frappe.db
    /
    frappe.get_doc
    操作
  3. 事务感知 — Frappe会在操作成功时自动提交;仅在需要显式回滚时使用
    @with_transaction
  4. 权限检查 — 始终在服务边界处检查权限
  5. 先验证 — 在执行任何业务逻辑前先进行验证
  6. 工厂模式 — 使用工厂函数便于测试和模拟
  7. 必须确认 — 未经用户明确确认,不得创建任何文件

Security Guidelines

安全指南

  1. SQL Injection Prevention — Use
    frappe.db.sql()
    with parameterized queries:
    python
    # CORRECT: Parameterized
    frappe.db.sql("SELECT name FROM tabUser WHERE email=%s", [email])
    
    # WRONG: String formatting (SQL injection risk)
    frappe.db.sql(f"SELECT name FROM tabUser WHERE email='{email}'")
  2. Avoid eval/exec — Never use
    eval()
    or
    exec()
    with user input. Use
    frappe.safe_eval()
    if code evaluation is absolutely required.
  3. Permission Bypass Awareness
    frappe.db.set_value()
    and
    frappe.get_all()
    bypass permissions. Use only for system operations, never for user-facing code.
  4. Input Sanitization — Validate and sanitize all user inputs. Use type annotations for automatic v15 validation.
  1. 防止SQL注入 — 使用带参数的
    frappe.db.sql()
    查询:
    python
    # 正确写法:参数化查询
    frappe.db.sql("SELECT name FROM tabUser WHERE email=%s", [email])
    
    # 错误写法:字符串拼接(存在SQL注入风险)
    frappe.db.sql(f"SELECT name FROM tabUser WHERE email='{email}'")
  2. 避免使用eval/exec — 切勿对用户输入使用
    eval()
    exec()
    。如果确实需要执行代码,请使用
    frappe.safe_eval()
  3. 注意权限绕过
    frappe.db.set_value()
    frappe.get_all()
    会绕过权限检查。仅在系统操作中使用,切勿用于面向用户的代码。
  4. 输入 sanitization — 验证并清理所有用户输入。使用类型注解实现v15的自动验证。