Defines DQX data quality rules for PySpark DataFrames or Delta tables using Python classes (DQRowRule, DQDatasetRule, DQForEachColRule) or YAML/JSON metadata. Supports filters, custom checks, and criticality levels.
```sh
npx claudepluginhub databrickslabs/dqx --plugin dqx
```

This skill uses the workspace's default tool permissions.
DQX rules come in two interchangeable forms. **Pick based on where the checks will live.**
- **Python classes** (DQRowRule, DQDatasetRule, DQForEachColRule) — use when checks are authored in code next to the pipeline. Static typing + IDE autocomplete.
- **YAML/JSON metadata** — use when checks are stored as data and applied through the apply_checks_by_metadata* path.

Every check has a criticality of error (failing row quarantined) or warn (failing row passes but flagged). Default is error.
```python
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule, DQForEachColRule

checks = [
    # row-level: one column
    DQRowRule(
        name="col3_is_not_null",
        criticality="warn",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col3",
    ),
    # same check across many columns
    *DQForEachColRule(
        columns=["col1", "col2"],
        criticality="error",
        check_func=check_funcs.is_not_null,
    ).get_rules(),
    # dataset-level: uniqueness across a composite key
    DQDatasetRule(
        criticality="error",
        check_func=check_funcs.is_unique,
        columns=["order_id", "line_item_id"],
    ),
]
```
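A minimal sketch of applying these class-form rules (applying checks is covered by the dqx-engine skill; this assumes workspace auth is already configured, and input_df is a placeholder DataFrame):

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

# annotate every row with check results
checked_df = dq_engine.apply_checks(input_df, checks)

# or split: rows failing an error-criticality check go to the quarantine side
valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)
```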
The same checks as YAML metadata follow. Load into Python via yaml.safe_load(...), then pass the resulting list[dict] to any apply_checks_by_metadata* call, or save through a storage config (see dqx-storage); a loading sketch follows the YAML block.
```yaml
# row-level: one column
- name: col3_is_not_null
  criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col3

# same check across many columns
- criticality: error
  check:
    function: is_not_null
    for_each_column: [col1, col2]

# dataset-level: uniqueness across a composite key
- criticality: error
  check:
    function: is_unique
    arguments:
      columns: [order_id, line_item_id]
```
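As mentioned above, a sketch of the yaml.safe_load path; checks.yml is a hypothetical file holding the YAML block above, and input_df is a placeholder DataFrame:

```python
import yaml

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

with open("checks.yml") as f:            # hypothetical file holding the YAML above
    checks_metadata = yaml.safe_load(f)  # -> list[dict]

dq_engine = DQEngine(WorkspaceClient())
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks_metadata)
```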
filter="col1 < 3" (class) or filter: "col1 < 3" (YAML).check_func_args=[[1, 2]]; keyword args — check_func_kwargs={"allowed": [1, 2]}.F.try_element_at(...) or dotted path (col7.field1) as the column value.user_metadata dict (e.g. {"check_type": "completeness"}) that flows into the result struct.Column as check_func. For inline SQL, use the fallback section below — only after confirming no built-in fits.is_aggr_not_greater_than, is_aggr_not_less_than, is_aggr_equal, is_aggr_not_equal; supply aggr_type (count, avg, stddev, percentile, count_distinct…), optional group_by, and limit.is_unique, with columns, nulls_distinct (bool), and optional row_filter. Not an aggregate check — no aggr_type.Full reference: https://databrickslabs.github.io/dqx/docs/reference/quality_checks.
Search check_funcs first — the built-ins cover null/empty, range, set membership, regex, referential, aggregate, uniqueness, schema, freshness, comparison, and outlier cases with typed error messages and tested edge handling. Drop down to SQL only when no built-in fits.
- sql_expression — row-level SQL boolean expression. Use when one row's validity depends on its own columns.
- sql_query — dataset-level SQL query against {{ input_view }}. Use for cross-row aggregates, joins to reference DataFrames, or anything needing GROUP BY. Queries are validated by is_sql_query_safe() — read-only SELECT, no DDL/DML.

```yaml
# row-level: SQL expression evaluated per row
- name: amount_positive_or_refunded
  criticality: error
  check:
    function: sql_expression
    arguments:
      expression: amount > 0 OR refunded = true
      msg: amount must be positive unless refunded

# dataset-level: SQL query, joined back to rows via merge_columns
- name: order_total_matches_lines
  criticality: error
  check:
    function: sql_query
    arguments:
      query: |
        SELECT order_id,
               SUM(line_amount) <> order_total AS condition
        FROM {{ input_view }}
        GROUP BY order_id, order_total
      merge_columns: [order_id]      # row-level: joins back per order_id
      condition_column: condition    # column in query output; true = fail
      # omit merge_columns for dataset-level (one verdict applies to every row)
```
For the equivalent class form, use DQRowRule(check_func=check_funcs.sql_expression, check_func_kwargs={"expression": "..."}) or DQDatasetRule(check_func=check_funcs.sql_query, check_func_kwargs={...}).
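For instance, a sketch of the dataset-level sql_query rule from the YAML above in class form, assuming the kwargs mirror the YAML arguments one-to-one:

```python
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQDatasetRule

order_total_check = DQDatasetRule(
    name="order_total_matches_lines",
    criticality="error",
    check_func=check_funcs.sql_query,
    check_func_kwargs={
        "query": (
            "SELECT order_id, SUM(line_amount) <> order_total AS condition "
            "FROM {{ input_view }} GROUP BY order_id, order_total"
        ),
        "merge_columns": ["order_id"],       # join verdicts back per order_id
        "condition_column": "condition",     # true = fail
    },
)
```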
Convert between the two forms programmatically:

```python
from databricks.labs.dqx.checks_serializer import serialize_checks, deserialize_checks

checks_metadata = serialize_checks(checks)             # classes → list[dict]
checks_classes = deserialize_checks(checks_metadata)   # list[dict] → classes
```
Catch syntax errors without running the pipeline:
```python
from databricks.labs.dqx.engine import DQEngine

status = DQEngine.validate_checks(checks)  # raises / returns ValidationStatus
```
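A sketch of acting on the outcome, assuming the returned status exposes a has_errors flag and a readable string form:

```python
status = DQEngine.validate_checks(checks)
if status.has_errors:  # assumption: ValidationStatus exposes has_errors
    raise ValueError(f"invalid checks: {status}")
```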
- Give every check an explicit name — it ends up in result columns and dashboards.
- For larger rule sets, prefer metadata saved through a storage config (see dqx-storage) — classes are fine for a handful, metadata scales.
- Don't reach for check_funcs.sql_expression / sql_query when a built-in covers the case — they bypass typed error messages and security guards. Search check_funcs first.
- A custom check_func must return a Column expression only.

Canonical docs: https://databrickslabs.github.io/dqx/docs/guide/quality_checks_definition.