A Distributed Queue System Integration Case Based on ZooKeeper

Author: Zhang Dan (Conan)  Source: 粉丝日志  Published: 2015-1-4

Preface

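Software system integration has long been a hard problem in industry: integrating legacy systems more than 10 years old, integrating multiple systems after a company acquisition, integrating globally distributed systems, and so on. In theory, an SOA-based software architecture can solve all of these integration problems, but in practice some integration projects fail because they are too complex.

As technology has advanced, distributed cluster applications have gained better open-source support. ZooKeeper is one such solid platform for distributed coordination. This article uses a case study to show what ZooKeeper can do.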

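1. Project background: distributed message middleware

As Hadoop has become popular, more and more companies are building their own Hadoop systems. Sometimes different departments or teams inside one company each have their own Hadoop cluster. This multi-cluster approach lets every team have a Hadoop setup tailored to its needs and avoids the operational difficulty of one large, highly centralized cluster. When data volumes are not especially large, small clusters fit many scenarios.

Multiple small clusters also have a drawback: resources may be wasted. Every team's Hadoop cluster needs its own servers and operations staff. A strong team can build a Hadoop cluster that truly meets its specific requirements, while a weaker team may end up with a cluster that performs poorly.

At other times, several teams need to complete one task together. For example, team A computes a result on its Hadoop cluster and hands it to team B, which finishes its own part and hands the work on to team C. This resembles a workflow in a business system, passed along link by link until the last part is done.

In business systems we often solve this kind of problem with an SOA architecture: each team deploys its services on an ESB server, and task scheduling is done through message middleware. The same architecture works for coordinating multiple distributed Hadoop clusters, as long as the message middleware engine is replaced with one that supports distributed operation.

ZooKeeper can serve as this distributed message middleware and meet the business requirements described above. ZooKeeper is a high-performance distributed coordination product in the Hadoop family. It is a distributed, open-source coordination service designed for distributed applications. It mainly addresses data management problems that distributed applications frequently run into, reduces the difficulty of coordinating and managing distributed applications, and provides high-performance distributed services. For installing and using ZooKeeper, see the article "ZooKeeper伪分布式集群安装及使用" (ZooKeeper pseudo-distributed cluster installation and usage).

ZooKeeper provides distributed coordination services and does not depend on a Hadoop environment.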

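2. Requirements analysis: business system upgrade plan

Starting from a case study, I will explain how to design a distributed coordination platform.

2.1 Case introduction

A large software company works in supply chain management. Its main business areas include procurement management, accounts payable management, accounts receivable management, recurring supplier management (供应商反复管理), returns management, sales management, inventory management, e-commerce, and system integration.

The logic of each business area is complex, and each is developed and maintained by a separate department. The departments' systems had no need to communicate directly. Each department implemented its own functions, and data was shared through the database to exchange information between functions.

As the business grew, customers demanded faster response times, and sharing data through the database could no longer meet the information exchange requirements. The system went through its first upgrade: an enterprise service bus (ESB) was introduced to manage all internal business in a unified way. Services were published through WebServices, and business functions were scheduled through a Message Queue.

The company kept expanding and acquired several companies abroad. The business system changed from a centralized deployment in one data center to a distributed deployment across data centers worldwide. At that point the Message Queue could no longer meet the functional requirements of a business system spanning multiple data centers and regions. A distributed message middleware solution was needed to replace the existing message middleware service.

The system went through its second upgrade, adopting ZooKeeper as the scheduling engine for distributed middleware.

From this description we can see that as a company grows from small to large and from domestic to global business, its IT systems become more and more complex to keep pace: from the original master-slave database design, to the ESB extension, to a distributed ESB combined with a distributed messaging system. Every upgrade depends on support from software technology.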

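2.2 Functional requirements

Global procurement and global sales give the company a competitive advantage in the market. However, procurement and sales software is developed and maintained by different departments, and the business itself is spread across different countries and regions, so the month-end settlement workload is especially heavy.

For example, computing the profit statement (please do not dwell on the accuracy of the formula):

Monthly profit = monthly sales amount - monthly purchase amount - monthly other expenses
This formula is very simple, but for a multinational company with many departments, evaluating it is not simple at all.

From a system perspective, the procurement department aggregates purchase data (massive data), the sales department aggregates sales data (massive data), other departments aggregate other expenses (a small amount of summarized data), and finally the system computes the profit for the month.

Note that procurement is a standalone system, sales is another standalone system, and there are dozens of other systems large and small. How can multiple systems cooperate to solve this calculation?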

3. Architecture design: building a distributed coordination platform on ZooKeeper

Next, we build a distributed queue application on ZooKeeper to meet the functional requirements above. The ESB part is left out below, and only the ZooKeeper part is implemented.

Purchase data is massive and is stored and analyzed on Hadoop.

Sales data is massive and is stored and analyzed on Hadoop.

Other expenses are a small amount of data and are stored and analyzed in files or a database.

We design a synchronous queue with 3 condition nodes, corresponding to purchase, sell, and other (other expenses). Once all 3 nodes have been created, the program automatically triggers the profit calculation and creates the profit node. The 3 nodes may be created in any order, and each node can be created only once.
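The rule described above (three condition nodes, any order, each created once, trigger when all exist) can be sketched in plain Java without a ZooKeeper server. This is only an in-memory illustration under stated assumptions: the class `SyncQueueSketch` and its methods are hypothetical names, not part of the article's code or of the ZooKeeper API.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** In-memory stand-in for the /queue directory; hypothetical, not the ZooKeeper API. */
class SyncQueueSketch {
    private final Set<String> required;
    private final Set<String> created = new LinkedHashSet<String>();
    private boolean profitCreated = false;

    SyncQueueSketch(String... requiredNodes) {
        this.required = new LinkedHashSet<String>(Arrays.asList(requiredNodes));
    }

    /**
     * Creates a condition node. Returns true only for the single call that
     * completes the set, which stands in for creating /queue/profit.
     */
    synchronized boolean create(String node) {
        if (!required.contains(node)) {
            throw new IllegalArgumentException("unknown node: " + node);
        }
        if (!created.add(node)) {
            return false; // node already exists: a repeated submission is ignored
        }
        if (!profitCreated && created.containsAll(required)) {
            profitCreated = true;
            return true;
        }
        return false;
    }

    /** Number of condition nodes created so far. */
    synchronized int size() {
        return created.size();
    }

    public static void main(String[] args) {
        SyncQueueSketch queue = new SyncQueueSketch("purchase", "sell", "other");
        System.out.println(queue.create("sell"));     // false: 1 of 3
        System.out.println(queue.create("sell"));     // false: duplicate
        System.out.println(queue.create("other"));    // false: 2 of 3
        System.out.println(queue.create("purchase")); // true: 3 of 3, trigger profit
    }
}
```

The sketch makes "create once" a single atomic step. With a real ZooKeeper ensemble the comparable guarantee comes from the server, where a create call on an existing znode fails with a NodeExistsException.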

System environment

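1. Two independent Hadoop clusters

2. Two independent Java applications

3. Three ZooKeeper cluster nodes

Diagram legend:

1. Hadoop App1 and Hadoop App2 are 2 independent Hadoop cluster applications.

2. Java App3 and Java App4 are 2 independent Java applications.

3. zk1, zk2, and zk3 are the 3 connection points of the ZooKeeper cluster.

4. /queue is the znode directory for the queue. The queue length is assumed to be 3.

5. /queue/purchase is queue member No. 1 in the znode queue. It is submitted by Hadoop App1 and marks the purchase amount as computed.

6. /queue/sell is queue member No. 2 in the znode queue. It is submitted by Hadoop App2 and marks the sales amount as computed.

7. /queue/other is queue member No. 3 in the znode queue. It is submitted by Java App3 and marks the other expenses as computed.

8. /queue/profit is the profit node, whose creation is triggered when the znode queue is full.

9. Once /queue/profit is created, App4 is started, every zk connection notifies the synchronization program (red line) that the queue is complete, and all programs finish.

Additional notes:

1. The /queue/purchase, /queue/sell, and /queue/other directories can be created in any order. After a program submits, the corresponding child directory appears under /queue. App1 may submit through zk2, and App2 may submit through zk3. In principle, each application submits through the ZooKeeper server node that is closest to it in routing terms.

2. An application cannot submit more than once, and the profit task runs only after all 3 tasks have been submitted.

3. After /queue/profit is created, the ZooKeeper client application picks up the event through its watch and notifies the application that the queue is complete.

4. For a more detailed design rationale of this synchronous queue architecture, see the article "ZooKeeper实现分布式队列Queue" (Implementing a Distributed Queue with ZooKeeper).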

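4. Program development: programming against ZooKeeper

The final functional requirement: compute the profit for January 2013.

4.1 Test environment

In real enterprise development, the test environment should match the requirements. My hardware is limited, so I used a simplified setup.

1. The fully distributed ZooKeeper deployment of 3 cluster nodes on 3 servers is replaced by 3 cluster nodes on one server.

2. The 2 independent Hadoop clusters are replaced by 2 independent MapReduce jobs on one cluster.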

Development environment:

Win7 64bit
JDK 1.6
Maven3
Eclipse Juno Service Release 2
IP: 192.168.1.10

ZooKeeper server environment:

Linux Ubuntu 12.04 LTS 64bit
Java 1.6.0_29
Zookeeper: 3.4.5
IP: 192.168.1.201
3 cluster nodes

Hadoop server environment:

Linux Ubuntu 12.04 LTS 64bit
Java 1.6.0_29
Hadoop: 1.0.3
IP: 192.168.1.210

4.2 Test data

3 sets of test data:

Purchase data: purchase.csv
Sales data: sell.csv
Other expenses data: other.csv

4.2.1 Purchase data set

4 columns in total: product ID, quantity, unit price, purchase date.

1,26,1168,2013-01-08
2,49,779,2013-02-12
3,80,850,2013-02-05
4,69,1585,2013-01-26
5,88,1052,2013-01-13
6,84,2363,2013-01-19
7,64,1410,2013-01-12
8,53,910,2013-01-11
9,21,1661,2013-01-19
10,53,2426,2013-02-18
11,64,2022,2013-01-07
12,36,2941,2013-01-28
13,99,3819,2013-01-19
14,64,2563,2013-02-16
15,91,752,2013-02-05
16,65,750,2013-02-04
17,19,2426,2013-02-23
18,19,724,2013-02-05
19,87,137,2013-01-25
20,86,2939,2013-01-14
21,92,159,2013-01-23
22,81,2331,2013-03-01
23,88,998,2013-01-20
24,38,102,2013-02-22
25,32,4813,2013-01-13
26,36,1671,2013-01-19

4.2.2 Sales data set

4 columns in total: product ID, quantity sold, unit sale price, sale date.

1,14,1236,2013-01-14
2,19,808,2013-03-06
3,26,886,2013-02-23
4,23,1793,2013-02-09
5,27,1206,2013-01-21
6,27,2648,2013-01-30
7,22,1502,2013-01-19
8,20,1050,2013-01-18
9,13,1778,2013-01-30
10,20,2718,2013-03-14
11,22,2175,2013-01-12
12,16,3284,2013-02-12
13,30,4152,2013-01-30
14,22,2770,2013-03-11
15,28,778,2013-02-23
16,22,874,2013-02-22
17,12,2718,2013-03-22
18,12,747,2013-02-23
19,27,172,2013-02-07
20,27,3282,2013-01-22
21,28,224,2013-02-05
22,26,2613,2013-03-30
23,27,1147,2013-01-31
24,16,141,2013-03-20
25,15,5343,2013-01-21
26,16,1887,2013-01-30
27,12,2535,2013-01-12
28,16,469,2013-01-07
29,29,2395,2013-03-30
30,17,1549,2013-01-30
31,25,4173,2013-03-17

4.2.3 Other expenses data set

2 columns in total: date incurred, amount incurred.

2013-01-02,552
2013-01-03,1092
2013-01-04,1794
2013-01-05,435
2013-01-06,960
2013-01-07,1066
2013-01-08,1354
2013-01-09,880
2013-01-10,1992
2013-01-11,931
2013-01-12,1209
2013-01-13,1491
2013-01-14,804
2013-01-15,480
2013-01-16,1891
2013-01-17,156
2013-01-18,1439
2013-01-19,1018
2013-01-20,1506
2013-01-21,1216
2013-01-22,2045
2013-01-23,400
2013-01-24,1795
2013-01-25,1977
2013-01-26,1002
2013-01-27,226
2013-01-28,1239
2013-01-29,702
2013-01-30,1396

4.3 Program design

We need to write 5 files:

1. Compute the purchase amount: Purchase.java

2. Compute the sales amount: Sell.java

3. Compute other expenses: Other.java

4. Compute the profit: Profit.java

5. ZooKeeper scheduling: ZooKeeperJob.java

4.3.1 Computing the purchase amount

The purchase amount is computed with a Hadoop MapReduce job.

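// HdfsDAO is a helper class that is not part of Hadoop and is not shown in this
// article; its import depends on your project layout.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Purchase {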

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class PurchaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// keep January 2013 rows only
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);// quantity * unit price
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class PurchaseReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");

        // Initialize the purchase input directory on HDFS
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Purchase.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Remote configuration for the Hadoop cluster
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map<String, String> path(){
        Map<String, String> path = new HashMap<String, String>();
        path.put("purchase", "logfile/biz/purchase.csv");// local data file
        path.put("input", HDFS + "/user/hdfs/biz/purchase");// HDFS input directory
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // output directory
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}
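
The filter-and-sum logic of the mapper and reducer can be tried locally without a Hadoop cluster. The following is a minimal sketch under stated assumptions: `PurchaseLocalSketch` and `monthlyAmount` are hypothetical names introduced for illustration, and the three input rows are copied from the purchase data set in section 4.2.1.

```java
import java.util.regex.Pattern;

/** Local, Hadoop-free illustration of the filter-and-sum step; hypothetical helper. */
class PurchaseLocalSketch {
    private static final Pattern DELIMITER = Pattern.compile("[\t,]");

    /** Sums quantity * unit price over the rows whose date starts with the given month. */
    static int monthlyAmount(String[] rows, String month) {
        int total = 0;
        for (String row : rows) {
            String[] tokens = DELIMITER.split(row);
            if (tokens[3].startsWith(month)) { // "map" side: keep only the target month
                // "reduce" side: accumulate the per-row amount
                total += Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        String[] rows = {
            "1,26,1168,2013-01-08",
            "2,49,779,2013-02-12",
            "4,69,1585,2013-01-26"
        };
        // 26 * 1168 + 69 * 1585 = 30368 + 109365 = 139733
        System.out.println(monthlyAmount(rows, "2013-01"));
    }
}
```

The February row is skipped by the month filter, which mirrors what the mapper does before the reducer adds up the remaining amounts.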

4.3.2 Computing the sales amount

The sales amount is computed with a Hadoop MapReduce job.

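// HdfsDAO is a helper class that is not part of Hadoop and is not shown in this
// article; its import depends on your project layout.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Sell {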

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class SellMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// keep January 2013 rows only
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);// quantity * unit price
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class SellReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");

        // Initialize the sell input directory on HDFS
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Sell.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Remote configuration for the Hadoop cluster
        JobConf conf = new JobConf(Sell.class);
        conf.setJobName("sell");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map<String, String> path(){
        Map<String, String> path = new HashMap<String, String>();
        path.put("sell", "logfile/biz/sell.csv");// local data file
        path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS input directory
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // output directory
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.3 Computing other expenses

Other expenses are computed from a local file.

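import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.regex.Pattern;

public class Other {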

    public static String file = "logfile/biz/other.csv";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2013-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    public static int calcOther(String file) throws IOException {
        int money = 0;
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));

        String s = null;
        while ((s = br.readLine()) != null) {
            // System.out.println(s);
            String[] tokens = DELIMITER.split(s);
            if (tokens[0].startsWith(month)) {// keep January 2013 rows only
                money += Integer.parseInt(tokens[1]);
            }
        }
        br.close();

        System.out.println("Output:" + month + "," + money);
        return money;
    }
}

4.3.4 Computing the profit

The profit calculation is triggered automatically through ZooKeeper's distributed scheduling.

import java.io.IOException;

public class Profit {

    public static void main(String[] args) throws Exception {
        profit();
    }

    public static void profit() throws Exception {
        int sell = getSell();
        int purchase = getPurchase();
        int other = getOther();
        int profit = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
    }

    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
    }

    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
    }

    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }

}
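
The arithmetic that Profit performs can be re-checked without HDFS. This is a minimal sketch under stated assumptions: `ProfitCheckSketch` is a hypothetical name, the input strings assume that each part-r-00000 file holds a single integer followed by a newline, and the three amounts are the ones reported in the run log in section 5.3.

```java
/** Stand-alone check of the profit arithmetic; hypothetical helper, no HDFS access. */
class ProfitCheckSketch {

    /** Parses the content of one reducer output file (assumed: one integer plus newline). */
    static int parseAmount(String partFileContent) {
        return Integer.parseInt(partFileContent.trim());
    }

    /** profit = sell - purchase - other, as in the formula of section 2.2. */
    static int profit(int sell, int purchase, int other) {
        return sell - purchase - other;
    }

    public static void main(String[] args) {
        int sell = parseAmount("2950315\n");
        int purchase = parseAmount("9609887\n");
        int other = 34193;
        // 2950315 - 9609887 - 34193 = -6693765
        System.out.println(profit(sell, purchase, other));
    }
}
```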

4.3.5 ZooKeeper scheduling

Scheduling is handled by a distributed queue system, so that an automated program replaces manual operation.

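import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperJob {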

    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            doPurchase(zk);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            doSell(zk);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            doOther(zk);
            break;
        }
    }

    // Open a connection to a ZooKeeper server
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // Receives every event triggered on this connection
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true);

        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {

            Purchase.run(Purchase.path());

            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {

            Sell.run(Sell.path());

            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {

            Other.calcOther(Other.file);

            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List<String> children = zk.getChildren(QUEUE, true);
        int length = children.size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            for (String child : children) {// clear the condition nodes
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}
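
In the class above, initQueue registers a watch on /queue/profit with zk.exists(PROFIT, true), and the Watcher prints a message when the node is created. Standard watches set this way are one-time triggers. The toy model below illustrates only that "created" case in memory. It is a sketch under stated assumptions: `WatchModelSketch` and `Listener` are hypothetical names and do not reproduce the ZooKeeper client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of one-time "node created" watches; not the ZooKeeper API. */
class WatchModelSketch {
    interface Listener {
        void nodeCreated(String path);
    }

    private final Set<String> nodes = new HashSet<String>();
    private final Map<String, List<Listener>> watches = new HashMap<String, List<Listener>>();

    /** Reports whether the node exists and leaves a one-time watch on the path. */
    boolean exists(String path, Listener listener) {
        List<Listener> pending = watches.get(path);
        if (pending == null) {
            pending = new ArrayList<Listener>();
            watches.put(path, pending);
        }
        pending.add(listener);
        return nodes.contains(path);
    }

    /** Creates the node and fires each pending watch once; false if it already exists. */
    boolean create(String path) {
        if (!nodes.add(path)) {
            return false;
        }
        List<Listener> pending = watches.remove(path); // removing makes the watch one-shot
        if (pending != null) {
            for (Listener listener : pending) {
                listener.nodeCreated(path);
            }
        }
        return true;
    }

    void delete(String path) {
        nodes.remove(path);
    }

    public static void main(String[] args) {
        final List<String> events = new ArrayList<String>();
        WatchModelSketch model = new WatchModelSketch();
        model.exists("/queue/profit", path -> events.add(path));
        model.create("/queue/profit"); // fires the watch
        model.delete("/queue/profit");
        model.create("/queue/profit"); // no watch left, nothing fires
        System.out.println(events.size()); // prints 1
    }
}
```

Because the watch is consumed when it fires, a client that wants to be notified again has to register a new watch, which is why each run of the job calls initQueue before doing its work.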

5. ÔËÐгÌÐò

×îºó£¬ÎÒÃÇÔËÐÐÕû¸öµÄ³ÌÐò£¬°üÀ¨3¸ö²¿·Ö¡£

zookeeper·þÎñÆ÷

hadoop·þÎñÆ÷

·Ö²½Ê½¶ÓÁÐÓ¦ÓÃ

5.1 Æô¶¯zookeeper·þÎñ

Æô¶¯zookeeper·þÎñÆ÷¼¯Èº£º

~ cd toolkit/zookeeper345

# Æô¶¯zk¼¯Èº3¸ö½Úµã
~ bin/zkServer.sh start conf/zk1.cfg
~ bin/zkServer.sh start conf/zk2.cfg
~ bin/zkServer.sh start conf/zk3.cfg

~ jps
4234 QuorumPeerMain
5002 Jps
4275 QuorumPeerMain
4207 QuorumPeerMain

Check the status of each node in the ZooKeeper cluster:

# Check the status of node zk1
~ bin/zkServer.sh status conf/zk1.cfg
JMX enabled by default
Using config: conf/zk1.cfg
Mode: follower

# Check the status of node zk2; zk2 is the leader
~ bin/zkServer.sh status conf/zk2.cfg
JMX enabled by default
Using config: conf/zk2.cfg
Mode: leader

# Check the status of node zk3
~ bin/zkServer.sh status conf/zk3.cfg
JMX enabled by default
Using config: conf/zk3.cfg
Mode: follower

Start the ZooKeeper client:

~ bin/zkCli.sh -server 192.168.1.201:2181

# List the root of zk
[zk: 192.168.1.201:2181(CONNECTED) 0] ls /
[queue, queue-fifo, zookeeper]

# The /queue path has no child directories
[zk: 192.168.1.201:2181(CONNECTED) 1] ls /queue
[]

5.2 Starting the Hadoop service

~ cd hadoop/hadoop-1.0.3
~ bin/start-all.sh

~ jps
25979 JobTracker
26257 TaskTracker
25576 DataNode
25300 NameNode
12116 Jps
25875 SecondaryNameNode

5.3 Starting the distributed queue ZooKeeperJob

5.3.1 Start the purchase statistics program with startup argument 1

Only the user log is shown; system log output is omitted.

WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
copy from: logfile/biz/purchase.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Output:2013-01,9609887
create /queue/purchase
Queue Complete:1/3

Check the queue directory in zk:

[zk: 192.168.1.201:2181(CONNECTED) 3] ls /queue
[purchase]

5.3.2 Start the sales statistics program with startup argument 2

Only the user log is shown; system log output is omitted.

WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
copy from: logfile/biz/sell.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Output:2013-01,2950315
create /queue/sell
Queue Complete:2/3

Check the queue directory in zk:

[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[purchase, sell]

5.3.3 Start the other-expenses statistics program with startup argument 3

Only the user log is shown; system log output is omitted.

WATCH => /queue/profit
/queue is exist!
Output:2013-01,34193
create /queue/other
Queue Complete:3/3
create /queue/profit
cat: hdfs://192.168.1.210:9000/user/hdfs/biz/sell/output/part-r-00000
2950315

cat: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase/output/part-r-00000
9609887

Output:2013-01,34193
profit = sell - purchase - other = 2950315 - 9609887 - 34193 = -6693765
Queue has Completed!!!

Check the queue directory in zk:

[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[profit]

In the last step, after the other-expenses program ran, the log shows that all 3 condition nodes were satisfied. The synchronous distributed queue then automatically started the profit calculation and printed the profit for January 2013 in the log: -6693765.

   