Loading...
Loading...
Python backend implementation patterns for FastAPI applications with SQLAlchemy 2.0, Pydantic v2, and async patterns. Use during the implementation phase when creating or modifying FastAPI endpoints, Pydantic models, SQLAlchemy models, service layers, or repository classes. Covers async session management, dependency injection via Depends(), layered error handling, and Alembic migrations. Does NOT cover testing (use pytest-patterns), deployment (use deployment-pipeline), or FastAPI framework mechanics like middleware and WebSockets (use fastapi-patterns).
npx skill4agent add hieutrtr/ai1-skills python-backend-expertDepends()pytest-patternsfastapi-patternsdeployment-pipelineapi-design-patternssystem-architectureapp/
├── main.py # FastAPI application factory
├── core/
│ ├── config.py # pydantic-settings configuration
│ ├── database.py # Async engine, session factory
│ └── security.py # Password hashing, JWT utilities
├── models/ # SQLAlchemy ORM models
│ ├── __init__.py
│ ├── base.py # Declarative base
│ └── user.py
├── schemas/ # Pydantic v2 schemas
│ ├── __init__.py
│ └── user.py
├── repositories/ # Data access layer
│ ├── __init__.py
│ └── user_repo.py
├── services/ # Business logic layer
│ ├── __init__.py
│ └── user_service.py
├── routes/ # FastAPI routers
│ ├── __init__.py
│ └── users.py
├── dependencies/ # Reusable Depends() providers
│ ├── __init__.py
│ └── auth.py
└── exceptions.py # Domain exception classesrouter = APIRouter(prefix="/users", tags=["Users"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
data: UserCreate,
session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
service = UserService(session)
try:
user = await service.create_user(data)
return UserResponse.model_validate(user)
except ConflictError as e:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
service = UserService(session)
try:
user = await service.get_user(user_id)
return UserResponse.model_validate(user)
except NotFoundError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))HTTPExceptionresponse_modelstatus.HTTP_*Depends()from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.user import User
class UserRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, user_id: int) -> User | None:
result = await self._session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> User | None:
result = await self._session.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def list_with_posts(
self, *, offset: int = 0, limit: int = 20
) -> list[User]:
result = await self._session.execute(
select(User)
.options(selectinload(User.posts))
.offset(offset)
.limit(limit)
)
return list(result.scalars().all())
async def create(self, user: User) -> User:
self._session.add(user)
await self._session.flush()
await self._session.refresh(user)
return user
async def update(self, user: User, **kwargs: object) -> User:
for key, value in kwargs.items():
setattr(user, key, value)
await self._session.flush()
await self._session.refresh(user)
return user
async def delete(self, user: User) -> None:
await self._session.delete(user)
await self._session.flush()Noneflush()refresh()add()selectinload()HTTPExceptionfrom app.exceptions import ConflictError, NotFoundError
from app.models.user import User
from app.repositories.user_repo import UserRepository
from app.schemas.user import UserCreate, UserPatch
from app.core.security import hash_password
class UserService:
def __init__(self, session: AsyncSession) -> None:
self.repo = UserRepository(session)
async def create_user(self, data: UserCreate) -> User:
# Business rule: email must be unique
existing = await self.repo.get_by_email(data.email)
if existing:
raise ConflictError(f"Email {data.email} already registered")
# Business logic: hash password before storing
user = User(
email=data.email,
hashed_password=hash_password(data.password),
display_name=data.display_name,
)
return await self.repo.create(user)
async def get_user(self, user_id: int) -> User:
user = await self.repo.get_by_id(user_id)
if user is None:
raise NotFoundError(f"User {user_id} not found")
return user
async def update_user(self, user_id: int, data: UserPatch) -> User:
user = await self.get_user(user_id)
update_fields = data.model_dump(exclude_unset=True)
if "password" in update_fields:
update_fields["hashed_password"] = hash_password(update_fields.pop("password"))
return await self.repo.update(user, **update_fields)NotFoundErrorConflictErrorHTTPExceptionAsyncSessionclass AppError(Exception):
"""Base application error."""
class NotFoundError(AppError):
"""Resource not found."""
class ConflictError(AppError):
"""Resource conflict (duplicate, version mismatch)."""
class ValidationError(AppError):
"""Business rule violation."""
class PermissionError(AppError):
"""Insufficient permissions."""from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError) -> JSONResponse:
return JSONResponse(status_code=404, content={"detail": str(exc), "code": "NOT_FOUND"})
@app.exception_handler(ConflictError)
async def conflict_handler(request: Request, exc: ConflictError) -> JSONResponse:
return JSONResponse(status_code=409, content={"detail": str(exc), "code": "CONFLICT"})from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field
class UserCreate(BaseModel):
"""POST request body — writable fields only, no id/timestamps."""
email: EmailStr
password: str = Field(min_length=8, max_length=128)
display_name: str = Field(min_length=1, max_length=100)
class UserPatch(BaseModel):
"""PATCH request body — all fields Optional."""
email: EmailStr | None = None
password: str | None = Field(default=None, min_length=8, max_length=128)
display_name: str | None = Field(default=None, min_length=1, max_length=100)
class UserResponse(BaseModel):
"""Response body — all fields including id and timestamps."""
model_config = ConfigDict(from_attributes=True)
id: int
email: str
display_name: str
is_active: bool
created_at: datetime
updated_at: datetimeConfigDict(from_attributes=True)class Config: orm_mode = Truemodel_validate()from_orm()model_dump().dict()model_dump(exclude_unset=True)Field()str | NoneOptional[str]from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from app.core.config import settings
engine = create_async_engine(
settings.database_url,
echo=settings.debug,
pool_size=5,
max_overflow=10,
pool_pre_ping=True,
)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
async with session.begin():
yield sessionexpire_on_commit=Falsesession.begin()Depends(get_async_session)from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255))
display_name: Mapped[str] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now()
)
# Relationships — ALWAYS use selectin or joined for async
posts: Mapped[list["Post"]] = relationship(
back_populates="author", lazy="selectin"
)Mapped[type]mapped_column()Column()lazy="selectin"server_defaultcreated_atupdated_at# Generate migration from model changes
alembic revision --autogenerate -m "add_users_table"
# Review the generated migration file before applying
# Apply migration
alembic upgrade head
# Rollback one step
alembic downgrade -1
# Show current revision
alembic current
# Show migration history
alembic history# alembic/env.py
naming_convention = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}downgrade()"add_users_table""add_email_index_to_users"from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_async_session
from app.services.user_service import UserService
async def get_user_service(
session: AsyncSession = Depends(get_async_session),
) -> UserService:
return UserService(session)
# Chain dependencies for auth
async def get_current_user(
token: str = Depends(oauth2_scheme),
session: AsyncSession = Depends(get_async_session),
) -> User:
user_id = decode_token(token)
service = UserService(session)
return await service.get_user(user_id)
async def require_admin(
user: User = Depends(get_current_user),
) -> User:
if user.role != "admin":
raise HTTPException(status_code=403, detail="Admin required")
return userPOST /usersUserCreateUserService.create_user(data)Depends()UserRepository.get_by_email()UserUserRepository.create(user)UserResponsemodel_validate()ConflictError409 Conflicttry/exceptflush()refresh()session.add()expire_on_commit=Falseasync def background_job():
async with async_session_factory() as session:
async with session.begin():
# do workselectinload()lazy="selectin"session.execute(insert(User).values(list_of_dicts))begin()@computed_fieldreferences/pydantic-v2-migration.mdreferences/sqlalchemy-patterns.md