Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
Spring Cloud Stream ¼òÃ÷½Ì³Ì
 
  5302  次浏览      27
 2018-3-22
 
±à¼­ÍƼö:
±¾ÎÄÀ´×ÔÓÚcsdn£¬±¾ÎÄͨ¹ý¶ÔSpring Cloud Stream ֪ʶÕûÀí £¬ ¼òÒªµÄ½éÉÜÁ˸ÅÄʹÓ÷½·¨ £¬Ï£Íû¶ÔÄúµÄѧϰÓаïÖú¡£

Spring Cloud Stream ֪ʶÕûÀí

¸ÅÄî

1. ·¢²¼/¶©ÔÄ

¼òµ¥µÄ½²¾ÍÊÇÒ»ÖÖÉú²úÕߣ¬Ïû·ÑÕßģʽ¡£·¢²¼ÕßÊÇÉú²ú£¬½«Êä³ö·¢²¼µ½Êý¾ÝÖÐÐÄ£¬¶©ÔÄÕßÊÇÏû·ÑÕߣ¬¶©ÔÄ×Ô¼º¸ÐÐËȤµÄÊý¾Ý¡£µ±ÓÐÊý¾Ýµ½´ïÊý¾ÝÖÐÐÄʱ£¬¾Í°ÑÊý¾Ý·¢Ë͸ø¶ÔÓ¦µÄ¶©ÔÄÕß¡£

2. Ïû·Ñ×é

Ö±¹ÛµÄÀí½â¾ÍÊÇһȺÏû·ÑÕßÒ»Æð´¦ÀíÏûÏ¢¡£ÐèҪעÒâµÄÊÇ£ºÃ¿¸ö·¢Ë͵½Ïû·Ñ×éµÄÊý¾Ý£¬½öÓÉÏû·Ñ×éÖеÄÒ»¸öÏû·ÑÕß´¦Àí¡£

3. ·ÖÇø

Àà±ÈÓÚÏû·Ñ×飬·ÖÇøÊǽ«Êý¾Ý·ÖÇø¡£¾ÙÀý£ºÄ³Ó¦ÓÃÓжà¸öʵÀý£¬¶¼°ó¶¨µ½Í¬Ò»¸öÊý¾ÝÖÐÐÄ£¬Ò²¾ÍÊDz»Í¬ÊµÀý¶¼½«Êý¾Ý·¢²¼µ½Í¬Ò»¸öÊý¾ÝÖÐÐÄ¡£·ÖÇø¾ÍÊǽ«Êý¾ÝÖÐÐĵÄÊý¾ÝÔÙϸ·Ö³É²»Í¬µÄÇø¡£ÎªÊ²Ã´ÐèÒª·ÖÇø£¿ÒòΪ¼´Ê¹ÊÇͬһ¸öÓ¦Ó㬲»Í¬ÊµÀý·¢²¼µÄÊý¾ÝÀàÐÍ¿ÉÄܲ»Í¬£¬Ò²Ï£ÍûÕâЩÊý¾ÝÓɲ»Í¬µÄÏû·ÑÕß´¦Àí¡£Õâ¾ÍÐèÒª£¬Ïû·ÑÕß¿ÉÒÔ½ö¶©ÔÄÒ»¸öÊý¾ÝÖÐÐĵIJ¿·ÖÊý¾Ý¡£Õâ¾ÍÐèÒª·ÖÇøÕâ¸ö¶«Î÷ÁË¡£

Spring Cloud Stream¼ò½é

1. Ó¦ÓÃÄ£ÐÍ

Spring Cloud StreamÓ¦ÓÃÓɵÚÈý·½µÄÖмä¼þ×é³É¡£Ó¦ÓüäµÄͨÐÅͨ¹ýÊäÈëͨµÀ£¨input channel£©ºÍÊä³öͨµÀ£¨output channel£©Íê³É¡£ÕâЩͨµÀÊÇÓÐSpring Cloud Stream ×¢ÈëµÄ¡£¶øÍ¨µÀÓëÍⲿµÄ´úÀí£¨¿ÉÒÔÀí½âΪÉÏÎÄËù˵µÄÊý¾ÝÖÐÐÄ£©µÄÁ¬½ÓÓÖÊÇͨ¹ýBinderʵÏֵġ£

Spring Cloud Stream Ó¦ÓÃÄ£ÐÍ

ÉÏͼ¾ÍÊÇSpring Cloud StreamµÄÓ¦ÓÃÄ£ÐÍ¡£

1.1 ¿É¶ÀÁ¢ÔËÐеÄjar

Spring Cloud StreamÓ¦ÓÿÉÒÔÖ±½ÓÔÚIDEÔËÐС£ÕâÑù»áºÜ·½±ã²âÊÔ¡£µ«ÔÚÉú²ú»·¾³Ï£¬ÕâÊDz»Êʺϵġ£Spring BootΪmavenºÍGradleÌṩÁË´ò°ü³É¿ÉÔËÐÐjarµÄ¹¤¾ß£¬Äã¿ÉÒÔʹÓÃÕâ¸ö¹¤¾ß½«Spring Cloud StreamÓ¦Óôò°ü¡£

2. ³éÏóµÄBinder

Binder¿ÉÒÔÀí½âΪÌṩÁËMiddleware²Ù×÷·½·¨µÄÀà¡£Spring Cloud ÌṩÁËBinder³éÏó½Ó¿ÚÒÔ¼°KafKaºÍRabbit MQµÄBinderµÄʵÏÖ¡£

ʹÓÃSpring Cloud Stream

1. ¿ìËÙ¿ªÊ¼

ÕâÀïÏȷųöÇ°ÃæµÄÓ¦ÓÃÄ£ÐÍͼ

ÏÂÃæÀý×ÓʹÓõÄMiddlewareÊÇKafka£¬°æ±¾ÊÇkafka_2.11-1.0.0¡£KafkaʹÓõÄÊÇĬÈÏÅäÖã¬Ò²¾ÍÊÇ´ÓKafka¹ÙÍøÏÂÔØºÃºóÖ±½Ó´ò¿ª£¬²»¸ü¸ÄÈκÎÅäÖá£

¹ØÓÚpom.xmlÖÐÒÀÀµµÄÏîÄ¿µÄ°æ±¾ÎÊÌ⣬×îºÃ²»¸Ã³É±ðµÄ°æ±¾£¬ÒòΪºÜ´ó¿ÉÄܵ¼Ö°汾³åÍ»¡£

1.1 pom.xml

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>Ditmars.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>


<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>

ÐèҪעÒâµÄÊÇ£º¹ÙÍøÉϵÄÀý×ÓÊÇûÓÐÏÂÃæÅäÖõÄ

<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>

µ«ÊÇÔÚ±¾È˵çÄÔÉÏÈç¹û²»¼ÓÉÏÉÏÃæÄǶÎÅäÖþÍÊDZ¨´í£¬¶ÁÕß¿ÉÒÔ°´ÕÕ¸öÈËÇé¿öÑ¡Ôñ¼Ó²»¼Ó¡£

