maravilla-workflows
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseMaravilla Workflows
Maravilla 工作流
Workflows are durable, replay-based functions. They run inside the runtime, persist every step's output to a ledger, and survive process restarts: on resume, the runtime replays the workflow function up to the last completed step, then continues from there.
This makes them ideal for:
- Multi-step processes where each step depends on the previous (for at-most-once side effects)
step.run - Long sleeps (,
step.sleep('1h')) — the function isn't running while it sleepsstep.sleepUntil(date) - Waiting for an external event ()
step.waitForEvent - Composition (to call a child workflow)
step.invoke
Workflows are unconditionally enabled — there's no opt-in flag. They have no HTTP trigger; you start them from your runtime code via .
platform.workflows.start(...)Workflows 是持久化、基于重放的函数。它们在 runtime 中运行,将每个步骤的输出持久化到 ledger,并能在进程重启后继续:恢复时,runtime 会重放工作流函数直到最后一个已完成的步骤,然后从该点继续执行。
这使得它们非常适合以下场景:
- 各步骤依赖前一步骤输出的多步骤流程(使用 实现最多一次的副作用)
step.run - 长时间延迟任务(、
step.sleep('1h'))——函数在延迟期间不会运行step.sleepUntil(date) - 等待外部事件()
step.waitForEvent - 组合调用(使用 调用子工作流)
step.invoke
Workflows 默认无条件启用——无需启用标志。它们没有 HTTP 触发器;你需要通过 runtime 代码中的 来启动它们。
platform.workflows.start(...)Defining a workflow
定义工作流
Workflow files live in (auto-discovered by the framework adapter at build time). Use from the runtime subpath:
workflows/*.tsdefineWorkflowtypescript
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';
interface Input { userId: string; reportId: string; }
export const buildReport = defineWorkflow<Input>(
{ id: 'build-report', options: { retries: 3, timeoutSecs: 60 * 60 } },
async (input, step, ctx) => {
const data = await step.run('fetch-data', async () => {
return await fetchExpensiveData(input.userId);
});
const rendered = await step.run('render', async () => {
return await renderPdf(data);
});
await step.run('upload', async () => {
await ctx.platform.env.STORAGE.put(`reports/${input.reportId}.pdf`, rendered);
});
return { ok: true };
},
);Import note for v0.2.5:is exposed atdefineWorkflow. The docs'@maravilla-labs/functions/workflows/runtimesubpath does not resolve in this release.platform/workflows
工作流文件存放在 中(框架适配器会在构建时自动发现)。使用 runtime 子路径中的 :
workflows/*.tsdefineWorkflowtypescript
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';
interface Input { userId: string; reportId: string; }
export const buildReport = defineWorkflow<Input>(
{ id: 'build-report', options: { retries: 3, timeoutSecs: 60 * 60 } },
async (input, step, ctx) => {
const data = await step.run('fetch-data', async () => {
return await fetchExpensiveData(input.userId);
});
const rendered = await step.run('render', async () => {
return await renderPdf(data);
});
await step.run('upload', async () => {
await ctx.platform.env.STORAGE.put(`reports/${input.reportId}.pdf`, rendered);
});
return { ok: true };
},
);v0.2.5 版本导入说明:暴露在defineWorkflow。文档中提到的@maravilla-labs/functions/workflows/runtime子路径在该版本中无法解析。platform/workflows
Canonical example — the "click-watch" pattern
典型示例——“点击监控”模式
This is the demo's per-invitee click-watch workflow, lifted verbatim. One run per invitee escalates an unread-link warning twice over short windows, exits cleanly if the invitee record disappears, and emits live status via KV writes (which fire REN events for the owner's UI).
typescript
/**
* One durable workflow per invitee.
*
* t=0 start (snapshot + go to sleep)
* t=first-grace check 1 — if not clicked, flag `unclicked_first` (amber chip)
* t=first-grace + second check 2 — if still not clicked, flag `unclicked_final` (red chip)
*
* Each `ctx.kv.put` on `inv:{nanoid}` fires a REN event the owner's guest
* list is already subscribed to, so chips appear live with no reload.
*/
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';
interface Input { inviteeNanoid: string; inviteId: string; ownerUserId: string; }
export const inviteeClickWatch = defineWorkflow<Input>(
{ id: 'invitee-click-watch', options: { retries: 3, timeoutSecs: 7 * 24 * 3600 } },
async (input, step, ctx) => {
const kv = ctx.kv as { get: any; put: any };
const key = `inv:${input.inviteeNanoid}`;
await step.sleep('first-grace', '60s');
const firstOutcome = await step.run('check-1', async () => {
const raw = await kv.get('invites', key);
if (!raw) return 'removed' as const;
const invitee = JSON.parse(raw);
if (invitee.clicked_at) return 'clicked' as const;
invitee.unclicked_first = true;
await kv.put('invites', key, JSON.stringify(invitee));
return 'unclicked' as const;
});
if (firstOutcome === 'removed') return { outcome: 'invitee-removed' };
await step.sleep('second-grace', '120s');
const secondOutcome = await step.run('check-2', async () => {
const raw = await kv.get('invites', key);
if (!raw) return 'removed' as const;
const invitee = JSON.parse(raw);
if (invitee.clicked_at) return 'clicked' as const;
invitee.unclicked_final = true;
await kv.put('invites', key, JSON.stringify(invitee));
return 'unclicked' as const;
});
if (secondOutcome === 'removed') return { outcome: 'invitee-removed' };
return { outcome: 'done', firstOutcome, secondOutcome };
},
);Two patterns to copy:
- Wrap every side effect in . Naked
step.runorawait fetch(...)re-runs on replay.await kv.put(...)is recorded in the ledger and skipped on replay if already completed.step.run('name', ...) - Exit cleanly on missing data. Returning early when the watched record is gone makes the workflow safe against deletions; you don't need to remember to cancel each run.
这是演示项目中针对每个受邀者的点击监控工作流,直接提取自代码。每个受邀者对应一个运行实例,会在短时间内两次升级未读链接警告;如果受邀者记录消失则干净退出,并通过 KV 写入发送实时状态(会触发 REN 事件,供所有者的 UI 使用)。
typescript
/**
* 每个受邀者对应一个持久化工作流。
*
* t=0 启动(快照 + 进入延迟状态)
* t=first-grace 第一次检查——如果未点击,标记 `unclicked_first`(琥珀色标记)
* t=first-grace + second 第二次检查——如果仍未点击,标记 `unclicked_final`(红色标记)
*
* 每次对 `inv:{nanoid}` 执行 `ctx.kv.put` 都会触发所有者 guest 的 REN 事件
* 列表已订阅该事件,因此标记会实时显示,无需刷新页面。
*/
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';
interface Input { inviteeNanoid: string; inviteId: string; ownerUserId: string; }
export const inviteeClickWatch = defineWorkflow<Input>(
{ id: 'invitee-click-watch', options: { retries: 3, timeoutSecs: 7 * 24 * 3600 } },
async (input, step, ctx) => {
const kv = ctx.kv as { get: any; put: any };
const key = `inv:${input.inviteeNanoid}`;
await step.sleep('first-grace', '60s');
const firstOutcome = await step.run('check-1', async () => {
const raw = await kv.get('invites', key);
if (!raw) return 'removed' as const;
const invitee = JSON.parse(raw);
if (invitee.clicked_at) return 'clicked' as const;
invitee.unclicked_first = true;
await kv.put('invites', key, JSON.stringify(invitee));
return 'unclicked' as const;
});
if (firstOutcome === 'removed') return { outcome: 'invitee-removed' };
await step.sleep('second-grace', '120s');
const secondOutcome = await step.run('check-2', async () => {
const raw = await kv.get('invites', key);
if (!raw) return 'removed' as const;
const invitee = JSON.parse(raw);
if (invitee.clicked_at) return 'clicked' as const;
invitee.unclicked_final = true;
await kv.put('invites', key, JSON.stringify(invitee));
return 'unclicked' as const;
});
if (secondOutcome === 'removed') return { outcome: 'invitee-removed' };
return { outcome: 'done', firstOutcome, secondOutcome };
},
);可借鉴的两种模式:
- 将所有副作用包裹在 中。未包裹的
step.run或await fetch(...)会在重放时再次运行。await kv.put(...)会被记录到 ledger 中,如果已完成则在重放时跳过。step.run('name', ...) - 数据缺失时干净退出。当监控的记录消失时提前返回,可使工作流在删除操作下保持安全;无需记住取消每个运行实例。
The step
API
stepstep
API
stepstep.run(name, fn)
— at-most-once side effect
step.run(name, fn)step.run(name, fn)
—— 最多一次的副作用
step.run(name, fn)typescript
const result = await step.run('charge-card', async () => {
return await stripe.charges.create({ amount, source });
});The first time this step runs, executes and its return value is persisted. On replay, the persisted value is returned without re-running . Each step name within a workflow run must be unique.
fnfntypescript
const result = await step.run('charge-card', async () => {
return await stripe.charges.create({ amount, source });
});该步骤首次运行时, 会执行,其返回值会被持久化。重放时,会直接返回持久化的值而不会重新运行 。每个工作流运行中的步骤名称必须唯一。
fnfnstep.sleep(name, duration)
— short-form sleep
step.sleep(name, duration)step.sleep(name, duration)
—— 简化版延迟
step.sleep(name, duration)typescript
await step.sleep('cool-down', '30s');
await step.sleep('grace', '24h');Duration formats: , , , . The function unwinds while sleeping — your isolate isn't pinned for hours.
<n>s<n>m<n>h<n>dtypescript
await step.sleep('cool-down', '30s');
await step.sleep('grace', '24h');时长格式:(秒)、(分钟)、(小时)、(天)。函数在延迟期间会释放资源——你的隔离实例不会被长时间占用。
<n>s<n>m<n>h<n>dstep.sleepUntil(name, date)
— sleep to a wall-clock target
step.sleepUntil(name, date)step.sleepUntil(name, date)
—— 延迟到指定时间点
step.sleepUntil(name, date)typescript
await step.sleepUntil('event-start', new Date(invite.event_date));Pass a or ISO-8601 string.
Datetypescript
await step.sleepUntil('event-start', new Date(invite.event_date));传入 对象或 ISO-8601 格式的字符串。
Datestep.waitForEvent(name, filter, options?)
— durable rendezvous
step.waitForEvent(name, filter, options?)step.waitForEvent(name, filter, options?)
—— 持久化等待
step.waitForEvent(name, filter, options?)typescript
const payment = await step.waitForEvent('payment-received', {
type: 'payment.completed',
match: { orderId: input.orderId }, // every key must equal in payload
}, { timeoutMs: 60 * 60 * 1000 });
if (!payment) {
return { outcome: 'payment-timeout' };
}Resolves when something else calls and the keys equal the payload's. Returns the payload, or on timeout.
platform.workflows.sendEvent('payment.completed', { orderId, ... })matchnulltypescript
const payment = await step.waitForEvent('payment-received', {
type: 'payment.completed',
match: { orderId: input.orderId }, // 负载中的每个键必须匹配
}, { timeoutMs: 60 * 60 * 1000 });
if (!payment) {
return { outcome: 'payment-timeout' };
}当其他代码调用 且 中的键与负载中的值匹配时,该步骤会完成。返回负载内容,超时则返回 。
platform.workflows.sendEvent('payment.completed', { orderId, ... })matchnullstep.invoke(name, workflowId, input)
— child workflow
step.invoke(name, workflowId, input)step.invoke(name, workflowId, input)
—— 子工作流
step.invoke(name, workflowId, input)typescript
const handle = await step.invoke('child', 'send-receipt', { orderId, email });
const result = await handle.result();Composes workflows. The child's full step history is its own; the parent records only the invocation result.
typescript
const handle = await step.invoke('child', 'send-receipt', { orderId, email });
const result = await handle.result();组合调用工作流。子工作流的完整步骤历史独立存储;父工作流仅记录调用结果。
Starting and managing runs
启动和管理工作流实例
Run starts come from your normal runtime code (route handlers, event handlers, other workflows) — there is no HTTP trigger for workflows.
typescript
const platform = getPlatform();
// Start
const handle = await platform.workflows.start('build-report', { userId, reportId });
console.log(handle.runId);
// Get status (poll or surface in admin UI)
const run = await handle.status();
// { runId, workflowId, status: 'queued' | 'running' | 'sleeping' | 'waiting_event' | 'completed' | 'failed' | 'cancelled', ... }
// Step history (debugging)
const steps = await handle.history();
// Wait for completion
const output = await handle.result({ timeoutMs: 5 * 60_000 });
// Cancel
const cancelled = await handle.cancel(); // best-effort; returns true if it transitioned
// Get a handle to an existing run (no start)
const existing = platform.workflows.handle(savedRunId);工作流实例的启动来自常规的 runtime 代码(路由处理器、事件处理器、其他工作流)——工作流没有 HTTP 触发器。
typescript
const platform = getPlatform();
// 启动
const handle = await platform.workflows.start('build-report', { userId, reportId });
console.log(handle.runId);
// 获取状态(轮询或在管理界面展示)
const run = await handle.status();
// { runId, workflowId, status: 'queued' | 'running' | 'sleeping' | 'waiting_event' | 'completed' | 'failed' | 'cancelled', ... }
// 步骤历史(调试用)
const steps = await handle.history();
// 等待完成
const output = await handle.result({ timeoutMs: 5 * 60_000 });
// 取消
const cancelled = await handle.cancel(); // 尽力而为;如果状态转换成功则返回 true
// 获取已有实例的句柄(不启动新实例)
const existing = platform.workflows.handle(savedRunId);Sending events to waiters
向等待中的实例发送事件
typescript
// Anywhere in your runtime code:
const woken = await platform.workflows.sendEvent('payment.completed', {
orderId: '123',
amount: 4200,
});
// woken: number of runs resolvedMatch rules: the you pass must equal the waiter's filter ; every key in the waiter's must be present in with an equal value.
eventTypetypematchpayloadtypescript
// 在 runtime 代码中的任意位置:
const woken = await platform.workflows.sendEvent('payment.completed', {
orderId: '123',
amount: 4200,
});
// woken: 被唤醒的实例数量匹配规则:传入的 必须与等待者的过滤器 一致;等待者 中的每个键必须存在于 中且值相等。
eventTypetypematchpayloadPatterns
模式
Lazy-start on first sight
首次触发时延迟启动
If you can't always reach a "creation" point — e.g. the invitee row may already exist — start the workflow lazily on any save and let the workflow itself short-circuit if it's a duplicate:
typescript
const handle = await platform.workflows.start('invitee-click-watch', {
inviteeNanoid, inviteId, ownerUserId,
});
// Multiple starts → multiple runs; design the workflow to be safe on duplicates
// (e.g. include the nanoid in step names, or check a marker before flagging)For strict deduplication, lookup by a stable key in KV:
typescript
const existing = await kv.get('workflow-runs', `click-watch:${inviteeNanoid}`);
if (!existing) {
const handle = await platform.workflows.start(/* ... */);
await kv.put('workflow-runs', `click-watch:${inviteeNanoid}`, handle.runId);
}如果你无法总是找到“创建”节点——例如受邀者记录可能已存在——可在任意保存操作时延迟启动工作流,并让工作流自身在重复启动时短路:
typescript
const handle = await platform.workflows.start('invitee-click-watch', {
inviteeNanoid, inviteId, ownerUserId,
});
// 多次启动会创建多个实例;需设计工作流以确保重复启动时安全
// (例如在步骤名称中包含 nanoid,或在标记前检查标记)如需严格去重,可通过 KV 中的稳定键查询:
typescript
const existing = await kv.get('workflow-runs', `click-watch:${inviteeNanoid}`);
if (!existing) {
const handle = await platform.workflows.start(/* ... */);
await kv.put('workflow-runs', `click-watch:${inviteeNanoid}`, handle.runId);
}Saga / compensation
事务补偿(Saga / compensation)
Each side effect lives in its own . On failure, run compensating steps:
step.runtypescript
try {
const charge = await step.run('charge', () => stripe.charges.create(/* ... */));
await step.run('reserve-inventory', () => inventory.reserve(items));
await step.run('ship', () => shipping.create(/* ... */));
} catch (err) {
await step.run('refund', () => stripe.refunds.create({ charge: charge.id }));
throw err;
}每个副作用都放在独立的 中。失败时,执行补偿步骤:
step.runtypescript
try {
const charge = await step.run('charge', () => stripe.charges.create(/* ... */));
await step.run('reserve-inventory', () => inventory.reserve(items));
await step.run('ship', () => shipping.create(/* ... */));
} catch (err) {
await step.run('refund', () => stripe.refunds.create({ charge: charge.id }));
throw err;
}Reminder pipeline
提醒流水线
typescript
await step.sleep('1h-warning', '1h');
await step.run('send-1h-warning', () => platform.push.send(target, /* ... */));
await step.sleep('5m-warning', '55m');
await step.run('send-5m-warning', () => platform.push.send(target, /* ... */));
await step.sleepUntil('event-time', input.event_date);
await step.run('event-started', () => /* ... */);typescript
await step.sleep('1h-warning', '1h');
await step.run('send-1h-warning', () => platform.push.send(target, /* ... */));
await step.sleep('5m-warning', '55m');
await step.run('send-5m-warning', () => platform.push.send(target, /* ... */));
await step.sleepUntil('event-time', input.event_date);
await step.run('event-started', () => /* ... */);Pitfalls
注意事项
- Naked side effects in the workflow body. Anything outside re-runs on every replay. Even
step.runis fine, butconsole.log,fetch,kv.putare not — wrap them.db.insertOne - Non-deterministic logic outside steps. ,
Math.random(),Date.now()outsidecrypto.randomUUID()will give different values on replay. Capture them inside a step.step.run - Step name collisions. The runtime keys steps by per-run. Reusing a name in the same run is undefined behavior — append an iteration counter if you loop.
name - Long timeouts. is the whole-run budget. Make sure it covers worst-case sleeps + step durations.
options.timeoutSecs - Workflow vs event. If you only need to react once and quickly, use an event handler (see maravilla-events). Workflows pay a ledger cost per step.
- 工作流主体中的未包裹副作用。任何 之外的代码都会在每次重放时再次运行。
step.run没问题,但console.log、fetch、kv.put不行——必须包裹它们。db.insertOne - 步骤外的非确定性逻辑。、
Math.random()、Date.now()在crypto.randomUUID()之外的代码会在重放时生成不同的值。需在步骤内捕获这些值。step.run - 步骤名称冲突。runtime 会按每个实例的步骤名称作为键。同一实例中重复使用名称会导致未定义行为——如果循环执行,需添加迭代计数器。
- 过长的超时时间。是整个实例的执行时长上限。确保它能覆盖最坏情况下的延迟时间和步骤执行时间。
options.timeoutSecs - 工作流 vs 事件。如果你只需要快速响应一次事件,使用事件处理器(参见 maravilla-events)。工作流每个步骤都会产生账本存储成本。
Related skills
相关技能
- maravilla-events — single-trigger handlers; often the entry point that starts a workflow
- maravilla-realtime — rendezvous source
step.waitForEvent - maravilla-push — typical workflow side effect (reminders)
- maravilla-kv — surfacing live workflow state to the UI
Full reference: https://www.maravilla.cloud/llms-full.txt.
- maravilla-events —— 单次触发处理器;通常是启动工作流的入口
- maravilla-realtime —— 的事件源
step.waitForEvent - maravilla-push —— 工作流常见的副作用(提醒)
- maravilla-kv —— 向 UI 展示工作流实时状态