Spring?boot異步任務(wù)原理全面分析
前言
我們經(jīng)常在需要提升性能或者項(xiàng)目架構(gòu)解耦的過(guò)程中,使用線程池異步執(zhí)行任務(wù),經(jīng)常使用ThreadPoolExecutor創(chuàng)建線程池。那么Spring對(duì)異步任務(wù)是如何處理的呢?
1. spring 異步任務(wù)
估計(jì)或多或少了解過(guò)一些,比如@EnableAsync可以開(kāi)啟異步任務(wù),@Async用于注解說(shuō)明當(dāng)前方法是異步執(zhí)行,下面使用demo看看Spring的異步任務(wù)如何執(zhí)行。
pom依賴(lài),其實(shí)僅依賴(lài)Spring core context 就可以了,這里演示,另外spring boot還要許多好玩的特性。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.3.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>main & controller
@RestController
@SpringBootApplication
public class AsyncMain {
public static void main(String[] args) {
SpringApplication.run(AsyncMain.class, args);
}
@Autowired
private TaskService taskService;
@RequestMapping(value = "/async-task", method = RequestMethod.GET)
public String asyncMapping(){
System.out.println(Thread.currentThread().getThreadGroup() + "http-------" + Thread.currentThread().getName());
taskService.doTask();
return "exec http ok--------------";
}
}異步任務(wù)服務(wù)
@EnableAsync
@Service
public class TaskService {
@Async
public String doTask(){
System.out.println(Thread.currentThread().getThreadGroup() + "-------" + Thread.currentThread().getName());
return "do task done";
}
}運(yùn)行main方法,訪問(wèn)localhost:8080/async-task,控制臺(tái)可以看到:

可以看到線程的name是task-1,而http訪問(wèn)的線程是http-nio-xxx。說(shuō)明任務(wù)異步執(zhí)行了。然而Spring的異步任務(wù)是如何執(zhí)行的呢,我們也并未創(chuàng)建線程池,難道Spring替我們創(chuàng)建了?
2. Spring boot異步任務(wù)執(zhí)行過(guò)程分析
首先,需要執(zhí)行異步任務(wù),必須創(chuàng)建線程池,那我們來(lái)揪出Spring創(chuàng)建的線程池,從啟動(dòng)日志可以看出

Spring默認(rèn)給我們創(chuàng)建了applicationTaskExecutor的ExecutorService的線程池。
通過(guò)源碼分析,Spring boot的starter已經(jīng)給我們?cè)O(shè)置了默認(rèn)的執(zhí)行器
/**
* {@link EnableAutoConfiguration Auto-configuration} for {@link TaskExecutor}.
*
* @author Stephane Nicoll
* @author Camille Vienot
* @since 2.1.0
*/
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {
/**
* Bean name of the application {@link TaskExecutor}.
*/
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
private final TaskExecutionProperties properties;
private final ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers;
private final ObjectProvider<TaskDecorator> taskDecorator;
public TaskExecutionAutoConfiguration(TaskExecutionProperties properties,
ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
ObjectProvider<TaskDecorator> taskDecorator) {
this.properties = properties;
this.taskExecutorCustomizers = taskExecutorCustomizers;
this.taskDecorator = taskDecorator;
}
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder() {
TaskExecutionProperties.Pool pool = this.properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
builder = builder.customizers(this.taskExecutorCustomizers);
builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
return builder;
}
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
}追根溯源:在Spring boot的autoconfigure中已經(jīng)定義了默認(rèn)實(shí)現(xiàn)

