From Hamilton
Tracks, monitors, and debugs Python dataflows using Hamilton UI and SDK. Enables DAG visualization, lineage tracking, execution history, and performance analysis.
npx claudepluginhub apache/hamilton --plugin hamilton

This skill is limited to using the following tools:
<!-- SPDX-License-Identifier: Apache-2.0 -->
Provides integration patterns for Hamilton dataflows with Airflow, Dagster, FastAPI, Streamlit, Jupyter notebooks, Flask, and more. Use for orchestration, APIs, dashboards, and experiment tracking.
Guides Dagu Web UI usage for managing workflows, monitoring real-time executions, viewing history/logs, DAG visualization, and troubleshooting via browser.
Builds production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use for data pipelines, workflow orchestration, or batch jobs.
Share bugs, ideas, or general feedback.
This skill covers the Hamilton UI, SDK, and observability patterns for tracking and monitoring your dataflows in development and production.
Hamilton UI is a web-based dashboard for:
# Install Hamilton with the SDK and UI extras
pip install "sf-hamilton[sdk,ui]"
# Start the UI server locally
hamilton ui
# UI will be available at http://localhost:8241
"""Add HamiltonTracker to your driver."""
from hamilton_sdk import adapters
from hamilton import driver
# Create tracker
tracker = adapters.HamiltonTracker(
project_id=1, # Your project ID from UI
username="your.email@example.com",
dag_name="my_pipeline",
tags={"environment": "dev", "team": "data-science"}
)
# Build driver with tracker
dr = driver.Builder()\
.with_config(your_config)\
.with_modules(*your_modules)\
.with_adapters(tracker)\
.build()
# Execute as normal - runs are automatically tracked!
results = dr.execute(['final_output'], inputs={'data_path': 'data.csv'})
Open http://localhost:8241 and see:
"""Minimal tracking setup."""
from hamilton_sdk import adapters
tracker = adapters.HamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="etl_pipeline"
)
# Attach to driver
dr = driver.Builder().with_adapters(tracker).build()
"""Use tags for filtering and organization."""
tracker = adapters.HamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="ml_training",
tags={
"environment": "production",
"model_version": "v2.1",
"team": "ml-platform",
"experiment_id": "exp_123"
}
)
# Tags appear in UI for filtering and search
"""Track async workflows."""
from hamilton import async_driver
from hamilton_sdk import adapters
tracker = adapters.AsyncHamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="async_rag_pipeline"
)
dr = await async_driver.Builder()\
.with_modules(async_module)\
.with_adapters(tracker)\
.build()
result = await dr.execute(['llm_response'], inputs={'query': 'test'})
Projects group related DAGs together:
# Create project via UI
# 1. Open http://localhost:8241
# 2. Click "New Project"
# 3. Name it (e.g., "Customer Analytics")
# 4. Get the project_id

# Or via API
import requests

