±à¼ÍƼö: |
±¾ÎÄÀ´×ÔÓÚcsdn£¬±¾ÎÄÏêϸ½éÉÜÁËAggregatorģʽµÄÔÀí£¬ÒÔ¼°Í¨¹ýÒ»¸öʾÀý£¬ÍêÕûµÄʵÏÖÁËÀûÓÃAggregator¾ÛºÏ¶àÌõÏûÏ¢£¬Ï£Íû¶ÔÄúµÄѧϰÓÐËù°ïÖú¡£ |
|
1¡¢Ç°ÑÔ£º
ÓÃÁËAggregatorÕâ¸öģʽ¼¸¸öÔÂÁË£¬×Ô¼º¶ÔÓÚÆäÖеÄÔÀíÒ»Ïò²»ÔõôÉÏÐÄ£¬Ö±µ½Ç°Ð©ÌìÏîÄ¿ÖеÄÒ»¸öÍ»·¢ÎÊÌâÒý·¢ÁËÎÒµÄÐËȤ£¬ÓÚÊÇ¿ªÊ¼Ì½¾¿AggregatorģʽµÄÔÀí£¬ÏÂÃæÀ´ºÍ´ó¼Ò·ÖÏíһϡ£
2¡¢¸ÅÊö
Apache Camel×ܹ²ÌṩÁË60ÖÖ×óÓ񵀮óÒµ¼¶¼¯³Éģʽ£¬ÓÐ5ÖÖģʽ±È½Ï³£¼û£¬·Ö±ðÊÇAggregator¡¢Splitter¡¢Routing
Slip¡¢Dynamic Router¡¢Load Balancer£¬Õ⼸ÖÖģʽҲÊÇJCFÌṩ¸øÎÒÃÇʹÓõ쬯äÖÐAggregatorÊÇ×ÔÓµÄÒ»ÖÖ¡£ÏÂÃæ£¬ÎÒÃÇÏÈÁ˽âÒ»ÏÂÕâ¸öģʽµÄ×÷ÓãºÖ÷ÒªÓÃÓÚ½«·Ö¸îºóµÄÈô¸ÉÌõÊäÈëÏûÏ¢ÖØÐÂ×é×°³ÉÒ»ÌõÍêÕûµÄÊä³öÏûÏ¢£¬AggregatorµÄ×÷ÓúÍSplitterµÄ×÷ÓøպöÔÁ¢¡£
3¡¢Ïê½â
ÏÂÃæÏÈͨ¹ýÒ»ÕÅͼ¶ÔÕûÌåÓиö´ó¸ÅµÄÁ˽⣬ÈçÏ£º

ÕâÕÅͼÖÐÉæ¼°µ½ÁË3ÖÖģʽ£¬ÕâҲ˵Ã÷ģʽ֮¼äÊÇ¿ÉÒÔ×éºÏʹÓõ쬴ӶøÐγÉÒ»ÖÖеÄģʽ£¬ÆäÖÐAggregatorģʽ½«½ÓÊÕµ½µÄÏûÏ¢Á÷ºÍÏà¹ØÁªµÄ±êʶÏûÏ¢¾ÛºÏ³ÉÒ»ÌõÊä³öÏûÏ¢¡£ÏÂÃæÀ´¿´Ò»¸ö¼òµ¥µÄʹÓÃʾÀý£¬ÈçÏ£º

