Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
rabbitMQÏûÏ¢¶ÓÁÐÔ­Àí
 
  6609  次浏览      36
 2019-9-10
 
±à¼­ÍƼö:

±¾ÎÄÀ´×ÔÓÚ51CTO£¬±¾Õ¼òµ¥¶ÔMessage BrokerÓëAMQPµÄһЩÀíÂÛÌØÕ÷Óë´¦ÀíÁ÷³Ì×öÁ˽éÉÜ¡£

MQ:Message Queue£¬ÏûÏ¢¶ÓÁУ¬ÊÇÒ»ÖÖÓ¦ÓóÌÐò¶ÔÓ¦ÓóÌÐòµÄͨÐÅ·½·¨£»Ó¦ÓóÌÐòͨ¹ý¶Áд³öÈë¶ÓÁеÄÏûÏ¢£¨Õë¶ÔÓ¦ÓóÌÐòµÄÊý¾Ý£©À´Í¨ÐÅ£¬¶øÎÞÐèרÓÃÁ¬½ÓÀ´Á´½ÓËüÃÇ¡£

1 rabbitMQÈëÃż°Ô­Àí

rabbitMQ¹ÙÍø£ºhttp://www.rabbitmq.com/

Erlang¹ÙÍø£ºhttp://www.erlang.org/

1.1 rabbitMQ¸ÅÊö

RabbitMQÊÇÒ»¸öÓÉErlang¿ª·¢µÄAMQP£¨AdvancedMessage Queue £©µÄ¿ªÔ´ÊµÏÖ£¬Ö§³Ö¶àÖÖ¿Í»§¶Ë£¬È磺Python¡¢Ruby¡¢.NET¡¢Java¡¢JMS¡¢C¡¢PHP¡¢ActionScript¡¢XMPP¡¢STOMPµÈ£¬Ö§³ÖAJAX¡£ÓÃÓÚÔÚ·Ö²¼Ê½ÏµÍ³Öд洢ת·¢ÏûÏ¢£¬ÔÚÒ×ÓÃÐÔ¡¢À©Õ¹ÐÔ¡¢¸ß¿ÉÓÃÐԵȷ½Ãæ±íÏÖ²»Ëס£

½èÓÃÍøÂçÖÐÒ»¸örabbitMQµÄϵͳ¼Ü¹¹Í¼£º

1.1.1 AMQP¼ò½é

AMQP£¬¼´Advanced Message Queuing Protocol,Ò»¸öÌṩͳһÏûÏ¢·þÎñµÄÓ¦Óòã±ê×¼¸ß¼¶ÏûÏ¢¶ÓÁÐЭÒé,ÊÇÓ¦ÓòãЭÒéµÄÒ»¸ö¿ª·Å±ê×¼,ÎªÃæÏòÏûÏ¢µÄÖмä¼þÉè¼Æ¡£»ùÓÚ´ËЭÒéµÄ¿Í»§¶ËÓëÏûÏ¢Öмä¼þ¿É´«µÝÏûÏ¢£¬²¢²»Êܿͻ§¶Ë/Öмä¼þ²»Í¬²úÆ·£¬²»Í¬µÄ¿ª·¢ÓïÑÔµÈÌõ¼þµÄÏÞÖÆ¡£ErlangÖеÄʵÏÖÓÐ RabbitMQµÈ¡£--°Ù¶È°Ù¿Æ

Message BrokerÓëAMQP¼ò½é

Message BrokerÊÇÒ»ÖÖÏûÏ¢ÑéÖ¤¡¢´«Ê䡢·Óɵļܹ¹Ä£Ê½£¬ÆäÉè¼ÆÄ¿±êÖ÷ÒªÓ¦ÓÃÓÚÏÂÃæÕâЩ³¡¾°£º

ÏûϢ·Óɵ½Ò»¸ö»ò¶à¸öÄ¿µÄµØ

ÏûϢת»¯ÎªÆäËûµÄ±íÏÖ·½Ê½

Ö´ÐÐÏûÏ¢µÄ¾Û¼¯¡¢ÏûÏ¢µÄ·Ö½â£¬²¢½«½á¹û·¢Ë͵½ËûÃǵÄÄ¿µÄµØ£¬È»ºóÖØÐÂ×éºÏÏàÓ¦·µ»Ø¸øÏûÏ¢Óû§

µ÷ÓÃWeb·þÎñÀ´¼ìË÷Êý¾Ý

ÏìӦʼþ»ò´íÎó

ʹÓ÷¢²¼-¶©ÔÄģʽÀ´ÌṩÄÚÈÝ»ò»ùÓÚÖ÷ÌâµÄÏûϢ·ÓÉ

AMQPÊÇAdvanced Message QueuingProtocolµÄ¼ò³Æ£¬ËüÊÇÒ»¸öÃæÏòÏûÏ¢Öмä¼þµÄ¿ª·Åʽ±ê×¼Ó¦ÓòãЭÒé¡£AMQP¶¨ÒåÁËÕâÐ©ÌØÐÔ£º

ÏûÏ¢·½Ïò

ÏûÏ¢¶ÓÁÐ

ÏûϢ·ÓÉ£¨°üÀ¨£ºµãµ½µãºÍ·¢²¼-¶©ÔÄģʽ£©

¿É¿¿ÐÔ

°²È«ÐÔ

RabbitMQ¾ÍÊÇÒÔAMQPЭÒéʵÏÖµÄÒ»ÖÖÖмä¼þ²úÆ·£¬Ëü¿ÉÒÔÖ§³Ö¶àÖÖ²Ù×÷ϵͳ£¬¶àÖÖ±à³ÌÓïÑÔ£¬¼¸ºõ¿ÉÒÔ¸²¸ÇËùÓÐÖ÷Á÷µÄÆóÒµ¼¶¼¼Êõƽ̨¡£

1.1.1.1 AMQPÀíÂÛ

AMQPµÄÖ÷ÒªÌØÕ÷ÊÇÃæÏòÏûÏ¢¡¢¶ÓÁС¢Â·ÓÉ£¨°üÀ¨µã¶ÔµãºÍ·¢²¼/¶©ÔÄ£©¡¢¿É¿¿ÐÔ¡¢°²È«¡£

¼òµ¥½éÉÜAMQPµÄЭÒéÕ»£¬AMQPЭÒé±¾Éí°üº¬Èý²ã£¬ÈçÏ£º

Model Layer£¬Î»ÓÚЭÒé×î¸ß²ã£¬Ö÷Òª¶¨ÒåÁËһЩ¹©¿Í»§¶Ëµ÷ÓõÄÃüÁ¿Í»§¶Ë¿ÉÒÔͨ¹ýÕâЩÃüÁîʵÏÖ×Ô¼ºµÄÒµÎñÂß¼­£¬ÀýÈ磬¿Í»§¶Ë¿ÉÒÔͨ¹ýqueue declareÉùÃ÷Ò»¸ö¶ÓÁУ¬ÀûÓÃconsumeÃüÁî»ñÈ¡¶ÓÁеÄÏûÏ¢¡£

Session Layer£¬Ö÷Òª¸ºÔ𽫿ͻ§¶ËÃüÁî·¢Ë͸ø·þÎñÆ÷£¬ÔÚ½«·þÎñÆ÷¶ËµÄÓ¦´ð·µ»Ø¸ø¿Í»§¶Ë£¬Ö÷ҪΪ¿Í»§¶ËÓë·þÎñÆ÷Ö®¼äͨÐÅÌṩ¿É¿¿ÐÔ¡¢Í¬²½»úÖÆºÍ´íÎó´¦Àí¡£

Transport Layer£¬Ö÷Òª´«Êä¶þ½øÖÆÊý¾ÝÁ÷£¬Ìṩ֡µÄ´¦Àí¡¢ÐŵÀ¸´ÓᢴíÎó¼ì²âºÍÊý¾Ý±íʾ¡£

