databricks-core-workflow-a
Execute Databricks primary workflow: Delta Lake ETL pipelines. Use when building data ingestion pipelines, implementing medallion architecture, or creating Delta Lake transformations. Trigger with phrases like "databricks ETL", "delta lake pipeline", "medallion architecture", "databricks data pipeline", "bronze silver gold".
Best use case
databricks-core-workflow-a is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Teams using databricks-core-workflow-a should expect more consistent output, faster repeated execution, and less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in `.claude/skills/databricks-core-workflow-a/SKILL.md` inside your project
- Restart your AI agent so it auto-discovers the skill
How databricks-core-workflow-a Compares
| Feature / Agent | databricks-core-workflow-a | Standard Approach |
|---|---|---|
| Platform Support | Claude Code, Cursor, Codex | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Single SKILL.md file | N/A |
Frequently Asked Questions
What does this skill do?
Execute Databricks primary workflow: Delta Lake ETL pipelines. Use when building data ingestion pipelines, implementing medallion architecture, or creating Delta Lake transformations. Trigger with phrases like "databricks ETL", "delta lake pipeline", "medallion architecture", "databricks data pipeline", "bronze silver gold".
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Databricks Core Workflow A: Delta Lake ETL
## Overview
Build production Delta Lake ETL pipelines using the medallion architecture (Bronze > Silver > Gold). Uses Auto Loader (`cloudFiles`) for incremental ingestion, `MERGE INTO` for upserts, and Delta Live Tables for declarative pipelines.
## Prerequisites
- Completed `databricks-install-auth` setup
- Unity Catalog enabled with catalogs/schemas created (a setup sketch follows this list)
- Access to cloud storage for raw data (S3, ADLS, GCS)
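If the Unity Catalog objects referenced below do not exist yet, a minimal setup sketch (using the `prod_catalog` catalog and the bronze/silver/gold schema names assumed throughout this skill) looks like this:
```python
# One-time Unity Catalog setup for the catalog and schemas used in the examples below
spark.sql("CREATE CATALOG IF NOT EXISTS prod_catalog")
for schema in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS prod_catalog.{schema}")
```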
## Architecture
```
Raw Sources (S3/ADLS/GCS)
│ Auto Loader (cloudFiles)
▼
Bronze (raw + metadata)
│ Cleanse, deduplicate, type-cast
▼
Silver (conformed)
│ Aggregate, join, feature engineer
▼
Gold (analytics-ready)
```
## Instructions
### Step 1: Bronze Layer — Raw Ingestion with Auto Loader
Auto Loader (`cloudFiles` format) incrementally processes new files as they arrive. It handles schema inference and evolution, and scales to millions of files.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
spark = SparkSession.builder.getOrCreate()
# Streaming ingestion with Auto Loader
bronze_stream = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoints/bronze/orders/schema")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.load("s3://data-lake/raw/orders/")
)
# Add ingestion metadata
bronze_with_meta = (
bronze_stream
.withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", col("_metadata.file_path"))  # replaces input_file_name(), which is unsupported with Unity Catalog
.withColumn("_source_system", lit("orders-api"))
)
# Write to bronze Delta table
(bronze_with_meta.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoints/bronze/orders/data")
.option("mergeSchema", "true")
.toTable("prod_catalog.bronze.raw_orders"))
```
### Step 2: Silver Layer — Cleansing and Deduplication
Read from Bronze, apply business logic, and MERGE INTO Silver with upsert semantics.
```python
from pyspark.sql.functions import col, trim, lower, to_timestamp, sha2
from delta.tables import DeltaTable
# Read new records from bronze (batch mode for scheduled jobs)
bronze_df = spark.table("prod_catalog.bronze.raw_orders")
# Apply transformations
silver_df = (
bronze_df
.withColumn("order_id", col("order_id").cast("string"))
.withColumn("customer_email", lower(trim(col("customer_email"))))
.withColumn("order_date", to_timestamp(col("order_date"), "yyyy-MM-dd'T'HH:mm:ss"))
.withColumn("amount", col("amount").cast("decimal(12,2)"))
.withColumn("email_hash", sha2(col("customer_email"), 256))
.filter(col("order_id").isNotNull())
.dropDuplicates(["order_id"])
)
# Upsert into silver with MERGE
if spark.catalog.tableExists("prod_catalog.silver.orders"):
target = DeltaTable.forName(spark, "prod_catalog.silver.orders")
(target.alias("t")
.merge(silver_df.alias("s"), "t.order_id = s.order_id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())
else:
silver_df.write.format("delta").saveAsTable("prod_catalog.silver.orders")
```
### Step 3: Gold Layer — Business Aggregations
Aggregate Silver data into analytics-ready tables. Use partition-level overwrites for efficient updates.
```python
from pyspark.sql.functions import sum as _sum, count, avg, date_trunc, col

# Recompute metrics from this date forward (illustrative value; typically a job parameter)
target_date = "2024-01-01"

# Daily order metrics
gold_metrics = (
    spark.table("prod_catalog.silver.orders")
    .withColumn("order_day", date_trunc("day", col("order_date")))
    .filter(col("order_day") >= target_date)
    .groupBy("order_day", "region")
    .agg(
        count("order_id").alias("total_orders"),
        _sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
    )
)

# Overwrite only the partitions covered by the predicate; replaceWhere requires the
# written rows to satisfy it, hence the matching filter above
(gold_metrics.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", f"order_day >= '{target_date}'")
    .saveAsTable("prod_catalog.gold.daily_order_metrics"))
```
### Step 4: Delta Table Maintenance
```sql
-- Compact small files (bin-packing)
OPTIMIZE prod_catalog.silver.orders;
-- Z-order for query performance on frequently filtered columns
OPTIMIZE prod_catalog.silver.orders ZORDER BY (order_date, region);
-- Or use Liquid Clustering (DBR 13.3+) — replaces partitioning + Z-order
ALTER TABLE prod_catalog.silver.orders CLUSTER BY (order_date, region);
OPTIMIZE prod_catalog.silver.orders;
-- Clean up old file versions (default: 7 days)
VACUUM prod_catalog.silver.orders RETAIN 168 HOURS;
-- Compute statistics for query optimizer
ANALYZE TABLE prod_catalog.silver.orders COMPUTE STATISTICS;
```
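The same maintenance can run from a scheduled notebook or job task. A minimal sketch that loops over the pipeline's tables (the table list is illustrative and should match your own layers):
```python
# Routine Delta maintenance for the tables created in this skill
tables = [
    "prod_catalog.bronze.raw_orders",
    "prod_catalog.silver.orders",
    "prod_catalog.gold.daily_order_metrics",
]

for table in tables:
    spark.sql(f"OPTIMIZE {table}")                           # compact small files
    spark.sql(f"VACUUM {table} RETAIN 168 HOURS")            # keep 7 days of history
    spark.sql(f"ANALYZE TABLE {table} COMPUTE STATISTICS")   # refresh optimizer statistics
```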
### Step 5: Delta Live Tables (Declarative Pipeline)
DLT manages orchestration, data quality, lineage, and error handling automatically.
```python
import dlt
from pyspark.sql.functions import col, current_timestamp
@dlt.table(
comment="Raw orders from Auto Loader",
table_properties={"quality": "bronze"},
)
def bronze_orders():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("s3://data-lake/raw/orders/")
.withColumn("_ingested_at", current_timestamp())
)
@dlt.table(comment="Cleansed orders")
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
def silver_orders():
return (
dlt.read_stream("bronze_orders")
.withColumn("amount", col("amount").cast("decimal(12,2)"))
.dropDuplicates(["order_id"])
)
@dlt.table(comment="Daily revenue metrics")
def gold_daily_revenue():
return (
dlt.read("silver_orders")
.groupBy("region", "order_date")
.agg({"amount": "sum", "order_id": "count"})
)
```
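The notebook above only declares the tables; it still needs to be registered as a DLT pipeline. One way to do that with the Databricks Python SDK is sketched below; the pipeline name, notebook path, and target schema are placeholders, not part of this skill's fixed setup:
```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import NotebookLibrary, PipelineLibrary

w = WorkspaceClient()

# Register the DLT notebook as a triggered (non-continuous) pipeline writing to Unity Catalog
pipeline = w.pipelines.create(
    name="orders-dlt",                # placeholder name
    catalog="prod_catalog",
    target="silver",                  # destination schema for the pipeline's tables
    libraries=[PipelineLibrary(notebook=NotebookLibrary(path="/Repos/team/pipelines/dlt_orders"))],
    continuous=False,
)
print(f"Created DLT pipeline: {pipeline.pipeline_id}")
```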
### Step 6: Schedule the Pipeline
```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    CronSchedule, JobCluster, NotebookTask, Task, TaskDependency,
)
from databricks.sdk.service.compute import ClusterSpec, AutoScale
w = WorkspaceClient()
job = w.jobs.create(
name="daily-orders-etl",
tasks=[
Task(task_key="bronze", job_cluster_key="etl",
notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/bronze")),
Task(task_key="silver", job_cluster_key="etl",
notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/silver"),
             depends_on=[TaskDependency(task_key="bronze")]),
Task(task_key="gold", job_cluster_key="etl",
notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/gold"),
             depends_on=[TaskDependency(task_key="silver")]),
],
job_clusters=[JobCluster(
job_cluster_key="etl",
new_cluster=ClusterSpec(
spark_version="14.3.x-scala2.12",
node_type_id="i3.xlarge",
autoscale=AutoScale(min_workers=1, max_workers=4),
),
)],
schedule=CronSchedule(quartz_cron_expression="0 0 6 * * ?", timezone_id="UTC"),
max_concurrent_runs=1,
)
print(f"Created job: {job.job_id}")
```
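To smoke-test the job before its first scheduled run, you can trigger a run immediately with the same client (a minimal sketch):
```python
# Trigger one run now and block until it finishes
run = w.jobs.run_now(job_id=job.job_id).result()
print(f"Run finished with state: {run.state.result_state}")
```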
## Output
- Bronze layer with raw data, Auto Loader schema evolution, and ingestion metadata
- Silver layer with cleansed, deduplicated, type-cast data via MERGE upserts
- Gold layer with business-ready aggregations
- Table maintenance schedule (OPTIMIZE, VACUUM, ANALYZE)
- Optional DLT pipeline with built-in data quality expectations
## Error Handling
| Error | Cause | Solution |
|-------|-------|----------|
| `AnalysisException: mergeSchema` | Source schema changed | Auto Loader handles this; for batch add `.option("mergeSchema", "true")` |
| `ConcurrentAppendException` | Multiple jobs writing same table | Use MERGE with retry logic or serialize writes via `max_concurrent_runs=1` |
| `Null primary key` | Bad source data | Add `@dlt.expect_or_drop` or `.filter(col("pk").isNotNull())` |
| `java.lang.OutOfMemoryError` | Driver collecting large results | Never call `.collect()` on large data; use `.write` to keep distributed |
| `VACUUM below retention` | Retention < 7 days | Set `delta.deletedFileRetentionDuration = '168 hours'` minimum |
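For the `ConcurrentAppendException` case, wrapping the Silver MERGE in a small retry loop is usually enough. A minimal sketch follows; detecting the conflict by inspecting the exception text is an assumption, since the exact exception class differs between Databricks runtimes and open-source Delta Lake:
```python
import time
from delta.tables import DeltaTable

def merge_with_retry(source_df, target_table, condition, max_attempts=3):
    """Retry a Delta MERGE that fails because of a concurrent writer."""
    for attempt in range(1, max_attempts + 1):
        try:
            (DeltaTable.forName(spark, target_table).alias("t")
                .merge(source_df.alias("s"), condition)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute())
            return
        except Exception as exc:
            # Conflict class names vary by runtime; matching on the message is an assumption
            if "Concurrent" not in str(exc) or attempt == max_attempts:
                raise
            time.sleep(2 ** attempt)  # back off, then retry against the newer table version

# Example: merge_with_retry(silver_df, "prod_catalog.silver.orders", "t.order_id = s.order_id")
```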
## Examples
### Quick Pipeline Validation
```sql
-- Verify row counts flow through medallion layers
SELECT 'bronze' AS layer, COUNT(*) AS rows FROM prod_catalog.bronze.raw_orders
UNION ALL SELECT 'silver', COUNT(*) FROM prod_catalog.silver.orders
UNION ALL SELECT 'gold', COUNT(*) FROM prod_catalog.gold.daily_order_metrics;
```
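A complementary Python check can confirm that the Silver dedup logic held; the uniqueness expectation on `order_id` comes from Step 2:
```python
# Fail fast if the Silver primary key is not unique after the MERGE
silver = spark.table("prod_catalog.silver.orders")
dupes = silver.groupBy("order_id").count().filter("count > 1").count()
assert dupes == 0, f"Found {dupes} duplicate order_id values in prod_catalog.silver.orders"
```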
## Resources
- [Auto Loader](https://docs.databricks.com/aws/en/ingestion/cloud-object-storage/auto-loader/)
- [Delta Lake MERGE INTO](https://docs.databricks.com/aws/en/delta/merge)
- [OPTIMIZE](https://docs.databricks.com/aws/en/sql/language-manual/delta-optimize)
- [Delta Live Tables](https://docs.databricks.com/aws/en/delta-live-tables/)
## Next Steps
For ML workflows, see `databricks-core-workflow-b`.