I hadn't planned to write this one, but I honestly have nothing better to do, and watching American TV shows all day gets boring. This chapter covers the implementation of Spark
on yarn. 1.0.0 is already a stable release, yet 1.0.1 is out as well, barely a month after 1.0.0 shipped; the updates come too fast for me to keep pace. This article still walks through the 1.0.0 code, so please stop asking me which version I am covering: every article published so far is based on the 1.0.0 code.
Back in the first chapter, "The spark-submit job submission process", we saw that in cluster mode the main class of Spark on yarn is org.apache.spark.deploy.yarn.Client. Okay, that is our number one target.
Submitting the job
Find the main function: all it does is call the run method, so let's go straight to run.
val appId = runApp()
monitorApplication(appId)
System.exit(0)
Run the app, monitor it, and finally exit. Let's look at runApp first.
def runApp(): ApplicationId = {
  // Validate the arguments: memory must not be below 384MB,
  // and there must be at least 1 Executor.
  validateArgs()
  // These two are methods of the parent class: initialize and start the Client.
  init(yarnConf)
  start()
  // Log cluster details (e.g. the number of NodeManagers, queue information).
  logClusterResourceDetails()

  // Prepare the submission request to the ResourceManager
  // (specifically its ApplicationsManager (ASM)). Get a new client application.
  val newApp = super.createApplication()
  val newAppResponse = newApp.getNewApplicationResponse()
  val appId = newAppResponse.getApplicationId()

  // Check whether the cluster has enough memory for this job.
  verifyClusterResources(newAppResponse)

  // Prepare resources and environment variables.
  // 1. Get the concrete path of the staging directory: /.sparkStaging/appId/
  val appStagingDir = getAppStagingDir(appId)
  // 2. Create the staging directory, set its permissions,
  //    and upload the jars needed at runtime.
  val localResources = prepareLocalResources(appStagingDir)
  // 3. Set up the environment variables needed at runtime.
  val launchEnv = setupLaunchEnv(localResources, appStagingDir)
  // 4. Set the runtime JVM options; if SPARK_USE_CONC_INCR_GC is set to true,
  //    the CMS garbage collector is used.
  val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)

  // Set up the application submission context.
  val appContext = newApp.getApplicationSubmissionContext()
  appContext.setApplicationName(args.appName)
  appContext.setQueue(args.amQueue)
  appContext.setAMContainerSpec(amContainer)
  appContext.setApplicationType("SPARK")

  // Set the ApplicationMaster's memory. Resource is the class that represents
  // resources; currently there are two kinds, CPU and memory.
  val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
  memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
  appContext.setResource(memoryResource)

  // Submit the Application.
  submitApp(appContext)
  appId
}
I won't dwell on monitorApplication: it keeps calling getApplicationReport to fetch the latest report, then calls getYarnApplicationState to read the current state, and exits once the state is FINISHED, FAILED or KILLED.
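For reference, here is a minimal sketch of what such a polling loop looks like against the yarn client api. The standalone YarnClient and the one-second interval are my own assumptions for illustration; in 1.0.0 the Client itself extends YarnClientImpl and calls getApplicationReport directly.

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Hypothetical standalone version of the monitoring loop.
def pollUntilDone(appId: ApplicationId): Unit = {
  val yarnClient = YarnClient.createYarnClient()
  yarnClient.init(new YarnConfiguration())
  yarnClient.start()
  var done = false
  while (!done) {
    Thread.sleep(1000) // the polling interval here is an assumption
    val report = yarnClient.getApplicationReport(appId)
    report.getYarnApplicationState match {
      case YarnApplicationState.FINISHED |
           YarnApplicationState.FAILED |
           YarnApplicationState.KILLED => done = true // terminal states: stop polling
      case _ => // still running, keep polling
    }
  }
}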
ApplicationMaster
Just look at the run method; the main function does only that one thing...
def run() {
  // Set the local dirs; by default yarn's YARN_LOCAL_DIRS is used first,
  // falling back to LOCAL_DIRS.
  System.setProperty("spark.local.dir", getLocalDirs())

  // Set the web ui port to be ephemeral for yarn so we don't conflict with
  // other spark processes running on the same box.
  System.setProperty("spark.ui.port", "0")

  // When running the AM, the Spark master is always "yarn-cluster".
  System.setProperty("spark.master", "yarn-cluster")

  // Register a shutdown hook with priority 30, the same as mapreduce's.
  // It is higher than HDFS's priority, because its job is to clean up
  // this application's staging directory on hdfs.
  ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

  appAttemptId = getApplicationAttemptId()
  // maxAppAttempts is set via yarn.resourcemanager.am.max-attempts, default 2.
  // So far it only seems to be used when cleaning up the staging directory.
  isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts

  amClient = AMRMClient.createAMRMClient()
  amClient.init(yarnConf)
  amClient.start()

  // Setup AmIpFilter for the SparkUI - do this before we start the UI.
  // The method's doc says yarn uses it to protect the ui, though it looks
  // more like setting up an ip proxy to me.
  addAmIpFilter()

  // Register this ApplicationMaster in the internal list.
  ApplicationMaster.register(this)

  // Security-related bits; disabled by default, which saves us trouble.
  val securityMgr = new SecurityManager(sparkConf)

  // Start the driver program.
  userThread = startUserClass()

  // Wait for the SparkContext to be instantiated, mainly by waiting for the
  // spark.driver.port property to be set.
  // Once the wait is over, instantiate a YarnAllocationHandler.
  waitForSparkContextInitialized()

  // Do this after Spark master is up and SparkContext is created so that
  // we can register the UI Url.
  // Register the current ApplicationMaster with yarn. At this point isFinished
  // must not be true; if it is, the program has already failed.
  synchronized {
    if (!isFinished) {
      registerApplicationMaster()
      registered = true
    }
  }

  // Request Containers to launch the Executors.
  allocateExecutors()

  // Wait for the program to finish.
  userThread.join()
  System.exit(0)
}
The run method does five main things:
1. Initialization.
2. Start the driver program (sketched right after this list).
3. Register the ApplicationMaster.
4. Allocate the Executors.
5. Wait for the program to finish.
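Step 2 deserves a quick detour: in cluster mode the driver is simply the user's main method run on a thread inside the ApplicationMaster's JVM. Below is a paraphrased sketch of startUserClass, not the verbatim 1.0.0 code; the args fields follow the yarn ApplicationMasterArguments but details may differ.

// Paraphrased sketch: launch the user's main method via reflection
// on a dedicated thread. That thread IS the driver in cluster mode.
private def startUserClass(): Thread = {
  val mainMethod = Class.forName(args.userClass, false,
    Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
  val t = new Thread {
    override def run() {
      mainMethod.invoke(null, args.userArgs.toArray)
    }
  }
  t.setName("Driver")
  t.start()
  t
}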
Let's focus on the executor allocation method.
private def allocateExecutors() {
  try {
    logInfo("Allocating " + args.numExecutors + " executors.")
    // Submit ContainerRequests to the ResourceManager at three levels:
    // specific host, same rack, and any machine.
    // The number of Containers requested may be larger than what is needed.
    yarnAllocator.addResourceRequests(args.numExecutors)

    // Exits the loop if the user thread exits.
    while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
      if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
        finishApplicationMaster(FinalApplicationStatus.FAILED,
          "max number of executor failures reached")
      }
      // Hand out the resources that came back, releasing any surplus.
      yarnAllocator.allocateResources()
      ApplicationMaster.incrementAllocatorLoop(1)
      Thread.sleep(100)
    }
  } finally {
    // In case of exceptions, etc - ensure that count is at least
    // ALLOCATOR_LOOP_WAIT_COUNT, so that the loop in
    // ApplicationMaster#sparkContextInitialized() breaks.
    ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
  }
  logInfo("All executors have launched.")

  // Start a thread for status reporting.
  if (userThread.isAlive) {
    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
    // We want to be reasonably responsive without causing too many requests to RM.
    val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
    // Must be <= timeoutInterval / 2.
    val interval = math.min(timeoutInterval / 2, schedulerInterval)
    launchReporterThread(interval)
  }
}
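The reporter thread is worth a glance too, because it is also where lost executors get replaced (see point 6 in the summary below). What follows is a paraphrased sketch of launchReporterThread rather than the verbatim 1.0.0 source; the counter names follow YarnAllocationHandler but may differ in detail.

// Paraphrased sketch: heartbeat to the RM and re-request lost executors.
private def launchReporterThread(interval: Long): Thread = {
  val t = new Thread {
    override def run() {
      while (userThread.isAlive) {
        // How many executors are missing, counting those still being allocated.
        val missing = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
          yarnAllocator.getNumPendingAllocate
        if (missing > 0) {
          yarnAllocator.addResourceRequests(missing) // ask the RM for replacements
        }
        // allocateResources doubles as the heartbeat that keeps the AM alive.
        yarnAllocator.allocateResources()
        Thread.sleep(interval)
      }
    }
  }
  t.setDaemon(true) // don't block JVM exit
  t.start()
  t
}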
Inside it we only need to look at the addResourceRequests and allocateResources methods.
First, the addResourceRequests method; I won't paste the code.
The Client submits Container requests to the ResourceManager at three levels: preferred machines, machines in the same rack, and any machine.
The preferred machines are the locations obtained from getPreferredLocations inside the RDD; if there are no preferred machines, the same-rack level doesn't apply either, and any machine will do.
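To make the three levels concrete, here is a small sketch using Hadoop's AMRMClient.ContainerRequest. The capability numbers, host and rack names are made up for illustration; amClient is the AMRMClient we saw in run() above, and the real YarnAllocationHandler computes all of this from the job's arguments.

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.Records

// Illustrative capability: executor memory plus overhead (example numbers).
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(1024 + 384)
val priority = Records.newRecord(classOf[Priority])
priority.setPriority(1)

// Level 1: pinned to preferred hosts (from the RDD's getPreferredLocations).
val hostRequest = new ContainerRequest(capability, Array("host1"), null, priority)
// Level 2: the racks those hosts live in.
val rackRequest = new ContainerRequest(capability, null, Array("/rack1"), priority)
// Level 3: anywhere in the cluster.
val anyRequest = new ContainerRequest(capability, null, null, priority)

Seq(hostRequest, rackRequest, anyRequest).foreach(amClient.addContainerRequest)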
ÏÂÃæÎÒÃǽÓ×Å¿´allocateResources·½·¨¡£
1¡¢°Ñ´ÓResourceManagerÖлñµÃµÄContainer½øÐÐÑ¡Ôñ£¬Ñ¡Ôñ˳ÐòÊǰ´ÕÕÇ°ÃæµÄ½éÉܵÄÈýÖÖÀà±ðÒÀ´Î½øÐУ¬ÓÅÏÈÑ¡Ôñ»úÆ÷
> ͬһ¸örackµÄ»úÆ÷ > ÈÎÒâ»úÆ÷¡£
2¡¢Ñ¡ÔñÁËContainerÖ®ºó£¬¸øÃ¿Ò»¸öContainer¶¼Æô¶¯Ò»¸öExecutorRunnerÒ»¶ÔÒ»ÌùÉí·þÎñ£¬¸øËü·¢ËÍÔËÐÐCoarseGrainedExecutorBackendµÄÃüÁî¡£
3¡¢ExecutorRunnerͨ¹ýNMClientÀ´ÏòNodeManager·¢ËÍÇëÇó¡£
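Here is a minimal sketch of that host-first, rack-next, any-last matching from step 1. The helpers hostOf and rackOf and the pendingHosts set are hypothetical stand-ins for YarnAllocationHandler's internal bookkeeping.

import org.apache.hadoop.yarn.api.records.Container

// Hypothetical helpers; the real code resolves racks via the cluster topology.
def hostOf(c: Container): String = c.getNodeId.getHost
def rackOf(host: String): String = "/default-rack"

// pendingHosts: the hosts we asked for, from the RDDs' preferred locations.
def matchContainers(allocated: Seq[Container], pendingHosts: Set[String])
    : (Seq[Container], Seq[Container], Seq[Container]) = {
  val pendingRacks = pendingHosts.map(rackOf)
  // First pass: containers that landed on a host we explicitly requested.
  val (hostMatched, rest) = allocated.partition(c => pendingHosts(hostOf(c)))
  // Second pass: containers on the same rack as a requested host.
  val (rackMatched, anyMatched) = rest.partition(c => pendingRacks(rackOf(hostOf(c))))
  // The remainder is used as off-rack allocation; genuinely surplus
  // containers are released back to the ResourceManager.
  (hostMatched, rackMatched, anyMatched)
}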
×ܽ᣺
°Ñ×÷Òµ·¢²¼µ½yarnÉÏÃæÈ¥Ö´ÐÐÕâ¿éÉæ¼°µ½µÄÀ಻¶à£¬Ö÷ÒªÊÇÉæ¼°µ½Client¡¢ApplicationMaster¡¢YarnAllocationHandler¡¢ExecutorRunnerÕâËĸöÀà¡£
1¡¢Client×÷ΪYarnµÄ¿Í»§¶Ë£¬¸ºÔðÏòYarn·¢ËÍÆô¶¯ApplicationMasterµÄÃüÁî¡£
2¡¢ApplicationMaster¾ÍÏñÏîÄ¿¾ÀíÒ»Ñù¸ºÔðÕû¸öÏîÄ¿ËùÐèÒªµÄ¹¤×÷£¬°üÀ¨ÇëÇó×ÊÔ´£¬·ÖÅä×ÊÔ´£¬Æô¶¯DriverºÍExecutor£¬ExecutorÆô¶¯Ê§°ÜµÄ´íÎó´¦Àí¡£
3¡¢ApplicationMasterµÄÇëÇó¡¢·ÖÅä×ÊÔ´ÊÇͨ¹ýYarnAllocationHandlerÀ´½øÐеġ£
4¡¢ContainerÑ¡ÔñµÄ˳ÐòÊÇ£ºÓÅÏÈÑ¡Ôñ»úÆ÷ > ͬһ¸örackµÄ»úÆ÷ > ÈÎÒâ»úÆ÷¡£
5¡¢ExecutorRunnerÖ»¸ºÔðÏòContainer·¢ËÍÆô¶¯CoarseGrainedExecutorBackendµÄÃüÁî¡£
6¡¢ExecutorµÄ´íÎó´¦ÀíÊÇÔÚApplicationMasterµÄlaunchReporterThread·½·¨ÀïÃæ£¬ËüÆô¶¯µÄÏ̳߳ýÁ˱¨¸æÔËÐÐ״̬£¬»¹»á¼à¿ØExecutorµÄÔËÐУ¬Ò»µ©·¢ÏÖÓжªÊ§µÄExecutor¾ÍÖØÐÂÇëÇó¡£
7¡¢ÔÚyarnĿ¼Ï¿´µ½µÄÃû³ÆÀïÃæ´øÓÐYarnClientµÄÊÇÊôÓÚyarn-clientģʽµÄÀ࣬ʵÏÖºÍÇ°ÃæµÄÒ²²î²»¶à¡£
ÆäËüµÄÄÚÈݸü¶àÊÇYarnµÄ¿Í»§¶ËapiʹÓã¬ÎÒÒ²²»Ì«»á£¬Ö»ÊÇ¿´µ½ÁËÄܶ®¸öÒâ˼£¬¹þ¹þ¡£ |