trigger-agents

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

AI Agent Patterns with Trigger.dev

基于Trigger.dev的AI Agent模式

Build production-ready AI agents using Trigger.dev's durable execution.
使用Trigger.dev的持久化执行功能构建可投入生产的AI Agent。

Pattern Selection

模式选择

Need to...                              → Use
─────────────────────────────────────────────────────
Process items in parallel               → Parallelization
Route to different models/handlers      → Routing
Chain steps with validation gates       → Prompt Chaining
Coordinate multiple specialized tasks   → Orchestrator-Workers
Self-improve until quality threshold    → Evaluator-Optimizer
Pause for human approval                → Human-in-the-Loop (waitpoints.md)
Stream progress to frontend             → Realtime Streams (streaming.md)
Let LLM call your tasks as tools        → ai.tool (ai-tool.md)

需要...                              → 使用
─────────────────────────────────────────────────────
并行处理项目               → 并行处理模式
路由到不同模型/处理程序      → 路由模式
带验证关卡的步骤链式调用       → 提示词链式调用
协调多个专业化任务   → 编排器-工作者模式
自我优化直至达到质量阈值    → 评估器-优化器模式
暂停以等待人工审核                → 人机协同(waitpoints.md)
向前端流式传输进度             → 实时流(streaming.md)
让LLM将你的任务作为工具调用        → ai.tool(ai-tool.md)

Core Patterns

核心模式

1. Prompt Chaining (Sequential with Gates)

1. 提示词链式调用(带关卡的顺序执行)

Chain LLM calls with validation between steps. Fail early if intermediate output is bad.
typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";

export const translateCopy = task({
  id: "translate-copy",
  run: async ({ text, targetLanguage, maxWords }) => {
    // Step 1: Generate
    const draft = await generateText({
      model: openai("gpt-4o"),
      prompt: `Write marketing copy about: ${text}`,
    });

    // Gate: Validate before continuing
    const wordCount = draft.text.split(/\s+/).length;
    if (wordCount > maxWords) {
      throw new Error(`Draft too long: ${wordCount} > ${maxWords}`);
    }

    // Step 2: Translate (only if gate passed)
    const translated = await generateText({
      model: openai("gpt-4o"),
      prompt: `Translate to ${targetLanguage}: ${draft.text}`,
    });

    return { draft: draft.text, translated: translated.text };
  },
});

在LLM调用之间加入验证环节,若中间输出不符合要求则提前终止。
typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";

export const translateCopy = task({
  id: "translate-copy",
  run: async ({ text, targetLanguage, maxWords }) => {
    // 步骤1:生成内容
    const draft = await generateText({
      model: openai("gpt-4o"),
      prompt: `撰写关于以下内容的营销文案:${text}`,
    });

    // 关卡:继续执行前先验证
    const wordCount = draft.text.split(/\s+/).length;
    if (wordCount > maxWords) {
      throw new Error(`文案过长:${wordCount} > ${maxWords}`);
    }

    // 步骤2:翻译(仅当关卡验证通过时执行)
    const translated = await generateText({
      model: openai("gpt-4o"),
      prompt: `将以下内容翻译成${targetLanguage}${draft.text}`,
    });

    return { draft: draft.text, translated: translated.text };
  },
});

2. Routing (Classify → Dispatch)

2. 路由模式(分类 → 分发)

Use a cheap model to classify, then route to appropriate handler.
typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const routingSchema = z.object({
  model: z.enum(["gpt-4o", "o1-mini"]),
  reason: z.string(),
});

export const routeQuestion = task({
  id: "route-question",
  run: async ({ question }) => {
    // Cheap classification call
    const routing = await generateText({
      model: openai("gpt-4o-mini"),
      messages: [
        {
          role: "system",
          content: `Classify question complexity. Return JSON: {"model": "gpt-4o" | "o1-mini", "reason": "..."}
          - gpt-4o: simple factual questions
          - o1-mini: complex reasoning, math, code`,
        },
        { role: "user", content: question },
      ],
    });

    const { model } = routingSchema.parse(JSON.parse(routing.text));

    // Route to selected model
    const answer = await generateText({
      model: openai(model),
      prompt: question,
    });

    return { answer: answer.text, routedTo: model };
  },
});

使用轻量模型进行分类,然后路由到合适的处理程序。
typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const routingSchema = z.object({
  model: z.enum(["gpt-4o", "o1-mini"]),
  reason: z.string(),
});

export const routeQuestion = task({
  id: "route-question",
  run: async ({ question }) => {
    // 低成本分类调用
    const routing = await generateText({
      model: openai("gpt-4o-mini"),
      messages: [
        {
          role: "system",
          content: `对问题复杂度进行分类。返回JSON格式:{"model": "gpt-4o" | "o1-mini", "reason": "..."}
          - gpt-4o:简单事实类问题
          - o1-mini:复杂推理、数学、代码类问题`,
        },
        { role: "user", content: question },
      ],
    });

    const { model } = routingSchema.parse(JSON.parse(routing.text));

    // 路由到选定的模型
    const answer = await generateText({
      model: openai(model),
      prompt: question,
    });

    return { answer: answer.text, routedTo: model };
  },
});

