effect-ts

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Effect-TS Expert Guide

Effect-TS 专家指南

Effect-TS is a functional TypeScript library providing typed effects, structured concurrency, and a robust runtime. This skill covers correct usage patterns and addresses common misconceptions from LLM-generated content.
Effect-TS 是一个函数式TypeScript库,提供类型化副作用、结构化并发以及健壮的运行时。本指南涵盖正确的使用模式,并纠正由大语言模型生成内容中的常见误区。

Quick Reference

快速参考

typescript
import { Effect, Layer, Context, Fiber, Schedule, Cache, Scope } from "effect";
import { Schema, JSONSchema } from "@effect/schema";
Core Type Signature:
typescript
Effect<Success, Error, Requirements>
//      ↑        ↑       ↑
//      |        |       └── Dependencies (provided via Layers)
//      |        └── Expected errors (typed, must be handled)
//      └── Success value

typescript
import { Effect, Layer, Context, Fiber, Schedule, Cache, Scope } from "effect";
import { Schema, JSONSchema } from "@effect/schema";
核心类型签名:
typescript
Effect<Success, Error, Requirements>
//      ↑        ↑       ↑
//      |        |       └── 依赖项(通过Layer提供)
//      |        └── 预期错误(已类型化,必须处理)
//      └── 成功返回值

Common Misconceptions

常见误区

LLM outputs often contain incorrect APIs. Use this table to correct them:
Wrong (common in AI outputs)Correct
Effect.cachedWithTTL(...)
Cache.make({ timeToLive: Duration })
Effect.cachedInvalidateWithTTL(...)
cache.invalidate(key)
/
cache.invalidateAll()
Effect.match(...)
Effect.either
+
Either.match
, or
Effect.catchTag
"thread-local storage""fiber-local storage" via
FiberRef
JSON Schema Draft 2020-12
@effect/schema
generates Draft-07
fibers are "cancelled"fibers are "terminated" or "interrupted"
all queues have back-pressureonly bounded queues; sliding/dropping do not
--only=production
--omit=dev
(npm 7+)

大语言模型生成的内容常包含错误API。 使用下表进行纠正:
错误用法(AI生成内容中常见)正确用法
Effect.cachedWithTTL(...)
Cache.make({ timeToLive: Duration })
Effect.cachedInvalidateWithTTL(...)
cache.invalidate(key)
/
cache.invalidateAll()
Effect.match(...)
Effect.either
+
Either.match
, 或
Effect.catchTag
"thread-local storage"基于
FiberRef
的"fiber-local storage"
JSON Schema Draft 2020-12
@effect/schema
生成 Draft-07 版本
fibers are "cancelled"fibers 是"terminated"(终止)或"interrupted"(中断)
所有队列都具备背压有界队列具备;滑动/丢弃队列不具备
--only=production
--omit=dev
(npm 7+)

Error Handling: Two Error Types

错误处理:两种错误类型

Effect distinguishes between:
  1. Expected Errors (Failures) - Typed in
    E
    channel, must be handled
  2. Unexpected Errors (Defects) - Runtime bugs, captured but not typed
typescript
// Expected error - typed
const fetchUser = (id: string): Effect.Effect<User, UserNotFoundError | NetworkError> => ...

// Handle expected errors
const handled = pipe(
  fetchUser("123"),
  Effect.catchTag("UserNotFoundError", (e) => Effect.succeed(defaultUser)),
  Effect.catchTag("NetworkError", (e) => Effect.retry(schedule))
);

// Unexpected errors (defects) - captured by runtime
Effect.catchAllDefect(program, (defect) =>
  Console.error("Unexpected error", defect)
);

Effect 区分以下两种错误:
  1. 预期错误(Failures) - 在
    E
    通道中类型化,必须处理
  2. 意外错误(Defects) - 运行时Bug,会被捕获但未类型化
typescript
// 预期错误 - 已类型化
const fetchUser = (id: string): Effect.Effect<User, UserNotFoundError | NetworkError> => ...

// 处理预期错误
const handled = pipe(
  fetchUser("123"),
  Effect.catchTag("UserNotFoundError", (e) => Effect.succeed(defaultUser)),
  Effect.catchTag("NetworkError", (e) => Effect.retry(schedule))
);