ÕâÖÖ·Ö²ã¼Ü¹¹ÀàËÆÓÚOSIÍøÂçЭÒ飬¿ÉÌæ»»¸÷²ãʵÏÖ¶ø²»Ó°ÏìÓëÆäËü²ãµÄ½»»¥¡£AMQP¶¨ÒåÁ˺ÏÊʵķþÎñÆ÷¶ËÓòÄ£ÐÍ£¬ÓÃÓڹ淶·þÎñÆ÷µÄÐÐΪ£¨AMQP·þÎñÆ÷¶Ë¿É³ÆÎªbroker£©¡£ÔÚÕâÀïModel²ã¾ö¶¨ÕâЩ»ù±¾ÓòÄ£ÐÍËù²úÉúµÄÐÐΪ£¬ÕâÖÖÐÐΪÔÚAMQPÖÐÓÃcommand±íʾ¡£Session²ã¶¨Òå¿Í»§¶ËÓëbrokerÖ®¼äµÄͨÐÅ£¨Í¨ÐÅË«·½¶¼ÊÇÒ»¸öpeer£¬¿É»¥³Æ×öpartner£©£¬ÎªcommandµÄ¿É¿¿´«ÊäÌṩ±£ÕÏ¡£Transport²ãרעÓÚÊý¾Ý´«ËÍ£¬²¢ÓëSession±£³Ö½»»¥£¬½ÓÊÜÉϲãµÄÊý¾Ý£¬×é×°³É¶þ½øÖÆÁ÷£¬´«Ë͵½receiverºóÔÙ½âÎöÊý¾Ý£¬½»¸¶¸øSession²ã¡£Session²ãÐèÒªTransport²ãÍê³ÉÍøÂçÒì³£Çé¿öµÄ»ã±¨£¬Ë³Ðò´«ËÍcommandµÈ¹¤×÷¡£

½ÓÏÂÀ´Á˽âÏÂAMQPµ±ÖеÄһЩ¸ÅÄî¡£

Broker£¨Server£©£º½ÓÊܿͻ§¶ËÁ¬½Ó£¬ÊµÏÖAMQPÏûÏ¢¶ÓÁкÍ·Óɹ¦ÄܵĽø³Ì¡£

Virtual Host£ºÆäʵÊÇÒ»¸öÐéÄâ¸ÅÄÀàËÆÓÚȨÏÞ¿ØÖÆ×飬һ¸öVirtual HostÀïÃæ¿ÉÒÔÓÐÈô¸É¸öExchangeºÍQueue£¬µ«ÊÇȨÏÞ¿ØÖƵÄ×îСÁ£¶ÈÊÇVirtual Host¡£

Exchange£º½ÓÊÜÉú²úÕß·¢Ë͵ÄÏûÏ¢£¬²¢¸ù¾ÝBinding¹æÔò½«ÏûϢ·Óɸø·þÎñÆ÷ÖеĶÓÁС£ExchangeType¾ö¶¨ÁËExchange·ÓÉÏûÏ¢µÄÐÐΪ£¬ÀýÈ磬ÔÚRabbitMQÖУ¬ExchangeTypeÓÐdirect¡¢FanoutºÍTopicÈýÖÖ£¬²»Í¬ÀàÐ͵ÄExchange·ÓɵÄÐÐΪÊDz»Ò»ÑùµÄ¡£

Message Queue£ºÏûÏ¢¶ÓÁУ¬ÓÃÓÚ´æ´¢»¹Î´±»Ïû·ÑÕßÏû·ÑµÄÏûÏ¢¡£

Message£ºÓÉHeaderºÍBody×é³É£¬HeaderÊÇÓÉÉú²úÕßÌí¼ÓµÄ¸÷ÖÖÊôÐԵļ¯ºÏ£¬°üÀ¨MessageÊÇ·ñ±»³Ö¾Ã»¯¡¢ÓÉÄĸöMessage Queue½ÓÊÜ¡¢ÓÅÏȼ¶ÊǶàÉٵȡ£¶øBodyÊÇÕæÕýÐèÒª´«ÊäµÄAPPÊý¾Ý¡£

Binding£ºBindingÁªÏµÁËExchangeÓëMessageQueue¡£ExchangeÔÚÓë¶à¸öMessageQueue·¢ÉúBindingºó»áÉú³ÉÒ»ÕÅ·ÓÉ±í£¬Â·ÓɱíÖд洢×ÅMessage QueueËùÐèÏûÏ¢µÄÏÞÖÆÌõ¼þ¼´Binding Key¡£µ±ExchangeÊÕµ½Messageʱ»á½âÎöÆäHeaderµÃµ½Routing Key£¬Exchange¸ù¾ÝRouting KeyÓëExchangeType½«Message·Óɵ½MessageQueue¡£Binding KeyÓÉConsumerÔÚBinding ExchangeÓëMessageQueueʱָ¶¨£¬¶øRouting KeyÓÉProducer·¢ËÍMessageʱָ¶¨£¬Á½Õߵį¥Å䷽ʽÓÉExchangeType¾ö¶¨¡£

Connection£ºÁ¬½Ó£¬¶ÔÓÚRabbitMQ¶øÑÔ£¬Æäʵ¾ÍÊÇÒ»¸öλÓÚ¿Í»§¶ËºÍBrokerÖ®¼äµÄTCPÁ¬½Ó¡£

Channel£ºÐŵÀ£¬½ö½ö´´½¨Á˿ͻ§¶Ëµ½BrokerÖ®¼äµÄÁ¬½Óºó£¬¿Í»§¶Ë»¹ÊDz»ÄÜ·¢ËÍÏûÏ¢µÄ¡£ÐèҪΪÿһ¸öConnection´´½¨Channel£¬AMQPЭÒ鹿¶¨Ö»ÓÐͨ¹ýChannel²ÅÄÜÖ´ÐÐAMQPµÄÃüÁî¡£Ò»¸öConnection¿ÉÒÔ°üº¬¶à¸öChannel¡£Ö®ËùÒÔÐèÒªChannel£¬ÊÇÒòΪTCPÁ¬½ÓµÄ½¨Á¢ºÍÊͷŶ¼ÊÇÊ®·Ö°º¹óµÄ£¬Èç¹ûÒ»¸ö¿Í»§¶Ëÿһ¸öÏ̶߳¼ÐèÒªÓëBroker½»»¥£¬Èç¹ûÿһ¸öÏ̶߳¼½¨Á¢Ò»¸öTCPÁ¬½Ó£¬ÔÝÇÒ²»¿¼ÂÇTCPÁ¬½ÓÊÇ·ñÀË·Ñ£¬¾ÍËã²Ù×÷ϵͳҲÎÞ·¨³ÐÊÜÿÃ뽨Á¢Èç´Ë¶àµÄTCPÁ¬½Ó¡£RabbitMQ½¨Òé¿Í»§¶ËÏß³ÌÖ®¼ä²»Òª¹²ÓÃChannel£¬ÖÁÉÙÒª±£Ö¤¹²ÓÃChannelµÄÏ̷߳¢ËÍÏûÏ¢±ØÐëÊÇ´®Ðе쬵«Êǽ¨Ò龡Á¿¹²ÓÃConnection¡£

Command£ºAMQPµÄÃüÁ¿Í»§¶Ëͨ¹ýCommandÍê³ÉÓëAMQP·þÎñÆ÷µÄ½»»¥À´ÊµÏÖ×ÔÉíµÄÂß¼­¡£ÀýÈçÔÚRabbitMQÖУ¬¿Í»§¶Ë¿ÉÒÔͨ¹ýpublishÃüÁî·¢ËÍÏûÏ¢£¬txSelect¿ªÆôÒ»¸öÊÂÎñ£¬txCommitÌá½»Ò»¸öÊÂÎñ¡£

ÏûÏ¢Öмä¼þµÄÖ÷Òª¹¦ÄÜÊÇÏûÏ¢µÄ·ÓÉ(Routing)ºÍ»º´æ(Buffering)¡£ÔÚAMQPÖÐÌṩÀàËÆ¹¦ÄܵÄÁ½ÖÖÓòÄ£ÐÍ£ºExchange ºÍ Messagequeue¡£

Exchange½ÓÊÕÏûÏ¢Éú²úÕß(MessageProducer)·¢Ë͵ÄÏûÏ¢¸ù¾Ý²»Í¬µÄ·ÓÉËã·¨½«ÏûÏ¢·¢ËÍÍùMessage queue¡£Messagequeue»áÔÚÏûÏ¢²»Äܱ»Õý³£Ïû·Ñʱ»º´æÕâЩÏûÏ¢£¬¾ßÌåµÄ»º´æ²ßÂÔÓÉʵÏÖÕß¾ö¶¨£¬µ±message queueÓëÏûÏ¢Ïû·ÑÕß(Messageconsumer)Ö®¼äµÄÁ¬½Óͨ³©Ê±£¬Message queueÓн«ÏûϢת·¢µ½consumerµÄÔðÈΡ£

Ò»¸öMessageµÄ´¦ÀíÁ÷³ÌÀàËÆÓÚÏÂͼ£º

