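Editor's pick:
This article comes from CSDN. It first fills in content that the previous article left out, and then explains Routing (route conditions) from several angles. See below for details.

Continued from "Apache Camel Quick Start (Part 1)"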
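3-1-2. The special Endpoint: Direct
Endpoint Direct connects two already-composed routes by passing the Exchange between them: the Exchange that has been processed by the preceding element of the first route is sent to the starting point of the next route connected through Direct (http://camel.apache.org/direct.html). Note that both connected routes must be available and must exist in the same Camel service (the same CamelContext). The following example shows a simple use of Endpoint Direct.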
package com.yinwenjie.test.cameltest.helloworld;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
/**
 * Tests connecting two routes.
 * @author yinwenjie
 */
public class DirectCamel {
    public static void main(String[] args) throws Exception {
        // The Camel context object; it drives all of the routes.
        ModelCamelContext camelContext = new DefaultCamelContext();
        // Start the context.
        camelContext.start();
        // Register the two complete, valid routes with the Camel service.
        camelContext.addRoutes((new DirectCamel()).new DirectRouteA());
        camelContext.addRoutes((new DirectCamel()).new DirectRouteB());
        // Generic code with no business meaning; it only keeps the main thread from exiting.
        synchronized (DirectCamel.class) {
            DirectCamel.class.wait();
        }
    }
    /**
     * DirectRouteA, which uses direct to connect to DirectRouteB.
     * @author yinwenjie
     */
    public class DirectRouteA extends RouteBuilder {
        /* (non-Javadoc)
         * @see org.apache.camel.builder.RouteBuilder#configure()
         */
        @Override
        public void configure() throws Exception {
            from("jetty:http://0.0.0.0:8282/directCamel")
                // Connect to the route DirectRouteB.
                .to("direct:directRouteB")
                .to("log:DirectRouteA?showExchangeId=true");
        }
    }
    /**
     * @author yinwenjie
     */
    public class DirectRouteB extends RouteBuilder {
        /* (non-Javadoc)
         * @see org.apache.camel.builder.RouteBuilder#configure()
         */
        @Override
        public void configure() throws Exception {
            from("direct:directRouteB")
                .to("log:DirectRouteB?showExchangeId=true");
        }
    }
}
In the snippet above we composed two usable routes named DirectRouteA and DirectRouteB (both are very simple, but they really are two independent routes). DirectRouteA uses Endpoint Direct (direct:directRouteB) to send the Exchange to the starting point of DirectRouteB, and then continues with its own log endpoint. The console output is as follows:
[2016-06-26 09:54:38] INFO qtp231573738-21 Exchange[Id: ID-yinwenjie-240-54473-1466906074572-0-1, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
[2016-06-26 09:54:38] INFO qtp231573738-21 Exchange[Id: ID-yinwenjie-240-54473-1466906074572-0-1, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
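From the output above we can see that the two connected routes use the same Exchange object (both log entries carry the same Exchange ID). In other words, if DirectRouteB changes the content of the Exchange, the change affects the rest of DirectRouteA, which resumes afterwards. Endpoint Direct is used very frequently when composing routes with Camel, because it lets you connect several already-composed routes according to business requirements into a new route while keeping the original routes reusable.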
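The "same Exchange object" behaviour can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: it does not use the Camel API, and the names Msg, ROUTE_B and routeA are hypothetical stand-ins for an Exchange and the two routes. It shows a synchronous, in-process hand-off in which a change made by the second route is visible to the first route when it resumes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model, NOT the Camel API: a "route" is just a function that
// receives the message object, and "direct" is a synchronous in-process call.
class DirectHandOffSketch {

    // Hypothetical stand-in for an Exchange: an id plus mutable headers.
    static final class Msg {
        final String id;
        final Map<String, String> headers = new HashMap<>();

        Msg(String id) {
            this.id = id;
        }
    }

    // Models DirectRouteB: it mutates the message object it is handed.
    static final Consumer<Msg> ROUTE_B = msg -> msg.headers.put("seenBy", "routeB");

    // Models DirectRouteA: hand the message to route B, then carry on with the same object.
    static String routeA(Msg msg) {
        ROUTE_B.accept(msg);                              // comparable to .to("direct:directRouteB")
        return msg.id + ":" + msg.headers.get("seenBy");  // comparable to the log step that follows
    }

    public static void main(String[] args) {
        // The id is unchanged and the header written by route B is visible in route A.
        System.out.println(routeA(new Msg("ex-1")));
    }
}
```

Because no copy is made, anything the second route writes is seen by the steps that follow in the first route, which matches the identical Exchange IDs in the log above.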
======================================== (end of supplement)
3-3. Processor
Another important element in Camel is the Processor. It receives the message carried in an Exchange coming from an endpoint, a routing condition, or another processor, and processes it. The Camel core package and the various plugin components provide many Processor implementations, and developers can also define their own by implementing the org.apache.camel.Processor interface (the latter is the usual practice).
Since this is ordinary code, a custom Processor can of course do many things: run business logic, open a database connection to store business data, or open an RPC connection to a third-party business system. We generally do not do that, though, because that is the job of an Endpoint. The main work of a Processor is converting business data formats and temporarily holding intermediate data, because in a Camel route the Processor is the main place where the in and out messages of an Exchange are exchanged.
That said, developers can certainly connect to a database inside a Processor. For example, based on the "order number prefix" carried over from the previous Endpoint, a Processor may need to look up the matching routing information in a separate database (or cache service) in order to decide the next route path dynamically. Because Camel integrates seamlessly with the Spring framework for Java, operating on a database from a Processor takes only very simple configuration.
The following snippet is a custom Processor implementation; the process(Exchange exchange) method must be implemented:
// An implementation of a custom processor.
// This is the processor implementation we saw in the previous article.
public class OtherProcessor implements Processor {
    ......
    @Override
    public void process(Exchange exchange) throws Exception {
        Message message = exchange.getIn();
        String body = message.getBody().toString();
        //===============
        // You can convert the data format here
        // and store the result in the out message.
        //===============
        // Store the result in the out message of the exchange.
        if (exchange.getPattern() == ExchangePattern.InOut) {
            Message outMessage = exchange.getOut();
            outMessage.setBody(body + " || other out");
        }
    }
    ......
}
Note that a Processor is a concept at the same level as an endpoint. To tell whether the implementation behind a URI is an endpoint, check whether the implementing class implements the org.apache.camel.Endpoint interface; to tell whether an element in a route is a Processor, check whether its class implements the org.apache.camel.Processor interface.
3-5. Routing: route conditions
Between an endpoint and a processor, or between two processors, Camel lets developers set route conditions. For example, a route can send the message to processor A when the content of the Exchange In Message is A, and to processor B when the content is B. As another example, regardless of what the previous element in the route did with the message, the Exchange carrying it can be copied several times and the copies sent to processors X, Y and Z. Developers can even use routing rules to load-balance Exchanges across multiple Endpoints.
Camel supports a rich set of routing rules, including Message Filter, Content Based Router, Dynamic Router, Splitter, Aggregator, Resequencer and more. The official Camel documentation uses very expressive diagrams to illustrate these routing features.

In fact, most of the integration patterns described by the EIP rules have a corresponding diagram on that page. Given the length of this article and the focus of this series, the author cannot explain every routing rule in Camel. Here we pick two representative routing rules, Content Based Router and Recipient List, and hope they help readers understand routing rules in Camel:
3-5-1. Content Based Router
The author finds "content-based routing" the most fitting rendering of Content Based Router (it should not be rendered as "basic routing"; in fact there is no way to define what basic routing would be). It is not a single routing mechanism but a family of routing mechanisms built on conditions and predicate expressions, which may involve the choice, when and otherwise statements/methods. Consider the following example:
package com.yinwenjie.test.cameltest.helloworld;
......
/**
 * Route composition using conditional choice.
 * @author yinwenjie
 */
public class ChoiceCamel extends RouteBuilder {
    public static void main(String[] args) throws Exception {
        // The Camel context object; it drives all of the routes.
        ModelCamelContext camelContext = new DefaultCamelContext();
        // Start the context.
        camelContext.start();
        // Add the complete message route we composed to the context.
        camelContext.addRoutes(new ChoiceCamel());
        // Generic code with no business meaning; it only keeps the main thread from exiting.
        synchronized (ChoiceCamel.class) {
            ChoiceCamel.class.wait();
        }
    }
    @Override
    public void configure() throws Exception {
        // A JsonPath expression that extracts the value of the orgId property
        // from the JSON carried by the HTTP request.
        JsonPathExpression jsonPathExpression = new JsonPathExpression("$.data.orgId");
        jsonPathExpression.setResultType(String.class);
        // As before, receive messages over HTTP.
        from("jetty:http://0.0.0.0:8282/choiceCamel")
            // First pass the message to HttpProcessor, which converts the stream
            // in the exchange in message body into a string.
            // The choice below would also work without this conversion.
            // HttpProcessor is the same as in the earlier snippet, so it is not repeated here.
            .process(new HttpProcessor())
            // Store the orgId value in a header of the exchange in message for the checks below.
            .setHeader("orgId", jsonPathExpression)
            .choice()
                // orgId == yuanbao: run OtherProcessor
                // orgId == yinwenjie: run OtherProcessor2
                // anything else: run OtherProcessor3
                .when(header("orgId").isEqualTo("yuanbao"))
                    .process(new OtherProcessor())
                .when(header("orgId").isEqualTo("yinwenjie"))
                    .process(new OtherProcessor2())
                .otherwise()
                    .process(new OtherProcessor3())
            // End of the choice.
            .endChoice();
    }
    /**
     * This processor converts the incoming JSON.
     * It is essentially the same as the HttpProcessor in the previous article, so it is not listed again.
     * @author yinwenjie
     */
    public class HttpProcessor implements Processor {
        ......
    }
    /**
     * Another processor, OtherProcessor.
     * @author yinwenjie
     */
    public class OtherProcessor implements Processor {
        @Override
        public void process(Exchange exchange) throws Exception {
            Message message = exchange.getIn();
            String body = message.getBody().toString();
            // Store the result in the out message of the exchange.
            if (exchange.getPattern() == ExchangePattern.InOut) {
                Message outMessage = exchange.getOut();
                outMessage.setBody(body + " || handled by OtherProcessor");
            }
        }
    }
    /**
     * A very simple processor, OtherProcessor2.
     * It is essentially the same as OtherProcessor, so it is not listed in full.
     * @author yinwenjie
     */
    public class OtherProcessor2 implements Processor {
        ......
        outMessage.setBody(body + " || handled by OtherProcessor2");
    }
    /**
     * A very simple processor, OtherProcessor3.
     * It is essentially the same as OtherProcessor, so it is not listed in full.
     * @author yinwenjie
     */
    public class OtherProcessor3 implements Processor {
        ......
        outMessage.setBody(body + " || handled by OtherProcessor3");
    }
}
In the snippet above, the developer first uses a JsonPath expression to find the value of the orgId property in the JSON carried by the HTTP request, and then stores that value in a header of the Exchange. This is done only to make the later checks convenient; you could also store the value in the properties of the Exchange, or use the JsonPath expression directly in the condition. Next, the value stored in the header decides which Processor the message is routed to. Because the default Exchange Pattern of the from-jetty endpoint is InOut, once a Processor has finished, the Body of the Out Message is returned to the from-jetty endpoint as the HTTP response. In the end we see the message value produced by the Processor on the test page.
Camel supports the vast majority of expression languages that developers know and use: regular expressions, XPath, JsonPath and so on. Readers who are not yet familiar with JsonPath syntax can refer to the documentation hosted on Google Code (https://code.google.com/p/json-path/). To test the snippet above, we use Postman to send a piece of JSON to the configured address and observe how the whole route behaves, as shown below:

Execution result when the value of orgId is yuanbao

Execution result when the value of orgId is yinwenjie
Camel provides a rich set of condition checks for routing decisions. Besides the isEqualTo used in this subsection, there are isGreaterThan, isGreaterThanOrEqualTo, isLessThan, isLessThanOrEqualTo, isNotEqualTo, in (several values), contains, regex and so on. What they have in common is that each of these methods returns an object whose class implements the org.apache.camel.Predicate interface.
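The way choice, when and otherwise pick a branch can be modelled in plain Java: each when holds a predicate, the branches are tested in order, the first match wins, and otherwise is the fallback. The sketch below is only an illustration under stated assumptions. It uses java.util.function.Predicate rather than org.apache.camel.Predicate, and the names Branch, headerEquals and choose are hypothetical helpers, not Camel API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of choice()/when()/otherwise(), NOT the Camel DSL.
class ChoiceSketch {

    // One when(...) branch: a condition plus the name of the processor it selects.
    static final class Branch {
        final Predicate<Map<String, String>> condition;
        final String processorName;

        Branch(Predicate<Map<String, String>> condition, String processorName) {
            this.condition = condition;
            this.processorName = processorName;
        }
    }

    // Hypothetical helper in the spirit of header("orgId").isEqualTo("...").
    static Predicate<Map<String, String>> headerEquals(String name, String expected) {
        return headers -> expected.equals(headers.get(name));
    }

    // Tests the branches in declaration order; falls back to the "otherwise" processor.
    static String choose(Map<String, String> headers) {
        List<Branch> branches = new ArrayList<>();
        branches.add(new Branch(headerEquals("orgId", "yuanbao"), "OtherProcessor"));
        branches.add(new Branch(headerEquals("orgId", "yinwenjie"), "OtherProcessor2"));
        for (Branch branch : branches) {
            if (branch.condition.test(headers)) {
                return branch.processorName;
            }
        }
        return "OtherProcessor3";
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("orgId", "yinwenjie");
        System.out.println(choose(headers));
    }
}
```

Keeping each condition as a predicate object is what makes checks such as equality, comparison or regular-expression matching interchangeable inside a when branch.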
3-5-2. Recipient List
The first part of this section showed how to use conditions to send a message down one of several possible route paths. How, then, do we send the same message down several of the possible paths, chosen by some condition? Or down all of them?
In Camel, a message route path that may be selected is called a recipient. Camel offers several ways to send a message to multiple recipients that may become the next processing element in a route: the static recipient list (Static Recipient List), the dynamic recipient list (Dynamic Recipient List), and the looping dynamic router (Dynamic Router). The following subsections explain these forms in turn.
3-5-2-1. Handling a Static Recipient List with multicast
With multicast, Camel copies the Exchange output by the previous processing element and sends a copy to every recipient in the list, running the recipients one after another in order (parallel processing can be enabled). A recipient may be another route connected through Direct, a Processor, or a single Endpoint. Keep in mind that the Exchange is the only element that can carry a Message between an endpoint and a Processor, or between two Processors. So for one message to be delivered several times without the deliveries affecting each other, the Exchange itself has to be copied. These really are copies: the main properties have the same content, but each Exchange occupies its own memory and has its own ExchangeId. Here is a simple multicast example:
package com.yinwenjie.test.cameltest.helloworld;
......
/**
 * Tests multicast routing.
 * @author yinwenjie
 */
public class MulticastCamel extends RouteBuilder {
    public static void main(String[] args) throws Exception {
        // The Camel context object; it drives all of the routes.
        ModelCamelContext camelContext = new DefaultCamelContext();
        // Start the context.
        camelContext.start();
        // Add the complete message route we composed to the context.
        camelContext.addRoutes(new MulticastCamel());
        // Generic code with no business meaning; it only keeps the main thread from exiting.
        synchronized (MulticastCamel.class) {
            MulticastCamel.class.wait();
        }
    }
    @Override
    public void configure() throws Exception {
        // This thread pool runs the multicast branches concurrently.
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        MulticastDefinition multicastDefinition = from("jetty:http://0.0.0.0:8282/multicastCamel").multicast();
        // The branches of a multicast can run sequentially or concurrently;
        // here we demonstrate concurrent execution.
        multicastDefinition.setParallelProcessing(true);
        // Use a dedicated thread pool for the concurrent execution.
        multicastDefinition.setExecutorService(executorService);
        // Note: the Exchange of every multicast branch is a copy of the Exchange from the previous route element.
        // However the Pattern of those Exchanges is set, their results are not reflected in the original Exchange.
        multicastDefinition.to(
                "log:helloworld1?showExchangeId=true",
                "log:helloworld2?showExchangeId=true")
            // end() is required; otherwise OtherProcessor would become one more branch of the multicast.
            .end()
            // So the Body, Headers and other properties of the Exchange seen in OtherProcessor
            // carry no trace of any value set on the copied Exchanges.
            .process(new OtherProcessor());
    }
    /**
     * Another processor.
     * @author yinwenjie
     */
    public class OtherProcessor implements Processor {
        /* (non-Javadoc)
         * @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
         */
        @Override
        public void process(Exchange exchange) throws Exception {
            Message message = exchange.getIn();
            // LOGGER is assumed to be a logger field declared in a part of the class not shown here.
            LOGGER.info("exchange in OtherProcessor " + exchange);
            String body = message.getBody().toString();
            // Store the result in the out message of the exchange.
            if (exchange.getPattern() == ExchangePattern.InOut) {
                Message outMessage = exchange.getOut();
                outMessage.setBody(body + " || handled by OtherProcessor");
            }
        }
    }
}
In the snippet above, multicast copies the original Exchange and sends one copy to each of its two recipients. To make sure the two recipients are processed in parallel, we also give multicast its own thread pool (if none is set, Camel sets one up itself). After the multicast definition we add an OtherProcessor, whose main purpose is to let us inspect the state of the Exchange it receives. The output below shows the result of running the snippet:
[2016-06-24 16:07:43] INFO pool-1-thread-7 Exchange[Id: ID-yinwenjie-240-54110-1466755310568-0-20, ExchangePattern: InOut, BodyType: String, Body: {"data":{"orgId":"yinwenjie"},"token":"d9c33c8f-ae59-4edf-b37f-290ff208de2e","desc":""}] (MarkerIgnoringBase.java:96)
[2016-06-24 16:07:43] INFO pool-1-thread-8 Exchange[Id: ID-yinwenjie-240-54110-1466755310568-0-19, ExchangePattern: InOut, BodyType: String, Body: {"data":{"orgId":"yinwenjie"},"token":"d9c33c8f-ae59-4edf-b37f-290ff208de2e","desc":""}] (MarkerIgnoringBase.java:96)
[2016-06-24 16:07:43] INFO qtp1060925979-18 exchange in OtherProcessor [id:ID-yinwenjie-240-54110-1466755310568-0-16]Exchange[Message: {"data":{"orgId":"yinwenjie"},"token":"d9c33c8f-ae59-4edf-b37f-290ff208de2e","desc":""}] (MulticastCamel.java:74)
The output shows that the two recipients of the multicast (the two route branches) each run in the thread pool we configured, on threads [pool-1-thread-7] and [pool-1-thread-8]. After all branches of the multicast have finished, the OtherProcessor instance continues on thread [qtp1060925979-18], which is the thread on which the jetty:http endpoint was already handling this request.
Pay particular attention to the ExchangeId printed by the three log entries above: they belong to three entirely different Exchange instances. The Exchange objects that carry the Message in the two multicast branches have the Exchange IDs [ID-yinwenjie-240-54110-1466755310568-0-20] and [ID-yinwenjie-240-54110-1466755310568-0-19]. Both are copies that multicast made of the original Exchange, whose Exchange ID is [ID-yinwenjie-240-54110-1466755310568-0-16].
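The copy-per-recipient behaviour described above can be sketched in plain Java. This is a model under stated assumptions, not Camel's implementation: Msg is a hypothetical stand-in for an Exchange, the "-copyN" ids are invented for illustration, and a fixed thread pool plays the role of the executor given to multicast. Each recipient works on its own copy, so nothing it writes reaches the original.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java model of "copy the message once per recipient", NOT the Camel API.
class MulticastCopySketch {

    // Hypothetical stand-in for an Exchange: an id plus its own header map.
    static final class Msg {
        final String id;
        final Map<String, String> headers;

        Msg(String id, Map<String, String> headers) {
            this.id = id;
            this.headers = new HashMap<>(headers); // defensive copy: no shared state
        }

        Msg copy(String newId) {
            return new Msg(newId, headers);
        }
    }

    // Gives every recipient its own copy and processes the copies in parallel.
    static List<Msg> multicast(Msg original, int recipients) {
        ExecutorService pool = Executors.newFixedThreadPool(recipients);
        try {
            List<Future<Msg>> pending = new ArrayList<>();
            for (int i = 0; i < recipients; i++) {
                final Msg copy = original.copy(original.id + "-copy" + i);
                pending.add(pool.submit(() -> {
                    // The recipient changes only its own copy.
                    copy.headers.put("handledOn", Thread.currentThread().getName());
                    return copy;
                }));
            }
            List<Msg> done = new ArrayList<>();
            for (Future<Msg> future : pending) {
                done.add(future.get()); // wait for every branch before continuing
            }
            return done;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Msg original = new Msg("ex-16", new HashMap<>());
        for (Msg copy : multicast(original, 2)) {
            System.out.println(copy.id + " " + copy.headers);
        }
        // The original still has no headers after both branches have run.
        System.out.println(original.id + " " + original.headers);
    }
}
```

Waiting on every Future before returning mirrors the observation above that the step after the multicast only continues once all branches have finished.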
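3-5-2-2. Handling a Dynamic Recipient List
When composing routes, developers often cannot know in advance which recipients will become the next processing element, because the next element has to be decided dynamically from the message content carried by the Exchange. In that case the recipientList method is used to determine the next routing targets dynamically. In the following example we register three already-composed routes with the Camel service and observe their execution through the results printed on the console:
The first route, DirectRouteA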
public class DirectRouteA extends RouteBuilder {
    /* (non-Javadoc)
     * @see org.apache.camel.builder.RouteBuilder#configure()
     */
    @Override
    public void configure() throws Exception {
        from("jetty:http://0.0.0.0:8282/dynamicCamel")
            .setExchangePattern(ExchangePattern.InOnly)
            .recipientList().jsonpath("$.data.routeName").delimiter(",")
            .end()
            .process(new OtherProcessor());
    }
}
The second and third routes
/**
 * @author yinwenjie
 */
public class DirectRouteB extends RouteBuilder {
    /* (non-Javadoc)
     * @see org.apache.camel.builder.RouteBuilder#configure()
     */
    @Override
    public void configure() throws Exception {
        // The code of the second and third routes is similar;
        // the only difference is the class.
        from("direct:directRouteB")
            .to("log:DirectRouteB?showExchangeId=true");
    }
}
Registering the routes with the Camel service and starting execution
......
public static void main(String[] args) throws Exception {
    // The Camel context object; it drives all of the routes.
    ModelCamelContext camelContext = new DefaultCamelContext();
    // Start the context.
    camelContext.start();
    // Add the complete message routes we composed to the context.
    camelContext.addRoutes((new DynamicCamel()).new DirectRouteA());
    camelContext.addRoutes((new DynamicCamel()).new DirectRouteB());
    camelContext.addRoutes((new DynamicCamel()).new DirectRouteC());
    // Generic code with no business meaning; it only keeps the main thread from exiting.
    synchronized (DynamicCamel.class) {
        DynamicCamel.class.wait();
    }
}
......
The code in DirectRouteB and DirectRouteC is very simple: each takes the Exchange handed over through direct by the preceding route and prints its information, which is why only DirectRouteB is listed above. The role of ExchangePattern.InOnly in DirectRouteA was covered in the previous article and is not repeated here. The point to stress is the recipientList method. Like multicast, it can be configured for concurrent execution and given a thread pool, but DirectRouteA deliberately does not do so, so that readers can see clearly how recipientList (or multicast) behaves when its branches run sequentially. After starting the Camel service, we send the following JSON from Postman (or any other testing tool):
{"data":{"routeName":"direct:directRouteB,direct:directRouteC"},"token":"d9c33c8f-ae59-4edf-b37f-290ff208de2e","desc":""}
The recipientList method uses the routing information given in $.data.routeName to decide dynamically on one or more message recipients. In the JSON above we specified two: "direct:directRouteB,direct:directRouteC". recipientList uses the "," set through the delimiter method as the separator to identify the two recipients.
Observed execution result
[2016-06-26 10:31:53] INFO qtp1896561093-16 Exchange[Id: ID-yinwenjie-240-55214-1466908306101-0-3, ExchangePattern: InOnly, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
[2016-06-26 10:31:53] INFO qtp1896561093-16 Exchange[Id: ID-yinwenjie-240-55214-1466908306101-0-4, ExchangePattern: InOnly, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
[2016-06-26 10:31:53] INFO qtp1896561093-16 exchange in OtherProcessor [id:ID-yinwenjie-240-55214-1466908306101-0-1]Exchange[Message: [Body is instance of org.apache.camel.StreamCache]] (DynamicCamel.java:100)
Static and dynamic recipient lists behave alike in many respects. In both cases the Exchange used by each recipient on a route branch is a copy of the Exchange output by the previous element. The copies carry the same business content but have different Exchange IDs, so every route branch has its own Exchange object and the messages on the branches do not affect each other. Both forms also support sequential and concurrent execution of the branches, and both allow a dedicated thread pool for concurrent execution.
From the output above we can see that, because the dynamic recipient list was not set to run concurrently, the branches are executed one after another by the thread named [qtp1896561093-16] (the Jetty thread handling the request), and the Exchange objects of the branches do not affect each other. Also note the last log entry: it comes from OtherProcessor, which runs outside the route branches. This shows that however the branches execute, they do not affect the Exchange object used by elements outside the branches.
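As a closing illustration, the way a delimited value such as the routeName above turns into a list of recipients, each invoked in turn on the caller's thread, can be modelled in a few lines of plain Java. This is a sketch under stated assumptions rather than Camel's implementation: resolve, dispatch and the registry map are hypothetical names, and trimming whitespace around each entry is a choice made by this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

// Plain-Java model of a dynamic recipient list, NOT the Camel API.
class RecipientListSketch {

    // Splits the delimited value into individual recipient URIs.
    static List<String> resolve(String routeNames, String delimiter) {
        List<String> uris = new ArrayList<>();
        for (String part : routeNames.split(Pattern.quote(delimiter))) {
            String uri = part.trim(); // assumption of this sketch: surrounding spaces are ignored
            if (!uri.isEmpty()) {
                uris.add(uri);
            }
        }
        return uris;
    }

    // Calls every resolved recipient in order, sequentially, on the calling thread.
    static List<String> dispatch(String body, String routeNames,
                                 Map<String, Function<String, String>> registry) {
        List<String> results = new ArrayList<>();
        for (String uri : resolve(routeNames, ",")) {
            Function<String, String> recipient = registry.get(uri);
            if (recipient == null) {
                throw new IllegalArgumentException("No route registered for " + uri);
            }
            // String is immutable, so no recipient can change what the next one receives.
            results.add(recipient.apply(body));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> registry = new HashMap<>();
        registry.put("direct:directRouteB", body -> "B got " + body);
        registry.put("direct:directRouteC", body -> "C got " + body);
        System.out.println(dispatch("hello", "direct:directRouteB,direct:directRouteC", registry));
    }
}
```

The recipients are only known once the message content has been read, which is the difference from the static multicast list, where they are fixed when the route is defined.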