Spring為我們定義了兩種實(shí)現(xiàn),如上圖所示,根據(jù)Spring boot的配置定律,我們可以通過(guò)配置來(lái)定義異步任務(wù)的參數(shù)
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
private final Pool pool = new Pool();
/**
* Prefix to use for the names of newly created threads.
*/
private String threadNamePrefix = "task-";
public Pool getPool() {
return this.pool;
}
public String getThreadNamePrefix() {
return this.threadNamePrefix;
}
public void setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
}
public static class Pool {
/**
* Queue capacity. An unbounded capacity does not increase the pool and therefore
* ignores the "max-size" property.
*/
private int queueCapacity = Integer.MAX_VALUE;
/**
* Core number of threads.
*/
private int coreSize = 8;
/**
* Maximum allowed number of threads. If tasks are filling up the queue, the pool
* can expand up to that size to accommodate the load. Ignored if the queue is
* unbounded.
*/
private int maxSize = Integer.MAX_VALUE;
/**
* Whether core threads are allowed to time out. This enables dynamic growing and
* shrinking of the pool.
*/
private boolean allowCoreThreadTimeout = true;
/**
* Time limit for which threads may remain idle before being terminated.
*/
private Duration keepAlive = Duration.ofSeconds(60);省略get set方法,spring boot的配置以spring.task.execution開(kāi)頭,參數(shù)的設(shè)置參考如上源碼的屬性設(shè)置。
各位可以自行嘗試,當(dāng)然因?yàn)镾pring bean的定義方式,我們可以復(fù)寫(xiě)bean來(lái)達(dá)到自定義的目的
? ? @Lazy
?? ?@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
?? ??? ??? ?AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
?? ?@ConditionalOnMissingBean(Executor.class)
?? ?public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
?? ??? ?return builder.build();
?? ?}比如:
@Configuration
@EnableAsync
public class TaskAsyncConfig {
@Bean
public Executor initExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//定制線程名稱(chēng),還可以定制線程group
executor.setThreadFactory(new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(Thread.currentThread().getThreadGroup(), r,
"async-task-" + threadNumber.getAndIncrement(),
0);
return t;
}
});
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setKeepAliveSeconds(5);
executor.setQueueCapacity(100);
// executor.setRejectedExecutionHandler(null);
return executor;
}
}重啟,訪問(wèn)localhost:8080/async-task,證明我們寫(xiě)的Executor已經(jīng)覆蓋系統(tǒng)默認(rèn)了。

3. Spring 異步任務(wù)執(zhí)行過(guò)程分析
方法斷點(diǎn)跟蹤

執(zhí)行異步任務(wù)使用Spring CGLib動(dòng)態(tài)代理AOP實(shí)現(xiàn)

可以看出動(dòng)態(tài)代理后使用AsyncExecutionInterceptor來(lái)處理異步邏輯,執(zhí)行submit方法

同理可以看出,默認(rèn)的taskExecutor使用BeanFactory中獲取。

默認(rèn)使用SimpleAsyncUncaughtExceptionHandler處理異步異常。下面我們來(lái)試試
@EnableAsync
@Service
public class TaskService {
@Async
public String doTask(){
System.out.println(Thread.currentThread().getThreadGroup() + "-------" + Thread.currentThread().getName());
throw new RuntimeException(" I`m a demo test exception-----------------");
}
}默認(rèn)會(huì)打印logger.error("Unexpected exception occurred invoking async method: " + method, ex);日志
public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
if (logger.isErrorEnabled()) {
logger.error("Unexpected exception occurred invoking async method: " + method, ex);
}
}
}運(yùn)行測(cè)試

4. Spring 自定義Executor與自定義異步異常處理
需要實(shí)現(xiàn)AsyncConfigurer接口,可以看到Spring要我們配合EnableAsync與Configuration注解同時(shí)使用
/**
* Interface to be implemented by @{@link org.springframework.context.annotation.Configuration
* Configuration} classes annotated with @{@link EnableAsync} that wish to customize the
* {@link Executor} instance used when processing async method invocations or the
* {@link AsyncUncaughtExceptionHandler} instance used to process exception thrown from
* async method with {@code void} return type.
*
* <p>Consider using {@link AsyncConfigurerSupport} providing default implementations for
* both methods if only one element needs to be customized. Furthermore, backward compatibility
* of this interface will be insured in case new customization options are introduced
* in the future.
*
* <p>See @{@link EnableAsync} for usage examples.
*
* @author Chris Beams
* @author Stephane Nicoll
* @since 3.1
* @see AbstractAsyncConfiguration
* @see EnableAsync
* @see AsyncConfigurerSupport
*/
public interface AsyncConfigurer {
/**
* The {@link Executor} instance to be used when processing async
* method invocations.
*/
@Nullable
default Executor getAsyncExecutor() {
return null;
}
/**
* The {@link AsyncUncaughtExceptionHandler} instance to be used
* when an exception is thrown during an asynchronous method execution
* with {@code void} return type.
*/
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}demo,如下改造
@Configuration
@EnableAsync
public class TaskAsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//定制線程名稱(chēng),還可以定制線程group
executor.setThreadFactory(new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
//重新定義一個(gè)名稱(chēng)
Thread t = new Thread(Thread.currentThread().getThreadGroup(), r,
"async-task-all" + threadNumber.getAndIncrement(),
0);
return t;
}
});
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setKeepAliveSeconds(5);
executor.setQueueCapacity(100);
// executor.setRejectedExecutionHandler(null);
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
System.out.println("do exception by myself");
}
};
}
}記住,此時(shí),Spring就不會(huì)替我們管理Executor了,需要我們自己初始化
executor.initialize();
觀其源碼就是new 一個(gè)ThreadPoolExecutor
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}運(yùn)行,結(jié)果如下

