關(guān)于RocketMQ使用事務(wù)消息
說明
事務(wù)消息:
1、不支持延時消息和批量消息
2、如果消息沒有及時提交,默認check 15次,可以通過Broker的transactionCheckMax參數(shù)配置次數(shù)。如果超時15次依然沒有得到明確結(jié)果,將會打印異常信息,具體的處理策略可以通過復(fù)寫AbstractTransactionCheckListener類實現(xiàn)
3、每次check的時間間隔可以通過Broker的transactionTimeout配置,也可以在消息中增加CHECK_IMMUNITY_TIME_IN_SECONDS屬性指定
4、事務(wù)狀態(tài):LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。
原理
事務(wù)消息是RocketMQ的一大特性,其保證發(fā)送消息和執(zhí)行本地邏輯在同一個事務(wù)內(nèi)。實現(xiàn)的思路借鑒了兩階段提交協(xié)議:
第一階段:發(fā)送半事務(wù)消息,消息發(fā)送后,消息是對消費者透明的,也就是該消息還不屬于可消費消息,消費者無法消費。
第二階段:執(zhí)行本地事務(wù),本地執(zhí)行事務(wù)后提交消息。
(1)、如果事務(wù)執(zhí)行失敗,則回滾消息;
(2)、如果事務(wù)執(zhí)行成功,則提交消息,提交后消費者可消費到消息;
(3)、如果事務(wù)執(zhí)行成功,但消息提交失敗,RocketMQ還提供了回查機制:如果一段時間過后,沒有提交/回滾半事務(wù)消息,RocketMQ會定時回查一定的次數(shù),獲取本地事務(wù)的狀態(tài)以決定是提交還是回滾消息。
如果回查一定的次數(shù)后依然沒有獲取到本地事務(wù)的明確狀態(tài),則消息會被放到死信隊列,由人工確認如何處理。
事務(wù)消息處理流程

1、生產(chǎn)端發(fā)送半事務(wù)消息到服務(wù)端
2、服務(wù)端返回半事務(wù)消息發(fā)送成功響應(yīng)。注意,此時的消息對消費端是不可見的,不可被消費
3、發(fā)送方執(zhí)行本地事務(wù)
4、執(zhí)行完本地事務(wù)后,客戶端同步服務(wù)端提交/回滾消息
5、如果服務(wù)端在一定的時間內(nèi),等不到4的回應(yīng),則定時進行回查,詢問客戶端的本地事務(wù)狀態(tài)。
6、客戶端檢查本地事務(wù)狀態(tài)
7、根據(jù)本地事務(wù)執(zhí)行情況,告知服務(wù)端,服務(wù)端決定是提交消息還是丟棄消息。
生產(chǎn)端
@Test
public void sendMessage() throws Exception {
//事務(wù)生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("defaultGroup");
producer.setNamesrvAddr(SpringUtil.getBean(RocketMqConfig.class).getNamesrvAddr());
//設(shè)置檢查本地事務(wù)狀態(tài)的線程池
//producer.setExecutorService(null);
//本地事務(wù)執(zhí)行監(jiān)聽器
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
producer.start();
Message message = new Message(RocketMqUtil.TOPIC, "transaction", "transaction-message".getBytes(Charset.forName("UTF-8")));
//發(fā)送事務(wù)消息
producer.sendMessageInTransaction(message, null);
}
class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//執(zhí)行本地事務(wù)(數(shù)據(jù)庫)操作......
int num = new Random().nextInt(10);
if (num < 3) {
//本地事務(wù)執(zhí)行成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} else if (num < 6) {
//本地事務(wù)執(zhí)行失敗,刪除消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//等待本地事務(wù)check,即執(zhí)行checkLocalTransaction()方法
return LocalTransactionState.UNKNOW;
}
/**
* 回查邏輯
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
int num = new Random().nextInt(10);
if (num < 3) {
//提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} else if (num < 6) {
//刪除消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}發(fā)送事務(wù)消息步驟:
1、初始化TransactionMQProducer實例
2、指定check線程池(回查線程池)
3、為Producer添加自定義事務(wù)監(jiān)聽器。自定義事務(wù)監(jiān)聽器需實現(xiàn)TransactionListener接口,通過覆蓋接口的executeLocalTransaction方法執(zhí)行本地事務(wù),返回事務(wù)狀態(tài),客戶端會根據(jù)本地事務(wù)狀態(tài)通知服務(wù)端,決定是否提交消息;通過覆蓋接口的checkLocalTransaction方法提供回查機制,當在一定的時間內(nèi)服務(wù)端獲取不到本地事務(wù)執(zhí)行狀態(tài),將通過該方法回查事務(wù)狀態(tài),以決定消失是否需要提交。
4、通過Producer.sendMessageInTransaction發(fā)送事務(wù)消息。
消費者正常消費邏輯
消費端
@Test
public void consumeMessage() throws Exception {
DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("消費到消息條數(shù):{}", list.size());
list.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
.map(String::new).forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
Thread.sleep(5000L);
}消費端正常消費消息即可。
到此這篇關(guān)于關(guān)于RocketMQ使用事務(wù)消息的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis SqlSessionFactory與SqlSession詳細講解
SqlSessionFactory是MyBatis的核心類之一,其最重要的功能就是提供創(chuàng)建MyBatis的核心接口SqlSession,所以我們需要先創(chuàng)建SqlSessionFactory,為此我們需要提供配置文件和相關(guān)的參數(shù)2022-11-11
springboot集成nacos無法動態(tài)獲取nacos配置的問題
這篇文章主要介紹了springboot集成nacos無法動態(tài)獲取nacos配置的問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09
Linux下Java開發(fā)環(huán)境搭建以及第一個HelloWorld
這篇文章主要介紹了Linux下Java開發(fā)環(huán)境搭建以及第一個HelloWorld的實現(xiàn)過程,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2015-09-09
Java數(shù)據(jù)結(jié)構(gòu)之圖的領(lǐng)接矩陣詳解
圖的領(lǐng)接矩陣存儲方式是用兩個數(shù)組來表示圖。一個一位數(shù)組存儲圖中頂點信息,一個二維數(shù)組存儲圖中的邊或弧的信息。本文將為大家重點介紹一下數(shù)據(jù)結(jié)構(gòu)中的圖的鄰接矩陣,快來跟隨小編一起學習吧2021-11-11

