RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息解決
前言
目前有兩套RocketMQ集群,集群A包含topic名稱為cluster_A_topic,集群B包含topic名稱為cluster_B_topic,在應(yīng)用服務(wù)OrderApp上通過RocketMQ Client創(chuàng)建兩個DefaultMQProducer實例發(fā)送消息給集群A和集群B
架構(gòu)圖如下:

根據(jù)上述架構(gòu)圖,我們給出的示例代碼如下:
// 創(chuàng)建第一個DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
// 設(shè)置nameServer地址
producer1.setNamesrvAddr("192.168.2.230:9876");
try {
producer1.start();
// 發(fā)送消息
SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result1.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_A_topic 發(fā)送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_A_topic 持久化失敗!");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_A_topic 同步slave失敗!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_A_topic 副本不可用!");
}
} catch (Exception e) {
e.printStackTrace();
}
// 創(chuàng)建第二個DefaultMQProducer
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
// 設(shè)置nameServer地址
producer2.setNamesrvAddr("192.168.2.231:9876");
try {
producer2.start();
// 發(fā)送消息
SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result2.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_B_topic 發(fā)送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_B_topic 持久化失??!");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_B_topic 同步slave失??!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_B_topic 副本不可用!");
}
return "ok";
} catch (Exception e) {
e.printStackTrace();
} finally {
producer1.shutdown();
producer2.shutdown();
}
結(jié)果竟然報錯了,報錯內(nèi)容時cluster_B_topic不存在:

經(jīng)過不斷的測試,發(fā)現(xiàn)只有放在最前面啟動的DefaultMQProducer會生效,后面啟動的DefaultMQProducer發(fā)送消息就報錯說對應(yīng)的topic不存在,而且報錯的broker竟然是前面啟動的DefaultMQProducer對應(yīng)的broker。這就不科學了,難道RocketMQ不允許在一個應(yīng)用上創(chuàng)建多個生產(chǎn)者?
問題定位
首先說明一下,當前使用的RocketMQ Client版本是4.8.0。為了確定是哪兒出了問題,不得不對源碼來一波探索[哭泣臉??]。
我們都知道生產(chǎn)者是發(fā)送消息給Broker的,獲取Broker信息是通過連接NameServer獲取的。既然報錯的Broker和目標Broker竟然不對應(yīng),肯定是后面啟動的生產(chǎn)者獲取的Broker不對。有了最基本的判斷,我們先從DefaultMQProducer#start()入手,最終我們定位到這樣一段代碼DefaultMQProducerImpl#start(final boolean startFactory):
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
// 如果生產(chǎn)者group名稱不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 創(chuàng)建MQClientInstance實例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注冊生產(chǎn)者實例到MQClientInstance中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 添加TBW102對應(yīng)的topic信息,broker設(shè)置autoCreateTopicEnable = true才起作用
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
// 啟動剛剛創(chuàng)建的MQClientInstance實例
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 修改服務(wù)狀態(tài)為RUNNING
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
上面的代碼主要是創(chuàng)建了MQClientInstance實例,并且通過start()方法啟動。
通過針對這兩段代碼的debug,我們發(fā)現(xiàn)創(chuàng)建的兩個DefaultMQProducer對象是共用了一個MQClientInstance實例,并且所有針對NameServer和Broker的遠程操作全部是通過MQClientInstance實例來做的。比如發(fā)送消息的時候需要找到對應(yīng)的Broker下的消息隊列:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 從NameServer更新topic路由
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
最終我們發(fā)現(xiàn)兩個DefaultMQProducer對象都是去同一個NameServer下獲取對應(yīng)的topic信息,這下問題就定位到了:因為使用了同一個MQClientInstance實例導(dǎo)致不同的DefaultMQProducer去訪問了同一個NameServer,同一個集群需要同時接收兩個topic的消息,也就出現(xiàn)了前面的報錯說topic不存在的情況。
如何解決
我們來看看MQClientInstance實例是如何保證唯一性的:
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// 生成clientID
String clientId = clientConfig.buildMQClientId();
// 從緩存中獲取MQClientInstance
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 沒有緩存的話就創(chuàng)建一個MQClientInstance
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
// 新創(chuàng)建出來的再放進緩存
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
// 返回MQClientInstance實例
return instance;
}
我們之所以拿到的MQClientInstance實例是同一個,是因為在同一個服務(wù)下創(chuàng)建的clientId相同:
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
兩個clientId都是192.168.18.173@14933,為了防止clientId相同,我們可以在創(chuàng)建DefaultMQProducer實例是加上unitName值,保證兩個unitName值不同來避免共享同一個MQClientInstance。
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
producer1.setNamesrvAddr("192.168.2.230:9876");
producer1.setUnitName("producer1");
producer1.start();
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
producer2.setNamesrvAddr("192.168.2.231:9876");
producer2.setUnitName("producer2");
producer2.start();
通過上述代碼修改后,兩個消息都發(fā)送成功了。
另一個辦法就是升級RocketMQ Client到4.9.0,我們來看一下RocketMQ Client 4.9.0是怎么解決這個問題的:
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}
RocketMQ Client 4.9.0在后面補充了一個納秒值,之前的代碼是這樣的:
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
也就是說,在新的版本中,一個應(yīng)用服務(wù)內(nèi)創(chuàng)建多個DefaultMQProducer就會有多個MQClientInstance實例對應(yīng),不會再出現(xiàn)我們前面的報錯。
以上就是RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息解決的詳細內(nèi)容,更多關(guān)于RocketMQ發(fā)送NameServer的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java Red5服務(wù)器實現(xiàn)流媒體視頻播放
這篇文章主要介紹了Java Red5服務(wù)器實現(xiàn)流媒體視頻播放,對視頻播放感興趣的同學,可以參考下2021-04-04
Java中的CompletionService批量異步執(zhí)行詳解
這篇文章主要介紹了Java中的CompletionService批量異步執(zhí)行詳解,我們知道線程池可以執(zhí)行異步任務(wù),同時可以通過返回值Future獲取返回值,所以異步任務(wù)大多數(shù)采用ThreadPoolExecutor+Future,需要的朋友可以參考下2023-12-12

