golang-concurrency-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseGo Concurrency Patterns (Production)
生产环境下的Go并发模式
Overview
概述
Go concurrency scales when goroutine lifetimes are explicit, cancellation is propagated with , and shared state is protected (channels or locks). Apply these patterns to build reliable services and avoid common failure modes: goroutine leaks, deadlocks, and data races.
context.Context当goroutine的生命周期明确、通过传播取消信号,且共享状态受到保护(通道或锁)时,Go的并发能力可以很好地扩展。应用这些模式来构建可靠的服务,避免常见的故障模式:goroutine泄漏、死锁和数据竞态。
context.ContextQuick Start
快速入门
Default building blocks
- Use to drive cancellation and deadlines.
context - Use for fan-out/fan-in with early abort.
errgroup.WithContext - Bound concurrency (avoid unbounded goroutines) with a semaphore or worker pool.
- Prefer immutable data; otherwise protect shared state with a mutex or make a single goroutine the owner.
Avoid
- Fire-and-forget goroutines in request handlers.
- inside hot loops.
time.After - Closing channels from the receiver side.
- Sharing mutable variables across goroutines without synchronization.
默认构建块
- 使用驱动取消操作和截止时间。
context - 使用实现带提前终止的扇入/扇出。
errgroup.WithContext - 利用信号量或工作池限制并发数(避免无界goroutine)。
- 优先使用不可变数据;否则使用互斥锁保护共享状态,或让单个goroutine拥有状态所有权。
需要避免的操作
- 在请求处理器中使用“一劳永逸”的goroutine。
- 在热循环中使用。
time.After - 从接收端关闭通道。
- 在不同goroutine间共享可变变量却不进行同步。
Core Concepts
核心概念
Goroutine lifecycle
Goroutine生命周期
Treat goroutines as resources with a clear owner and shutdown condition.
✅ Correct: stop goroutines via context
go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// do work
}
}
}()❌ Wrong: goroutine without a stop condition
go
go func() {
for {
doWork() // leaks forever
}
}()将goroutine视为拥有明确所有者和关闭条件的资源。
✅ 正确:通过上下文停止goroutine
go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// do work
}
}
}()❌ 错误:无停止条件的goroutine
go
go func() {
for {
doWork() // 永久泄漏
}
}()Channels vs mutexes (choose intentionally)
通道 vs 互斥锁(按需选择)
- Use channels to model ownership/serialization of state or to pipeline work.
- Use mutexes to protect shared in-memory state with simple read/write patterns.
✅ Correct: one goroutine owns the map
go
type req struct {
key string
reply chan<- int
}
func mapOwner(ctx context.Context, in <-chan req) {
m := map[string]int{}
for {
select {
case <-ctx.Done():
return
case r := <-in:
r.reply <- m[r.key]
}
}
}✅ Correct: mutex protects shared map
go
type SafeMap struct {
mu sync.RWMutex
m map[string]int
}
func (s *SafeMap) Get(k string) (int, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.m[k]
return v, ok
}- 使用通道来建模状态的所有权/序列化,或实现工作流水线。
- 使用互斥锁来保护具有简单读写模式的共享内存状态。
✅ 正确:单个goroutine拥有map
go
type req struct {
key string
reply chan<- int
}
func mapOwner(ctx context.Context, in <-chan req) {
m := map[string]int{}
for {
select {
case <-ctx.Done():
return
case r := <-in:
r.reply <- m[r.key]
}
}
}✅ 正确:互斥锁保护共享map
go
type SafeMap struct {
mu sync.RWMutex
m map[string]int
}
func (s *SafeMap) Get(k string) (int, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.m[k]
return v, ok
}Patterns
模式
1) Fan-out/fan-in with cancellation (errgroup
)
errgroup1) 带取消的扇入/扇出(errgroup
)
errgroupUse to run concurrent tasks, cancel siblings on error, and wait for completion.
errgroup.WithContext✅ Correct: cancel on first error
go
g, ctx := errgroup.WithContext(ctx)
for _, id := range ids {
id := id // capture
g.Go(func() error {
return process(ctx, id)
})
}
if err := g.Wait(); err != nil {
return err
}❌ Wrong: WaitGroup loses the first error and does not propagate cancellation
go
var wg sync.WaitGroup
for _, id := range ids {
wg.Add(1)
go func() {
defer wg.Done()
_ = process(context.Background(), id) // ignores caller ctx + captures id
}()
}
wg.Wait()使用运行并发任务,在出错时取消其他任务,并等待所有任务完成。
errgroup.WithContext✅ 正确:首个错误触发取消
go
g, ctx := errgroup.WithContext(ctx)
for _, id := range ids {
id := id // 捕获变量
g.Go(func() error {
return process(ctx, id)
})
}
if err := g.Wait(); err != nil {
return err
}❌ 错误:WaitGroup丢失首个错误且不传播取消信号
go
var wg sync.WaitGroup
for _, id := range ids {
wg.Add(1)
go func() {
defer wg.Done()
_ = process(context.Background(), id) // 忽略调用方ctx + 变量捕获问题
}()
}
wg.Wait()2) Bounded concurrency (semaphore pattern)
2) 有界并发(信号量模式)
Bound parallelism to prevent CPU/memory exhaustion and downstream overload.
✅ Correct: bounded fan-out
go
limit := make(chan struct{}, 8) // max 8 concurrent
g, ctx := errgroup.WithContext(ctx)
for _, id := range ids {
id := id
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case limit <- struct{}{}:
}
defer func() { <-limit }()
return process(ctx, id)
})
}
return g.Wait()限制并行数以防止CPU/内存耗尽和下游过载。
✅ 正确:有界扇出
go
limit := make(chan struct{}, 8) // 最大8个并发
g, ctx := errgroup.WithContext(ctx)
for _, id := range ids {
id := id
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case limit <- struct{}{}:
}
defer func() { <-limit }()
return process(ctx, id)
})
}
return g.Wait()3) Worker pool (durable throughput)
3) 工作池(稳定吞吐量)
Use a fixed number of workers for stable throughput and predictable resource usage.
✅ Correct: worker pool with context stop
go
type Job struct{ ID string }
func runPool(ctx context.Context, jobs <-chan Job, workers int) error {
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workers; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case j, ok := <-jobs:
if !ok {
return nil
}
if err := handleJob(ctx, j); err != nil {
return err
}
}
}
})
}
return g.Wait()
}使用固定数量的工作者以实现稳定的吞吐量和可预测的资源使用。
✅ 正确:带上下文停止的工作池
go
type Job struct{ ID string }
func runPool(ctx context.Context, jobs <-chan Job, workers int) error {
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workers; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case j, ok := <-jobs:
if !ok {
return nil
}
if err := handleJob(ctx, j); err != nil {
return err
}
}
}
})
}
return g.Wait()
}4) Pipeline stages (fan-out between stages)
4) 流水线阶段(阶段间扇出)
Prefer one-directional channels and close only from the sending side.
✅ Correct: sender closes
go
func stageA(ctx context.Context, out chan<- int) {
defer close(out)
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
return
case out <- i:
}
}
}❌ Wrong: receiver closes
go
func stageB(in <-chan int) {
close(in) // compile error in<-chan; also wrong ownership model
}优先使用单向通道,且仅从发送端关闭通道。
✅ 正确:发送端关闭通道
go
func stageA(ctx context.Context, out chan<- int) {
defer close(out)
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
return
case out <- i:
}
}
}❌ 错误:接收端关闭通道
go
func stageB(in <-chan int) {
close(in) // 编译错误(in是只读通道);同时违反所有权模型
}5) Periodic work without leaks (time.Ticker
vs time.After
)
time.Tickertime.After5) 无泄漏的周期性任务(time.Ticker
vs time.After
)
time.Tickertime.AfterUse for loops; avoid allocations in hot paths.
time.NewTickertime.After✅ Correct: ticker
go
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
poll()
}
}❌ Wrong: time.After in loop
go
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
poll()
}
}在循环中使用;避免在热路径中使用(会产生内存分配)。
time.NewTickertime.After✅ 正确:使用ticker
go
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
poll()
}
}❌ 错误:循环中使用time.After
go
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
poll()
}
}Decision Trees
决策树
Channel vs Mutex
通道 vs 互斥锁
- Need ownership/serialization (single writer, message passing) → use channel + owner goroutine
- Need shared cache/map with many readers and simple updates → use RWMutex
- Need simple counter with low contention → use atomic
- 需要所有权/序列化(单写者、消息传递)→ 使用通道 + 所有者goroutine
- 需要共享缓存/映射,且有大量读操作和简单更新→ 使用RWMutex
- 需要简单计数器且竞争低→ 使用atomic
WaitGroup vs errgroup
WaitGroup vs errgroup
- Need error propagation + sibling cancellation → use
errgroup.WithContext - Need only wait and errors are handled elsewhere → use
sync.WaitGroup
- 需要错误传播 + 同级任务取消→ 使用****
errgroup.WithContext - 仅需要等待,且错误在其他地方处理→ 使用****
sync.WaitGroup
Buffered vs unbuffered channel
缓冲通道 vs 无缓冲通道
- Need backpressure and synchronous handoff → use unbuffered
- Need burst absorption up to a known size → use buffered (size with intent)
- Unsure → start unbuffered and measure; add buffer only to remove known bottleneck
- 需要背压和同步交接→ 使用无缓冲通道
- 需要突发吸收且已知上限→ 使用缓冲通道(大小需合理设置)
- 不确定→ 从无缓冲通道开始,测量后仅在消除已知瓶颈时添加缓冲
Testing & Verification
测试与验证
Race detector and flake control
竞态检测器与flake控制
Run targeted tests with the race detector and disable caching during debugging:
bash
go test -race ./...
go test -run TestName -race -count=1 ./...使用竞态检测器运行针对性测试,调试期间禁用缓存:
bash
go test -race ./...
go test -run TestName -race -count=1 ./...Timeouts to prevent hanging tests
超时以防止测试挂起
✅ Correct: test-level timeout via context
go
func TestSomething(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := doThing(ctx); err != nil {
t.Fatal(err)
}
}✅ 正确:通过上下文设置测试级超时
go
func TestSomething(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := doThing(ctx); err != nil {
t.Fatal(err)
}
}Troubleshooting
故障排查
Symptom: deadlock (test hangs, goroutines blocked)
症状:死锁(测试挂起,goroutine阻塞)
Actions:
- Add timeouts () around blocking operations.
context.WithTimeout - Verify channel ownership: only the sender closes; receivers stop on .
ok == false - Check for missing release in semaphore patterns.
<-limit
解决措施:
- 在阻塞操作周围添加超时()。
context.WithTimeout - 验证通道所有权:仅发送端可关闭通道;接收端在时停止。
ok == false - 检查信号量模式中是否遗漏了释放操作。
<-limit
Symptom: data race (go test -race
reports)
go test -race症状:数据竞态(go test -race
报告)
go test -raceActions:
- Identify shared variables mutated by multiple goroutines.
- Add a mutex or convert to ownership model (single goroutine owns state).
- Avoid writing to captured loop variables.
解决措施:
- 识别被多个goroutine修改的共享变量。
- 添加互斥锁或转换为所有权模型(单个goroutine拥有状态)。
- 避免对循环捕获变量进行写入操作。
Symptom: goroutine leak (memory growth, slow shutdown)
症状:goroutine泄漏(内存增长、关闭缓慢)
Actions:
- Ensure every goroutine selects on .
ctx.Done() - Ensure is stopped and channels are closed by senders.
time.Ticker - Avoid inside request paths; propagate caller context.
context.Background()
解决措施:
- 确保每个goroutine都监听。
ctx.Done() - 确保被停止,且通道由发送端关闭。
time.Ticker - 避免在请求路径中使用;传播调用方的上下文。
context.Background()
Resources
资源
- Go Blog: Concurrency patterns: https://go.dev/blog/pipelines
- : https://pkg.go.dev/golang.org/x/sync/errgroup
errgroup - Go memory model: https://go.dev/ref/mem
- Go官方博客:并发模式:https://go.dev/blog/pipelines
- 文档:https://pkg.go.dev/golang.org/x/sync/errgroup
errgroup - Go内存模型:https://go.dev/ref/mem