詳解SpringBoot定時任務(wù)功能
一 背景
項目中需要一個可以動態(tài)新增定時定時任務(wù)的功能,現(xiàn)在項目中使用的是xxl-job定時任務(wù)調(diào)度系統(tǒng),但是經(jīng)過一番對xxl-job功能的了解,發(fā)現(xiàn)xxl-job對項目動態(tài)新增定時任務(wù),動態(tài)刪除定時任務(wù)的支持并不是那么好,所以需要自己手動實現(xiàn)一個定時任務(wù)的功能
二 動態(tài)定時任務(wù)調(diào)度
1 技術(shù)選擇
Timer or ScheduledExecutorService
這兩個都能實現(xiàn)定時任務(wù)調(diào)度,先看下Timer的定時任務(wù)調(diào)度
public class MyTimerTask extends TimerTask {
private String name;
public MyTimerTask(String name){
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public void run() {
//task
Calendar instance = Calendar.getInstance();
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));
}
}
Timer timer = new Timer();
MyTimerTask timerTask = new MyTimerTask("NO.1");
//首次執(zhí)行,在當(dāng)前時間的1秒以后,之后每隔兩秒鐘執(zhí)行一次
timer.schedule(timerTask,1000L,2000L);在看下ScheduledThreadPoolExecutor的實現(xiàn)
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//do something
}
},initialDelay,period, TimeUnit.HOURS);兩個都能實現(xiàn)定時任務(wù),那他們的區(qū)別呢,使用阿里p3c會給出建議和區(qū)別
多線程并行處理定時任務(wù)時,Timer運行多個TimeTask時,只要其中之一沒有捕獲拋出的異常,其它任務(wù)便會自動終止運行,使用ScheduledExecutorService則沒有這個問題。
從建議上來看,是一定要選擇ScheduledExecutorService了,我們看看源碼看看為什么Timer出現(xiàn)問題會終止執(zhí)行
/**
* The timer thread.
*/
private final TimerThread thread = new TimerThread(queue);
public Timer() {
this("Timer-" + serialNumber());
}
public Timer(String name) {
thread.setName(name);
thread.start();
}新建對象時,我們看到開啟了一個線程,那么這個線程在做什么呢?一起看看
class TimerThread extends Thread {
boolean newTasksMayBeScheduled = true;
/**
* 每一件一個任務(wù)都是一個quene
*/
private TaskQueue queue;
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // 清除所有任務(wù)信息
}
}
}
/**
* The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first evt and do the right thing
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
}我們看到,執(zhí)行了 mainLoop(),里面是 while (true)方法無限循環(huán),獲取程序中任務(wù)對象中的時間和當(dāng)前時間比對,相同就執(zhí)行,但是一旦報錯,就會進(jìn)入finally中清除掉所有任務(wù)信息。
這時候我們已經(jīng)找到了答案,timer是在被實例化后,啟動一個線程,不間斷的循環(huán)匹配,來執(zhí)行任務(wù),他是單線程的,一旦報錯,線程就終止了,所以不會執(zhí)行后續(xù)的任務(wù),而ScheduledThreadPoolExecutor是多線程執(zhí)行的,就算其中有一個任務(wù)報錯了,并不影響其他線程的執(zhí)行。
2 使用ScheduledThreadPoolExecutor
從上面看,使用ScheduledThreadPoolExecutor還是比較簡單的,但是我們要實現(xiàn)的更優(yōu)雅一些,所以選擇 TaskScheduler來實現(xiàn)
@Component
public class CronTaskRegistrar implements DisposableBean {
private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
@Autowired
private TaskScheduler taskScheduler;
public TaskScheduler getScheduler() {
return this.taskScheduler;
}
public void addCronTask(Runnable task, String cronExpression) {
addCronTask(new CronTask(task, cronExpression));
}
private void addCronTask(CronTask cronTask) {
if (cronTask != null) {
Runnable task = cronTask.getRunnable();
if (this.scheduledTasks.containsKey(task)) {
removeCronTask(task);
}
this.scheduledTasks.put(task, scheduleCronTask(cronTask));
}
}
public void removeCronTask(Runnable task) {
Set<Runnable> runnables = this.scheduledTasks.keySet();
Iterator it1 = runnables.iterator();
while (it1.hasNext()) {
SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();
Long taskId = schedulingRunnable.getTaskId();
SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;
if (taskId.equals(cancelRunnable.getTaskId())) {
ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);
if (scheduledTask != null){
scheduledTask.cancel();
}
}
}
}
public ScheduledTask scheduleCronTask(CronTask cronTask) {
ScheduledTask scheduledTask = new ScheduledTask();
scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
return scheduledTask;
}
@Override
public void destroy() throws Exception {
for (ScheduledTask task : this.scheduledTasks.values()) {
task.cancel();
}
this.scheduledTasks.clear();
}
}TaskScheduler是本次功能實現(xiàn)的核心類,但是他是一個接口
public interface TaskScheduler {
/**
* Schedule the given {@link Runnable}, invoking it whenever the trigger
* indicates a next execution time.
* <p>Execution will end once the scheduler shuts down or the returned
* {@link ScheduledFuture} gets cancelled.
* @param task the Runnable to execute whenever the trigger fires
* @param trigger an implementation of the {@link Trigger} interface,
* e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
* wrapping a cron expression
* @return a {@link ScheduledFuture} representing pending completion of the task,
* or {@code null} if the given Trigger object never fires (i.e. returns
* {@code null} from {@link Trigger#nextExecutionTime})
* @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
* for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
* @see org.springframework.scheduling.support.CronTrigger
*/
@Nullable
ScheduledFuture<?> schedule(Runnable task, Trigger trigger);前面的代碼可以看到,我們在類中注入了這個類,但是他是接口,我們怎么知道是那個實現(xiàn)類呢,以往出現(xiàn)這種情況要在類上面加@Primany或者@Quality來執(zhí)行實現(xiàn)的類,但是我們看到我的注入上并沒有標(biāo)記,因為是通過另一種方式實現(xiàn)的
@Configuration
public class SchedulingConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 定時任務(wù)執(zhí)行線程池核心線程數(shù)
taskScheduler.setPoolSize(4);
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
return taskScheduler;
}
}在spring初始化時就注冊了Bean TaskScheduler,而我們可以看到他的實現(xiàn)是ThreadPoolTaskScheduler,在網(wǎng)上的資料中有人說ThreadPoolTaskScheduler是TaskScheduler的默認(rèn)實現(xiàn)類,其實不是,還是需要我們?nèi)ブ付?,而這種方式,當(dāng)我們想替換實現(xiàn)時,只需要修改配置類就行了,很靈活。
而為什么說他是更優(yōu)雅的實現(xiàn)方式呢,因為他的核心也是通過ScheduledThreadPoolExecutor來實現(xiàn)的
public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
return this.scheduledExecutor;
}三 多節(jié)點任務(wù)執(zhí)行問題
這次的實現(xiàn)過程中,我并沒有選擇xxl-job來進(jìn)行實現(xiàn),而是采用了TaskScheduler來實現(xiàn),這也產(chǎn)生了一個問題,xxl-job是分布式的程序調(diào)度系統(tǒng),當(dāng)想要執(zhí)行定時任務(wù)的應(yīng)用使用xxl-job時,無論應(yīng)用程序中部署多少個節(jié)點,xxl-job只會選擇其中一個節(jié)點作為定時任務(wù)執(zhí)行的節(jié)點,從而不會產(chǎn)生定時任務(wù)在不同節(jié)點上同時執(zhí)行,導(dǎo)致重復(fù)執(zhí)行問題,而使用TaskScheduler來實現(xiàn),就要考慮多節(jié)點重復(fù)執(zhí)行問題。當(dāng)然既然有問題,就有解決方案
· 方案一 將定時任務(wù)功能拆出來單獨部署,且只部署一個節(jié)點 · 方案二 使用redis setNx的形式,保證同一時間只有一個任務(wù)在執(zhí)行
我選擇的是方案二來執(zhí)行,當(dāng)然還有一些方式也能保證不重復(fù)執(zhí)行,這里就不多說了,一下是我的實現(xiàn)
public void executeTask(Long taskId) {
if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) {
log.info("已有執(zhí)行中定時發(fā)送短信任務(wù),本次不執(zhí)行!");
return;
}四 后記
其實定時任務(wù)應(yīng)該每一個開發(fā)都會用到的工具,以前并沒有了解其中的實現(xiàn),這次的功能開發(fā)過程中也算是對其內(nèi)涵的進(jìn)一步了解,以后遇到定時任務(wù)的處理也更清晰,更有效率了。
到此這篇關(guān)于SpringBoot定時任務(wù)功能詳細(xì)解析的文章就介紹到這了,更多相關(guān)SpringBoot定時任務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot?內(nèi)部服務(wù)調(diào)用方式
這篇文章主要介紹了Springboot?內(nèi)部服務(wù)調(diào)用方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
解決Springboot集成Redis集群配置公網(wǎng)IP連接報私網(wǎng)IP連接失敗問題
在Springboot 集成 Redis集群配置公網(wǎng)IP連接報私網(wǎng)IP連接失敗,一直報私有IP連接失敗,所以本文小編給大家介紹了如何解決報錯問題,如果有遇到相同問題的同學(xué),可以參考閱讀本文2023-10-10

