Golang服務(wù)的請(qǐng)求調(diào)度的實(shí)現(xiàn)
1. 寫在前面
最近在看相關(guān)的Go服務(wù)的請(qǐng)求調(diào)度的時(shí)候,發(fā)現(xiàn)在gin中默認(rèn)提供的中間件中,不含有請(qǐng)求調(diào)度相關(guān)的邏輯中間件,去github查看了一些服務(wù)框架,發(fā)現(xiàn)在go-zero中,有一個(gè)SheddingHandler的中間件來(lái)幫助服務(wù)請(qǐng)求進(jìn)行調(diào)度,防止在流量徒增的時(shí)候,服務(wù)出現(xiàn)滾雪球進(jìn)一步惡化,導(dǎo)致最后服務(wù)不可用的現(xiàn)象出現(xiàn)。
SheddingHandler中間件存在的意義就是盡量保證服務(wù)可用的情況下盡可能多的處理請(qǐng)求,而在流量突增的時(shí)候,丟棄部分請(qǐng)求以確保服務(wù)可用,防止服務(wù)因?yàn)榱髁窟^(guò)大而崩潰。
2. SheddingHandler的實(shí)現(xiàn)原理
SheddingHandler簡(jiǎn)單來(lái)說(shuō)就是維持了一套指標(biāo),在每個(gè)請(qǐng)求進(jìn)入系統(tǒng)的時(shí)候,利用指標(biāo)進(jìn)行計(jì)算,判斷當(dāng)前的請(qǐng)求是否允許被進(jìn)入系統(tǒng),如果允許則請(qǐng)求通過(guò)中間件繼續(xù)向下被服務(wù)處理,如果不被允許則在中間件層面就丟棄掉(正是這個(gè)丟棄,保證了在流量突增時(shí)服務(wù)的穩(wěn)定)。
具體看源碼:
// SheddingHandler returns a middleware that does load shedding.
func SheddingHandler(shedder load.Shedder, metrics *stat.Metrics) func(http.Handler) http.Handler {
if shedder == nil {
return func(next http.Handler) http.Handler {
return next
}
}
ensureSheddingStat() // 負(fù)責(zé)每分鐘打印shedding相關(guān)的數(shù)據(jù)
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sheddingStat.IncrementTotal()
promise, err := shedder.Allow() // 判斷是否允許此請(qǐng)求進(jìn)入下一步
if err != nil {
metrics.AddDrop() // drop掉請(qǐng)求,在中間件層面就拒絕了請(qǐng)求
sheddingStat.IncrementDrop()
logx.Errorf("[http] dropped, %s - %s - %s",
r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())
w.WriteHeader(http.StatusServiceUnavailable)// 返回503,提示服務(wù)不可用
return
}
cw := response.NewWithCodeResponseWriter(w)
defer func() {
if cw.Code == http.StatusServiceUnavailable {
promise.Fail() // 相關(guān)指標(biāo)記錄
} else {
sheddingStat.IncrementPass()
promise.Pass() // 相關(guān)指標(biāo)記錄
}
}()
next.ServeHTTP(cw, r)
})
}
}可以看到請(qǐng)求是否可以繼續(xù)向下,取決于Allow()這個(gè)方法,這個(gè)方法的實(shí)現(xiàn)如下:
// Allow implements Shedder.Allow.
func (as *adaptiveShedder) Allow() (Promise, error) {
if as.shouldDrop() {// 判斷是否應(yīng)該丟棄
as.droppedRecently.Set(true)
return nil, ErrServiceOverloaded// 丟棄
}
as.addFlying(1) // 通過(guò)校驗(yàn)
return &promise{
start: timex.Now(),
shedder: as,
}, nil
}繼續(xù)看shouldDrop()方法:
func (as *adaptiveShedder) shouldDrop() bool {
if as.systemOverloaded() || as.stillHot() {// 如果任一滿足,這個(gè)請(qǐng)求都會(huì)被過(guò)載
if as.highThru() {
flying := atomic.LoadInt64(&as.flying)
as.avgFlyingLock.Lock()
avgFlying := as.avgFlying
as.avgFlyingLock.Unlock()
msg := fmt.Sprintf(
"dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
logx.Error(msg)
stat.Report(msg)
return true
}
}
return false
}
func (as *adaptiveShedder) systemOverloaded() bool {
if !systemOverloadChecker(as.cpuThreshold) { // 校驗(yàn)CPU的負(fù)載是否超出設(shè)定值
return false
}
as.overloadTime.Set(timex.Now())// 超出設(shè)定值,記錄當(dāng)前的時(shí)間(這主要是為了后續(xù)流量減小,系統(tǒng)的恢復(fù)用)
return true
}
func (as *adaptiveShedder) stillHot() bool {
if !as.droppedRecently.True() {// 如果這個(gè)請(qǐng)求之前有請(qǐng)求被drop這里值為true,反之為false
return false// 之前的請(qǐng)求沒(méi)有被drop表示系統(tǒng)可能沒(méi)有遇到過(guò)載的問(wèn)題,返回false
}
overloadTime := as.overloadTime.Load()// 如果之前有請(qǐng)求被drop,表示存在過(guò)載
if overloadTime == 0 {// 看看是否有記錄過(guò)載的時(shí)間
return false
}
if timex.Since(overloadTime) < coolOffDuration {// 如果小于冷卻時(shí)間,表示系統(tǒng)依然是過(guò)載狀態(tài)
return true
}
as.droppedRecently.Set(false)// 表示CPU過(guò)載,上一次過(guò)載過(guò)了冷卻器,這個(gè)請(qǐng)求可以繼續(xù)執(zhí)行,設(shè)置為false
return false
}可以看到請(qǐng)求被drop的前置條件有兩個(gè):
- 系統(tǒng)的CPU負(fù)載超出了設(shè)定值,目前go-zero設(shè)置的默認(rèn)值為90%,即系統(tǒng)CPU負(fù)載達(dá)到90%后,就意味著系統(tǒng)過(guò)載了,只要是過(guò)載,請(qǐng)求會(huì)被直接拒絕;否則判斷第二個(gè)條件
- 因?yàn)檫^(guò)載可能會(huì)隨著流量減小而恢復(fù),或者丟棄的請(qǐng)求太多,系統(tǒng)CPU會(huì)慢慢的恢復(fù)正常水平(90%以下),所以需要看一下過(guò)載時(shí)間,如果超過(guò)了冷卻時(shí)間,而第一個(gè)條件又表示系統(tǒng)CPU負(fù)載正常,此時(shí)我們會(huì)認(rèn)定系統(tǒng)恢復(fù)了,這個(gè)請(qǐng)求可以處理。
滿足上述任一條件,此請(qǐng)求就會(huì)進(jìn)入最后的highThru()方法判斷環(huán)節(jié),如果滿足了,此請(qǐng)求就會(huì)被丟棄。
從上面我們可以得到,我們判斷服務(wù)是否過(guò)載,是依靠CPU的使用率去判斷的,那么我們?nèi)绾蝿?dòng)態(tài)的計(jì)算CPU的使用率呢?
在go-zero里面,采用的是直接獲取linux機(jī)器上的cpu的相關(guān)文件,然后通過(guò)代碼邏輯將相關(guān)的文件進(jìn)行解析并計(jì)算出CPU使用率??梢詤⒖迹?a rel="external nofollow" target="_blank">[cgroup_linux.go]

