Golang限流庫與漏桶和令牌桶的使用介紹
RateLimit 限流中間件
為什么需要限流中間件
在大數(shù)據(jù)量高并發(fā)訪問時(shí),經(jīng)常會出現(xiàn)服務(wù)或接口面對大量的請求而導(dǎo)致數(shù)據(jù)庫崩潰的情況,甚至引發(fā)連鎖反映導(dǎo)致整個(gè)系統(tǒng)崩潰?;蛘哂腥藧阂夤艟W(wǎng)站,大量的無用請求出現(xiàn)會導(dǎo)致緩存穿透的情況出現(xiàn)。使用限流中間件可以在短時(shí)間內(nèi)對請求進(jìn)行限制數(shù)量,起到降級的作用,從而保障了網(wǎng)站的安全性。
應(yīng)對大量并發(fā)請求的策略
- 使用消息中間件進(jìn)行統(tǒng)一限制(降速)
- 使用限流方案將多余請求返回(限流)
- 升級服務(wù)器
- 負(fù)載均衡升級
- 等等
可以看出在代碼已經(jīng)無法提升的情況下,只能去提升硬件水平?;蛘吒膭蛹軜?gòu)再加一層!也可以使用消息中間件統(tǒng)一處理。而結(jié)合看來,限流方案是一種既不需要大幅改動也不需要高額開銷的策略。
常見的限流方案
- 令牌桶算法
- 漏桶算法
- 滑動窗口算法
- 等等
這里主要根據(jù)golang的庫介紹令牌桶和漏桶的實(shí)現(xiàn)原理以及在實(shí)際項(xiàng)目中如何應(yīng)用。
漏桶
引入ratelimit庫
go get -u go.uber.org/ratelimit
庫函數(shù)源代碼
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
return newAtomicBased(rate, opts...)
}
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
// TODO consider moving config building to the implementation
// independent code.
config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &atomicLimiter{
perRequest: perRequest,
maxSlack: -1 * time.Duration(config.slack) * perRequest,
clock: config.clock,
}
initialState := state{
last: time.Time{},
sleepFor: 0,
}
atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
return l
}
該函數(shù)使用了函數(shù)選項(xiàng)模式對多個(gè)結(jié)構(gòu)體對象進(jìn)行初始化
首先根據(jù)傳入的值來初始化一個(gè)桶結(jié)構(gòu)體 rate 為int傳參 (time.Duration(rate)單位為納秒 = 1/1e9秒)
初始化過程中包括了
- 每一滴水需要的時(shí)間
perquest = config.per / time.Duration(rate) maxSlack寬松度(寬松度為負(fù)值)-1 * time.Duration(config.slack) * perRequest松緊度是用來規(guī)范等待時(shí)間的
// Clock is the minimum necessary interface to instantiate a rate limiter with
// a clock or mock clock, compatible with clocks created using
// github.com/andres-erbsen/clock.
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
同時(shí)還需要結(jié)構(gòu)體Clock來記錄當(dāng)前請求的時(shí)間now和此刻的請求所需要花費(fèi)等待的時(shí)間sleep
type state struct {
last time.Time
sleepFor time.Duration
}
state 主要用來記錄上次執(zhí)行的時(shí)間以及當(dāng)前執(zhí)行請求需要花費(fèi)等待的時(shí)間(作為中間狀態(tài)記錄)
最重要的Take邏輯
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
var (
newState state
taken bool
interval time.Duration
)
for !taken {
now := t.clock.Now()
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
// If this is our first request, then we allow it.
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// 計(jì)算是否需要進(jìn)行等待取水操作
newState.sleepFor += t.perRequest(每兩滴水之間的間隔時(shí)間) - now.Sub(oldState.last)(當(dāng)前時(shí)間與上次取水時(shí)間的間隔)
// 如果等待取水時(shí)間特別小,就需要松緊度進(jìn)行維護(hù)
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
// 如果等待時(shí)間大于0,就進(jìn)行更新
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(interval)
// 最后返回需要等待的時(shí)間
return newState.last
}
實(shí)現(xiàn)一個(gè)Take方法
- 該Take方法會進(jìn)行原子性操作(可以理解為加鎖和解鎖),在大量并發(fā)請求下仍可以保證正常使用。
- 記錄下當(dāng)前的時(shí)間 now := t.clock.Now()
- oldState.last.IsZero() 判斷是不是第一次取水,如果是就直接將state結(jié)構(gòu)體中的值進(jìn)行返回。而這個(gè)結(jié)構(gòu)體中初始化了上次執(zhí)行時(shí)間,如果是第一次取水就作為當(dāng)前時(shí)間直接傳參。
- 如果 newState.sleepFor 非常小,就會出現(xiàn)問題,因此需要借助寬松度,一旦這個(gè)最小值比寬松度小,就用寬松度對取水時(shí)間進(jìn)行維護(hù)。
- 如果newState.sleepFor > 0 就直接更新結(jié)構(gòu)體中上次執(zhí)行時(shí)間newState.last = newState.last.Add(newState.sleepFor)并記錄需要等待的時(shí)間interval, newState.sleepFor = newState.sleepFor, 0。
- 如果允許取水和等待操作,那就說明沒有發(fā)生并發(fā)競爭的情況,就模擬睡眠時(shí)間t.clock.Sleep(interval)。然后將取水的目標(biāo)時(shí)間進(jìn)行返回,由服務(wù)端代碼來判斷是否打回響應(yīng)或者等待該時(shí)間后繼續(xù)響應(yīng)。
t.clock.Sleep(interval)
func (c *clock) Sleep(d time.Duration) {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> time.Sleep(d) }實(shí)際上在一個(gè)請求來的時(shí)候,限流器就會進(jìn)行睡眠對應(yīng)的時(shí)間,并在睡眠后將最新取水時(shí)間返回。
實(shí)際應(yīng)用(使用Gin框架)
func ratelimit1() func(ctx *gin.Context) {
r1 := rate1.New(100)
return func(ctx *gin.Context) {
now := time.Now()
// Take 返回的是一個(gè) time.Duration的時(shí)間
if r1.Take().Sub(now) > 0 {
// 返回的時(shí)間比當(dāng)前的時(shí)間還大,說明需要進(jìn)行等待
// 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
// 如果不需要等待請求時(shí)間,就直接進(jìn)行Abort 然后返回
response(ctx, http.StatusRequestTimeout, "rate1 limit...")
fmt.Println("rate1 limit...")
ctx.Abort()
return
}
// 放行
ctx.Next()
}
}
這里你可以進(jìn)行選擇是否返回。因?yàn)門ake一定會執(zhí)行sleep函數(shù),所以當(dāng)執(zhí)行take結(jié)束后表示當(dāng)前請求已經(jīng)接到了水。當(dāng)前演示使用第一種情況。
- 如果你的業(yè)務(wù)要求響應(yīng)不允許進(jìn)行等待。那么可以在該請求接完水之后然后,如上例。
- 如果你的業(yè)務(wù)允許響應(yīng)等待,那么該請求等待對應(yīng)的接水時(shí)間后進(jìn)行下一步。具體代碼就是將
if中的內(nèi)容直接忽略。(建議使用)
測試代碼
這里定義了一個(gè)響應(yīng)函數(shù)和一個(gè)handler函數(shù)方便測試
func response(c *gin.Context, code int, info any) {
c.JSON(code, info)
}
func pingHandler(c *gin.Context) {
response(c, 200, "ping ok~")
}
執(zhí)行go test -run=Run -v先開啟一個(gè)web服務(wù)
func TestRun(t *testing.T) {
r := gin.Default()
r.GET("/ping1", ratelimit1(), pingHandler)
r.GET("/ping2", ratelimit2(), helloHandler)
_ = r.Run(":4399")
}
使用接口壓力測試工具go-wrk進(jìn)行測試->tsliwowicz/go-wrk: go-wrk
在golang install版本可以直接通過go install github.com/tsliwowicz/go-wrk@latest下載
使用幫助
Usage: go-wrk <options> <url>
Options:
-H Header to add to each request (you can define multiple -H flags) (Default )
-M HTTP method (Default GET)
-T Socket/request timeout in ms (Default 1000)
-body request body string or @filename (Default )
-c Number of goroutines to use (concurrent connections) (Default 10)
-ca CA file to verify peer against (SSL/TLS) (Default )
-cert CA certificate file to verify peer against (SSL/TLS) (Default )
-d Duration of test in seconds (Default 10)
-f Playback file name (Default <empty>)
-help Print help (Default false)
-host Host Header (Default )
-http Use HTTP/2 (Default true)
-key Private key file name (SSL/TLS (Default )
-no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
-no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
-no-vr Skip verifying SSL certificate of the server (Default false)
-redir Allow Redirects (Default false)
-v Print version details (Default false)
-t 8個(gè)線程 -c 400個(gè)連接 -n 模擬1k次請求 -d 替換-n 表示連接時(shí)間
輸入go-wrk -t=8 -c=400 -n=1000 http://127.0.0.1:4399/ping1
可以稍微等待一下水流積攢否則一個(gè)請求也不一定能夠響應(yīng)。

可以看出,89個(gè)請求全部返回。也就是說在一段請求高峰期,不會有請求進(jìn)行響應(yīng)。因此我認(rèn)為既然內(nèi)部已經(jīng)睡眠,那么就應(yīng)該對請求放行處理。限流器實(shí)現(xiàn)的比較純粹!
令牌桶
引入ratelimit庫
go get -u github.com/juju/ratelimit
初始化
// NewBucket returns a new token bucket that fills at the
// rate of one token every fillInterval, up to the given
// maximum capacity. Both arguments must be
// positive. The bucket is initially full.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
return NewBucketWithClock(fillInterval, capacity, nil)
}
// NewBucketWithClock is identical to NewBucket but injects a testable clock
// interface.
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}
進(jìn)行Bucket桶的初始化。
/ NewBucketWithQuantumAndClock is like NewBucketWithQuantum, but
// also has a clock argument that allows clients to fake the passing
// of time. If clock is nil, the system clock will be used.
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
if clock == nil {
clock = realClock{}
}
// 填充速率
if fillInterval <= 0 {
panic("token bucket fill interval is not > 0")
}
// 最大令牌容量
if capacity <= 0 {
panic("token bucket capacity is not > 0")
}
// 單次令牌生成量
if quantum <= 0 {
panic("token bucket quantum is not > 0")
}
return &Bucket{
clock: clock,
startTime: clock.Now(),
latestTick: 0,
fillInterval: fillInterval,
capacity: capacity,
quantum: quantum,
availableTokens: capacity,
}
}
令牌桶初始化過程,初始化結(jié)構(gòu)體 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。
如果三個(gè)變量有一個(gè)小于或者等于0的話直接進(jìn)行報(bào)錯(cuò)返回。在最開始就將當(dāng)前令牌數(shù)初始化為最大容量。
調(diào)用
// TakeAvailable takes up to count immediately available tokens from the
// bucket. It returns the number of tokens removed, or zero if there are
// no available tokens. It does not block.
func (tb *Bucket) TakeAvailable(count int64) int64 {
tb.mu.Lock()
defer tb.mu.Unlock()
return tb.takeAvailable(tb.clock.Now(), count)
}
調(diào)用TakeAvailable函數(shù),傳入?yún)?shù)為需要取出的令牌數(shù)量,返回參數(shù)是實(shí)際能夠取出的令牌數(shù)量。
內(nèi)部實(shí)現(xiàn)
// takeAvailable is the internal version of TakeAvailable - it takes the
// current time as an argument to enable easy testing.
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
// 如果需要取出的令牌數(shù)小于等于零,那么就返回0個(gè)令牌
if count <= 0 {
return 0
}
// 根據(jù)時(shí)間對當(dāng)前桶中令牌數(shù)進(jìn)行計(jì)算
tb.adjustavailableTokens(tb.currentTick(now))
// 計(jì)算之后的令牌總數(shù)小于等于0,說明當(dāng)前令牌不足取出,那么就直接返回0個(gè)令牌
if tb.availableTokens <= 0 {
return 0
}
// 如果當(dāng)前存儲的令牌數(shù)量多于請求數(shù)量,那么就返回取出令牌數(shù)
if count > tb.availableTokens {
count = tb.availableTokens
}
// 調(diào)整令牌數(shù)
tb.availableTokens -= count
return count
}
調(diào)整令牌
// adjustavailableTokens adjusts the current number of tokens
// available in the bucket at the given time, which must
// be in the future (positive) with respect to tb.latestTick.
func (tb *Bucket) adjustavailableTokens(tick int64) {
lastTick := tb.latestTick
tb.latestTick = tick
// 如果當(dāng)前令牌數(shù)大于最大等于容量,直接返回最大容量
if tb.availableTokens >= tb.capacity {
return
}
// 當(dāng)前令牌數(shù) += (當(dāng)前時(shí)間 - 上次取出令牌數(shù)的時(shí)間) * quannum(每次生成令牌量)
tb.availableTokens += (tick - lastTick) * tb.quantum
// 如果當(dāng)前令牌數(shù)大于最大等于容量, 將當(dāng)前令牌數(shù) = 最大容量 然后返回 當(dāng)前令牌數(shù)
if tb.availableTokens > tb.capacity {
tb.availableTokens = tb.capacity
}
return
}
實(shí)現(xiàn)原理
加鎖 defer 解鎖
判斷count(想要取出的令牌數(shù)) 是否小于等于 0,如果是直接返回 0
調(diào)用函數(shù)adjustTokens 獲取可用的令牌數(shù)量,該函數(shù)實(shí)現(xiàn)原理:
- 如果當(dāng)前令牌數(shù)大于最大等于容量,直接返回最大容量
- 當(dāng)前令牌數(shù) += (當(dāng)前時(shí)間 - 上次取出令牌數(shù)的時(shí)間) * quannum(每次生成令牌量)
- 如果當(dāng)前令牌數(shù)大于最大等于容量, 將當(dāng)前令牌數(shù) = 最大容量 然后返回 當(dāng)前令牌數(shù)
如果當(dāng)前可以取出的令牌數(shù)小于等于0 直接返回 0
如果當(dāng)前可以取出的令牌數(shù)小于當(dāng)前想要取出的令牌數(shù)(count) count = 當(dāng)前可以取出的令牌數(shù)
當(dāng)前的令牌數(shù) -= 取出的令牌數(shù)(count)
返回 count
額外介紹
take函數(shù),能夠返回等待時(shí)間和布爾值,允許欠賬,沒有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration函數(shù),可以根據(jù)最大等待時(shí)間來進(jìn)行判斷。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因?yàn)樗麄儍?nèi)部的實(shí)現(xiàn)都基于令牌調(diào)整,我這里不做過多介紹,如果感興趣可以自行研究一下。
測試
func ratelimit2() func(ctx *gin.Context) {
// 生成速率 最大容量
r2 := rate2.NewBucket(time.Second, 200)
return func(ctx *gin.Context) {
//r2.Take() // 允許欠賬,令牌不夠也可以接收請求
if r2.TakeAvailable(1) == 1 {
// 如果想要取出1個(gè)令牌并且能夠取出,就放行
ctx.Next()
return
}
response(ctx, http.StatusRequestTimeout, "rate2 limit...")
ctx.Abort()
return
}
}