MessageÊǵ±Ç°Ä£ÐÍÖÐËù²Ù×ݵĻù±¾µ¥Î»£¬ËüÓÉProducer²úÉú£¬¾­¹ýBroker±»ConsumerËùÏû·Ñ¡£ËüµÄ»ù±¾½á¹¹ÓÐÁ½²¿·Ö: HeaderºÍBody¡£HeaderÊÇÓÉProducerÌí¼ÓÉϵĸ÷ÖÖÊôÐԵļ¯ºÏ£¬ÕâЩÊôÐÔÓпØÖÆMessageÊÇ·ñ¿É±»»º´æ£¬½ÓÊÕµÄqueueÊÇÄĸö£¬ÓÅÏȼ¶ÊǶàÉٵȡ£BodyÊÇÕæÕýÐèÒª´«Ë͵ÄÊý¾Ý£¬ËüÊǶÔBroker²»¿É¼ûµÄ¶þ½øÖÆÊý¾ÝÁ÷£¬ÔÚ´«Êä¹ý³ÌÖв»Ó¦¸ÃÊܵ½Ó°Ïì¡£

Ò»¸öbrokerÖÐ»á´æÔÚ¶à¸öMessage queue£¬ExchangeÔõÑùÖªµÀËüÒª°ÑÏûÏ¢·¢Ë͵½ÄĸöMessage queueÖÐÈ¥ÄØ? Õâ¾ÍÊÇÉÏͼÖÐËùչʾBindingµÄ×÷Óá£MessagequeueµÄ´´½¨ÊÇÓÉclient application¿ØÖƵģ¬ÔÚ´´½¨Message queueºóÐèҪȷ¶¨ËüÀ´½ÓÊÕ²¢±£´æÄĸöExchange·ÓɵĽá¹û¡£BindingÊÇÓÃÀ´¹ØÁªExchangeÓëMessage queueµÄÓòÄ£ÐÍ¡£Clientapplication¿ØÖÆExchangeÓëij¸öÌØ¶¨Messagequeue¹ØÁª£¬²¢½«Õâ¸öqueue½ÓÊÜÄÄÖÖÏûÏ¢µÄÌõ¼þ°ó¶¨µ½Exchange£¬Õâ¸öÌõ¼þÒ²½ÐBinding key»òÊÇ Criteria¡£

ÔÚÓë¶à¸öMessagequeue¹ØÁªºó£¬ExchangeÖÐ¾Í»á´æÔÚÒ»¸ö·ÓÉ±í£¬Õâ¸ö±íÖд洢×Åÿ¸öMessage queueËùÐèÒªÏûÏ¢µÄÏÞÖÆÌõ¼þ¡£Exchange¾Í»á¼ì²éËü½ÓÊܵ½µÄÿ¸öMessageµÄHeader¼°BodyÐÅÏ¢£¬À´¾ö¶¨½«Message·Óɵ½ÄĸöqueueÖÐÈ¥¡£MessageµÄHeaderÖÐÓ¦¸ÃÓиöÊôÐÔ½ÐRouting Key£¬ËüÓÉMessage·¢ËÍÕß²úÉú£¬Ìṩ¸øExchange·ÓÉÕâÌõMessageµÄ±ê×¼¡£Exchange¸ù¾Ý²»Í¬Â·ÓÉËã·¨Óв»Í¬ÓÐExchangeType¡£±ÈÈçÓÐDirectÀàËÆ£¬ÐèÒªBindingkeyµÈÓÚRouting key£»Ò²ÓÐBindingkeyÓëRouting key·ûºÏÒ»¸öģʽ¹ØÏµ£»Ò²Óиù¾ÝMessage°üº¬µÄijЩÊôÐÔÀ´Åжϡ£Ò»Ð©»ù´¡µÄ·ÓÉËã·¨ÓÉAMQPËùÌṩ£¬clientapplicationÒ²¿ÉÒÔ×Ô¶¨Òå¸÷ÖÖ×Ô¼ºµÄÀ©Õ¹Â·ÓÉËã·¨¡£

ÔÚAMQPÖУ¬Client applicationÏëÒªÓëBroker¹µÍ¨£¬¾ÍÐèÒª½¨Á¢ÆðÓëBrokerµÄconnection£¬ÕâÖÖconnectionÆäʵÊÇÓëVirtual HostÏà¹ØÁªµÄ£¬Ò²¾ÍÊÇ˵£¬connectionÊǽ¨Á¢ÔÚclientÓëVirtual HostÖ®¼ä¡£¿ÉÒÔÔÚÒ»¸öconnectionÉϲ¢·¢ÔËÐжà¸öchannel£¬Ã¿¸öchannelÖ´ÐÐÓëBrokerµÄͨÐÅ£¬ÎÒÃÇÇ°ÃæÌṩµÄsession¾ÍÊÇÒÀ¸½ÓÚchannelÉϵġ£

ÕâÀïµÄSession¿ÉÒÔÓжàÖÖ¶¨Ò壬¼È¿ÉÒÔ±íʾAMQPÄÚ²¿ÌṩµÄcommand·Ö·¢»úÖÆ£¬Ò²¿ÉÒÔ˵ÊÇÔÚºê¹ÛÉÏÇø±ðÓëÓòÄ£Ð͵Ľӿڡ£Õý³£Àí½â¾ÍÊÇÎÒÃÇÆ½Ê±Ëù˵µÄ½»»¥context£¬Ö÷Òª×÷ÓþÍÊÇÔÚÍøÂçÉϿɿ¿µØ´«µÝÿһ¸öcommand¡£ÔÚAMQPµÄÉè¼ÆÖУ¬Ó¦µ±ÊÇ½è¼øÁËTCPµÄ¸÷ÖÖÉè¼Æ£¬ÓÃÓÚ±£Ö¤ÕâÖÖ¿É¿¿ÐÔ¡£

ÔÚSession²ã£¬ÎªÉϲãËùÐèÒª½»»¥µÄÿ¸öcommand·ÖÅäÒ»¸öΩһ±êʶ·û(¿ÉÒÔÊÇÒ»¸öUUID)£¬ÊÇΪÁËÔÚ´«Êä¹ý³ÌÖпÉÒÔ¶Ôcommand×öУÑéºÍÖØ´«¡£Command·¢ËͶËÒ²ÐèÒª¼Ç¼ÿ¸ö·¢ËͳöÈ¥µÄcommandµ½ReplayBuffer£¬ÒÔÆÚµÃµ½½ÓÊÕ·½µÄ»ØÀ¡£¬±£Ö¤Õâ¸öcommand±»½ÓÊÕ·½Ã÷È·µØ½ÓÊÕ»òÊÇÒÑÖ´ÐÐÕâ¸öcommand¡£¶ÔÓÚ³¬Ê±Ã»ÓÐÊÕµ½·´À¡µÄcommand£¬·¢ËÍ·½ÔÙ´ÎÖØ´«¡£Èç¹û½ÓÊÕ·½ÒÑÃ÷È·µØ»ØÀ¡ÐÅÏ¢ÏëÒª¸æÖªcommand·¢ËÍ·½µ«ÕâÌõÐÅÏ¢ÔÚÖÐ;¶ªÊ§»òÊÇÆäËüÎÊÌâ·¢ËÍ·½Ã»ÓÐÊÕµ½£¬ÄÇô·¢ËÍ·½²»¶ÏÖØ´«»á¶Ô½ÓÊÕ·½²úÉúÓ°Ï죬ΪÁ˽µµÍÕâÖÖÓ°Ï죬command½ÓÊÕ·½ÉèÖÃÒ»¸ö¹ýÂËÆ÷IdempotencyBarrier£¬À´À¹½ØÄÇЩÒѽÓÊÕ¹ýµÄcommand¡£¹ØÓÚÕâÖÖÖØ´«¼°È·ÈÏ»úÖÆ£¬¿ÉÒԲο¼ÏÂTCPµÄÏà¹ØÉè¼Æ¡£

1.1.2 Erlang¼ò½é

