Editor's note: This article comes from 51CTO. This chapter briefly introduces some of the theoretical characteristics and the processing flow of Message Broker and AMQP.
MQ (Message Queue) is a method of application-to-application communication. Applications communicate by writing messages (application-specific data) to a queue and reading them from it, without needing a dedicated connection to link them.
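1 rabbitMQ: introduction and principles
rabbitMQ website: http://www.rabbitmq.com/
Erlang website: http://www.erlang.org/
1.1 rabbitMQ overview
RabbitMQ is an open-source implementation of AMQP (Advanced Message Queuing Protocol) written in Erlang. It supports many clients and protocols, such as Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, and STOMP, and it supports AJAX. It is used to store and forward messages in distributed systems, and it performs well in ease of use, scalability, and high availability.
A rabbitMQ system architecture diagram borrowed from the web: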


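1.1.1 Introduction to AMQP
AMQP (Advanced Message Queuing Protocol) is an application-layer standard that provides unified messaging services. It is an open standard for application-layer protocols, designed for message-oriented middleware. Clients and message middleware built on this protocol can exchange messages regardless of which client or middleware product is used and which programming language it is written in. Implementations written in Erlang include RabbitMQ. (Source: Baidu Baike)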
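Introduction to Message Broker and AMQP
Message Broker is an architectural pattern for message validation, transmission, and routing. It is mainly designed for the following scenarios:
Routing messages to one or more destinations
Transforming messages into other representations
Aggregating and decomposing messages, sending the results to their destinations, and then recomposing the responses to return to the message user
Calling web services to retrieve data
Responding to events or errors
Providing content-based or topic-based message routing with the publish-subscribe pattern
AMQP is short for Advanced Message Queuing Protocol, an open standard application-layer protocol for message-oriented middleware. AMQP defines these features:
Message orientation
Message queuing
Message routing (including point-to-point and publish-subscribe)
Reliability
Security
RabbitMQ is a middleware product that implements the AMQP protocol. It supports many operating systems and programming languages and covers almost all mainstream enterprise technology platforms.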
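1.1.1.1 AMQP theory
The main features of AMQP are message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability, and security.
Here is a brief introduction to the AMQP protocol stack. The protocol itself consists of three layers:

Model Layer: the top layer of the protocol. It defines the commands that clients can call to implement their own business logic. For example, a client can declare a queue with queue declare and fetch messages from a queue with the consume command.
Session Layer: responsible for sending client commands to the server and returning the server's replies to the client. It provides reliability, synchronization, and error handling for communication between client and server.
Transport Layer: transmits the binary data stream and provides frame handling, channel multiplexing, error detection, and data representation.
This layered architecture resembles the OSI network model: the implementation of one layer can be replaced without affecting its interaction with the other layers. AMQP defines a server-side domain model that specifies how the server (called the broker) behaves. The Model layer determines the behavior produced by these basic domain models, and AMQP expresses that behavior as commands. The Session layer defines the communication between client and broker (each side is a peer, and the two can call each other partners) and guarantees reliable delivery of commands. The Transport layer focuses on moving data and interacts with the Session layer: it accepts data from the layer above, assembles it into a binary stream, sends it to the receiver, parses it there, and hands it to the Session layer. The Session layer relies on the Transport layer to report network failures and to deliver commands in order.
Next, let's look at some of the concepts in AMQP.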
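Broker (Server): the process that accepts client connections and implements the AMQP message queuing and routing functions.
Virtual Host: a virtual concept, similar to a permission-control group. A Virtual Host can contain several Exchanges and Queues, and the Virtual Host is the smallest granularity of permission control.
Exchange: receives the messages sent by producers and routes them to queues in the server according to Binding rules. The ExchangeType determines how an Exchange routes messages. In RabbitMQ, for example, the commonly used ExchangeTypes are direct, fanout, and topic (a headers type also exists), and each type routes differently.
Message Queue: stores messages that have not yet been consumed by a consumer.
Message: consists of a Header and a Body. The Header is a set of attributes added by the producer, such as whether the Message is persisted, which Message Queue should receive it, and its priority. The Body is the application data that actually needs to be transmitted.
Binding: a Binding links an Exchange to a Message Queue. After an Exchange is bound to several Message Queues, it builds a routing table that stores the condition each Message Queue places on the messages it wants, namely the Binding Key. When the Exchange receives a Message, it parses the Header to obtain the Routing Key and routes the Message to Message Queues based on the Routing Key and the ExchangeType. The Binding Key is specified by the Consumer when binding the Exchange to the Message Queue, and the Routing Key is specified by the Producer when sending the Message. How the two are matched is determined by the ExchangeType.
Connection: for RabbitMQ, this is simply a TCP connection between the client and the Broker.
Channel: creating a connection from the client to the Broker is not enough for the client to send messages. A Channel has to be created on each Connection, because the AMQP protocol specifies that AMQP commands can only be executed through a Channel. One Connection can contain multiple Channels. Channels are needed because establishing and tearing down TCP connections is expensive: if every client thread that needs to talk to the Broker opened its own TCP connection, then, leaving aside whether those connections are wasted, the operating system could not sustain creating that many TCP connections per second. RabbitMQ recommends that client threads do not share a Channel, or at least that threads sharing a Channel send messages serially, but it does recommend sharing the Connection as much as possible.
Command: an AMQP command. The client implements its own logic by using Commands to interact with the AMQP server. In RabbitMQ, for example, the client can send a message with the publish command, start a transaction with txSelect, and commit a transaction with txCommit.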
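The main functions of message middleware are routing and buffering messages. AMQP provides two domain models with these functions: Exchange and Message queue.

An Exchange receives the messages sent by message producers and forwards them to Message queues according to different routing algorithms. A Message queue buffers messages when they cannot be consumed normally; the specific buffering strategy is up to the implementer. When the connection between a Message queue and a message consumer is working, the Message queue is responsible for forwarding messages to the consumer.
The processing flow of a Message looks like the following figure: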

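The Message is the basic unit handled in this model. It is produced by a Producer, passes through the Broker, and is consumed by a Consumer. It has two parts: a Header and a Body. The Header is a set of attributes added by the Producer, which control things such as whether the Message may be buffered, which queue should receive it, and its priority. The Body is the data that actually needs to be delivered. It is a binary stream that is opaque to the Broker and should not be altered in transit.
A broker contains many Message queues, so how does an Exchange know which Message queue to send a message to? This is the role of the Binding shown in the figure above. Message queues are created under the control of the client application. After creating a Message queue, the client must decide which Exchange's routing results the queue should receive and store. A Binding is the domain model that associates an Exchange with a Message queue. The client application associates the Exchange with a specific Message queue and binds to the Exchange the condition that determines which messages the queue accepts. This condition is called the Binding key, or Criteria.
After an Exchange is associated with several Message queues, it holds a routing table that stores the condition each Message queue places on the messages it wants. The Exchange inspects the Header and Body information of every Message it receives to decide which queue to route it to. The Header of a Message should contain an attribute called the Routing Key, which is produced by the sender and gives the Exchange the criterion for routing that Message. Exchanges have different ExchangeTypes according to their routing algorithm. For example, the Direct type requires the Binding key to equal the Routing key; another type requires the Binding key and Routing key to satisfy a pattern relationship; yet another decides based on certain attributes contained in the Message. AMQP provides some basic routing algorithms, and client applications can also define their own extended routing algorithms.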
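The routing table described above can be sketched in plain Java. This is only a toy model under stated assumptions, not RabbitMQ code: the class name `DirectRoutingSketch` and its methods are hypothetical, and the queue names and keys are illustrative. It shows the Direct case, where a queue receives a message only when its Binding key equals the Routing key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange: it only illustrates the routing-table idea.
class DirectRoutingSketch {
    // Routing table: binding key -> names of the queues bound with that key.
    private final Map<String, List<String>> routingTable = new LinkedHashMap<>();

    // Called when a client application binds a queue to the exchange.
    void bind(String queueName, String bindingKey) {
        routingTable.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Direct matching: return the queues whose binding key equals the routing key.
    List<String> route(String routingKey) {
        return new ArrayList<>(routingTable.getOrDefault(routingKey, new ArrayList<>()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("routing.A", "info");
        exchange.bind("routing.B", "error");
        System.out.println(exchange.route("info"));    // [routing.A]
        System.out.println(exchange.route("infoTwo")); // [] (no binding key matches)
    }
}
```

A pattern-based or header-based exchange would keep the same table but replace the equality lookup in `route` with its own matching rule.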
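In AMQP, a client application that wants to talk to the Broker has to establish a connection to it. This connection is actually associated with a Virtual Host; in other words, the connection is established between the client and a Virtual Host. Multiple channels can run concurrently on one connection, each channel carrying communication with the Broker, and the session mentioned earlier is attached to a channel.
Session can be defined in several ways here. It can mean the command dispatch mechanism provided inside AMQP, or, at a higher level, the interface that is distinct from the domain model. The usual understanding is the interaction context we normally talk about, whose main job is to deliver each command reliably over the network. The design of AMQP appears to borrow from various aspects of TCP's design to guarantee this reliability.
At the Session layer, each command from the upper layer is assigned a unique identifier (which can be a UUID) so that the command can be verified and retransmitted during transfer. The sender also records every command it sends in a Replay Buffer while it waits for feedback from the receiver confirming that the command has been received or executed. If no feedback arrives before a timeout, the sender retransmits the command. If the receiver did send its feedback but the sender never got it, because it was lost on the way or for some other reason, the sender's repeated retransmissions would affect the receiver. To reduce that effect, the receiver sets up a filter, the Idempotency Barrier, to intercept commands it has already received. For this retransmission and acknowledgement mechanism, the related design in TCP is a useful reference.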
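The interplay of the replay buffer and the idempotency barrier can be sketched as a small in-memory simulation. This is an illustration under assumptions rather than AMQP's actual implementation: the classes `Sender` and `Receiver`, the `ackLost` flag that simulates a lost acknowledgement, and the command strings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

class SessionReliabilitySketch {

    // Receiver side: the idempotency barrier remembers ids of commands already executed.
    static class Receiver {
        private final Set<String> seenIds = new HashSet<>();
        final List<String> executed = new ArrayList<>();

        // Returns true as the acknowledgement; a duplicate is acknowledged but not re-executed.
        boolean receive(String commandId, String command) {
            if (seenIds.add(commandId)) {
                executed.add(command);
            }
            return true;
        }
    }

    // Sender side: every unacknowledged command stays in the replay buffer.
    static class Sender {
        final Map<String, String> replayBuffer = new LinkedHashMap<>();

        // ackLost simulates an acknowledgement that never reaches the sender.
        String send(String command, Receiver receiver, boolean ackLost) {
            String id = UUID.randomUUID().toString();
            replayBuffer.put(id, command);
            boolean acked = receiver.receive(id, command);
            if (acked && !ackLost) {
                replayBuffer.remove(id);
            }
            return id;
        }

        // After a timeout, resend everything still waiting for an acknowledgement.
        void retransmit(Receiver receiver) {
            for (String id : new ArrayList<>(replayBuffer.keySet())) {
                if (receiver.receive(id, replayBuffer.get(id))) {
                    replayBuffer.remove(id);
                }
            }
        }
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        Sender sender = new Sender();
        sender.send("queue.declare", receiver, true); // the acknowledgement is lost
        sender.retransmit(receiver);                   // timeout, so the command is resent
        System.out.println(receiver.executed);             // [queue.declare]
        System.out.println(sender.replayBuffer.isEmpty()); // true
    }
}
```

The command is executed once even though it was delivered twice, which is the effect the Idempotency Barrier is meant to have.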
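1.1.2 Introduction to Erlang
Erlang (['ə:læŋ]) is a general-purpose, concurrency-oriented programming language developed by CS-Lab, part of the Swedish telecom equipment manufacturer Ericsson, with the goal of creating a language and runtime environment that can handle large-scale concurrent activity. Erlang first appeared in 1987 and, after about ten years of development, was released as open source in 1998. Erlang is an interpreted language that runs on a virtual machine, but it now also includes a native-code compiler developed by the High Performance Erlang project (HiPE) at Uppsala University, and since release R11B-4 it also supports a script-style interpreter. In terms of paradigm, Erlang is a multi-paradigm language covering functional, concurrent, and distributed programming. Sequential Erlang is a functional language with eager evaluation, single assignment, and dynamic typing.
Erlang is a structured, dynamically typed programming language with built-in support for parallel computing. It was originally designed by Ericsson specifically for telecom applications, such as controlling switches or converting protocols, which makes it well suited to building distributed, soft real-time, parallel systems. An application written in Erlang typically consists of thousands of lightweight processes at run time that communicate by message passing. A context switch between Erlang processes takes only a step or two, far cheaper than a thread switch in a C program.
Writing distributed applications in Erlang is much simpler because its distribution mechanism is transparent: the program does not know that it is running distributed. The Erlang runtime environment is a virtual machine, somewhat like the Java virtual machine, so code compiled once can run anywhere. The runtime system even allows code to be updated without interruption. If more speed is needed, the bytecode can also be compiled to native code.
Other MQs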

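1.1.3 Downloading rabbitMQ and Erlang

On the home page, scroll down to Download for the downloads and Tutorials for the tutorials.

Download the Windows version:
rabbitmq-server-3.6.12.exe
Tutorials: RabbitMQ Tutorials
http://www.erlang.org/ can be slow to access, so you may also look for the installer elsewhere online.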

1.1.4 rabbitMQ basic concepts
spring-boot-rabbitMQ project source code: https://git.oschina.net/wyait/springboot1.5.4.git
The config configuration class:
| // Fields and bean methods of the configuration class (referred to as RabbitMQConfig
// by the listener shown later). The constants EX_CHANGE_NAME1, QUEUE1..QUEUE3 and
// ROUTING_KEY1..ROUTING_KEY3 are defined elsewhere in that class.
@Autowired
private ConnectionFactory connectionFactory;

@Autowired
private Queue3Listener queue3Listener;

// Template used for sending; converts POJOs to JSON.
@Bean
@Primary
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}

// Container factory used by @RabbitListener; converts JSON back to objects.
@Bean
@Primary
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
            new SimpleRabbitListenerContainerFactory();
    simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
    simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
    return simpleRabbitListenerContainerFactory;
}

// Explicit listener container for queue3.
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    SimpleMessageListenerContainer container =
            new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue3());
    container.setMessageListener(queue3Listener);
    return container;
}

@Bean
public DirectExchange directExchange() {
    return new DirectExchange(EX_CHANGE_NAME1);
}

// The second constructor argument (true) makes the queue durable.
@Bean
public Queue queue1() {
    return new Queue(QUEUE1, true);
}

@Bean
public Queue queue2() {
    return new Queue(QUEUE2, true);
}

@Bean
public Queue queue3() {
    return new Queue(QUEUE3, true);
}

// Bind each queue to the direct exchange with its own routing key.
@Bean
public Binding binding1() {
    return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTING_KEY1);
}

@Bean
public Binding binding2() {
    return BindingBuilder.bind(queue2()).to(directExchange()).with(ROUTING_KEY2);
}

@Bean
public Binding binding3() {
    return BindingBuilder.bind(queue3()).to(directExchange()).with(ROUTING_KEY3);
} |
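Basic concepts:
ConnectionFactory, Connection, Channel
A connection wraps a socket connection, ConnectionFactory is the factory that produces connections, and a channel is the communication channel, the pipe through which data is actually exchanged. Because establishing a connection costs far more than establishing a channel, the actual data transfer happens inside channels.
Exchange, Queue
An exchange can be thought of as a special transfer channel that delivers messages to the message pools bound to it.
A queue is the message pool. Before use it must be bound to an exchange, together with its own identifier.
exchange_key, routing_key
exchange_key determines which channel (exchange) a publisher's message is delivered to, and routing_key determines which pool (queue) the message is put into.
Binding
To receive messages, a queue must be bound to an exchange, and the binding is given a marker, the routing_key, which is later used to match incoming messages.
Exchanges and queues have a one-to-many relationship. Depending on the exchange type, messages are delivered to different message pools.
Now let's look at the exchange types.
1. fanout
Sends the message directly to all queues bound to the exchange.
2. direct
Matches the routing_key strictly: when a message arrives, it is delivered to a queue only if the routing_key of the binding between the exchange and that queue matches exactly.
3. topic
Matches using special symbols, and every queue whose pattern is satisfied receives the message. The routing_key is delimited by ".", * matches exactly one word, and # matches zero or more words. A message that satisfies several conditions for the same queue is still delivered to it only once.
4. headers
Does not rely on routing_key matching. It matches on the headers attributes of the message instead, and key-value pairs can be specified at binding time.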
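Since the article's own code covers fanout, direct, and topic but not headers, here is a minimal plain-Java sketch of how headers matching can be thought of. It is an assumption-laden illustration, not broker code: `HeadersMatchSketch` and `matches` are hypothetical names, and the boolean `matchAll` stands in for the `x-match` binding argument (`all` or `any`) that RabbitMQ uses for headers exchanges.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class HeadersMatchSketch {
    // matchAll = true behaves like x-match=all, false like x-match=any.
    static boolean matches(Map<String, Object> bindingArgs,
                           Map<String, Object> messageHeaders,
                           boolean matchAll) {
        int hits = 0;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            // A pair counts as a hit only if the header is present with an equal value.
            if (messageHeaders.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), messageHeaders.get(arg.getKey()))) {
                hits++;
            }
        }
        return matchAll ? hits == bindingArgs.size() : hits > 0;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("format", "pdf");
        binding.put("type", "report");

        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "log");

        System.out.println(matches(binding, headers, true));  // false: "type" differs
        System.out.println(matches(binding, headers, false)); // true: "format" matches
    }
}
```

The routing key plays no part in the decision, which is the defining property of this exchange type.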
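Next, let's go through the configuration.
1. @Bean registers the objects in the container. We inject the ConnectionFactory, which is assembled automatically from the properties in the application configuration, and the containers declared later all use this connection. Pay attention to the converter setting: we want to transmit POJO types, and transfers outside the program are generally based on byte streams, so the converter performs the conversion automatically.
2. Next we declare the queues. The true argument makes the queue durable, so that it and its messages are retained when the connection drops. Then we declare the exchange, here a DirectExchange, and bind the two together.
3. Declare SimpleMessageListenerContainer and SimpleRabbitListenerContainerFactory. Both are declared because they are two different ways of listening for messages.
First, SimpleMessageListenerContainer. It needs an acknowledgement mode and has many other configurable properties, which you can explore if you are interested. Here I only set it up minimally. Then a listener must be set.
The listener needs to implement ChannelAwareMessageListener, which declares the method
public void onMessage(Message message,Channel channel)
that must be implemented. The message body is in the body of the Message, so the information available is relatively complete.
Next, SimpleRabbitListenerContainerFactory. There are a few points to note. The converter must be set again: the one on the template serializes outgoing messages to binary, while this one deserializes the binary data into a concrete class. The callback is somewhat simpler.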
| @Component
@RabbitListener(queues = RabbitMQConfig.QUEUE1,
        containerFactory = "simpleRabbitListenerContainerFactory")
public class Queue1Listener {
    private static Logger logger = LoggerFactory.getLogger(Queue1Listener.class);

    // Invoked for each message on QUEUE1; the converted body is passed as the payload.
    @RabbitHandler
    public void receive(@Payload String s) {
        logger.info("listener1 info: " + s);
    }
} |
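Remember to specify containerFactory explicitly.
Put @RabbitHandler on the method that receives messages and annotate the message body parameter with @Payload, and the listener can receive messages.
There is another approach: specify a message listener adapter (MessageListenerAdapter) and then name the receiving method in the container. It is not really recommended, because calling a method by its name through reflection feels error-prone.
The publisher has message callbacks too, of course.
RabbitTemplate has a ConfirmCallback; just implement its confirm method.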
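As a rough illustration of the publisher callback just mentioned, the configuration fragment below registers a ConfirmCallback on a RabbitTemplate. It is a sketch under assumptions, not the project's actual code: the class name `ConfirmCallbackConfigSketch` and the bean name are hypothetical, and the package names and the `setPublisherConfirms` setter assume the Spring AMQP 1.7.x line that ships with Spring Boot 1.5.4 (later versions moved `CorrelationData` and replaced that setter).

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; assumes Spring AMQP 1.7.x package names.
@Configuration
public class ConfirmCallbackConfigSketch {

    @Bean
    public RabbitTemplate confirmingRabbitTemplate(CachingConnectionFactory connectionFactory) {
        // Publisher confirms must be enabled on the connection factory,
        // otherwise the callback is never invoked.
        connectionFactory.setPublisherConfirms(true);

        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("broker confirmed: " + correlationData);
                } else {
                    System.out.println("broker rejected: " + correlationData + ", cause: " + cause);
                }
            }
        });
        return template;
    }
}
```

The callback reports whether the broker accepted the message; it says nothing about whether a consumer has processed it.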
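1.2 Getting started with rabbitMQ
1.2.1 Installing rabbitMQ (Windows)
Installation steps:
1. Install Erlang: get the exe installer from the official download page http://www.erlang.org/downloads, open it, and complete the installation.
2. Install RabbitMQ: get the exe installer from the official download page https://www.rabbitmq.com/download.html.
3. After the download finishes, run the installer directly, or configure the environment variables and then run the rabbitMQ-server installer.
4. Once RabbitMQ Server is installed, it is automatically registered as a service and started with the default configuration.
For a detailed installation walkthrough, search online.
After a successful installation, visit http://127.0.0.1:15672 (username/password: guest/guest). This page is served by the rabbitmq_management plugin, so enable the plugin if the page does not load.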

Here we can see some basic concepts, such as Connections, Channels, Exchanges, and Queues. When using it for the first time, click through each of them to see what they contain and get familiar with the RabbitMQ Server side.
Click the Admin tab to manage users.

Click Admin and add a user, wyait/wyait, then grant it permissions.

Click the user name "wyait" in the All users list to grant permissions:

1.2.1.1 Virtual Hosts settings page: 
Programs talk to rabbitMQ on port 5672, the AMQP protocol port.
1.2.2 Creating the spring-boot-MQ project
Project source code:
Gitee (oschina) address: https://git.oschina.net/wyait/springboot1.5.4.git
GitHub address: https://github.com/wyait/spring-boot-1.5.4.git
Blog post on integrating rabbitMQ with Spring Boot: Spring Boot 1.5.4: Integrating rabbitMQ (Part 17)
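1.2.3 Message queues
Official site: rabbitMQ Tutorials

Prerequisite: the rabbitMQ service is already running. Test procedure:
1. Start the Spring Boot project first so that it listens on the queues;
2. Run the test class to send messages to the queue;
3. The consumer consumes the messages.
1.2.3.1 "hello world"
The simplest thing that does something: a simple message queue.

P: the message producer;
C: the message consumer;
Red box: the message queue;
For the demo, see section 1.2.2.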
1.2.3.2 Work Queues
Distributing tasks among workers (the competing consumers pattern)

One producer sends to one message queue, and the queue can have several consumers, but each message is delivered to only one of them.
Because consumers differ in how fast they receive and process messages, distributing the messages evenly is not always reasonable.
Define a queue named workQueues in RabbitMqConfig:
| @Bean
public Queue workQueue() {
    return new Queue("workQueues");
} |
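Message producer WorkSender:
| import java.util.Date;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WorkSender {
    @Autowired
    private AmqpTemplate rabbitMQTemplate;

    /**
     * Description: work mode
     * Author: wyait
     * Created: 2017-09-14 17:51:20
     */
    public void workSend(String msg) {
        String context = msg + new Date();
        System.out.println("workSender : " + context);
        // Sent through the default exchange, with the queue name as the routing key.
        this.rabbitMQTemplate.convertAndSend("workQueues", context);
    }
} |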
Message consumer 1, WorkReceiver:
| @Component
@RabbitListener(queues = "workQueues")
public class WorkReceiver {
    @RabbitHandler // marks the method that handles the message
    public void process(String hello) {
        System.out.println("workReceiver:" + hello);
    }
} |
Message consumer 2, WorkReceiverTwo:
| @Component
@RabbitListener(queues = "workQueues")
public class WorkReceiverTwo {
    @RabbitHandler // marks the method that handles the message
    public void process(String hello) {
        System.out.println("workReceiverTwo:" + hello);
    }
} |
Test result for message consumption: 
Messages are distributed evenly (one for you, one for me).
The distribution strategy can be changed through the channel settings.
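To see why even distribution can be a poor fit when consumers differ in speed, the plain-Java simulation below compares two strategies. It is a simplified model under assumptions, not RabbitMQ code: `DispatchSketch`, its methods, and the per-message costs are hypothetical. The second strategy imitates what limiting each consumer to one unacknowledged message achieves (in RabbitMQ the related channel setting is the prefetch count, set with `basicQos`).

```java
import java.util.Arrays;

class DispatchSketch {
    // Round-robin: message i goes to consumer i % n, regardless of consumer speed.
    static int[] roundRobin(int messageCount, int consumerCount) {
        int[] handled = new int[consumerCount];
        for (int i = 0; i < messageCount; i++) {
            handled[i % consumerCount]++;
        }
        return handled;
    }

    // Fair dispatch: each consumer holds at most one message at a time,
    // so the next message goes to whichever consumer becomes free first.
    static int[] fairDispatch(int messageCount, int[] costPerMessage) {
        int[] handled = new int[costPerMessage.length];
        long[] freeAt = new long[costPerMessage.length];
        for (int i = 0; i < messageCount; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // ties go to the lower index
                }
            }
            freeAt[next] += costPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Two consumers; the second needs three time units per message.
        System.out.println(Arrays.toString(roundRobin(8, 2)));                  // [4, 4]
        System.out.println(Arrays.toString(fairDispatch(8, new int[] {1, 3}))); // [6, 2]
    }
}
```

With round-robin the slow consumer receives as many messages as the fast one; with fair dispatch the fast consumer ends up doing most of the work.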
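1.2.3.3 Publish/Subscribe mode
Sending messages to many consumers at once.
A producer sends one message to an exchange, the exchange delivers it to multiple queues, and the consumer of each queue receives the message.

Notes:
Question 1: Are messages sent to the exchange rather than to a queue? Answer: a message can be sent to a queue or to an exchange. (Sending "to a queue" actually goes through the default exchange, with the queue name as the routing key.)
Question 2: A consumer can only get its messages from a queue.
Question 3: What happens to a message sent to an exchange that has no queues bound to it? Answer: the message is lost.
Summary: messages can only be stored in queues, not in exchanges. An exchange only passes messages along, acting as a message channel.
Fanout Exchange:

A fanout exchange ignores the routing key (routingKey). You simply bind queues to the exchange, and every message sent to the exchange is forwarded to all queues bound to it. This is much like a subnet broadcast, in which every host on the subnet gets a copy of the message. The fanout exchange is the fastest at forwarding messages.
Fanout is the familiar broadcast or subscribe mode: send a message to the fanout exchange, and every queue bound to it receives the message.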
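The fanout behaviour, including the case of an exchange with no bound queues, can be modelled in a few lines of plain Java. This is a toy sketch, not broker code: the class `FanoutSketch` and its methods are hypothetical, and the queue names simply reuse the fanout.A/B/C naming from this section.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FanoutSketch {
    // Queue name -> messages stored in that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the message into every bound queue; the routing key is ignored.
    // Returns how many queues stored a copy (0 means the message was dropped).
    int publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        System.out.println(exchange.publish("aaa", "lost"));  // 0: no queue bound yet
        exchange.bind("fanout.A");
        exchange.bind("fanout.B");
        exchange.bind("fanout.C");
        System.out.println(exchange.publish("aaa", "hello")); // 3
        System.out.println(exchange.messagesIn("fanout.B"));  // [hello]
    }
}
```

The exchange itself keeps nothing: a message published before any queue is bound is simply gone, which matches the answer to question 3 above.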
Three queues are used for the test here (fanout.A, fanout.B, and fanout.C, created and bound in the Application class). All three are bound to the fanoutExchange exchange created in Application.
Add the subscribe-mode configuration:
| /** Queue A for subscribe mode. Author: wyait. Created: 2017-09-15 15:24:31. */
@Bean
public Queue subscribeQueueA() {
    return new Queue("fanout.A");
}

/** Queue B for subscribe mode. Author: wyait. Created: 2017-09-15 15:24:31. */
@Bean
public Queue subscribeQueueB() {
    return new Queue("fanout.B");
}

/** Queue C for subscribe mode. Author: wyait. Created: 2017-09-15 15:24:31. */
@Bean
public Queue subscribeQueueC() {
    return new Queue("fanout.C");
}

/** The fanoutExchange exchange. Author: wyait. Created: 2017-09-15 15:34:41. */
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange");
}

/** Binds the subscribe queues to fanoutExchange. Author: wyait. Created: 2017-09-15 15:41:10. */
@Bean
Binding bindingExchangeA(Queue subscribeQueueA, FanoutExchange fanoutExchange) {
    // Bind queue A to fanoutExchange; bind(subscribeQueueA()) would also work.
    return BindingBuilder.bind(subscribeQueueA).to(fanoutExchange);
}

@Bean
Binding bindingExchangeB(Queue subscribeQueueB, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(subscribeQueueB).to(fanoutExchange);
}

@Bean
Binding bindingExchangeC(Queue subscribeQueueC, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(subscribeQueueC).to(fanoutExchange);
} |
Message producer SubscribeSender, which specifies the exchange:
| @Component
public class SubscribeSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("---SubscribeSender : " + sendMsg);
        // convertAndSend(String exchange, String routingKey, Object message);
        // a fanout exchange ignores the routing key ("aaa").
        this.rabbitTemplate.convertAndSend("fanoutExchange", "aaa", sendMsg);
    }
} |
Message consumers SubscribeReceiverA, B, and C listen on queues fanout.A/B/C (the one for fanout.A is shown):
| @Component
@RabbitListener(queues = "fanout.A")
public class SubscribeReceiver {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("SubscribeReceiverA : " + msg);
    }
} |
Test class:
| @Autowired
private SubscribeSender subSend;

@Test
public void subscribeTest() {
    System.out.println("==========subscribe: sending messages!");
    for (int i = 0; i < 50; i++) {
        String msg = "==========msg_" + i;
        subSend.send(msg);
        try {
            Thread.sleep(100); // pause briefly between messages
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
} |
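All three consumers received every message.

Note the differences between subscribe mode and work mode:
1. Work mode sends messages to a queue.
2. Subscribe mode sends messages to an exchange.
3. Work mode is 1 queue with N consumers; subscribe mode is N queues with N consumers (N>0).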
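1.2.3.4 Routing mode 
Routing mode builds on subscribe mode:
when a queue is bound to an exchange, a rule can be specified, and the queue accepts or ignores each message according to that rule.

The routing key (routingKey) is processed: a queue is bound to the exchange with a specific routing key, and a message is delivered to the queue only if its routingKey matches that key exactly.
Building on the subscribe-mode setup, add queues and a DirectExchange to the configuration class and bind them: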
| /** Queue A for routing mode. Author: wyait. Created: 2017-09-15 15:24:31. */
@Bean
public Queue routingQueueA() {
    return new Queue("routing.A");
}

/** Queue B for routing mode. Author: wyait. Created: 2017-09-15 15:24:31. */
@Bean
public Queue routingQueueB() {
    return new Queue("routing.B");
}

/** The DirectExchange. Author: wyait. Created: 2017-09-15 15:34:41. */
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("directExchange");
}

/** Binds the routing queues to directExchange. Author: wyait. Created: 2017-09-15 15:41:10. */
@Bean
Binding bindingDirectExchangeA(Queue routingQueueA, DirectExchange directExchange) {
    // Bind routing queue A to directExchange with the routing rule "info".
    return BindingBuilder.bind(routingQueueA()).to(directExchange()).with("info");
}

@Bean
Binding bindingDirectExchangeB(Queue routingQueueB, DirectExchange directExchange) {
    // Bind routing queue B to directExchange with the routing rule "error".
    return BindingBuilder.bind(routingQueueB()).to(directExchange()).with("error");
} |
Message producer:
| @Component
public class RoutingSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    // Routing key "info": matches the binding of routing.A.
    public void send(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("---RoutingSender : " + sendMsg);
        this.rabbitTemplate.convertAndSend("directExchange", "info", sendMsg);
    }

    // Routing key "infoTwo": no binding uses this key.
    public void sendTwo(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("---RoutingSender TWO: " + sendMsg);
        this.rabbitTemplate.convertAndSend("directExchange", "infoTwo", sendMsg);
    }

    // Routing key "error": matches the binding of routing.B.
    public void sendError(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("---RoutingSender Error: " + sendMsg);
        this.rabbitTemplate.convertAndSend("directExchange", "error", sendMsg);
    }

    // Routing key "errorTwo": no binding uses this key.
    public void sendErrorTwo(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("---RoutingSender ErrorTwo: " + sendMsg);
        this.rabbitTemplate.convertAndSend("directExchange", "errorTwo", sendMsg);
    }
} |
Message consumer A:
| @Component
@RabbitListener(queues = "routing.A")
public class RoutingReceiver {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("RoutingReceiverA === : " + msg);
    }
} |
Test class:
| @Autowired
private RoutingSender routSend;

@Test
public void routingTest() {
    System.out.println("==========routing: sending messages!");
    routSend.send("==========msg_info ");
    routSend.sendTwo("==========msg_infoTwo ");
    routSend.sendError("==========msg_error ");
    routSend.sendErrorTwo("==========msg_ErrorTwo ");
    System.out.println("==========routing: finished sending messages!");
} |
Run:

MqApplication console:

This shows that messages whose routingKey matches the rule are received and consumed by the consumer.
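1.2.3.5 Topics (wildcard) mode
Receiving messages based on a pattern (topics)

This mode builds on routing mode: wildcards are used to match queues when sending messages.

The routing key is matched against a pattern.
Any message sent to a Topic Exchange is forwarded to all Queues interested in the topic specified in the RouteKey.
1. This mode requires a RouteKey, and the Exchange and Queue must be bound in advance.
2. If the Exchange finds no Queue that matches the RouteKey, the message is discarded.
3. When binding, provide the topic that the queue is interested in. For example, "#.log.#" means the queue is interested in all messages involving log (a message with RouteKey "MQ.log.error" is forwarded to this queue).
4. "#" stands for zero or more keywords, and "*" stands for exactly one keyword. For example, "log.*" matches "log.warn" but not "log.warn.timeout", whereas "log.#" matches both.
Difference between the wildcards # and *:
#: matches zero or more words;
*: matches exactly one word.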
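The wildcard rules above can be checked with a small matcher written in plain Java. This is an illustrative sketch, not the algorithm RabbitMQ actually uses internally: the class `TopicMatchSketch` and its methods are hypothetical, and the sample keys are the ones quoted in this section.

```java
class TopicMatchSketch {
    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // the pattern is used up, so the words must be too
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs zero words (move past it)
            // or absorbs one more word (stay on '#').
            return match(pattern, p + 1, words, w)
                    || (w < words.length && match(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // the pattern still expects a word but none is left
        }
        // '*' matches exactly one word; anything else must be equal.
        return (pattern[p].equals("*") || pattern[p].equals(words[w]))
                && match(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("log.*", "log.warn"));          // true
        System.out.println(matches("log.*", "log.warn.timeout"));  // false
        System.out.println(matches("log.#", "log.warn.timeout"));  // true
        System.out.println(matches("#.log.#", "MQ.log.error"));    // true
    }
}
```

A direct exchange corresponds to the special case in which the binding key contains no wildcard at all, so the comparison degenerates to word-by-word equality.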
In the configuration class, add queues bound to a TopicExchange, specifying the routingKey and the matching pattern:
| @Bean
public Queue topicQueueA() {
    return new Queue("topic.queueA", true); // true makes the queue durable
}

@Bean
public Queue topicQueueB() {
    return new Queue("topic.queueB", true);
}

// Declare the exchange
@Bean
TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
}

// Bindings: queue A takes only "topic.message"; queue B takes everything under "topic."
@Bean
public Binding bindingA() {
    return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with("topic.message");
}

@Bean
public Binding bindingB() {
    return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with("topic.#");
} |
Message producer:
| @Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    // Routing key "topic.message"
    public void send(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("---TopicSender : " + sendMsg);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", sendMsg);
    }

    // Routing key "topic.messages"
    public void sendTwo(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("---TopicSender messages: " + sendMsg);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", sendMsg);
    }
} |
Message consumer:
| @Component
@RabbitListener(queues = "topic.queueA")
public class TopicReceiver {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("TopicReceiverA : " + msg);
    }
} |
Test class:
| @Autowired
private TopicSender topicSend;

@Test
public void topicTest() {
    System.out.println("==========topic: sending messages!");
    topicSend.send("==========msg_info ");
    topicSend.sendTwo("==========msg_infoTwo ");
    System.out.println("==========topic: finished sending messages!");
} |
Restart MqApplication and run the test class. Result:


Messages from the different senders are received according to the routing rules.
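1.2.3.6 Exchange summary
The RPC pattern is not covered here; look it up online if you are interested.
FanoutExchange: distributes messages to all bound queues; there is no notion of a routing key.
HeadersExchange: matches on key-value attributes added to the message.
DirectExchange: delivers to the specified queue according to the routing key.
TopicExchange: matches on multiple keywords.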
1.2.4 Managing queues and exchanges in the management UI
Open the Exchanges page to see all of the AMQP default exchanges and the exchanges you have defined:

Select topicExchange:

Here you can add and remove queue bindings.
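1.2.5 Queue persistence
RabbitMQ has two kinds of queues: in-memory queues and durable queues.
1. In-memory queues
Advantage: fast and efficient.
Disadvantage: messages are lost if the server goes down.
2. Durable queues
Advantage: messages can be persisted and are not lost after a crash or power failure.
Disadvantage: slower than in-memory storage, with lower performance.
How to configure: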
| @Bean
public Queue topicQueueA() {
    return new Queue("topic.queueA", true); // true makes the queue durable
} |
Check in the management UI whether a queue is durable:

D: durable