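Tuning Overview
Sometimes we run into one of the thorniest problems in big-data computing: data skew. When it occurs, a Spark job performs far worse than expected. Data skew tuning means applying different techniques to the different types of data skew so that Spark jobs keep performing well.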
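How painful is data skew?!
If data skew is not solved, performance tuning is simply impossible, and every other tuning technique is a joke. Data skew is the performance-tuning problem that best reveals the level of a Spark big-data engineer.
Being able to solve data skew means you know Spark's execution mechanics inside out.
Data skew has two directly fatal consequences:
1 It directly leads to OOM (out-of-memory errors).
2 Jobs run slowly: very slowly, extremely slowly, unacceptably slowly.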

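Take 10 billion records as an example.
An individual task (the one holding 8 billion of them) processes an excessive amount of data, which drags out the execution time of the whole job. This can cause an OOM on the machine running that task, or make it run extremely slowly.
The cause of data skew:
In the shuffle stage, too many records share the same key, so the task that owns that key (the 8 billion records in the figure above) has far more data than any other task.
Such scenarios are very common; the 80/20 rule (the Pareto principle) bears this out.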
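Solving data skew requires mastering:
1 shuffle
2 the business scenario
3 how CPU cores are used
4 the root causes of OOM, and so on.
In other words, solving data skew requires a thorough understanding of at least the principles above, which makes it the most critical tuning problem of all.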
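An empirical rule that has held up time and again: in most cases, an OOM is caused by data skew. When a single task has too much data, GC pressure becomes very high. Here Spark cannot match Kafka, because Kafka does not keep its data on the JVM heap; it relies on the Linux kernel's page cache.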
How data skew happens
The principle is simple. During a shuffle, records with the same key on every node must be pulled to a single task on one node for processing, for example for per-key aggregation or a join. If one key has an unusually large amount of data, skew occurs. For example, most keys may map to 10 records while a few keys map to 1 million records; most tasks then receive only 10 records and finish in a second, while a few tasks receive 1 million records and run for an hour or two. The progress of the whole Spark job is therefore determined by its longest-running task.
So when data skew occurs, a Spark job appears to run very slowly, and a task that processes too much data may even run out of memory.
The figure below gives a clear example: the key hello has 7 records in total across three nodes, and all of them are pulled into the same task; the keys world and you have only 1 record each, so the other two tasks each process a single record. The first task may take 7 times as long as the other two, and the whole stage runs only as fast as its slowest task.
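To make the routing concrete, here is a minimal plain-Java simulation (no Spark dependency; the class and method names are illustrative) of how a hash-based shuffle assigns keys to reduce tasks. It assumes the rule used by Spark's default HashPartitioner for RDD shuffles, a non-negative modulo of the key's hashCode() over the number of partitions, and counts how many records each task would receive for the hello/world/you example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Spark dependency) of hash-based shuffle routing:
// every record with the same key lands in the same reduce task.
class ShuffleSkewDemo {

    // Same rule Spark's HashPartitioner applies to non-null keys:
    // a non-negative modulo of key.hashCode() over the number of partitions.
    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Number of records each of the numPartitions reduce tasks would receive.
    static long[] recordsPerTask(List<String> keys, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (String key : keys) {
            counts[partitionOf(key, numPartitions)]++;
        }
        return counts;
    }

    // The example from the text: 7 records for "hello", 1 each for "world" and "you".
    static List<String> exampleKeys() {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            keys.add("hello");
        }
        keys.add("world");
        keys.add("you");
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(recordsPerTask(exampleKeys(), 3)));
    }
}
```

With three partitions this prints [1, 8, 0]: "you" happens to hash into the same task as "hello". Hash routing spreads distinct keys without regard to how many records each key carries, so it offers no protection against a heavy key.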

How to locate the code that causes data skew
Data skew only happens during a shuffle. Common operators that may trigger a shuffle include distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition. When skew appears, one of these operators in your code is likely the cause.
When a task runs especially slowly
The first thing to determine is which stage the skew occurs in.
If the job was submitted in yarn-client mode, the log is visible locally and shows which stage is currently running; if it was submitted in yarn-cluster mode, you can check the current stage in the Spark Web UI. In either mode, you can also drill into the Spark Web UI to see how much data each task of the current stage was assigned, which further confirms whether uneven task assignment is causing the skew.
In the figure below, for example, the third-to-last column shows each task's run time. Some tasks clearly finish in a few seconds, while others take several minutes, so the run times alone already show that data skew has occurred. The last column shows the amount of data each task processed: the fast tasks handle only a few hundred KB, while the slow ones handle several thousand KB, about 10 times as much. This confirms the skew even more firmly.
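The check described above can be reduced to a simple ratio. The sketch below is a heuristic of our own, not a Spark feature: it takes one stage's per-task values (run times or input sizes copied from the task table in the Web UI) and compares the largest to the median. The class name and the 5.0 threshold in the example are arbitrary assumptions.

```java
import java.util.Arrays;

// Rough skew heuristic over one stage's per-task metrics, e.g. run times or
// input sizes read off the task table in the Spark Web UI.
class TaskSkewCheck {

    // Ratio of the largest per-task value to the median; expects a non-empty array.
    static double maxToMedianRatio(double[] perTask) {
        double[] sorted = perTask.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        return sorted[n - 1] / median;
    }

    // The threshold is a judgment call (an assumption), not a Spark setting.
    static boolean looksSkewed(double[] perTask, double threshold) {
        return maxToMedianRatio(perTask) >= threshold;
    }

    public static void main(String[] args) {
        // Hypothetical per-task input sizes in KB: four tasks near 300 KB, one near 3,000 KB.
        double[] inputKb = {300, 280, 310, 295, 3100};
        System.out.printf("max/median = %.2f, skewed = %b%n",
                maxToMedianRatio(inputKb), looksSkewed(inputKb, 5.0));
    }
}
```

For the hypothetical sizes in main this prints max/median = 10.33, skewed = true, in line with the roughly 10x gap described in the text.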

Once you know which stage the skew occurs in, use the rules of stage division to work out which part of the code corresponds to that stage; that part must contain a shuffle operator. Mapping stages to code precisely requires a deep understanding of Spark's source code, but there is a relatively simple and practical method: wherever a shuffle operator appears in Spark code, or a shuffle-inducing clause (such as group by) appears in a Spark SQL statement, you can conclude that the code is split into two stages at that point.
Take Spark's most basic introductory program, word count, to show how to roughly infer the code behind a stage in the simplest way. In the example below, reduceByKey is the only operator that triggers a shuffle, so the code is split into two stages at that operator.
1. stage0 mainly runs everything from textFile to map, plus the shuffle write. The shuffle write can be thought of simply as partitioning the data in the pairs RDD: within each task's data, records with the same key are written to the same disk file.
2. stage1 mainly runs everything from reduceByKey to collect. Each stage1 task starts with a shuffle read: it pulls the keys it is responsible for from the nodes where the stage0 tasks ran, then performs a global aggregation, join, or similar operation per key; here, that means summing each key's values. After reduceByKey, stage1 has computed the final wordCounts RDD, and collect then pulls all the data to the driver so we can iterate over it and print it.
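import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
val sc = new SparkContext(conf)
// stage0: textFile -> flatMap -> map, ending with the shuffle write for reduceByKey
val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
// reduceByKey is the shuffle boundary: stage1 starts here with a shuffle read
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println(_))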
This analysis of word count should show the most basic principle of stage division and how the shuffle runs at the boundary between two stages. It also shows how to quickly locate the code behind a skewed stage. For example, if the Spark Web UI or the local log shows that a few stage1 tasks run especially slowly, we conclude that stage1 is skewed; going back to the code, stage1 mainly consists of the shuffle operator reduceByKey, so the skew is almost certainly caused by reduceByKey. If one word appears 1 million times while the others appear only 10 times, one stage1 task has to process 1 million records, and that task slows down the entire stage.
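The stage boundary can also be simulated without Spark. In this plain-Java sketch (the class WordCountStages and its methods are hypothetical), shuffleWrite plays the role of stage0, bucketing each word by its reduce partition, and reduceTask plays one stage1 task that reads only its own bucket and sums the counts. Routing again assumes the HashPartitioner rule of a non-negative modulo of hashCode().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the word-count job's two stages (no Spark dependency).
class WordCountStages {

    // Hash routing as in Spark's default HashPartitioner for RDD shuffles.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // stage0 (shuffle write): one bucket per reduce task; each word goes to the
    // bucket of its partition, standing in for the per-partition disk files.
    static List<List<String>> shuffleWrite(List<String> words, int numPartitions) {
        List<List<String>> buckets = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            buckets.add(new ArrayList<>());
        }
        for (String w : words) {
            buckets.get(partitionOf(w, numPartitions)).add(w);
        }
        return buckets;
    }

    // One stage1 task (shuffle read + reduceByKey): sums the counts in its own bucket.
    static Map<String, Integer> reduceTask(List<String> bucket) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : bucket) {
            counts.merge(w, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) words.add("hot"); // one very frequent word
        for (int i = 0; i < 10; i++) {
            words.add("a");
            words.add("b");
            words.add("c");
        }
        List<List<String>> buckets = shuffleWrite(words, 4);
        for (int p = 0; p < buckets.size(); p++) {
            System.out.println("task " + p + " reads " + buckets.get(p).size()
                    + " records -> " + reduceTask(buckets.get(p)));
        }
    }
}
```

Whatever partition "hot" hashes to, that single task reads at least 1,000,000 records while the others read at most a few dozen, which is exactly the slow-task pattern visible in the Web UI.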
Solutions to data skew
Solution 1: Preprocess the data with Hive ETL
Applicable scenario: the skew comes from a Hive table. If the data in that Hive table is itself very uneven (for example, one key has 1 million records while the others have only 10), and the business frequently uses Spark to run some analysis on that table, this technique is a good fit.
Implementation idea: evaluate whether the data can be preprocessed in Hive, that is, whether a Hive ETL job can aggregate the data by key in advance or join it with other tables in advance. The Spark job then reads the preprocessed Hive table instead of the original one. Since the aggregation or join has already been done, the Spark job no longer needs the original shuffle operators for it.
How it works: this approach avoids skew at the source, because it removes the shuffle operators from Spark entirely, so Spark cannot experience skew. Be aware, though, that it treats the symptom rather than the cause. The data itself is still unevenly distributed, so the group by, join, or other shuffle operations in the Hive ETL will still be skewed and make the Hive ETL slow. We have merely moved the skew upstream into the Hive ETL so that the Spark program avoids it.
Pros: simple and convenient to implement, with very good results; skew is avoided entirely in the Spark job, and its performance improves dramatically.
Cons: it treats the symptom rather than the cause; the Hive ETL still suffers from skew.
Practical experience: in projects that combine a Java system with Spark, where Java code frequently invokes Spark jobs and those jobs must run fast, this approach works well. The skew is moved upstream into a Hive ETL that runs only once a day; only that one run is slow, and every subsequent Spark job invoked from Java runs fast, giving a better user experience.
Project experience: this approach was used in Meituan-Dianping's interactive user-behavior analysis system. The system lets users submit data-analysis and statistics tasks through a Java web system, and the back end submits Spark jobs via Java to perform the analysis. The Spark jobs had to be fast, ideally under 10 minutes, or the user experience would suffer. We therefore moved the shuffle operations of some Spark jobs into the Hive ETL, so Spark could read the preprocessed Hive intermediate tables directly and shuffle as little as possible. This improved performance substantially, by more than 6x for some jobs.
Solution 2: Filter out the few keys that cause the skew
Applicable scenario: if only a few keys cause the skew and they have little impact on the computation itself, this approach fits well. For example, 99% of the keys have 10 records each, but a single key has 1 million records and causes the skew.
Implementation idea: if we judge that the few keys with huge amounts of data are not particularly important to the job's execution or results, simply filter them out. In Spark SQL, use a where clause to filter them; in Spark Core, run the filter operator on the RDD. If the heaviest keys have to be determined dynamically on each run before filtering, use the sample operator to sample the RDD, count the records per key, and filter out the keys with the most records.
How it works: once the skew-causing keys are filtered out, they no longer take part in the computation, so they cannot cause skew.
Pros: simple to implement and very effective; it avoids data skew completely.
Cons: few scenarios fit; in most cases many keys cause the skew, not just a handful.
Practical experience: we have used this approach in a project. One day a Spark job suddenly hit OOM; investigation showed that one key in a Hive table had anomalous data that day, so its data volume exploded. We therefore sampled the data before each run, found the few keys with the most records in the sample, and filtered those keys out directly in the program.
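The dynamic variant of this solution (sample, count, filter) is sketched below in plain Java, without Spark, so it can run standalone. The class name, the 10% fraction, the fixed seed, and keeping only the top 1 key are assumptions for illustration; in a Spark job the corresponding steps would be sample(false, fraction), a per-key count, and filter (or a where clause in Spark SQL).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

// Plain-Java model of "sample, count per key, drop the heaviest keys".
class FilterSkewedKeys {

    // Bernoulli sampling: keep each record independently with probability `fraction`.
    static List<String> sample(List<String> keys, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            if (rnd.nextDouble() < fraction) {
                out.add(k);
            }
        }
        return out;
    }

    // The topN most frequent keys in the sample.
    static Set<String> topKeys(List<String> sample, int topN) {
        Map<String, Long> counts = new HashMap<>();
        for (String k : sample) {
            counts.merge(k, 1L, Long::sum);
        }
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(topN)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    // Keep only records whose key is not in the skewed set.
    static List<String> dropKeys(List<String> keys, Set<String> skewed) {
        return keys.stream()
                .filter(k -> !skewed.contains(k))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical data: one anomalous key plus 50 ordinary keys with 10 records each.
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) data.add("bad");
        for (int k = 0; k < 50; k++) {
            for (int i = 0; i < 10; i++) data.add("key" + k);
        }
        Set<String> skewed = topKeys(sample(data, 0.1, 42L), 1);
        System.out.println("filtered keys: " + skewed
                + ", records left: " + dropKeys(data, skewed).size());
    }
}
```

Sampling keeps the detection cheap: the heavy key dominates even a 10% sample, so it is found without counting the full dataset.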
Solution 3: Increase the parallelism of shuffle operations
Applicable scenario: if we have to tackle data skew head-on, try this approach first, because it is the simplest way to deal with skew.
Implementation idea: when running a shuffle operator on an RDD, pass it a parameter, such as reduceByKey(1000); this sets the number of shuffle read tasks used when the operator executes. For shuffle-type Spark SQL statements such as group by and join, set the parameter spark.sql.shuffle.partitions, which controls the parallelism of shuffle read tasks. Its default is 200, which is too small for many scenarios.
How it works: increasing the number of shuffle read tasks lets multiple keys that were originally assigned to one task be spread across several tasks, so each task processes less data than before. For example, if there were originally 5 keys with 10 records each, all assigned to one task, that task would process 50 records. With more shuffle read tasks, each task could be assigned one key and process only 10 records, so each task naturally runs faster. The figure below illustrates this.
Pros: relatively simple to implement; it can effectively mitigate the impact of data skew.
Cons: it only mitigates skew rather than eliminating it, and in practice its effect is limited.
Practical experience: this approach usually cannot solve skew completely. In extreme cases, for example when one key has 1 million records, that key is still assigned to a single task no matter how many tasks you add, so skew is bound to occur. This approach is best seen as the first thing to try when skew is found, a simple way to relieve it, or something to combine with other approaches.
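The limit described in the practical-experience paragraph can be checked with a small simulation, again assuming hash routing by a non-negative modulo of hashCode() as in Spark's HashPartitioner (the class and method names are illustrative). maxTaskLoad reports the heaviest task's record count for a given number of partitions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of raising shuffle parallelism (no Spark dependency).
class ParallelismDemo {

    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Largest number of records that any single shuffle read task receives.
    static long maxTaskLoad(Map<String, Long> recordsPerKey, int numPartitions) {
        long[] load = new long[numPartitions];
        for (Map.Entry<String, Long> e : recordsPerKey.entrySet()) {
            load[partitionOf(e.getKey(), numPartitions)] += e.getValue();
        }
        long max = 0;
        for (long l : load) {
            max = Math.max(max, l);
        }
        return max;
    }

    // Five keys with 10 records each, as in the text.
    static Map<String, Long> evenKeys() {
        Map<String, Long> m = new LinkedHashMap<>();
        for (int i = 0; i < 5; i++) {
            m.put("k" + i, 10L);
        }
        return m;
    }

    public static void main(String[] args) {
        Map<String, Long> even = evenKeys();
        Map<String, Long> skewed = evenKeys();
        skewed.put("hot", 1_000_000L); // one extreme key
        for (int p : new int[]{1, 5, 200, 1000}) {
            System.out.println(p + " partitions: even max = " + maxTaskLoad(even, p)
                    + ", skewed max = " + maxTaskLoad(skewed, p));
        }
    }
}
```

For these particular key names (k0 to k4 have consecutive hash codes), 5 partitions happen to give each task exactly one key, so the even case drops from 50 to 10; in general hashing gives no such guarantee, and two keys can still share a task. The skewed case never drops below 1,000,000, whatever the partition count.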

Solution 4: Two-stage aggregation (local aggregation + global aggregation)
Applicable scenario: aggregation-type shuffle operators such as reduceByKey on an RDD, or group by aggregations in Spark SQL.
Implementation idea: the core idea is to aggregate in two stages. The first stage is a local aggregation: attach a random number to each key, for example a number below 10, so keys that were identical become different. For example, (hello, 1) (hello, 1) (hello, 1) (hello, 1) may become (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Then run reduceByKey or another aggregation on the prefixed data to aggregate locally, yielding (1_hello, 2) (2_hello, 2). Next, remove the prefix from each key, giving (hello, 2) (hello, 2), and aggregate globally again to get the final result, (hello, 4).
How it works: attaching a random prefix turns one key into several different keys, so the data originally processed by one task is spread over several tasks for local aggregation, which fixes the problem of one task processing too much data. Removing the random prefix and aggregating globally then yields the final result. The figure below illustrates this.
Pros: very effective for skew caused by aggregation-type shuffles. It usually eliminates the skew, or at least greatly reduces it, and can make the Spark job several times faster.
Cons: it only applies to aggregation-type shuffles, so its scope is fairly narrow. Join-type shuffles need other solutions.

import java.util.concurrent.ThreadLocalRandom;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// rdd is assumed to be a JavaPairRDD<Long, Long> of (key, value) pairs.
// Step 1: attach a random prefix in [0, 10) to every key in the RDD.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(tuple -> {
    int prefix = ThreadLocalRandom.current().nextInt(10);
    return new Tuple2<>(prefix + "_" + tuple._1(), tuple._2());
});
// Step 2: aggregate locally on the prefixed keys.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey((v1, v2) -> v1 + v2);
// Step 3: strip the random prefix (everything up to the first '_') from every key.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(tuple -> {
    long originalKey = Long.parseLong(tuple._1().substring(tuple._1().indexOf('_') + 1));
    return new Tuple2<>(originalKey, tuple._2());
});
// Step 4: aggregate globally on the RDD whose prefixes have been removed.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey((v1, v2) -> v1 + v2);
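To check that the four steps above preserve the result, here is a plain-Java sketch of the same two-stage aggregation over in-memory records (no Spark; class and method names are illustrative). The salt is separated from the key at the first '_', which is safe because the salt is a bare integer.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java model of two-stage (salted) aggregation.
class TwoStageAggregation {

    // Stage 1 (local aggregation): salt each key with a random prefix in
    // [0, saltBuckets) and sum values per salted key.
    static Map<String, Long> localAggregate(List<Map.Entry<String, Long>> records,
                                            int saltBuckets, Random rnd) {
        Map<String, Long> partial = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            String salted = rnd.nextInt(saltBuckets) + "_" + r.getKey();
            partial.merge(salted, r.getValue(), Long::sum);
        }
        return partial;
    }

    // Stage 2 (global aggregation): strip the prefix (everything up to the
    // first '_', which the integer salt never contains) and sum again.
    static Map<String, Long> globalAggregate(Map<String, Long> partial) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String key = e.getKey().substring(e.getKey().indexOf('_') + 1);
            result.merge(key, e.getValue(), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // The (hello, 1) x4 example from the text, plus one world record.
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            records.add(new SimpleEntry<>("hello", 1L));
        }
        records.add(new SimpleEntry<>("world", 1L));
        Map<String, Long> partial = localAggregate(records, 10, new Random(1));
        System.out.println("partial: " + partial);
        System.out.println("final:   " + globalAggregate(partial));
    }
}
```

Whatever salts the random generator picks, globalAggregate returns hello=4 and world=1 for the records in main; only the intermediate partial map varies.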
Solution 5: Convert a reduce join into a map join
Applicable scenario: join operations on RDDs, or join statements in Spark SQL, where one of the RDDs or tables is relatively small (for example a few hundred MB, or 1 to 2 GB).
Implementation idea: instead of the join operator, implement the join with a broadcast variable and a map-type operator, which avoids shuffle operations entirely and therefore rules out data skew. Pull the data of the smaller RDD into driver memory with the collect operator and create a broadcast variable from it. Then run a map-type operator on the other RDD; inside the operator function, get the smaller RDD's full data from the broadcast variable and compare each record of the current RDD with it by join key. When the join keys match, combine the records from the two RDDs in whatever way you need.
How it works: an ordinary join goes through a shuffle, which pulls records with the same key into one shuffle read task before joining them; this is a reduce join. If one RDD is small, broadcasting its full data and using a map operator achieves the same effect as a join; this is a map join. No shuffle happens, so no data skew can happen either. The figure below illustrates this.
Pros: very effective against skew caused by joins, because no shuffle happens at all, and therefore no skew.
Cons: few scenarios fit, because it only works when one large table is joined with one small table. Broadcasting the small table costs memory: the driver and every executor hold a full copy of the small RDD. If the broadcast RDD is large, for example over 10 GB, an out-of-memory error may occur. It is therefore unsuitable when both tables are large.

// Ê×ÏȽ«Êý¾ÝÁ¿±È½ÏСµÄRDDµÄÊý¾Ý£¬collectµ½DriverÖÐÀ´¡£ List> rdd1Data = rdd1.collect() // È»ºóʹÓÃSparkµÄ¹ã²¥¹¦ÄÜ£¬½«Ð¡RDDµÄÊý¾Ýת»»³É¹ã²¥±äÁ¿£¬ÕâÑùÿ¸öExecutor¾ÍÖ»ÓÐÒ»·ÝRDDµÄÊý¾Ý¡£ // ¿ÉÒÔ¾¡¿ÉÄܽÚÊ¡ÄÚ´æ¿Õ¼ä£¬²¢ÇÒ¼õÉÙÍøÂç´«ÊäÐÔÄÜ¿ªÏú¡£ final Broadcast>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// ¶ÔÁíÍâÒ»¸öRDDÖ´ÐÐmapÀà²Ù×÷£¬¶ø²»ÔÙÊÇjoinÀà²Ù×÷¡£
JavaPairRDD> joinedRdd = rdd2.mapToPair(
new PairFunction, String, Tuple2>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2> call(Tuple2 tuple)
throws Exception {
// ÔÚËã×Óº¯ÊýÖУ¬Í¨¹ý¹ã²¥±äÁ¿£¬»ñÈ¡µ½±¾µØExecutorÖеÄrdd1Êý¾Ý¡£
List> rdd1Data = rdd1DataBroadcast.value();
// ¿ÉÒÔ½«rdd1µÄÊý¾Ýת»»ÎªÒ»¸öMap£¬±ãÓÚºóÃæ½øÐÐjoin²Ù×÷¡£
Map rdd1DataMap = new HashMap();
for(Tuple2 data : rdd1Data) {
rdd1DataMap.put(data._1, data._2);
}
// »ñÈ¡µ±Ç°RDDÊý¾ÝµÄkeyÒÔ¼°value¡£
String key = tuple._1;
String value = tuple._2;
// ´Órdd1Êý¾ÝMapÖУ¬¸ù¾Ýkey»ñÈ¡µ½¿ÉÒÔjoinµ½µÄÊý¾Ý¡£
Row rdd1Value = rdd1DataMap.get(key);
return new Tuple2(key, new Tuple2(value, rdd1Value));
}
});
// ÕâÀïµÃÌáʾһÏ¡£
// ÉÏÃæµÄ×ö·¨£¬½ö½öÊÊÓÃÓÚrdd1ÖеÄkeyûÓÐÖØ¸´£¬È«²¿ÊÇΨһµÄ³¡¾°¡£
// Èç¹ûrdd1ÖÐÓжà¸öÏàͬµÄkey£¬ÄÇô¾ÍµÃÓÃflatMapÀàµÄ²Ù×÷£¬ÔÚ½øÐÐjoinµÄʱºò²»ÄÜÓÃmap£¬¶øÊǵñéÀúrdd1ËùÓÐÊý¾Ý½øÐÐjoin¡£
// rdd2ÖÐÿÌõÊý¾Ý¶¼¿ÉÄܻ᷵»Ø¶àÌõjoinºóµÄÊý¾Ý¡£ |
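The closing note about duplicate keys can be handled by grouping the small side into a key-to-list map before broadcasting and emitting one row per match. The plain-Java sketch below models that with ordinary collections rather than Spark: buildLookup stands for the map built on the driver, and mapJoin for the body of a flatMapToPair over the large RDD. All names and sample data are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a map join whose small side may contain duplicate keys.
class MapJoinDemo {

    // Built once (in Spark: on the driver, then broadcast). Grouping values by key
    // preserves duplicates instead of overwriting them.
    static Map<String, List<String>> buildLookup(List<Map.Entry<String, String>> small) {
        Map<String, List<String>> lookup = new HashMap<>();
        for (Map.Entry<String, String> e : small) {
            lookup.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return lookup;
    }

    // Per large-side record (in Spark: inside flatMapToPair), emit one row per match.
    // Records without a match emit nothing, i.e. inner-join semantics.
    static List<String> mapJoin(List<Map.Entry<String, String>> big,
                                Map<String, List<String>> lookup) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> rec : big) {
            for (String smallValue : lookup.getOrDefault(rec.getKey(), Collections.emptyList())) {
                out.add(rec.getKey() + "|" + rec.getValue() + "|" + smallValue);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> small = new ArrayList<>();
        small.add(new SimpleEntry<>("u1", "gold"));
        small.add(new SimpleEntry<>("u1", "vip"));   // duplicate key
        small.add(new SimpleEntry<>("u2", "silver"));
        List<Map.Entry<String, String>> big = new ArrayList<>();
        big.add(new SimpleEntry<>("u1", "click"));
        big.add(new SimpleEntry<>("u3", "view"));    // no match, dropped
        System.out.println(mapJoin(big, buildLookup(small)));
    }
}
```

main prints [u1|click|gold, u1|click|vip]: one u1 record joins with both small-side values, and u3 has no match, so it is dropped.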
Solution 6: Sample the skewed keys and split the join
Applicable scenario: when two RDDs or Hive tables are joined and both are too large for Solution 5, look at the key distribution in each. If the skew arises because a few keys in one RDD/Hive table have too much data while the keys in the other are fairly evenly distributed, this solution is appropriate.
Implementation idea:
1. Sample the RDD that contains the few oversized keys with the sample operator, count the records per key in the sample, and identify the keys with the most data.
2. Split the records for these keys out of the original RDD into a separate RDD and attach a random prefix below n to each key; the majority of keys, which do not cause skew, form another RDD.
3. From the other RDD to be joined, likewise filter out the records for those skewed keys into a separate RDD and expand each record into n records, prefixed in order with 0 to n-1; the majority of keys, which do not cause skew, also form another RDD.
4. Join the RDD with random prefixes against the RDD expanded n times. Records that originally shared a key are now scattered into n parts and joined across multiple tasks.
5. Join the other two, ordinary RDDs as usual.
6. Finally, merge the two join results with the union operator to obtain the final join result.
How it works: for skew caused by a join, if only a few keys are responsible, those keys can be split off into a separate RDD and scattered into n parts with random prefixes before joining. Their records are then no longer concentrated in a few tasks but spread over many. The figure below illustrates this.
Pros: when only a few keys cause join skew, this is the most effective way to scatter the keys for the join. Only the records of the few skewed keys are expanded n times, not the full dataset, which avoids using too much memory.
Cons: if very many keys cause the skew, for example thousands of them, this approach is not suitable.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Assumptions: rdd1 is the JavaPairRDD<Long, String> containing the few skewed keys;
// rdd2 is the JavaPairRDD<Long, Row> whose keys are evenly distributed.
// First take a 10% sample of rdd1, which contains the few keys that cause the skew.
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
// Count how often each key occurs in the sample and sort by count in descending order.
// Then take the top 1 or top 100 keys, i.e. the n keys with the most records.
// How many of the heaviest keys to take is up to you; here we take 1 as a demonstration.
JavaPairRDD<Long, Long> countedSampledRDD = sampledRDD
        .mapToPair(tuple -> new Tuple2<>(tuple._1(), 1L))
        .reduceByKey((v1, v2) -> v1 + v2);
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD
        .mapToPair(tuple -> new Tuple2<>(tuple._2(), tuple._1()));
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2();

// Split the skewed key out of rdd1 into its own RDD.
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(tuple -> tuple._1().equals(skewedUserid));
// Split the ordinary, non-skewed keys out of rdd1 into another RDD.
JavaPairRDD<Long, String> commonRDD = rdd1.filter(tuple -> !tuple._1().equals(skewedUserid));

// rdd2 is the RDD whose keys are all distributed relatively evenly.
// Filter rdd2's records for the key found above into a separate RDD and use flatMapToPair
// to expand each record 100 times, tagging the copies with the prefixes 0 to 99.
JavaPairRDD<String, Row> skewedRdd2 = rdd2
        .filter(tuple -> tuple._1().equals(skewedUserid))
        .flatMapToPair(tuple -> {
            List<Tuple2<String, Row>> list = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                list.add(new Tuple2<>(i + "_" + tuple._1(), tuple._2()));
            }
            return list.iterator(); // Spark 2.x+ expects an Iterator (1.x expected an Iterable)
        });

// Give every record of the skewed RDD split from rdd1 a random prefix in [0, 100),
// join it with the expanded RDD split from rdd2, then strip the prefix again.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD
        .mapToPair(tuple -> new Tuple2<>(
                ThreadLocalRandom.current().nextInt(100) + "_" + tuple._1(), tuple._2()))
        .join(skewedRdd2)
        .mapToPair(tuple -> new Tuple2<>(
                Long.parseLong(tuple._1().substring(tuple._1().indexOf('_') + 1)), tuple._2()));

// Join the RDD holding rdd1's ordinary keys directly with rdd2.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
// Union the skewed-key join result with the ordinary-key join result:
// this is the final join result.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);
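The following plain-Java simulation (no Spark; names are illustrative) walks through the same six steps on small in-memory lists so the result can be checked against an ordinary join. Rows are encoded as "key|left|right" strings, and the set of skewed keys is passed in directly instead of being sampled.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Plain-Java model of splitting out skewed keys for a salted join.
class SplitSkewJoin {

    // Ordinary inner hash join; each output row is encoded as "key|left|right".
    static List<String> join(List<Map.Entry<String, String>> left,
                             List<Map.Entry<String, String>> right) {
        Map<String, List<String>> index = new HashMap<>();
        for (Map.Entry<String, String> r : right) {
            index.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> l : left) {
            for (String rv : index.getOrDefault(l.getKey(), Collections.emptyList())) {
                out.add(l.getKey() + "|" + l.getValue() + "|" + rv);
            }
        }
        return out;
    }

    static List<String> splitJoin(List<Map.Entry<String, String>> left,
                                  List<Map.Entry<String, String>> right,
                                  Set<String> skewedKeys, int n, Random rnd) {
        List<Map.Entry<String, String>> saltedLeft = new ArrayList<>();
        List<Map.Entry<String, String>> commonLeft = new ArrayList<>();
        for (Map.Entry<String, String> l : left) {
            if (skewedKeys.contains(l.getKey())) {
                // Random prefix in [0, n), applied to the skewed keys only.
                saltedLeft.add(new SimpleEntry<>(rnd.nextInt(n) + "_" + l.getKey(), l.getValue()));
            } else {
                commonLeft.add(l);
            }
        }
        // Only the other side's records for skewed keys are copied n times, prefixes 0..n-1.
        List<Map.Entry<String, String>> expandedRight = new ArrayList<>();
        for (Map.Entry<String, String> r : right) {
            if (skewedKeys.contains(r.getKey())) {
                for (int i = 0; i < n; i++) {
                    expandedRight.add(new SimpleEntry<>(i + "_" + r.getKey(), r.getValue()));
                }
            }
        }
        List<String> result = new ArrayList<>();
        for (String row : join(saltedLeft, expandedRight)) {
            result.add(row.substring(row.indexOf('_') + 1)); // strip "<salt>_"
        }
        result.addAll(join(commonLeft, right)); // union with the ordinary join
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> left = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            left.add(new SimpleEntry<>("hot", "L" + i));
        }
        left.add(new SimpleEntry<>("a", "la"));
        List<Map.Entry<String, String>> right = new ArrayList<>();
        right.add(new SimpleEntry<>("hot", "R"));
        right.add(new SimpleEntry<>("a", "ra"));
        List<String> split = splitJoin(left, right, Collections.singleton("hot"), 10, new Random(7));
        System.out.println(split.size() + " rows; plain join has " + join(left, right).size());
    }
}
```

Each salted record matches exactly one of the n copies, so no row is lost or duplicated; the ordinary keys never meet the expanded copies because those carry a prefix.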
Solution 7: Join with random prefixes and an expanded RDD
Applicable scenario: if a large number of keys in an RDD cause skew during a join, splitting out keys is pointless, and this last approach is the only option left.
Implementation idea:
1. The idea is similar to Solution 6. First examine the data distribution in the RDDs/Hive tables and find the one causing the skew, for example one where many keys each have more than 10,000 records.
2. Attach a random prefix below n to every record of that RDD.
3. At the same time, expand the other, normal RDD: turn each record into n records, prefixed in turn with 0 to n-1.
4. Finally, join the two processed RDDs.
How it works: attaching random prefixes turns identical keys into different keys, so these processed "different keys" can be spread across multiple tasks instead of one task processing a large number of identical keys. The difference from Solution 6 is that Solution 6 gives special treatment only to the data of a few skewed keys, so the RDD expansion it requires costs little memory; this solution targets cases with many skewed keys, where some keys cannot be split out for separate handling, so the entire RDD must be expanded, which demands a lot of memory.
Pros: it handles almost any join-type skew, with fairly significant results and very good performance gains.
Cons: it mitigates skew rather than avoiding it completely, and expanding the whole RDD demands a lot of memory.
Practical experience: while developing a data requirement, we once found a join causing skew. Before optimization the job took about 60 minutes; after applying this solution it took about 10 minutes, a 6x improvement.
// Ê×ÏȽ«ÆäÖÐÒ»¸ökey·Ö²¼Ïà¶Ô½ÏΪ¾ùÔȵÄRDDÅòÕÍ100±¶¡£ JavaPairRDD expandedRDD = rdd1.flatMapToPair( new PairFlatMapFunction, String, Row>() { private static final long serialVersionUID = 1L; @Override public Iterable> call(Tuple2 tuple) throws Exception { List> list = new ArrayList>(); for(int i = 0; i < 100; i++) { list.add(new Tuple2(0 + "_" + tuple._1, tuple._2)); } return list; } });
// Æä´Î£¬½«ÁíÒ»¸öÓÐÊý¾ÝÇãбkeyµÄRDD£¬Ã¿ÌõÊý¾Ý¶¼´òÉÏ100ÒÔÄÚµÄËæ»úǰ׺¡£
JavaPairRDD mappedRDD = rdd2.mapToPair(
new PairFunction, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Tuple2 tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2(prefix + "_" + tuple._1,
tuple._2);
}
});
// ½«Á½¸ö´¦ÀíºóµÄRDD½øÐÐjoin¼´¿É¡£
JavaPairRDD> joinedRDD = mappedRDD.join(expandedRDD); |
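A plain-Java model of this solution (no Spark; names are illustrative) makes both the correctness argument and the cost visible: every salted record finds exactly one matching copy, so the result equals an ordinary join, while the expanded side grows n-fold, which is the memory cost noted under the cons.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java model of a join with random prefixes and an expanded side.
class SaltedExpandedJoin {

    // Ordinary inner hash join; each output row is encoded as "key|left|right".
    static List<String> join(List<Map.Entry<String, String>> left,
                             List<Map.Entry<String, String>> right) {
        Map<String, List<String>> index = new HashMap<>();
        for (Map.Entry<String, String> r : right) {
            index.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> l : left) {
            for (String rv : index.getOrDefault(l.getKey(), Collections.emptyList())) {
                out.add(l.getKey() + "|" + l.getValue() + "|" + rv);
            }
        }
        return out;
    }

    // Skewed side: one random prefix in [0, n) per record.
    static List<Map.Entry<String, String>> salt(List<Map.Entry<String, String>> side,
                                                int n, Random rnd) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> e : side) {
            out.add(new SimpleEntry<>(rnd.nextInt(n) + "_" + e.getKey(), e.getValue()));
        }
        return out;
    }

    // Other side: n copies of every record, prefixed 0..n-1 (size grows n-fold).
    static List<Map.Entry<String, String>> expand(List<Map.Entry<String, String>> side, int n) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> e : side) {
            for (int i = 0; i < n; i++) {
                out.add(new SimpleEntry<>(i + "_" + e.getKey(), e.getValue()));
            }
        }
        return out;
    }

    // Join the salted side with the expanded side, then strip "<salt>_" from each row.
    static List<String> saltedJoin(List<Map.Entry<String, String>> skewed,
                                   List<Map.Entry<String, String>> even, int n, Random rnd) {
        List<String> out = new ArrayList<>();
        for (String row : join(salt(skewed, n, rnd), expand(even, n))) {
            out.add(row.substring(row.indexOf('_') + 1));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> skewed = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            skewed.add(new SimpleEntry<>("h1", "a" + i));
            skewed.add(new SimpleEntry<>("h2", "b" + i));
        }
        List<Map.Entry<String, String>> even = new ArrayList<>();
        even.add(new SimpleEntry<>("h1", "A"));
        even.add(new SimpleEntry<>("h2", "B"));
        System.out.println(saltedJoin(skewed, even, 100, new Random(11)).size()
                + " rows; expanded side has " + expand(even, 100).size() + " records");
    }
}
```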
Solution 8: Combine multiple solutions
In practice, simple skew scenarios can usually be solved with one of the solutions above. More complex skew scenarios may require combining several of them. For example, for a Spark job with skew in several stages, we can first apply Solutions 1 and 2 to preprocess part of the data and filter out part of it; next, increase the parallelism of some shuffle operations to improve their performance; finally, choose a suitable solution for each remaining aggregation or join. You need to understand the ideas and principles behind all of these solutions thoroughly, then apply them flexibly to the situation at hand to solve your own data skew problems.