Creates, configures, and updates Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion patterns. Use when building data pipelines, working with Delta Live Tables, ingesting streaming data, implementing change data capture, or when the user mentions SDP, LDP, DLT, Lakeflow pipelines, streaming tables, or bronze/silver/gold medallion architectures.
Creates and manages Databricks Lakeflow pipelines for streaming data, medallion architectures, and automated CDC workflows.
IMPORTANT: If this is a new pipeline (one does not already exist), see Quick Start. Use only the language the user has specified (Python or SQL). Use Databricks Asset Bundles for new projects.
Copy this checklist and verify each item:
- [ ] Language selected: Python or SQL
- [ ] Compute type decided: serverless or classic compute
- [ ] Decide on multiple catalogs or schemas vs. all in one default schema
- [ ] Consider what should be parameterized at the pipeline level to make deployment easy.
- [ ] Consider [Multi-Schema Patterns](#multi-schema-patterns) below, ask if unclear on best choices.
- [ ] Consider [Modern Defaults](#modern-defaults) below, ask if unclear on best choices.
## Quick Start: Initialize New Pipeline Project
**RECOMMENDED**: Use `databricks pipelines init` to create production-ready Asset Bundle projects with multi-environment support.
### When to Use Bundle Initialization
Use bundle initialization for **new pipeline projects** to get a professional structure from the start.
Use manual workflow for:
- Quick prototyping without multi-environment needs
- Existing manual projects you want to continue
- Learning/experimentation
### Step 1: Initialize Project
I will automatically run this command when you request a new pipeline:
```bash
databricks pipelines init
```

Interactive Prompts:

- Project name (e.g., `customer_orders_pipeline`)
- Initial catalog (e.g., `main`, `prod_catalog`)
- Personal schema per user: `yes` for dev (each user gets their own schema), `no` for prod

Generated Structure:

```
my_pipeline/
├── databricks.yml              # Multi-environment config (dev/prod)
├── resources/
│   └── *_etl.pipeline.yml      # Pipeline resource definition
└── src/
    └── *_etl/
        ├── explorations/       # Exploratory code in .ipynb
        └── transformations/    # Your .sql or .py files here
```
Replace the example code created by the init process with custom transformation files in `src/*_etl/transformations/`, based on the provided requirements and the best-practice guidance in this skill.
For Python pipelines using `cloudFiles`, ask the user where to store Auto Loader schema metadata. Recommend:

`/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas`
```bash
# Deploy to workspace (dev by default)
databricks bundle deploy

# Run pipeline
databricks bundle run my_pipeline_etl

# Deploy to production
databricks bundle deploy --target prod
```
| Concept | Details |
|---|---|
| Names | SDP = Spark Declarative Pipelines = LDP = Lakeflow Declarative Pipelines = Lakeflow Pipelines (all interchangeable) |
| Python Import | `from pyspark import pipelines as dp` |
| Primary Decorators | `@dp.table()`, `@dp.materialized_view()`, `@dp.temporary_view()` |
| Temporary Views | `@dp.temporary_view()` creates in-pipeline temporary views (no catalog/schema, no `cluster_by`). Useful for intermediate logic before AUTO CDC or when a view needs multiple references without persistence. |
| Replaces | Delta Live Tables (DLT) with `import dlt` |
| Based On | Apache Spark 4.1+ (Databricks' modern data pipeline framework) |
| Docs | https://docs.databricks.com/aws/en/ldp/developer/python-dev |
Ingestion patterns: Use 1-ingestion-patterns.md when planning how to get new data into your Lakeflow pipeline. It covers file formats, batch/streaming options, and tips for incremental and full loads. (Keywords: Auto Loader, Kafka, Event Hub, Kinesis, file formats)
Streaming pipeline patterns: See 2-streaming-patterns.md for designing pipelines with streaming data sources, change data detection, triggers, and windowing. (Keywords: deduplication, windowing, stateful operations, joins)
SCD query patterns: See 3-scd-query-patterns.md for querying Slowly Changing Dimensions Type 2 history tables, including current state queries, point-in-time analysis, temporal joins, and change tracking. (Keywords: SCD Type 2 history tables, temporal joins, querying historical data)
Performance tuning: Use 4-performance-tuning.md for optimizing pipelines with Liquid Clustering, state management, and best practices for high-performance streaming workloads. (Keywords: Liquid Clustering, optimization, state management)
Python API reference: See 5-python-api.md for the modern pyspark.pipelines (dp) API reference and migration from legacy dlt API patterns. (Keywords: dp API, dlt API comparison)
DLT migration: Use 6-dlt-migration.md when migrating existing Delta Live Tables (DLT) pipelines to Spark Declarative Pipelines (SDP). (Keywords: migrating DLT pipelines to SDP)
Advanced configuration: See 7-advanced-configuration.md for advanced pipeline settings including development mode, continuous execution, notifications, Python dependencies, and custom cluster configurations. (Keywords: extra_settings parameter reference, examples)
Project initialization: Use 8-project-initialization.md for setting up new pipeline projects with databricks pipelines init, Asset Bundles, multi-environment deployments, and language detection logic. (Keywords: databricks pipelines init, Asset Bundles, language detection, migration guides)
AUTO CDC patterns: Use 9-auto_cdc.md for implementing Change Data Capture with AUTO CDC, including Slowly Changing Dimensions (SCD Type 1 and Type 2) for tracking changes and deduplication. (Keywords: AUTO CDC, Slowly Changing Dimension, SCD, SCD Type 1, SCD Type 2, change data capture, deduplication)
1. Determine the task type:
   - Setting up a new project? → Read 8-project-initialization.md first
   - Creating a new pipeline? → Read 1-ingestion-patterns.md
   - Creating a streaming table? → Read 2-streaming-patterns.md
   - Querying SCD history tables? → Read 3-scd-query-patterns.md
   - Implementing AUTO CDC or SCD? → Read 9-auto_cdc.md
   - Performance issues? → Read 4-performance-tuning.md
   - Using the Python API? → Read 5-python-api.md
   - Migrating from DLT? → Read 6-dlt-migration.md
   - Advanced configuration? → Read 7-advanced-configuration.md
   - Validating? → Read validation-checklist.md
2. Follow the instructions in the relevant guide.
3. Repeat for the next task type.
- Use the `pyspark.pipelines` API

Medallion layers:

- **Bronze Layer (Raw)**: adds ingestion metadata columns (`_ingested_at`, `_source_file`)
- **Silver Layer (Validated)**
- **Gold Layer (Business-Ready)**
Typical Flow (can vary):

- **Bronze**: `read_files()` or `spark.readStream.format("cloudFiles")` → streaming table
- **Silver**: read bronze → filter/clean/validate → streaming table
- **Gold**: read silver → aggregate/denormalize → `auto_cdc` or materialized view
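To make the bronze → silver → gold flow concrete without a workspace, here is a plain-Python analogy over lists of dicts. It is not SDP code: each function stands in for one table definition, and the record fields (`order_id`, `order_date`, `amount`) and the file name are hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone


def bronze(raw_records, source_file):
    """Bronze: keep every record as-is and add ingestion metadata columns."""
    ingested_at = datetime.now(timezone.utc)
    return [
        {**r, "_ingested_at": ingested_at, "_source_file": source_file}
        for r in raw_records
    ]


def silver(bronze_rows):
    """Silver: drop rows that fail basic validation (here, a missing order_id)."""
    return [r for r in bronze_rows if r.get("order_id") is not None]


def gold(silver_rows):
    """Gold: aggregate to a business-ready shape (order count per date)."""
    counts = Counter(r["order_date"] for r in silver_rows)
    return [{"order_date": d, "order_count": n} for d, n in sorted(counts.items())]


raw = [
    {"order_id": "o1", "order_date": "2024-01-01", "amount": 10.0},
    {"order_id": None, "order_date": "2024-01-01", "amount": 5.0},
    {"order_id": "o2", "order_date": "2024-01-02", "amount": 7.5},
    {"order_id": "o3", "order_date": "2024-01-01", "amount": 3.0},
]
summary = gold(silver(bronze(raw, "orders.json")))
print(summary)
# → [{'order_date': '2024-01-01', 'order_count': 2}, {'order_date': '2024-01-02', 'order_count': 1}]
```

Each layer only reads the layer before it, which is the same dependency shape SDP infers from `spark.read.table()` / `spark.readStream.table()` calls.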
For medallion architecture (bronze/silver/gold), two file layouts work:

- **Flat, prefixed files**: `bronze_*.sql`, `silver_*.sql`, `gold_*.sql`
- **Subdirectories per layer**: `bronze/orders.sql`, `silver/cleaned.sql`, `gold/summary.sql`

Both work with the `transformations/**` glob pattern. Choose based on preference.
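As a rough check that both layouts are picked up by `transformations/**`, Python's `fnmatch` can stand in for the bundle's glob matching. This is an approximation, not the bundle's own matcher: in `fnmatch`, `*` also matches `/`, which is why `**` behaves like a recursive wildcard here. The flat file names are hypothetical instances of `bronze_*.sql` and friends.

```python
from fnmatch import fnmatchcase

PATTERN = "transformations/**"

flat_layout = [
    "transformations/bronze_orders.sql",
    "transformations/silver_orders.sql",
    "transformations/gold_summary.sql",
]
nested_layout = [
    "transformations/bronze/orders.sql",
    "transformations/silver/cleaned.sql",
    "transformations/gold/summary.sql",
]


def matched(paths, pattern=PATTERN):
    """Return the paths that the include pattern would pick up."""
    return [p for p in paths if fnmatchcase(p, pattern)]


assert matched(flat_layout) == flat_layout
assert matched(nested_layout) == nested_layout
# Files outside transformations/ (e.g. exploration notebooks) are not included.
assert matched(["explorations/scratch.ipynb"]) == []
```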
See 8-project-initialization.md for complete details on bundle initialization, migration, and troubleshooting.
Create `.sql` or `.py` files in a local folder:

```
my_pipeline/
├── bronze/
│   ├── ingest_orders.sql    # SQL (default for most cases)
│   └── ingest_events.py     # Python (for complex logic)
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql
```
SQL Example (`bronze/ingest_orders.sql`):

```sql
-- STREAM read_files(...) makes this an incremental (Auto Loader) read;
-- plain read_files(...) is a batch query and cannot feed a streaming table.
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
```
Python Example (`bronze/ingest_events.py`):

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

# Get schema location from pipeline configuration
# (`spark` is provided by the pipeline runtime)
schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )
```
IMPORTANT for Python Pipelines: When using spark.readStream.format("cloudFiles") for cloud storage ingestion, with schema inference (no schema specified), you must specify a schema location.
Always ask the user where to store Auto Loader schema metadata. Recommend:

`/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas`

Example: `/Volumes/my_catalog/pipeline_metadata/orders_pipeline_metadata/schemas`

Never use the source data volume; this causes permission conflicts. The schema location should be configured in the pipeline settings and accessed via `spark.conf.get("schema_location_base")`.
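The naming convention and the "not the source volume" rule can be captured in a small helper. This is a sketch assuming the recommended `/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas` layout; `schema_location_base`, `volume_root`, and `check_schema_location` are hypothetical conveniences, not part of any Databricks API.

```python
from pathlib import PurePosixPath


def schema_location_base(catalog: str, schema: str, pipeline_name: str) -> str:
    """Build the recommended Auto Loader schema-metadata base path."""
    return f"/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas"


def volume_root(path: str) -> PurePosixPath:
    """Return /Volumes/<catalog>/<schema>/<volume> for a path under /Volumes."""
    parts = PurePosixPath(path).parts  # ('/', 'Volumes', catalog, schema, volume, ...)
    if len(parts) < 5 or parts[1] != "Volumes":
        raise ValueError(f"not a Unity Catalog volume path: {path}")
    return PurePosixPath(*parts[:5])


def check_schema_location(schema_base: str, source_path: str) -> None:
    """Reject a schema location that lives in the same volume as the source data."""
    if volume_root(schema_base) == volume_root(source_path):
        raise ValueError("schema location must not share the source data volume")


base = schema_location_base("my_catalog", "pipeline_metadata", "orders_pipeline")
check_schema_location(base, "/Volumes/catalog/schema/raw/events/")
# Per-table location, as passed to cloudFiles.schemaLocation:
print(f"{base}/bronze_events")
```

The resulting `base` value is what you would store in the pipeline configuration as `schema_location_base`.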
Language Selection:
CRITICAL RULE: If the user explicitly mentions "Python" in their request (e.g., "Python Spark Declarative Pipeline", "Python SDP", "use Python"), ALWAYS use Python without asking. The same applies to SQL - if they say "SQL pipeline", use SQL.
See 8-project-initialization.md for detailed language detection logic.
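A minimal sketch of the critical rule, assuming a simple whole-word keyword match; the real detection logic lives in 8-project-initialization.md and may differ. `detect_language` is a hypothetical helper: it returns `None` when the request names neither language (or both), meaning "ask the user".

```python
import re
from typing import Optional


def detect_language(request: str) -> Optional[str]:
    """Return 'python' or 'sql' if the request names exactly one of them, else None."""
    text = request.lower()
    mentions_python = re.search(r"\bpython\b", text) is not None
    mentions_sql = re.search(r"\bsql\b", text) is not None
    if mentions_python and not mentions_sql:
        return "python"
    if mentions_sql and not mentions_python:
        return "sql"
    return None  # neither or both mentioned: ask the user
```

For example, "Build a Python SDP for orders" resolves to Python without a follow-up question, while "Build an orders pipeline" falls through to asking.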
Use Asset Bundles and the pipelines CLI. See Quick Start and 8-project-initialization.md for complete details.
For rapid prototyping, experimentation, or when you prefer direct control without Asset Bundles, use the manual workflow with MCP tools.
Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is create_or_update_pipeline which handles the entire lifecycle.
IMPORTANT: Always create serverless pipelines (the default). Only use classic clusters if the user explicitly asks for classic, Pro, or Advanced compute, or requires the R language, Spark RDD APIs, or JAR libraries.
See 10-mcp-approach.md for detailed guide.
- `databricks pipelines init` for new projects (creates an Asset Bundle)
- Either `bronze_*.sql`, `silver_*.sql`, `gold_*.sql` in `transformations/`, or subdirectories `transformations/bronze/`, `transformations/silver/`, `transformations/gold/`; both work with the `transformations/**` glob pattern, so choose based on team preference
- `.sql`/`.py` files, not notebooks

Default: Single target schema per pipeline. Each pipeline has one target catalog and schema where all tables are written.
Use one schema with table name prefixes to distinguish layers:
```python
# All tables write to the default catalog.schema, with layer prefixes
# (decorators shown without their function bodies):
@dp.table(name="bronze_orders")  # → catalog.schema.bronze_orders
@dp.table(name="silver_orders")  # → catalog.schema.silver_orders
@dp.table(name="gold_summary")   # → catalog.schema.gold_summary
```
Use variables to specify a separate catalog and/or schema for different layers.
Below are Python SDP examples that source variables from pipeline configs via spark.conf.get, and use the default catalog/schema for bronze.
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Pull variables from pipeline configuration parameters
silver_schema = spark.conf.get("silver_schema")    # e.g., "silver"
gold_schema = spark.conf.get("gold_schema")        # e.g., "gold"
landing_schema = spark.conf.get("landing_schema")  # e.g., "landing"

# Bronze → uses default catalog/schema (set to bronze in pipeline settings)
@dp.table(name="orders_bronze")
def orders_bronze():
    # Read from another schema in the same default catalog
    return spark.readStream.table(f"{landing_schema}.orders_raw")

# Silver → same catalog, schema from parameter
@dp.table(name=f"{silver_schema}.orders_clean")
def orders_clean():
    return (spark.read.table("orders_bronze")  # unqualified = default catalog/schema
            .filter(col("order_id").isNotNull()))

# Gold → same catalog, schema from parameter
@dp.materialized_view(name=f"{gold_schema}.orders_by_date")
def orders_by_date():
    return (spark.read.table(f"{silver_schema}.orders_clean")
            .groupBy("order_date")
            .count().withColumnRenamed("count", "order_count"))
```
To also vary the catalog per layer:

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Pull variables from pipeline configuration parameters
silver_catalog = spark.conf.get("silver_catalog")    # e.g., "my_catalog"
silver_schema = spark.conf.get("silver_schema")      # e.g., "silver"
gold_catalog = spark.conf.get("gold_catalog")        # e.g., "my_catalog"
gold_schema = spark.conf.get("gold_schema")          # e.g., "gold"
landing_catalog = spark.conf.get("landing_catalog")  # catalog of the source (landing) tables
landing_schema = spark.conf.get("landing_schema")

# Bronze → uses default catalog/schema (set to bronze)
@dp.table(name="orders_bronze")
def orders_bronze():
    # Source lives in a specified catalog/schema
    return spark.readStream.table(f"{landing_catalog}.{landing_schema}.orders_raw")

# Silver → custom catalog + schema via parameters
@dp.table(name=f"{silver_catalog}.{silver_schema}.orders_clean")
def orders_clean():
    # Read bronze by its unqualified name (defaults), or fully qualify if preferred
    return (spark.read.table("orders_bronze")
            .filter(col("order_id").isNotNull()))

# Gold → custom catalog + schema via parameters
@dp.materialized_view(name=f"{gold_catalog}.{gold_schema}.orders_by_date")
def orders_by_date():
    return (spark.read.table(f"{silver_catalog}.{silver_schema}.orders_clean")
            .groupBy("order_date")
            .count().withColumnRenamed("count", "order_count"))
```
Note: The `@dp.table()` decorator does not currently support separate `schema=` or `catalog=` parameters. The `name` parameter is a single string containing `catalog.schema.table_name`; you can omit the catalog and/or schema to use the pipeline's configured default target catalog and schema.
Modern SDP Best Practice:

- Use `spark.read.table()` for batch reads
- Use `spark.readStream.table()` for streaming reads
- Avoid `dp.read()` or `dp.read_stream()` (old syntax, no longer documented)
- Avoid `dlt.read()` or `dlt.read_stream()` (legacy DLT API)

**Key Point:** SDP automatically tracks table dependencies from standard Spark DataFrame operations. No special read APIs are needed.
SDP supports three levels of table name qualification:
| Level | Syntax | When to Use |
|---|---|---|
| Unqualified | spark.read.table("my_table") | Reading tables within the same pipeline's target catalog/schema (recommended) |
| Partially-qualified | spark.read.table("other_schema.my_table") | Reading from different schema in same catalog |
| Fully-qualified | spark.read.table("other_catalog.other_schema.my_table") | Reading from external catalogs/schemas |
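The three levels reduce to one rule: missing name parts are filled in from the pipeline's default catalog and schema. The sketch below models that rule in plain Python; `resolve_table_name` is a hypothetical helper, and it ignores quoted (backticked) identifiers that contain dots.

```python
def resolve_table_name(name: str, default_catalog: str, default_schema: str) -> str:
    """Expand a 1-, 2-, or 3-part table name to catalog.schema.table."""
    parts = name.split(".")
    if len(parts) == 1:  # unqualified: pipeline's target catalog and schema
        return f"{default_catalog}.{default_schema}.{parts[0]}"
    if len(parts) == 2:  # partially qualified: same catalog, other schema
        return f"{default_catalog}.{parts[0]}.{parts[1]}"
    if len(parts) == 3:  # fully qualified: used as-is
        return name
    raise ValueError(f"expected at most 3 name parts, got {name!r}")
```

The same unqualified name resolves differently per deployment, which is what makes it portable: `resolve_table_name("bronze_raw", "dev_catalog", "dev_schema")` gives `dev_catalog.dev_schema.bronze_raw`, while the prod target gives `prod_catalog.prod_schema.bronze_raw`.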
Best practice for tables within the same pipeline. SDP resolves unqualified names to the pipeline's configured target catalog and schema. This makes code portable across environments (dev/prod).
```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.table(name="silver_clean")
def silver_clean():
    # Reads from pipeline's target catalog/schema (e.g., dev_catalog.dev_schema.bronze_raw)
    return (
        spark.read.table("bronze_raw")
        .filter(F.col("valid"))  # keep rows where valid is true
    )

@dp.table(name="silver_events")
def silver_events():
    # Streaming read from same pipeline's bronze_events table
    return (
        spark.readStream.table("bronze_events")
        .withColumn("processed_at", F.current_timestamp())
    )
```
Use spark.conf.get() to parameterize external catalog/schema references. Define parameters in pipeline configuration, then reference them at the module level.
```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Get parameterized values at module level (evaluated once at pipeline start)
source_catalog = spark.conf.get("source_catalog")
source_schema = spark.conf.get("source_schema", "sales")  # with default

@dp.table(name="transaction_summary")
def transaction_summary():
    return (
        spark.read.table(f"{source_catalog}.{source_schema}.transactions")
        .groupBy("account_id")
        .agg(
            F.count("txn_id").alias("txn_count"),
            F.sum("txn_amount").alias("account_revenue")
        )
    )
```
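Configure parameters in pipeline settings:

- Asset Bundles: in `pipeline.yml`, under `configuration:`
- MCP tools: in the `extra_settings.configuration` dict

```yaml
# In resources/my_pipeline.pipeline.yml (resource key my_pipeline is illustrative)
resources:
  pipelines:
    my_pipeline:
      configuration:
        source_catalog: "shared_catalog"
        source_schema: "sales"
```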
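Because the parameters are read once at module level, it can help to keep name-building logic in plain functions that take the lookup as an argument, so it can be exercised locally without a Spark session. A minimal sketch: `make_conf_get` and `transactions_source` are hypothetical. The stand-in mimics `spark.conf.get` in one respect, failing when a key is missing and no default is given, but it raises `KeyError`, whereas the real call raises a Spark error.

```python
from typing import Callable

_MISSING = object()


def make_conf_get(settings: dict) -> Callable[..., str]:
    """Hypothetical dict-backed stand-in for spark.conf.get."""
    def get(key: str, default=_MISSING) -> str:
        if key in settings:
            return settings[key]
        if default is _MISSING:
            raise KeyError(f"pipeline configuration key not set: {key}")
        return default
    return get


def transactions_source(conf_get: Callable[..., str]) -> str:
    """Mirror the lookups in transaction_summary to build the source table name."""
    source_catalog = conf_get("source_catalog")
    source_schema = conf_get("source_schema", "sales")  # same default as the pipeline code
    return f"{source_catalog}.{source_schema}.transactions"


# Values from the configuration block above
conf_get = make_conf_get({"source_catalog": "shared_catalog", "source_schema": "sales"})
print(transactions_source(conf_get))  # → shared_catalog.sales.transactions
```

Inside the pipeline you would pass `spark.conf.get` itself as `conf_get`.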
Use when referencing specific external tables that don't change across environments:
```python
from pyspark import pipelines as dp

@dp.table(name="enriched_orders")
def enriched_orders():
    # Pipeline-internal table (unqualified)
    orders = spark.read.table("bronze_orders")
    # External reference table (fully-qualified)
    products = spark.read.table("shared_catalog.reference.products")
    return orders.join(products, "product_id")
```
| Scenario | Recommended Approach |
|---|---|
| Reading tables created in same pipeline | Unqualified names - portable, uses target catalog/schema |
| Reading from external source that varies by environment | Pipeline parameters - configurable per deployment |
| Reading from shared/reference tables with fixed location | Fully-qualified names - explicit and clear |
| Mixed pipeline (some internal, some external) | Combine approaches - unqualified for internal, parameters for external |
| Issue | Solution |
|---|---|
| Empty output tables | Use get_table_details to verify, check upstream sources |
| Pipeline stuck INITIALIZING | Normal for serverless, wait a few minutes |
| "Column not found" | Check schemaHints match actual data |
| Streaming reads fail | For file ingestion in a streaming table, you must use the STREAM keyword with read_files: FROM STREAM read_files(...). For table streams use FROM stream(table). See read_files — Usage in streaming tables. |
| Timeout during run | Increase timeout, or use wait_for_completion=False and check status with get_pipeline |
| MV doesn't refresh | Enable row tracking on source tables |
| SCD2: query column not found | Lakeflow uses `__START_AT` and `__END_AT` (double underscore), not `START_AT`/`END_AT`. Use `WHERE __END_AT IS NULL` for current rows. See 3-scd-query-patterns.md. |
| AUTO CDC parse error at APPLY/SEQUENCE | Put APPLY AS DELETE WHEN before SEQUENCE BY. Only list columns in COLUMNS * EXCEPT (...) that exist in the source (omit _rescued_data unless bronze uses rescue data). Omit TRACK HISTORY ON * if it causes "end of input" errors; default is equivalent. See 2-streaming-patterns.md. |
| "Cannot create streaming table from batch query" | In a streaming table query, use FROM STREAM read_files(...) so read_files leverages Auto Loader; FROM read_files(...) alone is batch. See 1-ingestion-patterns.md and read_files — Usage in streaming tables. |
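To see why `WHERE __END_AT IS NULL` returns current rows, here is a plain-Python model of SCD Type 2 history built from change events processed in sequence order, where a delete closes the open row. It sketches the semantics only, not how AUTO CDC is implemented. The event fields (`id`, `seq`, `op`, `value`) are hypothetical: `seq` plays the role of the `SEQUENCE BY` column, `op == "DELETE"` the role of `APPLY AS DELETE WHEN`, and in this model `__START_AT`/`__END_AT` are taken from `seq`.

```python
from typing import Dict, List


def build_scd2(events: List[dict]) -> List[dict]:
    """Apply upsert/delete events in sequence order, keeping full history."""
    history: List[dict] = []
    open_rows: Dict[str, dict] = {}  # key -> current (open) version
    for e in sorted(events, key=lambda ev: ev["seq"]):  # like SEQUENCE BY seq
        current = open_rows.pop(e["id"], None)
        if current is not None:
            current["__END_AT"] = e["seq"]  # close the previous version
        if e["op"] == "DELETE":
            continue  # a delete leaves no open version
        row = {"id": e["id"], "value": e["value"], "__START_AT": e["seq"], "__END_AT": None}
        history.append(row)
        open_rows[e["id"]] = row
    return history


def current_rows(history: List[dict]) -> List[dict]:
    """Equivalent of WHERE __END_AT IS NULL."""
    return [r for r in history if r["__END_AT"] is None]


def as_of(history: List[dict], seq: int) -> List[dict]:
    """Point-in-time view: versions whose [__START_AT, __END_AT) interval contains seq."""
    return [
        r for r in history
        if r["__START_AT"] <= seq and (r["__END_AT"] is None or seq < r["__END_AT"])
    ]


events = [
    {"id": "c1", "seq": 1, "op": "UPSERT", "value": "Berlin"},
    {"id": "c2", "seq": 2, "op": "UPSERT", "value": "Oslo"},
    {"id": "c1", "seq": 3, "op": "UPSERT", "value": "Paris"},  # c1 changes
    {"id": "c2", "seq": 4, "op": "DELETE", "value": None},     # c2 deleted
]
history = build_scd2(events)
print([r["value"] for r in current_rows(history)])  # → ['Paris']
```

Deleted keys drop out of the current view but keep their closed history rows, which is what makes point-in-time queries like `as_of(history, 2)` possible.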
For detailed errors, the result["message"] from create_or_update_pipeline includes suggested next steps. Use get_pipeline(pipeline_id=...) which includes recent events and error details.
For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.
| Requirement | Details |
|---|---|
| Unity Catalog | Required - serverless pipelines always use UC |
| Workspace Region | Must be in serverless-enabled region |
| Serverless Terms | Must accept serverless terms of use |
| CDC Features | Requires serverless (or Pro/Advanced with classic clusters) |
| Limitation | Workaround |
|---|---|
| R language | Not supported - use classic clusters if required |
| Spark RDD APIs | Not supported - use classic clusters if required |
| JAR libraries | Not supported - use classic clusters if required |
| Maven coordinates | Not supported - use classic clusters if required |
| DBFS root access | Limited - must use Unity Catalog external locations |
| Global temp views | Not supported |
| Constraint | Details |
|---|---|
| Schema Evolution | Streaming tables require full refresh for incompatible changes |
| SQL Limitations | PIVOT clause unsupported |
| Sinks | Python only, streaming only, append flows only |
Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.
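The default-to-serverless rule, combined with the limitations table, reduces to a small decision function. A sketch only: `choose_compute` and its requirement labels are hypothetical. It also treats an explicit user request for classic (Pro or Advanced) compute as a reason to use classic clusters.

```python
from typing import Iterable

# Requirements that, per the limitations table, serverless pipelines do not support.
CLASSIC_ONLY = {"r", "rdd", "jar", "maven"}


def choose_compute(requirements: Iterable[str] = (), explicit_classic: bool = False) -> str:
    """Return 'classic' only when required or explicitly requested, else 'serverless'."""
    needs = {req.lower() for req in requirements}
    if explicit_classic or needs & CLASSIC_ONLY:
        return "classic"
    return "serverless"
```

For example, a CDC pipeline with no special libraries stays on serverless, while one that depends on a JAR library moves to classic clusters.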