Building Machine Learning Applications with Spark ML Pipeline
 
Source: shiyanjun.cn    Published: 2017-10-18

ʹÓûúÆ÷ѧϰµÄ·½·¨¿ÉÒÔ½â¾öÔ½À´Ô½¶àµÄʵ¼ÊÎÊÌ⣬ËüÔÚÏÖʵÊÀ½çÖеÄÓ¦ÓÃÔ½À´Ô½¹ã·º£¬±ÈÈçÖÇÄÜ·ç¿Ø¡¢ÆÛÕ©¼ì²â¡¢¸öÐÔ»¯ÍƼö¡¢»úÆ÷·­Ò롢ģʽʶ±ð¡¢ÖÇÄÜ¿ØÖÆ£¬µÈµÈ¡£

Categories of Machine Learning

As we all know, machine learning is commonly divided into three major categories: supervised learning, unsupervised learning, and reinforcement learning. Each is briefly introduced below:

Supervised Learning

Supervised learning starts from a training dataset whose labels are known. A chosen algorithm is trained on that dataset to obtain a prediction function that describes the regularities of the data, which is what we call the model. With the model, the label of any input whose label is unknown can be predicted by the prediction function. Typical supervised learning tasks include classification and regression.

Î޼ලѧϰ

Î޼ලѧϰÊǸù¾Ý¸ø¶¨µÄ±êÇ©£¨Label£©Î´ÖªµÄѵÁ·Êý¾Ý¼¯£¬Í¨¹ýѵÁ·Ñ§Ï°´ÓѵÁ·Êý¾Ý¼¯Öз¢ÏÖÒþ²ØµÄģʽ»ò½á¹¹¡£µäÐ͵ÄÎ޼ලѧϰ·½·¨£¬Èç¾ÛÀà·ÖÎö¡£

Reinforcement Learning

Reinforcement learning is a form of policy learning in artificial intelligence, which grew out of animal learning and parameter-perturbation adaptive control theory. It learns a mapping from environment states to actions so as to maximize the cumulative reward obtained from the environment, discovering the optimal behavior policy through continual trial and error. Commonly used reinforcement learning algorithms include Q-learning and SARSA.

The Basic Process of Machine Learning

Beyond the categories introduced above, we can summarize, in broader terms, the basic process that a complete machine learning application should include. The four main stages that building a machine learning application typically goes through are given below:

Data Preparation

In the data preparation stage there are usually one or more existing datasets, which may be very close to the raw sources that produced them: formats vary widely, and irregularities, missing values, and erroneous values are everywhere. A dataset may also contain several sub-datasets of different types, and training a model on any single sub-dataset in isolation would be pointless unless those sub-datasets are first processed and combined.

If such an input dataset is used for model training without any processing, one outcome is that it simply cannot serve as input for producing a machine learning model at all; the other is that it satisfies the input-format requirements of the training algorithm, but the resulting model cannot be put into production and deliver the expected results.

The data preparation stage for machine learning can be viewed as a general data ETL process. Besides basic steps such as normalizing data formats, removing noise, and integrating data, it also includes some ML-specific ETL steps, such as feature extraction (TF-IDF/Word2Vec), dimensionality reduction, and principal component analysis (PCA). Moreover, some ML-oriented ETL steps may themselves require building a machine learning model to perform complex preprocessing on the input dataset; one such step is sketched below.
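
As one sketch of such an ML-specific ETL step in Spark ML (assuming a DataFrame named df that already holds a numeric vector column called "features"; both names are placeholders), PCA is itself a small model-producing preprocessing step:

import org.apache.spark.ml.feature.PCA

// Assumed input: a DataFrame `df` with a vector column "features" (placeholder names).
// Fit a PCA model that projects "features" down to 3 principal components;
// the fitted PCAModel is itself a preprocessing model produced during ETL.
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
val pcaModel = pca.fit(df)
val reduced = pcaModel.transform(df)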

In short, the data preparation stage is mainly about performing ETL on the data; on top of that, an appropriate data-splitting strategy may be needed to produce a training set that satisfies model training and a test set used for evaluating the model, as sketched below.
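
A minimal sketch of such a splitting strategy, assuming the prepared data already sits in a DataFrame named cleaned (a placeholder name):

// `cleaned` is an assumed, already-prepared DataFrame.
// Randomly split it into 80% training data and 20% test data;
// the seed makes the split reproducible across runs.
val Array(trainingSet, testSet) = cleaned.randomSplit(Array(0.8, 0.2), seed = 42L)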

Training the Model

Training the model is the most central of all the stages of building a machine learning application. In this stage we first choose, for the given problem domain, a model suited to solving problems in that domain, and only then consider, based on the scale and characteristics of the selected data, which specific algorithm to use to compute the final model we need.

What is a model? Informally, a model can be understood as a mathematical function which, given our input data, produces or approximately produces results we consider reasonable. A mathematical function has one or more parameters, and the result of training is determining the values of those parameters. The function may be simple or very complex. The dataset may have characteristics of its own, such as an extremely large scale or precision lost during processing, so training on the chosen dataset usually cannot recover the theoretically exact values of all the parameters of the target function. The ultimate goal is to perform well on the given dataset, and special handling can be applied according to the actual situation. In practice, pushing precision higher often costs a great deal of resources and time while improving the model's effect only marginally, so giving up some precision can still leave the model perfectly usable in real applications.

Training a model means learning the latent regularities from a given dataset, expressing them in the form of a function, and computing all the parameters of that target function. The function constructed from the final parameters should solve the assumed problem well, that is, fit the given training dataset, while also having good generalization ability, i.e., neither underfitting nor overfitting.

Evaluating the Model

Training yields a set of parameters that fit the given training dataset, but how will the model perform on future, unseen data? To answer this question, we apply the trained model to a given test dataset and analyze the results to determine whether the model's accuracy meets the application's requirements. The only difference between the training set and the test set is whether the labels are known; the processing logic applied to the data itself is essentially the same.

ÁíÍ⣬ÆÀ¼ÛÄ£Ð͵ÄÓÅÁÓ£¬Ñé֤ģÐ͵ĺûµ£¬ÐèҪѡÔñÊʺÏÌØÄⶨÁìÓòµÄ¶ÈÁ¿·½·¨£¬´Ó¶ø¶ÔÄ£ÐͽøÐÐ¿Í¹ÛµÄÆÀ¼Û¡£±ÈÈ磬ÀëÏßÄ£ÐÍµÄÆÀ¹À£¬³£ÓÃ׼ȷÂÊ¡¢¾«È·ÂÊ-ÕÙ»ØÂÊ£¬¶øÔÚÏßÄ£ÐÍ¿ÉÄÜ»áʹÓÃCTR¡¢A/B²âÊԵȡ£

Applying the Model

A model that has been validated and can be put into use may produce a particular result dataset, which we process further according to the needs of the application. For example, the item set in a recommendation model can be very large, so the recommended result set can be post-processed to provide fast query services to the applications it supports. The model may also be opaque (not directly readable); in that case we may need to build a service on top of the model and expose it externally as a model-serving service.

How exactly the model is used depends on the characteristics and usage patterns of the actual application.

Introduction to Spark ML Pipeline

Spark ML Pipeline provides a set of high-level APIs built on top of DataFrame. With ML Pipeline we can build machine learning applications: it organizes the multiple processing steps of an application and manages, at the code level, the order in which the steps run, which greatly reduces the difficulty of developing machine learning applications.

Spark ML Pipeline uses DataFrame as the abstraction for the input and output datasets of machine learning. DataFrame comes from Spark SQL and represents a special abstraction over datasets. It is also a Dataset (the abstract interface for distributed datasets introduced in Spark 1.6), but a DataFrame organizes the Dataset by assigning a column name to each column of every row, much like a table in a relational database, and a great deal of optimization is done in the underlying processing. A DataFrame can be constructed from different data sources, such as structured files, Hive tables, databases, or RDDs, as sketched below. To put it more plainly, a DataFrame can be regarded as equivalent to Dataset[Row], i.e., a Dataset of Row-typed objects.
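
A few illustrative sketches of constructing a DataFrame from different sources (the paths and table names below are placeholders, and a SparkSession named spark is assumed):

// From a structured file, e.g. JSON (the path is a placeholder).
val fromFile = spark.read.json("/path/to/data.json")

// From a Hive table (requires a SparkSession with Hive support enabled; placeholder name).
val fromHive = spark.table("some_db.some_table")

// From an existing RDD, here built from local tuples for illustration.
import spark.implicits._
val fromRDD = spark.sparkContext
  .parallelize(Seq((1, "a"), (2, "b")))
  .toDF("id", "value")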

Machine learning can be applied to many data types, such as vectors, text, images, and structured data. The reason the Spark ML API adopts DataFrame is that this abstraction from Spark SQL supports a very wide range of types, is very intuitive to express, and is convenient to process in Spark. In other words, DataFrame is the most fundamental dataset abstraction in Spark ML, and every ML Pipeline component builds richer, more complex data processing logic on top of it.

Spark ML Pipeline has two core data processing components, Transformer and Estimator, both of which are subclasses of PipelineStage within a Pipeline. Other abstractions, such as Model, Predictor, Classifier, and Regressor, are all derived from these two core components; for example, a Model is a Transformer and a Predictor is an Estimator. Their relationships are shown in the class diagram below:

Based on that diagram, they are described in detail as follows:

Transformer

A Transformer performs transformation operations on the datasets handled in machine learning. Much as a Transformation on an RDD in Spark turns an input RDD into a new RDD, a Transformer transforms a DataFrame. From the abstract definition of the Transformer class we can look at the transform methods it defines with different parameter lists, as shown below:

package org.apache.spark.ml

@DeveloperApi
abstract class Transformer extends PipelineStage {

  @Since("2.0.0")
  @varargs
  def transform(
      dataset: Dataset[_],
      firstParamPair: ParamPair[_],
      otherParamPairs: ParamPair[_]*): DataFrame = {
    val map = new ParamMap()
      .put(firstParamPair)
      .put(otherParamPairs: _*)
    transform(dataset, map)
  }

  @Since("2.0.0")
  def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame = {
    this.copy(paramMap).transform(dataset)
  }

  @Since("2.0.0")
  def transform(dataset: Dataset[_]): DataFrame

  override def copy(extra: ParamMap): Transformer
}

Each of the transform methods above takes a Dataset[_] as input and, after the transformation, outputs a DataFrame. In fact, if you look at the definition of DataFrame, it is just a Dataset, as shown below:

type DataFrame = Dataset[Row]

A Transformer mainly abstracts two kinds of operations. One is feature transformation: it may read a column from a DataFrame and, through a map-like operation, turn that column into a new column of data. For example, given an input DataFrame, it can convert a column of raw text into a column of feature vectors; the output is still a DataFrame, with the transformed data appended under its output column name (through which the new column can be accessed). The other is a learned model, which reads the column containing feature vectors, predicts a label for each row, and appends the predictions as a new column. A standalone sketch of the first kind follows.
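
A concrete feature Transformer of the first kind is Tokenizer, which also appears in the example later in this article; a minimal standalone sketch, assuming a DataFrame named docs with a "text" column (a placeholder):

import org.apache.spark.ml.feature.Tokenizer

// `docs` is an assumed DataFrame with a "text" column.
// Read "text" and append a new "words" column holding the split tokens.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val tokenized = tokenizer.transform(docs)
tokenized.select("text", "words").show(false)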

Next, let us look at the inheritance hierarchy of the Transformer classes implemented in Spark MLlib, as shown in the class diagram below:

Estimator

An Estimator is used to train a model: its input is a DataFrame and its output is a Model, the abstraction and definition of a machine learning model in Spark ML; a Model is in fact a Transformer. A machine learning algorithm is trained on a dataset, and an Estimator abstracts that algorithm as applied to the training set. Its input is therefore a dataset, a DataFrame, and training ultimately yields a Model.

The Estimator class defines fit methods to implement model training; the code of the class is shown below:

package org.apache.spark.ml

@DeveloperApi
abstract class Estimator[M <: Model[M]] extends PipelineStage {

  @Since("2.0.0")
  @varargs
  def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): M = {
    val map = new ParamMap()
      .put(firstParamPair)
      .put(otherParamPairs: _*)
    fit(dataset, map)
  }

  @Since("2.0.0")
  def fit(dataset: Dataset[_], paramMap: ParamMap): M = {
    copy(paramMap).fit(dataset)
  }

  @Since("2.0.0")
  def fit(dataset: Dataset[_]): M

  @Since("2.0.0")
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
    paramMaps.map(fit(dataset, _))
  }

  override def copy(extra: ParamMap): Estimator[M]
}

As the code above shows, calling fit on an Estimator yields a Model, which is a Transformer, and that Transformer can in turn perform transformation operations on an input DataFrame.
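
That flow can be illustrated in isolation with a single Estimator; a sketch, assuming DataFrames named trainDF (with "features" and "label" columns) and testDF (with a "features" column), both placeholders:

import org.apache.spark.ml.classification.LogisticRegression

// `trainDF` and `testDF` are assumed DataFrames with a "features" vector column;
// `trainDF` additionally has a "label" column.
// The Estimator: fit() trains on the training DataFrame and returns a Model.
val lr = new LogisticRegression().setMaxIter(10)
val lrModel = lr.fit(trainDF)

// The returned LogisticRegressionModel is a Transformer:
// transform() appends prediction columns to the input DataFrame.
val scored = lrModel.transform(testDF)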

Next, let us look at the Estimator classes implemented in Spark MLlib, as shown in the class diagram below:

PipelineStage

A PipelineStage is the basic element from which a Pipeline is built; it is either a Transformer or an Estimator.

Pipeline

Pipeline is itself an implementation of Estimator. A Pipeline is a DAG built from multiple PipelineStages; in the simple case a linear sequence of PipelineStages is enough to build a machine learning application, but relatively more complex PipelineStage DAGs can be built as well.

Calling a Pipeline's fit method produces a PipelineModel, which is a subclass of Model and therefore also a Transformer. During training, the Pipeline's PipelineStages run on the training dataset and finally produce a Model. We can also see that, during model training, the PipelineStage(s) at the end should be one or more consecutive Estimators, because only running an Estimator produces a Model.

Next comes the PipelineStage that sits between the training stage and the testing stage and plays a pivotal, bridging role: the PipelineModel. Calling the PipelineModel's transform method runs the data through a processing (transformation) flow similar to the one used in training, transforming the dataset with the same PipelineStages and finally applying the model produced during training to the test dataset, thereby achieving the final goal of prediction.

With Spark ML Pipeline it is very easy to build such a linear Pipeline: all the processing steps in building a machine learning application (preparing data, training the model, evaluating the model) can be composed linearly through one and the same Pipeline API, which is very intuitive and easy to manage.

Spark ML Pipeline in Practice

Here we use the example given in the official Spark ML Pipeline documentation, text classification with logistic regression, to explain in detail how to build a machine learning application with the Spark ML Pipeline API and how to use it concretely. The official example is very intuitive; practice in real business scenarios will be shared separately in another article.

Scenario

In this example:

In the training stage, each word is split out of the given training dataset of text lines; feature vectors are then generated from the resulting words; finally, based on the feature vectors, the logistic regression algorithm is chosen and trained to produce a logistic regression model.

In the testing stage, the given test dataset is processed in exactly the same way, and predictions are made using the model obtained in the training stage.

Training Stage

The data processing steps of the training stage are shown in the figure below:

In the figure, the blue boxes are Transformers and the red boxes are Estimators.

In the training stage, when the Pipeline runs, Tokenizer and HashingTF each transform the input DataFrame and produce a new DataFrame; LogisticRegression is an Estimator, and when its fit method is called it produces a LogisticRegressionModel, which is a Transformer and can be used in the testing stage.

Testing Stage

The processing above all happens when the Pipeline's fit method is called, which finally produces a PipelineModel, a Transformer that is used in the testing stage. The testing stage starts from this PipelineModel; the processing flow is shown in the figure below:

As a Transformer, the PipelineModel likewise first transforms the input test dataset. Comparing with the training-stage flow, we can see that the Estimators of the training stage have become Transformers, because the output of the testing stage is just a result DataFrame, and no Model needs to be produced as in training.

Example Code

First, prepare a mock training dataset; the code is as follows:

val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

The mock training dataset has three fields: ID, text content, and label. In a real application, the data should be read from a designated file system such as HDFS; only this part needs to be modified accordingly, as sketched below.
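
For instance, if the same three fields were stored as a CSV file on HDFS, the training DataFrame could be built roughly like this (the path and the read options are placeholders, not part of the original example):

// Read id/text/label records from an HDFS path instead of hard-coding them (path is a placeholder).
val training = spark.read
  .option("header", "true")        // first line holds the column names
  .option("inferSchema", "true")   // let Spark infer numeric types for id and label
  .csv("hdfs:///data/ml/training_documents.csv")
  .toDF("id", "text", "label")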

Next, create a Pipeline object and set the multiple PipelineStages that it will execute in order; the code is as follows:

val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr)) // contains 3 PipelineStages

Then we can run the training on the training dataset; the code is as follows:

val model = pipeline.fit(training)

Calling the Pipeline's fit method produces a Model. Depending on the actual situation, we can choose whether to save the resulting model (so that it can be reloaded and reused later), as shown below:

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
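
The saved fitted pipeline can be loaded back later (for example in a separate prediction job) and used again as a Transformer:

import org.apache.spark.ml.PipelineModel

// Load the fitted pipeline back in, e.g. during production.
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")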

Then, create a mock test dataset to test the model trained above; the code is as follows:

val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

In the test dataset the labels are all unknown. Applying the previously trained model to this test dataset predicts the corresponding labels; the code is as follows:

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

In this way, the accuracy of the classification model can be checked against the predicted results.

Finally, the trained model can be used in real application scenarios to deliver the required functionality.

Óйظü¶àʹÓÃSpark ML PipelineµÄÀý×Ó£¬¿ÉÒԲο¼Spark·¢ÐаüÖУ¬examplesÀïÃæsrc/main/scala/mlÏÂÃæµÄºÜ¶àʾÀý´úÂ룬·Ç³£ºÃµÄѧϰ×ÊÔ´¡£

   