Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
Storm´óÊý¾Ýʵʱ´¦Àí
 
  2491  次浏览      27
 2019-2-20
 
±à¼­ÍƼö:
±¾ÎÄÀ´×ÔÓÚjdon£¬±¾ÎÄÖ÷Òª½éÉÜÁËStorm¡¢StormºÍHadoopµÄÇø±ð¡¢StormµÄÈÝ´íÐԺͿɿ¿ÐÔµÈÏà¹ØÄÚÈÝ¡£

ʲôÊÇStorm£¿

StormÊÇ£º

¿ìËÙÇÒ¿ÉÀ©Õ¹ÉìËõ

ÈÝ´í

È·±£ÏûÏ¢Äܹ»±»´¦Àí

Ò×ÓÚÉèÖúͲÙ×÷

¿ªÔ´µÄ·Ö²¼Ê½ÊµÊ±¼ÆËãϵͳ

- ×î³õÓÉNathan Marz¿ª·¢

- ʹÓÃJava ºÍ Clojure ±àд

StormºÍHadoopÖ÷񻂿±ðÊÇʵʱºÍÅú´¦ÀíµÄÇø±ð£º

Storm¸ÅÄî ×é³É£ºSpout ºÍBolt×é³ÉTopology¡£

TupleÊÇStormµÄÊý¾ÝÄ£ÐÍ£¬Èç['jdon',12346]

¶à¸öTuple×é³ÉʼþÁ÷£º

SpoutÊǶÁÈ¡ÐèÒª·ÖÎö´¦ÀíµÄÊý¾ÝÔ´£¬È»ºóתΪTuples£¬ÕâЩÊý¾ÝÔ´¿ÉÒÔÊÇWebÈÕÖ¾¡¢ APIµ÷Óá¢Êý¾Ý¿âµÈµÈ¡£SpoutÏ൱ÓÚʼþÁ÷µÄÉú²úÕß¡£

Bolt ´¦ÀíTuplesÈ»ºóÔÙ´´½¨ÐµÄTuplesÁ÷£¬BoltÏ൱ÓÚʼþÁ÷µÄÏû·ÑÕß¡£

Bolt ×÷ÎªÕæÕýÒµÎñ´¦ÀíÕߣ¬Ö÷ҪʵÏÖ´óÊý¾Ý´¦ÀíµÄºËÐŦÄÜ£¬±ÈÈçת»»Êý¾Ý£¬Ó¦ÓÃÏàÓ¦¹ýÂËÆ÷£¬¼ÆËãºÍ¾ÛºÏÊý¾Ý(±ÈÈçͳ¼Æ×ܺ͵ȵÈ) ¡£

ÒÔTwitterµÄij¸öTweetΪ°¸Àý£¬¿´¿´StormÈçºÎ´¦Àí£º

ÕâЩtweettÌùÄÚÈÝÊÇ£º¡°No Small Cell Lung #Cancer(ûÓÐСϸ°û·Î°©££°©Ö¢)¡± "An #OnCology Consult...."

ÕâЩÌù±»Spout¶ÁÈ¡ÒԺ󣬲úÉúTuple£¬×Ö¶ÎÃûÊÇtweet£¬ÄÚÈÝÊÇ"No Small Cell Lung #Cancer"£¬¸ñʽÀàËÆ£º['No Small Cell Lung #Cancer',133221]¡£

È»ºó½øÈë±»Á÷ Ïû·ÑÕßBolt½øÐд¦Àí£¬µÚÒ»¸öBoltÊÇSplitSentence£¬½«tupleÄÚÈݽøÐзÖÀ룬½á¹û³ÉΪ£ºÒ»¸ö¸öµ¥´Ê£º"No" "Small" "Cell" "Lung" "#Cancer" £»È»ºó¾­¹ýµÚ¶þ¸öBolt½øÐйýÂËHashTagFilter´¦Àí£¬Hash±êÇ©Êǵ¥´ÊÖÐÓÃ#±ê×¢µÄ£¬Ò²¾ÍÊÇCancer£»ÔÙ¾­¹ýHasTagCount¼ÆÊý£¬¿ÉÒÔ±¾µØÄڴ滺´æÕâ¸ö¼ÆÊý½á¹û£¬×îºóͨ¹ýPrinterBolt´òÓ¡³ö±êÇ©µ¥´Êͳ¼Æ½á¹û ¡£

ÎÒÃÇʹÓÃStomËùÒª×öµÄ¾ÍÊDZàÖÆSpoutºÍBolt´úÂ룺

