golang協(xié)程關(guān)閉踩坑實(shí)戰(zhàn)記錄
前言
Go語言中,協(xié)程創(chuàng)建和啟動非常簡單,但是如何才能正確關(guān)閉協(xié)程呢,和開車一樣,前進(jìn)總是很容易,但是如何正確的把車停在指定的地方總是不容易的。生產(chǎn)實(shí)踐中,go常常遇到未能正確關(guān)閉協(xié)程而影響程序運(yùn)行的場景,輕則協(xié)程泄漏資源浪費(fèi),重則程序崩潰。
本文,總結(jié)協(xié)程關(guān)閉的三大原則,結(jié)合實(shí)際場景讓你徹底搞定協(xié)程關(guān)閉,保證又快又穩(wěn)!
場景
結(jié)合如下典型場景,主進(jìn)程中起多個(gè)協(xié)程,這些協(xié)程會
1.共同消費(fèi)一個(gè)數(shù)據(jù)通道 data channel
2.也可能共享一個(gè)退出通道channel或context

那么,應(yīng)該如何正確關(guān)閉呢
原則1-協(xié)程接受通知主動關(guān)閉
并不推薦強(qiáng)制停止,更多的時(shí)候我們希望在停止時(shí),干一點(diǎn)事比如資源清理/連接清理等,這時(shí)候最好的方式就是通知協(xié)程退出,具體何時(shí)退出和退出前做什么完全由當(dāng)前要關(guān)閉的協(xié)程控制。
通知一般有三種方式
data channel關(guān)閉通知退出
適用簡單任務(wù),復(fù)雜的更推薦context單獨(dú)通知
// cancelFn 數(shù)據(jù)通道關(guān)閉通知退出
func cancelFn(dataChan chan int) {
for {
select {
case val, ok := <-dataChan:
// 關(guān)閉data通道時(shí),通知退出
// 一個(gè)可選是判斷data=指定值時(shí)退出
if !ok {
log.Printf("Channel closed !??!")
return
}
log.Printf("Revice dataChan %d\n", val)
}
}
}
exit channel關(guān)閉通知退出
部分簡單場景適用
// exitChannelFn 單獨(dú)退出通道關(guān)閉通知退出
func exitChannelFn(wg *sync.WaitGroup, taskNo int, dataChan chan int, exitChan chan struct{}) {
defer wg.Done()
for {
select {
case val, ok := <-dataChan:
if !ok {
log.Printf("Task %d channel closed !??!", taskNo)
return
}
log.Printf("Task %d revice dataChan %d\n", taskNo, val)
// 關(guān)閉exit通道時(shí),通知退出
case <-exitChan:
log.Printf("Task %d revice exitChan signal!\n", taskNo)
return
}
}
}
context超時(shí)或取消通知退出
主流推薦
// contextCancelFn context取消或超時(shí)通知退出
func contextCancelFn(wg *sync.WaitGroup, taskNo int, dataChan chan int, ctx context.Context) {
defer wg.Done()
for {
select {
case val, ok := <-dataChan:
if !ok {
log.Printf("Task %d channel closed ?。?!", taskNo)
return
}
log.Printf("Task %d revice dataChan %d\n", taskNo, val)
// ctx取消或超時(shí),通知退出
case <-ctx.Done():
log.Printf("Task %d revice exit signal!\n", taskNo)
return
}
}
}
原則2-誰負(fù)責(zé)創(chuàng)建協(xié)程誰負(fù)責(zé)關(guān)閉協(xié)程
go func可以立即創(chuàng)建一個(gè)協(xié)程,因此常常遇到我們可能在任何一個(gè)地方創(chuàng)建協(xié)程,但是在哪里關(guān)閉呢,是需要統(tǒng)一管理嗎?官方推薦的最佳實(shí)踐就是,誰負(fù)責(zé)創(chuàng)建協(xié)程誰負(fù)責(zé)關(guān)閉協(xié)程。
參考如下,每次調(diào)用execDataTaskFunc函數(shù)執(zhí)行都會起一個(gè)協(xié)程異步執(zhí)行,協(xié)程關(guān)閉通過監(jiān)控外層函數(shù)context參數(shù)來實(shí)現(xiàn)。
func execDataTaskFunc(ctx context.Context, dataChan chan int, taskName string) chan int {
out := make(chan int)
log.Printf("Task %s start!\n", taskName)
go func() {
defer close(out)
for {
select {
case data, ok := <-dataChan:
if !ok {
log.Printf("Task %s revice data channel close signal!\n", taskName)
return
}
// do something
out <- data
case <-ctx.Done():
log.Printf("Task %s revice exit signal!\n", taskName)
return
}
}
}()
return out
}
原則3-等待所有協(xié)程關(guān)閉再退出
通常對于正在運(yùn)行的協(xié)程,發(fā)出退出通知后,具體程序何時(shí)才能退出呢?一般如下三種方式
WaitGroup/ErrGroup判斷所有協(xié)程關(guān)閉后退出
最常用,參考如下
// 多個(gè)任務(wù)并行控制,等待所有任務(wù)完成
func TestTaskControl(t *testing.T) {
dataChan := make(chan int)
taskNum := 3
wg := sync.WaitGroup{}
wg.Add(taskNum)
// 起多個(gè)協(xié)程,data關(guān)閉時(shí)退出
for i := 0; i < taskNum; i++ {
go func(taskNo int) {
defer wg.Done()
t.Logf("Task %d run\n", taskNo)
for {
select {
case _, ok := <-dataChan:
if !ok {
t.Logf("Task %d notify to stop\n", taskNo)
return
}
}
}
}(i)
}
// 通知退出
go func() {
time.Sleep(3 * time.Second)
close(dataChan)
}()
// 等待退出完成
wg.Wait()
}
等待channel關(guān)閉后退出
參考如下,對于部分任務(wù)場景,協(xié)程數(shù)據(jù)輸出到新建的channel中,可以在此channel上阻塞等待,直到協(xié)程通知關(guān)閉時(shí),關(guān)閉此channel然后程序退出。
// 多個(gè)任務(wù)并行控制,等待所有任務(wù)完成
func TestTaskControl2(t *testing.T) {
dataChan := make(chan int)
// 起協(xié)程返回新chan,在輸出chan等待判斷完成
out := make(chan int)
go func() {
defer close(out) // 結(jié)束則自動關(guān)閉
for {
select {
case _, ok := <-dataChan:
if !ok {
t.Logf("Task notify to stop\n")
return
}
}
}
}()
// 通知退出
go func() {
time.Sleep(3 * time.Second)
close(dataChan)
}()
dataChan <- 1
// 等待退出完成
for data := range out {
t.Logf("%d\n", data)
}
}
等待足夠長時(shí)間后關(guān)閉
對于部分任務(wù),能夠估算從通知關(guān)閉到實(shí)際關(guān)閉時(shí)間,則可等待足夠長時(shí)間來保證協(xié)程關(guān)閉然后退出,實(shí)際場景并不推薦,帶有一定不確定性,很容易出錯(cuò)。
func TestTaskControl3(t *testing.T) {
dataChan := make(chan int)
// 起協(xié)程返回新chan
out := make(chan int)
go func() {
defer close(out) // 結(jié)束則自動關(guān)閉
for {
select {
case _, ok := <-dataChan:
if !ok {
t.Logf("Task notify to stop\n")
return
}
}
}
}()
// 通知退出
go func() {
time.Sleep(3 * time.Second)
close(dataChan)
}()
dataChan <- 1
// 等待足夠長時(shí)間,退出完成
time.Sleep(10 * time.Second)
}
復(fù)雜退出場景
結(jié)合三大原則,這里展示部分復(fù)雜場景的協(xié)程關(guān)閉方案。
嵌套協(xié)程,同時(shí)關(guān)閉
如下是多個(gè)任務(wù)執(zhí)行,每個(gè)任務(wù)一個(gè)協(xié)程,現(xiàn)在考慮如下目標(biāo)
支持多級嵌套,父任務(wù)停止后,子任務(wù)自動停止

