12.Golang 锁 sync包 源码分析 (三、Once、Cond、semaphore、singleflight)

Golang 锁 sync包 源码分析 (三、Once、Cond、semaphore、singleflight)

注意当前go版本代码为1.23

sync包 源码分析包主要介绍 Go 语言中常见的同步原语 sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond 以及扩展原语 golang/sync/errgroup.Groupgolang/sync/semaphore.Weightedgolang/sync/singleflight.Group 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。

Once

示例

1
2
3
4
5
6
7
8
9
10
11
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}

$ go run main.go
输出:only once

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Once 是一个用于确保某个操作只执行一次的结构体。
// 它通常用于初始化、资源加载等场景,确保即使在并发环境下,操作也只会执行一次。
type Once struct {
// done 是一个原子变量,用于指示操作是否已经执行。
done atomic.Uint32

// m 是一个互斥锁,用于在doSlow 中保护临界区。
m Mutex
}

// Do 方法用于执行传入的函数 f,并确保 f 只会被执行一次。
// 如果 Once 实例已经被标记为“已完成”(done == 1),则 f 不会被执行。
// 否则,f 会被执行,并且 Once 实例会被标记为“已完成”。
func (o *Once) Do(f func()) {
// 首先检查 done 是否为 0,即操作是否尚未执行。
// 如果 done 为 0,则进入doSlow执行操作。
if o.done.Load() == 0 {
o.doSlow(f)
}
}

// doSlow 是 Do 方法的慢路径实现,用于处理并发情况。
// 它通过互斥锁 m 来确保只有一个 goroutine 能够执行操作。
func (o *Once) doSlow(f func()) {
o.m.Lock()
// 在函数返回时释放互斥锁,确保锁总是会被释放。
defer o.m.Unlock()

// 再次检查 done 是否为 0,因为在获取锁的过程中,其他 goroutine 可能已经执行了操作。
// 如果 done 仍然为 0,则执行操作。
if o.done.Load() == 0 {
// 在函数返回时将 done 设置为 1,标记操作已经完成。
// 使用 defer 确保即使 f 发生 panic,done 也会被正确设置。
defer o.done.Store(1)
// 执行传入的函数 f。
f()
}
}

Go 语言标准库中 sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次。

sync.Once.Dosync.Once 结构体对外唯一暴露的方法,该方法会接收一个入参为空的函数:

  • 如果传入的函数已经执行过,会直接返回;
  • 如果传入的函数没有执行过,会调用 sync.Once.doSlow 执行传入的函数:

sync.Once 会通过成员变量 done 设置为1确保函数不会执行第二次。

Cond

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
var status int64

func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 5; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
}

func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}

func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}

$ go run main.go
输出:
listen
listen
listen
listen
listen

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
type Cond struct {
noCopy noCopy

// 互斥锁,用于在观察或修改条件时保护共享资源
L Locker

// notify 是等待在此条件上的 goroutine 列表。
notify notifyList
// checker 用于检测 Cond 结构体是否被复制。
checker copyChecker
}

// Wait 使当前 goroutine 进入等待状态,直到被 Signal 或 Broadcast 唤醒。
// 调用 Wait 时,必须持有 c.L 锁。
// Wait 会在进入等待状态前释放锁,并在被唤醒后重新获取锁。
func (c *Cond) Wait() {
c.checker.check() // 检查 Cond 是否被复制
t := runtime_notifyListAdd(&c.notify) // 将当前 goroutine 添加到等待列表中,并返回一个标记
c.L.Unlock() // 释放锁,允许其他 goroutine 修改共享资源
runtime_notifyListWait(&c.notify, t) // 进入等待状态,直到被唤醒
c.L.Lock() // 被唤醒后,重新获取锁
}

// Signal 唤醒一个正在等待的 goroutine(如果有的话)。
// 调用 Signal 时,可以但不必须持有 c.L 锁。
// Signal 不会影响 goroutine 的调度优先级;如果有其他 goroutine 正在尝试锁定 c.L,
// 它们可能会在等待的 goroutine 之前被唤醒。
func (c *Cond) Signal() {
c.checker.check() // 检查 Cond 是否被复制
runtime_notifyListNotifyOne(&c.notify) // 唤醒一个等待的 goroutine
}

// Broadcast 唤醒所有正在等待的 goroutine。
// 调用 Broadcast 时,可以但不必须持有 c.L 锁。
func (c *Cond) Broadcast() {
c.checker.check() // 检查 Cond 是否被复制
runtime_notifyListNotifyAll(&c.notify) // 唤醒所有等待的 goroutine
}


// 用于检测 Cond 是否被复制。
// 如果检测到 Cond 被复制,会触发 panic,防止潜在的并发问题。
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

