databricks-python-sdk
Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs.
Best use case
databricks-python-sdk is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs.
Teams using databricks-python-sdk should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/databricks-python-sdk/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How databricks-python-sdk Compares
| Feature / Agent | databricks-python-sdk | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# Databricks Development Guide
This skill provides guidance for Databricks SDK, Databricks Connect, CLI, and REST API.
**SDK Documentation:** https://databricks-sdk-py.readthedocs.io/en/latest/
**GitHub Repository:** https://github.com/databricks/databricks-sdk-py
---
## Environment Setup
- Use existing virtual environment at `.venv` or use `uv` to create one
- For Spark operations: `uv pip install databricks-connect`
- For SDK operations: `uv pip install databricks-sdk`
- Databricks CLI version should be 0.278.0 or higher
## Configuration
- Default profile name: `DEFAULT`
- Config file: `~/.databrickscfg`
- Environment variables: `DATABRICKS_HOST`, `DATABRICKS_TOKEN`
---
## Databricks Connect (Spark Operations)
Use `databricks-connect` for running Spark code locally against a Databricks cluster.
```python
from databricks.connect import DatabricksSession
# Auto-detects 'DEFAULT' profile from ~/.databrickscfg
spark = DatabricksSession.builder.getOrCreate()
# With explicit profile
spark = DatabricksSession.builder.profile("MY_PROFILE").getOrCreate()
# Use spark as normal
df = spark.sql("SELECT * FROM catalog.schema.table")
df.show()
```
**IMPORTANT:** Do NOT set `.master("local[*]")` - this will cause issues with Databricks Connect.
---
## Direct REST API Access
For operations not yet in SDK or overly complex via SDK, use direct REST API:
```python
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Direct API call using authenticated client
response = w.api_client.do(
method="GET",
path="/api/2.0/clusters/list"
)
# POST with body
response = w.api_client.do(
method="POST",
path="/api/2.0/jobs/run-now",
body={"job_id": 123}
)
```
**When to use:** Prefer SDK methods when available. Use `api_client.do` for:
- New API endpoints not yet in SDK
- Complex operations where SDK abstraction is problematic
- Debugging/testing raw API responses
---
## Databricks CLI
```bash
# Check version (should be >= 0.278.0)
databricks --version
# Use specific profile
databricks --profile MY_PROFILE clusters list
# Common commands
databricks clusters list
databricks jobs list
databricks workspace ls /Users/me
```
---
## SDK Documentation Architecture
The SDK documentation follows a predictable URL pattern:
```
Base: https://databricks-sdk-py.readthedocs.io/en/latest/
Workspace APIs: /workspace/{category}/{service}.html
Account APIs: /account/{category}/{service}.html
Authentication: /authentication.html
DBUtils: /dbutils.html
```
### Workspace API Categories
| Category | Services |
|----------|----------|
| `compute` | clusters, cluster_policies, command_execution, instance_pools, libraries |
| `catalog` | catalogs, schemas, tables, volumes, functions, storage_credentials, external_locations |
| `jobs` | jobs |
| `sql` | warehouses, statement_execution, queries, alerts, dashboards |
| `serving` | serving_endpoints |
| `vectorsearch` | vector_search_indexes, vector_search_endpoints |
| `pipelines` | pipelines |
| `workspace` | repos, secrets, workspace, git_credentials |
| `files` | files, dbfs |
| `ml` | experiments, model_registry |
---
## Authentication
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html
### Environment Variables
```bash
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
DATABRICKS_TOKEN=dapi... # Personal Access Token
```
### Code Patterns
```python
# Auto-detect credentials from environment
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Explicit token auth
w = WorkspaceClient(
host="https://your-workspace.cloud.databricks.com",
token="dapi..."
)
# Azure Service Principal
w = WorkspaceClient(
host="https://adb-xxx.azuredatabricks.net",
azure_workspace_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.Databricks/workspaces/...",
azure_tenant_id="tenant-id",
azure_client_id="client-id",
azure_client_secret="secret"
)
# Use a named profile from ~/.databrickscfg
w = WorkspaceClient(profile="MY_PROFILE")
```
---
## Core API Reference
### Clusters API
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html
```python
# List all clusters
for cluster in w.clusters.list():
print(f"{cluster.cluster_name}: {cluster.state}")
# Get cluster details
cluster = w.clusters.get(cluster_id="0123-456789-abcdef")
# Create a cluster (returns Wait object)
wait = w.clusters.create(
cluster_name="my-cluster",
spark_version=w.clusters.select_spark_version(latest=True),
node_type_id=w.clusters.select_node_type(local_disk=True),
num_workers=2
)
cluster = wait.result() # Wait for cluster to be running
# Or use create_and_wait for blocking call
cluster = w.clusters.create_and_wait(
cluster_name="my-cluster",
spark_version="14.3.x-scala2.12",
node_type_id="i3.xlarge",
num_workers=2,
timeout=timedelta(minutes=30)
)
# Start/stop/delete
w.clusters.start(cluster_id="...").result()
w.clusters.stop(cluster_id="...")
w.clusters.delete(cluster_id="...")
```
### Jobs API
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html
```python
from databricks.sdk.service.jobs import Task, NotebookTask
# List jobs
for job in w.jobs.list():
print(f"{job.job_id}: {job.settings.name}")
# Create a job
created = w.jobs.create(
name="my-job",
tasks=[
Task(
task_key="main",
notebook_task=NotebookTask(notebook_path="/Users/me/notebook"),
existing_cluster_id="0123-456789-abcdef"
)
]
)
# Run a job now
run = w.jobs.run_now_and_wait(job_id=created.job_id)
print(f"Run completed: {run.state.result_state}")
# Get run output
output = w.jobs.get_run_output(run_id=run.run_id)
```
### SQL Statement Execution
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html
```python
# Execute SQL query
response = w.statement_execution.execute_statement(
warehouse_id="abc123",
statement="SELECT * FROM catalog.schema.table LIMIT 10",
wait_timeout="30s"
)
# Check status and get results
if response.status.state == StatementState.SUCCEEDED:
for row in response.result.data_array:
print(row)
# For large results, fetch chunks
chunk = w.statement_execution.get_statement_result_chunk_n(
statement_id=response.statement_id,
chunk_index=0
)
```
### SQL Warehouses
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html
```python
# List warehouses
for wh in w.warehouses.list():
print(f"{wh.name}: {wh.state}")
# Get warehouse
warehouse = w.warehouses.get(id="abc123")
# Create warehouse
created = w.warehouses.create_and_wait(
name="my-warehouse",
cluster_size="Small",
max_num_clusters=1,
auto_stop_mins=15
)
# Start/stop
w.warehouses.start(id="abc123").result()
w.warehouses.stop(id="abc123").result()
```
### Unity Catalog - Tables
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html
```python
# List tables in a schema
for table in w.tables.list(catalog_name="main", schema_name="default"):
print(f"{table.full_name}: {table.table_type}")
# Get table info
table = w.tables.get(full_name="main.default.my_table")
print(f"Columns: {[c.name for c in table.columns]}")
# Check if table exists
exists = w.tables.exists(full_name="main.default.my_table")
```
### Unity Catalog - Catalogs & Schemas
**Doc (Catalogs):** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html
**Doc (Schemas):** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html
```python
# List catalogs
for catalog in w.catalogs.list():
print(catalog.name)
# Create catalog
w.catalogs.create(name="my_catalog", comment="Description")
# List schemas
for schema in w.schemas.list(catalog_name="main"):
print(schema.name)
# Create schema
w.schemas.create(name="my_schema", catalog_name="main")
```
### Volumes
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html
```python
from databricks.sdk.service.catalog import VolumeType
# List volumes
for vol in w.volumes.list(catalog_name="main", schema_name="default"):
print(f"{vol.full_name}: {vol.volume_type}")
# Create managed volume
w.volumes.create(
catalog_name="main",
schema_name="default",
name="my_volume",
volume_type=VolumeType.MANAGED
)
# Read volume info
vol = w.volumes.read(name="main.default.my_volume")
```
### Files API
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html
```python
# Upload file to volume
w.files.upload(
file_path="/Volumes/main/default/my_volume/data.csv",
contents=open("local_file.csv", "rb")
)
# Download file
with w.files.download(file_path="/Volumes/main/default/my_volume/data.csv") as f:
content = f.read()
# List directory contents
for entry in w.files.list_directory_contents("/Volumes/main/default/my_volume/"):
print(f"{entry.name}: {entry.is_directory}")
# Upload/download with progress (parallel)
w.files.upload_from(
file_path="/Volumes/main/default/my_volume/large.parquet",
source_path="/local/path/large.parquet",
use_parallel=True
)
w.files.download_to(
file_path="/Volumes/main/default/my_volume/large.parquet",
destination="/local/output/",
use_parallel=True
)
```
### Serving Endpoints (Model Serving)
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html
```python
# List endpoints
for ep in w.serving_endpoints.list():
print(f"{ep.name}: {ep.state}")
# Get endpoint
endpoint = w.serving_endpoints.get(name="my-endpoint")
# Query endpoint
response = w.serving_endpoints.query(
name="my-endpoint",
inputs={"prompt": "Hello, world!"}
)
# For chat/completions endpoints
response = w.serving_endpoints.query(
name="my-chat-endpoint",
messages=[{"role": "user", "content": "Hello!"}]
)
# Get OpenAI-compatible client
openai_client = w.serving_endpoints.get_open_ai_client()
```
### Vector Search
**Doc (Indexes):** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html
**Doc (Endpoints):** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_endpoints.html
```python
# List vector search indexes
for idx in w.vector_search_indexes.list_indexes(endpoint_name="my-vs-endpoint"):
print(idx.name)
# Query index
results = w.vector_search_indexes.query_index(
index_name="main.default.my_index",
columns=["id", "text", "embedding"],
query_text="search query",
num_results=10
)
for doc in results.result.data_array:
print(doc)
```
### Pipelines (Delta Live Tables)
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html
```python
# List pipelines
for pipeline in w.pipelines.list_pipelines():
print(f"{pipeline.name}: {pipeline.state}")
# Get pipeline
pipeline = w.pipelines.get(pipeline_id="abc123")
# Start pipeline update
w.pipelines.start_update(pipeline_id="abc123")
# Stop pipeline
w.pipelines.stop_and_wait(pipeline_id="abc123")
```
### Secrets
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html
```python
# List secret scopes
for scope in w.secrets.list_scopes():
print(scope.name)
# Create scope
w.secrets.create_scope(scope="my-scope")
# Put secret
w.secrets.put_secret(scope="my-scope", key="api-key", string_value="secret123")
# Get secret (returns GetSecretResponse with value)
secret = w.secrets.get_secret(scope="my-scope", key="api-key")
# List secrets in scope (metadata only, not values)
for s in w.secrets.list_secrets(scope="my-scope"):
print(s.key)
```
### DBUtils
**Doc:** https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html
```python
# Access dbutils through WorkspaceClient
dbutils = w.dbutils
# File system operations
files = dbutils.fs.ls("/")
dbutils.fs.cp("dbfs:/source", "dbfs:/dest")
dbutils.fs.rm("dbfs:/path", recurse=True)
# Secrets (same as w.secrets but dbutils interface)
value = dbutils.secrets.get(scope="my-scope", key="my-key")
```
---
## Common Patterns
### CRITICAL: Async Applications (FastAPI, etc.)
**The Databricks SDK is fully synchronous.** All calls block the thread. In async applications (FastAPI, asyncio), you MUST wrap SDK calls with `asyncio.to_thread()` to avoid blocking the event loop.
```python
import asyncio
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# WRONG - blocks the event loop
async def get_clusters_bad():
return list(w.clusters.list()) # BLOCKS!
# CORRECT - runs in thread pool
async def get_clusters_good():
return await asyncio.to_thread(lambda: list(w.clusters.list()))
# CORRECT - for simple calls
async def get_cluster(cluster_id: str):
return await asyncio.to_thread(w.clusters.get, cluster_id)
# CORRECT - FastAPI endpoint
from fastapi import FastAPI
app = FastAPI()
@app.get("/clusters")
async def list_clusters():
clusters = await asyncio.to_thread(lambda: list(w.clusters.list()))
return [{"id": c.cluster_id, "name": c.cluster_name} for c in clusters]
@app.post("/query")
async def run_query(sql: str, warehouse_id: str):
# Wrap the blocking SDK call
response = await asyncio.to_thread(
w.statement_execution.execute_statement,
statement=sql,
warehouse_id=warehouse_id,
wait_timeout="30s"
)
return response.result.data_array
```
**Note:** `WorkspaceClient().config.host` is NOT a network call - it just reads config. No need to wrap property access.
---
### Wait for Long-Running Operations
```python
from datetime import timedelta
# Pattern 1: Use *_and_wait methods
cluster = w.clusters.create_and_wait(
cluster_name="test",
spark_version="14.3.x-scala2.12",
node_type_id="i3.xlarge",
num_workers=2,
timeout=timedelta(minutes=30)
)
# Pattern 2: Use Wait object
wait = w.clusters.create(...)
cluster = wait.result() # Blocks until ready
# Pattern 3: Manual polling with callback
def progress(cluster):
print(f"State: {cluster.state}")
cluster = w.clusters.wait_get_cluster_running(
cluster_id="...",
timeout=timedelta(minutes=30),
callback=progress
)
```
### Pagination
```python
# All list methods return iterators that handle pagination automatically
for job in w.jobs.list(): # Fetches all pages
print(job.settings.name)
# For manual control
from databricks.sdk.service.jobs import ListJobsRequest
response = w.jobs.list(limit=10)
for job in response:
print(job)
```
### Error Handling
```python
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceAlreadyExists
try:
cluster = w.clusters.get(cluster_id="invalid-id")
except NotFound:
print("Cluster not found")
except PermissionDenied:
print("Access denied")
```
---
## When Uncertain
If I'm unsure about a method, I should:
1. **Check the documentation URL pattern:**
- `https://databricks-sdk-py.readthedocs.io/en/latest/workspace/{category}/{service}.html`
2. **Common categories:**
- Clusters: `/workspace/compute/clusters.html`
- Jobs: `/workspace/jobs/jobs.html`
- Tables: `/workspace/catalog/tables.html`
- Warehouses: `/workspace/sql/warehouses.html`
- Serving: `/workspace/serving/serving_endpoints.html`
3. **Fetch and verify** before providing guidance on parameters or return types.
---
## Quick Reference Links
| API | Documentation URL |
|-----|-------------------|
| Authentication | https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html |
| Clusters | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html |
| Jobs | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html |
| SQL Warehouses | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html |
| Statement Execution | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html |
| Tables | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html |
| Catalogs | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html |
| Schemas | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html |
| Volumes | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html |
| Files | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html |
| Serving Endpoints | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html |
| Vector Search | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html |
| Pipelines | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html |
| Secrets | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html |
| DBUtils | https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html |
## Related Skills
- **[databricks-config](../databricks-config/SKILL.md)** - profile and authentication setup
- **[databricks-bundles](../databricks-bundles/SKILL.md)** - deploying resources via DABs
- **[databricks-jobs](../databricks-jobs/SKILL.md)** - job orchestration patterns
- **[databricks-unity-catalog](../databricks-unity-catalog/SKILL.md)** - catalog governance
- **[databricks-model-serving](../databricks-model-serving/SKILL.md)** - serving endpoint management
- **[databricks-vector-search](../databricks-vector-search/SKILL.md)** - vector index operations
- **[databricks-lakebase-provisioned](../databricks-lakebase-provisioned/SKILL.md)** - managed PostgreSQL via SDKRelated Skills
spark-python-data-source
Build custom Python data sources for Apache Spark using the PySpark DataSource API — batch and streaming readers/writers for external systems. Use this skill whenever someone wants to connect Spark to an external system (database, API, message queue, custom protocol), build a Spark connector or plugin in Python, implement a DataSourceReader or DataSourceWriter, pull data from or push data to a system via Spark, or work with the PySpark DataSource API in any way. Even if they just say "read from X in Spark" or "write DataFrame to Y" and there's no native connector, this skill applies.
databricks-zerobus-ingest
Build Zerobus Ingest clients for near real-time data ingestion into Databricks Delta tables via gRPC. Use when creating producers that write directly to Unity Catalog tables without a message bus, working with the Zerobus Ingest SDK in Python/Java/Go/TypeScript/Rust, generating Protobuf schemas from UC tables, or implementing stream-based ingestion with ACK handling and retry logic.
databricks-vector-search
Patterns for Databricks Vector Search: create endpoints and indexes, query with filters, manage embeddings. Use when building RAG applications, semantic search, or similarity matching. Covers both storage-optimized and standard endpoints.
databricks-unstructured-pdf-generation
Generate PDF documents from HTML and upload to Unity Catalog volumes. Use for creating test PDFs, demo documents, reports, or evaluation datasets.
databricks-unity-catalog
Unity Catalog system tables and volumes. Use when querying system tables (audit, lineage, billing) or working with volume file operations (upload, download, list files in /Volumes/).
databricks-synthetic-data-gen
Generate realistic synthetic data using Spark + Faker (strongly recommended). Supports serverless execution, multiple output formats (Parquet/JSON/CSV/Delta), and scales from thousands to millions of rows. For small datasets (<10K rows), can optionally generate locally and upload to volumes. Use when user mentions 'synthetic data', 'test data', 'generate data', 'demo dataset', 'Faker', or 'sample data'.
databricks-spark-structured-streaming
Comprehensive guide to Spark Structured Streaming for production workloads. Use when building streaming pipelines, working with Kafka ingestion, implementing Real-Time Mode (RTM), configuring triggers (processingTime, availableNow), handling stateful operations with watermarks, optimizing checkpoints, performing stream-stream or stream-static joins, writing to multiple sinks, or tuning streaming cost and performance.
databricks-spark-declarative-pipelines
Creates, configures, and updates Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles data ingestion with streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion patterns. Use when building data pipelines, working with Delta Live Tables, ingesting streaming data, implementing change data capture, or when the user mentions SDP, LDP, DLT, Lakeflow pipelines, streaming tables, or bronze/silver/gold medallion architectures.
databricks-model-serving
Deploy and query Databricks Model Serving endpoints. Use when (1) deploying MLflow models or AI agents to endpoints, (2) creating ChatAgent/ResponsesAgent agents, (3) integrating UC Functions or Vector Search tools, (4) querying deployed endpoints, (5) checking endpoint status. Covers classical ML models, custom pyfunc, and GenAI agents.
databricks-mlflow-evaluation
MLflow 3 GenAI agent evaluation. Use when writing mlflow.genai.evaluate() code, creating @scorer functions, using built-in scorers (Guidelines, Correctness, Safety, RetrievalGroundedness), building eval datasets from traces, setting up trace ingestion and production monitoring, aligning judges with MemAlign from domain expert feedback, or running optimize_prompts() with GEPA for automated prompt improvement.
databricks-metric-views
Unity Catalog metric views: define, create, query, and manage governed business metrics in YAML. Use when building standardized KPIs, revenue metrics, order analytics, or any reusable business metrics that need consistent definitions across teams and tools.
databricks-lakebase-provisioned
Patterns and best practices for Lakebase Provisioned (Databricks managed PostgreSQL) for OLTP workloads. Use when creating Lakebase instances, connecting applications or Databricks Apps to PostgreSQL, implementing reverse ETL via synced tables, storing agent or chat memory, or configuring OAuth authentication for Lakebase.