cloudflare-workflows

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Cloudflare Workflows

Cloudflare Workflows

Status: Production Ready ✅ Last Updated: 2025-10-22 Dependencies: cloudflare-worker-base (for Worker setup) Latest Versions: wrangler@4.44.0, @cloudflare/workers-types@4.20251014.0

状态:已就绪可用于生产环境 ✅ 最后更新:2025-10-22 依赖项:cloudflare-worker-base(用于Worker配置) 最新版本:wrangler@4.44.0, @cloudflare/workers-types@4.20251014.0

Quick Start (10 Minutes)

快速入门(10分钟)

1. Create a Workflow

1. 创建工作流

Use the Cloudflare Workflows starter template:
bash
npm create cloudflare@latest my-workflow -- --template cloudflare/workflows-starter --git --deploy false
cd my-workflow
What you get:
  • WorkflowEntrypoint class template
  • Worker to trigger workflows
  • Complete wrangler.jsonc configuration
使用Cloudflare Workflows入门模板:
bash
npm create cloudflare@latest my-workflow -- --template cloudflare/workflows-starter --git --deploy false
cd my-workflow
你将获得:
  • WorkflowEntrypoint类模板
  • 用于触发工作流的Worker
  • 完整的wrangler.jsonc配置

2. Understand the Basic Structure

2. 理解基础结构

src/index.ts:
typescript
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

type Env = {
  MY_WORKFLOW: Workflow;
};

type Params = {
  userId: string;
  email: string;
};

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Access params from event.payload
    const { userId, email } = event.payload;

    // Step 1: Do some work
    const result = await step.do('process user', async () => {
      return { processed: true, userId };
    });

    // Step 2: Wait before next action
    await step.sleep('wait 1 hour', '1 hour');

    // Step 3: Continue workflow
    await step.do('send email', async () => {
      // Send email logic
      return { sent: true, email };
    });

    // Optional: return final state
    return { completed: true, userId };
  }
}

// Worker to trigger workflow
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    // Create new workflow instance
    const instance = await env.MY_WORKFLOW.create({
      params: { userId: '123', email: 'user@example.com' }
    });

    return Response.json({
      id: instance.id,
      status: await instance.status()
    });
  }
};
src/index.ts:
typescript
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

type Env = {
  MY_WORKFLOW: Workflow;
};

type Params = {
  userId: string;
  email: string;
};

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // 从event.payload中获取参数
    const { userId, email } = event.payload;

    // 步骤1:执行任务
    const result = await step.do('process user', async () => {
      return { processed: true, userId };
    });

    // 步骤2:等待后执行下一个操作
    await step.sleep('wait 1 hour', '1 hour');

    // 步骤3:继续工作流
    await step.do('send email', async () => {
      // 发送邮件逻辑
      return { sent: true, email };
    });

    // 可选:返回最终状态
    return { completed: true, userId };
  }
}

// 用于触发工作流的Worker
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    // 创建新的工作流实例
    const instance = await env.MY_WORKFLOW.create({
      params: { userId: '123', email: 'user@example.com' }
    });

    return Response.json({
      id: instance.id,
      status: await instance.status()
    });
  }
};

3. Configure Wrangler

3. 配置Wrangler

wrangler.jsonc:
jsonc
{
  "name": "my-workflow",
  "main": "src/index.ts",
  "compatibility_date": "2025-10-22",
  "workflows": [
    {
      "name": "my-workflow",
      "binding": "MY_WORKFLOW",
      "class_name": "MyWorkflow"
    }
  ]
}
wrangler.jsonc:
jsonc
{
  "name": "my-workflow",
  "main": "src/index.ts",
  "compatibility_date": "2025-10-22",
  "workflows": [
    {
      "name": "my-workflow",
      "binding": "MY_WORKFLOW",
      "class_name": "MyWorkflow"
    }
  ]
}

4. Deploy and Test

4. 部署与测试

bash
undefined
bash
undefined

Deploy workflow

部署工作流

npm run deploy
npm run deploy

Trigger workflow (visit in browser or curl)

触发工作流(在浏览器访问或使用curl)

curl https://my-workflow.<subdomain>.workers.dev/
curl https://my-workflow.<subdomain>.workers.dev/

View workflow instances

查看工作流实例

npx wrangler workflows instances list my-workflow
npx wrangler workflows instances list my-workflow

Check instance status

检查实例状态

npx wrangler workflows instances describe my-workflow <instance-id>

---
npx wrangler workflows instances describe my-workflow <instance-id>

---

WorkflowEntrypoint Class

WorkflowEntrypoint类

Extend WorkflowEntrypoint

继承WorkflowEntrypoint

Every Workflow must extend
WorkflowEntrypoint
and implement a
run()
method:
typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Workflow steps here
  }
}
Type Parameters:
  • Env
    - Environment bindings (KV, D1, R2, etc.)
  • Params
    - Type of workflow parameters passed via
    event.payload
每个工作流必须继承
WorkflowEntrypoint
并实现
run()
方法:
typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // 工作流步骤写在这里
  }
}
类型参数:
  • Env
    - 环境绑定(KV、D1、R2等)
  • Params
    - 通过
    event.payload
    传递的工作流参数类型

run() Method

run()方法

typescript
async run(
  event: WorkflowEvent<Params>,
  step: WorkflowStep
): Promise<T | void>
Parameters:
  • event
    - Contains workflow metadata and payload
  • step
    - Provides step methods (do, sleep, sleepUntil, waitForEvent)
Returns:
  • Optional return value (must be serializable)
  • Return value available via instance.status()
Example:
typescript
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
  async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
    const { orderId, customerId } = event.payload;

    // Access bindings via this.env
    const order = await this.env.DB.prepare(
      'SELECT * FROM orders WHERE id = ?'
    ).bind(orderId).first();

    const result = await step.do('process payment', async () => {
      // Payment processing
      return { paid: true, amount: order.total };
    });

    // Return final state
    return {
      orderId,
      status: 'completed',
      paidAmount: result.amount
    };
  }
}

typescript
async run(
  event: WorkflowEvent<Params>,
  step: WorkflowStep
): Promise<T | void>
参数:
  • event
    - 包含工作流元数据和负载
  • step
    - 提供步骤方法(do、sleep、sleepUntil、waitForEvent)
返回值:
  • 可选返回值(必须可序列化)
  • 返回值可通过instance.status()获取
示例:
typescript
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
  async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
    const { orderId, customerId } = event.payload;

    // 通过this.env访问绑定资源
    const order = await this.env.DB.prepare(
      'SELECT * FROM orders WHERE id = ?'
    ).bind(orderId).first();

    const result = await step.do('process payment', async () => {
      // 支付处理逻辑
      return { paid: true, amount: order.total };
    });

    // 返回最终状态
    return {
      orderId,
      status: 'completed',
      paidAmount: result.amount
    };
  }
}

Step Methods

步骤方法

step.do() - Execute Work

step.do() - 执行任务

typescript
step.do<T>(
  name: string,
  config?: WorkflowStepConfig,
  callback: () => Promise<T>
): Promise<T>
OR (config is optional):
typescript
step.do<T>(
  name: string,
  callback: () => Promise<T>
): Promise<T>
Parameters:
  • name
    - Step name (for observability)
  • config
    (optional) - Retry configuration
  • callback
    - Async function that does the work
