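MapReduce Patterns, Algorithms, and Use Cases

Author: Juliashine, 火龙果软件 (Huolongguo Software)    Published: 2014-06-05

This article summarizes several MapReduce patterns and algorithms that are common on the web and in research papers, and systematically explains how these techniques differ. All descriptions and code use the standard Hadoop MapReduce model, including Mappers, Reducers, Combiners, Partitioners, and sorting, as shown in the figure below.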

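Basic MapReduce Patterns

Counting and Summing

Problem Statement: There are many documents, each made up of a set of fields (terms). We need to compute the number of occurrences of each field across all documents, or some other statistic over these fields. For example, given a log file in which every record contains a response time, compute the average response time.

Solution:

Let us start with a simple example. In the code snippet below, the Mapper emits a count of 1 every time it encounters a term, and the Reducer iterates over the counts for each term and sums them.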

class Mapper
    method Map(docid id, doc d)
        for all term t in doc d do
            Emit(term t, count 1)

class Reducer
    method Reduce(term t, counts [c1, c2,...])
        sum = 0
        for all count c in [c1, c2,...] do
            sum = sum + c
        Emit(term t, count sum)

The drawback of this approach is obvious: the Mapper emits far too many trivial counts. The amount of data passed to the Reducer can be reduced by first counting the terms within each document:

class Mapper
    method Map(docid id, doc d)
        H = new AssociativeArray
        for all term t in doc d do
            H{t} = H{t} + 1
        for all term t in H do
            Emit(term t, count H{t})

If counts should be accumulated not just within a single document but across all documents processed by one Mapper node, a Combiner can be used:

class Mapper
    method Map(docid id, doc d)
        for all term t in doc d do
            Emit(term t, count 1)

class Combiner
    method Combine(term t, [c1, c2,...])
        sum = 0
        for all count c in [c1, c2,...] do
            sum = sum + c
        Emit(term t, count sum)

class Reducer
    method Reduce(term t, counts [c1, c2,...])
        sum = 0
        for all count c in [c1, c2,...] do
            sum = sum + c
        Emit(term t, count sum)
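To make the three variants above concrete, here is a minimal in-process Python sketch, not Hadoop code. It assumes each inner dict in `splits` stands for the input of one Mapper node and simulates the shuffle with a dictionary; all function names are hypothetical. Besides the final counts, it reports how many intermediate pairs reach the Reducers, which is what per-document aggregation and the Combiner are meant to reduce.

```python
from collections import Counter, defaultdict

def map_naive(doc_id, doc):
    # First variant: emit (term, 1) for every occurrence.
    for term in doc.split():
        yield term, 1

def map_per_document(doc_id, doc):
    # Second variant: count terms within the document before emitting.
    yield from Counter(doc.split()).items()

def sum_counts(term, counts):
    # Summing is associative and commutative, so it can act as Combiner and Reducer.
    return term, sum(counts)

def group_by_key(pairs):
    # Stand-in for the shuffle: collect all values emitted under the same key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def word_count(splits, mapper, use_combiner=False):
    intermediate = []
    for split in splits:  # one iteration per simulated Mapper node
        emitted = [kv for doc_id, doc in split.items() for kv in mapper(doc_id, doc)]
        if use_combiner:
            # The Combiner sees everything emitted on this node.
            emitted = [sum_counts(t, c) for t, c in group_by_key(emitted).items()]
        intermediate.extend(emitted)
    totals = dict(sum_counts(t, c) for t, c in group_by_key(intermediate).items())
    return totals, len(intermediate)  # final counts, pairs sent to the Reducers

splits = [{"d1": "a b a", "d2": "b c"}, {"d3": "a c c"}]
print(word_count(splits, map_naive))                     # 8 intermediate pairs
print(word_count(splits, map_per_document))              # 6 intermediate pairs
print(word_count(splits, map_naive, use_combiner=True))  # 5 intermediate pairs
```

All three variants produce the same totals; only the volume of intermediate data differs.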

Applications:

Log analysis, data querying

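Collating

Problem Statement:

There is a set of items, each with several attributes. All items that share the same value of a given attribute must be saved to one file, or otherwise processed as a group. The most typical application is building an inverted index.

Solution:

The solution is simple. The Mapper emits the required attribute value of each item as the key and the item itself as the value. The Reducer receives the items grouped by attribute value and can then process or save them. When building an inverted index, each item is a (term, document ID) occurrence: the term is the key and the document ID is the value, so the Reducer receives, for every term, the IDs of the documents that contain it.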
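As an illustration only, the following Python sketch builds an inverted index with the Collating pattern, simulating the shuffle in memory with a dictionary. The lower-casing, whitespace tokenization, and the names `mapper`, `reducer`, and `build_inverted_index` are assumptions made for the example.

```python
from collections import defaultdict

def mapper(doc_id, text):
    # Key = the grouping attribute (the term), value = the item (the document ID).
    for term in set(text.lower().split()):
        yield term, doc_id

def reducer(term, doc_ids):
    # All document IDs for one term arrive together; store them as a sorted posting list.
    return term, sorted(doc_ids)

def build_inverted_index(docs):
    groups = defaultdict(list)  # stand-in for the shuffle/sort phase
    for doc_id, text in docs.items():
        for key, value in mapper(doc_id, text):
            groups[key].append(value)
    return dict(reducer(key, values) for key, values in groups.items())

index = build_inverted_index({1: "Map reduce", 2: "reduce side join", 3: "map side join"})
print(index["join"])  # [2, 3]
```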

Applications:

Inverted indexes, ETL

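Filtering (Text Search), Parsing, and Validation

Problem Statement:

There are many records, and we need to find all records that satisfy some condition, or to transform each record into another form (the transformation is independent for each record, i.e., processing one record does not depend on any other). Text parsing, extraction of specific values, and format conversion all belong to the latter use case.

Solution:

Very simple: the Mapper processes the records one at a time and emits the required values or the transformed records.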
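A minimal Python sketch of such a map-only job follows. The log line format, the regular expression, and the choice to keep only HTTP 500 responses are hypothetical; the point is that each record is handled independently, so no Reducer is involved.

```python
import re

# Hypothetical log format: "... <status> <latency>ms"
LINE = re.compile(r"(?P<status>\d{3}) (?P<ms>\d+)ms")

def mapper(record):
    # Each record is processed on its own; no other record is consulted.
    match = LINE.search(record)
    if match and match["status"] == "500":             # filtering (also drops unparseable lines)
        yield {"status": 500, "ms": int(match["ms"])}  # transformation into a parsed form

records = ["GET /a 200 12ms", "GET /b 500 340ms", "garbage", "POST /c 500 7ms"]
output = [parsed for record in records for parsed in mapper(record)]
print(output)  # [{'status': 500, 'ms': 340}, {'status': 500, 'ms': 7}]
```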

Applications:

Log analysis, data querying, ETL, data validation

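Distributed Task Execution

Problem Statement:

A large computation can be split into several parts that are executed separately; the results of these parts are then combined to obtain the final result.

Solution: Split the data into several pieces, one per Mapper. Each Mapper processes its piece, performs the same computation, and produces a result; the Reducer combines the results of all Mappers into one.

Case Study: Simulation of a Digital Communication System

Simulation software for digital communication systems such as WiMAX passes large volumes of random data through a system model and then computes the probability of error in the transmission. Each Mapper processes 1/N of the sample data and computes the error rate for its share, and the Reducer computes the average error rate.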
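The sketch below is hypothetical: a toy binary symmetric channel (each bit flipped with probability `flip_prob`) stands in for a real WiMAX system model, and each "Mapper" is simply a function call that simulates its own 1/N share with an independent seed. Summing error and bit counts in the Reducer gives the overall error rate, which equals the average of the per-Mapper rates when all shares have the same size.

```python
import random

def mapper(chunk_id, n_bits, flip_prob):
    rng = random.Random(chunk_id)  # independent, reproducible stream per chunk
    sent = [rng.randint(0, 1) for _ in range(n_bits)]
    received = [bit ^ (rng.random() < flip_prob) for bit in sent]  # toy noisy channel
    errors = sum(s != r for s, r in zip(sent, received))
    return errors, n_bits

def reducer(partials):
    # Weighting by the number of bits keeps the result correct for unequal shares too.
    errors = sum(e for e, _ in partials)
    trials = sum(n for _, n in partials)
    return errors / trials

N = 4
partials = [mapper(i, 25_000, flip_prob=0.1) for i in range(N)]
error_rate = reducer(partials)
print(error_rate)  # close to 0.1
```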

Applications:

Engineering simulation, numerical analysis, performance testing

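Sorting

Problem Statement:

There are many records, and all of them must be sorted by some rule, or processed in a particular order.

Solution: Simple sorting is straightforward: Mappers emit the attribute to sort by as the key and the whole record as the value. Sorting in real applications is trickier, however, which is why sorting is called the heart of MapReduce. (Translator's note: does "heart" refer to sorting itself, because the experiments demonstrating Hadoop's computing power were large-scale sorts, or to the key-sorting step in Hadoop's processing pipeline?) In practice, composite keys are commonly used to implement secondary sorting and grouping.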
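In Hadoop, secondary sorting is usually built from a composite key, a partitioner that looks only at the natural key, and a grouping comparator. The Python sketch below only simulates that arrangement in memory; the record layout `(user, timestamp, event)` and all names are hypothetical.

```python
from itertools import groupby

def mapper(record):
    user, timestamp, event = record
    # Composite key: natural key (user) + secondary sort field (timestamp).
    yield (user, timestamp), event

def run(records, num_reducers=2):
    partitions = [[] for _ in range(num_reducers)]
    for record in records:
        for key, value in mapper(record):
            # Partitioner: route by the natural key only, so each user lands on one Reducer.
            partitions[hash(key[0]) % num_reducers].append((key, value))
    results = {}
    for part in partitions:
        part.sort(key=lambda kv: kv[0])  # sort by the full composite key
        # Grouping: one Reducer call per user, with values already ordered by timestamp.
        for user, group in groupby(part, key=lambda kv: kv[0][0]):
            results[user] = [event for _, event in group]
    return results

records = [("bob", 3, "logout"), ("amy", 2, "click"), ("bob", 1, "login"),
           ("amy", 1, "login"), ("bob", 2, "click")]
print(run(records))  # each user's events in timestamp order
```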

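MapReduce originally sorts only by key, but there are techniques that exploit Hadoop's features to sort by value. See this blog post if you are interested.

Following the BigTable approach, note that it is more beneficial to use MapReduce to sort the original data rather than intermediate data, i.e., to keep the data in sorted order. In other words, sorting once when data is inserted is more efficient than sorting every time the data is queried.

Applications:

ETL, data analysis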

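Non-Basic MapReduce Patterns

Iterative Message Passing (Graph Processing)

Problem Statement:

There is a network of entities with relationships between them. For each entity, we need to compute a state based on the properties of the entities adjacent to it. The state can represent, for example, the distance to other nodes, an indication that a neighbour has a certain property, or a characteristic of neighbourhood density.

Solution:

The network is stored as a set of nodes, each of which holds a list of the IDs of its adjacent nodes. Conceptually, MapReduce jobs are run iteratively: in each iteration, every node sends messages to its neighbours, and each neighbour updates its state based on the messages it receives. Iteration stops when some condition is met, such as reaching a maximum number of iterations (the network radius) or when two consecutive iterations produce almost no state changes. Technically, the Mapper emits each message keyed by the ID of the receiving neighbour, so all messages are grouped by receiving node, and the Reducer can recompute each node's state and update the nodes whose state has changed. The algorithm is shown below: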

class Mapper
    method Map(id n, object N)
        Emit(id n, object N)
        for all id m in N.OutgoingRelations do
            Emit(id m, message getMessage(N))

class Reducer
    method Reduce(id m, [s1, s2,...])
        M = null
        messages = []
        for all s in [s1, s2,...] do
            if IsObject(s) then
                M = s
            else // s is a message
                messages.add(s)
        M.State = calculateState(M.State, messages)
        Emit(id m, item M)
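One way to drive this framework in memory is sketched below in Python. It assumes each node is a dict with a `state` and a list `out` of neighbour IDs, and that `get_message` and `calculate_state` are supplied per problem, with `calculate_state` receiving the node's current state as well as its messages. The iteration cap and the no-change stopping rule follow the text; the small infection example is hypothetical.

```python
from collections import defaultdict

def iterate(graph, get_message, calculate_state, max_rounds=50):
    """graph: id -> {"state": ..., "out": [neighbour ids]}; returns the final graph."""
    for _ in range(max_rounds):
        # Map: every node sends a message along each outgoing relation.
        inbox = defaultdict(list)
        for node in graph.values():
            for m in node["out"]:
                inbox[m].append(get_message(node))
        # Reduce: each node recomputes its state from its current state and its messages.
        new_graph = {}
        changed = False
        for node_id, node in graph.items():
            state = calculate_state(node["state"], inbox[node_id])
            changed = changed or state != node["state"]
            new_graph[node_id] = {**node, "state": state}
        graph = new_graph
        if not changed:  # stop when a round changes nothing
            break
    return graph

# Infection: a node becomes infected once any node linking to it is infected.
graph = {1: {"state": True, "out": [2]}, 2: {"state": False, "out": [3]},
         3: {"state": False, "out": []}, 4: {"state": False, "out": [1]}}
result = iterate(graph, lambda n: n["state"], lambda s, msgs: s or any(msgs))
print({k: v["state"] for k, v in result.items()})
```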

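The state of a node can spread quickly across the whole network: infected nodes in turn infect their neighbours, as the figure below illustrates:

Case Study: Availability Propagation Through the Category Tree

Problem Statement:

This problem comes from a real e-commerce application. Goods are organized into categories that form a tree: large categories (such as Men, Women, Kids) are divided into smaller ones (such as Men's Trousers or Women's Clothing), down to categories that cannot be divided further (such as Men's Blue Jeans). These leaf categories can be either available (the category contains goods) or unavailable (no goods belong to it). A category is considered available if it contains at least one available subcategory. Given the availability of the leaf categories, we need to find all available categories in the tree.

Solution:

This problem can be solved with the framework described in the previous section. We define the methods getMessage and calculateState as follows (each category's OutgoingRelations point to its parent category, so that availability propagates upward):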

class N
    State in {True = 2, False = 1, null = 0},
        initialized 1 or 2 for end-of-line categories, 0 otherwise
    method getMessage(object N)
        return N.State
    method calculateState(state s, data [d1, d2,...])
        return max( [s, d1, d2,...] )
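A self-contained Python sketch of this case study, assuming the tree is given as a hypothetical `parent_of` mapping (category to parent category), so messages flow from each category to its parent. Including the category's own state in `max` keeps leaf categories, which receive no messages, at their initial value.

```python
TRUE, FALSE, NULL = 2, 1, 0  # the state encoding used above

def propagate_availability(parent_of, leaf_state, max_rounds=50):
    """parent_of: category -> parent category (hypothetical input format)."""
    nodes = set(parent_of) | set(parent_of.values())
    state = {n: leaf_state.get(n, NULL) for n in nodes}
    for _ in range(max_rounds):
        inbox = {n: [] for n in nodes}
        for child, parent in parent_of.items():
            inbox[parent].append(state[child])                      # getMessage: N.State
        new_state = {n: max([state[n], *inbox[n]]) for n in nodes}  # calculateState
        if new_state == state:
            break
        state = new_state
    return state

parent_of = {"men": "root", "women": "root", "mens_jeans": "men",
             "blue_jeans": "mens_jeans", "dresses": "women"}
result = propagate_availability(parent_of, {"blue_jeans": TRUE, "dresses": FALSE})
print(result["root"], result["women"])  # 2 1
```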

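Case Study: Breadth-First Search

Problem Statement: Compute the distance from one node of a graph to all other nodes.

Solution: The source node starts with distance 0 and every other node with infinity. In each iteration, every node sends its current distance plus 1 to its neighbours, so the value grows by 1 with each hop, and each node keeps the smallest value it has seen: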

class N
    State is distance,
        initialized 0 for source node, INFINITY for all other nodes
    method getMessage(N)
        return N.State + 1
    method calculateState(state s, data [d1, d2,...])
        return min( [s, d1, d2,...] )
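The same message-passing loop computes unweighted shortest-path distances. This Python sketch assumes a directed graph given as adjacency lists in which every neighbour is also a key; the example graph is hypothetical.

```python
import math

def bfs_distances(adjacency, source, max_rounds=100):
    dist = {n: math.inf for n in adjacency}
    dist[source] = 0
    for _ in range(max_rounds):
        inbox = {n: [] for n in adjacency}
        for n, neighbours in adjacency.items():
            for m in neighbours:
                inbox[m].append(dist[n] + 1)                       # getMessage: N.State + 1
        new_dist = {n: min([dist[n], *inbox[n]]) for n in adjacency}  # calculateState
        if new_dist == dist:
            break
        dist = new_dist
    return dist

adjacency = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": [], "e": ["a"]}
print(bfs_distances(adjacency, "a"))  # "e" is unreachable from "a", so it stays at inf
```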

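Case Study: PageRank and Mapper-Side Data Aggregation

This algorithm was proposed by Google to compute the relevance of a web page from the authority (PageRank) of the other pages that link to it. The real algorithm is quite complex, but its core idea is that weights propagate between nodes: each node computes its own weight from the weights passed to it by the nodes that link to it, where each linking node divides its weight evenly among its outgoing links.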

class N
    State is PageRank
    method getMessage(object N)
        return N.State / N.OutgoingRelations.size()
    method calculateState(state s, data [d1, d2,...])
        return ( sum([d1, d2,...]) )

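It should be noted that the scheme above is overly generic and does not take advantage of the fact that the state is a numeric value. In most practical cases, this fact allows values to be aggregated on the Mapper side. The code snippet below shows this optimization for the PageRank algorithm: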

class Mapper
    method Initialize
        H = new AssociativeArray
    method Map(id n, object N)
        p = N.PageRank / N.OutgoingRelations.size()
        Emit(id n, object N)
        for all id m in N.OutgoingRelations do
            H{m} = H{m} + p
    method Close
        for all id n in H do
            Emit(id n, value H{n})

class Reducer
    method Reduce(id m, [s1, s2,...])
        M = null
        p = 0
        for all s in [s1, s2,...] do
            if IsObject(s) then
                M = s
            else
                p = p + s
        M.PageRank = p
        Emit(id m, item M)
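A one-iteration Python sketch of the Mapper-side aggregation above. Assumptions: each inner list in `splits` stands for the input split of one Mapper, every node has at least one outgoing link (otherwise the division fails, as it would in the pseudocode), and, like the pseudocode, there is no damping factor. Tagging values as `("node", ...)` or `("rank", ...)` plays the role of `IsObject`.

```python
from collections import defaultdict

def pagerank_round(graph, ranks, splits):
    """graph: node -> list of outgoing links; ranks: node -> current PageRank."""
    shuffled = defaultdict(list)
    for split in splits:                      # one iteration per simulated Mapper
        H = defaultdict(float)                # Initialize: per-Mapper associative array
        for n in split:                       # Map
            shuffled[n].append(("node", n))   # pass the node object through
            p = ranks[n] / len(graph[n])
            for m in graph[n]:
                H[m] += p                     # aggregate locally instead of emitting
        for m, partial in H.items():          # Close: one partial sum per target node
            shuffled[m].append(("rank", partial))
    # Reduce: the new PageRank is the sum of the partial sums received.
    return {m: sum(v for kind, v in values if kind == "rank")
            for m, values in shuffled.items()}

graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
ranks = {n: 1 / 3 for n in graph}
new_ranks = pagerank_round(graph, ranks, splits=[["a", "b"], ["c"]])
print(new_ranks)
```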

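Applications:

Graph analysis, web indexing

Distinct Values (Counting Unique Items)

Problem Statement: Each record contains a value F and a set of categories G. For each category in G, count the number of distinct F values among the records that carry that category (in effect, grouping by G).

This problem generalizes to faceted search (which some e-commerce sites call Narrow Search).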

Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3 // F=1, F=2, F=3
b -> 2 // F=1, F=3
d -> 1 // F=2
e -> 1 // F=2

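Solution I:

The first approach solves the problem in two phases. In the first phase, the Mapper forms a composite (G, F) pair for every category of every record, and the Reducer emits each distinct pair exactly once, which guarantees that each F value is counted only once per G. In the second phase, the pairs are grouped by G and the number of pairs in each group is counted.

Phase 1: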

class Mapper
    method Map(null, record [value f, categories [g1, g2,...]])
        for all category g in [g1, g2,...] do
            Emit(record [g, f], count 1)

class Reducer
    method Reduce(record [g, f], counts [n1, n2, ...])
        Emit(record [g, f], null )

Phase 2:

class Mapper
    method Map(record [g, f], null)
        Emit(value g, count 1)

class Reducer
    method Reduce(value g, counts [n1, n2,...])
        Emit(value g, sum( [n1, n2,...] ) )
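The two phases can be chained in a small Python simulation. `run_job` is a hypothetical stand-in for one MapReduce job (map, group by key, reduce), and the input reproduces the four example records above, so the output should match the expected result table.

```python
from collections import defaultdict

def run_job(mapper, reducer, inputs):
    # Minimal stand-in for one MapReduce job: map, group by key, reduce.
    groups = defaultdict(list)
    for key, value in inputs:
        for k, v in mapper(key, value):
            groups[k].append(v)
    return [out for k, vs in groups.items() for out in reducer(k, vs)]

def map1(_, record):        # Phase 1 Mapper
    f, categories = record
    for g in categories:
        yield (g, f), 1

def reduce1(pair, counts):  # Phase 1 Reducer: each distinct (g, f) exactly once
    yield pair, None

def map2(pair, _):          # Phase 2 Mapper
    g, f = pair
    yield g, 1

def reduce2(g, counts):     # Phase 2 Reducer
    yield g, sum(counts)

records = [(None, (1, ["a", "b"])), (None, (2, ["a", "d", "e"])),
           (None, (1, ["b"])), (None, (3, ["a", "b"]))]
result = dict(run_job(map2, reduce2, run_job(map1, reduce1, records)))
print(result)  # {'a': 3, 'b': 2, 'd': 1, 'e': 1}
```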

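Solution II:

The second approach needs only one MapReduce job, but it does not scale as well. The algorithm is simple: the Mapper emits each value together with its categories; the Reducer removes duplicate categories for each value and increments a counter for each category the value belongs to; finally, after the Reducers finish, all counters are summed. This approach works when the number of categories is limited and not too many records share the same F value. Web log processing and user classification are an example: the total number of users is large, but each user has a limited number of events, so the number of categories obtained this way is also limited. It is worth noting that in this pattern a Combiner can be used to remove duplicate categories before the data is sent to the Reducer.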

class Mapper
    method Map(null, record [value f, categories [g1, g2,...]])
        for all category g in [g1, g2,...] do
            Emit(value f, category g)

class Reducer
    method Initialize
        H = new AssociativeArray : category -> count
    method Reduce(value f, categories [g1, g2,...])
        [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
        for all category g in [g1', g2',...] do
            H{g} = H{g} + 1
    method Close
        for all category g in H do
            Emit(category g, count H{g})
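A Python simulation of this single-pass variant, assuming two Reducers and hash partitioning on F (both are illustrative choices). Each simulated Reducer keeps its own counters across calls, as in `Initialize`/`Close`, and the final loop performs the summation across Reducers that the text describes.

```python
from collections import Counter, defaultdict

def distinct_counts(records, num_reducers=2):
    # Map: emit (f, g) for every category g of every record; partition by f.
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for f, categories in records:
        for g in categories:
            partitions[hash(f) % num_reducers][f].append(g)
    totals = Counter()
    for part in partitions:
        H = Counter()                       # Initialize: Reducer-local counters
        for f, categories in part.items():  # Reduce: one call per value f
            for g in set(categories):       # ExcludeDuplicates
                H[g] += 1
        totals.update(H)                    # Close output, summed across Reducers
    return dict(totals)

records = [(1, ["a", "b"]), (2, ["a", "d", "e"]), (1, ["b"]), (3, ["a", "b"])]
print(sorted(distinct_counts(records).items()))  # [('a', 3), ('b', 2), ('d', 1), ('e', 1)]
```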

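Applications:

Log analysis, unique user counting

Cross-Correlation

Problem Statement: There are many groups, each made up of several items. For every pair of items, count how many times they occur together in the same group. If there are N distinct items, an N×N matrix of counts must be computed.

This situation is common in text analysis (items are words and groups are sentences) and market analysis (customers who bought this item may also tend to buy which others). If the N×N matrix is small enough to fit in the memory of a single machine, the implementation is fairly simple.

Pairs Approach

The first approach emits every pair of items in the Mapper and sums the counts for each pair in the Reducer. This approach has drawbacks:

The benefit of combiners is limited, because most pairs are likely to be unique
Memory is not used effectively

class Mapper
    method Map(null, items [i1, i2,...] )
        for all item i in [i1, i2,...] do
            for all item j in [i1, i2,...] do
                Emit(pair [i j], count 1)

class Reducer
    method Reduce(pair [i j], counts [c1, c2,...])
        s = sum([c1, c2,...])
        Emit(pair [i j], count s)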
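A Python sketch of the Pairs approach on two hypothetical shopping baskets. Unlike the pseudocode, which also pairs each item with itself, this sketch assumes self-pairs are not wanted and uses `itertools.permutations` to emit only ordered pairs of items at different positions. It also reports how many intermediate pairs were emitted.

```python
from collections import Counter
from itertools import permutations

def pairs_cooccurrence(groups):
    # Map: emit ((i, j), 1) for every ordered pair of items at different positions.
    emitted = [((i, j), 1) for group in groups for i, j in permutations(group, 2)]
    # Reduce: sum the counts of each pair.
    counts = Counter()
    for pair, c in emitted:
        counts[pair] += c
    return dict(counts), len(emitted)

baskets = [["milk", "bread", "eggs"], ["milk", "bread"]]
counts, n_emitted = pairs_cooccurrence(baskets)
print(counts[("milk", "bread")], n_emitted)  # 2 8
```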

Stripes Approach

The second approach groups data by the first item of each pair and maintains an associative array (a "stripe") in which the counts for all co-occurring items are accumulated. The Reducer receives all stripes for a leading item i, merges them, and emits the same result as the Pairs approach.

There are relatively few intermediate keys, which reduces the cost of sorting.

Combiners can be used effectively.

Accumulation is performed in memory, which can cause problems if not implemented carefully.

The implementation is more complex.

In general, "stripes" is faster than "pairs".

class Mapper
    method Map(null, items [i1, i2,...] )
        for all item i in [i1, i2,...] do
            H = new AssociativeArray : item -> counter
            for all item j in [i1, i2,...] do
                H{j} = H{j} + 1
            Emit(item i, stripe H)

class Reducer
    method Reduce(item i, stripes [H1, H2,...])
        H = new AssociativeArray : item -> counter
        H = merge-sum( [H1, H2,...] )
        for all item j in H.keys() do
            Emit(pair [i j], H{j})
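The Stripes approach on the same two hypothetical baskets, as a Python sketch. As in the Pairs sketch, an item is assumed not to be paired with itself (items at the same position are skipped). `collections.Counter.update` performs the merge-sum of stripes. The result should equal the Pairs output while emitting fewer, larger intermediate records.

```python
from collections import Counter

def stripes_cooccurrence(groups):
    # Map: for each item occurrence, build a stripe of co-occurring item counts.
    emitted = []
    for group in groups:
        for pos, i in enumerate(group):
            stripe = Counter(j for other, j in enumerate(group) if other != pos)
            emitted.append((i, stripe))
    # Reduce: merge-sum all stripes that share the same leading item.
    merged = {}
    for i, stripe in emitted:
        merged.setdefault(i, Counter()).update(stripe)
    pairs = {(i, j): c for i, stripe in merged.items() for j, c in stripe.items()}
    return pairs, len(emitted)

baskets = [["milk", "bread", "eggs"], ["milk", "bread"]]
counts, n_stripes = stripes_cooccurrence(baskets)
print(counts[("milk", "bread")], n_stripes)  # 2 5
```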

Applications:

Text analysis, market analysis

References:

Lin, J., Dyer, C., Hirst, G. Data-Intensive Text Processing with MapReduce.

Relational MapReduce Patterns

In this part we discuss how the main relational operations can be implemented with MapReduce.

Selection

class Mapper
    method Map(rowkey key, tuple t)
        if t satisfies the predicate
            Emit(tuple t, null)

Projection

Projection is only slightly more complex than selection; here the Reducer can be used to eliminate possible duplicates.

class Mapper
    method Map(rowkey key, tuple t)
        tuple g = project(t) // extract required fields to tuple g
        Emit(tuple g, null)

class Reducer
    method Reduce(tuple t, array n) // n is an array of nulls
        Emit(tuple t, null)
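A small in-memory Python sketch of projection with duplicate elimination. The rows are hypothetical dicts, and a dictionary keyed by the projected tuple stands in for the shuffle; emitting each key once is what removes duplicates.

```python
from collections import defaultdict

def project(rows, fields):
    groups = defaultdict(list)
    for row in rows:
        g = tuple(row[f] for f in fields)  # Map: extract the required fields as tuple g
        groups[g].append(None)             # the value is just null
    # Reduce: one output per distinct key, which removes duplicates.
    return sorted(groups)

rows = [{"id": 1, "city": "Oslo", "dept": "IT"},
        {"id": 2, "city": "Oslo", "dept": "IT"},
        {"id": 3, "city": "Rome", "dept": "HR"}]
print(project(rows, ["city", "dept"]))  # [('Oslo', 'IT'), ('Rome', 'HR')]
```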

Union

All records from both datasets are fed to the Mapper, and duplicates are eliminated in the Reducer.

class Mapper
    method Map(rowkey key, tuple t)
        Emit(tuple t, null)

class Reducer
    method Reduce(tuple t, array n) // n is an array of one or two nulls
        Emit(tuple t, null)

Intersection

The records of the two datasets to be intersected are fed to the Mapper, and the Reducer emits the records that occur twice. This works because each record has a primary key and therefore occurs at most once in each dataset.

class Mapper
    method Map(rowkey key, tuple t)
        Emit(tuple t, null)

class Reducer
    method Reduce(tuple t, array n) // n is an array of one or two nulls
        if n.size() = 2
            Emit(tuple t, null)
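The intersection logic in a short Python sketch, relying on the same assumption as the text: within each input a tuple occurs at most once. The relations `R` and `S` are hypothetical.

```python
from collections import defaultdict

def intersect(r, s):
    groups = defaultdict(list)
    for t in [*r, *s]:          # Map: every tuple of both sets becomes a key
        groups[t].append(None)
    # Reduce: a tuple occurs at most once per set, so two values means it is in both.
    return sorted(t for t, n in groups.items() if len(n) == 2)

R = [(1, "a"), (2, "b"), (3, "c")]
S = [(2, "b"), (3, "c"), (4, "d")]
print(intersect(R, S))  # [(2, 'b'), (3, 'c')]
```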

Difference

Suppose there are two datasets R and S and we want to compute the difference R − S. The Mapper tags every tuple with the name of the set it came from (R or S), and the Reducer emits only the records that are present in R but not in S.

class Mapper
    method Map(rowkey key, tuple t)
        Emit(tuple t, string t.SetName) // t.SetName is either 'R' or 'S'

class Reducer
    method Reduce(tuple t, array n) // array n can be ['R'], ['S'], ['R', 'S'], or ['S', 'R']
        if n.size() = 1 and n[1] = 'R'
            Emit(tuple t, null)
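A Python sketch of the tagging approach for R − S, with hypothetical relations. Each tuple is tagged with its set name in the map step, and the reduce step keeps a tuple only if `'R'` is its sole tag.

```python
from collections import defaultdict

def difference(r, s):
    groups = defaultdict(list)
    for set_name, relation in (("R", r), ("S", s)):
        for t in relation:
            groups[t].append(set_name)  # Map: tag each tuple with its set name
    # Reduce: keep tuples whose only tag is 'R'.
    return sorted(t for t, tags in groups.items() if tags == ["R"])

R = [(1, "a"), (2, "b"), (3, "c")]
S = [(2, "b"), (3, "c"), (4, "d")]
print(difference(R, S))  # [(1, 'a')]
```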

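Group By and Aggregation

Grouping and aggregation can be done in a single MapReduce job as follows. The Mapper extracts from each tuple the value to group by and the value to aggregate and emits them; the Reducer receives the values already grouped and applies the aggregation function. Typical aggregations such as sum and max can be computed in a streaming fashion, so not all values need to be held in memory at the same time. Some other cases, however, require a two-phase MapReduce job; the Distinct Values pattern described earlier is an example of this kind.

class Mapper
    method Map(null, tuple [value GroupBy, value AggregateBy, value ...])
        Emit(value GroupBy, value AggregateBy)

class Reducer
    method Reduce(value GroupBy, [v1, v2,...])
        Emit(value GroupBy, aggregate( [v1, v2,...] ) ) // aggregate() : sum(), max(),...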
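Two Python sketches on hypothetical rows. The first collects the grouped values and then applies an aggregate, like the Reducer above. The second shows the streaming alternative the text mentions: for sum or max, a running value per key is enough.

```python
from collections import defaultdict

def group_by_aggregate(rows, group_field, agg_field, aggregate):
    groups = defaultdict(list)
    for row in rows:  # Map: emit (GroupBy, AggregateBy)
        groups[row[group_field]].append(row[agg_field])
    return {key: aggregate(values) for key, values in groups.items()}  # Reduce

def group_by_streaming(rows, group_field, agg_field, combine):
    # Sum or max only needs a running value per key, not the full list of values.
    acc = {}
    for row in rows:
        key, value = row[group_field], row[agg_field]
        acc[key] = value if key not in acc else combine(acc[key], value)
    return acc

rows = [{"dept": "IT", "salary": 10}, {"dept": "HR", "salary": 7}, {"dept": "IT", "salary": 12}]
print(group_by_aggregate(rows, "dept", "salary", sum))  # {'IT': 22, 'HR': 7}
print(group_by_streaming(rows, "dept", "salary", max))  # {'IT': 12, 'HR': 7}
```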

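Joining

The MapReduce framework handles joins well, but some techniques are needed to cope with different data volumes and efficiency requirements. In this part we introduce some basic approaches; the references at the end also list several articles devoted to this topic.

Repartition Join (Reduce-Side Join, Sort-Merge Join)

This algorithm joins datasets R and L on key K. The Mapper iterates over all tuples of R and L and emits each one keyed by K and tagged with the set (R or L) it came from. The Reducer splits the data for each K into two containers (R and L) and then iterates over both containers with nested loops to produce their cross product; every output record contains the data from R, the data from L, and K. This approach has the following drawbacks:

The Mapper must emit all the data, even for keys that occur in only one of the sets.

The Reducer must hold all the data for one key in memory; if the data does not fit in memory, it has to be spilled to disk, which increases disk I/O.

Nevertheless, the repartition join is still the most general approach, especially when none of the other optimization techniques applies.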

class Mapper
    method Map(null, tuple [join_key k, value v1, value v2,...])
        Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] )

class Reducer
    method Reduce(join_key k, tagged_tuples [t1, t2,...])
        H = new AssociativeArray : set_name -> values
        for all tagged_tuple t in [t1, t2,...] do // separate values into 2 arrays
            H{t.tag}.add(t.values)
        for all values r in H{'R'} do // produce a cross-join of the two arrays
            for all values l in H{'L'} do
                Emit(null, [k r l] )
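A Python simulation of the repartition join. It assumes the join key is the first field of every tuple and that the Mapper knows which relation each input tuple belongs to; the `users` and `events` data are hypothetical.

```python
from collections import defaultdict

def repartition_join(R, L):
    # Map: key each tuple by its join key (first field) and tag it with its relation.
    shuffled = defaultdict(list)
    for tag, relation in (("R", R), ("L", L)):
        for k, *values in relation:
            shuffled[k].append((tag, tuple(values)))
    # Reduce: separate the values for one key into two lists, then cross-join them.
    output = []
    for k, tagged_tuples in shuffled.items():
        H = {"R": [], "L": []}
        for tag, values in tagged_tuples:
            H[tag].append(values)
        for r_vals in H["R"]:
            for l_vals in H["L"]:
                output.append((k, r_vals, l_vals))
    return output

users = [(1, "alice"), (2, "bob")]
events = [(1, "login"), (1, "logout"), (3, "view")]
print(repartition_join(users, events))
# [(1, ('alice',), ('login',)), (1, ('alice',), ('logout',))]
```

Keys 2 and 3 still travel through the shuffle even though they produce no output, which is the first drawback listed above.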

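Replicated Join (Mapper-Side Join, Hash Join)

In practice, joining a small dataset with a large one is very common (for example, users with log records). Suppose we join two sets R and L, where R is relatively small. R can then be distributed to all Mappers, and each Mapper loads it and indexes it by the join key; the most common and efficient indexing technique is a hash table. The Mapper then iterates over L and joins each record with the matching records of R stored in the hash table. This approach is very efficient because the data in L does not need to be sorted or sent over the network, but R must be small enough to be distributed to every Mapper.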
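A Python sketch of the replicated join under the same hypothetical data layout as the repartition example (join key in the first field). Each inner list of `event_splits` stands for one Mapper's share of L. Every simulated Mapper builds its own hash table over the small relation R and streams its split of L through it, so there is no shuffle or Reducer.

```python
def replicated_join(R_small, L_splits):
    output = []
    for split in L_splits:  # each split stands in for one Mapper's share of L
        # Initialize: every Mapper builds its own hash table over the replicated R.
        index = {}
        for k, *values in R_small:
            index.setdefault(k, []).append(tuple(values))
        # Map: stream the split of L and probe the hash table; L is never sorted or shuffled.
        for k, *values in split:
            for r_vals in index.get(k, []):
                output.append((k, r_vals, tuple(values)))
    return output

users = [(1, "alice"), (2, "bob")]
event_splits = [[(1, "login"), (3, "view")], [(1, "logout")]]
print(replicated_join(users, event_splits))
# [(1, ('alice',), ('login',)), (1, ('alice',), ('logout',))]
```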

   