Five MapReduce-Based Parallel Computing Frameworks: Introduction and Performance Tests
 
Author: Zhou Mingyao   Source: IBM   Published: 2016-01-29

µ±Ê¹Óà Hadoop ¼¼Êõ¼Ü¹¹¼¯Èº£¬¼¯ÈºÄÚÐÂÔö¡¢É¾³ý½Úµã£¬»òÕßij¸ö½Úµã»úÆ÷ÄÚÓ²ÅÌ´æ´¢´ïµ½±¥ºÍֵʱ£¬¶¼»áÔì³É¼¯ÈºÄÚÊý¾Ý·Ö²¼²»¾ùÔÈ¡¢Êý¾Ý¶ªÊ§·çÏÕÔö¼ÓµÈÎÊÌâ³öÏÖ¡£±¾ÎÄ¶Ô HDFS ÄÚ²¿µÄÊý¾Ýƽºâ·½Ê½×öÁ˽éÉÜ£¬Í¨¹ýʵÑé°¸ÀýµÄ·½Ê½Ïò¶ÁÕß½âÊÍÄÚ²¿Êý¾ÝƽºâµÄ½â¾ö°ì·¨¡£

²¢ÐмÆËãÄ£ÐͺͿò¼Ü

Ŀǰ¿ªÔ´ÉçÇøÓÐÐí¶à²¢ÐмÆËãÄ£ÐͺͿò¼Ü¿É¹©Ñ¡Ôñ£¬°´ÕÕʵÏÖ·½Ê½¡¢ÔËÐлúÖÆ¡¢ÒÀ¸½µÄ²úÆ·Éú̬ȦµÈ¿ÉÒÔ±»»®·ÖΪ¼¸¸öÀàÐÍ£¬Ã¿¸öÀàÐ͸÷ÓÐÓÅȱµã£¬Èç¹ûÄܹ»¶Ô¸÷ÀàÐ͵IJ¢ÐмÆËã¿ò¼Ü¶¼½øÐÐÉîÈëÑо¿¼°Êʵ±µÄȱµãÐÞ¸´£¬¾Í¿ÉÒÔΪ²»Í¬Ó²¼þ»·¾³Ïµĺ£Á¿Êý¾Ý·ÖÎöÐèÇóÌṩ²»Í¬µÄÈí¼þ²ãÃæµÄ½â¾ö·½°¸¡£

Parallel Computing Frameworks

Parallel computing is defined in contrast to serial computing. It refers to algorithms that can execute multiple instructions at once, with two goals: increasing computation speed, and solving large, complex problems by scaling up the problem size. Parallelism can be temporal or spatial: temporal parallelism means pipelining, while spatial parallelism means executing computations concurrently on multiple processors. Parallel computing is the process of using multiple computing resources at the same time to solve a computational problem, and it is an effective way to raise the computing speed and processing capacity of a computer system. Its basic idea is to have multiple processors cooperate on the same problem: the problem is decomposed into several parts, each of which is computed in parallel by an independent processor. A parallel computing system may be a purpose-built supercomputer containing many processors, or a cluster of independent computers interconnected in some fashion. The cluster processes the data in parallel and returns the result to the user.

Research at Home and Abroad

Research on parallel computing in Europe and the United States started far earlier than in China, evolving gradually from early parallel computing to grid computing. With the rapid expansion of Internet resources, the web now holds massive amounts of data and information of every kind. Processing data at that scale severely strains server CPU and I/O throughput; in processing speed, storage space, fault tolerance, and access speed alike, traditional architectures and serial processing on a single machine are increasingly inadequate for today's massive-data workloads. Researchers at home and abroad have proposed many massive-data processing approaches to address these problems.

ĿǰÒÑÓеĺ£Á¿Êý¾Ý´¦Àí·½·¨ÔÚ¸ÅÄîÉϽÏÈÝÒ×Àí½â£¬È»¶øÓÉÓÚÊý¾ÝÁ¿¾Þ´ó£¬ÒªÔڿɽÓÊܵÄʱ¼äÄÚÍê³ÉÏàÓ¦µÄ´¦Àí£¬Ö»Óн«ÕâЩ¼ÆËã½øÐв¢Ðл¯´¦Àí£¬Í¨¹ýÌáÈ¡³ö´¦Àí¹ý³ÌÖдæÔڵĿɲ¢Ðй¤×÷µÄ·ÖÁ¿£¬Ó÷ֲ¼Ê½Ä£ÐÍÀ´ÊµÏÖÕâЩ²¢ÐзÖÁ¿µÄ²¢ÐÐÖ´Ðйý³Ì¡£Ëæ×ż¼ÊõµÄ·¢Õ¹£¬µ¥»úµÄÐÔÄÜÓÐÁËÍ»·ÉÃͽøµÄ·¢Õ¹±ä»¯£¬ÓÈÆäÊÇÄÚ´æºÍ´¦ÀíÆ÷µÈÓ²¼þ¼¼Êõ£¬µ«ÊÇÓ²¼þ¼¼ÊõµÄ·¢Õ¹ÔÚÀíÂÛÉÏ×ÜÊÇÓÐÏ޶ȵģ¬Èç¹û˵Ӳ¼þµÄ·¢Õ¹ÔÚ×ÝÏòÉÏÌá¸ßÁËϵͳµÄÐÔÄÜ£¬ÄÇô²¢Ðм¼ÊõµÄ·¢Õ¹¾ÍÊÇ´ÓºáÏòÉÏÍØÕ¹ÁË´¦ÀíµÄ·½Ê½¡£

Between 2003 and 2006, Google published its three seminal papers on GFS, MapReduce, and BigTable, which gave the parallel computing framework a concrete, practical form: the MapReduce framework.

ÎÒ¹úµÄ²¢Ðкͷֲ¼Ê½¼ÆËã¼¼ÊõÑо¿ÆðÔ´ÓÚ 60 Äê´úÄ©£¬°´ÕÕ¹ú·À¿Æ¼¼´óѧÖÜÐËÃúԺʿÌá³öµÄ¹Ûµã£¬µ½Ä¿Ç°ÎªÖ¹ÒѾ­Èý¸ö½×¶ÎÁË¡£µÚÒ»½×¶Î£¬×Ô 60 Äê´úÄ©ÖÁ 70 Äê´úÄ©£¬Ö÷Òª´ÓÊ´óÐÍ»úÄڵIJ¢Ðд¦Àí¼¼ÊõÑо¿£»µÚ¶þ½×¶Î£¬×Ô 70 Äê´úÄ©ÖÁ 90 Äê´ú³õ£¬Ö÷Òª´ÓÊÂÏòÁ¿»úºÍ²¢ÐжദÀíÆ÷ϵͳÑо¿£»µÚÈý½×¶Î£¬×Ô 80 Äê´úÄ©ÖÁ½ñ£¬Ö÷Òª´ÓÊ MPP(Massively Parallel Processor) ϵͳÑо¿¡£

Although China began research and application of parallel computing early and now owns substantial parallel computing resources, the results still lag considerably behind those of the United States and leave much room for improvement.

MapReduce

MapReduce ÊÇÓɹȸèÍÆ³öµÄÒ»¸ö±à³ÌÄ£ÐÍ£¬ÊÇÒ»¸öÄÜ´¦ÀíºÍÉú³É³¬´óÊý¾Ý¼¯µÄË㷨ģÐÍ£¬¸Ã¼Ü¹¹Äܹ»ÔÚ´óÁ¿ÆÕͨÅäÖõļÆËã»úÉÏʵÏÖ²¢Ðл¯´¦Àí¡£MapReduce ±à³ÌÄ£ÐͽáºÏÓû§ÊµÏÖµÄ Map ºÍ Reduce º¯Êý¡£Óû§×Ô¶¨ÒåµÄ Map º¯Êý´¦ÀíÒ»¸öÊäÈëµÄ»ùÓÚ key/value pair µÄ¼¯ºÏ£¬Êä³öÖмä»ùÓÚ key/value pair µÄ¼¯ºÏ£¬MapReduce ¿â°ÑÖмäËùÓоßÓÐÏàͬ key ÖµµÄ value Öµ¼¯ºÏÔÚÒ»Æðºó´«µÝ¸ø Reduce º¯Êý£¬Óû§×Ô¶¨ÒåµÄ Reduce º¯ÊýºÏ²¢ËùÓоßÓÐÏàͬ key ÖµµÄ value Öµ£¬ÐγÉÒ»¸ö½ÏС value ÖµµÄ¼¯ºÏ¡£Ò»°ãµØ£¬Ò»¸öµäÐ굀 MapReduce ³ÌÐòµÄÖ´ÐÐÁ÷³ÌÈçͼ 1 Ëùʾ¡£

ͼ 1 .MapReduce ³ÌÐòÖ´ÐÐÁ÷³Ìͼ

MapReduce execution consists mainly of the following steps:

Split the massive input data into slices and assign them to different machines;

Workers running Map tasks parse the input data into key/value pairs; the user-defined Map function turns each input key/value pair into intermediate key/value pairs;

Sort and group the intermediate key/value pairs by key;

Distribute the distinct keys, each with its list of values, to different machines, which carry out the Reduce computation;

Output the Reduce results.

When the job completes successfully, the MapReduce output is stored in R output files. These R files usually do not need to be merged into one; they serve as input to another MapReduce job, or are consumed by another distributed application that can handle multiple partitioned files.
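
To make these steps concrete, here is a minimal single-process sketch in Python (the helper names map_func, reduce_func, and run_mapreduce are illustrative, not part of any framework); it mimics only the map, group/sort, and reduce flow, not a distributed runtime:

from collections import defaultdict

def map_func(line):
    # Step 2: turn input into intermediate key/value pairs
    for word in line.split():
        yield word, 1

def reduce_func(key, values):
    # Step 4: merge all values that share a key
    yield key, sum(values)

def run_mapreduce(lines):
    groups = defaultdict(list)
    for line in lines:                  # step 1: consume the input splits
        for key, value in map_func(line):
            groups[key].append(value)   # step 3: group by key
    for key in sorted(groups):          # step 3: sort by key
        yield from reduce_func(key, groups[key])

print(dict(run_mapreduce(["hello world", "hello mapreduce"])))
# {'hello': 2, 'mapreduce': 1, 'world': 1}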

Inspired by Google's MapReduce, many researchers have implemented MapReduce frameworks on different experimental platforms. This article introduces and compares five such implementations: Apache Hadoop MapReduce, Apache Spark, Stanford University's Phoenix, Nokia's Disco, and Mars from the Hong Kong University of Science and Technology.

Hadoop MapReduce

Hadoop's design derives from Google's GFS and MapReduce. It is an open-source software framework for writing and running distributed applications that process large-scale data on computer clusters using a simple programming model. Hadoop comprises three subprojects: Hadoop Common, the Hadoop Distributed File System (HDFS), and Hadoop MapReduce.

First-generation Hadoop MapReduce is a software framework for distributed processing of massive data sets on a computer cluster, consisting of one JobTracker and a number of TaskTrackers. Its operating flow is shown in Figure 2.

ͼ 2 .Hadoop MapReduce ϵͳ¼Ü¹¹Í¼

At the top level there are four independent entities: the client, the JobTracker, the TaskTrackers, and the distributed file system. The client submits MapReduce jobs. The JobTracker, a Java application whose main class is JobTracker, coordinates the running of jobs. Each TaskTracker, likewise a Java application whose main class is TaskTracker, runs the tasks into which a job has been divided. Running a MapReduce job in Hadoop involves six steps: submitting the job, initializing the job, assigning tasks, executing tasks, updating progress and status, and completing the job.

Spark MapReduce

Spark is an open-source, memory-based cluster computing system whose goal is to make data analysis faster. Spark is remarkably compact: it was developed by a small team led by Matei Zaharia at UC Berkeley's AMP Lab, is written in Scala, and its core consists of only 63 Scala files. Spark introduces in-memory distributed datasets: besides supporting interactive queries, it optimizes iterative workloads. Spark provides a memory-based compute cluster that imports data into memory for fast querying, which makes it much faster than disk-based systems such as Hadoop. Spark was originally developed for iterative algorithms, such as machine learning and graph mining, and for interactive data-mining workloads; in these two scenarios Spark can reportedly run hundreds of times faster than Hadoop.
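
To sketch why an in-memory dataset helps iterative workloads, the following PySpark fragment (a sketch assuming a local Spark installation; the file path and the filter are illustrative) caches an RDD so that repeated passes reuse the in-memory partitions instead of re-reading the file from disk:

from pyspark import SparkContext

sc = SparkContext("local[*]", "CacheSketch")
words = sc.textFile("input.txt").flatMap(lambda s: s.split())
words.cache()  # keep the distributed dataset in memory

# Every iteration reuses the cached partitions
for _ in range(10):
    print(words.filter(lambda w: len(w) > 6).count())

sc.stop()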

Disco

Disco, developed by the Nokia Research Center, is a MapReduce-based distributed data processing framework whose core is written in Erlang, with Python as the external programming interface. Disco is an open-source platform for large-scale data analysis that supports parallel computation over big data sets and can run on unreliable commodity clusters. It can be deployed on clusters and multi-core machines, and also on Amazon EC2. Disco uses a master/slave architecture; Figure 3 shows the overall design, in which one master server controls multiple slave servers.

ͼ 3 .Disco ×ÜÌå¼Ü¹¹Í¼

Disco runs a MapReduce job in the following steps:

The Disco user starts a Disco job with a Python script;

The job request is sent over HTTP to the master;

The master, an Erlang process, receives the job request via HTTP;

The master starts the slave on each node via SSH;

The slaves run the Disco tasks in worker processes.

Phoenix

Phoenix ×÷Ϊ˹̹¸£´óѧ EE382a ¿Î³ÌµÄÒ»ÀàÏîÄ¿£¬ÓÉ˹̹¸£´óѧ¼ÆËã»úϵͳʵÑéÊÒ¿ª·¢¡£Phoenix ¶Ô MapReduce µÄʵÏÖÔ­ÔòºÍ×î³õÓÉ Google ʵÏÖµÄ MapReduce »ù±¾Ïàͬ¡£²»Í¬µÄÊÇ£¬ËüÔÚ¼¯ÈºÖÐÒÔʵÏÖ¹²ÏíÄÚ´æÏµÍ³ÎªÄ¿µÄ£¬¹²ÏíÄÚ´æÄÜ×îС»¯ÓÉÈÎÎñÅÉÉúºÍÊý¾Ý¼äµÄͨÐÅËùÔì³ÉµÄ¼ä½Ó³É±¾¡£Phoenix ¿É±à³Ì¶àºËоƬ»ò¹²ÏíÄÚ´æ¶àºË´¦ÀíÆ÷ (SMPs ºÍ ccNUMAs)£¬ÓÃÓÚÊý¾ÝÃܼ¯ÐÍÈÎÎñ´¦Àí¡£

Mars

Mars is a GPU-based MapReduce framework developed by the Hong Kong University of Science and Technology in cooperation with Microsoft and Sina. It already includes eight applications: string matching, matrix multiplication, inverted indexing, word counting, page-view ranking, page-view counting, similarity evaluation, and K-means. It runs on 32-bit and 64-bit Linux platforms. The Mars framework is structured much like CPU-based MapReduce frameworks, also consisting of Map and Reduce stages; its basic workflow is shown in Figure 4.

ͼ 4 .Mars »ù±¾¹¤×÷Á÷³Ìͼ

Before each stage begins, Mars initializes the thread configuration: the number of thread groups on the GPU and the number of threads per group. Mars uses a large number of GPU threads and evenly assigns tasks to them at run time; each thread handles one Map or Reduce task, taking a small number of key/value pairs as input, and a lock-free scheme manages concurrent writes within the MapReduce framework.

The Mars workflow has seven main steps:

Load the input key/value pairs into main memory and store them in an array;

Initialize the runtime configuration parameters;

Copy the input array from main memory to GPU device memory;

Launch the Map stage on the GPU and store the intermediate key/value pairs in an array;

If the noSort flag is F, i.e., a sort stage is required, sort the intermediate results;

If the noReduce flag is F, i.e., a Reduce stage is required, launch the Reduce stage on the GPU and output the final results; otherwise the intermediate results are the final results;

Copy the results from GPU device memory back to main memory.

Steps 1, 2, 3, and 7 above are performed by the scheduler, which is responsible for preparing the data input, invoking the Map and Reduce stages on the GPU, and returning the results to the user.

Advantages and Disadvantages of the Five Frameworks

Table 1. Comparison of the advantages and disadvantages of the five frameworks

The WordCount Experiment

Basic Principle

Word counting (WordCount) is one of the simplest programs, and one of those that best embody the MapReduce idea; it can be called the "Hello World" of MapReduce. Its job is to count the number of occurrences of each word across a collection of text files.

Experimental Procedure

The hardware for this experiment is a single x86 server with 32 GB of DDR3 memory, a 12-core E5 CPU, and a GPU. The test data are five text files of 10 MB, 50 MB, 100 MB, 500 MB, and 1000 MB. We ran the text-analysis program under each of the Hadoop MapReduce, Spark, Phoenix, Disco, and Mars frameworks and, on the premise that all frameworks produced identical results, recorded the running time, runtime CPU utilization, and runtime memory utilization, then plotted the data as bar charts.

Hadoop MapReduce

First the input files must be divided into splits. Because the test files are small, each file forms one split, which is further divided by line into <key,value> pairs, as shown in Figure 5. This step is performed automatically by the MapReduce framework; the offset (i.e., the key) includes the characters occupied by line breaks (which differ between Windows and Linux).

ͼ 5 . ·Ö¸î¹ý³Ìͼ

The split <key,value> pairs are handed to the user-defined map method, which produces new <key,value> pairs, as shown in Figure 6.

ͼ 6 . Ö´ÐÐ Map ·½·¨¹ý³Ìͼ

After obtaining the <key,value> pairs output by the map method, the Mapper sorts them by key and runs the Combine step, which adds up the values of identical keys to produce the Mapper's final output, as shown in Figure 7.

ͼ 7 . Map ¶ËÅÅÐò¼° Combine ¹ý³Ì

The Reducer first sorts the data received from the Mappers, then hands it to the user-defined reduce method, which produces new <key,value> pairs as the final WordCount output, as shown in Figure 8.

ͼ 8 . Reduce ¶ËÅÅÐò¼°Êä³ö½á¹ûÁ÷³Ìͼ

Listing 1. First-generation Hadoop MapReduce WordCount example

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // The Map phase: emit (word, 1) for every token of the input line
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            // Iterate over the tokens in the line
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        // The Reduce phase: sum the counts for each word
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Spark WordCount Experiment

The biggest difference between Spark and Hadoop MapReduce is that Spark keeps all data in memory: Hadoop MapReduce must read data into memory from external storage at each stage, a step Spark does not need. Otherwise its implementation principle differs little from Hadoop MapReduce, so it is not repeated here; the complete code is shown in Listing 2.

Listing 2. Spark WordCount example

// SPACE is assumed to be declared elsewhere in the class as:
// private static final Pattern SPACE = Pattern.compile(" ");
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], Integer.parseInt(args[1]));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        return Arrays.asList(SPACE.split(s));
    }
});
// Define the RDD "ones": pair every word with the count 1
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});
// ones.reduceByKey(func, numPartitions): sum the counts per word
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
}, 10);
// Collect the result list and sort it by count in descending order
List<Tuple2<String, Integer>> output = counts.collect();
Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
    @Override
    public int compare(Tuple2<String, Integer> t1,
                       Tuple2<String, Integer> t2) {
        if (t1._2 > t2._2) {
            return -1;
        } else if (t1._2 < t2._2) {
            return 1;
        }
        return 0;
    }
});

Disco WordCount Experiment

Because Disco has its own distributed file system (DDFS), its MapReduce framework is generally not used in isolation: data is fetched from the distributed file system, read into memory, then split and passed into the MapReduce stages. First invoke the ddfs chunk command to upload the file to DDFS, then write the MapReduce program; Disco application code is written in Python. A Map example is shown in Listing 3, and a Reduce example in Listing 4.

Listing 3. The Map function

def fun_map(line, params):
    for word in line.split():
        yield word, 1

Listing 4. The Reduce function

def fun_reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

Listing 5. A complete Map/Reduce job

from disco.core import Job, result_iterator

def map(line, params):
    for word in line.split():
        yield word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)

Phoenix WordCount Experiment

Phoenix is a CPU-based MapReduce framework, so it also follows the traditional pattern of splitting the data, reading it into memory, and then entering the MapReduce processing stages. Phoenix does not let the user decide the size of the data chunk assigned to each Map; it splits according to the actual cache size of the system, which avoids chunks that are too large or too small: overly large chunks make Map execution slow, while overly small chunks waste Map resources, because every Map thread launch consumes a certain amount of system resources. The split text is processed by multiple Maps executing in parallel; Phoenix supports roughly 100 Maps running concurrently, and one worker node may run several Maps in parallel. Only when all Map tasks on a worker node have finished does the Reduce stage begin. The Reduce stage continues to use the dynamic task-scheduling mechanism and also allows users to define their own data-partitioning rules.

Listing 6. The Phoenix WordCount program

#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <ctype.h>
#include <inttypes.h>
#include "map_reduce.h"
#include "stddefines.h"
#include "sort.h"
#define DEFAULT_DISP_NUM 10
typedef struct {
int fpos;
off_t flen;
char *fdata;
int unit_size;
} wc_data_t;
enum {
IN_WORD,
NOT_IN_WORD
};
struct timeval begin, end;
#ifdef TIMING
unsigned int library_time = 0;
#endif
/** mystrcmp()
* Comparison function to compare 2 words
*/
int mystrcmp(const void *s1, const void *s2)
{
return strcmp((const char *)s1, (const char *) s2);
}
/** mykeyvalcmp()
* Comparison function to compare 2 ints
*/
int mykeyvalcmp(const void *v1, const void *v2)
{
keyval_t* kv1 = (keyval_t*)v1;
keyval_t* kv2 = (keyval_t*)v2;
intptr_t *i1 = kv1->val;
intptr_t *i2 = kv2->val;
if (i1 < i2) return 1;
else if (i1 > i2) return -1;
else {
return strcmp((char *)kv1->key, (char *)kv2->key);
//return 0;
}
}
/** wordcount_splitter()
 * Split the in-memory file buffer into chunks for the Map tasks
 */
int wordcount_splitter(void *data_in, int req_units, map_args_t *out)
{
wc_data_t * data = (wc_data_t *)data_in;
assert(data_in);
assert(out);
assert(data->flen >= 0);
assert(data->fdata);
assert(req_units);
assert(data->fpos >= 0);
// End of file reached, return FALSE for no more data
if (data->fpos >= data->flen) return 0;
// Set the start of the next data
out->data = (void *)&data->fdata[data->fpos];
// Determine the nominal length
out->length = req_units * data->unit_size;
if (data->fpos + out->length > data->flen)
out->length = data->flen - data->fpos;
// Set the length to end at a space
for (data->fpos += (long)out->length;
data->fpos < data->flen &&
data->fdata[data->fpos] != ' ' && data->fdata[data->fpos] != '\t' &&
data->fdata[data->fpos] != '\r' && data->fdata[data->fpos] != '\n';
data->fpos++, out->length++);
return 1;
}
/** wordcount_locator()
* Return the memory address where this map task would heavily access.
*/
void *wordcount_locator (map_args_t *task)
{
assert (task);
return task->data;
}
/** wordcount_map()
 * Count the words in one chunk of text
 */
void wordcount_map(map_args_t *args)
{
char *curr_start, curr_ltr;
int state = NOT_IN_WORD;
int i;
assert(args);
char *data = (char *)args->data;
assert(data);
curr_start = data;
for (i = 0; i < args->length; i++)
{
curr_ltr = toupper(data[i]);
switch (state)
{
case IN_WORD:
data[i] = curr_ltr;
if ((curr_ltr < 'A' || curr_ltr > 'Z') && curr_ltr != '\'')
{
data[i] = 0;
emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1);
state = NOT_IN_WORD;
}
break;
default:
case NOT_IN_WORD:
if (curr_ltr >= 'A' && curr_ltr <= 'Z')
{
curr_start = &data[i];
data[i] = curr_ltr;
state = IN_WORD;
}
break;
}
}
// Add the last word
if (state == IN_WORD)
{
data[args->length] = 0;
emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1);
}
}
/** wordcount_reduce()
 * Sum the counts emitted for a key
 */
void wordcount_reduce(void *key_in, iterator_t *itr)
{
char *key = (char *)key_in;
void *val;
intptr_t sum = 0;
assert(key);
assert(itr);
while (iter_next (itr, &val))
{
sum += (intptr_t)val;
}
emit(key, (void *)sum);
}
void *wordcount_combiner (iterator_t *itr)
{
void *val;
intptr_t sum = 0;
assert(itr);
while (iter_next (itr, &val))
{
sum += (intptr_t)val;
}
return (void *)sum;
}
int main(int argc, char *argv[])
{
final_data_t wc_vals;
int i;
int fd;
char * fdata;
int disp_num;
struct stat finfo;
char * fname, * disp_num_str;
struct timeval starttime,endtime;
get_time (&begin);
// Make sure a filename was supplied
if (argv[1] == NULL)
{
printf("USAGE: %s <filename> [Top # of results to display]\n", argv[0]);
exit(1);
}
fname = argv[1];
disp_num_str = argv[2];
printf("Wordcount: Running...\n");
// Open the input file
CHECK_ERROR((fd = open(fname, O_RDONLY)) < 0);
// Get the file info (for file length)
CHECK_ERROR(fstat(fd, &finfo) < 0);
#ifndef NO_MMAP
// Memory-map the file so the Map stage can process it in memory
CHECK_ERROR((fdata = mmap(0, finfo.st_size + 1,
PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0)) == NULL);
#else
int ret;
fdata = (char *)malloc (finfo.st_size);
CHECK_ERROR (fdata == NULL);
ret = read (fd, fdata, finfo.st_size);
CHECK_ERROR (ret != finfo.st_size);
#endif
CHECK_ERROR((disp_num = (disp_num_str == NULL) ?
DEFAULT_DISP_NUM : atoi(disp_num_str)) <= 0);
wc_data_t wc_data;
wc_data.unit_size = 5; // approx 5 bytes per word
wc_data.fpos = 0;
wc_data.flen = finfo.st_size;
wc_data.fdata = fdata;
CHECK_ERROR (map_reduce_init ());
map_reduce_args_t map_reduce_args;
memset(&map_reduce_args, 0, sizeof(map_reduce_args_t));
map_reduce_args.task_data = &wc_data;
map_reduce_args.map = wordcount_map;
map_reduce_args.reduce = wordcount_reduce;
map_reduce_args.combiner = wordcount_combiner;
map_reduce_args.splitter = wordcount_splitter;
map_reduce_args.locator = wordcount_locator;
map_reduce_args.key_cmp = mystrcmp;
map_reduce_args.unit_size = wc_data.unit_size;
map_reduce_args.partition = NULL; // use default
map_reduce_args.result = &wc_vals;
map_reduce_args.data_size = finfo.st_size;
map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 1024 * 2;
map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16;
map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16;
map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2;
printf("Wordcount: Calling MapReduce Scheduler Wordcount\n");
gettimeofday(&starttime,0);
get_time (&end);
#ifdef TIMING
fprintf (stderr, "initialize: %u\n", time_diff (&end, &begin));
#endif
get_time (&begin);
CHECK_ERROR(map_reduce (&map_reduce_args) < 0);
get_time (&end);
#ifdef TIMING
library_time += time_diff (&end, &begin);
#endif
get_time (&begin);
gettimeofday(&endtime,0);
printf("Wordcount: Completed %ld\n",(endtime.tv_sec - starttime.tv_sec));
printf("Wordcount: MapReduce Completed\n");
printf("Wordcount: Calling MapReduce Scheduler Sort\n");
mapreduce_sort(wc_vals.data, wc_vals.length, sizeof(keyval_t), mykeyvalcmp);
CHECK_ERROR (map_reduce_finalize ());
printf("Wordcount: MapReduce Completed\n");
dprintf("\nWordcount: Results (TOP %d):\n", disp_num);
for (i = 0; i < disp_num && i < wc_vals.length; i++)
{
keyval_t * curr = &((keyval_t *)wc_vals.data)[i];
dprintf("%15s - %" PRIdPTR "\n", (char *)curr->key, (intptr_t)curr->val);
}
free(wc_vals.data);
#ifndef NO_MMAP
CHECK_ERROR(munmap(fdata, finfo.st_size + 1) < 0);
#else
free (fdata);
#endif
CHECK_ERROR(close(fd) < 0);
get_time (&end);
#ifdef TIMING
fprintf (stderr, "finalize: %u\n", time_diff (&end, &begin));
#endif
return 0;
}

Mars MapReduce

Mars ¿ò¼ÜÖУ¬Map ºÍ Reduce µÄ´¦Àí½×¶Î¶¼ÔÚ GPU ÄÚ½øÐУ¬Map ºÍ Reduce µÄ·Ö¸îÊý¾Ý½×¶Î¶¼ÔÚ CPU ÄÚ½øÐУ¬ÕâÊÇÓëÆäËû»ùÓÚ CPU µÄ MapReduce ¿ò¼ÜµÄ×î´ó²»Í¬¡£Mars ¸ü¶àµÄÊÇÀûÓà CPU¡¢GPU »º´æÀ´Ìæ´úÄڴ棬ִÐÐÊý¾Ý·Ö¸î¡¢´¦Àí¹ý³Ì¡£

The concrete WordCount flow in Mars is as follows:

Prepare the key/value pairs and store them in an array;

Initialize the MapReduce context and set its parameters (the number of concurrent threads should be set according to the number of CUDA cores of the particular GPU);

Preprocess the data: open the file, read its entire contents into memory, allocate a block of GPU device memory the same size as the file, and convert lowercase characters in the file to uppercase (so that "word" and "Word" count as the same word);

Begin the MapReduce stage. Split the in-memory file according to the number of concurrent threads and the file size; for each chunk, treat its offset and length in memory as the value and the device-memory pointer address as the key, and add the task to the task pool. Copy the processed memory contents into the newly allocated device memory. Then start the Map flow: copy the task pool from memory to device memory, allocate device-memory blocks to hold the data Map produces, and launch many threads to concurrently execute the user-defined map flow MAP_COUNT_FUNC, a function Mars introduces because of the peculiarities of GPU programming, used to record the lengths (sizeof) of the keys and values Map produces; then invoke MAP_FUNC with the task records as input, emitting each word together with its position;

If noSort is F, sort the results;

If noReduce is F, the GPU starts the Reduce stage and generates the final result set; otherwise the intermediate results are output immediately as the final result set;

Output the results: copy the final result set from GPU device memory back to main memory, then print it to the screen.

Through the seven steps above, the WordCount computation completes and the result set is produced.

Listing 7. The Mars Map kernel

#ifndef __MAP_CU__
#define __MAP_CU__
#include "MarsInc.h"
#include "global.h"
__device__ int hash_func(char* str, int len)
{
int hash, i;
for (i = 0, hash=len; i < len; i++)
hash = (hash<<4)^(hash>>28)^str[i];
return hash;
}
__device__ void MAP_COUNT_FUNC//(void *key, void *val, size_t keySize, size_t valSize)
{
WC_KEY_T* pKey = (WC_KEY_T*)key;
WC_VAL_T* pVal = (WC_VAL_T*)val;
char* ptrBuf = pKey->file + pVal->line_offset;
int line_size = pVal->line_size;
char* p = ptrBuf;
int lsize = 0;
int wsize = 0;
char* start = ptrBuf;
while(1)
{
for (; *p >= 'A' && *p <= 'Z'; p++, lsize++);
*p = '\0';
++p;
++lsize;
wsize = (int)(p - start);
if (wsize > 6)
{
//printf("%s, wsize:%d\n", start, wsize);
EMIT_INTER_COUNT_FUNC(wsize, sizeof(int));
}
for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++);
if (lsize >= line_size) break;
start = p;
}
}
__device__ void MAP_FUNC//(void *key, void val, size_t keySize, size_t valSize)
{
WC_KEY_T* pKey = (WC_KEY_T*)key;
WC_VAL_T* pVal = (WC_VAL_T*)val;
char* filebuf = pKey->file;
char* ptrBuf = filebuf + pVal->line_offset;
int line_size = pVal->line_size;
char* p = ptrBuf;
char* start = ptrBuf;
int lsize = 0;
int wsize = 0;
while(1)
{
for (; *p >= 'A' && *p <= 'Z'; p++, lsize++);
*p = '\0';
++p;
++lsize;
wsize = (int)(p - start);
int* o_val = (int*)GET_OUTPUT_BUF(0);
*o_val = wsize;
if (wsize > 6)
{
//printf("%s, %d\n", start, wsize);
EMIT_INTERMEDIATE_FUNC(start, o_val, wsize, sizeof(int));
}
for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++);
if (lsize >= line_size) break;
start = p;
}
}
#endif //__MAP_CU__

Listing 8. The Mars Reduce kernel

#ifndef __REDUCE_CU__
#define __REDUCE_CU__
#include "MarsInc.h"
__device__ void REDUCE_COUNT_FUNC//(void* key, void* vals, size_t keySize, size_t valCount)
{
}
__device__ void REDUCE_FUNC//(void* key, void* vals, size_t keySize, size_t valCount)
{
}
#endif //__REDUCE_CU__

WordCount Performance Comparison of the Five Frameworks

ͼ 9 . ʵÑéÔËÐÐʱ¼ä±È½Ïͼ

ͼ 9 ʵÑéÔËÐÐʱ¼ä±È½ÏͼÊÇ·ÖÎö²»Í¬´óСµÄÎı¾ÎļþËùÏûºÄµÄʱ¼ä¶Ô±Èͼ¡£´ÓÉÏͼ¿ÉÒÔ¿´³ö£¬Hadoop MapReduce µÄÔËÐÐʱ¼ä×£¬Ô­ÒòÊÇ Hadoop Éú̬»·¾³°üº¬ÄÚÈݹý¶à£¬ËùÒÔÿ´ÎÈÎÎñÆô¶¯Ê±Ê×ÏÈÐèÒª¼ÓÔØËùÐè×ÊÔ´°ü£¬È»ºó»ºÂýµØ·¢ÆðÈÎÎñ£¬²¢ÇÒÓÉÓÚ±¾ÉíÊÇÓÃÐÔÄܽϲîµÄ Java ÓïÑÔ±àдµÄ£¬ËùÒÔµ¼ÖÂÕûÌ弯Ëãʱ¼ä³¤¡¢ÐÔÄܲPhoenix ÓÉÓÚ²ÉÓûã±àºÍ C ÓïÑÔ±àд£¬Äں˺ÜС£¬ÔËÐÐʱËùÓÃ×ÊÔ´ºÜÉÙ£¬ËùÒÔÕû¸ö²âÊÔ¹ý³ÌºÄʱҲ½ÏÉÙ¡£Spark ¿ò¼ÜÔÚ WordCount ʵÑéÖÐÏûºÄµÄʱ³¤½Ï Disco ÉÔÉÙ£¬µ«ÊÇ±È Phoenix¡¢Mars ºÄʱ̫¶à¡£ºÄʱ×î¶ÌµÄÁ½¸ö¿ò¼ÜÊÇ Mars ºÍ Phoenix¡£ÐèҪʱ³¤´Ó¸ßµ½µÍ·Ö±ðÊÇ Hadoop MapReduce¡¢Disco¡¢Spark¡¢Phoenix¡¢Mars¡£

ͼ 10 .CPU ʹÓÃÂʶԱÈͼ

ͼ 10-CPU ʹÓÃÂʱȽÏͼÊÇ·ÖÎöÈÎÎñÖ´Ðйý³Ìµ±ÖÐ CPU ʹÓÃÂÊÇé¿öͼ¡£´ÓÉÏͼ¿ÉÒÔ¿´³ö£¬Hadoop MapReduce¡¢Disco ÕâÁ½¸ö¿ò¼ÜÐèÒªÕ¼ÓÃµÄ CPU ×ÊÔ´ÔÚ 1000M Îı¾´¦Àíʱ»ù±¾µ½´ï×î´ó±¥ºÍ¶È (´óÓÚ 90%)£¬Apache Spark µÄ CPU ʹÓÃÂÊûÓÐÍêÈ«°éËæ×ÅÎı¾ÎļþÔö´ó¶ø´ó·ùÉÏÕÇ£¬Phoenix ºÍ Mars »ù±¾¿ØÖÆÔÚ¶Ô CPU ʹÓÃÂʽϵ͵ķ¶Î§ÄÚ¡£

ͼ 11 . ÄÚ´æÊ¹ÓÃÂʶԱÈͼ

ͼ 11 ÄÚ´æÊ¹ÓÃÂʱȽÏͼÊÇ·ÖÎöÈÎÎñÖ´Ðйý³ÌÖÐÄÚ´æÊ¹ÓÃÇé¿ö¶Ô±È¡£´ÓͼÖпÉÒÔ¿´³ö£¬Mars ºÍ Phoenix ÕâÁ½¿î¿ò¼ÜËùʹÓõÄÄÚ´æÔÚÎı¾Êý¾Ý½ÏСʱÊÇ×îÉٵģ¬Ëæ×ÅÎı¾Êý¾ÝµÄÔö´ó£¬Apache Spark Ëæ×ÅÊý¾ÝÁ¿Ôö´ó¶øÄÚ´æ´ó·ùÔö¼Ó£¬Mars ºÍ Phoenix ÓÐÒ»¶¨·ù¶ÈµÄÄÚ´æÊ¹ÓÃÔö¼ÓÇ÷ÊÆ¡£µ±Êý¾ÝÁ¿´ïµ½±¾´Î²âÊÔ×î´óµÄ 1000M Îı¾Ê±£¬Spark ¿ò¼Ü¶ÔÄÚ´æµÄÏûºÄÊÇ×îСµÄ£¬Hadoop MapReduce ºÍ Disco ÐèÒªÕ¼Óý϶àµÄÄÚ´æ¡£

From these test results we conclude: if users only need to process massive text files, with no requirements for storage or secondary data mining, Phoenix or Mars offers the best price/performance. But because Mars must run on a GPU, and GPUs are unlikely to spread in real application scenarios for cost reasons, Phoenix is overall the most cost-effective framework. If the application must process very large data volumes and the customer wants the computed results stored for secondary computation or data mining, Hadoop MapReduce is the better choice, since the entire Hadoop ecosystem is vast and well supported. Apache Spark, owing to its different architectural design, keeps CPU and memory utilization consistently low, and it holds promise for future massive-data analysis.

Conclusion

Many real-world problems can be expressed in the MapReduce programming model. As a general-purpose, scalable, highly fault-tolerant parallel processing model, MapReduce can effectively process massive data and continually mine valuable information from it. MapReduce encapsulates the technically difficult details of parallel processing, load balancing, fault tolerance, and data locality. The test cases in this article demonstrate that MapReduce suits massive text-analysis scenarios and can provide the technical underpinnings for processing big data.

   