RocketMQ事務(wù)消息使用與原理詳解
一、背景&概述
最近在找工作,面試過程中被多次問到事務(wù)消息的實(shí)現(xiàn)原理,另外在分布式事務(wù)解決方案中,事務(wù)消息也是一個不錯的解決方案,本篇文章將圍繞RocketMQ的事務(wù)消息實(shí)現(xiàn)展開描述。
二、應(yīng)用場景
所謂事務(wù)消息,其實(shí)是為了解決上下游寫一致性,以及強(qiáng)依賴解耦,也即是完成當(dāng)前操作的同時給下游發(fā)送指令,并且保證上下游要么同時成功或者同時失敗,并且考慮上游的性能和RT問題做出的強(qiáng)調(diào)用解耦妥協(xié)。常見的應(yīng)用場景有:
1.訂單履約指令下發(fā)
用戶下單成功后,給履約系統(tǒng)發(fā)送指令進(jìn)行履約操作,下單失敗不發(fā)送指令,采購缺貨或者其他履約異常,反向觸發(fā)訂單取消或者其他兜底操作。
2.用戶轉(zhuǎn)賬
用戶發(fā)起轉(zhuǎn)賬后,交易狀態(tài)短暫掛起,發(fā)送指令給銀行,如果發(fā)起失敗則不發(fā)送指令,發(fā)送成功后等待結(jié)果更新交易狀態(tài)。
3.訂單支付
支付發(fā)起后,當(dāng)筆訂單處于中間狀態(tài),給支付網(wǎng)關(guān)發(fā)起指令,如果發(fā)起失敗則不發(fā)送指令,發(fā)送成功后等待支付網(wǎng)關(guān)反饋更新支付狀態(tài)。
三、使用方式
1.事務(wù)消息監(jiān)聽器
@Component
@Slf4j
public class OrderTransactionalListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("開始執(zhí)行本地事務(wù)....");
LocalTransactionState state;
try{
String body = new String(message.getBody());
OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
orderService.createOrder(order,message.getTransactionId());
state = LocalTransactionState.COMMIT_MESSAGE;
log.info("本地事務(wù)已提交。{}",message.getTransactionId());
}catch (Exception e){
log.error("執(zhí)行本地事務(wù)失敗。{}",e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
log.info("開始回查本地事務(wù)狀態(tài)。{}",messageExt.getTransactionId());
LocalTransactionState state;
String transactionId = messageExt.getTransactionId();
if (transactionLogService.get(transactionId)>0){
state = LocalTransactionState.COMMIT_MESSAGE;
}else {
state = LocalTransactionState.UNKNOW;
}
log.info("結(jié)束本地事務(wù)狀態(tài)查詢:{}",state);
return state;
}
}2.編寫事務(wù)消息生產(chǎn)者
@Component
@Slf4j
public class TransactionalMsgProducer implements InitializingBean, DisposableBean {
private String GROUP = "order_transactional";
private TransactionMQProducer msgProducer;
//用于執(zhí)行本地事務(wù)和事務(wù)狀態(tài)回查的監(jiān)聽器
@Autowired
private OrderTransactionalListener orderTransactionListener;
//執(zhí)行任務(wù)的線程池
private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
private void start(){
try {
this.msgProducer.start();
} catch (MQClientException e) {
log.error("msg producer starter occur error;",e);
}
}
private void shutdown() {
if(null != msgProducer) {
try {
msgProducer.shutdown();
} catch (Exception e) {
log.error("producer shutdown occur error;",e);
}
}
}
public TransactionSendResult send(String data, String topic) throws MQClientException {
Message message = new Message(topic,data.getBytes());
return this.msgProducer.sendMessageInTransaction(message, null);
}
@Override
public void afterPropertiesSet() throws Exception {
msgProducer = new TransactionMQProducer(GROUP);
msgProducer.setNamesrvAddr("namesrvHost:ip");
msgProducer.setSendMsgTimeout(Integer.MAX_VALUE);
msgProducer.setExecutorService(executor);
msgProducer.setTransactionListener(orderTransactionListener);
this.start();
}
@Override
public void destroy() throws Exception {
this.shutdown();
}
}3.業(yè)務(wù)實(shí)現(xiàn)
@Service
@Slf4j
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private TransactionLogMapper transactionLogMapper;
@Autowired
private TransactionalMsgProducer producer;
//執(zhí)行本地事務(wù)時調(diào)用,將訂單數(shù)據(jù)和事務(wù)日志寫入本地?cái)?shù)據(jù)庫
@Transactional
@Override
public void createOrder(OrderDTO orderDTO,String transactionId){
//1.創(chuàng)建訂單
Order order = new Order();
BeanUtils.copyProperties(orderDTO,order);
orderMapper.createOrder(order);
//2.寫入事務(wù)日志
TransactionLog log = new TransactionLog();
log.setId(transactionId);
log.setBusiness("order");
log.setForeignKey(String.valueOf(order.getId()));
transactionLogMapper.insert(log);
log.info("create order success,order={}",orderDTO);
}
//前端調(diào)用,只用于向RocketMQ發(fā)送事務(wù)消息
@Override
public void createOrder(OrderDTO order) throws MQClientException {
order.setId(snowflake.nextId());
order.setOrderNo(snowflake.nextIdStr());
producer.send(JSON.toJSONString(order),"order");
}
}4.入口調(diào)用
@RestController
@Slf4j
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/create_order")
public void createOrder(@RequestBody OrderDTO order) {
log.info("receive order data,order={}",order.getCommodityCode());
orderService.createOrder(order);
}
}這樣我們就實(shí)現(xiàn)了rocketmq事務(wù)消息的使用。
四、原理介紹
1.概念模型
- 半消息(half message):半消息是一種特殊的消息類型,該狀態(tài)的消息暫時不能被Consumer消費(fèi)(消費(fèi)端不可見)。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發(fā)出的二次確認(rèn)時,該事務(wù)消息就處于"暫時不可被消費(fèi)"狀態(tài),該狀態(tài)的事務(wù)消息被稱為半消息。
- 消息狀態(tài)回查(Message status check):由于網(wǎng)絡(luò)抖動閃斷、Producer重啟等原因,可能導(dǎo)致Producer向Broker發(fā)送的二次確認(rèn)消息沒有成功送達(dá)。如果Broker檢測到某條事務(wù)消息長時間處于半消息狀態(tài),則會主動向Producer端發(fā)起回查操作,查詢該事務(wù)消息在Producer端的事務(wù)狀態(tài)(Commit 或 Rollback)。可以看出,Message Status Check主要用來解決分布式事務(wù)中的超時問題。
2.執(zhí)行流程