Returns:
  • The value returned from callback (must be serializable)
Example:
typescript
// Simple step
const files = await step.do('fetch files', async () => {
  const response = await fetch('https://api.example.com/files');
  return await response.json();
});

// Step with retry config
const result = await step.do(
  'call payment API',
  {
    retries: {
      limit: 10,
      delay: '10 seconds',
      backoff: 'exponential'
    },
    timeout: '5 minutes'
  },
  async () => {
    const response = await fetch('https://payment-api.example.com/charge', {
      method: 'POST',
      body: JSON.stringify({ amount: 100 })
    });
    return await response.json();
  }
);
CRITICAL - Serialization:
  • Return value must be JSON serializable
  • ✅ Allowed: string, number, boolean, Array, Object, null
  • ❌ Forbidden: Function, Symbol, circular references, undefined
  • Step will throw error if return value isn't serializable

typescript
step.do<T>(
  name: string,
  config?: WorkflowStepConfig,
  callback: () => Promise<T>
): Promise<T>
(config为可选参数):
typescript
step.do<T>(
  name: string,
  callback: () => Promise<T>
): Promise<T>
参数:
  • name
    - 步骤名称(用于可观测性)
  • config
    (可选)- 重试配置
  • callback
    - 执行任务的异步函数
返回值:
  • callback返回的值(必须可序列化)
示例:
typescript
// 简单步骤
const files = await step.do('fetch files', async () => {
  const response = await fetch('https://api.example.com/files');
  return await response.json();
});

// 带重试配置的步骤
const result = await step.do(
  'call payment API',
  {
    retries: {
      limit: 10,
      delay: '10 seconds',
      backoff: 'exponential'
    },
    timeout: '5 minutes'
  },
  async () => {
    const response = await fetch('https://payment-api.example.com/charge', {
      method: 'POST',
      body: JSON.stringify({ amount: 100 })
    });
    return await response.json();
  }
);
关键注意事项 - 序列化:
  • 返回值必须可JSON序列化
  • ✅ 允许类型:字符串、数字、布尔值、数组、对象、null
  • ❌ 禁止类型:函数、Symbol、循环引用、undefined
  • 如果返回值不可序列化,步骤会抛出错误

step.sleep() - Relative Sleep

step.sleep() - 相对时间等待

typescript
step.sleep(name: string, duration: WorkflowDuration): Promise<void>
Parameters:
  • name
    - Step name
  • duration
    - Number (milliseconds) or human-readable string
Accepted units:
  • "second"
    /
    "seconds"
  • "minute"
    /
    "minutes"
  • "hour"
    /
    "hours"
  • "day"
    /
    "days"
  • "week"
    /
    "weeks"
  • "month"
    /
    "months"
  • "year"
    /
    "years"
Examples:
typescript
// Sleep for 5 minutes
await step.sleep('wait 5 minutes', '5 minutes');

// Sleep for 1 hour
await step.sleep('hourly delay', '1 hour');

// Sleep for 2 days
await step.sleep('wait 2 days', '2 days');

// Sleep using milliseconds
await step.sleep('wait 30 seconds', 30000);

// Common pattern: schedule daily task
await step.do('send daily report', async () => {
  // Send report
});
await step.sleep('wait until tomorrow', '1 day');
// Workflow continues next day
Priority:
  • Workflows resuming from sleep take priority over new instances
  • Ensures older workflows complete before new ones start

typescript
step.sleep(name: string, duration: WorkflowDuration): Promise<void>
参数:
  • name
    - 步骤名称
  • duration
    - 数字(毫秒)或人类可读的字符串
支持的单位:
  • "second"
    /
    "seconds"
  • "minute"
    /
    "minutes"
  • "hour"
    /
    "hours"
  • "day"
    /
    "days"
  • "week"
    /
    "weeks"
  • "month"
    /
    "months"
  • "year"
    /
    "years"
示例:
typescript
// 等待5分钟
await step.sleep('wait 5 minutes', '5 minutes');

// 等待1小时
await step.sleep('hourly delay', '1 hour');

// 等待2天
await step.sleep('wait 2 days', '2 days');

// 使用毫秒等待30秒
await step.sleep('wait 30 seconds', 30000);

// 常见模式:调度每日任务
await step.do('send daily report', async () => {
  // 发送报告
});
await step.sleep('wait until tomorrow', '1 day');
// 工作流次日继续执行
优先级:
  • 从睡眠中恢复的工作流优先于新实例
  • 确保旧工作流在新实例启动前完成

step.sleepUntil() - Sleep to Specific Date

step.sleepUntil() - 等待至指定日期

typescript
step.sleepUntil(
  name: string,
  timestamp: Date | number
): Promise<void>
Parameters:
  • name
    - Step name
  • timestamp
    - Date object or UNIX timestamp (milliseconds)
Examples:
typescript
// Sleep until specific date
const launchDate = new Date('2025-12-25T00:00:00Z');
await step.sleepUntil('wait for launch', launchDate);

// Sleep until UNIX timestamp
const timestamp = Date.parse('24 Oct 2024 13:00:00 UTC');
await step.sleepUntil('wait until time', timestamp);

// Sleep until next Monday 9am UTC
const nextMonday = new Date();
nextMonday.setDate(nextMonday.getDate() + ((1 + 7 - nextMonday.getDay()) % 7 || 7));
nextMonday.setUTCHours(9, 0, 0, 0);
await step.sleepUntil('wait until Monday 9am', nextMonday);

// Schedule work at specific time
await step.do('prepare campaign', async () => {
  // Prepare marketing campaign
});

const campaignLaunch = new Date('2025-11-01T12:00:00Z');
await step.sleepUntil('wait for campaign launch', campaignLaunch);

await step.do('launch campaign', async () => {
  // Launch campaign
});

typescript
step.sleepUntil(
  name: string,
  timestamp: Date | number
): Promise<void>
参数:
  • name
    - 步骤名称
  • timestamp
    - Date对象或UNIX时间戳(毫秒)
示例:
typescript
// 等待至指定日期
const launchDate = new Date('2025-12-25T00:00:00Z');
await step.sleepUntil('wait for launch', launchDate);

// 等待至UNIX时间戳
const timestamp = Date.parse('24 Oct 2024 13:00:00 UTC');
await step.sleepUntil('wait until time', timestamp);

// 等待至下周一UTC时间9点
const nextMonday = new Date();
nextMonday.setDate(nextMonday.getDate() + ((1 + 7 - nextMonday.getDay()) % 7 || 7));
nextMonday.setUTCHours(9, 0, 0, 0);
await step.sleepUntil('wait until Monday 9am', nextMonday);

// 在指定时间调度任务
await step.do('prepare campaign', async () => {
  // 准备营销活动
});

const campaignLaunch = new Date('2025-11-01T12:00:00Z');
await step.sleepUntil('wait for campaign launch', campaignLaunch);

await step.do('launch campaign', async () => {
  // 启动营销活动
});

step.waitForEvent() - Wait for External Event

step.waitForEvent() - 等待外部事件

typescript
step.waitForEvent<T>(
  name: string,
  options: { type: string; timeout?: string | number }
): Promise<T>
Parameters:
  • name
    - Step name
  • options.type
    - Event type to match
  • options.timeout
    (optional) - Max wait time (default: 24 hours)
Returns:
  • The event payload sent via
    instance.sendEvent()
