Go并發(fā)原語之SingleFlight請求合并方法實例
SingleFlight 的使用場景
在處理多個 goroutine 同時調(diào)用同一個函數(shù)的時候,如何只用一個 goroutine 去調(diào)用一次函數(shù),并將返回結(jié)果給到所有 goroutine,這是可以使用 SingleFlight,可以減少并發(fā)調(diào)用的數(shù)量。
在高并發(fā)請求場景中,例如秒殺場景:多個用戶在同一時間查詢庫存數(shù),這時候?qū)τ谒械挠脩舳?,同一時間查詢結(jié)果都是一樣的,如果后臺都去查緩存或者數(shù)據(jù)庫,那么性能壓力很大。如果相同時間只有一個查詢,那么性能將顯著提升。

一句話總結(jié):SingleFlight 主要作用是合并并發(fā)請求的場景,針對于相同的讀請求。
SingleFlight 的基本使用
下面先看看這段代碼,5個協(xié)程同時并發(fā)返回 getProductById ,看看輸出結(jié)果如何:
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
result := getProductById("商品A")
fmt.Printf("%v\n", result)
}()
}
wg.Wait()
}
func getProductById(name string) string {
fmt.Println("getProductById doing...")
time.Sleep(time.Millisecond * 10) // 模擬一下耗時
return name
}$ go run main.go getProductById doing... getProductById doing... getProductById doing... getProductById doing... getProductById doing... 商品A 商品A 商品A 商品A 商品A
可以看出 getProductById 方法被訪問了五次,那么如何通過 SingleFlight 進(jìn)行優(yōu)化呢?
定義一個全局變量 SingleFlight,在訪問 getProductById 方法時調(diào)用 Do 方法,即可實現(xiàn)同一時間只有一次方法,代碼如下:
import (
"fmt"
"golang.org/x/sync/singleflight"
"sync"
"time"
)
var g singleflight.Group
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, _, _ := g.Do("商品A", func() (interface{}, error) {
result := getProductById("商品A")
return result, nil
})
fmt.Printf("%v\n", resp)
}()
}
wg.Wait()
}
func getProductById(name string) string {
fmt.Println("getProductById doing...")
time.Sleep(time.Millisecond * 10) // 模擬一下耗時
return name
}$ go run main.go getProductById doing... 商品A 商品A 商品A 商品A 商品A
你可能會想 SingleFlight 和 sync.Once 的區(qū)別,sync.Once 主要是用在單次初始化場景中,而 SingleFlight 主要用在合并請求中,針對于同一時間的并發(fā)場景。
SingleFlight 的實現(xiàn)原理
SingleFlight 的數(shù)據(jù)結(jié)構(gòu)是 Group ,結(jié)構(gòu)如下:
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}可以看出,SingleFlight 是使用互斥鎖 Mutex 和 Map 來實現(xiàn)的。互斥鎖 Mutex 提供并發(fā)時的讀寫保護(hù),而 Map 用于保存同一個 key 正在處理的請求。
其提供了3個方法:

Do 方法的實現(xiàn)邏輯
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}SingleFlight 定義了一個輔助對象 call,用于代表正在執(zhí)行 fn 函數(shù)的請求或者是否已經(jīng)執(zhí)行完請求。
- 如果存在相同的 key,其他請求將會等待這個 key 執(zhí)行完成,并使用第一個 key 獲取到的請求結(jié)果
- 如果不存在,創(chuàng)建一個 call ,并將其加入到 map 中,執(zhí)行調(diào)用 fn 函數(shù)。
DoChan 方法的實現(xiàn)邏輯
而 DoChan 方法與 Do 方法類似:
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}Forget 方法的實現(xiàn)邏輯
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}將 key 從 map 中刪除。
總結(jié)
使用 SingleFlight 時,通過將多個請求合并成一個,降低并發(fā)訪問的壓力,極大地提升了系統(tǒng)性能,針對于多并發(fā)讀請求的場景,可以考慮是否滿足 SingleFlight 的使用情況。
而對于并發(fā)寫請求的場景,如果是多次寫只需要一次的情況,那么也是滿足的。例如:每個 http 請求都會攜帶 token,每次請求都需要把 token 存入緩存或者寫入數(shù)據(jù)庫,如果多次并發(fā)請求同時來,只需要寫一次即可
以上就是Go并發(fā)原語之SingleFlight請求合并方法實例的詳細(xì)內(nèi)容,更多關(guān)于Go SingleFlight 請求合并的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go切片導(dǎo)致rand.Shuffle產(chǎn)生重復(fù)數(shù)據(jù)的原因與解決方案
在 Go 語言的實際開發(fā)中,切片(slice)是一種非常靈活的數(shù)據(jù)結(jié)構(gòu),然而,由于其底層數(shù)據(jù)共享的特性,在某些情況下可能會導(dǎo)致意想不到的 Bug,本文將詳細(xì)分析 rand.Shuffle 之后,切片中的數(shù)據(jù)出現(xiàn)重復(fù)的問題,探討其根本原因,并給出最佳解決方案,需要的朋友可以參考下2025-02-02
Go語言使用Timeout Context取消任務(wù)的實現(xiàn)
本文主要介紹了Go語言使用Timeout Context取消任務(wù)的實現(xiàn),包括基本的任務(wù)取消和控制HTTP客戶端請求的超時,具有一定的參考價值,感興趣的可以了解一下2024-01-01

