data-pipeline-monitor
Track ETL and data pipeline jobs with success/failure status, duration tracking, heartbeat monitoring, and dependency visualization. Use when you need to monitor scheduled jobs, detect failures, track pipeline health over time, or visualize ETL step dependencies. Triggers include "pipeline monitoring", "job tracking", "ETL status", "cron job health", "heartbeat monitor", "pipeline failed", or any task involving monitoring data workflows.
Best use case
data-pipeline-monitor is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Teams using data-pipeline-monitor should expect more consistent output, faster repeated execution, and less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in `.claude/skills/data-pipeline-monitor/SKILL.md` inside your project
- Restart your AI agent; it will auto-discover the skill
How data-pipeline-monitor Compares
| Feature / Agent | data-pipeline-monitor | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# data-pipeline-monitor
Track ETL and data pipeline jobs. Self-hosted. REST API.
## When to use
- Monitoring scheduled ETL jobs and cron tasks
- Detecting silent failures in data pipelines
- Tracking run duration history and detecting regressions
- Visualizing job dependency graphs (DAG)
- Getting alerts when pipelines fail or miss their schedule
- Any workflow where jobs need to report start, progress, and completion
## Prerequisites
1. A running data-pipeline-monitor server
2. An API key (created in Settings or via API)
## Installation
```bash
# Run with Docker
docker compose up -d
# Or run locally
pnpm install && pnpm build && pnpm start
```
## Quick Start
```bash
# 1. Create an API key (authenticate with the bootstrap key)
curl -s -X POST http://localhost:3000/api/keys \
  -H 'X-API-Key: your-bootstrap-key' \
  -H 'Content-Type: application/json' \
  -d '{"label":"my-server"}'
# -> {"key":"dpm_abc123...","label":"my-server"} (shown once)
# 2. Register a pipeline with two jobs; "load" depends on "extract"
curl -s -X POST http://localhost:3000/api/pipelines \
  -H 'X-API-Key: dpm_abc123...' \
  -H 'Content-Type: application/json' \
  -d '{"id":"daily-etl","schedule":"0 2 * * *","jobs":[{"id":"extract"},{"id":"load","depends_on":["extract"]}]}'
# 3. Report a run: start the job and keep the returned run_id
RUN_ID=$(curl -s -X POST http://localhost:3000/api/jobs/extract/start \
  -H 'X-API-Key: dpm_abc123...' | jq -r .run_id)
# ... do work ...
curl -s -X POST http://localhost:3000/api/jobs/extract/complete \
  -H 'X-API-Key: dpm_abc123...' \
  -H 'Content-Type: application/json' \
  -d "{\"run_id\":\"$RUN_ID\"}"
```
## Core API
### Register a pipeline
```
POST /api/pipelines
Body: { id, name?, schedule?, expected_duration_s?, heartbeat_timeout_s?, jobs[], tags? }
```
### Job lifecycle
```
POST /api/jobs/:id/start -> { run_id, started_at }
POST /api/jobs/:id/heartbeat -> body: { run_id }
POST /api/jobs/:id/complete -> body: { run_id }
POST /api/jobs/:id/fail -> body: { run_id, error? }
```
### Query data
```
GET /api/pipelines -> list all pipelines
GET /api/pipelines/:id -> single pipeline + DAG + latest run
GET /api/history -> paginated run history
GET /api/metrics -> success rates, duration percentiles, run counts
GET /api/health -> health check (no auth required)
```
## Heartbeat Pattern
Jobs must send periodic heartbeats. A run that goes longer than `heartbeat_timeout_s` seconds without one is marked `failed` automatically.
```bash
# Recommended: heartbeat every heartbeat_timeout_s/3 seconds
# (for example, sleep 60 suits a 180-second timeout)
(while true; do
  curl -s -X POST http://localhost:3000/api/jobs/$JOB_ID/heartbeat \
    -H "X-API-Key: $API_KEY" -d "{\"run_id\":\"$RUN_ID\"}"
  sleep 60
done) & HB_PID=$!
# Do your work and save its exit status before any other command overwrites $?
./run-job.sh
STATUS=$?
# Clean up the background heartbeat loop
kill $HB_PID 2>/dev/null
# Report result ("..." stands for the base job URL and headers shown above)
if [ $STATUS -eq 0 ]; then
  curl -s -X POST .../complete -d "{\"run_id\":\"$RUN_ID\"}"
else
  curl -s -X POST .../fail -d "{\"run_id\":\"$RUN_ID\",\"error\":\"Exit $STATUS\"}"
fi
```
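The one-third guideline can be computed rather than hard-coded. The following is a minimal sketch, not part of the tool: `hb_interval` is a hypothetical helper name, and the 1-second floor is an assumption added so that very small timeouts never produce a zero-second sleep.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not provided by data-pipeline-monitor):
# derive a heartbeat interval from heartbeat_timeout_s.
# Uses one third of the timeout (integer division), floored at 1 second.
hb_interval() {
  local timeout_s=$1
  local interval=$(( timeout_s / 3 ))
  if [ "$interval" -lt 1 ]; then
    interval=1
  fi
  echo "$interval"
}

hb_interval 180   # prints 60
hb_interval 2     # prints 1
```

In a heartbeat loop, the result could replace the fixed value, for example `sleep "$(hb_interval 180)"`.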
## Job Dependency (DAG)
Jobs in a pipeline can declare dependencies. The monitor validates that no cycles exist.
```json
{
"id": "analytics-rollup",
"jobs": [
{ "id": "fetch-events" },
{ "id": "fetch-users" },
{ "id": "aggregate", "depends_on": ["fetch-events", "fetch-users"] },
{ "id": "write-stats", "depends_on": ["aggregate"] }
]
}
```
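Cycle validation happens on the server, but a definition can also be sanity-checked locally before it is submitted. The sketch below assumes GNU coreutils `tsort`, which exits non-zero when its input contains a loop. `check_dag` is a hypothetical helper, and the edge list is written by hand as `dependency job` pairs taken from the example above; it is not how the monitor itself performs validation.

```bash
#!/usr/bin/env bash
# Hypothetical local pre-check (assumes GNU coreutils tsort).
# Reads "dependency job" pairs on stdin, prints one valid run order,
# and returns non-zero if the pairs contain a cycle.
check_dag() {
  tsort 2>/dev/null
}

# Edges for the analytics-rollup pipeline: each line is "dependency job".
check_dag <<'EOF'
fetch-events aggregate
fetch-users aggregate
aggregate write-stats
EOF
```

The exact order of the two `fetch-*` jobs in the output is up to `tsort`; what matters is that `aggregate` follows both and `write-stats` comes last.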
## Alert Rules
```
POST /api/alerts
Body: {
condition: "on_failure" | "missed_run" | "duration_exceeded",
pipeline_id?: string, (null = all pipelines)
threshold_s?: number, (required for duration_exceeded)
channel: "email" | "webhook",
target: string (email address or webhook URL)
}
```
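As an illustration of the rule body above, the following sketch builds the JSON for an alert and enforces the one constraint the schema states: `threshold_s` is required for `duration_exceeded`. `alert_body` is a hypothetical helper and the addresses are placeholders. It performs no JSON escaping, so it assumes the values contain no quotes or backslashes.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not provided by data-pipeline-monitor):
# build the JSON body for POST /api/alerts.
# Usage: alert_body CONDITION CHANNEL TARGET [THRESHOLD_S]
# No JSON escaping is done; values must not contain quotes or backslashes.
alert_body() {
  local condition=$1
  local channel=$2
  local target=$3
  local threshold_s=${4:-}
  if [ "$condition" = "duration_exceeded" ] && [ -z "$threshold_s" ]; then
    echo "threshold_s is required for duration_exceeded" >&2
    return 1
  fi
  if [ -n "$threshold_s" ]; then
    printf '{"condition":"%s","threshold_s":%s,"channel":"%s","target":"%s"}\n' \
      "$condition" "$threshold_s" "$channel" "$target"
  else
    printf '{"condition":"%s","channel":"%s","target":"%s"}\n' \
      "$condition" "$channel" "$target"
  fi
}

# Placeholder target; omitting pipeline_id applies the rule to all pipelines.
alert_body on_failure email ops@example.com
```

The output can be passed to `curl` with `-d`, alongside the `X-API-Key` and `Content-Type: application/json` headers.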
## CLI Reference
| Command | Description |
|---|---|
| `dpm pipelines` | List all pipelines with status |
| `dpm pipeline <id>` | Show pipeline detail |
| `dpm history [pipeline-id]` | Show run history |
| `dpm keys` | List API keys |
| `dpm keys create --label <label>` | Create API key |
| `dpm keys revoke <id>` | Revoke API key |
| `dpm --help` | Show help |
| `dpm --version` | Show version |
## Environment Variables
| Variable | Description | Default |
|---|---|---|
| `DPM_PORT` | HTTP server port | 3000 |
| `DPM_HOST` | Bind address | 127.0.0.1 |
| `DPM_DATA_DIR` | Data directory | ~/.dpm |
| `DPM_API_KEY` | Bootstrap key (created on first run) | - |
| `DPM_ALERT_EMAIL_FROM` | From address for email alerts | - |
| `DPM_SMTP_HOST` | SMTP server host | - |
| `DPM_SMTP_PORT` | SMTP server port | 587 |
| `DPM_SMTP_USER` | SMTP username | - |
| `DPM_SMTP_PASS` | SMTP password | - |
| `DPM_LOG_LEVEL` | debug / info / warn / error | info |
| `DPM_RETENTION_DAYS` | Run history retention | 90 |
| `DPM_DEV` | Dev mode: verbose logging, no auth (0/1) | 0 |
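Client scripts can reuse the same variables to locate the server. This is a minimal sketch, assuming the script runs on the same machine as the server and that the bind address is directly reachable; `dpm_base_url` is a hypothetical helper, and the fallbacks are the defaults from the table above.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not provided by data-pipeline-monitor):
# build the server base URL from DPM_HOST and DPM_PORT,
# falling back to the documented defaults when they are unset or empty.
dpm_base_url() {
  echo "http://${DPM_HOST:-127.0.0.1}:${DPM_PORT:-3000}"
}

# Show the health check URL that a client script would call.
echo "$(dpm_base_url)/api/health"
```

If the server binds to a wildcard address such as `0.0.0.0`, the client should use a concrete hostname instead of reusing `DPM_HOST`.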
## Job Status Values
| Status | Meaning |
|---|---|
| `running` | Job started, heartbeats being received |
| `success` | Job completed normally |
| `failed` | Job failed (explicit fail call or heartbeat timeout) |
| `pending` | Job registered but not yet started in this run group |
| `missed` | Scheduled run did not start within the missed_run_window |
| `skipped` | Job skipped because a dependency failed |
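A script that gates downstream work on a job's status might translate these values into exit codes. The sketch below is one possible convention, not something the tool defines: `status_exit_code` and the numeric codes are assumptions chosen for illustration.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not provided by data-pipeline-monitor):
# map a job status string to an exit code for use in wrapper scripts.
#   0 = finished successfully
#   1 = did not succeed (failed, missed, skipped)
#   2 = not finished yet (running, pending)
#   3 = unrecognized status
status_exit_code() {
  case "$1" in
    success) return 0 ;;
    failed|missed|skipped) return 1 ;;
    running|pending) return 2 ;;
    *) return 3 ;;
  esac
}

status_exit_code success && echo "ok to proceed"
```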
## Troubleshooting
### "heartbeat timeout"
The job did not send a heartbeat within `heartbeat_timeout_s` seconds. Start heartbeats immediately after the start call and ensure the background loop does not get killed before the job finishes.
### "depends_on contains unknown job ID"
The `depends_on` array references a job ID that does not exist in the same pipeline. Check spelling and ensure all referenced jobs are in the `jobs` array.
### Alert not firing
Check that the alert rule is enabled in Settings. Check the alert log at `GET /api/alerts/log`. Verify SMTP settings if using email channel.
### "SQLITE_LOCKED" errors
The server is configured without WAL mode. This should not happen in a normal install. Check that `DPM_DATA_DIR` is on a local disk, not NFS.
Related Skills
food-database
No description provided.
serial-monitor
No description provided.
ssl-cert-monitor
Operate ssl-cert-monitor -- add hosts, configure alert rules, trigger checks, review history, and deploy the stack.
backup-monitor
Track backup jobs via heartbeat pings, alert on missed or failed backups. Use when you need to monitor scheduled backup scripts, get alerted when a backup misses its window, or track backup execution history. Triggers include "backup monitoring", "backup alerts", "missed backup", "backup heartbeat", "backup job tracking", or any task involving backup reliability verification.
cron-monitor
Send heartbeat pings to cron-monitor after cron job completion, check job status, and register new jobs. Use when you need to confirm a scheduled task ran successfully, check if a cron job is healthy, or add monitoring to a new cron script. Triggers include "ping cron-monitor", "check job status", "register cron job", "heartbeat", "cron health check", or any task involving scheduled job monitoring.
database-size-monitor
Dashboard for monitoring PostgreSQL and MySQL table sizes over time, with growth tracking, threshold alerts, and snapshot comparison
sqlite-data
Query and inspect SQLite databases used by data tools. Use when you need to directly inspect stored pipeline runs, metrics, or configuration data stored in a SQLite database file. Triggers include "query the database", "inspect SQLite", "check raw data", "what is in the db", or any task requiring direct database access.
data-visualization
Chart types, data aggregation patterns, and recharts usage for the csv-explorer chart builder
process-monitor
Monitor system processes for resource usage using process-tree watch mode. Use when tracking CPU or memory usage over time, finding resource hogs, or watching a specific process. Triggers include "monitor processes", "watch cpu usage", "process monitor", "top processes", "resource usage", "ptree watch".
finetune-data-curator
Web app for creating, editing, and validating JSONL fine-tuning datasets. Checks format compliance for OpenAI, Anthropic, and Llama formats, detects duplicates, scores quality, and exports clean datasets.