Example:
typescript
export class PaymentWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Create payment intent
    await step.do('create payment intent', async () => {
      // Call Stripe API
    });

    // Wait for webhook from Stripe (max 1 hour)
    const webhookData = await step.waitForEvent<StripeWebhook>(
      'wait for payment confirmation',
      { type: 'stripe-webhook', timeout: '1 hour' }
    );

    // Continue based on webhook
    if (webhookData.status === 'succeeded') {
      await step.do('fulfill order', async () => {
        // Fulfill order
      });
    } else {
      await step.do('handle failed payment', async () => {
        // Handle failure
      });
    }
  }
}

// Worker receives webhook and sends event to workflow
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    if (req.url.includes('/webhook/stripe')) {
      const webhookData = await req.json();

      // Get workflow instance by ID (stored when created)
      const instance = await env.PAYMENT_WORKFLOW.get(instanceId);

      // Send event to waiting workflow
      await instance.sendEvent({
        type: 'stripe-webhook',
        payload: webhookData
      });

      return new Response('OK');
    }
  }
};
Timeout behavior:
  • If timeout expires, throws error and workflow can retry or fail
  • Wrap in try-catch if timeout should not fail workflow
typescript
try {
  const event = await step.waitForEvent('wait for user input', {
    type: 'user-submitted',
    timeout: '10 minutes'
  });
} catch (error) {
  // Timeout occurred - handle gracefully
  await step.do('send reminder', async () => {
    // Send reminder to user
  });
}

typescript
step.waitForEvent<T>(
  name: string,
  options: { type: string; timeout?: string | number }
): Promise<T>
参数:
  • name
    - 步骤名称
  • options.type
    - 要匹配的事件类型
  • options.timeout
    (可选)- 最长等待时间(默认:24小时)
返回值:
  • 通过
    instance.sendEvent()
    发送的事件负载
示例:
typescript
export class PaymentWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // 创建支付意向
    await step.do('create payment intent', async () => {
      // 调用Stripe API
    });

    // 等待Stripe的Webhook(最长1小时)
    const webhookData = await step.waitForEvent<StripeWebhook>(
      'wait for payment confirmation',
      { type: 'stripe-webhook', timeout: '1 hour' }
    );

    // 根据Webhook内容继续执行
    if (webhookData.status === 'succeeded') {
      await step.do('fulfill order', async () => {
        // 履行订单
      });
    } else {
      await step.do('handle failed payment', async () => {
        // 处理支付失败
      });
    }
  }
}

// Worker接收Webhook并向工作流发送事件
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    if (req.url.includes('/webhook/stripe')) {
      const webhookData = await req.json();

      // 通过ID获取工作流实例(创建时已存储)
      const instance = await env.PAYMENT_WORKFLOW.get(instanceId);

      // 向等待中的工作流发送事件
      await instance.sendEvent({
        type: 'stripe-webhook',
        payload: webhookData
      });

      return new Response('OK');
    }
  }
};
超时行为:
  • 如果超时,会抛出错误,工作流可重试或失败
  • 如果超时不应导致工作流失败,可包裹在try-catch中
typescript
try {
  const event = await step.waitForEvent('wait for user input', {
    type: 'user-submitted',
    timeout: '10 minutes'
  });
} catch (error) {
  // 发生超时 - 优雅处理
  await step.do('send reminder', async () => {
    // 向用户发送提醒
  });
}

WorkflowStepConfig

WorkflowStepConfig

Configure retry behavior for individual steps:
typescript
interface WorkflowStepConfig {
  retries?: {
    limit: number;          // Max retry attempts (Infinity allowed)
    delay: string | number; // Delay between retries
    backoff?: 'constant' | 'linear' | 'exponential';
  };
  timeout?: string | number; // Max time per attempt
}
为单个步骤配置重试行为:
typescript
interface WorkflowStepConfig {
  retries?: {
    limit: number;          // 最大重试次数(允许设为Infinity)
    delay: string | number; // 重试间隔
    backoff?: 'constant' | 'linear' | 'exponential';
  };
  timeout?: string | number; // 单次尝试最长时间
}

Default Configuration

默认配置

If no config provided, Workflows uses:
typescript
{
  retries: {
    limit: 5,
    delay: 10000,      // 10 seconds
    backoff: 'exponential'
  },
  timeout: '10 minutes'
}
如果未提供配置,Workflows会使用以下默认值:
typescript
{
  retries: {
    limit: 5,
    delay: 10000,      // 10秒
    backoff: 'exponential'
  },
  timeout: '10 minutes'
}

Retry Examples

重试示例

Constant Backoff (same delay each time):
typescript
await step.do(
  'send email',
  {
    retries: {
      limit: 3,
      delay: '30 seconds',
      backoff: 'constant'  // Always wait 30 seconds
    }
  },
  async () => {
    // Send email
  }
);
Linear Backoff (increasing delay):
typescript
await step.do(
  'poll API',
  {
    retries: {
      limit: 5,
      delay: '1 minute',
      backoff: 'linear'  // 1m, 2m, 3m, 4m, 5m
    }
  },
  async () => {
    // Poll API
  }
);
Exponential Backoff (recommended for most cases):
typescript
await step.do(
  'call rate-limited API',
  {
    retries: {
      limit: 10,
      delay: '10 seconds',
      backoff: 'exponential'  // 10s, 20s, 40s, 80s, 160s, ...
    },
    timeout: '5 minutes'
  },
  async () => {
    // API call
  }
);
Unlimited Retries:
typescript
await step.do(
  'critical operation',
  {
    retries: {
      limit: Infinity,  // Retry forever
      delay: '1 minute',
      backoff: 'exponential'
    }
  },
  async () => {
    // Operation that must succeed eventually
  }
);
No Retries:
typescript
await step.do(
  'non-idempotent operation',
  {
    retries: {
      limit: 0  // Fail immediately on error
    }
  },
  async () => {
    // One-time operation
  }
);

恒定间隔重试(每次延迟相同):
typescript
await step.do(
  'send email',
  {
    retries: {
      limit: 3,
      delay: '30 seconds',
      backoff: 'constant'  // 始终等待30秒
    }
  },
  async () => {
    // 发送邮件
  }
);
线性间隔重试(延迟逐渐增加):
typescript
await step.do(
  'poll API',
  {
    retries: {
      limit: 5,
      delay: '1 minute',
      backoff: 'linear'  // 1分钟、2分钟、3分钟、4分钟、5分钟
    }
  },
  async () => {
    // 轮询API
  }
);
指数间隔重试(大多数场景推荐):
typescript
await step.do(
  'call rate-limited API',
  {
    retries: {
      limit: 10,
      delay: '10 seconds',
      backoff: 'exponential'  // 10秒、20秒、40秒、80秒、160秒...
    },
    timeout: '5 minutes'
  },
  async () => {
    // API调用
  }
);
无限重试:
typescript
await step.do(
  'critical operation',
  {
    retries: {
      limit: Infinity,  // 一直重试
      delay: '1 minute',
      backoff: 'exponential'
    }
  },
  async () => {
    // 必须最终成功的操作
  }
);
不重试:
typescript
await step.do(
  'non-idempotent operation',
  {
    retries: {
      limit: 0  // 出错时立即失败
    }
  },
  async () => {
    // 一次性操作
  }
);

