golang協(xié)程設(shè)計及調(diào)度原理
一、協(xié)程設(shè)計-GMP模型
線程是操作系統(tǒng)調(diào)度到CPU中執(zhí)行的基本單位,多線程總是交替式地搶占CPU的時間片,線程在上下文的切換過程中需要經(jīng)過操作系統(tǒng)用戶態(tài)與內(nèi)核態(tài)的切換。golang的協(xié)程(G)依然運行在工作線程(M)之上,但是借助語言的調(diào)度器,協(xié)程只需要在用戶態(tài)即可完成切換,工作線程是感受不到協(xié)程存在的。golang在設(shè)計上通過邏輯處理器(P)建立起了工作線程與協(xié)程之間的聯(lián)系。最簡單的GMP關(guān)系模型為(圖是靜態(tài)的,在程序運行的過程中,GMP三者之間的綁定關(guān)系都是不固定的):

1.工作線程M
工作線程是最終運行協(xié)程的實體。操作系統(tǒng)中的線程與在運行時代表線程的m結(jié)構(gòu)體進行了綁定:
// go/src/runtime/runtime2.go
type m struct {
g0 *g // goroutine with scheduling stack
tls [tlsSlots]uintptr // thread-local storage (for x86 extern register)
curg *g // current running goroutine
p puintptr // attached p for executing go code (nil if not executing go code)
nextp puintptr
oldp puintptr // the p that was attached before executing a syscall
park note
...
}
為了執(zhí)行g(shù)o代碼,每一個工作線程m都與一個邏輯處理器p進行綁定,同時記錄了線程當前正在運行的用戶協(xié)程curg。
每一個工作線程中都有一個特殊的協(xié)程g0,稱為調(diào)度協(xié)程,其主要作用是執(zhí)行協(xié)程調(diào)度。而普通的協(xié)程g無差別地用于執(zhí)行用戶代碼。當用戶協(xié)程g主動讓渡、退出或者是被搶占時,m內(nèi)部就需要重新執(zhí)行協(xié)程調(diào)度,這時需要從用戶協(xié)程g切換到調(diào)度協(xié)程g0,g0調(diào)度一個普通協(xié)程g來執(zhí)行用戶代碼,便從g0又切換回普通協(xié)程g。每個工作線程內(nèi)部都在完成g->g0->g這樣的調(diào)度循環(huán)。
操作系統(tǒng)的線程與m結(jié)構(gòu)體是通過線程本地存儲(thread-local storage)進行綁定的。普通的全局變量對進程中的所有線程可見,而線程本地存儲(tls)中的變量只對當前線程可見。系統(tǒng)線程通過m.tls即可在任意時刻獲取到當前線程上的正在運行的協(xié)程g、邏輯處理器p、特殊協(xié)程g0、線程結(jié)構(gòu)體m等信息。
2.邏輯處理器p
系統(tǒng)線程m想要運行用戶協(xié)程g,必須先綁定邏輯處理器p。在代碼中可以通過runtime.GOMAXPROCS()具體指定程序運行需要使用多少個邏輯處理器p。通常指定多少個邏輯處理器p最多就可以同時使用到多少個CPU核心數(shù)。
邏輯處理器p通過結(jié)構(gòu)體p進行定義:
type p struct {
id int32
status uint32 // one of pidle/prunning/...
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
m muintptr // back-link to associated m (nil if idle)
// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
...
}在p中,通過字段m維護了與工作線程m的綁定關(guān)系。每一個邏輯處理器p都具有唯一的id,以及當前的狀態(tài)status。如果p的狀態(tài)為正在運行中,則必然綁定到了一個工作線程m上,當邏輯處理完成后,解綁工作線程(m==nil),p的狀態(tài)便是空閑的。需要注意的是,m與p的數(shù)量沒有絕對關(guān)系,當m阻塞時,p就會切換到一個空閑的m,當不存在空閑的m時,便會創(chuàng)建一個m。所以即使p的數(shù)量是1,也有可能會創(chuàng)建很多個m出來。
程序中往往有成千上萬的協(xié)程存在,不可能同時被執(zhí)行。協(xié)程需要進行調(diào)度執(zhí)行,而那些等待被調(diào)度執(zhí)行的協(xié)程存儲在運行隊列中。go語言調(diào)度器將運行隊列分為全局運行隊列與局部運行隊列。邏輯處理器p中維護了局部運行隊列runq。局部運行隊列是每個p特有的長度為256的數(shù)組。該數(shù)組模擬了一個循環(huán)隊列,p.runqhead為隊頭,p.runqtail為隊尾,協(xié)程g都從隊尾入隊,從隊頭獲取。而全局運行隊列維護在schedt.runq中(見后文)。
p中還有一個特殊的runnext字段,用于標識下一個要執(zhí)行的協(xié)程g,如果p.runnext不為空,則會直接執(zhí)行runnext指向的協(xié)程,而不會再去p.runq數(shù)組中尋找。
3.協(xié)程g
協(xié)程通常分為特殊的調(diào)度協(xié)程g0以及執(zhí)行用戶代碼的普通協(xié)程g。
無論g0還是g,都通過結(jié)構(gòu)體g進行定義:
// go/src/runtime/runtime2.go
type g struct {
stack stack // offset known to runtime/cgo
m *m // current m; offset known to arm liblink
sched gobuf
...
}
// Stack describes a Go execution stack.
type stack struct {
lo uintptr
hi uintptr
}
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret uintptr
lr uintptr
bp uintptr // for framepointer-enabled architectures
}協(xié)程g中包含了協(xié)程的執(zhí)行??臻g(stack),執(zhí)行當前協(xié)程的工作線程m以及執(zhí)行現(xiàn)場sched。協(xié)程g執(zhí)行上下文切換時需要保存當前的執(zhí)行現(xiàn)場,以便在切回協(xié)程g時能夠繼續(xù)正常執(zhí)行。協(xié)程g中的執(zhí)行現(xiàn)場由結(jié)構(gòu)體gobuf定義,其保存了CPU中幾個重要的寄存器值,以及執(zhí)行現(xiàn)場信息屬于哪個協(xié)程g。
4.全局調(diào)度信息schedt
golang協(xié)程設(shè)計中,除了工作線程m、邏輯處理器p、協(xié)程g以外,還存在一個存儲全局調(diào)度信息的結(jié)構(gòu)體schedt:
// go/src/runtime/runtime2.go
type schedt struct {
lock mutex
midle muintptr // idle m's waiting for work
nmidle int32 // number of idle m's waiting for work
nmidlelocked int32 // number of locked m's waiting for work
mnext int64 // number of m's that have been created and next M ID
maxmcount int32 // maximum number of m's allowed (or die)
nmsys int32 // number of system m's not counted for deadlock
nmfreed int64 // cumulative number of freed m's
ngsys uint32 // number of system goroutines; updated atomically
pidle puintptr // idle p's
npidle uint32
nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
// Global runnable queue.
runq gQueue
runqsize int32
// Global cache of dead G's.
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
// freem is the list of m's waiting to be freed when their
// m.exited is set. Linked through m.freelink.
freem *m
...
}
schedt中維護了空閑的工作線程midle、空閑工作線程的數(shù)量nmidle、等待被釋放的線程列表freem、系統(tǒng)協(xié)程g的數(shù)量ngsys、空閑邏輯處理器pidle、空閑邏輯處理器的數(shù)量npidle、以及全局運行隊列runq及全局運行隊列的大小runqsize、處于新建或者被銷毀狀態(tài)的協(xié)程g列表gFree等信息。
schedt中的信息是全局共享的,例如全局運行隊列runq被所有p共享,所以schedt中也持有一個鎖lock以保證原子性訪問。
5.GMP詳細示圖
通過上述說明,我們可以進一步細化GMP模型示圖為:

二、協(xié)程調(diào)度
已經(jīng)知道,每個工作線程m中都有一個調(diào)度協(xié)程g0,專門執(zhí)行協(xié)程的調(diào)度循環(huán)(g->g0->g->g0-g)。在調(diào)度循環(huán)中,協(xié)程g具體是如何被調(diào)度的呢?go語言調(diào)度器實現(xiàn)了自己的調(diào)度策略。
1.調(diào)度策略
工作線程m需要通過協(xié)程調(diào)度獲得具體可運行的某一協(xié)程g。
獲取協(xié)程g的一般策略主要包含三大步:
- 1. 查找p本地的局部運行隊列
- 2. 查找schedt中的全局運行隊列
- 3. 竊取其他p中的局部運行隊列
在運行時通過findRunnable()函數(shù)獲取可運行的協(xié)程g:
// go/src/runtime/proc.go
// Finds a runnable goroutine to execute.
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_p_, 1)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
...
// local runq
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime, false
}
// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
...
// Spinning Ms: steal work from other Ps.
//
// Limit the number of spinning Ms to half the number of busy Ps.
// This is necessary to prevent excessive CPU consumption when
// GOMAXPROCS>>1 but the program parallelism is low.
procs := uint32(gomaxprocs)
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
gp, inheritTime, tnow, w, newWork := stealWork(now)
now = tnow
if gp != nil {
// Successfully stole.
return gp, inheritTime, false
}
...
}
}獲取本地運行隊列
在查找可運行的協(xié)程g時,首先通過函數(shù)runqget()從p本地的運行隊列中獲取:
首先嘗試從runnext中獲取下一個執(zhí)行的g。當runnext不為空時則返回對應的協(xié)程g,如果為空則繼續(xù)從局部運行隊列runq中查找。 當循環(huán)隊列的隊頭runqhead和隊尾runqtail相同時,說明循環(huán)隊列中沒有任何可運行的協(xié)程,否則從隊列頭部獲取一個協(xié)程返回。 由于可能存在其他邏輯處理器p來竊取協(xié)程,從而造成當前p與其他p同時訪問局部隊列的情況,因此在此處需要加鎖訪問,訪問結(jié)束后釋放鎖。
// go/src/runtime/proc.go
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
next := _p_.runnext
// If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
// because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
// Hence, there's no need to retry this CAS if it falls.
if next != 0 && _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}協(xié)程調(diào)度時由于總是優(yōu)先查找局部運行隊列中的協(xié)程g,如果只是循環(huán)往復的地執(zhí)行局部隊列中的g,那么全局隊列中的g可能一個都不會被調(diào)度到。
因此,為了保證調(diào)度的公平性,p中每執(zhí)行61次調(diào)度,就會優(yōu)先從全局隊列中獲取一個g到當前p中執(zhí)行:
// go/src/runtime/proc.go
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_p_, 1)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
...
}
獲取全局運行隊列
當p每執(zhí)行61次調(diào)度,或者p本地運行隊列不存在可運行的協(xié)程時,需要從全局運行隊列中獲取一批協(xié)程分配給本地運行隊列。由于每個p共享了全局運行隊列,因此為了保證公平,需要將全局運行隊列中的g按照p的數(shù)量進行平分,平分后數(shù)量也不能超過局部運行隊列容量的一半(即128=256/2)。最后通過循環(huán)調(diào)用runqput將全局隊列中的g放入到p的局部運行隊列中。

