Storm ÊÇÒ»¸ö·Ö²¼Ê½µÄʵʱ¼ÆËã¿ò¼Ü£¬¿ÉÒԺܷ½±ãµØ¶ÔÁ÷ʽÊý¾Ý½øÐÐʵʱ´¦ÀíºÍ·ÖÎö£¬ÄÜÔËÓÃÔÚʵʱ·ÖÎö¡¢ÔÚÏßÊý¾ÝÍÚ¾ò¡¢³ÖÐø¼ÆËãÒÔ¼°·Ö²¼Ê½
RPC µÈ³¡¾°Ï¡£Storm µÄʵʱÐÔ¿ÉÒÔʹµÃÊý¾Ý´ÓÊÕ¼¯µ½´¦ÀíչʾÔÚÃë¼¶±ðÄÚÍê³É£¬´Ó¶øÎªÒµÎñ·½¾ö²ßÌṩʵʱµÄÊý¾ÝÖ§³Ö¡£
ÔÚÃÀÍŵãÆÀ¹«Ë¾ÄÚ²¿£¬ÊµÊ±¼ÆËãÖ÷ÒªÓ¦Óó¡¾°°üÀ¨ÊµÊ±ÈÕÖ¾½âÎö¡¢Óû§ÐÐΪ·ÖÎö¡¢ÊµÊ±ÏûÏ¢ÍÆËÍ¡¢Ïû·ÑÇ÷ÊÆÕ¹Ê¾¡¢ÊµÊ±Ð¿ÍÅжϡ¢ÊµÊ±»îÔ¾Óû§Êýͳ¼ÆµÈ¡£ÕâЩÊý¾ÝÌṩ¸ø¸÷ÊÂҵȺ£¬²¢×÷ΪËûÃÇʵʱ¾ö²ßµÄÓÐÁ¦ÒÀ¾Ý£¬ÃÖ²¹ÁËÀëÏß¼ÆËã¡°T+1¡±µÄ²»×ã¡£
ÔÚʵʱ¼ÆËãÖУ¬Óû§²»½ö½ö¹ØÐÄʱЧÐÔµÄÎÊÌ⣬ͬʱҲ¹ØÐÄÏûÏ¢´¦ÀíµÄ³É¹¦ÂÊ¡£±¾ÎĽ«Í¨¹ýʵÑéÑéÖ¤ Storm
µÄÏûÏ¢¿É¿¿ÐÔ±£Ö¤»úÖÆ£¬ÎÄÕ·ÖΪÏûÏ¢±£Ö¤»úÖÆ¡¢²âÊÔÄ¿µÄ¡¢²âÊÔ»·¾³¡¢²âÊÔ³¡¾°ÒÔ¼°×ܽáµÈÎå½Ú¡£
Storm µÄÏûÏ¢±£Ö¤»úÖÆ
Storm ÌṩÁËÈýÖÖ²»Í¬²ã´ÎµÄÏûÏ¢±£Ö¤»úÖÆ£¬·Ö±ðÊÇ At Most Once¡¢At Least Once
ÒÔ¼° Exactly Once¡£ÏûÏ¢±£Ö¤»úÖÆÒÀÀµÓÚÏûÏ¢ÊÇ·ñ±»ÍêÈ«´¦Àí¡£
ÏûÏ¢ÍêÈ«´¦Àí
ÿ¸ö´Ó Spout£¨Storm ÖÐÊý¾ÝÔ´½Úµã£©·¢³öµÄ Tuple£¨Storm ÖеÄ×îСÏûÏ¢µ¥Ôª£©¿ÉÄÜ»áÉú³É³ÉǧÉÏÍò¸öеÄ
Tuple£¬ÐγÉÒ»¿Ã Tuple Ê÷£¬µ±Õû¿Ã Tuple Ê÷µÄ½Úµã¶¼±»³É¹¦´¦ÀíÁË£¬ÎÒÃǾÍ˵´Ó Spout
·¢³öµÄ Tuple ±»ÍêÈ«´¦ÀíÁË¡£ ÎÒÃÇ¿ÉÒÔͨ¹ýÏÂÃæµÄÀý×ÓÀ´¸üºÃµØÚ¹ÊÍÏûÏ¢±»ÍêÈ«´¦ÀíÕâ¸ö¸ÅÄ
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum); builder.setBolt("split", new SplitSentence(), 10) .shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20) .fieldsGrouping("split", new Fields("word")); |
Õâ¸ö Topology ´Ó Kafka£¨Ò»¸ö¿ªÔ´µÄ·Ö²¼Ê½ÏûÏ¢¶ÓÁУ©¶ÁÈ¡ÐÅÏ¢·¢ÍùÏÂÓΣ¬ÏÂÓ뵀 Bolt ½«ÊÕµ½µÄ¾ä×Ó·Ö¸î³Éµ¥¶ÀµÄµ¥´Ê£¬²¢½øÐмÆÊý¡£Ã¿Ò»¸ö´Ó
Spout ·¢ËͳöÀ´µÄ Tuple »áÑÜÉú³ö¶à¸öÐ嵀 Tuple£¬´Ó Spout ·¢ËͳöÀ´µÄ Tuple
ÒÔ¼°ºóÐøÑÜÉú³öÀ´µÄ Tuple ÐγÉÒ»¿Ã Tuple Ê÷£¬ÏÂͼÊÇÒ»¿Ã Tuple Ê÷ʾÀý£º

