spring schedule配置多任務(wù)動態(tài)cron(增刪啟停)
一、背景
之前公司經(jīng)常會遇到配置定時任務(wù),簡單的任務(wù)可以直接依賴spring。
簡單任務(wù)直接使用 @scheduled 注解配合@EnableScheduling。
但是如何實現(xiàn)簡單的動態(tài)cron呢?
開發(fā)原則:
盡可能在項目本身去實現(xiàn),少依賴第三方框架,避免項目過于臃腫和復(fù)雜。
倆種任務(wù)調(diào)度方式:

二、本篇說明
springBoot 基礎(chǔ)模塊 spring-boot-starter-web 已經(jīng)內(nèi)置 schedule ,無需引入額外依賴。
先思考幾個問題:
1、動態(tài) cron 實現(xiàn)的原理
任務(wù)的 【 停止】是基于 future接口 的cancel() 方法。
任務(wù)的 【增加、刪除、啟動】是基于 注冊到 類ScheduledTaskRegistrar 的 ScheduledFuture的數(shù)量。
涉及核心類:
- ScheduledFuture
- SchedulingConfigurer
- ScheduledTaskRegistrar
2、多任務(wù)并行執(zhí)行配置
spring默認(rèn)機(jī)制對schedule是單線程,需要配置多線程并行執(zhí)行。
3、如何配置多個任務(wù)
好多博文,都是配置一個cron,這讓初學(xué)者很難受。
4、如何配置任務(wù)分組
根據(jù)自己業(yè)務(wù)背景,可根據(jù)步驟三,進(jìn)行改造。
5、如何配置服務(wù)啟動自啟任務(wù)。
想要程序啟動時首次去加我們設(shè)置的task,只需實現(xiàn) CommandLineRunner 即可。
6、如何從數(shù)據(jù)庫讀取配置
這個其實很簡單,在實現(xiàn) ScheduledTaskRegistrar 時,先直接查詢我們需要的數(shù)據(jù)即可。
7、如何優(yōu)雅的實現(xiàn)我們的代碼
這里為了我們多個task實現(xiàn)時,去除臃腫的if else ,使用策略模式去實現(xiàn)我們的task,這里代碼里面會具體介紹。
參考類圖:

