½Ì³Ì
ÔÚÕâ¸ö½Ì³ÌÀïÃæÎÒÃǽ«Ñ§Ï°ÈçºÎ´´½¨Topologies,²¢ÇÒ°Ñtopologies²¿Êðµ½stormµÄ¼¯ÈºÀïÃæÈ¥¡£Java½«ÊÇÎÒÃÇÖ÷ÒªµÄʾ·¶ÓïÑÔ£¬
¸ö±ðÀý×Ó»áʹÓÃpythonÒÔÑÝʾstormµÄ¶àÓïÑÔÌØÐÔ¡£
×¼±¸¹¤×÷
Õâ¸ö½Ì³ÌʹÓÃstorm-starterÏîÄ¿ÀïÃæµÄÀý×Ó¡£ÎÒÍÆ¼öÄãÃÇÏÂÔØÕâ¸öÏîÄ¿µÄ´úÂë²¢ÇÒ¸ú׎̳ÌÒ»Æð×ö¡£ÏȶÁһϣºÅäÖÃstorm¿ª·¢»·¾³ºÍн¨Ò»¸östromÏîÄ¿ÕâÁ½ÆªÎÄÕ°ÑÄãµÄ»úÆ÷ÉèÖúá£
Ò»¸öStorm¼¯ÈºµÄ»ù±¾×é¼þ
stormµÄ¼¯Èº±íÃæÉÏ¿´ºÍhadoopµÄ¼¯Èº·Ç³£Ïñ¡£µ«ÊÇÔÚHadoopÉÏÃæÄãÔËÐеÄÊÇMapReduceµÄJob,
¶øÔÚStormÉÏÃæÄãÔËÐеÄÊÇTopology¡£ËüÃÇÊǷdz£²»Ò»ÑùµÄ ¡ªÒ»¸ö¹Ø¼üµÄÇø±ðÊÇ£º Ò»¸öMapReduce
Job×îÖÕ»á½áÊø£¬¶øÒ»¸öTopologyÔËÓÀÔ¶ÔËÐУ¨³ý·ÇÄãÏÔʽµÄɱµôËû£©¡£
ÔÚStormµÄ¼¯ÈºÀïÃæÓÐÁ½Öֽڵ㣺 ¿ØÖƽڵã(master node)ºÍ¹¤×÷½Úµã(worker
node)¡£¿ØÖƽڵãÉÏÃæÔËÐÐÒ»¸öºǫ́³ÌÐò£º Nimbus£¬ ËüµÄ×÷ÓÃÀàËÆHadoopÀïÃæµÄJobTracker¡£Nimbus¸ºÔðÔÚ¼¯ÈºÀïÃæ·Ö²¼´úÂ룬·ÖÅ乤×÷¸ø»úÆ÷£¬²¢ÇÒ¼à¿Ø×´Ì¬¡£
ÿһ¸ö¹¤×÷½ÚµãÉÏÃæÔËÐÐÒ»¸ö½Ð×öSupervisorµÄ½Úµã¡£Supervisor»á¼àÌý·ÖÅ䏸ËüÄÇ̨»úÆ÷µÄ¹¤×÷£¬¸ù¾ÝÐèÒª
Æô¶¯/¹Ø±Õ¹¤×÷½ø³Ì¡£Ã¿Ò»¸ö¹¤×÷½ø³ÌÖ´ÐÐÒ»¸öTopologyµÄÒ»¸ö×Ó¼¯£»Ò»¸öÔËÐеÄTopologyÓÉÔËÐÐÔںܶà»úÆ÷Éϵĺܶ๤×÷½ø³Ì×é³É¡£
storm topology½á¹¹
NimbusºÍSupervisorÖ®¼äµÄËùÓÐе÷¹¤×÷¶¼ÊÇͨ¹ýÒ»¸öZookeeper¼¯ÈºÀ´Íê³É¡£²¢ÇÒ£¬nimbus½ø³ÌºÍsupervisor¶¼ÊÇ¿ìËÙʧ°Ü£¨fail-fast)ºÍÎÞ״̬µÄ¡£ËùÓеÄ״̬ҪôÔÚZookeeperÀïÃæ£¬
ҪôÔÚ±¾µØ´ÅÅÌÉÏ¡£ÕâÒ²¾ÍÒâζ×ÅÄã¿ÉÒÔÓÃkill -9À´É±ËÀnimbusºÍsupervisor½ø³Ì£¬ È»ºóÔÙÖØÆôËüÃÇ£¬ËüÃÇ¿ÉÒÔ¼ÌÐø¹¤×÷£¬¾ÍºÃÏñʲô¶¼Ã»Óз¢Éú¹ýËÆµÄ¡£Õâ¸öÉè¼ÆÊ¹µÃstorm²»¿É˼ÒéµÄÎȶ¨¡£
Topologies
ΪÁËÔÚstormÉÏÃæ×öʵʱ¼ÆË㣬 ÄãҪȥ½¨Á¢Ò»Ð©topologies¡£Ò»¸ötopology¾ÍÊÇÒ»¸ö¼ÆËã½ÚµãËù×é³ÉµÄͼ¡£TopologyÀïÃæµÄÿ¸ö´¦Àí½Úµã¶¼°üº¬´¦ÀíÂß¼£¬
¶ø½ÚµãÖ®¼äµÄÁ¬½ÓÔò±íʾÊý¾ÝÁ÷¶¯µÄ·½Ïò¡£
ÔËÐÐÒ»¸öTopologyÊǺܼòµ¥µÄ¡£Ê×ÏÈ£¬°ÑÄãËùÓеĴúÂëÒÔ¼°ËùÒÀÀµµÄjar´ò½øÒ»¸öjar°ü¡£È»ºóÔËÐÐÀàËÆÏÂÃæµÄÕâ¸öÃüÁî¡£
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2 |
Õâ¸öÃüÁî»áÔËÐÐÖ÷Àà: backtype.strom.MyTopology,²ÎÊýÊÇarg1, arg2¡£Õâ¸öÀàµÄmainº¯Êý¶¨ÒåÕâ¸ötopology²¢ÇÒ°ÑËüÌá½»¸øNimbus¡£storm
jar¸ºÔðÁ¬½Óµ½nimbus²¢ÇÒÉÏ´«jarÎļþ¡£
ÒòΪtopologyµÄ¶¨ÒåÆäʵ¾ÍÊÇÒ»¸öThrift½á¹¹²¢ÇÒnimbus¾ÍÊÇÒ»¸öThrift·þÎñ£¬ÓпÉÒÔÓÃÈκÎÓïÑÔ´´½¨²¢ÇÒÌá½»topology¡£ÉÏÃæµÄ·½ÃæÊÇÓÃJVM
-basedÓïÑÔÌá½»µÄ×î¼òµ¥µÄ·½·¨, ¿´Ò»ÏÂÎÄÕÂ: ÔÚÉú²ú¼¯ÈºÉÏÔËÐÐtopologyÈ¥¿´¿´ÔõôÆô¶¯ÒÔ¼°Í£Ö¹topologies¡£
Stream
StreamÊÇstormÀïÃæµÄ¹Ø¼ü³éÏó¡£Ò»¸östreamÊÇÒ»¸öûÓб߽çµÄtupleÐòÁС£stormÌṩһЩÔÓïÀ´·Ö²¼Ê½µØ¡¢¿É¿¿µØ°ÑÒ»¸östream´«Êä½øÒ»¸öеÄstream¡£±ÈÈ磺
Äã¿ÉÒÔ°ÑÒ»¸ötweetsÁ÷´«Êäµ½ÈÈÃÅ»°ÌâµÄÁ÷¡£
stormÌṩµÄ×î»ù±¾µÄ´¦ÀístreamµÄÔÓïÊÇspoutºÍbolt¡£Äã¿ÉÒÔʵÏÖSpoutºÍBolt¶ÔÓ¦µÄ½Ó¿ÚÒÔ´¦ÀíÄãµÄÓ¦ÓõÄÂß¼¡£
spoutµÄÁ÷µÄÔ´Í·¡£±ÈÈçÒ»¸öspout¿ÉÄÜ´ÓKestrel¶ÓÁÐÀïÃæ¶ÁÈ¡ÏûÏ¢²¢ÇÒ°ÑÕâЩÏûÏ¢·¢Éä³ÉÒ»¸öÁ÷¡£ÓÖ±ÈÈçÒ»¸öspout¿ÉÒÔµ÷ÓÃtwitterµÄÒ»¸öapi²¢ÇÒ°Ñ·µ»ØµÄtweets·¢Éä³ÉÒ»¸öÁ÷¡£
bolt¿ÉÒÔ½ÓÊÕÈÎÒâ¶à¸öÊäÈëstream£¬×÷һЩ´¦Àí£¬ ÓÐЩbolt¿ÉÄÜ»¹»á·¢ÉäһЩеÄstream¡£Ò»Ð©¸´ÔÓµÄÁ÷ת»»£¬
±ÈÈç´ÓһЩtweetÀïÃæ¼ÆËã³öÈÈÃÅ»°Ì⣬ÐèÒª¶à¸ö²½Ö裬 ´Ó¶øÒ²¾ÍÐèÒª¶à¸öbolt¡£ Bolt¿ÉÒÔ×öÈκÎÊÂÇé:
ÔËÐк¯Êý£¬ ¹ýÂËtuple, ×öһЩ¾ÛºÏ£¬ ×öһЩºÏ²¢ÒÔ¼°·ÃÎÊÊý¾Ý¿âµÈµÈ¡£
spoutºÍboltËù×é³ÉÒ»¸öÍøÂç»á±»´ò°ü³Étopology£¬ topologyÊÇstormÀïÃæ×î¸ßÒ»¼¶µÄ³éÏó£¬Äã¿ÉÒÔ°ÑtopologyÌá½»¸østormµÄ¼¯ÈºÀ´ÔËÐС£topologyµÄ½á¹¹ÔÚTopologyÄÇÒ»¶ÎÒѾ˵¹ýÁË£¬ÕâÀï¾Í²»ÔÙ׸ÊöÁË¡£

