Go?實(shí)戰(zhàn)單隊(duì)列到優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)圖文示例
優(yōu)先級(jí)隊(duì)列概述
隊(duì)列,是數(shù)據(jù)結(jié)構(gòu)中實(shí)現(xiàn)先進(jìn)先出策略的一種數(shù)據(jù)結(jié)構(gòu)。而優(yōu)先隊(duì)列則是帶有優(yōu)先級(jí)的隊(duì)列,即先按優(yōu)先級(jí)分類,然后相同優(yōu)先級(jí)的再 進(jìn)行排隊(duì)。優(yōu)先級(jí)高的隊(duì)列中的元素會(huì)優(yōu)先被消費(fèi)。如下圖所示:

在Go中,可以定義一個(gè)切片,切片的每個(gè)元素代表一種優(yōu)先級(jí)隊(duì)列,切片的索引順序代表優(yōu)先級(jí)順序,后面代碼實(shí)現(xiàn)部分我們會(huì)詳細(xì)講解。
為什么需要優(yōu)先級(jí)隊(duì)列
先來(lái)看現(xiàn)實(shí)生活中的例子。銀行的辦事窗口,有普通窗口和vip窗口,vip窗口因?yàn)榕抨?duì)人數(shù)少,等待的時(shí)間就短,比普通窗口就會(huì)優(yōu)先處理。同樣,在登機(jī)口,就有貴賓通道和普通,同樣貴賓通道優(yōu)先登機(jī)。
在互聯(lián)網(wǎng)中,當(dāng)然就是請(qǐng)求和響應(yīng)。使用優(yōu)先級(jí)隊(duì)列的作用是將請(qǐng)求按特定的屬性劃分出優(yōu)先級(jí),然后按優(yōu)先級(jí)的高低進(jìn)行優(yōu)先處理。在研發(fā)服務(wù)的時(shí)候這里有個(gè)隱含的約束條件就是服務(wù)器資源(CPU、內(nèi)存、帶寬等)是有限的。如果服務(wù)器資源是無(wú)限的,那么也就不需要隊(duì)列進(jìn)行排隊(duì)了,來(lái)一個(gè)請(qǐng)求就立即處理一個(gè)請(qǐng)求就好了。所以,為了在最大限度的利用服務(wù)器資源的前提下,將更重要的任務(wù)(優(yōu)先級(jí)高的請(qǐng)求)優(yōu)先處理,以更好的服務(wù)用戶。
對(duì)于請(qǐng)求優(yōu)先級(jí)的劃分可以根據(jù)業(yè)務(wù)的特點(diǎn)根據(jù)價(jià)值高的優(yōu)先原則來(lái)進(jìn)行劃分即可。例如可以根據(jù)是否是否是會(huì)員、是否是VIP會(huì)員等屬性進(jìn)行劃分優(yōu)先級(jí)。也可以根據(jù)是否是付費(fèi)用戶進(jìn)行劃分。在博客的業(yè)務(wù)中,也可以根據(jù)是否是大V的屬性進(jìn)行優(yōu)先級(jí)劃分。在互聯(lián)網(wǎng)廣告業(yè)務(wù)中,可以根據(jù)廣告位資源價(jià)值高低來(lái)劃分優(yōu)先級(jí)。
優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)原理
01 四個(gè)角色
在完整的優(yōu)先級(jí)隊(duì)列中有四個(gè)角色,分別是優(yōu)先級(jí)隊(duì)列、工作單元、消費(fèi)者worker、通知channel。
工作單元Job:隊(duì)列里的元素。我們把每一次業(yè)務(wù)處理都封裝成一個(gè)工作單元,該工作單元會(huì)進(jìn)入對(duì)應(yīng)的優(yōu)先級(jí)隊(duì)列進(jìn)行排隊(duì),然后等待消費(fèi)者worker來(lái)消費(fèi)執(zhí)行。優(yōu)先級(jí)隊(duì)列:按優(yōu)先級(jí)劃分的隊(duì)列,用來(lái)暫存對(duì)應(yīng)優(yōu)先級(jí)的工作單元Job,相同優(yōu)先級(jí)的工作單元會(huì)在同一個(gè)隊(duì)列里。noticeChan通道:當(dāng)有工作單元進(jìn)入優(yōu)先級(jí)隊(duì)列排隊(duì)后,會(huì)在通道里發(fā)送一個(gè)消息,以通知消費(fèi)者worker從隊(duì)列中獲取元素(工作單元)進(jìn)行消費(fèi)。消費(fèi)者worker:監(jiān)聽(tīng)noticeChan,當(dāng)監(jiān)聽(tīng)到noticeChan有消息時(shí),說(shuō)明隊(duì)列中有工作單元需要被處理,優(yōu)先從高優(yōu)先級(jí)隊(duì)列中獲取元素進(jìn)行消費(fèi)。
02 隊(duì)列-消費(fèi)者模式
根據(jù)隊(duì)列個(gè)數(shù)和消費(fèi)者個(gè)數(shù),我們可以將隊(duì)列-消費(fèi)者模式分為單隊(duì)列-單消費(fèi)者模式、多隊(duì)列(優(yōu)先級(jí)隊(duì)列)- 單消費(fèi)者模式、多隊(duì)列(優(yōu)先級(jí)隊(duì)列)- 多消費(fèi)者模式。
我們先從最簡(jiǎn)單的單隊(duì)列-單消費(fèi)者模式實(shí)現(xiàn),然后一步步演化成多隊(duì)列(優(yōu)先級(jí)隊(duì)列)-多消費(fèi)者模式。
03 單隊(duì)列-單消費(fèi)者模式實(shí)現(xiàn)