public class RandomSentenceSpout extends BaseRichSpout {
¡¡¡¡SpoutOutputCollector collector;
¡¡¡¡Random random;

//¶ÁÈëÍⲿÊý¾Ý
¡¡¡¡public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
¡¡¡¡¡¡¡¡this.collector = collector;
¡¡¡¡¡¡¡¡random = new Random();
¡¡¡¡}
¡¡¡¡//²úÉúTuple
¡¡¡¡ public void nextTuple() {
¡¡¡¡¡¡¡¡String[] sentences = new String[] {
¡¡¡¡¡¡¡¡¡¡¡¡"No Small Cell Lung #Cancer",
¡¡¡¡¡¡¡¡¡¡¡¡"An #OnCology Consultant apple a day keeps the doctor away",
¡¡¡¡¡¡¡¡¡¡¡¡"four score and seven years ago",
¡¡¡¡¡¡¡¡¡¡¡¡"snow white and the seven dwarfs",
¡¡¡¡¡¡¡¡¡¡¡¡"i am at two with nature"
¡¡¡¡¡¡¡¡};
¡¡¡¡¡¡¡¡String tweet = sentences[random.nextInt(sentences.length)];
¡¡¡¡¡¡¡¡//¶¨Òå×Ö¶ÎÃû"tweet" µÄÖµ
¡¡¡¡¡¡¡¡collector.emit(new Values(tweet));

¡¡}

¡¡¡¡// ¶¨Òå×Ö¶ÎÃû"tweet"

¡¡public void declareOutputFields(OutputFieldsDeclarer declarer) {
¡¡¡¡¡¡¡¡declarer.declare(new Fields("tweet"));
¡¡¡¡}
¡¡¡¡@Override
¡¡¡¡public void ack(Object msgId) {}
¡¡¡¡@Override
¡¡¡¡public void fail(Object msgId) {}
}

ÏÂÃæÊÇBoltµÄ´úÂë±àд£º

public class SplitSentenceBolt extends BaseRichBolt {
¡¡¡¡OutputCollector collector;

¡¡¡¡@Override
¡¡¡¡public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
¡¡¡¡¡¡¡¡this.collector = collector;
¡¡¡¡}
¡¡¡¡@Override Ïû·ÑÕß¼¤»îÖ÷Òª·½·¨£º·ÖÀë³Éµ¥¸öµ¥´Ê
¡¡¡¡public void execute(Tuple input) {
¡¡¡¡¡¡¡¡for (String s : input.getString(0).split("\\s")) {
¡¡¡¡¡¡¡¡¡¡¡¡collector.emit(new Values(s));
¡¡¡¡¡¡¡¡}
¡¡¡¡}
¡¡¡¡@Override ¶¨ÒåеÄ×Ö¶ÎÃû
¡¡¡¡public void declareOutputFields(OutputFieldsDeclarer declarer) {
¡¡¡¡¡¡¡¡declarer.declare(new Fields("word"));
¡¡¡¡}

×îºóÊÇ×°ÅäÔËÐÐSpoutºÍBoltµÄ¿Í»§¶Ëµ÷ÓôúÂ룺

public class WordCountTopology {
¡¡¡¡public static void main(String[] args) throws Exception {
¡¡¡¡¡¡¡¡TopologyBuilder builder = new TopologyBuilder();
¡¡¡¡¡¡¡¡builder.setSpout("tweet", new RandomSentenceSpout(), 2);
¡¡¡¡¡¡¡¡builder.setBolt("split", new SplitSentenceBolt(), 4)
¡¡¡¡¡¡¡¡¡¡¡¡.shuffleGrouping("tweet")
¡¡¡¡¡¡¡¡¡¡¡¡.setNumTasks(8);
¡¡¡¡¡¡¡¡builder.setBolt("count", new WordCountBolt(), 6)
¡¡¡¡¡¡¡¡¡¡¡¡.fieldsGrouping("split", new Fields("word"));
¡¡¡¡¡¡¡¡..ÉèÖöà¸öBolt

¡¡¡¡¡¡¡¡Config config = new Config();
¡¡¡¡¡¡¡¡config.setNumWorkers(4);
¡¡¡¡¡¡¡¡
¡¡¡¡¡¡¡¡StormSubmitter.submitTopology("wordcount", config, builder.createTopology());
// Local testing
//LocalCluster cluster = new LocalCluster();
// cluster.submitTopology("wordcount", config, builder.createTopology());
//Thread.sleep(10000);
//cluster.shutdown();
}
}

ÔÚÕâ¸ö´úÂëÖж¨ÒåÁËһЩ²ÎÊý±ÈÈçWorksµÄÊýÄ¿ÊÇ4£¬Æäº¬ÒåÔÚºóÃæÏêϸ·ÖÎö¡£

