Ò»¡¢·þÎñ¶Ë°²×°²¿Êð
ÎÒÊÇÔÚÐéÄâ»úÖеÄCentOS6.5ÖнøÐв¿Êð¡£
1.ÏÂÔØ³ÌÐò
2.tar -xvf alibaba-rocketmq-3.0.7.tar.gz
½âѹµ½Êʵ±µÄĿ¼Èç/opt/Ŀ¼
3.Æô¶¯RocketMQ£º½øÈërocketmq/bin Ŀ¼ Ö´ÐÐ
4.Æô¶¯Broker£¬ÉèÖöÔÓ¦µÄNameServer
nohup sh mqbroker -n "127.0.0.1:9876" & |
¶þ¡¢±àд¿Í»§¶Ë
¿ÉÒԲ鿴samepleÖеÄquickstartÔ´Âë 1.Consumer
ÏûÏ¢Ïû·ÑÕß
/** * Consumer£¬¶©ÔÄÏûÏ¢ */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("QuickStartConsumer"); consumer.subscribe("QuickStart", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } } |
2.ProducerÏûÏ¢Éú²úÕß
/** * Producer£¬·¢ËÍÏûÏ¢ * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("QuickStartProducer"); producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("QuickStart",// topic "TagA",// tag ("Hello RocketMQ ,QuickStart" + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } } |
3.Ê×ÏÈÔËÐÐConsumer³ÌÐò£¬Ò»Ö±ÔÚÔËÐÐ״̬½ÓÊÕ·þÎñÆ÷¶ËÍÆË͹ýÀ´µÄÏûÏ¢
23:18:07.587 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16 23:18:07.591 [main] DEBUG i.n.util.internal.PlatformDependent - Platform: Windows 23:18:07.592 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 7 23:18:07.592 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false 23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.ByteBuffer.cleaner: available 23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available 23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available 23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available 23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true 23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available 23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false 23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: unavailable 23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent -
You don't have Javassist in your class path or you don't have enough permission to load dynamically generated classes.
Please check the configuration for better performance. 23:18:07.595 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false 23:18:07.611 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false 23:18:07.611 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512 23:18:08.355 [main] DEBUG i.n.util.internal.ThreadLocalRandom - -Dio.netty.initialSeedUniquifier: 0x8c0d4793e5820c31 23:18:08.446 [NettyClientWorkerThread_1] DEBUG io.netty.util.ResourceLeakDetector
- -Dio.netty.noResourceLeakDetection: false Consumer Started. |
4.ÔÙ´ÎÔËÐÐProducer³ÌÐò£¬Éú³ÉÏûÏ¢²¢·¢Ë͵½Broker£¬ProducerµÄÈÕÖ¾³åûÁË£¬µ«ÊÇ¿ÉÒÔ¿´µ½BrokerÍÆË͵½ConsumerµÄÒ»ÌõÏûÏ¢
ConsumeMessageThread-QuickStartConsumer-3 Receive New Messages: [MessageExt [queueId=0, storeSize=150,
queueOffset=244, sysFlag=0, bornTimestamp=1400772029972, bornHost=/10.162.0.7:54234,
storeTimestamp=1400772016017, storeHost=/127.0.0.1:10911, msgId=0A0A0A5900002A9F0000000000063257,
commitLogOffset=406103, bodyCRC=112549959, reconsumeTimes=0, preparedTransactionOffset=0,
toString()=Message [topic=QuickStart, flag=0,
properties={TAGS=TagA, WAIT=true, MAX_OFFSET=245, MIN_OFFSET=0}, body=29]]] |
Èý¡¢Consumer×î¼Ñʵ¼ù
1.Ïû·Ñ¹ý³ÌÒª×öµ½Ãݵȣ¨¼´Ïû·Ñ¶ËÈ¥ÖØ£©
RocketMQÎÞ·¨×öµ½ÏûÏ¢ÖØ¸´£¬ËùÒÔÈç¹ûÒµÎñ¶ÔÏûÏ¢ÖØ¸´·Ç³£Ãô¸Ð£¬Îñ±ØÒªÔÚÒµÎñ²ãÃæÈ¥ÖØ£¬ÓÐÒÔÏÂһЩ·½Ê½£º
£¨1£©.½«ÏûÏ¢µÄΨһ¼ü£¬¿ÉÒÔÊÇMsgId£¬Ò²¿ÉÒÔÊÇÏûÏ¢ÄÚÈÝÖеÄΨһ±êʶ×ֶΣ¬ÀýÈç¶©µ¥ID£¬Ïû·Ñ֮ǰÅжÏÊÇ·ñÔÚDB»òTair£¨È«¾ÖKV´æ´¢£©ÖдæÔÚ£¬Èç¹û²»´æÔÚÔò²åÈ룬²¢Ïû·Ñ£¬·ñÔòÌø¹ý¡££¨Êµ¼ù¹ý³ÌÒª¿¼ÂÇÔ×ÓÐÔÎÊÌ⣬ÅжÏÊÇ·ñ´æÔÚ¿ÉÒÔ³¢ÊÔ²åÈ룬Èç¹û±¨Ö÷¼ü³åÍ»£¬Ôò²åÈëʧ°Ü£¬Ö±½ÓÌø¹ý£©
msgidÒ»¶¨ÊÇÈ«¾ÖΨһµÄ±êʶ·û£¬µ«ÊÇ¿ÉÄÜ»á´æÔÚͬÑùµÄÏûÏ¢ÓÐÁ½¸ö²»Í¬µÄmsgidµÄÇé¿ö£¨ÓжàÖÖÔÒò£©£¬ÕâÖÖÇé¿ö¿ÉÄÜ»áʹҵÎñÉÏÖØ¸´£¬½¨Òé×îºÃʹÓÃÏûÏ¢ÌåÖеÄΨһ±êʶ×Ö¶ÎÈ¥ÖØ
£¨2£©.ʹҵÎñ²ãÃæµÄ״̬»úÈ¥ÖØ
2.ÅúÁ¿·½Ê½Ïû·Ñ
Èç¹ûÒµÎñÁ÷³ÌÖ§³ÖÅúÁ¿·½Ê½Ïû·Ñ£¬Ôò¿ÉÒԺܴó³Ì¶ÈÉϵÄÌá¸ßÍÌÍÂÁ¿£¬¿ÉÒÔͨ¹ýÉèÖÃConsumerµÄconsumerMessageBatchMaxSize²ÎÊý£¬Ä¬ÈÏÊÇ1£¬¼´Ò»´ÎÏû·ÑÒ»Ìõ²ÎÊý
3.Ìø¹ý·ÇÖØÒªµÄÏûÏ¢
·¢ÉúÏûÏ¢¶Ñ»ýʱ£¬Èç¹ûÏû·ÑËÙ¶ÈÒ»Ö±¸ú²»ÉÏ·¢ËÍËÙ¶È£¬¿ÉÒÔÑ¡Ôñ¶ªÆú²»ÖØÒªµÄÏûÏ¢
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); long offset=msgs.get(0).getQueueOffset(); String maxOffset=msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET); long diff=Long.parseLong(maxOffset)-offset; if(diff>100000){ //´¦ÀíÏûÏ¢¶Ñ»ýÇé¿ö return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } |
ÈçÒÔÉÏ´úÂëËùʾ£¬µ±Ä³¸ö¶ÓÁеÄÏûÏ¢Êý¶Ñ»ýµ½ 100000 ÌõÒÔÉÏ£¬Ôò³¢ÊÔ¶ªÆú²¿·Ö»òÈ«²¿ÏûÏ¢£¬ÕâÑù¾Í¿ÉÒÔ¿ìËÙ×·ÉÏ·¢ËÍÏûÏ¢µÄËÙ¶È
4.ÓÅ»¯Ã»ÌõÏûÏ¢Ïû·Ñ¹ý³Ì
¾ÙÀýÈçÏ£¬Ä³ÌõÏûÏ¢µÄÏû·Ñ¹ý³ÌÈçÏÂ
1. ¸ù¾ÝÏûÏ¢´Ó DB ²éѯÊý¾Ý 1
2. ¸ù¾ÝÏûÏ¢´Ó DB ²éѯÊý¾Ý2
3. ¸´ÔÓµÄÒµÎñ¼ÆËã
4. Ïò DB ²åÈëÊý¾Ý3
5. Ïò DB ²åÈëÊý¾Ý 4
ÕâÌõÏûÏ¢µÄÏû·Ñ¹ý³ÌÓë DB ½»»¥ÁË 4 ´Î£¬Èç¹û°´ÕÕÿ´Î 5ms ¼ÆË㣬ÄÇô×ܹ²ºÄʱ
20ms£¬¼ÙÉèÒµÎñ¼ÆËãºÄʱ 5ms£¬ÄÇô×ܹýºÄʱ 25ms£¬Èç¹ûÄÜ°Ñ 4 ´Î DB ½»»¥ÓÅ»¯Îª 2 ´Î£¬ÄÇô×ܺÄʱ¾Í¿ÉÒÔÓÅ»¯µ½
15ms£¬Ò²¾ÍÊÇ˵×ÜÌåÐÔÄÜÌá¸ßÁË 40%¡£
¶ÔÓÚ Mysql µÈ DB£¬Èç¹û²¿ÊðÔÚ´ÅÅÌ£¬ÄÇôÓë DB ½øÐн»»¥£¬Èç¹ûÊý¾ÝûÓÐÃüÖÐ
cache£¬Ã¿´Î½»»¥µÄ RT »áÖ±ÏßÉÏÉý£¬ Èç¹û²ÉÓà SSD£¬Ôò RT ÉÏÉýÇ÷ÊÆÒªÃ÷ÏÔºÃÓÚ´ÅÅÌ¡£
¸ö±ðÓ¦ÓÿÉÄÜ»áÓöµ½ÕâÖÖÇé¿ö£ºÔÚÏßÏÂѹ²âÏû·Ñ¹ý³ÌÖУ¬db ±íÏַdz£ºÃ£¬Ã¿´Î
RT ¶¼ºÜ¶Ì£¬µ«ÊÇÉÏÏßÔËÐÐÒ»¶Îʱ¼ä£¬RT ¾Í»á±ä³¤£¬Ïû·ÑÍÌÍÂÁ¿Ö±ÏßϽµ
Ö÷ÒªÔÒòÊÇÏßÏÂѹ²âʱ¼ä¹ý¶Ì£¬ÏßÉÏÔËÐÐÒ»¶Îʱ¼äºó£¬cache ÃüÖÐÂÊϽµ£¬ÄÇô
RT ¾Í»áÔö¼Ó¡£½¨ÒéÔÚÏßÏÂѹ²âʱ£¬Òª²âÊÔ×ã¹»³¤Ê±¼ä£¬¾¡¿ÉÄÜÄ£ÄâÏßÉÏ»·¾³£¬Ñ¹²â¹ý³ÌÖУ¬Êý¾ÝµÄ·Ö²¼Ò²ºÜÖØÒª£¬Êý¾Ý²»Í¬£¬¿ÉÄÜ
cache µÄÃüÖÐÂÊÒ²»áÍêÈ«²»Í¬
ËÄ¡¢Producer×î¼Ñʵ¼ù
1.·¢ËÍÏûÏ¢×¢ÒâÊÂÏî
£¨1£© Ò»¸öÓ¦Óþ¡¿ÉÄÜÓÃÒ»¸ö Topic£¬ÏûÏ¢×ÓÀàÐÍÓà tags À´±êʶ£¬tags
¿ÉÒÔÓÉÓ¦ÓÃ×ÔÓÉÉèÖá£Ö»Óз¢ËÍÏûÏ¢ÉèÖÃÁËtags£¬Ïû·Ñ·½ÔÚ¶©ÔÄÏûϢʱ£¬²Å¿ÉÒÔÀûÓà tags ÔÚ broker
×öÏûÏ¢¹ýÂË¡£
£¨2£©Ã¿¸öÏûÏ¢ÔÚÒµÎñ²ãÃæµÄΨһ±êʶÂ룬ҪÉèÖõ½ keys ×ֶΣ¬·½±ã½«À´¶¨Î»ÏûÏ¢¶ªÊ§ÎÊÌâ¡£·þÎñÆ÷»áΪÿ¸öÏûÏ¢´´½¨Ë÷Òý£¨¹þÏ£Ë÷Òý£©£¬Ó¦ÓÿÉÒÔͨ¹ý
topic£¬key À´²éѯÕâÌõÏûÏ¢ÄÚÈÝ£¬ÒÔ¼°ÏûÏ¢±»ËÏû·Ñ¡£ÓÉÓÚÊǹþÏ£Ë÷Òý£¬ÇëÎñ±Ø±£Ö¤ key ¾¡¿ÉÄÜΨһ£¬ÕâÑù¿ÉÒÔ±ÜÃâDZÔڵĹþÏ£³åÍ»¡£
£¨3£©ÏûÏ¢·¢Ëͳɹ¦»òÕßʧ°Ü£¬Òª´òÓ¡ÏûÏ¢ÈÕÖ¾£¬Îñ±ØÒª´òÓ¡ sendresult
ºÍ key ×Ö¶Î
£¨4£©send ÏûÏ¢·½·¨£¬Ö»Òª²»Å×Òì³££¬¾Í´ú±í·¢Ëͳɹ¦¡£µ«ÊÇ·¢Ëͳɹ¦»áÓжà¸ö״̬£¬ÔÚ
sendResult ÀﶨÒå
SEND_OK£ºÏûÏ¢·¢Ëͳɹ¦
FLUSH_DISK_TIMEOUT£ºÏûÏ¢·¢Ëͳɹ¦£¬µ«ÊÇ·þÎñÆ÷Ë¢Å̳¬Ê±£¬ÏûÏ¢ÒѾ½øÈë·þÎñÆ÷¶ÓÁУ¬Ö»ÓдËʱ·þÎñÆ÷å´»ú£¬ÏûÏ¢²Å»á¶ªÊ§
FLUSH_SLAVE_TIMEOUT£ºÏûÏ¢·¢Ëͳɹ¦£¬µ«ÊÇ·þÎñÆ÷ͬ²½µ½
Slave ʱ³¬Ê±£¬ÏûÏ¢ÒѾ½øÈë·þÎñÆ÷¶ÓÁУ¬Ö»ÓдËʱ·þÎñÆ÷å´»ú£¬ÏûÏ¢²Å»á¶ªÊ§
SLAVE_NOT_AVAILABLE£ºÏûÏ¢·¢Ëͳɹ¦£¬µ«ÊÇ´Ëʱ slave
²»¿ÉÓã¬ÏûÏ¢ÒѾ½øÈë·þÎñÆ÷¶ÓÁУ¬Ö»ÓдËʱ·þÎñÆ÷å´»ú£¬ÏûÏ¢²Å»á¶ªÊ§¡£¶ÔÓÚ¾«È··¢ËÍ˳ÐòÏûÏ¢µÄÓ¦Óã¬ÓÉÓÚ˳ÐòÏûÏ¢µÄ¾ÖÏÞÐÔ£¬¿ÉÄÜ»áÉæ¼°µ½Ö÷±¸×Ô¶¯Çл»ÎÊÌ⣬ËùÒÔÈç¹ûsendresult
ÖÐµÄ status ×ֶβ»µÈÓÚ SEND_OK£¬¾ÍÓ¦¸Ã³¢ÊÔÖØÊÔ¡£¶ÔÓÚÆäËûÓ¦Óã¬ÔòûÓбØÒªÕâÑù
£¨5£©¶ÔÓÚÏûÏ¢²»¿É¶ªÊ§Ó¦Óã¬Îñ±ØÒªÓÐÏûÏ¢ÖØ·¢»úÖÆ
2.ÏûÏ¢·¢ËÍʧ°Ü´¦Àí
Producer µÄ send ·½·¨±¾ÉíÖ§³ÖÄÚ²¿ÖØÊÔ£¬ÖØÊÔÂß¼ÈçÏ£º
£¨1£© ÖÁ¶àÖØÊÔ 3 ´Î
£¨2£© Èç¹û·¢ËÍʧ°Ü£¬ÔòÂÖתµ½ÏÂÒ»¸ö Broker
£¨3£© Õâ¸ö·½·¨µÄ×ܺÄʱʱ¼ä²»³¬¹ý sendMsgTimeout ÉèÖõÄÖµ£¬Ä¬ÈÏ
10sËùÒÔ£¬Èç¹û±¾ÉíÏò broker ·¢ËÍÏûÏ¢²úÉú³¬Ê±Òì³££¬¾Í²»»áÔÙ×öÖØÊÔ
Èç¹ûµ÷Óà send ͬ²½·½·¨·¢ËÍʧ°Ü£¬Ôò³¢ÊÔ½«ÏûÏ¢´æ´¢µ½ db£¬Óɺǫ́Ï̶߳¨Ê±ÖØÊÔ£¬±£Ö¤ÏûÏ¢Ò»¶¨µ½´ï
Broker¡£
ÉÏÊö db ÖØÊÔ·½Ê½ÎªÊ²Ã´Ã»Óм¯³Éµ½ MQ ¿Í»§¶ËÄÚ²¿×ö£¬¶øÊÇÒªÇóÓ¦ÓÃ×Ô¼ºÈ¥Íê³É£¬»ùÓÚÒÔϼ¸µã¿¼ÂÇ£º
£¨1£©MQ µÄ¿Í»§¶ËÉè¼ÆÎªÎÞ״̬ģʽ£¬·½±ãÈÎÒâµÄˮƽÀ©Õ¹£¬ÇÒ¶Ô»úÆ÷×ÊÔ´µÄÏûºÄ½ö½öÊÇ
cpu¡¢ÄÚ´æ¡¢ÍøÂç
£¨2£©Èç¹û MQ ¿Í»§¶ËÄÚ²¿¼¯³ÉÒ»¸ö KV ´æ´¢Ä£¿é£¬ÄÇôÊý¾ÝÖ»ÓÐͬ²½ÂäÅ̲ÅÄܽϿɿ¿£¬¶øÍ¬²½ÂäÅ̱¾ÉíÐÔÄÜ¿ªÏú½Ï´ó£¬ËùÒÔͨ³£»á²ÉÓÃÒì²½ÂäÅÌ£¬ÓÖÓÉÓÚÓ¦Óùرչý³Ì²»ÊÜ
MQ ÔËάÈËÔ±¿ØÖÆ£¬¿ÉÄܾ³£»á·¢Éú kill -9 ÕâÑù±©Á¦·½Ê½¹Ø±Õ£¬Ôì³ÉÊý¾ÝûÓм°Ê±ÂäÅ̶ø¶ªÊ§
£¨3£©Producer ËùÔÚ»úÆ÷µÄ¿É¿¿ÐԽϵͣ¬Ò»°ãΪÐéÄâ»ú£¬²»ÊÊºÏ´æ´¢ÖØÒªÊý¾Ý¡£
×ÛÉÏ£¬½¨ÒéÖØÊÔ¹ý³Ì½»ÓÉÓ¦ÓÃÀ´¿ØÖÆ¡£
3.Ñ¡Ôñ oneway ÐÎʽ·¢ËÍ
Ò»¸ö RPC µ÷Óã¬Í¨³£ÊÇÕâÑùÒ»¸ö¹ý³Ì
£¨1£©¿Í»§¶Ë·¢ËÍÇëÇóµ½·þÎñÆ÷
£¨2£©·þÎñÆ÷´¦Àí¸ÃÇëÇó
£¨3£©·þÎñÆ÷Ïò¿Í»§¶Ë·µ»ØÓ¦´ð
ËùÒÔÒ»¸ö RPC µÄºÄʱʱ¼äÊÇÉÏÊöÈý¸ö²½ÖèµÄ×ܺͣ¬¶øÄ³Ð©³¡¾°ÒªÇóºÄʱ·Ç³£¶Ì£¬µ«ÊǶԿɿ¿ÐÔÒªÇó²¢²»¸ß£¬ÀýÈçÈÕÖ¾ÊÕ¼¯ÀàÓ¦Ó㬴ËÀàÓ¦ÓÿÉÒÔ²ÉÓÃ
oneway ÐÎʽµ÷Óã¬oneway ÐÎʽֻ·¢ËÍÇëÇ󲻵ȴýÓ¦´ð£¬¶ø·¢ËÍÇëÇóÔÚ¿Í»§¶ËʵÏÖ²ãÃæ½ö½öÊÇÒ»¸ö os
ϵͳµ÷ÓõĿªÏú£¬¼´½«Êý¾ÝдÈë¿Í»§¶ËµÄ socket »º³åÇø£¬´Ë¹ý³ÌºÄʱͨ³£ÔÚ΢Ãë¼¶¡£
RocketMQ²»Ö¹¿ÉÒÔÖ±½ÓÍÆËÍÏûÏ¢£¬ÔÚÏû·Ñ¶Ë×¢²á¼àÌýÆ÷½øÐмàÌý£¬»¹¿ÉÒÔÓÉÏû·Ñ¶Ë¾ö¶¨×Ô¼ºÈ¥ÀÈ¡Êý¾Ý
/** * PullConsumer£¬¶©ÔÄÏûÏ¢ */ public class PullConsumer { //Java»º´æ private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumerGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); //ÀÈ¡¶©ÔÄÖ÷ÌâµÄ¶ÓÁУ¬Ä¬È϶ÓÁдóСÊÇ4 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTestMapBody"); for (MessageQueue mq : mqs) { System.out.println("Consume from the queue: " + mq); SINGLE_MQ:while(true){ try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); List<MessageExt> list=pullResult.getMsgFoundList(); if(list!=null&&list.size()<100){ for(MessageExt msg:list){ System.out.println(SerializableInterface.deserialize(msg.getBody())); } } System.out.println(pullResult.getNextBeginOffset()); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: // TODO break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null){ System.out.println(offset); return offset; } return 0; } |
¸Õ¿ªÊ¼µÄûÓÐϸ¿´PullResult¶ÔÏó£¬ÒÔΪÀÈ¡µ½µÄ½á¹ûûÓÐMessageExt¶ÔÏó»¹Åܵ½ÈºÀïÃæÎʱðÈË£¬·¸2ÁË
ÌØ±ðҪעÒâ ¾²Ì¬±äÁ¿offsetTableµÄ×÷Óã¬ÀÈ¡µÄÊǰ´ÕÕ´Óoffset£¨Àí½âΪϱ꣩λÖÿªÊ¼ÀÈ¡£¬ÀÈ¡NÌõ£¬offsetTable¼Ç¼Ï´ÎÀÈ¡µÄoffsetλÖá£
|