這里為了效率問(wèn)題,并不是實(shí)時(shí)去計(jì)算的,而是在啟動(dòng)的時(shí)候,啟動(dòng)了一個(gè)goroutine每250ms進(jìn)行以此CPU使用率數(shù)據(jù)的刷新。
const (
// 250ms and 0.95 as beta will count the average cpu load for past 5 seconds
cpuRefreshInterval = time.Millisecond * 250
allRefreshInterval = time.Minute
// moving average beta hyperparameter
beta = 0.95
)
var cpuUsage int64
func init() {
go func() {
cpuTicker := time.NewTicker(cpuRefreshInterval)
defer cpuTicker.Stop()
allTicker := time.NewTicker(allRefreshInterval)
defer allTicker.Stop()
for {
select {
case <-cpuTicker.C:
threading.RunSafe(func() {
curUsage := internal.RefreshCpu() // 刷新CPU使用率數(shù)據(jù)
prevUsage := atomic.LoadInt64(&cpuUsage)
// cpu = cpu??1 * beta + cpu? * (1 - beta)
usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))
atomic.StoreInt64(&cpuUsage, usage)
})
case <-allTicker.C:
if logEnabled.True() {
printUsage()
}
}
}
}()
}最后再來(lái)看highThru()方法,這個(gè)方法相對(duì)來(lái)說(shuō)比較復(fù)雜:
func (as *adaptiveShedder) addFlying(delta int64) {
flying := atomic.AddInt64(&as.flying, delta)// 請(qǐng)求通過(guò)檢驗(yàn)進(jìn)入后會(huì)加1,請(qǐng)求被服務(wù)處理完后會(huì)減1
if delta < 0 {
as.avgFlyingLock.Lock()
// 平均請(qǐng)求數(shù)計(jì)算為當(dāng)前平均請(qǐng)求數(shù)*0.9 + 當(dāng)前運(yùn)行請(qǐng)求數(shù)*0.1
as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)
as.avgFlyingLock.Unlock()
}
}
func (as *adaptiveShedder) highThru() bool {
as.avgFlyingLock.Lock()
avgFlying := as.avgFlying // 運(yùn)行中的平均請(qǐng)求數(shù)
as.avgFlyingLock.Unlock()
maxFlight := as.maxFlight()// 運(yùn)行的最大的請(qǐng)求數(shù)
// 如果運(yùn)行的平均請(qǐng)求數(shù)>最大的請(qǐng)求數(shù)且當(dāng)前運(yùn)行的請(qǐng)求數(shù)>最大的請(qǐng)求數(shù),表示依舊高負(fù)載
return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
}
func (as *adaptiveShedder) maxFlight() int64 {
// windows = buckets per second
// maxQPS = maxPASS * windows
// minRT = min average response time in milliseconds
// maxQPS * minRT / milliseconds_per_second
// 最大的運(yùn)行數(shù)的計(jì)算為最大請(qǐng)求數(shù)*窗口的長(zhǎng)度*最小的處理時(shí)間
return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
}上面關(guān)于flying的計(jì)算,在SheddingHandler中有兩個(gè)count統(tǒng)計(jì)器在統(tǒng)計(jì)這通過(guò)的總請(qǐng)求數(shù)以及請(qǐng)求的平均耗時(shí)。默認(rèn)會(huì)在5s的時(shí)間內(nèi)啟動(dòng)50個(gè)大小的bucket來(lái)循環(huán)滾動(dòng),即每個(gè)bucket統(tǒng)計(jì)100ms內(nèi)的請(qǐng)求數(shù)。
這里利用窗口統(tǒng)計(jì)請(qǐng)求數(shù)大小的判斷主要是為了規(guī)避在負(fù)載的情況下,丟棄了太多的請(qǐng)求導(dǎo)致系統(tǒng)實(shí)際運(yùn)行的請(qǐng)求數(shù)減少的太多,所以加了這一層判斷,這個(gè)可以保證在系統(tǒng)高負(fù)載丟棄了大量的請(qǐng)求的情況下,系統(tǒng)盡可能多的處理更多的請(qǐng)求,而不是負(fù)載一高就直接丟棄。
func (as *adaptiveShedder) maxPass() int64 {
var result float64 = 1
as.passCounter.Reduce(func(b *collection.Bucket) {
if b.Sum > result {
result = b.Sum
}
})
return int64(result)
}
func (as *adaptiveShedder) minRt() float64 {
result := defaultMinRt
as.rtCounter.Reduce(func(b *collection.Bucket) {
if b.Count <= 0 {
return
}
avg := math.Round(b.Sum / float64(b.Count))
if avg < result {
result = avg
}
})
return result
}3. 相關(guān)方案的對(duì)比
在調(diào)度請(qǐng)求這一塊,go-zero的方案確實(shí)很棒,結(jié)合了CPU使用率和過(guò)載冷缺以及請(qǐng)求數(shù)大小因素,不僅保證了系統(tǒng)高負(fù)載下服務(wù)的正常,還確保了系統(tǒng)能夠盡可能多的處理請(qǐng)求。
但從我們目前的調(diào)度模式以及執(zhí)行單元的狀態(tài)角度出發(fā),我們會(huì)發(fā)現(xiàn)服務(wù)接收到一個(gè)請(qǐng)求后會(huì)解析請(qǐng)求讀取請(qǐng)求的內(nèi)容,然后調(diào)度此請(qǐng)求給到執(zhí)行單元,這個(gè)執(zhí)行單元可能是一個(gè)線程或者一個(gè)Goroutine,從執(zhí)行單元的角度來(lái)看,以線程為例,線程的生命周期會(huì)有如下圖所示的幾個(gè)階段:
- 新建
- 就緒
- 運(yùn)行
- 阻塞
- 死亡

