10.Golang 锁 sync包 源码分析 (一、Mutex加锁解锁)

Golang 锁 sync包 源码分析 (一、Mutex加锁解锁)

注意当前go版本代码为1.23

sync包 源码分析包主要介绍 Go 语言中常见的同步原语 sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond 以及扩展原语 golang/sync/errgroup.Groupgolang/sync/semaphore.Weightedgolang/sync/singleflight.Group 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。

定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Mutex struct {
state int32 // mutex 的状态
sema uint32 // 用于阻塞/唤醒 goroutine 的信号量
}

type Locker interface {
Lock()
Unlock()
}


const (
mutexLocked = 1 << iota // mutex 已锁定
mutexWoken // 有 waiter 被唤醒
mutexStarving // mutex 处于饥饿模式
mutexWaiterShift = iota // waiter 数量的偏移量
starvationThresholdNs = 1e6 // 切换到饥饿模式的阈值,单位纳秒
)

在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被从唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;

正常模式和饥饿模式

sync.Mutex 有两种模式 — 正常模式和饥饿模式。

在正常模式下,waiter 按照 FIFO 顺序排队,但是被唤醒的 waiter并不拥有 mutex,而是与新到达的 goroutine 竞争 mutex 的所有权。新到达的 goroutine 具有优势 – 它们已经在 CPU 上运行,并且可能有很多,因此被唤醒的 waiter 很有可能竞争失败。在这种情况下,它会被重新排到等待队列的前面。为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。

// 在饥饿模式下,mutex 的所有权直接从解锁的 goroutine 移交给队列最前面的 waiter。
// 新到达的 goroutine 不会尝试获取 mutex,即使它看起来是解锁的,
// 并且不会尝试自旋。相反,它们会将自己排到等待队列的末尾。
//
// 如果一个 waiter 获得了 mutex 的所有权,并且看到
// (1) 它是队列中的最后一个 waiter,或者 (2) 它等待的时间少于 1ms,
// it switches mutex back to normal operation mode.
// mutex 将切换回正常操作模式。
// 正常模式具有更好的性能,因为一个 goroutine 可以连续多次获取mutex,即使有阻塞的 waiter。
// 饥饿模式对于防止尾部延迟的病态情况非常重要。

加锁和解锁

sync.Mutex.Locksync.Mutex.Unlock 方法。

加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Lock 方法用于获取互斥锁。
func (m *Mutex) Lock() {
// 快速路径:尝试直接获取未锁定的互斥锁。
// 使用原子操作 CompareAndSwapInt32 来尝试将 m.state 从 0 修改为 mutexLocked。
// 如果成功,说明当前 goroutine 成功获取了锁,直接返回。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// 如果启用了竞态检测,记录当前 goroutine 获取了锁。
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 慢速路径:如果快速路径失败,调用 lockSlow 方法进行更复杂的锁获取逻辑。
// 将慢速路径单独放在一个函数中,以便快速路径可以被内联优化。
m.lockSlow()
}


// lockSlow 方法用于处理锁获取的慢速路径。
// 1. 判断当前 Goroutine 能否进入自旋;
// 2. 通过自旋等待互斥锁的释放;
// 3. 计算互斥锁的最新状态;
// 4. 更新互斥锁的状态并获取锁;
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 记录当前 goroutine 开始等待的时间
starving := false // 标记当前 goroutine 是否处于饥饿模式
awoke := false // 标记当前 goroutine 是否被唤醒
iter := 0 // 记录自旋次数
old := m.state // old 存储互斥锁的旧状态。

for {
// 如果互斥锁已被锁定且不处于饥饿模式,并且当前 goroutine 可以进行自旋,
// 则尝试通过自旋来获取锁。
// runtime.sync_runtime_canSpin 返回 true的前提
// 运行在多 CPU 的机器上;
// 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
// 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 尝试设置 mutexWoken 标志位,通知 Unlock 方法不要唤醒其他阻塞的 goroutine。
// 这样做是为了尝试将互斥锁“传递”给当前正在自旋的 goroutine,减少上下文切换。
// 只有在当前 goroutine 未被唤醒,且 mutexWoken 位未被设置,
// 并且存在等待的 goroutine 时,才尝试设置 mutexWoken。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 执行一次自旋操作,让出 CPU 时间片。
runtime_doSpin()
// 增加自旋次数。
iter++
// 更新互斥锁的状态。
continue // 继续循环
}

new := old // 创建一个新的状态变量,用于后续的 CAS 操作

// 如果互斥锁不处于饥饿模式,则尝试获取锁。
if old&mutexStarving == 0 {
new |= mutexLocked
}

// 如果互斥锁已被锁定或处于饥饿模式,则增加等待者计数。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}

// 如果当前 goroutine 处于饥饿模式,并且互斥锁已被锁定,
// 则将互斥锁切换到饥饿模式。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}

// 如果当前 goroutine 已被唤醒,则需要重置 mutexWoken 标志。
if awoke {
// 如果新的状态中没有 mutexWoken 标志,则抛出异常。
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 重置 mutexWoken 标志
}

// 尝试通过 CAS 操作将互斥锁的状态从 old 修改为 new。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果互斥锁未被锁定且不处于饥饿模式,则当前 goroutine 成功获取了锁,退出循环。
if old&(mutexLocked|mutexStarving) == 0 {
break
}

// 如果当前 goroutine 之前已经在等待,则将其放在等待队列的前面。
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime() // 记录开始等待的时间
}

// 阻塞当前 goroutine,等待信号量唤醒。
runtime_SemacquireMutex(&m.sema, queueLifo, 1)