方案:使用context通知,WaitGroup等待所有任務(wù)關(guān)閉后退出
任務(wù)運(yùn)行代碼
type TaskFunc func(ctx context.Context)
func runTaskFunc(wg *sync.WaitGroup, ctx context.Context, taskName string, f TaskFunc) {
defer wg.Done()
log.Printf("Task %s start!\n", taskName)
f(ctx)
for {
select {
case <-ctx.Done():
log.Printf("Task %s revice exit signal!\n", taskName)
return
}
}
}
整體實(shí)現(xiàn)代碼
// 簡單并行任務(wù)-同時(shí)停止
func TestStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var wg = sync.WaitGroup{}
// 起多個(gè)任務(wù)
wg.Add(1)
go runTaskFunc(&wg, ctx, "A", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "B", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "C", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "D", func(ctx context.Context) {})
})
})
wg.Add(1)
go runTaskFunc(&wg, ctx, "E", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "F", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "G", func(ctx context.Context) {})
})
})
})
// 通知關(guān)閉
go func() {
time.Sleep(3 * time.Second)
cancel()
}()
// 等待全部關(guān)閉后退出
wg.Wait()
}
協(xié)程關(guān)閉是無序的,如下
2023/01/07 22:40:09 Task A start!
2023/01/07 22:40:09 Task E start!
2023/01/07 22:40:09 Task F start!
2023/01/07 22:40:09 Task G start!
2023/01/07 22:40:09 Task B start!
2023/01/07 22:40:09 Task C start!
2023/01/07 22:40:09 Task D start!
2023/01/07 22:40:12 Task A revice exit signal!
2023/01/07 22:40:12 Task G revice exit signal!
2023/01/07 22:40:12 Task B revice exit signal!
2023/01/07 22:40:12 Task F revice exit signal!
2023/01/07 22:40:12 Task D revice exit signal!
2023/01/07 22:40:12 Task C revice exit signal!
2023/01/07 22:40:12 Task E revice exit signal!
嵌套協(xié)程,指定順序關(guān)閉
還是上述場景,現(xiàn)在需求是:控制停止順序,先停EFG 再停BCD 最后停A

方案:借助context通知,指定多個(gè)cancel點(diǎn),WaitGroup等待所有任務(wù)關(guān)閉后退出
// 簡單并行任務(wù)-控制停止順序
func TestStop2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctxb, cancelb := context.WithCancel(ctx)
ctxe, cancele := context.WithCancel(ctx)
var wg = sync.WaitGroup{}
// 起多個(gè)任務(wù)
wg.Add(1)
go runTaskFunc(&wg, ctx, "A", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctxb, "B", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "C", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "D", func(ctx context.Context) {})
})
})
wg.Add(1)
go runTaskFunc(&wg, ctxe, "E", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "F", func(ctx context.Context) {
wg.Add(1)
go runTaskFunc(&wg, ctx, "G", func(ctx context.Context) {})
})
})
})
// 通知關(guān)閉
go func() {
time.Sleep(3 * time.Second)
cancele()
time.Sleep(3 * time.Second)
cancelb()
time.Sleep(3 * time.Second)
cancel()
}()
// 等待全部關(guān)閉后退出
wg.Wait()
}
運(yùn)行如下,協(xié)程按照指定順序關(guān)閉
2023/01/07 22:40:40 Task A start!
2023/01/07 22:40:40 Task E start!
2023/01/07 22:40:40 Task F start!
2023/01/07 22:40:40 Task G start!
2023/01/07 22:40:40 Task B start!
2023/01/07 22:40:40 Task C start!
2023/01/07 22:40:40 Task D start!
2023/01/07 22:40:43 Task E revice exit signal!
2023/01/07 22:40:43 Task F revice exit signal!
2023/01/07 22:40:43 Task G revice exit signal!
2023/01/07 22:40:46 Task B revice exit signal!
2023/01/07 22:40:46 Task D revice exit signal!
2023/01/07 22:40:46 Task C revice exit signal!
2023/01/07 22:40:49 Task A revice exit signal!
嵌套協(xié)程,逐級關(guān)閉
考慮如下場景,A->B->C嵌套起協(xié)程,每個(gè)協(xié)程創(chuàng)建新的channel傳輸數(shù)據(jù)給下游

