Java應(yīng)用多機(jī)器部署解決大量定時(shí)任務(wù)問題
今天來(lái)說(shuō)一個(gè)Java多機(jī)部署下定時(shí)任務(wù)的處理方案。
需求: 有兩臺(tái)服務(wù)器同時(shí)部署了同一套代碼, 代碼中寫有spring自帶的定時(shí)任務(wù),但是每次執(zhí)行定時(shí)任務(wù)時(shí)只需要一臺(tái)機(jī)器去執(zhí)行。
當(dāng)拿到這個(gè)需求時(shí)我腦子中立馬出現(xiàn)了兩個(gè)簡(jiǎn)單的解決方案:
- 利用ip進(jìn)行判斷, 兩臺(tái)機(jī)器ip肯定不一樣, 指定某一臺(tái)機(jī)器的ip運(yùn)行。
- 只在一臺(tái)機(jī)器上部署定時(shí)任務(wù)的代碼。
最后兩個(gè)方案又都被自己否決了。 第一條,如果指定ip的機(jī)器出現(xiàn)了問題怎么辦? 例如說(shuō)宕機(jī)了, 那么該制定ip的機(jī)器上的定時(shí)任務(wù)是不是就無(wú)法運(yùn)行了?如果以后該服務(wù)器遷移導(dǎo)致ip變化怎么辦?
第二條, 同上, 還有就是要維護(hù)兩套代碼很不方便。
因?yàn)樯厦鎯蓚€(gè)假設(shè)都不成立, 只能另找他法。 于是便想到利用mysql去解決, 之前了解過(guò)一點(diǎn)mysql的鎖機(jī)制, 知道如果有同時(shí)的兩個(gè)任務(wù)去寫數(shù)據(jù)庫(kù)中同一條記錄, 只有一條會(huì)成功, 這是利用了mysql的排他鎖。
下面就開始代碼演示, 這里主要想給大家的是一個(gè)思路的提示, 代碼還是很簡(jiǎn)單的。
首先需要單獨(dú)創(chuàng)建一張表
CREATE TABLE `t_schedule_cluster` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '@cname:主鍵', `execute` int(1) NOT NULL COMMENT '@cname:執(zhí)行狀態(tài)', `version` int(11) NOT NULL COMMENT '@cname:版本號(hào)\r\n ', `task_name` varchar(128) NOT NULL COMMENT '@cname:任務(wù)名稱\r\n ', `execute_ip` varchar(32) DEFAULT NULL COMMENT '@cname:執(zhí)行ip\r\n ', `update_time` datetime DEFAULT NULL COMMENT '@cname:修改時(shí)間\r\n ', PRIMARY KEY (`id`), KEY `Index_series_id` (`execute`) ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8 COMMENT='@cname:多機(jī)定時(shí)任務(wù)調(diào)度';
看一下建成后的表結(jié)構(gòu), 注釋寫的已經(jīng)很清楚了, 初始化時(shí)需要添加一些定時(shí)任務(wù)的名稱(task_name), 這個(gè)要和你代碼中保持一致, 后面會(huì)提到:

代碼
首先看下我代碼中用到的spring定時(shí)任務(wù):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd"
default-lazy-init="true">
<description>使用Spring的 Scheduled的定時(shí)任務(wù)配置</description>
<!--支持annotation的方式-->
<task:annotation-driven />
<task:scheduler id="springScheduler" pool-size="10"/>
<task:scheduled-tasks scheduler="springScheduler">
<!-- 測(cè)試使用, 項(xiàng)目啟動(dòng)后每隔一分鐘執(zhí)行一次 -->
<task:scheduled ref="listCarAction" method="listCar" cron="0 0/1 0 * * ?"/>
<task:scheduled ref="listCarAction" method="listCar" cron="0 0/1 0 * * ?"/>
</task:scheduled-tasks>
</beans>
相信大家都是用過(guò)這種定時(shí)任務(wù)的設(shè)置方法, 因?yàn)樗莝pring自帶的, 所以使用起來(lái)很方便, 這里我指定了兩個(gè)定時(shí)任務(wù)來(lái)模擬兩臺(tái)機(jī)器的情況, 兩個(gè)定時(shí)任務(wù)都是項(xiàng)目啟動(dòng)后每隔一分鐘執(zhí)行一次。
然后看看這個(gè)listCar中的代碼:
//定時(shí)任務(wù)的名稱, 這個(gè)和數(shù)據(jù)庫(kù)中的task_name是保持一致的, 保證要執(zhí)行該定時(shí)任務(wù)。
public static final String LIST_CAR_TASK = "listCarTask";
private ScheduleClusterTask scheduleClusterTask;
//這個(gè)時(shí)間是根據(jù)spring-scheduler.xml中配置的定時(shí)刷新時(shí)間, 比如說(shuō)我們以后要設(shè)置這個(gè)定時(shí)任務(wù)時(shí)4小時(shí)刷新一次
public static final long maxExpireTime = 4 * 3600;
public void listCar() {
if (scheduleClusterTask.isValidMachine(maxExpireTime, CommonConstants.ScheduleTaskName.LIST_CAR_TASK)) {
//執(zhí)行具體的task方法,
doTask();
//將execute狀態(tài)更新為0
scheduleClusterTask.end(LIST_CAR_TASK);
}
}
最后看下最核心的代碼:ScheduleClusterTask.java
/**
* 多機(jī)定時(shí)任務(wù)工具類
* Created by WangMeng on 2017/4/12.
*/
@Service
public class ScheduleClusterTask {
private ScheduleClusterEntityService scheduleClusterEntityService;
/**
* 這里因?yàn)閮膳_(tái)機(jī)器都有同樣的定時(shí)任務(wù), 會(huì)同時(shí)執(zhí)行這個(gè)方法,只有一臺(tái)機(jī)器可以執(zhí)行成功,返回true。
* @param maxExpireTime 最大的檢查時(shí)間。
* @param taskName 任務(wù)名稱。
* @return
*/
public boolean isValidMachine(long maxExpireTime, String taskName) {
boolean isValid = false;
try {
//通過(guò)taskName去數(shù)據(jù)庫(kù)中查找到該條記錄, 如果大家使用的是mybatis這里需要改一下, 就是一個(gè)簡(jiǎn)單的查詢操作
ScheduleClusterEntity carIndexEntity = scheduleClusterEntityService.findOne(ScheduleClusterEntity.Fields.taskName.eq(taskName));
int execute = carIndexEntity.getExecute();
String ip = InetAddress.getLocalHost().getHostAddress();
long currentTimeMillis = System.currentTimeMillis();
long time = carIndexEntity.getUpdateTime().getTime();
if (execute == 0 && time + maxExpireTime - 1000 < currentTimeMillis) {
isValid = checkMachine(taskName, carIndexEntity, ip);
} else if (time + maxExpireTime - 1000 < currentTimeMillis){
//這里要判斷下, 如果上一次執(zhí)行出現(xiàn)異常導(dǎo)致execute沒有更新為0, 那么這里要判斷上一次更新時(shí)間的間隔。
isValid = checkMachine(taskName, carIndexEntity, ip);
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
return isValid;
}
/**
* end方法主要是將excute(是否正在執(zhí)行的標(biāo)志位,0:沒有執(zhí)行, 1:正在執(zhí)行)更新為0
* @param taskName
* @return
*/
public boolean end (String taskName) {
ScheduleClusterEntity carIndexEntity = scheduleClusterEntityService.findOne(ScheduleClusterEntity.Fields.taskName.eq(taskName));
//將execute狀態(tài)更新為0
return scheduleClusterEntityService.end(carIndexEntity);
}
private boolean checkMachine(String taskName, ScheduleClusterEntity carIndexRefresh, String ip) {
return scheduleClusterEntityService.start(taskName, carIndexRefresh.getVersion(), ip);
}
@Autowired
public void setScheduleClusterEntityService(ScheduleClusterEntityService scheduleClusterEntityService) {
this.scheduleClusterEntityService = scheduleClusterEntityService;
}
}
這里還有start方法, 看看怎樣的操作:
@Repository
public class DefaultScheduleClusterEntityDao extends AbstractDao<ScheduleClusterEntity> implements ScheduleClusterEntityDao {
@Override
public boolean start(String taskName, int version, String ip) {
String sql = "update t_schedule_cluster set execute = 1, " +
"version = ?, execute_ip = ?, update_time = ?" +
" where task_name = ? and version = ?";
Sql s = new Sql(sql);
s.addParam(version + 1);
s.addParam(ip);
s.addParam(SqlTimeUtils.nowTimestamp());
s.addParam(taskName);
s.addParam(version);
return 1 == executeUpdate(s);
}
}
核心的代碼到了這里就沒有了, 代碼確實(shí)是非常非常的簡(jiǎn)單, 有興趣的話大家可以在本地測(cè)試一下就可以。
當(dāng)然還有更多很好地解決方案, 我這里秉承的是最簡(jiǎn)單的處理方式。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java中排序報(bào):Comparison method violates its general contract異常的解
這篇文章主要給大家介紹了關(guān)于java中排序報(bào):Comparison method violates its general contract異常的解決方法,文中介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看吧。2017-06-06
Java8語(yǔ)法糖之Lambda表達(dá)式的深入講解
這篇文章主要給大家介紹了關(guān)于Java8語(yǔ)法糖之Lambda表達(dá)式的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02
mybatis Interceptor對(duì)UpdateTime自動(dòng)處理的實(shí)現(xiàn)方法
這篇文章主要給大家介紹了關(guān)于使用mybatis Interceptor對(duì)UpdateTime自動(dòng)處理的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看吧2018-12-12
SpringBoot利用jackson格式化時(shí)間的三種方法
日常開發(fā)過(guò)程中經(jīng)常會(huì)使用json進(jìn)行數(shù)據(jù)的傳輸,這就涉及到了對(duì)象和json的相互轉(zhuǎn)化,常用的解決方案有:Jackson(推薦)、谷歌的Gson、阿里的Fastjson,這篇文章主要給大家介紹了關(guān)于SpringBoot如何利用jackson格式化時(shí)間的相關(guān)資料,需要的朋友可以參考下2021-06-06
Java中遍歷集合的并發(fā)修改異常解決方案實(shí)例代碼
當(dāng)你遍歷集合的同時(shí),又往集合中添加或者刪除元素,就可能報(bào)并發(fā)修改異常,下面這篇文章主要給大家介紹了關(guān)于Java中遍歷集合的并發(fā)修改異常解決方案的相關(guān)資料,需要的朋友可以參考下2022-12-12