我們?cè)購(gòu)南到y(tǒng)服務(wù)的限制方面考慮,一般系統(tǒng)的限制包括I/O限制和CPU限制,I/O限制指代I/O密集型的應(yīng)用程序的限制,而CPU限制則是CPU密集型應(yīng)用程序的限制:
- I/O密集型:表示服務(wù)需要進(jìn)行大量的I/O操作,如磁盤讀寫、網(wǎng)絡(luò)傳輸?shù)龋@類服務(wù)不需要進(jìn)行大量的計(jì)算,但需要等待I/O操作完成,所以一般CPU占用率很低。
- CPU密集型:表示服務(wù)需要進(jìn)行大量的CPU操作,如數(shù)據(jù)處理、圖像處理、加密解密等,這類服務(wù)需要進(jìn)行大量的計(jì)算,但不需要進(jìn)行太多I/O相關(guān)的操作,所以I/O等待時(shí)間短,CPU占用率高。
在目前的服務(wù)應(yīng)用中,絕大部分的應(yīng)用程序是CPU密集型。
而CPU密集型服務(wù),要想最大限度的利用CPU,最理想的情況所有的執(zhí)行單元都處于運(yùn)行和等待的狀態(tài),但等待和運(yùn)行之間有個(gè)就緒的中間態(tài),這也就意味著,如果想讓所有的執(zhí)行單元都處于運(yùn)行和代碼狀態(tài),我們就需要最小化就緒的執(zhí)行單元數(shù)量。而就緒單元一旦獲取到CPU資源(時(shí)間片)就會(huì)進(jìn)入Running狀態(tài)。
如果處于就緒的單元不斷增多,在某種意義上意味著程序的CPU資源不足,即CPU過(guò)負(fù)載。從這個(gè)角度出發(fā),我們可以利用執(zhí)行單元處于就緒態(tài)的數(shù)量來(lái)判斷服務(wù)是否過(guò)載。
在Golang的GMP模型中,P的數(shù)量是一定的,M的數(shù)量最多不超過(guò)10000個(gè),而Goroutine的數(shù)量幾乎是不定的。從上面利用就緒態(tài)(在Golang中是GRunnable狀態(tài))的數(shù)量來(lái)判斷系統(tǒng)過(guò)載,也給我們提供了一個(gè)新的方案:判斷系統(tǒng)所有P上(本地隊(duì)列)的Goroutine處于GRunnable的數(shù)量,如果數(shù)量超過(guò)一個(gè)界定值,表示CPU資源不足,即過(guò)載。
4. 小結(jié)
在剛開(kāi)始接觸到服務(wù)的請(qǐng)求調(diào)度的時(shí)候,就想著看看是否有開(kāi)源的方案來(lái)解決這個(gè)問(wèn)題,果不其然,你能夠想到的,大家曾經(jīng)都想到過(guò)并付諸了時(shí)間和精力去給出了具體的方案設(shè)計(jì),無(wú)論是SheddingHandler的設(shè)計(jì),還是利用Goroutine的狀態(tài)來(lái)判斷系統(tǒng)是否過(guò)載,它們都有各自的理論為依托,但從精確度來(lái)說(shuō)go-zero的SheddingHandler的設(shè)計(jì)相對(duì)來(lái)說(shuō)更為準(zhǔn)確,因?yàn)閺腃PU的真實(shí)數(shù)據(jù)出發(fā),得到具體的CPU是否負(fù)載是最為可靠直觀的。
判斷Goroutine的就緒態(tài)數(shù)量這個(gè)方案,在最開(kāi)始的接觸中,自己是不太理解的,但從具體理論出發(fā),包括后續(xù)自己也進(jìn)行了相關(guān)的壓測(cè),以及Golang的trace.out文件的分析,在某種程度上,這種方案也是可行的,不禁感嘆自己還是太弱了,還是要多學(xué)習(xí),加油!
到此這篇關(guān)于Golang服務(wù)的請(qǐng)求調(diào)度的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Golang服務(wù)請(qǐng)求調(diào)度內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go?實(shí)戰(zhàn)單隊(duì)列到優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)圖文示例
這篇文章主要為大家介紹了Go?實(shí)戰(zhàn)單隊(duì)列到優(yōu)先級(jí)隊(duì)列圖文示例實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
使用golang在windows上設(shè)置全局快捷鍵的操作
最近在工作中,總是重復(fù)的做事,想著自己設(shè)置一個(gè)快捷鍵實(shí)現(xiàn)windows 剪貼板的功能,所以本文小編給大家分享了使用golang在windows上設(shè)置全局快捷鍵的操作,文中有相關(guān)的代碼示例供大家參考,需要的朋友可以參考下2024-02-02
Go語(yǔ)言循環(huán)遍歷含有中文的字符串的方法小結(jié)
這篇文章主要介紹了Go語(yǔ)言循環(huán)遍歷含有中文的字符串的幾種方法,文章通過(guò)代碼示例講解的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴跟著小編一起來(lái)看看吧2023-07-07
CentOS 32 bit安裝golang 1.7的步驟詳解
Go是Google開(kāi)發(fā)的一種編譯型,并發(fā)型,并具有垃圾回收功能的編程語(yǔ)言。在發(fā)布了6個(gè)rc版本之后,Go 1.7終于正式發(fā)布了。本文主要介紹了在CentOS 32 bit安裝golang 1.7的步驟,文中給出了詳細(xì)的步驟,相信對(duì)大家的學(xué)習(xí)和理解具有一定的參考借鑒價(jià)值,下面來(lái)一起看看吧。2016-12-12
golang定時(shí)器Timer的用法和實(shí)現(xiàn)原理解析
這篇文章主要介紹了golang定時(shí)器Ticker,本文主要來(lái)看一下Timer的用法和實(shí)現(xiàn)原理,需要的朋友可以參考以下內(nèi)容2023-04-04
基于Go語(yǔ)言搭建靜態(tài)文件服務(wù)器的詳細(xì)教程
Go 是一個(gè)開(kāi)源的編程語(yǔ)言,它能讓構(gòu)造簡(jiǎn)單、可靠且高效的軟件變得容易,本文給大家介紹了基于Go語(yǔ)言搭建靜態(tài)文件服務(wù)器的詳細(xì)教程,文中通過(guò)圖文和代碼講解的非常詳細(xì),需要的朋友可以參考下2024-10-10

