您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
MapReduce V1:Job提交流程之JobClient端分析
 
来源:CSDN 发布于 2015-12-14
  1935  次浏览      
 

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。

MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient、JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程。下图是《Hadoop权威指南》一书给出的MapReduce V1处理Job的抽象流程图:

如上图,我们展开阴影部分的处理逻辑,详细分析Job提交在JobClient端的具体流程。

在编写好MapReduce程序以后,需要将Job提交给JobTracker,那么我们就需要了解在提交Job的过程中,在JobClient端都做了哪些工作,或者说执行了哪些处理。在JobClient端提交Job的处理流程,如下图所示:

上图所描述的Job的提交流程,说明如下所示:

在MR程序中创建一个Job实例,设置Job状态

创建一个JobClient实例,准备将创建的Job实例提交到JobTracker

在创建JobClient的过程中,首先必须保证建立到JobTracker的RPC连接

基于JobSubmissionProtocol协议远程调用JobTracker获取一个新的Job ID

根据MR程序中配置的Job,在HDFS上创建Job相关目录,并将配置的tmpfiles、tmpjars、tmparchives,以及Job对应jar文件等资源复制到HDFS

根据Job配置的InputFormat,计算该Job输入的Split信息和元数据(SplitMetaInfo)信息,以及计算出map和reduce的个数,最后将这些信息连通Job配置写入到HDFS(保证JobTracker能够读取)

通过JobClient基于JobSubmissionProtocol协议方法submitJob提交Job到JobTracker

MR程序创建Job

下面的MR程序示例代码,已经很熟悉了:

public static void main(String[] args) throws Exception {
02
Configuration conf = new Configuration();
03
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
04
if (otherArgs.length != 2) {
05
System.err.println("Usage: wordcount <in> <out>");
06
System.exit(2);
07
}
08
Job job = new Job(conf, "word count");
09
job.setJarByClass(WordCount.class);
10
job.setMapperClass(TokenizerMapper.class);
11
job.setCombinerClass(IntSumReducer.class);
12
job.setReducerClass(IntSumReducer.class);
13
job.setOutputKeyClass(Text.class);
14
job.setOutputValueClass(IntWritable.class);
15
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
16
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
17
System.exit(job.waitForCompletion(true) ? 0 : 1);
18
}

在MR程序中,首先创建一个Job,并进行配置,然后通过调用Job的waitForCompletion方法将Job提交到MapReduce集群。这个过程中,Job存在两种状态:Job.JobState.DEFINE和Job.JobState.RUNNING,创建一个Job后,该Job的状态为Job.JobState.DEFINE,Job内部通过JobClient基于org.apache.hadoop.mapred.JobSubmissionProtocol协议提交给JobTracker,然后该Job的状态变为Job.JobState.RUNNING。

Job提交目录submitJobDir

通过如下代码可以看到,Job提交目录是如何创建的:

JobConf jobCopy = job;
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); // 获取到StagingArea目录
JobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(jobStagingArea, jobId.toString());

获取StagingArea目录,JobClient需要通过JobSubmissionProtocol协议的远程方法getStagingAreaDir从JobTracker端获取到,我们看一下JobTracker端的getStagingAreaDirInternal方法,如下所示:

private String getStagingAreaDirInternal(String user) throws IOException {
2
final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging"));
3
final FileSystem fs = stagingRootDir.getFileSystem(conf);
4
return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
5
}

最终获取到的StagingArea目录为${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/,例如,如果使用默认的mapreduce.jobtracker.staging.root.dir值,用户为shirdrn,则StagingArea目录/tmp/hadoop/mapred/staging/shirdrn/.staging/。通过Path submitJobDir = new Path(jobStagingArea, jobId.toString());可以得到submitJobDir,假如一个job的ID为job_200912121733_0002,则submitJobDir的值为/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/

拷贝资源文件

在配置Job的时候,可以指定tmpfiles、tmpjars、tmparchives,JobClient会将对应的资源文件拷贝到指定的目录中,对应目录如下代码所示:

Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
2
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
3
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
4
...
5
Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
6
job.setJar(submitJarFile.toString());
7
fs.copyFromLocalFile(originalJarFile, submitJarFile);

上面已经知道Job提交目录,可以分别得到对应的资源所在目录:

tmpfiles目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/files

tmpjars目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/libjars

tmparchives目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/archives

Job Jar文件:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.jar

然后,就可以将对应的资源文件拷贝到对应的目录中。

计算并存储Split数据

根据Job配置中设置的InputFormat,计算该Job的数据数据文件是如何进行分片的,代码如下所示:

Configuration conf = job.getConfiguration();
2
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
3
List<InputSplit> splits = input.getSplits(job);

实际上就是调用InputFormat的getSplits方法,如果不适用Hadoop自带的FileInputFormat的默认getSplits方法实现,可以自定义实现,重写该默认实现逻辑来定义数据数据文件分片的规则。
计算出输入文件的分片信息,然后需要将这些分片数据写入到HDFS供JobTracker查询初始化MapTask,写入分片数据的实现代码:

T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
2
// sort the splits into order based on size, so that the biggest
3
// go first
4
Arrays.sort(array, new SplitComparator()); // 根据InputSplit的长度做了一个逆序排序
5
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); // 将split及其元数据信息写入HDFS

