elixir-otp

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

OTP Patterns

OTP模式

Expert guidance for process design, supervision, and concurrency in Elixir/OTP.
针对Elixir/OTP中的进程设计、监督和并发场景的专业指导。

The Golden Rule

黄金准则

Database is the source of truth for domain entities. Processes are for infrastructure.
Don't reach for a GenServer to hold domain state (users, orders, tasks). Use PostgreSQL. Use processes for:
  • Connection pools
  • Caches (ETS)
  • Rate limiters
  • PubSub / event buses
  • Background workers
  • Real-time session state
elixir
undefined
数据库是领域实体的唯一可信数据源,进程仅用于基础设施相关场景。
不要使用GenServer存储领域状态(用户、订单、任务等),请使用PostgreSQL。进程的适用场景包括:
  • 连接池
  • 缓存(ETS)
  • 限流器
  • PubSub / 事件总线
  • 后台 worker
  • 实时会话状态
elixir
undefined

❌ Bad: GenServer for domain entity

❌ 错误示例:用GenServer存储领域实体

defmodule MyApp.TaskServer do use GenServer

Holds task state in process memory

Lost on crash, hard to query, doesn't scale

end
defmodule MyApp.TaskServer do use GenServer

将任务状态存储在进程内存中

崩溃时会丢失数据、难以查询、无法扩展

end

✅ Good: Database for domain, process for infrastructure

✅ 正确示例:数据库存领域数据,进程处理基础设施逻辑

defmodule MyApp.Tasks do def get_task!(id), do: Repo.get!(Task, id) def update_task(task, attrs), do: task |> Task.changeset(attrs) |> Repo.update() end
defmodule MyApp.RateLimiter do use GenServer

Rate limiting IS infrastructure — process is appropriate

end
undefined
defmodule MyApp.Tasks do def get_task!(id), do: Repo.get!(Task, id) def update_task(task, attrs), do: task |> Task.changeset(attrs) |> Repo.update() end
defmodule MyApp.RateLimiter do use GenServer

限流属于基础设施逻辑,适合用进程实现

end
undefined

When to Use What

各类组件选型指南

AbstractionUse WhenDon't Use When
GenServerNeed stateful process with request/responseJust storing domain data
AgentSimple state wrapper, no complex logicNeed handle_info, timeouts, or complex state
TaskOne-off async work, fire-and-forget or awaitNeed persistent state or retries
Task.SupervisorConcurrent tasks that might failTasks must all succeed atomically
ETSFast concurrent reads, shared cacheData must survive node restart
RegistryDynamic process lookup by nameStatic, known-at-compile-time processes
ObanReliable background jobs with retriesSimple in-process async work
No processPure functions, Repo calls, pipelines
抽象组件适用场景不适用场景
GenServer需要具备请求/响应能力的有状态进程仅用于存储领域数据
Agent简单状态封装,无复杂逻辑需要handle_info、超时处理或复杂状态管理
Task一次性异步工作, fire-and-forget 或需要等待返回结果需要持久化状态或重试能力
Task.Supervisor可能出现失败的并发任务要求所有任务原子性成功
ETS高并发读、共享缓存场景要求数据在节点重启后仍然留存
Registry按名称动态查找进程编译期就已知的静态进程
Oban需要重试能力的可靠后台任务简单的进程内异步工作
不使用进程纯函数、Repo调用、管道处理

GenServer

GenServer

