Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
·Ö²¼Ê½¿ª·ÅÏûϢϵͳ(RocketMQ)µÄÔ­ÀíÓëʵ¼ù
 
  3898  次浏览      27
 2018-5-11 
 
±à¼­ÍƼö:

±¾ÎÄÀ´×ÔÓÚ¼òÊ飬½éÉÜÁ˹ؼüÌØÐÔÒÔ¼°ÆäʵÏÖÔ­Àí£¬Rocket MQ ×î¼Ñʵ¼ù£¬×¨ÒµÊõÓïºÍÕûÌå¼Ü¹¹µÈ֪ʶ¡£

¹Ø¼üÌØÐÔÒÔ¼°ÆäʵÏÖÔ­Àí

Ò»¡¢Ë³ÐòÏûÏ¢

ÏûÏ¢ÓÐÐòÖ¸µÄÊÇÒ»ÀàÏûÏ¢Ïû·Ñʱ£¬Äܰ´ÕÕ·¢Ë͵Ä˳ÐòÀ´Ïû·Ñ¡£ÀýÈ磺һ¸ö¶©µ¥²úÉúÁË 3 ÌõÏûÏ¢£¬·Ö±ðÊǶ©µ¥´´½¨¡¢¶©µ¥¸¶¿î¡¢¶©µ¥Íê³É¡£Ïû·Ñʱ£¬Òª°´ÕÕÕâ¸ö˳ÐòÏû·Ñ²ÅÓÐÒâÒå¡£µ«Í¬Ê±¶©µ¥Ö®¼äÓÖÊÇ¿ÉÒÔ²¢ÐÐÏû·ÑµÄ¡£

¼ÙÈçÉú²úÕß²úÉúÁË2ÌõÏûÏ¢£ºM1¡¢M2£¬Òª±£Ö¤ÕâÁ½ÌõÏûÏ¢µÄ˳Ðò£¬Ó¦¸ÃÔõÑù×ö£¿ÄãÄÔÖÐÏëµ½µÄ¿ÉÄÜÊÇÕâÑù£º

Äã¿ÉÄÜ»á²ÉÓÃÕâÖÖ·½Ê½±£Ö¤ÏûϢ˳Ðò

M1·¢Ë͵½S1ºó£¬M2·¢Ë͵½S2£¬Èç¹ûÒª±£Ö¤M1ÏÈÓÚM2±»Ïû·Ñ£¬ÄÇôÐèÒªM1µ½´ïÏû·Ñ¶Ëºó£¬Í¨ÖªS2£¬È»ºóS2ÔÙ½«M2·¢Ë͵½Ïû·Ñ¶Ë¡£

Õâ¸öÄ£ÐÍ´æÔÚµÄÎÊÌâÊÇ£¬Èç¹ûM1ºÍM2·Ö±ð·¢Ë͵½Á½Ì¨ServerÉÏ£¬¾Í²»Äܱ£Ö¤M1ÏÈ´ïµ½£¬Ò²¾Í²»Äܱ£Ö¤M1±»ÏÈÏû·Ñ£¬ÄÇô¾ÍÐèÒªÔÚMQ Server¼¯ÈºÎ¬»¤ÏûÏ¢µÄ˳Ðò¡£ÄÇôÈçºÎ½â¾ö£¿Ò»ÖÖ¼òµ¥µÄ·½Ê½¾ÍÊǽ«M1¡¢M2·¢Ë͵½Í¬Ò»¸öServerÉÏ£º

±£Ö¤ÏûϢ˳Ðò£¬Äã¸Ä½øºóµÄ·½·¨

ÕâÑù¿ÉÒÔ±£Ö¤M1ÏÈÓÚM2µ½´ïMQServer£¨¿Í»§¶ËµÈ´ýM1³É¹¦ºóÔÙ·¢ËÍM2£©£¬¸ù¾ÝÏÈ´ïµ½Ïȱ»Ïû·ÑµÄÔ­Ôò£¬M1»áÏÈÓÚM2±»Ïû·Ñ£¬ÕâÑù¾Í±£Ö¤ÁËÏûÏ¢µÄ˳Ðò¡£

Õâ¸öÄ£ÐÍ£¬ÀíÂÛÉÏ¿ÉÒÔ±£Ö¤ÏûÏ¢µÄ˳Ðò£¬µ«ÔÚʵ¼ÊÔËÓÃÖÐÄãÓ¦¸Ã»áÓöµ½ÏÂÃæµÄÎÊÌ⣺

ÍøÂçÑÓ³ÙÎÊÌâ

Ö»Òª½«ÏûÏ¢´Óһ̨·þÎñÆ÷·¢ÍùÁíһ̨·þÎñÆ÷£¬¾Í»á´æÔÚÍøÂçÑÓ³ÙÎÊÌâ¡£ÈçÉÏͼËùʾ£¬Èç¹û·¢ËÍM1ºÄʱ´óÓÚ·¢ËÍM2µÄºÄʱ£¬ÄÇôM2¾ÍÏȱ»Ïû·Ñ£¬ÈÔÈ»²»Äܱ£Ö¤ÏûÏ¢µÄ˳Ðò¡£¼´Ê¹M1ºÍM2ͬʱµ½´ïÏû·Ñ¶Ë£¬ÓÉÓÚ²»Çå³þÏû·Ñ¶Ë1ºÍÏû·Ñ¶Ë2µÄ¸ºÔØÇé¿ö£¬ÈÔÈ»ÓпÉÄܳöÏÖM2ÏÈÓÚM1±»Ïû·Ñ¡£ÈçºÎ½â¾öÕâ¸öÎÊÌ⣿½«M1ºÍM2·¢Íùͬһ¸öÏû·ÑÕß¼´¿É£¬ÇÒ·¢ËÍM1ºó£¬ÐèÒªÏû·Ñ¶ËÏìÓ¦³É¹¦ºó²ÅÄÜ·¢ËÍM2¡£

µ«ÓÖ»áÒýÈëÁíÍâÒ»¸öÎÊÌ⣬Èç¹û·¢ËÍM1ºó£¬Ïû·Ñ¶Ë1ûÓÐÏìÓ¦£¬ÄÇÊǼÌÐø·¢ËÍM2ÄØ£¬»¹ÊÇÖØÐ·¢ËÍM1£¿Ò»°ãΪÁ˱£Ö¤ÏûÏ¢Ò»¶¨±»Ïû·Ñ£¬¿Ï¶¨»áÑ¡ÔñÖØ·¢M1µ½ÁíÍâÒ»¸öÏû·Ñ¶Ë2£¬¾ÍÈçÏÂͼËùʾ¡£

±£Ö¤ÏûϢ˳ÐòµÄÕýÈ·×ËÊÆ

ÕâÑùµÄÄ£Ð;ÍÑϸñ±£Ö¤ÏûÏ¢µÄ˳Ðò£¬Ï¸ÐĵÄÄãÈÔÈ»»á·¢ÏÖÎÊÌ⣬Ïû·Ñ¶Ë1ûÓÐÏìÓ¦ServerʱÓÐÁ½ÖÖÇé¿ö£¬Ò»ÖÖÊÇM1ȷʵûÓе½´ï£¬ÁíÍâÒ»ÖÖÇé¿öÊÇÏû·Ñ¶Ë1ÒѾ­ÏìÓ¦£¬µ«ÊÇServer¶ËûÓÐÊÕµ½¡£Èç¹ûÊǵڶþÖÖÇé¿ö£¬ÖØ·¢M1£¬¾Í»áÔì³ÉM1±»Öظ´Ïû·Ñ¡£Ò²¾ÍÊÇÎÒÃǺóÃæÒªËµµÄµÚ¶þ¸öÎÊÌ⣬ÏûÏ¢ÖØ¸´ÎÊÌâ¡£

»Ø¹ýÍ·À´¿´ÏûϢ˳ÐòÎÊÌ⣬ÑϸñµÄ˳ÐòÏûÏ¢·Ç³£ÈÝÒ×Àí½â£¬¶øÇÒ´¦ÀíÎÊÌâÒ²±È½ÏÈÝÒ×£¬ÒªÊµÏÖÑϸñµÄ˳ÐòÏûÏ¢£¬¼òµ¥ÇÒ¿ÉÐеİ취¾ÍÊÇ£º

±£Ö¤Éú²úÕß - MQServer - Ïû·ÑÕßÊÇÒ»¶ÔÒ»¶ÔÒ»µÄ¹ØÏµ

µ«ÊÇÕâÑùÉè¼Æ£¬²¢ÐжȾͳÉΪÁËÏûϢϵͳµÄÆ¿¾±£¨ÍÌÍÂÁ¿²»¹»£©£¬Ò²»áµ¼Ö¸ü¶àµÄÒì³£´¦Àí£¬±ÈÈ磺ֻҪÏû·Ñ¶Ë³öÏÖÎÊÌ⣬¾Í»áµ¼ÖÂÕû¸ö´¦ÀíÁ÷³Ì×èÈû£¬ÎÒÃDz»µÃ²»»¨·Ñ¸ü¶àµÄ¾«Á¦À´½â¾ö×èÈûµÄÎÊÌâ¡£

