Getting Started with RocketMQ (Part 2)
 
Author: CharlesSong's Blog  Source: CharlesSong's Blog  Published: 2015-04-01
 

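I. Server installation and deployment

I deployed on CentOS 6.5 running in a virtual machine.

1. Download the release package.

2. Run tar -xvf alibaba-rocketmq-3.0.7.tar.gz to extract it into a suitable directory, such as /opt/.

3. Start RocketMQ: change into the rocketmq/bin directory and run

nohup sh mqnamesrv &

4. Start the Broker, pointing it at the corresponding NameServer

nohup sh mqbroker -n "127.0.0.1:9876" &

II. Writing the client

You can refer to the quickstart source code in the samples.

1. Consumer (message consumer)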

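import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Consumer: subscribes to messages
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");

        // Address of the NameServer started in section I
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("QuickStartConsumer");
        // Subscribe to every tag ("*") of the QuickStart topic
        consumer.subscribe("QuickStart", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}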

2. Producer (message producer)

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Producer: sends messages
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("QuickStartProducer");
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("QuickStart", // topic
                        "TagA", // tag
                        ("Hello RocketMQ ,QuickStart" + i).getBytes() // body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                // Log the failure and pause briefly before sending the next message
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

3. Run the Consumer program first. It stays running and receives the messages pushed to it from the server.

23:18:07.587 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
23:18:07.591 [main] DEBUG i.n.util.internal.PlatformDependent - Platform: Windows
23:18:07.592 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 7
23:18:07.592 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.ByteBuffer.cleaner: available
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true
23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available
23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false
23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: unavailable
23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - You don't have Javassist in your class path or you don't have enough permission to load dynamically generated classes. Please check the configuration for better performance.
23:18:07.595 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
23:18:07.611 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
23:18:07.611 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
23:18:08.355 [main] DEBUG i.n.util.internal.ThreadLocalRandom - -Dio.netty.initialSeedUniquifier: 0x8c0d4793e5820c31
23:18:08.446 [NettyClientWorkerThread_1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.noResourceLeakDetection: false
Consumer Started.

4. Then run the Producer program, which generates messages and sends them to the Broker. The Producer's log has already scrolled away, but here is one of the messages the Broker pushed to the Consumer:

ConsumeMessageThread-QuickStartConsumer-3 Receive New Messages: [MessageExt [queueId=0, storeSize=150, 
queueOffset=244, sysFlag=0, bornTimestamp=1400772029972, bornHost=/10.162.0.7:54234, 
storeTimestamp=1400772016017, storeHost=/127.0.0.1:10911, msgId=0A0A0A5900002A9F0000000000063257, 
commitLogOffset=406103, bodyCRC=112549959, reconsumeTimes=0, preparedTransactionOffset=0,
 toString()=Message [topic=QuickStart, flag=0, 
properties={TAGS=TagA, WAIT=true, MAX_OFFSET=245, MIN_OFFSET=0}, body=29]]]

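III. Consumer best practices

1. Make consumption idempotent (that is, deduplicate on the consumer side)

RocketMQ cannot guarantee that messages are never duplicated, so if the business is very sensitive to duplicate messages, it must deduplicate at the business level. There are several ways to do this:

(1) Use a unique key for the message, which can be the MsgId or a unique identifier field in the message body, such as an order ID. Before consuming, check whether the key already exists in the DB or Tair (a global KV store); if it does not, insert it and consume the message, otherwise skip the message. (In practice, consider atomicity: instead of checking for existence first, try the insert directly; if it reports a primary-key conflict, the insert has failed and the message can be skipped.) A msgId is always a globally unique identifier, but the same message may end up with two different msgIds (for a variety of reasons), which can cause duplicates at the business level, so it is best to deduplicate on the unique identifier field in the message body.

(2) Deduplicate with a business-level state machine.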
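Approach (1) can be sketched roughly as follows. This is a minimal, self-contained illustration rather than RocketMQ code: an in-memory concurrent set stands in for the DB or Tair table, and the names IdempotentConsumerSketch, consumeOnce, and the order IDs are hypothetical. The atomic add plays the role of the insert that fails on a primary-key conflict.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only: an in-memory set stands in for the DB/Tair table of processed keys. */
final class IdempotentConsumerSketch {
    // Hypothetical store of business keys (for example order IDs) already consumed.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /**
     * Tries to "insert" the key first; only the caller that wins the insert
     * runs the business logic. Returns true if the message was processed,
     * false if it was a duplicate and was skipped.
     */
    boolean consumeOnce(String businessKey, Runnable businessLogic) {
        // add() is atomic and returns false when the key already exists,
        // which mirrors an INSERT failing with a primary-key conflict.
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: skip it
        }
        businessLogic.run();
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        int[] handled = {0};
        // The same order ID is delivered twice; the business logic runs once.
        System.out.println(consumer.consumeOnce("order-1001", () -> handled[0]++));
        System.out.println(consumer.consumeOnce("order-1001", () -> handled[0]++));
        System.out.println("business logic ran " + handled[0] + " time(s)");
    }
}
```

In a real system the key insert and the business update would probably need to share one transaction, so that a failed consumption does not leave the key marked as processed.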

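2. Consume in batches

If the business flow supports batch consumption, throughput can improve considerably. Set the Consumer's consumeMessageBatchMaxSize parameter; the default is 1, meaning one message is consumed at a time.

3. Skip unimportant messages

When messages pile up and consumption consistently cannot keep up with the sending rate, you can choose to discard unimportant messages.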

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

    // Queue offset of the first message in this batch
    long offset = msgs.get(0).getQueueOffset();

    // Current maximum offset of the same queue, carried as a message property
    String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
    long diff = Long.parseLong(maxOffset) - offset;
    if (diff > 100000) {
        // Handle the backlog: acknowledge the batch without processing it
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    // TODO: normal consumption logic
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

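As the code above shows, when the backlog in a queue exceeds 100000 messages, the consumer tries to discard some or all of them so that it can quickly catch up with the sending rate.

4. Optimize the consumption of each message

For example, suppose consuming one message involves the following steps:

1. Query data 1 from the DB based on the message

2. Query data 2 from the DB based on the message

3. Perform complex business computation

4. Insert data 3 into the DB

5. Insert data 4 into the DB

Consuming this message involves 4 interactions with the DB. At 5 ms each, they take 20 ms in total; assuming the business computation takes 5 ms, the total is 25 ms. If the 4 DB interactions can be reduced to 2, the total drops to 15 ms, which cuts the time per message by 40%.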
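The arithmetic in this example can be checked with a few lines of code. The class and method names below are hypothetical, and the 5 ms figures are the assumptions stated in the text, not measurements.

```java
/** Sketch only: reproduces the latency arithmetic from the example above. */
final class ConsumeLatencySketch {
    /** Time per message = DB round trips * time per round trip + business computation time. */
    static int totalMillis(int dbRoundTrips, int millisPerRoundTrip, int computeMillis) {
        return dbRoundTrips * millisPerRoundTrip + computeMillis;
    }

    public static void main(String[] args) {
        int before = totalMillis(4, 5, 5); // 4 round trips: 25 ms
        int after = totalMillis(2, 5, 5);  // 2 round trips: 15 ms
        double reductionPercent = 100.0 * (before - after) / before;
        System.out.println(before + " ms -> " + after + " ms, time per message reduced by "
                + reductionPercent + "%");
    }
}
```

Note that the 40% figure is a reduction in time per message. For a single consuming thread, going from 25 ms to 15 ms per message corresponds to roughly 40 versus 67 messages per second.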

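For a DB such as MySQL deployed on spinning disks, the RT of each interaction rises sharply whenever the data misses the cache; with SSDs, the RT degrades noticeably less than on spinning disks.

Some applications may run into this situation: during offline load testing of the consumption path, the DB performs very well and every RT is short, but after running in production for a while the RT grows and consumption throughput drops sharply.

The main cause is that the offline load test was too short: after running in production for a while, the cache hit rate falls and the RT increases. When load testing offline, test for long enough and simulate the production environment as closely as possible. The distribution of the data during the test also matters, because different data can give completely different cache hit rates.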

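IV. Producer best practices

1. Notes on sending messages

(1) An application should use a single Topic where possible and identify message subtypes with tags, which the application can set freely. Only if tags are set when sending can the consumer use them to have the broker filter messages at subscription time.

(2) Set each message's business-level unique identifier in the keys field, so that lost messages can be tracked down later. The server builds an index (a hash index) for every message, and the application can use the topic and key to look up the message content and who consumed it. Because it is a hash index, make sure the key is as unique as possible to avoid potential hash collisions.

(3) Log every message, whether sending succeeds or fails, and be sure to log the sendResult and the key field.

(4) As long as the send method does not throw an exception, the send succeeded. A successful send can still have one of several statuses, defined in sendResult:

SEND_OK: the message was sent successfully.

FLUSH_DISK_TIMEOUT: the message was sent successfully, but the server timed out flushing it to disk. The message is already in the server's queue and is lost only if the server goes down at this point.

FLUSH_SLAVE_TIMEOUT: the message was sent successfully, but the server timed out synchronizing it to the Slave. The message is already in the server's queue and is lost only if the server goes down at this point.

SLAVE_NOT_AVAILABLE: the message was sent successfully, but no slave is available at this point. The message is already in the server's queue and is lost only if the server goes down at this point.

For applications that must send messages in strict order, the limitations of ordered messages may bring automatic master/slave failover into play, so if the status field in sendResult is not SEND_OK, the application should retry. Other applications do not need to do this.

(5) Applications that cannot tolerate message loss must have a message resend mechanism.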
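The retry rule from item (4) can be written down as a small decision function. This is a sketch under stated assumptions: the local Status enum only mirrors the four status names listed above so that the example compiles without the RocketMQ client, and shouldRetry and its strictlyOrdered flag are hypothetical names. In real code the status would be read from the SendResult returned by send.

```java
/** Sketch only: a local enum mirrors the four status names listed above. */
final class SendStatusSketch {
    enum Status { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    /**
     * Decides whether the application should resend.
     * strictlyOrdered is a hypothetical flag for applications that send ordered messages.
     */
    static boolean shouldRetry(Status status, boolean strictlyOrdered) {
        // Every status means the broker accepted the message; only applications
        // sending strictly ordered messages treat a non-SEND_OK status as a reason to retry.
        return strictlyOrdered && status != Status.SEND_OK;
    }

    public static void main(String[] args) {
        for (Status s : Status.values()) {
            System.out.println(s + ": ordered sender retries=" + shouldRetry(s, true)
                    + ", other senders retry=" + shouldRetry(s, false));
        }
    }
}
```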

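2. Handling send failures

The Producer's send method retries internally. The retry logic is as follows:

(1) It retries at most 3 times.

(2) If a send fails, it rotates to the next Broker.

(3) The total time spent in this method does not exceed the value of sendMsgTimeout, which defaults to 10 s. So if sending to the broker itself raises a timeout exception, no further retry is made.

If a synchronous send call fails, try storing the message in a DB and have a background thread retry periodically, so that the message is guaranteed to reach the Broker.

Why is this DB-based retry not built into the MQ client, and instead left to the application? For the following reasons:

(1) The MQ client is designed to be stateless, which makes arbitrary horizontal scaling easy, and the only machine resources it consumes are CPU, memory, and network.

(2) If the MQ client embedded a KV storage module, the data would be reasonably reliable only with synchronous flushing to disk, which is expensive, so asynchronous flushing would normally be used. And because shutting down the application is outside the control of the MQ operators, forced shutdowns such as kill -9 may happen often, losing data that had not yet been flushed.

(3) The machine a Producer runs on tends to be less reliable, typically a virtual machine, and is not suited to storing important data.

In summary, it is recommended that the application control the retry process.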
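An application-controlled retry of this kind might look roughly like the sketch below. It is an illustration under assumptions, not the author's implementation: an in-memory deque stands in for the DB table, the sender predicate is a hypothetical stand-in for a synchronous producer.send call that reports success or failure, and ResendStoreSketch, sendOrStore, and retryPending are made-up names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Sketch only: an in-memory deque stands in for the DB table of unsent messages. */
final class ResendStoreSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    // Hypothetical sender: returns true when the synchronous send succeeded.
    private final Predicate<String> sender;

    ResendStoreSketch(Predicate<String> sender) {
        this.sender = sender;
    }

    /** Tries to send; on failure, stores the message for a later retry. */
    synchronized void sendOrStore(String body) {
        if (!trySend(body)) {
            pending.addLast(body);
        }
    }

    /**
     * One retry pass over the stored messages. A background thread would call
     * this on a fixed schedule. Returns how many messages are still pending.
     */
    synchronized int retryPending() {
        int count = pending.size();
        for (int i = 0; i < count; i++) {
            String body = pending.pollFirst();
            if (!trySend(body)) {
                pending.addLast(body); // still failing: keep it for the next pass
            }
        }
        return pending.size();
    }

    private boolean trySend(String body) {
        try {
            return sender.test(body);
        } catch (RuntimeException e) {
            return false; // treat an exception as a failed send
        }
    }

    public static void main(String[] args) {
        boolean[] brokerUp = {false};
        ResendStoreSketch store = new ResendStoreSketch(body -> brokerUp[0]);
        store.sendOrStore("msg-1"); // broker down: stored
        store.sendOrStore("msg-2"); // broker down: stored
        System.out.println("pending after first pass: " + store.retryPending());
        brokerUp[0] = true;
        System.out.println("pending after broker recovery: " + store.retryPending());
    }
}
```

In practice the periodic pass could be driven by something like a ScheduledExecutorService, and the store would need to be durable (a real DB table) for the guarantee described above to hold.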

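3. Sending in oneway mode

An RPC call usually goes like this:

(1) The client sends a request to the server

(2) The server processes the request

(3) The server returns a response to the client

So the time taken by one RPC is the sum of these three steps. Some scenarios demand very low latency but do not need high reliability, for example log collection applications. Such applications can call in oneway mode, which only sends the request and does not wait for a response. On the client side, sending the request costs just one OS system call, namely writing the data into the client's socket buffer, which usually takes microseconds.

RocketMQ can do more than push messages to a listener registered on the consumer side; the consumer can also decide for itself when to pull data.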

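import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;

/**
 * PullConsumer: subscribes to messages
 */
public class PullConsumer {
    // In-memory (Java) cache of the next offset to pull for each queue
    private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();


    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        // Fetch the queues of the subscribed topic; the default number of queues is 4
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            SINGLE_MQ: while (true) {
                try {
                    // Pull up to 32 messages starting at the stored offset, blocking if none are found
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    List<MessageExt> list = pullResult.getMsgFoundList();
                    if (list != null && list.size() < 100) {
                        for (MessageExt msg : list) {
                            // SerializableInterface is a helper class that is not shown in this article
                            System.out.println(SerializableInterface.deserialize(msg.getBody()));
                        }
                    }
                    System.out.println(pullResult.getNextBeginOffset());
                    // Remember where the next pull on this queue should start
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // TODO
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        // Nothing new in this queue: move on to the next one
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }


    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offsetTable.put(mq, offset);
    }


    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offsetTable.get(mq);
        if (offset != null) {
            System.out.println(offset);
            return offset;
        }
        return 0;
    }
}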

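At first I did not look at the PullResult object closely and thought the pulled result contained no MessageExt objects. I even went and asked about it in a chat group, which was silly of me.

Pay special attention to the role of the static variable offsetTable: a pull starts at a given offset (think of it as an index) and fetches N messages, and offsetTable records the offset at which the next pull should start.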
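The offset bookkeeping can be illustrated without a broker. In the sketch below a plain List stands in for one message queue, and OffsetTableSketch, pull, and the queue name are hypothetical; it only shows how recording the next offset makes each pull continue where the previous one ended.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: a List stands in for one message queue on the broker. */
final class OffsetTableSketch {
    // Next offset to pull from, per queue name (plays the role of offsetTable).
    private final Map<String, Long> offsetTable = new HashMap<>();

    /** Pulls up to maxNums messages starting at the stored offset, then advances the offset. */
    List<String> pull(String queueName, List<String> queue, int maxNums) {
        long begin = offsetTable.getOrDefault(queueName, 0L);
        int from = (int) Math.min(begin, queue.size());
        int to = Math.min(from + maxNums, queue.size());
        List<String> batch = new ArrayList<>(queue.subList(from, to));
        offsetTable.put(queueName, (long) to); // the next pull starts where this one ended
        return batch;
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        OffsetTableSketch consumer = new OffsetTableSketch();
        // Pull two messages at a time until the queue has nothing new.
        List<String> batch;
        while (!(batch = consumer.pull("queue-0", queue, 2)).isEmpty()) {
            System.out.println(batch);
        }
    }
}
```

Because the table lives only in memory, a restarted consumer would start again from offset 0; persisting the offsets is left out of this sketch.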

   