3.1 隊(duì)列的實(shí)現(xiàn)
我們先來(lái)看下隊(duì)列的實(shí)現(xiàn)。這里我們用Golang中的List數(shù)據(jù)結(jié)果來(lái)實(shí)現(xiàn),List數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,包含了將元素放到鏈表尾部、將頭部元素彈出的操作,符合隊(duì)列先進(jìn)先出的特性。
好,我們看下具體的隊(duì)列的數(shù)據(jù)結(jié)構(gòu):
type JobQueue struct {
mu sync.Mutex //隊(duì)列的操作需要并發(fā)安全
jobList *list.List //List是golang庫(kù)的雙向隊(duì)列實(shí)現(xiàn),每個(gè)元素都是一個(gè)job
noticeChan chan struct{} //入隊(duì)一個(gè)job就往該channel中放入一個(gè)消息,以供消費(fèi)者消費(fèi)
}入隊(duì)操作
/**
* 隊(duì)列的Push操作
*/
func (queue *JobQueue) PushJob(job Job) {
queue.jobList.PushBack(job) //將job加到隊(duì)尾
queue.noticeChan <- struct{}{}
}到這里有同學(xué)就會(huì)問(wèn)了,為什么不直接將job推送到Channel中,然后讓消費(fèi)者依次消費(fèi)不就行了么?是的,單隊(duì)列這樣是可以的,因?yàn)槲覀冏罱K目標(biāo)是為了實(shí)現(xiàn)優(yōu)先級(jí)的多隊(duì)列,所以這里即使是單隊(duì)列,我們也使用List數(shù)據(jù)結(jié)構(gòu),以便后續(xù)的演變。
還有一點(diǎn),大家注意到了,這里入隊(duì)操作時(shí)有一個(gè) 這樣的操作:
queue.noticeChan <- struct{}{}消費(fèi)者監(jiān)聽(tīng)的實(shí)際上不是隊(duì)列本身,而是通道noticeChan。當(dāng)有一個(gè)元素入隊(duì)時(shí),就往noticeChan通道中輸入一條消息,這里是一個(gè)空結(jié)構(gòu)體,主要作用就是通知消費(fèi)者worker,隊(duì)列里有要處理的元素了,可以從隊(duì)列中獲取了。 這個(gè)在后面演化成多隊(duì)列以及多消費(fèi)者模式時(shí)會(huì)很有用。
出隊(duì)操作
根據(jù)隊(duì)列的先進(jìn)先出原則,是要獲取隊(duì)列的最先進(jìn)入的元素。Golang中List結(jié)構(gòu)體的Front()函數(shù)是獲取鏈表的第一個(gè)元素,然后通過(guò)Remove函數(shù)將該元素從鏈表中移出,即得到了隊(duì)列中的第一個(gè)元素。這里的Job結(jié)構(gòu)體先不用關(guān)心,我們后面實(shí)現(xiàn)工作單元Job時(shí),會(huì)詳細(xì)講解。
/**
* 彈出隊(duì)列的第一個(gè)元素
*/
func (queue *JobQueue) PopJob() Job {
queue.mu.Lock()
defer queue.mu.Unlock()
/**
* 說(shuō)明在隊(duì)列中沒(méi)有元素了
*/
if queue.jobList.Len() == 0 {
return nil
}
elements := queue.jobList.Front() //獲取隊(duì)里的第一個(gè)元素
return queue.jobList.Remove(elements).(Job) //將元素從隊(duì)列中移除并返回
}等待通知操作
上面我們提到,消費(fèi)者監(jiān)聽(tīng)的是noticeChan通道。當(dāng)有元素入隊(duì)時(shí),會(huì)往noticeChan中輸入一條消息,以便通知消費(fèi)者進(jìn)行消費(fèi)。如果隊(duì)列中沒(méi)有要消費(fèi)的元素,那么消費(fèi)者就會(huì)阻塞在該通道上。
func (queue *JobQueue) WaitJob() <-chan struct{} {
return queue.noticeChan
}3.2 工作單元--Job的實(shí)現(xiàn)
一個(gè)工作單元就是一個(gè)要執(zhí)行的任務(wù)。在系統(tǒng)中往往需要執(zhí)行不同的任務(wù),就是需要有不同類型的工作單元,但這些工作單元都有一組共同的執(zhí)行流程。我們看下工作單元的類圖。