µ«ÎÒÃǵÄ×îÖÕÄ¿±êÊÇÒª¼¯ÈºµÄ¸ßÈÝ´íÐԺ͸ßÍÌÍÂÁ¿¡£ÕâËÆºõÊÇÒ»¶Ô²»¿Éµ÷ºÍµÄì¶Ü£¬ÄÇô°¢ÀïÊÇÈçºÎ½â¾öµÄ£¿

ÊÀ½çÉϽâ¾öÒ»¸ö¼ÆËã»úÎÊÌâ×î¼òµ¥µÄ·½·¨£º¡°Ç¡ºÃ¡±²»ÐèÒª½â¾öËü£¡¡ª¡ª Éòѯ

ÓÐЩÎÊÌ⣬¿´ÆðÀ´ºÜÖØÒª£¬µ«Êµ¼ÊÉÏÎÒÃÇ¿ÉÒÔͨ¹ýºÏÀíµÄÉè¼Æ»òÕß½«ÎÊÌâ·Ö½âÀ´¹æ±Ü¡£Èç¹ûÓ²Òª°Ñʱ¼ä»¨ÔÚ½â¾öËüÃÇÉíÉÏ£¬Êµ¼ÊÉÏÊÇÀ˷ѵģ¬Ð§ÂʵÍϵġ£´ÓÕâ¸ö½Ç¶ÈÀ´¿´ÏûÏ¢µÄ˳ÐòÎÊÌ⣬ÎÒÃÇ¿ÉÒԵóöÁ½¸ö½áÂÛ£º

1¡¢²»¹Ø×¢ÂÒÐòµÄÓ¦ÓÃʵ¼Ê´óÁ¿´æÔÚ

2¡¢¶ÓÁÐÎÞÐò²¢²»Òâζ×ÅÏûÏ¢ÎÞÐò

×îºóÎÒÃÇ´ÓÔ´Âë½Ç¶È·ÖÎöRocketMQÔõôʵÏÖ·¢ËÍ˳ÐòÏûÏ¢¡£

Ò»°ãÏûÏ¢ÊÇͨ¹ýÂÖѯËùÓжÓÁÐÀ´·¢Ë͵썏ºÔؾùºâ²ßÂÔ£©£¬Ë³ÐòÏûÏ¢¿ÉÒÔ¸ù¾ÝÒµÎñ£¬±ÈÈç˵¶©µ¥ºÅÏàͬµÄÏûÏ¢·¢Ë͵½Í¬Ò»¸ö¶ÓÁС£ÏÂÃæµÄʾÀýÖУ¬OrderIdÏàͬµÄÏûÏ¢£¬»á·¢Ë͵½Í¬Ò»¸ö¶ÓÁУº

// RocketMQĬÈÏÌṩÁËÁ½ÖÖ MessageQueueSelector ʵÏÖ £ºËæ»ú /Hash
SendResult sendResult = producer.send (msg, new MessageQueueSelector () {
@Override
public MessageQueue select (List mqs, Message msg , Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size ();
return mqs.get (index);
}
}, orderId );

ÔÚ»ñÈ¡µ½Â·ÓÉÐÅÏ¢ÒԺ󣬻á¸ù¾ÝMessageQueueSelectorʵÏÖµÄËã·¨À´Ñ¡ÔñÒ»¸ö¶ÓÁУ¬Í¬Ò»¸öOrderId»ñÈ¡µ½µÄ¶ÓÁÐÊÇͬһ¸ö¶ÓÁС£

private SendResult send() { // »ñÈ¡topic·ÓÉÐÅÏ¢
TopicPublishInfo topicPublishInfo = this .tryToFindTop icPublishInfo (msg.getTopic ());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// ¸ù¾ÝÎÒÃǵÄËã·¨ £¬Ñ¡ÔñÒ»¸ö·¢ËͶÓÁÐ
// ÕâÀïµÄarg = orderId
mq = selector.select (topicPublishInfo .ge tMessage QueueList (), msg, arg );
if (mq != null) {
return this. sendKernelImpl (msg, mq, communicationMode, sendCallback, timeout);
}
}
}

¶þ¡¢ÏûÏ¢ÖØ¸´

ÉÏÃæÔÚ½â¾öÏûϢ˳ÐòÎÊÌâʱ£¬ÒýÈëÁËÒ»¸öеÄÎÊÌ⣬¾ÍÊÇÏûÏ¢ÖØ¸´¡£ÄÇôRocketMQÊÇÔõÑù½â¾öÏûÏ¢ÖØ¸´µÄÎÊÌâÄØ£¿»¹ÊÇ¡°Ç¡ºÃ¡±²»½â¾ö¡£

Ôì³ÉÏûÏ¢µÄÖØ¸´µÄ¸ù±¾Ô­ÒòÊÇ£ºÍøÂç²»¿É´ï¡£Ö»ÒªÍ¨¹ýÍøÂç½»»»Êý¾Ý£¬¾ÍÎÞ·¨±ÜÃâÕâ¸öÎÊÌâ¡£ËùÒÔ½â¾öÕâ¸öÎÊÌâµÄ°ì·¨¾ÍÊDz»½â¾ö£¬×ª¶øÈƹýÕâ¸öÎÊÌâ¡£ÄÇôÎÊÌâ¾Í±ä³ÉÁË£ºÈç¹ûÏû·Ñ¶ËÊÕµ½Á½ÌõÒ»ÑùµÄÏûÏ¢£¬Ó¦¸ÃÔõÑù´¦Àí£¿

1¡¢Ïû·Ñ¶Ë´¦ÀíÏûÏ¢µÄÒµÎñÂß¼­±£³ÖÃݵÈÐÔ

2¡¢±£Ö¤Ã¿ÌõÏûÏ¢¶¼ÓÐΨһ±àºÅÇÒ±£Ö¤ÏûÏ¢´¦Àí³É¹¦ÓëÈ¥ÖØ±íµÄÈÕ־ͬʱ³öÏÖ

µÚ1ÌõºÜºÃÀí½â£¬Ö»Òª±£³ÖÃݵÈÐÔ£¬²»¹ÜÀ´¶àÉÙÌõÖØ¸´ÏûÏ¢£¬×îºó´¦ÀíµÄ½á¹û¶¼Ò»Ñù¡£µÚ2ÌõÔ­Àí¾ÍÊÇÀûÓÃÒ»ÕÅÈÕÖ¾±íÀ´¼Ç¼ÒѾ­´¦Àí³É¹¦µÄÏûÏ¢µÄID£¬Èç¹ûе½µÄÏûÏ¢IDÒѾ­ÔÚÈÕÖ¾±íÖУ¬ÄÇô¾Í²»ÔÙ´¦ÀíÕâÌõÏûÏ¢¡£

ÎÒÃÇ¿ÉÒÔ¿´µ½µÚ1ÌõµÄ½â¾ö·½Ê½£¬ºÜÃ÷ÏÔÓ¦¸ÃÔÚÏû·Ñ¶ËʵÏÖ£¬²»ÊôÓÚÏûϢϵͳҪʵÏֵŦÄÜ¡£µÚ2Ìõ¿ÉÒÔÏûϢϵͳʵÏÖ£¬Ò²¿ÉÒÔÒµÎñ¶ËʵÏÖ¡£Õý³£Çé¿öϳöÏÖÖØ¸´ÏûÏ¢µÄ¸ÅÂʲ»Ò»¶¨´ó£¬ÇÒÓÉÏûϢϵͳʵÏֵϰ£¬¿Ï¶¨»á¶ÔÏûϢϵͳµÄÍÌÍÂÁ¿ºÍ¸ß¿ÉÓÃÓÐÓ°Ï죬ËùÒÔ×îºÃ»¹ÊÇÓÉÒµÎñ¶Ë×Ô¼º´¦ÀíÏûÏ¢ÖØ¸´µÄÎÊÌ⣬ÕâÒ²ÊÇRocketMQ²»½â¾öÏûÏ¢ÖØ¸´µÄÎÊÌâµÄÔ­Òò¡£

RocketMQ²»±£Ö¤ÏûÏ¢²»Öظ´£¬Èç¹ûÄãµÄÒµÎñÐèÒª±£Ö¤ÑϸñµÄ²»Öظ´ÏûÏ¢£¬ÐèÒªÄã×Ô¼ºÔÚÒµÎñ¶ËÈ¥ÖØ¡£

Èý¡¢ÊÂÎñÏûÏ¢

RocketMQ³ýÁËÖ§³ÖÆÕͨÏûÏ¢£¬Ë³ÐòÏûÏ¢£¬ÁíÍ⻹֧³ÖÊÂÎñÏûÏ¢¡£Ê×ÏÈÌÖÂÛÒ»ÏÂʲôÊÇÊÂÎñÏûÏ¢ÒÔ¼°Ö§³ÖÊÂÎñÏûÏ¢µÄ±ØÒªÐÔ¡£ÎÒÃÇÒÔÒ»¸öתÕʵij¡¾°ÎªÀýÀ´ËµÃ÷Õâ¸öÎÊÌ⣺BobÏòSmithתÕË100¿é¡£

