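Recently I used some spare time to study Storm again and compared it carefully with Hadoop. Storm is better at real-time stream processing, while Hadoop is better at offline analysis on top of HDFS using MapReduce; Hadoop itself is not well suited to real-time analysis. What the two have in common is a distributed architecture with a master/slave style of organization. In this article I will not go into the details of deploying the Storm and Zookeeper clusters. Instead, I want to use a practical case to show how Storm can be used to analyze and process data in real time.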
Storm is an open-source distributed real-time computation system hosted by Apache; its predecessor was Twitter Storm. Before Storm, large volumes of real-time data were mostly processed with message queues plus worker processes or threads. This made such applications unusually complex to build: much of the code had to deal with sending and receiving messages, concurrency control between threads, and so on, while the actual business logic was only a small part of the application and was hard to decouple. Storm changed this. It first introduces the abstraction of a Stream, which is an unbounded sequence of tuples. It then introduces Spouts and Bolts. A Spout is a data source in Storm and is responsible for producing streams. A Bolt takes streams as input and produces new streams as output, and it also specifies how its input streams should be partitioned. Finally, a Topology organizes a number of Spouts and Bolts into a distributed data-processing network. Storm was deliberately designed so that the topology made of Spouts and Bolts is wrapped through Thrift, which allows Spout and Bolt components to be implemented in most mainstream languages and improves the compatibility and extensibility of the framework.
A Topology in Storm is very similar to a MapReduce Job in Hadoop. The difference is that once a Storm topology is started it keeps running until you kill it, whereas a MapReduce Job eventually finishes. This model makes Storm well suited to real-time data analysis, continuous computation, DRPC (distributed RPC), and similar workloads.
With that background, let me walk through a practical case and analyze how Storm can improve an application's processing performance.
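A mobile operator's spam SMS monitoring platform uploads, in real time, files containing the message content of suspected spam SMS users for each province. Each province parses these files, filters out the messages that contain specified sensitive keywords, and stores them in a database. Users whose messages are stored are listed as sensitive users and become key monitoring targets; after all, sending such spam is simply wrong. The platform generates files at a remarkable rate. The traditional approach was to run one independent application per city in each province, which parsed the files serially, filtered them by sensitive keyword, and wrote the results to the database. In practice this was not efficient: files often piled up and were not stored in time.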
Now let's use Storm to reorganize this scenario.
First, the deployment of the Storm cluster and the Zookeeper cluster in this case is shown in the figure below:
The Storm master node, Nimbus, runs on host 192.168.95.134. The two slave nodes (Supervisors) run on 192.168.95.135 (hostname: slave1) and 192.168.95.136 (hostname: slave2). The Zookeeper cluster is deployed on the same nodes. The Storm and Zookeeper clusters communicate with each other, because Storm relies on Zookeeper. Start the Zookeeper service on every node first, then start the Storm Nimbus and Supervisor services. You can start them from the bin directory of the Storm installation with storm nimbus > /dev/null 2>&1 & and storm supervisor > /dev/null 2>&1 & respectively, then use jps to check that they are running. If everything is fine, start the Storm UI service on the Nimbus host by running storm ui > /dev/null 2>&1 & in the same bin directory. Then open http://{IP of the Nimbus host}:8080 in a browser, which here is http://192.168.95.134:8080/, and check the state of the Storm cluster, as shown in the figure below:
Our Storm version is 0.9.5. It has two slave nodes (Supervisors), slave1 and slave2, and 8 workers in total (Total slots). The Storm cluster is now deployed and running. Next, let's rewrite the real-time sensitive-information monitoring and filtering application the Storm way. First, look at the topology diagram:
SensitiveFileReader-591 and SensitiveFileReader-592 (the user SMS collectors, one per city) are Spout components in Storm. A Spout represents a data source; here it reads the spam content files of suspected spam SMS users from a specified directory on the server. You can of course add as many Spouts as your requirements call for.
After each line of a file has been read, it is handed to the component that analyzes the content: SensitiveFileAnalyzer (monitored SMS content parsing), which parses the line according to the file format.
To keep the demonstration simple, I define the file format as follows (an arbitrary example): home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1. The columns are joined with &. home_city=591 is the home city code of the suspected spam user, where 591 is Fuzhou and 592 is Xiamen; user_id=5911000 is the user identifier; msisdn=10000 is the user's mobile number; and sms_content=abc-slave1 is the content of the spam message. SensitiveFileAnalyzer is a Bolt component in Storm, used to process the data that "flows" out of the Spouts.
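The record format described above can be parsed with nothing but the JDK. The following is a minimal sketch, assuming that values never contain & and that each column has the form key=value; the class name SmsLineParser is hypothetical and is not part of the project shown later, which uses a Guava Splitter instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project.
// Parses one record of the form key1=value1&key2=value2.
final class SmsLineParser {

    private SmsLineParser() {
    }

    static Map<String, String> parse(String line) {
        // LinkedHashMap keeps the columns in the order they appear in the line.
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : line.split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                // Skip fragments that have no '=' or an empty key.
                continue;
            }
            fields.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1";
        System.out.println(parse(line));
    }
}
```

A sketch like this silently drops malformed fragments; whether to skip, log, or reject such lines is a design choice that depends on how much you trust the upstream files.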
Finally, the parsed data is matched against the sensitive keywords defined by the business rules, and the matching records are stored. Here the filtered data goes into a MySQL database. The component responsible is SensitiveBatchBolt (sensitive information collection), which is also a Bolt component in Storm. That is the complete Storm topology.
Now that we have an overall picture of the topology for collecting, filtering, and monitoring sensitive information, let's look at the implementation. First, the code structure of the project, as shown in the figure below:
First, the data structure for sensitive users, RubbishUsers. Suppose the messages we want to filter are those that contain sensitive keywords such as "racketeer" and "Bad". The code is as follows:
/**
 * @filename:RubbishUsers.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Sensitive user entity definition
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.storm.model;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

import java.io.Serializable;

public class RubbishUsers implements Serializable {
    // Home city code of the user
    private Integer homeCity;
    // User ID
    private Integer userId;
    // User phone number
    private Integer msisdn;
    // SMS content
    private String smsContent;

    // Column names used in the input files
    public final static String HOMECITY_COLUMNNAME = "home_city";
    public final static String USERID_COLUMNNAME = "user_id";
    public final static String MSISDN_COLUMNNAME = "msisdn";
    public final static String SMSCONTENT_COLUMNNAME = "sms_content";

    public final static Integer[] SENSITIVE_HOMECITYS = new Integer[] {
            591 /* Fuzhou */, 592 /* Xiamen */ };

    // Sensitive keywords. These could later be moved into a cache or a database;
    // they are hard-coded here for the demo only.
    public final static String SENSITIVE_KEYWORD1 = "Bad";
    public final static String SENSITIVE_KEYWORD2 = "racketeer";
    public final static String[] SENSITIVE_KEYWORDS = new String[] {
            SENSITIVE_KEYWORD1, SENSITIVE_KEYWORD2 };

    public Integer getHomeCity() {
        return homeCity;
    }

    public void setHomeCity(Integer homeCity) {
        this.homeCity = homeCity;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getMsisdn() {
        return msisdn;
    }

    public void setMsisdn(Integer msisdn) {
        this.msisdn = msisdn;
    }

    public String getSmsContent() {
        return smsContent;
    }

    public void setSmsContent(String smsContent) {
        this.smsContent = smsContent;
    }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("homeCity", homeCity).append("userId", userId)
                .append("msisdn", msisdn).append("smsContent", smsContent)
                .toString();
    }
}
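One thing to keep in mind about the entity above: msisdn is stored as an Integer, which is enough for the demo value 10000 but not for every phone number. The sketch below shows the limit, assuming a hypothetical 11-digit sample number; the class MsisdnRangeDemo is illustrative only and is not part of the project.

```java
// Hypothetical demo class, not part of the original project.
final class MsisdnRangeDemo {

    private MsisdnRangeDemo() {
    }

    // Returns true when the digit string can be parsed as a 32-bit int.
    static boolean fitsInInteger(String digits) {
        try {
            Integer.parseInt(digits);
            return true;
        } catch (NumberFormatException e) {
            // Thrown for non-numeric input and for values above Integer.MAX_VALUE.
            return false;
        }
    }

    public static void main(String[] args) {
        String demoValue = "10000";          // value from the article's sample line
        String elevenDigits = "13800138000"; // assumed sample of an 11-digit number
        System.out.println(fitsInInteger(demoValue));
        System.out.println(fitsInInteger(elevenDigits));
        // A long has enough range for an 11-digit number.
        System.out.println(Long.parseLong(elevenDigits));
    }
}
```

If you adapt the demo to real numbers, switching the field to Long (or String) would also mean changing the corresponding type in the Hibernate mapping file and the database column.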
Now let's look at the implementation of the data source component, SensitiveFileReader. It reads the spam content files of suspected spam SMS users from a specified directory on the server and sends each line to the next Bolt (SensitiveFileAnalyzer). After a file has been sent completely, it is renamed in the same directory with a timestamp and the suffix .bak (you could also create a separate backup directory for processed files). The implementation of SensitiveFileReader is as follows:
/**
 * @filename:SensitiveFileReader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:User SMS collector
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.storm.spout;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SensitiveFileReader extends BaseRichSpout {
    // Upload path for sensitive SMS files of Fuzhou users
    public static final String InputFuZhouPath = "/home/tj/data/591";
    // Upload path for sensitive SMS files of Xiamen users
    public static final String InputXiaMenPath = "/home/tj/data/592";
    // Files processed successfully are renamed with the .bak suffix
    public static final String FinishFileSuffix = ".bak";

    private String sensitiveFilePath = "";
    private SpoutOutputCollector collector;

    public SensitiveFileReader(String sensitiveFilePath) {
        this.sensitiveFilePath = sensitiveFilePath;
    }

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // List every file in the directory that does not yet carry the .bak suffix
        // (null directory filter: subdirectories are not searched).
        Collection<File> files = FileUtils.listFiles(
                new File(sensitiveFilePath),
                FileFilterUtils.notFileFilter(FileFilterUtils
                        .suffixFileFilter(FinishFileSuffix)), null);

        for (File f : files) {
            try {
                List<String> lines = FileUtils.readLines(f, "GBK");
                for (String line : lines) {
                    System.out.println("[SensitiveTrace]:" + line);
                    // Emit one tuple per line
                    collector.emit(new Values(line));
                }
                // Mark the file as processed: <name><timestamp>.bak
                FileUtils.moveFile(f,
                        new File(f.getPath() + System.currentTimeMillis()
                                + FinishFileSuffix));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sensitive"));
    }
}
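The scan, read, and rename cycle of the Spout can be tried out without a Storm cluster. The following is a JDK-only sketch of the same idea, under a few assumptions that differ from the listing above: it uses java.nio instead of Commons IO, reads files as UTF-8 rather than GBK, and returns the lines instead of emitting tuples. PendingFileScanner and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical JDK-only sketch; names are not from the original project.
final class PendingFileScanner {
    static final String FINISH_SUFFIX = ".bak";

    private PendingFileScanner() {
    }

    // Returns the lines of all unprocessed files in dir, then marks those files as done.
    static List<String> drain(Path dir) {
        List<String> lines = new ArrayList<>();
        try {
            // Snapshot the pending files first so the renames do not disturb the listing.
            List<Path> pending = new ArrayList<>();
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
                for (Path p : entries) {
                    boolean done = p.getFileName().toString().endsWith(FINISH_SUFFIX);
                    if (Files.isRegularFile(p) && !done) {
                        pending.add(p);
                    }
                }
            }
            for (Path p : pending) {
                lines.addAll(Files.readAllLines(p, StandardCharsets.UTF_8));
                String doneName = p.getFileName().toString()
                        + System.currentTimeMillis() + FINISH_SUFFIX;
                Files.move(p, p.resolveSibling(doneName));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Builds a throwaway directory with one pending file and one processed file.
    static Path newDemoDir() {
        try {
            Path dir = Files.createTempDirectory("sms-demo");
            Files.write(dir.resolve("a.txt"), Arrays.asList(
                    "home_city=591&user_id=1&msisdn=10000&sms_content=Bad",
                    "home_city=592&user_id=2&msisdn=10001&sms_content=ok"),
                    StandardCharsets.UTF_8);
            Files.write(dir.resolve("old.bak"), Arrays.asList("already processed"),
                    StandardCharsets.UTF_8);
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = newDemoDir();
        System.out.println(drain(dir).size()); // lines from a.txt only
        System.out.println(drain(dir).size()); // nothing left on the second pass
    }
}
```

Renaming inside the same directory keeps the sketch close to the article's approach. When several readers watch the same directory, each of them may pick up the same file before it is renamed, so a claim step (for example, renaming the file before reading it) is worth considering.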
SensitiveFileAnalyzer, the monitored SMS content parser, is a Bolt component. After receiving data from the SensitiveFileReader data source, it parses each line of the file according to the format defined above and sends the parsed fields on to the next Bolt, SensitiveBatchBolt (sensitive information collection). Here is the implementation of SensitiveFileAnalyzer:
/**
 * @filename:SensitiveFileAnalyzer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Monitored SMS content parsing
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.storm.bolt;

import java.util.Map;

import newlandframework.storm.model.RubbishUsers;

import org.apache.storm.guava.base.Splitter;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SensitiveFileAnalyzer extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);

        // Split "k1=v1&k2=v2..." into a key/value map
        Map<String, String> join = Splitter.on("&")
                .withKeyValueSeparator("=").split(line);

        // Emit the values in the same order as the declared output fields
        collector.emit(new Values(
                join.get(RubbishUsers.HOMECITY_COLUMNNAME),
                join.get(RubbishUsers.USERID_COLUMNNAME),
                join.get(RubbishUsers.MSISDN_COLUMNNAME),
                join.get(RubbishUsers.SMSCONTENT_COLUMNNAME)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(RubbishUsers.HOMECITY_COLUMNNAME,
                RubbishUsers.USERID_COLUMNNAME, RubbishUsers.MSISDN_COLUMNNAME,
                RubbishUsers.SMSCONTENT_COLUMNNAME));
    }
}
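The analyzer looks up four keys in the parsed map. If a line lacks one of them, the lookup yields null, and the next Bolt will later fail when it tries to convert that value to a number. A small pre-check can make such lines visible. The sketch below is one possible way to do it, using only the JDK; RequiredFieldCheck is a hypothetical name, and the column names are copied from RubbishUsers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original project.
final class RequiredFieldCheck {
    // Same column names as the *_COLUMNNAME constants in RubbishUsers.
    static final List<String> REQUIRED =
            Arrays.asList("home_city", "user_id", "msisdn", "sms_content");

    private RequiredFieldCheck() {
    }

    // Returns the names of required columns that are absent from the parsed line.
    static List<String> missingFields(Map<String, String> fields) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!fields.containsKey(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<>();
        fields.put("home_city", "591");
        fields.put("user_id", "5911000");
        fields.put("msisdn", "10000");
        // sms_content is deliberately left out
        System.out.println(missingFields(fields));
    }
}
```

In a Bolt, a non-empty result could be used to log and skip the line instead of emitting a tuple with null values.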
The last Bolt, SensitiveBatchBolt (sensitive information collection), takes the data sent by the upstream Bolt SensitiveFileAnalyzer and matches it against the sensitive keywords defined by the business rules. A match means that this is a user we need to monitor closely, so the record is stored in the MySQL database through Hibernate for centralized management. Note also that SensitiveBatchBolt implements a monitoring function: it periodically prints the sensitive users collected so far. Here is the implementation of SensitiveBatchBolt:
/**
 * @filename:SensitiveBatchBolt.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Sensitive information collection
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.storm.bolt;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.iterators.FilterIterator;
import org.apache.commons.lang.StringUtils;
import org.hibernate.Criteria;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.criterion.MatchMode;
import org.hibernate.criterion.Restrictions;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import newlandframework.storm.model.RubbishUsers;

public class SensitiveBatchBolt implements IBasicBolt {
    // Location of the Spring/Hibernate configuration
    private final static String HIBERNATE_APPLICATIONCONTEXT = "newlandframework/storm/resource/jdbc-hibernate-bean.xml";

    // The Spring and Hibernate contexts must not be serialized
    private static transient ApplicationContext hibernate = new ClassPathXmlApplicationContext(
            HIBERNATE_APPLICATIONCONTEXT);

    private static transient SessionFactory sessionFactory = (SessionFactory) hibernate
            .getBean("sessionFactory");

    public SensitiveBatchBolt() throws SQLException {
        super();
    }

    // Sensitive keyword source; could be loaded from a cache or a database instead
    private static List<String> list = new ArrayList<String>(
            Arrays.asList(RubbishUsers.SENSITIVE_KEYWORDS));

    // Accepts a keyword when the SMS content contains it
    private class SensitivePredicate implements Predicate {
        // Despite its name, this field holds the SMS content being checked
        private String sensitiveWord = null;

        SensitivePredicate(String sensitiveWord) {
            this.sensitiveWord = sensitiveWord;
        }

        // object is one keyword from the keyword list
        public boolean evaluate(Object object) {
            return this.sensitiveWord.contains((String) object);
        }
    }

    // Monitor thread: periodically prints the collection status
    class SensitiveMonitorThread implements Runnable {
        private int sensitiveMonitorTimeInterval = 0;
        private Session session = null;

        SensitiveMonitorThread(int sensitiveMonitorTimeInterval) {
            this.sensitiveMonitorTimeInterval = sensitiveMonitorTimeInterval;
            session = sessionFactory.openSession();
        }

        public void run() {
            while (true) {
                try {
                    // Query: (smsContent like KEYWORD1 or smsContent like KEYWORD2)
                    // and homeCity in (591, 592)
                    Criteria criteria1 = session.createCriteria(RubbishUsers.class);

                    criteria1.add(Restrictions.and(Restrictions.or(Restrictions
                            .like("smsContent", StringUtils
                                    .center(RubbishUsers.SENSITIVE_KEYWORD1,
                                            RubbishUsers.SENSITIVE_KEYWORD1
                                                    .length() + 2, "%"),
                                    MatchMode.ANYWHERE), Restrictions.like(
                            "smsContent", StringUtils
                                    .center(RubbishUsers.SENSITIVE_KEYWORD2,
                                            RubbishUsers.SENSITIVE_KEYWORD2
                                                    .length() + 2, "%"),
                            MatchMode.ANYWHERE)), Restrictions.in("homeCity",
                            RubbishUsers.SENSITIVE_HOMECITYS)));

                    List<RubbishUsers> rubbishList = (List<RubbishUsers>) criteria1.list();

                    System.out.println(StringUtils.center(
                            "[SensitiveTrace sensitive user list]", 40, "-"));

                    if (rubbishList != null) {
                        System.out.println("[SensitiveTrace sensitive user count]:"
                                + rubbishList.size());
                        for (RubbishUsers rubbish : rubbishList) {
                            System.out.println(rubbish + rubbish.getSmsContent());
                        }
                    } else {
                        System.out.println("[SensitiveTrace sensitive user count]:0");
                    }
                } catch (HibernateException e) {
                    e.printStackTrace();
                }

                try {
                    Thread.sleep(sensitiveMonitorTimeInterval * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Access must be synchronized in a distributed environment
    // (note: synchronized only serializes calls on this bolt instance within one JVM)
    private synchronized void save(Tuple input) {
        Session session = sessionFactory.openSession();

        try {
            RubbishUsers users = new RubbishUsers();
            users.setUserId(Integer.parseInt(input
                    .getStringByField(RubbishUsers.USERID_COLUMNNAME)));
            users.setHomeCity(Integer.parseInt(input
                    .getStringByField(RubbishUsers.HOMECITY_COLUMNNAME)));
            users.setMsisdn(Integer.parseInt(input
                    .getStringByField(RubbishUsers.MSISDN_COLUMNNAME)));
            users.setSmsContent(input
                    .getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME));

            // Keep only the keywords that occur in this message
            Predicate isSensitiveFileAnalysis = new SensitivePredicate(
                    input.getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME));

            FilterIterator iterator = new FilterIterator(list.iterator(), isSensitiveFileAnalysis);

            // At least one keyword matched
            if (iterator.hasNext()) {
                session.beginTransaction();
                // Store in MySQL
                session.save(users);
                session.getTransaction().commit();
            }
        } catch (HibernateException e) {
            e.printStackTrace();
            session.getTransaction().rollback();
        } finally {
            session.close();
        }
    }

    // Many runtime errors in Storm are caused by exceptions thrown from execute,
    // so pay close attention to the logic of execute.
    // The most common report is: ERROR backtype.storm.daemon.executor
    // - java.lang.RuntimeException:java.lang.NullPointerException
    // backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java ...)
    // Errors like this are puzzling: everything runs normally at first, then a
    // NullPointerException suddenly appears. I first suspected the Storm deployment,
    // but tracing with jstack showed that the problem was in the execute logic.
    // So do not panic when you hit this kind of problem; use jstack to locate it.
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        save(input);
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        final int sensitiveMonitorTimeInterval = Integer.parseInt(stormConf
                .get("RUBBISHMONITOR_INTERVAL").toString());

        SensitiveMonitorThread montor = new SensitiveMonitorThread(
                sensitiveMonitorTimeInterval);

        new Thread(montor).start();
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // TODO Auto-generated method stub
    }
}
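The keyword check inside save is spread over a Predicate and a FilterIterator, which makes the actual rule easy to miss: a message is stored when its content contains at least one keyword. The following JDK-only sketch states the same rule directly. KeywordMatcher is a hypothetical class used for illustration, not a replacement proposed by the original project.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original project.
final class KeywordMatcher {

    private KeywordMatcher() {
    }

    // True when content contains at least one keyword (case-sensitive).
    static boolean containsAny(String content, List<String> keywords) {
        if (content == null) {
            return false;
        }
        for (String keyword : keywords) {
            if (content.contains(keyword)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> keywords = Arrays.asList("Bad", "racketeer");
        System.out.println(containsAny("a racketeer wrote this", keywords));
        System.out.println(containsAny("abc-slave1", keywords));
    }
}
```

Like String.contains in the Bolt, this comparison is case-sensitive, so "bad" would not match the keyword "Bad". Whether that is the desired behavior depends on the business rules.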
Because the records are stored in MySQL through Hibernate, here is the Hibernate configuration, starting with hibernate.cfg.xml:
<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE hibernate-configuration PUBLIC "-//Hibernate/Hibernate Configuration DTD 3.0//EN" "http://hibernate.sourceforge.net/hibernate-configuration-3.0.dtd">
<hibernate-configuration>
    <session-factory>
        <property name="hibernate.bytecode.use_reflection_optimizer">false</property>
        <property name="hibernate.dialect">org.hibernate.dialect.MySQLDialect</property>
        <property name="show_sql">true</property>
        <mapping resource="newlandframework/storm/resource/rubbish-users.hbm.xml"/>
    </session-factory>
</hibernate-configuration>
The corresponding ORM mapping file, rubbish-users.hbm.xml, is as follows:
<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN" "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">
<hibernate-mapping>
    <class name="newlandframework.storm.model.RubbishUsers" table="rubbish_users" catalog="ccs">
        <id name="userId" type="java.lang.Integer">
            <column name="user_id"/>
            <generator class="assigned"/>
        </id>
        <property name="homeCity" type="java.lang.Integer">
            <column name="home_city" not-null="true"/>
        </property>
        <property name="msisdn" type="java.lang.Integer">
            <column name="msisdn" not-null="true"/>
        </property>
        <property name="smsContent" type="java.lang.String">
            <column name="sms_content" not-null="true"/>
        </property>
    </class>
</hibernate-mapping>
Finally, Hibernate is integrated through Spring, with DBCP as the database connection pool. The corresponding Spring configuration file, jdbc-hibernate-bean.xml, is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/tx
       http://www.springframework.org/schema/tx/spring-tx-2.5.xsd"
       default-autowire="byType" default-lazy-init="false">
    <bean id="placeholder"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>newlandframework/storm/resource/jdbc.properties</value>
            </list>
        </property>
    </bean>
    <bean id="dbcpDataSource"
          class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="${database.driverClassName}"/>
        <property name="url" value="${database.url}"/>
        <property name="username" value="${database.username}"/>
        <property name="password" value="${database.password}"/>
        <property name="maxActive" value="32"/>
        <property name="initialSize" value="1"/>
        <property name="maxWait" value="60000"/>
        <property name="maxIdle" value="32"/>
        <property name="minIdle" value="5"/>
        <property name="removeAbandoned" value="true"/>
        <property name="removeAbandonedTimeout" value="180"/>
        <property name="connectionProperties"
                  value="bigStringTryClob=true;clientEncoding=GBK;defaultRowPrefetch=50;serverEncoding=ISO-8859-1"/>
        <property name="timeBetweenEvictionRunsMillis">
            <value>60000</value>
        </property>
        <property name="minEvictableIdleTimeMillis">
            <value>1800000</value>
        </property>
    </bean>
    <!-- hibernate session factory -->
    <bean id="sessionFactory"
          class="org.springframework.orm.hibernate3.LocalSessionFactoryBean">
        <property name="dataSource" ref="dbcpDataSource"/>
        <property name="configLocation"
                  value="newlandframework/storm/resource/hibernate.cfg.xml"/>
        <property name="eventListeners">
            <map></map>
        </property>
        <property name="entityCacheStrategies">
            <props></props>
        </property>
        <property name="collectionCacheStrategies">
            <props></props>
        </property>
        <property name="configurationClass">
            <value>org.hibernate.cfg.AnnotationConfiguration</value>
        </property>
    </bean>
    <bean id="hibernateTemplete"
          class="org.springframework.orm.hibernate3.HibernateTemplate">
        <property name="sessionFactory" ref="sessionFactory"/>
    </bean>
</beans>
At this point, all the Storm components for real-time sensitive-information monitoring are complete. Next comes the Storm topology. Because a topology can run either locally or distributed, I wrapped both modes in a utility class, StormRunner (topology runner). The code is as follows:
/**
 * @filename:StormRunner.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Topology runner
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.storm.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;

public final class StormRunner {

    private static final int MILLIS_IN_SEC = 1000;

    // Local topology: Storm simulates the cluster with N threads inside one process
    public static void runTopologyLocally(StormTopology topology,
            String topologyName, Config conf, int runtimeInSeconds)
            throws InterruptedException {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, topology);
        // Let the topology run for the requested time, then shut it down
        Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
        cluster.killTopology(topologyName);
        cluster.shutdown();
    }

    // Distributed topology: runs on a real Storm cluster
    public static void runTopologyRemotely(StormTopology topology,
            String topologyName, Config conf) throws AlreadyAliveException,
            InvalidTopologyException {
        StormSubmitter.submitTopology(topologyName, conf, topology);
    }
}
Now we wire all of the Spouts and Bolts above into a topology. Here we use the distributed mode for deployment. The code of SensitiveTopology (the sensitive user monitoring topology) is as follows:
/**
 * @filename:SensitiveTopology.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Sensitive user monitoring Storm topology
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.storm.topology;

import java.sql.SQLException;

import newlandframework.storm.bolt.SensitiveBatchBolt;
import newlandframework.storm.bolt.SensitiveFileAnalyzer;
import newlandframework.storm.model.RubbishUsers;
import newlandframework.storm.spout.SensitiveFileReader;

import org.apache.commons.lang.StringUtils;

import backtype.storm.Config;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class SensitiveTopology {
    // IDs of the Spouts and Bolts
    public static final String SensitiveSpoutFuZhou = "SensitiveSpout591";
    public static final String SensitiveSpoutXiaMen = "SensitiveSpout592";
    public static final String SensitiveBoltAnalysis = "SensitiveBoltAnalysis";
    public static final String SensitiveBoltPersistence = "SensitiveBolPersistence";

    public static void main(String[] args) throws SQLException {
        System.out.println(StringUtils.center("SensitiveTopology", 40, "*"));

        TopologyBuilder builder = new TopologyBuilder();

        // Build the spouts, each with a parallelism of 2
        builder.setSpout(SensitiveSpoutFuZhou, new SensitiveFileReader(
                SensitiveFileReader.InputFuZhouPath), 2);
        builder.setSpout(SensitiveSpoutXiaMen, new SensitiveFileReader(
                SensitiveFileReader.InputXiaMenPath), 2);

        // Build the analyzer bolt with a parallelism of 4
        builder.setBolt(SensitiveBoltAnalysis, new SensitiveFileAnalyzer(), 4)
                .shuffleGrouping(SensitiveSpoutFuZhou)
                .shuffleGrouping(SensitiveSpoutXiaMen);

        // Build the persistence bolt with a parallelism of 4
        SensitiveBatchBolt persistenceBolt = new SensitiveBatchBolt();

        builder.setBolt(SensitiveBoltPersistence, persistenceBolt, 4)
                .fieldsGrouping(
                        SensitiveBoltAnalysis,
                        new Fields(RubbishUsers.HOMECITY_COLUMNNAME,
                                RubbishUsers.USERID_COLUMNNAME,
                                RubbishUsers.MSISDN_COLUMNNAME));

        Config conf = new Config();
        conf.setDebug(true);
        // Number of workers: the cluster has at most 8 slots, so use them all
        conf.setNumWorkers(8);
        // Check every 3 seconds how many sensitive records have been stored in MySQL
        conf.put("RUBBISHMONITOR_INTERVAL", 3);

        // Run the distributed topology
        try {
            StormRunner.runTopologyRemotely(builder.createTopology(),
                    "SensitiveTopology", conf);
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        }
    }
}
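The persistence Bolt is attached with fieldsGrouping on home_city, user_id, and msisdn, which means that tuples with equal values in those fields are routed to the same task. The sketch below is a deliberately simplified model of that idea in plain Java (hash the grouping values, then take the result modulo the number of tasks). It is an illustration under that assumption, not Storm's actual implementation, and FieldsGroupingModel is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of fields grouping; not Storm's real code.
final class FieldsGroupingModel {

    private FieldsGroupingModel() {
    }

    // Picks a task index in [0, numTasks) from the values of the grouping fields.
    static int chooseTask(List<String> groupingValues, int numTasks) {
        // List.hashCode depends only on the elements, so equal values give equal hashes.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        // Two tuples of the same user (home_city, user_id, msisdn)
        List<String> first = Arrays.asList("591", "5911000", "10000");
        List<String> second = Arrays.asList("591", "5911000", "10000");
        System.out.println(chooseTask(first, 4) == chooseTask(second, 4));
    }
}
```

By contrast, the shuffleGrouping used for the analyzer Bolt distributes tuples across tasks without regard to their content, which is fine there because parsing a line does not depend on which user it belongs to.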
All the Storm components are now complete. Next, package the project as a jar and run it on the Storm cluster. Go to the bin directory of the Storm installation on the Nimbus host and run storm jar {jar path} {main class}. In my case the command is: storm jar /home/tj/install/SensitiveTopology.jar newlandframework.storm.topology.SensitiveTopology. Then put the spam content files of the suspected spam SMS users into the specified directories on the server (/home/tj/data/591 and /home/tj/data/592). Finally, open the Storm UI again and observe how the job starts and runs, as shown in the figure below:
You can see that the topology we just submitted, SensitiveTopology, has been submitted to the Storm cluster successfully. Click SensitiveTopology to open the monitoring page for its Spouts and Bolts, as shown in the figure below:
We can clearly see the number of executors and the emitted counts for the Spout components (the user SMS collectors SensitiveFileReader591 and SensitiveFileReader592), as well as the status of the Bolt components: the SMS content parser (SensitiveFileAnalyzer) and the sensitive information collector (SensitiveBatchBolt). This makes monitoring very convenient. In addition, we can go to the logs directory under the Storm installation on each Supervisor server and check the worker logs. The screenshot below shows how the sensitive-information filtering is going:
The monitor thread of SensitiveBatchBolt shows that 9 sensitive users have been collected so far. Let's check whether these users, whose messages contain sensitive keywords, were stored in MySQL successfully:
The database also contains 9 rows, which matches the number printed in the log, and every sms_content value does contain the sensitive keywords "racketeer" or "Bad", exactly as expected. If the file volume grows later, we can cope by adjusting the parallelism of the Spouts and Bolts and the number of Workers. You can also solve the problem by scaling the cluster out horizontally.
The Apache Storm project site is http://storm.apache.org/; anyone interested can follow it. The official site has authoritative technical documentation, as well as material on how to integrate Storm with message queues, HDFS, and HBase. In China, in my personal opinion, Alibaba has probably done the best work applying Storm: it improved on the original Storm and open-sourced JStorm, which is also worth following.
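With Storm we can develop distributed real-time processing applications quite easily; the design above is just one example of what Storm can be used for. Compared with traditional single-server applications, clustered, parallel, cooperative computation is a trend in the era of cloud computing and big data.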