使用@TransactionalEventListener監(jiān)聽事務教程
@TransactionalEventListener監(jiān)聽事務
項目背景
最近在項目遇到一個問題
A方法體內(nèi)有 INSERT、UPDATE或者DELETE操作,最后會發(fā)送一段MQ給外部,外部接收到MQ后會再發(fā)送一段請求過來,系統(tǒng)收到請求后會執(zhí)行B方法,B方法會依賴A方法修改后的結(jié)果,這就有一個問題,如果A方法事務沒有提交;且B方法的請求過來了會查詢到事務未提交前的狀態(tài),這就會有問題
解決辦法:@TransactionalEventListener
在Spring4.2+,有一種叫做TransactionEventListener的方式,能夠控制在事務的時候Event事件的處理方式。 我們知道,Spring的發(fā)布訂閱模型實際上并不是異步的,而是同步的來將代碼進行解耦。而TransactionEventListener仍是通過這種方式,只不過加入了回調(diào)的方式來解決,這樣就能夠在事務進行Commited,Rollback…等的時候才會去進行Event的處理。
具體實現(xiàn)
//創(chuàng)建一個事件類
package com.qk.cas.config;
import org.springframework.context.ApplicationEvent;
public class MyTransactionEvent extends ApplicationEvent {
private static final long serialVersionUID = 1L;
private IProcesser processer;
public MyTransactionEvent(IProcesser processer) {
super(processer);
this.processer = processer;
}
public IProcesser getProcesser() {
return this.processer;
}
@FunctionalInterface
public interface IProcesser {
void handle();
}
}
//創(chuàng)建一個監(jiān)聽類
package com.qk.cas.config;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
@Component
public class MyTransactionListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void hanldeOrderCreatedEvent(MyTransactionEvent event) {
event.getProcesser().handle();
}
}
//MQ方法的變動
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendCreditResult(String applyNo, String jsonString) {
eventPublisher.publishEvent(new MyTransactionEvent(() -> {
LOGGER.info("MQ。APPLY_NO:[{}]。KEY:[{}]。通知報文:[{}]", applyNo, Queues.CREDIT_RESULT, jsonString);
rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString);
}));
}
拓展
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 只有當前事務提交之后,才會執(zhí)行事件監(jiān)聽的方法,其中參數(shù)phase默認為AFTER_COMMIT,共有四個枚舉:
public enum TransactionPhase {
/**
* Fire the event before transaction commit.
* @see TransactionSynchronization#beforeCommit(boolean)
*/
BEFORE_COMMIT,
/**
* Fire the event after the commit has completed successfully.
* <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
* therefore executes in the same after-completion sequence of events,
* (and not in {@link TransactionSynchronization#afterCommit()}).
* @see TransactionSynchronization#afterCompletion(int)
* @see TransactionSynchronization#STATUS_COMMITTED
*/
AFTER_COMMIT,
/**
* Fire the event if the transaction has rolled back.
* <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
* therefore executes in the same after-completion sequence of events.
* @see TransactionSynchronization#afterCompletion(int)
* @see TransactionSynchronization#STATUS_ROLLED_BACK
*/
AFTER_ROLLBACK,
/**
* Fire the event after the transaction has completed.
* <p>For more fine-grained events, use {@link #AFTER_COMMIT} or
* {@link #AFTER_ROLLBACK} to intercept transaction commit
* or rollback, respectively.
* @see TransactionSynchronization#afterCompletion(int)
*/
AFTER_COMPLETION
}
注解@TransactionalEventListener
例如 用戶注冊之后需要計算用戶的邀請關系,遞歸操作。如果注冊的時候包含多步驗證,生成基本初始化數(shù)據(jù),這時候我們通過mq發(fā)送消息來處理這個邀請關系,會出現(xiàn)一個問題,就是用戶還沒注冊數(shù)據(jù)還沒入庫,邀請關系就開始執(zhí)行,但是查不到數(shù)據(jù),導致出錯。
@TransactionalEventListener 可以實現(xiàn)事務的監(jiān)聽,可以在提交之后再進行操作。
監(jiān)聽的對象
package com.jinglitong.springshop.interceptor;
import com.jinglitong.springshop.entity.Customer;
import org.springframework.context.ApplicationEvent;
public class RegCustomerEvent extends ApplicationEvent{
public RegCustomerEvent(Customer customer){
super(customer);
}
}
監(jiān)聽到之后的操作
package com.jinglitong.springshop.interceptor;
import com.alibaba.fastjson.JSON;
import com.jinglitong.springshop.entity.Customer;
import com.jinglitong.springshop.entity.MqMessageRecord;
import com.jinglitong.springshop.servcie.MqMessageRecordService;
import com.jinglitong.springshop.util.AliMQServiceUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class RegCustomerListener {
@Value("${aliyun.mq.order.topic}")
private String topic;
@Value("${aliyun.mq.regist.product}")
private String registGroup;
@Value("${aliyun.mq.regist.tag}")
private String registTag;
@Autowired
MqMessageRecordService mqMessageRecordService;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void hanldeRegCustomerEvent(RegCustomerEvent regCustomerEvent) {
Customer cust = (Customer) regCustomerEvent.getSource();
Map<String, String> map = new HashMap<String, String>();
map.put("custId", cust.getZid());
map.put("account", cust.getAccount());
log.info("put regist notice to Mq start");
String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map),topic,registTag,registGroup);
MqMessageRecord insert = buidBean(cust.getZid(),hdResult,registTag,JSON.toJSONString(map),registGroup);
if(StringUtils.isEmpty(hdResult)) {
insert.setStatus(false);
}else {
insert.setStatus(true);
}
mqMessageRecordService.insertRecord(insert);
log.info("put regist notice to Mq end");
log.info("regist notice userId : " + cust.getAccount());
}
private MqMessageRecord buidBean (String custId,String result ,String tag,String jsonStr,String groupId) {
MqMessageRecord msg = new MqMessageRecord();
msg.setFlowId(custId);
msg.setGroupName(groupId);
msg.setTopic(topic);
msg.setTag(tag);
msg.setMsgId(result);
msg.setDataBody(jsonStr);
msg.setSendType(3);
msg.setGroupType(1);
msg.setCreateTime(new Date());
return msg;
}
}
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
applicationEventPublisher.publishEvent(new RegCustomerEvent (XXX));
這樣可以確保數(shù)據(jù)入庫之后再進行異步計算
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
kafka 重新分配partition和調(diào)整replica的數(shù)量實現(xiàn)
當需要提升Kafka集群的性能和負載均衡時,可通過kafka-reassign-partitions.sh命令手動重新分配Partition,增加節(jié)點后,可以將Topic的Partition的Leader節(jié)點均勻分布,以提高寫入和消費速度,感興趣的可以了解一下2022-03-03
SpringBoot靜態(tài)資源css,js,img配置方案
這篇文章主要介紹了SpringBoot靜態(tài)資源css,js,img配置方案,下文給大家分享了三種解決方案,需要的朋友可以參考下2017-07-07
SpringBoot中@KafkaListener使用${}動態(tài)指定topic問題
在SpringKafka中,使用${}引用Spring屬性配置,可以在不同環(huán)境中重新配置topic名稱,而無需修改代碼,在application.properties或application.yml中定義topic名稱,并在代碼中使用${}引用2024-12-12
解決grails服務端口沖突的辦法(grails修改端口號)
grails中默認的服務端口為8080,當本機中需要同時啟動兩個不同的項目時,就會造成端口沖突,下面給出解決方法2013-12-12
使用Idea maven創(chuàng)建Spring項目過程圖解
這篇文章主要介紹了使用Idea maven創(chuàng)建Spring項目過程圖解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-02-02
基于springboot的flowable工作流實戰(zhàn)流程分析
這篇文章主要介紹了基于springboot的flowable工作流實戰(zhàn)流程分析,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-10-10

