一文帶你了解Go語言中鎖的實現(xiàn)
前言
此文為學(xué)習(xí)go鎖和讀寫鎖的總結(jié)文檔, 主要從"參考"部分的文章結(jié)合源碼學(xué)習(xí), 總結(jié)于此.
ps: 注釋"r: "開頭代表來自參考文章, 見最后
Mutex
省流不看版:
沒鎖直接鎖,鎖不上自旋或讓出調(diào)度等待喚醒,直到鎖上.
饑餓模式阻塞隊列先進(jìn)先出
Lock
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 上鎖,成功返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
//已經(jīng)鎖上的寫成進(jìn)入慢鎖流程
m.lockSlow()
}lockSlow
func (m *Mutex) lockSlow() {
var waitStartTime int64 //執(zhí)行時間
starving := false //當(dāng)前請求是否是饑餓模式
awoke := false //當(dāng)前請求是否是喚醒狀態(tài)
iter := 0 //自旋次數(shù)
old := m.state //舊state值
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
//舊state值已上鎖,并且未進(jìn)入饑餓模式,且可以自旋,進(jìn)入自旋邏輯
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 當(dāng)前協(xié)程未喚醒
//&& old.state 為未喚起狀態(tài),就是說沒有其他被喚起的waiter
//&& waiter數(shù)>0
//&& m.state標(biāo)記為喚起狀態(tài)成功
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//標(biāo)記當(dāng)前協(xié)程為喚起狀態(tài)
//r: 這是為了通知在解鎖Unlock()中不要再喚醒其他的waiter了
awoke = true
}
//自旋
runtime_doSpin()
//自旋計數(shù)器
iter++
old = m.state
continue
}
//r: old是鎖當(dāng)前的狀態(tài),new是期望的狀態(tài),以期于在后面的CAS操作中更改鎖的狀態(tài)
//new代表期望的state值
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
//old不是饑餓狀態(tài),new帶上上鎖標(biāo)志位,也就是饑餓狀態(tài)不上鎖
if old&mutexStarving == 0 {
new |= mutexLocked
}
//舊state值是上鎖狀態(tài)或饑餓狀態(tài),新state waiter數(shù)+1
//r: 表示當(dāng)前goroutine將被作為waiter置于等待隊列隊尾
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
//當(dāng)前協(xié)程為饑餓狀態(tài)&&舊state已上鎖,新state加饑餓標(biāo)志位
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//r:? 當(dāng)awoke為true,則表明當(dāng)前goroutine在自旋邏輯中,成功修改鎖的Woken狀態(tài)位為1
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//新state關(guān)閉喚醒標(biāo)志位
//r: 因為在后續(xù)的邏輯中,當(dāng)前goroutine要么是拿到鎖了,要么是被掛起。
// 如果是掛起狀態(tài),那就需要等待其他釋放鎖的goroutine來喚醒。
// 假如其他goroutine在unlock的時候發(fā)現(xiàn)Woken的位置不是0,則就不會去喚醒,那該goroutine就無法再醒來加鎖。(見unlock邏輯)
?
new &^= mutexWoken
}
//r: 嘗試將鎖的狀態(tài)更新為期望狀態(tài)
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//舊state不是鎖或饑餓狀態(tài),上鎖成功,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
//r: 如果走到這里,那就證明當(dāng)前goroutine沒有獲取到鎖
// 這里判斷waitStartTime != 0就證明當(dāng)前goroutine之前已經(jīng)等待過了,則需要將其放置在等待隊列隊頭
//進(jìn)入隊列是否排在最前
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
//阻塞
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//r: 被信號量喚醒之后檢查當(dāng)前goroutine是否應(yīng)該表示為饑餓
// (這里表示為饑餓之后,會在下一輪循環(huán)中嘗試將鎖的狀態(tài)更改為饑餓模式)
// 1. 如果當(dāng)前goroutine已經(jīng)饑餓(在上一次循環(huán)中更改了starving為true)
// 2. 如果當(dāng)前goroutine已經(jīng)等待了1ms以上
//被信號量喚醒后當(dāng)前協(xié)程是否進(jìn)入饑餓狀態(tài)
//1. 之前是饑餓狀態(tài)
//2. 運行時間超過1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 再次獲取鎖狀態(tài)
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
//饑餓模式協(xié)程是在Unlock()時handoff到當(dāng)前協(xié)程的
//r:? 如果當(dāng)前鎖既不是被獲取也不是被喚醒狀態(tài),或者等待隊列為空
// 這代表鎖狀態(tài)產(chǎn)生了不一致的問題
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
//m.state 上鎖,waiter數(shù)-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//當(dāng)前協(xié)程不是饑餓狀態(tài)或舊state的waiter數(shù)=1,則m.state饑餓標(biāo)志位置0
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
//拿到鎖,退出.
break
}
awoke = true
iter = 0
} else {
//執(zhí)行循環(huán)前的語句,恢復(fù)最新現(xiàn)場
old = m.state
}
}
?
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}Unlock
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
?
// Fast path: drop lock bit.
//m.state取消鎖狀態(tài),返回值new代表修改后的新值
//如果為0代表沒有其他鎖了,退出;否則進(jìn)入unlockSlow()
//鎖空閑有兩種情況:
//1. 所有位為0,代表沒有鎖了
//2. 標(biāo)志位為0, waiter數(shù)量>0,還有協(xié)程在等待解鎖
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}UnlockSlow
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
//解鎖,結(jié)束,退出
//1. 沒有waiter了
//2. 已上鎖
//3. 鎖處于喚醒狀態(tài),表示有協(xié)程被喚醒
//4. 饑餓模式, 所有權(quán)交給了被解鎖饑餓模式的waiter
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 如果能走到這,那就是上面的if判斷沒通過
// 說明當(dāng)前鎖是空閑狀態(tài),但是等待隊列中有waiter,且沒有g(shù)oroutine被喚醒
// 所以,這里我們想要把鎖的狀態(tài)設(shè)置為被喚醒,等待隊列waiter數(shù)-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//通過信號量喚醒某一個waiter,退出
runtime_Semrelease(&m.sema, false, 1)
return
}
//失敗的話,更新old信息,進(jìn)入下個循環(huán)
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
//饑餓模式,喚醒等待隊列隊頭waiter
runtime_Semrelease(&m.sema, true, 1)
}
}其他關(guān)鍵函數(shù)
runtime_canSpin
是否可自旋,不展開
runtime_doSpin
核心是匯編實現(xiàn),循環(huán)執(zhí)行三十次PAUSE指令
runtime_SemacquireMutex
信號量上鎖
sem來自單詞semaphore 信號量
runtime_Semrelease
信號量釋放
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
If handoff is true, pass count directly to the first waiter.
handoff 就是傳球的意思,handoff 為 false 時,僅僅喚醒等待隊列中第一個協(xié)程,但是不會立馬調(diào)度該協(xié)程;當(dāng) handoff 為 true 時,會立馬調(diào)度被喚醒的協(xié)程,此外,當(dāng) handoff = true 時,被喚醒的協(xié)程會繼承當(dāng)前協(xié)程的時間片。具體例子,假設(shè)每個 goroutine 的時間片為 2ms,gorounte A 已經(jīng)執(zhí)行了 1ms,假設(shè)它通過 runtime_Semrelease(handoff = true) 喚醒了 goroutine B,則 goroutine B 剩余的時間片為 2 - 1 = 1ms。
semrelease1(addr, handoff, skipframes) 參數(shù)handoff若為true,則讓被喚醒的g立刻繼承當(dāng)前g的時間片繼續(xù)執(zhí)行。若handoff為false,則把剛被喚醒的g放到當(dāng)前p的runq中。
RWMutex
很簡單,看源碼就行
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers 當(dāng)前讀鎖數(shù)量
readerWait int32 // number of departing readers 要離開的讀鎖數(shù)量,暨等待寫鎖解鎖,解鎖后可以釋放的讀鎖數(shù)量
}Lock()
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock() //通過sync.Lock()限制多寫鎖進(jìn)入下邊的邏輯
// Announce to readers there is a pending writer.
//r值不變, rwmutexMaxReaders值為1<<30
//可以理解為只要讀鎖的數(shù)量小于1<<30位,rw.readerCount值<0表示有寫鎖.
//也可以理解為加上一個負(fù)數(shù),將31位以上都標(biāo)記為1,代表有寫鎖, 剩余30位記錄讀鎖數(shù)量
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
//r!=0 有讀鎖,不能釋放寫鎖
//將readerCount轉(zhuǎn)移到readerWait,readerWait的新值!=0 (以上可以翻譯為有讀鎖,將讀鎖數(shù)轉(zhuǎn)移到讀等待數(shù),然后寫鎖阻塞,)
// 滿足上面兩個條件,寫鎖阻塞, 等待喚醒,不返回
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}UnLock()
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
?
// Announce to readers there is no active writer.\
//將Lock()方法減去的值加回來,變成正數(shù)
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
//喚醒在RLock()方法阻塞的讀操作,數(shù)量為r
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}RLock()
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
//<0表示已上寫鎖,阻塞
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}UnRLock()
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
//<0表示已上寫鎖,慢解鎖
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
?
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
//最后一個讀等待,喚醒寫鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}到此這篇關(guān)于一文帶你了解Go語言中鎖的實現(xiàn)的文章就介紹到這了,更多相關(guān)Go語言 鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang中String,rune和byte的相互轉(zhuǎn)換
Go語言中,string就是只讀的采用utf8編碼的字節(jié)切片,rune是int32的別名,代表字符的Unicode編碼,這篇文章主要介紹了Golang中String,rune和byte的相互轉(zhuǎn)換,感興趣的小伙伴可以了解一下2023-10-10
Go類型斷言提取測試接口值動態(tài)類型及靜態(tài)轉(zhuǎn)換確保類型接口編譯安全
這篇文章主要為大家介紹了Go類型斷言提取測試接口值動態(tài)類型及靜態(tài)轉(zhuǎn)換確保類型實現(xiàn)特定接口的編譯時安全性詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
Golang使用pprof和trace進(jìn)行診斷和修復(fù)性能問題
在?Go?中,開發(fā)人員可以使用強(qiáng)大的內(nèi)置工具來幫助診斷和修復(fù)性能問題,其中兩個工具是?pprof?和?trace?包,下面就跟隨小編一起來了解下如何使用pprof和trace進(jìn)行診斷和修復(fù)性能問題吧2024-01-01
Go語言讀取,設(shè)置Cookie及設(shè)置cookie過期方法詳解
這篇文章主要介紹了Go語言讀取,設(shè)置Cookie及設(shè)置cookie過期方法詳解,需要的朋友可以參考下2022-04-04
Go語言實現(xiàn)二維數(shù)組的2種遍歷方式以及案例詳解
這篇文章主要介紹了Go語言實現(xiàn)二維數(shù)組的2種遍歷方式以及案例詳解,圖文代碼聲情并茂,有感興趣的可以學(xué)習(xí)下2021-03-03
Golang實現(xiàn)異步上傳文件支持進(jìn)度條查詢的方法
這篇文章主要介紹了Golang實現(xiàn)異步上傳文件支持進(jìn)度條查詢的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10
源碼剖析Golang中singleflight的應(yīng)用
這篇文章主要為大家詳細(xì)介紹了如何利用singleflight來避免緩存擊穿,并剖析singleflight包的源碼實現(xiàn)和工作原理,感興趣的可以了解下2024-03-03

