using-workflows

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Working with Workflows

工作流使用指南

Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
创建并运行包含步骤、流处理和Agent执行的持久化工作流,涵盖工作流的启动、恢复以及结果持久化。

Working with Workflows

工作流使用指南

Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
See:

创建并运行包含步骤、流处理和Agent执行的持久化工作流,涵盖工作流的启动、恢复以及结果持久化。
参考:

Workflow Folder Structure

工作流文件夹结构

Each workflow has its own subfolder in
src/workflows/
:
src/workflows/
  steps/           # Shared step functions
    stream.ts      # UI message stream helpers
  chat/
    index.ts       # Workflow orchestration function ("use workflow")
    steps/         # Workflow-specific steps ("use step")
      history.ts
      logger.ts
      name-chat.ts
    types.ts       # Workflow-specific types
  • workflows/steps/
    - Shared step functions reusable across workflows (e.g., stream helpers).
  • index.ts
    - Contains the main workflow function with the
    "use workflow"
    directive. Orchestrates the workflow by calling step functions.
  • steps/
    - Contains individual step functions with the
    "use step"
    directive. Each step is a durable checkpoint.
  • types.ts
    - Type definitions for the workflow's UI messages.

每个工作流在
src/workflows/
目录下都有独立的子文件夹:
src/workflows/
  steps/           # Shared step functions
    stream.ts      # UI message stream helpers
  chat/
    index.ts       # Workflow orchestration function ("use workflow")
    steps/         # Workflow-specific steps ("use step")
      history.ts
      logger.ts
      name-chat.ts
    types.ts       # Workflow-specific types
  • workflows/steps/
    - 可在多个工作流中复用的共享步骤函数(例如流处理工具)。
  • index.ts
    - 包含带有
    "use workflow"
    指令的主工作流函数,通过调用步骤函数来编排工作流。
  • steps/
    - 包含带有
    "use step"
    指令的独立步骤函数,每个步骤都是一个持久化检查点。
  • types.ts
    - 工作流专属的UI消息类型定义。

Creating a Workflow

创建工作流

Define workflows with the
"use workflow"
directive:
typescript
// src/workflows/chat/index.ts
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  // Persist user message
  await persistUserMessage({ chatId, message: userMessage });

  // Create assistant placeholder with runId for resumption
  const messageId = await createAssistantMessage({
    chatId,
    runId: workflowRunId,
  });

  // Get message history
  const history = await getMessageHistory(chatId);

  // Start the UI message stream
  await startStream(messageId);

  // Run agent with streaming
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  // Persist and finalize
  await persistMessageParts({ chatId, messageId, parts });

  // Finish the UI message stream
  await finishStream();

  await removeRunId(messageId);
}
使用
"use workflow"
指令定义工作流:
typescript
// src/workflows/chat/index.ts
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  // Persist user message
  await persistUserMessage({ chatId, message: userMessage });

  // Create assistant placeholder with runId for resumption
  const messageId = await createAssistantMessage({
    chatId,
    runId: workflowRunId,
  });

  // Get message history
  const history = await getMessageHistory(chatId);

  // Start the UI message stream
  await startStream(messageId);

  // Run agent with streaming
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  // Persist and finalize
  await persistMessageParts({ chatId, messageId, parts });

  // Finish the UI message stream
  await finishStream();

  await removeRunId(messageId);
}

Starting a Workflow

启动工作流

Use the
start
function from
workflow/api
:
typescript
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";

const run = await start(chatWorkflow, [{ chatId, userMessage }]);

// run.runId - unique identifier for this run
// run.readable - stream of UI message chunks
使用
workflow/api
中的
start
函数:
typescript
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";

const run = await start(chatWorkflow, [{ chatId, userMessage }]);

// run.runId - unique identifier for this run
// run.readable - stream of UI message chunks

Resuming a Workflow Stream

恢复工作流流

Use
getRun
to reconnect to an in-progress or completed workflow:
typescript
import { getRun } from "workflow/api";

const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });
使用
getRun
重新连接到正在进行或已完成的工作流:
typescript
import { getRun } from "workflow/api";

const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });

Using Steps

使用步骤

Steps are durable checkpoints that persist their results:
typescript
async function getMessageHistory(chatId: string) {
  "use step";

  const dbMessages = await getChatMessages(chatId);
  return convertDbMessagesToUIMessages(dbMessages);
}

步骤是持久化检查点,会保存执行结果:
typescript
async function getMessageHistory(chatId: string) {
  "use step";

  const dbMessages = await getChatMessages(chatId);
  return convertDbMessagesToUIMessages(dbMessages);
}

Streaming UIMessageChunks

流式传输UIMessageChunks