圖-job類圖
我們看下類圖中的幾個(gè)角色:
- Job接口:定義了所有Job要實(shí)現(xiàn)的方法。
- BaseJob類(結(jié)構(gòu)體):定義了具體Job的基類。因?yàn)榫唧wJob類中的有共同的屬性和方法。所以抽象出一個(gè)基類,避免重復(fù)實(shí)現(xiàn)。但該基類對(duì)Execute方法沒(méi)有實(shí)現(xiàn),因?yàn)椴煌墓ぷ鲉卧芯唧w的執(zhí)行邏輯。
- SquareJob和AreaJob類(結(jié)構(gòu)體):是我們要具體實(shí)現(xiàn)的業(yè)務(wù)工作Job。主要是實(shí)現(xiàn)Execute的具體執(zhí)行邏輯。根據(jù)業(yè)務(wù)的需要定義自己的工作Job和對(duì)應(yīng)的Execute方法即可。
接下來(lái),我們以計(jì)算一個(gè)int類型數(shù)字的平方的SquareJob為例來(lái)看下具體的實(shí)現(xiàn)。
- BaseJob結(jié)構(gòu)體
首先看下該結(jié)構(gòu)體的定義
type BaseJob struct {
Err error
DoneChan chan struct{} //當(dāng)作業(yè)完成時(shí),或者作業(yè)被取消時(shí),通知調(diào)用者
Ctx context.Context
cancelFunc context.CancelFunc
}在該結(jié)構(gòu)體中,我們主要關(guān)注DoneChan字段就行,該字段是當(dāng)具體的Job的Execute執(zhí)行完成后,來(lái)通知調(diào)用者的。
再來(lái)看Done函數(shù),該函數(shù)就是在Execute函數(shù)完成后,要關(guān)閉DoneChan通道,以解除Job的阻塞而繼續(xù)執(zhí)行其他邏輯。
/**
* 作業(yè)執(zhí)行完畢,關(guān)閉DoneChan,所有監(jiān)聽(tīng)DoneChan的接收者都能收到關(guān)閉的信號(hào)
*/
func (job *BaseJob) Done() {
close(job.DoneChan)
}再來(lái)看WaitDone函數(shù),該函數(shù)是當(dāng)Job執(zhí)行后,要等待Job執(zhí)行完成,在未完成之前,DoneChan里沒(méi)有消息,通過(guò)該函數(shù)就能將job阻塞,直到Execute中調(diào)用了Done(),以便解除阻塞。
/**
* 等待job執(zhí)行完成
*/
func (job *BaseJob) WaitDone() {
select {
case <-job.DoneChan:
return
}
}SquareJob結(jié)構(gòu)體
type SquareJob struct {
*BaseJob
x int
}從結(jié)構(gòu)體的定義中可知,SquareJob嵌套了BaseJob,所以該結(jié)構(gòu)體擁有BaseJob的所有字段和方法。在該結(jié)構(gòu)體主要實(shí)現(xiàn)了Execute的邏輯:對(duì)x求平方。
func (s *SquareJob) Execute() error {
result := s.x * s.x
fmt.Println("the result is ", result)
return nil
}3.3 消費(fèi)者Worker的實(shí)現(xiàn)
Worker主要功能是通過(guò)監(jiān)聽(tīng)隊(duì)列里的noticeChan是否有需要處理的元素,如果有元素的話從隊(duì)列里獲取到要處理的元素job,然后執(zhí)行job的Execute方法。
我們將該結(jié)構(gòu)體定位為WorkerManager,因?yàn)樵诤竺嫖覀冎v解多Worker模式時(shí),會(huì)需要一個(gè)Worker的管理者,因此定義成了WorkerManager。
type WorkerManager struct {
queue *JobQueue
closeChan chan struct{}
}StartWorker函數(shù),只有一個(gè)for循環(huán),不斷的從隊(duì)列中獲取Job。獲取到Job后,進(jìn)行消費(fèi)Job,即ConsumeJob。
func (m *WorkerManager) StartWork() error {
fmt.Println("Start to Work")
for {
select {
case <-m.closeChan:
return nil
case <-m.queue.noticeChan:
job := m.queue.PopJob()
m.ConsumeJob(job)
}
}
return nil
}
func (m *WorkerManager) ConsumeJob(job Job) {
defer func() {
job.Done()
}()
job.Execute()
}到這里,單隊(duì)列-單消費(fèi)者模式中各角色的實(shí)現(xiàn)就講解完了。我們通過(guò)main函數(shù)將其關(guān)聯(lián)起來(lái)。
func main() {
//初始化一個(gè)隊(duì)列
queue := &JobQueue{
jobList: list.New(),
noticeChan: make(chan struct{}, 10),
}
//初始化一個(gè)消費(fèi)worker
workerManger := NewWorkerManager(queue)
// worker開(kāi)始監(jiān)聽(tīng)隊(duì)列
go workerManger.StartWork()
// 構(gòu)造SquareJob
job := &SquareJob{
BaseJob: &BaseJob{
DoneChan: make(chan struct{}, 1),
},
x: 5,
}
//壓入隊(duì)列尾部
queue.PushJob(job)
//等待job執(zhí)行完成
job.WaitDone()
print("The End")
}04 多隊(duì)列-單消費(fèi)者模式
有了單隊(duì)列-單消費(fèi)者的基礎(chǔ),我們?nèi)绾螌?shí)現(xiàn)多隊(duì)列-單消費(fèi)者模式。也就是優(yōu)先級(jí)隊(duì)列。

