Parallel Computing Models and Frameworks
The open source community today offers many parallel computing models and frameworks to choose from. They can be grouped into several types by implementation approach, runtime mechanism, and the product ecosystem they belong to, and each type has its own strengths and weaknesses. By studying each type in depth and remedying its particular shortcomings, we can provide software-level solutions for massive-data analysis needs across different hardware environments.
Parallel Computing Frameworks
Parallel computing is defined in contrast to serial computing. It refers to algorithms that can execute multiple instructions at once, with the goals of increasing computation speed and, by scaling up the problem size, solving large and complex computational problems. Parallelism can be temporal or spatial: temporal parallelism refers to pipelining, while spatial parallelism means executing computations concurrently on multiple processors. Parallel computing is the process of solving a computational problem using multiple computing resources at the same time, and it is an effective means of increasing a computer system's speed and processing capacity. Its basic idea is to have several processors cooperate on the same problem: the problem to be solved is decomposed into parts, each of which is computed in parallel by an independent processor. A parallel computing system can be a purpose-built supercomputer containing many processors, or a cluster of independent computers interconnected in some fashion. The cluster processes the data in parallel and returns the results to the user.
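As a concrete illustration of spatial parallelism, the sketch below decomposes a summation into chunks that independent worker processes solve in parallel. It is a minimal Python example; the chunk size and worker count are illustrative choices, not values from this article.

from multiprocessing import Pool

def partial_sum(chunk):
    # each independent worker computes one part of the decomposed problem
    return sum(chunk)

if __name__ == "__main__":
    data = list(range(1_000_000))
    # decompose the problem into four parts
    chunks = [data[i:i + 250_000] for i in range(0, len(data), 250_000)]
    with Pool(processes=4) as pool:
        total = sum(pool.map(partial_sum, chunks))  # combine the partial results
    print(total)  # identical to sum(data), but computed in parallel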
Research at Home and Abroad
Developed countries in Europe and North America began researching parallel computing technology far earlier than China, gradually moving from early parallel computing to grid computing. As Internet resources expanded rapidly, the network came to hold enormous volumes of data and information of every kind. Processing data at that scale is a severe test of server CPU and I/O throughput; whether in processing speed, storage capacity, fault tolerance, or access speed, traditional architectures that rely on a single machine computing serially are increasingly inadequate for today's massive-data workloads. Researchers at home and abroad have proposed many processing methods to address the problems of working with massive data.
Existing approaches to massive-data processing are conceptually easy to understand, but because the data volumes are so large, finishing within acceptable time requires parallelizing the computation: extracting the components of the processing that can run in parallel and executing those parallel components with a distributed model. As technology has advanced, single-machine performance has improved by leaps and bounds, especially memory and processor hardware; yet hardware progress is always bounded in theory. If hardware development raises system performance vertically, then the development of parallel technology extends processing horizontally.
Starting in 2003, Google published its three landmark papers — on MapReduce, GFS, and BigTable — formally bringing the parallel computing framework down to earth as the MapReduce framework.
China's research into parallel and distributed computing began in the late 1960s. According to academician Zhou Xingming of the National University of Defense Technology, it has passed through three stages so far. In the first stage, from the late 1960s to the late 1970s, work focused on parallel processing inside mainframes; in the second, from the late 1970s to the early 1990s, on vector machines and parallel multiprocessor systems; and in the third, from the late 1980s to the present, on MPP (Massively Parallel Processor) systems.
Although China started research and application of parallel computing relatively early and now possesses substantial parallel computing resources, the results achieved still lag considerably behind those of the United States and leave room for further improvement and development.
MapReduce
MapReduce is a programming model introduced by Google: an algorithmic model capable of processing and generating very large data sets, whose architecture parallelizes the work across large numbers of commodity machines. The programming model combines user-supplied Map and Reduce functions. A user-defined Map function processes an input set of key/value pairs and outputs an intermediate set of key/value pairs; the MapReduce library groups together all intermediate values that share the same key and passes them to the Reduce function; the user-defined Reduce function then merges all values with the same key, forming a smaller set of values. The execution flow of a typical MapReduce program is shown in Figure 1.

Figure 1. Execution flow of a MapReduce program
The MapReduce execution process mainly comprises the following steps:
1. Slice the massive input data and distribute the slices to different machines;
2. Workers executing Map tasks parse the input data into key/value pairs, and the user-defined Map function turns each input key/value pair into intermediate key/value pairs;
3. Sort and aggregate the intermediate key/value pairs by key;
4. Distribute the distinct keys and their associated value sets to different machines, which perform the Reduce computation;
5. Output the Reduce results.
When the job completes successfully, the MapReduce output resides in R output files. In general these R files do not need to be merged into one; instead they serve as input to another MapReduce job, or are consumed by another distributed application that can handle multiple partitioned files.
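To make these steps concrete, here is a toy single-process Python sketch of the same pipeline — split, map, sort/group, reduce, output. The function names are illustrative only and are not taken from any of the frameworks discussed below.

from itertools import groupby
from operator import itemgetter

def map_fn(line):                       # step 2: parse input into key/value pairs
    return [(word, 1) for word in line.split()]

def reduce_fn(key, values):             # step 4: merge the values sharing a key
    return key, sum(values)

splits = ["hello world", "hello mapreduce"]           # step 1: input splits
intermediate = [pair for line in splits for pair in map_fn(line)]
intermediate.sort(key=itemgetter(0))                  # step 3: sort/aggregate by key
results = [reduce_fn(k, [v for _, v in group])
           for k, group in groupby(intermediate, key=itemgetter(0))]
print(results)  # step 5: [('hello', 2), ('mapreduce', 1), ('world', 1)]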
Inspired by Google MapReduce, many researchers have implemented MapReduce frameworks on different experimental platforms. This article introduces and compares five of them: Apache Hadoop MapReduce, Apache Spark, Stanford University's Phoenix, Nokia's Disco, and the Hong Kong University of Science and Technology's Mars.
Hadoop MapReduce
Hadoop's design derives from Google's GFS and MapReduce. It is an open source software framework for writing and running distributed applications that process large-scale data on clusters of computers using a simple programming model. Hadoop comprises three subprojects: Hadoop Common, the Hadoop Distributed File System (HDFS), and Hadoop MapReduce.
First-generation Hadoop MapReduce is a software framework for distributed processing of massive data sets on computer clusters, consisting of one JobTracker and a number of TaskTrackers. Its runtime flow is shown in Figure 2.

Figure 2. Hadoop MapReduce system architecture
At the top level there are four independent entities: the client, the JobTracker, the TaskTrackers, and the distributed file system. The client submits MapReduce jobs. The JobTracker coordinates job execution; it is a Java application whose main class is JobTracker. The TaskTrackers run the tasks into which a job is divided; each TaskTracker is likewise a Java application whose main class is TaskTracker. Running a MapReduce job on Hadoop involves six main steps: submitting the job, initializing the job, assigning tasks, executing tasks, updating progress and status, and completing the job.
Spark MapReduce
Spark is an open source cluster computing system based on in-memory computation, whose goal is to make data analysis faster. Spark is remarkably compact: it was developed by a small team led by Matei Zaharia at UC Berkeley's AMP Lab, is written in Scala, and the core of the project originally consisted of only 63 Scala files. Spark introduced in-memory distributed datasets; besides supporting interactive queries, it can also optimize iterative workloads. Spark provides an in-memory compute cluster that loads the data being analyzed into memory for fast querying, making it much faster than disk-based systems such as Hadoop. Spark was originally developed to handle iterative algorithms, such as machine learning and graph mining, as well as interactive data mining. In those two scenarios, Spark's speed can reportedly reach hundreds of times that of Hadoop.
Disco
Disco is a MapReduce-based distributed data processing framework developed by the Nokia Research Center. Its core is written in Erlang, with Python as the external programming interface. Disco is an open source platform for large-scale data analysis that supports parallel computation over big data sets and can run on unreliable cluster machines. It can be deployed on clusters and multi-core computers, as well as on Amazon EC2. Disco uses a master/slave architecture; Figure 3 shows the overall design, in which a single master server controls multiple slave servers.

Figure 3. Overall Disco architecture
Disco runs a MapReduce job in the following steps:
1. A Disco user starts a Disco job with a Python script;
2. The job request is sent to the master over HTTP;
3. The master, an Erlang process, receives the job request over HTTP;
4. The master starts the slave on each node via SSH;
5. The slaves run Disco tasks in worker processes.
Phoenix
Phoenix began as a class project in Stanford's EE382a course and was developed by Stanford's Computer Systems Laboratory. Its implementation principles are essentially the same as Google's original MapReduce. The difference is that it targets shared-memory systems, where shared memory minimizes the indirect costs caused by task spawning and inter-task data communication. Phoenix programs multi-core chips and shared-memory multiprocessors (SMPs and ccNUMAs) for data-intensive processing tasks.
Mars
Mars is a GPU-based MapReduce framework developed by the Hong Kong University of Science and Technology in cooperation with Microsoft and Sina. It currently includes eight applications — string matching, matrix multiplication, inverted indexing, word counting, page-view ranking, page-view counting, similarity evaluation, and K-means — and runs on 32-bit and 64-bit Linux platforms. The Mars framework is structured much like CPU-based MapReduce frameworks, with a Map stage and a Reduce stage; its basic workflow is shown in Figure 4.

Figure 4. Basic Mars workflow
Before each stage starts, Mars initializes the thread configuration, including the number of thread groups on the GPU and the number of threads per group. Mars uses a large number of threads on the GPU and, at run time, distributes tasks evenly among them; each thread handles one Map or Reduce task, taking a small number of key/value pairs as input, and a lock-free scheme manages concurrent writes within the MapReduce framework.
The Mars workflow consists of seven steps:
1. Input the key/value pairs in main memory and store them in an array;
2. Initialize the runtime configuration parameters;
3. Copy the input array from main memory to GPU device memory;
4. Launch the Map stage on the GPU and store the intermediate key/value pairs in an array;
5. If mSort is F, meaning a sort stage is required, sort the intermediate results;
6. If noReduce is F, meaning a Reduce stage is required, launch the Reduce stage on the GPU and output the final results; otherwise the intermediate results are the final results;
7. Copy the results from GPU device memory back to main memory.
Steps 1, 2, 3, and 7 are performed by the scheduler, which is responsible for preparing the data input, invoking the Map and Reduce stages on the GPU, and returning the results to the user.
Comparing the Strengths and Weaknesses of the Five Frameworks
Table 1. Strengths and weaknesses of the five frameworks

WordCount Experiment
Basic principle
Word counting (WordCount) is one of the simplest programs and one of those that best embodies the MapReduce idea — it can be called the "Hello World" of MapReduce. Its job is to count the number of occurrences of each word across a set of text files; for example, given the input "to be or not to be", it outputs (to, 2), (be, 2), (or, 1), (not, 1).
Experimental procedure
The hardware for this experiment was one x86 server configured with 32 GB of DDR3 memory, a 12-core E5 CPU, and a GPU. The data samples were five text files of 10 MB, 50 MB, 100 MB, 500 MB, and 1000 MB. We ran the text analysis program on each of the Hadoop MapReduce, Spark, Phoenix, Disco, and Mars frameworks and, after verifying that all produced identical results, recorded the run time, the CPU utilization during the run, and the memory utilization during the run, then plotted the data as bar charts.
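The article does not say which tooling produced these numbers, so the following is only a hedged Python sketch of how wall time, CPU seconds, and peak memory of one run could be collected on Linux; the command line is a placeholder. CPU occupancy is approximated as (user + system time) / wall time.

import resource
import subprocess
import time

cmd = ["hadoop", "jar", "wordcount.jar", "WordCount", "in", "out"]  # placeholder command
wall_start = time.time()
subprocess.run(cmd, check=True)          # run one framework's WordCount job
wall = time.time() - wall_start
usage = resource.getrusage(resource.RUSAGE_CHILDREN)
cpu = usage.ru_utime + usage.ru_stime    # CPU seconds consumed by the child process
print("wall %.1f s, cpu %.1f s (%.0f%%), peak rss %d kB"
      % (wall, cpu, 100 * cpu / wall, usage.ru_maxrss))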
Hadoop MapReduce
First, the input files must be divided into splits. Because the test files are small, each file becomes one split and is divided line by line into <key,value> pairs, as shown in Figure 5. This step is performed automatically by the MapReduce framework; note that the offset (the key) includes the characters occupied by the line ending (which differs between Windows and Linux environments).

Figure 5. The splitting process
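A small Python illustration — not part of Hadoop itself — shows why the line ending matters for the offset keys: every byte of the end-of-line sequence shifts the keys of all later lines.

text_unix = "Hello World\nBye World\n"      # LF line endings (Linux)
text_win  = "Hello World\r\nBye World\r\n"  # CRLF line endings (Windows)

def line_offsets(text):
    pairs, pos = [], 0
    for line in text.splitlines(keepends=True):
        pairs.append((pos, line.rstrip("\r\n")))  # (key = byte offset, value = line)
        pos += len(line)                          # advance past the line ending too
    return pairs

print(line_offsets(text_unix))  # [(0, 'Hello World'), (12, 'Bye World')]
print(line_offsets(text_win))   # [(0, 'Hello World'), (13, 'Bye World')]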
The split <key,value> pairs are handed to the user-defined map method for processing, which produces new <key,value> pairs, as shown in Figure 6.

Figure 6. Executing the Map method
After obtaining the <key,value> pairs output by the map method, the Mapper sorts them by key and runs the Combine step, summing the values that share a key to produce the Mapper's final output, as shown in Figure 7.

Figure 7. Map-side sorting and the Combine step
The Reducer first sorts the data received from the Mappers, then hands it to the user-defined reduce method for processing, producing new <key,value> pairs that form WordCount's output, as shown in Figure 8.

Figure 8. Reduce-side sorting and result output
Listing 1. First-generation Hadoop MapReduce WordCount sample code
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // The Map phase: tokenize each input line
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            // Emit (word, 1) for every token in the line
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        // The Reduce phase: sum the counts for each word
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
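Note one design choice in Listing 1: IntSumReducer is registered both as the combiner and as the reducer. Because summation is associative and commutative, pre-summing counts on the map side yields the same final totals while shrinking the volume of intermediate data shuffled to the reducers.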
Spark WordCount experiment
The biggest difference between Spark and Hadoop MapReduce is that Spark keeps the working data in memory: Hadoop MapReduce must read data into memory from external storage media, a step Spark avoids. Otherwise its implementation principle differs little from Hadoop MapReduce, so the principle is not repeated here; the complete code is shown in Listing 2.
Listing 2. Spark WordCount sample code
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], Integer.parseInt(args[1]));

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        // SPACE is the precompiled Pattern used in Spark's JavaWordCount example
        return Arrays.asList(SPACE.split(s));
    }
});

// Pair every word with an initial count of 1 to define the RDD 'ones'
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});

// reduceByKey(func, numPartitions): sum the counts per word across 10 partitions
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
}, 10);

// Collect the results to the driver and sort them by descending count
List<Tuple2<String, Integer>> output = counts.collect();
Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
    @Override
    public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
        if (t1._2 > t2._2) {
            return -1;
        } else if (t1._2 < t2._2) {
            return 1;
        }
        return 0;
    }
});
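For comparison, the same job can be written far more compactly against Spark's Python API. The sketch below is not from the original article; the input path is a placeholder, and the final sort mirrors the Comparator in Listing 2.

from operator import add
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("PyWordCount"))
counts = (sc.textFile("hdfs:///input/words.txt")     # placeholder path
            .flatMap(lambda line: line.split(" "))   # line -> words
            .map(lambda word: (word, 1))             # word -> (word, 1)
            .reduceByKey(add))                       # sum counts per word
for word, count in sorted(counts.collect(), key=lambda kv: -kv[1]):
    print(word, count)
sc.stop()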
Disco WordCount experiment
Because Disco ships with its own distributed file system, the MapReduce framework is rarely used in isolation: data is normally fetched from the distributed file system into memory, then partitioned before entering the MapReduce stage. The file is first uploaded to DDFS with the ddfs chunk command, after which the MapReduce program is written; the outer Disco application layer is written in Python. A sample Map program is shown in Listing 3 and a sample Reduce program in Listing 4; the complete job that ties them together is shown in Listing 5.
Listing 3. Map program
def fun_map(line, params):
    for word in line.split():
        yield word, 1
Listing 4. Reduce program
def fun_reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)
Listing 5. The complete Map/Reduce job
from disco.core import Job, result_iterator

def map(line, params):
    for word in line.split():
        yield word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)
Phoenix WordCount experiment
Phoenix is a CPU-based MapReduce framework, so it too follows the traditional approach of partitioning the data, reading it into memory, and then starting the MapReduce processing stage. Phoenix does not let the user decide the size of the data chunk each Map receives; it splits according to the actual cache size of the system, which avoids handing a Map a chunk that is too large or too small. An oversized chunk slows down the Map that processes it, while an undersized chunk wastes Map resources, because every Map thread started consumes a certain amount of system resources. The text chunks produced in the Map stage are processed by many Maps in parallel — Phoenix supports on the order of 100 concurrent Maps, and a single worker node can run several Maps in parallel. Only when all Map tasks on a worker node have finished does the Reduce stage begin. The Reduce stage retains the dynamic task scheduling mechanism, and users may define their own data partitioning rules.
Listing 6. The Phoenix wordcount program
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <ctype.h>
#include <inttypes.h>
#include "map_reduce.h"
#include "stddefines.h"
#include "sort.h"

#define DEFAULT_DISP_NUM 10

typedef struct {
    int fpos;
    off_t flen;
    char *fdata;
    int unit_size;
} wc_data_t;

enum { IN_WORD, NOT_IN_WORD };

struct timeval begin, end;

#ifdef TIMING
unsigned int library_time = 0;
#endif

/** mystrcmp()
 * Comparison function to compare 2 words
 */
int mystrcmp(const void *s1, const void *s2)
{
    return strcmp((const char *)s1, (const char *)s2);
}

/** mykeyvalcmp()
 * Comparison function to compare 2 ints
 */
int mykeyvalcmp(const void *v1, const void *v2)
{
    keyval_t* kv1 = (keyval_t*)v1;
    keyval_t* kv2 = (keyval_t*)v2;
    intptr_t *i1 = kv1->val;
    intptr_t *i2 = kv2->val;
    if (i1 < i2) return 1;
    else if (i1 > i2) return -1;
    else {
        return strcmp((char *)kv1->key, (char *)kv2->key);
        //return 0;
    }
}

/** wordcount_splitter()
 * Splits the memory-resident input into chunks for the Map tasks
 */
int wordcount_splitter(void *data_in, int req_units, map_args_t *out)
{
    wc_data_t *data = (wc_data_t *)data_in;

    assert(data_in);
    assert(out);
    assert(data->flen >= 0);
    assert(data->fdata);
    assert(req_units);
    assert(data->fpos >= 0);

    // End of file reached, return FALSE for no more data
    if (data->fpos >= data->flen) return 0;

    // Set the start of the next data
    out->data = (void *)&data->fdata[data->fpos];

    // Determine the nominal length
    out->length = req_units * data->unit_size;
    if (data->fpos + out->length > data->flen)
        out->length = data->flen - data->fpos;

    // Set the length to end at a space
    for (data->fpos += (long)out->length;
         data->fpos < data->flen &&
         data->fdata[data->fpos] != ' ' && data->fdata[data->fpos] != '\t' &&
         data->fdata[data->fpos] != '\r' && data->fdata[data->fpos] != '\n';
         data->fpos++, out->length++);

    return 1;
}

/** wordcount_locator()
 * Return the memory address where this map task would heavily access.
 */
void *wordcount_locator(map_args_t *task)
{
    assert(task);
    return task->data;
}

/** wordcount_map()
 * Counts the words in one chunk of text
 */
void wordcount_map(map_args_t *args)
{
    char *curr_start, curr_ltr;
    int state = NOT_IN_WORD;
    int i;

    assert(args);

    char *data = (char *)args->data;
    assert(data);

    curr_start = data;
    for (i = 0; i < args->length; i++) {
        curr_ltr = toupper(data[i]);
        switch (state) {
        case IN_WORD:
            data[i] = curr_ltr;
            if ((curr_ltr < 'A' || curr_ltr > 'Z') && curr_ltr != '\'') {
                data[i] = 0;
                emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1);
                state = NOT_IN_WORD;
            }
            break;
        default:
        case NOT_IN_WORD:
            if (curr_ltr >= 'A' && curr_ltr <= 'Z') {
                curr_start = &data[i];
                data[i] = curr_ltr;
                state = IN_WORD;
            }
            break;
        }
    }

    // Add the last word
    if (state == IN_WORD) {
        data[args->length] = 0;
        emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1);
    }
}

/** wordcount_reduce()
 * Sums the counts emitted for each word
 */
void wordcount_reduce(void *key_in, iterator_t *itr)
{
    char *key = (char *)key_in;
    void *val;
    intptr_t sum = 0;

    assert(key);
    assert(itr);

    while (iter_next(itr, &val)) {
        sum += (intptr_t)val;
    }

    emit(key, (void *)sum);
}

void *wordcount_combiner(iterator_t *itr)
{
    void *val;
    intptr_t sum = 0;

    assert(itr);

    while (iter_next(itr, &val)) {
        sum += (intptr_t)val;
    }

    return (void *)sum;
}

int main(int argc, char *argv[])
{
    final_data_t wc_vals;
    int i;
    int fd;
    char *fdata;
    int disp_num;
    struct stat finfo;
    char *fname, *disp_num_str;
    struct timeval starttime, endtime;

    get_time(&begin);

    // Make sure a filename was given
    if (argv[1] == NULL) {
        printf("USAGE: %s <filename> [Top # of results to display]\n", argv[0]);
        exit(1);
    }

    fname = argv[1];
    disp_num_str = argv[2];

    printf("Wordcount: Running...\n");

    // Open the input file
    CHECK_ERROR((fd = open(fname, O_RDONLY)) < 0);
    // Get the file info (for file length)
    CHECK_ERROR(fstat(fd, &finfo) < 0);
#ifndef NO_MMAP
    // Memory-map the file so the map phase can work on it in memory
    CHECK_ERROR((fdata = mmap(0, finfo.st_size + 1, PROT_READ | PROT_WRITE,
                              MAP_PRIVATE, fd, 0)) == NULL);
#else
    int ret;
    fdata = (char *)malloc(finfo.st_size);
    CHECK_ERROR(fdata == NULL);
    ret = read(fd, fdata, finfo.st_size);
    CHECK_ERROR(ret != finfo.st_size);
#endif

    CHECK_ERROR((disp_num = (disp_num_str == NULL) ?
                 DEFAULT_DISP_NUM : atoi(disp_num_str)) <= 0);

    wc_data_t wc_data;
    wc_data.unit_size = 5; // approx 5 bytes per word
    wc_data.fpos = 0;
    wc_data.flen = finfo.st_size;
    wc_data.fdata = fdata;

    CHECK_ERROR(map_reduce_init());

    map_reduce_args_t map_reduce_args;
    memset(&map_reduce_args, 0, sizeof(map_reduce_args_t));
    map_reduce_args.task_data = &wc_data;
    map_reduce_args.map = wordcount_map;
    map_reduce_args.reduce = wordcount_reduce;
    map_reduce_args.combiner = wordcount_combiner;
    map_reduce_args.splitter = wordcount_splitter;
    map_reduce_args.locator = wordcount_locator;
    map_reduce_args.key_cmp = mystrcmp;
    map_reduce_args.unit_size = wc_data.unit_size;
    map_reduce_args.partition = NULL; // use default
    map_reduce_args.result = &wc_vals;
    map_reduce_args.data_size = finfo.st_size;
    map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE")); //1024 * 1024 * 2;
    map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS")); //8;
    map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS")); //16;
    map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS")); //8;
    map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS")); //16;
    map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR")); //2;

    printf("Wordcount: Calling MapReduce Scheduler Wordcount\n");

    gettimeofday(&starttime, 0);
    get_time(&end);

#ifdef TIMING
    fprintf(stderr, "initialize: %u\n", time_diff(&end, &begin));
#endif

    get_time(&begin);
    CHECK_ERROR(map_reduce(&map_reduce_args) < 0);
    get_time(&end);

#ifdef TIMING
    library_time += time_diff(&end, &begin);
#endif

    get_time(&begin);
    gettimeofday(&endtime, 0);

    printf("Wordcount: Completed %ld\n", (endtime.tv_sec - starttime.tv_sec));
    printf("Wordcount: MapReduce Completed\n");
    printf("Wordcount: Calling MapReduce Scheduler Sort\n");

    mapreduce_sort(wc_vals.data, wc_vals.length, sizeof(keyval_t), mykeyvalcmp);

    CHECK_ERROR(map_reduce_finalize());

    printf("Wordcount: MapReduce Completed\n");

    dprintf("\nWordcount: Results (TOP %d):\n", disp_num);
    for (i = 0; i < disp_num && i < wc_vals.length; i++) {
        keyval_t *curr = &((keyval_t *)wc_vals.data)[i];
        dprintf("%15s - %" PRIdPTR "\n", (char *)curr->key, (intptr_t)curr->val);
    }

    free(wc_vals.data);

#ifndef NO_MMAP
    CHECK_ERROR(munmap(fdata, finfo.st_size + 1) < 0);
#else
    free(fdata);
#endif
    CHECK_ERROR(close(fd) < 0);

    get_time(&end);
#ifdef TIMING
    fprintf(stderr, "finalize: %u\n", time_diff(&end, &begin));
#endif

    return 0;
}
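As the main() function shows, Phoenix reads its tuning knobs from environment variables — MR_L1CACHESIZE, MR_NUMTHREADS, MR_NUMPROCS, and MR_KEYMATCHFACTOR — so the cache-sized splitting and thread counts described above can be adjusted per machine without recompiling.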
Mars MapReduce
In the Mars framework, both the Map and Reduce processing stages run on the GPU, while the data-splitting stages for Map and Reduce run on the CPU — the biggest difference from the CPU-based MapReduce frameworks. Mars relies more heavily on the CPU and GPU caches in place of main memory when splitting and processing data.
The specific word count flow is as follows:
1. Prepare the key/value pairs and store them in an array;
2. Initialize the MapReduce context and set the parameters (the number of concurrent threads should be set according to the number of CUDA cores of the particular GPU);
3. Preprocess the data: open the file, read its entire contents into memory, allocate a block of GPU memory the same size as the file, and convert the lowercase characters in the file contents to uppercase (so that "word" and "Word" count as the same word);
4. Start the MapReduce stage. The in-memory file is partitioned according to the number of concurrent threads and the file size; for each partitioned task, its offset and length in memory are recorded as the value and the GPU memory pointer address as the key, and the task is added to the task pool. The preprocessed memory contents are then copied into the GPU memory allocated earlier. Next the Map flow begins: the task pool is copied from memory to the GPU, GPU memory blocks are allocated to hold the data Map produces, and many threads concurrently execute the user-defined map flow MAP_COUNT_FUNC — a function Mars introduces because of the peculiarities of GPU programming, used to record the lengths (sizeof) of the keys and values Map will produce. The MAP_FUNC method is then called with the task records as input, outputting the words and their positions;
5. If noSort is F, sort the results;
6. If noReduce is F, the GPU runs the reduce stage and produces the final result set; otherwise the final result set is output immediately;
7. Output the results: copy the final result set from the GPU device to main memory, then print it to the screen.
With these seven steps, the WordCount computation is complete and the result set is produced.
Listing 7. The Mars Map program
#ifndef __MAP_CU__
#define __MAP_CU__

#include "MarsInc.h"
#include "global.h"

__device__ int hash_func(char* str, int len)
{
    int hash, i;
    for (i = 0, hash = len; i < len; i++)
        hash = (hash << 4) ^ (hash >> 28) ^ str[i];
    return hash;
}

__device__ void MAP_COUNT_FUNC //(void *key, void *val, size_t keySize, size_t valSize)
{
    WC_KEY_T* pKey = (WC_KEY_T*)key;
    WC_VAL_T* pVal = (WC_VAL_T*)val;

    char* ptrBuf = pKey->file + pVal->line_offset;
    int line_size = pVal->line_size;

    char* p = ptrBuf;
    int lsize = 0;
    int wsize = 0;
    char* start = ptrBuf;

    while (1) {
        for (; *p >= 'A' && *p <= 'Z'; p++, lsize++);
        *p = '\0';
        ++p;
        ++lsize;
        wsize = (int)(p - start);
        if (wsize > 6) {
            //printf("%s, wsize:%d\n", start, wsize);
            EMIT_INTER_COUNT_FUNC(wsize, sizeof(int));
        }
        for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++);
        if (lsize >= line_size) break;
        start = p;
    }
}

__device__ void MAP_FUNC //(void *key, void val, size_t keySize, size_t valSize)
{
    WC_KEY_T* pKey = (WC_KEY_T*)key;
    WC_VAL_T* pVal = (WC_VAL_T*)val;

    char* filebuf = pKey->file;
    char* ptrBuf = filebuf + pVal->line_offset;
    int line_size = pVal->line_size;

    char* p = ptrBuf;
    char* start = ptrBuf;
    int lsize = 0;
    int wsize = 0;

    while (1) {
        for (; *p >= 'A' && *p <= 'Z'; p++, lsize++);
        *p = '\0';
        ++p;
        ++lsize;
        wsize = (int)(p - start);
        int* o_val = (int*)GET_OUTPUT_BUF(0);
        *o_val = wsize;
        if (wsize > 6) {
            //printf("%s, %d\n", start, wsize);
            EMIT_INTERMEDIATE_FUNC(start, o_val, wsize, sizeof(int));
        }
        for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++);
        if (lsize >= line_size) break;
        start = p;
    }
}

#endif //__MAP_CU__
Listing 8. The Mars Reduce program
#ifndef __REDUCE_CU__
#define __REDUCE_CU__

#include "MarsInc.h"

__device__ void REDUCE_COUNT_FUNC //(void* key, void* vals, size_t keySize, size_t valCount)
{
}

__device__ void REDUCE_FUNC //(void* key, void* vals, size_t keySize, size_t valCount)
{
}

#endif //__REDUCE_CU__
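Both reduce functions in Listing 8 are empty stubs. This appears to correspond to the noReduce path described in the workflow above: for word count, the intermediate key/value pairs emitted by the Map stage are taken directly as the final result, so no Reduce logic is needed.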
WordCount Performance Comparison Across the Five Frameworks

Figure 9. Comparison of run times
Figure 9 compares the time each framework took to analyze text files of different sizes. Hadoop MapReduce has the longest run time: the Hadoop ecosystem carries a great deal of baggage, so every task launch must first load the required resource packages before the task slowly starts, and because the framework itself is written in Java, whose performance is comparatively poor, overall computation time is long and performance suffers. Phoenix, written in assembly and C with a very small kernel, uses few resources at run time, so the whole test takes comparatively little time. The Spark framework took somewhat less time in the WordCount experiment than Disco, but far more than Phoenix and Mars. The two fastest frameworks are Mars and Phoenix. From longest to shortest, the required times rank Hadoop MapReduce, Disco, Spark, Phoenix, Mars.

Figure 10. Comparison of CPU utilization
Figure 10 compares CPU utilization during task execution. Hadoop MapReduce and Disco essentially saturate the CPU (above 90%) when processing the 1000 MB text; Apache Spark's CPU utilization does not climb sharply as the text file grows; and Phoenix and Mars keep CPU utilization within a fairly low range.

Figure 11. Comparison of memory utilization
Figure 11 compares memory usage during task execution. Mars and Phoenix use the least memory when the text data is small; as the text data grows, Apache Spark's memory use rises sharply, while Mars and Phoenix trend upward more moderately. Even so, at the largest test size of 1000 MB, the Spark framework consumed the least memory, while Hadoop MapReduce and Disco occupied considerably more.
From these test results we conclude the following. If users only need to process massive text files, with no requirements for storage or secondary data mining, Phoenix or Mars offers the best value; but because Mars must run on a GPU, and GPU prices make wide deployment in real-world scenarios unlikely, Phoenix is on balance the most cost-effective framework. If the application must process very large volumes of data and the customer wants the computed results stored for further computation or data mining, Hadoop MapReduce is the better choice, since the vast Hadoop ecosystem provides excellent support. Apache Spark, thanks to its different architectural design, keeps CPU and memory utilization consistently low, and it is a promising option for future massive-data analysis.
Conclusion
Many real-world problems can be expressed in the MapReduce programming model. As a general-purpose, scalable, highly fault-tolerant parallel processing model, MapReduce can process massive data effectively and continually mine valuable information from it, while encapsulating the difficult technical details of parallel processing, load balancing, fault tolerance, and data locality. The test cases in this article demonstrate that MapReduce suits massive text analysis scenarios and can provide the technical underpinning for big data processing.