RocketMQ源碼解析topic創(chuàng)建機制詳解
1. RocketMQ Topic創(chuàng)建機制
以下源碼基于Rocket MQ 4.7.0
RocketMQ Topic創(chuàng)建機制分為兩種:一種自動創(chuàng)建,一種手動創(chuàng)建??梢酝ㄟ^設置broker的配置文件來禁用或者允許自動創(chuàng)建。默認是開啟的允許自動創(chuàng)建
autoCreateTopicEnable=true/false
下面會結合源碼來深度分析一下自動創(chuàng)建和手動創(chuàng)建的過程。
2. 自動Topic
默認情況下,topic不用手動創(chuàng)建,當producer進行消息發(fā)送時,會從nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么會默認拉取broker啟動時默認創(chuàng)建好名為“TBW102”的Topic,這定義在org.apache.rocketmq.common.MixAll類中
// Will be created at broker when isAutoCreateTopicEnable public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
自動創(chuàng)建開關是下BrokerConfig類中有一個私有變量:
@ImportantField private boolean autoCreateTopicEnable = true;
這變量可以通過配置文件配置來進行修改,代碼中的默認值為true,所以在默認的情況下Rocket MQ是會自動創(chuàng)建Topic的。
在Broker啟動,會調用TopicConfigManager的構造方法,在構造方法中定義了一系列RocketMQ系統(tǒng)內置的一些系統(tǒng)Topic(這里只關注一下TBW102):
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums()); //8
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums()); //8
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
這里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 這樣一段代碼,在開啟允許自動創(chuàng)建的時候,會把當前Topic的信息存入topicConfigTable變量中。
然后通過發(fā)送定期發(fā)送心跳包把Topic和Broker的信息發(fā)送到NameServer的RouteInfoManager中進行保存。在BrokerController中定義了這樣的一個定時任務來執(zhí)行這個心跳包的發(fā)送:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
這里就說明了如何把每個Broker的系統(tǒng)自定義的Topic注冊到NameServer。
接下來看在發(fā)送過程中如何從NameServer獲取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//省略代碼
//獲取路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
}
通過DefaultMQProducerImpl.tryToFindTopicPublishInfo方法獲取Topic的路由信息。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//第一次從緩存中獲取--肯定沒有因為還沒創(chuàng)建
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//從NameServer獲取--也是沒有,因為沒有創(chuàng)建
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//第二次從這里獲取
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
下面來看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
//省略代碼
if (isDefault && defaultMQProducer != null) {
//使用默認的TBW102 Topic獲取數據
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//這是正常的
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
//省略代碼
}
如果isDefault=true并且defaultMQProducer不為空,從nameserver中獲取默認路由信息,此時會獲取所有已開啟自動創(chuàng)建開關的broker的默認“TBW102”topic路由信息,并保存默認的topic消息隊列數量。
這里會比較一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默認值和TBW102中的值哪個更小。
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
}
判斷獲取默認的是否存在,如果存在把當前的Topic的信息更新。
也就是把TBW102 Topic的數據更新為自動創(chuàng)建的數據。
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
更新本地的緩存。這樣TBW102 Topic的負載和一些默認的路由信息就會被自己創(chuàng)建的Topic使用。這里就是整個自動創(chuàng)建的過程.
總結一下就是:通過使用系統(tǒng)內部的一個TBW102的Topic的配置來自動創(chuàng)建當前用戶的要創(chuàng)建的自定義Topic。
3. 手動創(chuàng)建--預先創(chuàng)建
手動創(chuàng)建也叫預先創(chuàng)建,就是在使用Topic之前就創(chuàng)建,可以通過命令行或者通過RocketMQ的管理界面創(chuàng)建Topic。
通過界面控制臺創(chuàng)建
項目地址: github.com/apache/rock…
TopicController主要負責Topic的管理
@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
@ResponseBody
public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
"clusterName or brokerName can not be all blank");
logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
topicService.createOrUpdate(topicCreateOrUpdateRequest);
return true;
}
然后通過MQAdminExtImpl.createAndUpdateTopicConfig方法來創(chuàng)建:
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
}
通過調用DefaultMQAdminExtImpl.createAndUpdateTopicConfig創(chuàng)建Topic
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
最后通過MQClientAPIImpl.createTopic創(chuàng)建Topic
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}以上就是RocketMQ源碼解析topic創(chuàng)建機制詳解的詳細內容,更多關于RocketMQ topic創(chuàng)建的資料請關注腳本之家其它相關文章!
相關文章
關于HttpServletRequest獲取POST請求Body參數的3種方式
這篇文章主要介紹了關于HttpServletRequest獲取POST請求Body參數的3種方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11
SpringCloud Finchley Gateway 緩存請求Body和Form表單的實現
在接入Spring-Cloud-Gateway時,可能有需求進行緩存Json-Body數據或者Form-Urlencoded數據的情況。這篇文章主要介紹了SpringCloud Finchley Gateway 緩存請求Body和Form表單的實現,感興趣的小伙伴們可以參考一下2019-01-01

