±à¼ÍƼö: |
±¾ÎÄÀ´×ÔÓÚjdon£¬±¾ÎÄÖ÷Òª½éÉÜÁËStorm¡¢StormºÍHadoopµÄÇø±ð¡¢StormµÄÈÝ´íÐԺͿɿ¿ÐÔµÈÏà¹ØÄÚÈÝ¡£ |
|
ʲôÊÇStorm£¿
StormÊÇ£º
¿ìËÙÇÒ¿ÉÀ©Õ¹ÉìËõ
ÈÝ´í
È·±£ÏûÏ¢Äܹ»±»´¦Àí
Ò×ÓÚÉèÖúͲÙ×÷
¿ªÔ´µÄ·Ö²¼Ê½ÊµÊ±¼ÆËãϵͳ
- ×î³õÓÉNathan Marz¿ª·¢
- ʹÓÃJava ºÍ Clojure ±àд
StormºÍHadoopÖ÷񻂿±ðÊÇʵʱºÍÅú´¦ÀíµÄÇø±ð£º

Storm¸ÅÄî ×é³É£ºSpout ºÍBolt×é³ÉTopology¡£

TupleÊÇStormµÄÊý¾ÝÄ£ÐÍ£¬Èç['jdon',12346]
¶à¸öTuple×é³ÉʼþÁ÷£º

SpoutÊǶÁÈ¡ÐèÒª·ÖÎö´¦ÀíµÄÊý¾ÝÔ´£¬È»ºóתΪTuples£¬ÕâЩÊý¾ÝÔ´¿ÉÒÔÊÇWebÈÕÖ¾¡¢
APIµ÷Óá¢Êý¾Ý¿âµÈµÈ¡£SpoutÏ൱ÓÚʼþÁ÷µÄÉú²úÕß¡£
Bolt ´¦ÀíTuplesÈ»ºóÔÙ´´½¨ÐµÄTuplesÁ÷£¬BoltÏ൱ÓÚʼþÁ÷µÄÏû·ÑÕß¡£
Bolt ×÷ÎªÕæÕýÒµÎñ´¦ÀíÕߣ¬Ö÷ҪʵÏÖ´óÊý¾Ý´¦ÀíµÄºËÐŦÄÜ£¬±ÈÈçת»»Êý¾Ý£¬Ó¦ÓÃÏàÓ¦¹ýÂËÆ÷£¬¼ÆËãºÍ¾ÛºÏÊý¾Ý(±ÈÈçͳ¼Æ×ܺ͵ȵÈ)
¡£
ÒÔTwitterµÄij¸öTweetΪ°¸Àý£¬¿´¿´StormÈçºÎ´¦Àí£º

