rocketmq消費(fèi)負(fù)載均衡--push消費(fèi)詳解
前言
本文介紹了DefaultMQPushConsumerImpl消費(fèi)者,客戶端負(fù)載均衡相關(guān)知識點(diǎn)。本文從DefaultMQPushConsumerImpl啟動(dòng)過程到實(shí)現(xiàn)負(fù)載均衡,從源代碼一步一步分析,共分為6個(gè)部分進(jìn)行介紹,其中第6個(gè)部分 rebalanceByTopic 為負(fù)載均衡的核心邏輯模塊,具體過程運(yùn)用了圖文進(jìn)行闡述。
介紹之前首先拋出幾個(gè)問題:
1. 要做負(fù)載均衡,首先要解決的一個(gè)問題是什么?
2. 負(fù)載均衡是Client端處理還是Broker端處理?
個(gè)人理解:
1. 要做負(fù)載均衡,首先要做的就是信號收集。
所謂信號收集,就是得知道每一個(gè)consumerGroup有哪些consumer,對應(yīng)的topic是誰。信號收集分為Client端信號收集與Broker端信號收集兩個(gè)部分。
2. 負(fù)載均衡放在Client端處理。
具體做法是:消費(fèi)者客戶端在啟動(dòng)時(shí)完善rebalanceImpl實(shí)例,同時(shí)拷貝訂閱信息存放rebalanceImpl實(shí)例對象中,另外也是很重要的一個(gè)步驟 -- 通過心跳消息,不停的上報(bào)自己到所有Broker,注冊RegisterConsumer,等待上述過程準(zhǔn)備好之后在Client端不斷執(zhí)行的負(fù)載均衡服務(wù)線程從Broker端獲取一份全局信息(該consumerGroup下所有的消費(fèi)Client),然后分配這些全局信息,獲取當(dāng)前客戶端分配到的消費(fèi)隊(duì)列。
本文具體的內(nèi)容:
I. copySubscription
Client端信號收集,拷貝訂閱信息。
在DefaultMQPushConsumerImpl.start()時(shí),會將消費(fèi)者的topic訂閱關(guān)系設(shè)置到rebalanceImpl的SubscriptionInner的map中用于負(fù)載:
private void copySubscription() throws MQClientException {
try {
//注:一個(gè)consumer對象可以訂閱多個(gè)topic
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
FilterAPI.buildSubscriptionData接口將訂閱關(guān)系轉(zhuǎn)換為SubscriptionData 數(shù)據(jù),其中subString包含訂閱tag等信息。另外,如果該消費(fèi)者的消費(fèi)模式為集群消費(fèi),則會將retry的topic一并放到。
II. 完善rebalanceImpl實(shí)例
Client繼續(xù)收集信息:
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer .getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
本文以DefaultMQPushConsumerImpl為例,因此this對象類型為DefaultMQPushConsumerImp。
III. this.rebalanceService.start()
開啟負(fù)載均衡服務(wù)。this.rebalanceService是一個(gè)RebalanceService實(shí)例對象,它繼承與ServiceThread,是一個(gè)線程類。 this.rebalanceService.start()執(zhí)行時(shí),也即執(zhí)行RebalanceService線程體:
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
this.waitForRunning(WaitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
IV. this.mqClientFactory.doRebalance
客戶端遍歷消費(fèi)組table,對該客戶端上所有消費(fèi)者獨(dú)立進(jìn)行負(fù)載均衡,分發(fā)消費(fèi)隊(duì)列:
public void doRebalance() {
for (String group : this.consumerTable.keySet()) {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null) {
try {
impl.doRebalance();
} catch (Exception e) {
log.error("doRebalance exception", e);
}
}
}
}
V. MQConsumerInner.doRebalance
由于本文以DefaultMQPushConsumerImpl消費(fèi)過程為例,即DefaultMQPushConsumerImpl.doRebalance:
@Override
public void doRebalance() {
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance();
}
}
步驟II 中完善了rebalanceImpl實(shí)例,為調(diào)用rebalanceImpl.doRebalance()提供了初始數(shù)據(jù)。
rebalanceImpl.doRebalance()過程如下:
public void doRebalance() {
// 前文copySubscription中初始化了SubscriptionInner
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
VI. rebalanceByTopic -- 核心步驟之一
rebalanceByTopic方法中根據(jù)消費(fèi)者的消費(fèi)類型為BROADCASTING或CLUSTERING做不同的邏輯處理。CLUSTERING邏輯包括BROADCASTING邏輯,本部分只介紹集群消費(fèi)負(fù)載均衡的邏輯。
集群消費(fèi)負(fù)載均衡邏輯主要代碼如下(省略了log等代碼):
//1.從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊(duì)列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//2. 從broker端獲取消費(fèi)該消費(fèi)組的所有客戶端clientId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
f (null == mqSet) { ... }
if (null == cidAll) { ... }
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
// 3.創(chuàng)建DefaultMQPushConsumer對象時(shí)默認(rèn)設(shè)置為AllocateMessageQueueAveragely
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 4.調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費(fèi)隊(duì)列
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
// 5. 將分配得到的allocateResult 中的隊(duì)列放入allocateResultSet 集合
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
、
//6. 更新updateProcessQueue
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
注:BROADCASTING邏輯只包含上述的1、6。
集群消費(fèi)負(fù)載均衡邏輯中的1、2、4這三個(gè)點(diǎn)相關(guān)知識為其核心過程,各個(gè)點(diǎn)相關(guān)知識如下:
第1點(diǎn):從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊(duì)列

第2點(diǎn): 從broker端獲取消費(fèi)該消費(fèi)組的所有客戶端clientId
首先,消費(fèi)者對象不斷地向所有broker發(fā)送心跳包,上報(bào)自己,注冊并更新訂閱關(guān)系以及客戶端ChannelInfoTable;之后,客戶端在做消費(fèi)負(fù)載均衡時(shí)獲取那些消費(fèi)客戶端,對這些客戶端進(jìn)行負(fù)載均衡,分發(fā)消費(fèi)的隊(duì)列。具體過程如下圖所示:

第4點(diǎn):調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費(fèi)隊(duì)列

注:上圖中cId1、cId2、...、cIdN通過 getConsumerIdListByGroup 獲取,它們在這個(gè)ConsumerGroup下所有在線客戶端列表中。
當(dāng)前消費(fèi)對進(jìn)行負(fù)載均衡策略后獲取對應(yīng)的消息消費(fèi)隊(duì)列。具體的算法很簡單,可以看源碼。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java實(shí)現(xiàn)全局監(jiān)聽鍵盤詳解
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)全局監(jiān)聽鍵盤的相關(guān)知識,文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解下2024-01-01
SpringBoot集成Shiro進(jìn)行權(quán)限控制和管理的示例
這篇文章主要介紹了SpringBoot集成Shiro進(jìn)行權(quán)限控制和管理的示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-03-03
Java開發(fā)環(huán)境配置及Vscode搭建過程
今天通過圖文并茂的形式給大家介紹Java開發(fā)環(huán)境配置及Vscode搭建過程,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-07-07
使用springboot對外部靜態(tài)資源文件的處理操作
這篇文章主要介紹了使用springboot對外部靜態(tài)資源文件的處理操作,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
Java?easyExcel的復(fù)雜表頭多級表頭導(dǎo)入
最近在項(xiàng)目開發(fā)中遇到的一個(gè)excel復(fù)雜表頭的導(dǎo)入數(shù)據(jù)庫操作,下面這篇文章主要給大家介紹了關(guān)于Java?easyExcel的復(fù)雜表頭多級表頭導(dǎo)入的相關(guān)資料,需要的朋友可以參考下2022-06-06
Java實(shí)現(xiàn)的微信公眾號獲取微信用戶信息示例
這篇文章主要介紹了Java實(shí)現(xiàn)的微信公眾號獲取微信用戶信息,結(jié)合實(shí)例形式分析了Java微信公眾號獲取微信用戶信息相關(guān)原理、步驟與操作注意事項(xiàng),需要的朋友可以參考下2019-10-10

