11.Golang 锁 sync包 源码分析 (二、RWMutex、WaitGroup )

Golang 锁 sync包 源码分析 (二、RWMutex、WaitGroup )

注意当前go版本代码为1.23

sync包 源码分析包主要介绍 Go 语言中常见的同步原语 sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond 以及扩展原语 golang/sync/errgroup.Groupgolang/sync/semaphore.Weightedgolang/sync/singleflight.Group 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。

定义

RWMutex

读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。

Y N
N N
1
2
3
4
5
6
7
8
type RWMutex struct {
w Mutex // 复用互斥锁提供的能力;
writerSem uint32 // 用于写等待读
readerSem uint32 // 用于读等待写
readerCount atomic.Int32 // 存储了当前正在执行的读操作数量;
readerWait atomic.Int32 // 表示当写操作被阻塞时等待的读操作个数
}

RWMutex写锁

加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state // 访问 rw.w 的状态,用于 race 检测
race.Disable() // 禁用 race 检测,因为我们即将进入临界区
}
// 首先,解决与其他写锁的竞争。
rw.w.Lock() // 获取内部互斥锁 w,这会阻塞其他写操作。
// 这确保了在任何时候只有一个写操作可以持有写锁。

// 向读者宣告有一个正在等待的写者。
// 通过从 readerCount 中减去 rwmuxMaxReaders 来实现。
// rwmuxMaxReaders 是一个很大的值,使得 readerCount 变为负数或接近负数。
// 这会通知正在尝试获取读锁的 goroutine,当前有写锁正在等待。
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// 此时,r 的值等于在添加操作之前 readerCount 的值。
// 也就是说,r 代表了在写锁尝试获取时,活跃的读者的数量。

// 等待活跃的读者完成。
// 如果在写锁尝试获取时,仍然有活跃的读者 (r != 0),
// 并且我们递增 readerWait 计数器,并且返回值不为 0,
// 那么当前写锁的 goroutine 需要等待。
// readerWait 记录了需要等待的读者数量。
if r != 0 && rw.readerWait.Add(r) != 0 {
// runtime_SemacquireRWMutex 是一个 runtime 包提供的函数,
// 用于阻塞当前的 goroutine,直到收到来自读者的信号。
// &rw.writerSem 是用于写锁的信号量。
// false 表示这不是一个可取消的等待。
// 0 表示没有超时时间。
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable() // 重新启用 race 检测
race.Acquire(unsafe.Pointer(&rw.readerSem)) // 通知 race 检测器我们获得了读信号量
race.Acquire(unsafe.Pointer(&rw.writerSem)) // 通知 race 检测器我们获得了写信号量
}
}
  1. 调用结构体持有的sync.Mutex结构体的sync.Mutex.Lock阻塞后续的写操作;
    • 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠;
  2. 调用 sync/atomic.AddInt32 函数阻塞后续的读操作:
  3. 如果仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒;

解锁

sync.RWMutex.Unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state // 访问 rw.w 的状态,用于 race 检测
race.Release(unsafe.Pointer(&rw.readerSem)) // 通知 race 检测器我们释放了读信号量
race.Disable() // 禁用 race 检测,因为我们即将离开临界区
}

// 向读者宣告没有活跃的写者。
// 通过向 readerCount 中加上 rwmuxMaxReaders 来实现。
// 这会将 readerCount 的值恢复到非负值,表示没有写锁持有。
r := rw.readerCount.Add(rwmutexMaxReaders)
// 此时,r 的值代表了在添加操作之后 readerCount 的值。

// 如果 r 的值大于等于 rwmuxMaxReaders,
// 说明在调用 Unlock 之前,readerCount 的值就已经是大于等于 0 的,
// 这意味着没有写锁被持有,发生了对未锁定 RWMutex 的解锁操作。
if r >= rwmutexMaxReaders {
race.Enable() // 重新启用 race 检测
fatal("sync: Unlock of unlocked RWMutex") // 抛出致命错误
}
// 解除阻塞被阻塞的读者,如果有的话。
// 循环次数为之前等待的读者数量,即 r 的低位部分。
// 因为 readerCount 可能很大,只有低位部分才是实际等待的读者数量。
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}

// 释放写锁的互斥锁,允许其他写者获取写锁。
rw.w.Unlock()

// 如果启用了竞态检测,重新启用竞态检测。
if race.Enabled {
race.Enable()
}

}

与加锁的过程正好相反,写锁的释放主要为以下几个执行:

  1. 调用 sync/atomic.AddInt32 函数将 readerCount 变回正数,释放读锁;
  2. 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
  3. 调用 sync.Mutex.Unlock 释放写锁;

