snowflake-core-workflow-b
Execute Snowflake data transformation with streams, tasks, and dynamic tables. Use when building ELT pipelines, scheduling transformations, or implementing change data capture with Snowflake streams. Trigger with phrases like "snowflake transform", "snowflake ELT", "snowflake stream", "snowflake task", "snowflake pipeline", "snowflake dynamic table", "snowflake CDC".
Best use case
snowflake-core-workflow-b is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Teams using snowflake-core-workflow-b should expect more consistent output, faster repeated execution, and less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in `.claude/skills/snowflake-core-workflow-b/SKILL.md` inside your project
- Restart your AI agent; it will auto-discover the skill
SKILL.md Source
# Snowflake Core Workflow B — Data Transformation
## Overview
Build ELT pipelines using streams (change data capture), tasks (scheduling), and dynamic tables (declarative transforms).
## Prerequisites
- Data loaded into Snowflake (via `snowflake-core-workflow-a`)
- Understanding of ELT vs ETL patterns
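- Role with `CREATE TASK` and `CREATE STREAM` privileges on the target schema, plus `EXECUTE TASK` on the account (and `CREATE DYNAMIC TABLE` on the schema for Step 4)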
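The privileges listed above could be granted along these lines. This is a minimal sketch, not a definitive setup: the role `transform_role` and the database and schema `analytics.public` are hypothetical names, while `TRANSFORM_WH` and `raw_orders` are the warehouse and table used in the steps below.

```sql
-- Hypothetical role, database, and schema names; adjust to your environment
GRANT USAGE ON DATABASE analytics TO ROLE transform_role;
GRANT USAGE ON SCHEMA analytics.public TO ROLE transform_role;

-- Schema-level privileges for creating pipeline objects
GRANT CREATE STREAM ON SCHEMA analytics.public TO ROLE transform_role;
GRANT CREATE TASK ON SCHEMA analytics.public TO ROLE transform_role;
GRANT CREATE DYNAMIC TABLE ON SCHEMA analytics.public TO ROLE transform_role;

-- A stream reads from its source table, so the role needs SELECT on it
GRANT SELECT ON TABLE analytics.public.raw_orders TO ROLE transform_role;

-- Tasks and dynamic tables run on a warehouse
GRANT USAGE ON WAREHOUSE TRANSFORM_WH TO ROLE transform_role;

-- Account-level privilege needed for the role's tasks to run
GRANT EXECUTE TASK ON ACCOUNT TO ROLE transform_role;
```

Account-level grants such as `EXECUTE TASK` typically have to be issued by an administrator role.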
## Instructions
### Step 1: Create a Stream for Change Data Capture
```sql
-- Track changes on the raw orders table
CREATE OR REPLACE STREAM orders_stream ON TABLE raw_orders
  APPEND_ONLY = FALSE;

-- Append-only stream (lighter weight, inserts only)
CREATE OR REPLACE STREAM events_stream ON TABLE raw_events
  APPEND_ONLY = TRUE;

-- Check what's changed since last consumption
SELECT * FROM orders_stream;
-- METADATA$ACTION   = 'INSERT' | 'DELETE'
-- METADATA$ISUPDATE = TRUE if row is part of an UPDATE
-- METADATA$ROW_ID   = unique row identifier
```
```
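In a standard (non-append-only) stream, an UPDATE on the source table shows up as a DELETE row and an INSERT row that both carry `METADATA$ISUPDATE = TRUE`. The query below is a small sketch of how the metadata columns can be combined to label each change. It assumes `raw_orders` has an `order_id` column, as Step 2 implies.

```sql
-- Label each change row; a plain SELECT does not advance the stream offset
SELECT
  order_id,
  CASE
    WHEN METADATA$ACTION = 'INSERT' AND NOT METADATA$ISUPDATE THEN 'inserted'
    WHEN METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE     THEN 'updated (new values)'
    WHEN METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE     THEN 'updated (old values)'
    ELSE 'deleted'
  END AS change_type
FROM orders_stream;
```

The offset only moves forward when the stream is read inside a DML statement such as the MERGE in Step 2, so this query is safe to run for inspection.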
### Step 2: Create a Task to Process Stream Data
```sql
-- Task is scheduled every 5 minutes but only runs when the stream has data
CREATE OR REPLACE TASK transform_orders
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
  MERGE INTO dim_orders AS target
  USING (
    SELECT
      order_id,
      customer_id,
      amount::DECIMAL(12,2) AS amount,
      order_date::TIMESTAMP_NTZ AS order_date,
      CASE
        WHEN amount >= 1000 THEN 'high_value'
        WHEN amount >= 100 THEN 'medium_value'
        ELSE 'standard'
      END AS order_tier,
      CURRENT_TIMESTAMP() AS processed_at
    FROM orders_stream
    -- Keeps new rows and the new values of updated rows; deletes are ignored
    WHERE METADATA$ACTION = 'INSERT'
  ) AS source
  ON target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET
    target.amount = source.amount,
    target.order_tier = source.order_tier,
    target.processed_at = source.processed_at
  WHEN NOT MATCHED THEN INSERT
    (order_id, customer_id, amount, order_date, order_tier, processed_at)
  VALUES
    (source.order_id, source.customer_id, source.amount,
     source.order_date, source.order_tier, source.processed_at);

-- Enable the task (tasks are created in a suspended state)
ALTER TASK transform_orders RESUME;
```
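Before relying on the schedule, it can help to trigger the task once by hand and confirm its state. The statements below are a short sketch that uses only the `transform_orders` task defined above.

```sql
-- Trigger a single run now instead of waiting for the next scheduled run
EXECUTE TASK transform_orders;

-- Check the task's state column (started or suspended)
SHOW TASKS LIKE 'transform_orders';

-- Pause the pipeline when needed; RESUME starts it again
ALTER TASK transform_orders SUSPEND;
```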
### Step 3: Build a Task DAG (Directed Acyclic Graph)
```sql
-- Root task: aggregate daily metrics
CREATE OR REPLACE TASK daily_metrics_root
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = 'USING CRON 0 6 * * * America/New_York'
AS
  INSERT INTO daily_order_metrics
  SELECT
    CURRENT_DATE() - 1 AS metric_date,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
  FROM dim_orders
  WHERE order_date >= CURRENT_DATE() - 1
    AND order_date < CURRENT_DATE();

-- Child task: runs after root completes
CREATE OR REPLACE TASK update_customer_segments
  WAREHOUSE = TRANSFORM_WH
  AFTER daily_metrics_root
AS
  MERGE INTO customer_segments AS target
  USING (
    SELECT
      customer_id,
      COUNT(*) AS order_count,
      SUM(amount) AS lifetime_value,
      CASE
        WHEN SUM(amount) >= 10000 THEN 'platinum'
        WHEN SUM(amount) >= 5000 THEN 'gold'
        WHEN SUM(amount) >= 1000 THEN 'silver'
        ELSE 'bronze'
      END AS segment
    FROM dim_orders
    GROUP BY customer_id
  ) AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET
    target.order_count = source.order_count,
    target.lifetime_value = source.lifetime_value,
    target.segment = source.segment
  WHEN NOT MATCHED THEN INSERT VALUES
    (source.customer_id, source.order_count, source.lifetime_value, source.segment);

-- Resume tasks (children first, then root)
ALTER TASK update_customer_segments RESUME;
ALTER TASK daily_metrics_root RESUME;
```
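Once the graph exists, it can be inspected and resumed as a unit. The sketch below assumes the two tasks above live in the current schema; if they do not, fully qualify the task name passed to each function.

```sql
-- List the root task and all of its descendants with their predecessors and state
SELECT name, predecessors, state
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
  TASK_NAME => 'daily_metrics_root',
  RECURSIVE => TRUE
));

-- Resume the root task and every dependent task in one call
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('daily_metrics_root');
```

`SYSTEM$TASK_DEPENDENTS_ENABLE` is an alternative to resuming each task individually, which becomes more convenient as the graph grows.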
### Step 4: Dynamic Tables (Declarative Alternative)
```sql
-- Auto-refreshes based on target freshness; no streams or tasks needed
CREATE OR REPLACE DYNAMIC TABLE customer_360
  TARGET_LAG = '10 minutes'
  WAREHOUSE = TRANSFORM_WH
AS
  SELECT
    c.customer_id, c.name, c.email,
    COUNT(o.order_id) AS total_orders,
    COALESCE(SUM(o.amount), 0) AS lifetime_value,
    MAX(o.order_date) AS last_order_date,
    DATEDIFF('day', MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order
  FROM customers c
  LEFT JOIN dim_orders o ON c.customer_id = o.customer_id
  GROUP BY c.customer_id, c.name, c.email;

-- Monitor refresh status: the SHOW output includes the
-- target_lag, refresh_mode, and scheduling_state columns
SHOW DYNAMIC TABLES LIKE 'CUSTOMER_360';
```
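A dynamic table can also be managed by hand when the automatic schedule is not enough. The statements below are a brief sketch against the `customer_360` table defined above; the `'1 hour'` lag is only an illustrative value.

```sql
-- Trigger a refresh now rather than waiting for the target lag
ALTER DYNAMIC TABLE customer_360 REFRESH;

-- Relax the freshness target, for example to reduce warehouse usage
ALTER DYNAMIC TABLE customer_360 SET TARGET_LAG = '1 hour';

-- Pause scheduled refreshes, then restart them later
ALTER DYNAMIC TABLE customer_360 SUSPEND;
ALTER DYNAMIC TABLE customer_360 RESUME;
```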
### Step 5: Monitor Pipelines
```sql
-- Task run history
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(hours, -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

-- Find failed runs
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE state = 'FAILED'
  AND scheduled_time >= DATEADD(hours, -24, CURRENT_TIMESTAMP());

-- Stream lag check: if the stale column is true, data may be lost
SHOW STREAMS LIKE 'orders_stream';
```
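The `SHOW STREAMS` output can be narrowed to the staleness columns by querying the previous result. This is a small sketch; it has to run in the same session, immediately after the `SHOW` command.

```sql
SHOW STREAMS LIKE 'orders_stream';

-- SHOW output column names are lowercase, so they must be double-quoted
SELECT "name", "stale", "stale_after"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
ORDER BY "stale_after";
```

The `stale_after` timestamp indicates when the stream is expected to become stale if it is not consumed, which makes it a useful value to alert on.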
## Error Handling
| Error | Cause | Solution |
|-------|-------|----------|
| `Task is suspended` | Not resumed after creation | `ALTER TASK x RESUME` |
| `Stream is stale` | Data retention exceeded | Recreate stream; increase `DATA_RETENTION_TIME_IN_DAYS` |
| `Warehouse does not exist` | Wrong warehouse in task | Verify warehouse name |
| `MERGE: duplicate rows` | Non-unique join key | Add dedup CTE before MERGE |
| `Dynamic table refresh failed` | Source schema changed | Check upstream table definitions |
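
For the duplicate-rows case, one way to deduplicate the source before merging is a `QUALIFY` filter over `ROW_NUMBER()`. The sketch below trims the Step 2 MERGE to a few columns for brevity. The `updated_at` column is hypothetical; substitute whatever column identifies the latest version of a row in your source table.

```sql
MERGE INTO dim_orders AS target
USING (
  SELECT order_id, customer_id, amount, order_date
  FROM orders_stream
  WHERE METADATA$ACTION = 'INSERT'
  -- Keep only the most recent row per order_id (updated_at is a hypothetical column)
  QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) = 1
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
  target.amount = source.amount
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, amount, order_date)
  VALUES (source.order_id, source.customer_id, source.amount, source.order_date);
```

With at most one source row per `order_id`, each target row matches a single source row, which removes the ambiguity behind the error.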
## Resources
- [Streams and Tasks](https://docs.snowflake.com/en/user-guide/data-pipelines-intro)
- [Dynamic Tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-about)
- [CREATE TASK](https://docs.snowflake.com/en/sql-reference/sql/create-task)
## Next Steps
For common errors and troubleshooting, see `snowflake-common-errors`.