Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Modeler   Code  
»áÔ±   
 
   
 
 
     
   
 ¶©ÔÄ
  ¾èÖú
Storm½Ì³Ì
 
×÷Õߣºrzhzhz À´Ô´£ºCSDN ·¢²¼ÓÚ£º2015-6-17
  5129  次浏览      27
 

½Ì³Ì

ÔÚÕâ¸ö½Ì³ÌÀïÃæÎÒÃǽ«Ñ§Ï°ÈçºÎ´´½¨Topologies,²¢ÇÒ°Ñtopologies²¿Êðµ½stormµÄ¼¯ÈºÀïÃæÈ¥¡£Java½«ÊÇÎÒÃÇÖ÷ÒªµÄʾ·¶ÓïÑÔ£¬ ¸ö±ðÀý×Ó»áʹÓÃpythonÒÔÑÝʾstormµÄ¶àÓïÑÔÌØÐÔ¡£

×¼±¸¹¤×÷

Õâ¸ö½Ì³ÌʹÓÃstorm-starterÏîÄ¿ÀïÃæµÄÀý×Ó¡£ÎÒÍÆ¼öÄãÃÇÏÂÔØÕâ¸öÏîÄ¿µÄ´úÂë²¢ÇÒ¸ú׎̳ÌÒ»Æð×ö¡£ÏȶÁһϣºÅäÖÃstorm¿ª·¢»·¾³ºÍн¨Ò»¸östromÏîÄ¿ÕâÁ½ÆªÎÄÕ°ÑÄãµÄ»úÆ÷ÉèÖúá£

Ò»¸öStorm¼¯ÈºµÄ»ù±¾×é¼þ

stormµÄ¼¯Èº±íÃæÉÏ¿´ºÍhadoopµÄ¼¯Èº·Ç³£Ïñ¡£µ«ÊÇÔÚHadoopÉÏÃæÄãÔËÐеÄÊÇMapReduceµÄJob, ¶øÔÚStormÉÏÃæÄãÔËÐеÄÊÇTopology¡£ËüÃÇÊǷdz£²»Ò»ÑùµÄ ¡ªÒ»¸ö¹Ø¼üµÄÇø±ðÊÇ£º Ò»¸öMapReduce Job×îÖÕ»á½áÊø£¬¶øÒ»¸öTopologyÔËÓÀÔ¶ÔËÐУ¨³ý·ÇÄãÏÔʽµÄɱµôËû£©¡£

ÔÚStormµÄ¼¯ÈºÀïÃæÓÐÁ½Öֽڵ㣺 ¿ØÖƽڵã(master node)ºÍ¹¤×÷½Úµã(worker node)¡£¿ØÖƽڵãÉÏÃæÔËÐÐÒ»¸öºǫ́³ÌÐò£º Nimbus£¬ ËüµÄ×÷ÓÃÀàËÆHadoopÀïÃæµÄJobTracker¡£Nimbus¸ºÔðÔÚ¼¯ÈºÀïÃæ·Ö²¼´úÂ룬·ÖÅ乤×÷¸ø»úÆ÷£¬²¢ÇÒ¼à¿Ø×´Ì¬¡£

ÿһ¸ö¹¤×÷½ÚµãÉÏÃæÔËÐÐÒ»¸ö½Ð×öSupervisorµÄ½Úµã¡£Supervisor»á¼àÌý·ÖÅ䏸ËüÄÇ̨»úÆ÷µÄ¹¤×÷£¬¸ù¾ÝÐèÒª Æô¶¯/¹Ø±Õ¹¤×÷½ø³Ì¡£Ã¿Ò»¸ö¹¤×÷½ø³ÌÖ´ÐÐÒ»¸öTopologyµÄÒ»¸ö×Ó¼¯£»Ò»¸öÔËÐеÄTopologyÓÉÔËÐÐÔںܶà»úÆ÷Éϵĺܶ๤×÷½ø³Ì×é³É¡£

storm topology½á¹¹

NimbusºÍSupervisorÖ®¼äµÄËùÓÐЭµ÷¹¤×÷¶¼ÊÇͨ¹ýÒ»¸öZookeeper¼¯ÈºÀ´Íê³É¡£²¢ÇÒ£¬nimbus½ø³ÌºÍsupervisor¶¼ÊÇ¿ìËÙʧ°Ü£¨fail-fast)ºÍÎÞ״̬µÄ¡£ËùÓеÄ״̬ҪôÔÚZookeeperÀïÃæ£¬ ҪôÔÚ±¾µØ´ÅÅÌÉÏ¡£ÕâÒ²¾ÍÒâζ×ÅÄã¿ÉÒÔÓÃkill -9À´É±ËÀnimbusºÍsupervisor½ø³Ì£¬ È»ºóÔÙÖØÆôËüÃÇ£¬ËüÃÇ¿ÉÒÔ¼ÌÐø¹¤×÷£¬¾ÍºÃÏñʲô¶¼Ã»Óз¢Éú¹ýËÆµÄ¡£Õâ¸öÉè¼ÆÊ¹µÃstorm²»¿É˼ÒéµÄÎȶ¨¡£