由于壓測速度過于快速,在實(shí)際過程中可以根據(jù)調(diào)整令牌生成速率來進(jìn)行具體限流!
小結(jié)
令牌桶可以允許自己判斷請求是否繼續(xù),不用進(jìn)行睡眠。而漏桶需要進(jìn)行睡眠,并沒有提供方法讓程序員進(jìn)行判斷是否放行。
個(gè)人用令牌桶還是多的,也可能是我對漏桶源碼的解析有誤,沒有看到相關(guān)的點(diǎn)。
到此這篇關(guān)于Golang限流庫與漏桶和令牌桶的使用介紹的文章就介紹到這了,更多相關(guān)Golang限流庫內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang?channel多協(xié)程通信常用方法底層原理全面解析
channel?是?goroutine?與?goroutine?之間通信的重要橋梁,借助?channel,我們能很輕易的寫出一個(gè)多協(xié)程通信程序,今天,我們就來看看這個(gè)?channel?的常用用法以及底層原理2023-09-09
Go實(shí)現(xiàn)map并發(fā)安全的3種方式總結(jié)
Go的原生map不是并發(fā)安全的,在多協(xié)程讀寫同一個(gè)map的時(shí)候,安全性無法得到保障,這篇文章主要給大家總結(jié)介紹了關(guān)于Go實(shí)現(xiàn)map并發(fā)安全的3種方式,需要的朋友可以參考下2023-10-10
Go語言性能監(jiān)控和調(diào)優(yōu)的工具和方法
本文介紹了Go語言性能監(jiān)控和調(diào)優(yōu)的工具和方法,包括?pprof、expvar?和?trace?等工具的使用方法和注意事項(xiàng),以及性能調(diào)優(yōu)的一些常見方法,如減少內(nèi)存分配、避免頻繁的垃圾回收、避免過度查詢數(shù)據(jù)庫等,針對不同的程序,應(yīng)該根據(jù)實(shí)際情況采用不同的優(yōu)化方法2024-01-01
Go1.18新特性使用Generics泛型進(jìn)行流式處理
這篇文章主要為大家介紹了Go1.18新特性使用Generics泛型進(jìn)行流式處理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06
解決vscode中g(shù)olang插件依賴安裝失敗問題
這篇文章主要介紹了解決vscode中g(shù)olang插件依賴安裝失敗問題,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-08-08

