golang-concurrency-patterns

Go Concurrency Patterns (Production)

Overview

Go concurrency scales when goroutine lifetimes are explicit, cancellation is propagated with context.Context, and shared state is protected (channels or locks). Apply these patterns to build reliable services and avoid common failure modes: goroutine leaks, deadlocks, and data races.

Quick Start

Default building blocks
  • Use context to drive cancellation and deadlines.
  • Use errgroup.WithContext for fan-out/fan-in with early abort.
  • Bound concurrency (avoid unbounded goroutines) with a semaphore or worker pool.
  • Prefer immutable data; otherwise protect shared state with a mutex or make a single goroutine the owner.
Avoid
  • Fire-and-forget goroutines in request handlers.
  • time.After inside hot loops.
  • Closing channels from the receiver side.
  • Sharing mutable variables across goroutines without synchronization.

Core Concepts

Goroutine lifecycle

Treat goroutines as resources with a clear owner and shutdown condition.
Correct: stop goroutines via context
go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
    ticker := time.NewTicker(250 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // do work
        }
    }
}()
Wrong: goroutine without a stop condition
go
go func() {
    for {
        doWork() // leaks forever
    }
}()
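Ownership also means the owner can wait for the goroutine to exit, not only ask it to stop. The following is a minimal sketch of that idea; startTicker and demoLifecycle are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startTicker (hypothetical helper) launches a goroutine that counts ticks
// until its context is cancelled. The returned stop function cancels the
// goroutine, waits for it to exit, and reports the number of ticks seen.
func startTicker(parent context.Context, every time.Duration) (stop func() int) {
	ctx, cancel := context.WithCancel(parent)
	done := make(chan struct{})
	ticks := 0 // written only by the goroutine, read only after done is closed

	go func() {
		defer close(done)
		t := time.NewTicker(every)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				ticks++
			}
		}
	}()

	return func() int {
		cancel() // request shutdown
		<-done   // wait until the goroutine has really exited
		return ticks
	}
}

// demoLifecycle runs the ticker briefly and then shuts it down.
func demoLifecycle() int {
	stop := startTicker(context.Background(), 5*time.Millisecond)
	time.Sleep(60 * time.Millisecond)
	return stop()
}

func main() {
	fmt.Println("ticks observed:", demoLifecycle())
}
```

Returning a stop function keeps the cancel call and the wait in one place, so the caller cannot forget either half.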

Channels vs mutexes (choose intentionally)

  • Use channels to model ownership/serialization of state or to pipeline work.
  • Use mutexes to protect shared in-memory state with simple read/write patterns.
Correct: one goroutine owns the map
go
type req struct {
    key   string
    reply chan<- int
}

func mapOwner(ctx context.Context, in <-chan req) {
    m := map[string]int{}
    for {
        select {
        case <-ctx.Done():
            return
        case r := <-in:
            r.reply <- m[r.key]
        }
    }
}
Correct: mutex protects shared map
go
type SafeMap struct {
    mu sync.RWMutex
    m  map[string]int
}

func (s *SafeMap) Get(k string) (int, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.m[k]
    return v, ok
}
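To show how callers might talk to an owner goroutine, here is a small sketch that extends the owner idea with a write path. The op type, the get helper, and demoOwner are assumptions made for this example and are not part of the snippets above.

```go
package main

import (
	"context"
	"fmt"
)

// op is a hypothetical request: a non-nil reply means "read", nil means "write".
type op struct {
	key   string
	val   int
	reply chan int // created with capacity 1 so the owner never blocks on a slow caller
}

// owner is the only goroutine that touches m, so no lock is needed.
func owner(ctx context.Context, in <-chan op) {
	m := map[string]int{}
	for {
		select {
		case <-ctx.Done():
			return
		case o := <-in:
			if o.reply != nil {
				o.reply <- m[o.key]
			} else {
				m[o.key] = o.val
			}
		}
	}
}

// get sends a read request and waits for the answer or for cancellation.
func get(ctx context.Context, in chan<- op, key string) (int, error) {
	reply := make(chan int, 1)
	select {
	case in <- op{key: key, reply: reply}:
	case <-ctx.Done():
		return 0, ctx.Err()
	}
	select {
	case v := <-reply:
		return v, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// demoOwner writes one key and reads it back through the owner goroutine.
func demoOwner() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan op)
	go owner(ctx, in)

	in <- op{key: "a", val: 42} // the owner handles this write before the read below
	v, _ := get(ctx, in, "a")
	return v
}

func main() {
	fmt.Println(demoOwner())
}
```

Because requests are handled one at a time by a single goroutine, the write is always applied before the later read is served.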

Patterns

1) Fan-out/fan-in with cancellation (errgroup)

Use errgroup.WithContext to run concurrent tasks, cancel siblings on error, and wait for completion.
Correct: cancel on first error
go
g, ctx := errgroup.WithContext(ctx)

for _, id := range ids {
    id := id // per-iteration copy; needed before Go 1.22, harmless after
    g.Go(func() error {
        return process(ctx, id)
    })
}

if err := g.Wait(); err != nil {
    return err
}
Wrong: WaitGroup loses the first error and does not propagate cancellation
go
var wg sync.WaitGroup
for _, id := range ids {
    wg.Add(1)
    go func() {
        defer wg.Done()
        _ = process(context.Background(), id) // ignores caller ctx, discards the error, captures the shared loop variable (pre-Go 1.22)
    }()
}
wg.Wait()
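For intuition about what "first error plus sibling cancellation" involves, the sketch below builds a simplified version from the standard library alone. It is not the errgroup implementation; runAll, errBad, and demoRunAll are hypothetical names for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runAll runs fn for every id concurrently, cancels the remaining calls after
// the first failure, and returns that first error (nil if all succeed).
func runAll(ctx context.Context, ids []int, fn func(context.Context, int) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, id := range ids {
		id := id // per-iteration copy for toolchains older than Go 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := fn(ctx, id); err != nil {
				once.Do(func() { // only the first failure is recorded
					firstErr = err
					cancel() // tell the siblings to stop
				})
			}
		}()
	}
	wg.Wait()
	return firstErr
}

var errBad = errors.New("bad id")

// demoRunAll fails on id 2; the other tasks block until they are cancelled.
func demoRunAll() error {
	return runAll(context.Background(), []int{1, 2, 3}, func(ctx context.Context, id int) error {
		if id == 2 {
			return errBad
		}
		<-ctx.Done()
		return ctx.Err()
	})
}

func main() {
	fmt.Println(demoRunAll())
}
```

The sibling tasks only stop because they watch ctx; cancellation is cooperative, so a task that ignores its context keeps running.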

2) Bounded concurrency (semaphore pattern)

Bound parallelism to prevent CPU/memory exhaustion and downstream overload.
Correct: bounded fan-out
go
limit := make(chan struct{}, 8) // max 8 concurrent
g, ctx := errgroup.WithContext(ctx)

for _, id := range ids {
    id := id
    g.Go(func() error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case limit <- struct{}{}:
        }
        defer func() { <-limit }()

        return process(ctx, id)
    })
}

return g.Wait()
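One variation worth considering is to acquire the slot before starting the goroutine, which bounds the number of goroutines as well as the number of running tasks. The sketch below uses only the standard library and measures the peak concurrency it observes; peakConcurrency is a hypothetical helper, and the sleep stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs n short tasks with at most limit in flight and returns
// the highest number of tasks observed running at the same time.
func peakConcurrency(n, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak atomic.Int64

	for i := 0; i < n; i++ {
		sem <- struct{}{} // acquire before spawning: blocks while limit tasks are running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			cur := running.Add(1)
			for { // record a new peak if this task raised it
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			time.Sleep(2 * time.Millisecond) // stand-in for real work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak:", peakConcurrency(20, 3))
}
```

The trade-off is that the spawning loop itself now blocks, so the caller should be a goroutine that is allowed to wait.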

3) Worker pool (stable throughput)

Use a fixed number of workers for stable throughput and predictable resource usage.
Correct: worker pool with context stop
go
type Job struct{ ID string }

func runPool(ctx context.Context, jobs <-chan Job, workers int) error {
    g, ctx := errgroup.WithContext(ctx)

    for i := 0; i < workers; i++ {
        g.Go(func() error {
            for {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case j, ok := <-jobs:
                    if !ok {
                        return nil
                    }
                    if err := handleJob(ctx, j); err != nil {
                        return err
                    }
                }
            }
        })
    }

    return g.Wait()
}
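The pool above only shows the worker side. As a rough sketch of the full flow, the example below adds a producer that closes the jobs channel and a collector that closes the results channel once all workers return. It uses sync.WaitGroup instead of errgroup and omits context handling to stay short; sumSquares is a hypothetical example function.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares squares the numbers 1..n on a fixed pool of workers and returns the total.
func sumSquares(n, workers int) int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // loop ends when the producer closes jobs
				results <- j * j
			}
		}()
	}

	// Producer: the sending side owns jobs and closes it when finished.
	go func() {
		defer close(jobs)
		for i := 1; i <= n; i++ {
			jobs <- i
		}
	}()

	// Close results once every worker has returned, so the loop below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares(10, 3))
}
```

Each channel is closed by the side that sends on it: the producer closes jobs, and the goroutine that waits for the workers closes results on their behalf.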

4) Pipeline stages (fan-out between stages)

Prefer one-directional channels and close only from the sending side.
Correct: sender closes
go
func stageA(ctx context.Context, out chan<- int) {
    defer close(out)
    for i := 0; i < 10; i++ {
        select {
        case <-ctx.Done():
            return
        case out <- i:
        }
    }
}
Wrong: receiver closes
go
func stageB(in <-chan int) {
    close(in) // compile error: in is receive-only (<-chan int); also the wrong ownership model
}
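To illustrate how stages chain together, here is a small two-stage sketch in which each stage creates, sends on, and closes its own output channel. The generate, square, and demoPipeline names are hypothetical and chosen for this example.

```go
package main

import (
	"context"
	"fmt"
)

// generate emits 0..n-1 and closes its output when finished or cancelled.
func generate(ctx context.Context, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // the sender closes
		for i := 0; i < n; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}

// square reads from in until it is closed, then closes its own output.
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case <-ctx.Done():
				return
			case out <- v * v:
			}
		}
	}()
	return out
}

// demoPipeline wires the two stages together and collects the results in order.
func demoPipeline() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops both stages if the consumer returns early

	var got []int
	for v := range square(ctx, generate(ctx, 4)) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoPipeline())
}
```

Returning receive-only channels lets the compiler enforce that downstream code can neither send on nor close a channel it does not own.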

5) Periodic work without leaks (time.Ticker vs time.After)

Use time.NewTicker for periodic loops; avoid time.After in hot paths, because each call allocates a new timer.
Correct: ticker
go
t := time.NewTicker(1 * time.Second)
defer t.Stop()

for {
    select {
    case <-ctx.Done():
        return
    case <-t.C:
        poll()
    }
}
Wrong: time.After in loop
go
for {
    select {
    case <-ctx.Done():
        return
    case <-time.After(1 * time.Second): // allocates a new timer on every iteration
        poll()
    }
}
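When the loop needs a per-iteration timeout rather than a fixed period (for example, "stop after the input has been idle for a while"), one option is to reuse a single time.Timer. This is a sketch under that assumption; countUntilIdle and demoIdle are hypothetical names, and the stop-and-drain step before Reset is written defensively so it behaves the same on older and newer Go releases.

```go
package main

import (
	"fmt"
	"time"
)

// countUntilIdle counts values received from in and returns once the channel
// is closed or no value has arrived for the idle duration.
func countUntilIdle(in <-chan int, idle time.Duration) int {
	t := time.NewTimer(idle)
	defer t.Stop()

	n := 0
	for {
		select {
		case _, ok := <-in:
			if !ok {
				return n
			}
			n++
			// Stop the timer and drain a pending fire before reusing it.
			if !t.Stop() {
				select {
				case <-t.C:
				default:
				}
			}
			t.Reset(idle)
		case <-t.C:
			return n // idle timeout reached
		}
	}
}

// demoIdle feeds three buffered values and never closes the channel,
// so the function returns via the idle timeout.
func demoIdle() int {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	return countUntilIdle(in, 20*time.Millisecond)
}

func main() {
	fmt.Println(demoIdle())
}
```

One timer is allocated for the whole loop, unlike a time.After call placed inside the select.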

Decision Trees

Channel vs Mutex

  • Need ownership/serialization (single writer, message passing) → use channel + owner goroutine
  • Need shared cache/map with many readers and simple updates → use RWMutex
  • Need simple counter with low contention → use atomic
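The atomic branch of this tree is the one without an example elsewhere in the document, so here is a brief sketch. It assumes Go 1.19 or newer for the atomic.Int64 type; countHits is a hypothetical function name.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countHits increments one shared counter from several goroutines without a mutex.
func countHits(goroutines, perGoroutine int) int64 {
	var hits atomic.Int64
	var wg sync.WaitGroup

	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perGoroutine; i++ {
				hits.Add(1) // atomic read-modify-write
			}
		}()
	}
	wg.Wait()
	return hits.Load()
}

func main() {
	fmt.Println(countHits(8, 1000))
}
```

An atomic suits a single independent value; once two fields must change together, a mutex or an owner goroutine is usually the safer choice.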

WaitGroup vs errgroup

  • Need error propagation + sibling cancellation → use errgroup.WithContext
  • Need only to wait, with errors handled elsewhere → use sync.WaitGroup

Buffered vs unbuffered channel

  • Need backpressure and synchronous handoff → use unbuffered
  • Need burst absorption up to a known size → use buffered (size with intent)
  • Unsure → start unbuffered and measure; add buffer only to remove known bottleneck
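The difference between the two kinds of channel can be made concrete with non-blocking sends and no receiver. The accepted helper below is hypothetical and exists only to make the behavior observable.

```go
package main

import "fmt"

// accepted reports how many sends succeed immediately on ch while nobody is receiving.
func accepted(ch chan int, attempts int) int {
	n := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			n++
		default: // the send would block, so the sender feels backpressure
		}
	}
	return n
}

func main() {
	fmt.Println("unbuffered:", accepted(make(chan int), 5))
	fmt.Println("buffered(3):", accepted(make(chan int, 3), 5))
}
```

With no receiver, the unbuffered channel accepts nothing, while the buffered channel absorbs exactly as many values as its capacity and then pushes back.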

Testing & Verification

Race detector and flake control

Run targeted tests with the race detector and disable caching during debugging:
bash
go test -race ./...
go test -run TestName -race -count=1 ./...

Timeouts to prevent hanging tests

Correct: test-level timeout via context
go
func TestSomething(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    if err := doThing(ctx); err != nil {
        t.Fatal(err)
    }
}
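The timeout only helps if the code under test actually watches the context. As a sketch, waitOrCancel below is a hypothetical stand-in for doThing that returns early when the deadline passes; demoTimeout shows both outcomes.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitOrCancel simulates work that takes the given duration but gives up
// as soon as ctx is cancelled or its deadline passes.
func waitOrCancel(ctx context.Context, work time.Duration) error {
	t := time.NewTimer(work)
	defer t.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}

// demoTimeout runs a fast and a slow operation under one 50ms deadline.
func demoTimeout() (fast, slow error) {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	fast = waitOrCancel(ctx, time.Millisecond) // finishes well before the deadline
	slow = waitOrCancel(ctx, time.Second)      // cut short by the deadline
	return fast, slow
}

func main() {
	fast, slow := demoTimeout()
	fmt.Println("fast:", fast)
	fmt.Println("slow:", slow)
}
```

A function that blocks without selecting on ctx.Done() would still hang the test, regardless of the timeout set by the caller.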

Troubleshooting

Symptom: deadlock (test hangs, goroutines blocked)

Actions:
  • Add timeouts (context.WithTimeout) around blocking operations.
  • Verify channel ownership: only the sender closes; receivers stop when ok == false.
  • Check for a missing <-limit release in semaphore patterns.

Symptom: data race (go test -race reports)

Actions:
  • Identify shared variables mutated by multiple goroutines.
  • Add a mutex or convert to ownership model (single goroutine owns state).
  • Avoid writing to captured loop variables.
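As one illustration of the "add a mutex" action, the sketch below guards a shared slice that several goroutines append to. The collect function is hypothetical; without the lock, the race detector would be expected to flag the append.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect appends one value per goroutine to a shared slice, guarded by a mutex.
func collect(n int) []int {
	var (
		mu  sync.Mutex
		out []int
		wg  sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		i := i // per-iteration copy for toolchains older than Go 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			out = append(out, i*2) // shared write, protected by mu
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Ints(out) // goroutine order is not deterministic, so sort for a stable result
	return out
}

func main() {
	fmt.Println(collect(5))
}
```

An alternative that avoids the lock is to preallocate the slice and have each goroutine write only to its own index.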

Symptom: goroutine leak (memory growth, slow shutdown)

Actions:
  • Ensure every goroutine selects on ctx.Done().
  • Ensure time.Ticker is stopped and channels are closed by senders.
  • Avoid context.Background() inside request paths; propagate caller context.

Resources