Topologies

ΪÁËÔÚstormÉÏÃæ×öʵʱ¼ÆË㣬 ÄãҪȥ½¨Á¢Ò»Ð©topologies¡£Ò»¸ötopology¾ÍÊÇÒ»¸ö¼ÆËã½ÚµãËù×é³ÉµÄͼ¡£TopologyÀïÃæµÄÿ¸ö´¦Àí½Úµã¶¼°üº¬´¦ÀíÂß¼­£¬ ¶ø½ÚµãÖ®¼äµÄÁ¬½ÓÔò±íʾÊý¾ÝÁ÷¶¯µÄ·½Ïò¡£

ÔËÐÐÒ»¸öTopologyÊǺܼòµ¥µÄ¡£Ê×ÏÈ£¬°ÑÄãËùÓеĴúÂëÒÔ¼°ËùÒÀÀµµÄjar´ò½øÒ»¸öjar°ü¡£È»ºóÔËÐÐÀàËÆÏÂÃæµÄÕâ¸öÃüÁî¡£

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2  

Õâ¸öÃüÁî»áÔËÐÐÖ÷Àà: backtype.strom.MyTopology,²ÎÊýÊÇarg1, arg2¡£Õâ¸öÀàµÄmainº¯Êý¶¨ÒåÕâ¸ötopology²¢ÇÒ°ÑËüÌá½»¸øNimbus¡£storm jar¸ºÔðÁ¬½Óµ½nimbus²¢ÇÒÉÏ´«jarÎļþ¡£

ÒòΪtopologyµÄ¶¨ÒåÆäʵ¾ÍÊÇÒ»¸öThrift½á¹¹²¢ÇÒnimbus¾ÍÊÇÒ»¸öThrift·þÎñ£¬ÓпÉÒÔÓÃÈκÎÓïÑÔ´´½¨²¢ÇÒÌá½»topology¡£ÉÏÃæµÄ·½ÃæÊÇÓÃJVM
-basedÓïÑÔÌá½»µÄ×î¼òµ¥µÄ·½·¨, ¿´Ò»ÏÂÎÄÕÂ: ÔÚÉú²ú¼¯ÈºÉÏÔËÐÐtopologyÈ¥¿´¿´ÔõôÆô¶¯ÒÔ¼°Í£Ö¹topologies¡£

Stream

StreamÊÇstormÀïÃæµÄ¹Ø¼ü³éÏó¡£Ò»¸östreamÊÇÒ»¸öûÓб߽çµÄtupleÐòÁС£stormÌṩһЩԭÓïÀ´·Ö²¼Ê½µØ¡¢¿É¿¿µØ°ÑÒ»¸östream´«Êä½øÒ»¸öеÄstream¡£±ÈÈ磺 Äã¿ÉÒÔ°ÑÒ»¸ötweetsÁ÷´«Êäµ½ÈÈÃÅ»°ÌâµÄÁ÷¡£

stormÌṩµÄ×î»ù±¾µÄ´¦ÀístreamµÄÔ­ÓïÊÇspoutºÍbolt¡£Äã¿ÉÒÔʵÏÖSpoutºÍBolt¶ÔÓ¦µÄ½Ó¿ÚÒÔ´¦ÀíÄãµÄÓ¦ÓõÄÂß¼­¡£

spoutµÄÁ÷µÄÔ´Í·¡£±ÈÈçÒ»¸öspout¿ÉÄÜ´ÓKestrel¶ÓÁÐÀïÃæ¶ÁÈ¡ÏûÏ¢²¢ÇÒ°ÑÕâЩÏûÏ¢·¢Éä³ÉÒ»¸öÁ÷¡£ÓÖ±ÈÈçÒ»¸öspout¿ÉÒÔµ÷ÓÃtwitterµÄÒ»¸öapi²¢ÇÒ°Ñ·µ»ØµÄtweets·¢Éä³ÉÒ»¸öÁ÷¡£

bolt¿ÉÒÔ½ÓÊÕÈÎÒâ¶à¸öÊäÈëstream£¬×÷һЩ´¦Àí£¬ ÓÐЩbolt¿ÉÄÜ»¹»á·¢ÉäһЩеÄstream¡£Ò»Ð©¸´ÔÓµÄÁ÷ת»»£¬ ±ÈÈç´ÓһЩtweetÀïÃæ¼ÆËã³öÈÈÃÅ»°Ì⣬ÐèÒª¶à¸ö²½Ö裬 ´Ó¶øÒ²¾ÍÐèÒª¶à¸öbolt¡£ Bolt¿ÉÒÔ×öÈκÎÊÂÇé: ÔËÐк¯Êý£¬ ¹ýÂËtuple, ×öһЩ¾ÛºÏ£¬ ×öһЩºÏ²¢ÒÔ¼°·ÃÎÊÊý¾Ý¿âµÈµÈ¡£

spoutºÍboltËù×é³ÉÒ»¸öÍøÂç»á±»´ò°ü³Étopology£¬ topologyÊÇstormÀïÃæ×î¸ßÒ»¼¶µÄ³éÏó£¬Äã¿ÉÒÔ°ÑtopologyÌá½»¸østormµÄ¼¯ÈºÀ´ÔËÐС£topologyµÄ½á¹¹ÔÚTopologyÄÇÒ»¶ÎÒѾ­Ëµ¹ýÁË£¬ÕâÀï¾Í²»ÔÙ׸ÊöÁË¡£

topology½á¹¹

topologyÀïÃæµÄÿһ¸ö½Úµã¶¼ÊDz¢ÐÐÔËÐеġ£ ÔÚÄãµÄtopologyÀïÃæ£¬ Äã¿ÉÒÔÖ¸¶¨Ã¿¸ö½ÚµãµÄ²¢Ðжȣ¬ stormÔò»áÔÚ¼¯ÈºÀïÃæ·ÖÅäÄÇô¶àÏß³ÌÀ´Í¬Ê±¼ÆËã¡£

Ò»¸ötopology»áÒ»Ö±ÔËÐÐÖ±µ½ÄãÏÔʽֹͣËü¡£storm×Ô¶¯ÖØÐ·ÖÅäһЩÔËÐÐʧ°ÜµÄÈÎÎñ£¬ ²¢ÇÒstorm±£Ö¤Äã²»»áÓÐÊý¾Ý¶ªÊ§£¬¼´Ê¹ÔÚһЩ»úÆ÷ÒâÍâÍ£»ú²¢ÇÒÏûÏ¢±»¶ªµôµÄÇé¿öÏ¡£

Êý¾ÝÄ£ÐÍ(Data Model)

stormʹÓÃtupleÀ´×÷ΪËüµÄÊý¾ÝÄ£ÐÍ¡£Ã¿¸ötupleÊÇÒ»¶ÑÖµ£¬Ã¿¸öÖµÓÐÒ»¸öÃû×Ö£¬²¢ÇÒÿ¸öÖµ¿ÉÒÔÊÇÈκÎÀàÐÍ£¬ ÔÚÎÒµÄÀí½âÀïÃæÒ»¸ötuple¿ÉÒÔ¿´×÷Ò»¸öûÓз½·¨µÄjava¶ÔÏó¡£×ÜÌåÀ´¿´£¬stormÖ§³ÖËùÓеĻù±¾ÀàÐÍ¡¢×Ö·û´®ÒÔ¼°×Ö½ÚÊý×é×÷ΪtupleµÄÖµÀà ÐÍ¡£ÄãÒ²¿ÉÒÔʹÓÃÄã×Ô¼º¶¨ÒåµÄÀàÐÍÀ´×÷ΪֵÀàÐÍ£¬ Ö»ÒªÄãʵÏÖ¶ÔÓ¦µÄÐòÁл¯Æ÷(serializer)¡£

topologyÀïÃæµÄÿ¸ö½Úµã±ØÐ붨ÒåËüÒª·¢ÉäµÄtupleµÄÿ¸ö×ֶΡ£±ÈÈçÏÂÃæÕâ¸öbolt¶¨ÒåËüËù·¢ÉäµÄtuple°üº¬Á½¸ö×ֶΣ¬ÀàÐÍ·Ö±ðÊÇ: doubleºÍtriple¡£

public class DoubleAndTripleBolt extends BaseRichBolt {  
private OutputCollectorBase _collector;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}

@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}

declareOutputFields·½·¨¶¨ÒåÒªÊä³öµÄ×Ö¶Î £º ["double", "triple"]¡£Õâ¸öboltµÄÆäËü²¿·ÖÎÒÃǽÓÏÂÀ´»á½âÊÍ¡£

Ò»¸ö¼òµ¥µÄTopology

ÈÃÎÒÃÇÀ´¿´Ò»¸ö¼òµ¥µÄtopologyµÄÀý×Ó£¬ ÎÒÃÇ¿´Ò»ÏÂstorm-starterÀïÃæµÄExclamationTopology:

TopologyBuilder builder = new TopologyBuilder();          
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");