ÉÏͼÖÐËùÓÐµÄ Tuple ¶¼±»³É¹¦´¦ÀíÁË£¬ÎÒÃDzÅÈÏΪ Spout ·¢³öµÄ Tuple ±»ÍêÈ«´¦Àí¡£Èç¹ûÔÚÒ»¸ö¹Ì¶¨µÄʱ¼äÄÚ£¨Õâ¸öʱ¼ä¿ÉÒÔÅäÖã¬Ä¬ÈÏΪ
30 Ã룩£¬ÓÐÖÁÉÙÒ»¸ö Tuple ´¦Àíʧ°Ü»ò³¬Ê±£¬ÔòÈÏΪÕû¿Ã Tuple Ê÷´¦Àíʧ°Ü£¬¼´´Ó Spout
·¢³öµÄ Tuple ´¦Àíʧ°Ü¡£
ÈçºÎʵÏÖ²»Í¬²ã´ÎµÄÏûÏ¢±£Ö¤»úÖÆ

Tuple µÄÍêÈ«´¦ÀíÐèÒª Spout¡¢Bolt ÒÔ¼° Acker£¨Storm ÖÐÓÃÀ´¼Ç¼ij¿Ã Tuple
Ê÷ÊÇ·ñ±»ÍêÈ«´¦ÀíµÄ½Úµã£©ÐͬÍê³É£¬ÈçÉÏͼËùʾ¡£´Ó Spout ·¢ËÍ Tuple µ½ÏÂÓΣ¬²¢°ÑÏàÓ¦ÐÅϢ֪ͨ¸ø
Acker£¬Õû¿Ã Tuple Ê÷ÖÐij¸ö Tuple ±»³É¹¦´¦ÀíÁ˶¼»á֪ͨ Acker£¬´ýÕû¿Ã Tuple
Ê÷¶¼±»´¦ÀíÍê³ÉÖ®ºó£¬Acker ½«³É¹¦´¦ÀíÐÅÏ¢·µ»Ø¸ø Spout£»Èç¹ûij¸ö Tuple ´¦Àíʧ°Ü£¬»òÕß³¬Ê±£¬Acker
½«»á¸ø Spout ·¢ËÍÒ»¸ö´¦Àíʧ°ÜµÄÏûÏ¢£¬Spout ¸ù¾Ý Acker µÄ·µ»ØÐÅÏ¢ÒÔ¼°Óû§¶ÔÏûÏ¢±£Ö¤»úÖÆµÄÑ¡ÔñÅжÏÊÇ·ñÐèÒª½øÐÐÏûÏ¢ÖØ´«¡£
Storm ÌṩµÄÈýÖÖ²»Í¬ÏûÏ¢±£Ö¤»úÖÆÖС£ÀûÓà Spout¡¢Bolt ÒÔ¼° Acker µÄ×éºÏÎÒÃÇ¿ÉÒÔʵÏÖ
At Most Once ÒÔ¼° At Least Once ÓïÒ壬Storm ÔÚ At Least Once
µÄ»ù´¡ÉϽøÐÐÁËÒ»´Î·â×°£¨Trident£©£¬´Ó¶øÊµÏÖ Exactly Once ÓïÒå¡£
Storm µÄÏûÏ¢±£Ö¤»úÖÆÖУ¬Èç¹ûÐèҪʵÏÖ At Most Once ÓïÒ壬ֻÐèÒªÂú×ãÏÂÃæÈκÎÒ»Ìõ¼´¿É£º
¹Ø±Õ ACK »úÖÆ£¬¼´ Acker ÊýÄ¿ÉèÖÃΪ 0
Spout ²»ÊµÏÖ¿É¿¿ÐÔ´«Êä
Spout ·¢ËÍÏûÏ¢ÊÇʹÓò»´ø message ID µÄ API
²»ÊµÏÖ fail º¯Êý
Bolt ²»°Ñ´¦Àí³É¹¦»òʧ°ÜµÄÏûÏ¢·¢Ë͸ø Acker
Èç¹ûÐèҪʵÏÖ At Least Once ÓïÒ壬ÔòÐèҪͬʱ±£Ö¤Èçϼ¸Ìõ£º
¿ªÆô ACK »úÖÆ£¬¼´ Acker ÊýÄ¿´óÓÚ 0
Spout ʵÏÖ¿É¿¿ÐÔ´«Êä±£Ö¤
Spout ·¢ËÍÏûϢʱ¸½´ø message µÄ ID
Èç¹ûÊÕµ½ Acker µÄ´¦Àíʧ°Ü·´À¡£¬ÐèÒª½øÐÐÏûÏ¢ÖØ´«£¬¼´ÊµÏÖ fail º¯Êý
Bolt ÔÚ´¦Àí³É¹¦»òʧ°ÜºóÐèÒªµ÷ÓÃÏàÓ¦µÄ·½·¨Í¨Öª Acker
ʵÏÖ Exactly Once ÓïÒ壬ÔòÐèÒªÔÚ At Least Once µÄ»ù´¡ÉϽøÐÐ״̬µÄ´æ´¢£¬ÓÃÀ´·ÀÖ¹ÖØ¸´·¢Ë͵ÄÊý¾Ý±»Öظ´´¦Àí£¬ÔÚ
Storm ÖÐʹÓà Trident API ʵÏÖ¡£
ÏÂͼÖУ¬Ã¿ÖÖÏûÏ¢±£Ö¤»úÖÆÖÐ×ó±ßµÄ×Öĸ±íʾÉÏÓη¢Ë͵ÄÏûÏ¢£¬ÓұߵÄ×Öĸ±íʾÏÂÓνÓÊÕµ½µÄÏûÏ¢¡£´ÓͼÖпÉÒÔÖªµÀ£¬At
Most Once ÖУ¬ÏûÏ¢¿ÉÄܻᶪʧ£¨ÉÏÓη¢ËÍÁËÁ½¸ö A£¬ÏÂÓÎÖ»ÊÕµ½Ò»¸ö A£©£»At Least Once
ÖУ¬ÏûÏ¢²»»á¶ªÊ§£¬¿ÉÄÜÖØ¸´£¨ÉÏÓÎÖ»·¢ËÍÁËÒ»¸ö B £¬ÏÂÓÎÊÕµ½Á½¸ö B£©£»Exactly Once ÖУ¬ÏûÏ¢²»¶ªÊ§¡¢²»Öظ´£¬Òò´ËÐèÒªÔÚ
At Least Once µÄ»ù´¡Éϱ£´æÏàÓ¦µÄ״̬£¬±íʾÉÏÓεÄÄÄЩÏûÏ¢ÒѾ³É¹¦·¢Ë͵½ÏÂÓΣ¬·ÀֹͬһÌõÏûÏ¢·¢ËͶà´Î¸øÏÂÓεÄÇé¿ö¡£

