taskflow

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

TaskFlow

TaskFlow

Use TaskFlow when a job needs to outlive one prompt or one detached run, but you still want one owner session, one return context, and one place to inspect or resume the work.
当作业需要在单次提示或单次独立运行之外持续存在,但你仍希望拥有单个所有者会话、单个返回上下文以及单个用于检查或恢复工作的位置时,请使用TaskFlow。

When to use it

适用场景

  • Multi-step background work with one owner
  • Work that waits on detached ACP or subagent tasks
  • Jobs that may need to emit one clear update back to the owner
  • Jobs that need small persisted state between steps
  • Plugin or tool work that must survive restarts and revision conflicts cleanly
  • 属于同一所有者的多步骤后台工作
  • 需要等待独立ACP或subagent任务的工作
  • 可能需要向所有者发送清晰更新的作业
  • 步骤间需要少量持久化状态的作业
  • 必须能在重启和版本冲突时正常运行的插件或工具工作

What TaskFlow owns

TaskFlow的职责

  • flow identity
  • owner session and requester origin
  • currentStep
    ,
    stateJson
    , and
    waitJson
  • linked child tasks and their parent flow id
  • finish, fail, cancel, waiting, and blocked state
  • revision tracking for conflict-safe mutations
It does not own branching or business logic. Put that in Lobster, acpx, or the calling code.
  • 流标识
  • 所有者会话和请求来源
  • currentStep
    stateJson
    waitJson
  • 关联的子任务及其父流ID
  • 完成、失败、取消、等待和阻塞状态
  • 版本跟踪,确保变更无冲突
负责分支或业务逻辑。请将这些逻辑放在Lobster、acpx或调用代码中。

Current runtime shape

当前运行时形态

Canonical plugin/runtime entrypoint:
  • api.runtime.tasks.flow
  • api.runtime.taskFlow
    still exists as an alias, but
    api.runtime.tasks.flow
    is the canonical shape
Binding:
  • api.runtime.tasks.flow.fromToolContext(ctx)
    when you already have trusted tool context with
    sessionKey
  • api.runtime.tasks.flow.bindSession({ sessionKey, requesterOrigin })
    when your binding layer already resolved the session and delivery context
Managed-flow lifecycle:
  1. createManaged(...)
  2. runTask(...)
  3. setWaiting(...)
    when waiting on a person or an external system
  4. resume(...)
    when work can continue
  5. finish(...)
    or
    fail(...)
  6. requestCancel(...)
    or
    cancel(...)
    when the whole job should stop
标准插件/运行时入口:
  • api.runtime.tasks.flow
  • api.runtime.taskFlow
    仍作为别名存在,但
    api.runtime.tasks.flow
    是标准形态
绑定方式:
  • 当你已拥有包含
    sessionKey
    的可信工具上下文时,使用
    api.runtime.tasks.flow.fromToolContext(ctx)
  • 当你的绑定层已解析会话和交付上下文时,使用
    api.runtime.tasks.flow.bindSession({ sessionKey, requesterOrigin })
托管流生命周期:
  1. createManaged(...)
  2. runTask(...)
  3. 等待人员或外部系统时调用
    setWaiting(...)
  4. 工作可继续时调用
    resume(...)
  5. 调用
    finish(...)
    fail(...)
  6. 整个作业需要停止时调用
    requestCancel(...)
    cancel(...)

Design constraints

设计约束

  • Use managed TaskFlows when your code owns the orchestration.
  • One-task mirrored flows are created by core runtime for detached ACP/subagent work; this skill is mainly about managed flows.
  • Treat
    stateJson
    as the persisted state bag. There is no separate
    setFlowOutput
    or
    appendFlowOutput
    API.
  • Every mutating method after creation is revision-checked. Carry forward the latest
    flow.revision
    after each successful mutation.
  • runTask(...)
    links the child task to the flow. Use it instead of manually creating detached tasks when you want parent orchestration.
  • 当你的代码负责编排时,请使用托管TaskFlow。
  • 核心运行时会为独立ACP/subagent工作创建单任务镜像流;本技能主要介绍托管流。
  • stateJson
    视为持久化状态包。没有单独的
    setFlowOutput
    appendFlowOutput
    API。
  • 创建后的每个变更方法都会进行版本校验。每次成功变更后,需沿用最新的
    flow.revision
  • runTask(...)
    会将子任务与流关联。当你需要父级编排时,请使用它而非手动创建独立任务。

Example shape

示例代码

ts
const taskFlow = api.runtime.tasks.flow.fromToolContext(ctx);

const created = taskFlow.createManaged({
  controllerId: "my-plugin/inbox-triage",
  goal: "triage inbox",
  currentStep: "classify",
  stateJson: {
    businessThreads: [],
    personalItems: [],
    eodSummary: [],
  },
});