Õâ¸öTopology°üº¬Ò»¸öSpoutºÍÁ½¸öBolt¡£Spout·¢Éäµ¥´Ê£¬ ÿ¸öboltÔÚÿ¸öµ¥´ÊºóÃæ¼Ó¸ö¡±!!!¡±¡£ÕâÈý¸ö½Úµã±»ÅųÉÒ»ÌõÏß: spout·¢Éäµ¥´Ê¸øµÚÒ»¸öbolt£¬ µÚÒ»¸öboltÈ»ºó°Ñ´¦ÀíºÃµÄµ¥´Ê·¢É䏸µÚ¶þ¸öbolt¡£Èç¹ûspout·¢ÉäµÄµ¥´ÊÊÇ["bob"]ºÍ["john"], ÄÇôµÚ¶þ¸öbolt»á·¢Éä["bolt!!!!!!"]ºÍ["john!!!!!!"]³öÀ´¡£
ÎÒÃÇʹÓÃsetSpoutºÍsetBoltÀ´¶¨ÒåTopologyÀïÃæµÄ½Úµã¡£ÕâЩ·½·¨½ÓÊÕÎÒÃÇÖ¸¶¨µÄÒ»¸öid£¬ Ò»¸ö°üº¬´¦ÀíÂß¼­µÄ¶ÔÏó(spout»òÕßbolt), ÒÔ¼°ÄãËùÐèÒªµÄ²¢Ðжȡ£

Õâ¸ö°üº¬´¦ÀíµÄ¶ÔÏóÈç¹ûÊÇspoutÄÇôҪʵÏÖIRichSpoutµÄ½Ó¿Ú£¬ Èç¹ûÊÇbolt£¬ÄÇô¾ÍҪʵÏÖIRichBolt½Ó¿Ú.

×îºóÒ»¸öÖ¸¶¨²¢ÐжȵIJÎÊýÊÇ¿ÉÑ¡µÄ¡£Ëü±íʾ¼¯ÈºÀïÃæÐèÒª¶àÉÙ¸öthreadÀ´Ò»ÆðÖ´ÐÐÕâ¸ö½Úµã¡£Èç¹ûÄãºöÂÔËüÄÇôstorm»á·ÖÅäÒ»¸öÏß³ÌÀ´Ö´ÐÐÕâ¸ö½Úµã¡£

setBolt·½·¨·µ»ØÒ»¸öInputDeclarer¶ÔÏó£¬Õâ¸ö¶ÔÏóÊÇÓÃÀ´¶¨ÒåBoltµÄÊäÈë¡£ ÕâÀïµÚÒ»¸öBoltÉùÃ÷ËüÒª¶ÁÈ¡spoutËù·¢ÉäµÄËùÓеÄtuple ¡ª ʹÓÃshuffle

grouping¡£¶øµÚ¶þ¸öboltÉùÃ÷Ëü¶ÁÈ¡µÚÒ»¸öboltËù·¢ÉäµÄtuple¡£shuffle grouping±íʾËùÓеÄtuple»á±»Ëæ»úµÄ·Ö·¢¸øboltµÄËùÓÐtask¡£¸øtask·Ö·¢tupleµÄ²ßÂÔÓкܶàÖÖ£¬ºóÃæ»á½éÉÜ¡£

Èç¹ûÄãÏëµÚ¶þ¸öbolt¶ÁÈ¡spoutºÍµÚÒ»¸öboltËù·¢ÉäµÄËùÓеÄtuple£¬ ÄÇôÄãÓ¦¸ÃÕâÑù¶¨ÒåµÚ¶þ¸öbolt:

builder.setBolt("exclaim2", new ExclamationBolt(), 5)  
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");

ÈÃÎÒÃÇÉîÈëµØ¿´Ò»ÏÂÕâ¸ötopologyÀïÃæµÄspoutºÍboltÊÇÔõôʵÏֵġ£Spout¸ºÔð·¢ÉäеÄtupleµ½Õâ¸ötopologyÀïÃæ À´¡£TestWordSpout´Ó["nathan", "mike", "jackson", "golda", "bertels"]ÀïÃæËæ»úÑ¡ÔñÒ»¸öµ¥´Ê·¢Éä³öÀ´¡£TestWordSpoutÀïÃæµÄnextTuple()·½·¨ÊÇÕâÑù¶¨ÒåµÄ£º

public void nextTuple() {  
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}

¿ÉÒÔ¿´µ½£¬ÊµÏֺܼòµ¥¡£

ExclamationBolt°Ñ¡±!!!¡±Æ´½Óµ½ÊäÈëtupleºóÃæ¡£ÎÒÃÇÀ´¿´ÏÂExclamationBoltµÄÍêÕûʵÏÖ¡£

public static class ExclamationBolt implements IRichBolt {  
OutputCollector _collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}

public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}

public void cleanup() {
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

public Map getComponentConfiguration() {
return null;
}
}

prepare·½·¨Ìṩ¸øboltÒ»¸öOutputcollectorÓÃÀ´·¢Éätuple¡£Bolt¿ÉÒÔÔÚÈκÎʱºò·¢Éätuple ¡ª ÔÚprepare, execute»òÕßcleanup·½·¨ÀïÃæ, »òÕßÉõÖÁÔÚÁíÒ»¸öÏß³ÌÀïÃæÒì²½·¢Éä¡£ÕâÀïprepare·½·¨Ö»ÊǼòµ¥µØ°ÑOutputCollector×÷Ϊһ¸öÀà×ֶα£´æÏÂÀ´¸øºóÃæexecute·½·¨Ê¹Óá£

