Spark: An Efficient Distributed Computing System
 
×÷ÕߣºÏÄ¿¡ð½£¬ÉÛÈüÈü À´Ô´£º UC¼¼Êõ²©¿Í ·¢²¼ÓÚ£º2015-7-2
 

Overview

ʲôÊÇSpark

Spark is a general-purpose parallel computing framework in the spirit of Hadoop MapReduce, open-sourced by UC Berkeley's AMP Lab. Like MapReduce, Spark performs distributed computation built on the map-reduce model and keeps MapReduce's advantages; unlike MapReduce, a job's intermediate output and results can be kept in memory, so HDFS no longer has to be read and written between stages. Spark is therefore better suited to iterative map-reduce style algorithms such as those used in data mining and machine learning. Its architecture is shown in the figure below:

Comparing Spark with Hadoop

SparkµÄÖмäÊý¾Ý·Åµ½ÄÚ´æÖУ¬¶ÔÓÚµü´úÔËËãЧÂʸü¸ß¡£

Spark¸üÊʺÏÓÚµü´úÔËËã±È½Ï¶àµÄMLºÍDMÔËËã¡£ÒòΪÔÚSparkÀïÃæ£¬ÓÐRDDµÄ³éÏó¸ÅÄî¡£

Spark is more general than Hadoop.

SparkÌṩµÄÊý¾Ý¼¯²Ù×÷ÀàÐÍÓкܶàÖÖ£¬²»ÏñHadoopÖ»ÌṩÁËMapºÍReduceÁ½ÖÖ²Ù×÷¡£±ÈÈçmap, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionByµÈ¶àÖÖ²Ù×÷ÀàÐÍ£¬Spark°ÑÕâЩ²Ù×÷³ÆÎªTransformations¡£Í¬Ê±»¹ÌṩCount, collect, reduce, lookup, saveµÈ¶àÖÖactions²Ù×÷¡£

This variety of dataset operations is convenient for users building applications on top of Spark. Communication between processing nodes is no longer limited to Hadoop's single Data Shuffle pattern: users can name, materialize, and control the storage and partitioning of intermediate results. In short, the programming model is more flexible than Hadoop's.

However, because of how RDDs work, Spark is not suited to applications that make asynchronous, fine-grained updates to state, such as the storage backend of a web service or an incremental web crawler and indexer. In other words, application models based on incremental modification are a poor fit.

Fault tolerance.

When computing over a distributed dataset, fault tolerance is achieved through checkpointing, and checkpointing can be done in two ways: checkpointing the data, or logging the updates. Users can choose which approach to use.

Usability.

Spark improves usability by providing rich Scala, Java, and Python APIs as well as an interactive shell.

Combining Spark with Hadoop

Spark can read and write HDFS directly and also supports Spark on YARN. Spark can run in the same cluster as MapReduce, sharing storage and compute resources; the Shark data warehouse borrows from Hive in its implementation and is almost fully compatible with Hive.

Where Spark fits

Spark is an in-memory iterative computation framework, suited to applications that operate on the same dataset many times. The more often the data is reused and the more data must be read, the greater the benefit; for small datasets with compute-intensive workloads the benefit is relatively small.
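A minimal sketch of that pattern (the path and loop body are invented): the dataset is cached once, then each pass scans it from memory instead of re-reading HDFS.

val values = sc.textFile("hdfs://.../numbers").map(_.toDouble).cache()

var threshold = 0.0
for (i <- 1 to 10) {
  val t = threshold                          // snapshot captured by the closure below
  val above = values.filter(_ > t).count()   // each pass scans the in-memory copy, not HDFS
  threshold += above.toDouble / 1e6
}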

As noted above, because of how RDDs work, Spark is not suited to applications that make asynchronous, fine-grained updates to state, such as the storage backend of a web service or an incremental web crawler and indexer; incremental-modification application models are a poor fit.

Overall, Spark is applicable to a fairly wide range of workloads and is quite general.

Run modes

Local mode

Standalone mode

Mesos mode

YARN mode

The Spark ecosystem

Shark (Hive on Spark): Shark essentially provides the same HiveQL command interface as Hive on top of the Spark framework. To stay as compatible with Hive as possible, Shark uses Hive's APIs for query parsing and logical plan generation, and replaces Hadoop MapReduce with Spark in the final physical plan execution stage. Through configuration, Shark can automatically cache specific RDDs in memory for reuse, speeding up retrieval over particular datasets. Shark also supports user-defined functions (UDFs) for implementing specific data analysis and learning algorithms, so SQL queries and analytical computation can be combined, maximizing RDD reuse.

Spark Streaming: a framework built on Spark for processing stream data. The basic idea is to split the stream into small time slices (a few seconds) and process each slice in a batch-like fashion. Spark Streaming is built on Spark for two reasons: Spark's low-latency execution engine (latencies on the order of 100 ms) can be used for near-real-time computation, and compared with record-at-a-time frameworks such as Storm, the RDD abstraction makes efficient fault tolerance easier. In addition, the micro-batch approach lets the same logic and algorithms serve both batch and real-time processing, which is convenient for applications that need to analyze historical and live data together.
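A rough sketch of the micro-batch idea, assuming a broadly current Spark Streaming API (class and package names differed somewhat in the 0.7/0.8-era releases this article describes), counting words arriving on a socket in 2-second batches:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc    = new StreamingContext(sc, Seconds(2))                        // 2-second micro-batches on an existing SparkContext
val lines  = ssc.socketTextStream("localhost", 9999)                     // a DStream: a sequence of small RDDs
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)  // the usual RDD operations, applied per batch
counts.print()
ssc.start()
ssc.awaitTermination()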

Bagel: Pregel on Spark. Bagel lets you do graph computation with Spark and is a very useful small project. It ships with an example that implements Google's PageRank algorithm.

Adoption in industry

The Spark project started in 2009 and was open-sourced in 2010. Current users include Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research and others, as well as Taobao; Douban uses DPark, a Python clone of Spark.

SparkºËÐĸÅÄî

Resilient Distributed Dataset (RDD)

The RDD is Spark's most basic abstraction: an abstraction over distributed memory that lets you manipulate a distributed dataset as if it were a local collection. RDDs are the heart of Spark. An RDD is a partitioned, immutable collection that can be operated on in parallel; different dataset formats correspond to different RDD implementations. An RDD must be serializable. RDDs can be cached in memory, and the result of each operation on an RDD can be kept in memory so that the next operation reads its input directly from memory, avoiding the heavy disk I/O of MapReduce. For machine learning algorithms that iterate frequently and for interactive data mining, this is a substantial efficiency gain.
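The classic log-mining sketch (file path and field layout are invented) makes the point: the first action materializes the filtered RDD in memory, and the second action reuses that in-memory copy instead of scanning HDFS again.

val logs   = sc.textFile("hdfs://.../app.log")
val errors = logs.filter(_.contains("ERROR")).cache()

val byHost = errors.map(line => (line.split("\\s+")(0), 1))
                   .reduceByKey(_ + _)
                   .collect()                 // first action: reads HDFS once and fills the cache
val total  = errors.count()                   // second action: served from memory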

Characteristics of an RDD:

It is an immutable, partitioned collection of objects stored on cluster nodes.

It is created through parallel transformations (map, filter, join, etc.).

It is rebuilt automatically on failure.

Its storage level (memory, disk, etc.) can be controlled for reuse.

It must be serializable.

It is statically typed.

Benefits of RDDs

RDDÖ»Äܴӳ־ô洢»òͨ¹ýTransformations²Ù×÷²úÉú£¬Ïà±ÈÓÚ·Ö²¼Ê½¹²ÏíÄڴ棨DSM£©¿ÉÒÔ¸ü¸ßЧʵÏÖÈÝ´í£¬¶ÔÓÚ¶ªÊ§²¿·ÖÊý¾Ý·ÖÇøÖ»Ðè¸ù¾ÝËüµÄlineage¾Í¿ÉÖØÐ¼ÆËã³öÀ´£¬¶ø²»ÐèÒª×öÌØ¶¨µÄCheckpoint¡£

Because RDDs are immutable, Spark can use speculative execution in the style of Hadoop MapReduce.

Because RDDs are partitioned, data locality can be exploited to improve performance, just as in Hadoop MapReduce.

RDDs are serializable, so when memory is insufficient they can automatically fall back to disk storage. Performance drops significantly when an RDD lives on disk, but it is still no worse than today's MapReduce.

RDD storage and partitioning

Users can choose among different storage levels when persisting an RDD for reuse.

By default an RDD is stored in memory, but when memory is insufficient it spills to disk.

When an RDD needs to be partitioned and distributed across the cluster, it is partitioned by each record's key (for example, hash partitioning), which ensures that two datasets can be joined efficiently.
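As an illustration (paths, delimiters, and the partition count are made up), pre-partitioning both pair RDDs with the same partitioner co-locates records with the same key, so the subsequent join does not need to shuffle both sides in full:

import org.apache.spark.HashPartitioner        // spark.HashPartitioner in the 0.x releases this article covers

val users  = sc.textFile("hdfs://.../users").map(l => (l.split(",")(0), l))
val orders = sc.textFile("hdfs://.../orders").map(l => (l.split(",")(0), l))

val part        = new HashPartitioner(16)
val usersByKey  = users.partitionBy(part).cache()
val ordersByKey = orders.partitionBy(part)
val joined      = usersByKey.join(ordersByKey)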

Internal representation of an RDD

Internally, every RDD can be described by five properties (a conceptual sketch follows the list):

A list of partitions (data blocks).

A function for computing each split (deriving this RDD from its parent).

A list of dependencies on parent RDDs.

A Partitioner for key-value RDDs [optional].

A list of preferred locations for each split (e.g., the locations of HDFS blocks) [optional].
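The following is only a conceptual sketch of those five properties in Scala; it loosely mirrors the shape of Spark's RDD class, but the exact method names and signatures vary between versions.

import org.apache.spark.{Dependency, Partition, Partitioner}

abstract class SketchRDD[T] {
  def getPartitions: Array[Partition]                              // 1. list of partitions (data blocks)
  def compute(split: Partition): Iterator[T]                       // 2. function computing each split from its parents
  def getDependencies: Seq[Dependency[_]]                          // 3. dependencies on parent RDDs
  val partitioner: Option[Partitioner] = None                      // 4. Partitioner for key-value RDDs (optional)
  def preferredLocations(split: Partition): Seq[String] = Nil      // 5. preferred locations, e.g. HDFS block hosts (optional)
}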

RDD storage levels

Based on combinations of the four parameters useDisk, useMemory, deserialized, and replication, an RDD offers 11 storage levels (a usage example follows the listing):

val NONE = new StorageLevel(false, false, false) 
val DISK_ONLY = new StorageLevel(true, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
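Any of these levels can be requested explicitly through persist. A small usage sketch (the path is illustrative, and the StorageLevel class moved to the org.apache.spark.storage package in later releases):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("hdfs://.../big-input")
lines.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spilling to disk when memory is short
// rdd.cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)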

RDDs define a variety of operations; different kinds of data are represented by different RDD subclasses, and the different operations are implemented by those RDDs as well.

Creating RDDs

There are two ways to create an RDD:

1. From input in a Hadoop file system (or another Hadoop-compatible storage system), such as HDFS.

2. By transforming a parent RDD into a new RDD.

Let's look at creating an RDD from a Hadoop file system, e.g. val file = spark.textFile("hdfs://..."). The file variable is an RDD (actually a HadoopRDD instance), and the core code that produces it is as follows:

// SparkContext creates the RDD from a file/directory and an optional number of splits. Note how similar this is to Hadoop MapReduce:
// it needs an InputFormat and the Key and Value types; in fact Spark uses Hadoop's InputFormat and Writable types.
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
    classOf[Text], minSplits).map(pair => pair._2.toString)
}

// Create a HadoopRDD from the Hadoop configuration, the InputFormat, and so on
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)

When the RDD is computed and reads its data from HDFS, the process is almost identical to Hadoop MapReduce:

// Obtain a RecordReader from the InputFormat, based on the Hadoop configuration and the split, and use it to read the data.
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)

val key: K = reader.createKey()
val value: V = reader.createValue()

// Read the data with Hadoop MapReduce's RecordReader; each key/value pair is returned as a tuple.
override def getNext() = {
  try {
    finished = !reader.next(key, value)
  } catch {
    case eof: EOFException =>
      finished = true
  }
  (key, value)
}

RDD transformations and actions

An RDD supports two kinds of computation: transformations (whose return value is another RDD) and actions (whose return value is not an RDD).

Transformations (e.g. map, filter, groupBy, join) are lazy: an operation that turns one RDD into another is not executed immediately. When Spark encounters a transformation, it only records that the operation is needed and does not run it; the computation actually starts only when an action is invoked.

Actions (e.g. count, collect, save) return a result or write RDD data to a storage system. Actions are what trigger Spark to start computing.

The following example illustrates the use of transformations and actions in Spark.

val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"),
  Seq(System.getenv("SPARK_TEST_JAR")))

val rdd_A = sc.textFile("hdfs://.....")
val rdd_B = rdd_A.flatMap(line => line.split("\\s+")).map(word => (word, 1))

val rdd_C = sc.textFile("hdfs://.....")
val rdd_D = rdd_C.map(line => (line.substring(10), 1))
val rdd_E = rdd_D.reduceByKey((a, b) => a + b)

val rdd_F = rdd_B.join(rdd_E)

rdd_F.saveAsSequenceFile("hdfs://....")

Lineage

Using memory to speed up data loading is also found in many other in-memory databases and cache systems; what distinguishes Spark is how it handles fault tolerance (node failure / data loss) in a distributed computing environment. To keep the data in an RDD robust, an RDD remembers, through its so-called lineage, how it was derived from other RDDs. Compared with the fine-grained backups or logs of in-memory updates used by other systems, an RDD's lineage records coarse-grained transformation operations (filter, map, join, etc.). When some partitions of an RDD are lost, the lineage provides enough information to recompute and recover them. This coarse-grained data model limits the situations in which Spark can be applied, but compared with fine-grained models it also brings a performance gain.

With respect to lineage, RDD dependencies come in two kinds, narrow dependencies and wide dependencies, which matter for efficient fault recovery. A narrow dependency means each partition of the parent RDD is used by at most one partition of the child RDD: one parent partition maps to one child partition, or several parent partitions map to one child partition; a single parent partition never feeds multiple child partitions. A wide dependency means a child partition depends on many or all partitions of the parent, i.e. a single parent partition can feed multiple child partitions. For wide dependencies, the inputs and outputs of the computation live on different nodes; when the input nodes are intact but an output node fails, recovery by recomputation through lineage is effective. Otherwise it is not, because the recomputation cannot simply be retried and Spark must trace further up the ancestry to find something that can be recomputed (which is what "lineage" means). The recomputation cost for narrow dependencies is therefore far lower than for wide dependencies.
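As a small illustration (the input path is invented), flatMap and map create narrow dependencies, while reduceByKey creates a wide dependency that requires a shuffle; toDebugString prints the lineage that Spark would replay to rebuild lost partitions:

val words  = sc.textFile("hdfs://.../docs").flatMap(_.split("\\s+"))   // narrow: each child partition reads one parent partition
val pairs  = words.map(w => (w, 1))                                    // narrow
val counts = pairs.reduceByKey(_ + _)                                  // wide: child partitions read many parent partitions (shuffle)

println(counts.toDebugString)                                          // prints the lineage graph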

Fault tolerance

In RDD computation, fault tolerance is achieved through checkpointing. Checkpointing can be done in two ways: checkpointing the data, or logging the updates. Users control which approach is used; the default is logging the updates, which records the transformations that produced each RDD, i.e. each RDD's lineage, so that lost partition data can be recomputed.
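For long lineages it can be cheaper to checkpoint the data explicitly. A minimal sketch (the directory and transformations are illustrative):

sc.setCheckpointDir("hdfs://.../checkpoints")         // where checkpointed RDD data is written

val base    = sc.textFile("hdfs://.../events")
val derived = base.map(_.toUpperCase).filter(_.nonEmpty)
derived.checkpoint()                                   // cut the lineage: persist the data rather than replaying the updates
derived.count()                                        // checkpointing happens when the first action runs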

Resource management and job scheduling

For resource management and job scheduling, Spark can use Standalone mode, Apache Mesos, or Hadoop YARN. Spark on YARN was introduced in Spark 0.6 but only became truly usable in the current branch-0.8. Spark on YARN follows YARN's official specification; because Spark was designed from the start to support multiple schedulers and executors, adding YARN support was straightforward. The overall Spark on YARN architecture is sketched in the accompanying figure.

Running Spark on YARN and sharing cluster resources with Hadoop improves resource utilization.

Programming interface

Spark exposes RDD operations through language integration, similar to DryadLINQ and FlumeJava: each dataset is represented as an RDD object, and operations on the dataset are expressed as operations on the RDD object. Spark's primary language is Scala, chosen for its conciseness (Scala is convenient to use interactively) and performance (it is a statically, strongly typed language on the JVM).

Like Hadoop MapReduce, Spark consists of a Master (analogous to MapReduce's JobTracker) and Workers (Spark's slave worker nodes). A user-written Spark program is called a Driver program. The Driver connects to the master and defines the transformations and actions on the RDDs. Those transformations and actions are expressed as Scala closures (function literals); Scala represents closures as Java objects, and since they are serializable, the closures over RDDs can be shipped to the Worker nodes. Workers store the data blocks and share the cluster's memory; they are daemons running on the worker nodes. When a Worker receives an operation on an RDD, it performs the operation locally according to the data split information and produces new data splits, returns results, or writes the RDD to a storage system.
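A brief sketch of the closure model (the stop-word set and path are invented): a value defined in the Driver is captured by the function literal, serialized along with it, and shipped to every Worker that runs the tasks.

val stopWords = Set("the", "a", "an")                        // lives in the Driver program
val words     = sc.textFile("hdfs://.../docs").flatMap(_.split("\\s+"))
val filtered  = words.filter(w => !stopWords.contains(w))    // this closure (and stopWords) is serialized to the Workers
filtered.count()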

Scala

Spark is developed in Scala and uses Scala as its default programming language. Writing a Spark program is much simpler than writing a Hadoop MapReduce program, and Spark provides Spark-shell, in which programs can be tested. The general steps of a Spark program are: create or obtain a SparkContext instance, use the SparkContext to create RDDs, and then operate on those RDDs. For example:

val sc = new SparkContext(master, appName, [sparkHome], [jars]) 
val textFile = sc.textFile("hdfs://.....")
textFile.map(....).filter(.....).....

Java

Spark supports programming in Java, although Java lacks a convenient tool like Spark-shell. Otherwise it is the same as programming in Scala: both are JVM languages and Scala interoperates with Java, so the Java API is really a wrapper around the Scala one. For example:

JavaSparkContext sc = new JavaSparkContext(...);
JavaRDD<String> lines = sc.textFile("hdfs://...");
JavaRDD<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  }
);

Python

Spark now also provides a Python API. Spark uses py4j for Python/Java interoperation, which makes it possible to write Spark programs in Python. Spark likewise provides pyspark, a Python shell for Spark, so Spark programs can be written in Python interactively. For example:

from pyspark import SparkContext 
sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
words = sc.textFile("/usr/share/dict/words")
words.filter(lambda w: w.startswith("spar")).take(5)

Usage examples

Standalone mode

To make Spark easier to adopt, it provides a Standalone mode. Spark was originally designed to run on the Apache Mesos resource management framework, which is a good design but complicates deployment and testing. To make Spark easier to deploy and try out, it therefore offers a Standalone run mode consisting of one Spark Master and multiple Spark Workers, very similar to Hadoop MapReduce v1, down to an almost identical cluster start-up procedure.

Running a Spark cluster in Standalone mode

Download Scala 2.9.3 and set SCALA_HOME.

Download Spark (either build from source or download a prebuilt release). Here we download the prebuilt release (http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz).

Unpack the spark-0.7.3-prebuilt-cdh4.tgz archive.

ÐÞ¸ÄÅäÖã¨conf/*£© slaves: ÅäÖù¤×÷½ÚµãµÄÖ÷»úÃû spark-env.sh£ºÅäÖû·¾³±äÁ¿¡£

SCALA_HOME=/home/spark/scala-2.9.3 
JAVA_HOME=/home/spark/jdk1.6.0_45
SPARK_MASTER_IP=spark1
SPARK_MASTER_PORT=30111
SPARK_MASTER_WEBUI_PORT=30118
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=4g
SPARK_WORKER_PORT=30333
SPARK_WORKER_WEBUI_PORT=30119
SPARK_WORKER_INSTANCES=1

Copy the Hadoop configuration into the conf directory.

Set up passwordless ssh from the master host to the other machines.

°ÑÅäÖúõÄSpark³ÌÐòʹÓÃscp copyµ½ÆäËü»úÆ÷

Start the cluster on the master:

$SPARK_HOME/start-all.sh

YARN mode

Spark-shell does not yet support YARN mode. To run in YARN mode, the whole Spark program must be packaged into a single jar and submitted to YARN. Currently only branch-0.8 truly supports YARN.

Running Spark in YARN mode

Download the Spark code:

git clone git://github.com/mesos/spark

Switch to branch-0.8:

cd spark 
git checkout -b yarn --track origin/yarn

Compile Spark with sbt and build the assembly:

$SPARK_HOME/sbt/sbt 
> package
> assembly

Copy the Hadoop YARN configuration into the conf directory.

Run a test:

  SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \ 
./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \
--class spark.examples.SparkPi --args yarn-standalone

Using Spark-shell

Spark-shell is easy to use. Once Spark is running in Standalone mode, enter the shell with $SPARK_HOME/spark-shell. In Spark-shell the SparkContext is already created; the instance is named sc and can be used directly. One thing to note: in Standalone mode Spark uses a FIFO scheduler by default rather than fair scheduling, and Spark-shell is itself a Spark program that stays running on the cluster, so other Spark programs have to wait in the queue. In other words, only one Spark-shell can be running at a time.

Writing programs in Spark-shell is very simple, just like writing programs in the Scala shell.

scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") 
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

scala> textFile.count() // Number of items in this RDD
res0: Long = 21374

scala> textFile.first() // First item in this RDD
res1: String = # Spark

Writing a Driver program

In Spark, a Spark program is called a Driver program. Writing a Driver program is simple and almost identical to writing code in Spark-shell; the difference is that you must create the SparkContext yourself. A WordCount program looks like this:

import spark.SparkContext
import SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length == 0) {
      println("usage is org.test.WordCount ")
    }
    println("the args: ")
    args.foreach(println)

    val hdfsPath = "hdfs://hadoop1:8020"

    // create the SparkContext; args(0) is the appMaster address passed in by YARN
    val sc = new SparkContext(args(0), "WordCount",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

    val textFile = sc.textFile(hdfsPath + args(1))

    val result = textFile.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1)).reduceByKey(_ + _)

    result.saveAsTextFile(hdfsPath + args(2))
  }
}
   