From majestic-data
Performs data quality checks for completeness, uniqueness, freshness, volume, and distribution drift. Generates scorecards and HTML reports for pipelines.
npx claudepluginhub majesticlabs-dev/majestic-marketplace --plugin majestic-data

This skill is limited to using the following tools:
**Audience:** Data engineers building quality gates for pipelines.
References data quality dimensions with qsv checks and provides remediation decision tree for tabular CSV assessment and fixes.
Provides step-by-step guidance, code, and configurations for data quality checks in pipelines covering ETL, transformations, orchestration, and streaming processing. Activates on 'data quality checker' phrases.
Checks data freshness, schema drift, null rates, orphaned records, and pipeline status across databases, Airflow DAGs, dbt models, BigQuery, and Snowflake.
Share bugs, ideas, or general feedback.
Audience: Data engineers building quality gates for pipelines.
Goal: Measure, monitor, and report on data quality dimensions.
Related skills:
- data-profiler - For comprehensive data profiling
- anomaly-detector - For outlier detection

Execute quality functions from scripts/quality_metrics.py:
from scripts.quality_metrics import (
QualityDimension,
QualityMetric,
QualityScorecard,
calculate_completeness,
calculate_uniqueness,
check_freshness,
check_volume,
detect_distribution_drift,
generate_scorecard,
generate_html_report
)
from scripts.quality_metrics import calculate_completeness, calculate_uniqueness

# Completeness check over the columns that are required to be present
required_columns = ['id', 'email', 'status']
completeness = calculate_completeness(df, required_cols=required_columns)
print(f"Completeness: {completeness.score}% - {'PASS' if completeness.passed else 'FAIL'}")

# Uniqueness check over the key column(s)
key_columns = ['id']
uniqueness = calculate_uniqueness(df, key_cols=key_columns)
print(f"Uniqueness: {uniqueness.score}%")
from scripts.quality_metrics import check_freshness

# Freshness check on 'updated_at' with a 24-hour maximum age
freshness = check_freshness(df, timestamp_col='updated_at', max_age_hours=24)
is_stale = not freshness.passed
if is_stale:
    print(f"Data is stale: {freshness.details['age_hours']} hours old")
from scripts.quality_metrics import generate_scorecard, generate_html_report

# Build the scorecard for the table, then print its headline numbers
scorecard_options = {
    'name': "users_table",
    'required_cols': ['id', 'email'],
    'key_cols': ['id'],
}
scorecard = generate_scorecard(df, **scorecard_options)
print(f"Overall Score: {scorecard.overall_score:.1f}%")
print(f"Status: {'PASSED' if scorecard.passed else 'FAILED'}")

# Generate HTML report
html = generate_html_report(scorecard)
from scripts.quality_metrics import detect_distribution_drift

# Compare the 'revenue' column of the baseline frame against the current one
baseline_revenue = baseline_df['revenue']
current_revenue = current_df['revenue']
drift = detect_distribution_drift(baseline_revenue, current_revenue)
if drift['drifted']:
    print(f"Distribution drift detected: {drift['test']} p-value={drift['p_value']:.4f}")
| Dimension | What It Measures |
|---|---|
| Completeness | Missing values, required fields |
| Uniqueness | Duplicates in key columns |
| Validity | Format, range, pattern compliance |
| Accuracy | Correctness vs source of truth |
| Consistency | Cross-field logical rules |
| Timeliness | Data freshness, staleness |
import numpy as np
import pandas as pd
from scipy import stats
def detect_numeric_drift(
    baseline: pd.Series,
    current: pd.Series,
    significance: float = 0.05
) -> dict:
    """Compare two numeric samples with a two-sample KS test and PSI.

    Missing values are dropped from both series before either metric is
    computed.

    Args:
        baseline: Reference sample.
        current: Sample to compare against the reference.
        significance: P-value threshold below which the KS test is
            reported as drift.

    Returns:
        A dict with keys ``ks_statistic``, ``ks_pvalue``, ``drifted``
        (KS p-value below ``significance``), ``psi`` and ``psi_alert``
        (PSI above 0.25).
    """
    reference = baseline.dropna()
    observed = current.dropna()

    ks_result = stats.ks_2samp(reference, observed)
    ks_stat, ks_pvalue = ks_result[0], ks_result[1]
    stability_index = calculate_psi(reference, observed)

    report = {'ks_statistic': ks_stat, 'ks_pvalue': ks_pvalue}
    report['drifted'] = ks_pvalue < significance
    report['psi'] = stability_index
    report['psi_alert'] = stability_index > 0.25
    return report
