Editor's note:
This article comes from CSDN. It gives a brief overview of ActiveMQ by walking through its installation, usage, setup, and related topics.
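I. Background
1.1 Java Message Service:
Exchanging information between different systems is a common scenario in development. For example, system A needs to send data to system B. How should we handle this?
In 1999, the former Sun Microsystems led the proposal of a message-oriented middleware service: the JMS specification (standard). Five commonly used technologies for exchanging information between systems are httpClient, hessian, dubbo, jms, and webservice.
1.2 JMS overview:
JMS, short for Java Message Service, is one of the Java EE standards/specifications. The specification is built around asynchronous, non-blocking messaging: the sender returns as soon as it has sent a message and does not wait for the receiver to finish with it, so sender and receiver do not affect each other. This reduces or removes system bottlenecks, decouples systems from one another, and improves overall scalability and flexibility. JMS is only a set of standard APIs defined in Java EE. It is not itself a messaging system; it is an abstraction of messaging services, meaning that it defines the messaging interfaces but provides no concrete implementation.
1.3 ActiveMQ overview:
As noted, JMS is only a set of specifications and interfaces for messaging, with no concrete implementation. ActiveMQ is a concrete implementation of the JMS specification. It is an Apache project written in Java and a very popular open-source message server.
1.4 How ActiveMQ relates to JMS:
JMS only defines a set of specifications and standards for messaging, that is, a set of interfaces, without implementing them. Just as JDBC abstracts access to relational databases, JPA abstracts object-relational mapping, and JNDI abstracts access to naming and directory services, the concrete implementations of JMS are provided by different message-middleware vendors. Apache ActiveMQ is one such implementation: Apache ActiveMQ is a messaging system, whereas JMS is not.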

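II. Using ActiveMQ
2.1 Setting up the ActiveMQ environment:
1. ActiveMQ needs Java to run, so first configure the Java environment variables;
2. Change into the extracted ActiveMQ directory before starting it: cd /usr/local/apache-activemq-5.15.2
3. Change into the bin directory. Start: ./activemq start ; stop: ./activemq stop
4. After startup two ports are in use: the web console on 8161 and the broker connection port for the message service on 61616
5. Web admin console URL: http://localhost:8161 The default login is account admin, password admin. Note: the Linux firewall must be turned off first. Message traffic can be inspected in real time at this address, as shown in the figure below: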

Message broker URL: tcp://localhost:61616
2.2 Overall design of the Java message queue (JMS)
2.2.1 Basic elements: 1. producer; 2. consumer; 3. message broker

2.2.2 Interaction model:
2.2.3 The two JMS messaging modes
Point-to-point (Point-to-Point): used specifically for delivering messages through a Queue. A point-to-point message on a Queue can be consumed by only one consumer: if several consumers are registered on the same queue and the producer sends one message, only one of them receives it, not all of them.
Publish/subscribe (Publish/Subscribe): used specifically for delivering messages through a Topic. A message published to a topic can be consumed by multiple consumers: every consumer subscribed to the topic receives each message the producer sends.
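The difference between the two modes can be illustrated with a small in-memory model. This is only a sketch in plain Java, not the JMS API: the class `DeliveryModelSketch` and its methods are hypothetical names, and the round-robin choice of consumer is an assumption (a real broker decides which queue consumer gets each message).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy in-memory model of the two JMS delivery modes (not the JMS API). */
class DeliveryModelSketch {

    /** Point-to-point: each message ends up in exactly one consumer's inbox. */
    static List<List<String>> deliverQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            // Round-robin is an assumption made for this sketch only.
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    /** Publish/subscribe: every subscriber's inbox receives every message. */
    static List<List<String>> deliverTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int n) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> msgs = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("queue: " + deliverQueue(msgs, 2)); // [[m1, m3], [m2, m4]]
        System.out.println("topic: " + deliverTopic(msgs, 2)); // [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```

With two consumers, the queue splits the four messages between them, while the topic hands all four to both.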
2.3 Overview of the JMS API:
2.3.1 JMS API overview
The JMS API can be divided into three main parts:
1. Common API: used to send messages to, or receive messages from, either a queue or a topic;
2. Point-to-point API: used specifically for delivering messages through a Queue;
3. Publish/subscribe API: used specifically for delivering messages through a Topic.
JMS common API:
Within the common API, the main interfaces involved in sending and receiving messages are: ConnectionFactory / Connection / Session / Message / Destination / MessageProducer / MessageConsumer. They relate as follows: a ConnectionFactory creates a Connection, a Connection creates a Session, and a Session creates Message, MessageProducer, and MessageConsumer objects.
JMS point-to-point API:
The point-to-point (p2p) messaging API consists of the queue-based interfaces within the JMS API: QueueConnectionFactory / QueueConnection / QueueSession / Message / Queue / QueueSender / QueueReceiver.
As the names show, most of these interfaces simply add the word Queue in front of the corresponding common API name. Generally speaking, applications that use the point-to-point model use the queue-based API rather than the common API.
JMS publish/subscribe API:
The publish/subscribe messaging API consists of the topic-based interfaces within the JMS API: TopicConnectionFactory / TopicConnection / TopicSession / Message / Topic / TopicPublisher / TopicSubscriber. Because the topic-based API closely mirrors the queue-based API, in most cases the word Queue is simply replaced by Topic.
2.4 ActiveMQ point-to-point send and receive example

2.4.1 A simple example:
Let us write a demo that sends and receives a text message point-to-point through a Queue. First the sender:
package com.kinglong.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

/**
 * Message sender
 */
public class Sender {
    /** Connection URL of the message broker */
    public static final String BROKER_URL = "tcp://192.168.174.129:61616";

    public static void main(String[] args) {
        Sender sender = new Sender();
        sender.sendMessage("Hello ActiveMQ.");
    }

    /**
     * Sends a message
     *
     * @param msg text of the message to send
     */
    public void sendMessage(String msg) {
        Connection connection = null;
        Session session = null;
        MessageProducer messageProducer = null;
        try {
            // 1. Create a connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2. Create a connection
            connection = connectionFactory.createConnection();
            // 3. Create a session
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 4. Create the message; here a text message
            Message message = session.createTextMessage(msg);
            // 5. Create a destination
            Destination destination = session.createQueue("myQueue");
            // 6. Create a message producer (sender)
            messageProducer = session.createProducer(destination);
            // 7. Send the message
            messageProducer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                // Close everything to release resources
                if (null != messageProducer) {
                    messageProducer.close();
                }
                if (null != session) {
                    session.close();
                }
                if (null != connection) {
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
Then the receiver:
package com.kinglong.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Receiver {
    /** Connection URL of the message broker */
    public static final String BROKER_URL = "tcp://192.168.174.129:61616";

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        receiver.receiveMessage();
    }

    /**
     * Receives a message
     */
    public void receiveMessage() {
        Connection connection = null;
        Session session = null;
        MessageConsumer messageConsumer = null;
        try {
            // 1. Create a connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2. Create a connection
            connection = connectionFactory.createConnection();
            // 3. Create a session
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 4. Create a destination
            Destination destination = session.createQueue("myQueue");
            // 5. Create a message consumer (receiver)
            messageConsumer = session.createConsumer(destination);
            // The connection must be started before messages can be received
            connection.start();
            // 6. Receive a message (blocks until one arrives)
            Message message = messageConsumer.receive();
            // Check the message type
            if (message instanceof TextMessage) { // is it a text message?
                String text = ((TextMessage) message).getText();
                System.out.println("Received message: " + text);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                // Close everything to release resources
                if (null != messageConsumer) {
                    messageConsumer.close();
                }
                if (null != session) {
                    session.close();
                }
                if (null != connection) {
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
×ܽá:
´´½¨¹ý³ÌÖÐÓм¸¸öÖØÒªµÄ×¢Òâµã,˵Ã÷Ò»ÏÂ:
1.
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// ÆäÖÐ:Boolean.FALSE±íʾ±¾´Î»á»°²»¿ªÆôÊÂÎñ¹ÜÀí,¼ÙÈçÐèÒª¿ªÆôÊÂÎñ¹ÜÀí,½«Æä¸ÄΪBoolean.TRUE¼´¿É
//ͬʱÐèÒªÔÚ·¢ËÍÏûÏ¢ºóÌí¼Ósession.commit(),·ñÔò,ÏûÏ¢ÊDz»»á±»Ìá½»µÄ.
//Session.AUTO_ACKNOWLEDGE±íʾÏûϢȷÈÏ»úÖÆ
AUTO_ACKNOWLEDGE:×Ô¶¯È·ÈÏ
CLIENT_ACKNOWLEDGE:¿Í»§¶ËÈ·ÈÏ
SESSION_TRANSACTED:ÊÂÎñÈ·ÈÏ,Èç¹ûʹÓÃÊÂÎñÍÆ¼öʹÓøÃÈ·ÈÏ»úÖÆ
AUTO_ACKNOWLEDGE:ÀÁɢʽȷÈÏ,ÏûϢż¶û²»»á±»È·ÈÏ,Ò²¾ÍÊÇÏûÏ¢¿ÉÄÜ»á±»ÖØ¸´·¢ËÍ.µ«·¢ÉúµÄ¸ÅÂʺÜС
2. connection.start();
//ÔÚÏûÏ¢½ÓÊÕ¶Ë,½ÓÊÜÏûϢǰÐèÒª¼ÓÈëÕâ¶Î´úÂë,¿ªÆôÁ¬½Ó,·ñÔòÒ»ÑùÎÞ·¨»ñÈ¡ÏûÏ¢.
3. Destination destination = session.createQueue("myQueue");
//´´½¨Ä¿µÄµØÊ±,Èç¹û×ö²âÊÔÊÕ²»µ½ÐÅÏ¢,¿ÉÒÔ½«Ä¿µÄµØÃû³ÆÐÞ¸ÄÒ»ÏÂ,ÎÒÓõÄÊÇIDEA,²»Çå³þΪºÎ,
//ÓÐʱºòÊÕ²»µ½ÐÅÏ¢,ÐÞ¸ÄһϾͺÃÁË,²Â²â¿ÉÄÜÊÇ»º´æµÄÔÒò°É |
·¢²¼Óë¶©ÔĵÄtopic·½Ê½Êµ¼ÊÓëµã¶ÔµãµÄqueue·½Ê½,´úÂëͨÓúܶà,Ö»ÊÇÔÚ´´½¨Ä¿µÄµØDestinationʱºò´´½¨Îª
Destination
destination = session.createTopic("myTopic |
2.4.2 Queue vs. Topic

2.4.3 Pull mode and push mode
a. With point-to-point messaging, if no consumer is listening on the queue, messages stay in the queue until a consumer connects to it. This is traditionally a pull model, or polling model: messages are not pushed to consumers automatically; instead, the consumer requests them from the queue (pull mode).
b. The pub/sub model is essentially a push model. Messages are broadcast automatically, and consumers do not need to request or poll the topic to obtain new messages.
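As a rough illustration of the pull/push distinction, here is a minimal in-memory sketch in plain Java. It does not use JMS; `PullPushSketch`, `PullQueue`, and `PushTopic` are hypothetical names chosen for this example. The queue holds messages until a consumer asks for one, while the topic calls its subscribers at publish time, so a subscriber that registers late misses earlier messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy model of pull (queue) versus push (topic) delivery; not the JMS API. */
class PullPushSketch {

    /** Pull: messages wait in the queue until a consumer asks for one. */
    static class PullQueue {
        private final Queue<String> pending = new ArrayDeque<>();

        void send(String message) {
            pending.add(message);
        }

        /** Returns the next waiting message, or null if the queue is empty. */
        String receive() {
            return pending.poll();
        }
    }

    /** Push: the topic invokes every current subscriber when a message is published. */
    static class PushTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> listener) {
            subscribers.add(listener);
        }

        void publish(String message) {
            // Nothing is stored: only subscribers registered right now see the message.
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }
}
```

In this model a message sent to `PullQueue` before any consumer exists is still there when `receive()` is eventually called, whereas a message published to `PushTopic` before `subscribe` is simply gone.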
2.5 ActiveMQ message types
1. TextMessage: a message whose payload is a java.lang.String; used to exchange string data;
2. ObjectMessage: a message whose payload is a serializable Java object; used to exchange Java objects;
3. MapMessage: a message whose payload is a set of key/value pairs. The values must be Java primitive types (or their wrapper classes) or String, that is: byte, short, int, long, float, double, char, boolean, String;
4. BytesMessage: a message whose payload is a stream of raw bytes;
5. StreamMessage: a message whose payload is a stream of primitive values. It preserves the types written to the stream, so each value must be read back as the same type it was written with.
Note: when testing object messages in a demo, the connection setup used earlier may fail to receive messages because of security restrictions. To test object messages, create the connection factory as follows instead (the approach recommended on the official site):
// Create a connection factory for object messages (requires java.util.List and java.util.ArrayList)
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
List<String> list = new ArrayList<String>();
list.add("com.kinglong.activemq.receiver");
list.add("com.kinglong.activemq.model");
// Only classes from these packages may be deserialized from an ObjectMessage
activeMQConnectionFactory.setTrustedPackages(list);
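2.6 ActiveMQ transacted and non-transacted messages
Messages are either transacted or non-transacted.
1. Transacted: create the Session with transacted=true
connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
2. Non-transacted: create the Session with transacted=false
connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
With a transacted session you must call session.commit() explicitly after sending or receiving messages.
Transacted messages are acknowledged automatically whatever acknowledgement mode is set; the configured mode has no effect. The official recommendation is nevertheless to use the transactional acknowledgement mode for transacted sessions.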
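The effect of the transacted flag on sending can be pictured with a small in-memory model. This is a sketch in plain Java under simplifying assumptions, not ActiveMQ code: `TransactedSessionSketch` is a hypothetical class, and the "broker" is just a list. Messages sent in a transacted session stay invisible until `commit()`, and `rollback()` discards them.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transacted versus non-transacted session (not the JMS API). */
class TransactedSessionSketch {
    private final List<String> broker = new ArrayList<>();  // messages visible to consumers
    private final List<String> pending = new ArrayList<>(); // sent but not yet committed
    private final boolean transacted;

    TransactedSessionSketch(boolean transacted) {
        this.transacted = transacted;
    }

    void send(String message) {
        if (transacted) {
            pending.add(message); // held back until commit()
        } else {
            broker.add(message);  // visible immediately
        }
    }

    void commit() {
        requireTransacted();
        broker.addAll(pending);
        pending.clear();
    }

    void rollback() {
        requireTransacted();
        pending.clear();
    }

    /** Snapshot of the messages consumers could currently see. */
    List<String> delivered() {
        return new ArrayList<>(broker);
    }

    private void requireTransacted() {
        if (!transacted) {
            throw new IllegalStateException("session is not transacted");
        }
    }
}
```

Forgetting `commit()` in this model leaves `delivered()` empty, which mirrors the symptom described above: the message is sent but never becomes available.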
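2.7 ActiveMQ message acknowledgement
A message counts as successfully consumed only after it has been acknowledged; only then is it removed from the queue or topic. Successful consumption normally involves three stages:

(1) the client receives the message; (2) the client processes the message; (3) the message is acknowledged.
The acknowledgement modes were mentioned briefly above. There are four:
(1) Session.AUTO_ACKNOWLEDGE: the session acknowledges the message automatically when the client (consumer) returns successfully from receive() or from MessageListener.onMessage(); the message is then removed automatically.
(2) Session.CLIENT_ACKNOWLEDGE: the client acknowledges the message by calling its acknowledge method explicitly, that is, the receiver calls message.acknowledge(); otherwise the message is not removed.
(3) Session.DUPS_OK_ACKNOWLEDGE: acknowledgement is not strictly required. This is a "lazy" acknowledgement mode in which a message may be delivered more than once. When a message is redelivered, the JMSRedelivered header is set to true to mark that it has already been delivered once, and the client has to handle duplicates itself.
(4) Session.SESSION_TRANSACTED: messages are acknowledged when the transaction is committed.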
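The idea behind client acknowledgement and redelivery can be sketched with an in-memory model. This is plain Java under simplifying assumptions, not the JMS API: `AckSketch` and its `Msg` class are hypothetical, only one message is in flight at a time, and `recover()` here loosely stands in for the broker handing back an unacknowledged message.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of client acknowledgement with redelivery (not the JMS API). */
class AckSketch {

    static class Msg {
        final String body;
        boolean redelivered; // plays the role of the JMSRedelivered header

        Msg(String body) {
            this.body = body;
        }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();
    private Msg inFlight; // delivered to the consumer but not yet acknowledged

    void send(String body) {
        queue.addLast(new Msg(body));
    }

    /** Hands out the next message, or null if none is waiting. */
    Msg receive() {
        inFlight = queue.pollFirst();
        return inFlight;
    }

    /** The consumer confirms processing; only now is the message really gone. */
    void acknowledge() {
        inFlight = null;
    }

    /** An unacknowledged message returns to the head of the queue, flagged as redelivered. */
    void recover() {
        if (inFlight != null) {
            inFlight.redelivered = true;
            queue.addFirst(inFlight);
            inFlight = null;
        }
    }
}
```

A consumer that receives a message with `redelivered` set should be prepared for the possibility that it already processed it once.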
2.8 ActiveMQ persistent and non-persistent messages
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // non-persistent
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); // persistent; this is the default for messages sent through ActiveMQ
Explanation:
With persistent delivery, a message that has not yet been consumed is written to the KahaDB files on the local disk, so it survives even if the server goes down. With non-persistent delivery, the message is lost as soon as the service crashes or otherwise stops.
ActiveMQ uses persistent delivery by default.
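2.9 ActiveMQ message filtering
ActiveMQ provides a mechanism for filtering messages according to the criteria in a message selector, so that a consumer receives only the messages that match.
The producer puts identifying properties on the message, and the consumer uses a selector based on those properties to decide which messages to receive.
1. When sending, set a property on the message: message.setStringProperty(name, value);
2. When receiving, create the consumer with a message selector based on that property:
MessageConsumer createConsumer(Destination destination, String messageSelector);
Note: a message selector is a string whose syntax resembles SQL; it corresponds to what follows the WHERE keyword in an SQL statement.
The full code follows.
Sender code: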
package com.bjpowernode.activemq.selector;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

/**
 * Message sender
 */
public class Sender {
    /** Connection URL of the message broker */
    public static final String BROKER_URL = "tcp://192.168.174.129:61616";

    public static void main(String[] args) {
        Sender sender = new Sender();
        sender.sendMessage("Hello ActiveMQ.");
    }

    /**
     * Sends 20 messages, each tagged with an integer "id" property
     *
     * @param msg text prefix of the messages to send
     */
    public void sendMessage(String msg) {
        Connection connection = null;
        Session session = null;
        MessageProducer messageProducer = null;
        try {
            // 1. Create a connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2. Create a connection
            connection = connectionFactory.createConnection();
            // 3. Create a session
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 4. Create a destination
            Destination destination = session.createQueue("myQueue");
            // 5. Create a message producer (sender)
            messageProducer = session.createProducer(destination);
            // Choose whether sent messages are persisted; non-persistent here
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Loop to try out the message property
            for (int i = 0; i < 20; i++) {
                // 6. Create the message; here a text message
                Message message = session.createTextMessage(msg + i);
                // Tag the message with an identifying property
                message.setIntProperty("id", i);
                // 7. Send the message
                messageProducer.send(message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                // Close everything to release resources
                if (null != messageProducer) {
                    messageProducer.close();
                }
                if (null != session) {
                    session.close();
                }
                if (null != connection) {
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
Receiver code:
package com.bjpowernode.activemq.selector;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Receiver {
    /** Connection URL of the message broker */
    public static final String BROKER_URL = "tcp://192.168.174.129:61616";

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        receiver.receiveMessage();
    }

    /**
     * Receives messages that match the selector
     */
    public void receiveMessage() {
        Connection connection = null;
        Session session = null;
        MessageConsumer messageConsumer = null;
        try {
            // 1. Create a connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2. Create a connection
            connection = connectionFactory.createConnection();
            // 3. Create a session
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 4. Create a destination
            Destination destination = session.createQueue("myQueue");
            // 5. Create a message consumer (receiver). The selector is the message
            //    selector: it filters on the property so that only messages with
            //    an id from 10 to 15 are received
            String selector = "id >=10 and id<=15";
            messageConsumer = session.createConsumer(destination, selector);
            // The connection must be started before messages can be received
            connection.start();
            while (true) {
                // 6. Receive a message synchronously
                Message message = messageConsumer.receive();
                // Check the message type
                if (message instanceof TextMessage) { // is it a text message?
                    String text = ((TextMessage) message).getText();
                    System.out.println("Received message: " + text);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                // Close everything to release resources
                if (null != messageConsumer) {
                    messageConsumer.close();
                }
                if (null != session) {
                    session.close();
                }
                if (null != connection) {
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
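To make the selector's effect concrete, the condition id >= 10 and id <= 15 can be modelled as an ordinary predicate over the same 20 message ids the sender produces. This is a plain-Java sketch, not how the broker evaluates selectors; `SelectorSketch` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Models the selector "id >=10 and id<=15" as a predicate (not the JMS API). */
class SelectorSketch {

    /** Plain-Java equivalent of the selector condition. */
    static final IntPredicate ID_10_TO_15 = id -> id >= 10 && id <= 15;

    /** Builds messages prefix+0 .. prefix+(count-1) and keeps those whose id matches. */
    static List<String> select(String prefix, int count, IntPredicate selector) {
        List<String> received = new ArrayList<>();
        for (int id = 0; id < count; id++) {
            if (selector.test(id)) {
                received.add(prefix + id);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Six of the twenty messages pass: ids 10 through 15.
        System.out.println(select("Hello ActiveMQ.", 20, ID_10_TO_15));
    }
}
```

The messages that do not match are not delivered to this consumer; on a real queue they remain available to consumers whose selector does match.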
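2.10 ActiveMQ ways of receiving messages
Synchronous receiving: receiving with the receive() method is called synchronous receiving. It is what the earlier demo code uses. Without a loop, the receiver handles one message and then exits.
Asynchronous receiving: receiving through a listener is called asynchronous receiving. The receiver stays in a listening state and picks up each message as soon as it arrives.
Here is the asynchronous receiver code: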
package com.bjpowernode.activemq.listener;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Receiver {
    /** Connection URL of the message broker */
    public static final String BROKER_URL = "tcp://192.168.174.129:61616";

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        receiver.receiveMessage();
    }

    /**
     * Receives messages through a listener
     */
    public void receiveMessage() {
        Connection connection = null;
        Session session = null;
        MessageConsumer messageConsumer = null;
        try {
            // 1. Create a connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2. Create a connection
            connection = connectionFactory.createConnection();
            // 3. Create a session
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 4. Create a destination
            Destination destination = session.createQueue("myQueue");
            // 5. Create a message consumer (receiver)
            messageConsumer = session.createConsumer(destination);
            // The connection must be started before messages can be received
            connection.start();
            // 6. Synchronous receiving would be:
            // Message message = messageConsumer.receive();
            // Asynchronous receiving: register a listener instead
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    // Check the message type
                    if (message instanceof TextMessage) { // is it a text message?
                        String text = null;
                        try {
                            text = ((TextMessage) message).getText();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        System.out.println("Received message: " + text);
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Closing is left commented out so that the listener keeps running
            /*try {
                if (null != messageConsumer) {
                    messageConsumer.close();
                }
                if (null != session) {
                    session.close();
                }
                if (null != connection) {
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }*/
        }
    }
}
III. Integrating ActiveMQ:
3.1 ActiveMQ with Spring (a simple getting-started demo):
The sender and the receiver use the same pom dependencies, so they are listed only once:
<!-- spring-jms dependency -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.13.RELEASE</version>
</dependency>
<!-- ActiveMQ client jar -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.2</version>
</dependency>
<!-- JMS specification jar -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>
·¢ËͶË:(ÐèÒªÁ½¸öÅäÖÃÎļþ)
1. applicationContext.xml
<?xml
version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.
org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema
-instance"
xsi:schemaLocation="http://www.springframework.
org/schema/beans
http://www.springframework.
org/schema/beans
/spring-beans.xsd">
<import resource="applicationContext-jms.xml"/>
</beans> |
2. applicationContext-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="com.kinglong.activemq.sender"/>
    <!-- Configure a connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.174.129:61616"/>
    </bean>
    <!-- Configure the JmsTemplate -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="springTopic"/>
        <!-- Messaging mode: true means publish/subscribe, false means point-to-point -->
        <property name="pubSubDomain" value="true"/>
    </bean>
</beans>
Sender class:
package com.kinglong.activemq.sender;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Component
public class Sender {
    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage(final String msg) {
        // Send the message to the template's default destination
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
}
Here is the test class:
package com.kinglong.activemq.sender;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Test {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        Sender springSender = (Sender) context.getBean("sender");
        springSender.sendMessage("Spring jms ActiveMQ.");
    }
}
Receiver (again two configuration files are needed; asynchronous receiving is used here, so a listener has to be configured):
1. applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">
    <import resource="applicationContext-jms.xml"/>
</beans>
2. applicationContext-jms.xml
Listener: (MyMessageListener)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="com.kinglong.activemq.receiver"/>
    <!-- Configure a connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.174.129:61616"/>
    </bean>
    <!-- Configure the JmsTemplate -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="springTopic"/>
        <!-- Messaging mode: true means publish/subscribe, false means point-to-point -->
        <property name="pubSubDomain" value="true"/>
    </bean>
    <!-- Our own custom message listener -->
    <bean id="receiverListener" class="com.kinglong.activemq.receiver.MyMessageListener"/>
    <!-- Configure a Spring listener container -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="springTopic"/>
        <property name="messageListener" ref="receiverListener"/>
        <!-- Messaging mode: true means publish/subscribe, false means point-to-point -->
        <property name="pubSubDomain" value="true"/>
    </bean>
</beans>
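Here is the test class:
package com.kinglong.activemq.receiver;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Test {
    public static void main(String[] args) {
        // Loading the context starts the listener container
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
    }
}
Note: because the Topic publish/subscribe mode is used, always start the receiver first during testing and only then start the sender; otherwise the receiver gets no messages.
By comparison, using ActiveMQ through Spring is considerably simpler. Unlike the raw version, creating all the connection objects is left to Spring, which makes development more efficient.
The configuration properties are also moved out of the code and collected in configuration files, which makes them easier to change. Even so, I think it is still worth studying the raw version,
because it gives a better understanding of how everything is created.
3.2 ActiveMQ with Spring Boot (a simple getting-started demo):
Again the pom dependencies are the same on both sides, so they are listed only once: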
<!-- Parent dependency -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Properties -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <!-- Starter dependency for Spring Boot Java applications -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Starter dependency for Spring Boot with ActiveMQ -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
</dependencies>
·¢ËͶË:
Ö»ÓÐÒ»¸öÖ÷ÅäÖÃÎļþ:application.properties
spring.activemq.broker-url=tcp://192.168.174.129:61616
#·¢²¼¶©ÔÄģʽ
spring.jms.pub-sub-domain=true |
Main class:
package com.kinglong.activemq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class SenderApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(SenderApplication.class, args);
        SpringBootSender springBootSender = (SpringBootSender) context.getBean("springBootSender");
        springBootSender.sendMessage("springboot activemq.");
    }
}
Sender: SpringBootSender
package com.kinglong.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Component
public class SpringBootSender {
    @Autowired // inject the JmsTemplate
    private JmsTemplate jmsTemplate;

    // Sends a message; "myTopic" is the destination it is sent to
    // and message is the text to send
    public void sendMessage(String message) {
        jmsTemplate.send("myTopic", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}
Receiver:
The main configuration file is the same as the sender's.
Main class: ReceiverApplication
package com.kinglong.activemq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import javax.jms.JMSException;

@SpringBootApplication
public class ReceiverApplication {
    public static void main(String[] args) throws JMSException {
        ConfigurableApplicationContext context = SpringApplication.run(ReceiverApplication.class, args);
        // Asynchronous receiving is used here
    }
}
Listener: MessageListenerReceiver
package com.kinglong.activemq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListenerReceiver {
    @JmsListener(destination = "myTopic")
    public void receiveQueue(String text) {
        System.out.println("Consumer received message: " + text);
    }
}
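OVER! Notice how much more compact the Spring Boot integration of ActiveMQ is? There are no XML configuration files to write; Spring Boot does all of that for us. Still, as said before, I recommend working through the principles and flow of the raw version before moving on to the Spring or Spring Boot integration; otherwise sending and receiving in Spring Boot can feel rather abstract.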
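IV. ActiveMQ clustering
4.1 What is a cluster?
A cluster deploys the same program and functionality on two or more servers, all of which expose exactly the same functionality. Service capacity is increased by scaling out, that is, by adding more servers.

4.1.1 Non-clustered mode

4.1.2 Clustered mode
4.2 Advantages of clustering:
1. A cluster removes the single point of failure;
2. A cluster increases the system's service capacity;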