trigger-tasks

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Trigger.dev Tasks

Trigger.dev 任务

Build durable background tasks that run reliably with automatic retries, queuing, and observability.
构建具备自动重试、队列调度和可观测性的可靠持久化后台任务。

When to Use

适用场景

  • Creating background jobs or async workflows
  • Building AI agents that need long-running execution
  • Processing webhooks, emails, or file uploads
  • Scheduling recurring tasks (cron)
  • Any work that shouldn't block your main application
  • 创建后台作业或异步工作流
  • 构建需要长时间运行的AI Agent
  • 处理Webhook、邮件或文件上传
  • 调度周期性任务(Cron)
  • 任何不应阻塞主应用的工作

Critical Rules

核心规则

  1. Always use
    @trigger.dev/sdk
    — never use deprecated
    client.defineJob
  2. Check
    result.ok
    before accessing
    result.output
    from
    triggerAndWait()
  3. Never use
    Promise.all
    with
    triggerAndWait()
    or
    wait.*
    calls
  4. Export tasks from files in your
    trigger/
    directory
  1. 始终使用
    @trigger.dev/sdk
    — 切勿使用已弃用的
    client.defineJob
  2. 在访问
    triggerAndWait()
    返回的
    result.output
    前,务必检查
    result.ok
  3. 切勿将
    Promise.all
    triggerAndWait()
    wait.*
    调用一起使用
  4. trigger/
    目录下的文件中导出任务

Basic Task

基础任务

ts
import { task } from "@trigger.dev/sdk";

export const processData = task({
  id: "process-data",
  retry: {
    maxAttempts: 10,
    factor: 1.8,
    minTimeoutInMs: 500,
    maxTimeoutInMs: 30_000,
  },
  run: async (payload: { userId: string; data: any[] }) => {
    console.log(`Processing ${payload.data.length} items`);
    return { processed: payload.data.length };
  },
});
ts
import { task } from "@trigger.dev/sdk";

export const processData = task({
  id: "process-data",
  retry: {
    maxAttempts: 10,
    factor: 1.8,
    minTimeoutInMs: 500,
    maxTimeoutInMs: 30_000,
  },
  run: async (payload: { userId: string; data: any[] }) => {
    console.log(`Processing ${payload.data.length} items`);
    return { processed: payload.data.length };
  },
});

Schema Task (Validated Input)

带校验的Schema任务(输入验证)

ts
import { schemaTask } from "@trigger.dev/sdk";
import { z } from "zod";

export const validatedTask = schemaTask({
  id: "validated-task",
  schema: z.object({
    name: z.string(),
    email: z.string().email(),
  }),
  run: async (payload) => {
    // payload is typed and validated
    return { message: `Hello ${payload.name}` };
  },
});
ts
import { schemaTask } from "@trigger.dev/sdk";
import { z } from "zod";

export const validatedTask = schemaTask({
  id: "validated-task",
  schema: z.object({
    name: z.string(),
    email: z.string().email(),
  }),
  run: async (payload) => {
    // payload已被类型化并验证
    return { message: `Hello ${payload.name}` };
  },
});

Triggering Tasks

触发任务

From Backend Code

从后端代码触发

ts
import { tasks } from "@trigger.dev/sdk";
import type { processData } from "./trigger/tasks";

// Single trigger (fire and forget)
const handle = await tasks.trigger<typeof processData>("process-data", {
  userId: "123",
  data: [{ id: 1 }],
});

// Batch trigger (up to 1,000 items, 3MB per payload)
const batchHandle = await tasks.batchTrigger<typeof processData>("process-data", [
  { payload: { userId: "123", data: [] } },
  { payload: { userId: "456", data: [] } },
]);
ts
import { tasks } from "@trigger.dev/sdk";
import type { processData } from "./trigger/tasks";

// 单次触发(即发即弃)
const handle = await tasks.trigger<typeof processData>("process-data", {
  userId: "123",
  data: [{ id: 1 }],
});

// 批量触发(最多1000个条目,每个负载最大3MB)
const batchHandle = await tasks.batchTrigger<typeof processData>("process-data", [
  { payload: { userId: "123", data: [] } },
  { payload: { userId: "456", data: [] } },
]);

