Reading guide:
1. What is Spark Streaming?
2. Which data sources can Spark Streaming accept?
3. What are the two kinds of operations we can perform on a DStream?
Before diving into Spark Streaming, we first need to know what Spark Streaming is.
Spark Streaming is a framework built on top of Spark for processing stream data. The basic idea is to split the stream into small time slices (a few seconds) and process each small chunk of data in a batch-like manner. Spark Streaming is built on Spark for two reasons: on one hand, Spark's low-latency execution engine (latencies of roughly 100 ms and up) can be used for near-real-time computation; on the other hand, compared with record-at-a-time frameworks such as Storm, RDD datasets make efficient fault tolerance much easier. In addition, the micro-batch approach lets the same logic and algorithms serve both batch and real-time processing, which is convenient for applications that need to analyze historical and real-time data together.
Overview
Spark Streaming is part of the core Spark API. It supports high-throughput, fault-tolerant processing of live data streams.
It can ingest data from sources such as Kafka, Flume, Twitter, ZeroMQ, and TCP sockets, process it with simple API operations such as map, reduce, join, and window, and even feed the data directly into the built-in machine learning and graph processing libraries.

Its workflow is shown in the figure below: after receiving live data, it divides the data into batches, hands each batch to the Spark engine for processing, and finally produces a result for that batch.

The data abstraction it works with is called a DStream, with built-in support for sources such as Kafka and Flume. A DStream is a continuous sequence of RDDs. Below is an example to help you understand DStreams.
A Quick Example
// Create a StreamingContext with a 1-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a DStream that listens on serverIP:serverPort
val lines = ssc.socketTextStream(serverIP, serverPort)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count the words
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the results
wordCounts.print()
ssc.start()            // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
Èç¹ûÒѾװºÃSparkµÄÅóÓÑ£¬ÎÒÃÇ¿ÉÒÔͨ¹ýÏÂÃæµÄÀý×ÓÊÔÊÔ¡£
Ê×ÏÈ£¬Æô¶¯Netcat£¬Õâ¸ö¹¤¾ßÔÚUnix-likeµÄϵͳ¶¼´æÔÚ£¬ÊǸö¼òÒ×µÄÊý¾Ý·þÎñÆ÷¡£
ʹÓÃÏÂÃæÕâ¾äÃüÁîÀ´Æô¶¯Netcat£º
½Ó×ÅÆô¶¯example
$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
Type hello world on the Netcat side and watch what happens on the Spark side:
# TERMINAL 1: Running Netcat
$ nc -lk 9999
hello world
...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
Basics
Now for the fun part: how to actually write the code!
First, add the following to your SBT or Maven project:
groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating
//ÐèҪʹÓÃÒ»ÏÂÊý¾ÝÔ´µÄ£¬»¹ÒªÌí¼ÓÏàÓ¦µÄÒÀÀµ Source Artifact Kafka spark-streaming-kafka_2.10 Flume spark-streaming-flume_2.10 Twitter spark-streaming-twitter_2.10 ZeroMQ spark-streaming-zeromq_2.10 MQTT spark-streaming-mqtt_2.10 |
Next, instantiate the StreamingContext:
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
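As a minimal sketch, a local context with a 1-second batch interval might look like this (the master URL and the application name are placeholder values):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]" and "NetworkWordCount" are placeholders; sparkHome and jars are optional
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))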
ÕâÊÇ֮ǰµÄÀý×Ó¶ÔDStreamµÄ²Ù×÷¡£

Input Sources
Besides sockets, we can also create a DStream from files:
streamingContext.fileStream(dataDirectory)
There are three requirements here:
(1) All files under dataDirectory must have the same format.
(2) Files must appear in this directory by being moved or renamed into it.
(3) Once a file has been moved in, it must not be changed afterwards.
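As a small sketch of a file-based source that follows these rules (the HDFS path is a placeholder; textFileStream is the plain-text variant of fileStream):

// Watch a directory for new text files; each new file becomes part of the stream
// (the path below is a placeholder)
val logLines = ssc.textFileStream("hdfs://namenode:8020/data/incoming")
val errorLines = logLines.filter(_.contains("ERROR"))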
Suppose instead we want to create a Kafka DStream:
import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, ...)
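A slightly more concrete sketch, assuming a ZooKeeper quorum at localhost:2181, a consumer group, and a topic named "events" (all of these are placeholder values):

import org.apache.spark.streaming.kafka._

// createStream(ssc, zkQuorum, groupId, topics): topics maps each topic name
// to the number of receiver threads to use for it
val kafkaStream = KafkaUtils.createStream(
  ssc,
  "localhost:2181",    // ZooKeeper quorum (placeholder)
  "wordcount-group",   // consumer group id (placeholder)
  Map("events" -> 1))  // topic -> number of threads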
If we need a custom receiver for a stream, see https://spark.incubator.apache.o ... stom-receivers.html
Operations
On a DStream we can perform two kinds of operations: transformations and output operations.
Transformations
Transformation                                  Meaning
map(func)                                       Apply func to every element
flatMap(func)                                   Like map, but each input item can map to 0 or more outputs
filter(func)                                    Keep only the elements for which func returns true
repartition(numPartitions)                      Change the number of partitions to increase parallelism
union(otherStream)                              Merge two streams
count()                                         Count the number of elements
reduce(func)                                    Aggregate the elements of each RDD with func (two inputs, one output)
countByValue()                                  For a DStream of elements of type K, return a new DStream of (K, Long) pairs, where Long is the frequency of each K
reduceByKey(func, [numTasks])                   For a (K, V) DStream, aggregate the values of each key with func; the default parallelism is 2 tasks in local mode and 8 in cluster mode, and numTasks can override it
join(otherStream, [numTasks])                   Join a (K, V) DStream with a (K, W) DStream into a (K, (V, W)) DStream
cogroup(otherStream, [numTasks])                Combine a (K, V) DStream with a (K, W) DStream into a (K, Seq[V], Seq[W]) DStream
transform(func)                                 Apply an arbitrary RDD-to-RDD function func to each RDD in the stream
updateStateByKey(func)                          Update the state and value of each key with func; the state can be of any type
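As a small sketch of a couple of these operations, reusing the lines and words DStreams from the quick example above (otherLines is a hypothetical second stream):

// Drop empty strings, then count how often each word appears in each batch
val nonEmptyWords = words.filter(_.nonEmpty)
val wordFrequencies = nonEmptyWords.countByValue() // DStream of (word, count) pairs

// Merge two streams of the same type into one
val allLines = lines.union(otherLines) // otherLines: a hypothetical second DStream[String]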
UpdateStateByKey Operation
This operation lets us maintain arbitrary state and keep updating it with new information. Using it takes two steps:
(1) Define the state, which can be of any data type.
(2) Define the state update function, which computes the new state from the previous state and the new values.
Here is an example:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  // add the new values to the previous running count to get the new count
  val newCount = runningCount.getOrElse(0) + newValues.sum
  Some(newCount)
}
It can be applied to a DStream of (word, 1) pairs, such as the pairs stream in the earlier example:
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
The update function is called for every word: newValues holds the latest values and runningCount holds the previous count.
Transform Operation
Like transformWith, transform lets us apply arbitrary RDD-to-RDD operations to a DStream. For example, suppose we want to join each RDD in a DStream with another dataset; the DStream API does not expose this directly, but we can do it with the transform method. Here is an example:
val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
val cleanedDStream = inputDStream.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join the data stream with the spam information to do data cleaning
  ...
})
ÁíÍ⣬ÎÒÃÇÒ²¿ÉÒÔÔÚÀïÃæÊ¹ÓûúÆ÷ѧϰËã·¨ºÍͼËã·¨¡£
Window Operations

Let's start with an example. Take the earlier word count: suppose we want to compute, every 10 seconds, the word counts over the last 30 seconds of data.
We can use the following statement:
// Reduce the last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
This uses the two window parameters:
(1) window length: the window is 30 seconds long, i.e. it covers the most recent 30 seconds of data.
(2) slide interval: the interval at which the windowed computation is performed, here every 10 seconds.
From this example we can roughly see what a window means: periodically computing over a sliding range of data.
Below are the window operation functions. I have not carried the Meaning column over here, so only the signatures are listed:
window(windowLength, slideInterval)
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
countByValueAndWindow(windowLength, slideInterval, [numTasks])
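As a sketch of the incremental variant (the form with invFunc), which "adds" the counts of the batch entering the window and "subtracts" the counts of the batch leaving it; this variant requires checkpointing to be enabled:

val incrementalWindowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // add new counts entering the window
  (a: Int, b: Int) => a - b, // subtract old counts leaving the window
  Seconds(30),               // window length
  Seconds(10))               // slide interval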
Output Operations
Output Operation                      Meaning
print()                               Print to the console
foreachRDD(func)                      Apply func to every RDD in the DStream, e.g. to save it to an external system
saveAsObjectFiles(prefix, [suffix])   Save the stream's contents as SequenceFiles; file names: "prefix-TIME_IN_MS[.suffix]"
saveAsTextFiles(prefix, [suffix])     Save the stream's contents as text files; file names: "prefix-TIME_IN_MS[.suffix]"
saveAsHadoopFiles(prefix, [suffix])   Save the stream's contents as Hadoop files; file names: "prefix-TIME_IN_MS[.suffix]"
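A sketch of foreachRDD, the general-purpose output operation; sendToExternalStore below is a hypothetical helper standing in for whatever sink you use:

wordCounts.foreachRDD { rdd =>
  // process each partition on the workers rather than collecting everything to the driver
  rdd.foreachPartition { records =>
    records.foreach(record => sendToExternalStore(record)) // hypothetical sink function
  }
}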
Persistence
DstreamÖеÄRDDÒ²¿ÉÒÔµ÷ÓÃpersist()·½·¨±£´æÔÚÄÚ´æµ±ÖУ¬µ«ÊÇ»ùÓÚwindowºÍstateµÄ²Ù×÷£¬reduceByWindow,reduceByKeyAndWindow,updateStateByKeyËüÃǾÍÊÇÒþʽµÄ±£´æÁË£¬ÏµÍ³ÒѾ°ïËü×Ô¶¯±£´æÁË¡£
Data received over the network (e.g. from Kafka, Flume, sockets, etc.) is by default replicated to two nodes for fault tolerance and is kept in memory in serialized form.
RDD Checkpointing
Stateful operations work across multiple batches of data. They include the window-based operations and updateStateByKey. Because stateful operations depend on previous batches, they accumulate metadata over time. To clear this data, Spark Streaming supports periodic checkpointing, which saves intermediate results to HDFS. Since checkpointing adds the cost of writing to HDFS, the checkpoint interval has to be chosen carefully: for small batches, say one second, checkpointing every batch would drastically reduce throughput, while a checkpoint interval that is too long makes the tasks grow large. A checkpoint interval of 5-10 seconds is usually a reasonable choice.
ssc.checkpoint(hdfsPath)               // set where checkpoints are stored
dstream.checkpoint(checkpointInterval) // set the checkpoint interval
For DStreams that must be checkpointed, such as those created by updateStateByKey and reduceByKeyAndWindow, the default checkpoint interval is at least 10 seconds.
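A sketch with concrete values (the HDFS path is a placeholder; the 10-second interval assumes a 1-second batch, i.e. checkpointing every 10 batches):

ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints") // placeholder checkpoint directory
runningCounts.checkpoint(Seconds(10))                    // checkpoint the stateful DStream every 10 seconds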
Performance Tuning
There are two aspects to consider when tuning:
(1) Make efficient use of cluster resources to reduce the processing time of each batch of data.
(2) Set the right batch size so that batches can be processed as fast as they are received.
Level of Parallelism
Distributed operations such as reduceByKey and reduceByKeyAndWindow use 8 parallel tasks by default; you can raise this by passing a higher value to the corresponding function, or by raising the default through the spark.default.parallelism configuration property.
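For example, either per operation or through the global default (16 is an illustrative value):

// Per operation: request 16 reduce tasks for this shuffle
val counts = pairs.reduceByKey(_ + _, 16)

// Or globally, set before the context is created
sparkConf.set("spark.default.parallelism", "16")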
Task Launching Overheads
Launching too many tasks per second is also a problem: at, say, 50 tasks per second, the overhead of shipping tasks to the workers becomes significant and sub-second latencies become hard to achieve. This overhead can be reduced by compressing the tasks to make them smaller.
Setting the Right Batch Size
For a streaming program to run stably on a cluster, it must process data as fast as the data is being received. A good way to find the right batch size is to start with a batch size of 5-10 seconds and a very low data input rate. Once the system is confirmed to keep up with that rate, the batch size can be tuned empirically by checking the Total delay value in the logs: if the delay stays below the batch size, the system is stable; if the delay keeps growing, the processing speed cannot keep up with the input rate.
24/7 Operation
SparkĬÈϲ»»áÍü¼ÇÔªÊý¾Ý£¬±ÈÈçÉú³ÉµÄRDD£¬´¦ÀíµÄstages£¬µ«ÊÇSpark
StreamingÊÇÒ»¸ö24/7µÄ³ÌÐò£¬ËüÐèÒªÖÜÆÚÐÔµÄÇåÀíÔªÊý¾Ý£¬Í¨¹ýspark.cleaner.ttlÀ´ÉèÖᣱÈÈçÎÒÉèÖÃËüΪ600£¬µ±³¬¹ý10·ÖÖÓµÄʱºò£¬Spark¾Í»áÇå³þËùÓÐÔªÊý¾Ý£¬È»ºó³Ö¾Ã»¯RDDs¡£µ«ÊÇÕâ¸öÊôÐÔÒªÔÚSparkContext
´´½¨Ö®Ç°ÉèÖá£
µ«ÊÇÕâ¸öÖµÊǺÍÈκεÄwindow²Ù×÷°ó¶¨¡£Spark»áÒªÇóÊäÈëÊý¾ÝÔÚ¹ýÆÚÖ®ºó±ØÐë³Ö¾Ã»¯µ½ÄÚ´æµ±ÖУ¬ËùÒÔ±ØÐëÉèÖÃdelayµÄÖµÖÁÉÙºÍ×î´óµÄwindow²Ù×÷Ò»Ö£¬Èç¹ûÉèÖÃСÁË£¬¾Í»á±¨´í¡£
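A sketch of setting the TTL before the context is created (the 600-second value matches the example above and must be at least as large as your largest window; the app name is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MyStreamingApp")    // placeholder name
  .set("spark.cleaner.ttl", "600") // forget metadata older than 600 seconds
val ssc = new StreamingContext(conf, Seconds(1))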
Monitoring
Besides Spark's built-in monitoring, we can use the StreamingListener interface to obtain batch processing times, queueing delays, and the total end-to-end delay.
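A sketch of hooking in such a listener; the callback and class names below follow later Spark versions and should be treated as assumptions, not the exact API of this release:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val listener = new StreamingListener {
  // called after each batch finishes; batchInfo carries the timing information
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    println("batch completed: " + batchCompleted.batchInfo)
  }
}
ssc.addStreamingListener(listener)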
Memory Tuning
The default persistence level in Spark Streaming is StorageLevel.MEMORY_ONLY_SER (serialized), rather than the RDD default of StorageLevel.MEMORY_ONLY.
By default, all persisted RDDs are evicted from memory by Spark's LRU policy; if spark.cleaner.ttl is set they are also cleaned up periodically, but that parameter must be set with care. A better approach is to set spark.streaming.unpersist to true, which lets Spark Streaming figure out which RDDs no longer need to stay persisted, and that in turn improves GC behavior.
Using the concurrent mark-and-sweep GC is recommended; although it lowers overall throughput, it helps keep batch processing times more consistent.
Fault-tolerance Properties
Failure of a Worker Node
There are two cases, depending on the data source:
1. Files on HDFS: because HDFS is a reliable file system, no data is lost.
2. Data received over the network, for example from Kafka or Flume: to guard against failures, received data is replicated to 2 nodes by default, but it is still possible for the node that received the data to fail before the data has been replicated to the second node, in which case that data may be lost.
Failure of the Driver Node
To support uninterrupted 24/7 processing, Spark can recover the computation after the driver node fails. Spark Streaming periodically writes metadata to HDFS, into the checkpoint directory mentioned earlier, and after a driver failure the StreamingContext can be recovered from it.
For a Spark Streaming program to be recoverable, it needs to do the following:
(1) When started for the first time, create a StreamingContext, set up all the streams, and then call start().
(2) When restarted after a failure, recreate the StreamingContext from the checkpoint data.
Here is a concrete example:
Constructing the StreamingContext with StreamingContext.getOrCreate achieves both of the above.
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()
In standalone deploy mode, the driver can also be recovered automatically after a failure, with another node taking over as the driver. You can test this locally: submit the application in supervise mode, then use jps to look at the processes, find the one named something like DriverWrapper and kill it. If you are running in YARN mode, the driver has to be restarted by other means.
While we're at it, here is how to submit an application through the standalone Client, which I left out of my earlier write-up.
./bin/spark-class org.apache.spark.deploy.Client launch [client-options] \
   <cluster-url> <application-jar-url> <main-class> \
   [application-options]
cluster-url: the address of the master.
application-jar-url: the location of the application jar; ideally put it on HDFS and use an hdfs:// URL, otherwise the jar must exist at the same path on every node.
main-class: the class containing the main function of the application to launch.
Client Options:
--memory <count> (memory for the driver, in MB)
--cores <count> (number of cores to allocate to the driver)
--supervise (whether to restart the application if the node fails)
--verbose (print verbose log output)
In a future release, recoverability will be supported for all data sources.
To better understand driver-failure recovery with an HDFS-based input source, here is a simple example:
Time   Number of lines in input file   Output without driver failure   Output with driver failure
1      10                              10                              10
2      20                              20                              20
3      30                              30                              30
4      40                              40                              [DRIVER FAILS] no output
5      50                              50                              no output
6      60                              60                              no output
7      70                              70                              [DRIVER RECOVERS] 40, 50, 60, 70
8      80                              80                              80
9      90                              90                              90
10     100                             100                             100
The failure occurs at time 4: the outputs for 40, 50, and 60 never appear. At time 7 the driver recovers, and once recovered it emits all of the missed output (40, 50, 60, 70) at once.