java實(shí)現(xiàn)MapReduce對文件進(jìn)行切分的示例代碼
比如有海量的文本文件,如訂單,頁面點(diǎn)擊事件的記錄,量特別大,很難搞定。
那么我們該怎樣解決海量數(shù)據(jù)的計(jì)算?
1、獲取總行數(shù)
2、計(jì)算每個(gè)文件中存多少數(shù)據(jù)
3、split切分文件
4、reduce將文件進(jìn)行匯總

例如這里有百萬條數(shù)據(jù),單個(gè)文件操作太麻煩,所以我們需要進(jìn)行切分
在切分文件的過程中會(huì)出現(xiàn)文件不能整個(gè)切分的情況,可能有剩下的數(shù)據(jù)并沒有被讀取到,所以我們每個(gè)切分128條數(shù)據(jù),不足128條再保留到一個(gè)文件中

創(chuàng)建MapTask
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MapTask extends Thread {
//用來接收具體的哪一個(gè)文件
private File file;
private int flag;
public MapTask(File file, int flag) {
this.file = file;
this.flag = flag;
}
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(file));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
/**
* 統(tǒng)計(jì)班級(jí)人數(shù)HashMap存儲(chǔ)
*/
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
br.close();
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
String key = entry.getKey();
Integer value = entry.getValue();
bw.write(key + ":" + value);
bw.newLine();
}
bw.flush();
bw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
創(chuàng)建Map
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Map {
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 多線程連接池(線程池)
ExecutorService executorService = Executors.newFixedThreadPool(8);
// 獲取文件列表
File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");
File[] files = file.listFiles();
//創(chuàng)建多線程對象
int flag = 0;
for (File f : files) {
//為每一個(gè)文件啟動(dòng)一個(gè)線程
MapTask mapTask = new MapTask(f, flag);
executorService.submit(mapTask);
flag++;
}
executorService.shutdown();
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
創(chuàng)建ClazzSum
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
public class ClazzSum {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
System.out.println(map);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
創(chuàng)建split128
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;
public class Split128 {
public static void main(String[] args) throws Exception {
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));
//用作標(biāo)記文件,也作為文件名稱
int index = 0;
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
ArrayList<String> list = new ArrayList<String>();
String line;
//用作累計(jì)讀取了多少行數(shù)據(jù)
int flag = 0;
int row = 0;
while ((line = br.readLine()) != null) {
list.add(line);
flag++;
// flag = 140
if (flag == 140) {// 一個(gè)文件讀寫完成,生成新的文件
row = 0 + 128 * index;
for (int i = row; i <= row + 127; i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
/**
* 生成新的文件
* 計(jì)數(shù)清零
*/
index++;
flag = 12;
bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
}
}
//文件讀取剩余128*1.1范圍之內(nèi)
for (int i = list.size() - flag; i < list.size(); i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
}
}
創(chuàng)建Reduce
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;
public class Reduce {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
HashMap<String, Integer> map = new HashMap<String, Integer>();
File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");
File[] files = file.listFiles();
for (File f : files) {
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
while ((line = br.readLine()) != null) {
String clazz = line.split(":")[0];
int sum = Integer.valueOf(line.split(":")[1]);
if (!map.containsKey(clazz)) {
map.put(clazz, sum);
} else {
map.put(clazz, map.get(clazz) + sum);
}
}
}
long end = System.currentTimeMillis();
System.out.println(end-start);
System.out.println(map);
}
}


最后將文件切分了8份,這里采用了線程池,建立線程連接,多個(gè)線程同時(shí)啟動(dòng),比單一文件采用多線程效率更高更好使。
到此這篇關(guān)于java實(shí)現(xiàn)MapReduce對文件進(jìn)行切分的示例代碼的文章就介紹到這了,更多相關(guān)java MapReduce 文件切分內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot注解Aspect實(shí)現(xiàn)方案
本文提供一種自定義注解,來實(shí)現(xiàn)業(yè)務(wù)審批操作的DEMO,不包含審批流程的配置功能。對springboot注解Aspect實(shí)現(xiàn)方案感興趣的朋友一起看看吧2022-01-01
Spring實(shí)戰(zhàn)之類級(jí)別緩存實(shí)現(xiàn)與使用方法
這篇文章主要介紹了Spring實(shí)戰(zhàn)之類級(jí)別緩存實(shí)現(xiàn)與使用方法,結(jié)合實(shí)例形式分析了Spring類級(jí)別緩存配置、屬性、領(lǐng)域模型等相關(guān)操作技巧,需要的朋友可以參考下2020-01-01
SpringCloud超詳細(xì)講解微服務(wù)網(wǎng)關(guān)Zuul
這篇文章主要介紹了SpringCloud Zuul微服務(wù)網(wǎng)關(guān),負(fù)載均衡,熔斷和限流,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-07-07
解決redisTemplate中l(wèi)eftPushAll隱性bug的問題
這篇文章主要介紹了解決redisTemplate中l(wèi)eftPushAll隱性bug的問題,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02