From Inside Tasks

从任务内部触发

ts
export const parentTask = task({
  id: "parent-task",
  run: async (payload) => {
    // Fire and forget
    const handle = await childTask.trigger({ data: "value" });

    // Wait for result - returns Result object, NOT direct output
    const result = await childTask.triggerAndWait({ data: "value" });
    if (result.ok) {
      console.log("Output:", result.output);
    } else {
      console.error("Failed:", result.error);
    }

    // Quick unwrap (throws on error)
    const output = await childTask.triggerAndWait({ data: "value" }).unwrap();

    // Batch with wait
    const results = await childTask.batchTriggerAndWait([
      { payload: { data: "item1" } },
      { payload: { data: "item2" } },
    ]);
  },
});
ts
export const parentTask = task({
  id: "parent-task",
  run: async (payload) => {
    // 即发即弃
    const handle = await childTask.trigger({ data: "value" });

    // 等待结果 - 返回Result对象,而非直接输出
    const result = await childTask.triggerAndWait({ data: "value" });
    if (result.ok) {
      console.log("Output:", result.output);
    } else {
      console.error("Failed:", result.error);
    }

    // 快速解包(出错时抛出异常)
    const output = await childTask.triggerAndWait({ data: "value" }).unwrap();

    // 批量触发并等待
    const results = await childTask.batchTriggerAndWait([
      { payload: { data: "item1" } },
      { payload: { data: "item2" } },
    ]);
  },
});

Waits

等待操作

ts
import { task, wait } from "@trigger.dev/sdk";

export const taskWithWaits = task({
  id: "task-with-waits",
  run: async (payload) => {
    await wait.for({ seconds: 30 });
    await wait.for({ minutes: 5 });
    await wait.until({ date: new Date("2024-12-25") });

    // Wait for external approval
    await wait.forToken({
      token: "user-approval-token",
      timeoutInSeconds: 3600,
    });
  },
});
Waits > 5 seconds are checkpointed and don't count toward compute.
ts
import { task, wait } from "@trigger.dev/sdk";

export const taskWithWaits = task({
  id: "task-with-waits",
  run: async (payload) => {
    await wait.for({ seconds: 30 });
    await wait.for({ minutes: 5 });
    await wait.until({ date: new Date("2024-12-25") });

    // 等待外部审批
    await wait.forToken({
      token: "user-approval-token",
      timeoutInSeconds: 3600,
    });
  },
});
等待时长超过5秒的操作会被 checkpoint 保存,不计入计算资源消耗。

Concurrency & Queues

并发与队列

ts
import { task, queue } from "@trigger.dev/sdk";

// Shared queue
const emailQueue = queue({
  name: "email-processing",
  concurrencyLimit: 5,
});

// Task-level concurrency
export const oneAtATime = task({
  id: "sequential-task",
  queue: { concurrencyLimit: 1 },
  run: async (payload) => {
    // Only one instance runs at a time
  },
});

// Use shared queue
export const emailTask = task({
  id: "send-email",
  queue: emailQueue,
  run: async (payload) => {},
});

// Per-tenant concurrency (at trigger time)
await childTask.trigger(payload, {
  queue: {
    name: `user-${userId}`,
    concurrencyLimit: 2,
  },
});
ts
import { task, queue } from "@trigger.dev/sdk";

// 共享队列
const emailQueue = queue({
  name: "email-processing",
  concurrencyLimit: 5,
});

// 任务级并发控制
export const oneAtATime = task({
  id: "sequential-task",
  queue: { concurrencyLimit: 1 },
  run: async (payload) => {
    // 同一时间仅运行一个实例
  },
});

// 使用共享队列
export const emailTask = task({
  id: "send-email",
  queue: emailQueue,
  run: async (payload) => {},
});

// 按租户并发控制(触发时配置)
await childTask.trigger(payload, {
  queue: {
    name: `user-${userId}`,
    concurrencyLimit: 2,
  },
});

Debouncing

防抖

