Use when building custom Spark data source connectors for external systems (databases, APIs, message queues), implementing batch/streaming readers/writers, or creating data source plugins for systems without native Spark support. Triggers - "build Spark data source", "create Spark connector", "implement Spark reader/writer", "connect Spark to [system]", "streaming data source"
Builds custom Apache Spark data source connectors for external systems in Python with batch and streaming support.
```
/plugin marketplace add https://www.claudepluginhub.com/api/plugins/databricks-solutions-databricks-ai-dev-kit/marketplace.json
/plugin install databricks-solutions-databricks-ai-dev-kit@cpd-databricks-solutions-databricks-ai-dev-kit
```
This skill inherits all available tools. When active, it can use any tool Claude has access to.
References:
- references/authentication-patterns.md
- references/error-handling.md
- references/partitioning-patterns.md
- references/production-patterns.md
- references/streaming-patterns.md
- references/testing-patterns.md
- references/type-conversion.md

Build custom Python data sources for Apache Spark 4.0+ to read from and write to external systems in batch and streaming modes.
Use when building Spark connectors for external systems that lack native support (databases, APIs, message queues).
Triggers: "build Spark data source", "create Spark connector", "implement Spark reader/writer", "connect Spark to [system]", "streaming data source"
You are an experienced Spark developer building custom Python data sources following the PySpark DataSource API. Follow these principles and patterns:
Each data source follows a flat, single-level inheritance structure:
- DataSource → DataSourceReader / DataSourceWriter
- DataSource → DataSourceStreamReader / DataSourceStreamWriter

SIMPLE over CLEVER - These are non-negotiable:
✅ REQUIRED:
- Flat, single-level inheritance (one shared base plus thin batch/stream subclasses)
- Validate required options in `__init__` and fail fast
- Import heavy libraries inside `read()`/`write()` so they load on the executors

❌ FORBIDDEN:
- Deep class hierarchies or clever metaprogramming
- Module-level imports of executor-only dependencies
- Running `python` directly instead of `poetry run`
```python
from dataclasses import dataclass

from pyspark.sql.datasource import (
    DataSource, DataSourceReader, DataSourceWriter,
    DataSourceStreamReader, DataSourceStreamWriter,
    InputPartition, WriterCommitMessage,
)


# Commit message returned by write(); carried back to the driver
@dataclass
class SimpleCommitMessage(WriterCommitMessage):
    partition_id: int
    count: int


# One unit of parallel work for readers
@dataclass
class YourPartition(InputPartition):
    id: int
    start: int
    end: int


# 1. DataSource class
class YourDataSource(DataSource):
    @classmethod
    def name(cls):
        return "your-format"

    def __init__(self, options):
        self.options = options

    def schema(self):
        # Infer from the external system, or return a fixed DDL string
        return "id INT, value STRING"

    def reader(self, schema):
        return YourBatchReader(self.options, schema)

    def streamReader(self, schema):
        return YourStreamReader(self.options, schema)

    def writer(self, schema, overwrite):
        return YourBatchWriter(self.options, schema)

    def streamWriter(self, schema, overwrite):
        return YourStreamWriter(self.options, schema)


# 2. Base Writer with shared logic
class YourWriter:
    def __init__(self, options, schema=None):
        # Validate required options up front, on the driver
        self.url = options.get("url")
        assert self.url, "url is required"
        self.batch_size = int(options.get("batch_size", "50"))
        self.schema = schema

    def write(self, iterator):
        # Import libraries here so they load during partition execution on executors
        import requests
        from pyspark import TaskContext

        context = TaskContext.get()
        partition_id = context.partitionId()

        msgs = []
        cnt = 0
        for row in iterator:
            cnt += 1
            msgs.append(row.asDict())
            if len(msgs) >= self.batch_size:
                self._send_batch(msgs)
                msgs = []
        if msgs:  # flush the final partial batch
            self._send_batch(msgs)
        return SimpleCommitMessage(partition_id=partition_id, count=cnt)

    def _send_batch(self, msgs):
        # Implement send logic, e.g. requests.post(self.url, json=msgs)
        pass


# 3. Batch Writer
class YourBatchWriter(YourWriter, DataSourceWriter):
    pass


# 4. Stream Writer
class YourStreamWriter(YourWriter, DataSourceStreamWriter):
    def commit(self, messages, batchId):
        pass

    def abort(self, messages, batchId):
        pass


# 5. Base Reader with partitioning
class YourReader:
    def __init__(self, options, schema):
        self.url = options.get("url")
        assert self.url, "url is required"
        self.schema = schema

    def partitions(self):
        # Return one InputPartition per parallel read task
        return [YourPartition(id=0, start=0, end=100)]

    def read(self, partition):
        # Import here so the dependency loads on the executors
        import requests

        response = requests.get(f"{self.url}?start={partition.start}")
        for item in response.json():
            yield tuple(item.values())


# 6. Batch Reader
class YourBatchReader(YourReader, DataSourceReader):
    pass


# 7. Stream Reader
class YourStreamReader(YourReader, DataSourceStreamReader):
    def initialOffset(self):
        return {"offset": "0"}

    def latestOffset(self):
        # Replace _get_latest with a real query for the newest available position
        return {"offset": str(self._get_latest())}

    def partitions(self, start, end):
        return [YourPartition(0, int(start["offset"]), int(end["offset"]))]

    def commit(self, end):
        pass
```
```bash
# Create project
poetry new your-datasource
cd your-datasource
poetry add pyspark
poetry add --group dev pytest pytest-spark ruff

# Development commands - CRITICAL: always use 'poetry run'
poetry run pytest              # Run tests
poetry run ruff check src/     # Lint
poetry run ruff format src/    # Format
poetry build                   # Build wheel
```
```python
# Register
from your_package import YourDataSource
spark.dataSource.register(YourDataSource)

# Batch read
df = spark.read.format("your-format").option("url", "...").load()

# Batch write
df.write.format("your-format").option("url", "...").save()

# Streaming read
df = spark.readStream.format("your-format").option("url", "...").load()

# Streaming write (checkpointLocation is required for streaming sinks)
query = (df.writeStream.format("your-format")
         .option("url", "...")
         .option("checkpointLocation", "/path/to/checkpoint")
         .start())
```
- **Partitioning Strategy**: Choose based on data source characteristics
- **Authentication**: Support multiple methods in priority order
- **Type Conversion**: Map between Spark and external types
- **Streaming Offsets**: Design for exactly-once semantics
- **Error Handling**: Implement retries and resilience
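As an illustration of a range-based partitioning strategy, a reader can split a known row count into fixed-size offset ranges, one per Spark task. This is a minimal sketch: in a real source `RangePartition` would subclass `pyspark.sql.datasource.InputPartition`, omitted here to keep the snippet dependency-free, and `total_rows` would come from a count query against the external system.

```python
from dataclasses import dataclass


@dataclass
class RangePartition:
    start: int  # inclusive offset
    end: int    # exclusive offset


def make_partitions(total_rows, rows_per_partition=10_000):
    """Split [0, total_rows) into contiguous ranges, one per Spark task."""
    return [
        RangePartition(start, min(start + rows_per_partition, total_rows))
        for start in range(0, total_rows, rows_per_partition)
    ]
```

Each range then supplies the `start`/`end` bounds the executor uses in its `read()` query, so tasks never overlap and the union covers every row exactly once.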
```python
import pytest
from unittest.mock import patch, Mock

from your_package import YourDataSource


@pytest.fixture
def spark():
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.master("local[2]").getOrCreate()
    spark.dataSource.register(YourDataSource)  # register before use
    return spark


def test_data_source_name():
    assert YourDataSource.name() == "your-format"


def test_writer_sends_data(spark):
    with patch("requests.post") as mock_post:
        mock_post.return_value = Mock(status_code=200)
        df = spark.createDataFrame([(1, "test")], ["id", "value"])
        df.write.format("your-format").option("url", "http://api").save()
        assert mock_post.called
```
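The batching loop in `write()` can also be unit-tested without a SparkSession or any network access by mirroring it in a plain class. This is a sketch: `MiniWriter` is a hypothetical stand-in for the writer skeleton above whose `_send_batch` records batches instead of sending them.

```python
class MiniWriter:
    """Mirrors YourWriter's batching loop; _send_batch records calls for assertions."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.sent = []

    def _send_batch(self, msgs):
        self.sent.append(list(msgs))

    def write(self, iterator):
        msgs, cnt = [], 0
        for row in iterator:
            cnt += 1
            msgs.append(row)
            if len(msgs) >= self.batch_size:
                self._send_batch(msgs)
                msgs = []
        if msgs:  # flush the final partial batch
            self._send_batch(msgs)
        return cnt
```

This isolates the logic most prone to off-by-one bugs (the final partial flush) from Spark and HTTP concerns entirely.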
Before implementing, ask: What partitioning strategy fits the source? Which authentication methods must be supported? How do external types map to Spark types? What offset design gives exactly-once streaming? Which errors need retries?
Never run the `python` command directly (always use `poetry run`).
Study these for real-world patterns:
- "Create a Spark data source for reading from MongoDB with sharding support"
- "Build a streaming connector for RabbitMQ with at-least-once delivery"
- "Implement a batch writer for Snowflake with staged uploads"
- "Write a data source for REST API with OAuth2 authentication and pagination"
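For instance, the pagination half of that last request usually reduces to draining pages until the API returns a short page. A dependency-free sketch, where `fetch_page` is a hypothetical callable standing in for the authenticated HTTP request:

```python
def read_all_pages(fetch_page, page_size=100):
    """Drain a paginated endpoint; fetch_page(offset, limit) returns a list of rows."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        if len(page) < page_size:  # a short page means we've reached the end
            break
        offset += page_size
```

In a data source, this loop would live inside `read()` with offsets bounded by the task's partition, so each executor drains only its own slice.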