Implements stateful agent graphs using LangGraph. Use when building graphs, adding nodes/edges, defining state schemas, implementing checkpointing, handling interrupts, or creating multi-agent systems with LangGraph.
/plugin marketplace add existential-birds/beagle/plugin install beagle@existential-birdsThis skill inherits all available tools. When active, it can use any tool Claude has access to.
PATTERNS.mdLangGraph builds stateful, multi-actor agent applications using a graph-based architecture:
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState, add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Send, interrupt, RetryPolicy
from typing import Annotated
from typing_extensions import TypedDict
class State(TypedDict):
counter: int # LastValue - stores last value
messages: Annotated[list, operator.add] # Reducer - appends lists
items: Annotated[list, lambda a, b: a + [b] if b else a] # Custom reducer
from langgraph.graph.message import MessagesState
class State(MessagesState):
# Inherits: messages: Annotated[list[AnyMessage], add_messages]
user_id: str
context: dict
from pydantic import BaseModel
class State(BaseModel):
messages: Annotated[list, add_messages]
validated_field: str # Pydantic validates on assignment
builder = StateGraph(State)
# Add nodes - functions that take state, return partial updates
builder.add_node("process", process_fn)
builder.add_node("decide", decide_fn)
# Add edges
builder.add_edge(START, "process")
builder.add_edge("process", "decide")
builder.add_edge("decide", END)
# Compile
graph = builder.compile()
def my_node(state: State) -> dict:
"""Node receives full state, returns partial update."""
return {"counter": state["counter"] + 1}
# With config access
def my_node(state: State, config: RunnableConfig) -> dict:
thread_id = config["configurable"]["thread_id"]
return {"result": process(state, thread_id)}
# With Runtime context (v0.6+)
def my_node(state: State, runtime: Runtime[Context]) -> dict:
user_id = runtime.context.get("user_id")
return {"result": user_id}
from typing import Literal
def router(state: State) -> Literal["agent", "tools", "__end__"]:
last_msg = state["messages"][-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
return "tools"
return END # or "__end__"
builder.add_conditional_edges("agent", router)
# With path_map for visualization
builder.add_conditional_edges(
"agent",
router,
path_map={"agent": "agent", "tools": "tools", "__end__": END}
)
from langgraph.types import Command
def dynamic_node(state: State) -> Command[Literal["next", "__end__"]]:
if state["should_continue"]:
return Command(goto="next", update={"step": state["step"] + 1})
return Command(goto=END)
# Must declare destinations for visualization
builder.add_node("dynamic", dynamic_node, destinations=["next", END])
from langgraph.types import Send
def fan_out(state: State) -> list[Send]:
"""Route to multiple node instances with different inputs."""
return [Send("worker", {"item": item}) for item in state["items"]]
builder.add_conditional_edges(START, fan_out)
builder.add_edge("worker", "aggregate") # Workers converge
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver # Development
from langgraph.checkpoint.postgres import PostgresSaver # Production
# In-memory (testing only)
graph = builder.compile(checkpointer=InMemorySaver())
# SQLite (development)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
# Thread-based invocation
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"messages": [...]}, config)
# Get current state
state = graph.get_state(config)
# Get state history
for state in graph.get_state_history(config):
print(state.values, state.next)
# Update state manually
graph.update_state(config, {"key": "new_value"}, as_node="node_name")
from langgraph.types import interrupt, Command
def review_node(state: State) -> dict:
# Pause and surface value to client
human_input = interrupt({"question": "Please review", "data": state["draft"]})
return {"approved": human_input["approved"]}
# Resume with Command
graph.invoke(Command(resume={"approved": True}), config)
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["human_review"], # Pause before node
interrupt_after=["agent"], # Pause after node
)
# Check pending interrupts
state = graph.get_state(config)
if state.next: # Has pending nodes
# Resume
graph.invoke(None, config)
# Stream modes: "values", "updates", "custom", "messages", "debug"
# Updates only (node outputs)
for chunk in graph.stream(input, stream_mode="updates"):
print(chunk) # {"node_name": {"key": "value"}}
# Full state after each step
for chunk in graph.stream(input, stream_mode="values"):
print(chunk)
# Multiple modes
for mode, chunk in graph.stream(input, stream_mode=["updates", "messages"]):
if mode == "messages":
print("Token:", chunk)
# Custom streaming from within nodes
from langgraph.config import get_stream_writer
def my_node(state):
writer = get_stream_writer()
writer({"progress": 0.5}) # Custom event
return {"result": "done"}
# Define subgraph
sub_builder = StateGraph(SubState)
sub_builder.add_node("step", step_fn)
sub_builder.add_edge(START, "step")
subgraph = sub_builder.compile()
# Use as node in parent
parent_builder = StateGraph(ParentState)
parent_builder.add_node("subprocess", subgraph)
parent_builder.add_edge(START, "subprocess")
# Subgraph checkpointing
subgraph = sub_builder.compile(
checkpointer=None, # Inherit from parent (default)
# checkpointer=True, # Use persistent checkpointing
# checkpointer=False, # Disable checkpointing
)
from langgraph.types import RetryPolicy, CachePolicy
retry = RetryPolicy(
initial_interval=0.5,
backoff_factor=2.0,
max_attempts=3,
retry_on=ValueError, # Or callable: lambda e: isinstance(e, ValueError)
)
cache = CachePolicy(ttl=3600) # Cache for 1 hour
builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)
from langgraph.prebuilt import create_react_agent, ToolNode
# Simple agent
graph = create_react_agent(
model="anthropic:claude-3-5-sonnet",
tools=[my_tool],
prompt="You are a helpful assistant",
checkpointer=InMemorySaver(),
)
# Custom tool node
tool_node = ToolNode([tool1, tool2])
builder.add_node("tools", tool_node)
def should_continue(state) -> Literal["tools", "__end__"]:
if state["messages"][-1].tool_calls:
return "tools"
return END
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
# Multiple nodes execute in parallel when they share the same trigger
builder.add_edge(START, "node_a")
builder.add_edge(START, "node_b") # Runs parallel with node_a
builder.add_edge(["node_a", "node_b"], "join") # Wait for both
See PATTERNS.md for advanced patterns including multi-agent systems, hierarchical graphs, and complex workflows.
Use when working with Payload CMS projects (payload.config.ts, collections, fields, hooks, access control, Payload API). Use when debugging validation errors, security issues, relationship queries, transactions, or hook behavior.
Creating algorithmic art using p5.js with seeded randomness and interactive parameter exploration. Use this when users request creating art using code, generative art, algorithmic art, flow fields, or particle systems. Create original algorithmic art rather than copying existing artists' work to avoid copyright violations.