¼òµ¥ËµÃ÷Ò»ÏÂÒÔÉÏÅäÖÃ

<parent>...</parent>£ºÕâ¶Î´ú±í¼Ì³Ðspring-boot-starter-parentµÄÅäÖá£ÒòΪSpring Cloud Stream ÒÀÀµSpring BootµÄ×Ô¶¯ÅäÖã¬ËùÒÔÐèÒª¼ÓÉÏÕâ¶Î¡£

<dependencyManagement>...</dependencyManagement>£ºÕâ¶ÎÊÇÒýÈëspring-cloud-stream-dependencies.pom.xml£¬¸ÃÅäÖÃÎļþÀﺬÓÐSpring Cloud Stream ÏîÄ¿ÐèҪʹÓõÄjar°üµÄÐÅÏ¢£¨°üÃû¼Ó°æ±¾ºÅ£©

<dependencies>...</dependencies>ÒÀÀµÁ½¸östarter

1.2 App.java

@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {

public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(App.class);
// ×¢²á´¦Àíº¯Êý
System.out.println("×¢²á½á¹û£º" + setHander(context));
// ·¢ËÍÏûÏ¢
System.out.println("·¢Ëͽá¹û£º" + write(context));
}

// ·¢ËÍÏûÏ¢
public static boolean write(ConfigurableApplicationContext context) {
Service service = context.getBean(Service.class);
return service.write("¹·×ÓÔÚÂð?");
}

// ×¢²á½ÓÊÕµ½ÏûϢʱµÄ´¦Àíº¯Êý
public static boolean setHander(ConfigurableApplicationContext context) {
Service service = context.getBean(Service.class);
return service.subscribe(result -> {
System.out.print("¹·×ÓÊÕµ½ÏûÏ¢£º" + result.getPayload());
});
}
}

ÉÏÃæÊ¹ÓÃÁËÁ½¸ö×¢½â£º@EnableBinding ºÍ @SpringBootApplication¡£@SpringBootApplication ¾Í²»ËµÁË¡£@EnableBinding ×¢½â½ÓÊÕÒ»¸ö²ÎÊý£¬²ÎÊýÀàÐÍÊÇclass¡£ÉÏÃæ´úÂëÖУ¬´«ÈëµÄ²ÎÊýÊÇ¡°Processor.class¡±,ÕâÊÇÒ»¸ö½Ó¿Ú£¬¶¨ÒåÁËÁ½¸öchannel£¬·Ö±ðÊÇinputºÍoutput¡£¿´Ãû³Æ¾ÍÖªµÀ£¬Ò»¸öÊÇÊä³öͨµÀ£¨input channel£©£¬Ò»¸öÊÇÊä³öͨµÀ£¨output channel£©¡£¡°@EnableBinding(value = { Processor.class })¡±ÕâÕû¶Î´ú±í´´½¨Processor¶¨ÒåµÄͨµÀ£¬²¢½«Í¨µÀºÍBinder°ó¶¨¡£

PorcessorÊÇSpring Cloud StreamΪ·½±ãʹÓöøÔ¤Ïȶ¨ÒåºÃµÄ£¬³ýÁËProcessor»¹ÓÐSinkºÍSource£¬ÕâЩ½Ó¿Ú¶¨ÒåÁËһЩͨµÀ£¨channel£©£¬ÐèҪʱ£¬Ö±½ÓʹÓþͺá£ÎÒÃÇÒ²ÄÜ×Ô¼º¶¨ÒåͨµÀ£¨channel£©£¬ÈçºÎ¶¨ÒåÏÂÎĻὲ¡£

AppÀàÖеÄmain·½·¨µ÷ÓÃÁËSpringApplication.run£¬½Ó×ŵ÷ÓÃÁËwriteºÍsetHandler·½·¨¡£·½·¨ºÜ¼òµ¥£¬ÉÏÎÄÓÐ×¢ÊÍ£¬²»ÔÙ׸Êö¡£

1.3 Service.java

@Component
public class Service {

@Autowired
private Processor processor;

public boolean write(String data) {
return processor.output().send(MessageBuilder

.withPayload(data).build());
}

public boolean subscribe(MessageHandler handler) {
return processor.input().subscribe(handler);
}
}

ÕâÊÇÒ»¸öserviceÀ࣬·â×°ÁËһЩ¶ÔͨµÀµÄ²Ù×÷¡£

ÐèҪעÒâµÄÊÇÕâ¶Î´úÂ룺

@Autowired
private Processor processor;

Ç°ÃæËµ¹ý£¬ProcessorÊÇÒ»¸ö¶¨ÒåÁËÊäÈëÊä³öͨµÀµÄ½Ó¿Ú£¬²¢Ã»ÓоßÌåʵÏÖ¡£Spring Cloud Stream»á°ïÎÒÃÇ×Ô¶¯ÊµÏÖËü¡£ÎÒÃÇÖ»ÐèÒª»ñÈ¡Ëü£¬²¢Ê¹ÓÃËü¡£

½Ó×Å¿´

processor.output().send(MessageBuilder

.withPayload(data).build());

ÏÈÊǵ÷ÓÃoutput()·½·¨»ñÈ¡Êä³öͨµÀ¶ÔÏ󣬽Ó×ŵ÷ÓÃsend·½·¨·¢ËÍÊý¾Ý¡£send·½·¨½ÓÊÕÒ»¸öMessage¶ÔÏó£¬Õâ¸ö¶ÔÏó²»ÄÜÖ±½Ónew£¬ÐèҪʹÓÃMessageBuilder»ñÈ¡¡£

1.4 application.properties

spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.output.destination=test

ÉÏÃæÅäÖÃÁËÄ¿µÄµØ£¬Àà±ÈÓÚKafkaµÄTopicºÍRabbitMQµÄ¶ÓÁеĸÅÄî¡£

ÅäÖøñʽÈçÏ£º

spring.cloud.stream.bindings.<channelName>.<key>=value

channelName¾ÍÊǹܵÀÃû£¬key¾ÍÊǶÔÓ¦ÊôÐÔ£¬ÕâÀïÊÇdestination£¬´ú±íÄ¿µÄµØ¡£

¹ÜµÀÃû£¬keyµÄÆäËû¿ÉѡֵÏÂÎĻὲ£¬ÕâÀﲻҪǿÇóÈ«²¿Åª¶®£¬½Ó×Å¿´¾ÍºÃ¡£

1.4 ×ܽá

ÉÏÃæ¾ÍÊÇÍêÕûµÄÀý×ÓÁË¡£¶Ô±ÈÇ°Ãæ¸ø³öµÄÓ¦ÓÃÄ£ÐÍͼ£¬ÉÏÃæµÄ´úÂëºÍÅäÖÃÎļþ¶¨ÒåÁËApplication Core£¨´úÂëÖеĴ¦Àíº¯Êý£¬·¢ËÍÏûÏ¢µÄº¯ÊýµÈµÈ£©£¬´´½¨ÁËͨµÀ²¢ºÍBinder°ó¶¨£¨@EnableBinding(value = { Processor.class })£©¡£Middleware¾ÍÊDZ¾½ÚÒ»¿ªÊ¼ËµµÄKafka¡£Õû¸öÁ÷³Ì´ó¸ÅÈçÏ£º