// 如果当前 goroutine 等待的时间超过了饥饿阈值,则将其标记为饥饿模式。
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state // 重新获取当前互斥锁的状态

// 如果互斥锁处于饥饿模式,则处理饥饿模式下的逻辑。
if old&mutexStarving != 0 {
// 如果当前 goroutine 被唤醒且互斥锁处于饥饿模式,
// 但互斥锁的状态不一致(例如,mutexLocked 未设置或等待者计数为 0),则抛出异常。
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}

// 计算状态增量,用于退出饥饿模式。
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 如果当前 goroutine 不再饥饿,或者等待者计数为 1,则退出饥饿模式。
delta -= mutexStarving
}

// 更新互斥锁的状态,退出饥饿模式。
atomic.AddInt32(&m.state, delta)
break
}

// 当前 goroutine 已被唤醒,重置 awoke 和 iter 标志。
awoke = true
iter = 0
} else {
// 如果 CAS 操作失败,则重新获取当前互斥锁的状态,继续循环。
old = m.state
}
}

// 如果启用了竞态检测,记录当前 goroutine 获取了锁。
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}



解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Unlock 解锁 Mutex。
// 如果 m 没有被锁定,则会抛出运行时错误。
func (m *Mutex) Unlock() {
// 如果启用了竞态检测器。
if race.Enabled {
_ = m.state // 对 m.state 的访问,用于竞态检测。确保在 Release 之前读取。
race.Release(unsafe.Pointer(m)) // 通知竞态检测器 Mutex 已被释放。
}

// 快速路径:清除锁定位。
// 使用原子操作将状态值减去 mutexLocked (1)。
// 如果新的状态值不为 0,说明还有其他位被设置(例如,有等待者或处于饥饿模式)。
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 外联的慢速路径,允许内联快速路径。
// 为了在追踪 GoUnblock 时隐藏 unlockSlow,我们跳过一个额外的栈帧。
// 这样做可以使追踪信息更简洁,只显示关键的阻塞/唤醒操作。
m.unlockSlow(new)
}
}

// unlockSlow 是 Unlock 的慢速路径。
// 当解锁时,Mutex 的状态不仅仅是锁定状态时,会调用此函数。
func (m *Mutex) unlockSlow(new int32) {
// 如果 (new + mutexLocked) & mutexLocked == 0,意味着在解锁前 Mutex 就没有被锁定。
// 因为在 Unlock 的快速路径中,我们已经减去了 mutexLocked,
// 如果解锁前没有被锁定,那么 new 应该就是 0 或者负数(如果存在等待者或者处于饥饿模式)。
// 加上 mutexLocked 后,如果原先没被锁定,结果与 mutexLocked 进行与运算应该为 0。
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex") // 抛出致命错误,因为尝试解锁一个未锁定的 Mutex。
}
// 如果没有处于饥饿模式。
if new&mutexStarving == 0 {
old := new // 保存当前状态值。
for {
// 如果没有等待者,或者一个 goroutine 已经醒来或已经获取了锁,则不需要唤醒任何人。
// 在饥饿模式下,所有权直接从解锁的 goroutine 移交给下一个等待者。
// 我们不属于这个链条,因为我们在解锁 Mutex 时没有观察到 mutexStarving。
// 所以让开道路。
// old>>mutexWaiterShift == 0 表示没有等待者。
// old&(mutexLocked|mutexWoken|mutexStarving) != 0 表示 Mutex 仍然被锁定,或者有 goroutine 正在被唤醒,或者处于饥饿模式。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return // 直接返回,无需唤醒任何 goroutine。
}
// 争夺唤醒某人的权利。
// 将等待者计数减 1,并设置唤醒位。
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 使用原子操作比较并交换 Mutex 的状态。
// 如果当前状态仍然是 old,则将其更新为 new,表示我们成功获得了唤醒的权利。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 释放一个信号量,唤醒一个等待的 goroutine。
// false 表示不是在持有锁的情况下释放信号量。
// 1 表示唤醒一个等待者。
runtime_Semrelease(&m.sema, false, 1)
return // 返回,表示已成功唤醒一个等待者。
}
// 如果 CAS 失败,说明状态已被其他 goroutine 修改,重新读取当前状态并重试。
old = m.state
}
} else {
// 饥饿模式:将 Mutex 的所有权移交给下一个等待者,并让出我们的时间片,
// 以便下一个等待者可以立即开始运行。
// 注意:mutexLocked 没有被设置,等待者会在唤醒后设置它。
// 但是如果设置了 mutexStarving,Mutex 仍然被认为是锁定的,
// 因此新的 goroutine 不会获取它。
// true 表示在持有锁的情况下释放信号量(虽然逻辑上锁已经释放,但在饥饿模式下,
// 我们希望直接将锁交给下一个等待者,所以使用 true 可以确保公平性,
// 避免其他 goroutine 抢占到锁)。
// 1 表示唤醒一个等待者。
runtime_Semrelease(&m.sema, true, 1)
}
}

总结

本篇文章主要从加锁和解锁两个部分的源码进行分析。

互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:

  • 如果互斥锁处于初始化状态,会通过置位 mutexLocked 加锁;
  • 如果互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
  • 互斥锁在正常情况下会通过 runtime.sync_runtime_SemacquireMutex 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
  • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;

互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

  • 当互斥锁已经被解锁时,调用 sync.Mutex.Unlock 会直接抛出异常;
  • 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease 唤醒对应的 Goroutine;

参考链接

1.3.Go 语言设计与实现


10.Golang 锁 sync包 源码分析 (一、Mutex加锁解锁)
https://blog.longpi1.com/2025/01/04/10-Golang-锁-sync包-源码分析-(一、Mutex加锁解锁)/