±à¼ÍƼö: |
±¾ÎÄÖ÷Òª½²½âÁË
Á½½×¶ÎÊÂÎñ£¬ÏûÏ¢×î´óŬÁ¦½»¸¶£¬¿É¿¿ÏûÏ¢×îÖÕÒ»ÖÂÐÔ·½°¸ÒÔ¼°Rabbitmq ʵս¡£
À´×ÔÓÚ¼òÊ飬,ÓÉ»ðÁú¹ûÈí¼þAnna±à¼¡¢ÍƼö¡£ |
|
±¾ÎÄ¶Ô±È ¶þ½×¶ÎÊÂÎñ¡¢×î´óŬÁ¦½»¸¶ÒÔ¼°ÏûÏ¢×îÖÕÒ»ÖÂÐÔ£¬²¢¸ø³ö²¿·Ö½â¾ö·½°¸,×îÖÕÒ»ÖÂÐÔ·½°¸²Î¿¼°¢ÀïRockMQÊÂÎñÏûÏ¢¡£


Ò» 2½×¶ÎÊÂÎñ
·Ö²¼Ê½ÏµÍ³×îÖÕÒ»ÖÂÐÔÓÐNÖÖ·½°¸£¬±ÈÈç2PC£¨2½×¶ÎÊÂÎñ£© £¬ÒÔ¼°Èý¶ÎÌá½»µÈµÈ£¬µ«¿ªÏú½Ï´ó£¬ÊµÏÖÆðÀ´¸´ÔÓ£¬±ÈÈç2½×¶ÎÊÂÎñΪÀý£¬ÐèÒªÒýÈëÒ»¸öе÷Õߣ¨Coordinator£©À´Í³Ò»ÕÆ¿ØËùÓвÎÓëÕߣ¨Participant£©µÄ²Ù×÷½á¹û
ÒÔ¿ª»áΪÀý£º
¼×ÒÒ±û¶¡ËÄÈËÒª×éÖ¯Ò»¸ö»áÒ飬ÐèҪȷ¶¨»áÒéʱ¼ä£¬²»·ÁÉè¼×ÊÇе÷Õߣ¬ÒÒ±û¶¡ÊDzÎÓëÕß¡£
ͶƱ½×¶Î£º
£¨1£©¼×·¢Óʼþ¸øÒÒ±û¶¡£¬ÖܶþÊ®µã¿ª»áÊÇ·ñÓÐʱ¼ä£»
£¨2£©¼×»Ø¸´ÓÐʱ¼ä£»
£¨3£©Òһظ´ÓÐʱ¼ä£»
£¨4£©±û³Ù³Ù²»»Ø¸´£¬´Ëʱ¶ÔÓÚÕâ¸ö»î¶¯£¬¼×ÒÒ±û¾ù´¦ÓÚ×èÈû״̬£¬Ëã·¨ÎÞ·¨¼ÌÐø½øÐУ»
£¨5£©±û»Ø¸´ÓÐʱ¼ä£¨»òÕßûÓÐʱ¼ä£©£»
Ìá½»½×¶Î£º
£¨1£©Ðµ÷Õß¼×½«ÊÕ¼¯µ½µÄ½á¹û·´À¡¸øÒÒ±û¶¡£¨Ê²Ã´Ê±ºò·´À¡£¬ÒÔ¼°·´À¡½á¹ûÈçºÎ£¬ÔÚ´ËÀýÖÐÈ¡¾öÓë±ûµÄʱ¼äÓë¾ö¶¨£©£»
£¨2£©ÒÒÊÕµ½£»
£¨3£©±ûÊÕµ½£»
£¨4£©¶¡ÊÕµ½£»
²»½öÒªËø×¡²ÎÓëÕßµÄËùÓÐ×ÊÔ´£¬¶øÇÒÒªËø×¡Ðµ÷Õß×ÊÔ´£¬¿ªÏú´ó¡£Ò»¾ä»°×ܽá¾ÍÊÇ£º2PCЧÂʺܵͣ¬·Ö²¼Ê½ÊÂÎñºÜÄÑ×ö¡£
ÔÚ¶ÔÊÂʵÐÔÒªÇóûÓÐÄÇô¸ßµÄÇé¿öÏ£¬¿ÉÒÔÓûùÓÚ×î´óŬÁ¦½»¸¶ && ÏûÏ¢¶ÓÁÐÒÔ¼°ÏûÏ¢´æ´¢À´½â¾ö×îÖÕÒ»ÖÂÐÔ¡£
¶þ ÏûÏ¢×î´óŬÁ¦½»¸¶
Ëùν×î´óŬÁ¦½»¸¶£¬¾ÍÊǰ³·´ÕýÓÃ×î´óŬÁ¦×ö£¬Äܲ»Äܳɹ¦£¬²»×öÍêÈ«±£Ö¤
»áÉæ¼°µ½Èý¸öÄ£¿é
ÉÏÓÎÓ¦Ó㬷¢ÏûÏ¢µ½ MQ ¶ÓÁС£
ÏÂÓÎÓ¦Óã¨ÀýÈç¶ÌÐÅ·þÎñ¡¢Óʼþ·þÎñ£©£¬½ÓÊÜÇëÇ󣬲¢·µ»ØÍ¨Öª½á¹û¡£
×î´óŬÁ¦Í¨Öª·þÎñ£¬¼àÌýÏûÏ¢¶ÓÁУ¬½«ÏûÏ¢´æ´¢µ½Êý¾Ý¿âÖУ¬²¢°´ÕÕ֪ͨ¹æÔòµ÷ÓÃÏÂÓÎÓ¦Óõķ¢ËÍ֪ͨ½Ó¿Ú¡£
¾ßÌåÁ÷³ÌÈçÏÂ

