SpringBoot CommandLineRunner的異步任務機制使用
在SpringBoot中,CommandLineRunner 本身并不是一個直接支持異步任務的機制。
CommandLineRunner 接口定義了一個在 Spring Boot 應用程序啟動后立即同步執(zhí)行的方法 run(String... args)。
這意味著,通過實現(xiàn) CommandLineRunner 接口定義的任務將在主線程中順序執(zhí)行,而不會創(chuàng)建新的線程來異步執(zhí)行這些任務。
然而,如果你希望在 CommandLineRunner 中執(zhí)行異步任務,你可以手動創(chuàng)建線程或使用 Spring 的異步執(zhí)行功能。
以下是一些實現(xiàn)異步任務的方法。
1.概要分析
1.1 手動創(chuàng)建線程
在 CommandLineRunner 的 run 方法中,你可以直接創(chuàng)建并啟動一個新的線程來執(zhí)行異步任務。
這種方法簡單直接,但需要注意線程管理和異常處理。
@Component
public class MyCommandLineRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
new Thread(() -> {
// 異步執(zhí)行的代碼
System.out.println("異步任務執(zhí)行中...");
// 模擬耗時操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("異步任務完成");
}).start();
// 主線程繼續(xù)執(zhí)行,不會等待異步任務完成
System.out.println("CommandLineRunner 執(zhí)行完畢,主線程繼續(xù)");
}
}1.2 使用 Spring 的異步執(zhí)行功能
如果你希望利用 Spring 的異步支持來執(zhí)行異步任務,你可以在 CommandLineRunner 中注入一個使用 @Async 注解的服務。但是,需要注意的是,由于 CommandLineRunner 的 run 方法本身是在 Spring 容器完全初始化之后同步執(zhí)行的,因此即使你調(diào)用了一個異步服務方法,run 方法本身仍然會立即返回,不會等待異步任務完成。
首先,你需要在 Spring Boot 應用中啟用異步支持,通過在啟動類上添加 @EnableAsync 注解。
然后,你可以創(chuàng)建一個異步服務:
@Service
public class AsyncService {
@Async
public void executeAsyncTask() {
// 異步執(zhí)行的代碼
System.out.println("異步任務執(zhí)行中...");
// 模擬耗時操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("異步任務完成");
}
}在 CommandLineRunner 中注入并使用這個服務:
@Component
public class MyCommandLineRunner implements CommandLineRunner {
@Autowired
private AsyncService asyncService;
@Override
public void run(String... args) throws Exception {
asyncService.executeAsyncTask(); // 調(diào)用異步方法,但 CommandLineRunner 的 run 方法會立即返回
System.out.println("CommandLineRunner 執(zhí)行完畢,主線程繼續(xù),不會等待異步任務完成");
}
}雖然 CommandLineRunner 本身不支持異步執(zhí)行,但你可以通過手動創(chuàng)建線程或使用 Spring 的異步支持來在 CommandLineRunner 中執(zhí)行異步任務。然而,需要注意的是,CommandLineRunner 的 run 方法本身仍然是同步執(zhí)行的,它不會等待任何異步任務完成。
如果你的應用程序依賴于異步任務的結果,你可能需要采用其他機制(如 Future、CompletableFuture 或消息隊列)來管理異步任務的執(zhí)行和結果。
2.核心原理分析
org.springframework.boot.CommandLineRunner 是 Spring Boot 框架中的一個核心接口,其原理分析可以從以下幾個方面進行。
2.1 接口定義與功能
CommandLineRunner 是一個函數(shù)式接口(Functional Interface),它只定義了一個抽象方法 run(String... args)。
這個方法在 Spring Boot 應用程序啟動完成后被調(diào)用,允許開發(fā)者執(zhí)行一些初始化操作或啟動后的任務。這些任務可能包括數(shù)據(jù)初始化、緩存預熱、日志記錄等。
2.2 執(zhí)行時機
當 Spring Boot 應用程序啟動時,Spring 容器會完成一系列的初始化操作,包括 Bean 的加載和依賴注入等。
在所有 Spring Bean 都初始化完成后,Spring Boot 會查找所有實現(xiàn)了 CommandLineRunner 接口的 Bean,并依次調(diào)用它們的 run 方法。
這意味著 CommandLineRunner 的執(zhí)行時機是在 Spring 上下文準備好之后,但在應用程序對外提供服務之前。
2.3 命令行參數(shù)傳遞
CommandLineRunner 的 run 方法接收一個 String... args 參數(shù),這個參數(shù)包含了啟動應用程序時傳遞給它的命令行參數(shù)。
這使得開發(fā)者可以根據(jù)命令行參數(shù)的不同來執(zhí)行不同的初始化邏輯。
2.4 實現(xiàn)與注冊
要使用 CommandLineRunner,開發(fā)者需要創(chuàng)建一個類并實現(xiàn)這個接口,然后重寫 run 方法以定義自己的初始化邏輯。
為了讓 Spring 容器能夠掃描到這個實現(xiàn)類并將其注冊為一個 Bean,通常會在類上添加 @Component 或其他類似的注解(如 @Service、@Repository 等)。此外,也可以通過編程方式在配置類中顯式地注冊這個 Bean。
2.5 執(zhí)行順序
如果應用程序中有多個實現(xiàn)了 CommandLineRunner 接口的類,那么它們的 run 方法將按照一定的順序執(zhí)行。
默認情況下,執(zhí)行順序取決于 Spring 容器注冊這些 Bean 的順序。但是,開發(fā)者可以通過 @Order 注解或實現(xiàn) Ordered 接口來指定執(zhí)行順序。
@Order 注解的值越小,優(yōu)先級越高,相應的 run 方法就會越早執(zhí)行。
2.6 與 ApplicationRunner 的區(qū)別
值得注意的是,Spring Boot 還提供了另一個類似的接口 ApplicationRunner,它也用于在應用程序啟動后執(zhí)行初始化任務。與 CommandLineRunner 不同的是,ApplicationRunner 的 run 方法接收一個 ApplicationArguments 參數(shù)而不是 String... args。
ApplicationArguments 提供了對命令行參數(shù)的更高級別訪問,包括選項和非選項參數(shù)等。此外,如果同時存在 CommandLineRunner 和 ApplicationRunner 的實現(xiàn),那么 CommandLineRunner 的實現(xiàn)會先于 ApplicationRunner 的實現(xiàn)被調(diào)用。
2.7 應用場景
CommandLineRunner 適用于需要在應用程序啟動后立即執(zhí)行的任務場景,如數(shù)據(jù)初始化、配置加載、緩存預熱等。通過使用 CommandLineRunner,開發(fā)者可以確保這些任務在應用程序對外提供服務之前完成,從而提高應用程序的性能和用戶體驗。
綜上所述,org.springframework.boot.CommandLineRunner 是 Spring Boot 框架中用于執(zhí)行啟動后任務的強大機制,它通過簡單的接口定義和靈活的注冊方式,為開發(fā)者提供了方便、高效的初始化操作手段。
3.部分源碼分析
3.1 啟動Spring Boot應用程序
/**
* 啟動應用程序。
*
* @param args 命令行參數(shù)
* @return ConfigurableApplicationContext 應用程序上下文
*/
public ConfigurableApplicationContext run(String... args) {
// 記錄應用程序啟動時間
long startTime = System.nanoTime();
// 創(chuàng)建引導上下文
DefaultBootstrapContext bootstrapContext = createBootstrapContext();
ConfigurableApplicationContext context = null;
// 配置無頭模式屬性
configureHeadlessProperty();
// 獲取啟動監(jiān)聽器
SpringApplicationRunListeners listeners = getRunListeners(args);
// 通知監(jiān)聽器應用程序即將啟動
listeners.starting(bootstrapContext, this.mainApplicationClass);
try {
// 創(chuàng)建并配置應用參數(shù)
ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments);
// 配置忽略BeanInfo的設置
configureIgnoreBeanInfo(environment);
// 打印啟動橫幅
Banner printedBanner = printBanner(environment);
// 創(chuàng)建應用程序上下文
context = createApplicationContext();
// 設置應用啟動器
context.setApplicationStartup(this.applicationStartup);
// 準備應用程序上下文
prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
// 刷新上下文,使配置生效
refreshContext(context);
// 啟動后配置
afterRefresh(context, applicationArguments);
// 計算應用程序啟動時間
Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
// 如果啟用了啟動信息日志,則記錄啟動信息
if (this.logStartupInfo) {
new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), timeTakenToStartup);
}
// 通知監(jiān)聽器應用程序已啟動
listeners.started(context, timeTakenToStartup);
// 調(diào)用應用程序運行者
callRunners(context, applicationArguments);
} catch (Throwable ex) {
// 處理啟動失敗
handleRunFailure(context, ex, listeners);
throw new IllegalStateException(ex);
}
try {
// 計算應用程序就緒時間
Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
// 通知監(jiān)聽器應用程序已就緒
listeners.ready(context, timeTakenToReady);
} catch (Throwable ex) {
// 處理就緒失敗
handleRunFailure(context, ex, null);
throw new IllegalStateException(ex);
}
// 返回應用程序上下文
return context;
}3.2 調(diào)用所有的Runner實現(xiàn)類
/**
* 調(diào)用所有的Runner實現(xiàn)類。
*
* 本方法的目的是遍歷ApplicationContext中所有的Runner實例,并根據(jù)它們的類型分別調(diào)用相應的方法。
* Runner和CommandLineRunner是Spring Boot提供的一組接口,用于在應用程序啟動后執(zhí)行一些自定義的初始化邏輯。
* 這里通過判斷Runner的類型,來決定是調(diào)用ApplicationRunner還是CommandLineRunner的方法,從而實現(xiàn)對不同類型Runner的兼容處理。
*
* @param context Spring應用上下文,用于獲取BeanProvider以獲取Runner實例。
* @param args 命令行參數(shù),傳遞給每個Runner的調(diào)用方法。
*/
private void callRunners(ApplicationContext context, ApplicationArguments args) {
// 通過BeanProvider獲取所有Runner類型的bean,并以有序的方式遍歷它們
context.getBeanProvider(Runner.class).orderedStream().forEach((runner) -> {
// 如果runner是ApplicationRunner類型,則調(diào)用callRunner方法,并傳入ApplicationRunner和命令行參數(shù)
if (runner instanceof ApplicationRunner) {
callRunner((ApplicationRunner) runner, args);
}
// 如果runner是CommandLineRunner類型,則同樣調(diào)用callRunner方法,并傳入CommandLineRunner和命令行參數(shù)
if (runner instanceof CommandLineRunner) {
callRunner((CommandLineRunner) runner, args);
}
});
}4.典型的應用場景分析
Seata中的TM(Transaction Manager,事務管理器)和RM(Resource Manager,資源管理器)是分布式事務框架中的關鍵角色,它們各自承擔著特定的職責,以確保分布式事務的一致性和完整性。
4.1 TM(Transaction Manager,事務管理器)
4.1.1 定義與職責
- (1)TM負責定義全局事務的范圍,即開始全局事務、提交或回滾全局事務。它是分布式事務的發(fā)起者和終結者,類似于本地事務中的begin…commit…rollback操作,但針對的是全局的分布式事務。
- (2)在Seata框架中,TM與業(yè)務系統(tǒng)集成在一起,作為客戶端存在。當業(yè)務操作需要跨多個服務或數(shù)據(jù)庫時,TM會啟動一個全局事務,并管理這個事務的生命周期。
4.1.2 工作流程
- (1)TM向TC(Transaction Coordinator,事務協(xié)調(diào)者)請求開啟一個全局事務,TC生成一個全局唯一的事務ID(XID)并返回給TM。
- (2)TM攜帶XID進行遠程服務調(diào)用,XID在微服務調(diào)用鏈中傳播,確保所有參與的分支事務都能被正確關聯(lián)到全局事務中。
- (3)當業(yè)務操作完成后,TM根據(jù)業(yè)務邏輯向TC發(fā)起全局事務的提交或回滾請求。
- (4)TC根據(jù)各分支事務的執(zhí)行結果,決定全局事務的提交或回滾,并通知所有RM進行相應的操作。
4.2 RM(Resource Manager,資源管理器)
4.2.1 定義與職責
- (1)RM負責管理分支事務處理的資源,如數(shù)據(jù)庫連接、消息隊列等。它是分布式事務中具體執(zhí)行操作的組件。
- (2)RM與TC進行通信,注冊分支事務、報告分支事務的狀態(tài),并根據(jù)TC的指令驅動分支事務的提交或回滾。
- (3)在Seata框架中,RM同樣與業(yè)務系統(tǒng)集成在一起,作為客戶端存在。每個參與全局事務的服務或數(shù)據(jù)庫操作,都會有一個對應的RM來管理。
4.2.2 工作流程
- (1)在執(zhí)行具體業(yè)務操作之前,RM會向TC注冊分支事務,并將其納入全局事務的管轄范圍。
- (2)RM執(zhí)行本地事務操作,如數(shù)據(jù)庫更新、消息發(fā)送等,并確保這些操作是可回滾和持久化的。
- (3)RM將分支事務的執(zhí)行結果(提交或回滾)上報給TC。
- (4)當TC收到TM的全局事務提交或回滾請求時,它會根據(jù)各分支事務的狀態(tài)決定全局事務的結果,并通知所有RM進行相應的提交或回滾操作。
4.3 Seata RM和TM與Seata Server之間的RPC通信
在Seata中,TM(Transaction Manager,事務管理器)與Seata Server(即TC,Transaction Coordinator,事務協(xié)調(diào)者)之間的通信是通過RPC(Remote Procedure Call,遠程過程調(diào)用)實現(xiàn)的。
RPC是一種允許程序在網(wǎng)絡上調(diào)用遠程計算機上程序的技術,就像調(diào)用本地計算機上的程序一樣。
4.3.1 TM與Seata Server之間的RPC通信
(1)建立連接
- TM在啟動時會嘗試與Seata Server建立長連接。這個連接是通過Netty等網(wǎng)絡通信框架實現(xiàn)的,Netty提供了高效、異步的網(wǎng)絡通信能力。
- 在建立連接的過程中,TM會向Seata Server發(fā)送注冊請求,包括應用ID、事務組名稱等信息,以便Seata Server能夠識別和管理該TM。
(2)事務管理
- 一旦連接建立,TM就可以通過RPC調(diào)用Seata Server提供的服務來管理全局事務。
- 例如,當業(yè)務操作需要跨多個服務或數(shù)據(jù)庫時,TM會向Seata Server請求開啟一個全局事務,并獲取一個全局唯一的事務ID(XID)。
- 在執(zhí)行遠程服務調(diào)用時,TM會將XID攜帶在調(diào)用中,以便參與的RM能夠識別并將分支事務注冊到全局事務中。
- 當業(yè)務操作完成后,TM會根據(jù)業(yè)務邏輯向Seata Server發(fā)起全局事務的提交或回滾請求。
(3)通信協(xié)議
- Seata使用自定義的通信協(xié)議來進行RPC通信,該協(xié)議支持事務的創(chuàng)建、提交、回滾等操作。
- 通信過程中,Seata還實現(xiàn)了心跳檢測、超時重試等機制來確保通信的可靠性和穩(wěn)定性。
(4)性能優(yōu)化
- 為了提高RPC通信的性能,Seata采用了多種優(yōu)化策略,如使用Netty的主從Reactor多線程模型來處理并發(fā)請求、采用批量發(fā)送請求來減少網(wǎng)絡開銷等。
4.3.2 RM與Seata Server之間的RPC通信
在Seata中,RM負責管理分支事務處理的資源,如數(shù)據(jù)庫連接等。
當RM執(zhí)行分支事務時,它需要與Seata Server進行通信,以注冊分支事務、報告分支事務的狀態(tài),并根據(jù)Seata Server的指令驅動分支事務的提交或回滾。
這種通信是通過RPC機制實現(xiàn)的,它允許RM遠程調(diào)用Seata Server上的服務。
(1)建立連接
- 當RM啟動時,它會根據(jù)配置嘗試與Seata Server建立長連接。這個連接是通過Netty等網(wǎng)絡通信框架實現(xiàn)的,Netty提供了高效、異步的網(wǎng)絡通信能力。
- 在建立連接的過程中,RM會向Seata Server發(fā)送注冊請求,包括應用ID、事務組名稱等信息,以便Seata Server能夠識別和管理該RM。
(2)分支事務注冊
- 當RM執(zhí)行一個分支事務時,它會向Seata Server注冊該分支事務。注冊過程中,RM會攜帶全局事務ID(XID)等信息,以便Seata Server能夠將該分支事務關聯(lián)到相應的全局事務中。
- Seata Server在收到注冊請求后,會為該分支事務分配一個唯一的分支事務ID,并將其注冊到全局事務中。
(3)狀態(tài)報告與指令執(zhí)行
- 在分支事務執(zhí)行過程中,RM會定期向Seata Server報告分支事務的狀態(tài),如正在執(zhí)行、已提交、已回滾等。
- 當全局事務需要提交或回滾時,Seata Server會根據(jù)各分支事務的狀態(tài)和結果,向RM發(fā)送相應的提交或回滾指令。
- RM在收到指令后,會執(zhí)行相應的操作,如提交本地事務、回滾本地事務等,并將執(zhí)行結果報告給Seata Server。
(4)心跳檢測與異常處理
- 為了保持連接的活躍狀態(tài),RM會定期向Seata Server發(fā)送心跳消息。
- 如果Seata Server在一段時間內(nèi)沒有收到RM的心跳消息,它可能會認為RM已經(jīng)離線,并采取相應的異常處理措施,如重試連接、記錄日志等。
Seata RM與Seata Server之間的RPC通信是Seata分布式事務框架中的重要組成部分。通過高效的RPC通信機制,RM能夠遠程調(diào)用Seata Server提供的服務來管理分支事務,確保分布式事務的一致性和完整性。同時,Seata還通過多種優(yōu)化策略來提高RPC通信的性能和可靠性。
4.4 Seata Server利用SpringBoot CommandLineRunner啟動服務端通信渠道
在Seata Server中,ServerRunner類是一個重要的組件,它繼承自Spring Boot的CommandLineRunner接口。
這意味著在Spring Boot應用啟動后,ServerRunner的run()方法會被自動執(zhí)行。這種方法通常用于在應用啟動后立即執(zhí)行一段特定的代碼,比如初始化資源、啟動服務等。
ServerRunner類的主要職責是初始化Netty通信渠道,即NettyRemotingServer。
Netty是一個高性能的異步事件驅動的網(wǎng)絡應用框架,用于快速開發(fā)可維護的高性能協(xié)議服務器和客戶端。
在Seata中,Netty用于節(jié)點間的通信,包括事務協(xié)調(diào)器(TC)、事務管理器(TM)和資源管理器(RM)之間的通信。
以下是ServerRunner類中run()方法可能包含的邏輯的一個簡單示例:
@Override
public void run(String... args) throws Exception {
// 初始化Netty通信服務器
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer();
nettyRemotingServer.setPort(serverPort); // 設置服務器端口
nettyRemotingServer.setHost(serverHost); // 設置服務器主機地址
nettyRemotingServer.start(); // 啟動服務器
// 其他初始化邏輯,比如注冊服務等
}在這個例子中,run()方法首先創(chuàng)建了一個NettyRemotingServer實例,并設置了服務器的主機地址和端口號。然后,它調(diào)用start()方法來啟動Netty服務器,這樣Seata Server就可以監(jiān)聽來自其他節(jié)點的請求了。
總的來說
ServerRunner類在Seata Server中扮演著重要的角色,它負責初始化Netty通信渠道,為Seata節(jié)點間的通信提供基礎設施。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Dubbo?LoadBalance基于權重的隨機負載均衡算法提高服務性能
這篇文章主要為大家介紹了Dubbo?LoadBalance基于權重的隨機負載均衡算法提高服務性能詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪<BR>2023-10-10
Java Websocket Canvas實現(xiàn)井字棋網(wǎng)絡游戲
這篇文章主要介紹了Java Websocket Canvas實現(xiàn)井字棋網(wǎng)絡游戲,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08
原生java代碼實現(xiàn)碼云第三方驗證登錄的示例代碼
這篇文章主要介紹了原生java代碼實現(xiàn)碼云第三方驗證登錄的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04
SpringBoot集成tomcat詳解實現(xiàn)過程
采用spring boot之后,一切變得如此簡單,打包->java-jar->運維,只需要一個jar包便可以隨意部署安裝。這篇文章,將對 spring boot集成tomcat的源碼進行分析,探索其內(nèi)部的原理2023-02-02