Use for stateful infrastructure with request/response semantics:
elixir
defmodule MyApp.Cache do
  use GenServer

  # Client API — called by other processes
  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    ttl = Keyword.get(opts, :ttl, :timer.minutes(5))
    GenServer.start_link(__MODULE__, %{ttl: ttl}, name: name)
  end

  def get(server, key) do
    GenServer.call(server, {:get, key})
  end

  def put(server, key, value) do
    GenServer.cast(server, {:put, key, value})
  end

  # Server callbacks
  @impl true
  def init(state) do
    schedule_cleanup()
    {:ok, Map.put(state, :store, %{})}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    case Map.get(state.store, key) do
      {value, expires_at} when expires_at > System.monotonic_time(:millisecond) ->
        {:reply, {:ok, value}, state}
      _ ->
        {:reply, :miss, state}
    end
  end

  @impl true
  def handle_cast({:put, key, value}, state) do
    expires_at = System.monotonic_time(:millisecond) + state.ttl
    {:noreply, put_in(state, [:store, key], {value, expires_at})}
  end

  @impl true
  def handle_info(:cleanup, state) do
    now = System.monotonic_time(:millisecond)
    store = Map.reject(state.store, fn {_k, {_v, exp}} -> exp <= now end)
    schedule_cleanup()
    {:noreply, %{state | store: store}}
  end

  defp schedule_cleanup, do: Process.send_after(self(), :cleanup, :timer.minutes(1))
end
适用于具备请求/响应语义的有状态基础设施:
elixir
defmodule MyApp.Cache do
  use GenServer

  # 客户端API — 供其他进程调用
  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    ttl = Keyword.get(opts, :ttl, :timer.minutes(5))
    GenServer.start_link(__MODULE__, %{ttl: ttl}, name: name)
  end

  def get(server, key) do
    GenServer.call(server, {:get, key})
  end

  def put(server, key, value) do
    GenServer.cast(server, {:put, key, value})
  end

  # 服务端回调
  @impl true
  def init(state) do
    schedule_cleanup()
    {:ok, Map.put(state, :store, %{})}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    case Map.get(state.store, key) do
      {value, expires_at} when expires_at > System.monotonic_time(:millisecond) ->
        {:reply, {:ok, value}, state}
      _ ->
        {:reply, :miss, state}
    end
  end

  @impl true
  def handle_cast({:put, key, value}, state) do
    expires_at = System.monotonic_time(:millisecond) + state.ttl
    {:noreply, put_in(state, [:store, key], {value, expires_at})}
  end

  @impl true
  def handle_info(:cleanup, state) do
    now = System.monotonic_time(:millisecond)
    store = Map.reject(state.store, fn {_k, {_v, exp}} -> exp <= now end)
    schedule_cleanup()
    {:noreply, %{state | store: store}}
  end

  defp schedule_cleanup, do: Process.send_after(self(), :cleanup, :timer.minutes(1))
end

GenServer Best Practices

GenServer最佳实践

  • Keep
    handle_call
    fast — don't do heavy work while blocking the caller
  • Use
    handle_cast
    for fire-and-forget,
    handle_call
    for request/response
  • Use
    handle_info
    for self-sent messages, timers, and external messages
  • Use
    handle_continue
    for post-init work that shouldn't block
    start_link
  • Always define
    @impl true
    on callbacks
  • Return
    {:stop, reason, state}
    for clean shutdown
elixir
undefined
  • 保持
    handle_call
    执行速度快——不要执行耗时操作阻塞调用方
  • fire-and-forget 场景用
    handle_cast
    ,请求响应场景用
    handle_call
  • 处理进程自身发送的消息、定时器、外部消息时用
    handle_info
  • 不应该阻塞
    start_link
    的初始化后工作用
    handle_continue
    处理
  • 所有回调都要添加
    @impl true
    注解
  • 优雅关闭时返回
    {:stop, reason, state}
elixir
undefined

✅ Good: Use handle_continue for expensive init

✅ 正确示例:用handle_continue处理耗时初始化逻辑

@impl true def init(opts) do {:ok, %{data: nil}, {:continue, :load_data}} end
@impl true def handle_continue(:load_data, state) do data = expensive_load() {:noreply, %{state | data: data}} end
undefined
@impl true def init(opts) do {:ok, %{data: nil}, {:continue, :load_data}} end
@impl true def handle_continue(:load_data, state) do data = expensive_load() {:noreply, %{state | data: data}} end
undefined

Agent

Agent

Simple state wrapper — use when you just need get/update with no complex logic:
elixir
undefined
简单状态封装——仅用于不需要复杂逻辑的状态读取/更新场景:
elixir
undefined