execute·½·¨´ÓboltµÄÒ»¸öÊäÈë½ÓÊÕtuple(Ò»¸öbolt¿ÉÄÜÓжà¸öÊäÈëÔ´). ExclamationBolt»ñÈ¡tupleµÄµÚÒ»¸ö×ֶΣ¬¼ÓÉÏ¡±!!!¡±Ö®ºóÔÙ·¢Éä³öÈ¥¡£Èç¹ûÒ»¸öboltÓжà¸öÊäÈëÔ´£¬Äã¿ÉÒÔͨ¹ýµ÷Óà Tuple#getSourceComponent·½·¨À´ÖªµÀËüÊÇÀ´×ÔÄĸöÊäÈëÔ´µÄ¡£

execute·½·¨ÀïÃæ»¹ÓÐÆäËüһЩÊÂÇéÖµµÃÒ»Ì᣺ ÊäÈëtuple±»×÷Ϊemit·½·¨µÄµÚÒ»¸ö²ÎÊý£¬²¢ÇÒÊäÈëtupleÔÚ×îºóÒ»Ðб»ack¡£ÕâÐ©ÄØ¶¼ÊÇStorm¿É¿¿ÐÔAPIµÄÒ»²¿·Ö£¬ºóÃæ»á½âÊÍ¡£

cleanup·½·¨ÔÚbolt±»¹Ø±ÕµÄʱºòµ÷Óã¬ËüÓ¦¸ÃÇåÀíËùÓб»´ò¿ªµÄ×ÊÔ´¡£µ«ÊǼ¯Èº²»±£Ö¤Õâ¸ö·½·¨Ò»¶¨»á±»Ö´ÐС£±ÈÈçÖ´ÐÐtaskµÄ»úÆ÷downµôÁË£¬ÄÇô¸ù±¾¾ÍûÓа취À´µ÷ÓÃÄǸö·½·¨¡£cleanupÉè¼ÆµÄʱºòÊDZ»ÓÃÀ´ÔÚlocal modeµÄʱºò²Å±»µ÷ÓÃ(Ò²¾ÍÊÇ˵ÔÚÒ»¸ö½ø³ÌÀïÃæÄ£ÄâÕû¸östorm¼¯Èº), ²¢ÇÒÄãÏëÔڹرÕһЩtopologyµÄʱºò±ÜÃâ×ÊԴй©¡£

×îºó£¬declareOutputFields¶¨ÒåÒ»¸ö½Ð×ö¡±word¡±µÄ×ֶεÄtuple¡£

getComponentConfiguration ·½·¨ÔÊÐíÄãÅäÖÃ×é¼þ¸ÃÔõôÔËÐС£ÕâÊÇÒ»¸ö¸ü¸ß¼¶µÄÖ÷Ìâ,¸ü½øÒ»²½µÄ½âÊͼûÅäÖá£

cleanup ·½·¨ºÍgetComponentConfiguration ·½·¨ÔÚÒ»¸öboltʵÏÖÖо­³£²»ÊDZØÐëµÄ¡£Äã¿ÉÒÔͨ¹ýʵÏÖÒ»¸ö»ùÀà¸ü¼ò½àµÄ¶¨ÒåÒ»¸öbolt£¬´Ë»ùÀà»áÌṩÕâÁ½¸ö·½·¨µÄĬÈÏʵÏÖ¡£ËùÒÔExclamationBolt¿ÉÒÔͨ¹ýÀ©Õ¹BaseRichBolt´Ó¶ø±äµÃ¸ü¼ò½à,¾ÍÏñÕâÑù:

public static class ExclamationBolt extends BaseRichBolt {  
OutputCollector _collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}

public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

ÒÔlocal modeÔËÐÐExclamationTopology

ÈÃÎÒÃÇ¿´¿´ÔõôÒÔlocal modeÔËÐÐExclamationToplogy¡£

stormµÄÔËÐÐÓÐÁ½ÖÖģʽ:±¾µØÄ£Ê½ºÍ·Ö²¼Ê½Ä£Ê½. ÔÚ±¾µØÄ£Ê½ÖУ¬ stormÓÃÒ»¸ö½ø³ÌÀïÃæµÄÏß³ÌÀ´Ä£ÄâËùÓеÄspoutºÍbolt. ±¾µØÄ£Ê½¶Ô¿ª·¢ºÍ²âÊÔÀ´Ëµ±È½ÏÓÐÓᣠÄãÔËÐÐstorm-starterÀïÃæµÄtopologyµÄʱºòËüÃǾÍÊÇÒÔ±¾µØÄ£Ê½ÔËÐеģ¬Äã¿ÉÒÔ¿´µ½topologyÀïÃæµÄÿһ¸ö×é¼þÔÚ·¢ÉäʲôÏûÏ¢¡£

