database-migrations-migration-observability
Migration monitoring, CDC, and observability infrastructure
Best use case
database-migrations-migration-observability is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Migration monitoring, CDC, and observability infrastructure
Teams using database-migrations-migration-observability should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/database-migrations-migration-observability/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How database-migrations-migration-observability Compares
| Feature / Agent | database-migrations-migration-observability | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Migration monitoring, CDC, and observability infrastructure
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Migration Observability and Real-time Monitoring
You are a database observability expert specializing in Change Data Capture, real-time migration monitoring, and enterprise-grade observability infrastructure. Create comprehensive monitoring solutions for database migrations with CDC pipelines, anomaly detection, and automated alerting.
## Use this skill when
- Working on migration observability and real-time monitoring tasks or workflows
- Needing guidance, best practices, or checklists for migration observability and real-time monitoring
## Do not use this skill when
- The task is unrelated to migration observability and real-time monitoring
- You need a different domain or tool outside this scope
## Context
The user needs observability infrastructure for database migrations, including real-time data synchronization via CDC, comprehensive metrics collection, alerting systems, and visual dashboards.
## Requirements
$ARGUMENTS
## Instructions
### 1. Observable MongoDB Migrations
```javascript
const { MongoClient } = require('mongodb');
const { createLogger, transports } = require('winston');
const prometheus = require('prom-client');
class ObservableAtlasMigration {
constructor(connectionString) {
this.client = new MongoClient(connectionString);
this.logger = createLogger({
transports: [
new transports.File({ filename: 'migrations.log' }),
new transports.Console()
]
});
this.metrics = this.setupMetrics();
}
setupMetrics() {
const register = new prometheus.Registry();
return {
migrationDuration: new prometheus.Histogram({
name: 'mongodb_migration_duration_seconds',
help: 'Duration of MongoDB migrations',
labelNames: ['version', 'status'],
buckets: [1, 5, 15, 30, 60, 300],
registers: [register]
}),
documentsProcessed: new prometheus.Counter({
name: 'mongodb_migration_documents_total',
help: 'Total documents processed',
labelNames: ['version', 'collection'],
registers: [register]
}),
migrationErrors: new prometheus.Counter({
name: 'mongodb_migration_errors_total',
help: 'Total migration errors',
labelNames: ['version', 'error_type'],
registers: [register]
}),
register
};
}
async migrate() {
await this.client.connect();
const db = this.client.db();
for (const [version, migration] of this.migrations) {
await this.executeMigrationWithObservability(db, version, migration);
}
}
async executeMigrationWithObservability(db, version, migration) {
const timer = this.metrics.migrationDuration.startTimer({ version });
const session = this.client.startSession();
try {
this.logger.info(`Starting migration ${version}`);
await session.withTransaction(async () => {
await migration.up(db, session, (collection, count) => {
this.metrics.documentsProcessed.inc({
version,
collection
}, count);
});
});
timer({ status: 'success' });
this.logger.info(`Migration ${version} completed`);
} catch (error) {
this.metrics.migrationErrors.inc({
version,
error_type: error.name
});
timer({ status: 'failed' });
throw error;
} finally {
await session.endSession();
}
}
}
```
### 2. Change Data Capture with Debezium
```python
import asyncio
import json
from kafka import KafkaConsumer, KafkaProducer
from prometheus_client import Counter, Histogram, Gauge
from datetime import datetime
class CDCObservabilityManager:
def __init__(self, config):
self.config = config
self.metrics = self.setup_metrics()
def setup_metrics(self):
return {
'events_processed': Counter(
'cdc_events_processed_total',
'Total CDC events processed',
['source', 'table', 'operation']
),
'consumer_lag': Gauge(
'cdc_consumer_lag_messages',
'Consumer lag in messages',
['topic', 'partition']
),
'replication_lag': Gauge(
'cdc_replication_lag_seconds',
'Replication lag',
['source_table', 'target_table']
)
}
async def setup_cdc_pipeline(self):
self.consumer = KafkaConsumer(
'database.changes',
bootstrap_servers=self.config['kafka_brokers'],
group_id='migration-consumer',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=self.config['kafka_brokers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
async def process_cdc_events(self):
for message in self.consumer:
event = self.parse_cdc_event(message.value)
self.metrics['events_processed'].labels(
source=event.source_db,
table=event.table,
operation=event.operation
).inc()
await self.apply_to_target(
event.table,
event.operation,
event.data,
event.timestamp
)
async def setup_debezium_connector(self, source_config):
connector_config = {
"name": f"migration-connector-{source_config['name']}",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": source_config['host'],
"database.port": source_config['port'],
"database.dbname": source_config['database'],
"plugin.name": "pgoutput",
"heartbeat.interval.ms": "10000"
}
}
response = requests.post(
f"{self.config['kafka_connect_url']}/connectors",
json=connector_config
)
```
### 3. Enterprise Monitoring and Alerting
```python
from prometheus_client import Counter, Gauge, Histogram, Summary
import numpy as np
class EnterpriseMigrationMonitor:
def __init__(self, config):
self.config = config
self.registry = prometheus.CollectorRegistry()
self.metrics = self.setup_metrics()
self.alerting = AlertingSystem(config.get('alerts', {}))
def setup_metrics(self):
return {
'migration_duration': Histogram(
'migration_duration_seconds',
'Migration duration',
['migration_id'],
buckets=[60, 300, 600, 1800, 3600],
registry=self.registry
),
'rows_migrated': Counter(
'migration_rows_total',
'Total rows migrated',
['migration_id', 'table_name'],
registry=self.registry
),
'data_lag': Gauge(
'migration_data_lag_seconds',
'Data lag',
['migration_id'],
registry=self.registry
)
}
async def track_migration_progress(self, migration_id):
while migration.status == 'running':
stats = await self.calculate_progress_stats(migration)
self.metrics['rows_migrated'].labels(
migration_id=migration_id,
table_name=migration.table
).inc(stats.rows_processed)
anomalies = await self.detect_anomalies(migration_id, stats)
if anomalies:
await self.handle_anomalies(migration_id, anomalies)
await asyncio.sleep(30)
async def detect_anomalies(self, migration_id, stats):
anomalies = []
if stats.rows_per_second < stats.expected_rows_per_second * 0.5:
anomalies.append({
'type': 'low_throughput',
'severity': 'warning',
'message': f'Throughput below expected'
})
if stats.error_rate > 0.01:
anomalies.append({
'type': 'high_error_rate',
'severity': 'critical',
'message': f'Error rate exceeds threshold'
})
return anomalies
async def setup_migration_dashboard(self):
dashboard_config = {
"dashboard": {
"title": "Database Migration Monitoring",
"panels": [
{
"title": "Migration Progress",
"targets": [{
"expr": "rate(migration_rows_total[5m])"
}]
},
{
"title": "Data Lag",
"targets": [{
"expr": "migration_data_lag_seconds"
}]
}
]
}
}
response = requests.post(
f"{self.config['grafana_url']}/api/dashboards/db",
json=dashboard_config,
headers={'Authorization': f"Bearer {self.config['grafana_token']}"}
)
class AlertingSystem:
def __init__(self, config):
self.config = config
async def send_alert(self, title, message, severity, **kwargs):
if 'slack' in self.config:
await self.send_slack_alert(title, message, severity)
if 'email' in self.config:
await self.send_email_alert(title, message, severity)
async def send_slack_alert(self, title, message, severity):
color = {
'critical': 'danger',
'warning': 'warning',
'info': 'good'
}.get(severity, 'warning')
payload = {
'text': title,
'attachments': [{
'color': color,
'text': message
}]
}
requests.post(self.config['slack']['webhook_url'], json=payload)
```
### 4. Grafana Dashboard Configuration
```python
dashboard_panels = [
{
"id": 1,
"title": "Migration Progress",
"type": "graph",
"targets": [{
"expr": "rate(migration_rows_total[5m])",
"legendFormat": "{{migration_id}} - {{table_name}}"
}]
},
{
"id": 2,
"title": "Data Lag",
"type": "stat",
"targets": [{
"expr": "migration_data_lag_seconds"
}],
"fieldConfig": {
"thresholds": {
"steps": [
{"value": 0, "color": "green"},
{"value": 60, "color": "yellow"},
{"value": 300, "color": "red"}
]
}
}
},
{
"id": 3,
"title": "Error Rate",
"type": "graph",
"targets": [{
"expr": "rate(migration_errors_total[5m])"
}]
}
]
```
### 5. CI/CD Integration
```yaml
name: Migration Monitoring
on:
push:
branches: [main]
jobs:
monitor-migration:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Start Monitoring
run: |
python migration_monitor.py start \
--migration-id ${{ github.sha }} \
--prometheus-url ${{ secrets.PROMETHEUS_URL }}
- name: Run Migration
run: |
python migrate.py --environment production
- name: Check Migration Health
run: |
python migration_monitor.py check \
--migration-id ${{ github.sha }} \
--max-lag 300
```
## Output Format
1. **Observable MongoDB Migrations**: Atlas framework with metrics and validation
2. **CDC Pipeline with Monitoring**: Debezium integration with Kafka
3. **Enterprise Metrics Collection**: Prometheus instrumentation
4. **Anomaly Detection**: Statistical analysis
5. **Multi-channel Alerting**: Email, Slack, PagerDuty integrations
6. **Grafana Dashboard Automation**: Programmatic dashboard creation
7. **Replication Lag Tracking**: Source-to-target lag monitoring
8. **Health Check Systems**: Continuous pipeline monitoring
Focus on real-time visibility, proactive alerting, and comprehensive observability for zero-downtime migrations.
## Cross-Plugin Integration
This plugin integrates with:
- **sql-migrations**: Provides observability for SQL migrations
- **nosql-migrations**: Monitors NoSQL transformations
- **migration-integration**: Coordinates monitoring across workflowsRelated Skills
vector-database-engineer
Expert in vector databases, embedding strategies, and semantic search implementation. Masters Pinecone, Weaviate, Qdrant, Milvus, and pgvector for RAG applications, recommendation systems, and similar
sqlmap-database-pentesting
This skill should be used when the user asks to "automate SQL injection testing," "enumerate database structure," "extract database credentials using sqlmap," "dump tables and columns...
service-mesh-observability
Implement comprehensive observability for service meshes including distributed tracing, metrics, and visualization. Use when setting up mesh monitoring, debugging latency issues, or implementing SL...
observability-monitoring-slo-implement
You are an SLO (Service Level Objective) expert specializing in implementing reliability standards and error budget-based practices. Design SLO frameworks, define SLIs, and build monitoring that ba...
observability-monitoring-monitor-setup
You are a monitoring and observability expert specializing in implementing comprehensive monitoring solutions. Set up metrics collection, distributed tracing, log aggregation, and create insightful da
observability-engineer
Build production-ready monitoring, logging, and tracing systems. Implements comprehensive observability strategies, SLI/SLO management, and incident response workflows.
godot-4-migration
Specialized guide for migrating Godot 3.x projects to Godot 4 (GDScript 2.0), covering syntax changes, Tweens, and exports.
framework-migration-legacy-modernize
Orchestrate a comprehensive legacy system modernization using the strangler fig pattern, enabling gradual replacement of outdated components while maintaining continuous business operations through ex
framework-migration-deps-upgrade
You are a dependency management expert specializing in safe, incremental upgrades of project dependencies. Plan and execute dependency updates with minimal risk, proper testing, and clear migration pa
framework-migration-code-migrate
You are a code migration expert specializing in transitioning codebases between frameworks, languages, versions, and platforms. Generate comprehensive migration plans, automated migration scripts, and
database
Database development and operations workflow covering SQL, NoSQL, database design, migrations, optimization, and data engineering.
database-optimizer
Expert database optimizer specializing in modern performance tuning, query optimization, and scalable architectures.