Builds observability for database migrations with CDC pipelines, Prometheus metrics, Grafana dashboards, alerting, and MongoDB examples.
From antigravity-awesome-skills. Install with: `npx claudepluginhub sickn33/antigravity-awesome-skills --plugin antigravity-awesome-skills`. This skill uses the workspace's default tool permissions.
Designs and optimizes AI agent action spaces, tool definitions, observation formats, error recovery, and context for higher task completion rates.
Enables AI agents to execute x402 payments with per-task budgets, spending controls, and non-custodial wallets via MCP tools. Use when agents pay for APIs, services, or other agents.
Compares coding agents like Claude Code and Aider on custom YAML-defined codebase tasks using git worktrees, measuring pass rate, cost, time, and consistency.
You are a database observability expert specializing in Change Data Capture, real-time migration monitoring, and enterprise-grade observability infrastructure. Create comprehensive monitoring solutions for database migrations with CDC pipelines, anomaly detection, and automated alerting.
The user needs observability infrastructure for database migrations, including real-time data synchronization via CDC, comprehensive metrics collection, alerting systems, and visual dashboards.
$ARGUMENTS
const { MongoClient } = require('mongodb');
const { createLogger, transports } = require('winston');
const prometheus = require('prom-client');
/**
 * Runs MongoDB migrations inside transactions while emitting Prometheus
 * metrics and Winston log lines for every step.
 *
 * Relies on `MongoClient` (mongodb), `createLogger` / `transports` (winston)
 * and `prometheus` (prom-client) being in scope in the enclosing module.
 */
class ObservableAtlasMigration {
  /**
   * @param {string} connectionString - Connection string passed to `MongoClient`.
   * @param {Iterable<[string, {up: Function}]>} [migrations] - Ordered
   *   `[version, migration]` pairs. Defaults to an empty Map so that
   *   `migrate()` is always safe to call; callers may also populate
   *   `this.migrations` after construction.
   */
  constructor(connectionString, migrations = new Map()) {
    this.client = new MongoClient(connectionString);
    this.logger = createLogger({
      transports: [
        new transports.File({ filename: 'migrations.log' }),
        new transports.Console(),
      ],
    });
    this.metrics = this.setupMetrics();
    // migrate() iterates this collection, so it must always be an iterable.
    this.migrations = migrations;
  }

  /**
   * Creates a dedicated registry and registers the migration metrics with it.
   *
   * @returns {{migrationDuration: object, documentsProcessed: object,
   *   migrationErrors: object, register: object}} The duration histogram,
   *   the two counters and the registry that owns them.
   */
  setupMetrics() {
    const register = new prometheus.Registry();

    return {
      migrationDuration: new prometheus.Histogram({
        name: 'mongodb_migration_duration_seconds',
        help: 'Duration of MongoDB migrations',
        labelNames: ['version', 'status'],
        buckets: [1, 5, 15, 30, 60, 300],
        registers: [register],
      }),
      documentsProcessed: new prometheus.Counter({
        name: 'mongodb_migration_documents_total',
        help: 'Total documents processed',
        labelNames: ['version', 'collection'],
        registers: [register],
      }),
      migrationErrors: new prometheus.Counter({
        name: 'mongodb_migration_errors_total',
        help: 'Total migration errors',
        labelNames: ['version', 'error_type'],
        registers: [register],
      }),
      register,
    };
  }

  /**
   * Connects, runs every entry of `this.migrations` in iteration order and
   * always closes the client afterwards.
   *
   * @returns {Promise<void>}
   * @throws Rethrows the first migration (or connection) error; later
   *   migrations are not attempted.
   */
  async migrate() {
    try {
      await this.client.connect();
      const db = this.client.db();
      for (const [version, migration] of this.migrations) {
        await this.executeMigrationWithObservability(db, version, migration);
      }
    } finally {
      // Release the connection on success and failure alike; an open client
      // would otherwise keep the process alive after the run.
      await this.client.close();
    }
  }