優(yōu)先級(jí)的隊(duì)列,實(shí)質(zhì)上就是根據(jù)工作單元Job的優(yōu)先級(jí)屬性,將其放到對(duì)應(yīng)的優(yōu)先級(jí)隊(duì)列中,以便worker可以根據(jù)優(yōu)先級(jí)進(jìn)行消費(fèi)。我們要在Job結(jié)構(gòu)體中增加一個(gè)Priority屬性。因?yàn)樵搶傩允撬蠮ob都共有的,因此定義在BaseJob上更合適.
type BaseJob struct {
Err error
DoneChan chan struct{} //當(dāng)作業(yè)完成時(shí),或者作業(yè)被取消時(shí),通知調(diào)用者
Ctx context.Context
cancelFunc context.CancelFunc
priority int //工作單元的優(yōu)先級(jí)
}我們?cè)賮?lái)看看多隊(duì)列如何實(shí)現(xiàn)。實(shí)際上就是用一個(gè)切片來(lái)存儲(chǔ)各個(gè)隊(duì)列,切片的每個(gè)元素存儲(chǔ)一個(gè)JobQueue隊(duì)列元素即可。
var queues = make([]*JobQueue, 10, 100)
那各優(yōu)先級(jí)的隊(duì)列在切片中是如何存儲(chǔ)的呢?切片索引順序只代表優(yōu)先級(jí)的高于低,不代表具體是哪個(gè)優(yōu)先級(jí)。
什么意思呢?假設(shè)我們現(xiàn)在對(duì)目前的工作單元定義了1、4、7三個(gè)優(yōu)先級(jí)。這3個(gè)優(yōu)先級(jí)在切片中是按優(yōu)先級(jí)從小到到依次存儲(chǔ)在queues切片中的,如下圖:

圖-正確的切片存儲(chǔ)的優(yōu)先級(jí)
那為什么不讓切片的索引就代表優(yōu)先級(jí),讓優(yōu)先級(jí)為1的隊(duì)列存儲(chǔ)在索引1處,優(yōu)先級(jí)4的隊(duì)列存儲(chǔ)在索引4處,優(yōu)先級(jí)7的隊(duì)列存儲(chǔ)在索引7處呢?如果這樣存儲(chǔ)的話,就會(huì)變成如下這樣:

圖4-直接使用索引作為優(yōu)先級(jí)缺點(diǎn)
可見(jiàn)如果我們?cè)O(shè)定的優(yōu)先級(jí)不是連續(xù)的,那么就會(huì)造成空間的浪費(fèi)。所以,我們是將隊(duì)列按優(yōu)先級(jí)高低依次存放到了切片中。
那既然這樣,當(dāng)一個(gè)優(yōu)先級(jí)的job來(lái)了之后,我該怎么知道該優(yōu)先級(jí)的隊(duì)列是存儲(chǔ)在哪個(gè)索引中呢?我們用一個(gè)map來(lái)映射優(yōu)先級(jí)和切片索引之間的關(guān)系。這樣當(dāng)一個(gè)工作單元Job入隊(duì)的時(shí)候,以優(yōu)先級(jí)為key,就可以查找到對(duì)應(yīng)優(yōu)先級(jí)的隊(duì)列存儲(chǔ)在切片的哪個(gè)位置了。如下圖所示:

