Hadoop Cluster: Running WordCount Explained -- The MapReduce Programming Model
 
Source: NOSQL中文网 | Published: 2016-07-08
 

1. A Brief Introduction to MapReduce Theory

1.1 The MapReduce Programming Model


MapReduce follows the "divide and conquer" principle: an operation on a large dataset is distributed to worker nodes under the management of a master node, and the intermediate results from the individual nodes are then merged into the final result. Put simply, MapReduce is "task decomposition plus result aggregation".

In Hadoop, two machine roles execute MapReduce tasks: the JobTracker and the TaskTracker. The JobTracker schedules work; the TaskTrackers execute it. A Hadoop cluster has exactly one JobTracker.

In distributed computing, the MapReduce framework takes care of the hard parts of parallel programming, such as distributed storage, job scheduling, load balancing, fault tolerance, and network communication. It abstracts the processing into two functions, map and reduce: map decomposes the overall task into many subtasks, and reduce aggregates the results of those subtasks.

¡¡¡¡ÐèҪעÒâµÄÊÇ£¬ÓÃMapReduceÀ´´¦ÀíµÄÊý¾Ý¼¯£¨»òÈÎÎñ£©±ØÐë¾ß±¸ÕâÑùµÄÌØµã£º´ý´¦ÀíµÄÊý¾Ý¼¯¿ÉÒÔ·Ö½â³ÉÐí¶àСµÄÊý¾Ý¼¯£¬¶øÇÒÿһ¸öСÊý¾Ý¼¯¶¼¿ÉÒÔÍêÈ«²¢ÐеؽøÐд¦Àí¡£

1.2 The MapReduce Processing Flow


In Hadoop, every MapReduce task is initialized as a Job, and every Job consists of two phases: a map phase and a reduce phase, represented by the map function and the reduce function respectively. The map function accepts an input of the form <key,value> and produces an intermediate output, likewise of the form <key,value>. The reduce function accepts an input of the form <key,(list of values)>, processes that collection of values, and produces zero or one output, also of the form <key,value>.
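Concretely, for word counting: map receives <offset, line> pairs such as <0, "Hello World"> and emits one pair per token, here <Hello,1> and <World,1>; reduce then receives grouped pairs such as <Hello, (1,1)> and emits the single pair <Hello,2>.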

Figure: How MapReduce processes a large dataset

2. Running the WordCount Program


Word counting is one of the simplest programs, and also one that best embodies the MapReduce idea; it can be called the MapReduce "Hello World". Its complete source code can be found in the "src/examples" directory of the Hadoop distribution. Its job is to count how many times each word occurs across a set of text files.

2.1 Preparation

Log in to the "Master.Hadoop" server as the regular user "hadoop".

1) Create local sample files

First, create a folder named "file" under the "/home/hadoop" directory.

Next, create two text files, file1.txt and file2.txt, where file1.txt contains "Hello World" and file2.txt contains "Hello Hadoop".

2) Create an input folder on HDFS

3) Upload the files in the local "file" folder to the cluster's "input" directory (all three steps are sketched below).
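A minimal sketch of the three steps as shell commands, assuming a Hadoop 1.x installation with the hadoop executable on the PATH (relative HDFS paths such as "input" resolve to /user/hadoop/input):

mkdir /home/hadoop/file
echo "Hello World" > /home/hadoop/file/file1.txt
echo "Hello Hadoop" > /home/hadoop/file/file2.txt
hadoop fs -mkdir input
hadoop fs -put /home/hadoop/file/file*.txt input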

2.2 Running the Example

1) Run the WordCount program on the cluster

Note: "input" serves as the input directory and "output" as the output directory.

The precompiled WordCount jar, "hadoop-examples-1.0.0.jar", lives under "/usr/hadoop". Remember to write out the full path when running the command; otherwise Hadoop will complain that it cannot find the jar.
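Assuming that jar location, the invocation looks like this:

hadoop jar /usr/hadoop/hadoop-examples-1.0.0.jar wordcount input output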

2) Progress information printed during MapReduce execution

The hadoop command starts a JVM to run the MapReduce program, automatically picks up the Hadoop configuration, and adds the class path (and its dependencies) to Hadoop's libraries. The console output is the run log of the Hadoop Job. From it we can see that the Job was assigned the ID job_201202292213_0002, that there are two input files (Total input paths to process : 2), and the input/output statistics of the map and reduce stages (record counts and byte counts). In this example, for instance, there are 2 map tasks and 1 reduce task; map reads 2 input records and writes 4 output records, and so on.

2.3 Checking the Results

1) List the contents of the "output" directory on HDFS

The listing shows that three files were generated; our result is in "part-r-00000".

2) View the contents of the result file
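A sketch of both checks, together with the result the two sample files should produce:

hadoop fs -ls output
hadoop fs -cat output/part-r-00000

The cat command should print:

Hadoop	1
Hello	2
World	1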

3. WordCount Source Code Analysis

3.1 Hadoop's Special Data Types

Hadoop provides the data types listed below. They all implement the WritableComparable interface, so that values of these types can be serialized for network transfer and file storage, and compared for ordering (see the sketch after the list).

BooleanWritable: standard Boolean value

ByteWritable: single-byte value

DoubleWritable: double-precision floating-point number

FloatWritable: floating-point number

IntWritable: integer

LongWritable: long integer

Text: text stored in UTF-8 format

NullWritable: used when the key or value of a <key,value> pair is empty
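A minimal sketch of what WritableComparable provides, using IntWritable and Text (the class name WritableDemo is ours, for illustration only):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        // Serialize an IntWritable to a byte stream, as Hadoop does for
        // network transfer and file storage...
        IntWritable out = new IntWritable(42);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        out.write(new DataOutputStream(bytes));

        // ...and deserialize it back into a fresh instance.
        IntWritable in = new IntWritable();
        in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(in.get()); // prints 42

        // WritableComparable also defines an ordering, used during the sort phase.
        System.out.println(new Text("Hadoop").compareTo(new Text("Hello")) < 0); // prints true
    }
}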

3.2 Analysis of the Old-API WordCount

1) Source code

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

2) Analysis of the main method

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
}

¡¡¡¡Ê×ÏȽ²½âÒ»ÏÂJobµÄ³õʼ»¯¹ý³Ì¡£mainº¯Êýµ÷ÓÃJobconfÀàÀ´¶ÔMapReduce Job½øÐгõʼ»¯£¬È»ºóµ÷ÓÃsetJobName()·½·¨ÃüÃûÕâ¸öJob¡£¶ÔJob½øÐкÏÀíµÄÃüÃûÓÐÖúÓÚ¸ü¿ìµØÕÒµ½Job£¬ÒÔ±ãÔÚJobTrackerºÍTasktrackerµÄÒ³ÃæÖÐ¶ÔÆä½øÐмàÊÓ¡£

JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");

Next, the key and value types of the Job's output <key,value> pairs are set. Because the result has the form <word, count>, the key is set to Text (roughly the counterpart of Java's String) and the value to IntWritable (roughly the counterpart of Java's int).

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

Then the classes for the Job's Map (splitting), Combiner (merging intermediate results), and Reduce (final merging) stages are configured. Here the Reduce class doubles as the combiner, merging Map's intermediate results locally so as to relieve pressure on network data transfer.

conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

Next come the input and output formats, set with setInputFormat() and setOutputFormat(); the input and output paths themselves are then set through FileInputFormat.setInputPaths() and FileOutputFormat.setOutputPath().

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

(1) InputFormat and InputSplit

An InputSplit is Hadoop's unit of data handed to each individual map. An InputSplit stores not the data itself but a split length and an array recording the data's location. How InputSplits are generated is controlled by the InputFormat.

When data is fed to a map, the input split is passed to the InputFormat, which calls getRecordReader() to create a RecordReader; the RecordReader then uses its createKey() and createValue() methods to build the <key,value> pairs the map will process. In short, the InputFormat is what produces the <key,value> pairs that map consumes (sketched below).
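A minimal sketch of that contract in the old API; this loop is essentially what the framework's map runner does, where split, conf, reporter, mapper, and output stand in for objects the framework supplies:

// Obtain a reader for one split; TextInputFormat returns a line-oriented reader.
RecordReader<LongWritable, Text> reader =
        inputFormat.getRecordReader(split, conf, reporter);
LongWritable key = reader.createKey();   // reusable key instance
Text value = reader.createValue();       // reusable value instance
while (reader.next(key, value)) {        // fills key/value; false at end of split
    mapper.map(key, value, output, reporter);
}
reader.close();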

Hadoop predefines a number of ways to turn different kinds of input data into <key,value> pairs that map can handle; they all inherit from InputFormat:

InputFormat
|---BaileyBorweinPlouffe.BbpInputFormat
|---ComposableInputFormat
|---CompositeInputFormat
|---DBInputFormat
|---DistSum.Machine.AbstractInputFormat
|---FileInputFormat
|   |---CombineFileInputFormat
|   |---KeyValueTextInputFormat
|   |---NLineInputFormat
|   |---SequenceFileInputFormat
|   |---TeraInputFormat
|   |---TextInputFormat

Of these, TextInputFormat is Hadoop's default input format. With TextInputFormat, each file (or part of one) is fed to a map on its own; it inherits from FileInputFormat. Each line of data then yields one record, and each record is expressed as a <key,value> pair:

the key is the byte offset of the record within its data split, of type LongWritable;

the value is the content of the line, of type Text.
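For example, given a file whose two lines are "Hello World" and "Hello Hadoop", TextInputFormat produces the records <0, "Hello World"> and <12, "Hello Hadoop">: the first line plus its newline occupies 12 bytes, so the second record starts at offset 12.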

(2) OutputFormat

Every input format has a corresponding output format. The default, TextOutputFormat, mirrors the input side: it writes each record as one line of a text file. Its keys and values may be of any type, though, because it internally calls toString() to convert both to String before writing.
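For instance, WordCount's output line "Hello	2" is produced by calling toString() on the Text key and the IntWritable value, joined by TextOutputFormat's default separator, a tab.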

3) The map method of the Map class

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

The Map class extends MapReduceBase and implements the Mapper interface, a generic type with four type parameters specifying the map's input key type, input value type, output key type, and output value type. In this example, TextInputFormat is used, whose keys are LongWritable and whose values are Text, so the map's input type is <LongWritable,Text>. Since the example must emit pairs of the form <word,1>, the output key type is Text and the output value type is IntWritable.

¡¡¡¡ÊµÏִ˽ӿÚÀ໹ÐèҪʵÏÖmap·½·¨£¬map·½·¨»á¾ßÌ帺Ôð¶ÔÊäÈë½øÐвÙ×÷£¬ÔÚ±¾ÀýÖУ¬map·½·¨¶ÔÊäÈëµÄÐÐÒÔ¿Õ¸ñΪµ¥Î»½øÐÐÇз֣¬È»ºóʹÓÃOutputCollectÊÕ¼¯Êä³öµÄ<word,1>¡£

4) The reduce method of the Reduce class

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

The Reduce class likewise extends MapReduceBase, and it must implement the Reducer interface. Reduce takes map's output as its input, so its input type is <Text,IntWritable>. Reduce's output is a word and its count, so its output type is also <Text,IntWritable>. The Reduce class must implement the reduce method, in which the input key is passed through as the output key and the many input values are summed to form the output value.

3.3 Analysis of the New-API WordCount

1) Source code

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

2) The Map phase

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

The Map phase requires extending the Mapper class from the org.apache.hadoop.mapreduce package and overriding its map method. By adding two lines to map that print the key and the value to the console (see the sketch below), one can observe that the value holds one line of the text file (with the line break as the end-of-line marker), while the key is the offset of the first character of that line relative to the start of the text file. The StringTokenizer class then splits each line into individual words, and <word,1> pairs are emitted as the result of map; all remaining work is left to the MapReduce framework.
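A hypothetical sketch of those two debugging lines, placed at the top of the map method (not part of the original example):

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    // Illustrative debugging output only.
    System.out.println("key   = " + key);    // byte offset of the line
    System.out.println("value = " + value);  // the line of text itself
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
    }
}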

3) The Reduce phase

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

The Reduce phase requires extending the Reducer class from the org.apache.hadoop.mapreduce package and overriding its reduce method. In map's output <key,values>, the key is a single word and the values form the list of counts recorded for that word. Since map's output is reduce's input, the reduce method only has to iterate over values and sum them to obtain the total count for a word.

4) Running the MapReduce job

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

In MapReduce, a Job object manages and runs a computation, and its methods set the job's parameters. Here TokenizerMapper is chosen for the Map phase and IntSumReducer for both the Combine and Reduce phases. The output types of the Map and Reduce phases are also set: Text for the key and IntWritable for the value. The job's input and output paths are specified by command-line arguments and configured via FileInputFormat and FileOutputFormat respectively. Once the parameters are in place, job.waitForCompletion() is called to run the job.

4. The WordCount Processing Flow


This section walks through WordCount in more detail. The execution steps are as follows:

1) The files are split into splits. Because the test files are small, each file forms one split, and each file is divided by lines into <key,value> pairs, as shown in Figure 4-1. This step is performed automatically by the MapReduce framework; note that the offsets (the key values) include the characters taken up by the line break (which differs between Windows and Linux environments).

Figure 4-1: The splitting step

2) The resulting <key,value> pairs are handed to the user-defined map method, which produces new <key,value> pairs, as shown in Figure 4-2.

Figure 4-2: Applying the map method

3) Once map's output <key,value> pairs are obtained, the Mapper sorts them by key and runs the Combine step, summing the values that share the same key to produce the Mapper's final output, as shown in Figure 4-3.

Figure 4-3: Map-side sorting and the Combine step

4) The Reducer first sorts the data received from the Mappers, then hands it to the user-defined reduce method, which produces new <key,value> pairs as WordCount's final output, as shown in Figure 4-4.

Figure 4-4: Reduce-side sorting and the final output
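Put concretely, for the two sample files (file1.txt = "Hello World", file2.txt = "Hello Hadoop") the pipeline is:

Split:                        split1: <0, "Hello World">      split2: <0, "Hello Hadoop">
Map:                          <Hello,1>, <World,1>            <Hello,1>, <Hadoop,1>
Sort/Combine (per split):     <Hello,1>, <World,1>            <Hadoop,1>, <Hello,1>
Reduce (merging both splits): <Hadoop,1>, <Hello,2>, <World,1>

Since each word occurs only once per file, the combiner has nothing to sum here; it is the cross-split merge in the Reduce step that yields <Hello,2>.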

5. Changes from the Old to the New MapReduce API


Hadoop×îа汾µÄMapReduce Release 0.20.0µÄAPI°üÀ¨ÁËÒ»¸öȫеÄMapreduce JAVA API£¬ÓÐʱºòÒ²³ÆÎªÉÏÏÂÎĶÔÏó¡£

The new API is not type-compatible with the old one, so existing applications have to be rewritten before they can benefit from it.

The notable differences between the new and old APIs are the following.

The new API favors abstract classes over interfaces, because they are easier to evolve: a method (with a default implementation) can be added to an abstract class without breaking existing implementations. In the new API, Mapper and Reducer are abstract classes.

The new API lives in the org.apache.hadoop.mapreduce package (and subpackages); the earlier API sits in org.apache.hadoop.mapred.

The new API makes extensive use of context objects, which let user code communicate with the MapReduce system. For example, MapContext essentially unifies the roles of JobConf, OutputCollector, and Reporter.

The new API supports both "push" and "pull" styles of iteration. In both APIs, key/value record pairs are pushed into the mapper, but in addition the new API allows records to be pulled from within the map() method; the same applies to the reducer. A useful application of the "pull" style is processing records in batches rather than one at a time (see the sketch below).
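A minimal sketch of the "pull" style under the new API: overriding Mapper.run() lets user code drive the iteration itself by pulling records from the context. The body below mirrors the default push loop; a batch-processing variant would buffer several records inside the loop before handling them together. The class name BatchingMapper is ours, for illustration:

public static class BatchingMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        // Pull records explicitly instead of having the framework push them in.
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}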

The new API unifies configuration. The old API had a dedicated JobConf object for job configuration, an extension of Hadoop's general Configuration object. In the new API that distinction is gone, and job configuration is done through Configuration alone. Job control is handled by the Job class rather than JobClient, which no longer exists in the new API.

   