Golang信號(hào)量設(shè)計(jì)實(shí)現(xiàn)示例詳解
開篇
在我們此前的文章 Golang Mutex 原理解析 中曾提到過,Mutex 的底層結(jié)構(gòu)包含了兩個(gè)字段,state 和 sema:
type Mutex struct {
state int32
sema uint32
}
- state 代表互斥鎖的狀態(tài),比如是否被鎖定;
- sema 表示信號(hào)量,協(xié)程阻塞會(huì)等待該信號(hào)量,解鎖的協(xié)程釋放信號(hào)量從而喚醒等待信號(hào)量的協(xié)程。
這個(gè) sema 就是 semaphore 信號(hào)量的意思。Golang 協(xié)程之間的搶鎖,實(shí)際上爭(zhēng)搶給Locked賦值的權(quán)利,能給 Locked 置為1,就說明搶鎖成功。搶不到就阻塞等待 sema 信號(hào)量,一旦持有鎖的協(xié)程解鎖,那么等待的協(xié)程會(huì)依次被喚醒。
有意思的是,雖然 semaphore 在鎖的實(shí)現(xiàn)中起到了至關(guān)重要的作用,Golang 對(duì)信號(hào)量的實(shí)現(xiàn)卻是隱藏在 runtime 中,并沒有包含到標(biāo)準(zhǔn)庫(kù)里來,在 src 源碼中我們可以看到底層依賴的信號(hào)量相關(guān)函數(shù)。
// defined in package runtime // Semacquire waits until *s > 0 and then atomically decrements it. // It is intended as a simple sleep primitive for use by the synchronization // library and should not be used directly. func runtime_Semacquire(s *uint32) // Semrelease atomically increments *s and notifies a waiting goroutine // if one is blocked in Semacquire. // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. // If handoff is true, pass count directly to the first waiter. // skipframes is the number of frames to omit during tracing, counting from // runtime_Semrelease's caller. func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
- runtime_Semacquire:阻塞等待直到 s 大于 0,然后立刻將 s 減去 1【原子操作】;
- runtime_Semrelease:將 s 增加 1,然后通知一個(gè)阻塞在 runtime_Semacquire 的 goroutine【原子操作】。
兩個(gè)原子操作,一個(gè) acquire,一個(gè) release,其實(shí)就代表了對(duì)資源的獲取和釋放。Mutex 作為 sync 包的核心,支撐了 RWMutex,channel,singleflight 等多個(gè)并發(fā)控制的能力,而對(duì)信號(hào)量的管理又是 Mutex 的基礎(chǔ)。
雖然源碼看不到,但 Golang 其實(shí)在擴(kuò)展庫(kù) golang.org/x/sync/semaphore 也提供了一套信號(hào)量的實(shí)現(xiàn),我們可以由此來參考一下,理解 semaphore 的實(shí)現(xiàn)思路。
信號(hào)量
在看源碼之前,我們先理清楚【信號(hào)量】設(shè)計(jì)背后的場(chǎng)景和原理。
信號(hào)量的概念是荷蘭計(jì)算機(jī)科學(xué)家 Edsger Dijkstra 在 1963 年左右提出來的,廣泛應(yīng)用在不同的操作系統(tǒng)中。在系統(tǒng)中,會(huì)給每一個(gè)進(jìn)程一個(gè)信號(hào)量,代表每個(gè)進(jìn)程目前的狀態(tài)。未得到控制權(quán)的進(jìn)程,會(huì)在特定的地方被迫停下來,等待可以繼續(xù)進(jìn)行的信號(hào)到來。
在 Mutex 依賴的信號(hào)量機(jī)制中我們可以看到,這里本質(zhì)就是依賴 sema 一個(gè) uint32 的變量 + 原子操作來實(shí)現(xiàn)并發(fā)控制能力。當(dāng) goroutine 完成對(duì)信號(hào)量等待時(shí),該變量 -1,當(dāng) goroutine 完成對(duì)信號(hào)量的釋放時(shí),該變量 +1。
如果一個(gè)新的 goroutine 發(fā)現(xiàn)信號(hào)量不大于 0,說明資源暫時(shí)沒有,就得阻塞等待。直到信號(hào)量 > 0,此時(shí)的語義是有新的資源,該goroutine就會(huì)結(jié)束等待,完成對(duì)信號(hào)量的 -1 并返回。注意我們上面有提到,runtime 支持的兩個(gè)方法都是原子性的,不用擔(dān)心兩個(gè)同時(shí)在等待的 goroutine 同時(shí)搶占同一份資源。
典型的信號(hào)量場(chǎng)景是【圖書館借書】。假設(shè)學(xué)校圖書館某熱門書籍現(xiàn)在只有 100 本存貨,但是上萬學(xué)生都想借閱,怎么辦?
直接買一萬本書是非常簡(jiǎn)單粗暴的解法,但資源有限,這不是長(zhǎng)久之計(jì)。
常見的解決方案很簡(jiǎn)單:學(xué)生們先登記,一個(gè)一個(gè)來。我們先給 100 個(gè)同學(xué)發(fā)出,剩下的你們繼續(xù)等,等到什么時(shí)候借書的同學(xué)看完了,把書還回來了,就給排隊(duì)等待的同學(xué)們發(fā)放。同時(shí),為了避免超發(fā),每發(fā)一個(gè),都需要在維護(hù)的記錄里將【余量】減去 1,每還回來一個(gè),就把【余量】加上 1。
runtime_Semacquire 就是排隊(duì)等待借書,runtime_Semrelease 就是看完了把書歸還給圖書館。
另外需要注意,雖然我們上面舉例的增加/減小的粒度都是 1,但這本質(zhì)上只是一種場(chǎng)景,事實(shí)上就算是圖書館借書,也完全有可能出現(xiàn)一個(gè)人同時(shí)借了兩本一模一樣的書。所以,信號(hào)量的設(shè)計(jì)需要支持 N 個(gè)資源的獲取和釋放。
所以,我們對(duì)于 acquire 和 release 兩種操作的語義如下:
- release: 將信號(hào)量增加 n【保證原子性】;
- acquire: 若信號(hào)量 < n,阻塞等待,直到信號(hào)量 >= n,此時(shí)將信號(hào)量的值減去 n【保證原子性】。
semaphore 擴(kuò)展庫(kù)實(shí)現(xiàn)
這里我們結(jié)合golang.org/x/sync/semaphore 源碼來看看怎樣設(shè)計(jì)出來我們上面提到的信號(hào)量結(jié)構(gòu)。
// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
size int64 // 最大資源數(shù)
cur int64 // 當(dāng)前已被使用的資源
mu sync.Mutex
waiters list.List // 等待隊(duì)列
}
有意思的是,雖然包名是 semaphore,但是擴(kuò)展庫(kù)里真正給【信號(hào)量結(jié)構(gòu)體】定義的名稱是 Weighted。從上面的定義我們可以看到,傳入初始資源個(gè)數(shù) n(對(duì)應(yīng) size),就可以生成一個(gè) Weighted 信號(hào)量結(jié)構(gòu)。

