Use when working with database migrations in pgdbm - provides complete AsyncMigrationManager API with all methods and migration file format
Provides complete AsyncMigrationManager API reference for pgdbm database migrations. Use when creating, applying, or managing migration files with proper template syntax and module_name configuration.
/plugin marketplace add juanre/pgdbm
/plugin install pgdbm@juanre-ai-tools
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Complete API reference for AsyncMigrationManager and migration file format.
All signatures, parameters, return types. No documentation lookup needed.
from pgdbm import AsyncMigrationManager
migrations = AsyncMigrationManager(
db_manager: AsyncDatabaseManager,
migrations_path: str = "./migrations",
migrations_table: str = "schema_migrations",
module_name: Optional[str] = None,
)
Parameters:
- db_manager: AsyncDatabaseManager instance (schema already set)
- migrations_path: Directory containing .sql migration files
- migrations_table: Table name for tracking migrations (default: "schema_migrations")
- module_name: CRITICAL - Unique identifier for this module's migrations

IMPORTANT:
- There is no schema parameter (doesn't exist, schema comes from db_manager)
- Always specify module_name to prevent conflicts
- For a library that may be used more than once: module_name=f"mylib_{schema}" (include schema)
# Apply all pending migrations
result = await migrations.apply_pending_migrations(
dry_run: bool = False
) -> dict[str, Any]
# Returns: {
# "status": "success" | "up_to_date" | "dry_run" | "error",
# "applied": [{"filename": "001_...", "execution_time_ms": 123.4}, ...],
# "skipped": ["001_already_applied.sql", ...],
# "total": 5,
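# "total_time_ms": 456.7, # Only if status="success"
# "pending": ["002_...", ...], # Only if status="dry_run"
# "failed_migration": "003_...", "error": "..." # Only if status="error"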
# }
# Get applied migrations
applied = await migrations.get_applied_migrations() -> dict[str, Migration]
# Returns: {"001_users.sql": Migration(...), ...}
# Get pending migrations
pending = await migrations.get_pending_migrations() -> list[Migration]
# Returns: [Migration(...), Migration(...)]
# Find migration files on disk
files = await migrations.find_migration_files() -> list[Migration]
# Returns: All .sql files in migrations_path
# Apply single migration
execution_time = await migrations.apply_migration(
migration: Migration
) -> float
# Returns: Execution time in milliseconds
# Get migration history (recent migrations)
history = await migrations.get_migration_history(
limit: int = 10
) -> list[dict[str, Any]]
# Returns: Recent migrations with timestamps, checksums
# Ensure migrations table exists
await migrations.ensure_migrations_table() -> None
# Creates schema_migrations table if it doesn't exist
# Create new migration file
filepath = await migrations.create_migration(
name: str,
content: str,
auto_transaction: bool = True
) -> str
# Returns: Path to created file
# Example
path = await migrations.create_migration(
name="add_users_table",
content="CREATE TABLE {{tables.users}} (id SERIAL PRIMARY KEY)",
auto_transaction=True # Wraps in BEGIN/COMMIT
)
# Creates: migrations/20251025_120000_add_users_table.sql
# Rollback migration (removes from tracking, doesn't undo changes)
await migrations.rollback_migration(filename: str) -> None
# Marks migration as not applied
# WARNING: Doesn't undo the migration, just removes tracking
Must match one of these patterns:
- 001_description.sql (numeric prefix)
- V1__description.sql (Flyway pattern)
- 20251025120000_description.sql (timestamp)

Examples:
migrations/
├── 001_create_users.sql
├── 002_add_profiles.sql
├── 003_create_sessions.sql
└── 004_add_indexes.sql
Always use templates for schema portability:
-- All template placeholders
{{tables.tablename}} -- Table with schema qualification
{{schema}} -- Schema name only
Example migration:
-- migrations/001_create_users.sql
CREATE TABLE IF NOT EXISTS {{tables.users}} (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
full_name VARCHAR(255),
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS users_email
ON {{tables.users}} (email);
CREATE INDEX IF NOT EXISTS users_created
ON {{tables.users}} (created_at DESC);
-- Trigger using schema placeholder
CREATE OR REPLACE FUNCTION {{schema}}.update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER users_updated_at
BEFORE UPDATE ON {{tables.users}}
FOR EACH ROW
EXECUTE FUNCTION {{schema}}.update_updated_at();
Automatic: Migrations run in transactions by default
Manual control:
-- migrations/002_manual_transaction.sql
BEGIN;
CREATE TABLE {{tables.posts}} (
id SERIAL PRIMARY KEY,
title TEXT NOT NULL
);
-- If this fails, whole migration rolls back
CREATE INDEX posts_title ON {{tables.posts}} (title);
COMMIT;
Non-transactional operations:
-- migrations/003_create_index_concurrently.sql
-- CREATE INDEX CONCURRENTLY cannot run in transaction
CREATE INDEX CONCURRENTLY users_email_concurrent
ON {{tables.users}} (email);
from pgdbm import AsyncDatabaseManager, AsyncMigrationManager, DatabaseConfig
# Create database manager
config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
db = AsyncDatabaseManager(config)
await db.connect()
# Create migration manager
migrations = AsyncMigrationManager(
db,
migrations_path="./migrations",
module_name="myapp" # REQUIRED
)
# Apply all pending
result = await migrations.apply_pending_migrations()
if result["status"] == "success":
print(f"Applied {len(result['applied'])} migrations")
for mig in result["applied"]:
print(f" - {mig['filename']} ({mig['execution_time_ms']:.1f}ms)")
# Check what would be applied without applying
result = await migrations.apply_pending_migrations(dry_run=True)
if result["status"] == "dry_run":
print(f"Would apply {len(result['pending'])} migrations:")
for filename in result["pending"]:
print(f" - {filename}")
# Get applied migrations
applied = await migrations.get_applied_migrations()
print(f"Applied: {list(applied.keys())}")
# Get pending migrations
pending = await migrations.get_pending_migrations()
print(f"Pending: {[m.filename for m in pending]}")
# Get history
history = await migrations.get_migration_history(limit=5)
for entry in history:
print(f"{entry['applied_at']}: {entry['filename']} ({entry['execution_time_ms']:.1f}ms)")
# Create new migration
path = await migrations.create_migration(
name="add_users_avatar",
content="""
ALTER TABLE {{tables.users}}
ADD COLUMN avatar_url VARCHAR(500);
""",
auto_transaction=True # Wraps in BEGIN/COMMIT
)
print(f"Created migration: {path}")
# Output: migrations/20251025_143022_add_users_avatar.sql
# Remove migration from tracking (doesn't undo changes!)
await migrations.rollback_migration("003_add_column.sql")
# Migration can now be re-applied
result = await migrations.apply_pending_migrations()
# Will apply 003_add_column.sql again
WARNING: rollback_migration only removes tracking record. It does NOT undo the migration's database changes. For true rollback, write a down migration.
Used internally, but can be accessed:
class Migration:
filename: str
checksum: str
content: str
applied_at: Optional[datetime]
module_name: Optional[str]
@property
def is_applied(self) -> bool
@property
def version(self) -> str
| Method | Parameters | Returns | Use Case |
|---|---|---|---|
| apply_pending_migrations | dry_run=False | dict | Apply all pending |
| get_applied_migrations | - | dict[str, Migration] | Check what's applied |
| get_pending_migrations | - | list[Migration] | Check what's pending |
| find_migration_files | - | list[Migration] | List files on disk |
| apply_migration | migration | float | Apply single migration |
| ensure_migrations_table | - | None | Create tracking table |
| create_migration | name, content, auto_transaction | str | Create new file |
| rollback_migration | filename | None | Remove tracking (dev only) |
| get_migration_history | limit=10 | list[dict] | Recent migrations |
-- ✅ CORRECT
CREATE TABLE {{tables.users}} (...);
CREATE INDEX users_email ON {{tables.users}} (email);
-- ❌ WRONG
CREATE TABLE users (...);
CREATE TABLE myschema.users (...);
-- ✅ CORRECT - Can run multiple times safely
CREATE TABLE IF NOT EXISTS {{tables.users}} (...);
CREATE INDEX IF NOT EXISTS users_email ON {{tables.users}} (email);
-- ❌ WRONG - Fails if already exists
CREATE TABLE {{tables.users}} (...);
# ✅ CORRECT - Unique per module
AsyncMigrationManager(db, "migrations", module_name="users")
AsyncMigrationManager(db, "migrations", module_name="orders")
# ❌ WRONG - Default module name causes conflicts
AsyncMigrationManager(db, "migrations") # module_name="default"
AsyncMigrationManager(db, "migrations") # Same default - conflict!
# ✅ CORRECT - Can use same library multiple times
module_name = f"mylib_{schema}" # "mylib_tenant1", "mylib_tenant2"
# ❌ WRONG - Conflicts if library used twice
module_name = "mylib" # Always the same
-- migrations/001_initial_schema.sql
CREATE TABLE IF NOT EXISTS {{tables.users}} (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS {{tables.posts}} (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES {{tables.users}}(id) ON DELETE CASCADE,
title VARCHAR(500) NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- migrations/002_add_indexes.sql
CREATE INDEX IF NOT EXISTS users_email
ON {{tables.users}} (email);
CREATE INDEX IF NOT EXISTS posts_user_id
ON {{tables.posts}} (user_id);
CREATE INDEX IF NOT EXISTS posts_created
ON {{tables.posts}} (created_at DESC);
-- migrations/003_add_triggers.sql
CREATE OR REPLACE FUNCTION {{schema}}.update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_users_timestamp
BEFORE UPDATE ON {{tables.users}}
FOR EACH ROW
EXECUTE FUNCTION {{schema}}.update_timestamp();
-- migrations/004_seed_data.sql
INSERT INTO {{tables.users}} (email, full_name)
VALUES
('admin@example.com', 'Admin User'),
('support@example.com', 'Support User')
ON CONFLICT (email) DO NOTHING;
Migrations are checksummed to detect modifications:
# If migration file changes after being applied
result = await migrations.apply_pending_migrations()
# Raises: MigrationError(
# "Migration '001_users.sql' has been modified after being applied!"
# "Expected checksum: abc123..."
# "Current checksum: def456..."
# )
Why: Prevents silent schema divergence. Applied migrations must not change.
If you need to modify: Create a new migration instead.
Migrations tracked in schema_migrations table:
CREATE TABLE schema_migrations (
id SERIAL PRIMARY KEY,
filename VARCHAR(255) NOT NULL,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
execution_time_ms REAL,
module_name VARCHAR(255),
UNIQUE(filename, module_name)
);
Key points:
- filename: Migration file name
- checksum: SHA256 of file contents
- module_name: Isolates migrations per module

This allows:
result = await migrations.apply_pending_migrations()
if result["status"] == "error":
print(f"Failed on: {result['failed_migration']}")
print(f"Error: {result['error']}")
print(f"Successfully applied: {result['applied']}")
elif result["status"] == "success":
print(f"Applied {len(result['applied'])} migrations")
elif result["status"] == "up_to_date":
print("No pending migrations")
# In application startup (FastAPI lifespan, etc.)
from pgdbm import AsyncDatabaseManager, AsyncMigrationManager, DatabaseConfig
config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
db = AsyncDatabaseManager(config)
await db.connect()
migrations = AsyncMigrationManager(
db,
migrations_path="./migrations",
module_name="myapp"
)
result = await migrations.apply_pending_migrations()
if result["status"] != "success" and result["status"] != "up_to_date":
raise RuntimeError(f"Migration failed: {result}")
# Each service runs its own migrations
services = [
(users_db, "migrations/users", "users"),
(orders_db, "migrations/orders", "orders"),
(payments_db, "migrations/payments", "payments"),
]
for db, path, name in services:
migrations = AsyncMigrationManager(db, path, module_name=name)
result = await migrations.apply_pending_migrations()
if result["status"] == "success":
print(f"{name}: Applied {len(result['applied'])} migrations")
class MyLibrary:
async def initialize(self):
# Library ALWAYS runs its own migrations
migrations = AsyncMigrationManager(
self.db,
migrations_path=str(Path(__file__).parent / "migrations"),
module_name=f"mylib_{self.db.schema}" # Include schema!
)
result = await migrations.apply_pending_migrations()
# Use different table name (e.g., for multi-tenant)
migrations = AsyncMigrationManager(
db,
migrations_path="tenant_migrations",
migrations_table="tenant_migrations", # Custom table name
module_name=f"tenant_{tenant_id}"
)
# Check pending before applying
pending = await migrations.get_pending_migrations()
if pending:
print(f"Found {len(pending)} pending migrations:")
for mig in pending:
print(f" - {mig.filename}")
# Ask user confirmation
if input("Apply? (y/n): ") == "y":
result = await migrations.apply_pending_migrations()
else:
print("Schema is up to date")
# 1. Create migration
path = await migrations.create_migration(
name="add_user_roles",
content="""
CREATE TABLE {{tables.roles}} (
id SERIAL PRIMARY KEY,
name VARCHAR(100) UNIQUE NOT NULL
);
ALTER TABLE {{tables.users}}
ADD COLUMN role_id INTEGER REFERENCES {{tables.roles}}(id);
"""
)
# 2. Apply it
result = await migrations.apply_pending_migrations()
# 3. If something wrong, rollback the record
await migrations.rollback_migration("20251025_143022_add_user_roles.sql")
# 4. Fix the file, re-apply
result = await migrations.apply_pending_migrations()
# WRONG - schema parameter doesn't exist
migrations = AsyncMigrationManager(
db,
"migrations",
schema="myschema" # TypeError!
)
Fix: Schema comes from db_manager:
db = AsyncDatabaseManager(pool=pool, schema="myschema")
migrations = AsyncMigrationManager(db, "migrations", module_name="myapp")
# WRONG - Uses "default" module name
migrations = AsyncMigrationManager(db, "migrations")
Fix: Always specify unique module_name:
migrations = AsyncMigrationManager(db, "migrations", module_name="myservice")
# WRONG - Conflict if library used twice
migrations = AsyncMigrationManager(db1, "migrations", module_name="mylib")
migrations = AsyncMigrationManager(db2, "migrations", module_name="mylib")
Fix: Include schema in module_name:
migrations = AsyncMigrationManager(db1, "migrations", module_name=f"mylib_{schema1}")
migrations = AsyncMigrationManager(db2, "migrations", module_name=f"mylib_{schema2}")
-- WRONG
CREATE TABLE users (...);
CREATE TABLE myschema.users (...);
Fix: Use templates:
-- CORRECT
CREATE TABLE {{tables.users}} (...);
-- You applied 001_users.sql yesterday
-- Today you edit it and add a column
-- Next deploy: ERROR - checksum mismatch!
Fix: Never modify applied migrations. Create new migration:
-- migrations/002_add_user_column.sql
ALTER TABLE {{tables.users}} ADD COLUMN new_field VARCHAR(255);
-- WRONG - Fails if run twice
CREATE TABLE {{tables.users}} (...);
Fix: Use IF NOT EXISTS:
-- CORRECT
CREATE TABLE IF NOT EXISTS {{tables.users}} (...);
CREATE INDEX IF NOT EXISTS users_email ON {{tables.users}} (email);
# Get recent migration history
history = await migrations.get_migration_history(limit=10)
for entry in history:
print(f"""
Migration: {entry['filename']}
Module: {entry['module_name']}
Applied: {entry['applied_at']}
Time: {entry['execution_time_ms']}ms
Checksum: {entry['checksum']}
""")
Before running migrations:
- {{tables.}} syntax
- module_name specified

Related skills:
- pgdbm:using-pgdbm, pgdbm:choosing-pattern
- pgdbm:shared-pool-pattern, pgdbm:dual-mode-library
- pgdbm:core-api-reference