ÔÚµ¥»ú»·¾³Ï£¬Ö´ÐÐÊÂÎñµÄÇé¿ö£¬´ó¸ÅÊÇÏÂÃæÕâ¸öÑù×Ó£º

µ¥»ú»·¾³ÏÂתÕËÊÂÎñʾÒâͼ

µ±Óû§Ôö³¤µ½Ò»¶¨³Ì¶È£¬BobºÍSmithµÄÕË»§¼°Óà¶îÐÅÏ¢ÒѾ­²»ÔÚͬһ̨·þÎñÆ÷ÉÏÁË£¬ÄÇôÉÏÃæµÄÁ÷³Ì¾Í±ä³ÉÁËÕâÑù£º

¼¯Èº»·¾³ÏÂתÕËÊÂÎñʾÒâͼ

ÕâʱºòÄã»á·¢ÏÖ£¬Í¬ÑùÊÇÒ»¸öתÕ˵ÄÒµÎñ£¬ÔÚ¼¯Èº»·¾³Ï£¬ºÄʱ¾ÓÈ»³É±¶µÄÔö³¤£¬ÕâÏÔÈ»ÊDz»Äܹ»½ÓÊܵġ£ÄÇÎÒÃÇÈçºÎÀ´¹æ±ÜÕâ¸öÎÊÌ⣿

´óÊÂÎñ = СÊÂÎñ + Òì²½

½«´óÊÂÎñ²ð·Ö³É¶à¸öСÊÂÎñÒì²½Ö´ÐС£ÕâÑù»ù±¾ÉÏÄܹ»½«¿ç»úÊÂÎñµÄÖ´ÐÐЧÂÊÓÅ»¯µ½Óëµ¥»úÒ»Ö¡£×ªÕ˵ÄÊÂÎñ¾Í¿ÉÒÔ·Ö½â³ÉÈçÏÂÁ½¸öСÊÂÎñ£º

СÊÂÎñ+Òì²½ÏûÏ¢

ͼÖÐÖ´Ðб¾µØÊÂÎñ£¨BobÕË»§¿Û¿î£©ºÍ·¢ËÍÒì²½ÏûÏ¢Ó¦¸Ã±£³Öͬʱ³É¹¦»òÕßʧ°ÜÖУ¬Ò²¾ÍÊǿۿî³É¹¦ÁË£¬·¢ËÍÏûÏ¢Ò»¶¨Òª³É¹¦£¬Èç¹û¿Û¿îʧ°ÜÁË£¬¾Í²»ÄÜÔÙ·¢ËÍÏûÏ¢¡£ÄÇÎÊÌâÊÇ£ºÎÒÃÇÊÇÏȿۿÊÇÏÈ·¢ËÍÏûÏ¢ÄØ£¿

Ê×ÏÈÎÒÃÇ¿´Ï£¬ÏÈ·¢ËÍÏûÏ¢£¬´óÖµÄʾÒâͼÈçÏ£º

ÊÂÎñÏûÏ¢£ºÏÈ·¢ËÍÏûÏ¢

´æÔÚµÄÎÊÌâÊÇ£ºÈç¹ûÏûÏ¢·¢Ëͳɹ¦£¬µ«Êǿۿîʧ°Ü£¬Ïû·Ñ¶Ë¾Í»áÏû·Ñ´ËÏûÏ¢£¬½ø¶øÏòSmithÕË»§¼ÓÇ®¡£

ÏÈ·¢ÏûÏ¢²»ÐУ¬ÄÇÎÒÃǾÍÏȿۿîߣ¬´óÖµÄʾÒâͼÈçÏ£º

ÊÂÎñÏûÏ¢-Ïȿۿî

´æÔÚµÄÎÊÌâ¸úÉÏÃæÀàËÆ£ºÈç¹û¿Û¿î³É¹¦£¬·¢ËÍÏûϢʧ°Ü£¬¾Í»á³öÏÖBob¿ÛÇ®ÁË£¬µ«ÊÇSmithÕË»§Î´¼ÓÇ®¡£

¿ÉÄÜ´ó¼Ò»áÓкܶàµÄ·½·¨À´½â¾öÕâ¸öÎÊÌ⣬±ÈÈ磺ֱ½Ó½«·¢ÏûÏ¢·Åµ½Bob¿Û¿îµÄÊÂÎñÖÐÈ¥£¬Èç¹û·¢ËÍʧ°Ü£¬Å׳öÒì³££¬ÊÂÎñ»Ø¹ö¡£ÕâÑùµÄ´¦Àí·½Ê½Ò²·ûºÏ¡°Ç¡ºÃ¡±²»ÐèÒª½â¾öµÄÔ­Ôò¡£RocketMQÖ§³ÖÊÂÎñÏûÏ¢£¬ÏÂÃæÎÒÃÇÀ´¿´¿´RocketMQÊÇÔõÑùÀ´ÊµÏֵġ£

RocketMQʵÏÖ·¢ËÍÊÂÎñÏûÏ¢

RocketMQµÚÒ»½×¶Î·¢ËÍPreparedÏûϢʱ£¬»áÄõ½ÏûÏ¢µÄµØÖ·£¬µÚ¶þ½×¶ÎÖ´Ðб¾µØÊÂÎµÚÈý½×¶Îͨ¹ýµÚÒ»½×¶ÎÄõ½µÄµØÖ·È¥·ÃÎÊÏûÏ¢£¬²¢ÐÞ¸Ä״̬¡£Ï¸ÐĵÄÄã¿ÉÄÜÓÖ·¢ÏÖÎÊÌâÁË£¬Èç¹ûÈ·ÈÏÏûÏ¢·¢ËÍʧ°ÜÁËÔõô°ì£¿RocketMQ»á¶¨ÆÚɨÃèÏûÏ¢¼¯ÈºÖеÄÊÂÎïÏûÏ¢£¬Õâʱºò·¢ÏÖÁËPreparedÏûÏ¢£¬Ëü»áÏòÏûÏ¢·¢ËÍÕßÈ·ÈÏ£¬BobµÄÇ®µ½µ×ÊǼõÁË»¹ÊÇû¼õÄØ£¿Èç¹û¼õÁËÊǻعö»¹ÊǼÌÐø·¢ËÍÈ·ÈÏÏûÏ¢ÄØ£¿RocketMQ»á¸ù¾Ý·¢ËͶËÉèÖõIJßÂÔÀ´¾ö¶¨Êǻعö»¹ÊǼÌÐø·¢ËÍÈ·ÈÏÏûÏ¢¡£ÕâÑù¾Í±£Ö¤ÁËÏûÏ¢·¢ËÍÓë±¾µØÊÂÎñͬʱ³É¹¦»òͬʱʧ°Ü¡£

ÄÇÎÒÃÇÀ´¿´ÏÂRocketMQÔ´Â룬ÊDz»ÊÇÕâÑùÀ´´¦ÀíÊÂÎñÏûÏ¢µÄ¡£¿Í»§¶Ë·¢ËÍÊÂÎñÏûÏ¢µÄ²¿·Ö£¨ÍêÕû´úÂëÇë²é¿´£ºrocketmq-example¹¤³ÌϵÄcom.alibaba.rocketmq.example.transaction.TransactionProducer£©£º

// δ¾öÊÂÎñ£¬MQ·þÎñÆ÷»Ø²é¿Í»§¶Ë

// Ò²¾ÍÊÇÉÏÎÄËù˵µÄ£¬µ±RocketMQ·¢ÏÖ`PreparedÏûÏ¢`ʱ£¬»á¸ù¾ÝÕâ¸öListenerʵÏֵIJßÂÔÀ´¾ö¶ÏÊÂÎñ

TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

// ¹¹ÔìÊÂÎñÏûÏ¢µÄÉú²úÕß

TransactionMQProducer producer = new TransactionMQProducer("groupName");

// ÉèÖÃÊÂÎñ¾ö¶Ï´¦ÀíÀà

producer.setTransactionCheckListener(transactionCheckListener);

// ±¾µØÊÂÎñµÄ´¦ÀíÂß¼­£¬Ï൱ÓÚʾÀýÖмì²éBobÕË»§²¢¿ÛÇ®µÄÂß¼­

TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

producer.start()

// ¹¹ÔìMSG£¬Ê¡ÂÔ¹¹Ôì²ÎÊý

Message msg = new Message(......);

// ·¢ËÍÏûÏ¢

SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

producer.shutdown();

½Ó×Ų鿴sendMessageInTransaction·½·¨µÄÔ´Â룬×ܹ²·ÖΪ3¸ö½×¶Î£º·¢ËÍPreparedÏûÏ¢¡¢Ö´Ðб¾µØÊÂÎñ¡¢·¢ËÍÈ·ÈÏÏûÏ¢¡£

public TransactionSendResult sendMessageInTransaction(.....) {

// Âß¼­´úÂ룬·Çʵ¼Ê´úÂë

// 1.·¢ËÍÏûÏ¢

sendResult = this.send(msg);

// sendResult.getSendStatus() == SEND_OK

// 2.Èç¹ûÏûÏ¢·¢Ëͳɹ¦£¬´¦ÀíÓëÏûÏ¢¹ØÁªµÄ±¾µØÊÂÎñµ¥Ôª

LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);

