Golang中sync.Mutex的源碼分析
Mutex結構
type Mutex struct {
state int32
sema uint32
}
- state 記錄鎖的狀態(tài),轉換為二進制前29位表示等待鎖的goroutine數(shù)量,后三位從左到右分別表示當前g 是否已獲得鎖、是否被喚醒、是否正饑餓
- sema 充當臨界資源,其地址作為這個鎖在全局的唯一標識,所有等待這個鎖的goroutine都會在阻塞前把自己的sudog放到這個鎖的等待隊列上,然后等待被喚醒,sema的值就是可以被喚醒的goroutine的數(shù)目,只有0和1。
常量
const ( mutexLocked = 1 << iota // mutex is locked //值1,轉二進制后三位為001,表示鎖已被搶 mutexWoken //值2,轉二進制后三位為010,告訴即將釋放鎖的g現(xiàn)在已有g被喚醒 mutexStarving //值4,轉二進制后三位為100,表示當前處在饑餓狀態(tài) mutexWaiterShift = iota //值3,表示mutex.state右移3位為等待鎖的goroutine數(shù)量 starvationThresholdNs = 1e6 //表示mutext切換到饑餓狀態(tài)所需等待時間的閾值,1ms。 )
Locker接口
type Locker interface {
Lock()
Unlock()
}
下面重點看這兩個方法。
加鎖Lock

Lock()
func (m *Mutex) Lock() {
// 第一種情況:快上鎖,即此刻無人來搶鎖
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled { //競爭檢測相關,不用看
race.Acquire(unsafe.Pointer(m))
}
return
}
// 第二種情況:慢上鎖,即此刻有競爭對手
m.lockSlow()
}
CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool){},go的CAS操作,底層通過調用cpu指令集提供的CAS指令實現(xiàn),位置在src/runtime/internal/atomic/atomic_amd64.s/·Cas(SB)。
- 參數(shù)addr:變量地址
- 參數(shù)old:舊值
- 參數(shù)new:新值
- 原理:如果addr和old相等,則將new賦值給addr,并且返回true,否則返回false
lockSlow()
// 注釋里的第一人稱“我”只當前g
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待開始的時間
starving := false // 我是否饑餓
awoke := false // 我是否被喚醒
iter := 0 // 我的自旋次數(shù)
old := m.state // 這個鎖此時此刻所有的信息
for {
// 如果:鎖已經被搶了 或著 正處在饑餓狀態(tài) 或者 允許我自旋 那么進行自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果:我沒有處在喚醒態(tài) 并且 當前無g處在喚醒態(tài) 并且
// 有等待鎖的g 并且CAS嘗試將我置為喚醒態(tài)成功 則進行自旋
// 之所以將我置為喚醒態(tài)是為了明示那些執(zhí)行完畢正在退出的g不用再去喚醒其它g了,因為只允許存在一個喚醒的g。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // 我自旋一次
iter++ // 我自旋次數(shù)加1
old = m.state
continue
}
new := old // new只是個中間態(tài),后面的cas操作將會判斷是否將這個中間態(tài)落實
// 如果不是處在饑餓模式就立即搶鎖
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果鎖被搶了 或者 處在饑餓模式,那就去排隊
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 等待鎖的goroutine數(shù)量加1
}
// 如果我現(xiàn)在饑渴難耐 而且 鎖也被搶走了,那就立即將鎖置為饑餓模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 釋放我的喚醒態(tài)
// 因為后面我要么搶到鎖要么被阻塞,都不是處在和喚醒態(tài)
new &^= mutexWoken
}
//此處CAS操作嘗試將new這個中間態(tài)落實
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 搶鎖成功!
}
// queueLifo我之前有沒有排過隊
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
//原語:如果我之前排過隊,這次就把我放到等待隊列隊首,否則把我放到隊尾,并將我掛起
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 剛被喚醒的我先判斷自己是不是饑餓了,如果我等待鎖的時間小于starvationThresholdNs(1ms),那就不餓
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 我一覺醒來發(fā)覺鎖正處在饑餓狀態(tài),蒼天有眼這個鎖屬于我了,因為饑餓狀態(tài)絕對沒有人跟我搶鎖
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// delta是一個中間狀態(tài),atomic.AddInt32方法將給鎖落實這個狀態(tài)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 如果現(xiàn)在我不饑餓或者等待鎖的就我一個,那么就將鎖切換到正常狀態(tài)。
// 饑餓模式效率很低,而且一旦有兩個g把mutex切換為饑餓模式,那就會死鎖。
delta -= mutexStarving
}
// 原語:給鎖落實delta的狀態(tài)。
atomic.AddInt32(&m.state, delta)
// 我拿到鎖啦
break
}
// 把我的狀態(tài)置為喚醒,我將繼續(xù)去搶鎖
awoke = true
// 把我的自旋次數(shù)置0,我又可以自旋搶鎖啦
iter = 0
} else {
// 繼續(xù)去搶鎖
old = m.state
}
}
// 競爭檢測的代碼,不管
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
runtime_canSpin(iter) 判斷當前g可否自旋,已經自旋過iter次
func sync_runtime_canSpin(i int) bool {
// 可自旋的條件:
// 1.多核cpu
// 2.GOMAXPROCS > 1 且 至少有一個其他的p在運行 且 該p的本地runq為空
// 3.iter小于最大自旋次數(shù)active_spin = 4
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
runtime_doSpin()通過調用procyield(n int32)方法來實現(xiàn)空耗CPU,n乃空耗CPU的次數(shù)。
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
procyield(active_spin_cnt) 的底層通過執(zhí)行PAUSE指令來空耗30個CPU時鐘周期。
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
runtime_SemacquireMutex(&m.sema, queueLifo, 1) 將當前g放到mutex的等待隊列中去
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) 若lifo為true,則把g放到等待隊列隊首,若lifo為false,則把g放到隊尾
atomic.AddInt32(int32_t *val, int32_t delta) 原語:給t加上t_delta
uint32_t
AddUint32 (uint32_t *val, uint32_t delta)
{
return __atomic_add_fetch (val, delta, __ATOMIC_SEQ_CST);
}
解鎖Unlock

