fastapi-clean-architecture

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

FastAPI Clean Architecture Skill

FastAPI整洁架构技能

Overview

概述

This skill guides you in building FastAPI applications following Clean Architecture principles, based on production patterns from enterprise financial systems. It emphasizes proper layer separation, dependency injection, repository patterns, and comprehensive testing strategies.
本技能将指导你遵循整洁架构(Clean Architecture)原则构建FastAPI应用,这些原则源自企业级金融系统的生产实践模式。它强调合理的分层、依赖注入、仓库模式以及全面的测试策略。

When to Use This Skill

适用场景

  • Starting a new FastAPI project requiring clean architecture
  • Refactoring existing FastAPI code for better maintainability
  • Implementing domain-driven design with FastAPI
  • Building testable, scalable backend services
  • Integrating multiple data sources (PostgreSQL, Redis, external APIs)
  • 启动需要采用整洁架构的全新FastAPI项目
  • 重构现有FastAPI代码以提升可维护性
  • 基于FastAPI实现领域驱动设计
  • 构建可测试、可扩展的后端服务
  • 集成多种数据源(PostgreSQL、Redis、外部API)

Core Architecture Principles

核心架构原则

Three-Layer Architecture

三层架构

src/
├── api/              # API Layer (Controllers, Routes, DTOs)
├── domain/           # Domain Layer (Business Logic, Entities, Services)
└── infra/            # Infrastructure Layer (Database, External Services)
Layer Responsibilities:
  1. API Layer (
    src/api/
    )
    • HTTP endpoints and routing
    • Request/Response DTOs (Pydantic models)
    • Input validation
    • Authentication/Authorization middleware
    • Error handling and HTTP status codes
  2. Domain Layer (
    src/domain/
    )
    • Business entities and value objects
    • Service interfaces (abstract classes)
    • Business rules and validation
    • Domain exceptions
    • Pure business logic (framework-agnostic)
  3. Infrastructure Layer (
    src/infra/
    )
    • Database repositories (SQLAlchemy)
    • External API adapters
    • Cache implementations (Redis)
    • File storage adapters
    • Concrete service implementations
src/
├── api/              # API Layer (Controllers, Routes, DTOs)
├── domain/           # Domain Layer (Business Logic, Entities, Services)
└── infra/            # Infrastructure Layer (Database, External Services)
各层职责:
  1. API层 (
    src/api/
    )
    • HTTP端点与路由
    • 请求/响应DTO(Pydantic模型)
    • 输入验证
    • 认证/授权中间件
    • 错误处理与HTTP状态码
  2. 领域层 (
    src/domain/
    )
    • 业务实体与值对象
    • 服务接口(抽象类)
    • 业务规则与验证
    • 领域异常
    • 纯业务逻辑(与框架无关)
  3. 基础设施层 (
    src/infra/
    )
    • 数据库仓库(SQLAlchemy实现)
    • 外部API适配器
    • 缓存实现(Redis)
    • 文件存储适配器
    • 具体服务实现

Dependency Flow

依赖流向

API Layer → Domain Layer ← Infrastructure Layer
Critical Rules:
  • API depends on Domain (imports domain services/entities)
  • Infrastructure implements Domain interfaces
  • Domain NEVER imports from API or Infrastructure
  • Use dependency injection to wire everything together
API Layer → Domain Layer ← Infrastructure Layer
关键规则:
  • API层依赖领域层(导入领域服务/实体)
  • 基础设施层实现领域层接口
  • 领域层绝不能从API层或基础设施层导入内容
  • 使用依赖注入将所有组件连接起来

Project Structure Template

项目结构模板

