淺談Go語言高并發(fā)處理思路
Go語言作為新興的語言,最近發(fā)展勢頭很是迅猛,其最大的特點(diǎn)就是原生支持并發(fā)。它使用的是“協(xié)程(goroutine)模型”,和傳統(tǒng)基于 OS 線程和進(jìn)程實(shí)現(xiàn)不同,Go語言的并發(fā)是基于用戶態(tài)的并發(fā),這種并發(fā)方式就變得非常輕量,能夠輕松運(yùn)行幾萬并發(fā)邏輯。Go 的并發(fā)屬于 CSP 并發(fā)模型的一種實(shí)現(xiàn),CSP 并發(fā)模型的核心概念是:“不要通過共享內(nèi)存來通信,而應(yīng)該通
過通信來共享內(nèi)存”。這在 Go 語言中的實(shí)現(xiàn)就是 Goroutine 和 Channel。
1、場景描述
在一些場景下,有大規(guī)模請求(十萬或百萬級qps),我們處理的請求可能不需要立馬知道結(jié)果,例如數(shù)據(jù)的打點(diǎn),文件的上傳等等。這時候我們需要異步化處理。常用的方法有使用resque、MQ、RabbitMQ等。這里我們在Golang語言里進(jìn)行設(shè)計實(shí)踐。
2、方案演進(jìn)
2.1、直接使用goroutine
在Go語言原生并發(fā)的支持下,我們可以直接使用一個goroutine(如下方式)去并行處理這個請求。但是,這種方法明顯有些不好的地方,我們沒法控制goroutine產(chǎn)生數(shù)量,如果處理程序稍微耗時,在單機(jī)萬級十萬級qps請求下,goroutine大規(guī)模爆發(fā),內(nèi)存暴漲,處理效率會很快下降甚至引發(fā)程序崩潰。
2.2、緩沖隊(duì)列
緩沖隊(duì)列一定程度上了提高了并發(fā),但也是治標(biāo)不治本,大規(guī)模并發(fā)只是推遲了問題的發(fā)生時間。當(dāng)請求速度遠(yuǎn)大于隊(duì)列的處理速度時,緩沖區(qū)很快被打滿,后面的請求一樣被堵塞了。
2.3 隊(duì)列+工作池
只用緩沖隊(duì)列不能解決根本問題,這時候我們可以參考一下線程池的概念,定一個工作池(協(xié)程池),來限定最大goroutine數(shù)目。每次來新的job時,從工作池里取出一個可用的worker來執(zhí)行job。這樣一來即保障了goroutine的可控性,也盡可能大的提高了并發(fā)處理能力。

