深入淺出RocketMQ的事務(wù)消息
事務(wù)消息發(fā)送流程

半消息實(shí)現(xiàn)了分布式環(huán)境下的數(shù)據(jù)一致性的處理,生產(chǎn)者發(fā)送事務(wù)消息的流程如上圖所示,通過(guò)對(duì)源碼的學(xué)習(xí),我們可以弄清楚下面幾點(diǎn),也是半消息機(jī)制的核心:
1.為什么prepare消息不會(huì)被Consumer消費(fèi)?
2.事務(wù)消息是如何提交和回滾的?
3.定時(shí)回查本地事務(wù)狀態(tài)的實(shí)現(xiàn)細(xì)節(jié)。
發(fā)送事務(wù)消息源碼分析
發(fā)送事務(wù)消息方法TransactionMQProducer.sendMessageInTransaction:
- msg:消息
- tranExecuter:本地事務(wù)執(zhí)行器
- arg:本地事務(wù)執(zhí)行器參數(shù)
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// 忽視消息延遲的屬性
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
// 發(fā)送半消息
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
// 處理發(fā)送半消息的結(jié)果
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
// 發(fā)送半消息成功,執(zhí)行本地事務(wù)邏輯
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
// 執(zhí)行本地事務(wù)邏輯
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
// 發(fā)送半消息失敗,標(biāo)記本地事務(wù)狀態(tài)為回滾
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
// 結(jié)束事務(wù),設(shè)置消息 COMMIT / ROLLBACK
try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// 返回事務(wù)發(fā)送結(jié)果
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
// 提取Prepared消息的uniqID
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}該方法的入?yún)幸粋€(gè)需要用戶實(shí)現(xiàn)本地事務(wù)的LocalTransactionExecuter executer,executer中會(huì)進(jìn)行事務(wù)操作以保證本地事務(wù)和消息發(fā)送這兩個(gè)操作的原子性。
由上面的源碼可知:
Producer會(huì)首先發(fā)送一個(gè)半消息到Broker中:
- 半消息發(fā)送成功,執(zhí)行事務(wù)
- 半消息發(fā)送失敗,不執(zhí)行事務(wù)
半消息發(fā)送到Broker后不會(huì)被Consumer消費(fèi)掉的原因有以下兩點(diǎn):
- Broker在將消息寫(xiě)入CommitLog時(shí)會(huì)判斷消息類型,如果是prepare或者rollback消息,ConsumeQueue的offset不變
- Broker在構(gòu)造ConsumeQueue時(shí)會(huì)判斷是否是處于prepare或者rollback狀態(tài)的消息,如果是則不會(huì)將該消息放入ConsumeQueue里,Consumer在拉取消息時(shí)也就不會(huì)拉取到這條消息
Producer會(huì)根據(jù)半消息的發(fā)送結(jié)果和本地任務(wù)執(zhí)行結(jié)果來(lái)決定如何處理事務(wù)(commit或rollback),方法最后調(diào)用了endTransaction來(lái)處理事務(wù)的執(zhí)行結(jié)果,源碼如下:
- sendResult:發(fā)送半消息的結(jié)果
- localTransactionState:本地事務(wù)狀態(tài)
- localException:執(zhí)行本地事務(wù)邏輯產(chǎn)生的異常
- RemotingException:遠(yuǎn)程調(diào)用異常
- MQBrokerException:Broker異常
- InterruptedException:當(dāng)線程中斷異常
- UnknownHostException:未知host異常
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
// 解碼消息id
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
// 創(chuàng)建請(qǐng)求
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 提交 commit / rollback 消息
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}該方法是將事務(wù)執(zhí)行的結(jié)果發(fā)送給Broker,再由Broker決定是否進(jìn)行消息投遞,執(zhí)行步驟如下:
1.收到消息后先檢查是否是事務(wù)消息,如果不是事務(wù)消息則直接返回
2.根據(jù)請(qǐng)求頭里的offset查詢半消息,如果查詢結(jié)果為空則直接返回
3.根據(jù)半消息構(gòu)造新消息,新構(gòu)造的消息會(huì)被重新寫(xiě)入到CommitLog里,rollback消息的消息體為空
4.如果是rollback消息,則該消息不會(huì)被投遞
具體原因上文中已經(jīng)分析過(guò):只有commit消息才會(huì)被Broker投遞給consumer
RocketMQ會(huì)將commit消息和rollback消息都寫(xiě)入到commitLog里,但rollback消息的消息體為空且不會(huì)被投遞,CommitLog在刪除過(guò)期消息時(shí)才會(huì)將其刪除。當(dāng)事務(wù)commit成功之后,RocketMQ會(huì)重新封裝半消息并將其投遞給Consumer端消費(fèi)。
事務(wù)消息回查
Broker發(fā)起
相較于普通消息,事務(wù)消息主要依賴下面三個(gè)類:
1.TransactionStateService:事務(wù)狀態(tài)服務(wù),負(fù)責(zé)對(duì)事務(wù)消息進(jìn)行管理,包括存儲(chǔ)和更新事務(wù)消息狀態(tài)、回查狀態(tài)等
2.TranStateTable:事務(wù)消息狀態(tài)存儲(chǔ)表,基于MappedFileQueue實(shí)現(xiàn)
3.TranRedoLog:TranStateTable的日志,每次寫(xiě)入操作都會(huì)記錄日志,當(dāng)Broker宕機(jī)時(shí),可以利用這個(gè)文件做數(shù)據(jù)恢復(fù)
存儲(chǔ)半消息到CommitLog時(shí),使用offset索引到對(duì)應(yīng)的TranStateTable的位置
到此這篇關(guān)于深入淺出RocketMQ的事務(wù)消息的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
maven工程打包引入本地jar包的實(shí)現(xiàn)
我們需要將jar包發(fā)布到一些指定的第三方Maven倉(cāng)庫(kù),本文主要介紹了maven工程打包引入本地jar包的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02
java 字符串的拼接的實(shí)現(xiàn)實(shí)例
這篇文章主要介紹了java 字符串的拼接的實(shí)現(xiàn)實(shí)例的相關(guān)資料,希望通過(guò)本文大家能掌握字符拼接的實(shí)現(xiàn),需要的朋友可以參考下2017-09-09
SpringBoot定時(shí)任務(wù)詳解與案例代碼
SpringBoot是一個(gè)流行的Java開(kāi)發(fā)框架,它提供了許多便捷的特性來(lái)簡(jiǎn)化開(kāi)發(fā)過(guò)程,其中之一就是定時(shí)任務(wù)的支持,讓開(kāi)發(fā)人員可以輕松地在應(yīng)用程序中執(zhí)行定時(shí)任務(wù),本文將詳細(xì)介紹如何在Spring?Boot中使用定時(shí)任務(wù),并提供相關(guān)的代碼示例2023-06-06
Java編程實(shí)現(xiàn)統(tǒng)計(jì)數(shù)組中各元素出現(xiàn)次數(shù)的方法
這篇文章主要介紹了Java編程實(shí)現(xiàn)統(tǒng)計(jì)數(shù)組中各元素出現(xiàn)次數(shù)的方法,涉及java針對(duì)數(shù)組的遍歷、比較、運(yùn)算等相關(guān)操作技巧,需要的朋友可以參考下2017-07-07
Java序列化問(wèn)題:“Serialized class has not impl
在Java開(kāi)發(fā)中,序列化(Serialization)是一個(gè)常見(jiàn)的操作,尤其是在分布式系統(tǒng)、網(wǎng)絡(luò)通信或數(shù)據(jù)持久化場(chǎng)景中,然而,序列化過(guò)程中可能會(huì)遇到各種問(wèn)題,其中最常見(jiàn)的一個(gè)錯(cuò)誤是Serialized class has not implement Serializable interface,本文給大家介紹了相關(guān)的解決方法2025-02-02
SpringBoot創(chuàng)建并簡(jiǎn)單使用的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot創(chuàng)建并簡(jiǎn)單使用的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10