✅ Good: Agent for simple shared counter

✅ 正确示例:用Agent实现简单的共享计数器

{:ok, counter} = Agent.start_link(fn -> 0 end, name: MyApp.Counter) Agent.get(MyApp.Counter, & &1) # => 0 Agent.update(MyApp.Counter, &(&1 + 1)) # => :ok Agent.get(MyApp.Counter, & &1) # => 1
{:ok, counter} = Agent.start_link(fn -> 0 end, name: MyApp.Counter) Agent.get(MyApp.Counter, & &1) # => 0 Agent.update(MyApp.Counter, &(&1 + 1)) # => :ok Agent.get(MyApp.Counter, & &1) # => 1

❌ Bad: Agent for complex logic — use GenServer instead

❌ 错误示例:在Agent中实现复杂逻辑 —— 请改用GenServer

Agent.update(agent, fn state ->

50 lines of complex business logic here...

This runs INSIDE the agent process, blocking all other callers

end)
undefined
Agent.update(agent, fn state ->

50行复杂业务逻辑写在这里...

这段代码会在Agent进程内部执行,阻塞所有其他调用方

end)
undefined

Task

Task

For one-off concurrent work:
elixir
undefined
用于一次性并发工作:
elixir
undefined

Fire-and-forget

Fire-and-forget 场景

Task.start(fn -> send_welcome_email(user) end)
Task.start(fn -> send_welcome_email(user) end)

Await result (with timeout)

等待返回结果(带超时)

task = Task.async(fn -> fetch_external_data(url) end) result = Task.await(task, 5_000)
task = Task.async(fn -> fetch_external_data(url) end) result = Task.await(task, 5_000)

Multiple concurrent tasks

多个并发任务

tasks = Enum.map(urls, fn url -> Task.async(fn -> fetch(url) end) end) results = Task.await_many(tasks, 10_000)
undefined
tasks = Enum.map(urls, fn url -> Task.async(fn -> fetch(url) end) end) results = Task.await_many(tasks, 10_000)
undefined

Task.Supervisor

Task.Supervisor

For tasks that might fail — isolates crashes from the caller:
elixir
undefined
用于可能失败的任务,将崩溃与调用方隔离:
elixir
undefined

In your supervision tree

在你的监督树中配置

children = [ {Task.Supervisor, name: MyApp.TaskSupervisor} ]
children = [ {Task.Supervisor, name: MyApp.TaskSupervisor} ]

Spawn tasks that can crash safely

安全启动可能崩溃的任务

Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> send_notification(user) # If this crashes, caller is unaffected end)
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> send_notification(user) # 即便这里崩溃,调用方也不会受影响 end)

Async with supervisor

结合Supervisor使用异步任务

task = Task.Supervisor.async(MyApp.TaskSupervisor, fn -> fetch_external_data(url) end) result = Task.await(task)
task = Task.Supervisor.async(MyApp.TaskSupervisor, fn -> fetch_external_data(url) end) result = Task.await(task)

Concurrent stream with backpressure

带背压的并发流处理

MyApp.TaskSupervisor |> Task.Supervisor.async_stream(urls, &fetch/1, max_concurrency: 10, ordered: false) |> Enum.reduce([], fn {:ok, result}, acc -> [result | acc] {:exit, _reason}, acc -> acc end)
undefined
MyApp.TaskSupervisor |> Task.Supervisor.async_stream(urls, &fetch/1, max_concurrency: 10, ordered: false) |> Enum.reduce([], fn {:ok, result}, acc -> [result | acc] {:exit, _reason}, acc -> acc end)
undefined

ETS

ETS

Fast concurrent reads, shared across processes:
elixir
undefined
高并发读,进程间共享:
elixir
undefined

✅ Good: ETS for read-heavy cache

✅ 正确示例:用ETS实现读密集型缓存

