java rocketmq--消息的產(chǎn)生(普通消息)
前言
與消息發(fā)送緊密相關(guān)的幾行代碼:
1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
2. producer.start();
3. Message msg = new Message(...)
4. SendResult sendResult = producer.send(msg);
5. producer.shutdown();
那這幾行代碼執(zhí)行時,背后都做了什么?
一. 首先是DefaultMQProducer.start
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
調(diào)用了默認(rèn)生成消息的實(shí)現(xiàn)類 -- DefaultMQProducerImpl
調(diào)用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()會初始化得到MQClientInstance實(shí)例對象,MQClientInstance實(shí)例對象調(diào)用它自己的start方法會 ,啟動一些服務(wù),如拉去消息服務(wù)PullMessageService.Start()、啟動負(fù)載平衡服務(wù)RebalanceService.Start(),比如網(wǎng)絡(luò)通信服務(wù)MQClientAPIImpl.Start()
另外,還會執(zhí)行與生產(chǎn)消息相關(guān)的信息,如注冊produceGroup、new一個TopicPublishInfo對象并以默認(rèn)TopicKey為鍵值,構(gòu)成鍵值對存入DefaultMQProducerImpl的topicPublishInfoTable中。
efaultMQProducerImpl.start()后,獲取的MQClientInstance實(shí)例對象會調(diào)用sendHeartbeatToAllBroker()方法,不斷向broker發(fā)送心跳包,yin'b可以使用下面一幅圖大致描述DefaultMQProducerImpl.start()過程:

上圖中的三個部分中涉及的內(nèi)容:
1.1 初始化MQClientInstance
一個客戶端只能產(chǎn)生一個MQClientInstance實(shí)例對象,產(chǎn)生方式使用了工廠模式與單例模式。MQClientInstance.start()方法啟動一些服務(wù),源碼如下:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
1.2 注冊producer
該過程會將這個當(dāng)前producer對象注冊到MQClientInstance實(shí)例對象的的producerTable中。一個jvm(一個客戶端)中一個producerGroup只能有一個實(shí)例,MQClientInstance操作producerTable大概有如下幾個方法:
- -- selectProducer
- -- updateTopicRouteInfoFromNameServer
- -- prepareHeartbeatData
- -- isNeedUpdateTopicRouteInfo
- -- shutdown
注:
根據(jù)不同的clientId,MQClientManager將給出不同的MQClientInstance;
根據(jù)不同的group,MQClientInstance將給出不同的MQProducer和MQConsumer
1.3 向路由信息表中添加路由
topicPublishInfoTable定義:
public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
它是一個以topic為key的Map型數(shù)據(jù)結(jié)構(gòu),DefaultMQProducerImpl.start()時會默認(rèn)創(chuàng)建一個key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。
1.4 發(fā)送心跳包
MQClientInstance向broker發(fā)送心跳包時,調(diào)用sendHeartbeatToAllBroker( ),以及從MQClientInstance實(shí)例對象的brokerAddrTable中拿到所有broker地址,向這些broker發(fā)送心跳包。
sendHeartbeatToAllBroker會涉及到prepareHeartbeatData()方法,該方法會生成heartbeatData數(shù)據(jù),發(fā)送心跳包時,heartbeatData作為心跳包的body。與producer相關(guān)的部分代碼如下:
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
二、. SendResult sendResult = producer.send(msg)
首先會調(diào)用DefaultMQProducer.send(msg) ,繼而調(diào)用sendDefaultImpl:
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
sendDefaultImpl做了啥?
2.1. 獲取topicPublishInfo
根據(jù)msg的topic從topicPublishInfoTable獲取對應(yīng)的topicPublishInfo,如果沒有則更新路由信息,從nameserver端拉取最新路由信息。從nameserver端拉取最新路由信息大致為:
首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 選擇消息發(fā)送的隊(duì)列
普通消息:默認(rèn)方式下,selectOneMessageQueue從topicPublishInfo中的messageQueueList中選擇一個隊(duì)列(MessageQueue)進(jìn)行發(fā)送消息,默認(rèn)采用長輪詢的方式選擇隊(duì)列 。
它的機(jī)制如下:正常情況下,順序選擇queue進(jìn)行發(fā)送;如果某一個節(jié)點(diǎn)發(fā)生了超時,則下次選擇queue時,跳過相同的broker。不同的隊(duì)列選擇策略形成了生產(chǎn)消息的幾種模式,如順序消息,事務(wù)消息。
順序消息:將一組需要有序消費(fèi)的消息發(fā)往同一個broker的同一個隊(duì)列上即可實(shí)現(xiàn)順序消息,假設(shè)相同訂單號的支付,退款需要放到同一個隊(duì)列,那么就可以在send的時候,自己實(shí)現(xiàn)MessageQueueSelector,根據(jù)參數(shù)arg字段來選擇queue。
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}
事務(wù)消息:只有在消息發(fā)送成功,并且本地操作執(zhí)行成功時,才發(fā)送提交事務(wù)消息,做事務(wù)提交,消息發(fā)送失敗,直接發(fā)送回滾消息,進(jìn)行回滾,具體如何實(shí)現(xiàn)后面會單獨(dú)成文分析。
2.3 封裝消息體通信包,發(fā)送數(shù)據(jù)包
首先,根據(jù)獲取的MessageQueue中的getBrokerName,調(diào)用findBrokerAddressInPublish得到該消息存放對應(yīng)的broker地址,如果沒有找到則跟新路由信息,重新獲取地址 :
brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)
可知獲取的broker均為master(id=0)
然后, 將與該消息相關(guān)信息打包成RemotingCommand數(shù)據(jù)包,其RequestCode.SEND_MESSAGE
根據(jù)獲取的broke地址,將數(shù)據(jù)包到對應(yīng)的broker,默認(rèn)是發(fā)送超時時間為3s。
封裝消息請求包的包頭:
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch);
發(fā)送消息包(普通消息默認(rèn)為同步方式):
SendResult sendResult = null;
switch (communicationMode) {
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
處理來自broker端的響應(yīng)數(shù)據(jù)包:
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
broker端處理request數(shù)據(jù)包后會將消息存儲到commitLog,具體過程后續(xù)分析。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
后端如何接收格式為x-www-form-urlencoded的數(shù)據(jù)
x-www-form-urlencoded格式是一種常見的HTTP請求數(shù)據(jù)格式,它將請求參數(shù)編碼為鍵值對的形式,以便于傳輸和解析,下面這篇文章主要給大家介紹了關(guān)于后端如何接收格式為x-www-form-urlencoded的數(shù)據(jù),需要的朋友可以參考下2023-05-05
Eclipse引用XSD實(shí)現(xiàn)XML配置文件提示標(biāo)簽的方法
今天小編就為大家分享一篇關(guān)于Eclipse引用XSD實(shí)現(xiàn)XML配置文件提示標(biāo)簽的方法,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03
EventBus與Spring Event區(qū)別詳解(EventBus 事件機(jī)制,Spring Event事件機(jī)制)
這篇文章主要介紹了EventBus與Spring Event區(qū)別,需要的朋友可以參考下2020-02-02
java反射機(jī)制及beanUtils的實(shí)現(xiàn)原理分析
本文介紹了Java的反射機(jī)制、VO、DTO、PO的概念以及BeanUtils的實(shí)現(xiàn)原理和簡單示例,通過反射可以在運(yùn)行時動態(tài)操作類、方法和字段,BeanUtils用于在不同bean之間進(jìn)行屬性復(fù)制2024-12-12
Spring?Boot實(shí)現(xiàn)分布式任務(wù)調(diào)度的步驟
Spring?Boot提供了一些工具和框架,可以幫助我們輕松地實(shí)現(xiàn)分布式任務(wù)調(diào)度,在本文中我們將介紹如何使用Spring?Boot、Spring?Cloud、Quartz和Redis來實(shí)現(xiàn)分布式任務(wù)調(diào)度,感興趣的朋友跟隨小編一起看看吧2023-06-06