1.¿ªÆôMiddleware£¨Kafka£©

2.´´½¨Í¨µÀ²¢ÓëBinder°ó¶¨£¨@EnableBinding£©

3.±àд²Ù×÷ͨµÀµÄ´úÂë

4.ÔÚÅäÖÃÎļþÉÏÅäÖÃÄ¿µÄµØ£¬×飬MiddlewareµÄµØÖ·£¬¶Ë¿ÚµÈµÈ

ʹÓÃSpring Cloud Stream

1 ÉùÃ÷ºÍ°ó¶¨Í¨µÀ£¨channel£©

1.1 ÉùÃ÷ͨµÀ

Spring Cloud Stream ¿ÉÒÔÓÐÈÎÒâÊýÁ¿µÄͨµÀ¡£ÉùÃ÷ͨµÀµÄ·½Ê½ºÜ¼òµ¥¡£ÏÂÃæÏȸø³ö֮ǰ˵¹ýµÄSink£¬Source£¬Processor½Ó¿ÚµÄÔ´Â룺

public interface Sink {

String INPUT = "input";

@Input(Sink.INPUT)
SubscribableChannel input();

}

public interface Source {

String OUTPUT = "output";

@Output(Source.OUTPUT)
MessageChannel output();

}

public interface Processor extends

Source, Sink {

}

¼òµ¥°É£¬¾ÍÊÇʹÓÃÁË@InputºÍ@Output×¢½âÁË·½·¨¡£ÆäÖÐ@Input×¢½âµÄ·½·¨·µ»ØµÄÊÇSubscribableChannel£¬@Output×¢½âµÄ·½·¨·µ»ØµÄÊÇMessageChannel¡£

ÉùÃ÷ͨµÀ£¨channel£©µÄ·½·¨¾ÍÊÇʹÓÃ@InputºÍ@Output×¢½â·½·¨¡£ÄãÏëÒª¶àÉÙͨµÀ¾Í×¢½â¶àÉÙ·½·¨¡£

¸øÍ¨µÀÃüÃû

ĬÈÏÇé¿öÏ£¬Í¨µÀµÄÃû³Æ¾ÍÊÇ×¢½âµÄ·½·¨µÄÃû³Æ£¬ÀýÈ磺

@Input
public SubscribableChannel yyy();

ÄÇô¸ÃͨµÀµÄÃû³Æ¾ÍÊÇyyy¡£Ò²Äܹ»×Ô¼º¶¨ÒåͨµÀÃû³Æ¡£Ö»ÐèÒª¸ø@InputºÍ@Output×¢½â´«ÈëStringÀàÐͲÎÊý¾Í¿ÉÒÔÁË£¬´«ÈëµÄ²ÎÊý¾ÍÊǸÃͨµÀÁËÃû³Æ¡£ÀýÈ磺

@Input("zzz")
public SubscribableChannel yyy();

ͨµÀµÄÃû³Æ¾Í±ä³ÉÁËzzz¡£

1.2 ´´½¨ºÍ°ó¶¨Í¨µÀ

Ö»ÐèҪʹÓÃ@EnableBinding¾ÍÄÜ´´½¨ºÍ°ó¶¨Í¨µÀ£¨channel£©¡£

@EnableBinding(value={Sink.class,Source.class})

@EnableBinding×¢½â½ÓÊյIJÎÊý¾ÍÊÇʹÓÃ@Input»òÕß@Output×¢½âÉùÃ÷ÁËͨµÀ£¨channel£©µÄ½Ó¿Ú¡£Spring Cloud Stream»á×Ô¶¯ÊµÏÖÕâЩ½Ó¿Ú¡£

ÉÏÎÄÖÐ˵¹ý£¬@InputºÍ@Output×¢½âµÄ·½·¨ÓÐÏàÓ¦µÄ·µ»ØÖµ£¬ÕâЩ·µ»ØÖµ¾ÍÊǶÔÓ¦µÄͨµÀ£¨channel£©¶ÔÏó¡£ÒªÊ¹ÓÃͨµÀ£¨channel£©Ê±£¬¾ÍÖ»Òª»ñÈ¡µ½Spring Cloud Stream¶ÔÕâЩ½Ó¿ÚµÄʵÏÖ£¬ÔÙµ÷ÓÃ×¢½âµÄ·½·¨»ñÈ¡µ½Í¨µÀ£¨channel£©¶ÔÏó½øÐвÙ×÷¾Í¿ÉÒÔÁË¡£ÈçºÎ»ñÈ¡½Ó¿ÚµÄʵÏÖÏÂÎĻὲ¡£

°ó¶¨Í¨µÀ£¨channel£©ÊÇÖ¸½«Í¨µÀ£¨channel£©ºÍBinder½øÐа󶨡£ÒòΪMiddleware²»Ö»Ò»ÖÖ£¬ÀýÈçÓÐKafka£¬RabbitMQ¡£²»Í¬µÄMiddlewareÓв»Í¬µÄBinderʵÏÖ£¬Í¨µÀ£¨channel£©ÓëMiddlewareÁ¬½ÓÐèÒª¾­¹ýBinder£¬ËùÒÔͨµÀ£¨channel£©ÒªÓëÃ÷È·µÄBinder°ó¶¨¡£

Èç¹ûÀà·¾¶ÏÂÖ»ÓÐÒ»ÖÖBinder£¬Spring Cloud Stream»áÕÒµ½²¢°ó¶¨Ëü£¬²»ÐèÒªÎÒÃǽøÐÐÅäÖá£Èç¹ûÓжà¸ö¾ÍÐèÒªÎÒÃÇÃ÷È·ÅäÖÃÁË£¬ÅäÖ÷½Ê½ÏÂÎĻὲ¡£ÕâÀïÖ»ÐèÒªÖªµÀ@EnableBindingÄܰïÎÒÃÇ×Ô¶¯ÊµÏÖ½Ó¿Ú£¬´´½¨Í¨µÀºÍʵÏÖͨµÀÓëBinderµÄ°ó¶¨¾Í¿ÉÒÔÁË¡£

»ñÈ¡°ó¶¨Á˵ÄͨµÀ

ʹÓÃÁË@EnableBinding×¢½âºó£¬Spring Cloud Stream ¾Í»á×Ô¶¯°ïÎÒÃÇʵÏÖ½Ó¿Ú¡£ÄÇô£¬¿ÉÒÔͨ¹ýSpringÖ§³ÖµÄÈκÎÒ»ÖÖ·½Ê½»ñÈ¡½Ó¿ÚµÄʵÏÖ£¬ÀýÈç×Ô¶¯×¢È룬getBeanµÈ·½Ê½£¬ÏÂÃæ¸ø³ö¹Ù·½Àý×Ó£º

@Component
public class SendingBean {

private Source source;

@Autowired
public SendingBean(Source source) {
this.source = source;
}

public void sayHello(String name) {
source.output().send(MessageBuilder.with

Payload(name).build());
}
}