ÔÚ·Ö²¼Ê½Ä£Ê½Ï£¬ stormÓÉÒ»¶Ñ»úÆ÷×é³É¡£µ±ÄãÌá½»topology¸ømasterµÄʱºò£¬ ÄãͬʱҲ°ÑtopologyµÄ´úÂëÌá½»ÁË¡£master¸ºÔð·Ö·¢ÄãµÄ´úÂë²¢ÇÒ¸ºÔð¸øÄãµÄtopolgoy·ÖÅ乤×÷½ø³Ì¡£Èç¹ûÒ»¸ö¹¤×÷½ø³Ì¹ÒµôÁË£¬ master½Úµã»á°ÑÈÏÎªÖØÐ·ÖÅäµ½ÆäËü½Úµã¡£¹ØÓÚÈçºÎÔÚÒ»¸ö¼¯ÈºÉÏÃæÔËÐÐtopology£¬ Äã¿ÉÒÔ¿´¿´Running topologies on a production clusterÎÄÕ¡£

ÏÂÃæÊÇÒÔ±¾µØÄ£Ê½ÔËÐÐExclamationTopologyµÄ´úÂë:

Config conf = new Config();  
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

Ê×ÏÈ£¬ Õâ¸ö´úÂ붨Òåͨ¹ý¶¨ÒåÒ»¸öLocalCluster¶ÔÏóÀ´¶¨ÒåÒ»¸ö½ø³ÌÄڵļ¯Èº¡£Ìá½»topology¸øÕâ¸öÐéÄâµÄ¼¯ÈººÍÌá½»topology¸ø·Ö²¼Ê½¼¯ ȺÊÇÒ»ÑùµÄ¡£Í¨¹ýµ÷ÓÃsubmitTopology·½·¨À´Ìá½»topology£¬ Ëü½ÓÊÜÈý¸ö²ÎÊý£ºÒªÔËÐеÄtopologyµÄÃû×Ö£¬Ò»¸öÅäÖöÔÏóÒÔ¼°ÒªÔËÐеÄtopology±¾Éí¡£

topologyµÄÃû×ÖÊÇÓÃÀ´Î¨Ò»Çø±ðÒ»¸ötopologyµÄ£¬ÕâÑùÄãÈ»ºó¿ÉÒÔÓÃÕâ¸öÃû×ÖÀ´É±ËÀÕâ¸ötopologyµÄ¡£Ç°ÃæÒѾ­Ëµ¹ýÁË£¬ Äã±ØÐëÏÔʽµÄɱµôÒ»¸ötopology£¬·ñÔòËü»áÒ»Ö±ÔËÐС£

Conf¶ÔÏó¿ÉÒÔÅäÖúܶණÎ÷£¬ ÏÂÃæÁ½¸öÊÇ×î³£¼ûµÄ£º

TOPOLOGY_WORKERS(setNumWorkers) ¶¨ÒåÄãÏ£Íû¼¯Èº·ÖÅä¶àÉÙ¸ö¹¤×÷½ø³Ì¸øÄãÀ´Ö´ÐÐÕâ¸ötopology. topologyÀïÃæµÄÿ¸ö×é¼þ»á±»ÐèÒªÏß³ÌÀ´Ö´ÐС£Ã¿¸ö×é¼þµ½µ×ÓöàÉÙ¸öÏß³ÌÊÇͨ¹ýsetBoltºÍsetSpoutÀ´Ö¸¶¨µÄ¡£ÕâЩÏ̶߳¼ÔËÐÐÔÚ¹¤×÷½ø ³ÌÀïÃæ. ÿһ¸ö¹¤×÷½ø³Ì°üº¬Ò»Ð©½ÚµãµÄһЩ¹¤×÷Ï̡߳£±ÈÈ磬 Èç¹ûÄãÖ¸¶¨300¸öỊ̈߳¬60¸ö½ø³Ì£¬ ÄÇôÿ¸ö¹¤×÷½ø³ÌÀïÃæÒªÖ´ÐÐ6¸öỊ̈߳¬ ¶øÕâ6¸öÏ߳̿ÉÄÜÊôÓÚ²»Í¬µÄ×é¼þ(Spout, Bolt)¡£Äã¿ÉÒÔͨ¹ýµ÷Õûÿ¸ö×é¼þµÄ²¢ÐжÈÒÔ¼°ÕâЩÏß³ÌËùÔڵĽø³ÌÊýÁ¿À´µ÷ÕûtopologyµÄÐÔÄÜ¡£

TOPOLOGY_DEBUG(setDebug), µ±Ëü±»ÉèÖóÉtrueµÄ»°£¬ storm»á¼Ç¼ÏÂÿ¸ö×é¼þËù·¢ÉäµÄÿÌõÏûÏ¢¡£ÕâÔÚ±¾µØ»·¾³µ÷ÊÔtopologyºÜÓÐÓ㬠µ«ÊÇÔÚÏßÉÏÕâô×öµÄ»°»áÓ°ÏìÐÔÄܵġ£

¸ÐÐËȤµÄ»°¿ÉÒÔÈ¥¿´¿´Conf¶ÔÏóµÄJavadocÈ¥¿´¿´topologyµÄËùÓÐÅäÖá£

