Loading...
Loading...
Guides the agent through async database integration with SQLAlchemy and Alembic migrations for FastAPI applications. Triggered when users ask to "set up a database", "create database models", "add SQLAlchemy", "create migrations", "run Alembic", "connect to PostgreSQL", "add a database layer", "create CRUD operations", "set up async database", or mention SQLAlchemy, Alembic, ORM, database models, async database, connection pool, or database migrations.
npx skill4agent add ingpdw/pdw-python-dev-tool database

uv add "sqlalchemy[asyncio]" alembic asyncpg # PostgreSQL
# or
uv add "sqlalchemy[asyncio]" alembic aiosqlite # SQLite

Packages: sqlalchemy[asyncio], asyncpg, aiosqlite, alembic

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/mydb"
# For SQLite: "sqlite+aiosqlite:///./app.db"
engine = create_async_engine(
DATABASE_URL,
echo=False, # Set True for SQL query logging
pool_size=5, # Number of persistent connections
max_overflow=10, # Additional connections allowed beyond pool_size
pool_pre_ping=True, # Verify connections before use
pool_recycle=3600, # Recycle connections after 1 hour
)
async_session = async_sessionmaker(
engine,
expire_on_commit=False, # Prevent lazy-load after commit in async
)from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Application lifespan hook that disposes the database engine on shutdown.

    The engine is created at module level, so nothing needs to be set up
    before the application starts serving.
    """
    try:
        yield
    finally:
        # Dispose even when an error propagates through the yield, so the
        # pooled connections are not left open.
        await engine.dispose()
app = FastAPI(lifespan=lifespan)
async def get_db() -> AsyncIterator[AsyncSession]:
    """Yield one session per request, committing on success.

    Any exception raised while the session is in use (or during the commit)
    triggers a rollback and is re-raised to the caller.
    """
    async with async_session() as db:
        try:
            yield db
            # Reached only when the consumer finished without raising.
            await db.commit()
        except Exception:
            await db.rollback()
            raise


# Usage in route handlers: Depends(get_db)
from datetime import datetime
from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
relationship,
)
class Base(DeclarativeBase):
    """Declarative base shared by every ORM model in this file."""


class TimestampMixin:
    """Mixin adding server-populated creation and modification timestamps."""

    # Filled in by the database's now() when the row is inserted.
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    # Same insert default, and set to now() again on each UPDATE.
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        onupdate=func.now(),
    )


class User(TimestampMixin, Base):
    """Row in the ``users`` table; owns a collection of posts."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)

    # Relationships
    # Posts follow the user's lifecycle: cascade covers all operations and
    # orphaned posts are deleted.
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )


class Post(TimestampMixin, Base):
    """Row in the ``posts`` table; belongs to one user via ``author_id``."""

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")


# See: Mapped[type], mapped_column(), Column()
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
class UserRepository:
    """Data-access helper for ``User`` rows bound to a single session.

    Methods flush but never commit; committing is left to whoever owns the
    session.
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        """Return the user with this primary key, or ``None``."""
        return await self.session.get(User, user_id)

    async def get_by_email(self, email: str) -> User | None:
        """Return the user with exactly this email, or ``None``."""
        query = select(User).where(User.email == email)
        rows = await self.session.execute(query)
        return rows.scalar_one_or_none()

    async def list_users(
        self, *, offset: int = 0, limit: int = 20
    ) -> list[User]:
        """Return one page of users ordered by id."""
        query = select(User).order_by(User.id).offset(offset).limit(limit)
        rows = await self.session.execute(query)
        return list(rows.scalars().all())

    async def create(self, **kwargs) -> User:
        """Build a ``User`` from keyword arguments and flush it."""
        new_user = User(**kwargs)
        self.session.add(new_user)
        # Flush so the database assigns the id, without committing.
        await self.session.flush()
        return new_user

    async def update(self, user: User, **kwargs) -> User:
        """Set each keyword as an attribute on ``user`` and flush."""
        for attr_name, attr_value in kwargs.items():
            setattr(user, attr_name, attr_value)
        await self.session.flush()
        return user

    async def delete(self, user: User) -> None:
        """Mark ``user`` for deletion in the session."""
        await self.session.delete(user)

    async def count(self) -> int:
        """Return the total number of users."""
        query = select(func.count()).select_from(User)
        rows = await self.session.execute(query)
        return rows.scalar_one()


from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Return the user with the given id, or respond 404 if none exists."""
    found = await UserRepository(db).get_by_id(user_id)
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")
    return found
