Java之Rsync并發(fā)遷移數(shù)據(jù)并校驗(yàn)詳解
java調(diào)用Rsync并發(fā)遷移數(shù)據(jù)并執(zhí)行校驗(yàn)
java代碼如下
RsyncFile.java
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import java.io.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
/**
* @ClassName RsyncFile
* @Descriptiom TODO rsync多線程同步遷移數(shù)據(jù)
* @Author KING
* @Date 2019/11/25 09:17
* @Version 1.2.2
* rsync -vzrtopg --progress --delete //鏡像同步
**/
@NoArgsConstructor
public class RsyncFile implements Runnable{
private static final int availProcessors = Runtime.getRuntime().availableProcessors();
//構(gòu)造以cpu核心數(shù)為核心池,cpu線程數(shù)為最大池,超時(shí)時(shí)間為1s,線程隊(duì)列為大小為無(wú)界的安全阻塞線程隊(duì)列,拒絕策略為DiscardOldestPolicy()的線程池。(同步數(shù)據(jù)當(dāng)然不能丟下拒絕任務(wù))
private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1,
availProcessors,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
//保存掃描得到的文件列表
private static ArrayList<String> fileNameList = new ArrayList<String>();
private String shellname;
private String filename;
private String userip;
private CountDownLatch countDownLatch;
private static int deep = 0;
public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) {
this.shellname = ShellName;
this.filename = filename;
this.userip = UserIP;
this.countDownLatch = countDownLatch;
}
public static void main(String[] args) {
try {
new RsyncFile().Do(args[0],args[1],Integer.parseInt(args[2]));
}catch (ArrayIndexOutOfBoundsException e){
System.out.println(e);
System.out.println("Error , args send fault");
System.out.println("please send localAddress remote username @ remote IP or hostname and catalogue");
System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]");
}catch(NumberFormatException e1){
System.out.println(e1);
System.out.println("please input Right Directory depth, this number must be int");
System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]");
}
}
@SneakyThrows
private void Do(String content,String UserIP,int setdeep){
System.out.println("開(kāi)始執(zhí)行");
System.out.println("開(kāi)始時(shí)間:" + new Date());
Long a = System.nanoTime();
File file = new File(content);
System.out.println("開(kāi)始掃描本地指定目錄");
GetAllFile(file,setdeep);//按深度掃描非空文件夾和文件
System.out.println("掃描本地目錄完成");
//給腳本賦予權(quán)限
String [] cmd={"/bin/sh","-c","chmod 755 ./do*"};
Runtime.getRuntime().exec(cmd);
//創(chuàng)建遠(yuǎn)端目錄操作
System.out.println("開(kāi)始創(chuàng)建遠(yuǎn)端目錄結(jié)構(gòu)");
//一次計(jì)數(shù)鎖用于保證目錄創(chuàng)建完成
CountDownLatch doDirLock = new CountDownLatch(1);
ThreadPool.execute(new RsyncFile("./doDirc.sh",content,UserIP,doDirLock));
doDirLock.await();
System.out.println("創(chuàng)建遠(yuǎn)端目錄結(jié)構(gòu)完成");
//開(kāi)始同步工作
System.out.println("開(kāi)始執(zhí)行同步工作");
System.out.println("同步的文件夾或文件總數(shù): " + fileNameList.size());
System.out.println("正在同步。。。。。");
//fileNameList.size()次計(jì)數(shù)鎖用于保證數(shù)據(jù)同步完成(保證計(jì)時(shí)準(zhǔn)確)
CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size());
System.out.println(fileNameList.size());
for (String fileName:fileNameList) {
//除去文件名中與UserIP重復(fù)的文件路徑
String RemoteDir = UserIP.concat(fileName.replace(content, ""));
System.out.println("要同步的本地目錄或文件: " + fileName);
System.out.println("要同步的遠(yuǎn)端目錄或文件: " + RemoteDir);
ThreadPool.execute(new RsyncFile("./doRync.sh",fileName, RemoteDir,rsyncLock));
}
rsyncLock.await();
System.out.println("執(zhí)行同步工作完成");
//開(kāi)始文件校驗(yàn)工作
System.out.println("執(zhí)行文件校驗(yàn)及重傳");
//一次計(jì)數(shù)鎖用于保證校驗(yàn)完成
CountDownLatch chechSumLock = new CountDownLatch(1);
ThreadPool.execute(new RsyncFile("./doChecksum.sh",content,UserIP,chechSumLock));
chechSumLock.await();
System.out.println("文件校驗(yàn)及重傳完成");
ThreadPool.shutdown();
Long b = System.nanoTime();
Long aLong = (b - a)/1000000L;
System.out.println("處理時(shí)間" + aLong + "ms");
System.out.println("結(jié)束時(shí)間:" + new Date());
}
/**
* 執(zhí)行rsync腳本的線程方法,使用PrintWriter來(lái)與linux Terminal交互
*/
@Override
public void run() {
try {
String command=shellname.concat(" ").concat(filename).concat(" ").concat(userip);
File wd = new File("/bin");
Process process = null;
process = Runtime.getRuntime().exec("/bin/bash", null, wd);
if (process != null) {
InputStream is = process.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())),
true);
//切換到當(dāng)前class文件所在目錄
out.println("cd " + System.getProperty("user.dir"));
out.println(command);
out.println("exit");
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line + System.lineSeparator());
}
process.waitFor();
reader.close();
out.close();
process.destroy();
System.out.println("result:" + sb.toString());
}else {
System.out.println("找不到系統(tǒng)bash工具,請(qǐng)檢查系統(tǒng)是否異常,并為系統(tǒng)創(chuàng)建/bin/sh的bash工具軟連接");
}
} catch (Exception e) {
System.err.println(e.getMessage());
}finally {
//倒記數(shù)鎖釋放一次
countDownLatch.countDown();
}
}
/**遍歷指定的目錄并能指定深度
* @param file 指定要遍歷的目錄
* @param setDeep 設(shè)定遍歷深度
*/
@SneakyThrows
private static void GetAllFile(File file, int setDeep) {
if(file != null){
if(file.isDirectory() && deep<setDeep){
deep++;
File f[] = file.listFiles();
if(f != null) {
int length = f.length;
for(int i = 0; i < length; i++)
GetAllFile(f[i],setDeep);
deep--;
}
} else {
if(file.isDirectory()){
//如果為目錄末尾添加 / 保證rsync正常處理
fileNameList.add(file.getAbsolutePath().concat("/"));
}else {
fileNameList.add(file.getAbsolutePath());
}
}
}
}
}
doDir.sh
rsync -av --include='*/' --exclude='*' $1 $2 |tee -a /tmp/rsync.log 2>&1 echo "創(chuàng)建目錄結(jié)構(gòu)操作"
doRsync.sh
rsync -avzi --stats --progress $1 $2 |tee -a /tmp/rsync.log 2>&1
doChecksum.sh
rsync -acvzi --stats --progress $1 $2 |tee -a /tmp/checksum.log 2>&1
附錄
rsync輸出日志說(shuō)明如下
YXcstpoguax path/to/file
|||||||||||
||||||||||╰- x: The extended attribute information changed
|||||||||╰-- a: The ACL information changed
||||||||╰--- u: The u slot is reserved for future use
|||||||╰---- g: Group is different
||||||╰----- o: Owner is different
|||||╰------ p: Permission are different
||||╰------- t: Modification time is different
|||╰-------- s: Size is different
||╰--------- c: Different checksum (for regular files), or
|| changed value (for symlinks, devices, and special files)
|╰---------- the file type:
| f: for a file,
| d: for a directory,
| L: for a symlink,
| D: for a device,
| S: for a special file (e.g. named sockets and fifos)
╰----------- the type of update being done::
<: file is being transferred to the remote host (sent)
>: file is being transferred to the local host (received)
c: local change/creation for the item, such as:
- the creation of a directory
- the changing of a symlink,
- etc.
h: the item is a hard link to another item (requires
--hard-links).
.: the item is not being updated (though it might have
attributes that are being modified)
*: means that the rest of the itemized-output area contains
a message (e.g. "deleting")
到此這篇關(guān)于Java之Rsync并發(fā)遷移數(shù)據(jù)并校驗(yàn)詳解的文章就介紹到這了,更多相關(guān)Java之Rsync并發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java并發(fā)編程之代碼實(shí)現(xiàn)兩玩家交換裝備
- Java并發(fā)編程之阻塞隊(duì)列(BlockingQueue)詳解
- java實(shí)戰(zhàn)案例之用戶注冊(cè)并發(fā)送郵件激活/發(fā)送郵件驗(yàn)證碼
- JAVA并發(fā)圖解
- java并發(fā)編程JUC CountDownLatch線程同步
- Java并發(fā)之Condition案例詳解
- java并發(fā)編程之ThreadLocal詳解
- Java 處理高并發(fā)負(fù)載類優(yōu)化方法案例詳解
- 淺談Java高并發(fā)解決方案以及高負(fù)載優(yōu)化方法
- Java httpClient連接池支持多線程高并發(fā)的實(shí)現(xiàn)
- Java中常見(jiàn)的并發(fā)控制手段淺析
- Java面試題沖刺第二十四天--并發(fā)編程
- Java 模擬真正的并發(fā)請(qǐng)求詳情
相關(guān)文章
httpclient 請(qǐng)求http數(shù)據(jù),json轉(zhuǎn)map的實(shí)例
下面小編就為大家?guī)?lái)一篇httpclient 請(qǐng)求http數(shù)據(jù),json轉(zhuǎn)map的實(shí)例。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12
springcloud整合seata的實(shí)現(xiàn)代碼
這篇文章主要介紹了springcloud整合seata的實(shí)現(xiàn)方法,整合步驟通過(guò)引入spring-cloud-starter-alibaba-seata?jar包,文中結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-05-05
TCP/IP協(xié)議中三次握手四次揮手的原理及流程分析
這篇文章主要介紹了TCP/IP協(xié)議中三次握手四次揮手的原理及流程分析,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11
spring boot入門(mén)開(kāi)始你的第一個(gè)應(yīng)用
這篇文章主要介紹了spring boot入門(mén)開(kāi)始你的第一個(gè)應(yīng)用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下2019-06-06
IntelliJ IDEA里找不到j(luò)avax.servlet的jar包的解決方法
這篇文章主要介紹了IntelliJ IDEA里找不到j(luò)avax.servlet的jar包的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
Spring Boot 定義系統(tǒng)啟動(dòng)任務(wù)的多種方式
這篇文章主要介紹了Spring Boot 定義系統(tǒng)啟動(dòng)任務(wù)的多種方式,看看你都會(huì)哪幾種 ,感興趣的朋友跟隨小編一起看看吧2019-04-04
關(guān)于springcloud報(bào)錯(cuò)報(bào)UnsatisfiedDependencyException的問(wèn)題
這篇文章主要介紹了關(guān)于springcloud報(bào)錯(cuò)報(bào)UnsatisfiedDependencyException的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11

