Golang根據(jù)job數(shù)量動態(tài)控制每秒?yún)f(xié)程的最大創(chuàng)建數(shù)量方法詳解
需求
第三方的接口,限制接口請求的QPS,每秒5次
需要控制job「訪問接口」的次數(shù),每秒不能同時超過5次,包括 進行中的任務(wù)、剛啟動的任務(wù)
使用限流器來控制任務(wù)的啟動頻率
要確保單位時間內(nèi)(例如每秒)運行的任務(wù)數(shù)量不超過特定的上限(如5個任務(wù)),并且在任務(wù)執(zhí)行完成得很快時,考慮已完成的任務(wù)和正在執(zhí)行的任務(wù)作為正在運行的任務(wù)總數(shù),可以使用限流器來控制任務(wù)的啟動頻率,并結(jié)合使用信號量來管理同時運行的任務(wù)數(shù)量。
具體來說,使用一個信號量來限制同時進行的任務(wù)數(shù)量,并且在任務(wù)完成時,僅在下一秒鐘允許新的任務(wù)開始,以確保即使某些任務(wù)快速完成,也不會在同一秒鐘內(nèi)啟動超過限制數(shù)量的任務(wù)
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"golang.org/x/time/rate"
)
func RateLimit() {
const maxJobsPerSecond = 5
const numJobs = 22
var wg sync.WaitGroup
// 計數(shù)器
var runningJobs int32 // 當前正在執(zhí)行的任務(wù)數(shù)量
var startedJobs int32 // 啟動后的任務(wù)數(shù)量
var finishedJobs int32 // 剛完成的任務(wù)數(shù)量
limiter := rate.NewLimiter(rate.Every(time.Second/time.Duration(maxJobsPerSecond)), maxJobsPerSecond)
semaphore := make(chan struct{}, maxJobsPerSecond)
for i := 1; i <= numJobs; i++ {
wg.Add(1)
go func(jobID int) {
defer wg.Done()
limiter.Wait(context.Background()) // 等待限流器允許進行下一個任務(wù)
semaphore <- struct{}{} // 獲取信號量
atomic.AddInt32(&startedJobs, 1)
atomic.AddInt32(&runningJobs, 1)
executeJob(jobID) // 執(zhí)行任務(wù)
atomic.AddInt32(&finishedJobs, 1)
atomic.AddInt32(&runningJobs, -1)
<-time.After(time.Second) // 等待一秒鐘后釋放信號量
<-semaphore
// 打印當前狀態(tài)
printStatus(&runningJobs, &startedJobs, &finishedJobs)
}(i)
}
wg.Wait()
fmt.Println("所有工作完成")
}注意事項
- 限流器rate.NewLimiter用于控制任務(wù)啟動的頻率,以確保每秒不超過maxJobsPerSecond個任務(wù)開始執(zhí)行。
- 使用信號量semaphore來控制同時進行的任務(wù)數(shù)量。
- 為了確保在任何一秒內(nèi)同時進行的任務(wù)數(shù)量不超過限制,在任務(wù)完成后等待一秒鐘,然后再釋放信號量。這樣做可以保證即使任務(wù)很快完成,也不會立即啟動新的任務(wù)。
這種實現(xiàn)方式確保了即使任務(wù)執(zhí)行得很快,每秒鐘啟動的新任務(wù)數(shù)量也不會超過限制,并且同時考慮了正在執(zhí)行和剛剛完成的任務(wù)。
動態(tài)創(chuàng)建協(xié)程
- 協(xié)程的啟動是動態(tài)的。在代碼中,每個任務(wù)對應(yīng)于一個動態(tài)創(chuàng)建的協(xié)程。這些協(xié)程是在循環(huán)中根據(jù)任務(wù)數(shù)量(numJobs)動態(tài)生成的。
- 具體來說,每當有一個新的任務(wù)需要執(zhí)行時,都會創(chuàng)建一個新的協(xié)程來處理這個任務(wù)。這是通過在main函數(shù)的循環(huán)中調(diào)用go關(guān)鍵字實現(xiàn)的。這個過程在每次循環(huán)迭代中發(fā)生,從而為每個任務(wù)動態(tài)創(chuàng)建一個新的協(xié)程
由于使用了限流器(rate.Limiter),這些協(xié)程不是一次性全部創(chuàng)建,而是根據(jù)限流器允許的速率逐個創(chuàng)建。每個協(xié)程在開始執(zhí)行任務(wù)之前會等待限流器的許可,以此確保每秒啟動的任務(wù)數(shù)量不超過設(shè)定的最大值
func executeJob(jobID int) {
startTime := time.Now() // 記錄任務(wù)開始時間
// 模擬任務(wù)執(zhí)行時間
fmt.Printf("%v Job %d started\n",time.Now().Format("2006-01-02 15:04:05.000"), jobID)
// 初始化隨機數(shù)種子
rand.Seed(time.Now().UnixNano())
// 隨機生成一個時間間隔(例如,1到5000毫秒之間)
min := 1
max := 5000
duration := time.Duration(rand.Intn(max-min+1)+min) * time.Millisecond
time.Sleep(duration)
durationCost := time.Since(startTime) // 計算任務(wù)耗時
fmt.Printf("%v Job %d finished Cost:%v\n", time.Now().Format("2006-01-02 15:04:05.000"),jobID, durationCost)
}
func printStatus(runningJobs, startedJobs, finishedJobs *int32) {
fmt.Printf("Current status - Running: %d, Started: %d, Finished: %d\n",
atomic.LoadInt32(runningJobs),
atomic.LoadInt32(startedJobs),
atomic.LoadInt32(finishedJobs))
}可以在代碼中添加額外的邏輯來跟蹤和打印正在執(zhí)行、進行中、剛啟動和剛完成的任務(wù)數(shù)量。使用原子操作(來自sync/atomic包)來確保在并發(fā)環(huán)境下對這些計數(shù)器的操作是安全的。
在這個示例中:
使用sync/atomic包中的AddInt32和LoadInt32來安全地增加和讀取計數(shù)器的值。
在每個任務(wù)開始時,增加startedJobs和runningJobs計數(shù)器。
在每個任務(wù)完成時,增加finishedJobs計數(shù)器,并減少runningJobs計數(shù)器。
在任務(wù)完成后和釋放信號量前,打印當前的任務(wù)狀態(tài)。
注意事項
這種方法可以幫助我們跟蹤不同狀態(tài)下的任務(wù)數(shù)量。
使用原子操作確保在并發(fā)環(huán)境中對計數(shù)器的讀寫是安全的。
printStatus函數(shù)在每個任務(wù)的結(jié)束時被調(diào)用,以打印當前的任務(wù)狀態(tài)
以上就是Golang根據(jù)job數(shù)量動態(tài)控制每秒?yún)f(xié)程的最大創(chuàng)建數(shù)量方法詳解的詳細內(nèi)容,更多關(guān)于Golang job控制協(xié)程創(chuàng)建數(shù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言學習之將mp4通過rtmp推送流媒體服務(wù)的實現(xiàn)方法
對音視頻一直是小白,決定沉下心來,好好研究一下音視頻知識,下面這篇文章主要給大家介紹了關(guān)于Go語言學習之將mp4通過rtmp推送流媒體服務(wù)的實現(xiàn)方法,需要的朋友可以參考下2022-12-12
Go語言數(shù)據(jù)結(jié)構(gòu)之希爾排序示例詳解
這篇文章主要為大家介紹了Go語言數(shù)據(jù)結(jié)構(gòu)之希爾排序示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08
go?sync包中的互斥鎖Mutex和等待組WaitGroup使用詳解
這篇文章主要為大家介紹了go?sync包中的互斥鎖Mutex和等待組WaitGroup使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08
Go語言使用net/http構(gòu)建一個RESTful?API的示例代碼
Go的標準庫net/http提供了構(gòu)建Web服務(wù)所需的強大功能,雖然眾多第三方框架(如?Gin、Echo)已經(jīng)封裝了很多功能,但如果你真正想理解?Go的底層Web服務(wù)機制,那么使用net/http實現(xiàn)一個原生的RESTful?API?是最好的入門方式,需要的朋友可以參考下本文2025-08-08
Golang實現(xiàn)程序優(yōu)雅退出的方法詳解
項目開發(fā)過程中,隨著需求的迭代,代碼的發(fā)布會頻繁進行,在發(fā)布過程中,Golang如何讓程序做到優(yōu)雅的退出?本文就來詳細為大家講講2022-06-06
go程序中同一個包下為什么會存在多個同名的函數(shù)或變量(詳細解析)
這篇文章主要介紹了go程序中同一個包下為什么會存在多個同名的函數(shù)或變量(詳細解析),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2024-05-05