1.ÉÏÓÎÓ¦Ó÷¢ËÍ MQ ÏûÏ¢µ½ MQ ×é¼þÄÚ£¬ÏûÏ¢ÄÚ°üº¬Í¨Öª¹æÔòºÍ֪ͨµØÖ·
2.×î´óŬÁ¦Í¨Öª·þÎñ¼àÌýµ½ MQ ÄÚµÄÏûÏ¢£¬½âÎö֪ͨ¹æÔò²¢·ÅÈëÑÓʱ¶ÓÁеȴý´¥·¢Í¨Öª
3.×î´óŬÁ¦Í¨Öª·þÎñµ÷ÓÃÏÂÓεÄ֪ͨµØÖ·£¬Èç¹ûµ÷Óóɹ¦£¬Ôò¸ÃÏûÏ¢±ê¼ÇΪ֪ͨ³É¹¦£¬Èç¹ûʧ°ÜÔòÔÚÂú×ã֪ͨ¹æÔò£¨ÀýÈç
5 ·ÖÖÓ·¢Ò»´Î£¬¹²·¢ËÍ 10 ´Î£©µÄÇé¿öÏÂÖØÐ·ÅÈëÑÓʱ¶ÓÁеȴýÏ´δ¥·¢¡£
×î´óŬÁ¦Í¨Öª·þÎñ±íʾÔÚ²»Ó°ÏìÖ÷ÒµÎñµÄÇé¿öÏ£¬¾¡¿ÉÄܵØÈ·±£Êý¾ÝµÄÒ»ÖÂÐÔ¡£ËüÐèÒª¿ª·¢ÈËÔ±¸ù¾ÝÒµÎñÀ´Ö¸¶¨Í¨Öª¹æÔò£¬ÔÚÂú×ã֪ͨ¹æÔòµÄǰÌáÏ£¬¾¡¿ÉÄܵÄÈ·±£Êý¾ÝµÄÒ»Ö£¬ÒÔ´ïµ½×î´óŬÁ¦µÄÄ¿µÄ¡£
ʵÏÖÉÏÒ²±È½Ï¼òµ¥£¬Ä¿Ç°Ö÷Á÷ÏûÏ¢¶ÓÁж¼ÓÐack»úÖÆ£¬µ±Ã»ÊÕµ½ackµÄʱºòÓùæÔò×ö¶¨Ê±ÖØ·¢¼´¿É¡£
Óŵ㣺ʵÏÖ¼òµ¥
ȱµã£ºÎÞ²¹³¥»úÖÆ£¬²»±£Ö¤Äܹ»ËÍ´ï
ʵÏÖÒªµã£º ±£Ö¤ÏûÏ¢·¢ËÍʧ°ÜÖ®ºóÄܹ»ºÍÒµÎñÒ»Æð»Ø¹ö£»ÏûÏ¢½ÓÊÜ·½±£Ö¤Ú¤µÈÐÔ£»¶¨Ê±ÖØ·¢»úÖÆ£¬²ÉÓÃÒ»¶¨µÄÖØ·¢²ßÂÔ£¬ÀýÈç˵ָÊýÔö³¤£¬¾Ý˵°¢Àï²ÉÓÃredisµÄzsetÀ´Íê³É.
ÏûÏ¢½øµ½zsetºó£¬DelayQ»áͨ¹ýtimer´¥·¢(±ÈÈçÃë¼¶)£¬forkÏàÓ¦µÄÏû·ÑÏß³ÌÈ¥´¦ÀízsetÀïExecuteTime´óÓÚµ±Ç°Ê±¼äµÄÏûÏ¢¡£DelayQÄõ½Ò»ÌõÏûÏ¢ºó£¬½âÎöÆäÖеÄcallbackurl£¬²¢×é×°²ÎÊý£¬pushÒµÎñÏûÏ¢¸øConsumer.
Consumer·µ»Ø´¦Àí³É¹¦£¬ÄÇôzrem CodisÀïµÄÏûÏ¢¡£Èç¹û´¦Àíʧ°Ü£¬Ôò¼ÆËãÆäÏ´γ¢ÊÔʱ¼ä£¬²¢¸üÐÂÆäExecuteTime.
Èý ¿É¿¿ÏûÏ¢×îÖÕÒ»ÖÂÐÔ·½°¸
´Ë·½°¸Éæ¼° 3 ¸öÄ£¿é£º
ÉÏÓÎÓ¦Óã¬Ö´ÐÐÒµÎñ²¢·¢ËÍ MQ ÏûÏ¢¡£
¿É¿¿ÏûÏ¢·þÎñºÍ MQ ÏûÏ¢×é¼þ£¬Ðµ÷ÉÏÏÂÓÎÏûÏ¢µÄ´«µÝ£¬²¢È·±£ÉÏÏÂÓÎÊý¾ÝµÄÒ»ÖÂÐÔ¡£
ÏÂÓÎÓ¦Ó㬼àÌý MQ µÄÏûÏ¢²¢Ö´ÐÐ×ÔÉíÒµÎñ¡£

