前面几篇我们讲了标准库的并发原语、atomic 和 Channel,掌握这些已经能解决 80% 的并发问题。但要进一步提升并发编程能力,还需要了解 扩展并发原语分布式并发原语。这篇文章分两部分:上半部分讲 Go 官方和社区提供的进程内扩展原语(Semaphore、SingleFlight、ErrGroup、CyclicBarrier),下半部分讲基于 etcd 的分布式并发原语(Leader 选举、分布式锁、队列、栅栏、STM)。


上篇:扩展并发原语

一、Semaphore(信号量)

什么是信号量?

信号量(Semaphore)是荷兰计算机科学家 Edsger Dijkstra 在 1963 年提出的并发原语,用来 控制多个 goroutine 同时访问多个资源

它的核心是一个计数器(0 到 n),表示当前可用的资源数量:

  • Acquire(P 操作):计数器 - 1。计数器 > 0 → 成功获取、继续执行;计数器 = 0 → 阻塞等待,直到有资源释放。
  • Release(V 操作):计数器 + 1,唤醒一个等待中的 goroutine。

P/V 操作 的名称来自荷兰语:P(Proberen,尝试)减少信号量,V(Verhogen,增加)增加信号量。

当 n = 1 时,信号量就退化成了互斥锁。 所以 Mutex 本质上是信号量的特例。

golang.org/x/sync/semaphore

Go 官方扩展库提供了加权信号量 semaphore.Weighted

import "golang.org/x/sync/semaphore"

// 创建一个容量为 10 的信号量
sem := semaphore.NewWeighted(10)

// 获取 1 个资源(阻塞直到有可用资源)
sem.Acquire(ctx, 1)

// 释放 1 个资源
sem.Release(1)

// 尝试获取(非阻塞,返回 bool)
if sem.TryAcquire(3) {
    // 成功获取 3 个资源
}

API 一览:

方法说明
NewWeighted(n int64) *Weighted创建信号量,容量为 n
Acquire(ctx, n int64) error获取 n 个资源(可阻塞)
TryAcquire(n int64) bool非阻塞尝试获取
Release(n int64)释放 n 个资源

“加权” 是指一次可以获取/释放多个资源,而不只是 1 个。比如一个任务需要 3 个数据库连接,就 Acquire(ctx, 3)

实战示例:限制并行 goroutine 数

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	const maxConcurrency = 3
	sem := semaphore.NewWeighted(maxConcurrency)
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// 获取信号量,超时 5 秒
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			if err := sem.Acquire(ctx, 1); err != nil {
				fmt.Printf("Worker %d: 获取信号量超时\n", id)
				return
			}
			defer sem.Release(1)

			fmt.Printf("Worker %d: 开始(并发数 ≤ %d)\n", id, maxConcurrency)
			time.Sleep(time.Second) // 模拟工作
		}(i)
	}
	wg.Wait()
}

Semaphore vs 缓冲 Channel

在 Channel 那篇文章中,我们用缓冲 Channel 实现过信号量。两者对比:

特性semaphore.Weighted缓冲 Channel
加权获取(一次多个)Acquire(ctx, n)❌ 需要循环
超时/取消控制✅ 通过 Context✅ 通过 select + timer
非阻塞尝试TryAcquire✅ select + default
适用场景需要加权或复杂控制简单的并发数限制

简单场景用缓冲 Channel,需要加权或 Context 集成时用 semaphore。

二、SingleFlight(请求合并)

解决什么问题?

假设有 1000 个 goroutine 同时请求同一个缓存 key,而这个 key 刚好过期(缓存击穿):

💀

没有 SingleFlight

1000 个 goroutine 同时打穿缓存,直接查 DB——DB 承受 1000 次查询,压力陡增,严重时直接被打挂。

有 SingleFlight

只有 1 个 goroutine 真正查 DB,其余 999 个等待并共享结果——DB 压力 = 1 次查询

SingleFlight 的核心能力:对同一个 key 的并发调用,只有一个会真正执行,其余等待并共享结果。