Error Handling

错误处理

NonRetryableError

NonRetryableError

Force workflow to fail immediately without retrying:
typescript
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    await step.do('validate input', async () => {
      if (!event.payload.userId) {
        throw new NonRetryableError('userId is required');
      }

      // Validate user exists
      const user = await this.env.DB.prepare(
        'SELECT * FROM users WHERE id = ?'
      ).bind(event.payload.userId).first();

      if (!user) {
        // Terminal error - retrying won't help
        throw new NonRetryableError('User not found');
      }

      return user;
    });
  }
}
When to use NonRetryableError:
  • ✅ Authentication/authorization failures
  • ✅ Invalid input that won't change
  • ✅ Resource doesn't exist (404)
  • ✅ Validation errors
  • ❌ Network failures (should retry)
  • ❌ Rate limits (should retry with backoff)
  • ❌ Temporary service outages (should retry)

强制工作流立即失败,不进行重试:
typescript
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    await step.do('validate input', async () => {
      if (!event.payload.userId) {
        throw new NonRetryableError('userId is required');
      }

      // 验证用户是否存在
      const user = await this.env.DB.prepare(
        'SELECT * FROM users WHERE id = ?'
      ).bind(event.payload.userId).first();

      if (!user) {
        // 终止错误 - 重试无意义
        throw new NonRetryableError('User not found');
      }

      return user;
    });
  }
}
何时使用NonRetryableError:
  • ✅ 认证/授权失败
  • ✅ 不会变更的无效输入
  • ✅ 资源不存在(404)
  • ✅ 验证错误
  • ❌ 网络故障(应重试)
  • ❌ 速率限制(应带间隔重试)
  • ❌ 临时服务中断(应重试)

Catch Errors to Continue Workflow

捕获错误以继续工作流

Prevent entire workflow from failing by catching step errors:
typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Critical step - workflow fails if this fails
    await step.do('process payment', async () => {
      // Payment processing
    });

    // Optional step - workflow continues even if it fails
    try {
      await step.do('send confirmation email', async () => {
        // Email sending
      });
    } catch (error) {
      console.log(`Email failed: ${error.message}`);

      // Do cleanup or alternative action
      await step.do('log email failure', async () => {
        await this.env.DB.prepare(
          'INSERT INTO failed_emails (user_id, error) VALUES (?, ?)'
        ).bind(event.payload.userId, error.message).run();
      });
    }

    // Workflow continues
    await step.do('update order status', async () => {
      // Update status
    });
  }
}
Pattern: Graceful degradation:
typescript
// Try primary service, fall back to secondary
let result;

try {
  result = await step.do('call primary API', async () => {
    return await callPrimaryAPI();
  });
} catch (error) {
  console.log('Primary API failed, trying backup');

  result = await step.do('call backup API', async () => {
    return await callBackupAPI();
  });
}

通过捕获步骤错误,避免整个工作流失败:
typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // 关键步骤 - 此步骤失败则工作流失败
    await step.do('process payment', async () => {
      // 支付处理
    });

    // 可选步骤 - 即使失败,工作流仍继续执行
    try {
      await step.do('send confirmation email', async () => {
        // 发送邮件
      });
    } catch (error) {
      console.log(`Email failed: ${error.message}`);

      // 执行清理或替代操作
      await step.do('log email failure', async () => {
        await this.env.DB.prepare(
          'INSERT INTO failed_emails (user_id, error) VALUES (?, ?)'
        ).bind(event.payload.userId, error.message).run();
      });
    }

    // 工作流继续执行
    await step.do('update order status', async () => {
      // 更新状态
    });
  }
}
模式:优雅降级:
typescript
// 尝试主服务,失败则回退到备用服务
let result;

try {
  result = await step.do('call primary API', async () => {
    return await callPrimaryAPI();
  });
} catch (error) {
  console.log('Primary API failed, trying backup');

  result = await step.do('call backup API', async () => {
    return await callBackupAPI();
  });
}

Triggering Workflows

触发工作流

From Workers

从Worker触发

Configure binding in wrangler.jsonc:
jsonc
{
  "name": "trigger-worker",
  "main": "src/index.ts",
  "compatibility_date": "2025-10-22",
  "workflows": [
    {
      "name": "my-workflow",
      "binding": "MY_WORKFLOW",
      "class_name": "MyWorkflow",
      "script_name": "workflow-worker"  // If workflow is in different Worker
    }
  ]
}
Trigger from Worker:
typescript
type Env = {
  MY_WORKFLOW: Workflow;
};

export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    // Create new workflow instance
    const instance = await env.MY_WORKFLOW.create({
      params: {
        userId: '123',
        email: 'user@example.com'
      }
    });

    // Return instance ID
    return Response.json({
      id: instance.id,
      status: await instance.status()
    });
  }
};
在wrangler.jsonc中配置绑定:
jsonc
{
  "name": "trigger-worker",
  "main": "src/index.ts",
  "compatibility_date": "2025-10-22",
  "workflows": [
    {
      "name": "my-workflow",
      "binding": "MY_WORKFLOW",
      "class_name": "MyWorkflow",
      "script_name": "workflow-worker"  // 如果工作流在其他Worker中
    }
  ]
}
从Worker触发:
typescript
type Env = {
  MY_WORKFLOW: Workflow;
};

export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    // 创建新的工作流实例
    const instance = await env.MY_WORKFLOW.create({
      params: {
        userId: '123',
        email: 'user@example.com'
      }
    });

    // 返回实例ID
    return Response.json({
      id: instance.id,
      status: await instance.status()
    });
  }
};

Get Instance Status

获取实例状态

typescript
// Get instance by ID
const instance = await env.MY_WORKFLOW.get(instanceId);

// Get status
const status = await instance.status();

console.log(status);
// {
//   status: 'running' | 'complete' | 'errored' | 'queued' | 'unknown',
//   error: string | null,
//   output: any  // Return value from run() if complete
// }
typescript
// 通过ID获取实例
const instance = await env.MY_WORKFLOW.get(instanceId);

// 获取状态
const status = await instance.status();

console.log(status);
// {
//   status: 'running' | 'complete' | 'errored' | 'queued' | 'unknown',
//   error: string | null,
//   output: any  // 如果已完成,为run()的返回值
// }

Send Events to Running Instance

向运行中的实例发送事件

typescript
// Get instance
const instance = await env.MY_WORKFLOW.get(instanceId);

// Send event (will be received by step.waitForEvent)
await instance.sendEvent({
  type: 'user-action',
  payload: { action: 'approved' }
});
typescript
// 获取实例
const instance = await env.MY_WORKFLOW.get(instanceId);

// 发送事件(将被step.waitForEvent接收)
await instance.sendEvent({
  type: 'user-action',
  payload: { action: 'approved' }
});

Pause and Resume

暂停与恢复

typescript
// Pause instance
await instance.pause();

// Resume instance
await instance.resume();

// Terminate instance
await instance.terminate();

typescript
// 暂停实例
await instance.pause();

// 恢复实例
await instance.resume();

// 终止实例
await instance.terminate();

Workflow Patterns

工作流模式

Pattern 1: Long-Running Process

模式1:长期运行的流程