Ò²Äܹ»Ö±½Ó×¢ÈëͨµÀ£¨channel£©

@Component
public class SendingBean {

private MessageChannel output;

@Autowired
public SendingBean(MessageChannel output) {
this.output = output;
}

public void sayHello(String name) {
output.send(MessageBuilder.withPay

load(name).build());
}
}

Èç¹ûÄã¸øÍ¨µÀÃüÃûÁË£¬ÐèҪʹÓÃ@Qualifier×¢½âÖ¸Ã÷ͨµÀÃû³Æ

@Component
public class SendingBean {

private MessageChannel output;

@Autowired
public SendingBean(@Qualifier("customOutput") MessageChannel output) {
this.output = output;
}

public void sayHello(String name) {
this.output.send(MessageBuilder.with

Payload(name).build());
}
}

2 Éú²úºÍÏû·ÑÏûÏ¢

2.1 Éú²úÏûÏ¢

Ò»ÖÖ·½Ê½Êǵ÷ÓÃͨµÀ£¨channel£©µÄsned·½·¨·¢²¼ÏûÏ¢¡£»¹ÓоÍÊÇʹÓÃSpring IntergrationµÄ·½Ê½Éú²úÊý¾Ý

@EnableBinding(Source.class)
public class TimerSource {

@Value("${format}")
private String format;

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
}
}

Spring Cloud StreamÊǼ̳ÐSpring IntergrationµÄ£¬ËùÓÐSpring Cloud Stream ÌìȻ֧³ÖSpring IntergrationµÄ¶«Î÷¡£

2.2 Ïû·ÑÏûÏ¢

Ò»ÖÖ·½Ê½ÊÇÇ°Ãæ¿ìËÙ¿ªÊ¼ÖеÄÄÇÑù×¢²á´¦Àíº¯Êý£¬ÕâÀï²»ÔÙ׸Êö£¬ÏÂÃæ½«ÊÇʹÓÃ@StreamListener×¢½â¶ÔÏûÏ¢½øÐд¦Àí

ʹÓÃ@StreamListenerµÄÀý×Ó

@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {

public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(App.class);
// ·¢ËÍÏûÏ¢
System.out.println("·¢Ëͽá¹û£º" + write(context));
}

@StreamListener(Sink.INPUT)
public void handler(String message) {
System.out.print("¹·×ÓÊÕµ½ÏûÏ¢£º" + message);
}

// ·¢ËÍÏûÏ¢
public static boolean write(ConfigurableApplicationContext context) {
Service service = context.getBean(Service.class);
return service.write("¹·×ÓÔÚÂð?");
}
}

ÕâÊÇ¿ìËÙ¿ªÊ¼µÄÀý×Ó£¬ÔÚÕ⽫ÏÂÃæµÄ´úÂëÈ¥µô£¬»»³É@StreamListener

public static boolean setHander(ConfigurableApplicationContext context) {
Service service = context.getBean(Service.class);
return service.subscribe(result -> {
System.out.print("¹·×ÓÊÕµ½ÏûÏ¢£º" + result.getPayload());
});
}

@StreamListener½ÓÊյIJÎÊýÊÇÒª´¦ÀíµÄͨµÀ£¨channel£©µÄÃû£¬Ëù×¢½âµÄ·½·¨¾ÍÊÇ´¦Àí´ÓͨµÀ»ñÈ¡µ½µÄÊý¾ÝµÄ·½·¨¡£·½·¨µÄ²ÎÊý¾ÍÊÇ»ñÈ¡µ½µÄÊý¾Ý¡£

ÏûÏ¢ÊÇ´øÓÐHeaderµÄ£¬ÀàËÆHttpµÄheadler£¬ÉÏÃæÓÐcontentTypeÊôÐÔÖ¸Ã÷ÏûÏ¢ÀàÐÍ¡£Èç¹ûcontentTypeÊÇapplication/json£¬ÄÇô@Streamlistener»á×Ô¶¯½«Êý¾Ýת»¯³É@StreamListener×¢½âµÄ·½·¨µÄ²ÎÊýµÄÀàÐÍ¡£

¿ÉÒÔÊÇ@Header£¬@Headers×¢½â»ñÈ¡ÏûÏ¢µÄHeader

@StreamListener(target=Sink.INPUT)
public void handler1(Message message,@Header(name="contentType") Object header) {
System.out.print("¹·×ÓÊÕµ½messageÏûÏ¢£º" + message.getMessage());
System.out.print("ÏûÏ¢header£º" + header);
}

Ó÷¨ÈçÉÏ£¬Ê¹ÓÃ@Header»òÕß@Headers×¢½â·½·¨µÄ²ÎÊý£¬Ö¸Ã÷ÈÃSpring Cloud Stream½«ÏûÏ¢µÄHeader´«Èë¶ÔÓ¦µÄ²ÎÊý¡£

@HeaderºÍ@HeadersµÄÇø±ð¾ÍÊÇÒ»¸öÊÇ»ñÈ¡µ¥¸öÊôÐÔ£¬ÐèÒªÖ¸Ã÷ÄĸöÊôÐÔ£¬Ò»¸öÊÇ»ñȡȫ²¿ÊôÐÔ¡£

@StreamListener(target=Sink.INPUT)
public void handler1(Message message,@Headers Map<String,Object> header) {
System.out.print("¹·×ÓÊÕµ½messageÏûÏ¢£º" + message.getMessage());
System.out.print("ÏûÏ¢header£º" + header);
}

ʵ¼ÊÉÏ»¹ÓÐһЩע½âÊÇ@PayLoadºÍ@PayLoads£¬¿´Ãû×Ö¾ÍÖªµÀÊÇ»ñÈ¡ÏûÏ¢ÄÚÈݵ쬾ßÌåÓ÷¨ºÍ×¢ÒâÊÂÏîSpring Cloud Stream ¹Ù·½ÎĵµÉÏû½²£¬Õⲿ·ÖÄÚÈÝÒԺ󲹳䡣

×¢Ò⣺Èç¹û@StreamListener×¢½âµÄ·½·¨Óзµ»ØÖµ£¬ÄÇô±ØÐëʹÓÃ@SendTo×¢½âÖ¸Ã÷·µ»ØµÄֵдÈëÄĸöͨµÀ

@EnableBinding(Processor.class)
public class TransformProcessor {

@Autowired
VotingService votingService;

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
}
}

ʹÓÃ@StreamListener½«ÏûÏ¢·Ö·¢¸ø¶à¸ö·½·¨

ÈôÏëʹÓÃÏûÏ¢·Ö·¢µÄ¹¦ÄÜ£¬·½·¨±ØÐëÏÈÂú×ãÒ»ÏÂÌõ¼þ£º

1.ûÓзµ»ØÖµ

2.·½·¨Êǵ¥¶ÀµÄÏûÏ¢´¦Àí·½·¨(Ô­ÎÄ£ºit must be an individual message handling method (reactive API methods are not supported))

·Ö·¢µÄÌõ¼þÔÚ×¢½âµÄ¡°condition¡±ÊôÐÔÖÐÖ¸Ã÷£¬¶øÇÒÌõ¼þÊÇÓÉSpEL±í´ïʽ±àдµÄ¡£ËùÓÐÆ¥ÅäÌõ¼þµÄ´¦Àíº¯Êý½«»áÔÚÏàͬµÄÏß³ÌÖÐÎ޹̶¨Ë³ÐòµÄµ÷Óá£

ÏÂÃæ¸ø³öÒ»¸öÀý×Ó£¨ÓÉ¿ìËÙ¿ªÊ¼ÖÐÀý×ÓÐ޸ĶøÀ´£©£º

ÏÂÃæÕâ¸öÀý×ÓÖУ¬pom.xml£¬ºÍapplication.propertiesÓë¿ìËÙ¿ªÊ¼µÄÒ»Ñù¡£

//Ïȶ¨ÒåÁ½¸öDTO
public class Message {
private String message;
private Integer all;

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public Integer getAll() {
return all;
}

public void setAll(Integer all) {
this.all = all;
}

}

public class Error {

private String error;

public String getError() {
return error;
}

public void setError(String error) {
this.error = error;
}

}

½Ó×ÅÊÇ·â×°Á˵ÄͨµÀ£¨channel£©²Ù×÷µÄService¡£Óë¿ìËÙ¿ªÊ¼µÄÀý×Ó²»Í¬µÄÊÇ£¬Õâ¸öÀï´´½¨ÏûϢʱÉèÖõÄHeaderµÄ¡°contentType¡±ÊôÐÔ£¬ÖµÎªÏûϢЯ´øµÄÊý¾ÝµÄClassµÄSimpleName¡£

@Component
public class Service {

@Autowired
private Processor processor;

public boolean write(Object data) {
return processor.output().send(
MessageBuilder.withPayload(data)

.setHeader("contentType", data.getClass().getSimpleName()).build());
}
}

×îºóÊÇAppÀà¡£ÕâÀཫ·¢²¼ÁËÁ½´ÎÏûÏ¢£¬·Ö±ðÊÇMessageÀàÐ͵ĺÍErrorÀàÐ͵ġ£²¢ÇÒʹÓÃ@StreamListener×¢½âÁËÈý¸ö·½·¨£¬¶¼ÉèÖÃÁËconditionÊôÐÔÖ¸Ã÷·Ö·¢Ìõ¼þ¡£

@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {

public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(App.class);
// ·¢ËÍÏûÏ¢
Message message = new Message();
message.setAll(200);
message.setMessage("¹·×ÓÔÚÂð£¿");

Error error = new Error();
error.setError("´íÎóºô»½£¡");
write(context, message);
write(context, error);
}

@StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Message'")
public void handler1(@Payload Message message, @Header("contentType") String header) {
System.out.println("¹·×ÓÊÕµ½messageÏûÏ¢1£º" + message.getMessage());
}

@StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Error'")
public void handler2(Error message) {
System.out.print("¹·×ÓÊÕµ½errorÏûÏ¢2£º" + message.getError());
}

@StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Message'")
public void handler3(@Payload Message message, @Header("contentType") String header) {
System.out.println("¹·×ÓÊÕµ½messageÏûÏ¢3£º" + message.getMessage());
}

// ·¢ËÍÏûÏ¢
public static boolean write(ConfigurableApplicationContext context, Object data) {
Service service = context.getBean(Service.class);
return service.write(data);
}
}

Êä³ö½á¹û£º

¹·×ÓÊÕµ½messageÏûÏ¢1£º¹·×ÓÔÚÂð£¿
¹·×ÓÊÕµ½messageÏûÏ¢3£º¹·×ÓÔÚÂð£¿
¹·×ÓÊÕµ½errorÏûÏ¢2£º´íÎóºô»½£¡

¿ÉÒÔ¿´µ½Æ¥ÅäÁË¡°contentType=Message¡±µÄÁ½¸ö·½·¨¶¼Ö´ÐÐÁË£¬Æ¥ÅäÁË¡°contentType=error¡±µÄ·½·¨Ò²Ö´ÐÐÁË¡£

ÕâÀïÎÒÔÙ²¹³äÒ»µãÎÒʹÓÃʱÓöµ½µÄÎÊÌâ

Èç¹ûÎÒ°ÑHeaderÉèÖÃÒ»¸öÊôÐÔ¡°type=XXX¡±£¬µ«»ñÈ¡µ½ÏûÏ¢µÄʱºò£¬HeaderÉϲ¢Ã»ÓÐÕâ¸öÊôÐÔ¡£¼òµ¥³¢ÊÔÁËһЩ£¬·¢ÏÖÖ»ÄÜÐÞ¸ÄÏÖÓÐÊôÐÔ£¨ÀýÈçcontentType£©£¬²»ÄÜÌí¼ÓÐÂÊôÐÔ¡£

2.3 ¾ÛºÏ

2.3.1 ʹÓÃÏÞÖÆ

Spring Cloud Stream Ö§³Ö¾ÛºÏ¶à¸öÓ¦ÓõŦÄÜ¡£Õâ¸ö¹¦ÄÜ¿ÉÒÔÖ±½ÓÁ¬½Ó¶à¸öÓ¦ÓõÄÊäÈ룬Êä³öͨµÀ£¬±ÜÃâͨ¹ý´úÀí£¨Ö¸Kafka£¬RabbitMQÕâЩMiddleware£©½»»»ÏûϢʱ´øÀ´µÄ¶îÍâºÄ·Ñ¡£µ½1.0°æµÄSpring Cloud StreamΪֹ£¬¾ÛºÏ¹¦ÄܽöÖ§³ÖÏÂÁÐÓ¦Óãº

1.Ö»Óе¥¸öÊä³öͨµÀ£¬²¢ÇÒÃüÃûΪoutputµÄÓ¦Ó㨾ÍÊÇSource£©

2.Ö»Óе¥¸öÊäÈëͨµÀ£¬²¢ÇÒÃüÃûΪinputµÄÓ¦Ó㨾ÍÊÇSink£©

3.Ö»ÓÐÒ»¸öÊä³öͨµÀºÍÒ»¸öÊäÈëͨµÀ²¢ÇÒÃüÃûΪoutputºÍinputµÄÓ¦Ó㨾ÍÊÇProcessor£©

ÒÔÉÏÊǹٷ½ÎĵµÔ­»°£¬¸öÈ˾õµÃºÜ¼¦ÀߵŦÄÜ£¬Ò²ÐíÎÒÓõÃÉÙ°É¡£

¾ß±¸ÒÔÉÏÌØÕ÷µÄÓ¦ÓþͿÉÒÔʹÓÃSpring Cloud StreamµÄ¾ÛºÏ¹¦Äܽ«¶à¸öÓ¦¸ÃÁ¬½Ó³ÉÒ»´®»¥ÏàÁ¬½ÓµÄÓ¦Óá£