總結(jié)
Spring boot將簡(jiǎn)單的ThreadPoolExecutor通過(guò)封裝成了異步任務(wù),大大方便了程序的開(kāi)發(fā)。
然而我們?cè)谌缟系氖纠?,并沒(méi)有處理程序的異步執(zhí)行結(jié)果,其實(shí)Spring定義了結(jié)果的處理
/**
* AOP Alliance {@code MethodInterceptor} that processes method invocations
* asynchronously, using a given {@link org.springframework.core.task.AsyncTaskExecutor}.
* Typically used with the {@link org.springframework.scheduling.annotation.Async} annotation.
*
* <p>In terms of target method signatures, any parameter types are supported.
* However, the return type is constrained to either {@code void} or
* {@code java.util.concurrent.Future}. In the latter case, the Future handle
* returned from the proxy will be an actual asynchronous Future that can be used
* to track the result of the asynchronous method execution. However, since the
* target method needs to implement the same signature, it will have to return
* a temporary Future handle that just passes the return value through
* (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult}
* or EJB 3.1's {@code javax.ejb.AsyncResult}).
*
* <p>When the return type is {@code java.util.concurrent.Future}, any exception thrown
* during the execution can be accessed and managed by the caller. With {@code void}
* return type however, such exceptions cannot be transmitted back. In that case an
* {@link AsyncUncaughtExceptionHandler} can be registered to process such exceptions.
*
* <p>As of Spring 3.1.2 the {@code AnnotationAsyncExecutionInterceptor} subclass is
* preferred for use due to its support for executor qualification in conjunction with
* Spring's {@code @Async} annotation.
*
* @author Juergen Hoeller
* @author Chris Beams
* @author Stephane Nicoll
* @since 3.0
* @see org.springframework.scheduling.annotation.Async
* @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
* @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
*/
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {<p>In terms of target method signatures, any parameter types are supported.
?* However, the return type is constrained to either {@code void} or
?* {@code java.util.concurrent.Future}. In the latter case, the Future handle
?* returned from the proxy will be an actual asynchronous Future that can be used
?* to track the result of the asynchronous method execution. However, since the
?* target method needs to implement the same signature, it will have to return
?* a temporary Future handle that just passes the return value through
?* (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult}
?* or EJB 3.1's {@code javax.ejb.AsyncResult}).如果程序不返回void或者Future,那么通過(guò)AsyncResult來(lái)返回一個(gè)結(jié)果
另外Spring還定義了一個(gè)Task,即定時(shí)任務(wù)task,原理相同。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring boot測(cè)試找不到SpringRunner.class的問(wèn)題
這篇文章主要介紹了Spring boot測(cè)試找不到SpringRunner.class的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
SpringBoot導(dǎo)入導(dǎo)出數(shù)據(jù)實(shí)現(xiàn)方法詳解
這篇文章主要介紹了SpringBoot導(dǎo)入導(dǎo)出數(shù)據(jù)實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧2022-12-12
啟動(dòng)Spring項(xiàng)目詳細(xì)過(guò)程(小結(jié))
這篇文章主要介紹了啟動(dòng)Spring項(xiàng)目詳細(xì)過(guò)程(小結(jié)),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11
java.util.ArrayDeque類(lèi)使用方法詳解
這篇文章主要介紹了java.util.ArrayDeque類(lèi)使用方法,java.util.ArrayDeque類(lèi)提供了可調(diào)整大小的陣列,并實(shí)現(xiàn)了Deque接口,感興趣的小伙伴們可以參考一下2016-03-03
Java框架入門(mén)之簡(jiǎn)單介紹SpringBoot框架
今天給大家?guī)?lái)的是關(guān)于Java的相關(guān)知識(shí),文章圍繞著SpringBoot框架展開(kāi),文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06
基于spring AOP @Around @Before @After的區(qū)別說(shuō)明
這篇文章主要介紹了基于spring AOP @Around @Before @After的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02
springboot前端傳參date類(lèi)型后臺(tái)處理的方式
這篇文章主要介紹了springboot前端傳參date類(lèi)型后臺(tái)處理的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
Java基于ReadWriteLock實(shí)現(xiàn)鎖的應(yīng)用
這篇文章主要介紹了Java基于ReadWriteLock實(shí)現(xiàn)鎖的應(yīng)用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10

