Spark Source Code Series (8): Spark Streaming Example Analysis
 
Author: Cen Yuhai's blog (岑玉海), via 火龙果软件 · Published 2014-11-12
 

This chapter covers Spark Streaming. Before diving in, let's first review how it is used; for details, see the Spark Streaming Programming Guide.

Example Code Analysis

val ssc = new StreamingContext(sparkConf, Seconds(1))
// get a DStream that connects to and listens on serverIP:serverPort
val lines = ssc.socketTextStream(serverIP, serverPort)
// split each line on spaces
val words = lines.flatMap(_.split(" "))
// count the words
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// print the results
wordCounts.print()
ssc.start() // start the computation
ssc.awaitTermination() // wait for it to finish

1. First, instantiate a StreamingContext.

2. Call StreamingContext's socketTextStream.

3. Process the resulting DStream.

4. Call StreamingContext's start method, then wait.

Let's look at StreamingContext's socketTextStream method.

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

1. The default StorageLevel is StorageLevel.MEMORY_AND_DISK_SER_2.

2. SocketReceiver's bytesToLines is used to turn the input stream into iterable data.
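
Behaviorally, bytesToLines is equivalent to this minimal, self-contained sketch (the real implementation uses Spark's internal NextIterator instead of plain Scala iterators):

import java.io.{BufferedReader, InputStream, InputStreamReader}

// sketch of SocketReceiver.bytesToLines: turn a raw byte stream into
// an Iterator[String] that yields one line at a time
def bytesToLines(inputStream: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}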

Continuing with the socketStream method: it directly news up a

new SocketInputDStream[T](this, hostname, port, converter, storageLevel)

Digging deeper into SocketInputDStream, its inheritance chain is SocketInputDStream >> ReceiverInputDStream >> InputDStream >> DStream.

¾ßÌåʵÏÖReceiverInputDStreamµÄÀàÓкü¸¸ö£¬»ù±¾É϶¼ÊÇ´ÓÍøÂç¶ËÀ´Êý¾ÝµÄ¡£

SocketInputDStream implements ReceiverInputDStream's getReceiver method, instantiating a SocketReceiver to receive the data.
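
The relevant class is only a few lines; in Spark 1.x it looks roughly like this:

private[streaming]
class SocketInputDStream[T: ClassTag](
    @transient ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // hand the scheduler a Receiver instance; the actual work happens
  // inside SocketReceiver once it is started on an executor
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}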

SocketReceiver's onStart method calls its receive method; the processing code is as follows:

socket = new Socket(host, port)
val iterator = bytesToObjects(socket.getInputStream())
while (!isStopped && iterator.hasNext) {
  store(iterator.next)
}

1. It news a Socket to receive data, using the bytesToLines method to turn the InputStream into strings, one line at a time.

2¡¢°ÑÿһÐÐÊý¾ÝÓÃstore·½·¨±£´æÆðÀ´£¬store·½·¨ÊÇ´ÓSocketReceiverµÄ¸¸ÀàReceiver¼Ì³Ð¶øÀ´£¬ÄÚ²¿ÊµÏÖÊÇ:

def store(dataItem: T) {
  executor.pushSingle(dataItem)
}

executor is of type ReceiverSupervisor, which handles all of the Receiver's operations. We won't dig into it yet; the implementation of pushSingle comes up again later.

At this point we know lines is a SocketInputDStream. A series of transformations is then applied to it: flatMap, map, reduceByKey, print. These are not the RDD methods of the same names, but DStream's own.
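
Each of these transformations simply wraps the parent DStream in a new DStream whose compute delegates to the parent's RDD for that batch interval. flatMap, for example, looks roughly like this in Spark 1.x:

def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => Traversable[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // per batch: get the parent's RDD for this time and apply the
  // ordinary RDD flatMap to it
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}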

With these methods we move into DStream territory: flatMap, map, reduceByKey, and print all involve DStream transformations, analogous to RDD transformations. Let's go through reduceByKey and print.

Like its RDD counterpart, the reduceByKey method is implemented via combineByKey; the difference is that it directly news a ShuffledDStream. Let's look at that class's compute implementation.

override def compute(validTime: Time): Option[RDD[(K, C)]] = {
  parent.getOrCompute(validTime) match {
    case Some(rdd) => Some(rdd.combineByKey[C](
      createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
    case None => None
  }
}

In the compute stage, the reduceByKey operation is applied to the RDD obtained for the given Time. The print method that follows is also a transformation:

new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
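
The foreachFunc passed in here is what does the actual printing; in Spark 1.x it looks roughly like this:

def print() {
  def foreachFunc = (rdd: RDD[T], time: Time) => {
    // take 11 so we can tell whether there are more than 10 elements
    val first11 = rdd.take(11)
    println("-------------------------------------------")
    println("Time: " + time)
    println("-------------------------------------------")
    first11.take(10).foreach(println)
    if (first11.size > 10) println("...")
    println()
  }
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}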

´òӡǰʮ¸ö£¬³¬¹ý10¸ö´òÓ¡"..."¡£ÐèҪעÒâregister·½·¨¡£

ssc.graph.addOutputStream(this)

It adds the current DStream to the graph's outputStreams. If there are no outputStreams, nothing will be output later, so keep this in mind!

Startup Process Analysis

ǰϷ½áÊøÖ®ºó£¬ssc.start() ¸ß³±¿ªÊ¼ÁË¡£ start·½·¨ºÜС£¬×îºËÐĵÄÒ»¾äÊÇJobSchedulerµÄstart·½·¨¡£ÎÒÃǵÃתµ½JobScheduler·½·¨ÉÏÃæÈ¥¡£

Here is the code of the start method:

def start(): Unit = synchronized {
  // handle any JobSchedulerEvent that is received
  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobSchedulerEvent => processEvent(event)
    }
  }), "JobScheduler")

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
}

1. It starts an Actor to handle JobScheduler's JobStarted, JobCompleted, and ErrorReported events.

2. It starts the StreamingListenerBus as a listener.

3. It starts the ReceiverTracker.

4. It starts the JobGenerator.

Next, let's look at ReceiverTracker's start method.

def start() = synchronized {
  if (!receiverInputStreams.isEmpty) {
    actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
    receiverExecutor.start()
  }
}

1¡¢Ê×ÏÈÅжÏÁËÒ»ÏÂreceiverInputStreams²»ÄÜΪ¿Õ£¬ÄÇreceiverInputStreamsÊÇÔõôʱºòдÈëÖµµÄÄØ£¿´ð°¸ÔÚSocketInputDStreamµÄ¸¸ÀàInputDStreamµ±ÖУ¬µ±ÊµÀý»¯InputDStreamµÄʱºò»áÔÚDStreamGraphÀïÃæÌí¼ÓInputStream¡£

abstract class InputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
  extends DStream[T](ssc_) {
  ssc.graph.addInputStream(this)
  // ...
}

2. It instantiates a ReceiverTrackerActor, which handles events such as RegisterReceiver (registering a Receiver), AddBlock, ReportError (reporting errors), and DeregisterReceiver (unregistering a Receiver), as sketched below.
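
Its dispatch looks roughly like this (a sketch based on Spark 1.x; the message field lists may differ slightly across versions):

private class ReceiverTrackerActor extends Actor {
  def receive = {
    case RegisterReceiver(streamId, typ, host, receiverActor) =>
      registerReceiver(streamId, typ, host, receiverActor, sender)
      sender ! true
    case AddBlock(receivedBlockInfo) =>
      addBlocks(receivedBlockInfo)
    case ReportError(streamId, message, error) =>
      reportError(streamId, message, error)
    case DeregisterReceiver(streamId, message, error) =>
      deregisterReceiver(streamId, message, error)
      sender ! true
  }
}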

3. It starts receiverExecutor (the actual class is ReceiverLauncher; what a name...), which is mainly responsible for launching the Receivers; its start method calls the startReceivers method.

private def startReceivers() {
  // matching the example above, getReceiver returns a SocketReceiver
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  // check whether all receivers have a preferred machine; this requires
  // overriding Receiver's preferredLocation method, which currently only
  // FlumeReceiver does
  val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

  // create a parallel collection RDD of the receivers so they get
  // distributed across the worker nodes
  val tempRDD =
    if (hasLocationPreferences) {
      val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
      ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
    } else {
      ssc.sc.makeRDD(receivers, receivers.size)
    }

  // the function that starts a Receiver on a worker node: iterate over
  // the receivers in the partition and start each one
  val startReceiver = (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException("Could not start receiver as object not found.")
    }
    val receiver = iterator.next()
    val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
    executor.start()
    executor.awaitTermination()
  }
  // run a dummy job to ensure all slaves have registered, so the
  // receivers don't all end up on one node
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }

  // distribute the receivers and start them
  ssc.sparkContext.runJob(tempRDD, startReceiver)
}

1. Iterate over receiverInputStreams to obtain all the Receivers.

2. Check whether every one of these Receivers has a preferred machine.

3. Use SparkContext's makeRDD method to wrap all the Receivers into a ParallelCollectionRDD, with parallelism equal to the number of Receivers.

4. Run a small job to ensure that all the slave nodes have registered (this little job does feel a bit odd).

5. Submit the job, starting all the Receivers.

SparkдµÃʵÔÚÊÇÌ«ÇÉÃîÁË£¬¾ÓÈ»¿ÉÒÔ°ÑReceiver°ü×°ÔÚRDDÀïÃæ£¬µ±×öÊÇÊý¾ÝÀ´´¦Àí£¡

When a Receiver is started, a ReceiverSupervisorImpl is newed up and its start method is called, which mainly does the following three things (the code isn't pasted here; a rough sketch follows the list).

1. Start the BlockGenerator.

2. Call the Receiver's onStart method to begin receiving data, writing it into the ReceiverSupervisor.

3. Call the onReceiverStart method, sending a RegisterReceiver message to the driver to report that the Receiver has started.
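
A rough sketch of how those three steps map onto the code, condensed from ReceiverSupervisor and ReceiverSupervisorImpl (not the verbatim source):

def start() {
  onStart()       // 1. ReceiverSupervisorImpl.onStart() starts the BlockGenerator
  startReceiver() // steps 2 and 3
}

def startReceiver() {
  receiver.onStart() // 2. the Receiver starts receiving; its store() calls land here
  onReceiverStart()  // 3. sends RegisterReceiver to the driver's ReceiverTracker
}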

Saving the Received Data

OK, at this point the focus shifts to the BlockGenerator. As mentioned earlier, SocketReceiver saves the data it receives by calling ReceiverSupervisor's pushSingle method.

// this is ReceiverSupervisorImpl's method
def pushSingle(data: Any) {
  blockGenerator += (data)
}

// this is BlockGenerator's method
def += (data: Any): Unit = synchronized {
  currentBuffer += data
}

Let's take a look at its start method.

def start() {
  blockIntervalTimer.start()
  blockPushingThread.start()
}

It starts a timer, a RecurringTimer, and a thread that runs the keepPushingBlocks method.

First, the RecurringTimer implementation:

while (!stopped) {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
}

ÿ¸ôÒ»¶Îʱ¼ä¾ÍÖ´ÐÐcallbackº¯Êý£¬callbackº¯ÊýÊÇnewµÄʱºò´«½øÀ´µÄ£¬ÊÇBlockGeneratorµÄupdateCurrentBuffer·½·¨¡£

private def updateCurrentBuffer(time: Long): Unit = synchronized {
  try {
    val newBlockBuffer = currentBuffer
    currentBuffer = new ArrayBuffer[Any]
    if (newBlockBuffer.size > 0) {
      val blockId = StreamBlockId(receiverId, time - blockInterval)
      val newBlock = new Block(blockId, newBlockBuffer)
      blocksForPushing.put(newBlock)
    }
  } catch {
    case t: Throwable => reportError("Error in block updating thread", t)
  }
}

It news up a Block and adds it to blocksForPushing, an ArrayBlockingQueue.

Ìáµ½ÕâÀÓÐÁ½¸ö²ÎÊýÐèÒª´ó¼Ò×¢ÒâµÄ£º

spark.streaming.blockInterval   default: 200 (milliseconds)
spark.streaming.blockQueueSize  default: 10 (blocks)

These are the interval and queue length just mentioned: the interval defaults to 200 milliseconds, and the queue holds at most 10 Blocks; once it is full, further puts block.
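
Both can be tuned through SparkConf when building the StreamingContext; for instance (the values here are purely illustrative):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("NetworkWordCount")
  .set("spark.streaming.blockInterval", "100") // cut a Block every 100 ms instead of 200
  .set("spark.streaming.blockQueueSize", "20") // allow up to 20 Blocks to queue up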

Next, what exactly does keepPushingBlocks, run on the other thread the BlockGenerator starts, actually do?

private def keepPushingBlocks() {
  while (!stopped) {
    Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
      case Some(block) => pushBlock(block)
      case None =>
    }
  }
  // ... before exiting, push out whatever blocks remain
}

It keeps taking Blocks out of blocksForPushing and calling pushBlock on them. That method belongs to the BlockGeneratorListener that was passed in from ReceiverSupervisorImpl when the BlockGenerator was instantiated.

private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}, streamId, env.conf)

1. reportError sends the error-report message ReportError to the driver via the actor.

2. pushArrayBuffer is called to save the data.

Here is the pushArrayBuffer method:

def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    optionalMetadata: Option[Any],
    optionalBlockId: Option[StreamBlockId]) {
  val blockId = optionalBlockId.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
    storageLevel, tellMaster = true)
  reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}

1. The Block is saved into the BlockManager using the previously mentioned StorageLevel.MEMORY_AND_DISK_SER_2 (spill to disk when memory runs short, and keep the data on 2 nodes).

2. reportPushedBlock sends an AddBlock message to the driver reporting the newly added Block; on receiving it, the ReceiverTracker updates its internal receivedBlockInfo mapping.

Processing the Received Data

So far we have only covered receiving and saving the data. How does it get processed?

We have been talking about the ReceiverTracker all along, while ignoring the JobGenerator started at the end of JobScheduler's start method.

def start(): Unit = synchronized {
  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobGeneratorEvent => processEvent(event)
    }
  }), "JobGenerator")
  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

1. Start an actor to handle JobGeneratorEvent events.

2. If a checkpoint already exists, pick up processing from the last record; otherwise this is a first start.

Let's look at startFirstTime first; checkpointing is a bit involved, so we'll leave it for later.

private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
}

1. timer.getStartTime computes the expiry time of the next period with the formula (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period: divide the current time by the interval and take math.floor to find the previous whole period (i.e. the expiry point of the last period), add 1, then multiply by the period to get the expiry time of the next period, as the worked example below shows.
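
A worked example with made-up numbers, say a 1000 ms period and a current clock time of 5432 ms:

val period = 1000L      // batch interval in milliseconds (hypothetical)
val currentTime = 5432L // "now" in milliseconds (hypothetical)
val nextStart = (math.floor(currentTime.toDouble / period) + 1).toLong * period
// floor(5.432) = 5, so nextStart = (5 + 1) * 1000 = 6000 ms:
// the timer first fires on the next whole-period boundary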

2. Start the DStreamGraph, with start time = startTime - graph.batchDuration.

3. Start the Timer. Here is its definition:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

Now it is clear: the DStreamGraph's interval is the timer's interval, and its start time is set one interval earlier than the Timer's; the reason is left for later exploration.

So at each interval the Timer sends a GenerateJobs message to eventActor. Let's go straight to its handler, generateJobs; one intermediate step is skipped here, which you can trace yourself.

private def processEvent(event: JobGeneratorEvent) {
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time) => doCheckpoint(time)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

Below is the generateJobs method.

private def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  Try(graph.generateJobs(time)) match {
    case Success(jobs) =>
      val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
        val streamId = stream.id
        val receivedBlockInfo = stream.getReceivedBlockInfo(time)
        (streamId, receivedBlockInfo)
      }.toMap
      jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventActor ! DoCheckpoint(time)
}

1. The DStreamGraph generates the jobs.

2. The received Block info is fetched from each stream.

3. submitJobSet is called to submit the jobs.

4. After the jobs are submitted, a checkpoint is made.

First, how does the DStreamGraph generate the jobs?

def generateJobs(time: Time): Seq[Job] = {
  val jobs = this.synchronized {
    outputStreams.flatMap(outputStream => outputStream.generateJob(time))
  }
  jobs
}

In this example, outputStreams was populated inside the print method, as discussed earlier. Let's continue with DStream's generateJob.

private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

1. Call the getOrCompute method to obtain the RDD.

2. new up a function to submit the job, which itself does nothing at all.

Why? Because we jumped to the wrong method, heh. This outputStream was returned by print, so it is actually a ForEachDStream, and what we should be looking at is the generateJob method inside it.

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

Please pay close attention here; don't let this point trip you up.

Now let's see where this RDD comes from.

private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  // If this DStream was not initialized (i.e., zeroTime not set), then do it
  // If RDD was already generated, then retrieve it from HashMap
  generatedRDDs.get(time) match {

    // this RDD has already been generated; just use it
    case Some(oldRDD) => Some(oldRDD)

    // not generated yet, so call the compute function to generate one
    case None => {
      if (isTimeValid(time)) {
        compute(time) match {
          case Some(newRDD) =>
            // set the storage level
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel)
            }
            // checkpoint now if needed
            if (checkpointDuration != null &&
                (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint()
            }
            // add it to generatedRDDs so it can be reused
            generatedRDDs.put(time, newRDD)
            Some(newRDD)
          case None =>
            None
        }
      } else {
        None
      }
    }
  }
}

As the method above shows, the RDD is produced by the compute function that each DStream implements for itself. SocketInputDStream has no compute function; we find it in the parent class ReceiverInputDStream.

override def compute(validTime: Time): Option[RDD[T]] = {
  // if the time is earlier than startTime, return an empty RDD; this most
  // likely happens during error recovery after the master has gone down
  if (validTime >= graph.startTime) {
    val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    receivedBlockInfo(validTime) = blockInfo
    val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
    Some(new BlockRDD[T](ssc.sc, blockIds))
  } else {
    Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
  }
}

Using the DStream's id, it pulls all the received block info out of the receiverTracker, records it in the ReceiverInputDStream's own receivedBlockInfo HashMap, and returns the RDD; what the RDD actually contains is the set of Block ids.

ÏÖÔÚÎÒÃǾͿÉÒԻص½Ö®Ç°JobGeneratorµÄgenerateJobs·½·¨£¬ÎÒÃǾÍÇå³þËüÕâ¾äÊÇÌá½»µÄʲôÁË¡£

jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))

JobSetÊǼǼJobµÄÍê³ÉÇé¿öµÄ£¬Ö±½Ó¿´submitJobSet·½·¨°É¡£

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
  } else {
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  }
}

It iterates over all the jobs in the jobSet, submitting them via the jobExecutor thread pool. A look at JobHandler makes the rest clear.

private class JobHandler(job: Job) extends Runnable {
  def run() {
    eventActor ! JobStarted(job)
    job.run()
    eventActor ! JobCompleted(job)
  }
}

1. Notify eventActor to handle the JobStarted event.

2. Run the job.

3. Notify eventActor to handle the JobCompleted event.

The key point here is job.run; the event handling merely updates the related job bookkeeping.

def run() {
  result = Try(func())
}

When the BlockRDD is traversed, its compute function fetches the corresponding Block (see BlockRDD for details), and the results of this RDD are then printed.
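
For reference, BlockRDD's compute looks roughly like this in Spark 1.x: each partition carries one block id, and the data is fetched straight from the BlockManager:

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val blockManager = SparkEnv.get.blockManager
  val blockId = split.asInstanceOf[BlockRDDPartition].blockId
  blockManager.get(blockId) match {
    case Some(block) => block.data.asInstanceOf[Iterator[T]]
    case None =>
      throw new Exception("Could not compute split, block " + blockId + " not found")
  }
}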

That wraps things up. To finish, a summary; this chapter was only a walkthrough of the process:

1. There can be multiple inputs: we can define several inputs on one StreamingContext, e.g. listening on several (host, port) pairs, each with its own processing logic and output. Output is not limited to the print method; there are others, such as saveAsTextFiles and saveAsObjectFiles. The design here supports sharing the StreamingContext (see the sketch after this list).

2. StreamingContext starts the JobScheduler, and the JobScheduler starts the ReceiverTracker and the JobGenerator.

3. The ReceiverTracker gets the Receivers running on the Executors by wrapping them in an RDD; once up, each Receiver sends a RegisterReceiver message back to the ReceiverTracker.

4. A Receiver saves the data it receives via the ReceiverSupervisor.

5. ReceiverSupervisorImpl writes the data into an ArrayBuffer inside the BlockGenerator.

6. At fixed intervals (200 ms by default), the BlockGenerator turns that ArrayBuffer into a Block and adds it to blocksForPushing.

7. The BlockGenerator's other thread continuously writes the Blocks added to blocksForPushing into the BlockManager, sending an AddBlock message to the ReceiverTracker for each.

8. The JobGenerator has an internal timer that periodically generates Jobs, using the DStream ids to fetch the Block info the ReceiverTracker has recorded from the BlockManager for processing; this interval is the duration we passed when instantiating the StreamingContext, Seconds(1) in this example.
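
As a sketch of point 1, here are two socket inputs sharing one StreamingContext, each with its own logic and output (hostnames, ports, and the output path are placeholders):

val ssc = new StreamingContext(sparkConf, Seconds(1))

val lines1 = ssc.socketTextStream("host1", 9999)
val lines2 = ssc.socketTextStream("host2", 9998)

// each input gets its own processing logic and its own output
lines1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
lines2.saveAsTextFiles("hdfs:///tmp/lines2") // one directory of files per batch

ssc.start()
ssc.awaitTermination()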

   