如下起任務(wù),每個(gè)任務(wù)可以通過context或者data channel關(guān)閉來通知退出
func execDataTaskFunc(ctx context.Context, dataChan chan int, taskName string) chan int {
out := make(chan int)
//out := make(chan int, 100)
log.Printf("Task %s start!\n", taskName)
go func() {
defer close(out)
for {
select {
case data, ok := <-dataChan:
if !ok {
log.Printf("Task %s revice data channel close signal!\n", taskName)
return
}
time.Sleep(2 * time.Second)
out <- data
case <-ctx.Done():
log.Printf("Task %s revice exit signal!\n", taskName)
return
}
}
}()
return out
}
整體流程如下
func TestDataTaskStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dataChanInput := make(chan int)
// 嵌套運(yùn)行協(xié)程
taskChanA := execDataTaskFunc(ctx, dataChanInput, "A")
taskChanB := execDataTaskFunc(ctx, taskChanA, "B")
taskChanC := execDataTaskFunc(ctx, taskChanB, "C")
// 通知退出
go func() {
i := 0
for {
select {
case <-time.After(time.Second):
i = i + 1
if i == 10 {
t.Logf("Notify to stop!!!")
close(dataChanInput)
//cancel()
return
}
dataChanInput <- i
}
}
}()
// 等待退出
for data := range taskChanC {
t.Logf("Out->%d", data)
}
}
這里數(shù)據(jù)每條數(shù)據(jù)產(chǎn)生間隔1秒,每個(gè)任務(wù)處理時(shí)長為2秒,也就是說通知關(guān)閉時(shí),可能上游任務(wù)處理中,下游還沒來得及處理,因此期望的是逐級依次關(guān)閉A/B/C,確保上游數(shù)據(jù)處理完成傳給下游,不要丟失數(shù)據(jù)。
對比context通知退出和data channel關(guān)閉通知退出,對比如下??梢钥吹饺绻覀兪?strong>有中間處理和逐級關(guān)閉需求的還是要依賴close關(guān)閉協(xié)程來通知,context全局通知退出是無序的,無法保證數(shù)據(jù)不丟失。
- cancel()-context通知退出
執(zhí)行如下,A/B/C同時(shí)退出,數(shù)據(jù)出現(xiàn)丟失
2023/01/07 23:23:59 Task A start!
2023/01/07 23:23:59 Task B start!
2023/01/07 23:23:59 Task C start!
complex_test.go:174: Out->1
complex_test.go:174: Out->2
complex_test.go:174: Out->3
complex_test.go:174: Out->4
complex_test.go:174: Out->5
complex_test.go:174: Out->6
complex_test.go:161: Notify to stop!!!
2023/01/07 23:24:18 Task C revice exit signal!
complex_test.go:174: Out->7
- close(dataChanInput)通知退出
執(zhí)行如下,A/B/C逐級依次關(guān)閉,數(shù)據(jù)沒有丟失
2023/01/07 23:20:18 Task A start!
2023/01/07 23:20:18 Task B start!
2023/01/07 23:20:18 Task C start!
complex_test.go:174: Out->1
complex_test.go:174: Out->2
complex_test.go:174: Out->3
complex_test.go:174: Out->4
complex_test.go:174: Out->5
complex_test.go:174: Out->6
complex_test.go:161: Notify to stop!!!
complex_test.go:174: Out->7
2023/01/07 23:20:37 Task A revice data channel close signal!
complex_test.go:174: Out->8
2023/01/07 23:20:39 Task B revice data channel close signal!
2023/01/07 23:20:41 Task C revice data channel close signal!
complex_test.go:174: Out->9
參考
演示代碼 https://gitee.com/wenzhou1219/go-in-prod/tree/master/task_stop
總結(jié)
到此這篇關(guān)于golang協(xié)程關(guān)閉踩坑的文章就介紹到這了,更多相關(guān)golang協(xié)程關(guān)閉內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go-zero源碼閱讀之布隆過濾器實(shí)現(xiàn)代碼
布隆過濾器可以用于檢索一個(gè)元素是否在一個(gè)集合中。它的優(yōu)點(diǎn)是空間效率和查詢時(shí)間都比一般的算法要好的多,缺點(diǎn)是有一定的誤識別率和刪除困難,這篇文章主要介紹了go-zero源碼閱讀-布隆過濾器,需要的朋友可以參考下2023-02-02
Go實(shí)現(xiàn)基于RSA加密算法的接口鑒權(quán)
這篇文章主要介紹了Go實(shí)現(xiàn)基于RSA加密算法的接口鑒權(quán),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-06-06
GoLang抽獎系統(tǒng)簡易實(shí)現(xiàn)流程
這篇文章主要介紹了GoLang抽獎系統(tǒng)實(shí)現(xiàn)流程,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-12-12
Golang設(shè)計(jì)模式之外觀模式講解和代碼示例
外觀是一種結(jié)構(gòu)型設(shè)計(jì)模式, 能為復(fù)雜系統(tǒng)、 程序庫或框架提供一個(gè)簡單 (但有限) 的接口,這篇文章就給大家詳細(xì)介紹一下Golang的外觀模式,文中有詳細(xì)的代碼示例,具有一定的參考價(jià)值,需要的朋友可以參考下2023-06-06
Go打印結(jié)構(gòu)體提升代碼調(diào)試效率實(shí)例詳解
這篇文章主要介紹了Go打印結(jié)構(gòu)體提升代碼調(diào)試效率實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-02-02
go語言發(fā)送smtp郵件的實(shí)現(xiàn)示例
這篇文章主要介紹了go發(fā)送smtp郵件的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
web項(xiàng)目中g(shù)olang性能監(jiān)控解析
這篇文章主要為大家介紹了web項(xiàng)目中g(shù)olang性能監(jiān)控詳細(xì)的解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04

