解決Golang并發(fā)工具Singleflight的問題
前言
前段時間在一個項目里使用到了分布式鎖進行共享資源的訪問限制,后來了解到Golang里還能夠使用singleflight對共享資源的訪問做限制,于是利用空余時間了解,將知識沉淀下來,并做分享
文章盡量用通俗的語言表達自己的理解,從入門demo開始,結(jié)合源碼分析singleflight的重點方法,最后分享singleflight的實際使用方式與需要注意的“坑“。
定義
按照官方文檔的定義,singleflight 提供了一個重復的函數(shù)調(diào)用抑制機制
Package singleflight provides a duplicate function call suppression
用途
通俗的來說就是 singleflight將相同的并發(fā)請求合并成一個請求,進而減少對下層服務(wù)的壓力,通常用于解決緩存擊穿的問題
- 緩存擊穿是指: 在高并發(fā)的場景中,大量的request同時請求查詢一個共享資源(例如Redis緩存的key) ,如果這個共享資源正好過期失效了,就會導致大量相同的request都打到Redis下游的數(shù)據(jù)庫,導致數(shù)據(jù)庫的負載上升。

簡單Demo
var (
sfKey1 = "key1"
wg *sync.WaitGroup
sf singleflight.Group
nums = 10
)
func getValueService(key string) { //service
var val string
wg = &sync.WaitGroup{}
wg.Add(nums)
for idx := 0; idx < nums; idx++ { // 模擬多協(xié)程同時請求
go func(idx int) { // 注意for的一個小坑
defer wg.Done()
value, _ := getAndSetCacheNoChan(idx, key) //簡化代碼,不處理error
log.Printf("request %v get value: %v", idx, value)
val = value
}(idx)
}
wg.Wait()
log.Println("val: ", val)
return
}
// getValueBySingleflight 使用singleflight取cacheKey對應(yīng)的value值
func getValueBySingleflight(idx int, cacheKey string) (string, error) {
log.Printf("idx %v into-cache...", idx)
// 調(diào)用singleflight的Do()方法
value, _, _ := sf.Do(cacheKey, func() (ret interface{}, err error) {
log.Printf("idx %v is-setting-cache", idx)
// 休眠0.1s以捕獲并發(fā)的相同請求
time.Sleep(100 * time.Millisecond)
log.Printf("idx %v set-cache-success!", idx)
return "myValue", nil
})
return value.(string), nil
}看看實際效果

- 由結(jié)果圖可以看到,索引=8的協(xié)程第一個進入了Do()方法,其他協(xié)程則阻塞住,等到idx=8的協(xié)程拿到執(zhí)行結(jié)果后,協(xié)程以亂序的形式返回執(zhí)行結(jié)果。
- 相同key的情況下,singleflight將我們的多個請求合并成1個請求。由1個請求去執(zhí)行對共享資源的操作。
源碼分析
結(jié)構(gòu)
type (
Group struct { // singleflight實體
mu sync.Mutex // 互斥鎖
m map[string]*call // 懶加載
}
call struct {
wg sync.WaitGroup
// 存儲 調(diào)用singleflight.Do()方法返回的結(jié)果
val interface{}
err error
// 調(diào)用singleflight.Forget(key)時將對應(yīng)的key從Group.m中刪除
forgotten bool
// 通俗的理解成singleflight合并的并發(fā)請求數(shù)
dups int
// 存儲 調(diào)用singleflight.DoChan()方法返回的結(jié)果
chans []chan<- Result
}
Result struct {
Val interface{}
Err error
Shared bool
}
)
對外暴露的方法
func Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func DoChan(key string, fn func() (interface{}, error)) <-chan Result)
// 將key從Group.m中刪除
func Forget(key string) DoChan()和Do()最大的區(qū)別是DoChan()屬于異步調(diào)用,返回一個channel,解決同步調(diào)用時的阻塞問題
重點方法分析
Do
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock() // 加互斥鎖
if g.m == nil { // 懶加載map
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 檢查相同的請求已經(jīng)是否進入過singleflight
c.dups++
g.mu.Unlock()
c.wg.Wait() // 調(diào)用waitGroup的wait()方法阻塞住本次調(diào)用,等待第一個進入singleflight的請求執(zhí)行完畢拿到結(jié)果,將本次請求喚醒.
if e, ok := c.err.(*panicError); ok { //如果調(diào)用完成,發(fā)生error ,將error上拋
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
// 返回調(diào)用結(jié)果
return c.val, c.err, true
}
c := new(call) // 相同的請求第一次進入singleflight
c.wg.Add(1)
g.m[key] = c // new一個call實體,放入singleflight.call這個map
g.mu.Unlock()
g.doCall(c, key, fn) //實際執(zhí)行的函數(shù)
return c.val, c.err, c.dups > 0
}
流程圖

由源碼可以分析出,最后實際執(zhí)行我們業(yè)務(wù)邏輯的函數(shù)其實是放到了doCall() 里,我們稍后分析這個函數(shù)
Forget
再簡單看看Forget()函數(shù),很短.
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true // key的forgotten標志位記為true
}
delete(g.m, key) // Group.m中刪除對應(yīng)的key
g.mu.Unlock()
}
doCall
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
//使用雙重defer來區(qū)分error的類型: panic && runtime.error
defer func() {
if !normalReturn && !recovered {
// fn()發(fā)生了panic且fn()中的panic沒有被recover掉
// errGoexit連接runtime.Goexit錯誤
c.err = errGoexit
}
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten { // 檢查key是否調(diào)用了Forget()
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// 如果返回的是?panic?錯誤,為了避免channel被永久阻塞,我們需要確保這個panic無法被recover
if len(c.chans) > 0 {
go panic(e) // panic無法被恢復
select {} // 阻塞本goroutinue.
} else {
panic(e)
}
} else {
// 將結(jié)果正常地返回
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// 表示fn()發(fā)生了panic()
// 此時與panic相關(guān)的堆棧已經(jīng)被丟棄(調(diào)用的fn()) ,無法通過堆棧跟蹤去確定error類型
if r := recover(); r != nil {
c.err = newPanicError(r) //new一個新的自定義panic err,往第一個defer拋
}
}
}()
// 執(zhí)行我們實際的業(yè)務(wù)邏輯,并將業(yè)務(wù)方法的返回值賦給singleflight.call
c.val, c.err = fn()的val和err屬性
// 如果fn()發(fā)生panic,normalReturn無法被賦值為true,而是進入doCall()的第二個defer()
normalReturn = true
}()
// 如果normalResult為false時,表示fn()發(fā)生了panic
// 但是執(zhí)行到了這一步,表示fn()中的panic被recover了
if !normalReturn {
recovered = true // recovered標志位置為true
}
}
由以上分析可以得出幾個重要的結(jié)論
singleflight主要使用sync.Mutex和sync.WaitGroup進行并發(fā)控制.
對于key相同的請求, singleflight只會處理的一個進入的請求,后續(xù)的請求都會使用waitGroup.wait()將請求阻塞
使用雙重defer()區(qū)分了panic和runtime.Goexit錯誤,如果返回的是一個panic錯誤,group.c.chans會發(fā)生阻塞,那么需要拋出這個panic且確保其無法被recover
實際使用
分享一段實際項目中使用singleflight結(jié)合本地緩存的代碼模版
func (s Service) getDataBySingleFlight(ctx context.Context) (entity.List, error) {
// 1. 從localCache查
resData, err := local_cache.Get(ctx, key)
if err != nil {
log.Fatalln()
return resData, err
}
if resData != nil {
return resData, nil
}
// 2. localCache無數(shù)據(jù),從redis查
resData, err = srv.rdsRepo.Get()
if err != nil && err != redis.Nil {
// redis錯誤
log.Fatalln()
return resData, err
} else if redis.Nil == err {
// redis無數(shù)據(jù) ,查db
resData, err, _ = singleFlight.Do(key, func() (interface{}, error) {
// 構(gòu)建db查詢條件
searchConn := entity.SearchInfo{}
// 建議休眠0.1s 捕獲0.1s內(nèi)的重復請求
time.Sleep(100 * time.Millisecond)
// 4. 查db
data, err := srv.dBRepo.GetByConn(ctx, searchConn)
if err != nil {
log.Fatalln()
return data, err
}
// 5. 回寫localCache && redisCache
err = local_cache.Set(ctx, data)
if err != nil {
log.Fatalln()
}
err = srv.rdsRepo.Set(ctx, data)
if err != nil {
log.Fatalln()
}
// 返回db數(shù)據(jù),回寫cache的error不上拋
return data, nil
})
return resData, err
}
return resData, nil
弊端與解決方案
singleflight當然不是解決問題的銀彈,在使用的過程中有一些“坑”需要我們注意
- Do()方法是一個同步調(diào)用的方法,無法處理下游服務(wù)調(diào)用的超時情況
解決方案:
使用singleflight的doChan()方法,在service層使用 channel+select 做超時控制.
func enterGetAndSetCacheWithChan(ctx context.Context, key string) (str string, err error) {
tag := "enterGetAndSetCacheWithChan"
sonCtx, _ := context.WithTimeout(ctx, 2 * time.Second)
val := ""
nums := 10 //協(xié)程數(shù)
wg = &sync.WaitGroup{}
wg.Add(nums)
for idx := 0; idx < nums; idx++ {
go func() {
defer wg.Done()
val, err = getAndSetCacheWithChan(sonCtx, idx, key)
if err != nil {
log.Printf("err:[%+v]", err)
return
}
str = val
}()
}
wg.Wait()
log.Printf("tag:[%s] val:[%s]", tag, val)
return
}
func getAndSetCacheWithChan(ctx context.Context, idx int, cacheKey string) (string, error) {
tag := "getAndSetCacheWithChan"
log.Printf("tag: %s ;idx %d into-cache...", tag, idx)
ch := sf.DoChan(cacheKey, func() (ret interface{}, err error) { // do的入?yún)ey,可以直接使用緩存的key,這樣同一個緩存,只有一個協(xié)程會去讀DB
log.Printf("idx %v is-setting-cache", idx)
time.Sleep(100 * time.Millisecond)
log.Printf("idx %v set-cache-success!", idx)
return "myValue", nil
})
for { // 選擇 context + select 超時控制
select {
case <-ctx.Done():
return "", errors.New("ctx-timeout") // 根據(jù)業(yè)務(wù)邏輯選擇上拋 error
case data, _ := <-ch:
return data.Val.(string), nil
default:
}
}
}
- 如果第一個請求失敗了,那么所有等待的請求都會返回同一個error
解決方案
根據(jù)實際情況,結(jié)合下游服務(wù)調(diào)用耗時與下游實際能支持的QPS等數(shù)據(jù),對key做定時Forget()。
go func() {
time.Sleep(100 * time.Millisecond)
g.Forget(key)
}()
參考文章
singleflight雙重defer: developer.51cto.com/article/652…
到此這篇關(guān)于Golang并發(fā)工具-Singleflight的文章就介紹到這了,更多相關(guān)Golang并發(fā)Singleflight內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解golang consul-grpc 服務(wù)注冊與發(fā)現(xiàn)
這篇文章主要介紹了詳解golang consul-grpc 服務(wù)注冊與發(fā)現(xiàn),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-06-06
VsCode搭建Go語言開發(fā)環(huán)境的配置教程
這篇文章主要介紹了在VsCode中搭建Go開發(fā)環(huán)境的配置教程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05
Golang中函數(shù)(Function)和方法(Method)的區(qū)別詳解
在Golang中,大家必然會頻繁使用到函數(shù)(Function)和方法(Method),但是有的同學可能并沒有注意過函數(shù)和方法的異同點,函數(shù)和方法都是用來執(zhí)行特定任務(wù)的代碼塊,雖然很相似,但也有很大的區(qū)別,所以本文將詳細講解函數(shù)和方法的定義以及它們的異同點2023-07-07
Go語言reflect.TypeOf()和reflect.Type通過反射獲取類型信息
這篇文章主要介紹了Go語言reflect.TypeOf()和reflect.Type通過反射獲取類型信息,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04
golang 實現(xiàn)Location跳轉(zhuǎn)方式
這篇文章主要介紹了golang 實現(xiàn)Location跳轉(zhuǎn)方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05