typescript
export class VideoProcessingWorkflow extends WorkflowEntrypoint<Env, VideoParams> {
  async run(event: WorkflowEvent<VideoParams>, step: WorkflowStep) {
    const { videoId } = event.payload;

    // Step 1: Upload to processing service
    const uploadResult = await step.do('upload video', async () => {
      const video = await this.env.MY_BUCKET.get(`videos/${videoId}`);
      const response = await fetch('https://processor.example.com/upload', {
        method: 'POST',
        body: video?.body
      });
      return await response.json();
    });

    // Step 2: Wait for processing (could take hours)
    await step.sleep('wait for initial processing', '10 minutes');

    // Step 3: Poll for completion
    let processed = false;
    let attempts = 0;

    while (!processed && attempts < 20) {
      const status = await step.do(`check status attempt ${attempts}`, async () => {
        const response = await fetch(
          `https://processor.example.com/status/${uploadResult.jobId}`
        );
        return await response.json();
      });

      if (status.complete) {
        processed = true;
      } else {
        attempts++;
        await step.sleep(`wait before retry ${attempts}`, '5 minutes');
      }
    }

    // Step 4: Download processed video
    await step.do('download processed video', async () => {
      const response = await fetch(uploadResult.downloadUrl);
      const processed = await response.blob();
      await this.env.MY_BUCKET.put(`processed/${videoId}`, processed);
    });

    return { videoId, status: 'complete' };
  }
}

typescript
export class VideoProcessingWorkflow extends WorkflowEntrypoint<Env, VideoParams> {
  async run(event: WorkflowEvent<VideoParams>, step: WorkflowStep) {
    const { videoId } = event.payload;

    // 步骤1:上传到处理服务
    const uploadResult = await step.do('upload video', async () => {
      const video = await this.env.MY_BUCKET.get(`videos/${videoId}`);
      const response = await fetch('https://processor.example.com/upload', {
        method: 'POST',
        body: video?.body
      });
      return await response.json();
    });

    // 步骤2:等待初始处理(可能需要数小时)
    await step.sleep('wait for initial processing', '10 minutes');

    // 步骤3:轮询处理完成状态
    let processed = false;
    let attempts = 0;

    while (!processed && attempts < 20) {
      const status = await step.do(`check status attempt ${attempts}`, async () => {
        const response = await fetch(
          `https://processor.example.com/status/${uploadResult.jobId}`
        );
        return await response.json();
      });

      if (status.complete) {
        processed = true;
      } else {
        attempts++;
        await step.sleep(`wait before retry ${attempts}`, '5 minutes');
      }
    }

    // 步骤4:下载处理后的视频
    await step.do('download processed video', async () => {
      const response = await fetch(uploadResult.downloadUrl);
      const processed = await response.blob();
      await this.env.MY_BUCKET.put(`processed/${videoId}`, processed);
    });

    return { videoId, status: 'complete' };
  }
}

Pattern 2: Event-Driven Approval Flow

模式2:事件驱动的审批流程

typescript
export class ApprovalWorkflow extends WorkflowEntrypoint<Env, ApprovalParams> {
  async run(event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) {
    const { requestId, requesterId } = event.payload;

    // Step 1: Create approval request
    await step.do('create approval request', async () => {
      await this.env.DB.prepare(
        'INSERT INTO approvals (id, requester_id, status) VALUES (?, ?, ?)'
      ).bind(requestId, requesterId, 'pending').run();
    });

    // Step 2: Send notification to approvers
    await step.do('notify approvers', async () => {
      await sendNotification(requestId);
    });

    // Step 3: Wait for approval (max 7 days)
    let approvalEvent;

    try {
      approvalEvent = await step.waitForEvent<ApprovalEvent>(
        'wait for approval decision',
        { type: 'approval-decision', timeout: '7 days' }
      );
    } catch (error) {
      // Timeout - auto-reject
      await step.do('auto-reject due to timeout', async () => {
        await this.env.DB.prepare(
          'UPDATE approvals SET status = ? WHERE id = ?'
        ).bind('rejected', requestId).run();
      });

      return { requestId, status: 'rejected', reason: 'timeout' };
    }

    // Step 4: Process decision
    await step.do('process approval decision', async () => {
      await this.env.DB.prepare(
        'UPDATE approvals SET status = ?, approver_id = ? WHERE id = ?'
      ).bind(approvalEvent.approved ? 'approved' : 'rejected', approvalEvent.approverId, requestId).run();
    });

    // Step 5: Execute approved action (if approved)
    if (approvalEvent.approved) {
      await step.do('execute approved action', async () => {
        // Execute the action
      });
    }

    return { requestId, status: approvalEvent.approved ? 'approved' : 'rejected' };
  }
}

typescript
export class ApprovalWorkflow extends WorkflowEntrypoint<Env, ApprovalParams> {
  async run(event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) {
    const { requestId, requesterId } = event.payload;

    // 步骤1:创建审批请求
    await step.do('create approval request', async () => {
      await this.env.DB.prepare(
        'INSERT INTO approvals (id, requester_id, status) VALUES (?, ?, ?)'
      ).bind(requestId, requesterId, 'pending').run();
    });

    // 步骤2:向审批人发送通知
    await step.do('notify approvers', async () => {
      await sendNotification(requestId);
    });

    // 步骤3:等待审批(最长7天)
    let approvalEvent;

    try {
      approvalEvent = await step.waitForEvent<ApprovalEvent>(
        'wait for approval decision',
        { type: 'approval-decision', timeout: '7 days' }
      );
    } catch (error) {
      // 超时 - 自动拒绝
      await step.do('auto-reject due to timeout', async () => {
        await this.env.DB.prepare(
          'UPDATE approvals SET status = ? WHERE id = ?'
        ).bind('rejected', requestId).run();
      });

      return { requestId, status: 'rejected', reason: 'timeout' };
    }

    // 步骤4:处理审批结果
    await step.do('process approval decision', async () => {
      await this.env.DB.prepare(
        'UPDATE approvals SET status = ?, approver_id = ? WHERE id = ?'
      ).bind(approvalEvent.approved ? 'approved' : 'rejected', approvalEvent.approverId, requestId).run();
    });

    // 步骤5:如果审批通过,执行对应操作
    if (approvalEvent.approved) {
      await step.do('execute approved action', async () => {
        // 执行操作
      });
    }

    return { requestId, status: approvalEvent.approved ? 'approved' : 'rejected' };
  }
}

Pattern 3: Scheduled Workflow

模式3:定时工作流

typescript
export class DailyReportWorkflow extends WorkflowEntrypoint<Env, ReportParams> {
  async run(event: WorkflowEvent<ReportParams>, step: WorkflowStep) {
    // Calculate next 9am UTC
    const now = new Date();
    const tomorrow9am = new Date();
    tomorrow9am.setUTCDate(tomorrow9am.getUTCDate() + 1);
    tomorrow9am.setUTCHours(9, 0, 0, 0);

    // Sleep until tomorrow 9am
    await step.sleepUntil('wait until 9am tomorrow', tomorrow9am);

    // Generate report
    const report = await step.do('generate daily report', async () => {
      const results = await this.env.DB.prepare(
        'SELECT * FROM metrics WHERE date = ?'
      ).bind(now.toISOString().split('T')[0]).all();

      return {
        date: now.toISOString().split('T')[0],
        metrics: results.results
      };
    });

    // Send report
    await step.do('send report', async () => {
      await sendEmail({
        to: event.payload.recipients,
        subject: `Daily Report - ${report.date}`,
        body: formatReport(report.metrics)
      });
    });

    return { sent: true, date: report.date };
  }
}

