您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
MapReduce V1:Job提交流程之JobTracker端分析
 
作者:Yanjun 来源:简单之美 发布于 2015-12-29
  2201  次浏览      16
 

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient、JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程。

上一篇我们分析了Job提交过程中JobClient端的处理流程(详见文章 MapReduce V1:Job提交流程之JobClient端分析),这里我们继续详细分析Job提交在JobTracker端的具体流程。通过阅读源码可以发现,这部分的处理逻辑还是有点复杂,经过梳理,更加细化清晰的流程,如下图所示:

上图中主要分为两大部分:一部分是JobClient基于RPC调用提交Job到JobTracker后,在JobTracker端触发TaskScheduler所注册的一系列Listener进行Job信息初始化;另一部分是JobTracker端监听Job队列的线程,监听到Job状态发生变更触发一系列Listener更新状态。我们从这两个方面展开分析:

JobTracker接收Job提交

JobTracker接收到JobClient提交的Job,在JobTracker端具体执行流程,描述如下:

JobClient基于JobSubmissionProtocol协议远程调用JobTracker的submitJob方法提交Job

JobTracker接收提交的Job,创建一个JobInProgress对象,将其放入内部维护的Map<JobID, JobInProgress> jobs队列中

触发JobQueueJobInProgressListener

执行JobQueueJobInProgressListener的jobAdded方法,创建JobSchedulingInfo对象,并放入到JobQueueJobInProgressListener内部维护的Map<JobSchedulingInfo, JobInProgress> jobQueue队列中

触发EagerTaskInitializationListener

执行EagerTaskInitializationListener的jobAdded方法,将JobInProgress对象加入到List<JobInProgress> jobInitQueue队列中

在JobTracker端使用TaskScheduler进行Job/Task的调度,可以通过mapred.jobtracker.taskScheduler配置所使用的TaskScheduler实现类,默认使用的实现类JobQueueTaskScheduler,如下所示:

// Create the scheduler
2
Class<? extends TaskScheduler> schedulerClass
3
= conf.getClass("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class, TaskScheduler.class);
4
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);

如果想要使用其他的TaskScheduler实现,可以在mapred-site.xml中配置mapred.jobtracker.taskScheduler的属性值,覆盖默认的调度策略即可。

在JobQueueTaskScheduler实现类中,注册了2个JobInProgressListener,JobInProgressListener是用来监听由JobClient端提交后在JobTracker端Job(在JobTracker端维护的JobInProgress)生命周期变化,并触发相应事件(jobAdded/jobUpdated/jobRemoved)的,如下所示:

01
protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
02
protected EagerTaskInitializationListener eagerTaskInitializationListener;
03
private float padFraction;
04

05
public JobQueueTaskScheduler() {
06
this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
07
}
08

09
@Override
10
public synchronized void start() throws IOException {
11
super.start();
12
taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); // taskTrackerManager是JobTracker的引用
13
eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
14
eagerTaskInitializationListener.start();
15
taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener);
16
}

JobTracker维护一个List<JobInProgressListener> jobInProgressListeners队列,在TaskScheduler(默认JobQueueTaskScheduler )启动的时候向JobTracker注册。在JobClient提交Job后,在JobTracker段创建一个对应的JobInProgress对象,并将其放入到jobs队列后,触发这一组JobInProgressListener的jobAdded方法。

JobTracker管理Job提交

JobTracker接收到提交的Job后,需要对提交的Job进行初始化操作,具体流程如下所示:

EagerTaskInitializationListener.JobInitManager线程监控EagerTaskInitializationListener内部的List<JobInProgress> jobInitQueue队列

加载一个EagerTaskInitializationListener.InitJob线程去初始化Job

在EagerTaskInitializationListener.InitJob线程中,调用JobTracker的initJob方法初始化Job

调用JobInProgress的initTasks方法初始化该Job对应的Tasks

从HDFS读取该Job对应的splits信息,创建MapTask和ReduceTask(在JobTracker端维护的Task实际上是TaskInProgress)

Job状态变更,触发JobQueueJobInProgressListener

如果Job优先级(Priority)/开始时间发生变更,则对Map<JobSchedulingInfo, JobInProgress> jobQueue队列进行重新排序;如果Job完成,则将Job从jobQueue队列中移除

Job状态变更,触发EagerTaskInitializationListener

如果Job优先级(Priority)/开始时间发生变更,则对List<JobInProgress> jobInitQueue队列进行重新排序

下面,我们分析的Job初始化,以及Task初始化,都是在JobTracker端执行的工作,主要是为了管理Job和Task的运行,创建了对应的数据结构,Job对应JobInProgress,Task对应TaskInProgress。我们分析说明如下:

Job初始化

JobTracker接收到JobClient提交的Job,在放到JobTracker的Map<JobID, JobInProgress> jobs队列后,触发2个JobInProgressListener执行jobAdded方法,首先会放到EagerTaskInitializationListener的List<JobInProgress> jobInitQueue队列中。在EagerTaskInitializationListener内部,有一个内部线程类JobInitManager在监控jobInitQueue队列,如果有新的JobInProgress对象加入到队列,则取出并启动一个新的初始化线程InitJob去初始化该Job,代码如下所示:

class JobInitManager implements Runnable {
02

03
public void run() {
04
JobInProgress job = null;
05
while (true) {
06
try {
07
synchronized (jobInitQueue) {
08
while (jobInitQueue.isEmpty()) {
09
jobInitQueue.wait();
10
}
11
job = jobInitQueue.remove(0); // 取出JobInProgress
12
}
13
threadPool.execute(new InitJob(job)); // 创建一个InitJob线程去初始化该JobInProgress
14
} catch (InterruptedException t) {
15
LOG.info("JobInitManagerThread interrupted.");
16
break;
17
}
18
}
19
LOG.info("Shutting down thread pool");
20
threadPool.shutdownNow();
21
}
22
}