µÚÒ»½×¶Î£ºÉÏÓÎÓ¦ÓÃÖ´ÐÐÒµÎñ²¢·¢ËÍ MQ ÏûÏ¢
ÉÏÓÎÓ¦Óý«±¾µØÒµÎñÖ´ÐкÍÏûÏ¢·¢ËͰó¶¨ÔÚͬһ¸ö±¾µØÊÂÎñÖУ¬±£Ö¤ÒªÃ´±¾µØ²Ù×÷³É¹¦²¢·¢ËÍ MQ ÏûÏ¢£¬ÒªÃ´Á½²½²Ù×÷¶¼Ê§°Ü²¢»Ø¹ö¡£
ÉÏÓÎÓ¦ÓúͿɿ¿ÏûÏ¢Ö®¼äµÄÒµÎñ½»»¥Í¼ÈçÏ£º 
ÉÏÓÎÓ¦Ó÷¢ËÍ´ýÈ·ÈÏÏûÏ¢µ½¿É¿¿ÏûϢϵͳ
¿É¿¿ÏûϢϵͳ±£´æ´ýÈ·ÈÏÏûÏ¢²¢·µ»Ø
ÉÏÓÎÓ¦ÓÃÖ´Ðб¾µØÒµÎñ
ÉÏÓÎÓ¦ÓÃ֪ͨ¿É¿¿ÏûϢϵͳȷÈÏÒµÎñÒÑÖ´Ðв¢·¢ËÍÏûÏ¢¡£
¿É¿¿ÏûϢϵͳÐÞ¸ÄÏûϢ״̬Ϊ·¢ËÍ״̬²¢½«ÏûϢͶµÝµ½ MQ Öмä¼þ¡£
ÒÔÉÏÿһ²½¶¼¿ÉÄܳöÏÖʧ°ÜÇé¿ö£¬·ÖÎöÒ»ÏÂÕâ 5 ²½³öÏÖÒì³£ºóÉÏÓÎÒµÎñºÍÏûÏ¢·¢ËÍÊÇ·ñÒ»Ö£º

