Golang使用singleflight解決并發(fā)重復(fù)請求
背景
高并發(fā)的場景下,經(jīng)常會出現(xiàn)并發(fā)重復(fù)請求資源的情況。
比如說,緩存失效時(shí),我們?nèi)フ埱骴b獲取最新的數(shù)據(jù),如果這個key是一個熱key,那么在緩存失效的瞬間,可能會有大量的并發(fā)請求訪問到db,導(dǎo)致db訪問量陡增,甚至是打崩db,這種場景也就是我們常說的緩存擊穿。

針對同一個key的并發(fā)請求,這些請求和響應(yīng)實(shí)際上都是一樣的。所以我們可以把這種并發(fā)請求優(yōu)化為:只進(jìn)行一次實(shí)際請求去訪問資源,然后得到實(shí)際響應(yīng),所有的并發(fā)請求共享這個實(shí)際響應(yīng)的結(jié)果
針對分布式場景,我們可以使用分布式鎖來實(shí)現(xiàn)
針對單機(jī)場景,我們可以使用singleflight來實(shí)現(xiàn)

singleflight
singleflight是golang內(nèi)置的一個包,這個包提供了對重復(fù)函數(shù)調(diào)用的抑制功能,也就是保證并發(fā)請求只會有一個實(shí)際請求去訪問資源,所有并發(fā)請求共享實(shí)際響應(yīng)。
使用
singleflight在golang sdk源碼中的路徑為:src/internal/singleflight
但是internal是golang sdk內(nèi)部的包,所以我們不能直接去使用
使用步驟:
- 引入go mod
- 使用singleflight包
引入go mod
go get golang.org/x/sync
使用singleflight包
singleflight包主要提供了三個方法
// 方法作用:保證并發(fā)請求只會執(zhí)行一次函數(shù),并共享實(shí)際響應(yīng) // 請求參數(shù) // key:請求的唯一標(biāo)識,相同的key會被視為并發(fā)請求 // fn:實(shí)際需要執(zhí)行的函數(shù) // 響應(yīng)參數(shù) // v:實(shí)際執(zhí)行函數(shù)的返回值 // err:實(shí)際執(zhí)行函數(shù)的錯誤 // shared:返回值v是否被共享,若存在并發(fā)請求,則為true;若不存在并發(fā)請求則為false func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool) // 方法作用:和Do類似,不過方法返回的是chan func (g *Group) DoChan(key string, fn func() (any, error)) (<-chan Result, bool) // 方法作用:刪除key,一般來說不會直接使用這個方法 func (g *Group) ForgetUnshared(key string) bool
針對以上的三個方法,我們重點(diǎn)了解一下Do方法的使用即可
沒有使用singleflight之前
package main
import (
"fmt"
"sync"
"testing"
"time"
)
var (
mx sync.Mutex
wg sync.WaitGroup
cacheData = make(map[string]string, 0)
)
func TestSingleFlight(t *testing.T) {
// 添加10個任務(wù),模擬并發(fā)請求
wg.Add(10)
for i := 0; i < 10; i++ {
go getData("demo")
}
// 等待所有任務(wù)完成
wg.Wait()
}
func getData(key string) {
data, _ := getDataFromCache(key)
if len(data) == 0 {
// 緩存沒有找到,則進(jìn)行回源
data, _ = getDataFromDB(key)
// 設(shè)置緩存
mx.Lock()
cacheData[key] = data
mx.Unlock()
}
fmt.Println(data)
// 任務(wù)完成
wg.Done()
}
func getDataFromCache(key string) (string, error) {
return cacheData[key], nil
}
func getDataFromDB(key string) (string, error) {
fmt.Println("getDataFromDB key: ", key)
// 模擬訪問db的耗時(shí)
time.Sleep(10 * time.Millisecond)
return "db data", nil
}執(zhí)行TestSingleFlight函數(shù)后,會發(fā)現(xiàn)并發(fā)請求多次調(diào)用了getDataFromDB函數(shù)
使用singleflight之后
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"sync"
"testing"
"time"
)
var (
mx sync.Mutex
wg sync.WaitGroup
g singleflight.Group
cacheData = make(map[string]string, 0)
)
func TestSingleFlight(t *testing.T) {
// 添加10個任務(wù)
wg.Add(10)
for i := 0; i < 10; i++ {
go getDataSingleWarp("demo")
}
// 等待所有任務(wù)完成
wg.Wait()
}
func getDataSingleWarp(key string) {
data, _ := getDataFromCache(key)
if len(data) == 0 {
// 使用singleflight來避免并發(fā)請求,實(shí)際改動就這一行
d, _, shared := g.Do(key, func() (interface{}, error) {
return getDataFromDB(key)
})
fmt.Println(shared)
data = d.(string)
// 設(shè)置緩存
mx.Lock()
cacheData[key] = data
mx.Unlock()
}
fmt.Println(data)
wg.Done()
}
func getDataFromCache(key string) (string, error) {
return cacheData[key], nil
}
func getDataFromDB(key string) (string, error) {
fmt.Println("getDataFromDB key: ", key)
// 模擬訪問db的耗時(shí)
time.Sleep(10 * time.Millisecond)
return "db data", nil
}執(zhí)行TestSingleFlight函數(shù)后,會發(fā)現(xiàn)只調(diào)用了一次getDataFromDB函數(shù)
源碼分析
- Group struct:封裝并發(fā)請求
- call struct:每一個需要執(zhí)行的函數(shù),都會被封裝成一個call
- func Do:對并發(fā)請求進(jìn)行控制的方法
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import (
"bytes"
"errors"
"fmt"
"runtime"
"runtime/debug"
"sync"
)
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
value interface{}
stack []byte
}
// Error implements error interface.
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {
stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
// call is an in-flight or completed singleflight.Do call
type call struct {
// 保證相同key,只會進(jìn)行一次實(shí)際請求
// 相同key的并發(fā)請求會共享返回
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
// 實(shí)際執(zhí)行函數(shù)的返回值和錯誤
val interface{}
err error
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
// 是否已刪除當(dāng)前并發(fā)請求的key
forgotten bool
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
// 并發(fā)請求的次數(shù)
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
// key代表請求的唯一標(biāo)識,相同的key會被視為并發(fā)請求
// value代表實(shí)際請求,每一個實(shí)際請求都會被封裝為call
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 加鎖
g.mu.Lock()
// 懶加載
if g.m == nil {
g.m = make(map[string]*call)
}
// 判斷是否有并發(fā)請求,如果key已經(jīng)存在,則說明存在并發(fā)請求
if c, ok := g.m[key]; ok {
// 并發(fā)請求次數(shù)+1
c.dups++
// 解鎖
g.mu.Unlock()
// 等待實(shí)際請求執(zhí)行完
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
// 共享響應(yīng)
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
// 添加并發(fā)請求key
g.m[key] = c
// 解鎖
g.mu.Unlock()
// 進(jìn)行實(shí)際請求
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
// 正常返回標(biāo)識
normalReturn := false
// 是否執(zhí)行了recover標(biāo)識
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
// 實(shí)際請求執(zhí)行完成
c.wg.Done()
// 加鎖
g.mu.Lock()
defer g.mu.Unlock()
// 刪除并發(fā)請求key
if !c.forgotten {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
// 匿名函數(shù)立即執(zhí)行
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
// 執(zhí)行實(shí)際函數(shù)
c.val, c.err = fn()
// 正常返回
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}到此這篇關(guān)于Golang使用singleflight解決并發(fā)重復(fù)請求的文章就介紹到這了,更多相關(guān)Go singleflight內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言基本的語法和內(nèi)置數(shù)據(jù)類型初探
這篇文章主要介紹了Go語言基本的語法和內(nèi)置數(shù)據(jù)類型,是golang入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下2015-10-10
Golang中json和jsoniter的區(qū)別使用示例
這篇文章主要介紹了Golang中json和jsoniter的區(qū)別使用示例,本文給大家分享兩種區(qū)別,結(jié)合示例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2023-12-12
Go語言學(xué)習(xí)之循環(huán)語句使用詳解
這篇文章主要為大家介紹了Go語言中的常用循環(huán)語句的使用,例如:for循環(huán)、for-each、break等,文中的示例代碼講解詳細(xì),感興趣的可以了解一下2022-04-04