8、如何去觸發(fā)我們的schedule 【增刪啟?!?br /> 配置好 task任務(wù)類,注入到 controller ,通過接口直接調(diào)用即可。
三、代碼實現(xiàn)
先貼出我的github 代碼,下面代碼描述不全。
1. 普通多任務(wù)動態(tài)cron 實現(xiàn)
1.1 對應(yīng)數(shù)據(jù)庫的實體類 TaskEntity
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TaskEntity {
/**
* 任務(wù)id
*/
private int taskId;
/**
* 任務(wù)說明
*/
private String desc;
/**
* cron 表達(dá)式
*/
private String expression;
}
1.2 配置每個任務(wù)實現(xiàn)
配置任務(wù)接口 TaskService
public interface TaskService {
void HandlerJob();
Integer jobId();
}
配置任務(wù)接口實現(xiàn) TaskServiceJob1Impl、TaskServiceJob2Impl …
@Service
public class TaskServiceJob1Impl implements TaskService {
@Override
public void HandlerJob() {
System.out.println("------job1 開始執(zhí)行---------:"+new Date());
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " " + Thread.currentThread().getName() + " 任務(wù)一啟動");
try {
Thread.sleep(10000);//任務(wù)耗時10秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " " + Thread.currentThread().getName() + " 結(jié)束");
}
@Override
public Integer jobId() {
return 1;
}
}
1.3 配置任務(wù)解析器 TaskSolverChooser
注:
這里引入策略模式
為啥要配置 任務(wù)解析器選擇器:
因為我們實現(xiàn)多個任務(wù)時,一個任務(wù)對應(yīng)一個 CronTask,需要在 MyScheduledTask 里面去實現(xiàn)我們每一個方法。
譬如,我們有100個任務(wù)就要自定義100個任務(wù)實現(xiàn)方法,代碼會很臃腫,明顯不符合,【開閉原則】,于是這里采用策略模式,解耦我們多個任務(wù)業(yè)務(wù)實現(xiàn)邏輯。
@Slf4j
@Component
public class TaskSolverChooser implements ApplicationContextAware {
private ApplicationContext applicationContext;
private Map<Integer, TaskService> chooseMap = new HashMap<>(16);
/**
* 拿到spring context 上下文
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@PostConstruct
private void registerToTaskSolver(){
Map<String, TaskService> taskServiceMap = applicationContext.getBeansOfType(TaskService.class);
for (TaskService value : taskServiceMap.values()) {
chooseMap.put(value.jobId(), value);
log.info("task {} 處理器: {} 注冊成功",new Object[]{value.jobId(),value});
}
}
/**
* 獲取需要的job
*/
public TaskService getTask(Integer jobId){
return chooseMap.get(jobId);
}
}
1.4 配置MyScheduledTask (動態(tài)cron核心配置)
說明:
1、配置多線程執(zhí)行任務(wù)
2、配置 刷新 task
3、配置 停止 task
4、配置 執(zhí)行task 業(yè)務(wù)邏輯
@Component
public class MyScheduledTask implements SchedulingConfigurer {
private volatile ScheduledTaskRegistrar registrar;
private final ConcurrentHashMap<Integer, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, CronTask> cronTasks = new ConcurrentHashMap<>();
@Autowired
private TaskSolverChooser taskSolverChooser;
@Override
public void configureTasks(ScheduledTaskRegistrar registrar) {
//設(shè)置20個線程,默認(rèn)單線程,如果不設(shè)置的話,不能同時并發(fā)執(zhí)行任務(wù)
registrar.setScheduler(Executors.newScheduledThreadPool(10));
this.registrar = registrar;
}
/**
* 修改 cron 需要 調(diào)用該方法
*/
public void refresh(List<TaskEntity> tasks){
//取消已經(jīng)刪除的策略任務(wù)
Set<Integer> sids = scheduledFutures.keySet();
for (Integer sid : sids) {
if(!exists(tasks, sid)){
scheduledFutures.get(sid).cancel(false);
}
}
for (TaskEntity TaskEntity : tasks) {
String expression = TaskEntity.getExpression();
//計劃任務(wù)表達(dá)式為空則跳過
if(!StringUtils.hasLength(expression)){
continue;
}
//計劃任務(wù)已存在并且表達(dá)式未發(fā)生變化則跳過
if (scheduledFutures.containsKey(TaskEntity.getTaskId())
&& cronTasks.get(TaskEntity.getTaskId()).getExpression().equals(expression)) {
continue;
}
//如果策略執(zhí)行時間發(fā)生了變化,則取消當(dāng)前策略的任務(wù)
if(scheduledFutures.containsKey(TaskEntity.getTaskId())){
scheduledFutures.get(TaskEntity.getTaskId()).cancel(false);
scheduledFutures.remove(TaskEntity.getTaskId());
cronTasks.remove(TaskEntity.getTaskId());
}
//業(yè)務(wù)邏輯處理
CronTask task = cronTask(TaskEntity, expression);
//執(zhí)行業(yè)務(wù)
ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(TaskEntity.getTaskId(), task);
scheduledFutures.put(TaskEntity.getTaskId(), future);
}
}
/**
* 停止 cron 運(yùn)行
*/
public void stop(List<TaskEntity> tasks){
tasks.forEach(item->{
if (scheduledFutures.containsKey(item.getTaskId())) {
// mayInterruptIfRunning設(shè)成false話,不允許在線程運(yùn)行時中斷,設(shè)成true的話就允許。
scheduledFutures.get(item.getTaskId()).cancel(false);
scheduledFutures.remove(item.getTaskId());
}
});
}
/**
* 業(yè)務(wù)邏輯處理
*/
public CronTask cronTask(TaskEntity TaskEntity, String expression) {
return new CronTask(() -> {
//每個計劃任務(wù)實際需要執(zhí)行的具體業(yè)務(wù)邏輯
//采用策略,模式 ,執(zhí)行我們的job
taskSolverChooser.getTask(TaskEntity.getTaskId()).HandlerJob();
}, expression);
}
private boolean exists(List<TaskEntity> tasks, Integer tid){
for(TaskEntity TaskEntity:tasks){
if(TaskEntity.getTaskId() == tid){
return true;
}
}
return false;
}
@PreDestroy
public void destroy() {
this.registrar.destroy();
}
}
1.5 配置程序啟動時首次去加我們設(shè)置的task
@Component
public class StartInitTask implements CommandLineRunner {
@Autowired
private MyScheduledTask myScheduledTask;
@Override
public void run(String... args) throws Exception {
List<TaskEntity> list = Arrays.asList(
new TaskEntity(1, "測試1", "0/1 * * * * ?"),
new TaskEntity(2, "測試2", "0/1 * * * * ?")
);
myScheduledTask.refresh(list);
}
}
1.6 配置web接口去觸發(fā),增刪啟停
@RestController
public class StartController {
@Autowired
private MyScheduledTask scheduledTask;
@PostMapping(value = "/startOrChangeCron")
public String changeCron(@RequestBody List<TaskEntity> list){
if (CollectionUtils.isEmpty(list)) {
// 這里模擬存在數(shù)據(jù)庫的數(shù)據(jù)
list = Arrays.asList(
new TaskEntity(1, "測試1","0/1 * * * * ?") ,
new TaskEntity(2, "測試2","0/1 * * * * ?")
);
}
scheduledTask.refresh(list);
return "task任務(wù):" + list.toString() + "已經(jīng)開始運(yùn)行";
}
@PostMapping(value = "/stopCron")
public String stopCron(@RequestBody List<TaskEntity> list){
if (CollectionUtils.isEmpty(list)) {
// 這里模擬將要停止的cron可通過前端傳來
list = Arrays.asList(
new TaskEntity(1, "測試1","0/1 * * * * ?") ,
new TaskEntity(2, "測試2","0/1 * * * * ?")
);
}
scheduledTask.stop(list);
List<Integer> collect = list.stream().map(TaskEntity::getTaskId).collect(Collectors.toList());
return "task任務(wù):" + collect.toString() + "已經(jīng)停止啟動";
}
}
2. 分組多任務(wù)動態(tài)cron 實現(xiàn)
實現(xiàn)原理:
基于反射實現(xiàn),根據(jù)方法全類名,去動態(tài)執(zhí)行方法。多任務(wù)分組配置,根據(jù)任務(wù)類型進(jìn)行分組。
eg:
定時任務(wù)人員的相關(guān)操作,有檢測人員離職狀態(tài),人員業(yè)績達(dá)標(biāo),人員考勤…等,
作用:
對人員定時任務(wù)做一個分類,在同一個類里面去實現(xiàn)不同的task,
比較
《1. 普通多任務(wù)動態(tài)cron 實現(xiàn)》,是一個類可以實現(xiàn)一個task
《2. 分組多任務(wù)動態(tài)cron 實現(xiàn)》,是一個類可以實現(xiàn)多個task
詳細(xì)可參考: 分組多任務(wù)動態(tài)cron
3 測試記錄
測試1 項目啟動自啟
TaskServiceJob1Impl和TaskServiceJob1Impl … 設(shè)置 阻塞10s
觀察日志時間可發(fā)現(xiàn),已經(jīng)同時并發(fā)執(zhí)行倆個任務(wù)。