Consolidate rapid triggers into a single execution:
ts
await myTask.trigger(
  { userId: "123" },
  {
    debounce: {
      key: "user-123-update",
      delay: "5s",
      mode: "trailing", // Use latest payload (default: "leading")
    },
  }
);
将频繁触发的请求合并为单次执行:
ts
await myTask.trigger(
  { userId: "123" },
  {
    debounce: {
      key: "user-123-update",
      delay: "5s",
      mode: "trailing", // 使用最新负载(默认值:"leading")
    },
  }
);

Idempotency

幂等性

ts
import { task, idempotencyKeys } from "@trigger.dev/sdk";

export const paymentTask = task({
  id: "process-payment",
  run: async (payload: { orderId: string }) => {
    const key = await idempotencyKeys.create(`payment-${payload.orderId}`);

    await chargeCustomer.trigger(payload, {
      idempotencyKey: key,
      idempotencyKeyTTL: "24h",
    });
  },
});
ts
import { task, idempotencyKeys } from "@trigger.dev/sdk";

export const paymentTask = task({
  id: "process-payment",
  run: async (payload: { orderId: string }) => {
    const key = await idempotencyKeys.create(`payment-${payload.orderId}`);

    await chargeCustomer.trigger(payload, {
      idempotencyKey: key,
      idempotencyKeyTTL: "24h",
    });
  },
});

Error Handling & Retries

错误处理与重试

ts
import { task, retry, AbortTaskRunError } from "@trigger.dev/sdk";

export const resilientTask = task({
  id: "resilient-task",
  retry: {
    maxAttempts: 10,
    factor: 1.8,
    minTimeoutInMs: 500,
    maxTimeoutInMs: 30_000,
  },
  catchError: async ({ error, ctx }) => {
    if (error.code === "FATAL_ERROR") {
      throw new AbortTaskRunError("Cannot retry");
    }
    return { retryAt: new Date(Date.now() + 60000) };
  },
  run: async (payload) => {
    // Retry specific operations
    const result = await retry.onThrow(
      async () => unstableApiCall(payload),
      { maxAttempts: 3 }
    );

    // HTTP retries with conditions
    const response = await retry.fetch("https://api.example.com", {
      retry: {
        maxAttempts: 5,
        condition: (res, err) => res?.status === 429 || res?.status >= 500,
      },
    });
  },
});
ts
import { task, retry, AbortTaskRunError } from "@trigger.dev/sdk";

export const resilientTask = task({
  id: "resilient-task",
  retry: {
    maxAttempts: 10,
    factor: 1.8,
    minTimeoutInMs: 500,
    maxTimeoutInMs: 30_000,
  },
  catchError: async ({ error, ctx }) => {
    if (error.code === "FATAL_ERROR") {
      throw new AbortTaskRunError("无法重试");
    }
    return { retryAt: new Date(Date.now() + 60000) };
  },
  run: async (payload) => {
    // 针对特定操作重试
    const result = await retry.onThrow(
      async () => unstableApiCall(payload),
      { maxAttempts: 3 }
    );

    // 带条件的HTTP重试
    const response = await retry.fetch("https://api.example.com", {
      retry: {
        maxAttempts: 5,
        condition: (res, err) => res?.status === 429 || res?.status >= 500,
      },
    });
  },
});

Scheduled Tasks (Cron)

调度任务(Cron)

ts
import { schedules } from "@trigger.dev/sdk";

// Declarative schedule
export const dailyTask = schedules.task({
  id: "daily-cleanup",
  cron: "0 0 * * *", // Midnight UTC
  run: async (payload) => {
    // payload.timestamp - scheduled time
    // payload.timezone - IANA timezone
    // payload.scheduleId - schedule identifier
  },
});

// With timezone
export const tokyoTask = schedules.task({
  id: "tokyo-morning",
  cron: { pattern: "0 9 * * *", timezone: "Asia/Tokyo" },
  run: async () => {},
});

// Dynamic/multi-tenant schedules
await schedules.create({
  task: "reminder-task",
  cron: "0 8 * * *",
  timezone: "America/New_York",
  externalId: userId,
  deduplicationKey: `${userId}-daily`,
});
ts
import { schedules } from "@trigger.dev/sdk";

