hadoop上傳文件功能實例代碼
hdfs上的文件是手動執(zhí)行命令從本地linux上傳至hdfs的。在真實的運行環(huán)境中,我們不可能每次手動執(zhí)行命令上傳的,這樣太過繁瑣。那么,我們可以使用hdfs提供的Java api實現(xiàn)文件上傳至hdfs,或者直接從ftp上傳至hdfs。
然而,需要說明一點,之前筆者是要運行MR,都需要每次手動執(zhí)行yarn jar,在實際的環(huán)境中也不可能每次手動執(zhí)行。像我們公司是使用了索答的調度平臺/任務監(jiān)控平臺,可以定時的以工作流執(zhí)行我們的程序,包括普通java程序和MR。其實,這個調度平臺就是使用了quartz。當然,這個調度平臺也提供其它的一些功能,比如web展示、日志查看等,所以也不是免費的。
首先,給大家簡單介紹一下hdfs。hdfs是以流式數(shù)據(jù)訪問模式來存儲超大文件,hdfs的構建思路是一次寫入,多次讀取,這樣才是最高效的訪問模式。hdfs是為高數(shù)據(jù)吞吐量應用優(yōu)化的,所以會以提高時間延遲為代價。對于低延時的訪問需求,我們可以使用hbase。
然后,還要知道hdfs中塊(block)的概念,默認為64MB。塊是hdfs的數(shù)據(jù)讀寫的最小單位,通常每個map任務一次只處理一個block,像我們對集群性能評估就會使用到這個概念,比如目前有多少節(jié)點,每個節(jié)點的磁盤空間、cpu以及所要處理的數(shù)據(jù)量、網(wǎng)絡帶寬,通過這些信息來進行性能評估。我們可以使用Hadoop fsck / -files -blocks列出文件系統(tǒng)中各個文件由哪些塊構成。
然后,再就是要知道namenode和datanode,這個在之前的博文已經(jīng)介紹過,下面看看cm環(huán)境中hdfs的管理者(namenode)和工作者(datanode),如下

在yarn環(huán)境中是可以有多個nameNode的。此環(huán)境中沒有SecondaryNameNode,當然也可以有。
好了,關于hdfs的基本概念就講到這兒了,下面來看看具體的代碼。
一、java實現(xiàn)上傳本地文件至hdfs
這里,可以直接使用hdfs提供的java api即可實現(xiàn),代碼如下:
package com.bjpowernode.hdfs.local;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* ClassName:UploadLocalFileToHdfs <br/>
* Function: 本地文件上傳至hdfs. <br/>
* Date: 2016年3月28日 下午10:06:05 <br/>
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class UploadLocalFileToHdfs {
public static void main(String[] args) {
Configuration conf = new Configuration();
String localDir = "/home/qiyongkang";
String hdfsDir = "/qiyongkang";
try{
Path localPath = new Path(localDir);
Path hdfsPath = new Path(hdfsDir);
FileSystem hdfs = FileSystem.get(conf);
hdfs.copyFromLocalFile(localPath, hdfsPath);
}catch(Exception e){
e.printStackTrace();
}
}
}
注意,這里hdfs上傳目錄如果不存在的話,hdfs會自動創(chuàng)建,比較智能。
打完包后,上傳至服務器,執(zhí)行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,然后執(zhí)行hadoop fs -ls /qiyongkang便可看到:

二、java實現(xiàn)上傳ftp上的文件至hdfs
首先,我們得準備一個ftp服務器,關于ftp服務器的搭建,大家可以查閱資料,筆者就不贅述了。
其實,從ftp上拉取文件上傳到hdfs上,這個過程大家不要想復雜了,我們講本地文件上傳到hdfs,其實就是采用流的方式。因此,我們可以直接讀取ftp上的文件流,然后以流的方式寫入到hdfs。
下面,直接貼出代碼:
package com.bjpowernode.hdfs.ftp;
import java.io.InputStream;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
* ClassName:UploadFtpFileToHdfs <br/>
* Function: TODO ADD FUNCTION. <br/>
* Reason: TODO ADD REASON. <br/>
* Date: 2016年3月28日 下午10:50:37 <br/>
*
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class UploadFtpFileToHdfs {
public static void main(String[] args) {
Configuration conf = new Configuration();
loadFromFtpToHdfs("172.31.26.200", "qiyongkang", "qyk123456", "/www/input/", "/qiyongkang/", conf);
}
/**
*
* loadFromFtpToHdfs:將數(shù)據(jù)從ftp上傳到hdfs上. <br/>
*
* @author qiyongkang
* @param ip
* @param username
* @param password
* @param filePath
* @param outputPath
* @param conf
* @return
* @since JDK 1.6
*/
private static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath,
String outputPath, Configuration conf) {
FTPClient ftp = new FTPClient();
InputStream inputStream = null;
FSDataOutputStream outputStream = null;
boolean flag = true;
try {
ftp.connect(ip);
ftp.login(username, password);
ftp.setFileType(FTP.BINARY_FILE_TYPE);
ftp.setControlEncoding("UTF-8");
int reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
}
FTPFile[] files = ftp.listFiles(filePath);
FileSystem hdfs = FileSystem.get(conf);
for (FTPFile file : files) {
if (!(file.getName().equals(".") || file.getName().equals(".."))) {
inputStream = ftp.retrieveFileStream(filePath + file.getName());
outputStream = hdfs.create(new Path(outputPath + file.getName()));
IOUtils.copyBytes(inputStream, outputStream, conf, false);
if (inputStream != null) {
inputStream.close();
ftp.completePendingCommand();
}
}
}
ftp.disconnect();
} catch (Exception e) {
flag = false;
e.printStackTrace();
}
return flag;
}
}
然后同樣打包上傳后執(zhí)行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,便可看到:

總結
以上所述是小編給大家介紹的hadoop上傳文件功能實例代碼,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關文章
詳解springmvc控制登錄用戶session失效后跳轉登錄頁面
本篇文章主要介紹了springmvc控制登錄用戶session失效后跳轉登錄頁面,session一旦失效就需要重新登陸,有興趣的同學可以了解一下。2017-01-01
Spring boot jpa 刪除數(shù)據(jù)和事務管理的問題實例詳解
這篇文章主要介紹了Spring boot jpa 刪除數(shù)據(jù)和事務管理的問題實例詳解,涉及業(yè)務場景的一些知識和遇到的的問題,需要的朋友可以參考。2017-09-09

