一文帶你深入了解Golang中的Mutex
在我們的日常開發(fā)中,總會有時候需要對一些變量做并發(fā)讀寫,比如 web 應用在同時接到多個請求之后, 需要對一些資源做初始化,而這些資源可能是只需要初始化一次的,而不是每一個 http 請求都初始化, 在這種情況下,我們需要限制只能一個協(xié)程來做初始化的操作,比如初始化數(shù)據(jù)庫連接等, 這個時候,我們就需要有一種機制,可以限制只有一個協(xié)程來執(zhí)行這些初始化的代碼。 在 go 語言中,我們可以使用互斥鎖(Mutex)來實現(xiàn)這種功能。
互斥鎖的定義
這里引用一下維基百科的定義:
互斥鎖(Mutual exclusion,縮寫 Mutex)是一種用于多線程編程中,防止兩個線程同時對同一公共資源 (比如全局變量)進行讀寫的機制。該目的通過將代碼切片成一個一個的臨界區(qū)域(critical section)達成。 臨街區(qū)域指的是一塊對公共資源進行訪問的代碼,并非一種機制或是算法。
互斥,顧名思義,也就是只有一個線程能持有鎖。當然,在 go 中,是只有一個協(xié)程能持有鎖。
下面是一個簡單的例子:
var sum int // 和
var mu sync.Mutex // 互斥鎖
// add 將 sum 加 1
func add() {
// 獲取鎖,只能有一個協(xié)程獲取到鎖,
// 其他協(xié)程需要阻塞等待鎖釋放才能獲取到鎖。
mu.Lock()
// 臨界區(qū)域
sum++
mu.Unlock()
}
func TestMutex(t *testing.T) {
// 啟動 1000 個協(xié)程
var wg sync.WaitGroup
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
// 每個協(xié)程里面調用 add()
add()
wg.Done()
}()
}
// 等待所有協(xié)程執(zhí)行完畢
wg.Wait()
// 最終 sum 的值應該是 1000
assert.Equal(t, 1000, sum)
}上面的例子中,我們定義了一個全局變量 sum,用于存儲和,然后定義了一個互斥鎖 mu, 在 add() 函數(shù)中,我們使用 mu.Lock() 來加鎖,然后對 sum 進行加 1 操作, 最后使用 mu.Unlock() 來解鎖,這樣就保證了在任意時刻,只有一個協(xié)程能夠對 sum 進行加 1 操作, 從而保證了在并發(fā)執(zhí)行 add() 操作的時候 sum 的值是正確的。
上面這個例子,在我之前的文章中已經(jīng)作為例子出現(xiàn)過很多次了,這里不再贅述了。
go Mutex 的基本用法
Mutex 我們一般只會用到它的兩個方法:
Lock:獲取互斥鎖。(只會有一個協(xié)程可以獲取到鎖,通常用在臨界區(qū)開始的地方。)Unlock: 釋放互斥鎖。(釋放獲取到的鎖,通常用在臨界區(qū)結束的地方。)
Mutex 的模型可以用下圖表示:

