From htmlgraph
Orchestrate parallel agent workflows using HtmlGraph's ParallelWorkflow. Activate when planning multi-agent work, using Task tool for sub-agents, or coordinating concurrent feature implementation.
Install: `npx claudepluginhub shakestzd/htmlgraph`

This skill uses the workspace's default tool permissions.
HtmlGraph provides ParallelWorkflow for optimal parallel agent execution:
```
┌─────────────────────────────────────────────────────────────────┐
│ 1. ANALYZE   → Check dependencies, assess risks                 │
│ 2. PREPARE   → Cache shared context, isolate tasks              │
│ 3. DISPATCH  → Generate prompts, spawn agents                   │
│ 4. MONITOR   → Track health, detect anti-patterns               │
│ 5. AGGREGATE → Collect results, check conflicts                 │
│ 6. VALIDATE  → Verify outputs, update dependencies              │
└─────────────────────────────────────────────────────────────────┘
```
Always run analysis before dispatching agents!
```python
from htmlgraph import SDK

sdk = SDK(agent="orchestrator")

# Method 1: Quick check
parallel = sdk.get_parallel_work(max_agents=5)
print(f"Max parallelism: {parallel['max_parallelism']}")
print(f"Ready now: {parallel['ready_now']}")

# Method 2: Full analysis with ParallelWorkflow
plan = sdk.plan_parallel_work(max_agents=3)
if plan["can_parallelize"]:
    print(f"✅ Parallelize {plan['task_count']} tasks")
    print(f"   Speedup: {plan['speedup_factor']:.1f}x")
    print(f"   Ready: {plan['ready_tasks']}")
else:
    print(f"⚠️ {plan['recommendation']}")
```
| Condition | Action |
|---|---|
| `max_parallelism >= 2` | Can parallelize |
| `len(ready_tasks) < 2` | Work sequentially |
| Shared file edits | Partition or sequence |
| Speedup < 1.5x | May not be worth the cost |
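The table above can be folded into a small decision helper. This is a sketch assuming the `plan` dict fields shown earlier (`max_parallelism`, `ready_tasks`, `speedup_factor`); `should_parallelize` is a hypothetical name, not part of the SDK:

```python
def should_parallelize(plan, shared_file_edits=False):
    """Apply the go/no-go conditions from the table above."""
    if shared_file_edits:
        return False, "Partition or sequence shared file edits"
    if plan["max_parallelism"] < 2 or len(plan["ready_tasks"]) < 2:
        return False, "Work sequentially"
    if plan["speedup_factor"] < 1.5:
        return False, "Speedup may not be worth the dispatch cost"
    return True, f"Parallelize {len(plan['ready_tasks'])} ready tasks"
```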
Reduce redundant file reads by pre-caching shared context:
```python
# Identify files ALL agents need
shared_files = [
    "src/models.py",      # Data models
    "src/config.py",      # Configuration
    "tests/conftest.py",  # Test fixtures
]

# Plan with shared context
plan = sdk.plan_parallel_work(
    max_agents=3,
    shared_files=shared_files,
)
```
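Pre-caching pays off because, without it, every agent re-reads the same files. The arithmetic is simple (illustrative only; the helper name is hypothetical):

```python
def redundant_reads_avoided(n_shared_files, n_agents):
    """Without caching, each agent reads every shared file itself;
    with caching, each file is read once and injected into every prompt."""
    without_cache = n_shared_files * n_agents
    with_cache = n_shared_files
    return without_cache - with_cache

# 3 shared files across 3 agents: 9 reads collapse to 3, saving 6.
```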
**CRITICAL:** Send ALL Task calls in a SINGLE message for true parallelism!
```python
# Get ready-to-use prompts
prompts = plan["prompts"]

# CORRECT: All in one message (parallel)
for p in prompts:
    Task(
        subagent_type="general-purpose",
        prompt=p["prompt"],
        description=p["description"],
    )

# WRONG: Sequential messages (not parallel)
# result1 = Task(...)  # Wait for completion
# result2 = Task(...)  # Then next one
```
Each prompt includes:
```
## Task: {feature_id}
Title: {title}
Priority: {priority}

## Your Assignment
{specific_instructions}

## Pre-Cached Context (DO NOT re-read these)
- models.py: Contains User, Session, Feature classes
- config.py: DATABASE_URL, API_KEY settings

## Files to AVOID (other agents editing)
- {files_assigned_to_other_agents}

## Efficiency Guidelines
- Use Grep before Read (search then read)
- Batch Edit operations
- Mark feature complete when done
```
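A prompt shaped like that template could be rendered from plan data roughly as below. This is a hypothetical renderer for illustration; in practice the prompts come pre-built in `plan["prompts"]`:

```python
def render_prompt(task, cached_context, files_to_avoid):
    """Fill the prompt template from a task dict, a {path: summary} map
    of pre-cached files, and the paths assigned to other agents."""
    cached = "\n".join(f"- {path}: {summary}" for path, summary in cached_context.items())
    avoid = "\n".join(f"- {path}" for path in files_to_avoid)
    return (
        f"## Task: {task['feature_id']}\n"
        f"Title: {task['title']}\n"
        f"Priority: {task['priority']}\n\n"
        f"## Your Assignment\n{task['instructions']}\n\n"
        f"## Pre-Cached Context (DO NOT re-read these)\n{cached}\n\n"
        f"## Files to AVOID (other agents editing)\n{avoid}\n"
    )
```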
Agents track their own health via transcript analytics:
| Metric | Healthy | Warning |
|---|---|---|
| Retry rate | < 30% | > 30% |
| Context rebuilds | < 5 | > 5 |
| Tool diversity | > 30% | < 30% |
| Overall health | > 70% | < 70% |
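The thresholds above can be checked against a flat transcript of tool-call names. A minimal sketch, with the caveat that these metric formulas are assumptions derived from the table, not the SDK's exact definitions:

```python
def health_metrics(tool_calls, retries):
    """tool_calls: ordered list of tool names from the transcript;
    retries: count of calls that were retried."""
    total = max(len(tool_calls), 1)
    retry_rate = retries / total                    # healthy: < 30%
    tool_diversity = len(set(tool_calls)) / total   # healthy: > 30%
    healthy = retry_rate < 0.30 and tool_diversity > 0.30
    return {"retry_rate": retry_rate, "tool_diversity": tool_diversity, "healthy": healthy}
```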
```python
# These patterns trigger warnings:
("Read", "Read", "Read")          # Cache instead
("Edit", "Edit", "Edit")          # Batch edits
("Bash", "Bash", "Bash", "Bash")  # Check errors
("Grep", "Grep", "Grep")          # Read results first
```
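A detector for those consecutive-call runs can be sketched as a sliding window over the transcript (illustrative; the names here are hypothetical, not SDK APIs):

```python
# Patterns from the table above, mapped to the suggested fix.
ANTI_PATTERNS = {
    ("Read",) * 3: "Cache instead",
    ("Edit",) * 3: "Batch edits",
    ("Bash",) * 4: "Check errors",
    ("Grep",) * 3: "Read results first",
}

def detect_anti_patterns(calls):
    """Scan an ordered list of tool names for the runs defined above."""
    hits = []
    for pattern, advice in ANTI_PATTERNS.items():
        n = len(pattern)
        for i in range(len(calls) - n + 1):
            if tuple(calls[i:i + n]) == pattern:
                hits.append((pattern, advice))
                break  # report each pattern at most once
    return hits
```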
After all agents complete:
```python
# Collect agent IDs from Task tool responses
agent_ids = ["agent-abc123", "agent-def456", "agent-ghi789"]

# Aggregate with SDK
results = sdk.aggregate_parallel_results(agent_ids)
print(f"Successful: {results['successful']}/{results['total_agents']}")
print(f"Health: {results['avg_health_score']:.0%}")
print(f"Speedup: {results['parallel_speedup']:.1f}x")
print(f"Conflicts: {results['conflicts']}")
print(f"Anti-patterns: {results['total_anti_patterns']}")
```
```python
{
    "total_agents": 3,
    "successful": 3,
    "failed": 0,
    "total_duration_seconds": 450.0,
    "parallel_speedup": 2.3,
    "avg_health_score": 0.80,
    "total_anti_patterns": 4,
    "files_modified": ["auth.py", "api.py", "tests/..."],
    "conflicts": [],  # Empty = good!
    "recommendations": [...],
    "validation": {
        "no_conflicts": True,
        "all_successful": True,
        "healthy_execution": True,
    },
    "all_passed": True,
}
```
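The `validation` block and `all_passed` flag follow from the other fields. A sketch of the presumed logic (a hypothetical reconstruction; the SDK may weigh these differently):

```python
def validate_results(results, health_threshold=0.70):
    """Derive the validation block from an aggregated results dict."""
    validation = {
        "no_conflicts": not results["conflicts"],
        "all_successful": results["failed"] == 0,
        "healthy_execution": results["avg_health_score"] > health_threshold,
    }
    return validation, all(validation.values())
```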
```python
if results["all_passed"]:
    print("✅ Parallel execution validated!")
    # Commit all changes together
else:
    # Handle issues
    for rec in results["recommendations"]:
        print(f"⚠️ {rec}")
```
| Pattern | Why |
|---|---|
| Grep → Read | Search before reading |
| Read → Edit → Bash | Read, modify, test |
| Glob → Read | Find files first |
| Single Task message | True parallelism |
| Pattern | Problem | Fix |
|---|---|---|
| Read → Read → Read | Redundant reads | Cache content |
| Edit → Edit → Edit | Unbatched | Combine edits |
| Sequential Task calls | No parallelism | Single message |
| Overlapping files | Conflicts | Isolate scope |
| Situation | Reason | Alternative |
|---|---|---|
| Shared dependencies | Conflicts | Sequential + handoff |
| Tasks < 1 minute | Overhead not worth it | Sequential |
| Overlapping files | Merge conflicts | Partition files |
| Complex coordination | Risk of errors | Plan agent |
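The "tasks < 1 minute" rule follows from simple arithmetic: parallel wall-clock time is roughly the longest task plus per-agent dispatch overhead. A rough model, where the 30-second overhead figure is an assumption for illustration:

```python
def estimated_speedup(task_seconds, dispatch_overhead=30.0):
    """Sequential total vs. parallel wall-clock (longest task + overhead)."""
    sequential = sum(task_seconds)
    parallel = max(task_seconds) + dispatch_overhead
    return sequential / parallel

# Three 40-second tasks barely break even (~1.7x);
# three 5-minute tasks clearly win (~2.7x).
```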
**Related skills:** `htmlgraph-tracker`, `strategic-planning`. Run `find_bottlenecks()` before parallel dispatch.

```python
from htmlgraph import SDK

sdk = SDK(agent="orchestrator")

# 1. Plan
plan = sdk.plan_parallel_work(max_agents=3)

# 2. Check
if plan["can_parallelize"]:
    # 3. Dispatch (all at once!)
    for p in plan["prompts"]:
        Task(prompt=p["prompt"], description=p["description"])

# 4. Aggregate (after completion)
results = sdk.aggregate_parallel_results(agent_ids)

# 5. Validate
if results["all_passed"]:
    print("✅ Success!")
```

Key calls: `sdk.get_parallel_work()`, `sdk.find_bottlenecks()`