database-migrations-migration-observability

Migration monitoring, CDC, and observability infrastructure

31,392 stars
Complexity: easy

About this skill

This skill transforms an AI agent into a specialized database observability expert. It empowers the agent to understand, discuss, and design advanced monitoring strategies for database migrations. Focusing on critical components like Change Data Capture (CDC) pipelines (e.g., using Debezium and Kafka), real-time migration monitoring, and enterprise-grade observability infrastructure (e.g., Prometheus and Grafana), the agent can provide guidance on creating comprehensive solutions, including anomaly detection and automated alerting systems. It's designed to help users establish robust monitoring for complex database environment changes, offering best practices, checklists, and architectural recommendations.

Best use case

Designing or planning database migration strategies that require robust real-time monitoring and observability; Seeking expert guidance on implementing Change Data Capture (CDC) for data replication and integrity during migrations; Developing anomaly detection and automated alerting systems for database performance and health during migration processes; Generating best practices, checklists, or comprehensive documentation for database migration observability.

Migration monitoring, CDC, and observability infrastructure

The user will receive expert-level guidance, detailed design proposals, architectural recommendations, best practices, or comprehensive checklists related to database migration observability. This includes recommendations for CDC implementation, real-time monitoring setups, anomaly detection strategies, and automated alerting systems, all tailored to their specific migration challenges and technology stack.

Practical example

Example input

Design a real-time monitoring solution for migrating a PostgreSQL database to a new cloud environment, focusing on CDC with Debezium and Kafka. What metrics should I prioritize, and how can I set up effective anomaly detection and alerting in Grafana?

Example output

As a database observability expert, here's a comprehensive plan for monitoring your PostgreSQL migration using Debezium, Kafka, Prometheus, and Grafana:

**1. CDC Pipeline Design (Debezium & Kafka):**
*   **Source Connector:** Configure Debezium PostgreSQL connector for logical replication. Ensure `wal_level` is `logical` and a replication slot is created.
*   **Kafka Topics:** Define dedicated Kafka topics for each replicated table or a single topic for all change events.
*   **Consumers:** Plan for consumers (e.g., Kafka Connect, custom apps) to process CDC events for validation, transformation, or replication to the target.

**2. Key Metrics to Prioritize (Prometheus):**
*   **PostgreSQL Source Database:**
    *   `pg_stat_activity`: Active connections, queries, idle in transaction.
    *   `pg_stat_replication`: Replication lag, WAL sender/receiver status.
    *   Resource utilization: CPU, memory, disk I/O, network.
    *   Error logs: Monitor for critical errors, deadlocks.
*   **Debezium Connector:**
    *   Number of events processed, latency from source to Kafka.
    *   Offset commit lag, connector task status (running, failed).
    *   Tombstone events, schema changes detected.
*   **Kafka Cluster:**
    *   Producer/consumer lag, broker health, topic message rates.
    *   Disk usage, network throughput.
*   **Target Cloud Database:** (e.g., metrics specific to AWS RDS, Azure DB)
    *   Replication lag from source, data consistency checks.
    *   Resource utilization, query performance.
    *   Error rates, transaction throughput.

**3. Anomaly Detection Strategies:**
*   **Baseline Deviation:** Establish baselines for key metrics during a stable period. Alert on significant deviations (e.g., `cpu_usage > 2 * baseline_std_dev`).
*   **Rate of Change:** Monitor for sudden spikes or drops in event rates, transaction rates, or error counts.
*   **Replication Lag:** Detect sustained or rapidly increasing replication lag between PostgreSQL and Kafka, and between Kafka and the target.
*   **Resource Saturation:** Identify consistent high CPU, memory, or I/O utilization that could indicate bottlenecks.

**4. Automated Alerting (Grafana & Alertmanager):**
*   **Critical Alerts:**
    *   Debezium connector failure.
    *   Kafka broker down.
    *   PostgreSQL replication slot issues (e.g., `active = false`).
    *   Target database unreachable or severe errors.
    *   Extreme replication lag (e.g., > 5 minutes).
*   **Warning Alerts:**
    *   Increased replication lag (e.g., > 30 seconds).
    *   High resource utilization (e.g., CPU > 80%).
    *   Spikes in transaction errors.
    *   Unusual patterns in CDC event throughput.
*   **Alert Channels:** Integrate with PagerDuty, Slack, Email, etc., with escalation policies.
*   **Dashboards:** Create dedicated Grafana dashboards for a holistic view of the migration, showing source, CDC pipeline, and target health side-by-side.

Remember to perform thorough testing in a staging environment and establish clear rollback procedures.

