Overview
What is Spark
Spark is a general-purpose parallel computing framework in the spirit of Hadoop MapReduce, open-sourced by the UC Berkeley AMP Lab. Spark implements distributed computation based on the map-reduce model and keeps the advantages of Hadoop MapReduce; unlike MapReduce, however, intermediate job output and results can be kept in memory, so there is no need to read and write HDFS between steps. Spark is therefore much better suited to iterative map-reduce algorithms such as those used in data mining and machine learning. Its architecture is shown in the figure below:

Spark compared with Hadoop
Spark keeps intermediate data in memory, which makes iterative computation far more efficient.
Spark is better suited to ML and DM workloads that involve many iterations, because Spark provides the RDD abstraction.
Spark is more general than Hadoop.
Spark offers many kinds of data set operations, whereas Hadoop provides only Map and Reduce. Spark's transformations include map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort, partitionBy and more; it also provides actions such as count, collect, reduce, lookup and save.
This rich set of operations is convenient for users building applications on top of Spark. Communication between processing nodes is no longer limited to the single data-shuffle pattern of Hadoop: users can name, materialize, and control the storage and partitioning of intermediate results. In short, the programming model is more flexible than Hadoop's. A small sketch of such a pipeline follows below.
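As a minimal sketch of how these operations compose (assuming an existing SparkContext named sc, an illustrative HDFS path, and the pair-RDD implicits used in the WordCount example later in this article), several transformations can be chained and a single action runs them:

import spark.SparkContext._                                  // implicit pair-RDD operations (package name as used later in this article)

val logs   = sc.textFile("hdfs://...")                       // load a text file as an RDD
val errors = logs.filter(line => line.contains("ERROR"))     // transformation: filter
val pairs  = errors.map(line => (line.split(" ")(0), 1))     // transformation: map to (key, 1)
val counts = pairs.reduceByKey((a, b) => a + b)              // transformation: aggregate by key
counts.collect()                                             // action: triggers the whole pipeline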
Because of the nature of RDDs, however, Spark is not suitable for applications that perform asynchronous, fine-grained updates to state, such as the storage layer of a web service or an incremental web crawler and indexer; incrementally updated application models are generally a poor fit.
Fault tolerance.
Fault tolerance for distributed data set computation is achieved through checkpointing, and checkpointing can be done in two ways: checkpointing the data, or logging the updates. The user can control which mechanism is used.
Usability.
Spark improves usability by providing rich Scala, Java and Python APIs as well as an interactive shell.
Combining Spark with Hadoop
Spark can read and write data directly on HDFS and also supports Spark on YARN. Spark can run in the same cluster as MapReduce, sharing storage and compute resources. The Shark data warehouse borrows from Hive in its implementation and is almost fully compatible with it.
Scenarios where Spark fits
Spark is an in-memory iterative computing framework, suited to applications that operate on a specific data set many times. The more often the data is reused and the larger the data set, the greater the benefit; for small data sets with very compute-intensive workloads, the benefit is relatively small.
Because of the nature of RDDs, Spark is not suitable for applications that perform asynchronous, fine-grained updates to state, such as the storage layer of a web service or an incremental web crawler and indexer; incrementally updated application models are generally a poor fit.
Overall, Spark applies to a fairly wide and general range of workloads.
Run modes
Local mode
Standalone mode
Mesos mode
YARN mode
The Spark ecosystem
Shark (Hive on Spark): Shark essentially provides the same HiveQL command interface as Hive on top of the Spark framework. To stay as compatible with Hive as possible, Shark uses the Hive APIs for query parsing and logical plan generation, and replaces Hadoop MapReduce with Spark only in the physical plan execution stage. Through configuration, Shark can automatically cache specific RDDs in memory for reuse, which speeds up repeated retrieval of those data sets. Shark also supports user-defined functions (UDFs) for custom data analysis and learning algorithms, so SQL queries and analytical computation can be combined, maximizing the reuse of RDDs.
Spark Streaming: a framework built on Spark for processing stream data. The basic idea is to split the stream into small time slices (a few seconds) and process each small slice of data in a batch-like way. Spark Streaming is built on Spark for two reasons: Spark's low-latency execution engine (on the order of 100 ms) can be used for near-real-time computation, and, compared with record-at-a-time frameworks such as Storm, RDDs make efficient fault tolerance easier. The small-batch approach also lets the same logic and algorithms serve both batch and real-time processing, which is convenient for applications that need to analyze historical and live data together. A minimal sketch follows below.
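As an illustrative sketch only (package and class names are assumed from the Spark Streaming API of this era, and the host/port are placeholders), a streaming word count looks roughly like this:

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._   // implicit pair operations on DStreams

// split the stream into 2-second batches and process each batch like a small Spark job
val ssc = new StreamingContext("local[2]", "StreamingWordCount", Seconds(2))
val lines = ssc.socketTextStream("localhost", 9999)          // placeholder text source
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.print()                                               // emit the counts for each small batch
ssc.start()                                                  // start receiving and processing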
Bagel: Pregel on Spark. Graph computation can be done with Spark through this small but very useful project. Bagel ships with an example that implements Google's PageRank algorithm.
Use in industry
The Spark project started in 2009 and was open-sourced in 2010. Current users include Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research and others, as well as Taobao; Douban also uses DPark, a Python clone of Spark.
Spark core concepts
Resilient Distributed Dataset (RDD)
The RDD is Spark's most basic abstraction: an abstraction over distributed memory that lets you operate on a distributed data set as if it were a local collection. The RDD is the core of Spark. It represents a partitioned, immutable collection that can be operated on in parallel, and different data set formats correspond to different RDD implementations. An RDD must be serializable. An RDD can be cached in memory, and the result of each operation on it can be kept in memory so that the next operation reads directly from memory, avoiding the heavy disk I/O of MapReduce. For machine learning algorithms and interactive data mining, where iteration is common, this is a large efficiency gain; the sketch below illustrates the pattern.
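For example (a minimal sketch assuming an existing SparkContext sc and an illustrative file of numbers), caching lets several actions reuse the same in-memory data set:

val data = sc.textFile("hdfs://...").map(line => line.trim.toDouble).cache()  // mark the RDD for caching
val sum   = data.reduce(_ + _)   // first action: reads HDFS once and populates the cache
val count = data.count()         // second action: served from memory, no second HDFS read
println(sum / count)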
Characteristics of an RDD:
It is an immutable, partitioned collection of objects distributed over the cluster nodes.
It is created through parallel transformations (map, filter, join, etc.).
It is rebuilt automatically on failure.
Its storage level (memory, disk, etc.) can be controlled for reuse.
It must be serializable.
It is statically typed.
Benefits of RDDs
An RDD can only be created from persistent storage or through transformations, so compared with distributed shared memory (DSM) fault tolerance is much cheaper: a lost partition can simply be recomputed from its lineage, with no need for dedicated checkpoints.
Because RDDs are immutable, speculative execution in the style of Hadoop MapReduce is possible.
The partitioned nature of RDD data allows performance to be improved through data locality, just as in Hadoop MapReduce.
RDDs are serializable, so when memory is insufficient they can automatically degrade to disk storage; performance drops considerably in that case, but it is still no worse than today's MapReduce.
RDD storage and partitioning
Users can choose among different storage levels when persisting an RDD for reuse.
By default an RDD is stored in memory, but when memory is insufficient it spills to disk.
When an RDD needs to be partitioned across the cluster, records are partitioned by key (for example, hash partitioning), which keeps joins between two data sets efficient; a sketch follows below.
Internal representation of an RDD
Internally, each RDD is characterized by five properties, sketched as an interface below:
A list of partitions (data blocks)
A function for computing each partition (deriving this RDD from its parent)
A list of dependencies on parent RDDs
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations for each partition (e.g., the block locations on HDFS)
RDD storage levels
By combining the four parameters useDisk, useMemory, deserialized and replication, RDDs offer 11 storage levels:
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
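For instance (a sketch; the storage package path is an assumption for this Spark generation), a level other than the default cache() can be requested with persist:

import spark.storage.StorageLevel   // package name assumed for the 0.7/0.8-era API

val file = sc.textFile("hdfs://...")
file.persist(StorageLevel.MEMORY_AND_DISK_SER)   // keep serialized in memory, spill to disk when needed
file.count()                                     // the first action materializes and stores the RDD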
RDDs define a variety of operations; different kinds of data are represented by different RDD subclasses, and the different operations are implemented by the corresponding RDDs.
Creating RDDs
There are two ways to create an RDD:
1. From input in a Hadoop file system (or any Hadoop-compatible storage system), such as HDFS.
2. By transforming a parent RDD into a new RDD.
The following shows how an RDD is created from a Hadoop file system, e.g. val file = spark.textFile("hdfs://..."). The variable file is an RDD (actually a HadoopRDD instance), and the core code that creates it is:
// SparkContext creates the RDD from a file/directory and an optional number of splits.
// Note how close this is to Hadoop MapReduce: it needs an InputFormat and the Key/Value types;
// Spark actually reuses Hadoop's InputFormat and Writable types.
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
    .map(pair => pair._2.toString)
}

// A HadoopRDD is created from the Hadoop configuration, the InputFormat, etc.
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
When the RDD is computed and reads its data from HDFS, the process is almost identical to Hadoop MapReduce:
// Obtain a RecordReader from the InputFormat, based on the Hadoop configuration and the split.
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
val key: K = reader.createKey()
val value: V = reader.createValue()

// Use the Hadoop MapReduce RecordReader to read the data; each Key/Value pair is returned as a tuple.
override def getNext() = {
  try {
    finished = !reader.next(key, value)
  } catch {
    case eof: EOFException => finished = true
  }
  (key, value)
}
RDD transformations and actions
An RDD supports two kinds of computation: transformations (whose return value is another RDD) and actions (whose return value is not an RDD).
Transformations (e.g. map, filter, groupBy, join) are lazy: an operation that turns one RDD into another is not executed immediately. When Spark encounters a transformation, it only records that the operation is needed; the computation actually starts only when an action is invoked.
Actions (e.g. count, collect, save) return a result or write RDD data to a storage system. Actions are what trigger Spark to start computing.
The following example illustrates the use of transformations and actions in Spark.
val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....) |

Lineage
Using memory to speed up data loading is something many other in-memory databases and caching systems also do. What distinguishes Spark is how it handles fault tolerance (node failure / data loss) in a distributed computing environment. To keep RDD data robust, an RDD remembers, through its so-called lineage, how it was derived from other RDDs. Compared with the fine-grained backup or logging of individual in-memory updates used by other systems, an RDD's lineage records coarse-grained transformation operations (filter, map, join, etc.). When some partitions of an RDD are lost, the lineage provides enough information to recompute and recover just those partitions. This coarse-grained data model limits the scenarios where Spark can be applied, but compared with fine-grained models it also brings a performance gain.
With respect to lineage, RDD dependencies are divided into narrow dependencies and wide dependencies, a distinction that matters for efficient fault recovery. A narrow dependency means each partition of the parent RDD is used by at most one partition of the child RDD: one parent partition maps to one child partition, or several parent partitions map to one child partition; a single parent partition never feeds multiple child partitions. A wide dependency means a child partition depends on several (or all) partitions of the parent, so one parent partition does correspond to multiple child partitions. For wide dependencies, the input and output of the computation live on different nodes; lineage-based recovery works well when the input nodes are intact and only the output node fails, since recomputation can proceed directly. Otherwise it cannot simply be retried locally and must trace back up the ancestry to find a point from which recomputation is possible (which is exactly what "lineage" means). The recomputation cost for narrow dependencies is therefore far smaller than for wide dependencies. The sketch below shows which common operations create which kind of dependency.
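As a rough illustration (assuming an existing SparkContext sc and the pair-RDD implicits used elsewhere in this article), map and filter produce narrow dependencies while groupByKey produces a wide one:

import spark.SparkContext._

val pairs    = sc.textFile("hdfs://...").map(line => (line.length, line))  // narrow: one parent partition per child partition
val longOnes = pairs.filter(p => p._1 > 10)                                // narrow
val grouped  = longOnes.groupByKey()                                       // wide: each child partition reads many parent partitions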
Fault tolerance
During RDD computation, fault tolerance is achieved through checkpointing. There are two ways to checkpoint: checkpointing the data, or logging the updates, and the user controls which mechanism is used. The default is logging the updates: by recording all the transformations used to produce each RDD, i.e. each RDD's lineage, lost partition data can be recomputed. An explicit data checkpoint can be added as sketched below.
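A sketch of an explicit data checkpoint (assuming an existing SparkContext sc; the directory is a placeholder and the method names are assumed from the RDD checkpointing API of this era):

sc.setCheckpointDir("hdfs://.../checkpoints")        // where checkpointed partition data is written
val cleaned = sc.textFile("hdfs://...").map(line => line.trim)
// ... many iterative transformations would keep extending the lineage here ...
cleaned.checkpoint()   // request a data checkpoint, cutting the lineage at this RDD
cleaned.count()        // the action computes the RDD and writes the checkpoint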
Resource management and job scheduling
For resource management and job scheduling, Spark can use Standalone (its own independent mode), Apache Mesos, or Hadoop YARN. Spark on YARN was introduced in Spark 0.6, but it only becomes genuinely usable in the current branch-0.8. Spark on YARN follows the official YARN specification; because Spark was designed from the start to support multiple schedulers and executors, adding YARN support was straightforward. The overall structure of Spark on YARN is shown in the figure below.

Running Spark on YARN lets it share cluster resources with Hadoop and improves resource utilization.
Programming interface
Spark exposes RDD operations through language integration, similar to DryadLINQ and FlumeJava: each data set is represented as an RDD object, and operations on the data set are expressed as operations on that RDD object. Spark's main programming language is Scala, chosen for its conciseness (Scala works well interactively) and its performance (a statically, strongly typed language on the JVM).
Like Hadoop MapReduce, Spark consists of a Master (similar to the MapReduce JobTracker) and Workers (Spark's slave worker nodes). A user's Spark program is called the driver program; the driver connects to the master and defines the transformations and actions on the RDDs. These transformations and actions are expressed as Scala closures (function literals); Scala represents closures as Java objects that are serializable, which is how the closure operations on RDDs are shipped to the Worker nodes. A small sketch of how a closure is shipped follows below.
Workers store the data blocks and hold the cluster's memory; they are daemons running on the worker nodes. When a worker receives an operation on an RDD, it operates on its local data according to the partition information and produces new data partitions, returns results, or writes the RDD to a storage system.
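A small sketch (assuming an existing SparkContext sc): the function literal passed to filter is a closure that captures the driver-side value threshold, is serialized as a Java object, and travels to the workers with the tasks.

val threshold = 10                                   // defined in the driver program
val longLines = sc.textFile("hdfs://...")
  .filter(line => line.length > threshold)           // closure captures `threshold`; serialized to the workers
println(longLines.count())                           // the action ships the closure and runs it on the worker nodes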

Scala
Spark is written in Scala, and Scala is its default programming language. Writing a Spark program is much simpler than writing a Hadoop MapReduce program; Spark also provides spark-shell, so programs can be tested interactively. The general steps for writing a Spark program are: create or obtain a SparkContext instance, use the SparkContext to create RDDs, and then operate on those RDDs. For example:
val sc = new SparkContext(master, appName, [sparkHome], [jars])
val textFile = sc.textFile("hdfs://.....")
textFile.map(....).filter(.....).....
Java
Spark also supports programming in Java, although Java users do not get a tool as convenient as spark-shell. Otherwise it is the same as programming in Scala: since both are JVM languages, Scala and Java interoperate, and the Java API is essentially a wrapper around the Scala one. For example:
JavaSparkContext sc = new JavaSparkContext(...);
JavaRDD<String> lines = sc.textFile("hdfs://...");
JavaRDD<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  }
);
Python
Spark now also provides a Python API. It uses Py4J for Python/Java interoperation, which makes it possible to write Spark programs in Python. Spark likewise ships pyspark, a Python shell for Spark, so Spark programs can be written interactively in Python. For example:
from pyspark import SparkContext

sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
words = sc.textFile("/usr/share/dict/words")
words.filter(lambda w: w.startswith("spar")).take(5)
Usage examples
Standalone mode
To make Spark easier to adopt, a Standalone mode is provided. Spark was originally designed to run on the Apache Mesos resource manager, which is a sound design but adds complexity to deployment and testing. To make Spark simpler to deploy and try out, a Standalone run mode is therefore also offered, consisting of one Spark master and multiple Spark workers. It is very similar to Hadoop MapReduce v1, even down to the way the cluster is started.
Running a Spark cluster in Standalone mode
Download Scala 2.9.3 and set SCALA_HOME.
Download Spark (either build from source or download a prebuilt version); here we use the prebuilt version (http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz).
Unpack the spark-0.7.3-prebuilt-cdh4.tgz package.
Edit the configuration (conf/*): slaves lists the worker hostnames; spark-env.sh sets environment variables, for example:
SCALA_HOME=/home/spark/scala-2.9.3
JAVA_HOME=/home/spark/jdk1.6.0_45
SPARK_MASTER_IP=spark1
SPARK_MASTER_PORT=30111
SPARK_MASTER_WEBUI_PORT=30118
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=4g
SPARK_WORKER_PORT=30333
SPARK_WORKER_WEBUI_PORT=30119
SPARK_WORKER_INSTANCES=1
Copy the Hadoop configuration into the conf directory.
Set up passwordless ssh from the master host to the other machines.
Copy the configured Spark installation to the other machines with scp.
Start the cluster on the master (see the command sketch below).
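For example (assuming the start scripts shipped with this Spark version; script names can differ between releases), the cluster is started from the master with:

cd /path/to/spark-0.7.3      # the unpacked installation directory on the master
bin/start-all.sh             # starts the master and, via conf/slaves, the workers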
YARN mode
spark-shell does not yet support YARN mode. To run in YARN mode, the entire Spark program must be packaged into a single jar and submitted to YARN. Currently only the branch-0.8 version truly supports YARN.
Running Spark in YARN mode
Download the Spark code:
git clone git://github.com/mesos/spark
Switch to branch-0.8:
cd spark
git checkout -b yarn --track origin/yarn
Build and assemble Spark with sbt:
$SPARK_HOME/sbt/sbt
> package
> assembly
Copy the Hadoop YARN configuration into the conf directory.
Run a test:
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \
  ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \
  --class spark.examples.SparkPi --args yarn-standalone
Using spark-shell
spark-shell is easy to use: once Spark is running in Standalone mode, simply run $SPARK_HOME/spark-shell to enter the shell. Inside spark-shell a SparkContext has already been created, with the instance name sc, ready for use. One thing to note: in Standalone mode Spark's default scheduler is a FIFO scheduler rather than a fair scheduler, and spark-shell runs as a Spark application for as long as it is open, so other Spark programs have to queue behind it; in other words, only one spark-shell can effectively run at a time. A sketch of attaching the shell to the standalone master follows below.
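For example (a sketch; the MASTER environment-variable convention is assumed for this Spark version, and the host/port come from the spark-env.sh configuration shown earlier):

MASTER=spark://spark1:30111 $SPARK_HOME/spark-shell   # attach the shell to the standalone master configured above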
Writing a program in spark-shell is just like writing one in the Scala shell.
scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

scala> textFile.count() // Number of items in this RDD
res0: Long = 21374

scala> textFile.first() // First item in this RDD
res1: String = # Spark
Writing a driver program
In Spark, a Spark program is called a driver program. Writing a driver program is simple and almost identical to writing code in spark-shell; the only difference is that you create the SparkContext yourself. For example, a WordCount program:
import spark.SparkContext
import SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length == 0) {
      println("usage is org.test.WordCount <master> <input> <output>")
    }
    println("the args: ")
    args.foreach(println)

    val hdfsPath = "hdfs://hadoop1:8020"

    // create the SparkContext; args(0) is the master (e.g. the application master address passed in by YARN)
    val sc = new SparkContext(args(0), "WordCount",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

    val textFile = sc.textFile(hdfsPath + args(1))
    val result = textFile.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1)).reduceByKey(_ + _)
    result.saveAsTextFile(hdfsPath + args(2))
  }
}