Go Scheduler Study: goroutine Scheduling Explained
0. Introduction
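The previous post covered how goroutines are created, executed and exited, and touched on the scheduling loop involved when switching between goroutines. In this post we look at the other situations that cause goroutines to be scheduled.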
1. When goroutine scheduling happens
A goroutine may be scheduled in the following situations:
| Situation | Description |
|---|---|
| go func(){} | Creating a new goroutine with the go keyword gives the scheduler a chance to schedule |
| GC | GC work also runs on OS threads (Ms), and some of its phases require all goroutines to stop, so scheduling happens |
| System call | A system call blocks the M, so the goroutine running on it is scheduled away and another goroutine can be scheduled in |
| Synchronized memory access | Operations such as mutexes and channels can block a goroutine; it is scheduled away and, once its condition is met, scheduled back in to continue running |
2. Scheduling when a goroutine is created
The previous post gave a first analysis of scheduling for goroutines created with the go keyword, especially of the scheduling loop, but it did not look concretely at how scheduling is triggered at the moment a goroutine is created.
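```go
func newproc(fn *funcval) {
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, gp, pc)
		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)
		if mainStarted {
			wakep()
		}
	})
}
```

Recall that when the go keyword creates a goroutine, the Go compiler turns it into a call to runtime.newproc. newproc builds the new g with newproc1 and puts it on the current P's local run queue with runqput(_p_, newg, true); the true places it in the P's runnext slot so that it is the next goroutine to run there. In the previous post we analyzed the creation of the main goroutine in detail: inside runtime.main the global variable mainStarted is set to true, so every ordinary goroutine created after that point also calls runtime.wakep to try to put an idle P to work.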
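To make the effect of the last argument of runqput concrete, here is a toy model of a P's local run queue with its runnext slot. The type localRunQueue and its methods are hypothetical names for illustration only; the real runtime uses a fixed-size lock-free ring buffer and spills to the global queue when it is full, both of which this sketch omits.

```go
package main

import "fmt"

// g stands in for a goroutine; only an ID is tracked.
type g struct{ id int }

// localRunQueue is a hypothetical, simplified model of a P's local run queue:
// a FIFO plus a single runnext slot.
type localRunQueue struct {
	runnext *g
	fifo    []*g
}

// put mirrors runqput(p, gp, next): with next=true, gp takes the runnext slot
// and any previous occupant is demoted to the tail of the FIFO.
func (q *localRunQueue) put(gp *g, next bool) {
	if next {
		old := q.runnext
		q.runnext = gp
		if old == nil {
			return
		}
		gp = old
	}
	q.fifo = append(q.fifo, gp)
}

// get mirrors runqget: the runnext slot is always checked first.
func (q *localRunQueue) get() *g {
	if gp := q.runnext; gp != nil {
		q.runnext = nil
		return gp
	}
	if len(q.fifo) == 0 {
		return nil
	}
	gp := q.fifo[0]
	q.fifo = q.fifo[1:]
	return gp
}

func main() {
	var q localRunQueue
	q.put(&g{1}, false) // ordinary enqueue at the tail
	q.put(&g{2}, true)  // like newproc: the new g goes to runnext
	q.put(&g{3}, true)  // a newer g takes runnext; g2 is demoted to the tail
	for gp := q.get(); gp != nil; gp = q.get() {
		fmt.Print(gp.id, " ")
	}
	fmt.Println() // prints: 3 1 2
}
```

Because newproc passes next=true, a freshly created goroutine runs before older goroutines already queued on the same P; a newer one simply takes the slot and pushes the previous occupant to the tail.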
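```go
func wakep() {
	if atomic.Load(&sched.npidle) == 0 {
		return
	}
	// be conservative about spinning threads
	if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
		return
	}
	startm(nil, true)
}
```

wakep first returns immediately if there is no idle P (sched.npidle == 0). It then checks whether some M is already spinning, that is, already looking for work; if so it also returns, because that M will pick up the new goroutine. Only when no M is spinning (and the CAS that moves nmspinning from 0 to 1 succeeds) does it call startm, which either wakes a sleeping worker thread or creates a new one.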
```go
func startm(_p_ *p, spinning bool) {
	// Disable preemption.
	//
	// Every owned P must have an owner that will eventually stop it in the
	// event of a GC stop request. startm takes transient ownership of a P
	// (either from argument or pidleget below) and transfers ownership to
	// a started M, which will be responsible for performing the stop.
	//
	// Preemption must be disabled during this transient ownership,
	// otherwise the P this is running on may enter GC stop while still
	// holding the transient P, leaving that P in limbo and deadlocking the
	// STW.
	//
	// Callers passing a non-nil P must already be in non-preemptible
	// context, otherwise such preemption could occur on function entry to
	// startm. Callers passing a nil P may be preemptible, so we must
	// disable preemption before acquiring a P from pidleget below.
	mp := acquirem() // disable preemption (increments m.locks)
	lock(&sched.lock)
	if _p_ == nil { // no P was given: take one from the idle P list
		_p_ = pidleget()
		if _p_ == nil { // no idle P: return immediately
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			releasem(mp)
			return
		}
	}
	nmp := mget() // we have a P; now take an idle M
	if nmp == nil { // no idle M: create one with newm and return
		// No M is available, we must drop sched.lock and call newm.
		// However, we already own a P to assign to the M.
		//
		// Once sched.lock is released, another G (e.g., in a syscall),
		// could find no idle P while checkdead finds a runnable G but
		// no running M's because this new M hasn't started yet, thus
		// throwing in an apparent deadlock.
		//
		// Avoid this situation by pre-allocating the ID for the new M,
		// thus marking it as 'running' before we drop sched.lock. This
		// new M will eventually run the scheduler to execute any
		// queued G's.
		id := mReserveID()
		unlock(&sched.lock)
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_, id)
		// Ownership transfer of _p_ committed by start in newm.
		// Preemption is now safe.
		releasem(mp)
		return
	}
	unlock(&sched.lock)
	if nmp.spinning {
		throw("startm: m is spinning")
	}
	if nmp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	nmp.spinning = spinning
	nmp.nextp.set(_p_)
	notewakeup(&nmp.park) // an idle M was found: wake it up
	// Ownership transfer of _p_ committed by wakeup. Preemption is now
	// safe.
	releasem(mp)
}
```

startm first checks whether there is an idle P; if not, it returns immediately (undoing the caller's nmspinning increment when spinning is true). If there is one, it looks for an idle M: if none exists it creates a new one with newm; otherwise it hands the P to that M through nextp and wakes it with notewakeup. Put simply, wakep exists to keep as many Ps, and therefore as many CPUs, busy as possible.
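The decision tree of startm can be summarized in a small model. Everything here (scheduler, idle lists of integer IDs, the returned strings) is hypothetical; unlike the real function, the sketch holds the lock for the whole call and never starts or wakes a thread, it only reports what would happen.

```go
package main

import (
	"fmt"
	"sync"
)

// scheduler is a hypothetical model of the state startm touches: idle-P and
// idle-M lists (integer IDs) guarded by a mutex that plays the role of
// sched.lock, plus the nmspinning counter.
type scheduler struct {
	mu         sync.Mutex
	idleP      []int
	idleM      []int
	nextMID    int
	nmspinning int
}

// startm returns a description of what the real startm(nil, spinning) would do.
func (s *scheduler) startm(spinning bool) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.idleP) == 0 { // pidleget() returned nil
		if spinning {
			s.nmspinning-- // undo the caller's increment and give up
		}
		return "no idle P: give up"
	}
	p := s.idleP[len(s.idleP)-1]
	s.idleP = s.idleP[:len(s.idleP)-1]
	if len(s.idleM) == 0 { // mget() returned nil
		id := s.nextMID // like mReserveID: reserve the ID while holding the lock
		s.nextMID++
		return fmt.Sprintf("newm: create M%d for P%d", id, p)
	}
	m := s.idleM[len(s.idleM)-1]
	s.idleM = s.idleM[:len(s.idleM)-1]
	// The real code sets nmp.nextp to the P and calls notewakeup(&nmp.park).
	return fmt.Sprintf("wake M%d with nextp=P%d", m, p)
}

func main() {
	s := &scheduler{idleP: []int{1}, idleM: []int{7}, nextMID: 10}
	fmt.Println(s.startm(false)) // wake M7 with nextp=P1
	s.nmspinning = 1             // a caller such as wakep already incremented it
	fmt.Println(s.startm(true))  // no idle P: give up
	fmt.Println(s.nmspinning)    // 0
	s.idleP = append(s.idleP, 2)
	fmt.Println(s.startm(false)) // newm: create M10 for P2
}
```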
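This is a good moment to revisit the rules, covered in the previous post, for how the scheduler picks a goroutine to run:
- every 61 schedules, take a goroutine from the global queue first;
- otherwise, prefer the local run queue of the current P;
- if nothing has been found yet, steal goroutines from other Ps' run queues.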
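The order of those checks can be written as a small function. pickSource is a hypothetical helper: tick plays the role of the per-P counter p.schedtick, queue contents are reduced to lengths, and the "steal" fallback stands for all of findrunnable, which also looks at netpoll and timers before stealing.

```go
package main

import "fmt"

// pickSource is a hypothetical sketch of where the scheduler looks next.
func pickSource(tick uint32, localLen, globalLen int) string {
	if tick%61 == 0 && globalLen > 0 {
		return "global" // fairness: periodically serve the global queue first
	}
	if localLen > 0 {
		return "local"
	}
	if globalLen > 0 {
		return "global"
	}
	return "steal" // stands for findrunnable
}

func main() {
	fmt.Println(pickSource(61, 5, 1)) // global
	fmt.Println(pickSource(62, 5, 1)) // local
	fmt.Println(pickSource(62, 0, 1)) // global
	fmt.Println(pickSource(62, 0, 0)) // steal
}
```

The periodic check keeps goroutines on the global queue from starving while two goroutines on the same P keep re-queueing each other locally.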
Stealing from other Ps happens inside findrunnable, which is called when the checks above come up empty. The function is long, so the version below drops the less important parts:
```go
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
top:
	_p_ := _g_.m.p.ptr()
	...
	// local runq
	// look in the local queue again
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}
	// global runq
	// then check the global queue
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}
	...
	// Spinning Ms: steal work from other Ps.
	//
	// Limit the number of spinning Ms to half the number of busy Ps.
	// This is necessary to prevent excessive CPU consumption when
	// GOMAXPROCS>>1 but the program parallelism is low.
	procs := uint32(gomaxprocs)
	if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
		if !_g_.m.spinning {
			_g_.m.spinning = true
			atomic.Xadd(&sched.nmspinning, 1)
		}
		gp, inheritTime, tnow, w, newWork := stealWork(now) // call stealWork to steal goroutines
		now = tnow
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime
		}
		if newWork {
			// There may be new timer or GC work; restart to
			// discover.
			goto top
		}
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			// Earlier timer to wait for.
			pollUntil = w
		}
	}
	...
	// return P and block
	// stealing failed: unbind m from p, put p on the idle list, then go to sleep
	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_)
	unlock(&sched.lock)
	...
	_g_.m.spinning = false // the m is about to sleep, so it is no longer spinning
	if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
		throw("findrunnable: negative nmspinning")
	}
	...
	stopm() // sleep
	goto top
}
```

As the code shows, a worker thread keeps trying to find a runnable goroutine and only goes to sleep when it truly cannot find one. An M that is stealing goroutines from other Ps' local queues is said to be spinning. This ties back to wakep: if some M is already spinning, wakep does nothing, since that M will find the new work; otherwise startm wakes a sleeping M from the idle list, and only creates a new M if no idle M exists.
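The cap on spinning Ms in findrunnable is easy to check with concrete numbers. The function below just restates the condition from the code above with the fields turned into parameters; the name mayStartSpinning is hypothetical.

```go
package main

import "fmt"

// mayStartSpinning restates findrunnable's admission rule for spinners:
//
//	_g_.m.spinning || 2*nmspinning < procs-npidle
//
// An M that is already spinning may keep stealing; otherwise a new spinner
// is admitted only while spinning Ms are fewer than half of the busy Ps.
func mayStartSpinning(alreadySpinning bool, nmspinning, procs, npidle uint32) bool {
	busy := procs - npidle // npidle never exceeds procs
	return alreadySpinning || 2*nmspinning < busy
}

func main() {
	// GOMAXPROCS=8 with 6 idle Ps leaves 2 busy Ps.
	fmt.Println(mayStartSpinning(false, 0, 8, 6)) // true:  2*0 < 2
	fmt.Println(mayStartSpinning(false, 1, 8, 6)) // false: 2*1 < 2 fails
	fmt.Println(mayStartSpinning(true, 1, 8, 6))  // true:  already spinning
}
```

With only two busy Ps there is little to steal, so a single spinner is enough; that is exactly the "GOMAXPROCS>>1 but low parallelism" case the runtime comment warns about.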
We will not analyze the stealing algorithm in stealWork here; interested readers can study it on their own. Next, let's look at how stopm puts a thread to sleep.
```go
func stopm() {
	_g_ := getg()
	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}
	lock(&sched.lock)
	mput(_g_.m) // put m on the sched.midle idle list
	unlock(&sched.lock)
	mPark()
	acquirep(_g_.m.nextp.ptr()) // bind the P that the waker handed over in m.nextp
	_g_.m.nextp = 0
}

func mPark() {
	gp := getg()
	notesleep(&gp.m.park) // go to sleep
	noteclear(&gp.m.park)
}
```

So stopm mainly puts the m on the scheduler's idle-M list and then sleeps via notesleep. A note is the Go runtime's one-shot sleep/wakeup mechanism: one thread sleeps with notesleep, and another thread wakes it with notewakeup. Before waking an idle M, startm has already stored a P in its nextp field (nmp.nextp.set(_p_)), which is why stopm calls acquirep on m.nextp right after mPark returns.
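To get a feel for the note semantics and the m.nextp handshake between startm and stopm, here is a user-space analogue built on a closed channel. The types note and m and the function handoff are hypothetical; the runtime implements notes on futexes or OS semaphores depending on the platform, not on channels.

```go
package main

import "fmt"

// note is a hypothetical analogue of the runtime's one-shot note: sleep
// blocks until wakeup is called, a wakeup that comes first makes sleep
// return immediately, and wakeup may be called at most once per clear.
// Closing a channel gives exactly these semantics.
type note struct{ ch chan struct{} }

func newNote() *note { return &note{ch: make(chan struct{})} }
func (n *note) sleep() { <-n.ch }
func (n *note) wakeup() { close(n.ch) } // a second call panics, like misuse of a real note
func (n *note) clear() { n.ch = make(chan struct{}) } // only safe while nobody sleeps

// m keeps only the two fields the stopm/startm handshake uses.
type m struct {
	park  *note
	nextp int
}

// handoff parks a worker on its note, hands it P number p, and returns the
// P the worker observed after waking.
func handoff(p int) int {
	mp := &m{park: newNote()}
	seen := make(chan int)
	go func() { // the sleeping worker: roughly stopm + mPark
		mp.park.sleep()  // notesleep(&m.park)
		mp.park.clear()  // noteclear(&m.park)
		seen <- mp.nextp // acquirep(m.nextp.ptr())
	}()
	// The waker: roughly what startm does with an idle M.
	mp.nextp = p     // nextp is set before the wakeup,
	mp.park.wakeup() // so the woken worker is guaranteed to see it
	return <-seen
}

func main() {
	fmt.Println("worker acquired P", handoff(3)) // worker acquired P 3
}
```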
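Summary
That was a lot to take in, so let's walk through a small example to tie the logic together (the creation and execution of the main goroutine were described in detail in the previous post and are not repeated here). The main goroutine creates a goroutine, which triggers wakep. wakep may then wake an idle worker thread (when the first non-main goroutine is created there is no idle worker thread yet), create a new worker thread, or do nothing.
If a new worker thread is created, it starts executing at mstart (not to be confused with startm). In schedule it tries to get a goroutine; if both the global and the local queues are empty it tries to steal from other Ps, and if stealing fails it goes to sleep.
If an idle worker thread is woken instead, it resumes where it went to sleep (stopm inside findrunnable) and jumps back to top to look for work again, including stealing.
Once it gets a goroutine it runs it, and sooner or later, for one reason or another, it re-enters the scheduling loop.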

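3. Voluntary suspension
In Go, many situations cause a goroutine to block: it suspends itself voluntarily and is scheduled away, then is scheduled back in to continue once the condition it waits for is met. Channel reads and writes are one example; we use a blocking channel receive to show how voluntary suspension is scheduled.
3.1 Parking a goroutine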
As with the map covered earlier, a channel receive comes in two forms:
```go
v := <-ch
v, ok := <-ch
```
They correspond to the chanrecv1 and chanrecv2 functions respectively:
```go
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}
```

Either way, both end up calling chanrecv:
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	c.recvq.enqueue(mysg) // put this goroutine on the channel's receive queue
	atomic.Store8(&gp.parkingOnChan, 1)
	// park this goroutine
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	...
}
```

chanrecv first checks whether the channel has data to read; if so, it reads it and returns. If not, it puts the current goroutine on the channel's receive queue (recvq) and calls gopark to suspend and block the current goroutine.
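The scenario traced in this section can be observed from ordinary Go code. In this sketch the log order is guaranteed by the Go memory model (a send on an unbuffered channel is synchronized with the matching receive), not by scheduling details; depending on timing, either the receiver parks first (the chanrecv to gopark path described above) or the sender parks on the send queue instead. blockingReceive is a hypothetical name.

```go
package main

import "fmt"

// blockingReceive records events around a receive on an unbuffered channel.
func blockingReceive() []string {
	var log []string
	ch := make(chan int) // unbuffered: a receive waits for a sender
	log = append(log, "receiver: start") // happens before the go statement
	go func() {
		log = append(log, "sender: about to send") // happens before the send...
		ch <- 42
	}()
	v := <-ch // ...which completes before this receive returns
	log = append(log, fmt.Sprintf("receiver: got %d", v))
	return log
}

func main() {
	for _, line := range blockingReceive() {
		fmt.Println(line)
	}
}
```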
```go
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	if reason != waitReasonSleep {
		checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
	}
	mp := acquirem()
	gp := mp.curg
	status := readgstatus(gp)
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
	mp.waitlock = lock
	mp.waitunlockf = unlockf
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	// can't do anything that might move the G between Ms here.
	mcall(park_m)
}
```

gopark then uses mcall (analyzed earlier: it saves the current goroutine's context, switches to the g0 stack and calls the function passed as its argument) to run park_m:
```go
// park continuation on g0.
func park_m(gp *g) {
	_g_ := getg()
	if trace.enabled {
		traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
	}
	casgstatus(gp, _Grunning, _Gwaiting)
	dropg()
	if fn := _g_.m.waitunlockf; fn != nil {
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		if !ok {
			if trace.enabled {
				traceGoUnpark(gp, 2)
			}
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}
	schedule()
}
```

park_m first sets the current goroutine's status to _Gwaiting (it is waiting for another goroutine to write to the channel), then calls dropg to break the link between the g and the m. Next it calls the unlock function saved by gopark (chanparkcommit for a channel, which releases the channel lock); if that function returns false, the goroutine is made runnable and resumed immediately. Otherwise park_m calls schedule to enter the scheduling loop.
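The status handling in park_m, including the case where the unlock function vetoes the park, can be modeled with a compare-and-swap on a status word. The constant values below match runtime2.go in current Go releases; g, casgstatus and parkM are simplified stand-ins, not runtime code. For channels this ordering matters: chanparkcommit releases the channel lock only after the g has been marked _Gwaiting, so a sender that finds it on recvq never tries to ready a goroutine that is still running.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Status values as defined in runtime2.go (only those used here).
const (
	_Grunnable uint32 = 1
	_Grunning  uint32 = 2
	_Gwaiting  uint32 = 4
)

type g struct{ status uint32 }

// casgstatus is a simplified stand-in for the runtime helper of the same
// name: it moves gp from oldval to newval and panics on any other state.
func casgstatus(gp *g, oldval, newval uint32) {
	if !atomic.CompareAndSwapUint32(&gp.status, oldval, newval) {
		panic(fmt.Sprintf("casgstatus: bad transition %d -> %d", oldval, newval))
	}
}

// parkM is a hypothetical sketch of park_m's control flow. It returns
// "parked" when gp stays in _Gwaiting (where the real code calls schedule)
// and "resumed" when unlockf vetoes the park (where it calls execute).
func parkM(gp *g, unlockf func(*g) bool) string {
	casgstatus(gp, _Grunning, _Gwaiting)
	// The real park_m calls dropg() here to detach gp from the M.
	if unlockf != nil && !unlockf(gp) {
		casgstatus(gp, _Gwaiting, _Grunnable)
		casgstatus(gp, _Grunnable, _Grunning) // as execute(gp, true) does
		return "resumed"
	}
	return "parked"
}

func main() {
	a := &g{status: _Grunning}
	r := parkM(a, func(*g) bool { return true }) // like chanparkcommit
	fmt.Println(r, a.status)                      // parked 4
	b := &g{status: _Grunning}
	r = parkM(b, func(*g) bool { return false })
	fmt.Println(r, b.status) // resumed 2
}
```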
At this point the goroutine has voluntarily suspended itself.
3.2 Waking a goroutine
Continuing the example above, here is what happens when another goroutine sends data on this channel:
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	...
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	...
	goready(gp, skip+1)
}
```

The send path mirrors the receive path: when chansend finds a waiter on the receive queue, it calls send, which hands the value directly to that receiver and then calls goready to wake it:
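The direct handoff to a waiting receiver can be sketched with a mutex, a slice used as recvq and a channel per waiter. toyChan, sudog and waitingReceiver here are hypothetical and heavily simplified (unbounded buffer, no sendq, no close handling); closing done stands in for goready.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// sudog is a hypothetical stand-in for a parked receiver: elem is the slot
// the value is written into, and closing done plays the role of goready.
type sudog struct {
	elem int
	done chan struct{}
}

// toyChan is a hypothetical model of hchan: a lock, a buffer and recvq.
type toyChan struct {
	mu    sync.Mutex
	buf   []int
	recvq []*sudog
}

// send mirrors the first branch of chansend: if a receiver is already
// waiting, the value bypasses the buffer and is handed to it directly.
func (c *toyChan) send(v int) (direct bool) {
	c.mu.Lock()
	if len(c.recvq) > 0 {
		sg := c.recvq[0]
		c.recvq = c.recvq[1:]
		c.mu.Unlock()
		sg.elem = v    // copy straight into the receiver's slot
		close(sg.done) // "goready": the receiver can run again
		return true
	}
	c.buf = append(c.buf, v)
	c.mu.Unlock()
	return false
}

// recv takes a buffered value if there is one; otherwise it enqueues itself
// on recvq and blocks, much like chanrecv before it calls gopark.
func (c *toyChan) recv() int {
	c.mu.Lock()
	if len(c.buf) > 0 {
		v := c.buf[0]
		c.buf = c.buf[1:]
		c.mu.Unlock()
		return v
	}
	sg := &sudog{done: make(chan struct{})}
	c.recvq = append(c.recvq, sg)
	c.mu.Unlock() // unlocked only after enqueueing, so no wakeup is lost
	<-sg.done
	return sg.elem
}

// waitingReceiver starts a receiver, waits until it is on recvq, then sends
// v; it returns the received value and whether the handoff was direct.
func waitingReceiver(v int) (int, bool) {
	c := &toyChan{}
	got := make(chan int)
	go func() { got <- c.recv() }()
	for {
		c.mu.Lock()
		parked := len(c.recvq) > 0
		c.mu.Unlock()
		if parked {
			break
		}
		runtime.Gosched()
	}
	direct := c.send(v)
	return <-got, direct
}

func main() {
	v, direct := waitingReceiver(7)
	fmt.Println(v, direct) // 7 true
	c := &toyChan{}
	fmt.Println(c.send(8), c.recv()) // false 8: no receiver, so it was buffered
}
```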
```go
func goready(gp *g, traceskip int) {
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}

func ready(gp *g, traceskip int, next bool) {
	if trace.enabled {
		traceGoUnpark(gp, traceskip)
	}
	status := readgstatus(gp)
	// Mark runnable.
	_g_ := getg()
	mp := acquirem() // disable preemption because it can be holding p in a local var
	if status&^_Gscan != _Gwaiting {
		dumpgstatus(gp)
		throw("bad g->status in ready")
	}
	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
	casgstatus(gp, _Gwaiting, _Grunnable)
	runqput(_g_.m.p.ptr(), gp, next)
	wakep()
	releasem(mp)
}
```

Here we see that ready, just like goroutine creation, triggers wakep to check whether an idle P should be woken to run work. Before that, because goready passes next=true, runqput places the woken goroutine in the runnext slot of the current P (the P of the sending goroutine), so it is the next goroutine to run there, which improves latency.
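The status check at the top of ready clears the GC scan bit with Go's AND NOT operator (&^) before comparing, so a goroutine whose stack is being scanned still counts as waiting. The constants are copied from runtime2.go as of current releases; readyCheck is a hypothetical helper that returns an error where the runtime would throw.

```go
package main

import "fmt"

// Status constants from runtime2.go (only those needed). _Gscan is a bit
// OR-ed into a status while the GC is scanning the goroutine's stack.
const (
	_Grunning = 2
	_Gwaiting = 4
	_Gscan    = 0x1000
)

// readyCheck mirrors the guard in ready: after clearing _Gscan, the g must
// be waiting; otherwise the runtime throws "bad g->status in ready".
func readyCheck(status uint32) error {
	if status&^_Gscan != _Gwaiting {
		return fmt.Errorf("bad g->status in ready: %#x", status)
	}
	return nil
}

func main() {
	fmt.Println(readyCheck(_Gwaiting))          // <nil>
	fmt.Println(readyCheck(_Gscan | _Gwaiting)) // <nil>: 0x1004 &^ 0x1000 == 0x4
	fmt.Println(readyCheck(_Grunning))          // bad g->status in ready: 0x2
}
```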
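With that, the suspended goroutine has been woken up.
4. Summary
In this post we analyzed the scheduling that happens when a goroutine is created and, using channel reads and writes as the example, the scheduling caused by voluntary suspension. Scheduling triggered by system calls and GC is more complex, and we will cover it in a later post.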