  /**
   * Runs one migration in a transaction, timing it and counting documents
   * and errors.
   *
   * @param {object} db - Database handle passed through to `migration.up`.
   * @param {string} version - Migration identifier used as a metric label.
   * @param {{up: Function}} migration - Object whose `up(db, session, report)`
   *   performs the work; `report(collection, count)` increments the
   *   documents-processed counter.
   * @returns {Promise<void>}
   * @throws Rethrows whatever `migration.up` or the transaction throws.
   */
  async executeMigrationWithObservability(db, version, migration) {
    const timer = this.metrics.migrationDuration.startTimer({ version });
    const session = this.client.startSession();

    try {
      this.logger.info(`Starting migration ${version}`);

      await session.withTransaction(async () => {
        await migration.up(db, session, (collection, count) => {
          this.metrics.documentsProcessed.inc({ version, collection }, count);
        });
      });

      timer({ status: 'success' });
      this.logger.info(`Migration ${version} completed`);
    } catch (error) {
      // Thrown values are not guaranteed to be Error instances; fall back to
      // a fixed label instead of failing inside the error handler.
      const errorType = (error && error.name) || 'UnknownError';
      const reason = (error && error.message) || String(error);

      this.metrics.migrationErrors.inc({ version, error_type: errorType });
      timer({ status: 'failed' });
      this.logger.error(`Migration ${version} failed: ${reason}`);
      throw error;
    } finally {
      await session.endSession();
    }
  }
}
import asyncio
import json
from kafka import KafkaConsumer, KafkaProducer
from prometheus_client import Counter, Histogram, Gauge
from datetime import datetime
class CDCObservabilityManager:
    """Wires a Kafka-based CDC pipeline to Prometheus metrics.

    NOTE(review): `parse_cdc_event` and `apply_to_target` are called below but
    are not defined in the visible source, and `requests` is used without a
    visible import -- confirm they are provided elsewhere.
    """

    def __init__(self, config):
        """Store `config` and create the metric objects.

        Args:
            config: Mapping read for 'kafka_brokers' and 'kafka_connect_url'.
        """
        self.config = config
        self.metrics = self.setup_metrics()

    def setup_metrics(self):
        """Create the CDC metrics.

        Returns:
            dict with an 'events_processed' Counter plus 'consumer_lag' and
            'replication_lag' Gauges. Only the counter is updated by the
            methods visible in this class.
        """
        return {
            'events_processed': Counter(
                'cdc_events_processed_total',
                'Total CDC events processed',
                ['source', 'table', 'operation']
            ),
            'consumer_lag': Gauge(
                'cdc_consumer_lag_messages',
                'Consumer lag in messages',
                ['topic', 'partition']
            ),
            'replication_lag': Gauge(
                'cdc_replication_lag_seconds',
                'Replication lag',
                ['source_table', 'target_table']
            )
        }

    async def setup_cdc_pipeline(self):
        """Create the JSON Kafka consumer and producer on `self`."""
        self.consumer = KafkaConsumer(
            'database.changes',
            bootstrap_servers=self.config['kafka_brokers'],
            group_id='migration-consumer',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        self.producer = KafkaProducer(
            bootstrap_servers=self.config['kafka_brokers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    async def process_cdc_events(self):
        """Count every consumed event and apply it to the target.

        Requires `setup_cdc_pipeline()` to have created `self.consumer`.
        """
        # NOTE(review): kafka-python's consumer iterator is synchronous, so
        # this loop does not yield to the event loop while waiting for the
        # next message -- confirm that is acceptable for the caller.
        for message in self.consumer:
            event = self.parse_cdc_event(message.value)

            self.metrics['events_processed'].labels(
                source=event.source_db,
                table=event.table,
                operation=event.operation
            ).inc()

            await self.apply_to_target(
                event.table,
                event.operation,
                event.data,
                event.timestamp
            )

    async def setup_debezium_connector(self, source_config, timeout=30):
        """Register a Debezium PostgreSQL connector with Kafka Connect.

        Args:
            source_config: Mapping with 'name', 'host', 'port' and 'database'.
            timeout: Seconds to wait for Kafka Connect before giving up.

        Returns:
            The HTTP response object of the successful registration call.

        Raises:
            Whatever `response.raise_for_status()` raises when Kafka Connect
            answers with an error status, so a rejected connector is no
            longer silently treated as created.
        """
        connector_config = {
            "name": f"migration-connector-{source_config['name']}",
            "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "database.hostname": source_config['host'],
                "database.port": source_config['port'],
                "database.dbname": source_config['database'],
                "plugin.name": "pgoutput",
                "heartbeat.interval.ms": "10000"
            }
        }

        response = requests.post(
            f"{self.config['kafka_connect_url']}/connectors",
            json=connector_config,
            timeout=timeout
        )
        response.raise_for_status()
        return response
from prometheus_client import Counter, Gauge, Histogram, Summary
import numpy as np
class EnterpriseMigrationMonitor:
    """Tracks migration progress, flags anomalies and publishes a dashboard.

    NOTE(review): `get_migration`, `calculate_progress_stats` and
    `handle_anomalies` are called below but are not defined in the visible
    source, and `requests` is used without a visible import -- confirm they
    are provided elsewhere.
    """

    def __init__(self, config):
        """Create the metric registry, metrics and alerting helper.

        Args:
            config: Mapping read for 'alerts', 'grafana_url' and
                'grafana_token'.
        """
        # The module imports individual metric classes only; there is no
        # `prometheus` name in scope, so fetch the registry class directly.
        from prometheus_client import CollectorRegistry

        self.config = config
        self.registry = CollectorRegistry()
        self.metrics = self.setup_metrics()
        self.alerting = AlertingSystem(config.get('alerts', {}))

    def setup_metrics(self):
        """Create the migration metrics on `self.registry`.

        Returns:
            dict with 'migration_duration' (Histogram), 'rows_migrated'
            (Counter) and 'data_lag' (Gauge).
        """
        return {
            'migration_duration': Histogram(
                'migration_duration_seconds',
                'Migration duration',
                ['migration_id'],
                buckets=[60, 300, 600, 1800, 3600],
                registry=self.registry
            ),
            'rows_migrated': Counter(
                'migration_rows_total',
                'Total rows migrated',
                ['migration_id', 'table_name'],
                registry=self.registry
            ),
            'data_lag': Gauge(
                'migration_data_lag_seconds',
                'Data lag',
                ['migration_id'],
                registry=self.registry
            )
        }

    async def track_migration_progress(self, migration_id, poll_interval=30):
        """Poll a migration until it stops running.

        Each pass records the processed rows, runs anomaly detection and
        forwards any findings to `handle_anomalies`.

        Args:
            migration_id: Identifier used to look the migration up and to
                label the metrics.
            poll_interval: Seconds to sleep between passes.
        """
        while True:
            # Look the migration up on every pass so the loop also ends when
            # the lookup returns snapshots rather than one live object.
            migration = await self.get_migration(migration_id)
            if migration.status != 'running':
                break

            stats = await self.calculate_progress_stats(migration)

            # NOTE(review): assumes `rows_processed` is the delta since the
            # previous pass; a cumulative total would be double counted.
            self.metrics['rows_migrated'].labels(
                migration_id=migration_id,
                table_name=migration.table
            ).inc(stats.rows_processed)

            anomalies = await self.detect_anomalies(migration_id, stats)
            if anomalies:
                await self.handle_anomalies(migration_id, anomalies)

            await asyncio.sleep(poll_interval)

    async def detect_anomalies(self, migration_id, stats):
        """Return anomaly records for low throughput and high error rate.

        Args:
            migration_id: Unused here; kept for a uniform hook signature.
            stats: Object exposing `rows_per_second`,
                `expected_rows_per_second` and `error_rate`.

        Returns:
            list of dicts with 'type', 'severity' and 'message'; empty when
            throughput is at least half the expected rate and the error rate
            is at most 0.01.
        """
        anomalies = []

        if stats.rows_per_second < stats.expected_rows_per_second * 0.5:
            anomalies.append({
                'type': 'low_throughput',
                'severity': 'warning',
                'message': 'Throughput below expected'
            })

        if stats.error_rate > 0.01:
            anomalies.append({
                'type': 'high_error_rate',
                'severity': 'critical',
                'message': 'Error rate exceeds threshold'
            })

        return anomalies

    async def setup_migration_dashboard(self, timeout=30):
        """POST the migration dashboard definition to the Grafana URL.

        Args:
            timeout: Seconds to wait for the HTTP call before giving up.

        Returns:
            The HTTP response object of the successful call.

        Raises:
            Whatever `response.raise_for_status()` raises on an error status,
            so a rejected dashboard is not mistaken for a created one.
        """
        dashboard_config = {
            "dashboard": {
                "title": "Database Migration Monitoring",
                "panels": [
                    {
                        "title": "Migration Progress",
                        "targets": [{
                            "expr": "rate(migration_rows_total[5m])"
                        }]
                    },
                    {
                        "title": "Data Lag",
                        "targets": [{
                            "expr": "migration_data_lag_seconds"
                        }]
                    }
                ]
            }
        }

        response = requests.post(
            f"{self.config['grafana_url']}/api/dashboards/db",
            json=dashboard_config,
            headers={'Authorization': f"Bearer {self.config['grafana_token']}"},
            timeout=timeout
        )
        response.raise_for_status()
        return response
class AlertingSystem:
    """Fans an alert out to the channels named in the configuration.

    NOTE(review): `requests` is used without a visible import -- confirm it
    is imported elsewhere in the module.
    """

    def __init__(self, config):
        """Store the channel configuration.

        Args:
            config: Mapping whose 'slack' and 'email' keys enable those
                channels; 'slack' must contain 'webhook_url'.
        """
        self.config = config

    async def send_alert(self, title, message, severity, **kwargs):
        """Send the alert to every configured channel.

        Args:
            title: Short headline of the alert.
            message: Detail text.
            severity: 'critical', 'warning' or 'info'.
            **kwargs: Accepted for caller convenience; not used here.
        """
        if 'slack' in self.config:
            await self.send_slack_alert(title, message, severity)

        if 'email' in self.config:
            # NOTE(review): `send_email_alert` is not defined in the visible
            # source; configuring 'email' fails here unless it exists elsewhere.
            await self.send_email_alert(title, message, severity)

    async def send_slack_alert(self, title, message, severity):
        """POST the alert to the configured Slack webhook.

        Unknown severities fall back to the 'warning' colour.

        Raises:
            Whatever `response.raise_for_status()` raises when the webhook
            answers with an error status, so an undelivered alert is visible
            to the caller instead of being dropped silently.
        """
        color = {
            'critical': 'danger',
            'warning': 'warning',
            'info': 'good'
        }.get(severity, 'warning')

        payload = {
            'text': title,
            'attachments': [{
                'color': color,
                'text': message
            }]
        }

        # Bound the call: without a timeout an unresponsive webhook would
        # stall the alerting path indefinitely.
        response = requests.post(
            self.config['slack']['webhook_url'],
            json=payload,
            timeout=10
        )
        response.raise_for_status()
# Dashboard panel definitions: two graph panels (row throughput, error rate)
# and one stat panel for data lag with colour thresholds at 0 / 60 / 300.
# NOTE(review): panel 3 queries `migration_errors_total`; no metric with that
# name is registered in the visible source -- confirm the exporter exists.
dashboard_panels = [
    dict(
        id=1,
        title="Migration Progress",
        type="graph",
        targets=[
            dict(
                expr="rate(migration_rows_total[5m])",
                legendFormat="{{migration_id}} - {{table_name}}",
            ),
        ],
    ),
    dict(
        id=2,
        title="Data Lag",
        type="stat",
        targets=[dict(expr="migration_data_lag_seconds")],
        fieldConfig=dict(
            thresholds=dict(
                steps=[
                    dict(value=0, color="green"),
                    dict(value=60, color="yellow"),
                    dict(value=300, color="red"),
                ],
            ),
        ),
    ),
    dict(
        id=3,
        title="Error Rate",
        type="graph",
        targets=[dict(expr="rate(migration_errors_total[5m])")],
    ),
]
# Workflow: on every push to main, start monitoring, run the production
# migration, then check migration health against a maximum lag of 300.
name: Migration Monitoring

on:
  push:
    branches:
      - main

jobs:
  monitor-migration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      # The commit SHA doubles as the migration id for both monitor calls.
      - name: Start Monitoring
        run: |
          python migration_monitor.py start \
          --migration-id ${{ github.sha }} \
          --prometheus-url ${{ secrets.PROMETHEUS_URL }}

      - name: Run Migration
        run: |
          python migrate.py --environment production

      # Skipped when the migration step fails (default step condition).
      - name: Check Migration Health
        run: |
          python migration_monitor.py check \
          --migration-id ${{ github.sha }} \
          --max-lag 300
Focus on real-time visibility, proactive alerting, and comprehensive observability for zero-downtime migrations.
This plugin integrates with: