This chapter covers Spark Streaming. Before diving in, let's first review how it is used; for the details, see the "Spark Streaming Programming Guide".
Example Code Analysis
```scala
val ssc = new StreamingContext(sparkConf, Seconds(1))
// get a DStream that connects to and listens on the given address:port
val lines = ssc.socketTextStream(serverIP, serverPort)
// split each line of input
val words = lines.flatMap(_.split(" "))
// count the words
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// output the result
wordCounts.print()
ssc.start()             // start the computation
ssc.awaitTermination()  // exit when the computation terminates
```
1. First, instantiate a StreamingContext.
2. Call the StreamingContext's socketTextStream method.
3. Process the resulting DStream.
4. Call the StreamingContext's start method, then wait.
Let's look at StreamingContext's socketTextStream method.
```scala
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
```
1. The default StorageLevel is StorageLevel.MEMORY_AND_DISK_SER_2.
2. It uses SocketReceiver's bytesToLines to turn the input stream into iterable data.
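The conversion in point 2 can be sketched in plain Scala without any Spark dependency. This is a simplified stand-in for SocketReceiver.bytesToLines, not the actual implementation: wrap the stream in a BufferedReader and expose it as an iterator of lines.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object BytesToLinesSketch {
  // turn a raw InputStream into an Iterator[String], one element per line
  def bytesToLines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    // readLine() returns null at end of stream, which terminates the iterator
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }

  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream(
      "hello world\nspark streaming\n".getBytes(StandardCharsets.UTF_8))
    bytesToLines(in).foreach(println)
  }
}
```

The iterator is lazy, which matches how the receiver consumes the socket: each hasNext/next pair pulls one more line off the wire.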
Continuing into the socketStream method, we find it directly news up a

```scala
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
```
Digging deeper into SocketInputDStream and tracing its inheritance chain: SocketInputDStream >> ReceiverInputDStream >> InputDStream >> DStream.
There are several concrete implementations of ReceiverInputDStream, basically all of them receiving their data over the network.
SocketInputDStream implements ReceiverInputDStream's getReceiver method, instantiating a SocketReceiver to receive the data.
SocketReceiver's onStart method calls its receive method; the processing code is as follows:
```scala
socket = new Socket(host, port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
  store(iterator.next)
}
```
1. It news a Socket to receive data and uses the bytesToLines method to convert the InputStream into line-by-line strings.
2. Each line is saved with the store method, which SocketReceiver inherits from its parent class Receiver; internally it is:
```scala
def store(dataItem: T) {
  executor.pushSingle(dataItem)
}
```
The executor is of type ReceiverSupervisor, and it handles all of the Receiver's operations. We won't dig into it here; we'll come back to the implementation of pushSingle later.
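The store-to-supervisor handoff can be mimicked in a few lines of plain Scala. The class names below are made up for illustration; the real Receiver and ReceiverSupervisor carry far more machinery:

```scala
import scala.collection.mutable.ArrayBuffer

object StoreChainSketch {
  trait Supervisor { def pushSingle(data: Any): Unit }

  // stands in for ReceiverSupervisorImpl: just buffers whatever is pushed
  class BufferingSupervisor extends Supervisor {
    val buffer = new ArrayBuffer[Any]()
    def pushSingle(data: Any): Unit = synchronized { buffer += data }
  }

  // stands in for Receiver: store forwards each single item to the supervisor
  abstract class DemoReceiver(val supervisor: Supervisor) {
    def store(dataItem: Any): Unit = supervisor.pushSingle(dataItem)
  }

  // stands in for SocketReceiver's receive loop
  class LineReceiver(lines: Iterator[String], supervisor: Supervisor)
      extends DemoReceiver(supervisor) {
    def receive(): Unit = while (lines.hasNext) store(lines.next())
  }
}
```

Feeding a couple of lines through a LineReceiver leaves them in the supervisor's buffer, which is all that pushSingle does at this level of abstraction.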
By this point we know that lines is a SocketInputDStream. It then goes through a whole series of transformations: flatMap, map, reduceByKey, print. These are not the RDD methods of the same names but DStream-specific ones.
With these methods we have moved into DStream territory: flatMap, map, reduceByKey and print all involve DStream transformations, which are analogous to RDD transformations. Let's walk through reduceByKey and print.
Like its RDD counterpart, reduceByKey is implemented by calling combineByKey; the difference is that it directly news up a ShuffledDStream. Let's look at that implementation next.
```scala
override def compute(validTime: Time): Option[RDD[(K, C)]] = {
  parent.getOrCompute(validTime) match {
    case Some(rdd) => Some(rdd.combineByKey[C](
      createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
    case None => None
  }
}
```
In the compute phase, the reduceByKey operation is applied to the RDD obtained for the given Time. The print method that comes next is also a transformation:

```scala
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
```

It prints the first ten elements, and prints "..." when there are more than 10. Pay attention to the register method:

```scala
ssc.graph.addOutputStream(this)
```

It adds the current DStream to the graph's outputStreams. Later, when it is time to produce output, nothing is emitted unless there is an outputStream, so keep this in mind!
Startup Process Analysis
With the preliminaries out of the way, ssc.start() is where the action begins. The start method itself is tiny; its key line calls JobScheduler's start method, so we turn to JobScheduler.
Here is its start method:
```scala
def start(): Unit = synchronized {
  // process each JobSchedulerEvent as it arrives
  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobSchedulerEvent => processEvent(event)
    }
  }), "JobScheduler")
  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
}
```
1. It starts an Actor to handle the JobScheduler's JobStarted, JobCompleted and ErrorReported events.
2. It starts the StreamingListenerBus as a listener.
3. It starts the ReceiverTracker.
4. It starts the JobGenerator.
Next, let's look at ReceiverTracker's start method.
```scala
def start() = synchronized {
  if (!receiverInputStreams.isEmpty) {
    actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
    receiverExecutor.start()
  }
}
```
1. It first checks that receiverInputStreams is not empty. So when does receiverInputStreams get populated? The answer is in SocketInputDStream's parent class InputDStream: when an InputDStream is instantiated, it adds itself as an input stream to the DStreamGraph.

```scala
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
  extends DStream[T](ssc_) {
  ssc.graph.addInputStream(this)
  //....
}
```
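The registration in point 1 happens purely as a constructor side effect: merely newing an input DStream puts it into the graph. A stripped-down sketch with made-up demo classes:

```scala
import scala.collection.mutable.ArrayBuffer

object GraphRegistrationSketch {
  class DemoGraph {
    val inputStreams = new ArrayBuffer[DemoInputDStream]()
    def addInputStream(s: DemoInputDStream): Unit = synchronized { inputStreams += s }
  }

  // the superclass constructor registers every subclass instance with the graph,
  // just as InputDStream calls ssc.graph.addInputStream(this)
  abstract class DemoInputDStream(graph: DemoGraph) {
    graph.addInputStream(this)
  }

  class DemoSocketDStream(graph: DemoGraph) extends DemoInputDStream(graph)
}
```

Newing two DemoSocketDStreams against the same graph leaves two entries in inputStreams, which is why receiverInputStreams is already non-empty by the time ReceiverTracker.start runs.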
2. It instantiates a ReceiverTrackerActor, which handles events such as RegisterReceiver (registering a Receiver), AddBlock, ReportError (reporting an error) and DeregisterReceiver (deregistering a Receiver).
3. It starts receiverExecutor (whose actual class is ReceiverLauncher; quite a name..), which is mainly responsible for launching the Receivers; its start method calls the startReceivers method.
```scala
private def startReceivers() {
  // for the example above, getReceiver returns a SocketReceiver
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  // check whether every receiver has a preferred machine; this requires
  // overriding Receiver's preferredLocation method, which currently only
  // FlumeReceiver does
  val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
  // create an RDD of the receivers, parallelized so that they get spread
  // across the worker nodes
  val tempRDD =
    if (hasLocationPreferences) {
      val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
      ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
    } else {
      ssc.sc.makeRDD(receivers, receivers.size)
    }
  // the function that starts a Receiver on a worker node: iterate the
  // partition's receivers and start each one
  val startReceiver = (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException("Could not start receiver as object not found.")
    }
    val receiver = iterator.next()
    val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
    executor.start()
    executor.awaitTermination()
  }
  // run a dummy job to make sure all the slaves have registered, so that the
  // receivers don't all end up on one node
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  // distribute the receivers and start them
  ssc.sparkContext.runJob(tempRDD, startReceiver)
}
```
1. Iterate over receiverInputStreams to obtain all the Receivers.
2. Check whether every one of these Receivers has a preferred location.
3. Use SparkContext's makeRDD to wrap all the Receivers into a ParallelCollectionRDD, with the parallelism equal to the number of Receivers.
4. Run a small job to make sure all the slave nodes have registered (this little job does feel somewhat odd).
5. Submit the job and start all the Receivers.
Spark is written really cleverly here: it actually manages to wrap the Receivers inside an RDD and treat them as data!
When a Receiver is started, a ReceiverSupervisorImpl is newed and its start method is called, which mainly does these three things (code omitted):
1. Start the BlockGenerator.
2. Call the Receiver's onStart method to begin receiving data and writing it into the ReceiverSupervisor.
3. Call the onReceiverStart method, sending a RegisterReceiver message to the driver to report that it has started.
Saving the Received Data
OK, at this point the spotlight falls on BlockGenerator. As mentioned earlier, SocketReceiver saves the data it receives by calling ReceiverSupervisor's pushSingle method.
```scala
// this is ReceiverSupervisorImpl's method
def pushSingle(data: Any) {
  blockGenerator += (data)
}

// this is BlockGenerator's method
def += (data: Any): Unit = synchronized {
  currentBuffer += data
}
```
Let's look at its start method.

```scala
def start() {
  blockIntervalTimer.start()
  blockPushingThread.start()
}
```

It starts a RecurringTimer and a thread that runs the keepPushingBlocks method.
First, the RecurringTimer implementation:
```scala
while (!stopped) {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
}
```
ÿ¸ôÒ»¶Îʱ¼ä¾ÍÖ´ÐÐcallbackº¯Êý£¬callbackº¯ÊýÊÇnewµÄʱºò´«½øÀ´µÄ£¬ÊÇBlockGeneratorµÄupdateCurrentBuffer·½·¨¡£
private def updateCurrentBuffer(time: Long): Unit = synchronized { try { val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[Any] if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockInterval) val newBlock = new Block(blockId, newBlockBuffer) blocksForPushing.put(newBlock) } } catch {case t: Throwable => reportError("Error in block updating thread", t) } } |
It news up a Block and adds it to blocksForPushing, an ArrayBlockingQueue.
At this point there are two parameters worth noting:

spark.streaming.blockInterval (default: 200)
spark.streaming.blockQueueSize (default: 10)

These are the interval and queue length just mentioned: the interval defaults to 200 milliseconds, and the queue holds at most 10 Blocks; beyond that, puts will block.
Next, what exactly does the keepPushingBlocks method, run on BlockGenerator's other thread, do?
```scala
private def keepPushingBlocks() {
  while(!stopped) {
    Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
      case Some(block) => pushBlock(block)
      case None =>
    }
  }
  // ...before exiting, it also pushes out whatever blocks remain
}
```
It keeps pulling blocks out of blocksForPushing and calling pushBlock on them. That method belongs to the BlockGeneratorListener that ReceiverSupervisorImpl passed in when it instantiated the BlockGenerator.
```scala
private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}, streamId, env.conf)
```
1. reportError sends a ReportError message to the driver through the actor.
2. onPushBlock calls pushArrayBuffer to save the data.
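The whole buffer-to-block-to-push pipeline fits in a small single-threaded sketch. This is a simplification with invented names; the real BlockGenerator runs the timer and the pushing loop on their own threads:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object MiniBlockGeneratorSketch {
  case class Block(id: Long, data: ArrayBuffer[Any])

  class MiniBlockGenerator(onPushBlock: Block => Unit) {
    private var currentBuffer = new ArrayBuffer[Any]
    // bounded like spark.streaming.blockQueueSize (default 10)
    private val blocksForPushing = new ArrayBlockingQueue[Block](10)

    // pushSingle ends up here: append to the current buffer
    def +=(data: Any): Unit = synchronized { currentBuffer += data }

    // in the real code the timer calls this every blockInterval (default 200 ms):
    // swap the buffer out and enqueue it as a Block
    def updateCurrentBuffer(time: Long): Unit = synchronized {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.nonEmpty) blocksForPushing.put(Block(time, newBlockBuffer))
    }

    // one round of keepPushingBlocks: poll the queue, hand the block to the listener
    def drainOnce(): Unit =
      Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)).foreach(onPushBlock)
  }
}
```

The bounded queue is what applies backpressure: if the pushing side falls behind by more than 10 blocks, updateCurrentBuffer's put blocks the timer thread.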
Here is the pushArrayBuffer method:
```scala
def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    optionalMetadata: Option[Any],
    optionalBlockId: Option[StreamBlockId]
  ) {
  val blockId = optionalBlockId.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
    storageLevel, tellMaster = true)
  reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}
```
1. It saves the Block into the BlockManager, using the StorageLevel.MEMORY_AND_DISK_SER_2 mentioned earlier (spill to disk when memory runs out, and keep copies on 2 nodes).
2. It calls reportPushedBlock to send an AddBlock message to the driver, reporting the newly added Block; when the ReceiverTracker receives it, it updates its internal receivedBlockInfo mapping.
Processing the Received Data
So far we have only discussed receiving and saving the data; how does it actually get processed?
All this time we have been talking about the ReceiverTracker while ignoring the JobGenerator, the last thing started by JobScheduler's start method.
```scala
def start(): Unit = synchronized {
  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobGeneratorEvent => processEvent(event)
    }
  }), "JobGenerator")
  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
1. It starts an actor to handle JobGeneratorEvent events.
2. If a checkpoint is already present, it carries on from the last recorded point; otherwise this is a first start.
Let's look at startFirstTime first and leave checkpoints for later; they are a bit involved.
```scala
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
}
```
1. timer.getStartTime computes the due time of the next period with the formula (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period: dividing the current time by the period and applying math.floor gives the index of the period we are in (i.e. the previous period boundary); adding 1 and multiplying by the period gives the next period's due time.
2. It starts the DStreamGraph, with start time = startTime - graph.batchDuration.
3. It starts the Timer; here is its definition:
```scala
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
```
Now it is clear: the DStreamGraph's interval is the same as the timer's interval, and its start time is set one interval earlier than the Timer's; the reason is something to explore later.
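The getStartTime formula from point 1 above is easy to sanity-check as a standalone function:

```scala
object NextPeriodSketch {
  // floor(now / period) is the index of the current period, so adding 1 and
  // multiplying back by the period gives the next period boundary
  def nextPeriodBoundary(currentTime: Long, period: Long): Long =
    (math.floor(currentTime.toDouble / period) + 1).toLong * period
}
```

With a 1000 ms batch duration, a current time of 1234 lands in period 1, so the next boundary is 2000; a current time exactly on a boundary, say 2000, yields the following boundary 3000.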
So at every interval the Timer sends a GenerateJobs message to the eventActor. Let's jump straight to its handler, generateJobs; one intermediate step is skipped here, so look it up yourselves.
```scala
private def processEvent(event: JobGeneratorEvent) {
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time) => doCheckpoint(time)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
Here is the generateJobs method:
```scala
private def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  Try(graph.generateJobs(time)) match {
    case Success(jobs) =>
      val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
        val streamId = stream.id
        val receivedBlockInfo = stream.getReceivedBlockInfo(time)
        (streamId, receivedBlockInfo)
      }.toMap
      jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventActor ! DoCheckpoint(time)
}
```
1. The DStreamGraph generates the jobs.
2. The Block information received so far is collected from each stream.
3. submitJobSet is called to submit the jobs.
4. After the jobs are submitted, a checkpoint is made.
First, let's see how the DStreamGraph generates the jobs.
```scala
def generateJobs(time: Time): Seq[Job] = {
  val jobs = this.synchronized {
    outputStreams.flatMap(outputStream => outputStream.generateJob(time))
  }
  jobs
}
```
In our example, outputStreams was populated by the print method, as explained earlier. Let's continue with DStream's generateJob.
```scala
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
```
1. It calls getOrCompute to obtain the RDD.
2. It news up a function to submit this job, yet that function does nothing at all.
Why? Because we jumped to the wrong place, haha. Since this outputStream was returned by print, it is really a ForEachDStream, so what we should be reading is its generateJob method.
```scala
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
Please take careful note of this so you don't get stuck here.
Now let's see where that RDD comes from.
```scala
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  // If this DStream was not initialized (i.e., zeroTime not set), then do it
  // If RDD was already generated, then retrieve it from HashMap
  generatedRDDs.get(time) match {
    // this RDD has already been generated, so just use it
    case Some(oldRDD) => Some(oldRDD)
    // not generated yet, so call the compute function to generate one
    case None => {
      if (isTimeValid(time)) {
        compute(time) match {
          case Some(newRDD) =>
            // set the storage level
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel)
            }
            // checkpoint now if needed
            if (checkpointDuration != null &&
                (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint()
            }
            // add it to generatedRDDs so it can be reused
            generatedRDDs.put(time, newRDD)
            Some(newRDD)
          case None =>
            None
        }
      } else {
        None
      }
    }
  }
}
```
From the method above we can see that the RDD comes from the compute function that each DStream implements itself. SocketInputDStream has no compute function of its own; it is found in the parent class ReceiverInputDStream.
```scala
override def compute(validTime: Time): Option[RDD[T]] = {
  // if the time is earlier than startTime, return an empty RDD; this is most
  // likely error recovery after the master went down
  if (validTime >= graph.startTime) {
    val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    receivedBlockInfo(validTime) = blockInfo
    val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
    Some(new BlockRDD[T](ssc.sc, blockIds))
  } else {
    Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
  }
}
```
ͨ¹ýDStreamµÄid°ÑreceiverTrackerµ±ÖаѽÓÊÕµ½µÄblockÐÅϢȫ²¿ÄóöÀ´£¬¼Ç¼µ½ReceiverInputDStream×ÔÉíµÄreceivedBlockInfoÕâ¸öHashMapÀïÃæ£¬¾Í°ÑRDD·µ»ØÁË£¬RDDÀïÃæÊµ¼Ê°üº¬µÄÊÇBlockµÄidµÄ¼¯ºÏ¡£
ÏÖÔÚÎÒÃǾͿÉÒԻص½Ö®Ç°JobGeneratorµÄgenerateJobs·½·¨£¬ÎÒÃǾÍÇå³þËüÕâ¾äÊÇÌá½»µÄʲôÁË¡£
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) |
JobSetÊǼǼJobµÄÍê³ÉÇé¿öµÄ£¬Ö±½Ó¿´submitJobSet·½·¨°É¡£
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
  } else {
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  }
}
```
It iterates over all the jobs in the jobSet and submits them through the jobExecutor thread pool. One look at JobHandler makes it clear.
```scala
private class JobHandler(job: Job) extends Runnable {
  def run() {
    eventActor ! JobStarted(job)
    job.run()
    eventActor ! JobCompleted(job)
  }
}
```
1. Notify the eventActor to handle the JobStarted event.
2. Run the job.
3. Notify the eventActor to handle the JobCompleted event.
The important part here is job.run; the event handling merely updates the corresponding job information.
```scala
def run() {
  result = Try(func())
}
```
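The submitJobSet / JobHandler pattern reduces to plain java.util.concurrent pieces. All names here are invented for the sketch, and atomic counters stand in for the eventActor messages:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Try

object MiniJobSchedulerSketch {
  class MiniJob(func: () => Unit) {
    def run(): Unit = Try(func()) // like Job.run: failures are captured, not thrown
  }

  class MiniJobScheduler {
    val started = new AtomicInteger(0)
    val completed = new AtomicInteger(0)
    private val jobExecutor = Executors.newFixedThreadPool(1)

    def submitJobSet(jobs: Seq[MiniJob]): Unit = jobs.foreach { job =>
      jobExecutor.execute(new Runnable {
        def run(): Unit = {
          started.incrementAndGet()   // stands in for eventActor ! JobStarted(job)
          job.run()
          completed.incrementAndGet() // stands in for eventActor ! JobCompleted(job)
        }
      })
    }

    def shutdownAndWait(): Unit = {
      jobExecutor.shutdown()
      jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

Because job.run wraps the work in Try, a failing job still produces a "completed" event; the failure lives in the job's result rather than killing the pool thread.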
When the BlockRDD is iterated, its compute function fetches the corresponding Block (see BlockRDD for the details), and the result of the RDD is then printed.
That is the end of the walkthrough. To wrap up, a summary; this chapter is only a process analysis:
1¡¢¿ÉÒÔÓжà¸öÊäÈ룬ÎÒÃÇ¿ÉÒÔͨ¹ýStreamingContext¶¨Òå¶à¸öÊäÈ룬±ÈÈçÎÒÃǼàÌý¶à¸ö£¨host£¬ip£©£¬¿ÉÒÔ¸øËüÃǶ¨Òå¸÷×ԵĴ¦ÀíÂß¼ºÍÊä³ö£¬Êä³ö·½Ê½²»½öÏÞÓÚprint·½·¨£¬»¹¿ÉÒÔÓбðµÄ·½·¨£¬saveAsTextFilesºÍsaveAsObjectFiles¡£Õâ¿éµÄÉè¼ÆÊÇÖ§³Ö¹²ÏíStreamingContextµÄ¡£
2¡¢StreamingContextÆô¶¯ÁËJobScheduler£¬JobSchedulerÆô¶¯ReceiverTrackerºÍJobGenerator¡£
3¡¢ReceiverTrackerÊÇͨ¹ý°ÑReceiver°ü×°³ÉRDDµÄ·½Ê½£¬·¢Ë͵½Executor¶ËÔËÐÐÆðÀ´µÄ£¬ReceiverÆðÀ´Ö®ºóÏòReceiverTracker·¢ËÍRegisterReceiverÏûÏ¢¡£
3¡¢Receiver°Ñ½ÓÊÕµ½µÄÊý¾Ý£¬Í¨¹ýReceiverSupervisor±£´æ¡£
4¡¢ReceiverSupervisorImpl°ÑÊý¾ÝдÈëµ½BlockGeneratorµÄÒ»¸öArrayBufferµ±ÖС£
5¡¢BlockGeneratorÄÚ²¿Ã¿¸öÒ»¶Îʱ¼ä£¨Ä¬ÈÏÊÇ200ºÁÃ룩¾Í°ÑÕâ¸öArrayBuffer¹¹Ôì³ÉBlockÌí¼Óµ½blocksForPushingµ±ÖС£
6¡¢BlockGeneratorµÄÁíÍâÒ»ÌõÏß³ÌÔò²»¶ÏµÄ°Ñ¼ÓÈëµ½blocksForPushingµ±ÖеÄBlockдÈëµ½BlockManagerµ±ÖУ¬²¢ÏòReceiverTracker·¢ËÍAddBlockÏûÏ¢¡£
7¡¢JobGeneratorÄÚ²¿Óиö¶¨Ê±Æ÷£¬¶¨ÆÚÉú³ÉJob£¬Í¨¹ýDStreamµÄid£¬°ÑReceiverTracker½ÓÊÕµ½µÄBlockÐÅÏ¢´ÓBlockManagerÉÏץȡÏÂÀ´½øÐд¦Àí£¬Õâ¸ö¼ä¸ôʱ¼äÊÇÎÒÃÇÔÚʵÀý»¯StreamingContextµÄʱºò´«½øÈ¥µÄÄǸöʱ¼ä£¬ÔÚÕâ¸öÀý×ÓÀïÃæÊÇSeconds(1)¡£