@router.post("/", status_code=201)
async def create_user(
    data: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """Create a user from the request body.

    Responds 409 when the email is already registered, whether that is
    detected by the up-front lookup or by the database when the row is
    flushed.
    """
    # Imported locally so the snippet does not depend on a module-level import.
    from sqlalchemy.exc import IntegrityError

    repo = UserRepository(db)
    if await repo.get_by_email(data.email):
        raise HTTPException(status_code=409, detail="Email already registered")
    try:
        user = await repo.create(**data.model_dump())
    except IntegrityError as exc:
        # The lookup above is not atomic with the insert: a concurrent request
        # can register the same email in between, and the unique constraint on
        # users.email then rejects the flush.
        # NOTE(review): assumes the email constraint is the only integrity
        # error reachable with validated input — confirm against UserCreate.
        raise HTTPException(
            status_code=409, detail="Email already registered"
        ) from exc
    return user


from sqlalchemy.orm import selectinload, joinedload
# selectinload: separate IN query (best for collections)
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
# joinedload: LEFT JOIN (best for single relationships)
stmt = select(Post).options(joinedload(Post.author)).where(Post.id == post_id)from sqlalchemy import and_, or_, desc
# Complex filters
# Active users whose name or email contains `query` (case-insensitive),
# newest first, one page at a time.
stmt = (
    select(User)
    .where(
        and_(
            # .is_(True) is the explicit boolean test; `== True` is flagged
            # by linters (E712).
            User.is_active.is_(True),
            or_(
                # autoescape makes any % or _ typed by the user match
                # literally instead of acting as a LIKE wildcard.
                User.name.icontains(query, autoescape=True),
                User.email.icontains(query, autoescape=True),
            ),
        )
    )
    .order_by(desc(User.created_at))
    .offset(offset)
    .limit(limit)
)
from pydantic import BaseModel
class PaginatedResponse[T](BaseModel):
items: list[T]
total: int
offset: int
limit: int
@property
def has_more(self) -> bool:
return self.offset + self.limit < self.total
async def paginate(
    session: AsyncSession,
    stmt,
    *,
    offset: int = 0,
    limit: int = 20,
) -> tuple[list, int]:
    """Run ``stmt`` one page at a time and count its total rows.

    Args:
        session: Session used to execute both queries.
        stmt: Select statement to paginate.
        offset: Number of rows to skip.
        limit: Maximum number of rows to return.

    Returns:
        ``(items, total)``: the scalar results for the requested page and
        the number of rows ``stmt`` matches without paging.
    """
    # Count query: ordering cannot change the count, so drop any ORDER BY
    # before wrapping the statement in a subquery. This avoids a pointless
    # sort and backends that reject ORDER BY inside a subquery.
    count_stmt = select(func.count()).select_from(stmt.order_by(None).subquery())
    total = (await session.execute(count_stmt)).scalar_one()

    # Data query: the caller's ordering is kept here.
    result = await session.execute(stmt.offset(offset).limit(limit))
    items = list(result.scalars().all())
    return items, total


# Initialize Alembic
uv run alembic init alembic
# For async support, use the async template
uv run alembic init -t async alembicalembic/env.py# alembic/env.py
from alembic import context  # provides `context.config` used below

from app.database import Base, DATABASE_URL
from app.models import User, Post # Import all models

config = context.config
# Strip the async driver suffix so Alembic gets a plain "postgresql://" URL.
# NOTE(review): an env.py generated with `alembic init -t async` builds an
# async engine and would need the "+asyncpg" URL left unchanged — confirm
# which template this env.py comes from.
config.set_main_option("sqlalchemy.url", DATABASE_URL.replace("+asyncpg", ""))
target_metadata = Base.metadatarun_async_migrations()env.py# Generate a migration from model changes
uv run alembic revision --autogenerate -m "add users table"
# Apply all pending migrations
uv run alembic upgrade head
# Rollback one migration
uv run alembic downgrade -1
# Show current migration status
uv run alembic current
# Show migration history
uv run alembic historyupgradedowngradeadd_users_tableupdatealembic/versions/| Parameter | Default | Description |
|---|---|---|
| `pool_size` | 5 | Number of persistent connections in the pool |
| `max_overflow` | 10 | Extra connections allowed beyond `pool_size` |
| `pool_timeout` | 30 | Seconds to wait for a connection before error |
| `pool_recycle` | -1 | Seconds before a connection is recycled (set `3600` for PG) |
| `pool_pre_ping` | False | Test connections before checkout (set True for prod) |
engine = create_async_engine(
DATABASE_URL,
pool_size=5, # 5 per worker = 20 total connections
max_overflow=10, # Burst to 15 per worker
pool_pre_ping=True, # Handle dropped connections
pool_recycle=3600, # Recycle hourly
)

Keep `workers * (pool_size + max_overflow)` below the database's `max_connections`.

import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from app.database import Base, get_db
from app.main import create_app
@pytest.fixture
async def db_engine():
    """Provide an in-memory SQLite engine with all tables created."""
    test_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with test_engine.begin() as connection:
        # create_all is synchronous, so it is run through run_sync.
        await connection.run_sync(Base.metadata.create_all)
    yield test_engine
    await test_engine.dispose()
@pytest.fixture
async def db_session(db_engine):
    """Provide a session on the test engine and roll it back afterwards."""
    make_session = async_sessionmaker(db_engine, expire_on_commit=False)
    async with make_session() as test_session:
        yield test_session
        # Discard whatever the test left uncommitted.
        await test_session.rollback()
@pytest.fixture
def app(db_session):
    """Provide the application with ``get_db`` overridden to the test session."""
    application = create_app()
    application.dependency_overrides[get_db] = lambda: db_session
    yield application
    application.dependency_overrides.clear()


@pytest.fixture
async def db_session(db_engine):
    """Alternative session fixture: run each test inside an outer transaction.

    The session is bound to a connection whose transaction is rolled back
    after the test. This fixture has the same name as the earlier
    ``db_session`` fixture, so only one of the two should be defined in a
    given module.
    """
    # AsyncSession is not among the imports of the test snippet, so import it
    # here to keep the fixture self-contained.
    from sqlalchemy.ext.asyncio import AsyncSession

    async with db_engine.connect() as conn:
        trans = await conn.begin()
        # Close the session before rolling back the outer transaction so it
        # does not linger after the fixture finishes.
        async with AsyncSession(bind=conn, expire_on_commit=False) as session:
            yield session
        await trans.rollback()


# pydantic app-scaffolding async-patterns docker-build test-runner