SingleFlight vs sync.Once

1️⃣

sync.Once

永远只执行一次,不管调用多少次。适合单例初始化。
🔀

SingleFlight

每次调用都会执行,但并发的同 key 调用合并为一次。上一轮执行完后,下一轮并发请求又会重新执行一次。适合缓存击穿、请求合并。

基本用法

import "golang.org/x/sync/singleflight"

var g singleflight.Group

// Do:同 key 并发只执行一次
v, err, shared := g.Do("cache-key", func() (any, error) {
    // 只有一个 goroutine 会执行这个函数
    return queryDB("cache-key")
})
// shared == true 表示结果是和其他 goroutine 共享的

// DoChan:异步版本,返回 channel
ch := g.DoChan("cache-key", func() (any, error) {
    return queryDB("cache-key")
})
result := <-ch

// Forget:删除某个 key,下一次调用会重新执行
g.Forget("cache-key")

API 一览:

方法说明
Do(key, fn) (v any, err error, shared bool)同步执行,同 key 并发只让一个 fn 执行,其余等待共享结果
DoChan(key, fn) <-chan Result异步版本,返回 channel,可配合 select 做超时控制
Forget(key)移除 key,下一次 Do 会重新执行 fn

实战示例:防缓存击穿

var g singleflight.Group
var cache sync.Map

func getData(key string) (string, error) {
	// 1. 先查缓存
	if v, ok := cache.Load(key); ok {
		return v.(string), nil
	}

	// 2. 缓存未命中,用 SingleFlight 合并请求
	v, err, _ := g.Do(key, func() (any, error) {
		// 只有一个 goroutine 会真正查 DB
		result, err := queryDB(key)
		if err != nil {
			return nil, err
		}
		cache.Store(key, result) // 回填缓存
		return result, nil
	})
	if err != nil {
		return "", err
	}
	return v.(string), nil
}

注意事项

  1. fn 出错时,错误也会共享给所有等待者——如果不希望一个 goroutine 的失败影响其他人,考虑用 Forget 或者在 fn 内部做重试
  2. fn panic 时,所有等待者都会 panic——确保 fn 不会 panic
  3. DoChan + select 可以实现超时控制——避免等待时间过长

三、ErrGroup(分组任务编排)

解决什么问题?

将一个大任务拆成多个子任务并发执行,全部完成后汇总结果。和 WaitGroup 相比,ErrGroup 提供了 错误传播Context 集成

基本用法

import "golang.org/x/sync/errgroup"

g, ctx := errgroup.WithContext(context.Background())

g.Go(func() error {
    return fetchUsers(ctx)
})
g.Go(func() error {
    return fetchOrders(ctx)
})
g.Go(func() error {
    return fetchProducts(ctx)
})

if err := g.Wait(); err != nil {
    log.Fatal("子任务失败:", err)
}
graph TD A[errgroup.WithContext] --> G1[g.Go fetchUsers] A --> G2[g.Go fetchOrders] A --> G3[g.Go fetchProducts] G1 --> W[g.Wait] G2 --> W G3 --> W W --> R{任一返回 error?} R -->|是| C[ctx 被 cancel
其余子任务感知并退出] R -->|否| S[全部成功] C --> RE[Wait 返回第一个 error]

ErrGroup vs WaitGroup

特性WaitGroupErrGroup
等待全部完成
错误收集需手动 + Mutex✅ 内置
任一失败取消其余✅ Context
限制并发数SetLimit
无需手动 Add/Doneg.Go()

限制并发数(Go 1.20+)

g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(5) // 最多 5 个子任务同时执行

for _, url := range urls {
    u := url
    g.Go(func() error {
        return fetch(ctx, u) // 超过 5 个并发时会阻塞
    })
}

if err := g.Wait(); err != nil {
    log.Fatal(err)
}

其他任务编排库

除了 ErrGroup,社区还有一些任务编排库:

特点
errgroup官方扩展,错误传播 + Context 取消 + 并发限制
gollback提供 Race(任一成功即返回)和 All(全部执行)
Hunch提供 All/First/Retry/Waterfall 多种编排模式
schedgroup可以延迟执行子任务,指定每个子任务的执行时间