ÉÏÓÎÓ¦ÓÃÖ´ÐÐÍê³É£¬ÏÂÓÎÓ¦ÓÃÉÐδִÐлòÖ´ÐÐʧ°Üʱ£¬´ËÊÂÎñ¼´´¦ÓÚ BASE ÀíÂÛµÄ Soft State
״̬¡£
µÚ¶þ½×¶Î£ºÏÂÓÎÓ¦ÓüàÌý MQ ÏûÏ¢²¢Ö´ÐÐÒµÎñ
ÏÂÓÎÓ¦ÓüàÌý MQ ÏûÏ¢²¢Ö´ÐÐÒµÎñ£¬²¢ÇÒ½«ÏûÏ¢µÄÏû·Ñ½á¹û֪ͨ¿É¿¿ÏûÏ¢·þÎñ¡£
¿É¿¿ÏûÏ¢µÄ״̬ÐèÒªºÍÏÂÓÎÓ¦ÓõÄÒµÎñÖ´Ðб£³ÖÒ»Ö£¬¿É¿¿ÏûϢ״̬²»ÊÇÒÑÍê³Éʱ£¬È·±£ÏÂÓÎÓ¦ÓÃδִÐУ¬¿É¿¿ÏûϢ״̬ÊÇÒÑÍê³Éʱ£¬È·±£ÏÂÓÎÓ¦ÓÃÒÑÖ´ÐС£
ÏÂÓÎÓ¦ÓúͿɿ¿ÏûÏ¢·þÎñÖ®¼äµÄ½»»¥Í¼ÈçÏ£º

