Hadoop之Mapreduce序列化
什么是序列化:
序列化就是把內(nèi)存中的對(duì)象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲(chǔ)到磁盤(持久化)和網(wǎng)絡(luò)傳輸。
什么是反序列化:
反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是磁盤的持久化數(shù)據(jù),轉(zhuǎn)換成內(nèi)存中的對(duì)象。
為什么要序列化:
一般來(lái)說(shuō),“活的”對(duì)象只生存在內(nèi)存里,關(guān)機(jī)斷 電就沒(méi)有了。而且“活的”對(duì)象只能由本地的進(jìn)程使用,不能被發(fā)送到網(wǎng)絡(luò)上的另外一臺(tái)計(jì)算機(jī)。 然而序列化可以存儲(chǔ)“活的”對(duì)象,可以將“活的”對(duì)象發(fā)送到遠(yuǎn)程計(jì)算機(jī)。
Java的序列化:
在Java中也是有序列化的,我們?yōu)槭裁床煌ㄟ^(guò)idea使用Java的序列化那?
因?yàn)镴ava的序列化框架(Serializable)是一個(gè)繁重的框架,附帶信息比較多(各種校驗(yàn)信息,Header,繼承體系等),不便于在網(wǎng)絡(luò)中高效傳輸。所以,Hadoop自己開(kāi)發(fā)了一套序列化機(jī)制(Writable)。
Hadoop序列化:
Hadoop的序列化比較精簡(jiǎn),只有簡(jiǎn)單的校驗(yàn),有緊湊(高效使用存儲(chǔ)空間),快速(讀寫數(shù)據(jù)的額外開(kāi)銷小),互操作(支持多語(yǔ)言的交互)的特點(diǎn)。
自定義序列化接口:
在開(kāi)發(fā)過(guò)程中,基本序列化類型不能滿足所有需求,比如在Hadoop框架內(nèi)部傳遞一個(gè)bean對(duì)象(不是基本的數(shù)據(jù)類型(某個(gè)類)----沒(méi)有對(duì)應(yīng)的Hadoop類型),那么該對(duì)象就需要實(shí)現(xiàn)序列化接口。
實(shí)現(xiàn)序列化的步驟:
先看源碼進(jìn)行簡(jiǎn)單分析:
Writable接口(好像也分析不出什么)
兩個(gè)方法:
1.write: 進(jìn)行序列化
2.readFields:進(jìn)行反序列化

(1) 反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有空參構(gòu)造
public FlowBean() {
super();
}(2) 重寫接口中的兩個(gè)方法***(注意:反序列化的順序和序列化的順序完全一致)
如數(shù)據(jù)結(jié)構(gòu)中的隊(duì)列一樣先進(jìn)先出,先序列化則先反序列化
(3)需要重寫toString()方法,因?yàn)樾枰蛴〕鰜?lái),否則打印出來(lái)的是地址
(4) 如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)Comparable接口,因?yàn)镸apReduce框中的Shuffle過(guò)程要求對(duì)key必須能排序。(比如:上一篇博客中的計(jì)算單詞出現(xiàn)次數(shù)中 最后呈現(xiàn)的單詞是按照26個(gè)英文字母的順序進(jìn)行排序的)
看一個(gè)樣例源碼(字符串Text):

看到上圖 實(shí)現(xiàn)接口:
WritableComparable<BinaryComparable>
跟進(jìn)一下:

看到該接口繼承自Comparable接口(這是Java中的一個(gè)API)
序列化案例實(shí)操:
案例需求:
統(tǒng)計(jì)每一個(gè)手機(jī)號(hào)耗費(fèi)的總上行流量、總下行流量、總流量
(1)輸入數(shù)據(jù):

(2)輸入數(shù)據(jù)格式:

(3)期望輸出數(shù)據(jù)格式

