Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程方法
為什么需要協(xié)程
協(xié)程的本質(zhì)是將一段數(shù)據(jù)的運(yùn)行狀態(tài)進(jìn)行打包,可以在線程之間調(diào)度,所以協(xié)程就是在單線程的環(huán)境下實(shí)現(xiàn)的應(yīng)用程序級(jí)別的并發(fā),就是把本來(lái)由操作系統(tǒng)控制的切換+保存狀態(tài)在應(yīng)用程序里面實(shí)現(xiàn)了。
所以我們需要協(xié)程的目的其實(shí)就是它更加節(jié)省資源、可以在有限的資源內(nèi)支持更高的并發(fā),體現(xiàn)在以下三個(gè)方面:
- 資源利用:程可以利用任何的線程去運(yùn)行,不需要等待CPU的調(diào)度。
- 快速調(diào)度:協(xié)程可以快速地調(diào)度(避開(kāi)了系統(tǒng)調(diào)用和切換),快速的切換。
- 超高并發(fā):有限的線程就可以并發(fā)很多的協(xié)程。
協(xié)程的本質(zhì)
協(xié)程在go語(yǔ)言中使用runtime\runtime2.go下的g結(jié)構(gòu)體來(lái)表示,這個(gè)結(jié)構(gòu)體中包含了協(xié)程的很多信息,我們只挑選其中的重要字段來(lái)進(jìn)行分析:
type g struct {
// 協(xié)程的棧幀,里面包含了兩個(gè)字段:lo和hi,分別是協(xié)程棧的高位指針和低位指針
stack stack
// gobuf結(jié)構(gòu)體中儲(chǔ)存了很多與協(xié)程棧相關(guān)的指針,比如pc、sp
sched gobuf
// 用來(lái)標(biāo)記協(xié)程當(dāng)前的狀態(tài)
atomicstatus uint32
// 每個(gè)協(xié)程的唯一標(biāo)識(shí),不向應(yīng)用層暴露。但是goid的地址會(huì)存在寄存器里面,可以通過(guò)ebpf工具無(wú)侵入地去獲取
goid int64
}

對(duì)線程的描述
我們知道,go語(yǔ)言中的協(xié)程是跑在線程上面的,那么go中肯定會(huì)有對(duì)線程的抽象描述,這個(gè)結(jié)構(gòu)體也在runtime\runtime2.go中,我們只展示重要的部分:
type m struct {
// 每次啟動(dòng)一個(gè)M都會(huì)第一個(gè)創(chuàng)建的gourtine,用于操作調(diào)度器,所以它不指向任何函數(shù),只負(fù)責(zé)調(diào)度
g0 *g // goroutine with scheduling stack
// 當(dāng)前正在線程上運(yùn)行的協(xié)程
curg *g // current running goroutine
// 線程id
id int64
// 記錄每種操作系統(tǒng)對(duì)于線程額外的描述信息
mOS
}
協(xié)程如何在線程中執(zhí)行
我們從最簡(jiǎn)單的單線程調(diào)度模型來(lái)看,協(xié)程在線程中的執(zhí)行流程可以參考下圖:

線程循環(huán)
在go中每個(gè)線程都是循環(huán)執(zhí)行一系列工作,又稱作單線程循環(huán)如下圖所示:左側(cè)為棧,右側(cè)為線程執(zhí)行的函數(shù)順序,其中的業(yè)務(wù)方法就是協(xié)程方法。
普通協(xié)程棧只能記錄業(yè)務(wù)方法的業(yè)務(wù)信息,且當(dāng)線程沒(méi)有獲得協(xié)程之前是沒(méi)有普通協(xié)程棧的。所以在內(nèi)存中開(kāi)辟了一個(gè)g0棧,專門用于記錄函數(shù)調(diào)用跳轉(zhuǎn)的信息,因此g0棧其實(shí)就是調(diào)度中心的棧。
線程循環(huán)會(huì)按順序循環(huán)去執(zhí)行上圖右側(cè)的函數(shù):schedule->execute->gogo->業(yè)務(wù)方法->goexit。
schedule
schedule函數(shù)的作用是為當(dāng)前的P獲取一個(gè)可以執(zhí)行的g,并執(zhí)行它。
- 首先會(huì)有1/61的概率檢查全局隊(duì)列,確保全局隊(duì)列中的G也會(huì)被調(diào)度。
- 然后有60/61的概率從本地隊(duì)列中獲取g。
- 如果從本地隊(duì)列中沒(méi)有獲取到可執(zhí)行的g,就會(huì)調(diào)用
findrunnable函數(shù)去獲取。
findrunnable函數(shù)的流程:
- 調(diào)用runqget函數(shù)來(lái)從P自己的runnable G隊(duì)列中得到一個(gè)可以執(zhí)行的G;
- 如果1失敗,調(diào)用globrunqget函數(shù)從全局runnableG隊(duì)列中得到一個(gè)可以執(zhí)行的G;
- 如果2失敗,調(diào)用netpoll(非阻塞)函數(shù)取一個(gè)異步回調(diào)的G;
- 如果3失敗,嘗試從其他P那里偷取一半數(shù)量的G過(guò)來(lái);
- 如果4失敗,再次調(diào)用globrunqget函數(shù)從全局runnableG隊(duì)列中得到一個(gè)可以執(zhí)行的G;
- 如果5失敗,調(diào)用netpoll(阻塞)函數(shù)取一個(gè)異步回調(diào)的G;
- 如果6仍然沒(méi)有取到G,那么調(diào)用stopm函數(shù)停止這個(gè)M。
如果獲取到了可執(zhí)行的g,就調(diào)用execute函數(shù)去執(zhí)行。
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
......
// 新建一個(gè)gp變量,gp就是即將要運(yùn)行的協(xié)程指針
var gp *g
var inheritTime bool
// 垃圾回收相關(guān)的工作
......
// 調(diào)度過(guò)程中有1/61的概率檢查全局隊(duì)列,確保全局隊(duì)列中的G也會(huì)被調(diào)度。
// M綁定的P首先有1/61概率從全局隊(duì)列獲取G,60/61概率從本地隊(duì)列獲取G
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 從本地隊(duì)列中獲取g
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
// 如果從本地隊(duì)列獲取失敗,就會(huì)調(diào)用findrunnable函數(shù)去獲取g
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
......
execute(gp, inheritTime)
}
execute
execute函數(shù)會(huì)為schedule獲取到的可執(zhí)行協(xié)程初始化相關(guān)結(jié)構(gòu)體,然后以sched結(jié)構(gòu)體為參數(shù)調(diào)用gogo函數(shù):
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// 初始化g結(jié)構(gòu)體
// Assign gp.m before entering _Grunning so running Gs have an
// M.
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
......
// 匯編實(shí)現(xiàn)的函數(shù),通過(guò)gobuf結(jié)構(gòu)體中的信息,跳轉(zhuǎn)到執(zhí)行業(yè)務(wù)的方法
gogo(&gp.sched)
gogo
gogo函數(shù)實(shí)際上是匯編實(shí)現(xiàn)的,每個(gè)操作系統(tǒng)實(shí)現(xiàn)的gogo方法是不同的,它會(huì)通過(guò)傳進(jìn)來(lái)的gobuf結(jié)構(gòu)體,先向普通協(xié)程棧中壓入goexit函數(shù),然后跳轉(zhuǎn)到執(zhí)行業(yè)務(wù)的方法,協(xié)程棧也會(huì)被切換成業(yè)務(wù)協(xié)程自己的棧。
業(yè)務(wù)方法
業(yè)務(wù)方法就是協(xié)程中需要執(zhí)行的相關(guān)函數(shù)。
goexit
goexit也是匯編實(shí)現(xiàn)的,當(dāng)執(zhí)行完協(xié)程棧中的業(yè)務(wù)方法之后,就會(huì)退到goexit方法中,它會(huì)將業(yè)務(wù)協(xié)程的棧切換成調(diào)度器的棧(也就是g0棧),然后重新調(diào)用schedule函數(shù),形成一個(gè)閉環(huán)。
GMP調(diào)度模型
上述的調(diào)度模型是單線程的,但是現(xiàn)代CPU往往是多核的,應(yīng)用采用的也是多線程,因此單線程調(diào)度模型有些浪費(fèi)資源。所以我們?cè)趯?shí)際使用中,其實(shí)是一種多線程循環(huán)。但是多個(gè)線程在獲取可執(zhí)行g(shù)的時(shí)候就會(huì)存在并發(fā)沖突的問(wèn)題,所以就有了GMP調(diào)度模型。
GMP調(diào)度模型簡(jiǎn)單來(lái)說(shuō)是這樣的:
G是指協(xié)程goroutine,M是指操作系統(tǒng)線程,P是指調(diào)度器。
首先,GMP調(diào)度模型中有一個(gè)全局隊(duì)列,用于存放等待運(yùn)行的G。然后每個(gè)P都有自己的本地隊(duì)列,存放的也是等待運(yùn)行的G,但是存的數(shù)量有限,不會(huì)超過(guò)256個(gè)。我們新建goroutine的時(shí)候,是優(yōu)先放到P的本地隊(duì)列中的,如果隊(duì)列滿了,會(huì)把本地隊(duì)列中一半的G都移到全局隊(duì)列中。
線程想運(yùn)行任務(wù)就得獲取P,從P的本地隊(duì)列獲取G,G執(zhí)行之后,M會(huì)從P獲取下一個(gè)G,不斷重復(fù)下去。P隊(duì)列為空時(shí),M會(huì)嘗試從全局隊(duì)列拿一批G放到P的本地隊(duì)列,如果獲取不到就會(huì)從其他P的本地隊(duì)列偷一半放到自己P的本地隊(duì)列。
當(dāng)M執(zhí)行某一個(gè)G時(shí)候如果發(fā)生了系統(tǒng)調(diào)用或者其余阻塞操作,M會(huì)阻塞,如果當(dāng)前有一些G在執(zhí)行,runtime會(huì)把這個(gè)線程M從P中摘除(detach),然后再創(chuàng)建一個(gè)新的操作系統(tǒng)的線程(如果有空閑的線程可用就復(fù)用空閑線程)來(lái)服務(wù)于這個(gè)P。當(dāng)M系統(tǒng)調(diào)用結(jié)束時(shí)候,這個(gè)G會(huì)嘗試獲取一個(gè)空閑的P執(zhí)行,并放入到這個(gè)P的本地隊(duì)列。如果獲取不到P,那么這個(gè)線程M變成休眠狀態(tài), 加入到空閑線程中,然后這個(gè)G會(huì)被放入全局隊(duì)列中。
P的底層結(jié)構(gòu)
我們發(fā)現(xiàn)GMP調(diào)度模型中有一個(gè)P,P就是調(diào)度器,我們來(lái)看一下P的底層數(shù)據(jù)結(jié)構(gòu),同樣在runtime\runtime2.go文件中:
type p struct {
id int32
status uint32 // one of pidle/prunning/...
// 指向調(diào)度器服務(wù)的那個(gè)線程
m muintptr // back-link to associated m (nil if idle)
// Queue of runnable goroutines. Accessed without lock.
// 調(diào)度器的本地隊(duì)列,因?yàn)橹环?wù)于一個(gè)線程,所以可以無(wú)鎖的訪問(wèn),隊(duì)列本身實(shí)際上是一個(gè)大小為256的指針數(shù)組
runqhead uint32
runqtail uint32
runq [256]guintptr
// 指向下一個(gè)可用g的指針
runnext guintptr
}協(xié)程并發(fā)
我們上面介紹的調(diào)度模型實(shí)際上是非搶占式的,非搶占式模型的特點(diǎn)就是只有當(dāng)協(xié)程主動(dòng)讓出后,M才會(huì)去運(yùn)行本地隊(duì)列后面的協(xié)程,那么這樣就很容易造成隊(duì)列尾部的協(xié)程餓死。
其實(shí)Go語(yǔ)言的協(xié)程是基于搶占式來(lái)實(shí)現(xiàn)的,也就是當(dāng)協(xié)程執(zhí)行一段時(shí)間后將當(dāng)前任務(wù)暫定,執(zhí)行后續(xù)協(xié)程任務(wù),防止時(shí)間敏感攜程執(zhí)行失敗。如下圖所示:

