Hadoop MultipleOutputs輸出到多個文件中的實現(xiàn)方法
Hadoop MultipleOutputs輸出到多個文件中的實現(xiàn)方法
1.輸出到多個文件或多個文件夾:
驅(qū)動中不需要額外改變,只需要在MapClass或Reduce類中加入如下代碼
private MultipleOutputs<Text,IntWritable> mos;
public void setup(Context context) throws IOException,InterruptedException {
mos = new MultipleOutputs(context);
}
public void cleanup(Context context) throws IOException,InterruptedException {
mos.close();
}
然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key, value);
在MapClass或Reduce中使用,輸出時也會有默認的文件part-m-00*或part-r-00*,不過這些文件是無內(nèi)容的,大小為0. 而且只有part-m-00*會傳給Reduce。
注意:multipleOutputs.write(key, value, baseOutputPath)方法的第三個函數(shù)表明了該輸出所在的目錄(相對于用戶指定的輸出目錄)。
如果baseOutputPath不包含文件分隔符“/”,那么輸出的文件格式為baseOutputPath-r-nnnnn(name-r-nnnnn);
如果包含文件分隔符“/”,例如baseOutputPath=“029070-99999/1901/part”,那么輸出文件則為029070-99999/1901/part-r-nnnnn
2.案例-需求
需求,下面是有些測試數(shù)據(jù),要對這些數(shù)據(jù)按類目輸出到output中:
1512,iphone5s,4英寸,指紋識別,A7處理器,64位,M7協(xié)處理器,低功耗 1512,iphone5,4英寸,A6處理器,IOS7 1512,iphone4s,3.5英寸,A5處理器,雙核,經(jīng)典 50019780,ipad,9.7英寸,retina屏幕,豐富的應用 50019780,yoga,聯(lián)想,待機18小時,外形獨特 50019780,nexus 7,華碩&google,7英寸 50019780,ipad mini 2,retina顯示屏,蘋果,7.9英寸 1101,macbook air,蘋果超薄,OS X mavericks 1101,macbook pro,蘋果,OS X lion 1101,thinkpad yoga,聯(lián)想,windows 8,超級本
3.Mapper程序:
package cn.edu.bjut.multioutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MultiOutPutMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString().trim();
if(null != line && 0 != line.length()) {
String[] arr = line.split(",");
context.write(new IntWritable(Integer.parseInt(arr[0])), value);
}
}
}
4.Reducer程序:
package cn.edu.bjut.multioutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class MultiOutPutReducer extends
Reducer<IntWritable, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> multipleOutputs = null;
@Override
protected void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for(Text text : values) {
multipleOutputs.write("KeySpilt", NullWritable.get(), text, key.toString()+"/");
multipleOutputs.write("AllPart", NullWritable.get(), text);
}
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
if(null != multipleOutputs) {
multipleOutputs.close();
multipleOutputs = null;
}
}
}
5.主程序:
package cn.edu.bjut.multioutput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MainJob {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "aaa");
job.setJarByClass(MainJob.class);
job.setMapperClass(MultiOutPutMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MultiOutPutReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
MultipleOutputs.addNamedOutput(job, "KeySpilt", TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "AllPart", TextOutputFormat.class, NullWritable.class, Text.class);
Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
}
如有疑問請留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
相關文章
Springboot項目使用Slf4j將日志保存到本地目錄的實現(xiàn)代碼
這篇文章主要介紹了Springboot項目使用Slf4j將日志保存到本地目錄的實現(xiàn)方法,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05
Spring根據(jù)URL參數(shù)進行路由的方法詳解
這篇文章主要給大家介紹了關于Spring根據(jù)URL參數(shù)進行路由的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起來看看吧。2017-12-12