¿ÉÒÔ¿´¿´´´½¨Ò»¸öÐÂstormÏîĿȥ¿´¿´ÔõôÅäÖÿª·¢»·¾³ÒÔʹÄãÄܹ»ÒÔ±¾µØÄ£Ê½ÔËÐÐtopology.

Á÷·Ö×é²ßÂÔ(Stream grouping)

Á÷·Ö×é²ßÂÔ¸æËßtopologyÈçºÎÔÚÁ½¸ö×é¼þÖ®¼ä·¢ËÍtuple¡£ Òª¼Çס£¬ spoutsºÍboltsÒԺܶàtaskµÄÐÎʽÔÚtopologyÀïÃæÍ¬²½Ö´ÐС£Èç¹û´ÓtaskµÄÁ£¶ÈÀ´¿´Ò»¸öÔËÐеÄtopology£¬ ËüÓ¦¸ÃÊÇÕâÑùµÄ:

´Ótask½Ç¶ÈÀ´¿´topology

µ±Bolt AµÄÒ»¸ötaskÒª·¢ËÍÒ»¸ötuple¸øBolt B£¬ ËüÓ¦¸Ã·¢Ë͸øBolt BµÄÄĸötaskÄØ£¿

stream groupingרÃŻشðÕâÖÖÎÊÌâµÄ¡£ÔÚÎÒÃÇÉîÈëÑо¿²»Í¬µÄstream grouping֮ǰ£¬ÈÃÎÒÃÇ¿´Ò»ÏÂstorm-starterÀïÃæµÄÁíÍâÒ»¸ötopology¡£WordCountTopology¶ÁȡһЩ¾ä×Ó£¬ Êä³ö¾ä×ÓÀïÃæÃ¿¸öµ¥´Ê³öÏֵĴÎÊý.

TopologyBuilder builder = new TopologyBuilder();  

builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));

SplitSentence¶ÔÓÚ¾ä×ÓÀïÃæµÄÿ¸öµ¥´Ê·¢ÉäÒ»¸öеÄtuple, WordCountÔÚÄÚ´æÀïÃæÎ¬»¤Ò»¸öµ¥´Ê->´ÎÊýµÄmapping£¬ WordCountÿÊÕµ½Ò»¸öµ¥´Ê£¬ Ëü¾Í¸üÐÂÄÚ´æÀïÃæµÄͳ¼Æ×´Ì¬¡£

Óкü¸ÖÖ²»Í¬µÄstream grouping:

×î¼òµ¥µÄgroupingÊÇshuffle grouping, ËüËæ»ú·¢¸øÈκÎÒ»¸ötask¡£ÉÏÃæÀý×ÓÀïÃæRandomSentenceSpoutºÍSplitSentenceÖ®¼äÓõľÍÊÇshuffle grouping, shuffle grouping¶Ô¸÷¸ötaskµÄtuple·ÖÅäµÄ±È½Ï¾ùÔÈ¡£

Ò»ÖÖ¸üÓÐȤµÄgroupingÊÇfields grouping, SplitSentenceºÍWordCountÖ®¼äʹÓõľÍÊÇfields grouping, ÕâÖÖgrouping»úÖÆ±£Ö¤ÏàͬfieldÖµµÄtuple»áȥͬһ¸ötask£¬ Õâ¶ÔÓÚWordCountÀ´Ëµ·Ç³£¹Ø¼ü£¬Èç¹ûͬһ¸öµ¥´Ê²»È¥Í¬Ò»¸ötask£¬ ÄÇôͳ¼Æ³öÀ´µÄµ¥´Ê´ÎÊý¾Í²»¶ÔÁË¡£

fields groupingÊÇstreamºÏ²¢£¬stream¾ÛºÏÒÔ¼°ºÜ¶àÆäËü³¡¾°µÄ»ù´¡¡£ÔÚ±³ºóÄØ£¬ fields groupingʹÓõÄÒ»ÖÂÐÔ¹þÏ£À´·ÖÅätupleµÄ¡£

»¹ÓÐһЩÆäËüÀàÐ͵Ästream grouping.Äã¿ÉÒÔÔÚConceptsÒ»ÕÂÀï¸üÏêϸµÄÁ˽⡣

ʹÓñðµÄÓïÑÔÀ´¶¨ÒåBolt

Bolt¿ÉÒÔʹÓÃÈκÎÓïÑÔÀ´¶¨Òå¡£ÓÃÆäËüÓïÑÔ¶¨ÒåµÄbolt»á±»µ±×÷×Ó½ø³Ì(subprocess)À´Ö´ÐУ¬ stormʹÓÃJSONÏûϢͨ¹ýstdin/stdoutÀ´ºÍÕâЩsubprocessͨÐÅ¡£Õâ¸öͨÐÅЭÒéÊÇÒ»¸öÖ»ÓÐ100ÐеĿ⣬ stormÍŶӸøÕâЩ¿â¿ª·¢Á˶ÔÓ¦µÄRuby, PythonºÍFancy°æ±¾¡£

