RocketMQ事務(wù)消息圖文示例講解
RocketMQ 也允許我們像mysql 一樣發(fā)送具有事務(wù)特征的消息
MQ 的事務(wù)流程(本地代碼正常執(zhí)行)

MQ 的消息補償過程(當(dāng)本地代碼執(zhí)行失敗時)

MQ 消息的三種狀態(tài)
- 提交狀態(tài):允許進入隊列,此消息與非事務(wù)消息無區(qū)別
- 回滾狀態(tài):不允許進入隊列,此消息等同于未發(fā)送過
- 中間狀態(tài):完成了 half 消息的發(fā)送,未對 MQ 進行二次狀態(tài)確認(rèn)(未知狀態(tài))
注意:事務(wù)消息僅與生產(chǎn)者有關(guān),與消費者無關(guān)
生產(chǎn)者代碼(提交狀態(tài)、回滾狀態(tài)):
public class Producer {
public static void main(String[] args) throws Exception{
//事務(wù)消息使用的生產(chǎn)者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
//添加本地事務(wù)對應(yīng)的監(jiān)聽
producer.setTransactionListener(new TransactionListener() {
//正常事務(wù)過程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 此處寫本地事務(wù)處理業(yè)務(wù)
// 如果成功,消息改為提交,如果失敗改為 回滾,如果是多線程處理狀態(tài)未知,就提交為未知等待事務(wù)補償過程
//事務(wù)提交狀態(tài)
return LocalTransactionState.COMMIT_MESSAGE;// 類似于msql 的 commit
//return LocalTransactionState.ROLLBACK_MESSAGE;回滾狀態(tài)
}
//事務(wù)補償過程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事務(wù)消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回結(jié)果:"+result);
producer.shutdown();
}
}
生產(chǎn)者(中間狀態(tài)):
public class Producer {
public static void main(String[] args) throws Exception{
//事務(wù)消息使用的生產(chǎn)者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
//添加本地事務(wù)對應(yīng)的監(jiān)聽
producer.setTransactionListener(new TransactionListener() {
//正常事務(wù)過程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.UNKNOW;
}
//事務(wù)補償過程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("事務(wù)補償過程執(zhí)行");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic8",("事務(wù)消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回結(jié)果:"+result);
//事務(wù)補償過程必須保障服務(wù)器在運行過程中,否則將無法進行正常的事務(wù)補償
//producer.shutdown();
}
}
到此這篇關(guān)于RocketMQ事務(wù)消息圖文示例講解的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Java集合中的基本數(shù)據(jù)結(jié)構(gòu)
總有小伙伴讓我總結(jié)一下Java集合中的基本數(shù)據(jù)結(jié)構(gòu)的相關(guān)知識,今天特地整理了本篇文章,文中有非常詳細(xì)的介紹,需要的朋友可以參考下2021-06-06
java發(fā)送post請求使用multipart/form-data格式文件數(shù)據(jù)到接口代碼示例
這篇文章主要介紹了java發(fā)送post請求使用multipart/form-data格式文件數(shù)據(jù)到接口的相關(guān)資料,文中指定了數(shù)據(jù)編碼格式為UTF-8,并強調(diào)了所需依賴工具類,需要的朋友可以參考下2024-12-12
Spring Boot配置application.yml及根據(jù)application.yml選擇啟動配置的操作
Spring Boot中可以選擇applicant.properties 作為配置文件,也可以通過在application.yml中進行配置,讓Spring Boot根據(jù)你的選擇進行加載啟動配置文件,本文給大家介紹Spring Boot配置application.yml及根據(jù)application.yml選擇啟動配置的操作方法,感興趣的朋友一起看看吧2023-10-10