// 3.½áÊøÊÂÎñ

this.endTransaction(sendResult, localTransactionState, localException);

}

endTransaction·½·¨»á½«ÇëÇó·¢Íùbroker(mq server)È¥¸üÐÂÊÂÎïÏûÏ¢µÄ×îÖÕ״̬£º

¸ù¾ÝsendResultÕÒµ½PreparedÏûÏ¢

¸ù¾ÝlocalTransaction¸üÐÂÏûÏ¢µÄ×îÖÕ״̬

Èç¹ûendTransaction·½·¨Ö´ÐÐʧ°Ü£¬µ¼ÖÂÊý¾ÝûÓз¢Ë͵½broker£¬broker»áÓлزéÏ̶߳¨Ê±£¨Ä¬ÈÏ1·ÖÖÓ£©É¨Ãèÿ¸ö´æ´¢ÊÂÎñ״̬µÄ±í¸ñÎļþ£¬Èç¹ûÊÇÒѾ­Ìá½»»òÕ߻عöµÄÏûÏ¢Ö±½ÓÌø¹ý£¬Èç¹ûÊÇprepared״̬Ôò»áÏòProducer·¢ÆðCheckTransactionÇëÇó£¬Producer»áµ÷ÓÃDefaultMQProducerImpl.checkTransactionState()·½·¨À´´¦ÀíbrokerµÄ¶¨Ê±»Øµ÷ÇëÇ󣬶øcheckTransactionState»áµ÷ÓÃÎÒÃǵÄÊÂÎñÉèÖõľö¶Ï·½·¨£¬×îºóµ÷ÓÃendTransactionOnewayÈÃbrokerÀ´¸üÐÂÏûÏ¢µÄ×îÖÕ״̬¡£

Ôٻص½×ªÕ˵ÄÀý×Ó£¬Èç¹ûBobµÄÕË»§µÄÓà¶îÒѾ­¼õÉÙ£¬ÇÒÏûÏ¢ÒѾ­·¢Ëͳɹ¦£¬Smith¶Ë¿ªÊ¼Ïû·ÑÕâÌõÏûÏ¢£¬Õâ¸öʱºò¾Í»á³öÏÖÏû·Ñʧ°ÜºÍÏû·Ñ³¬Ê±Á½¸öÎÊÌ⣿½â¾ö³¬Ê±ÎÊÌâµÄ˼·¾ÍÊÇÒ»Ö±ÖØÊÔ£¬Ö±µ½Ïû·Ñ¶ËÏû·ÑÏûÏ¢³É¹¦£¬Õû¸ö¹ý³ÌÖÐÓпÉÄÜ»á³öÏÖÏûÏ¢ÖØ¸´µÄÎÊÌ⣬°´ÕÕÇ°ÃæµÄ˼·½â¾ö¼´¿É¡£

Ïû·ÑÊÂÎñÏûÏ¢

ÕâÑù»ù±¾ÉÏ¿ÉÒÔ½â¾ö³¬Ê±ÎÊÌ⣬µ«ÊÇÈç¹ûÏû·Ñʧ°ÜÔõô°ì£¿°¢ÀïÌṩ¸øÎÒÃǵĽâ¾ö·½·¨ÊÇ£ºÈ˹¤½â¾ö¡£´ó¼Ò¿ÉÒÔ¿¼ÂÇһϣ¬°´ÕÕÊÂÎñµÄÁ÷³Ì£¬ÒòΪijÖÖÔ­ÒòSmith¼Ó¿îʧ°Ü£¬ÐèÒª»Ø¹öÕû¸öÁ÷³Ì¡£Èç¹ûÏûϢϵͳҪʵÏÖÕâ¸ö»Ø¹öÁ÷³ÌµÄ»°£¬ÏµÍ³¸´ÔӶȽ«´ó´óÌáÉý£¬ÇÒºÜÈÝÒ׳öÏÖBug£¬¹À¼Æ³öÏÖBugµÄ¸ÅÂÊ»á±ÈÏû·Ñʧ°ÜµÄ¸ÅÂÊ´óºÜ¶à¡£ÎÒÃÇÐèÒªºâÁ¿ÊÇ·ñÖµµÃ»¨Õâô´óµÄ´ú¼ÛÀ´½â¾öÕâÑùÒ»¸ö³öÏÖ¸ÅÂʷdz£Ð¡µÄÎÊÌ⣬ÕâÒ²ÊÇ´ó¼ÒÔÚ½â¾öÒÉÄÑÎÊÌâʱÐèÒª¶à¶à˼¿¼µÄµØ·½¡£

20160321²¹³ä£ºÔÚ3.2.6°æ±¾ÖÐÒÆ³ýÁËÊÂÎñÏûÏ¢µÄʵÏÖ£¬ËùÒԴ˰汾²»Ö§³ÖÊÂÎñÏûÏ¢£¬¾ßÌåÇé¿öÇë²Î¿¼rocketmqµÄissues£º

https://github.com/alibaba/RocketMQ/issues/65

https://github.com/alibaba/RocketMQ/issues/138

https://github.com/alibaba/RocketMQ/issues/156

ËÄ¡¢ProducerÈçºÎ·¢ËÍÏûÏ¢

ProducerÂÖѯijtopicϵÄËùÓжÓÁеķ½Ê½À´ÊµÏÖ·¢ËÍ·½µÄ¸ºÔؾùºâ£¬ÈçÏÂͼËùʾ£º

producer·¢ËÍÏûÏ¢¸ºÔؾùºâ

Ê×ÏÈ·ÖÎöÒ»ÏÂRocketMQµÄ¿Í»§¶Ë·¢ËÍÏûÏ¢µÄÔ´Â룺

// ¹¹ÔìProducer

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

// ³õʼ»¯Producer£¬Õû¸öÓ¦ÓÃÉúÃüÖÜÆÚÄÚ£¬Ö»ÐèÒª³õʼ»¯1´Î

producer.start();

// ¹¹ÔìMessage

Message msg = new Message("TopicTest1",// topic

"TagA",// tag£º¸øÏûÏ¢´ò±êÇ©,ÓÃÓÚÇø·ÖÒ»ÀàÏûÏ¢£¬¿ÉΪnull

"OrderID188",// key£º×Ô¶¨ÒåKey£¬¿ÉÒÔÓÃÓÚÈ¥ÖØ£¬¿ÉΪnull

("Hello MetaQ").getBytes());// body£ºÏûÏ¢ÄÚÈÝ

// ·¢ËÍÏûÏ¢²¢·µ»Ø½á¹û

SendResult sendResult = producer.send(msg);

// ÇåÀí×ÊÔ´£¬¹Ø±ÕÍøÂçÁ¬½Ó£¬×¢Ïú×Ô¼º

producer.shutdown();

ÔÚÕû¸öÓ¦ÓÃÉúÃüÖÜÆÚÄÚ£¬Éú²úÕßÐèÒªµ÷ÓÃÒ»´Îstart·½·¨À´³õʼ»¯£¬³õʼ»¯Ö÷ÒªÍê³ÉµÄÈÎÎñÓУº

Èç¹ûûÓÐÖ¸¶¨namesrvµØÖ·£¬½«»á×Ô¶¯Ñ°Ö·

Æô¶¯¶¨Ê±ÈÎÎñ£º¸üÐÂnamesrvµØÖ·¡¢´Ónamsrv¸üÐÂtopic·ÓÉÐÅÏ¢¡¢ÇåÀíÒѾ­¹ÒµôµÄbroker¡¢ÏòËùÓÐbroker·¢ËÍÐÄÌø...

Æô¶¯¸ºÔؾùºâµÄ·þÎñ

³õʼ»¯Íê³Éºó£¬¿ªÊ¼·¢ËÍÏûÏ¢£¬·¢ËÍÏûÏ¢µÄÖ÷Òª´úÂëÈçÏ£º

private SendResult sendDefaultImpl(Message msg,......) {

// ¼ì²éProducerµÄ״̬ÊÇ·ñÊÇRUNNING

this.makeSureStateOK();

// ¼ì²émsgÊÇ·ñºÏ·¨£ºÊÇ·ñΪnull¡¢topic,bodyÊÇ·ñΪ¿Õ¡¢bodyÊÇ·ñ³¬³¤

Validators.checkMessage(msg, this.defaultMQProducer);

// »ñÈ¡topic·ÓÉÐÅÏ¢

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

// ´Ó·ÓÉÐÅÏ¢ÖÐÑ¡ÔñÒ»¸öÏûÏ¢¶ÓÁÐ

MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);

// ½«ÏûÏ¢·¢Ë͵½¸Ã¶ÓÁÐÉÏÈ¥

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);

}

