Spark
Getting to Know Spark
Apache Spark is an open source cluster computing system that aims to make data analytics fast — both fast to run and fast to write.
Spark is a parallel computing framework for big data based on in-memory computation. By keeping intermediate data in memory, it improves the real-time performance of data processing in big-data environments while guaranteeing high fault tolerance and high scalability, and it lets users deploy Spark on large numbers of inexpensive machines to form a cluster.
A Brief History of Spark
2009Ä꣬Sparkµ®ÉúÓÚ¼ÓÖÝ´óѧ²®¿ËÀû·ÖУAMPLab
2013Äê6Ô£¬Spark³ÉΪApache·õ»¯ÏîÄ¿
February 2014: Spark became an Apache top-level project.
The Spark Ecosystem
Spark sits at the core of an ecosystem known as the Berkeley Data Analytics Stack (BDAS).

Why Is Spark Faster Than MapReduce?
The Spark website puts it this way: "Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk." So why exactly is Spark faster than MapReduce?
MapReduceͨ³£»á½«Öмä½á¹û·Åµ½HDFSÉÏ£¬Spark ÊÇ»ùÓÚÄÚ´æ¼ÆËãµÄ´óÊý¾Ý²¢ÐмÆËã¿ò¼Ü£¬Öмä½á¹ûÔÚÄÚ´æÖУ¬¶ÔÓÚµü´úÔËËãЧÂʱȽϸߡ£
MapReduce spends a great deal of time sorting, even in scenarios where sorting is not needed; Spark can avoid the overhead of unnecessary sorting.
SparkÄܹ»½«ÒªÖ´ÐеIJÙ×÷×ö³ÉÒ»ÕÅÓÐÏòÎÞ»·Í¼£¨DAG£©£¬È»ºó½øÐÐÓÅ»¯¡£
Other Advantages
Spark launches tasks with Akka, an event-driven library, and relies on thread pools to reduce the overhead of starting tasks.
Spark is also more general: besides the map and reduce operators, it provides more than 80 operators such as filter and join.
Supported APIs
Scala (very good), Python (decent), Java (…)
Run Modes
Local (for testing only)
Standalone: independent deployment mode
Spark on YARN: the most promising mode
Spark on Mesos: officially recommended
Amazon EC2
Spark runtime

At runtime, the user's Driver program starts multiple Workers; the Workers load data from the file system, build new RDDs, and cache them in memory partition by partition.
Resilient Distributed Datasets (RDDs)
RDD stands for Resilient Distributed Dataset. An RDD is a read-only collection of partitioned records. Everything in Spark revolves around RDDs, so let's look at them from a few angles:
1. Creation
1) converted from a collection;
2) created from input in a file system (local files, HDFS, HBase, etc.);
3) transformed from a parent RDD.
2. Computation types
1) Transformation: lazily executed. A transformation that produces a new RDD does not run immediately; the computation is only triggered when an Action is invoked. Common transformations include map and filter (a short sketch of this lazy behavior follows this list).
2) Action: submits a Spark job and starts the actual computation, producing the final result (either returned to the user program or written to a file system).
3. Fault tolerance
Lineage: every RDD records how it was derived from its parent RDDs, so when something goes wrong the RDD can be recomputed from its lineage.
4. Internal properties
1) a list of partitions;
2) a function for computing each split;
3) a list of dependencies on parent RDDs;
4) for key-value RDDs, a partitioner; users can specify the partitioning strategy and the number of partitions;
5) a list of preferred locations for each partition (e.g., the addresses of the data blocks on HDFS).
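To make the lazy execution of transformations concrete, here is a minimal sketch for the spark-shell; it assumes sc is the pre-initialized SparkContext and the file path is only a placeholder:

val lines = sc.textFile("hdfs://.../app.log")     // transformation: nothing is executed yet
val errors = lines.filter(_.contains("ERROR"))    // transformation: still nothing is executed
errors.count()                                    // action: a job is submitted and the result is computed now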
Spark Shell
Spark ships with an interactive shell that makes interactive programming easy. To start it:
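Assuming a standard Spark distribution, the shell is typically launched from the Spark home directory:

$ bin/spark-shell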
When the spark shell opens, a SparkContext has already been initialized as an object named sc, which you can use directly. The shell works much like the Scala interpreter, and you can program interactively in it.
The content that follows assumes some familiarity with Scala; see the "Scala quick start" post if you need a refresher.
A WordCount Appetizer
Now for a real example, an appetizer before we introduce the operators. Anyone who has used MapReduce has surely written a WordCount program in Java; if the result needs to be sorted, you also have to chain in a separate sort job — the logic is not complicated, but the code is far from short. How can we do the same with Scala in Spark? Let's work directly in the Spark shell.
scala> sc.textFile("hdfs://.../wordcount.data").
     | flatMap(_ split " ").map((_, 1)).reduceByKey(_ + _).
     | map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).
     | saveAsTextFile("hdfs://.../result")
In the Scala interpreter, if what you type is not a complete executable statement and you press Enter, a leading | appears, meaning you can keep typing until the statement is complete. In other words, we have just implemented WordCount plus sorting in a single line of code. We will explain the code in detail later in this post. Compared with the laboriously written "Counting word occurrences with MapReduce" (even though the functionality differs slightly), I can only sigh: functional programming is a joy, and Spark is slick!
Common Operators

1. Spark input:
Creating an RDD from a collection:
val rdd1 = sc.parallelize(List("Java", "Scala", "Spark"))
´ÓÎļþϵͳÖÐÊäÈ룺
val rdd2 = sc.textFile("hdfs://.../wordcount.data")
2. cache
cache caches the RDD's elements from disk into memory; it is equivalent to calling persist(MEMORY_ONLY). When the RDD is used again, its data is read directly from memory.
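For example, reusing rdd2 from the input example above (a minimal sketch; the count calls are only there to force evaluation):

rdd2.cache()    // mark rdd2 to be kept in memory (equivalent to persist(MEMORY_ONLY))
rdd2.count()    // first action: loads the data and populates the cache
rdd2.count()    // later actions read the cached data from memory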
3. map(func)
Return a new distributed dataset formed by passing
each element of the source through a function func.
½«ÔÀ´RDDÖеÄÿ¸öÊý¾ÝÏîͨ¹ýº¯ÊýfuncÓ³ÉäΪеÄÊý¾ÝÏî¡£
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
val rdd3 = rdd2.map(_ + 2)
rdd3 is derived from rdd2 by adding 2 to each element, so its elements become [3, 4, 5, 6]. Since map is a transformation, nothing is actually executed yet.
4. filter(func)
Return a new dataset formed by selecting those elements
of the source on which func returns true.
¶ÔÔRDDÖеÄÊý¾Ý½øÐйýÂË£¬Ã¿¸öÔªËØ¾¹ýfuncº¯Êý´¦Àí£¬·µ»ØtrueµÄ±£Áô¡£
val rdd4 = rdd3.filter(_ > 4)
rdd4 consists of all elements of rdd3 that are greater than 4.
5. flatMap(func)
Similar to map, but each input item can be mapped to
0 or more output items (so func should return a Seq rather
than a single item).
Similar to map, except that each input element is mapped to 0 or more output elements through func (so func must return a collection), and the resulting collections are flattened into one.
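A small sketch contrasting map and flatMap in the spark-shell (the names here are only for illustration):

val sentences = sc.parallelize(List("hello spark", "hello scala"))
sentences.map(_.split(" ")).collect()      // Array(Array(hello, spark), Array(hello, scala))
sentences.flatMap(_.split(" ")).collect()  // Array(hello, spark, hello, scala)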
6. sample(withReplacement, fraction, seed)
Sample a fraction fraction of the data, with or without
replacement, using a given random number generator seed.
¶ÔÒÑÓеÄRDD½øÐвÉÑù£¬»ñÈ¡×Ó¼¯¡£²¢ÇÒ¿ÉÒÔÖ¸¶¨ÊÇ·ñÓзŻزÉÑù¡¢²ÉÑù°Ù·Ö±È¡¢Ëæ»úÖÖ×Ó¡£º¯Êý²ÎÊýÈçÏ£º
withReplacement = true: sample with replacement; withReplacement = false: sample without replacement.
fraction: the fraction of the data to sample.
seed: the seed for the random number generator used in sampling.
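A minimal sketch (the exact elements drawn depend on the seed):

val nums = sc.parallelize(1 to 100)
val sampled = nums.sample(false, 0.1, 42L)   // no replacement, ~10% of the data, fixed seed
sampled.collect()                            // roughly 10 of the 100 numbers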
7. groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a
dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation
(such as a sum or average) over each key, using reduceByKey
or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output
depends on the number of partitions of the parent RDD.
You can pass an optional numTasks argument to set a
different number of tasks.
Groups the values that share the same key into one group; [numTasks] specifies the number of partitions.
val rdd4 = sc.parallelize(List(("aa", 1), ("aa", 2), ("bb", 3), ("bb", 4)))
val rdd5 = rdd4.groupByKey
The value of rdd5 is Array[(String, Iterable[Int])] = Array((aa,CompactBuffer(1, 2)), (bb,CompactBuffer(3, 4))).
8. reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a
dataset of (K, V) pairs where the values for each key
are aggregated using the given reduce function func,
which must be of type (V,V) => V. Like in groupByKey,
the number of reduce tasks is configurable through an
optional second argument.
½«ÏàͬµÄkeyÒÀ¾Ýº¯ÊýfuncºÏ²¢¡£
9. union(otherDataset)
Return a new dataset that contains the union of the
elements in the source dataset and the argument.
Merges two RDDs into one; the two RDDs must contain elements of the same type.
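A minimal sketch (the names a and b are only for illustration):

val a = sc.parallelize(List(1, 2, 3))
val b = sc.parallelize(List(3, 4, 5))
a.union(b).collect()   // Array(1, 2, 3, 3, 4, 5) — duplicates are not removed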
10. join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W),
returns a dataset of (K, (V, W)) pairs with all pairs
of elements for each key. Outer joins are supported
through leftOuterJoin,rightOuterJoin, and fullOuterJoin.
val rddtest1 = sc.parallelize(List(("James", 1), ("Wade", 2), ("Paul", 3)))
val rddtest2 = sc.parallelize(List(("James", 4), ("Wade", 5)))
val rddtest12 = rddtest1 join rddtest2
The value of rddtest12 is: Array[(String, (Int, Int))] = Array((James,(1,4)), (Wade,(2,5))).
11. cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W),
returns a dataset of (K, Iterable<V>, Iterable<W>)
tuples. This operation is also called groupWith.
For the key-value elements of two RDDs, the elements with the same key in each RDD are gathered into a collection, and the result contains, for every key, the pair of collections from the two RDDs.
ʹÓÃÉÏÀýÖÐrddtest1 cogroup rddtest2£¬½á¹ûÊÇ£º
Array((Paul,(CompactBuffer(3),CompactBuffer())), (James,(CompactBuffer(1),CompactBuffer(4))),
(Wade,(CompactBuffer(2),CompactBuffer(5))))
12. sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements
Ordered, returns a dataset of (K, V) pairs sorted by
keys in ascending or descending order, as specified
in the boolean ascending argument.
Sorts by key, ascending by default. Pass false to sort in descending order.
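A minimal sketch (scores is just an illustrative name):

val scores = sc.parallelize(List(("b", 2), ("a", 1), ("c", 3)))
scores.sortByKey().collect()       // Array((a,1), (b,2), (c,3)) — ascending by default
scores.sortByKey(false).collect()  // Array((c,3), (b,2), (a,1)) — descending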
13. count()
Return the number of elements in the dataset.
Returns the number of elements in the dataset.
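For example:

sc.parallelize(List(1, 2, 3, 4)).count()   // returns 4 (as a Long)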
14. collect()
Return all the elements of the dataset as an array
at the driver program. This is usually useful after
a filter or other operation that returns a sufficiently
small subset of the data.
Returns the distributed RDD as a single Scala Array on the driver; the result should be small enough to fit there.
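For example, combined with filter so that only a small subset is brought back to the driver (a minimal sketch):

val big = sc.parallelize(1 to 1000)
big.filter(_ % 100 == 0).collect()   // Array(100, 200, ..., 1000) — only 10 elements reach the driver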
15. countByKey()
Only available on RDDs of type (K, V). Returns a hashmap
of (K, Int) pairs with the count of each key.
For key-value RDDs, returns the number of elements for each key.
16. lookup(key: K)
For key-value RDDs, returns all the values associated with the given key.
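A small sketch covering both countByKey and lookup (the RDD here is made up for illustration):

val pairs = sc.parallelize(List(("aa", 1), ("aa", 2), ("bb", 3)))
pairs.countByKey()   // Map(aa -> 2, bb -> 1)
pairs.lookup("aa")   // all values for key "aa": Seq(1, 2)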
17. reduce(func)
Aggregate the elements of the dataset using a function
func (which takes two arguments and returns one). The
function should be commutative and associative so that
it can be computed correctly in parallel.
Iterates over all the elements and aggregates them with the function func.
val reduceRdd = sc.parallelize(List(1, 2, 3, 4, 5))
reduceRdd.reduce(_ + _)
The result is 15, the sum of all the elements.
18. saveAsTextFile(path)
Write the elements of the dataset as a text file (or
set of text files) in a given directory in the local
filesystem, HDFS or any other Hadoop-supported file
system. Spark will call toString on each element to
convert it to a line of text in the file.
½«ÎļþÊä³öµ½Îļþϵͳ¡£
Note: of the operators above, count, collect, countByKey, lookup, reduce, and saveAsTextFile are Action-type operators; the rest are transformations.
Now that we have covered the operators, let's look at a complete program.
import scala.collection.mutable.ListBuffer
import org.apache.spark.{SparkConf, SparkContext}

if (args.length != 3) {
  println("Usage : java -jar code.jar dependency_jars file_location save_location")
  System.exit(0)
}

val jars = ListBuffer[String]()
args(0).split(',').map(jars += _)

val conf = new SparkConf()
conf.setMaster("spark://host:port")
  .setSparkHome("your-spark-home")
  .setAppName("WordCount")
  .setJars(jars)
  .set("spark.executor.memory", "25g")
val sc = new SparkContext(conf)

sc.textFile(args(1))          // read the input file from the file system
  .flatMap(_ split " ")       // split each line into words on spaces
  .map((_, 1))                // count each word as 1
  .reduceByKey(_ + _)         // sum the counts for each word
  .map(x => (x._2, x._1))     // swap key and value
  .sortByKey(false)           // sort by key (descending)
  .map(x => (x._2, x._1))     // swap key and value back
  .saveAsTextFile(args(2))    // write the result to the file system
×ܽá
±¾Îļòµ¥½éÉÜÁËSparkÖеÄһЩ¸ÅÄ²¢½áºÏÁËһЩ¼òµ¥µÄÀý×Ó¡£ÄÚÈÝдµÄ±È½ÏdzÏÔ£¬Ëæ×űÊÕßµÄÉîÈëÑо¿£¬Ò²»áÔÚºóÐø²©¿ÍÖжԸ÷²¿·ÖÄÚÈÝ×ö¸üÉîÈëµÄ²ûÊö¡£Ï£Íû±¾ÆªÄܶԿ´µ½µÄÈËÓÐËù°ïÖú¡£