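This article summarizes several MapReduce patterns and algorithms that commonly appear on the web or in research papers, and gives a systematic view of how these techniques differ. All descriptions and code snippets use the standard Hadoop MapReduce model with Mappers, Reducers, Combiners, Partitioners, and sorting.
[Figure: the MapReduce framework]
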
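Basic MapReduce Patterns
Counting and Summing
Problem Statement: There are a number of documents, each of which is a set of terms. We need to calculate the total number of occurrences of each term across all documents, or some other function of the terms. For instance, given a log file in which every record contains a response time, we need to calculate the average response time.
Solution:
Let us start with something really simple. In the code snippet below, the Mapper emits a count of 1 for each term it processes, and the Reducer goes through the list of ones for each term and sums them up.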
    class Mapper
        method Map(docid id, doc d)
            for all term t in doc d do
                Emit(term t, count 1)

    class Reducer
        method Reduce(term t, counts [c1, c2,...])
            sum = 0
            for all count c in [c1, c2,...] do
                sum = sum + c
            Emit(term t, count sum)
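The obvious disadvantage of this approach is the large number of dummy counters emitted by the Mapper. The Mapper can reduce the amount of data passed to the Reducer by first summing the counts within each document: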
    class Mapper
        method Map(docid id, doc d)
            H = new AssociativeArray
            for all term t in doc d do
                H{t} = H{t} + 1
            for all term t in H do
                Emit(term t, count H{t})
To accumulate counts not just within one document but across all documents processed by one Mapper node, use a Combiner:
    class Mapper
        method Map(docid id, doc d)
            for all term t in doc d do
                Emit(term t, count 1)

    class Combiner
        method Combine(term t, [c1, c2,...])
            sum = 0
            for all count c in [c1, c2,...] do
                sum = sum + c
            Emit(term t, count sum)

    class Reducer
        method Reduce(term t, counts [c1, c2,...])
            sum = 0
            for all count c in [c1, c2,...] do
                sum = sum + c
            Emit(term t, count sum)
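The three variants above can be tried out without a cluster. The following is a minimal single-process sketch, not Hadoop code: the function names (`map_doc`, `combine`, `reduce_counts`, `word_count`) and the sample input are made up for illustration, and a plain dictionary stands in for the shuffle-and-sort step.

```python
from collections import Counter, defaultdict

def map_doc(doc_id, text):
    """Mapper with per-document aggregation: one (term, count) pair per distinct term."""
    return list(Counter(text.split()).items())

def combine(pairs):
    """Combiner: sum the counts per term over everything one mapper node emitted."""
    totals = Counter()
    for term, count in pairs:
        totals[term] += count
    return list(totals.items())

def reduce_counts(term, counts):
    """Reducer: sum all partial counts received for one term."""
    return term, sum(counts)

def word_count(splits):
    """splits: one list of (doc_id, text) per simulated mapper node."""
    grouped = defaultdict(list)  # stands in for shuffle-and-sort
    for split in splits:
        emitted = [pair for doc_id, text in split for pair in map_doc(doc_id, text)]
        for term, count in combine(emitted):
            grouped[term].append(count)
    return dict(reduce_counts(term, counts) for term, counts in grouped.items())

splits = [
    [(1, "a b a"), (2, "b c")],  # documents handled by mapper node 1
    [(3, "a c c")],              # documents handled by mapper node 2
]
totals = word_count(splits)
```

With the Combiner in place, each simulated node sends at most one pair per term to the reduce step, however many documents it processed.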
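Applications:
Log analysis, data querying
Collating
Problem Statement:
There is a set of items, each with several attributes. We need to save all items that share the same attribute value into one file, or otherwise group the items by attribute value.
The most typical application is building an inverted index.
Solution:
The solution is straightforward. The Mapper emits, for each item, the value of the required attribute as the key and the item itself as the value.
The Reducer receives the items grouped by attribute value and can then process or save them. When building an inverted index, the key is a term (word) and the grouped items are the IDs of the documents in which that term occurs.
Applications:
Inverted indexes, ETL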
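As an illustration of collating, here is a small in-memory sketch of inverted index construction. It is a simulation under stated assumptions rather than Hadoop code: `map_terms` and `build_inverted_index` are hypothetical names, documents are plain strings split on whitespace, and a dictionary plays the role of the shuffle.

```python
from collections import defaultdict

def map_terms(doc_id, text):
    """Mapper: emit (term, doc_id) once for every distinct term in the document."""
    for term in set(text.lower().split()):
        yield term, doc_id

def build_inverted_index(docs):
    """Shuffle + Reducer: collect the document IDs grouped under each term."""
    index = defaultdict(list)
    for doc_id, text in docs.items():
        for term, emitted_id in map_terms(doc_id, text):
            index[term].append(emitted_id)
    return {term: sorted(ids) for term, ids in index.items()}

docs = {1: "map reduce", 2: "reduce side join", 3: "map side join"}
index = build_inverted_index(docs)
```

Looking up `index["join"]` then returns the IDs of all documents containing that word.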
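Filtering ("Grepping"), Parsing, and Validation
Problem Statement:
There is a set of records. We need to collect all records that meet some condition, or transform each record into another representation (the transformation is independent for each record, i.e. processing one record does not depend on any other record). Text parsing, value extraction, and format conversion all belong to the latter use case.
Solution:
This is absolutely straightforward: the Mapper takes records one by one and emits either the accepted records or their transformed versions.
Applications:
Log analysis, data querying, ETL, data validation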
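Distributed Task Execution
Problem Statement:
A large computation can be divided into multiple parts, and the results of all parts can be combined to obtain the final result.
Solution: The input is split into parts, one per Mapper. Each Mapper takes its part, performs the same computation, and emits a result. The Reducer combines the results of all Mappers into the final one.
Case Study: Simulation of a Digital Communication System
A simulator for a digital communication system such as WiMAX passes a large volume of random data through a model of the system and computes the error probability of the transmission. Each Mapper runs the simulation for 1/N of the sample data and emits the error rate for its part; the Reducer then computes the average error rate.
Applications:
Physical and engineering simulations, numerical analysis, performance testing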
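The case study can be sketched with a toy channel model. Everything below is an assumption made for illustration: the "system" is just a channel that flips each bit with a fixed probability, and `simulate_part` and `combine_parts` are hypothetical names. Each simulated Mapper returns (errors, bits) rather than a ready-made rate, so that the Reducer's pooled rate stays correct even if the parts are not of equal size; with equal parts it equals the average of the per-part rates.

```python
import random

def simulate_part(seed, n_bits, flip_probability):
    """Mapper: push n_bits random bits through a toy channel, return (errors, bits)."""
    rng = random.Random(seed)
    errors = 0
    for _ in range(n_bits):
        sent = rng.getrandbits(1)
        # The toy channel flips the bit with the given probability.
        received = sent ^ (rng.random() < flip_probability)
        errors += sent != received
    return errors, n_bits

def combine_parts(parts):
    """Reducer: pool the partial counts into one overall error rate."""
    total_errors = sum(errors for errors, _ in parts)
    total_bits = sum(bits for _, bits in parts)
    return total_errors / total_bits

parts = [simulate_part(seed, 10_000, 0.1) for seed in range(4)]  # 4 simulated mappers
error_rate = combine_parts(parts)
```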
Sorting
Problem Statement:
There is a set of records. We need to sort all of them by some rule, or process them in a particular order.
Solution: Simple sorting is easy: the Mappers emit every record as the value, keyed by the attribute values to sort on. In practice, however, sorting is often used in trickier ways, which is why it is said to be the heart of MapReduce. In particular, composite keys are commonly used to achieve secondary sorting and grouping.
Originally, MapReduce sorts by key only, but there are techniques that leverage Hadoop implementation specifics to sort by value as well.
Note that when MapReduce is used to sort the original (not intermediate) data, it is often better to keep the data in sorted state continuously, following BigTable concepts. In other words, sorting the data once at insertion time can be more efficient than sorting it for every query.
Applications:
ETL, data analysis
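The composite-key idea can be sketched as follows. This is a single-process simulation, not the Hadoop API: the record layout (user, timestamp, event) and the function name `secondary_sort` are assumptions for illustration. The sort uses the full composite key, while grouping looks only at its first component, which is roughly what a custom partitioner plus grouping comparator would arrange in Hadoop.

```python
from itertools import groupby
from operator import itemgetter

def secondary_sort(records):
    """records: (user, timestamp, event) tuples. Returns {user: [events in time order]}."""
    # Map: the composite key (user, timestamp) carries the desired order.
    emitted = [((user, ts), event) for user, ts, event in records]
    # Shuffle: sort by the full composite key ...
    emitted.sort(key=itemgetter(0))
    # ... but group only by the natural key (user), so each group arrives sorted by time.
    result = {}
    for user, group in groupby(emitted, key=lambda kv: kv[0][0]):
        result[user] = [event for _, event in group]
    return result

records = [
    ("bob", 3, "logout"),
    ("alice", 2, "click"),
    ("bob", 1, "login"),
    ("alice", 1, "login"),
]
by_user = secondary_sort(records)
```

The reduce step never has to sort the values itself; they reach it already ordered by the second key component.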
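Non-Basic MapReduce Patterns
Iterative Message Passing (Graph Processing)
Problem Statement:
There is a network of entities with relationships between them. We need to calculate a state for each entity on the basis of the properties of the entities in its neighborhood. This state can represent the distance to other nodes, an indication that there is a neighbor with certain properties, a characteristic of neighborhood density, and so on.
Solution:
The network is stored as a set of nodes, and each node contains a list of the IDs of its adjacent nodes. Conceptually, MapReduce jobs run iteratively, and at each iteration every node sends messages to its neighbors. Each neighbor updates its state on the basis of the messages it receives. Iterations stop when some condition is met, such as reaching a fixed maximum number of iterations (the network diameter, say) or seeing almost no state changes between two consecutive iterations. Technically, the Mapper emits a message for each adjacent node, keyed by that node's ID. All messages are therefore grouped by the receiving node, so the Reducer can recompute each node's state and rewrite the nodes whose state has changed. The algorithm is shown below: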
    class Mapper
        method Map(id n, object N)
            Emit(id n, object N)                  // pass the node itself along
            for all id m in N.OutgoingRelations do
                Emit(id m, message getMessage(N))

    class Reducer
        method Reduce(id m, [s1, s2,...])
            M = null
            messages = []
            for all s in [s1, s2,...] do
                if IsObject(s) then
                    M = s
                else                              // s is a message
                    messages.add(s)
            M.State = calculateState(M.State, messages)
            Emit(id m, item M)
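The state of one node rapidly propagates across the whole network: nodes that have been "infected" go on to infect their neighbors in turn.
[Figure: state propagating from node to node across the network]
Case Study: Availability Propagation Through the Tree of Categories
Problem Statement:
This problem is inspired by a real-life e-commerce task. Goods are classified into a tree of categories that ranges from large categories (such as Men, Women, Kids) through smaller ones (such as Men's Trousers or Women's Clothing) down to small end-of-line categories (such as Men's Blue Jeans). An end-of-line category is either available (it contains products) or not available (no products belong to it). A higher-level category is available if at least one category in its subtree is available. The goal is to determine the availability of every category in the tree, given the availability of the end-of-line categories.
Solution:
This problem can be solved with the framework described in the previous section. We define the getMessage and calculateState methods as follows: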
    class N
        State in {True = 2, False = 1, null = 0},
        initialized 1 or 2 for end-of-line categories, 0 otherwise

    method getMessage(object N)
        return N.State

    method calculateState(state s, data [d1, d2,...])
        return max( [s, d1, d2,...] )    // s keeps the state of nodes that receive no messages
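Case Study: Breadth-First Search
Problem Statement: There is a graph, and we need to calculate the distance (number of hops) from one source node to all other nodes in the graph.
Solution: The source node starts at distance 0 and sends the value 1 to all its neighbors. Each neighbor forwards the value it holds to its own neighbors, incrementing it by 1 at every hop: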
    class N
        State is distance, initialized 0 for source node, INFINITY for all other nodes

    method getMessage(N)
        return N.State + 1

    method calculateState(state s, data [d1, d2,...])
        return min( [s, d1, d2,...] )    // keep the current distance if no message is shorter
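The breadth-first search case can be run end to end in a few lines. The sketch below simulates each MapReduce round in memory; `bfs_iteration` and `bfs` are illustrative names, the graph is an adjacency dictionary in which every node appears as a key, and the new state is taken as the minimum over the node's current distance and the messages it received.

```python
import math
from collections import defaultdict

def bfs_iteration(graph, dist):
    """One simulated MapReduce round. graph: {node: [neighbors]}, dist: {node: distance}."""
    inbox = defaultdict(list)
    for node, neighbors in graph.items():      # Map phase
        for m in neighbors:
            inbox[m].append(dist[node] + 1)    # getMessage(N) = N.State + 1
    # Reduce phase: minimum over the node's own state and its received messages.
    return {node: min([dist[node]] + inbox[node]) for node in graph}

def bfs(graph, source):
    dist = {node: (0 if node == source else math.inf) for node in graph}
    for _ in range(len(graph)):                # no shortest path has more hops than nodes
        new_dist = bfs_iteration(graph, dist)
        if new_dist == dist:                   # stop once no state changes
            break
        dist = new_dist
    return dist

graph = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": ["e"], "e": [], "f": ["a"]}
distances = bfs(graph, "a")
```

Node `f` has an edge into `a` but no path from it, so its distance stays infinite.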
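Case Study: PageRank and Mapper-Side Data Aggregation
This algorithm was proposed by Google to calculate the relevance of a web page as a function of the authority (PageRank) of the pages that link to it. The real algorithm is quite complex, but at its core it is a propagation of weights between nodes: each node computes its own weight from the weights sent to it by the nodes that link to it.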
    class N
        State is PageRank

    method getMessage(object N)
        return N.State / N.OutgoingRelations.size()

    method calculateState(state s, data [d1, d2,...])
        return ( sum([d1, d2,...]) )
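It is worth noting that the schema above is too generic: it does not take advantage of the fact that the state is a numerical value. In most practical cases this fact allows the values to be aggregated on the Mapper side. The code snippet below shows this optimization (for the PageRank algorithm):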
    class Mapper
        method Initialize
            H = new AssociativeArray
        method Map(id n, object N)
            p = N.PageRank / N.OutgoingRelations.size()
            Emit(id n, object N)
            for all id m in N.OutgoingRelations do
                H{m} = H{m} + p
        method Close
            for all id n in H do
                Emit(id n, value H{n})

    class Reducer
        method Reduce(id m, [s1, s2,...])
            M = null
            p = 0
            for all s in [s1, s2,...] do
                if IsObject(s) then
                    M = s
                else
                    p = p + s
            M.PageRank = p
            Emit(id m, item M)
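A compact simulation of the mapper-side aggregation might look like this. It follows the simplified scheme of the pseudocode (no damping factor, no handling of pages without outgoing links), so it is not the full PageRank algorithm; `pagerank_round` and the three-page graph are made up for illustration.

```python
from collections import defaultdict

def pagerank_round(graph, rank):
    """One simulated round. graph: {page: [outgoing links]}, rank: {page: PageRank}."""
    partial = defaultdict(float)       # the mapper's associative array H
    for page, links in graph.items():  # Map: accumulate shares instead of emitting each one
        for target in links:
            partial[target] += rank[page] / len(links)
    # Close + Reduce: the accumulated sums become the new ranks.
    return {page: partial[page] for page in graph}

graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
start = {page: 1 / len(graph) for page in graph}
after_one_round = pagerank_round(graph, start)

rank = start
for _ in range(50):
    rank = pagerank_round(graph, rank)
```

With a single simulated mapper, each target page receives one pre-summed value per round instead of one message per incoming link.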
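Applications:
Graph analysis, web indexing
Distinct Values (Unique Items Counting)
Problem Statement: There is a set of records that contain fields F and G. We need to count the number of distinct values of field F for each subset of records that share the same value of G (that is, grouped by G).
The problem can be generalized a little and formulated in terms of faceted search (which some e-commerce sites call Narrow Search).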
    Record 1: F=1, G={a, b}
    Record 2: F=2, G={a, d, e}
    Record 3: F=1, G={b}
    Record 4: F=3, G={a, b}

    Result:
    a -> 3    // F=1, F=2, F=3
    b -> 2    // F=1, F=3
    d -> 1    // F=2
    e -> 1    // F=2
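Solution I:
The first approach solves the problem in two stages. In the first stage, the Mapper emits a dummy counter for each pair of F and G, and the Reducer emits each pair exactly once, which guarantees the uniqueness of the F values within each G. In the second stage, the pairs are grouped by G and the number of items in each group is counted.
Phase 1: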
    class Mapper
        method Map(null, record [value f, categories [g1, g2,...]])
            for all category g in [g1, g2,...]
                Emit(record [g, f], count 1)

    class Reducer
        method Reduce(record [g, f], counts [n1, n2, ...])
            Emit(record [g, f], null)
Phase 2:
    class Mapper
        method Map(record [g, f], null)    // same [g, f] layout as the Phase 1 output
            Emit(value g, count 1)

    class Reducer
        method Reduce(value g, counts [n1, n2,...])
            Emit(value g, sum( [n1, n2,...] ) )
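Solution II:
The second approach requires only one MapReduce job, but it is not very scalable and its applicability is limited. The algorithm is simple: the Mapper emits values and categories; the Reducer removes duplicate categories from the list for each value and increments a counter for each remaining category; a final step sums all the counters emitted by the Reducers. This approach is applicable when the number of categories is limited and the number of records with the same value of F is not very high. An example is processing web logs and classifying users: the total number of users is high, but the number of events per user is limited, as is the number of categories obtained by this classification. It is worth noting that Combiners can be used in this schema to remove duplicate categories before the data is transmitted to the Reducer.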
    class Mapper
        method Map(null, record [value f, categories [g1, g2,...]])
            for all category g in [g1, g2,...]
                Emit(value f, category g)

    class Reducer
        method Initialize
            H = new AssociativeArray : category -> count
        method Reduce(value f, categories [g1, g2,...])
            [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
            for all category g in [g1', g2',...]
                H{g} = H{g} + 1
        method Close
            for all category g in H do
                Emit(category g, count H{g})
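Both solutions can be checked against the example records given in the problem statement. The sketch below is an in-memory simulation with hypothetical function names; a Python set stands in for the Phase 1 shuffle that collapses duplicate (g, f) pairs, and a single simulated Reducer is assumed in the second function, so no final summing step is needed.

```python
from collections import defaultdict

# The four example records from the problem statement: (F, G).
records = [(1, {"a", "b"}), (2, {"a", "d", "e"}), (1, {"b"}), (3, {"a", "b"})]

def distinct_two_phase(records):
    # Phase 1: key on the (g, f) pair; grouping collapses duplicate pairs.
    unique_pairs = {(g, f) for f, categories in records for g in categories}
    # Phase 2: count the surviving pairs per category g.
    counts = defaultdict(int)
    for g, _f in unique_pairs:
        counts[g] += 1
    return dict(counts)

def distinct_single_phase(records):
    # Map + shuffle: collect the categories seen with each value f.
    by_value = defaultdict(list)
    for f, categories in records:
        by_value[f].extend(categories)
    # Reduce: deduplicate categories per f and bump in-memory counters (emitted on Close).
    counts = defaultdict(int)
    for f, categories in by_value.items():
        for g in set(categories):
            counts[g] += 1
    return dict(counts)

two_phase = distinct_two_phase(records)
single_phase = distinct_single_phase(records)
```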
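Applications:
Log analysis, unique users counting
Cross-Correlation
Problem Statement: There is a set of tuples of items. For each possible pair of items, calculate the number of tuples in which the two items co-occur. If the total number of items is N, then N*N values should be reported.
This problem appears in text analysis (where the items are words and the tuples are sentences) and in market analysis (customers who buy this tend to also buy that). If N*N is small enough to fit in the memory of a single machine, the implementation is straightforward.
Pairs Approach
The first approach is to emit all pairs of items from the Mapper and sum the counters for each pair in the Reducer. It has the following shortcomings:
- The benefit from Combiners is limited, because it is likely that all pairs are distinct.
- There is no in-memory accumulation.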
    class Mapper
        method Map(null, items [i1, i2,...])
            for all item i in [i1, i2,...]
                for all item j in [i1, i2,...]
                    Emit(pair [i j], count 1)

    class Reducer
        method Reduce(pair [i j], counts [c1, c2,...])
            s = sum([c1, c2,...])
            Emit(pair [i j], count s)
Stripes Approach
The second approach is to group the data by the first item of each pair and maintain an associative array (a "stripe") in which the counters for all adjacent items are accumulated. The Reducer receives all stripes for a leading item i, merges them, and emits the same result as in the Pairs approach. Its properties are:
- It generates fewer intermediate keys, so the framework has less sorting to do.
- It makes effective use of Combiners.
- It accumulates in memory, which can cause problems if not done properly.
- It is more complex to implement.
- In general, "stripes" is faster than "pairs".
    class Mapper
        method Map(null, items [i1, i2,...])
            for all item i in [i1, i2,...]
                H = new AssociativeArray : item -> counter
                for all item j in [i1, i2,...]
                    H{j} = H{j} + 1
                Emit(item i, stripe H)

    class Reducer
        method Reduce(item i, stripes [H1, H2,...])
            H = new AssociativeArray : item -> counter
            H = merge-sum( [H1, H2,...] )
            for all item j in H.keys()
                Emit(pair [i j], H{j})
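To see that the two approaches agree, here is a small in-memory comparison. The function names and sample groups are invented for the example, and, unlike the pseudocode, this sketch skips pairs of an item with itself, which is an assumption about what a useful co-occurrence count looks like.

```python
from collections import Counter, defaultdict

def cooccurrence_pairs(groups):
    """Pairs approach: one counter per (i, j) pair."""
    counts = Counter()
    for items in groups:                   # Map: one record per pair
        for i in items:
            for j in items:
                if i != j:                 # assumption: self-pairs are not counted
                    counts[(i, j)] += 1    # Reduce: sum per pair
    return dict(counts)

def cooccurrence_stripes(groups):
    """Stripes approach: one associative array per leading item."""
    merged = defaultdict(Counter)
    for items in groups:
        for i in items:                    # Map: build a stripe for leading item i
            stripe = Counter(j for j in items if j != i)
            merged[i].update(stripe)       # Reduce: element-wise merge-sum of stripes
    return {(i, j): n for i, stripe in merged.items() for j, n in stripe.items()}

groups = [["a", "b", "c"], ["a", "b"], ["b", "c"]]
pairs_result = cooccurrence_pairs(groups)
stripes_result = cooccurrence_stripes(groups)
```

The stripes version moves one record per leading item per group through the simulated shuffle, whereas the pairs version moves one record per pair.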
Applications:
Text analysis, market analysis
References:
Lin J., Dyer C., Hirst G. Data-Intensive Text Processing with MapReduce
Relational MapReduce Patterns
This section goes through the main relational operators and discusses how they can be implemented in MapReduce terms.
Selection
    class Mapper
        method Map(rowkey key, tuple t)
            if t satisfies the predicate
                Emit(tuple t, null)
Projection
Projection is only slightly more complex than selection, because in this case a Reducer can be used to eliminate possible duplicates.
    class Mapper
        method Map(rowkey key, tuple t)
            tuple g = project(t)    // extract required fields to tuple g
            Emit(tuple g, null)

    class Reducer
        method Reduce(tuple t, array n)    // n is an array of nulls
            Emit(tuple t, null)
Union
The Mappers are fed all records of the two sets to be united; the Reducer eliminates duplicates.
    class Mapper
        method Map(rowkey key, tuple t)
            Emit(tuple t, null)

    class Reducer
        method Reduce(tuple t, array n)    // n is an array of one or two nulls
            Emit(tuple t, null)
Intersection
The Mappers are fed all records of the two sets to be intersected; the Reducer emits only the records that occur twice. This works because every record has a primary key and can occur in each set only once.
    class Mapper
        method Map(rowkey key, tuple t)
            Emit(tuple t, null)

    class Reducer
        method Reduce(tuple t, array n)    // n is an array of one or two nulls
            if n.size() = 2
                Emit(tuple t, null)
Difference
Suppose there are two sets of records, R and S, and we want to compute the difference R - S. The Mapper emits every tuple with a tag naming the set it came from. The Reducer emits only the records that came from R but not from S.
    class Mapper
        method Map(rowkey key, tuple t)
            Emit(tuple t, string t.SetName)    // t.SetName is either 'R' or 'S'

    class Reducer
        method Reduce(tuple t, array n)    // array n can be ['R'], ['S'], ['R' 'S'], or ['S', 'R']
            if n.size() = 1 and n[1] = 'R'
                Emit(tuple t, null)
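Union, intersection, and difference all share the same shuffle: tuples are grouped by their own value and the Reducer inspects what arrived. The sketch below simulates that in memory with hypothetical names (`shuffle`, `set_operations`). It tags every tuple with its source set for all three operations, and it assumes, as the text does, that a tuple occurs at most once in each set.

```python
from collections import defaultdict

def shuffle(tagged_tuples):
    """Group the source tags by tuple value, as the framework would group by key."""
    groups = defaultdict(list)
    for t, tag in tagged_tuples:
        groups[t].append(tag)
    return groups

def set_operations(r, s):
    # Map: emit every tuple as the key, tagged with the set it came from.
    groups = shuffle([(t, "R") for t in r] + [(t, "S") for t in s])
    # Reduce: each operator is a different test on the tags received for one tuple.
    union = sorted(groups)
    intersection = sorted(t for t, tags in groups.items() if len(tags) == 2)
    difference = sorted(t for t, tags in groups.items() if tags == ["R"])
    return union, intersection, difference

r = [("ann", 1), ("bob", 2), ("cat", 3)]
s = [("bob", 2), ("dan", 4)]
union, intersection, difference = set_operations(r, s)
```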
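GroupBy and Aggregation
Grouping and aggregation can be performed in a single MapReduce job as follows. The Mapper extracts from each tuple the value to group by and the value to aggregate, and emits them. The Reducer receives the values to aggregate already grouped and computes the aggregation function. Typical aggregation functions such as sum or max can be computed in a streaming fashion, so they do not need to hold all the values at once. Some cases nevertheless require a two-phase MapReduce job; the Distinct Values pattern described earlier is one example.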
    class Mapper
        method Map(null, tuple [value GroupBy, value AggregateBy, value ...])
            Emit(value GroupBy, value AggregateBy)

    class Reducer
        method Reduce(value GroupBy, [v1, v2,...])
            Emit(value GroupBy, aggregate( [v1, v2,...] ) )    // aggregate() : sum(), max(),...
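A minimal sketch of the same job in plain Python, with invented sample rows and a hypothetical `group_by_aggregate` helper; the aggregation function is passed in, so `sum` and `max` share one code path. For simplicity the sketch collects each group in a list before aggregating, whereas a streaming implementation would fold the values as they arrive.

```python
from collections import defaultdict

def group_by_aggregate(rows, aggregate):
    """rows: (group_key, value) pairs; aggregate: function applied to each group's values."""
    groups = defaultdict(list)
    for group_key, value in rows:  # Map: emit (GroupBy, AggregateBy)
        groups[group_key].append(value)
    # Reduce: apply the aggregation function to each group.
    return {key: aggregate(values) for key, values in groups.items()}

rows = [("us", 120), ("eu", 80), ("us", 200), ("eu", 95), ("asia", 60)]
totals = group_by_aggregate(rows, sum)
peaks = group_by_aggregate(rows, max)
```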
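Joining
Joins are perfectly possible in the MapReduce framework, but there are a number of techniques that differ in efficiency and in the data volumes they are suited to. This section covers some basic approaches; the references at the end list dedicated articles on the topic.
Repartition Join (Reduce-Side Join, Sort-Merge Join)
This algorithm joins two sets, R and L, on a key k. The Mapper goes through all tuples of R and L, extracts the key k from each tuple, tags the tuple with the name of the set it came from (R or L), and emits the tagged tuple with k as the key. The Reducer receives all tuples for a particular key k and puts them into two buckets, one for R and one for L. It then runs a nested loop over the two buckets and emits their cross join. Each emitted record contains the R values, the L values, and k. This approach has the following disadvantages:
- The Mapper emits absolutely all the data, even for keys that occur in only one set and have no match in the other.
- The Reducer has to hold all the data for one key in memory. If the data does not fit in memory, the Reducer must spill it to disk, which adds disk I/O.
Nevertheless, Repartition Join is the most generic technique and can be used when other optimized techniques are not applicable.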
    class Mapper
        method Map(null, tuple [join_key k, value v1, value v2,...])
            Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] )

    class Reducer
        method Reduce(join_key k, tagged_tuples [t1, t2,...])
            H = new AssociativeArray : set_name -> values
            for all tagged_tuple t in [t1, t2,...]    // separate values into 2 arrays
                H{t.tag}.add(t.values)
            for all values r in H{'R'}                // produce a cross-join of the two arrays
                for all values l in H{'L'}
                    Emit(null, [k r l] )
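The repartition join can be simulated in a few lines. The sketch below is an illustration under assumptions: `repartition_join` is a hypothetical name, each input row is a (join_key, value) pair, and the sample users and events are invented.

```python
from collections import defaultdict

def repartition_join(r_rows, l_rows):
    """r_rows, l_rows: lists of (join_key, value). Returns [(key, r_value, l_value)]."""
    # Map: tag every tuple with its source set and key it by the join key.
    tagged = [(k, ("R", v)) for k, v in r_rows] + [(k, ("L", v)) for k, v in l_rows]
    # Shuffle: group the tagged tuples by join key, one bucket per source set.
    groups = defaultdict(lambda: {"R": [], "L": []})
    for k, (tag, v) in tagged:
        groups[k][tag].append(v)
    # Reduce: cross-join the two buckets of each key.
    return [
        (k, r, l)
        for k in sorted(groups)
        for r in groups[k]["R"]
        for l in groups[k]["L"]
    ]

users = [(1, "ann"), (2, "bob"), (3, "cat")]
events = [(1, "login"), (1, "click"), (2, "login"), (4, "login")]
joined = repartition_join(users, events)
```

Keys 3 and 4 occur in only one set: they still travel through the shuffle but produce no output, which is the first disadvantage listed above.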
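Replicated Join (Map-Side Join, Hash Join)
In practice it is common to join a small set with a large one (say, a list of users with a list of log records). Assume we join two sets, R and L, where R is relatively small. R can then be distributed to all Mappers, and each Mapper can load it and index it by the join key. The most common and efficient indexing technique here is a hash table. After that, the Mapper goes through the tuples of L and joins each of them with the corresponding R tuples stored in the hash table. This approach is very efficient because L does not need to be sorted or transmitted over the network, but it requires R to be small enough to be distributed to all Mappers.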
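A minimal sketch of the map-side variant, assuming rows are (join_key, value) pairs and using a plain dictionary as the hash table; `replicated_join` and the sample data are hypothetical. The large set is consumed as a stream, so it is never sorted or grouped.

```python
def replicated_join(small_r, large_l):
    """small_r: list of (key, value) that fits in memory; large_l: any iterable of (key, value)."""
    # Every mapper loads R into a hash table indexed by the join key.
    r_index = {}
    for k, v in small_r:
        r_index.setdefault(k, []).append(v)
    # Map: stream over L and probe the hash table; there is no shuffle and no reduce step.
    for k, l_value in large_l:
        for r_value in r_index.get(k, []):
            yield k, r_value, l_value

users = [(1, "ann"), (2, "bob"), (3, "cat")]
events = [(1, "login"), (1, "click"), (2, "login"), (4, "login")]
joined = list(replicated_join(users, iter(events)))
```

Output order follows the order of L, since each L tuple is joined as soon as it is read.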