data-lineage-mapper

Extracts and maps data lineage from various sources including SQL, dbt, Airflow, and Spark, generating comprehensive lineage graphs for impact analysis.

509 stars

Best use case

data-lineage-mapper is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using data-lineage-mapper should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/data-lineage-mapper/SKILL.md --create-dirs "https://raw.githubusercontent.com/a5c-ai/babysitter/main/library/specializations/data-engineering-analytics/skills/data-lineage-mapper/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/data-lineage-mapper/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How data-lineage-mapper Compares

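| Feature / Agent | data-lineage-mapper | Standard Approach |
|-----------------|---------------------|-------------------|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |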

Frequently Asked Questions

What does this skill do?

Extracts and maps data lineage from various sources including SQL, dbt, Airflow, and Spark, generating comprehensive lineage graphs for impact analysis.

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Data Lineage Mapper

Extracts and maps data lineage from various sources to provide comprehensive data flow visibility.

## Overview

This skill parses and extracts data lineage information from SQL queries, dbt projects, Airflow DAGs, and Spark jobs. It generates comprehensive lineage graphs showing data flow from source to destination, enabling impact analysis and data governance.

## Capabilities

- **SQL parsing for lineage extraction** - Parse SELECT, INSERT, CREATE TABLE AS, MERGE, and UPDATE...FROM statements
- **dbt lineage integration** - Extract lineage from manifest.json
- **Airflow task lineage mapping** - Map data flows across DAG tasks
- **Spark job lineage extraction** - Parse Spark SQL and DataFrame operations
- **Cross-system lineage connection** - Connect lineage across different tools
- **Column-level lineage tracing** - Track individual column transformations
- **Impact analysis** - Downstream/upstream impact assessment
- **Lineage graph generation** - Visual and machine-readable lineage
- **Integration with data catalogs** - Export to DataHub, Amundsen, Alation, or OpenLineage

## Input Schema

```json
{
  "sources": {
    "type": "array",
    "required": true,
    "items": {
      "type": {
        "type": "string",
        "enum": ["sql", "dbt", "airflow", "spark", "file"]
      },
      "content": {
        "type": "string|object",
        "description": "SQL string, file path, or manifest object"
      },
      "metadata": {
        "type": "object",
        "properties": {
          "database": "string",
          "schema": "string",
          "catalog": "string"
        }
      }
    }
  },
  "existingLineage": {
    "type": "object",
    "description": "Existing lineage graph to merge with"
  },
  "targetCatalog": {
    "type": "string",
    "enum": ["datahub", "amundsen", "alation", "openlineage", "json"],
    "default": "json",
    "description": "Target format for lineage export"
  },
  "options": {
    "type": "object",
    "properties": {
      "columnLevel": {
        "type": "boolean",
        "default": true,
        "description": "Extract column-level lineage"
      },
      "resolveViews": {
        "type": "boolean",
        "default": false,
        "description": "Resolve views to underlying tables"
      },
      "includeTemporary": {
        "type": "boolean",
        "default": false,
        "description": "Include temporary/CTE tables in lineage"
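      },
      "impactAnalysisTarget": {
        "type": "string",
        "description": "Table to assess for upstream/downstream impact, e.g. \"raw.customers\""
      }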
    }
  }
}
```
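
The schema above can be checked before any parsing starts. The following is a minimal sketch, not the skill's actual validator: the function name `normalize_request` is hypothetical, and treating `content` as mandatory is an assumption (the schema only marks `sources` as required).

```python
from typing import Any

# Enum values and defaults copied from the Input Schema above.
SOURCE_TYPES = {"sql", "dbt", "airflow", "spark", "file"}
TARGET_CATALOGS = {"datahub", "amundsen", "alation", "openlineage", "json"}
OPTION_DEFAULTS = {"columnLevel": True, "resolveViews": False, "includeTemporary": False}


def normalize_request(request: dict[str, Any]) -> dict[str, Any]:
    """Validate a request and fill in schema defaults (hypothetical helper)."""
    sources = request.get("sources")
    if not isinstance(sources, list):
        raise ValueError("'sources' is required and must be an array")
    for i, source in enumerate(sources):
        if not isinstance(source, dict) or source.get("type") not in SOURCE_TYPES:
            raise ValueError(f"sources[{i}].type must be one of {sorted(SOURCE_TYPES)}")
        # Assumption: every source carries content (SQL string, path, or manifest object).
        if not isinstance(source.get("content"), (str, dict)):
            raise ValueError(f"sources[{i}].content must be a string or object")
    target = request.get("targetCatalog", "json")
    if target not in TARGET_CATALOGS:
        raise ValueError(f"targetCatalog must be one of {sorted(TARGET_CATALOGS)}")
    # Caller-supplied options override the defaults; unknown keys pass through.
    options = {**OPTION_DEFAULTS, **request.get("options", {})}
    return {**request, "targetCatalog": target, "options": options}
```

Calling `normalize_request({"sources": [{"type": "sql", "content": "SELECT 1"}]})` returns the request with `targetCatalog` set to `"json"` and all three option defaults filled in.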

## Output Schema

```json
{
  "lineageGraph": {
    "type": "object",
    "properties": {
      "nodes": {
        "type": "array",
        "items": {
          "id": "string",
          "type": "table|view|file|external",
          "name": "string",
          "database": "string",
          "schema": "string",
          "columns": "array"
        }
      },
      "edges": {
        "type": "array",
        "items": {
          "source": "string",
          "target": "string",
          "transformationType": "string",
          "sql": "string"
        }
      }
    }
  },
  "columnLineage": {
    "type": "array",
    "items": {
      "targetColumn": {
        "table": "string",
        "column": "string"
      },
      "sourceColumns": {
        "type": "array",
        "items": {
          "table": "string",
          "column": "string",
          "transformation": "string"
        }
      },
      "transformationLogic": "string"
    }
  },
  "impactAnalysis": {
    "type": "object",
    "properties": {
      "upstream": {
        "type": "array",
        "description": "All upstream dependencies"
      },
      "downstream": {
        "type": "array",
        "description": "All downstream dependents"
      },
      "criticalPath": {
        "type": "array",
        "description": "Most important lineage path"
      }
    }
  },
  "catalogIntegration": {
    "type": "object",
    "description": "Export format for target catalog",
    "properties": {
      "format": "string",
      "payload": "object|string"
    }
  },
  "statistics": {
    "tablesCount": "number",
    "columnsCount": "number",
    "edgesCount": "number",
    "maxDepth": "number"
  }
}
```
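
The `statistics` block can be derived from `lineageGraph` alone. The sketch below shows one way to do that; it assumes the graph is acyclic, that `maxDepth` means the number of edges on the longest path, and that every node counts toward `tablesCount`. None of these definitions are stated in the schema, and `compute_statistics` is a hypothetical name.

```python
from functools import lru_cache


def compute_statistics(graph: dict) -> dict:
    """Derive the statistics block from a lineageGraph (hypothetical helper)."""
    nodes = graph.get("nodes", [])
    edges = graph.get("edges", [])

    # Adjacency list: node id -> ids of its direct downstream nodes.
    children: dict[str, list[str]] = {}
    for edge in edges:
        children.setdefault(edge["source"], []).append(edge["target"])

    @lru_cache(maxsize=None)
    def depth(node_id: str) -> int:
        # Number of edges on the longest path starting here (assumes no cycles).
        return max((1 + depth(child) for child in children.get(node_id, [])), default=0)

    node_ids = {node["id"] for node in nodes} | set(children)
    return {
        "tablesCount": len(nodes),
        "columnsCount": sum(len(node.get("columns", [])) for node in nodes),
        "edgesCount": len(edges),
        "maxDepth": max((depth(node_id) for node_id in node_ids), default=0),
    }
```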

## Usage Examples

### SQL Query Lineage

```json
{
  "sources": [
    {
      "type": "sql",
      "content": "INSERT INTO analytics.fct_orders SELECT o.order_id, c.customer_name FROM staging.orders o JOIN staging.customers c ON o.customer_id = c.id",
      "metadata": {
        "database": "warehouse",
        "schema": "analytics"
      }
    }
  ],
  "options": {
    "columnLevel": true
  }
}
```

### dbt Project Lineage

```json
{
  "sources": [
    {
      "type": "dbt",
      "content": "./target/manifest.json"
    }
  ],
  "targetCatalog": "datahub",
  "options": {
    "resolveViews": true
  }
}
```

### Multi-Source Lineage

```json
{
  "sources": [
    {
      "type": "dbt",
      "content": "./analytics/target/manifest.json"
    },
    {
      "type": "airflow",
      "content": "./dags/etl_pipeline.py"
    },
    {
      "type": "sql",
      "content": "SELECT * FROM external_db.customers"
    }
  ],
  "targetCatalog": "openlineage"
}
```
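
Connecting lineage across sources comes down to merging per-source graphs on shared node ids. The following is a rough sketch of that merge, assuming each source has already produced a graph in the `lineageGraph` shape and that node `id` values are fully qualified and consistent across systems. `merge_graphs` is a hypothetical name.

```python
def merge_graphs(*graphs: dict) -> dict:
    """Merge lineage graphs, de-duplicating nodes by id and edges by endpoints."""
    nodes: dict[str, dict] = {}
    edges: dict[tuple[str, str], dict] = {}
    for graph in graphs:
        for node in graph.get("nodes", []):
            # Later graphs add to, or overwrite, fields of an already-known node.
            nodes.setdefault(node["id"], {}).update(node)
        for edge in graph.get("edges", []):
            # The first edge seen for a (source, target) pair wins.
            edges.setdefault((edge["source"], edge["target"]), edge)
    return {"nodes": list(nodes.values()), "edges": list(edges.values())}
```

The same function can fold an `existingLineage` graph into freshly extracted lineage by passing it as the first argument.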

### Impact Analysis for Table Change

```json
{
  "sources": [
    {
      "type": "dbt",
      "content": "./target/manifest.json"
    }
  ],
  "options": {
    "columnLevel": true,
    "impactAnalysisTarget": "raw.customers"
  }
}
```
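
Impact analysis of this kind is a graph traversal over the extracted edges. A minimal sketch, assuming edges in the `lineageGraph.edges` shape, is a breadth-first search that follows edges forward for downstream dependents and backward for upstream dependencies. The function name `reachable` is hypothetical.

```python
from collections import deque


def reachable(edges: list[dict], start: str, direction: str = "downstream") -> list[str]:
    """Return every node reachable from `start`, in breadth-first order."""
    adjacency: dict[str, list[str]] = {}
    for edge in edges:
        src, dst = edge["source"], edge["target"]
        if direction == "upstream":
            src, dst = dst, src  # walk the edges backward
        adjacency.setdefault(src, []).append(dst)

    seen = {start}
    order: list[str] = []
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for neighbor in adjacency.get(current, []):
            if neighbor not in seen:
                seen.add(neighbor)
                order.append(neighbor)
                queue.append(neighbor)
    return order
```

For a change to `raw.customers`, `reachable(edges, "raw.customers")` lists the tables that may need review, nearest dependents first.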

## Lineage Extraction Methods

### SQL Parsing

| Statement Type | Extracted Information |
|---------------|----------------------|
| SELECT | Source tables, column mappings |
| INSERT INTO...SELECT | Target table, source tables |
| CREATE TABLE AS | New table, source lineage |
| MERGE | Target, source, update/insert columns |
| UPDATE...FROM | Target table, source join tables |

### dbt Manifest

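Extracts from `manifest.json`, supplemented by `catalog.json` for column metadata:
- Model dependencies via `ref()` and `source()`
- Column names and types from `catalog.json` (a separate artifact generated by `dbt docs generate`), used for column-level lineage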
- Test dependencies
- Documentation links
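
In a dbt manifest, resolved `ref()` and `source()` calls appear under each node's `depends_on.nodes` list, keyed by unique id. The sketch below turns those lists into lineage edges. It assumes the manifest has already been loaded into a dict, and storing `resource_type` in `transformationType` is an illustrative choice, not something the skill specifies. `manifest_edges` is a hypothetical name.

```python
def manifest_edges(manifest: dict) -> list[dict]:
    """Build table-level lineage edges from a parsed dbt manifest."""
    edges = []
    for unique_id, node in manifest.get("nodes", {}).items():
        # depends_on.nodes holds the unique ids of upstream models and sources.
        for parent_id in node.get("depends_on", {}).get("nodes", []):
            edges.append({
                "source": parent_id,
                "target": unique_id,
                "transformationType": node.get("resource_type", "unknown"),
            })
    return edges


# Typical use (the path is the one from the usage examples):
#   import json
#   with open("./target/manifest.json") as f:
#       edges = manifest_edges(json.load(f))
```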

### Airflow DAGs

Maps lineage from:
- XCom data passing
- Operator source/destination parameters
- Task dependencies representing data flow
- External task sensors
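
As an illustration of the operator source/destination idea, the sketch below works on plain task records rather than on Airflow objects. The `reads` and `writes` fields are hypothetical names for whatever source and destination parameters have been pulled out of each operator, and `task_dataset_edges` is not part of Airflow or of this skill.

```python
def task_dataset_edges(tasks: list[dict]) -> list[dict]:
    """Link every dataset a task reads to every dataset it writes."""
    edges = []
    for task in tasks:
        for source in task.get("reads", []):
            for target in task.get("writes", []):
                edges.append({
                    "source": source,
                    "target": target,
                    # Record which task produced the edge (illustrative convention).
                    "transformationType": f"airflow:{task['task_id']}",
                })
    return edges
```

Tasks that only read or only write (sensors, for example) produce no edges in this model. They would need to be connected through task dependencies instead.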

### Spark Jobs

Parses lineage from:
- Spark SQL queries
- DataFrame operations (join, select, groupBy)
- Read/write operations
- Catalog table references

## Column-Level Lineage

### Transformation Types

| Type | Example | Lineage |
|------|---------|---------|
| Direct | `SELECT customer_id` | 1:1 mapping |
| Rename | `customer_id AS cust_id` | Rename mapping |
| Expression | `CONCAT(first, last) AS name` | Multi-column → single |
| Aggregation | `SUM(amount) AS total` | Many → single with agg |
| Case | `CASE WHEN...` | Conditional mapping |

### Example Output

```json
{
  "columnLineage": [
    {
      "targetColumn": {
        "table": "fct_orders",
        "column": "customer_name"
      },
      "sourceColumns": [
        {
          "table": "stg_customers",
          "column": "first_name",
          "transformation": "CONCAT"
        },
        {
          "table": "stg_customers",
          "column": "last_name",
          "transformation": "CONCAT"
        }
      ],
      "transformationLogic": "CONCAT(first_name, ' ', last_name)"
    }
  ]
}
```

## Catalog Export Formats

### DataHub

```json
{
  "format": "datahub",
  "payload": {
    "entities": [...],
    "relationships": [...]
  }
}
```

### OpenLineage

```json
{
  "format": "openlineage",
  "payload": {
    "eventType": "COMPLETE",
    "run": {...},
    "job": {...},
    "inputs": [...],
    "outputs": [...]
  }
}
```
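
A payload of the shape above can be assembled from table-level edges. The sketch below is one possible mapping, not the skill's exporter: `to_openlineage_event` is a hypothetical name, the single shared `namespace` is an assumption, and a complete OpenLineage event carries further fields such as `producer` and `schemaURL` that are omitted here.

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def to_openlineage_event(
    job_name: str,
    edges: list[dict],
    namespace: str = "warehouse",
    run_id: Optional[str] = None,
    event_time: Optional[str] = None,
) -> dict:
    """Wrap lineage edges in an OpenLineage-style COMPLETE event."""
    # Every edge source is reported as an input, every target as an output.
    inputs = sorted({edge["source"] for edge in edges})
    outputs = sorted({edge["target"] for edge in edges})
    return {
        "format": "openlineage",
        "payload": {
            "eventType": "COMPLETE",
            "eventTime": event_time or datetime.now(timezone.utc).isoformat(),
            "run": {"runId": run_id or str(uuid.uuid4())},
            "job": {"namespace": namespace, "name": job_name},
            "inputs": [{"namespace": namespace, "name": name} for name in inputs],
            "outputs": [{"namespace": namespace, "name": name} for name in outputs],
        },
    }
```

Because inputs and outputs are collected across all edges, an intermediate table in a multi-step job appears in both lists. Emitting one event per step avoids that.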

### Amundsen

```json
{
  "format": "amundsen",
  "payload": {
    "tables": [...],
    "columns": [...],
    "lineage": [...]
  }
}
```

## Integration Points

### MCP Server Integration

- **dbt MCP** - Direct manifest access
- **Database MCPs** - Schema and view resolution
- **MindsDB** - Cross-platform lineage

### Related Skills

- dbt Project Analyzer (SK-DEA-003) - dbt lineage analysis
- Data Catalog Enricher (SK-DEA-017) - Catalog metadata enhancement

### Applicable Processes

- Data Lineage Mapping (`data-lineage.js`)
- Data Catalog (`data-catalog.js`)
- dbt Project Setup (`dbt-project-setup.js`)

## References

- [OpenLineage Specification](https://openlineage.io/)
- [DataHub Lineage](https://datahubproject.io/docs/lineage/lineage-feature-guide)
- [dbt Lineage](https://docs.getdbt.com/docs/collaborate/explore-projects#view-lineage)
- [Apache Atlas Lineage](https://atlas.apache.org/)
- [Marquez](https://marquezproject.ai/)

## Version History

- **1.0.0** - Initial release with multi-source lineage extraction
