inngest-durable-functions

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Inngest Durable Functions

Inngest 持久化函数

Master Inngest's durable execution model for building fault-tolerant, long-running workflows. This skill covers the complete lifecycle from triggers to error handling.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
掌握Inngest的持久化执行模型,用于构建容错、长时间运行的工作流。本技能涵盖从触发器到错误处理的完整生命周期。
本技能聚焦于TypeScript。若使用Python或Go,请参考Inngest文档获取语言专属指导。核心概念适用于所有语言。

Core Concepts You Need to Know

你需要了解的核心概念

Durable Execution Model

持久化执行模型

  • Each step should encapsulate side-effects and non-deterministic code
  • Memoization prevents re-execution of completed steps
  • State persistence survives infrastructure failures
  • Automatic retries with configurable retry count
  • 每个步骤应封装副作用和非确定性代码
  • 记忆化可防止已完成步骤的重复执行
  • 状态持久化可在基础设施故障时保留状态
  • 自动重试,支持配置重试次数

Step Execution Flow

步骤执行流程

typescript
// ❌ BAD: Non-deterministic logic outside steps
async ({ event, step }) => {
  const timestamp = Date.now(); // This runs multiple times!

  const result = await step.run("process-data", () => {
    return processData(event.data);
  });
};

// ✅ GOOD: All non-deterministic logic in steps
async ({ event, step }) => {
  const result = await step.run("process-with-timestamp", () => {
    const timestamp = Date.now(); // Only runs once
    return processData(event.data, timestamp);
  });
};
typescript
// ❌ 不良示例:步骤外存在非确定性逻辑
async ({ event, step }) => {
  const timestamp = Date.now(); // 这段代码会多次执行!

  const result = await step.run("process-data", () => {
    return processData(event.data);
  });
};

// ✅ 良好示例:所有非确定性逻辑都放在步骤内
async ({ event, step }) => {
  const result = await step.run("process-with-timestamp", () => {
    const timestamp = Date.now(); // 仅执行一次
    return processData(event.data, timestamp);
  });
};

Function Limits

函数限制

Every Inngest function has these hard limits:
  • Maximum 1,000 steps per function run
  • Maximum 4MB returned data for each step
  • Maximum 32MB combined function run state including, event data, step output, and function output
  • Each step = separate HTTP request (~50-100ms overhead)
If you're hitting these limits, break your function into smaller functions connected via
step.invoke()
or
step.sendEvent()
.
所有Inngest函数都有以下硬性限制:
  • 每个函数运行最多包含1000个步骤
  • 每个步骤返回的数据最大为4MB
  • 函数运行的组合状态(包括事件数据、步骤输出和函数输出)最大为32MB
  • 每个步骤对应一次独立的HTTP请求(约50-100ms的开销)
如果遇到这些限制,请将函数拆分为更小的函数,通过
step.invoke()
step.sendEvent()
连接。

When to Use Steps

何时使用步骤

Always wrap in
step.run()
:
  • API calls and network requests
  • Database reads and writes
  • File I/O operations
  • Any non-deterministic operation
  • Anything you want retried independently on failure
Never wrap in
step.run()
:
  • Pure calculations and data transformations
  • Simple validation logic
  • Deterministic operations with no side effects
  • Logging (use outside steps)
务必用
step.run()
包裹:
  • API调用和网络请求
  • 数据库读写操作
  • 文件I/O操作
  • 任何非确定性操作
  • 任何希望在失败时独立重试的操作
切勿用
step.run()
包裹:
  • 纯计算和数据转换
  • 简单验证逻辑
  • 无副作用的确定性操作
  • 日志记录(放在步骤外执行)

Function Creation

函数创建

Basic Function Structure

基础函数结构

typescript
const processOrder = inngest.createFunction(
  {
    id: "process-order", // Unique, never change this
    retries: 4, // Default: 4 retries per step
    concurrency: 10 // Max concurrent executions
  },
  { event: "order/created" }, // Trigger
  async ({ event, step }) => {
    // Your durable workflow
  }
);
typescript
const processOrder = inngest.createFunction(
  {
    id: "process-order", // 唯一标识,请勿修改
    retries: 4, // 默认:每个步骤重试4次
    concurrency: 10 // 最大并发执行数
  },
  { event: "order/created" }, // 触发器
  async ({ event, step }) => {
    // 你的持久化工作流逻辑
  }
);

Step IDs and Memoization

步骤ID与记忆化