When streaming
UIMessageChunk
responses to clients (e.g., chat messages), you must signal the start and end of the stream. This is required for proper stream framing with
WorkflowChatTransport
.
Always call
startStream()
before
agent.run()
and
finishStream()
after:
typescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";

export async function chatWorkflow({ chatId, messageId }) {
  "use workflow";

  const history = await getMessageHistory(chatId);

  // Signal stream start with the message ID
  await startStream(messageId);

  // Run agent - streams UIMessageChunks to the client
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });

  // Signal stream end and close the writable
  await finishStream();
}
The stream step functions write
UIMessageChunk
messages:
  • startStream(messageId)
    - Writes
    { type: "start", messageId }
    to signal a new message
  • finishStream()
    - Writes
    { type: "finish", finishReason: "stop" }
    and closes the stream
Without these signals, the client's
WorkflowChatTransport
cannot properly parse the streamed response.

向客户端流式传输
UIMessageChunk
响应(例如聊天消息)时,必须标记流的开始和结束。这是
WorkflowChatTransport
正确解析流响应的必要条件。
务必在
agent.run()
之前调用
startStream()
,之后调用
finishStream()
typescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";

export async function chatWorkflow({ chatId, messageId }) {
  "use workflow";

  const history = await getMessageHistory(chatId);

  // Signal stream start with the message ID
  await startStream(messageId);

  // Run agent - streams UIMessageChunks to the client
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });

  // Signal stream end and close the writable
  await finishStream();
}
流步骤函数会写入
UIMessageChunk
消息:
  • startStream(messageId)
    - 写入
    { type: "start", messageId }
    以标记新消息开始
  • finishStream()
    - 写入
    { type: "finish", finishReason: "stop" }
    并关闭流
如果没有这些标记,客户端的
WorkflowChatTransport
将无法正确解析流式响应。

Getting Workflow Metadata

获取工作流元数据

Access the current run's metadata:
typescript
import { getWorkflowMetadata } from "workflow";

export async function chatWorkflow({ chatId }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  // Store runId for resumption
  await createAssistantMessage({ chatId, runId: workflowRunId });
}
访问当前运行实例的元数据:
typescript
import { getWorkflowMetadata } from "workflow";

export async function chatWorkflow({ chatId }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  // Store runId for resumption
  await createAssistantMessage({ chatId, runId: workflowRunId });
}

Workflow-Safe Logging

工作流安全日志

The workflow runtime doesn't support Node.js modules. Wrap logger calls in steps:
typescript
// src/workflows/chat/steps/logger.ts
import { logger } from "@/lib/logging/logger";

export async function log(
  level: "info" | "warn" | "error" | "debug",
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";

  if (data) {
    logger[level](data, message);
  } else {
    logger[level](message);
  }
}
工作流运行时不支持Node.js模块,需将日志调用包装在步骤中:
typescript
// src/workflows/chat/steps/logger.ts
import { logger } from "@/lib/logging/logger";

export async function log(
  level: "info" | "warn" | "error" | "debug",
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";

  if (data) {
    logger[level](data, message);
  } else {
    logger[level](message);
  }
}

Running Agents in Workflows

在工作流中运行Agent

Use the custom
Agent
class for full streaming control:
typescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  const messageId = await createAssistantMessage({ chatId, runId });
  const history = await getMessageHistory(chatId);

  await startStream(messageId);

  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });
  await finishStream();
}
使用自定义
Agent
类实现完整的流控制:
typescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  const messageId = await createAssistantMessage({ chatId, runId });
  const history = await getMessageHistory(chatId);

  await startStream(messageId);

  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });
  await finishStream();
}

Persisting Workflow Results

持久化工作流结果

Save agent output using step functions. The
assertChatAgentParts
function validates that generic
UIMessage["parts"]
(returned by agents) match your application's specific tool and data types:
typescript
// src/workflows/chat/steps/history.ts
import type { UIMessage } from "ai";
import { insertMessageParts } from "@/lib/chat/queries";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";

export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  assertChatAgentParts(parts);

  await insertMessageParts(chatId, messageId, parts);

  // Update chat timestamp
  await db
    .update(chats)
    .set({ updatedAt: new Date() })
    .where(eq(chats.id, chatId));
}

使用步骤函数保存Agent输出。
assertChatAgentParts
函数用于验证Agent返回的通用
UIMessage["parts"]
是否与应用的特定工具和数据类型匹配:
typescript
// src/workflows/chat/steps/history.ts
import type { UIMessage } from "ai";
import { insertMessageParts } from "@/lib/chat/queries";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";

export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  assertChatAgentParts(parts);

  await insertMessageParts(chatId, messageId, parts);

  // Update chat timestamp
  await db
    .update(chats)
    .set({ updatedAt: new Date() })
    .where(eq(chats.id, chatId));
}

References

参考资料