database-migrations-migration-observability
Migration monitoring, CDC, and observability infrastructure
About this skill
This skill turns an AI agent into a specialized database observability expert. It enables the agent to understand, discuss, and design advanced monitoring strategies for database migrations, focusing on Change Data Capture (CDC) pipelines (e.g., Debezium and Kafka), real-time migration monitoring, and enterprise-grade observability infrastructure (e.g., Prometheus and Grafana). The agent can guide users toward comprehensive solutions, including anomaly detection and automated alerting, and offers best practices, checklists, and architectural recommendations for establishing robust monitoring of complex database environment changes.
Best use case
- Designing or planning database migration strategies that require robust real-time monitoring and observability
- Seeking expert guidance on implementing Change Data Capture (CDC) for data replication and integrity during migrations
- Developing anomaly detection and automated alerting systems for database performance and health during migration processes
- Generating best practices, checklists, or comprehensive documentation for database migration observability
Migration monitoring, CDC, and observability infrastructure
The user will receive expert-level guidance, detailed design proposals, architectural recommendations, best practices, or comprehensive checklists related to database migration observability. This includes recommendations for CDC implementation, real-time monitoring setups, anomaly detection strategies, and automated alerting systems, all tailored to their specific migration challenges and technology stack.
Practical example
Example input
Design a real-time monitoring solution for migrating a PostgreSQL database to a new cloud environment, focusing on CDC with Debezium and Kafka. What metrics should I prioritize, and how can I set up effective anomaly detection and alerting in Grafana?
Example output
As a database observability expert, here's a comprehensive plan for monitoring your PostgreSQL migration using Debezium, Kafka, Prometheus, and Grafana:
**1. CDC Pipeline Design (Debezium & Kafka):**
* **Source Connector:** Configure Debezium PostgreSQL connector for logical replication. Ensure `wal_level` is `logical` and a replication slot is created.
* **Kafka Topics:** Define dedicated Kafka topics for each replicated table or a single topic for all change events.
* **Consumers:** Plan for consumers (e.g., Kafka Connect, custom apps) to process CDC events for validation, transformation, or replication to the target.
**2. Key Metrics to Prioritize (Prometheus):**
* **PostgreSQL Source Database:**
* `pg_stat_activity`: Active connections, queries, idle in transaction.
* `pg_stat_replication`: Replication lag, WAL sender/receiver status.
* Resource utilization: CPU, memory, disk I/O, network.
* Error logs: Monitor for critical errors, deadlocks.
* **Debezium Connector:**
* Number of events processed, latency from source to Kafka.
* Offset commit lag, connector task status (running, failed).
* Tombstone events, schema changes detected.
* **Kafka Cluster:**
* Producer/consumer lag, broker health, topic message rates.
* Disk usage, network throughput.
* **Target Cloud Database:** (e.g., metrics specific to AWS RDS, Azure DB)
* Replication lag from source, data consistency checks.
* Resource utilization, query performance.
* Error rates, transaction throughput.
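For the `pg_stat_replication` metrics above, byte lag between two WAL positions can be computed by parsing PostgreSQL's textual LSN format (`XXXXXXXX/XXXXXXXX`, two hex halves joined by a slash). A small illustrative helper, not tied to any particular client library:

```python
def parse_lsn(lsn):
    """Convert a PostgreSQL LSN string like '0/16B3748' to an absolute byte offset."""
    hi, lo = lsn.split('/')
    return (int(hi, 16) << 32) + int(lo, 16)

def replication_byte_lag(sent_lsn, replay_lsn):
    """Bytes of WAL the replica still has to replay (sent minus replayed position)."""
    return parse_lsn(sent_lsn) - parse_lsn(replay_lsn)

# Values as they would appear in pg_stat_replication.sent_lsn / replay_lsn
print(replication_byte_lag('0/16B3748', '0/16B3700'))  # 72
```

Exporting this difference as a Prometheus gauge gives a direct, numeric replication-lag signal to alert on.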
**3. Anomaly Detection Strategies:**
* **Baseline Deviation:** Establish baselines for key metrics during a stable period. Alert on significant deviations (e.g., `cpu_usage > 2 * baseline_std_dev`).
* **Rate of Change:** Monitor for sudden spikes or drops in event rates, transaction rates, or error counts.
* **Replication Lag:** Detect sustained or rapidly increasing replication lag between PostgreSQL and Kafka, and between Kafka and the target.
* **Resource Saturation:** Identify consistent high CPU, memory, or I/O utilization that could indicate bottlenecks.
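The baseline-deviation strategy above can be sketched as a small function: compute the mean and standard deviation over a stable window, then flag samples more than k standard deviations away. The 3-sigma default and the function name are illustrative choices, not part of any particular monitoring library:

```python
import statistics

def detect_baseline_anomalies(baseline, samples, k=3.0):
    """Flag samples more than k standard deviations from the baseline mean.

    baseline: metric values collected during a known-stable period.
    samples:  recent values to check against that baseline.
    """
    mean = statistics.fmean(baseline)
    std = statistics.pstdev(baseline)
    # Guard against a perfectly flat baseline (std == 0)
    threshold = k * std if std > 0 else 0.0
    return [s for s in samples if abs(s - mean) > threshold]

# Example: CPU utilisation hovering around 40% during the baseline window
baseline = [38, 41, 40, 39, 42, 40, 41, 39]
print(detect_baseline_anomalies(baseline, [40, 43, 95]))  # [95]
```

In practice the same check is usually expressed as a PromQL recording rule, but the logic is identical.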
**4. Automated Alerting (Grafana & Alertmanager):**
* **Critical Alerts:**
* Debezium connector failure.
* Kafka broker down.
* PostgreSQL replication slot issues (e.g., `active = false`).
* Target database unreachable or severe errors.
* Extreme replication lag (e.g., > 5 minutes).
* **Warning Alerts:**
* Increased replication lag (e.g., > 30 seconds).
* High resource utilization (e.g., CPU > 80%).
* Spikes in transaction errors.
* Unusual patterns in CDC event throughput.
* **Alert Channels:** Integrate with PagerDuty, Slack, Email, etc., with escalation policies.
* **Dashboards:** Create dedicated Grafana dashboards for a holistic view of the migration, showing source, CDC pipeline, and target health side-by-side.
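The lag thresholds above can be encoded as a simple severity classifier; the cutoffs (30 s warning, 5 min critical) are the illustrative values from the alert list, not fixed standards:

```python
def lag_severity(lag_seconds, warning=30, critical=300):
    """Map replication lag to an alert severity using the thresholds above."""
    if lag_seconds > critical:
        return 'critical'
    if lag_seconds > warning:
        return 'warning'
    return 'ok'

print(lag_severity(12))   # ok
print(lag_severity(45))   # warning
print(lag_severity(600))  # critical
```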
Remember to perform thorough testing in a staging environment and establish clear rollback procedures.
When to use this skill
- When an organization is undergoing a complex database migration and needs to ensure data integrity, minimal downtime, and continuous monitoring
- When building or optimizing a Change Data Capture (CDC) pipeline using technologies like Debezium and Kafka
- When setting up a new observability stack (e.g., Prometheus, Grafana) specifically for database migrations
- When requiring an AI agent to act as a subject matter expert to provide detailed advice or generate technical documentation on database observability
When not to use this skill
- When directly executing database migration commands or making live changes to a production database
- When the primary need is a general-purpose database administration task unrelated to migration or advanced observability
- When expecting the AI agent to interact with live database systems or deploy monitoring infrastructure without human oversight
- When seeking a skill for basic database querying or data manipulation
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in .claude/skills/database-migrations-migration-observability/SKILL.md inside your project
- Restart your AI agent — it will auto-discover the skill
How database-migrations-migration-observability Compares
| Feature / Agent | database-migrations-migration-observability | Standard Approach |
|---|---|---|
| Platform Support | Claude | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | easy | N/A |
Frequently Asked Questions
What does this skill do?
Migration monitoring, CDC, and observability infrastructure
Which AI agents support this skill?
This skill is designed for Claude.
How difficult is it to install?
The installation complexity is rated as easy. You can find the installation instructions above.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
Related Guides
AI Agents for Coding
Browse AI agent skills for coding, debugging, testing, refactoring, code review, and developer workflows across Claude, Cursor, and Codex.
Best AI Skills for Claude
Explore the best AI skills for Claude and Claude Code across coding, research, workflow automation, documentation, and agent operations.
ChatGPT vs Claude for Agent Skills
Compare ChatGPT and Claude for AI agent skills across coding, writing, research, and reusable workflow execution.
SKILL.md Source
# Migration Observability and Real-time Monitoring
You are a database observability expert specializing in Change Data Capture, real-time migration monitoring, and enterprise-grade observability infrastructure. Create comprehensive monitoring solutions for database migrations with CDC pipelines, anomaly detection, and automated alerting.
## Use this skill when
- Working on migration observability and real-time monitoring tasks or workflows
- Needing guidance, best practices, or checklists for migration observability and real-time monitoring
## Do not use this skill when
- The task is unrelated to migration observability and real-time monitoring
- You need a different domain or tool outside this scope
## Context
The user needs observability infrastructure for database migrations, including real-time data synchronization via CDC, comprehensive metrics collection, alerting systems, and visual dashboards.
## Requirements
$ARGUMENTS
## Instructions
### 1. Observable MongoDB Migrations
```javascript
const { MongoClient } = require('mongodb');
const { createLogger, transports } = require('winston');
const prometheus = require('prom-client');

class ObservableAtlasMigration {
  constructor(connectionString) {
    this.client = new MongoClient(connectionString);
    this.logger = createLogger({
      transports: [
        new transports.File({ filename: 'migrations.log' }),
        new transports.Console()
      ]
    });
    this.metrics = this.setupMetrics();
    // Map of version -> { up(db, session, onProgress) }, registered by the caller
    this.migrations = new Map();
  }

  setupMetrics() {
    const register = new prometheus.Registry();
    return {
      migrationDuration: new prometheus.Histogram({
        name: 'mongodb_migration_duration_seconds',
        help: 'Duration of MongoDB migrations',
        labelNames: ['version', 'status'],
        buckets: [1, 5, 15, 30, 60, 300],
        registers: [register]
      }),
      documentsProcessed: new prometheus.Counter({
        name: 'mongodb_migration_documents_total',
        help: 'Total documents processed',
        labelNames: ['version', 'collection'],
        registers: [register]
      }),
      migrationErrors: new prometheus.Counter({
        name: 'mongodb_migration_errors_total',
        help: 'Total migration errors',
        labelNames: ['version', 'error_type'],
        registers: [register]
      }),
      register
    };
  }

  async migrate() {
    await this.client.connect();
    const db = this.client.db();
    for (const [version, migration] of this.migrations) {
      await this.executeMigrationWithObservability(db, version, migration);
    }
  }

  async executeMigrationWithObservability(db, version, migration) {
    const timer = this.metrics.migrationDuration.startTimer({ version });
    const session = this.client.startSession();
    try {
      this.logger.info(`Starting migration ${version}`);
      await session.withTransaction(async () => {
        await migration.up(db, session, (collection, count) => {
          this.metrics.documentsProcessed.inc({ version, collection }, count);
        });
      });
      timer({ status: 'success' });
      this.logger.info(`Migration ${version} completed`);
    } catch (error) {
      this.metrics.migrationErrors.inc({ version, error_type: error.name });
      timer({ status: 'failed' });
      throw error;
    } finally {
      await session.endSession();
    }
  }
}
```
### 2. Change Data Capture with Debezium
```python
import json

import requests
from kafka import KafkaConsumer, KafkaProducer
from prometheus_client import Counter, Gauge


class CDCObservabilityManager:
    def __init__(self, config):
        self.config = config
        self.metrics = self.setup_metrics()

    def setup_metrics(self):
        return {
            'events_processed': Counter(
                'cdc_events_processed_total',
                'Total CDC events processed',
                ['source', 'table', 'operation']
            ),
            'consumer_lag': Gauge(
                'cdc_consumer_lag_messages',
                'Consumer lag in messages',
                ['topic', 'partition']
            ),
            'replication_lag': Gauge(
                'cdc_replication_lag_seconds',
                'Replication lag',
                ['source_table', 'target_table']
            )
        }

    async def setup_cdc_pipeline(self):
        self.consumer = KafkaConsumer(
            'database.changes',
            bootstrap_servers=self.config['kafka_brokers'],
            group_id='migration-consumer',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=self.config['kafka_brokers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    async def process_cdc_events(self):
        # parse_cdc_event and apply_to_target are implemented by a subclass
        # (or injected) for the specific source/target pair
        for message in self.consumer:
            event = self.parse_cdc_event(message.value)
            self.metrics['events_processed'].labels(
                source=event.source_db,
                table=event.table,
                operation=event.operation
            ).inc()
            await self.apply_to_target(
                event.table,
                event.operation,
                event.data,
                event.timestamp
            )

    async def setup_debezium_connector(self, source_config):
        connector_config = {
            "name": f"migration-connector-{source_config['name']}",
            "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "database.hostname": source_config['host'],
                "database.port": source_config['port'],
                "database.dbname": source_config['database'],
                "plugin.name": "pgoutput",
                "heartbeat.interval.ms": "10000"
            }
        }
        response = requests.post(
            f"{self.config['kafka_connect_url']}/connectors",
            json=connector_config
        )
        response.raise_for_status()
```
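One way to feed the `replication_lag` gauge above: Debezium change events carry a source timestamp (`source.ts_ms` in the connector's event envelope), so lag can be estimated as the difference between processing time and that timestamp. A minimal sketch, assuming the envelope shape shown in the example:

```python
import time

def replication_lag_seconds(event, now_ms=None):
    """Estimate source-to-consumer lag from a Debezium event envelope.

    event: the deserialized Kafka message value; payload.source.ts_ms is
    the millisecond timestamp at which the change occurred at the source.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    source_ts = event['payload']['source']['ts_ms']
    # Clamp at zero in case of clock skew between source and consumer
    return max(0.0, (now_ms - source_ts) / 1000.0)

event = {'payload': {'source': {'ts_ms': 1_700_000_000_000}}}
print(replication_lag_seconds(event, now_ms=1_700_000_004_500))  # 4.5
```

Calling this inside `process_cdc_events` and setting the gauge per table gives a continuously updated lag signal.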
### 3. Enterprise Monitoring and Alerting
```python
import asyncio

import requests
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram


class EnterpriseMigrationMonitor:
    def __init__(self, config):
        self.config = config
        self.registry = CollectorRegistry()
        self.metrics = self.setup_metrics()
        self.alerting = AlertingSystem(config.get('alerts', {}))

    def setup_metrics(self):
        return {
            'migration_duration': Histogram(
                'migration_duration_seconds',
                'Migration duration',
                ['migration_id'],
                buckets=[60, 300, 600, 1800, 3600],
                registry=self.registry
            ),
            'rows_migrated': Counter(
                'migration_rows_total',
                'Total rows migrated',
                ['migration_id', 'table_name'],
                registry=self.registry
            ),
            'data_lag': Gauge(
                'migration_data_lag_seconds',
                'Data lag',
                ['migration_id'],
                registry=self.registry
            )
        }

    async def track_migration_progress(self, migration):
        # calculate_progress_stats and handle_anomalies are assumed helpers
        while migration.status == 'running':
            stats = await self.calculate_progress_stats(migration)
            self.metrics['rows_migrated'].labels(
                migration_id=migration.id,
                table_name=migration.table
            ).inc(stats.rows_processed)
            anomalies = await self.detect_anomalies(migration.id, stats)
            if anomalies:
                await self.handle_anomalies(migration.id, anomalies)
            await asyncio.sleep(30)

    async def detect_anomalies(self, migration_id, stats):
        anomalies = []
        if stats.rows_per_second < stats.expected_rows_per_second * 0.5:
            anomalies.append({
                'type': 'low_throughput',
                'severity': 'warning',
                'message': f'Throughput {stats.rows_per_second:.0f} rows/s is below '
                           f'50% of expected {stats.expected_rows_per_second:.0f} rows/s'
            })
        if stats.error_rate > 0.01:
            anomalies.append({
                'type': 'high_error_rate',
                'severity': 'critical',
                'message': f'Error rate {stats.error_rate:.2%} exceeds the 1% threshold'
            })
        return anomalies

    async def setup_migration_dashboard(self):
        dashboard_config = {
            "dashboard": {
                "title": "Database Migration Monitoring",
                "panels": [
                    {
                        "title": "Migration Progress",
                        "targets": [{"expr": "rate(migration_rows_total[5m])"}]
                    },
                    {
                        "title": "Data Lag",
                        "targets": [{"expr": "migration_data_lag_seconds"}]
                    }
                ]
            }
        }
        response = requests.post(
            f"{self.config['grafana_url']}/api/dashboards/db",
            json=dashboard_config,
            headers={'Authorization': f"Bearer {self.config['grafana_token']}"}
        )
        response.raise_for_status()


class AlertingSystem:
    def __init__(self, config):
        self.config = config

    async def send_alert(self, title, message, severity, **kwargs):
        if 'slack' in self.config:
            await self.send_slack_alert(title, message, severity)
        if 'email' in self.config:
            await self.send_email_alert(title, message, severity)

    async def send_slack_alert(self, title, message, severity):
        color = {
            'critical': 'danger',
            'warning': 'warning',
            'info': 'good'
        }.get(severity, 'warning')
        payload = {
            'text': title,
            'attachments': [{'color': color, 'text': message}]
        }
        requests.post(self.config['slack']['webhook_url'], json=payload)

    # send_email_alert omitted for brevity
```
### 4. Grafana Dashboard Configuration
```python
dashboard_panels = [
    {
        "id": 1,
        "title": "Migration Progress",
        "type": "graph",
        "targets": [{
            "expr": "rate(migration_rows_total[5m])",
            "legendFormat": "{{migration_id}} - {{table_name}}"
        }]
    },
    {
        "id": 2,
        "title": "Data Lag",
        "type": "stat",
        "targets": [{
            "expr": "migration_data_lag_seconds"
        }],
        "fieldConfig": {
            "thresholds": {
                "steps": [
                    {"value": 0, "color": "green"},
                    {"value": 60, "color": "yellow"},
                    {"value": 300, "color": "red"}
                ]
            }
        }
    },
    {
        "id": 3,
        "title": "Error Rate",
        "type": "graph",
        "targets": [{
            "expr": "rate(migration_errors_total[5m])"
        }]
    }
]
```
### 5. CI/CD Integration
```yaml
name: Migration Monitoring
on:
  push:
    branches: [main]
jobs:
  monitor-migration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Start Monitoring
        run: |
          python migration_monitor.py start \
            --migration-id ${{ github.sha }} \
            --prometheus-url ${{ secrets.PROMETHEUS_URL }}
      - name: Run Migration
        run: |
          python migrate.py --environment production
      - name: Check Migration Health
        run: |
          python migration_monitor.py check \
            --migration-id ${{ github.sha }} \
            --max-lag 300
```
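The `migration_monitor.py check` script above is a hypothetical helper; its core logic would be an instant query against Prometheus' HTTP API (`/api/v1/query`) comparing `migration_data_lag_seconds` to the `--max-lag` bound. Keeping the decision as a pure function makes it easy to test:

```python
import json
import urllib.request
from urllib.parse import urlencode

def lag_within_bound(query_result, max_lag):
    """Return True if every reported lag sample is within max_lag seconds.

    query_result: parsed JSON body of a Prometheus instant query; each
    vector sample carries its value as [timestamp, "value-as-string"].
    """
    samples = query_result['data']['result']
    return all(float(s['value'][1]) <= max_lag for s in samples)

def check(prometheus_url, migration_id, max_lag):
    query = f'migration_data_lag_seconds{{migration_id="{migration_id}"}}'
    url = f'{prometheus_url}/api/v1/query?' + urlencode({'query': query})
    with urllib.request.urlopen(url) as resp:
        body = json.load(resp)
    return lag_within_bound(body, max_lag)

# lag_within_bound on a canned Prometheus response:
result = {'data': {'result': [{'value': [1700000000, "120"]}]}}
print(lag_within_bound(result, 300))  # True
```

The CI step would exit non-zero when `check(...)` returns False, failing the workflow.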
## Output Format
1. **Observable MongoDB Migrations**: Atlas framework with metrics and validation
2. **CDC Pipeline with Monitoring**: Debezium integration with Kafka
3. **Enterprise Metrics Collection**: Prometheus instrumentation
4. **Anomaly Detection**: Statistical analysis
5. **Multi-channel Alerting**: Email, Slack, PagerDuty integrations
6. **Grafana Dashboard Automation**: Programmatic dashboard creation
7. **Replication Lag Tracking**: Source-to-target lag monitoring
8. **Health Check Systems**: Continuous pipeline monitoring
Focus on real-time visibility, proactive alerting, and comprehensive observability for zero-downtime migrations.
## Cross-Plugin Integration
This plugin integrates with:
- **sql-migrations**: Provides observability for SQL migrations
- **nosql-migrations**: Monitors NoSQL transformations
- **migration-integration**: Coordinates monitoring across workflows
Related Skills
framework-migration-deps-upgrade
You are a dependency management expert specializing in safe, incremental upgrades of project dependencies. Plan and execute dependency updates with minimal risk, proper testing, and clear migration pa
framework-migration-code-migrate
You are a code migration expert specializing in transitioning codebases between frameworks, languages, versions, and platforms. Generate comprehensive migration plans, automated migration scripts, and
food-database-query
Food Database Query
database
Database development and operations workflow covering SQL, NoSQL, database design, migrations, optimization, and data engineering.
database-optimizer
Expert database optimizer specializing in modern performance tuning, query optimization, and scalable architectures.
database-migrations-sql-migrations
SQL database migrations with zero-downtime strategies for PostgreSQL, MySQL, and SQL Server. Focus on data integrity and rollback plans.
database-migration
Master database schema and data migrations across ORMs (Sequelize, TypeORM, Prisma), including rollback strategies and zero-downtime deployments.
database-design
Database design principles and decision-making. Schema design, indexing strategy, ORM selection, serverless databases.
database-cloud-optimization-cost-optimize
You are a cloud cost optimization expert specializing in reducing infrastructure expenses while maintaining performance and reliability. Analyze cloud spending, identify savings opportunities, and implement cost-effective architectures across AWS, Azure, and GCP.
database-architect
Expert database architect specializing in data layer design from scratch, technology selection, schema modeling, and scalable database architectures.
nft-standards
Master ERC-721 and ERC-1155 NFT standards, metadata best practices, and advanced NFT features.
nextjs-app-router-patterns
Comprehensive patterns for Next.js 14+ App Router architecture, Server Components, and modern full-stack React development.