maravilla-workflows

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Maravilla Workflows

Maravilla 工作流

Workflows are durable, replay-based functions. They run inside the runtime, persist every step's output to a ledger, and survive process restarts: on resume, the runtime replays the workflow function up to the last completed step, then continues from there.
This makes them ideal for:
  • Multi-step processes where each step depends on the previous (
    step.run
    for at-most-once side effects)
  • Long sleeps (
    step.sleep('1h')
    ,
    step.sleepUntil(date)
    ) — the function isn't running while it sleeps
  • Waiting for an external event (
    step.waitForEvent
    )
  • Composition (
    step.invoke
    to call a child workflow)
Workflows are unconditionally enabled — there's no opt-in flag. They have no HTTP trigger; you start them from your runtime code via
platform.workflows.start(...)
.
Workflows 是持久化、基于重放的函数。它们在 runtime 中运行,将每个步骤的输出持久化到 ledger,并能在进程重启后继续:恢复时,runtime 会重放工作流函数直到最后一个已完成的步骤,然后从该点继续执行。
这使得它们非常适合以下场景:
  • 各步骤依赖前一步骤输出的多步骤流程(使用
    step.run
    实现最多一次的副作用)
  • 长时间延迟任务(
    step.sleep('1h')
    step.sleepUntil(date)
    )——函数在延迟期间不会运行
  • 等待外部事件(
    step.waitForEvent
  • 组合调用(使用
    step.invoke
    调用子工作流)
Workflows 默认无条件启用——无需启用标志。它们没有 HTTP 触发器;你需要通过 runtime 代码中的
platform.workflows.start(...)
来启动它们。

Defining a workflow

定义工作流

Workflow files live in
workflows/*.ts
(auto-discovered by the framework adapter at build time). Use
defineWorkflow
from the runtime subpath:
typescript
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';

interface Input { userId: string; reportId: string; }

export const buildReport = defineWorkflow<Input>(
  { id: 'build-report', options: { retries: 3, timeoutSecs: 60 * 60 } },
  async (input, step, ctx) => {
    const data = await step.run('fetch-data', async () => {
      return await fetchExpensiveData(input.userId);
    });

    const rendered = await step.run('render', async () => {
      return await renderPdf(data);
    });

    await step.run('upload', async () => {
      await ctx.platform.env.STORAGE.put(`reports/${input.reportId}.pdf`, rendered);
    });

    return { ok: true };
  },
);
Import note for v0.2.5:
defineWorkflow
is exposed at
@maravilla-labs/functions/workflows/runtime
. The docs'
platform/workflows
subpath does not resolve in this release.
工作流文件存放在
workflows/*.ts
中(框架适配器会在构建时自动发现)。使用 runtime 子路径中的
defineWorkflow
typescript
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';

interface Input { userId: string; reportId: string; }

export const buildReport = defineWorkflow<Input>(
  { id: 'build-report', options: { retries: 3, timeoutSecs: 60 * 60 } },
  async (input, step, ctx) => {
    const data = await step.run('fetch-data', async () => {
      return await fetchExpensiveData(input.userId);
    });

    const rendered = await step.run('render', async () => {
      return await renderPdf(data);
    });

    await step.run('upload', async () => {
      await ctx.platform.env.STORAGE.put(`reports/${input.reportId}.pdf`, rendered);
    });

    return { ok: true };
  },
);
v0.2.5 版本导入说明:
defineWorkflow
暴露在
@maravilla-labs/functions/workflows/runtime
。文档中提到的
platform/workflows
子路径在该版本中无法解析

Canonical example — the "click-watch" pattern

典型示例——“点击监控”模式

This is the demo's per-invitee click-watch workflow, lifted verbatim. One run per invitee escalates an unread-link warning twice over short windows, exits cleanly if the invitee record disappears, and emits live status via KV writes (which fire REN events for the owner's UI).
typescript
/**
 * One durable workflow per invitee.
 *
 *   t=0                       start (snapshot + go to sleep)
 *   t=first-grace             check 1 — if not clicked, flag `unclicked_first` (amber chip)
 *   t=first-grace + second    check 2 — if still not clicked, flag `unclicked_final` (red chip)
 *
 * Each `ctx.kv.put` on `inv:{nanoid}` fires a REN event the owner's guest
 * list is already subscribed to, so chips appear live with no reload.
 */
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';

interface Input { inviteeNanoid: string; inviteId: string; ownerUserId: string; }

export const inviteeClickWatch = defineWorkflow<Input>(
  { id: 'invitee-click-watch', options: { retries: 3, timeoutSecs: 7 * 24 * 3600 } },
  async (input, step, ctx) => {
    const kv = ctx.kv as { get: any; put: any };
    const key = `inv:${input.inviteeNanoid}`;

    await step.sleep('first-grace', '60s');

    const firstOutcome = await step.run('check-1', async () => {
      const raw = await kv.get('invites', key);
      if (!raw) return 'removed' as const;
      const invitee = JSON.parse(raw);
      if (invitee.clicked_at) return 'clicked' as const;
      invitee.unclicked_first = true;
      await kv.put('invites', key, JSON.stringify(invitee));
      return 'unclicked' as const;
    });

    if (firstOutcome === 'removed') return { outcome: 'invitee-removed' };

    await step.sleep('second-grace', '120s');

    const secondOutcome = await step.run('check-2', async () => {
      const raw = await kv.get('invites', key);
      if (!raw) return 'removed' as const;
      const invitee = JSON.parse(raw);
      if (invitee.clicked_at) return 'clicked' as const;
      invitee.unclicked_final = true;
      await kv.put('invites', key, JSON.stringify(invitee));
      return 'unclicked' as const;
    });

    if (secondOutcome === 'removed') return { outcome: 'invitee-removed' };

    return { outcome: 'done', firstOutcome, secondOutcome };
  },
);
Two patterns to copy:
  1. Wrap every side effect in
    step.run
    .
    Naked
    await fetch(...)
    or
    await kv.put(...)
    re-runs on replay.
    step.run('name', ...)
    is recorded in the ledger and skipped on replay if already completed.
  2. Exit cleanly on missing data. Returning early when the watched record is gone makes the workflow safe against deletions; you don't need to remember to cancel each run.
这是演示项目中针对每个受邀者的点击监控工作流,直接提取自代码。每个受邀者对应一个运行实例,会在短时间内两次升级未读链接警告;如果受邀者记录消失则干净退出,并通过 KV 写入发送实时状态(会触发 REN 事件,供所有者的 UI 使用)。
typescript
/**
 * 每个受邀者对应一个持久化工作流。
 *
 *   t=0                       启动(快照 + 进入延迟状态)
 *   t=first-grace             第一次检查——如果未点击,标记 `unclicked_first`(琥珀色标记)
 *   t=first-grace + second    第二次检查——如果仍未点击,标记 `unclicked_final`(红色标记)
 *
 * 每次对 `inv:{nanoid}` 执行 `ctx.kv.put` 都会触发所有者 guest 的 REN 事件
 * 列表已订阅该事件,因此标记会实时显示,无需刷新页面。
 */
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';

interface Input { inviteeNanoid: string; inviteId: string; ownerUserId: string; }

export const inviteeClickWatch = defineWorkflow<Input>(
  { id: 'invitee-click-watch', options: { retries: 3, timeoutSecs: 7 * 24 * 3600 } },
  async (input, step, ctx) => {
    const kv = ctx.kv as { get: any; put: any };
    const key = `inv:${input.inviteeNanoid}`;

    await step.sleep('first-grace', '60s');

    const firstOutcome = await step.run('check-1', async () => {
      const raw = await kv.get('invites', key);
      if (!raw) return 'removed' as const;
      const invitee = JSON.parse(raw);
      if (invitee.clicked_at) return 'clicked' as const;
      invitee.unclicked_first = true;
      await kv.put('invites', key, JSON.stringify(invitee));
      return 'unclicked' as const;
    });

    if (firstOutcome === 'removed') return { outcome: 'invitee-removed' };

    await step.sleep('second-grace', '120s');

    const secondOutcome = await step.run('check-2', async () => {
      const raw = await kv.get('invites', key);
      if (!raw) return 'removed' as const;
      const invitee = JSON.parse(raw);
      if (invitee.clicked_at) return 'clicked' as const;
      invitee.unclicked_final = true;
      await kv.put('invites', key, JSON.stringify(invitee));
      return 'unclicked' as const;
    });

    if (secondOutcome === 'removed') return { outcome: 'invitee-removed' };

    return { outcome: 'done', firstOutcome, secondOutcome };
  },
);
可借鉴的两种模式:
  1. 将所有副作用包裹在
    step.run
    。未包裹的
    await fetch(...)
    await kv.put(...)
    会在重放时再次运行。
    step.run('name', ...)
    会被记录到 ledger 中,如果已完成则在重放时跳过。
  2. 数据缺失时干净退出。当监控的记录消失时提前返回,可使工作流在删除操作下保持安全;无需记住取消每个运行实例。

The
step
API

step
API

step.run(name, fn)
— at-most-once side effect

step.run(name, fn)
—— 最多一次的副作用

typescript
const result = await step.run('charge-card', async () => {
  return await stripe.charges.create({ amount, source });
});
The first time this step runs,
fn
executes and its return value is persisted. On replay, the persisted value is returned without re-running
fn
. Each step name within a workflow run must be unique.
typescript
const result = await step.run('charge-card', async () => {
  return await stripe.charges.create({ amount, source });
});
该步骤首次运行时,
fn
会执行,其返回值会被持久化。重放时,会直接返回持久化的值而不会重新运行
fn
每个工作流运行中的步骤名称必须唯一

step.sleep(name, duration)
— short-form sleep

step.sleep(name, duration)
—— 简化版延迟

typescript
await step.sleep('cool-down', '30s');
await step.sleep('grace', '24h');
Duration formats:
<n>s
,
<n>m
,
<n>h
,
<n>d
. The function unwinds while sleeping — your isolate isn't pinned for hours.
typescript
await step.sleep('cool-down', '30s');
await step.sleep('grace', '24h');
时长格式:
<n>s
(秒)、
<n>m
(分钟)、
<n>h
(小时)、
<n>d
(天)。函数在延迟期间会释放资源——你的隔离实例不会被长时间占用。

step.sleepUntil(name, date)
— sleep to a wall-clock target

step.sleepUntil(name, date)
—— 延迟到指定时间点

typescript
await step.sleepUntil('event-start', new Date(invite.event_date));
Pass a
Date
or ISO-8601 string.
typescript
await step.sleepUntil('event-start', new Date(invite.event_date));
传入
Date
对象或 ISO-8601 格式的字符串。

step.waitForEvent(name, filter, options?)
— durable rendezvous

step.waitForEvent(name, filter, options?)
—— 持久化等待

typescript
const payment = await step.waitForEvent('payment-received', {
  type: 'payment.completed',
  match: { orderId: input.orderId },        // every key must equal in payload
}, { timeoutMs: 60 * 60 * 1000 });

if (!payment) {
  return { outcome: 'payment-timeout' };
}
Resolves when something else calls
platform.workflows.sendEvent('payment.completed', { orderId, ... })
and the
match
keys equal the payload's. Returns the payload, or
null
on timeout.
typescript
const payment = await step.waitForEvent('payment-received', {
  type: 'payment.completed',
  match: { orderId: input.orderId },        // 负载中的每个键必须匹配
}, { timeoutMs: 60 * 60 * 1000 });

if (!payment) {
  return { outcome: 'payment-timeout' };
}
当其他代码调用
platform.workflows.sendEvent('payment.completed', { orderId, ... })
match
中的键与负载中的值匹配时,该步骤会完成。返回负载内容,超时则返回
null

step.invoke(name, workflowId, input)
— child workflow

step.invoke(name, workflowId, input)
—— 子工作流

typescript
const handle = await step.invoke('child', 'send-receipt', { orderId, email });
const result = await handle.result();
Composes workflows. The child's full step history is its own; the parent records only the invocation result.
typescript
const handle = await step.invoke('child', 'send-receipt', { orderId, email });
const result = await handle.result();
组合调用工作流。子工作流的完整步骤历史独立存储;父工作流仅记录调用结果。

Starting and managing runs

启动和管理工作流实例

Run starts come from your normal runtime code (route handlers, event handlers, other workflows) — there is no HTTP trigger for workflows.
typescript
const platform = getPlatform();

// Start
const handle = await platform.workflows.start('build-report', { userId, reportId });
console.log(handle.runId);

// Get status (poll or surface in admin UI)
const run = await handle.status();
// { runId, workflowId, status: 'queued' | 'running' | 'sleeping' | 'waiting_event' | 'completed' | 'failed' | 'cancelled', ... }

// Step history (debugging)
const steps = await handle.history();

// Wait for completion
const output = await handle.result({ timeoutMs: 5 * 60_000 });

// Cancel
const cancelled = await handle.cancel();   // best-effort; returns true if it transitioned

// Get a handle to an existing run (no start)
const existing = platform.workflows.handle(savedRunId);
工作流实例的启动来自常规的 runtime 代码(路由处理器、事件处理器、其他工作流)——工作流没有 HTTP 触发器
typescript
const platform = getPlatform();

// 启动
const handle = await platform.workflows.start('build-report', { userId, reportId });
console.log(handle.runId);

// 获取状态(轮询或在管理界面展示)
const run = await handle.status();
// { runId, workflowId, status: 'queued' | 'running' | 'sleeping' | 'waiting_event' | 'completed' | 'failed' | 'cancelled', ... }

// 步骤历史(调试用)
const steps = await handle.history();

// 等待完成
const output = await handle.result({ timeoutMs: 5 * 60_000 });

// 取消
const cancelled = await handle.cancel();   // 尽力而为;如果状态转换成功则返回 true

// 获取已有实例的句柄(不启动新实例)
const existing = platform.workflows.handle(savedRunId);

Sending events to waiters

向等待中的实例发送事件

typescript
// Anywhere in your runtime code:
const woken = await platform.workflows.sendEvent('payment.completed', {
  orderId: '123',
  amount: 4200,
});
// woken: number of runs resolved
Match rules: the
eventType
you pass must equal the waiter's filter
type
; every key in the waiter's
match
must be present in
payload
with an equal value.
typescript
// 在 runtime 代码中的任意位置:
const woken = await platform.workflows.sendEvent('payment.completed', {
  orderId: '123',
  amount: 4200,
});
// woken: 被唤醒的实例数量
匹配规则:传入的
eventType
必须与等待者的过滤器
type
一致;等待者
match
中的每个键必须存在于
payload
中且值相等。

Patterns

模式

Lazy-start on first sight

首次触发时延迟启动

If you can't always reach a "creation" point — e.g. the invitee row may already exist — start the workflow lazily on any save and let the workflow itself short-circuit if it's a duplicate:
typescript
const handle = await platform.workflows.start('invitee-click-watch', {
  inviteeNanoid, inviteId, ownerUserId,
});
// Multiple starts → multiple runs; design the workflow to be safe on duplicates
// (e.g. include the nanoid in step names, or check a marker before flagging)
For strict deduplication, lookup by a stable key in KV:
typescript
const existing = await kv.get('workflow-runs', `click-watch:${inviteeNanoid}`);
if (!existing) {
  const handle = await platform.workflows.start(/* ... */);
  await kv.put('workflow-runs', `click-watch:${inviteeNanoid}`, handle.runId);
}
如果你无法总是找到“创建”节点——例如受邀者记录可能已存在——可在任意保存操作时延迟启动工作流,并让工作流自身在重复启动时短路:
typescript
const handle = await platform.workflows.start('invitee-click-watch', {
  inviteeNanoid, inviteId, ownerUserId,
});
// 多次启动会创建多个实例;需设计工作流以确保重复启动时安全
// (例如在步骤名称中包含 nanoid,或在标记前检查标记)
如需严格去重,可通过 KV 中的稳定键查询:
typescript
const existing = await kv.get('workflow-runs', `click-watch:${inviteeNanoid}`);
if (!existing) {
  const handle = await platform.workflows.start(/* ... */);
  await kv.put('workflow-runs', `click-watch:${inviteeNanoid}`, handle.runId);
}

Saga / compensation

事务补偿(Saga / compensation)

Each side effect lives in its own
step.run
. On failure, run compensating steps:
typescript
try {
  const charge = await step.run('charge', () => stripe.charges.create(/* ... */));
  await step.run('reserve-inventory', () => inventory.reserve(items));
  await step.run('ship', () => shipping.create(/* ... */));
} catch (err) {
  await step.run('refund', () => stripe.refunds.create({ charge: charge.id }));
  throw err;
}
每个副作用都放在独立的
step.run
中。失败时,执行补偿步骤:
typescript
try {
  const charge = await step.run('charge', () => stripe.charges.create(/* ... */));
  await step.run('reserve-inventory', () => inventory.reserve(items));
  await step.run('ship', () => shipping.create(/* ... */));
} catch (err) {
  await step.run('refund', () => stripe.refunds.create({ charge: charge.id }));
  throw err;
}

Reminder pipeline

提醒流水线

typescript
await step.sleep('1h-warning', '1h');
await step.run('send-1h-warning', () => platform.push.send(target, /* ... */));

await step.sleep('5m-warning', '55m');
await step.run('send-5m-warning', () => platform.push.send(target, /* ... */));

await step.sleepUntil('event-time', input.event_date);
await step.run('event-started', () => /* ... */);
typescript
await step.sleep('1h-warning', '1h');
await step.run('send-1h-warning', () => platform.push.send(target, /* ... */));

await step.sleep('5m-warning', '55m');
await step.run('send-5m-warning', () => platform.push.send(target, /* ... */));

await step.sleepUntil('event-time', input.event_date);
await step.run('event-started', () => /* ... */);

Pitfalls

注意事项

  • Naked side effects in the workflow body. Anything outside
    step.run
    re-runs on every replay. Even
    console.log
    is fine, but
    fetch
    ,
    kv.put
    ,
    db.insertOne
    are not — wrap them.
  • Non-deterministic logic outside steps.
    Math.random()
    ,
    Date.now()
    ,
    crypto.randomUUID()
    outside
    step.run
    will give different values on replay. Capture them inside a step.
  • Step name collisions. The runtime keys steps by
    name
    per-run. Reusing a name in the same run is undefined behavior — append an iteration counter if you loop.
  • Long timeouts.
    options.timeoutSecs
    is the whole-run budget. Make sure it covers worst-case sleeps + step durations.
  • Workflow vs event. If you only need to react once and quickly, use an event handler (see maravilla-events). Workflows pay a ledger cost per step.
  • 工作流主体中的未包裹副作用。任何
    step.run
    之外的代码都会在每次重放时再次运行。
    console.log
    没问题,但
    fetch
    kv.put
    db.insertOne
    不行——必须包裹它们。
  • 步骤外的非确定性逻辑
    Math.random()
    Date.now()
    crypto.randomUUID()
    step.run
    之外的代码会在重放时生成不同的值。需在步骤内捕获这些值。
  • 步骤名称冲突。runtime 会按每个实例的步骤名称作为键。同一实例中重复使用名称会导致未定义行为——如果循环执行,需添加迭代计数器。
  • 过长的超时时间
    options.timeoutSecs
    整个实例的执行时长上限。确保它能覆盖最坏情况下的延迟时间和步骤执行时间。
  • 工作流 vs 事件。如果你只需要快速响应一次事件,使用事件处理器(参见 maravilla-events)。工作流每个步骤都会产生账本存储成本。

Related skills

相关技能

  • maravilla-events — single-trigger handlers; often the entry point that starts a workflow
  • maravilla-realtime
    step.waitForEvent
    rendezvous source
  • maravilla-push — typical workflow side effect (reminders)
  • maravilla-kv — surfacing live workflow state to the UI
  • maravilla-events —— 单次触发处理器;通常是启动工作流的入口
  • maravilla-realtime ——
    step.waitForEvent
    的事件源
  • maravilla-push —— 工作流常见的副作用(提醒)
  • maravilla-kv —— 向 UI 展示工作流实时状态