ETL Pipeline

Design and automate Extract, Transform, Load data pipelines for data integration and analytics

33 stars

Best use case

ETL Pipeline is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using ETL Pipeline should expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/etl-pipeline/SKILL.md --create-dirs "https://raw.githubusercontent.com/aAAaqwq/AGI-Super-Team/main/skills/etl-pipeline/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/etl-pipeline/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How ETL Pipeline Compares

Feature / Agent         | ETL Pipeline  | Standard Approach
Platform Support        | Not specified | Limited / Varies
Context Awareness       | High          | Baseline
Installation Complexity | Unknown       | N/A

Frequently Asked Questions

What does this skill do?

Design and automate Extract, Transform, Load data pipelines for data integration and analytics

Where can I find the source code?

You can find the source code on GitHub using the link provided at the top of the page.

SKILL.md Source

# ETL Pipeline

Comprehensive skill for designing and automating Extract, Transform, Load data pipelines.

## Pipeline Architecture

### Core ETL Flow

```
DATA PIPELINE ARCHITECTURE:
┌─────────────────────────────────────────────────────────┐
│                     EXTRACT                              │
├─────────┬─────────┬─────────┬─────────┬─────────────────┤
│ Postgres│  MySQL  │ MongoDB │  APIs   │  Files (CSV/JSON)│
└────┬────┴────┬────┴────┬────┴────┬────┴────────┬────────┘
     │         │         │         │              │
     └─────────┴─────────┴────┬────┴──────────────┘
                              ▼
┌─────────────────────────────────────────────────────────┐
│                    TRANSFORM                             │
│  • Clean & Validate    • Aggregate & Join               │
│  • Normalize           • Calculate Metrics              │
│  • Deduplicate         • Apply Business Rules           │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                      LOAD                                │
├─────────────┬─────────────┬─────────────┬───────────────┤
│  BigQuery   │  Snowflake  │  Redshift   │  Data Lake    │
└─────────────┴─────────────┴─────────────┴───────────────┘
```

## Source Connectors

### Database Connections

```yaml
sources:
  postgres:
    type: postgresql
    host: db.example.com
    port: 5432
    database: production
    ssl: true
    extraction:
      method: incremental
      key: updated_at
      batch_size: 10000

  mysql:
    type: mysql
    host: mysql.example.com
    port: 3306
    database: analytics
    extraction:
      method: cdc
      binlog: true

  mongodb:
    type: mongodb
    connection_string: mongodb+srv://...
    database: app_data
    extraction:
      method: change_streams
```
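The `incremental` method with `key: updated_at` and a `batch_size` can be read as a watermark loop. The following is a minimal sketch of that idea, assuming an in-memory SQLite table as a stand-in for the real source; the function name `extract_incremental` and the `(key_value, id)` watermark shape are illustrative assumptions, not part of the skill.

```python
import sqlite3

def extract_incremental(conn, table, key, watermark, batch_size):
    """Yield batches of (id, key) rows newer than `watermark`, oldest first.

    `watermark` is a (key_value, id) pair. Ties on `key` are broken by id so
    rows sharing a timestamp are not skipped at a batch boundary.
    `table` and `key` are interpolated into SQL, so they must come from
    trusted configuration, never from user input.
    """
    last_key, last_id = watermark
    while True:
        rows = conn.execute(
            f"SELECT id, {key} FROM {table} "
            f"WHERE {key} > ? OR ({key} = ? AND id > ?) "
            f"ORDER BY {key}, id LIMIT ?",
            (last_key, last_key, last_id, batch_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_id, last_key = rows[-1]  # advance the watermark

# Hypothetical sample data standing in for the production database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2024-01-01"), (2, "2024-01-02"), (3, "2024-01-02"), (4, "2024-01-03")],
)
batches = list(
    extract_incremental(conn, "orders", "updated_at", ("2024-01-01", 1), batch_size=2)
)
```

Persisting the final watermark between runs is what makes the next extraction pick up where this one stopped.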

### API Sources

```yaml
api_sources:
  stripe:
    type: rest_api
    base_url: https://api.stripe.com/v1
    auth: bearer_token
    endpoints:
      - /charges
      - /customers
      - /subscriptions
    pagination: cursor
    rate_limit: 100/second

  salesforce:
    type: salesforce
    instance_url: https://company.salesforce.com
    auth: oauth2
    objects:
      - Account
      - Opportunity
      - Contact
    bulk_api: true
```
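The `pagination: cursor` setting implies a loop that follows a cursor until the source reports no further pages. Here is a rough sketch of that loop. The `fetch_page` callable and the `data` / `next_cursor` field names are hypothetical; real APIs name these fields differently, so check the provider's documentation before adapting it.

```python
def paginate(fetch_page, limit=100):
    """Yield every item from a cursor-paginated endpoint.

    `fetch_page(cursor, limit)` is a hypothetical callable returning a dict
    with `data` (list of items) and `next_cursor` (None on the last page).
    """
    cursor = None
    while True:
        page = fetch_page(cursor, limit)
        yield from page["data"]
        cursor = page.get("next_cursor")
        if cursor is None:
            return

# In-memory stand-in for an HTTP client, so the sketch runs offline
ITEMS = [{"id": f"ch_{i}"} for i in range(5)]

def fake_fetch(cursor, limit):
    start = 0 if cursor is None else cursor
    end = start + limit
    next_cursor = end if end < len(ITEMS) else None
    return {"data": ITEMS[start:end], "next_cursor": next_cursor}

charges = list(paginate(fake_fetch, limit=2))
```

A production version would also need to respect the configured `rate_limit`, which this sketch leaves out.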

## Transformation Layer

### Common Transformations

```python
# Data Cleaning
transformations = {
    "clean_nulls": {
        "operation": "fill_null",
        "columns": ["email", "phone"],
        "value": "unknown"
    },
    
    "standardize_dates": {
        "operation": "date_parse",
        "columns": ["created_at", "updated_at"],
        "format": "ISO8601"
    },
    
    "normalize_currency": {
        "operation": "convert_currency",
        "source_column": "amount",
        "currency_column": "currency",
        "target": "USD"
    },
    
    "deduplicate": {
        "operation": "distinct",
        "key_columns": ["customer_id", "transaction_id"],
        "keep": "latest"
    }
}
```
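The dictionary above only declares the operations. As an illustration of what two of them might do, here is a small pure-Python sketch of `fill_null` and `distinct` with `keep: latest`. The `order_by` parameter is an assumption: the config does not say which column defines "latest", so `updated_at` is used here.

```python
def fill_null(rows, columns, value):
    """Replace None (or missing) entries in `columns` with `value`."""
    return [
        {**row, **{c: value for c in columns if row.get(c) is None}}
        for row in rows
    ]

def deduplicate(rows, key_columns, order_by):
    """Keep the row with the largest `order_by` value for each key."""
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        if key not in latest or row[order_by] > latest[key][order_by]:
            latest[key] = row
    return list(latest.values())

# Hypothetical sample rows
rows = [
    {"customer_id": 1, "transaction_id": "t1", "email": None,
     "updated_at": "2024-01-01"},
    {"customer_id": 1, "transaction_id": "t1", "email": "a@example.com",
     "updated_at": "2024-01-02"},
    {"customer_id": 2, "transaction_id": "t2", "email": None,
     "updated_at": "2024-01-01"},
]
deduped = deduplicate(rows, ["customer_id", "transaction_id"], "updated_at")
cleaned = fill_null(deduped, ["email"], "unknown")
```

Deduplicating before filling nulls avoids keeping a placeholder value when a later row carries the real one.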

### Aggregation Rules

```sql
-- Daily Revenue Aggregation
SELECT 
    DATE(created_at) as date,
    product_category,
    COUNT(*) as transactions,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_order_value,
    COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE created_at >= '${start_date}'
GROUP BY 1, 2
```
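If the pipeline runner substitutes `${start_date}` directly into the SQL text, a bound parameter is a safer alternative. The sketch below runs a trimmed version of the aggregation against an in-memory SQLite table to show the pattern. SQLite and the sample rows are assumptions made for illustration, and date functions differ between warehouses.

```python
import sqlite3

DAILY_REVENUE = """
SELECT DATE(created_at) AS date,
       product_category,
       COUNT(*) AS transactions,
       SUM(amount) AS total_revenue
FROM orders
WHERE created_at >= ?
GROUP BY 1, 2
ORDER BY 1, 2
"""

def daily_revenue(conn, start_date):
    """Run the aggregation with start_date passed as a bound parameter."""
    return conn.execute(DAILY_REVENUE, (start_date,)).fetchall()

# Hypothetical sample data
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (created_at TEXT, product_category TEXT, "
    "amount REAL, customer_id INTEGER)"
)
conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", [
    ("2024-01-01 09:00:00", "books", 10.0, 1),
    ("2024-01-01 17:30:00", "books", 15.0, 2),
    ("2024-01-02 08:00:00", "games", 40.0, 1),
    ("2023-12-31 23:00:00", "books", 99.0, 3),
])
result = daily_revenue(conn, "2024-01-01")
```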

### Join Operations

```yaml
joins:
  - name: enrich_orders
    left: orders
    right: customers
    type: left
    on:
      - left: customer_id
        right: id
    select:
      - orders.*
      - customers.email
      - customers.segment
      - customers.lifetime_value

  - name: add_product_details
    left: enriched_orders
    right: products
    type: left
    on:
      - left: product_id
        right: id
```
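To make the `type: left` semantics concrete, here is a minimal sketch of the `enrich_orders` join over lists of dictionaries. The helper name `left_join` and the sample rows are illustrative assumptions, and the sketch assumes the right-hand key is unique.

```python
def left_join(left_rows, right_rows, left_key, right_key, select):
    """Left-join two lists of dicts, copying `select` columns from the right.

    Unmatched left rows are kept, with the selected columns set to None.
    Assumes `right_key` is unique within `right_rows`.
    """
    index = {row[right_key]: row for row in right_rows}
    joined = []
    for row in left_rows:
        match = index.get(row[left_key], {})
        joined.append({**row, **{col: match.get(col) for col in select}})
    return joined

# Hypothetical sample rows
orders = [
    {"order_id": "o1", "customer_id": 1},
    {"order_id": "o2", "customer_id": 9},  # no matching customer
]
customers = [{"id": 1, "email": "a@example.com", "segment": "smb"}]
enriched = left_join(orders, customers, "customer_id", "id", ["email", "segment"])
```

The second order survives the join with empty customer fields, which is the behavior that distinguishes a left join from an inner join.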

## Load Strategies

### BigQuery Load

```yaml
bigquery_load:
  project: my-project
  dataset: analytics
  table: fact_orders
  
  schema:
    - name: order_id
      type: STRING
      mode: REQUIRED
    - name: customer_id
      type: STRING
    - name: amount
      type: NUMERIC
    - name: created_at
      type: TIMESTAMP
  
  load_config:
    write_disposition: WRITE_APPEND
    create_disposition: CREATE_IF_NEEDED
    clustering_fields: [customer_id]
    time_partitioning:
      field: created_at
      type: DAY
```

### Snowflake Load

```yaml
snowflake_load:
  warehouse: ETL_WH
  database: ANALYTICS
  schema: PUBLIC
  table: FACT_ORDERS
  
  staging:
    stage: '@MY_STAGE'
    file_format: JSON
  
  copy_options:
    on_error: CONTINUE
    purge: true
    match_by_column_name: CASE_INSENSITIVE
```

## Pipeline Orchestration

### DAG Definition

```yaml
pipeline:
  name: daily_analytics_etl
  schedule: "0 2 * * *"  # 2 AM daily
  
  tasks:
    - id: extract_orders
      type: extract
      source: postgres
      query: "SELECT * FROM orders WHERE DATE(created_at) = '${execution_date}'"
      
    - id: extract_customers
      type: extract
      source: postgres
      query: "SELECT * FROM customers"
      
    - id: transform_data
      type: transform
      dependencies: [extract_orders, extract_customers]
      operations:
        - join_customers
        - calculate_metrics
        - apply_business_rules
      
    - id: load_warehouse
      type: load
      dependencies: [transform_data]
      target: bigquery
      table: fact_orders
      
    - id: notify_complete
      type: notification
      dependencies: [load_warehouse]
      channel: slack
      message: "Daily ETL completed successfully"
```
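The `dependencies` lists define a directed acyclic graph, and a scheduler has to run tasks in an order that respects it. One way to derive such an order is the standard library's `graphlib.TopologicalSorter` (Python 3.9+), sketched below. The `execution_order` helper is hypothetical, and the task list repeats only the ids and dependencies from the definition above.

```python
from graphlib import TopologicalSorter

def execution_order(tasks):
    """Return task ids in an order that respects each task's dependencies."""
    graph = {t["id"]: set(t.get("dependencies", [])) for t in tasks}
    # static_order() raises graphlib.CycleError if the graph has a cycle
    return list(TopologicalSorter(graph).static_order())

tasks = [
    {"id": "extract_orders"},
    {"id": "extract_customers"},
    {"id": "transform_data",
     "dependencies": ["extract_orders", "extract_customers"]},
    {"id": "load_warehouse", "dependencies": ["transform_data"]},
    {"id": "notify_complete", "dependencies": ["load_warehouse"]},
]
order = execution_order(tasks)
```

The two extract tasks have no dependency on each other, so their relative order is not fixed and they could run in parallel.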

### Error Handling

```yaml
error_handling:
  retry:
    max_attempts: 3
    delay_seconds: 300
    exponential_backoff: true
  
  on_failure:
    - log_error
    - send_alert
    - save_failed_records
  
  dead_letter:
    enabled: true
    destination: s3://etl-errors/
    retention_days: 30
```
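The `retry` settings can be sketched as a small wrapper. This is one possible reading, assuming the delay doubles after each failed attempt when `exponential_backoff` is true; the `run_with_retry` name and the injectable `sleep` parameter are illustrative additions.

```python
import time

def run_with_retry(task, max_attempts=3, delay_seconds=300,
                   exponential_backoff=True, sleep=time.sleep):
    """Call `task()` until it succeeds or `max_attempts` is reached.

    The wait doubles after each failure when `exponential_backoff` is set.
    `sleep` is injectable so the function can be exercised without waiting.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: let on_failure handlers see the error
            factor = 2 ** (attempt - 1) if exponential_backoff else 1
            sleep(delay_seconds * factor)

# Demo: fails twice, then succeeds; waits are recorded instead of slept
waits = []
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = run_with_retry(flaky, sleep=waits.append)
```

With the defaults above, the recorded waits are 300 and 600 seconds before the third attempt succeeds.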

## Data Quality

### Validation Rules

```yaml
quality_checks:
  - name: null_check
    column: customer_id
    rule: not_null
    severity: error
    
  - name: range_check
    column: amount
    rule: between
    min: 0
    max: 100000
    severity: warning
    
  - name: uniqueness
    columns: [order_id]
    rule: unique
    severity: error
    
  - name: referential_integrity
    column: product_id
    reference_table: products
    reference_column: id
    severity: error
    
  - name: freshness
    column: updated_at
    rule: max_age_hours
    value: 24
    severity: warning
```
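As a rough illustration of how rules like these might be evaluated, the sketch below counts failing rows for the `not_null`, `between`, and `unique` rules. The `run_checks` function and the sample rows are assumptions; referential-integrity and freshness checks are omitted because they need access to other tables and the current time.

```python
def run_checks(rows, checks):
    """Return {check name: number of failing rows} for a few rule types."""
    failures = {}
    for check in checks:
        rule = check["rule"]
        if rule == "not_null":
            bad = sum(1 for r in rows if r.get(check["column"]) is None)
        elif rule == "between":
            col, low, high = check["column"], check["min"], check["max"]
            bad = sum(
                1 for r in rows
                if r.get(col) is not None and not (low <= r[col] <= high)
            )
        elif rule == "unique":
            seen, bad = set(), 0
            for r in rows:
                key = tuple(r[c] for c in check["columns"])
                if key in seen:
                    bad += 1  # count every repeat after the first occurrence
                seen.add(key)
        else:
            raise ValueError(f"unsupported rule: {rule}")
        failures[check["name"]] = bad
    return failures

checks = [
    {"name": "null_check", "column": "customer_id", "rule": "not_null"},
    {"name": "range_check", "column": "amount", "rule": "between",
     "min": 0, "max": 100000},
    {"name": "uniqueness", "columns": ["order_id"], "rule": "unique"},
]
# Hypothetical sample rows
rows = [
    {"order_id": "o1", "customer_id": "c1", "amount": 50},
    {"order_id": "o2", "customer_id": None, "amount": 250000},
    {"order_id": "o2", "customer_id": "c3", "amount": -5},
]
report = run_checks(rows, checks)
```

A runner could then compare each count against the check's `severity` to decide whether to fail the pipeline or only log a warning.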

### Quality Metrics Dashboard

```
DATA QUALITY REPORT - ${date}
═══════════════════════════════════════
Total Records Processed: 1,250,000
Passed Validation:       1,247,500 (99.8%)
Failed Validation:           2,500 (0.2%)

ISSUES BY TYPE:
┌─────────────────┬────────┬──────────┐
│ Issue Type      │ Count  │ Severity │
├─────────────────┼────────┼──────────┤
│ Null values     │ 1,200  │ Warning  │
│ Invalid format  │   850  │ Error    │
│ Out of range    │   300  │ Warning  │
│ Duplicates      │   150  │ Error    │
└─────────────────┴────────┴──────────┘
```

## Monitoring & Alerting

### Pipeline Metrics

```yaml
metrics:
  - name: pipeline_duration
    type: gauge
    labels: [pipeline_name, status]
    
  - name: records_processed
    type: counter
    labels: [pipeline_name, source, destination]
    
  - name: error_count
    type: counter
    labels: [pipeline_name, error_type]
    
  - name: data_freshness
    type: gauge
    labels: [table_name]
```

### Alert Configuration

```yaml
alerts:
  - name: pipeline_failed
    condition: status == 'failed'
    channels: [pagerduty, slack]
    
  - name: high_error_rate
    condition: error_rate > 0.05
    channels: [slack]
    
  - name: slow_pipeline
    condition: duration > 2 * avg_duration
    channels: [slack]
    
  - name: data_freshness
    condition: freshness_hours > 24
    channels: [email]
```

## Best Practices

1. **Incremental Loading**: Use incremental extraction when possible
2. **Idempotency**: Ensure pipelines can be re-run safely
3. **Partitioning**: Partition large tables by date
4. **Monitoring**: Track pipeline health metrics
5. **Documentation**: Document all transformations
6. **Testing**: Test with sample data before production
7. **Version Control**: Track pipeline changes in git
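
Idempotency is the practice in this list that most directly shapes the load step. One common way to get it is to key writes on a unique identifier, so that re-running a batch overwrites rows instead of duplicating them. The sketch below shows the idea with SQLite's `INSERT OR REPLACE`; the table layout and the `load_idempotent` name are assumptions, and warehouses typically offer their own merge or upsert statements for the same purpose.

```python
import sqlite3

def load_idempotent(conn, rows):
    """Upsert rows keyed on order_id so a re-run does not duplicate data."""
    with conn:  # commit on success, roll back on error
        conn.executemany(
            "INSERT OR REPLACE INTO fact_orders (order_id, amount) "
            "VALUES (?, ?)",
            rows,
        )

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fact_orders (order_id TEXT PRIMARY KEY, amount REAL)"
)
batch = [("o1", 10.0), ("o2", 20.0)]
load_idempotent(conn, batch)
load_idempotent(conn, batch)  # simulated re-run of the same batch
count = conn.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]
```

By contrast, an append-only load would write the batch twice if it were re-run.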

Related Skills

data-engineering-data-pipeline

33
from aAAaqwq/AGI-Super-Team

You are a data pipeline architecture expert specializing in scalable, reliable, and cost-effective data pipelines for batch and streaming data processing.

csv-pipeline

33
from aAAaqwq/AGI-Super-Team

Process, transform, analyze, and report on CSV and JSON data files. Use when the user needs to filter rows, join datasets, compute aggregates, convert formats, deduplicate, or generate summary reports from tabular data. Works with any CSV, TSV, or JSON Lines file.

cost-aware-llm-pipeline

33
from aAAaqwq/AGI-Super-Team

Cost optimization patterns for LLM API usage — model routing by task complexity, budget tracking, retry logic, and prompt caching.

wemp-operator

33
from aAAaqwq/AGI-Super-Team

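> Full-featured WeChat Official Account operations: API wrappers for drafts, publishing, comments, users, media assets, mass messaging, statistics, menus, and QR codes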

Content & Documentation

zsxq-smart-publish

33
from aAAaqwq/AGI-Super-Team

Publish and manage content on 知识星球 (zsxq.com). Supports talk posts, Q&A, long articles, file sharing, digest/bookmark, homework tasks, and tag management. Use when publishing content to 知识星球, creating/editing posts, uploading files/images/audio, managing digests, batch publishing, or formatting content for 知识星球.

zoom-automation

33
from aAAaqwq/AGI-Super-Team

Automate Zoom meeting creation, management, recordings, webinars, and participant tracking via Rube MCP (Composio). Always search tools first for current schemas.

zoho-crm-automation

33
from aAAaqwq/AGI-Super-Team

Automate Zoho CRM tasks via Rube MCP (Composio): create/update records, search contacts, manage leads, and convert leads. Always search tools first for current schemas.

ziliu-publisher

33
from aAAaqwq/AGI-Super-Team

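Ziliu (字流): an AI-driven multi-platform content distribution tool. Create once, adapt the layout automatically, and distribute with one click to 16+ platforms (WeChat Official Accounts, Zhihu, Xiaohongshu, Bilibili, Douyin, Weibo, X, and others). Use when the user needs multi-platform publishing, content layout, or format adaptation. Trigger words: 字流, ziliu, 多平台发布 (multi-platform publishing), 一键分发 (one-click distribution), 内容分发 (content distribution), 排版发布 (layout and publish).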

zhihu-post-skill

33
from aAAaqwq/AGI-Super-Team

> Zhihu article publishing: automation for content creation and publishing on the Zhihu platform

zendesk-automation

33
from aAAaqwq/AGI-Super-Team

Automate Zendesk tasks via Rube MCP (Composio): tickets, users, organizations, replies. Always search tools first for current schemas.

youtube-knowledge-extractor

33
from aAAaqwq/AGI-Super-Team

This skill performs deep, multimodal analysis of YouTube videos through both information channels: audio (transcript) and visual (frame extraction + image analysis). Especially powerful for HowTo videos, tutorials, demos, and explainer videos where what is SHOWN (screenshots, UI demos, diagrams, code, physical actions) is just as important as what is SAID. Use this skill whenever a user wants to analyze, summarize, or create step-by-step guides from YouTube videos, or when they share a YouTube URL and want to understand what happens in the video. Triggers on requests like "Analyze this YouTube video", "Create a step-by-step guide from this video", "What does this video show?", "Summarize this tutorial", or any YouTube URL shared with analysis intent.

youtube-factory

33
from aAAaqwq/AGI-Super-Team

Generate complete YouTube videos from a single prompt - script, voiceover, stock footage, captions, thumbnail. Self-contained, no external modules. 100% free tools.