然后,在InitJob线程中,调用JobTracker的initJob方法初始化Job,如下所示:

class InitJob implements Runnable {
02

03
private JobInProgress job;
04

05
public InitJob(JobInProgress job) {
06
this.job = job;
07
}
08

09
public void run() {
10
ttm.initJob(job); // TaskTrackerManager ttm,调用JobTracker的initJob方法初始化
11
}
12
}

JobTracker中的initJob方法的主要逻辑,如下所示:

JobStatus prevStatus = (JobStatus)job.getStatus().clone();
02
LOG.info("Initializing " + job.getJobID());
03
job.initTasks(); // 调用JobInProgress的initTasks方法初始化Task
04
// Inform the listeners if the job state has changed
05
// Note : that the job will be in PREP state.
06
JobStatus newStatus = (JobStatus)job.getStatus().clone();
07
if (prevStatus.getRunState() != newStatus.getRunState()) {
08
JobStatusChangeEvent event =
09
new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
10
newStatus);
11
synchronized (JobTracker.this) {
12
updateJobInProgressListeners(event); // 更新Job相关队列的状态
13
}
14
}

实际上,在JobTracker中的initJob方法中最核心的逻辑,就是初始化组成该Job的MapTask和ReduceTask,它们在JobTracker端都抽象为TaskInProgress。

初始化Task

在JobClient提交Job的过程中,已经将该Job所对应的资源复制到HDFS,在JobTracker端需要读取这些信息来创建MapTask和ReduceTask。我们回顾一下:默认情况下,split和对应的元数据存储路径分别为/tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.split和/tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.splitmetainfo,在创建MapTask和ReduceTask只需要split的元数据信息即可,我们看一下job.splitmetainfo文件存储的数据格式如下图所示:

上图中,META_SPLIT_FILE_HEADER的值为META-SPL,版本version的值为1,numSplits的值根据实际Job输入split大小计算的到,SplitMetaInfo包括的信息为split所存放的节点位置个数、所有的节点位置信息、split在文件中的起始偏移量、split数据的长度。有了这些描述信息,JobTracker就可以知道一个Job需要创建几个MapTask,实现代码如下所示:

TaskSplitMetaInfo[] splits = createSplits(jobId);
2
...
3
numMapTasks = splits.length;
4
...
5
maps = new TaskInProgress[numMapTasks]; // MapTask在JobTracker的表示为TaskInProgress
6
for(int i=0; i < numMapTasks; ++i) {
7
inputLength += splits[i].getInputDataLength();
8
maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i, numSlotsPerMap);
9
}

而ReduceTask的个数,根据用户在配置Job时指定的Reduce的个数,创建ReduceTask的代码,如下所示:

//
2
// Create reduce tasks
3
//
4
this.reduces = new TaskInProgress[numReduceTasks];
5
for (int i = 0; i < numReduceTasks; i++) {
6
reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf, this, numSlotsPerReduce);
7
nonRunningReduces.add(reduces[i]);
8
}

除了创建MapTask和ReduceTask之外,还会创建setup和cleanup task,每个Job的MapTask和ReduceTask各对应一个,即共计2个setup task和2个cleanup task。setup task用来初始化MapTask/ReduceTask,而cleanup task用来清理MapTask/ReduceTask。创建setup和cleanup task,代码如下所示:

// create cleanup two cleanup tips, one map and one reduce.
02
cleanup = new TaskInProgress[2]; // cleanup task,map对应一个,reduce对应一个
03

04
// cleanup map tip. This map doesn't use any splits. Just assign an empty split.
05
TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
06
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1);
07
cleanup[0].setJobCleanupTask();
08

09
// cleanup reduce tip.
10
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this, 1);
11
cleanup[1].setJobCleanupTask();
12

13
// create two setup tips, one map and one reduce.
14
setup = new TaskInProgress[2]; // setup task,map对应一个,reduce对应一个
15

16
// setup map tip. This map doesn't use any split. Just assign an empty split.
17
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1);
18
setup[0].setJobSetupTask();
19

20
// setup reduce tip.
21
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this, 1);
22
setup[1].setJobSetupTask();

一个Job在JobInProgress中进行初始化Task,这里初始化Task使得该Job满足被调度的要求,比如,知道一个Job有哪些Task组成,每个Task对应哪个split等等。在初始化完成后,置一个Task初始化完成标志,如下所示:

synchronized(jobInitKillStatus){
02
jobInitKillStatus.initDone = true;
03

04
// set this before the throw to make sure cleanup works properly
05
tasksInited = true; // 置Task初始化完成标志
06

07
if(jobInitKillStatus.killed) {
08
throw new KillInterruptedException("Job " + jobId + " killed in init");
09
}
10
}

在置tasksInited = true;后,该JobInProgress就可以被TaskScheduler进行调度了,调度时,是以Task(MapTask/ReduceTask)为单位分派给TaskTracker。而对于哪些TaskTracker可以运行Task,需要通过TaskTracker向JobTracker周期性发送的心跳得到TaskTracker的健康状况信息、节点资源信息等来确定,是否该TaskTracker可以运行一个Job的一个或多个Task。

   
2201 次浏览       16
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划
信息架构建模(基于UML+EA)3-21[北京]
软件架构设计师 3-21[北京]
图数据库与知识图谱 3-25[北京]
业务架构设计 4-11[北京]
SysML和EA系统设计与建模 4-22[北京]
DoDAF规范、模型与实例 5-23[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   

并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理

GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...