ÏÂÃæÎÒÃÇÒª½«ÉÏÃæÕâ¶Î´úÂë·¢²¼²¿Êðµ½StormÖУ¬Ê×ÏÈÁ˽âStormÎïÀí¼Ü¹¹Í¼£º

NimbusÊÇÒ»¸öÖ÷ºǫ́´¦ÀíÆ÷£¬Ö÷Òª¸ºÔð£º

1.·¢²¼·Ö·¢´úÂë

2.·ÖÅäÈÎÎñ

3.¼à¿ØÊ§°Ü¡£

SupervisorÊǸºÔðµ±Ç°Õâ¸ö½ÚµãµÄºǫ́¹¤×÷´¦ÀíÆ÷µÄ¼àÌý¡£

WorkÀàËÆJavaµÄỊ̈߳¬²ÉÈ¡JDKµÄExecutor ¡£

ÏÂÃæ¿ªÊ¼½«ÎÒÃǵĴúÂ벿Êðµ½Õâ¸öÍøÂçÍØÆËÖУº

½«´úÂëJar°üÉÏ´«µ½NimbusµÄinbox£¬°üÀ¨ËùÓеÄÒÀÀµ°ü£¬È»ºóÌá½»¡£

Nimbus½«±£´æÔÚ±¾µØÎļþϵͳ£¬È»ºó¿ªÊ¼ÅäÖÃÍøÂçÍØÆË£¬·ÖÅä¿ªÊ¼ÍØÆË¡£

¼ûÏÂͼ£º

Nimbus·þÎñÆ÷½«ÍØÆËJar ÅäÖúͽṹÏÂÔØµ½ Supervisor£¬¸ºÔØÆ½ºâZooKeeper·ÖÅäij¸öÌØ¶¨µÄSupervisor·þÎñÆ÷£¬¶øSupervisor¿ªÊ¼»ùÓÚÅäÖ÷ÖÅäWork£¬Workµ÷ÓÃJDKµÄExecutorÆô¶¯Ị̈߳¬¿ªÊ¼ÈÎÎñ´¦Àí¡£

ÏÂÃæÊÇÎÒÃÇ´úÂë¶ÔÍØÆË·ÖÅäµÄ²ÎÊýʾÒâͼ£º

ExecutorÆô¶¯µÄÏß³ÌÊýÄ¿ÊÇ12¸ö£¬×é¼þµÄʵÀýÊÇ16¸ö£¬ÄÇôÈçºÎÔÚʵ¼Ê·þÎñÆ÷ÖзÖÅäÄØ£¿ÈçÏÂͼ£º

ͼÖÐRsSpout´ú±íÎÒÃǵĴúÂëÖÐRandomSentenceSpout£»SplitSentenceBolt¼òдΪSSbolt£»

ÏÖÔÚ¿ªÊ¼·ÖÎöStormÄÚ²¿¼Ü¹¹£¬Ê×ÏÈ¿´¿´WorkÖ®¼äµÄÏûÏ¢´«µÝ£¬ÈçÏÂͼ£¬

WorkÖ®¼äµÄͨѶÊÇͨ¹ýZeroMQ£¬µ«ÊÇYahooºóÀ´·¢ÏÖʹÓÃÒì²½µÄNettyÄܹ»ÌáÉýStormÒ»±¶ÐÔÄÜ£¬Êý¾ÝʹÓÃKryo½øÐÐÐòÁл¯£¬±¾µØÍ¨Ñ¶Ê¹ÓÃLmaxµÄDisruptor £¬ÄÚ²¿ÎÞÐèÐòÁл¯¡£

ÈÝ´íÐÔ

ÈçÏÂͼ£¬executor·¢ËÍÐÄÌøµ½Zookeeper£¬Supervisor´Ó±¾µØÎļþ¶ÁÈ¡ËùÔÚ·þÎñÆ÷µÄworkerÐÄÌø×´Ì¬£¬È»ºóͬ²½·ÖÅä·¢Ë͵½zooKeeper¡£Nimbus¼à¿Ø¼¯Èº×´Ì¬¡£ÕâÑùÄÜÈ·±£workerÒ»Ö±»î×Å¡£

Èç¹ûij¸ö½ÚµãÒ²¾ÍÊÇ·þÎñÆ÷ûÓÐÐÄÌø£¬Nimbus½«ÖØÐ·ÖÅäеķþÎñÆ÷ÉÏÏß¹¤×÷¡£

Èç¹ûij¸ö½Úµã·þÎñÆ÷ÖÐworkûÓÐÐÄÌø£¬ÄÇôSupervisor½«¸ºÔðÖØÆôÏ̡߳£

