Spark Streaming Programming Explained
 
Author: howtodown    Source: AboutYun    Published: 2015-7-30
 

Guiding questions:

1. What is Spark Streaming?

2. What data sources can Spark Streaming accept?

3. What two kinds of operations can we perform on a DStream?

Before looking at Spark Streaming, we first need to know: what is Spark Streaming?

Spark Streaming is a framework built on Spark for processing stream data. The basic idea is to split the stream into small time slices (a few seconds) and process each slice in a batch-like fashion. Spark Streaming is built on Spark for two reasons: Spark's low-latency execution engine (latencies of 100 ms and up) is suitable for real-time computation, and compared with record-at-a-time frameworks such as Storm, RDD datasets make efficient fault tolerance easier to achieve. The micro-batch approach also lets the same logic and algorithms serve both batch and real-time processing, which is convenient for applications that need to analyze historical and live data together.

Overview

Spark Streaming is part of Spark's core API. It supports high-throughput, fault-tolerant processing of live data streams.

It can ingest data from Kafka, Flume, Twitter, ZeroMQ, and TCP sockets; process it with simple API operations such as map, reduce, join, and window; and even apply Spark's built-in machine learning and graph processing libraries to the data directly.

Its workflow is straightforward: incoming live data is divided into batches, which are handed to the Spark engine for processing, producing the results for each batch.

The stream abstraction it supports is called a DStream, with direct support for Kafka and Flume sources. A DStream is a continuous sequence of RDDs. The following example should help make DStreams concrete.

A Quick Example

// Create a StreamingContext with a 1-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a DStream that listens on the given address:port
val lines = ssc.socketTextStream(serverIP, serverPort)

// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the results
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for it to terminate

Èç¹ûÒѾ­×°ºÃSparkµÄÅóÓÑ£¬ÎÒÃÇ¿ÉÒÔͨ¹ýÏÂÃæµÄÀý×ÓÊÔÊÔ¡£

Ê×ÏÈ£¬Æô¶¯Netcat£¬Õâ¸ö¹¤¾ßÔÚUnix-likeµÄϵͳ¶¼´æÔÚ£¬ÊǸö¼òÒ×µÄÊý¾Ý·þÎñÆ÷¡£

ʹÓÃÏÂÃæÕâ¾äÃüÁîÀ´Æô¶¯Netcat£º

$ nc -lk 9999

Then start the example:

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999

Type hello world on the Netcat side and watch Spark's output:

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world

...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

Basics

Now on to how to actually write the code!

First, add the following coordinates to your SBT or Maven project:

groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating

//ÐèҪʹÓÃÒ»ÏÂÊý¾ÝÔ´µÄ£¬»¹ÒªÌí¼ÓÏàÓ¦µÄÒÀÀµ
Source Artifact
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10
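
In SBT, for example, these coordinates might be declared as follows (a minimal sketch; pin the version to your Spark release):

// build.sbt sketch: core streaming plus, as an example, the Kafka connector
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "0.9.0-incubating",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "0.9.0-incubating"
)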

Next comes instantiating the StreamingContext:

new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
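
For instance, to run locally with two threads and the 1-second batch interval from the quick example (a sketch; the master and appName values are illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]" runs locally with 2 threads; the app name is arbitrary
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))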

The earlier example showed the operations that are then performed on the resulting DStream.

Input Sources

Besides sockets, we can also create a DStream from a file system:

streamingContext.fileStream(dataDirectory)

There are three requirements here (a usage sketch follows the list):

(1) All files under dataDirectory must have the same format.

(2) Files must be created in that directory by moving or renaming them into it.

(3) Once a file has been moved in, it must not be changed.
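
For plain text input there is the convenience variant textFileStream; a minimal sketch (the HDFS path is illustrative):

// Watch an HDFS directory for new text files and split them into words
val fileLines = ssc.textFileStream("hdfs://namenode:8020/data/incoming")
val fileWords = fileLines.flatMap(_.split(" "))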

Suppose instead we want to create a Kafka DStream:

import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, ...)

If you need a custom receiver for your stream, see https://spark.incubator.apache.o ... stom-receivers.html

Operations

On a DStream we can perform two kinds of operations: transformations and output operations.

Transformations

Transformation                      Meaning
map(func)                           Apply func to every element.
flatMap(func)                       Like map, but each input item can map to 0 or more outputs.
filter(func)                        Keep only the elements for which func returns true.
repartition(numPartitions)          Change the number of partitions to raise the level of parallelism.
union(otherStream)                  Merge two streams.
count()                             Count the number of elements in each RDD.
reduce(func)                        Aggregate the elements of each RDD with func (two input parameters, one output).
countByValue()                      For a DStream of elements of type K, returns a new DStream of (K, Long) pairs, where Long is the frequency of each K.
reduceByKey(func, [numTasks])       For a (K, V) DStream, aggregate the values of each key with func. Defaults to 2 tasks in local mode and 8 in cluster mode; numTasks can override this.
join(otherStream, [numTasks])       Join a (K, V) and a (K, W) DStream into a new (K, (V, W)) DStream.
cogroup(otherStream, [numTasks])    Combine a (K, V) and a (K, W) DStream into a new (K, Seq[V], Seq[W]) DStream.
transform(func)                     Transform each underlying RDD into a new RDD via func.
updateStateByKey(func)              Update the state of each key with func; the state can be of any type.

UpdateStateByKey Operation

This operation lets us maintain arbitrary state and keep updating it with new information. Using it takes two steps:

(1) Define the state, which can be of any data type.

(2) Define the state update function, which produces the new state from the previous state and the new values.

Here is an example:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  // Add the new values to the previous running count to get the new count
  val newCount = runningCount.getOrElse(0) + newValues.sum
  Some(newCount)
}

This can be applied to a DStream of (word, 1) pairs, such as the one in the earlier example:

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

The update function is called for every word in the stream: newValues holds the new values and runningCount the previous count.

Transform Operation

Like transformWith, transform lets you apply an arbitrary RDD-to-RDD function to a DStream. For instance, suppose we want to join each RDD in the stream against another dataset; the DStream API does not expose such an operation directly, but transform makes it possible. Here is an example:

val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information

val cleanedDStream = inputDStream.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

ÁíÍ⣬ÎÒÃÇÒ²¿ÉÒÔÔÚÀïÃæÊ¹ÓûúÆ÷ѧϰËã·¨ºÍͼËã·¨¡£

Window Operations

Let's start with an example. In the earlier word count, suppose we want to compute the word counts over the last 30 seconds of data, every 10 seconds.

We can do that with the following statement:

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

This introduces the two window parameters:

(1) window length: the duration of the window; here 30 seconds, i.e. the last 30 seconds of data.

(2) slide interval: the interval at which the window computation runs; here 10 seconds.

This example should give a rough idea of what a window is: a periodically evaluated computation over a sliding span of data.

Below are some of the window operation functions; their Meaning descriptions are omitted here.

Transformation
window(windowLength, slideInterval)
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
countByValueAndWindow(windowLength, slideInterval, [numTasks])
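
Even without the Meaning column, the basic window operation is easy to demonstrate on the lines stream from the quick example (a sketch):

// Every 10 seconds, count the lines received in the last 30 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
windowedLines.count().print()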

Output Operations

Output Operation                       Meaning
print()                                Print the results to the console.
foreachRDD(func)                       Apply func to every RDD in the DStream, e.g. to save it to an external system.
saveAsObjectFiles(prefix, [suffix])    Save the stream's contents as SequenceFiles; file names: "prefix-TIME_IN_MS[.suffix]".
saveAsTextFiles(prefix, [suffix])      Save the stream's contents as text files; file names: "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix])    Save the stream's contents as Hadoop files; file names: "prefix-TIME_IN_MS[.suffix]".
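
foreachRDD is the most general of these. A minimal sketch (sendToExternalSystem is a hypothetical sink function you would have to supply):

// Push each batch's word counts to an external system
wordCounts.foreachRDD { rdd =>
  rdd.foreach { case (word, count) =>
    sendToExternalSystem(word, count) // hypothetical function, not part of Spark
  }
}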

Persistence

The RDDs inside a DStream can also be kept in memory by calling persist(). For window- and state-based operations such as reduceByWindow, reduceByKeyAndWindow, and updateStateByKey, this persistence is implicit: the system does it automatically.

Data received over the network (such as from Kafka, Flume, or sockets) is by default replicated to two nodes for fault tolerance and kept in memory in serialized form.

RDD Checkpointing

״̬µÄ²Ù×÷ÊÇ»ùÓÚ¶à¸öÅú´ÎµÄÊý¾ÝµÄ¡£Ëü°üÀ¨»ùÓÚwindowµÄ²Ù×÷ºÍupdateStateByKey¡£ÒòΪ״̬µÄ²Ù×÷ÒªÒÀÀµÓÚÉÏÒ»¸öÅú´ÎµÄÊý¾Ý£¬ËùÒÔËüÒª¸ù¾Ýʱ¼ä£¬²»¶ÏÀÛ»ýÔªÊý¾Ý¡£ÎªÁËÇå¿ÕÊý¾Ý£¬ËüÖ§³ÖÖÜÆÚÐԵļì²éµã£¬Í¨¹ý°ÑÖмä½á¹û±£´æµ½hdfsÉÏ¡£ÒòΪ¼ì²é²Ù×÷»áµ¼Ö±£´æµ½hdfsÉϵĿªÏú£¬ËùÒÔÉèÖÃÕâ¸öʱ¼ä¼ä¸ô£¬ÒªºÜÉ÷ÖØ¡£¶ÔÓÚСÅú´ÎµÄÊý¾Ý£¬±ÈÈçÒ»ÃëµÄ£¬¼ì²é²Ù×÷»á´ó´ó½µµÍÍÌÍÂÁ¿¡£µ«ÊǼì²éµÄ¼ä¸ôÌ«³¤£¬»áµ¼ÖÂÈÎÎñ±ä´ó¡£Í¨³£À´Ëµ£¬5-10ÃëµÄ¼ì²é¼ä¸ôʱ¼äÊDZȽϺÏÊʵġ£

ssc.checkpoint(hdfsPath)                // set where checkpoints are saved
dstream.checkpoint(checkpointInterval)  // set the checkpoint interval

For DStreams that must be checkpointed, such as those created by updateStateByKey and reduceByKeyAndWindow, the default interval is at least 10 seconds.

Performance Tuning

Tuning can be approached from two angles:

(1) Use cluster resources well, so that each batch of data is processed in less time.

(2) Choose an appropriate size for each batch of data.

Level of Parallelism

Distributed operations such as reduceByKey and reduceByKeyAndWindow default to 8 parallel tasks; you can raise this either by passing a higher value to the operation itself or by increasing the spark.default.parallelism property.
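
Both options in a sketch (the task count of 16 is illustrative):

// Pass an explicit task count to the operation itself
val counts = pairs.reduceByKey(_ + _, 16)

// Or raise the default before creating the context
sparkConf.set("spark.default.parallelism", "16")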

Task Launching Overheads

ͨ¹ý½øÐеÄÈÎÎñÌ«¶àÒ²²»ºÃ£¬±ÈÈçÿÃë50¸ö£¬·¢ËÍÈÎÎñµÄ¸ºÔؾͻá±äµÃºÜÖØÒª£¬ºÜÄÑʵÏÖѹÃë¼¶µÄʱÑÓÁË£¬µ±È»¿ÉÒÔͨ¹ýѹËõÀ´½µµÍÅú´ÎµÄ´óС¡£

Setting the Right Batch Size

For a streaming application to run stably on a cluster, it must process data as fast as the data arrives. The best way to find the right batch size is to start with a conservative batch size (5-10 seconds) and a very low input rate. Once the system is demonstrably keeping up, tune it empirically by checking the Total delay figure in the logs: if the delay stays below the batch interval, the system is stable; if the delay keeps growing, the processing rate cannot keep up with the input rate.

24/7 Operation

SparkĬÈϲ»»áÍü¼ÇÔªÊý¾Ý£¬±ÈÈçÉú³ÉµÄRDD£¬´¦ÀíµÄstages£¬µ«ÊÇSpark StreamingÊÇÒ»¸ö24/7µÄ³ÌÐò£¬ËüÐèÒªÖÜÆÚÐÔµÄÇåÀíÔªÊý¾Ý£¬Í¨¹ýspark.cleaner.ttlÀ´ÉèÖᣱÈÈçÎÒÉèÖÃËüΪ600£¬µ±³¬¹ý10·ÖÖÓµÄʱºò£¬Spark¾Í»áÇå³þËùÓÐÔªÊý¾Ý£¬È»ºó³Ö¾Ã»¯RDDs¡£µ«ÊÇÕâ¸öÊôÐÔÒªÔÚSparkContext ´´½¨Ö®Ç°ÉèÖá£

This value is tied to any window operations you use: Spark requires received data to stay persisted in memory until it expires, so the TTL must be at least as large as the largest window; setting it smaller causes an error.
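
A sketch of setting the TTL, which as noted must happen before the context is created:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Clear metadata older than 600 seconds; must be set before the SparkContext exists
val conf = new SparkConf().setAppName("NetworkWordCount").set("spark.cleaner.ttl", "600")
val ssc = new StreamingContext(conf, Seconds(1))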

Monitoring

Besides Spark's built-in monitoring, you can use the StreamingListener interface to obtain batch processing times, queueing delays, and the full end-to-end delays.
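
A minimal listener sketch (assuming the StreamingListener interface as registered on the StreamingContext; the BatchInfo field names follow Spark's API):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Print each batch's processing and total delay as batches complete
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted) {
    val info = batch.batchInfo
    println("processing delay: " + info.processingDelay + ", total delay: " + info.totalDelay)
  }
})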

Memory Tuning

Spark Streaming's default storage level is StorageLevel.MEMORY_ONLY_SER (serialized), rather than the RDD default of StorageLevel.MEMORY_ONLY.

ĬÈϵģ¬ËùÓг־û¯µÄRDD¶¼»áͨ¹ý±»SparkµÄLRUËã·¨ÌÞ³ý³öÄڴ棬Èç¹ûÉèÖÃÁËspark.cleaner.ttl£¬¾Í»áÖÜÆÚÐÔµÄÇåÀí£¬µ«ÊÇÕâ¸ö²ÎÊýÉèÖÃÒªºÜ½÷É÷¡£Ò»¸ö¸üºÃµÄ·½·¨ÊÇÉèÖÃspark.streaming.unpersistΪtrue£¬Õâ¾ÍÈÃSparkÀ´¼ÆËãÄÄЩRDDÐèÒª³Ö¾Ã»¯£¬ÕâÑùÓÐÀûÓÚÌá¸ßGCµÄ±íÏÖ¡£

Using the concurrent mark-and-sweep GC is recommended; it reduces overall throughput somewhat, but leads to more consistent batch processing times.
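
On the JVM this collector is enabled with the -XX:+UseConcMarkSweepGC flag; one possible way to pass it in Spark deployments of this era is the SPARK_JAVA_OPTS environment variable (how JVM options reach the executors varies by version and deploy mode):

# Enable CMS GC for Spark JVMs (adjust to your version and deploy mode)
export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"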

Fault-tolerance Properties

Failure of a Worker Node

Two failure cases need to be considered:

1. If the input is files on HDFS: since HDFS is a reliable file system, no data is lost.

2. If the data arrives over the network, for example from Kafka or Flume: to guard against failure, received data is replicated to two nodes by default. There is still a chance that the node receiving the data fails before it has copied the data to another node, in which case that data can be lost.

Failure of the Driver Node

To support uninterrupted 24/7 processing, Spark can recover the computation after a failure of the driver node. Spark Streaming periodically writes data to HDFS, namely into the checkpoint directory introduced earlier, and from that data the StreamingContext can be recovered after a driver failure.

For a Spark Streaming program to be recoverable, it needs to behave as follows:

(1) On first startup, create a StreamingContext, set up all the streams, and then call start().

(2) On restart after a failure, re-create the StreamingContext from the checkpoint data.

Here is a concrete example:

Constructing the StreamingContext via StreamingContext.getOrCreate achieves both behaviors.

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

In standalone deploy mode, a failed driver can also be recovered automatically by having another node take over as driver. You can test this locally: submit the application in supervise mode, then use jps to find a process named something like DriverWrapper and kill it. In YARN mode you would have to use other mechanisms to restart the driver.

While we are at it, here is how to submit an application via the client; this was left out of an earlier write-up.

./bin/spark-class org.apache.spark.deploy.Client launch \
  [client-options] \
  <cluster-url> <application-jar-url> <main-class> \
  [application-options]

cluster-url: the address of the master.
application-jar-url: the location of the jar, preferably on HDFS (with an hdfs://... prefix); otherwise the jar must exist at the same path on every node.
main-class: the class containing the main function of the application to deploy.
Client Options:
--memory <count> (memory for the driver, in MB)
--cores <count> (number of cores to allocate to the driver)
--supervise (whether to restart the application when the node fails)
--verbose (print verbose log output)
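
A hypothetical invocation might look like this (every value below is a placeholder):

./bin/spark-class org.apache.spark.deploy.Client launch \
  --memory 512 --cores 2 --supervise \
  spark://master:7077 hdfs://namenode:8020/jars/myapp.jar com.example.MyApp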

Future versions will support recoverability for all data sources.

To better understand driver-failure recovery with HDFS-based input, here is a simple example:

Time    Number of lines in input file    Output without driver failure    Output with driver failure
10      10                               10                               10
20      20                               20                               20
30      30                               30                               30
40      40                               40                               [DRIVER FAILS] no output
50      50                               50                               no output
60      60                               60                               no output
70      70                               70                               [DRIVER RECOVERS] 40, 50, 60, 70
80      80                               80                               80
90      90                               90                               90
100     100                              100                              100

The failure occurs at time 40: nothing is emitted for 40, 50, or 60. When the driver recovers at time 70, all of the missing output is emitted at once.

   