Erlang([':l])ÊÇÒ»ÖÖͨÓõÄÃæÏò²¢·¢µÄ±à³ÌÓïÑÔ£¬ËüÓÉÈðµäµçÐÅÉè±¸ÖÆÔìḚ́®Á¢ÐÅËùϽµÄCS-Lab¿ª·¢£¬Ä¿µÄÊÇ´´ÔìÒ»ÖÖ¿ÉÒÔÓ¦¶Ô´ó¹æÄ£²¢·¢»î¶¯µÄ±à³ÌÓïÑÔºÍÔËÐл·¾³¡£ErlangÎÊÊÀÓÚ1987Ä꣬¾­¹ýÊ®ÄêµÄ·¢Õ¹£¬ÓÚ1998Äê·¢²¼¿ªÔ´°æ±¾¡£ErlangÊÇÔËÐÐÓÚÐéÄâ»úµÄ½âÊÍÐÔÓïÑÔ£¬µ«ÊÇÏÖÔÚÒ²°üº¬ÓÐÎÚÆÕÈøÀ­´óѧ¸ßÐÔÄÜErlang¼Æ»®£¨HiPE£©¿ª·¢µÄ±¾µØ´úÂë±àÒëÆ÷£¬×ÔR11B-4°æ±¾¿ªÊ¼£¬ErlangÒ²¿ªÊ¼Ö§³Ö½Å±¾Ê½½âÊÍÆ÷¡£ÔÚ±à³Ì·¶ÐÍÉÏ£¬ErlangÊôÓÚ¶àÖØ·¶Ðͱà³ÌÓïÑÔ£¬º­¸Çº¯Êýʽ¡¢²¢·¢Ê½¼°·Ö²¼Ê½¡£Ë³ÐòÖ´ÐеÄErlangÊÇÒ»¸ö¼°ÔçÇóÖµ, µ¥´Î¸³ÖµºÍ¶¯Ì¬ÀàÐ͵ĺ¯Êýʽ±à³ÌÓïÑÔ¡£

ErlangÊÇÒ»¸ö½á¹¹»¯£¬¶¯Ì¬ÀàÐͱà³ÌÓïÑÔ£¬ÄÚ½¨²¢ÐмÆËãÖ§³Ö¡£×î³õÊÇÓɰ®Á¢ÐÅרÃÅΪͨÐÅÓ¦ÓÃÉè¼ÆµÄ£¬±ÈÈç¿ØÖÆ½»»»»ú»òÕ߱任ЭÒéµÈ£¬Òò´Ë·Ç³£ÊʺÏÓÚ¹¹½¨·Ö²¼Ê½£¬ÊµÊ±Èí²¢ÐмÆËãϵͳ¡£Ê¹ÓÃErlang±àд³öµÄÓ¦ÓÃÔËÐÐʱͨ³£ÓɳÉǧÉÏÍò¸öÇáÁ¿¼¶½ø³Ì×é³É£¬²¢Í¨¹ýÏûÏ¢´«µÝÏ໥ͨѶ¡£½ø³Ì¼äÉÏÏÂÎÄÇл»¶ÔÓÚErlangÀ´Ëµ½ö½öÖ»ÊÇÒ»Á½¸ö»·½Ú£¬±ÈÆðC³ÌÐòµÄÏß³ÌÇл»Òª¸ßЧµÃ¶àµÃ¶àÁË¡£

ʹÓÃErlangÀ´±àд·Ö²¼Ê½Ó¦ÓÃÒª¼òµ¥µÄ¶à£¬ÒòΪËüµÄ·Ö²¼Ê½»úÖÆÊÇ͸Ã÷µÄ£º¶ÔÓÚ³ÌÐòÀ´Ëµ²¢²»ÖªµÀ×Ô¼ºÊÇÔÚ·Ö²¼Ê½ÔËÐС£ErlangÔËÐÐʱ»·¾³ÊÇÒ»¸öÐéÄâ»ú£¬ÓеãÏñJavaÐéÄâ»ú£¬ÕâÑù´úÂëÒ»¾­±àÒ룬ͬÑù¿ÉÒÔËæ´¦ÔËÐС£ËüµÄÔËÐÐʱϵͳÉõÖÁÔÊÐí´úÂëÔÚ²»±»ÖжϵÄÇé¿öϸüС£ÁíÍâÈç¹ûÐèÒª¸ü¸ßЧµÄ»°£¬×Ö½Ú´úÂëÒ²¿ÉÒÔ±àÒë³É±¾µØ´úÂëÔËÐС£

ÆäËûMQ

1.1.3 ÏÂÔØrabbitMQ¡¢Erlang

Ê×Ò³£¬ÏÂÀ­ÖÁ£ºdownloadÏÂÔØ£¬Tutorials½Ì³Ì

ÏÂÔØwindows°æ±¾£º

rabbitmq-server-3.6.12.exe

½Ì³ÌRabbitMQ Tutorials£º

http://www.erlang.org/·ÃÎʱȽÏÂý£¬½¨Òé´ó¼ÒÒ²¿ÉÒÔÍøÉÏÕÒ×ÊÔ´ÏÂÔØ¡£

1.1.4 rabbitMQ»ù±¾¸ÅÄî

spring-boot-rabbitMQÏîĿԴÂ룺https://git.oschina.net/wyait/springboot1.5.4.git

configÅäÖÃÀࣺ

@Autowired

private ConnectionFactoryconnectionFactory;
@Autowired
private Queue3Listenerqueue3Listener;

@Bean
@Primary
public RabbitTemplaterabbitTemplate() {
RabbitTemplate rabbitTemplate = newRabbitTemplate
(connectionFactory);
rabbitTemplate.setMessageConverter(newJackson2
JsonMessageConverter());
return rabbitTemplate;
}

@Bean
@Primary
publicSimpleRabbitListenerContainerFactory
simpleRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactorysimpleRabb
itListenerContainerFactory = newSimpleRabbitLi
stenerContainerFactory();
simpleRabbitListenerContainerFactory.set
ConnectionFactory
(connectionFactory);
simpleRabbitListenerContainerFactory.set
MessageConverter
(newJackson2JsonMessageConverter());
returnsimpleRabbitListenerContainerFactory;
}

@Bean
publicSimpleMessageListenerContainer
simpleMessageListenerContainer() {
SimpleMessageListenerContainer container =new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue3());
container.setMessageListener(queue3Listener);
return container;
}

@Bean
public DirectExchangedirectExchange() {
return new DirectExchange(EX_CHANGE_NAME1);
}

@Bean
public Queue queue1() {
return new Queue(QUEUE1, true);
}

@Bean
public Queue queue2() {
return new Queue(QUEUE2, true);
}

@Bean
public Queue queue3() {
return new Queue(QUEUE3, true);
}

@Bean
public Binding binding1() {
returnBindingBuilder.bind(queue1()).
to(directExchange())
.with(ROUTING_KEY1);
}

@Bean
public Binding binding2() {
returnBindingBuilder.bind(queue2()).
to(directExchange())
.with(ROUTING_KEY2);

}

@Bean
public Binding binding3() {
return BindingBuilder.bind(queue3()).
to(directExchange())
.with(ROUTING_KEY3);
}

»ù±¾¸ÅÄ

ConnectionFactory¡¢Connection¡¢Channel

connectionΪsocketÁ¬½ÓµÄ·â×°£¬connectionFqactoryÊÇconnectionµÄÉú²ú¹¤³Ì£¬channelÊÇͨÐŵÄÐŵÀ£¬Êµ¼Ê½øÐÐÊý¾Ý½»Á÷µÄ¹ÜµÀ£¬ÒòΪ½¨Á¢connectionµÄ¿ªÏúÃ÷ÏÔÒª±È½¨Á¢channelÒª´óºÜ¶à£¬ËùÒÔÊý¾Ý´«ÊäÕæÊµ·¢ÉúÔÚchannelÄÚ

Exchange,Queue

exchangeÊÇ¿ÉÒÔÀí½â³ÉÒ»ÌõÌØÊâµÄ´«ÊäͨµÀ£¬Ëû»á°ÑÏûϢͶµÝµ½°ó¶¨µÄÏûÏ¢³ØÄÚ¡£

queue¾ÍÊÇÏûÏ¢³ØÁË£¬Ê¹ÓÃǰÐèÒª°ó¶¨exchange£¬ÒÔ¼°×Ô¼ºµÄ±êÖ¾¡£

exchange_key,routing_key

exchange_key¾ö¶¨ÁËpublisherµÄÏûϢͶµÝµ½ÄÄÌõͨµÀ£¬routing_key¾ö¶¨Á˽«ÏûÏ¢·Åµ½Äĸö³Ø×ÓÀï

°ó¶¨

queueÒª½ÓÊÜÏûÏ¢±ØÐëÓëexchange½øÐа󶨣¬²¢Ôڰ󶨵Äʱºò¸ø×Ô¼ºÓëexchangeµÄ°ó¶¨ÉèÖÃÒ»¸ö±ê¼Çrouting_key£¬ÒÔºóÓÃÀ´Æ¥ÅäÏûÏ¢½ÓÊÕ

