spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理)
spring event 事件異步處理(發(fā)布,監(jiān)聽,異步處理)
// 定義事件
public class EventDemo extends ApplicationEvent {
private String supplierCode;
private String productCode;
public EventDemo(Object source, String supplierCode, String productCode) {
super(source);
this.supplierCode = supplierCode;
this.productCode = productCode;
}
public String getSupplierCode() {
return supplierCode;
}
public String getProductCode() {
return productCode;
}
}// 發(fā)布事件
@Component
public class EventDemoPublish {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void publish(String message) {
EventDemo demo = new EventDemo(this, message);
applicationEventPublisher.publishEvent(demo);
System.out.println("發(fā)布事件執(zhí)行結(jié)束");
}
}// 監(jiān)聽事件
@Component
public class EventDemoListener implements ApplicationListener<EventDemo> {
@Override
public void onApplicationEvent(EventDemo event) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("事件監(jiān)聽開始...... " + "商家編碼:" + event.getSupplierCode() + ",商品編碼:" + event.getProductCode());
}
}<!--定義事件異步處理-->
<bean id="commonTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 線程池維持處于Keep-alive狀態(tài)的線程數(shù)量。如果設(shè)置了allowCoreThreadTimeOut為true,該值可能為0。
并發(fā)線程數(shù),想達(dá)到真正的并發(fā)效果,最好對應(yīng)CPU的線程數(shù)及核心數(shù) -->
<property name="corePoolSize" value="2" />
<!-- 最大線程池容量 -->
<property name="maxPoolSize" value="2" />
<!-- 超過最大線程池容量后,允許的線程隊列數(shù) -->
<property name="queueCapacity" value="2" />
<!-- 線程池維護(hù)線程所允許的空閑時間 .單位毫秒,默認(rèn)為60s,超過這個時間后會將大于corePoolSize的線程關(guān)閉,保持corePoolSize的個數(shù) -->
<property name="keepAliveSeconds" value="1000" />
<!-- 允許核心線程超時: false(默認(rèn)值)不允許超時,哪怕空閑;true則使用keepAliveSeconds來控制等待超時時間,最終corePoolSize的個數(shù)可能為0 -->
<property name="allowCoreThreadTimeOut" value="true" />
<!-- 線程池對拒絕任務(wù)(無線程可用)的處理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
<!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主線程直接執(zhí)行該任務(wù),執(zhí)行完之后嘗試添加下一個任務(wù)到線程池中 -->
<!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
</property>
</bean>
<!--名字必須是applicationEventMulticaster,因為AbstractApplicationContext默認(rèn)找個-->
<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
<!--注入任務(wù)執(zhí)行器 這樣就實現(xiàn)了異步調(diào)用-->
<property name="taskExecutor" ref="commonTaskExecutor"></property>
</bean>spring事件之異步線程執(zhí)行
Spring 不僅為我們提供了IOC , AOP功能外,還在這個基礎(chǔ)上提供了許多的功能,我們用的最多的可能就是Spring MVC了吧,但是讓我們來看下spring-context包,其中包含了緩存、調(diào)度、校驗功能等等

這里主要想介紹一下Spring提供的觀察者模式實現(xiàn)(事件發(fā)布監(jiān)聽)及異步方法執(zhí)行,這些功能也都是基于AOP實現(xiàn)的
Spring 事件
觀察者模式大家都了解,它可以解耦各個功能,但是自己實現(xiàn)的話比較麻煩,Spring為我們提供了一種事件發(fā)布機(jī)制,可以按需要發(fā)布事件,之后由監(jiān)聽此事件的類或方法來執(zhí)行各自對應(yīng)的功能,代碼互相不影響,以后修改訂單后續(xù)的邏輯時不會影響到訂單創(chuàng)建,有點類似于使用MQ的感覺~
比如在配置中心apollo項目中,在portal創(chuàng)建了app后會發(fā)送app創(chuàng)建事件,監(jiān)聽此事件的邏輯處將此消息同步到各個環(huán)境的admin sevice中,大家有興趣可以看下相關(guān)代碼
現(xiàn)在我們來看看具體如何使用:假設(shè)一個下單場景,訂單創(chuàng)建成功后可能有一些后續(xù)邏輯要處理,但是和創(chuàng)建訂單本身沒有關(guān)系,此時就可以在創(chuàng)建訂單完成后,發(fā)送一個消息,又相應(yīng)部分的代碼進(jìn)行監(jiān)聽處理,避免代碼耦合到一起
首先創(chuàng)建對應(yīng)的事件
import org.springframework.context.ApplicationEvent;
public class CreatedOrderEvent extends ApplicationEvent {
private final String orderSn;
public CreatedOrderEvent(Object source, String orderSn) {
super(source);
this.orderSn = orderSn;
}
public String getOrderSn() {
return this.orderSn;
}
}現(xiàn)在還需要一個事件發(fā)布者和監(jiān)聽者,創(chuàng)建一下
發(fā)布
import org.springframework.context.ApplicationEventPublisher; private ApplicationEventPublisher applicationEventPublisher; applicationEventPublisher.publishEvent(new CreatedOrderEvent(this, orderSn));
監(jiān)聽的多種實現(xiàn)
1:注解實現(xiàn) @EventListener
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OrderEventListener {
@EventListener
public void orderEventListener(CreatedOrderEvent event) {
}
}2:代碼實現(xiàn)
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
@Slf4j
@Component
public class OrderEventListener implements ApplicationListener<CreatedOrderEvent> {
@Override
public void onApplicationEvent(CreatedOrderEvent event) {
}
}簡單的事件發(fā)布就完成了,其中的其他復(fù)雜邏輯由Spring替我們處理了
這里我們要注意一點:發(fā)布和監(jiān)聽后處理的邏輯是在一個線程中執(zhí)行的,不是異步執(zhí)行
異步方法
有時候我們?yōu)榱颂岣唔憫?yīng)速度,有些方法可以異步去執(zhí)行,一般情況下我們可能是手動將方法調(diào)用提交到線程池中去執(zhí)行,但是Spring 為我們提供了簡化的寫法,在開啟了異步情況下,不用修改代碼,只使用注解即可完成此功能
這時只需要在要異步執(zhí)行的方法上添加@Async注解即可異步執(zhí)行;@EnableAsync 啟動異步線程, 如
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@EnableAsync
public class OrderEventListener {
@Async
@EventListener
public void orderEventListener(CreatedOrderEvent event) {
}
}在使用@Async會有一些問題建議看各位看下相關(guān)文檔及源碼
我們通過Spring事件同步線程改為異步線程,默認(rèn)的線程池是不復(fù)用線程
我覺得這是這個注解最坑的地方,沒有之一!我們來看看它默認(rèn)使用的線程池是哪個,在前文的源碼分析中,我們可以看到?jīng)Q定要使用線程池的方法是
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
其源碼如下:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// 可以在@Async注解中配置線程池的名字
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
// 獲取默認(rèn)的線程池
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}最終會調(diào)用到
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
這個方法中
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}可以看到,它默認(rèn)使用的線程池是SimpleAsyncTaskExecutor。我們不看這個類的源碼,只看它上面的文檔注釋,如下:

主要說了三點
- 為每個任務(wù)新起一個線程
- 默認(rèn)線程數(shù)不做限制
- 不復(fù)用線程
就這三點,你還敢用嗎?只要你的任務(wù)耗時長一點,說不定服務(wù)器就給你來個OOM。
解決方案
最好的辦法就是使用自定義的線程池,主要有這么幾種配置方法
1.在之前的源碼分析中,我們可以知道,可以通過AsyncConfigurer來配置使用的線程池
如下:
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 異步線程池配置
*/
@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(200);
executor.setKeepAliveSeconds(5 * 60);
executor.setQueueCapacity(1000);
// 自定義實現(xiàn)拒絕策略
executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("當(dāng)前任務(wù)線程池隊列已滿."));
// 或者選擇已經(jīng)定義好的其中一種拒絕策略
// 丟棄任務(wù)并拋出RejectedExecutionException異常
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 丟棄任務(wù),但是不拋出異常
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 由調(diào)用線程處理該任務(wù)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// 線程名稱前綴
executor.setThreadNamePrefix("Async-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error("線程池執(zhí)行任務(wù)發(fā)生未知異常.", ex);
}
/**
* 增加日志MDC
*/
public static class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
/**
* Gets context for task *
*
* @return context for task
*/
private Map<String, String> getContextForTask() {
return MDC.getCopyOfContextMap();
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code execute()} etc.)
* all delegate to this.
*/
@Override
public void execute(@NonNull Runnable command) {
super.execute(wrap(command, getContextForTask()));
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
* all delegate to this.
*/
@NonNull
@Override
public Future<?> submit(@NonNull Runnable task) {
return super.submit(wrap(task, getContextForTask()));
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
* all delegate to this.
*/
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
return super.submit(wrap(task, getContextForTask()));
}
/**
* Wrap callable
*
* @param <T> parameter
* @param task task
* @param context context
* @return the callable
*/
private <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) {
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
return task.call();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
/**
* Wrap runnable
*
* @param runnable runnable
* @param context context
* @return the runnable
*/
private Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
}
}該方式實現(xiàn)線程的復(fù)用以及,子線程繼承父線程全鏈路traceId,方便定位問題
2.直接在@Async注解中配置要使用的線程池的名稱
@Async(value = "自定義線程名")
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解Mybatis攔截器安全加解密MySQL數(shù)據(jù)實戰(zhàn)
本文主要介紹了Mybatis攔截器安全加解密MySQL數(shù)據(jù)實戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01
springboot2.3 整合mybatis-plus 高級功能(圖文詳解)
這篇文章主要介紹了springboot2.3 整合mybatis-plus 高級功能,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08
Java使用poi-tl1.9.1生成Word文檔的技巧分享
本文將簡單介紹poi-tl的相關(guān)知識,通過一個實際的案例實踐,充分介紹如何利用poi-tl進(jìn)行目標(biāo)文檔的生成,同時分享幾個不同的office版本如何進(jìn)行圖表生成的解決方案,需要的朋友可以參考下2023-09-09
SpringCloud zookeeper作為注冊中心使用介紹
ZooKeeper由雅虎研究院開發(fā),是Google Chubby的開源實現(xiàn),后來托管到Apache,于2010年11月正式成為Apache的頂級項目。ZooKeeper是一個經(jīng)典的分布式數(shù)據(jù)一致性解決方案,致力于為分布式應(yīng)用提供一個高性能、高可用,且具有嚴(yán)格順序訪問控制能力的分布式協(xié)調(diào)服務(wù)2022-11-11

