Message Middleware: RocketMQ Message Sending
 
 2019-1-8
 
Editor's recommendation:

This article comes from jianshu. It introduces the RocketMQ network architecture diagram and walks through the full flow of sending a normal message in RocketMQ.

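Abstract: Sending a message with the client is easy, but what does RocketMQ actually do behind the scenes?

The greatest truths are the simplest. A message queue can be summed up as "send, store, receive". Of these three steps, sending is the simplest and the easiest to get started with, which makes it a good entry point for beginner and intermediate readers who want to study MQ. This article therefore starts from the sending of a single message and describes the overall flow and the details of sending one normal message in RocketMQ, a distributed message queue.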

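1. RocketMQ Network Architecture Diagram

The network deployment architecture of the RocketMQ distributed message queue is shown in the figure below (it includes the two main paths a Producer follows when sending a normal message to the cluster).

Figure: RocketMQ deployment architecture

The roles in the figure above are as follows: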

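(1) NameServer: the naming server of the RocketMQ cluster (it can also be called the registry). It is itself stateless (in practice the data on different NameServer instances may be briefly inconsistent, but periodic updates keep them consistent most of the time) and manages the cluster's metadata (for example, KV configuration, Topics, and Broker registration information).

(2) Broker (Master): the master node of the RocketMQ message broker. It connects message sending by the Producer with message consumption by the Consumer, and persists messages to disk.

(3) Broker (Slave): the backup node of the RocketMQ message broker. It copies messages from the master node, synchronously or asynchronously, as a backup, which underpins the high availability of the RocketMQ cluster.

(4) Producer (message producer): here, the producer of normal messages. It sends messages to the RocketMQ master node, mainly through the RocketMQ-Client module.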

The communication links in the figure above relate as follows:

(1) Producer and NameServer: each Producer establishes a TCP connection with one instance in the NameServer cluster and pulls Topic routing information from that instance.

(2) Producer and Broker: the Producer establishes a TCP connection with the Master Broker associated with the topic it is sending to, and uses it to send messages and periodic heartbeats.

(3) Broker and NameServer: every Broker (Master or Slave) establishes a TCP connection with every NameServer instance. On startup, a Broker registers its configured Topic information with every machine in the NameServer cluster, so each NameServer holds that broker's Topic routing configuration. Masters are not connected to each other, while a Master is connected to its Slaves.

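2. A Demo of Sending a Normal Message from the Client

The example package of the RocketMQ source project contains the simplest sample code for sending a normal message (ps: readers who are new to RocketMQ are encouraged to use the samples in this package to study and debug systematically).

We can simply run the main method of the Producer class in the "org.apache.rocketmq.example.simple" package to send one normal message (the main code is below; NameServer and Broker instances must both be deployed locally first):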

// step1. Start DefaultMQProducer; the startup flow is explained in detail later
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
try {
    // step2. Wrap the content of the message to send
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // step3. Send the message; the flow is explained later
    SendResult sendResult = producer.send(msg);
} catch (Exception e) {
    // Exception code
}
producer.shutdown();

3. A Full Walkthrough of Sending a Normal Message in RocketMQ

As the previous section shows, the demo code for a producer sending a message is fairly simple, with only a few core lines. A closer read of RocketMQ's Client module, however, shows that the core sending flow is somewhat complex. The following analyzes three aspects in turn: the startup flow of DefaultMQProducer, the send method, and message handling on the Broker.

3.1 Startup Flow of DefaultMQProducer

In the demo code, we first start the DefaultMQProducer instance, which calls the start() method of DefaultMQProducerImpl, the default producer implementation class.

@Override
public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

The main startup flow of DefaultMQProducerImpl, the default producer implementation class, is as follows:

(1) Initialize and obtain the MQClientInstance object, and register the producer in the local cache variable producerTable;

(2) Save the default Topic ("TBW102") in the local cache variable topicPublishInfoTable;

(3) The MQClientInstance object calls its own start() method, which starts several local client service threads, such as the pull-message service, the client network communication service, the rebalance service, and a number of scheduled tasks (updating routes, cleaning up offline Brokers, sending heartbeats, persisting consumerOffset, and adjusting thread pools), and then performs the start once more (this time with the parameter false);

(4) Finally, send a heartbeat to all Broker nodes.

In summary, the main startup flow of DefaultMQProducer is as follows:

Figure: startup process of the DefaultMQProducer start method

A few points are worth noting here:

(1) Within one client, a producerGroup can have only one instance;

(2) MQClientManager returns a different MQClientInstance for each distinct clientId;

(3) MQClientInstance returns a different MQProducer and MQConsumer for each distinct producerGroup (stored in the local cache variables producerTable and consumerTable).
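The three notes above can be modeled with two small maps. The following is only a minimal sketch in plain Java, not RocketMQ's own code: the class names ClientManagerSketch and ClientInstanceSketch, the method names, and the sample clientId strings are all hypothetical, and the only idea it tries to capture is "one client instance per clientId, one producer per group inside that instance".

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a client instance: one table of producers keyed by group.
class ClientInstanceSketch {
    // producerGroup -> producer object, playing the role of producerTable
    private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();

    // Returns true for the first registration of a group, false for a duplicate.
    boolean registerProducer(String group, Object producer) {
        if (group == null || producer == null) {
            return false;
        }
        return producerTable.putIfAbsent(group, producer) == null;
    }
}

// Hypothetical stand-in for the manager: hands out one client instance per clientId.
class ClientManagerSketch {
    private final ConcurrentMap<String, ClientInstanceSketch> factoryTable = new ConcurrentHashMap<>();

    ClientInstanceSketch getOrCreate(String clientId) {
        return factoryTable.computeIfAbsent(clientId, id -> new ClientInstanceSketch());
    }
}
```

In this model, registering the same group twice on one instance is rejected, while the same group name on an instance with a different clientId is accepted.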

3.2 Core Flow of the send Method

The RocketMQ client module offers three main ways to send a message:

(1) Synchronous

(2) Asynchronous

(3) Oneway

Of these, (1) and (2) are the most common; which one to use depends on the business scenario. This section uses synchronous sending to outline the core sending flow (in synchronous mode a failed send is attempted up to 3 times in total by default, which is configurable; the other modes try only once). The entry point of the core flow for synchronous sending is as follows:

/**
 * Entry point of the core flow for synchronous sending; the default timeout is 3s
 *
 * @param msg the Message to send
 * @param timeout the send timeout, which can be set through this parameter
 * @return
 * @throws MQClientException
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
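To make the attempt count for each mode concrete, here is a small sketch of the retry budget. It assumes that the synchronous budget is "one first try plus the configured number of retries" and that the default number of retries is 2 (which is how the total of 3 arises, as far as I understand the client defaults); SendMode, SendAttemptsSketch and the attempt callback are hypothetical names used only for illustration.

```java
import java.util.function.IntPredicate;

// Hypothetical mirror of the three sending modes discussed above.
enum SendMode { SYNC, ASYNC, ONEWAY }

class SendAttemptsSketch {
    // Total attempts: the first try plus the configured retries for SYNC, a single try otherwise.
    static int totalAttempts(SendMode mode, int retryTimesWhenSendFailed) {
        return mode == SendMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Calls 'attempt' with the 1-based attempt number until it reports success
    // or the budget is spent. Returns the attempt that succeeded, or -1 if none did.
    static int sendWithRetry(SendMode mode, int retryTimesWhenSendFailed, IntPredicate attempt) {
        int total = totalAttempts(mode, retryTimesWhenSendFailed);
        for (int i = 1; i <= total; i++) {
            if (attempt.test(i)) {
                return i;
            }
        }
        return -1;
    }
}
```

For example, a synchronous send whose first two attempts fail still succeeds on the third, whereas a oneway send gives up after its single attempt.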

3.2.1 Trying to Obtain the TopicPublishInfo Routing Information

Stepping through with a debugger, we find that sendDefaultImpl() first validates the message to be sent. If the Topic and Body are both fine, it calls tryToFindTopicPublishInfo(), which uses the Topic in the message to look up the client's local cache variable topicPublishInfoTable. If nothing is found, it updates the Topic's routing information from the NameServer (this calls the updateTopicRouteInfoFromNameServer method of the MQClientInstance instance, which ultimately executes the getTopicRouteInfoFromNameServer method of the MQClientAPIImpl instance). There are two scenarios:

(1) The producer sends a message for the first time (the Topic does not yet exist in the NameServer): the first lookup cannot pull the route from the remote NameServer, so the local cache variable topicPublishInfoTable is not updated. A second lookup is therefore needed, which builds a TopicPublishInfo object from the TopicRouteData of the default Topic TBW102 and updates the local cache variable topicPublishInfoTable of the DefaultMQProducerImpl instance.

In this scenario, when the message reaches the Broker, the pre-check super.msgCheck(ctx, requestHeader, response) inside the sendBatchMessage/sendMessage method of the SendMessageProcessor calls the createTopicInSendMessageMethod method of TopicConfigManager, which creates the new Topic on the Broker side and persists it to the configuration file (path: {rocketmq.home.dir}/store/config/topics.json). (ps: this part belongs to the Broker and is slightly beyond the scope of this article, but it is mentioned briefly because it involves the creation of a new Topic.)

(2) The producer sends a message whose Topic already exists: because the Topic already exists in the NameServer, the first lookup retrieves it and updates the local cache variable topicPublishInfoTable, and tryToFindTopicPublishInfo can then return directly.

The source code of the core methods for this part in RocketMQ is as follows (comments added):

/**
 * Get the topicPublishInfo for the msg's topic from topicPublishInfoTable.
 * If it is absent, update the routing info by pulling the latest route from the nameserver.
 *
 * topicPublishInfo
 *
 * @param topic
 * @return
 */
private TopicPublishInfo tryToFindTopicPublishInfo(
    final String topic) {
    // step1. First get from the local cache variable topicPublishInfoTable
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // step1.2 Then update the topic routing info from the nameServer
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        // step2 Then get from the local cache variable topicPublishInfoTable once more
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        /**
         * The first time isDefault is false; the second time isDefault is true,
         * i.e. the update uses the parameters of the default topic
         */
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

/**
 * When the route is absent from the local cache, pull the Topic routing info from the remote NameServer registry
 *
 * @param topic
 * @param timeoutMillis
 * @param allowTopicNotExist
 * @return
 * @throws MQClientException
 * @throws InterruptedException
 * @throws RemotingTimeoutException
 * @throws RemotingSendRequestException
 * @throws RemotingConnectException
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    // After setting the Topic in the request header, send the request for the Topic routing info to the NameServer
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    // The call is synchronous, so the response is returned directly
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        // The Topic of the message to send does not exist in the NameServer
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }
            break;
        }
        // The Topic exists: the call succeeded, so decode the body as TopicRouteData and return it directly
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }
    throw new MQClientException(response.getCode(), response.getRemark());
}
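The two scenarios described in this section boil down to a cache lookup with a fallback to the default Topic. The following sketch models that idea with plain maps instead of real network calls; RouteLookupSketch, its fields, and the use of a queue count as the "route" are simplifying assumptions for illustration, not RocketMQ's data structures.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RouteLookupSketch {
    static final String DEFAULT_TOPIC = "TBW102";

    // Stand-in for what the name server knows: topic -> number of queues.
    private final Map<String, Integer> nameServerRoutes = new ConcurrentHashMap<>();
    // Stand-in for the producer's local cache (the role of topicPublishInfoTable).
    private final Map<String, Integer> localCache = new ConcurrentHashMap<>();

    RouteLookupSketch(Map<String, Integer> nameServerRoutes) {
        this.nameServerRoutes.putAll(nameServerRoutes);
    }

    // Returns the queue count to publish to, or -1 when no route can be found.
    int tryToFindRoute(String topic) {
        Integer cached = localCache.get(topic);
        if (cached != null) {
            return cached;
        }
        // First attempt: ask for the topic's own route (scenario 2 ends here).
        Integer own = nameServerRoutes.get(topic);
        if (own != null) {
            localCache.put(topic, own);
            return own;
        }
        // Second attempt: fall back to the default topic's route (scenario 1).
        Integer fallback = nameServerRoutes.get(DEFAULT_TOPIC);
        if (fallback != null) {
            localCache.put(topic, fallback);
            return fallback;
        }
        return -1;
    }

    boolean isCached(String topic) {
        return localCache.containsKey(topic);
    }
}
```

A brand-new topic therefore borrows the route of TBW102 on its first send, and later sends are served from the local cache.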

The mapping from TopicRouteData to the TopicPublishInfo routing information is shown below:

Figure: mapping from TopicRouteData to TopicPublishInfo in the Client

The TopicRouteData and TopicPublishInfo routing variables look roughly like this:

Figure: contents of the TopicRouteData variable

Figure: contents of the TopicPublishInfo variable

3.2.2 Selecting the Queue to Send To

Once the TopicPublishInfo routing information is available, the RocketMQ client by default uses the selectOneMessageQueue() method to pick one queue (MessageQueue) from the messageQueueList in TopicPublishInfo and sends the message to it. The fault-tolerance strategy is defined in the MQFaultStrategy class:

public class MQFaultStrategy {
    // Tracks the send latency of each Broker
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    // Switch for send-latency fault tolerance
    private boolean sendLatencyFaultEnable = false;
    // Latency level array
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // Unavailable-duration array
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    ......
}

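The sendLatencyFaultEnable switch decides which of the following two approaches is used:

(1) sendLatencyFaultEnable on: on top of selection by incrementing a randomly seeded counter and taking the modulo, Brokers that are not available are filtered out. The so-called "latencyFaultTolerance" means backing off for a period of time from Brokers that failed previously. For example, following the two arrays above, if the latency of the last request exceeded 550 ms, back off for 30000 ms; if it exceeded 1000 ms, back off for 60000 ms.

(2) sendLatencyFaultEnable off (the default): one queue (MessageQueue) is selected by incrementing a randomly seeded counter and taking the modulo, and the message is sent to it.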
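The relationship between the latencyMax and notAvailableDuration arrays listed in MQFaultStrategy can be sketched as a simple table lookup. The class and method names below (BackoffSketch, notAvailableDuration) are hypothetical, and the rule "walk the levels from the highest down and take the first one the latency reaches" is my reading of how the two arrays pair up, so treat it as an assumption rather than the library's exact code.

```java
class BackoffSketch {
    // Same values as the two arrays shown in MQFaultStrategy above.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Maps an observed send latency (ms) to how long (ms) the broker should be avoided.
    static long notAvailableDuration(long latencyMillis) {
        // Walk from the highest latency level down and take the first level reached.
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMillis >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }
}
```

Under this reading, a latency below 550 ms causes no back-off at all, which is why the first two entries of notAvailableDuration are 0.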

/**
 * Select a queue to send to, in one of two ways depending on whether
 * the sendLatencyFaultEnable switch is on
 * @param tpInfo
 * @param lastBrokerName
 * @return
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // 1. On top of incrementing-modulo selection, filter out Brokers that are
            // not available; back off for a period from those that failed before
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // 2. Select one queue (MessageQueue) by incrementing-modulo selection and send the message to it
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
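The default branch (switch off) delegates to an incrementing-modulo pick that tries to avoid the Broker used by the previous failed attempt. Below is a self-contained sketch of that idea. QueueSketch and RoundRobinSketch are hypothetical names, the fixed start value replaces the random initial counter so that runs are repeatable, and Math.floorMod is used instead of the Math.abs plus "pos < 0" guard seen above because it stays non-negative even when the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified queue descriptor (broker name plus queue id).
class QueueSketch {
    final String brokerName;
    final int queueId;

    QueueSketch(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

class RoundRobinSketch {
    private final List<QueueSketch> queues;
    private final AtomicInteger counter;

    // 'start' stands in for the random initial counter value.
    RoundRobinSketch(List<QueueSketch> queues, int start) {
        this.queues = queues;
        this.counter = new AtomicInteger(start);
    }

    // Plain incrementing-modulo selection.
    QueueSketch selectOne() {
        int pos = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    // On a retry, skip queues on the broker that just failed if another broker exists.
    QueueSketch selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        for (int i = 0; i < queues.size(); i++) {
            QueueSketch candidate = selectOne();
            if (!candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on the failed broker, so fall back to a plain pick.
        return selectOne();
    }
}
```

With two queues on one broker and one on another, a retry after a failure on the first broker lands on the second broker's queue.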

3.2.3 Sending the Wrapped RemotingCommand Packet

After selecting the queue, RocketMQ calls the sendKernelImpl() method to send the message (this method is the core that actually sends the message through RocketMQ's Remoting communication module). The method completes the following steps:

(1) Using the brokerName of the MessageQueue obtained earlier, call the findBrokerAddressInPublish() method of the MQClientInstance instance to get the address of the Broker that will store the message; if it is not found, update the routing information;

(2) Unless disabled, hook functions run before and after the send (the executeSendMessageHookBefore()/executeSendMessageHookAfter() methods);

(3) Wrap the information related to the message into a RemotingCommand packet, whose RequestCode is one of the following:

a. SEND_MESSAGE (normal send)

b. SEND_MESSAGE_V2 (send with an optimized network packet)

c. SEND_BATCH_MESSAGE (batch send)

(4) Using the Broker address obtained, send the wrapped RemotingCommand packet to the corresponding Broker; the default send timeout is 3s;

(5) The actual call into RocketMQ's Remoting communication module happens in the sendMessageSync() method of the MQClientAPIImpl instance, with the following code:

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

(6) The processSendResponse method handles the normal and abnormal cases separately and returns the sendResult object;

(7) After the send returns, updateFaultItem is called to update the availability time of the Broker;

(8) In abnormal cases, if the flag retryAnotherBrokerWhenNotStoreOK is set to true, another Broker is chosen when the send fails.
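Step (3) above, choosing among the three request codes, can be sketched as a small decision function. The numeric values 10, 310 and 320 are what I believe the RequestCode constants to be (the Broker log later in this article does show code=310 for this demo), and the two boolean inputs are simplified assumptions about what drives the choice; RequestCodeSketch and choose are hypothetical names.

```java
class RequestCodeSketch {
    // Assumed numeric values of the three request codes named in step (3).
    static final int SEND_MESSAGE = 10;
    static final int SEND_MESSAGE_V2 = 310;
    static final int SEND_BATCH_MESSAGE = 320;

    // sendSmartMsg: whether the compact (V2) header format is enabled.
    // isBatch: whether the payload is a batch of messages.
    static int choose(boolean sendSmartMsg, boolean isBatch) {
        if (isBatch) {
            return SEND_BATCH_MESSAGE;
        }
        return sendSmartMsg ? SEND_MESSAGE_V2 : SEND_MESSAGE;
    }
}
```

A single message sent with the compact header enabled therefore goes out as SEND_MESSAGE_V2, which matches the code value printed by the Broker for the demo message.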

After the producer finishes sending the message, the client log prints the following:

SendResult [sendStatus=SEND_OK, msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A, messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC, queueId=2], queueOffset=1]

3.3 A Brief Analysis of Message Handling on the Broker

The Broker contains many Processors, each handling a different type of request, and one or more Processors share a processor thread pool. For a received message, the Broker uses the SendMessageProcessor, which performs the following steps in order:

(1) Message pre-check, including whether the broker is writable, whether the queueId exceeds the specified size, and whether the Topic routing information in the message exists; if it does not, a new one is created. This corresponds to the section "Trying to Obtain the TopicPublishInfo Routing Information" above. If the Topic routing information does not exist, the Broker log output is as follows:

2018-06-14 17:17:24 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=252, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-06-14 17:17:24 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /172.20.21.162:62661
2018-06-14 17:17:24 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[172.20.21.162:62661]

After the Topic routing information has been created, the Broker log for the second message send is as follows:

2018-08-02 16:26:13 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-08-02 16:26:13 INFO SendMessageThread_1 - the msgInner's content is:MessageExt [queueId=2, storeSize=0, queueOffset=0, sysFlag=0, bornTimestamp=1533198373524, bornHost=/172.20.21.162:53914, storeTimestamp=0, storeHost=/172.20.21.162:10911, msgId=null, commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={KEYS=OrderID188, UNIQ_KEY=020003670EC418B4AAC208AD46930000, WAIT=true, TAGS=TagA}, body=11body's content is:Hello world]]

(2) Build the MessageExtBrokerInner;

(3) Call "brokerController.getMessageStore().putMessage" to persist the MessageExtBrokerInner to disk;

(4) Based on the result of the disk write (normal or abnormal), BrokerStatsManager updates some statistics, and finally the Response is set and returned.
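The pre-check in step (1), including the automatic creation of a missing Topic from the default Topic, can be modeled roughly as follows. This is a sketch under simplifying assumptions: BrokerPreCheckSketch is a hypothetical class, a Topic's configuration is reduced to its write queue count, topic creation is always allowed, and nothing is persisted to topics.json.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BrokerPreCheckSketch {
    static final String DEFAULT_TOPIC = "TBW102";

    // topic -> number of write queues; a stand-in for the broker's topic config table.
    private final Map<String, Integer> writeQueueNums = new ConcurrentHashMap<>();
    private final boolean writable;

    BrokerPreCheckSketch(boolean writable, int defaultTopicQueueNums) {
        this.writable = writable;
        this.writeQueueNums.put(DEFAULT_TOPIC, defaultTopicQueueNums);
    }

    // Returns null when the message passes the pre-check, otherwise a short rejection reason.
    String msgCheck(String topic, int queueId) {
        if (!writable) {
            return "broker not writable";
        }
        Integer nums = writeQueueNums.get(topic);
        if (nums == null) {
            // Unknown topic: create it by copying the default topic's settings.
            nums = writeQueueNums.get(DEFAULT_TOPIC);
            writeQueueNums.put(topic, nums);
        }
        if (queueId < 0 || queueId >= nums) {
            return "queueId out of range";
        }
        return null;
    }

    boolean hasTopic(String topic) {
        return writeQueueNums.containsKey(topic);
    }
}
```

With a default of 4 write queues, the demo message for TopicTest on queueId 2 passes the check and leaves a TopicTest entry behind, mirroring the "Create new topic by default topic" log line above.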

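4. Summary

That roughly completes the analysis of how the RocketMQ client sends a normal message. Ordered messages, distributed transactional messages, and related topics will be covered in later articles, so stay tuned. The author's knowledge is limited and parts of this article may not be fully understood; if anything is explained poorly, please leave a comment so we can discuss it.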

   