const classify = taskFlow.runTask({
  flowId: created.flowId,
  runtime: "acp",
  childSessionKey: "agent:main:subagent:classifier",
  runId: "inbox-classify-1",
  task: "Classify inbox messages",
  status: "running",
  startedAt: Date.now(),
  lastEventAt: Date.now(),
});

if (!classify.created) {
  throw new Error(classify.reason);
}

const waiting = taskFlow.setWaiting({
  flowId: created.flowId,
  expectedRevision: created.revision,
  currentStep: "await_business_reply",
  stateJson: {
    businessThreads: ["slack:thread-1"],
    personalItems: [],
    eodSummary: [],
  },
  waitJson: {
    kind: "reply",
    channel: "slack",
    threadKey: "slack:thread-1",
  },
});

if (!waiting.applied) {
  throw new Error(waiting.code);
}

const resumed = taskFlow.resume({
  flowId: waiting.flow.flowId,
  expectedRevision: waiting.flow.revision,
  status: "running",
  currentStep: "finalize",
  stateJson: waiting.flow.stateJson,
});

if (!resumed.applied) {
  throw new Error(resumed.code);
}

taskFlow.finish({
  flowId: resumed.flow.flowId,
  expectedRevision: resumed.flow.revision,
  stateJson: resumed.flow.stateJson,
});
ts
const taskFlow = api.runtime.tasks.flow.fromToolContext(ctx);

const created = taskFlow.createManaged({
  controllerId: "my-plugin/inbox-triage",
  goal: "triage inbox",
  currentStep: "classify",
  stateJson: {
    businessThreads: [],
    personalItems: [],
    eodSummary: [],
  },
});

const classify = taskFlow.runTask({
  flowId: created.flowId,
  runtime: "acp",
  childSessionKey: "agent:main:subagent:classifier",
  runId: "inbox-classify-1",
  task: "Classify inbox messages",
  status: "running",
  startedAt: Date.now(),
  lastEventAt: Date.now(),
});

if (!classify.created) {
  throw new Error(classify.reason);
}

const waiting = taskFlow.setWaiting({
  flowId: created.flowId,
  expectedRevision: created.revision,
  currentStep: "await_business_reply",
  stateJson: {
    businessThreads: ["slack:thread-1"],
    personalItems: [],
    eodSummary: [],
  },
  waitJson: {
    kind: "reply",
    channel: "slack",
    threadKey: "slack:thread-1",
  },
});

if (!waiting.applied) {
  throw new Error(waiting.code);
}

const resumed = taskFlow.resume({
  flowId: waiting.flow.flowId,
  expectedRevision: waiting.flow.revision,
  status: "running",
  currentStep: "finalize",
  stateJson: waiting.flow.stateJson,
});

if (!resumed.applied) {
  throw new Error(resumed.code);
}

taskFlow.finish({
  flowId: resumed.flow.flowId,
  expectedRevision: resumed.flow.revision,
  stateJson: resumed.flow.stateJson,
});

Keep conditionals above the runtime

将条件判断放在运行时之上

Use the flow runtime for state and task linkage. Keep decisions in the authoring layer:
  • business
    → post to Slack and wait
  • personal
    → notify the owner now
  • later
    → append to an end-of-day summary bucket
使用流运行时处理状态和任务关联。将决策逻辑放在创作层:
  • business
    → 发布到Slack并等待
  • personal
    → 立即通知所有者
  • later
    → 添加到当日结束总结桶中

Operational pattern

操作模式

  • Store only the minimum state needed to resume.
  • Put human-readable wait reasons in
    blockedSummary
    or structured wait metadata in
    waitJson
    .
  • Use
    getTaskSummary(flowId)
    when the orchestrator needs a compact health view of child work.
  • Use
    requestCancel(...)
    when a caller wants the flow to stop scheduling immediately.
  • Use
    cancel(...)
    when you also want active linked child tasks cancelled.
  • 仅存储恢复工作所需的最小状态。
  • 将人类可读的等待原因放在
    blockedSummary
    中,或将结构化等待元数据放在
    waitJson
    中。
  • 当编排器需要子工作的简洁健康视图时,使用
    getTaskSummary(flowId)
  • 当调用方希望流立即停止调度时,使用
    requestCancel(...)
  • 当你还希望取消活跃的关联子任务时,使用
    cancel(...)

Examples

示例

  • See
    skills/taskflow/examples/inbox-triage.lobster
  • See
    skills/taskflow/examples/pr-intake.lobster
  • See
    skills/taskflow-inbox-triage/SKILL.md
    for a concrete routing pattern
  • 查看
    skills/taskflow/examples/inbox-triage.lobster
  • 查看
    skills/taskflow/examples/pr-intake.lobster
  • 查看
    skills/taskflow-inbox-triage/SKILL.md
    获取具体的路由模式