ÕâЩtweettÌùÄÚÈÝÊÇ£º¡°No Small Cell Lung #Cancer(ûÓÐСϸ°û·Î°©££°©Ö¢)¡±
"An #OnCology Consult...."
ÕâЩÌù±»Spout¶ÁÈ¡ÒԺ󣬲úÉúTuple£¬×Ö¶ÎÃûÊÇtweet£¬ÄÚÈÝÊÇ"No
Small Cell Lung #Cancer"£¬¸ñʽÀàËÆ£º['No Small Cell
Lung #Cancer',133221]¡£
È»ºó½øÈë±»Á÷ Ïû·ÑÕßBolt½øÐд¦Àí£¬µÚÒ»¸öBoltÊÇSplitSentence£¬½«tupleÄÚÈݽøÐзÖÀ룬½á¹û³ÉΪ£ºÒ»¸ö¸öµ¥´Ê£º"No"
"Small" "Cell" "Lung"
"#Cancer" £»È»ºó¾¹ýµÚ¶þ¸öBolt½øÐйýÂËHashTagFilter´¦Àí£¬Hash±êÇ©Êǵ¥´ÊÖÐÓÃ#±ê×¢µÄ£¬Ò²¾ÍÊÇCancer£»ÔÙ¾¹ýHasTagCount¼ÆÊý£¬¿ÉÒÔ±¾µØÄڴ滺´æÕâ¸ö¼ÆÊý½á¹û£¬×îºóͨ¹ýPrinterBolt´òÓ¡³ö±êÇ©µ¥´Êͳ¼Æ½á¹û
¡£
ÎÒÃÇʹÓÃStomËùÒª×öµÄ¾ÍÊDZàÖÆSpoutºÍBolt´úÂ룺
public class
RandomSentenceSpout extends BaseRichSpout {
¡¡¡¡SpoutOutputCollector collector;
¡¡¡¡Random random;
//¶ÁÈëÍⲿÊý¾Ý
¡¡¡¡public void open(Map conf, TopologyContext
context, SpoutOutputCollector collector) {
¡¡¡¡¡¡¡¡this.collector = collector;
¡¡¡¡¡¡¡¡random = new Random();
¡¡¡¡}
¡¡¡¡//²úÉúTuple
¡¡¡¡ public void nextTuple() {
¡¡¡¡¡¡¡¡String[] sentences = new String[] {
¡¡¡¡¡¡¡¡¡¡¡¡"No Small Cell Lung #Cancer",
¡¡¡¡¡¡¡¡¡¡¡¡"An #OnCology Consultant apple a
day keeps the doctor away",
¡¡¡¡¡¡¡¡¡¡¡¡"four score and seven years ago",
¡¡¡¡¡¡¡¡¡¡¡¡"snow white and the seven dwarfs",
¡¡¡¡¡¡¡¡¡¡¡¡"i am at two with nature"
¡¡¡¡¡¡¡¡};
¡¡¡¡¡¡¡¡String tweet = sentences[random.nextInt(sentences.length)];
¡¡¡¡¡¡¡¡//¶¨Òå×Ö¶ÎÃû"tweet" µÄÖµ
¡¡¡¡¡¡¡¡collector.emit(new Values(tweet));
¡¡}
¡¡¡¡// ¶¨Òå×Ö¶ÎÃû"tweet"
¡¡public void declareOutputFields(OutputFieldsDeclarer
declarer) {
¡¡¡¡¡¡¡¡declarer.declare(new Fields("tweet"));
¡¡¡¡}
¡¡¡¡@Override
¡¡¡¡public void ack(Object msgId) {}
¡¡¡¡@Override
¡¡¡¡public void fail(Object msgId) {}
} |
ÏÂÃæÊÇBoltµÄ´úÂë±àд£º
public class
SplitSentenceBolt extends BaseRichBolt {
¡¡¡¡OutputCollector collector;
¡¡¡¡@Override
¡¡¡¡public void prepare(Map stormConf, TopologyContext
context, OutputCollector collector) {
¡¡¡¡¡¡¡¡this.collector = collector;
¡¡¡¡}
¡¡¡¡@Override Ïû·ÑÕß¼¤»îÖ÷Òª·½·¨£º·ÖÀë³Éµ¥¸öµ¥´Ê
¡¡¡¡public void execute(Tuple input) {
¡¡¡¡¡¡¡¡for (String s : input.getString(0).split("\\s"))
{
¡¡¡¡¡¡¡¡¡¡¡¡collector.emit(new Values(s));
¡¡¡¡¡¡¡¡}
¡¡¡¡}
¡¡¡¡@Override ¶¨ÒåеÄ×Ö¶ÎÃû
¡¡¡¡public void declareOutputFields(OutputFieldsDeclarer
declarer) {
¡¡¡¡¡¡¡¡declarer.declare(new Fields("word"));
¡¡¡¡} |
×îºóÊÇ×°ÅäÔËÐÐSpoutºÍBoltµÄ¿Í»§¶Ëµ÷ÓôúÂ룺
public class
WordCountTopology {
¡¡¡¡public static void main(String[] args) throws
Exception {
¡¡¡¡¡¡¡¡TopologyBuilder builder = new TopologyBuilder();
¡¡¡¡¡¡¡¡builder.setSpout("tweet", new RandomSentenceSpout(),
2);
¡¡¡¡¡¡¡¡builder.setBolt("split", new SplitSentenceBolt(),
4)
¡¡¡¡¡¡¡¡¡¡¡¡.shuffleGrouping("tweet")
¡¡¡¡¡¡¡¡¡¡¡¡.setNumTasks(8);
¡¡¡¡¡¡¡¡builder.setBolt("count", new WordCountBolt(),
6)
¡¡¡¡¡¡¡¡¡¡¡¡.fieldsGrouping("split", new Fields("word"));
¡¡¡¡¡¡¡¡..ÉèÖöà¸öBolt
¡¡¡¡¡¡¡¡Config config = new Config();
¡¡¡¡¡¡¡¡config.setNumWorkers(4);
¡¡¡¡¡¡¡¡
¡¡¡¡¡¡¡¡StormSubmitter.submitTopology("wordcount",
config, builder.createTopology());
// Local testing
//LocalCluster cluster = new LocalCluster();
// cluster.submitTopology("wordcount",
config, builder.createTopology());
//Thread.sleep(10000);
//cluster.shutdown();
}
} |
ÔÚÕâ¸ö´úÂëÖж¨ÒåÁËһЩ²ÎÊý±ÈÈçWorksµÄÊýÄ¿ÊÇ4£¬Æäº¬ÒåÔÚºóÃæÏêϸ·ÖÎö¡£
ÏÂÃæÎÒÃÇÒª½«ÉÏÃæÕâ¶Î´úÂë·¢²¼²¿Êðµ½StormÖУ¬Ê×ÏÈÁ˽âStormÎïÀí¼Ü¹¹Í¼£º

