基于rocketmq的有序消費模式和并發(fā)消費模式的區(qū)別說明
rocketmq消費者注冊監(jiān)聽有兩種模式
有序消費MessageListenerOrderly和并發(fā)消費MessageListenerConcurrently,這兩種模式返回值不同。
MessageListenerOrderly
正確消費返回
ConsumeOrderlyStatus.SUCCESS
稍后消費返回
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently
正確消費返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
稍后消費返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
顧名思義,有序消費模式是按照消息的順序進行消費,但是除此之外,在實踐過程中我發(fā)現(xiàn)和并發(fā)消費模式還有很大的區(qū)別的。
第一,速度,下面我打算用實驗來探究一下。
使用mq發(fā)送消息,消費者使用有序消費模式消費,具體的業(yè)務是阻塞100ms
Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
logger.info("==========CONSUME_START===========");
logger.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
try {
if(date1 == null)
date1 = new Date();//在第一次消費時初始化
Thread.sleep(100);
logger.info("total:"+(++total));
date2 = new Date();
totalTime = (date2.getTime() - date1.getTime());
logger.info("totalTime:"+totalTime);
logger.info("==========CONSUME_SUCCESS===========");
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e) {
logger.info("==========RECONSUME_LATER===========");
logger.error(e.getMessage(),e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
消費100條消息

速度挺快的,為了讓結(jié)果更準確,將消息加到1000條
消費1000條消息

可以看到每一條消息平均耗時25ms,然而業(yè)務是阻塞100ms,這說明有序消費模式和同步消費可能并不是一回事,那如果不阻塞代碼我們再來看一下結(jié)果

不阻塞過后速度明顯提高了,那么我阻塞300ms會怎么樣呢?

時間相比阻塞100ms多了2倍
接下來我們測試并發(fā)消費模式
Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
logger.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
try {
if(date1 == null)
date1 = new Date();
Thread.sleep(100);
logger.info("total:"+(++total));
date2 = new Date();
totalTime = (date2.getTime() - date1.getTime());
logger.info("totalTime:"+totalTime);
logger.info("==========CONSUME_SUCCESS===========");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.info("==========RECONSUME_LATER===========");
logger.error(e.getMessage(),e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
基于上次的經(jīng)驗,同樣測試三種情況,消費1000條不阻塞,消費1000條阻塞100ms,消費1000條阻塞300ms
消費1000條不阻塞的情況

和有序消費模式差不多,快個一兩秒。
消費1000條阻塞100ms

竟然比不阻塞的情況更快,可能是誤差把
消費1000條阻塞300ms

速度稍慢,但是還是比有序消費快得多。
結(jié)論是并發(fā)消費的消費速度要比有序消費更快。
另一個區(qū)別是消費失敗時的處理不同,有序消費模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后,消費者會立馬消費這條消息,而使用并發(fā)消費模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,要過好幾秒甚至十幾秒才會再次消費。
我是在只有一條消息的情況下測試的。更重要的區(qū)別是,
返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT并不會增加消息的消費次數(shù),mq消息有個默認最大消費次數(shù)16,消費次數(shù)到了以后,這條消息會進入死信隊列,這個最大消費次數(shù)是可以在mqadmin中設置的。
mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3
我測試后發(fā)現(xiàn),并發(fā)模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一個消息到達最大消費次數(shù)之后就不會再出現(xiàn)了。這說明有序消費模式可能并沒有這個機制,這意味著你再有序消費模式下拋出固定異常,那么這條異常信息將會被永遠消費,并且很可能會影響之后正常的消息。下面依然做個試驗
Map<String, Integer> map = new HashMap<>();//保存消息錯誤消費次數(shù)
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
try {
if(1 == 1)
throw new Exception();
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e) {
MessageExt msg = msgs.get(0);
if(map.containsKey(msg.getKeys())) {//消息每消費一次,加1
map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
}else {
map.put(msg.getKeys(), 1);
}
logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
發(fā)送了十條消息

可以看到雖然我發(fā)了十條消息,但是一直在消費同樣四條消息,這可能跟消息broker有默認四條隊列有關(guān)系。同時從時間可以看到,消費失敗后,會馬上拉這條信息。
至于并發(fā)消費模式則不會無限消費,而且消費失敗后不會馬上再消費。具體的就不嘗試了。
結(jié)論是有序消費模式MessageListenerOrderly要慎重地處理異常,我則是用全局變量記錄消息的錯誤消費次數(shù),只要消費次數(shù)達到一定次數(shù),那么就直接返回ConsumeOrderlyStatus.SUCCESS。
突然想到之前測試有序消費模式MessageListenerOrderly的時候為什么1000條消息阻塞100ms耗時25000ms了,因為有序消費模式是同時拉取四條隊列消息的,這就對上了。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
淺談關(guān)于Java正則和轉(zhuǎn)義中\(zhòng)\和\\\\的理解
這篇文章主要介紹了淺談關(guān)于Java正則和轉(zhuǎn)義中\(zhòng)\和\\\\的理解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-08-08
macOS中搭建Java8開發(fā)環(huán)境(基于Intel?x86?64-bit)
這篇文章主要介紹了macOS中搭建Java8開發(fā)環(huán)境(基于Intel?x86?64-bit)?的相關(guān)資料,需要的朋友可以參考下2022-12-12
Log4j2?重大漏洞編譯好的log4j-2.15.0.jar包下載(替換過程)
Apache?開源項目?Log4j?的遠程代碼執(zhí)行漏洞細節(jié)被公開,由于?Log4j?的廣泛使用,該漏洞一旦被攻擊者利用會造成嚴重危害,下面小編給大家?guī)砹薒og4j2?重大漏洞編譯好的log4j-2.15.0.jar包下載,感興趣的朋友一起看看吧2021-12-12