exchangeÓëqueueÊÇÒ»¶Ô¶àµÄ¹ØÏµ£¬¸ù¾Ýexchange²»Í¬ÀàÐÍ£¬·Ö±ðͶµÝµ½²»Í¬µÄÏûÏ¢³Ø

ÏÂÃæÀ´¿´¿´exchangeµÄÀàÐÍ

1. fanout

Ö±½Ó½«ÏûÏ¢·¢Ë͵½Óë¸Ãexchange°ó¶¨µÄËùÓÐqueueÄÚ

2. direct

¶Ôrouting_key½øÐÐÑϸñÆ¥Å䣬µ±ÏûÏ¢À´µ½µÄʱºò£¬Ö»ÓÐexchangeÓëijqueue°ó¶¨µÄrouting_keyÍêȫƥÅä²Å½«ÏûϢͶµÝµ½¸Ãqueue

3. topic

ÓÃÌØÊâ·ûºÅ½øÐÐÆ¥Å䣬Âú×ãÌõ¼þµÄqueue¶¼ÄÜÊÕµ½ÏûÏ¢£¬ÕâÀïµÄrouting_keyÒÔ"."·Ö¸ô£¬*Æ¥ÅäÒ»¸öµ¥´Ê£¬#Æ¥Åä¶à¸öµ¥´Ê£¬Èç¹ûÂú×ã¶à¸öÌõ¼þÒ²²»»áͶµÝ¶à´Î

4. headers

²»ÒÀÀµrouting_keyÆ¥Å䣬¸ù¾ÝÏûÏ¢ÌåÄÚµÄheadersÊôÐÔÆ¥Å䣬°ó¶¨µÄʱºò¿ÉÒÔÖÆ¶¨¼üÖµ¶Ô

½ÓÏÂÀ´À´¿´¿´ÅäÖÃÎļþ

1.@BeanͳһעÈëµ½ÈÝÆ÷ÖУ¬ÎÒÃÇÉùÃ÷ÁËconnectionfactory£¬Ëû»á×Ô¶¯¸ù¾ÝapplicationÀïÃæµÄÊôÐÔ½øÐÐ×é×°£¬Õâ¸öÁ¬½Ó¶ÔÓÚºóÃæµÄÈÝÆ÷¶¼ÊÇÒªÓõ½µÄ£¬ÕâÀïҪעÒâconverterµÄÉèÖã¬ÒòΪÎÒÃÇÒª½«pojoÀàÐͽøÐд«Ê䣬һ°ã³ÌÐòÍâµÄ´«Êä¶¼Êǽ¨Á¢ÔÚ×Ö½ÚÁ÷µÄ»ù´¡ÉÏ£¬converter¾Í»á×Ô¶¯×ª»»

2.½ÓÏÂÀ´ÎÒÃÇÉùÃ÷queue£¬trueÊôÐÔÉèÖÃΪ³Ö¾ÃÐ͵ijØ×Ó£¬µ±Á¬½Ó¶Ï¿ªÊ±£¬ÏûÏ¢»áß±£Áô£¬È»ºóÉùÃ÷exchange£¬ÕâÀïÎÒÃÇʹÓõÄÊÇdirectexchange£¬½ÓÏÂÀ´½«Á½Õ߰󶍯ðÀ´

5. ÉùÃ÷SimpleMessageListenerContainer£¬SimpleRabbitListenerContainerFactory×¢ÒâÕâÀïÉùÃ÷Á½¸öÊÇÒòΪÕâÊÇÏûÏ¢¼àÌýµÄÁ½ÖÖ·½Ê½

Ê×ÏȽ²½²SimpleMessageListenerContainer£¬Õâ¸öÐèÒªÉèÖÃÈ·ÈÏ·½Ê½£¬Óн϶àÊôÐÔ¿ËÉèÖã¬ÓÐÐËȤ¿É×ÔÐÐÉèÖã¬ÕâÀïÎÒÖ»ÊǼòµ¥µÄÉèÖÃÁËһϣ¬È»ºóÒªÉèÖÃlistener£¬

listenerÐèҪʵÏÖChannelAwareMessageListenerÀïÃæÓÐ

public void onMessage(Message message,Channel channel) µÄÖØÔØ·½·¨ÐèҪʵÏÖ£¬ÏûÏ¢ÌåÔÚMessageµÄbodyÄÚ£¬Ïà¶ÔÀ´ËµÐÅÏ¢±È½ÏÍ걸

½ÓÏÂÀ´¿´¿´SimpleRabbitListenerContainerFactory£¬Õâ¸öÓм¸¸ö×¢Òâµã£¬ÐèÒªÔÙ´ÎÉèÖÃconverterÒòΪ£¬Ò»¸öÊÇ·¢ÏûÏ¢µÄʱºò½âÎö³É¶þ½øÖÆ£¬Õâ¸öÔòÊǽ«¶þ½øÖƽâÎö³É¾ßÌåµÄÀ࣬»Øµ÷Ïà¶Ô¼òµ¥Ò»µã

@Component

@RabbitListener(queues =RabbitMQConfig.QUEUE1, containerFactory ="simpleRabbitListenerContainerFactory")
public class Queue1Listener {
private static Logger logger =LoggerFactory.getLogger(Queue1Listener.class);
@RabbitHandler
public void receive(@Payload String s) {

logger.info("listener1 info: " +s);

}
}

¼ÇµÃÐèÒªcontainerFactory¾ßÌåд³öÀ´

ÔÚ½ÓÊÕÏûÏ¢µÄ·½·¨ÉÏд@RabbitHandler£¬ÏûÏ¢Ìå´òÉÏ@payload¾Ã¿ÉÒÔ½ÓÊÜÏûÏ¢ÁË¡£

Æäʵ»¹Óиö·½·¨¾ÍÊÇÖ¸¶¨Ò»¸öMessageAdapter,È»ºóÔÚcontainerÀïÃæ¾Í¿ÉÒÔÖ¸¶¨½ÓÊյķ½·¨Ãû£¬²»ÊǺÜÍÆ¼ö£¬Ã÷ÎÄ·´Éä×ܸоõÈÝÒ׳öÎÊÌâ

µ±È»publisherÒ²ÊÇÓÐÏûÏ¢µÄ»Øµ÷µÄ

RabbitTemplateÏÂÓÐConfirmCallbackʵÏÖconfirm·½·¨¾ÍºÃÁË

1.2 rabbitMQÈëÃÅ

1.2.1 °²×°rabbitMQ£¨windows£©

°²×°²½Ö裺

1. °²×°Erland£¬Í¨¹ý¹Ù·½ÏÂÔØÒ³Ãæhttp://www.erlang.org/downloads»ñÈ¡exe°²×°°ü£¬Ö±½Ó´ò¿ª²¢Íê³É°²×°¡£

2. °²×°RabbitMQ£¬Í¨¹ý¹Ù·½ÏÂÔØÒ³Ãæhttps://www.rabbitmq.com/download.html»ñÈ¡exe°²×°°ü¡£

3. ÏÂÔØÍê³Éºó£¬¿ÉÒÔÖ±½ÓÔËÐа²×°³ÌÐò£¬»òÅäÖû·¾³±äÁ¿ºóÔËÐÐrabbitMQ-server°²×°³ÌÐò¡£

4. RabbitMQ Server°²×°Íê³ÉÖ®ºó£¬»á×Ô¶¯µÄ×¢²áΪ·þÎñ£¬²¢ÒÔĬÈÏÅäÖÃÆô¶¯ÆðÀ´¡£

°²×°¹ý³ÌÇë°Ù¶È

°²×°³É¹¦£º·ÃÎÊ£ºhttp://127.0.0.1:15672 Óû§ÃÜÂ룺guest/guest

ÎÒÃÇ¿ÉÒÔ¿´µ½Ò»Ð©»ù±¾¸ÅÄ±ÈÈ磺Connections¡¢Channels¡¢Exchanges¡¢QueueµÈ¡£µÚÒ»´ÎʹÓ㬿ÉÒÔ¶¼µã¿ª¿´¿´¶¼ÓÐЩʲôÄÚÈÝ£¬ÊìϤһÏÂRabbitMQ ServerµÄ·þÎñ¶Ë¡£

µã»÷Admin±êÇ©£¬ÔÚÕâÀï¿ÉÒÔ½øÐÐÓû§µÄ¹ÜÀí¡£

µã»÷admin£¬Ìí¼ÓÓû§£ºwyait/wyait²¢ÊÚȨ¡£

µã»÷all users±íµ¥ÖеÄÓû§Ãû¡°wyait¡±½øÐÐÊÚȨ£º

1.2.1.1 Virtual HostsÉèÖýçÃæ£º

³ÌÐòÖкÍrabbitMQ½»»¥µÄ¶Ë¿ÚÊÇ£º5672£¬AMQPЭÒé¶Ë¿Ú

1.2.2 ´´½¨spring-boot-MQ¹¤³Ì

ÏîĿԴÂ룬

ÂëÔÆµØÖ·£ºhttps://git.oschina.net/wyait/springboot1.5.4.git

githubµØÖ·£ºhttps://github.com/wyait/spring-boot-1.5.4.git

spring bootÕûºÏrabbitMQÏîÄ¿²©¿ÍÁ´½Ó£ºspring boot 1.5.4 ÕûºÏrabbitMQ£¨Ê®Æß£©

1.2.3 ÏûÏ¢¶ÓÁÐ

¹ÙÍø£ºrabbitMQTutorials

ǰÌᣬrabbitMQ·þÎñÒѾ­Æô¶¯£»²âÊÔ¹ý³Ì£º

1£¬spring BootÏîÄ¿ÏÈÆô¶¯£¬¼àÌý¶ÓÁУ»

2£¬Æô¶¯²âÊÔÀà·¢ËÍÏûÏ¢µ½¶ÓÁÐÖУ»¡¢

3£¬Ïû·ÑÕßÏû·ÑÏûÏ¢¡£

1.2.3.1 ¡°hello world¡±

The simplest thingthat does something ¼òµ¥µÄÏûÏ¢¶ÓÁУº

P£ºÏûÏ¢µÄÉú²úÕߣ»

C£ºÏûÏ¢µÄÏû·ÑÕߣ»

ºìÉ«¿ò£ºÏûÏ¢¶ÓÁУ»

demo²Î¿¼1.2.2Õ½ڡ£

1.2.3.2 Work Queues

Distributing tasksamong workers (the competing consumers pattern)

Ò»¸öÉú²úÕß¶ÔÓ¦Ò»¸öÏûÏ¢¶ÓÁÐMQ£¬MQ¿ÉÒÔ¶ÔÓ¦¶à¸öÏû·ÑÕߣ¬µ«ÊÇͬһ¸öÏûÏ¢Ö»Äܱ»Ò»¸ö¿Í»§¶ËÉú²úÕßËù»ñÈ¡;

ͬһ¸öÏûÏ¢Ö»Äܱ»Ò»¸ö¿Í»§¶ËËù»ñÈ¡¡£µ«ÊǶÔÓÚ²»Í¬µÄÏû·ÑÕߣ¬½ÓÊÜÏûÏ¢£¬´¦ÀíµÄЧÂʲ»Í¬£¬ËùÒÔ»áÓв»ºÏÀíµÄµØ·½¡£

ÔÚRabbitMqConfigÖж¨ÒåÒ»¸ö¶ÓÁÐworkQueues£º

@Bean

public Queue workQueue() {
return new Queue("workQueues");
}

ÏûÏ¢Éú²úÕßWorkSender:

@Component

public class WorkSender {
@Autowired
private AmqpTemplate rabbitMQTemplate;

/**
*
* @ÃèÊö£ºworkģʽ
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ14ÈÕÏÂÎç5:51:20
*/
public void workSend(String msg) {
String context = msg + new Date();
System.out.println("workSender : " + context);
this.rabbitMQTemplate.convertAndSend("workQueues",
context);
}
}

ÏûÏ¢Ïû·ÑÕß1 WorkReceiver:

@Component

@RabbitListener(queues ="workQueues")
public class WorkReceiver {

@RabbitHandler
// handler×¢½âÀ´Ö¸¶¨¶ÔÏûÏ¢µÄ´¦Àí·½·¨
public void process(String hello) {
System.out.println("workReceiver:" + hello);
}
}

ÏûÏ¢Ïû·ÑÕß2 WorkReceiverTwo:

@Component

@RabbitListener(queues ="workQueues")
public class WorkReceiverTwo {

@RabbitHandler
// handler×¢½âÀ´Ö¸¶¨¶ÔÏûÏ¢µÄ´¦Àí·½·¨
public void process(String hello) {
System.out.println("workReceiverTwo:" + hello);
}
}

²âÊÔÏû·ÑÏûÏ¢½á¹û£º

ƽ¾ù·ÖÅäÏûÏ¢Ô­Ôò£¨ÄãÒ»Ìõ£¬ÎÒÒ»Ìõ£©¡£

¿Éͨ¹ý¸ü¸ÄchannelÉèÖ㬸ıä·ÖÅä²ßÂÔ¡£

1.2.3.3 Publish/Subscribe¶©ÔÄģʽ

Sending messagesto many consumers at once.

Ò»¸öÉú²úÕß½«Í¬Ò»ÌõÏûÏ¢message·¢Ë͵½½»»»»úexchange£¬Í¨¹ýexchange·¢Ë͵½¶à¸ö¶ÓÁÐÖУ¬¶ø¶ÔÓ¦µÄÏû·ÑÕß¶¼ÄÜ»ñÈ¡µ½¸ÃÏûÏ¢¡£

×¢Ò⣺

ÎÊÌâ1£ºÏûÏ¢ÊÇ·¢Ë͵½½»»»»ú¶ø²»ÊǶÓÁУ¿´ð£ºÏûÏ¢¿ÉÒÔ·¢Ë͵½¶ÓÁУ¬Ò²¿ÉÒÔ·¢Ë͵½½»»»»ú¡£

ÎÊÌâ2£ºÏû·ÑÕßµÄÏûÏ¢À´Ô´Ö»ÄÜÊǶÓÁУ»

ÎÊÌâ3£ºÈç¹û½«ÏûÏ¢·¢Ë͵½Ã»Óа󶨶ÓÁеĽ»»»»úÉÏ£¬ÏûÏ¢»áÈ¥ÄÄ£¿´ð£ºÏûÏ¢¶ªÊ§¡£

×ܽ᣺ÏûÏ¢Ö»ÄÜ´æ·ÅÓÚ¶ÓÁУ¬²»ÄÜ´æ·ÅÔÚ½»»»»ú£»½»»»»úÖ»ÄÜÓÃÓÚÏûÏ¢µÄ´«µÝ£¬ÏûϢͨµÀ¡£

Fanout Exchange:

²»´¦Àí·Óɼü(routingKey)¡£ÄãÖ»ÐèÒª¼òµ¥µÄ½«¶ÓÁа󶨵½½»»»»úÉÏ¡£Ò»¸ö·¢Ë͵½½»»»»úµÄÏûÏ¢¶¼»á±»×ª·¢µ½Óë¸Ã½»»»»ú°ó¶¨µÄËùÓжÓÁÐÉÏ¡£ºÜÏñ×ÓÍø¹ã²¥£¬Ã¿Ì¨×ÓÍøÄÚµÄÖ÷»ú¶¼»ñµÃÁËÒ»·Ý¸´ÖƵÄÏûÏ¢¡£Fanout½»»»»úת·¢ÏûÏ¢ÊÇ×î¿ìµÄ¡£

Fanout ¾ÍÊÇÎÒÃÇÊìϤµÄ¹ã²¥Ä£Ê½»òÕß¶©ÔÄģʽ£¬¸øFanoutת·¢Æ÷·¢ËÍÏûÏ¢£¬°ó¶¨ÁËÕâ¸öת·¢Æ÷µÄËùÓжÓÁж¼ÊÕµ½Õâ¸öÏûÏ¢¡£

ÕâÀïʹÓÃÈý¸ö¶ÓÁÐÀ´²âÊÔ£¨Ò²¾ÍÊÇÔÚApplicationÀàÖд´½¨ºÍ°ó¶¨µÄfanout.A¡¢fanout.B¡¢fanout.C£©ÕâÈý¸ö¶ÓÁж¼ºÍApplicationÖд´½¨µÄfanoutExchangeת·¢Æ÷°ó¶¨¡£

ÐÂÔösubscribe¶©ÔÄģʽÅäÖãº

/**

*
* @ÃèÊö£ºsubscribe¶©ÔÄģʽµÄ¶ÓÁÐA;
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:24:31
* @return
*/
@Bean
public Queue subscribeQueueA() {
return new Queue("fanout.A");
}