ÕâÀﻹÓм¸¸öÏÞÖÆ£¬ÆðʼµÄÓ¦ÓñØÐëÊÇSource»òÕßProcessor£¬½áÊøµÄÓ¦ÓñØÐëÊÇSink»òÕßProcessor¡£ÖмäµÄÓ¦ÓñØÐëÊÇProcessor£¬²»¹ý¿ÉÒÔÓÐÈÎÒâÊýÁ¿µÄProcessor¡££¨Soruce£¬Sink£¬Processor¾ÍÊÇÖ¸¾ß±¸ÉÏÃæËùËµÌØÕ÷µÄÓ¦Óã©

2.3.2 Àý×Ó

ÏÂÃæ¸ø³ö¹Ù·½Àý×Ó£¬ÏÈ˵Ã÷¼¸¸ö×¢Òâµã£º

1.ÏÂÃæÀý×ÓÖÐÓÐÈý¸öÓ¦Ó÷ֱðÊÇSource£¬Sink£¬Processor£¬ÕâÈý¸öÓ¦ÓÿÉÒÔ·Ö²¼ÔÚ²»Í¬ÏîÄ¿ÖУ¬Ò²ÄÜÔÚÏàͬÏîÄ¿ÖС£ÐèҪעÒâµÄÊÇ£¬Èç¹ûÔÚÏàͬÏîÄ¿ÖУ¬Ó¦¸ÃÒª´¦ÓÚ²»Í¬µÄ°üÖУ¬Èç¹ûͬ¸ö°ü£¬¶à¸ö@SpringBootApplication×¢½â»áµ¼Ö±¨´í

2.ʹÓÃ@Transformer×¢½âÐèÒªÖ¸Ã÷inputChannelºÍoutputChannelÊôÐÔ¡£¹Ù·½ÎĵµµÄÀý×ÓÉÏÊÇûÓÐÖ¸Ã÷µÄ£¬µ«ÎÒÔËÐеÄʱºòÈç¹û²»Ö¸Ã÷¾Í²»Äܽ«¶à¸öÓ¦ÓÃÁ¬ÔÚÒ»Æð¡£

3.²»ÒªÊ¹ÓÃeclipseÖеÄSpring BootÓ¦ÓõIJå¼þÔËÐУ¬Ê¹Óòå¼þÔËÐлᱨעÒâµã1µÄ´íÎó£¬Ô­ÒòÊÇʲô²»Çå³þ¡£

//Source
@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {

@InboundChannelAdapter(value = Source.OUTPUT)
public String timerMessageSource() {
return new SimpleDateFormat().format(new Date());
}
}
//Processor
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {

@Transformer(inputChannel=Sink.INPUT

,outputChannel=Source.OUTPUT)
public String loggerSink(String payload) {
return payload.toUpperCase();
}
}
//Sink
@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {

@StreamListener(Sink.INPUT)
public void loggerSink(Object payload) {
System.out.println("Received: " + payload);
}
}

ÉÏÃæÊÇÈý¸öÓ¦Óã¬ÏÂÃæÊǽ«Èý¸öÓ¦ÓÃÁ¬½ÓÆðÀ´µÄ´úÂë¡£

@SpringBootApplication
public class App {

public static void main(String[] args) {
new AggregateApplicationBuilder().from(Source

Application.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class).to(SinkApp

lication.class).args("--debug=true").run(args);
}
}

´úÂëºÜ¼òµ¥£¬¾ÍÊÇʹÓÃAggregateApplicationBuilder½«Èý¸öÓ¦ÓÃÁ¬½ÓÆðÀ´¡£.args("XXX")Õâ¶Î´úÂëµÄ×÷ÓþÍÊÇΪ¶ÔÓ¦µÄÓ¦Óô«µÝÔËÐÐʱ²ÎÊý¡£

2.3.3 ²»Í¬Á¬½ÓÇé¿öϵÄBinder°ó¶¨

ÓÉÓÚÏÞÖÆ¶à¶à£¬¿ÉÒÔÇî¾Ù³öËùÓеĿÉÄÜÁ¬½Ó£¬ÏÂÃæ¸ø³ö²»Í¬Á¬½ÓÓëBinderµÄ°ó¶¨Çé¿ö£º

1.Èç¹ûÒÔSourceÓ¦ÓÿªÊ¼²¢ÇÒÒÔSinkÓ¦ÓýáÊø£¬ÄÇôӦÓüäµÄÁ¬½ÓÊÇÖ±½Ó½øÐе쬲»»á¾­¹ý´úÀí£¨Ö¸Kafka£¬RabbitMQÕâЩMiddleware£©£¬Ò²¾Í²»»áÓëBinder°ó¶¨¡£ÀýÈçÉÏÃæµÄÀý×Ó£¬Äã°ÑʹÓõÄMiddleware¹Ø±Õ£¬ÀýÈçÎÒʹÓõÄÊÇKafka£¬ÎÒ°ÑKafka¹ØÁË£¬Ó¦ÓÃÒ²ÄÜÅÜÆðÀ´¡£

2.Èç¹ûÒÔProcessorÓ¦ÓÿªÊ¼£¬ÄÇôÕâ¸öÓ¦ÓõÄinputͨµÀ¾ÍÊÇÕâÒ»´®Ò»ÑùµÄinputͨµÀ£¬ÕâÖÖÇé¿öÏ£¬»á´¥·¢inputͨµÀÓëBinderµÄ°ó¶¨¡£

3.Èç¹ûÒÔProcessorÓ¦ÓýáÊø£¬ÄÇôÕâ¸öÓ¦ÓõÄoutputͨµÀ¾ÍÊÇÕâ´®Ó²Ó²µÄoutputͨµÀ£¬»á´¥·¢outputͨµÀÓëBinderµÄ°ó¶¨¡£

2.3.4 ÅäÖþۺϵÄÓ¦ÓÃ

Spring Cloud Stream Ö§³ÖΪ¾ÛºÏÔÚÒ»ÆðµÄ¶à¸öÓ¦ÓÃÖеÄÒ»¸öÓ¦Óô«µÝ²ÎÊý¡£

ΪӦÓÃÃüÃûnamespaceºó£¬¾Í¿ÉÒÔͨ¹ýÃüÁîÐУ¬»·¾³±äÁ¿µÈ·½Ê½¸øÓ¦Óô«µÝ²ÎÊý¡£

public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApplication.class).namespace("from").

args("--fixedDelay=20000")
.via(ProcessorApplication.class).namespace("via")
.to(SinkApplication.class).namespace("to").args("--debug=true").run(args);
}

Õâ¶Ë´úÂëºÍÇ°ÃæµÄÀý×Óû̫´ó²î±ð£¬Ö»ÊǶàÁË.namespace()£¬Õâ¶Î´úÂë¾ÍÊÇΪӦÓÃÉèÖÃnamesapce¡£

½Ó×ÅÊǾۺÏÔÚÒ»ÆðµÄÓ¦ÓõĴúÂ룺

//»ñÈ¡´«ÈëµÄ²ÎÊý
@Value("${fixedDelay:null}")
private String args;