// 意外错误(缺陷)- 由运行时捕获
Effect.catchAllDefect(program, (defect) =>
  Console.error("Unexpected error", defect)
);

Fibers & Cancellation (Critical for MCP)

Fiber与中断(对MCP至关重要)

Fibers are lightweight virtual threads with native interruption:
typescript
// Fork a fiber
const fiber = yield* Effect.fork(longRunningTask);

// Interrupt it (e.g., when MCP client disconnects)
yield* Fiber.interrupt(fiber);

// Structured concurrency: child fibers auto-terminate with parent
const parent = Effect.gen(function* () {
  yield* Effect.fork(backgroundTask);  // Auto-interrupted when parent ends
  yield* mainTask;
});

// Daemon fibers outlive their parent
yield* Effect.forkDaemon(longLivedBackgroundTask);

Fiber是具备原生中断能力的轻量级虚拟线程:
typescript
// 启动一个Fiber
const fiber = yield* Effect.fork(longRunningTask);

// 中断它(例如当MCP客户端断开连接时)
yield* Fiber.interrupt(fiber);

// 结构化并发:子Fiber随父Fiber自动终止
const parent = Effect.gen(function* () {
  yield* Effect.fork(backgroundTask);  // 父Fiber结束时自动中断
  yield* mainTask;
});

// 守护Fiber的生命周期超过父Fiber
yield* Effect.forkDaemon(longLivedBackgroundTask);

Concurrency Primitives

并发原语

Effect.race - First Wins, Losers Interrupted

Effect.race - 先完成者获胜,失败者自动中断

typescript
// First to succeed wins; other is automatically interrupted
const result = yield* Effect.race(
  fetchFromCache,
  fetchFromDatabase
);
typescript
// 先成功的任务获胜;另一个会自动中断
const result = yield* Effect.race(
  fetchFromCache,
  fetchFromDatabase
);

Effect.all with Concurrency Control

带并发控制的Effect.all

typescript
// Process 50 documents with max 5 concurrent
const results = yield* Effect.all(documents.map(processDoc), {
  concurrency: 5  // NOT a "worker pool" - limits concurrent tasks
});
typescript
// 处理50个文档,最大并发数为5
const results = yield* Effect.all(documents.map(processDoc), {
  concurrency: 5  // 不是“工作池” - 仅限制并发任务数
});

Queue Types

队列类型

typescript
// Bounded - applies back-pressure (offer suspends when full)
const bounded = yield* Queue.bounded<string>(100);

// Dropping - discards new items when full (no back-pressure)
const dropping = yield* Queue.dropping<string>(100);

// Sliding - discards oldest items when full (no back-pressure)
const sliding = yield* Queue.sliding<string>(100);

typescript
// 有界队列 - 应用背压(队列满时offer操作会挂起)
const bounded = yield* Queue.bounded<string>(100);

// 丢弃队列 - 队列满时丢弃新项(无背压)
const dropping = yield* Queue.dropping<string>(100);

// 滑动队列 - 队列满时丢弃最旧项(无背压)
const sliding = yield* Queue.sliding<string>(100);

Layers for Dependency Injection

用于依赖注入的Layer

Layers construct services without leaking dependencies:
typescript
// Define a service
class Database extends Context.Tag("Database")<
  Database,
  { query: (sql: string) => Effect.Effect<Result> }
>() {}

// Create layer (dependencies handled at construction)
const DatabaseLive = Layer.effect(
  Database,
  Effect.gen(function* () {
    const config = yield* Config;  // Dependency injected here
    return {
      query: (sql) => Effect.tryPromise(() => runQuery(sql, config))
    };
  })
);

// Provide to program
const runnable = program.pipe(Effect.provide(DatabaseLive));

// For testing - swap implementation
const DatabaseTest = Layer.succeed(Database, {
  query: () => Effect.succeed(mockResult)
});

Layer可在不暴露依赖的情况下构造服务:
typescript
// 定义服务
class Database extends Context.Tag("Database")<
  Database,
  { query: (sql: string) => Effect.Effect<Result> }
>() {}

// 创建Layer(依赖在构造时处理)
const DatabaseLive = Layer.effect(
  Database,
  Effect.gen(function* () {
    const config = yield* Config;  // 此处注入依赖
    return {
      query: (sql) => Effect.tryPromise(() => runQuery(sql, config))
    };
  })
);

