SpringCloud+RocketMQ實(shí)現(xiàn)分布式事務(wù)的實(shí)踐
隨著互聯(lián)網(wǎng)公司的微服務(wù)越來(lái)越多,分布式事務(wù)已經(jīng)成為了我們的經(jīng)常使用的。所以我們來(lái)一步一步的實(shí)現(xiàn)基于RocketMQ的分布式事務(wù)。接下來(lái),我們將要做的主題寫(xiě)出來(lái)。
- RocketMQ的分布式事務(wù)結(jié)構(gòu)和說(shuō)明
- 搭建RocketMQ步驟
- 事務(wù)場(chǎng)景,然后準(zhǔn)備工程,運(yùn)行代碼
一、RocketMQ的分布式事務(wù)結(jié)構(gòu)和說(shuō)明
我們通過(guò)下圖來(lái)了解一下RocketMQ實(shí)現(xiàn)分布式事務(wù)的結(jié)構(gòu)。采用半消息機(jī)制實(shí)現(xiàn)分布式事務(wù),半消息顧名思義,就是發(fā)送方將消息發(fā)送到MQ中的Broker端,這個(gè)消息被標(biāo)記為“暫不投遞”狀態(tài),這個(gè)時(shí)間訂閱方是不能收到這個(gè)消息的,當(dāng)發(fā)送方將提交了Commit后,這個(gè)消息才可以被訂閱方收到。如果發(fā)送方提交了Rollback,那么訂閱方就不會(huì)收到以該消息。
RocketMQ采用的是最終一致性事務(wù),因?yàn)樗褂昧税胂C(jī)制,訂閱方可能收不到消息,但是最終狀態(tài)是能保持一致的。
我們?cè)谥v這個(gè)流程前,我們需要先看下圖的紅色部分,當(dāng)發(fā)送方發(fā)給MQ的Commit/Rollback沒(méi)有收到的時(shí)候,這個(gè)時(shí)候需要啟用一個(gè)線程,從本地事務(wù)庫(kù)里邊查找到當(dāng)前的狀態(tài),根據(jù)當(dāng)前的狀態(tài)來(lái)決定是否重新發(fā)送Commit/Rollback,相當(dāng)于重試。

接下來(lái),我們說(shuō)一下整個(gè)執(zhí)行的過(guò)程。
發(fā)送方將半消息發(fā)送到MQ中,成功后MQ將消息保持好后將結(jié)果同步給發(fā)送方,即半消息發(fā)送成功。當(dāng)MQ告訴發(fā)送方成功后,我們需要記錄下發(fā)送到MQ的相關(guān)的事情,比如發(fā)送的消息內(nèi)容,發(fā)送時(shí)間,發(fā)送的相關(guān)ID,比如事務(wù)ID,都要做好相應(yīng)記錄。當(dāng)本地事務(wù)庫(kù),也可以理解為日志庫(kù),我們可以針對(duì)發(fā)生的問(wèn)題進(jìn)行追溯。本地庫(kù)記錄好事務(wù)后,發(fā)送方就可以將Commit/Rollback提交到MQ了。當(dāng)MQ收到了Commit命令后,這個(gè)時(shí)候就會(huì)將該條消息發(fā)送給訂閱方。收到了Rollback后,那該條消息不會(huì)投遞給訂閱方,消息保存3天后刪除。
整個(gè)過(guò)程就是這些,大家如果覺(jué)得有遺漏的,可以告訴我。
二、搭建RocketMQ
我使用的是windos10,所以需要下載windows版本。
進(jìn)入到RocketMQ的官方地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/
下載二進(jìn)制文件壓縮包。如下圖:

先解壓,然后進(jìn)行環(huán)境變量配置,打開(kāi)菜單,直接輸入環(huán)境變量。
變量名:ROCKETMQ_HOME
值:解壓的release的路徑,如D:\rocketmq-all-4.3.0-bin-release
然后打開(kāi)CMD命令,進(jìn)入到解壓路徑中的bin目標(biāo),進(jìn)行nameserver啟動(dòng)。
start mqnameserv.cmd
啟動(dòng)后的效果為:

再進(jìn)行broker的啟動(dòng),啟動(dòng)需要連到的nameserver為127.0.0.1:9876,打開(kāi)自動(dòng)創(chuàng)建Topic,這個(gè)時(shí)候當(dāng)用到某個(gè)Topic沒(méi)有的情況下會(huì)自動(dòng)創(chuàng)建一個(gè)。
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

