前面几篇我们讲了标准库的并发原语、atomic 和 Channel,掌握这些已经能解决 80% 的并发问题。但要进一步提升并发编程能力,还需要了解 扩展并发原语分布式并发原语。这篇文章分两部分:上半部分讲 Go 官方和社区提供的进程内扩展原语(Semaphore、SingleFlight、ErrGroup、CyclicBarrier),下半部分讲基于 etcd 的分布式并发原语(Leader 选举、分布式锁、队列、栅栏、STM)。


上篇:扩展并发原语

一、Semaphore(信号量)

什么是信号量?

信号量(Semaphore)是荷兰计算机科学家 Edsger Dijkstra 在 1963 年提出的并发原语,用来 控制多个 goroutine 同时访问多个资源

它的核心是一个计数器(0 到 n),表示当前可用的资源数量:

┌────────────────────────────────────────────────────────────────┐
│                        信号量模型                               │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│   初始资源数 = n                                               │
│                                                                │
│   Acquire (P 操作):计数器 - 1                                 │
│     计数器 > 0 → 成功获取,继续执行                            │
│     计数器 = 0 → 阻塞等待,直到有资源释放                     │
│                                                                │
│   Release (V 操作):计数器 + 1                                 │
│     唤醒一个等待中的 goroutine                                 │
│                                                                │
└────────────────────────────────────────────────────────────────┘

P/V 操作 的名称来自荷兰语:P(Proberen,尝试)减少信号量,V(Verhogen,增加)增加信号量。

当 n = 1 时,信号量就退化成了互斥锁。 所以 Mutex 本质上是信号量的特例。

golang.org/x/sync/semaphore

Go 官方扩展库提供了加权信号量 semaphore.Weighted

import "golang.org/x/sync/semaphore"

// 创建一个容量为 10 的信号量
sem := semaphore.NewWeighted(10)

// 获取 1 个资源(阻塞直到有可用资源)
sem.Acquire(ctx, 1)

// 释放 1 个资源
sem.Release(1)

// 尝试获取(非阻塞,返回 bool)
if sem.TryAcquire(3) {
    // 成功获取 3 个资源
}

API 一览:

┌──────────────────────────────────────────────────────────────────┐
│                  semaphore.Weighted                               │
├──────────────────────────────────────────────────────────────────┤
│  NewWeighted(n int64) *Weighted        创建信号量,容量为 n      │
│  Acquire(ctx, n int64) error           获取 n 个资源(可阻塞)   │
│  TryAcquire(n int64) bool              非阻塞尝试获取            │
│  Release(n int64)                      释放 n 个资源             │
└──────────────────────────────────────────────────────────────────┘

“加权” 是指一次可以获取/释放多个资源,而不只是 1 个。比如一个任务需要 3 个数据库连接,就 Acquire(ctx, 3)

实战示例:限制并行 goroutine 数

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	const maxConcurrency = 3
	sem := semaphore.NewWeighted(maxConcurrency)
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// 获取信号量,超时 5 秒
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			if err := sem.Acquire(ctx, 1); err != nil {
				fmt.Printf("Worker %d: 获取信号量超时\n", id)
				return
			}
			defer sem.Release(1)

			fmt.Printf("Worker %d: 开始(并发数 ≤ %d)\n", id, maxConcurrency)
			time.Sleep(time.Second) // 模拟工作
		}(i)
	}
	wg.Wait()
}

Semaphore vs 缓冲 Channel

在 Channel 那篇文章中,我们用缓冲 Channel 实现过信号量。两者对比:

特性semaphore.Weighted缓冲 Channel
加权获取(一次多个)Acquire(ctx, n)❌ 需要循环
超时/取消控制✅ 通过 Context✅ 通过 select + timer
非阻塞尝试TryAcquire✅ select + default
适用场景需要加权或复杂控制简单的并发数限制

简单场景用缓冲 Channel,需要加权或 Context 集成时用 semaphore。

