Creates production-safe data migration scripts with rollback capabilities and validation checks. Use when you need to modify database schemas or transform data safely, with transaction wrapping, batch processing, and automated verification.
Installation:

/plugin marketplace add majesticlabs-dev/majestic-marketplace
/plugin install majestic-data@majestic-marketplace

Autonomous agent that creates production-safe data migration scripts:
1. Analyze current state
2. Design migration strategy
3. Generate migration scripts
4. Add safety measures
-- Migration: 20240115_add_customer_tier
-- Description: Add customer_tier column and backfill from order history
-- Author: migration-builder
-- Estimated duration: 5 minutes for 1M rows
-- ============================================
-- PRE-MIGRATION CHECKS
-- ============================================
-- Verify source data exists
DO $$
BEGIN
    IF (SELECT COUNT(*) FROM customers) = 0 THEN
        RAISE EXCEPTION 'No customers found - aborting migration';
    END IF;
END $$;
-- Capture baseline metrics
CREATE TEMP TABLE migration_baseline AS
SELECT
    COUNT(*) AS total_customers,
    COUNT(DISTINCT id) AS unique_ids,
    NOW() AS snapshot_time
FROM customers;
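-- Optional: checksum the primary-key set so post-migration checks can
-- detect silent row changes (a sketch; assumes id is the primary key)
CREATE TEMP TABLE migration_checksum AS
SELECT md5(string_agg(id::text, ',' ORDER BY id)) AS pk_digest
FROM customers;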
-- ============================================
-- FORWARD MIGRATION
-- ============================================
-- Step 1: Add new column (nullable first). Adding a nullable column
-- without a default is metadata-only and fast.
ALTER TABLE customers ADD COLUMN IF NOT EXISTS tier VARCHAR(20);

-- Step 2: Backfill in batches. COMMIT inside a DO block requires
-- PostgreSQL 11+ and the block must run OUTSIDE an explicit
-- transaction, so the script deliberately has no surrounding BEGIN.
DO $$
DECLARE
    batch_size INT := 10000;
    affected INT;
BEGIN
    LOOP
        WITH batch AS (
            SELECT id
            FROM customers
            WHERE tier IS NULL
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED
        )
        UPDATE customers c
        SET tier = CASE
                WHEN o.total_orders >= 100 THEN 'platinum'
                WHEN o.total_orders >= 50 THEN 'gold'
                WHEN o.total_orders >= 10 THEN 'silver'
                ELSE 'bronze'  -- includes customers with no orders
            END
        FROM batch b
        LEFT JOIN (
            SELECT customer_id, COUNT(*) AS total_orders
            FROM orders
            GROUP BY customer_id
        ) o ON o.customer_id = b.id
        WHERE c.id = b.id;

        GET DIAGNOSTICS affected = ROW_COUNT;
        RAISE NOTICE 'Updated % rows', affected;
        EXIT WHEN affected = 0;

        -- Checkpoint for long migrations; a new transaction starts
        -- implicitly after COMMIT inside a DO block
        COMMIT;
    END LOOP;
END $$;
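-- Backfill progress can be watched from another session while the
-- block above runs, e.g.:
--   SELECT COUNT(*) FROM customers WHERE tier IS NULL;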
-- Step 3: Set default for new records
ALTER TABLE customers
    ALTER COLUMN tier SET DEFAULT 'bronze';

-- Step 4: Add constraint after data is valid. NOT VALID skips the
-- full-table scan under the ACCESS EXCLUSIVE lock; the separate
-- VALIDATE then checks existing rows under a much weaker lock.
ALTER TABLE customers
    ADD CONSTRAINT chk_tier
    CHECK (tier IN ('bronze', 'silver', 'gold', 'platinum')) NOT VALID;

ALTER TABLE customers VALIDATE CONSTRAINT chk_tier;
-- ============================================
-- POST-MIGRATION VALIDATION
-- ============================================
-- Verify no nulls remain
DO $$
BEGIN
    IF EXISTS (SELECT 1 FROM customers WHERE tier IS NULL) THEN
        RAISE EXCEPTION 'Migration incomplete: NULL tiers found';
    END IF;
END $$;
-- Verify row count unchanged
DO $$
DECLARE
    before_count INT;
    after_count INT;
BEGIN
    SELECT total_customers INTO before_count FROM migration_baseline;
    SELECT COUNT(*) INTO after_count FROM customers;
    IF before_count != after_count THEN
        RAISE EXCEPTION 'Row count mismatch: before=%, after=%',
            before_count, after_count;
    END IF;
END $$;
-- Distribution check
SELECT
    tier,
    COUNT(*) AS count,
    ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
FROM customers
GROUP BY tier
ORDER BY count DESC;
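Once validation passes, it helps to record the migration so reruns are detectable. A minimal sketch, assuming a hand-rolled schema_migrations table (most migration frameworks provide their own):

-- Record the applied migration (tracking table name is an assumption)
CREATE TABLE IF NOT EXISTS schema_migrations (
    id TEXT PRIMARY KEY,
    applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

INSERT INTO schema_migrations (id) VALUES ('20240115_add_customer_tier');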
-- Rollback: 20240115_add_customer_tier
-- Run this to undo the migration
BEGIN;
-- Remove constraint first
ALTER TABLE customers DROP CONSTRAINT IF EXISTS chk_tier;
-- Remove default
ALTER TABLE customers ALTER COLUMN tier DROP DEFAULT;
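-- Optional: snapshot tier values before dropping the column, so a later
-- re-migration can restore them (sketch; backup table name is arbitrary)
CREATE TABLE IF NOT EXISTS customers_tier_backup AS
SELECT id, tier FROM customers;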
-- Drop the column
ALTER TABLE customers DROP COLUMN IF EXISTS tier;
COMMIT;
-- Verify rollback
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'customers' AND column_name = 'tier';
-- Should return 0 rows
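If applied migrations are recorded in a tracking table, the rollback should also remove the entry; a sketch against the hypothetical schema_migrations table from above:

DELETE FROM schema_migrations WHERE id = '20240115_add_customer_tier';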
from dataclasses import dataclass
from datetime import datetime
from typing import Callable
import logging

from sqlalchemy import inspect, text


@dataclass
class Migration:
    id: str
    description: str
    up: Callable        # applies the migration
    down: Callable      # reverts the migration
    validate: Callable  # returns True if post-migration state is sound


class MigrationRunner:
    def __init__(self, engine):
        self.engine = engine
        self.logger = logging.getLogger(__name__)

    def run(self, migration: Migration, dry_run: bool = False) -> bool:
        self.logger.info(f"Starting migration: {migration.id}")

        # Pre-validation: capture a baseline to compare against afterwards
        with self.engine.connect() as conn:
            baseline = self._capture_baseline(conn)

        if dry_run:
            self.logger.info("DRY RUN - no changes applied")
            return True

        try:
            # Run migration and validation in one transaction; the
            # context manager commits on success and rolls back on error
            with self.engine.begin() as conn:
                migration.up(conn)
                if not migration.validate(conn, baseline):
                    raise ValueError("Validation failed")
            self.logger.info(f"Migration {migration.id} completed successfully")
            return True
        except Exception as e:
            self.logger.error(f"Migration failed: {e}")
            # The failed transaction has already rolled back; run the
            # down migration on a fresh connection in case any
            # non-transactional steps (e.g. batched commits) were applied
            self.logger.info("Rolling back...")
            with self.engine.begin() as conn:
                migration.down(conn)
            raise

    def _capture_baseline(self, conn) -> dict:
        return {
            'timestamp': datetime.now(),
            'table_counts': self._get_table_counts(conn),
        }

    def _get_table_counts(self, conn) -> dict:
        # Row count per table, used by validators to detect data loss
        return {
            name: conn.execute(text(f'SELECT COUNT(*) FROM "{name}"')).scalar()
            for name in inspect(conn).get_table_names()
        }
-- Safe pattern: nullable first, then backfill, then constraint
ALTER TABLE t ADD COLUMN new_col TYPE;
UPDATE t SET new_col = compute_value();
ALTER TABLE t ALTER COLUMN new_col SET NOT NULL;
-- Safe pattern: add new, copy, drop old
ALTER TABLE t ADD COLUMN new_name TYPE;
UPDATE t SET new_name = old_name;
ALTER TABLE t DROP COLUMN old_name;
-- Safe pattern: add new column, migrate, swap
ALTER TABLE t ADD COLUMN col_new NEWTYPE;
UPDATE t SET col_new = col::NEWTYPE;
ALTER TABLE t DROP COLUMN col;
ALTER TABLE t RENAME COLUMN col_new TO col;
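Index builds deserve the same treatment: a plain CREATE INDEX blocks writes for its whole duration. On PostgreSQL the concurrent variant avoids that, though it cannot run inside a transaction block (the index name below is illustrative):

-- Safe pattern: build index without blocking writes
CREATE INDEX CONCURRENTLY idx_t_new_col ON t (new_col);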
-- Pattern: batch updates with progress
DO $$
DECLARE
    batch_size INT := 10000;
    rows_affected INT;
    total_updated INT := 0;
BEGIN
    LOOP
        WITH batch AS (
            SELECT id FROM large_table
            WHERE needs_update = true
            LIMIT batch_size
        )
        -- NOTE: the SET list must also clear needs_update,
        -- or the loop will never terminate
        UPDATE large_table SET ...
        WHERE id IN (SELECT id FROM batch);

        GET DIAGNOSTICS rows_affected = ROW_COUNT;
        total_updated := total_updated + rows_affected;
        RAISE NOTICE 'Progress: % rows updated', total_updated;
        EXIT WHEN rows_affected = 0;
        PERFORM pg_sleep(0.1); -- Brief pause to reduce lock contention
    END LOOP;
END $$;
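Batched backfills leave dead tuples behind, and until autovacuum catches up the bloat can slow queries. A manual pass afterwards (a sketch for the hypothetical large_table) reclaims space and refreshes planner statistics:

VACUUM (ANALYZE, VERBOSE) large_table;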
Before running migration:
- Back up the affected tables, or confirm a recent snapshot exists
- Test the migration and its rollback script on a staging copy
- Review the estimated duration and schedule a low-traffic window
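A quick pre-flight query helps sanity-check the duration estimate by showing how much data the backfill will touch (a sketch against the customers table from the example):

SELECT
    pg_size_pretty(pg_total_relation_size('customers')) AS table_size,
    (SELECT COUNT(*) FROM customers) AS rows_to_migrate;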
After migration:
- Run the post-migration validation checks
- Compare row counts against the captured baseline
- Review the tier distribution for anomalies
- Monitor application logs and query performance
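If the constraint was added NOT VALID and validated separately, as in the script above, it is worth confirming the validation actually completed (PostgreSQL catalog query):

SELECT conname, convalidated
FROM pg_constraint
WHERE conrelid = 'customers'::regclass
  AND conname = 'chk_tier';
-- convalidated should be true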