go RWMutex的實現(xiàn)示例
Overview
go 里面的 rwlock 是 write preferred 的,可以避免寫鎖饑餓。
讀鎖和寫鎖按照先來后到的規(guī)則持有鎖,一旦有協(xié)程持有了寫鎖,后面的協(xié)程只能在寫鎖被釋放后才能得到讀鎖。
同樣,一旦有 >= 1 個協(xié)程寫到了讀鎖,只有等這些讀鎖全部釋放后,后面的協(xié)程才能拿到寫鎖。
下面了解一下 Go 的 RWMutex 是如何實現(xiàn)的吧,下面的代碼取自 go1.17.2/src/sync/rwmutex.go,并刪減了 race 相關(guān)的代碼。
PS: rwmutex 的代碼挺短的,其實讀源碼也沒那么可怕...
RWMutex 的結(jié)構(gòu)
RWMutex 總體上是通過: 普通鎖和條件變量來實現(xiàn)的
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
Lock
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
Unlock
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
RLock
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
RUnlock
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
Q1: 多個協(xié)程并發(fā)拿讀鎖,如何保證這些讀鎖協(xié)程都不會被阻塞?
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
拿讀鎖時,僅僅會增加 readerCount,因此讀鎖之間是可以正常并發(fā)的
Q2: 多個協(xié)程并發(fā)拿寫鎖,如何保證只會有一個協(xié)程拿到寫鎖?
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
拿寫鎖時,會獲取 w.Lock,自然能保證同一時間只會有一把寫鎖
Q3: 在讀鎖被拿到的情況下,新協(xié)程拿寫鎖,如果保證寫鎖現(xiàn)成會被阻塞?
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
假設(shè)此時有 5 個協(xié)程拿到讀鎖,則 readerCount = 5,假設(shè) rwmutexMaxReaders = 100。
此時有一個新的協(xié)程 w1 想要拿寫鎖。
在執(zhí)行
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
后, rw.readerCount = -95,r = 5。
在執(zhí)行
atomic.AddInt32(&rw.readerWait, r)
后,rw.readerWait = 5。
readerWait 記錄了在獲取寫鎖的這一瞬間有多少個協(xié)程持有讀鎖。這一瞬間之后,就算有新的協(xié)程嘗試獲取讀鎖,也只會增加 readerCount ,而不會動到 readerWait。
之后執(zhí)行 runtime_SemacquireMutex() 睡在了 writerSem 這個信號量上面。
Q4: 在讀鎖被拿到的情況下,新協(xié)程拿寫鎖被阻塞,當舊有的讀鎖協(xié)程全部釋放,如何喚醒等待的寫鎖協(xié)程
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
繼續(xù)上一步的場景,每當執(zhí)行 RUnlock 時,readerCount 都會減去1。當 readerCount 為負數(shù)時,意味著有協(xié)程正在持有或者正在等待持有寫鎖。
之前的五個讀協(xié)程中的四個,每次 RUnlock() 之后,readerCount = -95 - 4 = -99,readerWait = 5 - 4 = 1。
當最后一個讀協(xié)程調(diào)用 RUnlock() 之后,readerCount 變成了 -100,readerWait 變成 0,此時會喚醒在 writerSem 上沉睡的協(xié)程 w1。
Q5: 在寫鎖被拿到的情況下,新協(xié)程拿讀鎖,如何讓新協(xié)程被阻塞?
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
繼續(xù)上面的場景,readerCount = -100 + 1 = -99 < 0。
新的讀協(xié)程 r1 被沉睡在 readerSem 下面。
假設(shè)此時再來一個讀協(xié)程 r2,則 readerCount = -98,依舊沉睡。
Q6: 在寫鎖被拿到的情況下,新協(xié)程拿讀鎖,寫鎖協(xié)程釋放,如何喚醒等待的讀鎖協(xié)程?
繼續(xù)上面的場景,此時協(xié)程 w1 釋放寫鎖
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
在執(zhí)行
atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
后,r = readerCount = -98 + 100 = 2,代表此時有兩個讀協(xié)程 r1 和 r2 在等待
ps: 如果此時有一些新的協(xié)程想要拿讀鎖,他會因為 readerCount = 2 + 1 = 3 > 0 而順利執(zhí)行下去,不會被阻塞
之后 for 循環(huán)執(zhí)行兩次,將協(xié)程 r1 和 協(xié)程 r2 都喚醒了。
Q7: 在寫鎖被拿到的情況下,有兩個協(xié)程分別去搶讀鎖和寫鎖,當寫鎖被釋放時,這兩個協(xié)程誰會勝利?
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
由于是先喚醒讀鎖,再調(diào)用 w.Unlock() ,因此肯定是讀協(xié)程先勝利!
認為寫的比較巧妙的兩個點
readerCount 與 rwmutexMaxReaders 的糾纏
通過
readerCount + rwmutexMaxReaders以及readerCount - rwmutexMaxReaders這兩個操作可以得知當前是否有協(xié)程等待/持有寫鎖以及當前等待/持有讀鎖的協(xié)程數(shù)量readerCount 與 readerWait 的糾纏
在 Lock() 時直接將 readerCount 的值賦給 readerWait,在 readerWait = 0 而非 readerCount = 0 是喚醒寫協(xié)程,可以避免在 Lock() 后來達到的讀協(xié)程先于寫協(xié)程被執(zhí)行。
到此這篇關(guān)于go RWMutex的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)go RWMutex內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang使用原生http實現(xiàn)中間件的代碼詳解
中間件(middleware):常被用來做認證校驗、審計等,家常用的Iris、Gin等web框架,都包含了中間件邏輯,但有時我們引入該框架顯得較為繁重,本文將介紹通過golang原生http來實現(xiàn)中間件操作,需要的朋友可以參考下2024-05-05
基于Go實現(xiàn)TCP長連接上的請求數(shù)控制
在服務(wù)端開啟長連接的情況下,四層負載均衡轉(zhuǎn)發(fā)請求時,會出現(xiàn)服務(wù)端收到的請求qps不均勻的情況或是服務(wù)器無法接受到請求,因此需要服務(wù)端定期主動斷開一些長連接,所以本文給大家介紹了基于Go實現(xiàn)TCP長連接上的請求數(shù)控制,需要的朋友可以參考下2024-05-05
golang?channel多協(xié)程通信常用方法底層原理全面解析
channel?是?goroutine?與?goroutine?之間通信的重要橋梁,借助?channel,我們能很輕易的寫出一個多協(xié)程通信程序,今天,我們就來看看這個?channel?的常用用法以及底層原理2023-09-09
GoLang并發(fā)編程中條件變量sync.Cond的使用
Go標準庫提供Cond原語的目的是,為等待/通知場景下的并發(fā)問題提供支持,本文主要介紹了Go并發(fā)編程sync.Cond的具體使用,具有一定的參考價值,感興趣的可以了解一下2023-01-01
Go語言實現(xiàn)廣播式并發(fā)聊天服務(wù)器
本文主要介紹了Go語言實現(xiàn)廣播式并發(fā)聊天服務(wù)器,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-08-08