Unlock
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 如果沒有g在等待鎖則立即釋放鎖
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 如果還有g在等待鎖,則在鎖釋放后需要做一點收尾工作。
m.unlockSlow(new)
}
}
unlockSlow
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果鎖處在正常模式下
if new&mutexStarving == 0 {
old := new
for {
// 如果鎖正處在正常模式下,同時 沒有等待鎖的g 或者 已經有g被喚醒了 或者 鎖已經被搶了,就什么也不用做直接返回
// 如果鎖正處在饑餓模式下,也是什么也不用做直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 給鎖的喚醒標志位置1,表示已經有g被喚醒了,Mutex.state后三位010
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 喚醒鎖的等待隊列頭部的一個g
// 并把g放到p的funq尾部
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 鎖處在饑餓模式下,直接喚醒鎖的等待隊列頭部的一個g
//因為在饑餓模式下沒人跟剛被喚醒的g搶鎖,所以不用設置鎖的喚醒標志位
runtime_Semrelease(&m.sema, true, 1)
}
}
runtime_Semrelease(&m.sema, false, 1) 用來釋放mutex等待隊列上的一個g
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
semrelease1(addr, handoff, skipframes) 參數(shù)handoff若為true,則讓被喚醒的g立刻繼承當前g的時間片繼續(xù)執(zhí)行。若handoff為false,則把剛被喚醒的g放到當前p的runq中。
到此這篇關于Golang中sync.Mutex的源碼分析 的文章就介紹到這了,更多相關Golang sync.Mutex內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
golang結合mysql設置最大連接數(shù)和最大空閑連接數(shù)
本文介紹golang?中連接MySQL時,如何設置最大連接數(shù)和最大空閑連接數(shù),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02

