基于RocketMQ推拉模式詳解
消費者客戶端有兩種方式從消息中間件獲取消息并消費。嚴(yán)格意義上來講,RocketMQ并沒有實現(xiàn)PUSH模式,而是對拉模式進行一層包裝,名字雖然是 Push 開頭,實際在實現(xiàn)時,使用 Pull 方式實現(xiàn)。
通過 Pull 不斷輪詢 Broker 獲取消息。當(dāng)不存在新消息時,Broker 會掛起請求,直到有新消息產(chǎn)生,取消掛起,返回新消息。
1、概述
1.1、PULL方式
由消費者客戶端主動向消息中間件(MQ消息服務(wù)器代理)拉取消息;采用Pull方式,如何設(shè)置Pull消息的拉取頻率需要重點去考慮,舉個例子來說,可能1分鐘內(nèi)連續(xù)來了1000條消息,然后2小時內(nèi)沒有新消息產(chǎn)生(概括起來說就是“消息延遲與忙等待”)。
如果每次Pull的時間間隔比較久,會增加消息的延遲,即消息到達消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,但是在一段時間內(nèi)MQ中并沒有任何消息可以消費,那么會產(chǎn)生很多無效的Pull請求的RPC開銷,影響MQ整體的網(wǎng)絡(luò)性能;
1.2、PUSH方式
由消息中間件(MQ消息服務(wù)器代理)主動地將消息推送給消費者;采用Push方式,可以盡可能實時地將消息發(fā)送給消費者進行消費。
但是,在消費者的處理消息的能力較弱的時候(比如,消費者端的業(yè)務(wù)系統(tǒng)處理一條消息的流程比較復(fù)雜,其中的調(diào)用鏈路比較多導(dǎo)致消費時間比較久。
概括起來地說就是“慢消費問題”),而MQ不斷地向消費者Push消息,消費者端的緩沖區(qū)可能會溢出,導(dǎo)致異常;
2、PUSH模式
主動推送的模式實現(xiàn)起來簡單,避免了拉取的消費端業(yè)務(wù)邏輯的復(fù)雜度,消息的消費可以認(rèn)為是實時的,同時也存在一定的弊端,要求消費端要有很強的消費能力。
2.1、代碼實現(xiàn)
public class Consumer1 {
public static void main(String[] args){
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("consumer_push");
consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
try {
for(MessageExt msg : paramList){
String msgbody = new String(msg.getBody(), "utf-8");
SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
Date date = new Date(msg.getStoreTimestamp());
System.out.println("Consumer1=== 存入時間 : "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內(nèi)容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
}
});
consumer.start();
System.out.println("Consumer1===啟動成功!");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
PUSH消費方式,需要注冊一個監(jiān)聽器Listener,,用來監(jiān)聽最新的消息,進行業(yè)務(wù)處理,同時反饋消息的消費狀態(tài),消費成功(CONSUME_SUCCESS)、消費重試(RECONSUME_LATER),消息重試會根據(jù)配置的消息的延遲等級的時間間隔,定時重新發(fā)送消費失敗的記錄。(PS:延遲消息中會重點討論)
PUSH消息方式由于返回了消息的狀態(tài),服務(wù)端會維護每個消費端的消費進度,內(nèi)部會記錄消費進度,消息發(fā)送成功后會更新消費進度。
PUSH消息方式的局限性,是在HOLD住Consumer請求的時候需要占用資源,它適合用在消息隊列這種客戶端連接數(shù)可控的場景中。
上一個章節(jié)說明了服務(wù)端存儲的每個主題對應(yīng)的消費組的每個消息隊列的偏移量
查看服務(wù)器文件上的消費進度信息:
/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json

3、PULL模式
3.1、代碼實現(xiàn)(1)
public class PullConsumer {
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullConsumer");
consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
SINGLE_MQ: while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println("=============================================================");
System.out.println("Consume from the queue: " + mq + "offset:" + getMessageQueueOffset(mq) + "結(jié)果:" + pullResult.getPullStatus());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.print(new String(m.getBody()) +" == ");
}
System.out.println("");
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}
結(jié)果:

每次拉取消息的時候需要提供偏移量和拉取的消息的個數(shù),需要自己業(yè)務(wù)實現(xiàn)每個主題下的隊列的消費進度。
代碼實現(xiàn)(1)這種方式只能拉取歷史的消息,最新的消息拉取不了,也可以進行改造,來實現(xiàn)一直拉取。
3.2、代碼實現(xiàn)(2)
在MQPullConsumer這個類里面,有一個MessageQueueListener,它的目的就是當(dāng)queue發(fā)生變化的時候,通知Consumer。也正是這個借口,幫助我們在Pull模式里面,實現(xiàn)負(fù)載均衡。
注意,這個接口在MQPushConsumer里面是沒有的,那里面有的是上面代碼里的MessageListener。
void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
public interface MessageQueueListener {
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
}
有了這個Listener,我們就可以動態(tài)的知道當(dāng)前的Consumer分?jǐn)偟搅藥讉€MessageQueue。然后對這些MessageQueue,我們可以開個線程池來消費。
public class PullConsumerExtend {
public static void main(String[] args) throws MQClientException {
//消費組
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("pullConsumer");
//MQ NameService地址
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
//負(fù)載均衡模式
scheduleService.setMessageModel(MessageModel.CLUSTERING);
//需要處理的消息topic
scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
System.out.println("");
System.out.println("Consume from the queue: " + mq + "offset:" + offset + "結(jié)果:" + pullResult.getPullStatus());
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.print(new String(m.getBody()) +" == ");
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
//設(shè)置下一下拉取的間隔時間
context.setPullNextDelayTimeMillis(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
scheduleService.start();
}
}
結(jié)果:

比較**代碼實現(xiàn)(1)**這種方式改進了很多,不需要業(yè)務(wù)維護每個消費隊列的消費進度,可以更新到服務(wù)端的。
弊端也很明顯就是每次隊列拉取消息的時間間隔,時間長導(dǎo)致消息擠壓,時間段消息少,影響服務(wù)端性能。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解Spring注解@Autowired的實現(xiàn)原理和使用方法
在使用Spring開發(fā)的時候,配置的方式主要有兩種,一種是xml的方式,另外一種是 java config的方式,在使用的過程中,我們使用最多的注解應(yīng)該就是@Autowired注解了,所以本文就給大家講講@Autowired注解是如何使用和實現(xiàn)的,需要的朋友可以參考下2023-07-07
quartz實現(xiàn)定時功能實例詳解(servlet定時器配置方法)
Quartz是一個完全由java編寫的開源作業(yè)調(diào)度框架,下面提供一個小例子供大家參考,還有在servlet配置的方法2013-12-12
Java常見問題之javac Hello.java找不到文件的解決方法
剛開始編寫java代碼時,肯定會遇到各種各樣的bug,當(dāng)然對于初學(xué)者這也是能理解的,下面這篇文章主要給大家介紹了關(guān)于Java常見問題之javac Hello.java找不到文件解決的相關(guān)資料,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下。2018-01-01
Spring Boot使用AOP在指定方法執(zhí)行完后執(zhí)行異步處理操作
這篇文章主要介紹了Spring Boot使用AOP在指定方法執(zhí)行完后執(zhí)行異步處理操作,本文通過實例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2024-06-06
基于SpringBoot實現(xiàn)自動裝配返回屬性的設(shè)計思路
這篇文章主要介紹了基于SpringBoot實現(xiàn)自動裝配返回屬性,這里涉及到的技術(shù)知識點有注解解析器,為什么用ResponseBodyAdvice這里解析?不在Filter,Interceptors,本文結(jié)合示例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2022-03-03
DoytoQuery中關(guān)于N+1查詢問題解決方案詳解
這篇文章主要為大家介紹了DoytoQuery中關(guān)于N+1查詢問題解決方案詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12

