Spark Stream Data Processing: Using Spark Streaming
 
Source: Blog | Published: 2016-7-26
 

Overview

Spark Streaming is an extension of the core Spark API for scalable, high-throughput, fault-tolerant processing of live data streams. It can ingest data from many sources, such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and that data can be processed conveniently with high-level operators such as map, reduce, join, and window. Finally, the processed data can be pushed out to file systems, databases, and live monitoring dashboards. In fact, you can even apply Spark's machine-learning and graph-processing algorithms to the data streams.

The figure below outlines how Spark Streaming works internally. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to produce a result stream that is itself made up of batches. (Figure: Spark Streaming processing overview)

Spark Streaming abstracts stream data as a discretized stream, or DStream. A DStream can be created from an input data stream or derived by transforming another DStream. Internally, a DStream is represented as a continuous sequence of RDDs.

A Quick Example

Let's start with a simple example: using Spark Streaming to count words in text received over a TCP connection.

/**
 * A word-count program over streaming data, implemented with Spark Streaming.
 * Each batch of the data stream is counted independently; no incremental
 * (running) count is kept.
 * Environment: Spark 1.6.1, Scala 2.10.4
 */

// Import the required classes
import org.apache.spark._
import org.apache.spark.streaming._

object NetworkWordCount {
  def main(args: Array[String]) {
    // A Spark Streaming program starts from a StreamingContext, which maintains
    // a SparkContext instance internally. Here we create a StreamingContext
    // with two local threads and a batch interval of 1 second.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    // By default only one SparkContext is allowed per Spark application, and
    // spark-shell already creates one for us, named sc. In spark-shell you
    // should therefore create the StreamingContext as below, to avoid the
    // error caused by creating a second SparkContext:
    // val ssc = new StreamingContext(sc, Seconds(1))

    // Create a DStream that receives stream data from a TCP connection;
    // each record is one line of text
    val lines = ssc.socketTextStream("localhost", 9999)

    // Transform the DStream to obtain the final result
    val res = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Print the first ten elements of each RDD in the DStream
    res.print()

    // At this point Spark Streaming has not actually processed any data; it
    // has only recorded the operations to be performed. Once all operations
    // are set up, the real processing is started as follows:
    ssc.start()            // start the computation
    ssc.awaitTermination() // wait for the computation to terminate
  }
}

To test the program, we need a TCP data source as input. Netcat works well for this (it ships with most Linux systems; on Windows, Ncat, an improved version of Netcat, is recommended). Use Netcat to listen on a local port as follows:

nc -lk 9999

If you are using Ncat instead, the corresponding command is:

ncat -lk 9999

You can run and test the Spark Streaming program above locally in IntelliJ IDEA or Eclipse. The program connects to the port that Netcat (or Ncat) is listening on; type some text into the Netcat (or Ncat) terminal and press Enter, and you will see the program print its results immediately. The processing is continuous and stream-oriented.

Note: the example above counts each batch of the data stream independently; it does not keep an incremental (running) count.

Basic Concepts

StreamingContext

StreamingContext is the entry point of a Spark Streaming program, just as SparkContext is the entry point of a Spark program.

A StreamingContext maintains a SparkContext instance, which you can access via ssc.sparkContext. That SparkContext is either passed in when the StreamingContext is created, or created internally from the SparkConf you pass in, depending on which StreamingContext constructor you use; see the API documentation.

About DStreams

Spark Streaming abstracts stream data as a discretized stream (DStream). Internally, a DStream is represented as a continuous sequence of RDDs, where each RDD contains the data produced by the source during one fixed time interval, as shown in the figure below.

Operations applied to a DStream are translated into operations on the underlying RDDs. For example, in the streaming word-count example earlier, the flatMap operator in lines.flatMap(_.split(" ")) is applied to each RDD of the lines DStream to generate the RDDs of the words DStream, as shown in the figure below.

InputDStreams and Receivers

An InputDStream represents an input data stream. Except for file streams and queue RDD streams, every input stream is associated with a Receiver (concretely, it corresponds to a ReceiverInputDStream subclass, which starts a Receiver internally). The Receiver runs on a worker node; it receives the stream data from the corresponding source and stores it in memory for processing (subject to the storage level specified when the input DStream is created).

We can also create multiple InputDStreams to connect to multiple data sources; every ReceiverInputDStream among them starts its own Receiver to receive data. A Spark Streaming application must therefore allocate enough cores (threads, in local mode) to run the receiver(s) and still process the received data. When running a Spark Streaming program in local mode, the master URL must not be local or local[1] (Spark Streaming uses one thread to run the receiver, so with only one thread there is no thread left to process the data); it should be local[n], where n is greater than the number of receivers. The same reasoning applies when running on a cluster: allocate more cores than there are receivers.
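The "local[n] must exceed the receiver count" rule above can be sketched as a small sanity check. This is an illustrative helper, not part of Spark; the parsing of local master URLs is a simplification of what Spark does internally.

```scala
// Sketch (hypothetical helper): check that a local master URL leaves at least
// one thread free for batch processing after all receivers are accounted for.
object ReceiverCoreCheck {
  // Number of threads implied by a local master URL, if it is a local URL.
  def localThreads(master: String): Option[Int] = master match {
    case "local" => Some(1)
    case s if s.startsWith("local[") && s.endsWith("]") =>
      val n = s.stripPrefix("local[").stripSuffix("]")
      if (n == "*") Some(Runtime.getRuntime.availableProcessors) else Some(n.toInt)
    case _ => None // cluster master URLs are not handled by this sketch
  }

  // True when at least one thread is left over to process the batches.
  def enoughThreads(master: String, numReceivers: Int): Boolean =
    localThreads(master).forall(_ > numReceivers)
}
```

For example, with one socket receiver, `enoughThreads("local[1]", 1)` is false while `enoughThreads("local[2]", 1)` is true, matching the rule stated above.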

Basic Data Sources

Spark StreamingÌṩÁ˴ӺܶàÊý¾ÝÔ´»ñÈ¡Á÷Êý¾ÝµÄ·½·¨£¬Ò»Ð©»ù±¾µÄÊý¾ÝÔ´¿ÉÒÔͨ¹ýStreamingContext APIÖ±½ÓʹÓã¬Ö÷Òª°üÀ¨£ºÎļþϵͳ¡¢ÍøÂçÁ¬½Ó¡¢Akka actorsµÈ¡£

File Streams

StreamingContextÌṩÁË´Ó¼æÈÝÓÚHDFS APIµÄËùÓÐÎļþϵͳÖд´½¨ÎļþÊý¾ÝÊäÈëÁ÷µÄ·½·¨£¬ÈçÏ£º

ssc.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

File streams have no receiver. Spark Streaming monitors the given directory (nested directories are not supported) and processes any files created in it (files whose names start with "." are ignored). All files in the monitored directory must share the same data format. If a file in the monitored directory is modified (for example, appended to), those modifications will not be read; the correct approach is to write the files completely in some other directory and then move or rename them into the monitored directory.
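The "write elsewhere, then move" pattern described above can be sketched with plain java.nio, independent of Spark. The directory names here are hypothetical; the point is that the move (a rename on the same file system) is a single step, so the file stream never observes a half-written file.

```scala
// Sketch: stage a file outside the monitored directory, then move it in
// atomically. Directory and file names are illustrative only.
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.nio.charset.StandardCharsets

object AtomicDrop {
  def dropFile(stagingDir: String, watchDir: String, name: String, data: String): Unit = {
    val staged = Paths.get(stagingDir, name)
    // 1. Write the complete file outside the monitored directory.
    Files.write(staged, data.getBytes(StandardCharsets.UTF_8))
    // 2. Move (rename) it into the monitored directory in one step.
    Files.move(staged, Paths.get(watchDir, name), StandardCopyOption.ATOMIC_MOVE)
  }
}
```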

For simple text files there is an even simpler method:

ssc.textFileStream(dataDirectory)


Network Streams

Socket streams can be created with ssc.socketStream() or ssc.socketTextStream(); see the API documentation for details.

Akka Actor Streams

ssc.actorStream() creates a ReceiverInputDStream that receives its data from an Akka actor. See the API documentation and the Custom Receiver Guide for more.

RDD Queue Streams

We can also use ssc.queueStream() to create an InputDStream backed by a queue of RDDs. Each RDD in the queue is treated as one batch of the DStream. This is usually very handy for testing a Spark Streaming program.

Advanced Sources

Advanced sources such as Kafka, Flume, Kinesis, and Twitter require adding external dependencies; see the Spark documentation for the dependency details.

Some references on integrating these advanced sources:

Kafka: Kafka Integration Guide

Flume: Flume Integration Guide

Kinesis: Kinesis Integration Guide

Custom Sources

You can also define your own data source: simply implement your own receiver that pulls data from the custom source and pushes it into Spark. For details, see the Custom Receiver Guide.

Receiver Reliability

ÒÀ¾Ý¿É¿¿ÐԿɽ«Receiver·ÖΪÁ½Àà¡£¿É¿¿Receiver´øÓд«ÊäÈ·ÈÏ»úÖÆ(ACK»úÖÆ)£¬¿ÉÒÔÈ·±£Êý¾ÝÔÚ´«Êä¹ý³ÌÖв»»á¶ªÊ§£¬KafkaºÍFlumeµÈÔÚACK»úÖÆ¿ªÆôµÄÇé¿öϾÍÊǿɿ¿µÄ¡£²»¿É¿¿Receiver²»´øÓд«ÊäÈ·ÈÏ»úÖÆ£¬°üÀ¨²»Ö§³ÖACK»úÖÆºÍÖ§³ÖACKµ«¹Ø±ÕµÄÇéÐΡ£

DStream Transformations

DStreams support many of the same transformations as RDDs (and, like RDD transformations, they are all lazy). The full list can be found in the API documentation for DStream and PairDStreamFunctions; commonly used ones include map, flatMap, filter, repartition, union, count, reduce, countByValue, reduceByKey, join, cogroup, transform, and updateStateByKey.

Some of these operations are described in more detail below.

updateStateByKey

The updateStateByKey operation lets you maintain arbitrary state and keep updating it with a continuous flow of new information. Using it normally takes two steps:

1. Define the state: the state can be of any data type.

2. Define the state update function: how to compute the new state from the previous state and the new values arriving in the stream.

ÔÚÿһÅúÊý¾ÝÖУ¬Spark¶¼½«ÔÚËùÓÐÒѾ­´æÔÚµÄkeyÉÏÓ¦ÓÃ״̬¸üк¯Êý£¬¼´Ê¹ÔÚÕâÅúÊý¾ÝÖÐûÓÐijЩkey¶ÔÓ¦µÄÊý¾Ý¡£Èç¹û״̬¸üк¯Êý·µ»ØNone£¬ÄÇô¶ÔÓ¦µÄkey-value¶Ô½«±»ÒƳý¡£

ÔÚ¿ìËÙʾÀýÖУ¬ÎÒÃÇÖ»ÊǶÔÿһÅúÊý¾Ý½øÐе¥¶ÀµÄµ¥´Ê¼ÆÊý£¬ÔÚÕâÀïÎÒÃǾͿÉÒÔͨ¹ýupdateStateByKeyËã×Ó½øÐÐÔöÁ¿¼ÆÊýÁË¡£

Note that using updateStateByKey requires that a checkpoint directory has been configured; see the checkpointing section.
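The two steps above can be sketched for incremental word counting. The update function itself is ordinary Scala, so it is shown standalone and testable; the commented-out line indicates how it would plausibly be wired into the pairs DStream from the quick example.

```scala
// Sketch: state = the running count per word (an Int); the update function
// adds this batch's occurrences to the previous count, starting from 0 for
// keys that have never been seen.
object RunningCount {
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(newValues.sum + runningCount.getOrElse(0))

  // Wiring into a DStream of (word, 1) pairs (requires a checkpoint directory):
  // val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
}
```

Returning None instead of Some(...) from the update function is how a key would be dropped from the state, per the semantics described above.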

transform

The transform operation lets you apply an arbitrary RDD-to-RDD function on a DStream, so you can conveniently use operators that exist in the RDD API but not in the DStream API to transform every RDD of the DStream. For example, the DStream API has no operation that joins each batch of the stream (one RDD) with some other dataset; this can be achieved by combining the transform operator with the RDD join operator.

Note that the function passed to transform is applied once per batch (one RDD). This means you can vary the processing over time: the RDD operations, the number of partitions, broadcast variables, and so on can all change between batches.
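Since the function handed to transform is an ordinary RDD-to-RDD function, its core logic can be written and tested over plain Scala collections first. The sketch below mirrors the join-and-filter use case described above with a hypothetical blacklist as the reference dataset; names are illustrative only.

```scala
// Sketch: per-batch cleaning logic. Each batch of (word, count) pairs is
// checked against a static reference set and matching entries are dropped.
object BatchCleaner {
  val blacklist = Set("spam", "ads") // hypothetical reference dataset

  def clean(batch: Seq[(String, Int)]): Seq[(String, Int)] =
    batch.filterNot { case (word, _) => blacklist.contains(word) }

  // With Spark, the same per-batch logic would plausibly be applied as:
  // val cleaned = wordCounts.transform { rdd => rdd.subtractByKey(blacklistRDD) }
}
```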

window

Spark StreamingÒ²ÌṩÁË»ùÓÚ´°¿ÚµÄ¼ÆË㣬ËüÔÊÐíÄãÔÚÒ»¸ö»¬¶¯´°¿ÚÉÏʹÓÃת»»²Ù×÷£¬»¬¶¯´°¿ÚÈçÏÂͼËùʾ¡£

Windows slide over time; each RDD of the DStream produced by a window operation contains all the data from one sliding window. Every window operation requires two parameters:

Window length: it must be an integer multiple of the source DStream's batch interval.

Slide interval: it must be an integer multiple of the source DStream's batch interval.

һЩ³£ÓõĴ°¿Ú²Ù×÷Ëã×ÓÈçÏ£º

It is worth emphasizing that some of these operations (such as reduceByWindow and reduceByKeyAndWindow) have a special form in which Spark computes the reduced result incrementally, by considering only the data entering the window and the data leaving it. This special form requires an additional argument: the inverse of the reduce function, e.g. - as the inverse of +. For large windows, supplying the inverse function can greatly improve efficiency.
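The incremental form described above reduces to simple arithmetic: rather than re-reducing the whole window, the previous window's result is updated with the reduce function (+) on the entering data and its inverse (-) on the leaving data. A minimal Spark-free sketch:

```scala
// Sketch: incremental windowed sum. The new window's sum is derived from the
// old one instead of being recomputed from scratch.
object IncrementalWindow {
  def nextWindowSum(prevWindowSum: Int, entering: Seq[Int], leaving: Seq[Int]): Int =
    prevWindowSum + entering.sum - leaving.sum

  // In Spark this corresponds to the reduceByKeyAndWindow variant that takes
  // both a reduce function and an inverse reduce function, e.g.:
  // pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
}
```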

join

Stream-stream joins:

val stream1: DStream[(String, String)] = ...
val stream2: DStream[(String, String)] = ...
val joinedStream = stream1.join(stream2)

The code above joins two DStreams: in every batch interval, the RDD produced by stream1 is joined with the RDD produced by stream2. There are also other join operations: leftOuterJoin, rightOuterJoin, and fullOuterJoin. Window-based joins are possible as well:

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins:

This kind of join can be implemented with transform(). The following code joins a windowed stream with a dataset:

val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

DStream Output Operations

Output operations push the data of a DStream to external systems such as databases and file systems. Similar to RDD actions, DStream output operations are what trigger the execution of all the transformations. The main output operations are print(), saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, and foreachRDD.

Notes on using foreachRDD

Writing data to an external system usually requires creating a network connection. Without meaning to, you may well create the connection object on the driver node and then try to use it on the executor nodes, like this:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // executed on the driver node
  rdd.foreach { record =>
    connection.send(record) // executed on an executor node
  }
}

This code is wrong: the connection object would have to be serialized and shipped to the worker nodes, and connections usually cannot be transferred between machines. The mistake may surface as a serialization error (the connection object is not serializable), an initialization error (the connection object must be initialized on the worker node), and so on.

The fix is to create the connection object on the worker node. That, however, can lead to another common mistake: creating one connection per record, like this:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

Creating a connection per record consumes a great deal of system resources and drastically lowers throughput. A better approach is to use rdd.foreachPartition() and create one connection per RDD partition:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

Finally, this can be optimized further by reusing connection objects across batches (RDDs). We can keep the connections in a static, lazily initialized connection pool so they are shared across batches:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // the pool must be static and lazily initialized
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return the connection for future reuse
  }
}

Note that the connections in the pool above should be created on demand (lazily), and connections that have gone unused for a long time are best closed (a timeout mechanism).
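The ConnectionPool object referenced above is left unspecified in the text; one possible minimal implementation is sketched below, with a dummy Connection type standing in for a real database or socket client. A production pool would also bound its size and close idle connections after a timeout, as the note above suggests.

```scala
// Sketch (hypothetical implementation): a static, lazily filled pool held in a
// singleton object, so each executor JVM shares one pool.
class Connection { def send(record: String): Unit = () } // dummy stand-in

object ConnectionPool {
  private val pool = new scala.collection.mutable.Queue[Connection]()

  // Connections are created only when none are available (lazy, on demand).
  def getConnection(): Connection = synchronized {
    if (pool.isEmpty) new Connection() else pool.dequeue()
  }

  def returnConnection(c: Connection): Unit = synchronized { pool.enqueue(c) }

  def size: Int = synchronized { pool.size }
}
```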

DStream transformations are lazy; the output operations trigger the actual computation. More precisely, it is the RDD actions inside the DStream output operations that force the received data to be processed. Therefore, if a program has no output operations, or has an output operation such as foreachRDD() that contains no RDD actions, no computation will be performed: the program will simply receive the data and discard it.

ȱʡÇé¿öÏ£¬Êä³ö²Ù×÷Ò»´ÎÖ´ÐÐÒ»¸ö£¬²¢ÇÒÊǰ´ÕÕÓ¦ÓóÌÐò¶¨ÒåµÄǰºó˳ÐòÖ´Ðеġ£

Accumulators and Broadcast Variables

ÔÚSpark StreamingÖУ¬ÀÛ¼ÓÆ÷(Accumulator)ºÍ¹ã²¥±äÁ¿(Broadcast)²»ÄÜ´Ó¼ì²éµã(checkpoint)Öлָ´¡£Èç¹ûÄã²ÉÓüì²éµã»úÖÆ(¼ì²éµã½«ÇжÏRDDÒÀÀµ)²¢ÇÒÒ²ÓÃÁËÀÛ¼ÓÆ÷»ò¹ã²¥±äÁ¿£¬ÎªÁËÔÚÍ»·¢Òì³£²¢ÖØÆôdriver½ÚµãÖ®ºóÀÛ¼ÓÆ÷ºÍ¹ã²¥±äÁ¿¿ÉÒÔ±»ÖØÐÂʵÀý»¯£¬ÄãÓ¦¸ÃΪËüÃÇ´´½¨lazyʵÀý»¯µÄµ¥Àý¶ÔÏó¡£Ê¾ÀýÈçÏ£º

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // get or register the blacklist broadcast variable
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // get or register the droppedWordsCounter accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // remove words using the blacklist, counting them with droppedWordsCounter
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts
})

DataFrame and SQL Operations

You can also use DataFrames and SQL in stream processing. In that case you must create the SQLContext from the very SparkContext that the StreamingContext is using. So that the program can continue running after a driver failure and restart, we should create a lazily instantiated singleton SQLContext. For example:

/** DataFrame operations inside a Spark Streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // get the singleton instance of SQLContext
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val wordsDataFrame = rdd.toDF("word")
  wordsDataFrame.registerTempTable("words")
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

MLlib Operations

You can also apply machine-learning algorithms provided by MLlib to stream data. First, note that there are dedicated streaming algorithms (for example, Streaming Linear Regression and Streaming KMeans) that can both learn a model from the stream and apply the learned model to the stream. Beyond those, for the large majority of machine-learning algorithms, you can learn a model offline from historical data and then apply that model to the live data stream.

Persistence (Caching)

Like RDDs, DStreams allow the stream data to be persisted: simply calling persist() on a DStream automatically caches every RDD it represents. If the data of a DStream will be used multiple times, caching the DStream is very beneficial.

For window-based operations (such as reduceByWindow and reduceByKeyAndWindow) and state-based operations (such as updateStateByKey), the DStream is persisted implicitly, so there is no need to call persist() yourself.

For data received over the network (TCP sockets, Kafka, Flume, and so on), the default persistence level replicates the data to two nodes for fault tolerance.

Note: unlike RDDs, the default persistence level of a DStream keeps the data serialized in memory.

Checkpointing

Streaming programs generally run 24/7 and must therefore be able to recover from failures. To make recovery possible, Spark Streaming checkpoints enough information to a fault-tolerant storage system. Two kinds of data are checkpointed.

1. Metadata checkpointing: saving the information that defines the streaming computation to fault-tolerant storage such as HDFS. This is used to recover the Spark Streaming driver node from failure. The metadata includes: the streaming program's configuration, the set of operations applied to the DStreams, and the batches that have not yet finished processing.

2. Data checkpointing: saving the generated RDDs to reliable storage such as HDFS. This is mandatory for stateful transformations (such as updateStateByKey and reduceByKeyAndWindow), which combine data across multiple batches: each newly generated RDD depends on the RDDs of several preceding batches, so the dependency chain keeps growing over time. To avoid unbounded growth, the intermediate results (RDDs) of stateful transformations are periodically checkpointed to reliable storage, which cuts the dependency chain.

When to enable checkpointing:

1. Stateful transformations are used: in this case a checkpoint directory must be configured to allow periodic RDD checkpointing.

2. Driver recovery from failure: if you want the driver node to recover from failures, you must configure a checkpoint directory. If you do not care about recovery, the checkpoint directory is not required; in that case, as long as the program uses no stateful transformations, no checkpoint directory needs to be configured at all.

How to configure checkpointing:

Use ssc.checkpoint() to set the checkpoint directory; this is enough to let you use stateful transformations in your program. If you additionally want the program to recover from a driver-node failure, you should rewrite it to support the following behavior:

1. On first start-up, create a StreamingContext, set everything up, and call start().

2. On recovery from a failure, rebuild the StreamingContext from the checkpoint data.

Both behaviors can be handled with StreamingContext.getOrCreate(), as follows:

// Create and set up a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)
  val lines = ssc.socketTextStream(...)
  ...
  ssc.checkpoint(checkpointDirectory)
  ssc
}

// If a checkpoint exists, rebuild the StreamingContext from it;
// otherwise, create a new StreamingContext with the provided function.
val context =
  StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do any additional setup here. This setup applies regardless of whether
// this is a first start or a recovery from failure.
context. ...

// Start running
context.start()
context.awaitTermination()

Note that the checkpoint interval deserves careful thought: an interval that is too small or too large can both cause problems. As a rule of thumb, the checkpoint interval is best set to 5-10 times the DStream's batch interval. dstream.checkpoint() sets the checkpoint interval, and for DStreams that are not checkpointed by default (those not produced by stateful transformations), calling it also enables periodic checkpointing of that DStream's RDDs.
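The 5-10x rule of thumb above can be expressed as a tiny helper. This is purely an illustration of the guideline, not a Spark API; durations are kept in milliseconds so the sketch needs no Spark classes.

```scala
// Sketch: pick a checkpoint interval as a multiple of the batch interval,
// clamped to the 5-10x range suggested in the text.
object CheckpointInterval {
  def suggest(batchIntervalMs: Long, desiredMultiple: Int): Long = {
    val clamped = desiredMultiple.max(5).min(10)
    batchIntervalMs * clamped
  }
  // e.g. with a 1 s batch interval: dstream.checkpoint(Seconds(suggest(1000, 10) / 1000))
}
```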

Monitoring the Application

³ýÁËSparkÌṩµÄһЩ¼à¿ØÄÜÁ¦Í⣬Spark Streaming»¹ÌṩÁËһЩ¶îÍâµÄ¼à¿ØÄÜÁ¦¡£µ±Ò»¸öSpark Streaming³ÌÐòÔËÐÐʱ£¬SparkÓ¦ÓóÌÐò¼à¿ØÒ³Ãæ(ͨ³£ÊÇ http://master:4040 )½«¶à³öÒ»¸öStreamingÑ¡Ï£¬ÆäÖÐչʾÁËreceiverºÍÒÑÍê³ÉµÄbatchµÄͳ¼ÆÐÅÏ¢¡£ÆäÖÐÓÐÁ½¸öÐÅÏ¢·Ç³£ÓÐÓãº

1. Processing Time: the time taken to process one batch of data.

2. Scheduling Delay: the time one batch of data waits in the queue.

If the time to process a batch is consistently higher than the batch interval, or the waiting time keeps increasing, it usually means your system cannot process data as fast as it is being produced. In that case, consider reducing the batch processing time; see the performance tuning section.

You can also use the StreamingListener interface to monitor the progress of a Spark Streaming program, including receiver status, processing times, and so on.

Performance Tuning

Reducing Batch Processing Time

Êý¾Ý½ÓÊյIJ¢ÐжÈ

Data received over the network (Kafka, Flume, sockets, and so on) must be deserialized and then stored in Spark. If data receiving becomes the bottleneck, consider increasing its parallelism. Note that every input DStream creates exactly one receiver (running on a worker node) to receive one data stream. You can create multiple input DStreams and configure them to receive different parts of the source, thereby increasing receiving parallelism.

For example, if a single Kafka input DStream receiving two topics is the bottleneck, the Kafka input can be split in two, creating two input DStreams that each receive one topic's stream. Receiving then proceeds in parallel, raising overall throughput. The two DStreams can be unioned into a single DStream, and subsequent transformations are applied to the unified stream. For example:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

Parallelism of data processing

Another thing to consider is the receiver's block interval, configured via spark.streaming.blockInterval. A receiver coalesces the data it receives into blocks before storing them in Spark's memory, and the number of blocks in each batch determines the number of tasks (roughly: batch interval / block interval). Too few tasks means poor cluster utilization; if you see this, try reducing the block interval. The recommended minimum block interval is about 50 ms; going lower tends to cause problems of its own.
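The arithmetic above is easy to make concrete. The helper below is an illustration of the batch-interval/block-interval relationship described in the text, not a Spark API:

```scala
// Sketch: approximate tasks per batch, and the smallest block interval
// (respecting the ~50 ms floor) that still yields a desired task count.
object BlockIntervalMath {
  val minBlockIntervalMs = 50L // recommended lower bound from the text

  def tasksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long =
    batchIntervalMs / blockIntervalMs

  def blockIntervalFor(batchIntervalMs: Long, minTasks: Int): Long =
    (batchIntervalMs / minTasks).max(minBlockIntervalMs)
}
```

For instance, with a 2 s batch interval and the default 200 ms block interval, each batch yields about 10 tasks; targeting 16 tasks would call for a 125 ms block interval.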

Another way to increase parallelism is to explicitly repartition the input DStream with inputStream.repartition() before processing, so that every RDD of the resulting DStream has the specified number of partitions.

For some operations, such as reduceByKey() and reduceByKeyAndWindow(), you can also pass a partition-count argument to control the parallelism of the computation.

Data Serialization Tuning

ÔÚSpark Streaming³ÌÐòÖУ¬ÊäÈëÊý¾ÝºÍ³Ö¾Ã»¯µÄÊý¾ÝĬÈ϶¼¾­¹ýÐòÁл¯´¦Àí²¢»º´æÔÚÄÚ´æÖС£

Tuning serialization can noticeably improve a program's efficiency; see the serialization section of my earlier Spark usage summary article.

ÔÚÒ»Ð©ÌØÊâÇé¿öÏ£¬ÐèÒª±£´æÔÚÄÚ´æÖеÄÁ÷Êý¾Ý¿ÉÄܲ»ÊǺܴó£¬Õâʱ¿ÉÒÔÉèÖô洢¼¶±ðÒÔ·ÇÐòÁл¯µÄÐÎʽ´æ´¢ÔÚÄÚ´æÖУ¬Èç¹ûÕâ²»»áÒýÆð¹ý´óµÄGC¿ªÏú£¬ÄÇô½«Ìá¸ß³ÌÐòµÄÐÔÄÜ¡£

Setting the Right Batch Interval

The choice of batch interval is critical for a stream-processing program: it determines whether the input stream can be processed quickly, smoothly, and sustainably.

An appropriate batch interval usually depends on the data production rate and the cluster's computing capacity. Generally, to find out whether a streaming program is keeping up with the incoming data rate, check the Spark application monitoring page ( http://master:4040 ). For a stable streaming program, the Processing Time should be less than the configured Batch Interval, and the batch Scheduling Delay should be relatively stable (a continuous increase means the program is falling behind the data rate; a momentary spike does not mean much).

It has been found that, for most applications, 500 ms is a good minimum batch interval.
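The stability rule of thumb above can be written down as a predicate over the metrics read from the Streaming tab. This is an illustrative sketch of the heuristic in the text, with one simplifying assumption: "not trending upward" is approximated by comparing the last delay to the first, which tolerates a momentary spike in the middle.

```scala
// Sketch: a batch pipeline is considered stable when every processing time
// stays below the batch interval and scheduling delay is not trending upward.
object StabilityCheck {
  def isStable(batchIntervalMs: Long,
               processingTimesMs: Seq[Long],
               schedulingDelaysMs: Seq[Long]): Boolean = {
    val processingOk = processingTimesMs.forall(_ < batchIntervalMs)
    // Simplified trend test: last delay no larger than the first.
    val delayOk = schedulingDelaysMs.isEmpty ||
      schedulingDelaysMs.last <= schedulingDelaysMs.head
    processingOk && delayOk
  }
}
```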

Memory Tuning

Memory tuning also matters greatly for a Spark Streaming program; it mainly covers optimizing memory usage and optimizing GC behavior.

   