1):Producer向Broker端發(fā)送Half Message;
2):Broker ACK,Half Message發(fā)送成功;
3):Producer執(zhí)行本地事務(wù);
4):本地事務(wù)完畢,根據(jù)事務(wù)的狀態(tài),Producer向Broker發(fā)送二次確認(rèn)消息,確認(rèn)該Half Message的Commit或者Rollback狀態(tài)。Broker收到二次確認(rèn)消息后,對于Commit狀態(tài),則直接發(fā)送到Consumer端執(zhí)行消費(fèi)邏輯,而對于Rollback則直接標(biāo)記為失敗,一段時間后清除,并不會發(fā)給Consumer。正常情況下,到此分布式事務(wù)已經(jīng)完成,剩下要處理的就是超時問題,即一段時間后Broker仍沒有收到Producer的二次確認(rèn)消息;
5):針對超時狀態(tài),Broker主動向Producer發(fā)起消息回查;
6):Producer處理回查消息,返回對應(yīng)的本地事務(wù)的執(zhí)行結(jié)果;
7):Broker針對回查消息的結(jié)果,執(zhí)行Commit或Rollback操作,同4。
3.事務(wù)消息設(shè)計(jì)
在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見。其中事務(wù)消息相對普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對用戶是不可見的。那么如何做到寫入消息但是對用戶不可見呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費(fèi)隊(duì)列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費(fèi)組未訂閱該主題,故消費(fèi)端無法消費(fèi)half類型的消息,然后RocketMQ會開啟一個定時任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費(fèi),根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息。
在RocketMQ中,消息在服務(wù)端的存儲結(jié)構(gòu)如下,每條消息都會有對應(yīng)的索引信息,Consumer通過ConsumeQueue這個二級索引來讀取消息實(shí)體內(nèi)容,其流程如下:

RocketMQ的具體實(shí)現(xiàn)策略是:寫入的如果事務(wù)消息,對消息的Topic和Queue等屬性進(jìn)行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中,正因?yàn)橄⒅黝}被替換,故消息并不會轉(zhuǎn)發(fā)到該原主題的消息消費(fèi)隊(duì)列,消費(fèi)者無法感知消息的存在,不會消費(fèi)。
在完成一階段寫入一條對用戶不可見的消息后,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況。對于Rollback,本身一階段的消息對用戶是不可見的,其實(shí)不需要真正撤銷消息(實(shí)際上RocketMQ也無法去真正的刪除一條消息,因?yàn)槭琼樞驅(qū)懳募模?。但是區(qū)別于這條消息沒有確定狀態(tài)(Pending狀態(tài),事務(wù)懸而未決),需要一個操作來標(biāo)識這條消息的最終狀態(tài)。RocketMQ事務(wù)消息方案中引入了Op消息的概念,用Op消息標(biāo)識事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)。如果一條事務(wù)消息沒有對應(yīng)的Op消息,說明這個事務(wù)的狀態(tài)還無法確定(可能是二階段失敗了)。引入Op消息后,事務(wù)消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對于Rollback只是在寫入Op消息前創(chuàng)建Half消息的索引。
一階段的Half消息由于是寫到一個特殊的Topic,所以二階段構(gòu)建索引時需要讀取出Half消息,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務(wù)消息二階段其實(shí)是利用了一階段存儲的消息的內(nèi)容,在二階段時恢復(fù)出一條完整的普通消息。
如果在RocketMQ事務(wù)消息的二階段過程中失敗了,例如在做Commit操作時,出現(xiàn)網(wǎng)絡(luò)問題導(dǎo)致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補(bǔ)償機(jī)制,稱為“回查”。Broker端對未確定狀態(tài)的消息發(fā)起回查,將消息發(fā)送到對應(yīng)的Producer端(同一個Group的Producer),由Producer根據(jù)消息來檢查本地事務(wù)的狀態(tài),進(jìn)而執(zhí)行Commit或者Rollback。Broker端通過對比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。
需要注意的是,rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),rocketmq默認(rèn)回滾該消息。
五、源碼分析
1.客服端發(fā)送事務(wù)消息
RocketMQ事務(wù)消息由TransactionMQProducer實(shí)現(xiàn),繼承DefaultMQProducer實(shí)現(xiàn)了發(fā)送事務(wù)消息的能力。