// 声明式调度
export const dailyTask = schedules.task({
  id: "daily-cleanup",
  cron: "0 0 * * *", // UTC时间午夜
  run: async (payload) => {
    // payload.timestamp - 调度时间
    // payload.timezone - IANA时区
    // payload.scheduleId - 调度标识符
  },
});

// 指定时区
export const tokyoTask = schedules.task({
  id: "tokyo-morning",
  cron: { pattern: "0 9 * * *", timezone: "Asia/Tokyo" },
  run: async () => {},
});

// 动态/多租户调度
await schedules.create({
  task: "reminder-task",
  cron: "0 8 * * *",
  timezone: "America/New_York",
  externalId: userId,
  deduplicationKey: `${userId}-daily`,
});

Metadata & Progress

元数据与进度跟踪

ts
import { task, metadata } from "@trigger.dev/sdk";

export const batchProcessor = task({
  id: "batch-processor",
  run: async (payload: { items: any[] }) => {
    metadata.set("progress", 0).set("total", payload.items.length);

    for (let i = 0; i < payload.items.length; i++) {
      await processItem(payload.items[i]);
      metadata.set("progress", ((i + 1) / payload.items.length) * 100);
    }

    metadata.set("status", "completed");
  },
});
ts
import { task, metadata } from "@trigger.dev/sdk";

export const batchProcessor = task({
  id: "batch-processor",
  run: async (payload: { items: any[] }) => {
    metadata.set("progress", 0).set("total", payload.items.length);

    for (let i = 0; i < payload.items.length; i++) {
      await processItem(payload.items[i]);
      metadata.set("progress", ((i + 1) / payload.items.length) * 100);
    }

    metadata.set("status", "completed");
  },
});

Tags

标签

ts
import { task, tags } from "@trigger.dev/sdk";

export const processUser = task({
  id: "process-user",
  run: async (payload: { userId: string }) => {
    await tags.add(`user_${payload.userId}`);
  },
});

// Trigger with tags
await processUser.trigger(
  { userId: "123" },
  { tags: ["priority", "user_123"] }
);
ts
import { task, tags } from "@trigger.dev/sdk";

export const processUser = task({
  id: "process-user",
  run: async (payload: { userId: string }) => {
    await tags.add(`user_${payload.userId}`);
  },
});

// 带标签触发
await processUser.trigger(
  { userId: "123" },
  { tags: ["priority", "user_123"] }
);

Machine Presets

机器预设

ts
export const heavyTask = task({
  id: "heavy-computation",
  machine: { preset: "large-2x" }, // 8 vCPU, 16 GB RAM
  maxDuration: 1800, // 30 minutes
  run: async (payload) => {},
});
PresetvCPURAM
micro0.250.25 GB
small-1x0.50.5 GB (default)
small-2x11 GB
medium-1x12 GB
medium-2x24 GB
large-1x48 GB
large-2x816 GB
ts
export const heavyTask = task({
  id: "heavy-computation",
  machine: { preset: "large-2x" }, // 8核CPU,16GB内存
  maxDuration: 1800, // 30分钟
  run: async (payload) => {},
});
预设规格vCPU内存
micro0.250.25 GB
small-1x0.50.5 GB(默认值)
small-2x11 GB
medium-1x12 GB
medium-2x24 GB
large-1x48 GB
large-2x816 GB

Best Practices

最佳实践

  1. Make tasks idempotent — safe to retry without side effects
  2. Use queues to prevent overwhelming external services
  3. Configure appropriate retries with exponential backoff
  4. Track progress with metadata for long-running tasks
  5. Use debouncing for user activity and webhook bursts
  6. Match machine size to computational requirements
See
references/
for detailed documentation on each feature.
  1. 确保任务幂等性 — 重试时不会产生副作用
  2. 使用队列避免压垮外部服务
  3. 配置合适的重试策略,使用指数退避算法
  4. 通过元数据跟踪进度,适用于长时间运行的任务
  5. 对用户操作和Webhook突发流量使用防抖
  6. 根据计算需求匹配机器规格
详见
references/
目录下的各功能详细文档。