Partitioner in the Old API
The Partitioner's job is to partition the intermediate results produced by the Mappers so that records belonging to the same group are handed to the same Reducer; it therefore directly affects load balancing in the Reduce phase. The class diagram of the old-API Partitioner is shown in the figure. It extends JobConfigurable, so it can be initialized through the configure method, and it declares a single method to implement, getPartition. That method takes three parameters, all supplied by the framework: the first two are the key/value pair, and the third, numPartitions, is the number of partitions each Mapper produces, which equals the number of Reducers.
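To make the contract concrete, here is a minimal custom partitioner against this old interface. It is a sketch of ours, not part of Hadoop: a hypothetical FirstCharPartitioner that routes every record whose Text key starts with the same character to the same reducer.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Illustrative sketch only: a custom old-API partitioner that groups
// records by the first character of their Text key.
public class FirstCharPartitioner implements Partitioner<Text, IntWritable> {

  public void configure(JobConf job) {
    // Inherited from JobConfigurable; read job settings here if needed.
  }

  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Empty keys go to partition 0; otherwise dispatch on the first character.
    int firstChar = (key.getLength() == 0) ? 0 : key.charAt(0);
    // Clearing the sign bit keeps the modulo result non-negative.
    return (firstChar & Integer.MAX_VALUE) % numPartitions;
  }
}

It would be registered on the job with conf.setPartitionerClass(FirstCharPartitioner.class).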

MapReduce ships with two Partitioner implementations: HashPartitioner and TotalOrderPartitioner. HashPartitioner is the default; it partitions by hash value (the bitmask with Integer.MAX_VALUE clears the sign bit so the modulo result is never negative):

public int getPartition(K2 key, V2 value, int numReduceTasks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
TotalOrderPartitioner ÌṩÁËÒ»ÖÖ»ùÓÚÇø¼äµÄ·ÖƬ·½·¨£¬Í¨³£ÓÃÔÚÊý¾ÝÈ«ÅÅÐòÖС£ÔÚMapReduce
»·¾³ÖУ¬ÈÝÒ×Ïëµ½µÄÈ«ÅÅÐò·½°¸Êǹ鲢ÅÅÐò£¬¼´ÔÚ Map ½×¶Î£¬Ã¿¸ö Map Task½øÐоֲ¿ÅÅÐò£»ÔÚ Reduce
½×¶Î£¬Æô¶¯Ò»¸ö Reduce Task ½øÐÐÈ«¾ÖÅÅÐò¡£ÓÉÓÚ×÷ÒµÖ»ÄÜÓÐÒ»¸ö Reduce Task£¬Òò¶ø
Reduce ½×¶Î»á³ÉΪ×÷ÒµµÄÆ¿¾±¡£ÎªÁËÌá¸ßÈ«¾ÖÅÅÐòµÄÐÔÄܺÍÀ©Õ¹ÐÔ£¬MapReduce ÌṩÁË TotalOrderPartitioner¡£ËüÄܹ»°´ÕÕ´óС½«Êý¾Ý·Ö³ÉÈô¸É¸öÇø¼ä£¨·ÖƬ£©£¬²¢±£Ö¤ºóÒ»¸öÇø¼äµÄËùÓÐÊý¾Ý¾ù´óÓÚǰһ¸öÇø¼äÊý¾Ý£¬ÕâʹµÃÈ«ÅÅÐòµÄ²½ÖèÈçÏ£º
Step 1: sample the data. On the client side, sampling determines the split points between partitions. Hadoop ships with several sampling algorithms, such as IntervalSampler, RandomSampler, and SplitSampler (see the InputSampler class in the org.apache.hadoop.mapred.lib package).
Here is an example.
Sampled data: b, abc, abd, bcd, abcd, efg, hii, afd, rrr, mnk
After sorting: abc, abcd, abd, afd, b, bcd, efg, hii, mnk, rrr
With 4 Reduce Tasks, the quartile points of the sampled data are abd, bcd, and mnk, and these 3 strings become the split points.
Step 2: the Map phase. Two components are involved here, the Mapper and the Partitioner. The Mapper can simply be IdentityMapper, which passes its input straight through, but the Partitioner must be TotalOrderPartitioner. It stores the split points obtained in step 1 in a trie so that the range containing any given record can be located quickly. Each Map Task thus produces R (the number of Reduce Tasks) ranges, and the ranges are ordered relative to one another. TotalOrderPartitioner looks up the Reduce Task number for every record through the trie. As shown in the figure, the split points are stored in a trie of depth 2; given two input strings "abg" and "mnz", the string "abg" maps to partition1, i.e. the 2nd Reduce Task, and "mnz" maps to partition3, i.e. the 4th Reduce Task.

Step 3: the Reduce phase. Each Reducer sorts the data in its assigned range locally, and the concatenated output is totally sorted. As these steps show, the efficiency of a TotalOrderPartitioner-based total sort depends directly on the key distribution and on the sampling algorithm: the more uniform the keys and the more representative the sample, the better balanced the Reduce Tasks and the faster the sort. TotalOrderPartitioner has two classic applications: TeraSort and bulk loading into HBase. TeraSort is a sample application that ships with Hadoop; it once won first place in the terabyte-scale sort benchmark, and TotalOrderPartitioner was in fact extracted from it. HBase is a NoSQL data store built on top of Hadoop. It divides its data into Regions; the data within a Region is sorted by key, and the Regions themselves are ordered. Clearly, the R output files of a MapReduce total-sort job can map one-to-one onto R HBase Regions.
аæ API µÄ Partitioner ½âÎö
аæ API ÖеÄPartitionerÀàͼÈçͼËùʾ¡£Ëü²»ÔÙʵÏÖJobConfigurable
½Ó¿Ú¡£µ±Óû§ÐèÒªÈà Partitionerͨ¹ýij¸öJobConf ¶ÔÏó³õʼ»¯Ê±£¬¿É×ÔÐÐʵÏÖConfigurable
½Ó¿Ú£¬È磺
public class TotalOrderPartitioner<K, V> extends Partitioner<K,V> implements Configurable |
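A minimal sketch of that pattern, with illustrative names of our own (ConfiguredPrefixPartitioner and the property example.partitioner.prefix.length are not Hadoop's): the framework instantiates partitioners through ReflectionUtils, which calls setConf on any class implementing Configurable, so a value set on the job configuration becomes visible to the partitioner.

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch: a new-API partitioner that initializes itself from the job
// configuration by implementing Configurable.
public class ConfiguredPrefixPartitioner extends Partitioner<Text, IntWritable>
    implements Configurable {

  private Configuration conf;
  private int prefixLen = 1;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // Hypothetical property, used here only for illustration.
    this.prefixLen = conf.getInt("example.partitioner.prefix.length", 1);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Partition on the configured key prefix instead of the whole key.
    String s = key.toString();
    String prefix = s.length() > prefixLen ? s.substring(0, prefixLen) : s;
    return (prefix.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}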

Where the Partitioner Sits

The Partitioner's main job is to route each map output record to the appropriate reducer. That places two demands on it:
1) Load balancing: spread the work as evenly as possible across the reducers.
2) Efficiency: the routing decision must be fast.
MapreduceÌṩµÄPartitioner

Partitioner class structure
1. Partitioner<K2, V2> is the base interface; a custom partitioner must implement it. Its source is as follows:

package org.apache.hadoop.mapred;

/**
 * Partitions the key space.
 *
 * <p><code>Partitioner</code> controls the partitioning of the keys of the
 * intermediate map-outputs. The key (or a subset of the key) is used to derive
 * the partition, typically by a hash function. The total number of partitions
 * is the same as the number of reduce tasks for the job. Hence this controls
 * which of the <code>m</code> reduce tasks the intermediate key (and hence the
 * record) is sent for reduction.</p>
 *
 * @see Reducer
 * @deprecated Use {@link org.apache.hadoop.mapreduce.Partitioner} instead.
 */
@Deprecated
public interface Partitioner<K2, V2> extends JobConfigurable {
  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on all or a subset of the key.</p>
   *
   * @param key the key to be partitioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  int getPartition(K2 key, V2 value, int numPartitions);
}
2. HashPartitioner<K, V> is the default partitioner in MapReduce. Its source:

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
3. BinaryPartitioner extends Partitioner<BinaryComparable, V>; it is a specialization of Partitioner<K, V> for binary keys. It is configured with a leftOffset and a rightOffset, and when computing the target reducer it hashes only the bytes of the key K in the range [leftOffset, rightOffset]:

reducer = (hash & Integer.MAX_VALUE) % numReduceTasks
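For instance, the following sketch (assuming a Hadoop version that ships org.apache.hadoop.mapreduce.lib.partition.BinaryPartitioner with its static setOffsets helper) partitions BytesWritable keys on their first four bytes only, so records that agree on those bytes meet in the same reducer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.BinaryPartitioner;

public class BinaryPartitionerSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "binary-partitioner-sketch");
    job.setPartitionerClass(BinaryPartitioner.class);
    // Hash only bytes [0, 3] of each key; the offsets are stored in the
    // job configuration and read back by the partitioner at task time.
    BinaryPartitioner.setOffsets(job.getConfiguration(), 0, 3);
  }
}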
4. KeyFieldBasedPartitioner<K2, V2> is also a hash-based partitioner. Unlike BinaryPartitioner, it can hash over several key-field ranges; when no ranges are configured, it degenerates into HashPartitioner. A usage sketch follows the listing below. The source:
package org.apache.hadoop.mapred.lib;

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;

/**
 * Defines a way to partition keys based on certain key fields (also see
 * {@link KeyFieldBasedComparator}).
 * The key specification supported is of the form -k pos1[,pos2], where,
 * pos is of the form f[.c][opts], where f is the number
 * of the key field to use, and c is the number of the first character from
 * the beginning of the field. Fields and character posns are numbered
 * starting with 1; a character position of zero in pos2 indicates the
 * field's last character. If '.c' is omitted from pos1, it defaults to 1
 * (the beginning of the field); if omitted from pos2, it defaults to 0
 * (the end of the field).
 */
public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {

  private static final Log LOG = LogFactory.getLog(
      KeyFieldBasedPartitioner.class.getName());
  private int numOfPartitionFields;
  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();

  public void configure(JobConf job) {
    String keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
    if (job.get("num.key.fields.for.partition") != null) {
      LOG.warn("Using deprecated num.key.fields.for.partition. " +
          "Use mapred.text.key.partitioner.options instead");
      this.numOfPartitionFields = job.getInt("num.key.fields.for.partition", 0);
      keyFieldHelper.setKeyFieldSpec(1, numOfPartitionFields);
    } else {
      String option = job.getKeyFieldPartitionerOption();
      keyFieldHelper.parseOption(option);
    }
  }

  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    byte[] keyBytes;
    List<KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
    if (allKeySpecs.size() == 0) {
      return getPartition(key.toString().hashCode(), numReduceTasks);
    }
    try {
      keyBytes = key.toString().getBytes("UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("The current system does not " +
          "support UTF-8 encoding!", e);
    }
    // return 0 if the key is empty
    if (keyBytes.length == 0) {
      return 0;
    }
    int[] lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
        keyBytes.length);
    int currentHash = 0;
    for (KeyDescription keySpec : allKeySpecs) {
      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0,
          keyBytes.length, lengthIndicesFirst, keySpec);
      // no key found! continue
      if (startChar < 0) {
        continue;
      }
      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
          lengthIndicesFirst, keySpec);
      currentHash = hashCode(keyBytes, startChar, endChar, currentHash);
    }
    return getPartition(currentHash, numReduceTasks);
  }

  protected int hashCode(byte[] b, int start, int end, int currentHash) {
    for (int i = start; i <= end; i++) {
      currentHash = 31 * currentHash + b[i];
    }
    return currentHash;
  }

  protected int getPartition(int hash, int numReduceTasks) {
    return (hash & Integer.MAX_VALUE) % numReduceTasks;
  }
}
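As the promised usage sketch (the key layout here is our assumption): with map output keys of the form dept<TAB>name<TAB>salary, partitioning on the first field alone sends every record of a department to the same reducer.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;

public class KeyFieldPartitionerSetup {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setPartitionerClass(KeyFieldBasedPartitioner.class);
    // Fields are split on this separator, as read back in configure() above.
    conf.set("map.output.key.field.separator", "\t");
    // The same -k syntax that KeyFieldHelper parses: use field 1 only.
    conf.setKeyFieldPartitionerOptions("-k1,1");
  }
}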
5. TotalOrderPartitioner makes the job's overall output totally ordered. Unlike the three partitioners above, it is not hash-based. It is described in detail below.
The TotalOrderPartitioner class
Each reducer's output is sorted by default, but with arbitrary input the outputs of different reducers are unordered relative to one another. TotalOrderPartitioner is what turns sorted-per-reducer output into a total order.
To use TotalOrderPartitioner you must supply it with a partition file. The file must contain exactly one fewer key than there are reducers — these keys are the split points — arranged in ascending order. Why such a file is needed, and what exactly goes into it, is discussed further below.
TotalOrderPartitioner handles the two classes of key types differently:
1) For keys that are not BinaryComparable, TotalOrderPartitioner uses binary search to find the index of the range containing the key.
For example, with 5 reducers and the 4 split points [2, 4, 6, 8] from the partition file: for the pair <4, "good">, binary search finds index = 1, and index + 1 = 2, so the pair is sent to reducer 2. For the pair <4.5, "good">, binary search returns -3; adding 1 and then negating gives 2, the reducer this pair goes to. A sketch of this computation follows.
For numeric keys, binary search costs O(log(number of reducers)), which is fast.
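The arithmetic above is easy to mirror in a few lines. The class below is a self-contained sketch of ours (not Hadoop's code) of the binary-search lookup for the [2, 4, 6, 8] example:

import java.util.Arrays;

// Sketch of the numeric lookup: split points [2, 4, 6, 8] imply 5 partitions.
class NumericSplitLookup {
  static final int[] SPLIT_POINTS = {2, 4, 6, 8};

  static int partitionFor(int key) {
    // Arrays.binarySearch returns the index when the key is found, and
    // -(insertionPoint) - 1 otherwise; adding 1 and negating the negative
    // case collapses both into the target partition number.
    int pos = Arrays.binarySearch(SPLIT_POINTS, key) + 1;
    return (pos < 0) ? -pos : pos;
  }

  public static void main(String[] args) {
    System.out.println(partitionFor(4)); // 2: found at index 1, so 1 + 1
    System.out.println(partitionFor(5)); // 2: binarySearch returned -3
    System.out.println(partitionFor(1)); // 0: smaller than every split point
  }
}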
2) For BinaryComparable keys (think of them as strings), lexicographic order is still a total order, so split points can likewise be given to spread string keys across the reducers, much as in the numeric case.
For example, with 5 reducers and the 4 split points ["abc", "bce", "eaa", "fhc"], the string "ab" is assigned to the first reducer because it is smaller than the first split point "abc".
Unlike numeric keys, however, string lookup and comparison cannot use the numeric method. MapReduce instead looks strings up in a trie (see any reference on trie trees). The lookup takes O(m) time, where m is the depth of the tree, at a space cost of O(255^m - 1) — a classic trade of space for time.
Building the trie
Suppose the maximum tree depth is 3 and the split points are [aaad, aaaf, aaaeh, abbx].

The trie in MapReduce is built from two kinds of nodes:
1) InnerTrieNode
An InnerTrieNode in MapReduce fans out over 255 characters, a fairly wide node; the figure above shows only the 26 English letters.
2) Leaf nodes: {UnsplitTrieNode, SingleSplitTrieNode, LeafTrieNode}
An UnsplitTrieNode is a leaf containing no split point.
A SingleSplitTrieNode is a leaf containing exactly one split point.
A LeafTrieNode is a leaf containing several split points. (This is rare; it only occurs when the maximum tree depth is reached, and is uncommon in practice.)
Searching the trie
Continuing the example above (a code sketch follows this list):
1) For the key/value pair <aad, 10>, the search reaches the LeafTrieNode in the figure and continues inside it with binary search, returning aad's index in the split-point array; if the key is not present, the index of the nearest split point is returned.
2) If the search ends at a SingleSplitTrieNode, the node's index is returned when the key is equal to or smaller than its split point, and index + 1 when the key is greater.
3) If the search ends at an UnsplitTrieNode, the index determined by the split points preceding it is returned. For instance, <zaa, 20> falls after abbx and is assigned the partition following abbx's index in the split-point array.
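To make the node machinery concrete, here is a self-contained sketch of ours, not Hadoop's actual implementation: a depth-limited trie over sorted byte[] split points whose inner nodes fan out on the next key byte and whose leaves fall back to binary search over the split points they cover. It follows the index + 1 convention of the numeric example above, so keys equal to a split point go to the next partition.

/**
 * Sketch only: a depth-limited trie over sorted byte[] split points.
 * Unsplit, single-split, and multi-split leaves all reduce here to a
 * (possibly trivial) binary search over the range [lo, hi) they cover.
 */
class TriePartitionerSketch {
  interface Node { int findPartition(byte[] key); }

  private final byte[][] splits; // sorted split points, length = R - 1
  private final int maxDepth;
  private final Node root;

  TriePartitionerSketch(byte[][] sortedSplits, int maxDepth) {
    this.splits = sortedSplits;
    this.maxDepth = maxDepth;
    this.root = build(0, splits.length, 0);
  }

  int getPartition(byte[] key) { return root.findPartition(key); }

  /** Unsigned lexicographic comparison of key against splits[i]. */
  private int cmp(byte[] key, int i) {
    byte[] s = splits[i];
    int n = Math.min(key.length, s.length);
    for (int j = 0; j < n; j++) {
      int d = (key[j] & 0xff) - (s[j] & 0xff);
      if (d != 0) return d;
    }
    return key.length - s.length;
  }

  /** First index in [lo, hi) whose split point is greater than key. */
  private int search(byte[] key, int lo, int hi) {
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (cmp(key, mid) < 0) hi = mid; else lo = mid + 1;
    }
    return lo;
  }

  /** Build a node covering split points [lo, hi) at byte position depth. */
  private Node build(int lo, int hi, int depth) {
    if (hi - lo < 2 || depth >= maxDepth) {
      // Leaf: binary search decides the partition within this range.
      return key -> search(key, lo, hi);
    }
    // Inner node. Split points that end exactly at this depth sort first.
    int p = lo;
    while (p < hi && splits[p].length <= depth) p++;
    final int first = p;
    Node[] children = new Node[256];
    int cur = first;
    for (int b = 0; b < 256; b++) {
      int end = cur;
      while (end < hi && (splits[end][depth] & 0xff) == b) end++;
      children[b] = build(cur, end, depth + 1);
      cur = end;
    }
    return key -> (key.length > depth)
        ? children[key[depth] & 0xff].findPartition(key)
        // Key exhausted here: it can only tie with the prefix-length splits.
        : search(key, lo, first);
  }
}

With the split points {"aaad", "aaaeh", "aaaf", "abbx"} (the example above, in sorted order) and maxDepth = 2, a key such as "aad" descends a, then a, into a leaf where binary search yields partition 3, while "zaa" hits an empty child and gets the last partition, 4.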
An open question about TotalOrderPartitioner
We said above that a partitioner has two obligations: speed and load balance. The trie takes care of lookup speed, but how do we come up with a partition file whose split points actually balance the load?
InputSampler
The input sampling class; it can sample the data under the input directories and provides three sampling methods.

Sampler class structure diagram
Comparison of the sampling methods:

The writePartitionFile method is the key: given the sample supplied by a sampler, it first sorts the sample and then selects one fewer key than there are reducers to write into the partition file. Split points derived from a representative sample put approximately the same number of key/value pairs into every range, which is what achieves the load balancing. A simplified sketch of this selection follows.
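A simplified sketch of that selection logic (the names are illustrative; the real method additionally skips duplicate keys and writes the chosen ones into a SequenceFile):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch of the heart of writePartitionFile: sort the sampled keys, then
// keep numPartitions - 1 evenly spaced ones as split points.
class PartitionFileSketch {
  static <K> List<K> pickSplitPoints(K[] samples, int numPartitions,
                                     Comparator<K> comparator) {
    Arrays.sort(samples, comparator);
    float stepSize = samples.length / (float) numPartitions;
    List<K> splitPoints = new ArrayList<K>(numPartitions - 1);
    for (int i = 1; i < numPartitions; ++i) {
      splitPoints.add(samples[Math.round(stepSize * i)]);
    }
    return splitPoints;
  }
}

Applied to the ten-string sample above with numPartitions = 4, stepSize is 2.5 and the keys at indices 3, 5, and 8 of the sorted array are kept — close to, though not exactly, the quartile points picked by hand earlier.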
The source of the SplitSampler class:
/**
 * Samples the first n records from s splits.
 * Inexpensive way to sample random data.
 */
public static class SplitSampler<K,V> implements Sampler<K,V> {

  private final int numSamples;
  private final int maxSplitsSampled;

  /**
   * Create a SplitSampler sampling all splits.
   * Takes the first numSamples / numSplits records from each split.
   * @param numSamples Total number of samples to obtain from all selected
   *                   splits.
   */
  public SplitSampler(int numSamples) {
    this(numSamples, Integer.MAX_VALUE);
  }

  /**
   * Create a new SplitSampler.
   * @param numSamples Total number of samples to obtain from all selected
   *                   splits.
   * @param maxSplitsSampled The maximum number of splits to examine.
   */
  public SplitSampler(int numSamples, int maxSplitsSampled) {
    this.numSamples = numSamples;
    this.maxSplitsSampled = maxSplitsSampled;
  }

  /**
   * From each split sampled, take the first numSamples / numSplits records.
   */
  @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
  public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
    InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
    ArrayList<K> samples = new ArrayList<K>(numSamples);
    int splitsToSample = Math.min(maxSplitsSampled, splits.length);
    int splitStep = splits.length / splitsToSample;
    int samplesPerSplit = numSamples / splitsToSample;
    long records = 0;
    for (int i = 0; i < splitsToSample; ++i) {
      RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
          job, Reporter.NULL);
      K key = reader.createKey();
      V value = reader.createValue();
      while (reader.next(key, value)) {
        samples.add(key);
        key = reader.createKey();
        ++records;
        if ((i+1) * samplesPerSplit <= records) {
          break;
        }
      }
      reader.close();
    }
    return (K[])samples.toArray();
  }
}
The source of the RandomSampler class:
/**
 * Sample from random points in the input.
 * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
 * each split.
 */
public static class RandomSampler<K,V> implements Sampler<K,V> {

  private double freq;
  private final int numSamples;
  private final int maxSplitsSampled;

  /**
   * Create a new RandomSampler sampling all splits.
   * This will read every split at the client, which is very expensive.
   * @param freq Probability with which a key will be chosen.
   * @param numSamples Total number of samples to obtain from all selected
   *                   splits.
   */
  public RandomSampler(double freq, int numSamples) {
    this(freq, numSamples, Integer.MAX_VALUE);
  }

  /**
   * Create a new RandomSampler.
   * @param freq Probability with which a key will be chosen.
   * @param numSamples Total number of samples to obtain from all selected
   *                   splits.
   * @param maxSplitsSampled The maximum number of splits to examine.
   */
  public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
    this.freq = freq;
    this.numSamples = numSamples;
    this.maxSplitsSampled = maxSplitsSampled;
  }

  /**
   * Randomize the split order, then take the specified number of keys from
   * each split sampled, where each key is selected with the specified
   * probability and possibly replaced by a subsequently selected key when
   * the quota of keys from that split is satisfied.
   */
  @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
  public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
    InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
    ArrayList<K> samples = new ArrayList<K>(numSamples);
    int splitsToSample = Math.min(maxSplitsSampled, splits.length);

    Random r = new Random();
    long seed = r.nextLong();
    r.setSeed(seed);
    LOG.debug("seed: " + seed);
    // shuffle splits
    for (int i = 0; i < splits.length; ++i) {
      InputSplit tmp = splits[i];
      int j = r.nextInt(splits.length);
      splits[i] = splits[j];
      splits[j] = tmp;
    }
    // our target rate is in terms of the maximum number of sample splits,
    // but we accept the possibility of sampling additional splits to hit
    // the target sample keyset
    for (int i = 0; i < splitsToSample ||
                    (i < splits.length && samples.size() < numSamples); ++i) {
      RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
          Reporter.NULL);
      K key = reader.createKey();
      V value = reader.createValue();
      while (reader.next(key, value)) {
        if (r.nextDouble() <= freq) {
          if (samples.size() < numSamples) {
            samples.add(key);
          } else {
            // When exceeding the maximum number of samples, replace a
            // random element with this one, then adjust the frequency
            // to reflect the possibility of existing elements being
            // pushed out
            int ind = r.nextInt(numSamples);
            if (ind != numSamples) {
              samples.set(ind, key);
            }
            freq *= (numSamples - 1) / (double) numSamples;
          }
          key = reader.createKey();
        }
      }
      reader.close();
    }
    return (K[])samples.toArray();
  }
}
The source of the IntervalSampler class:
/**
 * Sample from s splits at regular intervals.
 * Useful for sorted data.
 */
public static class IntervalSampler<K,V> implements Sampler<K,V> {

  private final double freq;
  private final int maxSplitsSampled;

  /**
   * Create a new IntervalSampler sampling all splits.
   * @param freq The frequency with which records will be emitted.
   */
  public IntervalSampler(double freq) {
    this(freq, Integer.MAX_VALUE);
  }

  /**
   * Create a new IntervalSampler.
   * @param freq The frequency with which records will be emitted.
   * @param maxSplitsSampled The maximum number of splits to examine.
   * @see #getSample
   */
  public IntervalSampler(double freq, int maxSplitsSampled) {
    this.freq = freq;
    this.maxSplitsSampled = maxSplitsSampled;
  }

  /**
   * For each split sampled, emit when the ratio of the number of records
   * retained to the total record count is less than the specified
   * frequency.
   */
  @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
  public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
    InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
    ArrayList<K> samples = new ArrayList<K>();
    int splitsToSample = Math.min(maxSplitsSampled, splits.length);
    int splitStep = splits.length / splitsToSample;
    long records = 0;
    long kept = 0;
    for (int i = 0; i < splitsToSample; ++i) {
      RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
          job, Reporter.NULL);
      K key = reader.createKey();
      V value = reader.createValue();
      while (reader.next(key, value)) {
        ++records;
        if ((double) kept / records < freq) {
          ++kept;
          samples.add(key);
          key = reader.createKey();
        }
      }
      reader.close();
    }
    return (K[])samples.toArray();
  }
}
TotalOrderPartitionerʵÀý
// Imports assumed for this listing; JobBuilder is a helper class from the
// example code this listing accompanies, not part of Hadoop itself.
import java.net.URI;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
    implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (conf == null) {
      return -1;
    }
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(conf, true);
    SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(conf,
        CompressionType.BLOCK);
    conf.setPartitionerClass(TotalOrderPartitioner.class);
    InputSampler.Sampler<IntWritable, Text> sampler =
        new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
    Path input = FileInputFormat.getInputPaths(conf)[0];
    input = input.makeQualified(input.getFileSystem(conf));
    Path partitionFile = new Path(input, "_partitions");
    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
    InputSampler.writePartitionFile(conf, sampler);
    // Share the partition file with every task via the DistributedCache
    URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
    DistributedCache.addCacheFile(partitionUri, conf);
    DistributedCache.createSymlink(conf);
    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(
        new SortByTemperatureUsingTotalOrderPartitioner(), args);
    System.exit(exitCode);
  }
}