二、SingleFlight(请求合并)

解决什么问题?

假设有 1000 个 goroutine 同时请求同一个缓存 key,而这个 key 刚好过期(缓存击穿):

没有 SingleFlight:                  有 SingleFlight:

  goroutine 1 ──→ 查 DB              goroutine 1 ──→ 查 DB
  goroutine 2 ──→ 查 DB              goroutine 2 ──→ 等待...
  goroutine 3 ──→ 查 DB              goroutine 3 ──→ 等待...
  ...                                 ...
  goroutine 1000 ──→ 查 DB           goroutine 1000 ──→ 等待...
                                     
  DB 压力: 1000 次查询 💀             goroutine 1 查完 ──→ 结果共享给所有人
                                      DB 压力: 1 次查询 ✅

SingleFlight 的核心能力:对同一个 key 的并发调用,只有一个会真正执行,其余等待并共享结果。

SingleFlight vs sync.Once

┌──────────────────────────────────────────────────────────────┐
│  sync.Once                                                    │
│  • 永远只执行一次,不管调用多少次                             │
│  • 适合单例初始化                                            │
├──────────────────────────────────────────────────────────────┤
│  SingleFlight                                                 │
│  • 每次调用都会执行,但并发的同 key 调用只执行一次            │
│  • 上一轮执行完毕后,下一轮的并发请求又会重新执行            │
│  • 适合缓存击穿、请求合并                                    │
└──────────────────────────────────────────────────────────────┘

基本用法

import "golang.org/x/sync/singleflight"

var g singleflight.Group

// Do:同 key 并发只执行一次
v, err, shared := g.Do("cache-key", func() (any, error) {
    // 只有一个 goroutine 会执行这个函数
    return queryDB("cache-key")
})
// shared == true 表示结果是和其他 goroutine 共享的

// DoChan:异步版本,返回 channel
ch := g.DoChan("cache-key", func() (any, error) {
    return queryDB("cache-key")
})
result := <-ch

// Forget:删除某个 key,下一次调用会重新执行
g.Forget("cache-key")

API 一览:

┌──────────────────────────────────────────────────────────────────────┐
│                     singleflight.Group                               │
├──────────────────────────────────────────────────────────────────────┤
│  Do(key, fn) (v any, err error, shared bool)                        │
│      同步执行,同 key 并发只让一个 fn 执行,其余等待共享结果        │
│                                                                      │
│  DoChan(key, fn) <-chan Result                                       │
│      异步版本,返回 channel,可配合 select 做超时控制               │
│                                                                      │
│  Forget(key)                                                         │
│      移除 key,下一次 Do 会重新执行 fn                               │
└──────────────────────────────────────────────────────────────────────┘

实战示例:防缓存击穿

var g singleflight.Group
var cache sync.Map

func getData(key string) (string, error) {
	// 1. 先查缓存
	if v, ok := cache.Load(key); ok {
		return v.(string), nil
	}

	// 2. 缓存未命中,用 SingleFlight 合并请求
	v, err, _ := g.Do(key, func() (any, error) {
		// 只有一个 goroutine 会真正查 DB
		result, err := queryDB(key)
		if err != nil {
			return nil, err
		}
		cache.Store(key, result) // 回填缓存
		return result, nil
	})
	if err != nil {
		return "", err
	}
	return v.(string), nil
}

注意事项

  1. fn 出错时,错误也会共享给所有等待者——如果不希望一个 goroutine 的失败影响其他人,考虑用 Forget 或者在 fn 内部做重试
  2. fn panic 时,所有等待者都会 panic——确保 fn 不会 panic
  3. DoChan + select 可以实现超时控制——避免等待时间过长

三、ErrGroup(分组任务编排)

解决什么问题?

将一个大任务拆成多个子任务并发执行,全部完成后汇总结果。和 WaitGroup 相比,ErrGroup 提供了 错误传播Context 集成

基本用法

