Hadoop MapReduce Explained by Example
 
Author: liuxiaochen123, 火龙果软件. Published 2014-11-20.
 

1. A Brief Introduction to MapReduce

1.1 The MapReduce Programming Model

MapReduce follows the idea of "divide and conquer": operations on a large-scale dataset are distributed to the worker nodes managed by a master node, which complete them together, and the intermediate results from the individual nodes are then combined into the final result. Put simply, MapReduce is "decomposition of the task plus aggregation of the results".

ÔÚHadoopÖУ¬ÓÃÓÚÖ´ÐÐMapReduceÈÎÎñµÄ»úÆ÷½ÇÉ«ÓÐÁ½¸ö£ºÒ»¸öÊÇJobTracker£»ÁíÒ»¸öÊÇTaskTracker£¬JobTrackerÊÇÓÃÓÚµ÷¶È¹¤×÷µÄ£¬TaskTrackerÊÇÓÃÓÚÖ´Ðй¤×÷µÄ¡£Ò»¸öHadoop¼¯ÈºÖÐÖ»ÓÐһ̨JobTracker¡£

In distributed computing, the MapReduce framework takes care of the complex issues of parallel programming, such as distributed storage, job scheduling, load balancing, fault tolerance, and network communication, and abstracts the processing into two functions: map and reduce. map is responsible for decomposing a task into multiple subtasks, and reduce is responsible for aggregating the results of those subtasks.

Note that a dataset (or task) to be processed with MapReduce must have this property: the dataset can be decomposed into many small datasets, and each small dataset can be processed completely in parallel.

1.2 The MapReduce Processing Flow

ÔÚHadoopÖУ¬Ã¿¸öMapReduceÈÎÎñ¶¼±»³õʼ»¯ÎªÒ»¸öJob£¬Ã¿¸öJobÓÖ¿ÉÒÔ·ÖΪÁ½Öֽ׶Σºmap½×¶ÎºÍreduce½×¶Î¡£ÕâÁ½¸ö½×¶Î·Ö±ðÓÃÁ½¸öº¯Êý±íʾ£¬¼´mapº¯ÊýºÍreduceº¯Êý¡£mapº¯Êý½ÓÊÕÒ»¸ö<key,value>ÐÎʽµÄÊäÈ룬ȻºóͬÑù²úÉúÒ»¸ö<key,value>ÐÎʽµÄÖмäÊä³ö£¬Hadoopº¯Êý½ÓÊÕÒ»¸öÈç<key,(list of values)>ÐÎʽµÄÊäÈ룬Ȼºó¶ÔÕâ¸övalue¼¯ºÏ½øÐд¦Àí£¬Ã¿¸öreduce²úÉú0»ò1¸öÊä³ö£¬reduceµÄÊä³öÒ²ÊÇ<key,value>ÐÎʽµÄ¡£

Figure: how MapReduce processes a large dataset

2. Running the WordCount Program

Word counting is one of the simplest programs and one of those that best embodies the MapReduce idea; it can be called the MapReduce "Hello World". Its complete code can be found in the "src/examples" directory of the Hadoop distribution. The program's job is to count the number of occurrences of each word in a collection of text files, as the walkthrough below illustrates.

2.1 Preparation

Log in to the "Master.Hadoop" server as the ordinary user "hadoop".

1) Create local example files

First, create the folder "file" under the "/home/hadoop" directory.

Then create two text files, file1.txt and file2.txt, so that file1.txt contains "Hello World" and file2.txt contains "Hello Hadoop".
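These two steps can be done, for example, with (a minimal shell sketch using the paths given above):

mkdir /home/hadoop/file
echo "Hello World" > /home/hadoop/file/file1.txt
echo "Hello Hadoop" > /home/hadoop/file/file2.txt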

2) Create the input folder on HDFS

3) Upload the files from the local file folder to the cluster's input directory
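A sketch of steps 2) and 3) with the hadoop fs commands of the Hadoop 1.x era described here (paths are relative to the hadoop user's HDFS home directory):

hadoop fs -mkdir input
hadoop fs -put /home/hadoop/file/file*.txt input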

2.2 Running the Example

1) Run the WordCount program on the cluster

Note: input serves as the input directory and output as the output directory.

The precompiled WordCount jar, "hadoop-examples-1.0.0.jar", sits under "/usr/hadoop", so remember to write out the full path when executing the command below; otherwise the jar will not be found.
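The invocation is therefore along these lines (a sketch; the jar path is the one given above):

hadoop jar /usr/hadoop/hadoop-examples-1.0.0.jar wordcount input output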

2) Information displayed while MapReduce executes

The hadoop command starts a JVM to run the MapReduce program, automatically obtains the Hadoop configuration, and adds the class path (and its dependencies) to Hadoop's libraries. From the job's run log we can see that the Job was assigned the ID job_201202292213_0002, that there were two input files ("Total input paths to process : 2"), and we can also read off the map input and output records (record and byte counts) as well as the reduce input and output records. In this example, for instance, the number of map tasks is 2 and the number of reduce tasks is 1; the map input record count is 2 and its output record count is 4.

2.3 Viewing the Results

1) View the contents of the output directory on HDFS

The directory listing shows that three files were generated; our result is in "part-r-00000".
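For example (a sketch; in Hadoop 1.x the listing typically shows _SUCCESS, a _logs directory, and the result file):

hadoop fs -ls output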

2) View the contents of the result output file
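For example (the expected contents follow from the two input files created in section 2.1; the two columns are separated by a tab):

hadoop fs -cat output/part-r-00000

Hadoop    1
Hello     2
World     1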

3. WordCount Source Code Analysis

3.1 Special Data Types

HadoopÌṩÁËÈçÏÂÄÚÈݵÄÊý¾ÝÀàÐÍ£¬ÕâЩÊý¾ÝÀàÐͶ¼ÊµÏÖÁËWritableComparable½Ó¿Ú£¬ÒÔ±ãÓÃÕâЩÀàÐͶ¨ÒåµÄÊý¾Ý¿ÉÒÔ±»ÐòÁл¯½øÐÐÍøÂç´«ÊäºÍÎļþ´æ´¢£¬ÒÔ¼°½øÐдóС±È½Ï¡£

BooleanWritable: standard Boolean value
ByteWritable: single-byte value
DoubleWritable: double-precision floating-point number
FloatWritable: floating-point number
IntWritable: integer
LongWritable: long integer
Text: text stored in UTF-8 format
NullWritable: used when the key or value in a <key,value> pair is empty
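A minimal standalone sketch (not part of the WordCount source) of how these wrapper types are used and compared:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Writables are mutable wrappers around Java primitives
        IntWritable a = new IntWritable(1);
        IntWritable b = new IntWritable(2);
        a.set(42);                          // reuse the same object
        System.out.println(a.get());        // 42

        // WritableComparable supports size comparison, as used in the sort phase
        System.out.println(a.compareTo(b)); // positive, since 42 > 2

        // Text stores its contents as UTF-8 bytes
        Text t = new Text("Hello");
        System.out.println(t.getLength());  // 5 (byte length)
    }
}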

3.2 Analysis of the Old WordCount

1) Source code

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

2) Analysis of the main method

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
}

First, the initialization of the Job. The main function uses the JobConf class to initialize the MapReduce Job, then calls setJobName() to name it. Giving a Job a sensible name helps to find it faster and to monitor it in the JobTracker and TaskTracker pages.

JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");

Next, the key and value data types of the Job's output <key,value> are set. Since the result is <word, count>, the key is set to the "Text" type (roughly equivalent to Java's String) and the value to "IntWritable" (roughly equivalent to Java's int).

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

Then the classes that handle the Job's Map (splitting), Combine (merging of intermediate results), and Reduce (merging) phases are set. Here the Reduce class is also used to combine the intermediate results produced by Map, which avoids putting pressure on network data transfer.

conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

½ÓמÍÊǵ÷ÓÃsetInputPath()ºÍsetOutputPath()ÉèÖÃÊäÈëÊä³ö·¾¶¡£

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

(1) InputFormat and InputSplit

An InputSplit is the unit of data that Hadoop sends to each individual map. An InputSplit does not store the data itself; it stores a split length and an array recording where the data is located. The way InputSplits are generated can be set through the InputFormat.

When the data is passed to map, the input split is handed to the InputFormat, which calls getRecordReader() to produce a RecordReader; the RecordReader in turn creates, via createKey() and createValue(), the <key,value> pairs that map can process. In short, the InputFormat is what generates the <key,value> pairs supplied to map.
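A rough sketch of that read loop under the old mapred API (illustrative only, not Hadoop's actual source; split, jobConf, mapper, output, and reporter are assumed to be in scope):

// How the framework conceptually drives a RecordReader for one split
InputFormat<LongWritable, Text> format = new TextInputFormat();
RecordReader<LongWritable, Text> reader =
        format.getRecordReader(split, jobConf, Reporter.NULL);
LongWritable key = reader.createKey();   // key: byte offset of the line
Text value = reader.createValue();       // value: the line's contents
while (reader.next(key, value)) {        // one record per iteration
    mapper.map(key, value, output, reporter);
}
reader.close();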

Hadoop predefines many ways of turning different types of input data into <key,value> pairs that map can process. They all inherit from InputFormat:

InputFormat
  |
  |---BaileyBorweinPlouffe.BbpInputFormat
  |---ComposableInputFormat
  |---CompositeInputFormat
  |---DBInputFormat
  |---DistSum.Machine.AbstractInputFormat
  |---FileInputFormat
        |---CombineFileInputFormat
        |---KeyValueTextInputFormat
        |---NLineInputFormat
        |---SequenceFileInputFormat
        |---TeraInputFormat
        |---TextInputFormat

Among them, TextInputFormat is Hadoop's default input method. In TextInputFormat, every file (or part of one) is taken individually as input to a map; TextInputFormat itself inherits from FileInputFormat. Each line of data then produces one record, and each record is represented as <key,value>, where:

the key is the byte offset of the record within its data split, of type LongWritable;
the value is the content of the line, of type Text.
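For the two example files from section 2.1, each single-line file is its own split, so each yields exactly one pair:

file1.txt -> <0, "Hello World">
file2.txt -> <0, "Hello Hadoop">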

(2) OutputFormat

Every input format has a corresponding output format. The default output format is TextOutputFormat. Like its input counterpart, it writes each record to a text file as one line. Its keys and values can be of any type, because the framework calls toString() to convert both to String before writing them out.
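For the word-count job, for instance, each line of part-r-00000 is the key, a tab, then the value:

Hello    2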

3) Analysis of the map method in the Map class

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

The Map class inherits from MapReduceBase and implements the Mapper interface. This interface is a generic type with four type parameters, which specify the map's input key type, input value type, output key type, and output value type. In this example, TextInputFormat is used, whose output key is of type LongWritable and output value of type Text, so the map's input type is <LongWritable,Text>. The example needs to output pairs of the form <word,1>, so the output key type is Text and the output value type is IntWritable.

ʵÏִ˽ӿÚÀ໹ÐèҪʵÏÖmap·½·¨£¬map·½·¨»á¾ßÌ帺Ôð¶ÔÊäÈë½øÐвÙ×÷£¬ÔÚ±¾ÀýÖУ¬map·½·¨¶ÔÊäÈëµÄÐÐÒÔ¿Õ¸ñΪµ¥Î»½øÐÐÇз֣¬È»ºóʹÓà OutputCollect ÊÕ¼¯Êä³öµÄ<word,1>¡£

4) Analysis of the reduce method in the Reduce class

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

The Reduce class also inherits from MapReduceBase and must implement the Reducer interface. Reduce takes map's output as its input, so Reduce's input type is <Text,IntWritable>. Reduce's output is a word and its count, so its output type is <Text,IntWritable>. The Reduce class must implement the reduce method, in which the input key is used as the output key and the input values are added up to form the output value.

3.3 Analysis of the New WordCount

1) Source code

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

2) The Map process

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

The Map process must inherit the Mapper class from the org.apache.hadoop.mapreduce package and override its map method. By adding two lines to the map method that print the key and the value to the console, one can see that the value holds a line of the text file (with the line terminator marking the end of the line) and that the key is the offset of the line's first character relative to the beginning of the text file. StringTokenizer then splits each line into individual words, <word,1> is emitted as the map method's result, and all remaining work is left to the MapReduce framework.
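The two debug lines mentioned above might look like this (an assumption about what the author added; they would go at the top of map()):

// Hypothetical debug output at the start of map()
System.out.println("key: " + key.toString());
System.out.println("value: " + value.toString());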

3) The Reduce process

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

The Reduce process must inherit the Reducer class from the org.apache.hadoop.mapreduce package and override its reduce method. In map's output <key,values>, the key is a single word and values is the list of counts for that word; map's output is reduce's input, so the reduce method only needs to iterate over values and sum them to obtain a word's total count.

4) Running the MapReduce job

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

In MapReduce, a Job object is responsible for managing and running a computation task, and the task's parameters are set through the Job's methods. Here, TokenizerMapper is set to do the processing in the Map phase and IntSumReducer to do the processing in both the Combine and Reduce phases. The output types of the Map and Reduce phases are also set: Text for the key and IntWritable for the value. The task's input and output paths are specified by command-line arguments and set via FileInputFormat and FileOutputFormat. Once the parameters are set, job.waitForCompletion() is called to run the job.

4. The WordCount Processing Flow

This section explains WordCount in more detail. The detailed execution steps are as follows:

1) The files are split into splits. Since the test files are small, each file forms one split, and each file is divided by line into <key,value> pairs, as shown in Figure 4-1. This step is performed automatically by the MapReduce framework; note that the offset (i.e. the key) includes the characters occupied by the line terminator (which differs between Windows and Linux).

Figure 4-1: The splitting step

2) The split <key,value> pairs are handed to the user-defined map method, which produces new <key,value> pairs, as shown in Figure 4-2.

Figure 4-2: Executing the map method

3) Once the <key,value> pairs output by the map method are obtained, the Mapper sorts them by key and performs the Combine step, adding up the values of identical keys to obtain the Mapper's final output, as shown in Figure 4-3.

Figure 4-3: Map-side sorting and the Combine step

4) The Reducer first sorts the data received from the Mappers, then hands it to the user-defined reduce method, obtaining new <key,value> pairs as WordCount's output, as shown in Figure 4-4.

Figure 4-4: Reduce-side sorting and the output result
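Tracing the two example files from section 2.1 through these four steps by hand gives:

Split:    file1 -> <0, "Hello World">      file2 -> <0, "Hello Hadoop">
Map:      <Hello,1> <World,1>              <Hello,1> <Hadoop,1>
Combine:  <Hello,1> <World,1>              <Hadoop,1> <Hello,1>   (sorted; no equal keys to merge within a split)
Reduce:   <Hadoop,1> <Hello,2> <World,1>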

5. Changes from the Old to the New MapReduce API

Hadoop×îа汾µÄMapReduce Release 0.20.0µÄAPI°üÀ¨ÁËÒ»¸öȫеÄMapreduce JAVA API£¬ÓÐʱºòÒ²³ÆÎªÉÏÏÂÎĶÔÏó¡£

The new API is not type-compatible with the old one, so existing applications must be rewritten before the new API can be of use.

There are several notable differences between the new API and the old one.

The new API favors abstract classes over interfaces, because they are easier to extend. For example, a method (with a default implementation) can be added to an abstract class without modifying existing implementations of the class. In the new API, Mapper and Reducer are abstract classes.

The new API lives in the org.apache.hadoop.mapreduce package (and its subpackages). The earlier API is in org.apache.hadoop.mapred.

The new API makes extensive use of context objects, which allow user code to communicate with the MapReduce system. For example, the MapContext essentially unifies the roles of the JobConf, the OutputCollector, and the Reporter.

The new API supports both "push" and "pull" styles of iteration. In both APIs, key/value record pairs are pushed to the mapper, but in addition the new API allows records to be pulled from within the map() method; the same holds for the reducer. A useful example of the pull style is processing records in batches rather than one by one.

The new API unifies configuration. The old API has a special JobConf object for job configuration, an extension of Hadoop's ordinary Configuration object. In the new API this distinction is gone, and job configuration is done through a Configuration. Job control is performed through the Job class rather than JobClient, which no longer exists in the new API.

   