When to use this skill

  • When an organization is undergoing a complex database migration and needs to ensure data integrity, minimal downtime, and continuous monitoring; When building or optimizing a Change Data Capture (CDC) pipeline using technologies like Debezium and Kafka; When setting up a new observability stack (e.g., Prometheus, Grafana) specifically for database migrations; When requiring an AI agent to act as a subject matter expert to provide detailed advice or generate technical documentation on database observability.

When not to use this skill

  • When directly executing database migration commands or making live changes to a production database; When the primary need is for a general-purpose database administration task unrelated to migration or advanced observability; When expecting the AI agent to interact with live database systems or deploy monitoring infrastructure without human oversight; When seeking a skill for basic database querying or data manipulation.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/database-migrations-migration-observability/SKILL.md --create-dirs "https://raw.githubusercontent.com/sickn33/antigravity-awesome-skills/main/plugins/antigravity-awesome-skills-claude/skills/database-migrations-migration-observability/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/database-migrations-migration-observability/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How database-migrations-migration-observability Compares

Feature / Agentdatabase-migrations-migration-observabilityStandard Approach
Platform SupportClaudeLimited / Varies
Context Awareness High Baseline
Installation ComplexityeasyN/A

Frequently Asked Questions

What does this skill do?

Migration monitoring, CDC, and observability infrastructure

Which AI agents support this skill?

This skill is designed for Claude.

How difficult is it to install?

The installation complexity is rated as easy. You can find the installation instructions above.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

Related Guides

SKILL.md Source

# Migration Observability and Real-time Monitoring

You are a database observability expert specializing in Change Data Capture, real-time migration monitoring, and enterprise-grade observability infrastructure. Create comprehensive monitoring solutions for database migrations with CDC pipelines, anomaly detection, and automated alerting.

## Use this skill when

- Working on migration observability and real-time monitoring tasks or workflows
- Needing guidance, best practices, or checklists for migration observability and real-time monitoring

## Do not use this skill when

- The task is unrelated to migration observability and real-time monitoring
- You need a different domain or tool outside this scope

## Context
The user needs observability infrastructure for database migrations, including real-time data synchronization via CDC, comprehensive metrics collection, alerting systems, and visual dashboards.

## Requirements
$ARGUMENTS

## Instructions

### 1. Observable MongoDB Migrations

```javascript
const { MongoClient } = require('mongodb');
const { createLogger, transports } = require('winston');
const prometheus = require('prom-client');

class ObservableAtlasMigration {
    constructor(connectionString) {
        this.client = new MongoClient(connectionString);
        this.logger = createLogger({
            transports: [
                new transports.File({ filename: 'migrations.log' }),
                new transports.Console()
            ]
        });
        this.metrics = this.setupMetrics();
    }

    setupMetrics() {
        const register = new prometheus.Registry();

        return {
            migrationDuration: new prometheus.Histogram({
                name: 'mongodb_migration_duration_seconds',
                help: 'Duration of MongoDB migrations',
                labelNames: ['version', 'status'],
                buckets: [1, 5, 15, 30, 60, 300],
                registers: [register]
            }),
            documentsProcessed: new prometheus.Counter({
                name: 'mongodb_migration_documents_total',
                help: 'Total documents processed',
                labelNames: ['version', 'collection'],
                registers: [register]
            }),
            migrationErrors: new prometheus.Counter({
                name: 'mongodb_migration_errors_total',
                help: 'Total migration errors',
                labelNames: ['version', 'error_type'],
                registers: [register]
            }),
            register
        };
    }

    async migrate() {
        await this.client.connect();
        const db = this.client.db();

        for (const [version, migration] of this.migrations) {
            await this.executeMigrationWithObservability(db, version, migration);
        }
    }

    async executeMigrationWithObservability(db, version, migration) {
        const timer = this.metrics.migrationDuration.startTimer({ version });
        const session = this.client.startSession();

        try {
            this.logger.info(`Starting migration ${version}`);

            await session.withTransaction(async () => {
                await migration.up(db, session, (collection, count) => {
                    this.metrics.documentsProcessed.inc({
                        version,
                        collection
                    }, count);
                });
            });

            timer({ status: 'success' });
            this.logger.info(`Migration ${version} completed`);

        } catch (error) {
            this.metrics.migrationErrors.inc({
                version,
                error_type: error.name
            });
            timer({ status: 'failed' });
            throw error;
        } finally {
            await session.endSession();
        }
    }
}
```

### 2. Change Data Capture with Debezium