ÔÚÉÏͼÖУ¬3ÌõÏûÏ¢±»ÒÀ´ÎËÍÈëAggregator£¬×îºó±»¾ÛºÏ³ÉÁËÒ»¸öÏûÏ¢£¬ÒªÊµÏÖÉÏͼµÄ¹¦ÄÜ£¬ÎÒÃÇÐèÒª¹Ø×¢3¸öÅäÖ㬲¢ÇÒÕâ3¸öÅäÖÃȱһ²»¿É£¬Èç¹ûûÓÐÅäÖÃÆäÖÐÈÎÒâÒ»¸ö£¬ÔÚCamelÆô¶¯µÄʱºò¾Í»á±¨ÅäÖÃȱʧµÄ´íÎó£¬Õâ3¸öÅäÖÃÈçÏ£º
Correlation identifier--Ò»Ìõ±í´ïʽ£¬¾ö¶¨ÄĸöÊäÈëÏûÏ¢ÊÇÊôÓÚÒ»¸ö×éµÄ¡£
Completion condition--Ò»¸ö¶ÏÑÔ»òÕßÊÇ»ùÓÚʱ¼äµÄÌõ¼þ£¬¾ö¶¨ÁËʲôʱºòÊä³ö¾ÛºÏµÄ½á¹ûÏûÏ¢¡£
Aggregation strategy--Ò»Ö־ۺϲßÂÔ£¬Ö¸¶¨ÁËͨ¹ýºÎÖÖ·½Ê½À´¾ÛºÏ³ÉÒ»ÌõÏûÏ¢¡£
ÏÂÃæÎÒÃÇÀ´¿´Ò»¸ö¼òµ¥µÄÀý×Ó£¬À´ÊµÏÖ×ÖĸµÄ¾ÛºÏ£¬ÀýÈçÊäÈëÏûÏ¢ÒÀ´ÎΪ£º¡®A¡¯¡¢¡®B¡¯¡¢¡®C¡¯£¬Êä³öÏûϢΪ¡®ABC¡¯£¬Í¨¹ýÕâ¸ö¼òµ¥µÄÀý×ÓÀ´ÈúóÃæµÄÁ÷³ÌºÃ¼ÌÐøÏÂÈ¥¡£ÈçÏÂͼËùʾ£º

µ±µÚÒ»ÌõÏûÏ¢´ø×ŹØÁª±êÖ¾1½øÀ´µÄʱºò£¬¾Í³õʼ»¯ÁËÒ»¸ö¾ÛºÏÆ÷À´´æ´¢ÕâÌõÏûÏ¢£¬ÔÚÕâ¸öÀý×ÓÖУ¬Íê³ÉÌõ¼þÊǵ±ÓÐ3ÌõÏûÏ¢±»¾ÛºÏµÄʱºòÍê³É¡£ËùÒÔµ±µÚÒ»ÌõÏûÏ¢½øÀ´µÄʱºò£¬Õû¸ö¾ÛºÏ»¹Ã»ÓÐÍê³É£¬µ±µÚ¶þÌõ´ø×ŹØÁª±êÖ¾1½øÀ´µÄʱºò£¬·¢ÏָñêÖ¾ÏÂÒѾ´æÔÚÒ»¸öÏÖ³ÉµÄ¾ÛºÏÆ÷£¬¾Í²»»áÔÙÉú³ÉÐÂµÄ¾ÛºÏÆ÷£¬µ±µÚÈýÌõÏûÏ¢´ø×ŹØÁª±êÖ¾2½øÀ´µÄʱºò£¬·¢Ïָñê־ûÓжÔÓ¦µÄ¾ÛºÏÆ÷£¬¾Í»áÐÂÉú³ÉÒ»¸ö¹ØÁªµÄ¾ÛºÏÆ÷£¬½«¸ÃÏûÏ¢´æµ½Õâ¸ö¾ÛºÏÆ÷ÖУ¬µ±µÚËÄÌõÏûÏ¢´ø×ŹØÁª±êÖ¾1½øÀ´µÄʱºò£¬ÒѾÂú×ãÁ˾ۺϵÄÍê³ÉÌõ¼þ£¬´Ëʱ»áͨ¹ý¾ÛºÏ²ßÂÔ½øÐоۺϣ¬¾ÛºÏÍê³ÉÖ®ºó£¬¾Í»áÊä³ö½á¹ûÏûÏ¢¡£ÏÂÃæÍ¨¹ýCamel
routeµÄJava DSLÀ´ËµÃ÷Ò»ÏÂ(×¢ÒâÎÄÖмӴÖÌåµÄµØ·½)£º

ÆäÖУ¬¹ØÁªÐÔ±êÖ¾¾ÍÊÇheader(¡°myId¡±)£¬ÊÇÒ»¸öCamelµÄ±í´ïʽ£¬»á´æ·ÅÔÚheaderÖб»´ø»Ø£¬µÚ¶þ¸öÅäÖÃÔªËØÊÇAggregationStrategyÊÇÒ»¸öʵÏÖÁËAggregationStrategy½Ó¿ÚµÄʵÏÖÀ࣬ºóÃæÎÒÃÇ»áÉîÈëµÄѧϰÕâ¸öÀ࣬×îºóҪ˵µÄ¾ÍÊÇÕâ¸öÍê³ÉÌõ¼þ£¬´Ë´¦µÄÍê³ÉÌõ¼þÊÇ»ùÓÚ´óСµÄ£¬Ö»Òª½ÓÊÕµ½3ÌõÊäÈëÏûÏ¢£¬¾ÍÈÎÎñ·ûºÏÌõ¼þ£¬¹ØÓÚÍê³ÉÌõ¼þ£¬ºóÃæ»á×öÉîÈëµÄ½²½â¡£ÉÏÃæÊ¾ÀýÖеÄjava´úÂëʵÏÖÈçÏ£º