3、代碼實(shí)現(xiàn)思路:
首先,我們定義一個job的接口, 具體內(nèi)容由具體job實(shí)現(xiàn);
// --------------------------- Job ---------------------
type Job interface {
Do()
}然后定義一下job隊(duì)列和work池類型,這里我們work池也用golang的channel實(shí)現(xiàn)。
type JobQueue chan Job
// --------------------------- Worker ---------------------
type Worker struct {
JobChan JobQueue //每一個worker對象具有JobQueue(隊(duì)列)屬性。
}// --------------------------- WorkerPool ---------------------
type WorkerPool struct { //線程池:
Workerlen int //線程池的大小
JobQueue JobQueue //Job隊(duì)列,接收外部的數(shù)據(jù)
WorkerQueue chan JobQueue //worker隊(duì)列:處理任務(wù)的Go程隊(duì)列
}4、完整代碼
package main
import (
"fmt"
"runtime"
"time"
)
//定義一個實(shí)現(xiàn)Job接口的數(shù)據(jù)
type Score struct {
Num int
}
//定義對數(shù)據(jù)的處理
func (s *Score) Do() {
fmt.Println("num:", s.Num)
time.Sleep(500*time.Millisecond) //模擬執(zhí)行的耗時任務(wù)
}
func main() {
num := 100 * 100 * 2 //開啟 2萬個線程
// debug.SetMaxThreads(num + 1000) //設(shè)置最大線程數(shù)
// 注冊工作池,傳入任務(wù)
// 參數(shù)1 worker并發(fā)個數(shù)
p := NewWorkerPool(num)
p.Run()
//寫入一千萬條數(shù)據(jù)
dataNum := 100 * 100* 100* 10
go func() {
for i := 1; i <= dataNum; i++ {
sc := &Score{Num: i}
p.JobQueue <- sc //數(shù)據(jù)傳進(jìn)去會被自動執(zhí)行Do()方法,具體對數(shù)據(jù)的處理自己在Do()方法中定義
}
}()
//循環(huán)打印輸出當(dāng)前進(jìn)程的Goroutine 個數(shù)
for {
fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
time.Sleep(5 * time.Second)
}
}
// --------------------------- Job ---------------------
type Job interface {
Do()
}
type JobQueue chan Job
// --------------------------- Worker ---------------------
type Worker struct {
JobChan JobQueue //每一個worker對象具有JobQueue(隊(duì)列)屬性。
}
func NewWorker() Worker {
return Worker{JobChan: make(chan Job)}
}
//啟動參與程序運(yùn)行的Go程數(shù)量
func (w Worker) Run(wq chan JobQueue) {
go func() {
for {
wq <- w.JobChan //處理任務(wù)的Go程隊(duì)列數(shù)量有限,每運(yùn)行1個,向隊(duì)列中添加1個,隊(duì)列剩余數(shù)量少1個 (JobChain入隊(duì)列)
select {
case job := <-w.JobChan:
//fmt.Println("xxx2:",w.JobChan)
job.Do() //執(zhí)行操作
}
}
}()
}
// --------------------------- WorkerPool ---------------------
type WorkerPool struct { //線程池:
Workerlen int //線程池的大小
JobQueue JobQueue //Job隊(duì)列,接收外部的數(shù)據(jù)
WorkerQueue chan JobQueue //worker隊(duì)列:處理任務(wù)的Go程隊(duì)列
}
func NewWorkerPool(workerlen int) *WorkerPool {
return &WorkerPool{
Workerlen: workerlen,
JobQueue: make(JobQueue),
WorkerQueue: make(chan JobQueue, workerlen),
}
}
func (wp *WorkerPool) Run() {
fmt.Println("初始化worker")
//初始化worker(多個Go程)
for i := 0; i < wp.Workerlen; i++ {
worker := NewWorker()
worker.Run(wp.WorkerQueue) //開啟每一個Go程
}
// 循環(huán)獲取可用的worker,往worker中寫job
go func() {
for {
select {
//將JobQueue中的數(shù)據(jù)存入WorkerQueue
case job := <-wp.JobQueue: //線程池中有需要待處理的任務(wù)(數(shù)據(jù)來自于請求的任務(wù)) :讀取JobQueue中的內(nèi)容
worker := <-wp.WorkerQueue //隊(duì)列中有空閑的Go程 :讀取WorkerQueue中的內(nèi)容,類型為:JobQueue
worker <- job //空閑的Go程執(zhí)行任務(wù) :整個job入隊(duì)列(channel) 類型為:傳遞的參數(shù)(Score結(jié)構(gòu)體)
//fmt.Println("xxx1:",worker)
//fmt.Printf("====%T ; %T======\n",job,worker,)
}
}
}()
}運(yùn)行效果:

5、資源消耗
5.1 CPU消耗對比


5.2 內(nèi)存消耗對比


