1. MapReduce Overview
Hadoop Map/Reduce is an easy-to-use software framework. Applications written against it can run on large clusters of thousands of commodity machines and process terabyte-scale datasets in parallel, in a reliable and fault-tolerant way.
A Map/Reduce job typically splits the input dataset into a number of independent chunks, which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps and then feeds them to the reduce tasks. Typically both the input and the output of the job are stored in a file system. The framework takes care of scheduling tasks, monitoring them, and re-executing any that fail.
Typically the Map/Reduce framework and the distributed file system run on the same set of nodes; that is, the compute nodes and the storage nodes are usually co-located. This configuration allows the framework to schedule tasks efficiently on the nodes where the data already resides, which makes very efficient use of the aggregate network bandwidth of the cluster.
The Map/Reduce framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling all the tasks that make up a job across the different slaves, monitoring their execution, and re-executing tasks that have failed; the slaves simply execute the tasks assigned to them by the master.
At a minimum, an application should specify the input/output locations (paths) and supply map and reduce functions by implementing the appropriate interfaces or abstract classes. These, together with the other job parameters, make up the job configuration. Hadoop's job client then submits the job (jar file, executable, etc.) and its configuration to the JobTracker, which distributes the software and configuration to the slaves, schedules the tasks, monitors their execution, and provides status and diagnostic information back to the job client.
Although the Hadoop framework is implemented in Java, Map/Reduce applications do not have to be written in Java (for example, Hadoop Streaming lets any executable serve as the mapper or reducer).

2. Example Analysis: Word Count
1. WordCount Source Code Analysis
Word count is one of the simplest programs, and one of the best illustrations of the MapReduce idea. The complete code for the program can be found under the src/examples directory of the Hadoop distribution.
The program's job is to count how many times each word occurs in a set of text files. For example:
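For the two small sample files used throughout this article (the file names are illustrative; their contents can be read off the map logs below), the expected result is:

file01:
Hello World
Bye World

file02:
Hello Hadoop
Bye Hadoop

Expected output:
Bye 2
Hadoop 2
Hello 2
World 2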

(1) The Map Phase
The Map phase is implemented by extending the Mapper class from the org.apache.hadoop.mapreduce package and overriding its map method.
By adding two statements to the map method that print the key and the value to the console, you can observe that the value passed to map holds one line of the text file (with the newline character marking the end of a line), while the key is the offset of the first character of that line relative to the beginning of the file. The StringTokenizer class then splits each line into individual words, and a <word, 1> pair is emitted as the output of the map method for each word; all remaining work is handled by the MapReduce framework. IntWritable and Text are Hadoop's wrappers around int and String; they can be serialized, which makes it easy to exchange data in a distributed environment.
The implementation of TokenizerMapper is as follows:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("key = " + key.toString());     // added to inspect the key
        System.out.println("value = " + value.toString()); // added to inspect the value
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
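For the input line "Hello World", for example, this map method emits the two pairs <Hello, 1> and <World, 1>.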
(2) The Reduce Phase
The Reduce phase is implemented by extending the Reducer class from the org.apache.hadoop.mapreduce package and overriding its reduce method.
The key parameter of the reduce method is a single word, and values is the list of counts for that word produced by the individual Mappers, so simply iterating over values and summing yields the total number of occurrences of the word.
The implementation of the IntSumReducer class is as follows:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
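For the key Hello, for example, the reducer receives the list {1, 1} (one count from each input file, after combining) and emits <Hello, 2>.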
(3) Running the MapReduce Job
In MapReduce, a Job object manages and runs a computation, and the job's parameters are configured through methods on Job. Here the job is set up to use TokenizerMapper for the Map phase and IntSumReducer for both the Combine and the Reduce phases. The output types of the Map and Reduce phases are also set: Text for the key and IntWritable for the value. The input and output paths of the job are given on the command line and set via FileInputFormat and FileOutputFormat, respectively. Once the parameters are set, job.waitForCompletion() is called to run the job. The main function is implemented as follows:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
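The boolean argument to waitForCompletion controls whether the job's progress is printed to the console while it runs.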
Running the job on the two sample files (here in local mode) produces the following log:
14/12/17 05:53:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 05:53:26 INFO mapred.JobClient: Running job: job_local_0001
14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 05:53:26 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680
key = 0
value = Hello World
key = 12
value = Bye World
14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output
14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/17 05:53:27 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output
key = 0
value = Hello Hadoop
key = 13
value = Bye Hadoop
14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.Merger: Merging 2 sorted segments
14/12/17 05:53:27 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 73 bytes
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/17 05:53:27 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out
14/12/17 05:53:27 INFO mapred.LocalJobRunner: reduce > reduce
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/17 05:53:27 INFO mapred.JobClient: map 100% reduce 100%
14/12/17 05:53:27 INFO mapred.JobClient: Job complete: job_local_0001
14/12/17 05:53:27 INFO mapred.JobClient: Counters: 14
14/12/17 05:53:27 INFO mapred.JobClient: FileSystemCounters
14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_READ=17886
14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_READ=52932
14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_WRITTEN=54239
14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=71431
14/12/17 05:53:27 INFO mapred.JobClient: Map-Reduce Framework
14/12/17 05:53:27 INFO mapred.JobClient: Reduce input groups=4
14/12/17 05:53:27 INFO mapred.JobClient: Combine output records=6
14/12/17 05:53:27 INFO mapred.JobClient: Map input records=4
14/12/17 05:53:27 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/17 05:53:27 INFO mapred.JobClient: Reduce output records=4
14/12/17 05:53:27 INFO mapred.JobClient: Spilled Records=12
14/12/17 05:53:27 INFO mapred.JobClient: Map output bytes=78
14/12/17 05:53:27 INFO mapred.JobClient: Combine input records=8
14/12/17 05:53:27 INFO mapred.JobClient: Map output records=8
14/12/17 05:53:27 INFO mapred.JobClient: Reduce input records=6
2. The WordCount Processing Pipeline
The previous section presented the design and source code of WordCount without going into the details. Below is a more detailed analysis of how WordCount works:
(1) The input files are divided into splits. Because the test files are small, each file constitutes one split, and each split is further divided into <key, value> pairs, one per line. This step is performed automatically by the MapReduce framework; note that the offsets count the newline characters at the end of the preceding lines. For example:
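For the sample files, this step produces the following pairs (the offsets match the key values printed by the map method in the log above):

file01:
<0, "Hello World">
<12, "Bye World">

file02:
<0, "Hello Hadoop">
<13, "Bye Hadoop">

"Hello World" plus its line terminator occupies offsets 0 through 11, so the second line of file01 starts at offset 12; "Hello Hadoop" is one character longer, so the second line of file02 starts at offset 13.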
(2) The split <key, value> pairs are handed to the user-defined map method, which produces new <key, value> pairs.
(3) Once the <key, value> pairs produced by the map method are available, the Mapper sorts them by key and runs the Combine step, which adds up the values sharing a key, giving the final output of the Mapper, as illustrated below:
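For the sample input, each Mapper's combined output is:

file01: <Bye, 1>, <Hello, 1>, <World, 2>
file02: <Bye, 1>, <Hadoop, 2>, <Hello, 1>

which is consistent with the counters in the log above: 8 combine input records become 6 combine output records.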

(4) The Reduce side first sorts the data received from the Mappers, then hands it to the user-defined reduce method, which produces the new <key, value> pairs that form the output of WordCount, as illustrated below:
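For the sample input, the Reducer merges the six combined records into four key groups and emits:

<Bye, 2>, <Hadoop, 2>, <Hello, 2>, <World, 2>

consistent with "Reduce input groups=4" and "Reduce output records=4" in the log.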

3. MapReduce: How Well Do You Know It?
The MapReduce framework quietly does a great deal of work behind the scenes. What happens if you do not override the map and reduce methods at all?
To find out, let us build a stripped-down job. The LazyMapReduce class below performs only the necessary initialization and sets the input/output paths, leaving every other parameter at its default.
The code is as follows:
public class LazyMapReduce {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: LazyMapReduce <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The run log is:
14/12/17 23:04:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 23:04:14 INFO mapred.JobClient: Running job: job_local_0001
14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 23:04:14 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 23:04:15 INFO mapred.JobClient: map 0% reduce 0%
14/12/17 23:04:18 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 23:04:18 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 23:04:18 INFO mapred.MapTask: Starting flush of map output
14/12/17 23:04:19 INFO mapred.MapTask: Finished spill 0
14/12/17 23:04:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/17 23:04:19 INFO mapred.LocalJobRunner:
14/12/17 23:04:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/17 23:04:20 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 23:04:20 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 23:04:20 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 23:04:20 INFO mapred.MapTask: Starting flush of map output
14/12/17 23:04:20 INFO mapred.MapTask: Finished spill 0
14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.Merger: Merging 2 sorted segments
14/12/17 23:04:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 90 bytes
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/17 23:04:20 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out
14/12/17 23:04:20 INFO mapred.LocalJobRunner: reduce > reduce
14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/17 23:04:20 INFO mapred.JobClient: map 100% reduce 100%
14/12/17 23:04:20 INFO mapred.JobClient: Job complete: job_local_0001
14/12/17 23:04:20 INFO mapred.JobClient: Counters: 14
14/12/17 23:04:20 INFO mapred.JobClient: FileSystemCounters
14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_READ=46040
14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_READ=51471
14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=52808
14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=98132
14/12/17 23:04:20 INFO mapred.JobClient: Map-Reduce Framework
14/12/17 23:04:20 INFO mapred.JobClient: Reduce input groups=3
14/12/17 23:04:20 INFO mapred.JobClient: Combine output records=0
14/12/17 23:04:20 INFO mapred.JobClient: Map input records=4
14/12/17 23:04:20 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/17 23:04:20 INFO mapred.JobClient: Reduce output records=4
14/12/17 23:04:20 INFO mapred.JobClient: Spilled Records=8
14/12/17 23:04:20 INFO mapred.JobClient: Map output bytes=78
14/12/17 23:04:20 INFO mapred.JobClient: Combine input records=0
14/12/17 23:04:20 INFO mapred.JobClient: Map output records=4
14/12/17 23:04:20 INFO mapred.JobClient: Reduce input records=4
As the output shows, by default MapReduce copies the input <key, value> pairs to the output unchanged.
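Concretely, with the identity Mapper and Reducer and the default TextOutputFormat (which writes the key, a tab, and the value on each line), the output for the sample input would look something like:

0	Hello World
0	Hello Hadoop
12	Bye World
13	Bye Hadoop

The two lines at offset 0 fall into the same key group, which is why the log above reports "Reduce input groups=3" but "Reduce output records=4" (the relative order of records within a group is not guaranteed).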
The following are some of MapReduce's configurable components and their default settings:
(1) The InputFormat class
Splits the input data into splits and further divides each split into <key, value> pairs that serve as the input of the map function.
(2) The Mapper class
Implements the map function, which generates intermediate results from the input <key, value> pairs.
(3) The Combiner
Implements the combine function, which merges intermediate key-value pairs that share the same key.
(4) The Partitioner class
Implements the getPartition function, which during the Shuffle phase splits the intermediate data by key into R parts, each handled by one Reduce task (see the sketch after this list).
(5) The Reducer class
Implements the reduce function, which merges the intermediate results into the final result.
(6) The OutputFormat class
Responsible for writing the final results.
With these defaults written out explicitly, the LazyMapReduce code above can be rewritten as:
public class LazyMapReduce {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: LazyMapReduce <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // TextOutputFormat is the actual default; FileOutputFormat itself is abstract and cannot be set here.
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Note, however, that depending on your Hadoop version some of these classes and constructors are reported as deprecated.
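On Hadoop 2.x and later, for instance, the deprecated Job constructor can be swapped for the Job.getInstance factory method; a minimal sketch of the change:

Configuration conf = new Configuration();
// Job.getInstance replaces the deprecated "new Job(conf, name)" constructor.
Job job = Job.getInstance(conf, "LazyMapReduce");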