SparkÔ´ÂëϵÁУ¨Æß£©Spark on yarn¾ßÌåʵÏÖ
 
Author: Cen Yuhai's blog, via Huolongguo Software. Published 2014-11-11

I hadn't planned to write this one, but I honestly have time on my hands, and watching American TV shows all day gets boring. This chapter covers the implementation of Spark on yarn. 1.0.0 is already a stable release, yet 1.0.1 is out barely a month after 1.0.0 shipped; the updates come too fast for me to keep pace with. This article still walks through the 1.0.0 code, so please stop asking which version I'm covering: every article published so far in this series is based on the 1.0.0 code.

ÔÚµÚÒ»Õ¡¶spark-submitÌá½»×÷Òµ¹ý³Ì¡·µÄʱºò£¬ÎÒÃǽ²¹ýSpark on yarnµÄÔÚclusterģʽÏÂËüµÄmain classÊÇorg.apache.spark.deploy.yarn.Client¡£okay£¬Õâ¸ö¾ÍÊÇÎÒÃǵÄÍ·ºÅÄ¿±ê¡£

Submitting a job

Find the main function; it simply calls the run method, so let's look at run directly.

val appId = runApp()
monitorApplication(appId)
System.exit(0)

Run the app, monitor the app, and finally exit. Let's start with runApp.

def runApp(): ApplicationId = {
  // Validate the arguments: memory may not be below 384MB, and there must be at least one executor.
  validateArgs()
  // These two are methods of the parent class; they initialize and start the Client.
  init(yarnConf)
  start()

  // Log cluster details (e.g. the number of NodeManagers, queue information).
  logClusterResourceDetails()

  // Prepare the submission request to the ResourceManager (specifically its
  // ApplicationsManager (ASM)) and get a new client application.
  val newApp = super.createApplication()
  val newAppResponse = newApp.getNewApplicationResponse()
  val appId = newAppResponse.getApplicationId()
  // Check whether the cluster has enough memory for this job.
  verifyClusterResources(newAppResponse)

  // Prepare resources and environment variables:
  // 1. Get the path of the staging directory: /.sparkStaging/appId/
  val appStagingDir = getAppStagingDir(appId)
  // 2. Create the staging directory, set its permissions, and upload the jars needed at runtime.
  val localResources = prepareLocalResources(appStagingDir)
  // 3. Set up the environment variables needed at runtime.
  val launchEnv = setupLaunchEnv(localResources, appStagingDir)
  // 4. Set the runtime JVM options; if SPARK_USE_CONC_INCR_GC is true, the CMS garbage collector is used.
  val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)

  // Set up the application submission context.
  val appContext = newApp.getApplicationSubmissionContext()
  appContext.setApplicationName(args.appName)
  appContext.setQueue(args.amQueue)
  appContext.setAMContainerSpec(amContainer)
  appContext.setApplicationType("SPARK")

  // Set the ApplicationMaster's memory. Resource is the class that models resources;
  // currently it covers CPU and memory.
  val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
  memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
  appContext.setResource(memoryResource)

  // Submit the application.
  submitApp(appContext)
  appId
}

I won't dwell on monitorApplication: it keeps calling getApplicationReport to fetch the latest report, then calls getYarnApplicationState to read the current state, and exits once the state is FINISHED, FAILED, or KILLED.
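In outline it is just a polling loop. Here is a minimal sketch of the method as it sits inside Client (which extends YarnClientImpl, hence the super call); the real 1.0.0 code also reads the poll interval from configuration and logs the report fields:

def monitorApplication(appId: ApplicationId): Boolean = {
  while (true) {
    Thread.sleep(1000)  // the real loop takes the interval from configuration
    val report = super.getApplicationReport(appId)
    val state = report.getYarnApplicationState()
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      return true
    }
  }
  true
}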

˵µ½ÕâÀ˳±ã°Ñ¸úyarnÏà¹ØµÄ²ÎÊýÒ²Ìù³öÀ´Ò»Ï£¬´ó¼ÒÒ»¿´¾ÍÇå³þÁË¡£

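They are parsed in org.apache.spark.deploy.yarn.ClientArguments. The following is my paraphrase of its usage message in 1.0.0; the defaults are quoted from memory, so double-check them against your checkout:

  --jar JAR_PATH         path to your application's jar (required in yarn-cluster mode)
  --class CLASS_NAME     your application's main class (required)
  --arg ARG              argument passed to the main class; repeat for multiple arguments
  --num-executors NUM    number of executors to start (default: 2)
  --executor-cores NUM   cores per executor (default: 1)
  --executor-memory MEM  memory per executor, e.g. 1000M, 2G (default: 1G)
  --driver-memory MEM    memory for the driver (default: 512M)
  --name NAME            application name (default: Spark)
  --queue QUEUE          the hadoop queue to submit to (default: 'default')
  --addJars JARS         comma-separated local jars for SparkContext.addJar
  --files FILES          comma-separated files to distribute with the job
  --archives ARCHIVES    comma-separated archives to distribute with the job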

ApplicationMaster

Just look at the run method; that is the only thing main does...

def run() {
  // Set the local dirs; by default yarn's YARN_LOCAL_DIRS is tried first, then LOCAL_DIRS.
  System.setProperty("spark.local.dir", getLocalDirs())

  // set the web ui port to be ephemeral for yarn so we don't conflict with
  // other spark processes running on the same box
  System.setProperty("spark.ui.port", "0")

  // when running the AM, the Spark master is always "yarn-cluster"
  System.setProperty("spark.master", "yarn-cluster")

  // Register a shutdown hook with priority 30, the same as MapReduce's. It is higher than
  // HDFS's priority because the hook cleans up the job's staging directory on HDFS,
  // so it must run before the filesystem shuts down.
  ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

  appAttemptId = getApplicationAttemptId()
  // maxAppAttempts is set via yarn.resourcemanager.am.max-attempts, default 2.
  // So far isLastAMRetry seems to be used only when cleaning up the staging directory.
  isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
  amClient = AMRMClient.createAMRMClient()
  amClient.init(yarnConf)
  amClient.start()

  // setup AmIpFilter for the SparkUI - do this before we start the UI
  // The method's documentation says yarn uses it to protect the UI;
  // to me it looks like it sets up an IP proxy filter.
  addAmIpFilter()
  // Register this ApplicationMaster in the internal list.
  ApplicationMaster.register(this)

  // Security-related setup; it is off by default, which saves us trouble.
  val securityMgr = new SecurityManager(sparkConf)

  // Start the driver program.
  userThread = startUserClass()

  // Wait for the SparkContext to be instantiated, essentially waiting for the
  // spark.driver.port property to be set. Once the wait is over, a
  // YarnAllocationHandler is instantiated.
  waitForSparkContextInitialized()

  // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
  // Register the current ApplicationMaster with yarn. isFinished must not be true
  // at this point; if it is, the program has already failed.
  synchronized {
    if (!isFinished) {
      registerApplicationMaster()
      registered = true
    }
  }

  // Request Containers to launch the Executors.
  allocateExecutors()

  // Wait for the program to finish.
  userThread.join()

  System.exit(0)
}
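The SparkContext handshake above is worth unpacking: it is a plain wait/notify between the driver thread and the AM thread. A minimal sketch, assuming a shared sparkContextRef field that SparkContext fills in by calling ApplicationMaster.sparkContextInitialized (the names follow 1.0.0; the retry count and wait time are illustrative):

import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.SparkContext

// The driver thread calls sparkContextInitialized(sc) from SparkContext's
// constructor; the AM thread blocks in waitForSparkContextInitialized until
// that happens or it gives up.
val sparkContextRef = new AtomicReference[SparkContext](null)

def sparkContextInitialized(sc: SparkContext) = sparkContextRef.synchronized {
  sparkContextRef.set(sc)
  sparkContextRef.notifyAll()
}

def waitForSparkContextInitialized(): SparkContext = sparkContextRef.synchronized {
  var tries = 0
  while (sparkContextRef.get() == null && tries < 10) {
    sparkContextRef.wait(10000L)  // wake up every 10s to re-check
    tries += 1
  }
  sparkContextRef.get()
}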

The run method does five main pieces of work:

1. Initialization.

2. Start the driver program (sketched below).

3. Register the ApplicationMaster.

4. Allocate the Executors.

5. Wait for the program to finish.
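Step 2 deserves a quick illustration: in yarn-cluster mode the "driver" is simply the user's main class invoked by reflection on a separate thread inside the AM, which leaves the AM thread free to go on and allocate executors. A rough sketch, assuming args.userClass and args.userArgs hold what was passed on the command line (the real startUserClass also records the exit status):

private def startUserClass(): Thread = {
  // Load the user's main class and run it on its own thread.
  val mainMethod = Class.forName(args.userClass, false,
    Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
  val t = new Thread {
    override def run() {
      mainMethod.invoke(null, args.userArgs.toArray)  // static method, so the receiver is null
    }
  }
  t.setName("Driver")
  t.start()
  t
}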

Now let's focus on the executor-allocation method.

private def allocateExecutors() {
  try {
    logInfo("Allocating " + args.numExecutors + " executors.")
    // Submit ContainerRequests to the ResourceManager in three flavors:
    // specific host, specific rack, and any machine.
    // The number of Containers requested may be larger than the number needed.
    yarnAllocator.addResourceRequests(args.numExecutors)
    // Exits the loop if the user thread exits.
    while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
      if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
        finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached")
      }
      // Hand out the resources that came back and release any surplus.
      yarnAllocator.allocateResources()
      ApplicationMaster.incrementAllocatorLoop(1)
      Thread.sleep(100)
    }
  } finally {
    // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
    // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
    ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
  }
  logInfo("All executors have launched.")

  // Start a thread that reports status.
  if (userThread.isAlive) {
    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)

    // we want to be reasonably responsive without causing too many requests to RM.
    val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)

    // must be <= timeoutInterval / 2.
    val interval = math.min(timeoutInterval / 2, schedulerInterval)

    launchReporterThread(interval)
  }
}
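Before digging into the allocator, launchReporterThread is easy to summarize: the thread it starts doubles as the AM's heartbeat to the ResourceManager and re-requests any executors that have been lost. A rough sketch (simplified; in 1.0.0 the pending count comes from yarnAllocator.getNumPendingAllocate):

def launchReporterThread(interval: Long): Thread = {
  val t = new Thread {
    override def run() {
      while (userThread.isAlive) {
        // If executors have died, ask the ResourceManager for replacements.
        val missing = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
          yarnAllocator.getNumPendingAllocate
        if (missing > 0) {
          yarnAllocator.addResourceRequests(missing)
        }
        // allocateResources() also serves as the heartbeat that keeps the AM alive.
        yarnAllocator.allocateResources()
        Thread.sleep(interval)
      }
    }
  }
  t.setDaemon(true)
  t.start()
  t
}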

Within allocateExecutors itself, the only two methods we need to look at are addResourceRequests and allocateResources.

ÏÈ˵addResourceRequests·½·¨£¬´úÂë¾Í²»ÌùÁË¡£

The client submits Container requests to the ResourceManager in three types: preferred machines, machines in the same rack, and any machine.

The preferred machines are the locations obtained from the RDD's getPreferredLocations; if there are no preferred machines, the same-rack category does not apply either, and any machine will do.
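At the YARN API level these three flavors are all the same AMRMClient.ContainerRequest, differing only in whether the node and rack lists are filled in. A minimal sketch of what one request looks like (the helper and its parameters are mine, not Spark's):

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient

// hosts/racks may be null, which yields the "any machine" flavor.
def requestContainer(amClient: AMRMClient[AMRMClient.ContainerRequest],
    memoryMb: Int, cores: Int, hosts: Array[String], racks: Array[String]) {
  val capability = Resource.newInstance(memoryMb, cores)
  val request = new AMRMClient.ContainerRequest(capability, hosts, racks, Priority.newInstance(1))
  amClient.addContainerRequest(request)
}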

ÏÂÃæÎÒÃǽÓ×Å¿´allocateResources·½·¨¡£

1. The Containers obtained from the ResourceManager are matched against requests in the order of the three categories introduced above: preferred machines > machines in the same rack > any machine.

2. Once the Containers are selected, each Container gets its own ExecutorRunner as a one-on-one attendant, which sends it the command to run CoarseGrainedExecutorBackend.

3. The ExecutorRunner sends the request to the NodeManager through an NMClient.
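That last hop is a small amount of YARN client API. Roughly (a sketch, assuming ctx is a ContainerLaunchContext whose command line starts CoarseGrainedExecutorBackend):

import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
import org.apache.hadoop.yarn.client.api.NMClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

def launchOnNodeManager(container: Container, ctx: ContainerLaunchContext,
    conf: YarnConfiguration) {
  val nmClient = NMClient.createNMClient()
  nmClient.init(conf)
  nmClient.start()
  // Ask the NodeManager hosting the container to run the command in ctx.
  nmClient.startContainer(container, ctx)
}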

×ܽ᣺

Getting a job to run on yarn does not involve many classes; it mainly comes down to four: Client, ApplicationMaster, YarnAllocationHandler, and ExecutorRunner.

1¡¢Client×÷ΪYarnµÄ¿Í»§¶Ë£¬¸ºÔðÏòYarn·¢ËÍÆô¶¯ApplicationMasterµÄÃüÁî¡£

2¡¢ApplicationMaster¾ÍÏñÏîÄ¿¾­ÀíÒ»Ñù¸ºÔðÕû¸öÏîÄ¿ËùÐèÒªµÄ¹¤×÷£¬°üÀ¨ÇëÇó×ÊÔ´£¬·ÖÅä×ÊÔ´£¬Æô¶¯DriverºÍExecutor£¬ExecutorÆô¶¯Ê§°ÜµÄ´íÎó´¦Àí¡£

3¡¢ApplicationMasterµÄÇëÇó¡¢·ÖÅä×ÊÔ´ÊÇͨ¹ýYarnAllocationHandlerÀ´½øÐеġ£

4¡¢ContainerÑ¡ÔñµÄ˳ÐòÊÇ£ºÓÅÏÈÑ¡Ôñ»úÆ÷ > ͬһ¸örackµÄ»úÆ÷ > ÈÎÒâ»úÆ÷¡£

5¡¢ExecutorRunnerÖ»¸ºÔðÏòContainer·¢ËÍÆô¶¯CoarseGrainedExecutorBackendµÄÃüÁî¡£

6¡¢ExecutorµÄ´íÎó´¦ÀíÊÇÔÚApplicationMasterµÄlaunchReporterThread·½·¨ÀïÃæ£¬ËüÆô¶¯µÄÏ̳߳ýÁ˱¨¸æÔËÐÐ״̬£¬»¹»á¼à¿ØExecutorµÄÔËÐУ¬Ò»µ©·¢ÏÖÓжªÊ§µÄExecutor¾ÍÖØÐÂÇëÇó¡£

7¡¢ÔÚyarnĿ¼Ï¿´µ½µÄÃû³ÆÀïÃæ´øÓÐYarnClientµÄÊÇÊôÓÚyarn-clientģʽµÄÀ࣬ʵÏÖºÍÇ°ÃæµÄÒ²²î²»¶à¡£

ÆäËüµÄÄÚÈݸü¶àÊÇYarnµÄ¿Í»§¶ËapiʹÓã¬ÎÒÒ²²»Ì«»á£¬Ö»ÊÇ¿´µ½ÁËÄܶ®¸öÒâ˼£¬¹þ¹þ¡£

   