data-pipeline

Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics

16 stars

Best use case

data-pipeline is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics

Teams using data-pipeline should expect a more consistent output, faster repeated execution, less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$curl -o ~/.claude/skills/data-pipeline/SKILL.md --create-dirs "https://raw.githubusercontent.com/diegosouzapw/awesome-omni-skill/main/skills/data-ai/data-pipeline/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/data-pipeline/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How data-pipeline Compares

Feature / Agentdata-pipelineStandard Approach
Platform SupportNot specifiedLimited / Varies
Context Awareness High Baseline
Installation ComplexityUnknownN/A

Frequently Asked Questions

What does this skill do?

Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# Data Pipeline

Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.

## Overview

This skill covers:
- Data extraction from multiple sources
- Transformation and cleaning
- Loading to destinations
- Scheduling and monitoring
- Error handling and alerts

---

## ETL Patterns

### Basic ETL Flow

```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘
```

### n8n ETL Workflow

```yaml
workflow: "Daily Sales ETL"
schedule: "2am daily"

nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday
    
  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday
    
  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id
    
  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        payment_status: item.payment.status
      }));
      
  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily
    
  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
```

---

## Data Sources

### Common Extractors

```yaml
extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
        
    - mysql:
        connection: connection_string
        query: custom_sql
        
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}
        
  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
        
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query
        
  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
        
    - excel:
        source: file_path
        sheet: "Sheet1"
        
    - json:
        source: api/file
        path: "data.items"
        
  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
```

---

## Transformations

### Common Transformations

```yaml
transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema
    
  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}
    
  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value
    
  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields
    
  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
```

### Code Transform Examples

```javascript
// Clean and normalize data
function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    
    // Parse dates
    date: new Date(item.created_at).toISOString().split('T')[0],
    
    // Convert types
    amount: parseFloat(item.amount) || 0,
    
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    
    // Calculate fields
    total: item.quantity * item.unit_price,
    
    // Filter nested
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    
    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate data
function aggregate(items) {
  const grouped = {};
  
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });
  
  return Object.values(grouped);
}
```

---

## Data Destinations

### Common Loaders

```yaml
loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
        
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
        
    - redshift:
        cluster: cluster_id
        database: analytics
        
  databases:
    - postgresql:
        upsert: on_conflict_update
        
    - mysql:
        batch_insert: 1000_rows
        
  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
        
    - google_cloud_storage:
        bucket: data-bucket
        
  spreadsheets:
    - google_sheets:
        mode: append/overwrite
        
    - airtable:
        base: base_id
        table: table_name
        
  apis:
    - webhook:
        url: destination_url
        batch_size: 100
```

---

## Scheduling & Monitoring

### Pipeline Scheduling

```yaml
scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: real_time_dashboards
      
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
      
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
      
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis
      
  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete
    
  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
```

### Monitoring & Alerts

```yaml
monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness
    
  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        
        [View Logs]({logs_url})
        
    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly
        
    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
```

---

## Data Quality

### Quality Checks

```yaml
data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]
        
  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]
        
  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid
    
  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
```

---

## Output Example

**Request**: "Create a daily sales data pipeline"

**Output**:

```markdown
# Daily Sales Data Pipeline

## Pipeline Overview

```
Shopify + Stripe → Transform → BigQuery + Sheets
```

## Schedule
- Runs: 2am daily
- Timezone: UTC
- Retry: 3 attempts

## Extract

### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```

### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```

## Transform

```javascript
// Join and clean data
{
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
}
```

## Load

### BigQuery
- Table: `analytics.sales_daily`
- Mode: Append

### Google Sheets
- Sheet: "Daily Sales Dashboard"
- Tab: "Raw Data"

## Quality Checks
- [ ] Row count > 0
- [ ] No null order_ids
- [ ] Revenue sum matches Stripe

## Alerts
- Slack: #data-alerts
- On failure: @data-team
```

---

*Data Pipeline Skill - Part of Claude Office Skills*

Related Skills

large-data-with-dask

16
from diegosouzapw/awesome-omni-skill

Specific optimization strategies for Python scripts working with larger-than-memory datasets via Dask.

ipdata-co-automation

16
from diegosouzapw/awesome-omni-skill

Automate Ipdata co tasks via Rube MCP (Composio). Always search tools first for current schemas.

gdpr-data-handling

16
from diegosouzapw/awesome-omni-skill

Implement GDPR-compliant data handling with consent management, data subject rights, and privacy by design. Use when building systems that process EU personal data, implementing privacy controls, o...

fair-data-model-assessment

16
from diegosouzapw/awesome-omni-skill

Assess data models against FAIR principles using RDA-FDMM indicators. Use when: (1) Evaluating vendor-delivered data models for FAIR compliance, (2) Reviewing schemas, ontologies, or data dictionaries before integration, (3) Creating FAIR assessment reports for data governance reviews, (4) Preparing data model documentation for enterprise or regulatory standards, (5) Auditing existing data assets for FAIRness gaps. Covers 41 RDA indicators across Findable, Accessible, Interoperable, Reusable dimensions with maturity scoring (0-4 scale).

etl-pipeline

16
from diegosouzapw/awesome-omni-skill

Build automated ETL (Extract-Transform-Load) pipelines for construction data. Process PDFs, Excel, BIM exports. Generate reports, dashboards, and integrate with other systems. Orchestrate with Airflow or n8n.

docker-database

16
from diegosouzapw/awesome-omni-skill

Configure database containers with security, persistence, and health checks

datarobot-automation

16
from diegosouzapw/awesome-omni-skill

Automate Datarobot tasks via Rube MCP (Composio). Always search tools first for current schemas.

dataql-analysis

16
from diegosouzapw/awesome-omni-skill

Analyze data files using SQL queries with DataQL. Use when working with CSV, JSON, Parquet, Excel files or when the user mentions data analysis, filtering, aggregation, or SQL queries on files.

datahub-connector-pr-review

16
from diegosouzapw/awesome-omni-skill

This skill should be used when the user asks to "review my connector", "check my datahub connector", "review connector code", "audit connector", "review PR", "check code quality", or any request to review/check/audit a DataHub ingestion source. Covers compliance with standards, best practices, testing quality, and merge readiness.

datagma-automation

16
from diegosouzapw/awesome-omni-skill

Automate Datagma tasks via Rube MCP (Composio). Always search tools first for current schemas.

Database Sync

16
from diegosouzapw/awesome-omni-skill

Automate database synchronization, replication, migration, and cross-platform data integration

database-skill

16
from diegosouzapw/awesome-omni-skill

Design and manage relational databases including table creation, migrations, and schema design. Use for database modeling and maintenance.