typescript
// Step IDs can be reused - Inngest handles counters automatically
const data = await step.run("fetch-data", () => fetchUserData());
const more = await step.run("fetch-data", () => fetchOrderData()); // Different execution

// Use descriptive IDs for clarity
await step.run("validate-payment", () => validatePayment(event.data.paymentId));
await step.run("charge-customer", () => chargeCustomer(event.data));
await step.run("send-confirmation", () => sendEmail(event.data.email));
typescript
// 步骤ID可重复使用 - Inngest会自动处理计数器
const data = await step.run("fetch-data", () => fetchUserData());
const more = await step.run("fetch-data", () => fetchOrderData()); // 独立的执行实例

// 使用描述性ID提升可读性
await step.run("validate-payment", () => validatePayment(event.data.paymentId));
await step.run("charge-customer", () => chargeCustomer(event.data));
await step.run("send-confirmation", () => sendEmail(event.data.email));

Triggers and Events

触发器与事件

Event Triggers

事件触发器

typescript
// Single event trigger
{ event: "user/signup" }

// Event with conditional filter
{
  event: "user/action",
  if: 'event.data.action == "purchase" && event.data.amount > 100'
}

// Multiple triggers (up to 10)
[
  { event: "user/signup" },
  { event: "user/login", if: 'event.data.firstLogin == true' },
  { cron: "0 9 * * *" } // Daily at 9 AM
]
typescript
// 单个事件触发器
{ event: "user/signup" }

// 带条件过滤的事件
{
  event: "user/action",
  if: 'event.data.action == "purchase" && event.data.amount > 100'
}

// 多个触发器(最多10个)
[
  { event: "user/signup" },
  { event: "user/login", if: 'event.data.firstLogin == true' },
  { cron: "0 9 * * *" } // 每天上午9点执行
]

Cron Triggers

定时触发器

typescript
// Basic cron
{
  cron: "0 */6 * * *";
} // Every 6 hours

// With timezone
{
  cron: "TZ=Europe/Paris 0 12 * * 5";
} // Fridays at noon Paris time

// Combine with events
[
  { event: "manual/report.requested" },
  { cron: "0 0 * * 0" } // Weekly on Sunday
];
typescript
// 基础定时任务
{
  cron: "0 */6 * * *";
} // 每6小时执行一次

// 带时区的定时任务
{
  cron: "TZ=Europe/Paris 0 12 * * 5";
} // 巴黎时间每周五中午执行

// 与事件触发器组合
[
  { event: "manual/report.requested" },
  { cron: "0 0 * * 0" } // 每周日执行
];

Function Invocation

函数调用

typescript
// Invoke another function as a step
const result = await step.invoke("generate-report", {
  function: generateReportFunction,
  data: { userId: event.data.userId }
});

// Use returned data
await step.run("process-report", () => {
  return processReport(result);
});
typescript
// 在步骤中调用另一个函数
const result = await step.invoke("generate-report", {
  function: generateReportFunction,
  data: { userId: event.data.userId }
});

// 使用返回的数据
await step.run("process-report", () => {
  return processReport(result);
});

Idempotency Strategies

幂等性策略

Event-Level Idempotency (Producer Side)

事件级幂等性(生产者端)

typescript
// Prevent duplicate events with custom ID
await inngest.send({
  id: `checkout-completed-${cartId}`, // 24-hour deduplication
  name: "cart/checkout.completed",
  data: { cartId, email: "user@example.com" }
});
typescript
// 使用自定义ID防止重复事件
await inngest.send({
  id: `checkout-completed-${cartId}`, // 24小时去重
  name: "cart/checkout.completed",
  data: { cartId, email: "user@example.com" }
});

Function-Level Idempotency (Consumer Side)

函数级幂等性(消费者端)

typescript
const sendEmail = inngest.createFunction(
  {
    id: "send-checkout-email",
    // Only run once per cartId per 24 hours
    idempotency: "event.data.cartId"
  },
  { event: "cart/checkout.completed" },
  async ({ event, step }) => {
    // This function won't run twice for same cartId
  }
);

// Complex idempotency keys
const processUserAction = inngest.createFunction(
  {
    id: "process-user-action",
    // Unique per user + organization combination
    idempotency: 'event.data.userId + "-" + event.data.organizationId'
  },
  { event: "user/action.performed" },
  async ({ event, step }) => {
    /* ... */
  }
);
typescript
const sendEmail = inngest.createFunction(
  {
    id: "send-checkout-email",
    // 每个cartId在24小时内仅执行一次
    idempotency: "event.data.cartId"
  },
  { event: "cart/checkout.completed" },
  async ({ event, step }) => {
    // 相同cartId不会触发多次执行
  }
);