3. Parallelization

3. 并行处理模式

Run independent LLM calls simultaneously with
batch.triggerByTaskAndWait
.
typescript
import { batch, task } from "@trigger.dev/sdk";

export const analyzeContent = task({
  id: "analyze-content",
  run: async ({ text }) => {
    // All three run in parallel
    const { runs: [sentiment, summary, moderation] } = await batch.triggerByTaskAndWait([
      { task: analyzeSentiment, payload: { text } },
      { task: summarizeText, payload: { text } },
      { task: moderateContent, payload: { text } },
    ]);

    // Check moderation first
    if (moderation.ok && moderation.output.flagged) {
      return { error: "Content flagged", reason: moderation.output.reason };
    }

    return {
      sentiment: sentiment.ok ? sentiment.output : null,
      summary: summary.ok ? summary.output : null,
    };
  },
});
See:
references/orchestration.md
for advanced patterns

使用
batch.triggerByTaskAndWait
同时执行独立的LLM调用。
typescript
import { batch, task } from "@trigger.dev/sdk";

export const analyzeContent = task({
  id: "analyze-content",
  run: async ({ text }) => {
    // 三个任务并行执行
    const { runs: [sentiment, summary, moderation] } = await batch.triggerByTaskAndWait([
      { task: analyzeSentiment, payload: { text } },
      { task: summarizeText, payload: { text } },
      { task: moderateContent, payload: { text } },
    ]);

    // 先检查内容审核结果
    if (moderation.ok && moderation.output.flagged) {
      return { error: "内容被标记", reason: moderation.output.reason };
    }

    return {
      sentiment: sentiment.ok ? sentiment.output : null,
      summary: summary.ok ? summary.output : null,
    };
  },
});
参考:
references/orchestration.md
查看高级模式

4. Orchestrator-Workers (Fan-out/Fan-in)

4. 编排器-工作者模式(扇出/扇入)

Orchestrator extracts work items, fans out to workers, aggregates results.
typescript
import { batch, task } from "@trigger.dev/sdk";

export const factChecker = task({
  id: "fact-checker",
  run: async ({ article }) => {
    // Step 1: Extract claims (sequential - need output first)
    const { runs: [extractResult] } = await batch.triggerByTaskAndWait([
      { task: extractClaims, payload: { article } },
    ]);

    if (!extractResult.ok) throw new Error("Failed to extract claims");
    const claims = extractResult.output;

    // Step 2: Fan-out - verify all claims in parallel
    const { runs } = await batch.triggerByTaskAndWait(
      claims.map(claim => ({ task: verifyClaim, payload: claim }))
    );

    // Step 3: Fan-in - aggregate results
    const verified = runs
      .filter((r): r is typeof r & { ok: true } => r.ok)
      .map(r => r.output);

    return { claims, verifications: verified };
  },
});

编排器提取工作项,分发给工作者执行,最后聚合结果。
typescript
import { batch, task } from "@trigger.dev/sdk";

export const factChecker = task({
  id: "fact-checker",
  run: async ({ article }) => {
    // 步骤1:提取主张(顺序执行 - 需要先获取输出结果)
    const { runs: [extractResult] } = await batch.triggerByTaskAndWait([
      { task: extractClaims, payload: { article } },
    ]);

    if (!extractResult.ok) throw new Error("提取主张失败");
    const claims = extractResult.output;

    // 步骤2:扇出 - 并行验证所有主张
    const { runs } = await batch.triggerByTaskAndWait(
      claims.map(claim => ({ task: verifyClaim, payload: claim }))
    );

    // 步骤3:扇入 - 聚合结果
    const verified = runs
      .filter((r): r is typeof r & { ok: true } => r.ok)
      .map(r => r.output);

    return { claims, verifications: verified };
  },
});

5. Evaluator-Optimizer (Self-Refining Loop)

5. 评估器-优化器模式(自我优化循环)

Generate → Evaluate → Retry with feedback until approved.
typescript
import { task } from "@trigger.dev/sdk";

