In-Depth Analysis of Partition in Hadoop
 
Author: codingwu   Source: 博客园 (cnblogs)   Published: 2015-04-28
 

Partitioner in the Old API

The Partitioner's job is to split the intermediate results produced by the Mappers so that records in the same group are handed to the same Reducer; it directly affects load balancing in the Reduce phase. The class diagram of the old-API Partitioner is shown in the figure. It extends JobConfigurable and can be initialized through the configure method. It declares a single method to implement, getPartition, which takes three parameters, all passed in by the framework: the first two are the key/value pair, and the third, numPartitions, is the number of partitions each Mapper produces, i.e., the number of Reducers.
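For concreteness, here is a minimal sketch (not from Hadoop's codebase) of a custom old-API partitioner that routes records by the first field of a Text key; the property name first.field.separator is invented for illustration:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class FirstFieldPartitioner implements Partitioner<Text, IntWritable> {
  private String separator;

  // configure() is inherited from JobConfigurable; the framework calls it once.
  public void configure(JobConf job) {
    separator = job.get("first.field.separator", "\t"); // illustrative property
  }

  // All records whose keys share the same first field go to the same reducer.
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String firstField = key.toString().split(separator, 2)[0];
    return (firstField.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}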

MapReduce ships with two Partitioner implementations: HashPartitioner and TotalOrderPartitioner. HashPartitioner is the default; it implements hash-based partitioning, with the following code:

public int getPartition(K2 key, V2 value, int numReduceTasks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
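The bitwise AND with Integer.MAX_VALUE clears the sign bit: hashCode may be negative, and Java's % keeps the sign of the dividend, so without the mask the partition number itself could come out negative. A quick self-contained check:

public class HashMaskDemo {
  public static void main(String[] args) {
    int hash = -7; // stands in for a key whose hashCode happens to be negative
    System.out.println(hash % 4);                       // -3: not a valid partition
    System.out.println((hash & Integer.MAX_VALUE) % 4); // 1: always in [0, 4)
  }
}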

TotalOrderPartitioner provides range-based partitioning and is typically used for total sorting of data. In a MapReduce setting, the obvious total-sort scheme is merge sort: in the Map phase each Map Task sorts locally, and in the Reduce phase a single Reduce Task performs the global sort. Because such a job can have only one Reduce Task, the Reduce phase becomes the bottleneck. To improve the performance and scalability of total sorting, MapReduce provides TotalOrderPartitioner. It divides the data by magnitude into several ranges (partitions) and guarantees that every record in one range is larger than all records in the previous range, which reduces total sorting to the following steps:

Step 1: data sampling. On the client side, sampling determines the partition split points. Hadoop ships with several samplers, such as IntervalSampler, RandomSampler, and SplitSampler (see the InputSampler class in the org.apache.hadoop.mapred.lib package). An example follows.

Sampled data: b, abc, abd, bcd, abcd, efg, hii, afd, rrr, mnk

After sorting: abc, abcd, abd, afd, b, bcd, efg, hii, mnk, rrr

With 4 Reduce Tasks, the quartile points of the sampled data are abd, bcd, and mnk, and these 3 strings become the split points.

Step 2: the Map phase. Two components are involved here: the Mapper and the Partitioner. The Mapper can be an IdentityMapper, which simply passes its input through, but the Partitioner must be TotalOrderPartitioner. It stores the split points obtained in step 1 in a trie so that the range containing any given record can be located quickly; each Map Task thus produces R (the number of Reduce Tasks) ranges, and the ranges are mutually ordered. TotalOrderPartitioner looks up each record's Reduce Task number through the trie. As shown in the figure, the split points are stored in a trie of depth 2; if the input contains the two strings "abg" and "mnz", then "abg" falls into partition1, i.e., the 2nd Reduce Task, and "mnz" falls into partition3, i.e., the 4th Reduce Task.

Step 3: the Reduce phase. Each Reducer sorts the data in its assigned range locally, and the combined result is totally sorted. As these steps show, the efficiency of a TotalOrderPartitioner-based total sort depends directly on the key distribution and on the sampling algorithm: the more uniform the key distribution and the more representative the sample, the better balanced the Reduce Tasks and the more efficient the total sort. TotalOrderPartitioner has two classic applications: TeraSort and HBase bulk loading. TeraSort is an example application that ships with Hadoop; it once won first place in the terabyte-scale sort benchmark, and TotalOrderPartitioner was distilled from it. HBase is a NoSQL database built on top of Hadoop. It divides data by Region; data within a Region is ordered (sorted by key), and Regions are ordered with respect to one another. Clearly, the R output files of a MapReduce total-sort job map exactly onto R HBase Regions.

аæ API µÄ Partitioner ½âÎö

The class diagram of the new-API Partitioner is shown in the figure. It no longer implements the JobConfigurable interface. When a Partitioner must be initialized from a JobConf object, you can implement the Configurable interface yourself, for example:

public class TotalOrderPartitioner<K, V> extends Partitioner<K,V> implements Configurable
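As a minimal sketch of that pattern (the class and the property name partition.key.case.insensitive are invented for illustration): the framework calls setConf when it instantiates a class implementing Configurable, so the partitioner can pick up its settings there.

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CaseAwarePartitioner extends Partitioner<Text, IntWritable>
    implements Configurable {

  private Configuration conf;
  private boolean caseInsensitive;

  @Override
  public void setConf(Configuration conf) { // called by the framework on instantiation
    this.conf = conf;
    this.caseInsensitive = conf.getBoolean("partition.key.case.insensitive", false);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String k = caseInsensitive ? key.toString().toLowerCase() : key.toString();
    return (k.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}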

Where the Partition Step Sits

The partitioner's main job is to route each map output record to the appropriate reducer, which places two requirements on it:

1) Load balancing: distribute the work as evenly as possible across the reducers.

2) Efficiency: the assignment must be fast.

Partitioners Provided by MapReduce

Partitioner class structure

1. Partitioner<K2, V2> is the base type for partitioners; a custom partitioner must extend (or, in the old API, implement) it. Source code:

package org.apache.hadoop.mapred;
/**
* Partitions the key space.
*
* <p><code>Partitioner</code> controls the partitioning of the keys of the
* intermediate map-outputs. The key (or a subset of the key) is used to derive
* the partition, typically by a hash function. The total number of partitions
* is the same as the number of reduce tasks for the job. Hence this controls
* which of the <code>m</code> reduce tasks the intermediate key (and hence the
* record) is sent for reduction.</p>
*
* @see Reducer
* @deprecated Use {@link org.apache.hadoop.mapreduce.Partitioner} instead.
*/
@Deprecated
public interface Partitioner<K2, V2> extends JobConfigurable {

/**
* Get the partition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*
* <p>Typically a hash function on all or a subset of the key.</p>
*
* @param key the key to be partitioned.
* @param value the entry value.
* @param numPartitions the total number of partitions.
* @return the partition number for the <code>key</code>.
*/
int getPartition(K2 key, V2 value, int numPartitions);
}

2. HashPartitioner<K, V> is the default partitioner in MapReduce. Source code:

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

3. BinaryPartitioner extends Partitioner<BinaryComparable, V> and is a specialization of Partitioner<K, V>. It provides leftOffset and rightOffset, and when computing the target reducer it hashes only the bytes of the key within the range [leftOffset, rightOffset]:

reducer = (hash & Integer.MAX_VALUE) % numReduceTasks
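To make the formula concrete, here is a standalone sketch (not Hadoop's source) that hashes only the key bytes between leftOffset and rightOffset, inclusive, using the 31-based rolling hash Hadoop applies to byte arrays. In the real class the offsets come from the job configuration; here they are plain parameters to keep the sketch self-contained:

public class ByteRangeHashSketch {
  static int getPartition(byte[] key, int leftOffset, int rightOffset,
                          int numReduceTasks) {
    int hash = 1;
    for (int i = leftOffset; i <= rightOffset; i++) {
      hash = 31 * hash + key[i]; // hash only the selected byte range
    }
    return (hash & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    byte[] key = "2015-04-28:user42".getBytes();
    // Partition on the date prefix only (bytes 0..9), ignoring the user suffix.
    System.out.println(getPartition(key, 0, 9, 4));
  }
}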

4. KeyFieldBasedPartitioner<K2, V2> is another hash-based partitioner. Unlike BinaryPartitioner, it supports several key-field ranges over which the hash is computed. When the number of ranges is 0, KeyFieldBasedPartitioner degenerates into HashPartitioner. Source code:

package org.apache.hadoop.mapred.lib;

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;

 /**   
  *  Defines a way to partition keys based on certain key fields (also see
  *  {@link KeyFieldBasedComparator}.
  *  The key specification supported is of the form -k pos1[,pos2], where,
  *  pos is of the form f[.c][opts], where f is the number
  *  of the key field to use, and c is the number of the first character from
  *  the beginning of the field. Fields and character posns are numbered 
  *  starting with 1; a character position of zero in pos2 indicates the
  *  field's last character. If '.c' is omitted from pos1, it defaults to 1
  *  (the beginning of the field); if omitted from pos2, it defaults to 0 
  *  (the end of the field).
  * 
  */
public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {

  private static final Log LOG = LogFactory.getLog(KeyFieldBasedPartitioner.class.getName());
  private int numOfPartitionFields;
  
  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();

  public void configure(JobConf job) {
    String keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
    if (job.get("num.key.fields.for.partition") != null) {
      LOG.warn("Using deprecated num.key.fields.for.partition. " +
              "Use mapred.text.key.partitioner.options instead");
      this.numOfPartitionFields = job.getInt("num.key.fields.for.partition",0);
      keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
    } else {
      String option = job.getKeyFieldPartitionerOption();
      keyFieldHelper.parseOption(option);
    }
  }

  public int getPartition(K2 key, V2 value,
      int numReduceTasks) {
    byte[] keyBytes;

    List<KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
    if (allKeySpecs.size() == 0) {
      return getPartition(key.toString().hashCode(), numReduceTasks);
    }

    try {
      keyBytes = key.toString().getBytes("UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("The current system does not " +
          "support UTF-8 encoding!", e);
    }
    // return 0 if the key is empty
    if (keyBytes.length == 0) {
      return 0;
    }
    
    int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 
        keyBytes.length);
    int currentHash = 0;
    for (KeyDescription keySpec : allKeySpecs) {
      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, keyBytes.length, 
          lengthIndicesFirst, keySpec);
       // no key found! continue
      if (startChar < 0) {
        continue;
      }
      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 
          lengthIndicesFirst, keySpec);
      currentHash = hashCode(keyBytes, startChar, endChar, 
          currentHash);
    }
    return getPartition(currentHash, numReduceTasks);
  }
  
  protected int hashCode(byte[] b, int start, int end, int currentHash) {
    for (int i = start; i <= end; i++) {
      currentHash = 31*currentHash + b[i];
    }
    return currentHash;
  }

  protected int getPartition(int hash, int numReduceTasks) {
    return (hash & Integer.MAX_VALUE) % numReduceTasks;
  }
}
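A brief usage sketch (assuming JobConf.setKeyFieldPartitionerOptions, the setter paired with the getKeyFieldPartitionerOption call in the source above): partition on the first two tab-separated fields of the key, so records sharing those fields meet at the same reducer.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;

public class KeyFieldPartitionerSetup {
  public static void configure(JobConf conf) {
    conf.setPartitionerClass(KeyFieldBasedPartitioner.class);
    conf.setKeyFieldPartitionerOptions("-k1,2");      // hash key fields 1 through 2
    conf.set("map.output.key.field.separator", "\t"); // field separator (tab is the default)
  }
}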

5. TotalOrderPartitioner makes the job's output totally ordered. Unlike the three partitioners above, this class is not hash-based. The rest of this article describes TotalOrderPartitioner in detail.

The TotalOrderPartitioner Class

Each reducer's output is sorted by default, but when the input to the reducers is unordered there is no ordering across reducers. To obtain totally ordered output, use TotalOrderPartitioner.

To use TotalOrderPartitioner you must give it a partition file. The keys in this file (these keys are the split points) must number one fewer than the reducers and be sorted in ascending order. Why such a file is needed, and its details, are covered later.

TotalOrderPartitioner offers two schemes, depending on the key's data type:

1) For keys that are not BinaryComparable, TotalOrderPartitioner uses binary search to find the index of the range that contains the current key K.

For example, with 5 reducers, the partition file supplies the 4 split points [2, 4, 6, 8]. For the key/value pair <4, "good">, binary search finds index = 1, and index + 1 = 2, so the pair is sent to the second reducer. For the pair <4.5, "good">, binary search returns -3; add 1 to -3 and negate the result, and that is the reducer this pair will go to.

For numeric data, binary search costs O(log(reducer count)), which is fast.
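The index arithmetic above maps directly onto java.util.Arrays.binarySearch, which returns -(insertionPoint) - 1 when the key is absent. A small sketch with the [2, 4, 6, 8] split points (using the integer key 5 in place of 4.5):

import java.util.Arrays;

public class BinarySearchPartitionSketch {
  // Partition = number of split points <= key, found by binary search.
  static int getPartition(Integer[] splitPoints, Integer key) {
    int pos = Arrays.binarySearch(splitPoints, key) + 1;
    // found at index i        -> pos = i + 1 (a key equal to a split point goes one past it)
    // absent, insertion pt. p -> binarySearch returned -p - 1, so pos = -p; negate it
    return (pos < 0) ? -pos : pos;
  }

  public static void main(String[] args) {
    Integer[] splits = {2, 4, 6, 8};             // 5 reducers -> 4 split points
    System.out.println(getPartition(splits, 4)); // 2: found at index 1, so 1 + 1
    System.out.println(getPartition(splits, 5)); // 2: insertion point is 2
    System.out.println(getPartition(splits, 1)); // 0: smaller than every split point
  }
}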

2) For BinaryComparable keys (think of them simply as strings): strings can be sorted too, in lexicographic order.

So split points can likewise be supplied to route different string keys to different reducers. The handling is close to the numeric case.

For example, with 5 reducers, the partition file supplies the 4 split points ["abc", "bce", "eaa", "fhc"]. The string "ab" is then assigned to the first reducer, because it is smaller than the first split point "abc".

Unlike numeric data, however, strings cannot be searched and compared the way numbers are. MapReduce uses a trie for string lookup (for background, see 《字典树(Trie Tree)》). Lookup takes O(m) time, where m is the depth of the tree, at a space cost of O(255^(m-1)): a classic space-for-time trade-off.

Building the Trie

Suppose the maximum tree depth is 3 and the split points are [aaad, aaaf, aaaeh, abbx].

The trie in MapReduce is made up of two main kinds of nodes:

1) InnerTrieNode

An InnerTrieNode in MapReduce fans out over the 255 possible byte values, making it a fairly wide node; the example in the figure shows only the 26 English letters.

2) Leaf nodes: {UnsplitTrieNode, SinglySplitTrieNode, LeafTrieNode}

An UnsplitTrieNode is a leaf that contains no split point.

A SinglySplitTrieNode is a leaf that contains exactly one split point.

LeafnodeÊǰüº¬Á˶à¸ö»®·ÖµãµÄÒ¶×ӽڵ㡣£¨ÕâÖÖÇé¿ö±È½ÏÉÙ¼û£¬´ïµ½Ê÷µÄ×î´óÉî¶È²Å³öÏÖÕâÖÖÇé¿ö¡£ÔÚʵ¼Ê²Ù×÷¹ý³ÌÖбȽÏÉÙ¼û£©

Searching the Trie

Continuing the example above (a runnable sketch of this lookup follows the three cases):

1£©¼ÙÈ統ǰ key value pair <aad, 10="">Õâʱ»áÕÒµ½Í¼ÖеÄleafnode£¬ÔÚleafnodeÄÚ²¿Ê¹Óöþ·Ö·¨¼ÌÐø²éÕÒÕÒµ½·µ»Ø aadÔÚ»®·ÖÊý×éÖеÄË÷Òý¡£ÕÒ²»µ½»á·µ»ØÒ»¸öºÍËü×î½Ó½üµÄ»®·ÖµÄË÷Òý¡£

2) If a SinglySplitTrieNode is reached: when the key is equal to or smaller than its split point, the split point's index is returned; when the key is larger, index + 1 is returned.

3) If an UnsplitTrieNode is reached, the index accumulated so far is returned; for example, <zaa, 20> yields the slot just past abbx in the split-point array, i.e., the last partition.
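The following self-contained sketch (simplified from, and not identical to, Hadoop's implementation; all names are illustrative) builds a depth-limited trie over sorted byte[] split points and reproduces the three cases above. For each key it returns the number of split points less than or equal to the key, which is the 0-based partition index:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TrieSketch {

  interface Node { int findPartition(byte[] key); }

  // Inner node: branch on the byte at this depth (one child per byte value).
  static final class InnerNode implements Node {
    final int depth;
    final Node[] child = new Node[256];
    InnerNode(int depth) { this.depth = depth; }
    public int findPartition(byte[] key) {
      if (key.length <= depth) return child[0].findPartition(key); // exhausted key sorts first
      return child[key[depth] & 0xFF].findPartition(key);
    }
  }

  // Leaf node: scan its slice [lower, upper) of the split-point array.
  static final class LeafNode implements Node {
    final byte[][] splits; final int lower, upper;
    LeafNode(byte[][] splits, int lower, int upper) {
      this.splits = splits; this.lower = lower; this.upper = upper;
    }
    public int findPartition(byte[] key) {
      int pos = lower;
      while (pos < upper && compare(splits[pos], key) <= 0) pos++;
      return pos; // index of the first split point greater than the key
    }
  }

  static int compare(byte[] a, byte[] b) { // unsigned lexicographic order
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int d = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (d != 0) return d;
    }
    return a.length - b.length;
  }

  // Build the trie over splits[lower..upper), which must be sorted ascending.
  static Node buildTrie(byte[][] splits, int lower, int upper, int depth, int maxDepth) {
    if (depth >= maxDepth || lower == upper) return new LeafNode(splits, lower, upper);
    // Split points equal to the current prefix sort below every child, so
    // skipping them here makes each child's lower bound count them already.
    while (lower < upper && splits[lower].length <= depth) lower++;
    InnerNode node = new InnerNode(depth);
    int current = lower;
    for (int ch = 0; ch < 256; ch++) {
      int lo = current;
      while (current < upper && (splits[current][depth] & 0xFF) == ch) current++;
      node.child[ch] = buildTrie(splits, lo, current, depth + 1, maxDepth);
    }
    return node;
  }

  public static void main(String[] args) {
    String[] raw = {"aaad", "aaaf", "aaaeh", "abbx"}; // split points from the example
    byte[][] splits = new byte[raw.length][];
    for (int i = 0; i < raw.length; i++) splits[i] = raw[i].getBytes(StandardCharsets.UTF_8);
    Arrays.sort(splits, TrieSketch::compare); // the trie requires sorted split points
    Node root = buildTrie(splits, 0, splits.length, 0, 2);
    for (String k : new String[] {"aad", "aaae", "zaa"}) { // prints 3, 1, 4
      System.out.println(k + " -> partition " +
          root.findPartition(k.getBytes(StandardCharsets.UTF_8)));
    }
  }
}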

An Open Question about TotalOrderPartitioner

As introduced above, a partitioner has two requirements: one is speed, the other is load balance. The trie raises the search speed, but how do we produce a partition file whose split points happen to balance the load?

InputSampler

The input sampler class can sample the data under the input directory. It provides 3 sampling strategies.

(Figure: sampler class structure.)

(Table: comparison of the three sampling strategies.)

The writePartitionFile method is the key step: given the samples produced by a sampler, it first sorts them and then selects reducer count - 1 of them, at roughly even intervals, to write into the partition file. Split points generated from sampled data in this way leave approximately the same number of key/value pairs in each partition range, which is what achieves load balancing.
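A minimal sketch of that selection step, assuming the samples fit in memory (Hadoop's version also skips duplicate neighboring split points and writes the chosen keys to a sequence file, both omitted here). Note that the chosen indices depend on rounding, so the split points can differ slightly from the hand-worked example earlier in this article:

import java.util.Arrays;

public class PartitionFileSketch {
  // Sort the samples, then take numPartitions - 1 evenly spaced ones as split points.
  static String[] pickSplitPoints(String[] samples, int numPartitions) {
    Arrays.sort(samples);
    float stepSize = samples.length / (float) numPartitions;
    String[] splitPoints = new String[numPartitions - 1];
    for (int i = 1; i < numPartitions; i++) {
      splitPoints[i - 1] = samples[Math.round(stepSize * i)];
    }
    return splitPoints;
  }

  public static void main(String[] args) {
    String[] samples = {"b", "abc", "abd", "bcd", "abcd", "efg", "hii", "afd", "rrr", "mnk"};
    System.out.println(Arrays.toString(pickSplitPoints(samples, 4)));
    // prints [afd, bcd, mnk] under this rounding scheme
  }
}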

The source code of the SplitSampler class:

/**
   * Samples the first n records from s splits.
   * Inexpensive way to sample random data.
   */
public static class SplitSampler<K,V> implements Sampler<K,V> {
    private final int numSamples;
    private final int maxSplitsSampled;
    /**
     * Create a SplitSampler sampling all splits.
     * Takes the first numSamples / numSplits records from each split.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     */
    public SplitSampler(int numSamples) {
      this(numSamples, Integer.MAX_VALUE);
    }
    /**
     * Create a new SplitSampler.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     * @param maxSplitsSampled The maximum number of splits to examine.
     */
    public SplitSampler(int numSamples, int maxSplitsSampled) {
      this.numSamples = numSamples;
      this.maxSplitsSampled = maxSplitsSampled;
    }
    /**
     * From each split sampled, take the first numSamples / numSplits records.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
      int splitStep = splits.length / splitsToSample;
      int samplesPerSplit = numSamples / splitsToSample;
      long records = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          samples.add(key);
          key = reader.createKey();
          ++records;
          if ((i+1) * samplesPerSplit <= records) {
            break;
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }

The source code of the RandomSampler class:

/**
   * Sample from random points in the input.
   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
   * each split.
   */
  public static class RandomSampler<K,V> implements Sampler<K,V> {
    private double freq;
    private final int numSamples;
    private final int maxSplitsSampled;

    /**
     * Create a new RandomSampler sampling all splits.
     * This will read every split at the client, which is very expensive.
     * @param freq Probability with which a key will be chosen.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     */
    public RandomSampler(double freq, int numSamples) {
      this(freq, numSamples, Integer.MAX_VALUE);
    }

    /**
     * Create a new RandomSampler.
     * @param freq Probability with which a key will be chosen.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     * @param maxSplitsSampled The maximum number of splits to examine.
     */
    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
      this.freq = freq;
      this.numSamples = numSamples;
      this.maxSplitsSampled = maxSplitsSampled;
    }

    /**
     * Randomize the split order, then take the specified number of keys from
     * each split sampled, where each key is selected with the specified
     * probability and possibly replaced by a subsequently selected key when
     * the quota of keys from that split is satisfied.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);

      Random r = new Random();
      long seed = r.nextLong();
      r.setSeed(seed);
      LOG.debug("seed: " + seed);
      // shuffle splits
      for (int i = 0; i < splits.length; ++i) {
        InputSplit tmp = splits[i];
        int j = r.nextInt(splits.length);
        splits[i] = splits[j];
        splits[j] = tmp;
      }
      // our target rate is in terms of the maximum number of sample splits,
      // but we accept the possibility of sampling additional splits to hit
      // the target sample keyset
      for (int i = 0; i < splitsToSample ||
                     (i < splits.length && samples.size() < numSamples); ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
            Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          if (r.nextDouble() <= freq) {
            if (samples.size() < numSamples) {
              samples.add(key);
            } else {
              // When exceeding the maximum number of samples, replace a
              // random element with this one, then adjust the frequency
              // to reflect the possibility of existing elements being
              // pushed out
              int ind = r.nextInt(numSamples);
              if (ind != numSamples) {
                samples.set(ind, key);
              }
              freq *= (numSamples - 1) / (double) numSamples;
            }
            key = reader.createKey();
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }

The source code of the IntervalSampler class:

/**
   * Sample from s splits at regular intervals.
   * Useful for sorted data.
   */
  public static class IntervalSampler<K,V> implements Sampler<K,V> {
    private final double freq;
    private final int maxSplitsSampled;

    /**
     * Create a new IntervalSampler sampling all splits.
     * @param freq The frequency with which records will be emitted.
     */
    public IntervalSampler(double freq) {
      this(freq, Integer.MAX_VALUE);
    }

    /**
     * Create a new IntervalSampler.
     * @param freq The frequency with which records will be emitted.
     * @param maxSplitsSampled The maximum number of splits to examine.
     * @see #getSample
     */
    public IntervalSampler(double freq, int maxSplitsSampled) {
      this.freq = freq;
      this.maxSplitsSampled = maxSplitsSampled;
    }

    /**
     * For each split sampled, emit when the ratio of the number of records
     * retained to the total record count is less than the specified
     * frequency.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>();
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
      int splitStep = splits.length / splitsToSample;
      long records = 0;
      long kept = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          ++records;
          if ((double) kept / records < freq) {
            ++kept;
            samples.add(key);
            key = reader.createKey();
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }

A TotalOrderPartitioner Example

// Imports for this listing (JobBuilder is an external helper from the original
// example's codebase that parses the input and output paths; it is not shown here).
import java.net.URI;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
        implements Tool
{
    @Override
    public int run(String[] args) throws Exception
    {
        JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (conf == null) {
            return -1;
        }
        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat
                .setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf,
                CompressionType.BLOCK);
        conf.setPartitionerClass(TotalOrderPartitioner.class);
        InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
                0.1, 10000, 10);
        Path input = FileInputFormat.getInputPaths(conf)[0];
        input = input.makeQualified(input.getFileSystem(conf));
        Path partitionFile = new Path(input, "_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
        InputSampler.writePartitionFile(conf, sampler);
        // Add to DistributedCache
        URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
        DistributedCache.addCacheFile(partitionUri, conf);
        DistributedCache.createSymlink(conf);
        JobClient.runJob(conf);
        return 0;
    }
 
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(
                new SortByTemperatureUsingTotalOrderPartitioner(), args);
        System.exit(exitCode);
    }
}
   