An Introduction to MapReduce and HDFS
What is Hadoop?
Google, driven by its own business needs, came up with the MapReduce programming model and the Google File System distributed file system, and published papers on both (available on the Google Research site: GFS, MapReduce). While developing the Nutch search engine, Doug Cutting and Mike Cafarella wrote their own implementations of these two papers, the identically named MapReduce and HDFS, which together make up Hadoop.
The data flow of MapReduce is shown in the figure below: raw data is processed by the mappers, then partitioned and sorted, and arrives at the reducers, which emit the final results.

Image from Hadoop: The Definitive Guide
How Hadoop Streaming Works
Hadoop itself is written in Java, and jobs normally have to be written in Java as well, but through Hadoop Streaming we can write our programs in any language and have Hadoop run them.
The source code for Hadoop Streaming can be found in Hadoop's Github repo. In short, the mapper and reducer written in another language are passed as arguments to a pre-written Java program (the *-streaming.jar shipped with Hadoop). That Java program creates the MR job, launches a separate process to run the mapper, feeds it the input via stdin, and hands whatever the mapper writes to stdout back to Hadoop; after the partition and sort, it launches another process to run the reducer, again communicating over stdin/stdout to produce the final result. So all our program has to do is read data from stdin and write the processed data to stdout; the Hadoop Streaming Java wrapper takes care of the tedious steps in between and runs the job in a distributed fashion.

Image from Hadoop: The Definitive Guide
In principle, any language that can handle stdio can be used to write the mapper and reducer. You can also use a Linux program (such as awk, grep, or cat) as the mapper or reducer, or a Java class written to the expected interface. The mapper and reducer therefore don't even have to be the same kind of program.
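To make the stdio contract concrete, here is a minimal sketch of a Streaming mapper in Python (a hypothetical word-count mapper; the helper name and sample input are mine, not from the article):

```python
import sys

def mapper(stream):
    """Emit one tab-separated 'word<TAB>1' record per word --
    the key<TAB>value convention Hadoop Streaming splits on."""
    for line in stream:
        for word in line.rstrip('\n').split():
            yield word + '\t1'

# In a real job the stream would be sys.stdin:
#     for record in mapper(sys.stdin):
#         print(record)
records = list(mapper(['hello world\n', 'hello hadoop\n']))
```

Anything that produces the same lines on stdout, whatever the language, works identically from Hadoop's point of view.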
Pros and Cons of Hadoop Streaming
Pros
You can write MapReduce programs in whatever language you like (in other words, you don't have to write Java XD)
You don't need to import a pile of libraries and do a pile of configuration in the code the way a Java MR program does; much of that is abstracted away behind stdio, so there is noticeably less code
With no library dependencies, debugging is easy, and you can simulate a run locally with pipes, without Hadoop at all
Cons
You can only control the MapReduce framework through command-line options, not through an API in the code as a Java program can, so your control is weaker and some things are simply out of reach
With an extra layer of processing in the middle, it is also slower
Hadoop Streaming is therefore best suited to simple tasks, such as Python scripts of only one or two hundred lines. For complex projects, or when you need fine-grained optimization, Streaming tends to tie your hands.
Writing a Simple Hadoop Streaming Program in Python
Here are two examples:
1. Michael Noll's word count program
2. The sample program in Hadoop: The Definitive Guide
A few things to watch out for when writing Hadoop Streaming programs in Python:
1. Use iterators wherever you can; avoid buffering large amounts of stdin input in memory, which severely hurts performance
2. Streaming will not split the key and value for you; what comes in is just a string per record, so you need to call split() yourself in your code
3. Each line read from stdin appears to end in \n; to be safe you generally want to strip it with rstrip()
4. When you want a K-V list rather than processing key-value pairs one by one, you can use groupby together with itemgetter to group the k-v pairs that share a key, which gives you something like a Java reduce that receives a Text key and an iterable of values. Note that itemgetter is faster than a lambda expression, so if your needs aren't very complicated, prefer itemgetter.
My basic template for a Hadoop Streaming program is
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Some description here...
"""

import sys
from operator import itemgetter
from itertools import groupby

def read_input(file):
    """Read input and split."""
    for line in file:
        yield line.rstrip().split('\t')

def main():
    data = read_input(sys.stdin)
    for key, kviter in groupby(data, itemgetter(0)):
        # some code here..
        pass

if __name__ == "__main__":
    main()
If you need input/output handling that differs from the defaults, read_input() is the main place to adjust.
Local Debugging
The basic pattern for locally debugging a Python program intended for Hadoop Streaming is:
$ cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>
Or, if you'd rather not have the extra cat, you can use < redirection:
$ python <path to mapper script> < <input path> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>
A few things to note here:
1. By default Hadoop splits key and value on tab, takes the first split-out part as the key, and sorts by key, which is why sort -t $'\t' -k1,1 is used here to simulate it. If your needs differ, you can adjust them through command-line options when handing the job to Hadoop Streaming, and make the matching adjustment in local debugging, mainly by changing the arguments to sort. To debug locally with confidence, it's worth mastering the sort command first.
2. If your Python scripts have a shebang line and you have made them executable, you can also run them with something like ./mapper.py in place of python mapper.py
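If it helps to see what the sort stage contributes, the sort -t $'\t' -k1,1 step can be approximated in Python (an illustrative sketch: a stable sort on the first tab-separated field only):

```python
def shuffle(mapper_output):
    """Stable sort on the first tab-separated field, approximating
    what `sort -t $'\t' -k1,1` does between mapper and reducer."""
    return sorted(mapper_output, key=lambda line: line.split('\t', 1)[0])

lines = ['b\t2', 'a\t1', 'b\t1', 'a\t3']
ordered = shuffle(lines)
# Records sharing a key become adjacent, which is all groupby() needs.
```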
Part Two: Running on a Cluster and Monitoring
For convenience, the examples in this article all run in pseudo-distributed mode; generally speaking, as long as the cluster is configured properly, a program that runs in pseudo-distributed mode will run on a real cluster without trouble.
To better simulate a cluster environment, we can raise the maximum number of mappers and reducers in mapred-site.xml (the default is 2; the practical usable number is roughly the number of CPU cores minus 1).
Suppose the environment variable you added for the Hadoop install path is called $HADOOP_HOME (if yours is $HADOOP_PREFIX, just adjust the commands below accordingly):
$ vi $HADOOP_HOME/conf/mapred-site.xml
Assuming this machine has a 4-core CPU, you can add the following lines:
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>3</value>
</property>
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>3</value>
</property>
This change only increases the number of slots. If you don't state how many mappers or reducers you need when running an MR job, Hadoop won't necessarily use all of them; it allocates on its own.
Viewing the Documentation
First you need to know where the Java program used for streaming lives. In the 1.0.x releases it should be under $HADOOP_HOME/contrib/streaming/. For 1.0.4, for instance, it is
$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar
Let's start by looking at the documentation that ships with this Java program. Taking version 1.0.4 as an example, run
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -info
and you will see a series of built-in help messages, with explanations of the various options and some usage examples.
Running the Command
The general procedure for running a Python program with Hadoop Streaming is:
1. Put the input files on HDFS. copyFromLocal is recommended over the put command; see Difference between hadoop fs -put and hadoop fs -copyFromLocal
1. It's usually a good idea to create a folder to hold the input files; suppose it's called input
Then use
$ hadoop fs -mkdir input
to create it, and use
$ hadoop fs -ls
to list the directory; you will see that a /user/hadoop/input folder has appeared. /user/hadoop is the default user folder, the equivalent of /home/hadoop on the local file system.
2. Then use
$ hadoop fs -copyFromLocal <PATH TO LOCAL FILE(S)> input/
to put the local files into the input folder. copyFromLocal works much like the Linux cp command and supports wildcards. If you get unexpected results, try putting quotes around any path that uses a wildcard.
2. Start the MR job. Taking version 1.0.4 as an example, suppose you are currently in the directory containing the mapper and reducer scripts, and they happen to be called mapper.py and reducer.py. If no other configuration is needed, run
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
    -mapper mapper.py -file mapper.py \
    -reducer reducer.py -file reducer.py \
    -input input/* -output output
µÚÒ»ÐÐÊǸæËßHadoopÔËÐÐstreamingµÄjava³ÌÐò£¬½ÓÏÂÀ´µÄÊDzÎÊý£º
ÕâÀïµÄmapper.pyºÍreducer.pyÊÇmapperËùÔÚpython³ÌÐòµÄ·¾¶¡£ÎªÁËÈÃHadoop½«³ÌÐò·Ö·¢¸øÆäËû»úÆ÷£¬ÐèÒªÔÙ¼ÓÒ»¸ö-file²ÎÊýÓÃÓÚÖ¸Ã÷Òª·Ö·¢µÄ³ÌÐò·ÅÔÚÄÄÀï¡£
Note that writing it this way assumes the Python program has a shebang line and has been made executable. If it doesn't, you can change it to
-mapper 'python mapper.py'
adding the interpreter command and wrapping it in quotes, because strictly speaking, what follows -mapper should be a command rather than a file name.
If the programs you run aren't in the current directory, say they're in the src folder under it, you can write
-mapper mapper.py -file src/mapper.py \
-reducer reducer.py -file src/reducer.py \
In other words, the file names after -mapper and -reducer should not carry a path, while the arguments after -file should. Note that if you used quotes in the command after -mapper, adding the path there will instead fail with an error saying the program cannot be found.
-input and -output take path names on HDFS and likewise support wildcards; input/* here means "all files in the input folder". Note that what follows -output must be a path that does not yet exist on HDFS; Hadoop creates that folder for you when producing the output, and if it already exists there will be a conflict.
Sometimes the shebang doesn't work, especially when the execution environment is complicated. The safest way to write it is probably:
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
    -mapper 'python mapper.py' -file mapper.py \
    -reducer 'python reducer.py' -file reducer.py \
    -input input/* -output output
Writing it this way has another benefit: you can put command-line arguments for the Python program inside the quotes; a later tutorial will show how to use that.
Since what follows -mapper and -reducer is actually a command, if the Python environment is configured differently on each machine, each machine will run the Python program with its own configuration.
The Run
Type the command, hit enter, and if all goes well the program will start running. I won't repeat here what gets printed to the terminal during execution; you can go here to see what a normal run produces.
Monitoring the Cluster with the Web UI
To check how things are going, you generally go to the jobtracker's web UI. If you are on the master, point a browser at http://localhost:50030 (if you changed mapred.job.tracker.http.address in mapred-site.xml when configuring Hadoop, visit the corresponding address instead).
In the web UI you can see running jobs, completed jobs, and retired jobs. Click the hyperlink under a Jobid to see that job's status. Once inside, if the Failed/Killed Task Attempts section is non-empty, you can click through the corresponding links to find the logs and debug.
Getting the Results
After the job finishes successfully, several files will appear in the output folder you specified on HDFS with the output option:
An empty file named _SUCCESS, indicating that the job ran successfully; other programs can check HDFS for this file to decide whether the job succeeded and act accordingly.
A _logs folder which, as the name suggests, holds the task logs.
part-00000, ..., part-xxxxx files, the numbers going as high as the number of reducers, each holding the output of the corresponding reducer.
If your output is small, say a count of only a few lines, you can use
$ hadoop fs -cat <PATH ON HDFS>
to print the output straight to the terminal.
If your output is large, you will need to copy it to the local file system to inspect it. You can use copyToLocal to fetch the whole folder (as with copyFromLocal, it differs from get in requiring the destination to be on the local file system). If you don't need _SUCCESS and _logs, and you want all the reducers' output merged, you can use the getmerge command.
For example, in the case above, you can use
$ hadoop fs -copyToLocal output ./
to copy the output folder into the current directory of the local file system, or use
$ hadoop fs -getmerge output ./
to merge the part-xxxxx files under output into a single file called output in the current directory.
Chaining Multiple MR Passes
If you have several passes to run, with the next step taking the previous step's output as input, the solution is actually simple. Suppose the previous step's output folder on HDFS is output1; then in the next step's run command, specify
-input output1/part-*
that is, point the input at all of the previous step's output, and you're done. Note that this assumes you don't need any extra processing of the previous step's output.
Other
This article only covers the simplest way to run a Hadoop Streaming program. Other needs, such as multiple input files, require further adjustments to the run command and will be covered in later articles.
Part Three: Custom Functionality
Using Extra Files
If your job needs some extra files (side data) beyond the input, you have two options:
1. Large files
A "large" file here is one bigger than the configured local.cache.size, 10GB by default. In that case you can use -file to distribute it. Beyond that, the code itself can also be distributed with -file.
Format: suppose I want to ship an extra sideData.txt for the Python script to use:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input inputDir \
    -output outputDir \
    -mapper mapper.py \
    -file mapper.py \
    -reducer reducer.py \
    -file reducer.py \
    -file sideData.txt
In the Python script, you just open this file as if it were a local file in the script's own directory, e.g. open('sideData.txt').
Note that this file is read-only; it cannot be written to.
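A sketch of what that looks like (hypothetical: the helper name and the tab-separated layout of sideData.txt are assumptions for illustration):

```python
def load_side_data(path='sideData.txt'):
    """A file shipped with -file lands in the task's working directory,
    so it can be opened like any local file (read-only)."""
    lookup = {}
    with open(path) as f:
        for line in f:
            key, value = line.rstrip('\n').split('\t', 1)
            lookup[key] = value
    return lookup
```

A mapper would typically call load_side_data() once at startup and then consult the dict for every record.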
2.СÎļþ
Èç¹ûÊDZȽÏСµÄÎļþ£¬ÏëÒªÌá¸ß¶ÁдËÙ¶È¿ÉÒÔ½«Ëü·ÅÔÚdistributed cacheÀҲ¾ÍÊÇÿ̨»úÆ÷¶¼ÓÐ×Ô¼ºµÄÒ»·Ýcopy£¬²»ÐèÒªÍøÂçIO¾Í¿ÉÒÔÄõ½Êý¾Ý£©¡£ÕâÀïÒªÓõ½µÄ²ÎÊýÊÇ-cachefile£¬Ð´·¨ºÍÓ÷¨ÉÏÒ»¸öÒ»Ñù£¬¾ÍÊÇ-file¸Ä³É-cachefile¶øÒÑ¡£
Controlling the Partitioner
Partitioning is the process by which data, after being processed by the mappers, gets distributed to the reducers. What the partitioner controls is "which reducer a given piece of mapper output gets sent to".
Hadoop ships with several partitioners; see here for explanations. The default is HashPartitioner, which hashes the key before the first tab and uses that hash to assign the partition. A Hadoop Streaming program can choose a different partitioner: you can pick one of the other built-in ones, or write your own Java class extending Partitioner, compile it into a jar, and name it as your partitioner in the run options.
Of the built-in partitioners, the most commonly used is KeyFieldBasedPartitioner (source code here). It partitions on part of the key rather than on the whole key.
Before learning to use KeyFieldBasedPartitioner, you first have to learn how to control the key-value split. Splitting out the key takes two steps, roughly described in Python as
fields = output.split(separator)
key = fields[:numKeyfields]
1. Choose which symbol splits the key, i.e. choose separator
stream.map.output.field.separator specifies the symbol used to separate out the key. For instance, to make it a dot, add the option
-D stream.map.output.field.separator=.
Suppose your mapper output is
11.22.33.44
Then one or more of [11, 22, 33, 44] will be considered for the key.
2. Choose the range of the key, i.e. choose numKeyfields
The option that controls the range of the key is the following; suppose I want the first 2 split-out elements as the key:
-D stream.num.map.output.key.fields=2
Then the key is the 11.22 above. Note that if you set this number to cover the whole output, 4 in this example, the entire line becomes the key.
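The effect of these two options can be sketched in Python (a hypothetical helper mirroring the pseudocode above):

```python
def extract_key(output_line, separator='.', num_key_fields=2):
    """Split the mapper output on the separator and keep the first
    num_key_fields fields as the key, as stream.map.output.field.separator
    and stream.num.map.output.key.fields do."""
    fields = output_line.split(separator)
    return separator.join(fields[:num_key_fields])

key = extract_key('11.22.33.44')                      # first 2 fields
whole = extract_key('11.22.33.44', num_key_fields=4)  # whole line as key
```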
Once the key has been split out as above, KeyFieldBasedPartitioner still needs to know which part of the key you want to use as the basis for partitioning. You can read the source code to understand how it is configured.
Suppose in the previous step, by using
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
we made the whole string 11.22.33.44 the key; the next step is to split once more inside this key. map.output.key.field.separator sets the separator for this second split, and mapred.text.key.partitioner.options takes an argument selecting which of the split-out elements form the partition key, for example:
-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2 \
This means that inside the key, the 1st through 2nd dot-separated elements form the partition key, which in this example is 11.22. A value of -ki,j here means that elements i through j (inclusive) form the partition key. If the endpoint is omitted, as in -ki, then element i and everything after it form the partition key.
Output with the same partition key is guaranteed to go to the same reducer; that is, all output of the form 11.22.xx.xx ends up in the same partition, and likewise for every other combination in place of 11.22.
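That guarantee can be illustrated with a rough Python sketch (Python's built-in hash() stands in for the hash Hadoop actually uses, so only the grouping behaviour, not the concrete partition numbers, is meaningful):

```python
def partition_key(key, separator='.', start=1, stop=2):
    """Select elements start..stop of the key (1-based, inclusive),
    as -D mapred.text.key.partitioner.options=-k1,2 does."""
    fields = key.split(separator)
    return separator.join(fields[start - 1:stop])

def assign_reducer(key, num_reducers=3):
    """Hash the partition key and take it modulo the reducer count;
    records sharing a partition key always land on the same reducer."""
    return hash(partition_key(key)) % num_reducers

same = assign_reducer('11.22.33.44') == assign_reducer('11.22.0.1')
```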
To illustrate with a concrete example:
1. The mapper output is
11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2
2. Making the first 4 elements the key and the key's first two elements the partition key, with 3 partitions, it gets split into
11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2
3. In the next step, each reducer sorts within every partition it receives, and the result is
11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3
The command format looks roughly like this:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -D map.output.key.field.separator=. \
    -D mapred.text.key.partitioner.options=-k1,2 \
    -input inputDir \
    -output outputDir \
    -mapper mapper.py -file mapper.py \
    -reducer reducer.py -file reducer.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
Note that the -D options go first, and the -partitioner option naming KeyFieldBasedPartitioner goes after them.
Controlling the Comparator and Custom Sorting
As mentioned above, after the mapper output is partitioned to the reducers there is a sorting step. The criterion for this sort can also be controlled, by configuring the comparator. As before, you first set the separator used to split out the key, the range of the key, and the separator used inside the key:
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=. \
ÕâÀïÒª¿ØÖƵľÍÊÇkeyÄÚ²¿µÄÄÄÐ©ÔªËØÓÃÀ´×öÅÅÐòÒÀ¾Ý£¬ÊÇÅÅ×ÖµäÐò»¹ÊÇÊý×ÖÐò£¬µ¹Ðò»¹ÊÇÕýÐò¡£ÓÃÀ´¿ØÖƵIJÎÊýÊÇmapred.text.key.comparator.options£¬½ÓÊܵÄÖµ¸ñʽÀàËÆÓÚunix
sort¡£±ÈÈçÎÒÒª°´µÚ¶þ¸öÔªËØµÄÊý×ÖÐò£¨Ä¬ÈÏ×ÖµäÐò£©+µ¹ÐòÀ´ÅÅÔªËØµÄ»°£¬¾ÍÓÃ-D mapred.text.key.comparator.options=-k2,2nr
n±íʾÊý×ÖÐò£¬r±íʾµ¹Ðò¡£ÕâÑùÒ»À´
11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2
will be sorted into
11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1
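The same ordering can be reproduced in Python as a sanity check (an illustrative sketch: sort on the second field, numeric, descending, as -k2,2nr specifies):

```python
def comparator_sort(keys, separator='.'):
    """Sort on the second dot-separated field, numerically (n) and in
    reverse (r), mimicking mapred.text.key.comparator.options=-k2,2nr."""
    return sorted(keys, key=lambda k: int(k.split(separator)[1]), reverse=True)

keys = ['11.12.1.2', '11.14.2.3', '11.11.4.1', '11.12.1.1', '11.14.2.2']
ordered = comparator_sort(keys)
```

Python's sorted() is stable, so keys that tie on the second field keep their original relative order, matching the listing above.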