Editor's note:
This article comes from CSDN. It covers the basics of JMS (its purpose, models, basic building blocks, and the message-sending sequence diagram) and an analysis of the ActiveMQ model.
1. Introduction to JMS
Java Message Service (JMS) is a specification proposed by Sun to unify the interfaces of the various MOM (Message-Oriented Middleware) systems. It defines two messaging models, point-to-point (PTP) and publish/subscribe (pub/sub), and provides mechanisms such as reliable message delivery, transactions, and message filtering.
Put simply, JMS defines a standard for sending messages. It is a platform-independent API, and the great majority of MOM vendors support it.
ActiveMQ is an open-source Apache project and one implementation of the JMS specification.
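2. What JMS is for
JMS lets different applications communicate, or moves data from one system to another. Two applications, or the parts of a distributed system, send messages to each other and communicate asynchronously.
There are many solutions to this kind of problem, such as SOA, socket communication, and RMI, and the choice depends on the project's constraints and its functional and performance requirements.
Typical JMS scenario: large, complex distributed systems, where the following properties of remote-invocation-style communication become limitations:
(1) Synchronous communication: after issuing a call, the client must wait for the service object to finish processing and return a result before it can continue.
(2) Tightly coupled lifecycles of client and service object: both processes must be running normally. If the service object crashes or a network failure makes the request undeliverable, the client receives an exception.
(3) Point-to-point communication: each client call is sent to exactly one target object.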

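3. JMS models
The Java Message Service application architecture supports two models:
(1) Point-to-point model (queue-based)
Each message has exactly one consumer. Producer and consumer have no timing dependency on each other. There may be many senders, but each message is consumed by only one consumer.
A message is received by one receiver, once.
The producer sends messages to a queue (Queue), which can be thought of as a TV channel.
The message middleware hosts many such channels.
The receiver does not need to subscribe. A receiver that has not yet received a message blocks.
(2) Publisher/subscriber model (topic-based)
Each message may have many consumers. Producer and consumer have a timing dependency: a consumer subscribed to a topic can consume only messages published after it subscribed.
Multiple receivers are allowed, similar to broadcasting.
The producer sends messages to a topic (Topic).
A receiver must subscribe first.
Note on durable subscribers: a durable subscriber is a special consumer that tells the topic it remains subscribed. Even if the network drops, the message server remembers every durable subscriber, so when new messages arrive it knows someone will come back to consume them.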
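
The difference between the two models can be sketched in plain Java. This is only an in-memory illustration, not the JMS API: the class names `DeliveryModels`, `SimpleQueue`, and `SimpleTopic` are invented for this example, and durable subscriptions are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Plain-Java illustration of the two delivery models (hypothetical classes, no JMS involved). */
final class DeliveryModels {

    /** Point-to-point: messages wait in the queue and each one goes to exactly one receiver. */
    static final class SimpleQueue {
        private final Queue<String> pending = new ArrayDeque<>();

        void send(String message) {
            pending.add(message);
        }

        /** Returns the next message, or null if the queue is empty. A received message is removed. */
        String receive() {
            return pending.poll();
        }
    }

    /** Publish/subscribe: each message is copied to every subscriber registered at publish time. */
    static final class SimpleTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Registers a subscriber and returns its inbox. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("order-1");                 // sent before any receiver exists
        System.out.println(queue.receive());   // the first receiver takes the message
        System.out.println(queue.receive());   // nothing is left for a second receiver

        SimpleTopic topic = new SimpleTopic();
        topic.publish("early");                // published before anyone subscribed
        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        topic.publish("news");                 // both subscribers get a copy
        System.out.println(first + " " + second);
    }
}
```

The queue holds a message until someone asks for it, which is why sender and receiver need not be running at the same time. The topic copies a message only to the inboxes that exist at publish time, which is the timing dependency described above.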

4. Basic building blocks of JMS
Connection factory: the object a client uses to create connections, for example the ActiveMQConnectionFactory provided by ActiveMQ.
Connection: a JMS Connection encapsulates a virtual connection between the JMS client and the JMS provider.
Session: a JMS Session is a single-threaded context for producing and consuming messages. It is used to create message producers, consumers, messages, and so on. A session is also a transactional context; the production of a message and the consumption of that same message cannot be part of one transaction.
Producer: MessageProducer, an object created by a Session and used to send messages.
Consumer: MessageConsumer, an object created by a Session and used to receive messages.
Message: a JMS Message consists of a header, a body, and additional extension properties. The message types defined by JMS are TextMessage, MapMessage, BytesMessage, StreamMessage, and ObjectMessage.
Destination: the object that specifies the target of the messages a client produces and the source of the messages it consumes.
Message queue: Queue, the point-to-point destination.
Message topic: Topic, the publish/subscribe destination.
5. JMS message-sending sequence diagram

6. JMS message-sending development flow
(1) Producer development flow (ProducerTool.java)
1.1 Create the Connection: create a JMS Connection from the url, user, and password.
1.2 Create the Session: create a session on top of the connection, specifying whether it is transacted and which acknowledge mode it uses.
1.3 Create the Destination object: give it the name of the corresponding subject. Producer and consumer send and receive the matching messages according to this subject.
1.4 Create the MessageProducer: create a MessageProducer for the Destination and set its delivery (persistence) mode.
1.5 Send the message to the queue (Queue): wrap the content in a TextMessage and send it with the MessageProducer's send method.
(2) Consumer development flow (ConsumerTool.java)
2.1 Implement the MessageListener interface: the consumer class must implement MessageListener and handle arriving messages in its onMessage() method.
2.2 Create the Connection: create a JMS Connection from the url, user, and password. In durable mode the connection also needs a clientId.
2.3 Create the Session and the Destination.
2.4 Create a replyProducer [optional]: it can be used to send the processing result back to the producer.
2.5 Create the MessageConsumer: create a MessageConsumer for the Destination.
2.6 Consume the message: in onMessage(), receive and process the message sent by the producer, and optionally send feedback to the producer through the replyProducer.
7. Transactions for JMS messages
(1) Creating a transaction: createSession(paramA, paramB)
paramA controls whether the session is transacted; paramB sets the acknowledgment mode.
When paramA is false, paramB can be one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, or Session.DUPS_OK_ACKNOWLEDGE.
(2) Acknowledgment
A) When paramA is true:
paramB is ignored and the JMS server sets the acknowledgment mode to SESSION_TRANSACTED. Messages are acknowledged automatically when the transaction is committed.
B) When paramA is false:
Session.AUTO_ACKNOWLEDGE is automatic acknowledgment. The session acknowledges a message automatically when the client returns successfully from receive, or when MessageListener.onMessage returns successfully.
Session.CLIENT_ACKNOWLEDGE is client acknowledgment. After receiving a message the client must call the acknowledge method of javax.jms.Message before the JMS server deletes it. (By default the acknowledgment is batched: acknowledging one message acknowledges every message the session has consumed so far.)
Session.DUPS_OK_ACKNOWLEDGE is the duplicates-allowed mode. Once the receiving application's method returns from processing the message, the session acknowledges receipt, and duplicate acknowledgment is permitted. For a duplicate message, the JMS provider must set the JMSRedelivered header field to true.
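
The rules for the two parameters can be condensed into a small helper. This is a sketch that runs without a JMS library: the class `AckModes` and its methods are invented for illustration, and the integer constants copy the values declared in `javax.jms.Session`. Rejecting an invalid mode with an `IllegalArgumentException` is a choice made for this example; a real provider may react differently.

```java
/** Hypothetical helper that mirrors the acknowledgment rules described above. */
final class AckModes {
    // Same values as the constants in javax.jms.Session.
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;

    /** Mode effectively used for createSession(transacted, requestedMode). */
    static int effectiveMode(boolean transacted, int requestedMode) {
        if (transacted) {
            return SESSION_TRANSACTED; // the requested mode is ignored
        }
        switch (requestedMode) {
            case AUTO_ACKNOWLEDGE:
            case CLIENT_ACKNOWLEDGE:
            case DUPS_OK_ACKNOWLEDGE:
                return requestedMode;
            default:
                // Assumption of this sketch: treat anything else as a programming error.
                throw new IllegalArgumentException("invalid non-transacted mode: " + requestedMode);
        }
    }

    /** True only when the application itself has to call Message.acknowledge(). */
    static boolean applicationMustAcknowledge(boolean transacted, int requestedMode) {
        return effectiveMode(transacted, requestedMode) == CLIENT_ACKNOWLEDGE;
    }

    public static void main(String[] args) {
        System.out.println(effectiveMode(true, CLIENT_ACKNOWLEDGE));
        System.out.println(applicationMustAcknowledge(false, CLIENT_ACKNOWLEDGE));
    }
}
```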
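8. How consumers consume
1) Synchronous consumption: the client explicitly pulls messages from the destination by calling the consumer's receive method. receive can block until a message arrives.
2) Asynchronous consumption: the client registers a message listener on the consumer to define what happens when a message arrives. Implement the MessageListener interface and put the message-handling logic in its onMessage() method.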
9. JMS communication mechanism

ActiveMQ supports several transport protocols, such as TCP and UDP. We use TCP, the most common one, to analyze ActiveMQ's communication mechanism. First, two concepts:
1) Client: to ActiveMQ, both message producers and message consumers are clients.
2) Message broker: the core of ActiveMQ. It receives messages, processes them, and dispatches them to message consumers.
To describe ActiveMQ's core communication mechanism clearly, we look at three parts: establishing a connection, closing a connection, and the heartbeat.
I. Initialization of TCP communication between the client and ActiveMQ:
1. When ActiveMQ starts, the TcpTransportServer class opens the TCP listening port according to the configuration. Clients initiate connections through this port.
2. Each accepted Socket is put into a blocking queue.
3. Another thread, the Socket handler, blocks waiting for new Sockets in the queue and takes them out as they appear.
4. A TransportConnection instance is created. The main job of TransportConnection is to handle the state of the link, and it implements the CommandVisitor interface to process the various kinds of messages.
5. TransportConnection uses a processing chain made of several TransportFilter instances, which handles each received message and sends the corresponding response. The typical order of the chain is MutexTransport -> WireFormatNegotiator -> InactivityMonitor -> TcpTransport. The last link is the TcpTransport class, where data is actually read from and written to the client. Its important methods are run() and oneway(): one reads, the other sends.
6. The connection is established and communication can begin.
II. Closing the connection
1. ActiveMQ detects that a TCP connection has been closed. The key code is in the TcpBufferedInputStream class:
int n = in.read(buffer, position, buffer.length - position);
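
Why that single read call is enough to notice a closed link can be shown with a short sketch. `InputStream.read` returns -1 at end of stream, which for a socket stream means the peer has closed the connection. The class `CloseDetection` and its method are hypothetical, and a `ByteArrayInputStream` stands in for the socket so the example runs without a network. It also reads from offset 0 rather than tracking a `position`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch: detect the end of a stream the way a socket reader detects a closed link. */
final class CloseDetection {

    /** Reads until the stream ends and returns how many bytes were seen before the close. */
    static int drainUntilClosed(InputStream in) {
        byte[] buffer = new byte[8];
        int total = 0;
        try {
            while (true) {
                int n = in.read(buffer, 0, buffer.length);
                if (n < 0) {
                    return total; // -1 signals end of stream: treat the link as closed
                }
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream fakeSocket = new ByteArrayInputStream(new byte[20]);
        System.out.println("bytes before close: " + drainUntilClosed(fakeSocket));
    }
}
```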
III. Heartbeat
To look after the TCP link, ActiveMQ uses a heartbeat to judge the health of the link on both sides. The heartbeat is bidirectional: broker and client each send heartbeats to the other. The handling is exactly the same on both sides and is implemented in the InactivityMonitor class, described below.
1. The heartbeat creates two threads, "InactivityMonitor ReadCheck" and "InactivityMonitor WriteCheck". Both are Timer-based and run once per fixed interval. The ReadCheck thread mainly calls readCheck(), which returns true if a message was received during the waiting period. The WriteCheck thread mainly calls writeCheck(). There is a small trick here worth noting: if any data was sent successfully while the WriteCheck thread was sleeping, then on waking it does not need to send a real heartbeat message over TCP, which reduces network traffic somewhat.
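
The write-check trick can be sketched with a single flag. This is not ActiveMQ's InactivityMonitor code: the class `WriteCheckSketch` and its members are invented for illustration, and the timer is left out so that each call to `writeCheck()` stands for one timer tick.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of "skip the keep-alive when regular traffic already went out". */
final class WriteCheckSketch {
    private final AtomicBoolean dataSentSinceLastCheck = new AtomicBoolean(false);
    private int keepAlivesSent = 0;

    /** Call whenever regular data has been written to the link. */
    void onDataSent() {
        dataSentSinceLastCheck.set(true);
    }

    /** One timer tick: send a keep-alive only if nothing else was written since the previous tick. */
    void writeCheck() {
        // getAndSet reads the flag and clears it for the next interval in one step.
        boolean trafficSeen = dataSentSinceLastCheck.getAndSet(false);
        if (!trafficSeen) {
            keepAlivesSent++; // a real transport would write a keep-alive command here
        }
    }

    int keepAlivesSent() {
        return keepAlivesSent;
    }

    public static void main(String[] args) {
        WriteCheckSketch monitor = new WriteCheckSketch();
        monitor.writeCheck();   // idle interval: keep-alive needed
        monitor.onDataSent();
        monitor.writeCheck();   // traffic already proved the link alive: no keep-alive
        System.out.println("keep-alives sent: " + monitor.keepAlivesSent());
    }
}
```

Any successful write already tells the peer that this side is alive, so the explicit heartbeat is needed only for intervals that were completely silent.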
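10. ActiveMQ model analysis
We first describe the role of each domain class in the model and then the relationships among them.
Broker: represents ActiveMQ as a whole.
RegionBroker: dispatches broker operations to the appropriate message region.
Region: ActiveMQ currently has four main message regions: the queue region (queueRegion), the topic region (topicRegion), the temporary queue region (tempQueueRegion), and the temporary topic region (tempTopicRegion).
TransportConnection: represents one communication connection.
Destination: the destination of a message. There are two main kinds, Queue and Topic.
Subscription: a message consumer or subscriber.
MessageStore: persistent message storage. More complex mechanisms such as the Kaha store live here.
PendingMessageCursor: the dispatch cursor for messages waiting to be sent to consumers.
ConnectionContext: maintains the connection context needed to send requests.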
1) ActiveMQ model analysis: static model

The relationships among these domain classes are as follows:
1. A RegionBroker owns objects for the four kinds of message region.
2. The RegionBroker owns all destination objects.
3. Each message region (Region) also owns its 0 to N destination objects.
4. Each Region also owns its 0 to N message consumers or subscribers (subscription).
5. Each destination has a corresponding persistent store (messageStore) and a dispatch cursor for messages waiting to be sent (pendingMessageCursor).
6. Consumers and destinations can each be associated with 0 to N of the other.
7. Each consumer has a ConnectionContext, which contains a TransportConnection object. The actual message is sent to the consumer through that TransportConnection.
8. A TransportConnection also acts as the communication link that listens for messages from producers, so every TransportConnection points to the Broker object.
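2) ActiveMQ model analysis: dynamic model

The dynamic model diagram shows how a producer process sends a message to the ActiveMQ process and how a consumer consumes it. The message path runs through the core domain model in these steps:
1. The producer sends the message to ActiveMQ over the TransportConnection that ActiveMQ set up for it.
2. The TransportConnection object locates the RegionBroker.
3. The RegionBroker finds the message region (Region) that matches the message type.
4. The Region finds the matching destination among its own destinations.
5, 6. The destination first persists the message if required and uses the pending message cursor object.
7. When a suitable consumer or subscriber arrives, the destination finds it.
8, 9. The message is sent to the consumer process through the TransportConnection that belongs to that consumer.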
11. ActiveMQ message dispatch cursors
A message dispatch cursor holds references to JMS messages. It is used as follows:
1. When a persistent message from a producer reaches the broker, the broker first saves it in the persistent store.
2. If there is an active consumer and it consumes messages as fast as the producer produces them, ActiveMQ passes the message directly to the dispatch queue that the broker keeps for that consumer.
3. If there is no active consumer, or the consumer cannot keep up with the producer, ActiveMQ uses Pending Message Cursors to hold references to the messages.
4. The Pending Message Cursors pass the message references on to the dispatch queue that the broker keeps for that consumer. There are two kinds of Pending Message Cursors:
VM Cursor: keeps the message references in memory.
File Cursor: keeps the message references in memory first and, once memory usage reaches its limit, writes them to a temporary file.
The storage strategy of the dispatch cursor can be configured in activemq.xml.
12. Monitoring ActiveMQ
1. ActiveMQ's built-in admin site
http://localhost:8161/admin
2. Advisory Messages
ActiveMQ supports Advisory Messages, which let us monitor the system through standard JMS messages. They provide information about the JMS provider, producers, consumers, and destinations.
3. QueueBrowser
Use QueueBrowser to preview messages and to build a monitoring interface programmatically.
13. Configuring the ActiveMQ connection URI
1. Maximum idle time of a JMS connection (when the message server has no messages)
jmsBrokerURL = tcp://218.241.100.165:61616?wireFormat.maxInactivityDuration=90000
The default value of wireFormat.maxInactivityDuration is 30000 ms.
wireFormat.maxInactivityDuration is the heartbeat parameter; a value such as wireFormat.maxInactivityDuration=0 disables the inactivity check.
This avoids the "Channel was inactive for too long" exception that ActiveMQ throws when no messages have been sent for a while.
2. maxReconnectDelay: maximum reconnect interval
failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=10000)?maxReconnectDelay=10000
failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
failover: the failover transport
maxReconnectDelay=10000: maximum reconnect interval, in milliseconds
3. Sending messages asynchronously
tcp://localhost:61616?jms.useAsyncSend=true
tcp://localhost:61616?jms.prefetchPolicy.all=100&jms.redeliveryPolicy.maximumRedeliveries=5
4. Number of messages cached on the client
tcp://localhost:61616?jms.prefetchPolicy.all=50 ## the client caches at most 50 messages
5. Client prefetch policy
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
14. Stability and fault-tolerance considerations for ActiveMQ
1. Protecting the JMS connection
Use the failover mechanism, automatic retry at intervals, and control in application code.
failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5
The failover transport is a reconnection mechanism used to build a reliable transport. With this configuration, if the ActiveMQ broker goes down, the listener side waits 100 ms before its first reconnect attempt and keeps retrying until it connects or 5 attempts have failed.
failover also lets several brokers serve at the same time, which provides load balancing and improves fault tolerance. The format is failover:(uri1,...,uriN)?transportOptions
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false
failover:(uri1,...,uriN)?transportOptions
failover:uri1,...,uriN
failover:(tcp://localhost:61616)
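
Assembling such a URI by hand is error-prone (stray spaces, `;` instead of `&`), so a small builder can help. The following is a minimal sketch in plain Java; the class `FailoverUris` and its `build` method are invented for this example and only concatenate strings in the `failover:(uri1,...,uriN)?transportOptions` format shown above. They do not validate option names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that formats a failover URI from broker URIs and transport options. */
final class FailoverUris {

    static String build(List<String> brokerUris, Map<String, String> transportOptions) {
        if (brokerUris.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        StringBuilder uri = new StringBuilder("failover:(")
                .append(String.join(",", brokerUris))
                .append(')');
        if (!transportOptions.isEmpty()) {
            // Options follow the closing parenthesis: ?key=value&key=value
            StringJoiner query = new StringJoiner("&", "?", "");
            for (Map.Entry<String, String> option : transportOptions.entrySet()) {
                query.add(option.getKey() + "=" + option.getValue());
            }
            uri.append(query.toString());
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("initialReconnectDelay", "100");
        options.put("maxReconnectAttempts", "5");
        System.out.println(build(Arrays.asList("tcp://localhost:61616"), options));
    }
}
```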
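2. JMSRedelivered
If this value is true, the message has been redelivered. This happens when the consumer did not acknowledge that it received the message, or when the JMS provider is not sure whether the consumer received it.
3. JMSExpiration
Messages are allowed to expire. setTimeToLive() sets how long a message remains valid.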
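
How a time-to-live turns into an expiration time can be sketched as follows. In JMS the JMSExpiration header is the send time plus the time-to-live, and a time-to-live of 0 yields an expiration of 0, meaning the message never expires. The class `ExpirationSketch` and its methods are hypothetical, and treating a message as expired only once the clock is strictly past the expiration time is an assumption of this sketch.

```java
/** Hypothetical sketch of how JMSExpiration relates to the time-to-live. */
final class ExpirationSketch {

    /** Expiration timestamp in milliseconds; 0 means "never expires". */
    static long expiration(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    /** Assumption: a message counts as expired once the clock has passed its expiration time. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiresAt = expiration(sentAt, 5_000); // valid for five seconds
        System.out.println(isExpired(expiresAt, sentAt + 1_000));
        System.out.println(isExpired(expiresAt, sentAt + 6_000));
    }
}
```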
15. ActiveMQ's failover reconnection mechanism
"failover:(tcp://IPAddress1:61616,tcp://IPAddress2:61616)?initialReconnectDelay=100&maxReconnectAttempts=5";
The trailing parameters initialReconnectDelay=100&maxReconnectAttempts=5 apply to every connection URI.
If no URI selection policy is specified, ActiveMQ picks one of the URIs and tries to connect to it (randomize selects randomly). Once connected, ActiveMQ manages suspending and resuming the connection.
Using the URL above as an example, failover reconnection works as follows:
a. broker1 and broker2 on IPAddress1 and IPAddress2 are both running. The Connection uses broker1 on IPAddress1 to send messages. No consumer is active yet.
b. broker1 is shut down. The Connection switches automatically to broker2's URI to send messages.
c. The consumer is started. It tries broker1 first and, because broker1 is unavailable, uses broker2 to receive messages. At this point it can receive only the messages on broker2.
d. broker1 is restarted. Producer and consumer both keep using broker2 to send and receive.
e. broker2 is shut down. Producer and consumer both switch automatically to broker1, and the consumer now receives the messages sent earlier through broker1.
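
Steps a to e can be replayed with a small simulation. It is only a sketch of the behaviour described in the walk-through, under the assumption that brokers are tried in list order (as with randomize=false) and that a connection stays on its current broker until that broker fails. The class `FailoverWalkthrough` is invented for this example and does not use ActiveMQ.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of sticky, in-order broker selection. */
final class FailoverWalkthrough {
    private final List<String> brokersInOrder;
    private final Set<String> stopped = new HashSet<>();
    private String current; // broker the connection is attached to, or null

    FailoverWalkthrough(List<String> brokersInOrder) {
        this.brokersInOrder = brokersInOrder;
    }

    void stop(String broker) {
        stopped.add(broker);
    }

    void start(String broker) {
        stopped.remove(broker);
    }

    /** Returns the broker in use: the current one while it is up, else the first running broker, else null. */
    String activeBroker() {
        if (current != null && !stopped.contains(current)) {
            return current; // no reason to switch while the link is healthy
        }
        current = null;
        for (String broker : brokersInOrder) {
            if (!stopped.contains(broker)) {
                current = broker;
                break;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        FailoverWalkthrough connection = new FailoverWalkthrough(Arrays.asList("broker1", "broker2"));
        System.out.println("a: " + connection.activeBroker());
        connection.stop("broker1");
        System.out.println("b: " + connection.activeBroker());
        connection.start("broker1");
        System.out.println("d: " + connection.activeBroker());
        connection.stop("broker2");
        System.out.println("e: " + connection.activeBroker());
    }
}
```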

16. ActiveMQ security management
1. Programmatic approach
Add permissions for messaging users through the implementation that ActiveMQ provides (the SimpleAuthenticationPlugin class).
2. Configuration approach
Configure the broker's user information in /conf/credentials.properties under the ActiveMQ installation directory, and configure the plugins in ${ACTIVEMQ_HOME}/conf/activemq.xml:
<plugins>
<simpleAuthenticationPlugin>
</simpleAuthenticationPlugin>
<authorizationPlugin>
</authorizationPlugin>
</plugins>
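17. Tuning TCP transport settings
TCP is the most commonly used ActiveMQ transport. Two of its settings, socketBufferSize and tcpNoDelay, have a significant effect on transport performance.
socketBufferSize: the size of the buffer used to send and receive data over the TCP transport. Default: 65536 bytes.
tcpNoDelay: false by default. Normally a TCP socket buffers small pieces of data before sending them. With this option enabled, messages are sent as soon as possible.
url = "failover://(tcp://localhost:61616?tcpNoDelay=true)";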
18. ActiveMQ cluster deployment
1. Multiple message providers
Use a network of brokers so that messages are stored and forwarded between brokers.
2. Multiple message consumers
ActiveMQ supports clustering of the consumers that read from the same queue. If one consumer fails, all of its unacknowledged messages are sent to the other consumers on that queue. If one consumer processes messages faster than the others, it consumes more messages.

Master/slave server
1. Roles of the master and slave servers
Master server: provides the messaging service.
Slave server: provides a backup of the messages and of the service.
2. How Pure Master Slave works
A) Server side:
The slave broker consumes all message state from the master broker, such as messages, acknowledgments, and transaction state.
The slave broker does not provide messaging service.
The master broker responds to the client only after the message has been replicated successfully to the slave broker.
When the master broker fails, the slave broker can either start its network connectors and transport connectors and provide messaging service, or stop as well.
B) Client side:
Use the failover mechanism: uri = "failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false";
3. Configuration
The master broker needs no special configuration.
The slave broker needs the following configuration:
<broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">
4. Limitations
Only one slave broker can connect to a master broker.
After the master broker fails and the slave broker becomes the master, the previous master broker can come back into service only after the current master (the former slave broker) has stopped.
19. Combining Spring and ActiveMQ
Use Spring's JMS support to configure the individual JMS components.
1 Configure the JMS connection factory
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />
2 Configure the message queue
<amq:queue name="destination" physicalName="queuename" />
3 Configure the message listener
<bean id="messageListener"
      class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="class path of the handler"></bean>
  </constructor-arg>
  <!-- the method to invoke when a JMS message is received -->
  <property name="defaultListenerMethod" value="printMyOut" />
  <!-- custom MessageConverter define -->
  <property name="messageConverter" ref="invokeMessageConverter" />
</bean>
4 Configure the message listener container
<bean id="listenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="jmsConnectionFactory" />
  <property name="destination" ref="destination" />
  <property name="messageListener" ref="messageListener" />
</bean>
5 Configure the JMS message converter
<bean id="invokeMessageConverter" class="com.components.jms.InvokeMessageConverter" />
6 Configure Spring's JMS template class (JmsTemplate)
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory">
    <ref local="jmsConnectionFactory" />
  </property>
  <property name="defaultDestinationName" value="subject" />
  <!-- selects the model: false is point-to-point, true is publish/subscribe
  <property name="pubSubDomain" value="true"/>
  -->
  <!-- custom MessageConverter -->
  <property name="messageConverter" ref="invokeMessageConverter" />
</bean>
7 Message producer and message consumer
This part is implemented by the user in code, according to business needs.