We analyze the MapReduce V1 processing flow based on the Hadoop 1.2.1 source code.
The MapReduce V1 implementation consists of three main distributed processes (roles): JobClient, JobTracker and TaskTracker. We follow the actual processing activities of these three roles as the main thread and, together with the source code, analyze the actual processing flow. The figure below is the abstract flow of how MapReduce
V1 processes a Job, as given in "Hadoop: The Definitive Guide":

As shown in the figure above, we expand the processing logic of the shaded part and analyze in detail the Job submission flow on the JobClient side.
After a MapReduce program has been written, its Job needs to be submitted to the JobTracker, so we need to understand what work the JobClient side does, that is, which processing steps it executes, during Job submission. The flow of submitting a Job on the JobClient side is shown in the figure below:

The Job submission flow described in the figure above can be summarized as follows (a condensed code sketch follows the list):
1. The MR program creates a Job instance and sets the Job state.
2. A JobClient instance is created, ready to submit the Job to the JobTracker.
3. While creating the JobClient, an RPC connection to the JobTracker must first be established.
4. The JobClient remotely calls the JobTracker via the JobSubmissionProtocol protocol to obtain a new Job ID.
5. Based on the Job configured in the MR program, the Job-related directories are created on HDFS, and the configured tmpfiles, tmpjars, tmparchives, as well as the Job's jar file and other resources, are copied to HDFS.
6. Based on the InputFormat configured for the Job, the input Split information and its metadata (SplitMetaInfo) are computed, the numbers of map and reduce tasks are determined, and all of this, together with the Job configuration, is written to HDFS (so that the JobTracker can read it).
7. The JobClient submits the Job to the JobTracker via the JobSubmissionProtocol method submitJob.
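To tie these steps together before diving into each one, the fragment below is a hedged, heavily condensed outline of what the JobClient does during submission. copyResources() and writeSplitData() are hypothetical stand-ins for JobClient's private helper methods; the other calls all reappear in the code fragments quoted later in this article.

// Condensed outline of JobClient-side submission (Hadoop 1.x style, simplified sketch).
// copyResources() and writeSplitData() are hypothetical placeholders, not real method names.
Path jobStagingArea = JobSubmissionFiles.getStagingDir(jobClient, jobConf); // staging root on HDFS
JobID jobId = jobSubmitClient.getNewJobId();                                // RPC: ask the JobTracker for a new Job ID
Path submitJobDir = new Path(jobStagingArea, jobId.toString());             // per-job submit directory

copyResources(jobConf, submitJobDir);                // tmpfiles/tmpjars/tmparchives and job.jar -> HDFS
int numMaps = writeSplitData(jobConf, submitJobDir); // job.split and job.splitmetainfo -> HDFS
jobConf.setNumMapTasks(numMaps);                     // one map task per input split

FSDataOutputStream out = FileSystem.create(fs,
    JobSubmissionFiles.getJobConfPath(submitJobDir),
    new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
jobConf.writeXml(out);                               // job.xml -> HDFS
out.close();

// Final RPC: JobSubmissionProtocol.submitJob(...) hands submitJobDir over to the JobTracker.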
The MR program creates a Job
The MR program example code below should already be very familiar:
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
ÔÚMR³ÌÐòÖУ¬Ê×ÏÈ´´½¨Ò»¸öJob£¬²¢½øÐÐÅäÖã¬È»ºóͨ¹ýµ÷ÓÃJobµÄwaitForCompletion·½·¨½«JobÌá½»µ½MapReduce¼¯Èº¡£Õâ¸ö¹ý³ÌÖУ¬Job´æÔÚÁ½ÖÖ״̬£ºJob.JobState.DEFINEºÍJob.JobState.RUNNING£¬´´½¨Ò»¸öJobºó£¬¸ÃJobµÄ״̬ΪJob.JobState.DEFINE£¬JobÄÚ²¿Í¨¹ýJobClient»ùÓÚorg.apache.hadoop.mapred.JobSubmissionProtocolÐÒéÌá½»¸øJobTracker£¬È»ºó¸ÃJobµÄ״̬±äΪJob.JobState.RUNNING¡£
The Job submission directory submitJobDir
The following code shows how the Job submission directory is created:
JobConf jobCopy = job;
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); // get the staging area directory
JobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
To obtain the staging area directory, the JobClient calls the remote method getStagingAreaDir of the JobSubmissionProtocol protocol on the JobTracker. Let's look at the getStagingAreaDirInternal method on the JobTracker side, shown below:
private String getStagingAreaDirInternal(String user) throws IOException {
  final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging"));
  final FileSystem fs = stagingRootDir.getFileSystem(conf);
  return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString();
}
The staging area directory obtained in the end is ${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/. For example, with the default value of mapreduce.jobtracker.staging.root.dir and the user shirdrn, the staging area directory is /tmp/hadoop/mapred/staging/shirdrn/.staging/. Then Path submitJobDir = new Path(jobStagingArea, jobId.toString()); yields submitJobDir; if a job's ID is job_200912121733_0002, submitJobDir is /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/.
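As a quick illustration (using the example user and job ID above), the path composition can be reproduced directly with the Path and JobID classes:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;

public class SubmitDirExample {
  public static void main(String[] args) {
    Path jobStagingArea = new Path("/tmp/hadoop/mapred/staging/shirdrn/.staging");
    JobID jobId = JobID.forName("job_200912121733_0002");        // parse the example job ID
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    System.out.println(submitJobDir);
    // prints: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002
  }
}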
Copying resource files
When configuring a Job you can specify tmpfiles, tmpjars and tmparchives; the JobClient copies the corresponding resource files into dedicated directories, which are derived as shown in the following code:
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
...
Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
job.setJar(submitJarFile.toString());
fs.copyFromLocalFile(originalJarFile, submitJarFile);
Since the Job submission directory is already known, the directories for each kind of resource follow directly:
tmpfiles directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/files
tmpjars directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/libjars
tmparchives directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/archives
Job jar file: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.jar
The corresponding resource files can then be copied into these directories; a hedged sketch of that copy step follows.
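The following is a sketch only, not the exact JobClient code: it shows how a single file specified via tmpfiles could be pushed to the job's files directory and made visible to tasks through the DistributedCache. The local file path is hypothetical.

// Sketch: copy one tmpfiles entry to the job's "files" directory and register it in the DistributedCache.
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
FileSystem fs = filesDir.getFileSystem(conf);
fs.mkdirs(filesDir);

Path localFile = new Path("/home/shirdrn/lookup.txt");   // hypothetical local resource
Path hdfsCopy = new Path(filesDir, localFile.getName());
fs.copyFromLocalFile(false, true, localFile, hdfsCopy);  // don't delete the source, overwrite the target
DistributedCache.addCacheFile(hdfsCopy.toUri(), conf);   // tasks can later retrieve it from the cache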
Computing and storing split data
Based on the InputFormat set in the Job configuration, the JobClient computes how the Job's input data files are split, as the following code shows:
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
This is effectively a call to InputFormat's getSplits method. If the default getSplits implementation of Hadoop's built-in FileInputFormat does not fit, you can provide a custom implementation and override the default logic to define your own rules for splitting the input data files.
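For example, a simple way to change the split rules without rewriting getSplits() from scratch is to subclass a FileInputFormat and declare files non-splittable, so that FileInputFormat.getSplits() produces exactly one split (and therefore one map task) per input file. The class below is a minimal, hypothetical example:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Minimal custom input format: every input file becomes a single, unsplit InputSplit.
public class WholeFileTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false; // never split, regardless of file size or block size
  }
}

It would be enabled in the driver with job.setInputFormatClass(WholeFileTextInputFormat.class).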
Once the split information for the input files has been computed, it has to be written to HDFS so that the JobTracker can read it when initializing the MapTasks. The code that writes the split data:
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator()); // sorts the splits in descending order by length
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); // write the splits and their metadata to HDFS
JobSplitWriter.createSplitFiles is then called to store the split information, and to create and save the corresponding metadata. Storing the split information is implemented as follows:
SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
long offset = out.getPos();
for (T split : array) {
  long prevCount = out.getPos();
  Text.writeString(out, split.getClass().getName());
  Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
  serializer.open(out);
  serializer.serialize(split); // serialize the split and write it to the HDFS file
  long currCount = out.getPos();
  String[] locations = split.getLocations();
  final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
  if (locations.length > max_loc) {
    LOG.warn("Max block location exceeded for split: " + split + " splitsize: " + locations.length + " maxsize: " + max_loc);
    locations = Arrays.copyOf(locations, max_loc);
  }
  info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); // create a SplitMetaInfo instance for this split
  offset += currCount - prevCount;
}
Let's first look at the fields a FileSplit contains:
private Path file;
private long start;
private long length;
private String[] hosts;
How a FileSplit is serialized to HDFS can be seen from its write method, shown below:
@Override
public void write(DataOutput out) throws IOException {
  Text.writeString(out, file.toString());
  out.writeLong(start);
  out.writeLong(length);
}
Note that the FileSplit's hosts information is not written here; instead it is stored in the SplitMetaInfo, via new JobSplit.SplitMetaInfo(locations, offset, split.getLength()).
The following is the implementation that saves the SplitMetaInfo entries:
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
    FsPermission p, int splitMetaInfoVersion,
    JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
  // write the splits meta-info to a file for the job tracker
  FSDataOutputStream out = FileSystem.create(fs, filename, p);
  out.write(JobSplit.META_SPLIT_FILE_HEADER); // write the META header: META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8")
  WritableUtils.writeVInt(out, splitMetaInfoVersion); // META version: 1
  WritableUtils.writeVInt(out, allSplitMetaInfo.length); // number of META entries: one SplitMetaInfo per InputSplit
  for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
    splitMetaInfo.write(out); // store each entry
  }
  out.close();
}
Let's look at the data a SplitMetaInfo contains when it is stored:
public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, locations.length); // number of locations
  for (int i = 0; i < locations.length; i++) {
    Text.writeString(out, locations[i]); // write each location (host)
  }
  WritableUtils.writeVLong(out, startOffset); // offset of the serialized split inside job.split
  WritableUtils.writeVLong(out, inputDataLength); // length of the split's data
}
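For completeness, the read side is symmetric: when the JobTracker later loads job.splitmetainfo, each SplitMetaInfo is reconstructed with a readFields method mirroring write. A rough sketch, derived from the write logic above rather than quoted from the source, would look like this:

public void readFields(DataInput in) throws IOException {
  int len = WritableUtils.readVInt(in);          // number of locations
  locations = new String[len];
  for (int i = 0; i < len; i++) {
    locations[i] = Text.readString(in);          // each location (host)
  }
  startOffset = WritableUtils.readVLong(in);     // offset of the serialized split inside job.split
  inputDataLength = WritableUtils.readVLong(in); // length of the split's data
}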
Finally, let's look at the directories and files where this data is saved. The Job submission directory is already known; below we can see how the file that stores the splits is constructed:
FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
So the file the splits are saved to is /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.split.
Likewise, the split metadata file is constructed as follows:
writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
    new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);
The split metadata file is /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.splitmetainfo.
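To verify what actually ends up in the submit directory, a small listing utility like the one below can be used (illustration only; for the example job it should show job.jar, job.split, job.splitmetainfo and job.xml, plus the files/libjars/archives directories if they were configured):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Lists the contents of the job submission directory on HDFS.
public class ListSubmitDir {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path submitJobDir =
        new Path("/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002");
    FileSystem fs = submitJobDir.getFileSystem(conf);
    for (FileStatus status : fs.listStatus(submitJobDir)) {
      System.out.println(status.getPath().getName() + "\t" + status.getLen());
    }
  }
}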
Saving the Job configuration data
Before the Job is submitted to the JobTracker, its configuration also has to be saved. This configuration data is whatever the user set in the MR program, overriding the default values, and it is finally written as an XML file (job.xml) to HDFS for the JobTracker to read. The following code creates the submitJobFile and writes the job configuration into it:
...
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
...
// Write job file to JobTracker's fs
FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
    new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
...
try {
  jobCopy.writeXml(out);
} finally {
  out.close();
}
Since the Job submission directory is known, the storage path of the job.xml file follows directly: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml.
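Any process that can read HDFS, the JobTracker included, can reconstruct the Job's configuration from this file. A minimal sketch (illustration only, not the JobTracker's actual code path) is to open job.xml as a stream and feed it to a Configuration:

// Sketch: rebuild the job configuration from the job.xml stored on HDFS.
Path jobXml = new Path(
    "/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml");
FileSystem fs = jobXml.getFileSystem(new Configuration());
Configuration jobConf = new Configuration(false);     // start empty, without default resources
jobConf.addResource(fs.open(jobXml));                 // parse the XML written by writeXml() above
System.out.println(jobConf.get("mapreduce.job.dir")); // e.g. the submit directory set earlier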
At this point all the data has been prepared, and the JobClient can submit the Job to the JobTracker for execution via the JobSubmissionProtocol method submitJob.