MapReduce Shuffle: Process Analysis and Tuning
 
 2018-6-6 
 
Editor's recommendation:

This article, collected from the web, covers the Mapper side, the ring buffer data structure, spills, merging spill files, the Reducer side, merging, and performance tuning.

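Introduction to MapReduce

In Hadoop MapReduce, the framework guarantees that the input each reducer receives is sorted by key. Getting data from the Mapper output to the Reducer input is a complex process; the framework takes care of all of it and exposes many configuration options and extension points. The overall data flow of a MapReduce job is shown in the figure below:

The process of sorting the Mapper output and transferring it to the Reducers is called the shuffle. This article walks through the shuffle in detail. A solid understanding of it is essential for tuning MapReduce; in a sense, the shuffle is the heart of MapReduce.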

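The Mapper side

When the map function starts emitting data through context.write(), the data is not simply written to disk. For performance, map output is first written to an in-memory buffer, where some pre-sorting is done. The whole process is shown in the figure below: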

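The ring buffer data structure

Each map task has a ring (circular) buffer into which it writes its output. The ring buffer is an in-memory structure whose end wraps around to its beginning, used specifically to store key-value data:

In Hadoop, the ring buffer is really just a byte array: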

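// MapTask.java (MapOutputBuffer.init, Hadoop 2.x)
private byte[] kvbuffer; // main output buffer
int maxMemUsage = sortmb << 20;        // mapreduce.task.io.sort.mb converted to bytes
maxMemUsage -= maxMemUsage % METASIZE; // round down to whole metadata entries
kvbuffer = new byte[maxMemUsage];      // data and metadata share this one array
kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer(); // int view for metadata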

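kvbuffer holds both a data region and an index (metadata) region. The two regions are adjacent and do not overlap, and they are separated by a dividing point (called the equator in the source). The dividing point is not fixed; it is updated after every spill. Initially it is 0: data is stored growing upward from it, and index entries are stored growing downward:

bufindex keeps growing upward. For example, it starts at 0, becomes 4 after an int key is written, and becomes 8 after an int value is written.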

The index entry for each key-value pair in kvbuffer is a 4-tuple occupying four ints:

the start offset of the value

the start offset of the key

the partition number

the length of the value

private static final int VALSTART = 0;         // val offset in acct
private static final int KEYSTART = 1;         // key offset in acct
private static final int PARTITION = 2;        // partition offset in acct
private static final int VALLEN = 3;           // length of value
private static final int NMETA = 4;            // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));

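The metadata pointer kvindex always moves down four "slots" at a time, and the 4-tuple is then filled in slot by slot upward. For example, kvindex starts at -4 (four ints below the equator, which wraps around to the end of the array). After the first key-value pair is written, (kvindex+0) holds the value's start offset, (kvindex+1) the key's start offset, (kvindex+2) the partition number, and (kvindex+3) the value's length; kvindex then moves to -8.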
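To make the layout concrete, here is a minimal Java sketch of a buffer in which serialized data grows upward from the equator while four-int metadata entries grow downward. KvBufferSketch and its collect() method are hypothetical names; the sketch assumes int keys and values, never spills, and never lets data wrap around the end of the array, all of which Hadoop's real MapOutputBuffer has to handle.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

// Hypothetical, simplified model of the kvbuffer layout: int keys/values only,
// no spilling, and data bytes never wrap around the end of the array.
final class KvBufferSketch {
    static final int VALSTART = 0, KEYSTART = 1, PARTITION = 2, VALLEN = 3;
    static final int NMETA = 4;            // ints per metadata entry
    static final int METASIZE = NMETA * 4; // bytes per metadata entry

    final byte[] kvbuffer;
    final ByteBuffer data;  // view used to serialize keys and values
    final IntBuffer kvmeta; // int view over the same bytes, used for metadata
    int bufindex;           // next free data byte; grows upward from the equator
    int kvindex;            // next metadata entry, counted in ints; grows downward

    KvBufferSketch(int sizeBytes) {
        kvbuffer = new byte[sizeBytes - sizeBytes % METASIZE];
        data = ByteBuffer.wrap(kvbuffer);
        kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
        int equator = 0;
        bufindex = equator;
        // "-4" relative to the equator wraps to the last four ints of the array.
        kvindex = Math.floorMod(equator / 4 - NMETA, kvmeta.capacity());
    }

    void collect(int key, int value, int partition) {
        // The 8 new data bytes must not run into the metadata entry about to be written.
        if (bufindex + 8 > kvindex * 4) {
            throw new IllegalStateException("buffer full: a real map task would spill or block here");
        }
        int keystart = bufindex;
        data.putInt(bufindex, key);
        bufindex += 4;
        int valstart = bufindex;
        data.putInt(bufindex, value);
        bufindex += 4;
        // write accounting info, in the same slot order as the MapTask excerpt
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, bufindex - valstart);
        kvindex = Math.floorMod(kvindex - NMETA, kvmeta.capacity()); // move down one entry
    }

    public static void main(String[] args) {
        KvBufferSketch b = new KvBufferSketch(1024);
        System.out.println("initial kvindex = " + b.kvindex); // 252, i.e. -4 wrapped
        b.collect(7, 42, 1);
        System.out.println("bufindex = " + b.bufindex + ", kvindex = " + b.kvindex); // 8, 248
    }
}
```

With a 1024-byte array the int view has 256 slots, so the first entry occupies slots 252 to 255 and kvindex then drops to 248, mirroring the -4 then -8 movement described in the text.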

The buffer size defaults to 100 MB and can be changed with the mapreduce.task.io.sort.mb property.

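Spill

The map keeps writing its output into this buffer. Once buffer usage reaches a certain fraction, a background thread starts writing the buffer's contents to disk; this write is called a spill. The fraction at which a spill starts defaults to 0.80 and can be set with mapreduce.map.sort.spill.percent. While the background thread is writing, the map continues to write output into the ring buffer. If the buffer fills up, the map blocks until the spill finishes rather than overwriting data already in the buffer.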

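The data is divided according to the reducer it will be sent to. The Partitioner's getPartition() method tells which Reducer a given output record goes to; it is called as each record is written to the buffer. The default Partitioner (HashPartitioner) partitions by hash, computing key.hashCode() mod R, where R is the number of Reducers (the sign bit of the hash is cleared first so the result is never negative). The partition returned by getPartition is just an integer: with 10 Reducers it is an integer from 0 to 9, and each Reducer corresponds to one partition. Each map output key-value pair is stored in the buffer together with its partition (in the kvmeta entries described earlier). Before writing to disk, the background thread groups the data by partition; if the job has 2 reduce tasks, the data in memory is divided into reduce1 and reduce2: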
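The hash partitioning rule can be sketched as follows. HashPartitionSketch is a hypothetical class that mirrors the arithmetic of HashPartitioner under the simplifying assumption that only the key matters; the real method also receives the value, and the number of partitions comes from the job configuration.

```java
// Hypothetical stand-in for the hash partitioning arithmetic.
final class HashPartitionSketch {
    // Clearing the sign bit keeps the result in [0, numReduceTasks) even when
    // hashCode() is negative; a plain % would return a negative partition.
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 10;
        for (Object key : new Object[] {"apple", 23, -7}) {
            System.out.println(key + " -> partition " + getPartition(key, reducers));
        }
    }
}
```

For the Integer key -7, a plain `-7 % 10` would give -7, whereas the masked version gives 1, a valid partition.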

Within each partition, the data is sorted by key using a quicksort (QuickSort).

If a Combiner is configured, combine is run on the sorted output.
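The per-spill ordering and the optional combine step can be modeled like this. SpillSortSketch, Rec, sortForSpill() and combine() are hypothetical; the combiner is an assumed word-count style sum, and the sort uses Java's List.sort (a stable merge sort) rather than Hadoop's QuickSort, which produces the same order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: buffered records are sorted by (partition, key) before a
// spill, then optionally combined.
final class SpillSortSketch {
    static final class Rec {
        final int partition;
        final String key;
        final int value;

        Rec(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return partition + ":" + key + "=" + value;
        }
    }

    // Records of the same partition end up contiguous and ordered by key.
    static List<Rec> sortForSpill(List<Rec> records) {
        List<Rec> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparingInt((Rec r) -> r.partition)
                              .thenComparing((Rec r) -> r.key));
        return sorted;
    }

    // Word-count style combiner: sums the values of adjacent records with the same
    // partition and key. This only works because the input is already sorted.
    static List<Rec> combine(List<Rec> sorted) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : sorted) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).partition == r.partition && out.get(last).key.equals(r.key)) {
                out.set(last, new Rec(r.partition, r.key, out.get(last).value + r.value));
            } else {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> buffered = Arrays.asList(new Rec(1, "b", 1), new Rec(0, "z", 1),
                                           new Rec(1, "a", 1), new Rec(1, "b", 1));
        List<Rec> sorted = sortForSpill(buffered);
        System.out.println(sorted);          // [0:z=1, 1:a=1, 1:b=1, 1:b=1]
        System.out.println(combine(sorted)); // [0:z=1, 1:a=1, 1:b=2]
    }
}
```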

The sorted data is written to one of the directories configured in mapreduce.cluster.local.dir, with the directories used in turn (round-robin). Note that these are local directories, not HDFS. Spill files are named like spill0.out, spill1.out, and so on.

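Data for all partitions goes into the same file, and an index records where each partition starts and ends. Each index entry is a 3-tuple of start offset, raw (uncompressed) data length, and compressed (on-disk) data length, corresponding to the IndexRecord class: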

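public class IndexRecord {
  public long startOffset; // where this partition's data begins in the spill file
  public long rawLength;   // uncompressed length of the partition's data
  public long partLength;  // length actually stored on disk (after compression)

  public IndexRecord() { }

  public IndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}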
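Assuming each partition's segment is written immediately after the previous one, the index can be derived from the segment lengths alone. The helper below is hypothetical (its nested IndexRecord only mirrors the three fields of the class above), and it also shows that a partition with no records gets a zero length at the same offset as the next partition.

```java
// Hypothetical helper, assuming the partitions of one spill file are written back
// to back in partition order, so each start offset is the running sum of the
// on-disk (possibly compressed) lengths of the partitions before it.
final class SpillIndexSketch {
    static final class IndexRecord {
        final long startOffset; // where the partition begins in the spill file
        final long rawLength;   // uncompressed length
        final long partLength;  // length on disk

        IndexRecord(long startOffset, long rawLength, long partLength) {
            this.startOffset = startOffset;
            this.rawLength = rawLength;
            this.partLength = partLength;
        }
    }

    static IndexRecord[] buildIndex(long[] rawLengths, long[] partLengths) {
        if (rawLengths.length != partLengths.length) {
            throw new IllegalArgumentException("need one raw and one on-disk length per partition");
        }
        IndexRecord[] index = new IndexRecord[partLengths.length];
        long offset = 0;
        for (int p = 0; p < partLengths.length; p++) {
            index[p] = new IndexRecord(offset, rawLengths[p], partLengths[p]);
            offset += partLengths[p]; // the next partition starts right after this one
        }
        return index;
    }

    public static void main(String[] args) {
        // Three reducers; partition 1 received no records in this spill.
        IndexRecord[] index = buildIndex(new long[] {200, 0, 80}, new long[] {120, 0, 50});
        for (int p = 0; p < index.length; p++) {
            System.out.println("partition " + p + ": read " + index[p].partLength
                    + " bytes at offset " + index[p].startOffset);
        }
    }
}
```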

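Each mapper also keeps an in-memory cache for these spill indexes. Its size defaults to 1 MB (1,048,576 bytes) and can be set with mapreduce.task.index.cache.limit.bytes. Indexes stay in memory while they fit; once the cache is full, further indexes must be written to disk.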

Spill index files are named like spill110.out.index, spill111.out.index.

The spill file indexes are in fact a list of org.apache.hadoop.mapred.SpillRecord objects; each map task (the MapTask.java class in the source) maintains one such list:

final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();

Creating a SpillRecord allocates a buffer of (Number_Of_Reducers * 24) bytes:

public SpillRecord(int numPartitions) {
  buf = ByteBuffer.allocate(
      numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
  entries = buf.asLongBuffer();
}

numPartitions is the number of partitions, which is simply the number of Reducers:

public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
// ---
partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);

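The default index cache is 1 MB, i.e. 1024*1024 bytes. With 2 Reducers, each spill file's index takes 2*24 = 48 bytes, so once there are more than about 21,845 spill files (1,048,576 / 48 ≈ 21845.3), index files have to be written to disk.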
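The arithmetic above can be checked with a few lines of Java. IndexCacheMath is a hypothetical helper; it simply divides the cache limit by the 24 bytes per partition of one SpillRecord and ignores the exact point at which MapTask decides to flush an index to disk. The last call in main() shows that with many reducers only a few dozen spill indexes fit in the 1 MB default.

```java
// Hypothetical arithmetic helper for index-cache sizing.
final class IndexCacheMath {
    // One index entry = 3 longs (startOffset, rawLength, partLength) = 24 bytes.
    static final int INDEX_RECORD_BYTES = 3 * Long.BYTES;

    // Bytes needed for the index of one spill file.
    static long spillRecordBytes(int numReducers) {
        return (long) numReducers * INDEX_RECORD_BYTES;
    }

    // How many whole spill indexes fit within the cache limit (rounded down).
    static long spillIndexesThatFit(long cacheLimitBytes, int numReducers) {
        return cacheLimitBytes / spillRecordBytes(numReducers);
    }

    public static void main(String[] args) {
        long cacheLimit = 1024L * 1024; // 1 MB
        System.out.println(spillRecordBytes(2));                   // 48
        System.out.println(spillIndexesThatFit(cacheLimit, 2));    // 21845
        System.out.println(spillIndexesThatFit(cacheLimit, 1000)); // 43
    }
}
```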

The index and spill files are illustrated in the figure below:

The spill runs at least once, because the Mapper's output must be written to disk for the Reducers to process further.

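Merging spill files

During the map task, a spill is triggered every time the buffer reaches the configured threshold, and a spill file is written to disk, so there may be several spill files by the end. Before the map task finishes, these files are merged as needed into one large, partitioned, sorted file; this merge builds a global sort on top of the per-spill in-memory sorts. The figure below sketches the merge: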

The corresponding index files are merged as well, so that when a Reducer requests the data for its partition it can be read quickly.

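In addition, if the number of spill files is at least mapreduce.map.combine.minspills (default 3), the combiner is run again before the merged file is written. With too few spill files, the benefit of running the combiner may not outweigh the cost of invoking it.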

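mapreduce.task.io.sort.factor sets the maximum number of files merged at once. The default is 10, i.e. at most 10 spill files are merged in a single pass. After one or more merge rounds, all output files have been merged into a single large file, together with its index file (which may exist only in memory).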
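The core of every merge pass is a k-way merge of already-sorted inputs. Below is a minimal sketch using a min-heap with one cursor per input run; KWayMergeSketch is hypothetical, works on strings rather than serialized spill segments, and does not model the per-pass fan-in limit set by mapreduce.task.io.sort.factor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge: each run (think: one spill's segment for a partition)
// is already sorted, so a min-heap of cursors yields a globally sorted output.
final class KWayMergeSketch {
    static List<String> merge(List<List<String>> runs) {
        // A cursor is {runIndex, positionInRun}; the heap orders cursors by the key they point at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] c) -> runs.get(c[0]).get(c[1])));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] c = heap.poll();
            List<String> run = runs.get(c[0]);
            out.add(run.get(c[1]));
            if (c[1] + 1 < run.size()) {
                heap.add(new int[] {c[0], c[1] + 1}); // advance this run's cursor
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> spills = Arrays.asList(
                Arrays.asList("a", "c", "e"),
                Arrays.asList("b", "d"),
                Collections.<String>emptyList(),
                Arrays.asList("a", "f"));
        System.out.println(merge(spills)); // [a, a, b, c, d, e, f]
    }
}
```

Each output element costs one heap operation over at most k cursors, which is why merging many small files in one pass stays cheap in CPU terms; the merge factor exists mainly to bound the number of files open at once.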

Compression

When the data volume is large, compressing the map output is usually a good idea. To enable compression, set mapreduce.map.output.compress to true and choose the compression codec with mapreduce.map.output.compress.codec.

Exposing output over HTTP

Once the map output is complete, it is exposed through an HTTP server from which the reduce side fetches it. The number of threads serving reducer requests is configurable; by default it is twice the number of processor cores on the machine. To set it explicitly, use the mapreduce.shuffle.max.threads property. Note that this is a NodeManager setting, not a per-job one.

In addition, when a map task finishes it notifies the Application Master, so that Reducers can fetch its data promptly.

After buffering, partitioning, sorting, combining, merging, and compression, the map side's work is done:

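The Reducer side

When each map task finishes, its output is written to the local disk of the machine that ran it. Each Reducer must fetch its own portion of the data (its partition) from every map task. Map tasks may finish at different times, and a reduce task starts copying a map task's output as soon as that map task finishes; this phase is called copy.

How does a Reducer know which machines to fetch data from? When a map task finishes, it notifies the job's Application Master through its regular heartbeat. A thread in the reducer periodically asks the master for map output locations until it has fetched all the data; since the reducer knows how many map tasks the job has, it is done once it has copied the output of every one of them.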

After a reducer has fetched the data, the map machine does not delete it right away, in case the reduce task fails and has to be rerun. Map output is therefore deleted only after the whole job completes.

The reducer runs several copier threads that fetch data from the map machines in parallel. By default there are 5 copier threads; this can be changed with mapreduce.reduce.shuffle.parallelcopies.
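A minimal sketch of the parallel copy phase, using a fixed thread pool sized like mapreduce.reduce.shuffle.parallelcopies. ParallelCopySketch and fetch() are hypothetical: fetch() merely returns a string where the real shuffle issues HTTP requests to the map-side server, and retries and failure reporting are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the copy phase: a fixed pool of "copier" threads fetches
// map outputs in parallel.
final class ParallelCopySketch {
    static String fetch(String mapOutputId) {
        return "copied " + mapOutputId; // placeholder for downloading one map output
    }

    static List<String> copyAll(List<String> mapOutputIds, int parallelCopies) {
        ExecutorService copiers = Executors.newFixedThreadPool(parallelCopies);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String id : mapOutputIds) {
                pending.add(copiers.submit(() -> fetch(id)));
            }
            List<String> copied = new ArrayList<>();
            for (Future<String> f : pending) {
                copied.add(f.get()); // wait for each fetch, keeping submission order
            }
            return copied;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while copying", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a fetch failed", e.getCause());
        } finally {
            copiers.shutdown();
        }
    }

    public static void main(String[] args) {
        // 5 matches the default value of mapreduce.reduce.shuffle.parallelcopies
        System.out.println(copyAll(Arrays.asList("map_0", "map_1", "map_2"), 5));
    }
}
```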

If a map output is small enough, it is copied into the reduce task's JVM memory. mapreduce.reduce.shuffle.input.buffer.percent sets the fraction of the JVM heap that may be used to hold map outputs. If the data is too large to fit, it is copied to the reduce machine's local disk instead.

In-memory merge

When the data in this buffer reaches a configured threshold, it is merged in memory and written to disk. The threshold can be set in two ways:

By memory fraction: as mentioned above, part of the reduce JVM heap holds the input from map tasks, and on top of that a fraction is configured at which merging starts. Suppose the memory for map outputs is 500 MB and mapreduce.reduce.shuffle.merge.percent is 0.80; a merge-and-write is then triggered when the in-memory data reaches 400 MB.
By number of map outputs: set with mapreduce.reduce.merge.inmem.threshold.
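The two triggers can be expressed as a single predicate. InMemMergeTrigger is a hypothetical helper; treating a count threshold of 0 or less as "no count-based trigger" follows the description of mapreduce.reduce.merge.inmem.threshold in mapred-default.xml, and main() reproduces the 500 MB / 0.80 / 400 MB example.

```java
// Hypothetical sketch of the two reduce-side in-memory merge triggers.
final class InMemMergeTrigger {
    /**
     * @param usedBytes      bytes of map output currently held in memory
     * @param shuffleMemory  memory reserved for map outputs
     *                       (heap * mapreduce.reduce.shuffle.input.buffer.percent)
     * @param mergePercent   mapreduce.reduce.shuffle.merge.percent
     * @param inMemOutputs   number of map outputs currently held in memory
     * @param inMemThreshold mapreduce.reduce.merge.inmem.threshold (0 or less disables it)
     */
    static boolean shouldMerge(long usedBytes, long shuffleMemory, double mergePercent,
                               int inMemOutputs, int inMemThreshold) {
        long mergeThreshold = (long) (shuffleMemory * mergePercent);
        boolean byMemory = usedBytes >= mergeThreshold;
        boolean byCount = inMemThreshold > 0 && inMemOutputs >= inMemThreshold;
        return byMemory || byCount;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // 500 MB for map outputs, merge at 80%: the threshold is 400 MB.
        System.out.println(shouldMerge(400 * mb, 500 * mb, 0.80, 10, 1000)); // true
        System.out.println(shouldMerge(399 * mb, 500 * mb, 0.80, 10, 1000)); // false
    }
}
```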

During the merge, the files being merged are sorted globally. If the job has a Combiner, the combine function is run to reduce the amount of data written to disk.

On-disk merge during copy

As copied data keeps being written to disk, a background thread merges these files into larger, sorted files. If the map output was compressed, it must be decompressed in memory before it can be merged. This merge only serves to reduce the work of the final merge: part of the merging is done while map outputs are still being copied. This merge also produces a global sort.

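Final on-disk merge

Once all map outputs have been copied, all the data is merged one last time into a single sorted input for the reduce task. This merge proceeds in rounds, and the result of the last round is fed directly to reduce as its input, saving a round trip to disk. The map outputs taking part in this final merge (after all map outputs have been copied to the reducer) may come from files that earlier merges wrote to disk, or from the in-memory buffer: the last map outputs written to memory may not have reached the threshold that triggers a merge, so they are still in memory.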

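Each round does not necessarily merge the same number of files. The guiding principle is to minimize the total amount of data written to disk over the whole merge. To achieve that, the final round should merge as much data as possible, because its output goes directly to reduce without being written to disk and read back. The final round therefore merges the maximum number of files, namely the merge factor, which is set with mapreduce.task.io.sort.factor.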

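Suppose there are 40 map output files and the merge factor is 10; then 5 rounds of merging are needed. The final round merges exactly 10 files, 4 of which are the results of the previous 4 rounds, so 6 of the original 40 are held back for the final round. The first round therefore merges only 4 files and the next three merge 10 each (4 + 10 + 10 + 10 + 6 = 40). The 5 rounds might look like this: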
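The round sizes can be computed rather than guessed. MergePlanSketch is a hypothetical helper modeled on the pass-factor rule that Hadoop's Merger applies: the first round merges just enough inputs that every later round, including the final one, can take a full merge factor. It does not model which segments each round picks. With 40 inputs and a factor of 10 it returns 4, 10, 10, 10 and a final 10; with 50 inputs the same rule needs six rounds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sizing merge rounds so the final round merges exactly
// `factor` inputs.
final class MergePlanSketch {
    // Each intermediate round turns `take` inputs into 1, shrinking the count by
    // take - 1, so the first round absorbs the remainder modulo (factor - 1).
    static int firstRoundSize(int numSegments, int factor) {
        if (numSegments <= factor) {
            return numSegments; // a single (final) round is enough
        }
        int mod = (numSegments - 1) % (factor - 1);
        return mod == 0 ? factor : mod + 1;
    }

    // Number of inputs merged in each round; the last entry is the final round,
    // whose output is streamed straight into reduce.
    static List<Integer> plan(int numSegments, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("merge factor must be at least 2");
        }
        List<Integer> rounds = new ArrayList<>();
        int remaining = numSegments;
        int take = firstRoundSize(numSegments, factor);
        while (remaining > factor) {
            rounds.add(take);
            remaining = remaining - take + 1; // `take` inputs become one merged file
            take = factor;
        }
        rounds.add(remaining);
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println(plan(40, 10)); // [4, 10, 10, 10, 10]
        System.out.println(plan(50, 10)); // [5, 10, 10, 10, 10, 10]
    }
}
```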

The data from the first 4 rounds is all written to disk. The last 2 cells in the figure are colored differently to indicate that this data may come directly from memory.

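MemToMem merge

Besides the in-memory and on-disk merges, Hadoop also defines a MemToMem merge, which merges map outputs held in memory and writes the result back to memory. It is disabled by default and can be enabled with mapreduce.reduce.merge.memtomem.enabled; it is triggered when the number of in-memory map outputs reaches mapreduce.reduce.merge.memtomem.threshold.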

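Passing the final merge to the reduce method

The merged data is passed to the Reducer as input, and the Reducer calls the reduce function once for each key with its sorted values. The reduce output is usually written to HDFS: the first replica of each output file is written to the machine running the reduce task (when that machine is also a DataNode), and the remaining replicas are placed according to the normal HDFS placement rules (see here for details).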
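Because the merged input is sorted, grouping values by key takes a single linear pass. The sketch below is hypothetical: runReduce() compares keys with equals(), whereas Hadoop groups with the job's sort or grouping comparator, and the reduce function is passed in as a plain BiFunction rather than a Reducer subclass.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical sketch: walk key-sorted (key, value) pairs and call the reduce
// function once per distinct key with all of that key's values.
final class ReduceGroupingSketch {
    static <K, V, R> List<R> runReduce(List<? extends Map.Entry<K, V>> sorted,
                                       BiFunction<K, List<V>, R> reduceFn) {
        List<R> out = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            K key = sorted.get(i).getKey();
            List<V> values = new ArrayList<>();
            // Equal keys are adjacent because the input is sorted.
            while (i < sorted.size() && Objects.equals(sorted.get(i).getKey(), key)) {
                values.add(sorted.get(i).getValue());
                i++;
            }
            out.add(reduceFn.apply(key, values));
        }
        return out;
    }

    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted = Arrays.asList(
                new AbstractMap.SimpleEntry<String, Integer>("a", 1),
                new AbstractMap.SimpleEntry<String, Integer>("a", 2),
                new AbstractMap.SimpleEntry<String, Integer>("b", 5));
        System.out.println(runReduce(sorted, (k, vs) -> k + "=" + sum(vs))); // [a=3, b=5]
    }
}
```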

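Having fetched the results from the map machines, merged them, and combined them, the reducer passes the data to reduce for the final step, and the shuffle is essentially complete. Finally, take another look at the figure below: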

ÐÔÄܵ÷ÓÅ

Èç¹ûÄܹ»¸ù¾ÝÇé¿ö¶Ôshuffle¹ý³Ì½øÐе÷ÓÅ£¬¶ÔÓÚÌṩMapReduceÐÔÄܺÜÓаïÖú¡£Ïà¹ØµÄ²ÎÊýÅäÖÃÁÐÔÚºóÃæµÄ±í¸ñÖС£

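A general principle is to give the shuffle as much memory as possible, while still making sure that map and reduce have enough memory to run the business logic. So when implementing a Mapper or Reducer, keep memory use to a minimum; for example, avoid continually accumulating data in an in-memory Map.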

The memory of the JVMs that run map and reduce tasks is set with the mapred.child.java.opts property; make it as large as practical. The container memory sizes are set with mapreduce.map.memory.mb and mapreduce.reduce.memory.mb, both of which default to 1024 MB.

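Map-side tuning

On the map side, the best performance comes from avoiding multiple spill files; a single spill file is ideal. Estimate the size of the map output and set the mapreduce.task.io.sort.* properties accordingly to keep the number of spill files to a minimum, for example by making mapreduce.task.io.sort.mb as large as you can.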
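A rough estimate of the spill count helps when choosing mapreduce.task.io.sort.mb. SpillEstimate is a hypothetical helper that assumes each record costs its serialized size plus 16 bytes of metadata (the four ints per record described earlier) and that a spill starts every time sort.mb * spill.percent bytes fill up; it ignores that the map keeps writing while a spill runs, so treat the result as an approximation.

```java
// Hypothetical back-of-the-envelope estimate of the map-side spill count.
final class SpillEstimate {
    static final int METASIZE = 16; // four ints of metadata per record

    static long estimateSpills(long outputBytes, long records, int sortMb, double spillPercent) {
        long consumed = outputBytes + records * METASIZE;
        long perSpill = (long) (sortMb * 1024L * 1024L * spillPercent); // bytes before a spill starts
        return Math.max(1, (consumed + perSpill - 1) / perSpill); // ceiling; at least one spill
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // 500 MB of output in 10 million records
        System.out.println(estimateSpills(500 * mb, 10_000_000L, 100, 0.80));  // 9
        System.out.println(estimateSpills(500 * mb, 10_000_000L, 1000, 0.80)); // 1
    }
}
```

In this example the 160 MB of per-record metadata is a sizeable share of the buffer, which is why jobs emitting many small records spill more often than their raw output size alone suggests.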

The map-side properties are listed in the following table:

Reduce-side tuning

On the reduce side, the best performance comes from keeping all the data in memory. Normally that memory is reserved for the reduce function, but if the reduce function does not need much memory, setting mapreduce.reduce.merge.inmem.threshold (the number of map outputs that triggers a merge) to 0 and mapreduce.reduce.input.buffer.percent (the fraction of heap used to retain map outputs during the reduce) to 1.0 can give a significant performance boost. In the 2008 terabyte sort benchmark, Hadoop won by keeping the reduce's intermediate data in memory.

The reduce-side properties:

General tuning

Hadoop uses a 4 KB buffer for I/O by default, which is quite small; it can be raised with io.file.buffer.size.

   