微服務架構設計RocketMQ進階事務消息原理詳解
前言
分布式消息選型的時候是否支持事務消息是一個很重要的考量點,而目前只有RocketMQ對事務消息支持的最好。今天我們來嘮嘮如何實現(xiàn)RocketMQ的事務消息!
Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務消息,這里RocketMQ采用了2PC的思想來實現(xiàn)了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。

RocketMQ事務流程概要
RocketMQ實現(xiàn)事務消息主要分為兩個階段:正常事務的發(fā)送及提交、事務信息的補償流程 整體流程為:
正常事務發(fā)送與提交階段
1、生產(chǎn)者發(fā)送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)
2、服務端響應消息寫入結果,半消息發(fā)送成功
3、開始執(zhí)行本地事務
4、根據(jù)本地事務的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
事務信息的補償流程
1、如果MQServer長時間沒收到本地事務的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認回查的操作請求
2、生產(chǎn)者收到確認回查請求后,檢查本地事務的執(zhí)行狀態(tài)
3、根據(jù)檢查后的結果執(zhí)行Commit或者Rollback操作
補償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。
RocketMQ事務流程關鍵
1、事務消息在一階段對用戶不可見
事務消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的,也就是說消費者不能直接消費。這里RocketMQ的實現(xiàn)方法是原消息的主題與消息消費隊列,然后把主題改成RMQ_SYS_TRANS_HALF_TOPIC ,這樣由于消費者沒有訂閱這個主題,所以不會被消費。
2、如何處理第二階段的失敗消息?
在本地事務執(zhí)行完成后會向MQServer發(fā)送Commit或Rollback操作,此時如果在發(fā)送消息的時候生產(chǎn)者出故障了,那么要保證這條消息最終被消費,MQServer會像服務端發(fā)送回查請求,確認本地事務的執(zhí)行狀態(tài)。
當然了rocketmq并不會無休止的的信息事務狀態(tài)回查,默認回查15次,如果15次回查還是無法得知事務狀態(tài),RocketMQ默認回滾該消息。
3、消息狀態(tài) 事務消息有三種狀態(tài):
TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息
TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。
TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊列來確定狀態(tài)。
實現(xiàn)
我們構建這樣一個需求:用戶請求訂單微服務 order-service 接口刪除訂單(退貨),刪除訂單后需要發(fā)送消息給用戶服務account-service,用戶微服務收到消息后會給用戶賬戶增加余額。這個需求跟錢相關,肯定要保證消息的事務性,接下來我們根據(jù)上面的原理實現(xiàn)整個流程。
基礎配置
生產(chǎn)者order-servcie和account-service都要引入RocketMQ相關依賴,增加RocketMQ的相關配置
引入組件
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
添加配置
# within rocketmq
rocketmq:
name-server: xxx.xx.x.xx:9876; xxx.xx.x.xx:9876
producer:
group: cloud-group
發(fā)送半消息
order-service在執(zhí)行刪除訂單操作時發(fā)送一條半消息給MQServer,發(fā)送半消息主要是使用rocketMQTemplate.sendMessageInTransaction() 方法,發(fā)送事務消息。
@Override
public void delete(String orderNo) {
Order order = orderMapper.selectByNo(orderNo);
//如果訂單存在且狀態(tài)為有效,進行業(yè)務處理
if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
String transactionId = UUID.randomUUID().toString();
//如果可以刪除訂單則發(fā)送消息給rocketmq,讓用戶中心消費消息
rocketMQTemplate.sendMessageInTransaction("add-amount",
MessageBuilder.withPayload(
UserAddMoneyDTO.builder()
.userCode(order.getAccountCode())
.amount(order.getAmount())
.build()
)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("order_id",order.getId())
.build()
,order
);
}
}
首先先校驗一下訂單狀態(tài),然后發(fā)送消息給MQServer,這個邏輯大家都看得懂,
主要是關注sendMessageInTransaction() 方法,源碼如下:
public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {
try {
if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {
throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
} else {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
return this.producer.sendMessageInTransaction(rocketMsg, arg);
}
} catch (MQClientException var5) {
throw RocketMQUtil.convert(var5);
}
}
該方法有三個參數(shù):
destination:目的地(主題),這里發(fā)送給add-amount 這個主題
message:發(fā)送給消費者的消息體,需要使用 MessageBuilder.withPayload() 來構建消息
arg:參數(shù)
注意,這里我們生成了一個transactionId,并放在header中跟消息一起發(fā)送(這里實際也可以構造成一個對象,放在arg里進行發(fā)送),作用后面再講!
執(zhí)行本地事務與回查
MQServer收到半消息后會告訴生產(chǎn)者order-service確認收到半消息,這時候order-service需要執(zhí)行本地事務,執(zhí)行完本地事務后再告訴MQServer本地事務的執(zhí)行狀態(tài),確認消息究竟是Commit還是Rollback。如果在告訴MQServer本地執(zhí)行狀態(tài)的時候出異常了還需要讓MQServer能夠回查到,怎么實現(xiàn)這一些列操作呢?
RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事務監(jiān)聽器,這個接口類的實現(xiàn)如下:

第一個方法executeLocalTransaction 為執(zhí)行本地事務;
第二個方法checkLocalTransaction 為檢查本地事務的執(zhí)行狀態(tài),也就是回查動作。
有了這個接口類我們的執(zhí)行邏輯清楚了,但是還有個問題:本地事務已經(jīng)執(zhí)行完成了,怎么去回查本地事務的執(zhí)行結果呢?
我們可以在執(zhí)行本地事務的時候同時生成一個事務日志,讓本地事務與日志事務在同一個方法中,同時添加@Transactional 注解,保證兩個操作事務是一個原子操作。這樣如果事務日志表中有這個本地事務的信息,那就代表本地事務執(zhí)行成功,需要Commit,相反如果沒有對應的事務日志,則表示沒執(zhí)行成功,需要Rollback
思路既然理順了,咱們就開擼。
首先創(chuàng)建一個日志表

很簡單的三個字段,主要是這個事務id,需要根據(jù)這個事務id回查事務,還記得我們在發(fā)送半消息時生成的事務id嗎,就是干這個用的!
在生產(chǎn)者編寫方法實現(xiàn)RocketMQLocalTransactionListener
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
private final OrderService orderService;
private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
/**
* 執(zhí)行本地事務
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
log.info("執(zhí)行本地事務");
MessageHeaders headers = message.getHeaders();
//獲取事務ID
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer orderId = Integer.valueOf((String)headers.get("order_id"));
log.info("transactionId is {}, orderId is {}",transactionId,orderId);
try{
//執(zhí)行本地事務,并記錄日志
orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
//執(zhí)行成功,可以提交事務
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 本地事務的檢查,檢查本地事務是否成功
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
//獲取事務ID
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("檢查本地事務,事務ID:{}",transactionId);
//根據(jù)事務id從日志表檢索
QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("transaction_id",transactionId);
RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
if(null != rocketmqTransactionLog){
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
執(zhí)行本地事務的方法
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
//將訂單狀態(tài)置位無效
orderMapper.changeStatus(id,status);
//插入事務表
rocketMqTransactionLogMapper.insert(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("執(zhí)行刪除訂單操作")
.build()
);
}
這一塊的代碼邏輯都是在生產(chǎn)端,即Order-Server,大家不要搞錯了
消費消息
Rollback的消息MQServer會給我們處理,我們只要關注Commit狀態(tài)時消費端可以正常消費即可。在account-service監(jiān)聽消息,如果收到消息則給用戶賬戶增加余額。
@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
private final AccountMapper accountMapper;
/**
* 收到消息的業(yè)務邏輯
*/
@Override
public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
log.info("received message: {}",userAddMoneyDTO);
accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
log.info("add money success");
}
}
測試

訂單表有這樣一條記錄,用戶為jianzh5,amount為200

用戶表的記錄,執(zhí)行完成后jianzh5的賬戶應該變成250
調(diào)用刪除訂單接口,刪除訂單

發(fā)送半消息

執(zhí)行本地事務,并生成事務日志

模擬異常情況 在發(fā)送Commit消息的時候我們用命令殺掉進程taskkill /pid 19748 -t -f,模擬異常!

重新啟動order-service,查看是否會執(zhí)行回查動作

MQServer進行回查,檢查事務日志,判斷是否可以提交事務
消費者消費事務消息,保證事務的一致性

總結
使用RocketMQ實現(xiàn)事務消息的過程還是很復雜的,需要好好理解開頭的那張圖,只有理解了事務消息的交互過程才能編寫相應的代碼!
好了,各位朋友們,本期的內(nèi)容到此就全部結束啦,能看到這里的同學都是優(yōu)秀的同學,下一個升職加薪的就是你了!
以上就是微服務架構RocketMQ進階事務消息原理詳解的詳細內(nèi)容,更多關于微服務架構RocketMQ事務消息的資料請關注腳本之家其它相關文章!
相關文章
Java?Collections.sort()實現(xiàn)List排序的默認方法和自定義方法
這篇文章主要介紹了Java?Collections.sort()實現(xiàn)List排序的默認方法和自定義方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2017-06-06
Java使用ThreadLocal實現(xiàn)當前登錄信息的存取功能
ThreadLocal和其他并發(fā)工具一樣,也是用于解決多線程并發(fā)訪問,下這篇文章主要給大家介紹了關于Java使用ThreadLocal實現(xiàn)當前登錄信息的存取功能,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-02-02
Java如何實現(xiàn)實體類轉(zhuǎn)Map、Map轉(zhuǎn)實體類
這篇文章主要介紹了Java 實現(xiàn)實體類轉(zhuǎn)Map、Map轉(zhuǎn)實體類的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
SpringBoot整合mybatis-plus快速入門超詳細教程
mybatis-plus 是一個 Mybatis 的增強工具,在 Mybatis 的基礎上只做增強不做改變,為簡化開發(fā)、提高效率而生,本文給大家分享SpringBoot整合mybatis-plus快速入門超詳細教程,一起看看吧2021-09-09
SpringBoot使用itext填充pdf表單及導出pdf的流程
由于最近開發(fā)的項目需要用到打印單據(jù),就在網(wǎng)上找了一下方案,反反復復,都沒有找到合適的,借鑒了網(wǎng)上資源,使用itext5、itext7的工具包,所以本文介紹了SpringBoot使用itext填充pdf表單及導出pdf的流程,需要的朋友可以參考下2024-09-09
java ThreadPoolExecutor 并發(fā)調(diào)用實例詳解
這篇文章主要介紹了java ThreadPoolExecutor 并發(fā)調(diào)用實例詳解的相關資料,需要的朋友可以參考下2017-05-05