²âÊÔÄ¿µÄ
Storm ¹Ù·½Ìṩ At Most Once¡¢At Least Once ÒÔ¼° Exactly Once
ÈýÖÖ²»Í¬²ã´ÎµÄÏûÏ¢±£Ö¤»úÖÆ£¬ÎÒÃÇÏ£Íûͨ¹ýÏà¹Ø²âÊÔ£¬´ïµ½ÈçÏÂÄ¿µÄ£º
ÈýÖÖÏûÏ¢±£Ö¤»úÖÆµÄ±íÏÖ£¬ÊÇ·ñÓë¹Ù·½µÄÃèÊöÏà·û£»
At Most Once ÓïÒåÏ£¬ÏûÏ¢µÄ¶ªÊ§ÂʺÍʲôÓйØÏµ¡¢¹ØÏµÈçºÎ£»
At Least Once ÓïÒåÏ£¬ÏûÏ¢µÄÖØ¸´ÂʺÍʲôÓйØÏµ¡¢¹ØÏµÈçºÎ¡£
²âÊÔ»·¾³
±¾ÎĵIJâÊÔ»·¾³ÈçÏÂ: ÿ¸ö worker£¨worker Ϊһ¸ö ÎïÀí JVM ½ø³Ì£¬ÓÃÓÚÔËÐÐʵ¼ÊµÄ Storm
×÷Òµ£©·ÖÅä 1 CPU ÒÔ¼° 1.6G ÄÚ´æ¡£Spout¡¢Bolt¡¢Acker ·Ö±ðÅÜÔÚµ¥¶ÀµÄ worker
ÉÏ¡£²¢Í¨¹ýÔÚ³ÌÐòÖпØÖÆÅ׳öÒì³£ÒÔ¼°È˹¤ Kill Spout/Bolt/Acker µÄ·½Ê½À´Ä£Äâʵ¼ÊÇé¿öÖеÄÒì³£Çé¿ö¡£
ÈýÖÖÏûÏ¢±£Ö¤»úÖÆµÄ²âÊÔ¾ùÓÉ Spout ´Ó Kafka ¶ÁÈ¡²âÊÔÊý¾Ý£¬¾ÓÉÏàÓ¦ Bolt ½øÐд¦Àí£¬È»ºó·¢Ë͵½
Kafka£¬²¢½« Kafka ÉϵÄÊý¾Ýͬ²½µ½ MySQL ·½±ã×îÖÕ½á¹ûµÄͳ¼Æ£¬ÈçÏÂͼËùʾ£º

