elixir-otp
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseOTP Patterns
OTP模式
Expert guidance for process design, supervision, and concurrency in Elixir/OTP.
针对Elixir/OTP中的进程设计、监督和并发场景的专业指导。
The Golden Rule
黄金准则
Database is the source of truth for domain entities. Processes are for infrastructure.
Don't reach for a GenServer to hold domain state (users, orders, tasks). Use PostgreSQL. Use processes for:
- Connection pools
- Caches (ETS)
- Rate limiters
- PubSub / event buses
- Background workers
- Real-time session state
elixir
undefined数据库是领域实体的唯一可信数据源,进程仅用于基础设施相关场景。
不要使用GenServer存储领域状态(用户、订单、任务等),请使用PostgreSQL。进程的适用场景包括:
- 连接池
- 缓存(ETS)
- 限流器
- PubSub / 事件总线
- 后台 worker
- 实时会话状态
elixir
undefined❌ Bad: GenServer for domain entity
❌ 错误示例:用GenServer存储领域实体
defmodule MyApp.TaskServer do
use GenServer
Holds task state in process memory
Lost on crash, hard to query, doesn't scale
end
defmodule MyApp.TaskServer do
use GenServer
将任务状态存储在进程内存中
崩溃时会丢失数据、难以查询、无法扩展
end
✅ Good: Database for domain, process for infrastructure
✅ 正确示例:数据库存领域数据,进程处理基础设施逻辑
defmodule MyApp.Tasks do
def get_task!(id), do: Repo.get!(Task, id)
def update_task(task, attrs), do: task |> Task.changeset(attrs) |> Repo.update()
end
defmodule MyApp.RateLimiter do
use GenServer
Rate limiting IS infrastructure — process is appropriate
end
undefineddefmodule MyApp.Tasks do
def get_task!(id), do: Repo.get!(Task, id)
def update_task(task, attrs), do: task |> Task.changeset(attrs) |> Repo.update()
end
defmodule MyApp.RateLimiter do
use GenServer
限流属于基础设施逻辑,适合用进程实现
end
undefinedWhen to Use What
各类组件选型指南
| Abstraction | Use When | Don't Use When |
|---|---|---|
| GenServer | Need stateful process with request/response | Just storing domain data |
| Agent | Simple state wrapper, no complex logic | Need handle_info, timeouts, or complex state |
| Task | One-off async work, fire-and-forget or await | Need persistent state or retries |
| Task.Supervisor | Concurrent tasks that might fail | Tasks must all succeed atomically |
| ETS | Fast concurrent reads, shared cache | Data must survive node restart |
| Registry | Dynamic process lookup by name | Static, known-at-compile-time processes |
| Oban | Reliable background jobs with retries | Simple in-process async work |
| No process | Pure functions, Repo calls, pipelines | — |
| 抽象组件 | 适用场景 | 不适用场景 |
|---|---|---|
| GenServer | 需要具备请求/响应能力的有状态进程 | 仅用于存储领域数据 |
| Agent | 简单状态封装,无复杂逻辑 | 需要handle_info、超时处理或复杂状态管理 |
| Task | 一次性异步工作, fire-and-forget 或需要等待返回结果 | 需要持久化状态或重试能力 |
| Task.Supervisor | 可能出现失败的并发任务 | 要求所有任务原子性成功 |
| ETS | 高并发读、共享缓存场景 | 要求数据在节点重启后仍然留存 |
| Registry | 按名称动态查找进程 | 编译期就已知的静态进程 |
| Oban | 需要重试能力的可靠后台任务 | 简单的进程内异步工作 |
| 不使用进程 | 纯函数、Repo调用、管道处理 | — |
GenServer
GenServer
Use for stateful infrastructure with request/response semantics:
elixir
defmodule MyApp.Cache do
use GenServer
# Client API — called by other processes
def start_link(opts) do
name = Keyword.fetch!(opts, :name)
ttl = Keyword.get(opts, :ttl, :timer.minutes(5))
GenServer.start_link(__MODULE__, %{ttl: ttl}, name: name)
end
def get(server, key) do
GenServer.call(server, {:get, key})
end
def put(server, key, value) do
GenServer.cast(server, {:put, key, value})
end
# Server callbacks
@impl true
def init(state) do
schedule_cleanup()
{:ok, Map.put(state, :store, %{})}
end
@impl true
def handle_call({:get, key}, _from, state) do
case Map.get(state.store, key) do
{value, expires_at} when expires_at > System.monotonic_time(:millisecond) ->
{:reply, {:ok, value}, state}
_ ->
{:reply, :miss, state}
end
end
@impl true
def handle_cast({:put, key, value}, state) do
expires_at = System.monotonic_time(:millisecond) + state.ttl
{:noreply, put_in(state, [:store, key], {value, expires_at})}
end
@impl true
def handle_info(:cleanup, state) do
now = System.monotonic_time(:millisecond)
store = Map.reject(state.store, fn {_k, {_v, exp}} -> exp <= now end)
schedule_cleanup()
{:noreply, %{state | store: store}}
end
defp schedule_cleanup, do: Process.send_after(self(), :cleanup, :timer.minutes(1))
end适用于具备请求/响应语义的有状态基础设施:
elixir
defmodule MyApp.Cache do
use GenServer
# 客户端API — 供其他进程调用
def start_link(opts) do
name = Keyword.fetch!(opts, :name)
ttl = Keyword.get(opts, :ttl, :timer.minutes(5))
GenServer.start_link(__MODULE__, %{ttl: ttl}, name: name)
end
def get(server, key) do
GenServer.call(server, {:get, key})
end
def put(server, key, value) do
GenServer.cast(server, {:put, key, value})
end
# 服务端回调
@impl true
def init(state) do
schedule_cleanup()
{:ok, Map.put(state, :store, %{})}
end
@impl true
def handle_call({:get, key}, _from, state) do
case Map.get(state.store, key) do
{value, expires_at} when expires_at > System.monotonic_time(:millisecond) ->
{:reply, {:ok, value}, state}
_ ->
{:reply, :miss, state}
end
end
@impl true
def handle_cast({:put, key, value}, state) do
expires_at = System.monotonic_time(:millisecond) + state.ttl
{:noreply, put_in(state, [:store, key], {value, expires_at})}
end
@impl true
def handle_info(:cleanup, state) do
now = System.monotonic_time(:millisecond)
store = Map.reject(state.store, fn {_k, {_v, exp}} -> exp <= now end)
schedule_cleanup()
{:noreply, %{state | store: store}}
end
defp schedule_cleanup, do: Process.send_after(self(), :cleanup, :timer.minutes(1))
endGenServer Best Practices
GenServer最佳实践
- Keep fast — don't do heavy work while blocking the caller
handle_call - Use for fire-and-forget,
handle_castfor request/responsehandle_call - Use for self-sent messages, timers, and external messages
handle_info - Use for post-init work that shouldn't block
handle_continuestart_link - Always define on callbacks
@impl true - Return for clean shutdown
{:stop, reason, state}
elixir
undefined- 保持执行速度快——不要执行耗时操作阻塞调用方
handle_call - fire-and-forget 场景用,请求响应场景用
handle_casthandle_call - 处理进程自身发送的消息、定时器、外部消息时用
handle_info - 不应该阻塞的初始化后工作用
start_link处理handle_continue - 所有回调都要添加注解
@impl true - 优雅关闭时返回
{:stop, reason, state}
elixir
undefined✅ Good: Use handle_continue for expensive init
✅ 正确示例:用handle_continue处理耗时初始化逻辑
@impl true
def init(opts) do
{:ok, %{data: nil}, {:continue, :load_data}}
end
@impl true
def handle_continue(:load_data, state) do
data = expensive_load()
{:noreply, %{state | data: data}}
end
undefined@impl true
def init(opts) do
{:ok, %{data: nil}, {:continue, :load_data}}
end
@impl true
def handle_continue(:load_data, state) do
data = expensive_load()
{:noreply, %{state | data: data}}
end
undefinedAgent
Agent
Simple state wrapper — use when you just need get/update with no complex logic:
elixir
undefined简单状态封装——仅用于不需要复杂逻辑的状态读取/更新场景:
elixir
undefined✅ Good: Agent for simple shared counter
✅ 正确示例:用Agent实现简单的共享计数器
{:ok, counter} = Agent.start_link(fn -> 0 end, name: MyApp.Counter)
Agent.get(MyApp.Counter, & &1) # => 0
Agent.update(MyApp.Counter, &(&1 + 1)) # => :ok
Agent.get(MyApp.Counter, & &1) # => 1
{:ok, counter} = Agent.start_link(fn -> 0 end, name: MyApp.Counter)
Agent.get(MyApp.Counter, & &1) # => 0
Agent.update(MyApp.Counter, &(&1 + 1)) # => :ok
Agent.get(MyApp.Counter, & &1) # => 1
❌ Bad: Agent for complex logic — use GenServer instead
❌ 错误示例:在Agent中实现复杂逻辑 —— 请改用GenServer
Agent.update(agent, fn state ->
50 lines of complex business logic here...
This runs INSIDE the agent process, blocking all other callers
end)
undefinedAgent.update(agent, fn state ->
50行复杂业务逻辑写在这里...
这段代码会在Agent进程内部执行,阻塞所有其他调用方
end)
undefinedTask
Task
For one-off concurrent work:
elixir
undefined用于一次性并发工作:
elixir
undefinedFire-and-forget
Fire-and-forget 场景
Task.start(fn -> send_welcome_email(user) end)
Task.start(fn -> send_welcome_email(user) end)
Await result (with timeout)
等待返回结果(带超时)
task = Task.async(fn -> fetch_external_data(url) end)
result = Task.await(task, 5_000)
task = Task.async(fn -> fetch_external_data(url) end)
result = Task.await(task, 5_000)
Multiple concurrent tasks
多个并发任务
tasks = Enum.map(urls, fn url ->
Task.async(fn -> fetch(url) end)
end)
results = Task.await_many(tasks, 10_000)
undefinedtasks = Enum.map(urls, fn url ->
Task.async(fn -> fetch(url) end)
end)
results = Task.await_many(tasks, 10_000)
undefinedTask.Supervisor
Task.Supervisor
For tasks that might fail — isolates crashes from the caller:
elixir
undefined用于可能失败的任务,将崩溃与调用方隔离:
elixir
undefinedIn your supervision tree
在你的监督树中配置
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
Spawn tasks that can crash safely
安全启动可能崩溃的任务
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
send_notification(user) # If this crashes, caller is unaffected
end)
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
send_notification(user) # 即便这里崩溃,调用方也不会受影响
end)
Async with supervisor
结合Supervisor使用异步任务
task = Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
fetch_external_data(url)
end)
result = Task.await(task)
task = Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
fetch_external_data(url)
end)
result = Task.await(task)
Concurrent stream with backpressure
带背压的并发流处理
MyApp.TaskSupervisor
|> Task.Supervisor.async_stream(urls, &fetch/1, max_concurrency: 10, ordered: false)
|> Enum.reduce([], fn
{:ok, result}, acc -> [result | acc]
{:exit, _reason}, acc -> acc
end)
undefinedMyApp.TaskSupervisor
|> Task.Supervisor.async_stream(urls, &fetch/1, max_concurrency: 10, ordered: false)
|> Enum.reduce([], fn
{:ok, result}, acc -> [result | acc]
{:exit, _reason}, acc -> acc
end)
undefinedETS
ETS
Fast concurrent reads, shared across processes:
elixir
undefined高并发读,进程间共享:
elixir
undefined✅ Good: ETS for read-heavy cache
✅ 正确示例:用ETS实现读密集型缓存
defmodule MyApp.ConfigCache do
use GenServer
@table :config_cache
def start_link(_opts) do
GenServer.start_link(MODULE, [], name: MODULE)
end
def get(key) do
case :ets.lookup(@table, key) do
[{^key, value}] -> {:ok, value}
[] -> :error
end
end
def put(key, value) do
:ets.insert(@table, {key, value})
:ok
end
@impl true
def init(_) do
table = :ets.new(@table, [:set, :public, :named_table, read_concurrency: true])
{:ok, %{table: table}}
end
end
undefineddefmodule MyApp.ConfigCache do
use GenServer
@table :config_cache
def start_link(_opts) do
GenServer.start_link(MODULE, [], name: MODULE)
end
def get(key) do
case :ets.lookup(@table, key) do
[{^key, value}] -> {:ok, value}
[] -> :error
end
end
def put(key, value) do
:ets.insert(@table, {key, value})
:ok
end
@impl true
def init(_) do
table = :ets.new(@table, [:set, :public, :named_table, read_concurrency: true])
{:ok, %{table: table}}
end
end
undefinedETS vs GenServer State
ETS vs GenServer状态对比
| ETS | GenServer state | |
|---|---|---|
| Reads | Concurrent, no bottleneck | Serialized through process |
| Writes | Atomic per-row | Serialized (safe) |
| Survives crash | Only if heir set | No (state lost) |
| Query | Match specs, select | Full Elixir |
| Best for | Read-heavy cache, counters | Complex state machines |
| ETS | GenServer状态 | |
|---|---|---|
| 读性能 | 并发无瓶颈 | 通过进程串行处理 |
| 写性能 | 单行原子操作 | 串行处理(安全) |
| 崩溃后留存 | 仅设置heir时可以 | 否(状态丢失) |
| 查询能力 | 匹配规则、select语句 | 完整Elixir能力 |
| 最佳适用场景 | 读密集型缓存、计数器 | 复杂状态机 |
Registry
Registry
Dynamic process lookup by key:
elixir
undefined按键动态查找进程:
elixir
undefinedIn supervision tree
在监督树中配置
children = [
{Registry, keys: :unique, name: MyApp.Registry},
{DynamicSupervisor, name: MyApp.RoomSupervisor}
]
children = [
{Registry, keys: :unique, name: MyApp.Registry},
{DynamicSupervisor, name: MyApp.RoomSupervisor}
]
Start a named process dynamically
动态启动命名进程
def start_room(room_id) do
DynamicSupervisor.start_child(
MyApp.RoomSupervisor,
{MyApp.Room, room_id: room_id, name: via(room_id)}
)
end
def via(room_id) do
{:via, Registry, {MyApp.Registry, room_id}}
end
def start_room(room_id) do
DynamicSupervisor.start_child(
MyApp.RoomSupervisor,
{MyApp.Room, room_id: room_id, name: via(room_id)}
)
end
def via(room_id) do
{:via, Registry, {MyApp.Registry, room_id}}
end
Look up and call
查找并调用进程
def get_room_state(room_id) do
GenServer.call(via(room_id), :get_state)
end
undefineddef get_room_state(room_id) do
GenServer.call(via(room_id), :get_state)
end
undefinedSupervision Trees
监督树
Design Principles
设计原则
- One-for-one: Restart only the crashed child (default, most common)
- One-for-all: Restart all children if one crashes (tightly coupled)
- Rest-for-one: Restart crashed child and all children started after it
elixir
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# Start order matters — dependencies first
MyApp.Repo, # Database
{Phoenix.PubSub, name: MyApp.PubSub}, # PubSub
MyApp.ConfigCache, # Cache (depends on nothing)
{Registry, keys: :unique, name: MyApp.Registry}, # Registry
{DynamicSupervisor, name: MyApp.RoomSupervisor}, # Dynamic processes
{Task.Supervisor, name: MyApp.TaskSupervisor}, # Task supervisor
{Oban, Application.fetch_env!(:my_app, Oban)}, # Background jobs
MyAppWeb.Endpoint, # Web server (last)
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
endOne-for-one:仅重启崩溃的子进程(默认配置,最常用)
One-for-all:某个子进程崩溃时重启所有子进程(适用于紧耦合场景)
Rest-for-one:重启崩溃的子进程以及所有在它之后启动的子进程
elixir
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# 启动顺序很重要 —— 依赖项优先启动
MyApp.Repo, # 数据库
{Phoenix.PubSub, name: MyApp.PubSub}, # PubSub
MyApp.ConfigCache, # 缓存(无依赖)
{Registry, keys: :unique, name: MyApp.Registry}, # Registry
{DynamicSupervisor, name: MyApp.RoomSupervisor}, # 动态进程
{Task.Supervisor, name: MyApp.TaskSupervisor}, # Task监督器
{Oban, Application.fetch_env!(:my_app, Oban)}, # 后台任务
MyAppWeb.Endpoint, # Web服务(最后启动)
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
endChild Spec
子进程规格
elixir
undefinedelixir
undefinedCustomize restart behavior
自定义重启行为
defmodule MyApp.CriticalWorker do
use GenServer, restart: :permanent # Always restart (default)
end
defmodule MyApp.OptionalWorker do
use GenServer, restart: :transient # Only restart on abnormal exit
end
defmodule MyApp.OneShot do
use GenServer, restart: :temporary # Never restart
end
undefineddefmodule MyApp.CriticalWorker do
use GenServer, restart: :permanent # 始终重启(默认配置)
end
defmodule MyApp.OptionalWorker do
use GenServer, restart: :transient # 仅在异常退出时重启
end
defmodule MyApp.OneShot do
use GenServer, restart: :temporary # 永不重启
end
undefinedBackground Jobs with Oban
用Oban实现后台任务
For reliable, persistent background work — NOT Task or GenServer:
elixir
undefined用于可靠、持久化的后台工作——不要用Task或GenServer实现这类需求:
elixir
undefinedDefine a worker
定义Worker
defmodule MyApp.Workers.SendEmail do
use Oban.Worker, queue: :mailers, max_attempts: 3
@impl Oban.Worker
def perform(%Oban.Job{args: %{"user_id" => user_id, "template" => template}}) do
user = Accounts.get_user!(user_id)
MyApp.Mailer.deliver(user, template)
:ok
end
end
defmodule MyApp.Workers.SendEmail do
use Oban.Worker, queue: :mailers, max_attempts: 3
@impl Oban.Worker
def perform(%Oban.Job{args: %{"user_id" => user_id, "template" => template}}) do
user = Accounts.get_user!(user_id)
MyApp.Mailer.deliver(user, template)
:ok
end
end
Enqueue a job
入队任务
%{user_id: user.id, template: "welcome"}
|> MyApp.Workers.SendEmail.new()
|> Oban.insert()
%{user_id: user.id, template: "welcome"}
|> MyApp.Workers.SendEmail.new()
|> Oban.insert()
Schedule for later
延迟调度
%{user_id: user.id, template: "reminder"}
|> MyApp.Workers.SendEmail.new(scheduled_at: DateTime.add(DateTime.utc_now(), 3600))
|> Oban.insert()
%{user_id: user.id, template: "reminder"}
|> MyApp.Workers.SendEmail.new(scheduled_at: DateTime.add(DateTime.utc_now(), 3600))
|> Oban.insert()
Unique jobs (prevent duplicates)
唯一任务(防止重复执行)
%{report_id: report.id}
|> MyApp.Workers.GenerateReport.new(unique: [period: 300, fields: [:args]])
|> Oban.insert()
undefined%{report_id: report.id}
|> MyApp.Workers.GenerateReport.new(unique: [period: 300, fields: [:args]])
|> Oban.insert()
undefinedWhen to Use Oban vs Task
Oban vs Task 选型对比
| Oban | Task | |
|---|---|---|
| Persisted | Yes (database) | No (in-memory) |
| Retries | Built-in with backoff | Manual |
| Survives deploy | Yes | No |
| Scheduling | Built-in | Manual with Process.send_after |
| Monitoring | Oban Web dashboard | None |
| Use for | Emails, reports, webhooks, imports | Quick async, fan-out, parallel fetch |
| Oban | Task | |
|---|---|---|
| 持久化 | 是(存储在数据库) | 否(内存中) |
| 重试能力 | 内置退避重试 | 需手动实现 |
| 部署后留存 | 是 | 否 |
| 调度能力 | 内置 | 需配合Process.send_after手动实现 |
| 监控能力 | Oban Web可视化面板 | 无 |
| 适用场景 | 邮件、报表、Webhook、数据导入 | 快速异步、扇出、并行拉取 |
When NOT to Use Processes
不要使用进程的场景
elixir
undefinedelixir
undefined❌ Don't use a process just to "hold" a value
❌ 不要为了“存储”一个值就使用进程
defmodule MyApp.CurrentUser do
use Agent
def start_link(user), do: Agent.start_link(fn -> user end)
def get(pid), do: Agent.get(pid, & &1)
end
defmodule MyApp.CurrentUser do
use Agent
def start_link(user), do: Agent.start_link(fn -> user end)
def get(pid), do: Agent.get(pid, & &1)
end
Just pass the user as a function argument!
直接把user作为函数参数传递就可以了!
❌ Don't use GenServer for sequential data transformation
❌ 不要用GenServer处理顺序数据转换
defmodule MyApp.DataPipeline do
use GenServer
def process(data), do: GenServer.call(MODULE, {:process, data})
def handle_call({:process, data}, _from, state) do
result = data |> step1() |> step2() |> step3()
{:reply, result, state}
end
end
defmodule MyApp.DataPipeline do
use GenServer
def process(data), do: GenServer.call(MODULE, {:process, data})
def handle_call({:process, data}, _from, state) do
result = data |> step1() |> step2() |> step3()
{:reply, result, state}
end
end
Just use a regular function pipeline!
直接用普通函数管道就可以了!
def process(data), do: data |> step1() |> step2() |> step3()
def process(data), do: data |> step1() |> step2() |> step3()
❌ Don't use processes for domain entities
❌ 不要用进程存储领域实体
Users, orders, tasks, etc. belong in the database
用户、订单、任务等数据应该存在数据库中
✅ DO use processes for:
✅ 进程的正确适用场景:
- Connection pools (Repo, HTTP clients)
- 连接池(Repo、HTTP客户端)
- Caches (ETS-backed GenServer)
- 缓存(基于ETS的GenServer)
- Rate limiters
- 限流器
- Real-time session state (LiveView, channels)
- 实时会话状态(LiveView、channels)
- Periodic work (GenServer with send_after)
- 周期性工作(带send_after的GenServer)
- Dynamic workers (DynamicSupervisor + Registry)
- 动态worker(DynamicSupervisor + Registry)
undefinedundefinedCommon Mistakes
常见错误
elixir
undefinedelixir
undefined❌ Don't call GenServer from within its own callbacks
❌ 不要在GenServer的回调内部调用自身
def handle_call(:get_data, _from, state) do
other = GenServer.call(self(), :other) # DEADLOCK!
{:reply, other, state}
end
def handle_call(:get_data, _from, state) do
other = GenServer.call(self(), :other) # 死锁!
{:reply, other, state}
end
❌ Don't do heavy work in handle_call (blocks all callers)
❌ 不要在handle_call中执行耗时操作(会阻塞所有调用方)
def handle_call(:generate_report, _from, state) do
report = generate_huge_report() # All other callers wait!
{:reply, report, state}
end
def handle_call(:generate_report, _from, state) do
report = generate_huge_report() # 所有其他调用方都会等待!
{:reply, report, state}
end
✅ Offload heavy work
✅ 卸载耗时工作到独立Task
def handle_call(:generate_report, from, state) do
Task.start(fn ->
report = generate_huge_report()
GenServer.reply(from, report)
end)
{:noreply, state}
end
def handle_call(:generate_report, from, state) do
Task.start(fn ->
report = generate_huge_report()
GenServer.reply(from, report)
end)
{:noreply, state}
end
❌ Don't forget to handle unexpected messages
❌ 不要忘记处理意外消息
Unhandled messages in handle_info will crash the GenServer in OTP 27+
OTP 27+中handle_info未处理的消息会导致GenServer崩溃
✅ Add a catch-all
✅ 添加兜底处理逻辑
@impl true
def handle_info(msg, state) do
Logger.warning("Unexpected message: #{inspect(msg)}")
{:noreply, state}
end
undefined@impl true
def handle_info(msg, state) do
Logger.warning("Unexpected message: #{inspect(msg)}")
{:noreply, state}
end
undefined