NimbusÊÇÒ»¸öÖ÷ºǫ́´¦ÀíÆ÷£¬Ö÷Òª¸ºÔð£º
1.·¢²¼·Ö·¢´úÂë
2.·ÖÅäÈÎÎñ
3.¼à¿ØÊ§°Ü¡£
SupervisorÊǸºÔðµ±Ç°Õâ¸ö½ÚµãµÄºǫ́¹¤×÷´¦ÀíÆ÷µÄ¼àÌý¡£
WorkÀàËÆJavaµÄỊ̈߳¬²ÉÈ¡JDKµÄExecutor ¡£
ÏÂÃæ¿ªÊ¼½«ÎÒÃǵĴúÂ벿Êðµ½Õâ¸öÍøÂçÍØÆËÖУº

½«´úÂëJar°üÉÏ´«µ½NimbusµÄinbox£¬°üÀ¨ËùÓеÄÒÀÀµ°ü£¬È»ºóÌá½»¡£
Nimbus½«±£´æÔÚ±¾µØÎļþϵͳ£¬È»ºó¿ªÊ¼ÅäÖÃÍøÂçÍØÆË£¬·ÖÅä¿ªÊ¼ÍØÆË¡£
¼ûÏÂͼ£º

Nimbus·þÎñÆ÷½«ÍØÆËJar ÅäÖúͽṹÏÂÔØµ½ Supervisor£¬¸ºÔØÆ½ºâZooKeeper·ÖÅäij¸öÌØ¶¨µÄSupervisor·þÎñÆ÷£¬¶øSupervisor¿ªÊ¼»ùÓÚÅäÖ÷ÖÅäWork£¬Workµ÷ÓÃJDKµÄExecutorÆô¶¯Ị̈߳¬¿ªÊ¼ÈÎÎñ´¦Àí¡£
ÏÂÃæÊÇÎÒÃÇ´úÂë¶ÔÍØÆË·ÖÅäµÄ²ÎÊýʾÒâͼ£º

ExecutorÆô¶¯µÄÏß³ÌÊýÄ¿ÊÇ12¸ö£¬×é¼þµÄʵÀýÊÇ16¸ö£¬ÄÇôÈçºÎÔÚʵ¼Ê·þÎñÆ÷ÖзÖÅäÄØ£¿ÈçÏÂͼ£º

ͼÖÐRsSpout´ú±íÎÒÃǵĴúÂëÖÐRandomSentenceSpout£»SplitSentenceBolt¼òдΪSSbolt£»
ÏÖÔÚ¿ªÊ¼·ÖÎöStormÄÚ²¿¼Ü¹¹£¬Ê×ÏÈ¿´¿´WorkÖ®¼äµÄÏûÏ¢´«µÝ£¬ÈçÏÂͼ£¬
WorkÖ®¼äµÄͨѶÊÇͨ¹ýZeroMQ£¬µ«ÊÇYahooºóÀ´·¢ÏÖʹÓÃÒì²½µÄNettyÄܹ»ÌáÉýStormÒ»±¶ÐÔÄÜ£¬Êý¾ÝʹÓÃKryo½øÐÐÐòÁл¯£¬±¾µØÍ¨Ñ¶Ê¹ÓÃLmaxµÄDisruptor
£¬ÄÚ²¿ÎÞÐèÐòÁл¯¡£

ÈÝ´íÐÔ
ÈçÏÂͼ£¬executor·¢ËÍÐÄÌøµ½Zookeeper£¬Supervisor´Ó±¾µØÎļþ¶ÁÈ¡ËùÔÚ·þÎñÆ÷µÄworkerÐÄÌø×´Ì¬£¬È»ºóͬ²½·ÖÅä·¢Ë͵½zooKeeper¡£Nimbus¼à¿Ø¼¯Èº×´Ì¬¡£ÕâÑùÄÜÈ·±£workerÒ»Ö±»î×Å¡£

