RocketMQ消息拉取過程詳解
前言
在上一篇文章中,我們講述了DefaultMQPushConsumer拉消息的原理,它是通過重平衡觸發(fā)pullRequest的創(chuàng)建,通過阻塞隊(duì)列作為pullRequest的存儲(chǔ)容器,另一端通過定時(shí)任務(wù)從阻塞隊(duì)列中取出pullRequest來向Broker發(fā)送拉消息的請求,無論消息拉取成功還是失敗,都會(huì)重新把pullRequest放回阻塞隊(duì)列中,這樣就能保證持續(xù)不斷地向Broker拉消息了;
今天這篇文章我們繼續(xù)講述DefaultLitePullConsumer是如何實(shí)現(xiàn)消息拉取的;
DefaultLitePullConsumer拉消息代碼示例
我們在使用DefaultLitePullConsumer時(shí)都是主動(dòng)去poll消息,并不是像DefaultMQPushConsumer那樣設(shè)置一個(gè)消息監(jiān)聽器:
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group);
consumer.setNamesrvAddr(nameSrv);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, subExpression);
try {
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
try {
while (true) {
List<MessageExt> messageExts = consumer.poll(5000);
// 處理業(yè)務(wù)邏輯
System.out.println("消息數(shù)量:" + messageExts.size());
System.out.println("消息內(nèi)容:");
for (MessageExt messageExt : messageExts) {
System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8));
}
}
} catch (Exception e) {
e.printStackTrace();
}
一般拿到消息后都會(huì)交給業(yè)務(wù)線程池去處理,上述代碼我只簡單地打印了一下消息內(nèi)容;
消息消費(fèi)
跟著poll()方法,我們最終定位到DefaultLitePullConsumerImpl.poll()這個(gè)方法:
public synchronized List<MessageExt> poll(long timeout) {
try {
this.checkServiceState();
if (timeout < 0L) {
throw new IllegalArgumentException("Timeout must not be negative");
}
?
if (this.defaultLitePullConsumer.isAutoCommit()) {
this.maybeAutoCommit();
}
?
long endTime = System.currentTimeMillis() + timeout;
// 從阻塞隊(duì)列中取ConsumeRequest
DefaultLitePullConsumerImpl.ConsumeRequest consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() > 0L) {
while(consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0L) {
break;
}
}
}
?
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
List<MessageExt> messages = consumeRequest.getMessageExts();
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
// 取到消息后直接更新消費(fèi)點(diǎn)位
this.assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
this.resetTopic(messages);
// 下面是調(diào)用consumeMessageHook
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(this.defaultLitePullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(consumeRequest.getMessageQueue());
consumeMessageContext.setMsgList(messages);
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
// 默認(rèn)是消費(fèi)成功
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
?
return messages;
}
} catch (InterruptedException var10) {
}
?
return Collections.emptyList();
}
1.直接從阻塞隊(duì)列consumeRequestCache中取出消息對象ConsumeRequest,這里面就包含了消息內(nèi)容;
2.取出來后直接更新消費(fèi)點(diǎn)位,默認(rèn)為此次消息消費(fèi)成功;
這里跟DefaultMQPushConsumer不同的是,DefaultLitePullConsumerImpl.poll()默認(rèn)的是消息消費(fèi)一定成功,如果消費(fèi)失敗的話,需要開發(fā)人員自己處理,消費(fèi)失敗的消息不會(huì)再次發(fā)送給消費(fèi)者;
那么咱們的疑問就出來了,poll()方法光顧著從consumeRequestCache中取消息,那消息是啥時(shí)候放進(jìn)去的呢?
消息拉取入口
我們可以重新了解一下消費(fèi)者重平衡過程,在MessageQueue分配完畢后,會(huì)對比被分配的MessageQueue是否和分配前的不一致,大部分情況下是會(huì)發(fā)生改變的,那么就會(huì)觸發(fā)messageQueueChanged()方法的調(diào)用:
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
// 取出所有的MessageQueueListener
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
// 依次調(diào)用messageQueueChanged方法
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
MessageQueueListener是什么時(shí)候被放進(jìn)去的呢?可以看一下subscribe()方法:
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE);
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
// 每個(gè)subscribe()都會(huì)設(shè)置MessageQueueListener
this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
if (serviceState == ServiceState.RUNNING) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
} catch (Exception e) {
throw new MQClientException("subscribe exception", e);
}
}
可以發(fā)現(xiàn),每個(gè)subscribe()都會(huì)設(shè)置MessageQueueListener,MessageQueueListenerImpl里面只干了一件事情:更新MessageQueue并且創(chuàng)建pullTask;
class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
}
}
?
public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
switch (messageModel) {
case BROADCASTING:
updateAssignedMessageQueue(topic, mqAll);
// 更新拉消息任務(wù)
updatePullTask(topic, mqAll);
break;
case CLUSTERING:
updateAssignedMessageQueue(topic, mqDivided);
// 更新拉消息任務(wù)
updatePullTask(topic, mqDivided);
break;
default:
break;
}
}
現(xiàn)在終于快找到這個(gè)消息拉取的入口了:
private void startPullTask(Collection<MessageQueue> mqSet) {
for (MessageQueue messageQueue : mqSet) {
if (!this.taskTable.containsKey(messageQueue)) {
// 創(chuàng)建消息拉取任務(wù)
PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
this.taskTable.put(messageQueue, pullTask);
// 這個(gè)就是任務(wù)執(zhí)行的入口
this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
}
}
}
消息拉取的入口尋找起來還是有點(diǎn)困難的,但是主要思路還是從【重平衡】開始,另外就是觸發(fā)了MessageQueueListener,此時(shí)才會(huì)創(chuàng)建pullTask;
雖然DefaultMQPushConsumer也是【重平衡】觸發(fā)pullRequest的創(chuàng)建,但是它是將pullRequest放進(jìn)阻塞隊(duì)列,另一端由消息拉取任務(wù)去取pullRequest向Broker發(fā)送請求;而DefaultLitePullConsumer是直接創(chuàng)建pullTask去拉消息;
PullTaskImpl拉消息
很顯然,PullTaskImpl就是一個(gè)Runnable,那么最重要的就是它的run()方法,這個(gè)方法就是負(fù)責(zé)從Broker拉消息并放進(jìn)consumeRequestCache阻塞隊(duì)列中,這樣poll()方法才能從consumeRequestCache阻塞隊(duì)列中取到消息;
messageQueue暫停
if (DefaultLitePullConsumerImpl.this.assignedMessageQueue.isPaused(this.messageQueue)) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 1000L, TimeUnit.MILLISECONDS);
DefaultLitePullConsumerImpl.this.log.debug("Message Queue: {} has been paused!", this.messageQueue);
return;
}
如果messageQueue處于暫停狀態(tài),那么延遲1秒重新執(zhí)行這個(gè)任務(wù);
ProcessQueue被移除
ProcessQueue processQueue = DefaultLitePullConsumerImpl.this.assignedMessageQueue.getProcessQueue(this.messageQueue);
if (null == processQueue || processQueue.isDropped()) {
DefaultLitePullConsumerImpl.this.log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
如果processQueue不存在或者已經(jīng)被移除了,那么這個(gè)任務(wù)也不用執(zhí)行了;
- 流量控制
if ((long)DefaultLitePullConsumerImpl.this.consumeRequestCache.size() * (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize() > DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForAll()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
if (DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes++ % 1000L == 0L) {
DefaultLitePullConsumerImpl.this.log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", DefaultLitePullConsumerImpl.this.consumeRequestCache.size(), DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes);
}
return;
}
如果consumeRequestCache中的消息數(shù)量超過了PullThresholdForAll閾值,那么觸發(fā)限流機(jī)制,當(dāng)前任務(wù)將不會(huì)繼續(xù)拉消息,并且50毫秒后才會(huì)重新執(zhí)行該任務(wù);
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 1048576L;
// 單個(gè)processQueue上面消息數(shù)量限制
if (cachedMessageCount > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) {
DefaultLitePullConsumerImpl.this.log.warn("The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes});
}
return;
}
// 單個(gè)processQueue中消息總大小限制
if (cachedMessageSizeInMiB > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) {
DefaultLitePullConsumerImpl.this.log.warn("The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes});
}
return;
}
- 如果當(dāng)前
processQueue中消息的數(shù)量大于PullThresholdForQueue閾值,也同樣觸發(fā)限流機(jī)制,當(dāng)前任務(wù)不再執(zhí)行,50毫秒后重新執(zhí)行該任務(wù); - 如果當(dāng)前
processQueue中消息的總大小超過PullThresholdSizeForQueue(單位:MB)閾值,將觸發(fā)限流機(jī)制,當(dāng)前任務(wù)不再執(zhí)行,50毫秒后重新執(zhí)行該任務(wù);
if (processQueue.getMaxSpan() > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumeMaxSpan()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
if (DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
DefaultLitePullConsumerImpl.this.log.warn("The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes});
}
return;
}
如果processQueue中的maxSpan大于消費(fèi)者的ConsumeMaxSpan,也就是第一個(gè)消息與最后一個(gè)消息的點(diǎn)位偏差大于ConsumeMaxSpan(默認(rèn)是2000),將觸發(fā)限流機(jī)制,當(dāng)前任務(wù)不執(zhí)行,50毫秒后重新執(zhí)行該任務(wù);
- 計(jì)算拉取點(diǎn)位
long offset = 0L;
try {
offset = DefaultLitePullConsumerImpl.this.nextPullOffset(this.messageQueue);
} catch (Exception var17) {
DefaultLitePullConsumerImpl.this.log.error("Failed to get next pull offset", var17);
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 3000L, TimeUnit.MILLISECONDS);
return;
}
計(jì)算消息拉取的點(diǎn)位,如果產(chǎn)生異常,那么簡隔3秒后再來重新開始任務(wù);
- 拉消息
PullResult pullResult = DefaultLitePullConsumerImpl.this.pull(this.messageQueue, subscriptionData, offset, DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize());
這個(gè)就是發(fā)請求給Broker拉消息;
- 放進(jìn)消息緩存區(qū)
switch(pullResult.getPullStatus()) {
case FOUND:
Object objLock = DefaultLitePullConsumerImpl.this.messageQueueLock.fetchLockObject(this.messageQueue);
synchronized(objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && DefaultLitePullConsumerImpl.this.assignedMessageQueue.getSeekOffset(this.messageQueue) == -1L) {
processQueue.putMessage(pullResult.getMsgFoundList());
DefaultLitePullConsumerImpl.this.submitConsumeRequest(DefaultLitePullConsumerImpl.this.new ConsumeRequest(pullResult.getMsgFoundList(), this.messageQueue, processQueue));
}
break;
}
找到消息的情況下,將調(diào)用submitConsumeRequest()方法把消息放進(jìn)阻塞隊(duì)列中,等待poll()方法來消費(fèi);
- 重新開啟拉取任務(wù)
if (!this.isCancelled()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
DefaultLitePullConsumerImpl.this.log.warn("The Pull Task is cancelled after doPullTask, {}", this.messageQueue);
}
如果當(dāng)前任務(wù)還沒有被取消的話,那么重新開啟下一個(gè)輪回,準(zhǔn)備下一次消息拉??;
以上就是RocketMQ消息拉取過程詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息拉取過程的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
RequestContextHolder.getRequestAttributes()空指針問題及解決
這篇文章主要介紹了RequestContextHolder.getRequestAttributes()空指針問題及解決,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
spring boot2結(jié)合mybatis增刪改查的實(shí)現(xiàn)
這篇文章主要給大家介紹了關(guān)于spring boot2結(jié)合mybatis增刪改查的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用spring boot2具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
java正則匹配讀取txt文件提取特定開頭和結(jié)尾的字符串
通常我們可以直接通過文件流來讀取txt文件的內(nèi)容,但有時(shí)候也會(huì)遇到問題,下面這篇文章主要給大家介紹了關(guān)于java正則匹配讀取txt文件提取特定開頭和結(jié)尾的字符串的相關(guān)資料,需要的朋友可以參考下2022-11-11
Spring中的事務(wù)控制知識(shí)總結(jié)
我們講了轉(zhuǎn)賬方法存在著事務(wù)問題,當(dāng)在業(yè)務(wù)層方法更新轉(zhuǎn)入賬戶時(shí)發(fā)現(xiàn)異常,更新收款方賬戶則會(huì)出錯(cuò).當(dāng)時(shí)是通過自定義事務(wù)管理器進(jìn)行整體事務(wù)的處理.其實(shí)Spring 提供了業(yè)務(wù)層的事務(wù)處理解決方案,并且 Spring 的事務(wù)控制都是基于 AOP 的,需要的朋友可以參考下2021-06-06
IntelliJ IDEA中新建Java class的解決方案
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中新建Java class的解決方案,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-10-10
Java實(shí)現(xiàn)漢字轉(zhuǎn)全拼音的方法總結(jié)
在軟件開發(fā)中,經(jīng)常會(huì)遇到需要將漢字轉(zhuǎn)換成拼音的場景,比如在搜索引擎優(yōu)化、數(shù)據(jù)存儲(chǔ)、國際化等方面,Java作為一種廣泛使用的編程語言,提供了多種方法來實(shí)現(xiàn)漢字到拼音的轉(zhuǎn)換,本文將詳細(xì)介紹幾種常用的Java漢字轉(zhuǎn)全拼音的方法,并提供具體的代碼示例和步驟2024-12-12
Spring?Data?JPA命名約定查詢實(shí)現(xiàn)方法
這篇文章主要為大家介紹了Spring?Data?JPA命名約定查詢實(shí)現(xiàn)方法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12

