Guides creation of Hamilton DAGs in Python using function-based nodes, decorators, drivers, testing with pytest, and debugging dataflows.
<!-- SPDX-License-Identifier: Apache-2.0 -->
Apache Hamilton is a lightweight Python framework for building Directed Acyclic Graphs (DAGs) of data transformations using declarative, function-based definitions.
Function-Based DAG Definition

Key Architecture Components:
- Functions define nodes: the function name becomes the node name, and each parameter name declares a dependency on the node (or input) of the same name.
- The Driver orchestrates: it assembles the functions in your modules into a DAG, and .execute() runs the DAG.
- Separation of Concerns: transformation logic lives in plain Python modules; orchestration and configuration live in the driver script.
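The name-based wiring above can be illustrated with a toy resolver. This is a stdlib-only sketch of the idea, not Hamilton's actual implementation: parameter names are read via `inspect.signature` and resolved recursively from other functions or from the inputs dict.

```python
import inspect

# Toy nodes: `doubled_total` depends on `raw_total` purely by parameter name.
def raw_total(numbers: list) -> int:
    return sum(numbers)

def doubled_total(raw_total: int) -> int:
    return raw_total * 2

FUNCS = {f.__name__: f for f in (raw_total, doubled_total)}

def resolve(name: str, inputs: dict, cache: dict):
    """Compute a node by first resolving everything its parameters name."""
    if name in cache:
        return cache[name]
    if name in inputs:
        return inputs[name]
    func = FUNCS[name]
    kwargs = {p: resolve(p, inputs, cache) for p in inspect.signature(func).parameters}
    cache[name] = func(**kwargs)
    return cache[name]

result = resolve('doubled_total', inputs={'numbers': [1, 2, 3]}, cache={})
print(result)  # 12
```

Hamilton does this dependency discovery once up front to build the full DAG, rather than lazily at execution time as this sketch does.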
Basic Module Structure:
"""
Module docstring explaining the DAG's purpose.
"""
import pandas as pd
from hamilton.function_modifiers import extract_columns
def raw_data(data_path: str) -> pd.DataFrame:
"""Load raw data from source.
:param data_path: Path to data file (passed as input)
:return: Raw DataFrame
"""
return pd.read_csv(data_path)
def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
"""Remove null values and duplicates.
:param raw_data: Raw data from previous node
:return: Cleaned DataFrame
"""
return raw_data.dropna().drop_duplicates()
def feature_a(cleaned_data: pd.DataFrame) -> pd.Series:
"""Calculate feature A.
:param cleaned_data: Cleaned data
:return: Feature A values
"""
return cleaned_data['column_a'] * 2
Driver Setup:
```python
from hamilton import driver

import my_functions

dr = driver.Driver({}, my_functions)
results = dr.execute(
    ['feature_a', 'cleaned_data'],
    inputs={'data_path': 'data.csv'},
)
```
Best Practices:
- One transformation per function; the function name should describe its output.
- Type-hint every parameter and return value — Hamilton uses them to build and validate the DAG.
- Never mutate inputs; copy and return new objects.
- Keep I/O at the edges of the DAG (loaders at the start, savers at the end).
- Pass external values (file paths, constants) as driver inputs rather than hardcoding them.
Configuration & Polymorphism:
```python
import pandas as pd
from hamilton.function_modifiers import config


@config.when(model_type='linear')
def predictions__linear(features: pd.DataFrame, target: pd.Series) -> pd.Series:
    """Linear model predictions."""
    from sklearn.linear_model import LinearRegression
    model = LinearRegression().fit(features, target)
    return pd.Series(model.predict(features), index=features.index)


@config.when(model_type='tree')
def predictions__tree(features: pd.DataFrame, target: pd.Series) -> pd.Series:
    """Tree model predictions."""
    from sklearn.tree import DecisionTreeRegressor
    model = DecisionTreeRegressor().fit(features, target)
    return pd.Series(model.predict(features), index=features.index)


# Both resolve to a single node named `predictions`; the `__suffix` is stripped.
# Use: driver.Driver({'model_type': 'linear'}, module)
```
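Conceptually, @config.when keys alternative implementations by a configuration value, and the driver config picks which one backs the node. A stdlib-only toy sketch of that selection semantics (not Hamilton internals; names here are illustrative):

```python
# Two interchangeable implementations of the same logical node.
def predictions_linear(features: list) -> list:
    return [2.0 * x for x in features]

def predictions_tree(features: list) -> list:
    return [float(round(x)) for x in features]

IMPLEMENTATIONS = {'linear': predictions_linear, 'tree': predictions_tree}

def build_node(config: dict):
    """Select the implementation matching config['model_type']."""
    return IMPLEMENTATIONS[config['model_type']]

predictions = build_node({'model_type': 'linear'})
print(predictions([1.0, 2.5]))  # [2.0, 5.0]
```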
Parameterization - Creating Multiple Nodes:
```python
import pandas as pd
from hamilton.function_modifiers import parameterize, value


@parameterize(
    rolling_7d={'window': value(7)},
    rolling_30d={'window': value(30)},
    rolling_90d={'window': value(90)},
)
def rolling_average(spend: pd.Series, window: int) -> pd.Series:
    """Calculate rolling average for different windows."""
    return spend.rolling(window).mean()

# Creates 3 nodes: rolling_7d, rolling_30d, rolling_90d
```
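The decorator is roughly equivalent to generating one function per entry with `window` bound to a constant. A hand-rolled sketch of what two of those generated nodes compute (assuming pandas; not Hamilton's code generation):

```python
import pandas as pd

def rolling_average(spend: pd.Series, window: int) -> pd.Series:
    return spend.rolling(window).mean()

# Roughly what @parameterize produces: one node per entry, `window` pre-bound.
def make_rolling_node(window: int):
    def node(spend: pd.Series) -> pd.Series:
        return rolling_average(spend, window)
    return node

rolling_7d = make_rolling_node(7)
rolling_30d = make_rolling_node(30)

spend = pd.Series([10.0] * 40)
print(rolling_7d(spend).iloc[-1])  # 10.0
```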
Column Extraction - DataFrames to Series:
```python
import pandas as pd
from hamilton.function_modifiers import extract_columns


@extract_columns('feature_1', 'feature_2', 'feature_3')
def features(cleaned_data: pd.DataFrame) -> pd.DataFrame:
    """Generate multiple features."""
    return pd.DataFrame({
        'feature_1': cleaned_data['a'] * 2,
        'feature_2': cleaned_data['b'] ** 2,
        'feature_3': cleaned_data['a'] + cleaned_data['b'],
    })

# Creates feature_1, feature_2, feature_3 (each a Series),
# alongside the `features` DataFrame node itself
```
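Each extracted node behaves like plain column selection on the upstream DataFrame. A minimal pandas sketch of that equivalence (the functions here mirror the example above; this is an illustration, not the decorator's implementation):

```python
import pandas as pd

def features(cleaned_data: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        'feature_1': cleaned_data['a'] * 2,
        'feature_2': cleaned_data['b'] ** 2,
        'feature_3': cleaned_data['a'] + cleaned_data['b'],
    })

# An extracted node is equivalent to selecting one column from `features`.
def feature_1(features: pd.DataFrame) -> pd.Series:
    return features['feature_1']

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
print(feature_1(features(df)).tolist())  # [2, 4]
```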
Data Quality Validation:
```python
import pandas as pd
import pandera as pa
from hamilton.function_modifiers import check_output


@check_output(
    data_type=float,
    range=(0, 100),
    importance="fail",
)
def revenue_percentage(revenue: float, total: float) -> float:
    """Calculate revenue as percentage."""
    return (revenue / total) * 100


# With Pandera schemas
@check_output(
    schema=pa.SeriesSchema(float, pa.Check.greater_than(0)),
    importance="fail",
)
def positive_values(data: pd.Series) -> pd.Series:
    """Ensure all values are positive."""
    return data
```
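Conceptually, check_output wraps the node and validates its return value before it flows downstream. A stdlib-only sketch of that idea — a hypothetical `check_range` decorator, not Hamilton's validator machinery:

```python
import functools

def check_range(low: float, high: float, importance: str = "fail"):
    """Toy stand-in for @check_output's range validation (illustrative only)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            if not (low <= result <= high):
                if importance == "fail":
                    raise ValueError(
                        f"{func.__name__} returned {result}, outside [{low}, {high}]"
                    )
                print(f"warning: {func.__name__} out of range: {result}")
            return result
        return wrapper
    return decorator

@check_range(0, 100)
def revenue_percentage(revenue: float, total: float) -> float:
    return (revenue / total) * 100

print(revenue_percentage(25.0, 100.0))  # 25.0
```

With `importance="warn"` the real decorator logs instead of raising, which this sketch mimics with a print.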
I/O Materialization:
```python
import pandas as pd
from hamilton.function_modifiers import load_from, save_to, value


@save_to.csv(path=value("output.csv"), output_name_="saved_results")
def final_results(aggregated_data: pd.DataFrame) -> pd.DataFrame:
    """Save final results to CSV (also creates a `saved_results` node)."""
    return aggregated_data


@load_from.parquet(path=value("data.parquet"))
def input_data(raw_input: pd.DataFrame) -> pd.DataFrame:
    """Load data from parquet; the loader injects `raw_input`."""
    return raw_input
```
Before (Script):
```python
import pandas as pd

df = pd.read_csv('data.csv')
df = df.dropna()
df['feature'] = df['col_a'] * 2
result = df.groupby('category')['feature'].mean()
print(result)
```
After (Hamilton Module):
"""Data processing DAG."""
import pandas as pd
def raw_data(data_path: str) -> pd.DataFrame:
"""Load raw data."""
return pd.read_csv(data_path)
def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
"""Remove nulls."""
return raw_data.dropna()
def feature(cleaned_data: pd.DataFrame) -> pd.Series:
"""Calculate feature."""
return cleaned_data['col_a'] * 2
def data_with_feature(cleaned_data: pd.DataFrame, feature: pd.Series) -> pd.DataFrame:
"""Add feature to dataset."""
df = cleaned_data.copy()
df['feature'] = feature
return df
def result(data_with_feature: pd.DataFrame) -> pd.Series:
"""Aggregate by category."""
return data_with_feature.groupby('category')['feature'].mean()
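Because Hamilton nodes are plain functions, the converted module can be sanity-checked without a driver by wiring the calls by hand. A self-contained sketch (the functions are duplicated here so the snippet runs on its own):

```python
import pandas as pd

def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data.dropna()

def feature(cleaned_data: pd.DataFrame) -> pd.Series:
    return cleaned_data['col_a'] * 2

def data_with_feature(cleaned_data: pd.DataFrame, feature: pd.Series) -> pd.DataFrame:
    df = cleaned_data.copy()
    df['feature'] = feature
    return df

def result(data_with_feature: pd.DataFrame) -> pd.Series:
    return data_with_feature.groupby('category')['feature'].mean()

# Wire the DAG manually: the row with a null col_a is dropped,
# then feature = col_a * 2, then mean per category.
raw = pd.DataFrame({'col_a': [1, 2, None, 4], 'category': ['x', 'x', 'y', 'y']})
clean = cleaned_data(raw)
out = result(data_with_feature(clean, feature(clean)))
print(out['x'])  # 3.0
```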
Conversion Guidelines:
- One script statement (or logical step) becomes one named function; the function name is the node's output name.
- Replace variable reassignment with new, descriptively named nodes (df becomes raw_data, cleaned_data, ...).
- Express dependencies by naming parameters after upstream nodes instead of reading shared state.
- Turn external values (file paths, constants) into driver inputs.
- Avoid mutating upstream DataFrames; copy before adding columns.
Generate Visualization:
```python
from hamilton import driver

import my_functions

dr = driver.Driver({}, my_functions)

# Create visualization
dr.display_all_functions('dag.png')  # All nodes

dr.visualize_execution(
    ['final_output'],
    'execution.png',
    inputs={'input_data': ...},
)  # Execution path only
```
Understanding DAG Structure & Debugging Tips:
- dr.list_available_variables() to see all nodes
- dr.what_is_downstream_of('node_name') for dependencies

Unit Testing Pattern:
```python
import pandas as pd

from my_functions import cleaned_data, feature


def test_cleaned_data():
    """Test data cleaning."""
    raw = pd.DataFrame({
        'col_a': [1, 2, None, 4],
        'col_b': ['a', 'b', 'c', 'd'],
    })
    result = cleaned_data(raw)
    assert len(result) == 3
    assert result['col_a'].isna().sum() == 0


def test_feature():
    """Test feature calculation."""
    data = pd.DataFrame({'col_a': [1, 2, 3]})
    result = feature(data)
    pd.testing.assert_series_equal(
        result,
        pd.Series([2, 4, 6], name='col_a'),
    )
```
Integration Testing with Driver:
```python
def test_full_pipeline():
    """Test complete DAG execution."""
    from hamilton import driver

    import my_functions

    dr = driver.Driver({}, my_functions)
    result = dr.execute(
        ['result'],
        inputs={'data_path': 'test_data.csv'},
    )
    assert 'result' in result
    assert result['result'].sum() > 0
```
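The integration test above assumes a test_data.csv exists on disk. One way to generate it per-test is a small helper called from a pytest tmp_path fixture; this is a sketch and `make_test_csv` is a hypothetical name, not part of Hamilton:

```python
import tempfile
from pathlib import Path

import pandas as pd

def make_test_csv(directory: Path) -> str:
    """Write a small, deterministic CSV for pipeline tests (hypothetical helper)."""
    path = directory / "test_data.csv"
    pd.DataFrame({
        'col_a': [1, 2, 3],
        'category': ['x', 'x', 'y'],
    }).to_csv(path, index=False)
    return str(path)

# In pytest, wrap it in a fixture, e.g.:
#     @pytest.fixture
#     def data_path(tmp_path):
#         return make_test_csv(tmp_path)

# Standalone demonstration with a temporary directory:
with tempfile.TemporaryDirectory() as d:
    path = make_test_csv(Path(d))
    shape = pd.read_csv(path).shape
print(shape)  # (3, 2)
```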
Circular Dependencies:
```python
# ❌ Bad - circular dependency
def a(b: int) -> int:
    return b + 1

def b(a: int) -> int:
    return a + 1
```

```python
# ✅ Good - break the cycle
def a(input_value: int) -> int:
    return input_value + 1

def b(a: int) -> int:
    return a + 1
```
Missing Type Hints:
```python
# ❌ Bad - no type hints
def process(data):
    return data * 2
```

```python
# ✅ Good - clear types
def process(data: pd.Series) -> pd.Series:
    return data * 2
```
Mutating Inputs:
```python
# ❌ Bad - mutates input
def add_column(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    df[col_name] = 0  # Modifies original!
    return df
```

```python
# ✅ Good - returns new object
def add_column(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    result = df.copy()
    result[col_name] = 0
    return result
```
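A quick self-contained check that the copy-based version leaves its input untouched (a sketch duplicating the good pattern above):

```python
import pandas as pd

def add_column(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    result = df.copy()
    result[col_name] = 0
    return result

original = pd.DataFrame({'a': [1, 2]})
augmented = add_column(original, 'b')

# The original frame still has only its own column.
print('b' in original.columns, 'b' in augmented.columns)  # False True
```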
Repository Structure:
- hamilton/ - Main package code
- hamilton/driver.py - Main orchestration class
- hamilton/function_modifiers/ - Decorators
- examples/ - Production examples
- tests/ - Unit and integration tests
- docs/ - Official documentation

Related skills: /hamilton-scale for async/Spark, /hamilton-llm for AI workflows.

For detailed reference material, see:
- docs/ directory in repo
- examples/ directory for patterns