defmodule MyApp.ConfigCache do use GenServer
@table :config_cache
def start_link(_opts) do GenServer.start_link(MODULE, [], name: MODULE) end
def get(key) do case :ets.lookup(@table, key) do [{^key, value}] -> {:ok, value} [] -> :error end end
def put(key, value) do :ets.insert(@table, {key, value}) :ok end
@impl true def init(_) do table = :ets.new(@table, [:set, :public, :named_table, read_concurrency: true]) {:ok, %{table: table}} end end
undefined
defmodule MyApp.ConfigCache do use GenServer
@table :config_cache
def start_link(_opts) do GenServer.start_link(MODULE, [], name: MODULE) end
def get(key) do case :ets.lookup(@table, key) do [{^key, value}] -> {:ok, value} [] -> :error end end
def put(key, value) do :ets.insert(@table, {key, value}) :ok end
@impl true def init(_) do table = :ets.new(@table, [:set, :public, :named_table, read_concurrency: true]) {:ok, %{table: table}} end end
undefined

ETS vs GenServer State

ETS vs GenServer状态对比

ETSGenServer state
ReadsConcurrent, no bottleneckSerialized through process
WritesAtomic per-rowSerialized (safe)
Survives crashOnly if heir setNo (state lost)
QueryMatch specs, selectFull Elixir
Best forRead-heavy cache, countersComplex state machines
ETSGenServer状态
读性能并发无瓶颈通过进程串行处理
写性能单行原子操作串行处理(安全)
崩溃后留存仅设置heir时可以否(状态丢失)
查询能力匹配规则、select语句完整Elixir能力
最佳适用场景读密集型缓存、计数器复杂状态机

Registry

Registry

Dynamic process lookup by key:
elixir
undefined
按键动态查找进程:
elixir
undefined

In supervision tree

在监督树中配置

children = [ {Registry, keys: :unique, name: MyApp.Registry}, {DynamicSupervisor, name: MyApp.RoomSupervisor} ]
children = [ {Registry, keys: :unique, name: MyApp.Registry}, {DynamicSupervisor, name: MyApp.RoomSupervisor} ]

Start a named process dynamically

动态启动命名进程

def start_room(room_id) do DynamicSupervisor.start_child( MyApp.RoomSupervisor, {MyApp.Room, room_id: room_id, name: via(room_id)} ) end
def via(room_id) do {:via, Registry, {MyApp.Registry, room_id}} end
def start_room(room_id) do DynamicSupervisor.start_child( MyApp.RoomSupervisor, {MyApp.Room, room_id: room_id, name: via(room_id)} ) end
def via(room_id) do {:via, Registry, {MyApp.Registry, room_id}} end

Look up and call

查找并调用进程

def get_room_state(room_id) do GenServer.call(via(room_id), :get_state) end
undefined
def get_room_state(room_id) do GenServer.call(via(room_id), :get_state) end
undefined

Supervision Trees

监督树

Design Principles

设计原则

  • One-for-one: Restart only the crashed child (default, most common)
  • One-for-all: Restart all children if one crashes (tightly coupled)
  • Rest-for-one: Restart crashed child and all children started after it
elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Start order matters — dependencies first
      MyApp.Repo,                                          # Database
      {Phoenix.PubSub, name: MyApp.PubSub},               # PubSub
      MyApp.ConfigCache,                                   # Cache (depends on nothing)
      {Registry, keys: :unique, name: MyApp.Registry},     # Registry
      {DynamicSupervisor, name: MyApp.RoomSupervisor},     # Dynamic processes
      {Task.Supervisor, name: MyApp.TaskSupervisor},       # Task supervisor
      {Oban, Application.fetch_env!(:my_app, Oban)},       # Background jobs
      MyAppWeb.Endpoint,                                   # Web server (last)
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
One-for-one:仅重启崩溃的子进程(默认配置,最常用) One-for-all:某个子进程崩溃时重启所有子进程(适用于紧耦合场景) Rest-for-one:重启崩溃的子进程以及所有在它之后启动的子进程
elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # 启动顺序很重要 —— 依赖项优先启动
      MyApp.Repo,                                          # 数据库
      {Phoenix.PubSub, name: MyApp.PubSub},               # PubSub
      MyApp.ConfigCache,                                   # 缓存(无依赖)
      {Registry, keys: :unique, name: MyApp.Registry},     # Registry
      {DynamicSupervisor, name: MyApp.RoomSupervisor},     # 动态进程
      {Task.Supervisor, name: MyApp.TaskSupervisor},       # Task监督器
      {Oban, Application.fetch_env!(:my_app, Oban)},       # 后台任务
      MyAppWeb.Endpoint,                                   # Web服务(最后启动)
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Child Spec

