Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks. Use when building batch or streaming data pipelines with Python or SQL. Invoke BEFORE starting implementation.
Install: `npx claudepluginhub databricks/databricks-agent-skills`

This skill uses the workspace's default tool permissions.
**FIRST**: Use the parent `databricks-core` skill for CLI basics, authentication, profile selection, and data discovery commands.
Creates, configures, and updates Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles data ingestion via streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader.
Use this tree to determine which dataset type and features to use. Multiple features can apply to the same dataset — e.g., a Streaming Table can use Auto Loader for ingestion, Append Flows for fan-in, and Expectations for data quality. Choose the dataset type first, then layer on applicable features.
User request → What kind of output?
├── Intermediate/reusable logic (not persisted) → Temporary View
│ ├── Preprocessing/filtering before Auto CDC → Temporary View feeding CDC flow
│ ├── Shared intermediate streaming logic reused by multiple downstream tables
│ ├── Pipeline-private helper logic (not published to catalog)
│ └── Published to UC for external queries → Persistent View (SQL only)
├── Persisted dataset
│ ├── Source is streaming/incremental/continuously growing → Streaming Table
│ │ ├── File ingestion (cloud storage, Volumes) → Auto Loader
│ │ ├── Message bus (Kafka, Kinesis, Pub/Sub, Pulsar, Event Hubs) → streaming source read
│ │ ├── Existing streaming/Delta table → streaming read from table
│ │ ├── CDC / upserts / track changes / keep latest per key / SCD Type 1 or 2 → Auto CDC
│ │ ├── Multiple sources into one table → Append Flows (NOT union)
│ │ ├── Historical backfill + live stream → one-time Append Flow + regular flow
│ │ └── Windowed aggregation with watermark → stateful streaming
│ └── Source is batch/historical/full scan → Materialized View
│ ├── Aggregation/join across full dataset (GROUP BY, SUM, COUNT, etc.)
│ ├── Gold layer aggregation from streaming table → MV with batch read (spark.read / no STREAM)
│ ├── JDBC/Federation/external batch sources
│ └── Small static file load (reference data, no streaming read)
├── Output to external system (Python only) → Sink
│ ├── Existing external table not managed by this pipeline → Sink with format="delta"
│ │ (prefer fully-qualified dataset names if the pipeline should own the table — see Publishing Modes)
│ ├── Kafka / Event Hubs → Sink with format="kafka" + @dp.append_flow(target="sink_name")
│ ├── Custom destination not natively supported → Sink with custom format
│ ├── Custom merge/upsert logic per batch → ForEachBatch Sink (Public Preview)
│ └── Multiple destinations per batch → ForEachBatch Sink (Public Preview)
└── Data quality constraints → Expectations (on any dataset type)
Key rules:

- Gold layer aggregation from a streaming table → use a Materialized View with a batch read (`spark.read.table` / `SELECT FROM` without `STREAM`), NOT a Streaming Table. This is the correct pattern for Gold layer aggregation.
- Streaming read from a temporary view → SQL: `STREAM(view_name)`. Python: use `spark.readStream.table("view_name")`.
- Use a private dataset (`private=True` / `CREATE PRIVATE ...`) when the computation is expensive and materializing once would save significant reprocessing.
- `OR REFRESH` → Prefer `CREATE OR REFRESH` over bare `CREATE` for SQL dataset definitions. Both work identically, but `OR REFRESH` is the idiomatic convention. For PRIVATE datasets: `CREATE OR REFRESH PRIVATE STREAMING TABLE` / `CREATE OR REFRESH PRIVATE MATERIALIZED VIEW`.
- Kafka sinks → the `value` column is mandatory. Use `to_json(struct(*)) AS value` to serialize the entire row as JSON. Read the sink skill for details.
- Auto CDC with multiple sequencing columns → `SEQUENCE BY STRUCT(col1, col2)`. Python: `sequence_by=struct("col1", "col2")`. Read the auto-cdc skill for details.
- Auto CDC truncate → `APPLY AS TRUNCATE WHEN condition`. Python: `apply_as_truncates=expr("condition")`. Do NOT say truncate is unsupported.
- Pipelines use a default catalog and schema configured in the pipeline settings. All datasets are published there unless overridden.
- Use `catalog.schema.table` in the dataset name to write to a different catalog/schema than the pipeline default. The pipeline creates the dataset there directly; no Sink needed.
- MANDATORY: Before implementing, editing, or suggesting any code for a feature, you MUST read the linked reference file for that feature. NO exceptions — always look up the reference before writing code.
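For orientation, here is a minimal Python sketch combining several branches of the tree above: a Bronze Streaming Table fed by Auto Loader and a Gold Materialized View built with a batch read. Table names, column names, and the Volume path are placeholders, and decorator parameters are assumed to mirror the legacy dlt API; read the streaming-table-python, auto-loader-python, and materialized-view-python references before adapting it.

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.table(comment="Bronze: raw orders ingested incrementally with Auto Loader")
def orders_bronze():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/main/raw/orders/")   # placeholder Volume path
    )

@dp.materialized_view(comment="Gold: daily revenue, batch read of the streaming table")
def daily_revenue():
    return (
        spark.read.table("orders_bronze")    # batch read, NOT readStream
        .groupBy("order_date")
        .agg(F.sum("amount").alias("revenue"))
    )
```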
Some features require reading multiple skills together:
| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|---|
| Streaming Table | @dp.table() returning streaming DF | @dlt.table() returning streaming DF | CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING LIVE TABLE | streaming-table-python | streaming-table-sql |
| Materialized View | @dp.materialized_view() | @dlt.table() returning batch DF | CREATE OR REFRESH MATERIALIZED VIEW | CREATE LIVE TABLE (batch) | materialized-view-python | materialized-view-sql |
| Temporary View | @dp.temporary_view() | @dlt.view(), @dp.view() | CREATE TEMPORARY VIEW | CREATE TEMPORARY LIVE VIEW | temporary-view-python | temporary-view-sql |
| Persistent View (UC) | N/A — SQL only | — | CREATE VIEW | — | — | view-sql |
| Streaming Table (explicit) | dp.create_streaming_table() | dlt.create_streaming_table() | CREATE OR REFRESH STREAMING TABLE (no AS) | — | streaming-table-python | streaming-table-sql |
| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|---|
| Append Flow | @dp.append_flow() | @dlt.append_flow() | CREATE FLOW ... INSERT INTO | — | streaming-table-python | streaming-table-sql |
| Backfill Flow | @dp.append_flow(once=True) | @dlt.append_flow(once=True) | CREATE FLOW ... INSERT INTO ... ONCE | — | streaming-table-python | streaming-table-sql |
| Sink (Delta/Kafka/EH/custom) | dp.create_sink() | dlt.create_sink() | N/A — Python only | — | sink-python | — |
| ForEachBatch Sink | @dp.foreach_batch_sink() | — | N/A — Python only | — | foreach-batch-sink-python | — |
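A minimal sketch of fan-in with Append Flows plus a one-time backfill flow, assuming `@dp.append_flow` mirrors the legacy `dlt.append_flow` signature. The broker, topic, and table names are placeholders; read streaming-table-python before adapting it.

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Create the target streaming table explicitly; both flows append into it.
dp.create_streaming_table("all_events")

@dp.append_flow(target="all_events")
def events_from_kafka():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker-1:9092")  # placeholder broker
        .option("subscribe", "events")                        # placeholder topic
        .load()
        .select(F.col("value").cast("string").alias("payload"), "timestamp")
    )

@dp.append_flow(target="all_events", once=True)
def historical_backfill():
    # One-time backfill flow: processed on the first update, then skipped.
    return spark.readStream.table("events_archive").select("payload", "timestamp")
```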
| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|---|
| Auto CDC (streaming source) | dp.create_auto_cdc_flow() | dlt.apply_changes(), dp.apply_changes() | AUTO CDC INTO ... FROM STREAM | APPLY CHANGES INTO ... FROM STREAM | auto-cdc-python | auto-cdc-sql |
| Auto CDC (periodic snapshot) | dp.create_auto_cdc_from_snapshot_flow() | dlt.apply_changes_from_snapshot() | N/A — Python only | — | auto-cdc-python | — |
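A minimal SCD Type 2 sketch with Auto CDC. The parameter names (`keys`, `sequence_by`, `stored_as_scd_type`) are assumed to carry over from the legacy `apply_changes()` API; confirm against auto-cdc-python before use. Source, target, and column names are placeholders.

```python
from pyspark import pipelines as dp

# Target table the CDC flow writes into.
dp.create_streaming_table("customers_silver")

dp.create_auto_cdc_flow(
    target="customers_silver",
    source="customers_cdc_bronze",   # placeholder upstream change feed
    keys=["customer_id"],
    sequence_by="event_ts",          # ordering column for out-of-order events
    stored_as_scd_type=2,            # Type 2 keeps history; Type 1 keeps latest only
)
```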
| Feature | Python (current) | Python (deprecated) | SQL (current) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|
| Expect (warn) | @dp.expect() | @dlt.expect() | CONSTRAINT ... EXPECT (...) | expectations-python | expectations-sql |
| Expect or drop | @dp.expect_or_drop() | @dlt.expect_or_drop() | CONSTRAINT ... EXPECT (...) ON VIOLATION DROP ROW | expectations-python | expectations-sql |
| Expect or fail | @dp.expect_or_fail() | @dlt.expect_or_fail() | CONSTRAINT ... EXPECT (...) ON VIOLATION FAIL UPDATE | expectations-python | expectations-sql |
| Expect all (warn) | @dp.expect_all({}) | @dlt.expect_all({}) | Multiple CONSTRAINT clauses | expectations-python | expectations-sql |
| Expect all or drop | @dp.expect_all_or_drop({}) | @dlt.expect_all_or_drop({}) | Multiple constraints with DROP ROW | expectations-python | expectations-sql |
| Expect all or fail | @dp.expect_all_or_fail({}) | @dlt.expect_all_or_fail({}) | Multiple constraints with FAIL UPDATE | expectations-python | expectations-sql |
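A minimal sketch stacking expectations on a streaming table, using the decorator names from the table above. Constraint names and expressions are placeholders; see expectations-python for metrics and violation behavior.

```python
from pyspark import pipelines as dp

@dp.table()
@dp.expect("valid_amount", "amount >= 0")                    # warn: keep row, record metric
@dp.expect_or_drop("has_order_id", "order_id IS NOT NULL")   # drop offending rows
@dp.expect_all_or_fail({
    "valid_date": "order_date IS NOT NULL",
    "valid_status": "status IN ('open', 'shipped', 'closed')",
})  # any violation fails the update
def orders_clean():
    return spark.readStream.table("orders_bronze")
```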
| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|---|
| Batch read (pipeline dataset) | spark.read.table("name") | dp.read("name"), dlt.read("name") | SELECT ... FROM name | SELECT ... FROM LIVE.name | — | — |
| Streaming read (pipeline dataset) | spark.readStream.table("name") | dp.read_stream("name"), dlt.read_stream("name") | SELECT ... FROM STREAM name | SELECT ... FROM STREAM LIVE.name | — | — |
| Auto Loader (cloud files) | spark.readStream.format("cloudFiles") | — | STREAM read_files(...) | — | auto-loader-python | auto-loader-sql |
| Kafka source | spark.readStream.format("kafka") | — | STREAM read_kafka(...) | — | — | — |
| Kinesis source | spark.readStream.format("kinesis") | — | STREAM read_kinesis(...) | — | — | — |
| Pub/Sub source | spark.readStream.format("pubsub") | — | STREAM read_pubsub(...) | — | — | — |
| Pulsar source | spark.readStream.format("pulsar") | — | STREAM read_pulsar(...) | — | — | — |
| Event Hubs source | spark.readStream.format("kafka") + EH config | — | STREAM read_kafka(...) + EH config | — | — | — |
| JDBC / Lakehouse Federation | spark.read.format("postgresql") etc. | — | Direct table ref via federation catalog | — | — | — |
| Custom data source | spark.read[Stream].format("custom") | — | N/A — Python only | — | — | — |
| Static file read (batch) | spark.read.format("json"\|"csv"\|...).load() | — | read_files(...) (no STREAM) | — | — | — |
| Skip upstream change commits | .option("skipChangeCommits", "true") | — | read_stream("name", skipChangeCommits => true) | — | streaming-table-python | streaming-table-sql |
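As an example of the last row, a minimal sketch of a streaming read from an upstream pipeline table that occasionally receives updates or deletes, using `skipChangeCommits` so the stream ignores those change commits. Table names are placeholders; see streaming-table-python for when this is safe.

```python
from pyspark import pipelines as dp

@dp.table()
def orders_enriched():
    return (
        spark.readStream
        .option("skipChangeCommits", "true")   # ignore update/delete commits upstream
        .table("orders_silver")                # placeholder upstream pipeline dataset
    )
```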
| Feature | Python (current) | SQL (current) | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|
| Liquid clustering | cluster_by=[...] | CLUSTER BY (col1, col2) | materialized-view-python | materialized-view-sql |
| Auto liquid clustering | cluster_by_auto=True | CLUSTER BY AUTO | materialized-view-python | materialized-view-sql |
| Partition columns | partition_cols=[...] | PARTITIONED BY (col1, col2) | materialized-view-python | materialized-view-sql |
| Table properties | table_properties={...} | TBLPROPERTIES (...) | materialized-view-python | materialized-view-sql |
| Explicit schema | schema="col1 TYPE, ..." | (col1 TYPE, ...) AS | materialized-view-python | materialized-view-sql |
| Generated columns | schema="..., col TYPE GENERATED ALWAYS AS (expr)" | col TYPE GENERATED ALWAYS AS (expr) | materialized-view-python | materialized-view-sql |
| Row filter (Public Preview) | row_filter="ROW FILTER fn ON (col)" | WITH ROW FILTER fn ON (col) | materialized-view-python | materialized-view-sql |
| Column mask (Public Preview) | schema="..., col TYPE MASK fn USING COLUMNS (col2)" | col TYPE MASK fn USING COLUMNS (col2) | materialized-view-python | materialized-view-sql |
| Private dataset | private=True | CREATE PRIVATE ... | materialized-view-python | materialized-view-sql |
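A minimal sketch combining a few of these features (explicit schema, liquid clustering, table properties) on a materialized view, assuming the Python parameter names shown in the table above. Column names and the property value are placeholders; see materialized-view-python before use.

```python
from pyspark import pipelines as dp

@dp.materialized_view(
    schema="customer_id STRING, region STRING, lifetime_value DOUBLE",
    cluster_by=["region"],                 # liquid clustering column
    table_properties={"quality": "gold"},  # placeholder property
)
def customer_summary():
    return spark.read.table("customers_silver").select(
        "customer_id", "region", "lifetime_value"
    )
```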
| Current | Deprecated | Notes |
|---|---|---|
| from pyspark import pipelines as dp | import dlt | Both work. Prefer dp. Do NOT change existing dlt imports. |
| spark.read.table() / spark.readStream.table() | dp.read() / dp.read_stream() / dlt.read() / dlt.read_stream() | Deprecated reads still work. Prefer spark.*. |
| — | LIVE. prefix | Fully deprecated. NEVER use. Causes errors in newer pipelines. |
| — | CREATE LIVE TABLE / CREATE LIVE VIEW | Fully deprecated. Use CREATE STREAMING TABLE / CREATE MATERIALIZED VIEW / CREATE TEMPORARY VIEW. |
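A short side-by-side of the current and deprecated read patterns, as a reminder of what to write in new code (the deprecated forms shown in comments still run, but should not be introduced).

```python
from pyspark import pipelines as dp   # current import; `import dlt` also still works

@dp.table()
def orders_current():
    # Current: plain Spark reads resolve pipeline datasets by name.
    return spark.readStream.table("orders_bronze")

# Deprecated equivalents (do not introduce in new code):
#   dp.read_stream("orders_bronze") / dlt.read_stream("orders_bronze")
#   SQL: SELECT ... FROM STREAM LIVE.orders_bronze   (LIVE. prefix: never use)
```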
Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables / DLT) is a framework for building batch and streaming data pipelines.
Use `databricks bundle init` with a config file to scaffold non-interactively. This creates a project in the `<project_name>/` directory:

`databricks bundle init lakeflow-pipelines --config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') --profile <PROFILE> < /dev/null`
- `project_name`: letters, numbers, underscores only
- `language`: `python` or `sql`. Ask the user which they prefer.
After scaffolding, create CLAUDE.md and AGENTS.md in the project directory. These files give coding agents essential guidance on how to work with the project. Use this content:
# Declarative Automation Bundles Project
This project uses Declarative Automation Bundles (formerly Databricks Asset Bundles) for deployment.
## Prerequisites
Install the Databricks CLI (>= v0.288.0) if not already installed:
- macOS: `brew tap databricks/tap && brew install databricks`
- Linux: `curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh`
- Windows: `winget install Databricks.DatabricksCLI`
Verify: `databricks -v`
## For AI Agents
Read the `databricks-core` skill for CLI basics, authentication, and deployment workflow.
Read the `databricks-pipelines` skill for pipeline-specific guidance.
If skills are not available, install them: `databricks experimental aitools install`
Dataset definitions live in the `src/` or `transformations/` folder.

my-pipeline-project/
├── databricks.yml # Bundle configuration
├── resources/
│ ├── my_pipeline.pipeline.yml # Pipeline definition
│ └── my_pipeline_job.job.yml # Scheduling job (optional)
└── src/
├── my_table.py (or .sql) # One dataset per file
├── another_table.py (or .sql)
└── ...
To schedule a pipeline, add a job that triggers it in resources/<name>.job.yml:
resources:
jobs:
my_pipeline_job:
trigger:
periodic:
interval: 1
unit: DAYS
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.my_pipeline.id}
You must deploy before running. In local development, code changes only take effect after databricks bundle deploy. Always deploy before any run, dry run, or selective refresh.
- `databricks bundle validate --profile <profile>`
- `databricks bundle deploy -t dev --profile <profile>`
- `databricks bundle run <pipeline_name> -t dev --profile <profile>`
- `databricks pipelines get --pipeline-id <id> --profile <profile>`

Detailed reference guides for each pipeline API. Read the relevant guide before writing pipeline code.