Generates data quality check code for bauplan pipelines (expectations.py with @bauplan.expectation) and ingestion workflows (validate_import() in WAP scripts). Uses Polars, never Pandas.
Install:

```bash
npx claudepluginhub bauplanlabs/bauplan-skills --plugin bauplan
```
This skill writes data quality check code. It produces one of two things:
1. An `expectations.py` file using `@bauplan.expectation()` that runs as part of `bauplan run`.
2. A `validate_import()` function using the bauplan SDK, embedded in a WAP script between `import_data()` and `merge_branch()`.

Output is always working code. Not reports, not profiling summaries, not markdown.
This skill is invoked by the bauplan-data-pipeline and bauplan-safe-ingestion workflow skills when they determine that quality checks are needed. It can also be invoked directly by the user. Either way, the skill needs to know the following before it can write code:

- The target: `namespace.table_name` and the ref to validate against.
- The checks themselves: either explicit specifications from the user, or whether a `models.py` exists that consumes the table. If so, read it, derive checks, propose them to the user for confirmation, then write code.

If the skill is invoked without enough information, ask for what's missing. But ask for specifics — "which columns and what properties?" — not for a general description of the pipeline's purpose.
NEVER run checks or pipelines on `main`. All validation targets a development or import branch.
Branch naming convention: `<username>.<branch_name>`. Get your username with `bauplan info`.
Before writing any Python, check whether the project uses uv (look for `pyproject.toml` or `uv.lock`). If so, use `uv run python` to execute scripts and `uv add` to install packages. Otherwise, use the system `python` and `pip install`.
Ensure the required packages are installed:
- `bauplan` (the Bauplan Python SDK — required)
- `polars` (if custom expectations need DataFrame operations — zero-copy Arrow interop)

Do not use pandas. Bauplan's `client.query()` returns a PyArrow table directly — no `.to_arrow()` call needed. In pipeline expectations, model inputs arrive as Arrow tables too. Polars reads Arrow natively with zero-copy (`pl.from_arrow(table)`). Pandas requires a full data copy and is slower.
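A minimal sketch of the interop, assuming a `bauplan.Client()` named `client` and a hypothetical `bauplan.orders` table on a development branch:

```python
import bauplan
import polars as pl

client = bauplan.Client()
branch = "alice.quality_checks"  # hypothetical development branch

# client.query() returns a PyArrow table; pl.from_arrow() wraps it zero-copy
table = client.query("SELECT * FROM bauplan.orders LIMIT 100", ref=branch)
df = pl.from_arrow(table)
print(df.null_count())  # quick sanity check on the wrapped frame
```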
This section is the foundation. It applies to both pipeline expectations and ingestion validation — the thinking is identical, only the code form differs.
When the user provides explicit specifications, much of this thinking is already done. Translate their specs to code, using the methodology below to fill in gaps (e.g., if they specify a check but not its severity).
When deriving checks from pipeline code, this methodology is the primary tool.
Profiling is exploratory — you compute statistics to learn what the data looks like. You don't know the shape yet. Use the bauplan-explore-data skill for this.
Testing is confirmatory — you know what the data should look like and you verify that it does. Every check encodes a specific expectation that can pass or fail.
The workflow is: profile once to understand the data, form assumptions from what you learn, then encode those assumptions as tests using this skill. If the user hasn't profiled the data and can't state what they expect, they're not ready for this skill yet — point them to bauplan-explore-data first.
For each column you check, state why you're checking it and what you expect. If you can't state why, drop the check. Ten focused checks with clear assertions beat a hundred random queries nobody reads.
The anti-pattern: running null counts, uniqueness checks, and range queries on every column in a table because they exist. This produces numbers, not insights. Nobody acts on "column X has 3.2% nulls" unless they know whether 3.2% is acceptable.
Every check encodes a hypothesis. Before writing any expectation or validation query, state it:
"I expect [column] to be [property] because [consumer/reason], and if it fails it should [halt/warn] because [impact]."
Examples:
- "I expect `order_id` to have no nulls because the billing pipeline joins on it, and if it fails it should halt because every downstream table breaks."
- "I expect `event_time` to be within the last 24 hours because the dashboard shows daily metrics, and if it fails it should warn because stale data is misleading but not corrupting."
- "I expect `price` to be positive because the revenue model sums it, and if it fails it should halt because negative prices produce wrong totals."

If you cannot fill in this template for a check, you do not have enough context to write it. Ask the user, inspect the schema, or read the downstream models.
When the user provides specifications directly, the "because" clause may be implicit. That's fine — they've done the reasoning. But if they haven't specified severity (FAIL vs WARN), use the template to figure it out.
When a models.py exists, read it to find bauplan.Model() references. These tell you exactly which columns matter and how they're used:
- The `columns` parameter → those columns are needed downstream. Check completeness (no nulls on critical ones).
- `filter` expressions → the model assumes data matching this condition. Check that the assumption holds (e.g., `filter="total > 0"` → check for non-positive values).
- Joins (multiple `bauplan.Model()` inputs joined on a key) → the join column needs uniqueness in the parent table and no nulls in both.

Example — given this model:
```python
data=bauplan.Model('orders', columns=['order_id', 'total', 'customer_id'], filter="total > 0")
```
Derive:
- `order_id`: selected column, likely a key → no nulls (FAIL), unique (FAIL)
- `total`: filtered to positive → no nulls (FAIL), all values > 0 (FAIL)
- `customer_id`: selected column → no nulls (FAIL); if joined to a customers table → unique in that table

Always propose derived checks to the user for confirmation before writing code. State each as a plain-language assumption and let the user approve, modify, or remove.
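As illustration, the first derived check could become an expectation like this (a sketch using the expectation form described later in this document; a real `expectations.py` would cover all three columns):

```python
import bauplan


@bauplan.expectation()
@bauplan.python('3.11')
def test_order_id_is_key(data=bauplan.Model('orders', columns=['order_id'])):
    """order_id is selected downstream and likely a join key: must be non-null and unique."""
    from bauplan.standard_expectations import expect_column_all_unique, expect_column_no_nulls

    result = expect_column_no_nulls(data, 'order_id') and expect_column_all_unique(data, 'order_id')
    assert result, 'order_id has nulls or duplicates'  # FAIL severity: halt the run
    return result
```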
Six dimensions, each producing a specific kind of assertion:
- Completeness — are required values present? → `expect_column_no_nulls`, `expect_column_some_null`, `expect_column_all_null`
- Uniqueness — are identifiers actually unique? → `expect_column_all_unique`, `expect_column_not_unique`
- Validity — do values conform to expected format and range? → `expect_column_accepted_values`, `expect_column_mean_greater_than`, `expect_column_mean_smaller_than`, `expect_column_mean_greater_or_equal_than`, `expect_column_mean_smaller_or_equal_than`
- Freshness — is the data current enough for its purpose? → custom expectations (no built-in)
- Consistency — do related values agree? → `expect_column_equal_concatenation`; custom for most cases
- Volume — is the amount of data within expected bounds? → custom expectations (no built-in)
Not every table needs all six dimensions. Pick the ones that matter for this table's consumers.
Every check must have a severity before you write the code. This determines whether it halts execution or logs a warning.
FAIL (halt the pipeline / block the merge): Downstream use is unsafe. Examples: missing primary key, zero rows, wrong schema, broken join column, negative values in a revenue column that gets summed.
- In pipeline expectations: use `assert`.
- In ingestion validation: raise an exception to prevent `merge_branch()`.
WARN (log and continue): Quality is degraded but not catastrophic. Examples: higher-than-usual null rate in a non-critical column, data is 2 hours stale against a 24-hour SLA, unexpected but non-breaking extra columns.
- In pipeline expectations: print the result, do not assert.
- In ingestion validation: log the warning, let the user decide whether to merge.
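A sketch of the WARN pattern in a pipeline expectation, assuming a hypothetical non-critical `note` column and a 10% null tolerance:

```python
import bauplan


@bauplan.expectation()
@bauplan.python('3.11')
def warn_note_null_rate(data=bauplan.Model('clean_orders')):
    """WARN: note is non-critical; a high null rate is worth flagging, not blocking."""
    null_rate = data.column('note').null_count / max(data.num_rows, 1)
    is_ok = null_rate <= 0.10  # hypothetical tolerance
    if not is_ok:
        print(f'⚠ note null rate {null_rate:.1%} exceeds 10%, continuing')
    return is_ok  # no assert: a failure is logged and the run continues (except under --strict)
```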
If you cannot classify a check, you do not understand its impact yet. Go back to the consumer analysis.
Every check must run against a specific branch and ref. Never check "whatever main looks like right now."
- In pipeline expectations, `bauplan run` handles this — the run executes against the checked-out branch.
- In ingestion validation, `client.query()` calls use `ref=branch_name` explicitly.

This ensures checks are reproducible. If a check fails, you can go back to exactly that data state and investigate.
```
my-pipeline/
├── bauplan_project.yml
├── models.py
└── expectations.py    ← quality checks live here
```
Expectations are Python functions in expectations.py in the pipeline project directory, alongside models.py.
An expectation is a function decorated with @bauplan.expectation() that takes one or more model outputs as input via bauplan.Model() and returns a boolean.
```python
import bauplan


@bauplan.expectation()
@bauplan.python('3.11')
def test_no_null_order_ids(data=bauplan.Model('clean_orders')):
    """order_id must not be null — billing pipeline joins on it."""
    from bauplan.standard_expectations import expect_column_no_nulls

    result = expect_column_no_nulls(data, 'order_id')
    assert result, 'order_id contains null values'
    return result
```
Key mechanics:
- Expectations run automatically during `bauplan run`, after the model they depend on completes.
- The return value signals the outcome: `True` = pass, `False` = fail.
- `assert` makes the failure halt the pipeline. Without `assert`, the result is logged but execution continues.
- `--strict` mode (`bauplan run --strict`) makes all expectation failures halt the run immediately.

The built-in library provides vectorized, SIMD-optimized checks. Always prefer these over hand-rolled equivalents — they are faster, more memory-efficient, and easier to maintain.
Each function takes an Arrow table and returns a boolean:
| Function | Dimension | What it checks |
|---|---|---|
| `expect_column_no_nulls(table, col)` | Completeness | Column has zero null values |
| `expect_column_some_null(table, col)` | Completeness | Column has at least one null |
| `expect_column_all_null(table, col)` | Completeness | Column is entirely null |
| `expect_column_all_unique(table, col)` | Uniqueness | All values are distinct |
| `expect_column_not_unique(table, col)` | Uniqueness | Column has at least one duplicate |
| `expect_column_accepted_values(table, col, values)` | Validity | All values are in allowed set |
| `expect_column_mean_greater_than(table, col, val)` | Validity | Mean exceeds threshold |
| `expect_column_mean_smaller_than(table, col, val)` | Validity | Mean below threshold |
| `expect_column_mean_greater_or_equal_than(table, col, val)` | Validity | Mean >= threshold |
| `expect_column_mean_smaller_or_equal_than(table, col, val)` | Validity | Mean <= threshold |
| `expect_column_equal_concatenation(table, target, cols, sep)` | Consistency | Column equals concatenation of others |
Import them inside the function body, not at module level:
```python
@bauplan.expectation()
@bauplan.python('3.11')
def test_valid_event_types(data=bauplan.Model('staging')):
    """event_type must be one of the known types — downstream filters depend on it."""
    from bauplan.standard_expectations import expect_column_accepted_values

    result = expect_column_accepted_values(
        data, 'event_type', ['view', 'cart', 'purchase', 'remove']
    )
    assert result, 'event_type contains unexpected values'
    return result
```
For checks not covered by the built-in library — freshness, volume, cross-column logic, referential integrity — write custom expectations using the Arrow table directly or with Polars/DuckDB.
Freshness check:
```python
@bauplan.expectation()
@bauplan.python('3.11', pip={'polars': '1.15.0'})
def test_data_freshness(data=bauplan.Model('daily_summary', columns=['date'])):
    """Most recent date must be within 2 days of today — dashboard shows daily metrics."""
    import polars as pl
    from datetime import datetime, timedelta

    df = pl.from_arrow(data)
    max_date = df.select(pl.col('date').max()).item()
    threshold = datetime.now() - timedelta(days=2)
    is_fresh = max_date >= threshold
    assert is_fresh, f'Data is stale: most recent date is {max_date}'
    return is_fresh
```
Volume check:
```python
@bauplan.expectation()
@bauplan.python('3.11')
def test_minimum_row_count(data=bauplan.Model('staging')):
    """Table must have at least 1000 rows — fewer indicates a broken upstream source."""
    row_count = data.num_rows
    is_sufficient = row_count >= 1000
    assert is_sufficient, f'Only {row_count} rows — expected at least 1000'
    return is_sufficient
```
Cross-column consistency:
```python
@bauplan.expectation()
@bauplan.python('3.11', pip={'polars': '1.15.0'})
def test_dates_ordered(
    data=bauplan.Model('trips', columns=['pickup_datetime', 'dropoff_datetime'])
):
    """dropoff must be after pickup — time travel model breaks on reversed trips."""
    import polars as pl

    df = pl.from_arrow(data)
    violations = df.filter(pl.col('dropoff_datetime') < pl.col('pickup_datetime'))
    is_valid = violations.height == 0
    assert is_valid, f'{violations.height} rows have dropoff before pickup'
    return is_valid
```
The columns parameter in @bauplan.model() provides lightweight structural schema enforcement. It validates that the model's output contains exactly the declared columns.
```python
@bauplan.model(
    columns=['order_id', 'customer_id', 'total', 'order_date'],
    materialization_strategy='REPLACE'
)
@bauplan.python('3.11')
def clean_orders(data=bauplan.Model('raw_orders')):
    ...
```
This is not statistical quality checking — it catches schema drift, dropped columns, and structural mismatches. Use it on every model. It complements expectations but does not replace them.
In strict mode (bauplan run --strict), column mismatches fail the run immediately.
```bash
# Validate DAG, schemas, and expectations without materializing
bauplan run --dry-run --strict

# Execute pipeline with blocking expectations
bauplan run --strict
```
After a run, expectation results appear in the run output. Failed expectations show the assertion message. Use `bauplan job logs <job_id>` to review results from a previous run.
Validation logic is embedded directly in the WAP (Write-Audit-Publish) script, between import_data() and the merge decision. No separate file, no decorators — SDK calls and assertions in Python.
```python
# === IMPORT PHASE ===
client.import_data(table=table_name, search_uri=s3_path, branch=branch_name, ...)

# === VALIDATION PHASE ===  ← checks go here
validate_import(client, table_name, branch_name, namespace)

# === MERGE PHASE ===
client.merge_branch(source_ref=branch_name, into_branch="main")
```
If the user already has a working ingestion script and wants to add or strengthen validation:
1. Locate the gap between `import_data()` and `merge_branch()`.
2. Write a `validate_import()` function using either the user's specifications or checks derived from pipeline code.
3. Call `validate_import()` from that gap.

Do not rewrite the import or merge phases. Only touch the validation logic.
Write a validation function that runs all checks and raises on failure:
```python
def validate_import(client, table_name, branch, namespace="bauplan"):
    fq_table = f"{namespace}.{table_name}"

    # FAIL checks — assert to block merge
    result = client.query(f"SELECT COUNT(*) as n FROM {fq_table}", ref=branch)
    row_count = result.column("n")[0].as_py()
    assert row_count > 0, f"{fq_table} has 0 rows after import"

    # WARN checks — print, don't assert
    result = client.query(f"SELECT MIN(total) as lo FROM {fq_table}", ref=branch)
    if result.column("lo")[0].as_py() < 0:
        print("⚠ negative totals found")

    # ... see examples/ingestion_validation.py for complete implementation
```
| Aspect | Pipeline (`expectations.py`) | Ingestion (WAP script) |
|---|---|---|
| Decorator | `@bauplan.expectation()` | None — plain Python |
| Input | Arrow table via `bauplan.Model()` | SDK queries via `client.query()` |
| Execution | Automatic during `bauplan run` | Called explicitly in script |
| Data access | In-memory Arrow table | SQL queries against branch |
| Failure handling | `assert` halts pipeline | `assert` prevents merge |
| Environment | Containerized per-function | Local Python process |
All queries must use `ref=branch_name` to pin to the import branch.
Volume:

```python
result = client.query(f"SELECT COUNT(*) as n FROM {fq_table}", ref=branch)
row_count = result.column("n")[0].as_py()
```

Schema:

```python
table_meta = client.get_table(table=table_name, ref=branch, namespace=namespace)
actual_columns = {f.name for f in table_meta.fields}
actual_types = {f.name: f.type for f in table_meta.fields}
```
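The comparison step itself is plain set logic; a sketch, with a hypothetical `expected_columns` set:

```python
expected_columns = {"order_id", "customer_id", "total", "order_date"}  # hypothetical

missing = expected_columns - actual_columns
extra = actual_columns - expected_columns

# FAIL: missing columns break downstream consumers, so block the merge
assert not missing, f"{fq_table} is missing columns: {sorted(missing)}"

# WARN: extra columns are unexpected but non-breaking
if extra:
    print(f"⚠ unexpected extra columns: {sorted(extra)}")
```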
Completeness (null count):

```python
result = client.query(
    f"SELECT COUNT(*) - COUNT({col}) as nulls FROM {fq_table}", ref=branch
)
```

Uniqueness (duplicate count):

```python
result = client.query(
    f"SELECT COUNT({col}) - COUNT(DISTINCT {col}) as dupes FROM {fq_table}", ref=branch
)
```

Validity (bounds):

```python
result = client.query(
    f"SELECT MIN({col}) as lo, MAX({col}) as hi FROM {fq_table}", ref=branch
)
```

Freshness:

```python
result = client.query(
    f"SELECT MAX({time_col}) as latest FROM {fq_table}", ref=branch
)
```

Consistency (cross-column):

```python
result = client.query(
    f"SELECT COUNT(*) as violations FROM {fq_table} WHERE {col_a} > {col_b}",
    ref=branch
)
```
When unsure about a method signature, CLI flag, or concept, fetch the relevant doc page via WebFetch rather than guessing. Pages are markdown and LLM-friendly.
- Python SDK: https://docs.bauplanlabs.com/reference/bauplan.md
- Standard expectations: https://docs.bauplanlabs.com/reference/bauplan-standard-expectations.md
- Relevant concept pages: https://docs.bauplanlabs.com/concepts/expectations.md
- Full doc index: https://docs.bauplanlabs.com/llms.txt
CLI: The bauplan CLI is self-documenting:
- `bauplan --help` — lists all available commands
- `bauplan <command> --help` — shows arguments and options for a specific command (e.g., `bauplan run --help`, `bauplan job --help`)

Validating generated Python: After writing or updating `expectations.py` or validation code, run `ruff check` and `ruff format` to catch syntax errors and style issues, and `ty` to catch type errors — these verify the code compiles and the SDK calls are well-formed without executing it. Only run these if they are installed (check with `which ruff` / `which ty`).