大多数场景下 errgroup 足够了,其他库按需选用。

四、CyclicBarrier(循环栅栏)

什么是 CyclicBarrier?

CyclicBarrier 是一个 可重用的栅栏,让一组 goroutine 在某个点上互相等待,全部到达后同时继续执行。

sequenceDiagram participant G1 as goroutine 1 participant G2 as goroutine 2 participant G3 as goroutine 3 participant B as CyclicBarrier G1->>B: 到达栅栏 等待 G2->>B: 到达栅栏 等待 G3->>B: 到达栅栏 Note over B: 全部到达,栅栏打开! B->>G1: 继续 B->>G2: 继续 B->>G3: 继续 Note over B: 栅栏自动重置,可以再用

与 WaitGroup 的区别:

⏱️

WaitGroup

一方等待,多方通知。主 goroutine 等待,子 goroutine 通知完成。单向的,用完需要重新 Add。
🔄

CyclicBarrier

多方互相等待。所有 goroutine 都在栅栏处等待,全部到达后同时继续。双向的,自动重置,可反复使用。

使用(github.com/marusama/cyclicbarrier)

Go 标准库没有 CyclicBarrier,社区库 cyclicbarrier 提供了实现:

import "github.com/marusama/cyclicbarrier"

// 创建一个 3 参与者的栅栏
b := cyclicbarrier.New(3)

for i := 0; i < 3; i++ {
    go func(id int) {
        for round := 0; round < 5; round++ {
            fmt.Printf("Worker %d: 第 %d 轮准备就绪\n", id, round)
            b.Await(context.Background()) // 等待所有人到达
            // 全部到达后继续,栅栏自动重置
            fmt.Printf("Worker %d: 第 %d 轮开始执行\n", id, round)
        }
    }(i)
}

每一轮栅栏打开时所有 worker 同时"同步一次"再继续下一轮;栅栏会在所有参与者放行后自动重置,下一轮可以再次 Await

API 一览:

方法说明
New(parties int) CyclicBarrier创建栅栏,parties 为参与者数量
NewWithAction(parties int, f func() error) CyclicBarrier全部到达时先执行 f,再放行(用于阶段间的汇总操作)
Await(ctx context.Context) error在栅栏处等待,支持 Context 超时/取消
Reset()手动重置栅栏(Await 后会自动重置,一般不需要手动调用)

典型场景:多轮迭代计算(如矩阵运算),每轮结束后需要所有 worker 同步,再开始下一轮。


下篇:基于 etcd 的分布式并发原语

前面讲的都是进程内的并发原语。当资源或任务分布在 不同进程、不同机器 上时,就需要分布式并发原语。常用的协调系统有 Zookeeper、etcd、Consul,其中 etcd 对 Go 生态支持最好,提供了丰富的分布式并发原语。

🏛️

go.etcd.io/etcd/client/v3/concurrency

  • Leader 选举 —— 选出集群中的唯一主节点
  • 分布式互斥锁 —— 跨进程/跨机器的互斥访问
  • 分布式读写锁 —— 跨进程的读写锁
📦

github.com/etcd-io/etcd/contrib/recipes

  • 分布式队列 —— 跨节点的 FIFO 队列
  • 分布式优先级队列 —— 带优先级的分布式队列
  • 分布式栅栏 —— 跨节点的同步点
  • STM —— 软件事务内存

生产环境需要一个 7×24 的 etcd 集群;学习测试可以用单节点 etcd。

五、Leader 选举

什么是 Leader 选举?

主从架构中,需要确定哪个节点是主(Leader),哪个是从(Follower)。同一时刻只能有一个 Leader,否则多个节点同时执行写操作,会导致数据不一致。

graph TB subgraph Normal["正常运行"] L[Leader
执行写操作] F1[Follower
执行读操作] F2[Follower] L --- F1 L --- F2 end subgraph Failover["Leader 宕机后"] X[❌ 宕机 Leader] NL[新 Leader
被选出] NF[Follower] X -.-> NL NL --- NF end