子进程规格

elixir
undefined
elixir
undefined

Customize restart behavior

自定义重启行为

defmodule MyApp.CriticalWorker do use GenServer, restart: :permanent # Always restart (default) end
defmodule MyApp.OptionalWorker do use GenServer, restart: :transient # Only restart on abnormal exit end
defmodule MyApp.OneShot do use GenServer, restart: :temporary # Never restart end
undefined
defmodule MyApp.CriticalWorker do use GenServer, restart: :permanent # 始终重启(默认配置) end
defmodule MyApp.OptionalWorker do use GenServer, restart: :transient # 仅在异常退出时重启 end
defmodule MyApp.OneShot do use GenServer, restart: :temporary # 永不重启 end
undefined

Background Jobs with Oban

用Oban实现后台任务

For reliable, persistent background work — NOT Task or GenServer:
elixir
undefined
用于可靠、持久化的后台工作——不要用Task或GenServer实现这类需求:
elixir
undefined

Define a worker

定义Worker

defmodule MyApp.Workers.SendEmail do use Oban.Worker, queue: :mailers, max_attempts: 3
@impl Oban.Worker def perform(%Oban.Job{args: %{"user_id" => user_id, "template" => template}}) do user = Accounts.get_user!(user_id) MyApp.Mailer.deliver(user, template) :ok end end
defmodule MyApp.Workers.SendEmail do use Oban.Worker, queue: :mailers, max_attempts: 3
@impl Oban.Worker def perform(%Oban.Job{args: %{"user_id" => user_id, "template" => template}}) do user = Accounts.get_user!(user_id) MyApp.Mailer.deliver(user, template) :ok end end

Enqueue a job

入队任务

%{user_id: user.id, template: "welcome"} |> MyApp.Workers.SendEmail.new() |> Oban.insert()
%{user_id: user.id, template: "welcome"} |> MyApp.Workers.SendEmail.new() |> Oban.insert()

Schedule for later

延迟调度

%{user_id: user.id, template: "reminder"} |> MyApp.Workers.SendEmail.new(scheduled_at: DateTime.add(DateTime.utc_now(), 3600)) |> Oban.insert()
%{user_id: user.id, template: "reminder"} |> MyApp.Workers.SendEmail.new(scheduled_at: DateTime.add(DateTime.utc_now(), 3600)) |> Oban.insert()

Unique jobs (prevent duplicates)

唯一任务(防止重复执行)

%{report_id: report.id} |> MyApp.Workers.GenerateReport.new(unique: [period: 300, fields: [:args]]) |> Oban.insert()
undefined
%{report_id: report.id} |> MyApp.Workers.GenerateReport.new(unique: [period: 300, fields: [:args]]) |> Oban.insert()
undefined

When to Use Oban vs Task

Oban vs Task 选型对比

ObanTask
PersistedYes (database)No (in-memory)
RetriesBuilt-in with backoffManual
Survives deployYesNo
SchedulingBuilt-inManual with Process.send_after
MonitoringOban Web dashboardNone
Use forEmails, reports, webhooks, importsQuick async, fan-out, parallel fetch
ObanTask
持久化是(存储在数据库)否(内存中)
重试能力内置退避重试需手动实现
部署后留存
调度能力内置需配合Process.send_after手动实现
监控能力Oban Web可视化面板
适用场景邮件、报表、Webhook、数据导入快速异步、扇出、并行拉取

