fastapi-clean-architecture
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseFastAPI Clean Architecture Skill
FastAPI整洁架构技能
Overview
概述
This skill guides you in building FastAPI applications following Clean Architecture principles, based on production patterns from enterprise financial systems. It emphasizes proper layer separation, dependency injection, repository patterns, and comprehensive testing strategies.
本技能将指导你遵循整洁架构(Clean Architecture)原则构建FastAPI应用,这些原则源自企业级金融系统的生产实践模式。它强调合理的分层、依赖注入、仓库模式以及全面的测试策略。
When to Use This Skill
适用场景
- Starting a new FastAPI project requiring clean architecture
- Refactoring existing FastAPI code for better maintainability
- Implementing domain-driven design with FastAPI
- Building testable, scalable backend services
- Integrating multiple data sources (PostgreSQL, Redis, external APIs)
- 启动需要采用整洁架构的全新FastAPI项目
- 重构现有FastAPI代码以提升可维护性
- 基于FastAPI实现领域驱动设计
- 构建可测试、可扩展的后端服务
- 集成多种数据源(PostgreSQL、Redis、外部API)
Core Architecture Principles
核心架构原则
Three-Layer Architecture
三层架构
src/
├── api/ # API Layer (Controllers, Routes, DTOs)
├── domain/ # Domain Layer (Business Logic, Entities, Services)
└── infra/ # Infrastructure Layer (Database, External Services)Layer Responsibilities:
-
API Layer ()
src/api/- HTTP endpoints and routing
- Request/Response DTOs (Pydantic models)
- Input validation
- Authentication/Authorization middleware
- Error handling and HTTP status codes
-
Domain Layer ()
src/domain/- Business entities and value objects
- Service interfaces (abstract classes)
- Business rules and validation
- Domain exceptions
- Pure business logic (framework-agnostic)
-
Infrastructure Layer ()
src/infra/- Database repositories (SQLAlchemy)
- External API adapters
- Cache implementations (Redis)
- File storage adapters
- Concrete service implementations
src/
├── api/ # API Layer (Controllers, Routes, DTOs)
├── domain/ # Domain Layer (Business Logic, Entities, Services)
└── infra/ # Infrastructure Layer (Database, External Services)各层职责:
-
API层 ()
src/api/- HTTP端点与路由
- 请求/响应DTO(Pydantic模型)
- 输入验证
- 认证/授权中间件
- 错误处理与HTTP状态码
-
领域层 ()
src/domain/- 业务实体与值对象
- 服务接口(抽象类)
- 业务规则与验证
- 领域异常
- 纯业务逻辑(与框架无关)
-
基础设施层 ()
src/infra/- 数据库仓库(SQLAlchemy实现)
- 外部API适配器
- 缓存实现(Redis)
- 文件存储适配器
- 具体服务实现
Dependency Flow
依赖流向
API Layer → Domain Layer ← Infrastructure LayerCritical Rules:
- API depends on Domain (imports domain services/entities)
- Infrastructure implements Domain interfaces
- Domain NEVER imports from API or Infrastructure
- Use dependency injection to wire everything together
API Layer → Domain Layer ← Infrastructure Layer关键规则:
- API层依赖领域层(导入领域服务/实体)
- 基础设施层实现领域层接口
- 领域层绝不能从API层或基础设施层导入内容
- 使用依赖注入将所有组件连接起来
Project Structure Template
项目结构模板
project-name/
├── src/
│ ├── api/
│ │ ├── path/ # Route modules
│ │ │ ├── users.py
│ │ │ ├── auth.py
│ │ │ └── financial.py
│ │ ├── middlewares/ # Custom middleware
│ │ │ ├── auth.py
│ │ │ └── error_handler.py
│ │ └── schemas/ # Request/Response DTOs
│ │ ├── user.py
│ │ └── financial.py
│ ├── domain/
│ │ ├── modules/ # Business modules
│ │ │ ├── user/
│ │ │ │ ├── entity.py # User entity
│ │ │ │ ├── service.py # IUserService (abstract)
│ │ │ │ ├── repository.py # IUserRepository (abstract)
│ │ │ │ └── exceptions.py # Domain exceptions
│ │ │ └── financial/
│ │ │ ├── entity.py
│ │ │ ├── service.py
│ │ │ └── value_objects.py
│ │ └── shared/ # Shared domain logic
│ │ ├── base_entity.py
│ │ └── exceptions.py
│ ├── infra/
│ │ ├── database/
│ │ │ ├── alchemist/ # SQLAlchemy implementations
│ │ │ │ ├── modules/
│ │ │ │ │ ├── user/
│ │ │ │ │ │ └── repository.py # UserRepository
│ │ │ │ │ └── financial/
│ │ │ │ │ └── repository.py
│ │ │ │ └── session.py
│ │ │ └── migrations/ # Database migrations
│ │ ├── adapters/ # External service adapters
│ │ │ ├── auth/
│ │ │ │ └── sso_adapter.py
│ │ │ └── payment/
│ │ │ └── payment_gateway.py
│ │ ├── cache/ # Redis implementations
│ │ │ └── redis_cache.py
│ │ └── services/ # Service implementations
│ │ ├── user_service.py
│ │ └── financial_service.py
│ ├── config/
│ │ ├── settings.py # Environment configuration
│ │ ├── dependency.py # DI Container
│ │ └── database.py # DB configuration
│ └── main.py # FastAPI app entry point
├── tests/
│ ├── api/ # API integration tests
│ ├── domain/ # Domain unit tests
│ ├── infra/ # Infrastructure tests
│ └── conftest.py # Pytest fixtures
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
└── README.mdproject-name/
├── src/
│ ├── api/
│ │ ├── path/ # Route modules
│ │ │ ├── users.py
│ │ │ ├── auth.py
│ │ │ └── financial.py
│ │ ├── middlewares/ # Custom middleware
│ │ │ ├── auth.py
│ │ │ └── error_handler.py
│ │ └── schemas/ # Request/Response DTOs
│ │ ├── user.py
│ │ └── financial.py
│ ├── domain/
│ │ ├── modules/ # Business modules
│ │ │ ├── user/
│ │ │ │ ├── entity.py # User entity
│ │ │ │ ├── service.py # IUserService (abstract)
│ │ │ │ ├── repository.py # IUserRepository (abstract)
│ │ │ │ └── exceptions.py # Domain exceptions
│ │ │ └── financial/
│ │ │ ├── entity.py
│ │ │ ├── service.py
│ │ │ └── value_objects.py
│ │ └── shared/ # Shared domain logic
│ │ ├── base_entity.py
│ │ └── exceptions.py
│ ├── infra/
│ │ ├── database/
│ │ │ ├── alchemist/ # SQLAlchemy implementations
│ │ │ │ ├── modules/
│ │ │ │ │ ├── user/
│ │ │ │ │ │ └── repository.py # UserRepository
│ │ │ │ │ └── financial/
│ │ │ │ │ └── repository.py
│ │ │ │ └── session.py
│ │ │ └── migrations/ # Database migrations
│ │ ├── adapters/ # External service adapters
│ │ │ ├── auth/
│ │ │ │ └── sso_adapter.py
│ │ │ └── payment/
│ │ │ └── payment_gateway.py
│ │ ├── cache/ # Redis implementations
│ │ │ └── redis_cache.py
│ │ └── services/ # Service implementations
│ │ ├── user_service.py
│ │ └── financial_service.py
│ ├── config/
│ │ ├── settings.py # Environment configuration
│ │ ├── dependency.py # DI Container
│ │ └── database.py # DB configuration
│ └── main.py # FastAPI app entry point
├── tests/
│ ├── api/ # API integration tests
│ ├── domain/ # Domain unit tests
│ ├── infra/ # Infrastructure tests
│ └── conftest.py # Pytest fixtures
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
└── README.mdImplementation Patterns
实现模式
1. Domain Entity Pattern
1. 领域实体模式
python
undefinedpython
undefinedsrc/domain/modules/user/entity.py
src/domain/modules/user/entity.py
from dataclasses import dataclass
from datetime import datetime
@dataclass
class User:
"""Domain entity representing a user.
Pure business logic with no framework dependencies.
"""
id: int | None
cpf: str
name: str
email: str
created_at: datetime
is_active: bool = True
def deactivate(self) -> None:
"""Business rule: Deactivate user account."""
if not self.is_active:
raise UserAlreadyInactiveError(f"User {self.cpf} is already inactive")
self.is_active = False
def validate_cpf(self) -> bool:
"""Business rule: Validate CPF format."""
cleaned = ''.join(filter(str.isdigit, self.cpf))
return len(cleaned) == 11 and not all(c == cleaned[0] for c in cleaned)undefinedfrom dataclasses import dataclass
from datetime import datetime
@dataclass
class User:
"""Domain entity representing a user.
Pure business logic with no framework dependencies.
"""
id: int | None
cpf: str
name: str
email: str
created_at: datetime
is_active: bool = True
def deactivate(self) -> None:
"""Business rule: Deactivate user account."""
if not self.is_active:
raise UserAlreadyInactiveError(f"User {self.cpf} is already inactive")
self.is_active = False
def validate_cpf(self) -> bool:
"""Business rule: Validate CPF format."""
cleaned = ''.join(filter(str.isdigit, self.cpf))
return len(cleaned) == 11 and not all(c == cleaned[0] for c in cleaned)undefined2. Repository Interface Pattern
2. 仓库接口模式
python
undefinedpython
undefinedsrc/domain/modules/user/repository.py
src/domain/modules/user/repository.py
from abc import ABC, abstractmethod
from typing import List
from src.domain.modules.user.entity import User
class IUserRepository(ABC):
"""Abstract repository interface in domain layer.
Defines contract without implementation details.
"""
@abstractmethod
async def get_by_id(self, user_id: int) -> User | None:
"""Retrieve user by ID."""
pass
@abstractmethod
async def get_by_cpf(self, cpf: str) -> User | None:
"""Retrieve user by CPF."""
pass
@abstractmethod
async def create(self, user: User) -> User:
"""Create new user."""
pass
@abstractmethod
async def update(self, user: User) -> User:
"""Update existing user."""
pass
@abstractmethod
async def list_active(self, limit: int = 100) -> List[User]:
"""List all active users."""
passundefinedfrom abc import ABC, abstractmethod
from typing import List
from src.domain.modules.user.entity import User
class IUserRepository(ABC):
"""Abstract repository interface in domain layer.
Defines contract without implementation details.
"""
@abstractmethod
async def get_by_id(self, user_id: int) -> User | None:
"""Retrieve user by ID."""
pass
@abstractmethod
async def get_by_cpf(self, cpf: str) -> User | None:
"""Retrieve user by CPF."""
pass
@abstractmethod
async def create(self, user: User) -> User:
"""Create new user."""
pass
@abstractmethod
async def update(self, user: User) -> User:
"""Update existing user."""
pass
@abstractmethod
async def list_active(self, limit: int = 100) -> List[User]:
"""List all active users."""
passundefined3. Service Interface Pattern
3. 服务接口模式
python
undefinedpython
undefinedsrc/domain/modules/user/service.py
src/domain/modules/user/service.py
from abc import ABC, abstractmethod
from typing import List
from src.domain.modules.user.entity import User
class IUserService(ABC):
"""Abstract service interface defining business operations."""
@abstractmethod
async def register_user(self, cpf: str, name: str, email: str) -> User:
"""Register new user with validation."""
pass
@abstractmethod
async def deactivate_user(self, cpf: str) -> User:
"""Deactivate user account."""
pass
@abstractmethod
async def get_user_profile(self, cpf: str) -> User:
"""Get user profile by CPF."""
passundefinedfrom abc import ABC, abstractmethod
from typing import List
from src.domain.modules.user.entity import User
class IUserService(ABC):
"""Abstract service interface defining business operations."""
@abstractmethod
async def register_user(self, cpf: str, name: str, email: str) -> User:
"""Register new user with validation."""
pass
@abstractmethod
async def deactivate_user(self, cpf: str) -> User:
"""Deactivate user account."""
pass
@abstractmethod
async def get_user_profile(self, cpf: str) -> User:
"""Get user profile by CPF."""
passundefined4. Concrete Repository Implementation
4. 具体仓库实现
python
undefinedpython
undefinedsrc/infra/database/alchemist/modules/user/repository.py
src/infra/database/alchemist/modules/user/repository.py
from typing import List
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from src.domain.modules.user.entity import User
from src.domain.modules.user.repository import IUserRepository
from src.infra.database.alchemist.models import UserModel
class UserRepository(IUserRepository):
"""SQLAlchemy implementation of user repository.
Handles database operations and entity mapping.
"""
def __init__(self, session: AsyncSession):
self._session = session
async def get_by_id(self, user_id: int) -> User | None:
"""Retrieve user by ID."""
result = await self._session.execute(
select(UserModel).where(UserModel.id == user_id)
)
model = result.scalar_one_or_none()
return self._to_entity(model) if model else None
async def get_by_cpf(self, cpf: str) -> User | None:
"""Retrieve user by CPF."""
result = await self._session.execute(
select(UserModel).where(UserModel.cpf == cpf)
)
model = result.scalar_one_or_none()
return self._to_entity(model) if model else None
async def create(self, user: User) -> User:
"""Create new user."""
model = UserModel(
cpf=user.cpf,
name=user.name,
email=user.email,
is_active=user.is_active,
)
self._session.add(model)
await self._session.flush()
await self._session.refresh(model)
return self._to_entity(model)
async def update(self, user: User) -> User:
"""Update existing user."""
result = await self._session.execute(
select(UserModel).where(UserModel.id == user.id)
)
model = result.scalar_one()
model.name = user.name
model.email = user.email
model.is_active = user.is_active
await self._session.flush()
await self._session.refresh(model)
return self._to_entity(model)
async def list_active(self, limit: int = 100) -> List[User]:
"""List all active users."""
result = await self._session.execute(
select(UserModel)
.where(UserModel.is_active == True) # noqa: E712
.limit(limit)
)
models = result.scalars().all()
return [self._to_entity(model) for model in models]
def _to_entity(self, model: UserModel) -> User:
"""Convert database model to domain entity."""
return User(
id=model.id,
cpf=model.cpf,
name=model.name,
email=model.email,
created_at=model.created_at,
is_active=model.is_active,
)undefinedfrom typing import List
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from src.domain.modules.user.entity import User
from src.domain.modules.user.repository import IUserRepository
from src.infra.database.alchemist.models import UserModel
class UserRepository(IUserRepository):
"""SQLAlchemy implementation of user repository.
Handles database operations and entity mapping.
"""
def __init__(self, session: AsyncSession):
self._session = session
async def get_by_id(self, user_id: int) -> User | None:
"""Retrieve user by ID."""
result = await self._session.execute(
select(UserModel).where(UserModel.id == user_id)
)
model = result.scalar_one_or_none()
return self._to_entity(model) if model else None
async def get_by_cpf(self, cpf: str) -> User | None:
"""Retrieve user by CPF."""
result = await self._session.execute(
select(UserModel).where(UserModel.cpf == cpf)
)
model = result.scalar_one_or_none()
return self._to_entity(model) if model else None
async def create(self, user: User) -> User:
"""Create new user."""
model = UserModel(
cpf=user.cpf,
name=user.name,
email=user.email,
is_active=user.is_active,
)
self._session.add(model)
await self._session.flush()
await self._session.refresh(model)
return self._to_entity(model)
async def update(self, user: User) -> User:
"""Update existing user."""
result = await self._session.execute(
select(UserModel).where(UserModel.id == user.id)
)
model = result.scalar_one()
model.name = user.name
model.email = user.email
model.is_active = user.is_active
await self._session.flush()
await self._session.refresh(model)
return self._to_entity(model)
async def list_active(self, limit: int = 100) -> List[User]:
"""List all active users."""
result = await self._session.execute(
select(UserModel)
.where(UserModel.is_active == True) # noqa: E712
.limit(limit)
)
models = result.scalars().all()
return [self._to_entity(model) for model in models]
def _to_entity(self, model: UserModel) -> User:
"""Convert database model to domain entity."""
return User(
id=model.id,
cpf=model.cpf,
name=model.name,
email=model.email,
created_at=model.created_at,
is_active=model.is_active,
)undefined5. Concrete Service Implementation
5. 具体服务实现
python
undefinedpython
undefinedsrc/infra/services/user_service.py
src/infra/services/user_service.py
from src.domain.modules.user.entity import User
from src.domain.modules.user.service import IUserService
from src.domain.modules.user.repository import IUserRepository
from src.domain.modules.user.exceptions import (
UserAlreadyExistsError,
UserNotFoundError,
InvalidCPFError,
)
class UserService(IUserService):
"""Concrete implementation of user service.
Orchestrates business logic using repository.
"""
def __init__(self, user_repository: IUserRepository):
self._repository = user_repository
async def register_user(self, cpf: str, name: str, email: str) -> User:
"""Register new user with validation."""
# Check if user already exists
existing_user = await self._repository.get_by_cpf(cpf)
if existing_user:
raise UserAlreadyExistsError(f"User with CPF {cpf} already exists")
# Create and validate entity
user = User(
id=None,
cpf=cpf,
name=name,
email=email,
created_at=datetime.now(),
is_active=True,
)
if not user.validate_cpf():
raise InvalidCPFError(f"Invalid CPF: {cpf}")
# Persist
return await self._repository.create(user)
async def deactivate_user(self, cpf: str) -> User:
"""Deactivate user account."""
user = await self._repository.get_by_cpf(cpf)
if not user:
raise UserNotFoundError(f"User with CPF {cpf} not found")
user.deactivate() # Business logic in entity
return await self._repository.update(user)
async def get_user_profile(self, cpf: str) -> User:
"""Get user profile by CPF."""
user = await self._repository.get_by_cpf(cpf)
if not user:
raise UserNotFoundError(f"User with CPF {cpf} not found")
return userundefinedfrom src.domain.modules.user.entity import User
from src.domain.modules.user.service import IUserService
from src.domain.modules.user.repository import IUserRepository
from src.domain.modules.user.exceptions import (
UserAlreadyExistsError,
UserNotFoundError,
InvalidCPFError,
)
class UserService(IUserService):
"""Concrete implementation of user service.
Orchestrates business logic using repository.
"""
def __init__(self, user_repository: IUserRepository):
self._repository = user_repository
async def register_user(self, cpf: str, name: str, email: str) -> User:
"""Register new user with validation."""
# Check if user already exists
existing_user = await self._repository.get_by_cpf(cpf)
if existing_user:
raise UserAlreadyExistsError(f"User with CPF {cpf} already exists")
# Create and validate entity
user = User(
id=None,
cpf=cpf,
name=name,
email=email,
created_at=datetime.now(),
is_active=True,
)
if not user.validate_cpf():
raise InvalidCPFError(f"Invalid CPF: {cpf}")
# Persist
return await self._repository.create(user)
async def deactivate_user(self, cpf: str) -> User:
"""Deactivate user account."""
user = await self._repository.get_by_cpf(cpf)
if not user:
raise UserNotFoundError(f"User with CPF {cpf} not found")
user.deactivate() # Business logic in entity
return await self._repository.update(user)
async def get_user_profile(self, cpf: str) -> User:
"""Get user profile by CPF."""
user = await self._repository.get_by_cpf(cpf)
if not user:
raise UserNotFoundError(f"User with CPF {cpf} not found")
return userundefined6. Dependency Injection Container
6. 依赖注入容器
python
undefinedpython
undefinedsrc/config/dependency.py
src/config/dependency.py
from dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject
from src.infra.database.alchemist.session import get_session
from src.infra.database.alchemist.modules.user.repository import UserRepository
from src.infra.services.user_service import UserService
class Container(containers.DeclarativeContainer):
"""Dependency injection container.
Wires together all dependencies.
"""
wiring_config = containers.WiringConfiguration(
modules=[
"src.api.path.users",
"src.api.path.auth",
]
)
# Database
db_session = providers.Resource(get_session)
# Repositories
user_repository = providers.Factory(
UserRepository,
session=db_session,
)
# Services
user_service = providers.Factory(
UserService,
user_repository=user_repository,
)undefinedfrom dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject
from src.infra.database.alchemist.session import get_session
from src.infra.database.alchemist.modules.user.repository import UserRepository
from src.infra.services.user_service import UserService
class Container(containers.DeclarativeContainer):
"""Dependency injection container.
Wires together all dependencies.
"""
wiring_config = containers.WiringConfiguration(
modules=[
"src.api.path.users",
"src.api.path.auth",
]
)
# Database
db_session = providers.Resource(get_session)
# Repositories
user_repository = providers.Factory(
UserRepository,
session=db_session,
)
# Services
user_service = providers.Factory(
UserService,
user_repository=user_repository,
)undefined7. API Endpoint Implementation
7. API端点实现
python
undefinedpython
undefinedsrc/api/path/users.py
src/api/path/users.py
from fastapi import APIRouter, Depends, status
from dependency_injector.wiring import inject, Provide
from src.api.schemas.user import UserCreateRequest, UserResponse
from src.domain.modules.user.service import IUserService
from src.domain.modules.user.exceptions import UserAlreadyExistsError, InvalidCPFError
from src.config.dependency import Container
router = APIRouter(prefix="/v1/users", tags=["users"])
@router.post(
"/",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
)
@inject
async def create_user(
request: UserCreateRequest,
user_service: IUserService = Depends(Provide[Container.user_service]),
):
"""Create new user endpoint.
Delegates to service layer for business logic.
"""
try:
user = await user_service.register_user(
cpf=request.cpf,
name=request.name,
email=request.email,
)
return UserResponse.from_entity(user)
except UserAlreadyExistsError as e:
raise HTTPException(status_code=409, detail=str(e))
except InvalidCPFError as e:
raise HTTPException(status_code=400, detail=str(e))@router.get("/{cpf}", response_model=UserResponse)
@inject
async def get_user(
cpf: str,
user_service: IUserService = Depends(Provide[Container.user_service]),
):
"""Get user by CPF."""
user = await user_service.get_user_profile(cpf)
return UserResponse.from_entity(user)
undefinedfrom fastapi import APIRouter, Depends, status
from dependency_injector.wiring import inject, Provide
from src.api.schemas.user import UserCreateRequest, UserResponse
from src.domain.modules.user.service import IUserService
from src.domain.modules.user.exceptions import UserAlreadyExistsError, InvalidCPFError
from src.config.dependency import Container
router = APIRouter(prefix="/v1/users", tags=["users"])
@router.post(
"/",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
)
@inject
async def create_user(
request: UserCreateRequest,
user_service: IUserService = Depends(Provide[Container.user_service]),
):
"""Create new user endpoint.
Delegates to service layer for business logic.
"""
try:
user = await user_service.register_user(
cpf=request.cpf,
name=request.name,
email=request.email,
)
return UserResponse.from_entity(user)
except UserAlreadyExistsError as e:
raise HTTPException(status_code=409, detail=str(e))
except InvalidCPFError as e:
raise HTTPException(status_code=400, detail=str(e))@router.get("/{cpf}", response_model=UserResponse)
@inject
async def get_user(
cpf: str,
user_service: IUserService = Depends(Provide[Container.user_service]),
):
"""Get user by CPF."""
user = await user_service.get_user_profile(cpf)
return UserResponse.from_entity(user)
undefined8. Pydantic DTOs (Request/Response)
8. Pydantic DTO(请求/响应)
python
undefinedpython
undefinedsrc/api/schemas/user.py
src/api/schemas/user.py
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field
from src.domain.modules.user.entity import User
class UserCreateRequest(BaseModel):
"""Request DTO for user creation."""
cpf: str = Field(..., min_length=11, max_length=14)
name: str = Field(..., min_length=3, max_length=100)
email: EmailStr
class UserResponse(BaseModel):
"""Response DTO for user data."""
id: int
cpf: str
name: str
email: str
created_at: datetime
is_active: bool
@classmethod
def from_entity(cls, user: User) -> "UserResponse":
"""Convert domain entity to DTO."""
return cls(
id=user.id,
cpf=user.cpf,
name=user.name,
email=user.email,
created_at=user.created_at,
is_active=user.is_active,
)undefinedfrom datetime import datetime
from pydantic import BaseModel, EmailStr, Field
from src.domain.modules.user.entity import User
class UserCreateRequest(BaseModel):
"""Request DTO for user creation."""
cpf: str = Field(..., min_length=11, max_length=14)
name: str = Field(..., min_length=3, max_length=100)
email: EmailStr
class UserResponse(BaseModel):
"""Response DTO for user data."""
id: int
cpf: str
name: str
email: str
created_at: datetime
is_active: bool
@classmethod
def from_entity(cls, user: User) -> "UserResponse":
"""Convert domain entity to DTO."""
return cls(
id=user.id,
cpf=user.cpf,
name=user.name,
email=user.email,
created_at=user.created_at,
is_active=user.is_active,
)undefinedTesting Strategy
测试策略
Unit Tests (Domain Layer)
单元测试(领域层)
python
undefinedpython
undefinedtests/domain/modules/user/test_entity.py
tests/domain/modules/user/test_entity.py
import pytest
from src.domain.modules.user.entity import User
from src.domain.modules.user.exceptions import UserAlreadyInactiveError
def test_user_deactivation():
"""Test user deactivation business rule."""
user = User(
id=1,
cpf="12345678901",
name="Test User",
email="test@example.com",
created_at=datetime.now(),
is_active=True,
)
user.deactivate()
assert user.is_active is Falsedef test_user_already_inactive_raises_error():
"""Test deactivating already inactive user raises error."""
user = User(
id=1,
cpf="12345678901",
name="Test User",
email="test@example.com",
created_at=datetime.now(),
is_active=False,
)
with pytest.raises(UserAlreadyInactiveError):
user.deactivate()def test_cpf_validation():
"""Test CPF validation logic."""
user = User(
id=1,
cpf="12345678901",
name="Test User",
email="test@example.com",
created_at=datetime.now(),
)
assert user.validate_cpf() is True
user.cpf = "11111111111" # All same digits
assert user.validate_cpf() is Falseundefinedimport pytest
from src.domain.modules.user.entity import User
from src.domain.modules.user.exceptions import UserAlreadyInactiveError
def test_user_deactivation():
"""Test user deactivation business rule."""
user = User(
id=1,
cpf="12345678901",
name="Test User",
email="test@example.com",
created_at=datetime.now(),
is_active=True,
)
user.deactivate()
assert user.is_active is Falsedef test_user_already_inactive_raises_error():
"""Test deactivating already inactive user raises error."""
user = User(
id=1,
cpf="12345678901",
name="Test User",
email="test@example.com",
created_at=datetime.now(),
is_active=False,
)
with pytest.raises(UserAlreadyInactiveError):
user.deactivate()def test_cpf_validation():
"""Test CPF validation logic."""
user = User(
id=1,
cpf="12345678901",
name="Test User",
email="test@example.com",
created_at=datetime.now(),
)
assert user.validate_cpf() is True
user.cpf = "11111111111" # All same digits
assert user.validate_cpf() is FalseundefinedService Tests (Infrastructure Layer)
服务测试(基础设施层)
python
undefinedpython
undefinedtests/infra/services/test_user_service.py
tests/infra/services/test_user_service.py
import pytest
from unittest.mock import AsyncMock
from src.infra.services.user_service import UserService
from src.domain.modules.user.entity import User
from src.domain.modules.user.exceptions import UserAlreadyExistsError
@pytest.fixture
def mock_repository():
"""Mock user repository."""
return AsyncMock()
@pytest.fixture
def user_service(mock_repository):
"""User service with mocked repository."""
return UserService(user_repository=mock_repository)
@pytest.mark.asyncio
async def test_register_user_success(user_service, mock_repository):
"""Test successful user registration."""
mock_repository.get_by_cpf.return_value = None
mock_repository.create.return_value = User(
id=1,
cpf="12345678901",
name="Test User",
email="test@example.com",
created_at=datetime.now(),
is_active=True,
)
user = await user_service.register_user(
cpf="12345678901",
name="Test User",
email="test@example.com",
)
assert user.id == 1
assert user.cpf == "12345678901"
mock_repository.create.assert_called_once()@pytest.mark.asyncio
async def test_register_user_already_exists(user_service, mock_repository):
"""Test registering existing user raises error."""
mock_repository.get_by_cpf.return_value = User(
id=1,
cpf="12345678901",
name="Existing User",
email="existing@example.com",
created_at=datetime.now(),
is_active=True,
)
with pytest.raises(UserAlreadyExistsError):
await user_service.register_user(
cpf="12345678901",
name="Test User",
email="test@example.com",
)undefinedimport pytest
from unittest.mock import AsyncMock
from src.infra.services.user_service import UserService
from src.domain.modules.user.entity import User
from src.domain.modules.user.exceptions import UserAlreadyExistsError
@pytest.fixture
def mock_repository():
"""Mock user repository."""
return AsyncMock()
@pytest.fixture
def user_service(mock_repository):
"""User service with mocked repository."""
return UserService(user_repository=mock_repository)
@pytest.mark.asyncio
async def test_register_user_success(user_service, mock_repository):
"""Test successful user registration."""
mock_repository.get_by_cpf.return_value = None
mock_repository.create.return_value = User(
id=1,
cpf="12345678901",
name="Test User",
email="test@example.com",
created_at=datetime.now(),
is_active=True,
)
user = await user_service.register_user(
cpf="12345678901",
name="Test User",
email="test@example.com",
)
assert user.id == 1
assert user.cpf == "12345678901"
mock_repository.create.assert_called_once()@pytest.mark.asyncio
async def test_register_user_already_exists(user_service, mock_repository):
"""Test registering existing user raises error."""
mock_repository.get_by_cpf.return_value = User(
id=1,
cpf="12345678901",
name="Existing User",
email="existing@example.com",
created_at=datetime.now(),
is_active=True,
)
with pytest.raises(UserAlreadyExistsError):
await user_service.register_user(
cpf="12345678901",
name="Test User",
email="test@example.com",
)undefinedIntegration Tests (API Layer)
集成测试(API层)
python
undefinedpython
undefinedtests/api/test_users.py
tests/api/test_users.py
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_create_user_endpoint(client: AsyncClient):
"""Test user creation endpoint."""
response = await client.post(
"/v1/users/",
json={
"cpf": "12345678901",
"name": "Test User",
"email": "test@example.com",
},
)
assert response.status_code == 201
data = response.json()
assert data["cpf"] == "12345678901"
assert data["name"] == "Test User"@pytest.mark.asyncio
async def test_create_duplicate_user_returns_409(client: AsyncClient):
"""Test creating duplicate user returns conflict."""
user_data = {
"cpf": "12345678901",
"name": "Test User",
"email": "test@example.com",
}
# Create first user
await client.post("/v1/users/", json=user_data)
# Try to create duplicate
response = await client.post("/v1/users/", json=user_data)
assert response.status_code == 409undefinedimport pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_create_user_endpoint(client: AsyncClient):
"""Test user creation endpoint."""
response = await client.post(
"/v1/users/",
json={
"cpf": "12345678901",
"name": "Test User",
"email": "test@example.com",
},
)
assert response.status_code == 201
data = response.json()
assert data["cpf"] == "12345678901"
assert data["name"] == "Test User"@pytest.mark.asyncio
async def test_create_duplicate_user_returns_409(client: AsyncClient):
"""Test creating duplicate user returns conflict."""
user_data = {
"cpf": "12345678901",
"name": "Test User",
"email": "test@example.com",
}
# Create first user
await client.post("/v1/users/", json=user_data)
# Try to create duplicate
response = await client.post("/v1/users/", json=user_data)
assert response.status_code == 409undefinedDatabase Session Management
数据库会话管理
python
undefinedpython
undefinedsrc/infra/database/alchemist/session.py
src/infra/database/alchemist/session.py
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
async_sessionmaker,
)
from src.config.settings import app_settings
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
async_sessionmaker,
)
from src.config.settings import app_settings
Engine configuration
Engine configuration
engine = create_async_engine(
app_settings.DATABASE_URL,
echo=app_settings.DEBUG,
pool_size=10,
max_overflow=20,
pool_recycle=3600,
)
engine = create_async_engine(
app_settings.DATABASE_URL,
echo=app_settings.DEBUG,
pool_size=10,
max_overflow=20,
pool_recycle=3600,
)
Session factory
Session factory
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
@asynccontextmanager
async def get_session() -> AsyncSession:
"""Get database session with automatic cleanup.
Usage with dependency injection:
session = Depends(get_session)
"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()undefinedAsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
@asynccontextmanager
async def get_session() -> AsyncSession:
"""Get database session with automatic cleanup.
Usage with dependency injection:
session = Depends(get_session)
"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()undefinedConfiguration Management
配置管理
python
undefinedpython
undefinedsrc/config/settings.py
src/config/settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class AppSettings(BaseSettings):
"""Application configuration from environment variables."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True,
)
# Application
APP_NAME: str = "FastAPI Clean Architecture"
DEBUG: bool = False
API_VERSION: str = "v1"
# Database
DATABASE_URL: str
DB_POOL_SIZE: int = 10
DB_MAX_OVERFLOW: int = 20
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
REDIS_TTL: int = 3600
# Security
JWT_SECRET_KEY: str
JWT_ALGORITHM: str = "RS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
# CORS
CORS_ORIGINS: list[str] = ["*"]
CORS_CREDENTIALS: bool = Trueapp_settings = AppSettings()
undefinedfrom pydantic_settings import BaseSettings, SettingsConfigDict
class AppSettings(BaseSettings):
"""Application configuration from environment variables."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True,
)
# Application
APP_NAME: str = "FastAPI Clean Architecture"
DEBUG: bool = False
API_VERSION: str = "v1"
# Database
DATABASE_URL: str
DB_POOL_SIZE: int = 10
DB_MAX_OVERFLOW: int = 20
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
REDIS_TTL: int = 3600
# Security
JWT_SECRET_KEY: str
JWT_ALGORITHM: str = "RS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
# CORS
CORS_ORIGINS: list[str] = ["*"]
CORS_CREDENTIALS: bool = Trueapp_settings = AppSettings()
undefinedMain Application Setup
主应用设置
python
undefinedpython
undefinedsrc/main.py
src/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.config.settings import app_settings
from src.config.dependency import Container
from src.api.path import users, auth
def create_app() -> FastAPI:
"""Create and configure FastAPI application."""
# Initialize DI container
container = Container()
# Create app
app = FastAPI(
title=app_settings.APP_NAME,
version=app_settings.API_VERSION,
debug=app_settings.DEBUG,
)
# Wire container
app.container = container
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=app_settings.CORS_ORIGINS,
allow_credentials=app_settings.CORS_CREDENTIALS,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(users.router)
app.include_router(auth.router)
return appapp = create_app()
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
undefinedfrom fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.config.settings import app_settings
from src.config.dependency import Container
from src.api.path import users, auth
def create_app() -> FastAPI:
"""Create and configure FastAPI application."""
# Initialize DI container
container = Container()
# Create app
app = FastAPI(
title=app_settings.APP_NAME,
version=app_settings.API_VERSION,
debug=app_settings.DEBUG,
)
# Wire container
app.container = container
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=app_settings.CORS_ORIGINS,
allow_credentials=app_settings.CORS_CREDENTIALS,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(users.router)
app.include_router(auth.router)
return appapp = create_app()
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
undefinedBest Practices Checklist
最佳实践检查清单
Architecture
架构
- ✅ Domain layer has NO dependencies on API or Infrastructure
- ✅ All database logic encapsulated in repositories
- ✅ Business logic lives in domain entities and services
- ✅ Infrastructure implements domain interfaces
- ✅ API layer only handles HTTP concerns
- ✅ 领域层不依赖API层或基础设施层
- ✅ 所有数据库逻辑封装在仓库中
- ✅ 业务逻辑存在于领域实体与服务中
- ✅ 基础设施层实现领域层接口
- ✅ API层仅处理HTTP相关事宜
Dependency Injection
依赖注入
- ✅ Use for IoC container
dependency-injector - ✅ Wire all dependencies through container
- ✅ Inject interfaces, not concrete implementations
- ✅ Use decorator on endpoints
@inject - ✅ Configure wiring for all API modules
- ✅ 使用作为控制反转容器
dependency-injector - ✅ 通过容器连接所有依赖
- ✅ 注入接口而非具体实现
- ✅ 在端点上使用装饰器
@inject - ✅ 为所有API模块配置依赖注入
Database
数据库
- ✅ Use SQLAlchemy 2.0 async patterns
- ✅ Separate ORM models from domain entities
- ✅ Implement mapper methods ()
_to_entity - ✅ Use connection pooling
- ✅ Handle transactions properly (commit/rollback)
- ✅ 使用SQLAlchemy 2.0异步模式
- ✅ 将ORM模型与领域实体分离
- ✅ 实现映射方法()
_to_entity - ✅ 使用连接池
- ✅ 正确处理事务(提交/回滚)
Testing
测试
- ✅ 100% coverage for domain layer (pure logic)
- ✅ Mock repositories in service tests
- ✅ Use AsyncMock for async methods
- ✅ Integration tests for endpoints
- ✅ Separate test database for integration tests
- ✅ 领域层(纯逻辑)覆盖率达到100%
- ✅ 在服务测试中模拟仓库
- ✅ 为异步方法使用AsyncMock
- ✅ 为端点编写集成测试
- ✅ 为集成测试使用独立的测试数据库
Naming Conventions
命名规范
- ✅ Entities: PascalCase (,
User)Order - ✅ Services: (interface),
I{Name}Service(implementation){Name}Service - ✅ Repositories: (interface),
I{Name}Repository(implementation){Name}Repository - ✅ DTOs: ,
{Name}Request{Name}Response - ✅ Use ptBR names for database columns if applicable
- ✅ 实体:大驼峰命名(、
User)Order - ✅ 服务:(接口),
I{Name}Service(实现){Name}Service - ✅ 仓库:(接口),
I{Name}Repository(实现){Name}Repository - ✅ DTO:、
{Name}Request{Name}Response - ✅ 如适用,数据库列使用葡萄牙语(巴西)命名
Error Handling
错误处理
- ✅ Domain exceptions for business rule violations
- ✅ HTTP exceptions at API layer only
- ✅ Proper status codes (400, 404, 409, 500)
- ✅ Meaningful error messages
- ✅ Global exception handler
- ✅ 使用领域异常处理业务规则违反情况
- ✅ 仅在API层使用HTTP异常
- ✅ 使用正确的状态码(400、404、409、500)
- ✅ 提供有意义的错误信息
- ✅ 全局异常处理器
Common Pitfalls to Avoid
需避免的常见陷阱
-
Importing Infrastructure in Domain
- ❌ Never import SQLAlchemy models in domain layer
- ✅ Use mapper functions to convert between layers
-
Business Logic in API Layer
- ❌ Never put validation or business rules in endpoints
- ✅ Move all logic to services or entities
-
Tight Coupling
- ❌ Don't instantiate dependencies directly
- ✅ Use dependency injection everywhere
-
Anemic Entities
- ❌ Don't use entities as plain data containers
- ✅ Put behavior and validation in entities
-
Repository Leakage
- ❌ Don't expose SQLAlchemy queries outside repositories
- ✅ Return domain entities only
-
Improper Transaction Management
- ❌ Don't commit/rollback in repositories
- ✅ Manage transactions at service or endpoint level
-
在领域层中导入基础设施层内容
- ❌ 绝不要在领域层中导入SQLAlchemy模型
- ✅ 使用映射函数在各层之间转换
-
在API层中编写业务逻辑
- ❌ 绝不要在端点中放置验证或业务规则
- ✅ 将所有逻辑移至服务或实体中
-
紧耦合
- ❌ 不要直接实例化依赖
- ✅ 全程使用依赖注入
-
贫血实体
- ❌ 不要将实体用作纯数据容器
- ✅ 将行为与验证逻辑放入实体中
-
仓库泄漏
- ❌ 不要在仓库外暴露SQLAlchemy查询
- ✅ 仅返回领域实体
-
事务管理不当
- ❌ 不要在仓库中执行提交/回滚
- ✅ 在服务或端点层管理事务
Migration Guide (Legacy → Clean Architecture)
迁移指南(遗留系统 → 整洁架构)
Step 1: Create Domain Layer
步骤1:创建领域层
- Extract business entities from database models
- Define repository interfaces
- Define service interfaces
- Move business logic to entities/services
- 从数据库模型中提取业务实体
- 定义仓库接口
- 定义服务接口
- 将业务逻辑移至实体/服务中
Step 2: Create Infrastructure Layer
步骤2:创建基础设施层
- Implement repositories with SQLAlchemy
- Create service implementations
- Keep database models separate from entities
- 使用SQLAlchemy实现仓库
- 创建服务实现
- 将数据库模型与实体分离
Step 3: Refactor API Layer
步骤3:重构API层
- Create request/response DTOs
- Update endpoints to use services
- Remove direct database access
- Add dependency injection
- 创建请求/响应DTO
- 更新端点以使用服务
- 移除直接数据库访问
- 添加依赖注入
Step 4: Testing
步骤4:测试
- Write unit tests for domain layer
- Add service tests with mocked repositories
- Create integration tests for endpoints
- Ensure 100% coverage
- 为领域层编写单元测试
- 添加带有模拟仓库的服务测试
- 为端点创建集成测试
- 确保100%的覆盖率
References
参考资料
Production Examples
生产实践示例
This skill is based on patterns from:
- GEFIN Backend: Financial management system with 595+ tests
- Clean Architecture: Domain-driven design principles
- Enterprise Best Practices: Scalability, maintainability, testability
本技能基于以下项目的模式:
- GEFIN后端: 拥有595+测试用例的财务管理系统
- 整洁架构: 领域驱动设计原则
- 企业级最佳实践: 可扩展性、可维护性、可测试性