RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例
RocketMQ發(fā)送消息
我們在使用RocketMQ發(fā)送消息時,一般都會使用DefaultMQProducer,類型的代碼如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("42.192.50.8:9876");
try {
producer.start();
producer.send(new Message("topic", "ping".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
上述代碼中,在消息發(fā)送之前調(diào)用了start()方法,如果不調(diào)用start()方法,直接發(fā)送消息,那么會出現(xiàn)以下報錯:

報錯消息里面很明顯地告知我們,目前這個DefaultMQProducer狀態(tài)沒有準備好,還不能發(fā)送消息。為了一探究竟,我們得去看看start()里面究竟做了什么操作呢?
start()里面究竟做了什么操作
我們根據(jù)源碼一路走下來,可以追蹤到DefaultMQProducerImpl.start(final boolean startFactory)這個方法:
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 創(chuàng)建MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注冊Producer到MQClientInstance中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 啟動MQClientInstance實例
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
上述代碼主要做了以下幾點:
1.創(chuàng)建MQClientInstance實例;
2.注冊Producer到MQClientInstance實例中;
3.啟動MQClientInstance實例;
MQClientInstance實例并不是每次都會創(chuàng)建的,它創(chuàng)建出來也會緩存的MQClientManager中,不過根據(jù)源碼來看的話,每次創(chuàng)建Producer都會對應創(chuàng)建一個新的MQClientInstance實例,所以一般情況下不建議一個應用服務中重復創(chuàng)建Producer;
最終start()方法的關鍵實現(xiàn)邏輯還是需要進入MQClientInstance.start()中:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 如果namesrv地址為null,那么就需要自己找namesrv地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 開啟一個請求響應渠道,沒猜錯的話,應該是netty實現(xiàn)的
this.mQClientAPIImpl.start();
// 開啟定時任務
this.startScheduledTask();
// 開啟拉消息服務
this.pullMessageService.start();
// 開啟負載均衡服務
this.rebalanceService.start();
// 再開啟一個默認生產(chǎn)者,這個生產(chǎn)者不需要啟動MQClientInstance實例
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
看樣子,這才是start()方法真正要做的事情:
1.找namesrv地址,應該是后面需要使用namesrv地址查詢對應的broker;
2.開啟Netty客戶端的初始化,包括與namesrv建立信道;另外開啟兩個定時任務,一個清除列表中過期的請求,第二個就是篩選可用的namesrv服務;
3.開啟一些定時任務;包括如果沒有設置namesrv地址的話,會從指定站點拉namesrv地址;清除下線broker并發(fā)送心跳給所有的broker等工作;
4.因為當前是生產(chǎn)者,所以pullMessageService很快就結束;
5.生產(chǎn)者不需要做負載均衡,所以rebalanceService很快也結束;
6.給默認創(chuàng)建的生產(chǎn)者執(zhí)行一下start()方法,其實啥也沒做;
上述大多數(shù)任務都是給消費者使用的,作為生產(chǎn)者,唯一起作用的就是前三步,查找namesrv地址、第二步與namesrv建立通信以及第三步對broker的一些定時清理工作;不過沒有發(fā)生消息之前,是不會從遠程獲取任何數(shù)據(jù)的。所以綜上所述,start()方法里面只做了以下兩件事情:
1.與namesrv建立通信渠道,它甚至都沒有從namesrv獲取任何數(shù)據(jù);
2.啟動一些定時任務,包括清理下線的broker;
小結
雖然在生產(chǎn)者中,start()方法里面真正做的事情比較少,但是卻是非常有必要的。發(fā)送消息之前,我們沒有使用start()方法,導致消息發(fā)送失敗,是因為生產(chǎn)者與namesrv之間的通信渠道沒有建立。
以上就是RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例的詳細內(nèi)容,更多關于RocketMQ調(diào)用start發(fā)送消息的資料請關注腳本之家其它相關文章!
相關文章
Intellj?idea新建的java源文件夾不是藍色的圖文解決辦法
idea打開java項目后新建的模塊中,java文件夾需要變成藍色,這篇文章主要給大家介紹了關于Intellj?idea新建的java源文件夾不是藍色的相關資料,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2024-02-02
深入探究 spring-boot-starter-parent的作用
這篇文章主要介紹了spring-boot-starter-parent的作用詳解,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,感興趣的小伙伴可以跟著小編一起來學習一下2023-05-05