Èç¹ûij¸ö½ÚµãÒ²¾ÍÊÇ·þÎñÆ÷ûÓÐÐÄÌø£¬Nimbus½«ÖØÐ·ÖÅäеķþÎñÆ÷ÉÏÏß¹¤×÷¡£
Èç¹ûij¸ö½Úµã·þÎñÆ÷ÖÐworkûÓÐÐÄÌø£¬ÄÇôSupervisor½«¸ºÔðÖØÆôÏ̡߳£
Èç¹ûij¸öSupervisorÍêµ°£¬Õû¸ö´¦ÀíÕý³££¬µ«ÊÇ·ÖÅäµÄͬ²½¹¤×÷¾ÍÎÞ·¨½øÐÐÁË¡£
Èç¹ûNimbus±ÀÀ££¬Õû¸öϵͳ¿ÉÒÔÔËÐУ¬µ«ÊÇÍØÆË·ÖÅ乤×÷ÎÞ·¨½øÐÐÁË¡£
¿É¿¿ÐÔ£ºÈ·±£ÏûÏ¢±»´¦Àí
public class
RandomSentenceSpout extends BaseRichSpout {
¡¡¡¡public void nextTuple() {
¡¡¡¡¡¡¡¡ UUID msgId = getMsgId();//ÓÃÏûÏ¢ID·¢ËÍÏûÏ¢
¡¡¡¡¡¡¡¡collector.emit(new Values(tweet), msgId);
¡¡¡¡}
¡¡¡¡public void ack(Object msgId) {
¡¡¡¡¡¡¡¡// Do something with acked message id. È·ÈÏÏûÏ¢ID
¡¡¡¡}
¡¡¡¡public void fail(Object msgId) {
¡¡¡¡¡¡¡¡ // Do something with failed message id. ÏûÏ¢IDʧ°ÜÁË
¡¡¡¡}
}
public class SplitSentenceBolt extends BaseRichBolt
{
¡¡¡¡public void execute(Tuple input) {
¡¡¡¡¡¡¡¡for (String s : input.getString(0).split("\\s"))
{
¡¡¡¡¡¡¡¡¡¡¡¡collector.emit(input, new Values(s));
¡¡¡¡¡¡¡¡}
¡¡¡¡¡¡¡¡//µ±Õû¸ö´ÊÓï¶¼±»Çзֺó£¬È·ÈÏÊäÈëµÄʼþÒѾ±»½ÓÊÜ´¦Àí¡£
¡¡¡¡¡¡¡¡collector.ack(input);
¡¡¡¡}
} |
ÏÂÃæÊÇÒ»¸öAckÈ·ÈÏÁ÷³Ì£¬×¢Òâµ½Acker Implicit bolt¡£

¶ÔÓÚÒ»¸öÊ÷ÐνṹTupleÁ÷£¬Ò²¾ÍÊÇTupleÀïÃæÇ¶Ì×Tuple¡£
Èç¹ûʼþ±»ÏÂÒ»¸ö½Úµã³É¹¦½ÓÊܺʹ¦Àí£¬Õâ¸ö½Úµã½«¸üÐÂÏàÓ¦³õʼʼþµÄÇ©Ãû£¬Í¨¹ýÒì»ò²Ù×÷£¬½«ÊäÈëʼþµÄIDºÍËùÓлùÓÚ¸ÃÊäÈëʼþ²úÉúµÄËùÓÐʼþµÄID½øÐÐÒì»ò²Ù×÷£¬ÈçÏÂͼ£¬Ê¼þ
01111 ²úÉú×Óʼþ 01100, 10010, ºÍ 00010, ÕâÑùʼþ 01111µÄÇ©ÃûÊÇ11100
(= 01111 (initial value) xor 01111 xor 01100 xor 10010
xor 00010).
µ±AckÖµ±ä³É0£¬Acker implicit bolt¾ÍÖªµÀtupleÊ÷ÐÎÊý¾Ý¼¯ºÏÈ«²¿±»´¦ÀíÍê³É£¬Ò»¸öÊÂÎñÈ·±£¿É¿¿½áÊø¡£ÀýÈçÓï¾ä·Ö³ÉÒ»¸ö¸öµ¥´ÊÈ«²¿Íê³É¡£

StormµÄ¼¯ÈºÉèÖÃ
ÉèÖÃZooKeeper cluster
(1)°²×°StormÒÀÀµµÄ¿â°üµ½·þÎñÆ÷ÉÏ:
- ZeroMQ 2.1.7 and JZMQ
- Java 6 and Python 2.6.6
- unzip
(2)ÏÂÔØ½âѹStorm¡£
ÌîÐ´Ç¿ÖÆÐÔÅäÖõ½storm.yaml
ÓÃstorm½Å±¾Æô¶¯ÊØ»¤Á÷³ÌµÄ¼à¶½
ͨ¹ýWeb½çÃæÄܹ»¹Û²ì¹ÜÀíÍØÆËÍøÂçÇé¿öºÍ×é¼þÇé¿ö¡£ |