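Distributed Message Middleware: RocketMQ

2019-1-21

Editor's recommendation:

This article comes from CSDN. It introduces distributed message middleware in four parts: the JMS specification, an introduction to RocketMQ, deployment modes, and how to use the main features.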

Overview

Today I want to share distributed message middleware with you. Message middleware is mainly used in distributed systems for decoupling, asynchronous messaging, traffic peak shaving, log processing and similar scenarios, and I will discuss some of these scenarios later. The message queues most widely used in production today include ActiveMQ, RabbitMQ, Kafka and RocketMQ.

The title is admittedly a bit broad. In practice this talk focuses on the JMS specification and RocketMQ. Most of us follow our company's technical standards and choices (the basic components a company uses are usually already decided). Because of my own work experience I put the emphasis on RocketMQ, and since everyone's background differs, I cover it fairly completely, starting from the basics. The version discussed is 3.2.6. I also include some API examples, because as an engineer I have always felt that a technical talk without concrete substance is, as some colleagues put it, a waste of time.

The talk is organized into these blocks: the JMS specification, an introduction to RocketMQ, deployment modes, and feature usage.

JMS specification

Let's first look at the JMS specification. As mentioned, the focus of this talk is RocketMQ. RocketMQ is not strictly based on JMS, but it draws on JMS, the CORBA Notification specification and others, and arguably improves on them. I will go through JMS quickly so that we can spend more time on RocketMQ, and later I will compare the two briefly. I will cover what JMS is, the related concepts, the object model, message consumption, and a programming example.

What is JMS

JMS is a set of interface specifications similar to JDBC, except that it targets messaging services and provides a standard API. Most vendors follow the JMS specification, although RocketMQ, which we discuss later, does not follow it strictly.

Common JMS providers include IBM MQSeries, BEA WebLogic JMS Service, Progress SonicMQ, and the open-source Apache ActiveMQ. ActiveMQ was the first MQ I worked with. It has a large market share, and JD.com (Jingdong Mall) uses it.

Basic concepts

The main concepts are these; let's go through them one by one:

Sender

The message producer; put simply, the JMS client that creates and sends messages.

Receiver

The message consumer. It receives the messages it subscribed to, processes them according to the business logic, and finally reports the result back to the MQ server.

Point-to-Point (P2P)

Point-to-point is a one-to-one relationship: each message is processed by exactly one receiver. Every message is sent to a specific queue, and receivers fetch messages from that queue. The queue retains messages until they are consumed or expire.

Publish/Subscribe (Pub/Sub)

1. Clients send messages to a topic. Multiple publishers send messages to the Topic, and the system delivers them to multiple subscribers.

2. If you want a message to be processed by nobody, by one consumer, or by several consumers, use the Pub/Sub model.

Queue

An area that holds messages which have been sent and are waiting to be read. Contrary to what the name suggests, messages are not necessarily received in the order in which they were sent. Once a message has been read, it is removed from the queue.

Topic

A mechanism that supports sending a message to multiple subscribers.

Publisher

Same as the producer.

Subscriber

One of the multiple consumers of the same topic.

Point-to-point

Look at this figure: it shows the point-to-point relationship.

Publish/subscribe

This figure should help you get familiar with the publish/subscribe relationship.
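
To make the difference between the two models concrete, here is a minimal in-memory sketch. It uses only JDK collections and is not JMS code; the class name MessagingModelsSketch and the round-robin hand-out between receivers are assumptions made purely for illustration (real receivers compete for messages).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory illustration; not part of the JMS API.
class MessagingModelsSketch {

    /** Point-to-point: each message is taken from the queue by exactly one receiver. */
    static List<List<String>> pointToPoint(List<String> messages, int receivers) {
        Queue<String> queue = new ArrayDeque<>(messages);
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < receivers; i++) {
            received.add(new ArrayList<>());
        }
        int next = 0;
        while (!queue.isEmpty()) {
            // poll() removes the message, so no other receiver can see it again
            received.get(next).add(queue.poll());
            next = (next + 1) % receivers;
        }
        return received;
    }

    /** Publish/subscribe: every subscriber of the topic gets its own copy of every message. */
    static List<List<String>> publishSubscribe(List<String> messages, int subscribers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            received.add(new ArrayList<>(messages));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("P2P:     " + pointToPoint(messages, 2));
        System.out.println("Pub/Sub: " + publishSubscribe(messages, 2));
    }
}
```

With two receivers, the point-to-point call splits the four messages between them, while the publish/subscribe call gives each subscriber all four.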

Object model

(1) ConnectionFactory

The factory that creates Connection objects. For the two JMS messaging models there are two kinds, QueueConnectionFactory and TopicConnectionFactory (one for point-to-point and one for publish/subscribe). A ConnectionFactory can be looked up through JNDI.

(2) Destination

A Destination is the target a producer sends messages to, or the source a consumer receives messages from. For a producer, the Destination is a Queue or a Topic; for a consumer, the Destination is likewise a queue or a topic (the message source). So a Destination is in practice one of two types of object, Queue or Topic. A Destination can be looked up through JNDI.

(3) Connection

A Connection represents the link established between the client and the JMS system (a wrapper around a TCP/IP socket). A Connection can create one or more Sessions. Like ConnectionFactory, Connection comes in two types: QueueConnection and TopicConnection.

(4) Session

Session is the interface through which we operate on messages. A session can create producers, consumers, messages, and so on. Session also provides transactions: when we need to send or receive several messages through a session, those operations can be placed in one transaction. Again there are two types, QueueSession and TopicSession.

(5) Message producer

A message producer is created by a Session and sends messages to a Destination. There are two types: QueueSender and TopicPublisher. Call the producer's send or publish method to send a message.

(6) Message consumer

A message consumer is created by a Session and receives messages sent to a Destination. There are two types: QueueReceiver and TopicSubscriber, created through the session's createReceiver(Queue) and createSubscriber(Topic) respectively. The session's createDurableSubscriber method creates a durable subscriber.

(7) MessageListener

The message listener. If a listener is registered, its onMessage method is called automatically as soon as a message arrives. We will see it again in the consumption section.

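Message consumption

In JMS, producing and consuming messages are asynchronous with respect to each other. A JMS consumer can consume messages in two ways.

○ Synchronous

The subscriber or receiver calls the receive method. receive blocks until a message arrives (or until the timeout expires).

○ Asynchronous

The subscriber or receiver registers a message listener. When a message arrives, the system automatically calls the listener's onMessage method.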
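
The two consumption styles can be sketched with JDK classes alone. This is only an illustration of blocking receive versus listener callback; ConsumeStylesSketch and its methods are hypothetical names and not the JMS API, and the listener is invoked directly in the sender's thread to keep the sketch short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a JMS destination, built only from JDK classes.
class ConsumeStylesSketch {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private volatile Consumer<String> listener; // null until a listener is registered

    /** Asynchronous style: register a callback that is invoked when a message arrives. */
    void setListener(Consumer<String> onMessage) {
        this.listener = onMessage;
    }

    void send(String message) {
        Consumer<String> current = listener;
        if (current != null) {
            current.accept(message); // push the message to the listener
        } else {
            pending.add(message);    // keep it until someone calls receive()
        }
    }

    /** Synchronous style: block until a message is available or the timeout expires. */
    String receive(long timeoutMillis) {
        try {
            return pending.poll(timeoutMillis, TimeUnit.MILLISECONDS); // null on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A caller that uses receive decides when to wait for a message, while a caller that registers a listener is called back whenever a message is sent.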

Programming example

An example:

All these concepts may feel dry, and I think programmers understand things best through code, so here is some ActiveMQ code that illustrates the JMS concepts described above.

public void init() {
    try {
        // Create a connection factory (user name, password, broker URL)
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
        // Create a connection from the factory
        connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a transacted session
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

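Common part: these steps are needed whether you are a message producer or a message consumer.

First we create a connection factory; here we supply the user name, the password and the broker URL.

Then we create a connection from the factory. At this point no connection to the broker has been established yet.

Calling start establishes the connection to the broker. Let me briefly explain the broker.

broker: the core of the message queue, effectively a control center. It routes messages, stores subscriptions and connections, handles message acknowledgement and controls transactions. ActiveMQ can be configured with several brokers.

Finally we create a session. As mentioned above, all message operations go through the session.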

public void sendMsg(String queueName) {
    try {
        // Create a message queue (that is, the Destination)
        Queue queue = session.createQueue(queueName);
        // Message producer, cached per thread
        MessageProducer messageProducer = null;
        if (threadLocal.get() != null) {
            messageProducer = threadLocal.get();
        } else {
            messageProducer = session.createProducer(queue);
            threadLocal.set(messageProducer);
        }
        while (true) {
            Thread.sleep(1000);
            int num = count.getAndIncrement();
            // Create a text message
            TextMessage msg = session.createTextMessage(Thread.currentThread().getName()
                    + " producer: producing message, count:" + num);
            // Send the message
            messageProducer.send(msg);
            // Commit the transaction
            session.commit();
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Producing: with the common part configured, let's produce some messages. I am using the point-to-point model here.

Create a Destination through the session; I use a queue directly.

Next we create a message producer.

The loop sends one message per second.

Note that the message is also created through the session; here we use the text message type.

Send the message.

Commit the send. The message has now reached the broker. Anyone who has used ActiveMQ knows that it provides a handy web console where you can check the state of your messages, including whether they have been consumed.

Consuming: as mentioned above there are two ways to consume, synchronous and asynchronous. I prepared one piece of code for each.

public void doMessage(String queueName) {
    try {
        // Create the Destination
        Queue queue = session.createQueue(queueName);
        // Create the consumer for this queue
        MessageConsumer consumer = session.createConsumer(queue);

        while (true) {
            Thread.sleep(1000);
            // receive() blocks until a message arrives
            TextMessage msg = (TextMessage) consumer.receive();
            if (msg != null) {
                msg.acknowledge();
                System.out.println(Thread.currentThread().getName()
                        + ": Consumer: I am the consumer, consuming Msg " + msg.getText()
                        + "--->" + count.getAndIncrement());
            } else {
                break;
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

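Synchronous: as you can see, the call blocks until a message is available.

Create a Destination through the session; again I use a queue directly.

Create a Consumer.

Loop forever. Similar to ServerSocket's accept method, receive blocks here until a message arrives.

If the message is not null, acknowledge it to report that it was consumed successfully.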

consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message msg) {
        try {
            if (msg != null) {
                String message = ((TextMessage) msg).getText();
                msg.acknowledge();
                System.out.println("Message consumed successfully: " + message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

Asynchronous: the first two steps are the same as above, so we start from step 3.

3. Register an implementation of the listener interface. When a message arrives, onMessage is called; the rest is the same as before.

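RocketMQ introduction

Overview

RocketMQ is a distributed message middleware open-sourced by Alibaba. It grew out of the JMS specification but does not comply with it. On the "distributed" point: if you have used other MQs and also looked at RocketMQ, you know that RocketMQ is distributed by nature. Brokers, producers and consumers can all be deployed in a distributed fashion.

Here are its main characteristics; the rest of the talk is built around them:

Strict message ordering (requires cluster support)

Rich message pull modes (you can define your own pull strategy; the bundled examples include a good one)

Efficient horizontal scaling of subscribers (consumers are scaled out easily through a consumerGroup)

Real-time message subscription (messages are pushed in real time, similar to the asynchronous consumption shown above)

Accumulation capacity of hundreds of millions of messages (makes peak shaving easy)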

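History

I think this is worth mentioning, because it helps us see how a piece of technology evolves.

I. Metaq (Metamorphosis) 1.x

Maintained by killme2008 in the open-source community, which is very active.

https://github.com/killme2008/Metamorphosis

II. Metaq 2.x

Went live in October 2012 and was widely used inside Taobao.

III. Renamed to RocketMQ

Following the company's principle of internal open-source co-development, RocketMQ maintains only the core features, which makes it easy for each business unit (the source writes "SUB") to customize. The high performance achieved inside Alibaba also relies on another product besides RocketMQ itself (OceanBase, Yang Zhenkun).

https://github.com/apache/rocketmq

The current version is 4.2.0-SNAPSHOT.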

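Reasons to choose it

Here is an overview of RocketMQ's features.

Clustered with no single point of failure; every node is highly available and the system scales horizontally

Clusters are easy to configure and to scale (horizontally and vertically), and every node can be made highly available by adding slaves.

Supports tens of thousands of queues, and ordered messages

Ordered consumption is implemented within a single queue, so high concurrency requires many queues. RocketMQ can keep tens of thousands of queues alive at the same time.

Customizable message filtering

RocketMQ provides two kinds of message filtering, or three if you count topics: filtering by topic, filtering by tag, and arbitrary custom filtering through a filter class.

Message reliability (no buffer, persistence, fault tolerance, backtracking consumption)

Because there is no buffer, you need not worry about a buffer filling up. All RocketMQ messages are persisted. The producer can retry on errors, and redelivery follows stepped time intervals. Message backtracking means messages can be consumed again from a specified point in time, either backward or forward (provided you keep the message deletion time in mind).

Massive message accumulation, with low write latency even after a backlog builds up

On the producer side this depends on the deployment mode. On the consumer side, in a cluster deployment, once the master detects a backlog it sends the consumer a redirect instruction, and the consumer can then consume from the slave.

Distributed transactions

I personally feel that RocketMQ 3.2.6 does not document this part clearly, and the official documentation also says it currently has a defect (it can cause excessive system page cache usage), so I recommend using it sparingly in production. I will show an example later.

Message failure retry mechanism

Producer retry: if sending to the selected broker fails, the client automatically picks another broker and resends. The default is three retries, and the retries must fit within the send timeout.

Consumer retry: if a message is not consumed successfully for whatever reason, it is automatically put into the retry queue. When the cause is something like a network problem, immediate repeated retries usually fail as well, so RocketMQ retries at stepped intervals.

Scheduled (delayed) consumption

Besides the configuration above, a delay can also be set per message at send time with setDelayTimeLevel.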
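
The "stepped" idea behind delay levels can be sketched as a lookup from a level number to a duration. The table below is an assumption: it is the default delay table as commonly documented for RocketMQ brokers (configurable on the broker), so verify it against your own broker settings. DelayLevelSketch and delayMillis are hypothetical names, not RocketMQ API.

```java
// Illustrative only: maps a 1-based delay level to a duration using an assumed table.
class DelayLevelSketch {
    // Assumed default table; a broker may be configured differently.
    static final String ASSUMED_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Converts a 1-based delay level into milliseconds using the assumed table. */
    static long delayMillis(int level) {
        String[] levels = ASSUMED_LEVELS.split(" ");
        if (level < 1 || level > levels.length) {
            throw new IllegalArgumentException("level out of range: " + level);
        }
        String entry = levels[level - 1];
        long value = Long.parseLong(entry.substring(0, entry.length() - 1));
        char unit = entry.charAt(entry.length() - 1);
        switch (unit) {
            case 's': return value * 1000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            default: throw new IllegalStateException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 5; level++) {
            System.out.println("level " + level + " -> " + delayMillis(level) + " ms");
        }
    }
}
```

Under this assumed table, the value passed to setDelayTimeLevel is an index into the steps rather than an arbitrary duration, which is why delays come in fixed increments.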

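Active open-source community

RocketMQ is now an Apache open-source project, and its level of activity is beyond doubt.

Maturity (proven by Double 11)

As for maturity, the years of Double 11 (Singles' Day) traffic it has handled speak for themselves.

Terminology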

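NameServer

You can think of it as a registry similar to ZooKeeper (zk). RocketMQ originally used zk as its registry; now it has its own purpose-built registry, with no more than 1000 lines of code. RocketMQ offers several ways to configure how clients find the Name Server and then, through it, the Broker. When more than one is set, the higher-priority method overrides the lower. The client supports two forms, an HTTP address and IP plus port. HTTP is recommended because it allows hot deployment of name servers.

Push Consumer

A kind of Consumer. The application usually registers a Listener interface on the Consumer object; as soon as a message is received, the Consumer object calls back the Listener method. This is similar to how ActiveMQ works.

Pull Consumer

A kind of Consumer. The application actively calls the Consumer's pull method to fetch messages from the Broker, so the application is in control.

Producer Group

The name of a set of producers that usually send the same kind of message with the same sending logic.

Consumer Group

As above, the name of a set of consumers.

Broker

The message relay role. It stores messages (the actual storage is done by the store component) and forwards them. It is usually also called the server, and corresponds to the provider in JMS.

Message Filter

Allows advanced, custom message filtering, written in Java.

Master/Slave

The master/slave relationship in a cluster. Brokers that share the same broker name belong together; the one with brokerId=0 is the master, and those with brokerId greater than 0 are slaves.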

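Deployment

Physical deployment

NameServer: similar to a zk cluster. It mainly maintains and serves broker-related information. There is no data synchronization between its nodes.

1. Accepts broker registration and deregistration requests

2. Producers obtain all BrokerQueues under a topic in order to put messages

3. Consumers obtain all BrokerQueues under a topic in order to get messages

Broker:

Deployment is relatively complex. Brokers are divided into Masters and Slaves. One Master can have several Slaves, but a Slave belongs to exactly one Master. Master and Slave are paired by specifying the same BrokerName and distinguished by BrokerId: 0 means Master, greater than 0 means Slave. Several Masters can be deployed. Every Broker keeps a long-lived connection to all nodes of the Name Server cluster and periodically registers its Topic information with all NameServers.

Producer:

Keeps a long-lived connection to one (arbitrarily chosen) node of the Name Server cluster, periodically fetches Topic routing information from the Name Server, establishes long-lived connections to the Masters that serve the Topic, and sends them periodic heartbeats. Producers are completely stateless and can be deployed as a cluster.

Consumer:

Keeps a long-lived connection to one (randomly chosen) node of the Name Server cluster, periodically fetches Topic routing information from the Name Server, establishes long-lived connections to the Masters and Slaves that serve the Topic, and sends them periodic heartbeats. A Consumer can subscribe from either a Master or a Slave; the rule is determined by Broker configuration.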
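
The pairing rule described for Brokers (same BrokerName, BrokerId 0 for the Master) can be sketched as follows. This is a plain Java model for illustration only; BrokerRolesSketch, its Broker class and the sample addresses are hypothetical and are not RocketMQ classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of broker registrations; class and field names are illustrative only.
class BrokerRolesSketch {
    static final class Broker {
        final String brokerName;
        final long brokerId;
        final String address;

        Broker(String brokerName, long brokerId, String address) {
            this.brokerName = brokerName;
            this.brokerId = brokerId;
            this.address = address;
        }

        boolean isMaster() {
            return brokerId == 0; // 0 marks the Master, anything greater a Slave
        }
    }

    /** Maps each BrokerName to the address of its Master. */
    static Map<String, String> masters(List<Broker> brokers) {
        Map<String, String> result = new TreeMap<>();
        for (Broker b : brokers) {
            if (b.isMaster()) {
                result.put(b.brokerName, b.address);
            }
        }
        return result;
    }

    /** Lists the Slave addresses that share the given BrokerName. */
    static List<String> slaves(List<Broker> brokers, String brokerName) {
        List<String> result = new ArrayList<>();
        for (Broker b : brokers) {
            if (b.brokerName.equals(brokerName) && !b.isMaster()) {
                result.add(b.address);
            }
        }
        return result;
    }
}
```

Registering broker-a with ids 0 and 1 and broker-b with id 0 yields two Masters, with one Slave attached to broker-a and none to broker-b.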

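Logical deployment

Producer Group:

Represents an application that sends messages. A Producer Group contains multiple Producer instances, which may be on several machines, several processes on one machine, or several Producer objects within one process. A Producer Group can send messages to several Topics. Its purposes are:

1. Identifies a class of Producers (distributed)

2. Operations tools can query how many Producer instances the sending application has

3. When sending distributed transactional messages, if a Producer crashes midway, the Broker actively calls back any machine in the Producer Group to confirm the transaction state.

Consumer Group:

Represents an application that consumes messages. A Consumer Group contains multiple Consumer instances, which may be on several machines, several processes, or several Consumer objects in one process. The Consumers in a group share the messages evenly among themselves. In broadcast mode, every instance in the Consumer Group consumes all of the data.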
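
The difference between even sharing and broadcast can be sketched in terms of which queues each consumer instance reads. This is a simple even split written for illustration, not RocketMQ's actual allocation code; QueueAllocationSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simple even split, not RocketMQ's actual allocation code.
class QueueAllocationSketch {

    /** Even sharing: consumer number `index` gets a contiguous, near-equal share of the queues. */
    static List<Integer> clustering(int queueCount, int consumers, int index) {
        List<Integer> assigned = new ArrayList<>();
        int base = queueCount / consumers;
        int extra = queueCount % consumers; // the first `extra` consumers take one more queue
        int start = index * base + Math.min(index, extra);
        int size = base + (index < extra ? 1 : 0);
        for (int q = start; q < start + size; q++) {
            assigned.add(q);
        }
        return assigned;
    }

    /** Broadcast: every consumer instance reads every queue. */
    static List<Integer> broadcasting(int queueCount) {
        List<Integer> assigned = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            assigned.add(q);
        }
        return assigned;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("consumer " + i + " -> " + clustering(8, 3, i));
        }
        System.out.println("broadcast  -> " + broadcasting(8));
    }
}
```

Adding a consumer to the group shrinks each instance's share in the even-sharing case, which is what makes scaling out by Consumer Group straightforward.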

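Single Master mode

Only one Master node.

Advantages: simple configuration, easy deployment.

Disadvantages: high risk. If the Broker restarts or goes down, the whole service becomes unavailable. Not recommended for production.

Multi-Master mode

A cluster with no Slaves, only Masters, for example 2 or 3 Masters.

Advantages: simple configuration. A single Master going down or restarting for maintenance does not affect the application. With disks configured as RAID10, even if a machine fails unrecoverably, messages are not lost because RAID10 is very reliable (asynchronous flush loses a small number of messages, synchronous flush loses none). Highest performance.

Disadvantages: while a machine is down, messages on it that have not been consumed cannot be subscribed until the machine recovers, so message timeliness suffers.

Multi-Master multi-Slave mode (asynchronous replication)

Each Master has one Slave, giving several Master-Slave pairs. HA uses asynchronous replication, so the slave lags the master briefly, on the order of milliseconds.

Advantages: even if a disk is damaged, very few messages are lost, and message timeliness is not affected, because consumers can still consume from the Slave after the Master goes down. This is transparent to the application and requires no manual intervention. Performance is almost the same as in multi-Master mode.

Disadvantages: if the Master goes down with disk damage, a small number of messages is lost.

Multi-Master multi-Slave mode (synchronous double write)

Each Master has one Slave, giving several Master-Slave pairs. HA uses synchronous double write: success is returned to the application only after both master and slave have written the message.

Advantages: no single point of failure for data or service. When the Master goes down there is no message delay, and both service availability and data availability are very high.

Disadvantages: performance is slightly lower than with asynchronous replication, by roughly 10%, and the RT of sending a single message is slightly higher. Currently the slave cannot automatically switch to master after the master goes down; automatic failover is planned.

Feature usage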

Quick start

Producer:

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Producer: sends messages
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // The constructor argument is the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("pay_topic_01");
        producer.setNamesrvAddr("100.8.8.88:9876");
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest", // topic
                        "TagA", // tag
                        ("Hello RocketMQ " + i).getBytes() // body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

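1. Create a Producer. Creating a producer in RocketMQ is simple: you only pass in a Group Name, which is much less involved than ActiveMQ.

2. Specify the Name Server address. Note two things: the default name server port is 9876, and multiple name servers in a cluster are separated by semicolons.

3. The loop sends 1000 messages.

4. Creating a message is also simple: the first argument is the topic, the second is the tag (the || syntax for combining several tags applies to the consumer's subscription expression), and the third is the message body.

5. Calling send is all it takes; unlike the transacted ActiveMQ session above, no commit is needed.

The whole flow is very clear.

Consumer: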

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Consumer: subscribes to messages
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("100.8.8.88:9876");
        /*
         * Decide whether the Consumer starts from the head or the tail of the queue
         * on its first start. On later starts it continues from the last consumed position.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

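1. The first two steps are the same as for the Producer.

2. Here you set the position to start reading from. We usually start from the head of the queue. Make sure the system deduplicates, that is, that consumption is idempotent.

3. Subscribe to the topic. The first argument is the topic name and the second is the tag; * means all messages.

4. Register a listener. When messages arrive they are pushed to the Consumer in real time and consumeMessage is called. Note that msgs is a List; by default one message is pushed per call.

5. Run the consumption logic and return CONSUME_SUCCESS once the message has been consumed successfully.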
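
The deduplication advice in step 2 can be sketched as a handler that remembers which business keys it has already processed. This is a minimal in-memory illustration; IdempotentHandlerSketch is a hypothetical name, and a real system would presumably keep the processed keys in a durable store such as a database with a unique constraint.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; the in-memory set stands in for a durable deduplication store.
class IdempotentHandlerSketch {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger applied = new AtomicInteger();

    /** Returns true if the business logic ran, false if the key was a duplicate. */
    boolean handle(String businessKey) {
        // add() returns false when the key is already present, so redelivered messages are skipped
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        applied.incrementAndGet(); // stand-in for the real business logic
        return true;
    }

    int appliedCount() {
        return applied.get();
    }
}
```

Inside a listener, such a handler would be called with a key taken from the message (for example an order number), so that a redelivered message still returns success without repeating its side effects.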

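Message filtering

RocketMQ's message filtering starts at subscription time. The examples so far filter by the tags of a topic, which requires the Producer to specify a tag when sending. That may seem to contradict what was said earlier, but on the sending side the tag only groups messages and does no filtering. Only the tags a Consumer specifies when subscribing filter messages. This is the simple form of filtering, and it covers most needs. The more advanced form is the one shown on this slide.

1. The parts before and after stay the same; the part in the red box specifies a filter class where the tags used to be.

2. Every filter class must implement the MessageFilter interface, directly or indirectly, and override its match method.

3. Inside the method you write your own filtering logic. Besides using predefined properties, you can also deserialize the message content and parse it, which allows filtering on the message body.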
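
The simple tag-based filtering can be sketched as a small matcher for subscription expressions such as "*" or "TagA || TagB". This is an illustration of the idea only and does not reproduce RocketMQ's implementation; TagFilterSketch and matches are hypothetical names.

```java
// Illustrative tag matcher; not RocketMQ's own filtering code.
class TagFilterSketch {

    /** Returns true when `tag` is selected by an expression such as "*" or "TagA || TagB". */
    static boolean matches(String subExpression, String tag) {
        if (subExpression == null) {
            return true; // treated here as "subscribe to everything"
        }
        String expression = subExpression.trim();
        if (expression.isEmpty() || expression.equals("*")) {
            return true;
        }
        for (String wanted : expression.split("\\|\\|")) {
            if (wanted.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagA"));
        System.out.println(matches("TagA || TagB", "TagB"));
        System.out.println(matches("TagA || TagB", "TagC"));
    }
}
```

A custom filter class goes further than this by inspecting message properties or the deserialized body instead of a single tag string.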

Ordered messages

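Some messages are only meaningful when consumed in order. Suppose a process that currently runs asynchronously on a timer; let's map it onto this model and see what ordered consumption looks like. Take an order workflow: order created > split into batches > packed > shipped, and so on. These steps must be processed strictly in sequence, so guaranteeing ordered consumption for such a group of messages matters. RocketMQ's approach is simple: put these messages into the same queue and they will be consumed in order. RocketMQ actually offers two kinds of ordered consumption: normal ordering (multi-Master multi-Slave with asynchronous replication) and strict ordering (multi-Master multi-Slave with synchronous double write).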

import java.util.List;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**
 * Producer: sends ordered messages
 */
public class Producer {
    public static void main(String[] args) {
        try {
            // Declared as DefaultMQProducer so that setNamesrvAddr is available
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("100.8.8.88:9876");
            producer.start();

            String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };

            for (int i = 0; i < 100; i++) {
                // Messages with the same order ID must stay in order
                int orderId = i % 10;
                Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes());

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // The same order ID always maps to the same queue
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.println(sendResult);
            }

            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

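1. First, make sure the messages are in the same topic.

2. Make sure the messages to be sent have the same tag.

3. When sending, make sure the data goes to the same queue; here we use a modulo on the order ID.

As mentioned earlier, RocketMQ can support tens of thousands of queues at the same time; this too is designed with ordered consumption in mind.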
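
The modulo rule from step 3 can be isolated into a tiny function to see why it preserves ordering per order ID. This is a sketch with hypothetical names (OrderedQueueSelectorSketch, selectQueue); it uses Math.floorMod instead of the % operator so that a negative ID cannot produce a negative index, which is a small deviation from the listing.

```java
// Illustrative helper showing the "same key, same queue" rule.
class OrderedQueueSelectorSketch {

    /** Maps an order ID to a queue index; the same ID always yields the same index. */
    static int selectQueue(int orderId, int queueCount) {
        return Math.floorMod(orderId, queueCount); // floorMod keeps the index non-negative
    }

    public static void main(String[] args) {
        int queueCount = 4;
        for (int orderId = 0; orderId < 10; orderId++) {
            System.out.println("order " + orderId + " -> queue " + selectQueue(orderId, queueCount));
        }
    }
}
```

Because every message of one order lands in one queue, and a queue is consumed in sequence, the steps of that order are seen in the order they were sent; different orders still spread across queues and can be processed in parallel.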

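Transactional messages

Speaking of transactions, let me start with an example. There are two accounts, Zhang San and Li Si, and Zhang San wants to transfer 10 yuan to Li Si. Everything below happens within one transaction, and the locking is done by the transaction.

1. Lock Zhang San's and Li Si's accounts

2. Check whether Zhang San's balance is at least 10 yuan. If so continue, otherwise return. We only discuss the sufficient-balance case here.

3. Deduct 10 yuan from Zhang San's account

4. Add 10 yuan to Li Si's account

5. Unlock the accounts; the transfer is complete

update account set amount = amount - 10 where userNo='zhangsan' and amount >= 10

update account set amount = amount + 10 where userNo='lisi'

In a distributed transaction we have to consider consistency between the two user accounts. Let's analyze it from the distributed point of view.

1. Lock Zhang San's account and, over the network, lock Li Si's account (think of it as freezing the amount)

2. Check whether Zhang San's balance is at least 10 yuan. If so continue, otherwise return. We only discuss the sufficient-balance case here.

3. Deduct 10 yuan from Zhang San's account

4. Add 10 yuan to Li Si's account over the network

5. Unlock Zhang San's account to complete the transfer, and unlock Li Si's account over the network. The time costs of these steps basically add up.

How do we do this with RocketMQ? First we need to clarify the roles. Zhang San initiates the transaction and is therefore the message sender; Li Si is the message consumer. RocketMQ can be thought of as an intermediate account. The Consumer side is assumed to succeed by default; if it does not, the official recommendation is manual intervention.

1. Check that Zhang San's balance is at least 10

2. Deduct 10 from Zhang San's account

3. At the same time send an MQ message to RocketMQ. The two must be placed in the same DB transaction (at this point the message is only in the prepared stage and cannot be consumed by the Consumer)

4. If the local transaction succeeds, send commit to RocketMQ

5. If the Producer crashes at step 4, RocketMQ never receives the commit and the state of the message is unknown, so it asks any Producer in the group to confirm the current state of the message

6. This guarantees consistency between the local account and RocketMQ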
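
The six steps above amount to a small state machine for the prepared message. The sketch below models only that decision logic in plain Java; HalfMessageSketch, its enums and resolve are hypothetical names and not the RocketMQ API, and the check-back is represented by a Supplier.

```java
import java.util.function.Supplier;

// Illustrative state machine for a prepared (half) message; all names are hypothetical.
class HalfMessageSketch {
    enum State { PREPARED, COMMITTED, ROLLED_BACK }
    enum LocalResult { COMMIT, ROLLBACK, UNKNOWN }

    /**
     * Resolves a prepared message. `reported` is what the sender told the broker
     * (UNKNOWN if it crashed or never answered); `checkBack` asks a producer in the
     * group for the real outcome of the local transaction.
     */
    static State resolve(LocalResult reported, Supplier<LocalResult> checkBack) {
        LocalResult result = reported;
        if (result == LocalResult.UNKNOWN) {
            result = checkBack.get(); // broker-initiated check of the local transaction
        }
        switch (result) {
            case COMMIT:
                return State.COMMITTED;   // the message becomes visible to consumers
            case ROLLBACK:
                return State.ROLLED_BACK; // the message is discarded
            default:
                return State.PREPARED;    // still undecided; it would be checked again later
        }
    }
}
```

The point of the check-back is that the message never becomes consumable on the strength of a missing answer: it stays prepared until some producer in the group confirms what the local transaction actually did.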

The coordinating producer is as follows:

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Example of sending transactional messages
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer =
                new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("100.8.8.88:9876");
        // Minimum number of threads for transaction check-back
        producer.setCheckThreadPoolMinSize(2);
        // Maximum number of threads for transaction check-back
        producer.setCheckThreadPoolMaxSize(2);
        // Size of the check request queue
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.start();

        String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes());
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                System.out.println(sendResult);
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }

        // Keep the process alive so that the broker can check back on undecided transactions
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }

        producer.shutdown();
    }
}

Local transaction:

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Executes the local transaction
 */
public class TransactionExecuterImpl implements LocalTransactionExecuter {
    private AtomicInteger transactionIndex = new AtomicInteger(1);

    @Override
    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
        int value = transactionIndex.getAndIncrement();

        // Simulated outcomes: failure, rollback, commit, or unknown
        if (value == 0) {
            throw new RuntimeException("Could not find db");
        } else if ((value % 5) == 0) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if ((value % 4) == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;
    }
}

Check-back callback:

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Undecided transactions: the server checks back with the client
 */
public class TransactionCheckListenerImpl implements TransactionCheckListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        System.out.println("server checking TrMsg " + msg.toString());

        // Simulated outcomes: failure, rollback, commit, or unknown
        int value = transactionIndex.getAndIncrement();
        if ((value % 6) == 0) {
            throw new RuntimeException("Could not find db");
        } else if ((value % 5) == 0) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if ((value % 4) == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;
    }
}

Point-to-point / broadcast

We have already said a lot about the point-to-point and publish/subscribe models. Here, setting MessageModel on the consumer is enough to get either mode. Note that if you configure the publish/subscribe (broadcast) mode, the Consumer load balancing mentioned above no longer applies (Consumer Name).

// Publish/subscribe (broadcast)
consumer.setMessageModel(MessageModel.BROADCASTING);
// Cluster consumption (default)
//consumer.setMessageModel(MessageModel.CLUSTERING);

Push / pull

Everything above uses the push model: register a listener, and when a message is produced it is pushed to the Consumer in real time. What this figure shows is the pull model. It hands control to the application, which of course adds complexity on the consuming side, for example storing offsets and pulling on a schedule. Alibaba provides some conveniences here, including a demo (in the folder named simple) that you can refer to if you need it.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageQueue;

/**
 * PullConsumer: subscribes to messages by pulling
 */
public class PullConsumer {
    // Offsets are kept in memory here; the application is responsible for storing them
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer =
                new DefaultMQPullConsumer("please_rename_unique_group_name_5");

        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            SINGLE_MQ: while (true) {
                try {
                    // Pull up to 32 messages starting at the stored offset
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // TODO
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        // Nothing left in this queue, move on to the next one
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

}
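
The offset bookkeeping that the pull model leaves to the application can be tried out without a broker. The sketch below replaces the broker with in-memory lists; PullLoopSketch and its methods are hypothetical names used only to show how the stored offset advances after each pull.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for broker queues; all names are hypothetical.
class PullLoopSketch {
    private final Map<String, List<String>> queues = new HashMap<>();
    private final Map<String, Long> offsetTable = new HashMap<>();

    void append(String queue, String message) {
        queues.computeIfAbsent(queue, k -> new ArrayList<>()).add(message);
    }

    /** Pulls up to maxNums messages starting at the stored offset, then advances the offset. */
    List<String> pull(String queue, int maxNums) {
        List<String> stored = queues.getOrDefault(queue, new ArrayList<>());
        long begin = offsetTable.getOrDefault(queue, 0L);
        int end = (int) Math.min(stored.size(), begin + maxNums);
        List<String> batch = new ArrayList<>(stored.subList((int) begin, end));
        offsetTable.put(queue, (long) end); // the next pull starts after this batch
        return batch;
    }

    long offsetOf(String queue) {
        return offsetTable.getOrDefault(queue, 0L);
    }
}
```

If the offset table is lost, for example on restart, pulling starts again from 0 and messages are seen twice, which is why the text lists offset storage as part of the extra complexity of pulling.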

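Message backtracking

The consumption progress is reset by time. Before resetting, shut down all consumers in the subscription group; start them again after the reset for it to take effect.

Backtracking consumption means re-consuming messages that the Consumer has already consumed successfully, because the business requires it. To support this, the Broker must retain messages after delivering them successfully to the Consumer. Re-consumption is usually by time. For example, after a Consumer system failure you may need to re-consume the data from one hour ago, so the Broker must provide a mechanism to roll the consumption progress back by time.

RocketMQ supports backtracking by time with millisecond precision, and the progress can be moved backward or forward.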
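
Resetting progress by time comes down to translating a timestamp into a queue offset. A minimal sketch of that translation, assuming the store times within a queue are non-decreasing, is a binary search; OffsetByTimeSketch and firstOffsetAtOrAfter are hypothetical names and this is not how the broker is necessarily implemented.

```java
// Illustrative lookup: timestamp -> first offset at or after that time.
class OffsetByTimeSketch {

    /**
     * Returns the index of the first message whose store time is >= timestampMillis,
     * or storeTimes.length if every message is older. Assumes storeTimes is sorted ascending.
     */
    static int firstOffsetAtOrAfter(long[] storeTimes, long timestampMillis) {
        int low = 0;
        int high = storeTimes.length;
        while (low < high) {
            int mid = (low + high) >>> 1;
            if (storeTimes[mid] < timestampMillis) {
                low = mid + 1; // everything up to mid is too old
            } else {
                high = mid;    // mid is a candidate, keep searching to the left
            }
        }
        return low;
    }
}
```

Setting a consumer group's offset to the returned index makes it re-read everything stored since that moment, which corresponds to the "re-consume the data from one hour ago" scenario above.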

Operation: mqadmin resetOffsetByTime

That concludes today's talk. I only covered some core and commonly used features. There is much more to MQ, such as the details of cluster setup, usage scenarios, console commands and the admin page. If you are interested, study them on your own, or come and discuss them with me.

   