±¾ÎÄÊ×ÏȶԾö²ßÊ÷Ëã·¨µÄÔÀí½øÐзÖÎö²¢Ö¸³öÆä´æÔÚµÄÎÊÌ⣬½ø¶ø½éÉÜËæ»úÉÁÖËã·¨¡£Í¬µ¥»ú»·¾³ÏµÄËæ»úÉÁÖ¹¹Ô첻ͬµÄÊÇ£¬·Ö²¼Ê½»·¾³Ïµľö²ßÊ÷¹¹½¨Èç¹û²»½øÐÐÓÅ»¯µÄ»°£¬»á´øÀ´´óÁ¿µÄÍøÂç
IO ²Ù×÷£¬Ë㷨ЧÂʽ«·Ç³£µÍ£¬Îª´Ë±¾Îĸø³öÁËËæ»úÉÁÖÔÚ·Ö²¼Ê½»·¾³ÏµľßÌåÓÅ»¯²ßÂÔ£¬È»ºó¶ÔÆäÔ´Âë½øÐзÖÎö£¬×îºóͨ¹ý°¸Àý½éÉÜËæ»úÉÁÖÔÚ½ðÈÚÁìÓòÄÚÈçºÎ½øÐÐÓÅÖʿͻ§µÄ·ÖÀà¡£
Introduction
The Spark in-memory computing framework occupies a pivotal position in big data processing. In 2014 Spark swept the IT world: Twitter data showed that Spark had overtaken Hadoop, YARN, and other technologies to become the hottest topic in big data processing, as shown in Figure 1. On June 17, 2015, IBM announced its "one million data engineers" initiative, pledging strong support for the Apache Spark project and calling it "the most important new open source project of the next decade, one driven by data". IBM planned to put more than 3,500 researchers and developers to work on Spark-related projects in over a dozen labs worldwide, and to contribute its breakthrough machine learning technology, IBM SystemML, to the Spark open source ecosystem free of charge. It is not hard to see that machine learning is a major reason for IBM's strong backing of Spark: Spark is memory-based, and machine learning algorithms almost always require iterative computation internally, which makes Spark particularly well suited to machine learning in distributed environments.
This article introduces random forests (Random Forests), a classic classification and regression algorithm in machine learning. It first explains the core principles of the algorithm, then describes its implementation on Spark and analyzes the source code, and finally presents a case study of the algorithm in a real project. All subsequent material is presented from the classification perspective; regression differs little from classification at the algorithmic level, and the goal of this article is to understand how random forests are implemented on Spark.

Figure 1. Activity of Spark compared with other big data processing tools
Environment requirements
Operating system: Linux. This article uses Ubuntu 10.04; feel free to use whichever Linux distribution you are most comfortable with.
Java and Scala versions: Scala 2.10.4, Java 1.7
Spark cluster environment (3 machines): Hadoop 2.4.1 + Spark 1.4.0
Source code reading and case study environment: IntelliJ IDEA 14.1.4
Decision trees
The random forest algorithm is used extremely widely in machine learning, computer vision, and other fields. It can be applied not only to classification but also to regression, that is, prediction. A random forest consists of multiple decision trees; compared with a single decision tree it classifies and predicts better and is less prone to overfitting.
Random forests are built on decision trees, so before formally presenting the random forest algorithm, let us first review how decision trees work. A decision tree is a very important classifier in data mining and machine learning: the algorithm builds a tree from training data and then uses it to classify unseen data efficiently. Consider a matchmaking example to illustrate what a decision tree is, how to build one, and how to classify with it. Suppose a dating site, after surveying its historical matchmaking data, finds that women behave as follows on actual dates:
Table 1. Matchmaking history data

From the historical data shown in Table 1, the following decision tree can be constructed:

Figure 2. Decision tree diagram
If a newly registered user owns no property in the city, earns less than 350,000 a year, and has been divorced, the tree predicts that the woman will not meet him. This simple example shows that decision trees offer strong practical guidance for real-life decisions. From it we can also summarize the steps for building a decision tree:
½«ËùÓмǼ¿´×÷ÊÇÒ»¸ö½Úµã
±éÀúÿ¸ö±äÁ¿µÄÿÖַָʽ£¬ÕÒµ½×îºÃµÄ·Ö¸îµã
ÀûÓ÷ָîµã½«¼Ç¼·Ö¸î³ÉÁ½¸ö×Ó½áµã C1 ºÍ C2
¶Ô×Ó½áµã C1 ºÍ C2 ÖØ¸´Ö´Ðв½Öè 2£©¡¢3£©£¬Ö±µ½Âú×ãÌØ¶¨Ìõ¼þΪֹ
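Concretely, the resulting tree is a small recursive data structure. The toy Scala sketch below (purely illustrative, not Spark's representation; all names are made up for this example) shows such a tree and the classification walk it supports:

sealed trait TreeNode
// A leaf carries the predicted class
case class Leaf(label: Double) extends TreeNode
// An internal node tests one feature against a threshold and routes left or right
case class Internal(featureIndex: Int, threshold: Double,
                    left: TreeNode, right: TreeNode) extends TreeNode

object ToyTree {
  // Walk the tree: go left when the feature value is <= the threshold
  def classify(node: TreeNode, features: Array[Double]): Double = node match {
    case Leaf(label) => label
    case Internal(i, t, left, right) =>
      if (features(i) <= t) classify(left, features) else classify(right, features)
  }
}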
The most important part of building a decision tree is finding the best split point, so what makes a split point the best? If a split point divides all the records into exactly two classes, it can be considered the best, because the two resulting classes are then as "pure" as possible. In the earlier example, "owns property in the city" divides all records into two groups: everyone who answered "yes" goes into one group and everyone who answered "no" into the other. The "yes" group after this split is the purest, because every single man who owns property in the city gets a date regardless of whether he has been divorced or how much he earns. The "no" group splits further into those who get dates and those who do not, so it is not very pure on its own, but with respect to the whole data set this split is still the purest available.
The example also shows that decision trees can handle both continuous variables and categorical (named) variables. A continuous variable such as annual income can be split with ">=", ">", "<", or "<=" as the condition, whereas a categorical variable such as property ownership takes values from a finite set, here "yes" and "no", and uses "=" as the split condition.
As mentioned above, the best split point is found by quantifying the purity of the classes after the split. There are currently three purity measures, namely Gini impurity, entropy (Entropy), and error rate, whose formulas are defined as follows:
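For a node whose records fall into $n$ classes, with $P(i)$ as explained below, the standard definitions are:

$$\mathrm{Gini} = 1 - \sum_{i=1}^{n} P(i)^2$$

$$\mathrm{Entropy} = -\sum_{i=1}^{n} P(i)\,\log_2 P(i)$$

$$\mathrm{Error} = 1 - \max_{i}\, P(i)$$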
In these formulas P(i) is the fraction of records in class i out of the total number of records. In the matchmaking example the records split into two classes, meet and not meet; the meet records account for P(1) = 9/10 and the not-meet records for P(2) = 1/10. For all three measures, a larger value means less "pure" and a smaller value means more "pure". In practice Gini impurity is the most commonly used, and the later examples also use it to compute purity.
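To make the computation concrete, here is a minimal, self-contained Scala sketch (plain Scala, independent of Spark) that evaluates the Gini impurity for the matchmaking data above:

object GiniExample {
  // Gini impurity: 1 minus the sum of squared class proportions
  def gini(classCounts: Seq[Int]): Double = {
    val total = classCounts.sum.toDouble
    1.0 - classCounts.map(c => math.pow(c / total, 2)).sum
  }

  def main(args: Array[String]): Unit = {
    // 9 "meet" records and 1 "not meet" record: P(1) = 9/10, P(2) = 1/10
    println(gini(Seq(9, 1)))  // 1 - (0.81 + 0.01) = 0.18
    println(gini(Seq(10, 0))) // a perfectly pure node: 0.0
    println(gini(Seq(5, 5)))  // the least pure two-class node: 0.5
  }
}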
Building a decision tree is a recursive process. Ideally every record would end up precisely classified, that is, every leaf of the generated tree would have a definite class, but in practice this condition is rarely satisfied, which can make tree construction hard to terminate. Even when construction does finish, the final tree often has too many nodes, which leads to overfitting. Real applications therefore set stopping conditions and halt construction as soon as one is reached. This still does not fully solve overfitting, whose typical symptom is a tree with a very low error rate on the training data but a very high error rate on the test data.
Common causes of overfitting are:
(1) noise in the training data;
(2) training data that is not representative.
Since overfitting typically shows up as a tree with too many nodes, in practice a finished tree often needs to be pruned (Prune Tree), but pruning does not address the root cause. The random forest algorithm offers a much better remedy for overfitting.
The random forest algorithm
A random forest is a forest composed of multiple decision trees, and the algorithm's classification result is obtained by letting those trees vote. While each tree is being grown, randomness is injected in both the row and the column direction: in the row direction, the training data for each tree is drawn by sampling with replacement (bootstrapping); in the column direction, a feature subset is drawn by sampling without replacement, and the best split point is chosen from that subset. That is the basic principle of the random forest algorithm. Figure 3 illustrates how a random forest classifies: it is an ensemble model that is still based on decision trees internally, but unlike a single decision tree it classifies by the vote of many trees, which makes the algorithm much less prone to overfitting.

Figure 3. Random forest diagram
Optimization strategies for random forests in a distributed environment
The random forest algorithm is easy to implement on a single machine, but in a distributed environment, and on the Spark platform in particular, the traditional single-machine iterative approach must be adapted, because the data itself is distributed (as shown in Figure 5). A poorly designed algorithm generates a large amount of I/O, such as frequent network data transfers, which hurts efficiency.

Figure 4. Data storage in a single-machine environment

Figure 5. Data storage in a distributed environment
Implementing the random forest algorithm on Spark therefore requires some optimization. The random forest implementation in Spark applies three main optimization strategies:
Split-point sampling statistics, shown in Figure 6. When a single-machine decision tree chooses split points for a continuous variable, it usually sorts the feature values and takes the points between adjacent values as candidate splits. That is feasible on one machine, but doing the same in a distributed environment would trigger a large amount of network transfer, and with data at the PB scale the algorithm would become extremely slow. To avoid this, the random forest in Spark samples each partition according to a certain sub-feature strategy when building decision trees, generates statistics for each partition, and derives the final split points from those statistics (see the sketch after Figure 8).
Feature binning (Binning), shown in Figure 7. Building a decision tree is a process of repeatedly partitioning the values of features. For a categorical feature with M values there are at most 2^(M-1) - 1 partitions if the values are unordered, and at most M - 1 partitions if they are ordered. Take an age feature with the three values old, middle-aged, and young: if they are unordered there are 2^(3-1) - 1 = 3 possible partitions (old | middle-aged, young; old, middle-aged | young; old, young | middle-aged), while if they are ordered as old, middle-aged, young there are only M - 1 = 2 partitions (old | middle-aged, young; old, middle-aged | young). For a continuous feature, partitioning is really range partitioning: each partition point is a split, and each resulting interval is a bin. In theory a continuous feature has infinitely many possible splits, and in a distributed environment it is impossible to enumerate all its values, so Spark applies the split-point sampling statistics method from (1) (both ideas appear in the sketch after Figure 8).
Level-wise training (level-wise training), shown in Figure 8. A single-machine decision tree is grown by recursive calls (essentially depth-first), and growing the tree requires moving data so that all records belonging to the same child node sit together. This approach cannot be executed efficiently on distributed data structures; in fact it cannot be executed at all, because the data is too large to be colocated. The strategy in a distributed environment is therefore to build the tree nodes level by level (essentially breadth-first), so that the number of passes over the whole data set equals the maximum depth over all trees. On each pass, only the statistics of all split points for every current node need to be computed; after the pass, each node decides, based on its feature partitions, whether to split and how to split.

Figure 6. Split-point sampling statistics

Figure 7. Feature binning

Figure 8. Level-wise training
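To make strategies (1) and (2) more tangible, the following simplified Scala sketch shows the general idea of sampling-based split selection for one continuous feature held in an RDD[Double]. It is only an illustration of the technique, not the actual MLlib code (analyzed in the next section); the function names and the evenly spaced quantile choice are assumptions of this sketch:

import org.apache.spark.rdd.RDD

object SplitSamplingSketch {
  // Strategy (1): instead of sorting every value of a continuous feature across
  // the cluster, draw a sample, sort it locally, and take approximate quantiles
  // of the sample as candidate split points.
  def sampleSplits(featureValues: RDD[Double], maxBins: Int, seed: Long): Array[Double] = {
    val requiredSamples = math.max(maxBins * maxBins, 10000) // same lower bound MLlib uses
    val count = featureValues.count()
    val fraction =
      if (requiredSamples < count) requiredSamples.toDouble / count else 1.0
    val sample = featureValues.sample(withReplacement = false, fraction, seed).collect().sorted

    // Strategy (2): maxBins - 1 split thresholds delimit maxBins bins
    val numSplits = maxBins - 1
    val stride = sample.length.toDouble / maxBins
    (1 to numSplits)
      .map(i => sample(math.min((i * stride).toInt, sample.length - 1)))
      .distinct.toArray
  }

  // Strategy (2) for categorical features: an unordered feature with arity m has
  // 2^(m-1) - 1 possible two-way partitions, an ordered one only m - 1
  def numCategoricalSplits(arity: Int, ordered: Boolean): Int =
    if (ordered) arity - 1 else (1 << (arity - 1)) - 1
}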
Source code analysis of the random forest algorithm
Building on this understanding of decision trees, the random forest algorithm, and the Spark optimization strategies, this section analyzes the random forest source code in Spark MLlib. It first presents the usage demo from the official website, then digs into the corresponding methods and analyzes how they are implemented.
Listing 1. Random forest usage demo
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils

// Load the data
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Randomly split the data into two parts, one for training and one for testing
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Random forest training parameters
// Number of classes
val numClasses = 2
// An empty categoricalFeaturesInfo means all features are continuous
val categoricalFeaturesInfo = Map[Int, Int]()
// Number of trees
val numTrees = 3
// Feature subset sampling strategy; "auto" lets the algorithm choose
val featureSubsetStrategy = "auto"
// Impurity measure
val impurity = "gini"
// Maximum depth of each tree
val maxDepth = 4
// Maximum number of bins per feature
val maxBins = 32
// Train the random forest classifier; trainClassifier returns a RandomForestModel object
val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

// Evaluate the trained classifier on the test data and compute the error rate
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println("Test Error = " + testErr)
println("Learned classification forest model:\n" + model.toDebugString)

// Persist the trained random forest model
model.save(sc, "myModelPath")
// Load the random forest model back into memory
val sameModel = RandomForestModel.load(sc, "myModelPath")
The sample code shows that, from a user's point of view, the key classes of the random forest are org.apache.spark.mllib.tree.RandomForest and org.apache.spark.mllib.tree.model.RandomForestModel, which provide the concrete trainClassifier and predict functions.
As the demo shows, training a random forest uses the trainClassifier method of the RandomForest companion object. Its source code (with the preceding comments and parameter descriptions kept to aid understanding) is as follows:
Listing 2. Core source code analysis 1
/**
 * Method to train a decision tree model for binary or multiclass classification.
 *
 * @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
 *              Labels should take values {0, 1, ..., numClasses-1}.
 * @param numClasses number of classes for classification.
 * @param categoricalFeaturesInfo Map storing arity of categorical features.
 *                                E.g., an entry (n -> k) indicates that feature n is categorical
 *                                with k categories indexed from 0: {0, 1, ..., k-1}.
 * @param numTrees Number of trees in the random forest.
 * @param featureSubsetStrategy Number of features to consider for splits at each node.
 *                              Supported: "auto", "all", "sqrt", "log2", "onethird".
 *                              If "auto" is set, this parameter is set based on numTrees:
 *                                if numTrees == 1, set to "all";
 *                                if numTrees > 1 (forest) set to "sqrt".
 * @param impurity Criterion used for information gain calculation.
 *                 Supported values: "gini" (recommended) or "entropy".
 * @param maxDepth Maximum depth of the tree.
 *                 E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.
 *                 (suggested value: 4)
 * @param maxBins maximum number of bins used for splitting features
 *                (suggested value: 100)
 * @param seed Random seed for bootstrapping and choosing feature subsets.
 * @return a random forest model that can be used for prediction
 */
def trainClassifier(
    input: RDD[LabeledPoint],
    numClasses: Int,
    categoricalFeaturesInfo: Map[Int, Int],
    numTrees: Int,
    featureSubsetStrategy: String,
    impurity: String,
    maxDepth: Int,
    maxBins: Int,
    seed: Int = Utils.random.nextInt()): RandomForestModel = {
  val impurityType = Impurities.fromString(impurity)
  val strategy = new Strategy(Classification, impurityType, maxDepth,
    numClasses, maxBins, Sort, categoricalFeaturesInfo)
  // This delegates to the other, overloaded trainClassifier
  trainClassifier(input, strategy, numTrees, featureSubsetStrategy, seed)
}
The overloaded trainClassifier method reads as follows:
Listing 3. Core source code analysis 2
/**
 * Method to train a decision tree model for binary or multiclass classification.
 *
 * @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
 *              Labels should take values {0, 1, ..., numClasses-1}.
 * @param strategy Parameters for training each tree in the forest.
 * @param numTrees Number of trees in the random forest.
 * @param featureSubsetStrategy Number of features to consider for splits at each node.
 *                              Supported: "auto", "all", "sqrt", "log2", "onethird".
 *                              If "auto" is set, this parameter is set based on numTrees:
 *                                if numTrees == 1, set to "all";
 *                                if numTrees > 1 (forest) set to "sqrt".
 * @param seed Random seed for bootstrapping and choosing feature subsets.
 * @return a random forest model that can be used for prediction
 */
def trainClassifier(
    input: RDD[LabeledPoint],
    strategy: Strategy,
    numTrees: Int,
    featureSubsetStrategy: String,
    seed: Int): RandomForestModel = {
  require(strategy.algo == Classification,
    s"RandomForest.trainClassifier given Strategy with invalid algo: ${strategy.algo}")
  // Create the RandomForest object in this method...
  val rf = new RandomForest(strategy, numTrees, featureSubsetStrategy, seed)
  // ...then call its run method with an RDD[LabeledPoint]; it returns a RandomForestModel instance
  rf.run(input)
}
Stepping into the run method of RandomForest, the code is:
Listing 4. Core source code analysis 3
/**
 * Method to train a decision tree model over an RDD
 * @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
 * @return a random forest model that can be used for prediction
 */
def run(input: RDD[LabeledPoint]): RandomForestModel = {
  val timer = new TimeTracker()
  timer.start("total")
  timer.start("init")
  val retaggedInput = input.retag(classOf[LabeledPoint])
  // Build the decision tree metadata (split positions, number of bins,
  // the feature values contained in each bin, and so on)
  val metadata =
    DecisionTreeMetadata.buildMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy)
  logDebug("algo = " + strategy.algo)
  logDebug("numTrees = " + numTrees)
  logDebug("seed = " + seed)
  logDebug("maxBins = " + metadata.maxBins)
  logDebug("featureSubsetStrategy = " + featureSubsetStrategy)
  logDebug("numFeaturesPerNode = " + metadata.numFeaturesPerNode)
  logDebug("subsamplingRate = " + strategy.subsamplingRate)

  // Find the splits and the corresponding bins (interval between the splits) using a sample
  // of the input data.
  timer.start("findSplitsBins")
  // Find the split points (splits) and the bin information (bins).
  // For continuous features, split-point sampling statistics simplify the computation.
  // For categorical features: if unordered, there are at most splits = 2^(numBins-1)-1 partitions;
  // if ordered, at most splits = numBins-1 partitions.
  val (splits, bins) = DecisionTree.findSplitsBins(retaggedInput, metadata)
  timer.stop("findSplitsBins")
  logDebug("numBins: feature: number of bins")
  logDebug(Range(0, metadata.numFeatures).map { featureIndex =>
    s"\t$featureIndex\t${metadata.numBins(featureIndex)}"
  }.mkString("\n"))

  // Bin feature values (TreePoint representation).
  // Cache input RDD for speedup during multiple passes.
  // Convert to the tree-shaped RDD type; after conversion, every sample point has been
  // assigned to its bin according to the split conditions
  val treeInput = TreePoint.convertToTreeRDD(retaggedInput, bins, metadata)
  val withReplacement = if (numTrees > 1) true else false
  // convertToBaggedRDD gives each tree its own subset of the samples
  val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput,
    strategy.subsamplingRate, numTrees,
    withReplacement, seed).persist(StorageLevel.MEMORY_AND_DISK)

  // depth of the decision tree
  val maxDepth = strategy.maxDepth
  require(maxDepth <= 30,
    s"DecisionTree currently only supports maxDepth <= 30, but was given maxDepth = $maxDepth.")

  // Max memory usage for aggregates
  // TODO: Calculate memory usage more precisely.
  val maxMemoryUsage: Long = strategy.maxMemoryInMB * 1024L * 1024L
  logDebug("max memory usage for aggregates = " + maxMemoryUsage + " bytes.")
  val maxMemoryPerNode = {
    val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) {
      // Find numFeaturesPerNode largest bins to get an upper bound on memory usage.
      Some(metadata.numBins.zipWithIndex.sortBy(- _._1)
        .take(metadata.numFeaturesPerNode).map(_._2))
    } else {
      None
    }
    // Memory a node needs during the aggregation step
    RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L
  }
  require(maxMemoryPerNode <= maxMemoryUsage,
    s"RandomForest/DecisionTree given maxMemoryInMB = ${strategy.maxMemoryInMB}," +
      " which is too small for the given features." +
      s" Minimum value = ${maxMemoryPerNode / (1024L * 1024L)}")
  timer.stop("init")

  /*
   * The main idea here is to perform group-wise training of the decision tree nodes thus
   * reducing the passes over the data from (# nodes) to (# nodes / maxNumberOfNodesPerGroup).
   * Each data sample is handled by a particular node (or it reaches a leaf and is not used
   * in lower levels).
   */

  // Create an RDD of node Id cache.
  // At first, all the rows belong to the root nodes (node Id == 1).
  // Whether nodes use a cache; node IDs start at 1: 1 is the root of the tree,
  // 2 the left child, 3 the right child, and so on
  val nodeIdCache = if (strategy.useNodeIdCache) {
    Some(NodeIdCache.init(
      data = baggedInput,
      numTrees = numTrees,
      checkpointInterval = strategy.checkpointInterval,
      initVal = 1))
  } else {
    None
  }

  // FIFO queue of nodes to train: (treeIndex, node)
  val nodeQueue = new mutable.Queue[(Int, Node)]()
  val rng = new scala.util.Random()
  rng.setSeed(seed)

  // Allocate and queue root nodes.
  // Create the root node of each tree
  val topNodes: Array[Node] = Array.fill[Node](numTrees)(Node.emptyNode(nodeIndex = 1))
  // Enqueue (tree index, root node of the tree); tree indices start at 0, node IDs at 1
  Range(0, numTrees).foreach(treeIndex => nodeQueue.enqueue((treeIndex, topNodes(treeIndex))))
  while (nodeQueue.nonEmpty) {
    // Collect some nodes to split, and choose features for each node (if subsampling).
    // Each group of nodes may come from one or multiple trees, and at multiple levels.
    // Fetch, for every tree, all the nodes that need splitting
    val (nodesForGroup, treeToNodeToIndexInfo) =
      RandomForest.selectNodesToSplit(nodeQueue, maxMemoryUsage, metadata, rng)
    // Sanity check (should never occur):
    assert(nodesForGroup.size > 0,
      s"RandomForest selected empty nodesForGroup.  Error for unknown reason.")

    // Choose node splits, and enqueue new nodes as needed.
    timer.start("findBestSplits")
    // Find the best splits
    DecisionTree.findBestSplits(baggedInput, metadata, topNodes, nodesForGroup,
      treeToNodeToIndexInfo, splits, bins, nodeQueue, timer, nodeIdCache = nodeIdCache)
    timer.stop("findBestSplits")
  }

  baggedInput.unpersist()
  timer.stop("total")
  logInfo("Internal timing for DecisionTree:")
  logInfo(s"$timer")

  // Delete any remaining checkpoints used for node Id cache.
  if (nodeIdCache.nonEmpty) {
    try {
      nodeIdCache.get.deleteAllCheckpoints()
    } catch {
      case e: IOException =>
        logWarning(s"delete all checkpoints failed. Error reason: ${e.getMessage}")
    }
  }

  val trees = topNodes.map(topNode => new DecisionTreeModel(topNode, strategy.algo))
  new RandomForestModel(strategy.algo, trees)
}
The listing above is the core run method of the RandomForest class. When determining the split points and bin information it calls DecisionTree.findSplitsBins; stepping into that method we see the following code:
Listing 5. Core source code analysis 4
/**
 * Returns splits and bins for decision tree calculation.
 * Continuous and categorical features are handled differently.
 *
 * Continuous features:
 *   For each feature, there are numBins - 1 possible splits representing the possible binary
 *   decisions at each node in the tree.
 *   This finds locations (feature values) for splits using a subsample of the data.
 *
 * Categorical features:
 *   For each feature, there is 1 bin per split.
 *   Splits and bins are handled in 2 ways:
 *   (a) "unordered features"
 *       For multiclass classification with a low-arity feature
 *       (i.e., if isMulticlass && isSpaceSufficientForAllCategoricalSplits),
 *       the feature is split based on subsets of categories.
 *   (b) "ordered features"
 *       For regression and binary classification,
 *       and for multiclass classification with a high-arity feature,
 *       there is one bin per category.
 *
 * @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
 * @param metadata Learning and dataset metadata
 * @return A tuple of (splits, bins).
 *         Splits is an Array of [[org.apache.spark.mllib.tree.model.Split]]
 *          of size (numFeatures, numSplits).
 *         Bins is an Array of [[org.apache.spark.mllib.tree.model.Bin]]
 *          of size (numFeatures, numBins).
 */
protected[tree] def findSplitsBins(
    input: RDD[LabeledPoint],
    metadata: DecisionTreeMetadata): (Array[Array[Split]], Array[Array[Bin]]) = {
  logDebug("isMulticlass = " + metadata.isMulticlass)
  val numFeatures = metadata.numFeatures

  // Sample the input only if there are continuous features.
  // Check whether any of the features are continuous
  val hasContinuousFeatures = Range(0, numFeatures).exists(metadata.isContinuous)
  val sampledInput = if (hasContinuousFeatures) {
    // Calculate the number of samples for approximate quantile calculation.
    // Number of samples to draw; at least 10000
    val requiredSamples = math.max(metadata.maxBins * metadata.maxBins, 10000)
    // Compute the sampling fraction
    val fraction = if (requiredSamples < metadata.numExamples) {
      requiredSamples.toDouble / metadata.numExamples
    } else {
      1.0
    }
    logDebug("fraction of data used for calculating quantiles = " + fraction)
    input.sample(withReplacement = false, fraction, new XORShiftRandom().nextInt()).collect()
  } else {
    // If all features are categorical, build an empty array (no sampling needed)
    new Array[LabeledPoint](0)
  }

  // Split-point strategy; Spark currently implements only one: Sort
  metadata.quantileStrategy match {
    case Sort =>
      // Each feature has its own group of split positions
      val splits = new Array[Array[Split]](numFeatures)
      // Bin information corresponding to the split positions
      val bins = new Array[Array[Bin]](numFeatures)

      // Find all splits.
      // Iterate over all features.
      var featureIndex = 0
      while (featureIndex < numFeatures) {
        // Continuous feature case
        if (metadata.isContinuous(featureIndex)) {
          val featureSamples = sampledInput.map(lp => lp.features(featureIndex))
          // findSplitsForContinuousFeature returns all split positions of a continuous feature
          val featureSplits = findSplitsForContinuousFeature(featureSamples,
            metadata, featureIndex)

          val numSplits = featureSplits.length
          // For a continuous feature, the number of bins is the number of splits + 1
          val numBins = numSplits + 1
          logDebug(s"featureIndex = $featureIndex, numSplits = $numSplits")
          // The arrays of split points and of feature bins
          splits(featureIndex) = new Array[Split](numSplits)
          bins(featureIndex) = new Array[Bin](numBins)

          var splitIndex = 0
          // Iterate over the split points
          while (splitIndex < numSplits) {
            // The value at the split point; since the samples are sorted, it acts as a threshold
            val threshold = featureSplits(splitIndex)
            // Store every split position of this feature
            splits(featureIndex)(splitIndex) =
              new Split(featureIndex, threshold, Continuous, List())
            splitIndex += 1
          }
          // Use the minimum threshold Double.MinValue as the leftmost split position for binning
          bins(featureIndex)(0) = new Bin(new DummyLowSplit(featureIndex, Continuous),
            splits(featureIndex)(0), Continuous, Double.MinValue)

          splitIndex = 1
          // Compute all bins except the last; each bin holds the values that fall between
          // two adjacent split-point thresholds
          while (splitIndex < numSplits) {
            bins(featureIndex)(splitIndex) =
              new Bin(splits(featureIndex)(splitIndex - 1), splits(featureIndex)(splitIndex),
                Continuous, Double.MinValue)
            splitIndex += 1
          }
          // The last bin uses the maximum threshold Double.MaxValue as the rightmost split position
          bins(featureIndex)(numSplits) = new Bin(splits(featureIndex)(numSplits - 1),
            new DummyHighSplit(featureIndex, Continuous), Continuous, Double.MinValue)
        } else { // Categorical feature case
          val numSplits = metadata.numSplits(featureIndex)
          val numBins = metadata.numBins(featureIndex)
          // Categorical feature
          // Number of categories (arity) of the feature
          val featureArity = metadata.featureArity(featureIndex)
          // Handling for unordered features
          if (metadata.isUnordered(featureIndex)) {
            // Unordered features
            // 2^(maxFeatureValue - 1) - 1 combinations
            splits(featureIndex) = new Array[Split](numSplits)
            var splitIndex = 0
            while (splitIndex < numSplits) {
              // Extract the category values; returns the subset containing one or more of them
              val categories: List[Double] =
                extractMultiClassCategories(splitIndex + 1, featureArity)
              splits(featureIndex)(splitIndex) =
                new Split(featureIndex, Double.MinValue, Categorical, categories)
              splitIndex += 1
            }
          } else {
            // Ordered features need no work here; bins correspond to feature values
            // Bins correspond to feature values, so we do not need to compute splits or bins
            // beforehand. Splits are constructed as needed during training.
            splits(featureIndex) = new Array[Split](0)
          }
          // For ordered features, bins correspond to feature values.
          // For unordered categorical features, there is no need to construct the bins,
          // since there is a one-to-one correspondence between the splits and the bins.
          bins(featureIndex) = new Array[Bin](0)
        }
        featureIndex += 1
      }
      (splits, bins)
    case MinMax =>
      throw new UnsupportedOperationException("minmax not supported yet.")
    case ApproxHist =>
      throw new UnsupportedOperationException("approximate histogram not supported yet.")
  }
}
Besides findSplitsBins there is another very important method, DecisionTree.findBestSplits(), which searches for the best split point. The key part of that method is its call to binsToBestSplit, whose code is as follows:
Listing 6. Core source code analysis 5
/**
 * Find the best split for a node.
 * @param binAggregates Bin statistics.
 * @return tuple for best split: (Split, information gain, prediction at node)
 */
private def binsToBestSplit(
    binAggregates: DTStatsAggregator, // references ImpurityAggregator, which supplies the impurity logic
    splits: Array[Array[Split]],
    featuresForNode: Option[Array[Int]],
    node: Node): (Split, InformationGainStats, Predict) = {

  // calculate predict and impurity if current node is top node
  val level = Node.indexToLevel(node.id)
  var predictWithImpurity: Option[(Predict, Double)] = if (level == 0) {
    None
  } else {
    Some((node.predict, node.impurity))
  }

  // For each (feature, split), calculate the gain, and select the best (feature, split).
  // Compute the information gain of every feature and split point, and pick the best pair
  val (bestSplit, bestSplitStats) =
    Range(0, binAggregates.metadata.numFeaturesPerNode).map { featureIndexIdx =>
      val featureIndex = if (featuresForNode.nonEmpty) {
        featuresForNode.get.apply(featureIndexIdx)
      } else {
        featureIndexIdx
      }
      val numSplits = binAggregates.metadata.numSplits(featureIndex)
      // Continuous feature case
      if (binAggregates.metadata.isContinuous(featureIndex)) {
        // Cumulative sum (scanLeft) of bin statistics.
        // Afterwards, binAggregates for a bin is the sum of aggregates for
        // that bin + all preceding bins.
        val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
        var splitIndex = 0
        while (splitIndex < numSplits) {
          binAggregates.mergeForFeature(nodeFeatureOffset, splitIndex + 1, splitIndex)
          splitIndex += 1
        }
        // Find best split.
        val (bestFeatureSplitIndex, bestFeatureGainStats) =
          Range(0, numSplits).map { case splitIdx =>
            // Compute the impurity of the leftChild and rightChild nodes
            val leftChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, splitIdx)
            val rightChildStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, numSplits)
            rightChildStats.subtract(leftChildStats)
            // The prediction with impurity, computed as an average
            predictWithImpurity = Some(predictWithImpurity.getOrElse(
              calculatePredictImpurity(leftChildStats, rightChildStats)))
            // The information gain, used to judge whether this split is the best one
            val gainStats = calculateGainForSplit(leftChildStats,
              rightChildStats, binAggregates.metadata, predictWithImpurity.get._2)
            (splitIdx, gainStats)
          }.maxBy(_._2.gain)
        (splits(featureIndex)(bestFeatureSplitIndex), bestFeatureGainStats)
      } else if (binAggregates.metadata.isUnordered(featureIndex)) {
        // Unordered categorical feature case
        val (leftChildOffset, rightChildOffset) =
          binAggregates.getLeftRightFeatureOffsets(featureIndexIdx)
        val (bestFeatureSplitIndex, bestFeatureGainStats) =
          Range(0, numSplits).map { splitIndex =>
            val leftChildStats = binAggregates.getImpurityCalculator(leftChildOffset, splitIndex)
            val rightChildStats = binAggregates.getImpurityCalculator(rightChildOffset, splitIndex)
            predictWithImpurity = Some(predictWithImpurity.getOrElse(
              calculatePredictImpurity(leftChildStats, rightChildStats)))
            val gainStats = calculateGainForSplit(leftChildStats,
              rightChildStats, binAggregates.metadata, predictWithImpurity.get._2)
            (splitIndex, gainStats)
          }.maxBy(_._2.gain)
        (splits(featureIndex)(bestFeatureSplitIndex), bestFeatureGainStats)
      } else {
        // Ordered categorical feature case
        val nodeFeatureOffset = binAggregates.getFeatureOffset(featureIndexIdx)
        val numBins = binAggregates.metadata.numBins(featureIndex)

        /* Each bin is one category (feature value).
         * The bins are ordered based on centroidForCategories, and this ordering determines which
         * splits are considered. (With K categories, we consider K - 1 possible splits.)
         *
         * centroidForCategories is a list: (category, centroid)
         */
        // Multiclass classification case
        val centroidForCategories = if (binAggregates.metadata.isMulticlass) {
          // For categorical variables in multiclass classification,
          // the bins are ordered by the impurity of their corresponding labels.
          Range(0, numBins).map { case featureValue =>
            val categoryStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
            val centroid = if (categoryStats.count != 0) {
              // The impurity of this category's labels serves as its centroid
              categoryStats.calculate()
            } else {
              Double.MaxValue
            }
            (featureValue, centroid)
          }
        } else { // Regression or binary classification case
          // For categorical variables in regression and binary classification,
          // the bins are ordered by the centroid of their corresponding labels.
          Range(0, numBins).map { case featureValue =>
            val categoryStats = binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
            val centroid = if (categoryStats.count != 0) {
              // The average label (the predict value) serves as the centroid
              categoryStats.predict
            } else {
              Double.MaxValue
            }
            (featureValue, centroid)
          }
        }
        logDebug("Centroids for categorical variable: " + centroidForCategories.mkString(","))

        // bins sorted by centroids
        val categoriesSortedByCentroid = centroidForCategories.toList.sortBy(_._2)
        logDebug("Sorted centroids for categorical variable = " +
          categoriesSortedByCentroid.mkString(","))

        // Cumulative sum (scanLeft) of bin statistics.
        // Afterwards, binAggregates for a bin is the sum of aggregates for
        // that bin + all preceding bins.
        var splitIndex = 0
        while (splitIndex < numSplits) {
          val currentCategory = categoriesSortedByCentroid(splitIndex)._1
          val nextCategory = categoriesSortedByCentroid(splitIndex + 1)._1
          // Merge the statistics of the two bins
          binAggregates.mergeForFeature(nodeFeatureOffset, nextCategory, currentCategory)
          splitIndex += 1
        }
        // lastCategory = index of bin with total aggregates for this (node, feature)
        val lastCategory = categoriesSortedByCentroid.last._1
        // Find best split.
        // Select the best split point by information gain
        val (bestFeatureSplitIndex, bestFeatureGainStats) =
          Range(0, numSplits).map { splitIndex =>
            val featureValue = categoriesSortedByCentroid(splitIndex)._1
            val leftChildStats =
              binAggregates.getImpurityCalculator(nodeFeatureOffset, featureValue)
            val rightChildStats =
              binAggregates.getImpurityCalculator(nodeFeatureOffset, lastCategory)
            rightChildStats.subtract(leftChildStats)
            predictWithImpurity = Some(predictWithImpurity.getOrElse(
              calculatePredictImpurity(leftChildStats, rightChildStats)))
            val gainStats = calculateGainForSplit(leftChildStats,
              rightChildStats, binAggregates.metadata, predictWithImpurity.get._2)
            (splitIndex, gainStats)
          }.maxBy(_._2.gain)
        val categoriesForSplit =
          categoriesSortedByCentroid.map(_._1.toDouble).slice(0, bestFeatureSplitIndex + 1)
        val bestFeatureSplit =
          new Split(featureIndex, Double.MinValue, Categorical, categoriesForSplit)
        (bestFeatureSplit, bestFeatureGainStats)
      }
    }.maxBy(_._2.gain)

  (bestSplit, bestSplitStats, predictWithImpurity.get._1)
}
The listings above cover the core code of the complete random forest construction process. As mentioned earlier, the run method of RandomForest returns a RandomForestModel; that class looks like this:
Listing 7. Core source code analysis 6
/**
 * :: Experimental ::
 * Represents a random forest model.
 *
 * @param algo algorithm for the ensemble model, either Classification or Regression
 * @param trees tree ensembles
 */
// RandomForestModel extends TreeEnsembleModel
@Experimental
class RandomForestModel(override val algo: Algo, override val trees: Array[DecisionTreeModel])
  extends TreeEnsembleModel(algo, trees, Array.fill(trees.length)(1.0),
    combiningStrategy = if (algo == Classification) Vote else Average)
  with Saveable {

  require(trees.forall(_.algo == algo))

  // Persist the trained model
  override def save(sc: SparkContext, path: String): Unit = {
    TreeEnsembleModel.SaveLoadV1_0.save(sc, path, this,
      RandomForestModel.SaveLoadV1_0.thisClassName)
  }

  override protected def formatVersion: String = RandomForestModel.formatVersion
}

object RandomForestModel extends Loader[RandomForestModel] {

  private[mllib] def formatVersion: String = TreeEnsembleModel.SaveLoadV1_0.thisFormatVersion

  // Load the trained model into memory
  override def load(sc: SparkContext, path: String): RandomForestModel = {
    val (loadedClassName, version, jsonMetadata) = Loader.loadMetadata(sc, path)
    val classNameV1_0 = SaveLoadV1_0.thisClassName
    (loadedClassName, version) match {
      case (className, "1.0") if className == classNameV1_0 =>
        val metadata = TreeEnsembleModel.SaveLoadV1_0.readMetadata(jsonMetadata)
        assert(metadata.treeWeights.forall(_ == 1.0))
        val trees =
          TreeEnsembleModel.SaveLoadV1_0.loadTrees(sc, path, metadata.treeAlgo)
        new RandomForestModel(Algo.fromString(metadata.algo), trees)
      case _ => throw new Exception(s"RandomForestModel.load did not recognize model" +
        s" with (className, format version): ($loadedClassName, $version).  Supported:\n" +
        s"  ($classNameV1_0, 1.0)")
    }
  }

  private object SaveLoadV1_0 {
    // Hard-code class name string in case it changes in the future
    def thisClassName: String = "org.apache.spark.mllib.tree.model.RandomForestModel"
  }
}
When a random forest is used for prediction, the predict method invoked is inherited from TreeEnsembleModel, the representation of tree-ensemble models, which besides random forests also covers Gradient-Boosted Trees (GBTs). Part of its core code is shown below:
Listing 8. Core source code analysis 7
/**
 * Represents a tree ensemble model.
 *
 * @param algo algorithm for the ensemble model, either Classification or Regression
 * @param trees tree ensembles
 * @param treeWeights tree ensemble weights
 * @param combiningStrategy strategy for combining the predictions, not used for regression.
 */
private[tree] sealed class TreeEnsembleModel(
    protected val algo: Algo,
    protected val trees: Array[DecisionTreeModel],
    protected val treeWeights: Array[Double],
    protected val combiningStrategy: EnsembleCombiningStrategy) extends Serializable {

  require(numTrees > 0, "TreeEnsembleModel cannot be created without trees.")

  // Other code omitted

  // The final classification is decided by voting
  /**
   * Classifies a single data point based on (weighted) majority votes.
   */
  private def predictByVoting(features: Vector): Double = {
    val votes = mutable.Map.empty[Int, Double]
    trees.view.zip(treeWeights).foreach { case (tree, weight) =>
      val prediction = tree.predict(features).toInt
      votes(prediction) = votes.getOrElse(prediction, 0.0) + weight
    }
    votes.maxBy(_._2)._1
  }

  /**
   * Predict values for a single data point using the model trained.
   *
   * @param features array representing a single data point
   * @return predicted category from the trained model
   */
  // Different combining strategies use different prediction methods
  def predict(features: Vector): Double = {
    (algo, combiningStrategy) match {
      case (Regression, Sum) =>
        predictBySumming(features)
      case (Regression, Average) =>
        predictBySumming(features) / sumWeights
      case (Classification, Sum) => // binary classification
        val prediction = predictBySumming(features)
        // TODO: predicted labels are +1 or -1 for GBT. Need a better way to store this info.
        if (prediction > 0.0) 1.0 else 0.0
      // Random forests use the predictByVoting method
      case (Classification, Vote) =>
        predictByVoting(features)
      case _ =>
        throw new IllegalArgumentException(
          "TreeEnsembleModel given unsupported (algo, combiningStrategy) combination: " +
            s"($algo, $combiningStrategy).")
    }
  }

  // The concrete implementation of predict for a whole data set
  /**
   * Predict values for the given data set.
   *
   * @param features RDD representing data points to be predicted
   * @return RDD[Double] where each entry contains the corresponding prediction
   */
  def predict(features: RDD[Vector]): RDD[Double] = features.map(x => predict(x))

  // Other code omitted
}
With this walk through the core code, we now understand the internal machinery of the whole random forest algorithm; the next section presents a practical use case.
A hands-on random forest case study
This section illustrates a concrete application of random forests. A bank normally has to assess a customer's repayment ability before granting a loan, but when the customer base is large the pressure on credit reviewers becomes enormous, so computer-aided decision making is highly desirable. The random forest algorithm fits this scenario well: existing historical data can be fed into the algorithm for training, and the resulting model can then classify new customer data, filtering out the many customers without repayment ability and thus greatly reducing the workload of the credit reviewers.

Figure 9. The Spark cluster in operation
Assume the following historical loan repayment records exist:
Table 2. Historical repayment data of credit users

These historical repayment records are formatted as label index1:feature1 index2:feature2 index3:feature3; for example, the first record of the table becomes 0 1:0 2:1 3:10. The fields have the following meanings:

label: repayment ability; 0 means yes, 1 means no
feature 1: owns property; 0 means no, 1 means yes
feature 2: marital status; 0 means single, 1 means married, 2 means divorced
feature 3: annual income; the actual figure

0 1:0 2:1 3:10
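A small helper like the following sketch can generate such LibSVM-style lines from raw fields (the helper name and field order here are illustrative, not part of the MLlib API):

object LibSvmFormat {
  // Format one credit record as "label 1:ownsProperty 2:maritalStatus 3:annualIncome"
  def toLibSvm(label: Int, ownsProperty: Int, maritalStatus: Int, annualIncome: Int): String =
    s"$label 1:$ownsProperty 2:$maritalStatus 3:$annualIncome"

  def main(args: Array[String]): Unit = {
    // The first record of Table 2: able to repay, no property, married, income 10
    println(toLibSvm(0, 0, 1, 10)) // prints: 0 1:0 2:1 3:10
  }
}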
After converting all rows of the table in this way, save the result as sample_data.txt; this file is used to train the random forest. The test data is:
Table 3. Test data

If the random forest model is trained correctly, the prediction for this user record should be "able to repay". For convenience in later steps we save it as input.txt with the content:
0 1:0 2:1 3:12
Upload sample_data.txt and input.txt to the /data directory of HDFS with hadoop fs -put input.txt sample_data.txt /data, then write the code shown in Listing 9 to verify the model.
Listing 9. Determining whether a customer is able to repay a loan
package cn.ml

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

object RandomForestExample {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("RandomForestExample")
      .setMaster("spark://sparkmaster:7077")
    val sc = new SparkContext(sparkConf)

    // Load the training data in LibSVM format
    val data: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "/data/sample_data.txt")

    val numClasses = 2
    val featureSubsetStrategy = "auto"
    val numTrees = 3
    val model: RandomForestModel = RandomForest.trainClassifier(data,
      Strategy.defaultStrategy("classification"), numTrees,
      featureSubsetStrategy, new java.util.Random().nextInt())

    // Load the data to be predicted
    val input: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "/data/input.txt")
    val predictResult = input.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }
    // Print the results; use this when running in spark-shell
    predictResult.collect()
    // Save the results to HDFS
    //predictResult.saveAsTextFile("/data/predictResult")
    sc.stop()
  }
}
The code above can either be packaged and submitted to the server with spark-submit or run in spark-shell to inspect the results. Figure 10 shows the trained RandomForest model and Figure 11 the predictions the model produces; the prediction matches the expectation.
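For reference, a packaged run would be submitted roughly like this (the jar name is illustrative; --class and --master are standard spark-submit options):

spark-submit --class cn.ml.RandomForestExample \
  --master spark://sparkmaster:7077 \
  random-forest-example.jar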

Figure 10. The trained RandomForest model

Figure 11. Results returned by the collect method
Conclusion
This article has taken a close look at the random forest algorithm. It first analyzed the decision tree algorithm and pointed out its overfitting problem, then explained how random forests largely avoid that problem by letting a number of decision trees vote on the final prediction or classification. To address the efficiency of random forests in a distributed setting, the article presented their optimization strategies in a distributed environment, analyzed the core Spark source code on that basis, and finally walked through a practical case showing how the algorithm can be used to classify high-quality customers.