Builds Hamilton DAGs using 5-step workflow: natural language to DOT graphs, function signatures, validation, TDD implementation. Use when creating new Hamilton Python modules from scratch.
Install: npx claudepluginhub apache/hamilton --plugin hamilton
<!-- SPDX-License-Identifier: Apache-2.0 -->
Guides creation of Hamilton DAGs in Python using function-based nodes, decorators, drivers, testing with pytest, and debugging dataflows.
TL;DR: Build Hamilton DAGs systematically using DOT → Signatures → Validation → TDD implementation.
This workflow optimizes for:
Convert user requirements into DOT language representation
DOT language is:
Example:
User: "Load CSV data, clean nulls, calculate 7-day rolling average, output top 10"
DOT representation:
digraph hamilton_dag {
// Inputs
data_path [shape=box, color=blue]
// Pipeline (arrows show data flow: upstream -> downstream)
data_path -> raw_data
raw_data -> cleaned_data
cleaned_data -> rolling_7d
rolling_7d -> top_10
}
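Before moving on, the DOT text can be given a quick stdlib sanity check to catch an accidental cycle or a stray edge. This is a toy check (a naive regex plus one Kahn topological pass), not a real DOT parser:

```python
import re
from collections import defaultdict

dot = """
digraph hamilton_dag {
  data_path [shape=box, color=blue]
  data_path -> raw_data
  raw_data -> cleaned_data
  cleaned_data -> rolling_7d
  rolling_7d -> top_10
}
"""

# extract edges with a naive regex (fine for simple hand-written graphs)
edges = re.findall(r"(\w+)\s*->\s*(\w+)", dot)

# Kahn's algorithm: if every node can be consumed, the graph is acyclic
incoming = defaultdict(int)
adjacent = defaultdict(list)
nodes = set()
for src, dst in edges:
    adjacent[src].append(dst)
    incoming[dst] += 1
    nodes.update((src, dst))

queue = [n for n in nodes if incoming[n] == 0]
visited = 0
while queue:
    node = queue.pop()
    visited += 1
    for nxt in adjacent[node]:
        incoming[nxt] -= 1
        if incoming[nxt] == 0:
            queue.append(nxt)

print("acyclic:", visited == len(nodes))  # acyclic: True
```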
Best Practices:
- Mark external inputs with [shape=box, color=blue] so they stand out from computed nodes.

Convert DOT nodes into Python function signatures
This step is mechanical and can be automated. For each node:
- Write a function whose name matches the node, with one parameter per upstream node, annotated with its type (use Any if uncertain)

Example:
"""Data processing pipeline."""
from typing import Any
import pandas as pd
def raw_data(data_path: str) -> pd.DataFrame:
"""Load raw data from CSV file.
:param data_path: Path to CSV file
:return: Raw DataFrame
"""
pass
def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
"""Remove null values and duplicates.
:param raw_data: Raw data from previous step
:return: Cleaned DataFrame
"""
pass
def rolling_7d(cleaned_data: pd.DataFrame) -> pd.Series:
"""Calculate 7-day rolling average of sales.
:param cleaned_data: Cleaned data
:return: 7-day rolling average
"""
pass
def top_10(rolling_7d: pd.Series) -> pd.Series:
"""Get top 10 days by value.
:param rolling_7d: Rolling average values
:return: Top 10 values
"""
pass
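The wiring rule behind these signatures: Hamilton matches each parameter name to the node (function or input) of the same name. A toy resolver - not Hamilton's API, just an illustration of the rule - makes this concrete:

```python
import inspect

def resolve(funcs, inputs, target):
    """Toy name-based resolver: each parameter name is an upstream node name."""
    if target in inputs:
        return inputs[target]
    fn = funcs[target]
    kwargs = {p: resolve(funcs, inputs, p) for p in inspect.signature(fn).parameters}
    return fn(**kwargs)

def doubled(x: int) -> int:
    return x * 2

def plus_one(doubled: int) -> int:  # depends on the `doubled` node by name
    return doubled + 1

print(resolve({"doubled": doubled, "plus_one": plus_one}, {"x": 3}, "plus_one"))  # 7
```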
Type Annotation Strategy:
- When unsure, annotate with Any - you'll fix this later with MonkeyType
- Prefer Any over a wrong guess, then refine incrementally

Verify the graph structure before implementing logic
Use the Hamilton CLI to validate:
# Validate the DAG structure
hamilton build --module my_pipeline
# Visualize the graph
hamilton build --module my_pipeline --output dag.png
# List all nodes
hamilton build --module my_pipeline --list-nodes
What to check:
If validation fails, fix the DOT graph or the signatures, then re-run hamilton build.

Implement nodes incrementally with test-driven development
This is where the workflow shines for LLMs. Instead of writing all code at once:
Process:
Example - Node by node:
# Test for raw_data
def test_raw_data(tmp_path):
"""Test CSV loading."""
# Setup
csv_file = tmp_path / "data.csv"
csv_file.write_text("col_a,col_b\n1,2\n3,4")
# Execute
result = raw_data(str(csv_file))
# Verify
assert isinstance(result, pd.DataFrame)
assert len(result) == 2
assert list(result.columns) == ['col_a', 'col_b']
# Implementation
def raw_data(data_path: str) -> pd.DataFrame:
"""Load raw data from CSV file."""
return pd.read_csv(data_path)
# Test for cleaned_data
def test_cleaned_data():
"""Test data cleaning."""
# Setup
raw = pd.DataFrame({
'col_a': [1, 2, None, 4],
'col_b': [1, 1, 2, 2]
})
# Execute
result = cleaned_data(raw)
# Verify
assert len(result) == 3 # One null removed
assert result['col_a'].isna().sum() == 0
# Implementation
def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
"""Remove null values and duplicates."""
return raw_data.dropna().drop_duplicates()
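Continuing the loop for the next node, a hedged sketch in the same test-then-implement style (it assumes cleaned_data carries a numeric 'sales' column, which the original example doesn't pin down):

```python
import pandas as pd

# Test for rolling_7d
def test_rolling_7d():
    data = pd.DataFrame({"sales": [7.0] * 10})
    result = rolling_7d(data)
    assert isinstance(result, pd.Series)
    # a constant series has a constant rolling mean
    assert (result.dropna() == 7.0).all()

# Implementation
def rolling_7d(cleaned_data: pd.DataFrame) -> pd.Series:
    """Calculate 7-day rolling average of sales."""
    return cleaned_data["sales"].rolling(window=7).mean()

test_rolling_7d()
```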
Why this works:
Use MonkeyType or runtime inference to add accurate types
Don't spend time guessing types upfront. Instead:
Option A: Use Any everywhere initially
from typing import Any
def my_function(input_data: Any) -> Any:
"""Process data."""
# Implementation here
pass
Option B: Disable type validation in Driver
from hamilton import base, driver

# Illustrative sketch: subclass a graph adapter and relax its type checks.
# The exact base class and hook names depend on your Hamilton version,
# so treat this as a shape to adapt, not a drop-in.
class NoTypeCheckAdapter(base.SimplePythonDataFrameGraphAdapter):
    """Adapter that skips type validation."""

    @staticmethod
    def check_input_type(node_type, input_value) -> bool:
        return True  # accept any input value

    @staticmethod
    def check_node_type(node_type, input_type) -> bool:
        return True  # accept any upstream/downstream type pairing

dr = driver.Driver(
    {},
    module,  # your pipeline module
    adapter=NoTypeCheckAdapter(),
)
Option C: Run MonkeyType to infer types
After implementing and testing:
# Record runtime types
monkeytype run -m pytest tests/
# Generate type stubs
monkeytype stub my_pipeline
# Apply types to source
monkeytype apply my_pipeline
MonkeyType will:
Result: Accurate types without upfront guessing!
Important: MonkeyType isn't perfect, but it's:
User Request: "Create a pipeline that loads JSON data, filters by status='active', enriches with external API data, and outputs to Parquet"
Step 1: DOT Graph
digraph pipeline {
// Inputs
json_path [shape=box, color=blue]
api_endpoint [shape=box, color=blue]
output_path [shape=box, color=blue]
// Pipeline (arrows show data flow: upstream -> downstream)
json_path -> raw_data
raw_data -> active_records
active_records -> enriched_data
api_endpoint -> enriched_data
enriched_data -> final_output
output_path -> final_output
}
Step 2: Function Signatures
"""Data enrichment pipeline."""
from typing import Any
import pandas as pd
def raw_data(json_path: str) -> pd.DataFrame:
"""Load data from JSON file.
:param json_path: Path to JSON file
:return: Raw DataFrame
"""
pass
def active_records(raw_data: pd.DataFrame) -> pd.DataFrame:
"""Filter for active status only.
:param raw_data: Raw data
:return: Filtered DataFrame
"""
pass
def enriched_data(active_records: pd.DataFrame, api_endpoint: str) -> pd.DataFrame:
"""Enrich with external API data.
:param active_records: Active records
:param api_endpoint: API endpoint URL
:return: Enriched DataFrame
"""
pass
def final_output(enriched_data: pd.DataFrame, output_path: str) -> str:
"""Save to Parquet file.
:param enriched_data: Enriched data
:param output_path: Output file path
:return: Path to saved file
"""
pass
Step 3: Validate
hamilton build --module enrichment_pipeline
# Output: ✓ DAG is valid, 4 nodes, 0 cycles
Step 4: TDD Implementation
Test 1:
def test_raw_data(tmp_path):
json_file = tmp_path / "data.json"
json_file.write_text('[{"id": 1, "status": "active"}]')
result = raw_data(str(json_file))
assert len(result) == 1
Implementation 1:
def raw_data(json_path: str) -> pd.DataFrame:
return pd.read_json(json_path)
Test 2:
def test_active_records():
raw = pd.DataFrame([
{"id": 1, "status": "active"},
{"id": 2, "status": "inactive"}
])
result = active_records(raw)
assert len(result) == 1
assert result.iloc[0]["status"] == "active"
Implementation 2:
def active_records(raw_data: pd.DataFrame) -> pd.DataFrame:
return raw_data[raw_data["status"] == "active"]
...continue for each node...
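For instance, enriched_data can be made testable without a network by factoring the HTTP call into a helper. fetch_one below is hypothetical - a real version would call requests or httpx against api_endpoint:

```python
import pandas as pd

def fetch_one(api_endpoint: str, record_id: int) -> dict:
    """Stand-in for the real HTTP call; returns the extra fields for one record."""
    return {"id": record_id, "score": record_id * 10}

def enriched_data(active_records: pd.DataFrame, api_endpoint: str) -> pd.DataFrame:
    """Join API-provided fields onto the active records by id."""
    extra = pd.DataFrame([fetch_one(api_endpoint, rid) for rid in active_records["id"]])
    return active_records.merge(extra, on="id", how="left")

df = pd.DataFrame([{"id": 1, "status": "active"}])
print(enriched_data(df, "https://api.example.com")["score"].iloc[0])  # 10
```

In the test, fetch_one can be monkeypatched, so the merge logic is exercised without any network access.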
Step 5: Add Types with MonkeyType
monkeytype run -m pytest tests/
monkeytype apply enrichment_pipeline
Result: Accurate type annotations added automatically!
Add decorators in Step 2:
from hamilton.function_modifiers import parameterize, value

@parameterize(
    rolling_7d={'window': value(7)},
    rolling_30d={'window': value(30)},
    rolling_90d={'window': value(90)},
)
def rolling_average(data: pd.Series, window: int) -> pd.Series:
"""Calculate rolling average.
:param data: Input data
:param window: Rolling window size
:return: Rolling average
"""
pass
Update DOT graph to reflect expanded nodes:
data -> rolling_7d
data -> rolling_30d
data -> rolling_90d
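Once the parameterized signature validates, the shared body is a one-liner. A plain-pandas sketch that can be unit-tested independently of Hamilton:

```python
import pandas as pd

def rolling_average(data: pd.Series, window: int) -> pd.Series:
    """Trailing rolling mean over `window` observations."""
    return data.rolling(window=window).mean()

s = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0])
print(rolling_average(s, 7).iloc[-1])  # mean of 2..8 = 5.0
```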
Mark async in Step 2:
async def api_data(endpoint: str, record_id: int) -> dict:
"""Fetch data from API.
:param endpoint: API endpoint
:param record_id: Record ID to fetch
:return: API response data
"""
pass
Use async driver:
from hamilton import async_driver
dr = await async_driver.Builder().with_modules(module).build()
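The async node body itself is ordinary asyncio. A minimal sketch with the HTTP call stubbed by asyncio.sleep (a real pipeline would swap in httpx or aiohttp):

```python
import asyncio

async def api_data(endpoint: str, record_id: int) -> dict:
    """Fetch data from API (network call stubbed for illustration)."""
    await asyncio.sleep(0)  # stand-in for the real async HTTP request
    return {"endpoint": endpoint, "id": record_id}

result = asyncio.run(api_data("https://api.example.com", 1))
print(result)  # {'endpoint': 'https://api.example.com', 'id': 1}
```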
Don't write function bodies before running hamilton build. Validate structure first!
Don't implement all nodes before testing. Go node-by-node with TDD.
Don't waste time guessing types. Use Any + MonkeyType or disable validation initially.
Don't jump straight to code. The DOT graph is your blueprint - validate it first.
Don't implement the next node until the current one passes its test.
# Step 3: Validate DAG
hamilton build --module my_module
hamilton build --module my_module --output dag.png
# Step 4: Run tests
pytest tests/test_my_module.py::test_my_function -v
# Step 5: Infer types
monkeytype run -m pytest tests/
monkeytype apply my_module
# Full pipeline validation
python -c "from hamilton import driver; import my_module; driver.Driver({}, my_module)"
Remember: always run hamilton build before implementing.

This workflow enables Claude to build complex Hamilton DAGs systematically, avoiding the pitfalls of monolithic implementation.
For additional patterns and examples, see the skill-specific guides:
/hamilton-core - Core patterns and decorators
/hamilton-scale - Async, Ray, Dask, Spark
/hamilton-llm - LLM and RAG workflows
/hamilton-observability - Monitoring and tracking
/hamilton-integrations - Airflow, FastAPI, Streamlit