前面几篇我们讲了标准库的并发原语、atomic 和 Channel,掌握这些已经能解决 80% 的并发问题。但要进一步提升并发编程能力,还需要了解 扩展并发原语 和 分布式并发原语。这篇文章分两部分:上半部分讲 Go 官方和社区提供的进程内扩展原语(Semaphore、SingleFlight、ErrGroup、CyclicBarrier),下半部分讲基于 etcd 的分布式并发原语(Leader 选举、分布式锁、队列、栅栏、STM)。
上篇:扩展并发原语
一、Semaphore(信号量)
什么是信号量?
信号量(Semaphore)是荷兰计算机科学家 Edsger Dijkstra 在 1963 年提出的并发原语,用来 控制多个 goroutine 同时访问多个资源。
它的核心是一个计数器(0 到 n),表示当前可用的资源数量:
┌────────────────────────────────────────────────────────────────┐
│ 信号量模型 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 初始资源数 = n │
│ │
│ Acquire (P 操作):计数器 - 1 │
│ 计数器 > 0 → 成功获取,继续执行 │
│ 计数器 = 0 → 阻塞等待,直到有资源释放 │
│ │
│ Release (V 操作):计数器 + 1 │
│ 唤醒一个等待中的 goroutine │
│ │
└────────────────────────────────────────────────────────────────┘P/V 操作 的名称来自荷兰语:P(Proberen,尝试)减少信号量,V(Verhogen,增加)增加信号量。
当 n = 1 时,信号量就退化成了互斥锁。 所以 Mutex 本质上是信号量的特例。
golang.org/x/sync/semaphore
Go 官方扩展库提供了加权信号量 semaphore.Weighted:
import "golang.org/x/sync/semaphore"
// 创建一个容量为 10 的信号量
sem := semaphore.NewWeighted(10)
// 获取 1 个资源(阻塞直到有可用资源)
sem.Acquire(ctx, 1)
// 释放 1 个资源
sem.Release(1)
// 尝试获取(非阻塞,返回 bool)
if sem.TryAcquire(3) {
// 成功获取 3 个资源
}API 一览:
┌──────────────────────────────────────────────────────────────────┐
│ semaphore.Weighted │
├──────────────────────────────────────────────────────────────────┤
│ NewWeighted(n int64) *Weighted 创建信号量,容量为 n │
│ Acquire(ctx, n int64) error 获取 n 个资源(可阻塞) │
│ TryAcquire(n int64) bool 非阻塞尝试获取 │
│ Release(n int64) 释放 n 个资源 │
└──────────────────────────────────────────────────────────────────┘“加权” 是指一次可以获取/释放多个资源,而不只是 1 个。比如一个任务需要 3 个数据库连接,就 Acquire(ctx, 3)。
实战示例:限制并行 goroutine 数
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
func main() {
const maxConcurrency = 3
sem := semaphore.NewWeighted(maxConcurrency)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 获取信号量,超时 5 秒
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Printf("Worker %d: 获取信号量超时\n", id)
return
}
defer sem.Release(1)
fmt.Printf("Worker %d: 开始(并发数 ≤ %d)\n", id, maxConcurrency)
time.Sleep(time.Second) // 模拟工作
}(i)
}
wg.Wait()
}Semaphore vs 缓冲 Channel
在 Channel 那篇文章中,我们用缓冲 Channel 实现过信号量。两者对比:
| 特性 | semaphore.Weighted | 缓冲 Channel |
|---|---|---|
| 加权获取(一次多个) | ✅ Acquire(ctx, n) | ❌ 需要循环 |
| 超时/取消控制 | ✅ 通过 Context | ✅ 通过 select + timer |
| 非阻塞尝试 | ✅ TryAcquire | ✅ select + default |
| 适用场景 | 需要加权或复杂控制 | 简单的并发数限制 |
简单场景用缓冲 Channel,需要加权或 Context 集成时用 semaphore。
二、SingleFlight(请求合并)
解决什么问题?
假设有 1000 个 goroutine 同时请求同一个缓存 key,而这个 key 刚好过期(缓存击穿):
没有 SingleFlight: 有 SingleFlight:
goroutine 1 ──→ 查 DB goroutine 1 ──→ 查 DB
goroutine 2 ──→ 查 DB goroutine 2 ──→ 等待...
goroutine 3 ──→ 查 DB goroutine 3 ──→ 等待...
... ...
goroutine 1000 ──→ 查 DB goroutine 1000 ──→ 等待...
DB 压力: 1000 次查询 💀 goroutine 1 查完 ──→ 结果共享给所有人
DB 压力: 1 次查询 ✅SingleFlight 的核心能力:对同一个 key 的并发调用,只有一个会真正执行,其余等待并共享结果。
SingleFlight vs sync.Once
┌──────────────────────────────────────────────────────────────┐
│ sync.Once │
│ • 永远只执行一次,不管调用多少次 │
│ • 适合单例初始化 │
├──────────────────────────────────────────────────────────────┤
│ SingleFlight │
│ • 每次调用都会执行,但并发的同 key 调用只执行一次 │
│ • 上一轮执行完毕后,下一轮的并发请求又会重新执行 │
│ • 适合缓存击穿、请求合并 │
└──────────────────────────────────────────────────────────────┘基本用法
import "golang.org/x/sync/singleflight"
var g singleflight.Group
// Do:同 key 并发只执行一次
v, err, shared := g.Do("cache-key", func() (any, error) {
// 只有一个 goroutine 会执行这个函数
return queryDB("cache-key")
})
// shared == true 表示结果是和其他 goroutine 共享的
// DoChan:异步版本,返回 channel
ch := g.DoChan("cache-key", func() (any, error) {
return queryDB("cache-key")
})
result := <-ch
// Forget:删除某个 key,下一次调用会重新执行
g.Forget("cache-key")API 一览:
┌──────────────────────────────────────────────────────────────────────┐
│ singleflight.Group │
├──────────────────────────────────────────────────────────────────────┤
│ Do(key, fn) (v any, err error, shared bool) │
│ 同步执行,同 key 并发只让一个 fn 执行,其余等待共享结果 │
│ │
│ DoChan(key, fn) <-chan Result │
│ 异步版本,返回 channel,可配合 select 做超时控制 │
│ │
│ Forget(key) │
│ 移除 key,下一次 Do 会重新执行 fn │
└──────────────────────────────────────────────────────────────────────┘实战示例:防缓存击穿
var g singleflight.Group
var cache sync.Map
func getData(key string) (string, error) {
// 1. 先查缓存
if v, ok := cache.Load(key); ok {
return v.(string), nil
}
// 2. 缓存未命中,用 SingleFlight 合并请求
v, err, _ := g.Do(key, func() (any, error) {
// 只有一个 goroutine 会真正查 DB
result, err := queryDB(key)
if err != nil {
return nil, err
}
cache.Store(key, result) // 回填缓存
return result, nil
})
if err != nil {
return "", err
}
return v.(string), nil
}注意事项
- fn 出错时,错误也会共享给所有等待者——如果不希望一个 goroutine 的失败影响其他人,考虑用
Forget或者在 fn 内部做重试 - fn panic 时,所有等待者都会 panic——确保 fn 不会 panic
- DoChan + select 可以实现超时控制——避免等待时间过长
三、ErrGroup(分组任务编排)
解决什么问题?
将一个大任务拆成多个子任务并发执行,全部完成后汇总结果。和 WaitGroup 相比,ErrGroup 提供了 错误传播 和 Context 集成。
基本用法
import "golang.org/x/sync/errgroup"
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
return fetchUsers(ctx)
})
g.Go(func() error {
return fetchOrders(ctx)
})
g.Go(func() error {
return fetchProducts(ctx)
})
if err := g.Wait(); err != nil {
log.Fatal("子任务失败:", err)
} errgroup.WithContext(ctx)
│
├── g.Go(fetchUsers) ──→ goroutine 1
├── g.Go(fetchOrders) ──→ goroutine 2
└── g.Go(fetchProducts)──→ goroutine 3
│
│── g.Wait()
│ 阻塞直到所有子任务完成
│ 如果任一子任务返回 error → ctx 被 cancel → 其余子任务可感知
│ Wait 返回第一个 errorErrGroup vs WaitGroup
| 特性 | WaitGroup | ErrGroup |
|---|---|---|
| 等待全部完成 | ✅ | ✅ |
| 错误收集 | 需手动 + Mutex | ✅ 内置 |
| 任一失败取消其余 | ❌ | ✅ Context |
| 限制并发数 | ❌ | ✅ SetLimit |
| 无需手动 Add/Done | ❌ | ✅ g.Go() |
限制并发数(Go 1.20+)
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(5) // 最多 5 个子任务同时执行
for _, url := range urls {
u := url
g.Go(func() error {
return fetch(ctx, u) // 超过 5 个并发时会阻塞
})
}
if err := g.Wait(); err != nil {
log.Fatal(err)
}其他任务编排库
除了 ErrGroup,社区还有一些任务编排库:
┌─────────────┬────────────────────────────────────────────────────┐
│ 库 │ 特点 │
├─────────────┼────────────────────────────────────────────────────┤
│ errgroup │ 官方扩展,错误传播 + Context 取消 + 并发限制 │
│ gollback │ 提供 Race(任一成功即返回)和 All(全部执行) │
│ Hunch │ 提供 All/First/Retry/Waterfall 多种编排模式 │
│ schedgroup │ 可以延迟执行子任务,指定每个子任务的执行时间 │
└─────────────┴────────────────────────────────────────────────────┘大多数场景下 errgroup 足够了,其他库按需选用。
四、CyclicBarrier(循环栅栏)
什么是 CyclicBarrier?
CyclicBarrier 是一个 可重用的栅栏,让一组 goroutine 在某个点上互相等待,全部到达后同时继续执行。
goroutine 1 ──→ 到达栅栏 ──→ 等待...
goroutine 2 ──→ 到达栅栏 ──→ 等待...
goroutine 3 ──→ 到达栅栏 ──→ 等待... ← 全部到达,栅栏打开!
──→ 三个 goroutine 同时继续
──→ 栅栏自动重置,可以再用与 WaitGroup 的区别:
┌──────────────────────────────────────────────────────────────┐
│ WaitGroup:一方等待,多方通知 │
│ • 主 goroutine 等待,子 goroutine 通知完成 │
│ • 单向的,用完需要重新 Add │
├──────────────────────────────────────────────────────────────┤
│ CyclicBarrier:多方互相等待 │
│ • 所有 goroutine 都在栅栏处等待,全部到达后同时继续 │
│ • 双向的,自动重置,可反复使用 │
└──────────────────────────────────────────────────────────────┘使用(github.com/marusama/cyclicbarrier)
Go 标准库没有 CyclicBarrier,社区库 cyclicbarrier 提供了实现:
import "github.com/marusama/cyclicbarrier"
// 创建一个 3 参与者的栅栏
b := cyclicbarrier.New(3)
for i := 0; i < 3; i++ {
go func(id int) {
for round := 0; round < 5; round++ {
fmt.Printf("Worker %d: 第 %d 轮准备就绪\n", id, round)
b.Await(context.Background()) // 等待所有人到达
// 全部到达后继续,栅栏自动重置
fmt.Printf("Worker %d: 第 %d 轮开始执行\n", id, round)
}
}(i)
}第 0 轮:
Worker 0: 准备就绪 → Await (等待...)
Worker 2: 准备就绪 → Await (等待...)
Worker 1: 准备就绪 → Await ← 全部到达,栅栏打开!
Worker 0/1/2: 同时开始执行
第 1 轮(栅栏已自动重置):
Worker 1: 准备就绪 → Await (等待...)
Worker 0: 准备就绪 → Await (等待...)
Worker 2: 准备就绪 → Await ← 全部到达,栅栏打开!
Worker 0/1/2: 同时开始执行
...重复...API 一览:
┌──────────────────────────────────────────────────────────────────┐
│ cyclicbarrier │
├──────────────────────────────────────────────────────────────────┤
│ New(parties int) CyclicBarrier │
│ 创建栅栏,parties 为参与者数量 │
│ │
│ NewWithAction(parties int, f func() error) CyclicBarrier │
│ 全部到达时先执行 f,再放行(用于阶段间的汇总操作) │
│ │
│ Await(ctx context.Context) error │
│ 在栅栏处等待,支持 Context 超时/取消 │
│ │
│ Reset() │
│ 手动重置栅栏(Await 后会自动重置,一般不需要手动调用) │
└──────────────────────────────────────────────────────────────────┘典型场景:多轮迭代计算(如矩阵运算),每轮结束后需要所有 worker 同步,再开始下一轮。
下篇:基于 etcd 的分布式并发原语
前面讲的都是进程内的并发原语。当资源或任务分布在 不同进程、不同机器 上时,就需要分布式并发原语。常用的协调系统有 Zookeeper、etcd、Consul,其中 etcd 对 Go 生态支持最好,提供了丰富的分布式并发原语。
┌──────────────────────────────────────────────────────────────┐
│ etcd 分布式并发原语 │
├──────────────────────────────────────────────────────────────┤
│ go.etcd.io/etcd/client/v3/concurrency │
│ │
│ • Leader 选举 选出集群中的唯一主节点 │
│ • 分布式互斥锁 跨进程/跨机器的互斥访问 │
│ • 分布式读写锁 跨进程的读写锁 │
│ │
│ github.com/etcd-io/etcd/contrib/recipes │
│ │
│ • 分布式队列 跨节点的 FIFO 队列 │
│ • 分布式优先级队列 带优先级的分布式队列 │
│ • 分布式栅栏 跨节点的同步点 │
│ • STM 软件事务内存 │
│ │
└──────────────────────────────────────────────────────────────┘生产环境需要一个 7×24 的 etcd 集群;学习测试可以用单节点 etcd。
五、Leader 选举
什么是 Leader 选举?
主从架构中,需要确定哪个节点是主(Leader),哪个是从(Follower)。同一时刻只能有一个 Leader,否则多个节点同时执行写操作,会导致数据不一致。
正常运行: Leader 宕机后:
┌────────┐ ┌────────┐
│ Leader │ ← 执行写操作 │ ×宕机× │
└────┬───┘ └────────┘
│ │
┌────┴────┐ ┌────┴────┐
│Follower │ ← 执行读操作 │Follower │ ← 被选为新 Leader
│Follower │ │Follower │
└─────────┘ └─────────┘etcd 实现
import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// 连接 etcd
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
})
defer cli.Close()
// 创建 session(带 TTL 的租约,节点宕机后自动释放)
session, _ := concurrency.NewSession(cli, concurrency.WithTTL(10))
defer session.Close()
// 参与选举
e := concurrency.NewElection(session, "/my-service/leader")
// 竞选 Leader(阻塞直到当选)
if err := e.Campaign(ctx, "node-1"); err != nil {
log.Fatal(err)
}
fmt.Println("我是 Leader!")
// 查看当前 Leader
resp, _ := e.Leader(ctx)
fmt.Println("当前 Leader:", string(resp.Kvs[0].Value))
// 主动让位
e.Resign(ctx)┌───────────────────────────────────────────────────────────┐
│ Election API │
├───────────────────────────────────────────────────────────┤
│ NewElection(session, prefix) 创建选举实例 │
│ Campaign(ctx, val) 参与竞选(阻塞直到当选) │
│ Proclaim(ctx, val) Leader 更新自己的值 │
│ Resign(ctx) 主动让位 │
│ Leader(ctx) 查看当前 Leader │
│ Observe(ctx) <-chan Response 监听 Leader 变化 │
└───────────────────────────────────────────────────────────┘Session 的 TTL 机制:每个参与选举的节点通过 Session 维持一个租约(Lease)。如果节点宕机,租约过期,etcd 自动释放该节点的 Leader 身份,其他节点可以竞选成功。
六、分布式互斥锁与读写锁
分布式 Mutex
当多个进程/机器需要互斥访问同一个资源时:
session, _ := concurrency.NewSession(cli)
mu := concurrency.NewMutex(session, "/my-lock/")
// 获取锁(阻塞直到成功)
if err := mu.Lock(ctx); err != nil {
log.Fatal(err)
}
fmt.Println("获取到分布式锁")
// 临界区操作
doSomething()
// 释放锁
if err := mu.Unlock(ctx); err != nil {
log.Fatal(err)
}分布式 RWMutex
读多写少的分布式场景:
session, _ := concurrency.NewSession(cli)
rw := recipe.NewRWMutex(session, "/my-rwlock/")
// 读锁(多个节点可以同时持有)
rw.RLock(ctx)
readData()
rw.RUnlock(ctx)
// 写锁(独占)
rw.Lock(ctx)
writeData()
rw.Unlock(ctx)与进程内锁的对比
| 特性 | sync.Mutex | etcd 分布式 Mutex |
|---|---|---|
| 作用范围 | 单进程内 | 跨进程/跨机器 |
| 故障处理 | 进程崩 → 锁消失 | 节点崩 → TTL 过期自动释放 |
| 性能 | 纳秒级 | 毫秒级(网络 RTT) |
| 依赖 | 无 | etcd 集群 |
七、分布式队列
etcd 提供了 FIFO 队列和优先级队列。
FIFO 队列
import recipe "go.etcd.io/etcd/contrib/recipes"
q := recipe.NewQueue(cli, "/my-queue")
// 入队(可以在任意节点)
q.Enqueue("task-1")
q.Enqueue("task-2")
// 出队(可以在另一个节点)
val, _ := q.Dequeue() // "task-1"(FIFO 顺序)
// 如果队列为空,Dequeue 会阻塞等待优先级队列
pq := recipe.NewPriorityQueue(cli, "/my-pqueue")
// 入队,附带优先级(数字越小优先级越高)
pq.Enqueue("low-priority-task", 10)
pq.Enqueue("high-priority-task", 1)
val, _ := pq.Dequeue() // "high-priority-task"(优先级 1 先出)普通队列: 优先级队列:
入队顺序 → 出队顺序 入队 出队
task-1 → task-1 (10) low-task (1) high-task ← 先出
task-2 → task-2 (1) high-task (10) low-task
task-3 → task-3 (5) mid-task (5) mid-task八、分布式栅栏(Barrier)
etcd 提供了两种栅栏。
Barrier:阻塞/放行栅栏
由一个节点控制栅栏的开关,其他节点在栅栏处等待:
b := recipe.NewBarrier(cli, "/my-barrier")
// 节点 A:创建栅栏(阻塞其他节点)
b.Hold()
// 节点 B/C/D:在栅栏处等待
b.Wait() // 阻塞,直到栅栏被移除
// 节点 A:移除栅栏(放行所有等待者)
b.Release()DoubleBarrier:集合点栅栏
类似 CyclicBarrier——等待指定数量的节点都进入后才放行:
db := recipe.NewDoubleBarrier(session, "/my-dbarrier", 3) // 需要 3 个参与者
// 每个节点调用 Enter(阻塞直到 3 个节点都进入)
db.Enter() // 3 个节点都 Enter 后,全部放行
// 执行任务...
// 每个节点调用 Leave(阻塞直到 3 个节点都离开)
db.Leave() // 3 个节点都 Leave 后,全部继续节点 A: Enter (等待...)
节点 B: Enter (等待...)
节点 C: Enter ← 第 3 个到达,全部放行!
A/B/C 同时执行任务
节点 A: Leave (等待...)
节点 C: Leave (等待...)
节点 B: Leave ← 第 3 个到达,全部继续!九、STM(软件事务内存)
STM 提供了类似数据库事务的能力——对 etcd 中的多个 key 进行原子的读写操作:
import "go.etcd.io/etcd/client/v3/concurrency"
// 转账:从 A 扣款 100,给 B 加款 100(原子操作)
_, err := concurrency.NewSTM(cli, func(stm concurrency.STM) error {
balanceA := stm.Get("/account/A")
balanceB := stm.Get("/account/B")
a, _ := strconv.Atoi(balanceA)
b, _ := strconv.Atoi(balanceB)
if a < 100 {
return fmt.Errorf("余额不足")
}
stm.Put("/account/A", strconv.Itoa(a-100))
stm.Put("/account/B", strconv.Itoa(b+100))
return nil
})STM 的隔离级别:
┌──────────────────────────────────────────────────────────────┐
│ STM 隔离级别(通过 WithIsolation 选项设置) │
├──────────────────────────────────────────────────────────────┤
│ SerializableSnapshot 可串行化快照(默认,最严格) │
│ Serializable 可串行化 │
│ RepeatableReads 可重复读 │
│ ReadCommitted 读已提交(最宽松) │
└──────────────────────────────────────────────────────────────┘STM 内部使用 乐观锁 + 重试 机制:先读取所有需要的 key,执行事务逻辑,然后尝试提交。如果发现读取的 key 在此期间被其他事务修改了,就自动重试整个事务函数。
十、全景对比
| 原语 | 作用范围 | 典型场景 | 来源 |
|---|---|---|---|
| Semaphore | 单进程 | 限制并发度(加权) | golang.org/x/sync |
| SingleFlight | 单进程 | 缓存击穿、请求合并 | golang.org/x/sync |
| ErrGroup | 单进程 | 并行子任务 + 错误收集 | golang.org/x/sync |
| CyclicBarrier | 单进程 | 多轮迭代同步 | 社区库 |
| Leader 选举 | 分布式 | 主从架构选主 | etcd concurrency |
| 分布式 Mutex | 分布式 | 跨节点互斥访问 | etcd concurrency |
| 分布式 Queue | 分布式 | 跨节点任务队列 | etcd recipes |
| 分布式 Barrier | 分布式 | 跨节点同步点 | etcd recipes |
| STM | 分布式 | 多 key 原子操作 | etcd concurrency |
十一、实战建议
- 缓存击穿首选 SingleFlight——把 N 次并发 DB 查询合并为 1 次
- 并行子任务首选 ErrGroup——比 WaitGroup + Mutex 收集错误更简洁
- 加权并发控制用 Semaphore——一次获取多个资源,配合 Context 做超时
- 分布式场景选 etcd——Go 生态最成熟,API 最完善
- 分布式锁要设 TTL——防止持锁节点宕机导致死锁
- STM 事务函数必须幂等——因为冲突时会自动重试
# 两个应该加入 CI 的命令
go vet ./...
go test -race ./...标准库的并发原语解决 80% 的问题,扩展并发原语解决剩下的 15%,分布式并发原语解决最后的 5%。根据你的场景——单机还是分布式、简单还是复杂——选择合适层次的工具,不要杀鸡用牛刀,也不要拿菜刀去砍树。