Golang 锁 sync包 源码分析 (二、RWMutex、WaitGroup )
注意当前go版本代码为1.23
sync包 源码分析包主要介绍 Go 语言中常见的同步原语 sync.Mutex
、sync.RWMutex
、sync.WaitGroup
、sync.Once
和 sync.Cond
以及扩展原语 golang/sync/errgroup.Group
、golang/sync/semaphore.Weighted
和 golang/sync/singleflight.Group
的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。
定义
RWMutex
读写互斥锁 sync.RWMutex
是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
1 2 3 4 5 6 7 8
| type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount atomic.Int32 readerWait atomic.Int32 }
|
RWMutex写锁
加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } rw.w.Lock()
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 { runtime_SemacquireRWMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } }
|
- 调用结构体持有的
sync.Mutex
结构体的sync.Mutex.Lock
阻塞后续的写操作;
- 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠;
- 调用
sync/atomic.AddInt32
函数阻塞后续的读操作:
- 如果仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用
runtime.sync_runtime_SemacquireMutex
进入休眠状态等待所有读锁所有者执行结束后释放 writerSem
信号量将当前协程唤醒;
解锁
sync.RWMutex.Unlock
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() }
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders { race.Enable() fatal("sync: Unlock of unlocked RWMutex") } for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) }
rw.w.Unlock()
if race.Enabled { race.Enable() }
}
|
与加锁的过程正好相反,写锁的释放主要为以下几个执行:
- 调用
sync/atomic.AddInt32
函数将 readerCount
变回正数,释放读锁;
- 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
- 调用
sync.Mutex.Unlock
释放写锁;
RWMutex读锁
加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state race.Disable() } if rw.readerCount.Add(1) < 0 { runtime_SemacquireRWMutexR(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }
|
解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := rw.readerCount.Add(-1); r < 0 { rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } }
func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() fatal("sync: RUnlock of unlocked RWMutex") }
if rw.readerWait.Add(-1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) } }
|
WaitGroup
sync.WaitGroup
用于等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
结构体
1 2 3 4 5 6 7 8 9 10 11 12
|
type WaitGroup struct { noCopy noCopy
state atomic.Uint64
sema uint32 }
|
Add方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
func (wg *WaitGroup) Add(delta int) { if race.Enabled { if delta < 0 { race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() }
state := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) { race.Read(unsafe.Pointer(&wg.sema)) }
if v < 0 { panic("sync: negative WaitGroup counter") }
if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }
if v > 0 || w == 0 { return }
if wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }
wg.state.Store(0)
for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) } }
|
Done方法
1 2 3 4
| func (wg *WaitGroup) Done() { wg.Add(-1) }
|
Wait 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| func (wg *WaitGroup) Wait() { if race.Enabled { race.Disable() }
for { state := wg.state.Load() v := int32(state >> 32) w := uint32(state)
if v == 0 { if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return }
if wg.state.CompareAndSwap(state, state+1) { if race.Enabled && w == 0 { race.Write(unsafe.Pointer(&wg.sema)) }
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") }
if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } }
|
小结
state
字段:state
是一个 64 位的原子整数,高 32 位表示计数器(counter
),低 32 位表示等待的 goroutine 数量(waiter count
)。
Add
方法:用于增加或减少计数器。如果计数器变为零,所有等待的 goroutine 将被唤醒。如果计数器变为负数,会引发 panic。
Done
方法:是 Add(-1)
的简写,用于减少计数器。
Wait
方法:阻塞当前 goroutine,直到计数器变为零。如果计数器已经为零,则立即返回。
- 竞态检测:代码中包含了竞态检测的逻辑,用于在竞态检测工具(如
-race
)启用时检测并发问题。
sync.WaitGroup
必须在 sync.WaitGroup.Wait
方法返回之后才能被重新使用;
- 可以同时有多个 Goroutine 等待当前
sync.WaitGroup
计数器的归零,这些 Goroutine 会被同时唤醒;
参考链接
1.3.Go 语言设计与实现