import "golang.org/x/sync/errgroup"

g, ctx := errgroup.WithContext(context.Background())

g.Go(func() error {
    return fetchUsers(ctx)
})
g.Go(func() error {
    return fetchOrders(ctx)
})
g.Go(func() error {
    return fetchProducts(ctx)
})

if err := g.Wait(); err != nil {
    log.Fatal("子任务失败:", err)
}
  errgroup.WithContext(ctx)
      ├── g.Go(fetchUsers)   ──→ goroutine 1
      ├── g.Go(fetchOrders)  ──→ goroutine 2
      └── g.Go(fetchProducts)──→ goroutine 3
      │── g.Wait()
      │   阻塞直到所有子任务完成
      │   如果任一子任务返回 error → ctx 被 cancel → 其余子任务可感知
      │   Wait 返回第一个 error

ErrGroup vs WaitGroup

特性WaitGroupErrGroup
等待全部完成
错误收集需手动 + Mutex✅ 内置
任一失败取消其余✅ Context
限制并发数SetLimit
无需手动 Add/Doneg.Go()

限制并发数(Go 1.20+)

g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(5) // 最多 5 个子任务同时执行

for _, url := range urls {
    u := url
    g.Go(func() error {
        return fetch(ctx, u) // 超过 5 个并发时会阻塞
    })
}

if err := g.Wait(); err != nil {
    log.Fatal(err)
}

其他任务编排库

除了 ErrGroup,社区还有一些任务编排库:

┌─────────────┬────────────────────────────────────────────────────┐
│ 库           │ 特点                                               │
├─────────────┼────────────────────────────────────────────────────┤
│ errgroup    │ 官方扩展,错误传播 + Context 取消 + 并发限制       │
│ gollback    │ 提供 Race(任一成功即返回)和 All(全部执行)      │
│ Hunch       │ 提供 All/First/Retry/Waterfall 多种编排模式        │
│ schedgroup  │ 可以延迟执行子任务,指定每个子任务的执行时间       │
└─────────────┴────────────────────────────────────────────────────┘

大多数场景下 errgroup 足够了,其他库按需选用。

四、CyclicBarrier(循环栅栏)

什么是 CyclicBarrier?

CyclicBarrier 是一个 可重用的栅栏,让一组 goroutine 在某个点上互相等待,全部到达后同时继续执行。

goroutine 1 ──→ 到达栅栏 ──→ 等待...
goroutine 2 ──→ 到达栅栏 ──→ 等待...
goroutine 3 ──→ 到达栅栏 ──→ 等待...   ← 全部到达,栅栏打开!
                                        ──→ 三个 goroutine 同时继续
                                        ──→ 栅栏自动重置,可以再用

与 WaitGroup 的区别:

┌──────────────────────────────────────────────────────────────┐
│  WaitGroup:一方等待,多方通知                                │
│  • 主 goroutine 等待,子 goroutine 通知完成                  │
│  • 单向的,用完需要重新 Add                                  │
├──────────────────────────────────────────────────────────────┤
│  CyclicBarrier:多方互相等待                                  │
│  • 所有 goroutine 都在栅栏处等待,全部到达后同时继续         │
│  • 双向的,自动重置,可反复使用                              │
└──────────────────────────────────────────────────────────────┘

使用(github.com/marusama/cyclicbarrier)

Go 标准库没有 CyclicBarrier,社区库 cyclicbarrier 提供了实现:

import "github.com/marusama/cyclicbarrier"

// 创建一个 3 参与者的栅栏
b := cyclicbarrier.New(3)

for i := 0; i < 3; i++ {
    go func(id int) {
        for round := 0; round < 5; round++ {
            fmt.Printf("Worker %d: 第 %d 轮准备就绪\n", id, round)
            b.Await(context.Background()) // 等待所有人到达
            // 全部到达后继续,栅栏自动重置
            fmt.Printf("Worker %d: 第 %d 轮开始执行\n", id, round)
        }
    }(i)
}
第 0 轮:
  Worker 0: 准备就绪 → Await (等待...)
  Worker 2: 准备就绪 → Await (等待...)
  Worker 1: 准备就绪 → Await ← 全部到达,栅栏打开!
  Worker 0/1/2: 同时开始执行