project-name/
├── src/
│   ├── api/
│   │   ├── path/              # Route modules
│   │   │   ├── users.py
│   │   │   ├── auth.py
│   │   │   └── financial.py
│   │   ├── middlewares/       # Custom middleware
│   │   │   ├── auth.py
│   │   │   └── error_handler.py
│   │   └── schemas/           # Request/Response DTOs
│   │       ├── user.py
│   │       └── financial.py
│   ├── domain/
│   │   ├── modules/           # Business modules
│   │   │   ├── user/
│   │   │   │   ├── entity.py       # User entity
│   │   │   │   ├── service.py      # IUserService (abstract)
│   │   │   │   ├── repository.py   # IUserRepository (abstract)
│   │   │   │   └── exceptions.py   # Domain exceptions
│   │   │   └── financial/
│   │   │       ├── entity.py
│   │   │       ├── service.py
│   │   │       └── value_objects.py
│   │   └── shared/            # Shared domain logic
│   │       ├── base_entity.py
│   │       └── exceptions.py
│   ├── infra/
│   │   ├── database/
│   │   │   ├── alchemist/     # SQLAlchemy implementations
│   │   │   │   ├── modules/
│   │   │   │   │   ├── user/
│   │   │   │   │   │   └── repository.py  # UserRepository
│   │   │   │   │   └── financial/
│   │   │   │   │       └── repository.py
│   │   │   │   └── session.py
│   │   │   └── migrations/    # Database migrations
│   │   ├── adapters/          # External service adapters
│   │   │   ├── auth/
│   │   │   │   └── sso_adapter.py
│   │   │   └── payment/
│   │   │       └── payment_gateway.py
│   │   ├── cache/             # Redis implementations
│   │   │   └── redis_cache.py
│   │   └── services/          # Service implementations
│   │       ├── user_service.py
│   │       └── financial_service.py
│   ├── config/
│   │   ├── settings.py        # Environment configuration
│   │   ├── dependency.py      # DI Container
│   │   └── database.py        # DB configuration
│   └── main.py                # FastAPI app entry point
├── tests/
│   ├── api/                   # API integration tests
│   ├── domain/                # Domain unit tests
│   ├── infra/                 # Infrastructure tests
│   └── conftest.py            # Pytest fixtures
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
└── README.md
project-name/
├── src/
│   ├── api/
│   │   ├── path/              # Route modules
│   │   │   ├── users.py
│   │   │   ├── auth.py
│   │   │   └── financial.py
│   │   ├── middlewares/       # Custom middleware
│   │   │   ├── auth.py
│   │   │   └── error_handler.py
│   │   └── schemas/           # Request/Response DTOs
│   │       ├── user.py
│   │       └── financial.py
│   ├── domain/
│   │   ├── modules/           # Business modules
│   │   │   ├── user/
│   │   │   │   ├── entity.py       # User entity
│   │   │   │   ├── service.py      # IUserService (abstract)
│   │   │   │   ├── repository.py   # IUserRepository (abstract)
│   │   │   │   └── exceptions.py   # Domain exceptions
│   │   │   └── financial/
│   │   │       ├── entity.py
│   │   │       ├── service.py
│   │   │       └── value_objects.py
│   │   └── shared/            # Shared domain logic
│   │       ├── base_entity.py
│   │       └── exceptions.py
│   ├── infra/
│   │   ├── database/
│   │   │   ├── alchemist/     # SQLAlchemy implementations
│   │   │   │   ├── modules/
│   │   │   │   │   ├── user/
│   │   │   │   │   │   └── repository.py  # UserRepository
│   │   │   │   │   └── financial/
│   │   │   │   │       └── repository.py
│   │   │   │   └── session.py
│   │   │   └── migrations/    # Database migrations
│   │   ├── adapters/          # External service adapters
│   │   │   ├── auth/
│   │   │   │   └── sso_adapter.py
│   │   │   └── payment/
│   │   │       └── payment_gateway.py
│   │   ├── cache/             # Redis implementations
│   │   │   └── redis_cache.py
│   │   └── services/          # Service implementations
│   │       ├── user_service.py
│   │       └── financial_service.py
│   ├── config/
│   │   ├── settings.py        # Environment configuration
│   │   ├── dependency.py      # DI Container
│   │   └── database.py        # DB configuration
│   └── main.py                # FastAPI app entry point
├── tests/
│   ├── api/                   # API integration tests
│   ├── domain/                # Domain unit tests
│   ├── infra/                 # Infrastructure tests
│   └── conftest.py            # Pytest fixtures
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
└── README.md

Implementation Patterns

实现模式

1. Domain Entity Pattern

1. 领域实体模式

python
undefined
python
undefined

src/domain/modules/user/entity.py

src/domain/modules/user/entity.py

from dataclasses import dataclass from datetime import datetime
@dataclass class User: """Domain entity representing a user.
Pure business logic with no framework dependencies.
"""
id: int | None
cpf: str
name: str
email: str
created_at: datetime
is_active: bool = True

def deactivate(self) -> None:
    """Business rule: Deactivate user account."""
    if not self.is_active:
        raise UserAlreadyInactiveError(f"User {self.cpf} is already inactive")
    self.is_active = False

def validate_cpf(self) -> bool:
    """Business rule: Validate CPF format."""
    cleaned = ''.join(filter(str.isdigit, self.cpf))
    return len(cleaned) == 11 and not all(c == cleaned[0] for c in cleaned)
undefined
from dataclasses import dataclass from datetime import datetime
@dataclass class User: """Domain entity representing a user.
Pure business logic with no framework dependencies.
"""
id: int | None
cpf: str
name: str
email: str
created_at: datetime
is_active: bool = True

def deactivate(self) -> None:
    """Business rule: Deactivate user account."""
    if not self.is_active:
        raise UserAlreadyInactiveError(f"User {self.cpf} is already inactive")
    self.is_active = False

def validate_cpf(self) -> bool:
    """Business rule: Validate CPF format."""
    cleaned = ''.join(filter(str.isdigit, self.cpf))
    return len(cleaned) == 11 and not all(c == cleaned[0] for c in cleaned)
undefined

2. Repository Interface Pattern

2. 仓库接口模式

python
undefined
python
undefined

src/domain/modules/user/repository.py

src/domain/modules/user/repository.py

from abc import ABC, abstractmethod from typing import List
from src.domain.modules.user.entity import User
class IUserRepository(ABC): """Abstract repository interface in domain layer.
Defines contract without implementation details.
"""

@abstractmethod
async def get_by_id(self, user_id: int) -> User | None:
    """Retrieve user by ID."""
    pass

@abstractmethod
async def get_by_cpf(self, cpf: str) -> User | None:
    """Retrieve user by CPF."""
    pass

@abstractmethod
async def create(self, user: User) -> User:
    """Create new user."""
    pass

@abstractmethod
async def update(self, user: User) -> User:
    """Update existing user."""
    pass

@abstractmethod
async def list_active(self, limit: int = 100) -> List[User]:
    """List all active users."""
    pass
undefined
from abc import ABC, abstractmethod from typing import List
from src.domain.modules.user.entity import User
class IUserRepository(ABC): """Abstract repository interface in domain layer.
Defines contract without implementation details.
"""

@abstractmethod
async def get_by_id(self, user_id: int) -> User | None:
    """Retrieve user by ID."""
    pass

@abstractmethod
async def get_by_cpf(self, cpf: str) -> User | None:
    """Retrieve user by CPF."""
    pass

@abstractmethod
async def create(self, user: User) -> User:
    """Create new user."""
    pass

@abstractmethod
async def update(self, user: User) -> User:
    """Update existing user."""
    pass