typescript
export class DailyReportWorkflow extends WorkflowEntrypoint<Env, ReportParams> {
  async run(event: WorkflowEvent<ReportParams>, step: WorkflowStep) {
    // 计算下一个UTC时间9点
    const now = new Date();
    const tomorrow9am = new Date();
    tomorrow9am.setUTCDate(tomorrow9am.getUTCDate() + 1);
    tomorrow9am.setUTCHours(9, 0, 0, 0);

    // 等待到次日9点
    await step.sleepUntil('wait until 9am tomorrow', tomorrow9am);

    // 生成报告
    const report = await step.do('generate daily report', async () => {
      const results = await this.env.DB.prepare(
        'SELECT * FROM metrics WHERE date = ?'
      ).bind(now.toISOString().split('T')[0]).all();

      return {
        date: now.toISOString().split('T')[0],
        metrics: results.results
      };
    });

    // 发送报告
    await step.do('send report', async () => {
      await sendEmail({
        to: event.payload.recipients,
        subject: `Daily Report - ${report.date}`,
        body: formatReport(report.metrics)
      });
    });

    return { sent: true, date: report.date };
  }
}

Pattern 4: Workflow Chaining

模式4:工作流链式调用

typescript
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
  async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
    const { orderId } = event.payload;

    // Step 1: Process payment
    const paymentResult = await step.do('process payment', async () => {
      return await processPayment(orderId);
    });

    // Step 2: Trigger fulfillment workflow
    const fulfillmentInstance = await step.do('start fulfillment', async () => {
      return await this.env.FULFILLMENT_WORKFLOW.create({
        params: {
          orderId,
          paymentId: paymentResult.id
        }
      });
    });

    // Step 3: Wait for fulfillment to complete
    await step.sleep('wait for fulfillment', '5 minutes');

    // Step 4: Check fulfillment status
    const fulfillmentStatus = await step.do('check fulfillment', async () => {
      const instance = await this.env.FULFILLMENT_WORKFLOW.get(fulfillmentInstance.id);
      return await instance.status();
    });

    if (fulfillmentStatus.status === 'complete') {
      // Step 5: Send confirmation
      await step.do('send order confirmation', async () => {
        await sendConfirmation(orderId);
      });
    }

    return { orderId, status: 'complete' };
  }
}

typescript
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
  async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
    const { orderId } = event.payload;

    // 步骤1:处理支付
    const paymentResult = await step.do('process payment', async () => {
      return await processPayment(orderId);
    });

    // 步骤2:触发履行工作流
    const fulfillmentInstance = await step.do('start fulfillment', async () => {
      return await this.env.FULFILLMENT_WORKFLOW.create({
        params: {
          orderId,
          paymentId: paymentResult.id
        }
      });
    });

    // 步骤3:等待履行完成
    await step.sleep('wait for fulfillment', '5 minutes');

    // 步骤4:检查履行状态
    const fulfillmentStatus = await step.do('check fulfillment', async () => {
      const instance = await this.env.FULFILLMENT_WORKFLOW.get(fulfillmentInstance.id);
      return await instance.status();
    });

    if (fulfillmentStatus.status === 'complete') {
      // 步骤5:发送确认邮件
      await step.do('send order confirmation', async () => {
        await sendConfirmation(orderId);
      });
    }

    return { orderId, status: 'complete' };
  }
}

Wrangler Commands

Wrangler命令

List Workflow Instances

列出工作流实例

bash
undefined
bash
undefined

List all instances of a workflow

列出某个工作流的所有实例

npx wrangler workflows instances list my-workflow
npx wrangler workflows instances list my-workflow

Filter by status

按状态过滤

npx wrangler workflows instances list my-workflow --status running npx wrangler workflows instances list my-workflow --status complete npx wrangler workflows instances list my-workflow --status errored
undefined
npx wrangler workflows instances list my-workflow --status running npx wrangler workflows instances list my-workflow --status complete npx wrangler workflows instances list my-workflow --status errored
undefined

Describe Instance

查看实例详情

bash
undefined
bash
undefined

Get detailed info about specific instance

获取特定实例的详细信息

npx wrangler workflows instances describe my-workflow <instance-id>
npx wrangler workflows instances describe my-workflow <instance-id>

Output shows:

输出内容包括:

- Current status (running/complete/errored)

- 当前状态(running/complete/errored)

- Each step with start/end times

每个步骤的开始/结束时间

- Step outputs

- 步骤输出

- Retry history

- 重试历史

- Any errors

- 错误信息

- Sleep state (if sleeping)

- 睡眠状态(如果正在睡眠)

undefined
undefined

Trigger Workflow (Development)

触发工作流(开发环境)

bash
undefined
bash
undefined

Deploy workflow

部署工作流

npx wrangler deploy
npx wrangler deploy

Trigger via HTTP (if Worker is set up to trigger)

通过HTTP触发(如果Worker已配置触发逻辑)

curl https://my-workflow.<subdomain>.workers.dev/

---
curl https://my-workflow.<subdomain>.workers.dev/

---

State Persistence

状态持久化

What Can Be Persisted

可持久化的内容

Workflows automatically persist state returned from
step.do()
:
✅ Serializable Types:
  • Primitives:
    string
    ,
    number
    ,
    boolean
    ,
    null
  • Arrays:
    [1, 2, 3]
    ,
    ['a', 'b', 'c']
  • Objects:
    { key: 'value' }
    ,
    { nested: { data: true } }
  • Nested structures:
    { users: [{ id: 1, name: 'Alice' }] }
❌ Non-Serializable Types:
  • Functions:
    () => {}
  • Symbols:
    Symbol('key')
  • Circular references:
    const obj = {}; obj.self = obj;
  • undefined (use null instead)
  • Class instances (serialize to plain objects)
Example - Correct Serialization:
typescript
// ✅ Good - all values serializable
const result = await step.do('fetch data', async () => {
  return {
    users: [
      { id: 1, name: 'Alice', active: true },
      { id: 2, name: 'Bob', active: false }
    ],
    timestamp: Date.now(),
    metadata: null
  };
});

// ❌ Bad - contains function
const bad = await step.do('bad example', async () => {
  return {
    data: [1, 2, 3],
    transform: (x) => x * 2  // ❌ Function not serializable
  };
});
// This will throw an error!
Workflows会自动持久化
step.do()
返回的状态:
✅ 可序列化类型:
  • 基本类型:
    string
    number
    boolean
    null
  • 数组:
    [1, 2, 3]
    ['a', 'b', 'c']
  • 对象:
    { key: 'value' }
    { nested: { data: true } }
  • 嵌套结构:
    { users: [{ id: 1, name: 'Alice' }] }
❌ 不可序列化类型:
  • 函数:
    () => {}
  • Symbol:
    Symbol('key')
  • 循环引用:
    const obj = {}; obj.self = obj;
  • undefined(请使用null替代)
  • 类实例(需序列化为普通对象)
示例 - 正确的序列化:
typescript
// ✅ 正确 - 所有值均可序列化
const result = await step.do('fetch data', async () => {
  return {
    users: [
      { id: 1, name: 'Alice', active: true },
      { id: 2, name: 'Bob', active: false }
    ],
    timestamp: Date.now(),
    metadata: null
  };
});