圖-優(yōu)先級(jí)和索引映射
代碼定義:
var priorityIdx map[int][int]//該map的key是優(yōu)先級(jí),value代表的是queues切片的索引
好了,我們重新定義一下隊(duì)列的結(jié)構(gòu)體:
type PriorityQueue struct {
mu sync.Mutex
noticeChan chan struct{}
queues []*JobQueue
priorityIdx map[int]int
}
//原來(lái)的JobQueue會(huì)變成如下這樣:
type JobQueue struct {
priority int //代表該隊(duì)列是哪種優(yōu)先級(jí)的隊(duì)列
jobList *list.List //List是golang庫(kù)的雙向隊(duì)列實(shí)現(xiàn),每個(gè)元素都是一個(gè)job
}這里我們注意到有以下幾個(gè)變化:
JobQueue里多了一個(gè)Priority屬性,代表該隊(duì)列是哪個(gè)優(yōu)先級(jí)別。noticeChan屬性從JobQueue中移動(dòng)到了PriorityQueue中。因?yàn)楝F(xiàn)在有多個(gè)隊(duì)列,只要任意一個(gè)隊(duì)列里有元素就需要通知消費(fèi)者worker進(jìn)行消費(fèi),因此消費(fèi)者worker監(jiān)聽(tīng)的是PriorityQueue中是否有元素,而在監(jiān)聽(tīng)階段不關(guān)心具體哪個(gè)優(yōu)先級(jí)隊(duì)列中有元素。
好了,數(shù)據(jù)結(jié)構(gòu)定義完了,我們看看將工作單元Job推入隊(duì)列和從隊(duì)列中彈出Job又有什么變化。
優(yōu)先級(jí)隊(duì)列的入隊(duì)操作
優(yōu)先級(jí)隊(duì)列的入隊(duì)操作,就需要根據(jù)入隊(duì)Job的優(yōu)先級(jí)屬性放到對(duì)應(yīng)的優(yōu)先級(jí)隊(duì)列中,入隊(duì)流程圖如下:

圖-優(yōu)先級(jí)隊(duì)列入隊(duì)流程
當(dāng)一個(gè)Job加入隊(duì)列的時(shí)候,有兩種場(chǎng)景,一種是該優(yōu)先級(jí)的隊(duì)列已經(jīng)存在,則直接Push到隊(duì)尾即可。一種是該優(yōu)先級(jí)的隊(duì)列還不存在,則需要先創(chuàng)建該優(yōu)先級(jí)的隊(duì)列,然后再將該工作單元Push到隊(duì)尾。如下是兩種場(chǎng)景。
隊(duì)列已經(jīng)存在的場(chǎng)景
這種場(chǎng)景會(huì)比較簡(jiǎn)單。假設(shè)我們要插入優(yōu)先級(jí)為7的工作單元,首先從映射表中查找7是否存在,發(fā)現(xiàn)對(duì)應(yīng)關(guān)系是2,則直接找到切片中索引2的元素,即優(yōu)先級(jí)為7的隊(duì)列,將job加入即可。如下圖。

圖-已存在隊(duì)列插入
隊(duì)列不存在的場(chǎng)景
這種場(chǎng)景稍微復(fù)雜些,在映射表中找不到要插入優(yōu)先級(jí)的隊(duì)列的話,則需要在切片中插入一個(gè)優(yōu)先級(jí)隊(duì)列,而為了優(yōu)先級(jí)隊(duì)列在切片中也保持有序(保持有序就可以知道隊(duì)列的優(yōu)先級(jí)的高低了),則需要移動(dòng)相關(guān)的元素。我們以插入優(yōu)先級(jí)為6的工作單元為例來(lái)講解。
1、首先,我們的隊(duì)列有一個(gè)初始化的狀態(tài),存儲(chǔ)了優(yōu)先級(jí)1、4、7的隊(duì)列。如下圖。

2、當(dāng)插入優(yōu)先級(jí)為6的工作單元時(shí),發(fā)現(xiàn)在映射表中沒(méi)有優(yōu)先級(jí)6的映射關(guān)系,說(shuō)明在切片中還沒(méi)有優(yōu)先級(jí)為6的隊(duì)列的元素。所以需要在切片中依次查找到優(yōu)先級(jí)6應(yīng)該插入的位置在4和7之間,也就是需要存儲(chǔ)在切片2的位置。

3、將原來(lái)索引2位置的優(yōu)先級(jí)為7的隊(duì)列往后移動(dòng)到3,同時(shí)更新映射表中的對(duì)應(yīng)關(guān)系。

4、將優(yōu)先級(jí)為6的工作單元插入到索引2的隊(duì)列中,同時(shí)更新映射表中的優(yōu)先級(jí)和索引的關(guān)系。