@abstractmethod
async def list_active(self, limit: int = 100) -> List[User]:
    """List all active users."""
    pass
undefined

3. Service Interface Pattern

3. 服务接口模式

python
undefined
python
undefined

src/domain/modules/user/service.py

src/domain/modules/user/service.py

from abc import ABC, abstractmethod from typing import List
from src.domain.modules.user.entity import User
class IUserService(ABC): """Abstract service interface defining business operations."""
@abstractmethod
async def register_user(self, cpf: str, name: str, email: str) -> User:
    """Register new user with validation."""
    pass

@abstractmethod
async def deactivate_user(self, cpf: str) -> User:
    """Deactivate user account."""
    pass

@abstractmethod
async def get_user_profile(self, cpf: str) -> User:
    """Get user profile by CPF."""
    pass
undefined
from abc import ABC, abstractmethod from typing import List
from src.domain.modules.user.entity import User
class IUserService(ABC): """Abstract service interface defining business operations."""
@abstractmethod
async def register_user(self, cpf: str, name: str, email: str) -> User:
    """Register new user with validation."""
    pass

@abstractmethod
async def deactivate_user(self, cpf: str) -> User:
    """Deactivate user account."""
    pass

@abstractmethod
async def get_user_profile(self, cpf: str) -> User:
    """Get user profile by CPF."""
    pass
undefined

4. Concrete Repository Implementation

4. 具体仓库实现

python
undefined
python
undefined

src/infra/database/alchemist/modules/user/repository.py

src/infra/database/alchemist/modules/user/repository.py

from typing import List from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession
from src.domain.modules.user.entity import User from src.domain.modules.user.repository import IUserRepository from src.infra.database.alchemist.models import UserModel
class UserRepository(IUserRepository): """SQLAlchemy implementation of user repository.
Handles database operations and entity mapping.
"""

def __init__(self, session: AsyncSession):
    self._session = session

async def get_by_id(self, user_id: int) -> User | None:
    """Retrieve user by ID."""
    result = await self._session.execute(
        select(UserModel).where(UserModel.id == user_id)
    )
    model = result.scalar_one_or_none()
    return self._to_entity(model) if model else None

async def get_by_cpf(self, cpf: str) -> User | None:
    """Retrieve user by CPF."""
    result = await self._session.execute(
        select(UserModel).where(UserModel.cpf == cpf)
    )
    model = result.scalar_one_or_none()
    return self._to_entity(model) if model else None

async def create(self, user: User) -> User:
    """Create new user."""
    model = UserModel(
        cpf=user.cpf,
        name=user.name,
        email=user.email,
        is_active=user.is_active,
    )
    self._session.add(model)
    await self._session.flush()
    await self._session.refresh(model)
    return self._to_entity(model)

async def update(self, user: User) -> User:
    """Update existing user."""
    result = await self._session.execute(
        select(UserModel).where(UserModel.id == user.id)
    )
    model = result.scalar_one()
    model.name = user.name
    model.email = user.email
    model.is_active = user.is_active
    await self._session.flush()
    await self._session.refresh(model)
    return self._to_entity(model)

async def list_active(self, limit: int = 100) -> List[User]:
    """List all active users."""
    result = await self._session.execute(
        select(UserModel)
        .where(UserModel.is_active == True)  # noqa: E712
        .limit(limit)
    )
    models = result.scalars().all()
    return [self._to_entity(model) for model in models]

def _to_entity(self, model: UserModel) -> User:
    """Convert database model to domain entity."""
    return User(
        id=model.id,
        cpf=model.cpf,
        name=model.name,
        email=model.email,
        created_at=model.created_at,
        is_active=model.is_active,
    )
undefined
from typing import List from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession
from src.domain.modules.user.entity import User from src.domain.modules.user.repository import IUserRepository from src.infra.database.alchemist.models import UserModel
class UserRepository(IUserRepository): """SQLAlchemy implementation of user repository.
Handles database operations and entity mapping.
"""

def __init__(self, session: AsyncSession):
    self._session = session

async def get_by_id(self, user_id: int) -> User | None:
    """Retrieve user by ID."""
    result = await self._session.execute(
        select(UserModel).where(UserModel.id == user_id)
    )
    model = result.scalar_one_or_none()
    return self._to_entity(model) if model else None

async def get_by_cpf(self, cpf: str) -> User | None:
    """Retrieve user by CPF."""
    result = await self._session.execute(
        select(UserModel).where(UserModel.cpf == cpf)
    )
    model = result.scalar_one_or_none()
    return self._to_entity(model) if model else None

async def create(self, user: User) -> User:
    """Create new user."""
    model = UserModel(
        cpf=user.cpf,
        name=user.name,
        email=user.email,
        is_active=user.is_active,
    )
    self._session.add(model)
    await self._session.flush()
    await self._session.refresh(model)
    return self._to_entity(model)

async def update(self, user: User) -> User:
    """Update existing user."""
    result = await self._session.execute(
        select(UserModel).where(UserModel.id == user.id)
    )
    model = result.scalar_one()
    model.name = user.name
    model.email = user.email
    model.is_active = user.is_active
    await self._session.flush()
    await self._session.refresh(model)
    return self._to_entity(model)

async def list_active(self, limit: int = 100) -> List[User]:
    """List all active users."""
    result = await self._session.execute(
        select(UserModel)
        .where(UserModel.is_active == True)  # noqa: E712
        .limit(limit)
    )
    models = result.scalars().all()
    return [self._to_entity(model) for model in models]

