vercel-workflow
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseWorkflow Best Practices
Workflow最佳实践
Enforces proper usage of Vercel Workflow DevKit patterns for durable, resumable workflows.
确保遵循Vercel Workflow DevKit的模式,以实现可持久化、可恢复的工作流。
Core Concepts
核心概念
Directives
指令
Two fundamental directives define execution context:
"use workflow"typescript
export async function processOrder(orderId: string) {
'use workflow';
const order = await fetchOrder(orderId); // Step
await sleep('1h'); // Suspend
return await chargePayment(order); // Step
}"use step"typescript
async function fetchOrder(orderId: string) {
'use step';
// Full Node.js access: database, APIs, file I/O
return await db.orders.findUnique({ where: { id: orderId } });
}Critical Rules:
- Workflow functions must be deterministic - same inputs always produce same outputs
- Workflows run in a sandboxed environment without Node.js API access
- Steps have full runtime access and automatic retries on failure
- All parameters must be serializable (no functions, class instances, closures)
两个基础指令定义执行上下文:
"use workflow"typescript
export async function processOrder(orderId: string) {
'use workflow';
const order = await fetchOrder(orderId); // Step
await sleep('1h'); // Suspend
return await chargePayment(order); // Step
}"use step"typescript
async function fetchOrder(orderId: string) {
'use step';
// 完整Node.js访问权限:数据库、API、文件I/O
return await db.orders.findUnique({ where: { id: orderId } });
}关键规则:
- 工作流函数必须是确定性的 - 相同输入始终产生相同输出
- 工作流在沙箱环境中运行,无Node.js API访问权限
- 步骤拥有完整运行时访问权限,失败时会自动重试
- 所有参数必须是可序列化的(不能是函数、类实例、闭包)
Execution Model
执行模型
Workflows suspend and resume through:
- Step calls - Workflow yields while step executes
- - Pause for duration without consuming resources
sleep() - Hooks/Webhooks - Wait for external events
During replay, workflows re-execute using cached step results from the event log.
工作流通过以下方式暂停和恢复:
- 步骤调用 - 工作流在步骤执行时让出控制权
- - 暂停指定时长,不消耗资源
sleep() - 钩子/Webhook - 等待外部事件
重放期间,工作流会使用事件日志中缓存的步骤结果重新执行。
Structure Patterns
结构模式
Organization
组织方式
workflows/{feature-name}/
├── index.ts # Workflow orchestration
├── steps/ # Step functions
│ ├── {action}.ts
│ └── ...
└── hooks/ # Hook definitions
└── {event}.tsworkflows/{feature-name}/
├── index.ts # 工作流编排入口
├── steps/ # 步骤函数
│ ├── {action}.ts
│ └── ...
└── hooks/ # 钩子定义
└── {event}.tsWorkflow Function
工作流函数
typescript
import { sleep, createHook } from 'workflow';
import { processData } from './steps/process-data';
import { sendEmail } from './steps/send-email';
export async function myWorkflow(userId: string) {
'use workflow';
const result = await processData(userId);
await sleep('5m');
await sendEmail({ userId, result });
return { status: 'completed', result };
}Rules:
- ONE workflow export per file (main entry point)
- Orchestrate steps - don't do work directly
- Use language primitives: ,
Promise.all,for...oftry/catch - NO Node.js APIs: ,
fs,http,cryptoprocess - NO side effects: database calls, API requests, mutations
- Parameters and returns must be serializable
typescript
import { sleep, createHook } from 'workflow';
import { processData } from './steps/process-data';
import { sendEmail } from './steps/send-email';
export async function myWorkflow(userId: string) {
'use workflow';
const result = await processData(userId);
await sleep('5m');
await sendEmail({ userId, result });
return { status: 'completed', result };
}规则:
- 每个文件仅导出一个工作流(主入口点)
- 仅协调步骤,不直接执行业务逻辑
- 使用语言原语:、
Promise.all、for...oftry/catch - 禁止使用Node.js API:、
fs、http、cryptoprocess - 禁止产生副作用:数据库调用、API请求、数据变更
- 参数和返回值必须可序列化
Step Functions
步骤函数
typescript
type ProcessDataArgs = {
userId: string;
options?: { retry?: boolean };
};
export async function processData(params: ProcessDataArgs) {
'use step';
// Full Node.js access
const user = await db.users.findUnique({ where: { id: params.userId } });
const result = await externalApi.process(user);
return { processed: true, data: result };
}Rules:
- ONE step per file is preferred for clarity
- Use typed parameters (object or single value)
- Return serializable values only
- Mutations happen here - not in workflows
- Can throw errors for automatic retry
- Use for idempotency keys
getStepMetadata()
typescript
type ProcessDataArgs = {
userId: string;
options?: { retry?: boolean };
};
export async function processData(params: ProcessDataArgs) {
'use step';
// 完整Node.js访问权限
const user = await db.users.findUnique({ where: { id: params.userId } });
const result = await externalApi.process(user);
return { processed: true, data: result };
}规则:
- 为了清晰性,建议每个文件仅包含一个步骤
- 使用类型化参数(对象或单一值)
- 仅返回可序列化的值
- 数据变更应在此处执行,而非工作流中
- 可抛出错误以触发自动重试
- 使用获取幂等键
getStepMetadata()
Error Handling
错误处理
Automatic Retries
自动重试
Steps retry automatically (default: 3 attempts):
typescript
async function fetchData(url: string) {
'use step';
// Throws Error - will retry
const response = await fetch(url);
if (!response.ok) throw new Error('Fetch failed');
return response.json();
}步骤会自动重试(默认:3次尝试):
typescript
async function fetchData(url: string) {
'use step';
// 抛出错误 - 会触发重试
const response = await fetch(url);
if (!response.ok) throw new Error('Fetch failed');
return response.json();
}Fatal Errors (No Retry)
致命错误(不重试)
typescript
import { FatalError } from 'workflow';
async function validateUser(userId: string) {
'use step';
if (!userId) {
// Don't retry invalid input
throw new FatalError('User ID is required');
}
return await db.users.findUnique({ where: { id: userId } });
}typescript
import { FatalError } from 'workflow';
async function validateUser(userId: string) {
'use step';
if (!userId) {
// 无效输入,不重试
throw new FatalError('User ID is required');
}
return await db.users.findUnique({ where: { id: userId } });
}Retryable with Delay
带延迟的可重试错误
typescript
import { RetryableError } from 'workflow';
async function callRateLimitedApi() {
'use step';
const response = await fetch('https://api.example.com');
if (response.status === 429) {
// Retry after 10 seconds
throw new RetryableError('Rate limited', { delay: '10s' });
}
return response.json();
}typescript
import { RetryableError } from 'workflow';
async function callRateLimitedApi() {
'use step';
const response = await fetch('https://api.example.com');
if (response.status === 429) {
// 10秒后重试
throw new RetryableError('Rate limited', { delay: '10s' });
}
return response.json();
}Workflow Error Handling
工作流级错误处理
typescript
export async function resilientWorkflow(orderId: string) {
'use workflow';
try {
const order = await fetchOrder(orderId);
await processPayment(order);
} catch (error) {
// Log and handle at workflow level
await logError({ orderId, error: String(error) });
throw error; // Workflow will fail
}
}typescript
export async function resilientWorkflow(orderId: string) {
'use workflow';
try {
const order = await fetchOrder(orderId);
await processPayment(order);
} catch (error) {
// 在工作流层面记录并处理
await logError({ orderId, error: String(error) });
throw error; // 工作流将失败
}
}Serialization
序列化
Allowed Types
允许的类型
Primitives, objects, arrays, Date, URL, Headers, Request, Response, ReadableStream, WritableStream.
基本类型、对象、数组、Date、URL、Headers、Request、Response、ReadableStream、WritableStream。
Pass-by-Value
按值传递
Parameters are copied, not referenced:
typescript
// ❌ WRONG - mutations not visible
export async function badWorkflow() {
'use workflow';
let counter = 0;
await updateCounter(counter);
console.log(counter); // Still 0!
}
async function updateCounter(count: number) {
'use step';
count++; // Only mutates the copy
}typescript
// ✅ CORRECT - return modified values
export async function goodWorkflow() {
'use workflow';
let counter = 0;
counter = await updateCounter(counter);
console.log(counter); // 1
}
async function updateCounter(count: number) {
'use step';
return count + 1;
}参数是复制的,而非引用传递:
typescript
// ❌ 错误 - 变更不可见
export async function badWorkflow() {
'use workflow';
let counter = 0;
await updateCounter(counter);
console.log(counter); // 仍然是0!
}
async function updateCounter(count: number) {
'use step';
count++; // 仅修改副本
}typescript
// ✅ 正确 - 返回修改后的值
export async function goodWorkflow() {
'use workflow';
let counter = 0;
counter = await updateCounter(counter);
console.log(counter); // 1
}
async function updateCounter(count: number) {
'use step';
return count + 1;
}Forbidden Types
禁止的类型
NO functions, class instances, symbols, WeakMaps, closures:
typescript
// ❌ WRONG
async function badStep(callback: () => void) {
'use step';
callback(); // ERROR: Cannot serialize functions
}
// ✅ CORRECT - use configuration
type Config = { shouldLog: boolean };
async function goodStep(config: Config) {
'use step';
if (config.shouldLog) console.log('Done');
}不能是函数、类实例、符号、WeakMap、闭包:
typescript
// ❌ 错误
async function badStep(callback: () => void) {
'use step';
callback(); // 错误:无法序列化函数
}
// ✅ 正确 - 使用配置参数
type Config = { shouldLog: boolean };
async function goodStep(config: Config) {
'use step';
if (config.shouldLog) console.log('Done');
}Hooks & Webhooks
钩子 & Webhook
Type-Safe Hooks
类型安全的钩子
typescript
import { defineHook } from 'workflow';
import { z } from 'zod';
const approvalHook = defineHook({
schema: z.object({
approved: z.boolean(),
approvedBy: z.string(),
comment: z.string(),
}),
});
export async function documentWorkflow(docId: string) {
'use workflow';
const hook = approvalHook.create({
token: `approval:${docId}`,
});
const result = await hook;
return result.approved ? 'approved' : 'rejected';
}typescript
import { defineHook } from 'workflow';
import { z } from 'zod';
const approvalHook = defineHook({
schema: z.object({
approved: z.boolean(),
approvedBy: z.string(),
comment: z.string(),
}),
});
export async function documentWorkflow(docId: string) {
'use workflow';
const hook = approvalHook.create({
token: `approval:${docId}`,
});
const result = await hook;
return result.approved ? 'approved' : 'rejected';
}Iterating Over Events
遍历事件
typescript
import { createHook } from 'workflow';
export async function monitoringWorkflow(channelId: string) {
'use workflow';
const hook = createHook<{ message: string }>({
token: `messages:${channelId}`,
});
for await (const event of hook) {
await processMessage(event.message);
if (event.message === 'stop') break;
}
}typescript
import { createHook } from 'workflow';
export async function monitoringWorkflow(channelId: string) {
'use workflow';
const hook = createHook<{ message: string }>({
token: `messages:${channelId}`,
});
for await (const event of hook) {
await processMessage(event.message);
if (event.message === 'stop') break;
}
}Webhooks
Webhook
typescript
import { createWebhook } from 'workflow';
export async function paymentWorkflow(orderId: string) {
'use workflow';
const webhook = createWebhook({
respondWith: new Response('Payment received', { status: 200 }),
});
await sendPaymentLink({ orderId, webhookUrl: webhook.url });
const request = await webhook;
const payload = await request.json();
return { paid: true, transactionId: payload.id };
}typescript
import { createWebhook } from 'workflow';
export async function paymentWorkflow(orderId: string) {
'use workflow';
const webhook = createWebhook({
respondWith: new Response('Payment received', { status: 200 }),
});
await sendPaymentLink({ orderId, webhookUrl: webhook.url });
const request = await webhook;
const payload = await request.json();
return { paid: true, transactionId: payload.id };
}Streaming
流式传输
Writing to Streams (Steps Only)
写入流(仅步骤中允许)
typescript
import { getWritable } from 'workflow';
export async function progressWorkflow() {
'use workflow';
const writable = getWritable<{ progress: number }>();
await processWithProgress(writable);
await finalizeStream(writable);
}
async function processWithProgress(writable: WritableStream) {
'use step';
const writer = writable.getWriter();
try {
for (let i = 0; i <= 100; i += 10) {
await writer.write({ progress: i });
await new Promise(resolve => setTimeout(resolve, 100));
}
} finally {
writer.releaseLock();
}
}
async function finalizeStream(writable: WritableStream) {
'use step';
await writable.close();
}Critical: Workflows can GET streams but NOT interact with them. Steps must do all writing/closing.
typescript
import { getWritable } from 'workflow';
export async function progressWorkflow() {
'use workflow';
const writable = getWritable<{ progress: number }>();
await processWithProgress(writable);
await finalizeStream(writable);
}
async function processWithProgress(writable: WritableStream) {
'use step';
const writer = writable.getWriter();
try {
for (let i = 0; i <= 100; i += 10) {
await writer.write({ progress: i });
await new Promise(resolve => setTimeout(resolve, 100));
}
} finally {
writer.releaseLock();
}
}
async function finalizeStream(writable: WritableStream) {
'use step';
await writable.close();
}关键注意事项:工作流可以获取流,但不能与流交互。所有写入/关闭操作必须在步骤中执行。
Namespaced Streams
命名空间流
typescript
export async function multiStreamWorkflow() {
'use workflow';
const defaultStream = getWritable();
const logStream = getWritable({ namespace: 'logs' });
await writeToStreams(defaultStream, logStream);
}
async function writeToStreams(
defaultStream: WritableStream,
logStream: WritableStream
) {
'use step';
const writer1 = defaultStream.getWriter();
const writer2 = logStream.getWriter();
try {
await writer1.write({ data: 'main' });
await writer2.write({ log: 'processing' });
} finally {
writer1.releaseLock();
writer2.releaseLock();
}
}typescript
export async function multiStreamWorkflow() {
'use workflow';
const defaultStream = getWritable();
const logStream = getWritable({ namespace: 'logs' });
await writeToStreams(defaultStream, logStream);
}
async function writeToStreams(
defaultStream: WritableStream,
logStream: WritableStream
) {
'use step';
const writer1 = defaultStream.getWriter();
const writer2 = logStream.getWriter();
try {
await writer1.write({ data: 'main' });
await writer2.write({ log: 'processing' });
} finally {
writer1.releaseLock();
writer2.releaseLock();
}
}Common Patterns
常见模式
Sequential Steps
顺序步骤
typescript
export async function sequentialWorkflow(data: unknown) {
'use workflow';
const validated = await validateData(data);
const processed = await processData(validated);
const stored = await storeData(processed);
return stored;
}typescript
export async function sequentialWorkflow(data: unknown) {
'use workflow';
const validated = await validateData(data);
const processed = await processData(validated);
const stored = await storeData(processed);
return stored;
}Parallel Steps
并行步骤
typescript
export async function parallelWorkflow(userId: string) {
'use workflow';
const [user, orders, payments] = await Promise.all([
fetchUser(userId),
fetchOrders(userId),
fetchPayments(userId),
]);
return { user, orders, payments };
}typescript
export async function parallelWorkflow(userId: string) {
'use workflow';
const [user, orders, payments] = await Promise.all([
fetchUser(userId),
fetchOrders(userId),
fetchPayments(userId),
]);
return { user, orders, payments };
}Conditional Steps
条件步骤
typescript
export async function conditionalWorkflow(orderId: string) {
'use workflow';
const order = await fetchOrder(orderId);
if (order.isPaid) {
await fulfillOrder(order);
} else {
await sendPaymentReminder(order);
}
}typescript
export async function conditionalWorkflow(orderId: string) {
'use workflow';
const order = await fetchOrder(orderId);
if (order.isPaid) {
await fulfillOrder(order);
} else {
await sendPaymentReminder(order);
}
}Loops with Steps
带步骤的循环
typescript
export async function batchWorkflow(items: string[]) {
'use workflow';
for (const item of items) {
await processItem(item);
}
return { processed: items.length };
}typescript
export async function batchWorkflow(items: string[]) {
'use workflow';
for (const item of items) {
await processItem(item);
}
return { processed: items.length };
}Timeout Pattern
超时模式
typescript
import { sleep } from 'workflow';
export async function timeoutWorkflow(taskId: string) {
'use workflow';
const result = await Promise.race([
processTask(taskId),
sleep('30s').then(() => 'timeout' as const),
]);
if (result === 'timeout') {
throw new Error('Task timed out after 30 seconds');
}
return result;
}typescript
import { sleep } from 'workflow';
export async function timeoutWorkflow(taskId: string) {
'use workflow';
const result = await Promise.race([
processTask(taskId),
sleep('30s').then(() => 'timeout' as const),
]);
if (result === 'timeout') {
throw new Error('Task timed out after 30 seconds');
}
return result;
}Rollback Pattern
回滚模式
typescript
export async function rollbackWorkflow(orderId: string) {
'use workflow';
const rollbacks: Array<() => Promise<void>> = [];
try {
await reserveInventory(orderId);
rollbacks.push(() => releaseInventory(orderId));
await chargePayment(orderId);
rollbacks.push(() => refundPayment(orderId));
await fulfillOrder(orderId);
} catch (error) {
// Execute rollbacks in reverse order
for (const rollback of rollbacks.reverse()) {
await rollback();
}
throw error;
}
}typescript
export async function rollbackWorkflow(orderId: string) {
'use workflow';
const rollbacks: Array<() => Promise<void>> = [];
try {
await reserveInventory(orderId);
rollbacks.push(() => releaseInventory(orderId));
await chargePayment(orderId);
rollbacks.push(() => refundPayment(orderId));
await fulfillOrder(orderId);
} catch (error) {
// 反向执行回滚操作
for (const rollback of rollbacks.reverse()) {
await rollback();
}
throw error;
}
}Idempotency
幂等性
Using Step IDs
使用步骤ID
typescript
import { getStepMetadata } from 'workflow';
async function chargeUser(userId: string, amount: number) {
'use step';
const { stepId } = getStepMetadata();
return await stripe.charges.create(
{ amount, currency: 'usd', customer: userId },
{ idempotencyKey: `charge:${stepId}` }
);
}Rules:
- Always use for external API idempotency
stepId - is stable across retries
stepId - Never use attempt numbers or timestamps
typescript
import { getStepMetadata } from 'workflow';
async function chargeUser(userId: string, amount: number) {
'use step';
const { stepId } = getStepMetadata();
return await stripe.charges.create(
{ amount, currency: 'usd', customer: userId },
{ idempotencyKey: `charge:${stepId}` }
);
}规则:
- 始终使用作为外部API的幂等键
stepId - 在重试过程中保持稳定
stepId - 绝不能使用尝试次数或时间戳
Testing Workflows
测试工作流
typescript
import { start } from 'workflow/api';
import { myWorkflow } from './workflows/my-workflow';
// Start workflow
const run = await start(myWorkflow, ['arg1']);
// Check status
console.log(await run.status); // 'running' | 'completed' | 'failed'
// Wait for completion
const result = await run.returnValue;
// Stream output
const stream = run.readable;typescript
import { start } from 'workflow/api';
import { myWorkflow } from './workflows/my-workflow';
// 启动工作流
const run = await start(myWorkflow, ['arg1']);
// 检查状态
console.log(await run.status); // 'running' | 'completed' | 'failed'
// 等待完成
const result = await run.returnValue;
// 流式输出
const stream = run.readable;Anti-Patterns
反模式
❌ Direct Node.js API in Workflow
❌ 在工作流中直接使用Node.js API
typescript
export async function badWorkflow() {
'use workflow';
// ERROR: fs not available in workflow context
const data = fs.readFileSync('file.txt');
}typescript
export async function badWorkflow() {
'use workflow';
// 错误:工作流上下文不支持fs
const data = fs.readFileSync('file.txt');
}❌ Non-Deterministic Logic
❌ 非确定性逻辑
typescript
export async function badWorkflow() {
'use workflow';
// ERROR: Date.now() will change on replay
if (Date.now() > someTimestamp) { /* ... */ }
// ERROR: Math.random() will change on replay
if (Math.random() > 0.5) { /* ... */ }
}typescript
export async function badWorkflow() {
'use workflow';
// 错误:Date.now()在重放时会变化
if (Date.now() > someTimestamp) { /* ... */ }
// 错误:Math.random()在重放时会变化
if (Math.random() > 0.5) { /* ... */ }
}❌ Mutating Parameters
❌ 修改参数
typescript
export async function badWorkflow(data: { count: number }) {
'use workflow';
await incrementCount(data);
console.log(data.count); // Still original value!
}
async function incrementCount(data: { count: number }) {
'use step';
data.count++; // Only mutates the copy
}typescript
export async function badWorkflow(data: { count: number }) {
'use workflow';
await incrementCount(data);
console.log(data.count); // 仍然是原始值!
}
async function incrementCount(data: { count: number }) {
'use step';
data.count++; // 仅修改副本
}❌ Stream Interaction in Workflow
❌ 在工作流中操作流
typescript
export async function badWorkflow() {
'use workflow';
const writable = getWritable();
const writer = writable.getWriter(); // ERROR!
await writer.write('data'); // ERROR!
}typescript
export async function badWorkflow() {
'use workflow';
const writable = getWritable();
const writer = writable.getWriter(); // 错误!
await writer.write('data'); // 错误!
}❌ Missing Directive
❌ 缺少指令
typescript
// ERROR: No "use step" - won't be retried
async function fetchData() {
return await db.query('SELECT * FROM users');
}typescript
// 错误:没有"use step" - 不会触发重试
async function fetchData() {
return await db.query('SELECT * FROM users');
}Quick Reference
快速参考
Workflow Functions:
- Orchestrate steps
- Must be deterministic
- No Node.js APIs
- Sandboxed environment
Step Functions:
- Execute work
- Full Node.js access
- Automatic retries
- Can throw errors
Serialization:
- Pass-by-value (copy)
- Return modified values
- No functions/closures
Error Handling:
- - Retry automatically
Error - - No retry
FatalError - - Retry with delay
RetryableError
Streaming:
- Get stream in workflow
- Interact in steps only
- Always release locks
- Close when done
Hooks:
- Use for type safety
defineHook - Custom tokens for determinism
- Iterate with
for await...of
Idempotency:
- Use for keys
stepId - Apply to external APIs
- Steps are already idempotent
工作流函数:
- 协调步骤
- 必须是确定性的
- 无Node.js API访问权限
- 沙箱环境
步骤函数:
- 执行业务逻辑
- 拥有完整Node.js访问权限
- 自动重试
- 可抛出错误
序列化:
- 按值传递(复制)
- 返回修改后的值
- 禁止函数/闭包
错误处理:
- - 自动重试
Error - - 不重试
FatalError - - 带延迟重试
RetryableError
流式传输:
- 在工作流中获取流
- 仅在步骤中操作流
- 始终释放锁
- 使用完毕后关闭流
钩子:
- 使用保证类型安全
defineHook - 使用自定义令牌确保确定性
- 使用遍历事件
for await...of
幂等性:
- 使用作为幂等键
stepId - 应用于外部API
- 步骤本身已具备幂等性