An Introduction to Spark
Spark is the core component of the BDAS (Berkeley Data Analytics Stack). It is a distributed programming framework for big data: it not only implements MapReduce's map and reduce operators and its computation model, but also provides a much richer set of operators, such as filter, join, and groupByKey. It is a platform for fast, general-purpose cluster computing.
Spark abstracts distributed data as Resilient Distributed Datasets (RDDs); it implements application task scheduling, RPC, serialization, and compression, and exposes APIs to the higher-level components built on top of it. Its core is written in Scala, a functional language, and the APIs it provides borrow deeply from Scala's functional programming style, offering a programming interface similar to Scala's.
Spark on YARN

An analysis of the entire lifecycle of a job, from user submission to job completion.
I. Client-side operations
1. Initialize a yarnClient from the yarnConf, and start the yarnClient;
2. Create a client-side Application and obtain its Application ID, then check whether the cluster has enough resources to satisfy what the executors and the ApplicationMaster request; if not, throw an IllegalArgumentException;
3. Set up resources and environment variables: this includes setting the Application's staging directory, preparing local resources (jar files, log4j.properties), setting the Application's environment variables, and creating the Context used to launch Containers;
4. Set up the Application submission Context, including the application name, the queue, the Container requested for the AM, and marking the job type as Spark;
5. Request memory, and finally submit the Application to the ResourceManager via yarnClient.submitApplication.
Once the job has been submitted to YARN, the client is done; you can even kill the client process in the terminal, because the whole job runs on the YARN cluster, and its results are saved to HDFS or to the logs.
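For reference, the client-side flow above is what happens under the hood when a user submits an application to YARN with spark-submit. A typical invocation might look like the following sketch (the jar path, main class, and resource numbers are illustrative, not from this article):

```shell
# Submit a Spark application to a YARN cluster.
# In cluster deploy mode the driver runs inside the ApplicationMaster
# on the cluster, so the local process can exit after submission.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --num-executors 4 \
  --executor-memory 2g \
  --executor-cores 2 \
  myapp.jar
```

Because the job runs entirely on the cluster, closing the terminal after this command does not affect the job, exactly as described above.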
II. After submission: operations on the YARN cluster
1. Run the ApplicationMaster's run method;
2. Set up the relevant environment variables;
3. Create the amClient and start it;
4. Set up the Spark UI's AmIpFilter before the Spark UI starts;
5. In the startUserClass function, start a dedicated thread (named Driver) to run the user-submitted Application; in other words, start the Driver. The Driver initializes the SparkContext;
6. Wait for the SparkContext to finish initializing, retrying at most spark.yarn.applicationMaster.waitTries times (default 10); if the number of retries exceeds the configured value, the program exits. Otherwise, use the SparkContext to initialize the yarnAllocator;
7. Once the SparkContext and Driver are initialized, register the ApplicationMaster with the ResourceManager via the amClient;
8. Allocate and launch Executors. Before launching Executors, first obtain numExecutors Containers through the yarnAllocator, then launch the Executors inside those Containers. If this step fails, the Application fails: its status is marked FAILED and the SparkContext is shut down. In fact, Executors are launched via ExecutorRunnable, and ExecutorRunnable internally starts CoarseGrainedExecutorBackend;
9. Finally, Tasks run inside CoarseGrainedExecutorBackend, and their status is reported to CoarseGrainedScheduler via Akka until the job completes.
Spark node concepts
I. The Spark driver is the process that runs the program's main() method. It executes the user code that creates the SparkContext (initialization), creates RDDs, and runs the RDDs' transformation and action operations.
The driver node's responsibilities:
1. Converting the user program into tasks (driver)
The Spark driver program is responsible for converting the user program into multiple units of physical execution; these units are called tasks (see the notes for details).
2. Scheduling tasks onto executor nodes (executor)
Once it has a physical plan, the Spark driver coordinates the scheduling of tasks across the executor processes. Based on the current set of executor nodes, the driver assigns each task to a suitable executor process according to where the data resides. While tasks run, executor processes store cached data, and the driver tracks the locations of those caches and uses that information to schedule future tasks, so as to minimize network transfer of data (this is the principle of "move the computation, not the data").
II. Executor nodes
Role:
1. Run the tasks that make up the Spark application and return results to the driver process;
2. Provide in-memory storage, through their own block manager (blockManager), for RDDs that the user program asks to cache. Because RDDs are cached directly inside executor processes, tasks can take full advantage of cached data at runtime to speed up computation.
The driver's responsibilities:
All Spark programs follow the same structure: they create a series of RDDs from input data, derive new RDDs using transformations, and finally collect or store the resulting RDDs using actions. A Spark program thus implicitly builds a logical directed acyclic graph (DAG) of operations. When the driver program runs, it converts this logical graph into a physical execution plan.
Spark then turns the logical plan into a series of stages, and each stage consists of multiple tasks. These tasks are bundled up and sent to the cluster.
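As a sketch of how a chain of transformations forms the logical DAG (assuming a running SparkContext `sc` and an existing input file; the path is illustrative), a simple word count makes the stage structure visible: toDebugString prints the lineage Spark will turn into stages, and reduceByKey introduces the shuffle that marks a stage boundary.

```scala
// Sketch only: assumes a running SparkContext `sc` and an existing input file.
val lines  = sc.textFile("hdfs:///tmp/input.txt")    // RDD of lines
val words  = lines.flatMap(line => line.split(" "))  // narrow: pipelined
val pairs  = words.map(word => (word, 1))            // narrow: pipelined
val counts = pairs.reduceByKey(_ + _)                // shuffle => new stage

// No computation has happened yet; print the lineage (the logical DAG):
println(counts.toDebugString)

// The action below triggers physical execution of both stages:
counts.collect().foreach(println)
```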
Spark initialization
1. Every Spark application launches its parallel operations on the cluster through a driver program. The driver program contains the application's main function, defines distributed datasets on the cluster, and applies operations to those datasets.
2. The driver program accesses Spark through a SparkContext object, which represents a connection to the computing cluster. (For example, when spark-shell starts, it automatically creates a SparkContext object, available as a variable called sc.)

3. Once you have created a SparkContext, you can use it to create RDDs. For example, call sc.textFile() to create an RDD representing the lines of a text file. (For example: val linesRDD = sc.textFile("yangsy.text"); val spark = linesRDD.filter(line => line.contains("spark")); spark.count())
To run these operations, the driver program typically manages multiple executors, i.e. the executor nodes described above.
4. When the SparkContext is initialized, a SparkConf object is loaded at the same time to read the cluster configuration, and the SparkContext object is then created from it.
As can be seen in the source code, starting the thriftserver invokes the spark-daemon.sh script, which loads the files under conf in SPARK_HOME.

(When running code programmatically, you first create the conf object and load the relevant parameters:
val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory", "1g")
val sc: SparkContext = new SparkContext(sparkConf))
How RDDs work:
An RDD (Resilient Distributed Dataset) [1] is an abstraction of distributed memory. RDDs provide a highly restricted shared-memory model: an RDD is a read-only, partitioned collection of records that can only be created by running deterministic transformations (such as map, join, and groupBy) on other RDDs. These restrictions, however, make fault tolerance very cheap to implement. From a developer's point of view, an RDD can be seen as a Spark object that itself lives in memory: reading a file yields an RDD, a computation over the file is an RDD, and the result set is also an RDD. Different partitions, the dependencies between data, and key-value map data can all be viewed as RDDs.
Execution divides into three parts: creating the RDD objects, the DAG scheduler building the execution plan, and the Task scheduler assigning tasks and dispatching Workers to run them.
SparkContext (RDD operations) → (submits the job) → DAGScheduler (walks the RDD graph, splits it into stages, and generates the job) → (submits the task set) → task scheduling (TaskScheduler) → (tasks fetched according to available resources) → task-set management (TaskSetManager)
A Transformation returns an RDD. Transformations use a chained-call design pattern: computing on one RDD transforms it into another RDD, which can in turn be transformed again. This process is distributed.
An Action does not return an RDD. It returns either an ordinary Scala collection, a value, or nothing; ultimately it either returns results to the Driver program or writes the RDD out to a file system.
Transformations (e.g. map, filter, groupBy, join) are lazy: an operation that derives one RDD from another is not executed immediately. When Spark encounters a transformation, it only records that the operation is needed; the computation actually starts only when an action is invoked.
Actions (e.g. count, collect, save) return a result or write RDD data to a storage system. Actions are what trigger Spark to start computing.
The Spark programming guide covers these two kinds of operations in further detail; they are the core of developing with Spark.
RDD basics
1. An RDD in Spark is an immutable, distributed collection of objects. Each RDD is split into multiple partitions, which run on different nodes of the cluster. There are two ways to create an RDD: read an external dataset, or distribute a collection of objects held in the driver program, as in the earlier example of reading a text file as an RDD of strings.
2. Once created, an RDD supports two kinds of operations: transformations and actions.
A transformation derives a new RDD from an existing one (such as the predicate-based filter shown earlier).
An action computes a result from an RDD and either returns it to the driver program or stores it in an external storage system (such as HDFS). For example, first() is an action: it returns the first element of an RDD.
Note: transformations and actions differ in how Spark computes RDDs. Although you can define a new RDD at any time, Spark computes RDDs only lazily: they are materialized the first time they are used in an action. The reason for this design is that if, say, sc.textFile(...) eagerly read and stored every line of the file, it would consume a great deal of storage, only for much of that data to be filtered out immediately afterwards.
One more point to note: Spark recomputes an RDD every time you run an action on it. If you want to reuse the same RDD across multiple actions, you can use RDD.persist() or RDD.cache() to have Spark keep the RDD around (in memory or on disk).
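A minimal sketch of reuse with persist() (assuming a running SparkContext `sc`): without the persist call, each of the two actions below would recompute the map from scratch.

```scala
import org.apache.spark.storage.StorageLevel

// Sketch only: assumes a running SparkContext `sc`.
val input  = sc.parallelize(1 to 1000)
val result = input.map(x => x * x)

// Cache the RDD so both actions below reuse it instead of recomputing it:
result.persist(StorageLevel.MEMORY_ONLY)  // cache() is shorthand for this level

println(result.count())                // first action: computes and caches
println(result.take(5).mkString(","))  // second action: served from the cache
```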
3. Spark uses a lineage graph to record the dependencies between these RDDs. Spark needs this information to compute each RDD on demand, and it can also rely on the lineage graph to recover lost data when part of a persisted RDD is lost. (For example, filtering an errorsRDD and a warningsRDD and finally calling union() on them.)

How RDDs are computed

Narrow and wide RDD dependencies

Dependencies come in two kinds: narrow dependencies and wide dependencies. A narrow dependency means each partition of the parent RDD is used by only one partition of the child RDD. Correspondingly, a wide dependency means a partition of the parent RDD is depended on by multiple partitions of the child RDD. For example, map is a narrow dependency, while join leads to a wide dependency.
This distinction is useful for two reasons. First, narrow dependencies allow pipelined execution on a single node; for example, thanks to the one-to-one relationship, a map can run right after a filter. Second, narrow dependencies make failure recovery more efficient: with a narrow dependency, only the lost partitions of the parent RDD need to be recomputed, whereas with a wide dependency, a single node failure may lose data derived from the partitions of all parents, so a complete re-execution is required. For wide dependencies, Spark therefore persists intermediate data on the nodes holding the parent partitions to simplify failure recovery, much as MapReduce persists the output of map.
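The two kinds of dependency can be inspected on real RDDs. In this sketch (assuming a running SparkContext `sc`), map yields a narrow one-to-one dependency while groupByKey yields a shuffle dependency:

```scala
// Sketch only: assumes a running SparkContext `sc`.
val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val mapped  = pairs.map { case (k, v) => (k, v * 10) }
val grouped = pairs.groupByKey()

// map: each child partition reads exactly one parent partition (narrow)
println(mapped.dependencies)   // e.g. a single OneToOneDependency

// groupByKey: each child partition may read all parent partitions (wide)
println(grouped.dependencies)  // e.g. a single ShuffleDependency
```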
Spark Example

Step 1: Create the RDDs. In the example above, apart from the final collect, which is an action and does not create an RDD, each of the four preceding transformations creates a new RDD. So the first step is to create all the RDDs (each with its five internal pieces of information).
Step 2: Build the execution plan. Spark pipelines operations as much as possible and splits the plan into stages based on whether the data needs to be reorganized; in this example, the groupBy() transformation splits the execution plan into two stages. The end result is a DAG (directed acyclic graph) that serves as the logical execution plan.
Step 3: Schedule the tasks. Each stage is divided into tasks, and each task pairs data with computation. All tasks of the current stage must finish before the next stage begins, because the first transformation of the next stage necessarily reorganizes data, so all of the current stage's output must have been computed before execution can continue.
Suppose hdfs://names in this example consists of four file blocks; then the HadoopRDD's partitions will contain four partitions corresponding to those four blocks, and preferredLocations will indicate the best locations for the four blocks. Four tasks can now be created and scheduled onto suitable cluster nodes.
Spark data partitioning
1. A distinctive feature of Spark is control over how datasets are partitioned across nodes. In a distributed system, communication is expensive; controlling data placement to minimize network transfer can greatly improve overall performance. Spark programs can reduce communication costs by controlling how RDDs are partitioned.
2. All key-value RDDs in Spark can be partitioned, which ensures that keys of the same group appear on the same node. For example, if hash partitioning splits an RDD into 100 partitions, records whose keys have the same hash value modulo 100 end up on the same node.
(You can use partitionBy(new HashPartitioner(100)).persist() to build 100 partitions.)
3. Many Spark operations involve shuffling data across nodes by key (e.g. join(), leftOuterJoin(), groupByKey(), reduceByKey()). For an operation such as reduceByKey() that acts on a single RDD, even when run on an unpartitioned RDD, the values for each key are first reduced locally on each machine.
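A sketch of the partitioning idea (assuming a running SparkContext `sc`; the data is illustrative): pre-partitioning a large RDD that is joined repeatedly keeps each key's data on a fixed node, so subsequent joins only shuffle the smaller, unpartitioned side.

```scala
import org.apache.spark.HashPartitioner

// Sketch only: assumes a running SparkContext `sc`; data is illustrative.
val userData = sc.parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))
  .partitionBy(new HashPartitioner(100))  // fix the partitioning once...
  .persist()                              // ...and keep it, or it is rebuilt per use

val events = sc.parallelize(Seq((1, "login"), (3, "click")))

// Because userData is hash-partitioned and persisted, this join only
// shuffles `events`; userData's partitions stay where they are.
val joined = userData.join(events)
joined.collect().foreach(println)
```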
The SparkSQL shuffle process

The core of Spark SQL is to take an existing RDD, attach Schema information to it, register it as something like a SQL "Table", and run SQL queries against it. This involves two main parts: generating the SchemaRDD, and executing the query.
In a spark-hive project, reading the metadata as the Schema and reading the data on HDFS are delegated to Hive; a SchemaRDD is then generated from these two parts, and hql() queries are run under a HiveContext.
SparkSQL and structured data
1. First, a word about Apache Hive. Hive can store tables in many formats, on HDFS or on other storage systems, and Spark SQL can read any table that Hive supports. To connect Spark SQL to an existing Hive installation, you need to provide Hive's configuration: copy the hive-site.xml file into Spark's conf directory. Then create a HiveContext object (the entry point of Spark SQL), after which you can query tables with HQL and receive the results as an RDD of rows.
2. Create a HiveContext and query data:
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // field 0 is the name field
3.ͨ¹ýjdbcÁ¬½ÓÍⲿÊý¾ÝÔ´¸üÐÂÓë¼ÓÔØ
Class.forName("com.mysql.jdbc.Driver")
val conn =DriverManager.getConnection(mySQLUrl)
val stat1 =conn.createStatement()
stat1.execute ("UPDATE CI_LABEL_INFO
set DATA_STATUS_ID = 2 , DATA_DATE ='" + dataDate
+"' where LABEL_ID in (" +allCreatedLabels.mkString (",") +")")
stat1.close() //¼ÓÔØÍⲿÊý¾ÝÔ´Êý¾Ýµ½ÄÚ´æ
valDIM_COC_INDEX _MODEL_TABLE_CONF
=sqlContext.jdbc (mySQLUrl,"DIM_COC_INDEX _MODEL_TABLE _CONF").cache()
val
targets =DIM_COC_INDEX_MODEL_TABLE_CONF.filter ("TABLE_DATA_CYCLE
="+ TABLE_DATA_CYCLE).collect
SparkSQL parsing

First, consider how a traditional database parses SQL. A traditional database parses in the order Result, Data Source, Operation. It first parses the SQL statement it reads in, identifying which words are keywords (such as select, from, where), which are expressions, which are Projections, which are Data Sources, and so on. It then checks whether the SQL statement is well-formed; if not, it reports an error, and if so it proceeds to the next step, binding (Bind). Binding associates the SQL statement with the database's data dictionary (columns, tables, views, and so on); if the relevant Projections, Data Sources, etc. all exist, the SQL statement is executable. During execution, the database can sometimes return results without even reading physical tables; for example, re-running a SQL statement that was just executed can return results directly from the database's buffer pool. While parsing a SQL statement, the database converts it into a tree structure, forming a Tree with one or more nodes (TreeNode); subsequent processing then performs a series of operations on that Tree.
Spark SQL processes SQL statements in a way similar to how relational databases parse SQL: it first parses the SQL statement and forms a Tree, and subsequent steps such as binding and optimization are all operations on that Tree. The method of operation is the Rule: through pattern matching, different operations are applied to different types of node. SparkSQL has two branches, sqlContext and hiveContext. sqlContext currently supports only the SQL syntax parser (Catalyst), while hiveContext supports both the SQL and HiveQL syntax parsers.
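The "rules via pattern matching over a tree" idea can be illustrated with a toy expression tree in plain Scala. This is an illustration of the technique only, not Catalyst's actual TreeNode API: a constant-folding rule matches certain node shapes and rewrites them, leaving all other nodes untouched.

```scala
// Toy illustration of rule-based tree rewriting, in the spirit of Catalyst.
// These case classes are hypothetical; they are not Spark SQL's classes.
sealed trait Expr
case class Literal(value: Int)          extends Expr
case class Attribute(name: String)      extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// A "rule": a bottom-up rewrite applied over the tree via pattern matching.
def constantFold(e: Expr): Expr = e match {
  case Add(l, r) =>
    (constantFold(l), constantFold(r)) match {
      case (Literal(a), Literal(b)) => Literal(a + b) // fold when both sides are known
      case (fl, fr)                 => Add(fl, fr)    // otherwise keep the node
    }
  case other => other // literals and attributes are left as-is
}

// (age + (1 + 2))  is rewritten to  (age + 3)
val tree   = Add(Attribute("age"), Add(Literal(1), Literal(2)))
val folded = constantFold(tree)
println(folded) // Add(Attribute(age),Literal(3))
```

Catalyst generalizes this pattern: each optimization is a Rule over the plan Tree, and rules fire only on the node shapes they match.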