Golang 锁 sync包 源码分析 (一、Mutex加锁解锁)
注意当前go版本代码为1.23
sync包 源码分析包主要介绍 Go 语言中常见的同步原语 sync.Mutex
、sync.RWMutex
、sync.WaitGroup
、sync.Once
和 sync.Cond
以及扩展原语 golang/sync/errgroup.Group
、golang/sync/semaphore.Weighted
和 golang/sync/singleflight.Group
的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。
定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| type Mutex struct { state int32 sema uint32 }
type Locker interface { Lock() Unlock() }
const ( mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 )
|
在默认情况下,互斥锁的所有状态位都是 0,int32
中的不同位分别表示了不同的状态:
mutexLocked
— 表示互斥锁的锁定状态;
mutexWoken
— 表示从正常模式被从唤醒;
mutexStarving
— 当前的互斥锁进入饥饿状态;
正常模式和饥饿模式
sync.Mutex
有两种模式 — 正常模式和饥饿模式。
在正常模式下,waiter 按照 FIFO 顺序排队,但是被唤醒的 waiter并不拥有 mutex,而是与新到达的 goroutine 竞争 mutex 的所有权。新到达的 goroutine 具有优势 – 它们已经在 CPU 上运行,并且可能有很多,因此被唤醒的 waiter 很有可能竞争失败。在这种情况下,它会被重新排到等待队列的前面。为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
// 在饥饿模式下,mutex 的所有权直接从解锁的 goroutine 移交给队列最前面的 waiter。
// 新到达的 goroutine 不会尝试获取 mutex,即使它看起来是解锁的,
// 并且不会尝试自旋。相反,它们会将自己排到等待队列的末尾。
//
// 如果一个 waiter 获得了 mutex 的所有权,并且看到
// (1) 它是队列中的最后一个 waiter,或者 (2) 它等待的时间少于 1ms,
// it switches mutex back to normal operation mode.
// mutex 将切换回正常操作模式。
// 正常模式具有更好的性能,因为一个 goroutine 可以连续多次获取mutex,即使有阻塞的 waiter。
// 饥饿模式对于防止尾部延迟的病态情况非常重要。
加锁和解锁
sync.Mutex.Lock
和 sync.Mutex.Unlock
方法。
加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } m.lockSlow() }
func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state
for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ continue }
new := old
if old&mutexStarving == 0 { new |= mutexLocked }
if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift }
if starving && old&mutexLocked != 0 { new |= mutexStarving }
if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break }
queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state
if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") }
delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving }
atomic.AddInt32(&m.state, delta) break }
awoke = true iter = 0 } else { old = m.state } }
if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
|
解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) }
new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } }
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 1) } }
|
总结
本篇文章主要从加锁和解锁两个部分的源码进行分析。
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,会通过置位
mutexLocked
加锁;
- 如果互斥锁处于
mutexLocked
状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE
指令消耗 CPU 时间等待锁的释放;
- 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过
runtime.sync_runtime_SemacquireMutex
将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
- 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,调用
sync.Mutex.Unlock
会直接抛出异常;
- 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked
标志位;
- 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过
sync.runtime_Semrelease
唤醒对应的 Goroutine;
参考链接
1.3.Go 语言设计与实现