npx claudepluginhub ibrahim-3d/conductor-orchestrator-superpowersWant just this skill?
Then install: npx claudepluginhub u/[userId]/[slug]
File-based message queue for inter-agent coordination. Used by workers AND board directors to communicate. Provides: progress updates, task completion signals, file locking, board deliberation. Core infrastructure for parallel execution.
This skill uses the workspace's default tool permissions.
scripts/init-bus.pyscripts/monitor-bus.pyMessage Bus -- Inter-Agent Communication Protocol
File-based message queue enabling workers and board directors to coordinate via shared state.
Directory Structure
conductor/tracks/{track}/.message-bus/
├── queue.jsonl # Append-only message log (all messages)
├── locks.json # Current file locks
├── worker-status.json # Worker heartbeats and states
├── events/ # Signal files for polling
│ ├── TASK_COMPLETE_1.1.event
│ └── FILE_UNLOCK_*.event
└── board/ # Board deliberation sessions
├── session-{ts}.json # Session metadata
├── assessments.json # Director assessments (Phase 1)
├── discussion.jsonl # Discussion messages (Phase 2)
└── votes.json # Final votes (Phase 3)
Message Types
Worker Messages
| Type | Purpose | Payload |
|---|---|---|
PROGRESS | Task progress update | { task_id, progress_pct, current_subtask } |
TASK_COMPLETE | Task finished | { task_id, commit_sha, files_modified, unblocks[] } |
TASK_FAILED | Task failed | { task_id, error, stack_trace } |
FILE_LOCK | Acquire file lock | { filepath, lock_type, expires_at } |
FILE_UNLOCK | Release file lock | { filepath } |
BLOCKED | Waiting on dependency | { task_id, waiting_for, resource } |
Board Messages
| Type | Purpose | Payload |
|---|---|---|
BOARD_ASSESS | Director assessment | { director, verdict, score, concerns[], recommendations[] } |
BOARD_DISCUSS | Discussion message | { from, to, type, message, changes_my_verdict } |
BOARD_VOTE | Final vote | { director, final_verdict, confidence, conditions[] } |
BOARD_RESOLVE | Aggregated decision | { verdict, vote_summary, conditions[], dissent[] } |
Message Format
All messages follow this structure:
{
"id": "msg-{uuid}",
"type": "PROGRESS | TASK_COMPLETE | BOARD_ASSESS | ...",
"source": "worker-1.1-xxx | CA | orchestrator",
"timestamp": "2026-02-01T12:00:00Z",
"payload": { ... }
}
Worker Protocol
Posting Messages
def post_message(bus_path: str, msg_type: str, source: str, payload: dict):
message = {
"id": f"msg-{uuid4()}",
"type": msg_type,
"source": source,
"timestamp": datetime.utcnow().isoformat() + "Z",
"payload": payload
}
# Append to queue (atomic via file locking)
with open(f"{bus_path}/queue.jsonl", "a") as f:
f.write_file(json.dumps(message) + "\n")
# Create event file for polling
if msg_type in ["TASK_COMPLETE", "FILE_UNLOCK", "BOARD_RESOLVE"]:
event_file = f"{bus_path}/events/{msg_type}_{payload.get('task_id', 'all')}.event"
Path(event_file).touch()
Reading Messages
def read_messages(bus_path: str, since: str = None, msg_type: str = None) -> list:
messages = []
with open(f"{bus_path}/queue.jsonl", "r") as f:
for line in f:
msg = json.loads(line)
if since and msg["timestamp"] < since:
continue
if msg_type and msg["type"] != msg_type:
continue
messages.append(msg)
return messages
Polling for Events
def wait_for_event(bus_path: str, event_pattern: str, timeout: int = 300) -> bool:
"""Wait for event file to appear. Returns True if found, False if timeout."""
import glob
import time
start = time.time()
while time.time() - start < timeout:
matches = glob.glob(f"{bus_path}/events/{event_pattern}")
if matches:
return True
time.sleep(1)
return False
File Lock Protocol
Acquiring Locks
def acquire_lock(bus_path: str, filepath: str, worker_id: str) -> bool:
locks_file = f"{bus_path}/locks.json"
# read_file current locks
locks = json.load(open(locks_file)) if os.path.exists(locks_file) else {}
# Check if already locked
if filepath in locks:
existing = locks[filepath]
# Check if expired (30 min timeout)
if datetime.fromisoformat(existing["expires_at"]) > datetime.utcnow():
if existing["worker_id"] != worker_id:
return False # Locked by another worker
# Acquire lock
locks[filepath] = {
"worker_id": worker_id,
"acquired_at": datetime.utcnow().isoformat() + "Z",
"expires_at": (datetime.utcnow() + timedelta(minutes=30)).isoformat() + "Z"
}
with open(locks_file, "w") as f:
json.dump(locks, f, indent=2)
# Post lock message
post_message(bus_path, "FILE_LOCK", worker_id, {"filepath": filepath})
return True
Releasing Locks
def release_lock(bus_path: str, filepath: str, worker_id: str):
locks_file = f"{bus_path}/locks.json"
locks = json.load(open(locks_file)) if os.path.exists(locks_file) else {}
if filepath in locks and locks[filepath]["worker_id"] == worker_id:
del locks[filepath]
with open(locks_file, "w") as f:
json.dump(locks, f, indent=2)
# Post unlock message and event
post_message(bus_path, "FILE_UNLOCK", worker_id, {"filepath": filepath})
Worker Status Heartbeat
Workers post heartbeats every 5 minutes:
def update_worker_status(bus_path: str, worker_id: str, task_id: str, status: str, progress: int):
status_file = f"{bus_path}/worker-status.json"
statuses = json.load(open(status_file)) if os.path.exists(status_file) else {}
statuses[worker_id] = {
"task_id": task_id,
"status": status, # "RUNNING" | "COMPLETE" | "FAILED" | "BLOCKED"
"progress_pct": progress,
"last_heartbeat": datetime.utcnow().isoformat() + "Z"
}
with open(status_file, "w") as f:
json.dump(statuses, f, indent=2)
Board Deliberation Protocol
Phase 1: Assessment
Each director posts their assessment:
def post_board_assessment(bus_path: str, director: str, assessment: dict):
board_path = f"{bus_path}/board"
# read_file existing assessments
assess_file = f"{board_path}/assessments.json"
assessments = json.load(open(assess_file)) if os.path.exists(assess_file) else {}
# Add this director's assessment
assessments[director] = assessment
with open(assess_file, "w") as f:
json.dump(assessments, f, indent=2)
# Post to main queue too
post_message(bus_path, "BOARD_ASSESS", director, assessment)
Phase 2: Discussion
Directors respond to each other:
def post_board_discussion(bus_path: str, from_dir: str, to_dir: str,
msg_type: str, message: str, changes_verdict: bool):
board_path = f"{bus_path}/board"
discussion_msg = {
"from": from_dir,
"to": to_dir,
"type": msg_type, # "CHALLENGE" | "AGREE" | "QUESTION" | "CLARIFY"
"message": message,
"changes_my_verdict": changes_verdict,
"timestamp": datetime.utcnow().isoformat() + "Z"
}
# Append to discussion log
with open(f"{board_path}/discussion.jsonl", "a") as f:
f.write_file(json.dumps(discussion_msg) + "\n")
# Post to main queue
post_message(bus_path, "BOARD_DISCUSS", from_dir, discussion_msg)
Phase 3: Voting
Directors cast final votes:
def post_board_vote(bus_path: str, director: str, verdict: str,
confidence: float, conditions: list):
board_path = f"{bus_path}/board"
votes_file = f"{board_path}/votes.json"
votes = json.load(open(votes_file)) if os.path.exists(votes_file) else {}
votes[director] = {
"final_verdict": verdict, # "APPROVE" | "REJECT"
"confidence": confidence, # 0.0 - 1.0
"conditions": conditions,
"timestamp": datetime.utcnow().isoformat() + "Z"
}
with open(votes_file, "w") as f:
json.dump(votes, f, indent=2)
post_message(bus_path, "BOARD_VOTE", director, votes[director])
Phase 4: Resolution
Orchestrator aggregates votes:
def resolve_board_vote(bus_path: str) -> dict:
board_path = f"{bus_path}/board"
votes = json.load(open(f"{board_path}/votes.json"))
approve_count = sum(1 for v in votes.values() if v["final_verdict"] == "APPROVE")
reject_count = len(votes) - approve_count
# Determine verdict
if approve_count >= 4:
verdict = "APPROVED"
elif approve_count == 3:
verdict = "APPROVED_WITH_REVIEW"
elif reject_count >= 4:
verdict = "REJECTED"
elif reject_count == 3:
verdict = "REJECTED"
else:
verdict = "ESCALATE"
# Collect conditions
all_conditions = []
for director, vote in votes.items():
for cond in vote.get("conditions", []):
all_conditions.append(f"{cond} ({director})")
resolution = {
"verdict": verdict,
"vote_summary": {d: v["final_verdict"] for d, v in votes.items()},
"conditions": all_conditions,
"timestamp": datetime.utcnow().isoformat() + "Z"
}
# Post resolution
post_message(bus_path, "BOARD_RESOLVE", "orchestrator", resolution)
# Create event file
Path(f"{bus_path}/events/BOARD_RESOLVE.event").touch()
return resolution
Deadlock Detection
Monitor for circular waits:
def detect_deadlock(bus_path: str) -> list:
"""Returns list of workers in deadlock cycle, or empty if none."""
status_file = f"{bus_path}/worker-status.json"
locks_file = f"{bus_path}/locks.json"
statuses = json.load(open(status_file)) if os.path.exists(status_file) else {}
locks = json.load(open(locks_file)) if os.path.exists(locks_file) else {}
# Build wait-for graph
# worker -> worker it's waiting for
wait_for = {}
# Find blocked workers
blocked_msgs = read_messages(bus_path, msg_type="BLOCKED")
for msg in blocked_msgs:
blocker = msg["payload"].get("waiting_for")
if blocker:
wait_for[msg["source"]] = blocker
# Detect cycles using DFS
def find_cycle(start, visited, path):
if start in path:
return path[path.index(start):]
if start in visited:
return []
visited.add(start)
path.append(start)
if start in wait_for:
cycle = find_cycle(wait_for[start], visited, path)
if cycle:
return cycle
path.pop()
return []
visited = set()
for worker in wait_for:
cycle = find_cycle(worker, visited, [])
if cycle:
return cycle
return []
Initialization
Initialize message bus for a track:
def init_message_bus(track_path: str):
bus_path = f"{track_path}/.message-bus"
# Create directories
os.makedirs(bus_path, exist_ok=True)
os.makedirs(f"{bus_path}/events", exist_ok=True)
os.makedirs(f"{bus_path}/board", exist_ok=True)
# Initialize files
Path(f"{bus_path}/queue.jsonl").touch()
with open(f"{bus_path}/locks.json", "w") as f:
json.dump({}, f)
with open(f"{bus_path}/worker-status.json", "w") as f:
json.dump({}, f)
with open(f"{bus_path}/board/assessments.json", "w") as f:
json.dump({}, f)
with open(f"{bus_path}/board/votes.json", "w") as f:
json.dump({}, f)
Path(f"{bus_path}/board/discussion.jsonl").touch()
Usage in Worker Agents
## Worker Protocol
1. **On Start**:
- read_file message bus for TASK_COMPLETE events of dependencies
- Verify all dependencies are met
- Update worker-status.json with RUNNING
2. **Before Modifying Files**:
- Call acquire_lock() for each file
- If lock fails, post BLOCKED message and wait
3. **During Execution**:
- Post PROGRESS every 5 minutes
- Update worker-status.json heartbeat
4. **On Completion**:
- Release all file locks
- Post TASK_COMPLETE with commit SHA and files modified
- Update worker-status.json with COMPLETE
5. **On Failure**:
- Release all file locks
- Post TASK_FAILED with error details
- Update worker-status.json with FAILED
Usage in Board Deliberation
## Board Protocol
1. **Phase 1 (ASSESS)**:
- All 5 directors read_file proposal
- Each posts BOARD_ASSESS to assessments.json
- Wait for all 5 assessments
2. **Phase 2 (DISCUSS)** -- 3 rounds:
- Directors read_file others' assessments
- Post BOARD_DISCUSS messages
- Respond to challenges and questions
3. **Phase 3 (VOTE)**:
- Each director posts BOARD_VOTE
- Include confidence level and conditions
4. **Phase 4 (RESOLVE)**:
- Orchestrator calls resolve_board_vote()
- Posts BOARD_RESOLVE
- Creates event file for completion
Board Session Management
Creating a Board Session
def create_board_session(bus_path: str, checkpoint: str, proposal: dict) -> str:
"""Initialize a new board session for deliberation."""
board_path = f"{bus_path}/board"
session_id = f"board-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
session = {
"session_id": session_id,
"checkpoint": checkpoint, # "EVALUATE_PLAN" | "EVALUATE_EXECUTION" | "PRE_LAUNCH"
"status": "ASSESSING",
"proposal": proposal,
"directors": ["CA", "CPO", "CSO", "COO", "CXO"],
"started_at": datetime.utcnow().isoformat() + "Z",
"phases": {
"assess": {"status": "IN_PROGRESS", "complete": 0, "of": 5},
"discuss": {"status": "NOT_STARTED", "rounds": 0, "max_rounds": 3},
"vote": {"status": "NOT_STARTED", "complete": 0, "of": 5},
"resolve": {"status": "NOT_STARTED"}
}
}
# Clear previous session data
with open(f"{board_path}/assessments.json", "w") as f:
json.dump({}, f, indent=2)
with open(f"{board_path}/votes.json", "w") as f:
json.dump({}, f, indent=2)
Path(f"{board_path}/discussion.jsonl").write_text("")
# Save session metadata
with open(f"{board_path}/session-{session_id}.json", "w") as f:
json.dump(session, f, indent=2)
return session_id
Checking Phase Completion
def check_board_phase_complete(bus_path: str, session_id: str) -> dict:
"""Check if current board phase is complete and advance if ready."""
board_path = f"{bus_path}/board"
session_file = f"{board_path}/session-{session_id}.json"
session = json.load(open(session_file))
assessments = json.load(open(f"{board_path}/assessments.json"))
votes = json.load(open(f"{board_path}/votes.json"))
discussions = []
with open(f"{board_path}/discussion.jsonl") as f:
discussions = [json.loads(l) for l in f if l.strip()]
result = {"phase": session["status"], "complete": False, "can_advance": False}
if session["status"] == "ASSESSING":
session["phases"]["assess"]["complete"] = len(assessments)
if len(assessments) >= 5:
result["complete"] = True
result["can_advance"] = True
result["next_phase"] = "DISCUSSING"
elif session["status"] == "DISCUSSING":
current_round = session["phases"]["discuss"]["rounds"]
if current_round >= 3:
result["complete"] = True
result["can_advance"] = True
result["next_phase"] = "VOTING"
elif session["status"] == "VOTING":
session["phases"]["vote"]["complete"] = len(votes)
if len(votes) >= 5:
result["complete"] = True
result["can_advance"] = True
result["next_phase"] = "RESOLVING"
# Save updated session
with open(session_file, "w") as f:
json.dump(session, f, indent=2)
return result
Advancing Board Phase
def advance_board_phase(bus_path: str, session_id: str) -> str:
"""Advance to next deliberation phase."""
board_path = f"{bus_path}/board"
session_file = f"{board_path}/session-{session_id}.json"
session = json.load(open(session_file))
transitions = {
"ASSESSING": "DISCUSSING",
"DISCUSSING": "VOTING",
"VOTING": "RESOLVING",
"RESOLVING": "COMPLETE"
}
current = session["status"]
next_phase = transitions.get(current, current)
session["status"] = next_phase
session["phases"][next_phase.lower().replace("ing", "")]["status"] = "IN_PROGRESS"
with open(session_file, "w") as f:
json.dump(session, f, indent=2)
return next_phase
Orchestrator Board Integration
Invoking Board from Orchestrator
async def invoke_board_meeting(
bus_path: str,
checkpoint: str,
proposal: str,
context: dict
) -> dict:
"""
Full 4-phase board deliberation.
Called by orchestrator at EVALUATE_PLAN or EVALUATE_EXECUTION checkpoints.
"""
# 1. Create session
session_id = create_board_session(bus_path, checkpoint, {
"proposal": proposal,
"context": context
})
# 2. Phase 1: ASSESS -- Dispatch all directors in parallel
director_prompts = {
"CA": f"Evaluate technical aspects: {proposal}",
"CPO": f"Evaluate product value: {proposal}",
"CSO": f"Evaluate security posture: {proposal}",
"COO": f"Evaluate operational feasibility: {proposal}",
"CXO": f"Evaluate user experience: {proposal}"
}
# Dispatch via parallel Task calls (see agent-factory)
assessments = await dispatch_board_directors(director_prompts, bus_path)
# Wait for all assessments
while check_board_phase_complete(bus_path, session_id)["complete"] == False:
await asyncio.sleep(5)
advance_board_phase(bus_path, session_id)
# 3. Phase 2: DISCUSS -- 3 rounds
for round_num in range(3):
await run_discussion_round(bus_path, session_id, round_num)
advance_board_phase(bus_path, session_id)
# 4. Phase 3: VOTE -- All directors vote
await dispatch_final_votes(bus_path, session_id)
while check_board_phase_complete(bus_path, session_id)["complete"] == False:
await asyncio.sleep(5)
advance_board_phase(bus_path, session_id)
# 5. Phase 4: RESOLVE
resolution = resolve_board_vote(bus_path)
return {
"session_id": session_id,
"verdict": resolution["verdict"],
"votes": resolution["vote_summary"],
"conditions": resolution["conditions"]
}
Quick Board Review (No Discussion)
async def invoke_board_review(bus_path: str, proposal: str) -> dict:
"""
Quick board review -- Phase 1 only, no discussion.
Used for execution quality checks or low-stakes decisions.
"""
session_id = create_board_session(bus_path, "QUICK_REVIEW", {
"proposal": proposal,
"quick_mode": True
})
# Dispatch all directors
await dispatch_board_directors(proposal, bus_path)
# Wait for assessments
while check_board_phase_complete(bus_path, session_id)["complete"] == False:
await asyncio.sleep(5)
# Aggregate assessments directly (skip discussion and vote)
board_path = f"{bus_path}/board"
assessments = json.load(open(f"{board_path}/assessments.json"))
approve_count = sum(1 for a in assessments.values()
if a["verdict"] in ["APPROVE", "CONCERNS"])
reject_count = len(assessments) - approve_count
return {
"session_id": session_id,
"verdict": "APPROVED" if approve_count >= 3 else "REJECTED",
"assessments": assessments,
"consensus": approve_count >= 4
}
Event-Driven Director Polling
Directors can poll for messages addressed to them:
def get_messages_for_director(bus_path: str, director: str) -> list:
"""Get all discussion messages addressed to this director."""
board_path = f"{bus_path}/board"
messages = []
with open(f"{board_path}/discussion.jsonl") as f:
for line in f:
if line.strip():
msg = json.loads(line)
if msg["to"] == director or msg["to"] == "ALL":
messages.append(msg)
return messages
Board Session Files
.message-bus/board/
├── session-board-20260201120000.json # Active session metadata
├── assessments.json # Phase 1: Director assessments
│ {
│ "CA": { "verdict": "APPROVE", "score": 8, "concerns": [...] },
│ "CPO": { "verdict": "CONCERNS", "score": 7, "concerns": [...] },
│ ...
│ }
├── discussion.jsonl # Phase 2: Discussion log
│ {"from": "CA", "to": "CPO", "type": "CHALLENGE", "message": "..."}
│ {"from": "CPO", "to": "CA", "type": "CLARIFY", "message": "..."}
├── votes.json # Phase 3: Final votes
│ {
│ "CA": { "final_verdict": "APPROVE", "confidence": 0.9 },
│ ...
│ }
└── resolution.md # Phase 4: Board decision
Similar Skills
Expert guidance for Next.js Cache Components and Partial Prerendering (PPR). **PROACTIVE ACTIVATION**: Use this skill automatically when working in Next.js projects that have `cacheComponents: true` in their next.config.ts/next.config.js. When this config is detected, proactively apply Cache Components patterns and best practices to all React Server Component implementations. **DETECTION**: At the start of a session in a Next.js project, check for `cacheComponents: true` in next.config. If enabled, this skill's patterns should guide all component authoring, data fetching, and caching decisions. **USE CASES**: Implementing 'use cache' directive, configuring cache lifetimes with cacheLife(), tagging cached data with cacheTag(), invalidating caches with updateTag()/revalidateTag(), optimizing static vs dynamic content boundaries, debugging cache issues, and reviewing Cache Component implementations.
Creating algorithmic art using p5.js with seeded randomness and interactive parameter exploration. Use this when users request creating art using code, generative art, algorithmic art, flow fields, or particle systems. Create original algorithmic art rather than copying existing artists' work to avoid copyright violations.