發(fā)送事務(wù)消息會調(diào)用TransactionMQProducer的sendMessageInTransaction方法:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}檢查有沒有配置事務(wù)監(jiān)聽器,監(jiān)聽器提供了兩個方法:
- executeLocalTransaction:執(zhí)行本地事務(wù)
- checkLocalTransaction:回查本地事務(wù)
然后調(diào)用DefaultMQProducerImpl執(zhí)行發(fā)送:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//...省略
SendResult sendResult = null;
//msg設(shè)置參數(shù)TRAN_MSG,表示為事務(wù)消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//發(fā)送消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
//通過LocalTransactionExecutor執(zhí)行,已經(jīng)廢棄
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
//消息發(fā)送成功,執(zhí)行本地事務(wù)
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
} catch (Throwable e) {
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//執(zhí)行endTransaction方法,如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//省略...
return transactionSendResult;
}該方法做了以下幾件事情:
- 給消息打上事務(wù)屬性,用于broker區(qū)分普通消息和事務(wù)消息
- 發(fā)送半消息(half message)
- 發(fā)送成功則由transactionListener執(zhí)行本地事務(wù)
- 執(zhí)行endTransaction方法,通知broker 執(zhí)行 commit/rollback
發(fā)送消息會正常調(diào)用DefaultMQProducerImpl的發(fā)送消息邏輯,執(zhí)行本地事務(wù)通過transactionListener調(diào)用本地的事務(wù)邏輯,我們看一下結(jié)束事務(wù)endTransaction方法實(shí)現(xiàn):
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}本地事務(wù)執(zhí)行后,則調(diào)用this.endTransaction()方法,根據(jù)本地事務(wù)執(zhí)行狀態(tài),去提交事務(wù)或者回滾事務(wù)。
如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息。
2.Broker處理事務(wù)消息
RocketMQ服務(wù)端有個NettyRequestProcessor接口,類似于spring的BeanPostProcessor,broker啟動的時候會把對應(yīng)的實(shí)現(xiàn)注冊到NettyRemotingServer的本地緩存processorTable中,在收到producer發(fā)送的消息會調(diào)用NettyServerHandler的channelRead0方法,然后會調(diào)用對應(yīng)的NettyRequestProcessor實(shí)現(xiàn)處理接收到的消息請求??匆幌耂endMessageProcessor實(shí)現(xiàn):
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext traceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
traceContext = buildMsgContext(ctx, requestHeader);
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
traceContext.setCommercialOwner(owner);
try {
this.executeSendMessageHookBefore(ctx, request, traceContext);
} catch (AbortProcessException e) {
final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
errorResponse.setOpaque(request.getOpaque());
return errorResponse;
}
RemotingCommand response;
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext,
(ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
} else {
response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext,
(ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
}
return response;
}
}會調(diào)用到SendMessageProcessor.sendMessage(),判斷消息類型,進(jìn)行半消息存儲:
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader,
final TopicQueueMappingContext mappingContext,
final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
//...省略
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
sendTransactionPrepareMessage = true;
}
long beginTimeMillis = this.brokerController.getMessageStore().now();
if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
//...異步發(fā)送
return null;
} else {
PutMessageResult putMessageResult = null;
if (sendTransactionPrepareMessage) {
//存儲事務(wù)消息
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
//存儲普通消息
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return response;
}
}繼續(xù)看事務(wù)半消息存儲實(shí)現(xiàn)prepareMessage:
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}備份消息的原主題名稱與原隊(duì)列ID,然后取消事務(wù)消息的消息標(biāo)簽,重新設(shè)置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊(duì)列ID固定為0。與其他普通消息區(qū)分開,然后完成消息持久化。
到這里,broker就初步處理完了 Producer 發(fā)送的事務(wù)半消息。
當(dāng)客戶端TransactionMQProducer執(zhí)行endTransaction動作時,觸發(fā)broker事務(wù)消息的二階段提交,broker會執(zhí)行EndTransactionProcessor的processRequest方法:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
//...省略
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}邏輯很清晰,其核心實(shí)現(xiàn)如下:
- 根據(jù)commitlogOffset找到消息
- 如果是提交動作,就恢復(fù)原消息的主題與隊(duì)列,再次存入commitlog文件進(jìn)而轉(zhuǎn)到消息消費(fèi)隊(duì)列,供消費(fèi)者消費(fèi),然后將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
- 回滾消息,則直接將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
還有一種情況,如果本地事務(wù)執(zhí)行結(jié)果是UNKNOW或者由于網(wǎng)絡(luò)問題沒有提交,那么存儲的broker的事務(wù)消息處于漂浮狀態(tài),無法主動轉(zhuǎn)換成可消費(fèi)或者刪除狀態(tài),那么就需要broker有一種兜底機(jī)制來處理這種場景,當(dāng)然RocketMQ提供了一種補(bǔ)償機(jī)制,定時回查此類消息,由TransactionalMessageCheckService實(shí)現(xiàn):
@Override
public void run() {
log.info("Start transaction check service thread!");
while (!this.isStopped()) {
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}整體流程如下圖:

六、總結(jié)與思考
異常情況覆蓋
- 客戶端producer發(fā)送半消息失敗
可能由于網(wǎng)絡(luò)或者mq故障,導(dǎo)致 Producer 發(fā)送半消息(prepare)失敗??蛻舳朔?wù)可以執(zhí)行回滾操作,比如“訂單關(guān)閉”等。
- 半消息發(fā)送成功,本地事務(wù)執(zhí)行失敗
如果producer發(fā)送的半消息成功了,但是執(zhí)行本地事務(wù)失敗了,如更新訂單狀態(tài)為“已完成”。這種情況下,執(zhí)行本地事務(wù)失敗后,會返回rollback給 MQ,MQ會刪除之前發(fā)送的半消息。不會下發(fā)指令給下游依賴。
- 半消息投遞成功,沒收到MQ返回的ack
如果客戶端發(fā)送半消息成功后,沒有收到MQ返回的響應(yīng)??赡苁且?yàn)榫W(wǎng)絡(luò)問題,或者其他未知異常,客戶端以為發(fā)送MQ半消息失敗,執(zhí)行了逆向回滾流程。這個時候其實(shí)mq已經(jīng)保存半消息成功了,那這個消息怎么處理?
這個時候broker的補(bǔ)償邏輯上場,消息回查定時任務(wù)TransactionalMessageCheckService會每隔1分鐘掃描一次半消息隊(duì)列,判斷是否需要消息回查,然后回查訂單系統(tǒng)的本地事務(wù),這時MQ就會發(fā)現(xiàn)訂單已經(jīng)變成“已關(guān)閉”,此時就要發(fā)送rollback請求給mq,刪除之前的半消息。
- commit/rollback失敗
這個也是通過定時任務(wù)TransactionalMessageCheckService來做補(bǔ)償,它發(fā)現(xiàn)這個消息超過一定時間還沒有進(jìn)行二階段處理,就會回查本地事務(wù)。
缺點(diǎn)和替代方案
事務(wù)消息很好了解決了分布式事務(wù)場景的業(yè)務(wù)解耦,但是也存在一些問題,比如引入新的組件依賴,并且事務(wù)消息是強(qiáng)依賴,那么還有沒有其他比較可行的替代方案,ebay提出的本地消息表是一種解決方案,消息生產(chǎn)方新增消息表,并記錄消息發(fā)送狀態(tài)。消息表和業(yè)務(wù)數(shù)據(jù)要在一個事務(wù)里提交,也就是說他們要在一個數(shù)據(jù)庫里面。然后消息會經(jīng)過MQ發(fā)送到消息的消費(fèi)方。如果消息發(fā)送失敗,會進(jìn)行重試發(fā)送。消息消費(fèi)方,需要處理這個消息,并完成自己的業(yè)務(wù)邏輯。此時如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會重試執(zhí)行。如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個業(yè)務(wù)補(bǔ)償消息,通知生產(chǎn)方進(jìn)行回滾等操作。
本地消息表的優(yōu)點(diǎn)是避免了分布式事務(wù),實(shí)現(xiàn)了最終一致性,缺點(diǎn)也明顯,消息表會耦合到業(yè)務(wù)系統(tǒng)中,如果沒有封裝好的解決方案,會有很多支撐邏輯要處理。
以上就是RocketMQ事務(wù)消息使用與原理詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ 事務(wù)消息的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java?byte數(shù)組轉(zhuǎn)String的幾種常用方法
在Java中數(shù)組是一種非常常見的數(shù)據(jù)結(jié)構(gòu),它可以用來存儲多個相同類型的數(shù)據(jù),有時候,我們需要將數(shù)組轉(zhuǎn)換為字符串,以便于輸出或者傳遞給其他方法,這篇文章主要給大家介紹了關(guān)于java?byte數(shù)組轉(zhuǎn)String的幾種常用方法,需要的朋友可以參考下2024-09-09
基于springboot i18n國際化后臺多種語言設(shè)置的方式
這篇文章主要介紹了基于springboot i18n國際化后臺多種語言設(shè)置的方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
IntelliJ?IDEA?2022.1.1創(chuàng)建java項(xiàng)目的詳細(xì)方法步驟
最近安裝了IntelliJ IDEA 2022.1.1,發(fā)現(xiàn)新版本的窗口還有些變化的,所以下面這篇文章主要給大家介紹了關(guān)于IntelliJ?IDEA?2022.1.1創(chuàng)建java項(xiàng)目的詳細(xì)方法步驟,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07
@RequestBody 部分屬性沒有轉(zhuǎn)化成功的處理
這篇文章主要介紹了@RequestBody 部分屬性沒有轉(zhuǎn)化成功的處理方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
SpringBoot啟動自動終止也不報(bào)錯的原因及解決
這篇文章主要介紹了SpringBoot啟動自動終止也不報(bào)錯的原因及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09