Èç¹ûij¸öSupervisorÍêµ°£¬Õû¸ö´¦ÀíÕý³££¬µ«ÊÇ·ÖÅäµÄͬ²½¹¤×÷¾ÍÎÞ·¨½øÐÐÁË¡£

Èç¹ûNimbus±ÀÀ££¬Õû¸öϵͳ¿ÉÒÔÔËÐУ¬µ«ÊÇÍØÆË·ÖÅ乤×÷ÎÞ·¨½øÐÐÁË¡£

¿É¿¿ÐÔ£ºÈ·±£ÏûÏ¢±»´¦Àí

public class RandomSentenceSpout extends BaseRichSpout {
¡¡¡¡public void nextTuple() {
¡¡¡¡¡¡¡¡ UUID msgId = getMsgId();//ÓÃÏûÏ¢ID·¢ËÍÏûÏ¢
¡¡¡¡¡¡¡¡collector.emit(new Values(tweet), msgId);
¡¡¡¡}
¡¡¡¡public void ack(Object msgId) {
¡¡¡¡¡¡¡¡// Do something with acked message id. È·ÈÏÏûÏ¢ID
¡¡¡¡}
¡¡¡¡public void fail(Object msgId) {
¡¡¡¡¡¡¡¡ // Do something with failed message id. ÏûÏ¢IDʧ°ÜÁË
¡¡¡¡}
}

public class SplitSentenceBolt extends BaseRichBolt {
¡¡¡¡public void execute(Tuple input) {
¡¡¡¡¡¡¡¡for (String s : input.getString(0).split("\\s")) {
¡¡¡¡¡¡¡¡¡¡¡¡collector.emit(input, new Values(s));
¡¡¡¡¡¡¡¡}
¡¡¡¡¡¡¡¡//µ±Õû¸ö´ÊÓï¶¼±»Çзֺó£¬È·ÈÏÊäÈëµÄʼþÒѾ­±»½ÓÊÜ´¦Àí¡£
¡¡¡¡¡¡¡¡collector.ack(input);
¡¡¡¡}
}

ÏÂÃæÊÇÒ»¸öAckÈ·ÈÏÁ÷³Ì£¬×¢Òâµ½Acker Implicit bolt¡£

¶ÔÓÚÒ»¸öÊ÷ÐνṹTupleÁ÷£¬Ò²¾ÍÊÇTupleÀïÃæÇ¶Ì×Tuple¡£

Èç¹ûʼþ±»ÏÂÒ»¸ö½Úµã³É¹¦½ÓÊܺʹ¦Àí£¬Õâ¸ö½Úµã½«¸üÐÂÏàÓ¦³õʼʼþµÄÇ©Ãû£¬Í¨¹ýÒì»ò²Ù×÷£¬½«ÊäÈëʼþµÄIDºÍËùÓлùÓÚ¸ÃÊäÈëʼþ²úÉúµÄËùÓÐʼþµÄID½øÐÐÒì»ò²Ù×÷£¬ÈçÏÂͼ£¬Ê¼þ 01111 ²úÉú×Óʼþ 01100, 10010, ºÍ 00010, ÕâÑùʼþ 01111µÄÇ©ÃûÊÇ11100 (= 01111 (initial value) xor 01111 xor 01100 xor 10010 xor 00010).

µ±AckÖµ±ä³É0£¬Acker implicit bolt¾ÍÖªµÀtupleÊ÷ÐÎÊý¾Ý¼¯ºÏÈ«²¿±»´¦ÀíÍê³É£¬Ò»¸öÊÂÎñÈ·±£¿É¿¿½áÊø¡£ÀýÈçÓï¾ä·Ö³ÉÒ»¸ö¸öµ¥´ÊÈ«²¿Íê³É¡£

StormµÄ¼¯ÈºÉèÖÃ

ÉèÖÃZooKeeper cluster

(1)°²×°StormÒÀÀµµÄ¿â°üµ½·þÎñÆ÷ÉÏ:

- ZeroMQ 2.1.7 and JZMQ

- Java 6 and Python 2.6.6

- unzip

(2)ÏÂÔØ½âѹStorm¡£

ÌîÐ´Ç¿ÖÆÐÔÅäÖõ½storm.yaml

ÓÃstorm½Å±¾Æô¶¯ÊØ»¤Á÷³ÌµÄ¼à¶½

ͨ¹ýWeb½çÃæÄܹ»¹Û²ì¹ÜÀíÍØÆËÍøÂçÇé¿öºÍ×é¼þÇé¿ö¡£

   
2491 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