// ❌ 错误 - 包含函数
const bad = await step.do('bad example', async () => {
  return {
    data: [1, 2, 3],
    transform: (x) => x * 2  // ❌ 函数不可序列化
  };
});
// 此代码会抛出错误!

Access State Across Steps

跨步骤访问状态

typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Step 1: Get data
    const userData = await step.do('fetch user', async () => {
      return { id: 123, email: 'user@example.com' };
    });

    // Step 2: Use data from step 1
    const orderData = await step.do('create order', async () => {
      return {
        userId: userData.id,      // ✅ Access previous step's data
        userEmail: userData.email,
        orderId: 'ORD-456'
      };
    });

    // Step 3: Use data from step 1 and 2
    await step.do('send confirmation', async () => {
      await sendEmail({
        to: userData.email,       // ✅ Still accessible
        subject: `Order ${orderData.orderId} confirmed`
      });
    });
  }
}

typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // 步骤1:获取数据
    const userData = await step.do('fetch user', async () => {
      return { id: 123, email: 'user@example.com' };
    });

    // 步骤2:使用步骤1的数据
    const orderData = await step.do('create order', async () => {
      return {
        userId: userData.id,      // ✅ 访问前一步的数据
        userEmail: userData.email,
        orderId: 'ORD-456'
      };
    });

    // 步骤3:使用步骤1和步骤2的数据
    await step.do('send confirmation', async () => {
      await sendEmail({
        to: userData.email,       // ✅ 仍可访问
        subject: `Order ${orderData.orderId} confirmed`
      });
    });
  }
}

Observability

可观测性

Built-in Metrics

内置指标

Workflows automatically track:
  • Instance status: queued, running, complete, errored, paused
  • Step execution: start/end times, duration, success/failure
  • Retry history: attempts, errors, delays
  • Sleep state: when workflow will wake up
  • Output: return values from steps and run()
Workflows会自动跟踪以下指标:
  • 实例状态:排队中、运行中、已完成、已出错、已暂停
  • 步骤执行:开始/结束时间、持续时长、成功/失败
  • 重试历史:尝试次数、错误信息、延迟时间
  • 睡眠状态:工作流唤醒时间
  • 输出:步骤和run()方法的返回值

View Metrics in Dashboard

在控制台查看指标

Access via Cloudflare dashboard:
  1. Workers & Pages
  2. Select your workflow
  3. View instances and metrics
Metrics include:
  • Total instances created
  • Success/error rates
  • Average execution time
  • Step-level performance
通过Cloudflare控制台访问:
  1. 进入Workers & Pages
  2. 选择你的工作流
  3. 查看实例和指标
指标包括:
  • 创建的实例总数
  • 成功率/错误率
  • 平均执行时间
  • 步骤级性能

Programmatic Access

程序化访问

typescript
// Get instance status
const instance = await env.MY_WORKFLOW.get(instanceId);
const status = await instance.status();

console.log(status);
// {
//   status: 'complete',
//   error: null,
//   output: { userId: '123', status: 'processed' }
// }

typescript
// 获取实例状态
const instance = await env.MY_WORKFLOW.get(instanceId);
const status = await instance.status();

console.log(status);
// {
//   status: 'complete',
//   error: null,
//   output: { userId: '123', status: 'processed' }
// }

Limits

限制

FeatureLimit
Max workflow duration30 days
Max steps per workflow10,000
Max sleep/sleepUntil duration30 days
Max step timeout15 minutes
Max concurrent instancesUnlimited (autoscales)
Max payload size128 KB
Max step output size128 KB
Max waitForEvent timeout30 days
Max retry limitInfinity (configurable)
Notes:
  • step.sleep()
    and
    step.sleepUntil()
    do NOT count toward 10,000 step limit
  • Workflows can run for up to 30 days total
  • Each step execution limited to 15 minutes max
  • Retries count as separate attempts, not separate steps

功能限制
工作流最长持续时间30天
每个工作流的最大步骤数10,000
sleep/sleepUntil最长持续时间30天
步骤最长超时时间15分钟
最大并发实例数无限制(自动扩缩容)
最大负载大小128 KB
步骤最大输出大小128 KB
waitForEvent最长超时时间30天
最大重试次数Infinity(可配置)
注意:
  • step.sleep()
    step.sleepUntil()
    不计入10,000步的限制
  • 工作流总运行时长最长为30天
  • 每个步骤的单次执行最长为15分钟
  • 重试会被计为单独的尝试,而非单独的步骤

Pricing

定价

Requires Workers Paid plan ($5/month)
Workflow Executions:
  • First 10,000,000 step executions/month: FREE
  • After that: $0.30 per million step executions
What counts as a step execution:
  • Each
    step.do()
    call
  • Each retry of a step
  • step.sleep()
    ,
    step.sleepUntil()
    ,
    step.waitForEvent()
    do NOT count
Cost examples:
  • Workflow with 5 steps, no retries: 5 step executions
  • Workflow with 3 steps, 1 step retries 2 times: 5 step executions (3 + 2)
  • 10M simple workflows/month (5 steps each): ((50M - 10M) / 1M) × $0.30 = $12/month

需要Workers付费计划(每月5美元)
工作流执行:
  • 每月前10,000,000次步骤执行:免费
  • 超出部分:每百万次步骤执行0.30美元
什么会被计为步骤执行:
  • 每次
    step.do()
    调用
  • 步骤的每次重试
  • step.sleep()
    step.sleepUntil()
    step.waitForEvent()
    不计入
成本示例:
  • 包含5个步骤、无重试的工作流:5次步骤执行
  • 包含3个步骤、其中1个步骤重试2次的工作流:5次步骤执行(3+2)
  • 每月1000万个简单工作流(每个5步):((50,000,000 - 10,000,000) / 1,000,000) × 0.30 = 每月12美元

Always Do ✅

最佳实践 ✅

  1. Use descriptive step names - "fetch user data", not "step 1"
  2. Return serializable values only - primitives, arrays, plain objects
  3. Use NonRetryableError for terminal errors - auth failures, invalid input
  4. Configure retry limits - avoid infinite retries unless necessary
  5. Catch errors for optional steps - use try-catch if step can fail gracefully
  6. Use exponential backoff for retries - default backoff for most cases
  7. Validate inputs early - fail fast with NonRetryableError if invalid
  8. Store workflow instance IDs - save to DB/KV to query status later
  9. Use waitForEvent for human-in-loop - approvals, external confirmations
  10. Monitor workflow metrics - track success rates and errors

  1. 使用描述性的步骤名称 - 比如“获取用户数据”,而非“步骤1”
  2. 仅返回可序列化的值 - 基本类型、数组、普通对象
  3. 对终止错误使用NonRetryableError - 认证失败、无效输入等
  4. 配置重试限制 - 除非必要,避免无限重试
  5. 为可选步骤添加错误捕获 - 使用try-catch让步骤可优雅失败
  6. 重试使用指数间隔 - 大多数场景的默认推荐
  7. 尽早验证输入 - 如果输入无效,使用NonRetryableError快速失败
  8. 存储工作流实例ID - 保存到数据库/KV中,以便后续查询状态
  9. 对人工参与的流程使用waitForEvent - 审批、外部确认等场景
  10. 监控工作流指标 - 跟踪成功率和错误率