²âÊÔÊý¾ÝΪ Kafka ÉÏ˳Ðò±£´æµÄһϵÁд¿Êý×Ö£¬Êý¾ÝÁ¿·Ö±ðÓÐÊ®Íò¡¢ÎåÊ®Íò¡¢Ò»°ÙÍòµÈ£¬Ã¿¸öÊý×ÖÔÚÿ¸ö²âÊÔÑùÀýÖгöÏÖÇÒ½ö³öÏÖÒ»´Î¡£
²âÊÔ³¡¾°
¶ÔÓÚÈýÖÖ²»Í¬µÄÏûÏ¢±£Ö¤»úÖÆ£¬ÎÒÃÇ·Ö±ðÉèÖÃÁ˲»Í¬µÄ²âÊÔ³¡¾°£¬À´½øÐгä·ÖµÄ²âÊÔ¡£ÆäÖÐΪÁ˱£Ö¤ Spout/Bolt/Acker
·¢ÉúÒì³£µÄÇé¿öϲ»Ó°ÏìÆäËû½Úµã£¬ÔÚÏÂÃæµÄ²âÊÔÖУ¬ËùÓеĽڵ㵥¶ÀÔËÐÐÔÚ¶ÀÁ¢µÄ Worker ÉÏ¡£
At Most Once
´Ó±³¾°ÖпÉÒÔµÃÖª£¬Èç¹ûÏ£ÍûʵÏÖ At Most Once ÓïÒ壬½« Acker µÄÊýÄ¿ÉèÖÃΪ 0 ¼´¿É£¬±¾ÎĵIJâÊÔ¹ý³ÌÖÐͨ¹ý°ÑÉèÖÃ
Acker Ϊ 0 À´½øÐÐ At Most Once µÄ²âÊÔ¡£
ÊäÈëÊý¾Ý
±£´æÔÚ Kafka ÉϵÄһϵÁд¿Êý×Ö£¬Êý¾ÝÁ¿´ÓÊ®Íòµ½Îå°ÙÍò²»µÈ£¬Ã¿¸ö²âÊÔÑùÀýÖУ¬Í¬Ò»¸öÊý×ÖÔÚ Kafka
ÖгöÏÖÇÒ½ö³öÏÖÒ»´Î¡£
²âÊÔ½á¹û

