Testing Storm's Reliability Guarantees
 
Source: tech.meituan.com  Published: 2017-1-5
 

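Storm is a distributed real-time computation framework that makes it easy to process and analyze streaming data in real time. It is used for real-time analytics, online data mining, continuous computation, distributed RPC, and similar scenarios. Because Storm works in real time, data can go from collection through processing to display within seconds, giving business teams real-time data to support their decisions.

Inside Meituan-Dianping, the main uses of real-time computation include real-time log parsing, user behavior analysis, real-time message push, consumption trend display, real-time new-customer detection, and real-time active user counts. This data is provided to each business group as a solid basis for real-time decisions, making up for the "T+1" delay of offline computation.

In real-time computation, users care not only about timeliness but also about the success rate of message processing. This article verifies Storm's message reliability guarantees by experiment. It has five sections: message guarantee mechanisms, test objectives, test environment, test scenarios, and summary.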

Storm's message guarantee mechanisms

Storm provides three levels of message guarantee: At Most Once, At Least Once, and Exactly Once. Which level you get depends on whether each message is fully processed.

Fully processed messages

Each Tuple (the smallest unit of message in Storm) emitted by a Spout (a data source node in Storm) may generate thousands of new Tuples, which together form a Tuple tree. When every node of the Tuple tree has been processed successfully, we say that the Tuple emitted by the Spout has been fully processed. The following example illustrates the concept:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum);
builder.setBolt("split", new SplitSentence(), 10)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));

This Topology reads messages from Kafka (an open-source distributed message queue) and sends them downstream, where Bolts split each sentence into individual words and count them. Each Tuple emitted by the Spout gives rise to several new Tuples; the Spout's Tuple and all the Tuples derived from it form a Tuple tree. The figure below shows an example Tuple tree:

Only when all the Tuples in the figure above have been processed successfully do we consider the Tuple emitted by the Spout fully processed. If at least one Tuple fails or times out within a fixed period (configurable, 30 seconds by default), the whole Tuple tree is considered failed, that is, the Tuple emitted by the Spout has failed.

How the different levels of message guarantee are implemented

Fully processing a Tuple requires the Spout, the Bolts, and the Acker (the node in Storm that records whether a Tuple tree has been fully processed) to work together, as shown in the figure above. The Spout sends a Tuple downstream and notifies the Acker. Each time a Tuple in the tree is processed successfully, the Acker is notified. Once the whole Tuple tree has been processed, the Acker reports success back to the Spout. If a Tuple fails or times out, the Acker sends the Spout a failure message. Based on the Acker's response and the guarantee level the user has chosen, the Spout decides whether to replay the message.
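To make the Acker's bookkeeping concrete, here is a minimal sketch in plain Java. It assumes the XOR-checksum scheme that Storm's documentation describes for tracking Tuple trees: one 64-bit value per Spout Tuple, which returns to zero once every Tuple in the tree has been both created and acked. The class AckerSketch, its methods, and the tuple ids are hypothetical names chosen for illustration and are not Storm's API; timeouts and failure notifications are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of ack tracking: one 64-bit XOR checksum per Spout Tuple (root). */
class AckerSketch {
    private final Map<Long, Long> checksums = new HashMap<>();

    /** The Spout registers a root Tuple; its id enters the checksum. */
    void init(long rootId, long tupleId) {
        checksums.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    /**
     * A Bolt acks one Tuple: it reports the id of the Tuple it consumed,
     * XORed with the ids of every child Tuple it anchored to that Tuple.
     * Returns true when the checksum is back to 0, i.e. the tree is complete.
     */
    boolean ack(long rootId, long consumedId, long... childIds) {
        long delta = consumedId;
        for (long child : childIds) {
            delta ^= child;
        }
        long value = checksums.merge(rootId, delta, (a, b) -> a ^ b);
        if (value == 0L) {
            checksums.remove(rootId);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long root = 1L;
        acker.init(root, 0xA1L);                                   // Spout emits a sentence
        System.out.println(acker.ack(root, 0xA1L, 0xB1L, 0xB2L));  // split Bolt emits two words: false
        System.out.println(acker.ack(root, 0xB1L));                // first word counted: false
        System.out.println(acker.ack(root, 0xB2L));                // second word counted: true
    }
}
```

Each id is XORed in twice, once when the Tuple is created and once when it is acked, so the order in which the acks arrive does not matter.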

Of the three guarantee levels Storm provides, At Most Once and At Least Once can be implemented by combining the Spout, the Bolts, and the Acker. Storm adds a layer on top of At Least Once (Trident) to implement Exactly Once semantics.

To get At Most Once semantics, it is enough to satisfy any one of the following:

Turn off the ACK mechanism, that is, set the number of Ackers to 0

The Spout does not implement reliable delivery: it emits messages with the API that takes no message ID, or it does not implement the fail function

The Bolts do not report processing success or failure to the Acker

To get At Least Once semantics, all of the following must hold:

Turn on the ACK mechanism, that is, set the number of Ackers to more than 0

The Spout implements reliable delivery: it attaches a message ID when emitting, and it replays the message when the Acker reports a failure, that is, it implements the fail function

After processing succeeds or fails, each Bolt calls the corresponding method to notify the Acker

Exactly Once semantics additionally require storing state on top of At Least Once, so that replayed data is not processed twice. In Storm this is done with the Trident API.

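In the figure below, for each guarantee level the letters on the left are the messages sent upstream and the letters on the right are the messages received downstream. With At Most Once, messages may be lost (upstream sent two As, downstream received only one). With At Least Once, messages are not lost but may be duplicated (upstream sent one B, downstream received two). With Exactly Once, messages are neither lost nor duplicated, so state must be kept on top of At Least Once to record which upstream messages have already reached downstream, preventing the same message from being delivered downstream more than once.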
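The three behaviours can be mimicked with a small, self-contained simulation. This is a toy model rather than Storm code: DeliverySemanticsSketch, deliver, failIndex, and lostBeforeDelivery are names invented here. It assumes exactly one delivery attempt goes wrong, either because the message never arrives or because it arrives but its acknowledgement is lost.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DeliverySemanticsSketch {
    enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    /**
     * Sends every message; the first attempt for the message at failIndex fails.
     * lostBeforeDelivery == true:  the failed attempt never reached the receiver.
     * lostBeforeDelivery == false: it reached the receiver but the ack was lost.
     */
    static List<String> deliver(List<String> sent, int failIndex,
                                boolean lostBeforeDelivery, Mode mode) {
        List<String> received = new ArrayList<>();
        Set<Integer> seen = new HashSet<>(); // receiver-side state, used only for EXACTLY_ONCE
        for (int i = 0; i < sent.size(); i++) {
            // At Most Once never retries; the other two modes retry the failed message once.
            int attempts = (i == failIndex && mode != Mode.AT_MOST_ONCE) ? 2 : 1;
            for (int attempt = 0; attempt < attempts; attempt++) {
                boolean failingAttempt = (i == failIndex && attempt == 0);
                if (failingAttempt && lostBeforeDelivery) {
                    continue; // the message never arrived
                }
                if (mode == Mode.EXACTLY_ONCE && !seen.add(i)) {
                    continue; // already applied: the stored state suppresses the duplicate
                }
                received.add(sent.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("A", "B", "C");
        System.out.println(deliver(sent, 1, true, Mode.AT_MOST_ONCE));   // [A, C]
        System.out.println(deliver(sent, 1, false, Mode.AT_LEAST_ONCE)); // [A, B, B, C]
        System.out.println(deliver(sent, 1, false, Mode.EXACTLY_ONCE));  // [A, B, C]
    }
}
```

The only difference between the At Least Once and Exactly Once branches is the `seen` set, which stands in for the state that Exactly Once has to store.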

Test objectives

Storm officially offers three levels of message guarantee: At Most Once, At Least Once, and Exactly Once. With these tests we want to find out:

Whether the three guarantee levels behave as the official documentation describes;

Under At Most Once, what the message loss rate depends on, and how;

Under At Least Once, what the message duplication rate depends on, and how.

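Test environment

The test environment is as follows. Each worker (a worker is a physical JVM process that runs the actual Storm job) is allocated 1 CPU and 1.6 GB of memory. The Spout, Bolt, and Acker each run on a separate worker. Failures are simulated by throwing exceptions from the program and by manually killing the Spout, Bolt, or Acker.

In all three tests, the Spout reads test data from Kafka, the corresponding Bolt processes it and writes it back to Kafka, and the data in Kafka is then synced to MySQL to make the final results easy to tally, as shown in the figure below:

The test data is a series of plain numbers stored sequentially in Kafka, in volumes of one hundred thousand, five hundred thousand, one million, and so on. Each number appears exactly once in each test case.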
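Because every number appears exactly once in the input, loss and duplication can be read off the output by counting occurrences. The sketch below shows one way to do that tally in Java. It assumes the input numbers are the integers 1..n and that the output has already been loaded into a list; both are assumptions made for illustration, since the tests described here did their tallying in MySQL. ResultCheckSketch and lostAndDuplicated are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ResultCheckSketch {
    /**
     * For inputs 1..n, returns {lost, duplicated}:
     * lost       = how many inputs never appear in the output,
     * duplicated = how many extra copies appear beyond the first.
     */
    static long[] lostAndDuplicated(int n, List<Integer> output) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int value : output) {
            counts.merge(value, 1, Integer::sum);
        }
        long lost = 0;
        long duplicated = 0;
        for (int i = 1; i <= n; i++) {
            int c = counts.getOrDefault(i, 0);
            if (c == 0) {
                lost++;
            } else {
                duplicated += c - 1;
            }
        }
        return new long[] { lost, duplicated };
    }

    public static void main(String[] args) {
        // 3 is missing; 2 appears twice and 5 appears three times.
        List<Integer> output = Arrays.asList(1, 2, 2, 4, 5, 5, 5);
        long[] r = lostAndDuplicated(5, output);
        System.out.println("lost=" + r[0] + " duplicated=" + r[1]); // lost=1 duplicated=3
    }
}
```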

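Test scenarios

We set up separate test scenarios for each of the three guarantee levels so that each is tested thoroughly. To make sure a failure in the Spout, Bolt, or Acker does not affect the other nodes, every node in the tests below runs on its own Worker.

At Most Once

As described in the background above, At Most Once semantics can be obtained simply by setting the number of Ackers to 0, and that is how the At Most Once tests in this article were run.

Input data

A series of plain numbers stored in Kafka, ranging from one hundred thousand to five million records. Within each test case, each number appears exactly once in Kafka.

Test results

Conclusion

When no exception occurs, no message is lost or duplicated. When the Bolt throws exceptions, messages are lost but not duplicated, and the number of lost messages is positively correlated with the number of exceptions. This matches the official documentation and is as expected.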

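At Least Once

At Least Once semantics require the Spout, Bolt, and Acker to cooperate. We use Kafka-Spout and manage offsets ourselves to get a reliable Spout. The Bolt extends BaseBasicBolt, which automatically builds the Tuple tree and notifies the Acker after each message is processed. The number of Ackers is set to 1, which turns on the ACK mechanism. With that, the whole Topology provides At Least Once semantics.

Test data

Between one hundred thousand and five hundred thousand plain numbers stored in Kafka. Within each test case, each number appears exactly once in Kafka.

Test results

When the Acker fails

Conclusion

The tables above show that messages are not lost but may be duplicated, and that the number of duplicates depends on the kind of failure.

When no exception occurs, messages are neither duplicated nor lost.

When the Spout fails, the number of duplicated messages is approximately spout.max.pending (a Spout setting: the maximum number of messages that can be sent at a time) * NumberOfException (the number of exceptions).

When the Acker fails, the number of duplicated messages equals spout.max.pending * NumberOfException.

When the Bolt fails: if the exception occurs before emit, messages are not duplicated; if it occurs after emit, the number of duplicates equals the number of exceptions.

These conclusions match the official documentation: each message is delivered at least once, so no data is lost, but data may be duplicated. This is as expected.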
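The relation between duplicates, the pending limit, and the number of failures can be reproduced with a simplified model. The sketch below is not Storm code and makes several assumptions: the Spout keeps at most maxPending messages in flight, every in-flight message has already been written downstream when a crash happens, and after a crash the Spout restarts from the last committed offset. ReplaySketch, duplicates, and crashAtCommitted are hypothetical names.

```java
class ReplaySketch {
    /**
     * Simulates reading offsets 0..total-1 with at most maxPending unacked
     * messages in flight. A crash happens the first time the committed offset
     * reaches each value in crashAtCommitted. Returns the number of extra
     * (duplicated) writes seen downstream. Requires maxPending >= 1.
     */
    static int duplicates(int total, int maxPending, int... crashAtCommitted) {
        int[] written = new int[total];
        boolean[] crashed = new boolean[crashAtCommitted.length];
        int committed = 0; // every offset below this has been acked and committed
        int next = 0;      // next offset the Spout will emit
        while (committed < total) {
            // Fill the in-flight window; downstream writes each message on arrival.
            while (next < total && next - committed < maxPending) {
                written[next++]++;
            }
            boolean crashNow = false;
            for (int i = 0; i < crashAtCommitted.length; i++) {
                if (!crashed[i] && crashAtCommitted[i] == committed) {
                    crashed[i] = true;
                    crashNow = true;
                    break;
                }
            }
            if (crashNow) {
                next = committed; // acks for the window are lost; replay from the committed offset
            } else {
                committed++;      // the oldest in-flight message is acked
            }
        }
        int dup = 0;
        for (int w : written) {
            dup += w - 1;
        }
        return dup;
    }

    public static void main(String[] args) {
        // Two crashes with a window of 100 messages: 100 * 2 duplicates.
        System.out.println(duplicates(1000, 100, 200, 500)); // 200
    }
}
```

In this model a crash near the end of the input, when fewer than maxPending messages remain, replays fewer messages, which is one reason a measured count may only approximately equal the product.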

Exactly Once

Exactly Once semantics are implemented with Trident in Storm.

Test data

Between ten thousand and one million numbers stored in Kafka. Each number appears exactly once in each test case.

Test results

When the Spout fails

Conclusion

In every case, no message in the final result set is lost or duplicated. This matches the official documentation and is as expected.

Summary

Storm offers three message guarantee levels, and users can choose among them according to their needs.

Use cases for the different reliability guarantees

The pros, cons, and use cases of Storm's three message reliability guarantees are as follows:

How to achieve the different levels of message reliability

An At Least Once guarantee requires the following steps:

Turn on the ACK mechanism, that is, the number of Ackers in the Topology must be greater than zero;

Make the Spout reliable: the Spout attaches a msgId when emitting a message and implements replay of failed messages (the fail function; see the Spout code below);

When emitting, the Bolt calls emit(inputTuple, outputTuple) to build the anchor tree (see the anchor tree code below), then calls ack after successful processing or fail when processing fails, to notify the Acker.

A Topology that misses any one of these three conditions provides only At Most Once reliability. For Exactly Once reliability, use Trident.

Implementing the different levels of reliability guarantees

How to implement a reliable Spout

A reliable Spout calls the emit method that takes a msgID when sending messages in nextTuple, and implements replay of failed messages (the fail function). See the following example:

/**
 * A reliable Spout needs two things:
 * 1. When nextTuple calls emit, it passes a msgId that identifies the current message
 *    (if the message fails, fail is called back with that msgId as its argument).
 * 2. It implements fail itself to replay the message
 *    (note: Storm keeps no mapping from msgId to message, so you must maintain it yourself).
 */
public void nextTuple() {
    // Build the msgId from the same content as the values, so the message can be rebuilt after a failure
    collector.emit(new Values(curNum + "", round + ""), curNum + ":" + round);
}

@Override
public void fail(Object msgId) { // callback invoked when a message fails
    String tmp = (String) msgId;
    // The msgId was built from the message above, so parse the message back out of it
    String[] args = tmp.split(":");

    // Replay the message
    collector.emit(new Values(args[0], args[1]), msgId);
}
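The example above avoids a lookup table by encoding the whole message in the msgId. When the message cannot be rebuilt from its id, the usual alternative is to keep the msgId-to-message mapping yourself, as the comment notes. The following is a minimal sketch of that bookkeeping in plain Java. PendingTrackerSketch and its Emitter interface are hypothetical stand-ins so that the example runs without Storm; in a real Spout the three methods would be called from nextTuple, ack, and fail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingTrackerSketch {
    /** Hypothetical stand-in for the Spout's output collector. */
    interface Emitter {
        void emit(List<Object> values, Object msgId);
    }

    private final Map<Object, List<Object>> pending = new HashMap<>();
    private final Emitter emitter;

    PendingTrackerSketch(Emitter emitter) {
        this.emitter = emitter;
    }

    /** From nextTuple: remember the values before emitting them. */
    void emitReliable(List<Object> values, Object msgId) {
        pending.put(msgId, values);
        emitter.emit(values, msgId);
    }

    /** From ack: the Tuple tree completed, so the message can be forgotten. */
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    /** From fail: look the message up by msgId and emit it again. */
    void fail(Object msgId) {
        List<Object> values = pending.get(msgId);
        if (values != null) {
            emitter.emit(values, msgId);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<Object> emittedIds = new ArrayList<>();
        PendingTrackerSketch tracker =
                new PendingTrackerSketch((values, msgId) -> emittedIds.add(msgId));
        tracker.emitReliable(Arrays.<Object>asList("1", "0"), "1:0");
        tracker.fail("1:0");  // replayed
        tracker.ack("1:0");   // forgotten
        tracker.fail("1:0");  // no longer pending, nothing is emitted
        System.out.println(emittedIds.size() + " emits, " + tracker.pendingCount() + " pending");
    }
}
```

Removing the entry on ack keeps the map bounded by the number of messages in flight.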

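How to implement a reliable Bolt

Storm provides two kinds of Bolt, BaseRichBolt and BaseBasicBolt, and both can deliver messages reliably. With BaseRichBolt you do much of the surrounding work yourself (building the anchor tree and manually notifying the Acker with ACK/FAIL), which makes it suitable for a wider range of cases. With BaseBasicBolt, Storm does much of that work for you, so it is simple and convenient to implement but fits fewer cases. The following shows how to use the two Bolts for reliable and unreliable message delivery: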

// BaseRichBolt with unreliable message delivery
public class SplitSentence extends BaseRichBolt { // example that does not build the anchor tree
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            _collector.emit(new Values(word)); // no anchor, so no anchor tree is built
        }
        _collector.ack(tuple); // manual ack; without an anchor tree, acking or not makes no difference, so this line can be commented out
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

// BaseRichBolt as a reliable Bolt
public class SplitSentence extends BaseRichBolt { // example that builds the anchor tree and acks manually
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            _collector.emit(tuple, new Values(word)); // anchor to the input Tuple to build the anchor tree
        }
        _collector.ack(tuple); // manual ack; to make the Spout replay this Tuple, call _collector.fail(tuple) instead
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

The following example shows multi-anchoring:
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

// BaseBasicBolt with reliable message delivery
public class SplitSentence extends BaseBasicBolt { // anchors and acks automatically
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Trident

In Trident, Spouts and States each come in three types, as shown in the figure below:

In the table, Yes means that the Spout and State combination can provide Exactly Once semantics, and No means that the combination does not guarantee Exactly Once semantics. The following code is a Trident example:

// Opaque spout
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
// Transactional spout (alternative)
// TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);

TridentTopology topology = new TridentTopology();
String spoutTxid = Utils.kafkaSpoutGroupIdBuilder(
        topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);
Stream stream = topology.newStream(spoutTxid, spout)
        .name("new stream")
        .parallelismHint(1);

// Kafka config
// KafkaProducerConfig only wraps the Kafka-related settings; for details see
// TridentKafkaStateFactory2(Map<String, String> config)
KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();
Map<String, String> kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);
// TridentToKafkaMapper extends TridentTupleToKafkaMapper<String, String> and implements
// getMessageFromTuple, which returns tridentTuple.getString(0)
TridentToKafkaMapper tridentToKafkaMapper = new TridentToKafkaMapper();

String dstTopic = "test__topic_for_all";

TridentKafkaStateFactory2 stateFactory = new TridentKafkaStateFactory2(kafkaConfigs);
stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);
stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));

// The data coming out of the spout is a field of type bytes; the second argument is our own
// processing function, and the third argument is that function's output field
stream.each(new Fields("bytes"), new AddMarkFunction(), new Fields("word"))
        .name("write2kafka")
        // Write the data to Kafka; the data written to Kafka is guaranteed to be exactly once
        .partitionPersist(stateFactory, new Fields("word"), new TridentKafkaUpdater())
        .parallelismHint(1);
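Why stored state is enough to turn replays into Exactly Once updates can be shown with a small model of a transactional State. This is a sketch under stated assumptions, not the Trident API: it assumes a transactional Spout (a batch replayed under the same txid contains the same Tuples) and that batches are applied in txid order, which is how Trident's documentation describes transactional processing. TransactionalCountSketch and applyBatch are hypothetical names.

```java
class TransactionalCountSketch {
    private long count = 0;
    private long lastTxid = -1; // txid of the last batch that was applied to count

    /**
     * Applies a batch to the count unless a batch with the same txid was
     * already applied. Returns true if the state changed.
     */
    boolean applyBatch(long txid, int batchSize) {
        if (txid == lastTxid) {
            return false; // replay of a batch that is already reflected in the state
        }
        count += batchSize;
        lastTxid = txid;
        return true;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        TransactionalCountSketch state = new TransactionalCountSketch();
        state.applyBatch(1, 100); // applied
        state.applyBatch(1, 100); // same txid replayed after a failure: skipped
        state.applyBatch(2, 50);  // applied
        System.out.println(state.count()); // 150
    }
}
```

In a real State the count and the txid would have to be written to the store together, so that a crash cannot leave one updated without the other.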

   