// 复杂幂等键
const processUserAction = inngest.createFunction(
  {
    id: "process-user-action",
    // 按用户+组织的组合生成唯一键
    idempotency: 'event.data.userId + "-" + event.data.organizationId'
  },
  { event: "user/action.performed" },
  async ({ event, step }) => {
    /* ... */
  }
);

Cancellation Patterns

取消模式

Event-Based Cancellation

基于事件的取消

In expressions,
event
= the original triggering event,
async
= the new event being matched. See Expression Syntax Reference for full details.
typescript
const processOrder = inngest.createFunction(
  {
    id: "process-order",
    cancelOn: [
      {
        event: "order/cancelled",
        if: "event.data.orderId == async.data.orderId"
      }
    ]
  },
  { event: "order/created" },
  async ({ event, step }) => {
    await step.sleepUntil("wait-for-payment", event.data.paymentDue);
    // Will be cancelled if order/cancelled event received
    await step.run("charge-payment", () => processPayment(event.data));
  }
);
在表达式中,
event
原始触发事件,
async
指匹配到的事件。详情请参考表达式语法参考
typescript
const processOrder = inngest.createFunction(
  {
    id: "process-order",
    cancelOn: [
      {
        event: "order/cancelled",
        if: "event.data.orderId == async.data.orderId"
      }
    ]
  },
  { event: "order/created" },
  async ({ event, step }) => {
    await step.sleepUntil("wait-for-payment", event.data.paymentDue);
    // 若收到order/cancelled事件,将被取消
    await step.run("charge-payment", () => processPayment(event.data));
  }
);

Timeout Cancellation

超时取消

typescript
const processWithTimeout = inngest.createFunction(
  {
    id: "process-with-timeout",
    timeouts: {
      start: "5m", // Cancel if not started within 5 minutes
      run: "30m" // Cancel if running longer than 30 minutes
    }
  },
  { event: "long/process.requested" },
  async ({ event, step }) => {
    /* ... */
  }
);
typescript
const processWithTimeout = inngest.createFunction(
  {
    id: "process-with-timeout",
    timeouts: {
      start: "5m", // 若5分钟内未启动则取消
      run: "30m" // 若运行超过30分钟则取消
    }
  },
  { event: "long/process.requested" },
  async ({ event, step }) => {
    /* ... */
  }
);

Handling Cancellation Cleanup

处理取消后的清理工作

typescript
// Listen for cancellation events
const cleanupCancelled = inngest.createFunction(
  { id: "cleanup-cancelled-process" },
  { event: "inngest/function.cancelled" },
  async ({ event, step }) => {
    if (event.data.function_id === "process-order") {
      await step.run("cleanup-resources", () => {
        return cleanupOrderResources(event.data.run_id);
      });
    }
  }
);
typescript
// 监听取消事件
const cleanupCancelled = inngest.createFunction(
  { id: "cleanup-cancelled-process" },
  { event: "inngest/function.cancelled" },
  async ({ event, step }) => {
    if (event.data.function_id === "process-order") {
      await step.run("cleanup-resources", () => {
        return cleanupOrderResources(event.data.run_id);
      });
    }
  }
);

Error Handling and Retries

错误处理与重试

Default Retry Behavior

默认重试行为

  • 5 total attempts (1 initial + 4 retries) per step
  • Exponential backoff with jitter
  • Independent retry counters per step
  • 每个步骤最多5次尝试(1次初始执行+4次重试)
  • 带抖动的指数退避策略
  • 每个步骤有独立的重试计数器

Custom Retry Configuration

自定义重试配置

typescript
const reliableFunction = inngest.createFunction(
  {
    id: "reliable-function",
    retries: 10 // Up to 10 retries per step
  },
  { event: "critical/task" },
  async ({ event, step, attempt }) => {
    // Access attempt number (0-indexed)
    if (attempt > 5) {
      // Different logic for later attempts
    }
  }
);
typescript
const reliableFunction = inngest.createFunction(
  {
    id: "reliable-function",
    retries: 10 // 每个步骤最多重试10次
  },
  { event: "critical/task" },
  async ({ event, step, attempt }) => {
    // 获取尝试次数(从0开始计数)
    if (attempt > 5) {
      // 针对后续尝试的特殊逻辑
    }
  }
);

Non-Retriable Errors

不可重试错误