ÔËÐнá¹ûÈçÏ£º

ͨ¹ýÉÏÃæ¿ØÖÆÌ¨µÄÊä³ö¿ÉÒÔ¿´µ½£¬ËäÈ»Öмä¹ý³ÌÖÐÊäÈëÁËÒ»¸öF£¬µ«ÊÇÕâ¸öÏûÏ¢µÄ¹ØÁªÐÔ±ê־Ϊ2£¬²¢²»ÊÇ1£¬µ±3¸ö¹ØÁªÐÔ±êÖ¾¶¼Îª1µÄÊäÈëÏûÏ¢µ½´ïºó£¬Camel¾ÍÈÏΪÂú×ãÍê³ÉÌõ¼þ£¬¿ÉÒÔ½øÐоۺÏÁË¡£Ò²ÐíÓÐÈË¿´µ½Õâ»áÎÊ£¬ÄÇôÕâ¸öÊäÈëµÄFÏûÏ¢ºóÃæ»á·¢ÉúʲôÊÂÁË£¿ÓÉÓÚÕâ¸öÏûÏ¢µÄÍê³ÉÌõ¼þ²¢²»Âú×㣬ËùÒÔ»áÔÚ¾ÛºÏÆ÷ÖÐÒ»Ö±µÈ´ý£¬Ö±µ½Âú×ãÍê³ÉÌõ¼þ£¬¹ØÓÚÕâµã£¬ºóÃæ»á½øÐÐÏê½â£¬ÏÂÃæ¼ÌÐøÀ´¿´Ï¾ۺϵÄÔÀí¡£µ±Âú×ãÍê³ÉÌõ¼þÖ®ºó£¬¾ÛºÏÆ÷¾Í»á¸ù¾Ý¾ÛºÏ²ßÂÔÀ´½øÐоۺϣ¬ÆäÖоۺϲßÂÔÊÇÒ»¸ö½Ó¿Ú£¬ÔÚorg.apache.camel.processor.aggregation°üÏ£¬¸Ã½Ó¿ÚµÄ¶¨ÒåÈçÏ£º
public
interface AggregationStrategy {
Exchange aggregate(Exchange oldExchange, Exchange
newExchange);
} |
¸Ã½Ó¿ÚÖ»ÓÐÒ»¸ö·½·¨£¬µ±ÔËÐеÄʱºò£¬Ò»ÓÐеÄÊäÈëÏûÏ¢´ïµ½£¬aggregateÕâ¸ö·½·¨¾Í»áÖ´ÐÐÒ»´Î£¬ÔÚÉÏÃæµÄÕâ¸öÀý×ÓÖУ¬Õâ¸ö·½·¨Ò»¹²Ö´ÐÐÁË4´Î£¬ÏÂÃæÁÐÒ»ÏÂÿִÐÐÒ»´Î£¬¶¼·¢ÉúÁËʲô£º