etcd 实现

import (
    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

// 连接 etcd
cli, _ := clientv3.New(clientv3.Config{
    Endpoints: []string{"localhost:2379"},
})
defer cli.Close()

// 创建 session(带 TTL 的租约,节点宕机后自动释放)
session, _ := concurrency.NewSession(cli, concurrency.WithTTL(10))
defer session.Close()

// 参与选举
e := concurrency.NewElection(session, "/my-service/leader")

// 竞选 Leader(阻塞直到当选)
if err := e.Campaign(ctx, "node-1"); err != nil {
    log.Fatal(err)
}
fmt.Println("我是 Leader!")

// 查看当前 Leader
resp, _ := e.Leader(ctx)
fmt.Println("当前 Leader:", string(resp.Kvs[0].Value))

// 主动让位
e.Resign(ctx)

Election API 一览:

方法说明
NewElection(session, prefix)创建选举实例
Campaign(ctx, val)参与竞选(阻塞直到当选)
Proclaim(ctx, val)Leader 更新自己的值
Resign(ctx)主动让位
Leader(ctx)查看当前 Leader
Observe(ctx) <-chan Response监听 Leader 变化

Session 的 TTL 机制:每个参与选举的节点通过 Session 维持一个租约(Lease)。如果节点宕机,租约过期,etcd 自动释放该节点的 Leader 身份,其他节点可以竞选成功。

六、分布式互斥锁与读写锁

分布式 Mutex

当多个进程/机器需要互斥访问同一个资源时:

session, _ := concurrency.NewSession(cli)
mu := concurrency.NewMutex(session, "/my-lock/")

// 获取锁(阻塞直到成功)
if err := mu.Lock(ctx); err != nil {
    log.Fatal(err)
}
fmt.Println("获取到分布式锁")

// 临界区操作
doSomething()

// 释放锁
if err := mu.Unlock(ctx); err != nil {
    log.Fatal(err)
}

分布式 RWMutex

读多写少的分布式场景:

session, _ := concurrency.NewSession(cli)
rw := recipe.NewRWMutex(session, "/my-rwlock/")

// 读锁(多个节点可以同时持有)
rw.RLock(ctx)
readData()
rw.RUnlock(ctx)

// 写锁(独占)
rw.Lock(ctx)
writeData()
rw.Unlock(ctx)

与进程内锁的对比

特性sync.Mutexetcd 分布式 Mutex
作用范围单进程内跨进程/跨机器
故障处理进程崩 → 锁消失节点崩 → TTL 过期自动释放
性能纳秒级毫秒级(网络 RTT)
依赖etcd 集群

七、分布式队列

etcd 提供了 FIFO 队列和优先级队列。

FIFO 队列

import recipe "go.etcd.io/etcd/contrib/recipes"

q := recipe.NewQueue(cli, "/my-queue")

// 入队(可以在任意节点)
q.Enqueue("task-1")
q.Enqueue("task-2")

// 出队(可以在另一个节点)
val, _ := q.Dequeue() // "task-1"(FIFO 顺序)
// 如果队列为空,Dequeue 会阻塞等待

优先级队列

pq := recipe.NewPriorityQueue(cli, "/my-pqueue")

// 入队,附带优先级(数字越小优先级越高)
pq.Enqueue("low-priority-task", 10)
pq.Enqueue("high-priority-task", 1)

val, _ := pq.Dequeue() // "high-priority-task"(优先级 1 先出)
对比维度普通 FIFO 队列优先级队列
入队顺序task-1, task-2, task-3(10) low, (1) high, (5) mid
出队顺序task-1, task-2, task-3(FIFO)(1) high, (5) mid, (10) low(按优先级)

八、分布式栅栏(Barrier)

etcd 提供了两种栅栏。

Barrier:阻塞/放行栅栏

由一个节点控制栅栏的开关,其他节点在栅栏处等待:

b := recipe.NewBarrier(cli, "/my-barrier")

// 节点 A:创建栅栏(阻塞其他节点)
b.Hold()

// 节点 B/C/D:在栅栏处等待
b.Wait() // 阻塞,直到栅栏被移除

// 节点 A:移除栅栏(放行所有等待者)
b.Release()

DoubleBarrier:集合点栅栏

类似 CyclicBarrier——等待指定数量的节点都进入后才放行:

db := recipe.NewDoubleBarrier(session, "/my-dbarrier", 3) // 需要 3 个参与者

// 每个节点调用 Enter(阻塞直到 3 个节点都进入)
db.Enter() // 3 个节点都 Enter 后,全部放行

// 执行任务...

// 每个节点调用 Leave(阻塞直到 3 个节点都离开)
db.Leave() // 3 个节点都 Leave 后,全部继续
sequenceDiagram participant A as 节点 A participant B as 节点 B participant C as 节点 C participant D as DoubleBarrier A->>D: Enter 等待... B->>D: Enter 等待... C->>D: Enter Note over D: 3 个节点到达,全部放行 D->>A: 放行 D->>B: 放行 D->>C: 放行 Note over A,C: 同时执行任务 A->>D: Leave 等待... C->>D: Leave 等待... B->>D: Leave Note over D: 3 个节点离开,全部继续

九、STM(软件事务内存)

STM 提供了类似数据库事务的能力——对 etcd 中的多个 key 进行原子的读写操作:

import "go.etcd.io/etcd/client/v3/concurrency"

// 转账:从 A 扣款 100,给 B 加款 100(原子操作)
_, err := concurrency.NewSTM(cli, func(stm concurrency.STM) error {
    balanceA := stm.Get("/account/A")
    balanceB := stm.Get("/account/B")

    a, _ := strconv.Atoi(balanceA)
    b, _ := strconv.Atoi(balanceB)

    if a < 100 {
        return fmt.Errorf("余额不足")
    }

    stm.Put("/account/A", strconv.Itoa(a-100))
    stm.Put("/account/B", strconv.Itoa(b+100))
    return nil
})

STM 的隔离级别:

级别说明
SerializableSnapshot可串行化快照(默认,最严格)
Serializable可串行化
RepeatableReads可重复读
ReadCommitted读已提交(最宽松)

STM 内部使用 乐观锁 + 重试 机制:先读取所有需要的 key,执行事务逻辑,然后尝试提交。如果发现读取的 key 在此期间被其他事务修改了,就自动重试整个事务函数。


十、全景对比

原语作用范围典型场景来源
Semaphore单进程限制并发度(加权)golang.org/x/sync
SingleFlight单进程缓存击穿、请求合并golang.org/x/sync
ErrGroup单进程并行子任务 + 错误收集golang.org/x/sync
CyclicBarrier单进程多轮迭代同步社区库
Leader 选举分布式主从架构选主etcd concurrency
分布式 Mutex分布式跨节点互斥访问etcd concurrency
分布式 Queue分布式跨节点任务队列etcd recipes
分布式 Barrier分布式跨节点同步点etcd recipes
STM分布式多 key 原子操作etcd concurrency

十一、实战建议

  1. 缓存击穿首选 SingleFlight——把 N 次并发 DB 查询合并为 1 次
  2. 并行子任务首选 ErrGroup——比 WaitGroup + Mutex 收集错误更简洁
  3. 加权并发控制用 Semaphore——一次获取多个资源,配合 Context 做超时
  4. 分布式场景选 etcd——Go 生态最成熟,API 最完善
  5. 分布式锁要设 TTL——防止持锁节点宕机导致死锁
  6. STM 事务函数必须幂等——因为冲突时会自动重试
# 两个应该加入 CI 的命令
go vet ./...
go test -race ./...

标准库的并发原语解决 80% 的问题,扩展并发原语解决剩下的 15%,分布式并发原语解决最后的 5%。根据你的场景——单机还是分布式、简单还是复杂——选择合适层次的工具,不要杀鸡用牛刀,也不要拿菜刀去砍树。