Spark Source Code Series (Part 1): The spark-submit Job Submission Process
 
Author: the blog of Cen Yuhai, via Huolongguo Software    Published on 2014-11-03
 

Preface

ÕÛÌÚÁ˺ܾã¬ÖÕÓÚ¿ªÊ¼Ñ§Ï°SparkµÄÔ´ÂëÁË£¬µÚһƪÎÒ´òËã½²Ò»ÏÂSpark×÷ÒµµÄÌá½»¹ý³Ì¡£

This is the execution diagram of a Spark application: it communicates with the cluster through a Driver, and the cluster takes care of distributing the work. Today's topic is the process of creating that Driver Program.

Job Submission Method and Parameters

Let's start with how a job is submitted with spark-submit; the following is excerpted from the official docs.

# Run on a Spark standalone cluster
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

That is how you submit to a standalone cluster. If you open the spark-submit script, you'll find that it ends up invoking the org.apache.spark.deploy.SparkSubmit class.

We can just go straight in: the main function is only a few lines — very frugal.

def main(args: Array[String]) {
  val appArgs = new SparkSubmitArguments(args)
  val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
  launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}

We mainly need to look at the createLaunchEnv method; launch just invokes mainClass via reflection, so all the substance is in createLaunchEnv.
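That reflective call can be illustrated with a generic snippet (my own sketch, reflecting on a JDK method rather than a Spark class):

```scala
// A generic illustration of the reflective call that launch performs: look up a
// class by name, find a method, and invoke it. Here we reflect on
// java.lang.Integer.parseInt instead of a Spark mainClass.
val clazz = Class.forName("java.lang.Integer")
val method = clazz.getMethod("parseInt", classOf[String])
// parseInt is static, so the receiver is null; arguments are passed as Objects
val result = method.invoke(null, "42")
println(result)  // 42
```

launch does essentially this with the mainClass name it got from createLaunchEnv, passing along childArgs.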

Inside it I found some useful information that may not even appear in the official docs, so here it is for everyone to look over. Entries without a leading -- can be set in spark-defaults.conf; those with -- are specified directly at submit time. The meanings are self-evident at a glance.

val options = List[OptionAssigner](
  OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
  OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
  OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
  OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
    sysProp = "spark.driver.extraClassPath"),
  OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
    sysProp = "spark.driver.extraJavaOptions"),
  OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
    sysProp = "spark.driver.extraLibraryPath"),
  OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
  OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
  OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
  OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
  OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
  OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
  OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
  OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
  OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
    sysProp = "spark.executor.memory"),
  OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
  OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
  OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
    sysProp = "spark.cores.max"),
  OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
  OptionAssigner(args.files, YARN, true, clOption = "--files"),
  OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
  OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
  OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
  OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
  OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
  OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
)
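For example, the executor-memory and max-cores flags from the submit command at the top map onto the sysProp entries above, so the same job could equally be configured in spark-defaults.conf (an illustrative snippet of mine, not from the official docs):

```
spark.master             spark://207.184.161.138:7077
spark.executor.memory    20g
spark.cores.max          100
```

with the command line then reduced to `./bin/spark-submit --class org.apache.spark.examples.SparkPi /path/to/examples.jar`.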

There are two deploy modes for the Driver program, client and cluster, with client as the default. In client mode the Driver simply runs on the local machine; in cluster mode it takes a detour and ships the job off to run on the cluster.

The deploy mode is chosen with the --deploy-mode parameter, or by setting the DEPLOY_MODE environment variable.
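As a sketch, the SparkPi submission from earlier would take the detour route if we add that flag (illustrative only, reusing the addresses from the official example above):

```
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  /path/to/examples.jar
```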

What follows describes the cluster deploy mode — the detour case.

In yarn mode the mainClass is org.apache.spark.deploy.yarn.Client; for standalone the mainClass is org.apache.spark.deploy.Client.

This time we'll discuss org.apache.spark.deploy.Client; YARN deserves a chapter of its own. For now Brother Chao still recommends deploying Spark in standalone mode. The exact reason is unclear — reportedly something to do with resource scheduling.

A quick shortcut tip: Ctrl+Shift+N, then type Client to jump to this class. It's an IDEA shortcut, and a very handy one.

We go straight to its main function and discover that it actually uses the Akka framework. I did a quick Baidu search and was blown away.

Akka

Inside the main function, the essential code is just these three lines.

// Create an ActorSystem
val (actorSystem, _) = AkkaUtils.createActorSystem("driverClient", Utils.localHostName(), 0,
  conf, new SecurityManager(conf))
// Create the ClientActor; its preStart and receive methods will run
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
// Wait for it to finish
actorSystem.awaitTermination()

Seeing this, I was honestly a bit lost — what on earth is this thing? If you haven't met it before, go look up Akka. Below is the example published on its official site:

// Define a case class used to carry the message
case class Greeting(who: String)
// Define an Actor; the key method is receive, which handles incoming messages
class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) => log.info("Hello " + who)
  }
}
// Create an ActorSystem
val system = ActorSystem("MySystem")
// Register the Actor with the ActorSystem
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
// Send greeter a message, carried by Greeting
greeter ! Greeting("Charlie Parker")

Simply unbelievably powerful — just those few lines and it's done. What comes next will amaze you even more.

Back in the Client class, we find ClientActor. It has the two methods mentioned earlier, preStart and receive: preStart connects to the master and submits the job request, while receive handles the feedback coming back from the master.

Let's look at preStart first.

override def preStart() = {
  // Convert the master's address into an Akka URL, then look up the actor at that address
  // The format is "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
  masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
  // Subscribe to remote lifecycle events
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

  driverArgs.cmd match {
    case "launch" =>
      // (100 words omitted here)
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
      // (100 words omitted here)
      // Send the master a driver-submission request, passing along driverDescription;
      // RequestSubmitDriver, as mentioned, is a case class
      masterActor ! RequestSubmitDriver(driverDescription)

    case "kill" =>
      val driverId = driverArgs.driverId
      val killFuture = masterActor ! RequestKillDriver(driverId)
  }
}

As the code above shows, it needs to resolve the master's connection address, and finally submits a RequestSubmitDriver message. The receive method then just waits for the reply; there are two Responses, corresponding to launch and kill here.
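That address conversion can be sketched in plain Scala. This is my own re-implementation built from the format string quoted in the comment above; the system name "sparkMaster" and actor name "Master" are assumptions about the standalone master, not quoted from the source:

```scala
// Turn a spark:// master URL into the Akka actor path of the Master actor,
// following the format string "akka.tcp://%s@%s:%s/user/%s"
def toAkkaUrl(sparkUrl: String): String = {
  val Array(host, port) = sparkUrl.stripPrefix("spark://").split(":")
  "akka.tcp://%s@%s:%s/user/%s".format("sparkMaster", host, port, "Master")
}

println(toAkkaUrl("spark://207.184.161.138:7077"))
// akka.tcp://sparkMaster@207.184.161.138:7077/user/Master
```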

The trail seems to go cold at this point — so where is the next step? In Master, of course. How do I know? I guessed, haha.

MasterÒ²ÊǼ̳ÐÁËActor£¬ÔÚËüµÄmainº¯ÊýÀïÃæÕÒµ½ÁËÒÔÏ´úÂ룺

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
  securityManager = securityMgr)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]

This is basically the same as the earlier actors, except for the actor.ask call. Checking the official docs: it sends a message and receives a Future as the response. The difference from the earlier actor ! message is that it also accepts a return value.
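The shape of that request/response can be sketched without Akka at all, using a plain Future/Promise from the standard library (the reply string here is made up):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// `!` is fire-and-forget, while ask hands back a Future you can Await on,
// exactly as the Master code above does with respFuture.
val p = Promise[String]()
// Imagine the receiving actor completing the promise as its reply:
p.success("WebUIPortResponse")
val resp = Await.result(p.future, 1.second)
println(resp)  // WebUIPortResponse
```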

For the specifics of Akka usage, consult its official site. Akka really is what the site claims: a simple, powerful framework for concurrent and distributed applications.

Summary:

Akka really is simple to use: a handful of lines completes a communication feature, far simpler than raw sockets. But it can't escape the usual concepts either — requests, handling requests, the messages passed around, and the registered addresses and ports.

Scheduling: schedule

Next let's find Master's receive method. The Master acts as the receiving side rather than making requests itself, which is consistent with Hadoop.

case RequestSubmitDriver(description) => {
  val driver = createDriver(description)
  persistenceEngine.addDriver(driver)
  waitingDrivers += driver
  drivers.add(driver)
  // Schedule
  schedule()
  // Tell the client the submission succeeded, handing back driver.id
  sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
}

Here we mainly need to look at the schedule method, which performs the actual scheduling.

private def schedule() {
  if (state != RecoveryState.ALIVE) { return }

  // First schedule Driver programs: shuffle the workers into a random order
  val shuffledWorkers = Random.shuffle(workers)
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      // Check whether the worker has enough memory and CPU; if so, launch there
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }

  // Apps are handled first-in first-out; spreadOutApps comes from the
  // spark.deploy.spreadOut parameter, which defaults to true
  if (spreadOutApps) {
    // Iterate over the waiting apps
    for (app <- waitingApps if app.coresLeft > 0) {
      // canUse checks that the worker has enough memory and does not already
      // host an Executor for this app
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      val assigned = new Array[Int](numUsable)
      // assigned records the cores given to each worker; toAssign is the total
      // still to hand out
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      // Loop until everything is assigned
      while (toAssign > 0) {
        // Walk the usable workers round-robin from 0; if free cores minus the
        // cores already assigned is > 0, this worker can take one more
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          toAssign -= 1
          // One more core assigned to the worker at this position
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable
      }
      // Launch executors on the workers marked above
      for (pos <- 0 until numUsable) {
        if (assigned(pos) > 0) {
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          launchExecutor(usableWorkers(pos), exec)
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Unlike the branch above, this one packs each app onto as few workers as possible
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        // The condition: the worker has more free memory than the app needs
        if (canUse(app, worker)) {
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          if (coresToUse > 0) {
            val exec = app.addExecutor(worker, coresToUse)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}

So its scheduler works like this: Driver programs are scheduled first, then the Apps. Apps are scheduled by matching them against the individual workers to decide how many CPU cores each worker should contribute.
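The two App-scheduling branches boil down to a core-assignment function over plain arrays. Below is my own standalone re-implementation for illustration (names like assignCores are made up); it ignores the memory check and the multi-app loop:

```scala
// Given each worker's free cores and an app's remaining demand, return how
// many cores each worker gets under the two strategies above.
def assignCores(coresFree: Array[Int], coresLeft: Int, spreadOut: Boolean): Array[Int] = {
  val numUsable = coresFree.length
  val assigned = new Array[Int](numUsable)
  if (spreadOut) {
    // Round-robin: hand out one core at a time across all usable workers
    var toAssign = math.min(coresLeft, coresFree.sum)
    var pos = 0
    while (toAssign > 0) {
      if (coresFree(pos) - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % numUsable
    }
  } else {
    // Packing: take as many cores as possible from each worker in turn
    var toAssign = coresLeft
    for (pos <- 0 until numUsable if toAssign > 0) {
      val use = math.min(coresFree(pos), toAssign)
      assigned(pos) = use
      toAssign -= use
    }
  }
  assigned
}

// Three workers with 4, 2 and 4 free cores; the app still needs 6 cores:
println(assignCores(Array(4, 2, 4), 6, spreadOut = true).toList)  // List(2, 2, 2)
println(assignCores(Array(4, 2, 4), 6, spreadOut = false).toList) // List(4, 2, 0)
```

The example shows the trade-off: spreading gives every worker a share, while packing concentrates the app on as few nodes as possible.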

Next we just need to look at two methods, launchDriver and launchExecutor.

def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  driver.state = DriverState.RUNNING
}

It sends the worker a LaunchDriver message. Next, let's look at the launchExecutor method.

def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}

This one has a bit more to do: besides sending the worker the LaunchExecutor command, it also sends the driver an ExecutorAdded message, telling it that someone has picked up its work.

Before we continue with the Worker, let's see how it registers itself in the first place. After each Worker starts, it automatically contacts the Master to register itself; see the RegisterWorker case in the receive method. It reports its memory, CPU, address, port and so on, and on success the Master replies with a RegisteredWorker message confirming the registration.

Worker Execution

Likewise, searching Worker's receive method for LaunchDriver and LaunchExecutor turns up what we need.

case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
  drivers(driverId) = driver
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}

Take a look at the start method: it is really just new Thread().start(). The run method builds a command from the DriverDescription that was passed over and hands it to ProcessBuilder to execute. When the command finishes, it calls worker ! DriverStateChanged to notify the worker, and the worker in turn notifies the master with master ! DriverStateChanged, freeing up the worker's CPU and memory.

Similarly, when LaunchExecutor finishes, worker ! ExecutorStateChanged notifies the worker, and the worker then notifies the master via master ! ExecutorStateChanged, freeing up the worker's CPU and memory.
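The command-building step can be sketched with plain java.lang.ProcessBuilder. Everything below is invented for illustration — the real command DriverRunner assembles also carries the worker's Akka URL, memory options taken from the DriverDescription, and the user's own arguments:

```scala
import scala.collection.JavaConverters._

// Assemble a java command that runs DriverWrapper around the user's main class,
// and hand it to ProcessBuilder. Paths and the -Xmx value are made up.
val userJar = "/path/to/examples.jar"
val userMainClass = "org.apache.spark.examples.SparkPi"
val command = Seq(
  "java",
  "-cp", userJar,
  "-Xmx512m",  // driver memory; the real code takes this from the description
  "org.apache.spark.deploy.worker.DriverWrapper",
  userMainClass
)
val pb = new ProcessBuilder(command.asJava)
// pb.start() would launch the driver process; when it exits, DriverRunner
// reports DriverStateChanged back to the Worker.
println(pb.command().asScala.mkString(" "))
```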

Now let's walk through the whole process once more. This covers only Driver registration; what happens after the Driver starts running is more complicated and will come in a later post.

1. The Client obtains an ActorSelection (a reference to the master's actor) from the master URL, then sends the Master a driver-registration request (RequestSubmitDriver) through it.

2. On receiving the request, the Master starts scheduling and picks a usable Worker from its workers list.

3. Through the Worker's actor reference (ActorRef), it sends the chosen Worker a launch-driver request (LaunchDriver).

4. Once scheduling is done, it replies to the Client with a registration-success message (SubmitDriverResponse).

5. On receiving the LaunchDriver request, the Worker builds a command from the DriverDescription it was given and executes it via ProcessBuilder.

6. When ProcessBuilder finishes running the command, the Worker is notified via DriverStateChanged.

7. Finally, the Worker reports DriverStateChanged up to the Master.

Postscript: I hear from Brother Chao that the org.apache.spark.deploy.Client class is about to be deleted. I don't know whether this cluster mode is being abandoned along with it; the examples in the official docs recommend client mode — running the program directly. No wonder that, while reading the job-scheduling code, I saw another actor called driverActor.

²»¹ýÕâÆªÎÄÕ»¹ÓдæÔÚµÄÒâÒå, AkkaºÍµ÷¶ÈÕâ¿é£¬ºÍÎÒÏÖÔÚÕýÔÚдµÄµÚÈýƪÒÔ¼°µÚËÄÆª¹ØÏµºÜÃÜÇС£

   