MapReduce Overview
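Everyone is familiar with file systems, so before analyzing HDFS we did not spend much time on its background; after all, most readers already have a reasonable understanding of file systems, and good documentation exists. Before analyzing the MapReduce part of Hadoop, however, we will first look at how the system works and then move on to the analysis. The figure below is the best illustration of MapReduce I have seen.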

Take the wordcount example that ships with Hadoop (the launch command is below):
hadoop jar hadoop-0.19.0-examples.jar wordcount /usr/input /usr/output
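After a user submits a job, the JobTracker coordinates it. The Map phase runs first (M1, M2 and M3 in the figure), followed by the Reduce phase (R1 and R2 in the figure). TaskTrackers monitor both the Map and Reduce work, which runs in Java virtual machines separate from the TaskTracker's own.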
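Our input and output are both directories on HDFS (as shown in the figure above). The input is described by the InputFormat interface. Its implementations, for example for ASCII files or JDBC databases, each handle their own data source and expose some characteristics of the data. From an InputFormat implementation we can obtain implementations of the InputSplit interface, which partition the data (split1 to split5 in the figure are the result of this partitioning). From the InputFormat we can also obtain an implementation of the RecordReader interface, which turns the input into <k,v> pairs. Once we have <k,v> pairs, the map operation can begin.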
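The map operation writes its results to the context through context.write (ultimately through OutputCollector.collect). Once the Mapper output has been collected, the Partitioner class divides it in a specified way and writes it to the output files. We can also supply a Combiner for the Mapper. In that case, when the Mapper emits a <k,v> pair, the pair is not written to the output immediately. It is first collected into a list (one list per key), and once a certain number of pairs has accumulated, the Combiner merges this buffered part before it is passed on to the Partitioner (the yellow part of M1 in the figure corresponds to the Combiner and the Partitioner).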
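To make the map-side flow concrete, here is a plain-Java sketch (no Hadoop dependency) of what happens to wordcount's map output. The map step emits (word, 1), a combiner pre-sums the counts per key, and a partitioner assigns each key to a reducer. The class and method names are hypothetical. The partition formula is the one used by Hadoop's HashPartitioner, `(hashCode & Integer.MAX_VALUE) % numReduceTasks`; the rest is a simplification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the map side of wordcount: map -> combine -> partition.
class MapSideSketch {
    // Map step: emit (word, 1) for every whitespace-separated token.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : line.split("\\s+")) {
            if (!w.isEmpty()) out.add(Map.entry(w, 1));
        }
        return out;
    }

    // Combiner: sum the buffered values per key before they leave the mapper.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> combined = new HashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            combined.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return combined;
    }

    // Same formula as Hadoop's HashPartitioner: non-negative hash modulo reducer count.
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        Map<String, Integer> combined = combine(map("a b a c a b"));
        for (Map.Entry<String, Integer> e : combined.entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue()
                    + " -> reducer " + partition(e.getKey(), 2));
        }
    }
}
```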
After the map work is done, the Reduce phase begins. This phase has three steps: shuffle, sort and reduce.
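In the shuffle step, Hadoop's MapReduce framework uses the keys in the map results to send related results to a particular Reducer. The intermediate results for one key, produced by several Mappers, are spread across different machines; when this step ends, they have all been transferred to the machine of the Reducer that handles that key. File transfer in this step uses the HTTP protocol.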
Sort and shuffle happen together. This step merges the <key,value> pairs that share a key but come from different Mappers. In the reduce step, the <key, (list of values)> pairs produced by shuffle and sort are passed to the Reducer.reduce method, and the results are written to the DFS through the OutputFormat.
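The reduce side can be simulated the same way. In this sketch the outputs of several mappers are gathered for one reducer (shuffle) and grouped and ordered by key, with a `TreeMap` standing in for the framework's merge sort. Each <key, list of values> is then handed to a reduce function. It is an illustration under those assumptions. The real framework fetches map output over HTTP and merges sorted files instead of building a map in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process simulation of shuffle, sort and reduce for wordcount.
class ReduceSideSketch {
    // Shuffle + sort: gather the pairs destined for one reducer from all mappers,
    // grouping values by key; TreeMap keeps the keys in sorted order.
    static TreeMap<String, List<Integer>> shuffleAndSort(
            List<List<Map.Entry<String, Integer>>> mapOutputs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> oneMapper : mapOutputs) {
            for (Map.Entry<String, Integer> p : oneMapper) {
                grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
            }
        }
        return grouped;
    }

    // Reduce: wordcount simply sums the list of values for each key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> fromMappers = List.of(
                List.of(Map.entry("b", 2), Map.entry("a", 3)),   // output of mapper 1
                List.of(Map.entry("a", 1), Map.entry("c", 4)));  // output of mapper 2
        shuffleAndSort(fromMappers).forEach(
                (k, vs) -> System.out.println(k + "\t" + reduce(k, vs)));
    }
}
```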
MapTask
Next we analyze the two subclasses of Task, MapTask and ReduceTask. The class diagram for MapTask is shown below:

MapTask itself is not very complicated; the complexity lies in the helper classes that support it. MapTask has few member variables, only split and splitClass. The input of a Map is a split, a slice of the original data, and this slice is described concretely by a subclass of org.apache.hadoop.mapred.InputSplit. (Earlier we introduced InputSplit through org.apache.hadoop.mapreduce.InputSplit; the two expose the same external API.) splitClass is the class name of the InputSplit subclass, and with it we can use Java reflection to create an instance of that subclass. split is a BytesWritable holding the serialized form of the InputSplit subclass; calling the subclass's readFields method on it restores the corresponding InputSplit object.
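The split/splitClass pair follows a general pattern: store the class name plus the serialized bytes, then rebuild the object reflectively and let it read its own fields. The sketch below assumes a hypothetical `SimpleWritable` interface modeled on Hadoop's Writable contract (`write`/`readFields` over `DataOutput`/`DataInput`) and a hypothetical `DemoSplit`. It is not MapTask's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class SplitRoundTrip {
    // Hypothetical stand-in for Hadoop's Writable contract.
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Hypothetical split: a file name plus a byte range.
    static class DemoSplit implements SimpleWritable {
        String file; long start; long length;
        public DemoSplit() {}  // no-arg constructor required for reflective creation
        DemoSplit(String file, long start, long length) {
            this.file = file; this.start = start; this.length = length;
        }
        public void write(DataOutput out) throws IOException {
            out.writeUTF(file); out.writeLong(start); out.writeLong(length);
        }
        public void readFields(DataInput in) throws IOException {
            file = in.readUTF(); start = in.readLong(); length = in.readLong();
        }
    }

    // Plays the role of "split": the serialized bytes of the split object.
    static byte[] serialize(SimpleWritable w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bytes));
        return bytes.toByteArray();
    }

    // "splitClass" + "split": instantiate by class name, then restore the fields.
    static SimpleWritable restore(String splitClass, byte[] split) throws Exception {
        SimpleWritable w = (SimpleWritable) Class.forName(splitClass)
                .getDeclaredConstructor().newInstance();
        w.readFields(new DataInputStream(new ByteArrayInputStream(split)));
        return w;
    }

    public static void main(String[] args) throws Exception {
        DemoSplit original = new DemoSplit("/usr/input/part-0", 0L, 67108864L);
        DemoSplit copy = (DemoSplit) restore(DemoSplit.class.getName(), serialize(original));
        System.out.println(copy.file + " " + copy.start + " " + copy.length);
    }
}
```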
The most important method of MapTask is run, and run is quite simple. After setting up the system's TaskReporter, it runs runJobCleanupTask, runJobSetupTask or runTaskCleanupTask, or runs the Mapper, depending on the situation. Because MapReduce currently has two APIs and MapTask must support both, running the Mapper is split into runNewMapper and runOldMapper. After run*Mapper returns, MapTask calls the done method of its parent class.
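Next we analyze runOldMapper. It starts by constructing the InputSplit the Mapper will process and updating the Task's configuration, then creates the Mapper's RecordReader. rawIn is the raw input. From it, the actual input in is built for one of two cases: the normal case, using TrackedRecordReader (discussed later), or the case where some records are skipped, using SkippingRecordReader (discussed later).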
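Skipping records is an error-recovery strategy for Map tasks. MapReduce processes very large data sets, and for some jobs leaving a small amount of bad data unprocessed has very little effect on the result (for example, statistics over a large data set). Letting a small amount of bad data invalidate a large volume of already processed results would be a poor trade-off, so skipping those records is an option for the Mapper.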
Mapper output goes through a MapOutputCollector, and again there are two cases. If there is no Reducer, DirectMapOutputCollector is used (discussed later); otherwise MapOutputBuffer is used (discussed later).
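With the Mapper's input and output constructed, the Mapper can be executed by instantiating the MapRunnable named in the configuration. The system currently provides two MapRunnable implementations, MapRunner and MultithreadedMapRunner, as shown in the figure below.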
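The old API handles this part very differently from the new API. The MapRunnable interface is the old API's executor for Mappers, and its run method executes the user's Mapper. MapRunner is a single-threaded executor and is quite simple. First, MapTask makes this call: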
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
MapRunner's configure method is called at the end of newInstance, and while configure runs, the corresponding Mapper is constructed through reflection.
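The behavior of calling configure at the end of newInstance can be sketched as a factory that creates an object reflectively and, if the object implements a configuration hook, invokes that hook before returning it. `JobConfigurableLike`, `DemoRunner`, the `mapper.class` key and the `Map<String, String>` configuration are hypothetical stand-ins for JobConfigurable, MapRunner and JobConf. Hadoop's ReflectionUtils does more than this; for example, it caches constructors.

```java
import java.util.HashMap;
import java.util.Map;

class ReflectionSketch {
    // Hypothetical stand-in for the old API's JobConfigurable hook.
    interface JobConfigurableLike {
        void configure(Map<String, String> job);
    }

    // Hypothetical runner: configure() builds the user's mapper from the
    // configuration, mirroring how MapRunner creates its Mapper in configure.
    static class DemoRunner implements JobConfigurableLike {
        Object mapper;
        public void configure(Map<String, String> job) {
            try {
                mapper = Class.forName(job.get("mapper.class"))
                        .getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
        }
    }

    // Create the instance, then call configure() as the last step, if supported.
    static <T> T newInstance(Class<T> cls, Map<String, String> job) {
        try {
            T obj = cls.getDeclaredConstructor().newInstance();
            if (obj instanceof JobConfigurableLike) {
                ((JobConfigurableLike) obj).configure(job);
            }
            return obj;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> job = new HashMap<>();
        job.put("mapper.class", "java.lang.StringBuilder");  // any class with a no-arg constructor
        DemoRunner runner = newInstance(DemoRunner.class, job);
        System.out.println(runner.mapper.getClass().getName());
    }
}
```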
MapRunner's run method first creates the key and value objects, then calls the Mapper's map method for each <key,value> pair in the InputSplit. After the loop ends, the Mapper's cleanup method (close, in the old API) is called. Note that the key and value objects are reused inside run. Every call to map receives the same key and value objects; only their contents change. If you need to keep the contents of a key or value, you must copy (clone) the object into a new instance.
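The reuse caveat is easy to reproduce. In this sketch a hypothetical `MutableText` plays the role of a reused Writable key. Keeping the reference leaves only the last record's contents, while keeping a copy preserves every record. With Hadoop's own types, the copy would typically be made with a copy constructor such as `new Text(key)`.

```java
import java.util.ArrayList;
import java.util.List;

class ReuseSketch {
    // Hypothetical mutable key, standing in for a reused Writable such as Text.
    static class MutableText {
        private String value = "";
        void set(String v) { value = v; }
        MutableText copy() { MutableText t = new MutableText(); t.set(value); return t; }
        @Override public String toString() { return value; }
    }

    // Simulates a MapRunner-style loop: one key object is refilled for every record.
    static List<String> run(String[] records, boolean copyKeys) {
        MutableText key = new MutableText();          // created once, reused
        List<MutableText> kept = new ArrayList<>();
        for (String r : records) {
            key.set(r);                               // the reader overwrites the contents
            kept.add(copyKeys ? key.copy() : key);    // the "map" call keeps the key
        }
        List<String> result = new ArrayList<>();
        for (MutableText t : kept) result.add(t.toString());
        return result;
    }

    public static void main(String[] args) {
        String[] records = {"x", "y", "z"};
        System.out.println(run(records, false));  // prints [z, z, z]
        System.out.println(run(records, true));   // prints [x, y, z]
    }
}
```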
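Compared with the new API's multithreaded executor, the old API's MultithreadedMapRunner is rather complicated. Broadly, it uses a blocking queue together with a Java thread-pool executor to dispatch <key,value> pairs to several threads. Note that all these threads share a single Mapper instance, so any shared resources in the Mapper need some form of protection. runNewMapper executes a new-API Mapper. It is slightly more complicated than runOldMapper, and we will not discuss it further.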
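Here is a rough sketch of the MultithreadedMapRunner idea. Records go to a fixed-size `ThreadPoolExecutor` through a bounded `ArrayBlockingQueue`, and `CallerRunsPolicy` makes the reading thread run a task itself when the queue is full, which throttles input. That queue and rejection policy are assumptions of this sketch. What it is meant to show is that every task calls the same shared mapper, so the mapper's state must be thread-safe; here that is a `ConcurrentHashMap` updated with atomic `merge`. `SharedWordCountMapper` is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MultithreadedSketch {
    // Hypothetical mapper shared by all worker threads; its state must be thread-safe.
    static class SharedWordCountMapper {
        final Map<String, Integer> counts = new ConcurrentHashMap<>();
        void map(String line) {
            for (String w : line.split("\\s+")) {
                if (!w.isEmpty()) counts.merge(w, 1, Integer::sum);  // atomic per key
            }
        }
    }

    static Map<String, Integer> run(String[] lines, int threads) throws InterruptedException {
        SharedWordCountMapper mapper = new SharedWordCountMapper();  // one shared instance
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(threads),             // bounded hand-off queue
                new ThreadPoolExecutor.CallerRunsPolicy());    // reader runs the task when full
        for (String line : lines) {
            pool.execute(() -> mapper.map(line));              // dispatch each record
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return mapper.counts;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] lines = {"a b", "b c", "a a", "c"};
        System.out.println(run(lines, 2));
    }
}
```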
Helper classes (1):
MapTask's helper classes deal mainly with the Mapper's input and output. First we look at the Mapper input used in MapTask, which sits in the upper-right corner of the class diagram.
MapTask.TrackedRecordReader is a wrapper that adds collection and reporting of statistics on top of the original input RecordReader.
MapTask.SkippingRecordReader is also a wrapper. On top of MapTask.TrackedRecordReader, it adds the ability to skip part of the input. Before analyzing MapTask.SkippingRecordReader, let us first look at the class SortedRanges and its related classes.

The class SortedRanges.Range represents a range by its start position and its length (which also allows a range of length 0), and it provides a set of range operations. Note that the right endpoint returned by getEndIndex is not part of the range: the range is the half-open interval [start, end). SortedRanges holds a set of non-overlapping ranges, and its add and remove methods do extra work to maintain that constraint. SkipRangeIterator is an iterator for walking SortedRanges while skipping the ranges it contains.
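Here is a minimal sketch of a sorted set of non-overlapping half-open ranges [start, start + length), in the spirit of SortedRanges. It keys a `TreeMap` by start index, and `add` absorbs any overlapping or adjacent ranges so that the no-overlap constraint always holds. The class name, the merge-on-add policy and the omission of `remove` are assumptions of this sketch.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical range set: non-overlapping half-open ranges [start, end).
class SimpleSortedRanges {
    private final TreeMap<Long, Long> ranges = new TreeMap<>();  // start -> end (exclusive)

    // Add [start, start + length); merges with any overlapping or adjacent ranges.
    void add(long start, long length) {
        if (length <= 0) return;                 // empty ranges contribute nothing here
        long end = start + length;
        Map.Entry<Long, Long> prev = ranges.floorEntry(start);
        if (prev != null && prev.getValue() >= start) {   // previous range reaches us
            start = prev.getKey();
            end = Math.max(end, prev.getValue());
        }
        Map.Entry<Long, Long> next = ranges.ceilingEntry(start);
        while (next != null && next.getKey() <= end) {    // swallow following ranges
            end = Math.max(end, next.getValue());
            ranges.remove(next.getKey());
            next = ranges.ceilingEntry(start);
        }
        ranges.put(start, end);
    }

    // True if index lies in some range; the end index itself is excluded.
    boolean contains(long index) {
        Map.Entry<Long, Long> e = ranges.floorEntry(index);
        return e != null && index < e.getValue();
    }

    @Override public String toString() { return ranges.toString(); }

    public static void main(String[] args) {
        SimpleSortedRanges r = new SimpleSortedRanges();
        r.add(10, 5);   // [10, 15)
        r.add(20, 5);   // [20, 25)
        r.add(13, 8);   // [13, 21) bridges both
        System.out.println(r + " " + r.contains(24) + " " + r.contains(25));  // {10=25} true false
    }
}
```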
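MapTask.SkippingRecordReader is simple to implement. All the input to skip is held in SortedRanges, so the next method only has to check whether the current record falls within one of those ranges. If it does, the record is skipped and, depending on configuration, written to a file.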
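The check performed in `next` can be sketched as a wrapper iterator that counts record indices and drops the records that fall inside the skip ranges. Skipped records are passed to an optional sink, which stands in for the configurable skip output file. Everything here is hypothetical, and the ranges are plain half-open {start, end} pairs to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

// Hypothetical wrapper: yields records whose index is outside every skip range.
// Null records are not supported (null marks "no pending record").
class SkippingIterator<T> implements Iterator<T> {
    private final Iterator<T> in;
    private final long[][] skip;           // each entry is {start, end}, end exclusive
    private final Consumer<T> skippedSink; // may be null: skipped records are dropped
    private long index = 0;                // index of the next record read from 'in'
    private T pending;

    SkippingIterator(Iterator<T> in, long[][] skip, Consumer<T> skippedSink) {
        this.in = in; this.skip = skip; this.skippedSink = skippedSink;
    }

    private boolean inSkipRange(long i) {
        for (long[] r : skip) if (i >= r[0] && i < r[1]) return true;
        return false;
    }

    @Override public boolean hasNext() {
        while (pending == null && in.hasNext()) {
            T rec = in.next();
            if (inSkipRange(index++)) {
                if (skippedSink != null) skippedSink.accept(rec);  // e.g. log it to a file
            } else {
                pending = rec;
            }
        }
        return pending != null;
    }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T rec = pending;
        pending = null;
        return rec;
    }

    public static void main(String[] args) {
        List<String> skipped = new ArrayList<>();
        SkippingIterator<String> it = new SkippingIterator<>(
                List.of("r0", "r1", "r2", "r3", "r4").iterator(),
                new long[][] {{1, 3}}, skipped::add);
        List<String> kept = new ArrayList<>();
        it.forEachRemaining(kept::add);
        System.out.println(kept + " skipped=" + skipped);  // [r0, r3, r4] skipped=[r1, r2]
    }
}
```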
NewTrackingRecordReader and NewOutputCollector are used by the new API, and we do not analyze them.
All of MapTask's output helper classes implement MapOutputCollector, which merely adds close and flush methods to OutputCollector.
DirectMapOutputCollector is used when the number of Reducers is 0, that is, when no Reduce phase is needed. It obtains the corresponding RecordWriter directly through
out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
and collect writes straight to that RecordWriter.
If the Mapper is followed by reduce tasks, the system uses MapOutputBuffer as the output. This is a fairly complex class of about 1,000 lines of code.
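The Mapper emits its map results through an OutputCollector, and the volume of output is large. Hadoop collects the Mapper output in a circular buffer, and when the buffer fills to a certain fraction of io.sort.mb (set by io.sort.spill.percent), the contents are spilled to disk, as shown in the figure below. The figure shows two arrays and one buffer. For each record, kvindices holds the (Reduce) partition the record belongs to, the start position of the key in the buffer and the start position of the value in the buffer, so through kvindices we can find the record in the buffer. kvoffsets is used to sort the records of kvindices by partition when the buffer is full. The sorted result is written to local disk: the index goes to spill{spill number}.out.index and the data to spill{spill number}.out.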
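The in-memory layout can be modeled in a few lines. Serialized bytes go into one byte array, kvindices stores three ints per record (partition, key start, value start), and kvoffsets stores, for each record, where its triple sits in kvindices. Sorting only kvoffsets (here by partition, then by key) reorders the records without moving any serialized data. This is a simplified, non-circular, single-threaded model with no overflow handling. The array sizes, the String keys compared with `compareTo` in place of a RawComparator, and the method names are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified model of MapOutputBuffer's kvbuffer / kvindices / kvoffsets.
class KvIndexSketch {
    static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2, ACCTSIZE = 3;
    final byte[] kvbuffer = new byte[1024];
    final int[] kvindices = new int[64 * ACCTSIZE];
    final int[] kvoffsets = new int[64];
    int bufindex = 0, kvindex = 0;

    void collect(String key, String value, int partition) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        int ind = kvindex * ACCTSIZE;
        kvoffsets[kvindex] = ind;                 // where this record's triple lives
        kvindices[ind + PARTITION] = partition;
        kvindices[ind + KEYSTART] = bufindex;
        System.arraycopy(k, 0, kvbuffer, bufindex, k.length);
        bufindex += k.length;
        kvindices[ind + VALSTART] = bufindex;
        System.arraycopy(v, 0, kvbuffer, bufindex, v.length);
        bufindex += v.length;
        kvindex++;
    }

    // Key bytes run from KEYSTART up to VALSTART.
    String keyOf(int ind) {
        int s = kvindices[ind + KEYSTART];
        return new String(kvbuffer, s, kvindices[ind + VALSTART] - s, StandardCharsets.UTF_8);
    }

    // Sort only the offsets: by partition first, then by key.
    List<String> sortedView() {
        Integer[] order = new Integer[kvindex];
        for (int i = 0; i < kvindex; i++) order[i] = kvoffsets[i];
        Comparator<Integer> byPartitionThenKey =
                Comparator.comparingInt((Integer ind) -> kvindices[ind + PARTITION])
                          .thenComparing((Integer ind) -> keyOf(ind));
        Arrays.sort(order, byPartitionThenKey);
        List<String> view = new ArrayList<>();
        for (int ind : order) view.add(kvindices[ind + PARTITION] + ":" + keyOf(ind));
        return view;
    }

    public static void main(String[] args) {
        KvIndexSketch b = new KvIndexSketch();
        b.collect("pear", "1", 1);
        b.collect("apple", "1", 0);
        b.collect("fig", "1", 1);
        System.out.println(b.sortedView());  // prints [0:apple, 1:fig, 1:pear]
    }
}
```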

When the Mapper task finishes there may be several spill files. They are merged with a merge sort into the Mapper's single output (file.out and file.out.index), as shown in the figure below:

This output is sorted by partition, so the Mapper output is divided into segments, and each Reducer fetches its own segment of the final output file. (Note that the index structures in memory and on disk are different.)
Helper classes (2):
Having discussed the in-memory and on-disk storage structures of the Mapper output, let us now examine the flow of MapOutputBuffer in detail.
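First, the member variables. The first to be initialized are the job configuration job and the statistics object reporter. From the configuration, MapOutputBuffer obtains the local file systems (localFs and rfs), the number of Reducers and the Partitioner.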
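SpillRecord is the in-memory counterpart of the file spill{spill number}.out.index; the in-memory data differs from the file data only by the trailing checksum. The file holds a sequence of IndexRecords, as shown in the figure below: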

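An IndexRecord has three fields: startOffset, the offset of the segment; rawLength, its original length; and partLength, its actual length, which may differ because of compression. SpillRecord holds a sequence of IndexRecords and provides methods to add a record (there is no remove operation, since none is needed), get a record, write the file, and read the file (through a constructor).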
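The following sketches an index file in this style. It has one fixed-size entry per partition holding three longs (startOffset, rawLength, partLength), followed by a checksum over the entries. The 24-byte entry size follows from three 8-byte longs. The use of `java.util.zip.CRC32` and the class and method names are assumptions of this sketch, not a specification of Hadoop's on-disk format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

// Hypothetical index file: per partition {startOffset, rawLength, partLength} + checksum.
class SpillIndexSketch {
    static final int RECORD_BYTES = 3 * Long.BYTES;  // 24 bytes per index entry

    // Serialize the entries, then append a CRC32 of the entry bytes.
    static byte[] write(long[][] records) throws IOException {
        ByteArrayOutputStream entries = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(entries);
        for (long[] r : records) {
            out.writeLong(r[0]); out.writeLong(r[1]); out.writeLong(r[2]);
        }
        byte[] body = entries.toByteArray();
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        out.writeLong(crc.getValue());                 // trailing checksum
        return entries.toByteArray();
    }

    // Read back the entries for 'partitions' partitions and verify the checksum.
    static long[][] read(byte[] file, int partitions) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(file));
        long[][] records = new long[partitions][3];
        for (long[] r : records) {
            r[0] = in.readLong(); r[1] = in.readLong(); r[2] = in.readLong();
        }
        CRC32 crc = new CRC32();
        crc.update(file, 0, partitions * RECORD_BYTES);
        if (in.readLong() != crc.getValue()) throw new IOException("index checksum mismatch");
        return records;
    }

    public static void main(String[] args) throws IOException {
        long[][] index = {{0, 100, 60}, {60, 0, 0}, {60, 250, 140}};  // partition 1 is empty
        byte[] file = write(index);
        System.out.println(file.length + " bytes; partition 2 starts at " + read(file, 3)[2][0]);
    }
}
```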
Next comes the handling of the output buffer kvbuffer, the buffer's record index kvindices, and kvoffsets, the working array used to sort that record index. The figure below helps explain this code.

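This part depends on three configuration parameters. io.sort.mb is the total size of kvbuffer, kvindices and kvoffsets, in MB (default 100, i.e. 100 MB); this is the largest consumer of memory in MapOutputBuffer. io.sort.record.percent is the fraction of that space taken by kvindices and kvoffsets (default 0.05). From the earlier analysis, for N records kvindices and kvoffsets occupy 4N × 4 bytes (three ints per record in kvindices plus one in kvoffsets). From this relationship and the value of io.sort.record.percent we can compute the maximum number of records kvindices and kvoffsets can hold and allocate the space accordingly. The parameter io.sort.spill.percent gives the occupancy, of the output buffer or of the record slots in kvindices and kvoffsets, at which a spill starts and the buffered records are written to disk. softBufferLimit and softRecordLimit are the corresponding thresholds, in bytes for the buffer and in records for the index arrays.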
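The sizing arithmetic can be checked with a small calculation. This sketch assumes io.sort.mb = 100 and io.sort.record.percent = 0.05 as stated above, io.sort.spill.percent = 0.80 (a default the text does not give), and 16 bytes of bookkeeping per record. It rounds the record area down to whole records and follows the description rather than reproducing MapOutputBuffer line by line.

```java
// Sketch of how the io.sort.* parameters split the sort memory.
class SortBufferSizing {
    static final int RECSIZE = 4 * 4;  // 3 ints in kvindices + 1 int in kvoffsets

    // Returns {kvbufferBytes, recordCapacity, softBufferLimit, softRecordLimit}.
    static long[] compute(int sortMb, double recordPercent, double spillPercent) {
        int maxMemUsage = sortMb << 20;                        // io.sort.mb in bytes
        int recordBytes = (int) (maxMemUsage * recordPercent);
        recordBytes -= recordBytes % RECSIZE;                  // whole records only
        int kvbufferBytes = maxMemUsage - recordBytes;         // left for serialized data
        int recordCapacity = recordBytes / RECSIZE;            // length of kvoffsets
        int softBufferLimit = (int) (kvbufferBytes * spillPercent);  // in bytes
        int softRecordLimit = (int) (recordCapacity * spillPercent); // in records
        return new long[] {kvbufferBytes, recordCapacity, softBufferLimit, softRecordLimit};
    }

    public static void main(String[] args) {
        long[] s = compute(100, 0.05, 0.80);
        // prints kvbuffer=99614720 bytes, records=327680, spill at 79691776 bytes or 262144 records
        System.out.printf("kvbuffer=%d bytes, records=%d, spill at %d bytes or %d records%n",
                s[0], s[1], s[2], s[3]);
    }
}
```

With these values about 5 MB of the 100 MB goes to record bookkeeping (327,680 records × 16 bytes), and a spill starts at 80% of either area, whichever fills first.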
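Key-value pairs <key, value> are serialized into the buffer by a Serializer, and this initialization follows the output-buffer setup above. After that come the counters, the optional compression codec, the optional Combiner, and some configuration for the Combiner's work.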
Finally, spillThread is started. This thread watches the in-memory output buffer and, when certain conditions are met, spills the buffer contents to disk. This is a standard producer-consumer model: MapOutputBuffer's collect method is the producer and spillThread is the consumer. They synchronize through spillLock (a ReentrantLock) and two condition variables on spillLock, spillDone and spillReady.
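First, the producer. The main flow of MapOutputBuffer.collect is:
1. report progress and check the arguments (<K, V> must match the Mapper's declared output types);
2. call spillLock.lock() to enter the critical section;
3. if the spill condition is met, set the relevant variables, notify spillThread through spillReady.signal(), and wait for the spill to finish (through spillDone.await());
4. call spillLock.unlock();
5. write the key and the value and update kvindices and kvoffsets (note that collect is synchronized; the key and value are written separately, and together they occupy contiguous space in the output buffer).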
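The hand-off between collect and spillThread is a classic producer-consumer with two conditions. The sketch keeps only the synchronization skeleton. The producer signals `spillReady` when its (hypothetical) in-memory batch is full, and waits on `spillDone` while an earlier spill is still in progress. The consumer waits on `spillReady`, releases the lock while it does the slow spill work, then signals `spillDone`. The batch lists, the `toSpill`/`closed` fields and the class name are assumptions; MapOutputBuffer tracks the corresponding state with kvstart, kvend and related variables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Synchronization skeleton of collect() (producer) and the spill thread (consumer).
class SpillHandoffSketch {
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillReady = spillLock.newCondition();
    private final Condition spillDone = spillLock.newCondition();
    private final int capacity;
    private List<Integer> buffer = new ArrayList<>();
    private List<Integer> toSpill = null;   // non-null while a spill is pending or running
    private boolean closed = false;
    final List<List<Integer>> spills = new ArrayList<>();  // batches that reached "disk"

    SpillHandoffSketch(int capacity) { this.capacity = capacity; }

    // Producer: called for every map output record.
    void collect(int record) throws InterruptedException {
        spillLock.lock();
        try {
            if (buffer.size() == capacity) {
                while (toSpill != null) spillDone.await();  // previous spill still running
                toSpill = buffer;                           // hand the full batch over
                buffer = new ArrayList<>();
                spillReady.signal();                        // wake the spill thread
            }
            buffer.add(record);
        } finally {
            spillLock.unlock();
        }
    }

    // Consumer: body of the spill thread.
    void spillLoop() throws InterruptedException {
        spillLock.lock();
        try {
            while (true) {
                while (toSpill == null && !closed) spillReady.await();
                if (toSpill == null) return;                // closed and nothing left
                List<Integer> batch = toSpill;
                spillLock.unlock();                         // do the slow work unlocked
                try {
                    spills.add(batch);                      // stands in for sortAndSpill()
                } finally {
                    spillLock.lock();
                }
                toSpill = null;
                spillDone.signalAll();                      // wake a waiting producer
            }
        } finally {
            spillLock.unlock();
        }
    }

    // Like flush(): wait for a running spill, spill the remainder, stop the thread.
    void flush(Thread spillThread) throws InterruptedException {
        spillLock.lock();
        try {
            while (toSpill != null) spillDone.await();
            if (!buffer.isEmpty()) {
                toSpill = buffer;
                buffer = new ArrayList<>();
            }
            closed = true;
            spillReady.signal();
        } finally {
            spillLock.unlock();
        }
        spillThread.join();
    }

    public static void main(String[] args) throws InterruptedException {
        SpillHandoffSketch out = new SpillHandoffSketch(3);
        Thread spillThread = new Thread(() -> {
            try {
                out.spillLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        spillThread.start();
        for (int i = 0; i < 8; i++) out.collect(i);
        out.flush(spillThread);
        System.out.println(out.spills);  // prints [[0, 1, 2], [3, 4, 5], [6, 7]]
    }
}
```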
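The three variables kvstart, kvend and kvindex are central to deciding whether a spill is needed and whether a spill has finished. kvstart is the index where the valid records begin, and kvindex is the next position available for a record. kvend plays a special role. Normally kvstart == kvend, but when a spill starts kvend is set to the value of kvindex, and when the spill ends that value is assigned to kvstart, so kvstart == kvend again. In other words, if kvstart != kvend the system is spilling; otherwise (kvstart == kvend) it is in its normal working state. Indeed, the code contains many kvstart == kvend checks.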
Below we go through several cases to see how kvstart, kvend and kvindex work together. At initialization, all three are set to 0.

The figure below shows how a record is added when no spill occurs:

Note the relationship between kvindex and kvnext: computing kvnext modulo the array length is what makes the buffer circular.
If a spill is triggered while a record is being added (there are several possible conditions), the main process is as follows:

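First, kvnext is computed as before; note that at this point kvend == kvstart (not drawn in the figure). If the spill condition is met, the value of kvindex is assigned to kvend, so now kvend != kvstart. From the relative order of kvstart and kvend we can tell which part of the array holds the records (the left side of the figure shows kvstart < kvend, the right side the other case). When the spill finishes, kvend is assigned to kvstart, so kvend == kvstart holds again. Note also that kvindex does not change during this process: the new record is still written at the position kvindex points to, and then kvindex = kvnext moves kvindex to the next free position.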
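The index bookkeeping can be modeled on its own. In this sketch, `kvnext = (kvindex + 1) % capacity` makes the arrays circular, `startSpill()` sets kvend = kvindex, and `spillFinished()` sets kvstart = kvend. The system is spilling exactly when kvstart != kvend. Treating `kvnext == kvstart` as "arrays full", and the method names, are assumptions made for illustration.

```java
// Model of the kvstart / kvend / kvindex bookkeeping over a circular index array.
class KvRingSketch {
    final int capacity;
    int kvstart = 0, kvend = 0, kvindex = 0;   // all start at 0

    KvRingSketch(int capacity) { this.capacity = capacity; }

    boolean spilling() { return kvstart != kvend; }

    // Number of records from kvstart up to (not including) kvindex, with wrap-around.
    int buffered() { return (kvindex - kvstart + capacity) % capacity; }

    // Add one record; returns false when the slot after kvindex would hit kvstart,
    // i.e. the arrays are full and the caller must wait for a spill to finish.
    boolean add() {
        int kvnext = (kvindex + 1) % capacity;  // modulo makes the array circular
        if (kvnext == kvstart) return false;
        kvindex = kvnext;                       // record was written at the old kvindex
        return true;
    }

    void startSpill() { kvend = kvindex; }      // records [kvstart, kvend) get spilled
    void spillFinished() { kvstart = kvend; }   // kvstart == kvend again

    public static void main(String[] args) {
        KvRingSketch r = new KvRingSketch(4);
        r.add(); r.add(); r.add();              // 3 records; a 4th would collide
        System.out.println(r.add() + " " + r.buffered());                    // false 3
        r.startSpill();
        System.out.println(r.spilling());                                    // true
        r.spillFinished();
        System.out.println(r.spilling() + " " + r.add() + " " + r.kvindex);  // false true 0
    }
}
```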
Take a moment to digest this process, especially how kvstart, kvend and kvindex cooperate; the buffer used for the <key,value> output goes through a similar process.
When collect writes <key,value> output, it may catch a MapBufferTooSmallException. This exception indicates that the serialized record is too large to fit into the buffer at once, and in that case spillSingleRecord is called to handle the record specially.
Helper classes (3):
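Next we discuss how keys and values are written. This part is fairly complex, but the analysis of how kvstart, kvend and kvindex work together makes the code easier to follow. In the output buffer, the counterparts of kvstart, kvend and kvindex are bufstart, bufend and bufindex. Two more variables are involved. bufvoid marks the end of the portion of the buffer actually in use (see the analysis of BlockingBuffer.reset below), and bufmark marks the end of a record. The code needs bufmark because keys and values have variable length; the per-record metadata discussed earlier has a constant size, so no such variable is needed there. The best case is when the buffer has not wrapped around and the serialized value is small, as shown in the figure below: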

The key is serialized first, then the value. The temporary variables keystart, valstart and valend record the start of the serialized key, the start of the serialized value and the end of the serialized value, respectively.
During serialization, writes to the buffer ultimately go through the Buffer.write method, which we analyze later.
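If bufindex < keystart after the key has been serialized (that is, the key wrapped around the end of the buffer), the reset method of BlockingBuffer is called. The reason is that the <key,value> pairs must be sorted during the spill, and the data handed to a RawComparator must be a contiguous binary buffer. BlockingBuffer.reset solves this problem, as the figure below shows: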

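When the serialized key turns out to be non-contiguous, bufvoid is set to bufmark, and the part of the key at the beginning of the buffer is shifted back to make room. The bytes originally located between bufmark and bufvoid are then copied to the beginning of the buffer. As a result, the serialized key is stored contiguously at the very start of the buffer.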
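This adjustment has one precondition: the space before bufstart must be large enough to hold the whole serialized key. If it is not, bufindex is set to 0 and the key bytes are written again through the write method of out inside BlockingBuffer. That call actually goes to Buffer.write, which starts a spill, and eventually the serialized key is written successfully.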
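Here is a sketch of the reset step for a key that wrapped past the end of the buffer. It covers only the case in which the space before bufstart is large enough. The variable names follow the text (bufmark, bufvoid, bufindex, bufstart); the static method, its parameters and the returned {bufindex, bufvoid} pair are assumptions of this sketch.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the "key fits before bufstart" branch of BlockingBuffer.reset.
class ResetSketch {
    // The key occupies [bufmark, bufvoid) at the end of buf plus [0, bufindex) at the start.
    // Returns {newBufindex, newBufvoid} after making the key contiguous at offset 0.
    static int[] reset(byte[] buf, int bufmark, int bufvoid, int bufindex, int bufstart) {
        int headbytelen = bufvoid - bufmark;          // key bytes stranded at the end
        if (bufindex + headbytelen >= bufstart) {
            throw new IllegalStateException("not enough room before bufstart");
        }
        // Shift the wrapped tail of the key right, making room at the start...
        System.arraycopy(buf, 0, buf, headbytelen, bufindex);
        // ...then copy the head of the key from the end of the buffer to offset 0.
        System.arraycopy(buf, bufmark, buf, 0, headbytelen);
        return new int[] {bufindex + headbytelen, bufmark};  // bufvoid becomes bufmark
    }

    public static void main(String[] args) {
        byte[] buf = new byte[16];
        byte[] key = "KEYXYZ".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(key, 0, buf, 12, 4);         // "KEYX" at [12, 16)
        System.arraycopy(key, 4, buf, 0, 2);          // "YZ" wrapped to [0, 2)
        int[] state = reset(buf, 12, 16, 2, 8);       // unspilled records live in [8, 12)
        // prints KEYXYZ bufindex=6 bufvoid=12
        System.out.println(new String(buf, 0, state[0], StandardCharsets.US_ASCII)
                + " bufindex=" + state[0] + " bufvoid=" + state[1]);
    }
}
```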
Now let us look at the write method. While keys and values are serialized, data reaches the buffer through Buffer.write, which is yet another complicated method.
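Buffer.write runs a do-while loop until there is enough space to write the data (both in the buffer and in kvindices and kvoffsets):
1. First it computes two flags. buffull says whether a contiguous write would fill the buffer, and wrap says whether there is enough space if the write is allowed to wrap around (admittedly a mouthful; see the discussion below). The condition (buffull && !wrap) means there is currently not enough space to write;
2. If no spill is running (kvstart == kvend), there are two cases. If the arrays hold records (kvend != kvindex), a spill is started when needed, that is, when the output space is insufficient or the record count has reached the spill threshold. Otherwise, if there is still not enough space (buffull && !wrap), the record is so large that the memory buffer cannot hold it, and a MapBufferTooSmallException is thrown;
3. If there is not enough space and a spill is running, it waits on spillDone;
4. After the loop, the data is written. If buffull is true, the write is not contiguous: the remaining space up to the end of the buffer is filled first, then bufindex is set to 0 and writing continues from bufindex. Otherwise, writing simply starts at bufindex.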
The figure below shows the possible cases for computing buffull (whether a contiguous write fills the buffer) and wrap (whether there is enough space when writing non-contiguously):

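In cases 1 and 2, buffull checks whether the space from bufindex to bufvoid can hold the data being written, and wrap checks whether the white area in the figure is larger than the input; if it is, wrap is true. In cases 3 and 4, buffull checks whether the space from bufindex to bufstart is sufficient, and wrap is always false. Clearly, when (buffull && !wrap) holds, the current space is not enough for a single write.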
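The space test and the wrapped copy can be sketched without the locking and spilling. `spaceCheck` returns buffull and wrap for the two layouts in the figure: the unwrapped layout bufstart <= bufend <= bufindex of cases 1 and 2, and the other layout of cases 3 and 4. `write` then performs the possibly split copy once space is known to be available. It is a single-threaded illustration built from the description above, with hypothetical class and method names.

```java
// Single-threaded sketch of Buffer.write's space test and wrapped copy.
class WrapWriteSketch {
    byte[] kvbuffer;
    int bufstart, bufend, bufindex, bufvoid;

    WrapWriteSketch(int size) { kvbuffer = new byte[size]; bufvoid = size; }

    // Returns {buffull, wrap}; (buffull && !wrap) means the write cannot proceed yet.
    boolean[] spaceCheck(int len) {
        boolean buffull, wrap;
        if (bufstart <= bufend && bufend <= bufindex) {
            // Cases 1 and 2: free space is [bufindex, bufvoid) plus [0, bufstart).
            buffull = bufindex + len > bufvoid;
            wrap = (bufvoid - bufindex) + bufstart > len;
        } else {
            // Cases 3 and 4: free space is only [bufindex, bufstart).
            wrap = false;
            buffull = bufindex + len > bufstart;
        }
        return new boolean[] {buffull, wrap};
    }

    // Copies b into the buffer; assumes spaceCheck(len) did not report (buffull && !wrap).
    void write(byte[] b, int off, int len) {
        boolean buffull = spaceCheck(len)[0];
        if (buffull) {                           // fill up to bufvoid, then continue at 0
            int gaplen = bufvoid - bufindex;
            System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
            len -= gaplen;
            off += gaplen;
            bufindex = 0;
        }
        System.arraycopy(b, off, kvbuffer, bufindex, len);
        bufindex += len;
    }

    public static void main(String[] args) {
        WrapWriteSketch w = new WrapWriteSketch(16);
        w.bufstart = 4; w.bufend = 4; w.bufindex = 14;   // [4, 14) holds unspilled data
        boolean[] small = w.spaceCheck(4), big = w.spaceCheck(7);
        System.out.println(small[0] + "/" + small[1] + " " + big[0] + "/" + big[1]);  // true/true true/false
        w.write(new byte[] {1, 2, 3, 4}, 0, 4);          // bytes land at 14, 15, 0, 1
        System.out.println(w.bufindex + " " + w.kvbuffer[15] + " " + w.kvbuffer[0]); // 2 2 3
    }
}
```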
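Next we look at spillSingleRecord, which is used only to write a <key,value> pair that does not fit into the memory buffer. The process is straightforward. It first creates the SpillRecord, the output file and an IndexRecord. It then loops over the partitions, filling in the SpillRecord and writing the record at the appropriate point (see the figure below), and finally writes the spill{n}.out.index file.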

We mentioned spillThread earlier. It is the consumer in this system, and a rather simple one: when a spill is needed, it calls sortAndSpill. sortAndSpill is similar to spillSingleRecord and also starts by creating the SpillRecord, the output file and an IndexRecord. It then sorts kvoffsets and, after sorting, walks kvoffsets in order, which means it visits the records in partition order.
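It processes the sorted array one partition at a time. If there is no combiner, it writes the records directly; otherwise it calls combineAndSpill, which combines first and then writes. At the end of each iteration, the IndexRecord is added to the SpillRecord.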
Finally, sortAndSpill writes the spill{n}.out.index file.
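The per-partition loop of sortAndSpill can be sketched as follows. Records are ordered by (partition, key), and each partition's records are written as one contiguous segment. An index entry {startOffset, length} is recorded for every partition, including empty ones, so that a reducer can later find its segment. There is no combiner, compression or checksum here, and encoding keys and values with `DataOutputStream.writeUTF` is an assumption of this sketch.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of sortAndSpill: sort by (partition, key), write one segment per partition.
class SortAndSpillSketch {
    static final class Rec {
        final int partition; final String key; final String value;
        Rec(int partition, String key, String value) {
            this.partition = partition; this.key = key; this.value = value;
        }
    }

    // Writes the spill into 'spill' and returns index entries {startOffset, length}.
    static long[][] sortAndSpill(List<Rec> records, int partitions,
                                 ByteArrayOutputStream spill) throws IOException {
        List<Rec> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparingInt((Rec r) -> r.partition)
                .thenComparing(r -> r.key));
        DataOutputStream out = new DataOutputStream(spill);
        long[][] index = new long[partitions][2];
        int i = 0;
        for (int p = 0; p < partitions; p++) {
            long segmentStart = out.size();
            while (i < sorted.size() && sorted.get(i).partition == p) {
                out.writeUTF(sorted.get(i).key);      // no combiner: write directly
                out.writeUTF(sorted.get(i).value);
                i++;
            }
            out.flush();
            index[p][0] = segmentStart;               // like IndexRecord.startOffset
            index[p][1] = out.size() - segmentStart;  // segment length (0 if empty)
        }
        return index;
    }

    public static void main(String[] args) throws IOException {
        List<Rec> recs = List.of(new Rec(2, "b", "1"), new Rec(0, "a", "1"), new Rec(2, "a", "1"));
        long[][] index = sortAndSpill(recs, 3, new ByteArrayOutputStream());
        for (int p = 0; p < index.length; p++) {
            System.out.println("partition " + p + ": offset " + index[p][0] + ", length " + index[p][1]);
        }
    }
}
```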
combineAndSpill is relatively simple, so we will not analyze it.
The last method of MapOutputBuffer to analyze is flush. Calling flush means that all of the Mapper's results have been collected. The buffer then needs some final cleanup, and the spill{n} files must be merged into the final output.
The buffer handling is simple. flush first waits for any spill in progress to complete, then checks whether the buffer is empty. If it is not, flush calls sortAndSpill to do a final spill, and then it stops the spill thread.
flush merges the spill{n} files through the mergeParts method. If the Mapper ended up with only one spill file, renaming that file is enough. If the Mapper produced no output at all, dummy files must be created. If there is more than one spill file, all the files are processed partition by partition, and the records of the partition being processed are written out. While a partition is processed, combineAndSpill may be called again to combine its records once more. This also involves the utility class Merger, which we will not study in depth.
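With several spills, merging one partition amounts to a k-way merge of already sorted segments. This sketch represents each spill as a list of per-partition sorted key lists and merges one partition at a time with a `PriorityQueue`. It leaves out Merger's multi-pass merging, the optional combiner and all file handling, and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of mergeParts: for each partition, k-way merge that partition's sorted
// segment from every spill into one sorted run.
class MergePartsSketch {
    // spills.get(s).get(p) is the sorted list of keys that spill s holds for partition p.
    static List<List<String>> merge(List<List<List<String>>> spills, int partitions) {
        List<List<String>> output = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            final int part = p;
            // Heap entries are {spill number, position in that spill's segment}.
            PriorityQueue<int[]> heap = new PriorityQueue<>(
                    (x, y) -> spills.get(x[0]).get(part).get(x[1])
                            .compareTo(spills.get(y[0]).get(part).get(y[1])));
            for (int s = 0; s < spills.size(); s++) {
                if (!spills.get(s).get(p).isEmpty()) heap.add(new int[] {s, 0});
            }
            List<String> run = new ArrayList<>();
            while (!heap.isEmpty()) {
                int[] top = heap.poll();                 // smallest current key
                List<String> segment = spills.get(top[0]).get(p);
                run.add(segment.get(top[1]));
                if (top[1] + 1 < segment.size()) heap.add(new int[] {top[0], top[1] + 1});
            }
            output.add(run);                             // partition p of the final output
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<List<String>>> spills = List.of(
                List.of(List.of("a", "d"), List.of("x")),           // spill 0
                List.of(List.of("b", "c"), List.<String>of()),      // spill 1: partition 1 empty
                List.of(List.of("a"), List.of("w", "y")));          // spill 2
        System.out.println(merge(spills, 2));  // prints [[a, a, b, c, d], [w, x, y]]
    }
}
```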
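From the earlier figures we can see that Task has many inner classes and a large number of member variables. These classes work with Task to carry out the related tasks, as shown in the figure below.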

MapOutputFile manages the Mapper's output files. It provides a set of get methods for obtaining the various files the Mapper needs, all of which live in one directory. Suppose the JobID passed to MapOutputFile is job_200707121733_0003 and the TaskID is task_200707121733_0003_m_000005. The root of MapOutputFile is {mapred.local.dir}/taskTracker/jobcache/{jobid}/{taskid}/output, and in the following discussion we denote this path as {MapOutputFileRoot}.
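With the JobID and TaskID above, this gives:
{mapred.local.dir}/taskTracker/jobcache/job_200707121733_0003/task_200707121733_0003_m_000005/output
Note that {mapred.local.dir} can contain a list of paths; Hadoop then looks under these root paths for a directory that meets the requirements and creates the required files there. MapOutputFile's methods come in two kinds, with and without the ForWrite suffix. Methods ending in ForWrite create files and take a file size as a parameter, which is used to check disk space. Methods without ForWrite locate files that have already been created.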
getOutputFile: the file {MapOutputFileRoot}/file.out;
getOutputIndexFile: the file {MapOutputFileRoot}/file.out.index;
getSpillFile: the file {MapOutputFileRoot}/spill{spillNumber}.out;
getSpillIndexFile: the file {MapOutputFileRoot}/spill{spillNumber}.out.index.
These four methods are used in MapTask, a subclass of Task. getInputFile, the file {MapOutputFileRoot}/map_{mapId}.out, is used in ReduceTask. We will describe the corresponding use cases where they are used.
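The naming scheme listed above can be captured in a few string helpers. This hypothetical class only builds paths under {MapOutputFileRoot} for given job and task IDs. Unlike the ...ForWrite methods, it does not choose among several mapred.local.dir roots or check disk space. The local directory used in `main` is an invented example.

```java
// Hypothetical helper reproducing the file-naming scheme described for MapOutputFile.
class MapOutputPaths {
    private final String root;  // {MapOutputFileRoot}

    MapOutputPaths(String localDir, String jobId, String taskId) {
        this.root = localDir + "/taskTracker/jobcache/" + jobId + "/" + taskId + "/output";
    }

    String outputFile()                    { return root + "/file.out"; }
    String outputIndexFile()               { return root + "/file.out.index"; }
    String spillFile(int spillNumber)      { return root + "/spill" + spillNumber + ".out"; }
    String spillIndexFile(int spillNumber) { return root + "/spill" + spillNumber + ".out.index"; }
    String inputFile(int mapId)            { return root + "/map_" + mapId + ".out"; }

    public static void main(String[] args) {
        MapOutputPaths p = new MapOutputPaths("/tmp/mapred/local",   // invented local dir
                "job_200707121733_0003", "task_200707121733_0003_m_000005");
        System.out.println(p.spillIndexFile(0));
        System.out.println(p.outputFile());
    }
}
```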
With temporary-file management covered, we turn to Task.CombineOutputCollector. It implements org.apache.hadoop.mapred.OutputCollector and is very simple: it is just an adapter from OutputCollector to IFile.Writer, which does all the actual work.
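ValuesIterator reads from a RawKeyValueIterator, whose keys and values are both DataInputBuffers and which ValuesIterator requires to be sorted already. It is an iterator over the values whose keys are equal according to a RawComparator<KEY> comparator. In Task it has a simple subclass, CombineValuesIterator.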
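ValuesIterator walks a sorted key/value stream and presents the run of values that share a key. That job can be sketched with plain objects instead of raw bytes. Here a `Comparator` plays the role of the RawComparator, the input must already be sorted, and the class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical grouping iterator over a key-sorted stream of pairs.
class GroupingIterator<K, V> implements Iterator<Map.Entry<K, List<V>>> {
    private final Iterator<Map.Entry<K, V>> in;
    private final Comparator<? super K> comparator;
    private Map.Entry<K, V> lookahead;   // first pair of the next group, if any

    GroupingIterator(Iterator<Map.Entry<K, V>> in, Comparator<? super K> comparator) {
        this.in = in;
        this.comparator = comparator;
        this.lookahead = in.hasNext() ? in.next() : null;
    }

    @Override public boolean hasNext() { return lookahead != null; }

    @Override public Map.Entry<K, List<V>> next() {
        if (lookahead == null) throw new NoSuchElementException();
        K key = lookahead.getKey();
        List<V> values = new ArrayList<>();
        values.add(lookahead.getValue());
        lookahead = null;
        while (in.hasNext()) {
            Map.Entry<K, V> e = in.next();
            if (comparator.compare(key, e.getKey()) != 0) { lookahead = e; break; }
            values.add(e.getValue());        // same key as the current group
        }
        return Map.entry(key, values);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted = List.of(Map.entry("a", 1), Map.entry("a", 2),
                Map.entry("b", 5), Map.entry("c", 1), Map.entry("c", 1));
        GroupingIterator<String, Integer> it =
                new GroupingIterator<>(sorted.iterator(), Comparator.<String>naturalOrder());
        while (it.hasNext()) System.out.println(it.next());  // a=[1, 2], then b=[5], then c=[1, 1]
    }
}
```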
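Task.TaskReporter sends the task's counter reports and status reports, which ultimately reach the JobTracker. It implements both Reporter (counter reporting) and StatusReporter (status reporting). So as not to disturb the main thread's work, TaskReporter has its own thread, which reports how the task is doing through the TaskUmbilicalProtocol interface using Hadoop's RPC mechanism. The umbilical connection goes to the parent TaskTracker, which passes the information on to the JobTracker.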
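The core decision of the reporting thread can be sketched like this. The main thread only sets an `AtomicBoolean` when it makes progress. The background thread periodically sends either a full status update, if progress was flagged since the last round, or a lightweight ping. `Umbilical` is a hypothetical stand-in for TaskUmbilicalProtocol, and the ping/status split and the one-second interval are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class ReporterSketch implements Runnable {
    // Hypothetical stand-in for the RPC interface to the parent process.
    interface Umbilical {
        void statusUpdate(String status);
        void ping();
    }

    private final AtomicBoolean progressFlag = new AtomicBoolean(false);
    private final Umbilical umbilical;
    private volatile boolean done = false;
    private volatile String status = "";

    ReporterSketch(Umbilical umbilical) { this.umbilical = umbilical; }

    // Called by the main (map) thread; cheap, and never blocks on RPC.
    void progress(String newStatus) { status = newStatus; progressFlag.set(true); }

    // One reporting round: full update if there was progress, otherwise a ping.
    void tick() {
        if (progressFlag.getAndSet(false)) umbilical.statusUpdate(status);
        else umbilical.ping();
    }

    void stop() { done = true; }

    @Override public void run() {        // body of the background reporting thread
        while (!done) {
            tick();
            try { Thread.sleep(1000); } catch (InterruptedException e) { return; }
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ReporterSketch r = new ReporterSketch(new Umbilical() {
            public void statusUpdate(String s) { calls.add("status:" + s); }
            public void ping() { calls.add("ping"); }
        });
        r.progress("map 10%");
        r.tick();   // progress was flagged: status update
        r.tick();   // nothing new: ping
        System.out.println(calls);  // prints [status:map 10%, ping]
    }
}
```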
FileSystemStatisticUpdater records the number of bytes read from and written to the file system; it is a simple utility class.