ǰÑÔ
ÕÛÌÚÁ˺ܾã¬ÖÕÓÚ¿ªÊ¼Ñ§Ï°SparkµÄÔ´ÂëÁË£¬µÚһƪÎÒ´òËã½²Ò»ÏÂSpark×÷ÒµµÄÌá½»¹ý³Ì¡£

Õâ¸öÊÇSparkµÄAppÔËÐÐͼ£¬Ëüͨ¹ýÒ»¸öDriverÀ´ºÍ¼¯ÈºÍ¨ÐÅ£¬¼¯Èº¸ºÔð×÷ÒµµÄ·ÖÅä¡£½ñÌìÎÒÒª½²µÄÊÇÈçºÎ´´½¨Õâ¸öDriver
ProgramµÄ¹ý³Ì¡£
×÷ÒµÌá½»·½·¨ÒÔ¼°²ÎÊý
ÎÒÃÇÏÈ¿´Ò»ÏÂÓÃSpark SubmitÌá½»µÄ·½·¨°É£¬ÏÂÃæÊÇ´Ó¹Ù·½ÉÏÃæÕª³µÄÄÚÈÝ¡£
# Run on a Spark standalone cluster ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ |
Õâ¸öÊÇÌá½»µ½standalone¼¯ÈºµÄ·½Ê½£¬´ò¿ªspark-submitÕâÎļþ£¬ÎÒÃǻᷢÏÖËü×îºóÊǵ÷ÓÃÁËorg.apache.spark.deploy.SparkSubmitÕâ¸öÀà¡£
ÎÒÃÇÖ±½Ó½øÈ¥¿´¾ÍÐÐÁË£¬mainº¯Êý¾Í¼¸ÐдúÂ룬̫½ÚÊ¡ÁË¡£
def main(args: Array[String]) { val appArgs = new SparkSubmitArguments(args) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose) } |
ÎÒÃÇÖ÷Òª¿´¿´createLaunchEnv·½·¨¾Í¿ÉÒÔÁË£¬launchÊÇ·´Éäµ÷ÓÃmainClass£¬¾«»ªÈ«ÔÚcreateLaunchEnvÀïÃæÁË¡£
ÔÚÀïÃæÎÒ·¢ÏÖһЩÓÐÓõÄÐÅÏ¢£¬¿ÉÄÜÔÚ¹Ù·½ÎĵµÉÏÃæ¶¼Ã»Óе쬷¢³öÀ´´ó¼Ò³ò³ò¡£Ç°Ãæ²»´ø--µÄ¿ÉÒÔÔÚspark-defaults.confÀïÃæÉèÖ㬴ø--µÄÖ±½ÓÔÚÌá½»µÄʱºòÖ¸¶¨£¬¾ßÌ庬Òå´ó¼ÒÒ»¿´¾Í¶®¡£
val options = List[OptionAssigner]( OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"), OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"), OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraClassPath"), OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true, sysProp = "spark.driver.extraJavaOptions"), OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraLibraryPath"), OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"), OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"), OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"), OptionAssigner(args.queue, YARN, true, clOption = "--queue"), OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"), OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"), OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false, sysProp = "spark.executor.memory"), OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"), OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false, sysProp = "spark.cores.max"), OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"), OptionAssigner(args.files, YARN, true, clOption = "--files"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), OptionAssigner(args.archives, YARN, true, clOption = "--archives"), OptionAssigner(args.jars, YARN, true, clOption = "--addJars"), OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars") ) |
Driver³ÌÐòµÄ²¿ÊðģʽÓÐÁ½ÖÖ£¬clientºÍcluster£¬Ä¬ÈÏÊÇclient¡£clientµÄ»°Ä¬ÈϾÍÊÇÖ±½ÓÔÚ±¾µØÔËÐÐÁËDriver³ÌÐòÁË£¬clusterģʽ»¹»á¶µÒ»È¦°Ñ×÷Òµ·¢µ½¼¯ÈºÉÏÃæÈ¥ÔËÐС£
Ö¸¶¨²¿ÊðģʽÐèÒªÓòÎÊý--deploy-modeÀ´Ö¸¶¨£¬»òÕßÔÚ»·¾³±äÁ¿µ±ÖÐÌí¼ÓDEPLOY_MODE±äÁ¿À´Ö¸¶¨¡£
ÏÂÃæ½²µÄÊÇclusterµÄ²¿Êð·½Ê½£¬¶µÒ»È¦µÄÕâÖÖÇé¿ö¡£
yarnģʽµÄ»°mainClassÊÇorg.apache.spark.deploy.yarn.Client£¬standaloneµÄmainClassÊÇorg.apache.spark.deploy.Client¡£
Õâ´ÎÎÒÃǽ²org.apache.spark.deploy.Client£¬yarnµÄ»°µ¥¶ÀÕÒÒ»Õ³öÀ´µ¥¶À½²£¬Ä¿Ç°³¬¸ç»¹ÊÇÍÆ¼öʹÓÃstandaloneµÄ·½Ê½²¿Êðspark£¬¾ßÌåÔÒò²»Ï꣬¾Ý˵ÊÇÒòΪ×ÊÔ´µ÷¶È·½ÃæµÄÎÊÌâ¡£
˵¸ö¿ì½Ý¼ü°É£¬Ctrl+Shift+N£¬È»ºóÊäÈëClient¾ÍÄÜÕÒµ½Õâ¸öÀ࣬ÕâÊÇIDEAµÄ¿ì½Ý¼ü£¬Ï൱ºÃʹ¡£
ÎÒÃÇÖ±½ÓÕÒµ½ËüµÄmainº¯Êý£¬·¢ÏÖÁËËü¾ÓȻʹÓÃÁËAkka¿ò¼Ü£¬ÎÒ°Ù¶ÈÁËһϣ¬±»ËüÕð¾ªÁË¡£
Akka
ÔÚmainº¯ÊýÀïÃæ£¬Ö÷Òª´úÂë¾ÍÕâôÈýÐС£
//´´½¨Ò»¸öActorSystem val (actorSystem, _) = AkkaUtils.createActorSystem("driverClient",Utils.localHostName(),0, ¡¡¡¡conf, new SecurityManager(conf)) //Ö´ÐÐClientActorµÄpreStart·½·¨ºÍreceive·½·¨ actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) //µÈ´ýÔËÐнáÊø actorSystem.awaitTermination() |
¿´ÁËÕâÀïÕæµÄÓеã¶ùã°¡£¬ÕâÊÇÉ¶ÍæÒâ¶ù£¬²»¶®µÄÅóÓÑÃÇ£¬Çëµã»÷ÕâÀïAkka¡£ÏÂÃæÊÇËü¹Ù·½·Å³öÀ´µÄÀý×Ó£º
//¶¨ÒåÒ»¸öcase classÓÃÀ´´«µÝ²ÎÊý case class Greeting(who: String) //¶¨ÒåActor£¬±È½ÏÖØÒªµÄÒ»¸ö·½·¨ÊÇreceive·½·¨£¬ÓÃÀ´½ÓÊÕÐÅÏ¢µÄ class GreetingActor extends Actor with ActorLogging { def receive = { case Greeting(who) ? log.info("Hello " + who) } } //´´½¨Ò»¸öActorSystem val system = ActorSystem("MySystem") //¸øActorSystemÉèÖÃActor val greeter = system.actorOf(Props[GreetingActor], name = "greeter") //Ïògreeter·¢ËÍÐÅÏ¢£¬ÓÃGreetingÀ´´«µÝ greeter ! Greeting("Charlie Parker") |
¼òÖ±ÊÇÎÞ±ÈÇ¿´ó°¡£¬¾ÍÕâô¼¸ÐдúÂë¾Í¸ã¶¨ÁË£¬½ÓÏÂÀ´¿´Äã»á¸ü¼ÓÕ𾪵ġ£
ÎÒÃǻص½ClientÀ൱ÖУ¬ÕÒµ½ClientActor£¬ËüÓÐÁ½¸ö·½·¨£¬ÊÇ֮ǰ˵µÄpreStartºÍreceive·½·¨£¬preStart·½·¨ÓÃÓÚÁ¬½ÓmasterÌá½»×÷ÒµÇëÇó£¬receive·½·¨ÓÃÓÚ½ÓÊÕ´Ómaster·µ»ØµÄ·´À¡ÐÅÏ¢¡£
ÎÒÃÇÏÈ¿´preStart·½·¨°É¡£
override def preStart() = { // ÕâÀïÐèÒª°ÑmasterµÄµØÖ·×ª»»³ÉakkaµÄµØÖ·£¬È»ºóͨ¹ýÕâ¸öakkaµØÖ·»ñµÃÖ¸¶¨µÄactor // ËüµÄ¸ñʽÊÇ"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName) masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master)) // °Ñ×ÔÉíÉèÖóÉÔ¶³ÌÉúÃüÖÜÆÚµÄʼþ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
driverArgs.cmd match {
case "launch" =>
// ´Ë´¦Ê¡ÂÔ100¸ö×Ö
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
// ´Ë´¦Ê¡ÂÔ100¸ö×Ö
// Ïòmaster·¢ËÍÌá½»DriverµÄÇëÇ󣬰ÑdriverDescription´«¹ýÈ¥£¬RequestSubmitDriverÇ°ÃæËµ¹ýÁË£¬ÊǸöcase
class
masterActor ! RequestSubmitDriver(driverDescription)
case "kill" =>
val driverId = driverArgs.driverId
val killFuture = masterActor ! RequestKillDriver(driverId)
}
} |
´ÓÉÏÃæµÄ´úÂë¿´µÃ³öÀ´£¬ËüÐèÒªÉèÖÃmasterµÄÁ¬½ÓµØÖ·£¬×îºóÌá½»ÁËÒ»¸öRequestSubmitDriverµÄÐÅÏ¢¡£ÔÚreceive·½·¨ÀïÃæ£¬¾ÍÊǵȴý½ÓÊÜ»ØÓ¦ÁË£¬ÓÐÁ½¸öResponse·Ö±ð¶ÔÓ¦×ÅÕâÀïµÄlaunchºÍkill¡£
ÏßË÷Ã²ËÆµ½ÕâÀï¾Í¶ÏÁË£¬ÄÇÏÂÒ»²½ÔÚÄÄÀïÁËÄØ£¿µ±È»ÊÇÔÚMasterÀïÃæÀ²£¬Ôõô֪µÀµÄ£¬²ÂµÄ£¬¹þ¹þ¡£
MasterÒ²ÊǼ̳ÐÁËActor£¬ÔÚËüµÄmainº¯ÊýÀïÃæÕÒµ½ÁËÒÔÏ´úÂ룺
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, ¡¡¡¡securityManager = securityMgr) val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr), actorName) val timeout = AkkaUtils.askTimeout(conf) val respFuture = actor.ask(RequestWebUIPort)(timeout) val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse] |
ºÍÇ°ÃæµÄactor»ù±¾Ò»Ö£¬¶àÁËactor.askÕâ¾ä»°£¬²éÁËһϹÙÍøµÄÎĵµ£¬Õâ¾ä»°µÄÒâ˼µÄ·¢ËÍÏûÏ¢£¬²¢ÇÒ½ÓÊÜÒ»¸öFuture×÷Ϊresponse£¬ºÍÇ°ÃæµÄactor
£¡ messageµÄÇø±ð¾ÍÊÇËü»¹½ÓÊÜ·µ»ØÖµ¡£
¾ßÌåµÄAkkaµÄÓ÷¨£¬´ó¼Ò»¹ÊDzÎÕÕ¹ÙÍø°É£¬AkkaȷʵÈçËü¹ÙÍøËùÑÔµÄÄÇÑù×Ó£¬ÊÇÒ»¸ö¼òµ¥¡¢Ç¿´ó¡¢²¢Ðеķֲ¼Ê½¿ò¼Ü¡£
С½á£º
AkkaµÄʹÓÃȷʵ¼òµ¥£¬¶Ì¶ÌµÄ¼¸ÐдúÂë¼´¿ÌÍê³ÉÒ»¸öͨÐŹ¦ÄÜ£¬±ÈSocket¼òµ¥ºÜ¶à¡£µ«ÊÇËüÒ²ÌÓ²»ÍÑÎÒÃdz£ËµµÄÄÇЩ¶«Î÷£¬ÇëÇó¡¢½ÓÊÕÇëÇó¡¢´«µÝµÄÏûÏ¢¡¢×¢²áµÄµØÖ·ºÍ¶Ë¿ÚÕâЩ¸ÅÄî¡£
µ÷¶Èschedule
ÎÒÃǽÓÏÂÀ´²éÕÒMasterµÄreceive·½·¨°É£¬MasterÊÇ×÷Ϊ½ÓÊÕ·½µÄ£¬¶ø²»ÊÇÖ÷¶¯ÇëÇó£¬ÕâµãºÍhadoopÊÇÒ»Öµġ£
case RequestSubmitDriver(description) => { val driver = createDriver(description) persistenceEngine.addDriver(driver) waitingDrivers += driver drivers.add(driver) // µ÷¶È schedule() // ¸æËßclient£¬Ìá½»³É¹¦ÁË£¬°Ñdriver.id¸æËßËü sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}") } |
ÕâÀïÎÒÃÇÖ÷Òª¿´schedule·½·¨¾Í¿ÉÒÔÁË£¬ËüÊÇÖ´Ðе÷¶ÈµÄ·½·¨¡£
private def schedule() { if (state != RecoveryState.ALIVE) { return }
// Ê×Ïȵ÷¶ÈDriver³ÌÐò£¬´ÓworkersÀïÃæËæ»ú³éһЩ³öÀ´
val shuffledWorkers = Random.shuffle(workers)
for (worker <- shuffledWorkers if worker.state
== WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
// ÅжÏÄÚ´æºÍcpu¹»²»¹»£¬¹»µÄ¾ÍÖ´ÐÐÁ˹þ
if (worker.memoryFree >= driver.desc.mem &&
worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}
// ÕâÀïÊǰ´ÕÕÏȽøÏȳöµÄ£¬spreadOutAppsÊÇÓÉspark.deploy.spreadOut²ÎÊýÀ´¾ö¶¨µÄ£¬Ä¬ÈÏÊÇtrue
if (spreadOutApps) {
// ±éÀúÒ»ÏÂapp
for (app <- waitingApps if app.coresLeft >
0) {
// canUseÀïÃæÅжÏÁËworkerµÄÄÚ´æÊÇ·ñ¹»Ó㬲¢ÇÒ¸ÃworkerÊÇ·ñÒѾ°üº¬Á˸ÃappµÄExecutor
val usableWorkers = workers.toArray.filter(_.state
== WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable)
// ¼Ç¼ÿ¸ö½ÚµãµÄºËÐÄÊý
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
// ±éÀúÖ±µ½·ÖÅä½áÊø
while (toAssign > 0) {
// ´Ó0¿ªÊ¼±éÀú¿ÉÓõÄwork£¬Èç¹û¿ÉÓõÄcpu¼õÈ¥ÒѾ·ÖÅäµÄ>0,¾Í¿ÉÒÔ·ÖÅ䏸Ëü
if (usableWorkers(pos).coresFree - assigned(pos)
> 0) {
toAssign -= 1
// Õâ¸öλÖõÄworkµÄ¿É·ÖÅäµÄcpuÊý+1
assigned(pos) += 1
}
pos = (pos + 1) % numUsable
}
// ¸ø¸Õ²Å±ê¼ÇµÄworker·ÖÅäÈÎÎñ
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = app.addExecutor(usableWorkers(pos),
assigned(pos))
launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
}
}
}
} else {
// ÕâÖÖ·½Ê½ºÍÉÏÃæµÄ·½Ê½µÄÇø±ðÊÇ£¬ÕâÖÖ·½Ê½¾¡¿ÉÄÜÓÃÉÙÁ¿µÄ½ÚµãÀ´Íê³ÉÕâ¸öÈÎÎñ
for (worker <- workers if worker.coresFree
> 0 && worker.state == WorkerState.ALIVE)
{
for (app <- waitingApps if app.coresLeft >
0) {
// ÅжÏÌõ¼þÊÇworkerµÄÄÚ´æ±ÈappÐèÒªµÄÄÚ´æ¶à
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse > 0) {
val exec = app.addExecutor(worker, coresToUse)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
}
}
}
} |
ËüµÄµ÷¶ÈÆ÷ÊÇÕâÑùµÄ£¬Ïȵ÷¶ÈDriver³ÌÐò£¬È»ºóÔÙµ÷¶ÈApp£¬µ÷¶ÈAppµÄ·½Ê½ÊÇ´Ó¸÷¸öworkerµÄÀïÃæºÍApp½øÐÐÆ¥Å䣬¿´ÐèÒª·ÖÅä¶àÉÙ¸öcpu¡£
ÄÇÎÒÃǽÓÏÂÀ´¿´Á½¸ö·½·¨launchDriverºÍlaunchExecutor¼´¿É¡£
def launchDriver(worker: WorkerInfo, driver: DriverInfo) { logInfo("Launching driver " + driver.id + " on worker " + worker.id) worker.addDriver(driver) driver.worker = Some(worker) worker.actor ! LaunchDriver(driver.id, driver.desc) driver.state = DriverState.RUNNING } |
¸øworker·¢ËÍÁËÒ»¸öLaunchDriverµÄÏûÏ¢£¬ÏÂÃæÔÚ¿´launchExecutorµÄ·½·¨¡£
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) } |
ËüÒª×öµÄÊÂÇé¶àÒ»µã£¬³ýÁ˸øworker·¢ËÍLaunchExecutorÖ¸ÁîÍ⣬»¹ÐèÒª¸ødriver·¢ËÍExecutorAddedµÄÏûÏ¢£¬ËµÄãµÄÈÎÎñÒѾÓÐÈ˸ÉÁË¡£
ÔÚ¼ÌÐøWorker½²Ö®Ç°£¬ÎÒÃÇÏÈ¿´¿´ËüÊÇÔõôע²á½øÀ´µÄ£¬Ã¿¸öWorkerÆô¶¯Ö®ºó£¬»á×Ô¶¯È¥ÇëÇóMasterÈ¥×¢²á×Ô¼º£¬¾ßÌåÎÒÃÇ¿ÉÒÔ¿´receiveµÄ·½·¨ÀïÃæµÄRegisterWorkerÕâÒ»¶Î£¬ËüÐèÒªÉϱ¨×Ô¼ºµÄÄÚ´æ¡¢Cpu¡¢µØÖ·¡¢¶Ë¿ÚµÈÐÅÏ¢£¬×¢²á³É¹¦Ö®ºó·µ»ØRegisteredWorkerÐÅÏ¢¸øËü£¬ËµÒѾע²á³É¹¦ÁË¡£
WorkerÖ´ÐÐ
ͬÑùµÄ£¬ÎÒÃǵ½WorkerÀïÃæÔÚreceive·½·¨ÕÒLaunchDriverºÍLaunchExecutor¾Í¿ÉÒÔÕÒµ½ÎÒÃÇÒªµÄ¶«Î÷¡£
case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl) drivers(driverId) = driver driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
} |
¿´Ò»ÏÂstart·½·¨°É£¬start·½·¨ÀïÃæ£¬ÆäʵÊÇnew Thread().start()£¬run·½·¨ÀïÃæÊÇͨ¹ý´«¹ýÀ´µÄDriverDescription¹¹ÔìµÄÒ»¸öÃüÁ¶ª¸øProcessBuilderÈ¥Ö´ÐÐÃüÁ½áÊøÖ®ºóµ÷Óá£
worker £¡DriverStateChanged֪ͨworker£¬workerÔÙͨ¹ýmaster
! DriverStateChanged֪ͨmaster£¬ÊͷŵôworkerµÄcpuºÍÄÚ´æ¡£
ͬÀí£¬LaunchExecutorÖ´ÐÐÍê±ÏÁË£¬Í¨¹ýworker ! ExecutorStateChanged֪ͨworker£¬È»ºóworkerͨ¹ýmaster
! ExecutorStateChanged֪ͨmaster£¬ÊͷŵôworkerµÄcpuºÍÄÚ´æ¡£
ÏÂÃæÎÒÃÇÔÙÊáÀíÒ»ÏÂÕâ¸ö¹ý³Ì£¬Ö»°üÀ¨Driver×¢²á£¬DriverÔËÐÐÖ®ºóµÄ¹ý³ÌÔÚÖ®ºóµÄÎÄÕÂÔÙ˵£¬±È½Ï¸´ÔÓ¡£
1¡¢Clientͨ¹ý»ñµÃUrlµØÖ·»ñµÃActorSelection£¨masterµÄactorÒýÓã©,È»ºóͨ¹ýActorSelection¸øMaster·¢ËÍ×¢²áDriverÇëÇó£¨RequestSubmitDriver£©
2¡¢Master½ÓÊÕµ½ÇëÇóÖ®ºó¾Í¿ªÊ¼µ÷¶ÈÁË£¬´ÓworkersÁбíÀïÃæÕÒ³ö¿ÉÒÔÓõÄWorker
3¡¢Í¨¹ýWorkerµÄactorÒýÓÃActorRef¸ø¿ÉÓõÄWorker·¢ËÍÆô¶¯DriverÇëÇó£¨LaunchDriver£©
4¡¢µ÷¶ÈÍê±ÏÖ®ºó£¬¸øClient»Ø¸´×¢²á³É¹¦ÏûÏ¢(SubmitDriverResponse)
5¡¢Worker½ÓÊÕµ½LaunchDriverÇëÇóÖ®ºó£¬Í¨¹ý´«¹ýÀ´µÄDriverDescriptionµÄÐÅÏ¢¹¹Ôì³öÃüÁîÀ´£¬Í¨¹ýProcessBuilderÖ´ÐÐ
6¡¢ProcessBuilderÖ´ÐÐÍêÃüÁîÖ®ºó£¬Í¨¹ýDriverStateChangedͨ¹ýWorker
7¡¢Worker×îºó°ÑDriverStateChanged»ã±¨¸øMaster
ºó¼Ç£ºÌý³¬¸ç˵£¬org.apache.spark.deploy.ClientÕâ¸öÀà¿ìÒª±»É¾³ýÁË£¬²»ÖªµÀclusterµÄÕâÖÖģʽÊDz»ÊÇÒ²±»·ÅÆúÁË£¬¹Ù·½¸ø³öÀ´µÄÀý×ÓÍÆ¼öµÄÊÇclientģʽ->Ö±½ÓÔËÐгÌÐò¡£ÄѹÖÔÚ×÷Òµµ÷¶ÈµÄʱºò£¬¿´µ½±ðµÄactor½ÐdriverActor¡£
²»¹ýÕâÆªÎÄÕ»¹ÓдæÔÚµÄÒâÒå, AkkaºÍµ÷¶ÈÕâ¿é£¬ºÍÎÒÏÖÔÚÕýÔÚдµÄµÚÈýƪÒÔ¼°µÚËÄÆª¹ØÏµºÜÃÜÇС£
|