測試2 觸發(fā) 刷新【增、刪、啟】我們的task,。
其實這里沒這么智能,如果需要觸發(fā)刷新接口,實際上是重新加載我們的task,就是對應(yīng)觸發(fā)我們,增加任務(wù)任務(wù),刪除任務(wù),啟動任務(wù)。
使用idea插件測試接口

觀察日志

測試3 觸發(fā) 停止接口,停止一個接口。
這里測試略過…
四、總結(jié)
其實實現(xiàn)簡單的動態(tài)配置,以上代碼可用,比較簡單。
到此這篇關(guān)于spring schedule配置多任務(wù)動態(tài)cron(增刪啟停)的文章就介紹到這了,更多相關(guān)spring schedule 多任務(wù)動態(tài)cron內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringCloud Zuul和Gateway的實例代碼(搭建方式)
本文主要介紹了SpringCloudZuul和SpringCloudGateway的簡單示例,SpringCloudGateway是推薦使用的API網(wǎng)關(guān)解決方案,基于SpringFramework5和ProjectReactor構(gòu)建,具有更高的性能和吞吐量2025-02-02
springboot實現(xiàn)rabbitmq的隊列初始化和綁定
這篇文章主要介紹了springboot實現(xiàn)rabbitmq的隊列初始化和綁定,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-10-10
使用Cloud?Studio構(gòu)建SpringSecurity權(quán)限框架(騰訊云?Cloud?Studio?實戰(zhàn)訓(xùn)練
隨著云計算技術(shù)的成熟和普及,傳統(tǒng)編程能力和資源以云服務(wù)的形式開放出來,從中間件、數(shù)據(jù)庫等水平能力服務(wù)組件到人臉識別、鑒權(quán)服務(wù)等基本業(yè)務(wù)服務(wù)組件很容易的在云端獲取,本文介紹使用Cloud?Studio構(gòu)建SpringSecurity權(quán)限框架的相關(guān)知識,感興趣的朋友一起看看吧2023-08-08
idea已經(jīng)提交到遠(yuǎn)程分支,但需要本地和遠(yuǎn)程都回退到某一版本問題
這篇文章主要介紹了idea已經(jīng)提交到遠(yuǎn)程分支,但需要本地和遠(yuǎn)程都回退到某一版本問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11