def _to_entity(self, model: UserModel) -> User:
    """Convert database model to domain entity."""
    return User(
        id=model.id,
        cpf=model.cpf,
        name=model.name,
        email=model.email,
        created_at=model.created_at,
        is_active=model.is_active,
    )
undefined

5. Concrete Service Implementation

5. 具体服务实现

python
undefined
python
undefined

src/infra/services/user_service.py

src/infra/services/user_service.py

from src.domain.modules.user.entity import User from src.domain.modules.user.service import IUserService from src.domain.modules.user.repository import IUserRepository from src.domain.modules.user.exceptions import ( UserAlreadyExistsError, UserNotFoundError, InvalidCPFError, )
class UserService(IUserService): """Concrete implementation of user service.
Orchestrates business logic using repository.
"""

def __init__(self, user_repository: IUserRepository):
    self._repository = user_repository

async def register_user(self, cpf: str, name: str, email: str) -> User:
    """Register new user with validation."""
    # Check if user already exists
    existing_user = await self._repository.get_by_cpf(cpf)
    if existing_user:
        raise UserAlreadyExistsError(f"User with CPF {cpf} already exists")

    # Create and validate entity
    user = User(
        id=None,
        cpf=cpf,
        name=name,
        email=email,
        created_at=datetime.now(),
        is_active=True,
    )

    if not user.validate_cpf():
        raise InvalidCPFError(f"Invalid CPF: {cpf}")

    # Persist
    return await self._repository.create(user)

async def deactivate_user(self, cpf: str) -> User:
    """Deactivate user account."""
    user = await self._repository.get_by_cpf(cpf)
    if not user:
        raise UserNotFoundError(f"User with CPF {cpf} not found")

    user.deactivate()  # Business logic in entity
    return await self._repository.update(user)

async def get_user_profile(self, cpf: str) -> User:
    """Get user profile by CPF."""
    user = await self._repository.get_by_cpf(cpf)
    if not user:
        raise UserNotFoundError(f"User with CPF {cpf} not found")
    return user
undefined
from src.domain.modules.user.entity import User from src.domain.modules.user.service import IUserService from src.domain.modules.user.repository import IUserRepository from src.domain.modules.user.exceptions import ( UserAlreadyExistsError, UserNotFoundError, InvalidCPFError, )
class UserService(IUserService): """Concrete implementation of user service.
Orchestrates business logic using repository.
"""

def __init__(self, user_repository: IUserRepository):
    self._repository = user_repository

async def register_user(self, cpf: str, name: str, email: str) -> User:
    """Register new user with validation."""
    # Check if user already exists
    existing_user = await self._repository.get_by_cpf(cpf)
    if existing_user:
        raise UserAlreadyExistsError(f"User with CPF {cpf} already exists")

    # Create and validate entity
    user = User(
        id=None,
        cpf=cpf,
        name=name,
        email=email,
        created_at=datetime.now(),
        is_active=True,
    )

    if not user.validate_cpf():
        raise InvalidCPFError(f"Invalid CPF: {cpf}")

    # Persist
    return await self._repository.create(user)

async def deactivate_user(self, cpf: str) -> User:
    """Deactivate user account."""
    user = await self._repository.get_by_cpf(cpf)
    if not user:
        raise UserNotFoundError(f"User with CPF {cpf} not found")

    user.deactivate()  # Business logic in entity
    return await self._repository.update(user)

async def get_user_profile(self, cpf: str) -> User:
    """Get user profile by CPF."""
    user = await self._repository.get_by_cpf(cpf)
    if not user:
        raise UserNotFoundError(f"User with CPF {cpf} not found")
    return user
undefined

6. Dependency Injection Container

6. 依赖注入容器

python
undefined
python
undefined

src/config/dependency.py

src/config/dependency.py

from dependency_injector import containers, providers from dependency_injector.wiring import Provide, inject
from src.infra.database.alchemist.session import get_session from src.infra.database.alchemist.modules.user.repository import UserRepository from src.infra.services.user_service import UserService
class Container(containers.DeclarativeContainer): """Dependency injection container.
Wires together all dependencies.
"""

wiring_config = containers.WiringConfiguration(
    modules=[
        "src.api.path.users",
        "src.api.path.auth",
    ]
)

# Database
db_session = providers.Resource(get_session)

# Repositories
user_repository = providers.Factory(
    UserRepository,
    session=db_session,
)

# Services
user_service = providers.Factory(
    UserService,
    user_repository=user_repository,
)
undefined
from dependency_injector import containers, providers from dependency_injector.wiring import Provide, inject
from src.infra.database.alchemist.session import get_session from src.infra.database.alchemist.modules.user.repository import UserRepository from src.infra.services.user_service import UserService
class Container(containers.DeclarativeContainer): """Dependency injection container.
Wires together all dependencies.
"""

wiring_config = containers.WiringConfiguration(
    modules=[
        "src.api.path.users",
        "src.api.path.auth",
    ]
)

# Database
db_session = providers.Resource(get_session)

# Repositories
user_repository = providers.Factory(
    UserRepository,
    session=db_session,
)

# Services
user_service = providers.Factory(
    UserService,
    user_repository=user_repository,
)
undefined

7. API Endpoint Implementation

7. API端点实现

python
undefined
python
undefined

src/api/path/users.py

src/api/path/users.py

from fastapi import APIRouter, Depends, status from dependency_injector.wiring import inject, Provide
from src.api.schemas.user import UserCreateRequest, UserResponse from src.domain.modules.user.service import IUserService from src.domain.modules.user.exceptions import UserAlreadyExistsError, InvalidCPFError from src.config.dependency import Container
router = APIRouter(prefix="/v1/users", tags=["users"])
@router.post( "/", response_model=UserResponse, status_code=status.HTTP_201_CREATED, ) @inject async def create_user( request: UserCreateRequest, user_service: IUserService = Depends(Provide[Container.user_service]), ): """Create new user endpoint.
Delegates to service layer for business logic.
"""
try:
    user = await user_service.register_user(
        cpf=request.cpf,
        name=request.name,
        email=request.email,
    )
    return UserResponse.from_entity(user)
