data-pipeline
Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics
Best use case
data-pipeline is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Teams using data-pipeline should expect more consistent output, faster repeated execution, and less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in .claude/skills/data-pipeline/SKILL.md inside your project
- Restart your AI agent; it will auto-discover the skill
How data-pipeline Compares
| Feature / Agent | data-pipeline | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Data Pipeline
Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.
## Overview
This skill covers:
- Data extraction from multiple sources
- Transformation and cleaning
- Loading to destinations
- Scheduling and monitoring
- Error handling and alerts
---
## ETL Patterns
### Basic ETL Flow
```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘
```
### n8n ETL Workflow
```yaml
workflow: "Daily Sales ETL"
schedule: "2am daily"
nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday
  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday
  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id
  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        payment_status: item.payment.status
      }));
  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily
  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
```
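
The `combine_by_key` merge step above can be sketched in plain JavaScript. This is a minimal illustration, not n8n's Merge node implementation. It assumes each order exposes `id` and each payment carries the order reference in `metadata.order_id` (as in the output example later in this document); `mergeByKey` is a hypothetical helper name.

```javascript
// Sketch of a combine-by-key merge (assumed field names: order.id,
// payment.metadata.order_id). Not the n8n Merge node itself.
function mergeByKey(orders, payments) {
  // Index payments by the order they reference.
  const paymentsByOrder = new Map();
  for (const payment of payments) {
    const orderId = payment.metadata?.order_id;
    if (orderId !== undefined && orderId !== null) {
      // Keys are stringified so 1 and "1" match.
      paymentsByOrder.set(String(orderId), payment);
    }
  }
  // Attach the matching payment, or null when none exists.
  return orders.map(order => ({
    ...order,
    payment: paymentsByOrder.get(String(order.id)) ?? null,
  }));
}
```

Attaching `null` rather than dropping unmatched orders keeps the row count stable, but a downstream transform then has to guard accesses such as `item.payment.status`.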
---
## Data Sources
### Common Extractors
```yaml
extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
    - mysql:
        connection: connection_string
        query: custom_sql
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}
  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query
  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
    - excel:
        source: file_path
        sheet: "Sheet1"
    - json:
        source: api/file
        path: "data.items"
  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
```
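
As one way to read `pagination: handle_automatically`, the sketch below loops over cursor-based pages until the source reports no further cursor. The `fetchPage` callback and its `{ items, nextCursor }` return shape are assumptions made for illustration; real APIs differ (offsets, link headers, page numbers).

```javascript
// Sketch of cursor-based pagination. `fetchPage` is a hypothetical,
// caller-supplied function: fetchPage(cursor) -> Promise<{ items, nextCursor }>.
async function extractAllPages(fetchPage, maxPages = 1000) {
  const all = [];
  let cursor = null; // null requests the first page
  for (let page = 0; page < maxPages; page++) {
    const { items, nextCursor } = await fetchPage(cursor);
    for (const item of items) all.push(item);
    if (!nextCursor) return all; // no further pages
    cursor = nextCursor;
  }
  // Guard against a source that never stops returning cursors.
  throw new Error(`Pagination did not finish within ${maxPages} pages`);
}
```

The `maxPages` cap is a design choice: it turns a misbehaving API into a visible failure instead of an endless extract.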
---
## Transformations
### Common Transformations
```yaml
transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema
  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}
  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value
  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields
  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
```
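
Two of the cleaning steps listed above, `trim_whitespace` and `deduplicate: by_key`, could look roughly like this. The helper names are hypothetical, and the "later row wins" rule for duplicates is an assumption; some pipelines keep the first row instead.

```javascript
// Trim every string field of a row; other values pass through unchanged.
function trimStrings(row) {
  return Object.fromEntries(
    Object.entries(row).map(([field, value]) => [
      field,
      typeof value === 'string' ? value.trim() : value,
    ])
  );
}

// Deduplicate rows by a key field. A later row with the same key
// overwrites the earlier one, at the earlier row's position.
function deduplicateByKey(rows, key) {
  const byKey = new Map();
  for (const row of rows) byKey.set(row[key], row);
  return [...byKey.values()];
}
```

Usage: `deduplicateByKey(rows.map(trimStrings), 'order_id')` trims first, so keys that differ only by surrounding whitespace collapse into one row.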
### Code Transform Examples
```javascript
// Example lookup table for status codes. The values are placeholders;
// replace them with your own mapping.
const statusMap = { 1: 'active', 2: 'pending', 3: 'closed' };

// Clean and normalize data
function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    // Parse dates
    date: new Date(item.created_at).toISOString().split('T')[0],
    // Convert types
    amount: parseFloat(item.amount) || 0,
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    // Calculate fields
    total: item.quantity * item.unit_price,
    // Filter nested
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate data
function aggregate(items) {
  const grouped = {};
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });
  return Object.values(grouped);
}
```
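
One caveat with the date parsing above: `new Date(value).toISOString()` throws a `RangeError` when the input is not a valid date, which would abort the whole batch. A defensive variant, sketched here under the assumption that unparseable dates should become `null` rather than fail the run, might be:

```javascript
// Return the UTC calendar date (YYYY-MM-DD) for a parseable input,
// or null when the value is missing or not a valid date.
function toIsoDate(value) {
  // new Date(null) would silently yield 1970-01-01, so reject it explicitly.
  if (value === null || value === undefined) return null;
  const parsed = new Date(value);
  if (Number.isNaN(parsed.getTime())) return null;
  return parsed.toISOString().split('T')[0];
}
```

Rows with a `null` date can then be counted by a null-rate check instead of crashing the transform.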
---
## Data Destinations
### Common Loaders
```yaml
loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
    - redshift:
        cluster: cluster_id
        database: analytics
  databases:
    - postgresql:
        upsert: on_conflict_update
    - mysql:
        batch_insert: 1000_rows
  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
    - google_cloud_storage:
        bucket: data-bucket
  spreadsheets:
    - google_sheets:
        mode: append/overwrite
    - airtable:
        base: base_id
        table: table_name
  apis:
    - webhook:
        url: destination_url
        batch_size: 100
```
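
The `batch_insert` and `batch_size` settings above imply splitting rows into fixed-size groups before loading. A minimal sketch follows; `insertBatch` is a hypothetical caller-supplied function standing in for whatever client actually writes to the destination.

```javascript
// Split rows into consecutive batches of at most `size` rows.
function chunk(rows, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size));
  }
  return batches;
}

// Load batches one at a time; `insertBatch` is a hypothetical writer.
// Returns the number of rows handed to the destination.
async function loadInBatches(rows, size, insertBatch) {
  let loaded = 0;
  for (const batch of chunk(rows, size)) {
    await insertBatch(batch);
    loaded += batch.length;
  }
  return loaded;
}
```

Batches are sent sequentially here, which keeps ordering and rate limits simple; parallel sends are possible but need their own concurrency limit.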
---
## Scheduling & Monitoring
### Pipeline Scheduling
```yaml
scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: real_time_dashboards
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis
  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete
  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
```
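
The `retries` settings above (three attempts, exponential backoff, alert on final failure) can be sketched as a small wrapper. The 1-second base delay, the doubling factor, and the `onFinalFailure` hook are assumptions for illustration; the configuration above does not specify them.

```javascript
// Default sleep; tests or callers can inject their own.
const defaultSleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Run `task` up to maxAttempts times, doubling the delay between attempts
// (baseDelayMs, 2x, 4x, ...). Calls onFinalFailure once if all attempts fail.
async function withRetries(task, options = {}) {
  const {
    maxAttempts = 3,
    baseDelayMs = 1000, // assumed base delay
    sleep = defaultSleep,
    onFinalFailure = () => {}, // hypothetical alert hook
  } = options;
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        await sleep(baseDelayMs * 2 ** (attempt - 1));
      }
    }
  }
  onFinalFailure(lastError);
  throw lastError;
}
```

Injecting `sleep` keeps the wrapper testable without real waiting, and rethrowing after the alert lets the scheduler still mark the run as failed.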
### Monitoring & Alerts
```yaml
monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness
  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        [View Logs]({logs_url})
    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly
    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
```
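
The alert conditions above reduce to a few small checks. The sketch below assumes that "differs by > 50%" means relative change against the previous run's row count, and that `{name}` placeholders in the template are filled from a plain object; both readings are assumptions, as are the function names.

```javascript
// True when the row count changed by more than maxRelativeChange
// relative to the previous run (0.5 = 50%).
function rowCountDiffers(current, previous, maxRelativeChange = 0.5) {
  if (previous === 0) return current !== 0;
  return Math.abs(current - previous) / previous > maxRelativeChange;
}

// True when the last update is older than the threshold (default 2 hours).
function isStale(lastUpdate, now, thresholdMs = 2 * 60 * 60 * 1000) {
  return now.getTime() - lastUpdate.getTime() > thresholdMs;
}

// Fill {placeholder} tokens from `values`; unknown tokens are left as-is.
function renderAlert(template, values) {
  return template.replace(/\{(\w+)\}/g, (match, name) =>
    Object.prototype.hasOwnProperty.call(values, name)
      ? String(values[name])
      : match
  );
}
```

Leaving unknown placeholders untouched makes a missing value visible in the alert text instead of silently printing `undefined`.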
---
## Data Quality
### Quality Checks
```yaml
data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]
  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]
  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid
  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
```
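
A rough sketch of how the schema and statistical checks above might be applied to rows in code. The schema object shape (`requiredFields`, `allowedValues`, `valueRanges`) and the function names are hypothetical; they simply mirror the YAML keys.

```javascript
// Return a list of human-readable problems for one row (empty = valid).
function validateRow(row, schema) {
  const errors = [];
  for (const field of schema.requiredFields ?? []) {
    if (row[field] === undefined || row[field] === null) {
      errors.push(`missing ${field}`);
    }
  }
  for (const [field, allowed] of Object.entries(schema.allowedValues ?? {})) {
    const value = row[field];
    if (value !== undefined && value !== null && !allowed.includes(value)) {
      errors.push(`invalid ${field}: ${value}`);
    }
  }
  for (const [field, [min, max]] of Object.entries(schema.valueRanges ?? {})) {
    const value = row[field];
    if (typeof value === 'number' && (value < min || value > max)) {
      errors.push(`${field} out of range`);
    }
  }
  return errors;
}

// Fraction of rows where `field` is null or undefined (0 for no rows).
function nullRate(rows, field) {
  if (rows.length === 0) return 0;
  const nulls = rows.filter(
    row => row[field] === undefined || row[field] === null
  ).length;
  return nulls / rows.length;
}
```

A run could then be failed when `nullRate(rows, 'amount') >= 0.05`, matching the `null_rate: < 5%` rule above.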
---
## Output Example
**Request**: "Create a daily sales data pipeline"
**Output**:
````markdown
# Daily Sales Data Pipeline
## Pipeline Overview
```
Shopify + Stripe → Transform → BigQuery + Sheets
```
## Schedule
- Runs: 2am daily
- Timezone: UTC
- Retry: 3 attempts
## Extract
### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```
### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```
## Transform
```javascript
// Join and clean data
{
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
}
```
## Load
### BigQuery
- Table: `analytics.sales_daily`
- Mode: Append
### Google Sheets
- Sheet: "Daily Sales Dashboard"
- Tab: "Raw Data"
## Quality Checks
- [ ] Row count > 0
- [ ] No null order_ids
- [ ] Revenue sum matches Stripe
## Alerts
- Slack: #data-alerts
- On failure: @data-team
````
---
*Data Pipeline Skill - Part of Claude Office Skills*

Related Skills
large-data-with-dask
Specific optimization strategies for Python scripts working with larger-than-memory datasets via Dask.
ipdata-co-automation
Automate Ipdata co tasks via Rube MCP (Composio). Always search tools first for current schemas.
gdpr-data-handling
Implement GDPR-compliant data handling with consent management, data subject rights, and privacy by design. Use when building systems that process EU personal data, implementing privacy controls, o...
fair-data-model-assessment
Assess data models against FAIR principles using RDA-FDMM indicators. Use when: (1) Evaluating vendor-delivered data models for FAIR compliance, (2) Reviewing schemas, ontologies, or data dictionaries before integration, (3) Creating FAIR assessment reports for data governance reviews, (4) Preparing data model documentation for enterprise or regulatory standards, (5) Auditing existing data assets for FAIRness gaps. Covers 41 RDA indicators across Findable, Accessible, Interoperable, Reusable dimensions with maturity scoring (0-4 scale).
etl-pipeline
Build automated ETL (Extract-Transform-Load) pipelines for construction data. Process PDFs, Excel, BIM exports. Generate reports, dashboards, and integrate with other systems. Orchestrate with Airflow or n8n.
docker-database
Configure database containers with security, persistence, and health checks
datarobot-automation
Automate Datarobot tasks via Rube MCP (Composio). Always search tools first for current schemas.
dataql-analysis
Analyze data files using SQL queries with DataQL. Use when working with CSV, JSON, Parquet, Excel files or when the user mentions data analysis, filtering, aggregation, or SQL queries on files.
datahub-connector-pr-review
This skill should be used when the user asks to "review my connector", "check my datahub connector", "review connector code", "audit connector", "review PR", "check code quality", or any request to review/check/audit a DataHub ingestion source. Covers compliance with standards, best practices, testing quality, and merge readiness.
datagma-automation
Automate Datagma tasks via Rube MCP (Composio). Always search tools first for current schemas.
Database Sync
Automate database synchronization, replication, migration, and cross-platform data integration
database-skill
Design and manage relational databases including table creation, migrations, and schema design. Use for database modeling and maintenance.