第 1 轮(栅栏已自动重置):
  Worker 1: 准备就绪 → Await (等待...)
  Worker 0: 准备就绪 → Await (等待...)
  Worker 2: 准备就绪 → Await ← 全部到达,栅栏打开!
  Worker 0/1/2: 同时开始执行

...重复...

API 一览:

┌──────────────────────────────────────────────────────────────────┐
│                     cyclicbarrier                                 │
├──────────────────────────────────────────────────────────────────┤
│  New(parties int) CyclicBarrier                                   │
│      创建栅栏,parties 为参与者数量                               │
│                                                                   │
│  NewWithAction(parties int, f func() error) CyclicBarrier        │
│      全部到达时先执行 f,再放行(用于阶段间的汇总操作)          │
│                                                                   │
│  Await(ctx context.Context) error                                │
│      在栅栏处等待,支持 Context 超时/取消                        │
│                                                                   │
│  Reset()                                                          │
│      手动重置栅栏(Await 后会自动重置,一般不需要手动调用)      │
└──────────────────────────────────────────────────────────────────┘

典型场景:多轮迭代计算(如矩阵运算),每轮结束后需要所有 worker 同步,再开始下一轮。


下篇:基于 etcd 的分布式并发原语

前面讲的都是进程内的并发原语。当资源或任务分布在 不同进程、不同机器 上时,就需要分布式并发原语。常用的协调系统有 Zookeeper、etcd、Consul,其中 etcd 对 Go 生态支持最好,提供了丰富的分布式并发原语。

┌──────────────────────────────────────────────────────────────┐
│                     etcd 分布式并发原语                       │
├──────────────────────────────────────────────────────────────┤
│  go.etcd.io/etcd/client/v3/concurrency                       │
│                                                              │
│  • Leader 选举        选出集群中的唯一主节点                 │
│  • 分布式互斥锁       跨进程/跨机器的互斥访问                │
│  • 分布式读写锁       跨进程的读写锁                         │
│                                                              │
│  github.com/etcd-io/etcd/contrib/recipes                     │
│                                                              │
│  • 分布式队列         跨节点的 FIFO 队列                     │
│  • 分布式优先级队列   带优先级的分布式队列                   │
│  • 分布式栅栏         跨节点的同步点                         │
│  • STM               软件事务内存                            │
│                                                              │
└──────────────────────────────────────────────────────────────┘

生产环境需要一个 7×24 的 etcd 集群;学习测试可以用单节点 etcd。

五、Leader 选举

什么是 Leader 选举?

主从架构中,需要确定哪个节点是主(Leader),哪个是从(Follower)。同一时刻只能有一个 Leader,否则多个节点同时执行写操作,会导致数据不一致。

正常运行:                           Leader 宕机后:

  ┌────────┐                          ┌────────┐
  │ Leader │ ← 执行写操作             │ ×宕机× │
  └────┬───┘                          └────────┘
       │                                   │
  ┌────┴────┐                         ┌────┴────┐
  │Follower │  ← 执行读操作          │Follower │ ← 被选为新 Leader
  │Follower │                         │Follower │
  └─────────┘                         └─────────┘

etcd 实现

