ETL Pipeline
Design and automate Extract, Transform, Load data pipelines for data integration and analytics
Best use case
ETL Pipeline is best used when you need a repeatable AI agent workflow instead of a one-off prompt. Teams using ETL Pipeline can expect more consistent output, faster repeated execution, and less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in .claude/skills/etl-pipeline/SKILL.md inside your project
- Restart your AI agent — it will auto-discover the skill
How ETL Pipeline Compares
| Feature / Agent | ETL Pipeline | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Design and automate Extract, Transform, Load data pipelines for data integration and analytics
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# ETL Pipeline
Comprehensive skill for designing and automating Extract, Transform, Load data pipelines.
## Pipeline Architecture
### Core ETL Flow
```
DATA PIPELINE ARCHITECTURE:
┌─────────────────────────────────────────────────────────┐
│                         EXTRACT                         │
├─────────┬─────────┬─────────┬─────────┬─────────────────┤
│ Postgres│  MySQL  │ MongoDB │  APIs   │ Files (CSV/JSON)│
└────┬────┴────┬────┴────┬────┴────┬────┴────────┬────────┘
     │         │         │         │             │
     └─────────┴─────────┴────┬────┴─────────────┘
                              ▼
┌─────────────────────────────────────────────────────────┐
│                        TRANSFORM                         │
│  • Clean & Validate      • Aggregate & Join             │
│  • Normalize             • Calculate Metrics            │
│  • Deduplicate           • Apply Business Rules         │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                          LOAD                            │
├─────────────┬─────────────┬─────────────┬───────────────┤
│  BigQuery   │  Snowflake  │  Redshift   │   Data Lake   │
└─────────────┴─────────────┴─────────────┴───────────────┘
```
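At its simplest, this flow is three stages composed by a runner. The sketch below is a minimal, illustrative Python skeleton only; the function bodies and names are placeholders standing in for the connectors, transformations, and loaders configured in the sections that follow.

```python
from typing import Iterable

def extract(source: str) -> Iterable[dict]:
    """Pull raw records from one of the sources above (database, API, or file)."""
    raise NotImplementedError  # placeholder: real connectors are configured below

def transform(records: Iterable[dict]) -> list[dict]:
    """Clean, normalize, deduplicate, and enrich raw records."""
    return [r for r in records if r.get("id") is not None]  # illustrative rule only

def load(records: list[dict], target: str) -> None:
    """Write transformed records to the warehouse or data lake."""
    raise NotImplementedError  # placeholder: real loaders are configured below

def run_pipeline(source: str, target: str) -> None:
    load(transform(extract(source)), target)
```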
## Source Connectors
### Database Connections
```yaml
sources:
  postgres:
    type: postgresql
    host: db.example.com
    port: 5432
    database: production
    ssl: true
    extraction:
      method: incremental
      key: updated_at
      batch_size: 10000
  mysql:
    type: mysql
    host: mysql.example.com
    port: 3306
    database: analytics
    extraction:
      method: cdc
      binlog: true
  mongodb:
    type: mongodb
    connection_string: mongodb+srv://...
    database: app_data
    extraction:
      method: change_streams
```
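A minimal sketch of the incremental Postgres extraction configured above, assuming psycopg2 is available, credentials come from the environment, and the last-seen updated_at watermark is persisted elsewhere (passed in as a plain value here for illustration):

```python
import psycopg2

def extract_incremental(last_watermark, batch_size=10000):
    """Yield only rows changed since the previous run, streamed in batches."""
    conn = psycopg2.connect(
        host="db.example.com", port=5432, dbname="production", sslmode="require"
    )  # user/password assumed to come from the environment or .pgpass
    try:
        with conn.cursor(name="orders_cursor") as cur:  # server-side cursor for streaming
            cur.itersize = batch_size
            cur.execute(
                "SELECT * FROM orders WHERE updated_at > %s ORDER BY updated_at",
                (last_watermark,),
            )
            for row in cur:
                yield row
    finally:
        conn.close()
```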
### API Sources
```yaml
api_sources:
  stripe:
    type: rest_api
    base_url: https://api.stripe.com/v1
    auth: bearer_token
    endpoints:
      - /charges
      - /customers
      - /subscriptions
    pagination: cursor
    rate_limit: 100/second
  salesforce:
    type: salesforce
    instance_url: https://company.salesforce.com
    auth: oauth2
    objects:
      - Account
      - Opportunity
      - Contact
    bulk_api: true
```
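A sketch of cursor-based pagination for the Stripe-style endpoints above, using the requests library. The token handling and the sleep-based throttle are simplified assumptions, not part of the skill's configuration:

```python
import time
import requests

def extract_stripe(endpoint, token, page_size=100):
    """Iterate a Stripe list endpoint using cursor pagination (starting_after)."""
    url = f"https://api.stripe.com/v1{endpoint}"
    params = {"limit": page_size}
    while True:
        resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, params=params)
        resp.raise_for_status()
        payload = resp.json()
        for obj in payload["data"]:
            yield obj
        if not payload.get("has_more"):
            break
        params["starting_after"] = payload["data"][-1]["id"]  # cursor = last object id
        time.sleep(0.01)  # crude throttle, staying under the configured 100/second
```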
## Transformation Layer
### Common Transformations
```python
# Data Cleaning
transformations = {
    "clean_nulls": {
        "operation": "fill_null",
        "columns": ["email", "phone"],
        "value": "unknown"
    },
    "standardize_dates": {
        "operation": "date_parse",
        "columns": ["created_at", "updated_at"],
        "format": "ISO8601"
    },
    "normalize_currency": {
        "operation": "convert_currency",
        "source_column": "amount",
        "currency_column": "currency",
        "target": "USD"
    },
    "deduplicate": {
        "operation": "distinct",
        "key_columns": ["customer_id", "transaction_id"],
        "keep": "latest"
    }
}
```
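One way to apply a spec like this is a small dispatcher over a pandas DataFrame. The sketch below covers three of the four operations (currency conversion is omitted because it needs an external exchange-rate source) and assumes an updated_at column exists for picking the latest duplicate:

```python
import pandas as pd

def apply_transformations(df: pd.DataFrame, spec: dict) -> pd.DataFrame:
    for step in spec.values():
        op = step["operation"]
        if op == "fill_null":
            df[step["columns"]] = df[step["columns"]].fillna(step["value"])
        elif op == "date_parse":
            for col in step["columns"]:
                df[col] = pd.to_datetime(df[col], errors="coerce")
        elif op == "distinct":
            # keep the latest row per key, assuming an updated_at column exists
            df = (df.sort_values("updated_at")
                    .drop_duplicates(subset=step["key_columns"], keep="last"))
        # convert_currency omitted: requires an exchange-rate lookup
    return df
```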
### Aggregation Rules
```sql
-- Daily Revenue Aggregation
SELECT
    DATE(created_at) as date,
    product_category,
    COUNT(*) as transactions,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_order_value,
    COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE created_at >= '${start_date}'
GROUP BY 1, 2
```
### Join Operations
```yaml
joins:
  - name: enrich_orders
    left: orders
    right: customers
    type: left
    on:
      - left: customer_id
        right: id
    select:
      - orders.*
      - customers.email
      - customers.segment
      - customers.lifetime_value
  - name: add_product_details
    left: enriched_orders
    right: products
    type: left
    on:
      - left: product_id
        right: id
```
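The enrich_orders join above corresponds to a left merge in pandas; a minimal sketch, assuming the orders and customers DataFrames have already been extracted:

```python
import pandas as pd

def enrich_orders(orders: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    """Left-join customer attributes onto each order, as in the enrich_orders spec."""
    return orders.merge(
        customers[["id", "email", "segment", "lifetime_value"]],
        how="left",
        left_on="customer_id",
        right_on="id",
        suffixes=("", "_customer"),
    )
```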
## Load Strategies
### BigQuery Load
```yaml
bigquery_load:
  project: my-project
  dataset: analytics
  table: fact_orders
  schema:
    - name: order_id
      type: STRING
      mode: REQUIRED
    - name: customer_id
      type: STRING
    - name: amount
      type: NUMERIC
    - name: created_at
      type: TIMESTAMP
  load_config:
    write_disposition: WRITE_APPEND
    create_disposition: CREATE_IF_NEEDED
    clustering_fields: [customer_id]
    time_partitioning:
      field: created_at
      type: DAY
```
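With the google-cloud-bigquery client library, the load configuration above translates roughly to the following sketch (a DataFrame df of transformed orders is assumed):

```python
from google.cloud import bigquery

def load_to_bigquery(df):
    client = bigquery.Client(project="my-project")
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
        clustering_fields=["customer_id"],
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY, field="created_at"
        ),
        schema=[
            bigquery.SchemaField("order_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("customer_id", "STRING"),
            bigquery.SchemaField("amount", "NUMERIC"),
            bigquery.SchemaField("created_at", "TIMESTAMP"),
        ],
    )
    job = client.load_table_from_dataframe(
        df, "my-project.analytics.fact_orders", job_config=job_config
    )
    job.result()  # block until the load job completes
```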
### Snowflake Load
```yaml
snowflake_load:
  warehouse: ETL_WH
  database: ANALYTICS
  schema: PUBLIC
  table: FACT_ORDERS
  staging:
    stage: '@MY_STAGE'
    file_format: JSON
  copy_options:
    on_error: CONTINUE
    purge: true
    match_by_column_name: CASE_INSENSITIVE
```
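A sketch of the equivalent staged COPY using the Snowflake Python connector; credentials are assumed to come from environment variables, and the stage and copy options mirror the config above:

```python
import os
import snowflake.connector

def load_to_snowflake():
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse="ETL_WH",
        database="ANALYTICS",
        schema="PUBLIC",
    )
    try:
        # COPY staged JSON files into the target table with the configured options
        conn.cursor().execute("""
            COPY INTO FACT_ORDERS
            FROM @MY_STAGE
            FILE_FORMAT = (TYPE = 'JSON')
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
            ON_ERROR = CONTINUE
            PURGE = TRUE
        """)
    finally:
        conn.close()
```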
## Pipeline Orchestration
### DAG Definition
```yaml
pipeline:
  name: daily_analytics_etl
  schedule: "0 2 * * *"  # 2 AM daily
  tasks:
    - id: extract_orders
      type: extract
      source: postgres
      query: "SELECT * FROM orders WHERE date = '${execution_date}'"
    - id: extract_customers
      type: extract
      source: postgres
      query: "SELECT * FROM customers"
    - id: transform_data
      type: transform
      dependencies: [extract_orders, extract_customers]
      operations:
        - join_customers
        - calculate_metrics
        - apply_business_rules
    - id: load_warehouse
      type: load
      dependencies: [transform_data]
      target: bigquery
      table: fact_orders
    - id: notify_complete
      type: notification
      dependencies: [load_warehouse]
      channel: slack
      message: "Daily ETL completed successfully"
```
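If the orchestrator is Apache Airflow 2.x, the same DAG could be sketched as below. The task callables are stubs standing in for the extract/transform/load steps defined earlier; the schedule and dependency fan-in mirror the YAML:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def _stub(**_):
    pass  # placeholder callable for illustration

with DAG(
    dag_id="daily_analytics_etl",
    schedule_interval="0 2 * * *",  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract_orders = PythonOperator(task_id="extract_orders", python_callable=_stub)
    extract_customers = PythonOperator(task_id="extract_customers", python_callable=_stub)
    transform_data = PythonOperator(task_id="transform_data", python_callable=_stub)
    load_warehouse = PythonOperator(task_id="load_warehouse", python_callable=_stub)
    notify_complete = PythonOperator(task_id="notify_complete", python_callable=_stub)

    [extract_orders, extract_customers] >> transform_data >> load_warehouse >> notify_complete
```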
### Error Handling
```yaml
error_handling:
  retry:
    max_attempts: 3
    delay_seconds: 300
    exponential_backoff: true
  on_failure:
    - log_error
    - send_alert
    - save_failed_records
  dead_letter:
    enabled: true
    destination: s3://etl-errors/
    retention_days: 30
```
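The retry policy above corresponds to wrapper logic along these lines; a minimal sketch in plain Python, with the alerting and dead-letter hooks left as comments rather than real integrations:

```python
import time

def run_with_retry(task, max_attempts=3, delay_seconds=300, exponential_backoff=True):
    """Re-run a task on failure, doubling the delay each attempt if configured."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception as exc:
            if attempt == max_attempts:
                # on_failure hooks would go here: log_error, send_alert, save_failed_records
                print(f"giving up after {attempt} attempts: {exc}")
                raise
            wait = delay_seconds * (2 ** (attempt - 1)) if exponential_backoff else delay_seconds
            print(f"attempt {attempt} failed ({exc}); retrying in {wait}s")
            time.sleep(wait)
```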
## Data Quality
### Validation Rules
```yaml
quality_checks:
  - name: null_check
    column: customer_id
    rule: not_null
    severity: error
  - name: range_check
    column: amount
    rule: between
    min: 0
    max: 100000
    severity: warning
  - name: uniqueness
    columns: [order_id]
    rule: unique
    severity: error
  - name: referential_integrity
    column: product_id
    reference_table: products
    reference_column: id
    severity: error
  - name: freshness
    column: updated_at
    rule: max_age_hours
    value: 24
    severity: warning
```
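Checks like these can be evaluated directly against a pandas DataFrame before loading. A simplified sketch covering the not_null, between, and unique rules; referential integrity and freshness follow the same pattern but need the reference table and current time:

```python
import pandas as pd

def run_quality_checks(df: pd.DataFrame, checks: list[dict]) -> list[dict]:
    """Return one result per check with the number of failing rows."""
    results = []
    for check in checks:
        rule = check["rule"]
        if rule == "not_null":
            failed = int(df[check["column"]].isna().sum())
        elif rule == "between":
            col = df[check["column"]]
            failed = int((~col.between(check["min"], check["max"])).sum())
        elif rule == "unique":
            failed = int(df.duplicated(subset=check["columns"]).sum())
        else:
            continue  # referential_integrity / freshness omitted in this sketch
        results.append({"name": check["name"], "failed": failed, "severity": check["severity"]})
    return results
```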
### Quality Metrics Dashboard
```
DATA QUALITY REPORT - ${date}
═══════════════════════════════════════
Total Records Processed: 1,250,000
Passed Validation:       1,247,500 (99.8%)
Failed Validation:           2,500 (0.2%)
ISSUES BY TYPE:
┌─────────────────┬────────┬──────────┐
│ Issue Type      │  Count │ Severity │
├─────────────────┼────────┼──────────┤
│ Null values     │  1,200 │ Warning  │
│ Invalid format  │    850 │ Error    │
│ Out of range    │    300 │ Warning  │
│ Duplicates      │    150 │ Error    │
└─────────────────┴────────┴──────────┘
```
## Monitoring & Alerting
### Pipeline Metrics
```yaml
metrics:
  - name: pipeline_duration
    type: gauge
    labels: [pipeline_name, status]
  - name: records_processed
    type: counter
    labels: [pipeline_name, source, destination]
  - name: error_count
    type: counter
    labels: [pipeline_name, error_type]
  - name: data_freshness
    type: gauge
    labels: [table_name]
```
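If metrics are exported with the Prometheus Python client, the definitions above map onto gauges and counters like this. Metric and label names follow the YAML; the listen port and the example values are arbitrary assumptions:

```python
from prometheus_client import Counter, Gauge, start_http_server

PIPELINE_DURATION = Gauge("pipeline_duration", "Pipeline run time in seconds",
                          ["pipeline_name", "status"])
RECORDS_PROCESSED = Counter("records_processed", "Records moved through the pipeline",
                            ["pipeline_name", "source", "destination"])
ERROR_COUNT = Counter("error_count", "Errors by type", ["pipeline_name", "error_type"])
DATA_FRESHNESS = Gauge("data_freshness", "Hours since the table was last updated",
                       ["table_name"])

start_http_server(9108)  # expose /metrics for scraping; port is an example

# Example usage after a run:
PIPELINE_DURATION.labels(pipeline_name="daily_analytics_etl", status="success").set(412.0)
RECORDS_PROCESSED.labels(pipeline_name="daily_analytics_etl",
                         source="postgres", destination="bigquery").inc(1_250_000)
```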
### Alert Configuration
```yaml
alerts:
  - name: pipeline_failed
    condition: status == 'failed'
    channels: [pagerduty, slack]
  - name: high_error_rate
    condition: error_rate > 0.05
    channels: [slack]
  - name: slow_pipeline
    condition: duration > 2 * avg_duration
    channels: [slack]
  - name: data_freshness
    condition: freshness_hours > 24
    channels: [email]
```
## Best Practices
1. **Incremental Loading**: Use incremental extraction when possible
2. **Idempotency**: Ensure pipelines can be re-run safely
3. **Partitioning**: Partition large tables by date
4. **Monitoring**: Track pipeline health metrics
5. **Documentation**: Document all transformations
6. **Testing**: Test with sample data before production
7. **Version Control**: Track pipeline changes in git
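Practices 1 and 2 often come together as a delete-then-insert (or merge) keyed on the run's partition, so re-running a day replaces that day instead of duplicating it. A minimal sketch against BigQuery, reusing the project and table names assumed earlier:

```python
from google.cloud import bigquery

def idempotent_daily_load(df, execution_date):
    """Replace a single day's partition so re-runs do not duplicate rows."""
    client = bigquery.Client(project="my-project")
    client.query(
        "DELETE FROM `my-project.analytics.fact_orders` WHERE DATE(created_at) = @d",
        job_config=bigquery.QueryJobConfig(
            query_parameters=[bigquery.ScalarQueryParameter("d", "DATE", execution_date)]
        ),
    ).result()
    client.load_table_from_dataframe(df, "my-project.analytics.fact_orders").result()
```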
Related Skills
- data-engineering-data-pipeline: You are a data pipeline architecture expert specializing in scalable, reliable, and cost-effective data pipelines for batch and streaming data processing.
- csv-pipeline: Process, transform, analyze, and report on CSV and JSON data files. Use when the user needs to filter rows, join datasets, compute aggregates, convert formats, deduplicate, or generate summary reports from tabular data. Works with any CSV, TSV, or JSON Lines file.
- cost-aware-llm-pipeline: Cost optimization patterns for LLM API usage — model routing by task complexity, budget tracking, retry logic, and prompt caching.