Golang?手寫一個簡單的并發(fā)任務(wù)?manager
前言
今天也是偏實戰(zhàn)的內(nèi)容,作為一個并發(fā)復(fù)習(xí)課,很簡單,我們來看看怎樣實現(xiàn)一個并發(fā)任務(wù) manager。
在微服務(wù)的場景下,我們有很多任務(wù)的執(zhí)行是沒有明確的先后順序的,比如一個接口同時要做到任務(wù) A 和 任務(wù) B,兩個任務(wù)分別拿到一些數(shù)據(jù),最后組裝裁剪后通過接口下發(fā)。
此時,A 和 B 兩個任務(wù)沒有依賴關(guān)系,如果我們串行來執(zhí)行,會拖慢整個任務(wù)的執(zhí)行節(jié)奏,用并發(fā)的方式來優(yōu)化是一個方向。
那怎么實現(xiàn)呢?
errgroup
一個常見的想法是用 errgroup,我們之前也介紹過 Golang errgroup 設(shè)計和原理解析。
今天我們不打算用這種實現(xiàn),希望用更加基礎(chǔ)的組件來引發(fā)思考,看看如何活用 sync 包提供的基礎(chǔ)能力。另外一點是 errgroup 也有他的缺陷,如果在啟動的協(xié)程中沒有手動 recover,那么一旦在我們的任務(wù)中出現(xiàn) panic,整個程序就 crash 了。
這一點還是很有爭議的,很多開發(fā)者認(rèn)為這是符合預(yù)期的,也有一些開發(fā)者希望在 New 一個 errgroup 的時候能夠提供 option 控制是否來 recover。近期還有兩個 issue 在進行激烈的討論,目前看沒有定論。
感興趣的同學(xué)可以看下這兩個 issue:
- x/sync/errgroup: why not recover the fn's err in errgroup #40484
- proposal: x/sync/errgroup: propagate panics and Goexits through Wait #53757
需求拆解
ok,我們來試著用 sync 包基礎(chǔ)能力來實現(xiàn)一個簡單的并行任務(wù) manager。首先我們分析下需求。
- 一定要能做到并發(fā)執(zhí)行各個任務(wù),開多個協(xié)程,而不是在一個 main goroutine 里串行執(zhí)行各個任務(wù);
- 并發(fā)安全,我們當(dāng)然不希望出現(xiàn)數(shù)據(jù)異常,不希望并發(fā)執(zhí)行任務(wù)導(dǎo)致最后程序因為 runtime error 而掛掉;
- 如果多個任務(wù)都失敗,只返回一個 error 即可;
- 能夠 recover from panic,不需要開發(fā)者使用的時候再手動去寫 recover 邏輯;
- 性能有保障。
并發(fā)執(zhí)行這一點我們可以借助 sync.WaitGroup 的能力,每次啟動一個goroutine,WaitGroup 就加 1,在 defer 里完成 Done,啟動所有 goroutine 之后,等著 Wait 返回結(jié)果即可。常規(guī)的能力復(fù)用。
需要額外處理的地方在于,怎么實現(xiàn)多個線程只有一個 error 能賦值,以及 recover 的適配。
實戰(zhàn)代碼
我們理一下思路,看看代碼怎么寫。
Job
首先一定需要定義一個通用的函數(shù)簽名,使得開發(fā)者能夠傳入自己要執(zhí)行的并發(fā)任務(wù)。
type Job interface {
Do(ctx context.Context, param interface{}) error
Name() string
}JobManager
我們的 job manager 現(xiàn)階段可以簡單實現(xiàn),只是一組 Job 的集合:
type JobManager []Job
錯誤處理
要達到只有一個 error 賦值,且不出現(xiàn) race condition,有兩個方案:
- sync.Mutex 加鎖;
- sync.Once 只執(zhí)行一次。
當(dāng)然,什么時候我們都可以用一把大鎖解決問題,但它的性能不會很好,能用原子操作解決的盡量還是不要用 Mutex,這里參照 errgroup,我們可以用一個 Once 對象來控制只賦值一次。
panic 恢復(fù)可以直接在 defer 里面 recover 即可,需要能帶出來 stack trace,把它變成一個 error 賦值
及時退出
有時候我們這個并發(fā)任務(wù)數(shù)量非常多,可能還沒創(chuàng)建完 goroutine,某個先創(chuàng)建的任務(wù)就已經(jīng)掛了,這時候需要有一個全局的信號,終止后續(xù)的 goroutine 創(chuàng)建。這一點用原子操作就能實現(xiàn)。
完整代碼
把上面的分析落地,這樣我們就實現(xiàn)了一個帶上了 recover 能力,以及終止能力的的 errgroup。
package main
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
)
type Job interface {
Do(ctx context.Context, param interface{}) error
Name() string
}
type JobManager []Job
func (mgr JobManager) Execute(ctx context.Context, param interface{}) error {
var (
stop int32 = 0
err error
wg sync.WaitGroup
errOnce sync.Once
)
for _, job := range mgr {
if atomic.LoadInt32(&stop) > 0 {
break
}
wg.Add(1)
go func(j Job) {
defer func() {
wg.Done()
if r := recover(); r != nil {
errMsg := fmt.Sprintf("JobManager panic: job: %v, reason: %v", j.Name(), r)
nerr := errors.New(errMsg)
errOnce.Do(func() {
if err == nil {
err = nerr
}
})
atomic.AddInt32(&stop, 1)
}
}()
nerr := j.Do(ctx, param)
if nerr != nil {
atomic.AddInt32(&stop, 1)
errOnce.Do(func() {
if err == nil {
err = nerr
}
})
}
}(job)
}
wg.Wait()
return err
}使用方法也很簡單:
var mgr = JobManager{
AJob, BJob, CJob, // 這里的各個 Job 需要實現(xiàn)一開始我們定義的接口
}
err := mgr.Execute(ctx, param)這里我們需要定義統(tǒng)一的 param interface{},建議是一個接口,各個 Job 執(zhí)行完畢后如果有需要寫入的數(shù)據(jù),可以調(diào)用 param 的 Setter 方法寫入,最后直接拿 param 來做后續(xù)邏輯。
小結(jié)
今天我們用 sync.Once,以及 sync.WaitGroup 的能力實現(xiàn)了一個簡易的并發(fā)任務(wù)調(diào)度器,希望能夠幫助大家回顧一下此前介紹的并發(fā)相關(guān)概念和用法。其實并發(fā)管理這一點很多時候我們會存在依賴,這時候可能需要將多個 job 分層,或者梳理出來拓?fù)潢P(guān)系來執(zhí)行,我們今天只是簡單入門,復(fù)習(xí)一下相關(guān)知識。
建議大家回顧一下此前對于 once 以及 errgroup 的源碼解析,相信你會更能融會貫通。
到此這篇關(guān)于Golang 手寫一個簡單的并發(fā)任務(wù) manager的文章就介紹到這了,更多相關(guān)Golang manager內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言Elasticsearch數(shù)據(jù)清理工具思路詳解
這篇文章主要介紹了Go語言Elasticsearch數(shù)據(jù)清理工具思路詳解,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-10-10
Golang使用ReverseProxy實現(xiàn)反向代理的方法
本文介紹了如何使用Golang的ReverseProxy實現(xiàn)反向代理,包括源碼結(jié)構(gòu)解析和官方單機示例NewSingleHostReverseProxy,同時指出,若要實現(xiàn)負(fù)載均衡,需要自行開發(fā),還提供了一個簡單的HTTP服務(wù)用于測試,感興趣的朋友跟隨小編一起看看吧2024-09-09
go語言實現(xiàn)http服務(wù)端與客戶端的例子
今天小編就為大家分享一篇go語言實現(xiàn)http服務(wù)端與客戶端的例子,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08
Qt6.5 grpc組件使用 + golang grpc server
這篇文章主要介紹了Qt6.5 grpc組件使用+golang grpc server示例,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-05-05
Golang實現(xiàn)EasyCache緩存庫實例探究
這篇文章主要為大家介紹了Golang實現(xiàn)EasyCache緩存庫實例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01

