Hadoop streaming詳細介紹
Hadoop streaming
Hadoop為MapReduce提供了不同的API,可以方便我們使用不同的編程語言來使用MapReduce框架,而不是只局限于Java。這里要介紹的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作為我們mapreduce程序和MapReduce框架之間的接口。所以你可以用任何語言來編寫MapReduce程序,只要該語言可以往standard input/output上進行讀寫。
streamming是天然適用于文字處理的(text processing),當然,也僅適用純文本的處理,對于需要對象和序列化的場景,hadoop streaming無能為力。它力圖使我們能夠快捷的通過各種腳本語言,快速的處理大量的文本文件。以下是steaming的一些特點:
- Map函數(shù)的輸入是通過stand input一行一行的接收數(shù)據(jù)的。(不像Java API,通過InputFormat類做預處理,使得Map函數(shù)的輸入是有Key和value的)
- Map函數(shù)的output則必須限定為key-value pair,key和value之間用\t分開。(MapReduce框架在處理intermediate的Map輸出時,必須做sort和partition,即shuffle)
- Reduce函數(shù)的input是Map函數(shù)的output也是key-value pair,key和value之間用\t分開。
常用的Streaming編程語言:
- bash shell
- ruby
- python
Ruby
下面是一個Ruby編寫的MapReduce程序的示例:
map
max_temperature_map.rb:
ruby
#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
- 從標準輸入讀入一行data。
- 處理數(shù)據(jù)之后,生成一個鍵值對,用\t分隔,輸出到標準輸出
reduce
max_temperature_reduce.rb:
ruby
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{last_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key
- 從標準輸入讀入一行數(shù)據(jù)
- 數(shù)據(jù)是用\t分隔的鍵值對
- 數(shù)據(jù)是被MapReduce根據(jù)key排序之后順序一行一行讀入
- reduce函數(shù)對數(shù)據(jù)進行處理,并輸出,輸出仍是用\t分隔的鍵值對
運行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -input input/ncdc/sample.txt \ -output output \ -mapper ch02/src/main/ruby/max_temperature_map.rb \ -reducer ch02/src/main/ruby/max_temperature_reduce.rb
- hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar指明了使用hadoop streaming
- hadoop-*-streaming.jar會將input里的文件,一行一行的輸出到標準輸出。
- 用-mapper指定Map函數(shù)。類似于通過管道將數(shù)據(jù)傳給rb文件: data|ch02/src/main/ruby/max_temperature_map.rb
- -reducer指定Reduce函數(shù)。
Python
Map
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)
Reduce
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
運行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -input input/ncdc/sample.txt \ -output output \ -mapper ch02/src/main/ruby/max_temperature_map.py\ -reducer ch02/src/main/ruby/max_temperature_reduce.py
Bash shell
Map
#!/usr/bin/env bash # NLineInputFormat gives a single line: key is offset, value is S3 URI read offset s3file # Retrieve file from S3 to local disk echo "reporter:status:Retrieving $s3file" >&2 $HADOOP_INSTALL/bin/hadoop fs -get $s3file . # Un-bzip and un-tar the local file target=`basename $s3file .tar.bz2` mkdir -p $target echo "reporter:status:Un-tarring $s3file to $target" >&2 tar jxf `basename $s3file` -C $target # Un-gzip each station file and concat into one file echo "reporter:status:Un-gzipping $target" >&2 for file in $target/*/* do gunzip -c $file >> $target.all echo "reporter:status:Processed $file" >&2 done # Put gzipped version into HDFS echo "reporter:status:Gzipping $target and putting in HDFS" >&2 gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz
運行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -D mapred.reduce.tasks=0 \ -D mapred.map.tasks.speculative.execution=false \ -D mapred.task.timeout=12000000 \ -input ncdc_files.txt \ -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \ -output output \ -mapper load_ncdc_map.sh \ -file load_ncdc_map.sh
- 這里的-D mapred.reduce.tasks=0將reduce task觀掉,因此也不需要設置-reducer
- 只使用Mapper,可以通過MapReduce幫助我們并行的完成一些平時只能串行的shell腳本
- 注意這里的-file,在集群模式下,需要并行運行時,需要-file把文件傳輸?shù)狡渌?jié)點
Combiner
在streaming模式下,仍然可以運行Combiner,兩種方法:
- 通過Java編寫一個combiner的函數(shù),并使用-combiner option
- 以命令行的管道模式完成combiner的任務
這里具體解釋第二種方法:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -input input/ncdc/all \ -output output \ -mapper "ch02/src/main/ruby/max_temperature_map.rb | sort | ch02/src/main/ruby/max_temperature_reduce.rb" \ -reducer ch02/src/main/ruby/max_temperature_reduce.rb \ -file ch02/src/main/ruby/max_temperature_map.rb \ -file ch02/src/main/ruby/max_temperature_reduce.rb
注意看-mapper這一行,通關管道的方式,把mapper的臨時輸出文件(intermediate file,Map完成后的臨時文件)作為輸入,送到sort進行排序,然后送到reduce腳本,來完成類似于combiner的工作。這時候的輸出才真正的作為shuffle的輸入,被分組并在網(wǎng)絡上發(fā)送到Reduce
感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
相關文章
linux后臺執(zhí)行命令&和nohup的具體使用方法
這篇文章主要介紹了linux后臺執(zhí)行命令&和nohup的具體使用方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-09-09
在Linux中為現(xiàn)有用戶創(chuàng)建主目錄:useradd問題
這篇文章主要介紹了在Linux中為現(xiàn)有用戶創(chuàng)建主目錄:useradd問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04

