Hadoop MapReduce: Execution Flow of the Main Framework
 
×÷ÕߣºåÐÒ£³å  À´Ô´£ºCSDN ·¢²¼ÓÚ  2016-6-7
 

I. MapReduce Overview

Map/Reduce is a distributed computing model for large-scale data processing. It was originally designed and implemented by Google engineers, and Google has published the full MapReduce paper. The paper defines Map/Reduce as a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values that share the same key. Many real-world tasks can be expressed in this model.

II. How MapReduce Works

The Map-Reduce framework operates entirely on <key,value> pairs: the input is a set of <key,value> pairs and the output is also a set of <key,value> pairs, although their types may differ. Because key and value classes must support serialization, they have to implement the Writable interface. Key classes must additionally implement the WritableComparable interface so that the framework can sort the data set.
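The serialization and ordering requirements on keys can be illustrated with a minimal sketch. It assumes a hypothetical key type, YearMonthKey, and a local stand-in interface, SketchWritableComparable, that mirrors the write/readFields/compareTo shape of Hadoop's WritableComparable so that the example compiles without Hadoop on the classpath; neither name comes from the article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in for Hadoop's WritableComparable (the name is hypothetical),
// so the sketch compiles without Hadoop on the classpath.
interface SketchWritableComparable<T> extends Comparable<T> {
    void write(DataOutput out) throws IOException;     // serialize the fields
    void readFields(DataInput in) throws IOException;  // restore the fields
}

// Hypothetical key type: a (year, month) pair ordered by year, then month.
final class YearMonthKey implements SketchWritableComparable<YearMonthKey> {
    int year;
    int month;

    YearMonthKey() {}  // empty constructor: fields are filled in by readFields()

    YearMonthKey(int year, int month) {
        this.year = year;
        this.month = month;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
    }

    @Override
    public int compareTo(YearMonthKey other) {
        int byYear = Integer.compare(year, other.year);
        return byYear != 0 ? byYear : Integer.compare(month, other.month);
    }

    // Serializes a key to bytes and reads it back into a fresh instance.
    static YearMonthKey roundTrip(YearMonthKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            YearMonthKey copy = new YearMonthKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A value class would need only the write/readFields half; the compareTo half is what lets the framework sort keys.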

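The execution of a Map-Reduce job and the types of its input and output are as follows:

Map: <k1,v1> -> list<k2,v2>

Reduce: <k2,list<v2>> -> <k3,v3>

The following example illustrates this process in detail.

WordCount is an example that ships with Hadoop; its goal is to count how often each word occurs in a set of text files. Suppose the WordCount program is run on the following two text files: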

Hello World Bye World

Hello Hadoop GoodBye Hadoop

1 map input data

2 map output / combine input

3 combine output

4 reduce output
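The four stages listed above can be traced with a small in-memory simulation. This is only a sketch in plain Java, not the WordCount program that ships with Hadoop; the class and method names (WordCountSketch, map, combine, reduce) are assumptions made for illustration, and each input file is treated as the input of one map task.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WordCountSketch {
    // Map stage: emit <word, 1> for every token of one input file.
    static List<Map.Entry<String, Integer>> map(String text) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : text.trim().split("\\s+")) {
            out.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
        }
        return out;
    }

    // Combine stage: sum the counts per word within one map task's output.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            sums.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return sums;
    }

    // Reduce stage: sum the combined counts per word across all map tasks.
    static Map<String, Integer> reduce(List<Map<String, Integer>> combined) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> part : combined) {
            part.forEach((word, count) -> totals.merge(word, count, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        String[] files = {"Hello World Bye World", "Hello Hadoop GoodBye Hadoop"};
        List<Map<String, Integer>> combined = new ArrayList<>();
        for (String file : files) {
            List<Map.Entry<String, Integer>> mapped = map(file);
            System.out.println("map output:     " + mapped);
            Map<String, Integer> sums = combine(mapped);
            System.out.println("combine output: " + sums);
            combined.add(sums);
        }
        System.out.println("reduce output:  " + reduce(combined));
    }
}
```

For the two sample files, the combine stage yields {Bye=1, Hello=1, World=2} and {GoodBye=1, Hadoop=2, Hello=1}, and the reduce stage yields {Bye=1, GoodBye=1, Hadoop=2, Hello=2, World=2}.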

III. MapReduce Framework Structure

1 Roles

1.1 JobTracker

JobTracker is a master service. It schedules each sub-task (task) of a job to run on a TaskTracker and monitors them; if a task fails, JobTracker reruns it. In general, JobTracker should be deployed on a dedicated machine.

1.2 TaskTracker

TaskTracker is a slave service that runs on multiple nodes. It is responsible for directly executing each task. Every TaskTracker needs to run on an HDFS DataNode.

1.3 JobClient

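On the client side, every job uses the JobClient class to package the application and its configuration parameters into a jar file stored in HDFS, and submits the path to JobTracker. JobTracker then creates each Task (that is, the MapTasks and ReduceTasks) and distributes them to the TaskTracker services for execution.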

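2 Data Structures

2.1 Mapper and Reducer

The most basic components of a MapReduce application running on Hadoop are a Mapper class, a Reducer class, and a driver program that creates the JobConf. Some applications also include a Combiner class, which is in fact an implementation of Reducer.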

2.2 JobInProgress

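After JobClient submits a job, JobTracker creates a JobInProgress to track and schedule it and adds it to the job queue. Based on the input data set defined in the submitted job jar (already divided into FileSplits), JobInProgress creates a corresponding batch of TaskInProgress objects to monitor and schedule the MapTasks. It also creates the specified number of TaskInProgress objects to monitor and schedule the ReduceTasks; the default is one ReduceTask.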

2.3 TaskInProgress

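When JobTracker starts a task, it launches it through the corresponding TaskInProgress (launchTask). At that point the Task object (a MapTask or ReduceTask) is serialized and written to the corresponding TaskTracker service. On receiving it, the TaskTracker creates its own TaskInProgress (a different implementation from the TaskInProgress used in JobTracker, but with a similar role) to monitor and schedule the Task. The actual Task process is started by a TaskRunner object managed by that TaskInProgress. TaskRunner automatically loads the job jar, sets up the environment variables, and starts a separate java child process to execute the Task, that is, a MapTask or ReduceTask; the two do not necessarily run on the same TaskTracker.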

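2.4 MapTask and ReduceTask

A complete job automatically runs the Mapper, the Combiner (only when one is specified in the JobConf), and the Reducer in sequence. Mapper and Combiner are invoked by MapTask, while Reducer is invoked by ReduceTask; the Combiner is in fact an implementation of the Reducer interface. The Mapper reads the input data set defined in the job jar as <key1,value1> pairs and produces temporary <key2,value2> pairs. If a Combiner is defined, MapTask calls it after the Mapper finishes to merge values that share the same key, which reduces the size of the output. Once all MapTask work is complete, the output is handed to the ReduceTask process, which calls the Reducer to produce the final <key3,value3> pairs. The next part describes this process in more detail.

The figure below shows the main components of the Map/Reduce framework and how they relate to each other: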

3 Flow

A MapReduce job is submitted to the JobTracker on the master node through JobClient.runJob(job). On receiving the request from JobClient, JobTracker adds the job to its job queue. JobTracker waits for JobClient to submit jobs over RPC, while each TaskTracker keeps sending heartbeats to JobTracker over RPC to ask whether there is any task to do and, if so, to have one dispatched to it. If JobTracker's job queue is not empty, the heartbeat sent by a TaskTracker is answered with the tasks assigned to it. This is a pull process. After the TaskTracker on a slave node receives a task, it launches the Task locally and executes it. A simplified diagram follows:
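The pull model described above can be sketched in a few lines: workers ask for work, and the master never pushes it. The sketch below is a plain-Java simulation, not Hadoop code; Master, heartbeat, and simulate are hypothetical names, and the RPC layer is replaced by a direct method call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class HeartbeatPullSketch {
    // Hypothetical stand-in for the JobTracker: it only holds a queue of pending tasks.
    static final class Master {
        private final Queue<String> pendingTasks = new ArrayDeque<>();

        void submit(String task) {
            pendingTasks.add(task);
        }

        // Called by a worker. Returns a task if one is pending and the worker
        // reports a free slot, otherwise null.
        String heartbeat(String workerName, boolean hasFreeSlot) {
            return hasFreeSlot ? pendingTasks.poll() : null;
        }
    }

    // Runs a number of heartbeat rounds and records which worker pulled which task.
    static List<String> simulate(List<String> tasks, List<String> workers, int rounds) {
        Master master = new Master();
        tasks.forEach(master::submit);
        List<String> log = new ArrayList<>();
        for (int round = 0; round < rounds; round++) {
            for (String worker : workers) {
                String task = master.heartbeat(worker, true);
                if (task != null) {
                    log.add(worker + " runs " + task);
                }
            }
        }
        return log;
    }
}
```

With three tasks, two workers, and two rounds, the first round hands one task to each worker and the second round hands the remaining task to whichever worker sends its heartbeat first.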

The following sections describe in detail how Map/Reduce processes a job.

IV. JobClient

A MapReduce program is usually written along these lines:

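import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

Configuration conf = new Configuration();     // load the Hadoop configuration
JobConf job = new JobConf(conf);              // describe one job; JobClient.runJob() takes a JobConf
job.setJobName("job name");
job.setMapperClass(MyMapper.class);           // placeholder: your Mapper type
job.setCombinerClass(MyCombiner.class);       // placeholder: your Combiner type
job.setReducerClass(MyReducer.class);         // placeholder: your Reducer type
job.setOutputKeyClass(Text.class);            // placeholder: type of the output key
job.setOutputValueClass(IntWritable.class);   // placeholder: type of the output value
FileInputFormat.addInputPath(job, new Path("/input/path/on/hdfs"));
FileOutputFormat.setOutputPath(job, new Path("/output/path/on/hdfs"));
// other initialization settings
JobClient.runJob(job);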

 

1 Configuring the Job

JobConf is the interface through which the user describes a job. The following are some of the key customization settings in the MapReduce process:

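2 JobClient.runJob(): running the Job and splitting the input data set

Through the JobClient class, a MapReduce Job splits the input data set into a batch of smaller data sets according to the InputFormat implementation that the user defined in JobConf, and one MapTask is created to process each of them. By default JobClient uses the FileInputFormat class and calls FileInputFormat.getSplits() to generate the small data sets. If a data file is isSplitable(), a large file is divided into smaller FileSplits; a FileSplit only records the file's path in HDFS, the offset, and the split size. This information is packaged together into the jar of the jobFile.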
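The idea that a split is only a (path, offset, length) record, not a copy of the data, can be shown with a simplified sketch. It is not the FileInputFormat.getSplits() implementation, which applies further rules that are omitted here; SplitSketch, Split, and the fixed splitSize parameter are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSketch {
    // One logical split: which file, where it starts, and how many bytes it covers.
    static final class Split {
        final String path;
        final long offset;
        final long length;

        Split(String path, long offset, long length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return path + ":" + offset + "+" + length;
        }
    }

    // Cuts a file of fileLength bytes into splits of at most splitSize bytes.
    // A file that cannot be split is returned as a single split.
    static List<Split> getSplits(String path, long fileLength, long splitSize, boolean splittable) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        if (!splittable || fileLength <= splitSize) {
            splits.add(new Split(path, 0, fileLength));
            return splits;
        }
        for (long offset = 0; offset < fileLength; offset += splitSize) {
            splits.add(new Split(path, offset, Math.min(splitSize, fileLength - offset)));
        }
        return splits;
    }
}
```

Under these assumptions, a 150 MB splittable file with a 64 MB split size produces three splits (64 MB, 64 MB, 22 MB), and therefore three MapTasks.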

JobClient then uses the submitJob(job) method to submit the job to the master. Internally, submitJob(job) performs the actual submission through submitJobInternal(job). submitJobInternal(job) first uploads three files in turn to HDFS, the Hadoop distributed file system: job.jar, job.split, and job.xml.

job.xml: the job configuration, for example the Mapper, Combiner, and Reducer types and the input and output format types.

job.jar: the jar package containing the classes needed to run the job, such as the Mapper and Reducer implementations.

job.split: information about the file splits, such as how many splits the data is divided into and the block size (64 MB by default).

The HDFS path of these three files is determined by the MapReduce system directory property mapred.system.dir in hadoop-default.xml plus the jobid. The default value of mapred.system.dir is /tmp/hadoop-user_name/mapred/system. After writing the three files, the method calls JobTracker.submitJob(job) on the master node over RPC, at which point the job submission is complete.

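3 Submitting the Job

The jobFile is submitted through the RPC module (described in detail in a separate chapter). Roughly, JobClient calls JobTracker's submitJob() method through a Proxy interface implemented over RPC, and JobTracker must implement the JobSubmissionProtocol interface.

After JobTracker creates the job successfully, it returns a JobStatus object to JobClient that records the job's status, such as the execution time and the completion ratios of the Map and Reduce tasks. From this JobStatus object, JobClient creates a RunningJob object of type NetworkedJob, which periodically fetches execution statistics from JobTracker in order to monitor the job and print its progress to the user's console.

The classes and methods involved in creating a Job are shown in the figure below.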

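V. JobTracker

As mentioned above, jobs are scheduled centrally by JobTracker, and the individual Tasks are distributed to the TaskTracker nodes for execution. The execution process is analyzed in detail below, starting from the point where JobTracker receives the submission request from JobClient.

1 JobTracker Initializes the Job

1.1 JobTracker.submitJob(): receiving the request

When JobTracker receives a new job request (that is, when submitJob() is called), it creates a JobInProgress object and uses it to manage and schedule the tasks. On creation, JobInProgress initializes a series of task-related parameters and uses FileSystem to download all the job files uploaded by JobClient into a temporary directory on the local file system. These include the uploaded *.jar file, the xml file that records the configuration, and the file that records the split information.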

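1.2 JobTracker.JobInitThread: notifying the initialization thread

The listener class EagerTaskInitializationListener in JobTracker is responsible for initializing Tasks. JobTracker calls jobAdded(job) to add the job to a queue inside EagerTaskInitializationListener that holds the jobs awaiting initialization, namely the list member variable jobInitQueue. The resortInitQueue method sorts the queue by job priority. notifyAll() is then called to wake up JobInitThread, the thread that initializes jobs. On receiving the signal, JobInitThread takes the job at the front of the queue, that is, the one with the highest priority, and calls initJob on the TaskTrackerManager, which ultimately calls JobInProgress.initTasks() to do the real initialization work.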

1.3 JobInProgress.initTasks(): initializing TaskInProgress

There are two kinds of Task, MapTask and ReduceTask, and both are managed by TaskInProgress objects.

First, JobInProgress creates the monitoring objects for the Map tasks. initTasks() calls JobClient's readSplitFile() to obtain the list of RawSplits for the already-split input data, and then creates the corresponding number of TaskInProgress objects to manage Map execution. During this step it also records the hosts of the DataNodes that hold the HDFS blocks belonging to each RawSplit. This information is obtained when the RawSplit is created, through FileSplit's getLocations() function, which calls DistributedFileSystem's getFileCacheHints() (a detail covered in the HDFS discussion). If the data is stored on the local file system, that is, when LocalFileSystem is used, there is only one location, "localhost".

Once these TaskInProgress objects have been created, initTasks() calls createCache() to build nonRunningMapCache, a cache of Map tasks that have not yet run. When a TaskTracker on a slave sends a heartbeat to the master, tasks can be taken directly from this cache for execution.

Next, JobInProgress creates the monitoring objects for the Reduce tasks. This is simpler: they are created according to the number of Reduces specified in JobConf, and by default only one Reduce task is created. Reduce tasks are also monitored and scheduled by the TaskInProgress class, though through a different constructor; TaskInProgress creates a concrete MapTask or ReduceTask depending on its parameters. Likewise, initTasks() uses createCache() to produce the nonRunningReduceCache member.

After creating the TaskInProgress objects, JobInProgress finally constructs a JobStatus that records the job as running, and then calls JobHistory.JobInfo.logStarted() to log the job's execution. This completes job initialization in JobTracker.

2 JobTracker Schedules the Job

Hadoop's default scheduler is JobQueueTaskScheduler, which uses a FIFO policy. It has two member variables: jobQueueJobInProgressListener and the eagerTaskInitializationListener mentioned above. JobQueueJobInProgressListener is another listener class of JobTracker; it contains a map used to manage and schedule all JobInProgress objects. jobAdded(job) also adds the job to the map in JobQueueJobInProgressListener.

The most important method of JobQueueTaskScheduler is assignTasks, which implements task scheduling. In detail: when JobTracker receives a heartbeat() call from a TaskTracker, it first checks whether the previous heartbeat response was completed and whether a start or restart was requested; if everything is normal, it processes the heartbeat. It then checks how many more map and reduce tasks the TaskTracker can take on, whether the number of tasks about to be dispatched exceeds that number, and whether it exceeds the cluster's average remaining task capacity. If neither limit is exceeded, it assigns a MapTask or ReduceTask to this TaskTracker. Map tasks are produced by JobInProgress's obtainNewMapTask() method, which ultimately calls JobInProgress's findNewMapTask() to access nonRunningMapCache.

As noted in the discussion of task initialization, createCache() attaches the TaskInProgress objects awaiting execution to the network topology. findNewMapTask() searches outward level by level: first the same node, then nodes in the same rack, then nodes in the same data center, until maxLevel levels have been searched. In this way, when JobTracker dispatches work to TaskTrackers, it can quickly find the nearest TaskTracker for a task and have it execute the task.
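The level-by-level search can be sketched as follows. This is a simplified illustration rather than the JobInProgress code: it scans a flat list instead of a cache attached to the topology tree, it stops at two levels (node, rack) before falling back to any remaining task, and the names LocalitySketch, PendingMap, and rackOf are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class LocalitySketch {
    // Hypothetical pending map task: its id and the hosts that store its input split.
    static final class PendingMap {
        final String id;
        final List<String> inputHosts;

        PendingMap(String id, List<String> inputHosts) {
            this.id = id;
            this.inputHosts = inputHosts;
        }
    }

    // Picks the closest pending task for the requesting host, removes it from
    // the pending list, and returns its id; returns null if nothing is pending.
    static String findNewMapTask(String host, Map<String, String> rackOf, List<PendingMap> pending) {
        // Level 0: a task whose input lives on the requesting node.
        for (Iterator<PendingMap> it = pending.iterator(); it.hasNext();) {
            PendingMap task = it.next();
            if (task.inputHosts.contains(host)) {
                it.remove();
                return task.id;
            }
        }
        // Level 1: a task whose input lives in the same rack.
        String rack = rackOf.get(host);
        for (Iterator<PendingMap> it = pending.iterator(); it.hasNext();) {
            PendingMap task = it.next();
            for (String inputHost : task.inputHosts) {
                if (rack != null && rack.equals(rackOf.get(inputHost))) {
                    it.remove();
                    return task.id;
                }
            }
        }
        // Beyond the searched levels: take any remaining task.
        return pending.isEmpty() ? null : pending.remove(0).id;
    }
}
```

The pending list must be mutable, because a task is removed as soon as it is handed out.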

Finally a Task object is generated, wrapped in a LaunchTaskAction, and sent back to the TaskTracker for execution.

Reduce tasks are produced in a similar way, using JobInProgress.obtainNewReduceTask(), which ultimately calls JobInProgress's findNewReduceTask() to access nonRunningReduceCache.

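VI. TaskTracker

1 TaskTracker Loads the Task into a Child Process

Task execution is actually initiated by TaskTracker. TaskTracker communicates with JobTracker periodically (every 10 seconds by default; see the HEARTBEAT_INTERVAL variable defined in the MRConstants class) to report the status of its Tasks and to receive instructions from JobTracker. If it finds new tasks that it needs to run, they are started at this time, that is, when TaskTracker calls JobTracker's heartbeat() method; underneath, this call goes through the Proxy interface at the IPC layer. Each step is briefly described below.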

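1.1 TaskTracker.run(): connecting to JobTracker

When TaskTracker starts, it initializes a series of parameters and services and then tries to connect to JobTracker (which must implement the InterTrackerProtocol interface). If the connection is lost, it retries in a loop and re-initializes all members and parameters.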

1.2 TaskTracker.offerService(): the main loop

Once connected to the JobTracker service, TaskTracker calls offerService() to enter its main loop. Every 10 seconds the loop contacts JobTracker by calling transmitHeartBeat(), which returns a HeartbeatResponse. It then calls the response's getActions() to obtain all the instructions sent by JobTracker, as an array of TaskTrackerAction. It iterates over this array: a new-task instruction (LaunchTaskAction) is passed to addToTaskQueue and added to the queue of tasks awaiting execution; anything else, such as KillJobAction or KillTaskAction, is added to the tasksToCleanup queue and handled by the taskCleanupThread thread.
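The dispatch step of the main loop amounts to routing each action to one of two queues. The sketch below shows only that routing, with simplified stand-in classes (Action, LaunchTask, KillTask, KillJob) in place of Hadoop's TaskTrackerAction hierarchy; these class names are assumptions, and no threads or RPC are involved.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

final class ActionDispatchSketch {
    // Hypothetical, simplified stand-ins for TaskTrackerAction and its subclasses.
    static class Action {
        final String target;
        Action(String target) { this.target = target; }
    }
    static final class LaunchTask extends Action {
        LaunchTask(String target) { super(target); }
    }
    static final class KillTask extends Action {
        KillTask(String target) { super(target); }
    }
    static final class KillJob extends Action {
        KillJob(String target) { super(target); }
    }

    final Queue<Action> tasksToLaunch = new ArrayDeque<>();   // consumed by a launcher thread
    final Queue<Action> tasksToCleanup = new ArrayDeque<>();  // consumed by a cleanup thread

    // Routes the actions carried by one heartbeat response.
    void dispatch(List<Action> actions) {
        for (Action action : actions) {
            if (action instanceof LaunchTask) {
                tasksToLaunch.add(action);
            } else {
                tasksToCleanup.add(action);
            }
        }
    }
}
```

Keeping the two queues separate means a slow cleanup never delays the start of a new task, which is presumably why separate threads consume them.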

1.3 TaskTracker.transmitHeartBeat(): getting JobTracker's instructions

In transmitHeartBeat(), TaskTracker creates a new TaskTrackerStatus object recording the current state of its tasks, and checks the number of Tasks currently running, local disk space usage, and so on. If it can accept a new Task, it sets the askForNewTask parameter of heartbeat() to true. It then calls JobTracker's heartbeat() method through the IPC interface; the response carries the array of TaskTrackerAction.

1.4 TaskTracker.addToTaskQueue: handing over to TaskLauncher

TaskLauncher is the thread class that handles new tasks; it holds tasksToLaunch, a queue of tasks waiting to run. TaskTracker.addToTaskQueue calls TaskTracker's registerTask, which creates a TaskInProgress object to schedule and monitor the task and adds it to the runningTasks queue. The TaskInProgress is also added to tasksToLaunch, and notifyAll() wakes up a thread, which takes a pending task from tasksToLaunch and calls TaskTracker's startNewTask to run it.

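1.5 TaskTracker.startNewTask(): starting a new task

Calls localizeJob() to actually initialize the Task and begin execution.

1.6 TaskTracker.localizeJob(): initializing the job directory

The main work of this function is to initialize the working directory workDir, copy the job jar from HDFS to the local file system, and call RunJar.unJar() to unpack it into the working directory. It then creates a RunningJob and calls addTaskToJob() to add it to the runningJobs monitoring queue. addTaskToJob adds a task to the tasks list of the runningJob it belongs to; if that runningJob does not exist yet, it is created first and added to runningJobs. When this is done, launchTaskForJob() is called to start executing the Task.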

1.7 TaskTracker.launchTaskForJob(): executing the task

Starting the Task is actually done by calling the launchTask() function of TaskTracker$TaskInProgress.

1.8 TaskTracker$TaskInProgress.launchTask(): executing the task

Before the task runs, localizeTask() is called to update the jobConf file and write it to the local directory. Then the Task's createRunner() method is called to create a TaskRunner object, and its start() method is called to finally launch the Task's separate java child process.

1.9 Task.createRunner(): creating the Runner object

Task has two implementations, MapTask and ReduceTask, used to create Map and Reduce tasks respectively. MapTask creates a MapTaskRunner to start the Task child process, while ReduceTask creates a ReduceTaskRunner.

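1.10 TaskRunner.start(): starting the child process

TaskRunner is responsible for running a task inside a process. It calls run(), whose main job is to prepare the environment for launching the java child process, including setting the working directory workDir and the CLASSPATH environment variable, and then to load the job jar. JvmManager manages all the Task child processes running on the TaskTracker. Each process is managed by a JvmRunner, which itself runs in a separate thread. JvmManager's launchJvm method creates the JvmRunner matching the task type (map or reduce) and places it in the process container of the corresponding JvmManagerForType. JvmManagerForType's reapJvm() allocates a new JVM process. If the JvmManagerForType's slots are full, it looks for an idle process: if the idle process belongs to the same Job, the task is placed into it directly; otherwise that process is killed and replaced by a new one. If the slots are not full, a new child process is started. New processes are created with spawnNewJvm, which uses the run method of the JvmRunner thread; run creates a new process and runs it, which is implemented by calling runChild.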
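The slot decision described for reapJvm() can be written down as a small state machine. The sketch follows the three cases exactly as the text states them and nothing more; it is not the JvmManagerForType source, and JvmSlotSketch, Jvm, markIdle, and the returned strings are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class JvmSlotSketch {
    // Hypothetical record of one child JVM: the job it serves and whether it is busy.
    static final class Jvm {
        final String jobId;
        boolean busy = true;
        Jvm(String jobId) { this.jobId = jobId; }
    }

    final int maxSlots;
    final List<Jvm> jvms = new ArrayList<>();

    JvmSlotSketch(int maxSlots) { this.maxSlots = maxSlots; }

    // Decides how a task of the given job gets a JVM and returns the decision taken.
    String reapJvm(String jobId) {
        if (jvms.size() < maxSlots) {            // a slot is free: start a new child process
            jvms.add(new Jvm(jobId));
            return "spawn";
        }
        for (Jvm jvm : jvms) {                   // slots full: reuse an idle JVM of the same job
            if (!jvm.busy && jvm.jobId.equals(jobId)) {
                jvm.busy = true;
                return "reuse";
            }
        }
        for (int i = 0; i < jvms.size(); i++) {  // otherwise kill an idle JVM of another job
            if (!jvms.get(i).busy) {
                jvms.set(i, new Jvm(jobId));     // and put a fresh one in its slot
                return "replace";
            }
        }
        return "wait";                           // every JVM is busy
    }

    // Marks the JVM in the given slot as idle, as if its task had finished.
    void markIdle(int slot) { jvms.get(slot).busy = false; }
}
```

With two slots, two requests spawn JVMs, a third has to wait, and later requests either reuse an idle JVM of the same job or replace an idle JVM of a different job.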

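2 The Child Process Executes the MapTask

The real execution vehicle is Child, which contains a main function. When the process starts, the relevant arguments are passed in; Child parses them, requests a task from the parent process through getTask(jvmId), constructs the corresponding Task instance, and starts the task with the Task's run() method.

2.1 run

The method is quite simple. After setting up the system TaskReporter, it runs runJobCleanupTask, runJobSetupTask, runTaskCleanupTask, or the Mapper, depending on the situation. Because MapReduce now has two sets of APIs and MapTask has to support both, Mapper execution in MapTask is split into runNewMapper and runOldMapper. We analyze runOldMapper.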

2.2 runOldMapper

runOldMapper begins by constructing the InputSplit that the Mapper will process, then creates the Mapper's RecordReader, which yields the map input. Next it constructs the Mapper's output through a MapOutputCollector. There are two cases: if there is no Reducer, DirectMapOutputCollector is used; otherwise MapOutputBuffer is used.

With the Mapper's input and output in place, the MapRunnable named in the configuration is constructed and the Mapper can be executed. The system currently has two MapRunnables: MapRunner and MultithreadedMapRunner. MapRunner is the simpler, single-threaded executor; it uses reflection to instantiate the user-defined Mapper implementation class and keeps it as a member.

2.3 The run Method of MapRunner

It first creates the key and value objects and then, for each <key,value> pair in the InputSplit, calls the map method of the user's Mapper implementation. For every pair processed, the OutputCollector collects the new kv pairs produced and either spills them to a file or keeps them in memory for further processing such as sorting and combining.

2.4 OutputCollector

The role of OutputCollector is to collect the new kv pairs produced by each call to map and either spill them to a file or keep them in memory for further processing such as sorting and combining.

MapOutputCollector has two subclasses: MapOutputBuffer and DirectMapOutputCollector. DirectMapOutputCollector is used when no Reduce phase is needed. If a reduce task follows the Mapper, the system uses MapOutputBuffer for output. MapOutputBuffer caches the map results in an in-memory buffer and uses several arrays to manage that buffer.

At appropriate times, the data in the buffer is spilled to disk.

Data is written to disk in the following cases:

(1) The in-memory buffer cannot hold a kv pair that is too large: the spillSingleRecord method.

(2) The in-memory buffer is full: the SpillThread thread.

(3) All Mapper results have been collected and the buffer needs a final cleanup: the flush method.

2.5 The SpillThread thread: spilling buffered data to disk

(1) When a spill is needed, sortAndSpill is called, which sorts by partition and key. The default sorter is QuickSort.

(2) If there is no combiner, the records are written out directly; otherwise CombinerRunner's combine is called, so the records are combined first and then written out.
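The two steps above, sort by (partition, key) and then optionally combine, can be sketched on an in-memory list. This is an illustration only: MapOutputBuffer works on serialized bytes and index arrays rather than objects, List.sort is not QuickSort, and the combiner here is hard-wired to summing. The names SortAndSpillSketch and Record are assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SortAndSpillSketch {
    // One buffered map output record, already assigned to a reduce partition.
    static final class Record {
        final int partition;
        final String key;
        final int value;

        Record(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return partition + ":" + key + "=" + value;
        }
    }

    // Returns the records in the order they would be written to the spill file.
    static List<Record> sortAndSpill(List<Record> buffer, boolean hasCombiner) {
        List<Record> sorted = new ArrayList<>(buffer);
        // Step 1: sort by partition first, then by key within each partition.
        sorted.sort(Comparator.comparingInt((Record r) -> r.partition)
                              .thenComparing((Record r) -> r.key));
        if (!hasCombiner) {
            return sorted;  // no combiner: write the sorted records as they are
        }
        // Step 2: combine adjacent records with the same partition and key (summing here).
        List<Record> combined = new ArrayList<>();
        for (Record record : sorted) {
            int last = combined.size() - 1;
            if (last >= 0
                    && combined.get(last).partition == record.partition
                    && combined.get(last).key.equals(record.key)) {
                Record previous = combined.get(last);
                combined.set(last, new Record(record.partition, record.key, previous.value + record.value));
            } else {
                combined.add(record);
            }
        }
        return combined;
    }
}
```

Sorting by partition first keeps all records destined for one reducer contiguous in the spill file, so each reducer can later fetch its segment as a single range.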

3 The Child Process Executes the ReduceTask

ReduceTask.run starts out like MapTask's: initialize(), runJobCleanupTask(), runJobSetupTask(), and runTaskCleanupTask(). The real work then begins, in three main steps: Copy, Sort, and Reduce.

3.1 Copy

This step collects the map output files from the servers that ran the Map tasks. Copying is handled by the ReduceTask.ReduceCopier class.

3.1.1 Class diagram:

3.1.2 Flow: starts with ReduceCopier.fetchOutputs

(1) Requesting tasks. This uses the GetMapEventsThread thread. Its run method repeatedly calls getMapCompletionEvents, which in turn uses RPC to call getMapCompletionEvents of the TaskUmbilicalProtocol protocol. The call uses the jobID it belongs to in order to ask its parent TaskTracker about the completion status of each Map task of this job (the TaskTracker asks JobTracker and relays the answer). It returns an array, TaskCompletionEvent events[]. A TaskCompletionEvent contains information such as the taskid and the IP address.

(2) Once the information about the servers that ran the Map tasks is available, a MapOutputCopier thread is started to do the actual copying. It works in its own thread and is responsible for copying the files from one Map task server. MapOutputCopier's run loop calls copyOutput, which calls getMapOutput to copy remotely over HTTP.

(3) The content copied by getMapOutput (which may also be local) exists as a MapOutput object. It can live in memory or be serialized to disk; this is adjusted automatically according to memory usage.

(4) Meanwhile, an in-memory merger thread, InMemFSMergeThread, and a file merger thread, LocalFSMerger, work concurrently. They merge-sort the downloaded files (some may be in memory; they are simply called files here), which saves time and reduces the number of input files, lightening the later sort. InMemFSMergeThread's run loop calls doInMemMerge, which uses the Merger utility class to perform the merge; if a combine is needed, combinerRunner.combine is called.

3.2 Sort

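The sort step is essentially a continuation of the merge sorting described above. It runs after all files have been copied and uses the Merger utility class to merge all of them. At the end of this step there is one new file that merges the output of all the required Map tasks, and the Map output files gathered from the other servers have all been deleted.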
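Merging already-sorted map outputs is a k-way merge. The sketch below shows the idea with a priority queue over in-memory lists of keys; it is not Hadoop's Merger class, which merges on-disk and in-memory segments in several passes. MergeSketch and merge are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class MergeSketch {
    // Merges several individually sorted runs into one sorted list.
    static List<String> merge(List<List<String>> sortedRuns) {
        // Each heap entry is {run index, position in that run}, ordered by the key it points at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> sortedRuns.get(a[0]).get(a[1]).compareTo(sortedRuns.get(b[0]).get(b[1])));
        for (int run = 0; run < sortedRuns.size(); run++) {
            if (!sortedRuns.get(run).isEmpty()) {
                heap.add(new int[] {run, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] smallest = heap.poll();
            List<String> run = sortedRuns.get(smallest[0]);
            merged.add(run.get(smallest[1]));
            if (smallest[1] + 1 < run.size()) {
                heap.add(new int[] {smallest[0], smallest[1] + 1});  // advance within the same run
            }
        }
        return merged;
    }
}
```

Because every input run is already sorted, the merge only ever compares the current head of each run, so equal keys from different map tasks end up adjacent in the output.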

3.3 Reduce

This is the last phase of the Reduce task. It prepares the keyClass ("mapred.output.key.class" or "mapred.mapoutput.key.class"), the valueClass ("mapred.mapoutput.value.class" or "mapred.output.value.class"), and the Comparator ("mapred.output.value.groupfn.class" or "mapred.output.key.comparator.class"), and finally calls runOldReducer. (There are again two sets of APIs; we analyze runOldReducer.)

3.3.1 runOldReducer

(1) Output.

It prepares an OutputCollector to collect the output. Unlike in MapTask, this OutputCollector is simpler: it just opens a RecordWriter and performs one write for each collect. The biggest difference is that the file system passed to the RecordWriter is almost always a distributed file system, that is, HDFS.

(2) Input. ReduceTask uses the prepared custom classes such as KeyClass, ValueClass, and KeyComparator to construct the key type that the Reducer needs and an Iterator over the values (at this point a key generally corresponds to a group of values).

(3) With input and output in place, the user-defined Reducer is called repeatedly in a loop until the Reduce phase is complete.
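The loop in step (3) walks the sorted input, groups consecutive records that share a key, and calls the Reducer once per group. The sketch below illustrates that grouping with a hypothetical, simplified Reducer interface and an output map in place of the OutputCollector; unlike ReduceTask, it copies each group into a list before handing over the iterator.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReduceLoopSketch {
    // Hypothetical, simplified Reducer: one key, an iterator over its values, and an output map.
    interface Reducer {
        void reduce(String key, Iterator<Integer> values, Map<String, Integer> output);
    }

    // Example user-defined reducer: sums the values of a key.
    static final Reducer SUM = (key, values, output) -> {
        int total = 0;
        while (values.hasNext()) {
            total += values.next();
        }
        output.put(key, total);
    };

    // Convenience factory for one <key, value> input record.
    static Map.Entry<String, Integer> pair(String key, int value) {
        return new AbstractMap.SimpleEntry<String, Integer>(key, value);
    }

    // Calls the reducer once per run of equal keys; the input must already be sorted by key.
    static Map<String, Integer> runReducer(List<Map.Entry<String, Integer>> sortedInput, Reducer reducer) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedInput.size()) {
            String key = sortedInput.get(i).getKey();
            List<Integer> group = new ArrayList<>();
            while (i < sortedInput.size() && sortedInput.get(i).getKey().equals(key)) {
                group.add(sortedInput.get(i).getValue());
                i++;
            }
            reducer.reduce(key, group.iterator(), output);
        }
        return output;
    }
}
```

The grouping works only because the preceding Sort step has made equal keys adjacent.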

   