說明:
- 同一時刻只能有一個協(xié)程獲取到
Mutex的使用權,其他協(xié)程需要排隊等待(也就是上圖的G1->G2->Gn)。 - 擁有鎖的協(xié)程從臨界區(qū)退出的時候需要使用
Unlock來釋放鎖,這個時候等待隊列的下一個協(xié)程可以獲取到鎖(實際實現(xiàn)比這里說的復雜很多,后面會細說),從而進入臨界區(qū)。 - 等待的協(xié)程會在
Lock調用處阻塞,Unlock的時候會使得一個等待的協(xié)程解除阻塞的狀態(tài),得以繼續(xù)執(zhí)行。
上面提到的這幾點也是 Mutex 的基本原理。
互斥鎖使用的兩個例子
了解了 go Mutex 基本原理之后,讓我們再來看看 Mutex 的一些使用的例子。
gin Context 中的 Set 方法
一個很常見的場景就是,并發(fā)對 map 進行讀寫,熟悉 go 的朋友應該知道,go 中的 map 是不支持并發(fā)讀寫的, 如果我們對 map 進行并發(fā)讀寫會導致 panic。
而在 gin 的 Context 結構體中,也有一個 map 類型的字段 Keys,用來在上下文間傳遞鍵值對數(shù)據(jù), 所以在通過 Set 來設置鍵值對的時候需要使用 c.mu.Lock() 來先獲取互斥鎖,然后再對 Keys 做設置。
// Set is used to store a new key/value pair exclusively for this context.
// It also lazy initializes c.Keys if it was not used previously.
func (c *Context) Set(key string, value any) {
// 獲取鎖
c.mu.Lock()
// 如果 Keys 還沒初始化,則進行初始化
if c.Keys == nil {
c.Keys = make(map[string]any)
}
// 設置鍵值對
c.Keys[key] = value
// 釋放鎖
c.mu.Unlock()
}同樣的,對 Keys 做讀操作的時候也需要使用互斥鎖:
// Get returns the value for the given key, ie: (value, true).
// If the value does not exist it returns (nil, false)
func (c *Context) Get(key string) (value any, exists bool) {
// 獲取鎖
c.mu.RLock()
// 讀取 key
value, exists = c.Keys[key]
// 釋放鎖
c.mu.RUnlock()
return
}可能會有人覺得奇怪,為什么從 map 中讀也還需要鎖。這是因為,如果讀的時候沒有鎖保護, 那么就有可能在 Set 設置的過程中,同時也在進行讀操作,這樣就會 panic 了。
這個例子想要說明的是,像 map 這種數(shù)據(jù)結構本身就不支持并發(fā)讀寫,我們這種情況下只有使用 Mutex 了。
sync.Pool 中的 pinSlow 方法
在 sync.Pool 的實現(xiàn)中,有一個全局變量記錄了進程內所有的 sync.Pool 對象,那就是 allPools 變量, 另外有一個鎖 allPoolsMu 用來保護對 allPools 的讀寫操作:
var ( // 保護 allPools 和 oldPools 的互斥鎖。 allPoolsMu Mutex // allPools is the set of pools that have non-empty primary // caches. Protected by either 1) allPoolsMu and pinning or 2) // STW. allPools []*Pool // oldPools is the set of pools that may have non-empty victim // caches. Protected by STW. oldPools []*Pool )
pinSlow 方法中會在 allPoolsMu 的保護下對 allPools 做讀寫操作:
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock() // 獲取鎖
defer allPoolsMu.Unlock() // 函數(shù)返回的時候釋放鎖
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p) // 全局變量修改
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}這個例子主要是為了說明使用 mu 的另外一種非常常見的場景:并發(fā)讀寫全局變量。
互斥鎖使用的注意事項
互斥鎖如果使用不當,可能會導致死鎖或者出現(xiàn) panic 的情況,下面是一些常見的錯誤:
- 忘記使用
Unlock釋放鎖。 Lock之后還沒Unlock之前又使用Lock獲取鎖。也就是重復上鎖,go 中的Mutex不可重入。- 死鎖:位于臨界區(qū)內不同的兩個協(xié)程都想獲取對方持有的不同的鎖。
- 還沒
Lock之前就Unlock。這會導致panic,因為這是沒有任何意義的。 - 復制
Mutex,比如將Mutex作為參數(shù)傳遞。
對于第 1 點,我們往往可以使用 defer 關鍵字來做釋放鎖的操作。第 2 點不太好發(fā)現(xiàn),只能在開發(fā)的時候多加注意。 第 3 點我們在使用鎖的時候可以考慮盡量避免在臨界區(qū)內再去使用別的鎖。 最后,Mutex 是不可以復制的,這個可以在編譯之前通過 go vet 來做檢查。
為什么 Mutex 不能被復制呢?因為 Mutex 中包含了鎖的狀態(tài),如果復制了,那么這個狀態(tài)也會被復制, 如果在復制前進行 Lock,復制后進行 Unlock,那就意味著 Lock 和 Unlock 操作的其實是兩個不同的狀態(tài), 這樣顯然是不行的,是釋放不了鎖的。
雖然不可以復制,但是我們可以通過傳遞指針類型的參數(shù)來傳遞 Mutex。
互斥鎖鎖定的是什么
在前一篇文章中,我們提到過,原子操作本質上是變量級的互斥鎖。而互斥鎖本身鎖定的又是什么呢? 其實互斥鎖本質上是一個信號量,它通過獲取釋放信號量,最終使得協(xié)程獲得某一個代碼塊的執(zhí)行權力。
也就是說,互斥鎖,鎖定的是一塊代碼塊。
我們以 go-zero 里面的 collection/fifo.go 為例子說明一下:
// Take takes the first element out of q if not empty.
func (q *Queue) Take() (any, bool) {
// 獲取互斥鎖(只能有一個協(xié)程獲取到鎖)
q.lock.Lock()
// 函數(shù)返回的時候釋放互斥鎖(獲取到鎖的協(xié)程釋放鎖之后,其他協(xié)程才能進行搶占鎖)
defer q.lock.Unlock()
// 下面的代碼只有搶占到(也就是互斥鎖鎖定的代碼塊)
if q.count == 0 {
return nil, false
}
element := q.elements[q.head]
q.head = (q.head + 1) % len(q.elements)
q.count--
return element, true
}除了鎖定代碼塊的這一個作用,有另外一個比較關鍵的地方也是我們不能忽視的, 那就是 互斥鎖并不保證臨界區(qū)內操作的變量不能被其他協(xié)程訪問。 互斥鎖只能保證一段代碼只能一個協(xié)程執(zhí)行,但是對于臨界區(qū)內涉及的共享資源, 你在臨界區(qū)外也依然是可以對其進行讀寫的。
我們以上面的代碼說明一下:在上面的 Take 函數(shù)中,我們對 q.head 和 q.count 都進行了操作, 雖然這些操作代碼位于臨界區(qū)內,但是臨界區(qū)并不保證持有鎖期間其他協(xié)程不會在臨界區(qū)外去修改 q.head 和 q.count。
下面就是一個非常典型的錯誤的例子:
import (
"fmt"
"sync"
"testing"
)
var mu sync.Mutex
var sum int
// 在鎖的保護下對 sum 做讀寫操作
func test() {
mu.Lock()
sum++
mu.Unlock()
}
func TestMutex(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1000)
for i := 0; i < 500; i++ {
go func() {
test()
wg.Done()
}()
// 位于臨界區(qū)外,也依然是可以對 sum 做讀寫操作的。
sum++
}
wg.Wait()
fmt.Println(sum)
}靠譜的做法是,對于有共享資源的讀寫的操作都使用 Mutex 保護起來。
當然,如果我們只有一個變量,那么可能使用原子操作就足夠了。
互斥鎖實現(xiàn)原理
互斥鎖的實現(xiàn)有以下幾個關鍵的地方:
- 信號量:這是操作系統(tǒng)中的同步對象。
- 等待隊列:獲取不到互斥鎖的協(xié)程,會放入到一個先入先出隊列的隊列尾部。這樣信號量釋放的時候,可以依次對它們喚醒。
- 原子操作:互斥鎖的實現(xiàn)中,使用了一個字段來記錄了幾種不同的狀態(tài),使用原子操作可以保證幾種狀態(tài)可以一次性變更完成。
我們先來看看 Mutex結構體定義:
type Mutex struct {
state int32 // 狀態(tài)字段
sema uint32 // 信號量
}其中 state 字段記錄了四種不同的信息:

這四種不同信息在源碼中定義了不同的常量:
const ( mutexLocked = 1 << iota // 表示有 goroutine 擁有鎖 mutexWoken // 喚醒(就是第 2 位) mutexStarving // 饑餓(第 3 位) mutexWaiterShift = iota // 表示第 4 位開始,表示等待者的數(shù)量 starvationThresholdNs = 1e6 // 1ms 進入饑餓模式的等待時間閾值 )
而 sema 的含義比較簡單,就是一個用作不同 goroutine 同步的信號量。
信號量
go 的 Mutex 是基于信號量來實現(xiàn)的,那信號量又是什么呢?
維基百科:信號量是一個同步對象,用于保持在 0 至指定最大值之間的一個計數(shù)值。當線程完成一次對該 semaphore 對象的等待(wait)時,該計數(shù)值減一;當線程完成一次對 semaphore 對象的釋放(release)時,計數(shù)值加一。
上面這個解釋有點難懂,通俗地說,就是一個數(shù)字,調用 wait 的時候,這個數(shù)字減去 1,調用 release 的時候,這個數(shù)字加上 1。 (還有一個隱含的邏輯是,如果這個數(shù)小于 0,那么調用 wait 的時候會阻塞,直到它大于 0。)
對應到 go 的 Mutex 中,有兩個操作信號量的函數(shù):
runtime_Semrelease: 自動遞增信號量并通知等待的 goroutine。runtime_SemacquireMutex: 是一直等到信號量大于 0,然后自動遞減。
我們注意到了,其實 runtime_SemacquireMutex 是有一個前提條件的,那就是等到信號量大于 0。 其實信號量的兩個操作 P/V 就是一個加 1 一個減 1,所以在實際使用的時候,也是需要一個獲取鎖的操作對應一個釋放鎖的操作, 否則,其他協(xié)程都無法獲取到鎖,因為信號量一直不滿足。
等待隊列
go 中如果已經(jīng)有 goroutine 持有互斥鎖,那么其他的協(xié)程會放入一個 FIFO 隊列中,如下圖:

說明:
G1表示持有互斥鎖的 goroutine,G2...Gn表示一個 goroutine 的等待隊列,這是一個先入先出的隊列。G1先持有鎖,得以進入臨界區(qū),其他想搶占鎖的 goroutine 阻塞在Lock調用處。G1在使用完鎖后,會使用Unlock來釋放鎖,本質上是釋放了信號量,然后會喚醒FIFO隊列頭部的goroutine。G2從FIFO隊列中移除,進入臨界區(qū)。G2使用完鎖之后也會使用Unlock來釋放鎖。
上面只是一個大概模型,在實際實現(xiàn)中,比這個復雜很多倍,下面會繼續(xù)深入講解。
原子操作
go 的 Mutex 實現(xiàn)中,state 字段是一個 32 位的整數(shù),不同的位記錄了四種不同信息,在這種情況下, 只需要通過原子操作就可以保證一次性實現(xiàn)對四種不同狀態(tài)信息的更改,而不需要更多額外的同步機制。
但是毋庸置疑,這種實現(xiàn)會大大降低代碼的可讀性,因為通過一個整數(shù)來記錄不同的信息, 就意味著,需要通過各種位運算來實現(xiàn)對這個整數(shù)不同位的修改,比如將上鎖的操作:
new |= mutexLocked
當然,這只是 Mutex 實現(xiàn)中最簡單的一種位運算了。下面以 state 記錄的四種不同信息為維度來具體講解一下:
1.mutexLocked:這是 state 的最低位,1 表示鎖被占用,0 表示鎖沒有被占用。
new := mutexLocked 新狀態(tài)為上鎖狀態(tài)
2.mutexWoken: 這是表示是否有協(xié)程被喚醒了的狀態(tài)
new = (old - 1<<mutexWaiterShift) | mutexWoken等待者數(shù)量減去 1 的同時,設置喚醒標識new &^= mutexWoken清除喚醒標識
3.mutexStarving:饑餓模式的標識
new |= mutexStarving 設置饑餓標識
4.等待者數(shù)量:state >> mutexWaiterShift 就是等待者的數(shù)量,也就是上面提到的 FIFO 隊列中 goroutine 的數(shù)量
new += 1 << mutexWaiterShift等待者數(shù)量加 1delta := int32(mutexLocked - 1<<mutexWaiterShift)上鎖的同時,將等待者數(shù)量減 1
這里并沒有涵蓋 Mutex 中所有的位運算,其他操作在下文講解源碼實現(xiàn)的時候會提到。
在上面做了這一系列的位運算之后,我們會得到一個新的 state 狀態(tài),假設名為 new,那么我們就可以通過 CAS 操作來將 Mutex 的 state 字段更新:
atomic.CompareAndSwapInt32(&m.state, old, new)
通過上面這個原子操作,我們就可以一次性地更新 Mutex 的 state 字段,也就是一次性更新了四種狀態(tài)信息。
這種通過一個整數(shù)記錄不同狀態(tài)的寫法在 sync 包其他的一些地方也有用到,比如 WaitGroup 中的 state 字段。
最后,對于這種操作,我們需要注意的是,因為我們在執(zhí)行 CAS 前后是沒有其他什么鎖或者其他的保護機制的, 這也就意味著上面的這個 CAS 操作是有可能會失敗的,那如果失敗了怎么辦呢?
如果失敗了,也就意味著肯定有另外一個 goroutine 率先執(zhí)行了 CAS 操作并且成功了,將 state 修改為了一個新的值。 這個時候,其實我們前面做的一系列位運算得到的結果實際上已經(jīng)不對了,在這種情況下,我們需要獲取最新的 state,然后再次計算得到一個新的 state。
所以我們會在源碼里面看到 CAS 操作是寫在 for 循環(huán)里面的。
Mutex 的公平性
在前面,我們提到 goroutien 獲取不到鎖的時候,會進入一個 FIFO 隊列的隊列尾,在實際實現(xiàn)中,其實沒有那么簡單, 為了獲得更好的性能,在實現(xiàn)的時候會盡量先讓運行狀態(tài)的 goroutine 獲得鎖,當然如果隊列中的 goroutine 等待太久(大于 1ms), 那么就會先讓隊列中的 goroutine 獲得鎖。
下面是文檔中的說明:
Mutex 可以處于兩種操作模式:正常模式和饑餓模式。在正常模式下,等待者按照FIFO(先進先出)的順序排隊,但是被喚醒的等待者不擁有互斥鎖,會與新到達的 Goroutine 競爭所有權。新到達的 Goroutine 有優(yōu)勢——它們已經(jīng)在 CPU 上運行,數(shù)量可能很多,因此被喚醒的等待者有很大的機會失去鎖。在這種情況下,它將排在等待隊列的前面。如果等待者未能在1毫秒內獲取到互斥鎖,則將互斥鎖切換到饑餓模式。 在饑餓模式下,互斥鎖的所有權直接從解鎖 Goroutine 移交給隊列前面的等待者。新到達的 Goroutine 即使看起來未被鎖定,也不會嘗試獲取互斥鎖,也不會嘗試自旋。相反,它們會將自己排隊在等待隊列的末尾。如果等待者獲得互斥鎖的所有權并發(fā)現(xiàn)(1)它是隊列中的最后一個等待者,或者(2)它等待時間少于1毫秒,則將互斥鎖切換回正常模式。 正常模式的性能要優(yōu)于饑餓模式,因為 Goroutine 可以連續(xù)多次獲取互斥鎖,即使有被阻塞的等待者。饑餓模式很重要,可以防止尾部延遲的病態(tài)情況。
簡單總結:
1.Mutex 有兩種模式:正常模式、饑餓模式。
2.正常模式下:
被喚醒的 goroutine 和正在運行的 goroutine 競爭鎖。這樣可以運行中的協(xié)程有機會先獲取到鎖,從而避免了協(xié)程切換的開銷。性能更好。
3.饑餓模式下:
優(yōu)先讓隊列中的 goroutine 獲得鎖,并且直接放棄時間片,讓給隊列中的 goroutine,運行中的 goroutine 想獲取鎖要到隊尾排隊。更加公平。
Mutex 源碼剖析
Mutex 本身的源碼其實很少,但是復雜程度是非常高的,所以第一次看的時候可能會非常懵逼,但是不妨礙我們去了解它的大概實現(xiàn)原理。
Mutex 中主要有兩個方法,Lock 和 Unlock,使用起來非常的簡單,但是它的實現(xiàn)可不簡單。下面我們就來深入了解一下它的實現(xiàn)。
Lock
Lock 方法的實現(xiàn)如下:
// Lock 獲取鎖。
// 如果鎖已在使用中,則調用 goroutine 將阻塞,直到互斥量可用。
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 上鎖成功則直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
// 沒有上鎖成功,這個時候需要做的事情就有點多了。
m.lockSlow()
}在 Lock 方法中,第一次獲取鎖的時候是非常簡單的,一個簡單的原子操作設置一下 mutexLocked 標識就完成了。 但是如果這個原子操作失敗了,表示有其他 goroutine 先獲取到了鎖,這個時候就需要調用 lockSlow 來做一些額外的操作了:
// 獲取 mutex 鎖
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 當前協(xié)程開始等待的時間
starving := false // 當前協(xié)程是否是饑餓模式
awoke := false // 喚醒標志(是否當前協(xié)程就是被喚醒的協(xié)程)
iter := 0 // 自旋次數(shù)(超過一定次數(shù)如果還沒能獲得鎖,就進入等待)
old := m.state // 舊的狀態(tài),每次 for 循環(huán)會重新獲取當前的狀態(tài)字段
for {
// 自旋:目的是讓正在運行中的 goroutine 盡快獲取到鎖。
// 兩種情況不會自旋:
// 1. 饑餓模式:在饑餓模式下,鎖會直接交給等待隊列中的 goroutine,所以不會自旋。
// 2. 鎖被釋放了:另外如果運行到這里的時候,發(fā)現(xiàn)鎖已經(jīng)被釋放了,也就不需要自旋了。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 設置 mutexWoken 標識
// 如果自旋是有意義的,則會進入到這里,嘗試設置 mutexWoken 標識。
// 設置成功在持有鎖的 goroutine 獲取鎖的時候不會喚醒等待隊列中的 goroutine,下一個獲取鎖的就是當前 goroutine。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 各個判斷的含義:
// !awoke 已經(jīng)被喚醒過一次了,說明當前協(xié)程是被從等待隊列中喚醒的協(xié)程/又或者已經(jīng)成功設置 mutexWoken 標識了,不需要再喚醒了。
// old&mutexWoken == 0 如果不等于 0 說明有 goroutine 被喚醒了,不會嘗試設置 mutexWoken 標識
// old>>mutexWaiterShift != 0 如果等待隊列為空,當前 goroutine 就是下一個搶占鎖的 goroutine
// 前面的判斷都通過了,才會進行 CAS 操作嘗試設置 mutexWoken 標識
awoke = true
}
runtime_doSpin() // 自旋
iter++ // 自旋次數(shù) +1(超過一定次數(shù)會停止自旋)
old = m.state // 再次獲取鎖的最新狀態(tài),之后會檢查是否鎖被釋放了
continue // 繼續(xù)下一次檢查
}
new := old
// 饑餓模式下,新到達的 goroutines 必須排隊。
// 不是饑餓狀態(tài),直接競爭鎖。
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 進入等待隊列的兩種情況:
// 1. 鎖依然被占用。
// 2. 進入了饑餓模式。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 等待者數(shù)量 +1
}
// 已經(jīng)等待超過了 1ms,且鎖被其他協(xié)程占用,則進入饑餓模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 喚醒之后,需要重置喚醒標志。
// 不管有沒有獲取到鎖,都是要清除這個標識的:
// 獲取到鎖肯定要清除,如果獲取到鎖,需要讓其他運行中的 goroutine 來搶占鎖;
// 如果沒有獲取到鎖,goroutine 會阻塞,這個時候是需要持有鎖的 goroutine 來喚醒的,如果有 mutexWoken 標識,持有鎖的 goroutine 喚醒不了。
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 重置喚醒標志
}
// 成功設置新狀態(tài)
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原來鎖的狀態(tài)已釋放,并且不是饑餓狀態(tài),正常請求到了鎖,返回
if old&(mutexLocked|mutexStarving) == 0 { // 這意味著當前的 goroutine 成功獲取了鎖
break
}
// 如果已經(jīng)被喚醒過,會被加入到等待隊列頭。
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
// queueLifo 為 true,表示加入到隊列頭。否則,加入到隊列尾。
// (首次加入隊列加入到隊尾,不是首次加入則加入隊頭,這樣等待最久的 goroutine 優(yōu)先能夠獲取到鎖。)
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 從等待隊列中喚醒,檢查鎖是否應該進入饑餓模式。
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 獲取當前的鎖最新狀態(tài)
old = m.state
// 如果鎖已經(jīng)處于饑餓狀態(tài),直接搶到鎖,返回。
// 饑餓模式下,被喚醒的協(xié)程可以直接獲取到鎖。
// 新來的 goroutine 都需要進入隊列等待。
if old&mutexStarving != 0 {
// 如果這個 goroutine 被喚醒并且 Mutex 處于饑餓模式,P 的所有權已經(jīng)移交給我們,
// 但 Mutex 處于不一致的狀態(tài):mutexLocked 未設置,我們仍然被視為等待者。修復這個問題。
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 加鎖,并且減少等待者數(shù)量。
// 實際上是兩步操作合成了一步:
// 1. m.state = m.state + 1 (獲取鎖)
// 2. m.state = m.state - 1<<mutexWaiterShift(waiter - 1)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 清除饑餓狀態(tài)的兩種情況:
// 1. 如果不需要進入饑餓模式(當前被喚醒的 goroutine 的等待時間小于 1ms)
// 2. 原來的等待者數(shù)量為 1,說明是最后一個被喚醒的 goroutine。
if !starving || old>>mutexWaiterShift == 1 {
// 退出饑餓模式
delta -= mutexStarving
}
// 原子操作,設置新狀態(tài)。
atomic.AddInt32(&m.state, delta)
break
}
// 設置喚醒標記,重新?lián)屨兼i(會與那些運行中的 goroutine 一起競爭鎖)
awoke = true
iter = 0
} else {
// CAS 更新狀態(tài)失敗,獲取最新狀態(tài),然后重試
old = m.state
}
}
}我們可以看到,lockSlow 的處理非常的復雜,又要考慮讓運行中的 goroutine 盡快獲取到鎖,又要考慮不能讓等待隊列中的 goroutine 等待太久。
代碼中注釋很多,再簡單總結一下其中的流程:
1.為了讓循環(huán)中的 goroutine 可以先獲取到鎖,會先讓 goroutine 自旋等待鎖的釋放,這是因為運行中的 goroutine 正在占用 CPU,讓它先獲取到鎖可以避免一些不必要的協(xié)程切換,從而獲得更好的性能。
3.自旋完畢之后,會嘗試獲取鎖,同時也要根據(jù)舊的鎖狀態(tài)來更新鎖的不同狀態(tài)信息,比如是否進入饑餓模式等。
3.計算得到一個新的 state 后,會進行 CAS 操作嘗試更新 state 狀態(tài)。
4.CAS 失敗會重試上面的流程。
5.CAS 成功之后會做如下操作:
- 判斷當前是否已經(jīng)獲取到鎖,如果是,則返回,
Lock成功了。 - 會判斷當前的 goroutine 是否是已經(jīng)被喚醒過,如果是,會將當前 goroutine 加入到等待隊列頭部。
- 調用
runtime_SemacquireMutex,進入阻塞狀態(tài),等待下一次喚醒。 - 喚醒之后,判斷是否需要進入饑餓模式。
- 最后,如果已經(jīng)是饑餓模式,當前 goroutine 直接獲取到鎖,退出循環(huán),否則,再進行下一次搶占鎖的循環(huán)中。
具體流程我們可以參考一下下面的流程圖:

圖中有一些矩形方框描述了 unlockSlow 的關鍵流程。
Unlock
Unlock 方法的實現(xiàn)如下:
// Unlock 釋放互斥鎖。
// 如果 m 在進入 Unlock 時未被鎖定,則會出現(xiàn)運行時錯誤。
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
// unlock 成功
// unLock 操作實際上是將 state 減去 1。
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 { // 等待隊列為空的時候直接返回了
// 喚醒一個等待鎖的 goroutine
m.unlockSlow(new)
}
}Unlock 做了兩件事:
- 釋放當前 goroutine 持有的互斥鎖:也就是將
state減去 1 - 喚醒等待隊列中的下一個 goroutine
如果只有一個 goroutine 在使用鎖,只需要簡單地釋放鎖就可以了。 但是如果有其他的 goroutine 在阻塞等待,那么持有互斥鎖的 goroutine 就有義務去喚醒下一個 goroutine。
喚醒的流程相對復雜一些:
// unlockSlow 喚醒下一個等待鎖的協(xié)程。
func (m *Mutex) unlockSlow(new int32) {
// 如果未加鎖,則會拋出錯誤。
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
// 下面的操作是喚醒一個在等待鎖的協(xié)程。
// 存在兩種情況:
// 1. 正常模式:
// a. 不需要喚醒:沒有等待者、鎖已經(jīng)被搶占、有其他運行中的協(xié)程在嘗試獲取鎖、已經(jīng)進入了饑餓模式
// b. 需要喚醒:其他情況
// 2. 饑餓模式:喚醒等待隊列頭部的那個協(xié)程
if new&mutexStarving == 0 {
// 不是饑餓模式
old := new
// 自旋
for {
// 下面幾種情況不需要喚醒:
// 1. 沒有等待者了(沒得喚醒)
// 2. 鎖已經(jīng)被占用(只能有一個 goroutine 持有鎖)
// 3. 有其他運行中的協(xié)程已經(jīng)被喚醒(運行中的 goroutine 通過自旋先搶占到了鎖)
// 4. 饑餓模式(饑餓模式下,所有新的 goroutine 都要排隊,饑餓模式會直接喚醒等待隊列頭部的 gorutine)
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 獲取到喚醒等待者的權力,開始喚醒一個等待者。
// 下面這一行實際上是兩個操作:
// 1. waiter 數(shù)量 - 1
// 2. 設置 mutexWoken 標志
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 正常模式下喚醒了一個 goroutine
//(第二個參數(shù)為 false,表示當前的 goroutine 在釋放信號量后還會繼續(xù)執(zhí)行直到用完時間片)
runtime_Semrelease(&m.sema, false, 1)
return
}
// 喚醒失敗,進行下一次嘗試。
old = m.state
}
} else {
// 饑餓模式:將互斥鎖的所有權移交給下一個等待者,并放棄我們的時間片,以便下一個等待者可以立即開始運行。
// 注意:如果“mutexLocked”未設置,等待者在喚醒后會將其設置。
// 但是,如果設置了“mutexStarving”,則仍然認為互斥鎖已被鎖定,因此新到來的goroutine不會獲取它。
//
// 當前的 goroutine 放棄 CPU 時間片,讓給阻塞在 sema 的 goroutine。
runtime_Semrelease(&m.sema, true, 1)
}
}unlockSlow 邏輯相比 lockSlow 要簡單許多,我們可以再結合下面的流程圖來閱讀上面的源碼:

runtime_Semrelease 第二個參數(shù)的含義
細心的朋友可能注意到了,在 unlockSlow 的實現(xiàn)中,有兩處地方調用了 runtime_Semrelease 這個方法, 這個方法的作用是釋放一個信號量,這樣可以讓阻塞在信號量上的 goroutine 得以繼續(xù)執(zhí)行。 它的第一個參數(shù)我們都知道,是信號量,而第二個參數(shù) true 和 false 分別傳遞了一次, 那么 true 和 false 分別有什么作用呢?
答案是,設置為 true 的時候,當前的 goroutine 會直接放棄自己的時間片, 將 P 的使用權交給 Mutex 等待隊列中的第一個 goroutine, 這樣的目的是,讓 Mutex 等待隊列中的 goroutine 可以盡快地獲取到鎖。
總結
互斥鎖在并發(fā)編程中也算是非常常見的一種操作了,使用互斥鎖可以限制只有一個 goroutine 可以進入臨界區(qū), 這對于并發(fā)修改全局變量、初始化等情況非常好用。最后,再總結一下本文所講述的內容:
1.互斥鎖是一種用于多線程編程中,防止兩個線程同時對同一公共資源進行讀寫的機制。go 中的互斥鎖實現(xiàn)是 sync.Mutex。
2.Mutex 的操作只有兩個:
Lock獲取鎖,同一時刻只能有一個 goroutine 可以獲取到鎖,其他 goroutine 會先通過自旋搶占鎖,搶不到則阻塞等待。Unlock釋放鎖,釋放鎖之前必須有 goroutine 持有鎖。釋放鎖之后,會喚醒等待隊列中的下一個 goroutine。
3.Mutex 常見的使用場景有兩個:
- 并發(fā)讀寫
map:如gin中Context的Keys屬性的讀寫。 - 并發(fā)讀寫全局變量:如
sync.Pool中對allPools的讀寫。
4.使用 Mutex 需要注意以下幾點:
- 不要忘記使用
Unlock釋放鎖 Lock之后,沒有釋放鎖之前,不能再次使用Lock- 注意不同 goroutine 競爭不同鎖的情況,需要考慮一下是否有可能會死鎖
- 在
Unlock之前,必須已經(jīng)調用了Lock,否則會panic - 在第一次使用
Mutex之后,不能復制,因為這樣一來Mutex的狀態(tài)也會被復制。這個可以使用go vet來檢查。
5.互斥鎖可以保護一塊代碼塊只能有一個 goroutine 執(zhí)行,但是不保證臨界區(qū)內操作的變量不被其他 goroutine 做并發(fā)讀寫操作。
6.go 的 Mutex 基于以下技術實現(xiàn):
- 信號量:這是操作系統(tǒng)層面的同步機制
- 隊列:在 goroutine 獲取不到鎖的時候,會將這些 goroutine 放入一個 FIFO 隊列中,下次喚醒會喚醒隊列頭的 goroutine
- 原子操作:
state字段記錄了四種不同的信息,通過原子操作就可以保證數(shù)據(jù)的完整性
7.go Mutex 的公平性:
- 正在運行的 goroutine 如果需要鎖的話,盡量讓它先獲取到鎖,可以避免不必要的協(xié)程上下文切換。會和被喚醒的 goroutine 一起競爭鎖。
- 但是如果等待隊列中的 goroutine 超過了 1ms 還沒有獲取到鎖,那么會進入饑餓模式
8.go Mutex 的兩種模式:
- 正常模式:運行中的 goroutine 有一定機會比等待隊列中的 goroutine 先獲取到鎖,這種模式有更好的性能。
- 饑餓模式:所有后來的 goroutine 都直接進入等待隊列,會依次從等待隊列頭喚醒 goroutine??梢杂行П苊馕惭舆t。
9.饑餓模式下,Unlock 的時候會直接將當前 goroutine 所在 P 的使用權交給等待隊列頭部的 goroutine,放棄原本屬于自己的時間片。
以上就是一文帶你深入了解Golang中的Mutex的詳細內容,更多關于Golang Mutex的資料請關注腳本之家其它相關文章!
相關文章
Go通過SJSON實現(xiàn)動態(tài)修改JSON
在Go語言 json 處理領域,在 json 數(shù)據(jù)處理中,讀取與修改是兩個核心需求,本文我們就來看看如何使用SJSON進行動態(tài)修改JSON吧,有需要的小伙伴可以了解下2025-03-03