ÔÚËæºóµÄ¾ÛºÏÖУ¬Ö»Òª²»Îªnull£¬ÏûÏ¢¾Í»á±»¸üе½ExchangeÖУ¬ÔÚ´Ë×öÏÂ˵Ã÷£ºAggregatorʹÓÃÁËsynchronizationÀ´È·±£AggregationStrategyÊÇḬ̈߳²È«µÄ£¬Í¬Ò»Ê±¿Ì£¬Ö»ÔÊÐíÒ»¸öÏß³ÌÀ´Ö´ÐÐaggregateÕâ¸ö·½·¨£¬AggregatorͬʱҲÊÇÓÐÐòµÄ£¬ÏûÏ¢½øÈëµÄʱºòÊÇʲô˳Ðò£¬¾ÛºÏµÄʱºò¾ÍÊÇʲô˳Ðò¡£
ÏÂÃæ£¬ÎÒÃÇÀ´ËµÒ»ÏÂÍê³ÉÌõ¼þ(Completion conditions)£¬Íê³ÉÌõ¼þÔÚ¾ÛºÏÖаçÑݵĽÇɫԶԶ³¬³öÁËÎÒÃǵÄÏëÏ󣬼ÙÉèµ±Ò»¸öÍê³ÉÌõ¼þ´Ó²»³öÏÖ£¬¾Í»áµ¼Ö¾ۺϵÄÏûÏ¢ÓÀÔ¶¶¼²»»á±»·¢²¼³öÈ¥£¬ÎªÁ˲¹¾ÈÕâÖÖÔã¸âµÄÇé¿ö£¬ÎÒÃÇÐèÒªÌí¼ÓÒ»¸ö³¬Ê±Ìõ¼þ£¬ÕâÑù¾Í¿ÉÒÔ±ÜÃâÒòΪijЩÏûÏ¢ÊÕ²»µ½¶øµ¼ÖµÄËÀµÈ£¬Îª´Ë£¬CamelÌṩÁË5¸ö¹©Ñ¡ÔñµÄÍê³ÉÌõ¼þ£¬ÎÒÃÇ¿ÉÒÔ¸ù¾Ý¾ßÌåµÄÐèÇóÀ´Ê¹Óá£ÈçÏÂËùʾ£º

ÔÚÉÏÃæµÄ5¸öÍê³ÉÌõ¼þÖУ¬×î¶à¿ÉÒÔͬʱʹÓÃ4¸ö£¬ÆäÖÐcompletionTimeoutºÍcompletionInterval²»Äܹ»Í¬Ê±Ê¹Óá£ÔÚExchangeÖУ¬ÎÒÃÇ¿ÉÒÔÔÚpropertyÖлñÈ¡ºÍÉèÖÃÏàÓ¦µÄÊôÐÔ£¬ÊôÐÔÁбíÈçÏ£º

4¡¢³Ö¾Ã»¯
AggregatorÊÇÓÐ״̬µÄ£¬ÒòΪÐèÒª´æ´¢½ø³ÌÖеľۺÏÎֱµ½Âú×ãÍê³ÉÌõ¼þ²¢½«¾ÛºÏµÄÏûÏ¢·¢²¼³öÈ¥£¬Ä¬ÈÏÇé¿öÏ£¬AggregatorÖ»»á±£´æ×´Ì¬µ½ÄÚ´æÖУ¬µ±Ó¦ÓùرջòÕßÖ÷»úå´»úµÄʱºò£¬ÕâЩ״̬»á¶ªÊ§£¬ÎªÁ˲¹¾ÈÕâ¸öÎÊÌ⣬ÎÒÃÇÐèÒª´æ´¢×´Ì¬µ½³Ö¾Ã»¯²Ö¿âÖУ¬CamelÌṩÁËÒ»¸ö¿É²å°ÎµÄÌØÕ÷¹©ÎÒÃÇÑ¡ÔñÐÔµÄʹÓã¬ÏÂÃæ½éÉÜÏÂÕâ2Öз½Ê½¡£
AggregationRepository--ÕâÊÇÒ»¸ö½Ó¿Ú£¬¸Ã½Ó¿Ú¶¨ÒåÁ˳£¹æµÄ·½·¨À´²Ù×÷²Ö¿â£¬ÀýÈçÌí¼ÓÊý¾Ýµ½²Ö¿â£¬´Ó²Ö¿âɾ³ýÊý¾ÝµÈ£¬Ä¬ÈÏÇé¿öÏ£¬CamelʹÓÃMemoryAggregationRepository£¬ÕâÊÇÒ»¸öÄÚ´æ²Ö¿â¡£½Ó¿Ú¶¨ÒåÈçÏ£º

RecoverableAggregationRepository--Ò²ÊÇÒ»¸ö½Ó¿Ú£¬¸Ã½Ó¿Ú¼Ì³ÐÁËAggregationRepository½Ó¿Ú£¬¶¨ÒåÁ˶îÍâµÄ²Ù×÷À´Ö§³Ö»Ö¸´¶ªÊ§µÄ״̬¡£½Ó¿Ú¶¨ÒåÈçÏ£º