½áÂÛ
²»·¢ÉúÒì³£µÄÇé¿öÏ£¬ÏûÏ¢Äܹ»²»¶ª²»ÖØ£»Bolt ·¢ÉúÒì³£µÄÇé¿öÏ£¬ÏûÏ¢»á¶ªÊ§£¬²»»áÖØ¸´£¬ÆäÖÐÏûÏ¢µÄ ¶ªÊ§ÊýÄ¿
Óë Òì³£´ÎÊýÕýÏà¹Ø ¡£Óë¹Ù·½ÎĵµÃèÊöÏà·û£¬·ûºÏÔ¤ÆÚ¡£
At Least Once
ΪÁËʵÏÖ At Least Once ÓïÒ壬ÐèÒª Spout¡¢Bolt¡¢Acker ½øÐÐÅäºÏ¡£ÎÒÃÇʹÓÃ
Kafka-Spout ²¢Í¨¹ý×Ô¼º¹ÜÀí offset µÄ·½Ê½À´ÊµÏÖ¿É¿¿µÄ Spout£»Bolt ͨ¹ý¼Ì³Ð
BaseBasicBolt£¬×Ô¶¯°ïÎÒÃǽ¨Á¢ Tuple Ê÷ÒÔ¼°ÏûÏ¢´¦ÀíÖ®ºó֪ͨ Acker£»½« Acker
µÄÊýÄ¿ÉèÖÃΪ 1£¬¼´´ò¿ª ACK »úÖÆ£¬ÕâÑùÕû¸ö Topology ¼´¿ÉÌṩ At Least Once
µÄÓïÒå¡£
²âÊÔÊý¾Ý
Kafka Éϱ£´æµÄÊ®Íòµ½ÎåÊ®Íò²»µÈµÄ´¿Êý×Ö£¬ÆäÖÐÿ¸ö²âÊÔÑùÀýÖУ¬Ã¿¸öÊý×ÖÔÚ Kafka ÖгöÏÖÇÒ½ö³öÏÖÒ»´Î¡£
²âÊÔ½á¹û
Acker ·¢ÉúÒì³£µÄÇé¿ö