@InboundChannelAdapter(value = Source.OUTPUT)
public String timerMessageSource() {
//Êä³ö²ÎÊý
System.out.println("Source get args:"+args);
return new SimpleDateFormat().format(new Date());
}

ÕâÀïÖ»¸ø³öÒ»¸ö£¬ÆäËûÀàËÆ£¬¶¼ÊǼÓÁË»ñÈ¡²ÎÊýºÍÊä³ö²ÎÊýµÄ´úÂë¡£

½Ó×Å´ò°üºóÒÔÏÂÁÐÃüÁîÔËÐУº

java -jar stream-aggregation.jar

Êä³ö£º

Source get args:20000
Processor get args:null
Sink get args:null
Received: 17-12-14 ÏÂÎç5:43

¿ÉÒÔ¿´µ½£¬ÒòΪProcessorÊÇûÓÐfixedDelay²ÎÊýµÄ£¬ËùÓÐÊä³önull

ÒÔÏÂÁÐÃüÁîÔËÐУº

java -jar stream.jar via --fixedDelay=200

Êä³ö£º

Source get args:20000
Processor get args:200
Sink get args:null
Received: 17-12-14 ÏÂÎç5:46

¿ÉÒÔ¿´µ½£¬Êä³öΪ200£¬¾ÍÊÇÎÒÃÇ´«ÈëµÄ²ÎÊý£¬¶øSinkºÍSourceµÄÊä³öû±ä£¬Ò²¾ÍÊÇû¸Ä±äËüÃǵIJÎÊý

×ܽáһϣº

1.ÔÚ¾ÛºÏʱºòÉèÖÃnamespace

2.ÔÚÃüÁîÐлòÕß»·¾³±äÁ¿µÈ·½Ê½Ê¹ÓÃnamespaceΪָ¶¨Ó¦Óô«µÝ²ÎÊý

BinderÒÔ¼°ÅäÖÃ

ÕâÀïÔٷųöÓ¦ÓÃÄ£ÐÍͼ¡£Binder¼òµ¥µÄÀí½â¾ÍÊÇ·â×°Á˶ÔÏûϢϵͳ£¨kafka£¬rabbitMQ£©µÄ²Ù×÷¡£¿ÉÒÔʹÓÿª·¢Õß¼òµ¥µÄÅäÖþÍÄÜʹÓÃÏûϢϵͳµÄ·¢²¼/¶©ÔÄ£¬µã¶Ôµã´«Ê䣬·Ö×飬·ÖÇøµÈµÈ¹¦ÄÜ¡£ÊÇ¿ª·¢Õß¿ª·ÅʱÄܺöÂÔ¶ÔÏûϢϵͳ²Ù×÷µÄϸ½Ú¡£µ±È»£¬ÕâЩ×é¼þµÄÉè¼ÆÒ»°ãÊdzéÏó³öÒ»¸ö½Ó¿Ú£¬È»ºó¶Ô²»Í¬µÄÏûϢϵͳÓв»Í¬µÄʵÏÖ£¬ÕâЩ¶«Î÷ÕâÀï²»½²£¬Ö»½²ÔõôÓá£

1 BinderʵÏÖÀàµÄ¼ì²â

1.1 µ¥¸öBinderʵÏÖÀà

Èç¹ûÔÚÀà·¾¶ÉÏÖ»ÓÐÒ»¸öBinderµÄʵÏÖÀࣨÀýÈçÄãÔÚmavenÏîÄ¿ÖУ¬Ö»Ìí¼ÓÁËkafkaµÄBinderµÄʵÏÖµÄÒÀÀµ£©£¬ÄÇôSpring Cloud Stream»áĬÈÏʹÓÃÕâ¸öʵÏÖÀ࣬ËùÓеÄͨµÀ£¨Channel£©¶¼»á°ó¶¨Õâ¸öBinder¡£¾ÍÏñÇ°ÃæµÄÀý×ÓÄÇÑù£¬Ä㼸ºõ¸Ð¾õ²»µ½BinderµÄ´æÔÚ£¬ÄãÖ»ÐèÒªÅäÖÃÒ»ÏÂͨµÀ£¨Channel£©µÄÄ¿µÄµØ£¨destination£©£¬·Ö×飨group£©£¬·ÖÇø£¨partition£©µÈÐÅÏ¢¾Í¿ÉÒÔʹÓá£ÀýÈç¿ìËÙ¿ªÊ¼µÄÀý×ÓÖоͽö½öÅäÖÃÁËÊäÈ룬Êä³öͨµÀµÄÄ¿µÄµØ

spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.output.destination=test

1.2 ¶à¸öBinderʵÏÖÀà

Èç¹ûÓжà¸öBinderʵÏÖÀ࣬ÄÇô¾Í±ØÐëÖ¸Ã÷ÄĸöͨµÀ£¨Channel£©°ó¶¨ÄĸöBinder¡£ÅäÖõķ½Ê½¾ÍÊÇÔÚapplication.peoperties»òÕßapplication.yamlÅäÖÃÎļþÉÏÌí¼ÓÒ»ÏÂÄÚÈÝ£º

spring.cloud.stream.bindings.ͨµÀÃû³Æ.binder=BinderÃû³Æ

ÕâÑù¾ÍÄÜÖ¸Ã÷ʲôͨµÀ°ó¶¨ÄĸöBinderÁË¡£

µ±È»£¬ÄãÒ²¿ÉÒÔÅäÖÃĬÈϵÄBinder

spring.cloud.stream.defaultBinder=BinderÃû³Æ

¹ØÓÚBinderµÄÃû³Æ

ÔÚÿ¸öBinderʵÏÖµÄjar°üµÄMETA-INFĿ¼Ï¶¼»áÓÐÒ»¸öspring.bindersÎļþ¡£¸ÃÎļþÊÇÒ»¸ö¼òµ¥µÄµ¥ÊôÐÔÎļþ£¬ÀýÈçrabbitMQµÄBinderµÄʵÏÖµÄspring.bindersÎļþµÄÄÚÈÝÈçÏ£º

rabbit:\org.springframework.cloud.stream

.binder.rabbit.config.RabbitServiceAuto

Configuration

Ç°ÃæµÄkey²¿·Ö£¨ÕâÀïÊÇrabbit£©¾ÍÊÇBinderµÄÃû³Æ¡£Ò²¾ÍÊÇrabbitMQµÄBinderµÄÃû³Æ¾ÍÊÇrabbit¡£Êµ¼ÊÉÏÏÖÔÚÒ²¾ÍÖ»ÓÐÁ½ÖÖBinderµÄʵÏÖ£¬Ò»¸öÊÇrabbitMQµÄÒ»¸öÊÇkafkaµÄ£¬kafkaµÄBinderµÄÃû³Æ¾ÍÊÇkafka¡£

2 ¿ÉÑ¡ÅäÖÃ

¿ÉÒÔͨ¹ýSpring BootµÄÈÎÒâÅäÖûúÖÆÀ´¶ÔSpring Cloud StreamÓ¦ÓýøÐÐÅäÖã¬ÀýÈçÓ¦ÓòÎÊý£¨application argument£©£¬»·¾³±äÁ¿£¨environment variable£©ÒÔ¼°YAML »òÕß propertiesÎļþ¡£