response = requests.post(
    "http://localhost:8241/api/v1/projects",
    json={"name": "Customer Analytics", "description": "Customer data pipelines"}
)
# Fail loudly on an HTTP error instead of KeyError-ing on the error payload.
response.raise_for_status()
project_id = response.json()['id']
"""Organize DAGs by team and environment."""
# Team A - Development
tracker_team_a_dev = adapters.HamiltonTracker(
project_id=1, # "Team A Analytics" project
username="user@example.com",
dag_name="user_segmentation",
tags={"team": "team-a", "env": "dev"}
)
# Team A - Production
tracker_team_a_prod = adapters.HamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="user_segmentation",
tags={"team": "team-a", "env": "prod"}
)
# Team B - Different project
tracker_team_b = adapters.HamiltonTracker(
project_id=2, # "Team B ML" project
username="user@example.com",
dag_name="recommendation_model",
tags={"team": "team-b", "env": "dev"}
)
When a DAG fails, the UI shows:
"""DAG fails at 'processed_data' node."""
# In UI:
# - Navigate to failed run
# - Click on red 'processed_data' node
# - See error: "ValueError: Cannot convert string to float"
# - Inspect inputs: raw_data contains 'N/A' strings
# - Fix data cleaning logic
Compare two DAG runs side-by-side:
"""Compare dev vs prod performance."""
# Run 1: Development (10 seconds)
# Run 2: Production (45 seconds)
# In UI:
# - Select both runs
# - Click "Compare"
# - See: 'feature_engineering' node is 8x slower in prod
# - Reason: Prod has 10x more data
# - Solution: Add caching or parallelize
Drill into any node to see:
Hamilton UI automatically tracks:
"""Track lineage across training and inference."""
# Training pipeline
training_tracker = adapters.HamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="model_training",
tags={"stage": "training", "model_version": "v2.1"}
)
# Inference pipeline (same project)
inference_tracker = adapters.HamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="model_inference",
tags={"stage": "inference", "model_version": "v2.1"}
)
# In UI: Filter by model_version="v2.1" to see both pipelines
"""Track production metrics."""
tracker = adapters.HamiltonTracker(
project_id=1,
username="service@example.com",
dag_name="production_etl",
tags={
"environment": "production",
"service": "data-pipeline",
"version": os.getenv("SERVICE_VERSION", "unknown"),
"host": os.getenv("HOSTNAME", "unknown")
}
)
# Monitor in UI:
# - Execution frequency (runs per hour)
# - Success rate (failures per day)
# - Execution time trends
# - Node-level performance
"""Set up failure notifications."""
# Hamilton UI can send alerts on:
# - DAG failures
# - Slow executions (> threshold)
# - Specific node failures
# Configure in UI:
# 1. Go to Project Settings
# 2. Set up webhook or email alerts
# 3. Define alert conditions
Track performance over time:
"""Monitor performance degradation."""
# Week 1: Average execution time = 5 minutes
# Week 2: Average execution time = 8 minutes
# Week 3: Average execution time = 12 minutes
# In UI:
# - View execution time chart
# - Identify 'data_processing' node is slowing down
# - Root cause: Data volume increased 3x
# - Solution: Add partitioning or switch to Spark
"""Track both Hamilton and MLflow."""
from hamilton_sdk import adapters
import mlflow
hamilton_tracker = adapters.HamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="ml_training"
)
# Use both adapters
dr = driver.Builder()\
.with_adapters(hamilton_tracker, mlflow_tracker)\
.build()
# Results tracked in both Hamilton UI and MLflow
"""Track Hamilton DAGs in Airflow tasks."""
from airflow import DAG
from airflow.operators.python import PythonOperator
from hamilton_sdk import adapters
def run_hamilton_pipeline(**context):
"""Execute Hamilton with tracking."""
tracker = adapters.HamiltonTracker(
project_id=1,
username="airflow@example.com",
dag_name="airflow_etl",
tags={
"airflow_dag": context['dag'].dag_id,
"airflow_run": context['run_id'],
"task": context['task_instance'].task_id
}
)
dr = driver.Builder()\
.with_modules(my_module)\
.with_adapters(tracker)\
.build()
return dr.execute(['output'], inputs=context['params'])
with DAG('my_dag', schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='hamilton_pipeline',
python_callable=run_hamilton_pipeline
)
"""Query Hamilton UI via SDK."""
from hamilton_sdk import client
# Connect to Hamilton UI
hc = client.HamiltonClient(
base_url="http://localhost:8241",
username="user@example.com"
)
# Get recent runs
runs = hc.get_runs(
project_id=1,
dag_name="my_pipeline",
limit=10
)
for run in runs:
print(f"Run {run.id}: {run.status} in {run.duration}s")
# Get specific run details
run_detail = hc.get_run(run_id=runs[0].id)
print(f"Inputs: {run_detail.inputs}")
print(f"Outputs: {run_detail.outputs}")
"""Add custom metadata to runs."""
tracker = adapters.HamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="my_pipeline",
tags={
"git_commit": subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip(),
"git_branch": subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).decode().strip(),
"dataset_version": "v2024.01",
"experiment_name": "baseline_v2"
}
)
# All metadata searchable in UI
# Check UI is running (expects a successful response from the ping endpoint)
curl http://localhost:8241/api/v1/ping
# Check tracker configuration
# NOTE(review): assumes `from hamilton_sdk import adapters` from the earlier setup.
tracker = adapters.HamiltonTracker(
    project_id=1, # Does this project exist?
    username="user@example.com", # Is this user registered?
    dag_name="my_pipeline",
    api_url="http://localhost:8241" # Override if UI is on different host
)
"""Optimize tracking for large DAGs."""
tracker = adapters.HamiltonTracker(
project_id=1,
username="user@example.com",
dag_name="large_pipeline",
# Don't capture large outputs
capture_data_statistics=False, # Skip stats collection
# Or be selective about what to capture
)
Related skills: /hamilton-core, /hamilton-scale