// 为程序提供依赖
const runnable = program.pipe(Effect.provide(DatabaseLive));

// 测试用 - 替换实现
const DatabaseTest = Layer.succeed(Database, {
  query: () => Effect.succeed(mockResult)
});

Resource Management

资源管理

Effect.ensuring - Always Runs Finalizer

Effect.ensuring - 始终运行终结器

typescript
const program = pipe(
  Effect.tryPromise(() => openConnection()),
  Effect.ensuring(Console.log("Cleanup"))  // Runs on success, failure, OR interrupt
);
typescript
const program = pipe(
  Effect.tryPromise(() => openConnection()),
  Effect.ensuring(Console.log("Cleanup"))  // 无论成功、失败或中断都会运行
);

acquireUseRelease Pattern

acquireUseRelease模式

typescript
const withConnection = Effect.acquireUseRelease(
  Effect.tryPromise(() => db.connect()),     // Acquire
  (conn) => Effect.tryPromise(() => conn.query("SELECT *")),  // Use
  (conn) => Effect.promise(() => conn.close())  // Release (always runs)
);
typescript
const withConnection = Effect.acquireUseRelease(
  Effect.tryPromise(() => db.connect()),     // 获取资源
  (conn) => Effect.tryPromise(() => conn.query("SELECT *")),  // 使用资源
  (conn) => Effect.promise(() => conn.close())  // 释放资源(始终运行)
);

Scope for Resource Lifecycle

用于资源生命周期的Scope

typescript
Effect.scoped(
  Effect.gen(function* () {
    const file = yield* openFile("data.txt");  // Acquired
    const data = yield* file.read();
    return data;
  })  // File automatically released when scope closes
);

typescript
Effect.scoped(
  Effect.gen(function* () {
    const file = yield* openFile("data.txt");  // 获取资源
    const data = yield* file.read();
    return data;
  })  // Scope关闭时自动释放文件
);

Caching

缓存

There is no
Effect.cachedWithTTL
.
Use the Cache module:
typescript
import { Cache } from "effect";

const cache = yield* Cache.make({
  capacity: 100,
  timeToLive: Duration.minutes(5),
  lookup: (key: string) => fetchExpensiveData(key)
});

// Use the cache
const value = yield* cache.get("my-key");

// Invalidate
yield* cache.invalidate("my-key");
yield* cache.invalidateAll();

不存在
Effect.cachedWithTTL
方法。
请使用Cache模块:
typescript
import { Cache } from "effect";

const cache = yield* Cache.make({
  capacity: 100,
  timeToLive: Duration.minutes(5),
  lookup: (key: string) => fetchExpensiveData(key)
});

// 使用缓存
const value = yield* cache.get("my-key");

// 失效缓存
yield* cache.invalidate("my-key");
yield* cache.invalidateAll();

Retry with Schedule

结合Schedule的重试机制

typescript
import { Schedule } from "effect";

// Retry 3 times with exponential backoff
const policy = Schedule.exponential("100 millis").pipe(
  Schedule.intersect(Schedule.recurs(3))
);

const robust = Effect.retry(unstableOperation, policy);

// Retry until condition
const untilSuccess = Effect.retry(operation, {
  until: (err) => err.code === "RATE_LIMITED"
});

typescript
import { Schedule } from "effect";

// 指数退避重试3次
const policy = Schedule.exponential("100 millis").pipe(
  Schedule.intersect(Schedule.recurs(3))
);

const robust = Effect.retry(unstableOperation, policy);

// 重试直到满足条件
const untilSuccess = Effect.retry(operation, {
  until: (err) => err.code === "RATE_LIMITED"
});

Schema & JSON Schema

Schema与JSON Schema

@effect/schema
generates JSON Schema Draft-07 (not 2020-12):
typescript
import { Schema, JSONSchema } from "@effect/schema";

const User = Schema.Struct({
  id: Schema.String,
  age: Schema.Number.pipe(Schema.positive())
});

// Generate JSON Schema (Draft-07)
const jsonSchema = JSONSchema.make(User);
// { "$schema": "http://json-schema.org/draft-07/schema#", ... }