這個(gè)時(shí)候Windows的rocketmq就下載好并且啟動(dòng)成功了。
三、事務(wù)場(chǎng)景,然后準(zhǔn)備工程,運(yùn)行代碼
因?yàn)槲覀兪褂玫腞ocketMq是使用的分布式事務(wù)是最終一致性的,柔性的,所以我們使用的場(chǎng)景也要考慮到。我們用的場(chǎng)景就是下訂單充話費(fèi),用戶(hù)支付了訂單,那么直接給用戶(hù)的進(jìn)行充值。購(gòu)買(mǎi)的訂單不會(huì)立馬充值到手機(jī)的,需要等一會(huì)才到賬,這就是最終一致性。
我們使用的如下代碼版本號(hào),SpringCloud的版本號(hào)要和SpringBoot保持一致,否則會(huì)出現(xiàn)類(lèi)找不到的情況。
SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4.RELEASE + RocketMQ4.3 +MySQL + lombok(插件)
我們使用SpringCloud的幾個(gè)組件:
Euerka Server :用于提供服務(wù)注冊(cè)的能力和發(fā)現(xiàn)
Euerka Client : 用于進(jìn)行服務(wù)注冊(cè)
Feign:用于服務(wù)間的調(diào)用
Config :用于進(jìn)行配置
接下來(lái),我們創(chuàng)建需要進(jìn)行配置的表。
DROP TABLE IF EXISTS `spring_cloud_config`;
CREATE TABLE `spring_cloud_config` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`akey` varchar(30) DEFAULT NULL,
`avalue` varchar(128) DEFAULT NULL,
`application` varchar(30) DEFAULT NULL,
`aprofile` varchar(30) DEFAULT NULL,
`label` varchar(30) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
INSERT INTO `spring_cloud_config` (`id`, `akey`, `avalue`, `application`, `aprofile`, `label`)
VALUES
(2,'name_server','127.0.0.1:9876','product-service','dev','dev'),
(3,'name_server','127.0.0.1:9876','order-service','dev','dev'),
(4,'order_topic','order_topic','order-service','dev','dev'),
(5,'order_topic','order_topic','product-service','dev','dev');
我們構(gòu)建了4個(gè)模塊,eureka模塊用于服務(wù)注冊(cè)和發(fā)現(xiàn),springcloud-config將數(shù)據(jù)庫(kù)創(chuàng)建的配置加載供其他工程使用。charge-order-service用于下充值訂單,phone-charge-service用于給手機(jī)進(jìn)行充值業(yè)務(wù)。

