Use when writing Pivot pipeline stages, seeing annotation errors (`Dep`, `Out`, `Annotated`), loader mismatches, "cannot pickle" errors, `DirectoryOut` validation failures, or `IncrementalOut` path mismatches.
Slash command: `/pivot:writing-pivot-stages`
Pivot stages are pure Python functions declaring file I/O via type annotations. The framework handles loading, saving, caching, and DAG construction.
Core principle: Annotations handle all file I/O. Functions receive pre-loaded data and return data to be saved.
```python
import pivot  # Single import: access everything via pivot.*
from typing import Annotated, TypedDict
```
All Pivot types are accessed via the pivot namespace:
| What | Access |
|---|---|
| Dependencies | pivot.Dep, pivot.PlaceholderDep |
| Outputs | pivot.Out, pivot.Metric, pivot.Plot, pivot.IncrementalOut, pivot.DirectoryOut |
| Loaders | pivot.loaders.CSV(), pivot.loaders.JSON(), pivot.loaders.YAML(), etc. |
| Base classes | pivot.loaders.Reader, pivot.loaders.Writer, pivot.loaders.Loader |
| Params | pivot.StageParams |
| Pipeline | pivot.Pipeline |
| Decorators | pivot.no_fingerprint |
A minimal stage with params, one input, and two outputs:

```python
from typing import Annotated, TypedDict

import pandas as pd
import pivot


class MyParams(pivot.StageParams):
    threshold: float = 0.5


class MyOutputs(TypedDict):
    result: Annotated[pd.DataFrame, pivot.Out("output.csv", pivot.loaders.CSV())]
    metrics: Annotated[dict, pivot.Metric("metrics.json")]


def my_stage(
    params: MyParams,
    data: Annotated[pd.DataFrame, pivot.Dep("input.csv", pivot.loaders.CSV())],
) -> MyOutputs:
    filtered = data[data["score"] > params.threshold]
    return {"result": filtered, "metrics": {"count": len(filtered)}}


pipeline = pivot.Pipeline("my_pipeline")
pipeline.register(my_stage, params=MyParams(threshold=0.3))
```
For a single output, annotate the return type directly instead of using a `TypedDict`:

```python
def transform(
    data: Annotated[pd.DataFrame, pivot.Dep("input.csv", pivot.loaders.CSV())],
) -> Annotated[pd.DataFrame, pivot.Out("output.csv", pivot.loaders.CSV())]:
    return data.dropna()
```
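To see how annotation-driven I/O can work in principle, here is a stdlib-only sketch that reads `Annotated` metadata off a function signature. `DepMarker` and `OutMarker` are hypothetical stand-ins, not Pivot's real `Dep`/`Out` classes, and this is not a description of how Pivot itself is implemented:

```python
import dataclasses
import typing
from typing import Annotated


@dataclasses.dataclass(frozen=True)
class DepMarker:
    """Hypothetical stand-in for a dependency declaration."""
    path: str


@dataclasses.dataclass(frozen=True)
class OutMarker:
    """Hypothetical stand-in for an output declaration."""
    path: str


def transform(
    data: Annotated[list[str], DepMarker("input.txt")],
) -> Annotated[list[str], OutMarker("output.txt")]:
    return [line for line in data if line]


def collect_io(func):
    """Return ({param_name: DepMarker}, [OutMarker]) found in func's annotations."""
    hints = typing.get_type_hints(func, include_extras=True)
    deps, outs = {}, []
    for name, hint in hints.items():
        if typing.get_origin(hint) is not Annotated:
            continue
        # get_args(Annotated[T, m1, m2]) returns (T, m1, m2); skip T.
        for meta in typing.get_args(hint)[1:]:
            if isinstance(meta, DepMarker) and name != "return":
                deps[name] = meta
            elif isinstance(meta, OutMarker) and name == "return":
                outs.append(meta)
    return deps, outs


deps, outs = collect_io(transform)
print(deps)  # {'data': DepMarker(path='input.txt')}
print(outs)  # [OutMarker(path='output.txt')]
```

A framework built this way can load each `Dep` before calling the function and save the return value using the `Out` marker, which is why the stage body itself never touches files.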
Loader base classes:

| Base class | Methods | Use case |
|---|---|---|
| `Reader[R]` | `load() -> R` | Read-only (dependencies) |
| `Writer[W]` | `save(data: W, ...)` | Write-only (outputs) |
| `Loader[W, R]` | Both `load()` and `save()` | Bidirectional (incremental outputs) |
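The read/write split mirrors a small generic class hierarchy. The sketch below uses hypothetical classes written from scratch (not imported from `pivot.loaders`) to show why a `Loader` is accepted wherever a `Reader` or a `Writer` is expected: it inherits both interfaces.

```python
import abc
import pathlib
import tempfile
from typing import Generic, TypeVar

R = TypeVar("R")
W = TypeVar("W")


class Reader(abc.ABC, Generic[R]):
    """Hypothetical read-only interface (dependencies)."""

    @abc.abstractmethod
    def load(self, path: pathlib.Path) -> R: ...


class Writer(abc.ABC, Generic[W]):
    """Hypothetical write-only interface (outputs)."""

    @abc.abstractmethod
    def save(self, data: W, path: pathlib.Path) -> None: ...


class Loader(Writer[W], Reader[R]):
    """Bidirectional: inherits both interfaces, so its parameters are [W, R]."""


class TextLoader(Loader[str, str]):
    def load(self, path: pathlib.Path) -> str:
        return path.read_text()

    def save(self, data: str, path: pathlib.Path) -> None:
        path.write_text(data)


class HTMLOnlyWriter(Writer[str]):
    def save(self, data: str, path: pathlib.Path) -> None:
        path.write_text(data)


def accepts_as_dep(loader: object) -> bool:
    return isinstance(loader, Reader)  # A Dep only needs load().


def accepts_as_out(loader: object) -> bool:
    return isinstance(loader, Writer)  # An Out only needs save().


print(accepts_as_dep(TextLoader()), accepts_as_out(TextLoader()))          # True True
print(accepts_as_dep(HTMLOnlyWriter()), accepts_as_out(HTMLOnlyWriter()))  # False True

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "note.txt"
    TextLoader().save("hello", path)
    print(TextLoader().load(path))  # hello
```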
Type constraints:

- `Dep.loader` accepts `Reader[R]` (or `Loader`, which extends `Reader`)
- `Out.loader` accepts `Writer[W]` (or `Loader`, which extends `Writer`)
- `IncrementalOut.loader` requires `Loader[W, R]` (needs both read and write)
- `DirectoryOut.loader` accepts `Writer[T]`

Built-in loaders:

| Loader | Base | Data type | Options | `empty()` |
|---|---|---|---|---|
| `CSV()` | `Loader` | `DataFrame` | `index_col`, `sep`, `dtype` | Yes |
| `JSON()` | `Loader` | `dict`/`list` | `indent=2`, `empty_factory=dict` | Yes |
| `JSONL()` | `Loader` | `list[dict]` | — | Yes |
| `DataFrameJSONL()` | `Loader` | `DataFrame` | — | Yes |
| `YAML()` | `Loader` | `dict`/`list` | `empty_factory=dict` | Yes |
| `Text()` | `Loader` | `str` | — | Yes |
| `Pickle()` | `Loader` | `Any` | `protocol` | No |
| `PathOnly()` | `Loader` | `Path` | — | No |
| `MatplotlibFigure()` | `Writer` | `Figure` | `dpi=150`, `bbox_inches`, `transparent` | N/A |
Notes:

- `DataFrameJSONL()` reads/writes DataFrames as JSON Lines (`orient=records`). Prefer it over `CSV()` for non-trivial DataFrames (it preserves types and handles nested data).
- Only loaders with `empty()` support can be used with `IncrementalOut`.
- `MatplotlibFigure()` is `Writer[Figure]` (write-only): images can't be loaded back as `Figure` objects.

Output types:

| Type | Default cache | Git-tracked | Use case |
|---|---|---|---|
| `Out` | `True` | No | Standard outputs |
| `Metric` | `False` | Yes | Small YAML/JSON metrics |
| `Plot` | `True` | No | Visualizations |
| `IncrementalOut` | `True` | No | Builds on previous run's output |
| `DirectoryOut` | `True` | No | Dynamic set of files in directory |
A single `Dep` can cover several files:

```python
# Variable-length list (count can change between runs)
shards: Annotated[list[pd.DataFrame], pivot.Dep(["a.csv", "b.csv"], pivot.loaders.CSV())]

# Fixed-length tuple (exact count enforced)
pair: Annotated[tuple[pd.DataFrame, pd.DataFrame], pivot.Dep(("x.csv", "y.csv"), pivot.loaders.CSV())]
```
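One way a framework could enforce the exact count for tuple dependencies is to compare the tuple's arity with the number of paths. This is a hypothetical check written with `typing` introspection, not Pivot's own code; `str` stands in for `pd.DataFrame` so the sketch needs only the standard library:

```python
import typing


def check_dep_count(hint, paths) -> None:
    """Raise if a fixed-length tuple annotation disagrees with the number of paths.

    list[...] annotations accept any number of paths; tuple[A, B] must match exactly.
    """
    if typing.get_origin(hint) is not tuple:
        return  # Lists (and anything else) are variable-length.
    args = typing.get_args(hint)
    # tuple[X, ...] is variable-length, so only fixed-length tuples are checked.
    if len(args) == 2 and args[1] is Ellipsis:
        return
    if len(args) != len(paths):
        raise ValueError(f"tuple annotation expects {len(args)} paths, got {len(paths)}")


check_dep_count(list[str], ["a.csv", "b.csv", "c.csv"])  # Any length is fine.
check_dep_count(tuple[str, str], ("x.csv", "y.csv"))     # Exact match.
try:
    check_dep_count(tuple[str, str], ("x.csv",))
except ValueError as exc:
    print(exc)  # tuple annotation expects 2 paths, got 1
```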
`IncrementalOut` restores the previous output from cache before the stage runs. Use it for append-only state:

```python
class CacheOutputs(TypedDict):
    cache: Annotated[dict, pivot.IncrementalOut("cache.json", pivot.loaders.JSON())]


def incremental_stage(
    cache: Annotated[dict | None, pivot.IncrementalOut("cache.json", pivot.loaders.JSON())],
) -> CacheOutputs:
    existing = cache or {}
    existing["new_key"] = "value"
    return {"cache": existing}
```

Rules: use the same path in the input and output annotations, and use a loader that supports `empty()`.
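The restore-then-run cycle can be simulated with plain files. This sketch assumes (it is not Pivot's code) that the framework loads the previous output if it exists, falls back to the loader's `empty()` value otherwise, calls the stage, and saves the result to the same path. `JSONDictLoader` and `run_incremental` are hypothetical:

```python
import json
import pathlib
import tempfile


class JSONDictLoader:
    """Hypothetical loader with the load/save/empty shape IncrementalOut needs."""

    def load(self, path: pathlib.Path) -> dict:
        return json.loads(path.read_text())

    def save(self, data: dict, path: pathlib.Path) -> None:
        path.write_text(json.dumps(data, indent=2))

    def empty(self) -> dict:
        return {}


def run_incremental(stage, path: pathlib.Path, loader: JSONDictLoader) -> dict:
    # Restore the previous output, or start from the loader's empty value.
    previous = loader.load(path) if path.exists() else loader.empty()
    result = stage(previous)
    loader.save(result, path)  # Same path for the input and the output.
    return result


def add_run(cache: dict) -> dict:
    updated = dict(cache)
    updated[f"run_{len(updated) + 1}"] = "done"
    return updated


with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "cache.json"
    first = run_incremental(add_run, path, JSONDictLoader())
    second = run_incremental(add_run, path, JSONDictLoader())

print(first)   # {'run_1': 'done'}
print(second)  # {'run_1': 'done', 'run_2': 'done'}
```

This is also why the input and output paths must match: the output of one run is the only place the next run can find its starting state.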
`DirectoryOut` handles dynamic file sets determined at runtime:

```python
class TaskOutputs(TypedDict):
    results: Annotated[dict[str, dict], pivot.DirectoryOut("results/", pivot.loaders.JSON())]


def process_tasks(...) -> TaskOutputs:
    return {"results": {
        "task_a.json": {"accuracy": 0.95},
        "task_b.json": {"accuracy": 0.87},
    }}
```
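The `DirectoryOut` rules and error messages in this skill can be approximated with `pathlib.PurePosixPath`. `validate_directory_out` is a hypothetical validator for illustration; Pivot's actual checks and messages may differ:

```python
import pathlib


def validate_directory_out(dir_path: str, files: dict) -> None:
    """Check a DirectoryOut path and the keys of the mapping a stage returns."""
    if not dir_path.endswith("/"):
        raise ValueError("DirectoryOut path must end with '/'")
    for key in files:
        rel = pathlib.PurePosixPath(key)
        if rel.is_absolute():
            raise ValueError(f"absolute key not allowed: {key!r}")
        if ".." in rel.parts:
            raise ValueError(f"path traversal not allowed: {key!r}")
        if not rel.suffix:
            raise ValueError(f"DirectoryOut key must have extension: {key!r}")


# Passes: trailing slash, relative keys with extensions.
validate_directory_out("results/", {"task_a.json": {}, "task_b.json": {}})

for bad_path, bad_files in [
    ("results", {"task_a.json": {}}),        # Missing trailing slash
    ("results/", {"task_a": {}}),            # Key without extension
    ("results/", {"../escape.json": {}}),    # Traversal out of the directory
    ("results/", {"/abs.json": {}}),         # Absolute key
]:
    try:
        validate_directory_out(bad_path, bad_files)
    except ValueError as exc:
        print(exc)
```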
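Rules:

- The path must end with `/`.
- Every key must include a file extension (`"task_a.json"`, not `"task_a"`).
- Keys must be relative: no `..` segments (`/../`), no absolute paths.

`PlaceholderDep` declares a dependency with no default path; it must be overridden at registration: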
```python
def compare(
    baseline: Annotated[pd.DataFrame, pivot.PlaceholderDep(pivot.loaders.CSV())],
) -> CompareOutputs: ...


pipeline.register(compare, dep_path_overrides={"baseline": "model_a/results.csv"})
```
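Conceptually, registration merges default dependency paths with `dep_path_overrides` and refuses to continue while any placeholder is still unresolved. A minimal sketch of that merge, using a hypothetical `PLACEHOLDER` sentinel and `resolve_dep_paths` helper (neither is Pivot API):

```python
PLACEHOLDER = object()  # Hypothetical sentinel meaning "no default path".


def resolve_dep_paths(defaults: dict, overrides: dict) -> dict:
    """Apply overrides on top of defaults; fail if any placeholder remains."""
    resolved = {**defaults, **overrides}
    missing = [name for name, path in resolved.items() if path is PLACEHOLDER]
    if missing:
        raise ValueError(f"PlaceholderDep requires override: {missing}")
    return resolved


defaults = {"baseline": PLACEHOLDER, "candidate": "model_b/results.csv"}
print(resolve_dep_paths(defaults, {"baseline": "model_a/results.csv"}))
# {'baseline': 'model_a/results.csv', 'candidate': 'model_b/results.csv'}
```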
Plots require all three parts in the annotation:

```python
Annotated[
    matplotlib.figure.Figure,  # 1. Type (must be Figure, not Axes)
    pivot.Plot(
        "plots/my_plot.png",  # 2. Output type (Plot, not Out)
        pivot.loaders.MatplotlibFigure(),  # 3. Writer (handles save/close)
    ),
]
```
Full example:

```python
from typing import Annotated, TypedDict

import matplotlib.figure
import matplotlib.pyplot as plt
import pandas as pd
import pivot


class PlotOutputs(TypedDict):
    plot: Annotated[matplotlib.figure.Figure, pivot.Plot("plots/my.png", pivot.loaders.MatplotlibFigure())]


def make_plot(
    data: Annotated[pd.DataFrame, pivot.Dep("input.csv", pivot.loaders.CSV())],
) -> PlotOutputs:
    fig, ax = plt.subplots()
    ax.plot(data["x"], data["y"])
    return {"plot": fig}  # Return the Figure, not the Axes. The framework saves and closes it.
```
Override paths at registration time. This is useful for running the same stage with different inputs/outputs:

```python
# Simple override
pipeline.register(my_stage, name="my_stage@v2", out_path_overrides={"result": "v2/output.csv"})

# Variant pattern: register the same function multiple times with different paths
for variant, suffix in [("current", ""), ("legacy", "_legacy")]:
    pipeline.register(
        merge_data,
        name=f"merge_data@{variant}",
        dep_path_overrides={"input": f"data/raw/input{suffix}.jsonl"},
        out_path_overrides={"output": f"data/processed/output{suffix}.jsonl"},
        params=MergeParams(suffix=suffix),
    )
```
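Each registration needs a distinct `name=`; otherwise registration fails with `stage 'X' already exists`. The toy registry below is a hypothetical stand-in, not `pivot.Pipeline`. It shows how the variant loop above yields unique names and per-variant paths, and what a duplicate registration looks like:

```python
class ToyPipeline:
    """Hypothetical registry that rejects duplicate stage names."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.stages: dict[str, dict] = {}

    def register(self, func, *, name=None, dep_path_overrides=None, out_path_overrides=None):
        stage_name = name or func.__name__  # Assumption: default name is the function name.
        if stage_name in self.stages:
            raise ValueError(f"stage '{stage_name}' already exists")
        self.stages[stage_name] = {
            "func": func,
            "deps": dict(dep_path_overrides or {}),
            "outs": dict(out_path_overrides or {}),
        }


def merge_data():  # Hypothetical placeholder stage body.
    pass


pipeline = ToyPipeline("my_pipeline")
for variant, suffix in [("current", ""), ("legacy", "_legacy")]:
    pipeline.register(
        merge_data,
        name=f"merge_data@{variant}",
        dep_path_overrides={"input": f"data/raw/input{suffix}.jsonl"},
        out_path_overrides={"output": f"data/processed/output{suffix}.jsonl"},
    )

print(sorted(pipeline.stages))  # ['merge_data@current', 'merge_data@legacy']
print(pipeline.stages["merge_data@legacy"]["deps"])  # {'input': 'data/raw/input_legacy.jsonl'}

try:
    pipeline.register(merge_data, name="merge_data@current")
except ValueError as exc:
    print(exc)  # stage 'merge_data@current' already exists
```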
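Path resolution:

- Relative paths, including `..` segments, are resolved during registration, so `Dep("../shared/data.csv")` works.
- Output paths must stay within the project root; otherwise registration fails with "resolves outside base directory".
- To give several pipelines a common base, pass `root=pivot.project.get_project_root()`:

```python
pipeline = pivot.Pipeline("my_pipeline", root=pivot.project.get_project_root())
```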
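Resolving `..` and rejecting outputs that escape the root can both be done with `pathlib`. A hedged sketch, assuming paths are joined to a known base directory and collapsed lexically; `resolve` and `check_output` are hypothetical helpers, and Pivot's real rules may be stricter:

```python
import posixpath
from pathlib import PurePosixPath


def resolve(base: PurePosixPath, relative: str) -> PurePosixPath:
    """Join and collapse '..' lexically, without touching the filesystem."""
    return PurePosixPath(posixpath.normpath(str(base / relative)))


def check_output(project_root: PurePosixPath, path: PurePosixPath) -> None:
    if not path.is_relative_to(project_root):
        raise ValueError(f"{path} resolves outside base directory {project_root}")


project_root = PurePosixPath("/repo")
pipeline_dir = project_root / "pipelines" / "train"

dep = resolve(pipeline_dir, "../shared/data.csv")
print(dep)  # /repo/pipelines/shared/data.csv

out = resolve(pipeline_dir, "../../../elsewhere/out.csv")  # Climbs above /repo.
try:
    check_output(project_root, out)
except ValueError as exc:
    print(exc)
```

`PurePosixPath.is_relative_to` requires Python 3.9 or newer.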
To write a custom loader, extend the appropriate base class. Use `@dataclasses.dataclass(frozen=True)` for immutability and pickling.

```python
import dataclasses
import pathlib

import numpy as np
import pivot


# Reader (read-only): for Dep
@dataclasses.dataclass(frozen=True)
class ImageReader(pivot.loaders.Reader[np.ndarray]):
    def load(self, path: pathlib.Path) -> np.ndarray:
        from PIL import Image
        return np.array(Image.open(path))


# Writer (write-only): for Out/Plot
@dataclasses.dataclass(frozen=True)
class HTMLWriter(pivot.loaders.Writer[str]):
    def save(self, data: str, path: pathlib.Path) -> None:
        path.write_text(data)


# Loader (bidirectional): for IncrementalOut or symmetric I/O
@dataclasses.dataclass(frozen=True)
class NPY(pivot.loaders.Loader[np.ndarray, np.ndarray]):
    def load(self, path: pathlib.Path) -> np.ndarray:
        return np.load(path)

    def save(self, data: np.ndarray, path: pathlib.Path) -> None:
        # Write through a file handle: np.save(path) would append ".npy" to other suffixes.
        with path.open("wb") as f:
            np.save(f, data)

    def empty(self) -> np.ndarray:  # Required for IncrementalOut
        return np.array([])
```
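Frozen dataclasses pickle cleanly and reject mutation. Assuming the reason Pivot needs picklable loaders is that stages are shipped to worker processes (an assumption, consistent with the "cannot pickle" error), the stdlib-only sketch below defines a hypothetical `Lines` loader with the same `load`/`save`/`empty` shape as `NPY`, but without the Pivot base class, and checks both properties:

```python
import dataclasses
import pathlib
import pickle
import tempfile


@dataclasses.dataclass(frozen=True)
class Lines:
    """Hypothetical list[str] loader; in a real stage it would extend a pivot.loaders base class."""

    encoding: str = "utf-8"

    def load(self, path: pathlib.Path) -> list[str]:
        return path.read_text(encoding=self.encoding).splitlines()

    def save(self, data: list[str], path: pathlib.Path) -> None:
        # One line per item; an empty list writes an empty file.
        path.write_text("".join(f"{line}\n" for line in data), encoding=self.encoding)

    def empty(self) -> list[str]:
        return []


loader = Lines()

# Round-trips through pickle, as it would when sent to another process.
clone = pickle.loads(pickle.dumps(loader))
print(clone == loader)  # True

# Frozen: attribute assignment raises FrozenInstanceError.
try:
    loader.encoding = "latin-1"
except dataclasses.FrozenInstanceError:
    print("loader is immutable")

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "lines.txt"
    loader.save(["a", "b"], path)
    print(loader.load(path))  # ['a', 'b']
```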
Rules:

- Implement `empty()` only if the loader is used with `IncrementalOut`.

`StageParams` are Pydantic models, and Pydantic deep-copies all defaults (lists, dicts, nested models). Never use `default_factory`, `.model_copy()`, or `.copy()`:
```python
from typing import Any

import pydantic
import pivot


# WRONG: unnecessary complexity
class MyParams(pivot.StageParams):
    exclude: list[str] = pydantic.Field(default_factory=list)
    base_cfg: PlotConfig = pydantic.Field(default_factory=lambda: PlotConfig())


# CORRECT: Pydantic handles mutable defaults safely
class MyParams(pivot.StageParams):
    exclude: list[str] = []
    styling: dict[str, Any] = {}
    percents: list[int] = [50, 80]
    nested: SubModel = SubModel()
```
Only use pydantic.Field() when you need its features (alias, description, ge), not for defaults. When cleaning up default_factory, also strip redundant pydantic.Field() wrappers and unused pydantic imports in the same pass.
Don't add field_validator or model_validator unless strictly necessary — keep params as plain data. Field types must match what the corresponding Dep inputs provide. Don't leave placeholder fields that bypass a param pathway — wire it fully or remove it.
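The reason `= []` is safe in `StageParams` is that Pydantic copies the default for each instance. A stdlib-only illustration of the difference (not Pydantic's implementation): a plain class attribute shares one list across instances, while a per-instance deep copy keeps them independent:

```python
import copy


class SharedDefault:
    exclude: list[str] = []  # One list object shared by every instance.


class CopiedDefault:
    exclude_default: list[str] = []

    def __init__(self) -> None:
        # Fresh deep copy per instance, analogous to how Pydantic treats defaults.
        self.exclude = copy.deepcopy(self.exclude_default)


a, b = SharedDefault(), SharedDefault()
a.exclude.append("x")
print(b.exclude)  # ['x']: the mutation leaked through the shared default

c, d = CopiedDefault(), CopiedDefault()
c.exclude.append("x")
print(d.exclude)  # []: each instance owns its own copy
```

With that copy already done for you, `default_factory` adds nothing but noise.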
Pipelines can be composed with `include()`:

```python
preprocessing = pivot.Pipeline("preprocessing")
preprocessing.register(clean_data, name="clean")

main = pivot.Pipeline("main")
main.include(preprocessing)  # Deep-copies stages
main.register(train, name="train")
```
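A toy model of `include()` as described in this skill: included stages are deep-copied, and a colliding name gets the included pipeline's name as a prefix. `MiniPipeline` is hypothetical and only mimics that documented behavior; the real method may differ in details:

```python
import copy


class MiniPipeline:
    """Hypothetical stand-in that mimics the documented include() behavior."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.stages: dict[str, dict] = {}

    def register(self, func, name: str) -> None:
        if name in self.stages:
            raise ValueError(f"stage '{name}' already exists")
        self.stages[name] = {"func": func, "config": {}}

    def include(self, other: "MiniPipeline") -> None:
        for name, stage in other.stages.items():
            # Prefix with the included pipeline's name only when the name collides.
            new_name = f"{other.name}/{name}" if name in self.stages else name
            if new_name in self.stages:
                raise ValueError(f"stage '{new_name}' already exists")
            self.stages[new_name] = copy.deepcopy(stage)


def clean_data():
    pass


def train():
    pass


preprocessing = MiniPipeline("preprocessing")
preprocessing.register(clean_data, name="clean")

main = MiniPipeline("main")
main.register(clean_data, name="clean")  # Deliberate clash to trigger the prefix.
main.include(preprocessing)
main.register(train, name="train")

print(sorted(main.stages))  # ['clean', 'preprocessing/clean', 'train']

# The copy is independent: editing it leaves the source pipeline untouched.
main.stages["preprocessing/clean"]["config"]["lr"] = 0.1
print(preprocessing.stages["clean"]["config"])  # {}
```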
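Behavior:

- `include()` deep-copies the included stages.
- `state_dir` …
- On a stage-name collision, included stages are prefixed with `{other.name}/` to disambiguate.

To test a stage, call it directly with data (annotations are bypassed):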
```python
def test_my_stage():
    result = my_stage(
        params=MyParams(threshold=0.5),
        data=pd.DataFrame({"score": [0.3, 0.7, 0.9]}),
    )
    assert len(result["result"]) == 2
```
Common mistakes:

- Reading or writing files in the stage body (`pd.read_csv()`, `to_csv()`, `open()`): let the annotations handle I/O.
- Use `StageParams` for config only.
- Don't use `default_factory` for mutable defaults. Pydantic deep-copies, so use `= []`, `= {}`, `= Model()` directly.
- Use a single `import pivot`, then `pivot.Dep`, `pivot.Out`, `pivot.loaders.CSV()`, etc.

Running stages:

```bash
pivot repro              # Run entire pipeline (DAG-aware)
pivot repro my_stage     # Run my_stage AND all dependencies
pivot repro --dry-run    # Validate DAG without executing
pivot run my_stage       # Run ONLY my_stage (no dependency resolution)
```
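The difference between `pivot repro my_stage` and `pivot run my_stage` is which stages get scheduled. A sketch with `graphlib` from the standard library, over a hypothetical four-stage DAG, shows the set each command would cover according to the descriptions above (this is not Pivot's scheduler):

```python
from graphlib import TopologicalSorter

# Hypothetical DAG: each stage maps to the stages it depends on.
dag = {
    "clean": set(),
    "features": {"clean"},
    "train": {"features"},
    "report": {"clean"},
}


def upstream_closure(dag: dict, target: str) -> set:
    """The target plus everything it transitively depends on."""
    seen, stack = set(), [target]
    while stack:
        stage = stack.pop()
        if stage not in seen:
            seen.add(stage)
            stack.extend(dag[stage])
    return seen


def repro_order(dag: dict, target: str) -> list:
    """Like `repro <stage>`: the target and its dependencies, dependencies first."""
    needed = upstream_closure(dag, target)
    subgraph = {stage: dag[stage] & needed for stage in needed}
    return list(TopologicalSorter(subgraph).static_order())


def run_order(dag: dict, target: str) -> list:
    """Like `run <stage>`: only the target, no dependency resolution."""
    return [target]


print(repro_order(dag, "train"))  # ['clean', 'features', 'train']
print(run_order(dag, "train"))    # ['train']
```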
| Error | Cause | Fix |
|---|---|---|
| `cannot pickle` | Closure/lambda as stage | Move to a module-level function |
| `PlaceholderDep requires override` | Missing path | Add `dep_path_overrides` |
| `IncrementalOut path mismatch` | Input/output paths differ | Use the same path in both annotations |
| `DirectoryOut path must end with '/'` | Missing trailing slash | Add `/` to the path |
| `DirectoryOut key must have extension` | Key like `"task_a"` | Use `"task_a.json"` |
| `loader is required` | `Out("file.json")` without loader | Add a loader: `pivot.Out("file.json", pivot.loaders.JSON())` |
| `TypedDict field missing Out annotation` | Field without `Out`/`Metric`/`Plot` | Add an annotation to every field |
| `stage 'X' already exists` | Duplicate registration | Use a distinct `name=` at registration. Note: `include()` auto-prefixes on collision (`{pipeline.name}/stage`) |
| `resolves outside base directory` | Output path escapes project root | Keep output paths within the project |
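Why `cannot pickle` points at closures and lambdas: `pickle` serializes a function by reference to an importable module-level name, so nested functions and lambdas cannot be pickled. A stdlib demonstration (the stage names are made up for illustration):

```python
import pickle


def module_level_stage(x: int) -> int:
    return x + 1


def make_closure_stage(offset: int):
    def closure_stage(x: int) -> int:  # Defined inside another function.
        return x + offset
    return closure_stage


def try_pickle(obj) -> str:
    try:
        pickle.dumps(obj)
        return "ok"
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        # The exact exception type and message vary between Python versions.
        return f"{type(exc).__name__}: {exc}"


print(try_pickle(module_level_stage))     # ok
print(try_pickle(make_closure_stage(1)))  # Fails: local object
print(try_pickle(lambda x: x + 1))        # Fails: lambda has no importable name
```

Moving the body to a top-level `def` and passing `offset` through `StageParams` instead of a closure variable fixes the error.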
Checklist:

- `import pivot` (not `from pivot.outputs import ...`)
- `StageParams` …
- No `default_factory` in `StageParams`
- No `field_validator`/`model_validator` in `StageParams` unless strictly needed
- Run `pivot repro --dry-run` to validate the DAG