effect-ts
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseEffect-TS Expert Guide
Effect-TS 专家指南
Effect-TS is a functional TypeScript library providing typed effects, structured concurrency, and a robust runtime. This skill covers correct usage patterns and addresses common misconceptions from LLM-generated content.
Effect-TS 是一个函数式TypeScript库,提供类型化副作用、结构化并发以及健壮的运行时。本指南涵盖正确的使用模式,并纠正由大语言模型生成内容中的常见误区。
Quick Reference
快速参考
typescript
import { Effect, Layer, Context, Fiber, Schedule, Cache, Scope } from "effect";
import { Schema, JSONSchema } from "@effect/schema";Core Type Signature:
typescript
Effect<Success, Error, Requirements>
// ↑ ↑ ↑
// | | └── Dependencies (provided via Layers)
// | └── Expected errors (typed, must be handled)
// └── Success valuetypescript
import { Effect, Layer, Context, Fiber, Schedule, Cache, Scope } from "effect";
import { Schema, JSONSchema } from "@effect/schema";核心类型签名:
typescript
Effect<Success, Error, Requirements>
// ↑ ↑ ↑
// | | └── 依赖项(通过Layer提供)
// | └── 预期错误(已类型化,必须处理)
// └── 成功返回值Common Misconceptions
常见误区
LLM outputs often contain incorrect APIs. Use this table to correct them:
| Wrong (common in AI outputs) | Correct |
|---|---|
| |
| |
| |
| "thread-local storage" | "fiber-local storage" via |
| JSON Schema Draft 2020-12 | |
| fibers are "cancelled" | fibers are "terminated" or "interrupted" |
| all queues have back-pressure | only bounded queues; sliding/dropping do not |
| |
大语言模型生成的内容常包含错误API。 使用下表进行纠正:
| 错误用法(AI生成内容中常见) | 正确用法 |
|---|---|
| |
| |
| |
| "thread-local storage" | 基于 |
| JSON Schema Draft 2020-12 | |
| fibers are "cancelled" | fibers 是"terminated"(终止)或"interrupted"(中断) |
| 所有队列都具备背压 | 仅有界队列具备;滑动/丢弃队列不具备 |
| |
Error Handling: Two Error Types
错误处理:两种错误类型
Effect distinguishes between:
- Expected Errors (Failures) - Typed in channel, must be handled
E - Unexpected Errors (Defects) - Runtime bugs, captured but not typed
typescript
// Expected error - typed
const fetchUser = (id: string): Effect.Effect<User, UserNotFoundError | NetworkError> => ...
// Handle expected errors
const handled = pipe(
fetchUser("123"),
Effect.catchTag("UserNotFoundError", (e) => Effect.succeed(defaultUser)),
Effect.catchTag("NetworkError", (e) => Effect.retry(schedule))
);
// Unexpected errors (defects) - captured by runtime
Effect.catchAllDefect(program, (defect) =>
Console.error("Unexpected error", defect)
);Effect 区分以下两种错误:
- 预期错误(Failures) - 在通道中类型化,必须处理
E - 意外错误(Defects) - 运行时Bug,会被捕获但未类型化
typescript
// 预期错误 - 已类型化
const fetchUser = (id: string): Effect.Effect<User, UserNotFoundError | NetworkError> => ...
// 处理预期错误
const handled = pipe(
fetchUser("123"),
Effect.catchTag("UserNotFoundError", (e) => Effect.succeed(defaultUser)),
Effect.catchTag("NetworkError", (e) => Effect.retry(schedule))
);
// 意外错误(缺陷)- 由运行时捕获
Effect.catchAllDefect(program, (defect) =>
Console.error("Unexpected error", defect)
);Fibers & Cancellation (Critical for MCP)
Fiber与中断(对MCP至关重要)
Fibers are lightweight virtual threads with native interruption:
typescript
// Fork a fiber
const fiber = yield* Effect.fork(longRunningTask);
// Interrupt it (e.g., when MCP client disconnects)
yield* Fiber.interrupt(fiber);
// Structured concurrency: child fibers auto-terminate with parent
const parent = Effect.gen(function* () {
yield* Effect.fork(backgroundTask); // Auto-interrupted when parent ends
yield* mainTask;
});
// Daemon fibers outlive their parent
yield* Effect.forkDaemon(longLivedBackgroundTask);Fiber是具备原生中断能力的轻量级虚拟线程:
typescript
// 启动一个Fiber
const fiber = yield* Effect.fork(longRunningTask);
// 中断它(例如当MCP客户端断开连接时)
yield* Fiber.interrupt(fiber);
// 结构化并发:子Fiber随父Fiber自动终止
const parent = Effect.gen(function* () {
yield* Effect.fork(backgroundTask); // 父Fiber结束时自动中断
yield* mainTask;
});
// 守护Fiber的生命周期超过父Fiber
yield* Effect.forkDaemon(longLivedBackgroundTask);Concurrency Primitives
并发原语
Effect.race - First Wins, Losers Interrupted
Effect.race - 先完成者获胜,失败者自动中断
typescript
// First to succeed wins; other is automatically interrupted
const result = yield* Effect.race(
fetchFromCache,
fetchFromDatabase
);typescript
// 先成功的任务获胜;另一个会自动中断
const result = yield* Effect.race(
fetchFromCache,
fetchFromDatabase
);Effect.all with Concurrency Control
带并发控制的Effect.all
typescript
// Process 50 documents with max 5 concurrent
const results = yield* Effect.all(documents.map(processDoc), {
concurrency: 5 // NOT a "worker pool" - limits concurrent tasks
});typescript
// 处理50个文档,最大并发数为5
const results = yield* Effect.all(documents.map(processDoc), {
concurrency: 5 // 不是“工作池” - 仅限制并发任务数
});Queue Types
队列类型
typescript
// Bounded - applies back-pressure (offer suspends when full)
const bounded = yield* Queue.bounded<string>(100);
// Dropping - discards new items when full (no back-pressure)
const dropping = yield* Queue.dropping<string>(100);
// Sliding - discards oldest items when full (no back-pressure)
const sliding = yield* Queue.sliding<string>(100);typescript
// 有界队列 - 应用背压(队列满时offer操作会挂起)
const bounded = yield* Queue.bounded<string>(100);
// 丢弃队列 - 队列满时丢弃新项(无背压)
const dropping = yield* Queue.dropping<string>(100);
// 滑动队列 - 队列满时丢弃最旧项(无背压)
const sliding = yield* Queue.sliding<string>(100);Layers for Dependency Injection
用于依赖注入的Layer
Layers construct services without leaking dependencies:
typescript
// Define a service
class Database extends Context.Tag("Database")<
Database,
{ query: (sql: string) => Effect.Effect<Result> }
>() {}
// Create layer (dependencies handled at construction)
const DatabaseLive = Layer.effect(
Database,
Effect.gen(function* () {
const config = yield* Config; // Dependency injected here
return {
query: (sql) => Effect.tryPromise(() => runQuery(sql, config))
};
})
);
// Provide to program
const runnable = program.pipe(Effect.provide(DatabaseLive));
// For testing - swap implementation
const DatabaseTest = Layer.succeed(Database, {
query: () => Effect.succeed(mockResult)
});Layer可在不暴露依赖的情况下构造服务:
typescript
// 定义服务
class Database extends Context.Tag("Database")<
Database,
{ query: (sql: string) => Effect.Effect<Result> }
>() {}
// 创建Layer(依赖在构造时处理)
const DatabaseLive = Layer.effect(
Database,
Effect.gen(function* () {
const config = yield* Config; // 此处注入依赖
return {
query: (sql) => Effect.tryPromise(() => runQuery(sql, config))
};
})
);
// 为程序提供依赖
const runnable = program.pipe(Effect.provide(DatabaseLive));
// 测试用 - 替换实现
const DatabaseTest = Layer.succeed(Database, {
query: () => Effect.succeed(mockResult)
});Resource Management
资源管理
Effect.ensuring - Always Runs Finalizer
Effect.ensuring - 始终运行终结器
typescript
const program = pipe(
Effect.tryPromise(() => openConnection()),
Effect.ensuring(Console.log("Cleanup")) // Runs on success, failure, OR interrupt
);typescript
const program = pipe(
Effect.tryPromise(() => openConnection()),
Effect.ensuring(Console.log("Cleanup")) // 无论成功、失败或中断都会运行
);acquireUseRelease Pattern
acquireUseRelease模式
typescript
const withConnection = Effect.acquireUseRelease(
Effect.tryPromise(() => db.connect()), // Acquire
(conn) => Effect.tryPromise(() => conn.query("SELECT *")), // Use
(conn) => Effect.promise(() => conn.close()) // Release (always runs)
);typescript
const withConnection = Effect.acquireUseRelease(
Effect.tryPromise(() => db.connect()), // 获取资源
(conn) => Effect.tryPromise(() => conn.query("SELECT *")), // 使用资源
(conn) => Effect.promise(() => conn.close()) // 释放资源(始终运行)
);Scope for Resource Lifecycle
用于资源生命周期的Scope
typescript
Effect.scoped(
Effect.gen(function* () {
const file = yield* openFile("data.txt"); // Acquired
const data = yield* file.read();
return data;
}) // File automatically released when scope closes
);typescript
Effect.scoped(
Effect.gen(function* () {
const file = yield* openFile("data.txt"); // 获取资源
const data = yield* file.read();
return data;
}) // Scope关闭时自动释放文件
);Caching
缓存
There is no . Use the Cache module:
Effect.cachedWithTTLtypescript
import { Cache } from "effect";
const cache = yield* Cache.make({
capacity: 100,
timeToLive: Duration.minutes(5),
lookup: (key: string) => fetchExpensiveData(key)
});
// Use the cache
const value = yield* cache.get("my-key");
// Invalidate
yield* cache.invalidate("my-key");
yield* cache.invalidateAll();不存在方法。 请使用Cache模块:
Effect.cachedWithTTLtypescript
import { Cache } from "effect";
const cache = yield* Cache.make({
capacity: 100,
timeToLive: Duration.minutes(5),
lookup: (key: string) => fetchExpensiveData(key)
});
// 使用缓存
const value = yield* cache.get("my-key");
// 失效缓存
yield* cache.invalidate("my-key");
yield* cache.invalidateAll();Retry with Schedule
结合Schedule的重试机制
typescript
import { Schedule } from "effect";
// Retry 3 times with exponential backoff
const policy = Schedule.exponential("100 millis").pipe(
Schedule.intersect(Schedule.recurs(3))
);
const robust = Effect.retry(unstableOperation, policy);
// Retry until condition
const untilSuccess = Effect.retry(operation, {
until: (err) => err.code === "RATE_LIMITED"
});typescript
import { Schedule } from "effect";
// 指数退避重试3次
const policy = Schedule.exponential("100 millis").pipe(
Schedule.intersect(Schedule.recurs(3))
);
const robust = Effect.retry(unstableOperation, policy);
// 重试直到满足条件
const untilSuccess = Effect.retry(operation, {
until: (err) => err.code === "RATE_LIMITED"
});Schema & JSON Schema
Schema与JSON Schema
@effect/schematypescript
import { Schema, JSONSchema } from "@effect/schema";
const User = Schema.Struct({
id: Schema.String,
age: Schema.Number.pipe(Schema.positive())
});
// Generate JSON Schema (Draft-07)
const jsonSchema = JSONSchema.make(User);
// { "$schema": "http://json-schema.org/draft-07/schema#", ... }
// Decode (parse)
const user = Schema.decodeUnknownSync(User)(rawData);
// Encode
const json = Schema.encodeSync(User)(user);@effect/schematypescript
import { Schema, JSONSchema } from "@effect/schema";
const User = Schema.Struct({
id: Schema.String,
age: Schema.Number.pipe(Schema.positive())
});
// 生成JSON Schema(Draft-07)
const jsonSchema = JSONSchema.make(User);
// { "$schema": "http://json-schema.org/draft-07/schema#", ... }
// 解码(解析)
const user = Schema.decodeUnknownSync(User)(rawData);
// 编码
const json = Schema.encodeSync(User)(user);Observability & OpenTelemetry
可观测性与OpenTelemetry
Effect has native OpenTelemetry integration:
typescript
import { NodeSdk } from "@effect/opentelemetry";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
// Add tracing to any effect
const traced = Effect.withSpan("processRequest")(myEffect);
// Logging with context
yield* Effect.log("Processing request");
yield* Effect.annotateLogs("requestId", "abc-123");
// FiberRef for fiber-local context propagation
const RequestId = FiberRef.unsafeMake<string>("");
yield* FiberRef.set(RequestId, "req-456");Effect具备原生OpenTelemetry集成:
typescript
import { NodeSdk } from "@effect/opentelemetry";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
// 为任意Effect添加追踪
const traced = Effect.withSpan("processRequest")(myEffect);
// 带上下文的日志
yield* Effect.log("Processing request");
yield* Effect.annotateLogs("requestId", "abc-123");
// 用于Fiber本地上下文传播的FiberRef
const RequestId = FiberRef.unsafeMake<string>("");
yield* FiberRef.set(RequestId, "req-456");When NOT to Use Effect
不建议使用Effect的场景
| Scenario | Recommendation |
|---|---|
| Simple MCP tool (< 100 LOC) | Use FastMCP or vanilla SDK |
| Team unfamiliar with FP | Steep learning curve; consider NestJS |
| Bundle size critical | Effect adds 15-25kb gzipped minimum |
| Existing NestJS/TypeORM codebase | Impedance mismatch with class-based DI |
| 场景 | 建议 |
|---|---|
| 简单MCP工具(代码量<100行) | 使用FastMCP或原生SDK |
| 团队不熟悉函数式编程 | 学习曲线陡峭;可考虑NestJS |
| 包体积要求严格 | Effect最小压缩后体积为15-25kb |
| 已有NestJS/TypeORM代码库 | 与基于类的依赖注入存在阻抗不匹配 |
MCP Server Patterns
MCP服务器模式
Tool Handler with Typed Errors
带类型化错误的工具处理器
typescript
const searchTool = Effect.gen(function* () {
const args = yield* parseArgs(input);
const db = yield* Database;
const results = yield* db.query(args.query);
return formatResults(results);
}).pipe(
Effect.catchTag("ParseError", () =>
Effect.fail({ code: -32602, message: "Invalid params" })
),
Effect.catchTag("DatabaseError", () =>
Effect.fail({ code: -32603, message: "Internal error" })
)
);typescript
const searchTool = Effect.gen(function* () {
const args = yield* parseArgs(input);
const db = yield* Database;
const results = yield* db.query(args.query);
return formatResults(results);
}).pipe(
Effect.catchTag("ParseError", () =>
Effect.fail({ code: -32602, message: "Invalid params" })
),
Effect.catchTag("DatabaseError", () =>
Effect.fail({ code: -32603, message: "Internal error" })
)
);Request Scoping
请求作用域
typescript
// Each MCP request gets its own scope
const handleRequest = (request: MCPRequest) =>
Effect.scoped(
Effect.gen(function* () {
// Resources acquired here auto-release when request completes
const tempFile = yield* createTempFile();
const result = yield* processRequest(request, tempFile);
return result;
})
);typescript
// 每个MCP请求拥有独立作用域
const handleRequest = (request: MCPRequest) =>
Effect.scoped(
Effect.gen(function* () {
// 此处获取的资源会在请求完成时自动释放
const tempFile = yield* createTempFile();
const result = yield* processRequest(request, tempFile);
return result;
})
);