Machine learning can solve a growing number of practical problems, and its real-world applications keep expanding: intelligent risk control, fraud detection, personalized recommendation, machine translation, pattern recognition, intelligent control, and so on.
Categories of Machine Learning
As is well known, machine learning falls into three broad categories: supervised learning, unsupervised learning, and reinforcement learning. Below is a brief introduction to each:
Supervised Learning
¼à¶½Ñ§Ï°ÊǸù¾Ý¸ø¶¨µÄ±êÇ©£¨Label£©ÒÑÖªµÄѵÁ·Êý¾Ý¼¯£¬Í¨¹ýÑ¡¶¨µÄËã·¨ÔÚ¸ÃѵÁ·Êý¾Ý¼¯ÉϽøÐÐѵÁ·Ñ§Ï°£¬×îºóµÃµ½Ò»¸ö¿ÉÒÔÃèÊö¸ÃÊý¾Ý¼¯¹æÂɵÄÔ¤²âº¯Êý£¬Ò²¾ÍÊÇÎÒÃÇËù˵µÄÄ£ÐÍ¡£ÓÐÁËÄ£ÐÍ£¬¶ÔÓÚδ֪±êÇ©µÄÊäÈëÊý¾Ý£¬¿ÉÒÔͨ¹ý¸ÃÔ¤²âº¯ÊýÔ¤²â³öËüµÄ±êÇ©¡£µäÐ͵ļලѧϰ·½·¨£¬Èç·ÖÀà¡¢»Ø¹éµÈ¡£
Unsupervised Learning
Î޼ලѧϰÊǸù¾Ý¸ø¶¨µÄ±êÇ©£¨Label£©Î´ÖªµÄѵÁ·Êý¾Ý¼¯£¬Í¨¹ýѵÁ·Ñ§Ï°´ÓѵÁ·Êý¾Ý¼¯Öз¢ÏÖÒþ²ØµÄģʽ»ò½á¹¹¡£µäÐ͵ÄÎ޼ලѧϰ·½·¨£¬Èç¾ÛÀà·ÖÎö¡£
Reinforcement Learning
Reinforcement learning is a kind of policy learning in artificial intelligence, developed from animal learning and parameter-perturbation adaptive control theory. It learns a mapping from environment states to actions, so as to maximize the cumulative reward the actions obtain from the environment. The agent learns through continual trial and error, eventually discovering an optimal behavior policy. Commonly used reinforcement learning algorithms include Q-learning and SARSA.
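The Q-learning algorithm mentioned above boils down to a single table-update rule. The sketch below shows only that rule; the state/action encoding as integers and the reward values are made-up assumptions for illustration, with no real environment attached:

```scala
// A minimal sketch of the tabular Q-learning update:
//   Q(s,a) <- Q(s,a) + alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))
object QLearningSketch {
  val alpha = 0.5 // learning rate
  val gamma = 0.9 // discount factor for future reward

  // Q-table: (state, action) -> estimated cumulative reward
  var q = Map.empty[(Int, Int), Double]
  def qv(s: Int, a: Int): Double = q.getOrElse((s, a), 0.0)

  // One trial-and-error step: observe reward and next state, update the table
  def update(s: Int, a: Int, reward: Double, sNext: Int, actions: Seq[Int]): Unit = {
    val maxNext = actions.map(aNext => qv(sNext, aNext)).max
    q = q.updated((s, a), qv(s, a) + alpha * (reward + gamma * maxNext - qv(s, a)))
  }
}
```

Repeating this update over many episodes is what lets the cumulative-reward estimates, and hence the policy, converge.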
The Basic Machine Learning Process
Building on the categories introduced above, we can summarize, in broader terms, the basic process a complete machine learning application usually goes through. Building such an application generally involves the following four main stages:
Data Preparation
In the data preparation stage there are usually one or more existing datasets, and they may be very close to their raw source: formats vary widely, and irregularities, missing values, and erroneous values are everywhere. A dataset may also contain several sub-datasets of different types, and training a model on any single sub-dataset in isolation would be meaningless unless we first apply some processing to them.
If such input data is fed directly into model training without any processing, one outcome is that it simply cannot serve as input for producing a model at all; another is that it satisfies the input-format requirements of the training algorithm, but the resulting model cannot be put into production or deliver the expected results.
The data preparation stage for machine learning can be viewed as a general-purpose ETL process. Besides basic steps such as normalizing data formats, removing noise, and integrating data, it also includes ETL steps specific to machine learning, such as feature extraction (TF-IDF/Word2Vec), dimensionality reduction, and principal component analysis (PCA). Moreover, some ML-oriented ETL steps may themselves require training a model to perform complex preprocessing on the input dataset.
In short, the data preparation stage is mainly about running ETL on the data; on top of that, we may need to choose a suitable data-splitting strategy to produce a training set for model training and a test set for model evaluation.
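The splitting strategy can be as simple as a random split. The sketch below assumes a local SparkSession and a made-up four-row dataset; the column names and the 80/20 ratio are illustrative choices, not requirements:

```scala
// A minimal sketch of producing a training set and a test set with
// DataFrame.randomSplit; the seed makes the split reproducible.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("split").getOrCreate()

val data = spark.createDataFrame(Seq(
  (0L, "a b c", 1.0), (1L, "d e", 0.0), (2L, "f g h", 1.0), (3L, "i j", 0.0)
)).toDF("id", "text", "label")

// randomSplit returns an Array[DataFrame]; weights are normalized internally
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)
```

Every input row ends up in exactly one of the two resulting DataFrames.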
Training the Model
ѵÁ·Ä£ÐÍÊǹ¹½¨»úÆ÷ѧϰӦÓø÷¸ö½×¶ÎÖÐ×îºËÐĵĽ׶Ρ£¸Ã½×¶Î£¬ÎÒÃÇÊ×ÏÈ»á¸ù¾Ý¸ø¶¨µÄÎÊÌâÓò£¬Ñ¡ÔñÒ»¸öÊʺϽâ¾ö¸ÃÁìÓòÎÊÌâµÄÄ£ÐÍ£¬È»ºó²Å»á¿¼ÂÇ»ùÓÚËùÑ¡ÔñÊý¾ÝµÄ¹æÄ£ºÍÌØµã£¬Ê¹ÓÃÌØ¶¨µÄËã·¨À´¼ÆËãÉú³É×îÖÕÎÒÃÇÐèÒªµÄÄ£ÐÍ¡£
Ä£ÐÍÊÇÊ²Ã´ÄØ£¿Í¨Ë׵ؽ²£¬Ä£ÐÍ¿ÉÒÔÀí½âΪһ¸öÊýѧº¯Êý£¬¸Ãº¯Êý×îÖÕÄܹ»Âú×ãµÄЧ¹ûÊÇ£¬¸ù¾ÝÎÒÃǸø¶¨µÄÊäÈëÊý¾Ý£¬¾ÍÄܵõ½»ò½üËÆµÃµ½ÎÒÃÇÈÏΪºÏÀíµÄ½á¹û¡£Ò»¸öÊýѧº¯Êý¾ßÓÐÒ»¸ö»ò¶à¸ö²ÎÊý£¬ÑµÁ·Ä£Ð͵Ľá¹û¾ÍÊÇÈ·¶¨ÕâЩ²ÎÊýµÄÖµ¡£º¯Êý¿ÉÄܼܺòµ¥£¬Ò²¿ÉÄܸܺ´ÔÓ¡£Êý¾Ý¼¯¿ÉÄÜÓÐÆäÌØµã£¬±ÈÈçÊý¾Ý¹æÄ£³¬´ó¡¢Êý¾ÝÔÚ´¦Àí¹ý³ÌÖо«¶ÈµÄËðʧµÈµÈ£¬ÎÒÃÇÒªÔÚËùÑ¡ÔñµÄÊý¾Ý¼¯ÉϽøÐÐѵÁ·Ñ§Ï°£¬Í¨³£²»Äܵõ½Ä¿±êº¯ÊýËùÓвÎÊýÀíÂÛÉϵľ«È·Öµ¡£×îÖÕµÄÄ¿±êÊÇ£¬Äܹ»ÔÚ¸ø¶¨µÄÊý¾Ý¼¯ÉϾßÓкܺõرíÏÖ£¬ÎÒÃÇ¿ÉÒÔ¸ù¾Ýʵ¼ÊÇé¿ö×öÌØÊâ´¦Àí¡£ÔÚʵ¼ÊÓ¦ÓÃÖУ¬ÍùÍùÌáÉý¾«¶È»áºÄ·Ñ´óÁ¿×ÊÔ´ºÍʱ¼ä£¬ÔÙ¶Ô±ÈÄ£ÐÍ´øÀ´Ð§¹û¿ÉÄÜ΢ºõÆä΢£¬ËùÒÔÉáÆúÒ»¶¨µÄ¾«¶ÈÒ²ÄܺܺõØÔÚʵ¼ÊÓ¦ÓÃÖÐʹÓøÃÄ£ÐÍ¡£
Training a model, then, means learning the latent regularities in a given dataset, expressing them in the form of a function, and computing all the parameters of that target function. The function constructed from the final parameters should solve the assumed problem well (fit the training dataset), while also generalizing well, that is, neither underfitting nor overfitting.
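The idea that "a model is a function whose parameters training determines" can be shown with the smallest possible example: fitting a line y = w * x + b by ordinary least squares. Everything here (the data, the closed-form estimator) is a self-contained toy, not part of any library:

```scala
// A toy "training" run: learn the two parameters w and b of a linear
// function from data, then use the learned function to predict.
object TinyLinearModel {
  // Closed-form least-squares estimates for slope w and intercept b
  def fit(xs: Seq[Double], ys: Seq[Double]): (Double, Double) = {
    val n = xs.size.toDouble
    val xMean = xs.sum / n
    val yMean = ys.sum / n
    val w = xs.zip(ys).map { case (x, y) => (x - xMean) * (y - yMean) }.sum /
            xs.map(x => (x - xMean) * (x - xMean)).sum
    val b = yMean - w * xMean
    (w, b)
  }

  // The trained "model": a function built from the learned parameters
  def predict(w: Double, b: Double)(x: Double): Double = w * x + b
}
```

On data generated by y = 2x, e.g. `fit(Seq(1.0, 2.0, 3.0), Seq(2.0, 4.0, 6.0))`, training recovers w = 2 and b = 0, and `predict` then extrapolates to unseen inputs, which is exactly the generalization property the paragraph above describes.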
Evaluating the Model
ѵÁ·Ä£Ð͵õ½ÁËÒ»×é²ÎÊý£¬Äܹ»Ä£Äâ¸ø¶¨ÑµÁ·Êý¾Ý¼¯£¬µ«ÊÇÈç¹û¶ÔÓÚδÀ´Î´ÖªµÄÊý¾Ý£¬Ä£Ð͵ıíÏÖ»áÈçºÎ£¿ÎªÁ˽â¾öÕâ¸öÒÉÎÊ£¬ÎÒÃÇÐèÒª½«ÑµÁ·µÃµ½µÄÄ£ÐÍ£¬×÷ÓÃÔÚ¸ø¶¨µÄ²âÊÔÊý¾Ý¼¯ÉÏ£¬¸ù¾Ý½á¹û½øÐзÖÎö£¬È·¶¨Ä£Ð͵ľ«¶ÈÊÇ·ñÄܹ»Âú×ãÓ¦ÓÃÐèÇó¡£ÑµÁ·Êý¾Ý¼¯ºÍ²âÊÔÊý¾Ý¼¯Î¨Ò»²»Í¬µÄ¾ÍÊÇÊÇ·ñÒÑÖª±êÇ©£¬¶ø¶ÔÊý¾Ý±¾ÉíµÄ´¦ÀíÂß¼»ù±¾¶¼ÊÇÏàͬµÄ¡£
In addition, judging how good or bad a model is requires choosing metrics suited to the particular domain, so that the model can be evaluated objectively. For example, offline models are commonly evaluated with accuracy or precision-recall, while online models may use CTR, A/B testing, and so on.
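The offline metrics just named are simple ratios over (label, prediction) pairs. The following self-contained sketch computes them directly; the sample pairs in the usage below are made up:

```scala
// A minimal sketch of accuracy, precision, and recall for binary
// classification, computed from (label, prediction) pairs.
object BinaryMetrics {
  def compute(pairs: Seq[(Int, Int)]): (Double, Double, Double) = {
    val tp = pairs.count { case (l, p) => l == 1 && p == 1 }.toDouble // true positives
    val fp = pairs.count { case (l, p) => l == 0 && p == 1 }.toDouble // false positives
    val fn = pairs.count { case (l, p) => l == 1 && p == 0 }.toDouble // false negatives
    val accuracy  = pairs.count { case (l, p) => l == p }.toDouble / pairs.size
    val precision = tp / (tp + fp) // of everything predicted positive, how much was right
    val recall    = tp / (tp + fn) // of all actual positives, how many were found
    (accuracy, precision, recall)
  }
}
```

For instance, `BinaryMetrics.compute(Seq((1, 1), (0, 1), (1, 0), (0, 0)))` yields 0.5 for all three metrics, since there is one true positive, one false positive, and one false negative.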
Applying the Model
A validated, production-ready model may produce a particular result dataset that we process further according to the application's needs. For example, the item set handled by a recommendation model may be very large; the recommended result set can be post-processed so that fast query services can be offered to the applications it supports. The model may also be an opaque artifact that cannot be read directly, in which case we may need to build a service on top of it and expose the model as a service.
Exactly how the model is used depends on the characteristics and usage patterns of the actual application.
An Overview of Spark ML Pipeline
Spark ML Pipeline provides a set of high-level APIs built on top of DataFrame. With the ML Pipeline API we can build machine learning applications: it organizes the multiple processing steps of an application and manages, at the code level, the order in which each step runs, greatly reducing the difficulty of developing machine learning applications.
Spark ML Pipeline uses DataFrame as the abstraction for its input and output datasets. DataFrame comes from Spark SQL and represents a particular abstraction over a dataset. It is also a Dataset (the abstraction for distributed datasets introduced in Spark 1.6), but a DataFrame organizes the Dataset by assigning a column name to each column of every row, much like a table in a relational database, and a great deal of optimization is done under the hood. A DataFrame can be built from different data sources, such as structured files, Hive tables, databases, or RDDs. Put more plainly, a DataFrame can be regarded as equivalent to Dataset[Row], that is, a Dataset of Row-typed objects.
Machine learning can be applied to many data types, such as vectors, text, images, and structured data. The reason the Spark ML API adopts DataFrame is that this abstraction from Spark SQL supports a very wide range of types, is very intuitive to express, and is easy to process within Spark. DataFrame is therefore Spark ML's most fundamental abstraction over datasets, and all the various ML Pipeline components build richer, more complex data-processing logic on top of it.
Spark ML Pipeline has two core data-processing components, Transformer and Estimator, both of which are subclasses of a Pipeline's PipelineStage. Other abstractions, such as Model, Predictor, Classifier, and Regressor, are derived from these two core components; for example, a Model is a Transformer and a Predictor is an Estimator. Their relationships are shown in the class diagram below:

Based on the diagram above, we now describe each of them in detail:
Transformer
A Transformer performs transformation operations on the datasets processed in machine learning. Much like a Transformation on an RDD in Spark (which transforms an input RDD into a new RDD), a Transformer transforms a DataFrame. Looking at the abstract definition of the Transformer class, we can see the several transform methods it defines with different parameters, as shown below:
package org.apache.spark.ml

@DeveloperApi
abstract class Transformer extends PipelineStage {

  @Since("2.0.0")
  @varargs
  def transform(
      dataset: Dataset[_],
      firstParamPair: ParamPair[_],
      otherParamPairs: ParamPair[_]*): DataFrame = {
    val map = new ParamMap()
      .put(firstParamPair)
      .put(otherParamPairs: _*)
    transform(dataset, map)
  }

  @Since("2.0.0")
  def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame = {
    this.copy(paramMap).transform(dataset)
  }

  @Since("2.0.0")
  def transform(dataset: Dataset[_]): DataFrame

  override def copy(extra: ParamMap): Transformer
}
Each of the transform methods above takes a Dataset[_] as input and, after the transformation, outputs a DataFrame. In fact, if you look at the definition of DataFrame, it is simply a Dataset, as shown below:
type DataFrame = Dataset[Row]
Transformer mainly abstracts two kinds of operations. One is feature transformation: it may read a column from a DataFrame and map it into a new column of data; for example, given a DataFrame with a column of raw text, it converts that column into a column of feature vectors, and the output is still a DataFrame in which the transformed column is mapped to a designated column name (through which the column can be operated on). The other is a learned model: a Model reads a column of feature vectors and outputs a new DataFrame with predictions appended.
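As a concrete illustration of the feature-transformation kind, the sketch below (assuming a local SparkSession and a made-up one-row dataset) runs Spark's Tokenizer, which turns a text column into a column of words:

```scala
// A minimal sketch of a feature Transformer in action: transform takes a
// DataFrame and returns a new DataFrame with the "words" column appended.
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("tok").getOrCreate()

val df = spark.createDataFrame(Seq(
  (0L, "spark ml pipeline")
)).toDF("id", "text")

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val tokenized = tokenizer.transform(df) // input DataFrame is left unchanged
```

Note that the original columns are preserved; the Transformer only adds the configured output column.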
Next, let's look at the inheritance hierarchy of the Transformer classes implemented in Spark MLlib, shown in the class diagram below:

Estimator
An Estimator is used to train a model: its input is a DataFrame and its output is a Model. Model is Spark ML's abstraction and definition of a machine learning model, and a Model is in fact a Transformer. A machine learning algorithm trains on a dataset, and Estimator abstracts that training-set-based algorithm; its input is a dataset in the form of a DataFrame, and training eventually yields a Model.
EstimatorÀඨÁËfit·½·¨À´ÊµÏÖ¶ÔÄ£Ð͵ÄѵÁ·£¬ÀàµÄ´úÂëÈçÏÂËùʾ£º
package org.apache.spark.ml

@DeveloperApi
abstract class Estimator[M <: Model[M]] extends PipelineStage {

  @Since("2.0.0")
  @varargs
  def fit(dataset: Dataset[_], firstParamPair: ParamPair[_],
      otherParamPairs: ParamPair[_]*): M = {
    val map = new ParamMap()
      .put(firstParamPair)
      .put(otherParamPairs: _*)
    fit(dataset, map)
  }

  @Since("2.0.0")
  def fit(dataset: Dataset[_], paramMap: ParamMap): M = {
    copy(paramMap).fit(dataset)
  }

  @Since("2.0.0")
  def fit(dataset: Dataset[_]): M

  @Since("2.0.0")
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
    paramMaps.map(fit(dataset, _))
  }

  override def copy(extra: ParamMap): Estimator[M]
}
As the code above shows, calling an Estimator's fit method yields a Model, that is, a Transformer, and a Transformer can in turn transform an input DataFrame.
Next, let's look at the Estimator classes implemented in Spark MLlib, shown in the class diagram below:

PipelineStage
PipelineStageÊǹ¹½¨Ò»¸öPipelineµÄ»ù±¾ÔªËØ£¬Ëü»òÕßÊÇÒ»¸öTransformer£¬»òÕßÊÇÒ»¸öEstimator¡£
Pipeline
A Pipeline is in fact an implementation of Estimator. A Pipeline is a DAG built from multiple PipelineStages; in the simple case, a linear sequence of PipelineStages suffices to build a machine learning application, though relatively complex PipelineStage DAGs can of course also be built.
Calling a Pipeline's fit method produces a PipelineModel, which is a subclass of Model and therefore also a Transformer. During training, the Pipeline's PipelineStages run on the training dataset and finally produce a Model. Note that during model training, the last PipelineStages should be one or more consecutive Estimators, because only running an Estimator produces a Model.
Next comes the PipelineStage that sits between the training and test phases and plays a particularly important bridging role: the PipelineModel. Calling the PipelineModel's transform method runs a data-processing (transformation) flow similar to that of the training phase, transforming the dataset through the same PipelineStages and finally applying the model produced during training to the test dataset, thereby achieving the ultimate goal of prediction.
With Spark ML Pipeline, building such a linear Pipeline is easy: the individual processing steps of a machine learning application (preparing data, training the model, evaluating the model) can be composed linearly through one and the same Pipeline API, which is very intuitive and easy to manage.
Spark ML Pipeline in Practice
Here we take the example from the official Spark ML Pipeline documentation, text classification based on logistic regression, to explain in detail how to build a machine learning application with the Spark ML Pipeline API and how to use it concretely. The official example is very intuitive; practice in real business scenarios will be shared separately in another article.
Scenario Description
In this example:
In the training phase, the given training dataset of text lines is processed by splitting out each word; feature vectors are then generated from the resulting words; finally, based on the feature vectors, the logistic regression algorithm is chosen and trained to produce a logistic regression model.
In the test phase, the given test dataset is processed in exactly the same way, and predictions are made using the model obtained during training.
Training Phase
The data-processing steps of the training phase are shown in the figure below:

In the figure above, the blue boxes are all Transformers and the red box is an Estimator.
In the training phase, when the Pipeline runs, Tokenizer and HashingTF each transform their input DataFrame into a new DataFrame; LogisticRegression is an Estimator, and when its fit method is called it produces a LogisticRegressionModel, which is a Transformer and can be used in the test phase.
Test Phase
All of the above happens when the Pipeline's fit method is called; the end result is a PipelineModel, which is a Transformer and will be used in the test phase. The test phase starts from this PipelineModel; the processing flow is shown in the figure below:

As a Transformer, the PipelineModel likewise first transforms the input test dataset. Comparing this with the training-phase flow, we can see that the Estimators of the training phase have all become Transformers, because in the test phase the output is a result DataFrame; there is no longer any need to produce a Model as in the training phase.
Example Code
First, prepare a mock training dataset, as shown in the code below:
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
The mock training dataset has three fields: ID, text content, and label. In a real application we would read the data from a designated file system such as HDFS; the code only needs to be adapted accordingly.
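As a hedged sketch of what reading from a file system might look like: the temp-directory path, CSV layout, and explicit schema below are illustrative assumptions (a production job would point at an HDFS path such as `hdfs://...` instead of a local temp directory):

```scala
// A self-contained sketch: write a small CSV, then read it back into a
// DataFrame with the same three fields as the mock training dataset.
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("read").getOrCreate()

// For a self-contained demo, create the input file first
val dir = Files.createTempDirectory("training-data").toString
Files.write(Paths.get(dir, "part-0.csv"),
  "0,a b c d e spark,1.0\n1,b d,0.0\n".getBytes("UTF-8"))

val training = spark.read
  .schema("id LONG, text STRING, label DOUBLE") // explicit schema instead of inference
  .csv(dir)
```

Swapping `dir` for an HDFS URI is the only change needed to move this to a cluster.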
Next, create a Pipeline object and set the multiple PipelineStages it will execute in order, as shown in the code below:
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr)) // contains 3 PipelineStages
Now we can run training on the training dataset, as shown below:
val model = pipeline.fit(training)
Calling the Pipeline's fit method produces a Model. Depending on the situation, we can choose whether to save the resulting model (so that it can be reloaded and used later), as shown below:
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
Then, create a mock test dataset to test the model trained above, as shown in the code below:
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")
In the test dataset the labels (Labels) are all unknown. Applying the model generated above to this test dataset predicts the corresponding label data, as shown in the code below:
// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
In this way we can validate the accuracy of the classification model against the predicted results.
Finally, the resulting model can be put to use in a real application scenario to deliver the required functionality.
Óйظü¶àʹÓÃSpark ML PipelineµÄÀý×Ó£¬¿ÉÒԲο¼Spark·¢ÐаüÖУ¬examplesÀïÃæsrc/main/scala/mlÏÂÃæµÄºÜ¶àʾÀý´úÂ룬·Ç³£ºÃµÄѧϰ×ÊÔ´¡£ |