```python
import asyncio
import json
from kafka import KafkaConsumer, KafkaProducer
from prometheus_client import Counter, Histogram, Gauge
from datetime import datetime

class CDCObservabilityManager:
    def __init__(self, config):
        self.config = config
        self.metrics = self.setup_metrics()

    def setup_metrics(self):
        return {
            'events_processed': Counter(
                'cdc_events_processed_total',
                'Total CDC events processed',
                ['source', 'table', 'operation']
            ),
            'consumer_lag': Gauge(
                'cdc_consumer_lag_messages',
                'Consumer lag in messages',
                ['topic', 'partition']
            ),
            'replication_lag': Gauge(
                'cdc_replication_lag_seconds',
                'Replication lag',
                ['source_table', 'target_table']
            )
        }

    async def setup_cdc_pipeline(self):
        self.consumer = KafkaConsumer(
            'database.changes',
            bootstrap_servers=self.config['kafka_brokers'],
            group_id='migration-consumer',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        self.producer = KafkaProducer(
            bootstrap_servers=self.config['kafka_brokers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    async def process_cdc_events(self):
        for message in self.consumer:
            event = self.parse_cdc_event(message.value)

            self.metrics['events_processed'].labels(
                source=event.source_db,
                table=event.table,
                operation=event.operation
            ).inc()

            await self.apply_to_target(
                event.table,
                event.operation,
                event.data,
                event.timestamp
            )

    async def setup_debezium_connector(self, source_config):
        connector_config = {
            "name": f"migration-connector-{source_config['name']}",
            "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "database.hostname": source_config['host'],
                "database.port": source_config['port'],
                "database.dbname": source_config['database'],
                "plugin.name": "pgoutput",
                "heartbeat.interval.ms": "10000"
            }
        }

        response = requests.post(
            f"{self.config['kafka_connect_url']}/connectors",
            json=connector_config
        )
```

### 3. Enterprise Monitoring and Alerting

```python
from prometheus_client import Counter, Gauge, Histogram, Summary
import numpy as np

class EnterpriseMigrationMonitor:
    def __init__(self, config):
        self.config = config
        self.registry = prometheus.CollectorRegistry()
        self.metrics = self.setup_metrics()
        self.alerting = AlertingSystem(config.get('alerts', {}))

    def setup_metrics(self):
        return {
            'migration_duration': Histogram(
                'migration_duration_seconds',
                'Migration duration',
                ['migration_id'],
                buckets=[60, 300, 600, 1800, 3600],
                registry=self.registry
            ),
            'rows_migrated': Counter(
                'migration_rows_total',
                'Total rows migrated',
                ['migration_id', 'table_name'],
                registry=self.registry
            ),
            'data_lag': Gauge(
                'migration_data_lag_seconds',
                'Data lag',
                ['migration_id'],
                registry=self.registry
            )
        }

    async def track_migration_progress(self, migration_id):
        while migration.status == 'running':
            stats = await self.calculate_progress_stats(migration)

            self.metrics['rows_migrated'].labels(
                migration_id=migration_id,
                table_name=migration.table
            ).inc(stats.rows_processed)

            anomalies = await self.detect_anomalies(migration_id, stats)
            if anomalies:
                await self.handle_anomalies(migration_id, anomalies)

            await asyncio.sleep(30)

    async def detect_anomalies(self, migration_id, stats):
        anomalies = []

        if stats.rows_per_second < stats.expected_rows_per_second * 0.5:
            anomalies.append({
                'type': 'low_throughput',
                'severity': 'warning',
                'message': f'Throughput below expected'
            })

        if stats.error_rate > 0.01:
            anomalies.append({
                'type': 'high_error_rate',
                'severity': 'critical',
                'message': f'Error rate exceeds threshold'
            })

        return anomalies

    async def setup_migration_dashboard(self):
        dashboard_config = {
            "dashboard": {
                "title": "Database Migration Monitoring",
                "panels": [
                    {
                        "title": "Migration Progress",
                        "targets": [{
                            "expr": "rate(migration_rows_total[5m])"
                        }]
                    },
                    {
                        "title": "Data Lag",
                        "targets": [{
                            "expr": "migration_data_lag_seconds"
                        }]
                    }
                ]
            }
        }

        response = requests.post(
            f"{self.config['grafana_url']}/api/dashboards/db",
            json=dashboard_config,
            headers={'Authorization': f"Bearer {self.config['grafana_token']}"}
        )

class AlertingSystem:
    def __init__(self, config):
        self.config = config

    async def send_alert(self, title, message, severity, **kwargs):
        if 'slack' in self.config:
            await self.send_slack_alert(title, message, severity)

        if 'email' in self.config:
            await self.send_email_alert(title, message, severity)

    async def send_slack_alert(self, title, message, severity):
        color = {
            'critical': 'danger',
            'warning': 'warning',
            'info': 'good'
        }.get(severity, 'warning')

        payload = {
            'text': title,
            'attachments': [{
                'color': color,
                'text': message
            }]
        }

        requests.post(self.config['slack']['webhook_url'], json=payload)
```

### 4. Grafana Dashboard Configuration