/**
*
* @ÃèÊö£ºsubscribe¶©ÔÄģʽµÄ¶ÓÁÐB;
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:24:31
* @return
*/
@Bean
public Queue subscribeQueueB() {
return new Queue("fanout.B");
}

/**
*
* @ÃèÊö£ºsubscribe¶©ÔÄģʽµÄ¶ÓÁÐC;
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:24:31
* @return
*/
@Bean
public Queue subscribeQueueC() {
return new Queue("fanout.C");
}

/**
*
* @ÃèÊö£ºfanoutExchange½»»»»ú
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:34:41
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}

/**
*
* @ÃèÊö£ºsubscribeQueue°ó¶¨fanoutExchange½»»»»ú
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:41:10
* @param subscribeQueue
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeA(Queue subscribeQueueA,
FanoutExchange fanoutExchange) {
// °ó¶¨¶ÓÁÐAµ½fanoutExchange½»»»»ú£¬Ò²¿ÉÒÔʹÓãºbind(subscribeQueueA())½øÐаó¶¨;
return BindingBuilder.bind(subscribeQueueA).to(fanoutExchange);
}

@Bean
Binding bindingExchangeB(Queue subscribeQueueB,
FanoutExchange fanoutExchange) {
return BindingBuilder.bind(subscribeQueueB).to(fanoutExchange);
}

@Bean
Binding bindingExchangeC(Queue subscribeQueueC,
FanoutExchange fanoutExchange) {
return BindingBuilder.bind(subscribeQueueC).to(fanoutExchange);
}

ÏûÏ¢Éú²úÕßSubscribeSenderÖ¸¶¨½»»»»ú£º

@Component

public class SubscribeSender {
@Autowired
private AmqpTemplate rabbitTemplate;

public void send(String msg) {
String sendMsg = msg + new Date();
System.out.println("---SubscribeSender : "
+sendMsg);
// convertAndSend(String exchange, String
routingKey, Objectmessage)
this.rabbitTemplate.convertAndSend("fanout
Exchange",
"aaa", sendMsg);
}

}

ÏûÏ¢Ïû·ÑÕßSubscribeReveicerA¡¢B¡¢C¼àÌý¶ÓÁÐfanout.A/B/C:

@Component

@RabbitListener(queues ="fanout.A")
public class SubscribeReceiver{
@RabbitHandler
public void precess(String msg) {
System.out.println("SubscribeReceiverA : " + msg);
}
}

²âÊÔtestÀࣺ

@Autowired

private SubscribeSender subSend;

@Test
public void subscribeTest() {
System.out.println("==========subscribe·¢ËÍÏûÏ¢£¡");
for (int i = 0; i < 50; i++) {
String msg = "==========msg_" + i;
subSend.send(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Èý¸öÏû·ÑÕß¶¼½ÓÊÕµ½ÁËÿһÌõÐÅÏ¢¡£

×¢Ò⣺subscribe¶©ÔÄģʽºÍworkģʽµÄÇø±ð¡£

1¡¢ workģʽ½«ÏûÏ¢·¢Ë͵½¶ÓÁÐ

2¡¢ ¶©ÔÄģʽ½«ÏûÏ¢·¢Ë͵½½»»»»ú

3¡¢ workģʽÊÇ1¸ö¶ÓÁÐN¸öÏû·ÑÕߣ¬¶©ÔÄģʽÊÇN¸ö¶ÓÁÐN¸öÏû·ÑÕß(N>0)

1.2.3.4 Routing·ÓÉģʽ

·ÓÉģʽ£º»ùÓÚ¶©ÔÄģʽ£¬

¿ÉÒÔÔÚ¶ÓÁа󶨵½½»»»»úʱָ¶¨Ò»¸ö¹æÔò£¬¸ù¾Ý²»Í¬µÄÏûÏ¢¹æÔò£¬Ñ¡ÔñÊÇ·ñ½ÓÊܸÃÏûÏ¢¡£

´¦Àí·Óɼü(routingKey)¡£ÐèÒª½«Ò»¸ö¶ÓÁа󶨵½½»»»»úÉÏ£¬ÒªÇó¸ÃÏûÏ¢ÓëÒ»¸öÌØ¶¨µÄ·ÓɼüroutingKeyÍêȫƥÅä¡£

»ùÓÚSubscribe¶©ÔÄģʽ£¬ÅäÖÃÀàÖÐÌí¼Ó¶ÓÁС¢DirectExchange½»»»»ú²¢½øÐа󶨣º

/**

*
* @ÃèÊö£ºrouting·ÓÉģʽµÄ¶ÓÁÐA;
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:24:31
* @return
*/
@Bean
public Queue routingQueueA() {
return new Queue("routing.A");
}

/**
*
* @ÃèÊö£ºrouting·ÓÉģʽµÄ¶ÓÁÐB;
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:24:31
* @return
*/
@Bean
public Queue routingQueueB() {
return new Queue("routing.B");
}

/**
*
* @ÃèÊö£ºDirectExchange½»»»»ú
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:34:41
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}

/**
*
* @ÃèÊö£ºroutingQueue°ó¶¨directExchange½»»»»ú
* @´´½¨ÈË£ºwyait
* @´´½¨Ê±¼ä£º2017Äê9ÔÂ15ÈÕÏÂÎç3:41:10
* @param routingQueue
* @param directExchange
* @return
*/
@Bean
Binding bindingDirectExchangeA(Queue routingQueueA,
DirectExchange directExchange) {
// °ó¶¨routing¶ÓÁÐAµ½directExchange½»»»»ú,
²¢Ö¸¶¨routing·ÓɹæÔò;
return BindingBuilder.bind(routingQueueA()).
to(directExchange())
.with("info");
}

@Bean
Binding bindingDirectExchangeB(Queue routingQueueB,
DirectExchange directExchange) {
// °ó¶¨routing¶ÓÁÐAµ½directExchange½»»»»ú,²¢Ö¸¶¨
routing·ÓɹæÔò;
returnBindingBuilder.bind(routingQueueB()).to
(directExchange())
.with("error");
}

ÏûÏ¢Éú²úÕߣº

@Component

public class RoutingSender {
@Autowired
private AmqpTemplate rabbitTemplate;

public void send(String msg) {
String sendMsg = msg + new Date();
System.out.println("---RoutingSender : " + sendMsg);
this.rabbitTemplate.convertAndSend("directExchange",
"info", sendMsg);
}

public void sendTwo(String msg) {
String sendMsg = msg + new Date();
System.out.println("---RoutingSender TWO: " +sendMsg);
this.rabbitTemplate
.convertAndSend("directExchange", "infoTwo",sendMsg);
}

public void sendError(String msg) {
String sendMsg = msg + new Date();
System.out.println("---RoutingSender Error: " +sendMsg);
this.rabbitTemplate.convertAndSend("directExchange",
"error", sendMsg);
}

public void sendErrorTwo(String msg) {
String sendMsg = msg + new Date();
System.out.println("---RoutingSender ErrorTwo:
" +sendMsg);
this.rabbitTemplate.convertAndSend("directExchange",
"errorTwo",
sendMsg);
}

}

ÏûÏ¢Ïû·ÑÕßA£º

@Component

@RabbitListener(queues ="routing.A")
public class RoutingReceiver {
@RabbitHandler
public void precess(String msg) {
System.out.println("RoutingReceiverA === : " + msg);
}

}

²âÊÔÀࣺ

@Autowired

private RoutingSender routSend;

@Test
public void routingTest() {
System.out.println("==========routing·¢ËÍÏûÏ¢£¡");
routSend.send("==========msg_info ");
routSend.sendTwo("==========msg_infoTwo ");
routSend.sendError("==========msg_error ");
routSend.sendErrorTwo("==========msg_ErrorTwo ");

System.out.println("==========routing·¢ËÍÏûÏ¢ ½áÊø£¡");
}

ÔËÐУº

MqApplication¿ØÖÆÌ¨£º

ÓÉ´Ë¿ÉÒÔ¿´³ö£¬routingKey·ûºÏ¹æÔòµÄÏûÏ¢£¬»á±»Ïû·Ñ·½½ÓÊÕ²¢Ïû·Ñ¡£

1.2.3.5 TopicsͨÅä·ûģʽ

Receiving messagesbased on a pattern (topics)

»ùÓÚ·ÓÉģʽ£¬Ê¹ÓÃͨÅä·ûÆ¥Åä¶ÓÁУ¬·¢ËÍÏûÏ¢

½«Â·ÓɼüºÍijģʽ½øÐÐÆ¥Åä¡£

Èκη¢Ë͵½Topic ExchangeµÄÏûÏ¢¶¼»á±»×ª·¢µ½ËùÓйØÐÄRouteKeyÖÐÖ¸¶¨»°ÌâµÄQueueÉÏ

1. ÕâÖÖģʽÐèÒªRouteKey£¬ÒªÌáǰ°ó¶¨ExchangeÓëQueue¡£

2. Èç¹ûExchangeûÓз¢ÏÖÄܹ»ÓëRouteKeyÆ¥ÅäµÄQueue£¬Ôò»áÅׯú´ËÏûÏ¢¡£

3. ÔÚ½øÐаó¶¨Ê±£¬ÒªÌṩһ¸ö¸Ã¶ÓÁйØÐĵÄÖ÷Ì⣬Èç¡°#.log.#¡±±íʾ¸Ã¶ÓÁйØÐÄËùÓÐÉæ¼°logµÄÏûÏ¢(Ò»¸öRouteKeyΪ¡±MQ.log.error¡±µÄÏûÏ¢»á±»×ª·¢µ½¸Ã¶ÓÁÐ)¡£

4. ¡°#¡±±íʾ0¸ö»òÈô¸É¸ö¹Ø¼ü×Ö£¬¡°*¡±±íʾһ¸ö¹Ø¼ü×Ö¡£Èç¡°log.*¡±ÄÜÓë¡°log.warn¡±Æ¥Å䣬ÎÞ·¨Óë¡°log.warn.timeout¡±Æ¥Å䣻µ«ÊÇ¡°log.#¡±ÄÜÓëÉÏÊöÁ½Õ߯¥Åä¡£

ͨÅä·û#ºÍ*µÄÇø±ð£»

#£º´ú±íÆ¥ÅäÒ»¸ö»ò¶à¸ö´Ê£»

*£º´ú±íֻƥÅäÒ»¸ö´Ê.

ÅäÖÃÀàÖÐÐÂÔö¶ÓÁаó¶¨TopicExchange½»»»»ú£¬²¢Ö¸¶¨routingKeyºÍÆ¥Åäģʽ£º

@Bean

public Queue topicQueueA() {
return new Queue("topic.queueA", true); // true±íʾ³Ö¾Ã»¯¸Ã¶ÓÁÐ
}

@Bean
public Queue topicQueueB() {
return new Queue("topic.queueB", true);
}

// ÉùÃ÷½»»¥Æ÷
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}