// notifyListWait 获取当前 Goroutine 并将它追加到 Goroutine 通知链表的最末端,调用 runtime.goparkunlock 将当前 Goroutine 陷入休眠,该函数也是在 Go 语言切换 Goroutine 时经常会使用的方法,它会直接让出当前处理器的使用权并等待调度器的唤醒。
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}

// runtime.notifyListNotifyAll 依次通过 runtime.readyWithTime 唤醒链表中 Goroutine,Goroutine 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能 Goroutine 需要等待调度器的调度。
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil

atomic.Store(&l.notify, atomic.Load(&l.wait))

for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}

sync.Cond 不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {} 进行忙碌等待相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。使用时我们也需要注意以下问题:

  • sync.Cond.Wait 在调用之前一定要使用获取互斥锁,否则会触发程序崩溃;
  • sync.Cond.Signal 唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
  • sync.Cond.Broadcast 会按照一定顺序广播通知等待的全部 Goroutine;

扩展包semaphore

信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。

  • 每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时重新加回来;
  • 当遇到计数器大于信号量大小时,会进入休眠等待其他线程释放信号;

Go 语言的扩展包中就提供了带权重的信号量 golang/sync/semaphore.Weighted,我们可以按照不同的权重对资源的访问进行管理,这个结构体对外也只暴露了四个方法:

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// waiter 结构体表示一个等待获取信号量的请求。
type waiter struct {
n int64 // 请求的权重(即需要的资源数量)
ready chan<- struct{} // 当信号量获取成功时,关闭该通道以通知等待者
}

// NewWeighted 创建一个新的加权信号量,允许的最大并发访问权重为 n。
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}

// Weighted 结构体提供了一种限制并发访问资源的方式。
// 调用者可以请求带有特定权重的访问权限。
type Weighted struct {
size int64 // 信号量的总容量
cur int64 // 当前已分配的权重
mu sync.Mutex // 互斥锁,用于保护对 cur 和 waiters 的并发访问
waiters list.List // 等待队列,存储等待获取信号量的请求
}

// Acquire 方法尝试获取权重为 n 的信号量,阻塞直到资源可用或 ctx 被取消。
// 成功时返回 nil,失败时返回 ctx.Err() 并且信号量保持不变。
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
done := ctx.Done() // 获取 ctx 的 done 通道,用于监听取消信号

s.mu.Lock() // 加锁,保护对 cur 和 waiters 的访问
select {
case <-done:
// 如果 ctx 已经被取消(无论是调用前还是等待锁时),则直接返回错误
s.mu.Unlock()
return ctx.Err()
default:
}
if s.size-s.cur >= n && s.waiters.Len() == 0 {
// 如果当前有足够的资源并且没有等待者,则直接分配资源
s.cur += n
s.mu.Unlock()
return nil
}

if n > s.size {
// 如果请求的权重超过了信号量的总容量,则直接返回错误
s.mu.Unlock()
<-done // 等待 ctx 被取消
return ctx.Err()
}

// 创建一个通道,用于通知等待者资源已分配
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w) // 将等待者加入等待队列
s.mu.Unlock()

select {
case <-done:
// 如果 ctx 被取消,则从等待队列中移除该请求
s.mu.Lock()
select {
case <-ready:
// 如果在取消后成功获取了信号量,则回滚资源分配
s.cur -= n
s.notifyWaiters() // 通知其他等待者
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// 如果该请求在队列的最前面并且有剩余资源,则通知其他等待者
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return ctx.Err()

case <-ready:
// 成功获取信号量,检查 ctx 是否已被取消
select {
case <-done:
// 如果 ctx 已被取消,则释放资源并返回错误
s.Release(n)
return ctx.Err()
default:
}
return nil
}
}

// TryAcquire 方法尝试获取权重为 n 的信号量,但不会阻塞。
// 成功时返回 true,失败时返回 false 并且信号量保持不变。
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0 // 检查是否有足够的资源
if success {
s.cur += n // 分配资源
}
s.mu.Unlock()
return success
}

// Release 方法释放权重为 n 的信号量。
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n // 释放资源
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held") // 如果释放的资源超过持有的资源,则 panic
}
s.notifyWaiters() // 通知等待者
s.mu.Unlock()
}

// notifyWaiters 方法通知等待队列中的等待者,尝试分配资源。
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // 如果没有等待者,则退出循环
}

w := next.Value.(waiter)
if s.size-s.cur < w.n {
// 如果没有足够的资源分配给下一个等待者,则退出循环
// 这是为了避免大请求的饥饿问题
break
}

s.cur += w.n // 分配资源
s.waiters.Remove(next) // 从等待队列中移除该等待者
close(w.ready) // 关闭通道,通知等待者资源已分配
}
}

小结

带权重的信号量确实有着更多的应用场景,这也是 Go 语言对外提供的唯一一种信号量实现,在使用的过程中我们需要注意以下的几个问题:

扩展包singleflight

golang/sync/singleflight.Group 是 Go 语言扩展包中提供了另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求。一个比较常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时,通过singleflight我们能够限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。。

