Basic Concepts and Principles
First, you need a clear grasp of a few basic Spark concepts and principles; otherwise there is no way to even begin tuning the system's performance:

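Each host can run N workers in parallel, and each worker can run M executors in parallel; tasks are assigned to executors for execution. A stage is a group of tasks that run in parallel. No shuffle may occur inside a stage, because a shuffle acts like a fence that blocks parallel tasks from proceeding. Hitting a shuffle therefore means you have reached a stage boundary.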
CPU cores. Each executor can occupy one or more cores. Watching how CPU utilization changes tells you how well compute resources are being used. For example, a very common waste is an executor that holds several cores while overall CPU utilization stays low, because a single executor cannot always make full use of multiple cores. In that case, consider giving each executor fewer cores and running more executors in parallel. You can do this by adding executors per worker or adding workers per host, which raises CPU utilization. When adding executors, however, account for memory consumption. The more executors a machine's memory is divided among, the less memory each executor gets. That can lead to excessive data spill-over or even out-of-memory errors.
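Partitions and parallelism. The number of partitions is the number of data slices, and each task processes exactly one partition. Too few partitions make each slice too large, which causes memory pressure or leaves many executors' compute capacity unused. Too many partitions create too many slices and reduce efficiency. When a shuffle operation such as reduceByKey runs and no parallelism is configured, the number of partitions defaults to the largest partition count among the parent RDDs. Parallelism, on the other hand, is the default number of partitions returned by reduce-type operations on an RDD. For map-type operations, the partition count is usually inherited from the parent RDD and no shuffle is involved, so the parallelism setting has no effect. The two concepts are therefore closely related: both concern how data is partitioned, and they act in the same way. spark.default.parallelism sets the default partition count, and many RDD operations also accept a partition-count argument to control the number of partitions explicitly.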
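The sketch below shows how these settings interact. It assumes Spark 2.x or later, spark-core on the classpath and a local master; the object name, data and partition counts are illustrative. Map-side operations keep the parent's partition count. A shuffle such as reduceByKey falls back to spark.default.parallelism. An explicit numPartitions argument overrides both.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParallelismDemo {
  // Returns (input, after mapValues, after reduceByKey with defaults, after reduceByKey with an explicit count).
  def partitionCounts(sc: SparkContext): (Int, Int, Int, Int) = {
    val pairs = sc.parallelize(1 to 1000, numSlices = 8).map(i => (i % 10, 1))
    val mapped = pairs.mapValues(_ * 2)                // narrow: keeps the parent's 8 partitions
    val reducedDefault = pairs.reduceByKey(_ + _)      // shuffle: uses spark.default.parallelism
    val reducedExplicit = pairs.reduceByKey(_ + _, 16) // shuffle: the explicit count wins
    (pairs.getNumPartitions, mapped.getNumPartitions,
      reducedDefault.getNumPartitions, reducedExplicit.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("parallelism-demo")
      .setMaster("local[2]")
      .set("spark.default.parallelism", "4")
    val sc = new SparkContext(conf)
    try println(partitionCounts(sc)) finally sc.stop()
  }
}
```

If spark.default.parallelism were left unset, the default reduceByKey would instead use the largest upstream partition count, 8 in this sketch.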
These two principles look simple but are very important. The right values depend on the hardware and the job; expecting one configuration that works everywhere is unrealistic. Consider a few examples:
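(1) Some EMR Spark jobs we ran in practice were especially slow, and their CPU utilization turned out to be very low. We reduced the number of CPU cores per executor, increased the number of parallel executors and added more partitions. Overall this raised CPU utilization and sped up data processing.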
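(2) One job was prone to running out of memory. We increased the number of partitions, which reduced the size of each slice, and we also reduced the number of parallel executors. The same memory was then shared by fewer executors, which effectively gave each task more memory. The job may run somewhat slower this way, but that beats an OOM.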
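(3) When there is very little data and lots of small files are being generated, reduce the number of partitions; there is no need to create that many tasks. If only the original input is small, this is usually noticed. But when the data shrinks sharply during processing, for example after a reduceBy or a filter, this inefficiency often goes unnoticed.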
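Example (3) can be sketched as follows: once a filter has removed most of the data, shrink the partition count with coalesce. The sketch assumes Spark 2.x or later; the sizes are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ShrinkAfterFilter {
  // Start wide: 200 input partitions. The selective filter keeps only 100 records,
  // so almost every partition would end up empty or nearly empty.
  def filterAndCompact(sc: SparkContext): RDD[Int] = {
    val raw = sc.parallelize(1 to 100000, numSlices = 200)
    val rare = raw.filter(_ % 1000 == 0)
    // coalesce merges partitions without a full shuffle (shuffle = false by default),
    // so the following stages launch 4 tasks instead of 200.
    rare.coalesce(4)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shrink-after-filter").setMaster("local[2]"))
    try {
      val compact = filterAndCompact(sc)
      println(s"partitions=${compact.getNumPartitions}, records=${compact.count()}")
    } finally sc.stop()
  }
}
```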
One more point: the performance bottleneck shifts as parameters and configuration change, so keep that in mind when analyzing problems. For example, when we increased the number of executors deployed on each machine, performance improved at first and average CPU utilization rose. But as the number of executors per machine kept growing, performance dropped. Each executor received less memory, so less work could be done directly in memory and more data spilled over to disk, and performance naturally got worse.
Here is an illustrative example, in which overall CPU utilization is low at first:

After adjustments based on the principles above, overall CPU utilization increased noticeably:

Second, performance tuning often means changing configuration. Spark has three common ways to configure things. Some parameters can be set in more than one of them, but as a best practice, use each one for its own kind of setting:
1. Environment variables, mainly for settings tied to the environment and hardware;
2. Command-line arguments, which start with a double dash, mainly for parameters that change from run to run;
3. Explicit settings in code (for example, in Scala via a SparkConf object), which are usually application-level and rarely change.
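The sketch below shows the three layers. It assumes Spark 2.x or later; the app name, jar name and values are hypothetical. According to the Spark configuration docs, properties set directly on a SparkConf take precedence over spark-submit flags, which in turn take precedence over spark-defaults.conf. Settings fixed in code therefore cannot be overridden from the command line.

```scala
import org.apache.spark.SparkConf

object ConfigLayers {
  // 1. Environment/hardware settings usually live in conf/spark-env.sh, for example:
  //      export SPARK_WORKER_CORES=16
  // 2. Settings that change between runs are usually passed on the command line:
  //      spark-submit --executor-memory 8g --conf spark.default.parallelism=200 <application-jar>
  // 3. Application-level settings that rarely change can be fixed in code:
  def appConf(): SparkConf =
    new SparkConf()
      .setAppName("nightly-aggregation")  // hypothetical application name
      .set("spark.task.maxFailures", "8") // illustrative value; the default is 4

  def main(args: Array[String]): Unit =
    appConf().getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }
}
```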
Here is a concrete configuration example: adjusting the ratio between slaves, workers and executors. We often need to change the number of executors running in parallel. Broadly, there are two ways to do it:
1. Always run one executor per worker, but change the number of workers running in parallel on each slave. For example, SPARK_WORKER_INSTANCES sets the number of workers per slave. When you change it, say to 2, you must also set SPARK_WORKER_CORES so that each worker uses half of the original cores; only then can the two workers run side by side;
2. Always deploy a single worker per slave, but run multiple executors inside it. This is how we changed the executor count under YARN. A typical approach is to run one worker per host, set spark.executor.cores to 1/N of the host's CPU cores, and set spark.executor.memory to 1/N of the host memory allocated to Spark. The host can then launch N executors.
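The arithmetic behind option 2 can be sketched in plain Scala. It also shows the executor-count trade-off described earlier: per-executor memory shrinks as N grows. The host size (32 cores, 200 GB for Spark) and the helper names are hypothetical, and YARN's per-executor memory overhead is ignored here.

```scala
object ExecutorSizing {
  final case class ExecutorShape(executorsPerHost: Int, coresPerExecutor: Int, memoryPerExecutorGb: Int)

  // Split one host's cores and Spark memory evenly across n executors.
  def split(hostCores: Int, sparkMemoryGb: Int, n: Int): ExecutorShape = {
    require(n > 0 && n <= hostCores, "need between 1 and hostCores executors")
    ExecutorShape(n, hostCores / n, sparkMemoryGb / n)
  }

  // The matching settings, e.g. for spark-submit --conf or SparkConf.set.
  def asConf(s: ExecutorShape): Map[String, String] = Map(
    "spark.executor.cores"  -> s.coresPerExecutor.toString,
    "spark.executor.memory" -> s"${s.memoryPerExecutorGb}g"
  )

  def main(args: Array[String]): Unit =
    for (n <- Seq(1, 2, 4, 8)) println(split(hostCores = 32, sparkMemoryGb = 200, n = n))
}
```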
Some settings differ between MR frameworks and tools. For example, some parameters have different default values under YARN, so watch out for that.
With these basics clear, let us go through the key points of performance tuning one by one.
Memory
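Memory tuning. Java objects can take 2 to 5 times the space of the raw data, or even more. The best way to measure an object's memory footprint is to create an RDD, cache it, and watch the Storage page in the UI; you can also estimate it with SizeEstimator. The -XX:+UseCompressedOops option compresses pointers from 8 bytes to 4. 64-bit HotSpot JVMs already enable it by default for heaps smaller than 32 GB. Be careful when calling APIs such as collect, and know exactly when you are copying a large chunk of data into memory. Leave some memory for the operating system, for example 20%, which also covers the OS buffer cache. If you reserve too little, you may see an error like this: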
Required executor memory (235520+23552 MB) is above the max threshold (241664 MB) of this cluster!
Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.
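The numbers in this error fit a YARN-mode overhead of max(384 MB, 10% of spark.executor.memory) added to each executor container. The setting was spark.yarn.executor.memoryOverhead in Spark 1.x and is spark.executor.memoryOverhead in newer versions. The small sketch below assumes that formula and reproduces the check: 235520 + 23552 = 259072 MB, which exceeds the 241664 MB limit.

```scala
object YarnOverhead {
  // Assumed default overhead, matching the figures in the error message above.
  val MinOverheadMb: Long = 384L
  val OverheadFactor: Double = 0.10

  def overheadMb(executorMemoryMb: Long): Long =
    math.max(MinOverheadMb, (executorMemoryMb * OverheadFactor).toLong)

  // Size of the container Spark asks YARN for: heap plus overhead.
  def requestMb(executorMemoryMb: Long): Long =
    executorMemoryMb + overheadMb(executorMemoryMb)

  // YARN rejects the request if it exceeds yarn.scheduler.maximum-allocation-mb.
  def fits(executorMemoryMb: Long, maxAllocationMb: Long): Boolean =
    requestMb(executorMemoryMb) <= maxAllocationMb

  def main(args: Array[String]): Unit = {
    val heap = 235520L
    val max  = 241664L
    println(s"request=${requestMb(heap)} MB, max=$max MB, fits=${fits(heap, max)}")
  }
}
```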
Or there may be no such error at all, yet problems caused by insufficient memory still occur. Some of them come with a warning, such as this one:
16/01/13 23:54:48 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Sometimes even such logs are absent, and all you see are messages about executors being lost for no clear reason:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 17.0 failed 4 times, most recent failure: Lost task 12.3 in stage 17.0 (TID 1257, ip-10-184-192-56.ec2.internal): ExecutorLostFailure (executor 79 lost)
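Memory use of reduce tasks. In some cases reduce tasks are especially memory-hungry. This happens when a shuffle occurs in operations such as sortByKey, groupByKey, reduceByKey and join, which build a huge hash table in memory. One remedy is to increase the level of parallelism so that each task's input shrinks accordingly. Also check the shuffle memory limit, which is spark.shuffle.memoryFraction under the legacy memory manager used before Spark 1.6. Sometimes there is plenty of memory overall, but performance still will not improve if shuffle memory is insufficient. For jobs with large joins and similar operations, we often set the shuffle memory limit to 50% of executor memory.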
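Pay attention to the size of the original input. Many operations always need some complete set of data in memory, so blindly increasing parallelism and partition counts will not make memory usage arbitrarily small. We have hit performance problems, even OOMs, that changing these two parameters could hardly relieve. Such problems can be solved by adding memory to each machine or adding more machines, both of which directly or indirectly increase total memory.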
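When choosing EC2 instance types, identify the bottleneck first; testing can help. For example, we had to choose between r3.8xlarge and c3.8xlarge. Their compute power is comparable, and the former costs 50% more than the latter, but it has about four times the memory (244 GiB vs. 60 GiB).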
In addition, some RDD APIs, such as cache and persist with a memory-based storage level, keep data in memory. If you are not clear about the benefit of doing so, do not use them.
CPU
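Level of parallelism. Once it is set, it determines the default number of partitions for reduce-type operations. In real projects this parameter is usually indispensable, and it is generally chosen based on the input size and each executor's memory. You can change the parallelism level by passing it to the operation or by setting the spark.default.parallelism property. As a rule of thumb, assign 2 to 3 tasks per CPU core.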
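The 2-to-3-tasks-per-core rule reduces to a simple calculation. The cluster size below (10 executors with 8 cores each) is a hypothetical example.

```scala
object ParallelismRuleOfThumb {
  // Rule of thumb from the text: 2 to 3 tasks per CPU core across the cluster.
  def suggestedParallelism(executors: Int, coresPerExecutor: Int): (Int, Int) = {
    val totalCores = executors * coresPerExecutor
    (2 * totalCores, 3 * totalCores)
  }

  def main(args: Array[String]): Unit = {
    val (low, high) = suggestedParallelism(executors = 10, coresPerExecutor = 8)
    // Pick a value in [low, high] for spark.default.parallelism.
    println(s"spark.default.parallelism between $low and $high")
  }
}
```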
Whether CPU cores are shared or exclusive, that is, whether the executors on a host share its CPU cores or split them up so each holds its own. For example, suppose a machine has 32 CPU cores and 50 GB of memory in total, with two executors deployed. One option is to set spark.executor.cores to 16 and spark.executor.memory to 20G. Because of the memory limit, the machine then runs two executors, each using 20 GB of memory and 16 "exclusive" CPU cores. Alternatively, with the same memory, the two executors can "share" all 32 cores. In my tests, exclusive mode performed slightly better than shared mode.
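GC tuning. Print GC information with -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps. Remember that by default 60% of executor memory can be used as the RDD cache, leaving only 40% for object creation. You can change this with spark.storage.memoryFraction; that applies to the legacy memory manager, and since Spark 1.6 the split is governed by spark.memory.fraction and spark.memory.storageFraction. If many small objects are created but minor (young-generation) GCs can reclaim them, enlarging the Eden space helps to some extent. For tasks that copy data from HDFS, there is a simple estimate of memory consumption. If the HDFS block size is 64 MB, 4 tasks are copying data concurrently, and decompressing a block triples its size, the estimate is 4 * 3 * 64 MB = 768 MB. We also ran into the following problem at work. By default the JVM enforces a GC overhead limit: if more than 98% of the time is spent in GC while less than 2% of the heap is recovered, it throws "OutOfMemoryError: GC overhead limit exceeded". Massive object creation leads to a lot of GC time and can hit this error. It is easy to trigger in Spark, where the typical code pattern turns one RDD into the next. You can turn the check off with -XX:-UseGCOverheadLimit. This only removes the early failure and does not reduce the GC pressure itself.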
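The HDFS estimate above can be written out as follows. The sketch also applies the Spark tuning guide's rule of thumb of sizing the young generation at about 4/3 of Eden, which leaves room for the survivor spaces. The helper names are hypothetical. The GC-printing flags are JDK 8 options; JDK 9+ replaces them with unified logging such as -Xlog:gc*.

```scala
object EdenEstimate {
  // Memory needed while each concurrent task decompresses one HDFS block.
  def edenMb(concurrentTasks: Int, decompressionFactor: Int, blockSizeMb: Int): Int =
    concurrentTasks * decompressionFactor * blockSizeMb

  // Young generation of about 4/3 of Eden, leaving room for the survivor spaces.
  def youngGenMb(edenMb: Int): Int = edenMb * 4 / 3

  // JVM options combining the young-generation size with the GC logging flags from the text.
  def gcOptions(youngMb: Int): String =
    s"-Xmn${youngMb}m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

  def main(args: Array[String]): Unit = {
    val eden = edenMb(concurrentTasks = 4, decompressionFactor = 3, blockSizeMb = 64)
    println(s"Eden ~ $eden MB, young gen ~ ${youngGenMb(eden)} MB")
    println(gcOptions(youngGenMb(eden)))
  }
}
```

The resulting string would typically be passed to executors through spark.executor.extraJavaOptions.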
Serialization and Transfer
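Data serialization. Java serialization is the default, and programmers know it best, but both its speed and its space efficiency are fairly poor. The other option is Kryo serialization, which is faster and more compact, but it does not support every class out of the box. The Spark UI shows the share of total time spent on serialization. If that share is high, consider optimizing memory usage and serialization.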
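Switching to Kryo takes a few lines of configuration. The sketch assumes Spark 2.x or later; Click and Session are hypothetical record types. Registering classes is optional. Per the Spark docs, though, Kryo stores the full class name with each object of an unregistered class.

```scala
import org.apache.spark.SparkConf

// Hypothetical record types used by the job.
final case class Click(userId: Long, url: String, ts: Long)
final case class Session(userId: Long, clicks: Seq[Click])

object KryoSetup {
  def conf(): SparkConf =
    new SparkConf()
      .setAppName("kryo-demo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Registered classes are written as a small ID instead of the full class name.
      .registerKryoClasses(Array(classOf[Click], classOf[Session]))

  def main(args: Array[String]): Unit =
    println(conf().get("spark.kryo.classesToRegister"))
}
```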
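Broadcasting large variables. When tasks use a large static object, you can broadcast it. Spark logs the serialized size of each task; in general, tasks larger than about 20 KB are worth optimizing this way. A common case is joining a large table with a small one. Once the small table is broadcast, the large table's data no longer has to travel between nodes; it stays put locally and waits for the small table to arrive.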
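The broadcast-join pattern can be sketched with the RDD API. The sketch assumes Spark 2.x or later in local mode; the order and country data are hypothetical. The small lookup map is broadcast once, and each partition of the large RDD joins against it locally, so no shuffle is needed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastJoin {
  // Map-side join: the small table is shipped once per executor,
  // and the large RDD is joined where it already lives.
  def enrich(sc: SparkContext, orders: Seq[(Int, Double)], countries: Map[Int, String]): Array[(Int, String, Double)] = {
    val countriesBc = sc.broadcast(countries)
    sc.parallelize(orders, 4)
      .flatMap { case (countryId, amount) =>
        // Inner-join semantics: drop orders whose key is missing from the small table.
        countriesBc.value.get(countryId).map(name => (countryId, name, amount)).toList
      }
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-join").setMaster("local[2]"))
    try enrich(sc, Seq((1, 10.0), (2, 5.0), (3, 7.0)), Map(1 -> "CN", 2 -> "US")).foreach(println)
    finally sc.stop()
  }
}
```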
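Data locality. Data and code must be brought together before processing can happen. Code is usually much smaller than data, so shipping code to the data is faster. Data locality describes how physically close the data is to the code that processes it. The levels are PROCESS_LOCAL (same JVM), NODE_LOCAL (same node, e.g. the data is in HDFS on the same node as the code), NO_PREF, RACK_LOCAL (different server, same rack) and ANY. These levels run from highest to lowest priority. But when an idle executor has no unprocessed data left that is local to it, there are two choices: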
(1) wait until a currently busy CPU frees up to process data that is as "local" as possible, or
(2) start a task right away to process data that is farther away.
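By default Spark first waits for a while, which is strategy (1); the wait is controlled by spark.locality.wait and defaults to 3 seconds. If the busy CPUs do not free up in that time, Spark falls back to strategy (2).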
References to large objects in code. Be careful when a task references a large object, because the object is serialized along with the task and sent to every node, which causes performance problems. As long as serialization does not throw an exception, people rarely notice which referenced objects get serialized. If the large object really is needed, you might as well turn it into an RDD. Most of the time, large objects get serialized without anyone noticing, or at least unexpectedly. For example, our project contained this code:
rdd.map(r => { println(BackfillTypeIndex) })
It is actually equivalent to:
rdd.map(r => { println(this.BackfillTypeIndex) })
Do not underestimate this "this": serializing it can sometimes be very expensive.
The most direct fix for this kind of problem is:
val dereferencedVariable = this.BackfillTypeIndex
rdd.map(r => println(dereferencedVariable)) // "this" is not serialized
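The sketch below measures the cost of capturing this. It serializes two closures with plain Java serialization, which is what Spark uses for task closures. BackfillJob and its 1 MiB field are hypothetical stand-ins for the project class. Spark's ClosureCleaner cannot drop this when the closure actually reads one of its fields, so the first closure carries the whole object.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical job class: a large field plus a small one that the task needs.
class BackfillJob extends Serializable {
  val bigLookup: Array[Byte] = new Array[Byte](1 << 20) // ~1 MiB stand-in for a large object
  val backfillTypeIndex: Int = 7

  // Reads the field inside the lambda, so the lambda captures `this`
  // and serializing it drags bigLookup along.
  def capturingFn: Int => Int = r => r + backfillTypeIndex

  // Copies the field into a local val first; the lambda captures only an Int.
  def dereferencedFn: Int => Int = {
    val idx = backfillTypeIndex
    r => r + idx
  }
}

object ClosureSize {
  // Size in bytes of the object after Java serialization.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val job = new BackfillJob
    println(s"captures this: ${serializedSize(job.capturingFn)} bytes")
    println(s"dereferenced:  ${serializedSize(job.dereferencedFn)} bytes")
  }
}
```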
Relatedly, the @transient annotation marks a field that should not be serialized, which is very useful for keeping large objects out of the serialization trap. Also watch class inheritance hierarchies: a small case class may sometimes hang off a large tree.
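A common way to apply @transient is to combine it with lazy val. The field is skipped during serialization and rebuilt on first use on the receiving side. This pure-Scala sketch uses a hypothetical Enricher class and checks the serialized size with Java serialization. Each deserialized copy rebuilds the field, so the pattern fits data that is cheap to recompute.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper that would be referenced from task closures.
class Enricher(val lookupSize: Int) extends Serializable {
  // @transient: never written during serialization.
  // lazy: rebuilt on first access in each deserialized copy.
  @transient lazy val lookup: Map[Int, String] =
    (0 until lookupSize).map(i => i -> s"value-$i").toMap

  def enrich(key: Int): String = lookup.getOrElse(key, "unknown")
}

object TransientDemo {
  // Serializes and deserializes obj, returning the copy and the serialized size in bytes.
  def roundTrip[T <: AnyRef](obj: T): (T, Int) = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try {
      val copy = in.readObject().asInstanceOf[T]
      (copy, bytes.size())
    } finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val enricher = new Enricher(100000)
    enricher.enrich(1) // forces the 100k-entry map on this side
    val (copy, size) = roundTrip(enricher)
    println(s"serialized size: $size bytes") // small: the map was not written
    println(copy.enrich(42))                 // the copy rebuilds the map lazily
  }
}
```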
File I/O
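Optimizing file storage and reads. In cases where only a few columns are needed, columnar formats such as RCFile and Parquet greatly reduce the cost of reading files. When storing files on S3 or HDFS, choose a format that suits the situation, for example one with a higher compression ratio. In addition, especially for shuffle-heavy jobs, consider leaving some extra memory to the operating system for its buffer cache. For example, with 50 GB of memory in total, give the JVM at most a bit over 40 GB.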
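File partitioning. S3, for example, supports storing a file as multiple parts, with file names like part-00000. Use the coalesce method to set how many parts to write. Setting it to the level of parallelism, or an integer multiple of it, can improve read and write performance. Neither too high nor too low is good, though. Too low, and you cannot make full use of S3's parallel reads and writes. Too high, and you get too many small files: preprocessing, merging and connection setup all cost time, and reads and writes can easily exceed S3's request throttling limits.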
Tasks
Spark speculation. By setting spark.speculation and a few related options, you can have Spark relaunch especially slow tasks without waiting for them to finish. As soon as one copy of a task completes, the result of whichever copy finished first is used.
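Speculation is enabled through a SparkConf, as sketched below for Spark 2.x or later. The three keys exist in Spark's configuration, but the values are only illustrative. Defaults differ between releases, so check the configuration page for your version.

```scala
import org.apache.spark.SparkConf

object SpeculationConf {
  def withSpeculation(base: SparkConf): SparkConf =
    base
      .set("spark.speculation", "true")
      // A running task becomes a candidate for a speculative copy when it is
      // this many times slower than the median of the stage...
      .set("spark.speculation.multiplier", "1.5")
      // ...and only after this fraction of the stage's tasks has finished.
      .set("spark.speculation.quantile", "0.75")

  def main(args: Array[String]): Unit =
    withSpeculation(new SparkConf()).getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }
}
```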
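Reducing shuffles. Spark's computation itself is often fast; most of the cost goes to network and I/O, and shuffles are a typical example. Consider (k, v1) join (k, v2) => (k, v3). Spark handles this case very well when both RDDs are already partitioned by k with the same partitioner. The records to be joined then sit in the same partition on the same node, the join finishes quickly, and the result stays on that node; the whole chain of operations can run in a single stage. Now suppose the data structures are designed as (obj1) join (obj2) => (obj3), with the join condition obj1.column1 == obj2.column1. In that case you are often forced into a shuffle, because no shared key guarantees that matching data sits on the same node. When a shuffle is unavoidable, shrink the data before the shuffle as much as possible; avoiding groupByKey is the classic example. The comparison figure below, from a talk at Spark Summit 2013, makes the same point: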

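The reduceByKey and co-partitioning ideas above can be sketched with the RDD API. The sketch assumes Spark 2.x or later in local mode; object and method names are hypothetical. reduceByKey combines values within each partition before the shuffle, whereas groupByKey ships every record. Pre-partitioning both sides with the same partitioner lets join use narrow dependencies.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ShuffleFriendly {
  // groupByKey sends every (word, 1) pair across the network before summing.
  def countWithGroup(words: RDD[String]): RDD[(String, Int)] =
    words.map(w => (w, 1)).groupByKey().mapValues(_.sum)

  // reduceByKey sums inside each partition first (map-side combine), so less data is shuffled.
  def countWithReduce(words: RDD[String]): RDD[(String, Int)] =
    words.map(w => (w, 1)).reduceByKey(_ + _)

  // Partitioning both sides with the same partitioner lets the join reuse that layout;
  // caching the left side means later joins against it skip its partitionBy shuffle.
  def coPartitionedJoin(left: RDD[(Int, String)], right: RDD[(Int, Double)], n: Int): RDD[(Int, (String, Double))] = {
    val p = new HashPartitioner(n)
    val l = left.partitionBy(p).cache()
    val r = right.partitionBy(p)
    l.join(r)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shuffle-friendly").setMaster("local[2]"))
    try countWithReduce(sc.parallelize(Seq("a", "b", "a"))).collect().foreach(println)
    finally sc.stop()
  }
}
```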
Repartitioning. Data volume grows and shrinks during a computation, so choosing a suitable number of partitions matters a great deal. Too many partitions produce lots of small and empty tasks; too few leave compute resources underused. When necessary, use repartition to adjust the count, but it is not free: its main cost is a shuffle. Another common problem is large differences in partition sizes. This is mostly caused by partition keys whose values are unevenly distributed (HashPartitioner is used by default). You need to fix that, for example by rewriting the hash algorithm. When testing, you can get the number of partitions with rdd.partitions.size.
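A pure-Scala sketch can help spot skew before reaching for a custom hash. partitionOf mirrors the non-negative hashCode-modulo rule that HashPartitioner applies. Salting is one common alternative to rewriting the hash: append a suffix to keys, then aggregate in two steps. The data and names are hypothetical.

```scala
object SkewCheck {
  // Mirrors HashPartitioner's rule: non-negative key.hashCode modulo the partition count.
  def partitionOf(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k =>
      val mod = k.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
  }

  // Number of records that land in each partition.
  def loads(keys: Seq[Any], numPartitions: Int): Seq[Int] = {
    val counts = keys.groupBy(partitionOf(_, numPartitions)).map { case (p, ks) => p -> ks.size }
    (0 until numPartitions).map(p => counts.getOrElse(p, 0))
  }

  // Key salting: spread each key over `buckets` sub-keys. A real job would then aggregate
  // per salted key first and per original key (salt stripped) second.
  def salt(keys: Seq[String], buckets: Int): Seq[String] =
    keys.zipWithIndex.map { case (k, i) => s"$k#${i % buckets}" }

  def main(args: Array[String]): Unit = {
    val keys = Seq.fill(1000)("hot") ++ (0 until 10).map(i => s"cold$i")
    println(s"before salting: ${loads(keys, 8)}")
    println(s"after salting:  ${loads(salt(keys, 8), 8)}")
  }
}
```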
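Task time distribution. Watch the Spark UI. A stage's detail page shows total shuffle write, GC time, the current stack trace and the time spent by each task. If task durations are widely spread, with some tasks taking much longer than others, the computation is unevenly distributed. Re-examine the data partitioning, the key hashing and the computation logic inside tasks; the bottleneck lies in the long-running tasks.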

Reusing resources. Some resources, such as connections, are expensive to acquire and often quite limited. Consider creating them once per partition (for example, with the mapPartitions method). Operations on every element in the partition then simply reuse the same connection instead of opening a new one.
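The sketch below shows per-partition resource reuse, assuming Spark 2.x or later. FakeConnection is a hypothetical stand-in for a real client, and an accumulator counts how many connections were opened. Materializing the partition with toList before closing is a simplification that holds the whole partition in memory. A production version would close the connection only after the iterator is exhausted.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

// Hypothetical stand-in for an expensive client such as a database connection.
class FakeConnection(opened: LongAccumulator) {
  opened.add(1) // count every connection that gets opened
  def lookup(id: Int): String = s"row-$id"
  def close(): Unit = ()
}

object ConnectionPerPartition {
  def enrich(ids: RDD[Int], opened: LongAccumulator): RDD[String] =
    ids.mapPartitions { iter =>
      val conn = new FakeConnection(opened) // one connection per partition, not per element
      // Materialize before closing, because the iterator is consumed lazily.
      val out = iter.map(conn.lookup).toList
      conn.close()
      out.iterator
    }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("connection-per-partition").setMaster("local[2]"))
    try {
      val opened = sc.longAccumulator("connectionsOpened")
      val rows = enrich(sc.parallelize(1 to 100, 4), opened).collect()
      println(s"${rows.length} rows, ${opened.sum} connections opened")
    } finally sc.stop()
  }
}
```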