Editor's pick:
This article comes from jianshu. It covers the RocketMQ network architecture diagram and a full walkthrough of how RocketMQ sends a normal message.
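Abstract: Sending a message with the client is easy, but what does RocketMQ actually do behind the scenes?
The greatest truths are simple: a message queue boils down to "send, store, receive". Of these three steps, sending is the simplest and the easiest to get started with, which makes it a good entry point for beginner and intermediate readers who want to study MQ. This article therefore starts from sending a single message and explains the overall flow and the details of sending a normal message in RocketMQ, a distributed message queue.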
1. RocketMQ Network Architecture Diagram
The network deployment architecture of the RocketMQ distributed message queue is shown in the figure below (it includes the two main paths along which a Producer sends a normal message to the cluster).

[Figure: RocketMQ deployment architecture]
Roles in the figure above:
(1) NameServer: the name server of the RocketMQ cluster (it can also be seen as the registry). It is stateless (in practice the data on individual NameServer instances may be briefly inconsistent, but periodic updates keep them consistent most of the time) and manages the cluster metadata (for example KV configuration, Topics, and Broker registration information).
(2) Broker (Master): the master node of the RocketMQ message broker. It links message sending by the Producer with message consumption by the Consumer, and persists messages to disk;
(3) Broker (Slave): the backup node of the RocketMQ message broker. It copies messages from the master node, synchronously or asynchronously, and so supports the high availability of the RocketMQ cluster;
(4) Producer (message producer): here, a producer of normal messages. It uses the RocketMQ-Client module to send messages to the RocketMQ master node.
Communication links in the figure above:
(1) Producer and NameServer: each Producer opens a TCP connection to one instance in the NameServer cluster and pulls Topic route information from that instance;
(2) Producer and Broker: a Producer opens TCP connections to the Master Brokers associated with the topic it sends to, and uses them for sending messages and periodic heartbeats;
(3) Broker and NameServer: every Broker (Master or Slave) opens a TCP connection to every NameServer instance. On startup a Broker registers its configured Topic information with every machine in the NameServer cluster, so each NameServer holds the Topic route configuration of that Broker. Masters are not connected to each other; a Master is connected to its Slaves;
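2. Demo: Sending a Normal Message from the Client
The example package of the RocketMQ source project contains the simplest sample code for sending a normal message (PS: readers who are new to RocketMQ are advised to use the samples in this package for systematic study and debugging).
Running the main method of the Producer class in the "org.apache.rocketmq.example.simple" package sends one normal message (the main code is below; a NameServer and a Broker instance must both be deployed locally first):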
// Classes used: org.apache.rocketmq.client.producer.DefaultMQProducer and SendResult,
// org.apache.rocketmq.common.message.Message, org.apache.rocketmq.remoting.common.RemotingHelper
// step 1. Start the DefaultMQProducer; the startup flow is explained in detail later
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
try {
    // step 2. Build the message to send
    Message msg = new Message("TopicTest",
        "TagA", "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // step 3. Send the message; the flow is explained later
    SendResult sendResult = producer.send(msg);
} catch (Exception e) {
    // Exception handling code
}
producer.shutdown();
3. Full Walkthrough of Sending a Normal Message in RocketMQ
As the previous section shows, the demo code for a producer is fairly simple, with only a few core lines. After digging into RocketMQ's Client module, however, the core sending flow turns out to be somewhat complex. The following analyzes it from three angles: the startup flow of DefaultMQProducer, the send method, and message handling on the Broker.
3.1 Startup Flow of DefaultMQProducer
In the demo code we first start the DefaultMQProducer instance, which calls the start() method of DefaultMQProducerImpl, the default producer implementation class.
@Override
public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}
The main startup flow of DefaultMQProducerImpl, the default producer implementation class, is as follows:
(1) Obtain an initialized MQClientInstance object and register with its local cache variable producerTable;
(2) Save the default Topic ("TBW102") into the local cache variable topicPublishInfoTable;
(3) The MQClientInstance object calls its own start() method, which starts several client-side service threads, such as the pull-message service, the client network communication service, the rebalance service, and several scheduled tasks (updating routes, cleaning up offline Brokers, sending heartbeats, persisting consumerOffset, adjusting thread pools), and then runs the start once more (this time with the parameter set to false);
(4) Finally, send heartbeats to all Broker nodes;
In summary, the main startup flow of DefaultMQProducer is as follows:

[Figure: startup process of the DefaultMQProducer start method]
A few points to note:
(1) Within one client, a producerGroup can have only one instance;
(2) MQClientManager returns a different MQClientInstance for each distinct clientId;
(3) MQClientInstance returns a different MQProducer and MQConsumer for each distinct producerGroup (stored in the local cache variables producerTable and consumerTable);
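The three notes above can be pictured as two levels of lookup tables. The following is a minimal sketch in plain Java, not the RocketMQ source: `ClientRegistrySketch`, `ClientInstance`, and `getOrCreate` are hypothetical names, and the clientId strings are made up. It only illustrates the one-instance-per-clientId and one-producer-per-group rules.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for MQClientManager: hands out one client instance per clientId.
final class ClientRegistrySketch {

    // Hypothetical stand-in for MQClientInstance: keeps producers keyed by producerGroup.
    static final class ClientInstance {
        final String clientId;
        private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();

        ClientInstance(String clientId) {
            this.clientId = clientId;
        }

        // Returns true on first registration, false if the group is already taken.
        boolean registerProducer(String producerGroup, Object producer) {
            if (producerGroup == null || producer == null) {
                return false;
            }
            return producerTable.putIfAbsent(producerGroup, producer) == null;
        }
    }

    private static final ConcurrentMap<String, ClientInstance> INSTANCES = new ConcurrentHashMap<>();

    // Same clientId gives the same instance; a new clientId gives a new instance.
    static ClientInstance getOrCreate(String clientId) {
        return INSTANCES.computeIfAbsent(clientId, ClientInstance::new);
    }

    public static void main(String[] args) {
        ClientInstance first = getOrCreate("192.168.0.1@producer");
        ClientInstance again = getOrCreate("192.168.0.1@producer");
        System.out.println("same instance: " + (first == again));
        System.out.println("first registration: " + first.registerProducer("ProducerGroupName", new Object()));
        System.out.println("duplicate registration: " + first.registerProducer("ProducerGroupName", new Object()));
    }
}
```

In this sketch a second producer with the same group name on the same client instance is rejected, which mirrors note (1).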
3.2 Core Flow of the send Method
The RocketMQ client module offers three main ways to send a message:
(1) Synchronous
(2) Asynchronous
(3) Oneway
Modes (1) and (2) are the most common; which one to use depends on the business scenario. This section walks through the core sending flow using synchronous mode (in synchronous mode a failed send is attempted up to 3 times in total by default, configurable via retryTimesWhenSendFailed; the other modes try only once). The entry point of the synchronous send flow is:
/**
 * Entry point of the synchronous send flow; the default timeout is 3 s.
 *
 * @param msg     the Message to send
 * @param timeout send timeout in milliseconds, configurable by the caller
 * @return the result of the send
 * @throws MQClientException
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException,
    MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC,
        null, timeout);
}
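The retry behavior mentioned above (several attempts in synchronous mode, a single attempt otherwise) can be sketched as a simple loop. This is a minimal illustration, not the body of sendDefaultImpl: `SendRetrySketch`, `Mode`, and `sendWithRetry` are hypothetical names, and the rule "total attempts = 1 + retryTimesWhenSendFailed in sync mode" is stated here as an assumption.

```java
import java.util.function.IntPredicate;

final class SendRetrySketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Assumption: sync mode tries 1 + retryTimesWhenSendFailed times, other modes once.
    static int totalAttempts(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Runs attempts until one succeeds; returns the 1-based attempt number, or -1 if all fail.
    static int sendWithRetry(Mode mode, int retryTimesWhenSendFailed, IntPredicate attemptSucceeds) {
        int total = totalAttempts(mode, retryTimesWhenSendFailed);
        for (int attempt = 1; attempt <= total; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Hypothetical broker that only accepts the third attempt.
        System.out.println(sendWithRetry(Mode.SYNC, 2, attempt -> attempt == 3));
        System.out.println(sendWithRetry(Mode.ASYNC, 2, attempt -> attempt == 3));
    }
}
```

With two retries configured, the synchronous call in this sketch reaches the third attempt, while the asynchronous call gives up after the first.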
3.2.1 Trying to Obtain the TopicPublishInfo Route Information
Stepping through with the debugger, we find that sendDefaultImpl() first validates the message to be sent. If the Topic and Body are both valid, it calls tryToFindTopicPublishInfo(), which looks up the message's Topic in the client's local cache variable topicPublishInfoTable. If nothing is found, it updates the Topic route information from the NameServer (by calling updateTopicRouteInfoFromNameServer on the MQClientInstance, which ultimately executes getTopicRouteInfoFromNameServer on the MQClientAPIImpl instance). Two scenarios arise here:
(1) The producer sends a message for the first time (the Topic does not yet exist on the NameServer): the first lookup cannot pull the route from the remote NameServer, so the local cache variable topicPublishInfoTable is not updated. The second lookup therefore builds the TopicPublishInfo object from the TopicRouteData of the default Topic TBW102 and updates the topicPublishInfoTable of the DefaultMQProducerImpl instance.
In addition, in this scenario, when the message reaches the Broker, the pre-check super.msgCheck(ctx, requestHeader, response) inside the sendBatchMessage/sendMessage methods of SendMessageProcessor calls createTopicInSendMessageMethod on TopicConfigManager, which creates the new Topic on the Broker and persists it to the configuration file (path: {rocketmq.home.dir}/store/config/topics.json). (PS: this part belongs to the Broker and is slightly beyond the scope of this article, but it is mentioned briefly because it involves creating the new Topic.)
(2) The producer sends a message to an existing Topic: since the Topic already exists on the NameServer, the first lookup succeeds and updates the local cache variable topicPublishInfoTable, after which tryToFindTopicPublishInfo can return directly.
The core methods for this part of RocketMQ are shown below (with comments added):
/**
 * Look up the topicPublishInfo for the message's topic in topicPublishInfoTable.
 * If it is absent, update the route information by pulling the latest route from the NameServer.
 *
 * @param topic the topic of the message to send
 * @return the TopicPublishInfo for the topic
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // step 1. First try the local cache variable topicPublishInfoTable
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // step 1.2 Then update the topic route information from the NameServer
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        // step 2. Then read the local cache variable topicPublishInfoTable again
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        /*
         * The first call uses isDefault = false; this second call uses isDefault = true,
         * i.e. the route is updated from the default topic's parameters.
         */
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,
            true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
/**
 * When the local cache has no entry, pull the Topic route information from the remote NameServer registry.
 *
 * @param topic              the topic to look up
 * @param timeoutMillis      request timeout in milliseconds
 * @param allowTopicNotExist whether a missing topic is tolerated (only affects the warning log)
 * @return the decoded TopicRouteData
 * @throws MQClientException
 * @throws InterruptedException
 * @throws RemotingTimeoutException
 * @throws RemotingSendRequestException
 * @throws RemotingConnectException
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException,
    InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    // After setting the Topic in the request header, send the route request to the NameServer
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC,
        requestHeader);
    // The call is synchronous, so the response is returned directly
    RemotingCommand response = this.remotingClient.invokeSync(null,
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        // The Topic of the message does not exist on the NameServer
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }
            break;
        }
        // The Topic exists: decode the body as TopicRouteData and return it
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
            // falls through to default when the body is null
        }
        default:
            break;
    }
    throw new MQClientException(response.getCode(),
        response.getRemark());
}
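The two scenarios above (Topic already known to the NameServer versus first send of a new Topic) can be condensed into a small lookup with a fallback. This is a minimal sketch under simplifying assumptions: a plain map stands in for the NameServer, a route is reduced to a queue count, and `RouteLookupSketch` and `isCached` are hypothetical names. Only the default Topic name "TBW102" comes from the article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class RouteLookupSketch {
    static final String DEFAULT_TOPIC = "TBW102";

    // Hypothetical stand-in for the NameServer: topic name -> queue count.
    private final Map<String, Integer> nameServerRoutes;
    // Simplified stand-in for topicPublishInfoTable.
    private final ConcurrentMap<String, Integer> topicPublishInfoTable = new ConcurrentHashMap<>();

    RouteLookupSketch(Map<String, Integer> nameServerRoutes) {
        this.nameServerRoutes = new HashMap<>(nameServerRoutes);
    }

    // Returns the queue count to publish to, or -1 when no route can be found.
    int tryToFindTopicPublishInfo(String topic) {
        Integer cached = topicPublishInfoTable.get(topic);
        if (cached != null) {
            return cached;
        }
        // First attempt: ask for the topic itself.
        Integer route = nameServerRoutes.get(topic);
        if (route == null) {
            // Second attempt: fall back to the default topic's route.
            route = nameServerRoutes.get(DEFAULT_TOPIC);
        }
        if (route == null) {
            return -1;
        }
        topicPublishInfoTable.put(topic, route);
        return route;
    }

    boolean isCached(String topic) {
        return topicPublishInfoTable.containsKey(topic);
    }

    public static void main(String[] args) {
        Map<String, Integer> routes = new HashMap<>();
        routes.put(DEFAULT_TOPIC, 4);
        routes.put("ExistingTopic", 8);
        RouteLookupSketch client = new RouteLookupSketch(routes);
        System.out.println(client.tryToFindTopicPublishInfo("ExistingTopic"));
        System.out.println(client.tryToFindTopicPublishInfo("TopicTest"));
    }
}
```

Once the fallback has run, the new Topic is cached locally, so later sends in this sketch skip the NameServer lookup.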
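The mapping from TopicRouteData to the TopicPublishInfo route information is shown below:

[Figure: mapping from TopicRouteData to TopicPublishInfo in the Client]
The TopicRouteData and TopicPublishInfo route variables look roughly as follows:

[Figure: contents of the TopicRouteData variable]

[Figure: contents of the TopicPublishInfo variable]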
3.2.2 Selecting the Queue to Send To
Once the TopicPublishInfo route information is obtained, the RocketMQ client by default uses selectOneMessageQueue() to pick one queue (MessageQueue) from the messageQueueList in TopicPublishInfo for sending. The fault-tolerance strategy is defined in the MQFaultStrategy class:
public class MQFaultStrategy {
    // Tracks the send latency of each Broker
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    // Switch for send-latency fault tolerance
    private boolean sendLatencyFaultEnable = false;
    // Latency levels (ms)
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // Unavailable durations (ms), one per latency level
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    ......
}
The sendLatencyFaultEnable switch chooses between the following two modes:
(1) sendLatencyFaultEnable on: on top of the random-start incrementing modulo selection, Brokers that are not available are filtered out. "latencyFaultTolerance" means backing off for a certain time from Brokers that failed previously. For example, if the latency of the last request exceeded 550 ms, the Broker is avoided for 30000 ms; if it exceeded 1000 ms, it is avoided for 60000 ms.
(2) sendLatencyFaultEnable off (the default): one queue (MessageQueue) is chosen by random-start incrementing modulo selection.
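The back-off rule in (1) can be worked through with the two arrays defined in MQFaultStrategy. The sketch below assumes the lookup scans latencyMax from the largest threshold downward and returns the notAvailableDuration entry at the first threshold the latency reaches; `LatencyBackoffSketch` and `notAvailableMillis` are hypothetical names, and only the array values are taken from the article.

```java
final class LatencyBackoffSketch {
    // Values copied from the latencyMax / notAvailableDuration arrays shown in the article (ms).
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Assumed rule: use the duration paired with the highest threshold the latency reaches.
    static long notAvailableMillis(long currentLatencyMillis) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatencyMillis >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    public static void main(String[] args) {
        long[] samples = {40L, 120L, 600L, 1000L, 20000L};
        for (long latency : samples) {
            System.out.println(latency + " ms latency -> avoid broker for " + notAvailableMillis(latency) + " ms");
        }
    }
}
```

Under this rule a latency below 550 ms never marks a Broker as unavailable, because the first two durations are 0.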
/**
 * Selects a queue for sending; the path taken depends on whether sendLatencyFaultEnable is on.
 *
 * @param tpInfo         route information of the topic
 * @param lastBrokerName Broker used by the previous send attempt, or null
 * @return the selected MessageQueue
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // 1. On top of the random-start incrementing modulo selection, filter out Brokers
            //    that are not available; back off from Brokers that failed previously
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // 2. Choose one queue (MessageQueue) by random-start incrementing modulo selection
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
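The default path (switch off) delegates to a round-robin pick over the queue list. The sketch below shows one plausible shape of that pick, assuming it advances a shared counter, takes it modulo the list size, and skips queues on the Broker used by the previous attempt. `QueueSelectSketch` and `QueueRef` are hypothetical names; the start index is passed in explicitly so the behavior is reproducible, whereas a random start is what "random-start incrementing modulo" implies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueSelectSketch {
    // Hypothetical stand-in for MessageQueue.
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<QueueRef> queues;
    private final AtomicInteger sendWhichQueue;

    QueueSelectSketch(List<QueueRef> queues, int startIndex) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("queue list must not be empty");
        }
        this.queues = new ArrayList<>(queues);
        this.sendWhichQueue = new AtomicInteger(startIndex);
    }

    // Plain round-robin: advance the counter and take it modulo the list size.
    QueueRef selectOne() {
        int index = sendWhichQueue.getAndIncrement();
        return queues.get(Math.floorMod(index, queues.size()));
    }

    // Round-robin that skips queues on the Broker used by the previous attempt.
    QueueRef selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            QueueRef candidate = queues.get(Math.floorMod(index++, queues.size()));
            if (!candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on the same Broker: fall back to plain round-robin.
        return selectOne();
    }

    public static void main(String[] args) {
        QueueSelectSketch selector = new QueueSelectSketch(Arrays.asList(
            new QueueRef("broker-a", 0), new QueueRef("broker-a", 1),
            new QueueRef("broker-b", 0), new QueueRef("broker-b", 1)), 0);
        QueueRef retry = selector.selectOne("broker-a");
        System.out.println(retry.brokerName + ":" + retry.queueId);
    }
}
```

Skipping the previous Broker is what lets a retry land on a different Broker when the Topic has queues on more than one.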
3.2.3 Sending the Wrapped RemotingCommand Packet
After the queue is selected, RocketMQ calls sendKernelImpl() to send the message (this method is the core that actually sends the message through RocketMQ's Remoting communication module). The method performs the following steps:
(1) Using the brokerName of the MessageQueue obtained earlier, call findBrokerAddressInPublish() on the MQClientInstance to get the address of the Broker the message should go to; if it is not found, update the route information;
(2) Unless disabled, hook functions run before and after the send (executeSendMessageHookBefore() / executeSendMessageHookAfter());
(3) Wrap the message-related information into a RemotingCommand packet whose RequestCode is one of:
a. SEND_MESSAGE (normal send)
b. SEND_MESSAGE_V2 (send with an optimized network packet)
c. SEND_BATCH_MESSAGE (batch send)
(4) Send the wrapped RemotingCommand packet to the Broker at the address obtained; the default send timeout is 3 s;
(5) The Remoting module is actually invoked in the sendMessageSync() method of the MQClientAPIImpl instance; the code is:
private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException,
    InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr,
        request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg,
        response);
}
(6) processSendResponse handles the success and failure cases separately and returns the sendResult object;
(7) After the send returns, updateFaultItem is called to update the availability time of the Broker;
(8) In the failure case, if the flag retryAnotherBrokerWhenNotStoreOK is set to true, another Broker is chosen when the send fails;
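Step (3) above picks one of three request codes. A minimal sketch of one plausible selection rule follows, assuming a batch message always uses the batch code and a compact-header flag chooses between the V2 and the plain code. `RequestCodeSketch`, `chooseRequestCode`, and the `useCompactHeader` flag are hypothetical names. The value 310 matches the `code=310` seen in the Broker logs quoted later in this article; the values 10 and 320 are assumptions.

```java
final class RequestCodeSketch {
    static final int SEND_MESSAGE = 10;         // assumed numeric value
    static final int SEND_MESSAGE_V2 = 310;     // matches code=310 in the Broker log
    static final int SEND_BATCH_MESSAGE = 320;  // assumed numeric value

    // Assumed rule: batches use the batch code; otherwise the flag picks V2 or the plain code.
    static int chooseRequestCode(boolean useCompactHeader, boolean isBatch) {
        if (isBatch) {
            return SEND_BATCH_MESSAGE;
        }
        return useCompactHeader ? SEND_MESSAGE_V2 : SEND_MESSAGE;
    }

    public static void main(String[] args) {
        System.out.println("single message, compact header: " + chooseRequestCode(true, false));
        System.out.println("single message, plain header: " + chooseRequestCode(false, false));
        System.out.println("batch message: " + chooseRequestCode(true, true));
    }
}
```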
After the producer finishes sending the message, the client log prints:
SendResult [sendStatus=SEND_OK,
msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A,
messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC,
queueId=2], queueOffset=1]
3.3 Message Handling on the Broker: a Brief Analysis
The Broker contains many Processors that handle different types of requests; one or more Processors share one processor thread pool. Incoming messages are handled by the SendMessageProcessor, which does the following in order:
(1) Pre-check the message: whether the broker is writable, whether the queueId exceeds the allowed size, and whether the Topic route information in the message exists (if not, a new one is created). This corresponds to section 3.2.1 above. If the Topic route information does not exist, the Broker log prints:
2018-06-14 17:17:24 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=252, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-06-14 17:17:24 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /172.20.21.162:62661
2018-06-14 17:17:24 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config: [TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[172.20.21.162:62661]
After the Topic route information has been created, the Broker log for the second message send prints:
2018-08-02 16:26:13 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-08-02 16:26:13 INFO SendMessageThread_1 - the msgInner's content is:MessageExt [queueId=2, storeSize=0, queueOffset=0, sysFlag=0, bornTimestamp=1533198373524, bornHost=/172.20.21.162:53914, storeTimestamp=0, storeHost=/172.20.21.162:10911, msgId=null, commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={KEYS=OrderID188, UNIQ_KEY=020003670EC418B4AAC208AD46930000, WAIT=true, TAGS=TagA}, body=11body's content is:Hello world]]
(2) Build the MessageExtBrokerInner;
(3) Call "brokerController.getMessageStore().putMessage" to persist the MessageExtBrokerInner to disk;
(4) Depending on the result of the write (success or failure), BrokerStatsManager updates some statistics, and finally the Response is set and returned;
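In the Broker logs above, field i of extFields shows the message properties run together (KEYSOrderID188UNIQ_KEY...), while the decoded MessageExt shows them as separate key/value pairs. A likely explanation is that the properties travel as one string joined by non-printable separator characters. The sketch below assumes character code 1 separates a name from its value and character code 2 separates properties; both values are assumptions rather than facts from this article, and `PropertiesDecodeSketch` and `decode` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PropertiesDecodeSketch {
    // Assumed separators: both are control characters, so they do not show up in a log line.
    static final char NAME_VALUE_SEPARATOR = 1;
    static final char PROPERTY_SEPARATOR = 2;

    // Splits "name<1>value<2>name<1>value..." into an ordered map.
    static Map<String, String> decode(String properties) {
        Map<String, String> result = new LinkedHashMap<>();
        if (properties == null || properties.isEmpty()) {
            return result;
        }
        for (String item : properties.split(String.valueOf(PROPERTY_SEPARATOR))) {
            String[] nameValue = item.split(String.valueOf(NAME_VALUE_SEPARATOR), 2);
            if (nameValue.length == 2) {
                result.put(nameValue[0], nameValue[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String raw = "KEYS" + NAME_VALUE_SEPARATOR + "OrderID188" + PROPERTY_SEPARATOR
            + "WAIT" + NAME_VALUE_SEPARATOR + "true" + PROPERTY_SEPARATOR
            + "TAGS" + NAME_VALUE_SEPARATOR + "TagA";
        System.out.println(decode(raw));
    }
}
```

Decoding the sample string yields the same KEYS, WAIT, and TAGS entries that appear in the properties map of the second log.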
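4. Summary
This concludes the analysis of how the RocketMQ client sends a normal message. Ordered messages, distributed transactional messages, and related topics will be covered in later articles, so stay tuned. The author's knowledge is limited and parts of this article may reflect an incomplete understanding; if anything is explained poorly, please leave a comment so we can discuss it.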