SpringBoot集成RocketMQ發(fā)送事務(wù)消息的原理解析
簡介
RocketMQ 事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時成功,要么同時失敗。RocketMQ 的事務(wù)消息提供類似 X/Open XA 的分布事務(wù)功能,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
原理
RocketMQ事務(wù)消息通過異步確保方式,保證事務(wù)的最終一致性。設(shè)計的思想可以借鑒兩個階段提交事務(wù)。其執(zhí)行流程圖如下:
- 發(fā)送方向MQ服務(wù)端發(fā)送消息。
- MQ Server將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時消息為半消息。
- 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
- 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會接受該消息。
- 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過固定時間后 MQ Server 將對該消息發(fā)起消息回查。
- 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對半消息進(jìn)行操作。
具體實(shí)現(xiàn)
消費(fèi)者
@Component
public class TransactionProduce
{
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTransactionMessage(String msg)
{
logger.info("start sendTransMessage hashKey:{}",msg);
Message message =new Message();
message.setBody("this is tx message".getBytes());
TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq",
MessageBuilder.withPayload(message).build(), msg);
//發(fā)送狀態(tài)
String sendStatus = result.getSendStatus().name();
// 本地事務(wù)執(zhí)行狀態(tài)
String localTxState = result.getLocalTransactionState().name();
logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
}
}說明:發(fā)送事務(wù)消息采用的是sendMessageInTransaction方法,返回結(jié)果為TransactionSendResult對象,該對象中包含了事務(wù)發(fā)送的狀態(tài)、本地事務(wù)執(zhí)行的狀態(tài)等。
消費(fèi)者
@Component
@RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING)
public class TransactionConsumer implements RocketMQListener<String>
{
private Logger logger =LoggerFactory.getLogger(getClass());
@Override
public void onMessage(String message)
{
logger.info("send transaction mssage parma is:{}", message);
}
}說明:發(fā)送事務(wù)消息的消費(fèi)者與普通的消費(fèi)者一樣沒有太大的區(qū)別。
生產(chǎn)者消息監(jiān)聽器
發(fā)送事務(wù)消息除了生產(chǎn)者和消費(fèi)者以外,我們還需要創(chuàng)建生產(chǎn)者的消息監(jiān)聽器,來監(jiān)聽本地事務(wù)執(zhí)行的狀態(tài)和檢查本地事務(wù)狀態(tài)。
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener
{
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* 執(zhí)行本地事務(wù)
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object obj)
{
logger.info("start invoke local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try
{
//處理業(yè)務(wù)
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("invoke msg content:{}",jsonStr);
}
catch (Exception e)
{
logger.error("invoke local mq trans error",e);
resultState = RocketMQLocalTransactionState.UNKNOWN;
}
return resultState;
}
/**
* 檢查本地事務(wù)的狀態(tài)
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg)
{
logger.info("start check Local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try
{
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("check trans msg content:{}",jsonStr);
}
catch (Exception e)
{
resultState = RocketMQLocalTransactionState.ROLLBACK;
}
return resultState;
}
}
說明:RocketMQ本地事務(wù)狀態(tài)由如下幾種:
- RocketMQLocalTransactionState.COMMIT:提交事務(wù),允許消費(fèi)者消費(fèi)此消息。
- RocketMQLocalTransactionState.ROLLBACK: 回滾事務(wù),消息將被刪除,不允許被消費(fèi)。
- RocketMQLocalTransactionState.UNKNOWN:中間狀態(tài),代表需要進(jìn)行檢查來確定狀態(tài)。
注意:Spring Boot2.0的版本之后,@RocketMQTransactionListener 已經(jīng)沒有了txProducerGroup屬性,且sendMessageInTransaction方法也將其移除。所以在同一項目中只能有一個@RocketMQTransactionListener,不能出現(xiàn)多個,否則會報如下錯誤:
java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener
消息事務(wù)測試
正常測試
c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"}
c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction
c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE
c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
說明:通過日志我們可以看出,執(zhí)行的流程與上述的一致,執(zhí)行成功后,消息執(zhí)行成功返回的結(jié)果為SEND_OK,本地事務(wù)執(zhí)行的狀態(tài)為COMMIT_MESSAGE。
異常測試
如果在執(zhí)行本地消息時出現(xiàn)異常,那么執(zhí)行結(jié)果會是怎樣?修改下本地事務(wù)執(zhí)行的方法,讓其出現(xiàn)異常。
代碼調(diào)整
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object obj)
{
logger.info("start invoke local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try
{
//處理業(yè)務(wù)
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("invoke local transaction msg content:{}",jsonStr);
int c=1/0;
}
catch (Exception e)
{
logger.error("invoke local mq trans error",e);
resultState = RocketMQLocalTransactionState.UNKNOWN;
}
return resultState;
}
執(zhí)行結(jié)果
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:UNKNOW
從執(zhí)行的結(jié)果可以看出,消息執(zhí)行成功返回的結(jié)果為SEND_OK,本地事務(wù)執(zhí)行的狀態(tài)為:UNKNOW.所以消費(fèi)端無法消費(fèi)此消息。
總結(jié)
到此這篇關(guān)于SpringBoot集成RocketMQ發(fā)送事務(wù)消息的文章就介紹到這了,更多相關(guān)SpringBoot集成RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea使用war以及war exploded的區(qū)別說明
本文詳細(xì)解析了war與warexploded兩種部署方式的差異及步驟,war方式是先打包成war包,再部署到服務(wù)器上;warexploded方式是直接把文件夾、class文件等移到Tomcat上部署,支持熱部署,開發(fā)時常用,文章分別列出了warexploded模式和war包形式的具體操作步驟2024-10-10
Java數(shù)據(jù)結(jié)構(gòu)及算法實(shí)例:三角數(shù)字
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)及算法實(shí)例:三角數(shù)字,本文直接給出實(shí)現(xiàn)代碼,代碼中包含詳細(xì)注釋,需要的朋友可以參考下2015-06-06
java正則匹配讀取txt文件提取特定開頭和結(jié)尾的字符串
通常我們可以直接通過文件流來讀取txt文件的內(nèi)容,但有時候也會遇到問題,下面這篇文章主要給大家介紹了關(guān)于java正則匹配讀取txt文件提取特定開頭和結(jié)尾的字符串的相關(guān)資料,需要的朋友可以參考下2022-11-11
Android應(yīng)用開發(fā)之將SQLite和APK一起打包的方法
這篇文章主要介紹了Android應(yīng)用開發(fā)之將SQLite和APK一起打包的方法,文章時間較早,盡管現(xiàn)在開發(fā)環(huán)境已大都遷移至Android Studio上,但打包原理依然相同,需要的朋友可以參考下2015-08-08