1.ÏÂÓÎÓ¦ÓüàÌý MQ ÏûÏ¢×é¼þ²¢»ñÈ¡ÏûÏ¢
2.ÏÂÓÎÓ¦Óøù¾Ý MQ ÏûÏ¢ÌåÐÅÏ¢´¦Àí±¾µØÒµÎñ
3.ÏÂÓÎÓ¦ÓÃÏò MQ ×é¼þ×Ô¶¯·¢ËÍ ACK È·ÈÏÏûÏ¢±»Ïû·Ñ
4.ÏÂÓÎÓ¦ÓÃ֪ͨ¿É¿¿ÏûϢϵͳÏûÏ¢±»³É¹¦Ïû·Ñ£¬¿É¿¿ÏûÏ¢½«¸ÃÏûϢ״̬¸ü¸ÄΪÒÑÍê³É¡£
ÒÔÉÏÿһ²½¶¼¿ÉÄܳöÏÖʧ°ÜÇé¿ö£¬·ÖÎöÒ»ÏÂÕâ 4 ²½³öÏÖÒì³£ºóÏÂÓÎÒµÎñºÍÏûϢ״̬ÊÇ·ñÒ»Ö£º
ͨ¹ý·ÖÎöÒÔÉÏÁ½¸ö½×¶Î¿ÉÄÜʧ°ÜµÄÇé¿ö£¬ÎªÁËÈ·±£ÉÏÏÂÓÎÊý¾ÝµÄ×îÖÕÒ»ÖÂÐÔ£¬ÔÚ¿É¿¿ÏûϢϵͳÖУ¬ÐèÒª¿ª·¢ ÏûϢ״̬ȷÈÏ
ºÍ ÏûÏ¢ÖØ·¢ Á½¸ö¹¦ÄÜÒÔʵÏÖ BASE ÀíÂÛµÄ Eventually Consistent ÌØÐÔ¡£
Òì³£´¦ÀíÒ»£ºÏûϢ״̬ȷÈÏ
¿É¿¿ÏûÏ¢·þÎñ¶¨Ê±¼àÌýÏûÏ¢µÄ״̬£¬Èç¹û´æÔÚ״̬Ϊ´ýÈ·Èϲ¢ÇÒ³¬Ê±µÄÏûÏ¢£¬Ôò±íʾÉÏÓÎÓ¦ÓúͿɿ¿ÏûÏ¢½»»¥ÖеIJ½Öè
4 »òÕß 5 ³öÏÖÒì³£¡£
¿É¿¿ÏûÏ¢ÔòЯ´øÏûÏ¢ÌåÄÚµÄÐÅÏ¢ÏòÉÏÓÎÓ¦Ó÷¢ÆðÇëÇó²éѯ¸ÃÒµÎñÊÇ·ñÒÑÖ´ÐС£ÉÏÓÎÓ¦ÓÃÌṩһ¸ö¿É²éѯ½Ó¿Ú¹©¿É¿¿ÏûÏ¢×·ËÝÒµÎñÖ´ÐÐ״̬£¬Èç¹ûÒµÎñÖ´Ðгɹ¦Ôò¸ü¸ÄÏûϢ״̬ΪÒÑ·¢ËÍ£¬·ñÔòɾ³ý´ËÏûϢȷ±£Êý¾ÝÒ»Ö¡£¾ßÌåÁ÷³ÌÈçÏ£º

¿É¿¿ÏûÏ¢²éѯ³¬Ê±µÄ´ýÈ·ÈÏ״̬µÄÏûÏ¢
ÏòÉÏÓÎÓ¦ÓòéѯҵÎñÖ´ÐеÄÇé¿ö
ÒµÎñδִÐУ¬Ôòɾ³ý¸ÃÏûÏ¢£¬±£Ö¤ÒµÎñºÍ¿É¿¿ÏûÏ¢·þÎñµÄÒ»ÖÂÐÔ¡£ÒµÎñÒÑÖ´ÐУ¬ÔòÐÞ¸ÄÏûϢ״̬ΪÒÑ·¢ËÍ£¬²¢·¢ËÍÏûÏ¢µ½
MQ ×é¼þ¡£
Òì³£´¦Àí¶þ£ºÏûÏ¢ÖØ·¢
ÏûÏ¢ÒÑ·¢ËÍÔò±íʾÉÏÓÎÓ¦ÓÃÒѾִÐУ¬½ÓÏÂÀ´ÔòÈ·±£ÏÂÓÎÓ¦ÓÃÒ²ÄÜÕý³£Ö´ÐС£
¿É¿¿ÏûÏ¢·þÎñ·¢ÏÖ¿É¿¿ÏûÏ¢·þÎñÖдæÔÚÏûϢ״̬ΪÒÑ·¢ËͲ¢ÇÒ³¬Ê±µÄÏûÏ¢£¬Ôò±íʾ¿É¿¿ÏûÏ¢·þÎñºÍÏÂÓÎÓ¦ÓÃÖдæÔÚÒì³£µÄ²½Ö裬ÎÞÂÛÄĸö²½Öè³öÏÖÒì³££¬¿É¿¿ÏûÏ¢·þÎñ¶¼½«´ËÏûÏ¢ÖØÐÂͶµÝµ½
MQ ×é¼þÖй©ÏÂÓÎÓ¦ÓüàÌý¡£
ÏÂÓÎÓ¦ÓüàÌýµ½´ËÏûÏ¢ºó£¬ÔÚ±£Ö¤ÃݵÈÐÔµÄÇé¿öÏÂÖØÐÂÖ´ÐÐÒµÎñ²¢Í¨Öª¿É¿¿ÏûÏ¢·þÎñ´ËÏûÏ¢ÒѾ³É¹¦Ïû·Ñ£¬×îÖÕÈ·±£ÉÏÓÎÓ¦Óá¢ÏÂÓÎÓ¦ÓõÄÊý¾Ý×îÖÕÒ»ÖÂÐÔ¡£¾ßÌåÁ÷³ÌÈçÏ£º