def calculate_psi(baseline: pd.Series, current: pd.Series, bins: int = 10) -> float:
    """Calculate the Population Stability Index (PSI) of ``current`` vs ``baseline``.

    Bin edges are derived from ``baseline``. The two outer bins are then
    opened to -inf / +inf so that ``current`` values lying outside the
    baseline range are counted in the edge bins instead of being dropped
    while still contributing to the denominator. NaN values are ignored in
    both inputs.

    Args:
        baseline: Reference sample (numeric).
        current: Sample to compare against the reference (numeric).
        bins: Bin specification passed to ``np.histogram`` for the baseline.

    Returns:
        The PSI, i.e. the sum over bins of
        ``(current_pct - baseline_pct) * ln(current_pct / baseline_pct)``,
        where an empty bin is given a proportion of 0.0001. Returns NaN when
        either sample has no non-NaN values.
    """
    empty_bin_pct = 0.0001  # stand-in proportion so the log ratio stays finite

    baseline_values = np.asarray(baseline, dtype=float)
    current_values = np.asarray(current, dtype=float)
    baseline_values = baseline_values[~np.isnan(baseline_values)]
    current_values = current_values[~np.isnan(current_values)]

    # PSI is undefined without data on both sides; return NaN explicitly
    # rather than through a 0/0 division.
    if baseline_values.size == 0 or current_values.size == 0:
        return float("nan")

    baseline_counts, bin_edges = np.histogram(baseline_values, bins=bins)

    # Open the outer bins so out-of-range current values land in them.
    bin_edges = bin_edges.astype(float)
    bin_edges[0], bin_edges[-1] = -np.inf, np.inf
    current_counts = np.histogram(current_values, bins=bin_edges)[0]

    baseline_pct = np.where(
        baseline_counts == 0, empty_bin_pct, baseline_counts / baseline_values.size
    )
    current_pct = np.where(
        current_counts == 0, empty_bin_pct, current_counts / current_values.size
    )

    return float(np.sum((current_pct - baseline_pct) * np.log(current_pct / baseline_pct)))
def detect_categorical_drift(
    baseline: pd.Series,
    current: pd.Series,
    significance: float = 0.05
) -> dict:
    """Detect drift in a categorical column using a chi-square goodness-of-fit test.

    The test compares the observed category counts in ``current`` with the
    counts expected if ``current`` followed the baseline distribution
    (baseline proportions scaled to the size of ``current``). Counts are
    used rather than proportions because the chi-square statistic depends
    on the sample size. Missing values are ignored.

    Args:
        baseline: Reference sample of category labels.
        current: Sample to compare against the reference.
        significance: P-value threshold below which drift is reported.

    Returns:
        A dict with keys:
        ``chi2_statistic`` / ``chi2_pvalue``: test result. If ``current``
        contains categories absent from the baseline, the expected count
        for them is zero, so the statistic is reported as ``inf`` with a
        p-value of 0.0. If either sample has no non-missing values, both
        are NaN.
        ``drifted``: True when the p-value is below ``significance``.
        ``new_categories``: labels present only in ``current``.
        ``missing_categories``: labels present only in ``baseline``.
    """
    # value_counts() drops missing values; zero-count entries (which a
    # categorical dtype can produce) are filtered out as well.
    baseline_freq = {
        label: count for label, count in baseline.value_counts().items() if count > 0
    }
    current_freq = {
        label: count for label, count in current.value_counts().items() if count > 0
    }

    new_categories = [label for label in current_freq if label not in baseline_freq]
    missing_categories = [label for label in baseline_freq if label not in current_freq]

    n_baseline = sum(baseline_freq.values())
    n_current = sum(current_freq.values())

    if n_baseline == 0 or n_current == 0:
        # No data on one side: the test is undefined.
        chi2, pvalue = float("nan"), float("nan")
    elif new_categories:
        # Observed > 0 where expected == 0: infinite statistic, p-value 0.
        chi2, pvalue = float("inf"), 0.0
    else:
        categories = list(baseline_freq)
        scale = n_current / n_baseline
        observed = [float(current_freq.get(label, 0)) for label in categories]
        expected = [baseline_freq[label] * scale for label in categories]
        result = stats.chisquare(observed, expected)
        chi2, pvalue = float(result[0]), float(result[1])

    return {
        'chi2_statistic': chi2,
        'chi2_pvalue': pvalue,
        'drifted': bool(pvalue < significance),
        'new_categories': new_categories,
        'missing_categories': missing_categories,
    }
def compare_schemas(baseline: pd.DataFrame, current: pd.DataFrame) -> dict:
    """Report column-level differences between two DataFrames.

    Args:
        baseline: Reference frame.
        current: Frame to compare against the reference.

    Returns:
        A dict with ``added_columns`` (only in ``current``),
        ``removed_columns`` (only in ``baseline``) and ``type_changes``,
        a list with one entry per shared column whose dtype differs,
        giving the column name and both dtypes as strings.
    """
    old_names = set(baseline.columns)
    new_names = set(current.columns)

    dtype_changes = []
    for name in old_names & new_names:
        before = baseline[name].dtype
        after = current[name].dtype
        if before != after:
            dtype_changes.append(
                {'column': name, 'baseline_type': str(before), 'current_type': str(after)}
            )

    return {
        'added_columns': list(new_names - old_names),
        'removed_columns': list(old_names - new_names),
        'type_changes': dtype_changes,
    }
drift_config:
  psi_thresholds:
    green: 0.1
    yellow: 0.25
    red: 0.5
  volume_thresholds:
    max_daily_change_pct: 30
    max_weekly_change_pct: 50
  null_rate_thresholds:
    max_increase_pct: 5
pandas
scipy # For distribution drift detection
numpy # For PSI calculation