Java多線程實(shí)現(xiàn)FTP批量上傳文件
本文實(shí)例為大家分享了Java多線程實(shí)現(xiàn)FTP批量上傳文件的具體代碼,供大家參考,具體內(nèi)容如下
1、構(gòu)建FTP客戶端
package cn.com.pingtech.common.ftp;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import java.io.*;
import java.net.UnknownHostException;
@Slf4j
public class ?FtpConnection {
? ? private FTPClient ftp = new FTPClient();
? ? private boolean is_connected = false;
? ? /**
? ? ?* 構(gòu)造函數(shù)
? ? ?*/
? ? public FtpConnection() {
? ? ? ? is_connected = false;
? ? ? ? ftp.setDefaultTimeout(FtpConfig.defaultTimeoutSecond * 1000);
? ? ? ? ftp.setConnectTimeout(FtpConfig.connectTimeoutSecond * 1000);
? ? ? ? ftp.setDataTimeout(FtpConfig.dataTimeoutSecond * 1000);
? ? ? ? try {
? ? ? ? ? ? initConnect(FtpConfig.host, FtpConfig.port, FtpConfig.user, FtpConfig.password);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? /**
? ? ?* 初始化連接
? ? ?*
? ? ?* @param host
? ? ?* @param port
? ? ?* @param user
? ? ?* @param password
? ? ?* @throws IOException
? ? ?*/
? ? private void initConnect(String host, int port, String user, String password) throws IOException {
? ? ? ? try {
? ? ? ? ? ? ftp.connect(host, port);
? ? ? ? } catch (UnknownHostException ex) {
? ? ? ? ? ? throw new IOException("Can't find FTP server '" + host + "'");
? ? ? ? }
? ? ? ? int reply = ftp.getReplyCode();//220 連接成功
? ? ? ? if (!FTPReply.isPositiveCompletion(reply)) {
? ? ? ? ? ? disconnect();
? ? ? ? ? ? throw new IOException("Can't connect to server '" + host + "'");
? ? ? ? }
? ? ? ? if (!ftp.login(user, password)) {
? ? ? ? ? ? is_connected = false;
? ? ? ? ? ? disconnect();
? ? ? ? ? ? throw new IOException("Can't login to server '" + host + "'");
? ? ? ? } else {
? ? ? ? ? ? is_connected = true;
? ? ? ? }
? ? }
? ? /**
? ? ?* 上傳文件
? ? ?*
? ? ?* @param path
? ? ?* @param ftpFileName
? ? ?* @param localFile
? ? ?* @throws IOException
? ? ?*/
? ? public boolean upload(String path, String ftpFileName, File localFile) throws IOException {
? ? ? ? boolean is ?= false;
? ? ? ? //檢查本地文件是否存在
? ? ? ? if (!localFile.exists()) {
? ? ? ? ? ? throw new IOException("Can't upload '" + localFile.getAbsolutePath() + "'. This file doesn't exist.");
? ? ? ? }
? ? ? ? //設(shè)置工作路徑
? ? ? ? setWorkingDirectory(path);
? ? ? ? //上傳
? ? ? ? InputStream in = null;
? ? ? ? try {
? ? ? ? ? ? //被動(dòng)模式
? ? ? ? ? ? ftp.enterLocalPassiveMode();
? ? ? ? ? ? in = new BufferedInputStream(new FileInputStream(localFile));
? ? ? ? ? ? //保存文件
? ? ? ? ? ? is = ftp.storeFile(ftpFileName, in);
? ? ? ? }catch (Exception e){
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? finally {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? in.close();
? ? ? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? ? ? ex.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return is;
? ? }
? ? /**
? ? ?* 關(guān)閉連接
? ? ?*
? ? ?* @throws IOException
? ? ?*/
? ? public void disconnect() throws IOException {
? ? ? ? if (ftp.isConnected()) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ftp.logout();
? ? ? ? ? ? ? ? ftp.disconnect();
? ? ? ? ? ? ? ? is_connected = false;
? ? ? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? ? ? ex.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? /**
? ? ?* 設(shè)置工作路徑
? ? ?*
? ? ?* @param dir
? ? ?* @return
? ? ?*/
? ? private boolean setWorkingDirectory(String dir) {
? ? ? ? if (!is_connected) {
? ? ? ? ? ? return false;
? ? ? ? }
? ? ? ? //如果目錄不存在創(chuàng)建目錄
? ? ? ? try {
? ? ? ? ? ? if (createDirecroty(dir)) {
? ? ? ? ? ? ? ? return ftp.changeWorkingDirectory(dir);
? ? ? ? ? ? }
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return false;
? ? }
? ? /**
? ? ?* 是否連接
? ? ?*
? ? ?* @return
? ? ?*/
? ? public boolean isConnected() {
? ? ? ? return is_connected;
? ? }
? ? /**
? ? ?* 創(chuàng)建目錄
? ? ?*
? ? ?* @param remote
? ? ?* @return
? ? ?* @throws IOException
? ? ?*/
? ? private boolean createDirecroty(String remote) throws IOException {
? ? ? ? boolean success = true;
? ? ? ? String directory = remote.substring(0, remote.lastIndexOf("/") + 1);
? ? ? ? // 如果遠(yuǎn)程目錄不存在,則遞歸創(chuàng)建遠(yuǎn)程服務(wù)器目錄
? ? ? ? if (!directory.equalsIgnoreCase("/") && !ftp.changeWorkingDirectory(new String(directory))) {
? ? ? ? ? ? int start = 0;
? ? ? ? ? ? int end = 0;
? ? ? ? ? ? if (directory.startsWith("/")) {
? ? ? ? ? ? ? ? start = 1;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? start = 0;
? ? ? ? ? ? }
? ? ? ? ? ? end = directory.indexOf("/", start);
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? String subDirectory = new String(remote.substring(start, end));
? ? ? ? ? ? ? ? if (!ftp.changeWorkingDirectory(subDirectory)) {
? ? ? ? ? ? ? ? ? ? if (ftp.makeDirectory(subDirectory)) {
? ? ? ? ? ? ? ? ? ? ? ? ftp.changeWorkingDirectory(subDirectory);
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? log.error("mack directory error :/" + subDirectory);
? ? ? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? start = end + 1;
? ? ? ? ? ? ? ? end = directory.indexOf("/", start);
? ? ? ? ? ? ? ? // 檢查所有目錄是否創(chuàng)建完畢
? ? ? ? ? ? ? ? if (end <= start) {
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return success;
? ? }
}2、FTP連接工廠
package cn.com.pingtech.common.ftp;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
/**
?* 連接工廠
?*/
@Slf4j
public class FtpFactory {
? ? //有界隊(duì)列
? ? private static final ArrayBlockingQueue<FtpConnection> arrayBlockingQueue = new ArrayBlockingQueue<>(FtpConfig.ftpConnectionSize);
? ? protected FtpFactory(){
? ? ? ? log.info("init ftpConnectionSize "+FtpConfig.ftpConnectionSize);
? ? ? ? for(int i = 0; i< FtpConfig.ftpConnectionSize; i++){
? ? ? ? ? ? //表示如果可能的話,將 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容納,則返回 true,否則返回 false
? ? ? ? ? ? arrayBlockingQueue.offer(new FtpConnection());
? ? ? ? }
? ? }
? ? /**
? ? ?* 獲取連接
? ? ?*
? ? ?* @return
? ? ?*/
? ? public FtpConnection getFtp() {
? ? ? ? FtpConnection poll = null;
? ? ? ? try {
? ? ? ? ? ? //取走 BlockingQueue 里排在首位的對(duì)象,若 BlockingQueue 為空,阻斷進(jìn)入等待狀態(tài)直到 Blocking 有新的對(duì)象被加入為止
? ? ? ? ? ? poll = arrayBlockingQueue.take();
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return poll;
? ? }
? ? /**
? ? ?* 釋放連接
? ? ?* @param ftp
? ? ?* @return
? ? ?*/
? ? public boolean relase(FtpConnection ftp){
? ? ? ? return arrayBlockingQueue.offer(ftp);
? ? }
? ? /**
? ? ?* 刪除連接
? ? ?*
? ? ?* @param ftp
? ? ?*/
? ? public void remove(FtpConnection ftp) {
? ? ? ? arrayBlockingQueue.remove(ftp);
? ? }
? ? /**
? ? ?* 關(guān)閉連接
? ? ?*/
? ? public void close() {
? ? ? ? for (FtpConnection connection : arrayBlockingQueue) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? connection.disconnect();
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }
}3、FTP配置
package cn.com.pingtech.common.ftp;
/**
?* ftp 配置類
?*/
public class FtpConfig {
? ? public static int defaultTimeoutSecond = 10;
? ? public static int connectTimeoutSecond = 10;
? ? public static int dataTimeoutSecond = 10;
? ? public static String host = "127.0.0.1";
? ? public static int port =9999;
? ? public static String user = "Administrator";
? ? public static String password ="Yp886611";
? ? public static int threadPoolSize = 1;
? ? public static int ftpConnectionSize = 1;
? ??
}4、構(gòu)建多線程FTP上傳任務(wù)
package cn.com.pingtech.common.ftp;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;
/**
?* 上傳任務(wù)
?*/
public class UploadTask implements Callable{
? ? private File file;
? ? private FtpConnection ftp;
? ? private String path;
? ? private String fileName;
? ? private FtpFactory factory;
? ? public UploadTask(FtpFactory factory,FtpConnection ftp, File file, String path, String fileName){
? ? ? ? this.factory = factory;
? ? ? ? this.ftp = ftp;
? ? ? ? this.file = file;
? ? ? ? this.path = path;
? ? ? ? this.fileName = fileName;
? ? }
? ? @Override
? ? public UploadResult call() throws Exception {
? ? ? ? UploadResult result = null;
? ? ? ? try {
? ? ? ? ? ? if (ftp == null) {
? ? ? ? ? ? ? ? result = new UploadResult(file.getAbsolutePath(), false);
? ? ? ? ? ? ? ? return result;
? ? ? ? ? ? }
? ? ? ? ? ? //如果連接未開啟 重新獲取連接
? ? ? ? ? ? if (!ftp.isConnected()) {
? ? ? ? ? ? ? ? factory.remove(ftp);
? ? ? ? ? ? ? ? ftp = new FtpConnection();
? ? ? ? ? ? }
? ? ? ? ? ? //開始上傳
? ? ? ? ? ? result = new UploadResult(file.getName(), ftp.upload(path, fileName, file));
? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? result = new UploadResult(file.getName(), false);
? ? ? ? ? ? ex.printStackTrace();
? ? ? ? } finally {
? ? ? ? ? ? factory.relase(ftp);//釋放連接
? ? ? ? }
? ? ? ? return result;
? ? }
}package cn.com.pingtech.common.ftp;
/**
?* 上傳結(jié)果
?*/
public class UploadResult {
? ? private String fileName; //文件名稱
? ? private boolean result; //是否上傳成功
? ? public UploadResult(String fileName, boolean result) {
? ? ? ? this.fileName = fileName;
? ? ? ? this.result = result;
? ? }
? ? public String getFileName() {
? ? ? ? return fileName;
? ? }
? ? public void setFileName(String fileName) {
? ? ? ? this.fileName = fileName;
? ? }
? ? public boolean isResult() {
? ? ? ? return result;
? ? }
? ? public void setResult(boolean result) {
? ? ? ? this.result = result;
? ? }
? ? public String toString() {
? ? ? ? return "[fileName=" + fileName + " , result=" + result + "]";
? ? }
}注意:實(shí)現(xiàn)Callable接口的任務(wù)線程能返回執(zhí)行結(jié)果
Callable接口支持返回執(zhí)行結(jié)果,此時(shí)需要調(diào)用FutureTask.get()方法實(shí)現(xiàn),此方法會(huì)阻塞線程直到獲取“將來”的結(jié)果,當(dāng)不調(diào)用此方法時(shí),主線程不會(huì)阻塞
5、FTP上傳工具類
package cn.com.pingtech.common.ftp;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
?* ftp上傳工具包
?*/
public class FtpUtil {
? ? /**
? ? ?* 上傳文件
? ? ?*
? ? ?* @param ftpPath
? ? ?* @param listFiles
? ? ?* @return
? ? ?*/
? ? public static synchronized List upload(String ftpPath, File[] listFiles) {
? ? ? ? //構(gòu)建線程池
? ? ? ? ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(FtpConfig.threadPoolSize);
? ? ? ? List<Future> results = new ArrayList<>();
? ? ? ? //創(chuàng)建n個(gè)ftp鏈接
? ? ? ? FtpFactory factory = new FtpFactory();
? ? ? ? for (File file : listFiles) {
? ? ? ? ? ? FtpConnection ftp = factory.getFtp();//獲取ftp con
? ? ? ? ? ? UploadTask upload = new UploadTask(factory,ftp, file, ftpPath, file.getName());
? ? ? ? ? ? Future submit = newFixedThreadPool.submit(upload);
? ? ? ? ? ? results.add(submit);
? ? ? ? }
? ? ? ? List listResults = new ArrayList<>();
? ? ? ? for (Future result : results) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? //獲取線程結(jié)果
? ? ? ? ? ? ? ? UploadResult uploadResult = (UploadResult)result.get(30, TimeUnit.MINUTES);
? ? ? ? ? ? ? ? listResults.add(uploadResult);
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? factory.close();
? ? ? ? newFixedThreadPool.shutdown();
? ? ? ? return listResults;
? ? }
}6、測(cè)試上傳
package cn.com.pingtech.common.ftp
class Client {
? ? public static void main(String[] args) throws IOException {
? ? ? ? String loalPath = "C:\\Users\\Administrator\\Desktop\\test\\0";
? ? ? ? String ftpPath = "/data/jcz/";
? ? ? ? File parentFile = new File(loalPath);
? ? ? ? List <UploadResult> list = FtpUtil.upload(ftpPath,parentFile.listFiles());
? ? ? ? for(UploadResult vo:list){
? ? ? ? ? ? System.out.println(vo);
? ? ? ? }
? ? ? ??
? ? }
}注意:FTP協(xié)議里面,規(guī)定文件名編碼為iso-8859-1,所以目錄名或文件名需要轉(zhuǎn)碼
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot之restTemplate的配置及使用方式
這篇文章主要介紹了Springboot之restTemplate的配置及使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10
Java數(shù)據(jù)結(jié)構(gòu) 遞歸之迷宮回溯案例講解
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)遞歸之迷宮回溯案例講解,本篇文章通過簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08
SpringBoot 使用 @Value 注解讀取配置文件給靜態(tài)變量賦值
這篇文章主要介紹了SpringBoot 使用 @Value 注解讀取配置文件給靜態(tài)變量賦值,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
Java實(shí)現(xiàn)定時(shí)讀取json文件里內(nèi)容的示例代碼
有時(shí)候我們會(huì)需要定時(shí)來讀取JSON配置文件里的內(nèi)容,來執(zhí)行一些業(yè)務(wù)邏輯上的操作,本文就介紹了Java實(shí)現(xiàn)定時(shí)讀取json文件里內(nèi)容的示例代碼,感興趣的可以了解一下2023-08-08
使用socket實(shí)現(xiàn)網(wǎng)絡(luò)聊天室和私聊功能
這篇文章主要介紹了使用socket實(shí)現(xiàn)網(wǎng)絡(luò)聊天室和私聊功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-12-12
Java JSch遠(yuǎn)程執(zhí)行Shell命令的方法
本文主要介紹了Java JSch遠(yuǎn)程執(zhí)行Shell命令,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02
Spring boot打包jar分離lib和resources方法實(shí)例
這篇文章主要介紹了Spring boot打包jar分離lib和resources方法實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05
Spring Boot整合MyBatis連接Oracle數(shù)據(jù)庫的步驟全紀(jì)錄
這篇文章主要給大家介紹了關(guān)于Spring Boot整合MyBatis連接Oracle數(shù)據(jù)庫的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-07-07
SpringBoot中實(shí)現(xiàn)Redis?Stream隊(duì)列的代碼實(shí)例
本文介紹了如何在Spring?Boot中使用Redis?Stream隊(duì)列進(jìn)行消息的生產(chǎn)和消費(fèi),涉及到的主要內(nèi)容包括添加Redis依賴、配置RedisTemplate、創(chuàng)建生產(chǎn)者和消費(fèi)者監(jiān)聽器等,需要的朋友可以參考下2024-09-09

