trigger-agents
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAI Agent Patterns with Trigger.dev
基于Trigger.dev的AI Agent模式
Build production-ready AI agents using Trigger.dev's durable execution.
使用Trigger.dev的持久化执行功能构建可投入生产的AI Agent。
Pattern Selection
模式选择
Need to... → Use
─────────────────────────────────────────────────────
Process items in parallel → Parallelization
Route to different models/handlers → Routing
Chain steps with validation gates → Prompt Chaining
Coordinate multiple specialized tasks → Orchestrator-Workers
Self-improve until quality threshold → Evaluator-Optimizer
Pause for human approval → Human-in-the-Loop (waitpoints.md)
Stream progress to frontend → Realtime Streams (streaming.md)
Let LLM call your tasks as tools → ai.tool (ai-tool.md)需要... → 使用
─────────────────────────────────────────────────────
并行处理项目 → 并行处理模式
路由到不同模型/处理程序 → 路由模式
带验证关卡的步骤链式调用 → 提示词链式调用
协调多个专业化任务 → 编排器-工作者模式
自我优化直至达到质量阈值 → 评估器-优化器模式
暂停以等待人工审核 → 人机协同(waitpoints.md)
向前端流式传输进度 → 实时流(streaming.md)
让LLM将你的任务作为工具调用 → ai.tool(ai-tool.md)Core Patterns
核心模式
1. Prompt Chaining (Sequential with Gates)
1. 提示词链式调用(带关卡的顺序执行)
Chain LLM calls with validation between steps. Fail early if intermediate output is bad.
typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
export const translateCopy = task({
id: "translate-copy",
run: async ({ text, targetLanguage, maxWords }) => {
// Step 1: Generate
const draft = await generateText({
model: openai("gpt-4o"),
prompt: `Write marketing copy about: ${text}`,
});
// Gate: Validate before continuing
const wordCount = draft.text.split(/\s+/).length;
if (wordCount > maxWords) {
throw new Error(`Draft too long: ${wordCount} > ${maxWords}`);
}
// Step 2: Translate (only if gate passed)
const translated = await generateText({
model: openai("gpt-4o"),
prompt: `Translate to ${targetLanguage}: ${draft.text}`,
});
return { draft: draft.text, translated: translated.text };
},
});在LLM调用之间加入验证环节,若中间输出不符合要求则提前终止。
typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
export const translateCopy = task({
id: "translate-copy",
run: async ({ text, targetLanguage, maxWords }) => {
// 步骤1:生成内容
const draft = await generateText({
model: openai("gpt-4o"),
prompt: `撰写关于以下内容的营销文案:${text}`,
});
// 关卡:继续执行前先验证
const wordCount = draft.text.split(/\s+/).length;
if (wordCount > maxWords) {
throw new Error(`文案过长:${wordCount} > ${maxWords}`);
}
// 步骤2:翻译(仅当关卡验证通过时执行)
const translated = await generateText({
model: openai("gpt-4o"),
prompt: `将以下内容翻译成${targetLanguage}:${draft.text}`,
});
return { draft: draft.text, translated: translated.text };
},
});2. Routing (Classify → Dispatch)
2. 路由模式(分类 → 分发)
Use a cheap model to classify, then route to appropriate handler.
typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
const routingSchema = z.object({
model: z.enum(["gpt-4o", "o1-mini"]),
reason: z.string(),
});
export const routeQuestion = task({
id: "route-question",
run: async ({ question }) => {
// Cheap classification call
const routing = await generateText({
model: openai("gpt-4o-mini"),
messages: [
{
role: "system",
content: `Classify question complexity. Return JSON: {"model": "gpt-4o" | "o1-mini", "reason": "..."}
- gpt-4o: simple factual questions
- o1-mini: complex reasoning, math, code`,
},
{ role: "user", content: question },
],
});
const { model } = routingSchema.parse(JSON.parse(routing.text));
// Route to selected model
const answer = await generateText({
model: openai(model),
prompt: question,
});
return { answer: answer.text, routedTo: model };
},
});使用轻量模型进行分类,然后路由到合适的处理程序。
typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
const routingSchema = z.object({
model: z.enum(["gpt-4o", "o1-mini"]),
reason: z.string(),
});
export const routeQuestion = task({
id: "route-question",
run: async ({ question }) => {
// 低成本分类调用
const routing = await generateText({
model: openai("gpt-4o-mini"),
messages: [
{
role: "system",
content: `对问题复杂度进行分类。返回JSON格式:{"model": "gpt-4o" | "o1-mini", "reason": "..."}
- gpt-4o:简单事实类问题
- o1-mini:复杂推理、数学、代码类问题`,
},
{ role: "user", content: question },
],
});
const { model } = routingSchema.parse(JSON.parse(routing.text));
// 路由到选定的模型
const answer = await generateText({
model: openai(model),
prompt: question,
});
return { answer: answer.text, routedTo: model };
},
});3. Parallelization
3. 并行处理模式
Run independent LLM calls simultaneously with .
batch.triggerByTaskAndWaittypescript
import { batch, task } from "@trigger.dev/sdk";
export const analyzeContent = task({
id: "analyze-content",
run: async ({ text }) => {
// All three run in parallel
const { runs: [sentiment, summary, moderation] } = await batch.triggerByTaskAndWait([
{ task: analyzeSentiment, payload: { text } },
{ task: summarizeText, payload: { text } },
{ task: moderateContent, payload: { text } },
]);
// Check moderation first
if (moderation.ok && moderation.output.flagged) {
return { error: "Content flagged", reason: moderation.output.reason };
}
return {
sentiment: sentiment.ok ? sentiment.output : null,
summary: summary.ok ? summary.output : null,
};
},
});See: for advanced patterns
references/orchestration.md使用同时执行独立的LLM调用。
batch.triggerByTaskAndWaittypescript
import { batch, task } from "@trigger.dev/sdk";
export const analyzeContent = task({
id: "analyze-content",
run: async ({ text }) => {
// 三个任务并行执行
const { runs: [sentiment, summary, moderation] } = await batch.triggerByTaskAndWait([
{ task: analyzeSentiment, payload: { text } },
{ task: summarizeText, payload: { text } },
{ task: moderateContent, payload: { text } },
]);
// 先检查内容审核结果
if (moderation.ok && moderation.output.flagged) {
return { error: "内容被标记", reason: moderation.output.reason };
}
return {
sentiment: sentiment.ok ? sentiment.output : null,
summary: summary.ok ? summary.output : null,
};
},
});参考: 查看高级模式
references/orchestration.md4. Orchestrator-Workers (Fan-out/Fan-in)
4. 编排器-工作者模式(扇出/扇入)
Orchestrator extracts work items, fans out to workers, aggregates results.
typescript
import { batch, task } from "@trigger.dev/sdk";
export const factChecker = task({
id: "fact-checker",
run: async ({ article }) => {
// Step 1: Extract claims (sequential - need output first)
const { runs: [extractResult] } = await batch.triggerByTaskAndWait([
{ task: extractClaims, payload: { article } },
]);
if (!extractResult.ok) throw new Error("Failed to extract claims");
const claims = extractResult.output;
// Step 2: Fan-out - verify all claims in parallel
const { runs } = await batch.triggerByTaskAndWait(
claims.map(claim => ({ task: verifyClaim, payload: claim }))
);
// Step 3: Fan-in - aggregate results
const verified = runs
.filter((r): r is typeof r & { ok: true } => r.ok)
.map(r => r.output);
return { claims, verifications: verified };
},
});编排器提取工作项,分发给工作者执行,最后聚合结果。
typescript
import { batch, task } from "@trigger.dev/sdk";
export const factChecker = task({
id: "fact-checker",
run: async ({ article }) => {
// 步骤1:提取主张(顺序执行 - 需要先获取输出结果)
const { runs: [extractResult] } = await batch.triggerByTaskAndWait([
{ task: extractClaims, payload: { article } },
]);
if (!extractResult.ok) throw new Error("提取主张失败");
const claims = extractResult.output;
// 步骤2:扇出 - 并行验证所有主张
const { runs } = await batch.triggerByTaskAndWait(
claims.map(claim => ({ task: verifyClaim, payload: claim }))
);
// 步骤3:扇入 - 聚合结果
const verified = runs
.filter((r): r is typeof r & { ok: true } => r.ok)
.map(r => r.output);
return { claims, verifications: verified };
},
});5. Evaluator-Optimizer (Self-Refining Loop)
5. 评估器-优化器模式(自我优化循环)
Generate → Evaluate → Retry with feedback until approved.
typescript
import { task } from "@trigger.dev/sdk";
export const refineTranslation = task({
id: "refine-translation",
run: async ({ text, targetLanguage, feedback, attempt = 0 }) => {
// Bail condition
if (attempt >= 5) {
return { text, status: "MAX_ATTEMPTS", attempts: attempt };
}
// Generate (with feedback if retrying)
const prompt = feedback
? `Improve this translation based on feedback:\n${feedback}\n\nOriginal: ${text}`
: `Translate to ${targetLanguage}: ${text}`;
const translation = await generateText({
model: openai("gpt-4o"),
prompt,
});
// Evaluate
const evaluation = await generateText({
model: openai("gpt-4o"),
prompt: `Evaluate translation quality. Reply APPROVED or provide specific feedback:\n${translation.text}`,
});
if (evaluation.text.includes("APPROVED")) {
return { text: translation.text, status: "APPROVED", attempts: attempt + 1 };
}
// Recursive self-call with feedback
return refineTranslation.triggerAndWait({
text,
targetLanguage,
feedback: evaluation.text,
attempt: attempt + 1,
}).unwrap();
},
});生成 → 评估 → 根据反馈重试直至通过审核。
typescript
import { task } from "@trigger.dev/sdk";
export const refineTranslation = task({
id: "refine-translation",
run: async ({ text, targetLanguage, feedback, attempt = 0 }) => {
// 终止条件
if (attempt >= 5) {
return { text, status: "达到最大尝试次数", attempts: attempt };
}
// 生成内容(如果是重试则带上反馈)
const prompt = feedback
? `根据以下反馈优化翻译结果:\n${feedback}\n\n原文: ${text}`
: `将以下内容翻译成${targetLanguage}: ${text}`;
const translation = await generateText({
model: openai("gpt-4o"),
prompt,
});
// 评估结果
const evaluation = await generateText({
model: openai("gpt-4o"),
prompt: `评估翻译质量。回复“APPROVED”或提供具体反馈:\n${translation.text}`,
});
if (evaluation.text.includes("APPROVED")) {
return { text: translation.text, status: "已通过", attempts: attempt + 1 };
}
// 带反馈的递归自调用
return refineTranslation.triggerAndWait({
text,
targetLanguage,
feedback: evaluation.text,
attempt: attempt + 1,
}).unwrap();
},
});Trigger-Specific Features
Trigger.dev专属功能
| Feature | What it enables | Reference |
|---|---|---|
| Waitpoints | Human approval gates, external callbacks | |
| Streams | Real-time progress to frontend | |
| ai.tool | Let LLMs call your tasks as tools | |
| batch.triggerByTaskAndWait | Typed parallel execution | |
| 功能 | 适用场景 | 参考文档 |
|---|---|---|
| Waitpoints | 人工审核关卡、外部回调 | |
| Streams | 向前端实时传输进度 | |
| ai.tool | 让LLM将你的任务作为工具调用 | |
| batch.triggerByTaskAndWait | 类型安全的并行执行 | |
Error Handling
错误处理
typescript
const { runs } = await batch.triggerByTaskAndWait([...]);
// Check individual results
for (const run of runs) {
if (run.ok) {
console.log(run.output); // Typed output
} else {
console.error(run.error); // Error details
console.log(run.taskIdentifier); // Which task failed
}
}
// Or filter by task type
const verifications = runs
.filter((r): r is typeof r & { ok: true } =>
r.ok && r.taskIdentifier === "verify-claim"
)
.map(r => r.output);typescript
const { runs } = await batch.triggerByTaskAndWait([...]);
// 检查单个任务结果
for (const run of runs) {
if (run.ok) {
console.log(run.output); // 类型安全的输出
} else {
console.error(run.error); // 错误详情
console.log(run.taskIdentifier); // 哪个任务执行失败
}
}
// 或按任务类型筛选
const verifications = runs
.filter((r): r is typeof r & { ok: true } =>
r.ok && r.taskIdentifier === "verify-claim"
)
.map(r => r.output);Quick Reference
快速参考
typescript
// Trigger and wait for result
const result = await myTask.triggerAndWait(payload);
if (result.ok) console.log(result.output);
// Batch trigger same task
const results = await myTask.batchTriggerAndWait([
{ payload: item1 },
{ payload: item2 },
]);
// Batch trigger different tasks (typed)
const { runs } = await batch.triggerByTaskAndWait([
{ task: taskA, payload: { foo: 1 } },
{ task: taskB, payload: { bar: "x" } },
]);
// Self-recursion with unwrap
return myTask.triggerAndWait(newPayload).unwrap();typescript
// 触发任务并等待结果
const result = await myTask.triggerAndWait(payload);
if (result.ok) console.log(result.output);
// 批量触发同一个任务
const results = await myTask.batchTriggerAndWait([
{ payload: item1 },
{ payload: item2 },
]);
// 批量触发不同任务(类型安全)
const { runs } = await batch.triggerByTaskAndWait([
{ task: taskA, payload: { foo: 1 } },
{ task: taskB, payload: { bar: "x" } },
]);
// 带unwrap的递归自调用
return myTask.triggerAndWait(newPayload).unwrap();