我們看下代碼實(shí)現(xiàn):
func (priorityQueue *PriorityQueue) Push(job Job) {
priorityQueue.mu.Lock()
defer priorityQueue.mu.Unlock()
//先根據(jù)job的優(yōu)先級(jí)找要入隊(duì)的隊(duì)列
var idx int
var ok bool
//從優(yōu)先級(jí)-切片索引的map中查找該優(yōu)先級(jí)的隊(duì)列是否存在
if idx, ok = priorityQueue.priorityIdx[job.Priority()]; !ok {
//如果不存在該優(yōu)先級(jí)的隊(duì)列,則需要初始化一個(gè)隊(duì)列,并返回該隊(duì)列在切片中的索引位置
idx = priorityQueue.addPriorityQueue(job.Priority)
}
//根據(jù)獲取到的切片索引idx,找到具體的隊(duì)列
queue := priority.queues[idx]
//將job推送到隊(duì)列的隊(duì)尾
queue.JobList.PushBack(job)
//隊(duì)列job個(gè)數(shù)+1
priorityQueue.Size++
//如果隊(duì)列job個(gè)數(shù)超過(guò)隊(duì)列的最大容量,則從優(yōu)先級(jí)最低的隊(duì)列中移除工作單元
if priorityQueue.size > priorityQueue.capacity {
priorityQueue.RemoveLeastPriorityJob()
}else {
//通知新進(jìn)來(lái)一個(gè)job
priorityQueue.noticeChan <- struct{}{}
}
}代碼中大部分也都做了注釋,不難理解。這里我們來(lái)看下addPriorityQueue的具體實(shí)現(xiàn):
func (priorityQueue *PriorityQueue) addPriorityQueue(priority int) int {
n := len(priorityQueue.queues)
//通過(guò)二分查找找到priority應(yīng)插入的切片索引
pos := sort.Search(n, func(i int) bool {
return priority < priorityQueue.priority
})
//更新映射表中優(yōu)先級(jí)和切片索引的對(duì)應(yīng)關(guān)系
for i := pos; i < n; i++ {
priorityQueue.priorityIdx[priorityQueue.queues[i].priority] = i + 1
}
tail := make([]*jobQueue, n-pos)
copy(tail, priorityQueue.queues[pos:])
//初始化一個(gè)新的優(yōu)先級(jí)隊(duì)列,并將該元素放到切片的pos位置中
priorityQueue.queues = append(priorityQueue.queues[0:pos], newJobQueue(priority))
//將高于priority優(yōu)先級(jí)的元素也拼接到切片后面
priorityQueue.queues = append(priorityQueue.queues, tail...)
return pos
}最后,我們?cè)賮?lái)看一個(gè)實(shí)際的調(diào)用例子:
func main() {
//初始化一個(gè)隊(duì)列
queue := &PriorityQueue{
noticeChan: make(chan struct{}, cap),
capacity: cap,
priorityIdx: make(map[int]int),
size: 0,
}
//初始化一個(gè)消費(fèi)worker
workerManger := NewWorkerManager(queue)
// worker開(kāi)始監(jiān)聽(tīng)隊(duì)列
go workerManger.StartWork()
// 構(gòu)造SquareJob
job := &SquareJob{
BaseJob: &BaseJob{
DoneChan: make(chan struct{}, 1),
},
x: 5,
priority: 10,
}
//壓入隊(duì)列尾部
queue.PushJob(job)
//等待job執(zhí)行完成
job.WaitDone()
print("The End")
}05 多隊(duì)列-多消費(fèi)者模式