搶占式調(diào)度
當(dāng)目前線程中執(zhí)行的協(xié)程是一個(gè)超長(zhǎng)時(shí)間的任務(wù),此時(shí)先保存該協(xié)程的運(yùn)行狀態(tài)也就是保護(hù)現(xiàn)場(chǎng),若是后續(xù)還需繼續(xù)執(zhí)行就將其放入本地隊(duì)列中去,如果不需要執(zhí)行就將其處于休眠狀態(tài),然后直接跳轉(zhuǎn)到schedule函數(shù)中。
實(shí)現(xiàn):
- 主動(dòng)掛取:gopark方法,當(dāng)業(yè)務(wù)調(diào)用這個(gè)方法線程就會(huì)直接回到schedule函數(shù)并切換協(xié)程棧,當(dāng)前運(yùn)行的協(xié)程將會(huì)處于等待狀態(tài),等待狀態(tài)的協(xié)程是無(wú)法立即進(jìn)入任務(wù)隊(duì)列中的。程序員無(wú)法主動(dòng)調(diào)用gopark函數(shù),但是我們可以通過(guò)Sleep等具有g(shù)opark的函數(shù)來(lái)進(jìn)行主動(dòng)掛取,Sleep五秒之后系統(tǒng)將會(huì)把任務(wù)的等待狀態(tài)更改為運(yùn)行狀態(tài)放入隊(duì)列中。
- 系統(tǒng)調(diào)用完成時(shí):go程序在運(yùn)行狀態(tài)中進(jìn)行了系統(tǒng)調(diào)用,那么當(dāng)系統(tǒng)的底層調(diào)用完成后就會(huì)調(diào)用exitsyscall函數(shù),線程就會(huì)停止執(zhí)行當(dāng)前協(xié)程,將當(dāng)前協(xié)程放入隊(duì)列中去。
- 標(biāo)記搶占morestack():當(dāng)函數(shù)跳轉(zhuǎn)時(shí)都會(huì)調(diào)用這個(gè)方法,它的本意在于檢查當(dāng)前協(xié)程??臻g是否有足夠內(nèi)存,如果不夠就要擴(kuò)大該??臻g。當(dāng)系統(tǒng)監(jiān)控到協(xié)程運(yùn)行超過(guò)
10ms,就將g.stackguard0置為0xfffffade(該值是一個(gè)搶占標(biāo)志),讓程序在只執(zhí)行morestack函數(shù)時(shí)順便判斷一下是否將g中的stackguard置為搶占,如果的確被標(biāo)記搶占,就回到schedule方法,并將當(dāng)前協(xié)程放回隊(duì)列中。
全局隊(duì)列的饑餓問(wèn)題
上述操作讓本地隊(duì)列成了一個(gè)小循環(huán),但是如果目前系統(tǒng)中的線程的本地隊(duì)列中都擁有一個(gè)超大的協(xié)程任務(wù),那么所有的線程都將在一段時(shí)間內(nèi)處于忙碌狀態(tài),全局隊(duì)列中的任務(wù)將會(huì)長(zhǎng)期無(wú)法運(yùn)行,這個(gè)問(wèn)題又稱為全局隊(duì)列饑餓問(wèn)題,解決方式就是在本地隊(duì)列循環(huán)時(shí),以一定的概率從全局隊(duì)列中取出某個(gè)任務(wù),讓它也參與到本地循環(huán)當(dāng)中去。
其實(shí)在執(zhí)行schedule函數(shù)尋找可運(yùn)行g(shù)的時(shí)候,首先會(huì)去執(zhí)行下面的代碼,即調(diào)度過(guò)程中有1/61的概率去全局隊(duì)列中獲取可執(zhí)行的協(xié)程,防止全局隊(duì)列中的協(xié)程被餓死。
// 調(diào)度過(guò)程中有1/61的概率檢查全局隊(duì)列,確保全局隊(duì)列中的G也會(huì)被調(diào)度。
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
到此這篇關(guān)于Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程方法的文章就介紹到這了,更多相關(guān)Go協(xié)程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解go 動(dòng)態(tài)數(shù)組 二維動(dòng)態(tài)數(shù)組
這篇文章主要介紹了go 動(dòng)態(tài)數(shù)組 二維動(dòng)態(tài)數(shù)組,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07
詳解如何在Go中實(shí)現(xiàn)優(yōu)雅停止
和其他語(yǔ)言相比,Go 中有相同也有不同,相同的是實(shí)現(xiàn)思路上和其他語(yǔ)言沒(méi)啥差異,不同在于 Go 采用的是 goroutine + channel 的并發(fā)模型,與傳統(tǒng)的進(jìn)程線程相比,實(shí)現(xiàn)細(xì)節(jié)上存在差異,本文將從實(shí)際場(chǎng)景和它的一般實(shí)現(xiàn)方式展開(kāi),逐步討論這個(gè)話題,需要的朋友可以參考下2024-04-04
Golang實(shí)現(xiàn)四層負(fù)載均衡的示例代碼
做開(kāi)發(fā)的同學(xué)應(yīng)該經(jīng)常聽(tīng)到過(guò)負(fù)載均衡的概念,今天我們就來(lái)實(shí)現(xiàn)一個(gè)乞丐版的四層負(fù)載均衡,并用它對(duì)mysql進(jìn)行負(fù)載均衡測(cè)試,感興趣的可以了解一下2023-07-07
Golang使用Gin框架實(shí)現(xiàn)路由分類處理請(qǐng)求流程詳解
Gin是一個(gè)golang的微框架,封裝比較優(yōu)雅,具有快速靈活,容錯(cuò)方便等特點(diǎn),這篇文章主要介紹了Golang使用Gin框架實(shí)現(xiàn)路由分類處理請(qǐng)求,感興趣的同學(xué)可以參考下文2023-05-05
解決Go語(yǔ)言time包數(shù)字與時(shí)間相乘的問(wèn)題
這篇文章主要介紹了Go語(yǔ)言time包數(shù)字與時(shí)間相乘的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-04-04
Go語(yǔ)言開(kāi)發(fā)區(qū)塊鏈只需180行代碼(推薦)
這篇文章主要介紹了Go語(yǔ)言開(kāi)發(fā)區(qū)塊鏈只需180行代碼,文章中將不會(huì)涉及工作量證明算法(PoW)以及權(quán)益證明算法(PoS)這類的共識(shí)算法。需要的朋友可以參考下2018-05-05
go開(kāi)源Hugo站點(diǎn)渲染之模板詞法解析
這篇文章主要為大家介紹了go開(kāi)源Hugo站點(diǎn)渲染之模板詞法解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02

