Help us improve
Share bugs, ideas, or general feedback.
From studio-skills
Build agentic workflow pipelines in Python combining deterministic flow control with AI reasoning via Studio Agent and MCP tools.
npx claudepluginhub treasure-data/td-skillsHow this skill is triggered — by the user, by Claude, or both
Slash command
/studio-skills:graflowThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
Build Python workflow pipelines combining **deterministic flow control** (branching, loops, parallelism) with **AI reasoning** (Studio Agent + MCP tools).
Provides proven architectural patterns for n8n workflows covering webhook processing, HTTP API integration, database operations, AI agents, batch processing, and scheduled tasks. Use when building or designing automations.
Generates Dify workflow DSL files (YAML/JSON) from natural language descriptions, with correct node schemas, edges, and layout for direct import.
Validates, runs, and debugs multi-agent YAML workflows. Use when orchestrating AI agents, configuring routing, or setting up human-in-the-loop gates.
Share bugs, ideas, or general feedback.
Build Python workflow pipelines combining deterministic flow control (branching, loops, parallelism) with AI reasoning (Studio Agent + MCP tools).
Docs: https://graflow.ai/
| Task | Guide |
|---|---|
| Build a new workflow | Follow the 3-phase process below |
| Add Studio Agent to a task | See Studio Agent Integration |
| Use CLI tools without agent overhead | See Subprocess Tasks |
| Add branching or loops | See Dynamic Control Flow |
| Add human approval gates | See HITL Patterns |
| Configure triggers and tool permissions | See manifest.yml Reference |
| Browse templates | See templates/ |
| Use Case | Tool |
|---|---|
| Multi-step process with branching, parallelism, HITL | Graflow |
| Open-ended AI reasoning (research, analysis) | Scheduled Task (TASK.md) |
| Data pipeline on TD platform | TD Workflow (digdag) |
Every task in a Graflow workflow falls into one of three categories. Choosing the right type for each step is the most important design decision — it affects performance, cost, and reliability.
| Type | When to Use | Example |
|---|---|---|
| Deterministic (pure Python) | Scoring, filtering, classification, data transforms | Categorize items by keyword, compute risk scores |
| Studio Agent (AI reasoning) | Need MCP tools, natural language analysis, content generation | Summarize data, draft reports, post to Slack |
| Subprocess (CLI tools) | Call external CLIs directly — faster and more reliable than routing through an agent | gh, tdx, curl, jq |
The subprocess type is important because spawning an agent session just to run a single CLI command adds significant latency and token cost. If the task is "fetch data from a CLI tool," subprocess.run() is the right choice.
from graflow import task, workflow
from graflow.core.context import TaskExecutionContext
from studio_agent import StudioAgent
import subprocess, json
# 1. Deterministic — pure logic, no I/O overhead
@task
def score(accounts: list) -> list:
return [a for a in accounts if a["health"] < 0.5]
# 2. Studio Agent — AI reasoning + MCP tools
@task
def analyze(account_id: str):
with StudioAgent() as agent:
return agent.run(
f"Check activity for account {account_id} "
f"and assess churn risk. Return as JSON."
)["output"]
# 3. Subprocess — call a CLI tool directly
@task(inject_context=True)
def fetch_data(ctx: TaskExecutionContext) -> list:
result = subprocess.run(
["gh", "api", "/repos/owner/repo/issues", "--paginate"],
capture_output=True, text=True, timeout=60,
)
if result.returncode != 0:
raise RuntimeError(f"gh failed: {result.stderr}")
data = json.loads(result.stdout)
ctx.get_channel().set("data", data)
return data
a >> b # Sequential: b after a
a | b # Parallel: a and b concurrently
a >> (b | c) >> d # Fan-out/fan-in
(a | b) >> c >> (d | e) # Multi-stage pipeline
Delegate AI reasoning to Studio's Claude Agent SDK with access to 100+ MCP tools (Slack, Google Calendar, CRM, etc.).
Canonical pattern: open a fresh StudioAgent inside each task and let the with block close it when the task ends. Pass state between tasks through channel set/get — not through shared agent memory.
from graflow import task, workflow
from graflow.core.context import TaskExecutionContext
from studio_agent import StudioAgent
@task(inject_context=True)
def research(ctx: TaskExecutionContext) -> str:
with StudioAgent() as agent:
result = agent.run(
"Find all active deals closing this week. Return as JSON."
)
summaries = result["output"]
ctx.get_channel().set("summaries", summaries)
return summaries
@task(inject_context=True)
def notify(ctx: TaskExecutionContext) -> str:
summaries = ctx.get_channel().get("summaries")
with StudioAgent() as agent:
# Name the tool explicitly so the agent doesn't guess
result = agent.run(
f"Deals:\n{summaries}\n\n"
f"Post a one-paragraph summary to #sales on Slack. "
f"Use slack_post_message tool."
)
return result["output"]
with workflow("example") as ctx:
research >> notify
ctx.execute("research")
Agent prompt tips:
Do not register an agent at the workflow level (ctx.register_llm_agent(...)) and share it across tasks. That keeps a Claude session open through every intervening deterministic task and couples tasks to implicit agent memory instead of explicit channel state. See Studio Agent Bridge for the one narrow exception (multi-turn chain-of-thought inside a single task) and the full StudioAgent API.
When a task just calls a CLI tool and parses its output, use subprocess.run() directly instead of routing through a Studio Agent. This avoids agent session overhead (typically 30-60s per call) and gives deterministic, reproducible results.
import subprocess, json
@task(inject_context=True)
def fetch_from_cli(ctx: TaskExecutionContext) -> list:
"""Call an external CLI and parse its JSON output."""
result = subprocess.run(
["some-cli", "list", "--json", "--limit", "100"],
capture_output=True, text=True, timeout=60,
)
if result.returncode != 0:
raise RuntimeError(f"CLI failed: {result.stderr}")
items = json.loads(result.stdout)
ctx.get_channel().set("items", items)
return items
When using subprocess, add the corresponding CLI to permissions.allow in manifest.yml:
permissions:
allow:
- "Bash(some-cli:*)" # Argument-level pattern for the CLI
Tasks share data through channels. There are two approaches:
Explicit channel set/get — Reliable and recommended for complex workflows. Works well when multiple downstream tasks need the same data, or when the channel key name differs from the parameter name.
@task(inject_context=True)
def producer(ctx: TaskExecutionContext):
data = compute_something()
ctx.get_channel().set("results", data)
return data
@task(inject_context=True)
def consumer(ctx: TaskExecutionContext):
data = ctx.get_channel().get("results")
Auto keyword resolution — Graflow can auto-resolve task parameters from matching channel keys. Simpler for short pipelines where each value is consumed by exactly one downstream task.
@task(inject_context=True)
def setup(ctx: TaskExecutionContext):
ctx.get_channel().set("threshold", 0.5)
@task
def check(threshold: float): # Auto-resolved from channel
print(threshold) # 0.5
For workflows with more than 3-4 tasks, or where the data flow isn't strictly linear, prefer explicit channel set/get. It's clearer about what data flows where, and avoids subtle bugs when parameter names don't exactly match channel keys.
Priority: channel < bound parameter < injection
When upstream tasks might return empty results, handle this gracefully instead of letting the workflow fail. Continue with a sensible default rather than terminating.
@task(inject_context=True)
def process(ctx: TaskExecutionContext) -> str:
items = ctx.get_channel().get("items")
if not items:
summary = "No items found for this period."
ctx.get_channel().set("summary", summary)
return summary
# ... normal processing ...
Use ctx.terminate_workflow() only for conditions where continuing would be meaningless (not just empty data). For most cases, propagate a "nothing to report" message through the pipeline so the final task can still produce useful output.
# Conditional branching — skip to a specific task
@task(inject_context=True)
def classify(context, data: dict):
if data["priority"] == "high":
context.next_task(urgent_handler, goto=True)
elif data["priority"] == "low":
context.terminate_workflow("Low priority — skipped")
# Iterative refinement — loop until convergence
@task(inject_context=True, max_cycles=10)
def refine(context, draft: str = ""):
improved = improve(draft)
if quality_score(improved) >= 0.9:
return improved
context.next_iteration({"draft": improved})
from graflow.core.task import ParallelGroup
from graflow.coordination.executor import ExecutionPolicy
# Best effort — continue even if some tasks fail
ParallelGroup([a, b, c], name="checks", execution_policy=ExecutionPolicy.BEST_EFFORT)
# At least N — succeed if 2 of 3 complete
ParallelGroup([a, b, c], name="quorum", execution_policy=ExecutionPolicy.AT_LEAST_N, min_successes=2)
Every workflow directory contains a manifest, the workflow code, and pinned dependencies, plus optional supporting files:
{workflow-id}/
├── manifest.yml # Metadata, triggers, permissions, execution config
├── workflow.py # Graflow workflow definition
├── requirements.txt # Python dependencies (pinned versions)
├── guide.md # Human-readable description / run notes (optional)
├── reference/ # Context files for AI nodes (optional)
└── data/ # Input data, configs (optional)
manifest.yml (minimal):
name: my-workflow
profile: "@tdx-studio:<site>:<account-id>:<user-id>" # AUTO-STAMPED by Studio at create time — DO NOT hand-author
description: What this workflow does
permissions:
allow:
- "mcp__tdx-studio__slack_post_message" # Full MCP tool name
- "Bash(gh:*)" # CLI with argument pattern
triggers:
- type: cron
schedule: "0 9 * * *"
execution:
timeout: 600 # Budget ~120s per agent task
results:
max_retained: 30
See manifest.yml Reference for all trigger types, the permissions.allow syntax, and every option.
requirements.txt:
graflow>=0.1.8
studio-agent @ file:///path/to/studio/python/studio_agent
Studio's scaffold fills in the correct file:// path to the bundled studio_agent package automatically.
Pin dependency versions. Avoid open-ended ranges for production workflows.
Gather requirements — Ask the user:
Create design document (workflow_design.md):
# Workflow Design: {name}
## Tasks
| Task ID | Type | Responsibility |
|---------|------|---------------|
| fetch | Subprocess | Pull data from external CLI |
| summarize | Studio Agent | Summarize and analyze data |
| categorize | Deterministic | Classify items by rules |
| draft | Studio Agent | Compose report |
| post | Studio Agent | Post to Slack |
## Task Graph
fetch >> summarize >> categorize >> draft >> post
## Studio Agent Usage
- summarize: ephemeral session; receives raw data via channel
- draft: ephemeral session; receives categorized data via channel
- post: ephemeral session; receives draft via channel; use slack_post_message tool
Write workflow.py + manifest.yml + requirements.txt based on the approved design.
Checklist:
@task decorators with appropriate injections (inject_context=True when using channels or control flow)>> / | composition matching the design graphwith StudioAgent() as agent: blockchannel.set() / channel.get()subprocess.run() with timeout and error checkingterminate_workflow)manifest.yml with correct triggers, a minimal permissions.allow list, and adequate timeoutrequirements.txtChecklist:
manifest.yml triggers are correct and permissions.allow is as narrow as possibleexecution.timeout is large enough — budget ~120s per agent task plus time for subprocess/deterministic tasksCommon issues:
inject_context=True when using ctx.get_channel() or context.next_task()ctx.register_llm_agent(...) instead of the ephemeral-per-task patternBash(cli:*) to permissions.allowtimeout: 300 being too short for workflows with multiple agent tasksterminate_workflow for empty data when the pipeline should continue with a "nothing to report" message| Template | Pattern | Use Case |
|---|---|---|
| simple-pipeline.py | fetch >> process >> notify | Starting point for new workflows |
| cs-health-check.py | scan >> evaluate >> alert | CS account monitoring |
| sdr-sequence.py | qualify >> (enrich_co | enrich_ct) >> personalize >> schedule | SDR outreach automation |
| parallel-enrichment.py | fetch >> (crm | usage | support) >> merge >> report | Multi-source data enrichment |
| hitl-approval.py | draft >> approve >> send/revise | Human-gated content delivery with Studio approval UI |
permissions.allow syntax (including argument-level patterns), execution settings.dig files, td> operator)tdx wf