Spark job tuning secrets: relieving the pain of data skew
 
Source: 大讲台   Published: 2016-11-15
 

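Tuning overview

Sometimes we run into one of the thorniest problems in big data computing: data skew. When it occurs, a Spark job performs far worse than expected. Data skew tuning means applying a range of techniques to different kinds of skew problems so that the Spark job keeps its expected performance.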

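How painful is data skew?

If data skew is left unsolved, performance tuning is essentially impossible, and every other tuning technique becomes a joke. Data skew is the tuning problem that best reveals the skill level of a Spark big data engineer.

Being able to solve data skew means you know Spark's execution mechanism inside out.

Data skew has two direct and potentially fatal consequences.

1 Data skew directly leads to OOM (out of memory) errors.

2 The job runs slowly: very slowly, extremely slowly, unacceptably slowly.

Take 10 billion records as an example.

An individual task (the one that holds 8 billion of those records) processes far too much data and drags out the execution time of the whole job. The machine hosting that task may hit OOM, or the task may simply run very slowly.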

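Causes of data skew:

Skew arises in the shuffle stage. Too many records share the same key, so the task that owns that key (the 8 billion records in the example above) receives far more data than any other task.

This scenario is very common. The 80/20 rule (Pareto principle) describes exactly this kind of distribution.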

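Handling data skew requires a firm grasp of:

1 the shuffle

2 the business scenario

3 how CPU cores are being used

4 the root causes of OOM, and so on.

Solving data skew therefore requires thorough knowledge of at least the principles above, which is why it is the key among keys.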

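Here is a rule of thumb that holds up time after time: in most cases, the cause of an OOM is data skew. When one task receives too much data, GC pressure becomes very high. Kafka is not comparable in this respect, because Kafka keeps its data outside the JVM heap and relies on the Linux kernel page cache.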

How data skew happens

The principle behind data skew is simple. During a shuffle, all records with the same key, wherever they sit in the cluster, must be pulled to a single task on one node for processing, for example to aggregate or join by key. If one key has a particularly large amount of data, skew occurs. Suppose most keys have 10 records each but a few keys have 1 million records each. Most tasks are then assigned only 10 records and finish within a second, while a few tasks are assigned 1 million records and run for an hour or two. The progress of the whole Spark job is therefore determined by the longest-running task.

So when data skew occurs, the Spark job appears to run very slowly, and it may even fail with an out-of-memory error because one task has too much data to process.

The figure in the original article shows a clear example. The key hello has 7 records in total across three nodes, and all of them are pulled to the same task. The keys world and you have 1 record each, so the other two tasks only process 1 record each. The first task may then take 7 times as long as the other two, and the speed of the whole stage is determined by that slowest task.
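The hello/world/you example can be reproduced without a cluster. The following is a minimal plain-Java sketch, not Spark code: the class `ShufflePartitionDemo` and its methods are hypothetical names, and routing a key by its non-negative `hashCode` modulo the task count is used here as a simplified stand-in for a hash partitioner.

```java
import java.util.Arrays;
import java.util.List;

class ShufflePartitionDemo {
    // Simplified stand-in for hash partitioning: non-negative hashCode modulo the task count.
    static int taskFor(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Count how many records each shuffle read task would pull.
    static int[] recordsPerTask(List<String> keys, int numTasks) {
        int[] counts = new int[numTasks];
        for (String key : keys) {
            counts[taskFor(key, numTasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 7 records for "hello", 1 each for "world" and "you", as in the example.
        List<String> keys = Arrays.asList(
                "hello", "hello", "hello", "hello", "hello", "hello", "hello", "world", "you");
        System.out.println(Arrays.toString(recordsPerTask(keys, 3)));
    }
}
```

However the other two keys happen to hash, the task that owns hello always receives at least 7 of the 9 records, because records with the same key can never be split across tasks.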

How to locate the code that causes data skew

Data skew only happens during a shuffle. Commonly used operators that may trigger a shuffle include distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition. When data skew appears, it is probably caused by one of these operators in your code.

When a task runs particularly slowly

The first thing to find out is which stage the skew occurs in.

If the job is submitted in yarn-client mode, the log is visible locally, and it shows which stage is currently running. If the job is submitted in yarn-cluster mode, use the Spark Web UI to see which stage is currently running. In either mode, the Spark Web UI also lets you drill into how much data each task of the current stage was assigned, which confirms whether uneven data distribution across tasks is causing the skew.

For example, in the Spark Web UI task table shown in the original article, the third column from the right shows each task's run time. Some tasks clearly finish in a few seconds, while others need several minutes. The run times alone are already enough to conclude that data skew has occurred. In addition, the last column shows the amount of data each task processed: the short-running tasks handle only a few hundred KB, while the long-running tasks handle several thousand KB, a tenfold difference. This confirms the skew even more firmly.

Once we know which stage is skewed, we need to work out, from the way Spark divides a job into stages, which part of the code that stage corresponds to. That part of the code is bound to contain a shuffle operator. Mapping stages to code precisely requires a deep understanding of the Spark source code, but a relatively simple and practical rule works well: wherever the Spark code uses a shuffle operator, or a Spark SQL statement contains a clause that causes a shuffle (such as group by), that point is the boundary between two stages.

Let us take the most basic Spark starter program, word count, as an example of how to roughly work out the code behind a stage. In the sample below, reduceByKey is the only operator in the whole program that causes a shuffle, so we can take it as the boundary that splits the program into two stages.

1. stage0 mainly runs the operations from textFile to map, plus the shuffle write. The shuffle write can be understood simply as partitioning the data in the pairs RDD: within the data handled by each task, records with the same key are written into the same disk file.

2. stage1 mainly runs the operations from reduceByKey to collect. As soon as the tasks of stage1 start, they first perform the shuffle read. A task performing the shuffle read pulls the keys it is responsible for from the nodes where the stage0 tasks ran, and then performs a global aggregation or join on each key; here it sums up the values of each key. After the reduceByKey operator finishes, stage1 has computed the final wordCounts RDD. It then runs the collect operator, which pulls all the data to the Driver so that we can iterate over it and print it.

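import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
val sc = new SparkContext(conf)

// stage0: textFile -> flatMap -> map, followed by the shuffle write
val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
// stage1: shuffle read, then reduceByKey and collect
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.collect().foreach(println(_))

This walk through the word count program should make clear the basic principle of stage division and how the shuffle is carried out at the boundary between two stages. With that, we know how to quickly locate the part of the code that a skewed stage corresponds to. Suppose the Spark Web UI or the local log shows that a few tasks of stage1 run particularly slowly, so we conclude that stage1 is skewed. Going back to the code, we see that stage1 mainly contains the shuffle operator reduceByKey, so we can be fairly sure that the reduceByKey operator is what causes the skew. For example, if one word appears 1 million times while the other words appear only 10 times, one task of stage1 has to process 1 million records, and that task slows down the whole stage.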

Solutions to data skew

Solution 1: preprocess the data with Hive ETL

When to use it: the skew comes from a Hive table. If the data in the Hive table is itself very uneven (for example one key has 1 million records while the other keys have only 10), and the business scenario needs Spark to run some analysis on that Hive table frequently, this approach is a good fit.

How to implement it: evaluate whether the data can be preprocessed in Hive, that is, whether a Hive ETL job can aggregate the data by key in advance or join it with the other tables in advance. The Spark job then reads the preprocessed Hive table instead of the original one. Because the aggregation or join has already been done, the Spark job no longer needs the shuffle operators that used to perform it.

Why it works: this approach removes the skew from the Spark job at its root, because the shuffle operators are no longer executed in Spark at all, so the Spark job cannot be skewed. Be aware, though, that it treats the symptom rather than the disease. The data is still unevenly distributed, so the group by or join in the Hive ETL job is itself skewed and runs slowly. We have only moved the skew forward into Hive ETL so that the Spark program avoids it.

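Advantages: simple and convenient to implement, and very effective. The Spark job avoids data skew completely and its performance improves substantially.

Disadvantages: it treats the symptom, not the cause. The skew still occurs in the Hive ETL job.

Practical experience: in projects that combine a Java system with Spark, Java code may invoke Spark jobs frequently and require them to run fast. This approach suits such cases well. The skew is moved upstream into a Hive ETL job that runs only once a day, so only that one run is slow. Every later Spark job invoked from Java runs quickly and gives a better user experience.

Project experience: this approach is used in the interactive user behavior analysis system at Meituan-Dianping. The system lets users submit data analysis and statistics tasks through a Java web system, and the backend submits Spark jobs from Java to perform them. The Spark jobs must be fast, preferably under 10 minutes, otherwise the user experience suffers badly. So we moved the shuffle operations of some Spark jobs forward into Hive ETL and let Spark read the preprocessed intermediate Hive tables directly. This reduced Spark's shuffle operations as far as possible and raised the performance of some jobs by more than 6 times.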

Solution 2: filter out the few keys that cause the skew

When to use it: only a few keys cause the skew, and they matter little to the computation itself. For example, 99% of the keys have 10 records each, but a single key has 1 million records and causes the skew.

How to implement it: if we judge that the few keys with exceptionally large data volumes are not particularly important to the job or its results, simply filter them out. In Spark SQL this can be done with a where clause, and in Spark Core by applying the filter operator to the RDD. If the keys with the most data have to be determined dynamically on every run before filtering, use the sample operator to sample the RDD, count the records per key in the sample, and filter out the keys with the largest counts.

Why it works: once the keys that cause the skew are filtered out, they no longer take part in the computation, so they cannot produce any skew.

Advantages: simple to implement and very effective. It avoids data skew completely.

Disadvantages: it rarely applies. In most cases many keys cause the skew, not just a few.

Practical experience: we have used this approach in a project. One day a Spark job suddenly failed with OOM. The investigation showed that one key in the Hive table had abnormal data that day, which made its volume explode. We therefore sampled the data before each run, computed the few keys with the largest volume in the sample, and filtered those keys out directly in the program.
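The sample-count-filter idea can be sketched in plain Java, outside Spark. Everything here is illustrative: `SkewedKeyFilterDemo` and its methods are hypothetical names, and a deterministic "keep every Nth record" sample is used in place of Spark's random sample operator so that the result is reproducible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SkewedKeyFilterDemo {
    // Deterministic stand-in for a random sample: keep every step-th record.
    static List<String> sampleEveryNth(List<String> keys, int step) {
        List<String> sample = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += step) {
            sample.add(keys.get(i));
        }
        return sample;
    }

    // Return the key that occurs most often in the sample (null for an empty sample).
    static String mostFrequentKey(List<String> sample) {
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String key : sample) {
            int count = counts.merge(key, 1, Integer::sum);
            if (count > bestCount) {
                best = key;
                bestCount = count;
            }
        }
        return best;
    }

    // Drop every record whose key equals the skewed key.
    static List<String> filterOut(List<String> keys, String skewedKey) {
        List<String> kept = new ArrayList<>();
        for (String key : keys) {
            if (!key.equals(skewedKey)) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) keys.add("hot");
        for (int k = 0; k < 5; k++) {
            for (int i = 0; i < 10; i++) keys.add("key" + k);
        }
        String skewed = mostFrequentKey(sampleEveryNth(keys, 10));
        System.out.println(skewed + " -> " + filterOut(keys, skewed).size() + " records kept");
    }
}
```

With 1000 records for the hot key and 10 for each of five other keys, a 10% sample is enough to identify the hot key, and filtering it leaves the 50 evenly distributed records.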

Solution 3: increase the parallelism of the shuffle

When to use it: if we have to face the skew head-on, try this approach first, because it is the simplest way to deal with data skew.

How to implement it: when running a shuffle operator on an RDD, pass it a parallelism parameter, for example reduceByKey(1000), which is shorthand for passing 1000 as the partition-count argument alongside the reduce function. This parameter sets the number of shuffle read tasks for that operator. For shuffle statements in Spark SQL, such as group by and join, set the parameter spark.sql.shuffle.partitions, which controls the parallelism of the shuffle read tasks. Its default value is 200, which is somewhat too small for many scenarios.

Why it works: with more shuffle read tasks, the keys that used to be assigned to one task are spread over several tasks, so each task processes less data than before. For example, if there are 5 keys with 10 records each and all 5 keys are assigned to one task, that task processes 50 records. After increasing the number of shuffle read tasks, each task is assigned one key and processes 10 records, so each task naturally finishes sooner. The original article illustrates this with a figure.

Advantages: fairly simple to implement, and it can effectively ease the impact of data skew.

Disadvantages: it only eases the skew and does not remove the problem. In practice its effect is limited.

Practical experience: this approach usually cannot solve data skew completely. In an extreme case where a single key has 1 million records, that key is still assigned to a single task no matter how many tasks you add, so the skew is bound to remain. This approach is therefore only the first thing to try when skew is discovered, an attempt to ease it with the simplest method available, or something to combine with the other approaches.
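Both the benefit and the limit of higher parallelism show up in a small simulation. This is a plain-Java sketch under stated assumptions, not Spark code: `ShuffleParallelismDemo` is a hypothetical name, keys are the integers 0, 1, 2, ..., and key k is routed to task k mod numTasks as a simplified partitioning rule.

```java
class ShuffleParallelismDemo {
    // Largest number of records received by any single task when integer key k
    // is routed to task (k mod numTasks); recordsPerKey[k] is the record count of key k.
    static long maxTaskLoad(long[] recordsPerKey, int numTasks) {
        long[] load = new long[numTasks];
        for (int key = 0; key < recordsPerKey.length; key++) {
            load[key % numTasks] += recordsPerKey[key];
        }
        long max = 0;
        for (long taskLoad : load) {
            max = Math.max(max, taskLoad);
        }
        return max;
    }

    public static void main(String[] args) {
        // 5 keys with 10 records each: more tasks spread the keys out.
        long[] even = {10, 10, 10, 10, 10};
        System.out.println(maxTaskLoad(even, 1));      // 50
        System.out.println(maxTaskLoad(even, 5));      // 10

        // One key with 1 million records: more tasks barely help.
        long[] skewed = {1_000_000, 10, 10, 10, 10};
        System.out.println(maxTaskLoad(skewed, 1));    // 1000040
        System.out.println(maxTaskLoad(skewed, 1000)); // 1000000
    }
}
```

The second pair of numbers is the point of the practical advice above: the hot key's 1 million records stay on one task regardless of the task count.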

Solution 4: two-stage aggregation (local aggregation + global aggregation)

When to use it: this approach suits aggregation-type shuffle operators on an RDD, such as reduceByKey, and group by statements in Spark SQL.

How to implement it: the core idea is to aggregate in two stages. The first stage is a local aggregation. Attach a random number to every key, for example a random number below 10, so that identical keys become different. For example, (hello, 1) (hello, 1) (hello, 1) (hello, 1) becomes (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Then run reduceByKey or another aggregation on the prefixed data as the local aggregation, which yields (1_hello, 2) (2_hello, 2). Next, strip the prefix from each key to get (hello, 2) (hello, 2), and run a global aggregation to obtain the final result, here (hello, 4).

Why it works: attaching a random prefix turns one key into several different keys, so the data that used to be processed by one task is spread over several tasks for local aggregation. This solves the problem of a single task handling too much data. Stripping the random prefix and aggregating again globally gives the final result. The original article illustrates this with a figure.

Advantages: it works very well for skew caused by aggregation-type shuffles. It usually removes the skew, or at least eases it greatly, and improves the Spark job's performance several times over.

Disadvantages: it only applies to aggregation-type shuffles, so its scope is relatively narrow. Join-type shuffles need other solutions.

import java.util.Random;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

// Assumes rdd is a JavaPairRDD<Long, Long>.
// Step 1: attach a random prefix to every key in the RDD.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long, Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// Step 2: local aggregation on the keys that carry a random prefix.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// Step 3: strip the random prefix from every key in the RDD.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String, Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// Step 4: global aggregation on the RDD whose random prefixes were removed.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
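To check the (hello, 4) arithmetic of the two stages without a cluster, the same steps can be simulated with ordinary maps. This is a plain-Java sketch, not the Spark implementation: `TwoStageAggregationDemo` and its method names are hypothetical, and the keys are strings here rather than the Long keys of the listing above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class TwoStageAggregationDemo {
    // Stage 1: salt each key with a random prefix in [0, buckets) and count per salted key.
    static Map<String, Long> localAggregate(List<String> keys, int buckets, Random random) {
        Map<String, Long> local = new HashMap<>();
        for (String key : keys) {
            String salted = random.nextInt(buckets) + "_" + key;
            local.merge(salted, 1L, Long::sum);
        }
        return local;
    }

    // Stage 2: strip the prefix and sum the partial counts per original key.
    static Map<String, Long> globalAggregate(Map<String, Long> local) {
        Map<String, Long> global = new HashMap<>();
        for (Map.Entry<String, Long> entry : local.entrySet()) {
            // Split on the first underscore only, so keys containing "_" survive.
            String originalKey = entry.getKey().split("_", 2)[1];
            global.merge(originalKey, entry.getValue(), Long::sum);
        }
        return global;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hello", "hello", "hello", "hello");
        Map<String, Long> local = localAggregate(keys, 10, new Random());
        System.out.println("local:  " + local);
        System.out.println("global: " + globalAggregate(local));
    }
}
```

The local map varies from run to run because the prefixes are random, but the global result is always the same total per original key. Splitting with a limit of 2 is a small hardening over `split("_")[1]` for keys that themselves contain an underscore.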

Solution 5: turn a reduce join into a map join

When to use it: a join operation on RDDs, or a join statement in Spark SQL, where one of the RDDs or tables is fairly small (for example a few hundred MB, or one or two GB).

How to implement it: do not use the join operator. Implement the join with a Broadcast variable and a map-type operator instead, which avoids the shuffle entirely and so rules out data skew. Pull the data of the smaller RDD into the Driver's memory with the collect operator and create a Broadcast variable from it. Then run a map-type operator on the other RDD. Inside the operator function, read the full data of the smaller RDD from the Broadcast variable and compare it with each record of the current RDD by join key. When the join keys match, combine the records of the two RDDs in whatever way you need.

Why it works: an ordinary join goes through a shuffle, and a shuffle pulls all records with the same key into one shuffle read task before joining them. This is a reduce join. If one RDD is fairly small, broadcasting its full data and using a map operator achieves the same result as the join. This is a map join. No shuffle takes place, so no data skew can occur. The original article illustrates this with a figure.

Advantages: it works very well for skew caused by joins, because no shuffle happens at all and so no skew can happen.

Disadvantages: it rarely applies, because it only fits the case of one large table and one small table. The small table has to be broadcast, which consumes memory: the Driver and every Executor keep a full copy of the small RDD in memory. If the broadcast RDD is large, say more than 10 GB, an out-of-memory error may occur. It is therefore unsuitable when both tables are large.

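// Assumes rdd1 is a JavaPairRDD<Long, Row> (the small side), rdd2 is a
// JavaPairRDD<Long, String>, and sc is a JavaSparkContext.
// First, collect the data of the smaller RDD to the Driver.
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect();
// Then use Spark's broadcast feature to turn the small RDD's data into a broadcast variable,
// so that each Executor holds only one copy of it.
// This saves memory as far as possible and reduces network transfer overhead.
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// Run a map-type operation on the other RDD instead of a join-type operation.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long, String>, Long, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // Inside the operator function, read the rdd1 data held by the local Executor
                // through the broadcast variable.
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // Convert the rdd1 data into a Map to make the join lookup easier.
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for (Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // Get the key and value of the current RDD record.
                Long key = tuple._1;
                String value = tuple._2;
                // Look up the joinable data in the rdd1 Map by key (null if there is no match).
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<Long, Tuple2<String, Row>>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// A note of caution.
// The approach above only applies when the keys in rdd1 are all unique, with no duplicates.
// If rdd1 contains several records with the same key, use a flatMap-type operation instead of map,
// and iterate over all matching rdd1 records when joining.
// Each record in rdd2 may then return several joined records.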
Solution 6: sample the skewed keys and split the join

When to use it: two RDDs or Hive tables are joined, both are fairly large, and Solution 5 cannot be used. In that case, look at the key distribution in the two RDDs or Hive tables. If the skew arises because a few keys in one of them have too much data while all keys in the other are distributed fairly evenly, this solution is a good fit.

How to implement it:

1. For the RDD that contains the few oversized keys, take a sample with the sample operator, count the records per key, and work out which keys have the most data.

2. Split the records of these keys off from the original RDD into a separate RDD, and prefix each key with a random number below n. The majority of keys, which do not cause skew, form another RDD.

3. From the other RDD in the join, also filter out the records of those skewed keys into a separate RDD, and expand each record into n records, prefixed in order with 0 to n-1. The majority of keys, which do not cause skew, again form another RDD.

4. Join the separate RDD that carries the random prefixes with the separate RDD that was expanded n times. The records of each formerly identical key are now scattered into n groups and joined across several tasks.

5. Join the two remaining ordinary RDDs as usual.

6. Finally, merge the results of the two joins with the union operator to get the final join result.

Why it works: when a join is skewed by only a few keys, those keys can be split off into a separate RDD and scattered into n groups with random prefixes before joining. The data of those keys is then no longer concentrated on a few tasks but spread over many tasks for the join. The original article illustrates this with a figure.

Advantages: when a join is skewed by only a few keys, this is the most efficient way to scatter the keys for the join. Only the data of the few skewed keys needs to be expanded n times, not the full data set, which avoids excessive memory use.

Disadvantages: if very many keys cause the skew, for example thousands of keys, this approach is not suitable either.

// Assumes rdd1 is a JavaPairRDD<Long, String> and rdd2 is a JavaPairRDD<Long, Row>.
// First, take a 10% sample from rdd1, which contains the few keys that cause the skew.
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// Count the occurrences of each key in the sample RDD and sort by count in descending order.
// From the sorted data, take the top 1 or top 100, that is, the n keys with the most records.
// How many of the largest keys to take is up to you; we take 1 here as a demonstration.
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long, String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
// Swap to (count, key) so that sortByKey orders by count.
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
        new PairFunction<Tuple2<Long, Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// Split the key that causes the skew off from rdd1 into a separate RDD.
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// Split the ordinary keys that do not cause skew off from rdd1 into a separate RDD.
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            }
        });

// rdd2 is the RDD whose keys are all distributed fairly evenly.
// Filter out of rdd2 the records for the key found above, split them into a separate RDD,
// and use the flatMap operator to expand each of its records 100 times.
// Each expanded record gets a prefix from 0 to 99.
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
        new Function<Tuple2<Long, Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long, Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            // Iterable is the Spark 1.x signature; Spark 2.x and later expect an Iterator.
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for (int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// Prefix each record of the separate RDD split off from rdd1 (the skewed key) with a random number below 100.
// Then join this RDD with the separate RDD split off from rdd2 above (skewedRdd2).
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long, String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedRdd2)
        .mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, Long, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Tuple2<String, Row>> call(
                    Tuple2<String, Tuple2<String, Row>> tuple)
                    throws Exception {
                // Strip the prefix to recover the original key.
                long key = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
            }
        });

// Join the separate RDD split off from rdd1 that holds the ordinary keys directly with rdd2.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// Union the join result of the skewed key with the join result of the ordinary keys.
// This is the final join result.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);
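A natural question is whether splitting, salting, expanding, and unioning really produces the same rows as an ordinary join. The following plain-Java sketch checks that on small in-memory lists. It is an illustration only: `SplitSkewJoinDemo` and its methods are hypothetical names, and records are modelled as `String[]` pairs of key and value rather than Spark tuples.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class SplitSkewJoinDemo {
    // Plain hash join on string keys; each output row is {key, leftValue, rightValue}.
    static List<String[]> join(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> index = new HashMap<>();
        for (String[] r : right) {
            index.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        List<String[]> out = new ArrayList<>();
        for (String[] l : left) {
            for (String rv : index.getOrDefault(l[0], Collections.<String>emptyList())) {
                out.add(new String[] {l[0], l[1], rv});
            }
        }
        return out;
    }

    // Split join: salt the skewed key on the left, expand it n times on the right,
    // join both parts separately, strip the prefix, and union the results.
    static List<String[]> splitJoin(List<String[]> left, List<String[]> right,
                                    String skewedKey, int n, Random random) {
        List<String[]> saltedLeft = new ArrayList<>();
        List<String[]> commonLeft = new ArrayList<>();
        for (String[] l : left) {
            if (l[0].equals(skewedKey)) {
                saltedLeft.add(new String[] {random.nextInt(n) + "_" + l[0], l[1]});
            } else {
                commonLeft.add(l);
            }
        }
        List<String[]> expandedRight = new ArrayList<>();
        for (String[] r : right) {
            if (r[0].equals(skewedKey)) {
                for (int i = 0; i < n; i++) {
                    expandedRight.add(new String[] {i + "_" + r[0], r[1]});
                }
            }
        }
        List<String[]> result = new ArrayList<>();
        for (String[] row : join(saltedLeft, expandedRight)) {
            result.add(new String[] {row[0].split("_", 2)[1], row[1], row[2]});
        }
        result.addAll(join(commonLeft, right));
        return result;
    }

    // Order-independent rendering of a result, for comparing two joins.
    static List<String> render(List<String[]> rows) {
        List<String> rendered = new ArrayList<>();
        for (String[] row : rows) {
            rendered.add(String.join("|", row));
        }
        Collections.sort(rendered);
        return rendered;
    }
}
```

Because every salted key on the left has exactly one partner among the n expanded copies on the right, `render(splitJoin(...))` equals `render(join(...))` for the same inputs, whatever random prefixes are drawn.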

Solution 7: join using random prefixes and an expanded RDD

When to use it: if a large number of keys in an RDD cause skew during a join, splitting off individual keys makes little sense, and this last approach is the only one left.

How to implement it:

1. The idea is basically the same as in Solution 6. First look at the data distribution in the RDDs or Hive tables and find the one that causes the skew, for example one in which many keys each have more than 10,000 records.

2. Prefix every record of that RDD with a random number below n.

3. At the same time, expand the other, normal RDD: turn each record into n records, prefixed in turn with 0 to n-1.

4. Finally, join the two processed RDDs.

Why it works: attaching random prefixes turns identical keys into different keys, so the processed keys can be spread over several tasks instead of one task handling a large number of records with the same key. The difference from Solution 6 is that Solution 6 tries to give special treatment only to the data of a few skewed keys, so although it also expands an RDD, the extra memory use is small. This solution targets the case of many skewed keys, where it is not possible to split off some keys for separate handling. The whole RDD has to be expanded, which demands a lot of memory.

Advantages: it can handle essentially any join-type skew, and the effect is fairly significant. The performance gain is very good.

Disadvantages: it eases data skew rather than avoiding it completely. It also has to expand the whole RDD, which demands a lot of memory.

Practical experience: while developing a data requirement, we found that a join caused data skew. Before optimization the job took about 60 minutes. After applying this approach it took about 10 minutes, a 6-fold improvement.

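// Assumes rdd1 is a JavaPairRDD<Long, Row> and rdd2 is a JavaPairRDD<Long, String>.
// First, expand the RDD whose keys are distributed fairly evenly 100 times.
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long, Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            // Iterable is the Spark 1.x signature; Spark 2.x and later expect an Iterator.
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                // Each copy gets a distinct prefix from 0 to 99, so every random prefix
                // on the other side finds exactly one partner.
                for (int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// Next, prefix each record of the other RDD, the one with skewed keys, with a random number below 100.
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long, String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// Join the two processed RDDs.
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);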
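Two properties make this scheme work: salting shrinks the largest group of records that share a join key, and expanding the other side with every prefix from 0 to n-1 guarantees that each salted key still finds a partner. The plain-Java sketch below checks both on in-memory lists. The class `SaltedExpandJoinDemo` and its methods are hypothetical names, and only keys are modelled, not values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

class SaltedExpandJoinDemo {
    // Skewed side: prefix every key with a random salt in [0, n).
    static List<String> saltKeys(List<String> keys, int n, Random random) {
        List<String> salted = new ArrayList<>();
        for (String key : keys) {
            salted.add(random.nextInt(n) + "_" + key);
        }
        return salted;
    }

    // Evenly distributed side: emit each key once per salt value 0..n-1.
    static Set<String> expandKeys(Collection<String> keys, int n) {
        Set<String> expanded = new HashSet<>();
        for (String key : keys) {
            for (int i = 0; i < n; i++) {
                expanded.add(i + "_" + key);
            }
        }
        return expanded;
    }

    // Size of the largest group of records sharing one join key.
    static int largestGroup(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        int max = 0;
        for (String key : keys) {
            max = Math.max(max, counts.merge(key, 1, Integer::sum));
        }
        return max;
    }

    public static void main(String[] args) {
        List<String> skewedSide = new ArrayList<>();
        for (int i = 0; i < 10000; i++) skewedSide.add("hot");
        for (int i = 0; i < 10; i++) skewedSide.add("cold");

        List<String> salted = saltKeys(skewedSide, 100, new Random(42));
        Set<String> expanded = expandKeys(Arrays.asList("hot", "cold"), 100);

        System.out.println("largest group before salting: " + largestGroup(skewedSide));
        System.out.println("largest group after salting:  " + largestGroup(salted));
        System.out.println("every salted key has a partner: " + expanded.containsAll(salted));
    }
}
```

If the expanded side used one constant prefix instead of the loop index, only the records that happened to draw that prefix would find a partner, and most of the join result would silently disappear.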

Solution 8: combine several solutions

In practice, a fairly simple skew scenario can usually be solved with just one of the solutions above. A more complex scenario may require combining several of them. For a Spark job with several skewed steps, for example, first apply Solutions 1 and 2 to preprocess part of the data and filter out part of it, which eases the skew. Next, raise the parallelism of some shuffle operations to improve their performance. Finally, pick a suitable solution for each aggregation or join to optimize it. Once you thoroughly understand the ideas and principles behind these solutions, you can apply them flexibly in practice, according to the situation at hand, to solve your own data skew problems.

   