From rest-api-pipeline
Queries and explores data loaded by dlt pipelines using Python, dlt dataset API, ReadableRelation, and ibis expressions. For table exploration, row counts, and ad-hoc reports.
How this skill is triggered — by the user, by Claude, or both
Slash command
/rest-api-pipeline:view-dataThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
Query data loaded by a dlt pipeline using Python. Use in standalone scripts, inline code, or as the data access layer for reports.
Query data loaded by a dlt pipeline using Python. Use in standalone scripts, inline code, or as the data access layer for reports.
Parse $ARGUMENTS:
pipeline-name (optional): the dlt pipeline name. If omitted, infer from session context. If ambiguous, ask the user and stop.hints (optional, after --): additional requirements or focus areas (e.g., -- show top users by spend)Tell the user to run Workspace Dashboard if no precise query or instructions were give, this assumes user wants to just look at the data. Otherwise
dlt pipeline <pipeline_name> show
This opens a browser with table schemas, row counts, and sample data.
Essential Reading:
https://dlthub.com/docs/general-usage/dataset-access/dataset.mdhttps://dlthub.com/docs/general-usage/dataset-access/ibis-backend.mdUse pipeline.dataset() to access loaded data. This is destination agnostic — works the same on duckdb, postgres, bigquery, etc. NEVER import destination libraries (like duckdb) directly.
import dlt
pipeline = dlt.attach("<pipeline_name>")
dataset = pipeline.dataset()
Think about it as a subset of ibis with slightly different syntax.
table = dataset["my_table"]
table.head().df() # first rows as pandas
table.select("id", "name").limit(50).arrow() # select columns, arrow format
table.where("id", "in", [1, 2, 3]).df() # parametric filter
table.select("amount").max().fetchscalar() # scalar aggregate
dataset.row_counts().df() # row counts for all tables
t = dataset["my_table"].to_ibis()
expr = t.filter(t.amount > 100).group_by("category").aggregate(total=t.amount.sum())
dataset(expr).df() # execute ibis expression via dataset
Ibis is lazy, composable, and destination agnostic. Key operations:
table.group_by("col").aggregate(total=table.col.sum()) — aggregationtable.filter(table.col > 0) — filteringtable.join(other, table.id == other.parent_id) — joinstable.order_by(ibis.desc("col")) — sortingtable.mutate(new_col=table.col * 100) — computed columnstable.select("col1", "col2") — column selectionRead ibis docs: https://ibis-project.org/reference/expression-collections
dlt creates child tables for nested data (e.g., my_table__results). Join on _dlt_id / _dlt_parent_id:
parent = dataset["my_table"].to_ibis()
child = dataset["my_table__results"].to_ibis()
joined = parent.join(child, parent._dlt_id == child._dlt_parent_id)
dataset("SELECT * FROM my_table WHERE amount > 100").df()
If the user wants to create custom charts or generate insights from their data, install the data-exploration toolkit (dlt ai toolkit data-exploration install) and follow the workflow there.
npx claudepluginhub dlt-hub/dlthub-ai-workbench --plugin rest-api-pipelineConnects to dlt pipelines, profiles tables, scans schemas, plans charts with ibis and altair, and outputs analysis_plan.md artifacts for data exploration and analysis.
Validates dlt pipeline-loaded schemas and data: mermaid diagrams, dashboard/MCP queries, fixes types (Decimal for money), nested structures, missing columns.
Develops Lakeflow Spark Declarative Pipelines on Databricks for batch and streaming data pipelines using Python or SQL. Guides dataset types like Streaming Tables and features like Auto Loader, Auto CDC via decision tree.