Prevent retries for code that won't succeed upon retry.
typescript
import { NonRetriableError } from "inngest";

const processUser = inngest.createFunction(
  { id: "process-user" },
  { event: "user/process.requested" },
  async ({ event, step }) => {
    const user = await step.run("fetch-user", async () => {
      const user = await db.users.findOne(event.data.userId);

      if (!user) {
        // Don't retry - user doesn't exist
        throw new NonRetriableError("User not found, stopping execution");
      }

      return user;
    });

    // Continue processing...
  }
);
针对重试也无法成功的代码,禁止重试。
typescript
import { NonRetriableError } from "inngest";

const processUser = inngest.createFunction(
  { id: "process-user" },
  { event: "user/process.requested" },
  async ({ event, step }) => {
    const user = await step.run("fetch-user", async () => {
      const user = await db.users.findOne(event.data.userId);

      if (!user) {
        // 无需重试 - 用户不存在
        throw new NonRetriableError("用户不存在,终止执行");
      }

      return user;
    });

    // 继续处理...
  }
);

Custom Retry Timing

自定义重试时机

typescript
import { RetryAfterError } from "inngest";

const respectRateLimit = inngest.createFunction(
  { id: "api-call" },
  { event: "api/call.requested" },
  async ({ event, step }) => {
    await step.run("call-api", async () => {
      const response = await externalAPI.call(event.data);

      if (response.status === 429) {
        // Retry after specific time from API
        const retryAfter = response.headers["retry-after"];
        throw new RetryAfterError("Rate limited", `${retryAfter}s`);
      }

      return response.data;
    });
  }
);
typescript
import { RetryAfterError } from "inngest";

const respectRateLimit = inngest.createFunction(
  { id: "api-call" },
  { event: "api/call.requested" },
  async ({ event, step }) => {
    await step.run("call-api", async () => {
      const response = await externalAPI.call(event.data);

      if (response.status === 429) {
        // 根据API指定的时间重试
        const retryAfter = response.headers["retry-after"];
        throw new RetryAfterError("请求频率受限", `${retryAfter}s`);
      }

      return response.data;
    });
  }
);

Logging Best Practices

日志记录最佳实践

Proper Logging Setup

正确配置日志

typescript
import winston from "winston";

// Configure logger
const logger = winston.createLogger({
  level: "info",
  format: winston.format.json(),
  transports: [new winston.transports.Console()]
});

const inngest = new Inngest({
  id: "my-app",
  logger // Pass logger to client
});
typescript
import winston from "winston";

// 配置日志器
const logger = winston.createLogger({
  level: "info",
  format: winston.format.json(),
  transports: [new winston.transports.Console()]
});

const inngest = new Inngest({
  id: "my-app",
  logger // 将日志器传递给客户端
});

Function Logging Patterns

函数日志记录模式

typescript
const processData = inngest.createFunction(
  { id: "process-data" },
  { event: "data/process.requested" },
  async ({ event, step, logger }) => {
    // ✅ GOOD: Log inside steps to avoid duplicates
    const result = await step.run("fetch-data", async () => {
      logger.info("Fetching data for user", { userId: event.data.userId });
      return await fetchUserData(event.data.userId);
    });

    // ❌ AVOID: Logging outside steps can duplicate
    // logger.info("Processing complete"); // This could run multiple times!

    await step.run("log-completion", async () => {
      logger.info("Processing complete", { resultCount: result.length });
    });
  }
);
typescript
const processData = inngest.createFunction(
  { id: "process-data" },
  { event: "data/process.requested" },
  async ({ event, step, logger }) => {
    // ✅ 最佳实践:在步骤内记录日志以避免重复
    const result = await step.run("fetch-data", async () => {
      logger.info("为用户获取数据", { userId: event.data.userId });
      return await fetchUserData(event.data.userId);
    });

    // ❌ 避免:在步骤外记录日志会导致重复
    // logger.info("处理完成"); // 这段代码可能会执行多次!

    await step.run("log-completion", async () => {
      logger.info("处理完成", { resultCount: result.length });
    });
  }
);

Performance Optimization

性能优化

Checkpointing

检查点机制

