MapTask階段shuffle源碼分析
1. 收集階段
在Mapper中,調(diào)用context.write(key,value)實際是調(diào)用代理NewOutPutCollector的wirte方法
public void write(KEYOUT key, VALUEOUT value
) throws IOException, InterruptedException {
output.write(key, value);
}
實際調(diào)用的是MapOutPutBuffer的collect(),在進行收集前,調(diào)用partitioner來計算每個key-value的分區(qū)號
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
2. NewOutPutCollector對象的創(chuàng)建
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
// 創(chuàng)建實際用來收集key-value的緩存區(qū)對象
collector = createSortingCollector(job, reporter);
// 獲取總的分區(qū)個數(shù)
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
// 默認情況,直接創(chuàng)建一個匿名內(nèi)部類,所有的key-value都分配到0號分區(qū)
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
3. 創(chuàng)建環(huán)形緩沖區(qū)對象
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
// 從當(dāng)前Job的配置中,獲取mapreduce.job.map.output.collector.class,如果沒有設(shè)置,使用MapOutputBuffer.class
Class<?>[] collectorClasses = job.getClasses(
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
int remainingCollectors = collectorClasses.length;
Exception lastException = null;
for (Class clazz : collectorClasses) {
try {
if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
throw new IOException("Invalid output collector class: " + clazz.getName() +
" (does not implement MapOutputCollector)");
}
Class<? extends MapOutputCollector> subclazz =
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Trying map output collector class: " + subclazz.getName());
// 創(chuàng)建緩沖區(qū)對象
MapOutputCollector<KEY, VALUE> collector =
ReflectionUtils.newInstance(subclazz, job);
// 創(chuàng)建完緩沖區(qū)對象后,執(zhí)行初始化
collector.init(context);
LOG.info("Map output collector class = " + collector.getClass().getName());
return collector;
} catch (Exception e) {
String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
if (--remainingCollectors > 0) {
msg += " (" + remainingCollectors + " more collector(s) to try)";
}
lastException = e;
LOG.warn(msg, e);
}
}
throw new IOException("Initialization of all the collectors failed. " +
"Error in last collector was :" + lastException.getMessage(), lastException);
}
3. MapOutPutBuffer的初始化 環(huán)形緩沖區(qū)對象
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
job = context.getJobConf();
reporter = context.getReporter();
mapTask = context.getMapTask();
mapOutputFile = mapTask.getMapOutputFile();
sortPhase = mapTask.getSortPhase();
spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
// 獲取分區(qū)總個數(shù),取決于ReduceTask的數(shù)量
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//sanity checks
// 從當(dāng)前配置中,獲取mapreduce.map.sort.spill.percent,如果沒有設(shè)置,就是0.8
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
// 獲取mapreduce.task.io.sort.mb,如果沒設(shè)置,就是100MB
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
// 在溢寫前,對key-value排序,采用的排序器,使用快速排序,只排索引
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
// 存放key-value
kvbuffer = new byte[maxMemUsage];
bufvoid = kvbuffer.length;
// 存儲key-value的屬性信息,分區(qū)號,索引等
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
maxRec = kvmeta.capacity() / NMETA;
softLimit = (int)(kvbuffer.length * spillper);
bufferRemaining = softLimit;
LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
// k/v serialization
// 獲取快速排序的Key的比較器,排序只按照key進行排序!
comparator = job.getOutputKeyComparator();
// 獲取key-value的序列化器
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
// output counters
mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter =
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter = reporter
.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
// 溢寫到磁盤,可以使用一個壓縮格式! 獲取指定的壓縮編解碼器
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
}
// 獲取Combiner組件
// combiner
final Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
spillInProgress = false;
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
// 設(shè)置溢寫線程在后臺運行,溢寫是在后臺運行另外一個溢寫線程!和收集是兩個線程!
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
// 啟動線程
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}
4. Paritionner的獲取
從配置中讀取mapreduce.job.partitioner.class,如果沒有指定,采用HashPartitioner.class
如果reduceTask > 1, 還沒有設(shè)置分區(qū)組件,使用HashPartitioner
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
throws ClassNotFoundException {
return (Class<? extends Partitioner<?,?>>)
conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. **/
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
分區(qū)號的限制:0 <= 分區(qū)號 < 總的分區(qū)數(shù)(reduceTask的個數(shù))
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
5.MapTask shuffle的流程
①在map()調(diào)用context.write()
②調(diào)用MapoutPutBuffer的collect()
- 調(diào)用分區(qū)組件Partitionner計算當(dāng)前這組key-value的分區(qū)號
③將當(dāng)前key-value收集到MapOutPutBuffer中
- 如果超過溢寫的閥值,在后臺啟動溢寫線程,來進行溢寫!
④溢寫前,先根據(jù)分區(qū)號,將相同分區(qū)號的key-value,采用快速排序算法,進行排序!
- 排序并不在內(nèi)存中移動key-value,而是記錄排序后key-value的有序索引!
⑤ 開始溢寫,按照排序后有序的索引,將文件寫入到一個臨時的溢寫文件中
- 如果沒有定義Combiner,直接溢寫!
- 如果定義了Combiner,使用CombinerRunner.conbine()對key-value處理后再次溢寫!
⑥多次溢寫后,每次溢寫都會產(chǎn)生一個臨時文件
⑦最后,執(zhí)行一次flush(),將剩余的key-value進行溢寫
⑧MergeParts: 將多次溢寫的結(jié)果,保存為一個總的文件!
- 在合并為一個總的文件前,會執(zhí)行歸并排序,保證合并后的文件,各個分區(qū)也是有序的!
- 如果定義了Conbiner,Conbiner會再次運行(前提是溢寫的文件個數(shù)大于3)!
- 否則,就直接溢寫!
⑨最終保證生成一個最終的文件,這個文件根據(jù)總區(qū)號,分為若干部分,每個部分的key-value都已經(jīng)排好序,等待ReduceTask來拷貝相應(yīng)分區(qū)的數(shù)據(jù)
6. Combiner
combiner其實就是Reducer類型:
Class<? extends Reducer<K,V,K,V>> cls =
(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
Combiner的運行時機:
MapTask:
- ①每次溢寫前,如果指定了Combiner,會運行
- ②將多個溢寫片段,進行合并為一個最終的文件時,也會運行Combiner,前提是片段數(shù)>=3
ReduceTask:
③reduceTask在運行時,需要啟動shuffle進程拷貝MapTask產(chǎn)生的數(shù)據(jù)!
- 數(shù)據(jù)在copy后,進入shuffle工作的內(nèi)存,在內(nèi)存中進行merge和sort!
- 數(shù)據(jù)過多,內(nèi)部不夠,將部分數(shù)據(jù)溢寫在磁盤!
- 如果有溢寫的過程,那么combiner會再次運行!
①一定會運行,②,③需要條件!
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,謝謝大家對腳本之家的支持。如果你想了解更多相關(guān)內(nèi)容請查看下面相關(guān)鏈接
相關(guān)文章
SpringBoot如何使用mail實現(xiàn)登錄郵箱驗證
在實際的開發(fā)當(dāng)中,不少的場景中需要我們使用更加安全的認證方式,同時也為了防止一些用戶惡意注冊,我們可能會需要用戶使用一些可以證明個人身份的注冊方式,如短信驗證、郵箱驗證等,這篇文章主要介紹了SpringBoot如何使用mail實現(xiàn)登錄郵箱驗證,需要的朋友可以參考下2024-06-06
java fastJson轉(zhuǎn)JSON兩種常見的轉(zhuǎn)義操作
在實際開發(fā)中,我們有時需要將特殊字符進行轉(zhuǎn)義,本文主要介紹了java fastJson轉(zhuǎn)JSON兩種常見的轉(zhuǎn)義操作,具有一定的參考價值,感興趣的可以了解一下2024-03-03
關(guān)于maven環(huán)境的安裝及maven集成idea環(huán)境的問題
Maven 是一個基于 Java 的工具,所以要做的第一件事情就是安裝 JDK。本文重點給大家介紹關(guān)于maven環(huán)境的安裝及和idea環(huán)境的集成問題,感興趣的朋友一起看看吧2021-09-09
詳解Java異常處理中throw與throws關(guān)鍵字的用法區(qū)別
這篇文章主要介紹了詳解Java異常處理中throw與throws關(guān)鍵字的用法區(qū)別,這也是Java面試題目中的常客,需要的朋友可以參考下2015-11-11
Spring關(guān)于@Configuration配置處理流程
這篇文章主要介紹了Spring關(guān)于@Configuration配置處理流程,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06
Java 如何讀取Excel格式xls、xlsx數(shù)據(jù)工具類
這篇文章主要介紹了Java 如何讀取Excel格式xls、xlsx數(shù)據(jù)工具類的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
淺談為什么阿里巴巴要禁用Executors創(chuàng)建線程池
這篇文章主要介紹了淺談為什么阿里巴巴要禁用Executors創(chuàng)建線程池,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02

