詳解如何用Golang處理每分鐘100萬個(gè)請(qǐng)求
面臨的問題
在我設(shè)計(jì)一個(gè)分析系統(tǒng)中,我們公司的目標(biāo)是能夠處理來自數(shù)百萬個(gè)端點(diǎn)的大量POST請(qǐng)求。web 網(wǎng)絡(luò)處理程序?qū)⑹盏揭粋€(gè)JSON文檔,其中可能包含許多有效載荷的集合,需要寫入Amazon S3,以便我們的地圖還原系統(tǒng)隨后對(duì)這些數(shù)據(jù)進(jìn)行操作。
傳統(tǒng)上,我們會(huì)研究創(chuàng)建一個(gè)工人層架構(gòu),利用諸如以下東西:
- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- 還有等等其他的技術(shù)手段...
并設(shè)置 2 個(gè)不同的集群,一個(gè)用于 Web 前端,另一個(gè)用于 worker 處理進(jìn)程,這樣我們就可以擴(kuò)大我們可以處理的后臺(tái)工作量。
但從一開始,我們的團(tuán)隊(duì)就知道我們應(yīng)該在 Go 中這樣做,因?yàn)樵谟懻撾A段我們看到這可能是一個(gè)非常大的流量系統(tǒng)。 我使用 Go 已有大約 2 年左右的時(shí)間,我們公司在處理業(yè)務(wù)時(shí)開發(fā)了一些系統(tǒng),但沒有一個(gè)能承受如此大的負(fù)載。以下是優(yōu)化的過程。
我們首先創(chuàng)建一些結(jié)構(gòu)體來定義我們將通過 POST 調(diào)用接收的 Web 請(qǐng)求負(fù)載,以及一種將其上傳到我們的 S3 存儲(chǔ)桶的方法。代碼如下:
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// ...負(fù)載字段
}
func (p *Payload) UploadToS3() error {
// storageFolder 方法確保在我們?cè)阪I名中獲得相同時(shí)間戳?xí)r不會(huì)發(fā)生名稱沖突
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// 我們發(fā)布到 S3 存儲(chǔ)桶的所有內(nèi)容都應(yīng)標(biāo)記為“私有”
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}使用 Go 協(xié)程
最初我們采用了一個(gè)非常簡(jiǎn)單的 POST 處理程序?qū)崿F(xiàn),只是試圖將job 處理程序并行化到一個(gè)簡(jiǎn)單的 goroutine 中:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 將body讀入字符串進(jìn)行json解碼
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// 分別檢查每個(gè)有效負(fù)載和隊(duì)列項(xiàng)目以發(fā)布到 S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- 這是不建議的做法。這里是最開始的做法。
}
w.WriteHeader(http.StatusOK)
}對(duì)于中等負(fù)載,這可能適用于大多數(shù)公司的流量,但很快證明這在大規(guī)模情況下效果不佳。 我們期望有很多請(qǐng)求,但沒有達(dá)到我們將第一個(gè)版本部署到生產(chǎn)環(huán)境時(shí)開始看到的數(shù)量級(jí)。 我們完全低估了流量。
上面的方法在幾個(gè)不同的方面是不好的。 無法控制我們生成了多少個(gè) go routines。 由于我們每分鐘收到 100 萬個(gè) POST 請(qǐng)求,因此這段代碼很快崩潰了。
進(jìn)一步優(yōu)化
我們需要找到一種不同的方式。 從一開始我們就開始討論我們需要如何保持請(qǐng)求處理程序的生命周期非常短,并在后臺(tái)進(jìn)行生成處理。 當(dāng)然,這是你在使用 Ruby on Rails 時(shí)必須做的,否則你將阻止所有可用的 worker web 處理器,無論你使用的是 puma、unicorn 還是 passenger(請(qǐng)不要進(jìn)入 JRuby 討論)。 然后我們需要利用常見的解決方案來做到這一點(diǎn),例如 Resque、Sidekiq、SQS 等等,有很多方法可以實(shí)現(xiàn)這一點(diǎn)。
所以第二次迭代是創(chuàng)建一個(gè)緩沖通道,我們可以創(chuàng)建一些隊(duì)列,然后把 job push到隊(duì)列并將它們上傳到 S3,并且由于我們可以控制job 隊(duì)列中的最大數(shù)數(shù)量并且我們有足夠的內(nèi)存來處理隊(duì)列中的 job。在這個(gè)方案中,我們認(rèn)為只需要在通道隊(duì)列中緩沖需要處理的 job 就可以了。
代碼如下:
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// 分別檢查每個(gè)有效負(fù)載和隊(duì)列項(xiàng)目以發(fā)布到 S3
for _, payload := range content.Payloads {
Queue <- payload // <----- 這是建議的做法。
}
...
}然后為了實(shí)際出列作業(yè)并處理它們,我們使用了類似的東西:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- 這里雖然優(yōu)化了,但還不是最好的。
}
}
}在上面的代碼中,我們用一個(gè)緩沖隊(duì)列來交換有缺陷的并發(fā)性,而緩沖隊(duì)列只是推遲了問題。 我們的同步處理器一次只將一個(gè)有效負(fù)載上傳到 S3,并且由于傳入請(qǐng)求的速率遠(yuǎn)遠(yuǎn)大于單個(gè)處理器上傳到 S3 的能力,我們的 job 緩沖通道很快達(dá)到了極限并阻止了請(qǐng)求處理程序的能力,隊(duì)列很快就阻塞滿了。
我們只是在避免這個(gè)問題,并開始倒計(jì)時(shí),直到我們的系統(tǒng)最終死亡。 在我們部署這個(gè)有缺陷的版本后,我們的延遲率在幾分鐘內(nèi)以恒定的速度持續(xù)增加。以下是延遲率增長(zhǎng)圖:

更好的解決方案
我們決定在使用 Go 通道時(shí)使用一種通用模式,以創(chuàng)建一個(gè) 2 層通道系統(tǒng),一個(gè)用于 Job 隊(duì)列,另一個(gè)用于控制同時(shí)在 Job 隊(duì)列上操作的 Worker 的數(shù)量。
這個(gè)想法是將上傳到 S3 的數(shù)據(jù)并行化到某種程度上可持續(xù)的速度,這種速度既不會(huì)削弱機(jī)器也不會(huì)開始從 S3 生成連接錯(cuò)誤。 所以我們選擇創(chuàng)建 Job/Worker 模式。 對(duì)于那些熟悉 Java、C# 等的人來說,可以將其視為 Golang 使用通道實(shí)現(xiàn) Worker 線程池的方式。
代碼如下:
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job 表示要運(yùn)行的作業(yè)
type Job struct {
Payload Payload
}
// 我們可以在 Job 隊(duì)列上發(fā)送工作請(qǐng)求的緩沖通道。
var JobQueue chan Job
// Worker 代表執(zhí)行作業(yè)的 Worker。
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start 方法為 Worker 啟動(dòng)循環(huán)監(jiān)聽。監(jiān)聽退出信號(hào)以防我們需要停止它。
func (w Worker) Start() {
go func() {
for {
// 將當(dāng)前 woker 注冊(cè)到工作隊(duì)列中。
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// 接收 work 請(qǐng)求。
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// 接收一個(gè)退出的信號(hào)。
return
}
}
}()
}
// 將退出信號(hào)傳遞給 Worker 進(jìn)程以停止處理清理。
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}我們已經(jīng)修改了我們的 Web 請(qǐng)求處理程序,以創(chuàng)建一個(gè)帶有有效負(fù)載的 Job 結(jié)構(gòu)實(shí)例,并將其發(fā)送到 JobQueue 通道以供 Worker 提取。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 將body讀入字符串進(jìn)行json解碼
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// 分別檢查每個(gè)有效負(fù)載和隊(duì)列項(xiàng)目以發(fā)布到 S3
for _, payload := range content.Payloads {
// 創(chuàng)建一個(gè)有效負(fù)載的job
work := Job{Payload: payload}
// 將 work push 到隊(duì)列。
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}在我們的 Web 服務(wù)器初始化期間,我們創(chuàng)建一個(gè) Dispatcher 調(diào)度器并調(diào)用 Run() 來創(chuàng)建 Woker 工作池并開始偵聽將出現(xiàn)在 Job 隊(duì)列中的 Job。
dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()
下面是我們的調(diào)度程序?qū)崿F(xiàn)的代碼:
type Dispatcher struct {
// 通過調(diào)度器注冊(cè)一個(gè) Worker 通道池
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// 啟動(dòng)指定數(shù)量的 Worker
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// 接收一個(gè) job 請(qǐng)求
go func(job Job) {
// 嘗試獲取可用的 worker job 通道
// 這將阻塞 worker 直到空閑
jobChannel := <-d.WorkerPool
// 調(diào)度一個(gè) job 到 worker job 通道
jobChannel <- job
}(job)
}
}
}請(qǐng)注意,我們提供了要實(shí)例化并添加到我們的 Worker 池中的最大worker 數(shù)量。 由于我們?cè)谶@個(gè)項(xiàng)目中使用了 Amazon Elasticbeanstalk 和 dockerized Go 環(huán)境,因此我們從環(huán)境變量中讀取這些值。 這樣我們就可以控制 Job 隊(duì)列的數(shù)量和最大大小,因此我們可以快速調(diào)整這些值而無需重新部署集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)在我們部署它之后,我們立即看到我們所有的延遲率都下降到極低的延遲,并且我們處理請(qǐng)求的能力急劇上升。以下是流量截圖:

在我們的彈性負(fù)載均衡器完全預(yù)熱幾分鐘后,我們看到我們的 ElasticBeanstalk 應(yīng)用程序每分鐘處理近 100 萬個(gè)請(qǐng)求。 我們通常在早上有幾個(gè)小時(shí)的流量會(huì)飆升至每分鐘超過一百萬。
一旦我們部署了新代碼,服務(wù)器數(shù)量就從 100 臺(tái)服務(wù)器大幅下降到大約 20 臺(tái)服務(wù)器。以下是服務(wù)器數(shù)量變化截圖:

在正確配置集群和自動(dòng)縮放設(shè)置后,我們能夠?qū)⑵溥M(jìn)一步降低到僅 4x EC2 c4.Large 實(shí)例,并且如果 CPU 使用率超過 90% 持續(xù) 5 天,Elastic Auto-Scaling 將生成一個(gè)新實(shí)例 分鐘值。以下是截圖:

總結(jié)
可以看出利用 Elasticbeanstalk 自動(dòng)縮放的強(qiáng)大功能以及 Golang 提供的開箱即用的高效和簡(jiǎn)單的并發(fā)方法,就可以構(gòu)建出一個(gè)高性能的處理程序。
以上就是詳解如何用Golang處理每分鐘100萬個(gè)請(qǐng)求的詳細(xì)內(nèi)容,更多關(guān)于Golang處理請(qǐng)求的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang如何調(diào)用windows下的dll動(dòng)態(tài)庫中的函數(shù)
這篇文章主要介紹了Golang如何調(diào)用windows下的dll動(dòng)態(tài)庫中的函數(shù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-05-05
在 Golang 中實(shí)現(xiàn) Cache::remember 方法詳解
這篇文章主要介紹了在 Golang 中實(shí)現(xiàn) Cache::remember 方法詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
go local history本地歷史恢復(fù)代碼神器
這篇文章主要為大家介紹了go local history本地歷史恢復(fù)代碼神器的使用功能詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
go語言處理TCP拆包/粘包的具體實(shí)現(xiàn)
TCP的拆包/粘包也算是網(wǎng)絡(luò)編程中一個(gè)比較基礎(chǔ)的問題了,本文主要介紹了go語言處理TCP拆包/粘包,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
golang實(shí)現(xiàn)并發(fā)數(shù)控制的方法
下面小編就為大家分享一篇golang實(shí)現(xiàn)并發(fā)數(shù)控制的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2017-12-12
以alpine作為基礎(chǔ)鏡像構(gòu)建Golang可執(zhí)行程序操作
這篇文章主要介紹了以alpine作為基礎(chǔ)鏡像構(gòu)建Golang可執(zhí)行程序操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-12-12

