From lightningrod
Verifies transform pipeline output at any stage (seeds-only or full) by running transforms, inspecting quality iteratively, then scaling. Includes dataset linting before splitting.
`npx claudepluginhub lightning-rod-labs/lightningrod-python-sdk`

This skill uses the workspace's default tool permissions.
After `lr.transforms.run()`, inspect the returned dataset before scaling `max_questions` or moving to training.
Validates schemas and data loaded by dlt pipelines: renders Mermaid diagrams, runs dashboard/MCP queries, and fixes types (e.g. `Decimal` for money), nested structures, and missing columns.
<!-- AUTO-GENERATED by export-plugins.py — DO NOT EDIT -->
Verifies ETL/ELT pipeline quality, data contracts, idempotency, and test coverage across dbt, Airflow, Dagster, and Prefect. Analyzes DAG structure, transformations, and data checks for PR reviews and audits.
Share bugs, ideas, or general feedback.
Configure `QuestionPipeline` with the minimum stages you need: `seed_generator`, `question_generator`, `labeler`, `context_generators`, `renderer`, `rollout_generator`.
```python
pipeline = QuestionPipeline(...)

if __name__ == "__main__":
    lr_client = get_client()
    cost_estimate = lr_client.transforms.estimate_cost(pipeline, max_questions=<limit>)
    dataset = lr_client.transforms.run(pipeline, max_questions=<limit>, name="<project>_seeds")
```
Stdout includes the dataset ID. In a notebook, keep that ID in a variable for the next cell.
Use the client to download and inspect rows (prefer typed `Sample` objects; use `flattened()` only if you need a quick tabular view in pandas).
```python
lr_client = get_client()
ds = lr_client.datasets.get(dataset_id)
rows = ds.flattened()
```
Then:

- Check summary fields (`is_valid`, label columns if present).
- Spot-check `question_text`, `label`, `reasoning`, and `invalid_reason` on a small random subset.
- For seeds-only runs, review `seed_text` and validation flags before adding question/label stages.
- Iterate: if validity is low or labels look wrong, adjust pipeline config and rerun before increasing `max_questions`.
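The inspection steps above can be sketched with plain Python over the flattened rows. This is a hedged example, not SDK code: the row dicts are mocked stand-ins for `flattened()` output, and `spot_check` is a hypothetical helper name; only the field names come from the checklist above.

```python
import random

def spot_check(rows, k=5, seed=0):
    """Summarize validity and draw a small random subset for manual review."""
    valid = [r for r in rows if r.get("is_valid")]
    validity_rate = len(valid) / len(rows) if rows else 0.0
    # Fixed seed so repeated runs eyeball the same rows.
    subset = random.Random(seed).sample(rows, min(k, len(rows)))
    return validity_rate, subset

# Mock rows standing in for ds.flattened() output.
rows = [
    {"is_valid": True, "question_text": "Q1", "label": "A",
     "reasoning": "...", "invalid_reason": None},
    {"is_valid": False, "question_text": "Q2", "label": None,
     "reasoning": None, "invalid_reason": "empty seed"},
]
rate, subset = spot_check(rows, k=2)
```

If the validity rate is low, look at the `invalid_reason` values in the subset before touching `max_questions`.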
- Start small with `max_questions` (e.g. 10–50).
- Once quality looks good, rerun `estimate_cost` for the target scale, then run the larger job.

Run the dataset linter on the generated dataset before splitting or training. Linting runs server-side on the whole dataset; it catches structural issues that pipeline verification and filtering don't check (duplicate samples, missing required fields, label inconsistencies). This is useful even outside training workflows as a dataset health check.
```python
from lightningrod import display_lint_overview, get_lint_affected_sample_ids

lint_result = lr.datasets.linter.run(dataset.id)
display_lint_overview(lint_result)

bad_ids = get_lint_affected_sample_ids(lint_result)
if bad_ids:
    bad = set(bad_ids)  # build the set once, not per comprehension element
    clean_ids = [s.id for s in dataset.samples() if s.id not in bad]
    dataset = dataset.subset(clean_ids)
```