我們?cè)诙嚓?duì)列-單消費(fèi)者的基礎(chǔ)上,再來(lái)看看多消費(fèi)者模式。也就是增加worker的數(shù)量,提高Job的處理速度。
我們?cè)賮?lái)看下worker的定義:
type WorkerManager struct {
queue *PriorityQueue
closeChans []chan struct{}
}這里需要注意,closeChans變成了切片數(shù)組。因?yàn)槲覀兠繂?dòng)一個(gè)worker,就需要有一個(gè)關(guān)閉通道。
然后看StartWorker函數(shù)的實(shí)現(xiàn):
func (m *WorkerManager) StartWork(n int) error {
fmt.Println("Start to Work")
for i := 0; i < n; i++ {
m.createWorker();
}
return nil
}
func (m *WorkerManager) createWorker() {
closeChan := make(chan struct{})
//每個(gè)協(xié)程,就是一個(gè)worker
go func(closeChan chan struct{}) {
var job Job
for {
select {
case <-m.closeChan:
return nil
case <-m.queue.noticeChan:
job := m.queue.PopJob()
m.ConsumeJob(job)
}
}
}(closeChan)
m.closeChanMu.Lock()
defer m.closeChanMu.Unlock()
m.closeChans = append(m.closeChans, closeChan)
return nil
}
func (m *WorkerManager) ConsumeJob(job Job) {
defer func() {
job.Done()
}()
job.Execute()
}這里需要注意的是,所有的worker都需要監(jiān)聽(tīng)隊(duì)列的noticeChan通道。測(cè)試的例子就留給讀者自己了。
另外如下圖的單隊(duì)列-多消費(fèi)者模式是多隊(duì)列-多消費(fèi)者模式的一個(gè)特例,這里就不再進(jìn)行實(shí)現(xiàn)了。

總結(jié)
隊(duì)列的作用可以用來(lái)控制流量,而優(yōu)先級(jí)隊(duì)列在兼顧流量控制的同時(shí),還能將流量按優(yōu)先級(jí)高低來(lái)進(jìn)行處理。 本文中一些細(xì)節(jié)的并發(fā)加鎖操作做了忽略,大家在實(shí)際應(yīng)用中根據(jù)需要進(jìn)行完善即可,更多關(guān)于Go 單隊(duì)列優(yōu)先級(jí)隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang http使用踩過(guò)的坑與應(yīng)對(duì)方式
這篇文章主要介紹了golang http使用踩過(guò)的坑與應(yīng)對(duì)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
Golang校驗(yàn)字符串是否JSON格式的方法總結(jié)
這篇文章主要為大家詳細(xì)介紹了Golang中校驗(yàn)字符串是否JSON格式的方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起了解一下2023-04-04
Golang巧用defer進(jìn)行錯(cuò)誤處理的方法
錯(cuò)誤處理是程序的重要組成部分,有效且優(yōu)雅的處理錯(cuò)誤是大多數(shù)程序員的追求,下面這篇文章主要給大家介紹了關(guān)于Golang中巧用defer進(jìn)行錯(cuò)誤處理的方法,文中通過(guò)示例介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看吧。2017-05-05
Golang中切片長(zhǎng)度和容量的區(qū)別示例詳解
切片長(zhǎng)度與容量在Go中很常見(jiàn),切片長(zhǎng)度是切片中可用元素的數(shù)量,而切片容量是從切片中第一個(gè)元素開(kāi)始計(jì)算的底層數(shù)組中的元素?cái)?shù)量,這篇文章主要給大家介紹了關(guān)于Golang中切片長(zhǎng)度和容量區(qū)別的相關(guān)資料,需要的朋友可以參考下2024-01-01
golang中g(shù)in框架接入jwt使用token驗(yàn)證身份
本文主要介紹了golang中g(shù)in框架接入jwt使用token驗(yàn)證身份,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
GO語(yǔ)言文件的創(chuàng)建與打開(kāi)實(shí)例分析
這篇文章主要介紹了GO語(yǔ)言文件的創(chuàng)建與打開(kāi)的具體用法,實(shí)例分析了GO語(yǔ)言文件創(chuàng)建與打開(kāi)操作中所涉及的函數(shù)具體用法,具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2014-12-12

