spark-python-data-source
Build custom Python data sources for Apache Spark using the PySpark DataSource API — batch and streaming readers/writers for external systems. Use this skill whenever someone wants to connect Spark to an external system (database, API, message queue, custom protocol), build a Spark connector or plugin in Python, implement a DataSourceReader or DataSourceWriter, pull data from or push data to a system via Spark, or work with the PySpark DataSource API in any way. Even if they just say "read from X in Spark" or "write DataFrame to Y" and there's no native connector, this skill applies.
Best use case
spark-python-data-source is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Build custom Python data sources for Apache Spark using the PySpark DataSource API — batch and streaming readers/writers for external systems. Use this skill whenever someone wants to connect Spark to an external system (database, API, message queue, custom protocol), build a Spark connector or plugin in Python, implement a DataSourceReader or DataSourceWriter, pull data from or push data to a system via Spark, or work with the PySpark DataSource API in any way. Even if they just say "read from X in Spark" or "write DataFrame to Y" and there's no native connector, this skill applies.
Teams using spark-python-data-source should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/spark-python-data-source/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How spark-python-data-source Compares
| Feature / Agent | spark-python-data-source | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Build custom Python data sources for Apache Spark using the PySpark DataSource API — batch and streaming readers/writers for external systems. Use this skill whenever someone wants to connect Spark to an external system (database, API, message queue, custom protocol), build a Spark connector or plugin in Python, implement a DataSourceReader or DataSourceWriter, pull data from or push data to a system via Spark, or work with the PySpark DataSource API in any way. Even if they just say "read from X in Spark" or "write DataFrame to Y" and there's no native connector, this skill applies.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# spark-python-data-source
Build custom Python data sources for Apache Spark 4.0+ to read from and write to external systems in batch and streaming modes.
## Instructions
You are an experienced Spark developer building custom Python data sources using the PySpark DataSource API. Follow these principles and patterns.
### Core Architecture
Each data source follows a flat, single-level inheritance structure:
1. **DataSource class** — entry point that returns readers/writers
2. **Base Reader/Writer classes** — shared logic for options and data processing
3. **Batch classes** — inherit from base + `DataSourceReader`/`DataSourceWriter`
4. **Stream classes** — inherit from base + `DataSourceStreamReader`/`DataSourceStreamWriter`
See [implementation-template.md](references/implementation-template.md) for the full annotated skeleton covering all four modes (batch read/write, stream read/write).
### Spark-Specific Design Constraints
These are specific to the PySpark DataSource API and its driver/executor architecture — general Python best practices (clean code, minimal dependencies, no premature abstraction) still apply but aren't repeated here.
**Flat single-level inheritance only.** PySpark serializes reader/writer instances to ship them to executors. Complex inheritance hierarchies and abstract base classes break serialization and make cross-process debugging painful. Use one shared base class mixed with the PySpark interface (e.g., `class YourBatchWriter(YourWriter, DataSourceWriter)`).
**Import third-party libraries inside executor methods.** The `read()` and `write()` methods run on remote executor processes that don't share the driver's Python environment. Top-level imports from the driver won't be available on executors — always import libraries like `requests` or database drivers inside the methods that run on workers.
**Minimize dependencies.** Every package you add must be installed on all executor nodes in the cluster, not just the driver. Prefer the standard library; when external packages are needed, keep them few and well-known.
**No async/await** unless the external system's SDK is async-only. The PySpark DataSource API is synchronous, so async adds complexity with no benefit.
### Project Setup
Create a Python project using a packaging tool such as `uv`, `poetry`, or `hatch`. Examples use `uv` (substitute your tool of choice):
```bash
uv init your-datasource
cd your-datasource
uv add pyspark pytest pytest-spark
```
```
your-datasource/
├── pyproject.toml
├── src/
│ └── your_datasource/
│ ├── __init__.py
│ └── datasource.py
└── tests/
├── conftest.py
└── test_datasource.py
```
Run all commands through the packaging tool so they execute within the correct virtual environment:
```bash
uv run pytest # Run tests
uv run ruff check src/ # Lint
uv run ruff format src/ # Format
uv build # Build wheel
```
### Key Implementation Decisions
**Partitioning Strategy** — choose based on data source characteristics:
- Time-based: for APIs with temporal data
- Token-range: for distributed databases
- ID-range: for paginated APIs
- See [partitioning-patterns.md](references/partitioning-patterns.md) for implementations of each strategy
**Authentication** — support multiple methods in priority order:
- Databricks Unity Catalog credentials
- Cloud default credentials (managed identity)
- Explicit credentials (service principal, API key, username/password)
- See [authentication-patterns.md](references/authentication-patterns.md) for patterns with fallback chains
**Type Conversion** — map between Spark and external types:
- Handle nulls, timestamps, UUIDs, collections
- See [type-conversion.md](references/type-conversion.md) for bidirectional mapping tables and helpers
**Streaming Offsets** — design for exactly-once semantics:
- JSON-serializable offset class
- Non-overlapping partition boundaries
- See [streaming-patterns.md](references/streaming-patterns.md) for offset tracking and watermark patterns
**Error Handling** — implement retries and resilience:
- Exponential backoff for transient failures (network, rate limits)
- Circuit breakers for cascading failures
- See [error-handling.md](references/error-handling.md) for retry decorators and failure classification
### Testing
```python
import pytest
from unittest.mock import patch, Mock
@pytest.fixture
def spark():
from pyspark.sql import SparkSession
return SparkSession.builder.master("local[2]").getOrCreate()
def test_data_source_name():
assert YourDataSource.name() == "your-format"
def test_writer_sends_data(spark):
with patch('requests.post') as mock_post:
mock_post.return_value = Mock(status_code=200)
df = spark.createDataFrame([(1, "test")], ["id", "value"])
df.write.format("your-format").option("url", "http://api").save()
assert mock_post.called
```
See [testing-patterns.md](references/testing-patterns.md) for unit/integration test patterns, fixtures, and running tests.
### Reference Implementations
Study these for real-world patterns:
- [cyber-spark-data-connectors](https://github.com/alexott/cyber-spark-data-connectors) — Sentinel, Splunk, REST
- [spark-cassandra-data-source](https://github.com/alexott/spark-cassandra-data-source) — Token-range partitioning
- [pyspark-hubspot](https://github.com/dgomez04/pyspark-hubspot) — REST API pagination
- [pyspark-mqtt](https://github.com/databricks-industry-solutions/python-data-sources/tree/main/mqtt) — Streaming with TLS
## Example Prompts
```
Create a Spark data source for reading from MongoDB with sharding support
Build a streaming connector for RabbitMQ with at-least-once delivery
Implement a batch writer for Snowflake with staged uploads
Write a data source for REST API with OAuth2 authentication and pagination
```
## Related
- databricks-testing: Test data sources on Databricks clusters
- databricks-spark-declarative-pipelines: Use custom sources in DLT pipelines
- python-dev: Python development best practices
## References
- [implementation-template.md](references/implementation-template.md) — Full annotated skeleton; read when starting a new data source
- [partitioning-patterns.md](references/partitioning-patterns.md) — Read when the source supports parallel reads and you need to split work across executors
- [authentication-patterns.md](references/authentication-patterns.md) — Read when the external system requires credentials or tokens
- [type-conversion.md](references/type-conversion.md) — Read when mapping between Spark types and the external system's type system
- [streaming-patterns.md](references/streaming-patterns.md) — Read when implementing `DataSourceStreamReader` or `DataSourceStreamWriter`
- [error-handling.md](references/error-handling.md) — Read when adding retry logic or handling transient failures
- [testing-patterns.md](references/testing-patterns.md) — Read when writing tests; covers unit, integration, and performance testing
- [production-patterns.md](references/production-patterns.md) — Read when hardening for production: observability, security, input validation
- [Official Databricks Documentation](https://docs.databricks.com/aws/en/pyspark/datasources)
- [Apache Spark Python DataSource Tutorial](https://spark.apache.org/docs/latest/api/python/tutorial/sql/python_data_source.html)
- [awesome-python-datasources](https://github.com/allisonwang-db/awesome-python-datasources) — Directory of community implementationsRelated Skills
databricks-zerobus-ingest
Build Zerobus Ingest clients for near real-time data ingestion into Databricks Delta tables via gRPC. Use when creating producers that write directly to Unity Catalog tables without a message bus, working with the Zerobus Ingest SDK in Python/Java/Go/TypeScript/Rust, generating Protobuf schemas from UC tables, or implementing stream-based ingestion with ACK handling and retry logic.
databricks-vector-search
Patterns for Databricks Vector Search: create endpoints and indexes, query with filters, manage embeddings. Use when building RAG applications, semantic search, or similarity matching. Covers both storage-optimized and standard endpoints.
databricks-unstructured-pdf-generation
Generate PDF documents from HTML and upload to Unity Catalog volumes. Use for creating test PDFs, demo documents, reports, or evaluation datasets.
databricks-unity-catalog
Unity Catalog system tables and volumes. Use when querying system tables (audit, lineage, billing) or working with volume file operations (upload, download, list files in /Volumes/).
databricks-synthetic-data-gen
Generate realistic synthetic data using Spark + Faker (strongly recommended). Supports serverless execution, multiple output formats (Parquet/JSON/CSV/Delta), and scales from thousands to millions of rows. For small datasets (<10K rows), can optionally generate locally and upload to volumes. Use when user mentions 'synthetic data', 'test data', 'generate data', 'demo dataset', 'Faker', or 'sample data'.
databricks-spark-structured-streaming
Comprehensive guide to Spark Structured Streaming for production workloads. Use when building streaming pipelines, working with Kafka ingestion, implementing Real-Time Mode (RTM), configuring triggers (processingTime, availableNow), handling stateful operations with watermarks, optimizing checkpoints, performing stream-stream or stream-static joins, writing to multiple sinks, or tuning streaming cost and performance.
databricks-spark-declarative-pipelines
Creates, configures, and updates Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles data ingestion with streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion patterns. Use when building data pipelines, working with Delta Live Tables, ingesting streaming data, implementing change data capture, or when the user mentions SDP, LDP, DLT, Lakeflow pipelines, streaming tables, or bronze/silver/gold medallion architectures.
databricks-python-sdk
Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs.
databricks-model-serving
Deploy and query Databricks Model Serving endpoints. Use when (1) deploying MLflow models or AI agents to endpoints, (2) creating ChatAgent/ResponsesAgent agents, (3) integrating UC Functions or Vector Search tools, (4) querying deployed endpoints, (5) checking endpoint status. Covers classical ML models, custom pyfunc, and GenAI agents.
databricks-mlflow-evaluation
MLflow 3 GenAI agent evaluation. Use when writing mlflow.genai.evaluate() code, creating @scorer functions, using built-in scorers (Guidelines, Correctness, Safety, RetrievalGroundedness), building eval datasets from traces, setting up trace ingestion and production monitoring, aligning judges with MemAlign from domain expert feedback, or running optimize_prompts() with GEPA for automated prompt improvement.
databricks-metric-views
Unity Catalog metric views: define, create, query, and manage governed business metrics in YAML. Use when building standardized KPIs, revenue metrics, order analytics, or any reusable business metrics that need consistent definitions across teams and tools.
databricks-lakebase-provisioned
Patterns and best practices for Lakebase Provisioned (Databricks managed PostgreSQL) for OLTP workloads. Use when creating Lakebase instances, connecting applications or Databricks Apps to PostgreSQL, implementing reverse ETL via synced tables, storing agent or chat memory, or configuring OAuth authentication for Lakebase.