```python
dashboard_panels = [
    {
        "id": 1,
        "title": "Migration Progress",
        "type": "graph",
        "targets": [{
            "expr": "rate(migration_rows_total[5m])",
            "legendFormat": "{{migration_id}} - {{table_name}}"
        }]
    },
    {
        "id": 2,
        "title": "Data Lag",
        "type": "stat",
        "targets": [{
            "expr": "migration_data_lag_seconds"
        }],
        "fieldConfig": {
            "thresholds": {
                "steps": [
                    {"value": 0, "color": "green"},
                    {"value": 60, "color": "yellow"},
                    {"value": 300, "color": "red"}
                ]
            }
        }
    },
    {
        "id": 3,
        "title": "Error Rate",
        "type": "graph",
        "targets": [{
            "expr": "rate(migration_errors_total[5m])"
        }]
    }
]
```

### 5. CI/CD Integration

```yaml
name: Migration Monitoring

on:
  push:
    branches: [main]

jobs:
  monitor-migration:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Start Monitoring
        run: |
          python migration_monitor.py start \
            --migration-id ${{ github.sha }} \
            --prometheus-url ${{ secrets.PROMETHEUS_URL }}

      - name: Run Migration
        run: |
          python migrate.py --environment production

      - name: Check Migration Health
        run: |
          python migration_monitor.py check \
            --migration-id ${{ github.sha }} \
            --max-lag 300
```

## Output Format

1. **Observable MongoDB Migrations**: Atlas framework with metrics and validation
2. **CDC Pipeline with Monitoring**: Debezium integration with Kafka
3. **Enterprise Metrics Collection**: Prometheus instrumentation
4. **Anomaly Detection**: Statistical analysis
5. **Multi-channel Alerting**: Email, Slack, PagerDuty integrations
6. **Grafana Dashboard Automation**: Programmatic dashboard creation
7. **Replication Lag Tracking**: Source-to-target lag monitoring
8. **Health Check Systems**: Continuous pipeline monitoring

Focus on real-time visibility, proactive alerting, and comprehensive observability for zero-downtime migrations.

## Cross-Plugin Integration

This plugin integrates with:
- **sql-migrations**: Provides observability for SQL migrations
- **nosql-migrations**: Monitors NoSQL transformations
- **migration-integration**: Coordinates monitoring across workflows

Related Skills

framework-migration-deps-upgrade

31392
from sickn33/antigravity-awesome-skills

You are a dependency management expert specializing in safe, incremental upgrades of project dependencies. Plan and execute dependency updates with minimal risk, proper testing, and clear migration pa

Software DevelopmentClaude

framework-migration-code-migrate

31392
from sickn33/antigravity-awesome-skills

You are a code migration expert specializing in transitioning codebases between frameworks, languages, versions, and platforms. Generate comprehensive migration plans, automated migration scripts, and

Code MigrationClaude

food-database-query

31392
from sickn33/antigravity-awesome-skills

Food Database Query

NutritionClaude

database

31392
from sickn33/antigravity-awesome-skills

Database development and operations workflow covering SQL, NoSQL, database design, migrations, optimization, and data engineering.

Workflow & Automation BundlesClaude

database-optimizer

31392
from sickn33/antigravity-awesome-skills

Expert database optimizer specializing in modern performance tuning, query optimization, and scalable architectures.

Database ManagementClaude

database-migrations-sql-migrations

31392
from sickn33/antigravity-awesome-skills

SQL database migrations with zero-downtime strategies for PostgreSQL, MySQL, and SQL Server. Focus on data integrity and rollback plans.

Database ManagementClaude

database-migration

31392
from sickn33/antigravity-awesome-skills

Master database schema and data migrations across ORMs (Sequelize, TypeORM, Prisma), including rollback strategies and zero-downtime deployments.

Database ManagementClaude

database-design

31392
from sickn33/antigravity-awesome-skills

Database design principles and decision-making. Schema design, indexing strategy, ORM selection, serverless databases.

Software DevelopmentClaude

database-cloud-optimization-cost-optimize

31392
from sickn33/antigravity-awesome-skills

You are a cloud cost optimization expert specializing in reducing infrastructure expenses while maintaining performance and reliability. Analyze cloud spending, identify savings opportunities, and implement cost-effective architectures across AWS, Azure, and GCP.

Cloud Cost OptimizationClaude

database-architect

31392
from sickn33/antigravity-awesome-skills

Expert database architect specializing in data layer design from scratch, technology selection, schema modeling, and scalable database architectures.

Database Design & ArchitectureClaude

nft-standards

31392
from sickn33/antigravity-awesome-skills

Master ERC-721 and ERC-1155 NFT standards, metadata best practices, and advanced NFT features.

Web3 & BlockchainClaude

nextjs-app-router-patterns

31392
from sickn33/antigravity-awesome-skills

Comprehensive patterns for Next.js 14+ App Router architecture, Server Components, and modern full-stack React development.

Web FrameworksClaude