´úÂëÖÐÐèÒª¹Ø×¢µÄÁ½¸ö·½·¨tryToFindTopicPublishInfoºÍselectOneMessageQueue¡£Ç°ÃæËµ¹ýÔÚproducer³õʼ»¯Ê±£¬»áÆô¶¯¶¨Ê±ÈÎÎñ»ñȡ·ÓÉÐÅÏ¢²¢¸üе½±¾µØ»º´æ£¬ËùÒÔtryToFindTopicPublishInfo»áÊ×ÏÈ´Ó»º´æÖлñÈ¡topic·ÓÉÐÅÏ¢£¬Èç¹ûûÓлñÈ¡µ½£¬Ôò»á×Ô¼ºÈ¥namesrv»ñȡ·ÓÉÐÅÏ¢¡£selectOneMessageQueue·½·¨Í¨¹ýÂÖѯµÄ·½Ê½£¬·µ»ØÒ»¸ö¶ÓÁУ¬ÒÔ´ïµ½¸ºÔؾùºâµÄÄ¿µÄ¡£

Èç¹ûProducer·¢ËÍÏûϢʧ°Ü£¬»á×Ô¶¯ÖØÊÔ£¬ÖØÊԵIJßÂÔ£º

ÖØÊÔ´ÎÊý < retryTimesWhenSendFailed£¨¿ÉÅäÖã©

×ܵĺÄʱ£¨°üº¬ÖØÊÔn´ÎµÄºÄʱ£© < sendMsgTimeout£¨·¢ËÍÏûϢʱ´«ÈëµÄ²ÎÊý£©

ͬʱÂú×ãÉÏÃæÁ½¸öÌõ¼þºó£¬Producer»áÑ¡ÔñÁíÍâÒ»¸ö¶ÓÁз¢ËÍÏûÏ¢

Îå¡¢ÏûÏ¢´æ´¢

RocketMQµÄÏûÏ¢´æ´¢ÊÇÓÉconsume queueºÍcommit logÅäºÏÍê³ÉµÄ¡£

1¡¢Consume Queue

consume queueÊÇÏûÏ¢µÄÂß¼­¶ÓÁУ¬Ï൱ÓÚ×ÖµäµÄĿ¼£¬ÓÃÀ´Ö¸¶¨ÏûÏ¢ÔÚÎïÀíÎļþcommit logÉϵÄλÖá£

ÎÒÃÇ¿ÉÒÔÔÚÅäÖÃÖÐÖ¸¶¨consumequeueÓëcommitlog´æ´¢µÄĿ¼

ÿ¸ötopicϵÄÿ¸öqueue¶¼ÓÐÒ»¸ö¶ÔÓ¦µÄconsumequeueÎļþ£¬±ÈÈ磺

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume QueueÎļþ×éÖ¯£¬ÈçͼËùʾ£º

Consume QueueÎļþ×é֯ʾÒâͼ

¸ù¾ÝtopicºÍqueueIdÀ´×éÖ¯Îļþ£¬Í¼ÖÐTopicAÓÐÁ½¸ö¶ÓÁÐ0,1£¬ÄÇôTopicAºÍQueueId=0×é³ÉÒ»¸öConsumeQueue£¬TopicAºÍQueueId=1×é³ÉÁíÒ»¸öConsumeQueue¡£

°´ÕÕÏû·Ñ¶ËµÄGroupNameÀ´·Ö×éÖØÊÔ¶ÓÁУ¬Èç¹ûÏû·Ñ¶ËÏû·Ñʧ°Ü£¬ÏûÏ¢½«±»·¢ÍùÖØÊÔ¶ÓÁÐÖУ¬±ÈÈçͼÖеÄ%RETRY%ConsumerGroupA¡£

°´ÕÕÏû·Ñ¶ËµÄGroupNameÀ´·Ö×éËÀÐŶÓÁУ¬Èç¹ûÏû·Ñ¶ËÏû·Ñʧ°Ü£¬²¢ÖØÊÔÖ¸¶¨´ÎÊýºó£¬ÈÔȻʧ°Ü£¬Ôò·¢ÍùËÀÐŶÓÁУ¬±ÈÈçͼÖеÄ%DLQ%ConsumerGroupA¡£

ËÀÐŶÓÁУ¨Dead Letter Queue£©Ò»°ãÓÃÓÚ´æ·ÅÓÉÓÚijÖÖÔ­ÒòÎÞ·¨´«µÝµÄÏûÏ¢£¬±ÈÈç´¦Àíʧ°Ü»òÕßÒѾ­¹ýÆÚµÄÏûÏ¢¡£

Consume QueueÖд洢µ¥ÔªÊÇÒ»¸ö20×Ö½Ú¶¨³¤µÄ¶þ½øÖÆÊý¾Ý£¬Ë³Ðòд˳Ðò¶Á£¬ÈçÏÂͼËùʾ£º

consumequeueÎļþ´æ´¢µ¥Ôª¸ñʽ

CommitLog OffsetÊÇÖ¸ÕâÌõÏûÏ¢ÔÚCommit LogÎļþÖеÄʵ¼ÊÆ«ÒÆÁ¿

Size´æ´¢ÖÐÏûÏ¢µÄ´óС

Message Tag HashCode´æ´¢ÏûÏ¢µÄTagµÄ¹þÏ£Öµ£ºÖ÷ÒªÓÃÓÚ¶©ÔÄʱÏûÏ¢¹ýÂË£¨¶©ÔÄʱÈç¹ûÖ¸¶¨ÁËTag£¬»á¸ù¾ÝHashCodeÀ´¿ìËÙ²éÕÒµ½¶©ÔĵÄÏûÏ¢£©

2¡¢Commit Log

CommitLog£ºÏûÏ¢´æ·ÅµÄÎïÀíÎļþ£¬Ã¿Ì¨brokerÉϵÄcommitlog±»±¾»úËùÓеÄqueue¹²Ïí£¬²»×öÈκÎÇø·Ö¡£

ÎļþµÄĬÈÏλÖÃÈçÏ£¬ÈÔÈ»¿Éͨ¹ýÅäÖÃÎļþÐ޸ģº

${user.home} \store\${commitlog}\${fileName}

CommitLogµÄÏûÏ¢´æ´¢µ¥Ôª³¤¶È²»¹Ì¶¨£¬Îļþ˳Ðòд£¬Ëæ»ú¶Á¡£ÏûÏ¢µÄ´æ´¢½á¹¹ÈçϱíËùʾ£¬°´ÕÕ±àºÅ˳ÐòÒÔ¼°±àºÅ¶ÔÓ¦µÄÄÚÈÝÒÀ´Î´æ´¢¡£

Commit Log´æ´¢µ¥Ôª½á¹¹Í¼

3¡¢ÏûÏ¢´æ´¢ÊµÏÖ

ÏûÏ¢´æ´¢ÊµÏÖ£¬±È½Ï¸´ÔÓ£¬Ò²ÖµµÃ´ó¼ÒÉîÈëÁ˽⣬ºóÃæ»áµ¥¶À³ÉÎÄÀ´·ÖÎö£¬ÕâС½ÚÖ»ÒÔ´úÂë˵Ã÷һϾßÌåµÄÁ÷³Ì¡£

