I. MapReduce Overview
Map/Reduce is a distributed computing model for large-scale data processing. It was originally designed and implemented by Google engineers, and Google has published the full MapReduce paper. The paper defines Map/Reduce as a programming model and an associated implementation for processing and generating large data sets. The user defines a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same key. Many real-world tasks can be expressed in this model.
II. How MapReduce Works
The Map-Reduce framework operates entirely on <key,value> pairs: the input to a job is a set of <key,value> pairs and the output is also a set of <key,value> pairs, although their types may differ. Because key and value classes must support serialization, they have to implement the Writable interface. Key classes must additionally implement the WritableComparable interface so that the framework can sort the data set.
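The serialization and ordering contract described above can be sketched in plain Java. The interfaces SimpleWritable and SimpleWritableComparable and the WordKey class below are hypothetical stand-ins written for this illustration; they only mirror the general shape of Hadoop's Writable and WritableComparable (write to a DataOutput, read from a DataInput, compare to another key) and are not the Hadoop types themselves.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableSketch {
    // Hypothetical stand-in for Writable: a type that can serialize itself.
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Hypothetical stand-in for WritableComparable: serializable and sortable.
    interface SimpleWritableComparable<T> extends SimpleWritable, Comparable<T> {}

    // Illustrative key type that wraps a single word.
    static final class WordKey implements SimpleWritableComparable<WordKey> {
        private String word = "";

        WordKey() {}
        WordKey(String word) { this.word = word; }
        String get() { return word; }

        @Override public void write(DataOutput out) throws IOException {
            out.writeUTF(word);
        }
        @Override public void readFields(DataInput in) throws IOException {
            word = in.readUTF();
        }
        @Override public int compareTo(WordKey other) {
            return word.compareTo(other.word);
        }
    }

    // Serializes a key to bytes and reads it back into a fresh instance.
    static WordKey roundTrip(WordKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            WordKey copy = new WordKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordKey copy = roundTrip(new WordKey("Hadoop"));
        System.out.println(copy.get() + " sorts before World: "
                + (copy.compareTo(new WordKey("World")) < 0));
    }
}
```

The key point is that a key needs both halves of the contract: serialization so it can be moved between processes, and comparison so the framework can sort it.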
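The execution of a Map-Reduce job and the types of its input and output data are as follows:
Map: <k1,v1> -> list<k2,v2>
Reduce: <k2,list<v2>> -> <k3,v3>
The following example illustrates the process in detail.
WordCount is an example that ships with Hadoop; its goal is to count the occurrences of each word in text files. Suppose the WordCount program is run on the following two text files: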
Hello World Bye World
Hello Hadoop GoodBye Hadoop
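The data flow for these two files can be traced with a small plain-Java sketch. It uses no Hadoop classes; the class and method names (WordCountSketch, map, sumByKey, run) are hypothetical, and it assumes one map task per file and that the combiner and the reducer both sum the counts per word.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java walk-through of WordCount; not the Hadoop implementation.
class WordCountSketch {
    // Map: emit <word, 1> for every token in the input text.
    static List<Map.Entry<String, Integer>> map(String text) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : text.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Combine and reduce share one implementation here: sum the values per key.
    // The TreeMap keeps keys sorted, as the framework sorts keys before reducing.
    static SortedMap<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            sums.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return sums;
    }

    static SortedMap<String, Integer> run(List<String> files) {
        List<Map.Entry<String, Integer>> combined = new ArrayList<>();
        for (String file : files) {
            // One map task per file, followed by a local combine of its output.
            combined.addAll(sumByKey(map(file)).entrySet());
        }
        return sumByKey(combined); // the reduce step over all combined output
    }

    public static void main(String[] args) {
        // prints {Bye=1, GoodBye=1, Hadoop=2, Hello=2, World=2}
        System.out.println(run(Arrays.asList(
                "Hello World Bye World", "Hello Hadoop GoodBye Hadoop")));
    }
}
```

For the first file the combine step turns the two <World,1> pairs into a single <World,2>, which is the reduction in intermediate data that a combiner is meant to provide.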
1 Map input

2 Map output / combine input

3 Combine output

4 Reduce output

III. MapReduce Framework Structure
1 Roles
1.1 JobTracker
JobTracker is a master service. It schedules each sub-task (task) of a job to run on a TaskTracker and monitors them; if a task fails, the JobTracker reruns it. In general, the JobTracker should be deployed on a dedicated machine.
1.2 TaskTracker
TaskTracker is a slave service that runs on multiple nodes and directly executes each task. TaskTrackers need to run on the DataNodes of HDFS.
1.3 JobClient
For each job, the JobClient class on the user side packages the application and its configuration parameters into a jar file, stores it in HDFS, and submits the path to the JobTracker. The JobTracker then creates each Task (that is, MapTask and ReduceTask) and distributes them to the TaskTracker services for execution.
2 Data Structures
2.1 Mapper and Reducer
The most basic parts of a MapReduce application running on Hadoop are a Mapper class, a Reducer class, and a driver program that creates a JobConf. Some applications also include a Combiner class, which is in fact also an implementation of Reducer.
2.2 JobInProgress
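After the JobClient submits a job, the JobTracker creates a JobInProgress to track and schedule it and adds it to the job queue. Based on the input data set defined in the submitted job jar (already divided into FileSplits), the JobInProgress creates a corresponding batch of TaskInProgress objects to monitor and schedule the MapTasks, and also creates the specified number of TaskInProgress objects to monitor and schedule the ReduceTasks; the default is 1 ReduceTask.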
2.3 TaskInProgress
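When the JobTracker starts a task, it calls launchTask through each TaskInProgress. The Task object (a MapTask or ReduceTask) is serialized and written to the corresponding TaskTracker service. On receipt, the TaskTracker creates its own TaskInProgress to monitor and schedule that Task (this TaskInProgress is a different implementation from the one used in the JobTracker, but plays a similar role). The actual Task process is started by a TaskRunner object managed by the TaskInProgress. The TaskRunner automatically loads the job jar, sets up the environment variables, and then starts a separate Java child process to execute the Task, that is, the MapTask or ReduceTask; the two do not necessarily run on the same TaskTracker.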
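2.4 MapTask and ReduceTask
A complete job automatically runs the Mapper, the Combiner (only if one is specified in the JobConf), and the Reducer in turn. The Mapper and Combiner are invoked by the MapTask and the Reducer by the ReduceTask; the Combiner is in fact also an implementation of the Reducer interface. The Mapper reads the input data set defined in the job jar as <key1,value1> pairs and produces temporary <key2,value2> pairs. If a Combiner is defined, the MapTask calls it after the Mapper finishes to merge the values that share a key, which reduces the size of the output. Once the MapTasks have all finished, the output is handed to the ReduceTask process, which calls the Reducer to produce the final <key3,value3> pairs. This process is described in more detail in the next part.
The following figure shows the main components of the Map/Reduce framework and the relationships between them: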

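3 Workflow
A MapReduce job is submitted to the JobTracker on the master node through JobClient.runJob(job). When the JobTracker receives the request from the JobClient, it adds the job to the job queue. The JobTracker waits for JobClients to submit jobs over RPC, while each TaskTracker keeps sending heartbeats to the JobTracker over RPC to ask whether there is work to do and, if so, to have tasks assigned to it. If the JobTracker's job queue is not empty, the response to a TaskTracker's heartbeat carries the tasks the JobTracker has assigned to it. This is a pull process. After receiving a task, the TaskTracker on the slave node launches the Task locally and executes it. A simplified diagram follows:

The rest of this article describes in detail how Map/Reduce processes a job.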
IV. JobClient
A MapReduce program is typically written like this:
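    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.*;

    // MyMapper, MyCombiner, MyReducer, the key/value classes, and the paths are placeholders.
    // JobConf (old mapred API) is used so that the job can be passed to JobClient.runJob().
    Configuration conf = new Configuration();       // read the Hadoop configuration
    JobConf job = new JobConf(conf);                // instantiate a job
    job.setJobName("job name");
    job.setMapperClass(MyMapper.class);             // Mapper type
    job.setCombinerClass(MyCombiner.class);         // Combiner type
    job.setReducerClass(MyReducer.class);           // Reducer type
    job.setOutputKeyClass(MyOutputKey.class);       // type of the output key
    job.setOutputValueClass(MyOutputValue.class);   // type of the output value
    FileInputFormat.addInputPath(job, new Path("input HDFS path"));
    FileOutputFormat.setOutputPath(job, new Path("output HDFS path"));
    // other initialization settings
    JobClient.runJob(job);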
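1 Configuring the Job
JobConf is the interface through which the user describes a job. The following are some of the key customizable settings in the MapReduce process:

2 JobClient.runJob(): Running the Job and Splitting the Input Data Set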
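A MapReduce job uses the JobClient class to split the input data set into a batch of smaller data sets, according to the InputFormat implementation that the user specified in the JobConf; one MapTask is created to process each of them. By default the JobClient uses the FileInputFormat class and calls FileInputFormat.getSplits() to generate the small data sets. If isSplitable() reports that a data file can be split, large files are divided into smaller FileSplits. A FileSplit only records the file's path in HDFS, the offset, and the split size. This information is packaged together into the jar of the jobFile.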
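The idea that a split is only a (path, offset, length) record can be sketched as follows. This is a simplified model written for illustration: the SplitSketch class and its getSplits signature are hypothetical, and the sketch ignores the refinements of the real FileInputFormat, such as block locations and the handling of a short final piece.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    // A split records where the data lives; no file content is copied.
    static final class Split {
        final String path;
        final long offset;
        final long length;

        Split(String path, long offset, long length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }

        @Override public String toString() {
            return path + ":" + offset + "+" + length;
        }
    }

    // Cuts a file of fileLength bytes into pieces of at most splitSize bytes.
    // A file that cannot be split becomes a single split covering the whole file.
    static List<Split> getSplits(String path, long fileLength, long splitSize, boolean splitable) {
        List<Split> splits = new ArrayList<>();
        if (!splitable) {
            splits.add(new Split(path, 0, fileLength));
            return splits;
        }
        for (long offset = 0; offset < fileLength; offset += splitSize) {
            splits.add(new Split(path, offset, Math.min(splitSize, fileLength - offset)));
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 150 MB file with a 64 MB split size yields pieces of 64, 64, and 22 MB.
        System.out.println(getSplits("/input/a.txt", 150 * mb, 64 * mb, true));
    }
}
```

Each element of the returned list would correspond to one MapTask in the model described above.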
The JobClient then submits the job to the master with submitJob(job), which internally delegates the actual submission to submitJobInternal(job).
submitJobInternal(job) first uploads three files to HDFS, the Hadoop distributed file system, in turn: job.jar, job.split, and job.xml.
job.xml: the job configuration, such as the Mapper, Combiner, and Reducer types and the input and output format types.
job.jar: the jar package containing the classes needed to run the job, such as the Mapper and Reducer implementations.
job.split: information about the file splits, such as how many splits the data is divided into and the block size (64 MB by default).
The HDFS path of these three files is determined by the MapReduce system directory property mapred.system.dir in hadoop-default.xml plus the job ID. The default value of mapred.system.dir is /tmp/hadoop-user_name/mapred/system. After writing the three files, the method calls JobTracker.submitJob(job) on the master node over RPC, at which point the job has been submitted.
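3 Submitting the Job

The jobFile is submitted through the RPC module (covered in detail in a separate chapter). Roughly, the JobClient class calls the JobTracker's submitJob() method through a Proxy interface implemented over RPC, and the JobTracker must implement the JobSubmissionProtocol interface.
After the JobTracker has created the job, it returns a JobStatus object to the JobClient that records the job's status, such as the execution time and the proportion of Map and Reduce tasks completed. From this JobStatus the JobClient creates a RunningJob object of type NetworkedJob, which periodically fetches execution statistics from the JobTracker to monitor the job and print progress to the user's console.
The classes and methods involved in creating a job are shown in the following figure.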
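V. JobTracker
As mentioned above, jobs are scheduled centrally by the JobTracker, and the individual Tasks are distributed to the TaskTracker nodes for execution. The execution process is analyzed in detail below, starting from the point where the JobTracker receives the submission request from the JobClient.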
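1 JobTracker Initializes the Job
1.1 JobTracker.submitJob(): Receiving the Request
When the JobTracker receives a new job request (that is, when submitJob() is called), it creates a JobInProgress object and uses it to manage and schedule the tasks. On creation, the JobInProgress initializes a series of task-related parameters and uses the FileSystem to download all the job files uploaded by the JobClient into a temporary directory on the local file system. These include the uploaded *.jar package, the XML file with the configuration, and the file that records the split information.
1.2 JobTracker.JobInitThread: Notifying the Initialization Thread
The listener class EagerTaskInitializationListener in the JobTracker is responsible for initializing the job's Tasks. The JobTracker calls jobAdded(job) to add the job to a queue in EagerTaskInitializationListener dedicated to jobs awaiting initialization, namely the list member variable jobInitQueue. The resortInitQueue method sorts the queue by job priority. notifyAll() is then called, which wakes up the JobInitThread, a thread dedicated to initializing jobs. On receiving the signal, the JobInitThread takes the job at the head of the queue, which is the one with the highest priority, and calls initJob on the TaskTrackerManager, which ultimately calls JobInProgress.initTasks() to do the real initialization work.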
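The add, resort, and notify pattern described above might look roughly like the following sketch. It is a simplified model, not the Hadoop source: the Job class, the numeric priority (larger meaning more urgent), and the tie-break on start time are assumptions made for the example, and takeNext plays the role of the initialization thread waiting for work.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class InitQueueSketch {
    // Hypothetical job record; only the fields needed for ordering.
    static final class Job {
        final String id;
        final int priority;   // assumption: a larger value means a more urgent job
        final long startTime;

        Job(String id, int priority, long startTime) {
            this.id = id;
            this.priority = priority;
            this.startTime = startTime;
        }
    }

    private final List<Job> jobInitQueue = new ArrayList<>();

    // Adds a job, re-sorts the queue, and wakes up any waiting initializer.
    synchronized void jobAdded(Job job) {
        jobInitQueue.add(job);
        resortInitQueue();
        notifyAll();
    }

    // Highest priority first; among equal priorities, the earliest start time first.
    private void resortInitQueue() {
        jobInitQueue.sort(Comparator.<Job>comparingInt(j -> j.priority).reversed()
                .thenComparingLong(j -> j.startTime));
    }

    // Blocks until a job is available, then removes and returns the head of the queue.
    synchronized Job takeNext() {
        try {
            while (jobInitQueue.isEmpty()) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a job", e);
        }
        return jobInitQueue.remove(0);
    }

    public static void main(String[] args) {
        InitQueueSketch queue = new InitQueueSketch();
        queue.jobAdded(new Job("job_1", 1, 100L));
        queue.jobAdded(new Job("job_2", 5, 200L));
        System.out.println("first job to initialize: " + queue.takeNext().id);
    }
}
```

In a fuller model a dedicated thread would call takeNext in a loop and run the initialization for each job it receives.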
1.3 JobInProgress.initTasks(): Initializing the TaskInProgress Objects
There are two kinds of Task, MapTask and ReduceTask, and both are managed by TaskInProgress objects.
First, the JobInProgress creates the monitoring objects for the Map tasks. initTasks() calls JobClient's readSplitFile() to obtain the list of RawSplits for the already-split input data, and then creates one TaskInProgress per entry to manage the Map executions. During this process it also records the hosts of the DataNodes that hold the HDFS blocks of each RawSplit. The hosts are obtained when the RawSplit is created, through FileSplit's getLocations() function, which calls getFileCacheHints() on DistributedFileSystem (this detail is covered in the HDFS discussion). If the data is stored on the local file system, that is, when LocalFileSystem is used, there is only one location, "localhost".
Once these TaskInProgress objects have been created, initTasks() calls createCache() to build nonRunningMapCache, a cache of Map tasks that have not yet run. When a TaskTracker on a slave sends a heartbeat to the master, tasks can be taken directly from this cache.
Next, the JobInProgress creates the monitoring objects for the Reduce tasks. This is simpler: they are created according to the number of Reduce tasks specified in the JobConf, and by default only 1 Reduce task is created. Reduce tasks are also monitored and scheduled by the TaskInProgress class, but through a different constructor; depending on its parameters, a TaskInProgress creates either a MapTask or a ReduceTask. Likewise, initTasks() uses createCache() to build the nonRunningReduceCache member.
After creating the TaskInProgress objects, the JobInProgress constructs a JobStatus, records that the job is running, and then calls JobHistory.JobInfo.logStarted() to write the job's execution log. This completes the initialization of the job in the JobTracker.

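2 JobTracker Schedules the Job
Hadoop's default scheduler is JobQueueTaskScheduler, which uses a FIFO policy. It has two member variables: jobQueueJobInProgressListener and the eagerTaskInitializationListener described above. JobQueueJobInProgressListener is another listener class of the JobTracker; it contains a map used to manage and schedule all JobInProgress objects. jobAdded(job) also adds the job to the map in JobQueueJobInProgressListener.
The most important method of JobQueueTaskScheduler is assignTasks, which implements task scheduling. It works as follows. When the JobTracker receives a heartbeat() call from a TaskTracker, it first checks whether the previous heartbeat response was completed and whether the TaskTracker is asking to start or restart; if everything is in order, it processes the heartbeat. It then checks how many more map and reduce tasks the TaskTracker can take on, whether the number of tasks to be assigned would exceed that figure, and whether it would exceed the cluster's average remaining capacity per tracker. If none of these limits is exceeded, a MapTask or ReduceTask is assigned to the TaskTracker. A Map task is produced with JobInProgress's obtainNewMapTask() method, which in the end calls JobInProgress's findNewMapTask() to access nonRunningMapCache.
As described in the discussion of task initialization, createCache() attaches the TaskInProgress objects that need to run to nodes of the network topology. findNewMapTask() searches outward level by level: first the same node, then nodes in the same rack, then nodes in the same data center, until maxLevel levels have been searched. In this way, when the JobTracker assigns work to a TaskTracker, it can quickly pair a task with the TaskTracker nearest to its data.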
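The level-by-level search can be illustrated with a small sketch. It is a simplified model under stated assumptions: topology locations are written as paths such as /dc1/rack1/hostA, tasks are plain string IDs, and the class LocalitySketch with its methods levels, createCache, and findNewMapTask is hypothetical code that only borrows the names used in the text. Unlike a real scheduler, it simply returns null when nothing is found within maxLevel levels.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class LocalitySketch {
    // Topology node (host, rack, data center) -> IDs of tasks whose input is stored below it.
    private final Map<String, List<String>> nonRunningMapCache = new HashMap<>();
    private final Set<String> assigned = new HashSet<>();
    private final int maxLevel;

    LocalitySketch(int maxLevel) {
        this.maxLevel = maxLevel;
    }

    // "/dc1/rack1/hostA" -> ["/dc1/rack1/hostA", "/dc1/rack1", "/dc1"], cut off at maxLevel entries.
    List<String> levels(String hostPath) {
        List<String> result = new ArrayList<>();
        String node = hostPath;
        for (int level = 0; level < maxLevel && !node.isEmpty(); level++) {
            result.add(node);
            int cut = node.lastIndexOf('/');
            node = cut <= 0 ? "" : node.substring(0, cut);
        }
        return result;
    }

    // Registers a task at every topology level of each host that holds its input split.
    void createCache(String taskId, String... splitHosts) {
        for (String host : splitHosts) {
            for (String node : levels(host)) {
                nonRunningMapCache.computeIfAbsent(node, k -> new ArrayList<>()).add(taskId);
            }
        }
    }

    // Searches from the tracker's own host outward and returns the first unassigned task.
    String findNewMapTask(String trackerHost) {
        for (String node : levels(trackerHost)) {
            for (String taskId : nonRunningMapCache.getOrDefault(node, Collections.<String>emptyList())) {
                if (assigned.add(taskId)) {
                    return taskId;
                }
            }
        }
        return null; // nothing within maxLevel levels
    }

    public static void main(String[] args) {
        LocalitySketch cache = new LocalitySketch(3);
        cache.createCache("map_1", "/dc1/rack1/hostA");
        cache.createCache("map_2", "/dc1/rack2/hostB");
        System.out.println(cache.findNewMapTask("/dc1/rack2/hostB")); // node-local task
        System.out.println(cache.findNewMapTask("/dc1/rack2/hostC")); // found at data-center level
    }
}
```

Registering each task at every ancestor of its hosts is what makes the lookup cheap: the search only has to walk up the requesting tracker's own path.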
Finally, a Task object is generated, wrapped in a LaunchTaskAction, and sent back to the TaskTracker, which executes the task.
Reduce tasks are produced in a similar way, using JobInProgress.obtainNewReduceTask(), which in the end calls JobInProgress's findNewReduceTask() to access nonRunningReduceCache.

VI. TaskTracker
1 TaskTracker Loads the Task into a Child Process
Task execution is actually initiated by the TaskTracker. The TaskTracker communicates with the JobTracker periodically (every 10 seconds by default; see the HEARTBEAT_INTERVAL variable defined in the MRConstants class) to report the status of its Tasks, receive instructions from the JobTracker, and so on. New tasks that it needs to run are also started at this point, that is, when the TaskTracker calls the JobTracker's heartbeat() method; underneath, the call goes through a Proxy interface in the IPC layer. Each step is briefly described below.
1.1 TaskTracker.run(): Connecting to the JobTracker
During startup the TaskTracker initializes a series of parameters and services and then tries to connect to the JobTracker (which must implement the InterTrackerProtocol interface). If the connection is lost, it retries in a loop and reinitializes all members and parameters.
1.2 TaskTracker.offerService(): The Main Loop
Once connected to the JobTracker service, the TaskTracker calls offerService() to enter its main loop. Every 10 seconds the loop contacts the JobTracker by calling transmitHeartBeat() and obtains a HeartbeatResponse. It then calls the HeartbeatResponse's getActions() function to get all the instructions sent by the JobTracker, as an array of TaskTrackerAction. It iterates over the array: a new-task instruction, that is, a LaunchTaskAction, is added to the pending queue by calling addToTaskQueue; anything else, such as a KillJobAction or KillTaskAction, is added to the tasksToCleanup queue and handled by a taskCleanupThread thread.
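The dispatch step of this loop can be sketched as below. The nested classes reuse the names TaskTrackerAction, LaunchTaskAction, KillTaskAction, and KillJobAction from the text, but they are minimal stand-ins defined for this example rather than the Hadoop classes, and handleActions is a hypothetical method name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class HeartbeatDispatchSketch {
    // Minimal stand-ins for the action types named in the text.
    static class TaskTrackerAction {
        final String targetId;
        TaskTrackerAction(String targetId) { this.targetId = targetId; }
    }
    static final class LaunchTaskAction extends TaskTrackerAction {
        LaunchTaskAction(String taskId) { super(taskId); }
    }
    static final class KillTaskAction extends TaskTrackerAction {
        KillTaskAction(String taskId) { super(taskId); }
    }
    static final class KillJobAction extends TaskTrackerAction {
        KillJobAction(String jobId) { super(jobId); }
    }

    final Deque<LaunchTaskAction> tasksToLaunch = new ArrayDeque<>();
    final Deque<TaskTrackerAction> tasksToCleanup = new ArrayDeque<>();

    // Routes each action from a heartbeat response to the queue that will handle it.
    void handleActions(TaskTrackerAction[] actions) {
        for (TaskTrackerAction action : actions) {
            if (action instanceof LaunchTaskAction) {
                tasksToLaunch.add((LaunchTaskAction) action);   // new work to start
            } else {
                tasksToCleanup.add(action);                     // kill or cleanup requests
            }
        }
    }

    public static void main(String[] args) {
        HeartbeatDispatchSketch tracker = new HeartbeatDispatchSketch();
        tracker.handleActions(new TaskTrackerAction[] {
            new LaunchTaskAction("task_1"), new KillTaskAction("task_0"), new KillJobAction("job_9")
        });
        System.out.println(tracker.tasksToLaunch.size() + " to launch, "
                + tracker.tasksToCleanup.size() + " to clean up");
    }
}
```

Keeping the two queues separate lets launching and cleanup proceed on different threads without blocking the heartbeat loop.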
1.3 TaskTracker.transmitHeartBeat(): Getting Instructions from the JobTracker
In transmitHeartBeat(), the TaskTracker creates a new TaskTrackerStatus object that records the current state of its tasks, and checks the number of Tasks currently running, the local disk space usage, and so on. If it can accept a new Task, it sets the askForNewTask parameter of heartbeat() to true. It then calls the JobTracker's heartbeat() method through the IPC interface to send the status; heartbeat() returns an array of TaskTrackerAction.
1.4 TaskTracker.addToTaskQueue: Handing Over to the TaskLauncher
TaskLauncher is the thread class that handles new tasks; it holds a queue of tasks waiting to run, tasksToLaunch. TaskTracker.addToTaskQueue calls the TaskTracker's registerTask, which creates a TaskInProgress object to schedule and monitor the task and adds it to the runningTasks queue. The same TaskInProgress is also added to tasksToLaunch, and notifyAll() wakes up a thread, which takes a pending task from tasksToLaunch and calls the TaskTracker's startNewTask to run it.
1.5 TaskTracker.startNewTask(): Starting a New Task
Calls localizeJob() to actually initialize the Task and start executing it.
1.6 TaskTracker.localizeJob(): Initializing the Job Directory
The main job of this function is to initialize the working directory workDir, copy the job jar from HDFS to the local file system, and call RunJar.unJar() to unpack it into the working directory. It then creates a RunningJob and calls addTaskToJob() to add it to the runningJobs monitoring queue. addTaskToJob adds a task to the tasks list of the runningJob it belongs to; if that runningJob does not exist yet, it is created first and added to runningJobs. When this is done, launchTaskForJob() is called to start executing the Task.
1.7 TaskTracker.launchTaskForJob(): Executing the Task
Starting the Task is actually done by calling the launchTask() function of TaskTracker$TaskInProgress.
1.8 TaskTracker$TaskInProgress.launchTask(): Executing the Task
Before the task runs, localizeTask() is called to update the jobConf file and write it to the local directory. The Task's createRunner() method is then called to create a TaskRunner object, whose start() method finally starts a separate Java child process for the Task.
1.9 Task.createRunner(): Creating the Runner Object
Task has two implementations, MapTask and ReduceTask, used to create Map and Reduce tasks respectively. MapTask creates a MapTaskRunner to start the Task child process, and ReduceTask creates a ReduceTaskRunner.
1.10 TaskRunner.start(): Starting the Child Process
TaskRunner is responsible for running a task inside a process. It does its work in run(), which mainly initializes the environment for the Java child process, including the working directory workDir and the CLASSPATH environment variable, and then loads the job jar. JvmManager manages all the Task child processes running on the TaskTracker. Each process is managed by a JvmRunner, which itself runs in a separate thread. JvmManager's launchJvm method creates a JvmRunner according to whether the task is a map or a reduce and places it in the process container of the corresponding JvmManagerForType. JvmManagerForType's reapJvm() allocates a new JVM process. If the JvmManagerForType's slots are full, it looks for an idle process: if the idle process belongs to the same job, the task is placed directly in it; otherwise the process is killed and replaced with a new one. If the slots are not full, a new child process is started. New processes are created with the spawnNewJvm method, which uses the run method of the JvmRunner thread; run creates a new process and runs it, and is implemented by calling runChild.
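The slot decision described for reapJvm() can be modeled with a few lines of Java. This is an illustrative sketch only: JvmSlotSketch, its Jvm record, and the Decision enum are hypothetical, no real processes are started, and the NO_SLOT outcome for the case where every JVM is busy is an assumption, since the text does not say what happens then.

```java
import java.util.ArrayList;
import java.util.List;

class JvmSlotSketch {
    enum Decision { SPAWN_NEW, REUSE_IDLE, KILL_AND_REPLACE, NO_SLOT }

    // Bookkeeping record for one child JVM; no real process is involved.
    static final class Jvm {
        final String jobId;
        boolean busy = true;
        Jvm(String jobId) { this.jobId = jobId; }
    }

    private final int maxJvms;
    final List<Jvm> jvms = new ArrayList<>();

    JvmSlotSketch(int maxJvms) {
        this.maxJvms = maxJvms;
    }

    // Decides how a task of the given job gets a JVM.
    Decision reapJvm(String jobId) {
        if (jvms.size() < maxJvms) {              // free slot: start a new child process
            jvms.add(new Jvm(jobId));
            return Decision.SPAWN_NEW;
        }
        Jvm idleOtherJob = null;
        for (Jvm jvm : jvms) {
            if (jvm.busy) {
                continue;
            }
            if (jvm.jobId.equals(jobId)) {        // idle JVM of the same job: reuse it
                jvm.busy = true;
                return Decision.REUSE_IDLE;
            }
            if (idleOtherJob == null) {
                idleOtherJob = jvm;
            }
        }
        if (idleOtherJob != null) {               // idle JVM of another job: replace it
            jvms.remove(idleOtherJob);
            jvms.add(new Jvm(jobId));
            return Decision.KILL_AND_REPLACE;
        }
        return Decision.NO_SLOT;                  // assumption: every JVM is busy
    }

    // Marks the JVM at the given index as idle once its task has finished.
    void taskFinished(int index) {
        jvms.get(index).busy = false;
    }

    public static void main(String[] args) {
        JvmSlotSketch manager = new JvmSlotSketch(1);
        System.out.println(manager.reapJvm("job_1"));
        manager.taskFinished(0);
        System.out.println(manager.reapJvm("job_2"));
    }
}
```

Reusing an idle JVM of the same job avoids process startup cost, while a JVM of a different job cannot be reused because it has the wrong job jar and environment loaded.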
2 The Child Process Runs the MapTask
The real execution vehicle is Child. It contains a main function; when the process starts, the relevant arguments are passed in and parsed, the task is requested from the parent process through getTask(jvmId), the corresponding Task instance is constructed, and the task is started with the Task's run().
2.1 run
The method is fairly simple. After setting up the TaskReporter, it runs runJobCleanupTask, runJobSetupTask, runTaskCleanupTask, or the Mapper, depending on the situation. Because MapReduce now has two sets of APIs and MapTask has to support both, running the Mapper is split into runNewMapper and runOldMapper. We analyze runOldMapper.
2.2 runOldMapper
runOldMapper begins by constructing the InputSplit that the Mapper will process and then creates the Mapper's RecordReader, which yields the input to map. Next it constructs the Mapper's output through a MapOutputCollector. There are two cases: if there is no Reducer, DirectMapOutputCollector is used; otherwise MapOutputBuffer is used.
With the Mapper's input and output in place, the Mapper can be run by constructing the MapRunnable named in the configuration. The system currently has two MapRunnables: MapRunner and MultithreadedMapRunner. MapRunner is the single-threaded runner and is relatively simple; it uses reflection to instantiate the user-defined implementation of the Mapper interface and keeps it as a member.
2.3 The run Method of MapRunner
It first creates the key and value objects. Then, for each <key,value> pair in the InputSplit, it calls the map method of the user's Mapper implementation. For every pair processed, an OutputCollector collects the new key/value pairs produced, spilling them to a file or keeping them in memory for further processing such as sorting and combining.
2.4 OutputCollector
The role of the OutputCollector is to collect the new key/value pairs produced by each call to map and to spill them to a file or keep them in memory for further processing such as sorting and combining.
MapOutputCollector has two implementations: MapOutputBuffer and DirectMapOutputCollector.
DirectMapOutputCollector is used when no Reduce phase is needed. If the Mapper is followed by reduce tasks, the system uses MapOutputBuffer for output. MapOutputBuffer caches the results of map in an in-memory buffer and uses several arrays to manage that buffer.

At appropriate times, the data in the buffer is spilled to disk.

Data is written to disk in the following cases:
(1) When the memory buffer cannot hold a key/value pair that is too large: the spillSingleRecord method.
(2) When the memory buffer is full: the SpillThread thread.
(3) When all the Mapper's results have been collected and the buffer needs a final cleanup: the flush method.
2.5 The SpillThread Thread: Spilling the Buffer to Disk
(1) When a spill is needed, sortAndSpill is called, which sorts by partition and key. The default sort is QuickSort.
(2) If there is no combiner, the records are written out directly; otherwise CombinerRunner's combine is called, so the records are combined before being written.
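The two spill steps above, sorting by partition and key and then optionally combining, can be sketched in plain Java. The SpillSketch class and its Record type are hypothetical, the combiner is assumed to sum integer values as in WordCount, and the sketch uses the standard List.sort rather than the QuickSort mentioned above; it returns the records instead of writing a spill file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SpillSketch {
    // One buffered map output record, tagged with the partition (reducer) it belongs to.
    static final class Record {
        final int partition;
        final String key;
        final int value;

        Record(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override public String toString() {
            return partition + ":" + key + "=" + value;
        }
    }

    // Sorts the buffer by (partition, key); with a combiner, sums the values of equal keys.
    static List<Record> sortAndSpill(List<Record> buffer, boolean useCombiner) {
        List<Record> sorted = new ArrayList<>(buffer);
        sorted.sort(Comparator.<Record>comparingInt(r -> r.partition).thenComparing(r -> r.key));
        if (!useCombiner) {
            return sorted;                        // records are written out as they are
        }
        List<Record> combined = new ArrayList<>();
        for (Record r : sorted) {
            Record last = combined.isEmpty() ? null : combined.get(combined.size() - 1);
            if (last != null && last.partition == r.partition && last.key.equals(r.key)) {
                // Same partition and key as the previous record: merge the two values.
                combined.set(combined.size() - 1, new Record(r.partition, r.key, last.value + r.value));
            } else {
                combined.add(r);
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Record> buffer = Arrays.asList(
                new Record(1, "World", 1), new Record(0, "Hello", 1),
                new Record(1, "World", 1), new Record(0, "Bye", 1));
        System.out.println(sortAndSpill(buffer, true)); // prints [0:Bye=1, 0:Hello=1, 1:World=2]
    }
}
```

Sorting by partition first keeps each reducer's records contiguous in the spill, so a reducer can later fetch its portion as one range.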
3 The Child Process Runs the ReduceTask
ReduceTask.run starts out like MapTask: initialize(), runJobCleanupTask(), runJobSetupTask(), and runTaskCleanupTask(). After that the real work begins, in three main steps: Copy, Sort, and Reduce.
3.1 Copy
This step collects the map output files from the servers that ran the Map tasks. Copying is handled by the ReduceTask.ReduceCopier class.
3.1.1 Class diagram:
3.1.2 Flow: starts with ReduceCopier.fetchOutputs
(1) Requesting tasks. This uses the GetMapEventsThread thread. Its run method repeatedly calls getMapCompletionEvents, which in turn uses RPC to call getMapCompletionEvents of the TaskUmbilicalProtocol protocol. The call uses the jobID it belongs to in order to ask its parent TaskTracker about the completion status of the job's Map tasks (the TaskTracker asks the JobTracker and relays the answer). It returns an array, TaskCompletionEvent events[]. A TaskCompletionEvent contains information such as the task ID and IP address.
(2) Once the information about the servers that ran the Map tasks is available, a MapOutputCopier thread is started to do the actual copying. Each one runs in its own thread and is responsible for copying the files from one Map task server. MapOutputCopier's run loop calls copyOutput, which calls getMapOutput, which copies the data remotely over HTTP.
(3) The content fetched by getMapOutput (which may of course also be local) exists as a MapOutput object. It can live in memory or be serialized to disk, depending on memory usage.
(4) At the same time, an in-memory merger thread, InMemFSMergeThread, and a file merger thread, LocalFSMerger, work alongside. They merge-sort the downloaded files (which may be in memory; for simplicity they are all called files here), which saves time, reduces the number of input files, and lightens the load on the later sort. InMemFSMergeThread's run loop calls doInMemMerge, which uses the utility class Merger to perform the merge and calls combinerRunner.combine if a combine is needed.
3.2 Sort
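The sort step is essentially a continuation of the merge sorting described above. It takes place after all the files have been copied, and uses the utility class Merger to merge all the files. The result is a new file that merges the output files of all the required Map tasks; the Map task output files gathered from the other servers are all deleted.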
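Because every map output arrives already sorted, the merge can be done as a k-way merge. The sketch below shows the general technique with a priority queue; MergeSketch is a hypothetical class working on in-memory lists of keys, not the Hadoop Merger utility, whose internals are not described here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class MergeSketch {
    // Merges several individually sorted runs into one sorted list.
    static List<String> merge(List<List<String>> sortedRuns) {
        // Each heap entry is {run index, position in run}, ordered by the key it points at.
        Comparator<int[]> byCurrentKey =
                Comparator.comparing(e -> sortedRuns.get(e[0]).get(e[1]));
        PriorityQueue<int[]> heap = new PriorityQueue<>(byCurrentKey);
        for (int i = 0; i < sortedRuns.size(); i++) {
            if (!sortedRuns.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] entry = heap.poll();                      // smallest current key over all runs
            List<String> run = sortedRuns.get(entry[0]);
            merged.add(run.get(entry[1]));
            if (entry[1] + 1 < run.size()) {
                heap.add(new int[] {entry[0], entry[1] + 1}); // advance within that run
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // prints [Bye, GoodBye, Hadoop, Hello, Hello, World]
        System.out.println(merge(Arrays.asList(
                Arrays.asList("Bye", "Hello", "World"),
                Arrays.asList("GoodBye", "Hadoop", "Hello"))));
    }
}
```

Equal keys from different runs end up next to each other in the merged output, which is what allows the reduce phase to see all values of a key as one group.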
3.3 Reduce
This is the last phase of the Reduce task. It prepares the keyClass ("mapred.output.key.class" or "mapred.mapoutput.key.class"), the valueClass ("mapred.mapoutput.value.class" or "mapred.output.value.class"), and the Comparator ("mapred.output.value.groupfn.class" or "mapred.output.key.comparator.class"), and finally calls the runOldReducer method. (There are again two sets of APIs; we analyze runOldReducer.)
3.3.1 runOldReducer
(1) Output.
It prepares an OutputCollector to collect the output. Unlike in MapTask, this OutputCollector is simpler: it just opens a RecordWriter, and each collect results in one write. The biggest difference is that the file system passed to the RecordWriter this time is almost always a distributed file system, that is, HDFS.
(2) Input. ReduceTask uses the prepared KeyClass, ValueClass, KeyComparator, and other user-defined classes to construct the key type the Reducer needs and an Iterator over the values (by this point a key generally corresponds to a group of values).
(3) With input and output in place, the user-defined Reducer is called in a loop until the Reduce phase is complete.
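The loop in step (3) can be sketched as follows: walk the sorted input, gather the run of values that share a key, and hand the key and an Iterator over its values to the reducer. ReduceLoopSketch, runReducer, and sum are hypothetical names for this illustration, keys are plain strings, and the output is collected into a map instead of being written through a RecordWriter.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class ReduceLoopSketch {
    // Calls the reducer once per distinct key of the sorted input, with an iterator over its values.
    static <V, R> Map<String, R> runReducer(List<Map.Entry<String, V>> sortedInput,
                                            BiFunction<String, Iterator<V>, R> reducer) {
        Map<String, R> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedInput.size()) {
            String key = sortedInput.get(i).getKey();
            List<V> values = new ArrayList<>();
            // Because the input is sorted, all values of one key are adjacent.
            while (i < sortedInput.size() && sortedInput.get(i).getKey().equals(key)) {
                values.add(sortedInput.get(i).getValue());
                i++;
            }
            output.put(key, reducer.apply(key, values.iterator()));
        }
        return output;
    }

    // Example reducer body: sums the values of one key, as WordCount does.
    static Integer sum(Iterator<Integer> values) {
        int total = 0;
        while (values.hasNext()) {
            total += values.next();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = new ArrayList<>();
        input.add(new AbstractMap.SimpleEntry<>("Hadoop", 2));
        input.add(new AbstractMap.SimpleEntry<>("Hello", 1));
        input.add(new AbstractMap.SimpleEntry<>("Hello", 1));
        Map<String, Integer> result = runReducer(input, (key, values) -> sum(values));
        System.out.println(result); // prints {Hadoop=2, Hello=2}
    }
}
```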