ÏÂÃæÊÇWordCountTopologyÀïÃæµÄSplitSentenceµÄ¶¨Òå:

public static class SplitSentence extends ShellBolt implements IRichBolt {  
public SplitSentence() {
super("python", "splitsentence.py");
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

SplitSentence¼Ì³Ð×ÔShellBolt²¢ÇÒÉùÃ÷Õâ¸öBoltÓÃpythonÀ´ÔËÐУ¬²¢ÇÒ²ÎÊýÊÇ: splitsentence.py¡£ÏÂÃæÊÇsplitsentence.pyµÄ¶¨Òå:

import storm  

class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])

SplitSentenceBolt().run()

¸ü¶àÓйØÓÃÆäËüÓïÑÔ¶¨ÒåSpoutºÍBoltµÄÐÅÏ¢£¬ ÒÔ¼°ÓÃÆäËüÓïÑÔÀ´´´½¨topologyµÄ ÐÅÏ¢¿ÉÒԲμû:Using non-JVM languages with Storm.

¿É¿¿µÄÏûÏ¢´¦Àí

ÔÚÕâ¸ö½Ì³ÌµÄÇ°Ãæ£¬ÎÒÃÇÌø¹ýÁËÓйØtupleµÄÒ»Ð©ÌØÕ÷¡£ÕâÐ©ÌØÕ÷¾ÍÊÇstormµÄ¿É¿¿ÐÔAPI£º stormÈçºÎ±£Ö¤spout·¢³öµÄÿһ¸ötuple¶¼±»ÍêÕû´¦Àí¡£¿´¿´¡¶stormÈçºÎ±£Ö¤ÏûÏ¢²»¶ªÊ§¡·ÒÔ¸üÉîÈëÁ˽âstormµÄ¿É¿¿ÐÔAPI.

½áÂÛ

Õâ¸öÈëÃŽ̳̱ȽϹ㷺µÄ½éÉÜÁË´Ó¿ª·¢£¬²âÊԺͲ¿ÊðÒ»¸ötopology.ÎĵµµÄÆäËü²¿·Ö»áÉîÈë½éÉÜʹÓÃstormµÄ¸÷¸ö·½Ãæ¡£

   
5129 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ
×îл¼Æ»®
DeepSeek´óÄ£ÐÍÓ¦Óÿª·¢ 6-12[ÏÃÃÅ]
È˹¤ÖÇÄÜ.»úÆ÷ѧϰTensorFlow 6-22[Ö±²¥]
»ùÓÚ UML ºÍEA½øÐзÖÎöÉè¼Æ 6-30[±±¾©]
ǶÈëʽÈí¼þ¼Ü¹¹-¸ß¼¶Êµ¼ù 7-9[±±¾©]
Óû§ÌåÑé¡¢Ò×ÓÃÐÔ²âÊÔÓëÆÀ¹À 7-25[Î÷°²]
ͼÊý¾Ý¿âÓë֪ʶͼÆ× 8-23[±±¾©]

MySQLË÷Òý±³ºóµÄÊý¾Ý½á¹¹
MySQLÐÔÄܵ÷ÓÅÓë¼Ü¹¹Éè¼Æ
SQL ServerÊý¾Ý¿â±¸·ÝÓë»Ö¸´
ÈÃÊý¾Ý¿â·ÉÆðÀ´ 10´óDB2ÓÅ»¯
oracleµÄÁÙʱ±í¿Õ¼äдÂú´ÅÅÌ
Êý¾Ý¿âµÄ¿çƽ̨Éè¼Æ

²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿â
¸ß¼¶Êý¾Ý¿â¼Ü¹¹Éè¼ÆÊ¦
HadoopÔ­ÀíÓëʵ¼ù
Oracle Êý¾Ý²Ö¿â
Êý¾Ý²Ö¿âºÍÊý¾ÝÍÚ¾ò
OracleÊý¾Ý¿â¿ª·¢Óë¹ÜÀí

GE Çø¿éÁ´¼¼ÊõÓëʵÏÖÅàѵ
º½Ìì¿Æ¹¤Ä³×Ó¹«Ë¾ Nodejs¸ß¼¶Ó¦Óÿª·¢
ÖÐÊ¢Òæ»ª ׿Խ¹ÜÀíÕß±ØÐë¾ß±¸µÄÎåÏîÄÜÁ¦
ijÐÅÏ¢¼¼Êõ¹«Ë¾ PythonÅàѵ
ij²©²ÊITϵͳ³§ÉÌ Ò×ÓÃÐÔ²âÊÔÓëÆÀ¹À
ÖйúÓÊ´¢ÒøÐÐ ²âÊÔ³ÉÊì¶ÈÄ£Ðͼ¯³É(TMMI)
ÖÐÎïÔº ²úÆ·¾­ÀíÓë²úÆ·¹ÜÀí