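We analyze the MapReduce V1 processing flow based on the Hadoop 1.2.1 source code. The MapReduce V1 implementation has three main distributed processes (roles): JobClient, JobTracker and TaskTracker. We follow the actual work each of these three roles performs as our main thread and analyze the processing flow alongside the source code.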
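In the previous article we analyzed how the JobClient side handles Job submission (see "MapReduce V1: Job Submission Flow, JobClient Side"). Here we continue with a detailed look at what happens on the JobTracker side when a Job is submitted. Reading the source shows that this part of the logic is fairly involved; after sorting it out, the more detailed flow is shown in the figure below: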

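The figure is divided into two main parts. In the first, after JobClient submits the Job to JobTracker via RPC, JobTracker triggers the series of Listeners registered by the TaskScheduler to initialize the Job information. In the second, a thread on the JobTracker side monitors the Job queue, and when a Job's state changes it triggers a series of Listeners to update their state. We analyze these two parts in turn: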
JobTracker Receives the Job Submission
When JobTracker receives a Job submitted by JobClient, it processes it as follows:
1. JobClient remotely invokes JobTracker's submitJob method over the JobSubmissionProtocol to submit the Job.
2. JobTracker receives the submitted Job, creates a JobInProgress object, and puts it into its internally maintained Map<JobID, JobInProgress> jobs.
3. JobQueueJobInProgressListener is triggered: its jobAdded method creates a JobSchedulingInfo object and puts it into the Map<JobSchedulingInfo, JobInProgress> jobQueue maintained inside JobQueueJobInProgressListener.
4. EagerTaskInitializationListener is triggered: its jobAdded method adds the JobInProgress object to the List<JobInProgress> jobInitQueue.
On the JobTracker side, Job/Task scheduling is performed by a TaskScheduler. The implementation class is configured through mapred.jobtracker.taskScheduler; the default is JobQueueTaskScheduler, as shown below:
```java
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass
    = conf.getClass("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
```
To use a different TaskScheduler implementation, set the mapred.jobtracker.taskScheduler property in mapred-site.xml; this overrides the default scheduling policy.
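The conf.getClass plus ReflectionUtils.newInstance pair above is a "configurable class with a default" pattern. The sketch below reproduces that pattern using only java.util.Properties and standard reflection, so it runs without Hadoop. SketchScheduler, SketchFifoScheduler, SketchFairScheduler and SchedulerFactory are hypothetical names for illustration, not Hadoop classes.

```java
import java.util.Properties;

// Hypothetical stand-in for Hadoop's TaskScheduler base class.
abstract class SketchScheduler {
    abstract String name();
}

// Hypothetical default implementation (plays the role of JobQueueTaskScheduler).
class SketchFifoScheduler extends SketchScheduler {
    String name() { return "fifo"; }
}

// Hypothetical alternative a user might configure instead.
class SketchFairScheduler extends SketchScheduler {
    String name() { return "fair"; }
}

class SchedulerFactory {
    static final String KEY = "mapred.jobtracker.taskScheduler";

    // Look up the configured class name, fall back to the default class,
    // verify the type, then instantiate it through its no-arg constructor.
    static SketchScheduler create(Properties conf, Class<? extends SketchScheduler> defaultClass) {
        try {
            String className = conf.getProperty(KEY);
            Class<?> raw = (className == null) ? defaultClass : Class.forName(className);
            if (!SketchScheduler.class.isAssignableFrom(raw)) {
                throw new IllegalArgumentException(raw.getName() + " is not a SketchScheduler");
            }
            return raw.asSubclass(SketchScheduler.class).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create scheduler", e);
        }
    }
}
```

Keeping the default in code and the override in configuration lets a cluster switch schedulers without rebuilding anything, which is what the mapred-site.xml override relies on.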
JobQueueTaskScheduler registers two JobInProgressListeners. A JobInProgressListener watches the lifecycle of a Job on the JobTracker side (where it is represented by a JobInProgress) after JobClient submits it, and is notified of the corresponding events (jobAdded/jobUpdated/jobRemoved), as shown below:
```java
protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
protected EagerTaskInitializationListener eagerTaskInitializationListener;
private float padFraction;

public JobQueueTaskScheduler() {
  this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
}

@Override
public synchronized void start() throws IOException {
  super.start();
  taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); // taskTrackerManager refers to the JobTracker
  eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
  eagerTaskInitializationListener.start();
  taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener);
}
```
JobTracker maintains a List<JobInProgressListener> jobInProgressListeners; the listeners are registered with JobTracker when the TaskScheduler (JobQueueTaskScheduler by default) starts. After JobClient submits a Job, JobTracker creates the corresponding JobInProgress object, puts it into the jobs map, and then calls jobAdded on each of these JobInProgressListeners.
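This register-then-fan-out mechanism is essentially the observer pattern. The simplified sketch below models it with hypothetical SketchJob, SketchJobListener and SketchJobTracker classes; the real JobInProgressListener callbacks have richer signatures, which this sketch reduces to a single job object. In this sketch listeners are notified in registration order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for JobInProgress.
class SketchJob {
    final String jobId;
    SketchJob(String jobId) { this.jobId = jobId; }
}

// Hypothetical simplified version of the JobInProgressListener callbacks.
interface SketchJobListener {
    void jobAdded(SketchJob job);
    void jobUpdated(SketchJob job);
    void jobRemoved(SketchJob job);
}

class SketchJobTracker {
    private final Map<String, SketchJob> jobs = new HashMap<>();
    private final List<SketchJobListener> jobInProgressListeners = new ArrayList<>();

    // What a scheduler's start() would call to register its listeners.
    synchronized void addJobInProgressListener(SketchJobListener listener) {
        jobInProgressListeners.add(listener);
    }

    // Submit path: record the job first, then fan the event out to every listener.
    synchronized void submitJob(String jobId) {
        SketchJob job = new SketchJob(jobId);
        jobs.put(jobId, job);
        for (SketchJobListener listener : jobInProgressListeners) {
            listener.jobAdded(job);
        }
    }

    synchronized int jobCount() { return jobs.size(); }
}

// Test helper: appends "name:event:jobId" for every callback it receives.
class RecordingListener implements SketchJobListener {
    private final String name;
    private final List<String> log;
    RecordingListener(String name, List<String> log) { this.name = name; this.log = log; }
    public void jobAdded(SketchJob job) { log.add(name + ":added:" + job.jobId); }
    public void jobUpdated(SketchJob job) { log.add(name + ":updated:" + job.jobId); }
    public void jobRemoved(SketchJob job) { log.add(name + ":removed:" + job.jobId); }
}
```

Registering a "jobQueue" listener before an "eagerInit" listener, as JobQueueTaskScheduler.start() does with its two real listeners, means a submitted job reaches them in that order here.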
JobTracker Manages the Submitted Job
After JobTracker receives a submitted Job, it must initialize it. The flow is as follows:
1. The EagerTaskInitializationListener.JobInitManager thread monitors the List<JobInProgress> jobInitQueue inside EagerTaskInitializationListener.
2. For each queued Job, it hands an EagerTaskInitializationListener.InitJob task to a thread pool to initialize the Job.
3. The InitJob task calls JobTracker's initJob method to initialize the Job.
4. initJob calls JobInProgress's initTasks method to initialize the Job's Tasks.
5. initTasks reads the Job's split information from HDFS and creates the MapTasks and ReduceTasks (on the JobTracker side, a Task is actually represented by a TaskInProgress).
6. When the Job's state changes, JobQueueJobInProgressListener is notified: if the Job's priority or start time changed, the Map<JobSchedulingInfo, JobInProgress> jobQueue is re-sorted; if the Job has completed, it is removed from jobQueue.
7. When the Job's state changes, EagerTaskInitializationListener is also notified: if the Job's priority or start time changed, the List<JobInProgress> jobInitQueue is re-sorted.
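Why does a priority or start-time change force a re-sort? Once an entry sits in a sorted map such as TreeMap, its position is fixed by its key, so changing the priority means removing the entry under the old key and inserting it under a new one. The sketch below assumes the jobQueue order is priority first, then start time, then job ID; SchedKey, SketchPriority and SketchJobQueue are hypothetical names standing in for JobSchedulingInfo, JobPriority and JobQueueJobInProgressListener.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Declared highest first, so the enum's natural order puts VERY_HIGH at the head.
enum SketchPriority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

// Hypothetical immutable sort key playing the role of JobSchedulingInfo.
final class SchedKey {
    final SketchPriority priority;
    final long startTime;
    final String jobId;

    SchedKey(SketchPriority priority, long startTime, String jobId) {
        this.priority = priority;
        this.startTime = startTime;
        this.jobId = jobId;
    }
}

class SketchJobQueue {
    // Assumed ordering: priority, then start time, then job ID as a tie-breaker.
    static final Comparator<SchedKey> ORDER =
            Comparator.comparing((SchedKey k) -> k.priority)
                      .thenComparingLong(k -> k.startTime)
                      .thenComparing(k -> k.jobId);

    private final TreeMap<SchedKey, String> jobQueue = new TreeMap<>(ORDER);

    void jobAdded(SchedKey key) { jobQueue.put(key, key.jobId); }

    // A TreeMap never re-sorts an entry in place: remove under the old key, re-insert under the new one.
    SchedKey priorityChanged(SchedKey oldKey, SketchPriority newPriority) {
        String job = jobQueue.remove(oldKey);
        SchedKey newKey = new SchedKey(newPriority, oldKey.startTime, oldKey.jobId);
        if (job != null) {
            jobQueue.put(newKey, job);
        }
        return newKey;
    }

    // A finished job simply leaves the queue.
    void jobCompleted(SchedKey key) { jobQueue.remove(key); }

    List<String> order() { return new ArrayList<>(jobQueue.values()); }
}
```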
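The Job initialization and Task initialization analyzed below both run on the JobTracker side. Their main purpose is to manage the execution of Jobs and Tasks, and for this JobTracker creates corresponding data structures: a Job is represented by a JobInProgress and a Task by a TaskInProgress. We explain them as follows: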
Job Initialization
After JobTracker receives the Job submitted by JobClient and puts it into its Map<JobID, JobInProgress> jobs, it calls jobAdded on the two JobInProgressListeners, which places the Job into EagerTaskInitializationListener's List<JobInProgress> jobInitQueue. Inside EagerTaskInitializationListener, an inner thread class, JobInitManager, monitors jobInitQueue; whenever a new JobInProgress object is added, it takes the job off the queue and starts a new initialization task, InitJob, to initialize it. The code is as follows:
```java
class JobInitManager implements Runnable {

  public void run() {
    JobInProgress job = null;
    while (true) {
      try {
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            jobInitQueue.wait();
          }
          job = jobInitQueue.remove(0); // take the JobInProgress off the queue
        }
        threadPool.execute(new InitJob(job)); // run an InitJob task to initialize this JobInProgress
      } catch (InterruptedException t) {
        LOG.info("JobInitManagerThread interrupted.");
        break;
      }
    }
    LOG.info("Shutting down thread pool");
    threadPool.shutdownNow();
  }
}
```
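The loop above is only the consumer half of a wait/notify hand-off; the producer half is the listener's jobAdded, which has to add to jobInitQueue and wake the waiting thread while holding the same lock. The sketch below puts both halves together so they can run outside Hadoop. SketchInitListener is a hypothetical name, jobs are plain strings, and the initJob callback stands in for ttm.initJob(job).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SketchInitListener {
    private final List<String> jobInitQueue = new ArrayList<>();
    private final ExecutorService threadPool;
    private final Consumer<String> initJob; // stands in for ttm.initJob(job)
    private final Thread jobInitManager;

    SketchInitListener(int poolSize, Consumer<String> initJob) {
        this.threadPool = Executors.newFixedThreadPool(poolSize);
        this.initJob = initJob;
        this.jobInitManager = new Thread(this::runManager, "JobInitManager");
        this.jobInitManager.setDaemon(true);
    }

    void start() { jobInitManager.start(); }

    // Producer half: enqueue under the lock and wake the manager thread.
    void jobAdded(String job) {
        synchronized (jobInitQueue) {
            jobInitQueue.add(job);
            jobInitQueue.notifyAll();
        }
    }

    // Consumer half: same shape as JobInitManager.run() above.
    private void runManager() {
        while (true) {
            try {
                String job;
                synchronized (jobInitQueue) {
                    while (jobInitQueue.isEmpty()) {
                        jobInitQueue.wait();
                    }
                    job = jobInitQueue.remove(0);
                }
                threadPool.execute(() -> initJob.accept(job));
            } catch (InterruptedException e) {
                break;
            }
        }
        threadPool.shutdownNow();
    }

    // Interrupt the manager (which then shuts the pool down) and wait for it to exit.
    void terminate() {
        jobInitManager.interrupt();
        try {
            jobInitManager.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Handing each job to a pool instead of initializing it inline keeps one slow initialization (for example, one that reads a large amount of split metadata) from blocking the jobs queued behind it.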
Next, the InitJob task calls JobTracker's initJob method to initialize the Job:
```java
class InitJob implements Runnable {

  private JobInProgress job;

  public InitJob(JobInProgress job) {
    this.job = job;
  }

  public void run() {
    ttm.initJob(job); // ttm is the TaskTrackerManager, i.e. the JobTracker; this calls JobTracker.initJob
  }
}
```
The main logic of JobTracker's initJob method is as follows:
```java
JobStatus prevStatus = (JobStatus)job.getStatus().clone();
LOG.info("Initializing " + job.getJobID());
job.initTasks(); // call JobInProgress.initTasks to initialize the Tasks
// Inform the listeners if the job state has changed
// Note : that the job will be in PREP state.
JobStatus newStatus = (JobStatus)job.getStatus().clone();
if (prevStatus.getRunState() != newStatus.getRunState()) {
  JobStatusChangeEvent event =
    new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
        newStatus);
  synchronized (JobTracker.this) {
    updateJobInProgressListeners(event); // update the state held by the Job-related queues
  }
}
```
In fact, the core of JobTracker's initJob method is initializing the MapTasks and ReduceTasks that make up the Job; on the JobTracker side both are abstracted as TaskInProgress.
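initJob snapshots the JobStatus with clone() before and after initTasks and publishes a RUN_STATE_CHANGED event only when the two snapshots differ. If the status object is updated in place (which the clone() calls suggest), the copies are essential: comparing a reference with itself would never detect a change. A minimal sketch with hypothetical SketchJobStatus, SketchRunState and SketchInitNotifier types follows; the run-state values are chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

enum SketchRunState { PREP, RUNNING, SUCCEEDED, FAILED, KILLED }

// Hypothetical mutable status object that init code updates in place.
class SketchJobStatus implements Cloneable {
    SketchRunState runState = SketchRunState.PREP;

    @Override
    public SketchJobStatus clone() {
        try {
            return (SketchJobStatus) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: this class implements Cloneable
        }
    }
}

class SketchInitNotifier {
    final List<String> events = new ArrayList<>();

    // Snapshot, run the init step, snapshot again, and notify only on a run-state change.
    void initJob(SketchJobStatus status, Consumer<SketchJobStatus> initTasks) {
        SketchJobStatus prev = status.clone();
        initTasks.accept(status);
        SketchJobStatus next = status.clone();
        if (prev.runState != next.runState) {
            synchronized (this) { // mirrors synchronized (JobTracker.this)
                events.add("RUN_STATE_CHANGED " + prev.runState + " -> " + next.runState);
            }
        }
    }
}
```

In the normal case the job stays in PREP after initTasks (as the source comment notes), so no event is sent.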
Task Initialization
While submitting the Job, JobClient has already copied the Job's resources to HDFS, and JobTracker reads this information to create the MapTasks and ReduceTasks. To recap: by default, the split data and its metadata are stored at /tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.split and /tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.splitmetainfo respectively. Creating the MapTasks and ReduceTasks requires only the split metadata. The data format of the job.splitmetainfo file is shown in the figure below:

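In the figure, META_SPLIT_FILE_HEADER has the value META-SPL, version is 1, and numSplits is the number of splits computed from the Job's actual input. Each SplitMetaInfo record contains the number of nodes on which the split is located, all of those node locations, the split's starting offset in the file, and the length of the split's data. With this information, JobTracker knows how many MapTasks a Job needs (one per split). The code is as follows: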
```java
TaskSplitMetaInfo[] splits = createSplits(jobId);
// ...
numMapTasks = splits.length;
// ...
maps = new TaskInProgress[numMapTasks]; // on the JobTracker, each MapTask is represented by a TaskInProgress
for(int i=0; i < numMapTasks; ++i) {
  inputLength += splits[i].getInputDataLength();
  maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i, numSlotsPerMap);
}
```
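To make the file layout concrete, the sketch below writes and reads a file with the same logical fields as job.splitmetainfo: the META-SPL header, version 1, numSplits, and per split the location count, the locations, the start offset and the data length. It rests on an explicit assumption: it uses plain DataOutputStream encodings (writeInt, writeUTF, writeLong), whereas Hadoop serializes the real file with its own Writable helpers, so these bytes are not compatible with an actual job.splitmetainfo. SketchSplitMeta and SketchSplitMetaFile are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical mirror of the per-split fields described above.
final class SketchSplitMeta {
    final String[] locations;
    final long startOffset;
    final long inputDataLength;

    SketchSplitMeta(String[] locations, long startOffset, long inputDataLength) {
        this.locations = locations;
        this.startOffset = startOffset;
        this.inputDataLength = inputDataLength;
    }
}

final class SketchSplitMetaFile {
    static final byte[] HEADER = "META-SPL".getBytes(StandardCharsets.US_ASCII);
    static final int VERSION = 1;

    // Layout: header, version, numSplits, then per split: #locations, locations, offset, length.
    static byte[] write(SketchSplitMeta[] splits) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.write(HEADER);
            out.writeInt(VERSION);
            out.writeInt(splits.length);
            for (SketchSplitMeta s : splits) {
                out.writeInt(s.locations.length);
                for (String loc : s.locations) {
                    out.writeUTF(loc);
                }
                out.writeLong(s.startOffset);
                out.writeLong(s.inputDataLength);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static SketchSplitMeta[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte[] header = new byte[HEADER.length];
            in.readFully(header);
            if (!Arrays.equals(header, HEADER)) {
                throw new IOException("not a split meta file");
            }
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("unsupported version " + version);
            }
            SketchSplitMeta[] splits = new SketchSplitMeta[in.readInt()];
            for (int i = 0; i < splits.length; i++) {
                String[] locations = new String[in.readInt()];
                for (int j = 0; j < locations.length; j++) {
                    locations[j] = in.readUTF();
                }
                long start = in.readLong();
                long length = in.readLong();
                splits[i] = new SketchSplitMeta(locations, start, length);
            }
            return splits;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The number of records read back is what plays the role of numMapTasks in the excerpt above.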
The number of ReduceTasks, on the other hand, is the number of reduces the user specified when configuring the Job. The code that creates the ReduceTasks is as follows:
```java
//
// Create reduce tasks
//
this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
  reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf, this, numSlotsPerReduce);
  nonRunningReduces.add(reduces[i]);
}
```
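Side by side, the two loops show the asymmetry: the number of map-side TaskInProgress objects is fixed by the input (one per split), while the reduce count comes straight from the job configuration and reduce TIPs carry no split. The sketch below captures only that shape, using hypothetical SketchTip and SketchTaskLayout types and plain split lengths in place of TaskSplitMetaInfo.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal stand-in for a TaskInProgress.
final class SketchTip {
    final boolean isMap;
    final int partition;
    final long inputLength; // 0 for reduces, which have no split

    SketchTip(boolean isMap, int partition, long inputLength) {
        this.isMap = isMap;
        this.partition = partition;
        this.inputLength = inputLength;
    }
}

final class SketchTaskLayout {
    final SketchTip[] maps;
    final SketchTip[] reduces;
    final List<SketchTip> nonRunningReduces = new ArrayList<>();
    final long inputLength;

    SketchTaskLayout(long[] splitLengths, int numReduceTasks) {
        if (numReduceTasks < 0) {
            throw new IllegalArgumentException("numReduceTasks must be >= 0");
        }
        int numMapTasks = splitLengths.length; // one map task per split
        maps = new SketchTip[numMapTasks];
        long total = 0;
        for (int i = 0; i < numMapTasks; ++i) {
            total += splitLengths[i];
            maps[i] = new SketchTip(true, i, splitLengths[i]);
        }
        inputLength = total;

        reduces = new SketchTip[numReduceTasks]; // count comes from the job configuration
        for (int i = 0; i < numReduceTasks; i++) {
            reduces[i] = new SketchTip(false, i, 0L);
            nonRunningReduces.add(reduces[i]); // every reduce starts out not running
        }
    }
}
```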
Besides the MapTasks and ReduceTasks, setup and cleanup tasks are also created: one map-side and one reduce-side variant of each, i.e. 2 setup tasks and 2 cleanup tasks in total. The setup task prepares the Job before its MapTasks/ReduceTasks run, and the cleanup task cleans up after them. The code that creates the setup and cleanup tasks is as follows:
```java
// create cleanup two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2]; // cleanup tasks: one for the map side, one for the reduce side

// cleanup map tip. This map doesn't use any splits. Just assign an empty split.
TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1);
cleanup[0].setJobCleanupTask();

// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this, 1);
cleanup[1].setJobCleanupTask();

// create two setup tips, one map and one reduce.
setup = new TaskInProgress[2]; // setup tasks: one for the map side, one for the reduce side

// setup map tip. This map doesn't use any split. Just assign an empty split.
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1);
setup[0].setJobSetupTask();

// setup reduce tip.
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this, 1);
setup[1].setJobSetupTask();
```
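Note the partition numbers passed to the four extra TaskInProgress objects: the cleanup and setup tasks take the indices immediately after the regular tasks (numMapTasks and numMapTasks + 1 on the map side, numReduceTasks and numReduceTasks + 1 on the reduce side), so they never collide with a real map or reduce index. The sketch below simply enumerates that numbering; SketchTaskNumbering and its "kind#partition" labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SketchTaskNumbering {
    // Lists every TIP of a job as "kind#partition", following the indices passed in the excerpt above.
    static List<String> allTips(int numMapTasks, int numReduceTasks) {
        List<String> tips = new ArrayList<>();
        for (int i = 0; i < numMapTasks; i++) {
            tips.add("map#" + i);
        }
        tips.add("map#" + numMapTasks + "(cleanup)");     // cleanup[0]
        tips.add("map#" + (numMapTasks + 1) + "(setup)"); // setup[0]
        for (int i = 0; i < numReduceTasks; i++) {
            tips.add("reduce#" + i);
        }
        tips.add("reduce#" + numReduceTasks + "(cleanup)");     // cleanup[1]
        tips.add("reduce#" + (numReduceTasks + 1) + "(setup)"); // setup[1]
        return tips;
    }
}
```

For a job with 2 maps and 1 reduce this yields 7 entries: the 3 regular tasks plus 2 setup and 2 cleanup tasks.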
Ò»¸öJobÔÚJobInProgressÖнøÐгõʼ»¯Task£¬ÕâÀï³õʼ»¯TaskʹµÃ¸ÃJobÂú×ã±»µ÷¶ÈµÄÒªÇ󣬱ÈÈ磬֪µÀÒ»¸öJobÓÐÄÄЩTask×é³É£¬Ã¿¸öTask¶ÔÓ¦ÄĸösplitµÈµÈ¡£ÔÚ³õʼ»¯Íê³Éºó£¬ÖÃÒ»¸öTask³õʼ»¯Íê³É±êÖ¾£¬ÈçÏÂËùʾ£º
```java
synchronized(jobInitKillStatus){
  jobInitKillStatus.initDone = true;

  // set this before the throw to make sure cleanup works properly
  tasksInited = true; // mark Task initialization as done

  if(jobInitKillStatus.killed) {
    throw new KillInterruptedException("Job " + jobId + " killed in init");
  }
}
```
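The synchronized block above resolves a race between initialization and a kill request that arrives while initTasks is still running. A minimal sketch of that hand-shake follows, assuming a kill only records a flag that initialization checks when it finishes; SketchJobInit, SketchKilledInInit and the kill() method are hypothetical names.

```java
// Hypothetical exception playing the role of KillInterruptedException.
class SketchKilledInInit extends RuntimeException {
    SketchKilledInInit(String msg) { super(msg); }
}

class SketchJobInit {
    private final Object jobInitKillStatus = new Object();
    private final String jobId;
    private boolean initDone;
    private boolean killed;
    private volatile boolean tasksInited;

    SketchJobInit(String jobId) { this.jobId = jobId; }

    // Records the kill; returns true if it arrived before initialization finished.
    boolean kill() {
        synchronized (jobInitKillStatus) {
            killed = true;
            return !initDone;
        }
    }

    // Same ordering as the excerpt: mark done, set tasksInited, then report a pending kill.
    void finishInit() {
        synchronized (jobInitKillStatus) {
            initDone = true;
            tasksInited = true; // set before throwing so later cleanup still sees the tasks
            if (killed) {
                throw new SketchKilledInInit("Job " + jobId + " killed in init");
            }
        }
    }

    boolean tasksInited() { return tasksInited; }
}
```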
Once tasksInited = true has been set, the JobInProgress can be scheduled by the TaskScheduler. Scheduling hands work to TaskTrackers one Task (MapTask/ReduceTask) at a time. Whether a given TaskTracker can run one or more of a Job's Tasks is decided from the heartbeats each TaskTracker periodically sends to JobTracker, which report the TaskTracker's health, its node resources, and similar information.
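As a rough illustration of heartbeat-driven assignment, the sketch below hands a healthy TaskTracker at most as many pending tasks as it reports free slots, and gives an unhealthy one nothing. It is deliberately simplified and hypothetical (SketchHeartbeat, SketchSlotScheduler and the slot fields are invented for illustration) and ignores data locality and the load-balancing logic a real TaskScheduler such as JobQueueTaskScheduler applies.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical heartbeat payload: health plus free map/reduce slots.
final class SketchHeartbeat {
    final boolean healthy;
    final int freeMapSlots;
    final int freeReduceSlots;

    SketchHeartbeat(boolean healthy, int freeMapSlots, int freeReduceSlots) {
        this.healthy = healthy;
        this.freeMapSlots = freeMapSlots;
        this.freeReduceSlots = freeReduceSlots;
    }
}

final class SketchSlotScheduler {
    private final Deque<String> pendingMaps = new ArrayDeque<>();
    private final Deque<String> pendingReduces = new ArrayDeque<>();

    // Only jobs whose tasks are initialized (tasksInited == true) would be offered here.
    void addInitializedJob(List<String> maps, List<String> reduces) {
        pendingMaps.addAll(maps);
        pendingReduces.addAll(reduces);
    }

    // Called once per heartbeat; returns the tasks assigned to that TaskTracker.
    List<String> assignTasks(SketchHeartbeat hb) {
        List<String> assigned = new ArrayList<>();
        if (!hb.healthy) {
            return assigned;
        }
        for (int i = 0; i < hb.freeMapSlots && !pendingMaps.isEmpty(); i++) {
            assigned.add(pendingMaps.poll());
        }
        for (int i = 0; i < hb.freeReduceSlots && !pendingReduces.isEmpty(); i++) {
            assigned.add(pendingReduces.poll());
        }
        return assigned;
    }
}
```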