接着调用JobSplitWriter.createSplitFiles方法存储Split信息,并创建元数据信息,并保存元数据信息。存储Split信息,代码实现如下所示:

SerializationFactory factory = new SerializationFactory(conf);
02
int i = 0;
03
long offset = out.getPos();
04
for(T split: array) {
05
long prevCount = out.getPos();
06
Text.writeString(out, split.getClass().getName());
07
Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
08
serializer.open(out);
09
serializer.serialize(split); // 将split序列化写入到HDFS文件中
10
long currCount = out.getPos();
11
String[] locations = split.getLocations();
12
final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
13
if (locations.length > max_loc) {
14
LOG.warn("Max block location exceeded for split: "+ split + " splitsize: " + locations.length + " maxsize: " + max_loc);
15
locations = Arrays.copyOf(locations, max_loc);
16
}
17
info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); // 创建SplitMetaInfo实例
18
offset += currCount - prevCount;
19
}

我们先看一下FileSplit包含的分片内容,如下所示:

private Path file;
2
private long start;
3
private long length;
4
private String[] hosts;

在序列化保存FileSplit到HDFS,可以通过查看FileSplit的write方法,如下所示:

@Override
2
public void write(DataOutput out) throws IOException {
3
Text.writeString(out, file.toString());
4
out.writeLong(start);
5
out.writeLong(length);
6
}

需要注意的是,这里面并没有将FileSplit的hosts信息保存,而是存储到了SplitMetaInfo中new JobSplit.SplitMetaInfo(locations, offset, split.getLength())。

下面是保存SplitMetaInfo信息的实现:

private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
02
FsPermission p, int splitMetaInfoVersion,
03
JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
04
// write the splits meta-info to a file for the job tracker
05
FSDataOutputStream out = FileSystem.create(fs, filename, p);
06
out.write(JobSplit.META_SPLIT_FILE_HEADER); // 写入META头信息:META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
07
WritableUtils.writeVInt(out, splitMetaInfoVersion); // META版本信息:1
08
WritableUtils.writeVInt(out, allSplitMetaInfo.length); // META对象的数量:每个InputSplit对应一个SplitMetaInfo
09
for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
10
splitMetaInfo.write(out); // 每个都进行存储
11
}
12
out.close();
13
}

看一下SplitMetaInfo存储时包含的数据信息:

public void write(DataOutput out) throws IOException {
2
WritableUtils.writeVInt(out, locations.length); // location个数
3
for (int i = 0; i < locations.length; i++) {
4
Text.writeString(out, locations[i]); // 写入每一个location位置信息
5
}
6
WritableUtils.writeVLong(out, startOffset); // 偏移量
7
WritableUtils.writeVLong(out, inputDataLength); // 数据长度
8
}

最后,我们看一下这些数据保存的目录和文件情况。前面已经知道Job提交目录,下面看split存储的文件是如何构建的:

FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
2
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);

那么split保存的文件为:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.split。

同样,split元数据信息文件构建如下所示:

writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
2
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);

split元数据信息文件为:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.splitmetainfo。

保存Job配置数据

在提交Job到JobTracker之前,还需要保存Job的配置信息,这些配置数据根据用户在MR程序中配置,覆盖默认的配置值,最后保存到XML文件(job.xml)到HDFS,供JobTracker查询。如下代码,创建submitJobFile文件并写入job配置数据:

...
02
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
03
jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
04
...
05
// Write job file to JobTracker's fs
06
FSDataOutputStream out = FileSystem.create(fs, submitJobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
07
...
08
try {
09
jobCopy.writeXml(out);
10
} finally {
11
out.close();
12
}

前面已经知道Job提交目录,我们很容易就能得到job.xml文件的存储路径:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml。

最后,所有的数据都已经准备完成,JobClient就可以基于JobSubmissionProtocol协议方法submitJob,提交Job到JobTracker运行。

   
1935 次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   

并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理

GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...