typescript
// Enable checkpointing for lower latency
const realTimeFunction = inngest.createFunction(
  {
    id: "real-time-function",
    checkpointing: {
      maxRuntime: "300s", // Max continuous execution time
      bufferedSteps: 2, // Buffer 2 steps before checkpointing
      maxInterval: "10s" // Max wait before checkpoint
    }
  },
  { event: "realtime/process" },
  async ({ event, step }) => {
    // Steps execute immediately with periodic checkpointing
    const result1 = await step.run("step-1", () => process1(event.data));
    const result2 = await step.run("step-2", () => process2(result1));
    return { result2 };
  }
);
typescript
// 启用检查点以降低延迟
const realTimeFunction = inngest.createFunction(
  {
    id: "real-time-function",
    checkpointing: {
      maxRuntime: "300s", // 最大连续执行时间
      bufferedSteps: 2, // 检查点前缓存2个步骤
      maxInterval: "10s" // 检查点的最大间隔时间
    }
  },
  { event: "realtime/process" },
  async ({ event, step }) => {
    // 步骤会立即执行,同时定期创建检查点
    const result1 = await step.run("step-1", () => process1(event.data));
    const result2 = await step.run("step-2", () => process2(result1));
    return { result2 };
  }
);

Advanced Patterns

高级模式

Conditional Step Execution

条件步骤执行

typescript
const conditionalProcess = inngest.createFunction(
  { id: "conditional-process" },
  { event: "process/conditional" },
  async ({ event, step }) => {
    const userData = await step.run("fetch-user", () => {
      return getUserData(event.data.userId);
    });

    // Conditional step execution
    if (userData.isPremium) {
      await step.run("premium-processing", () => {
        return processPremiumFeatures(userData);
      });
    }

    // Always runs
    await step.run("standard-processing", () => {
      return processStandardFeatures(userData);
    });
  }
);
typescript
const conditionalProcess = inngest.createFunction(
  { id: "conditional-process" },
  { event: "process/conditional" },
  async ({ event, step }) => {
    const userData = await step.run("fetch-user", () => {
      return getUserData(event.data.userId);
    });

    // 条件性执行步骤
    if (userData.isPremium) {
      await step.run("premium-processing", () => {
        return processPremiumFeatures(userData);
      });
    }

    // 始终执行的步骤
    await step.run("standard-processing", () => {
      return processStandardFeatures(userData);
    });
  }
);

Error Recovery Patterns

错误恢复模式

typescript
const robustProcess = inngest.createFunction(
  { id: "robust-process" },
  { event: "process/robust" },
  async ({ event, step }) => {
    let primaryResult;

    try {
      primaryResult = await step.run("primary-service", () => {
        return callPrimaryService(event.data);
      });
    } catch (error) {
      // Fallback to secondary service
      primaryResult = await step.run("fallback-service", () => {
        return callSecondaryService(event.data);
      });
    }

    return { result: primaryResult };
  }
);
typescript
const robustProcess = inngest.createFunction(
  { id: "robust-process" },
  { event: "process/robust" },
  async ({ event, step }) => {
    let primaryResult;

    try {
      primaryResult = await step.run("primary-service", () => {
        return callPrimaryService(event.data);
      });
    } catch (error) {
      // 回退到备用服务
      primaryResult = await step.run("fallback-service", () => {
        return callSecondaryService(event.data);
      });
    }

    return { result: primaryResult };
  }
);

Common Mistakes to Avoid

需要避免的常见错误

  1. ❌ Non-deterministic code outside steps
  2. ❌ Database calls outside steps
  3. ❌ Logging outside steps (causes duplicates)
  4. ❌ Changing step IDs after deployment
  5. ❌ Not handling NonRetriableError cases
  6. ❌ Ignoring idempotency for critical functions
  1. ❌ 步骤外存在非确定性代码
  2. ❌ 数据库操作放在步骤外
  3. ❌ 日志记录放在步骤外(会导致重复)
  4. ❌ 部署后修改步骤ID
  5. ❌ 未处理NonRetriableError场景
  6. ❌ 关键函数未考虑幂等性

Next Steps

后续步骤

  • See inngest-steps for detailed step method reference
  • See references/step-execution.md for detailed step patterns
  • See references/error-handling.md for comprehensive error strategies
  • See references/observability.md for monitoring and tracing setup
  • See references/checkpointing.md for performance optimization details

This skill covers Inngest's durable function patterns. For event sending and webhook handling, see the
inngest-events
skill.
  • 查看inngest-steps获取详细的步骤方法参考
  • 查看references/step-execution.md获取详细的步骤模式
  • 查看references/error-handling.md获取全面的错误处理策略
  • 查看references/observability.md获取监控与追踪配置方法
  • 查看references/checkpointing.md获取性能优化细节

本技能涵盖Inngest的持久化函数模式。关于事件发送和Webhook处理,请查看
inngest-events
技能。