½áÂÛ
´ÓÉÏÃæµÄ±í¸ñÖпÉÒԵõ½£¬ÏûÏ¢²»»á¶ªÊ§£¬¿ÉÄÜ·¢ÉúÖØ¸´£¬Öظ´µÄÊýÄ¿ÓëÒì³£µÄÇé¿öÏà¹Ø¡£
²»·¢ÉúÈκÎÒì³£µÄÇé¿öÏ£¬ÏûÏ¢²»»áÖØ¸´²»»á¶ªÊ§¡£
Spout ·¢ÉúÒì³£µÄÇé¿öÏ£¬ÏûÏ¢µÄÖØ¸´ÊýĿԼµÈÓÚ spout.max.pending(Spout µÄÅäÖÃÏÿ´Î¿ÉÒÔ·¢Ë͵Ä×î¶àÏûÏ¢ÌõÊý£©
* NumberOfException£¨Òì³£´ÎÊý£©¡£
Acker ·¢ÉúÒì³£µÄÇé¿öÏ£¬ÏûÏ¢ÖØ¸´µÄÊýÄ¿µÈÓÚ spout.max.pending * NumberOfException¡£
Bolt ·¢ÉúÒì³£µÄÇé¿ö£º
emit ֮ǰ·¢ÉúÒì³££¬ÏûÏ¢²»»áÖØ¸´¡£
emit Ö®ºó·¢ÉúÒì³££¬ÏûÏ¢ÖØ¸´µÄ´ÎÊýµÈÓÚÒì³£µÄ´ÎÊý¡£
½áÂÛÓë¹Ù·½ÎĵµËùÊöÏà·û£¬Ã¿ÌõÏûÏ¢ÖÁÉÙ·¢ËÍÒ»´Î£¬±£Ö¤Êý¾Ý²»»á¶ªÊ§£¬µ«¿ÉÄÜÖØ¸´£¬·ûºÏÔ¤ÆÚ¡£
Exactly Once
¶ÔÓÚ Exactly Once µÄÓïÒ壬ÀûÓà Storm ÖÐµÄ Trident À´ÊµÏÖ¡£
²âÊÔÊý¾Ý
Kafka Éϱ£´æµÄÒ»Íòµ½Ò»°ÙÍò²»µÈµÄÊý×Ö£¬Ã¿¸öÊý×ÖÔÚÿ´Î²âÊÔÑùÀýÖгöÏÖÇÒ½ö³öÏÖÒ»´Î¡£
²âÊÔ½á¹û
Spout ·¢ÉúÒì³£Çé¿ö

½áÂÛ
ÔÚËùÓÐÇé¿öÏ£¬×îÖÕ½á¹û¼¯ÖеÄÏûÏ¢²»»á¶ªÊ§£¬²»»áÖØ¸´£¬Óë¹Ù·½ÎĵµÖеÄÃèÊöÏà·û£¬·ûºÏÔ¤ÆÚ¡£
×ܽá
¶Ô Storm ÌṩµÄÈýÖÖ²»Í¬ÏûÏ¢±£Ö¤»úÖÆ£¬Óû§¿ÉÒÔ¸ù¾Ý×Ô¼ºµÄÐèÇóÑ¡Ôñ²»Í¬µÄÏûÏ¢±£Ö¤»úÖÆ¡£
²»Í¬ÏûÏ¢¿É¿¿ÐÔ±£Ö¤µÄʹÓó¡¾°
¶ÔÓÚ Storm ÌṩµÄÈýÖÖÏûÏ¢¿É¿¿ÐÔ±£Ö¤£¬ÓÅȱµãÒÔ¼°Ê¹Óó¡¾°ÈçÏÂËùʾ£º

ÈçºÎʵÏÖ²»Í¬²ã´ÎµÄÏûÏ¢¿É¿¿ÐÔ±£Ö¤
¶ÔÓÚ At Least Once µÄ±£Ö¤ÐèÒª×öÈçϼ¸²½£º
ÐèÒª¿ªÆô ACK »úÖÆ£¬¼´ Topology ÖÐµÄ Acker ÊýÁ¿´óÓÚÁ㣻
Spout Êǿɿ¿µÄ¡£¼´ Spout ·¢ËÍÏûÏ¢µÄʱºòÐèÒª¸½´ø msgId£¬²¢ÇÒʵÏÖʧ°ÜÏûÏ¢ÖØ´«¹¦ÄÜ£¨fail
º¯Êý £¬¿ÉÒԲο¼ÏÂÃæµÄ Spout ´úÂ룩£»
Bolt ÔÚ·¢ËÍÏûϢʱ£¬ÐèÒªµ÷Óà emit£¨inputTuple, outputTuple£©½øÐн¨Á¢
anchor Ê÷£¨²Î¿¼ÏÂÃæ½¨Á¢ anchor Ê÷µÄ´úÂ룩£¬²¢ÇÒÔڳɹ¦´¦ÀíÖ®ºóµ÷Óà ack £¬´¦Àíʧ°Üʱµ÷ÓÃ
fail º¯Êý£¬Í¨Öª Acker¡£
²»Âú×ãÒÔÉÏÈýÌõÖÐÈÎÒâÒ»ÌõµÄ¶¼Ö»Ìṩ At Most Once µÄÏûÏ¢¿É¿¿ÐÔ±£Ö¤£¬Èç¹ûÏ£ÍûµÃµ½ Exactly
Once µÄÏûÏ¢¿É¿¿ÐÔ±£Ö¤£¬¿ÉÒÔʹÓà Trident ½øÐÐʵÏÖ¡£
²»Í¬²ã²âµÄ¿É¿¿ÐÔ±£Ö¤ÈçºÎʵÏÖ
ÈçºÎʵÏÖ¿É¿¿µÄ Spout
ʵÏÖ¿É¿¿µÄ Spout ÐèÒªÔÚ nextTuple º¯ÊýÖз¢ËÍÏûϢʱ£¬µ÷Óôø msgID µÄ emit
·½·¨£¬È»ºóʵÏÖʧ°ÜÏûÏ¢µÄÖØ´«£¨fail º¯Êý£©£¬²Î¿¼ÈçÏÂʾÀý:
/** * ÏëʵÏÖ¿É¿¿µÄ Spout£¬ÐèҪʵÏÖÈçÏÂÁ½µã * 1. ÔÚ nextTuple º¯ÊýÖе÷Óà emit º¯ÊýʱÐèÒª´øÒ»¸ö msgId£¬ÓÃÀ´±íʾµ±Ç°µÄÏûÏ¢ £¨Èç¹ûÏûÏ¢·¢ËÍʧ°Ü»áÓà msgId ×÷Ϊ²ÎÊý»Øµ÷ fail º¯Êý£© * 2. ×Ô¼ºÊµÏÖ fail º¯Êý£¬½øÐÐÖØ·¢ £¨×¢Ò⣬ÔÚ storm ÖÐûÓÐ msgId ºÍÏûÏ¢µÄ¶ÔÓ¦¹ØÏµ£¬ÐèÒª×Ô¼º½øÐÐά»¤£© */ public void nextTuple() { //ÉèÖà msgId ºÍ Value Ò»Ñù£¬·½±ã fail Ö®ºóÖØ·¢ collector.emit (new Values(curNum + "", round + ""), curNum + ":" + round); }
@Override
public void fail(Object msgId) {//ÏûÏ¢·¢ËÍʧ°ÜʱµÄ»Øµ÷º¯Êý
String tmp = (String)msgId; //ÉÏÃæÎÒÃÇÉèÖÃÁË msgId ºÍÏûÏ¢Ïàͬ£¬ÕâÀïͨ¹ý
msgId ½âÎö³ö¾ßÌåµÄÏûÏ¢
String[] args = tmp.split(":");
//ÏûÏ¢½øÐÐÖØ·¢
collector.emit(new Values(args[0], args[1]), msgId);
} |
ÈçºÎʵÏÖ¿É¿¿µÄ Bolt
Storm ÌṩÁ½ÖÖ²»Í¬ÀàÐ굀 Bolt£¬·Ö±ðÊÇ BaseRichBolt ºÍ BaseBasicBolt£¬¶¼¿ÉÒÔʵÏÖ¿É¿¿ÐÔÏûÏ¢´«µÝ£¬²»¹ý
BaseRichBolt ÐèÒª×Ô¼º×öºÜ¶àÖܱߵÄÊÂÇ飨½¨Á¢ anchor Ê÷£¬ÒÔ¼°ÊÖ¶¯ ACK/FAIL
֪ͨ Acker£©£¬Ê¹Óó¡¾°¸ü¹ã·º£¬¶ø BaseBasicBolt ÔòÓÉ Storm °ïæʵÏÖÁ˺ܶàÖܱߵÄÊÂÇ飬ʵÏÖÆðÀ´·½±ã¼òµ¥£¬µ«ÊÇʹÓó¡¾°µ¥Ò»¡£ÈçºÎÓÃÕâÁ½¸ö
Bolt ʵÏÖ£¨²»£©¿É¿¿µÄÏûÏ¢´«µÝÈçÏÂËùʾ£º
//BaseRichBolt ʵÏÖ²»¿É¿¿ÏûÏ¢´«µÝ public class SplitSentence extends BaseRichBolt {//²»½¨Á¢ anchor Ê÷µÄÀý×Ó OutputCollector _collector;
public void prepare(Map conf, TopologyContext
context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" "))
{
_collector.emit(new Values(word)); // ²»½¨Á¢ anchor
Ê÷
}
_collector.ack(tuple); //ÊÖ¶¯ ack£¬Èç¹û²»½¨Á¢ anchor Ê÷£¬ÊÇ·ñ
ack ÊÇûÓÐÇø±ðµÄ£¬Õâ¾ä¿ÉÒÔ½øÐÐ×¢ÊÍ
}
public void declareOutputFields(OutputFieldsDeclarer
declarer) {
declarer.declare(new Fields("word"));
}
}
//BaseRichBolt ʵÏÖ¿É¿¿µÄ Bolt
public class SplitSentence extends BaseRichBolt
{//½¨Á¢ anchor Ê÷ÒÔ¼°ÊÖ¶¯ ack µÄÀý×Ó
OutputCollector _collector;
public void prepare(Map conf, TopologyContext
context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" "))
{
_collector.emit(tuple, new Values(word)); // ½¨Á¢
anchor Ê÷
}
_collector.ack(tuple); //ÊÖ¶¯ ack£¬Èç¹ûÏëÈà Spout ÖØ·¢¸Ã
Tuple£¬Ôòµ÷Óà _collector.fail(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer
declarer) {
declarer.declare(new Fields("word"));
}
}
ÏÂÃæµÄʾÀý»á¿ÉÒÔ½¨Á¢ Multi-anchoring
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
//BaseBasicBolt ÊÇÎüÄɿɿ¿µÄÏûÏ¢´«µÝ
public class SplitSentence extends BaseBasicBolt
{//×Ô¶¯½¨Á¢ anchor£¬×Ô¶¯ ack
public void execute(Tuple tuple, BasicOutputCollector
collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" "))
{
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer
declarer) {
declarer.declare(new Fields("word"));
}
} |
Trident
ÔÚ Trident ÖУ¬Spout ºÍ State ·Ö±ðÓÐÈýÖÖ״̬£¬ÈçÏÂͼËùʾ£º

