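Editor's recommendation:
This article covers the application scenarios of message middleware, its transfer modes, and an introduction to ActiveMQ.
It originally appeared on Cnblogs (博客园) and was edited and recommended by Anna of Huolongguo Software (火龙果软件).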
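As is well known, message middleware is an indispensable component of large distributed systems. It is simple to use, yet it solves quite a few hard problems, such as asynchronous processing, system decoupling, traffic peak shaving, and distributed transaction management, and so helps build systems that are high-performance, highly available, and highly scalable. This article introduces message middleware from three angles: its application scenarios, its transfer modes, and a quick start with ActiveMQ. So what are you waiting for? Let's get started!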
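Note: message middleware is very powerful and well worth studying and using seriously. The complete code is available on GitHub.
Technologies: application scenarios and communication modes of message middleware, ActiveMQ.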
Article outline (figure)

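Application scenarios of message middleware
Asynchronous processing
Asynchronous processing: after issuing a request, the caller does not get the result immediately and does not need to wait for it; it carries on with other business logic. This improves efficiency, but an asynchronous request can fail without the caller noticing, so it suits non-core business logic.
Synchronous processing: after issuing a request, the caller must wait until the result comes back and then continues based on that result. It is less efficient than asynchronous processing, but it keeps the business logic under control, so it suits core business logic.
A common scenario: to make sure registered users are genuine, a platform usually sends a verification e-mail or an SMS code after registration succeeds, and only verified users can use the platform normally.
Figure: comparison of synchronous and asynchronous processing.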

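Benefits of implementing asynchronous processing with message middleware:
1. In a traditional architecture, between submitting the registration and landing on the success page, the user has to wait for the e-mail sending logic to finish. This lengthens the response time, lowers the system's throughput, and hurts the user experience.
2. With message middleware, the e-mail logic is handled asynchronously: once registration succeeds, the system sends the data to the message middleware and redirects to the success page right away, and the e-mail is sent by another system that subscribes to the middleware.
3. Message middleware reads and writes very quickly, so the time spent there is negligible, and the system can handle more requests.
Summary: using message middleware properly to process non-core business logic asynchronously shortens response times, raises throughput, and improves the user experience.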
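The registration scenario can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: an in-memory queue stands in for the message middleware, and the class name AsyncRegistrationSketch and its methods are hypothetical, not part of the article's project. It shows the core logic (register) returning immediately while the non-core logic (sending the mail) is picked up later by a separate worker.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an in-memory queue plays the part of the message middleware.
final class AsyncRegistrationSketch {

    // Pending "send verification mail" tasks (the stand-in for a broker queue).
    private final BlockingQueue<String> mailQueue = new LinkedBlockingQueue<>();
    // Addresses the mail worker has already handled.
    private final List<String> sentMails = new CopyOnWriteArrayList<>();

    // Core logic: store the user, hand the mail task to the queue, return at once.
    String register(String email) {
        // (persisting the user record would happen here)
        mailQueue.add(email);
        return "registered:" + email;
    }

    // Non-core logic, run by a separate worker: take one task and "send" the mail.
    // Returns false when no task is waiting.
    boolean sendNextMail() {
        String email = mailQueue.poll();
        if (email == null) {
            return false;
        }
        sentMails.add(email); // a real worker would call a mail service here
        return true;
    }

    int pendingMails() {
        return mailQueue.size();
    }

    List<String> sentMails() {
        return sentMails;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncRegistrationSketch app = new AsyncRegistrationSketch();
        // The registration calls return without waiting for any mail to be sent.
        System.out.println(app.register("alice@example.com"));
        System.out.println(app.register("bob@example.com"));

        // A separate thread works through the queued mail tasks.
        Thread mailWorker = new Thread(() -> {
            while (app.sendNextMail()) {
                // keep going until the queue is empty
            }
        });
        mailWorker.start();
        mailWorker.join();
        System.out.println("mails sent: " + app.sentMails());
    }
}
```

In a real deployment the worker would live in a different process and the queue would be a broker destination, which is what makes a failed mail delivery invisible to the registration request.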
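System decoupling and eventual consistency of transactions
A distributed system is a collection of independent computers (systems). Each one is responsible for its own module, which decouples the system and keeps a single point of failure from affecting the whole. Each system can also be deployed as a cluster to further reduce the probability of failure.
What role does message middleware play in such a distributed system?
A common scenario: after an order is placed successfully, the order system has to call the warehouse system's interface to choose the best shipping warehouse and update the stock. If that call fails for whatever reason, it directly affects the ordering flow.
Figure: the important role played by message middleware.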

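System decoupling
Benefits of decoupling systems with message middleware:
1. Message middleware lowers the coupling between systems, so an exception in one system does not affect another system's business logic. Each does its own job: the order system only persists order data to the database, the warehouse system only updates the stock, and a problem in the warehouse system no longer affects the ordering flow.
2. You may have spotted a problem: placing an order and reducing the stock should really be one transaction. In a distributed setting strong consistency is hard to guarantee, so message middleware is used here to achieve eventual consistency (covered in detail in a later article).
Summary: transactional consistency certainly matters, and in theory it is perfectly logical for an order to fail when there is no stock. Real business is different: the stock can be replenished during the shipping window by purchasing or by borrowing from another warehouse. That increases sales while still keeping the transaction eventually consistent.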
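A minimal sketch of this idea, assuming an in-memory queue in place of the broker and hypothetical names throughout (EventualConsistencySketch, placeOrder, processPendingEvents): the order system never calls the warehouse directly, so an order succeeds even while the warehouse is down, and the stock catches up once the warehouse processes the queued events.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: order system and warehouse system coupled only through a queue.
final class EventualConsistencySketch {

    private final List<String> orders = new ArrayList<>();        // order system's database
    private final Queue<String> orderEvents = new ArrayDeque<>(); // stand-in for the broker queue
    private int stock;                                            // warehouse system's stock
    private boolean warehouseUp = true;

    EventualConsistencySketch(int initialStock) {
        this.stock = initialStock;
    }

    // Order system: persist the order and publish an event; no call to the warehouse.
    void placeOrder(String orderId) {
        orders.add(orderId);
        orderEvents.add(orderId);
    }

    void setWarehouseUp(boolean up) {
        this.warehouseUp = up;
    }

    // Warehouse system: handle queued events while it is up.
    // An event leaves the queue only once it has been handled.
    int processPendingEvents() {
        int processed = 0;
        while (warehouseUp && !orderEvents.isEmpty()) {
            orderEvents.remove();
            stock--;
            processed++;
        }
        return processed;
    }

    int orderCount() {
        return orders.size();
    }

    int pendingEvents() {
        return orderEvents.size();
    }

    int stock() {
        return stock;
    }

    public static void main(String[] args) {
        EventualConsistencySketch shop = new EventualConsistencySketch(10);
        shop.setWarehouseUp(false);           // the warehouse system is down
        shop.placeOrder("order-1");           // ordering still succeeds
        System.out.println("orders=" + shop.orderCount() + ", stock=" + shop.stock());
        shop.setWarehouseUp(true);            // the warehouse recovers
        shop.processPendingEvents();          // and catches up on queued events
        System.out.println("orders=" + shop.orderCount() + ", stock=" + shop.stock());
    }
}
```

Between the two print statements the two systems disagree (one order, stock unchanged); after the queued event is processed they agree again, which is the eventual part of eventual consistency.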
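Traffic peak shaving
Traffic peak shaving is also referred to as rate limiting. During flash sales and rush-purchase events, message middleware is commonly used to limit traffic so that the rest of the system stays usable and a sudden surge does not overwhelm it. The front-end page can show a "waiting in line" notice; even if the user experience suffers, the system must not go down.
Summary: rate limiting keeps the system stable when traffic surges. An outage hands competitors something to laugh about.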
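The buffering idea can be sketched with a bounded queue. This is an assumption-laden illustration, not how any particular broker implements it: PeakShavingSketch, submit, and drain are hypothetical names, and the capacity and batch size are arbitrary. Requests beyond the buffer's capacity get a "please wait in line" answer, while the back end takes work off the buffer at its own pace.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a bounded in-memory buffer absorbs a burst of requests.
final class PeakShavingSketch {

    private final BlockingQueue<String> buffer;

    PeakShavingSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Front end: accept the request if the buffer has room, otherwise ask the user to wait.
    String submit(String request) {
        return buffer.offer(request) ? "accepted" : "please wait in line";
    }

    // Back end: handle at most batchSize requests per call, however large the burst was.
    int drain(int batchSize) {
        int handled = 0;
        while (handled < batchSize && buffer.poll() != null) {
            handled++;
        }
        return handled;
    }

    int backlog() {
        return buffer.size();
    }

    public static void main(String[] args) {
        PeakShavingSketch sale = new PeakShavingSketch(3);
        // A burst of five requests hits a buffer that holds three.
        for (int i = 1; i <= 5; i++) {
            System.out.println("request " + i + ": " + sale.submit("request-" + i));
        }
        // The back end works through the backlog two requests at a time.
        System.out.println("handled " + sale.drain(2) + ", backlog " + sale.backlog());
    }
}
```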
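Transfer modes of message middleware
Besides the point-to-point and publish/subscribe modes that message middleware supports natively, a two-way response (request/reply) mode is widely used in practice.
Point-to-point (P2P) mode
The point-to-point (P2P) mode has three roles: the message queue (Queue), the sender (Sender), and the receiver (Receiver). The sender sends a message to a specific queue, where it waits until a receiver fetches and consumes it.
Three characteristics of P2P:
1. Each message can be consumed by only one receiver, and by default it is removed from the queue once consumed (other acknowledgement mechanisms allow it to be consumed again).
2. The sender and the receiver do not depend on each other: the producer and the consumer do not need to be running at the same time.
3. After successfully receiving a message, the receiver must send the queue an acknowledgement.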
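The three characteristics can be illustrated with a small in-memory model. It is a sketch only, loosely modelled on client acknowledgement and recovery in JMS; PointToPointSketch and its methods are hypothetical names, not a real API. A message is handed to exactly one caller, it can be sent before any receiver exists, and it is dropped for good only after the receiver acknowledges it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a point-to-point queue with explicit acknowledgement.
final class PointToPointSketch {

    private final Deque<String> queue = new ArrayDeque<>();
    // Messages that were delivered but not yet acknowledged.
    private final Deque<String> unacknowledged = new ArrayDeque<>();

    // Sender side: works even when no receiver is running yet.
    void send(String message) {
        queue.addLast(message);
    }

    // Receiver side: each message is handed to exactly one caller and parked
    // until it is acknowledged. Returns null when the queue is empty.
    String receive() {
        String message = queue.pollFirst();
        if (message != null) {
            unacknowledged.addLast(message);
        }
        return message;
    }

    // The receiver confirms success: parked messages are dropped for good.
    void acknowledge() {
        unacknowledged.clear();
    }

    // The receiver failed: parked messages return to the front of the queue,
    // in their original order, so that they are delivered again.
    void recover() {
        while (!unacknowledged.isEmpty()) {
            queue.addFirst(unacknowledged.pollLast());
        }
    }

    public static void main(String[] args) {
        PointToPointSketch q = new PointToPointSketch();
        q.send("m1");                      // sent before any receiver exists
        q.send("m2");
        System.out.println(q.receive());   // first delivery of m1
        q.recover();                       // processing failed, no acknowledgement
        System.out.println(q.receive());   // m1 is delivered again
        q.acknowledge();                   // now m1 is gone for good
        System.out.println(q.receive());   // m2
    }
}
```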

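Publish/subscribe (Pub/Sub) mode
The publish/subscribe (Pub/Sub) mode also has three roles: the topic (Topic), the publisher (Publisher), and the subscriber (Subscriber). The publisher sends messages to a topic, and the system delivers them to the subscribers.
Characteristics of Pub/Sub:
1. Each message can be consumed by multiple subscribers.
2. Publishers and subscribers depend on each other in time: a subscriber must subscribe to the topic before it can receive anything, and it never receives messages published before it subscribed.
3. Non-durable subscription: messages published while the subscriber is offline are lost to it, even after it comes back online.
4. Durable subscription: once the subscriber has subscribed, messages published while it is offline are delivered when it comes back online.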
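The difference between durable and non-durable subscriptions can be sketched with an in-memory topic. The names PubSubSketch and Subscription are hypothetical, and the model is deliberately simplified: a durable subscriber's inbox simply keeps filling while it is offline, whereas a real broker would store the messages and deliver them on reconnect.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a topic with durable and non-durable subscriptions.
final class PubSubSketch {

    // One subscription: whether it is durable, whether it is online, and what it received.
    static final class Subscription {
        final boolean durable;
        boolean online = true;
        final List<String> inbox = new ArrayList<>();

        Subscription(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    // A subscriber only sees messages published after this call.
    Subscription subscribe(String name, boolean durable) {
        Subscription subscription = new Subscription(durable);
        subscriptions.put(name, subscription);
        return subscription;
    }

    // Every subscription gets its own copy. An offline subscriber keeps
    // collecting messages only if its subscription is durable.
    void publish(String message) {
        for (Subscription subscription : subscriptions.values()) {
            if (subscription.online || subscription.durable) {
                subscription.inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        PubSubSketch topic = new PubSubSketch();
        topic.publish("early");                        // nobody has subscribed yet
        Subscription plain = topic.subscribe("plain", false);
        Subscription durable = topic.subscribe("durable", true);
        topic.publish("m1");
        plain.online = false;
        durable.online = false;
        topic.publish("m2");                           // published while both are offline
        plain.online = true;
        durable.online = true;
        topic.publish("m3");
        System.out.println("non-durable: " + plain.inbox);
        System.out.println("durable:     " + durable.inbox);
    }
}
```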

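Two-way response mode
The two-way response mode is not a communication mode provided by the message middleware itself; it is a refinement built on the existing modes to meet the needs of real production environments. In it, the sender of a message is also a receiver, and the receiver is also a sender. Figure: the two-way response mode.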
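One common way to build this on top of plain queues is a request queue plus a reply queue, with a correlation id tying each reply to its request. The sketch below assumes exactly that arrangement; RequestReplySketch, its Message class, and the "echo:" reply are hypothetical and only illustrate the message flow.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: request/reply built from two ordinary queues.
final class RequestReplySketch {

    // A message carries a correlation id so a reply can be matched to its request.
    static final class Message {
        final int correlationId;
        final String body;

        Message(int correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final Queue<Message> requestQueue = new ArrayDeque<>();
    private final Queue<Message> replyQueue = new ArrayDeque<>();
    private int nextId = 1;

    // Requester acting as a sender: returns the id to look for in the reply.
    int sendRequest(String body) {
        int id = nextId++;
        requestQueue.add(new Message(id, body));
        return id;
    }

    // Responder: receives one request (receiver role) and sends a reply (sender role).
    // Returns false when no request is waiting.
    boolean serveOne() {
        Message request = requestQueue.poll();
        if (request == null) {
            return false;
        }
        replyQueue.add(new Message(request.correlationId, "echo:" + request.body));
        return true;
    }

    // Requester acting as a receiver: returns the next reply, or null if none.
    Message receiveReply() {
        return replyQueue.poll();
    }

    public static void main(String[] args) {
        RequestReplySketch channel = new RequestReplySketch();
        int id = channel.sendRequest("3+4");
        channel.serveOne();
        Message reply = channel.receiveReply();
        System.out.println("request " + id + " got reply " + reply.correlationId + ": " + reply.body);
    }
}
```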

Getting started with ActiveMQ
ActiveMQ is an open-source message bus from Apache. It is simple to use, capable enough for most business needs, and it supports the JMS specification.
JMS (Java Message Service) is a standard API for messaging that lets application components on the Java EE platform create, send, receive, and read messages. It makes distributed communication more loosely coupled, more reliable, and asynchronous.
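Installing ActiveMQ
Installing ActiveMQ takes three simple steps:
Step 1: Download ActiveMQ.
Step 2: Run it. Unpack the archive, go to the bin directory, find the wrapper.exe that matches your operating system (32-bit or 64-bit), and double-click it.
Step 3: Visit the console. Open http://localhost:8161/admin in a browser; the username and password are both admin.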
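ActiveMQ workflow
We use the simple P2P mode to walk through how ActiveMQ works.
Main steps for a producer to send messages to the MQ:
Step 1: create a connection factory instance
Step 2: create a connection and start it
Step 3: obtain the interface for operating on messages (the Session)
Step 4: create the destination, i.e. a Queue or a Topic
Step 5: create the message producer
Step 6: send the messages and release the resources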
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Message queue producer
 * @author itdragon
 */
public class ITDragonProducer {

    private static final String QUEUE_NAME = "ITDragon.Queue";

    public static void main(String[] args) {
        // ConnectionFactory: the factory JMS uses to create connections
        ConnectionFactory connectionFactory = null;
        // Connection: the link between the client and the JMS system
        Connection connection = null;
        // Session: a single-threaded context for sending or receiving messages
        Session session = null;
        // Destination: where the messages are sent
        Destination destination = null;
        // MessageProducer: the message producer
        MessageProducer producer = null;
        try {
            // step1 Build the ConnectionFactory from a user name, a password and the
            // broker address; the default address is "tcp://localhost:61616"
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
            // step2 Let the factory create a connection
            connection = connectionFactory.createConnection();
            // step3 Start the connection
            connection.start();
            // step4 Obtain the session
            /*
             * First argument: whether the session is transacted. If true, the second
             * argument is ignored and nothing is sent until commit() is called.
             * Second argument: the acknowledge mode.
             * AUTO_ACKNOWLEDGE (the default): the session acknowledges each message
             * automatically; the client needs no extra work when sending or receiving.
             * CLIENT_ACKNOWLEDGE: the client must call acknowledge() after receiving a
             * message; only then does the JMS server delete it.
             * DUPS_OK_ACKNOWLEDGE: lazy acknowledgement that tolerates duplicate deliveries.
             */
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // step5 Create the queue that serves as the destination
            destination = session.createQueue(QUEUE_NAME);
            // step6 Create a producer for the destination
            producer = session.createProducer(destination);
            // step7 Send non-persistent messages; use PERSISTENT for persistent delivery
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // step8 Send the messages (the actual business logic), then commit the transaction
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Closing the connection also closes its sessions and producers
                if (null != connection) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        String[] operators = {"+", "-", "*", "/"};
        // One Random for the whole loop: re-seeding with the current millisecond on
        // every iteration would usually yield the same expression five times
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            String expression = random.nextInt(10) + operators[random.nextInt(4)]
                    + (random.nextInt(10) + 1);
            TextMessage message = session.createTextMessage(expression);
            // Send the message to the destination
            producer.send(message);
            System.out.println("Queue Sender ---------> " + expression);
        }
    }
}
The steps for a consumer to fetch and consume data from the MQ are similar; sending messages is simply replaced by receiving them.
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.itdragon.utils.ITDragonUtil;

/**
 * Message queue consumer
 * @author itdragon
 */
public class ITDragonConsumer {

    // Must match the queue name used by the sender
    private static final String QUEUE_NAME = "ITDragon.Queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        // MessageConsumer: the message consumer
        MessageConsumer consumer = null;
        try {
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(QUEUE_NAME);
            consumer = session.createConsumer(destination);
            // Keep receiving messages. receive() blocks until a message arrives and
            // returns null only if the consumer is closed, which ends the loop
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                if (null != message) {
                    System.out.println(ITDragonUtil.cal(message.getText()));
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
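The consumer above calls ITDragonUtil.cal, which the article does not show. The class below is a hypothetical stand-in, not the author's implementation: it assumes the messages have the form produced by the producer (a non-negative integer, one of + - * /, and an integer from 1 to 10) and that returning a Double is acceptable.

```java
// Hypothetical stand-in for the unshown ITDragonUtil.cal helper.
final class ExpressionCalculatorSketch {

    // Evaluates "<number><operator><number>", where the operator is one of + - * /.
    static Double cal(String expression) {
        String text = expression.trim();
        // Start at index 1 so that a leading sign is not mistaken for the operator.
        for (int i = 1; i < text.length(); i++) {
            char operator = text.charAt(i);
            if (operator == '+' || operator == '-' || operator == '*' || operator == '/') {
                double left = Double.parseDouble(text.substring(0, i));
                double right = Double.parseDouble(text.substring(i + 1));
                switch (operator) {
                    case '+': return left + right;
                    case '-': return left - right;
                    case '*': return left * right;
                    default:  return left / right;
                }
            }
        }
        throw new IllegalArgumentException("No operator found in: " + expression);
    }

    public static void main(String[] args) {
        // The same kinds of expressions the producer generates.
        System.out.println(cal("3+4"));
        System.out.println(cal("8/4"));
        System.out.println(cal("2-10"));
    }
}
```

Because the producer always picks the right-hand operand from 1 to 10, division by zero does not occur with its messages; input from other sources would need extra validation.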
Using ActiveMQ with Spring Boot
Spring Boot helps us set up projects quickly and cuts the effort of configuring third-party integrations for Spring. Integrating ActiveMQ is just as simple and takes two steps:
Step 1: add the dependencies to pom.xml to enable ActiveMQ support.
Step 2: configure the ActiveMQ connection parameters in application.properties.
pom.xml is the core configuration file of a Maven project.
<dependency>
    <!-- ActiveMQ support -->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
    <!-- Enables the ActiveMQ connection pool -->
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>
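application.properties is the core parameter configuration file of a Spring Boot project.
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=true
spring.activemq.in-memory defaults to true, which means no ActiveMQ server needs to be installed and an in-memory broker is used. Because broker-url is set explicitly here, Spring Boot connects to that broker and ignores the in-memory setting.
spring.activemq.pool.enabled makes connections go through a connection pool.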
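springboot-activemq-producer
The springboot-activemq-producer project simulates the system where the producer lives; it sends messages in both point-to-point and publish/subscribe mode.
Two core files: a message sending class and a configuration class that manages the queue beans.
Three main modes: point-to-point, with the queue named "queue.name"; publish/subscribe, with the topic named "topic.name"; and two-way response, with the queue named "response.name".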
import java.util.Random;
import javax.jms.Queue;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

/**
 * Message queue producer
 * @author itdragon
 */
@Service
@EnableScheduling
public class ITDragonProducer {

    private static final String[] OPERATORS = {"+", "-", "*", "/"};
    // One shared Random: re-seeding with the current millisecond inside each loop
    // iteration would usually yield the same expression five times
    private final Random random = new Random();

    @Autowired
    private JmsMessagingTemplate jmsTemplate;
    @Autowired
    private Queue queue;
    @Autowired
    private Topic topic;
    @Autowired
    private Queue responseQueue;

    /**
     * Point-to-point (P2P) mode test.
     * Three roles: the message queue (Queue), the sender (Sender), the receiver (Receiver).
     * The sender sends a message to a specific queue, which keeps it until a receiver fetches it.
     */
    @Scheduled(fixedDelay = 5000)
    public void testP2PMQ() {
        for (int i = 0; i < 5; i++) {
            String expression = randomExpression();
            jmsTemplate.convertAndSend(this.queue, expression);
            System.out.println("Queue Sender ---------> " + expression);
        }
    }

    /**
     * Publish/subscribe (Pub/Sub) mode test.
     * Three roles: the topic (Topic), the publisher (Publisher), the subscriber (Subscriber).
     * Publishers send messages to the topic, and the system delivers them to the subscribers.
     */
    @Scheduled(fixedDelay = 5000)
    public void testPubSubMQ() {
        for (int i = 0; i < 5; i++) {
            String expression = randomExpression();
            jmsTemplate.convertAndSend(this.topic, expression);
            System.out.println("Topic Sender ---------> " + expression);
        }
    }

    /**
     * Two-way response mode test.
     * P2P and Pub/Sub are the two modes the MQ provides by default; the two-way response
     * mode builds on them. The sender is also a receiver, and the receiver is also a sender.
     */
    @Scheduled(fixedDelay = 5000)
    public void testReceiveResponseMQ() {
        for (int i = 0; i < 5; i++) {
            jmsTemplate.convertAndSend(this.responseQueue, randomExpression());
        }
    }

    // Receives the data the consumer sends back on the P2P queue "out.queue"
    @JmsListener(destination = "out.queue")
    public void receiveResponse(String message) {
        System.out.println("Producer Response Receiver ---------> " + message);
    }

    // Builds a random expression such as "3*7"; the right operand is 1 to 10
    private String randomExpression() {
        return random.nextInt(10) + OPERATORS[random.nextInt(4)] + (random.nextInt(10) + 1);
    }
}
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Bean configuration class
 * @author itdragon
 */
@Configuration
public class ActiveMQBeansConfig {

    @Bean // Point-to-point queue named "queue.name"
    public Queue queue() {
        return new ActiveMQQueue("queue.name");
    }

    @Bean // Topic named "topic.name"
    public Topic topic() {
        return new ActiveMQTopic("topic.name");
    }

    @Bean // Two-way response queue named "response.name"
    public Queue responseQueue() {
        return new ActiveMQQueue("response.name");
    }
}
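springboot-activemq-consumer
springboot-activemq-consumer simulates the server where the consumer lives; its main job is to listen for queue messages.
Two core files: a message receiving class, and a listener container factory configuration class that supports both the point-to-point and the publish/subscribe mode.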
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
import com.itdragon.utils.ITDragonUtil;

/**
 * Message queue consumer
 * @author itdragon
 */
@Service
public class ITDragonConsumer {

    // 1. Listens on the point-to-point queue named "queue.name"
    @JmsListener(destination = "queue.name", containerFactory = "queueListenerFactory")
    public void receiveQueue(String text) {
        System.out.println("Queue Receiver : " + text + " \t result : " + ITDragonUtil.cal(text));
    }

    // 2. First subscriber on the publish/subscribe topic named "topic.name"
    @JmsListener(destination = "topic.name", containerFactory = "topicListenerFactory")
    public void receiveTopicOne(String text) {
        System.out.println(Thread.currentThread().getName()
                + " No.1 Topic Receiver : " + text
                + " \t result : " + ITDragonUtil.cal(text));
    }

    // 2. Second subscriber on the same topic; each subscriber gets its own copy
    @JmsListener(destination = "topic.name", containerFactory = "topicListenerFactory")
    public void receiveTopicTwo(String text) {
        System.out.println(Thread.currentThread().getName()
                + " No.2 Topic Receiver : " + text
                + " \t result : " + ITDragonUtil.cal(text));
    }

    // 3. Listens on the two-way response queue named "response.name";
    //    the return value is sent on to "out.queue"
    @JmsListener(destination = "response.name")
    @SendTo("out.queue")
    public String receiveResponse(String text) {
        System.out.println("Response Receiver : " + text + " \t returning data......");
        return ITDragonUtil.cal(text).toString();
    }
}
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration
@EnableJms
public class JmsConfig {

    @Bean // Listener container factory for Pub/Sub mode
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean // Listener container factory for P2P mode, which is the JMS default
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}
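Summary
Message middleware addresses asynchronous processing, system decoupling, traffic peak shaving, and distributed transaction management.
Message middleware natively supports the point-to-point and publish/subscribe modes; in practice the two-way response mode can be used as well.
ActiveMQ is an open-source message bus from Apache that is simple to use, powerful, and able to handle most business needs.
That wraps up this article on enterprise use of message middleware. If it helped you, feel free to click "Recommend" or "Follow" me for more. Planned follow-up posts: using RocketMQ and Kafka, and setting up ZooKeeper and related clusters. If anything in the article is wrong or imprecise, please point it out.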