RocketMQ消息生產(chǎn)者是如何選擇Broker示例詳解
前言
在RocketMQ中為,我們創(chuàng)建消息生產(chǎn)者時(shí),只需要設(shè)置NameServer地址,消息就能正確地發(fā)送到對(duì)應(yīng)的Broker中,那么RocketMQ消息生產(chǎn)者是如何找到Broker的呢?如果有多個(gè)Broker實(shí)例,那么消息發(fā)送是如何選擇發(fā)送到哪個(gè)Broker的呢?
從NameServer查詢(xún)Topic信息
通過(guò)Debug消息發(fā)送send()方法,我們最終可以定位到DefaultMQProducerImpl.sendDefaultImpl()這個(gè)方法,并且我們找到了最關(guān)鍵的Topic信息:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
這個(gè)方法就是通過(guò)topic從NameServer拉出對(duì)應(yīng)的Broker信息:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
1.一開(kāi)始的話,是從當(dāng)前緩存中找Topic信息,第一次肯定是找不到的;
2.找不到Topic信息,那么就調(diào)用updateTopicRouteInfoFromNameServer(topic)從NameServer拉對(duì)應(yīng)的信息,如果拉到了就更新到緩存中;
3.如果依然找不到Topic信息,說(shuō)明沒(méi)有任何Broker上面是有這個(gè)Topic的;但是我們還要拉開(kāi)啟了自動(dòng)創(chuàng)建Topic配置的Broker信息,通過(guò)updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)實(shí)現(xiàn);
生產(chǎn)者客戶(hù)端會(huì)從兩個(gè)地方獲取Broker信息,第一個(gè)就是從內(nèi)存緩存中獲取,第二個(gè)就是從NameServer中獲取。從NameServer中分兩次獲取,一次是獲取存在的Topic對(duì)應(yīng)的Broker信息,第二次是獲取還沒(méi)有創(chuàng)建出來(lái)的Topic對(duì)應(yīng)的Broker信息;
如何選擇Broker
當(dāng)客戶(hù)端拿到了Topic對(duì)應(yīng)的Broker信息后,它是如何選擇目標(biāo)Broker的呢?繼續(xù)向下看,我們找到了關(guān)鍵代碼:
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
......
1.如果是同步發(fā)送消息,那么【總的發(fā)送次數(shù)】=1+【重試次數(shù)】,如果是異步發(fā)送,默認(rèn)是1;我們當(dāng)前是同步模式,所以會(huì)存在重試;
2.選擇Broker的關(guān)鍵代碼就在selectOneMessageQueue()方法中,通過(guò)前面拿到的topicPublishInfo作為參數(shù),lastBrokerName作為額外的考慮參數(shù);
追蹤代碼,我們進(jìn)入MQFaultStrategy.selectOneMessageQueue()中:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
1.如果開(kāi)啟了延遲故障規(guī)避,那么執(zhí)行規(guī)避策略;
- 1.1:輪詢(xún)找一個(gè)
Broker,該Broker要么不在規(guī)避名單內(nèi),要么已經(jīng)度過(guò)了規(guī)避期(發(fā)送消息失敗會(huì)將目標(biāo)Broker放進(jìn)規(guī)避名單,沉默一段時(shí)間); - 1.2:如果所有的
Broker都沒(méi)有度過(guò)規(guī)避期,那么從比較好的那一部分Broker里面找一個(gè)出來(lái); - 1.3:如果依然沒(méi)有找到合適的
Broker,那么就隨機(jī)選一個(gè)Broker;
2.否則就隨機(jī)選一個(gè)Broker;
下面我們來(lái)看一下隨機(jī)發(fā)送的策略是怎么實(shí)現(xiàn)的:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
1.如果第一次發(fā)送消息,那么通過(guò)自增求余的方式從列表中找一個(gè)Broker,其實(shí)就是輪詢(xún)方式;
2.如果不是第一次發(fā)送消息,那么會(huì)盡可能避開(kāi)上一次的Broker服務(wù),也是為了讓Broker服務(wù)負(fù)載均衡;
3.如果沒(méi)有避開(kāi)上一次的Broker,那么再向后找另一個(gè)Broker;除非只有一個(gè)Broker服務(wù),否則會(huì)盡可能避開(kāi)上次發(fā)送的Broker;
小結(jié)
通過(guò)源碼分析,我們已經(jīng)知道了生產(chǎn)者是如何選擇目標(biāo)Broker的了:
1.第一次發(fā)消息,通過(guò)輪詢(xún)的方式選擇Broker;
2.后續(xù)發(fā)消息會(huì)規(guī)避上次的Broker,同樣采用輪詢(xún)的方式選擇Broker;
3.在消息發(fā)送過(guò)程中,存在一個(gè)Broker規(guī)避列表,用戶(hù)可以通過(guò)setSendLatencyFaultEnable(true)開(kāi)啟故障規(guī)避策略,客戶(hù)端會(huì)盡可能選擇不在規(guī)避列表中的Broker,如果所有的Broker都在規(guī)避列表中,那么會(huì)選擇一個(gè)相對(duì)比較好的Broker來(lái)用;
以上就是RocketMQ消息生產(chǎn)者是如何選擇Broker示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息生產(chǎn)者Broker的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot themaleaf 第一次進(jìn)頁(yè)面不加載css的問(wèn)題
這篇文章主要介紹了springboot themaleaf 第一次進(jìn)頁(yè)面不加載css的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10
Mybatis的collection三層嵌套查詢(xún)方式(驗(yàn)證通過(guò))
這篇文章主要介紹了Mybatis的collection三層嵌套查詢(xún)方式(驗(yàn)證通過(guò)),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
mybatis多數(shù)據(jù)源動(dòng)態(tài)切換的完整步驟
這篇文章主要給大家介紹了關(guān)于mybatis多數(shù)據(jù)源動(dòng)態(tài)切換的完整步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
SpringBoot中實(shí)現(xiàn)訂單30分鐘自動(dòng)取消的三種方案分享
在電商和其他涉及到在線支付的應(yīng)用中,通常需要實(shí)現(xiàn)一個(gè)功能:如果用戶(hù)在生成訂單后的一定時(shí)間內(nèi)未完成支付,系統(tǒng)將自動(dòng)取消該訂單,本文將詳細(xì)介紹基于Spring Boot框架實(shí)現(xiàn)訂單30分鐘內(nèi)未支付自動(dòng)取消的幾種方案,并提供實(shí)例代碼,需要的朋友可以參考下2023-10-10
JavaWeb倉(cāng)庫(kù)管理系統(tǒng)詳解
這篇文章主要為大家詳細(xì)介紹了JavaWeb倉(cāng)庫(kù)管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09
基于RecyclerChart的KLine的繪制Scale詳解
這篇文章主要為大家詳細(xì)介紹了基于RecyclerChart實(shí)現(xiàn)KLine繪制Scale的相關(guān)資料,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-03-03
AsyncHttpClient?ClientStats源碼流程解讀
這篇文章主要為大家介紹了AsyncHttpClient?ClientStats源碼流程解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12

