Calling DataX from Java and returning task execution results
DataX
DataX is an offline data synchronization tool/platform that is widely used inside Alibaba Group. It provides efficient data synchronization between many heterogeneous data sources, including MySQL, Oracle, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS) and DRDS.
Introduction
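A business requirement meant we had to use DataX to load data from text files into a database. Until then we had always launched jobs from Python through the datax.py script. A-Wen wanted better control over DataX tasks, so he asked us to modify DataX. The goal was to embed DataX in Java, call it from there, and get back detailed information about how each task ran.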
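Tracing the DataX source code
After downloading the source from GitHub, we started the modification. DataX's entry point is the entry method of the Engine class in the datax-core module. It is a static method.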
public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");
    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);
    String jobPath = cl.getOptionValue("job");
    // If the user does not specify a jobid explicitly, datax.py passes the default value -1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");
    Configuration configuration = ConfigParser.parse(jobPath);
    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }
    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) {
        // Outside standalone mode, jobId must not be -1
        throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
    }
    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
    // Print VM info
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        LOG.info(vmInfo.toString());
    }
    LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");
    LOG.debug(configuration.toJSON());
    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
}
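When jobid is -1, entry() falls back to parseJobIdFromUrl, and the excerpt does not show that method's body. The stdlib-only sketch below assumes the method tries each regex in turn and returns the first captured number, or -1 if nothing matches. JobIdParserSketch and resolveJobId are hypothetical names; only the three patterns are copied from entry().

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of a jobId-from-URL helper; DataX's real parseJobIdFromUrl may differ in detail
class JobIdParserSketch {

    // The same three patterns entry() builds when jobid is -1
    static final List<String> PATTERNS = Arrays.asList(
            "/instance/(\\d{1,})/config.xml",
            "/inner/job/(\\d{1,})/config",
            "/inner/job/(\\d{1,})/taskGroup/");

    // Returns the number captured by the first matching pattern, or -1 if none match
    static long parseJobIdFromUrl(List<String> patterns, String url) {
        for (String p : patterns) {
            Matcher m = Pattern.compile(p).matcher(url);
            if (m.find()) {
                return Long.parseLong(m.group(1));
            }
        }
        return -1L;
    }

    // Mirrors entry(): an explicit jobid wins, otherwise fall back to the job path
    static long resolveJobId(String jobIdString, String jobPath) {
        if (!"-1".equalsIgnoreCase(jobIdString)) {
            return Long.parseLong(jobIdString);
        }
        return parseJobIdFromUrl(PATTERNS, jobPath);
    }
}
```

A local job file, such as the one used in the test at the end of this article, matches none of the patterns. In that case jobId stays -1, and entry() accepts that value only in standalone mode.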
entry() ends by calling engine.start(configuration) to launch the job. If you step into that method, you eventually reach JobContainer's start() method:
@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");
    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            this.preHandle();
            LOG.debug("jobContainer starts to do init ...");
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();
            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
            this.invokeHooks();
        }
    } catch (Throwable e) {
        LOG.error("Exception when job run", e);
        hasException = true;
        if (e instanceof OutOfMemoryError) {
            this.destroy();
            System.gc();
        }
        if (super.getContainerCommunicator() == null) {
            // containerCollector is initialized in scheduler(), so if an exception occurs
            // before scheduler() runs, it has to be initialized here instead
            AbstractContainerCommunicator tempContainerCollector;
            // standalone
            tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
            super.setContainerCommunicator(tempContainerCollector);
        }
        Communication communication = super.getContainerCommunicator().collect();
        // This is the state before reporting; it does not need to be set manually
        // communication.setState(State.FAILED);
        communication.setThrowable(e);
        communication.setTimestamp(this.endTimeStamp);
        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);
        Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
        super.getContainerCommunicator().report(reportCommunication);
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if (!isDryRun) {
            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                // Finally, log the average CPU usage and GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }
                LOG.info(PerfTrace.getInstance().summarizeNoException());
                this.logStatistics();
            }
        }
    }
}
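You can reduce the control flow of start() to a small model that records which phases run. This is an illustrative sketch, not DataX code. LifecycleSketch, its calls list and the failAt parameter are hypothetical. The real catch block also reports a Communication before it rethrows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model of the phase order in JobContainer.start(); not DataX code
class LifecycleSketch {
    // Phases of a real (non-dry) run, in the order start() calls them
    static final List<String> PHASES = Arrays.asList(
            "preHandle", "init", "prepare", "split", "schedule", "post", "postHandle", "invokeHooks");

    // Names of the steps that actually ran, in order
    final List<String> calls = new ArrayList<>();
    // Hypothetical: name of the phase that should throw, or null for a clean run
    private final String failAt;

    LifecycleSketch(String failAt) {
        this.failAt = failAt;
    }

    private void phase(String name) {
        calls.add(name);
        if (name.equals(failAt)) {
            throw new IllegalStateException("failed in " + name);
        }
    }

    void start(boolean isDryRun) {
        boolean hasException = false;
        try {
            if (isDryRun) {
                phase("preCheck");
            } else {
                for (String p : PHASES) {
                    phase(p);
                }
            }
        } catch (RuntimeException e) {
            hasException = true;
            // The real code reports the failure, then rethrows it wrapped in a DataXException
            throw e;
        } finally {
            if (!isDryRun) {
                calls.add("destroy");
                // Statistics are only logged for a successful, non-dry run
                if (!hasException) {
                    calls.add("logStatistics");
                }
            }
        }
    }
}
```

The model makes one consequence explicit. logStatistics() runs only when the job is not a dry run and does not throw. A failed job therefore reaches the caller as an exception, not as a populated result.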
The task information we need is produced in this.logStatistics():
private void logStatistics() {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }
    if (super.getContainerCommunicator() == null) {
        return;
    }
    Communication communication = super.getContainerCommunicator().collect();
    communication.setTimestamp(this.endTimeStamp);
    Communication tempComm = new Communication();
    tempComm.setTimestamp(this.startTransferTimeStamp);
    Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
    // Byte rate
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;
    reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
    reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);
    super.getContainerCommunicator().report(reportCommunication);
    LOG.info(String.format(
            "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n",
            "任务启动时刻", // job start time
            dateFormat.format(startTimeStamp),
            "任务结束时刻", // job end time
            dateFormat.format(endTimeStamp),
            "任务总计耗时", // total job duration
            String.valueOf(totalCosts) + "s",
            "任务平均流量", // average throughput
            StrUtil.stringify(byteSpeedPerSecond) + "/s",
            "记录写入速度", // record write speed
            String.valueOf(recordSpeedPerSecond) + "rec/s",
            "读出记录总数", // total records read
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
            "读写失败总数", // total read/write failures
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    ));
    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
            dateFormat.format(endTimeStamp) + "|" +
            String.valueOf(totalCosts) + "|" +
            StrUtil.stringify(byteSpeedPerSecond) + "|" +
            String.valueOf(recordSpeedPerSecond) + "|" +
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|" +
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    );
    if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
        LOG.info(String.format(
                "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                "Transformer成功记录总数", // records that succeeded in transformers
                communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
                "Transformer失败记录总数", // records that failed in transformers
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
                "Transformer过滤记录总数", // records filtered out by transformers
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
        ));
    }
}
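You can check the duration and speed arithmetic in logStatistics() in isolation. Here is a minimal sketch with a hypothetical ThroughputSketch class. It reproduces the same integer division and the one-second clamp on transferCosts.

```java
// Sketch of the timing arithmetic in logStatistics(); ThroughputSketch is a hypothetical name
final class ThroughputSketch {
    private ThroughputSketch() {
    }

    // Whole seconds, truncated, exactly like (end - start) / 1000
    static long seconds(long startMillis, long endMillis) {
        return (endMillis - startMillis) / 1000;
    }

    // Transfers shorter than one second are clamped to 1 s to avoid division by zero
    static long transferSeconds(long startTransferMillis, long endTransferMillis) {
        long s = seconds(startTransferMillis, endTransferMillis);
        return s == 0L ? 1L : s;
    }

    // Bytes (or records) per second over the transfer window, truncated
    static long perSecond(long count, long startTransferMillis, long endTransferMillis) {
        return count / transferSeconds(startTransferMillis, endTransferMillis);
    }
}
```

Durations are truncated to whole seconds, so a 4.5 s transfer counts as 4 s. The reported speeds for short jobs are therefore approximate. Only transferCosts is clamped, which means totalCosts can still be 0 for a job that finishes within a second.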
Making the changes
Add a new result entity, DataxResult (getters and setters omitted):
public class DataxResult {
    // Job start time
    private long startTimeStamp;
    // Job end time
    private long endTimeStamp;
    // Total job duration (seconds)
    private long totalCosts;
    // Average throughput (bytes/s)
    private long byteSpeedPerSecond;
    // Record write speed (records/s)
    private long recordSpeedPerSecond;
    // Total records read
    private long totalReadRecords;
    // Total read/write failures
    private long totalErrorRecords;
    // Total records that succeeded in transformers
    private long transformerSucceedRecords;
    // Total records that failed in transformers
    private long transformerFailedRecords;
    // Total records filtered out by transformers
    private long transformerFilterRecords;
    // Bytes read successfully
    private long readSucceedBytes;
    // Transfer end time
    private long endTransferTimeStamp;
    // Transfer start time
    private long startTransferTimeStamp;
    // Total transfer duration (seconds)
    private long transferCosts;

    // getters and setters omitted
}
Then rewrite logStatistics so that it returns this entity:
private DataxResult logStatistics(DataxResult resultMsg) {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }
    if (super.getContainerCommunicator() == null) {
        return resultMsg;
    }
    Communication communication = super.getContainerCommunicator().collect();
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;
    // getResultMsg is not shown; its arguments are assumed to follow DataxResult's field order
    return resultMsg.getResultMsg(startTimeStamp,
            endTimeStamp,
            totalCosts,
            byteSpeedPerSecond,
            recordSpeedPerSecond,
            CommunicationTool.getTotalReadRecords(communication),  // totalReadRecords
            CommunicationTool.getTotalErrorRecords(communication), // totalErrorRecords
            communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),
            this.endTransferTimeStamp,
            this.startTransferTimeStamp,
            transferCosts
    );
}
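getResultMsg takes fourteen long values in a fixed order, so swapping or repeating an argument still compiles without any warning. One hedged alternative is chainable setters that each name their field. DataxResultSketch below is a hypothetical, trimmed-down holder, not the article's class.

```java
// Hypothetical, trimmed-down result holder (the article's DataxResult has fourteen fields)
class DataxResultSketch {
    private long totalReadRecords;
    private long totalErrorRecords;
    private long transformerFailedRecords;
    private long transformerFilterRecords;

    // Each setter names the field it fills and returns this, so calls can be chained
    DataxResultSketch totalReadRecords(long value) {
        this.totalReadRecords = value;
        return this;
    }

    DataxResultSketch totalErrorRecords(long value) {
        this.totalErrorRecords = value;
        return this;
    }

    DataxResultSketch transformerFailedRecords(long value) {
        this.transformerFailedRecords = value;
        return this;
    }

    DataxResultSketch transformerFilterRecords(long value) {
        this.transformerFilterRecords = value;
        return this;
    }

    long getTotalReadRecords() { return totalReadRecords; }
    long getTotalErrorRecords() { return totalErrorRecords; }
    long getTransformerFailedRecords() { return transformerFailedRecords; }
    long getTransformerFilterRecords() { return transformerFilterRecords; }
}
```

Inside logStatistics, a call would then read like .totalReadRecords(CommunicationTool.getTotalReadRecords(communication)). Each counter sits next to the field it fills, so a misplaced value is easy to spot in review.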
JobContainer's start() method also needs a version that returns the result:
// @Override compiles only if AbstractContainer also declares start(DataxResult)
@Override
public DataxResult start(DataxResult dataxResult) {
    ...
    DataxResult result = new DataxResult();
    result = logStatistics(dataxResult);
    ...
    return result;
}
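Next, add mock test methods to the Engine class. The mockstart method below mirrors start() but returns the DataxResult. The test further down calls a static mockentry(String[]) method that is not shown. It presumably repeats entry() but ends with engine.mockstart(configuration) instead of engine.start(configuration).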
public DataxResult mockstart(Configuration allConf) {
    ...
    DataxResult dataxResult = new DataxResult();
    return container.start(dataxResult);
}
Testing
In com.alibaba.datax.core.util.container.CoreConstant, change DATAX_HOME to your local path.

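The datax_home directory contains several subdirectories. One of them is job, which holds the job configuration file used in the test below.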
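Editing the constant hard-codes a machine-specific path into the source. In the upstream DataX source, CoreConstant.DATAX_HOME appears to be initialized from System.getProperty("datax.home"). If your checkout does the same, passing -Ddatax.home=... or calling System.setProperty before CoreConstant is first used may be enough. The sketch below shows the class-initialization timing this relies on. HomeConstantSketch and JobPathSketch are hypothetical stand-ins, not DataX classes.

```java
import java.nio.file.Paths;

// Hypothetical stand-in for CoreConstant: the home directory comes from a system property
class HomeConstantSketch {
    // A method call is not a compile-time constant, so this runs when the class is first initialized
    static final String DATAX_HOME = System.getProperty("datax.home");
}

// Hypothetical helper for building the job file path
class JobPathSketch {
    // Joins <home>/job/<jobFile> with the platform's separator instead of a hard-coded "\\"
    static String jobPath(String home, String jobFile) {
        return Paths.get(home, "job", jobFile).toString();
    }
}
```

Set the property before anything touches the constants class, for example on the java command line. After the class has been initialized, later changes to the property are not picked up. Building the path with Paths.get also avoids a hard-coded, Windows-only separator.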

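import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.container.CoreConstant;
import java.io.File;
// Also import DataxResult from the package where you created it

public class DataxTest {
    public static void main(String[] args) {
        String jobFile = CoreConstant.DATAX_HOME + File.separator + "job" + File.separator + "job2.json";
        // Standalone mode; jobid -1 means "not specified"
        String[] dataxArgs = {"-job", jobFile, "-mode", "standalone", "-jobid", "-1"};
        try {
            DataxResult dataxResult = Engine.mockentry(dataxArgs);
            System.out.println("Total records read: " + dataxResult.getTotalReadRecords());
            System.out.println("Total error records: " + dataxResult.getTotalErrorRecords());
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}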
[Screenshot: execution result]
Done!
That is all for this article on calling DataX from Java and returning task execution results. We hope it is a useful reference, and thank you for supporting 脚本之家.

