Golang sync包中errgroup的使用詳解
1、初識 errgroup
WaitGroup 主要用于控制任務(wù)組下的并發(fā)子任務(wù)。它的具體做法就是,子任務(wù) goroutine 執(zhí)行前通過 Add 方法添加任務(wù)數(shù)目,子任務(wù) goroutine 結(jié)束時調(diào)用 Done 標(biāo)記已完成任務(wù)數(shù),主任務(wù) goroutine 通過 Wait 方法等待所有的任務(wù)完成后才能執(zhí)行后續(xù)邏輯。
package main
import (
"net/http"
"sync"
)
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.baidu.com/",
"http://www.bokeyuan12111.com/",
}
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
resp, err := http.Get(url)
if err != nil {
return
}
resp.Body.Close()
}(url)
}
wg.Wait()
}
在以上示例代碼中,我們通過三個 goroutine 去并發(fā)的請求 url,直到所有的子任務(wù) goroutine 均完成訪問,主任務(wù) goroutine 下的 wg.Wait 才會停止阻塞。
但在實際的項目代碼中,子任務(wù) goroutine 的執(zhí)行并不總是順風(fēng)順?biāo)鼈円苍S會產(chǎn)生 error。而 WaitGroup 并沒有告訴我們在子 goroutine 發(fā)生錯誤時,如何將其拋給主任務(wù) groutine。
這個時候可以考慮使用 errgroup
package main
import (
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.baidu.com/",
"http://www.bokeyuan12111.com/",
}
g := new(errgroup.Group)
for _, url := range urls {
url := url
g.Go(func() error {
resp, err := http.Get(url)
if err != nil {
fmt.Println(err)
return err
}
fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)
return resp.Body.Close()
})
}
if err := g.Wait(); err != nil {
fmt.Println(err)
} else {
fmt.Println("All success!")
}
}
結(jié)果如下:
get [http://www.baidu.com/] success: [200]
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
Get "http://www.golang.org/": dial tcp 142.251.42.241:80: i/o timeout
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
可以看到,執(zhí)行獲取www.bokeyuan12111.com和www.golang.org兩個 url 的子 groutine 均發(fā)生了錯誤,在主任務(wù) goroutine 中成功捕獲到了第一個錯誤信息。
除了 擁有 WaitGroup 的控制能力 和 錯誤傳播 的功能之外,errgroup 還有最重要的 context 反向傳播機制,我們來看一下它的設(shè)計。
2、errgroup 源碼解析
errgroup 的設(shè)計非常精練,全部代碼如下
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
可以看到,errgroup 的實現(xiàn)依靠于結(jié)構(gòu)體 Group,它通過封裝 sync.WaitGroup,繼承了 WaitGroup 的特性,在 Go() 方法中新起一個子任務(wù) goroutine,并在 Wait() 方法中通過 sync.WaitGroup 的 Wait 進行阻塞等待。
同時 Group 利用 sync.Once 保證了它有且僅會保留第一個子 goroutine 錯誤。
最后,Group 通過嵌入 context.WithCancel 方法產(chǎn)生的 cancel 函數(shù),能夠在子 goroutine 發(fā)生錯誤時,及時通過調(diào)用 cancle 函數(shù),將 Context 的取消信號及時傳播出去。當(dāng)然,這一特性需要用戶代碼的配合。
3、errgroup 上下文取消
在 errgroup 的文檔(https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/errgroup#example-Group-Pipeline)中,它基于 Go 官方文檔的 pipeline( https://blog.golang.org/pipelines) ,實現(xiàn)了一個任務(wù)組 goroutine 中上下文取消(Context cancelation)演示的示例。但該 Demo 的前提知識略多,本文這里基于其思想,提供一個易于理解的使用示例。
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
dataChan := make(chan int, 20)
// 數(shù)據(jù)生產(chǎn)端任務(wù)子 goroutine
g.Go(func() error {
defer close(dataChan)
for i := 1; ; i++ {
if i == 10 {
return fmt.Errorf("data 10 is wrong")
}
dataChan <- i
fmt.Println(fmt.Sprintf("sending %d", i))
}
})
// 數(shù)據(jù)消費端任務(wù)子 goroutine
for i := 0; i < 3; i++ {
g.Go(func() error {
for j := 1; ; j++ {
select {
case <-ctx.Done():
return ctx.Err()
case number := <-dataChan:
fmt.Println(fmt.Sprintf("receiving %d", number))
}
}
})
}
// 主任務(wù) goroutine 等待 pipeline 結(jié)束數(shù)據(jù)流
err := g.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("main goroutine done!")
}
在以上示例中,我們模擬了一個數(shù)據(jù)傳送管道。在數(shù)據(jù)的生產(chǎn)與消費任務(wù)集中,有四個子任務(wù) goroutine:一個生產(chǎn)數(shù)據(jù)的 goroutine,三個消費數(shù)據(jù)的 goroutine。當(dāng)數(shù)據(jù)生產(chǎn)方存在錯誤數(shù)據(jù)時(數(shù)據(jù)等于 10 ),我們停止數(shù)據(jù)的生產(chǎn)與消費,并將錯誤拋出,回到 main goroutine 的執(zhí)行邏輯中。
可以看到,因為 errgroup 中的 Context cancle 函數(shù)的嵌入,我們在子任務(wù) goroutine 中也能反向控制任務(wù)上下文。
程序的某一次運行,輸出結(jié)果如下:
sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 1
receiving 3
receiving 2
receiving 4
data 10 is wrong
main goroutine done!
4、總結(jié)
errgroup 是 Go 官方的并發(fā)原語補充庫,相對于標(biāo)準(zhǔn)庫中提供的原語而言,顯得沒那么核心。這里總結(jié)一下 errgroup 的特性。
- 繼承了 WaitGroup 的功能
- 錯誤傳播:能夠返回任務(wù)組中發(fā)生的第一個錯誤,但有且僅能返回該錯誤
- context 信號傳播:如果子任務(wù) goroutine 中有循環(huán)邏輯,則可以添加 ctx.Done 邏輯,此時通過 context 的取消信號,提前結(jié)束子任務(wù)執(zhí)行。
到此這篇關(guān)于Golang sync包中errgroup的使用詳解的文章就介紹到這了,更多相關(guān)Golang errgroup內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang?Gin解析JSON請求數(shù)據(jù)避免出現(xiàn)EOF錯誤
這篇文章主要為大家介紹了Golang?Gin?優(yōu)雅地解析JSON請求數(shù)據(jù),避免ShouldBindBodyWith出現(xiàn)EOF錯誤的源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-04-04
Go語言中函數(shù)可變參數(shù)(Variadic Parameter)詳解
在Python中,在函數(shù)參數(shù)不確定數(shù)量的情況下,可以動態(tài)在函數(shù)內(nèi)獲取參數(shù)。在Go語言中,也有類似的實現(xiàn)方式,本文就來為大家詳細講解一下2022-07-07

