Go singleflight使用以及原理
這個東西很重要,可以經(jīng)常用在項目當中,所以我們單獨拿出來進行講解。
在使用它之前我們需要導(dǎo)包:
go get golang.org/x/sync/singleflight
golang/sync/singleflight.Group 是 Go 語言擴展包中提供了另一種同步原語,它能夠在一個服務(wù)中抑制對下游的多次重復(fù)請求。一個比較常見的使用場景是:我們在使用 Redis 對數(shù)據(jù)庫中的數(shù)據(jù)進行緩存,發(fā)生緩存擊穿時,大量的流量都會打到數(shù)據(jù)庫上進而影響服務(wù)的尾延時。

但是 golang/sync/singleflight.Group 能有效地解決這個問題,它能夠限制對同一個鍵值對的多次重復(fù)請求,減少對下游的瞬時流量。

使用方法
singleflight類的使用方法就新建一個singleflight.Group,使用其方法Do或者DoChan來包裝方法,被包裝的方法在對于同一個key,只會有一個協(xié)程執(zhí)行,其他協(xié)程等待那個協(xié)程執(zhí)行結(jié)束后,拿到同樣的結(jié)果。
Group結(jié)構(gòu)體
代表一類工作,同一個group中,同樣的key同時只能被執(zhí)行一次
Do方法
func (g *Group) Do(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) (v interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, err error, shared bool)key:同一個key,同時只有一個協(xié)程執(zhí)行
fn:被包裝的函數(shù)
v:返回值,即執(zhí)行結(jié)果。其他等待的協(xié)程都會拿到
shared:表示是否由其他協(xié)程得到了這個結(jié)果v
DoChan方法
func (g *Group) DoChan(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) <-chan Result和Do差不多其實,因此我們就只講解Do的實際應(yīng)用場景了。
具體應(yīng)用場景
var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
log.Printf("request %s start to get and set cache...", r.URL)
value, err, _ := singleSetCache.Do(cacheKey, func() (interface{}, error) {
log.Printf("request %s is getting cache...", r.URL)
time.Sleep(3 * time.Second)
log.Printf("request %s get cache success!", r.URL)
return cacheKey, nil
})
return value.(string), err
}
func main() {
r := gin.Default()
r.GET("/sekill/:id", func(context *gin.Context) {
ID := context.Param("id")
cache, err := GetAndSetCache(context.Request, ID)
if err != nil {
log.Println(err)
}
log.Printf("request %s get value: %v", context.Request.URL, cache)
})
r.Run()
}來看一下執(zhí)行結(jié)果:
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:19 request /sekill/9 start to get and set cache...
2022/12/29 16:21:19 request /sekill/5 start to get and set cache...
2022/12/29 16:21:21 request /sekill/9 get cache success!
2022/12/29 16:21:21 request /sekill/5 get cache success!
2022/12/29 16:21:21 request /sekill/5 get value: 5
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 3.0106529s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.8090881s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.2166003s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.6064069s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.4178652s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.8101267s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 3.0116892s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.6074537s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.4076473s | 127.0.0.1 | GET "/sekill/5"
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.218686s | 127.0.0.1 | GET "/sekill/5"
可以看到確實只有一個協(xié)程執(zhí)行了被包裝的函數(shù),并且其他協(xié)程都拿到了結(jié)果。
接下來我們來看一下它的原理吧!
原理
首先來看一下Group結(jié)構(gòu)體:
type Group struct {
mu sync.Mutex // 鎖保證并發(fā)安全
m map[string]*call //保存key對應(yīng)的函數(shù)執(zhí)行過程和結(jié)果的變量。
}
然后我們來看一下call結(jié)構(gòu)體:
type call struct {
wg sync.WaitGroup //用WaitGroup實現(xiàn)只有一個協(xié)程執(zhí)行函數(shù)
val interface{} //函數(shù)執(zhí)行結(jié)果
err error
forgotten bool
dups int //含義是duplications,即同時執(zhí)行同一個key的協(xié)程數(shù)量
chans []chan<- Result
}
然后我們來看一下Do方法:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 寫Group的m字段時,加鎖保證寫安全
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
// 如果key已經(jīng)存在,說明已經(jīng)由協(xié)程在執(zhí)行,則dups++并等待其執(zhí)行結(jié)果,執(zhí)行結(jié)果保存在對應(yīng)的call的val字段里
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
// 如果key不存在,則新建一個call,并使用WaitGroup來阻塞其他協(xié)程,同時在m字段里寫入key和對應(yīng)的call
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn) // 進來的第一個協(xié)程就來執(zhí)行這個函數(shù)
return c.val, c.err, c.dups > 0
}
然后我們來分析一下doCall函數(shù):
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
- 運行傳入的函數(shù)
fn,該函數(shù)的返回值會賦值給c.val和c.err; - 調(diào)用
sync.WaitGroup.Done方法通知所有等待結(jié)果的Goroutine— 當前函數(shù)已經(jīng)執(zhí)行完成,可以從call結(jié)構(gòu)體中取出返回值并返回了; - 獲取持有的互斥鎖并通過管道將信息同步給使用
golang/sync/singleflight.Group.DoChan方法的Goroutine;
問題分析
分析了源碼之后,我們得出了一個結(jié)論,這個東西是用阻塞來實現(xiàn)的,這就引發(fā)了一個問題:如果我們處理的那個請求剛好遇到問題了,那么后面的所有請求都會被阻塞,也就是,我們應(yīng)該加上適合的超時控制,如果在一定時間內(nèi),沒有獲得結(jié)果,那么就當作超時處理。
于是這個適合我們應(yīng)該使用DoChan()。兩者實現(xiàn)上完全一樣,不同的是, DoChan() 通過 channel 返回結(jié)果。因此可以使用 select 語句實現(xiàn)超時控制。
var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
log.Printf("request %s start to get and set cache...", r.URL)
retChan := singleSetCache.DoChan(cacheKey, func() (interface{}, error) {
log.Printf("request %s is getting cache...", r.URL)
time.Sleep(3 * time.Second)
log.Printf("request %s get cache success!", r.URL)
return cacheKey, nil
})
var ret singleflight.Result
timeout := time.After(2 * time.Second)
select {
case <-timeout:
log.Println("time out!")
return "", errors.New("time out")
case ret = <-retChan: // 從chan中獲取結(jié)果
return ret.Val.(string), ret.Err
}
}
func main() {
r := gin.Default()
r.GET("/sekill/:id", func(context *gin.Context) {
ID := context.Param("id")
cache, err := GetAndSetCache(context.Request, ID)
if err != nil {
log.Println(err)
}
log.Printf("request %s get value: %v", context.Request.URL, cache)
})
r.Run()
}
補充
這里其實還有一個Forget方法,它可以在映射表中刪除某個鍵,接下來對鍵的調(diào)用就不會等待前面的函數(shù)返回了。
總結(jié)
當然,如果單次的失敗無法容忍,在高并發(fā)的場景下更好的處理方案是:
放棄使用同步請求,犧牲數(shù)據(jù)更新的實時性
“緩存” 存儲準實時的數(shù)據(jù) + “異步更新” 數(shù)據(jù)到緩存
到此這篇關(guān)于Go singleflight使用以及原理的文章就介紹到這了,更多相關(guān)Go singleflight內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
B站新一代 golang規(guī)則引擎gengine基礎(chǔ)語法
這篇文章主要為大家介紹了B站新一代 golang規(guī)則引擎gengine基礎(chǔ)語法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12
golang實現(xiàn)基于channel的通用連接池詳解
這篇文章主要給大家介紹了關(guān)于golang實現(xiàn)基于channel的通用連接池的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-02-02

