Preface
In big data computing, Spark has become one of the most popular computing platforms. It covers offline batch processing, SQL-style processing, streaming and real-time computing, machine learning, graph computing, and other kinds of workloads, so its range of applications and its prospects are very broad. At Meituan-Dianping, many colleagues have already tried Spark in various projects. Most of them (the author included) first turned to Spark for a simple reason: to make big data jobs run faster and perform better.
However, developing a high-performance big data job with Spark is not that simple. A Spark job that has not been tuned sensibly may run very slowly, which hides Spark's advantage as a fast big data computing engine. To use Spark well, you have to tune its performance properly.
Spark performance tuning consists of many parts; adjusting a few parameters will not give an immediate boost. We need to analyze the job as a whole according to the business scenario and the data, and then tune several aspects together to get the best performance.
Based on earlier Spark development experience and accumulated practice, the author has put together a set of performance optimization practices for Spark jobs. It is divided into development tuning, resource tuning, data skew tuning, and shuffle tuning. Development tuning and resource tuning are basic principles that every Spark job should follow, and they are the foundation of a high-performance job. Data skew tuning presents a complete set of solutions for data skew in Spark jobs. Shuffle tuning is aimed at readers with a deeper understanding of Spark internals and explains how to tune the shuffle process and its details.
This article is the basics part of the Spark performance optimization guide. It covers development tuning and resource tuning.
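Development Tuning
Tuning overview
The first step of Spark performance optimization is to follow some basic optimization principles while developing the job. Development tuning introduces these basic Spark development principles, including RDD lineage design, sensible use of operators, and optimization of special operations. Keep these principles in mind throughout development, and apply them flexibly to your own Spark job according to the business and the actual application scenario.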
Principle 1: Avoid creating duplicate RDDs
Typically, when we develop a Spark job, we first create an initial RDD from a data source (such as a Hive table or an HDFS file), then apply an operator to it to get the next RDD, and so on until we have computed the final result. Along the way, multiple RDDs are chained together by different operators (map, reduce, and so on). This chain of RDDs is the RDD lineage, that is, the chain of dependencies between RDDs.
During development, note this: for one piece of data, create only one RDD. Do not create several RDDs that represent the same data.
Spark beginners, or experienced engineers working on a job with an extremely long RDD lineage, may forget that they already created an RDD for some data and create another one for it. The job then repeats the computation needed to build several RDDs that represent the same data, which adds to its performance overhead.
A simple example
// We need to run one map operation and then one reduce operation on the HDFS file "hello.txt",
// that is, two operator calls on the same data.

// Wrong: creating multiple RDDs to run several operators on the same data.
// textFile is called twice here, which creates two RDDs for the same HDFS file,
// and one operator is then run on each of them.
// Spark has to load hello.txt from HDFS twice and create two separate RDDs;
// the cost of the second load and of creating the second RDD is simply wasted.
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)

// Right: use a single RDD when running several operators on the same data.
// This is clearly better than the version above, because only one RDD is created for the data
// and several operators are run on that one RDD.
// Note that the optimization is not finished yet: rdd1 has two operators run on it,
// and when the second one (reduce) runs, rdd1 is recomputed from the source once more,
// so there is still repeated computation.
// To remove it completely, combine this with "Principle 3: Persist RDDs that are used multiple times",
// which ensures that an RDD used several times is computed only once.
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)
Principle 2: Reuse the same RDD as much as possible
Besides avoiding multiple RDDs for exactly the same data, we should also reuse one RDD wherever possible when running operators on different data. For example, suppose one RDD holds key-value data and another holds only values, and the values in both are exactly the same. We can then use only the key-value RDD, because it already contains the data of the other one. Whenever the data of several RDDs overlaps or one contains the other, try to reuse a single RDD. This keeps the number of RDDs down and so reduces the number of operator executions.
A simple example
// Wrong.
// There is an RDD of <Long, String> pairs, rdd1.
// For business reasons, a map operation on rdd1 creates rdd2, and rdd2 contains nothing but
// the values of rdd1; in other words, rdd2 is a subset of rdd1.
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)
// Different operators are then run on rdd1 and rdd2.
rdd1.reduceByKey(...)
rdd2.map(...)

// Right.
// In the case above, rdd1 and rdd2 differ only in data format; the data of rdd2 is entirely
// a subset of rdd1, yet two RDDs were created and an operator was run on each.
// Running map on rdd1 just to create rdd2 costs one extra operator execution and adds overhead.
// In this situation the same RDD can be reused.
// We can use rdd1 for both the reduceByKey operation and the map operation.
// In the second map operation, use only tuple._2 of each record, that is, the value in rdd1.
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

// Compared with the first approach, the second one clearly saves the cost of computing rdd2.
// The optimization is not finished yet, though: two operators are still run on rdd1,
// so rdd1 is in fact still computed twice.
// It therefore has to be combined with "Principle 3: Persist RDDs that are used multiple times",
// which ensures that an RDD used several times is computed only once.
Principle 3: Persist RDDs that are used multiple times
Once your Spark code runs several operators on the same RDD, you have completed the first optimization step: reusing RDDs as much as possible. The second step builds on it: make sure that when several operators run on one RDD, the RDD itself is computed only once.
By default, Spark handles several operators on one RDD as follows: every time you run an operator on an RDD, Spark recomputes that RDD from the source and then applies your operator to it. This performs very poorly.
Our advice for this case is to persist any RDD that is used multiple times. Spark then stores the RDD's data in memory or on disk according to your persistence strategy. Every later operator on this RDD reads the persisted data directly from memory or disk and runs on it, instead of recomputing the RDD from the source first.
Code example: persisting an RDD that is used multiple times
// To persist an RDD, call cache() or persist() on it.

// Right.
// cache() tries to persist all of the RDD's data in memory, without serialization.
// When two operators are then run on rdd1, rdd1 is computed from the source only once,
// when the first operator (map) runs.
// When the second operator (reduce) runs, the data is read directly from memory
// and the RDD is not recomputed.
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist() lets you choose the storage level by hand and persists the data accordingly.
// For example, StorageLevel.MEMORY_AND_DISK_SER persists to memory first while memory is
// sufficient, and to disk files when it is not.
// The _SER suffix means the RDD data is stored serialized: each partition of the RDD is
// serialized into one large byte array before it is persisted to memory or disk.
// Serialization reduces the memory and disk space taken by persisted data, which keeps
// persisted data from occupying too much memory and causing frequent GC.
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
With persist(), we can choose a different storage level for each business scenario.
Spark storage levels
Storage level: meaning
MEMORY_ONLY: Keeps the data in memory as deserialized Java objects. If memory cannot hold all of the data, some of it is not persisted, and the next time an operator runs on this RDD the data that was not persisted is recomputed from the source. This is the default strategy; cache() uses it.
MEMORY_AND_DISK: Uses deserialized Java objects and tries memory first. If memory cannot hold all of the data, the rest is written to disk files, and it is read back from disk the next time an operator runs on this RDD.
MEMORY_ONLY_SER: Same as MEMORY_ONLY, except that the RDD data is serialized: each partition is serialized into one byte array. This saves memory and so helps avoid the frequent GC caused by persisted data occupying too much memory.
MEMORY_AND_DISK_SER: Same as MEMORY_AND_DISK, except that the RDD data is serialized: each partition is serialized into one byte array. This saves memory and so helps avoid the frequent GC caused by persisted data occupying too much memory.
DISK_ONLY: Writes all of the data to disk files. Data on disk is always stored in serialized form.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Any of the strategies above with the suffix _2 keeps a second copy of every persisted piece of data on another node. This replica-based persistence is mainly for fault tolerance. If a node goes down and the persisted data in its memory or on its disk is lost, later computation on the RDD can use the copy on another node. Without a replica, the data can only be recomputed from the source.
How to choose the most suitable storage level
MEMORY_ONLY, the default, performs best, provided that memory is large enough to hold all the data of the RDD with room to spare. There is no serialization or deserialization, so that cost is avoided; later operators on the RDD work purely on data in memory and never read from disk files; and no replica has to be copied and sent to another node. Note, however, that in real production environments the cases where this level can be used directly are limited. If the RDD holds a lot of data (for example, billions of records), this level leads to JVM OOM (out of memory) exceptions.
If MEMORY_ONLY causes out-of-memory errors, try MEMORY_ONLY_SER. This level serializes the RDD data before keeping it in memory, so each partition is just one byte array, which greatly reduces the number of objects and lowers memory usage. Its extra cost compared with MEMORY_ONLY is mainly serialization and deserialization. Later operators still work purely in memory, so overall performance remains fairly high. The same problem can still occur: if the RDD holds too much data, an OOM exception is still possible.
If no memory-only level works, use MEMORY_AND_DISK_SER rather than MEMORY_AND_DISK. Reaching this point means the RDD is large and does not fit in memory. Serialized data is smaller and saves both memory and disk space. This level also tries to cache data in memory first and writes to disk only what does not fit.
DISK_ONLY and the levels with the _2 suffix are usually not recommended. Reading and writing entirely through disk files degrades performance sharply, and sometimes it is faster to recompute all the RDDs. The _2 levels have to replicate all data and send it to other nodes, and the copying and network transfer add considerable overhead. Use them only when the job requires high availability.
Principle 4: Avoid shuffle operators where possible
Where possible, avoid shuffle operators, because the shuffle is the most expensive part of a running Spark job. Put simply, a shuffle pulls the records with the same key, which are spread over several nodes of the cluster, onto one node for aggregation, join, or similar operations. Operators such as reduceByKey and join trigger a shuffle.
During a shuffle, each node first writes its records to local disk files, grouped by key, and other nodes then pull the records for the same key from those disk files over the network. When the records for the same key arrive on one node for aggregation, that node may have to handle so many keys that memory runs out and data spills to disk files. A shuffle can therefore involve a large amount of disk I/O and network transfer, and these two are the main reasons shuffles perform poorly.
During development, avoid operators that shuffle, such as reduceByKey, join, distinct, and repartition, whenever you can, and prefer non-shuffle operators of the map family. A Spark job with no shuffle, or with only a few shuffles, has far less overhead.
Code example: join with Broadcast and map
// A conventional join causes a shuffle,
// because records with the same key in both RDDs have to be pulled over the network
// to one node and joined by a single task.
val rdd3 = rdd1.join(rdd2)

// A join built from Broadcast + map does not cause a shuffle.
// Use Broadcast to turn the smaller RDD into a broadcast variable.
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// Inside the rdd1.map operator, all of rdd2's data is available from rdd2DataBroadcast.
// Iterate over it; if the key of a record in rdd2 equals the key of the current record
// of rdd1, the two records can be joined.
// Then combine the current record of rdd1 with the matching rdd2 data in whatever form
// you need (a String or a Tuple).
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// Note: use this approach only when rdd2 is fairly small
// (for example a few hundred MB, or one or two GB),
// because a full copy of rdd2 stays in the memory of every Executor.
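To make the broadcast join pattern concrete, here is a minimal sketch. It makes assumptions that the article does not state: the small RDD has unique keys (so it can be collected into a map), both RDDs hold (Long, String) pairs, and the job runs in local mode. The names `BroadcastJoinSketch` and `broadcastJoin` are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastJoinSketch {
  // Inner-joins a large pair RDD against a small one without a shuffle.
  // Assumption: `small` fits in Driver and Executor memory and has unique keys.
  def broadcastJoin(sc: SparkContext,
                    large: RDD[(Long, String)],
                    small: RDD[(Long, String)]): RDD[(Long, (String, String))] = {
    // Pull the small side to the Driver and ship one read-only copy to each Executor.
    val lookup = sc.broadcast(small.collectAsMap())
    // Map-side join: keep only the records whose key exists in the lookup table.
    large.flatMap { case (key, left) =>
      lookup.value.get(key).map(right => (key, (left, right))).toList
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-join-sketch"))
    try {
      val orders = sc.parallelize(Seq((1L, "order-a"), (2L, "order-b"), (3L, "order-c")))
      val users = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
      broadcastJoin(sc, orders, users).collect().foreach(println)
    } finally sc.stop()
  }
}
```

If the small side can contain duplicate keys, a map from key to a list of values would be needed instead of `collectAsMap()`, which keeps only one value per key.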
Principle 5: Use shuffle operations with map-side pre-aggregation
If the business logic requires a shuffle and a map-type operator cannot replace it, prefer operators that can pre-aggregate on the map side.
Map-side pre-aggregation means aggregating the records with the same key once locally on each node, similar to the local combiner in MapReduce. After pre-aggregation, each node holds only one record per key, because the records with the same key have been merged. When other nodes pull the records for a key from all nodes, far less data has to be transferred, which reduces disk I/O and network overhead. As a rule, where possible, use reduceByKey or aggregateByKey instead of groupByKey. Both reduceByKey and aggregateByKey pre-aggregate the records with the same key locally on each node using the user-defined function. groupByKey does not pre-aggregate; the full data set is distributed and transferred between the nodes of the cluster, so it performs comparatively poorly.
The two figures for this principle show a typical example: word count based on groupByKey and on reduceByKey. The first figure shows how groupByKey works: with no local aggregation, all of the data is transferred between cluster nodes. The second figure shows how reduceByKey works: the records with the same key on each node are pre-aggregated locally and only then transferred to other nodes for the global aggregation.
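The word count comparison can be sketched as follows. This is an illustration written for this guide, not code from the figures; the object and method names and the local-mode setup are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountSketch {
  // reduceByKey: each map task first sums its own (word, 1) pairs, so at most one
  // record per distinct word leaves each partition during the shuffle.
  def countWithReduceByKey(sc: SparkContext, words: Seq[String]): Map[String, Int] =
    sc.parallelize(words, 2).map(word => (word, 1)).reduceByKey(_ + _).collect().toMap

  // groupByKey: every (word, 1) pair is shuffled as-is and summed only afterwards.
  def countWithGroupByKey(sc: SparkContext, words: Seq[String]): Map[String, Int] =
    sc.parallelize(words, 2).map(word => (word, 1)).groupByKey().mapValues(_.sum).collect().toMap

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("word-count-sketch"))
    try {
      val words = Seq("hello", "world", "hello", "spark", "hello")
      println(countWithReduceByKey(sc, words))
      println(countWithGroupByKey(sc, words))
    } finally sc.stop()
  }
}
```

Both functions return the same counts; they differ only in how much data crosses the shuffle.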

Principle 6: Use high-performance operators
Besides the shuffle-related operators, other operators have optimization guidelines of their own.
Use reduceByKey/aggregateByKey instead of groupByKey
See "Principle 5: Use shuffle operations with map-side pre-aggregation".
Use mapPartitions instead of plain map
With operators of the mapPartitions family, one function call processes all the data of a partition instead of a single record, so performance is somewhat higher. Sometimes, however, mapPartitions causes OOM (out of memory) problems. A single function call has to process the whole partition, and if memory is short, garbage collection cannot reclaim enough objects, so an OOM exception is quite likely. Use this kind of operation with care.
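A minimal sketch of the idea, assuming a job that parses date strings: the relatively expensive `SimpleDateFormat` is created once per partition rather than once per record. The object and method names are made up for illustration.

```scala
import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object MapPartitionsSketch {
  // Parses "yyyy-MM-dd" strings into epoch milliseconds.
  def parseDates(dates: RDD[String]): RDD[Long] =
    dates.mapPartitions { records =>
      // Per-partition setup: one formatter for all records of this partition.
      val format = new SimpleDateFormat("yyyy-MM-dd")
      // Iterator.map is lazy, so records are still handled one at a time and the
      // partition is not materialized in memory as a whole.
      records.map(text => format.parse(text).getTime)
    }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("map-partitions-sketch"))
    try {
      parseDates(sc.parallelize(Seq("2016-01-01", "2016-01-02"), 2)).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Returning a lazily mapped iterator, as here, avoids buffering the whole partition, which is one way to reduce the OOM risk described above. Collecting the partition into a list inside the function would bring that risk back.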
Use foreachPartition instead of foreach
The idea is the same as in "Use mapPartitions instead of plain map": one function call processes all the data of a partition instead of a single record. In practice, foreachPartition helps performance considerably. Suppose the function passed to foreach writes all of the RDD's data to MySQL. With plain foreach, records are written one by one, and each function call may create a database connection, so connections are created and destroyed constantly and performance is very poor. With foreachPartition, which processes a whole partition at once, only one database connection is needed per partition, followed by a batch insert, and performance is much better. In practice, for writing about 10,000 records to MySQL, performance improved by more than 30%.
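The per-partition connection pattern can be sketched as below. To keep the example runnable without a database, `FakeConnection` is a hypothetical stand-in for a real JDBC connection, and two accumulators count how many connections were opened and how many rows were written. A real job would open a JDBC connection at the marked line and use a prepared statement with batched inserts.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object ForeachPartitionSketch {
  // Hypothetical stand-in for a database connection (not a real JDBC class).
  final class FakeConnection(written: LongAccumulator) {
    def writeBatch(rows: Seq[(Long, String)]): Unit = written.add(rows.size.toLong)
    def close(): Unit = ()
  }

  // Writes the RDD using one connection per partition and batches of `batchSize` rows.
  // Returns (connections opened, rows written).
  def writeAll(sc: SparkContext, rdd: RDD[(Long, String)], batchSize: Int): (Long, Long) = {
    val opened = sc.longAccumulator("connectionsOpened")
    val written = sc.longAccumulator("rowsWritten")
    rdd.foreachPartition { rows =>
      val conn = new FakeConnection(written) // a real job would open a JDBC connection here
      opened.add(1L)
      try rows.grouped(batchSize).foreach(batch => conn.writeBatch(batch))
      finally conn.close()
    }
    (opened.sum, written.sum)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("foreach-partition-sketch"))
    try {
      val rows = sc.parallelize((1L to 100L).map(i => (i, s"row-$i")), 4)
      println(writeAll(sc, rows, batchSize = 30))
    } finally sc.stop()
  }
}
```

With four partitions, four connections are opened for 100 rows, where a per-record foreach would open up to 100.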
Run coalesce after filter
After a filter operator has removed a large share of an RDD's data (for example more than 30%), it is advisable to use coalesce to reduce the number of partitions by hand and pack the data into fewer partitions. After the filter, every partition has lost many records. If later computation proceeds as usual, each task handles a partition with little data in it, which wastes some resources, and the more tasks there are, the slower the job may become. Once coalesce has reduced the number of partitions, fewer tasks are enough to process them all. In some scenarios this helps performance.
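A minimal sketch of filter followed by coalesce. The filter condition (keep multiples of 10) and the partition counts are arbitrary values chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoalesceSketch {
  // Filters out about 90% of the records, then packs the rest into fewer partitions.
  def filterAndCompact(numbers: RDD[Int], targetPartitions: Int): RDD[Int] =
    numbers
      .filter(_ % 10 == 0)        // keeps roughly 10% of the records
      .coalesce(targetPartitions) // merges partitions; no shuffle by default

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("coalesce-sketch"))
    try {
      val compact = filterAndCompact(sc.parallelize(1 to 1000, 20), 4)
      println(s"partitions = ${compact.getNumPartitions}, records = ${compact.count()}")
    } finally sc.stop()
  }
}
```

Later stages on the returned RDD run 4 tasks instead of 20.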
Use repartitionAndSortWithinPartitions instead of repartition followed by sort
repartitionAndSortWithinPartitions is an operator recommended on the Spark website. The official advice is that if you need to sort after repartitioning, you should use repartitionAndSortWithinPartitions directly, because it sorts while it performs the repartitioning shuffle. Doing the shuffle and the sort together is likely to be faster than shuffling first and sorting afterwards.
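A short sketch of the operator on a pair RDD. The choice of `HashPartitioner` and the sample data are assumptions made for illustration; any `Partitioner` can be passed.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object RepartitionSortSketch {
  // Hash-partitions the pairs by key and sorts each partition by key in the same shuffle.
  def repartitionSorted(pairs: RDD[(Int, String)], numPartitions: Int): RDD[(Int, String)] =
    pairs.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("repartition-sort-sketch"))
    try {
      val pairs = sc.parallelize(Seq((5, "e"), (2, "b"), (4, "d"), (1, "a"), (3, "c")), 3)
      // glom() turns each partition into an array so the per-partition order is visible.
      repartitionSorted(pairs, 2).glom().collect().foreach(p => println(p.mkString(", ")))
    } finally sc.stop()
  }
}
```

Note that the result is sorted within each partition only, not across partitions.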
Principle 7: Broadcast large variables
Sometimes an operator function needs to use an external variable, in particular a large one such as a collection of 100 MB or more. In that case, use Spark's broadcast feature to improve performance.
When an operator function uses an external variable, Spark by default makes multiple copies of it and sends them over the network to the tasks, so every task has its own copy. If the variable is large (say 100 MB, or even 1 GB), the cost of transferring many copies over the network, plus the frequent GC caused by the copies occupying too much memory in the Executors on each node, hurts performance badly.
So if the external variable is large, broadcast it with Spark's broadcast feature. A broadcast variable is guaranteed to have only one copy in the memory of each Executor, and the tasks in that Executor share it. This greatly reduces the number of copies, which cuts network transfer cost, lowers Executor memory usage, and reduces GC frequency.
Code example: broadcasting a large variable
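// The following code uses an external variable inside an operator function.
// Nothing special is done here, so every task gets its own copy of list1.
val list1 = ...
rdd1.map(list1...)

// The following code wraps list1 in a broadcast variable of type Broadcast.
// When the operator function uses the broadcast variable, Spark first checks whether the
// memory of the Executor running the current task already holds a copy.
// If it does, that copy is used; if not, a copy is pulled from the Driver or from another
// Executor node and placed in the local Executor's memory.
// Only one copy of the broadcast variable stays in the memory of each Executor.
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)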
Principle 8: Use Kryo to speed up serialization
In Spark, serialization is involved in three main places:
When an operator function uses an external variable, the variable is serialized and sent over the network (see "Principle 7: Broadcast large variables").
When a custom type is used as the type parameter of an RDD (for example JavaRDD<Student>, where Student is a custom type), all objects of that type are serialized. The custom class must therefore implement the Serializable interface.
When a serialized storage level is used (for example MEMORY_ONLY_SER), Spark serializes each partition of the RDD into one large byte array.
In all three places, the Kryo serialization library can improve serialization and deserialization performance. By default Spark uses Java serialization, that is, the ObjectOutputStream/ObjectInputStream API. Spark also supports the Kryo serialization library, which performs much better than Java serialization; according to the official documentation, Kryo is about 10 times faster. Spark does not use Kryo by default because Kryo works best when all custom types that need serialization are registered, which is somewhat troublesome for developers.
The following code shows how to use Kryo. We only need to set the serializer class and register the custom types to be serialized (for example, the types of external variables used in operator functions, or custom types used as RDD type parameters):
// Create the SparkConf object.
val conf = new SparkConf().setMaster(...).setAppName(...)
// Set the serializer to KryoSerializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Register the custom types to be serialized.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
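Principle 9: Optimize data structures
In Java, three kinds of types consume a lot of memory:
Objects. Every Java object carries extra information such as an object header and references, so it takes up more memory.
Strings. Every string contains a character array plus extra information such as its length.
Collection types such as HashMap and LinkedList, because collections usually wrap their elements in internal classes such as Map.Entry.
Spark therefore officially recommends that in Spark code, especially in operator functions, you avoid these three kinds of data structures where you can: use strings instead of objects, primitive types (such as Int and Long) instead of strings, and arrays instead of collection types. This keeps memory usage as low as possible, which lowers GC frequency and improves performance.
In the author's coding practice, however, this principle has proved hard to follow, because code maintainability matters too. Code with no object abstraction at all, built entirely on string concatenation, is a disaster for later maintenance and changes. Likewise, implementing everything on arrays without collection types such as HashMap and LinkedList makes coding much harder and the code much less maintainable. The author's advice is to use data structures with a smaller memory footprint where it is possible and appropriate, provided that the code stays maintainable.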
Resource Tuning
Tuning overview
Once a Spark job has been developed, it needs suitable resources. Almost all of Spark's resource parameters can be set as arguments of the spark-submit command. Many Spark beginners do not know which parameters they need to set or how to set them, and end up setting them at random or not at all. Badly chosen resource parameters may leave cluster resources underused, so that the job runs extremely slowly; or they may request more than the queue can provide, which leads to all kinds of exceptions. Either way, the job runs inefficiently or does not run at all. We therefore need a clear picture of how a Spark job uses resources, which resource parameters can be set while it runs, and how to choose suitable values.
Basic principles of how a Spark job runs

The figure above shows the details. After we submit a Spark job with spark-submit, the job starts a corresponding Driver process. Depending on the deploy mode (deploy-mode), the Driver process starts either locally or on a worker node in the cluster. The Driver process itself takes a certain amount of memory and a certain number of CPU cores, according to the parameters we set. The first thing the Driver does is ask the cluster manager for the resources the job needs, and these resources are the Executor processes. The cluster manager can be a Spark Standalone cluster or another resource management cluster; Meituan-Dianping uses YARN. According to the resource parameters we set for the job, the YARN cluster manager starts a certain number of Executor processes on the worker nodes, each with a certain amount of memory and a certain number of CPU cores.
Once the resources are allocated, the Driver process starts scheduling and running our job code. It splits the code into several stages, each of which runs part of the code, creates a batch of tasks for each stage, and assigns the tasks to the Executor processes. A task is the smallest unit of computation. All tasks of a stage run exactly the same logic (a fragment of the code we wrote) and differ only in the data they process. When all tasks of a stage have finished, they have written their intermediate results to local disk files on each node, and the Driver schedules the next stage. The input of the next stage's tasks is the intermediate output of the previous stage. This repeats until all of our code has run, all data has been processed, and we have the result we want.
Spark splits stages at shuffle operators. If our code runs a shuffle operator (such as reduceByKey or join), a stage boundary is drawn at that operator. Roughly speaking, the code before the shuffle operator forms one stage, and the shuffle operator together with the code after it forms the next stage. So when a stage starts, each of its tasks may pull all the keys it has to process over the network from the nodes where the previous stage's tasks ran, and then aggregate the records with the same key using the operator function we wrote (for example the function passed to reduceByKey()). This process is the shuffle.
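One way to see the stage boundary in practice is to print an RDD's lineage with `toDebugString`. In the sketch below, which uses a made-up word count job in local mode, the lineage contains a `ShuffledRDD` produced by reduceByKey; the operators before it belong to the earlier stage.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LineageSketch {
  // Builds a small word count and returns a textual description of its lineage.
  def wordCountLineage(sc: SparkContext): String =
    sc.parallelize(Seq("a b", "b c"), 2)
      .flatMap(_.split(" "))   // narrow dependency: stays in the same stage
      .map(word => (word, 1))  // narrow dependency: stays in the same stage
      .reduceByKey(_ + _)      // shuffle operator: a new stage starts here
      .toDebugString

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lineage-sketch"))
    try println(wordCountLineage(sc))
    finally sc.stop()
  }
}
```

The exact text of the output varies between Spark versions, so it is best read as a debugging aid rather than parsed.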
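When our code performs persistence operations such as cache or persist, the data computed by each task is also saved to the memory of the Executor process or to disk files on its node, depending on the storage level we chose.
Executor memory is therefore divided into three main parts. The first is used by tasks to run the code we wrote, and by default takes 20% of the Executor's total memory. The second is used by tasks for aggregation and similar operations after they have pulled the previous stage's output through the shuffle, and by default also takes 20%. The third is used for RDD persistence, and by default takes 60%. This split describes the static memory model that the spark.storage.memoryFraction and spark.shuffle.memoryFraction parameters belong to.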
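To get a feel for the numbers, the following sketch applies the article's simplified 60% / 20% / 20% split to a given Executor size. It is plain arithmetic, not a Spark API, and it ignores any safety margins that Spark applies internally; the names are invented for illustration.

```scala
object ExecutorMemorySketch {
  final case class MemorySplit(storageMb: Long, shuffleMb: Long, userCodeMb: Long)

  // Splits Executor memory according to the storage and shuffle fractions;
  // whatever is left over is available to user code.
  def split(executorMemoryMb: Long,
            storageFraction: Double = 0.6,
            shuffleFraction: Double = 0.2): MemorySplit = {
    require(storageFraction >= 0 && shuffleFraction >= 0 &&
      storageFraction + shuffleFraction <= 1.0, "fractions must sum to at most 1")
    val storage = (executorMemoryMb * storageFraction).toLong
    val shuffle = (executorMemoryMb * shuffleFraction).toLong
    MemorySplit(storage, shuffle, executorMemoryMb - storage - shuffle)
  }

  def main(args: Array[String]): Unit = {
    println(split(6144))           // default fractions for a 6 GB Executor
    println(split(6144, 0.5, 0.3)) // more room for shuffle, less for persistence
  }
}
```

Lowering both fractions is how the user-code share grows, which is why the tuning advice for frequent GC is to reduce them.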
Task execution speed depends directly on the number of CPU cores of each Executor process. One CPU core can run only one thread at a time, and the tasks assigned to an Executor process run concurrently, one thread per task. If there are enough CPU cores and the number of assigned tasks is reasonable, these task threads usually finish quickly and efficiently.
That is the basic principle of how a Spark job runs; it is easier to follow together with the figure above. Understanding it is the precondition for tuning resource parameters.
Resource parameter tuning
With the basic principles in mind, the resource parameters are easy to understand. Resource parameter tuning means adjusting the parameters that govern each place where Spark uses resources, so that resources are used more efficiently and the job runs faster. The parameters below are Spark's main resource parameters. Each corresponds to a part of the execution model described above, and each comes with a reference value for tuning.
num-executors
Description: This parameter sets the total number of Executor processes the Spark job uses. When the Driver requests resources from the YARN cluster manager, YARN tries to start that many Executor processes on the worker nodes of the cluster. The parameter is very important. If it is not set, only a small number of Executor processes are started by default, and the job runs very slowly.
Tuning advice: 50 to 100 Executor processes is usually suitable for a Spark job. Too few and cluster resources are not fully used; too many and most queues cannot provide enough resources.
executor-memory
Description: This parameter sets the memory of each Executor process. The size of Executor memory often determines the performance of a Spark job directly, and it is also directly related to the common JVM OOM exceptions.
Tuning advice: 4 GB to 8 GB per Executor process is suitable. This is only a reference value; the actual setting depends on the resource queue of your department. Check the maximum memory limit of your team's queue: num-executors multiplied by executor-memory must not exceed it. If you share the queue with other people in your team, the memory you request should preferably stay below 1/3 to 1/2 of the queue's total memory, so that your job does not take all of the queue's resources and block your colleagues' jobs.
executor-cores
Description: This parameter sets the number of CPU cores of each Executor process. It determines how many task threads each Executor process can run in parallel. Because one CPU core can run only one task thread at a time, the more CPU cores an Executor process has, the faster it finishes the task threads assigned to it.
Tuning advice: 2 to 4 CPU cores per Executor is suitable. This too depends on your department's resource queue: check the queue's maximum number of CPU cores and, given the number of Executors you set, decide how many cores each Executor process can get. Likewise, if you share the queue with others, keep num-executors * executor-cores below about 1/3 to 1/2 of the queue's total CPU cores, so that other people's jobs are not affected.
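The queue constraints for memory and cores can be checked with a few lines of arithmetic. The sketch below is a hypothetical helper, not part of Spark; the queue limits in the example are invented numbers.

```scala
object QueueBudgetSketch {
  final case class Request(numExecutors: Int, executorMemoryGb: Int, executorCores: Int)
  final case class Queue(maxMemoryGb: Int, maxCores: Int)

  // True when the request stays within `share` (for example 0.5) of the queue's
  // total memory and total CPU cores.
  def fits(request: Request, queue: Queue, share: Double): Boolean = {
    val memoryGb = request.numExecutors.toLong * request.executorMemoryGb
    val cores = request.numExecutors.toLong * request.executorCores
    memoryGb <= queue.maxMemoryGb * share && cores <= queue.maxCores * share
  }

  def main(args: Array[String]): Unit = {
    val request = Request(numExecutors = 100, executorMemoryGb = 6, executorCores = 4)
    // 100 * 6 GB = 600 GB and 100 * 4 = 400 cores.
    println(fits(request, Queue(maxMemoryGb = 2000, maxCores = 1200), share = 0.5)) // true
    println(fits(request, Queue(maxMemoryGb = 1000, maxCores = 1000), share = 0.5)) // false
  }
}
```

A check like this can run before submission to catch requests that a shared queue cannot satisfy.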
driver-memory
Description: This parameter sets the memory of the Driver process.
Tuning advice: The Driver memory usually does not need to be set, or about 1 GB is enough. The one thing to watch is this: if you use the collect operator to pull all of an RDD's data to the Driver for processing, the Driver must have enough memory, otherwise it will run out of memory (OOM).
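spark.default.parallelism
Description: This parameter sets the default number of tasks per stage (more precisely, the default number of partitions used by shuffle operators such as reduceByKey and join when none is given). It is extremely important; leaving it unset may directly affect the performance of your Spark job.
Tuning advice: 500 to 1000 default tasks is suitable for a Spark job. A common mistake is not setting this parameter at all. Spark then sets the number of tasks from the number of blocks in the underlying HDFS files, by default one task per HDFS block. The number Spark chooses by default is usually too low (for example a few dozen tasks), and with too few tasks the Executor parameters you set earlier are wasted. No matter how many Executor processes you have or how much memory and CPU they have, if there are only 1 or 10 tasks, 90% of the Executor processes may have no task to run at all, and their resources are simply wasted. The Spark website therefore recommends setting this parameter to 2 to 3 times num-executors * executor-cores. For example, if the Executors have 300 CPU cores in total, 1000 tasks is a reasonable setting and makes full use of the cluster's resources.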
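The 2 to 3 times rule is simple enough to compute directly. The helper below is a hypothetical sketch, not a Spark API; it returns the range suggested by the rule for a given Executor configuration.

```scala
object ParallelismSketch {
  // Suggested range for spark.default.parallelism: 2x to 3x the total Executor cores.
  def recommendedParallelism(numExecutors: Int, executorCores: Int): Range.Inclusive = {
    val totalCores = numExecutors * executorCores
    (2 * totalCores) to (3 * totalCores)
  }

  def main(args: Array[String]): Unit = {
    val range = recommendedParallelism(numExecutors = 75, executorCores = 4)
    println(s"total cores = ${75 * 4}, suggested parallelism = ${range.start} to ${range.end}")
  }
}
```

For 300 cores this gives 600 to 900. The article's figure of 1000 is slightly above that range, which suggests the rule is meant as a rough guide rather than a hard limit.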
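spark.storage.memoryFraction
Description: This parameter sets the share of Executor memory that persisted RDD data may occupy. The default is 0.6, so by default 60% of an Executor's memory can hold persisted RDD data. Depending on the storage level you choose, when memory is insufficient the data is either not persisted or written to disk. This is a setting of the legacy static memory manager; Spark 1.6 and later default to unified memory management, which is configured with spark.memory.fraction and spark.memory.storageFraction instead.
Tuning advice: If the job has many RDD persistence operations, raise this value so that the persisted data fits in memory. Otherwise memory cannot cache all of the data, the rest goes to disk, and performance drops. If the job has many shuffle operations and few persistence operations, lowering this value is more appropriate. Also, if the job runs slowly because of frequent GC (the GC time of a job is visible in the Spark web UI), tasks do not have enough memory for user code, and lowering this value is likewise advisable.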
spark.shuffle.memoryFraction
Description: This parameter sets the share of Executor memory that a task may use for aggregation after it has pulled the output of the previous stage's tasks during a shuffle. The default is 0.2, so by default only 20% of an Executor's memory is available for this. If a shuffle aggregation uses more memory than this limit, the excess data spills to disk files, which degrades performance severely.
Tuning advice: If the job has few RDD persistence operations and many shuffle operations, lower the memory share for persistence and raise the share for shuffle. This keeps the shuffle from running out of memory when there is a lot of data and spilling to disk. Also, if the job runs slowly because of frequent GC, tasks do not have enough memory for user code, and lowering this value is likewise advisable.
There is no fixed value for resource parameters. Set them sensibly according to your own situation (including the number of shuffle operations and RDD persistence operations in the job, and the GC behavior shown in the Spark web UI), together with the principles and tuning advice given in this article.
Reference example for resource parameters
The following is an example spark-submit command. Use it as a reference and adjust it to your own situation:
./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \
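Most of these settings can also be expressed in code through SparkConf. The sketch below maps the command-line flags to the configuration keys `spark.executor.instances`, `spark.executor.memory`, and `spark.executor.cores`; the object and method names are made up. Driver memory is left out on purpose: in client mode the Driver JVM has already started by the time application code runs, so it has to be set at submission time.

```scala
import org.apache.spark.SparkConf

object SubmitConfSketch {
  // Builds a SparkConf carrying the same resource settings as the command above.
  def tunedConf(): SparkConf =
    new SparkConf()
      .set("spark.executor.instances", "100")      // --num-executors 100
      .set("spark.executor.memory", "6g")          // --executor-memory 6G
      .set("spark.executor.cores", "4")            // --executor-cores 4
      .set("spark.default.parallelism", "1000")
      .set("spark.storage.memoryFraction", "0.5")  // legacy static memory model setting
      .set("spark.shuffle.memoryFraction", "0.3")  // legacy static memory model setting

  def main(args: Array[String]): Unit =
    tunedConf().getAll.sortBy(_._1).foreach { case (key, value) => println(s"$key=$value") }
}
```

Keeping resource settings on the command line, as the article does, is usually more flexible, because the same application can then be sized differently per environment.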
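Closing remarks
In our experience, most Spark jobs run with fairly high performance after the development tuning and resource tuning described in this basics part, which is enough to meet our needs. In other production environments and project settings, however, you may run into harder problems (such as various kinds of data skew) or stricter performance requirements. Dealing with those challenges calls for more advanced techniques.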