2.1 ¶ÔÓ¦ÓõÄÅäÖÃ

spring.cloud.stream.instanceCount

Õâ¸öÊÇÅäÖÃÓ¦ÓÃʵÀýµÄÊýÁ¿¡£Èç¹ûʹÓÃkafka£¬±ØÐëÉèÖ÷ÖÇø¡£Ä¬ÈÏֵΪ1

spring.cloud.stream.instanceIndex

ʵÀýµÄ±àºÅ£¬±àºÅ´Ó0¿ªÊ¼¡£

spring.cloud.stream.dynamicDesinations

ÉèÖÃÒ»ÁÐÄ¿µÄµØÓÃÒÔ¶¯Ì¬°ó¶¨¡£Èç¹ûÉèÖÃÁË£¬Ö»ÓÐÁбíÖеÄÄ¿µÄµØÄܱ»°ó¶¨¡£Ä¬ÈÏֵΪ¿Õ¡£

spring.cloud.stream.defaultBinder

ÉèÖõÄĬÈϵÄBinder£¬Õâ¸öÇ°ÃæËµ¹ý£¬²»ÔÙ׸Êö¡£Ä¬ÈÏֵΪ¿Õ¡£

spring.cloud.stream.overrideCloudConnectors

ĬÈÏֵΪfalse¡£µ±ÖµÎªfalseʱ£¬Binder»á¼ì²é²¢Ñ¡ÔñºÏÊʵÄbound ServiceÀ´´´½¨Á¬½Ó¡£µ±ÉèΪtrueµÄʱºò£¬Binder»á°´ÕÕSpring Cloud StreamÅäÖÃÎļþÀ´Ñ¡Ôñbound Service¡£Õâ¸öÅäÖÃͨ³£ÊÇÔÚÐèÒªÁ¬½Ó¶à¸öÏûϢϵͳµÄʱºòÓõ½¡£

2.2 Á¬½Ó£¨Binding£©µÄÅäÖÃ

ÕâÀàÅäÖõĸñʽÈçÏ£º

spring.cloud.stream.bindings.<channelName>

.<property>=<value>

Òâ˼¾ÍÊÇÅäÖÃÃûΪchannelNameµÄͨµÀµÄpropertyÊôÐÔµÄֵΪvalue¡£

ΪÁ˱ÜÃâÖØ¸´ÅäÖã¬Spring Cloud Stream Ò²Ö§³Ö¶ÔÈ«²¿Í¨µÀ£¨channel£©½øÐÐÉèÖá£ÅäÖÃĬÈÏÊôÐԵĸñʽÈçÏ£º

spring.cloud.stream.default.<property>=<value>

2.2.1 ͨÓõÄÅäÖÃ

һϵÄÅäÖÃÊôÐÔ¶¼´øÓС°spring.cloud.stream.bindings.<channelName>¡±Ç°×º£¬Îª·½±ãÎÄ×ÖÅŰ棬ʡÂÔǰ׺¡£

destination

ͨµÀ£¨channel£©ÓëÏûϢϵͳÁ¬½ÓµÄÄ¿µÄµØ£¨ÈôÏûϢϵͳÊÇRabbitMQ£¬Ä¿µÄµØ£¨destination£©¾ÍÊÇÖ¸exchange£¬ÏûϢϵͳÊÇKafka£¬ÄÇô¾ÍÊÇÖ¸topic£©¡£

¿ÉÒÔÁ¬½Ó¶à¸öÄ¿µÄµØ¡£ÒªÏëÁ¬½Ó¶à¸öÄ¿µÄµØ£¬Ö»ÐèÒªÓá°,¡±½«¶à¸öÄ¿µÄµØ·Ö¿ª¼´¿É¡£ÀýÈ磺

spring.cloud.stream.channelName.

destinaction=destinaction1,destinaction2

group

ÅäÖÃͨµÀµÄÏû·ÑÕß×é¡£½öÓ¦ÓÃÓÚÊäÈëͨµÀ¡£

ĬÈÏֵΪnull

²¹³ä£ºÒ»¸öchannel¿ÉÒÔÁ¬½Ó¶à¸ödestination£¬Í¬Ò»¸ögroupÄÚµÄchannelÁ¬½ÓµÄdestination¿ÉÒÔ²»Í¬¡£

Èç¹ûÒ»¸ögroupÄÚµÄchannelÁ¬½ÓÁËA£¬B£¬CÈý¸ödestination¡£ÄÇôA£¬B£¬CÕâ¸öÈý¸ödestinationµÄÏûÏ¢¶¼»á¿½±´Ò»·Ý·¢¸øÕâ¸ögroup£¬²¢ÇÒÑ¡ÔñÕâ¸ögroupÖÐchannelÏû·ÑÕâ¸öÏûÏ¢¡£ÀýÈ磬Õâ¸ögroupÖеÄa£¬bÁ½¸öchannelÁ¬½Ó²¢ÇÒÖ»Á¬½ÓÁËdestination A£¬channel cÁ¬½ÓÇÒÖ»Á¬½ÓÁËdestination B£¬ÄÇô»áÔÚa£¬bÖÐѡһ¸öÀ´´¦ÀíÀ´×ÔAµÄÏûÏ¢£¬c²»ÔÚÑ¡ÔñµÄ·¶Î§ÄÚ¡£

Èç¹ûÓÐÁ½¸ögroup¶¼Á¬½ÓÁËdestination A£¬ÄÇôAµÄÏûÏ¢»á¿½±´Á½·Ý·Ö±ð·¢¸øÕâÁ½¸ögroup¡£

contentType

ͨµÀ£¨channel£©³ÐÔØµÄÄÚÈݵÄÀàÐÍ¡£

ĬÈÏֵΪnull¡£

binder

Õâ¸öÔÚÇ°Ãæ¡°¶à¸öBinderʵÏÖÀࡱ²¿·Ö½²ÁË¡£

   
5302 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

Java΢·þÎñÐÂÉú´úÖ®Nacos
ÉîÈëÀí½âJavaÖеÄÈÝÆ÷
JavaÈÝÆ÷Ïê½â
Java´úÂëÖÊÁ¿¼ì²é¹¤¾ß¼°Ê¹Óð¸Àý
Ïà¹ØÎĵµ

JavaÐÔÄÜÓÅ»¯
Spring¿ò¼Ü
SSM¿ò¼Ü¼òµ¥¼òÉÜ
´ÓÁ㿪ʼѧjava±à³Ì¾­µä
Ïà¹Ø¿Î³Ì

¸ßÐÔÄÜJava±à³ÌÓëϵͳÐÔÄÜÓÅ»¯
JavaEE¼Ü¹¹¡¢ Éè¼ÆÄ£Ê½¼°ÐÔÄܵ÷ÓÅ
Java±à³Ì»ù´¡µ½Ó¦Óÿª·¢
JAVAÐéÄâ»úÔ­ÀíÆÊÎö