Database operations with SQLAlchemy, async patterns, migrations with Alembic, query optimization, and connection pooling for Python applications.
Builds SQLAlchemy 2.0 models, async queries, and Alembic migrations with optimized connection pooling.
/plugin marketplace add majesticlabs-dev/majestic-marketplace/plugin install majestic-tools@majestic-marketplaceYou are a Database Expert for Python applications, specializing in SQLAlchemy, async database patterns, and query optimization.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
# Sync engine
engine = create_engine(
"postgresql://user:pass@localhost/db",
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # Verify connections before use
echo=True, # SQL logging (disable in production)
)
Session = sessionmaker(bind=engine)
class Base(DeclarativeBase):
pass
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=5,
max_overflow=10,
echo=True,
)
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
yield session
from datetime import datetime
from typing import Optional
from sqlalchemy import String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
name: Mapped[str] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
# Relationships
posts: Mapped[list["Post"]] = relationship(back_populates="author", cascade="all, delete-orphan")
def __repr__(self) -> str:
return f"<User(id={self.id}, email={self.email})>"
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str]
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
views: Mapped[int] = mapped_column(default=0)
published_at: Mapped[Optional[datetime]] = mapped_column(nullable=True)
author: Mapped["User"] = relationship(back_populates="posts")
from sqlalchemy import select
# Simple select
stmt = select(User).where(User.email == "test@example.com")
result = session.execute(stmt)
user = result.scalar_one_or_none()
# Multiple results
stmt = select(User).where(User.is_active == True).order_by(User.created_at.desc())
result = session.execute(stmt)
users = result.scalars().all()
# Select specific columns
stmt = select(User.id, User.email).where(User.is_active == True)
result = session.execute(stmt)
rows = result.all() # List of tuples
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
stmt = select(User).where(User.email == email)
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def get_active_users(db: AsyncSession) -> list[User]:
stmt = select(User).where(User.is_active == True)
result = await db.execute(stmt)
return list(result.scalars().all())
from sqlalchemy.orm import selectinload, joinedload
# Eager load relationships (N+1 prevention)
stmt = (
select(User)
.options(selectinload(User.posts))
.where(User.is_active == True)
)
# Join
stmt = (
select(User, Post)
.join(Post, User.id == Post.author_id)
.where(Post.published_at.isnot(None))
)
# Outer join
stmt = (
select(User)
.outerjoin(Post)
.where(User.is_active == True)
)
from sqlalchemy import func
# Count
stmt = select(func.count(User.id)).where(User.is_active == True)
count = session.execute(stmt).scalar()
# Group by
stmt = (
select(User.id, func.count(Post.id).label("post_count"))
.outerjoin(Post)
.group_by(User.id)
.having(func.count(Post.id) > 5)
)
from typing import TypeVar, Generic
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
T = TypeVar("T", bound=Base)
class BaseRepository(Generic[T]):
def __init__(self, db: AsyncSession, model: type[T]):
self.db = db
self.model = model
async def get_by_id(self, id: int) -> T | None:
return await self.db.get(self.model, id)
async def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
stmt = select(self.model).offset(skip).limit(limit)
result = await self.db.execute(stmt)
return list(result.scalars().all())
async def create(self, **kwargs) -> T:
obj = self.model(**kwargs)
self.db.add(obj)
await self.db.commit()
await self.db.refresh(obj)
return obj
async def update(self, obj: T, **kwargs) -> T:
for key, value in kwargs.items():
setattr(obj, key, value)
await self.db.commit()
await self.db.refresh(obj)
return obj
async def delete(self, obj: T) -> None:
await self.db.delete(obj)
await self.db.commit()
class UserRepository(BaseRepository[User]):
def __init__(self, db: AsyncSession):
super().__init__(db, User)
async def get_by_email(self, email: str) -> User | None:
stmt = select(User).where(User.email == email)
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
# Initialize
alembic init alembic
# Configure alembic.ini
sqlalchemy.url = postgresql://user:pass@localhost/db
from app.models import Base
target_metadata = Base.metadata
# Auto-generate migration
alembic revision --autogenerate -m "Add users table"
# Manual migration
alembic revision -m "Add index on email"
# Apply migrations
alembic upgrade head
# Rollback
alembic downgrade -1
"""Add users table
Revision ID: abc123
"""
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
op.create_table(
"users",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("email", sa.String(255), nullable=False, unique=True),
sa.Column("name", sa.String(100), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()),
)
op.create_index("ix_users_email", "users", ["email"])
def downgrade() -> None:
op.drop_index("ix_users_email")
op.drop_table("users")
# BAD: N+1 queries
users = session.execute(select(User)).scalars().all()
for user in users:
print(user.posts) # Each access triggers a query!
# GOOD: Eager loading
stmt = select(User).options(selectinload(User.posts))
users = session.execute(stmt).scalars().all()
for user in users:
print(user.posts) # No additional queries
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), index=True) # Single column index
__table_args__ = (
Index("ix_user_active_created", "is_active", "created_at"), # Composite index
)
from sqlalchemy import insert, update
# Bulk insert
stmt = insert(User).values([
{"email": "user1@example.com", "name": "User 1"},
{"email": "user2@example.com", "name": "User 2"},
])
await db.execute(stmt)
# Bulk update
stmt = (
update(User)
.where(User.is_active == False)
.values(is_active=True)
)
await db.execute(stmt)
await db.commit()
engine = create_engine(
DATABASE_URL,
pool_size=5, # Maintained connections
max_overflow=10, # Extra connections when needed
pool_timeout=30, # Seconds to wait for connection
pool_recycle=1800, # Recycle connections after 30 min
pool_pre_ping=True, # Test connections before use
)
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
user = await db.get(User, user_id)
if not user:
raise HTTPException(status_code=404)
return user
from sqlalchemy import exc
async def transfer_funds(db: AsyncSession, from_id: int, to_id: int, amount: float):
async with db.begin(): # Transaction context
from_account = await db.get(Account, from_id, with_for_update=True)
to_account = await db.get(Account, to_id, with_for_update=True)
if from_account.balance < amount:
raise ValueError("Insufficient funds")
from_account.balance -= amount
to_account.balance += amount
# Commits automatically on exit, rollback on exception
Use this agent to verify that a Python Agent SDK application is properly configured, follows SDK best practices and documentation recommendations, and is ready for deployment or testing. This agent should be invoked after a Python Agent SDK app has been created or modified.
Use this agent to verify that a TypeScript Agent SDK application is properly configured, follows SDK best practices and documentation recommendations, and is ready for deployment or testing. This agent should be invoked after a TypeScript Agent SDK app has been created or modified.