Cloud Computing Lab: Java MapReduce Programming
Experiment topic:
MapReduce programming
Experiment content:
This experiment uses the Java API provided by Hadoop to write MapReduce programs.
Experiment objectives:
- Master MapReduce programming.
- Understand the principles of MapReduce.
[Lab Assignment] Simple Traffic Statistics
Given a log file like the following:
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230513 00-FD-07-A4-72-B8:CMCC 120.196.40.8 i02.c.aliimg.com 248 0 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230533 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230543 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Video website 1527 2106 200
13926230553 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13826230563 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13926230573 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 Integrated portal 1938 2910 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 i02.c.aliimg.com 3333 21321 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Search Engines 9531 9531 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
This log file records each mobile phone user's network traffic over a period of time. The fields are:
phone number  MAC address  IP address  domain name  upstream traffic (bytes)  downstream traffic (bytes)  plan type
Based on this log, compute each mobile user's total traffic (upstream traffic + downstream traffic) over that period. The output format is:
phone number  total bytes
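For example, the phone number 13726230503 appears three times in the sample above, so its total would be (2481 + 24681) + (9531 + 9531) + (2481 + 24681) = 73386 bytes, and its output line would read "13726230503 73386".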
Experiment results:

Experiment code:
WcMap.java
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each log line: phone MAC IP domain upBytes downBytes planType
        String str = value.toString();
        String[] words = StringUtils.split(str, " ", 10);
        int i = 0;
        for (String word : words) {
            // The downstream and upstream byte counts are the 2nd- and 3rd-to-last fields.
            // Counting from the end also tolerates domain fields that contain spaces
            // (e.g. "Video website"). Emit each count keyed by the phone number.
            if (i == words.length - 2 || i == words.length - 3) {
                context.write(new Text(words[0]), new LongWritable(Long.parseLong(word)));
            }
            i++;
        }
    }
}
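As a comparison only (not part of the original experiment), here is a minimal alternative sketch of the same mapper. It assumes each record ends with the upstream bytes, downstream bytes, and plan type fields, indexes the two traffic fields from the end of the line, and emits their sum in a single write so the shuffle carries one value per record; the class name FlowSumMap is hypothetical.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSumMap extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on any run of whitespace; fields are counted from the end of the line.
        String[] fields = value.toString().trim().split("\\s+");
        if (fields.length < 7) {
            return; // skip malformed records
        }
        long up = Long.parseLong(fields[fields.length - 3]);   // upstream bytes
        long down = Long.parseLong(fields[fields.length - 2]); // downstream bytes
        // Key: phone number (first field); value: combined traffic for this record.
        context.write(new Text(fields[0]), new LongWritable(up + down));
    }
}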
WcReduce.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum every upstream/downstream byte count emitted for this phone number.
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
WcRunner.java
import java.io.IOException;
import java.net.URI;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WcRunner.class);
        job.setMapperClass(WcMap.class);
        job.setReducerClass(WcReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Read the HDFS input and output paths from the console.
        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();
        System.out.print("outputPath:");
        String outputPath = sc.next();

        try {
            FileSystem fs0 = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            Path hdfsPath = new Path(outputPath);
            // Upload the local log file to HDFS and remove any existing output directory.
            fs0.copyFromLocalFile(new Path("/headless/Desktop/workspace/mapreduce/WordCount/data/1.txt"),
                    new Path("/mapreduce/WordCount/input/1.txt"));
            if (fs0.delete(hdfsPath, true)) {
                System.out.println("Directory " + outputPath + " has been deleted successfully!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000" + inputPath));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
        job.waitForCompletion(true);

        // Print the reducer output (part-r-00000) to the console.
        try {
            FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            Path srcPath = new Path(outputPath + "/part-r-00000");
            FSDataInputStream is = fs.open(srcPath);
            System.out.println("Results:");
            while (true) {
                String line = is.readLine();
                if (line == null) {
                    break;
                }
                System.out.println(line);
            }
            is.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
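One optional optimization, not used in the driver above: because the per-phone sum is associative and commutative, the existing WcReduce class could also be registered as a combiner so that partial sums are computed on the map side before the shuffle:
// Optional: reuse the reducer as a combiner to pre-aggregate map output.
job.setCombinerClass(WcReduce.class);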
[Lab Assignment] Inverted Index with Line Numbers
In the inverted index experiment we obtained, for each word, the files it appears in and how many times it appears in each file. Modify that implementation so that the inverted index output also records the exact line numbers at which each word appears in each file. The output format is:
word  filename:lineNo,filename:lineNo,filename:lineNo
Experiment results:
MapReduce appears twice on the first line of 3.txt, so its entry lists the line number 1 twice.

import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyMapper extends Mapper<Object, Text, Text, Text> {
    private Text keyInfo = new Text();
    private Text valueInfo = new Text();
    private FileSplit split;
    // Counts map() invocations; with TextInputFormat each call handles one line,
    // so num is the current line number within this mapper's input split.
    int num = 0;

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        num++;
        split = (FileSplit) context.getInputSplit();
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // Key: "word filename"; value: the line number of this occurrence.
            keyInfo.set(itr.nextToken() + " " + split.getPath().getName());
            valueInfo.set(num + "");
            context.write(keyInfo, valueInfo);
        }
    }
}
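One caveat worth noting: num counts lines only within the current input split, so if an input file were large enough to be divided across several map tasks, line numbers would restart in each split. For the small files used in this lab that is not a problem; if needed, one possible safeguard (an assumption, not part of the original code) is to force each file into a single split in the driver:
// Hypothetical safeguard: with a huge minimum split size, each input file is handed
// to a single map task, so per-split line numbers match per-file line numbers.
FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);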
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class MyCombiner extends Reducer<Text, Text, Text, Text> {
    private Text info = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all line numbers collected for this "word filename" key.
        String sum = "";
        for (Text value : values) {
            sum += value.toString() + " ";
        }
        // Split the key back into word and filename; re-key on the word alone
        // and fold the filename into the value as "filename:lineNumbers".
        String record = key.toString();
        String[] str = record.split(" ");
        key.set(str[0]);
        info.set(str[1] + ":" + sum);
        context.write(key, info);
    }
}
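A design note: Hadoop may run a combiner zero, one, or several times per key, so a combiner that rewrites the key (as MyCombiner does by dropping the filename) only yields the intended format when it runs exactly once for each "word filename" group. One way to avoid depending on that, sketched below as an alternative under the assumption that the reducer then groups the filename:lineNumber values itself (the class name LineIndexMapper is hypothetical), is to emit the word alone as the key and build the filename:lineNumber posting directly in the mapper:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class LineIndexMapper extends Mapper<Object, Text, Text, Text> {
    private final Text word = new Text();
    private final Text posting = new Text();
    private int lineNo = 0; // line number within this mapper's split

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        lineNo++;
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            posting.set(fileName + ":" + lineNo);
            // Key: word; value: "filename:lineNumber" for one occurrence.
            context.write(word, posting);
        }
    }
}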
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, Text, Text, Text> {
    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join the per-file posting lists for this word, separated by " ; ".
        String value = "";
        for (Text value1 : values) {
            value += value1.toString() + " ; ";
        }
        result.set(value);
        context.write(key, result);
    }
}
import java.io.IOException;
import java.net.URI;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyRunner.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setCombinerClass(MyCombiner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Read the HDFS input and output paths from the console.
        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();
        System.out.print("outputPath:");
        String outputPath = sc.next();

        // Remove any existing output directory so the job can write its results.
        try {
            FileSystem fs0 = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            Path hdfsPath = new Path(outputPath);
            if (fs0.delete(hdfsPath, true)) {
                System.out.println("Directory " + outputPath + " has been deleted successfully!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000" + inputPath));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
        job.waitForCompletion(true);

        // Print the reducer output (part-r-00000) to the console.
        try {
            FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            Path srcPath = new Path(outputPath + "/part-r-00000");
            FSDataInputStream is = fs.open(srcPath);
            System.out.println("Results:");
            while (true) {
                String line = is.readLine();
                if (line == null) {
                    break;
                }
                System.out.println(line);
            }
            is.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
This concludes this article on the cloud computing lab: Java MapReduce programming. For more on Java MapReduce programming, please search 腳本之家's earlier articles, and we hope you will continue to support 腳本之家!