Weighted 提供了三個(gè)方法來實(shí)現(xiàn)對(duì)信號(hào)量機(jī)制的支持:
- Acquire
對(duì)應(yīng)上面我們提到的 acquire 語義,注意我們提到過,抽象的來講,acquire 成功與否其實(shí)不太看返回值,而是只要獲取不了就一直阻塞,如果返回了,就意味著獲取到了。
但在 Golang 實(shí)現(xiàn)當(dāng)中,我們肯定不希望,如果發(fā)生了異常 case,導(dǎo)致一直阻塞在這里。所以你可以看到 Acquire 的入?yún)⒗镉袀€(gè) context.Context,借用 context 的上下文控制能力,你可以對(duì)此進(jìn)行 cancel, 可以設(shè)置 timeout 等待超時(shí),就能對(duì) acquire 行為進(jìn)行更多約束。
所以,acquire 之后我們?nèi)匀恍枰獧z查返回值 error,如果為 nil,代表正常獲取了資源。否則可能是 context 已經(jīng)不合法了。
- Release
跟上面提到的 release 語義完全一致,傳入你要釋放的資源數(shù) n,保證原子性地增加信號(hào)量。
- TryAcquire
這里其實(shí)跟 sync 包中的各類 TryXXX 函數(shù)定位很像。并發(fā)的機(jī)制中大都包含 fast path 和 slow path,比如首個(gè) goroutine 先來 acquire,那么一定是能拿到的,后續(xù)再來請(qǐng)求的 goroutine 由于慢了一步,只能走 slow path 進(jìn)行等待,自旋等操作。sync 包中絕大部分精華,都在于 slow path 的處理。fast path 大多是一個(gè)基于 atomic 包的原子操作,比如 CAS 就可以解決。
TryAcquire 跟 Acquire 的區(qū)別在于,雖然也是要資源,但是不等待。有了我就獲取,就減信號(hào)量,返回 trye。但是如果目前還沒有,我不會(huì)阻塞在這里,而是直接返回 false。
下面我們逐個(gè)方法看看,Weighted 是怎樣實(shí)現(xiàn)的。
Acquire
// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// If we're at the front and there're extra tokens left, notify other waiters.
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
在閱讀之前回憶一下上面 Weighted 結(jié)構(gòu)的定義,注意 Weighted 并沒有維護(hù)一個(gè)變量用來表示【當(dāng)前剩余的資源】,這一點(diǎn)是通過 size(初始化的時(shí)候設(shè)置,表示總資源數(shù))減去 cur(當(dāng)前已被使用的資源),二者作差得到的。
我們來拆解一下上面這段代碼:
第一步:這是常規(guī)意義上的 fast path
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
- 先上鎖,保證并發(fā)安全;
- 校驗(yàn)如果 size - cur >= n,代表剩余的資源是足夠,同時(shí) waiters 這個(gè)等待隊(duì)列為空,代表沒有別的協(xié)程在等待;
- 此時(shí)就沒什么多想的,直接 cur 加上 n 即可,代表又消耗了 n 個(gè)資源,然后解鎖返回,很直接。
第二步:針對(duì)特定場(chǎng)景做提前剪枝
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
如果請(qǐng)求的資源數(shù)量,甚至都大于資源總數(shù)量了,說明這個(gè)協(xié)程心里沒數(shù)。。。。就算我現(xiàn)在把所有初始化的資源都拿回來,也喂不飽你呀?。。∧悄茉趺崔k,我就不煩勞后面流程處理了,直接等你的 context 什么時(shí)候 Done,給你返回 context 的錯(cuò)誤就行了,同時(shí)先解個(gè)鎖,別耽誤別的 goroutine 拿資源。
第三步:資源是夠的,只是現(xiàn)在沒有,那就把當(dāng)前goroutine加到排隊(duì)的隊(duì)伍里
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
這里 ready 結(jié)構(gòu)是個(gè)空結(jié)構(gòu)體的 channel,僅僅是為了實(shí)現(xiàn)協(xié)程間通信,通知什么時(shí)候資源 ready,建立一個(gè)屬于這個(gè) goroutine 的 waiter,然后塞到 Weighted 結(jié)構(gòu)的等待隊(duì)列 waiters 里。
搞定了以后直接解鎖,因?yàn)槟阋呀?jīng)來排隊(duì)了,手續(xù)處理完成,以后的路有別的通知機(jī)制保證,就沒必要在這兒拿著鎖阻塞新來的 goroutine 了,人家也得排隊(duì)。
第四步:排隊(duì)等待
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// If we're at the front and there're extra tokens left, notify other waiters.
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
一個(gè) select 語句,只看兩種情況:1. 這個(gè) goroutine 的 context 超時(shí)了;2. 拿到了資源,皆大歡喜。
重點(diǎn)在于 ctx.Done 分支里 default 的處理。我們可以看到,如果是超時(shí)了,此時(shí)還沒拿到資源,首先會(huì)把當(dāng)前 goroutine 從 waiters 等待隊(duì)列里移除(合情合理,你既然因?yàn)樽约旱脑蜃霾涣酥?,沒法繼續(xù)等待了,就別耽誤別人事了)。
然后接著判斷,若這個(gè) goroutine 同時(shí)也是排在最前的 goroutine,而且恰好現(xiàn)在有資源了,就趕緊通知隊(duì)里的 goroutine 們,伙計(jì)們,現(xiàn)在有資源了,趕緊來拿。我們來看看這個(gè) notifyWaiters 干了什么:
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// Not enough tokens for the next waiter. We could keep going (to try to
// find a waiter with a smaller request), but under load that could cause
// starvation for large requests; instead, we leave all remaining waiters
// blocked.
//
// Consider a semaphore used as a read-write lock, with N tokens, N
// readers, and one writer. Each reader can Acquire(1) to obtain a read
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
// of the readers. If we allow the readers to jump ahead in the queue,
// the writer will starve — there is always one token available for every
// reader.
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}
其實(shí)很簡(jiǎn)單,遍歷 waiters 這個(gè)等待隊(duì)列,拿到排隊(duì)最前的 waiter,判斷資源夠不夠,如果夠了,增加 cur 變量,資源給你,然后把你從等待隊(duì)列里移出去,再 close ready 那個(gè)goroutine 就行,算是通知一下。
重點(diǎn)部分在于,如果資源不夠怎么辦?
想象一下現(xiàn)在的處境,Weighted 這個(gè) semaphore 的確有資源,而目前要處理的這個(gè) goroutine 的的確確就是排隊(duì)最靠前的,而且人家也沒獅子大開口,要比你總 size 還大的資源。但是,但是,好巧不巧,現(xiàn)在你要的數(shù)量,比我手上有的少。。。。
很無奈,那怎么辦呢?
無非兩種解法:
- 我先不管你,反正你要的不夠,我先看看你后面那個(gè) goroutine 人家夠不夠,雖然你現(xiàn)在是排位第一個(gè),但是也得繼續(xù)等著;
- 沒辦法,你排第一,需求我就得滿足,所以我們都繼續(xù)等,等啥時(shí)候資源夠了就給你。
擴(kuò)展庫(kù)實(shí)際選用的是第 2 種策略,即一定要滿足排在最前面的 goroutine,這里的考慮在注釋里有提到,如果直接繼續(xù)看后面的 goroutine 夠不夠,優(yōu)先滿足后面的,在一些情況下會(huì)餓死有大資源要求的 goroutine,設(shè)計(jì)上不希望這樣的情況發(fā)生。
簡(jiǎn)單說:要的多不是錯(cuò),既然你排第一,目前貨不多,那就大家一起阻塞等待,保障你的權(quán)利。
Release
// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}
Release 這里的實(shí)現(xiàn)非常簡(jiǎn)單,一把鎖保障不出現(xiàn)并發(fā),然后將 cur 減去 n 即可,說明此時(shí)又有 n 個(gè)資源回到了貨倉(cāng)。然后和上面 Acquire 一樣,調(diào)用 notifyWaiters,叫排隊(duì)第一的哥們(哦不,是 goroutine)來領(lǐng)東西了。
TryAcquire
// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
其實(shí)就是 Acquire 方法的 fast path,只是返回了個(gè) bool,標(biāo)識(shí)是否獲取成功。
總結(jié)
今天我們了解了擴(kuò)展庫(kù) semaphore 對(duì)于信號(hào)量的封裝實(shí)現(xiàn),整體代碼加上注釋也才 100 多行,是非常好的學(xué)習(xí)材料,建議大家有空了對(duì)著源碼再過一遍。Acquire 和 Release 的實(shí)現(xiàn)都很符合直覺。
其實(shí),我們使用 buffered channel 其實(shí)也可以模擬出來 n 個(gè)信號(hào)量的效果,但就不具備 semaphore Weighted 這套實(shí)現(xiàn)里面,一次獲取多個(gè)資源的能力了。
以上就是Golang信號(hào)量設(shè)計(jì)實(shí)現(xiàn)示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Go信號(hào)量設(shè)計(jì)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang?Compare?And?Swap算法詳細(xì)介紹
CAS算法是一種有名的無鎖算法。無鎖編程,即不使用鎖的情況下實(shí)現(xiàn)多線程之間的變量同步,也就是在沒有線程被阻塞的情況下實(shí)現(xiàn)變量的同步,所以也叫非阻塞同步Non-blocking?Synchronization2022-10-10
go語言中切片Slice與數(shù)組Array對(duì)比以及panic:?runtime?error:?index?out?
go語言中數(shù)組與其他語言有在顯著的不同,包括其不能夠進(jìn)行添加,以及值拷貝的特性,下面這篇文章主要給大家介紹了關(guān)于go語言中切片Slice與數(shù)組Array對(duì)比以及panic:?runtime?error:?index?out?of?range問題解決的相關(guān)資料,需要的朋友可以參考下2022-07-07
Golang?WebSocket創(chuàng)建單獨(dú)會(huì)話詳細(xì)實(shí)例
這篇文章主要給大家介紹了關(guān)于Golang?WebSocket創(chuàng)建單獨(dú)會(huì)話的相關(guān)資料,WebSocket 協(xié)議主要為了解決基于 HTTP/1.x 的 Web 應(yīng)用無法實(shí)現(xiàn)服務(wù)端向客戶端主動(dòng)推送的問題,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-11-11
golang字符串拼接實(shí)現(xiàn)方式和區(qū)別對(duì)比
本文介紹了Go語言中字符串拼接的多種方法及其優(yōu)缺點(diǎn),推薦使用strings.Builder進(jìn)行頻繁拼接以優(yōu)化內(nèi)存分配和性能,同時(shí),還討論了通過sync.Pool優(yōu)化高頻創(chuàng)建的對(duì)象,以減少垃圾回收壓力,感興趣的朋友一起看看吧2025-02-02
詳解Go語言微服務(wù)開發(fā)框架之Go chassis
分布式系統(tǒng)中每個(gè)進(jìn)程的動(dòng)態(tài)配置管理及運(yùn)行時(shí)熱加載就成為了一個(gè)亟待解決的問題。go chassis汲取了netflix的archaius框架經(jīng)驗(yàn),并做出來自己的創(chuàng)新特性。2021-05-05
golang的匿名函數(shù)和普通函數(shù)的區(qū)別解析
匿名函數(shù)是不具名的函數(shù),可以在不定義函數(shù)名的情況下直接使用,通常用于函數(shù)內(nèi)部的局部作用域中,這篇文章主要介紹了golang的匿名函數(shù)和普通函數(shù)的區(qū)別,需要的朋友可以參考下2023-03-03
使用Golang實(shí)現(xiàn)加權(quán)負(fù)載均衡算法的實(shí)現(xiàn)代碼
這篇文章主要介紹了使用Golang實(shí)現(xiàn)加權(quán)負(fù)載均衡算法的實(shí)現(xiàn)代碼,詳細(xì)說明權(quán)重轉(zhuǎn)發(fā)算法的實(shí)現(xiàn),通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09
詳解golang中?work與?module?的區(qū)別與聯(lián)系
Go?模塊通常由一個(gè)項(xiàng)目或庫(kù)組成,并包含一組隨后一起發(fā)布的?Go?包,Go?模塊通過允許用戶將項(xiàng)目代碼放在他們選擇的目錄中并為每個(gè)模塊指定依賴項(xiàng)的版本,解決了原始系統(tǒng)的許多問題,本文將給大家介紹一下golang中?work與?module?的區(qū)別與聯(lián)系,需要的朋友可以參考下2023-09-09