topology½á¹¹
topologyÀïÃæµÄÿһ¸ö½Úµã¶¼ÊDz¢ÐÐÔËÐеġ£ ÔÚÄãµÄtopologyÀïÃæ£¬ Äã¿ÉÒÔÖ¸¶¨Ã¿¸ö½ÚµãµÄ²¢Ðжȣ¬
stormÔò»áÔÚ¼¯ÈºÀïÃæ·ÖÅäÄÇô¶àÏß³ÌÀ´Í¬Ê±¼ÆËã¡£
Ò»¸ötopology»áÒ»Ö±ÔËÐÐÖ±µ½ÄãÏÔʽֹͣËü¡£storm×Ô¶¯ÖØÐ·ÖÅäһЩÔËÐÐʧ°ÜµÄÈÎÎñ£¬ ²¢ÇÒstorm±£Ö¤Äã²»»áÓÐÊý¾Ý¶ªÊ§£¬¼´Ê¹ÔÚһЩ»úÆ÷ÒâÍâÍ£»ú²¢ÇÒÏûÏ¢±»¶ªµôµÄÇé¿öÏ¡£
Êý¾ÝÄ£ÐÍ(Data Model)
stormʹÓÃtupleÀ´×÷ΪËüµÄÊý¾ÝÄ£ÐÍ¡£Ã¿¸ötupleÊÇÒ»¶ÑÖµ£¬Ã¿¸öÖµÓÐÒ»¸öÃû×Ö£¬²¢ÇÒÿ¸öÖµ¿ÉÒÔÊÇÈκÎÀàÐÍ£¬
ÔÚÎÒµÄÀí½âÀïÃæÒ»¸ötuple¿ÉÒÔ¿´×÷Ò»¸öûÓз½·¨µÄjava¶ÔÏó¡£×ÜÌåÀ´¿´£¬stormÖ§³ÖËùÓеĻù±¾ÀàÐÍ¡¢×Ö·û´®ÒÔ¼°×Ö½ÚÊý×é×÷ΪtupleµÄÖµÀà
ÐÍ¡£ÄãÒ²¿ÉÒÔʹÓÃÄã×Ô¼º¶¨ÒåµÄÀàÐÍÀ´×÷ΪֵÀàÐÍ£¬ Ö»ÒªÄãʵÏÖ¶ÔÓ¦µÄÐòÁл¯Æ÷(serializer)¡£
topologyÀïÃæµÄÿ¸ö½Úµã±ØÐ붨ÒåËüÒª·¢ÉäµÄtupleµÄÿ¸ö×ֶΡ£±ÈÈçÏÂÃæÕâ¸öbolt¶¨ÒåËüËù·¢ÉäµÄtuple°üº¬Á½¸ö×ֶΣ¬ÀàÐÍ·Ö±ðÊÇ:
doubleºÍtriple¡£
public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); } } |
declareOutputFields·½·¨¶¨ÒåÒªÊä³öµÄ×Ö¶Î £º ["double",
"triple"]¡£Õâ¸öboltµÄÆäËü²¿·ÖÎÒÃǽÓÏÂÀ´»á½âÊÍ¡£
Ò»¸ö¼òµ¥µÄTopology
ÈÃÎÒÃÇÀ´¿´Ò»¸ö¼òµ¥µÄtopologyµÄÀý×Ó£¬ ÎÒÃÇ¿´Ò»ÏÂstorm-starterÀïÃæµÄExclamationTopology:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1"); |
Õâ¸öTopology°üº¬Ò»¸öSpoutºÍÁ½¸öBolt¡£Spout·¢Éäµ¥´Ê£¬ ÿ¸öboltÔÚÿ¸öµ¥´ÊºóÃæ¼Ó¸ö¡±!!!¡±¡£ÕâÈý¸ö½Úµã±»ÅųÉÒ»ÌõÏß:
spout·¢Éäµ¥´Ê¸øµÚÒ»¸öbolt£¬ µÚÒ»¸öboltÈ»ºó°Ñ´¦ÀíºÃµÄµ¥´Ê·¢É䏸µÚ¶þ¸öbolt¡£Èç¹ûspout·¢ÉäµÄµ¥´ÊÊÇ["bob"]ºÍ["john"],
ÄÇôµÚ¶þ¸öbolt»á·¢Éä["bolt!!!!!!"]ºÍ["john!!!!!!"]³öÀ´¡£
ÎÒÃÇʹÓÃsetSpoutºÍsetBoltÀ´¶¨ÒåTopologyÀïÃæµÄ½Úµã¡£ÕâЩ·½·¨½ÓÊÕÎÒÃÇÖ¸¶¨µÄÒ»¸öid£¬
Ò»¸ö°üº¬´¦ÀíÂß¼µÄ¶ÔÏó(spout»òÕßbolt), ÒÔ¼°ÄãËùÐèÒªµÄ²¢Ðжȡ£
Õâ¸ö°üº¬´¦ÀíµÄ¶ÔÏóÈç¹ûÊÇspoutÄÇôҪʵÏÖIRichSpoutµÄ½Ó¿Ú£¬ Èç¹ûÊÇbolt£¬ÄÇô¾ÍҪʵÏÖIRichBolt½Ó¿Ú.
×îºóÒ»¸öÖ¸¶¨²¢ÐжȵIJÎÊýÊÇ¿ÉÑ¡µÄ¡£Ëü±íʾ¼¯ÈºÀïÃæÐèÒª¶àÉÙ¸öthreadÀ´Ò»ÆðÖ´ÐÐÕâ¸ö½Úµã¡£Èç¹ûÄãºöÂÔËüÄÇôstorm»á·ÖÅäÒ»¸öÏß³ÌÀ´Ö´ÐÐÕâ¸ö½Úµã¡£
setBolt·½·¨·µ»ØÒ»¸öInputDeclarer¶ÔÏó£¬Õâ¸ö¶ÔÏóÊÇÓÃÀ´¶¨ÒåBoltµÄÊäÈë¡£ ÕâÀïµÚÒ»¸öBoltÉùÃ÷ËüÒª¶ÁÈ¡spoutËù·¢ÉäµÄËùÓеÄtuple
¡ª ʹÓÃshuffle
grouping¡£¶øµÚ¶þ¸öboltÉùÃ÷Ëü¶ÁÈ¡µÚÒ»¸öboltËù·¢ÉäµÄtuple¡£shuffle grouping±íʾËùÓеÄtuple»á±»Ëæ»úµÄ·Ö·¢¸øboltµÄËùÓÐtask¡£¸øtask·Ö·¢tupleµÄ²ßÂÔÓкܶàÖÖ£¬ºóÃæ»á½éÉÜ¡£
Èç¹ûÄãÏëµÚ¶þ¸öbolt¶ÁÈ¡spoutºÍµÚÒ»¸öboltËù·¢ÉäµÄËùÓеÄtuple£¬ ÄÇôÄãÓ¦¸ÃÕâÑù¶¨ÒåµÚ¶þ¸öbolt:
builder.setBolt("exclaim2", new ExclamationBolt(), 5) .shuffleGrouping("words") .shuffleGrouping("exclaim1"); |
ÈÃÎÒÃÇÉîÈëµØ¿´Ò»ÏÂÕâ¸ötopologyÀïÃæµÄspoutºÍboltÊÇÔõôʵÏֵġ£Spout¸ºÔð·¢ÉäеÄtupleµ½Õâ¸ötopologyÀïÃæ
À´¡£TestWordSpout´Ó["nathan", "mike",
"jackson", "golda", "bertels"]ÀïÃæËæ»úÑ¡ÔñÒ»¸öµ¥´Ê·¢Éä³öÀ´¡£TestWordSpoutÀïÃæµÄnextTuple()·½·¨ÊÇÕâÑù¶¨ÒåµÄ£º
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } |
¿ÉÒÔ¿´µ½£¬ÊµÏֺܼòµ¥¡£
ExclamationBolt°Ñ¡±!!!¡±Æ´½Óµ½ÊäÈëtupleºóÃæ¡£ÎÒÃÇÀ´¿´ÏÂExclamationBoltµÄÍêÕûʵÏÖ¡£
public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } public Map getComponentConfiguration() { return null; } } |
prepare·½·¨Ìṩ¸øboltÒ»¸öOutputcollectorÓÃÀ´·¢Éätuple¡£Bolt¿ÉÒÔÔÚÈκÎʱºò·¢Éätuple
¡ª ÔÚprepare, execute»òÕßcleanup·½·¨ÀïÃæ, »òÕßÉõÖÁÔÚÁíÒ»¸öÏß³ÌÀïÃæÒì²½·¢Éä¡£ÕâÀïprepare·½·¨Ö»ÊǼòµ¥µØ°ÑOutputCollector×÷Ϊһ¸öÀà×ֶα£´æÏÂÀ´¸øºóÃæexecute·½·¨Ê¹Óá£
execute·½·¨´ÓboltµÄÒ»¸öÊäÈë½ÓÊÕtuple(Ò»¸öbolt¿ÉÄÜÓжà¸öÊäÈëÔ´). ExclamationBolt»ñÈ¡tupleµÄµÚÒ»¸ö×ֶΣ¬¼ÓÉÏ¡±!!!¡±Ö®ºóÔÙ·¢Éä³öÈ¥¡£Èç¹ûÒ»¸öboltÓжà¸öÊäÈëÔ´£¬Äã¿ÉÒÔͨ¹ýµ÷ÓÃ
Tuple#getSourceComponent·½·¨À´ÖªµÀËüÊÇÀ´×ÔÄĸöÊäÈëÔ´µÄ¡£
execute·½·¨ÀïÃæ»¹ÓÐÆäËüһЩÊÂÇéÖµµÃÒ»Ì᣺ ÊäÈëtuple±»×÷Ϊemit·½·¨µÄµÚÒ»¸ö²ÎÊý£¬²¢ÇÒÊäÈëtupleÔÚ×îºóÒ»Ðб»ack¡£ÕâÐ©ÄØ¶¼ÊÇStorm¿É¿¿ÐÔAPIµÄÒ»²¿·Ö£¬ºóÃæ»á½âÊÍ¡£
cleanup·½·¨ÔÚbolt±»¹Ø±ÕµÄʱºòµ÷Óã¬ËüÓ¦¸ÃÇåÀíËùÓб»´ò¿ªµÄ×ÊÔ´¡£µ«ÊǼ¯Èº²»±£Ö¤Õâ¸ö·½·¨Ò»¶¨»á±»Ö´ÐС£±ÈÈçÖ´ÐÐtaskµÄ»úÆ÷downµôÁË£¬ÄÇô¸ù±¾¾ÍûÓа취À´µ÷ÓÃÄǸö·½·¨¡£cleanupÉè¼ÆµÄʱºòÊDZ»ÓÃÀ´ÔÚlocal
modeµÄʱºò²Å±»µ÷ÓÃ(Ò²¾ÍÊÇ˵ÔÚÒ»¸ö½ø³ÌÀïÃæÄ£ÄâÕû¸östorm¼¯Èº), ²¢ÇÒÄãÏëÔڹرÕһЩtopologyµÄʱºò±ÜÃâ×ÊԴй©¡£
×îºó£¬declareOutputFields¶¨ÒåÒ»¸ö½Ð×ö¡±word¡±µÄ×ֶεÄtuple¡£
getComponentConfiguration ·½·¨ÔÊÐíÄãÅäÖÃ×é¼þ¸ÃÔõôÔËÐС£ÕâÊÇÒ»¸ö¸ü¸ß¼¶µÄÖ÷Ìâ,¸ü½øÒ»²½µÄ½âÊͼûÅäÖá£
cleanup ·½·¨ºÍgetComponentConfiguration ·½·¨ÔÚÒ»¸öboltʵÏÖÖо³£²»ÊDZØÐëµÄ¡£Äã¿ÉÒÔͨ¹ýʵÏÖÒ»¸ö»ùÀà¸ü¼ò½àµÄ¶¨ÒåÒ»¸öbolt£¬´Ë»ùÀà»áÌṩÕâÁ½¸ö·½·¨µÄĬÈÏʵÏÖ¡£ËùÒÔExclamationBolt¿ÉÒÔͨ¹ýÀ©Õ¹BaseRichBolt´Ó¶ø±äµÃ¸ü¼ò½à,¾ÍÏñÕâÑù:
public static class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } |
ÒÔlocal modeÔËÐÐExclamationTopology
ÈÃÎÒÃÇ¿´¿´ÔõôÒÔlocal modeÔËÐÐExclamationToplogy¡£
stormµÄÔËÐÐÓÐÁ½ÖÖģʽ:±¾µØÄ£Ê½ºÍ·Ö²¼Ê½Ä£Ê½. ÔÚ±¾µØÄ£Ê½ÖУ¬ stormÓÃÒ»¸ö½ø³ÌÀïÃæµÄÏß³ÌÀ´Ä£ÄâËùÓеÄspoutºÍbolt.
±¾µØÄ£Ê½¶Ô¿ª·¢ºÍ²âÊÔÀ´Ëµ±È½ÏÓÐÓᣠÄãÔËÐÐstorm-starterÀïÃæµÄtopologyµÄʱºòËüÃǾÍÊÇÒÔ±¾µØÄ£Ê½ÔËÐеģ¬Äã¿ÉÒÔ¿´µ½topologyÀïÃæµÄÿһ¸ö×é¼þÔÚ·¢ÉäʲôÏûÏ¢¡£
ÔÚ·Ö²¼Ê½Ä£Ê½Ï£¬ stormÓÉÒ»¶Ñ»úÆ÷×é³É¡£µ±ÄãÌá½»topology¸ømasterµÄʱºò£¬ ÄãͬʱҲ°ÑtopologyµÄ´úÂëÌá½»ÁË¡£master¸ºÔð·Ö·¢ÄãµÄ´úÂë²¢ÇÒ¸ºÔð¸øÄãµÄtopolgoy·ÖÅ乤×÷½ø³Ì¡£Èç¹ûÒ»¸ö¹¤×÷½ø³Ì¹ÒµôÁË£¬
master½Úµã»á°ÑÈÏÎªÖØÐ·ÖÅäµ½ÆäËü½Úµã¡£¹ØÓÚÈçºÎÔÚÒ»¸ö¼¯ÈºÉÏÃæÔËÐÐtopology£¬ Äã¿ÉÒÔ¿´¿´Running
topologies on a production clusterÎÄÕ¡£
ÏÂÃæÊÇÒÔ±¾µØÄ£Ê½ÔËÐÐExclamationTopologyµÄ´úÂë:
Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown(); |
Ê×ÏÈ£¬ Õâ¸ö´úÂ붨Òåͨ¹ý¶¨ÒåÒ»¸öLocalCluster¶ÔÏóÀ´¶¨ÒåÒ»¸ö½ø³ÌÄڵļ¯Èº¡£Ìá½»topology¸øÕâ¸öÐéÄâµÄ¼¯ÈººÍÌá½»topology¸ø·Ö²¼Ê½¼¯
ȺÊÇÒ»ÑùµÄ¡£Í¨¹ýµ÷ÓÃsubmitTopology·½·¨À´Ìá½»topology£¬ Ëü½ÓÊÜÈý¸ö²ÎÊý£ºÒªÔËÐеÄtopologyµÄÃû×Ö£¬Ò»¸öÅäÖöÔÏóÒÔ¼°ÒªÔËÐеÄtopology±¾Éí¡£
topologyµÄÃû×ÖÊÇÓÃÀ´Î¨Ò»Çø±ðÒ»¸ötopologyµÄ£¬ÕâÑùÄãÈ»ºó¿ÉÒÔÓÃÕâ¸öÃû×ÖÀ´É±ËÀÕâ¸ötopologyµÄ¡£Ç°ÃæÒѾ˵¹ýÁË£¬
Äã±ØÐëÏÔʽµÄɱµôÒ»¸ötopology£¬·ñÔòËü»áÒ»Ö±ÔËÐС£
Conf¶ÔÏó¿ÉÒÔÅäÖúܶණÎ÷£¬ ÏÂÃæÁ½¸öÊÇ×î³£¼ûµÄ£º
TOPOLOGY_WORKERS(setNumWorkers) ¶¨ÒåÄãÏ£Íû¼¯Èº·ÖÅä¶àÉÙ¸ö¹¤×÷½ø³Ì¸øÄãÀ´Ö´ÐÐÕâ¸ötopology.
topologyÀïÃæµÄÿ¸ö×é¼þ»á±»ÐèÒªÏß³ÌÀ´Ö´ÐС£Ã¿¸ö×é¼þµ½µ×ÓöàÉÙ¸öÏß³ÌÊÇͨ¹ýsetBoltºÍsetSpoutÀ´Ö¸¶¨µÄ¡£ÕâЩÏ̶߳¼ÔËÐÐÔÚ¹¤×÷½ø
³ÌÀïÃæ. ÿһ¸ö¹¤×÷½ø³Ì°üº¬Ò»Ð©½ÚµãµÄһЩ¹¤×÷Ï̡߳£±ÈÈ磬 Èç¹ûÄãÖ¸¶¨300¸öỊ̈߳¬60¸ö½ø³Ì£¬ ÄÇôÿ¸ö¹¤×÷½ø³ÌÀïÃæÒªÖ´ÐÐ6¸öỊ̈߳¬
¶øÕâ6¸öÏ߳̿ÉÄÜÊôÓÚ²»Í¬µÄ×é¼þ(Spout, Bolt)¡£Äã¿ÉÒÔͨ¹ýµ÷Õûÿ¸ö×é¼þµÄ²¢ÐжÈÒÔ¼°ÕâЩÏß³ÌËùÔڵĽø³ÌÊýÁ¿À´µ÷ÕûtopologyµÄÐÔÄÜ¡£
TOPOLOGY_DEBUG(setDebug), µ±Ëü±»ÉèÖóÉtrueµÄ»°£¬ storm»á¼Ç¼ÏÂÿ¸ö×é¼þËù·¢ÉäµÄÿÌõÏûÏ¢¡£ÕâÔÚ±¾µØ»·¾³µ÷ÊÔtopologyºÜÓÐÓã¬
µ«ÊÇÔÚÏßÉÏÕâô×öµÄ»°»áÓ°ÏìÐÔÄܵġ£
¸ÐÐËȤµÄ»°¿ÉÒÔÈ¥¿´¿´Conf¶ÔÏóµÄJavadocÈ¥¿´¿´topologyµÄËùÓÐÅäÖá£
¿ÉÒÔ¿´¿´´´½¨Ò»¸öÐÂstormÏîĿȥ¿´¿´ÔõôÅäÖÿª·¢»·¾³ÒÔʹÄãÄܹ»ÒÔ±¾µØÄ£Ê½ÔËÐÐtopology.
Á÷·Ö×é²ßÂÔ(Stream grouping)
Á÷·Ö×é²ßÂÔ¸æËßtopologyÈçºÎÔÚÁ½¸ö×é¼þÖ®¼ä·¢ËÍtuple¡£ Òª¼Çס£¬ spoutsºÍboltsÒԺܶàtaskµÄÐÎʽÔÚtopologyÀïÃæÍ¬²½Ö´ÐС£Èç¹û´ÓtaskµÄÁ£¶ÈÀ´¿´Ò»¸öÔËÐеÄtopology£¬
ËüÓ¦¸ÃÊÇÕâÑùµÄ:

´Ótask½Ç¶ÈÀ´¿´topology
µ±Bolt AµÄÒ»¸ötaskÒª·¢ËÍÒ»¸ötuple¸øBolt B£¬ ËüÓ¦¸Ã·¢Ë͸øBolt BµÄÄĸötaskÄØ£¿
stream groupingרÃŻشðÕâÖÖÎÊÌâµÄ¡£ÔÚÎÒÃÇÉîÈëÑо¿²»Í¬µÄstream grouping֮ǰ£¬ÈÃÎÒÃÇ¿´Ò»ÏÂstorm-starterÀïÃæµÄÁíÍâÒ»¸ötopology¡£WordCountTopology¶ÁȡһЩ¾ä×Ó£¬
Êä³ö¾ä×ÓÀïÃæÃ¿¸öµ¥´Ê³öÏֵĴÎÊý.
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8) .shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 12) .fieldsGrouping("split", new Fields("word")); |
SplitSentence¶ÔÓÚ¾ä×ÓÀïÃæµÄÿ¸öµ¥´Ê·¢ÉäÒ»¸öеÄtuple, WordCountÔÚÄÚ´æÀïÃæÎ¬»¤Ò»¸öµ¥´Ê->´ÎÊýµÄmapping£¬
WordCountÿÊÕµ½Ò»¸öµ¥´Ê£¬ Ëü¾Í¸üÐÂÄÚ´æÀïÃæµÄͳ¼Æ×´Ì¬¡£
Óкü¸ÖÖ²»Í¬µÄstream grouping:
×î¼òµ¥µÄgroupingÊÇshuffle grouping, ËüËæ»ú·¢¸øÈκÎÒ»¸ötask¡£ÉÏÃæÀý×ÓÀïÃæRandomSentenceSpoutºÍSplitSentenceÖ®¼äÓõľÍÊÇshuffle
grouping, shuffle grouping¶Ô¸÷¸ötaskµÄtuple·ÖÅäµÄ±È½Ï¾ùÔÈ¡£
Ò»ÖÖ¸üÓÐȤµÄgroupingÊÇfields grouping, SplitSentenceºÍWordCountÖ®¼äʹÓõľÍÊÇfields
grouping, ÕâÖÖgrouping»úÖÆ±£Ö¤ÏàͬfieldÖµµÄtuple»áȥͬһ¸ötask£¬ Õâ¶ÔÓÚWordCountÀ´Ëµ·Ç³£¹Ø¼ü£¬Èç¹ûͬһ¸öµ¥´Ê²»È¥Í¬Ò»¸ötask£¬
ÄÇôͳ¼Æ³öÀ´µÄµ¥´Ê´ÎÊý¾Í²»¶ÔÁË¡£
fields groupingÊÇstreamºÏ²¢£¬stream¾ÛºÏÒÔ¼°ºÜ¶àÆäËü³¡¾°µÄ»ù´¡¡£ÔÚ±³ºóÄØ£¬
fields groupingʹÓõÄÒ»ÖÂÐÔ¹þÏ£À´·ÖÅätupleµÄ¡£
»¹ÓÐһЩÆäËüÀàÐ͵Ästream grouping.Äã¿ÉÒÔÔÚConceptsÒ»ÕÂÀï¸üÏêϸµÄÁ˽⡣
ʹÓñðµÄÓïÑÔÀ´¶¨ÒåBolt
Bolt¿ÉÒÔʹÓÃÈκÎÓïÑÔÀ´¶¨Òå¡£ÓÃÆäËüÓïÑÔ¶¨ÒåµÄbolt»á±»µ±×÷×Ó½ø³Ì(subprocess)À´Ö´ÐУ¬
stormʹÓÃJSONÏûϢͨ¹ýstdin/stdoutÀ´ºÍÕâЩsubprocessͨÐÅ¡£Õâ¸öͨÐÅÐÒéÊÇÒ»¸öÖ»ÓÐ100ÐеĿ⣬
stormÍŶӸøÕâЩ¿â¿ª·¢Á˶ÔÓ¦µÄRuby, PythonºÍFancy°æ±¾¡£
ÏÂÃæÊÇWordCountTopologyÀïÃæµÄSplitSentenceµÄ¶¨Òå:
public static class SplitSentence extends ShellBolt implements IRichBolt { public SplitSentence() { super("python", "splitsentence.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } |
SplitSentence¼Ì³Ð×ÔShellBolt²¢ÇÒÉùÃ÷Õâ¸öBoltÓÃpythonÀ´ÔËÐУ¬²¢ÇÒ²ÎÊýÊÇ:
splitsentence.py¡£ÏÂÃæÊÇsplitsentence.pyµÄ¶¨Òå:
import storm class SplitSentenceBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) SplitSentenceBolt().run() |
¸ü¶àÓйØÓÃÆäËüÓïÑÔ¶¨ÒåSpoutºÍBoltµÄÐÅÏ¢£¬ ÒÔ¼°ÓÃÆäËüÓïÑÔÀ´´´½¨topologyµÄ ÐÅÏ¢¿ÉÒԲμû:Using
non-JVM languages with Storm.
¿É¿¿µÄÏûÏ¢´¦Àí
ÔÚÕâ¸ö½Ì³ÌµÄÇ°Ãæ£¬ÎÒÃÇÌø¹ýÁËÓйØtupleµÄÒ»Ð©ÌØÕ÷¡£ÕâÐ©ÌØÕ÷¾ÍÊÇstormµÄ¿É¿¿ÐÔAPI£º stormÈçºÎ±£Ö¤spout·¢³öµÄÿһ¸ötuple¶¼±»ÍêÕû´¦Àí¡£¿´¿´¡¶stormÈçºÎ±£Ö¤ÏûÏ¢²»¶ªÊ§¡·ÒÔ¸üÉîÈëÁ˽âstormµÄ¿É¿¿ÐÔAPI.
½áÂÛ
Õâ¸öÈëÃŽ̳̱ȽϹ㷺µÄ½éÉÜÁË´Ó¿ª·¢£¬²âÊԺͲ¿ÊðÒ»¸ötopology.ÎĵµµÄÆäËü²¿·Ö»áÉîÈë½éÉÜʹÓÃstormµÄ¸÷¸ö·½Ãæ¡£
|