¿É¿¿ÏûÏ¢·þÎñ¶¨Ê±²éѯ״̬ΪÒÑ·¢ËͲ¢³¬Ê±µÄÏûÏ¢
¿É¿¿ÏûÏ¢½«ÏûÏ¢ÖØÐÂͶµÝµ½ MQ ×é¼þÖÐ
ÏÂÓÎÓ¦ÓüàÌýÏûÏ¢£¬ÔÚÂú×ãÃݵÈÐÔµÄÌõ¼þÏ£¬ÖØÐÂÖ´ÐÐÒµÎñ¡£
ÏÂÓÎÓ¦ÓÃ֪ͨ¿É¿¿ÏûÏ¢·þÎñ¸ÃÏûÏ¢ÒѾ³É¹¦Ïû·Ñ¡£
ͨ¹ýÏûϢ״̬ȷÈϺÍÏûÏ¢ÖØ·¢Á½¸ö¹¦ÄÜ£¬¿ÉÒÔÈ·±£ÉÏÓÎÓ¦Óᢿɿ¿ÏûÏ¢·þÎñºÍÏÂÓÎÓ¦ÓÃÊý¾ÝµÄ×îÖÕÒ»ÖÂÐÔ¡£
ËÄ ÈâÉíʵսRabbitmq
ÎÒÃÇÔÚrabbitmqÉÏÈâÉíʵսÁËһϿɿ¿ÏûÏ¢£¬rabbitmqµÄ·¢Ë͹ý³ÌÈçÏÂ
·¢ËÍÏûÏ¢µ½ÏûÏ¢·þÎñ
ÏûÏ¢¶ÓÁн«ÏûÏ¢·¢Ë͸ø¼àÌý
ÏûÏ¢¼àÌý½ÓÊܲ¢´¦ÀíÏûÏ¢
ÎÒÃÇÀ´¿´¿´¿ÉÄÜ·¢ËÍÒì³£µÄËÄÖÖ
1 Ö±½ÓÎÞ·¨µ½´ïÏûÏ¢·þÎñ
ÍøÂç¶ÏÁË£¬Å׳öÒì³££¬ÒµÎñÖ±½Ó»Ø¹ö¼´¿É¡£Èç¹û³öÏÖconnenction error£¬Ö±½ÓÔö¼Ó connectionÊý¼´¿É
connectionFactory.setChannelCacheSize(100);
2 ÏûÏ¢ÒѾµ½´ï·þÎñÆ÷£¬µ«·µ»ØµÄʱºò³öÏÖÒì³£
rabbitmqÌṩÁËÈ·ÈÏack»úÖÆ£¬¿ÉÒÔÓÃÀ´È·ÈÏÏûÏ¢ÊÇ·ñÓзµ»Ø¡£Òò´ËÎÒÃÇ¿ÉÒÔÔÚ·¢ËÍǰÔÚdbÖÐ(ÄÚ´æ»ò¹ØÏµÐÍÊý¾Ý¿â)ÏÈ´æÒ»ÏÂÏûÏ¢£¬Èç¹ûackÒì³£Ôò½øÐÐÖØ·¢
/**confirmcallbackÓÃÀ´È·ÈÏÏûÏ¢ÊÇ·ñÓÐËÍ´ïÏûÏ¢¶ÓÁÐ*/
rabbitTemplate.setConfirmCallback((correlationData,
ack, cause) -> {
if (!ack) {
//try to resend msg
} else {
//delete msg in db
}
});
/**ÈôÏûÏ¢ÕÒ²»µ½¶ÔÓ¦µÄExchange»áÏÈ´¥·¢returncallback */
rabbitTemplate.setReturnCallback((message, replyCode,
replyText, tmpExchange, tmpRoutingKey) -> {
try {
Thread.sleep(Constants.ONE_SECOND);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("send message failed: "
+ replyCode + " " + replyText);
rabbitTemplate.send(message);
}); |
Èç¹ûÏûϢûÓе½exchange,Ôòconfirm»Øµ÷,ack=false
Èç¹ûÏûÏ¢µ½´ïexchange,Ôòconfirm»Øµ÷,ack=true
µ«Èç¹ûÊÇÕÒ²»µ½exchange£¬Ôò»áÏÈ´¥·¢returncallback
3 ÏûÏ¢ËÍ´ïºó£¬ÏûÏ¢·þÎñ×Ô¼º¹ÒÁË
Èç¹ûÉèÖÃÁËÏûÏ¢³Ö¾Ã»¯£¬ÄÇôack= trueÊÇÔÚÏûÏ¢³Ö¾Ã»¯Íê³Éºó£¬¾ÍÊÇ´æµ½Ó²ÅÌÉÏÖ®ºóÔÙ·¢Ë͵ģ¬È·±£ÏûÏ¢ÒѾ´æÔÚÓ²ÅÌÉÏ£¬ÍòÒ»ÏûÏ¢·þÎñ¹ÒÁË£¬ÏûÏ¢·þÎñ»Ö¸´ÊÇÄܹ»ÔÙÖØ·¢ÏûÏ¢
4 δËÍ´ïÏû·ÑÕß
ÏûÏ¢·þÎñÊÕµ½ÏûÏ¢ºó£¬ÏûÏ¢»á´¦ÓÚ"UNACK"µÄ״̬£¬Ö±µ½¿Í»§¶ËÈ·ÈÏÏûÏ¢
channel.basicQos(1);
// accept only one unack-ed message at a time
(see below)
final Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '"
+ message + "'");
try {
doWork(message);
} finally {
//È·ÈÏÊÕµ½ÏûÏ¢
channel.basicAck(envelope.getDeliveryTag(),
false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck,
consumer); |
5 È·ÈÏÏûÏ¢¶ªÊ§
ÏûÏ¢·µ»ØÊ±¼ÙÉèÈ·ÈÏÏûÏ¢¶ªÊ§ÁË£¬ÄÇôÏûÏ¢·þÎñ»áÖØ·¢ÏûÏ¢¡£×¢Ò⣬Èç¹ûÄãÉèÖÃÁËautoAck=
false£¬µ«ÓÖûӦ´ð channel.baskAckҲûÓÐÓ¦´ð channel.baskNack£¬ÄÇô»áµ¼Ö·dz£ÑÏÖØµÄ´íÎó£ºÏûÏ¢¶ÓÁлᱻ¶ÂÈûס£¬ËùÒÔ£¬ÎÞÂÛÈçºÎ¶¼±ØÐëÓ¦´ð
6 Ïû·ÑÕßÒµÎñ´¦ÀíÒì³£
ÏûÏ¢¼àÌý½ÓÊÜÏûÏ¢²¢´¦Àí£¬¼ÙÉèÅ×Òì³£ÁË£¬µÚÒ»½×¶ÎÊÂÎïÒѾÍê³É£¬Èç¹ûÒªÅäÖûعöÔò¹ýÓÚÂé·³£¬¼´Ê¹×öÊÂÎñ²¹³¥Ò²¿ÉÄÜÊÂÎñ²¹³¥Ê§Ð§µÄÇé¿ö£¬ËùÒÔÕâÀï¿ÉÒÔ×öÒ»¸öÖØ¸´Ö´ÐУ¬±ÈÈçguavaµÄretry£¬ÉèÖÃÒ»¸öÖ¸Êýʱ¼äÀ´Ñ»·Ö´ÐУ¬Èç¹ûn´ÎºóÒÀȻʧ°Ü£¬·¢Óʼþ¡¢¶ÌÐÅ£¬ÓÃÈËÈâÀ´¶µµ×¡£
|