需求分析:
先輸入數(shù)據(jù),輸入數(shù)據(jù)后需要進(jìn)行mapper階段---》reduce階段---》輸出階段
mapper階段:
先考慮輸入kv (k---偏移量 v是一行數(shù)據(jù))
輸出(kv)為reduce的輸入(kv) (本樣例中使用的k是手機(jī)號(hào)--統(tǒng)計(jì)手機(jī)號(hào)的流量使用 v為上行流量,下行流量,總流量 則需要封裝bean類(自定義對(duì)象) 再進(jìn)行序列化傳輸(為什么要進(jìn)行序列化那?----因?yàn)樵儆?jì)算的過(guò)程中可能由于資源問(wèn)題mapper和reduce不在同一臺(tái)服務(wù)器上))
輸出(kv)同樣也是(手機(jī)號(hào),bean類)
編寫MapperReduce程序:
1.FlowBean代碼:
package com.tangxiaocong.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*
*
* 定義bean類
* 需要實(shí)現(xiàn)writable
* 重寫序列化和反序列化方法
* 重寫空參構(gòu)造
* 重寫tostring方法
*
* */
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.downFlow+this.upFlow;
}
//生成空參構(gòu)造函數(shù)由于反射 快捷鍵alt + insert
public FlowBean() {
}
@Override
public void write(DataOutput out) throws IOException {
//序列化方法
// 向緩沖流中寫入Long類型的數(shù)據(jù)
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
//反序列化方法
//讀取緩沖區(qū)中數(shù)據(jù)
this.upFlow= in.readLong();
this.downFlow= in.readLong();
this.sumFlow= in.readLong();
}
@Override
public String toString() {
return upFlow + "\t"+downFlow +"\t"+ sumFlow ;
}
}2.Mapper代碼:
package com.tangxiaocong.mapreduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
private Text outK=new Text();
private FlowBean outV=new FlowBean(); //調(diào)用的無(wú)參構(gòu)造函數(shù)
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
//1 獲取一行
//1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
String s = value.toString();// 將數(shù)據(jù)轉(zhuǎn)換成string
//2 進(jìn)行切割
String[] split = s.split("\t"); //將數(shù)據(jù)按寫入形式進(jìn)行切割
//3 抓取想要的數(shù)據(jù)
//根據(jù)角標(biāo)獲取 手機(jī)號(hào) 上行流量 下行流量
String phone = split[1];
String up = split[split.length - 3];// 不能正序 因?yàn)橛械膶傩允菦](méi)有字段的
String down = split[split.length - 2];
// 封裝輸出的kv
outK.set(phone);
outV.setUpFlow(Long.parseLong(up));// up為string類型
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow(); //
//寫出
context.write(outK,outV);
}
}3. reduce代碼:
package com.tangxiaocong.mapreduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer <Text,FlowBean,Text,FlowBean>{
private FlowBean outv=new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
long totalUp=0;
long totaldown=0;
//分析 傳入TEXT 為手機(jī)號(hào) 后邊為集合(Bean類的對(duì)象的集合)輸出還是一個(gè)一個(gè)bean類 (每個(gè)手機(jī)號(hào)的總和)
for (FlowBean value : values) { //傳入的參數(shù)是同一個(gè)key的
totalUp+=value.getUpFlow();
totaldown+=value.getDownFlow();
}
// 現(xiàn)在求出的是每個(gè)手機(jī)號(hào)的總的上行流量 下行流量
//封裝 key不需要
//outv
outv.setUpFlow(totalUp);
outv.setDownFlow(totaldown);
outv.setSumFlow();
//寫出
context.write(key,outv);
}
}4.driver代碼:
package com.tangxiaocong.mapreduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//獲取JOB
Configuration entries = new Configuration();
Job job = Job.getInstance(entries);
job.setJarByClass(FlowDriver.class);
//關(guān)聯(lián)mapper 和reduce
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//設(shè)置mapper 輸出的key 和value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 設(shè)置最終的數(shù)據(jù)輸出的key和value 類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//設(shè)置數(shù)據(jù)的輸入路徑和輸出路徑
FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\phone_data.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output3"));
//提交job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}最后運(yùn)行
出現(xiàn)了bug 經(jīng)過(guò)兩個(gè)小時(shí)的調(diào)試 找出答案 是在driver類中設(shè)置mapper類輸出kv類型出現(xiàn)差錯(cuò)沒(méi)有配型成功
現(xiàn)在是運(yùn)作正確的



到此這篇關(guān)于Hadoop之Mapreduce序列化的文章就介紹到這了,更多相關(guān)Mapreduce序列化內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot使用redis進(jìn)行api防刷限流過(guò)程詳解
這篇文章主要介紹了Springboot使用redis進(jìn)行api防刷限流過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
Java實(shí)現(xiàn)單機(jī)版五子棋游戲的示例代碼
五子棋是世界智力運(yùn)動(dòng)會(huì)競(jìng)技項(xiàng)目之一,是一種兩人對(duì)弈的純策略型棋類游戲,是世界智力運(yùn)動(dòng)會(huì)競(jìng)技項(xiàng)目之一。本文將用java語(yǔ)言實(shí)現(xiàn)單機(jī)版五子棋游戲,感興趣的可以了解一下2022-09-09
基于Java的界面開(kāi)發(fā)詳細(xì)步驟(用戶注冊(cè)登錄)
通過(guò)一段時(shí)間Java Web的學(xué)習(xí),寫一個(gè)簡(jiǎn)單的注冊(cè)登陸界面來(lái)做個(gè)總結(jié),這篇文章主要給大家介紹了基于Java的界面開(kāi)發(fā)(用戶注冊(cè)登錄)的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01
SpringBoot使用Micrometer實(shí)現(xiàn)度量和監(jiān)控
在構(gòu)建和維護(hù)現(xiàn)代應(yīng)用程序時(shí),度量和監(jiān)控是至關(guān)重要的,它們可以幫助您了解應(yīng)用程序的性能、穩(wěn)定性和可用性,本文將介紹如何在Spring Boot應(yīng)用程序中使用Micrometer進(jìn)行度量和監(jiān)控,需要的朋友可以參考下2023-10-10
java數(shù)學(xué)類Math?BigInteger?BigDecimal使用介紹
這篇文章主要為大家介紹了java數(shù)學(xué)類Math、BigInteger、BigDecimal的使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06