// go/src/runtime/proc.go
// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
assertLockHeld(&sched.lock)
if sched.runqsize == 0 {
return nil
}
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n
gp := sched.runq.pop()
n--
for ; n > 0; n-- {
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}協(xié)程竊取
當p在局部運行隊列、全局運行隊列中都找不到可運行的協(xié)程時,就需要從其他p的本地運行隊列中竊取一批可用的協(xié)程。所有的p都存儲在全局的allp []*p變量中, 調(diào)度器隨機在其中選擇一個p來進行協(xié)程竊取工作。竊取工作總共會執(zhí)行不超過4次,當竊取成功時即返回。
// go/src/runtime/proc.go
// stealWork attempts to steal a runnable goroutine or timer from any P.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
pp := getg().m.p.ptr()
ranTimer := false
const stealTries = 4
for i := 0; i < stealTries; i++ {
stealTimersOrRunNextG := i == stealTries-1
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
// GC work may be available.
return nil, false, now, pollUntil, true
}
p2 := allp[enum.position()]
if pp == p2 {
continue
}
...
// Don't bother to attempt to steal if p2 is idle.
if !idlepMask.read(enum.position()) {
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
return gp, false, now, pollUntil, ranTimer
}
}
}
}
...
}
協(xié)程竊取的主要執(zhí)行邏輯通過runqsteal以及runqgrab函數(shù)實現(xiàn),竊取的核心邏輯是:將要竊取的p本地運行隊列中g(shù)個數(shù)的一半放入到自己的運行隊列中。

// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
...
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}2.調(diào)度時機
調(diào)度策略讓我們知道了協(xié)程是如何調(diào)度的,下面繼續(xù)說明什么時候會發(fā)生協(xié)程調(diào)度。
主動調(diào)度
協(xié)程可以選擇主動讓渡自己的執(zhí)行權(quán),這主要通過在代碼中主動執(zhí)行runtime.Gosched()函數(shù)實現(xiàn)。
- 主動調(diào)度會從當前協(xié)程g切換到g0并更新協(xié)程狀態(tài)由運行中
_Grunning變?yōu)榭蛇\行_Grunnable; - 然后通過
dropg()取消g與m的綁定關(guān)系; - 接著通過
globrunqput()將g放入到全局運行隊列中; - 最后調(diào)用
schedule()函數(shù)開啟新一輪的調(diào)度循環(huán)。
// go/src/runtime/proc.go
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
checkTimeouts()
mcall(gosched_m) //
}
// Gosched continuation on g0.
func gosched_m(gp *g) {
...
goschedImpl(gp) //
}
func goschedImpl(gp *g) {
...
casgstatus(gp, _Grunning, _Grunnable)
dropg() //
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
schedule()
}
// dropg removes the association between m and the current goroutine m->curg (gp for short).
func dropg() {
_g_ := getg()
setMNoWB(&_g_.m.curg.m, nil)
setGNoWB(&_g_.m.curg, nil)
}被動調(diào)度
當協(xié)程休眠、通道堵塞、網(wǎng)絡堵塞、垃圾回收導致暫停時,協(xié)程會被動讓渡出執(zhí)行的權(quán)利給其他可運行的協(xié)程繼續(xù)執(zhí)行。調(diào)度器通過gopark()函數(shù)執(zhí)行被動調(diào)度邏輯。gopark()函數(shù)最終調(diào)用park_m()函數(shù)來完成調(diào)度邏輯。
- 首先會從當前協(xié)程g切換到g0并更新協(xié)程狀態(tài)由運行中
_Grunning變?yōu)榈却?code>_Gwaiting; - 然后通過
dropg()取消g與m的綁定關(guān)系; - 接著執(zhí)行
waitunlockf函數(shù),如果該函數(shù)返回false,則協(xié)程g立即恢復執(zhí)行,否則等待喚醒; - 最后調(diào)用
schedule()函數(shù)開啟新一輪的調(diào)度循環(huán)。
// go/src/runtime/proc.go
// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
...
mcall(park_m)
}
// park continuation on g0.
func park_m(gp *g) {
...
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
...
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}與主動調(diào)度不同的是,被動調(diào)度的協(xié)程g不會放入到全局隊列中進行調(diào)度。而是一直處于等待中_Gwaiting狀態(tài)等待被喚醒。當?shù)却械膮f(xié)程被喚醒時,協(xié)程的狀態(tài)由_Gwaiting變?yōu)榭蛇\行_Grunnable狀態(tài),然后被添加到當前p的局部運行隊列中。喚醒邏輯通過函數(shù)goready()調(diào)用ready()實現(xiàn):
// go/src/runtime/proc.go
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
...
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
wakep()
...
}搶占調(diào)度
go應用程序在啟動時會開啟一個特殊的線程來執(zhí)行系統(tǒng)監(jiān)控任務,系統(tǒng)監(jiān)控運行在一個獨立的工作線程m上,該線程不用綁定邏輯處理器p。系統(tǒng)監(jiān)控每隔10ms會檢測是否有準備就緒的網(wǎng)絡協(xié)程,并放置到全局隊列中。
為了保證每個協(xié)程都有執(zhí)行的機會,系統(tǒng)監(jiān)控服務會對執(zhí)行時間過長(大于10ms)的協(xié)程、或者處于系統(tǒng)調(diào)用(大于20微秒)的協(xié)程進行搶占。搶占的核心邏輯通過retake()函數(shù)實現(xiàn):
// go/src/runtime/proc.go
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(_p_)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
...
}
unlock(&allpLock)
return uint32(n)
}到此這篇關(guān)于golang協(xié)程設(shè)計及調(diào)度原理的文章就介紹到這了,更多相關(guān)go協(xié)程設(shè)計內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解go-zero如何使用validator進行參數(shù)校驗
這篇文章主要介紹了如何使用validator庫做參數(shù)校驗的一些十分實用的使用技巧,包括翻譯校驗錯誤提示信息、自定義提示信息的字段名稱、自定義校驗方法等,感興趣的可以了解下2024-01-01
Go?實現(xiàn)?Nginx?加權(quán)輪詢算法的方法步驟
本文主要介紹了Go?實現(xiàn)?Nginx?加權(quán)輪詢算法的方法步驟,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12
自己動手用Golang實現(xiàn)約瑟夫環(huán)算法的示例
這篇文章主要介紹了自己動手用Golang實現(xiàn)約瑟夫環(huán)算法的示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-12-12

