java結(jié)合HADOOP集群文件上傳下載
對HDFS上的文件進行上傳和下載是對集群的基本操作,在《HADOOP權(quán)威指南》一書中,對文件的上傳和下載都有代碼的實例,但是對如何配置HADOOP客戶端卻是沒有講得很清楚,經(jīng)過長時間的搜索和調(diào)試,總結(jié)了一下,如何配置使用集群的方法,以及自己測試可用的對集群上的文件進行操作的程序。首先,需要配置對應(yīng)的環(huán)境變量:
hadoop_HOME="/home/work/tools/java/hadoop-client/hadoop"
for f in $hadoop_HOME/hadoop-*.jar; do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
for f in $hadoop_HOME/lib/*.jar; do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
hadoopvfs_HOME="/home/work/tools/java/hadoop-client/hadoop-vfs"
for f in $hadoopvfs_HOME/lib/*.jar; do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/work/tools/java/hadoop-client/hadoop/lib/native/Linux-amd64-64/
其中LD_LIBRARY_PATH是在調(diào)用時需要用到的庫的路徑,hadoop_CLASSPATH則是我們hadoop客戶端里各種jar包
有一點需要注意的是最好不要使用HADOOP_HOME這個變量,這個是一個系統(tǒng)使用的環(huán)境變量,最好不要和它沖突
編譯類的方法:
javac -classpath $CLASSPATH:$hadoop_CLASSPATH HDFSUtil.java
運行的方法:
java -classpath $CLASSPATH:$hadoop_CLASSPATH HDFSUtil
但是在實際的使用過程中,會報No Permission之類的錯誤,或者你能保證代碼沒有問題的情況下,在運行的時候也會報一些奇奇怪怪的錯誤
那么問題來了,這是什么鬼?
答案:這是因為沒有配置對應(yīng)集群的配置文件
因為在《HADOOP權(quán)威指南》一書中,弱化了配置的東西,所以在具體使用集群的時候就會出現(xiàn)問題,如何解決呢,這樣子:
this.conf = new Configuration(false);
conf.addResource("./hadoop-site.xml");
conf.addResource("./hadoop-default.xml");
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
為什么會這樣,書上只是很簡單的:
this.conf = new Configuration();
那是因為默認你的集群在本地,所以不需要做配置,但是在實際使用的過程中,各個集群的配置是不同的,所以我們要引入集群的配置
這是非常重要的一點,因為實際使用的過程中我們都是使用的HADOOP的客戶端,而且是已經(jīng)搭好環(huán)境的集群,所以我們需要做好本地的配置
hadoop-site.xml和hadoop-default.xml這兩個文件在所使用的客戶端的conf目錄下,在addResource的時候指定好目錄就行了
將以上所提到的配置,全部配完之后,這個程序才能真正運行起來,所以配置是非常重要的一環(huán)。
以下是對應(yīng)的工具的代碼,有興趣的看一下吧,使用的是文件流的方式來搞的,這樣子也可以打通FTP和HDFS之間文件的互傳:
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class HDFSUtil {
private String hdfs_node = "";
private String hdfs_path = "";
private String file_path = "";
private String hadoop_site = "";
private String hadoop_default = "";
private Configuration conf = null;
public HDFSUtil(String hdfs_node) {
this.hdfs_node = hdfs_node;
}
public String getHdfsNode() {
return this.hdfs_node;
}
public void setHdfsPath(String hdfs_path){
this.hdfs_path = hdfs_path;
}
public String getHdfsPath(){
return this.hdfs_path;
}
public void setFilePath(String file_path){
this.file_path = file_path;
}
public String getFilePath(){
return this.file_path;
}
public void setHadoopSite(String hadoop_site){
this.hadoop_site = hadoop_site;
}
public String getHadoopSite(){
return this.hadoop_site;
}
public void setHadoopDefault(String hadoop_default){
this.hadoop_default = hadoop_default;
}
public String getHadoopDefault(){
return this.hadoop_default;
}
public int setConfigure(boolean flag) {
if (flag == false){
if (this.getHadoopSite() == "" || this.getHadoopDefault() == ""){
return -1;
}
else {
this.conf = new Configuration(false);
conf.addResource(this.getHadoopDefault());
conf.addResource(this.getHadoopSite());
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
return 0;
}
}
this.conf = new Configuration();
return 0;
}
public Configuration getConfigure() {
return this.conf;
}
public int upLoad(String localName, String remoteName) throws FileNotFoundException, IOException {
InputStream inStream = null;
FileSystem fs = null;
try{
inStream = new BufferedInputStream(new FileInputStream(localName));
fs = FileSystem.get(URI.create(this.hdfs_node), this.conf);
OutputStream outStream = fs.create(new Path(remoteName) ,new Progressable() {
public void progress(){
System.out.print('.');
}
});
IOUtils.copyBytes(inStream, outStream, 4096, true);
inStream.close();
return 0;
} catch (IOException e){
inStream.close();
e.printStackTrace();
return -1;
}
}
public int upLoad(InputStream inStream, String remoteName) throws FileNotFoundException, IOException {
FileSystem fs = null;
try{
fs = FileSystem.get(URI.create(this.hdfs_node), this.conf);
OutputStream outStream = fs.create(new Path(remoteName) ,new Progressable() {
public void progress(){
System.out.print('.');
}
});
IOUtils.copyBytes(inStream, outStream, 4096, true);
inStream.close();
return 0;
} catch (IOException e){
inStream.close();
e.printStackTrace();
return -1;
}
}
public int donwLoad(String remoteName, String localName, int lines) throws FileNotFoundException, IOException {
FileOutputStream fos = null;
InputStreamReader isr = null;
BufferedReader br = null;
String str = null;
OutputStreamWriter osw = null;
BufferedWriter buffw = null;
PrintWriter pw = null;
FileSystem fs = null;
InputStream inStream = null;
try {
fs = FileSystem.get(URI.create(this.hdfs_node + remoteName), this.conf);
inStream = fs.open(new Path(this.hdfs_node + remoteName));
fos = new FileOutputStream(localName);
osw = new OutputStreamWriter(fos, "UTF-8");
buffw = new BufferedWriter(osw);
pw = new PrintWriter(buffw);
isr = new InputStreamReader(inStream, "UTF-8");
br = new BufferedReader(isr);
while((str = br.readLine()) != null && lines > 0){
lines--;
pw.println(str);
}
} catch (IOException e){
throw new IOException("Couldn't write.", e);
} finally {
pw.close();
buffw.close();
osw.close();
fos.close();
inStream.close()
}
return 0;
}
//main to test
public static void main(String[] args){
String hdfspath = null;
String localname = null;
String hdfsnode = null;
int lines = 0;
if (args.length == 4){
hdfsnode = args[0];
hdfspath = args[1];
localname = args[2];
lines = Integer.parseInt(args[3]);
}
else{
hdfsnode = "hdfs://nj01-nanling-hdfs.dmop.baidu.com:54310";
hdfspath = "/app/ps/spider/wdmqa/wangweilong/test/HDFSUtil.java";
localname = "/home/work/workspace/project/dhc2-0/dhc/base/ftp/papapa";
lines = 5;
}
HDFSUtil hdfsutil = new HDFSUtil(hdfsnode);
hdfsutil.setFilePath(hdfsutil.getHdfsNode()+hdfspath);
hdfsutil.setHadoopSite("./hadoop-site.xml");
hdfsutil.setHadoopDefault("./hadoop-default.xml");
hdfsutil.setConfigure(false);
try {
hdfsutil.donwLoad(hdfspath, localname, lines);
} catch (IOException e){
e.printStackTrace();
}
}
如果想要了解FTP上文件的下載,請參考這篇文章:
如果想要打通FTP和HDFS文件互傳,只要創(chuàng)建一個類,調(diào)用這兩篇文章中的工具的接口就可以搞定,自己寫的代碼,實測有效。
以上就是本文的全部內(nèi)容了,希望能夠?qū)Υ蠹沂炀氄莆誮ava有所幫助。
請您花一點時間將文章分享給您的朋友或者留下評論。我們將會由衷感謝您的支持!
- Java訪問Hadoop分布式文件系統(tǒng)HDFS的配置說明
- java使用hadoop實現(xiàn)關(guān)聯(lián)商品統(tǒng)計
- Java執(zhí)行hadoop的基本操作實例代碼
- 深入淺析Java Object Serialization與 Hadoop 序列化
- hadoop中實現(xiàn)java網(wǎng)絡(luò)爬蟲(示例講解)
- Java/Web調(diào)用Hadoop進行MapReduce示例代碼
- Hadoop運行時遇到j(luò)ava.io.FileNotFoundException錯誤的解決方法
- hadoop運行java程序(jar包)并運行時動態(tài)指定參數(shù)
- java實現(xiàn)對Hadoop的操作
- 利用Java連接Hadoop進行編程
相關(guān)文章
java.lang.NullPointerException異常的幾種原因及解決方案
本文主要介紹了java.lang.NullPointerException異常的幾種原因及解決方案,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04
idea手動執(zhí)行maven命令的三種實現(xiàn)方式
這篇文章主要介紹了idea手動執(zhí)行maven命令的三種實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08
Java實現(xiàn)多對多網(wǎng)絡(luò)通訊的流程
這篇文章主要介紹了Java實現(xiàn)多對多網(wǎng)絡(luò)通訊的流程,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-04-04
Jmeter解析返回參數(shù)以及解析并操作json方式
本文總結(jié)了Java或JMeter的BeanShell腳本中處理JSON字符串的常用操作,包括解析JSON數(shù)組、JSON對象和嵌套的JSON字符串,并提供了示例代碼和打印結(jié)果2025-02-02
使用IDEA創(chuàng)建Servlet程序的詳細步驟
在學(xué)習(xí)servlet過程中,參考的教程是用eclipse完成的,而我在練習(xí)的過程中是使用IDEA的,在創(chuàng)建servlet程序時遇到了挺多困難,在此記錄一下如何用IDEA完整創(chuàng)建一個servlet程序,感興趣的朋友一起看看吧2024-08-08