except UserAlreadyExistsError as e:
    raise HTTPException(status_code=409, detail=str(e))
except InvalidCPFError as e:
    raise HTTPException(status_code=400, detail=str(e))
@router.get("/{cpf}", response_model=UserResponse) @inject async def get_user( cpf: str, user_service: IUserService = Depends(Provide[Container.user_service]), ): """Get user by CPF.""" user = await user_service.get_user_profile(cpf) return UserResponse.from_entity(user)
undefined
from fastapi import APIRouter, Depends, status from dependency_injector.wiring import inject, Provide
from src.api.schemas.user import UserCreateRequest, UserResponse from src.domain.modules.user.service import IUserService from src.domain.modules.user.exceptions import UserAlreadyExistsError, InvalidCPFError from src.config.dependency import Container
router = APIRouter(prefix="/v1/users", tags=["users"])
@router.post( "/", response_model=UserResponse, status_code=status.HTTP_201_CREATED, ) @inject async def create_user( request: UserCreateRequest, user_service: IUserService = Depends(Provide[Container.user_service]), ): """Create new user endpoint.
Delegates to service layer for business logic.
"""
try:
    user = await user_service.register_user(
        cpf=request.cpf,
        name=request.name,
        email=request.email,
    )
    return UserResponse.from_entity(user)
except UserAlreadyExistsError as e:
    raise HTTPException(status_code=409, detail=str(e))
except InvalidCPFError as e:
    raise HTTPException(status_code=400, detail=str(e))
@router.get("/{cpf}", response_model=UserResponse) @inject async def get_user( cpf: str, user_service: IUserService = Depends(Provide[Container.user_service]), ): """Get user by CPF.""" user = await user_service.get_user_profile(cpf) return UserResponse.from_entity(user)
undefined

8. Pydantic DTOs (Request/Response)

8. Pydantic DTO(请求/响应)

python
undefined
python
undefined

src/api/schemas/user.py

src/api/schemas/user.py

from datetime import datetime from pydantic import BaseModel, EmailStr, Field
from src.domain.modules.user.entity import User
class UserCreateRequest(BaseModel): """Request DTO for user creation.""" cpf: str = Field(..., min_length=11, max_length=14) name: str = Field(..., min_length=3, max_length=100) email: EmailStr
class UserResponse(BaseModel): """Response DTO for user data.""" id: int cpf: str name: str email: str created_at: datetime is_active: bool
@classmethod
def from_entity(cls, user: User) -> "UserResponse":
    """Convert domain entity to DTO."""
    return cls(
        id=user.id,
        cpf=user.cpf,
        name=user.name,
        email=user.email,
        created_at=user.created_at,
        is_active=user.is_active,
    )
undefined
from datetime import datetime from pydantic import BaseModel, EmailStr, Field
from src.domain.modules.user.entity import User
class UserCreateRequest(BaseModel): """Request DTO for user creation.""" cpf: str = Field(..., min_length=11, max_length=14) name: str = Field(..., min_length=3, max_length=100) email: EmailStr
class UserResponse(BaseModel): """Response DTO for user data.""" id: int cpf: str name: str email: str created_at: datetime is_active: bool
@classmethod
def from_entity(cls, user: User) -> "UserResponse":
    """Convert domain entity to DTO."""
    return cls(
        id=user.id,
        cpf=user.cpf,
        name=user.name,
        email=user.email,
        created_at=user.created_at,
        is_active=user.is_active,
    )
undefined

Testing Strategy

测试策略

Unit Tests (Domain Layer)

单元测试(领域层)

python
undefined
python
undefined

tests/domain/modules/user/test_entity.py

tests/domain/modules/user/test_entity.py

import pytest from src.domain.modules.user.entity import User from src.domain.modules.user.exceptions import UserAlreadyInactiveError
def test_user_deactivation(): """Test user deactivation business rule.""" user = User( id=1, cpf="12345678901", name="Test User", email="test@example.com", created_at=datetime.now(), is_active=True, )
user.deactivate()
assert user.is_active is False
def test_user_already_inactive_raises_error(): """Test deactivating already inactive user raises error.""" user = User( id=1, cpf="12345678901", name="Test User", email="test@example.com", created_at=datetime.now(), is_active=False, )
with pytest.raises(UserAlreadyInactiveError):
    user.deactivate()
def test_cpf_validation(): """Test CPF validation logic.""" user = User( id=1, cpf="12345678901", name="Test User", email="test@example.com", created_at=datetime.now(), )
assert user.validate_cpf() is True

user.cpf = "11111111111"  # All same digits
assert user.validate_cpf() is False
undefined
import pytest from src.domain.modules.user.entity import User from src.domain.modules.user.exceptions import UserAlreadyInactiveError
def test_user_deactivation(): """Test user deactivation business rule.""" user = User( id=1, cpf="12345678901", name="Test User", email="test@example.com", created_at=datetime.now(), is_active=True, )
user.deactivate()
assert user.is_active is False
def test_user_already_inactive_raises_error(): """Test deactivating already inactive user raises error.""" user = User( id=1, cpf="12345678901", name="Test User", email="test@example.com", created_at=datetime.now(), is_active=False, )
with pytest.raises(UserAlreadyInactiveError):
    user.deactivate()