When NOT to Use Processes

不要使用进程的场景

elixir
undefined
elixir
undefined

❌ Don't use a process just to "hold" a value

❌ 不要为了“存储”一个值就使用进程

defmodule MyApp.CurrentUser do use Agent def start_link(user), do: Agent.start_link(fn -> user end) def get(pid), do: Agent.get(pid, & &1) end
defmodule MyApp.CurrentUser do use Agent def start_link(user), do: Agent.start_link(fn -> user end) def get(pid), do: Agent.get(pid, & &1) end

Just pass the user as a function argument!

直接把user作为函数参数传递就可以了!

❌ Don't use GenServer for sequential data transformation

❌ 不要用GenServer处理顺序数据转换

defmodule MyApp.DataPipeline do use GenServer def process(data), do: GenServer.call(MODULE, {:process, data}) def handle_call({:process, data}, _from, state) do result = data |> step1() |> step2() |> step3() {:reply, result, state} end end
defmodule MyApp.DataPipeline do use GenServer def process(data), do: GenServer.call(MODULE, {:process, data}) def handle_call({:process, data}, _from, state) do result = data |> step1() |> step2() |> step3() {:reply, result, state} end end

Just use a regular function pipeline!

直接用普通函数管道就可以了!

def process(data), do: data |> step1() |> step2() |> step3()
def process(data), do: data |> step1() |> step2() |> step3()

❌ Don't use processes for domain entities

❌ 不要用进程存储领域实体

Users, orders, tasks, etc. belong in the database

用户、订单、任务等数据应该存在数据库中

✅ DO use processes for:

✅ 进程的正确适用场景:

- Connection pools (Repo, HTTP clients)

- 连接池(Repo、HTTP客户端)

- Caches (ETS-backed GenServer)

- 缓存(基于ETS的GenServer)

- Rate limiters

- 限流器

- Real-time session state (LiveView, channels)

- 实时会话状态(LiveView、channels)

- Periodic work (GenServer with send_after)

- 周期性工作(带send_after的GenServer)

- Dynamic workers (DynamicSupervisor + Registry)

- 动态worker(DynamicSupervisor + Registry)

undefined
undefined

Common Mistakes

常见错误

elixir
undefined
elixir
undefined

❌ Don't call GenServer from within its own callbacks

❌ 不要在GenServer的回调内部调用自身

def handle_call(:get_data, _from, state) do other = GenServer.call(self(), :other) # DEADLOCK! {:reply, other, state} end
def handle_call(:get_data, _from, state) do other = GenServer.call(self(), :other) # 死锁! {:reply, other, state} end

❌ Don't do heavy work in handle_call (blocks all callers)

❌ 不要在handle_call中执行耗时操作(会阻塞所有调用方)

def handle_call(:generate_report, _from, state) do report = generate_huge_report() # All other callers wait! {:reply, report, state} end
def handle_call(:generate_report, _from, state) do report = generate_huge_report() # 所有其他调用方都会等待! {:reply, report, state} end

✅ Offload heavy work

✅ 卸载耗时工作到独立Task

def handle_call(:generate_report, from, state) do Task.start(fn -> report = generate_huge_report() GenServer.reply(from, report) end) {:noreply, state} end
def handle_call(:generate_report, from, state) do Task.start(fn -> report = generate_huge_report() GenServer.reply(from, report) end) {:noreply, state} end

❌ Don't forget to handle unexpected messages

❌ 不要忘记处理意外消息

Unhandled messages in handle_info will crash the GenServer in OTP 27+

OTP 27+中handle_info未处理的消息会导致GenServer崩溃

✅ Add a catch-all

✅ 添加兜底处理逻辑

@impl true def handle_info(msg, state) do Logger.warning("Unexpected message: #{inspect(msg)}") {:noreply, state} end
undefined
@impl true def handle_info(msg, state) do Logger.warning("Unexpected message: #{inspect(msg)}") {:noreply, state} end
undefined