Editor's note:
This article, collected from the web, covers the Mapper side, the circular buffer data structure, spills, merging spill files, the Reducer side, merging, and performance tuning.
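Introduction to MapReduce
In Hadoop MapReduce, the framework guarantees that the input each reducer receives is sorted by key. Getting data from Mapper output to Reducer input is a complex process; the framework handles all of it and exposes many configuration options and extension points along the way. The figure below shows the overall data flow of a MapReduce job: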

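The process of sorting the Mapper output and transferring it to the Reducers is called the shuffle. This article walks through the shuffle in detail. A solid understanding of it is essential for tuning MapReduce; to some extent, the shuffle is the heart of MapReduce.
Mapper side
When the map function starts emitting data through context.write(), the data is not simply written to disk. For performance, map output is first written to an in-memory buffer, where some presorting takes place. The figure below shows the whole process: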

Circular buffer data structure
Each map task has a circular buffer to which map writes its output. The circular buffer is an in-memory structure whose end wraps around to its beginning, and it is used specifically to hold key-value data:

In Hadoop, the circular buffer is in fact just a byte array:
// MapTask.java
private byte[] kvbuffer; // main output buffer
kvbuffer = new byte[maxMemUsage - recordCapacity];
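kvbuffer contains a data region and an index region. The two regions are adjacent and never overlap, and a boundary point marks where they meet. The boundary is not fixed: it is updated after every spill. Initially the boundary is at 0; data is stored growing upward, and index entries are stored growing downward: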

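bufferindex only grows upward: for example, it starts at 0, becomes 4 after an int key is written, and becomes 8 after an int value is written.
Each index entry describes where one key-value pair lives in kvbuffer. It is a 4-tuple that occupies four ints:
the start position of the value
the start position of the key
the partition number
the length of the value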
private static final int VALSTART = 0;  // val offset in acct
private static final int KEYSTART = 1;  // key offset in acct
private static final int PARTITION = 2; // partition offset in acct
private static final int VALLEN = 3;    // length of value
private static final int NMETA = 4;     // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes

// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
kvindex, the write pointer into kvmeta, jumps down four slots at a time, and the four fields of the tuple are then filled in upward, one slot each. For example, if kvindex starts at -4, then once the first key-value pair has been written, slot (kvindex+0) holds the start position of the value, (kvindex+1) the start position of the key, (kvindex+2) the partition number, and (kvindex+3) the length of the value; kvindex then jumps to -8.
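The pointer arithmetic described above can be sketched in a few lines of Java. This is a simplified model, not the MapTask implementation: the class name KvMetaSketch, the collect method, and the idea of passing int keys and values are illustrative assumptions. Overflow checks and wrap-around of the data region are omitted, and the starting position -4 is represented as capacity - 4 so that it is a valid index.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

// Simplified model of the kvbuffer/kvmeta layout (illustrative, not Hadoop's code).
final class KvMetaSketch {
    static final int VALSTART = 0;
    static final int KEYSTART = 1;
    static final int PARTITION = 2;
    static final int VALLEN = 3;
    static final int NMETA = 4;

    final byte[] kvbuffer;
    final IntBuffer kvmeta; // int view over the same bytes as kvbuffer
    int bufindex = 0;       // next free data byte; grows upward
    int kvindex;            // current metadata slot, counted in ints; moves downward

    KvMetaSketch(int capacityBytes) {
        kvbuffer = new byte[capacityBytes];
        kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
        kvindex = kvmeta.capacity() - NMETA; // "-4", wrapped to the top of the array
    }

    // Writes one int key and one int value, then records the 4-int metadata tuple.
    void collect(int key, int value, int partition) {
        int keystart = bufindex;
        writeInt(key);
        int valstart = bufindex;
        writeInt(value);
        int valend = bufindex;
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, valend - valstart);
        // Jump down four slots for the next record, wrapping around the array.
        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
    }

    private void writeInt(int v) {
        ByteBuffer.wrap(kvbuffer, bufindex, 4).putInt(v);
        bufindex += 4;
    }

    public static void main(String[] args) {
        KvMetaSketch buf = new KvMetaSketch(64);
        buf.collect(7, 42, 1);
        System.out.println("bufindex=" + buf.bufindex + " kvindex=" + buf.kvindex);
    }
}
```

With a 64-byte buffer the int view has 16 slots, so the first tuple lands in slots 12 to 15 while the key and value occupy bytes 0 to 7 at the other end of the same array.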
The buffer is 100 MB by default and can be changed with the mapreduce.task.io.sort.mb property.
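Spill
As map keeps writing output into this buffer, a background thread starts writing the buffer contents to disk once usage reaches a certain fraction of the buffer; this write is called a spill. The fraction that triggers a spill defaults to 0.80 and is set with mapreduce.map.sort.spill.percent. While the background thread is spilling, map continues to write output into the circular buffer. If the buffer fills up, map blocks until the spill completes rather than overwriting data already in the buffer.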
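As a quick check of the numbers involved, the sketch below computes the usage level at which a spill would start from the two settings named above. The class and method names are made up for illustration, and the calculation is only the plain product of buffer size and spill fraction; Hadoop's own bookkeeping also accounts for the metadata region.

```java
// Illustrative helper; not part of Hadoop.
final class SpillThresholdSketch {
    // Bytes of buffer usage at which a background spill would start.
    static long spillThresholdBytes(int sortMb, double spillPercent) {
        long capacityBytes = (long) sortMb * 1024 * 1024;
        return Math.round(capacityBytes * spillPercent);
    }

    public static void main(String[] args) {
        // Defaults quoted in the text: 100 MB buffer, 0.80 spill fraction.
        System.out.println(spillThresholdBytes(100, 0.80) + " bytes");
    }
}
```

With the defaults, spilling starts at 80 MB, leaving 20 MB for map to keep writing into while the background thread works.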
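Before writing to disk, the background thread divides the data by the reducer it will be sent to; calling the Partitioner's getPartition() method tells it which Reducer a given output record belongs to. The default Partitioner partitions by hash: it computes key.hashCode() mod R, where R is the number of Reducers. The partition returned by getPartition() is simply an integer; with 10 Reducers, for example, it is an integer from 0 to 9, and each Reducer corresponds to one partition. Each key-value pair emitted by map is stored in the buffer together with its partition (in the kvmeta described earlier). With 2 reduce tasks, for example, the data in memory is divided into two groups, reduce1 and reduce2.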
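The hash rule above fits in a one-line function. The sketch below works on plain Java objects rather than Hadoop's Writable types, and the class name is illustrative. Masking with Integer.MAX_VALUE clears the sign bit, so keys with a negative hash code still map to a partition between 0 and R-1.

```java
// Stand-alone sketch of hash partitioning; the class name is illustrative.
final class HashPartitionSketch {
    // Returns a partition number in the range [0, numReduceTasks).
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] keys = {"apple", "banana", "cherry"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + getPartition(key, 10));
        }
    }
}
```

Because the partition depends only on the key, every record with the same key ends up at the same Reducer, whichever map task produced it.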

Within each partition, the data is sorted by key using QuickSort.
If a Combiner is configured, it is run on the sorted output.
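The effect of the sort and the optional combine step can be illustrated on a small in-memory list. This sketch makes several assumptions: records are modeled by a hypothetical Rec class with a String key and int value, the combiner simply sums values for equal keys, and List.sort (not QuickSort on the metadata array, as Hadoop does) performs the ordering.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative model of "sort by partition, then key, then combine".
final class SpillSortSketch {
    static final class Rec {
        final int partition;
        final String key;
        final int value;

        Rec(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    static List<Rec> sortAndCombine(List<Rec> buffered) {
        List<Rec> sorted = new ArrayList<>(buffered);
        // Order by partition first, then by key within each partition.
        sorted.sort(Comparator.comparingInt((Rec r) -> r.partition)
                              .thenComparing(r -> r.key));
        // Assumed combiner: sum the values of adjacent records with equal keys.
        List<Rec> out = new ArrayList<>();
        for (Rec r : sorted) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).partition == r.partition
                    && out.get(last).key.equals(r.key)) {
                Rec prev = out.get(last);
                out.set(last, new Rec(prev.partition, prev.key, prev.value + r.value));
            } else {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> in = new ArrayList<>();
        in.add(new Rec(1, "b", 1));
        in.add(new Rec(0, "a", 1));
        in.add(new Rec(1, "b", 2));
        for (Rec r : sortAndCombine(in)) {
            System.out.println(r.partition + " " + r.key + " " + r.value);
        }
    }
}
```

Sorting first is what makes the combine step cheap: equal keys are adjacent, so a single pass is enough.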
The sorted data is written to one of the directories configured in mapreduce.cluster.local.dir, chosen in round-robin fashion. Note that this is a local file system directory, not HDFS. Spill files have names like spill0.out, spill1.out, and so on.
Data for all partitions goes into the same file, and an index records where each partition starts and ends. Each index entry is a 3-tuple of start offset, raw data length, and compressed data length, corresponding to the IndexRecord class:
public class IndexRecord {
  public long startOffset;
  public long rawLength;
  public long partLength;

  public IndexRecord() { }

  public IndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}
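To see how these three fields let a reader locate one partition inside a shared spill file, consider the following sketch. It does not use Hadoop's IndexRecord; each entry is a plain long[] of {startOffset, rawLength, partLength}, and the helper names are hypothetical. It assumes partitions are laid out back to back in partition order, so each start offset is the sum of the on-disk lengths before it.

```java
// Illustrative index over a single spill file; not Hadoop's SpillRecord.
final class SpillIndexSketch {
    // Builds one row per partition: {startOffset, rawLength, partLength}.
    static long[][] buildIndex(long[] rawLengths, long[] partLengths) {
        long[][] index = new long[rawLengths.length][];
        long offset = 0;
        for (int p = 0; p < rawLengths.length; p++) {
            index[p] = new long[] {offset, rawLengths[p], partLengths[p]};
            offset += partLengths[p]; // the next partition starts where this one ends
        }
        return index;
    }

    // Byte range [start, end) occupied by one partition in the file.
    static long[] byteRange(long[][] index, int partition) {
        long start = index[partition][0];
        return new long[] {start, start + index[partition][2]};
    }

    public static void main(String[] args) {
        long[][] index = buildIndex(new long[] {100, 200, 50}, new long[] {40, 80, 20});
        long[] range = byteRange(index, 1);
        System.out.println("partition 1: bytes " + range[0] + " to " + range[1]);
    }
}
```

A reader that wants partition 1 only needs to seek to its start offset and read partLength bytes; rawLength tells it how much data to expect after decompression.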
Each mapper also has a corresponding in-memory index cache, 1 MB by default, configurable with mapreduce.task.index.cache.limit.bytes. If the index is small enough it stays in memory; if it does not fit, it has to be written to disk.
Spill index files have names like spill110.out.index and spill111.out.index.
The spill file indexes are in fact kept as org.apache.hadoop.mapred.SpillRecord objects, and each map task (the MapTask.java class in the source) maintains a list of them:
final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();
Creating a SpillRecord allocates a buffer of (Number_Of_Reducers * 24) bytes:
public SpillRecord(int numPartitions) {
  buf = ByteBuffer.allocate(
      numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
  entries = buf.asLongBuffer();
}
numPartitions is the number of partitions, which is the same as the number of Reducers:
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
// ---
partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);
ĬÈϵÄË÷Òý»º³åΪ1KB£¬¼´1024*1024 Bytes£¬¼ÙÉèÓÐ2¸öReducer£¬Ôòÿ¸öSpillÎļþµÄË÷Òý´óСΪ2*24=48
Bytes£¬µ±SpillÎļþ³¬¹ý21845.3ʱ£¬Ë÷ÒýÎļþ¾ÍÐèҪдÈë´ÅÅÌ¡£
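The same arithmetic, written out as code so it can be rerun for other reducer counts. The class and method names are illustrative; the only inputs are the 24-byte record length and the cache limit quoted above.

```java
// Illustrative arithmetic only; not part of Hadoop.
final class IndexCacheSketch {
    static final int INDEX_RECORD_LENGTH = 24; // bytes per partition entry, as in the text

    // Number of whole spill-file indexes that fit in the in-memory cache.
    static long spillIndexesThatFit(long cacheLimitBytes, int numReducers) {
        long bytesPerSpillIndex = (long) numReducers * INDEX_RECORD_LENGTH;
        return cacheLimitBytes / bytesPerSpillIndex;
    }

    public static void main(String[] args) {
        System.out.println(spillIndexesThatFit(1024 * 1024, 2));
        System.out.println(spillIndexesThatFit(1024 * 1024, 1000));
    }
}
```

The per-spill index grows linearly with the number of Reducers, so a job with many Reducers exhausts the cache after far fewer spills.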
The figure below sketches the index and spill files:

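At least one spill always happens, because the Mapper's output must be written to disk for the Reducers to process.
Merging spill files
Over the course of a map task, a spill is triggered every time the buffer reaches the configured threshold, each one writing a spill file to disk, so there may be several spill files by the end. Before the map task finishes, these files are merged into a single large, partitioned, sorted file; the merge builds a global ordering on top of the in-memory sorts already applied to each spill. The figure below gives a simple sketch of the merge: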

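The corresponding index files are merged as well, so that the data for a given partition can be read quickly when a Reducer requests it.
In addition, if the number of spill files is at least the value of mapreduce.map.combine.minspills, the combiner is run again before the merged file is written. With too few spill files, the benefit of running the combiner may not be worth its cost.
The mapreduce.task.io.sort.factor property sets the maximum number of files merged at once; the default is 10, meaning at most 10 spill files are merged in one pass. After however many rounds of merging are needed, all the output ends up in a single large file plus its index file (which may exist only in memory).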
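Merging several already-sorted spill files into one sorted output is a k-way merge. The sketch below shows the idea on in-memory lists of String keys with a priority queue; it is a generic illustration under those assumptions, not Hadoop's Merger class, and it ignores partitions, values, and disk I/O.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Generic k-way merge of sorted runs; illustrative only.
final class KWayMergeSketch {
    static List<String> merge(List<List<String>> runs) {
        // Each heap entry is {runIndex, positionInRun}, ordered by the key it points at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> runs.get(a[0]).get(a[1]).compareTo(runs.get(b[0]).get(b[1])));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<String> run = runs.get(top[0]);
            out.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) {
                heap.add(new int[] {top[0], top[1] + 1}); // advance within the same run
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> runs = Arrays.asList(
            Arrays.asList("a", "d"), Arrays.asList("b", "c"), Arrays.asList("e"));
        System.out.println(merge(runs));
    }
}
```

Only one element per run is held in the heap at a time, which is why a merge factor such as 10 bounds the number of files that must be open together.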
Compression
When the data volume is large, compressing map output is usually a good idea. To enable it, set mapreduce.map.output.compress to true and choose the compression algorithm with mapreduce.map.output.compress.codec.
Exposing the output over HTTP
Once the map output is complete, it is exposed through an HTTP server so that the reduce side can fetch it. The number of threads serving reducers' data requests is configurable; the default is twice the number of processor cores on the machine. To set it yourself, use the mapreduce.shuffle.max.threads property. Note that this setting applies to the NodeManager, not to individual jobs.
When a map task completes, it also notifies the Application Master so that Reducers can pull the data promptly.
After buffering, partitioning, sorting, combining, merging, and compression, the work on the map side is done:

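Reducer side
When each map task finishes, its output sits on the local disk of the machine that ran it. A Reducer has to fetch its own portion of the data (its partition) from every map task. Map tasks may finish at different times, and the reduce task starts fetching each map's output as soon as that map completes; this is the copy phase.
How does a Reducer know which machines to fetch data from? As soon as a map task completes, it notifies the application's Application Master through the regular heartbeat. A thread in the reduce task periodically asks the master for map output locations until it has fetched them all (how does it know when it has everything?).
After the reduce tasks have fetched the data, the map machine does not delete it right away, in case a reduce task fails and has to be rerun. Map output is therefore deleted only after the whole job has completed.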
The reduce task keeps several copier threads that fetch data from the map machines in parallel. There are 5 copier threads by default, configurable with mapreduce.reduce.shuffle.parallelcopies.
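The copier threads behave much like a fixed-size thread pool working through a list of fetch jobs. The sketch below models only that aspect with java.util.concurrent: the host names are placeholders, the "fetch" is a stub that returns a string instead of performing an HTTP request, and none of the class or method names come from Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative model of parallel copier threads; the fetch itself is a stub.
final class CopierSketch {
    static List<String> fetchAll(List<String> mapHosts, int parallelCopies) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelCopies);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String host : mapHosts) {
                // Stub standing in for "fetch this reducer's partition from host".
                pending.add(pool.submit(() -> "output-from-" + host));
            }
            List<String> fetched = new ArrayList<>();
            for (Future<String> f : pending) {
                fetched.add(f.get()); // wait for each fetch to finish
            }
            return fetched;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("fetch failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 5 matches the default number of copier threads quoted in the text.
        System.out.println(fetchAll(Arrays.asList("host1", "host2", "host3"), 5));
    }
}
```

At most parallelCopies fetches run at once; the rest wait in the pool's queue, which is the trade-off the property controls.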
If a map output is small enough, it is copied into the reduce task's JVM memory. mapreduce.reduce.shuffle.input.buffer.percent sets the fraction of the JVM heap that may be used to hold map outputs. If the data is too large to fit, it is copied to the reduce machine's disk instead.
In-memory merge
When the data in the buffer reaches a configured threshold, it is merged in memory and written to the machine's disk. The threshold can be set in 2 ways:
By memory fraction: as mentioned above, part of the reduce JVM heap is set aside for input from map tasks, and the merge is triggered at a configured fraction of that space. For example, if 500 MB is available for map output and mapreduce.reduce.shuffle.merge.percent is set to 0.80, a merge and write to disk is triggered when the in-memory data reaches 400 MB.
By number of map outputs: set with mapreduce.reduce.merge.inmem.threshold.
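The two triggers can be combined into a single predicate. The sketch below is an illustration of the rule as described here, not Hadoop's merge manager: the class, method, and parameter names are made up, and treating a count threshold of 0 or less as "no count trigger" is an assumption.

```java
// Illustrative predicate for the reduce-side in-memory merge trigger.
final class InMemMergeTriggerSketch {
    static boolean shouldMerge(long bytesInMemory, int outputsInMemory,
                               long shuffleBufferBytes, double mergePercent,
                               int inmemThreshold) {
        // Trigger 1: in-memory data reached the configured fraction of the buffer.
        long byteLimit = Math.round(shuffleBufferBytes * mergePercent);
        boolean bySize = bytesInMemory >= byteLimit;
        // Trigger 2: enough map outputs accumulated (assumed disabled when <= 0).
        boolean byCount = inmemThreshold > 0 && outputsInMemory >= inmemThreshold;
        return bySize || byCount;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // The example from the text: 500 MB buffer, fraction 0.80, 400 MB in memory.
        System.out.println(shouldMerge(400 * mb, 10, 500 * mb, 0.80, 1000));
    }
}
```

Whichever trigger fires first starts the merge, so a job with many small map outputs may hit the count threshold long before the memory fraction.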
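During the merge, the files being merged are sorted globally. If the job has a Combiner configured, the combine function is run to reduce the amount of data written to disk.
On-disk merge during the copy phase
As copied data accumulates on disk, a background thread merges these files into larger, sorted files. If the map output was compressed, it has to be decompressed in memory before it can be merged. This merge exists only to reduce the work left for the final merge: part of the merging starts while map outputs are still being copied. It produces globally sorted output in the same way.
Final on-disk merge
Once all map outputs have been copied, all the data is merged one last time into sorted order to serve as the input of the reduce task. This merge proceeds in rounds, and the result of the last round is fed directly to reduce as input, saving one round trip to disk. The map outputs that take part in this final merge (after all map outputs have been copied to the reducer) may come from merged files on disk or from the in-memory buffer: map outputs that arrived last may not have reached the threshold that triggers a merge, so they are still in memory.
The rounds do not necessarily merge equal numbers of files. The guiding principle is to minimize the amount of data written to disk over the whole merge. To achieve that, the final round should merge as much data as possible, because its output goes straight to reduce without being written to disk and read back. The final round therefore merges the maximum number of files, namely the merge factor, which is set with mapreduce.task.io.sort.factor.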
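Suppose there are 40 map output files and the merge factor is 10; 5 rounds of merging are needed. The final round is guaranteed to merge 10 files: the 4 results of the first 4 rounds plus 6 of the original 40 files that are held back for it. The first 4 rounds therefore merge the other 34 files, for example 4 files in the first round and 10 in each of the next three. The 5 rounds might look like this: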

The output of the first 4 rounds is written to disk. The last 2 cells are drawn in a different color to indicate that this data may come directly from memory.
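One way to arrive at such a plan is to shrink only the first round so that every later round, including the final one, merges exactly the merge factor. The sketch below implements that sizing rule. It is written from the principle stated above and should be read as an assumption about how such a planner could work, not as a copy of Hadoop's Merger; all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative merge-round planner; names and structure are assumptions.
final class MergePlanSketch {
    // Returns the number of inputs merged in each round; the last entry is the final round.
    static List<Integer> plan(int numFiles, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("merge factor must be at least 2");
        }
        List<Integer> rounds = new ArrayList<>();
        int remaining = numFiles;
        boolean firstRound = true;
        while (remaining > factor) {
            int size = factor;
            if (firstRound) {
                // Each round turns `size` inputs into 1, so it removes size - 1 inputs.
                // Pick the first round so the rest divides evenly into full rounds.
                int mod = (remaining - 1) % (factor - 1);
                size = (mod == 0) ? factor : mod + 1;
                firstRound = false;
            }
            rounds.add(size);
            remaining = remaining - size + 1;
        }
        rounds.add(remaining); // final round, fed directly to reduce
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println(plan(40, 10));
    }
}
```

For 40 files and a factor of 10 this yields rounds of 4, 10, 10, 10 and a final round of 10, matching the example: the final round combines the 4 intermediate results with the 6 files that were never rewritten to disk.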
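MemToMem merge
Besides the in-memory merge and the on-disk merge, Hadoop also defines a MemToMem merge, which merges map outputs held in memory and writes the result back to memory. It is off by default and can be enabled with reduce.merge.memtomem.enabled; it is triggered when the number of map outputs reaches reduce.merge.memtomem.threshold.
Passing the final merge to the reduce method
The merged data is passed to the Reducer as input, and the Reducer calls the reduce function for each key together with its sorted values. The reduce output is normally written to HDFS. The first replica of each output file is written to the machine running the reduce task, and the remaining replicas are placed according to the usual HDFS write rules.
With results fetched from the map machines, merged, combined, and handed to reduce for the final step, the whole process is essentially complete. The figure below gives one last overview: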

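Performance tuning
Tuning the shuffle to fit the workload can do a lot to improve MapReduce performance. The relevant parameters are listed in the tables below.
A general principle is to give the shuffle as much memory as possible, while making sure the map and reduce functions still have enough memory to run the business logic. This is why Mappers and Reducers should be written to use as little memory as possible, for example by avoiding unbounded accumulation of values in a Map.
The memory of the JVMs that run map and reduce tasks is set with the mapred.child.java.opts property and should be made as large as possible. The container memory sizes are set with mapreduce.map.memory.mb and mapreduce.reduce.memory.mb, both of which default to 1024 MB.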
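map tuning
On the map side, the best performance comes from avoiding multiple spill files; a single spill file is ideal. Estimate the size of the map output and set the mapreduce.task.io.sort.* properties accordingly to minimize the number of spill files, for example by making mapreduce.task.io.sort.mb as large as possible.
The map-side properties are listed in the table below: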

reduce tuning
On the reduce side, the best performance is achieved when all the intermediate data can be kept in memory. Normally, all memory is reserved for the reduce function. If the reduce function has modest memory needs, however, setting mapreduce.reduce.merge.inmem.threshold (the number of map outputs that triggers a merge) to 0 and mapreduce.reduce.input.buffer.percent (the fraction of the heap used to retain map outputs) to 1.0 can bring a substantial improvement. In the 2008 terabyte sort benchmark, Hadoop won by keeping the reducers' intermediate data in memory.
reduce-side properties:

General tuning
Hadoop uses a 4 KB buffer by default, which is quite small; the buffer size can be raised with io.file.buffer.size.