MapReduce V1: Analysis of the Job Submission Flow on the JobClient Side
 
Source: CSDN, published 2015-12-14
 

We analyze the MapReduce V1 processing flow based on the Hadoop 1.2.1 source code.

The MapReduce V1 implementation consists of three main distributed processes (roles): JobClient, JobTracker, and TaskTracker. We follow the actual processing activity of these three roles as the main thread and, drawing on the source code, analyze the actual processing flow. The figure below is the abstract flow diagram of MapReduce V1 processing a Job, as given in the book Hadoop: The Definitive Guide:

Expanding the shaded portion of the figure above, we analyze in detail the specific flow of Job submission on the JobClient side.

After a MapReduce program has been written, the Job needs to be submitted to the JobTracker, so we need to understand what work the JobClient performs, in other words what processing it executes, during Job submission. The processing flow of submitting a Job on the JobClient side is shown in the figure below:

The Job submission flow depicted above can be summarized as follows:

Create a Job instance in the MR program and set the Job state

Create a JobClient instance, in preparation for submitting the created Job instance to the JobTracker

While creating the JobClient, an RPC connection to the JobTracker must first be established

Obtain a new Job ID from the JobTracker via a remote call over the JobSubmissionProtocol protocol

Based on the Job configured in the MR program, create the Job-related directories on HDFS, and copy the configured tmpfiles, tmpjars, and tmparchives, as well as the Job's jar file and other resources, to HDFS

Based on the Job's configured InputFormat, compute the Job's input Split information and its metadata (SplitMetaInfo), as well as the numbers of map and reduce tasks, and finally write this information, together with the Job configuration, to HDFS (so that the JobTracker can read it)

Submit the Job to the JobTracker through the JobClient via the JobSubmissionProtocol method submitJob

The MR Program Creates a Job

The example MR program code below should already be familiar:

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}

ÔÚMR³ÌÐòÖУ¬Ê×ÏÈ´´½¨Ò»¸öJob£¬²¢½øÐÐÅäÖã¬È»ºóͨ¹ýµ÷ÓÃJobµÄwaitForCompletion·½·¨½«JobÌá½»µ½MapReduce¼¯Èº¡£Õâ¸ö¹ý³ÌÖУ¬Job´æÔÚÁ½ÖÖ״̬£ºJob.JobState.DEFINEºÍJob.JobState.RUNNING£¬´´½¨Ò»¸öJobºó£¬¸ÃJobµÄ״̬ΪJob.JobState.DEFINE£¬JobÄÚ²¿Í¨¹ýJobClient»ùÓÚorg.apache.hadoop.mapred.JobSubmissionProtocolЭÒéÌá½»¸øJobTracker£¬È»ºó¸ÃJobµÄ״̬±äΪJob.JobState.RUNNING¡£

JobÌύĿ¼submitJobDir

ͨ¹ýÈçÏ´úÂë¿ÉÒÔ¿´µ½£¬JobÌύĿ¼ÊÇÈçºÎ´´½¨µÄ£º

JobConf jobCopy = job;
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); // obtain the StagingArea directory
JobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(jobStagingArea, jobId.toString());

The JobClient obtains the StagingArea directory from the JobTracker through the remote method getStagingAreaDir of the JobSubmissionProtocol protocol. Let's look at the JobTracker-side getStagingAreaDirInternal method, shown below:

private String getStagingAreaDirInternal(String user) throws IOException {
  final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging"));
  final FileSystem fs = stagingRootDir.getFileSystem(conf);
  return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
}

The StagingArea directory finally obtained is ${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/. For example, with the default value of mapreduce.jobtracker.staging.root.dir and the user shirdrn, the StagingArea directory is /tmp/hadoop/mapred/staging/shirdrn/.staging/. From Path submitJobDir = new Path(jobStagingArea, jobId.toString()); we then get submitJobDir: for a job with ID job_200912121733_0002, the value of submitJobDir is /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/.

Copying Resource Files

When configuring a Job, you can specify tmpfiles, tmpjars, and tmparchives; the JobClient copies the corresponding resource files into designated directories, shown in the following code:

Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
...
Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
job.setJar(submitJarFile.toString());
fs.copyFromLocalFile(originalJarFile, submitJarFile);

Since the Job submission directory is already known, the directories for each kind of resource follow directly:

tmpfiles directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/files

tmpjars directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/libjars

tmparchives directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/archives

Job jar file: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.jar

The corresponding resource files can then be copied into these directories.
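
As a rough sketch of that copy step, assuming tmpfiles holds a comma-separated list of local paths as the real JobClient expects, the per-file copy looks approximately like this (the real JobClient.copyAndConfigureFiles additionally registers each file with the DistributedCache and handles non-local source paths):

String files = job.get("tmpfiles"); // assumption: comma-separated local paths, as in JobClient
if (files != null) {
  FileSystem fs = filesDir.getFileSystem(job);
  fs.mkdirs(filesDir);
  for (String oneFile : files.split(",")) {
    // copy each configured local file into the files/ directory under submitJobDir
    fs.copyFromLocalFile(new Path(oneFile), filesDir);
  }
}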

Computing and Storing Split Data

Based on the InputFormat set in the Job configuration, the JobClient computes how the Job's input data files are split, as shown in the following code:

Configuration conf = job.getConfiguration();
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);

ʵ¼ÊÉϾÍÊǵ÷ÓÃInputFormatµÄgetSplits·½·¨£¬Èç¹û²»ÊÊÓÃHadoop×Ô´øµÄFileInputFormatµÄĬÈÏgetSplits·½·¨ÊµÏÖ£¬¿ÉÒÔ×Ô¶¨ÒåʵÏÖ£¬ÖØÐ´¸ÃĬÈÏʵÏÖÂß¼­À´¶¨ÒåÊý¾ÝÊý¾ÝÎļþ·ÖƬµÄ¹æÔò¡£
Once the input files' split information has been computed, the split data must be written to HDFS so that the JobTracker can read it when initializing MapTasks. The code that writes the split data:

T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator()); // sorts the splits in descending order of InputSplit length
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); // write the splits and their metadata to HDFS

½Ó×ŵ÷ÓÃJobSplitWriter.createSplitFiles·½·¨´æ´¢SplitÐÅÏ¢£¬²¢´´½¨ÔªÊý¾ÝÐÅÏ¢£¬²¢±£´æÔªÊý¾ÝÐÅÏ¢¡£´æ´¢SplitÐÅÏ¢£¬´úÂëʵÏÖÈçÏÂËùʾ£º

SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
long offset = out.getPos();
for(T split: array) {
  long prevCount = out.getPos();
  Text.writeString(out, split.getClass().getName());
  Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
  serializer.open(out);
  serializer.serialize(split); // serialize the split and write it to the HDFS file
  long currCount = out.getPos();
  String[] locations = split.getLocations();
  final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
  if (locations.length > max_loc) {
    LOG.warn("Max block location exceeded for split: "+ split + " splitsize: " + locations.length + " maxsize: " + max_loc);
    locations = Arrays.copyOf(locations, max_loc);
  }
  info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); // create a SplitMetaInfo instance
  offset += currCount - prevCount;
}

Let's first look at the split fields contained in a FileSplit:

private Path file;
private long start;
private long length;
private String[] hosts;

To see how a FileSplit is serialized when saved to HDFS, we can look at FileSplit's write method, shown below:

@Override
public void write(DataOutput out) throws IOException {
  Text.writeString(out, file.toString());
  out.writeLong(start);
  out.writeLong(length);
}

Note that the FileSplit's hosts information is not saved here; instead, it is stored in the SplitMetaInfo via new JobSplit.SplitMetaInfo(locations, offset, split.getLength()).

ÏÂÃæÊDZ£´æSplitMetaInfoÐÅÏ¢µÄʵÏÖ£º

private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
    FsPermission p, int splitMetaInfoVersion,
    JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
  // write the splits meta-info to a file for the job tracker
  FSDataOutputStream out = FileSystem.create(fs, filename, p);
  out.write(JobSplit.META_SPLIT_FILE_HEADER); // write the META header: META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
  WritableUtils.writeVInt(out, splitMetaInfoVersion); // META version: 1
  WritableUtils.writeVInt(out, allSplitMetaInfo.length); // number of META objects: one SplitMetaInfo per InputSplit
  for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
    splitMetaInfo.write(out); // store each one
  }
  out.close();
}

Here is the data a SplitMetaInfo contains when it is stored:

public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, locations.length); // number of locations
  for (int i = 0; i < locations.length; i++) {
    Text.writeString(out, locations[i]); // write each location
  }
  WritableUtils.writeVLong(out, startOffset); // start offset
  WritableUtils.writeVLong(out, inputDataLength); // data length
}

Finally, let's look at the directories and files in which this data is saved. The Job submission directory is already known; the following shows how the file that stores the splits is constructed:

FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);

So the file where the splits are saved is /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.split.

Similarly, the split metadata file is constructed as follows:

writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
    new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);

The split metadata file is /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.splitmetainfo.
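
Both file names come from small helpers in JobSubmissionFiles which, paraphrasing the Hadoop 1.x source, simply append fixed names to the submission directory:

// Paraphrased from JobSubmissionFiles (Hadoop 1.x): the split file and its
// metadata file are fixed-name children of the job submission directory.
public static Path getJobSplitFile(Path jobSubmissionDir) {
  return new Path(jobSubmissionDir, "job.split");
}

public static Path getJobSplitMetaFile(Path jobSubmissionDir) {
  return new Path(jobSubmissionDir, "job.splitmetainfo");
}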

Saving the Job Configuration Data

Before submitting the Job to the JobTracker, the Job's configuration information also needs to be saved. This configuration data comes from what the user set in the MR program, overriding the default values, and is finally saved as an XML file (job.xml) on HDFS for the JobTracker to read. The following code creates the submitJobFile and writes the job configuration data:

...
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
...
// Write job file to JobTracker's fs
FSDataOutputStream out = FileSystem.create(fs, submitJobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
...
try {
  jobCopy.writeXml(out);
} finally {
  out.close();
}

Since the Job submission directory is already known, the storage path of the job.xml file follows directly: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml.
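
Putting the pieces together, for the example job ID used throughout, the submission directory on HDFS now holds roughly the following layout (the files/, libjars/, and archives/ subdirectories are created only when the corresponding resources were configured):

/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/
    job.jar             the Job's jar file
    job.split           the serialized InputSplits
    job.splitmetainfo   the SplitMetaInfo entries read by the JobTracker
    job.xml             the Job configuration
    files/              tmpfiles resources
    libjars/            tmpjars resources
    archives/           tmparchives resources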

×îºó£¬ËùÓеÄÊý¾Ý¶¼ÒѾ­×¼±¸Íê³É£¬JobClient¾Í¿ÉÒÔ»ùÓÚJobSubmissionProtocolЭÒé·½·¨submitJob£¬Ìá½»Jobµ½JobTrackerÔËÐС£

   