Golang 锁 sync包 源码分析 (三、Once、Cond、semaphore、singleflight)
注意当前go版本代码为1.23
sync包 源码分析包主要介绍 Go 语言中常见的同步原语 sync.Mutex
、sync.RWMutex
、sync.WaitGroup
、sync.Once
和 sync.Cond
以及扩展原语 golang/sync/errgroup.Group
、golang/sync/semaphore.Weighted
和 golang/sync/singleflight.Group
的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。
Once
示例
1 2 3 4 5 6 7 8 9 10 11
| func main() { o := &sync.Once{} for i := 0; i < 10; i++ { o.Do(func() { fmt.Println("only once") }) } }
$ go run main.go 输出:only once
|
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
type Once struct { done atomic.Uint32
m Mutex }
func (o *Once) Do(f func()) { if o.done.Load() == 0 { o.doSlow(f) } }
func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock()
if o.done.Load() == 0 { defer o.done.Store(1) f() } }
|
Go 语言标准库中 sync.Once
可以保证在 Go 程序运行期间的某段代码只会执行一次。
sync.Once.Do
是 sync.Once
结构体对外唯一暴露的方法,该方法会接收一个入参为空的函数:
sync.Once
会通过成员变量 done
设置为1确保函数不会执行第二次。
Cond
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| var status int64
func main() { c := sync.NewCond(&sync.Mutex{}) for i := 0; i < 5; i++ { go listen(c) } time.Sleep(1 * time.Second) go broadcast(c) }
func broadcast(c *sync.Cond) { c.L.Lock() atomic.StoreInt64(&status, 1) c.Broadcast() c.L.Unlock() }
func listen(c *sync.Cond) { c.L.Lock() for atomic.LoadInt64(&status) != 1 { c.Wait() } fmt.Println("listen") c.L.Unlock() }
$ go run main.go 输出: listen listen listen listen listen
|
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| type Cond struct { noCopy noCopy
L Locker
notify notifyList checker copyChecker }
func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() }
func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) }
func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) }
func (c *copyChecker) check() { if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } }
func notifyListWait(l *notifyList, t uint32) { s := acquireSudog() s.g = getg() s.ticket = t if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) releaseSudog(s) }
func notifyListNotifyAll(l *notifyList) { s := l.head l.head = nil l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } }
|
sync.Cond
不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {}
进行忙碌等待相比,sync.Cond
能够让出处理器的使用权,提高 CPU 的利用率。使用时我们也需要注意以下问题:
扩展包semaphore
信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。
- 每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时重新加回来;
- 当遇到计数器大于信号量大小时,会进入休眠等待其他线程释放信号;
Go 语言的扩展包中就提供了带权重的信号量 golang/sync/semaphore.Weighted
,我们可以按照不同的权重对资源的访问进行管理,这个结构体对外也只暴露了四个方法:
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| type waiter struct { n int64 ready chan<- struct{} }
func NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w }
type Weighted struct { size int64 cur int64 mu sync.Mutex waiters list.List }
func (s *Weighted) Acquire(ctx context.Context, n int64) error { done := ctx.Done()
s.mu.Lock() select { case <-done: s.mu.Unlock() return ctx.Err() default: } if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil }
if n > s.size { s.mu.Unlock() <-done return ctx.Err() }
ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock()
select { case <-done: s.mu.Lock() select { case <-ready: s.cur -= n s.notifyWaiters() default: isFront := s.waiters.Front() == elem s.waiters.Remove(elem) if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return ctx.Err()
case <-ready: select { case <-done: s.Release(n) return ctx.Err() default: } return nil } }
func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success }
func (s *Weighted) Release(n int64) { s.mu.Lock() s.cur -= n if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } s.notifyWaiters() s.mu.Unlock() }
func (s *Weighted) notifyWaiters() { for { next := s.waiters.Front() if next == nil { break }
w := next.Value.(waiter) if s.size-s.cur < w.n { break }
s.cur += w.n s.waiters.Remove(next) close(w.ready) } }
|
小结
带权重的信号量确实有着更多的应用场景,这也是 Go 语言对外提供的唯一一种信号量实现,在使用的过程中我们需要注意以下的几个问题:
扩展包singleflight
golang/sync/singleflight.Group
是 Go 语言扩展包中提供了另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求。一个比较常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时,通过singleflight我们能够限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。。
示例
相关示例代码:https://github.com/longpi1/gopkg/blob/main/libary/singleslight/singleflight.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| type service struct { requestGroup singleflight.Group }
func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) { v, err, _ := s.requestGroup.Do(request.Hash(), func() (interface{}, error) { rows, err := if err != nil { return nil, err } return rows, nil }) if err != nil { return nil, err } return Response{ rows: rows, }, nil }
|
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| type call struct { wg sync.WaitGroup
val interface{} err error
forgotten bool
dups int chans []chan<- Result }
type Group struct { mu sync.Mutex m map[string]*call }
type Result struct { Val interface{} Err error Shared bool }
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock()
g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock()
go g.doCall(c, key, fn)
return ch }
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { c.val, c.err = fn() c.wg.Done()
g.mu.Lock() if !c.forgotten { delete(g.m, key) } for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } g.mu.Unlock() }
func (g *Group) Forget(key string) { g.mu.Lock() if c, ok := g.m[key]; ok { c.forgotten = true } delete(g.m, key) g.mu.Unlock() }
|
小结
当我们需要减少对下游的相同请求时,可以使用 golang/sync/singleflight.Group
来增加吞吐量和服务质量,不过在使用的过程中我们也需要注意以下的几个问题:
golang/sync/singleflight.Group
提供了两个用于抑制相同请求的方法:
这两个方法在功能上没有太多的区别,分别提供了同步和异步的调用方式。
总结
这一节中介绍了 Go 语言标准库中提供的基本原语以及扩展包中的扩展原语,这些并发编程的原语能够帮助我们更好地利用 Go 语言的特性构建高吞吐量、低延时的服务、解决并发带来的问题。
在设计同步原语时,我们不仅要考虑 API 接口的易用、解决并发编程中可能遇到的线程竞争问题,还需要对尾延时进行、优化保证公平性,理解同步原语也是我们理解并发编程无法跨越的一个步骤。
参考链接
1.3.Go 语言设计与实现