ÆäÖбí¸ñÖÐµÄ Yes ±íʾÏàÓ¦µÄ Spout ºÍ State ×éºÏ¿ÉÒÔʵÏÖ Exactly Once
ÓïÒ壬No ±íʾÏàÓ¦µÄ Spout ºÍ State ×éºÏ²»±£Ö¤ Exactly Once ÓïÒå¡£ÏÂÃæµÄ´úÂëÊÇÒ»¸ö
Trident ʾÀý£º
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf); //Opaque Spout //TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf); //Transaction Spout
TridentTopology topology = new TridentTopology();
String spoutTxid = Utils.kafkaSpoutGroupIdBuilder (topologyConfig.kafkaSrcTopic,
topologyConfig.topologyName);
Stream stream = topology.newStream(spoutTxid,
spout)
.name("new stream")
.parallelismHint(1);
// kafka config
KafkaProducerConfig kafkaProducerConfig = new
KafkaProducerConfig(); //KafkaProducerConfig ½ö¶Ô
kafka Ïà¹ØÅäÖýøÐÐÁË·â×°£¬¾ßÌå¿ÉÒԲο¼ TridentKafkaStateFactory2 (Map<String,
String> config)
Map<String, String> kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);
TridentToKafkaMapper tridentToKafkaMapper = new
TridentToKafkaMapper(); //TridentToKafkaMapper
¼Ì³Ð×Ô TridentTupleToKafkaMapper<String, String>£¬ ʵÏÖ
getMessageFromTuple ½Ó¿Ú£¬¸Ã½Ó¿ÚÖзµ»Ø tridentTuple.getString(0);
String dstTopic = "test__topic_for_all";
TridentKafkaStateFactory2 stateFactory = new
TridentKafkaStateFactory2(kafkaConfigs);
stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);
stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));
stream.each(new Fields("bytes"), new
AddMarkFunction(), new Fields("word"))
//´Óspout ³öÀ´Êý¾ÝÊÇÒ»¸ö bytes ÀàÐ͵ÄÊý¾Ý £¬µÚ¶þ¸öÊDzÎÊýÊÇ×Ô¼ºµÄ´¦Àíº¯Êý£¬µÚÈý¸ö²ÎÊýÊÇ´¦Àíº¯ÊýµÄÊä³ö×Ö¶Î
.name("write2kafka")
.partitionPersist(stateFactory //½«Êý¾ÝдÈëµ½ Kafka
ÖУ¬¿ÉÒÔ±£Ö¤Ð´Èëµ½ Kafka µÄÊý¾ÝÊÇ exactly once µÄ
, new Fields("word")
, new TridentKafkaUpdater())
.parallelismHint(1); |
|