A Complete Guide to the Spark Knowledge System
 
Author: Yang Siyi  Source: 51CTO  Published: 2017-4-6
 

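Introduction to Spark

Spark is the core component of the whole BDAS (Berkeley Data Analytics Stack). It is a distributed programming framework for big data that not only implements MapReduce's map and reduce operators and its computation model, but also provides a richer set of operators such as filter, join, and groupByKey. It is a platform for fast, general-purpose cluster computing. Spark abstracts distributed data as resilient distributed datasets (RDDs), implements application task scheduling, RPC, serialization, and compression, and exposes APIs to the higher-level components that run on top of it. Its core is written in Scala, a functional language, and its API borrows heavily from Scala's functional programming style, offering a programming interface similar to Scala's.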

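Spark on YARN

This section walks through the whole life of a job, from the moment the user submits it until it finishes running.

I. Client-side operations

1. Initialize yarnClient from yarnConf and start it.

2. Create the client-side Application and obtain its Application ID, then check whether the cluster has enough resources for what the executors and the ApplicationMaster request; if not, an IllegalArgumentException is thrown.

3. Set up resources and environment variables: this includes setting the Application's staging directory, preparing local resources (jar files, log4j.properties), setting the Application's environment variables, and creating the Context used to launch the Container.

4. Set up the Application submission Context: the application name, the queue, the Container requested for the AM, and the job type, which is marked as Spark.

5. Request memory, and finally submit the Application to the ResourceManager through yarnClient.submitApplication.

Once the job has been submitted to YARN, the client has nothing more to do. Even killing that process in the terminal does no harm, because the whole job runs on the YARN cluster and its results are saved to HDFS or to the logs.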
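
The resource check in step 2 can be pictured with a few lines of plain Scala. This is only a sketch of the idea, not Spark's client code: the object name, the `verifyClusterResources` function, and its parameters are assumptions made for illustration, and a real client would obtain the per-container limit from YARN rather than take it as an argument.

```scala
// Minimal sketch (assumption: not Spark's actual client code) of the
// resource check a client makes before submitting to YARN.
object ResourceCheck {
  // All sizes are in MB. maxContainerMb stands for the largest container
  // the cluster is willing to grant (hypothetical value supplied by the caller).
  def verifyClusterResources(executorMb: Int, amMb: Int, maxContainerMb: Int): Unit = {
    if (executorMb > maxContainerMb)
      throw new IllegalArgumentException(
        s"Requested executor memory ($executorMb MB) exceeds the cluster maximum ($maxContainerMb MB)")
    if (amMb > maxContainerMb)
      throw new IllegalArgumentException(
        s"Requested ApplicationMaster memory ($amMb MB) exceeds the cluster maximum ($maxContainerMb MB)")
  }
}
```

Failing fast on the client side means an impossible request is rejected before anything is uploaded or queued.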

II. After submission: operations on the YARN cluster

1. Run the run method of ApplicationMaster.

2. Set the relevant environment variables.

3. Create amClient and start it.

4. Set the AmIpFilter for the Spark UI before the Spark UI starts.

5. The startUserClass function starts a dedicated thread (named Driver) that launches the Application submitted by the user; in other words, it starts the Driver. The SparkContext is initialized inside the Driver.

6. Wait for the SparkContext to finish initializing, for at most spark.yarn.applicationMaster.waitTries tries (default 10). If the number of tries exceeds the configured value, the program exits; otherwise yarnAllocator is initialized with the SparkContext.

7. Once the SparkContext and the Driver are initialized, register the ApplicationMaster with the ResourceManager through amClient.

8. Allocate and start the Executors. Before starting them, numExecutors Containers are obtained through yarnAllocator, and the Executors are then started inside those Containers.

If too many Executors fail to start, the Application fails: its Application Status is set to FAILED and the SparkContext is shut down. Executors are in fact started through ExecutorRunnable, which internally launches CoarseGrainedExecutorBackend.

9. Finally, Tasks run inside CoarseGrainedExecutorBackend, and their status is reported to CoarseGrainedScheduler through Akka until the job completes.
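
The bounded wait in step 6 follows a common pattern: poll until the value is ready or the tries run out. The sketch below shows that pattern in plain Scala. It is not the ApplicationMaster's real code; `BoundedWait`, `waitFor`, and the `poll` callback are names assumed for illustration, with `maxTries` playing the role of spark.yarn.applicationMaster.waitTries.

```scala
// Minimal sketch (assumption: not the ApplicationMaster's real code) of a
// bounded wait: poll until a value is available or the tries run out.
object BoundedWait {
  // poll() returns Some(value) once initialization has finished.
  // After the first check, at most maxTries further checks are made.
  def waitFor[A](poll: () => Option[A], maxTries: Int, sleepMs: Long): Option[A] = {
    var tries = 0
    var result: Option[A] = poll()
    while (result.isEmpty && tries < maxTries) {
      Thread.sleep(sleepMs)
      tries += 1
      result = poll()
    }
    result // None tells the caller to give up and exit
  }
}
```

Returning an Option leaves the decision to exit with the caller, which matches the two outcomes described in step 6.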

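Spark node concepts

I. The Spark driver is the process that runs the main() method of your program. It executes the user code that creates the SparkContext (initialization), creates RDDs, and runs transformations and actions on those RDDs.

Responsibilities of the driver node:

1. Converting the user program into tasks (driver)

The Spark driver program is responsible for converting the user program into multiple physical execution units, which are called tasks (see the notes for details).

2. Scheduling tasks on executor nodes (executor)

Once the physical plan exists, the Spark driver coordinates the scheduling of tasks across the executor processes. Based on the executors currently available, the driver assigns every task to a suitable executor process according to where the data lives. While tasks run, the executor processes store the cached data, and the driver tracks the location of that cached data and uses it to schedule later tasks so that as little data as possible travels over the network. (This is the idea of moving the computation rather than the data.)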
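
A toy model can make the locality-based assignment described above concrete. The sketch below is plain Scala and not Spark's scheduler: the `Task` and `Executor` case classes, the `assign` function, and the round-robin fallback are all assumptions chosen to keep the example small.

```scala
// Minimal sketch (assumption: a toy model, not Spark's TaskScheduler) of
// locality-aware assignment: prefer an executor on a host that holds the data.
object LocalityScheduling {
  final case class Task(id: Int, preferredHosts: Set[String])
  final case class Executor(id: String, host: String)

  // Returns taskId -> executorId. Falls back to round-robin over the
  // executors when no executor runs on a preferred host.
  def assign(tasks: Seq[Task], executors: Seq[Executor]): Map[Int, String] = {
    require(executors.nonEmpty, "at least one executor is needed")
    tasks.zipWithIndex.map { case (task, i) =>
      val local = executors.find(e => task.preferredHosts.contains(e.host))
      val chosen = local.getOrElse(executors(i % executors.size))
      task.id -> chosen.id
    }.toMap
  }
}
```

A task whose data sits on hostB is sent to the executor on hostB; only tasks with no local executor pay the cost of a network read.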

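II. Executor nodes

Role:

1. They run the tasks that make up a Spark application and return the results to the driver process.

2. Through their own block manager (blockManager), they provide in-memory storage for the RDDs that the user program asks to cache. RDDs are cached directly inside the executor processes, so tasks can take full advantage of the cached data at run time to speed up computation.

Responsibilities of the driver:

All Spark programs follow the same structure: the program creates a series of RDDs from input data, derives new RDDs with transformations, and finally uses actions to collect or store the resulting RDDs. In doing so, a Spark program implicitly builds a logical directed acyclic graph (DAG) of operations. When the driver program runs, it turns this logical graph into a physical execution plan.

In this way Spark turns the logical plan into a series of steps (stages), each of which consists of multiple tasks. These tasks are bundled up and sent to the cluster.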

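Spark initialization

1. Every Spark application has a driver program that launches the various parallel operations on the cluster. The driver program contains the application's main function, defines the distributed datasets on the cluster, and applies operations to them.

2. The driver program accesses Spark through a SparkContext object, which represents a connection to the computing cluster. (For example, when the Spark shell starts, a SparkContext object is created automatically as a variable named sc. Figure: inspecting the variable sc.)

3. Once a SparkContext exists, it can be used to create RDDs. For example, calling sc.textFile() creates an RDD representing the lines of a text file. (For example: val linesRDD = sc.textFile("yangsy.text"), val spark = linesRDD.filter(line => line.contains("spark")), spark.count())

To carry out these operations, the driver program typically manages multiple executors, which are the executor nodes mentioned above.

4. When the SparkContext is initialized, a SparkConf object is loaded to read the cluster configuration, and the SparkContext object is created from it.

The source code shows that starting thriftserver calls the spark-daemon.sh script (its source is shown in the figure on the left), which loads the files in the conf directory under SPARK_HOME.

(When running application code, first create the conf object and load the relevant parameters: val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory", "1g"), val sc: SparkContext = new SparkContext(sparkConf))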

How RDDs work:

RDD (Resilient Distributed Datasets) [1] is an abstraction of distributed memory. An RDD offers a highly restricted shared-memory model: it is a read-only, partitioned collection of records that can only be created by applying deterministic transformations (such as map, join, and group by) to other RDDs. These restrictions are what make fault tolerance cheap to implement. For developers, an RDD can be regarded as a Spark object that lives in memory: reading a file yields an RDD, a computation over that file yields an RDD, and the result set is an RDD too. Different partitions, dependencies between data, and key-value map data can all be viewed as RDDs.

The work falls into three parts: creating the RDD objects, the DAG scheduler building the execution plan, and the Task scheduler assigning tasks and dispatching them to Workers to run.

SparkContext (RDD operations) → submits a job → DAGScheduler (walks the RDDs, splits them into stages, builds the job) → submits task sets → TaskScheduler (task scheduling) → fetches tasks according to available resources → TaskSetManager (task scheduling)

A Transformation returns another RDD. It follows a chained-call design: a computation on one RDD turns it into another RDD, which can in turn be transformed again. This process is distributed.

An Action does not return an RDD. It returns an ordinary Scala collection, a single value, or nothing; the result either goes back to the Driver program or the RDD is written to the file system.

Transformations (for example map, filter, groupBy, join) are lazy: an operation that derives one RDD from another is not executed immediately. When Spark meets a Transformation it only records that the operation is needed, and the actual computation starts only when an Action is called.

Actions (for example count, collect, save) return a result or write the RDD's data to a storage system. Actions are what trigger Spark to start computing.

The essential difference between the two is the return value, as described above: a Transformation returns another RDD, while an Action does not. Both kinds of operation are covered in more detail in the Spark programming guide; they are the core of developing on Spark.
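
Lazy transformations and eager actions can be imitated on a single machine in a few lines of plain Scala. The sketch below is a toy, not Spark's RDD: `MiniRDD` and `fromSeq` are names assumed for illustration. Each transformation only wraps the previous computation in a new function, and nothing runs until an action asks for the result.

```scala
// Minimal sketch (assumption: a single-machine toy, not Spark's RDD) showing
// that transformations only record work and an action triggers it.
object LazyDemo {
  final class MiniRDD[A](compute: () => Iterator[A]) {
    // Transformations: build a new MiniRDD; nothing is evaluated yet.
    def map[B](f: A => B): MiniRDD[B] = new MiniRDD(() => compute().map(f))
    def filter(p: A => Boolean): MiniRDD[A] = new MiniRDD(() => compute().filter(p))
    // Actions: run the recorded chain and return a plain Scala value.
    def collect(): List[A] = compute().toList
    def count(): Long = compute().size.toLong
  }

  def fromSeq[A](data: Seq[A]): MiniRDD[A] = new MiniRDD(() => data.iterator)
}
```

Chaining `LazyDemo.fromSeq(Seq(1, 2, 3, 4)).map(_ * 2).filter(_ > 4)` evaluates nothing; calling `collect()` on the result runs the whole chain and yields `List(6, 8)`.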

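RDD basics

1. An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which are computed on different nodes of the cluster. There are two ways to create an RDD: loading an external dataset, or distributing a collection of objects from the driver program. The earlier example, which read a text file as an RDD of strings, is an instance of the former.

2. Once created, RDDs support two types of operations: transformations and actions.

A transformation produces a new RDD from an existing one (for example, the predicate-based filter shown earlier).

An action computes a result from an RDD and either returns it to the driver program or saves it to an external storage system (such as HDFS). first(), for example, is an action that returns the first element of the RDD.

Note: transformations and actions differ in how Spark computes RDDs. Although you can define a new RDD at any time, Spark computes RDDs lazily: they are only actually computed the first time they are used in an action. The reason for this design is that if the earlier call to sc.textFile(...) read and stored every line of the file right away, it would consume a lot of storage, even though we are about to filter out much of that data.

One more point to note: by default Spark recomputes an RDD every time you run an action on it. To reuse the same RDD across several actions, call RDD.persist() or RDD.cache() so that Spark keeps it (in memory or on disk).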

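3. Spark uses a lineage graph to record the dependencies between these RDDs. It needs this information to compute each RDD on demand, and it can also rely on the lineage graph to recover the lost data when a persisted RDD loses part of its data. (Figure: errorsRDD and warningsRDD are obtained by filtering, and union() is called on them at the end.)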
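
The difference between recomputing on every action and reusing a persisted result can be shown with a small plain-Scala model. This is a sketch under stated assumptions, not Spark's caching: the `Dataset` class here is a hypothetical stand-in for an RDD, and it stores its data in a local field rather than in executor memory or on disk.

```scala
// Minimal sketch (assumption: a toy model, not Spark's caching) contrasting
// recomputation on every action with a persisted result.
object PersistDemo {
  final class Dataset[A](compute: () => Vector[A]) {
    private var cached: Option[Vector[A]] = None
    private var persistRequested = false

    // Like RDD.persist(): only marks the dataset; data is kept at the first action.
    def persist(): this.type = { persistRequested = true; this }

    private def materialize(): Vector[A] = cached.getOrElse {
      val data = compute()
      if (persistRequested) cached = Some(data)
      data
    }

    // Two actions; each one materializes the dataset.
    def count(): Int = materialize().size
    def first(): A = materialize().head
  }
}
```

Without `persist()`, calling `count()` and then `first()` runs the computation twice; with it, the second action reuses the stored data.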

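How RDDs are computed

Narrow and wide dependencies of RDDs

Dependencies are either narrow (narrow dependencies) or wide (wide dependencies). A narrow dependency means that each partition of the parent RDD is used by only one partition of the child RDD. A wide dependency, correspondingly, means that a partition of the parent RDD is depended on by multiple partitions of the child RDD. For example, map is a narrow dependency, while join generally leads to a wide dependency.

This distinction is useful in two ways. First, narrow dependencies allow pipelined execution on a single node; for example, because the relationship is one-to-one, a map can be executed right after a filter. Second, narrow dependencies allow more efficient failure recovery, because only the lost partitions of the parent RDD need to be recomputed. With a wide dependency, the failure of one node may lose partitions derived from every parent RDD partition, so a complete re-execution is needed. For wide dependencies, Spark therefore persists the intermediate data on the nodes that hold the parent partitions to simplify recovery, just as MapReduce persists the output of map.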
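
The two kinds of dependency can be contrasted in plain Scala by modeling partitions as in-memory Vectors. This is a sketch, not Spark's implementation: `mapPartitions` and `groupByKey` below are hypothetical helpers written for this illustration, and the shuffle is simulated by flattening all parent partitions.

```scala
// Minimal sketch (assumption: partitions modeled as in-memory Vectors, not
// Spark's implementation) contrasting a narrow and a wide dependency.
object Dependencies {
  type Partition[A] = Vector[A]

  // Narrow: child partition i is computed from parent partition i alone,
  // so the work can be pipelined partition by partition.
  def mapPartitions[A, B](parent: Vector[Partition[A]])(f: A => B): Vector[Partition[B]] =
    parent.map(_.map(f))

  // Wide: every child partition may need records from every parent
  // partition, so all records are redistributed by key (a shuffle).
  def groupByKey[K, V](parent: Vector[Partition[(K, V)]], numPartitions: Int): Vector[Map[K, Vector[V]]] = {
    val all = parent.flatten
    Vector.tabulate(numPartitions) { p =>
      all.filter { case (k, _) => Math.floorMod(k.hashCode, numPartitions) == p }
        .groupBy(_._1)
        .map { case (k, kvs) => k -> kvs.map(_._2) }
    }
  }
}
```

In the narrow case, losing one child partition means recomputing one parent partition; in the wide case, rebuilding one child partition needs input from all parent partitions.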

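Spark Example

Step 1: create the RDDs. In the example above, the final collect is an action and does not create an RDD, but each of the four preceding transformations creates a new RDD. So the first step is to create all the RDDs (with their five pieces of internal information).

Step 2: create the execution plan. Spark pipelines as much as it can and divides the plan into stages according to whether the data has to be reorganized. In this example the groupBy() transformation splits the whole plan into two stages. The end result is a DAG (directed acyclic graph) that serves as the logical execution plan.

Step 3: schedule the tasks. Each stage is divided into tasks, and each task combines data with computation. All tasks of the current stage must finish before the next stage begins: the first transformation of the next stage always reorganizes data, so it has to wait until every result of the current stage has been computed.

Suppose hdfs://names in this example holds four file blocks. The partitions of the HadoopRDD will then contain four partitions corresponding to those four blocks, and preferredLocations will give the best location for each of them. Four tasks can now be created and scheduled onto suitable nodes of the cluster.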
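
Step 2 can be pictured as cutting a chain of operations wherever data has to be reorganized. The sketch below is a toy planner in plain Scala, not Spark's DAGScheduler: the `Op` case class, its `needsShuffle` flag, and the operation names used with it are assumptions for illustration, and only a linear chain is handled.

```scala
// Minimal sketch (assumption: a toy planner, not Spark's DAGScheduler) that
// cuts a linear chain of operations into stages at shuffle boundaries.
object StagePlanner {
  final case class Op(name: String, needsShuffle: Boolean)

  // An operation that reorganizes data starts a new stage; every other
  // operation is pipelined into the current stage.
  def splitIntoStages(ops: Seq[Op]): Vector[Vector[String]] =
    ops.foldLeft(Vector.empty[Vector[String]]) { (stages, op) =>
      if (stages.isEmpty || op.needsShuffle) stages :+ Vector(op.name)
      else stages.init :+ (stages.last :+ op.name)
    }
}
```

For a hypothetical chain of a file read, a map, a groupBy marked as needing a shuffle, and a final map, the planner returns two stages, with the cut placed just before the groupBy.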

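Data partitioning in Spark

1. A notable feature of Spark is control over how a dataset is partitioned across nodes. In a distributed system communication is very expensive, so laying data out to minimize network traffic can greatly improve overall performance. A Spark program can reduce communication cost by controlling how its RDDs are partitioned.

2. Every key-value pair RDD in Spark can be partitioned, which ensures that keys of the same group end up on the same node. For example, if an RDD is hash-partitioned into 100 partitions, records whose key hashes give the same result modulo 100 are placed on the same node.

(Use partitionBy(new HashPartitioner(100)).persist() to build the 100 partitions.)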

3. Many Spark operations involve shuffling data by key across nodes (for example join(), leftOuterJoin(), groupByKey(), and reduceByKey()). For operations that act on a single RDD, such as reduceByKey(), running on an RDD that is already partitioned means that all the values for each key are computed locally on a single machine.
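
The modulo rule from point 2 can be written out directly. The sketch below is plain Scala that mirrors the idea of hash partitioning rather than calling Spark's HashPartitioner; `partitionFor` and `partitionBy` are helper names assumed for this illustration, and null keys are not handled.

```scala
// Minimal sketch (assumption: plain Scala mirroring the idea of hash
// partitioning, not a call into Spark's HashPartitioner).
object HashPartitioning {
  // Partition index = key hash modulo the partition count, kept non-negative.
  def partitionFor(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Group key-value records by the partition they would land in.
  def partitionBy[K, V](records: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => partitionFor(k, numPartitions) }
}
```

With 100 partitions, the integer keys 1 and 101 both map to partition 1, so their records sit together and a later per-key operation needs no further data movement for them.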

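The Spark SQL shuffle process

The core idea of Spark SQL is to take an existing RDD, attach Schema information to it, register it as something like a "Table" in SQL, and then run SQL queries against it. There are two main parts: generating the SchemaRDD and executing the query.

In a spark-hive project, reading the metadata to use as the Schema and reading the data on HDFS are both delegated to Hive. A SchemaRDD is then generated from these two parts, and queries are run with hql() under a HiveContext.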

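Structured data in Spark SQL

1. First, a word about Apache Hive. Hive can store tables in many formats, on HDFS or on other storage systems, and Spark SQL can read any table that Hive supports. To connect Spark SQL to an existing Hive installation, you need to provide Hive's configuration: copy hive-site.xml into Spark's conf directory. Then create a HiveContext object (the entry point to Spark SQL). You can then query tables with HQL and get the returned data back as an RDD of rows.

2. Creating a HiveContext and querying data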

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)

val rows = hiveCtx.sql("SELECT name, age FROM users")

val firstRow = rows.first()

println(firstRow.getString(0)) // field 0 is the name field

3. Updating and loading an external data source through JDBC

import java.sql.DriverManager

Class.forName("com.mysql.jdbc.Driver")

val conn = DriverManager.getConnection(mySQLUrl)

val stat1 = conn.createStatement()

// Update rows in the external database
stat1.execute("UPDATE CI_LABEL_INFO set DATA_STATUS_ID = 2 , DATA_DATE ='" + dataDate + "' where LABEL_ID in (" + allCreatedLabels.mkString(",") + ")")

stat1.close()

conn.close()

// Load data from the external data source into memory
val DIM_COC_INDEX_MODEL_TABLE_CONF = sqlContext.jdbc(mySQLUrl, "DIM_COC_INDEX_MODEL_TABLE_CONF").cache()

val targets = DIM_COC_INDEX_MODEL_TABLE_CONF.filter("TABLE_DATA_CYCLE = " + TABLE_DATA_CYCLE).collect

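Spark SQL parsing

First, consider how a traditional database parses a query. Parsing proceeds in the order Result, Data Source, Operation. The database first parses the SQL statement it has read, working out which words are keywords (such as select, from, where), which are expressions, which are Projections, which are Data Sources, and so on. It then checks whether the statement is well formed: if not, it reports an error; if so, it moves on to binding (Bind). Binding ties the SQL statement to the database's data dictionary (columns, tables, views, and so on). If the relevant Projections, Data Sources, and so on all exist, the statement is executable. During execution the database sometimes does not even need to read the physical tables to return a result; for example, when a statement that has just been run is run again, the result is taken straight from the database's buffer pool. While parsing, the database converts the SQL statement into a tree structure, forming a Tree of one or more nodes (TreeNode), and later processing applies a series of operations to that Tree.

Spark SQL handles SQL statements in much the same way as a relational database parses them. It first parses the statement into a Tree; later phases such as binding and optimization are all operations on that Tree. These operations are expressed as Rules, which use pattern matching to apply different handling to different types of node. Spark SQL has two branches, sqlContext and hiveContext. sqlContext currently supports only the SQL parser (Catalyst), while hiveContext supports both the SQL parser and the HiveQL parser.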
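
The idea of a Rule that rewrites a Tree through pattern matching can be shown on a toy expression tree. The sketch below is plain Scala and does not use Catalyst's classes: `Expr`, `Literal`, `Attribute`, `Add`, `transformUp`, and `constantFolding` are all names defined here for illustration.

```scala
// Minimal sketch (assumption: a toy expression tree, not Catalyst's classes)
// of a rule applied to a tree through pattern matching.
object TreeRules {
  sealed trait Expr
  final case class Literal(value: Int) extends Expr
  final case class Attribute(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Apply a rule bottom-up: rewrite the children first, then the node itself.
  def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = e match {
      case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
      case leaf      => leaf
    }
    rule.applyOrElse(withNewChildren, (x: Expr) => x)
  }

  // Rule: fold an addition of two literals into a single literal.
  val constantFolding: PartialFunction[Expr, Expr] = {
    case Add(Literal(a), Literal(b)) => Literal(a + b)
  }
}
```

Applied to `Add(Attribute("age"), Add(Literal(1), Literal(2)))`, the rule leaves the attribute alone and rewrites the inner addition, giving `Add(Attribute("age"), Literal(3))`. Writing each rule as a partial function keeps it small, since nodes it does not match pass through unchanged.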

 

   