From hieutrtr-ai1-skills
Python backend implementation patterns for FastAPI applications with SQLAlchemy 2.0, Pydantic v2, and async patterns. Use during the implementation phase when creating or modifying FastAPI endpoints, Pydantic models, SQLAlchemy models, service layers, or repository classes. Covers async session management, dependency injection via Depends(), layered error handling, and Alembic migrations. Does NOT cover testing (use pytest-patterns), deployment (use deployment-pipeline), or FastAPI framework mechanics like middleware and WebSockets (use fastapi-patterns).
npx claudepluginhub joshuarweaver/cascade-code-testing-misc --plugin hieutrtr-ai1-skills

This skill is limited to using the following tools:
Activate this skill when:
Creates isolated Git worktrees for feature branches with prioritized directory selection, gitignore safety checks, auto project setup for Node/Python/Rust/Go, and baseline verification.
Executes implementation plans in current session by dispatching fresh subagents per independent task, with two-stage reviews: spec compliance then code quality.
Dispatches parallel agents to independently tackle 2+ tasks like separate test failures or subsystems without shared state or dependencies.
Activate this skill when:
- Wiring dependency injection with `Depends()`

Do NOT use this skill for:
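- Testing (use `pytest-patterns`)
- FastAPI framework mechanics such as middleware and WebSockets (use `fastapi-patterns`)
- Deployment (use `deployment-pipeline`)
- API design (use `api-design-patterns`)
- System architecture (use `system-architecture`)

app/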
├── main.py # FastAPI application factory
├── core/
│ ├── config.py # pydantic-settings configuration
│ ├── database.py # Async engine, session factory
│ └── security.py # Password hashing, JWT utilities
├── models/ # SQLAlchemy ORM models
│ ├── __init__.py
│ ├── base.py # Declarative base
│ └── user.py
├── schemas/ # Pydantic v2 schemas
│ ├── __init__.py
│ └── user.py
├── repositories/ # Data access layer
│ ├── __init__.py
│ └── user_repo.py
├── services/ # Business logic layer
│ ├── __init__.py
│ └── user_service.py
├── routes/ # FastAPI routers
│ ├── __init__.py
│ └── users.py
├── dependencies/ # Reusable Depends() providers
│ ├── __init__.py
│ └── auth.py
└── exceptions.py # Domain exception classes
Every endpoint follows this structure:
router = APIRouter(prefix="/users", tags=["Users"])


@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    data: UserCreate,
    session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
    """Create a user from the validated request body.

    Args:
        data: Request payload, already validated by Pydantic.
        session: Async session injected by FastAPI via ``Depends``.

    Returns:
        The created user serialized as ``UserResponse``.

    Raises:
        HTTPException: 409 Conflict when the service raises ``ConflictError``.
    """
    service = UserService(session)
    # Keep the try body minimal: only the service call is expected to raise
    # ConflictError; serialization happens on the success path below.
    try:
        user = await service.create_user(data)
    except ConflictError as e:
        # Chain the cause so the domain error stays visible in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT, detail=str(e)
        ) from e
    return UserResponse.model_validate(user)
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
    """Fetch a single user by primary key.

    Args:
        user_id: Identifier taken from the URL path.
        session: Async session injected by FastAPI via ``Depends``.

    Returns:
        The matching user serialized as ``UserResponse``.

    Raises:
        HTTPException: 404 Not Found when the service raises ``NotFoundError``.
    """
    service = UserService(session)
    # Keep the try body minimal: only the lookup is expected to raise
    # NotFoundError; serialization happens on the success path below.
    try:
        user = await service.get_user(user_id)
    except NotFoundError as e:
        # Chain the cause so the domain error stays visible in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=str(e)
        ) from e
    return UserResponse.model_validate(user)
Rules:
- Routes handle HTTP concerns only: `HTTPException`, response formatting
- Use `response_model` for automatic response serialization and OpenAPI docs
- Use `status.HTTP_*` constants, not bare integers
- Use `Depends()` for session, auth, and service injection

Repositories encapsulate all database access:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.user import User
class UserRepository:
    """Data-access layer for ``User`` rows.

    Every method runs on the ``AsyncSession`` supplied at construction.
    Writes are only flushed, never committed, so the caller owns the
    transaction boundary.
    """

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def get_by_id(self, user_id: int) -> User | None:
        """Return the user with the given primary key, or ``None``."""
        result = await self._session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        """Return the user with the given email, or ``None``."""
        result = await self._session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_posts(
        self, *, offset: int = 0, limit: int = 20
    ) -> list[User]:
        """Return one page of users with their posts eagerly loaded.

        Args:
            offset: Number of rows to skip.
            limit: Maximum number of rows to return.
        """
        result = await self._session.execute(
            select(User)
            .options(selectinload(User.posts))
            # Without a deterministic ORDER BY the database may return rows in
            # any order, so offset/limit pages could overlap or skip rows.
            .order_by(User.id)
            .offset(offset)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, user: User) -> User:
        """Add ``user`` to the session and return it with generated fields."""
        self._session.add(user)
        # flush() sends the INSERT; refresh() reloads the row's attributes.
        await self._session.flush()
        await self._session.refresh(user)
        return user

    async def update(self, user: User, **kwargs: object) -> User:
        """Assign ``kwargs`` onto ``user`` and flush the change.

        Raises:
            AttributeError: If a keyword does not name an attribute of the
                user's class. Nothing is modified in that case.
        """
        # Check names against the class, not the instance, so the check never
        # touches (and never lazy-loads) instance state. All names are checked
        # before any assignment so a typo cannot leave a half-applied update.
        unknown = sorted(key for key in kwargs if not hasattr(type(user), key))
        if unknown:
            raise AttributeError(
                f"{type(user).__name__} has no attribute(s): {', '.join(unknown)}"
            )
        for key, value in kwargs.items():
            setattr(user, key, value)
        await self._session.flush()
        await self._session.refresh(user)
        return user

    async def delete(self, user: User) -> None:
        """Mark ``user`` for deletion and flush the change."""
        await self._session.delete(user)
        await self._session.flush()
Rules:
- Return domain objects or `None` — never HTTP responses
- Call `flush()` + `refresh()` after `add()` to get generated fields (id, timestamps)
- Use `selectinload()` for eager loading relationships in async context
- Never raise `HTTPException` from repositories

Services contain business logic and orchestrate repositories:
from app.exceptions import ConflictError, NotFoundError
from app.models.user import User
from app.repositories.user_repo import UserRepository
from app.schemas.user import UserCreate, UserPatch
from app.core.security import hash_password
class UserService:
    """Business logic for users, built on top of ``UserRepository``.

    Raises domain exceptions (``ConflictError``, ``NotFoundError``) only.
    """

    def __init__(self, session: AsyncSession) -> None:
        self.repo = UserRepository(session)

    async def create_user(self, data: UserCreate) -> User:
        """Create a user after enforcing the unique-email rule.

        Raises:
            ConflictError: If the email is already registered.
        """
        # Business rule: email must be unique
        existing = await self.repo.get_by_email(data.email)
        if existing:
            raise ConflictError(f"Email {data.email} already registered")
        # Business logic: hash password before storing
        user = User(
            email=data.email,
            hashed_password=hash_password(data.password),
            display_name=data.display_name,
        )
        return await self.repo.create(user)

    async def get_user(self, user_id: int) -> User:
        """Return the user with ``user_id``.

        Raises:
            NotFoundError: If no such user exists.
        """
        user = await self.repo.get_by_id(user_id)
        if user is None:
            raise NotFoundError(f"User {user_id} not found")
        return user

    async def update_user(self, user_id: int, data: UserPatch) -> User:
        """Apply the fields the client actually sent to an existing user.

        Raises:
            NotFoundError: If no such user exists.
            ConflictError: If the new email belongs to another user.
        """
        user = await self.get_user(user_id)
        update_fields = data.model_dump(exclude_unset=True)
        # Same business rule as create_user: an email change must not collide
        # with another account. An unchanged email needs no lookup.
        new_email = update_fields.get("email")
        if new_email is not None and new_email != user.email:
            if await self.repo.get_by_email(new_email) is not None:
                raise ConflictError(f"Email {new_email} already registered")
        if "password" in update_fields:
            # Never persist the plain-text password; store only its hash.
            update_fields["hashed_password"] = hash_password(update_fields.pop("password"))
        return await self.repo.update(user, **update_fields)
Rules:
- Services raise domain exceptions (`NotFoundError`, `ConflictError`), NEVER `HTTPException`
- Services receive `AsyncSession` via constructor and create their own repository instances

Define a hierarchy of domain exceptions:
class AppError(Exception):
    """Base application error."""


class NotFoundError(AppError):
    """Resource not found."""


class ConflictError(AppError):
    """Resource conflict (duplicate, version mismatch)."""


# NOTE: shares its name with pydantic's ValidationError; import it
# module-qualified where both are in scope.
class ValidationError(AppError):
    """Business rule violation."""


class PermissionDeniedError(AppError):
    """Insufficient permissions."""


# Backward-compatible alias. The original name shadows Python's built-in
# PermissionError (an OSError subclass), so new code should import
# PermissionDeniedError instead.
PermissionError = PermissionDeniedError
Register global exception handlers in the FastAPI app:
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse

from app.exceptions import ConflictError, NotFoundError
app = FastAPI()


@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError) -> JSONResponse:
    """Translate a ``NotFoundError`` into a 404 JSON response.

    Args:
        request: The request being handled (unused, required by FastAPI).
        exc: The raised domain exception; its message becomes ``detail``.
    """
    return JSONResponse(
        # Named constant instead of a bare integer status code.
        status_code=status.HTTP_404_NOT_FOUND,
        content={"detail": str(exc), "code": "NOT_FOUND"},
    )
@app.exception_handler(ConflictError)
async def conflict_handler(request: Request, exc: ConflictError) -> JSONResponse:
    """Translate a ``ConflictError`` into a 409 JSON response.

    Args:
        request: The request being handled (unused, required by FastAPI).
        exc: The raised domain exception; its message becomes ``detail``.
    """
    return JSONResponse(
        # Named constant instead of a bare integer status code.
        status_code=status.HTTP_409_CONFLICT,
        content={"detail": str(exc), "code": "CONFLICT"},
    )
This allows services to raise domain exceptions without knowing about HTTP, and routes that rely on these handlers no longer need their own try/except blocks.
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field
class UserCreate(BaseModel):
    """Request body for POST: client-writable fields only.

    ``id`` and the timestamps are deliberately absent.
    """

    email: EmailStr
    # Length bounds are checked by Pydantic during validation.
    password: str = Field(min_length=8, max_length=128)
    display_name: str = Field(min_length=1, max_length=100)
class UserPatch(BaseModel):
    """Request body for PATCH: every field is optional.

    A field left out of the request stays unset, so it can be dropped with
    ``model_dump(exclude_unset=True)``.
    """

    email: EmailStr | None = None
    # Same bounds as UserCreate; they apply only when a value is supplied.
    password: str | None = Field(None, min_length=8, max_length=128)
    display_name: str | None = Field(None, min_length=1, max_length=100)
class UserResponse(BaseModel):
    """Response body: the full public view, including id and timestamps."""

    # from_attributes lets model_validate() read values off an ORM object.
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    display_name: str
    is_active: bool
    created_at: datetime
    updated_at: datetime
Key Pydantic v2 patterns:
- `ConfigDict(from_attributes=True)` instead of `class Config: orm_mode = True`
- `model_validate()` instead of `from_orm()`
- `model_dump()` instead of `.dict()`
- `model_dump(exclude_unset=True)` for PATCH to distinguish "not sent" from "set to null"
- `Field()` for validation constraints
- `str | None` syntax (PEP 604, available since Python 3.10), not `Optional[str]`

from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from app.core.config import settings
# Process-wide async engine; URL and SQL echo come from application settings.
engine = create_async_engine(
    settings.database_url,
    pool_pre_ping=True,
    pool_size=5,
    max_overflow=10,
    echo=settings.debug,
)

# Factory for AsyncSession objects bound to the engine above.
async_session_factory = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
    class_=AsyncSession,
)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session from ``async_session_factory`` inside ``session.begin()``.

    Both context managers stay open while the caller uses the yielded
    session and are exited, innermost first, once the generator resumes.
    """
    async with async_session_factory() as session, session.begin():
        yield session
Rules:
- `expire_on_commit=False` prevents detached instance errors after commit
- The `session.begin()` context manager auto-commits on success and rolls back on exception
- Inject the session with `Depends(get_async_session)`

from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
    """Declarative base class that the ORM models inherit from."""
class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    display_name: Mapped[str] = mapped_column(String(100))
    is_active: Mapped[bool] = mapped_column(default=True)
    # Authorization role compared against "admin" by the auth dependency.
    # default covers ORM inserts; server_default is emitted in the column DDL.
    role: Mapped[str] = mapped_column(
        String(50), default="user", server_default="user"
    )
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(), onupdate=func.now()
    )

    # Relationships — ALWAYS use selectin or joined for async
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author", lazy="selectin"
    )
Rules:
- Use `Mapped[type]` annotations (SQLAlchemy 2.0 style)
- Use `mapped_column()` instead of `Column()`
- Set `lazy="selectin"` on relationships for async compatibility
- Use `server_default` for database-generated defaults
- Always include `created_at` and `updated_at` timestamps

# Generate migration from model changes
alembic revision --autogenerate -m "add_users_table"
# Review the generated migration file before applying it
# Apply all pending migrations up to the latest revision
alembic upgrade head
# Roll back one step (the most recently applied revision)
alembic downgrade -1
# Show the revision the database is currently at
alembic current
# Show the migration history
alembic history
Migration naming convention:
# alembic/env.py
# Constraint/index name templates, keyed by constraint type.
# NOTE: the mapping only takes effect once it is passed to
# sqlalchemy.MetaData(naming_convention=...) on the metadata the models use.
naming_convention = {
    "ix": "ix_%(column_0_label)s",  # index
    "uq": "uq_%(table_name)s_%(column_0_name)s",  # unique constraint
    "ck": "ck_%(table_name)s_%(constraint_name)s",  # check constraint
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",  # foreign key
    "pk": "pk_%(table_name)s",  # primary key
}
Rules:
- Every migration must implement a working `downgrade()` function
- Use descriptive migration messages: `"add_users_table"`, `"add_email_index_to_users"`

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_async_session
from app.services.user_service import UserService
async def get_user_service(
    session: AsyncSession = Depends(get_async_session),
) -> UserService:
    """Build a ``UserService`` around the injected session."""
    service = UserService(session)
    return service
# Chain dependencies for auth
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    session: AsyncSession = Depends(get_async_session),
) -> User:
    """Resolve the bearer token to the user it identifies.

    Args:
        token: Token extracted by ``oauth2_scheme``.
        session: Async session injected by FastAPI via ``Depends``.

    Returns:
        The user whose id ``decode_token`` returns for the token.

    Raises:
        HTTPException: 401 Unauthorized when the token refers to a user that
            no longer exists.
    """
    # Local imports: the snippet's header imports only Depends from fastapi.
    from fastapi import HTTPException, status

    from app.exceptions import NotFoundError

    user_id = decode_token(token)
    service = UserService(session)
    try:
        return await service.get_user(user_id)
    except NotFoundError as e:
        # A token for a missing account is an authentication failure, not a
        # missing resource: answer 401 rather than letting a 404 propagate.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
async def require_admin(
    user: User = Depends(get_current_user),
) -> User:
    """Allow the request to proceed only for admin users.

    Args:
        user: The authenticated user resolved by ``get_current_user``.
            Assumes the ``User`` model exposes a ``role`` attribute.

    Returns:
        The same user, when its role is ``"admin"``.

    Raises:
        HTTPException: 403 Forbidden for any other role.
    """
    # Local import: the snippet's header imports only Depends from fastapi.
    from fastapi import HTTPException, status

    if user.role != "admin":
        raise HTTPException(
            # Named constant instead of a bare integer status code.
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin required",
        )
    return user
A request to POST /users flows through all layers:
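1. The route receives `UserCreate` (Pydantic validates the request body)
2. The route calls `UserService.create_user(data)` via `Depends()`
3. The service checks for a duplicate with `UserRepository.get_by_email()`
4. The service hashes the password and builds a `User` model instance
5. The service calls `UserRepository.create(user)` to persist
6. The route returns `UserResponse` via `model_validate()`

If the email is a duplicate, the service raises `ConflictError` and the global exception handler returns 409 Conflict. No try/except is needed in the route.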
Detached instance errors: Always call flush() + refresh() after session.add(). Set expire_on_commit=False on the session factory.
Async session in background tasks: Never reuse the request session. Create a new session:
async def background_job() -> None:
    """Run background work on its own session, not the request's.

    Opens a fresh session from ``async_session_factory`` and wraps the work
    in ``session.begin()``.
    """
    async with async_session_factory() as session:
        async with session.begin():
            # do work
            ...  # placeholder statement: a comment alone is not a valid block body
N+1 queries: Use selectinload() in repository queries for relationships that will be accessed. Set lazy="selectin" as the default on model relationships.
Bulk operations: Use session.execute(insert(User).values(list_of_dicts)) for bulk inserts instead of adding one by one.
Transaction spanning multiple services: Pass the same session to all services. The session's begin() context manager handles the transaction boundary.
Pydantic v2 computed fields: Use @computed_field for derived values in response schemas. See references/pydantic-v2-migration.md.
See references/sqlalchemy-patterns.md for advanced query optimization patterns.