snowflake-core-workflow-b

Execute Snowflake data transformation with streams, tasks, and dynamic tables. Use when building ELT pipelines, scheduling transformations, or implementing change data capture with Snowflake streams. Trigger with phrases like "snowflake transform", "snowflake ELT", "snowflake stream", "snowflake task", "snowflake pipeline", "snowflake dynamic table", "snowflake CDC".

1,868 stars

Best use case

snowflake-core-workflow-b is best used when you need a repeatable AI agent workflow instead of a one-off prompt.

Teams using snowflake-core-workflow-b can expect more consistent output, faster repeated execution, and less prompt rewriting.

When to use this skill

  • You want a reusable workflow that can be run more than once with consistent structure.

When not to use this skill

  • You only need a quick one-off answer and do not need a reusable workflow.
  • You cannot install or maintain the underlying files, dependencies, or repository context.

Installation

Claude Code / Cursor / Codex

$ curl -o ~/.claude/skills/snowflake-core-workflow-b/SKILL.md --create-dirs "https://raw.githubusercontent.com/jeremylongshore/claude-code-plugins-plus-skills/main/plugins/saas-packs/snowflake-pack/skills/snowflake-core-workflow-b/SKILL.md"

Manual Installation

  1. Download SKILL.md from GitHub
  2. Place it in .claude/skills/snowflake-core-workflow-b/SKILL.md inside your project
  3. Restart your AI agent — it will auto-discover the skill

How snowflake-core-workflow-b Compares

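| Feature / Agent | snowflake-core-workflow-b | Standard Approach |
|-----------------|---------------------------|-------------------|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |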

SKILL.md Source

# Snowflake Core Workflow B — Data Transformation

## Overview

Build ELT pipelines using streams (change data capture), tasks (scheduling), and dynamic tables (declarative transforms).

## Prerequisites

- Data loaded into Snowflake (via `snowflake-core-workflow-a`)
- Understanding of ELT vs ETL patterns
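- Role with `CREATE STREAM`, `CREATE TASK`, and `CREATE DYNAMIC TABLE` privileges on the target schema, `USAGE` on the warehouse, and the account-level `EXECUTE TASK` privilege (needed for the role's tasks to run)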

## Instructions

### Step 1: Create a Stream for Change Data Capture

```sql
-- Track changes on the raw orders table
CREATE OR REPLACE STREAM orders_stream ON TABLE raw_orders
  APPEND_ONLY = FALSE;

-- Append-only stream (lighter weight, inserts only)
CREATE OR REPLACE STREAM events_stream ON TABLE raw_events
  APPEND_ONLY = TRUE;

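-- Check what's changed since last consumption
-- (a plain SELECT does not advance the stream offset; only committed DML that reads the stream does)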
SELECT * FROM orders_stream;
-- METADATA$ACTION = 'INSERT' | 'DELETE'
-- METADATA$ISUPDATE = TRUE if row is part of an UPDATE
-- METADATA$ROW_ID = unique row identifier
```
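
The metadata columns noted above can be combined to label each change. The query below is a minimal sketch, assuming `raw_orders` has an `order_id` column (as Step 2 implies). On a standard stream, an UPDATE shows up as a DELETE/INSERT pair with `METADATA$ISUPDATE = TRUE` on both rows.

```sql
-- Classify pending changes in the stream (read-only; does not consume the stream)
SELECT
  order_id,
  CASE
    WHEN METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE THEN 'update (new values)'
    WHEN METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE THEN 'update (old values)'
    WHEN METADATA$ACTION = 'INSERT' THEN 'insert'
    ELSE 'delete'
  END AS change_type
FROM orders_stream;
```

An append-only stream such as `events_stream` only ever reports inserts, so this classification is useful only for standard streams.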

### Step 2: Create a Task to Process Stream Data

```sql
-- Transform task: evaluated every 5 minutes, runs only when the stream has data
CREATE OR REPLACE TASK transform_orders
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
  MERGE INTO dim_orders AS target
  USING (
    SELECT
      order_id,
      customer_id,
      amount::DECIMAL(12,2) AS amount,
      order_date::TIMESTAMP_NTZ AS order_date,
      CASE
        WHEN amount >= 1000 THEN 'high_value'
        WHEN amount >= 100 THEN 'medium_value'
        ELSE 'standard'
      END AS order_tier,
      CURRENT_TIMESTAMP() AS processed_at
    FROM orders_stream
    WHERE METADATA$ACTION = 'INSERT'
  ) AS source
  ON target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET
    target.amount = source.amount,
    target.order_tier = source.order_tier,
    target.processed_at = source.processed_at
  WHEN NOT MATCHED THEN INSERT
    (order_id, customer_id, amount, order_date, order_tier, processed_at)
  VALUES
    (source.order_id, source.customer_id, source.amount,
     source.order_date, source.order_tier, source.processed_at);

-- Enable the task
ALTER TASK transform_orders RESUME;
```
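
Because the task above filters on `METADATA$ACTION = 'INSERT'`, rows deleted from `raw_orders` are never removed from `dim_orders`. If deletes should propagate, one possible variant is sketched below. It assumes `order_id` is unique in `raw_orders` and is not reused by a different row between two consumptions of the stream; `change_action` is an alias introduced here for illustration, and `order_tier` is left out for brevity.

```sql
-- Sketch only: delete-aware version of the MERGE body
MERGE INTO dim_orders AS target
USING (
  SELECT
    order_id,
    customer_id,
    amount::DECIMAL(12,2) AS amount,
    order_date::TIMESTAMP_NTZ AS order_date,
    METADATA$ACTION AS change_action
  FROM orders_stream
  -- Skip the DELETE half of each UPDATE pair; keep inserts,
  -- the new values of updates, and true deletes
  WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.change_action = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
  target.amount = source.amount,
  target.processed_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED AND source.change_action = 'INSERT' THEN INSERT
  (order_id, customer_id, amount, order_date, processed_at)
VALUES
  (source.order_id, source.customer_id, source.amount,
   source.order_date, CURRENT_TIMESTAMP());
```

If the same `order_id` can appear in more than one source row, the MERGE becomes nondeterministic and needs deduplication first.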

### Step 3: Build a Task DAG (Directed Acyclic Graph)

```sql
-- Root task: aggregate daily metrics
CREATE OR REPLACE TASK daily_metrics_root
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = 'USING CRON 0 6 * * * America/New_York'
AS
  INSERT INTO daily_order_metrics
  SELECT
    CURRENT_DATE() - 1 AS metric_date,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
  FROM dim_orders
  WHERE order_date >= CURRENT_DATE() - 1
    AND order_date < CURRENT_DATE();

-- Child task: runs after root completes
CREATE OR REPLACE TASK update_customer_segments
  WAREHOUSE = TRANSFORM_WH
  AFTER daily_metrics_root
AS
  MERGE INTO customer_segments AS target
  USING (
    SELECT customer_id,
      COUNT(*) AS order_count,
      SUM(amount) AS lifetime_value,
      CASE
        WHEN SUM(amount) >= 10000 THEN 'platinum'
        WHEN SUM(amount) >= 5000 THEN 'gold'
        WHEN SUM(amount) >= 1000 THEN 'silver'
        ELSE 'bronze'
      END AS segment
    FROM dim_orders GROUP BY customer_id
  ) AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET
    target.order_count = source.order_count,
    target.lifetime_value = source.lifetime_value,
    target.segment = source.segment
  WHEN NOT MATCHED THEN INSERT VALUES
    (source.customer_id, source.order_count, source.lifetime_value, source.segment);

-- Resume tasks (children first, then root)
ALTER TASK update_customer_segments RESUME;
ALTER TASK daily_metrics_root RESUME;
```
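
As an alternative to resuming each task by hand, Snowflake provides `SYSTEM$TASK_DEPENDENTS_ENABLE`, which resumes a root task and all of its dependents. A minimal sketch using the task names from this step:

```sql
-- Resume the root task and every task that depends on it in one call
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('daily_metrics_root');

-- Before changing any task in the graph later, suspend the root first
ALTER TASK daily_metrics_root SUSPEND;
```

After editing a child task, run the `SYSTEM$TASK_DEPENDENTS_ENABLE` call again to bring the whole graph back online.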

### Step 4: Dynamic Tables (Declarative Alternative)

```sql
-- Auto-refreshes based on target freshness — no streams/tasks needed
CREATE OR REPLACE DYNAMIC TABLE customer_360
  TARGET_LAG = '10 minutes'
  WAREHOUSE = TRANSFORM_WH
AS
  SELECT
    c.customer_id, c.name, c.email,
    COUNT(o.order_id) AS total_orders,
    COALESCE(SUM(o.amount), 0) AS lifetime_value,
    MAX(o.order_date) AS last_order_date,
    DATEDIFF('day', MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order
  FROM customers c
  LEFT JOIN dim_orders o ON c.customer_id = o.customer_id
  GROUP BY c.customer_id, c.name, c.email;

-- Monitor refresh status (the SHOW output includes target_lag, refresh_mode, and scheduling_state)
SHOW DYNAMIC TABLES LIKE 'customer_360';
```
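
A dynamic table can also be managed after creation. The statements below are a short sketch of common operations on `customer_360`; the `'1 hour'` lag is an arbitrary illustrative value, not a recommendation.

```sql
-- Trigger a refresh now instead of waiting for the target lag
ALTER DYNAMIC TABLE customer_360 REFRESH;

-- Relax the freshness target to reduce warehouse usage
ALTER DYNAMIC TABLE customer_360 SET TARGET_LAG = '1 hour';

-- Pause and later resume scheduled refreshes
ALTER DYNAMIC TABLE customer_360 SUSPEND;
ALTER DYNAMIC TABLE customer_360 RESUME;
```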

### Step 5: Monitor Pipelines

```sql
-- Task run history
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(hours, -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

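-- Find failed or cancelled runs
-- (ERROR_ONLY filters inside the function, before the default 100-row result limit applies)
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(hours, -24, CURRENT_TIMESTAMP()),
  ERROR_ONLY => TRUE
))
ORDER BY scheduled_time DESC;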

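-- Stream staleness check: if the stale column is true, unconsumed changes can no longer be read;
-- stale_after shows when the stream is expected to become stale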
SHOW STREAMS LIKE 'orders_stream';
```

## Error Handling

| Error | Cause | Solution |
|-------|-------|----------|
| `Task is suspended` | Not resumed after creation | `ALTER TASK x RESUME` |
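| `Stream is stale` | Stream not consumed within the source table's retention period | Recreate stream; consume it more often or raise `DATA_RETENTION_TIME_IN_DAYS` / `MAX_DATA_EXTENSION_TIME_IN_DAYS` on the source table |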
| `Warehouse does not exist` | Wrong warehouse in task | Verify warehouse name |
| `MERGE: duplicate rows` | Non-unique join key | Add dedup CTE before MERGE |
| `Dynamic table refresh failed` | Source schema changed | Check upstream table definitions |
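
For the duplicate-row case in the table above, one way to deduplicate is a `QUALIFY` filter inside the `USING` subquery, which does the same job as a dedup CTE. This is a sketch under assumptions: it keeps the row with the latest `order_date` per `order_id`, which may not be the right tiebreaker for your data (an ingestion timestamp, if the table has one, is usually a better choice).

```sql
-- Sketch only: keep one source row per order_id before merging
MERGE INTO dim_orders AS target
USING (
  SELECT
    order_id,
    customer_id,
    amount::DECIMAL(12,2) AS amount,
    order_date::TIMESTAMP_NTZ AS order_date
  FROM orders_stream
  WHERE METADATA$ACTION = 'INSERT'
  -- Assumed tiebreaker: most recent order_date wins
  QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_date DESC) = 1
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
  target.amount = source.amount
WHEN NOT MATCHED THEN INSERT
  (order_id, customer_id, amount, order_date)
VALUES
  (source.order_id, source.customer_id, source.amount, source.order_date);
```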

## Resources

- [Streams and Tasks](https://docs.snowflake.com/en/user-guide/data-pipelines-intro)
- [Dynamic Tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-about)
- [CREATE TASK](https://docs.snowflake.com/en/sql-reference/sql/create-task)

## Next Steps

For common errors and troubleshooting, see `snowflake-common-errors`.

Related Skills

calendar-to-workflow

1868
from jeremylongshore/claude-code-plugins-plus-skills

Converts calendar events and schedules into Claude Code workflows, meeting prep documents, and standup notes. Use when the user mentions calendar events, meeting prep, standup generation, or scheduling workflows. Trigger with phrases like "prep for my meetings", "generate standup notes", "create workflow from calendar", or "summarize today's schedule".

workhuman-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Workhuman core workflow b for employee recognition and rewards API. Use when integrating Workhuman Social Recognition, or building recognition workflows with HRIS systems. Trigger: "workhuman core workflow b".

workhuman-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Workhuman core workflow a for employee recognition and rewards API. Use when integrating Workhuman Social Recognition, or building recognition workflows with HRIS systems. Trigger: "workhuman core workflow a".

wispr-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Wispr Flow core workflow b for voice-to-text API integration. Use when integrating Wispr Flow dictation, WebSocket streaming, or building voice-powered applications. Trigger: "wispr core workflow b".

wispr-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Wispr Flow core workflow a for voice-to-text API integration. Use when integrating Wispr Flow dictation, WebSocket streaming, or building voice-powered applications. Trigger: "wispr core workflow a".

windsurf-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute Windsurf's secondary workflow: Workflows, Memories, and reusable automation. Use when creating reusable Cascade workflows, managing persistent memories, or automating repetitive development tasks. Trigger with phrases like "windsurf workflow", "windsurf automation", "windsurf memories", "cascade workflow", "windsurf slash command".

windsurf-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute Windsurf's primary workflow: Cascade Write mode for multi-file agentic coding. Use when building features, refactoring across files, or performing complex code tasks. Trigger with phrases like "windsurf cascade write", "windsurf agentic coding", "windsurf multi-file edit", "cascade write mode", "windsurf build feature".

webflow-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute Webflow secondary workflows — Sites management, Pages API, Forms submissions, Ecommerce (products/orders/inventory), and Custom Code via the Data API v2. Use when managing sites, reading pages, handling form data, or working with Webflow Ecommerce products and orders. Trigger with phrases like "webflow sites", "webflow pages", "webflow forms", "webflow ecommerce", "webflow products", "webflow orders".

webflow-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute the primary Webflow workflow — CMS content management: list collections, CRUD items, publish items, and manage content lifecycle via the Data API v2. Use when working with Webflow CMS collections and items, managing blog posts, team members, or any dynamic content. Trigger with phrases like "webflow CMS", "webflow collections", "webflow items", "create webflow content", "manage webflow CMS", "webflow content management".

veeva-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Veeva Vault core workflow b for REST API and clinical operations. Use when working with Veeva Vault document management and CRM. Trigger: "veeva core workflow b".

veeva-core-workflow-a

1868
from jeremylongshore/claude-code-plugins-plus-skills

Veeva Vault core workflow a for REST API and clinical operations. Use when working with Veeva Vault document management and CRM. Trigger: "veeva core workflow a".

vastai-core-workflow-b

1868
from jeremylongshore/claude-code-plugins-plus-skills

Execute Vast.ai secondary workflow: multi-instance orchestration, spot recovery, and cost optimization. Use when running distributed training, handling spot preemption, or optimizing GPU spend across multiple instances. Trigger with phrases like "vastai distributed training", "vastai spot recovery", "vastai multi-gpu", "vastai cost optimization".