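Editor's recommendation:
This article comes from Jianshu. It introduces RocketMQ's key features and how they are implemented, RocketMQ best practices, and background on its terminology and overall architecture.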
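Key Features and How They Are Implemented
I. Ordered Messages
Message ordering means that a class of messages is consumed in the same order in which it was sent. For example, an order generates three messages: order created, order paid, and order completed. Consuming them only makes sense in that sequence. At the same time, messages belonging to different orders can be consumed in parallel.
Suppose a producer generates two messages, M1 and M2. How do we guarantee their order? The first approach that comes to mind is probably this: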

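Figure: the approach you might first use to guarantee message order
M1 is sent to S1 and M2 is sent to S2. To guarantee that M1 is consumed before M2, S2 must be notified once M1 has reached the consumer, and only then does S2 deliver M2 to the consumer.
The problem with this model is that when M1 and M2 go to two different servers, nothing guarantees that M1 arrives first, so nothing guarantees that M1 is consumed first. The MQ Server cluster would then have to maintain message order itself. How can this be solved? A simple way is to send M1 and M2 to the same server: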

±£Ö¤ÏûϢ˳Ðò£¬Äã¸Ä½øºóµÄ·½·¨
ÕâÑù¿ÉÒÔ±£Ö¤M1ÏÈÓÚM2µ½´ïMQServer£¨¿Í»§¶ËµÈ´ýM1³É¹¦ºóÔÙ·¢ËÍM2£©£¬¸ù¾ÝÏÈ´ïµ½Ïȱ»Ïû·ÑµÄÔÔò£¬M1»áÏÈÓÚM2±»Ïû·Ñ£¬ÕâÑù¾Í±£Ö¤ÁËÏûÏ¢µÄ˳Ðò¡£
Õâ¸öÄ£ÐÍ£¬ÀíÂÛÉÏ¿ÉÒÔ±£Ö¤ÏûÏ¢µÄ˳Ðò£¬µ«ÔÚʵ¼ÊÔËÓÃÖÐÄãÓ¦¸Ã»áÓöµ½ÏÂÃæµÄÎÊÌ⣺

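Figure: the network latency problem
Whenever a message travels from one server to another, network latency comes into play. As the figure above shows, if delivering M1 takes longer than delivering M2, M2 is consumed first and the order is still not guaranteed. Even if M1 and M2 reach the consumer side at the same time, we do not know the load on consumer 1 and consumer 2, so M2 may still be consumed before M1. How do we solve this? Send M1 and M2 to the same consumer, and send M2 only after the consumer has acknowledged M1.
That introduces another problem: if consumer 1 does not respond after M1 is sent, should the server go on to send M2, or resend M1? To make sure the message is eventually consumed, the usual choice is to resend M1 to another consumer, consumer 2, as shown in the figure below.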

±£Ö¤ÏûϢ˳ÐòµÄÕýÈ·×ËÊÆ
ÕâÑùµÄÄ£Ð;ÍÑϸñ±£Ö¤ÏûÏ¢µÄ˳Ðò£¬Ï¸ÐĵÄÄãÈÔÈ»»á·¢ÏÖÎÊÌ⣬Ïû·Ñ¶Ë1ûÓÐÏìÓ¦ServerʱÓÐÁ½ÖÖÇé¿ö£¬Ò»ÖÖÊÇM1ȷʵûÓе½´ï£¬ÁíÍâÒ»ÖÖÇé¿öÊÇÏû·Ñ¶Ë1ÒѾÏìÓ¦£¬µ«ÊÇServer¶ËûÓÐÊÕµ½¡£Èç¹ûÊǵڶþÖÖÇé¿ö£¬ÖØ·¢M1£¬¾Í»áÔì³ÉM1±»Öظ´Ïû·Ñ¡£Ò²¾ÍÊÇÎÒÃǺóÃæÒªËµµÄµÚ¶þ¸öÎÊÌ⣬ÏûÏ¢ÖØ¸´ÎÊÌâ¡£
»Ø¹ýÍ·À´¿´ÏûϢ˳ÐòÎÊÌ⣬ÑϸñµÄ˳ÐòÏûÏ¢·Ç³£ÈÝÒ×Àí½â£¬¶øÇÒ´¦ÀíÎÊÌâÒ²±È½ÏÈÝÒ×£¬ÒªÊµÏÖÑϸñµÄ˳ÐòÏûÏ¢£¬¼òµ¥ÇÒ¿ÉÐеİ취¾ÍÊÇ£º
±£Ö¤Éú²úÕß - MQServer - Ïû·ÑÕßÊÇÒ»¶ÔÒ»¶ÔÒ»µÄ¹ØÏµ
µ«ÊÇÕâÑùÉè¼Æ£¬²¢ÐжȾͳÉΪÁËÏûϢϵͳµÄÆ¿¾±£¨ÍÌÍÂÁ¿²»¹»£©£¬Ò²»áµ¼Ö¸ü¶àµÄÒì³£´¦Àí£¬±ÈÈ磺ֻҪÏû·Ñ¶Ë³öÏÖÎÊÌ⣬¾Í»áµ¼ÖÂÕû¸ö´¦ÀíÁ÷³Ì×èÈû£¬ÎÒÃDz»µÃ²»»¨·Ñ¸ü¶àµÄ¾«Á¦À´½â¾ö×èÈûµÄÎÊÌâ¡£
µ«ÎÒÃǵÄ×îÖÕÄ¿±êÊÇÒª¼¯ÈºµÄ¸ßÈÝ´íÐԺ͸ßÍÌÍÂÁ¿¡£ÕâËÆºõÊÇÒ»¶Ô²»¿Éµ÷ºÍµÄì¶Ü£¬ÄÇô°¢ÀïÊÇÈçºÎ½â¾öµÄ£¿
ÊÀ½çÉϽâ¾öÒ»¸ö¼ÆËã»úÎÊÌâ×î¼òµ¥µÄ·½·¨£º¡°Ç¡ºÃ¡±²»ÐèÒª½â¾öËü£¡¡ª¡ª Éòѯ
ÓÐЩÎÊÌ⣬¿´ÆðÀ´ºÜÖØÒª£¬µ«Êµ¼ÊÉÏÎÒÃÇ¿ÉÒÔͨ¹ýºÏÀíµÄÉè¼Æ»òÕß½«ÎÊÌâ·Ö½âÀ´¹æ±Ü¡£Èç¹ûÓ²Òª°Ñʱ¼ä»¨ÔÚ½â¾öËüÃÇÉíÉÏ£¬Êµ¼ÊÉÏÊÇÀ˷ѵģ¬Ð§ÂʵÍϵġ£´ÓÕâ¸ö½Ç¶ÈÀ´¿´ÏûÏ¢µÄ˳ÐòÎÊÌ⣬ÎÒÃÇ¿ÉÒԵóöÁ½¸ö½áÂÛ£º
1¡¢²»¹Ø×¢ÂÒÐòµÄÓ¦ÓÃʵ¼Ê´óÁ¿´æÔÚ
2¡¢¶ÓÁÐÎÞÐò²¢²»Òâζ×ÅÏûÏ¢ÎÞÐò
×îºóÎÒÃÇ´ÓÔ´Âë½Ç¶È·ÖÎöRocketMQÔõôʵÏÖ·¢ËÍ˳ÐòÏûÏ¢¡£
Ò»°ãÏûÏ¢ÊÇͨ¹ýÂÖѯËùÓжÓÁÐÀ´·¢Ë͵썏ºÔؾùºâ²ßÂÔ£©£¬Ë³ÐòÏûÏ¢¿ÉÒÔ¸ù¾ÝÒµÎñ£¬±ÈÈç˵¶©µ¥ºÅÏàͬµÄÏûÏ¢·¢Ë͵½Í¬Ò»¸ö¶ÓÁС£ÏÂÃæµÄʾÀýÖУ¬OrderIdÏàͬµÄÏûÏ¢£¬»á·¢Ë͵½Í¬Ò»¸ö¶ÓÁУº
// RocketMQ provides two MessageQueueSelector implementations by default: random and hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // arg is the orderId passed as the last argument to send()
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);
After the routing information is obtained, a queue is chosen using the algorithm implemented by the MessageQueueSelector, so the same OrderId always maps to the same queue.
private SendResult send() {
    // Get the routing information for the topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // Choose a send queue using our algorithm
        // Here arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}
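The routing rule used by the selector can be tried out without a broker. The following is a minimal sketch, not RocketMQ code: the class OrderQueueSelector and the string queue names are hypothetical stand-ins, and Math.floorMod is used instead of % so that a negative order ID cannot produce a negative index.

```java
import java.util.List;

// Hypothetical stand-in for a MessageQueueSelector; not part of RocketMQ.
class OrderQueueSelector {
    // Maps an order ID onto one of the available queues.
    // The same order ID always yields the same queue as long as the
    // number of queues does not change.
    static <Q> Q select(List<Q> queues, int orderId) {
        int index = Math.floorMod(orderId, queues.size());
        return queues.get(index);
    }
}
```

Note that the mapping depends on the queue count, so changing the number of queues of a topic can move an order ID to a different queue.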
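II. Message Duplication
Solving the ordering problem above introduced a new problem: message duplication. So how does RocketMQ solve message duplication? Once again, it "happens" not to solve it.
The root cause of duplicated messages is an unreliable network. Any exchange of data over a network is exposed to this, so the way to deal with the problem is not to solve it but to work around it. The question then becomes: what should a consumer do when it receives the same message twice?
1. Keep the consumer's message-handling business logic idempotent.
2. Give every message a unique ID, and make sure that the successful processing of a message and its entry in the deduplication log table are recorded together.
Point 1 is easy to understand: as long as processing is idempotent, the final result is the same no matter how many duplicates arrive. Point 2 works by using a log table to record the IDs of messages that have already been processed successfully. If the ID of a newly arrived message is already in the log table, the message is not processed again.
The solution in point 1 clearly belongs on the consumer side and is not a function the messaging system should implement. Point 2 can be implemented either by the messaging system or by the business side. Under normal conditions duplicates are not very likely, and implementing deduplication in the messaging system would inevitably affect its throughput and availability, so it is better for the business side to handle duplicates itself. This is why RocketMQ does not solve the duplication problem.
RocketMQ does not guarantee that messages are never duplicated. If your business requires strictly non-duplicated messages, you need to deduplicate on the business side yourself.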
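The deduplication-table idea from point 2 can be sketched as follows. This is an illustration under stated assumptions, not RocketMQ code: DedupConsumer is a hypothetical class, and an in-memory set stands in for the log table. In a real system the log entry and the business update would be written in the same database transaction so that they succeed or fail together.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer-side deduplication; the set plays the role of the log table.
class DedupConsumer {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    // Runs the handler only the first time a message ID is seen.
    // Returns true if the handler ran, false if the message was a duplicate.
    boolean consume(String messageId, Runnable handler) {
        if (!processedIds.add(messageId)) {
            return false; // ID already recorded: skip the duplicate
        }
        try {
            handler.run();
            return true;
        } catch (RuntimeException e) {
            // Processing failed: forget the ID so a redelivery can be retried
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

For example, delivering the message "msg-1" twice with a handler that credits an account by 100 credits the account only once.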
III. Transactional Messages
In addition to ordinary and ordered messages, RocketMQ supports transactional messages. Let's first discuss what a transactional message is and why it is needed, using a money transfer as the example: Bob transfers 100 yuan to Smith.
On a single machine, the transaction looks roughly like this:

Figure: transfer transaction on a single machine
Once the user base grows large enough, Bob's and Smith's account and balance data no longer live on the same server, and the flow above becomes:

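Figure: transfer transaction in a clustered environment
At this point you will find that the same transfer takes several times longer in a clustered environment, which is clearly unacceptable. How do we avoid this problem?
Large transaction = small transactions + asynchrony
Split the large transaction into several small transactions that run asynchronously. This brings the efficiency of a cross-machine transaction roughly in line with that of a single machine. The transfer can be split into the following two small transactions: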

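Figure: small transactions + asynchronous message
In the figure, executing the local transaction (debiting Bob's account) and sending the asynchronous message must succeed or fail together: if the debit succeeds, the message must be sent; if the debit fails, the message must not be sent. The question is: do we debit first, or send the message first?
First, consider sending the message first. The flow looks roughly like this: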

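Figure: transactional message, sending the message first
The problem: if the message is sent successfully but the debit fails, the consumer still consumes the message and credits Smith's account.
Since sending the message first does not work, let's debit first. The flow looks roughly like this: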

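Figure: transactional message, debiting first
The problem is similar: if the debit succeeds but sending the message fails, Bob has been debited while Smith's account has not been credited.
You may think of many ways to solve this. For example, put the message send inside Bob's debit transaction: if the send fails, throw an exception and roll the transaction back. This approach also follows the principle of "happening" not to need solving. RocketMQ supports transactional messages, so let's see how RocketMQ implements them.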

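Figure: how RocketMQ sends a transactional message
In the first phase, RocketMQ sends a Prepared message and obtains the message's address. In the second phase it executes the local transaction. In the third phase it uses the address obtained in the first phase to access the message and change its state. A careful reader may spot another problem: what if sending the confirmation message fails? RocketMQ periodically scans the transactional messages in the message cluster. When it finds a Prepared message, it asks the message sender: was Bob's money actually debited or not? Should the message be rolled back, or should the confirmation be sent again? RocketMQ decides whether to roll back or to continue sending the confirmation according to the policy set by the sender. This guarantees that the message send and the local transaction succeed or fail together.
Now let's check the RocketMQ source code to see whether transactional messages are really handled this way. Here is the client code that sends a transactional message (for the complete code, see com.alibaba.rocketmq.example.transaction.TransactionProducer in the rocketmq-example project):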
// Unresolved transactions: the MQ server checks back with the client.
// As described above, when RocketMQ finds a `Prepared message`, it uses the policy implemented by this listener to resolve the transaction
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// Construct the producer for transactional messages
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// Set the handler that resolves unresolved transactions
producer.setTransactionCheckListener(transactionCheckListener);
// Local transaction logic, equivalent to checking Bob's account and debiting it in the example
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start();
// Construct the message; constructor arguments omitted
Message msg = new Message(......);
// Send the message
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
Next, look at the source of the sendMessageInTransaction method. It has three phases: send the Prepared message, execute the local transaction, and send the confirmation message.
public TransactionSendResult sendMessageInTransaction(.....)
{
    // Logic outline, not the actual code
    // 1. Send the message
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2. If the message was sent successfully, run the local transaction unit associated with it
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3. End the transaction
    this.endTransaction(sendResult, localTransactionState, localException);
}
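The endTransaction method sends a request to the broker (MQ server) to update the final state of the transactional message:
Find the Prepared message using sendResult
Update the final state of the message according to localTransaction
If endTransaction fails and the data never reaches the broker, a check-back thread on the broker periodically (every minute by default) scans each table file that stores transaction state. Messages that are already committed or rolled back are skipped. For a message still in the prepared state, the broker sends a CheckTransaction request to the Producer. The Producer calls DefaultMQProducerImpl.checkTransactionState() to handle this periodic callback, checkTransactionState in turn calls the transaction check method we configured, and finally endTransactionOneway is called so that the broker updates the final state of the message.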
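The three phases and the check-back can be modelled in a few lines. The sketch below is a toy in-memory model under stated assumptions, not RocketMQ's implementation: HalfMessageBroker, its State enum, and its method names are all hypothetical, and the check listener is reduced to a predicate that answers "did the local transaction for this message succeed?".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Toy model of prepared-message handling; every name here is hypothetical.
class HalfMessageBroker {
    enum State { PREPARED, COMMITTED, ROLLED_BACK }

    private final Map<String, State> messages = new LinkedHashMap<>();

    // Phase 1: store the message as PREPARED; consumers cannot see it yet.
    void prepare(String msgId) {
        messages.put(msgId, State.PREPARED);
    }

    // Phase 3: the producer reports the outcome of its local transaction.
    void endTransaction(String msgId, boolean localTxSucceeded) {
        messages.put(msgId, localTxSucceeded ? State.COMMITTED : State.ROLLED_BACK);
    }

    // Check-back: skip resolved messages; for each message still PREPARED,
    // ask the producer's listener and record the final state.
    void checkPrepared(Predicate<String> checkListener) {
        for (Map.Entry<String, State> e : messages.entrySet()) {
            if (e.getValue() == State.PREPARED) {
                e.setValue(checkListener.test(e.getKey()) ? State.COMMITTED : State.ROLLED_BACK);
            }
        }
    }

    // Only committed messages are delivered to consumers.
    boolean visibleToConsumers(String msgId) {
        return messages.get(msgId) == State.COMMITTED;
    }
}
```

If the confirmation for a message is lost after prepare(), the message stays invisible until checkPrepared() resolves it, which mirrors the behaviour described above.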
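Back to the transfer example: once Bob's balance has been reduced and the message has been sent successfully, the Smith side starts consuming the message. Two problems can occur at this point: consumption failure and consumption timeout. The way to handle a timeout is to keep retrying until the consumer consumes the message successfully. Duplicate messages may appear during this process, and they can be handled as described earlier.

Figure: consuming a transactional message
This largely solves the timeout problem, but what if consumption fails? The solution Alibaba offers is manual intervention. Think it through: following the transaction flow, if crediting Smith fails for some reason, the whole flow has to be rolled back. If the messaging system had to implement this rollback, its complexity would rise sharply and bugs would creep in easily, probably far more often than consumption failures themselves. We need to weigh whether it is worth paying such a high price to solve a problem that occurs very rarely. This trade-off deserves careful thought whenever you tackle a hard problem.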
Update, 2016-03-21: the transactional message implementation was removed in version 3.2.6, so that version does not support transactional messages. For details, see these rocketmq issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156
IV. How the Producer Sends Messages
The Producer balances load on the sending side by polling all queues under a topic in turn, as shown below:

Figure: producer-side load balancing when sending messages
First, let's look at the client code that sends a message:
// Construct the Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// Initialize the Producer; this is needed only once in the application's lifetime
producer.start();
// Construct the Message
Message msg = new Message("TopicTest1", // topic
    "TagA", // tag: labels the message to distinguish a class of messages; may be null
    "OrderID188", // key: custom key, can be used for deduplication; may be null
    ("Hello MetaQ").getBytes()); // body: message content
// Send the message and return the result
SendResult sendResult = producer.send(msg);
// Release resources, close network connections, and deregister
producer.shutdown();
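During the application's lifetime, the producer needs to call the start method once to initialize itself. Initialization mainly does the following:
If no namesrv address is specified, it is looked up automatically
Start scheduled tasks: update the namesrv address, update topic routing information from namesrv, clean up brokers that have gone down, send heartbeats to all brokers, and so on
Start the load-balancing service
Once initialization is complete, messages can be sent. The main sending code is: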
private SendResult sendDefaultImpl(Message msg,......)
{
    // Check that the Producer's state is RUNNING
    this.makeSureStateOK();
    // Validate msg: not null, topic and body not empty, body not too long
    Validators.checkMessage(msg, this.defaultMQProducer);
    // Get the routing information for the topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // Choose a message queue from the routing information
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // Send the message to that queue
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
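Two methods in this code deserve attention: tryToFindTopicPublishInfo and selectOneMessageQueue. As mentioned earlier, when the producer is initialized it starts a scheduled task that fetches routing information and updates a local cache. tryToFindTopicPublishInfo therefore first reads the topic's routing information from the cache, and only if nothing is found does it fetch the routing information from namesrv itself. selectOneMessageQueue returns a queue by round-robin polling to achieve load balancing.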
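Round-robin selection can be sketched as below. This is a simplified illustration, not the RocketMQ source: RoundRobinQueuePicker is a hypothetical class, queues are modelled as "brokerName:queueId" strings, and skipping queues on lastBrokerName (the broker used by the previous failed attempt) is an assumption about what that parameter is for.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin picker; queues are "brokerName:queueId" strings.
class RoundRobinQueuePicker {
    private final AtomicInteger counter = new AtomicInteger();

    // Returns the next queue in rotation. When lastBrokerName is not null,
    // queues on that broker are skipped if any other queue exists.
    String selectOne(List<String> queues, String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            String queue = next(queues);
            if (lastBrokerName == null || !queue.startsWith(lastBrokerName + ":")) {
                return queue;
            }
        }
        // Every queue lives on the last broker: fall back to plain rotation
        return next(queues);
    }

    private String next(List<String> queues) {
        // floorMod keeps the position valid even after the counter overflows
        return queues.get(Math.floorMod(counter.getAndIncrement(), queues.size()));
    }
}
```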
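If the Producer fails to send a message, it retries automatically. The retry policy is:
Number of retries < retryTimesWhenSendFailed (configurable)
Total elapsed time (including the time spent on n retries) < sendMsgTimeout (a parameter passed in when sending the message)
When both conditions hold, the Producer picks another queue and sends the message again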
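The two retry conditions can be expressed as a simple loop. The following is a sketch under stated assumptions rather than the actual client code: RetryingSender is a hypothetical class, the real send is replaced by a predicate that reports success or failure for a given queue, and the parameter names merely echo the settings named above.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical retry loop; trySend stands in for the real network send.
class RetryingSender {
    // Tries one queue per attempt, moving to the next queue after a failure.
    // Stops when a send succeeds, the retry budget is used up, or the
    // total elapsed time reaches the timeout.
    // Returns the queue that accepted the message, or null if all attempts failed.
    static String send(List<String> queues, Predicate<String> trySend,
                       int retryTimesWhenSendFailed, long sendMsgTimeoutMillis) {
        long begin = System.currentTimeMillis();
        int maxAttempts = 1 + retryTimesWhenSendFailed; // first try plus retries
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (System.currentTimeMillis() - begin >= sendMsgTimeoutMillis) {
                break; // time budget exhausted
            }
            String queue = queues.get(attempt % queues.size());
            if (trySend.test(queue)) {
                return queue;
            }
        }
        return null;
    }
}
```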
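V. Message Storage
RocketMQ stores messages through the combination of the consume queue and the commit log.
1. Consume Queue
The consume queue is the logical queue of messages. It works like the index of a dictionary and records where each message sits in the physical commit log file.
The directories used for consumequeue and commitlog storage can be set in the configuration
Every queue under every topic has a corresponding consumequeue file, for example:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
The Consume Queue files are organized as shown in the figure: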

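Figure: Consume Queue file organization
Files are organized by topic and queueId. In the figure, TopicA has two queues, 0 and 1, so TopicA with QueueId=0 forms one ConsumeQueue and TopicA with QueueId=1 forms another.
Retry queues are grouped by the consumer's GroupName. If a consumer fails to consume a message, the message is sent to the retry queue, for example %RETRY%ConsumerGroupA in the figure.
Dead letter queues are also grouped by the consumer's GroupName. If a consumer fails to consume a message and still fails after the specified number of retries, the message is sent to the dead letter queue, for example %DLQ%ConsumerGroupA in the figure.
A dead letter queue (Dead Letter Queue) generally holds messages that cannot be delivered for some reason, such as messages whose processing failed or messages that have expired.
The storage unit in a Consume Queue is a fixed-length 20-byte binary record, written sequentially and read sequentially, as shown below: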

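Figure: consumequeue storage unit format
CommitLog Offset is the actual offset of the message in the Commit Log file
Size stores the size of the message
Message Tag HashCode stores the hash of the message's Tag. It is mainly used for message filtering at subscription time (if a Tag is specified when subscribing, the HashCode is used to locate the subscribed messages quickly)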
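Because every entry has the same length, the position of the n-th entry is simply n × 20. The sketch below encodes and decodes one entry with java.nio.ByteBuffer. It assumes the 20 bytes are split as an 8-byte offset, a 4-byte size, and an 8-byte tag hash, in that order; the text above gives only the total length, so treat the field widths as an assumption. ConsumeQueueEntry is a hypothetical class, not the RocketMQ one.

```java
import java.nio.ByteBuffer;

// Hypothetical encoder for a 20-byte consume queue entry.
// Assumed layout: 8-byte CommitLog offset, 4-byte size, 8-byte tag hash.
class ConsumeQueueEntry {
    static final int ENTRY_SIZE = 20;

    static byte[] encode(long commitLogOffset, int size, long tagHashCode) {
        return ByteBuffer.allocate(ENTRY_SIZE)
                .putLong(commitLogOffset)
                .putInt(size)
                .putLong(tagHashCode)
                .array();
    }

    // Byte position of the entry with the given logical index in the file.
    static long positionOf(long index) {
        return index * ENTRY_SIZE;
    }

    static long commitLogOffsetOf(byte[] entry) {
        return ByteBuffer.wrap(entry).getLong(0);
    }

    static int sizeOf(byte[] entry) {
        return ByteBuffer.wrap(entry).getInt(8);
    }

    static long tagHashCodeOf(byte[] entry) {
        return ByteBuffer.wrap(entry).getLong(12);
    }
}
```

The fixed length is what makes sequential reads cheap: a consumer that knows its logical offset can seek straight to the entry and from there to the message in the commit log.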
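2. Commit Log
CommitLog: the physical file in which messages are stored. The commitlog on each broker is shared by all queues on that machine, with no separation between them.
The default file location is shown below; it can also be changed in the configuration file:
${user.home}\store\${commitlog}\${fileName}
The message storage unit in the CommitLog has a variable length. The file is written sequentially and read randomly. The storage structure of a message is shown in the table below; fields are stored one after another in the order of their numbers.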

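Figure: Commit Log storage unit structure
3. Message Storage Implementation
The message storage implementation is fairly complex and well worth studying in depth. It will be analyzed in a separate article later; this subsection only uses code to outline the flow.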
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the CRC of the message body (consider the most appropriate place to set it)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Set the store timestamp here, under the lock, to keep it globally ordered
    msg.setStoreTimestamp(beginLockTimestamp);
    // MapedFile: manages the in-memory mapping of the physical file and persists the in-memory data to the physical file
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
    // Append the Message to the commitlog file
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
    case PUT_OK:
        break;
    case END_OF_FILE:
        // Create a new file, re-write the message
        mapedFile = this.mapedFileQueue.getLastMapedFile();
        result = mapedFile.appendMessage(msg, this.appendMessageCallback);
        break;
    }
    DispatchRequest dispatchRequest = new DispatchRequest(
        topic, // 1
        queueId, // 2
        result.getWroteOffset(), // 3
        result.getWroteBytes(), // 4
        tagsCode, // 5
        msg.getStoreTimestamp(), // 6
        result.getLogicsOffset(), // 7
        msg.getKeys(), // 8
        /**
         * Transaction
         */
        msg.getSysFlag(), // 9
        msg.getPreparedTransactionOffset()); // 10
    // 1. Dispatch the message position to the ConsumeQueue
    // 2. Dispatch to the IndexService to build the index
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}
4. Message Index File
If a message carries a key, an IndexFile is used to store the message index. The structure of the file is shown in the figure:

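Figure: message index
The index file is mainly used to look up messages by key. The lookup works as follows:
Compute hashcode % slotNum for the queried key to find the slot position (slotNum is the maximum number of slots an index file contains; in the figure slotNum=5000000)
Use slotValue (the value stored at the slot position) to find the last item of the index entry list (the list is in reverse order, and slotValue always points to the newest index entry)
Traverse the index entry list and return the results that fall within the queried time range (at most 32 records are returned per query by default)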
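The three lookup steps can be reproduced with an in-memory structure. The sketch below is an illustration under stated assumptions, not the IndexFile implementation: KeyIndex and its fields are hypothetical, entries live in a list instead of a file, and each entry stores the key hash so that different keys sharing a slot can be told apart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of a slot table plus a backward-linked entry list.
class KeyIndex {
    private static final class Entry {
        final int keyHash;
        final long commitLogOffset;
        final long storeTime;
        final int prev; // index of the previous entry in the same slot, -1 if none

        Entry(int keyHash, long commitLogOffset, long storeTime, int prev) {
            this.keyHash = keyHash;
            this.commitLogOffset = commitLogOffset;
            this.storeTime = storeTime;
            this.prev = prev;
        }
    }

    private final int[] slots; // slot -> index of the newest entry, -1 if empty
    private final List<Entry> entries = new ArrayList<>();

    KeyIndex(int slotNum) {
        slots = new int[slotNum];
        Arrays.fill(slots, -1);
    }

    private static int hashOf(String key) {
        return key.hashCode() & 0x7fffffff; // force a non-negative hash
    }

    void put(String key, long commitLogOffset, long storeTime) {
        int hash = hashOf(key);
        int slot = hash % slots.length;
        // The new entry links back to the old head, then becomes the head
        entries.add(new Entry(hash, commitLogOffset, storeTime, slots[slot]));
        slots[slot] = entries.size() - 1;
    }

    // Returns commit log offsets for the key, newest first, limited to maxNum.
    List<Long> query(String key, long beginTime, long endTime, int maxNum) {
        List<Long> result = new ArrayList<>();
        int hash = hashOf(key);
        int i = slots[hash % slots.length];
        while (i >= 0 && result.size() < maxNum) {
            Entry e = entries.get(i);
            if (e.keyHash == hash && e.storeTime >= beginTime && e.storeTime <= endTime) {
                result.add(e.commitLogOffset);
            }
            i = e.prev;
        }
        return result;
    }
}
```

Two different keys can still share the same hash, which is one reason to keep keys as unique as possible.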
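VI. Message Subscription
RocketMQ offers two subscription modes. One is Push mode, in which the MQServer actively pushes messages to the consumer. The other is Pull mode, in which the consumer fetches messages from the MQServer when it needs them. In the actual implementation, however, both Push and Pull mode rely on the consumer actively pulling messages.
First, let's look at load balancing on the consumer side: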

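Figure: consumer-side load balancing
The consumer uses the RebalanceService thread to rebalance all queues under each topic every 10 seconds:
Iterate over all topics of the Consumer, then subscribe to all messages by topic
Get all Consumers under the same topic and Consumer Group
Assign consume queues according to the configured allocation strategy; strategies include average allocation, consumer-side configuration, and others
As the figure above shows, with 5 queues and 2 consumers, the first Consumer consumes 3 queues and the second consumer consumes 2 queues. This is the average allocation strategy. It resembles pagination: all queues under the TOPIC are the records, the number of Consumers is the total number of pages, and the records on each page correspond to the queues a particular Consumer consumes.
This strategy spreads consumption roughly evenly, and the design also makes it easy to scale Consumers horizontally to increase consumption capacity.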
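The pagination analogy translates directly into code. The sketch below is a minimal version of an average allocation, written for illustration rather than copied from RocketMQ: AverageAllocator is a hypothetical class, and consumers are identified by their position in a sorted consumer list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical average allocation: queues are split into contiguous "pages".
class AverageAllocator {
    // Returns the queues assigned to the consumer at consumerIndex
    // out of consumerCount consumers.
    static <Q> List<Q> allocate(List<Q> queues, int consumerIndex, int consumerCount) {
        int base = queues.size() / consumerCount;      // queues every consumer gets
        int remainder = queues.size() % consumerCount; // leftover queues
        // The first `remainder` consumers each take one extra queue
        int size = base + (consumerIndex < remainder ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);
        return new ArrayList<>(queues.subList(start, start + size));
    }
}
```

With 5 queues and 2 consumers, consumer 0 receives queues 0, 1, and 2, and consumer 1 receives queues 3 and 4, matching the example above. When there are more consumers than queues, the extra consumers receive an empty list.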
The consumer's Push mode is implemented with long polling, as shown below:

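Figure: Push mode
At intervals, the Consumer actively sends a pull request to the broker. If the broker has messages when it receives the Pull request, it returns the data immediately, and the Consumer then invokes the Listener method registered by the consuming application. If the message queue is empty when the broker receives the Pull request, the broker holds the request and returns only when data arrives or the request times out.
On the Consumer side, a dedicated thread takes PullRequest objects from a LinkedBlockingQueue and sends them to the broker to pull messages, so that the Consumer itself is never blocked. On the Broker side, when a PullRequest arrives and no message is available, the broker caches the PullRequest in a ConcurrentHashMap. At startup the broker launches a thread that keeps taking PullRequests out of the ConcurrentHashMap and checking them until there is data to return.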
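The hold-and-release behaviour of long polling can be sketched with CompletableFuture. This is a simplified single-queue model under stated assumptions, not the broker's implementation: LongPollingBroker is a hypothetical class, held requests are completed directly when a message arrives instead of being re-checked by a background thread, and a timeout is modelled by an explicit expireHeldRequests() call.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical long-polling model for a single queue.
class LongPollingBroker {
    private final Queue<String> messages = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> heldRequests = new ArrayDeque<>();

    // Pull: answer immediately if a message is waiting, otherwise hold the request.
    synchronized CompletableFuture<String> pull() {
        String msg = messages.poll();
        if (msg != null) {
            return CompletableFuture.completedFuture(msg);
        }
        CompletableFuture<String> held = new CompletableFuture<>();
        heldRequests.add(held);
        return held;
    }

    // A new message goes to the oldest held request if there is one,
    // otherwise it is queued for the next pull.
    synchronized void put(String msg) {
        CompletableFuture<String> held;
        while ((held = heldRequests.poll()) != null) {
            if (held.complete(msg)) {
                return;
            }
        }
        messages.add(msg);
    }

    // Timeout: release every held request with an empty (null) result.
    synchronized void expireHeldRequests() {
        CompletableFuture<String> held;
        while ((held = heldRequests.poll()) != null) {
            held.complete(null);
        }
    }
}
```

From the consumer's point of view the message appears to be pushed, because the response arrives as soon as the broker has data, yet every exchange is still started by a pull request.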
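VII. Other RocketMQ Features
The six features above were only covered briefly. To understand them in depth, read the source code and use them in practice. Beyond the features already mentioned, RocketMQ also supports:
Scheduled messages
Message flush-to-disk strategies
Master-slave synchronization strategies: synchronous double write, asynchronous replication
Massive message accumulation capacity
Efficient communication
.......
Many of the design ideas and solutions involved deserve deeper study:
Message storage design: it must support the accumulation of massive numbers of messages, provide very fast queries, and still keep writes efficient.
Efficient communication component design: high throughput and millisecond-level message delivery both depend on efficient communication.
.......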
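RocketMQ Best Practices
I. Producer Best Practices
1. An application should use a single Topic wherever possible and identify message subtypes with tags, which the application can set freely. Only when tags are set on sent messages can consumers use tags to filter messages at the broker when subscribing.
2. The unique business-level identifier of each message should be set in the keys field, which makes it easier to track down lost messages later. Because the index is a hash index, make sure the key is as unique as possible to avoid potential hash collisions.
3. Whether a send succeeds or fails, log the message, and be sure to log the SendResult and the key field.
4. Applications that cannot afford to lose messages must have a resend mechanism. For example, when a send fails, store the message in a database and have a scheduled job retry it, or trigger the resend manually.
5. If an application does not care whether a message was sent successfully, use the sendOneway method to send it.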
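II. Consumer Best Practices
1. Make consumption idempotent (that is, deduplicate on the consumer side).
2. Consume in batches where possible; this can improve consumption throughput considerably.
3. Optimize the processing of each individual message.
III. Other Configuration
In production, autoCreateTopicEnable should be turned off, that is, set to false in the configuration file.
When RocketMQ sends a message, it first fetches routing information. For a new topic, the corresponding Topic has not yet been created on the MQServer. If the setting above is enabled, the routing information of the default TOPIC is returned instead (RocketMQ creates a TOPIC named TBW102 on every broker). The Producer then picks one Broker and sends the message to it. When the chosen broker stores the message and finds that the message's topic does not exist yet, it creates the topic automatically. The consequence is that all later messages for that TOPIC are sent to this one broker, which defeats load balancing.
Given RocketMQ's current design, it is therefore recommended to turn off automatic TOPIC creation and to create TOPICs manually according to the expected message volume.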
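RocketMQ Design
RocketMQ's design assumptions:
Any PC server may crash and become unavailable
Any cluster may run short of processing capacity
The worst case will happen
An intranet environment needs low latency to provide the best user experience
RocketMQ's key design points:
Distributed clustering
Strong data safety
Massive data accumulation
Millisecond-level delivery latency (push and pull modes)
These are the assumptions RocketMQ was designed under and the results it aims to achieve. I think these assumptions apply to all system design. As the number of services in our systems grows, every developer should ask whether their program has a single point of failure, how it recovers if it goes down, whether it scales horizontally, whether its external interfaces are efficient enough, whether the data it manages is safe enough, and so on.
Only by holding our designs to such standards can we build efficient and robust programs.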
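Appendix: RocketMQ Terminology and Overall Architecture
I. RocketMQ Terminology
Topic
A topic is the first-level type of a message. For example, the messages of an e-commerce system can be divided into trade messages, logistics messages, and so on. Every message must have a Topic.
Tag
A Tag is the second-level type of a message. For example, trade messages can be further divided into trade-created messages, trade-completed messages, and so on. A message may have no Tag. RocketMQ provides this two-level classification to give users flexible control.
Queue
Several queues (message queues) can be configured under one topic. When sending a message, we must specify its topic. RocketMQ polls all queues under that topic in turn and sends the message out.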
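Producer and Producer Group
A Producer is the producer side of the message queue. A message queue is in essence an implementation of the publish-subscribe pattern: producers produce messages and consumers consume them. The Producer here is what produces and sends messages, usually a business system.
A Producer Group is the collective name for a class of Producers. Such Producers usually send the same class of messages with the same sending logic.
Consumer and Consumer Group
A message consumer, usually a back-end system that consumes messages asynchronously.
Push Consumer
A kind of Consumer. The application usually registers a Listener interface with the Consumer object, and as soon as a message is received, the Consumer object calls back the Listener interface method.
Pull Consumer
A kind of Consumer. The application usually calls the Consumer's pull method to fetch messages from the Broker, so the application stays in control.
A Consumer Group is the collective name for a class of Consumers. Such Consumers usually consume the same class of messages with the same consumption logic.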
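Broker
The intermediary for messages, responsible for storing and forwarding them. It can be thought of as the message queue server, providing message receiving, storage, pulling, and forwarding services. The broker is the core of RocketMQ and must not go down, so its high availability has to be ensured.
Broadcast consumption
A message is consumed by multiple Consumers. Even if these Consumers belong to the same Consumer Group, the message is consumed once by every Consumer in the Consumer Group. In broadcast consumption, the Consumer Group concept can be considered meaningless as far as dividing up messages is concerned.
Cluster consumption
The Consumer instances in a Consumer Group share the consumption of messages evenly. For example, if a Topic has 9 messages and a Consumer Group has 3 instances (3 processes or 3 machines), each instance consumes only 3 of the messages.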
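NameServer
The NameServer is the name service. It has two functions:
Receive requests from brokers and register the brokers' routing information
Receive requests from clients and return the routing information from a given topic to its brokers
The NameServer is stateless and can be scaled horizontally. Every broker registers with the NameServer at startup. Before sending a message, a Producer obtains routing information (to the broker) from the NameServer according to the topic. Consumers also fetch topic routing information periodically.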
II. RocketMQ Overview

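Figure: RocketMQ overview
A Producer sends messages to a set of queues in turn, and the set of queues is called a Topic. With broadcast consumption, one consumer instance consumes all queues of the Topic. With cluster consumption, multiple Consumer instances share the queues of the Topic evenly.
Next, look at RocketMQ's physical deployment structure: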

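Figure: RocketMQ network deployment
Characteristics of the RocketMQ network deployment:
The Name Server is an almost stateless node. It can be deployed as a cluster, and no information is synchronized between nodes.
Broker deployment is more complex. Brokers are divided into Masters and Slaves. One Master can have multiple Slaves, but a Slave can belong to only one Master. The Master-Slave relationship is defined by giving them the same BrokerName and different BrokerIds: BrokerId=0 means Master, and a non-zero value means Slave. Multiple Masters can be deployed as well. Every Broker keeps a long-lived connection to every node in the Name Server cluster and periodically registers its Topic information with all Name Servers.
A Producer keeps a long-lived connection to one randomly chosen node in the Name Server cluster and periodically fetches Topic routing information from it. It also keeps long-lived connections to the Masters that serve the Topic and periodically sends heartbeats to them. The Producer is completely stateless and can be deployed as a cluster.
A Consumer keeps a long-lived connection to one randomly chosen node in the Name Server cluster and periodically fetches Topic routing information from it. It also keeps long-lived connections to the Masters and Slaves that serve the Topic and periodically sends heartbeats to them. A Consumer can subscribe to messages from either a Master or a Slave; the subscription rule is determined by the Broker configuration.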