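Spring Asynchronous Execution: Usage and Source Code Walkthrough
In real-world development, some business logic is better handled asynchronously. For example, a business operation may need to write some data to a Redis cache. That write is only an auxiliary step, and whether it succeeds or fails has no fundamental effect on the main flow, so it can be carried out asynchronously.
In Spring, annotating a method with @Async makes it run asynchronously. The call returns to the caller immediately, and the actual execution of the method is handed over to a Spring TaskExecutor.
Using asynchronous execution
Configuration class
Use the @EnableAsync annotation to enable asynchronous execution.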
package com.morris.spring.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync // enable @Async support
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        // custom thread pool
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }
}
Using it in the service layer
Add the @Async annotation to every method that should run asynchronously.
package com.morris.spring.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Slf4j
public class AsyncService {

    @Async
    public void noResult() {
        log.info("execute noResult");
    }

    @Async
    public Future<String> hasResult() throws InterruptedException {
        log.info("execute hasResult");
        TimeUnit.SECONDS.sleep(5);
        return new AsyncResult<>("hasResult success");
    }

    @Async
    public CompletableFuture<String> completableFuture() throws InterruptedException {
        log.info(" execute completableFuture");
        TimeUnit.SECONDS.sleep(5);
        return CompletableFuture.completedFuture("completableFuture success");
    }
}
Test class
package com.morris.spring.demo.async;

import com.morris.spring.config.AsyncConfig;
import com.morris.spring.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Demonstrates asynchronous invocation.
 */
@Slf4j
public class AsyncDemo {

    @Test
    public void test() throws ExecutionException, InterruptedException {
        AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
        applicationContext.register(AsyncService.class);
        applicationContext.register(AsyncConfig.class);
        applicationContext.refresh();
        AsyncService asyncService = applicationContext.getBean(AsyncService.class);
        asyncService.noResult(); // no result
        Future<String> future = asyncService.hasResult();
        log.info("hasResult: {}", future.get()); // with a result; get() blocks until it is ready
        CompletableFuture<String> completableFuture = asyncService.completableFuture();
        completableFuture.thenAcceptAsync(System.out::println); // asynchronous callback
        log.info("completableFuture call down");
    }
}
The output is as follows:
INFO MyExecutor-1 AsyncService:16 - execute noResult
INFO MyExecutor-2 AsyncService:21 - execute hasResult
INFO main AsyncDemo:29 - hasResult: hasResult success
INFO MyExecutor-1 AsyncService:28 - execute completableFuture
INFO main AsyncDemo:33 - completableFuture call down
The log shows that the AsyncService methods run on threads whose names start with MyExecutor- (here MyExecutor-1 and MyExecutor-2), which is the prefix set in AsyncConfig, rather than on the main thread.
Two questions:
- Can the Executor thread pool be left unconfigured? Will Spring create a default Executor, or will it fail with an error?
- If a task throws an exception while running on the Executor, can a custom handler be supplied to catch and handle it?
Source code analysis
@EnableAsync
@EnableAsync mainly imports the AsyncConfigurationSelector class into the Spring container.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
    // ... (attributes omitted)
}
AsyncConfigurationSelector
The main method of AsyncConfigurationSelector is, naturally, selectImports(). Note that the parent class's selectImports() is invoked first:
org.springframework.context.annotation.AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata)
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
    Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
    Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
    AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
    if (attributes == null) {
        throw new IllegalArgumentException(String.format(
                "@%s is not present on importing class '%s' as expected",
                annType.getSimpleName(), importingClassMetadata.getClassName()));
    }
    AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
    // Template method pattern: calls back into the subclass's selectImports
    String[] imports = selectImports(adviceMode);
    if (imports == null) {
        throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
    }
    return imports;
}
org.springframework.scheduling.annotation.AsyncConfigurationSelector#selectImports
public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        case PROXY:
            // Curious: @EnableTransactionManagement and @EnableCaching each import two classes,
            // a config class and a registrar that registers the AOP entry class,
            // whereas here only the single config class ProxyAsyncConfiguration is imported
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}
AsyncConfigurationSelector in turn imports the configuration class ProxyAsyncConfiguration.
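The template-method relationship between the two selectImports() methods can be illustrated with a minimal plain-Java sketch. It does not use Spring: `Mode`, `ModeSelector`, `AsyncModeSelector` and the returned strings are hypothetical names chosen only for this illustration.

```java
import java.util.Arrays;

// Hypothetical stand-in for Spring's AdviceMode enum.
enum Mode { PROXY, ASPECTJ }

// Base class: owns the fixed part of the algorithm and delegates one step.
abstract class ModeSelector {

    // Template method: final, so subclasses cannot change the overall flow.
    public final String[] select(Mode mode) {
        String[] imports = doSelect(mode);
        if (imports == null) {
            throw new IllegalArgumentException("Unknown mode: " + mode);
        }
        return imports;
    }

    // Hook that each subclass fills in.
    protected abstract String[] doSelect(Mode mode);
}

// Subclass: only decides which configuration names belong to which mode.
class AsyncModeSelector extends ModeSelector {

    @Override
    protected String[] doSelect(Mode mode) {
        switch (mode) {
            case PROXY:
                return new String[] {"ProxyConfig"};
            case ASPECTJ:
                return new String[] {"AspectJConfig"};
            default:
                return null;
        }
    }
}

class TemplateMethodSketch {
    public static void main(String[] args) {
        System.out.println(Arrays.toString(new AsyncModeSelector().select(Mode.PROXY)));
    }
}
```

The caller only ever sees the final method on the base class, which is why the parent's selectImports() runs before the subclass's.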
ProxyAsyncConfiguration
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    /**
     * See the parent class AbstractAsyncConfiguration first.
     * @return
     */
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // Instantiate the AsyncAnnotationBeanPostProcessor
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}
ProxyAsyncConfiguration registers an AsyncAnnotationBeanPostProcessor in the container.
Question: why is a BeanPostProcessor registered here, rather than an Advisor plus an XxxInterceptor (Advice) as with the transaction or caching aspects?
AbstractAsyncConfiguration
AbstractAsyncConfiguration is the parent class of ProxyAsyncConfiguration.
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    @Nullable
    protected AnnotationAttributes enableAsync;

    @Nullable
    protected Supplier<Executor> executor;

    @Nullable
    protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

    /**
     * Implements ImportAware.setImportMetadata.
     * Called while the ProxyAsyncConfiguration bean is being initialized.
     * @param importMetadata
     */
    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        // Read the attributes of the @EnableAsync annotation
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        // configurers is empty by default, unless an AsyncConfigurer bean has been registered
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer::getAsyncExecutor;
        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    }
}
This shows that registering an AsyncConfigurer in the Spring container lets you specify both the thread pool and the exception handler used for asynchronous tasks.
AsyncAnnotationBeanPostProcessor
[Figure: inheritance hierarchy of AsyncAnnotationBeanPostProcessor]
AsyncAnnotationBeanPostProcessor mainly implements the BeanFactoryAware and BeanPostProcessor interfaces.
org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor#setBeanFactory
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    // Instantiate the advisor
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
While the AsyncAnnotationBeanPostProcessor is being set up (in its setBeanFactory callback), it instantiates the advisor AsyncAnnotationAdvisor.
After each bean has been initialized, AsyncAnnotationBeanPostProcessor.postProcessAfterInitialization() is called to decide whether a proxy should be created for it.
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    ... ...
    /**
     * @see AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(java.lang.Object, java.lang.String)
     */
    // isEligible decides which beans need a proxy,
    // by matching them against the pointcut of the advisor
    if (isEligible(bean, beanName)) {
        // Create the proxy
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }
    // No proxy needed.
    return bean;
}
AsyncAnnotationAdvisor
The advisor AsyncAnnotationAdvisor consists of the advice AnnotationAsyncExecutionInterceptor and the pointcut ComposablePointcut.
public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    }
    catch (ClassNotFoundException ex) {
        // If EJB 3.1 API not present, simply ignore.
    }
    this.advice = buildAdvice(executor, exceptionHandler); // creates the AnnotationAsyncExecutionInterceptor
    this.pointcut = buildPointcut(asyncAnnotationTypes); // creates the ComposablePointcut
}

protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); // class level
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); // method level
        if (result == null) {
            result = new ComposablePointcut(cpc);
        }
        else {
            result.union(cpc); // combined class-level and method-level pointcut
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}
The AnnotationMatchingPointcut simply checks whether the class or the method carries the @Async annotation.
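The "class or method carries the annotation" check can be approximated with plain reflection. The sketch below is not Spring's implementation: `MyAsync`, `PlainService` and `AnnotatedService` are hypothetical names, and unlike the real pointcut (created with checkInherited set to true) it ignores superclasses, interfaces and meta-annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical marker annotation standing in for @Async.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface MyAsync {}

// Only one method is annotated.
class PlainService {
    @MyAsync
    public void annotatedMethod() {}

    public void plainMethod() {}
}

// The whole class is annotated, so every method matches.
@MyAsync
class AnnotatedService {
    public void anyMethod() {}
}

class AnnotationMatchSketch {

    // Union of a class-level check and a method-level check.
    static boolean matches(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return type.isAnnotationPresent(MyAsync.class)
                    || method.isAnnotationPresent(MyAsync.class);
        }
        catch (NoSuchMethodException ex) {
            return false; // unknown method: nothing to match
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(PlainService.class, "annotatedMethod"));
        System.out.println(matches(PlainService.class, "plainMethod"));
        System.out.println(matches(AnnotatedService.class, "anyMethod"));
    }
}
```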
AnnotationAsyncExecutionInterceptor
The AnnotationAsyncExecutionInterceptor class is mainly responsible for implementing the advice logic.
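Before reading the interceptor, it may help to see the idea in isolation. The following is a minimal sketch with a JDK dynamic proxy rather than Spring AOP: the call is wrapped in a Callable and submitted to an executor, so the target method runs on a worker thread. `Greeter`, `DirectGreeter` and the thread name `sketch-worker` are hypothetical, and the handler assumes every proxied method returns a Future.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical service interface; its method returns a Future like hasResult() above.
interface Greeter {
    Future<String> greet(String name);
}

// Target object: reports the thread it actually ran on.
class DirectGreeter implements Greeter {
    @Override
    public Future<String> greet(String name) {
        String text = "hello " + name + " from " + Thread.currentThread().getName();
        return CompletableFuture.completedFuture(text);
    }
}

class AsyncProxySketch {

    // Wraps the target so that each call is executed by the given executor.
    static Greeter asyncProxy(Greeter target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Package the real invocation as a Callable
            Callable<Object> task = () -> {
                Object result = method.invoke(target, args);
                // Unwrap the Future returned by the target method
                return (result instanceof Future) ? ((Future<?>) result).get() : null;
            };
            // Return immediately with a Future for the submitted task
            return executor.submit(task);
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    // Calls the target through the proxy and waits for the result.
    static String callThroughProxy() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        try {
            return asyncProxy(new DirectGreeter(), executor).greet("spring").get();
        }
        catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callThroughProxy()); // hello spring from sketch-worker
    }
}
```

The target's own Future is unwrapped inside the task, which mirrors the `result instanceof Future` branch in invoke().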
org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    // Resolve the thread pool
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }
    // Wrap the target method invocation in a Callable so it can be submitted to the thread pool
    Callable<Object> task = () -> {
        try {
            // Invoke the target method
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };
    // Submit the task
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        /**
         * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier(java.lang.reflect.Method)
         */
        // Read the taskExecutor name given in the value attribute of @Async
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            // Fall back to the default taskExecutor
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}
determineAsyncExecutor() resolves the thread pool that runs the asynchronous task. The lookup proceeds as follows:
- Look up in the Spring container the taskExecutor named by the value attribute of @Async
- Otherwise, use the default defaultExecutor
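The two lookup steps, together with the per-method cache, can be sketched in plain Java as follows. This is only an illustration under stated assumptions: a `Map` stands in for the Spring container, `NamedExecutor` and `ExecutorLookupSketch` are hypothetical names, and throwing IllegalStateException for an unknown qualifier is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical executor that carries a name so the lookup result is visible.
class NamedExecutor implements Executor {
    final String name;

    NamedExecutor(String name) {
        this.name = name;
    }

    @Override
    public void execute(Runnable task) {
        task.run();
    }
}

class ExecutorLookupSketch {

    private final Map<String, Executor> registry;       // stands in for the bean container
    private final Supplier<Executor> defaultExecutor;   // fallback when no qualifier is given
    private final Map<String, Executor> cache = new ConcurrentHashMap<>();

    ExecutorLookupSketch(Map<String, Executor> registry, Supplier<Executor> defaultExecutor) {
        this.registry = registry;
        this.defaultExecutor = defaultExecutor;
    }

    // methodKey identifies the method; qualifier plays the role of @Async's value attribute.
    Executor determine(String methodKey, String qualifier) {
        Executor cached = cache.get(methodKey);
        if (cached != null) {
            return cached;
        }
        Executor target;
        if (qualifier != null && !qualifier.isEmpty()) {
            // Step 1: an explicitly named executor wins
            target = registry.get(qualifier);
            if (target == null) {
                throw new IllegalStateException("No executor named '" + qualifier + "'");
            }
        }
        else {
            // Step 2: otherwise use the default executor
            target = defaultExecutor.get();
        }
        if (target != null) {
            cache.put(methodKey, target);
        }
        return target;
    }

    static ExecutorLookupSketch sample() {
        Map<String, Executor> registry = new HashMap<>();
        registry.put("reportPool", new NamedExecutor("reportPool"));
        return new ExecutorLookupSketch(registry, () -> new NamedExecutor("default"));
    }

    static String nameOf(Executor executor) {
        return ((NamedExecutor) executor).name;
    }

    public static void main(String[] args) {
        ExecutorLookupSketch lookup = sample();
        System.out.println(nameOf(lookup.determine("report", "reportPool"))); // reportPool
        System.out.println(nameOf(lookup.determine("audit", "")));            // default
    }
}
```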
Where does the default defaultExecutor come from?
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure
public void configure(@Nullable Supplier<Executor> defaultExecutor,
        @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    // By default, defaultExecutor is resolved from the beanFactory: a TaskExecutor bean by type
    // (beanFactory.getBean(TaskExecutor.class)), or an Executor bean named taskExecutor
    this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    // exceptionHandler defaults to SimpleAsyncUncaughtExceptionHandler
    this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
defaultExecutor first uses the supplier passed in as a parameter, which comes from AsyncConfigurer.getAsyncExecutor(). If that is null (or supplies null), getDefaultExecutor() is called instead. Note that the subclass AsyncExecutionInterceptor overrides this method:
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
If no defaultExecutor can be found, a SimpleAsyncTaskExecutor is created. This answers the first question: leaving the Executor unconfigured does not cause an error.
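Note that SimpleAsyncTaskExecutor starts a new thread for every task and does not reuse threads, so it is not a pool in the usual sense. The behavior can be approximated with the plain-JDK sketch below; `NewThreadExecutor` and the `per-task-` thread names are hypothetical, and this is not Spring's actual implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor that starts one fresh thread per task (no pooling, no reuse).
class NewThreadExecutor implements Executor {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public void execute(Runnable task) {
        new Thread(task, "per-task-" + counter.incrementAndGet()).start();
    }
}

class ThreadPerTaskSketch {

    // Runs taskCount tasks and reports how many distinct threads executed them.
    static int distinctThreadsFor(int taskCount) {
        Executor executor = new NewThreadExecutor();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has recorded its thread
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreadsFor(5)); // 5: one thread per task
    }
}
```

Under heavy load this can create a large number of threads, which is one reason to configure a bounded pool such as the ThreadPoolTaskExecutor in AsyncConfig above.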
Now look at getDefaultExecutor in the parent class AsyncExecutionAspectSupport:
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Search for TaskExecutor bean... not plain Executor since that would
            // match with ScheduledExecutorService as well, which is unusable for
            // our purposes here. TaskExecutor is more clearly designed for it.
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                // Look for an Executor bean named taskExecutor
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}
It first looks up a bean of type TaskExecutor in the beanFactory. If there is none, or more than one, it then looks for an Executor bean named taskExecutor.
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // Execute the task
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    else {
        executor.submit(task);
        return null;
    }
}
doSubmit() submits the task to the thread pool and handles each kind of method return type.
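The dispatch on the declared return type can be tried out with the JDK alone. The sketch below leaves out the ListenableFuture branch because that type is Spring-specific; `SubmitByReturnTypeSketch` and its `describe` helper are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitByReturnTypeSketch {

    // Chooses how to hand the task to the executor based on the declared return type.
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            // Checked first: CompletableFuture is itself a Future
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            // void or any other type: fire and forget
            executor.submit(task);
            return null;
        }
    }

    // Describes what the caller gets back for a given declared return type.
    static String describe(Class<?> returnType) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Object handle = submit(() -> "done", executor, returnType);
            if (handle == null) {
                return "null";
            }
            String kind = (handle instanceof CompletableFuture) ? "CompletableFuture" : "Future";
            return kind + ":" + ((Future<?>) handle).get();
        }
        catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.class)); // CompletableFuture:done
        System.out.println(describe(Future.class));            // Future:done
        System.out.println(describe(void.class));              // null
    }
}
```

For a void method the caller receives null and has no handle on the task, which is why exceptions from such methods are routed to the AsyncUncaughtExceptionHandler instead.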