主要的流程為,通過(guò)charge-order-service產(chǎn)生了訂單,然后將訂單和數(shù)量到phone-charge-service進(jìn)行二次確認(rèn)看是否還有足夠的數(shù)量可以使用,如果數(shù)量不夠的話直接將事務(wù)進(jìn)行rollback,這樣消息就不會(huì)到消費(fèi)端。如果下過(guò)去的訂單號(hào)已經(jīng)充值過(guò)了,那么該消息將會(huì)被直接丟掉,這也是消息端的冪等設(shè)計(jì)。
接下來(lái),我們看一下生產(chǎn)者以及事務(wù)的核心代碼。主要用于連接RocketMQ, 執(zhí)行本地事務(wù),在本地事務(wù)中進(jìn)行二次確認(rèn),根據(jù)結(jié)果進(jìn)行Commit、Rollback。當(dāng)連接RocketMQ出現(xiàn)問(wèn)題的時(shí)候可能會(huì)收到UNKNOWN,這個(gè)時(shí)候會(huì)調(diào)用checkLocalTransaction()方法,用于檢查是否將消息發(fā)送給RocketMQ了。
package com.hqs.chargeorderservice.mqservice;
import com.alibaba.fastjson.JSONObject;
import com.hqs.chargeorderservice.config.Jms;
import com.hqs.chargeorderservice.service.ProduceOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
/**
*
* @Description: 分布式事務(wù)RocketMQ 生產(chǎn)者
*/
@Slf4j
@Component
public class TransactionProducer {
/**
* 需要自定義事務(wù)監(jiān)聽(tīng)器 用于 事務(wù)的二次確認(rèn) 和 事務(wù)回查
*/
private TransactionListener transactionListener ;
/**
* 這里的生產(chǎn)者和之前的不一樣
*/
private TransactionMQProducer producer = null;
/**
* 官方建議自定義線程 給線程取自定義名稱(chēng) 發(fā)現(xiàn)問(wèn)題更好排查
*/
private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) {
transactionListener = new TransactionListenerImpl(produceOrderService);
// 初始化 事務(wù)生產(chǎn)者
producer = new TransactionMQProducer(jms.getOrderTopic());
// 添加服務(wù)器地址
producer.setNamesrvAddr(jms.getNameServer());
// 添加事務(wù)監(jiān)聽(tīng)器
producer.setTransactionListener(transactionListener);
// 添加自定義線程池
producer.setExecutorService(executorService);
start();
}
public TransactionMQProducer getProducer() {
return this.producer;
}
/**
* 對(duì)象在使用之前必須要調(diào)用一次,只能初始化一次
*/
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在應(yīng)用上下文,使用上下文監(jiān)聽(tīng)器,進(jìn)行關(guān)閉
*/
public void shutdown() {
this.producer.shutdown();
}
}
/**
* @Description: 自定義事務(wù)監(jiān)聽(tīng)器
*/
@Slf4j
class TransactionListenerImpl implements TransactionListener {
@Autowired
private ProduceOrderService produceOrderService ;
public TransactionListenerImpl( ProduceOrderService produceOrderService) {
this.produceOrderService = produceOrderService;
}
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("=========本地事務(wù)開(kāi)始執(zhí)行=============");
String message = new String(msg.getBody());
JSONObject jsonObject = JSONObject.parseObject(message);
Integer productId = jsonObject.getInteger("productId");
Integer total = jsonObject.getInteger("total");
Integer tradeNo = jsonObject.getInteger("tradeNo");
int userId = Integer.parseInt(arg.toString());
//模擬執(zhí)行本地事務(wù)begin=======
/**
* 本地事務(wù)執(zhí)行會(huì)有三種可能
* 1、commit 成功
* 2、Rollback 失敗
* 3、網(wǎng)絡(luò)等原因服務(wù)宕機(jī)收不到返回結(jié)果
*/
log.info("本地事務(wù)執(zhí)行參數(shù),用戶(hù)id={},商品ID={},銷(xiāo)售庫(kù)存={}",userId,productId,total);
int result = produceOrderService.save(userId, productId, total, tradeNo);
//模擬執(zhí)行本地事務(wù)end========
//TODO 實(shí)際開(kāi)發(fā)下面不需要我們手動(dòng)返回,而是根據(jù)本地事務(wù)執(zhí)行結(jié)果自動(dòng)返回
//1、二次確認(rèn)消息,然后消費(fèi)者可以消費(fèi)
if (result == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
//2、回滾消息,Broker端會(huì)刪除半消息
if (result == 1) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//3、Broker端會(huì)進(jìn)行回查消息
if (result == 2) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 只有上面接口返回 LocalTransactionState.UNKNOW 才會(huì)調(diào)用查接口被調(diào)用
*
* @param msg 消息
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
log.info("==========回查接口=========");
String key = msg.getKeys();
//TODO 1、必須根據(jù)key先去檢查本地事務(wù)消息是否完成。
/**
* 因?yàn)橛蟹N情況就是:上面本地事務(wù)執(zhí)行成功了,但是return LocalTransactionState.COMMIT_MESSAG的時(shí)候
* 服務(wù)掛了,那么最終 Brock還未收到消息的二次確定,還是個(gè)半消息 ,所以當(dāng)重新啟動(dòng)的時(shí)候還是回調(diào)這個(gè)回調(diào)接口。
* 如果不先查詢(xún)上面本地事務(wù)的執(zhí)行情況 直接在執(zhí)行本地事務(wù),那么就相當(dāng)于成功執(zhí)行了兩次本地事務(wù)了。
*/
// TODO 2、這里返回要么commit 要么rollback。沒(méi)有必要在返回 UNKNOW
return LocalTransactionState.COMMIT_MESSAGE;
}
}
我們啟動(dòng)一下程序。我們登錄localhost:7001注冊(cè)中心,看到3個(gè)服務(wù)都成功注冊(cè)了。

然后我們進(jìn)行下訂單調(diào)用接口,userId用戶(hù)編碼,productId產(chǎn)品號(hào),total購(gòu)買(mǎi)量,tradeNo交易訂單號(hào)
http://localhost:9001/api/v1/order/chargeOrder?userId=1&productId=1&total=1&tradeNo=4
charge-order-service顯示出來(lái)本地執(zhí)行事務(wù)的參數(shù)。