def test_cpf_validation(): """Test CPF validation logic.""" user = User( id=1, cpf="12345678901", name="Test User", email="test@example.com", created_at=datetime.now(), )
assert user.validate_cpf() is True

user.cpf = "11111111111"  # All same digits
assert user.validate_cpf() is False
undefined

Service Tests (Infrastructure Layer)

服务测试(基础设施层)

python
undefined
python
undefined

tests/infra/services/test_user_service.py

tests/infra/services/test_user_service.py

import pytest from unittest.mock import AsyncMock
from src.infra.services.user_service import UserService from src.domain.modules.user.entity import User from src.domain.modules.user.exceptions import UserAlreadyExistsError
@pytest.fixture def mock_repository(): """Mock user repository.""" return AsyncMock()
@pytest.fixture def user_service(mock_repository): """User service with mocked repository.""" return UserService(user_repository=mock_repository)
@pytest.mark.asyncio async def test_register_user_success(user_service, mock_repository): """Test successful user registration.""" mock_repository.get_by_cpf.return_value = None mock_repository.create.return_value = User( id=1, cpf="12345678901", name="Test User", email="test@example.com", created_at=datetime.now(), is_active=True, )
user = await user_service.register_user(
    cpf="12345678901",
    name="Test User",
    email="test@example.com",
)

assert user.id == 1
assert user.cpf == "12345678901"
mock_repository.create.assert_called_once()
@pytest.mark.asyncio async def test_register_user_already_exists(user_service, mock_repository): """Test registering existing user raises error.""" mock_repository.get_by_cpf.return_value = User( id=1, cpf="12345678901", name="Existing User", email="existing@example.com", created_at=datetime.now(), is_active=True, )
with pytest.raises(UserAlreadyExistsError):
    await user_service.register_user(
        cpf="12345678901",
        name="Test User",
        email="test@example.com",
    )
undefined
import pytest from unittest.mock import AsyncMock
from src.infra.services.user_service import UserService from src.domain.modules.user.entity import User from src.domain.modules.user.exceptions import UserAlreadyExistsError
@pytest.fixture def mock_repository(): """Mock user repository.""" return AsyncMock()
@pytest.fixture def user_service(mock_repository): """User service with mocked repository.""" return UserService(user_repository=mock_repository)
@pytest.mark.asyncio async def test_register_user_success(user_service, mock_repository): """Test successful user registration.""" mock_repository.get_by_cpf.return_value = None mock_repository.create.return_value = User( id=1, cpf="12345678901", name="Test User", email="test@example.com", created_at=datetime.now(), is_active=True, )
user = await user_service.register_user(
    cpf="12345678901",
    name="Test User",
    email="test@example.com",
)

assert user.id == 1
assert user.cpf == "12345678901"
mock_repository.create.assert_called_once()
@pytest.mark.asyncio async def test_register_user_already_exists(user_service, mock_repository): """Test registering existing user raises error.""" mock_repository.get_by_cpf.return_value = User( id=1, cpf="12345678901", name="Existing User", email="existing@example.com", created_at=datetime.now(), is_active=True, )
with pytest.raises(UserAlreadyExistsError):
    await user_service.register_user(
        cpf="12345678901",
        name="Test User",
        email="test@example.com",
    )
undefined

Integration Tests (API Layer)

集成测试(API层)

python
undefined
python
undefined

tests/api/test_users.py

tests/api/test_users.py

import pytest from httpx import AsyncClient
@pytest.mark.asyncio async def test_create_user_endpoint(client: AsyncClient): """Test user creation endpoint.""" response = await client.post( "/v1/users/", json={ "cpf": "12345678901", "name": "Test User", "email": "test@example.com", }, )
assert response.status_code == 201
data = response.json()
assert data["cpf"] == "12345678901"
assert data["name"] == "Test User"
@pytest.mark.asyncio async def test_create_duplicate_user_returns_409(client: AsyncClient): """Test creating duplicate user returns conflict.""" user_data = { "cpf": "12345678901", "name": "Test User", "email": "test@example.com", }
# Create first user
await client.post("/v1/users/", json=user_data)

# Try to create duplicate
response = await client.post("/v1/users/", json=user_data)
assert response.status_code == 409
undefined
import pytest from httpx import AsyncClient
@pytest.mark.asyncio async def test_create_user_endpoint(client: AsyncClient): """Test user creation endpoint.""" response = await client.post( "/v1/users/", json={ "cpf": "12345678901", "name": "Test User", "email": "test@example.com", }, )
assert response.status_code == 201
data = response.json()
assert data["cpf"] == "12345678901"
assert data["name"] == "Test User"
@pytest.mark.asyncio async def test_create_duplicate_user_returns_409(client: AsyncClient): """Test creating duplicate user returns conflict.""" user_data = { "cpf": "12345678901", "name": "Test User", "email": "test@example.com", }
# Create first user
await client.post("/v1/users/", json=user_data)

# Try to create duplicate
response = await client.post("/v1/users/", json=user_data)
assert response.status_code == 409
undefined

Database Session Management

数据库会话管理

python
undefined
python
undefined

src/infra/database/alchemist/session.py

src/infra/database/alchemist/session.py

from contextlib import asynccontextmanager from sqlalchemy.ext.asyncio import ( AsyncSession, create_async_engine, async_sessionmaker, )
from src.config.settings import app_settings
from contextlib import asynccontextmanager from sqlalchemy.ext.asyncio import ( AsyncSession, create_async_engine, async_sessionmaker, )
from src.config.settings import app_settings

Engine configuration

Engine configuration