Never Do ❌

禁忌 ❌

  1. Never return functions from steps - will throw serialization error
  2. Never create circular references - will fail to serialize
  3. Never assume steps execute immediately - they may retry or sleep
  4. Never use blocking operations - use step.do() for async work
  5. Never exceed 128 KB payload/output - will fail
  6. Never retry non-idempotent operations infinitely - use retry limits
  7. Never ignore serialization errors - fix the data structure
  8. Never use workflows for real-time operations - use Durable Objects instead
  9. Never skip error handling for critical steps - wrap in try-catch or use NonRetryableError
  10. Never assume step order is guaranteed across retries - each step is independent

  1. 永远不要从步骤中返回函数 - 会抛出序列化错误
  2. 永远不要创建循环引用 - 会序列化失败
  3. 永远不要假设步骤会立即执行 - 步骤可能会重试或进入睡眠
  4. 永远不要使用阻塞操作 - 异步任务请使用step.do()
  5. 永远不要超出128 KB的负载/输出限制 - 会导致失败
  6. 永远不要对非幂等操作无限重试 - 请设置重试限制
  7. 永远不要忽略序列化错误 - 修复数据结构
  8. 永远不要将工作流用于实时操作 - 请使用Durable Objects
  9. 永远不要忽略关键步骤的错误处理 - 使用try-catch或NonRetryableError
  10. 永远不要假设重试后步骤顺序不变 - 每个步骤都是独立的

Troubleshooting

故障排查

Issue: "Cannot perform I/O on behalf of a different request"

问题:"Cannot perform I/O on behalf of a different request"

Cause: Trying to use I/O objects created in one request context from another request handler
Solution: Always perform I/O within
step.do()
callbacks
typescript
// ❌ Bad - I/O outside step
const response = await fetch('https://api.example.com/data');
const data = await response.json();

await step.do('use data', async () => {
  // Using data from outside step's I/O context
  return data;  // This will fail!
});

// ✅ Good - I/O inside step
const data = await step.do('fetch data', async () => {
  const response = await fetch('https://api.example.com/data');
  return await response.json();  // ✅ Correct
});

原因: 尝试在一个请求上下文中创建的I/O对象,在另一个请求处理器中使用
解决方案: 始终在
step.do()
的回调中执行I/O操作
typescript
// ❌ 错误 - I/O操作在步骤外
const response = await fetch('https://api.example.com/data');
const data = await response.json();

await step.do('use data', async () => {
  // 使用步骤外I/O操作获取的数据
  return data;  // 此代码会失败!
});

// ✅ 正确 - I/O操作在步骤内
const data = await step.do('fetch data', async () => {
  const response = await fetch('https://api.example.com/data');
  return await response.json();  // ✅ 正确
});

Issue: NonRetryableError behaves differently in dev vs production

问题:NonRetryableError在开发环境和生产环境表现不同

Known Issue: Throwing NonRetryableError with empty message in dev mode causes retries, but works correctly in production
Workaround: Always provide a message to NonRetryableError
typescript
// ❌ May retry in dev
throw new NonRetryableError();

// ✅ Works consistently
throw new NonRetryableError('User not found');

已知问题: 在开发环境中,抛出不带消息的NonRetryableError会导致重试,但在生产环境中表现正常
解决方法: 始终为NonRetryableError提供错误消息
typescript
// ❌ 在开发环境中可能会重试
throw new NonRetryableError();

// ✅ 在所有环境中表现一致
throw new NonRetryableError('User not found');

Issue: "The requested module 'cloudflare:workers' does not provide an export named 'WorkflowEvent'"

问题:"The requested module 'cloudflare:workers' does not provide an export named 'WorkflowEvent'"

Cause: Incorrect import or outdated @cloudflare/workers-types
Solution:
bash
undefined
原因: 导入错误或@cloudflare/workers-types版本过时
解决方案:
bash
undefined

Update types

更新类型定义

npm install -D @cloudflare/workers-types@latest
npm install -D @cloudflare/workers-types@latest

Ensure correct import

确保导入正确

import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; import { NonRetryableError } from 'cloudflare:workflows';

---
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; import { NonRetryableError } from 'cloudflare:workflows';

---

Issue: Step returns undefined instead of expected value

问题:步骤返回undefined而非预期值

Cause: Step callback doesn't return a value
Solution: Always return from step callbacks
typescript
// ❌ Bad - no return
const result = await step.do('get data', async () => {
  const data = await fetchData();
  // Missing return!
});
console.log(result);  // undefined

// ✅ Good - explicit return
const result = await step.do('get data', async () => {
  const data = await fetchData();
  return data;  // ✅
});

原因: 步骤的回调函数没有返回值
解决方案: 确保步骤回调函数有返回值
typescript
// ❌ 错误 - 没有返回值
const result = await step.do('get data', async () => {
  const data = await fetchData();
  // 缺少return!
});
console.log(result);  // undefined

// ✅ 正确 - 显式返回
const result = await step.do('get data', async () => {
  const data = await fetchData();
  return data;  // ✅
});

Issue: Workflow instance stuck in "running" state

问题:工作流实例一直处于"running"状态

Possible causes:
  1. Step is sleeping for long duration
  2. Step is waiting for event that never arrives
  3. Step is retrying with long backoff
Solution:
bash
undefined
可能原因:
  1. 步骤正在长时间睡眠
  2. 步骤在等待永远不会到达的事件
  3. 步骤在使用长间隔重试
解决方案:
bash
undefined

Check instance details

查看实例详情

npx wrangler workflows instances describe my-workflow <instance-id>
npx wrangler workflows instances describe my-workflow <instance-id>

Look for:

重点查看:

- Sleep state (will show wake time)

- 睡眠状态(会显示唤醒时间)

- Waiting for event (will show event type and timeout)

- 等待的事件(会显示事件类型和超时时间)

- Retry history (will show attempts and delays)

- 重试历史(会显示尝试次数和延迟)


---

---

Production Checklist

生产环境检查清单

Before deploying workflows to production:
  • All steps have descriptive names
  • Retry limits configured for all steps
  • NonRetryableError used for terminal errors
  • Critical steps have error handling
  • Optional steps wrapped in try-catch
  • No non-serializable values returned
  • Payload sizes under 128 KB
  • Workflow duration under 30 days
  • Instance IDs stored for status queries
  • Monitoring and alerting configured
  • waitForEvent timeouts configured
  • Tested in development environment
  • Tested retry behavior
  • Tested error scenarios

在将工作流部署到生产环境前,请确认:
  • 所有步骤都有描述性名称
  • 所有步骤都配置了重试限制
  • 对终止错误使用了NonRetryableError
  • 关键步骤有错误处理
  • 可选步骤被包裹在try-catch中
  • 没有返回不可序列化的值
  • 负载大小不超过128 KB
  • 工作流持续时长不超过30天
  • 实例ID已存储,可用于查询状态
  • 已配置监控和告警
  • 已配置waitForEvent的超时时间
  • 已在开发环境中测试
  • 已测试重试行为
  • 已测试错误场景

Related Documentation

相关文档


Last Updated: 2025-10-22 Version: 1.0.0 Maintainer: Jeremy Dawes | jeremy@jezweb.net

最后更新:2025-10-22 版本:1.0.0 维护者:Jeremy Dawes | jeremy@jezweb.net