Big Data Development: Getting Started with MapReduce
 
Source: Kuqin (酷勤网)    Published: 2014-12-01
 

MapReduce is a programming model for data processing. The model is simple, yet expressive enough to write useful programs in. Hadoop can run MapReduce programs written in various languages. In this chapter, we will look at the same program expressed in Java, Ruby, Python, and C++. Most important, MapReduce programs are inherently parallel, which puts very large-scale data analysis within reach of anyone with enough machines at their disposal. MapReduce comes into its own with large datasets, so let's start by looking at one.

2.1 A Weather Dataset

For our example, we will write a program that mines weather data. Weather sensors at many locations around the globe collect data every hour and accumulate a large volume of log data. This data is semi-structured and record-oriented, which makes it a good candidate for analysis with MapReduce.

Data Format

The data we will use comes from the National Climatic Data Center (NCDC, http://www.ncdc.noaa.gov/). The data is stored in a line-oriented ASCII format, in which each line is a record. The format supports a rich set of meteorological elements, many of which are optional or have variable data lengths. For simplicity, we focus on the basic elements, such as temperature, which are always present and have a fixed width.

Example 2-1 shows a sample line with some of the salient fields annotated. The line has been split across multiple lines to show each field. In the real file, the fields are packed into a single line with no delimiters.

0057
332130    # USAF weather station identifier
99999     # WBAN weather station identifier
19500101  # observation date
0300      # observation time
4
+51317    # latitude (degrees x 1000)
+028783   # longitude (degrees x 1000)
FM-12
+0171     # elevation (meters)
99999
V020
320       # wind direction (degrees)
1         # quality code
N
0072
1
00450     # sky ceiling height (meters)
1         # quality code
C
N
010000    # visibility distance (meters)
1         # quality code
N
9
-0128     # air temperature (degrees Celsius x 10)
1         # quality code
-0139     # dew point temperature (degrees Celsius x 10)
1         # quality code
10268     # atmospheric pressure (hectopascals x 10)
1         # quality code
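
A minimal Python sketch that reads this fixed-width layout by character offset, assuming the field widths shown above. The helper name `parse_record` and its dictionary keys are illustrative and do not come from any NCDC or Hadoop tooling. The offsets for the air temperature (87 to 92) and its quality code (92) are the same ones the programs later in this chapter use.

```python
# Field values from Example 2-1, in file order. Joined with no delimiters they
# form one 105-character record (data copied from the listing above).
EXAMPLE_2_1_FIELDS = [
    "0057", "332130", "99999", "19500101", "0300", "4", "+51317", "+028783",
    "FM-12", "+0171", "99999", "V020", "320", "1", "N", "0072", "1", "00450",
    "1", "C", "N", "010000", "1", "N", "9", "-0128", "1", "-0139", "1",
    "10268", "1",
]
EXAMPLE_RECORD = "".join(EXAMPLE_2_1_FIELDS)


def parse_record(line: str) -> dict:
    """Pull a few fixed-width fields out of one NCDC record (hypothetical helper)."""
    return {
        "usaf": line[4:10],               # USAF weather station identifier
        "wban": line[10:15],              # WBAN weather station identifier
        "date": line[15:23],              # observation date, yyyymmdd
        "time": line[23:27],              # observation time, hhmm
        "air_temp": int(line[87:92]),     # degrees Celsius x 10; int() accepts a sign
        "air_temp_quality": line[92:93],  # quality code for the air temperature
    }


if __name__ == "__main__":
    print(parse_record(EXAMPLE_RECORD))
```

For the sample record, this yields an air temperature of -128, which is -12.8°C.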

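The datafiles are organized by date and weather station. There is a directory for each year from 1901 to 2001. Each directory contains one gzipped file per weather station, holding that station's readings for the year. For example, here are the first entries in the directory for 1990:

% ls raw/1990 | head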
010010-99999-1990.gz 
010014-99999-1990.gz 
010015-99999-1990.gz 
010016-99999-1990.gz 
010017-99999-1990.gz 
010030-99999-1990.gz 
010040-99999-1990.gz 
010080-99999-1990.gz 
010100-99999-1990.gz 
010150-99999-1990.gz 

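Because there are tens of thousands of weather stations, the whole dataset is made up of a large number of relatively small files. It is generally easier and more efficient to process a smaller number of relatively large files. The data was therefore preprocessed so that each year's readings were concatenated into a single file. The details are described in Appendix C.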
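
Appendix C describes how the data was actually prepared. The following Python function is only a sketch of the idea. It assumes that one year's station files sit in a local directory as `*.gz` files, like those listed above. It decompresses them in name order and appends every line to a single plain-text file. The function name and the output path are hypothetical.

```python
import glob
import gzip
import os


def concat_year(year_dir: str, out_path: str) -> int:
    """Concatenate every gzipped station file in year_dir into one text file.

    Returns the number of records written. This is a hypothetical helper,
    not the procedure from Appendix C.
    """
    count = 0
    with open(out_path, "wb") as out:
        # Sort the station files so the output order is deterministic.
        for path in sorted(glob.glob(os.path.join(year_dir, "*.gz"))):
            with gzip.open(path, "rb") as f:
                for line in f:
                    if not line.endswith(b"\n"):
                        line += b"\n"  # guard against a missing final newline
                    out.write(line)
                    count += 1
    return count
```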

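2.2 Analyzing the Data with Unix Tools

What is the highest recorded global temperature for each year in the dataset? We will answer this first without using Hadoop. That gives us a performance baseline and a way to check our results, and both are needed for a meaningful comparison with Hadoop.

The classic tool for processing line-oriented data is awk. Example 2-2 is a small script that calculates the maximum temperature for each year.

Example 2-2. A program for finding the maximum recorded temperature by year from NCDC weather records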

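#!/usr/bin/env bash
for year in all/*
do
  # Print the year (the file name without .gz), followed by a tab
  echo -ne `basename $year .gz`"\t"
  gunzip -c $year | \
    awk '{ temp = substr($0, 88, 5) + 0;
           q = substr($0, 93, 1);
           if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }'
done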

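The script loops through the compressed year files. For each one, it first prints the year and then processes the file with awk. The awk script extracts two fields from each line: the air temperature and the quality code. Adding 0 to the air temperature turns it into an integer. Next, the script checks that the temperature is valid (in the NCDC dataset, the value 9999 marks a missing reading) and that the quality code does not mark the reading as suspect or erroneous. If the reading is OK, it is compared with the maximum seen so far, which is replaced if the new value is larger. After all the lines in the file have been processed, the END block runs and prints the maximum value.

Here is the beginning of a run: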

% ./max_temperature.sh
1901	317
1902	244
1903	289
1904	256
1905	283
... 

The temperature values in the source file are scaled by a factor of 10, so the maximum temperature for 1901 is 31.7°C. Very few readings were taken at the beginning of the century, so this is plausible. On a single EC2 High-CPU Extra Large Instance, the script took 42 minutes to process the whole century of weather data.

To speed up the processing, we need to run parts of the program in parallel. In theory this is straightforward: we could process different years in different hardware threads, using all the threads available on the machine. In practice, there are a few problems with this.

First, dividing the work into equal-size pieces is not always easy. In this case, the file sizes for different years vary widely, so some threads will finish much earlier than others. Even if they are given further work, the total run time is still dominated by the time it takes to process the longest file. A better approach is to split the input into fixed-size chunks and assign each chunk to a process. That way, processes that finish early can simply be given more chunks.

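Second, combining the results from independent processes may require further processing. In our example, the result for each year is independent of the other years, so the results can be combined by concatenating them and sorting by year. With the fixed-size chunk approach, combining is more delicate. The data for a particular year will typically be split across several chunks, each processed independently. We end up with a maximum temperature for each chunk, so the final step is to take the highest of these maximums as the maximum for the year. Every other year is handled the same way.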

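Finally, you are still limited by the processing capacity of a single machine. Even with all of its processors in use, the best achievable time is about 20 minutes, and the system cannot go any faster. Also, some datasets grow beyond what a single machine can handle. Once multiple machines are involved, a whole host of other factors come into play. The two most important are coordination and reliability. Which process runs the overall job? How do we deal with failed processes?

So, although parallel processing is feasible, in practice it is messy. A framework like Hadoop that takes care of these issues is a great help.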
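
The fixed-size chunk approach and its merge step can be sketched in a few lines of Python. This is a toy, single-machine illustration with several assumptions. Records are already in memory as strings. The chunk size is counted in lines, not bytes. A thread pool stands in for separate processes; in CPython, a `ProcessPoolExecutor` would be needed for true CPU parallelism. The offsets and the 9999/quality-code checks mirror the awk script. The function names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

MISSING = 9999
GOOD_QUALITY = set("01459")


def chunk_max(lines):
    """Per-chunk result: {year: highest valid temperature seen in this chunk}."""
    best = {}
    for line in lines:
        year, temp, q = line[15:19], int(line[87:92]), line[92:93]
        if temp != MISSING and q in GOOD_QUALITY:
            best[year] = max(temp, best.get(year, temp))
    return best


def parallel_max(lines, chunk_size=1000, workers=4):
    """Process fixed-size chunks concurrently, then merge the per-chunk maxima."""
    # Chunks ignore year boundaries, so one year may span several chunks.
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(chunk_max, chunks))
    # Merge step: the maximum for a year is the max of its per-chunk maxima.
    merged = {}
    for part in partials:
        for year, temp in part.items():
            merged[year] = max(temp, merged.get(year, temp))
    return dict(sorted(merged.items()))  # order the results by year
```

Even this toy version shows the extra merge logic that fixed-size chunks require, which is part of the bookkeeping a framework like Hadoop takes off your hands.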

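2.3 Analyzing the Data with Hadoop

To take advantage of the parallel processing that Hadoop provides, we need to express our query as a MapReduce job. After some small-scale local testing, we can deploy the job to run on a cluster.

2.3.1 Map and Reduce

MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, and the programmer chooses their types. The programmer also writes two functions: the map function and the reduce function.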

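The input to our map phase is the raw NCDC data. We choose a text input format that presents each line of the dataset as a text value. The key is the offset of the beginning of the line from the beginning of the file. We have no need for it, so we ignore it.

Our map function is simple. We are interested only in the year and the air temperature, so we pull out just those two fields. In this case, the map function is just a data preparation phase. It sets up the data so that the reduce function can do its work on it: finding the maximum temperature for each year. The map function is also a good place to drop bad records. Here we filter out temperatures that are missing, suspect, or erroneous.

To see how the map works, consider the following sample lines of input data (some unused columns have been dropped to save space; ellipses mark where they were removed):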

0067011990999991950051507004...9999999N9+00001+99999999999... 
0043011990999991950051512004...9999999N9+00221+99999999999... 
0043011990999991950051518004...9999999N9-00111+99999999999... 
0043012650999991949032412004...0500001N9+01111+99999999999... 
0043012650999991949032418004...0500001N9+00781+99999999999... 

These lines are presented to the map function as key-value pairs:

(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
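
The keys above are byte offsets of each line from the start of the file. The following is a simplified sketch of how such (offset, line) pairs could be generated. It ignores split boundaries and the `\r\n` handling that a real record reader performs, and the function name is hypothetical.

```python
def offset_records(data: bytes):
    """Yield (byte_offset, line_text) pairs, similar in spirit to TextInputFormat keys."""
    offset = 0
    while offset < len(data):
        end = data.find(b"\n", offset)
        if end == -1:           # last line without a trailing newline
            end = len(data)
        yield offset, data[offset:end].decode("ascii")
        offset = end + 1        # the next key also counts this line's newline
```

For example, two 105-character records, each followed by a newline, get the keys 0 and 106, the same spacing as the first two keys listed above.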

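The keys are the line offsets within the file, which our map function does not need and so ignores. The map function merely extracts the year and the air temperature and emits them as its output (the temperature values have been converted to integers):

(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)

The MapReduce framework processes the output of the map function before sending it to the reduce function. This processing sorts and groups the key-value pairs by key. So, continuing the example, our reduce function sees the following input:

(1949, [111, 78])
(1950, [0, 22, -11])

Each year appears with a list of all its air temperature readings. All the reduce function has to do now is iterate through the list and pick out the maximum reading:

(1949, 111)
(1950, 22)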

This is the final output: the maximum global temperature recorded in each year.
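
The sort-and-group step between map and reduce can be mimicked in memory. The sketch below takes the five map outputs listed above, groups them by key (the shuffle), and applies a max reducer. It is a single-process illustration of the logical data flow and does not reflect how Hadoop actually implements the shuffle. The function names are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def shuffle(pairs):
    """Sort map output by key and group the values: [(key, [v1, v2, ...]), ...].

    The stable sort keeps each key's values in emission order; Hadoop itself
    does not promise any particular order of values within a group.
    """
    ordered = sorted(pairs, key=itemgetter(0))
    return [(k, [v for _, v in grp]) for k, grp in groupby(ordered, key=itemgetter(0))]


def max_reducer(key, values):
    """Emit the key with the largest value in its group."""
    return key, max(values)


map_output = [(1950, 0), (1950, 22), (1950, -11), (1949, 111), (1949, 78)]
grouped = shuffle(map_output)                        # [(1949, [111, 78]), (1950, [0, 22, -11])]
result = [max_reducer(k, vs) for k, vs in grouped]   # [(1949, 111), (1950, 22)]
```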

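The whole data flow is illustrated in Figure 2-1. At the bottom of the diagram is a Unix pipeline that mimics the whole MapReduce flow. We will see it again when we discuss Hadoop Streaming.

Figure 2-1. MapReduce logical data flow

2.3.2 Java MapReduce

Now that we have seen how the MapReduce program works, the next step is to express it in code. We need three things: a map function, a reduce function, and some code to run the job. The map function is represented by the Mapper class, which declares a map() method that we override. Example 2-3 shows our implementation of the map function.

Example 2-3. Mapper for the maximum temperature example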

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// New-API mapper: extends the abstract Mapper class (the old API's
// MapReduceBase/Mapper interface pair does not belong here)
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}

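The Mapper class is a generic type with four formal type parameters. They specify the input key, input value, output key, and output value types of the map function. In this example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). Rather than use built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).

The map() method is passed a key and a value. We convert the Text value containing the line of input into a Java String, then use its substring() method to extract the columns we are interested in.

The map() method is also given a Context instance to write its output to. In this case, we write the year as a Text object (since we are using it as the key) and wrap the temperature in an IntWritable. We write an output record only if the temperature is present and the quality code indicates that the reading is OK.

The reduce function is similarly defined using a Reducer, as shown in Example 2-4.

Example 2-4. Reducer for the maximum temperature example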

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}

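Again, four formal type parameters specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output types of the map function: Text and IntWritable. In this case, the output types of the reduce function are also Text and IntWritable, for a year and its maximum temperature. We find that maximum by iterating through the temperatures and comparing each one with the highest value found so far.

The third piece of code runs the MapReduce job (see Example 2-5).

Example 2-5. Application to find the maximum temperature in the weather dataset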

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Submit the job, print progress (verbose = true), and map success to exit code 0
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

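A Job object forms the specification of the job and gives you control over how the job is run. When we run this job on a Hadoop cluster, we package the code into a JAR file, which Hadoop distributes around the cluster. Rather than naming the JAR file explicitly, we can pass a class to the Job's setJarByClass() method. Hadoop then locates the relevant JAR file by looking for the one that contains this class.

Having constructed a Job object, we specify the input and output paths. The input path is set by calling the static addInputPath() method on FileInputFormat. It can be a single file, a directory (in which case all the files in that directory are used as input), or a file pattern. As the name suggests, addInputPath() can be called more than once to use input from multiple paths.

The output path (of which there can be only one) is set by the static setOutputPath() method on FileOutputFormat. It names the directory where the output files from the reduce function are written. The directory must not exist before the job runs; otherwise Hadoop will report an error and refuse to run the job. This precaution prevents data loss, since it can be very annoying to accidentally overwrite the output of a long-running job.

Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods.

The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and reduce functions, which are often the same, as they are in our case. If they differ, the map output types can be set with setMapOutputKeyClass() and setMapOutputValueClass().

The input types are controlled by the input format. We have not set one explicitly because we are using the default, TextInputFormat.

After setting the classes that define the map and reduce functions, we are ready to run the job. The waitForCompletion() method on Job submits the job and waits for it to finish. Its boolean argument is a verbose flag, so in this case the job writes its progress to the console.

The waitForCompletion() method returns a boolean indicating success (true) or failure (false), which we translate into the program's exit code, 0 or 1.

2.3.2.1 A Test Run

After writing a MapReduce job, it is normal to try it out on a small dataset to flush out any immediate problems with the code. First, install Hadoop in standalone (local) mode; Appendix A has instructions. In this mode, Hadoop runs the job against the local filesystem. Then install and compile the examples using the instructions on the book's website.

Let's test the job on the five-line sample discussed earlier (the output has been slightly reformatted to save space):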

% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop MaxTemperature input/ncdc/sample.txt output

12/02/04 11:50:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/02/04 11:50:41 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/02/04 11:50:41 INFO input.FileInputFormat: Total input paths to process : 1
12/02/04 11:50:41 INFO mapred.JobClient: Running job: job_local_0001
12/02/04 11:50:41 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/02/04 11:50:41 INFO mapred.MapTask: io.sort.mb = 100
12/02/04 11:50:42 INFO mapred.MapTask: data buffer = 79691776/99614720
12/02/04 11:50:42 INFO mapred.MapTask: record buffer = 262144/327680
12/02/04 11:50:42 INFO mapred.MapTask: Starting flush of map output
12/02/04 11:50:42 INFO mapred.MapTask: Finished spill 0
12/02/04 11:50:42 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/02/04 11:50:42 INFO mapred.JobClient: map 0% reduce 0%
12/02/04 11:50:44 INFO mapred.LocalJobRunner:
12/02/04 11:50:44 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/02/04 11:50:44 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/02/04 11:50:44 INFO mapred.LocalJobRunner:
12/02/04 11:50:44 INFO mapred.Merger: Merging 1 sorted segments
12/02/04 11:50:44 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 57 bytes
12/02/04 11:50:44 INFO mapred.LocalJobRunner:
12/02/04 11:50:45 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/02/04 11:50:45 INFO mapred.LocalJobRunner:
12/02/04 11:50:45 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/02/04 11:50:45 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to output
12/02/04 11:50:45 INFO mapred.JobClient: map 100% reduce 0%
12/02/04 11:50:47 INFO mapred.LocalJobRunner: reduce > reduce
12/02/04 11:50:47 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
12/02/04 11:50:48 INFO mapred.JobClient: map 100% reduce 100%
12/02/04 11:50:48 INFO mapred.JobClient: Job complete: job_local_0001
12/02/04 11:50:48 INFO mapred.JobClient: Counters: 17
12/02/04 11:50:48 INFO mapred.JobClient:   File Output Format Counters
12/02/04 11:50:48 INFO mapred.JobClient:     Bytes Written=29
12/02/04 11:50:48 INFO mapred.JobClient:   FileSystemCounters
12/02/04 11:50:48 INFO mapred.JobClient:     FILE_BYTES_READ=357503
12/02/04 11:50:48 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=425817
12/02/04 11:50:48 INFO mapred.JobClient:   File Input Format Counters
12/02/04 11:50:48 INFO mapred.JobClient:     Bytes Read=529
12/02/04 11:50:48 INFO mapred.JobClient:   Map-Reduce Framework
12/02/04 11:50:48 INFO mapred.JobClient:     Map output materialized bytes=61
12/02/04 11:50:48 INFO mapred.JobClient:     Map input records=5
12/02/04 11:50:48 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/02/04 11:50:48 INFO mapred.JobClient:     Spilled Records=10
12/02/04 11:50:48 INFO mapred.JobClient:     Map output bytes=45
12/02/04 11:50:48 INFO mapred.JobClient:     Total committed heap usage (bytes)=369238016
12/02/04 11:50:48 INFO mapred.JobClient:     SPLIT_RAW_BYTES=129
12/02/04 11:50:48 INFO mapred.JobClient:     Combine input records=0
12/02/04 11:50:48 INFO mapred.JobClient:     Reduce input records=5
12/02/04 11:50:48 INFO mapred.JobClient:     Reduce input groups=2
12/02/04 11:50:48 INFO mapred.JobClient:     Combine output records=0
12/02/04 11:50:48 INFO mapred.JobClient:     Reduce output records=2
12/02/04 11:50:48 INFO mapred.JobClient:     Map output records=5

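When the hadoop command is invoked with a class name as its first argument, it launches a Java virtual machine (JVM) to run that class. Using the hadoop command is more convenient than using java directly, because it adds the Hadoop libraries (and their dependencies) to the classpath and also picks up the Hadoop configuration. To add the application classes to the classpath, we define an environment variable called HADOOP_CLASSPATH, which the hadoop script picks up.

When running in local (standalone) mode, all the programs in this book assume that HADOOP_CLASSPATH has been set in this way. The commands should be run from the directory where the example code is installed.

The output from running the job provides some useful information. For example, we can see that the job was given the ID job_local_0001, and that it ran one map task and one reduce task, with the IDs attempt_local_0001_m_000000_0 and attempt_local_0001_r_000000_0. Knowing the job and task IDs is very useful when debugging MapReduce jobs.

The last section of the output, titled "Counters," shows statistics that Hadoop generates for each job it runs. These are very useful for checking whether the data was processed as expected. For example, following the record counts through the system, we see that five map inputs produced five map outputs, and then five reduce inputs in two groups produced two reduce outputs.

The output was written to the output directory, which contains one output file per reducer. The job had a single reducer, so we find a single file, named part-r-00000:

% cat output/part-r-00000
1949	111
1950	22

This result is the same as the one we worked out by hand earlier. We interpret it as saying that the maximum temperature recorded in 1949 was 11.1°C, and in 1950 it was 2.2°C.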

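2.3.2.2 The Old and the New Java MapReduce APIs

The Java MapReduce API used in the previous section was first released in Hadoop 0.20.0. This new API, sometimes called "Context Objects," was designed to make the API easier to evolve in the future. It is type-incompatible with the old API, so existing applications must be rewritten to take advantage of it.

The new API is nearly complete in the latest 1.x release series (the successor to the 0.20 series). Only a few MapReduce libraries are still missing; check the org.apache.hadoop.mapreduce.lib subpackages in the latest release to see whether the classes you need are there.

Previous editions of this book were based on 0.20 releases and used the old API throughout. In this edition, the new API is the primary API, except in a few places. If you wish to use the old API, you can, since the book's website provides old-API versions of all the examples. (Some early 0.20 releases deprecated the old API, but it remains usable in later releases. Both the 1.x and 2.x release series support the old and new APIs without issuing deprecation warnings for the old API.)

There are several notable differences between the two APIs:

The new API favors abstract classes over interfaces, since these are easier to evolve. This means you can add a method (with a default implementation) to an abstract class without breaking existing implementations of the class. For example, Mapper and Reducer are interfaces in the old API but abstract classes in the new API.

The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API can still be found in org.apache.hadoop.mapred.

The new API makes extensive use of context objects, which allow user code to communicate with the MapReduce system. The new Context, for example, essentially unifies the roles of the JobConf, the OutputCollector, and the Reporter from the old API.

In both APIs, key-value record pairs are pushed to the mapper and reducer. In addition, the new API allows mappers and reducers to control the execution flow by overriding the run() method. For example, records can be processed in batches, or execution can stop before all the records have been processed. In the old API this is possible for mappers by writing a MapRunnable, but there is no equivalent for reducers.

Job control is performed through the Job class in the new API, rather than the old JobClient class, which does not exist in the new API.

Configuration has been unified. The old API configures jobs through a special JobConf object, an extension of Hadoop's Configuration object (which is used to configure daemons; see "The Configuration API" in Section 5.1). In the new API, job configuration is done through a Configuration, possibly via some of the helper methods on Job.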

Output files are named slightly differently. In the old API, both map and reduce outputs are named part-nnnnn. In the new API, map outputs are named part-m-nnnnn and reduce outputs part-r-nnnnn, where nnnnn is an integer designating the part number, starting from zero.

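User-overridable methods in the new API are declared to throw java.lang.InterruptedException. This means you can write your code to respond to interrupts, so that the framework can gracefully cancel long-running operations when it needs to.

In the new API, the reduce() method receives its values as a java.lang.Iterable, rather than as a java.util.Iterator (as in the old API). This change makes it easier to iterate over the values with Java's for-each loop:

for (VALUEIN value : values) { ... }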
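
The new-API naming scheme for output files, listed among the differences above, amounts to a zero-padded format string. The helper below is purely illustrative (it is not a Hadoop method) and assumes the five-digit part numbers described there.

```python
def part_file_name(kind: str, partition: int) -> str:
    """Return a new-API output file name such as part-r-00000 (illustrative helper)."""
    if kind not in ("m", "r"):
        raise ValueError("kind must be 'm' (map output) or 'r' (reduce output)")
    return "part-%s-%05d" % (kind, partition)
```

With a single reducer, for example, `part_file_name("r", 0)` gives the name of the only output file the job produces.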

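Example 2-6 shows the MaxTemperature application rewritten to use the old API.

When converting your Mapper and Reducer classes to the new API, remember to change the signatures of the map() and reduce() methods to the new form. If you only change your classes to extend the new Mapper and Reducer classes, the code will compile without errors or warnings, because those classes provide default (identity) implementations of map() and reduce(). Your own mapper or reducer code, however, will never be called, which can lead to hard-to-diagnose errors.

Annotating your map() and reduce() methods with @Override lets the Java compiler catch these errors.

Example 2-6. Application to find the maximum temperature, using the old MapReduce API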

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class OldMaxTemperature {

  static class OldMaxTemperatureMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

      String line = value.toString();
      String year = line.substring(15, 19);
      int airTemperature;
      if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
        airTemperature = Integer.parseInt(line.substring(88, 92));
      } else {
        airTemperature = Integer.parseInt(line.substring(87, 92));
      }
      String quality = line.substring(92, 93);
      if (airTemperature != MISSING && quality.matches("[01459]")) {
        output.collect(new Text(year), new IntWritable(airTemperature));
      }
    }
  }

  static class OldMaxTemperatureReducer extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

      int maxValue = Integer.MIN_VALUE;
      while (values.hasNext()) {
        maxValue = Math.max(maxValue, values.next().get());
      }
      output.collect(key, new IntWritable(maxValue));
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: OldMaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    JobConf conf = new JobConf(OldMaxTemperature.class);
    conf.setJobName("Max temperature");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(OldMaxTemperatureMapper.class);
    conf.setReducerClass(OldMaxTemperatureReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    JobClient.runJob(conf);
  }
}

2.4 Scaling Out

You have seen how MapReduce works for small inputs. Now it is time to take a bird's-eye view of the system and look at the data flow for large inputs. For simplicity, the examples so far have used files on the local filesystem. To scale out, however, we need to store the data in a distributed filesystem, typically HDFS (see Chapter 3). This allows Hadoop to move the MapReduce computation to each machine that hosts part of the data. Let's see how this works.

2.4.1 Data Flow

First, some terminology. A MapReduce job is a unit of work that the client wants performed. It consists of the input data, the MapReduce program, and configuration information. Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.

Two types of nodes control the job execution process: a jobtracker and a number of tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.

Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. Hadoop creates one map task for each split, and that task runs the user-defined map function on each record in the split.

Having many splits means that the time taken to process each split is small compared with the time to process the whole input. So if we process the splits in parallel, the work is better load-balanced when the splits are small, since a faster machine can process proportionally more splits over the course of the job than a slower one. Even if the machines are identical, failed processes or other jobs running concurrently make load balancing desirable, and the quality of the load balancing improves as the splits become more fine-grained.

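On the other hand, if splits are too small, the overhead of managing the splits and creating map tasks begins to dominate the total job execution time. For most jobs, a good split size tends to be the size of an HDFS block, which is 64 MB by default. This default can be changed for the cluster (affecting all newly created files) or specified individually when each file is created.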
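
To see how split size relates to the number of map tasks, here is a simplified sketch that carves a file into splits no larger than a given size, with a default of the 64 MB block size mentioned above. It deliberately ignores details of Hadoop's real FileInputFormat logic, such as its minimum and maximum split-size settings and its tolerance for a slightly oversized final split. The function name is hypothetical.

```python
MB = 1024 * 1024


def split_offsets(file_size: int, split_size: int = 64 * MB):
    """Return (start, length) pairs that cover the file in fixed-size splits."""
    if split_size <= 0:
        raise ValueError("split_size must be positive")
    splits = []
    start = 0
    while start < file_size:
        length = min(split_size, file_size - start)  # the last split may be shorter
        splits.append((start, length))
        start += length
    return splits
```

Under this simplification, a 200 MB file yields four splits, and therefore four map tasks. Halving the split size would double the number of map tasks, along with the per-task overhead described above.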

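Hadoop gets the best performance by running a map task on a node where its input data resides in HDFS. This is called the data locality optimization, because it avoids using valuable cluster bandwidth. Sometimes, however, all three nodes hosting replicas of the HDFS block for a map task's input are busy running other map tasks. In that case, the job scheduler looks for a free machine in the same rack as one of the replicas to run the map task. Only very occasionally (this hardly ever happens) is a machine in another rack used, which results in network transfer between racks.

Figure 2-2 shows these three possibilities.

Figure 2-2. Data-local (a), rack-local (b), and off-rack (c) map tasks

It should now be clear why the optimal split size is the same as the block size: it is the largest input size that is guaranteed to be stored on a single node. If a split spanned two blocks, it would be unlikely that any HDFS node stored both of them, so part of the split would have to be transferred across the network to the node running the map task. That is clearly less efficient than running the whole map task on local data.

Map tasks write their output to the local disk, not to HDFS. Why? Map output is intermediate output: reduce tasks process it to produce the final output, and once the job is complete, the map output can be thrown away. Storing it in HDFS with replication would therefore be overkill. If the node running a map task fails before the map output has been passed to the reduce task, Hadoop reruns the map task on another node to re-create the map output.

Reduce tasks do not have the advantage of data locality, because the input to a single reduce task is normally the output from all the mappers. In the present example, we have a single reduce task whose input is the output of all the map tasks. The sorted map outputs therefore have to be transferred across the network to the node running the reduce task, where they are merged and then passed to the user-defined reduce function. The output of the reduce is normally stored in HDFS for reliability. As described in Chapter 3, for each HDFS block of the reduce output, the first replica is stored on the local node and the other replicas on nodes in other racks. Writing the reduce output to HDFS therefore does consume network bandwidth, but only as much as a normal HDFS write pipeline does.

The complete data flow with a single reduce task is shown in Figure 2-3. Dashed boxes indicate nodes, dashed arrows show data transfers within a node, and solid arrows show data transfers between nodes.

Figure 2-3. MapReduce data flow with a single reduce task

The number of reduce tasks is not determined by the size of the input. Instead, it is specified independently. Section 7.1.1 explains how to choose the number of reduce tasks for a given job.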

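When there are multiple reduce tasks, each map task partitions its output, creating one partition for each reduce task. A partition can contain many keys (and their associated values), but all the records for any given key are in the same partition. Partitioning can be controlled by a user-defined partition function, but normally the default partitioner, which buckets keys using a hash function, works very well.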
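
Hadoop's default HashPartitioner assigns a record to partition `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The Python sketch below imitates that scheme. Python has no Java `hashCode()`, so the sketch substitutes `zlib.crc32` of the key's string form as a stable stand-in hash. Because of that assumption, the actual partition numbers will differ from Hadoop's. What the sketch does show is the property described above: every record with a given key lands in the same partition. The function names are hypothetical.

```python
import zlib


def partition(key, num_reduce_tasks: int) -> int:
    """Choose a reduce partition for key (a stand-in for Hadoop's HashPartitioner)."""
    h = zlib.crc32(str(key).encode("utf-8"))    # stable across runs, unlike hash() on str
    return (h & 0x7FFFFFFF) % num_reduce_tasks  # non-negative, like "& Integer.MAX_VALUE"


def partition_map_output(pairs, num_reduce_tasks: int):
    """Split one map task's output into one list per reduce task."""
    parts = [[] for _ in range(num_reduce_tasks)]
    for key, value in pairs:
        parts[partition(key, num_reduce_tasks)].append((key, value))
    return parts
```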

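The data flow for the general case of multiple reduce tasks is shown in Figure 2-4. The diagram makes it clear why the data flow between map and reduce tasks is called "the shuffle": each reduce task is fed by many map tasks. The shuffle is more complicated than the diagram suggests, and tuning shuffle parameters can have a big impact on total job execution time, as described in Section 6.4.

Finally, it is also possible to have zero reduce tasks. This can be appropriate when no shuffle is needed because the processing can be carried out entirely in parallel (see Section 7.2.2 for examples). In this case, the only off-node data transfer occurs when the map tasks write their output to HDFS (see Figure 2-5).

Figure 2-4. MapReduce data flow with multiple reduce tasks

2.4.2 Combiner Functions

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function (just as with mappers and reducers) to be run on the map output. The combiner function's output then forms the input to the reduce function. Because the combiner function is an optimization, Hadoop does not guarantee how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.

The contract for the combiner function constrains the type of function that may be used. This is best illustrated with an example. Suppose that in the maximum temperature example, readings for the year 1950 were processed by two map tasks (because they were in different splits). Imagine the first map produced the output:

(1950, 0)
(1950, 20)
(1950, 10)

and the second produced:

(1950, 25)
(1950, 15)

Figure 2-5. MapReduce data flow with no reduce tasks

The reduce function would be called with a list of all the values:

(1950, [0, 20, 10, 25, 15])

with output:

(1950, 25)

since 25 is the maximum value in the list. We could use a combiner function that, just like the reduce function, finds the maximum temperature in each map's output. The reduce function would then be called with:

(1950, [20, 25])

and would produce the same output as before. More succinctly, we can express the function calls on the temperature values in this case as follows:

max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25

Not all functions possess this property. [Functions with this property are called commutative and associative. They are also sometimes referred to as distributive, as in the paper "Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals" by Gray et al. (1995).] For example, if we were calculating mean temperatures, we could not use the mean as our combiner function, because:

mean(0, 20, 10, 25, 15) = 14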

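whereas:

mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15

The combiner function does not replace the reduce function. (How could it? The reduce function is still needed to process records with the same key from different maps.) But it can significantly cut down the amount of data transferred between the mappers and the reducers. For that reason alone, it is always worth considering whether you can use a combiner function in your MapReduce job.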
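
Both cases can be checked mechanically. The sketch below applies the same function as a combiner to each map task's output from the example above, then reduces the combined values. With `max`, the answer is the same with or without the combiner. With a plain mean, it is not. The helper names are hypothetical.

```python
def average(values):
    """Arithmetic mean of a list of numbers."""
    return sum(values) / len(values)


def without_combiner(fn, *map_outputs):
    """Reducer only: apply fn once to every value for the key."""
    return fn([v for values in map_outputs for v in values])


def with_combiner(fn, *map_outputs):
    """Use fn as a combiner on each map task's output, then again as the reducer."""
    return fn([fn(values) for values in map_outputs])


map1 = [0, 20, 10]  # 1950 readings emitted by the first map task
map2 = [25, 15]     # 1950 readings emitted by the second map task

print(without_combiner(max, map1, map2), with_combiner(max, map1, map2))          # 25 25
print(without_combiner(average, map1, map2), with_combiner(average, map1, map2))  # 14.0 15.0
```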

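Specifying a combiner function

Going back to the Java MapReduce program, a combiner function is defined using the Reducer class. For this application, its implementation is the same as the reduce function in MaxTemperatureReducer. The only change needed is to set the combiner class on the Job (see Example 2-7).

Example 2-7. Application to find the maximum temperature, using a combiner function for efficiency

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithCombiner {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureWithCombiner <input path> " +
          "<output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperatureWithCombiner.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    // The reducer doubles as the combiner: max is commutative and associative
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}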

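2.4.3 Running a Distributed MapReduce Job

The same program runs, without alteration, on the full dataset. This is the point of MapReduce: it scales to the size of your data and the size of your hardware. Here is one data point: on a 10-node EC2 cluster of High-CPU Extra Large Instances, the program took only six minutes to run.

We will go through the mechanics of running programs on a cluster in Chapter 5.

2.5 Hadoop Streaming

Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java. Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can write your MapReduce program in any language that can read standard input and write to standard output.

Streaming is naturally suited to text processing. Map input data is passed to your map function over standard input, one line at a time, and the map function writes its results as lines to standard output. A map output key-value pair is written as a single tab-delimited line. The input to the reduce function has the same format (tab-separated key-value pairs) and is passed over standard input. The reduce function reads input lines from standard input, which the framework has already sorted by key, and writes its results to standard output.

Let's illustrate this by rewriting our program for finding maximum temperatures by year using Streaming.

2.5.1 Ruby

Example 2-8 shows the map function written in Ruby.

Example 2-8. Map function for maximum temperature in Ruby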

#!/usr/bin/env ruby

STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87,5], val[92,1]
  # Emit "year<TAB>temp" only for present, good-quality readings
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

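The program iterates over standard input by executing a block for each line of STDIN (a global constant of type IO). The block pulls out the relevant fields from each input line and, if the temperature is valid, writes the year and the temperature to standard output (using puts), separated by a tab character (\t).

It is worth drawing out a design difference between Streaming and the Java MapReduce API. The Java API is geared toward processing one record at a time: the framework calls the map() method on your Mapper for each record in the input. With Streaming, by contrast, the map program decides how to process the input. For example, it could easily read and process several lines at a time, since it controls the reading. A Java map implementation has records "pushed" to it, but it can still consider multiple lines at a time by accumulating previous lines in an instance variable of the Mapper. [Alternatively, the new MapReduce API lets you use "pull" style processing; see the discussion of the old and new Java MapReduce APIs earlier in this chapter.] In that case, you need to implement the close() method (cleanup() in the new API) so that you know when the last record has been read and can finish processing the last group of lines.

Because the script operates only on standard input and output, the simplest way to test it is with a Unix pipe, rather than in Hadoop:

% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb
1950	+0000
1950	+0022
1950	-0011
1949	+0111
1949	+0078

The reduce function shown in Example 2-9 is a little more complex.

Example 2-9. Reduce function for maximum temperature in Ruby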

#!/usr/bin/env ruby

last_key, max_val = nil, -1000000
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    # A new key group has started: emit the finished group
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
# Emit the final key group
puts "#{last_key}\t#{max_val}" if last_key

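Again, the program iterates over lines from standard input, but this time we have to store some state as we process each key group. In this case, the keys are years, and we store the last key seen and the maximum temperature seen so far for that key. The MapReduce framework guarantees that the keys are sorted, so if we read a key that differs from the previous one, we know a new key group has started. In contrast to the Java API, which provides an iterator over each key group, in Streaming you have to find the key group boundaries in your program.

For each line, we pull out the key and value. If we have just finished a group (last_key && last_key != key), we write the key and its maximum temperature, separated by a tab character, and then reset the maximum temperature for the new key. If we have not just finished a group, we only update the maximum temperature for the current key.

The last line of the program ensures that a line is written for the last key group in the input.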
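
Because the framework delivers the reducer's input sorted by key, the manual boundary tracking above can also be delegated to a grouping helper. The following alternative sketch, which is not the chapter's own script, is a Python reducer that uses `itertools.groupby` to find key group boundaries. It assumes tab-separated `key<TAB>value` lines like those produced by the mappers in this section. The function names are hypothetical.

```python
from itertools import groupby


def parse(lines):
    """Turn 'year<TAB>temp' lines into (year, int(temp)) pairs."""
    for line in lines:
        key, value = line.rstrip("\n").split("\t")
        yield key, int(value)  # int() accepts signed values such as "+0111"


def reduce_max(lines):
    """Yield 'year<TAB>max' lines; relies on the input being sorted by key."""
    for key, group in groupby(parse(lines), key=lambda kv: kv[0]):
        yield "%s\t%d" % (key, max(v for _, v in group))
```

Wrapped in a loop such as `for out in reduce_max(sys.stdin): print(out)`, it could serve as a Streaming reduce script. The design trades explicit state variables for a dependency on the sorted-input guarantee.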

ÏÖÔÚ¿ÉÒÔÓÃUnix¹ÜÏßÀ´Ä£ÄâÕû¸öMapReduce¹ÜÏߣ¬¸Ã¹ÜÏßÓëͼ2-1ÖÐÏÔʾµÄUnix¹ÜÏßÊÇÏàͬµÄ£º

%catinput/ncdc/sample.txt|ch02/src/main/ruby/max_temperature_map.rb| 
sort|ch02/src/main/ruby/max_temperature_reduce.rb
1949111
195022

Êä³ö½á¹ûºÍJava³ÌÐòµÄÒ»Ñù£¬ËùÒÔÏÂÒ»²½ÊÇͨ¹ýHadoopÔËÐÐËü¡£

hadoopÃüÁî²»Ö§³ÖStreaming£¬Òò´Ë£¬ÎÒÃÇÐèÒªÔÚÖ¸¶¨Streaming JARÎļþÁ÷ÓëjarÑ¡Ïîʱָ¶¨¡£Streaming³ÌÐòµÄÑ¡ÏîÖ¸¶¨ÁËÊäÈëºÍÊä³ö·¾¶ÒÔ¼°mapºÍreduce½Å±¾¡£ÈçÏÂËùʾ£º

%hadoopjar$HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar 
-inputinput/ncdc/sample.txt
-outputoutput
-mapperch02/src/main/ruby/max_temperature_map.rb
-reducerch02/src/main/ruby/max_temperature_reduce.rb

When running a large dataset on a cluster, we should also set a combiner, using the -combiner option.

In releases after 1.x, the combiner can be any Streaming command. In earlier releases the combiner had to be written in Java. A common workaround there was to combine manually inside the mapper, which avoided Java altogether. In this case, we can turn the mapper into a pipeline:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/all \
  -output output \
  -mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
    ch02/src/main/ruby/max_temperature_reduce.rb" \
  -reducer ch02/src/main/ruby/max_temperature_reduce.rb \
  -file ch02/src/main/ruby/max_temperature_map.rb \
  -file ch02/src/main/ruby/max_temperature_reduce.rb

Note also the -file option. When running Streaming programs on a cluster, we use it to ship the scripts to the cluster.
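Instead of the map | sort | reduce pipeline, the manual combining can also be done inside the mapper itself. The following Python sketch keeps a running maximum per year in a dictionary and returns one pair per year once its input is exhausted. The function name is hypothetical. The record offsets and quality codes are the ones used by the map scripts in this chapter. Taking a maximum is associative and commutative, so the unchanged reducer still produces the correct final answer from these per-task partial maxima.

```python
import re


def combine_in_mapper(lines):
    """Return sorted (year, max temperature) pairs for one map task's input."""
    maxima = {}  # year -> highest valid temperature seen so far (tenths of a degree C)
    for line in lines:
        val = line.strip()
        year, temp, q = val[15:19], val[87:92], val[92:93]
        # Same filter as the map scripts: skip missing values and bad quality codes.
        if temp != "+9999" and re.match("[01459]", q):
            t = int(temp)
            if year not in maxima or t > maxima[year]:
                maxima[year] = t
    return sorted(maxima.items())
```

In a Streaming mapper you would call combine_in_mapper(sys.stdin) and print each pair tab-separated. The trade-off is memory: the dictionary holds one entry per distinct year that the map task sees.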

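2.5.2 Python

Streaming supports any programming language that can read from standard input and write to standard output. For readers more familiar with Python, here is the same example in Python. The map script is shown in Example 2-10 and the reduce script in Example 2-11.

Example 2-10. Map function for finding the maximum temperature, in Python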

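#!/usr/bin/env python

import re
import sys

for line in sys.stdin:
    val = line.strip()
    # Year, air temperature, and quality code sit at fixed offsets in each record.
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    # Skip missing readings (+9999) and records with a bad quality code.
    if (temp != "+9999" and re.match("[01459]", q)):
        print("%s\t%s" % (year, temp))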
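The slice offsets in the map script encode the fixed-width NCDC record layout described at the start of the chapter. As a sketch, the same parsing can be pulled into a small function with named slices, which also makes it testable without Hadoop. The constant and function names are our own. Converting to degrees Celsius by dividing by 10 is an addition for illustration, since the record stores air temperature in tenths of a degree.

```python
import re

# Fixed-width field positions used by the map scripts in this chapter.
YEAR = slice(15, 19)
AIR_TEMPERATURE = slice(87, 92)   # signed, in tenths of a degree Celsius
QUALITY = slice(92, 93)
MISSING = "+9999"                 # value the map scripts treat as a missing reading
GOOD_QUALITY = re.compile("[01459]")


def parse_record(line):
    """Return (year, temperature in degrees Celsius), or None if unusable."""
    val = line.strip()
    year, temp, q = val[YEAR], val[AIR_TEMPERATURE], val[QUALITY]
    if temp == MISSING or not GOOD_QUALITY.match(q):
        return None
    return year, int(temp) / 10.0
```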

Example 2-11. Reduce function for finding the maximum temperature, in Python

#!/usr/bin/env python

import sys

(last_key, max_val) = (None, -sys.maxsize)
for line in sys.stdin:
    (key, val) = line.strip().split("\t")
    if last_key and last_key != key:
        # A new key group has started: emit the result for the previous one.
        print("%s\t%s" % (last_key, max_val))
        (last_key, max_val) = (key, int(val))
    else:
        (last_key, max_val) = (key, max(max_val, int(val)))

# Emit the result for the final key group.
if last_key:
    print("%s\t%s" % (last_key, max_val))

We can test the programs and run the job in the same way as for the Ruby version. For example, to run a test:

% cat input/ncdc/sample.txt | ch02/src/main/python/max_temperature_map.py | \
  sort | ch02/src/main/python/max_temperature_reduce.py
1949	111
1950	22

2.6 Hadoop Pipes

Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce. Streaming uses standard input and output to communicate with the map and reduce code. Pipes instead uses sockets as the channel between the tasktracker and the process running the C++ map or reduce function. JNI is not used.

We will rewrite this chapter's temperature example in C++ and then see how to run it with Hadoop Pipes. Example 2-12 shows the source code for the map and reduce functions in C++.

Example 2-12. Maximum temperature program in C++

#include <algorithm>
#include <climits>   // INT_MIN
#include <stdint.h>
#include <string>

#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

class MaxTemperatureMapper : public HadoopPipes::Mapper {
public:
  MaxTemperatureMapper(HadoopPipes::TaskContext& context) {
  }
  void map(HadoopPipes::MapContext& context) {
    // Extract the fixed-width fields from one NCDC record.
    std::string line = context.getInputValue();
    std::string year = line.substr(15, 4);
    std::string airTemperature = line.substr(87, 5);
    std::string q = line.substr(92, 1);
    // Skip missing readings and records with a bad quality code.
    if (airTemperature != "+9999" &&
        (q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {
      context.emit(year, airTemperature);
    }
  }
};

class MapTemperatureReducer : public HadoopPipes::Reducer {
public:
  MapTemperatureReducer(HadoopPipes::TaskContext& context) {
  }
  void reduce(HadoopPipes::ReduceContext& context) {
    int maxValue = INT_MIN;
    // Values arrive as strings; convert each to int before comparing.
    while (context.nextValue()) {
      maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue()));
    }
    // Convert the maximum back to a string before emitting it.
    context.emit(context.getInputKey(), HadoopUtils::toString(maxValue));
  }
};

int main(int argc, char *argv[]) {
  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<MaxTemperatureMapper,
                              MapTemperatureReducer>());
}

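The application links against the Hadoop C++ library, a thin wrapper for communicating with the tasktracker child process. We define the map and reduce functions by extending the Mapper and Reducer classes defined in the HadoopPipes namespace and implementing their map() and reduce() methods. These methods take a context object (of type MapContext or ReduceContext). The context object provides the means to read input and write output, and to access job configuration information through the JobConf class. The processing in this example is very similar to the Java version.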

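Unlike the Java interface, keys and values in the C++ interface are byte buffers, represented as Standard Template Library (STL) strings. This keeps the interface simple. However, it puts a greater burden on the application developer, who must marshal values back and forth between strings and the concrete types used in the application domain. This is evident in MapTemperatureReducer. There we have to convert each input value to an integer (using a method defined in HadoopUtils) and then convert the maximum found back to a string before writing it out. In some cases the conversion can be skipped. In MaxTemperatureMapper, for example, the airTemperature value never needs to be converted to an integer, because map() never treats it as a number.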
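The need for these conversions is easy to see in a small Python analogue. This is a sketch, and the function name is hypothetical. If the reducer compared the raw strings instead of converting them to integers, it would get the wrong answer: in ASCII, '-' sorts after '+', so "-0011" compares greater than "+0022".

```python
def reduce_max(key, values):
    """values are the strings a reducer receives, such as '+0111'."""
    # string -> int before comparing (the role HadoopUtils::toInt plays above)
    max_value = max(int(v) for v in values)
    # int -> string before emitting (the role HadoopUtils::toString plays above)
    return key, str(max_value)


if __name__ == "__main__":
    values = ["+0000", "+0022", "-0011"]
    print(max(values))                  # string comparison picks '-0011' (wrong)
    print(reduce_max("1950", values))   # ('1950', '22')
```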

The application's entry point is the main() method. It calls HadoopPipes::runTask, which connects to the Java parent process and marshals data to and from the Mapper or Reducer. runTask() is passed a Factory, from which it can create Mapper or Reducer instances. The Java parent controls, over the socket connection, which of the two is created. Overloaded template factory methods can be used to set a combiner, partitioner, record reader, or record writer.
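The factory's role can be sketched in Python. All names here are hypothetical and only loosely mirror TemplateFactory; this is not the Pipes API. The task runner never names a concrete class. It only asks the factory for a mapper or a reducer, and the choice comes from outside. Here a role string stands in for the instruction the Java parent sends over the socket.

```python
class MaxTemperatureMapper:
    def process(self, records):
        # Pass (year, temperature) pairs through, dropping missing readings.
        return [(year, temp) for year, temp in records if temp != "+9999"]


class MaxTemperatureReducer:
    def process(self, records):
        # Keep the maximum temperature per year, converting strings to ints.
        maxima = {}
        for year, temp in records:
            t = int(temp)
            maxima[year] = max(maxima.get(year, t), t)
        return sorted((year, str(t)) for year, t in maxima.items())


class TemplateFactory:
    """Creates task objects from the classes it was parameterized with."""

    def __init__(self, mapper_cls, reducer_cls):
        self.mapper_cls = mapper_cls
        self.reducer_cls = reducer_cls

    def create_mapper(self):
        return self.mapper_cls()

    def create_reducer(self):
        return self.reducer_cls()


def run_task(factory, role, records):
    """Build the object the parent asked for and run it over the records."""
    if role == "map":
        task = factory.create_mapper()
    elif role == "reduce":
        task = factory.create_reducer()
    else:
        raise ValueError("unknown role: %r" % role)
    return task.process(records)


if __name__ == "__main__":
    factory = TemplateFactory(MaxTemperatureMapper, MaxTemperatureReducer)
    mapped = run_task(factory, "map", [("1950", "+0022"), ("1950", "+9999"), ("1949", "+0111")])
    print(run_task(factory, "reduce", mapped))  # [('1949', '111'), ('1950', '22')]
```

Keeping construction behind the factory is what lets a single executable serve both as a map task and as a reduce task.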

Compiling and Running

Now we can compile and link the program using the Makefile in Example 2-13.

Example 2-13. Makefile for the C++ version of the MapReduce program

CC = g++
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

max_temperature: max_temperature.cpp
	$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
	-lhadooputils -lpthread -g -O2 -o $@

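The Makefile expects two environment variables to be set. The first is HADOOP_INSTALL, which should already be set if you followed the installation instructions in Appendix A. You also need to define PLATFORM, which specifies the operating system, architecture, and data model (for example, 32-bit or 64-bit). I compiled it on a 32-bit Linux machine as follows: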

% export PLATFORM=Linux-i386-32
% make

After a successful build, the max_temperature executable is in the current directory.

To run a Pipes job, Hadoop must be running in pseudo-distributed mode, where all the daemons run on the local machine. Setup instructions are in Appendix A. Pipes cannot run in standalone (local) mode, because it relies on Hadoop's distributed cache mechanism, which works only when HDFS is running.

Once the Hadoop daemons are running, the first step is to copy the executable to HDFS, so that the tasktrackers can find it when they launch map and reduce tasks:

% hadoop fs -put max_temperature bin/max_temperature

The sample data also needs to be copied from the local filesystem into HDFS.

Now we can run the job with the hadoop pipes command, using the -program argument to pass the URI of the executable in HDFS:

% hadoop pipes \
  -D hadoop.pipes.java.recordreader=true \
  -D hadoop.pipes.java.recordwriter=true \
  -input sample.txt \
  -output output \
  -program bin/max_temperature

We use the -D option to set two properties, hadoop.pipes.java.recordreader and hadoop.pipes.java.recordwriter, both to true. This says that we are not supplying a C++ record reader or record writer and want to use the default Java ones instead, which handle text input and output. Pipes also lets you set a Java mapper, reducer, combiner, or partitioner. In fact, any single job can mix Java and C++ classes.

The result is the same as that of the versions written in the other languages.

   