// °ó¶¨
@Bean
public Binding bindingA() {
return BindingBuilder.bind(topicQueueA()).to(topicExchange())
.with("topic.message");
}

@Bean
public Binding bindingB() {
return BindingBuilder.bind(topicQueueB()).to(topicExchange())
.with("topic.#");
}

ÏûÏ¢Éú²úÕߣº

@Component

public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;

public void send(String msg) {
String sendMsg = msg + new Date();
System.out.println("---TopicSender : " + sendMsg);
this.rabbitTemplate.convertAndSend("topicExchange",
"topic.message",
sendMsg);
}

public void sendTwo(String msg) {
String sendMsg = msg + new Date();
System.out.println("---TopicSender messages: " +sendMsg);
this.rabbitTemplate.convertAndSend("topicExchange",
"topic.messages",
sendMsg);
}

}

ÏûÏ¢Ïû·ÑÕߣº

@Component

@RabbitListener(queues ="topic.queueA")
public class TopicReceiver {
@RabbitHandler
public void precess(String msg) {
System.out.println("TopicReceiverA : " + msg);
}
}

test²âÊÔÀࣺ

@Autowired

private TopicSender topicSend;

@Test
public void topicTest() {
System.out.println("==========topic·¢ËÍÏûÏ¢£¡");
topicSend.send("==========msg_info ");
topicSend.sendTwo("==========msg_infoTwo ");

System.out.println("==========topic·¢ËÍÏûÏ¢ ½áÊø£¡");
}

ÖØÆôMqApplication£¬ÔËÐÐtestÀࣺ½á¹û£º

¸ù¾Ý·ÓɹæÔò£¬½ÓÊÕ²»Í¬Éú²úÕßµÄÏûÏ¢¡£

1.2.3.6 ½»»»»ú×ܽá

RPCģʽ¿ÉÒ԰ٶȲé×ÊÁÏÈ¥Á˽â!

FanoutExchange: ½«ÏûÏ¢·Ö·¢µ½ËùÓеİ󶨶ÓÁУ¬ÎÞroutingkeyµÄ¸ÅÄî

HeadersExchange £ºÍ¨¹ýÌí¼ÓÊôÐÔkey-valueÆ¥Åä

DirectExchange:°´ÕÕroutingkey·Ö·¢µ½Ö¸¶¨¶ÓÁÐ

TopicExchange:¶à¹Ø¼ü×ÖÆ¥Åä

1.2.4 ¹ÜÀí½çÃæ²Ù×÷¶ÓÁкͽ»»»»ú

½øÈëExchanges½»»»»ú½çÃæ£¬¿ÉÒÔ¿´µ½ËùÓеÄAMQPĬÈϵĽ»»»»úºÍ¶¨ÒåµÄExchange:

Ñ¡ÔñtopicExchange:

¿ÉÒÔ¶Ô¶ÓÁнøÐÐÌí¼ÓºÍ½â°ó²Ù×÷£¡

1.2.5 ¶ÓÁеij־û¯

RabbitMQµÄ¶ÓÁÐÓÐ2ÖÖ£¬Ò»ÖÖÊÇÄÚ´æ¶ÓÁУ¬Ò»ÖÖÊdz־û¯¶ÓÁÐ

1¡¢ ÄÚ´æ¶ÓÁÐ

Óŵ㣺Ëٶȿ죬ЧÂʸß

ȱµã£ºå´»ú£¬ÏûÏ¢¶ªÊ§

2¡¢ ³Ö¾Ã»¯¶ÓÁÐ

Óŵ㣺ÏûÏ¢¿ÉÒԳ־û¯±£´æ£¬å´»ú»ò¶ÏµçºóÏûÏ¢²»¶ªÊ§

ȱµã£º±ÈÄÚ´æ´æ´¢ËÙ¶ÈÂý£¬ÐÔÄܲî

ÉèÖ÷½·¨£º

@Bean

public Queue topicQueueA() {
return new Queue("topic.queueA", true); // true±íʾ³Ö¾Ã»¯¸Ã¶ÓÁÐ
}

¹ÜÀí½çÃæ²é¿´ÊÇ·ñ³Ö¾Ã»¯£º

D ³Ö¾Ã»¯

ÏîĿԴÂ룬

ÂëÔÆµØÖ·£ºhttps://git.oschina.net/wyait/springboot1.5.4.git

githubµØÖ·£ºhttps://github.com/wyait/spring-boot-1.5.4.git

 

   
6609 ´Îä¯ÀÀ       36
Ïà¹ØÎÄÕÂ

ÆóÒµ¼Ü¹¹¡¢TOGAFÓëArchiMate¸ÅÀÀ
¼Ü¹¹Ê¦Ö®Â·-ÈçºÎ×öºÃÒµÎñ½¨Ä££¿
´óÐÍÍøÕ¾µçÉÌÍøÕ¾¼Ü¹¹°¸ÀýºÍ¼¼Êõ¼Ü¹¹µÄʾÀý
ÍêÕûµÄArchimateÊÓµãÖ¸ÄÏ£¨°üÀ¨Ê¾Àý£©
Ïà¹ØÎĵµ

Êý¾ÝÖÐ̨¼¼Êõ¼Ü¹¹·½·¨ÂÛÓëʵ¼ù
ÊÊÓÃArchiMate¡¢EA ºÍ iSpace½øÐÐÆóÒµ¼Ü¹¹½¨Ä£
ZachmanÆóÒµ¼Ü¹¹¿ò¼Ü¼ò½é
ÆóÒµ¼Ü¹¹ÈÃSOAÂ䵨
Ïà¹Ø¿Î³Ì

ÔÆÆ½Ì¨Óë΢·þÎñ¼Ü¹¹Éè¼Æ
ÖÐ̨սÂÔ¡¢ÖÐ̨½¨ÉèÓëÊý×ÖÉÌÒµ
ÒÚ¼¶Óû§¸ß²¢·¢¡¢¸ß¿ÉÓÃϵͳ¼Ü¹¹
¸ß¿ÉÓ÷ֲ¼Ê½¼Ü¹¹Éè¼ÆÓëʵ¼ù