From agent-almanac
Implements JSON-RPC 2.0 A2A server with task lifecycle management (submitted/working/completed/failed/canceled/input-required), SSE streaming, push notifications for multi-agent workflows and Agent Card backends.
npx claudepluginhub pjt222/agent-almanacThis skill uses the workspace's default tool permissions.
---
Build A2A agent servers and clients using @a2a-js/sdk in TypeScript/JavaScript. Supports agent discovery, task management, streaming over JSON-RPC, REST, gRPC transports.
Builds A2A servers handling JSON-RPC requests at agent endpoints, routing methods, managing task states, and executing agent logic via SDK handlers.
Provides A2A protocol reference for building servers/clients, configuring Ark A2AServer resources, debugging communication, and specs on Agent Cards, tasks, streaming, extensions.
Share bugs, ideas, or general feedback.
Build a fully compliant A2A server that handles JSON-RPC 2.0 requests, manages task lifecycle states, supports SSE streaming for real-time updates, and serves an Agent Card for discovery.
design-a2a-agent-cardtrue or false)1.1. Initialize the project with HTTP server and JSON-RPC parsing:
TypeScript:
mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
npm init -y
npm install express uuid
npm install -D typescript @types/node @types/express tsx
Python:
mkdir -p $PROJECT_NAME && cd $PROJECT_NAME
python -m venv .venv && source .venv/bin/activate
pip install fastapi uvicorn uuid6
1.2. Create the JSON-RPC 2.0 request handler:
interface JsonRpcRequest {
jsonrpc: "2.0";
id: string | number;
method: string;
params?: Record<string, unknown>;
}
interface JsonRpcResponse {
jsonrpc: "2.0";
id: string | number;
result?: unknown;
error?: { code: number; message: string; data?: unknown };
}
function handleJsonRpc(request: JsonRpcRequest): JsonRpcResponse {
switch (request.method) {
case "tasks/send":
return handleTaskSend(request);
case "tasks/get":
return handleTaskGet(request);
case "tasks/cancel":
return handleTaskCancel(request);
case "tasks/sendSubscribe":
// Handled separately via SSE
throw new Error("Use SSE endpoint for sendSubscribe");
default:
return {
jsonrpc: "2.0",
id: request.id,
error: { code: -32601, message: `Method not found: ${request.method}` },
};
}
}
1.3. Mount the JSON-RPC handler on a POST endpoint (typically /):
app.post("/", (req, res) => {
const response = handleJsonRpc(req.body);
res.json(response);
});
1.4. Serve the Agent Card at /.well-known/agent.json:
app.get("/.well-known/agent.json", (req, res) => {
res.json(agentCard);
});
Expected: An HTTP server that accepts JSON-RPC 2.0 requests and serves the Agent Card.
On failure: If JSON-RPC parsing fails, validate that the request body has jsonrpc, method, and id fields. Return -32700 (Parse error) for malformed JSON and -32600 (Invalid Request) for missing required fields.
2.1. Define the task model with all A2A lifecycle states:
type TaskState =
| "submitted"
| "working"
| "input-required"
| "completed"
| "failed"
| "canceled";
interface Task {
id: string;
sessionId: string;
status: {
state: TaskState;
message?: Message;
timestamp: string;
};
history?: TaskStatus[];
artifacts?: Artifact[];
metadata?: Record<string, unknown>;
}
interface Message {
role: "user" | "agent";
parts: Part[];
}
type Part =
| { type: "text"; text: string }
| { type: "file"; file: { name: string; mimeType: string; bytes?: string; uri?: string } }
| { type: "data"; data: Record<string, unknown> };
2.2. Implement state transition rules:
submitted -> working | failed | canceled
working -> completed | failed | canceled | input-required
input-required -> working | failed | canceled
completed -> (terminal)
failed -> (terminal)
canceled -> (terminal)
2.3. Create a task store with CRUD operations:
class TaskStore {
private tasks: Map<string, Task> = new Map();
create(sessionId: string, message: Message): Task { ... }
get(taskId: string): Task | undefined { ... }
updateStatus(taskId: string, state: TaskState, message?: Message): Task { ... }
addArtifact(taskId: string, artifact: Artifact): void { ... }
cancel(taskId: string): Task { ... }
}
2.4. If stateTransitionHistory is enabled in the Agent Card, append each status change to the task's history array with timestamps.
Expected: A task store that enforces valid state transitions and maintains history.
On failure: If an invalid state transition is attempted (e.g., completed to working), return a JSON-RPC error with code -32002 and a descriptive message. Never silently ignore invalid transitions.
3.1. Implement tasks/send — the primary method for submitting tasks:
function handleTaskSend(request: JsonRpcRequest): JsonRpcResponse {
const { id: taskId, sessionId, message } = request.params as TaskSendParams;
// Create or resume task
let task = taskStore.get(taskId);
if (!task) {
task = taskStore.create(sessionId, message);
} else if (task.status.state === "input-required") {
taskStore.updateStatus(task.id, "working");
}
// Route to skill handler based on message content
const skill = matchSkill(message);
if (!skill) {
taskStore.updateStatus(task.id, "failed", {
role: "agent",
parts: [{ type: "text", text: "No matching skill for this request." }],
});
return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
}
// Execute skill (async — task will transition to working, then completed/failed)
executeSkill(skill, task, message).catch((error) => {
taskStore.updateStatus(task.id, "failed", {
role: "agent",
parts: [{ type: "text", text: error.message }],
});
});
return { jsonrpc: "2.0", id: request.id, result: taskStore.get(task.id) };
}
3.2. Implement tasks/get — retrieve task status and artifacts:
function handleTaskGet(request: JsonRpcRequest): JsonRpcResponse {
const { id: taskId, historyLength } = request.params as TaskGetParams;
const task = taskStore.get(taskId);
if (!task) {
return {
jsonrpc: "2.0",
id: request.id,
error: { code: -32001, message: `Task not found: ${taskId}` },
};
}
// Optionally trim history to requested length
const result = historyLength !== undefined
? { ...task, history: task.history?.slice(-historyLength) }
: task;
return { jsonrpc: "2.0", id: request.id, result };
}
3.3. Implement tasks/cancel:
function handleTaskCancel(request: JsonRpcRequest): JsonRpcResponse {
const { id: taskId } = request.params as TaskCancelParams;
try {
const task = taskStore.cancel(taskId);
return { jsonrpc: "2.0", id: request.id, result: task };
} catch (error) {
return {
jsonrpc: "2.0",
id: request.id,
error: { code: -32002, message: (error as Error).message },
};
}
}
Expected: Working tasks/send, tasks/get, and tasks/cancel methods that correctly manage task lifecycle.
On failure: If skill matching fails, return the task in failed state with a descriptive message. If the task store is full, return -32003 (resource exhausted).
4.1. Create an SSE endpoint for streaming task updates:
app.post("/subscribe", (req, res) => {
const request = req.body as JsonRpcRequest;
if (request.method !== "tasks/sendSubscribe") {
res.status(400).json({ error: "Only tasks/sendSubscribe supported" });
return;
}
// Set SSE headers
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
const { id: taskId, sessionId, message } = request.params as TaskSendParams;
let task = taskStore.get(taskId) ?? taskStore.create(sessionId, message);
// Send initial status
sendSSEEvent(res, "status", {
id: request.id,
result: { id: task.id, status: task.status },
});
// Subscribe to task updates
const unsubscribe = taskStore.onUpdate(task.id, (updatedTask) => {
if (updatedTask.status.state === "working") {
sendSSEEvent(res, "status", {
id: request.id,
result: { id: updatedTask.id, status: updatedTask.status },
});
}
if (updatedTask.artifacts?.length) {
sendSSEEvent(res, "artifact", {
id: request.id,
result: { id: updatedTask.id, artifact: updatedTask.artifacts.at(-1) },
});
}
// Close stream on terminal states
if (["completed", "failed", "canceled"].includes(updatedTask.status.state)) {
sendSSEEvent(res, "status", {
id: request.id,
result: { id: updatedTask.id, status: updatedTask.status, final: true },
});
unsubscribe();
res.end();
}
});
// Handle client disconnect
req.on("close", () => {
unsubscribe();
});
});
function sendSSEEvent(res: Response, event: string, data: unknown): void {
res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
}
4.2. Add an event emitter or pub/sub mechanism to the task store:
class TaskStore {
private listeners: Map<string, Set<(task: Task) => void>> = new Map();
onUpdate(taskId: string, callback: (task: Task) => void): () => void {
if (!this.listeners.has(taskId)) {
this.listeners.set(taskId, new Set());
}
this.listeners.get(taskId)!.add(callback);
return () => this.listeners.get(taskId)?.delete(callback);
}
private notifyListeners(taskId: string): void {
const task = this.get(taskId);
if (task) {
this.listeners.get(taskId)?.forEach((cb) => cb(task));
}
}
}
4.3. Emit events from all task state transitions and artifact additions.
Expected: SSE streaming that sends real-time status and artifact events as the task progresses.
On failure: If SSE connection drops, the client should be able to reconnect and use tasks/get to retrieve the current state. Ensure the task store does not depend on active SSE connections.
5.1. If pushNotifications is enabled in the Agent Card, implement webhook registration via tasks/pushNotification/set:
PushNotificationConfig with url (HTTPS required), optional token, and events array (["status", "artifact"])-32004 otherwise5.2. Send webhook callbacks on task state changes:
taskId, eventType, status, and timestamp to the webhook URLAuthorization: Bearer <token> header if a token was provided5.3. Implement retry logic for failed webhooks (exponential backoff, max 3 retries).
5.4. Add tasks/pushNotification/get to retrieve the current push config for a task.
Expected: Webhook registration and delivery with retry logic.
On failure: Push notification failures must never affect task execution. Log errors and continue. If the webhook URL is persistently unreachable, remove the subscription after max retries.
6.1. Load and serve the Agent Card at startup:
agent-card.json and validate capabilities match implementationstreaming: true but SSE is not enabledpushNotifications: true but webhooks are not enabled6.2. Add CORS headers for cross-origin Agent Card discovery:
Access-Control-Allow-Origin: * on /.well-known/agent.jsonGET and OPTIONS methods6.3. Add authentication middleware matching the Agent Card's scheme:
/.well-known/agent.json (Agent Card is always public)Authorization header or API key-32000 for unauthorized requests6.4. Start the server and verify end-to-end:
# Start server
npm run dev
# Fetch Agent Card
curl -s http://localhost:3000/.well-known/agent.json | python3 -m json.tool
# Send a task
curl -X POST http://localhost:3000/ \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tasks/send","params":{"id":"task-1","sessionId":"session-1","message":{"role":"user","parts":[{"type":"text","text":"Analyze my dataset"}]}}}'
Expected: A running A2A server that serves its Agent Card, accepts tasks, and manages their full lifecycle.
On failure: If the Agent Card capabilities do not match the implementation, the startup validation from 6.1 will catch the mismatch. Fix the implementation or update the Agent Card to match.
/.well-known/agent.jsontasks/send creates tasks and transitions them through the lifecycletasks/get retrieves task status and artifactstasks/cancel moves tasks to the canceled statejsonrpc: "2.0" and correct id-32700 (parse error), -32600 (invalid request), -32601 (method not found), and custom codes for domain errors.req.on("close") to detect disconnects.submitted or working state immediately, then update via events.completed, failed, or canceled, no further state transitions are allowed. Guard against this in the state machine.design-a2a-agent-card - design the Agent Card this server implementstest-a2a-interop - validate the server against A2A conformance testsbuild-custom-mcp-server - MCP server patterns that inform A2A implementationscaffold-mcp-server - scaffolding patterns applicable to A2A server setupconfigure-ingress-networking - production deployment with TLS and routing