engine = create_async_engine( app_settings.DATABASE_URL, echo=app_settings.DEBUG, pool_size=10, max_overflow=20, pool_recycle=3600, )
engine = create_async_engine( app_settings.DATABASE_URL, echo=app_settings.DEBUG, pool_size=10, max_overflow=20, pool_recycle=3600, )

Session factory

Session factory

AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, autocommit=False, autoflush=False, )
@asynccontextmanager async def get_session() -> AsyncSession: """Get database session with automatic cleanup.
Usage with dependency injection:
    session = Depends(get_session)
"""
async with AsyncSessionLocal() as session:
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()
undefined
AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, autocommit=False, autoflush=False, )
@asynccontextmanager async def get_session() -> AsyncSession: """Get database session with automatic cleanup.
Usage with dependency injection:
    session = Depends(get_session)
"""
async with AsyncSessionLocal() as session:
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()
undefined

Configuration Management

配置管理

python
undefined
python
undefined

src/config/settings.py

src/config/settings.py

from pydantic_settings import BaseSettings, SettingsConfigDict
class AppSettings(BaseSettings): """Application configuration from environment variables."""
model_config = SettingsConfigDict(
    env_file=".env",
    env_file_encoding="utf-8",
    case_sensitive=True,
)

# Application
APP_NAME: str = "FastAPI Clean Architecture"
DEBUG: bool = False
API_VERSION: str = "v1"

# Database
DATABASE_URL: str
DB_POOL_SIZE: int = 10
DB_MAX_OVERFLOW: int = 20

# Redis
REDIS_URL: str = "redis://localhost:6379/0"
REDIS_TTL: int = 3600

# Security
JWT_SECRET_KEY: str
JWT_ALGORITHM: str = "RS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60

# CORS
CORS_ORIGINS: list[str] = ["*"]
CORS_CREDENTIALS: bool = True
app_settings = AppSettings()
undefined
from pydantic_settings import BaseSettings, SettingsConfigDict
class AppSettings(BaseSettings): """Application configuration from environment variables."""
model_config = SettingsConfigDict(
    env_file=".env",
    env_file_encoding="utf-8",
    case_sensitive=True,
)

# Application
APP_NAME: str = "FastAPI Clean Architecture"
DEBUG: bool = False
API_VERSION: str = "v1"

# Database
DATABASE_URL: str
DB_POOL_SIZE: int = 10
DB_MAX_OVERFLOW: int = 20

# Redis
REDIS_URL: str = "redis://localhost:6379/0"
REDIS_TTL: int = 3600

# Security
JWT_SECRET_KEY: str
JWT_ALGORITHM: str = "RS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60

# CORS
CORS_ORIGINS: list[str] = ["*"]
CORS_CREDENTIALS: bool = True
app_settings = AppSettings()
undefined

Main Application Setup

主应用设置

python
undefined
python
undefined

src/main.py

src/main.py

from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware
from src.config.settings import app_settings from src.config.dependency import Container from src.api.path import users, auth
def create_app() -> FastAPI: """Create and configure FastAPI application."""
# Initialize DI container
container = Container()

# Create app
app = FastAPI(
    title=app_settings.APP_NAME,
    version=app_settings.API_VERSION,
    debug=app_settings.DEBUG,
)

# Wire container
app.container = container

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=app_settings.CORS_ORIGINS,
    allow_credentials=app_settings.CORS_CREDENTIALS,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(users.router)
app.include_router(auth.router)

return app
app = create_app()
@app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy"}
undefined
from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware
from src.config.settings import app_settings from src.config.dependency import Container from src.api.path import users, auth
def create_app() -> FastAPI: """Create and configure FastAPI application."""
# Initialize DI container
container = Container()

# Create app
app = FastAPI(
    title=app_settings.APP_NAME,
    version=app_settings.API_VERSION,
    debug=app_settings.DEBUG,
)

# Wire container
app.container = container

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=app_settings.CORS_ORIGINS,
    allow_credentials=app_settings.CORS_CREDENTIALS,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(users.router)
app.include_router(auth.router)

return app
app = create_app()
@app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy"}
undefined

Best Practices Checklist

最佳实践检查清单

Architecture

架构

  • ✅ Domain layer has NO dependencies on API or Infrastructure
  • ✅ All database logic encapsulated in repositories
  • ✅ Business logic lives in domain entities and services
  • ✅ Infrastructure implements domain interfaces
  • ✅ API layer only handles HTTP concerns
  • ✅ 领域层不依赖API层或基础设施层
  • ✅ 所有数据库逻辑封装在仓库中
  • ✅ 业务逻辑存在于领域实体与服务中
  • ✅ 基础设施层实现领域层接口
  • ✅ API层仅处理HTTP相关事宜

Dependency Injection

依赖注入

  • ✅ Use
    dependency-injector
    for IoC container
  • ✅ Wire all dependencies through container
  • ✅ Inject interfaces, not concrete implementations
  • ✅ Use
    @inject
    decorator on endpoints
  • ✅ Configure wiring for all API modules
  • ✅ 使用
    dependency-injector
    作为控制反转容器
  • ✅ 通过容器连接所有依赖
  • ✅ 注入接口而非具体实现
  • ✅ 在端点上使用
    @inject
    装饰器
  • ✅ 为所有API模块配置依赖注入

Database