// Set the storage time msg.setStoreTimestamp (System . current TimeMillis ());
// Set the message body BODY CRC (consider the most app ropriate setting
msg. setBodyCRC (UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this. default Message Store .get StoreStatsService ();
synchronized (this) {
long beginLockTimestamp = this. defaultMessageStore. get SystemClock ().now();
// Here settings are stored timestamp, in order to ensure an orderly global
msg. setStoreTimestamp (beginLockTimestamp);
// MapedFile£º²Ù×÷ÎïÀíÎļþÔÚÄÚ´æÖеÄÓ³ÉäÒÔ¼°½«ÄÚ´æÊý¾Ý³Ö¾Ã»¯µ½ÎïÀíÎļþÖÐ
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
// ½«Message×·¼Óµ½Îļþcommitlog
result = mapedFile.appendMessage (msg, this. appendMessage Callback );
switch (result.getStatus()) {
case PUT_OK : break;
case END_OF_FILE:
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
result = mapedFile.appendMessage(msg, this. appendMessage Callback );
break;
DispatchRequest dispatchRequest = new DispatchRequest(
topic ,// 1
queueId ,// 2
result.getWroteOffset (),// 3
result.getWroteBytes (),// 4
tagsCode,// 5
msg.getStoreTimestamp (),// 6
result.getLogicsOffset (),// 7
msg.getKeys(),// 8
/**
* Transaction
*/
msg. getSysFlag(),// 9
msg. getPreparedTransactionOffset());// 10
// 1.·Ö·¢ÏûϢλÖõ½ConsumeQueue
// 2.·Ö·¢µ½IndexService½¨Á¢Ë÷Òý
this.defaultMessageStore.putDispatchRequest (dispatchRequest ) ;
}

4¡¢ÏûÏ¢µÄË÷ÒýÎļþ

Èç¹ûÒ»¸öÏûÏ¢°üº¬keyÖµµÄ»°£¬»áʹÓÃIndexFile´æ´¢ÏûÏ¢Ë÷Òý£¬ÎļþµÄÄÚÈݽṹÈçͼ£º

ÏûÏ¢Ë÷Òý

Ë÷ÒýÎļþÖ÷ÒªÓÃÓÚ¸ù¾ÝkeyÀ´²éѯÏûÏ¢µÄ£¬Á÷³ÌÖ÷ÒªÊÇ£º

¸ù¾Ý²éѯµÄ key µÄ hashcode%slotNum µÃµ½¾ßÌåµÄ²ÛµÄλÖÃ(slotNum ÊÇÒ»¸öË÷ÒýÎļþÀïÃæ°üº¬µÄ×î´ó²ÛµÄÊýÄ¿£¬ÀýÈçͼÖÐËùʾ slotNum=5000000)

¸ù¾Ý slotValue(slot λÖöÔÓ¦µÄÖµ)²éÕÒµ½Ë÷ÒýÏîÁбíµÄ×îºóÒ»Ïî(µ¹ÐòÅÅÁÐ,slotValue ×ÜÊÇÖ¸Ïò×îеÄÒ»¸öË÷ÒýÏî)

±éÀúË÷ÒýÏîÁÐ±í·µ»Ø²éѯʱ¼ä·¶Î§ÄڵĽá¹û¼¯(ĬÈÏÒ»´Î×î´ó·µ»ØµÄ 32 Ìõ¼Ç¼)

Áù¡¢ÏûÏ¢¶©ÔÄ

RocketMQÏûÏ¢¶©ÔÄÓÐÁ½ÖÖģʽ£¬Ò»ÖÖÊÇPushģʽ£¬¼´MQServerÖ÷¶¯ÏòÏû·Ñ¶ËÍÆËÍ£»ÁíÍâÒ»ÖÖÊÇPullģʽ£¬¼´Ïû·Ñ¶ËÔÚÐèҪʱ£¬Ö÷¶¯µ½MQServerÀ­È¡¡£µ«ÔÚ¾ßÌåʵÏÖʱ£¬PushºÍPullģʽ¶¼ÊDzÉÓÃÏû·Ñ¶ËÖ÷¶¯À­È¡µÄ·½Ê½¡£

Ê×ÏÈ¿´ÏÂÏû·Ñ¶ËµÄ¸ºÔؾùºâ£º

Ïû·Ñ¶Ë¸ºÔؾùºâ

Ïû·Ñ¶Ë»áͨ¹ýRebalanceServiceỊ̈߳¬10ÃëÖÓ×öÒ»´Î»ùÓÚtopicϵÄËùÓжÓÁиºÔØ£º

±éÀúConsumerϵÄËùÓÐtopic£¬È»ºó¸ù¾Ýtopic¶©ÔÄËùÓеÄÏûÏ¢

»ñȡͬһtopicºÍConsumer GroupϵÄËùÓÐConsumer

È»ºó¸ù¾Ý¾ßÌåµÄ·ÖÅä²ßÂÔÀ´·ÖÅäÏû·Ñ¶ÓÁУ¬·ÖÅäµÄ²ßÂÔ°üº¬£ºÆ½¾ù·ÖÅä¡¢Ïû·Ñ¶ËÅäÖõÈ

ÈçͬÉÏͼËùʾ£ºÈç¹ûÓÐ 5 ¸ö¶ÓÁУ¬2 ¸ö consumer£¬ÄÇôµÚÒ»¸ö Consumer Ïû·Ñ 3 ¸ö¶ÓÁУ¬µÚ¶þ consumer Ïû·Ñ 2 ¸ö¶ÓÁС£ÕâÀï²ÉÓõľÍÊÇÆ½¾ù·ÖÅä²ßÂÔ£¬ËüÀàËÆÓÚÎÒÃǵķÖÒ³£¬TOPICÏÂÃæµÄËùÓÐqueue¾ÍÊǼǼ£¬ConsumerµÄ¸öÊý¾ÍÏ൱ÓÚ×ܵÄÒ³Êý£¬ÄÇôÿҳÓжàÉÙÌõ¼Ç¼£¬¾ÍÀàËÆÓÚij¸öConsumer»áÏû·ÑÄÄЩ¶ÓÁС£

ͨ¹ýÕâÑùµÄ²ßÂÔÀ´´ïµ½´óÌåÉÏµÄÆ½¾ùÏû·Ñ£¬ÕâÑùµÄÉè¼ÆÒ²¿ÉÒԺܷ½ÃæµÄˮƽÀ©Õ¹ConsumerÀ´Ìá¸ßÏû·ÑÄÜÁ¦¡£

Ïû·Ñ¶ËµÄPushģʽÊÇͨ¹ý³¤ÂÖѯµÄģʽÀ´ÊµÏֵ쬾ÍÈçͬÏÂͼ£º

PushģʽʾÒâͼ

Consumer¶Ëÿ¸ôÒ»¶Îʱ¼äÖ÷¶¯Ïòbroker·¢ËÍÀ­ÏûÏ¢ÇëÇó£¬brokerÔÚÊÕµ½PullÇëÇóºó£¬Èç¹ûÓÐÏûÏ¢¾ÍÁ¢¼´·µ»ØÊý¾Ý£¬Consumer¶ËÊÕµ½·µ»ØµÄÏûÏ¢ºó£¬Ôٻص÷Ïû·ÑÕßÉèÖõÄListener·½·¨¡£Èç¹ûbrokerÔÚÊÕµ½PullÇëÇóʱ£¬ÏûÏ¢¶ÓÁÐÀïûÓÐÊý¾Ý£¬broker¶Ë»á×èÈûÇëÇóÖ±µ½ÓÐÊý¾Ý´«µÝ»ò³¬Ê±²Å·µ»Ø¡£

µ±È»£¬Consumer¶ËÊÇͨ¹ýÒ»¸öÏ߳̽«×èÈû¶ÓÁÐLinkedBlockingQueueÖеÄPullRequest·¢Ë͵½brokerÀ­È¡ÏûÏ¢£¬ÒÔ·ÀÖ¹ConsumerÒ»Ö±»×èÈû¡£¶øBroker¶Ë£¬ÔÚ½ÓÊÕµ½ConsumerµÄPullRequestʱ£¬Èç¹û·¢ÏÖûÓÐÏûÏ¢£¬¾Í»á°ÑPullRequestÈÓµ½ConcurrentHashMapÖлº´æÆðÀ´¡£brokerÔÚÆô¶¯Ê±£¬»áÆô¶¯Ò»¸öÏ̲߳»Í£µÄ´ÓConcurrentHashMapÈ¡³öPullRequest¼ì²é£¬Ö±µ½ÓÐÊý¾Ý·µ»Ø¡£

Æß¡¢RocketMQµÄÆäËûÌØÐÔ

Ç°ÃæµÄ6¸öÌØÐÔ¶¼ÊÇ»ù±¾É϶¼Êǵ㵽Ϊֹ£¬ÏëÒªÉîÈëÁ˽⣬»¹ÐèÒª´ó¼Ò¶à¶à²é¿´Ô´Â룬¶à¶àÔÚʵ¼ÊÖÐÔËÓᣵ±È»³ýÁËÒѾ­Ìáµ½µÄÌØÐÔÍ⣬RocketMQ»¹Ö§³Ö£º

¶¨Ê±ÏûÏ¢

ÏûÏ¢µÄË¢Å̲ßÂÔ

Ö÷¶¯Í¬²½²ßÂÔ£ºÍ¬²½Ë«Ð´¡¢Òì²½¸´ÖÆ

º£Á¿ÏûÏ¢¶Ñ»ýÄÜÁ¦

¸ßЧͨÐÅ

.......

ÆäÖÐÉæ¼°µ½µÄºÜ¶àÉè¼ÆË¼Â·ºÍ½â¾ö·½·¨¶¼ÖµµÃÎÒÃÇÉîÈëÑо¿£º

ÏûÏ¢µÄ´æ´¢Éè¼Æ£º¼ÈÒªÂú×㺣Á¿ÏûÏ¢µÄ¶Ñ»ýÄÜÁ¦£¬ÓÖÒªÂú×㼫¿ìµÄ²éѯЧÂÊ£¬»¹Òª±£Ö¤Ð´ÈëµÄЧÂÊ¡£

¸ßЧµÄͨÐÅ×é¼þÉè¼Æ£º¸ßÍÌÍÂÁ¿£¬ºÁÃë¼¶µÄÏûϢͶµÝÄÜÁ¦¶¼Àë²»¿ª¸ßЧµÄͨÐÅ¡£

.......

RocketMQ×î¼Ñʵ¼ù

Ò»¡¢Producer×î¼Ñʵ¼ù

1¡¢Ò»¸öÓ¦Óþ¡¿ÉÄÜÓÃÒ»¸ö Topic£¬ÏûÏ¢×ÓÀàÐÍÓà tags À´±êʶ£¬tags ¿ÉÒÔÓÉÓ¦ÓÃ×ÔÓÉÉèÖá£Ö»Óз¢ËÍÏûÏ¢ÉèÖÃÁËtags£¬Ïû·Ñ·½ÔÚ¶©ÔÄÏûϢʱ£¬²Å¿ÉÒÔÀûÓà tags ÔÚ broker ×öÏûÏ¢¹ýÂË¡£

2¡¢Ã¿¸öÏûÏ¢ÔÚÒµÎñ²ãÃæµÄΨһ±êʶÂ룬ҪÉèÖõ½ keys ×ֶΣ¬·½±ã½«À´¶¨Î»ÏûÏ¢¶ªÊ§ÎÊÌâ¡£ÓÉÓÚÊǹþÏ£Ë÷Òý£¬ÇëÎñ±Ø±£Ö¤ key ¾¡¿ÉÄÜΨһ£¬ÕâÑù¿ÉÒÔ±ÜÃâDZÔڵĹþÏ£³åÍ»¡£

3¡¢ÏûÏ¢·¢Ëͳɹ¦»òÕßʧ°Ü£¬Òª´òÓ¡ÏûÏ¢ÈÕÖ¾£¬Îñ±ØÒª´òÓ¡ sendresult ºÍ key ×ֶΡ£

4¡¢¶ÔÓÚÏûÏ¢²»¿É¶ªÊ§Ó¦Óã¬Îñ±ØÒªÓÐÏûÏ¢ÖØ·¢»úÖÆ¡£ÀýÈ磺ÏûÏ¢·¢ËÍʧ°Ü£¬´æ´¢µ½Êý¾Ý¿â£¬ÄÜÓж¨Ê±³ÌÐò³¢ÊÔÖØ·¢»òÕßÈ˹¤´¥·¢ÖØ·¢¡£

5¡¢Ä³Ð©Ó¦ÓÃÈç¹û²»¹Ø×¢ÏûÏ¢ÊÇ·ñ·¢Ëͳɹ¦£¬ÇëÖ±½ÓʹÓÃsendOneWay·½·¨·¢ËÍÏûÏ¢¡£

¶þ¡¢Consumer×î¼Ñʵ¼ù

1¡¢Ïû·Ñ¹ý³ÌÒª×öµ½Ãݵȣ¨¼´Ïû·Ñ¶ËÈ¥ÖØ£©

2¡¢¾¡Á¿Ê¹ÓÃÅúÁ¿·½Ê½Ïû·Ñ·½Ê½£¬¿ÉÒԺܴó³Ì¶ÈÉÏÌá¸ßÏû·ÑÍÌÍÂÁ¿¡£

3¡¢ÓÅ»¯Ã¿ÌõÏûÏ¢Ïû·Ñ¹ý³Ì

Èý¡¢ÆäËûÅäÖÃ

ÏßÉÏÓ¦¸Ã¹Ø±ÕautoCreateTopicEnable£¬¼´ÔÚÅäÖÃÎļþÖн«ÆäÉèÖÃΪfalse¡£

RocketMQÔÚ·¢ËÍÏûϢʱ£¬»áÊ×ÏÈ»ñȡ·ÓÉÐÅÏ¢¡£Èç¹ûÊÇеÄÏûÏ¢£¬ÓÉÓÚMQServerÉÏÃæ»¹Ã»Óд´½¨¶ÔÓ¦µÄTopic£¬Õâ¸öʱºò£¬Èç¹ûÉÏÃæµÄÅäÖôò¿ªµÄ»°£¬»á·µ»ØÄ¬ÈÏTOPICµÄ£¨RocketMQ»áÔÚÿ̨brokerÉÏÃæ´´½¨ÃûΪTBW102µÄTOPIC£©Â·ÓÉÐÅÏ¢£¬È»ºóProducer»áÑ¡Ôñһ̨Broker·¢ËÍÏûÏ¢£¬Ñ¡ÖеÄbrokerÔÚ´æ´¢ÏûϢʱ£¬·¢ÏÖÏûÏ¢µÄtopic»¹Ã»Óд´½¨£¬¾Í»á×Ô¶¯´´½¨topic¡£ºó¹û¾ÍÊÇ£ºÒÔºóËùÓиÃTOPICµÄÏûÏ¢£¬¶¼½«·¢Ë͵½Õą̂brokerÉÏ£¬´ï²»µ½¸ºÔؾùºâµÄÄ¿µÄ¡£

ËùÒÔ»ùÓÚĿǰRocketMQµÄÉè¼Æ£¬½¨Ò鹨±Õ×Ô¶¯´´½¨TOPICµÄ¹¦ÄÜ£¬È»ºó¸ù¾ÝÏûÏ¢Á¿µÄ´óС£¬ÊÖ¶¯´´½¨TOPIC¡£

RocketMQÉè¼ÆÏà¹Ø

RocketMQµÄÉè¼Æ¼Ù¶¨£º

ÿ̨PC»úÆ÷¶¼¿ÉÄÜå´»ú²»¿É·þÎñ

ÈÎÒ⼯Ⱥ¶¼ÓпÉÄÜ´¦ÀíÄÜÁ¦²»×ã

×µÄÇé¿öÒ»¶¨»á·¢Éú

ÄÚÍø»·¾³ÐèÒªµÍÑÓ³ÙÀ´Ìṩ×î¼ÑÓû§ÌåÑé

RocketMQµÄ¹Ø¼üÉè¼Æ£º

·Ö²¼Ê½¼¯Èº»¯

Ç¿Êý¾Ý°²È«

º£Á¿Êý¾Ý¶Ñ»ý

ºÁÃ뼶ͶµÝÑÓ³Ù£¨ÍÆÀ­Ä£Ê½£©

ÕâÊÇRocketMQÔÚÉè¼ÆÊ±µÄ¼Ù¶¨Ç°ÌáÒÔ¼°ÐèÒªµ½´ïµÄЧ¹û¡£ÎÒÏëÕâЩ¼Ù¶¨ÊÊÓÃÓÚËùÓеÄϵͳÉè¼Æ¡£Ëæ×ÅÎÒÃÇϵͳµÄ·þÎñµÄÔö¶à£¬Ã¿Î»¿ª·¢Õß¶¼Òª×¢Òâ×Ô¼ºµÄ³ÌÐòÊÇ·ñ´æÔÚµ¥µã¹ÊÕÏ£¬Èç¹û¹ÒÁËÓ¦¸ÃÔõô»Ö¸´¡¢Äܲ»ÄܺܺõÄˮƽÀ©Õ¹¡¢¶ÔÍâµÄ½Ó¿ÚÊÇ·ñ×ã¹»¸ßЧ¡¢×Ô¼º¹ÜÀíµÄÊý¾ÝÊÇ·ñ×ã¹»°²È«...... ¶à¶à¹æ·¶×Ô¼ºµÄÉè¼Æ£¬²ÅÄÜ¿ª·¢³ö¸ßЧ½¡×³µÄ³ÌÐò¡£

¸½Â¼£ºRocketMQÉæ¼°µ½µÄ¼¸¸öרҵÊõÓïºÍÕûÌå¼Ü¹¹½éÉÜ

Ò»¡¢RocketMQÖеÄרҵÊõÓï

Topic

topic±íʾÏûÏ¢µÄµÚÒ»¼¶ÀàÐÍ£¬±ÈÈçÒ»¸öµçÉÌϵͳµÄÏûÏ¢¿ÉÒÔ·ÖΪ£º½»Ò×ÏûÏ¢¡¢ÎïÁ÷ÏûÏ¢...... Ò»ÌõÏûÏ¢±ØÐëÓÐÒ»¸öTopic¡£

Tag

Tag±íʾÏûÏ¢µÄµÚ¶þ¼¶ÀàÐÍ£¬±ÈÈç½»Ò×ÏûÏ¢ÓÖ¿ÉÒÔ·ÖΪ£º½»Ò×´´½¨ÏûÏ¢£¬½»Ò×Íê³ÉÏûÏ¢..... Ò»ÌõÏûÏ¢¿ÉÒÔûÓÐTag¡£RocketMQÌṩ2¼¶ÏûÏ¢·ÖÀ࣬·½±ã´ó¼ÒÁé»î¿ØÖÆ¡£

Queue

Ò»¸ötopicÏ£¬ÎÒÃÇ¿ÉÒÔÉèÖöà¸öqueue(ÏûÏ¢¶ÓÁÐ)¡£µ±ÎÒÃÇ·¢ËÍÏûϢʱ£¬ÐèÒªÒªÖ¸¶¨¸ÃÏûÏ¢µÄtopic¡£RocketMQ»áÂÖѯ¸ÃtopicϵÄËùÓжÓÁУ¬½«ÏûÏ¢·¢ËͳöÈ¥¡£

Producer Óë Producer Group

Producer±íʾÏûÏ¢¶ÓÁеÄÉú²úÕß¡£ÏûÏ¢¶ÓÁеı¾ÖʾÍÊÇʵÏÖÁËpublish-subscribeģʽ£¬Éú²úÕßÉú²úÏûÏ¢£¬Ïû·ÑÕßÏû·ÑÏûÏ¢¡£ËùÒÔÕâÀïµÄProducer¾ÍÊÇÓÃÀ´Éú²úºÍ·¢ËÍÏûÏ¢µÄ£¬Ò»°ãÖ¸ÒµÎñϵͳ¡£

Producer GroupÊÇÒ»ÀàProducerµÄ¼¯ºÏÃû³Æ£¬ÕâÀàProducerͨ³£·¢ËÍÒ»ÀàÏûÏ¢£¬ÇÒ·¢ËÍÂß¼­Ò»Ö¡£

Consumer Óë Consumer Group

ÏûÏ¢Ïû·ÑÕߣ¬Ò»°ãÓɺǫ́ϵͳÒì²½Ïû·ÑÏûÏ¢¡£

Push Consumer

Consumer µÄÒ»ÖÖ£¬Ó¦ÓÃͨ³£Ïò Consumer ¶ÔÏó×¢²áÒ»¸ö Listener ½Ó¿Ú£¬Ò»µ©ÊÕµ½ÏûÏ¢£¬Consumer ¶ÔÏóÁ¢¿Ì»Øµ÷ Listener ½Ó¿Ú·½·¨¡£

Pull Consumer

Consumer µÄÒ»ÖÖ£¬Ó¦ÓÃͨ³£Ö÷¶¯µ÷Óà Consumer µÄÀ­ÏûÏ¢·½·¨´Ó Broker À­ÏûÏ¢£¬Ö÷¶¯È¨ÓÉÓ¦ÓÿØÖÆ¡£

Consumer GroupÊÇÒ»ÀàConsumerµÄ¼¯ºÏÃû³Æ£¬ÕâÀàConsumerͨ³£Ïû·ÑÒ»ÀàÏûÏ¢£¬ÇÒÏû·ÑÂß¼­Ò»Ö¡£

Broker

ÏûÏ¢µÄÖÐתÕߣ¬¸ºÔð´æ´¢ºÍת·¢ÏûÏ¢¡£¿ÉÒÔÀí½âΪÏûÏ¢¶ÓÁзþÎñÆ÷£¬ÌṩÁËÏûÏ¢µÄ½ÓÊÕ¡¢´æ´¢¡¢À­È¡ºÍת·¢·þÎñ¡£brokerÊÇRocketMQµÄºËÐÄ£¬Ëü²»²»Äܹҵģ¬ËùÒÔÐèÒª±£Ö¤brokerµÄ¸ß¿ÉÓá£

¹ã²¥Ïû·Ñ

Ò»ÌõÏûÏ¢±»¶à¸öConsumerÏû·Ñ£¬¼´Ê¹ÕâЩConsumerÊôÓÚͬһ¸öConsumer Group£¬ÏûÏ¢Ò²»á±»Consumer GroupÖеÄÿ¸öConsumer¶¼Ïû·ÑÒ»´Î¡£Ôڹ㲥Ïû·ÑÖеÄConsumer Group¸ÅÄî¿ÉÒÔÈÏΪÔÚÏûÏ¢»®·Ö·½ÃæÎÞÒâÒå¡£

¼¯ÈºÏû·Ñ

Ò»¸öConsumer GroupÖеÄConsumerʵÀýƽ¾ù·Ö̯Ïû·ÑÏûÏ¢¡£ÀýÈçij¸öTopicÓÐ 9 ÌõÏûÏ¢£¬ÆäÖÐÒ»¸öConsumer GroupÓÐ 3 ¸öʵÀý(¿ÉÄÜÊÇ 3 ¸ö½ø³Ì,»òÕß 3 ̨»úÆ÷)£¬ÄÇôÿ¸öʵÀýÖ»Ïû·ÑÆäÖÐµÄ 3 ÌõÏûÏ¢¡£

NameServer

NameServer¼´Ãû³Æ·þÎñ£¬Á½¸ö¹¦ÄÜ£º

½ÓÊÕbrokerµÄÇëÇó£¬×¢²ábrokerµÄ·ÓÉÐÅÏ¢

½Ó¿ÚclientµÄÇëÇ󣬸ù¾Ýij¸ötopic»ñÈ¡Æäµ½brokerµÄ·ÓÉÐÅÏ¢

NameServerûÓÐ״̬£¬¿ÉÒÔºáÏòÀ©Õ¹¡£Ã¿¸öbrokerÔÚÆô¶¯µÄʱºò»áµ½NameServer×¢²á£»ProducerÔÚ·¢ËÍÏûϢǰ»á¸ù¾Ýtopicµ½NameServer»ñȡ·ÓÉ(µ½broker)ÐÅÏ¢£»ConsumerÒ²»á¶¨Ê±»ñÈ¡topic·ÓÉÐÅÏ¢¡£

¶þ¡¢RocketMQ Overview

rocketmq overview

ProducerÏòһЩ¶ÓÁÐÂÖÁ÷·¢ËÍÏûÏ¢£¬¶ÓÁм¯ºÏ³ÆÎªTopic£¬ConsumerÈç¹û×ö¹ã²¥Ïû·Ñ£¬ÔòÒ»¸öconsumerʵÀýÏû·ÑÕâ¸öTopic¶ÔÓ¦µÄËùÓжÓÁУ»Èç¹û×ö¼¯ÈºÏû·Ñ£¬Ôò¶à¸öConsumerʵÀýƽ¾ùÏû·ÑÕâ¸öTopic¶ÔÓ¦µÄ¶ÓÁм¯ºÏ¡£

ÔÙ¿´ÏÂRocketMQÎïÀí²¿Êð½á¹¹Í¼£º

RocketMQÍøÂ粿Êðͼ

RocketMQÍøÂ粿ÊðÌØµã£º

Name Server ÊÇÒ»¸ö¼¸ºõÎÞ״̬½Úµã£¬¿É¼¯Èº²¿Ê𣬽ڵãÖ®¼äÎÞÈκÎÐÅϢͬ²½¡£

Broker²¿ÊðÏà¶Ô¸´ÔÓ£¬Broker·ÖΪMasterÓëSlave£¬Ò»¸öMaster¿ÉÒÔ¶ÔÓ¦¶à¸öSlave£¬µ«ÊÇÒ»¸öSlaveÖ»ÄܶÔÓ¦Ò»¸öMaster£¬MasterÓëSlaveµÄ¶ÔÓ¦¹ØÏµÍ¨¹ýÖ¸¶¨ÏàͬµÄBrokerName£¬²»Í¬µÄBrokerIdÀ´¶¨Ò壬BrokerId=0±íʾMaster£¬·Ç0±íʾSlave¡£MasterÒ²¿ÉÒÔ²¿Êð¶à¸ö¡£Ã¿¸öBrokerÓëName Server¼¯ÈºÖеÄËùÓнڵ㽨Á¢³¤Á¬½Ó£¬¶¨Ê±×¢²áTopicÐÅÏ¢µ½ËùÓÐName Server¡£

ProducerÓëName Server¼¯ÈºÖÐµÄÆäÖÐÒ»¸ö½Úµã£¨Ëæ»úÑ¡Ôñ£©½¨Á¢³¤Á¬½Ó£¬¶¨ÆÚ´ÓName ServerÈ¡Topic·ÓÉÐÅÏ¢£¬²¢ÏòÌṩTopic ·þÎñµÄMaster½¨Á¢³¤Á¬½Ó£¬ÇÒ¶¨Ê±ÏòMaster·¢ËÍÐÄÌø¡£Producer ÍêÈ«ÎÞ״̬£¬¿É¼¯Èº²¿Êð¡£

ConsumerÓëName Server¼¯ÈºÖÐµÄÆäÖÐÒ»¸ö½Úµã£¨Ëæ»úÑ¡Ôñ£©½¨Á¢³¤Á¬½Ó£¬¶¨ÆÚ´ÓName ServerÈ¡Topic ·ÓÉÐÅÏ¢£¬²¢ÏòÌṩTopic·þÎñµÄMaster¡¢Slave½¨Á¢³¤Á¬½Ó£¬ÇÒ¶¨Ê±ÏòMaster¡¢Slave·¢ËÍÐÄÌø¡£Consumer¼È¿ÉÒÔ´ÓMaster¶©ÔÄÏûÏ¢£¬Ò²¿ÉÒÔ´ÓSlave¶©ÔÄÏûÏ¢£¬¶©ÔĹæÔòÓÉBrokerÅäÖþö¶¨¡£

   
3898 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

ÆóÒµ¼Ü¹¹¡¢TOGAFÓëArchiMate¸ÅÀÀ
¼Ü¹¹Ê¦Ö®Â·-ÈçºÎ×öºÃÒµÎñ½¨Ä££¿
´óÐÍÍøÕ¾µçÉÌÍøÕ¾¼Ü¹¹°¸ÀýºÍ¼¼Êõ¼Ü¹¹µÄʾÀý
ÍêÕûµÄArchimateÊÓµãÖ¸ÄÏ£¨°üÀ¨Ê¾Àý£©
Ïà¹ØÎĵµ

Êý¾ÝÖÐ̨¼¼Êõ¼Ü¹¹·½·¨ÂÛÓëʵ¼ù
ÊÊÓÃArchiMate¡¢EA ºÍ iSpace½øÐÐÆóÒµ¼Ü¹¹½¨Ä£
ZachmanÆóÒµ¼Ü¹¹¿ò¼Ü¼ò½é
ÆóÒµ¼Ü¹¹ÈÃSOAÂ䵨
Ïà¹Ø¿Î³Ì

ÔÆÆ½Ì¨Óë΢·þÎñ¼Ü¹¹Éè¼Æ
ÖÐ̨սÂÔ¡¢ÖÐ̨½¨ÉèÓëÊý×ÖÉÌÒµ
ÒÚ¼¶Óû§¸ß²¢·¢¡¢¸ß¿ÉÓÃϵͳ¼Ü¹¹
¸ß¿ÉÓ÷ֲ¼Ê½¼Ü¹¹Éè¼ÆÓëʵ¼ù