Camel Enterprise Integration Patterns -- Aggregator
 2019-3-6
 
Editor's note:
This article comes from csdn. It explains in detail how the Aggregator pattern works and walks through a complete example that uses an Aggregator to combine multiple messages. We hope it helps your learning.

1. Preface

I have been using the Aggregator pattern for several months without paying much attention to how it works. A sudden problem in a project a few days ago sparked my interest, so I started digging into the principles behind the pattern. Here is what I found.

2. Overview

Apache Camel provides about 60 enterprise integration patterns in total. Five of them are the most common: Aggregator, Splitter, Routing Slip, Dynamic Router, and Load Balancer. These are also the patterns that JCF makes available to us, and Aggregator is the most complex of them. First, what the pattern does: it reassembles a number of split input messages into one complete output message. The Aggregator is the exact opposite of the Splitter.

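3. Details

The following diagram gives a rough overall picture:

The diagram involves three patterns, which shows that patterns can be combined to form a new one. In it, the Aggregator receives a stream of messages, identifies the ones that are related, and combines them into a single output message. Here is a simple usage example:

In the figure above, three messages are sent into the Aggregator one after another and end up combined into one message. To make this work, three settings need attention, and all three are required: if any of them is missing, Camel reports a missing-configuration error at startup. The three settings are: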

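Correlation identifier -- an expression that determines which incoming messages belong to the same group.

Completion condition -- a predicate or time-based condition that determines when the aggregated result message is sent out.

Aggregation strategy -- a strategy that specifies how the messages are combined into one message.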
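
As a rough illustration of how these three settings interact, here is a plain-Java model rather than Camel code. MiniAggregator and its methods are hypothetical names: the integer key plays the role of the correlation identifier, a fixed group size of 3 is the completion condition, and string concatenation is the aggregation strategy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of correlation plus size-based completion; not Camel's implementation. */
final class MiniAggregator {
    private final int completionSize;
    // One in-progress group per correlation key.
    private final Map<Integer, List<String>> groups = new HashMap<>();
    private final List<String> published = new ArrayList<>();

    MiniAggregator(int completionSize) {
        this.completionSize = completionSize;
    }

    /** Adds a message to its group and publishes the group once it is complete. */
    void onMessage(int correlationKey, String body) {
        List<String> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(body);
        if (group.size() == completionSize) {
            published.add(String.join("", group)); // aggregation strategy: concatenate
            groups.remove(correlationKey);
        }
    }

    List<String> published() {
        return published;
    }

    int pendingGroups() {
        return groups.size();
    }

    public static void main(String[] args) {
        MiniAggregator aggregator = new MiniAggregator(3);
        aggregator.onMessage(1, "A");
        aggregator.onMessage(1, "B");
        aggregator.onMessage(2, "F"); // different key, so it starts its own group
        aggregator.onMessage(1, "C"); // third message for key 1 completes that group
        System.out.println(aggregator.published());     // [ABC]
        System.out.println(aggregator.pendingGroups()); // 1 (the group holding F)
    }
}
```

The message with key 2 never reaches three entries, so its group stays pending. That is the situation a timeout-style completion condition is meant to handle.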

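Let's look at a simple example that aggregates letters: the input messages are 'A', 'B', and 'C', in that order, and the output message is 'ABC'. The rest of the walkthrough builds on this example, as the figure below shows:

When the first message arrives with correlation identifier 1, a new aggregate is initialized to store it. In this example, the completion condition is met once three messages have been aggregated, so after the first message the aggregation is not yet complete. When the second message arrives, also with correlation identifier 1, an aggregate already exists for that identifier, so no new one is created. When the third message arrives with correlation identifier 2, no aggregate exists for that identifier, so a new one is created and the message is stored in it. When the fourth message arrives with correlation identifier 1, the completion condition is satisfied; the messages are combined by the aggregation strategy and the result message is sent out. The Camel route in the Java DSL illustrates this (note the parts in bold):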

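In the route, the correlation identifier is header("myId"), a Camel expression that returns the value of the myId header. The second configuration element is the AggregationStrategy, a class that implements the AggregationStrategy interface; we will look at it in more depth later. The last element is the completion condition. Here it is size-based: once three input messages have been received, the condition is considered met. Completion conditions are covered in more depth later. The Java code for the example above is as follows:

The output is as follows:

As the console output above shows, although an F was sent in the middle of the sequence, its correlation identifier is 2, not 1. Once three input messages with correlation identifier 1 have arrived, Camel considers the completion condition met and publishes the aggregate. You may wonder what then happens to the F message. Because the completion condition for its group is not met, it waits in the aggregator until the condition is satisfied. More on this later; for now, back to how aggregation works. The aggregator combines the messages of a group according to the aggregation strategy. The aggregation strategy is an interface in the org.apache.camel.processor.aggregation package, defined as follows: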

public interface AggregationStrategy {
    Exchange aggregate(Exchange oldExchange, Exchange newExchange);
}

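The interface has a single method. At runtime, aggregate is invoked once each time a new input message arrives; in the example above it was invoked four times in total. Here is what happens on each invocation:

The first time aggregate is called for a group, oldExchange is null. In subsequent aggregations, as long as it is not null, the new message is merged into the existing Exchange. A note here: the Aggregator uses synchronization to ensure that the AggregationStrategy is thread safe, so only one thread at a time may execute the aggregate method. The Aggregator also preserves order: messages are aggregated in the order in which they arrive.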
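
The calling convention can be sketched without Camel. In the sketch below, StringStrategy is a hypothetical stand-in for AggregationStrategy that works on plain strings instead of Exchange objects, and aggregateAll plays the role of the aggregator feeding messages in arrival order. It assumes that the first call for a group receives null as the old value, which is what the oldEx == null check in the example strategy later in this article suggests.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for an aggregation strategy; uses Strings instead of Camel Exchanges. */
interface StringStrategy {
    String aggregate(String oldBody, String newBody);
}

final class ConcatStrategyDemo {
    /** On the first call oldBody is null, so the new message becomes the initial aggregate. */
    static final StringStrategy CONCAT =
            (oldBody, newBody) -> oldBody == null ? newBody : oldBody + newBody;

    /** Feeds the messages to the strategy one at a time, in arrival order. */
    static String aggregateAll(List<String> bodies, StringStrategy strategy) {
        String current = null;
        for (String body : bodies) {
            current = strategy.aggregate(current, body);
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(aggregateAll(Arrays.asList("A", "B", "C"), CONCAT)); // ABC
    }
}
```

Because each call receives the result of the previous one, the strategy itself does not need to keep any state between invocations.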

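Now for completion conditions, which play a bigger role in aggregation than one might expect. If a completion condition is never met, the aggregated message is never published. To guard against this, add a timeout condition so that the aggregator does not wait forever for messages that never arrive. Camel provides five completion conditions to choose from according to your needs: completionSize, completionTimeout, completionInterval, completionPredicate, and completionFromBatchConsumer.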
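
To see why a timeout matters, the sketch below combines a size-based condition with an inactivity timeout in plain Java. It is only a model under stated assumptions: TimeoutAggregator and its methods are hypothetical, time is passed in explicitly as a millisecond value instead of coming from a real clock, and a group is published once it has received nothing for the timeout period, which mirrors the inactivity-based behavior of completionTimeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of size plus inactivity-timeout completion; not Camel's implementation. */
final class TimeoutAggregator {
    /** In-progress aggregate for one correlation key. */
    private static final class Group {
        final StringBuilder body = new StringBuilder();
        int size;
        long lastMessageAt;
    }

    private final int completionSize;
    private final long timeoutMillis;
    private final Map<Integer, Group> groups = new HashMap<>();
    private final List<String> published = new ArrayList<>();

    TimeoutAggregator(int completionSize, long timeoutMillis) {
        this.completionSize = completionSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds a message at time 'now' and publishes the group if it reached the size limit. */
    void onMessage(int key, String body, long now) {
        Group group = groups.computeIfAbsent(key, k -> new Group());
        group.body.append(body);
        group.size++;
        group.lastMessageAt = now;
        if (group.size == completionSize) {
            groups.remove(key);
            published.add(group.body + " (size)");
        }
    }

    /** Publishes every group that has received nothing for at least the timeout. */
    void checkTimeouts(long now) {
        Iterator<Group> it = groups.values().iterator();
        while (it.hasNext()) {
            Group group = it.next();
            if (now - group.lastMessageAt >= timeoutMillis) {
                it.remove();
                published.add(group.body + " (timeout)");
            }
        }
    }

    List<String> published() {
        return published;
    }

    public static void main(String[] args) {
        TimeoutAggregator aggregator = new TimeoutAggregator(3, 5000);
        aggregator.onMessage(1, "A", 0);
        aggregator.onMessage(1, "B", 100);
        aggregator.onMessage(2, "F", 200);
        aggregator.onMessage(1, "C", 300); // group 1 completes by size
        aggregator.checkTimeouts(1000);    // F has been idle for only 800 ms
        aggregator.checkTimeouts(5200);    // F has now been idle for 5000 ms
        System.out.println(aggregator.published()); // [ABC (size), F (timeout)]
    }
}
```

Without the timeout check, the group holding F would sit in memory indefinitely, which is exactly the stuck-message case described above.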

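Of these five completion conditions, at most four can be used together, because completionTimeout and completionInterval cannot be used at the same time. On the Exchange, the corresponding values can be read and set as properties; the properties are listed below: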

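4. Persistence

The Aggregator is stateful: it has to store in-progress aggregates until the completion condition is met and the aggregated message is published. By default, the Aggregator keeps this state only in memory, so it is lost when the application shuts down or the host goes down. To fix this, the state must be stored in a persistent repository. Camel offers this as a pluggable feature that can be used selectively, through the two interfaces described below.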

AggregationRepository -- an interface that defines the general operations for working with a repository, such as adding data to it and removing data from it. By default, Camel uses MemoryAggregationRepository, an in-memory repository. The interface is defined as follows:


RecoverableAggregationRepository -- also an interface. It extends AggregationRepository and defines additional operations that support recovering lost state. The interface is defined as follows:


Camel provides a camel-hawtdb component. HawtDB is a lightweight, embeddable, key-value file database that lets Camel provide persistence for various patterns, such as the Aggregator. More Camel patterns are expected to make use of HawtDB in the future. Let's see how to use it.

First, set up HawtDB as follows:

AggregationRepository myRepo = new HawtDBAggregationRepository("myrepo", "data/myrepo.dat");

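As shown above, we created an instance of HawtDBAggregationRepository and passed it two parameters: the repository name and the file that holds the data. The repository name is mandatory; it identifies the repository, because several repositories can share the same file.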
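
To illustrate what a pluggable repository contributes, here is a small plain-Java model. Everything in it is hypothetical: AggregateStore is a much simplified contract, not Camel's AggregationRepository (which works with CamelContext and Exchange objects), and MapStore keeps its data in memory. The point is only that an aggregator which holds all its state in the store can be replaced by a new instance without losing the in-progress aggregate.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified repository contract: one in-progress aggregate per correlation key. */
interface AggregateStore {
    String get(String key);
    void put(String key, String aggregate);
    void remove(String key);
}

/** In-memory store; a persistent variant would write to a file or database instead. */
final class MapStore implements AggregateStore {
    private final Map<String, String> data = new HashMap<>();

    @Override public String get(String key) { return data.get(key); }
    @Override public void put(String key, String aggregate) { data.put(key, aggregate); }
    @Override public void remove(String key) { data.remove(key); }
}

/** Aggregator that holds no state of its own; everything lives in the store. */
final class StoreBackedAggregator {
    private final AggregateStore store;
    private final int completionLength;

    StoreBackedAggregator(AggregateStore store, int completionLength) {
        this.store = store;
        this.completionLength = completionLength;
    }

    /**
     * Returns the completed aggregate, or null while the group is still open.
     * Completion is judged by the length of the aggregated string, a simplification
     * that works here because every message is a single character.
     */
    String onMessage(String key, String body) {
        String old = store.get(key);
        String merged = (old == null) ? body : old + body;
        if (merged.length() >= completionLength) {
            store.remove(key);
            return merged;
        }
        store.put(key, merged);
        return null;
    }

    public static void main(String[] args) {
        AggregateStore store = new MapStore();
        StoreBackedAggregator first = new StoreBackedAggregator(store, 3);
        first.onMessage("1", "A");
        first.onMessage("1", "B");
        // Simulate a restart: a new aggregator instance picks up the stored state.
        StoreBackedAggregator second = new StoreBackedAggregator(store, 3);
        System.out.println(second.onMessage("1", "C")); // ABC
    }
}
```

With an in-memory store this only survives replacing the aggregator object. Surviving a real process restart requires a store backed by a file, which is the role HawtDBAggregationRepository plays in the setup above.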

5. Example

Now let's look at a complete example.

◎ Draw the flow diagram, as follows:


◎ Split the message: the input hello,world is split into the two strings hello and world. Sample code:

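import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;

// MessageContext and JCFUtils are not Camel classes (presumably JCF); import them from your project.
public class AggreSplitter {

    public Collection<Message> splitter(Exchange exchange) throws UnsupportedEncodingException {
        MessageContext msg = (MessageContext) exchange.getIn().getBody();
        String inputData = JCFUtils.buffer2String(msg.getBody());
        List<Message> list = new ArrayList<Message>();

        // Split on commas and wrap each piece in its own message
        String[] strArray = inputData.split(",");
        for (String str : strArray) {
            Message message = new DefaultMessage();
            message.setBody(str);
            list.add(message);
        }
        return list;
    }
}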

◎ Receive the split messages. Sample code:

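import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class GetMessage implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        // Read the split fragment as a String and set it back as the body
        String message = exchange.getIn().getBody(String.class);
        exchange.getIn().setBody(message);
    }
}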

◎ Aggregate the received messages. Sample code:

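import java.util.ArrayList;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregation.AggregationStrategy;

public class AggreData implements AggregationStrategy {

    @SuppressWarnings("unchecked")
    @Override
    public Exchange aggregate(Exchange oldEx, Exchange newEx) {
        String message = newEx.getIn().getBody(String.class);
        ArrayList<String> list = null;
        if (oldEx == null) {
            // First message of the group: start a new list and carry it in newEx
            list = new ArrayList<String>();
            list.add(message);
            newEx.getIn().setBody(list);
            return newEx;
        } else {
            // Later messages: append to the list already held by oldEx
            list = oldEx.getIn().getBody(ArrayList.class);
            list.add(message);
            return oldEx;
        }
    }
}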

◎ Publish the message. Sample code:

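import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class ReturnOutMessage implements Processor {

    @SuppressWarnings("unchecked")
    @Override
    public void process(Exchange exchange) throws Exception {
        List<String> list = exchange.getIn().getBody(ArrayList.class);
        // Print the aggregated list, then each element
        System.out.println("Received message: " + list);
        for (String str : list) {
            System.out.println("Received message: " + str);
        }
        exchange.getIn().setBody(list);
    }
}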

The test output is as follows:

Received message: [hello, world]
Received message: hello
Received message: world
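
For readers without a Camel and JCF environment at hand, the data flow of this example can be reproduced in plain Java. This is only a recap sketch: SplitAggregateDemo and its methods are hypothetical names, the split method stands in for AggreSplitter, and the aggregate method mirrors the null check in AggreData while using a plain list in place of an Exchange.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java recap of the split-then-aggregate flow; no Camel or JCF classes involved. */
final class SplitAggregateDemo {
    /** Splitter step: break the input on commas. */
    static String[] split(String input) {
        return input.split(",");
    }

    /** Aggregation step: the first call gets null and starts the list, later calls append. */
    static List<String> aggregate(List<String> oldList, String newBody) {
        List<String> list = (oldList == null) ? new ArrayList<String>() : oldList;
        list.add(newBody);
        return list;
    }

    /** Runs both steps in sequence and returns the aggregated list. */
    static List<String> run(String input) {
        List<String> aggregated = null;
        for (String part : split(input)) {
            aggregated = aggregate(aggregated, part);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        List<String> result = run("hello,world");
        System.out.println("Received message: " + result);
        for (String str : result) {
            System.out.println("Received message: " + str);
        }
    }
}
```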

The example above is a complete implementation of aggregating multiple messages with an Aggregator. That is all for now on Camel's Aggregator pattern.

 

   