golang-query-without-single-flight

示例

相关示例代码:https://github.com/longpi1/gopkg/blob/main/libary/singleslight/singleflight.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type service struct {
requestGroup singleflight.Group
}

func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
v, err, _ := s.requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // 访问缓存、数据库
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// call 表示一个正在执行或已完成的 singleflight.Do 调用。
type call struct {
wg sync.WaitGroup // 用于等待当前调用完成的 WaitGroup

// 以下字段在 WaitGroup 完成之前只会被写入一次,
// 并且在 WaitGroup 完成后只会被读取。
val interface{} // 调用返回的结果值
err error // 调用返回的错误

// forgotten 表示在调用仍在执行时是否调用了 Forget 方法。
forgotten bool

// 以下字段在 WaitGroup 完成之前会被读写(需要加锁),
// 在 WaitGroup 完成后只会被读取。
dups int // 当前调用的重复次数(即有多少个并发的 Do 请求在等待此调用)
chans []chan<- Result // 用于通知等待结果的 channel 列表
}

// Group 表示一类工作,并形成一个命名空间,
// 在这个命名空间中,工作单元可以以去重的方式执行。
type Group struct {
mu sync.Mutex // 保护 m 的互斥锁
m map[string]*call // 按 key 存储的 call 结构,延迟初始化
}

// Result 保存 Do 方法的结果,以便可以通过 channel 传递。
type Result struct {
Val interface{} // 调用返回的结果值
Err error // 调用返回的错误
Shared bool // 结果是否被多个调用者共享
}

// Do 执行给定的函数并返回其结果,确保同一时间只有一个 key 对应的调用在执行。
// 如果有重复的调用,重复的调用者会等待原始调用完成并接收相同的结果。
// 返回值 shared 表示结果是否被多个调用者共享。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call) // 延迟初始化 map
}
if c, ok := g.m[key]; ok {
c.dups++ // 增加重复计数
g.mu.Unlock()
c.wg.Wait() // 等待原始调用完成
return c.val, c.err, true // 返回原始调用的结果
}
c := new(call)
c.wg.Add(1) // 设置 WaitGroup 为 1,表示有一个调用正在执行
g.m[key] = c // 将 call 存入 map
g.mu.Unlock()

g.doCall(c, key, fn) // 执行实际调用
return c.val, c.err, c.dups > 0 // 返回调用结果,并指示是否共享
}

// DoChan 类似于 Do,但返回一个 channel,当结果准备好时会通过该 channel 发送。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1) // 创建一个带缓冲的 channel
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call) // 延迟初始化 map
}
if c, ok := g.m[key]; ok {
c.dups++ // 增加重复计数
c.chans = append(c.chans, ch) // 将 channel 添加到通知列表
g.mu.Unlock()
return ch // 返回 channel
}
c := &call{chans: []chan<- Result{ch}} // 创建新的 call,并初始化 channel 列表
c.wg.Add(1) // 设置 WaitGroup 为 1
g.m[key] = c // 将 call 存入 map
g.mu.Unlock()

go g.doCall(c, key, fn) // 异步执行实际调用

return ch // 返回 channel
}

// doCall 处理单个 key 的调用。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn() // 执行函数并保存结果
c.wg.Done() // 标记调用完成

g.mu.Lock()
if !c.forgotten {
delete(g.m, key) // 如果未被遗忘,则从 map 中删除
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0} // 向所有等待的 channel 发送结果
}
g.mu.Unlock()
}

// Forget 告诉 singleflight 忘记某个 key。
// 未来对此 key 的 Do 调用将直接执行函数,而不是等待之前的调用完成。
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true // 标记为遗忘
}
delete(g.m, key) // 从 map 中删除
g.mu.Unlock()
}

小结

当我们需要减少对下游的相同请求时,可以使用 golang/sync/singleflight.Group 来增加吞吐量和服务质量,不过在使用的过程中我们也需要注意以下的几个问题:

golang/sync/singleflight.Group 提供了两个用于抑制相同请求的方法:

这两个方法在功能上没有太多的区别,分别提供了同步和异步的调用方式。

总结

这一节中介绍了 Go 语言标准库中提供的基本原语以及扩展包中的扩展原语,这些并发编程的原语能够帮助我们更好地利用 Go 语言的特性构建高吞吐量、低延时的服务、解决并发带来的问题。

在设计同步原语时,我们不仅要考虑 API 接口的易用、解决并发编程中可能遇到的线程竞争问题,还需要对尾延时进行、优化保证公平性,理解同步原语也是我们理解并发编程无法跨越的一个步骤。

参考链接

1.3.Go 语言设计与实现


12.Golang 锁 sync包 源码分析 (三、Once、Cond、semaphore、singleflight)
https://blog.longpi1.com/2025/01/11/12-Golang-锁-sync包-源码分析-(三、Once、Cond、semaphore、singleflight)/