This skill should be used when the user asks to "convert asyncpg to SQLAlchemy", "convert database queries", "migrate asyncpg code", "transform asyncpg patterns to SQLAlchemy", or "update FastAPI database layer". It provides systematic conversion of asyncpg code to SQLAlchemy async patterns with proper error handling and transaction management.
Converts asyncpg database code to SQLAlchemy async patterns with proper error handling.
/plugin marketplace add kivo360/claude-toolbelt
/plugin install asyncpg-to-sqlalchemy-converter@claude-toolbelt
This skill inherits all available tools. When active, it can use any tool Claude has access to.
This skill provides systematic conversion of asyncpg database code to SQLAlchemy 2.0+ with async support, maintaining async performance while providing ORM benefits.
Convert asyncpg procedural code to SQLAlchemy declarative patterns while preserving async functionality and improving maintainability.
Replace asyncpg imports with SQLAlchemy:
import asyncpg → from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from asyncpg import Connection → from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
Convert connection setup:
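# Before (asyncpg)
pool = await asyncpg.create_pool(dsn)
# After (SQLAlchemy)
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

engine = create_async_engine(
    DATABASE_URL,  # must use the postgresql+asyncpg:// scheme
    echo=True,  # logs every SQL statement; usually disabled in production
    poolclass=NullPool,  # disables pooling; omit to use the default async pool
)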
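asyncpg accepts DSNs that begin with `postgresql://` or `postgres://`, whereas SQLAlchemy selects the asyncpg driver from a `postgresql+asyncpg://` URL. The following is a minimal sketch of that rewrite, assuming a plain DSN string; the helper name `to_async_url` is hypothetical and is not part of either library.

```python
def to_async_url(dsn: str) -> str:
    """Rewrite an asyncpg-style DSN into a SQLAlchemy asyncpg URL.

    Hypothetical helper for illustration; only the scheme is changed.
    """
    target = "postgresql+asyncpg://"
    if dsn.startswith(target):
        return dsn  # already in SQLAlchemy form
    for prefix in ("postgresql://", "postgres://"):
        if dsn.startswith(prefix):
            return target + dsn[len(prefix):]
    raise ValueError(f"unsupported DSN scheme: {dsn!r}")


DATABASE_URL = to_async_url("postgresql://app:secret@localhost:5432/appdb")
```

The resulting `DATABASE_URL` can then be passed to `create_async_engine()`. Query-string options are left untouched here and may need separate review.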
Replace connection objects with async sessions:
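# Before (asyncpg)
async def get_user(db, user_id):
    async with db.acquire() as conn:
        result = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
        # fetchrow() returns None when no row matches
        return dict(result) if result else None
# After (SQLAlchemy)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_user(session: AsyncSession, user_id: int):
    result = await session.execute(
        select(User).where(User.id == user_id)
    )
    # scalar_one_or_none() returns None for a missing row, like fetchrow();
    # use scalar_one() to raise NoResultFound instead
    return result.scalar_one_or_none()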
Transform fetch operations to SQLAlchemy Core/ORM:
fetch() → execute().scalars().all()
fetchrow() → execute().scalar_one() or execute().first()
fetchval() → execute().scalar()
cursor() iteration → execute().yield_per()
Convert execute patterns:
# Before (asyncpg)
await conn.execute(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    name, email
)
# After (SQLAlchemy ORM)
session.add(User(name=name, email=email))
await session.commit()
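When a raw SQL statement is kept rather than rewritten as ORM code, asyncpg's positional `$1`, `$2` placeholders have to become named bind parameters for SQLAlchemy's `text()`. The following is a rough sketch of that rewrite; the helper `to_named_binds` and the `p1`, `p2` parameter names are hypothetical. The regular expression does not skip string literals or dollar-quoted bodies, so its output should be reviewed by hand.

```python
import re

_PLACEHOLDER = re.compile(r"\$(\d+)")


def to_named_binds(sql: str, *args):
    """Convert asyncpg-style $n placeholders to :pn named binds.

    Returns the rewritten SQL and a parameter dict keyed p1, p2, ...
    Hypothetical helper; it does not parse SQL, it only substitutes.
    """
    converted = _PLACEHOLDER.sub(lambda m: f":p{m.group(1)}", sql)
    params = {f"p{i}": value for i, value in enumerate(args, start=1)}
    return converted, params


sql, params = to_named_binds(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    "Ada",
    "ada@example.com",
)
```

The pair can then be executed with `await session.execute(text(sql), params)`, where `text` is imported from `sqlalchemy`.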
Update transaction patterns:
# Before (asyncpg)
async with conn.transaction():
    await conn.execute(
        "UPDATE users SET status = $1 WHERE id = $2", status, user_id
    )
# After (SQLAlchemy)
from sqlalchemy import update

async with session.begin():  # commits on success, rolls back on error
    await session.execute(
        update(User).where(User.id == user_id).values(status=status)
    )
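Like `conn.transaction()` in asyncpg, a `session.begin()` block commits when it exits normally and rolls back when an exception escapes it. The sketch below illustrates the rollback path using a synchronous in-memory SQLite engine so it runs without PostgreSQL; the table and values are invented for the demonstration, and `async with session.begin()` on an `AsyncSession` follows the same pattern.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine("sqlite://")  # in-memory database, for demonstration only

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT)"))
    conn.execute(text("INSERT INTO users (id, status) VALUES (1, 'active')"))

with Session(engine) as session:
    try:
        with session.begin():
            session.execute(
                text("UPDATE users SET status = :status WHERE id = :id"),
                {"status": "banned", "id": 1},
            )
            # Simulate a failure before the block can commit.
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass  # the UPDATE above has been rolled back

    status_after = session.execute(
        text("SELECT status FROM users WHERE id = 1")
    ).scalar_one()
```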
To convert asyncpg code:
Update exception handling:
asyncpg.PostgresError → sqlalchemy.exc.DBAPIError
asyncpg.InterfaceError → sqlalchemy.exc.InterfaceError
asyncpg.exceptions → Use SQLAlchemy's built-in exceptions
Implement robust error handling:
# Before
try:
    conn = await asyncpg.connect(dsn)
except asyncpg.PostgresError as e:
    logger.error(f"Database connection failed: {e}")
# After
from sqlalchemy.exc import SQLAlchemyError

try:
    engine = create_async_engine(DATABASE_URL)
    # create_async_engine() is lazy; engine.begin() opens the first connection
    async with engine.begin() as conn:
        pass
except SQLAlchemyError as e:
    logger.error(f"Database setup failed: {e}")
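SQLAlchemy wraps driver errors in its own hierarchy: `IntegrityError` is a subclass of `DBAPIError`, which in turn derives from `SQLAlchemyError`, and the driver's original exception is kept on the `.orig` attribute. The sketch below shows one way to branch on that hierarchy. The `classify` helper and its labels are hypothetical, and a synchronous in-memory SQLite engine stands in for PostgreSQL so the example can run anywhere.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import DBAPIError, IntegrityError, SQLAlchemyError


def classify(exc: SQLAlchemyError) -> str:
    """Map a SQLAlchemy exception to a coarse label (hypothetical helper)."""
    if isinstance(exc, IntegrityError):
        return "constraint"  # e.g. unique or foreign-key violation
    if isinstance(exc, DBAPIError):
        return "driver"  # any other error raised by the DBAPI driver
    return "sqlalchemy"  # errors raised by SQLAlchemy itself


engine = create_engine("sqlite://")  # in-memory database, for demonstration only

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)"))
    conn.execute(text("INSERT INTO users (email) VALUES ('a@example.com')"))

label = None
original = None
try:
    with engine.begin() as conn:
        # Violates the UNIQUE constraint on email.
        conn.execute(text("INSERT INTO users (email) VALUES ('a@example.com')"))
except SQLAlchemyError as exc:
    label = classify(exc)
    original = getattr(exc, "orig", None)  # the driver-level exception
```

With the asyncpg driver, `.orig` would carry the asyncpg-level error instead, which is useful when code still needs driver-specific details after the migration.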
references/pattern-mapping.md - Comprehensive asyncpg to SQLAlchemy conversion mapping
references/async-patterns.md - Async SQLAlchemy best practices
references/error-handling.md - SQLAlchemy exception handling patterns
examples/conversion-comparison.md - Side-by-side asyncpg vs SQLAlchemy examples
examples/migration-scripts.py - Automated conversion utilities
examples/test-validation.py - Testing converted code patterns