RocketMQ特性Broker存儲事務(wù)消息實現(xiàn)
引言
在Broker中,事務(wù)消息的初始化是通過BrokerController.initialTransaction()方法執(zhí)行的。
private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
這里有三個核心的初始化變量
TransactionalMessageService
事務(wù)消息主要處理服務(wù)。默認實現(xiàn)類是TransactionalMessageServiceImpl也可以自己定義事務(wù)消息處理實現(xiàn)類,通過ServiceProvider.loadClass()方法進行加載。
TransactionalMessageService類定義如下。內(nèi)部屬性已加注釋標(biāo)明。
public interface TransactionalMessageService {
//用于保存Half事務(wù)消息
PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
//刪除事務(wù)消息
boolean deletePrepareMessage(MessageExt messageExt);
//提交事務(wù)消息
OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
//回滾事務(wù)消息
OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
//打開事務(wù)消息
boolean open();
//關(guān)閉事務(wù)消息
void close();
}
transactionalMessageCheckListener
事務(wù)消息回查監(jiān)聽器
transactionalMessageCheckService
事務(wù)消息回查服務(wù),啟動一個線程定時檢查超時的Half消息是否需要回查。
處理事務(wù)消息
當(dāng)初始化完成之后,Broker就可以處理事務(wù)消息了。
Broker存儲事務(wù)消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,這和普通消息其實是一樣的。
但是有兩點針對事務(wù)消息的特殊處理:
第一處:
在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:
//獲取擴展字段的值,若是該值為true則為事務(wù)消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) {
//判斷當(dāng)前Broker配置是否支持事務(wù)消息
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
//保存Half信息
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
第二處:
存儲事務(wù)消息前的預(yù)處理,對應(yīng)方法是
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//將原消息的topic保存在擴展字段中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
//將原消息的QueueId保存在擴展字段中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
//將原消息的SysFlag保存在擴展字段中
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//修改topic的值為RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
//修改Queueid為0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
完成上述步驟之后,調(diào)用DefaultMessageStole.putMessage()方法將其保存到CommitLog中。
CommitLog存儲成功之后,通過org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法對其進行處理。
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the consume queue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
這里的邏輯是這樣的,當(dāng)讀到的消息類型為事務(wù)消息時,設(shè)置當(dāng)前消息的位點值為0,而不是設(shè)置真實的位點。這樣該位點就不會建立ConsumeQueue索引,也不會被消費。
以上就是RocketMQ特性Broker存儲事務(wù)消息實現(xiàn)的詳細內(nèi)容,更多關(guān)于RocketMQ Broker存儲事務(wù)消息的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解Spring Boot 部署jar和war的區(qū)別
本篇文章主要介紹了詳解Spring Boot 部署jar和war的區(qū)別,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-09-09
IDEA中實體類(POJO)與JSON快速互轉(zhuǎn)問題
這篇文章主要介紹了IDEA中實體類(POJO)與JSON快速互轉(zhuǎn),本文通過圖文實例代碼相結(jié)合給大家介紹的非常詳細,需要的朋友可以參考下2022-08-08
關(guān)于mybatis一對一查詢一對多查詢遇到的問題
這篇文章主要介紹了關(guān)于mybatis一對一查詢,一對多查詢遇到的錯誤,接下來是對文章進行操作,要求查詢?nèi)课恼?,并關(guān)聯(lián)查詢作者,文章標(biāo)簽,本文給大家介紹的非常詳細,需要的朋友可以參考下2022-05-05
解決后端傳long類型數(shù)據(jù)到前端精度丟失問題
這篇文章主要介紹了解決后端傳long類型數(shù)據(jù)到前端精度丟失問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
SpringBoot日志進階實戰(zhàn)之Logback配置經(jīng)驗和方法
本文給大家介紹在SpringBoot中使用Logback配置日志的經(jīng)驗和方法,并提供了詳細的代碼示例和解釋,包括:滾動文件、異步日志記錄、動態(tài)指定屬性、日志級別、配置文件等常用功能,覆蓋日常Logback配置開發(fā)90%的知識點,感興趣的朋友跟隨小編一起看看吧2023-06-06

