解決RabbitMq消息隊列Qos?Prefetch消息堵塞問題
mq是實現(xiàn)代碼擴展的有利手段,個人喜歡用概念來學(xué)習(xí)新知識,介紹堵塞問題的之前,先來段概念的學(xué)習(xí)。
ConnectionFactory:創(chuàng)建connection的工廠類
Connection: 簡單理解為socket
Channel:和mq交互的接口,定義queue、exchange和綁定queue、exhange等接口都是它。
接下來就是和mq的交互類
exchange:簡單地看成路由,類型不是重點,看看官網(wǎng)即可
queue:客戶端監(jiān)聽的是queue,而不是exchange,但是使用queue的前提要先將exchange和queue綁定。用過java queue工具類應(yīng)該很容易上手,queue分為寫和讀,各自可以有自己頻率,寫得快讀得慢,容易堵塞;寫得慢讀得快又容易造成消費者的空閑。
Prefetc:一個重要卻容易被忽略的指標(biāo),也是這次遇到的問題。
prefetch與消息投遞
prefetch是指單一消費者最多能消費的unacked messages數(shù)目。
如何理解呢?
mq為每一個 consumer設(shè)置一個緩沖區(qū),大小就是prefetch。每次收到一條消息,MQ會把消息推送到緩存區(qū)中,然后再推送給客戶端。當(dāng)收到一個ack消息時(consumer 發(fā)出baseack指令),mq會從緩沖區(qū)中空出一個位置,然后加入新的消息。但是這時候如果緩沖區(qū)是滿的,MQ將進入堵塞狀態(tài)。
更具體點描述,假設(shè)prefetch值設(shè)為10,共有兩個consumer。也就是說每個consumer每次會從queue中預(yù)抓取 10 條消息到本地緩存著等待消費。同時該channel的unacked數(shù)變?yōu)?0。而Rabbit投遞的順序是,先為consumer1投遞滿10個message,再往consumer2投遞10個message。如果這時有新message需要投遞,先判斷channel的unacked數(shù)是否等于20,如果是則不會將消息投遞到consumer中,message繼續(xù)呆在queue中。之后其中consumer對一條消息進行ack,unacked此時等于19,Rabbit就判斷哪個consumer的unacked少于10,就投遞到哪個consumer中。
我遇到的問題是一個粗心的程序員,在編寫代碼的時候,他對某些消息處理方式是這樣的
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
logger.error("######### The message is not delete from queue : {}", body);
}首先他講ack機制設(shè)置為手動的,然后他的理解是如果處理成功的消息,就ack給MQ,期望MQ就可以刪除完成的數(shù)據(jù)。不然,保留數(shù)據(jù)再次被處理。
這里的誤區(qū)就是就是對ack的理解,失敗的時候,如果需要讓程序繼續(xù)處理,應(yīng)該使用basicNack,并告訴mq將消息再次放入隊列
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}對于客戶端意外宕機的情況,沒有ack服務(wù)器確實不會刪除掉數(shù)據(jù),但是consumer重啟以后,對于服務(wù)器就是一個新的消費者了,也就是它的緩沖區(qū)又被重置為原來的n-prefetch,所以這個問題被粗心的小哥想當(dāng)然地測試通過了。
prefetch的大小應(yīng)該為多少
這篇文章給了很好的建議,我簡單地說一下我的理解。
理想狀況下,計算MQ SERVER 從緩沖區(qū)中拿到消息并推送到消費端,加上消費端處理完ack消息到MQ server,的時間,假設(shè)為100ms,其中消費端處理業(yè)務(wù)話費了10ms。
這里可以得出我們 prefetch = 100ms / 10ms = 10,也就是消息來回的總時間/業(yè)務(wù)處理的時間,這里要求我們 prefetch >= 10。一般計算這個時間不會太準(zhǔn)確只能毛姑姑的,所以prefetch一般要大一點。但是這個值也不能太大,不然消費端就一只處于空閑狀態(tài)了。
所以如果你保證所有的消息都ack了,但是還是出現(xiàn)比較長時間的堵塞,你就或者加大一點prefetch,或者多加一些機器,或者減少業(yè)務(wù)處理的時間了。一開始建議采用或者,使用一個線程池來處理這些業(yè)務(wù)邏輯。
以上就是解決RabbitMqQosPrefetch消息堵塞問題的詳細內(nèi)容,更多關(guān)于RabbitMqQosPrefetch消息堵塞的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實現(xiàn)在線考試系統(tǒng)與設(shè)計(學(xué)生功能)
這篇文章主要介紹了Java實現(xiàn)在線考試系統(tǒng)與設(shè)計(學(xué)生功能),本文通過實例代碼給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02
SpringBoot中使用configtree讀取樹形文件目錄中的配置詳解
這篇文章主要介紹了SpringBoot中使用configtree讀取樹形文件目錄中的配置詳解,configtree通過spring.config.import?+?configtree:前綴的方式,加載以文件名為key、文件內(nèi)容為value的配置屬性,需要的朋友可以參考下2023-12-12
實現(xiàn)一個基于Servlet的hello world程序詳解步驟
Java Servlet 是運行在 Web 服務(wù)器或應(yīng)用服務(wù)器上的程序,它是作為來自 Web 瀏覽器或其他 HTTP 客戶端的請求和 HTTP 服務(wù)器上的數(shù)據(jù)庫或應(yīng)用程序之間的中間層2022-02-02
Spring SpringMVC在啟動完成后執(zhí)行方法源碼解析
這篇文章主要介紹了SpringMVC在啟動完成后執(zhí)行方法源碼解析,還是非常不錯的,在這里分享給大家,需要的朋友可以參考下。2017-09-09
springboot項目整合druid數(shù)據(jù)庫連接池的實現(xiàn)
這篇文章主要介紹了springboot項目整合druid數(shù)據(jù)庫連接池的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04

