From vercel
Provides expert guidance for Vercel Workflow DevKit when building durable workflows, long-running tasks, API routes, or agents needing pause/resume, retries, step execution, or crash-safe orchestration.
npx claudepluginhub vercel/vercel-plugin --plugin vercel-plugin
This skill uses the workspace's default tool permissions.
Your knowledge of `workflow` is outdated.
workflow Documentation
The workflow documentation outlined below matches the installed version of the Workflow DevKit.
Follow these instructions before starting on any workflow-related tasks:
Search the bundled documentation in node_modules/workflow/docs/:
glob "node_modules/workflow/docs/**/*.mdx"
grep "your query" node_modules/workflow/docs/
Documentation structure in node_modules/workflow/docs/:
- getting-started/ - Framework setup (next.mdx, express.mdx, hono.mdx, etc.)
- foundations/ - Core concepts (workflows-and-steps.mdx, hooks.mdx, streaming.mdx, etc.)
- api-reference/workflow/ - API docs (sleep.mdx, create-hook.mdx, fatal-error.mdx, etc.)
- api-reference/workflow-api/ - Client API (start.mdx, get-run.mdx, resume-hook.mdx, etc.)
- ai/ - AI SDK integration docs
- errors/ - Error code documentation
Related packages also include bundled docs:
- @workflow/ai: node_modules/@workflow/ai/docs/ - DurableAgent and AI integration
- @workflow/core: node_modules/@workflow/core/docs/ - Core runtime (foundations, how-it-works)
- @workflow/next: node_modules/@workflow/next/docs/ - Next.js integration
When in doubt, update to the latest version of the Workflow DevKit.
Directives:
"use workflow"; // First line - makes async function durable
"use step"; // First line - makes function a cached, retryable unit
Essential imports:
// Workflow primitives
import { sleep, fetch, createHook, createWebhook, getWritable } from "workflow";
import { FatalError, RetryableError } from "workflow";
import { getWorkflowMetadata, getStepMetadata } from "workflow";
// API operations
import { start, getRun, resumeHook, resumeWebhook } from "workflow/api";
// Framework integrations
import { withWorkflow } from "workflow/next";
import { workflow } from "workflow/vite"; // Vite
import { workflow } from "workflow/astro"; // Astro (use only one of these two imports per project)
// Or use modules: ["workflow/nitro"] for Nitro/Nuxt
// AI agent
import { DurableAgent } from "@workflow/ai/agent";
"use workflow" functions run in a sandboxed VM. "use step" functions have full Node.js access. Put your logic in steps and use the workflow function purely for orchestration.
// Steps have full Node.js and npm access
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
async function fetchUserData(userId: string) {
  "use step";
  const response = await fetch(`https://api.example.com/users/${userId}`);
  return response.json();
}
async function processWithAI(data: any) {
  "use step";
  // AI SDK works in steps without workarounds
  const { text } = await generateText({
    model: openai("gpt-4"),
    prompt: `Process: ${JSON.stringify(data)}`,
  });
  // Return only the generated text so the step result stays serializable
  return text;
}
// Workflow orchestrates steps - no sandbox issues
export async function dataProcessingWorkflow(userId: string) {
  "use workflow";
  const data = await fetchUserData(userId);
  const processed = await processWithAI(data);
  return { success: true, processed };
}
Benefits: Steps have automatic retry, results are persisted for replay, and no sandbox restrictions.
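To make the replay benefit concrete, here is a toy, in-memory model of "persist each step result, then replay". It is only a sketch of the general idea and is not how the Workflow DevKit is implemented; `StepLog`, `runStep`, `orchestrate`, and the string step IDs are hypothetical names invented for this illustration.

```typescript
// Toy model only: an in-memory log standing in for persisted step results.
type StepLog = Map<string, unknown>;

// Runs `fn` only if `stepId` has no recorded result; otherwise returns the record.
function runStep<T>(stepLog: StepLog, stepId: string, fn: () => T): T {
  if (stepLog.has(stepId)) {
    return stepLog.get(stepId) as T; // replay: the step body is skipped
  }
  const result = fn();
  stepLog.set(stepId, result); // "persist" the result for later replays
  return result;
}

// Counts how many step bodies actually executed.
let executions = 0;

// Hypothetical orchestration: the second step depends on the first.
function orchestrate(stepLog: StepLog): number {
  const loaded = runStep(stepLog, "load", () => {
    executions += 1;
    return 20;
  });
  return runStep(stepLog, "double", () => {
    executions += 1;
    return loaded * 2;
  });
}

const stepLog: StepLog = new Map();
const firstRun = orchestrate(stepLog); // executes both step bodies
const replayed = orchestrate(stepLog); // simulated restart: both results come from the log
```

In this model the second call returns the same value without re-running any step body, which is why side effects belong in steps and the workflow function should only orchestrate.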
When you need logic directly in a workflow function (not in a step), these restrictions apply:
| Limitation | Workaround |
|---|---|
| No fetch() | import { fetch } from "workflow" then globalThis.fetch = fetch |
| No setTimeout/setInterval | Use sleep("5s") from "workflow" |
| No Node.js modules (fs, crypto, etc.) | Move to a step function |
Example - Using fetch in workflow context:
import { fetch } from "workflow";
export async function myWorkflow() {
"use workflow";
globalThis.fetch = fetch; // Required for AI SDK and HTTP libraries
// Now generateText() and other libraries work
}
Note: DurableAgent from @workflow/ai handles the fetch assignment automatically.
Use DurableAgent to build AI agents that maintain state and survive interruptions. It handles the workflow sandbox automatically (no manual globalThis.fetch needed).
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";
async function lookupData({ query }: { query: string }) {
"use step";
// Step functions have full Node.js access
return `Results for "${query}"`;
}
export async function myAgentWorkflow(userMessage: string) {
"use workflow";
const agent = new DurableAgent({
model: "anthropic/claude-sonnet-4-5",
system: "You are a helpful assistant.",
tools: {
lookupData: {
description: "Search for information",
inputSchema: z.object({ query: z.string() }),
execute: lookupData,
},
},
});
const result = await agent.stream({
messages: [{ role: "user", content: userMessage }],
writable: getWritable<UIMessageChunk>(),
maxSteps: 10,
});
return result.messages;
}
Key points:
- getWritable<UIMessageChunk>() streams output to the workflow run's default stream
- execute functions that need Node.js/npm access should use "use step"
- execute functions that use workflow primitives (sleep(), createHook()) should NOT use "use step"; they run at the workflow level
- maxSteps limits the number of LLM calls (default is unlimited)
- For multi-turn conversations, pass result.messages plus new user messages to subsequent agent.stream() calls
For more details on DurableAgent, check the AI docs in node_modules/@workflow/ai/docs/.
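For multi-turn conversations, the message list for the next agent.stream() call can be assembled as sketched below. `ChatMessage` and `nextTurnMessages` are hypothetical names, and the plain-string `content` field is a simplifying assumption: the real message type comes from the AI SDK and may carry structured content.

```typescript
// Simplified message shape assumed for this sketch (not the AI SDK's actual type).
type ChatMessage = { role: "system" | "user" | "assistant"; content: string };

// Returns a new array: the prior conversation followed by the new user message.
// The input array is not mutated, so earlier results stay intact.
function nextTurnMessages(
  previous: readonly ChatMessage[],
  userText: string,
): ChatMessage[] {
  return [...previous, { role: "user", content: userText }];
}

// Stand-in for `result.messages` after a first turn.
const firstTurnMessages: ChatMessage[] = [
  { role: "user", content: "What is a durable workflow?" },
  { role: "assistant", content: "A workflow whose progress survives restarts." },
];

// Pass this array as `messages` to the next agent.stream() call.
const secondTurnMessages = nextTurnMessages(firstTurnMessages, "How do I start one?");
```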
Use start() to launch workflows from API routes. start() cannot be called directly in workflow context — wrap it in a step function.
import { start } from "workflow/api";
// From an API route — works directly
export async function POST() {
const run = await start(myWorkflow, [arg1, arg2]);
return Response.json({ runId: run.runId });
}
// No-args workflow
const run = await start(noArgWorkflow);
Starting child workflows from inside a workflow — must use a step:
import { start } from "workflow/api";
import { sleep } from "workflow";
// Wrap start() in a step function
async function triggerChild(data: string) {
"use step";
const run = await start(childWorkflow, [data]);
return run.runId;
}
export async function parentWorkflow() {
"use workflow";
const childRunId = await triggerChild("some data"); // Fire-and-forget via step
await sleep("1h");
}
start() returns immediately — it doesn't wait for the workflow to complete. Use run.returnValue to await completion.
Hooks let workflows wait for external data. Use createHook() inside a workflow and resumeHook() from API routes. Deterministic tokens are for createHook() + resumeHook() (server-side) only. createWebhook() always generates random tokens — do not pass a token option to createWebhook().
import { createHook } from "workflow";
export async function approvalWorkflow() {
"use workflow";
const hook = createHook<{ approved: boolean }>({
token: "approval-123", // deterministic token for external systems
});
const result = await hook; // Workflow suspends here
return result.approved;
}
Hooks implement AsyncIterable — use for await...of to receive multiple events:
import { createHook } from "workflow";
export async function chatWorkflow(channelId: string) {
"use workflow";
const hook = createHook<{ text: string; done?: boolean }>({
token: `chat-${channelId}`,
});
for await (const event of hook) {
await processMessage(event.text);
if (event.done) break;
}
}
Each resumeHook(token, payload) call delivers the next value to the loop.
import { resumeHook } from "workflow/api";
export async function POST(req: Request) {
const { token, data } = await req.json();
await resumeHook(token, data);
return new Response("ok");
}
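Because the workflow and the API route must agree on the exact token string, it can help to derive it in one shared place. The helper below is a minimal sketch; `approvalToken` and the `approval:<id>` format are illustrative assumptions, not part of the DevKit API.

```typescript
// Hypothetical helper shared by the workflow file and the API route so both
// derive the same deterministic hook token from a document ID.
function approvalToken(docId: string): string {
  if (docId.length === 0) {
    throw new Error("docId must not be empty");
  }
  return `approval:${docId}`;
}

// In the workflow:  createHook({ token: approvalToken(docId) })
// In the API route: resumeHook(approvalToken(docId), payload)
const token = approvalToken("doc-123");
```

Keeping the format in a single function avoids the workflow waiting on one token while the route resumes a slightly different one.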
Use FatalError for permanent failures (no retry), RetryableError for transient failures:
import { FatalError, RetryableError } from "workflow";
// Check 429 first: it also falls in the 400-499 range and would otherwise be treated as fatal
if (res.status === 429) {
  throw new RetryableError("Rate limited", { retryAfter: "5m" });
}
if (res.status >= 400 && res.status < 500) {
  throw new FatalError(`Client error: ${res.status}`);
}
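The order of these checks matters, since 429 is itself a 4xx status. One way to keep the decision testable is a small pure function that a step consults before throwing. This is a sketch under stated assumptions: `StatusAction` and `classifyStatus` are hypothetical names, and treating 5xx responses as retryable with a "1m" delay is an assumption rather than DevKit behavior.

```typescript
// What a step should do after inspecting an HTTP status code.
type StatusAction =
  | { kind: "ok" }
  | { kind: "retry"; retryAfter: string } // caller throws RetryableError
  | { kind: "fatal"; reason: string }; // caller throws FatalError

function classifyStatus(code: number): StatusAction {
  // 429 must be checked before the general 4xx range.
  if (code === 429) {
    return { kind: "retry", retryAfter: "5m" };
  }
  if (code >= 400 && code < 500) {
    return { kind: "fatal", reason: `Client error: ${code}` };
  }
  if (code >= 500) {
    // Assumption: server errors are transient and worth retrying.
    return { kind: "retry", retryAfter: "1m" };
  }
  return { kind: "ok" };
}
```

Inside a step, the "retry" case would map to `throw new RetryableError(...)` and the "fatal" case to `throw new FatalError(...)`.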
All data passed to/from workflows and steps must be serializable.
Supported types: string, number, boolean, null, undefined, bigint, plain objects, arrays, Date, RegExp, URL, URLSearchParams, Map, Set, Headers, ArrayBuffer, typed arrays, Request, Response, ReadableStream, WritableStream.
Not supported: Functions, class instances, Symbols, WeakMap/WeakSet. Pass data, not callbacks.
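A rough pre-flight check can catch unsupported values before they reach a workflow or step boundary. The sketch below approximates the lists above; it is not the DevKit's serializer, `findUnserializable` and `SUPPORTED_BUILTINS` are hypothetical names, and matching built-ins by constructor name is a simplification.

```typescript
// Built-in object types named as supported above, matched by constructor name.
const SUPPORTED_BUILTINS = new Set<string>([
  "Date", "RegExp", "URL", "URLSearchParams", "Headers", "ArrayBuffer",
  "Request", "Response", "ReadableStream", "WritableStream",
  "Int8Array", "Uint8Array", "Uint8ClampedArray", "Int16Array", "Uint16Array",
  "Int32Array", "Uint32Array", "Float32Array", "Float64Array",
  "BigInt64Array", "BigUint64Array",
]);

// Returns the paths of values that would likely fail serialization.
function findUnserializable(value: unknown, path = "$"): string[] {
  if (typeof value === "function" || typeof value === "symbol") return [path];
  if (value === null || typeof value !== "object") return []; // other primitives
  if (value instanceof WeakMap || value instanceof WeakSet) return [path];
  if (Array.isArray(value)) {
    return value.flatMap((item, i) => findUnserializable(item, `${path}[${i}]`));
  }
  if (value instanceof Map) {
    return [...value.entries()].flatMap(([k, v], i) => [
      ...findUnserializable(k, `${path}.key(${i})`),
      ...findUnserializable(v, `${path}.value(${i})`),
    ]);
  }
  if (value instanceof Set) {
    return [...value].flatMap((item, i) => findUnserializable(item, `${path}.item(${i})`));
  }
  const proto = Object.getPrototypeOf(value);
  if (proto === Object.prototype || proto === null) {
    // Plain object: check each own enumerable string-keyed property.
    return Object.entries(value).flatMap(([k, v]) => findUnserializable(v, `${path}.${k}`));
  }
  // Anything else is treated as a class instance unless it is a supported built-in.
  return SUPPORTED_BUILTINS.has(proto.constructor?.name) ? [] : [path];
}

// Example input mixing supported values with a callback and a class instance.
class Session {
  id = "s1";
}
const problems = findUnserializable({
  count: 1,
  when: new Date(0),
  onDone: () => {},
  session: new Session(),
});
// problems is ["$.onDone", "$.session"]
```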
Use getWritable() to stream data from workflows. getWritable() can be called in both workflow and step contexts, but you cannot interact with the stream (call getWriter(), write(), close()) directly in a workflow function. The stream must be passed to step functions for actual I/O, or steps can call getWritable() themselves.
Get the stream in a workflow, pass it to a step:
import { getWritable } from "workflow";
export async function myWorkflow() {
"use workflow";
const writable = getWritable();
await writeData(writable, "hello world");
}
async function writeData(writable: WritableStream, chunk: string) {
"use step";
const writer = writable.getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}
Call getWritable() directly inside a step (no need to pass it):
import { getWritable } from "workflow";
async function streamData(chunk: string) {
"use step";
const writer = getWritable().getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}
Use getWritable({ namespace: 'name' }) to create multiple independent streams for different types of data. This is useful for separating logs from primary output, different log levels, agent outputs, metrics, or any distinct data channels. Long-running workflows benefit from namespaced streams because you can replay only the important events (e.g., final results) while keeping verbose logs in a separate stream.
Example: Log levels and agent output separation:
import { getWritable } from "workflow";
type LogEntry = { level: "debug" | "info" | "warn" | "error"; message: string; timestamp: number };
type AgentOutput = { type: "thought" | "action" | "result"; content: string };
async function logDebug(message: string) {
"use step";
const writer = getWritable<LogEntry>({ namespace: "logs:debug" }).getWriter();
try {
await writer.write({ level: "debug", message, timestamp: Date.now() });
} finally {
writer.releaseLock();
}
}
async function logInfo(message: string) {
"use step";
const writer = getWritable<LogEntry>({ namespace: "logs:info" }).getWriter();
try {
await writer.write({ level: "info", message, timestamp: Date.now() });
} finally {
writer.releaseLock();
}
}
async function emitAgentThought(thought: string) {
"use step";
const writer = getWritable<AgentOutput>({ namespace: "agent:thoughts" }).getWriter();
try {
await writer.write({ type: "thought", content: thought });
} finally {
writer.releaseLock();
}
}
async function emitAgentResult(result: string) {
"use step";
// Important results go to the default stream for easy replay
const writer = getWritable<AgentOutput>().getWriter();
try {
await writer.write({ type: "result", content: result });
} finally {
writer.releaseLock();
}
}
export async function agentWorkflow(task: string) {
"use workflow";
await logInfo(`Starting task: ${task}`);
await logDebug("Initializing agent context");
await emitAgentThought("Analyzing the task requirements...");
// ... agent processing ...
await emitAgentResult("Task completed successfully");
await logInfo("Workflow finished");
}
Consuming namespaced streams:
import { start, getRun } from "workflow/api";
import { agentWorkflow } from "./workflows/agent";
export async function POST(request: Request) {
const run = await start(agentWorkflow, ["process data"]);
// Access specific streams by namespace
const results = run.getReadable({ namespace: undefined }); // Default stream (important results)
const infoLogs = run.getReadable({ namespace: "logs:info" });
const debugLogs = run.getReadable({ namespace: "logs:debug" });
const thoughts = run.getReadable({ namespace: "agent:thoughts" });
// Return only important results for most clients
return new Response(results, { headers: { "Content-Type": "application/json" } });
}
// Resume from a specific point (useful for long sessions)
export async function GET(request: Request) {
const { searchParams } = new URL(request.url);
const runId = searchParams.get("runId")!;
const startIndex = parseInt(searchParams.get("startIndex") || "0", 10);
const run = getRun(runId);
// Resume only the important stream, skip verbose debug logs
const stream = run.getReadable({ startIndex });
return new Response(stream);
}
Pro tip: For very long-running sessions (50+ minutes), namespaced streams help manage replay performance. Put verbose/debug output in separate namespaces so you can replay just the important events quickly.
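In the GET handler above, a non-numeric `startIndex` query value makes `parseInt` return `NaN`, which is then passed to `getReadable`. A slightly more defensive parser is sketched below. `parseStartIndex` is a hypothetical helper, and rejecting negative values is an assumption of this sketch; relax that check if your DevKit version accepts them.

```typescript
// Parses the `startIndex` query value, falling back to 0 for missing or invalid input.
function parseStartIndex(raw: string | null): number {
  if (raw === null) return 0;
  const parsed = Number.parseInt(raw, 10);
  // Assumption: only non-negative integers are meaningful here.
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : 0;
}

// Usage inside the handler:
//   const startIndex = parseStartIndex(searchParams.get("startIndex"));
```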
# Check workflow endpoints are reachable
npx workflow health
npx workflow health --port 3001 # Non-default port
# Visual dashboard for runs
npx workflow web
npx workflow web <run_id>
# CLI inspection (use --json for machine-readable output, --help for full usage)
npx workflow inspect runs
npx workflow inspect run <run_id>
# For Vercel-deployed projects, specify backend and project
npx workflow inspect runs --backend vercel --project <project-name> --team <team-slug>
npx workflow inspect run <run_id> --backend vercel --project <project-name> --team <team-slug>
# Open Vercel dashboard in browser for a specific run
npx workflow inspect run <run_id> --web
npx workflow web <run_id> --backend vercel --project <project-name> --team <team-slug>
# Cancel a running workflow
npx workflow cancel <run_id>
npx workflow cancel <run_id> --backend vercel --project <project-name> --team <team-slug>
# --env defaults to "production"; use --env preview for preview deployments
Debugging tips:
- Use --json (-j) on any command for machine-readable output
- Use --web to open the Vercel Observability dashboard in your browser
- Use --help on any command for full usage details
Workflow DevKit provides a Vitest plugin for testing workflows in-process; no running server is required.
Unit testing steps: Steps are just functions; without the compiler, "use step" is a no-op. Test them directly:
import { describe, it, expect } from "vitest";
import { createUser } from "./user-signup";
describe("createUser step", () => {
it("should create a user", async () => {
const user = await createUser("test@example.com");
expect(user.email).toBe("test@example.com");
});
});
Integration testing: Use @workflow/vitest for workflows using sleep(), hooks, webhooks, or retries:
// vitest.integration.config.ts
import { defineConfig } from "vitest/config";
import { workflow } from "@workflow/vitest";
export default defineConfig({
plugins: [workflow()],
test: {
include: ["**/*.integration.test.ts"],
testTimeout: 60_000,
},
});
// approval.integration.test.ts
import { describe, it, expect } from "vitest";
import { start, getRun, resumeHook } from "workflow/api";
import { waitForHook, waitForSleep } from "@workflow/vitest";
import { approvalWorkflow } from "./approval";
describe("approvalWorkflow", () => {
it("should publish when approved", async () => {
const run = await start(approvalWorkflow, ["doc-123"]);
// Wait for the hook, then resume it
await waitForHook(run, { token: "approval:doc-123" });
await resumeHook("approval:doc-123", { approved: true, reviewer: "alice" });
// Wait for sleep, then wake it up
const sleepId = await waitForSleep(run);
await getRun(run.runId).wakeUp({ correlationIds: [sleepId] });
const result = await run.returnValue;
expect(result).toEqual({ status: "published", reviewer: "alice" });
});
});
Testing webhooks: Use resumeWebhook() with a Request object — no HTTP server needed:
import { start, resumeWebhook } from "workflow/api";
import { waitForHook } from "@workflow/vitest";
const run = await start(ingestWorkflow, ["ep-1"]);
const hook = await waitForHook(run); // Discovers the random webhook token
await resumeWebhook(hook.token, new Request("https://example.com/webhook", {
method: "POST",
body: JSON.stringify({ event: "order.created" }),
}));
Key APIs:
- start(): trigger a workflow
- run.returnValue: await workflow completion
- waitForHook(run, { token? }) / waitForSleep(run): wait for the workflow to reach a pause point
- resumeHook(token, data) / resumeWebhook(token, request): resume paused workflows
- getRun(runId).wakeUp({ correlationIds }): skip sleep() calls
Best practices:
- Keep unit tests and integration tests (which use the workflow() plugin) in separate configs
- Set a generous testTimeout: workflows may run longer than typical unit tests
- vi.mock() does not work in integration tests because step dependencies are bundled by esbuild