// Decode (parse)
const user = Schema.decodeUnknownSync(User)(rawData);

// Encode
const json = Schema.encodeSync(User)(user);

@effect/schema
生成 JSON Schema Draft-07 版本(而非2020-12):
typescript
import { Schema, JSONSchema } from "@effect/schema";

const User = Schema.Struct({
  id: Schema.String,
  age: Schema.Number.pipe(Schema.positive())
});

// 生成JSON Schema(Draft-07)
const jsonSchema = JSONSchema.make(User);
// { "$schema": "http://json-schema.org/draft-07/schema#", ... }

// 解码(解析)
const user = Schema.decodeUnknownSync(User)(rawData);

// 编码
const json = Schema.encodeSync(User)(user);

Observability & OpenTelemetry

可观测性与OpenTelemetry

Effect has native OpenTelemetry integration:
typescript
import { NodeSdk } from "@effect/opentelemetry";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";

// Add tracing to any effect
const traced = Effect.withSpan("processRequest")(myEffect);

// Logging with context
yield* Effect.log("Processing request");
yield* Effect.annotateLogs("requestId", "abc-123");

// FiberRef for fiber-local context propagation
const RequestId = FiberRef.unsafeMake<string>("");
yield* FiberRef.set(RequestId, "req-456");

Effect具备原生OpenTelemetry集成:
typescript
import { NodeSdk } from "@effect/opentelemetry";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";

// 为任意Effect添加追踪
const traced = Effect.withSpan("processRequest")(myEffect);

// 带上下文的日志
yield* Effect.log("Processing request");
yield* Effect.annotateLogs("requestId", "abc-123");

// 用于Fiber本地上下文传播的FiberRef
const RequestId = FiberRef.unsafeMake<string>("");
yield* FiberRef.set(RequestId, "req-456");

When NOT to Use Effect

不建议使用Effect的场景

ScenarioRecommendation
Simple MCP tool (< 100 LOC)Use FastMCP or vanilla SDK
Team unfamiliar with FPSteep learning curve; consider NestJS
Bundle size criticalEffect adds 15-25kb gzipped minimum
Existing NestJS/TypeORM codebaseImpedance mismatch with class-based DI

场景建议
简单MCP工具(代码量<100行)使用FastMCP或原生SDK
团队不熟悉函数式编程学习曲线陡峭;可考虑NestJS
包体积要求严格Effect最小压缩后体积为15-25kb
已有NestJS/TypeORM代码库与基于类的依赖注入存在阻抗不匹配

MCP Server Patterns

MCP服务器模式

Tool Handler with Typed Errors

带类型化错误的工具处理器

typescript
const searchTool = Effect.gen(function* () {
  const args = yield* parseArgs(input);
  const db = yield* Database;
  const results = yield* db.query(args.query);
  return formatResults(results);
}).pipe(
  Effect.catchTag("ParseError", () =>
    Effect.fail({ code: -32602, message: "Invalid params" })
  ),
  Effect.catchTag("DatabaseError", () =>
    Effect.fail({ code: -32603, message: "Internal error" })
  )
);
typescript
const searchTool = Effect.gen(function* () {
  const args = yield* parseArgs(input);
  const db = yield* Database;
  const results = yield* db.query(args.query);
  return formatResults(results);
}).pipe(
  Effect.catchTag("ParseError", () =>
    Effect.fail({ code: -32602, message: "Invalid params" })
  ),
  Effect.catchTag("DatabaseError", () =>
    Effect.fail({ code: -32603, message: "Internal error" })
  )
);

Request Scoping

请求作用域

typescript
// Each MCP request gets its own scope
const handleRequest = (request: MCPRequest) =>
  Effect.scoped(
    Effect.gen(function* () {
      // Resources acquired here auto-release when request completes
      const tempFile = yield* createTempFile();
      const result = yield* processRequest(request, tempFile);
      return result;
    })
  );

typescript
// 每个MCP请求拥有独立作用域
const handleRequest = (request: MCPRequest) =>
  Effect.scoped(
    Effect.gen(function* () {
      // 此处获取的资源会在请求完成时自动释放
      const tempFile = yield* createTempFile();
      const result = yield* processRequest(request, tempFile);
      return result;
    })
  );

Resources

资源链接