RWMutex读锁

加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// RLock 获取读锁。
// 如果没有其他协程持有写锁,则允许当前协程持有读锁。
// 可以有多个协程同时持有读锁。
func (rw *RWMutex) RLock() {
if race.Enabled {
// 如果启用了竞态检测,则访问 rw.w.state,这会触发竞态检测器去检查是否有并发的写操作。
_ = rw.w.state
// 在尝试获取锁之前,禁用竞态检测,以避免竞态检测器自身导致的虚假报告。
race.Disable()
}
// readerCount 原子地增加 1,表示当前有一个新的读者获取了读锁。
// 如果增加后的值小于 0,则表示当前有写锁被持有或者有写锁在等待获取,
// 此时读者需要等待。
if rw.readerCount.Add(1) < 0 {
// 有一个写锁正在等待或者已经被持有,当前读者需要等待。
// runtime_SemacquireRWMutexR 是一个运行时函数,用于阻塞当前协程,
// 直到 readerSem 信号量变为可用(即写锁被释放)。
// 第二个参数 false 表示这不是一个可中断的等待。
// 第三个参数 0 表示没有特定的超时时间。
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
if race.Enabled {
// 在成功获取读锁后,重新启用竞态检测。
race.Enable()
// 通知竞态检测器,当前协程已经获取了 readerSem 信号量。
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// RUnlock 释放一个 [RWMutex.RLock] 调用持有的读锁;
// 它不会影响其他同时持有读锁的协程。
// 如果在调用 RUnlock 时 rw 没有被读锁定,则会发生运行时错误。
func (rw *RWMutex) RUnlock() {
if race.Enabled {
// 如果启用了竞态检测,则访问 rw.w.state,这会触发竞态检测器去检查是否有并发的写操作。
_ = rw.w.state
// 通知竞态检测器,当前协程正在释放 writerSem 信号量。
// ReleaseMerge 表示这是一个读锁释放,可能会导致等待的写锁被唤醒。
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
// 在执行解锁操作期间,禁用竞态检测。
race.Disable()
}
// 减少 readerCount 的值,表示当前有一个读锁被释放。
// 如果返回值小于 0,说明当前有写锁在等待,需要进入慢路径处理。
if r := rw.readerCount.Add(-1); r < 0 {
// 外联的慢路径解锁逻辑,目的是让快速路径能够被内联优化。
rw.rUnlockSlow(r)
}
if race.Enabled {
// 在解锁操作完成后,重新启用竞态检测。
race.Enable()
}
}

// rUnlockSlow 是 RUnlock 的慢路径实现。
// 当 readerCount 变为负数时调用,表示可能存在错误或者有写锁在等待。
func (rw *RWMutex) rUnlockSlow(r int32) {
// 如果 r+1 == 0 或 r+1 == -rwmutexMaxReaders,说明 RWMutex 没有被读锁锁定。
// 这是一个运行时错误,会触发 fatal 错误。
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable() // 重新启用竞态检测
fatal("sync: RUnlock of unlocked RWMutex") // 触发 fatal 错误
}

// 减少 readerWait 的值,表示当前有一个读锁被释放。
// 如果 readerWait 的值变为 0,说明所有读锁都已释放,可以唤醒等待的写锁。
if rw.readerWait.Add(-1) == 0 {
// 最后一个读锁释放,唤醒等待的写锁。
// runtime_Semrelease 是一个底层函数,用于唤醒等待的写锁。
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

WaitGroup

sync.WaitGroup 用于等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:

结构体

1
2
3
4
5
6
7
8
9
10
11
12
// WaitGroup 用于等待一组 goroutine 完成。
// 它内部维护了一个计数器,用于跟踪尚未完成的 goroutine 数量。
type WaitGroup struct {
noCopy noCopy // 用于防止 WaitGroup 被复制的标记

// state 是一个原子操作的 64 位整数,高 32 位表示计数器(counter),
// 低 32 位表示等待的 goroutine 数量(waiter count)。
state atomic.Uint64

// sema 是一个信号量,用于阻塞和唤醒等待的 goroutine。
sema uint32
}

Add方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Add 方法将 delta(可以是负数)添加到 WaitGroup 的计数器。
// 如果计数器变为零,所有阻塞在 Wait 上的 goroutine 将被释放。
// 如果计数器变为负数,Add 会引发 panic。
//
// 注意:
// 1. 当计数器为零时,带有正 delta 的 Add 调用必须在 Wait 之前执行。
// 2. 带有负 delta 的 Add 调用,或者在计数器大于零时开始的带有正 delta 的 Add 调用,-可以在任何时候执行。
// 3. 通常这意味着 Add 调用应该在创建 goroutine 或其他等待事件的语句之前执行。
// 4. 如果 WaitGroup 被重用于等待多个独立的事件集合,新的 Add 调用必须在前一个 Wait 调用返回之后执行。
func (wg *WaitGroup) Add(delta int) {
if race.Enabled { // 如果启用了竞态检测
if delta < 0 {
// 同步减量操作与 Wait 操作。
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable() // 禁用竞态检测
defer race.Enable() // 在函数返回时重新启用竞态检测
}

// 将 delta 左移 32 位并添加到 state 中,返回更新后的 state。因为state 是一个 64 位的原子整数,高 32 位表示计数器(counter),低 32 位表示等待的 goroutine 数量(waiter count)。
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32) // 获取计数器(高 32 位)
w := uint32(state) // 获取等待的 goroutine 数量(低 32 位)

if race.Enabled && delta > 0 && v == int32(delta) {
// 第一次增量操作必须与 Wait 同步。
// 需要将其建模为读操作,因为可能有多个并发的 wg.counter 从 0 开始转换。
race.Read(unsafe.Pointer(&wg.sema))
}

if v < 0 {
panic("sync: negative WaitGroup counter") // 如果计数器为负,引发 panic
}

if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait") // 如果 Add 和 Wait 并发调用,引发 panic
}

if v > 0 || w == 0 {
return // 如果计数器大于零或没有等待的 goroutine,直接返回
}

// 当前 goroutine 已将计数器设置为零,且存在等待的 goroutine。
// 此时不能有并发的 state 修改:
// - Add 操作不能与 Wait 操作并发执行,
// - 如果 Wait 操作看到计数器为零,它不会增加等待的 goroutine 数量。
// 仍然进行一个简单的检查以检测 WaitGroup 的误用。
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait") // 如果 state 被并发修改,引发 panic
}

// 将等待的 goroutine 数量重置为零。
wg.state.Store(0)

// 释放所有等待的 goroutine。
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}

Done方法

1
2
3
4
// Done 方法将 WaitGroup 的计数器减一。
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wait 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Wait 方法阻塞,直到 WaitGroup 的计数器变为零。
func (wg *WaitGroup) Wait() {
if race.Enabled { // 如果启用了竞态检测
race.Disable() // 禁用竞态检测
}

for {
state := wg.state.Load() // 获取当前的 state
v := int32(state >> 32) // 获取计数器(高 32 位)
w := uint32(state) // 获取等待的 goroutine 数量(低 32 位)

if v == 0 {
// 如果计数器为零,无需等待。
if race.Enabled {
race.Enable() // 重新启用竞态检测
race.Acquire(unsafe.Pointer(wg)) // 获取竞态检测锁
}
return
}

// 尝试增加等待的 goroutine 数量。
if wg.state.CompareAndSwap(state, state+1) {
if race.Enabled && w == 0 {
// Wait 操作必须与第一次 Add 操作同步。
// 需要将其建模为写操作,以与 Add 中的读操作竞争。
// 因此,只能对第一个等待的 goroutine 进行写操作,
// 否则并发的 Wait 操作会相互竞争。
race.Write(unsafe.Pointer(&wg.sema))
}

// 阻塞当前 goroutine,直到信号量被释放。
runtime_Semacquire(&wg.sema)

if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned") // 如果 WaitGroup 被重用,引发 panic
}

if race.Enabled {
race.Enable() // 重新启用竞态检测
race.Acquire(unsafe.Pointer(wg)) // 获取竞态检测锁
}
return
}
}
}

小结

  1. state 字段state 是一个 64 位的原子整数,高 32 位表示计数器(counter),低 32 位表示等待的 goroutine 数量(waiter count)。
  2. Add 方法:用于增加或减少计数器。如果计数器变为零,所有等待的 goroutine 将被唤醒。如果计数器变为负数,会引发 panic。
  3. Done 方法:是 Add(-1) 的简写,用于减少计数器。
  4. Wait 方法:阻塞当前 goroutine,直到计数器变为零。如果计数器已经为零,则立即返回。
  5. 竞态检测:代码中包含了竞态检测的逻辑,用于在竞态检测工具(如 -race)启用时检测并发问题。
  6. sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能被重新使用;
  7. 可以同时有多个 Goroutine 等待当前 sync.WaitGroup 计数器的归零,这些 Goroutine 会被同时唤醒;

参考链接

1.3.Go 语言设计与实现


11.Golang 锁 sync包 源码分析 (二、RWMutex、WaitGroup )
https://blog.longpi1.com/2025/01/06/11-Golang-锁-sync包-源码分析-(二、RWMutex、WaitGroup-)/