6、代碼分析
核心代碼:
思考:臨時變量 worker是channel,沒有讀操作,只有寫操作。為什么沒有發(fā)生死鎖現(xiàn)象?
select {
case job := <-wp.JobQueue:
worker := <-wp.WorkerQueue
worker <- job
}分別輸出臨時變量worker、w.JobChan,代碼如下:
// 循環(huán)獲取可用的worker,往worker中寫job
go func() {
for {
select {
//將JobQueue中的數(shù)據(jù)存入WorkerQueue
case job := <-wp.JobQueue: //線程池中有需要待處理的任務(wù)(數(shù)據(jù)來自于請求的任務(wù)) :讀取JobQueue中的內(nèi)容
worker := <-wp.WorkerQueue //隊(duì)列中有空閑的Go程 :讀取WorkerQueue中的內(nèi)容,類型為:JobQueue
worker <- job //空閑的Go程執(zhí)行任務(wù) :整個job入隊(duì)列(channel) 類型為:傳遞的參數(shù)(Score結(jié)構(gòu)體)
fmt.Println("臨時變量worker:",worker) //todo: 地址是什么
fmt.Printf("====job類型:%T ; worker類型%T======\n",job,worker,)
}
}
}()//啟動參與程序運(yùn)行的Go程數(shù)量
func (w Worker) Run(wq chan JobQueue) {
go func() {
for {
wq <- w.JobChan //處理任務(wù)的Go程隊(duì)列數(shù)量有限,每運(yùn)行1個,向隊(duì)列中添加1個,隊(duì)列剩余數(shù)量少1個 (JobChain入隊(duì)列)
select {
case job := <-w.JobChan:
fmt.Println("執(zhí)行處理任務(wù)的worker:",w.JobChan) //todo: 地址是什么
job.Do() //執(zhí)行操作
}
}
}()
}
輸出效果為:
?
結(jié)果發(fā)現(xiàn)worker、w.JobChan是同一地址(指向的地址一樣)。所以 在
worker <- job :向worker中寫數(shù)據(jù);
job := <-w.JobChan: 從w.JobChan中讀數(shù)據(jù)。
由于worker、w.JobChan是對同一數(shù)據(jù)進(jìn)行操作,所以臨時變量worker不會發(fā)生死鎖現(xiàn)象。
說明:也可以將核心內(nèi)容封裝成一個庫,以后直接調(diào)用即可。
知識點(diǎn)說明:
1、 地址引用參考:http://www.dhdzp.com/jiaoben/353414oen.htm
2、對應(yīng)channel讀寫的操作(ch為chan 的類型):
ch<-數(shù)據(jù)類型 //數(shù)據(jù)寫入ch
數(shù)據(jù)類型:=<-ch //ch通道中的數(shù)據(jù)取出,存入數(shù)據(jù)類型對應(yīng)的變量。
ch1<-ch2 //其中ch1,ch2都是 chan類型(channel類型);執(zhí)行順序是從左至右,將ch2存入ch1(ch2入隊(duì)列ch1),即將ch2看做數(shù)據(jù)添加到通道ch1中。
到此這篇關(guān)于淺談Go語言高并發(fā)處理思路的文章就介紹到這了,更多相關(guān)Go語言 高并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- golang高并發(fā)之本地緩存詳解
- Golang使用Channel組建高并發(fā)HTTP服務(wù)器
- golang使用map支持高并發(fā)的方法(1000萬次操作14ms)
- golang高并發(fā)系統(tǒng)限流策略漏桶和令牌桶算法源碼剖析
- 關(guān)于golang高并發(fā)的實(shí)現(xiàn)與注意事項(xiàng)說明
- 基于Golang 高并發(fā)問題的解決方案
- golang高并發(fā)限流操作 ping / telnet
- golang-gin-mgo高并發(fā)服務(wù)器搭建教程
- golang高并發(fā)的深入理解
- 如何利用Golang寫出高并發(fā)代碼詳解
相關(guān)文章
gin自定義中間件解決requestBody不可重讀(請求體取值)
這篇文章主要介紹了gin自定義中間件解決requestBody不可重讀,確??刂破髂軌颢@取請求體值,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
golang設(shè)置http response響應(yīng)頭與填坑記錄
這篇文章主要給大家介紹了關(guān)于golang設(shè)置http response響應(yīng)頭與填坑記錄的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08
GO語言創(chuàng)建錢包并遍歷錢包(wallet)的實(shí)現(xiàn)代碼
比特幣錢包實(shí)際上是一個密鑰對,當(dāng)你安裝 一個錢包應(yīng)用,或者是使用一個比特幣客戶端來生成一個新地址是,他就會為你生成一個密鑰對,今天通過本文給大家分享go語言遍歷錢包的相關(guān)知識,一起看看吧2021-05-05
在golang xorm中使用postgresql的json,array類型的操作
這篇文章主要介紹了在golang xorm中使用postgresql的json,array類型的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04
golang使用go mod導(dǎo)入本地包和第三方包的方式
這篇文章主要介紹了golang使用go mod導(dǎo)入本地包和第三方包的方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01