phone-charge-service消費(fèi)事務(wù)執(zhí)行的日志。

當(dāng)我們?cè)趧?chuàng)建charge-order-service和phone-charge-service的時(shí)候一定要注意,將工程里邊的application.properties變?yōu)閎ootstrap.yml,要不然程序啟動(dòng)的時(shí)候會(huì)從8888找config的配置內(nèi)容。因?yàn)镾pringCloud里面有個(gè)“啟動(dòng)上下文”,主要是用于加載遠(yuǎn)端的配置,也就是加載ConfigServer里面的配置,默認(rèn)加載順序?yàn)?加載bootstrap.*里面的配置 --> 鏈接configserver,加載遠(yuǎn)程配置 --> 加載application.*里面的配置。
好了,代碼也放到git地址了,https://github.com/stonehqs/springcloud-rocketmq-transaction,
到此這篇關(guān)于SpringCloud+RocketMQ實(shí)現(xiàn)分布式事務(wù)的實(shí)踐的文章就介紹到這了,更多相關(guān)SpringCloud RocketMQ分布式事務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot vue組件開(kāi)發(fā)實(shí)現(xiàn)接口斷言功能
這篇文章主要為大家介紹了springboot+vue組件開(kāi)發(fā)實(shí)現(xiàn)接口斷言功能,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
Java 仿天貓服裝商城系統(tǒng)的實(shí)現(xiàn)流程
讀萬(wàn)卷書(shū)不如行萬(wàn)里路,只學(xué)書(shū)上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+jsp+mysql+maven實(shí)現(xiàn)一個(gè)仿天貓服裝商城系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-11-11
MyBatis實(shí)現(xiàn)動(dòng)態(tài)SQL更新的代碼示例
本文博小編將帶領(lǐng)大家學(xué)習(xí)如何利用 MyBatis 攔截器機(jī)制來(lái)優(yōu)雅的實(shí)現(xiàn)這個(gè)需求,文中通過(guò)代碼示例介紹的非常詳細(xì),具有一定的參考價(jià)值,需要的朋友可以參考下2023-07-07
關(guān)于Map的遍歷以及轉(zhuǎn)JsonArray存儲(chǔ)方式
在Java開(kāi)發(fā)過(guò)程中,經(jīng)常會(huì)遇到需要對(duì)復(fù)雜數(shù)據(jù)結(jié)構(gòu)進(jìn)行處理的情況,本案例以List<Map<String,Object>>為例,介紹了如何遍歷該數(shù)據(jù)結(jié)構(gòu),并根據(jù)特定條件篩選出符合要求的元素,通過(guò)自定義一個(gè)Edit類(lèi)來(lái)模擬形成一個(gè)新的Map對(duì)象,實(shí)現(xiàn)了數(shù)據(jù)的有序存儲(chǔ)2024-11-11
Jmeter后置處理器實(shí)現(xiàn)過(guò)程及方法應(yīng)用
這篇文章主要介紹了Jmeter后置處理器實(shí)現(xiàn)過(guò)程及方法應(yīng)用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09
Spring boot 數(shù)據(jù)源未配置異常的解決
這篇文章主要介紹了Spring boot 數(shù)據(jù)源未配置異常的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
logback?OutputStreamAppender高效日志輸出源碼解析
這篇文章主要介紹了為大家logback?OutputStreamAppender日志輸出效率提升示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
淺談collection標(biāo)簽的oftype屬性能否為java.util.Map
這篇文章主要介紹了collection標(biāo)簽的oftype屬性能否為java.util.Map,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
Sentinel的熔斷降級(jí)、資源規(guī)則詳解與實(shí)例
這篇文章主要介紹了Sentinel的熔斷降級(jí)、資源規(guī)則詳解與實(shí)例,Sentinel是阿里巴巴開(kāi)源的一款流量控制和熔斷降級(jí)的框架,它主要用于保護(hù)分布式系統(tǒng)中的服務(wù)穩(wěn)定性,Sentinel通過(guò)對(duì)服務(wù)進(jìn)行流量控制和熔斷降級(jí),可以有效地保護(hù)系統(tǒng)的穩(wěn)定性,需要的朋友可以參考下2023-09-09