import (
    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

// 连接 etcd
cli, _ := clientv3.New(clientv3.Config{
    Endpoints: []string{"localhost:2379"},
})
defer cli.Close()

// 创建 session(带 TTL 的租约,节点宕机后自动释放)
session, _ := concurrency.NewSession(cli, concurrency.WithTTL(10))
defer session.Close()

// 参与选举
e := concurrency.NewElection(session, "/my-service/leader")

// 竞选 Leader(阻塞直到当选)
if err := e.Campaign(ctx, "node-1"); err != nil {
    log.Fatal(err)
}
fmt.Println("我是 Leader!")

// 查看当前 Leader
resp, _ := e.Leader(ctx)
fmt.Println("当前 Leader:", string(resp.Kvs[0].Value))

// 主动让位
e.Resign(ctx)
┌───────────────────────────────────────────────────────────┐
│  Election API                                              │
├───────────────────────────────────────────────────────────┤
│  NewElection(session, prefix)    创建选举实例              │
│  Campaign(ctx, val)              参与竞选(阻塞直到当选)  │
│  Proclaim(ctx, val)              Leader 更新自己的值       │
│  Resign(ctx)                     主动让位                  │
│  Leader(ctx)                     查看当前 Leader           │
│  Observe(ctx) <-chan Response    监听 Leader 变化          │
└───────────────────────────────────────────────────────────┘

Session 的 TTL 机制:每个参与选举的节点通过 Session 维持一个租约(Lease)。如果节点宕机,租约过期,etcd 自动释放该节点的 Leader 身份,其他节点可以竞选成功。

六、分布式互斥锁与读写锁

分布式 Mutex

当多个进程/机器需要互斥访问同一个资源时:

session, _ := concurrency.NewSession(cli)
mu := concurrency.NewMutex(session, "/my-lock/")

// 获取锁(阻塞直到成功)
if err := mu.Lock(ctx); err != nil {
    log.Fatal(err)
}
fmt.Println("获取到分布式锁")

// 临界区操作
doSomething()

// 释放锁
if err := mu.Unlock(ctx); err != nil {
    log.Fatal(err)
}

分布式 RWMutex

读多写少的分布式场景:

session, _ := concurrency.NewSession(cli)
rw := recipe.NewRWMutex(session, "/my-rwlock/")

// 读锁(多个节点可以同时持有)
rw.RLock(ctx)
readData()
rw.RUnlock(ctx)

// 写锁(独占)
rw.Lock(ctx)
writeData()
rw.Unlock(ctx)

与进程内锁的对比

特性sync.Mutexetcd 分布式 Mutex
作用范围单进程内跨进程/跨机器
故障处理进程崩 → 锁消失节点崩 → TTL 过期自动释放
性能纳秒级毫秒级(网络 RTT)
依赖etcd 集群

七、分布式队列

etcd 提供了 FIFO 队列和优先级队列。

FIFO 队列

import recipe "go.etcd.io/etcd/contrib/recipes"

q := recipe.NewQueue(cli, "/my-queue")

// 入队(可以在任意节点)
q.Enqueue("task-1")
q.Enqueue("task-2")

// 出队(可以在另一个节点)
val, _ := q.Dequeue() // "task-1"(FIFO 顺序)
// 如果队列为空,Dequeue 会阻塞等待

优先级队列

pq := recipe.NewPriorityQueue(cli, "/my-pqueue")

// 入队,附带优先级(数字越小优先级越高)
pq.Enqueue("low-priority-task", 10)
pq.Enqueue("high-priority-task", 1)

val, _ := pq.Dequeue() // "high-priority-task"(优先级 1 先出)
普通队列:                         优先级队列:

  入队顺序 → 出队顺序               入队              出队
  task-1 → task-1                   (10) low-task     (1) high-task ← 先出
  task-2 → task-2                   (1)  high-task    (10) low-task
  task-3 → task-3                   (5)  mid-task     (5)  mid-task

八、分布式栅栏(Barrier)

etcd 提供了两种栅栏。

Barrier:阻塞/放行栅栏

由一个节点控制栅栏的开关,其他节点在栅栏处等待:

b := recipe.NewBarrier(cli, "/my-barrier")

// 节点 A:创建栅栏(阻塞其他节点)
b.Hold()

// 节点 B/C/D:在栅栏处等待
b.Wait() // 阻塞,直到栅栏被移除

// 节点 A:移除栅栏(放行所有等待者)
b.Release()

DoubleBarrier:集合点栅栏

类似 CyclicBarrier——等待指定数量的节点都进入后才放行:

db := recipe.NewDoubleBarrier(session, "/my-dbarrier", 3) // 需要 3 个参与者

// 每个节点调用 Enter(阻塞直到 3 个节点都进入)
db.Enter() // 3 个节点都 Enter 后,全部放行

// 执行任务...

// 每个节点调用 Leave(阻塞直到 3 个节点都离开)
db.Leave() // 3 个节点都 Leave 后,全部继续
节点 A: Enter (等待...)
节点 B: Enter (等待...)
节点 C: Enter ← 第 3 个到达,全部放行!

  A/B/C 同时执行任务

节点 A: Leave (等待...)
节点 C: Leave (等待...)
节点 B: Leave ← 第 3 个到达,全部继续!

九、STM(软件事务内存)

STM 提供了类似数据库事务的能力——对 etcd 中的多个 key 进行原子的读写操作:

import "go.etcd.io/etcd/client/v3/concurrency"

// 转账:从 A 扣款 100,给 B 加款 100(原子操作)
_, err := concurrency.NewSTM(cli, func(stm concurrency.STM) error {
    balanceA := stm.Get("/account/A")
    balanceB := stm.Get("/account/B")

    a, _ := strconv.Atoi(balanceA)
    b, _ := strconv.Atoi(balanceB)

    if a < 100 {
        return fmt.Errorf("余额不足")
    }

    stm.Put("/account/A", strconv.Itoa(a-100))
    stm.Put("/account/B", strconv.Itoa(b+100))
    return nil
})

STM 的隔离级别:

┌──────────────────────────────────────────────────────────────┐
│  STM 隔离级别(通过 WithIsolation 选项设置)                 │
├──────────────────────────────────────────────────────────────┤
│  SerializableSnapshot  可串行化快照(默认,最严格)          │
│  Serializable          可串行化                              │
│  RepeatableReads       可重复读                              │
│  ReadCommitted         读已提交(最宽松)                    │
└──────────────────────────────────────────────────────────────┘

STM 内部使用 乐观锁 + 重试 机制:先读取所有需要的 key,执行事务逻辑,然后尝试提交。如果发现读取的 key 在此期间被其他事务修改了,就自动重试整个事务函数。


十、全景对比

原语作用范围典型场景来源
Semaphore单进程限制并发度(加权)golang.org/x/sync
SingleFlight单进程缓存击穿、请求合并golang.org/x/sync
ErrGroup单进程并行子任务 + 错误收集golang.org/x/sync
CyclicBarrier单进程多轮迭代同步社区库
Leader 选举分布式主从架构选主etcd concurrency
分布式 Mutex分布式跨节点互斥访问etcd concurrency
分布式 Queue分布式跨节点任务队列etcd recipes
分布式 Barrier分布式跨节点同步点etcd recipes
STM分布式多 key 原子操作etcd concurrency

十一、实战建议

  1. 缓存击穿首选 SingleFlight——把 N 次并发 DB 查询合并为 1 次
  2. 并行子任务首选 ErrGroup——比 WaitGroup + Mutex 收集错误更简洁
  3. 加权并发控制用 Semaphore——一次获取多个资源,配合 Context 做超时
  4. 分布式场景选 etcd——Go 生态最成熟,API 最完善
  5. 分布式锁要设 TTL——防止持锁节点宕机导致死锁
  6. STM 事务函数必须幂等——因为冲突时会自动重试
# 两个应该加入 CI 的命令
go vet ./...
go test -race ./...

标准库的并发原语解决 80% 的问题,扩展并发原语解决剩下的 15%,分布式并发原语解决最后的 5%。根据你的场景——单机还是分布式、简单还是复杂——选择合适层次的工具,不要杀鸡用牛刀,也不要拿菜刀去砍树。