数据库

  • ✅ Use SQLAlchemy 2.0 async patterns
  • ✅ Separate ORM models from domain entities
  • ✅ Implement mapper methods (
    _to_entity
    )
  • ✅ Use connection pooling
  • ✅ Handle transactions properly (commit/rollback)
  • ✅ 使用SQLAlchemy 2.0异步模式
  • ✅ 将ORM模型与领域实体分离
  • ✅ 实现映射方法(
    _to_entity
  • ✅ 使用连接池
  • ✅ 正确处理事务(提交/回滚)

Testing

测试

  • ✅ 100% coverage for domain layer (pure logic)
  • ✅ Mock repositories in service tests
  • ✅ Use AsyncMock for async methods
  • ✅ Integration tests for endpoints
  • ✅ Separate test database for integration tests
  • ✅ 领域层(纯逻辑)覆盖率达到100%
  • ✅ 在服务测试中模拟仓库
  • ✅ 为异步方法使用AsyncMock
  • ✅ 为端点编写集成测试
  • ✅ 为集成测试使用独立的测试数据库

Naming Conventions

命名规范

  • ✅ Entities: PascalCase (
    User
    ,
    Order
    )
  • ✅ Services:
    I{Name}Service
    (interface),
    {Name}Service
    (implementation)
  • ✅ Repositories:
    I{Name}Repository
    (interface),
    {Name}Repository
    (implementation)
  • ✅ DTOs:
    {Name}Request
    ,
    {Name}Response
  • ✅ Use ptBR names for database columns if applicable
  • ✅ 实体:大驼峰命名(
    User
    Order
  • ✅ 服务:
    I{Name}Service
    (接口),
    {Name}Service
    (实现)
  • ✅ 仓库:
    I{Name}Repository
    (接口),
    {Name}Repository
    (实现)
  • ✅ DTO:
    {Name}Request
    {Name}Response
  • ✅ 如适用,数据库列使用葡萄牙语(巴西)命名

Error Handling

错误处理

  • ✅ Domain exceptions for business rule violations
  • ✅ HTTP exceptions at API layer only
  • ✅ Proper status codes (400, 404, 409, 500)
  • ✅ Meaningful error messages
  • ✅ Global exception handler
  • ✅ 使用领域异常处理业务规则违反情况
  • ✅ 仅在API层使用HTTP异常
  • ✅ 使用正确的状态码(400、404、409、500)
  • ✅ 提供有意义的错误信息
  • ✅ 全局异常处理器

Common Pitfalls to Avoid

需避免的常见陷阱

  1. Importing Infrastructure in Domain
    • ❌ Never import SQLAlchemy models in domain layer
    • ✅ Use mapper functions to convert between layers
  2. Business Logic in API Layer
    • ❌ Never put validation or business rules in endpoints
    • ✅ Move all logic to services or entities
  3. Tight Coupling
    • ❌ Don't instantiate dependencies directly
    • ✅ Use dependency injection everywhere
  4. Anemic Entities
    • ❌ Don't use entities as plain data containers
    • ✅ Put behavior and validation in entities
  5. Repository Leakage
    • ❌ Don't expose SQLAlchemy queries outside repositories
    • ✅ Return domain entities only
  6. Improper Transaction Management
    • ❌ Don't commit/rollback in repositories
    • ✅ Manage transactions at service or endpoint level
  1. 在领域层中导入基础设施层内容
    • ❌ 绝不要在领域层中导入SQLAlchemy模型
    • ✅ 使用映射函数在各层之间转换
  2. 在API层中编写业务逻辑
    • ❌ 绝不要在端点中放置验证或业务规则
    • ✅ 将所有逻辑移至服务或实体中
  3. 紧耦合
    • ❌ 不要直接实例化依赖
    • ✅ 全程使用依赖注入
  4. 贫血实体
    • ❌ 不要将实体用作纯数据容器
    • ✅ 将行为与验证逻辑放入实体中
  5. 仓库泄漏
    • ❌ 不要在仓库外暴露SQLAlchemy查询
    • ✅ 仅返回领域实体
  6. 事务管理不当
    • ❌ 不要在仓库中执行提交/回滚
    • ✅ 在服务或端点层管理事务

Migration Guide (Legacy → Clean Architecture)

迁移指南(遗留系统 → 整洁架构)

Step 1: Create Domain Layer

步骤1:创建领域层

  1. Extract business entities from database models
  2. Define repository interfaces
  3. Define service interfaces
  4. Move business logic to entities/services
  1. 从数据库模型中提取业务实体
  2. 定义仓库接口
  3. 定义服务接口
  4. 将业务逻辑移至实体/服务中

Step 2: Create Infrastructure Layer

步骤2:创建基础设施层

  1. Implement repositories with SQLAlchemy
  2. Create service implementations
  3. Keep database models separate from entities
  1. 使用SQLAlchemy实现仓库
  2. 创建服务实现
  3. 将数据库模型与实体分离

Step 3: Refactor API Layer

步骤3:重构API层

  1. Create request/response DTOs
  2. Update endpoints to use services
  3. Remove direct database access
  4. Add dependency injection
  1. 创建请求/响应DTO
  2. 更新端点以使用服务
  3. 移除直接数据库访问
  4. 添加依赖注入

Step 4: Testing

步骤4:测试

  1. Write unit tests for domain layer
  2. Add service tests with mocked repositories
  3. Create integration tests for endpoints
  4. Ensure 100% coverage
  1. 为领域层编写单元测试
  2. 添加带有模拟仓库的服务测试
  3. 为端点创建集成测试
  4. 确保100%的覆盖率

References

参考资料

Production Examples

生产实践示例

This skill is based on patterns from:
  • GEFIN Backend: Financial management system with 595+ tests
  • Clean Architecture: Domain-driven design principles
  • Enterprise Best Practices: Scalability, maintainability, testability
本技能基于以下项目的模式:
  • GEFIN后端: 拥有595+测试用例的财务管理系统
  • 整洁架构: 领域驱动设计原则
  • 企业级最佳实践: 可扩展性、可维护性、可测试性