Loading...
Loading...
Compare original and translation side by side
step.runstep.sleep('1h')step.sleepUntil(date)step.waitForEventstep.invokeplatform.workflows.start(...)step.runstep.sleep('1h')step.sleepUntil(date)step.waitForEventstep.invokeplatform.workflows.start(...)workflows/*.tsdefineWorkflowimport { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';
interface Input { userId: string; reportId: string; }
export const buildReport = defineWorkflow<Input>(
{ id: 'build-report', options: { retries: 3, timeoutSecs: 60 * 60 } },
async (input, step, ctx) => {
const data = await step.run('fetch-data', async () => {
return await fetchExpensiveData(input.userId);
});
const rendered = await step.run('render', async () => {
return await renderPdf(data);
});
await step.run('upload', async () => {
await ctx.platform.env.STORAGE.put(`reports/${input.reportId}.pdf`, rendered);
});
return { ok: true };
},
);Import note for v0.2.5:is exposed atdefineWorkflow. The docs'@maravilla-labs/functions/workflows/runtimesubpath does not resolve in this release.platform/workflows
workflows/*.tsdefineWorkflowimport { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';
interface Input { userId: string; reportId: string; }
export const buildReport = defineWorkflow<Input>(
{ id: 'build-report', options: { retries: 3, timeoutSecs: 60 * 60 } },
async (input, step, ctx) => {
const data = await step.run('fetch-data', async () => {
return await fetchExpensiveData(input.userId);
});
const rendered = await step.run('render', async () => {
return await renderPdf(data);
});
await step.run('upload', async () => {
await ctx.platform.env.STORAGE.put(`reports/${input.reportId}.pdf`, rendered);
});
return { ok: true };
},
);v0.2.5 版本导入说明:暴露在defineWorkflow。文档中提到的@maravilla-labs/functions/workflows/runtime子路径在该版本中无法解析。platform/workflows
/**
* One durable workflow per invitee.
*
* t=0 start (snapshot + go to sleep)
* t=first-grace check 1 — if not clicked, flag `unclicked_first` (amber chip)
* t=first-grace + second check 2 — if still not clicked, flag `unclicked_final` (red chip)
*
* Each `ctx.kv.put` on `inv:{nanoid}` fires a REN event the owner's guest
* list is already subscribed to, so chips appear live with no reload.
*/
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';
interface Input { inviteeNanoid: string; inviteId: string; ownerUserId: string; }
export const inviteeClickWatch = defineWorkflow<Input>(
{ id: 'invitee-click-watch', options: { retries: 3, timeoutSecs: 7 * 24 * 3600 } },
async (input, step, ctx) => {
const kv = ctx.kv as { get: any; put: any };
const key = `inv:${input.inviteeNanoid}`;
await step.sleep('first-grace', '60s');
const firstOutcome = await step.run('check-1', async () => {
const raw = await kv.get('invites', key);
if (!raw) return 'removed' as const;
const invitee = JSON.parse(raw);
if (invitee.clicked_at) return 'clicked' as const;
invitee.unclicked_first = true;
await kv.put('invites', key, JSON.stringify(invitee));
return 'unclicked' as const;
});
if (firstOutcome === 'removed') return { outcome: 'invitee-removed' };
await step.sleep('second-grace', '120s');
const secondOutcome = await step.run('check-2', async () => {
const raw = await kv.get('invites', key);
if (!raw) return 'removed' as const;
const invitee = JSON.parse(raw);
if (invitee.clicked_at) return 'clicked' as const;
invitee.unclicked_final = true;
await kv.put('invites', key, JSON.stringify(invitee));
return 'unclicked' as const;
});
if (secondOutcome === 'removed') return { outcome: 'invitee-removed' };
return { outcome: 'done', firstOutcome, secondOutcome };
},
);step.runawait fetch(...)await kv.put(...)step.run('name', ...)/**
* 每个受邀者对应一个持久化工作流。
*
* t=0 启动(快照 + 进入延迟状态)
* t=first-grace 第一次检查——如果未点击,标记 `unclicked_first`(琥珀色标记)
* t=first-grace + second 第二次检查——如果仍未点击,标记 `unclicked_final`(红色标记)
*
* 每次对 `inv:{nanoid}` 执行 `ctx.kv.put` 都会触发所有者 guest 的 REN 事件
* 列表已订阅该事件,因此标记会实时显示,无需刷新页面。
*/
import { defineWorkflow } from '@maravilla-labs/functions/workflows/runtime';
interface Input { inviteeNanoid: string; inviteId: string; ownerUserId: string; }
export const inviteeClickWatch = defineWorkflow<Input>(
{ id: 'invitee-click-watch', options: { retries: 3, timeoutSecs: 7 * 24 * 3600 } },
async (input, step, ctx) => {
const kv = ctx.kv as { get: any; put: any };
const key = `inv:${input.inviteeNanoid}`;
await step.sleep('first-grace', '60s');
const firstOutcome = await step.run('check-1', async () => {
const raw = await kv.get('invites', key);
if (!raw) return 'removed' as const;
const invitee = JSON.parse(raw);
if (invitee.clicked_at) return 'clicked' as const;
invitee.unclicked_first = true;
await kv.put('invites', key, JSON.stringify(invitee));
return 'unclicked' as const;
});
if (firstOutcome === 'removed') return { outcome: 'invitee-removed' };
await step.sleep('second-grace', '120s');
const secondOutcome = await step.run('check-2', async () => {
const raw = await kv.get('invites', key);
if (!raw) return 'removed' as const;
const invitee = JSON.parse(raw);
if (invitee.clicked_at) return 'clicked' as const;
invitee.unclicked_final = true;
await kv.put('invites', key, JSON.stringify(invitee));
return 'unclicked' as const;
});
if (secondOutcome === 'removed') return { outcome: 'invitee-removed' };
return { outcome: 'done', firstOutcome, secondOutcome };
},
);step.runawait fetch(...)await kv.put(...)step.run('name', ...)stepstepstep.run(name, fn)step.run(name, fn)const result = await step.run('charge-card', async () => {
return await stripe.charges.create({ amount, source });
});fnfnconst result = await step.run('charge-card', async () => {
return await stripe.charges.create({ amount, source });
});fnfnstep.sleep(name, duration)step.sleep(name, duration)await step.sleep('cool-down', '30s');
await step.sleep('grace', '24h');<n>s<n>m<n>h<n>dawait step.sleep('cool-down', '30s');
await step.sleep('grace', '24h');<n>s<n>m<n>h<n>dstep.sleepUntil(name, date)step.sleepUntil(name, date)await step.sleepUntil('event-start', new Date(invite.event_date));Dateawait step.sleepUntil('event-start', new Date(invite.event_date));Datestep.waitForEvent(name, filter, options?)step.waitForEvent(name, filter, options?)const payment = await step.waitForEvent('payment-received', {
type: 'payment.completed',
match: { orderId: input.orderId }, // every key must equal in payload
}, { timeoutMs: 60 * 60 * 1000 });
if (!payment) {
return { outcome: 'payment-timeout' };
}platform.workflows.sendEvent('payment.completed', { orderId, ... })matchnullconst payment = await step.waitForEvent('payment-received', {
type: 'payment.completed',
match: { orderId: input.orderId }, // 负载中的每个键必须匹配
}, { timeoutMs: 60 * 60 * 1000 });
if (!payment) {
return { outcome: 'payment-timeout' };
}platform.workflows.sendEvent('payment.completed', { orderId, ... })matchnullstep.invoke(name, workflowId, input)step.invoke(name, workflowId, input)const handle = await step.invoke('child', 'send-receipt', { orderId, email });
const result = await handle.result();const handle = await step.invoke('child', 'send-receipt', { orderId, email });
const result = await handle.result();const platform = getPlatform();
// Start
const handle = await platform.workflows.start('build-report', { userId, reportId });
console.log(handle.runId);
// Get status (poll or surface in admin UI)
const run = await handle.status();
// { runId, workflowId, status: 'queued' | 'running' | 'sleeping' | 'waiting_event' | 'completed' | 'failed' | 'cancelled', ... }
// Step history (debugging)
const steps = await handle.history();
// Wait for completion
const output = await handle.result({ timeoutMs: 5 * 60_000 });
// Cancel
const cancelled = await handle.cancel(); // best-effort; returns true if it transitioned
// Get a handle to an existing run (no start)
const existing = platform.workflows.handle(savedRunId);const platform = getPlatform();
// 启动
const handle = await platform.workflows.start('build-report', { userId, reportId });
console.log(handle.runId);
// 获取状态(轮询或在管理界面展示)
const run = await handle.status();
// { runId, workflowId, status: 'queued' | 'running' | 'sleeping' | 'waiting_event' | 'completed' | 'failed' | 'cancelled', ... }
// 步骤历史(调试用)
const steps = await handle.history();
// 等待完成
const output = await handle.result({ timeoutMs: 5 * 60_000 });
// 取消
const cancelled = await handle.cancel(); // 尽力而为;如果状态转换成功则返回 true
// 获取已有实例的句柄(不启动新实例)
const existing = platform.workflows.handle(savedRunId);// Anywhere in your runtime code:
const woken = await platform.workflows.sendEvent('payment.completed', {
orderId: '123',
amount: 4200,
});
// woken: number of runs resolvedeventTypetypematchpayload// 在 runtime 代码中的任意位置:
const woken = await platform.workflows.sendEvent('payment.completed', {
orderId: '123',
amount: 4200,
});
// woken: 被唤醒的实例数量eventTypetypematchpayloadconst handle = await platform.workflows.start('invitee-click-watch', {
inviteeNanoid, inviteId, ownerUserId,
});
// Multiple starts → multiple runs; design the workflow to be safe on duplicates
// (e.g. include the nanoid in step names, or check a marker before flagging)const existing = await kv.get('workflow-runs', `click-watch:${inviteeNanoid}`);
if (!existing) {
const handle = await platform.workflows.start(/* ... */);
await kv.put('workflow-runs', `click-watch:${inviteeNanoid}`, handle.runId);
}const handle = await platform.workflows.start('invitee-click-watch', {
inviteeNanoid, inviteId, ownerUserId,
});
// 多次启动会创建多个实例;需设计工作流以确保重复启动时安全
// (例如在步骤名称中包含 nanoid,或在标记前检查标记)const existing = await kv.get('workflow-runs', `click-watch:${inviteeNanoid}`);
if (!existing) {
const handle = await platform.workflows.start(/* ... */);
await kv.put('workflow-runs', `click-watch:${inviteeNanoid}`, handle.runId);
}step.runtry {
const charge = await step.run('charge', () => stripe.charges.create(/* ... */));
await step.run('reserve-inventory', () => inventory.reserve(items));
await step.run('ship', () => shipping.create(/* ... */));
} catch (err) {
await step.run('refund', () => stripe.refunds.create({ charge: charge.id }));
throw err;
}step.runtry {
const charge = await step.run('charge', () => stripe.charges.create(/* ... */));
await step.run('reserve-inventory', () => inventory.reserve(items));
await step.run('ship', () => shipping.create(/* ... */));
} catch (err) {
await step.run('refund', () => stripe.refunds.create({ charge: charge.id }));
throw err;
}await step.sleep('1h-warning', '1h');
await step.run('send-1h-warning', () => platform.push.send(target, /* ... */));
await step.sleep('5m-warning', '55m');
await step.run('send-5m-warning', () => platform.push.send(target, /* ... */));
await step.sleepUntil('event-time', input.event_date);
await step.run('event-started', () => /* ... */);await step.sleep('1h-warning', '1h');
await step.run('send-1h-warning', () => platform.push.send(target, /* ... */));
await step.sleep('5m-warning', '55m');
await step.run('send-5m-warning', () => platform.push.send(target, /* ... */));
await step.sleepUntil('event-time', input.event_date);
await step.run('event-started', () => /* ... */);step.runconsole.logfetchkv.putdb.insertOneMath.random()Date.now()crypto.randomUUID()step.runnameoptions.timeoutSecsstep.runconsole.logfetchkv.putdb.insertOneMath.random()Date.now()crypto.randomUUID()step.runoptions.timeoutSecsstep.waitForEventstep.waitForEvent