export const refineTranslation = task({
  id: "refine-translation",
  run: async ({ text, targetLanguage, feedback, attempt = 0 }) => {
    // Bail condition
    if (attempt >= 5) {
      return { text, status: "MAX_ATTEMPTS", attempts: attempt };
    }

    // Generate (with feedback if retrying)
    const prompt = feedback
      ? `Improve this translation based on feedback:\n${feedback}\n\nOriginal: ${text}`
      : `Translate to ${targetLanguage}: ${text}`;

    const translation = await generateText({
      model: openai("gpt-4o"),
      prompt,
    });

    // Evaluate
    const evaluation = await generateText({
      model: openai("gpt-4o"),
      prompt: `Evaluate translation quality. Reply APPROVED or provide specific feedback:\n${translation.text}`,
    });

    if (evaluation.text.includes("APPROVED")) {
      return { text: translation.text, status: "APPROVED", attempts: attempt + 1 };
    }

    // Recursive self-call with feedback
    return refineTranslation.triggerAndWait({
      text,
      targetLanguage,
      feedback: evaluation.text,
      attempt: attempt + 1,
    }).unwrap();
  },
});

生成 → 评估 → 根据反馈重试直至通过审核。
typescript
import { task } from "@trigger.dev/sdk";

export const refineTranslation = task({
  id: "refine-translation",
  run: async ({ text, targetLanguage, feedback, attempt = 0 }) => {
    // 终止条件
    if (attempt >= 5) {
      return { text, status: "达到最大尝试次数", attempts: attempt };
    }

    // 生成内容(如果是重试则带上反馈)
    const prompt = feedback
      ? `根据以下反馈优化翻译结果:\n${feedback}\n\n原文: ${text}`
      : `将以下内容翻译成${targetLanguage}: ${text}`;

    const translation = await generateText({
      model: openai("gpt-4o"),
      prompt,
    });

    // 评估结果
    const evaluation = await generateText({
      model: openai("gpt-4o"),
      prompt: `评估翻译质量。回复“APPROVED”或提供具体反馈:\n${translation.text}`,
    });

    if (evaluation.text.includes("APPROVED")) {
      return { text: translation.text, status: "已通过", attempts: attempt + 1 };
    }

    // 带反馈的递归自调用
    return refineTranslation.triggerAndWait({
      text,
      targetLanguage,
      feedback: evaluation.text,
      attempt: attempt + 1,
    }).unwrap();
  },
});

Trigger-Specific Features

Trigger.dev专属功能

FeatureWhat it enablesReference
WaitpointsHuman approval gates, external callbacks
references/waitpoints.md
StreamsReal-time progress to frontend
references/streaming.md
ai.toolLet LLMs call your tasks as tools
references/ai-tool.md
batch.triggerByTaskAndWaitTyped parallel execution
references/orchestration.md

功能适用场景参考文档
Waitpoints人工审核关卡、外部回调
references/waitpoints.md
Streams向前端实时传输进度
references/streaming.md
ai.tool让LLM将你的任务作为工具调用
references/ai-tool.md
batch.triggerByTaskAndWait类型安全的并行执行
references/orchestration.md

Error Handling

错误处理

typescript
const { runs } = await batch.triggerByTaskAndWait([...]);

// Check individual results
for (const run of runs) {
  if (run.ok) {
    console.log(run.output);  // Typed output
  } else {
    console.error(run.error);  // Error details
    console.log(run.taskIdentifier);  // Which task failed
  }
}

// Or filter by task type
const verifications = runs
  .filter((r): r is typeof r & { ok: true } =>
    r.ok && r.taskIdentifier === "verify-claim"
  )
  .map(r => r.output);

typescript
const { runs } = await batch.triggerByTaskAndWait([...]);

// 检查单个任务结果
for (const run of runs) {
  if (run.ok) {
    console.log(run.output);  // 类型安全的输出
  } else {
    console.error(run.error);  // 错误详情
    console.log(run.taskIdentifier);  // 哪个任务执行失败
  }
}

// 或按任务类型筛选
const verifications = runs
  .filter((r): r is typeof r & { ok: true } =>
    r.ok && r.taskIdentifier === "verify-claim"
  )
  .map(r => r.output);

Quick Reference

快速参考

typescript
// Trigger and wait for result
const result = await myTask.triggerAndWait(payload);
if (result.ok) console.log(result.output);

// Batch trigger same task
const results = await myTask.batchTriggerAndWait([
  { payload: item1 },
  { payload: item2 },
]);

// Batch trigger different tasks (typed)
const { runs } = await batch.triggerByTaskAndWait([
  { task: taskA, payload: { foo: 1 } },
  { task: taskB, payload: { bar: "x" } },
]);

// Self-recursion with unwrap
return myTask.triggerAndWait(newPayload).unwrap();
typescript
// 触发任务并等待结果
const result = await myTask.triggerAndWait(payload);
if (result.ok) console.log(result.output);

// 批量触发同一个任务
const results = await myTask.batchTriggerAndWait([
  { payload: item1 },
  { payload: item2 },
]);

// 批量触发不同任务(类型安全)
const { runs } = await batch.triggerByTaskAndWait([
  { task: taskA, payload: { foo: 1 } },
  { task: taskB, payload: { bar: "x" } },
]);

// 带unwrap的递归自调用
return myTask.triggerAndWait(newPayload).unwrap();