java 中自定義OutputFormat的實例詳解
java 中 自定義OutputFormat的實例詳解
實例代碼:
package com.ccse.hadoop.outputformat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
public class MySelfOutputFormatApp {
public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";
public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";
public final static String OUTPUT_FILENAME = "/abc";
public static void main(String[] args) throws IOException, URISyntaxException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
fileSystem.delete(new Path(OUTPUT_PATH), true);
Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
job.setJarByClass(MySelfOutputFormatApp.class);
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(MyselfOutputFormat.class);
job.waitForCompletion(true);
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private Text word = new Text();
private LongWritable writable = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
if (value != null) {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, writable);
}
}
}
}
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key, new LongWritable(sum));
}
}
public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {
private FSDataOutputStream outputStream = null;
@Override
public RecordWriter<Text, LongWritable> getRecordWriter(
TaskAttemptContext context) throws IOException,
InterruptedException {
try {
FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());
//指定文件的輸出路徑
final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH
+ MySelfOutputFormatApp.OUTPUT_FILENAME);
this.outputStream = fileSystem.create(path, false);
} catch (URISyntaxException e) {
e.printStackTrace();
}
return new MySelfRecordWriter(outputStream);
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
InterruptedException {
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);
}
}
public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> {
private FSDataOutputStream outputStream = null;
public MySelfRecordWriter(FSDataOutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public void write(Text key, LongWritable value) throws IOException,
InterruptedException {
this.outputStream.writeBytes(key.toString());
this.outputStream.writeBytes("\t");
this.outputStream.writeLong(value.get());
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
this.outputStream.close();
}
}
}
2.OutputFormat是用于處理各種輸出目的地的。
2.1 OutputFormat需要寫出去的鍵值對,是來自于Reducer類,是通過RecordWriter獲得的。
2.2 RecordWriter中的write(...)方法只有k和v,寫到哪里去哪?這要通過單獨傳入OutputStream來處理。write就是把k和v寫入到OutputStream中的。
2.3 RecordWriter類位于OutputFormat中的。因此,我們自定義的OutputFromat必須繼承OutputFormat類型。那么,流對象必須在getRecordWriter(...)方法中獲得。
以上就是java 中自定義OutputFormat的實例,如有疑問請留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
相關文章
Java根據(jù)前端傳回的圖片生成pdf并且加密碼和水印
這篇文章主要為大家詳細介紹了java如何根據(jù)前端傳回的png圖片數(shù)組,后端加水印加密碼生成pdf并返回給前端,感興趣的小伙伴可以參考一下2025-01-01
分布式調度XXL-Job整合Springboot2.X實戰(zhàn)操作過程(推薦)
這篇文章主要介紹了分布式調度XXL-Job整合Springboot2.X實戰(zhàn)操作,包括定時任務的使用場景和常見的定時任務,通過本文學習幫助大家該選擇哪個分布式任務調度平臺,對此文感興趣的朋友一起看看吧2022-04-04
idea maven 項目src下的配置文件沒有同步至target的解決操作
這篇文章主要介紹了idea maven 項目src下的配置文件沒有同步至target的解決操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08
Java設計模式之狀態(tài)模式State Pattern詳解
這篇文章主要介紹了Java設計模式之狀態(tài)模式State Pattern,狀態(tài)模式允許一個對象在其內部狀態(tài)改變的時候改變其行為。這個對象看上去就像是改變了它的類一樣2022-11-11
解決IDEA使用maven創(chuàng)建Web項目,出現(xiàn)500錯誤的問題
本文主要介紹了在使用Maven創(chuàng)建項目并導入依賴寫完測試代碼后運行出現(xiàn)500錯誤的解決步驟,這種問題的根本原因是Tomcat啟動后缺少某些支持的jar包,導致運行出錯,解決方法是在項目結構中找到Artifacts,點擊要編輯的項目2024-10-10