CamelÌṩÁËÒ»¸öcamel-hawtdb×é¼þ£¬HawtDBÊÇÒ»¸öÇáÁ¿¼¶ºÍ¿ÉǶÈëµÄ»ùÓÚkey-valueµÄÎļþÊý¾Ý¿â£¬ËûÔÊÐíCamelΪ¸÷ÖÖģʽÌṩ³Ö¾Ã»¯£¬ÀýÈçAggregator£¬ÔÚδÀ´£¬»áÓиü¶àµÄCamelģʽÀûÓÃHawtDB.ÏÂÃæÎÒÃÇÀ´¿´¿´HawtDBÔõôʹÓá£
Ê×ÏÈ£¬ÎÒÃÇÐèÒªÉèÖÃHawtDB£¬ÉèÖ÷½Ê½ÈçÏ£º
AggregationRepository
myRepo = new HawtDBAggregationRepository(¡°myrepo¡±,
¡°data/myrepo.dat¡±);
|
ÈçÉÏËùʾ£¬ÎÒÃÇ´´½¨ÁËÒ»¸öHawtDBAggregationRepositoryµÄʾÀý£¬Í¬Ê±ÌṩÁËÁ½¸ö²ÎÊý£¬²Ö¿âÃû³Æ±ØÐëÖ¸¶¨£¬²¢ÇÒ¶à¸ö²Ö¿âÊÇ¿ÉÒÔͬÃûµÄ¡£
5¡¢Ê¾Àý
ÏÂÃæ£¬ÎÒÃÇÀ´¿´Ò»¸öÍêÕûµÄʾÀý¡£
¡ò»Á÷³Ìͼ£¬ÈçÏ£º

¡ò²ð·ÖÏûÏ¢£¬½«ÊäÈëµÄhello£¬world²ð·Ö³ÉhelloºÍworldÁ½¸ö×Ö·û´®£¬Ê¾Àý´úÂëÈçÏ£º
public
class AggreSplitter {
public Collection<Message> splitter(Exchange
exchange) throws UnsupportedEncodingException
{
MessageContext msg = ((MessageContext) exchange.getIn().getBody());
String inputData = JCFUtils.buffer2String(msg.getBody());
List<Message> list = new ArrayList<Message>();
// ·Ö¸î
String[] strArray = inputData.split(",");
for (String str : strArray) {
Message message = new DefaultMessage();
message.setBody(str);
list.add(message);
}
return list;
}
} |
¡ò½ÓÊÕ²ð·ÖµÄÏûÏ¢£¬Ê¾Àý´úÂëÈçÏ£º
public
class GetMessage implements Processor {
@Override
public void process(Exchange exchange) throws
Exception {
String message = exchange.getIn().getBody(String.class);
exchange.getIn().setBody(message);
}
} |
¡ò¾ÛºÏ½ÓÊÕµ½µÄÏûÏ¢£¬Ê¾Àý´úÂëÈçÏ£º
public
class AggreData implements AggregationStrategy
{
@SuppressWarnings("unchecked")
@Override
public Exchange aggregate(Exchange oldEx, Exchange
newEx) {
String message = newEx.getIn().getBody(String.class);
ArrayList<String> list = null;
if(oldEx == null){
list = new ArrayList<String>();
list.add(message);
newEx.getIn().setBody(list);
return newEx;
}else{
list = oldEx.getIn().getBody(ArrayList.class);
list.add(message);
return oldEx;
}
}
} |
¡ò·¢²¼ÏûÏ¢£¬Ê¾Àý´úÂëÈçÏ£º
public
class ReturnOutMessage implements Processor
{
@SuppressWarnings("unchecked")
@Override
public void process(Exchange exchange) throws
Exception {
List<String> list = exchange.getIn().getBody(ArrayList.class);
System.out.println("½ÓÊÕµ½µÄ±¨ÎÄΪ:"+list);
for(String str:list){
System.out.println("½ÓÊÕµ½µÄ±¨ÎÄΪ:"+str);
}
exchange.getIn().setBody(list);
}
} |
²âÊÔ½á¹ûÈçÏ£º
½ÓÊÕµ½µÄ±¨ÎÄΪ:[hello, world]
½ÓÊÕµ½µÄ±¨ÎÄΪ:hello
½ÓÊÕµ½µÄ±¨ÎÄΪ:world
ÉÏÃæµÄʾÀý£¬ÍêÕûµÄʵÏÖÁËÀûÓÃAggregator¾ÛºÏ¶àÌõÏûÏ¢£¬¹ØÓÚCamelµÄAggregatorģʽ£¬¾ÍÏȽ²µ½ÕâÀï¡£
|