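Editor's pick:
This article comes from CSDN. It organizes notes on Spring Cloud Stream and briefly introduces its concepts and usage. We hope it helps your learning.
Notes on Spring Cloud Stream
Concepts
1. Publish/subscribe
Put simply, this is a producer/consumer pattern. A publisher is a producer that publishes its output to a data hub; a subscriber is a consumer that subscribes to the data it is interested in. When data arrives at the data hub, it is sent to the matching subscribers.
2. Consumer groups
Intuitively, a consumer group is a set of consumers that process messages together. Note that each message sent to a consumer group is handled by exactly one consumer in that group.
3. Partitioning
By analogy with consumer groups, which split up the consumers, partitioning splits up the data. For example, an application has several instances, all bound to the same data hub, so the different instances all publish to the same hub. Partitioning subdivides the hub's data into separate partitions. Why is this needed? Even within one application, different instances may publish different kinds of data, and we may want those kinds handled by different consumers. That requires a consumer to be able to subscribe to only part of the data in a hub, which is exactly what partitions provide.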
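To make the partitioning idea a little more concrete, here is a minimal plain-Java sketch. It does not use Spring Cloud Stream at all: the class PartitionSketch, its methods, and the hash-modulo rule are assumptions made up for illustration, not the framework's actual partitioning strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: these names are hypothetical and not part of Spring Cloud Stream.
class PartitionSketch {
    // The same key always maps to the same partition index in [0, partitionCount).
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Splits (key, payload) pairs into partitionCount buckets.
    static List<List<String>> split(String[][] messages, int partitionCount) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String[] message : messages) {
            partitions.get(partitionFor(message[0], partitionCount)).add(message[1]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        String[][] messages = { { "a", "m1" }, { "b", "m2" }, { "a", "m3" } };
        // "a".hashCode() is 97 and "b".hashCode() is 98, so with 2 partitions
        // key "a" lands in partition 1 and key "b" in partition 0.
        System.out.println(split(messages, 2)); // prints [[m2], [m1, m3]]
    }
}
```

A consumer that reads only partition 1 would see m1 and m3 but never m2, which is the "subscribe to part of the data" behaviour described above.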
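Introduction to Spring Cloud Stream
1. Application model
A Spring Cloud Stream application consists of a middleware-neutral core. The application communicates with the outside world through input channels and output channels, which Spring Cloud Stream injects into it. The channels are in turn connected to the external broker (the "data hub" described above) through a Binder.

Figure: the Spring Cloud Stream application model.
1.1 Standalone executable jar
A Spring Cloud Stream application can be run directly from an IDE, which is convenient for testing but not suitable for production. Spring Boot provides Maven and Gradle tooling for building an executable jar, and you can use it to package a Spring Cloud Stream application.
2. The Binder abstraction
A Binder can be thought of as a class that provides the operations for talking to a particular middleware. Spring Cloud Stream provides the Binder abstraction as an interface, along with Binder implementations for Kafka and RabbitMQ.
Using Spring Cloud Stream
1. Quick start
Keep the application model figure from the previous section in mind here.

The example below uses Kafka as the middleware, version kafka_2.11-1.0.0, with the default configuration: download it from the Kafka website and start it without changing any settings.
As for the versions of the dependencies in pom.xml, it is best not to change them, because doing so is very likely to cause version conflicts.
1.1 pom.xml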
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>Ditmars.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>
Note: the example on the official site does not include the following configuration:
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>
However, on my machine the application fails with an error unless that block is added. Readers can decide whether to include it based on their own setup.
A brief explanation of the configuration above:
<parent>...</parent>: inherits the configuration of spring-boot-starter-parent. Spring Cloud Stream relies on Spring Boot auto-configuration, so this section is needed.
<dependencyManagement>...</dependencyManagement>: imports the spring-cloud-stream-dependencies POM, which holds the coordinates and versions of the jars a Spring Cloud Stream project needs.
<dependencies>...</dependencies>: declares the two starter dependencies.
1.2 App.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ConfigurableApplicationContext;

@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(App.class);
        // Register the handler
        System.out.println("Registration result: " + setHandler(context));
        // Send a message
        System.out.println("Send result: " + write(context));
    }

    // Send a message
    public static boolean write(ConfigurableApplicationContext context) {
        Service service = context.getBean(Service.class);
        return service.write("Doggy, are you there?");
    }

    // Register the function that handles received messages
    public static boolean setHandler(ConfigurableApplicationContext context) {
        Service service = context.getBean(Service.class);
        return service.subscribe(result -> {
            System.out.print("Doggy received message: " + result.getPayload());
        });
    }
}
The code above uses two annotations: @EnableBinding and @SpringBootApplication. @SpringBootApplication needs no explanation. @EnableBinding takes interface classes as its parameter. Here the argument is Processor.class, an interface that defines two channels, input and output. As the names suggest, one is the input channel and the other is the output channel. The whole expression @EnableBinding(value = { Processor.class }) creates the channels defined by Processor and binds them to the Binder.
Processor is predefined by Spring Cloud Stream for convenience. Besides Processor there are also Sink and Source. These interfaces define a few channels that you can use directly when needed. You can also define your own channels, as described later.
The main method of App calls SpringApplication.run and then calls setHandler and write. Both methods are simple and commented above, so they are not discussed further.
1.3 Service.java
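import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class Service {
    @Autowired
    private Processor processor;

    // Publish data to the output channel
    public boolean write(String data) {
        return processor.output().send(MessageBuilder.withPayload(data).build());
    }

    // Subscribe a handler to the input channel
    public boolean subscribe(MessageHandler handler) {
        return processor.input().subscribe(handler);
    }
}
This is a service class that wraps a few channel operations.
Pay attention to this piece of code:
@Autowired
private Processor processor;
As mentioned earlier, Processor is an interface that defines an input and an output channel and has no concrete implementation of its own. Spring Cloud Stream implements it for us automatically; we only need to obtain the implementation and use it.
Next, look at:
processor.output().send(MessageBuilder.withPayload(data).build());
This first calls output() to get the output channel object, then calls send to publish the data. send takes a Message object. Message is an interface, so it cannot be instantiated with new directly; the usual way to build one is MessageBuilder.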
1.4 application.properties
spring.cloud.stream.bindings.input.destination=test
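spring.cloud.stream.bindings.output.destination=test
The lines above configure the destination, which is roughly analogous to a Kafka topic or a RabbitMQ exchange.
The configuration format is:
spring.cloud.stream.bindings.<channelName>.<key>=value
channelName is the name of the channel and key is the property being set; here it is destination, the destination of the channel.
Other channel names and other possible keys are covered later. There is no need to understand everything at this point; just read on.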
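To show how such a property name breaks down, here is a small plain-Java sketch. Spring Boot performs the real property binding itself; BindingKeySketch and its split method are hypothetical helpers written only for this illustration, and the sketch assumes the channel name contains no dot.

```java
// Illustration only: Spring Boot does the real binding; this helper is hypothetical.
class BindingKeySketch {
    static final String PREFIX = "spring.cloud.stream.bindings.";

    // Splits "spring.cloud.stream.bindings.<channelName>.<key>" into {channelName, key}.
    // Returns null when the name is not a binding property.
    // Assumption: the channel name itself contains no dot.
    static String[] split(String propertyName) {
        if (!propertyName.startsWith(PREFIX)) {
            return null;
        }
        String rest = propertyName.substring(PREFIX.length());
        int dot = rest.indexOf('.');
        if (dot <= 0 || dot == rest.length() - 1) {
            return null;
        }
        return new String[] { rest.substring(0, dot), rest.substring(dot + 1) };
    }

    public static void main(String[] args) {
        String[] parts = split("spring.cloud.stream.bindings.input.destination");
        System.out.println(parts[0] + " / " + parts[1]); // prints input / destination
    }
}
```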
1.5 Summary
That is the complete example. Compared with the application model figure given earlier, the code and configuration file above define the Application Core (the handler function, the function that sends messages, and so on) and create the channels and bind them to the Binder (@EnableBinding(value = { Processor.class })). The middleware is the Kafka instance mentioned at the start of this section. The overall flow is roughly:
1. Start the middleware (Kafka)
2. Create the channels and bind them to the Binder (@EnableBinding)
3. Write the code that operates on the channels
4. Configure the destination, group, middleware address, port, and so on in the configuration file
Using Spring Cloud Stream
1 Declaring and binding channels
1.1 Declaring channels
A Spring Cloud Stream application can have any number of channels, and declaring one is simple. Here is the source code of the Sink, Source, and Processor interfaces mentioned earlier:
public interface Sink {
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}

public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

public interface Processor extends Source, Sink {
}
Simple, isn't it? The methods are just annotated with @Input and @Output. A method annotated with @Input returns a SubscribableChannel, and a method annotated with @Output returns a MessageChannel.
To declare a channel, annotate a method with @Input or @Output. Annotate as many methods as you need channels.
Naming channels
By default, the channel name is the name of the annotated method. For example:
@Input
public SubscribableChannel yyy();
Here the channel is named yyy. You can also choose the name yourself by passing a String to @Input or @Output; that argument becomes the channel name. For example:
@Input("zzz")
public SubscribableChannel yyy();
Now the channel is named zzz.
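The naming rule can be sketched in plain Java with reflection. The @Input annotation below is a hypothetical stand-in defined inside the example, not the real Spring Cloud Stream annotation, and the resolution logic is only an assumption about how such a rule could be implemented.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

class ChannelNameSketch {
    // Hypothetical stand-in for Spring Cloud Stream's @Input; not the real annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Input {
        String value() default "";
    }

    interface MyChannels {
        @Input
        Object yyy();          // no value given: the name falls back to the method name

        @Input("zzz")
        Object other();        // explicit name
    }

    // Explicit annotation value wins; otherwise the method name is used.
    static String channelName(Method method) {
        Input input = method.getAnnotation(Input.class);
        if (input == null) {
            return null;
        }
        return input.value().isEmpty() ? method.getName() : input.value();
    }

    // Collects the channel names of an interface in sorted order.
    static Set<String> channelNames(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Method method : type.getDeclaredMethods()) {
            String name = channelName(method);
            if (name != null) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(channelNames(MyChannels.class)); // prints [yyy, zzz]
    }
}
```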
1.2 Creating and binding channels
@EnableBinding is all that is needed to create and bind channels.
@EnableBinding(value={Sink.class,Source.class})
@EnableBinding takes the interfaces that declare channels with @Input or @Output. Spring Cloud Stream implements these interfaces automatically.
As noted above, methods annotated with @Input and @Output return the corresponding channel objects. To use a channel, obtain Spring Cloud Stream's implementation of the interface and call the annotated method to get the channel object. How to obtain the implementation is explained below.
Binding a channel means binding the channel to a Binder. There is more than one kind of middleware (Kafka and RabbitMQ, for example), each with its own Binder implementation, and a channel connects to the middleware through a Binder, so each channel has to be bound to a specific Binder.
If there is only one Binder on the classpath, Spring Cloud Stream finds it and binds to it with no configuration on our part. If there are several, we have to configure the choice explicitly, as described later. For now it is enough to know that @EnableBinding implements the interfaces, creates the channels, and binds the channels to a Binder for us.
Obtaining bound channels
Once @EnableBinding is in place, Spring Cloud Stream implements the interfaces for us, and the implementation can be obtained in any way Spring supports, such as autowiring or getBean. Here is the official example:
@Component
public class SendingBean {
    private Source source;

    @Autowired
    public SendingBean(Source source) {
        this.source = source;
    }

    public void sayHello(String name) {
        source.output().send(MessageBuilder.withPayload(name).build());
    }
}
You can also inject the channel directly:
@Component
public class SendingBean {
    private MessageChannel output;

    @Autowired
    public SendingBean(MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
        output.send(MessageBuilder.withPayload(name).build());
    }
}
If you have given the channel a custom name, use the @Qualifier annotation to specify which channel to inject:
@Component
public class SendingBean {
    private MessageChannel output;

    @Autowired
    public SendingBean(@Qualifier("customOutput") MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
        this.output.send(MessageBuilder.withPayload(name).build());
    }
}
2 Producing and consuming messages
2.1 Producing messages
One way is to call the channel's send method to publish a message. Another is to produce data the Spring Integration way:
@EnableBinding(Source.class)
public class TimerSource {
    @Value("${format}")
    private String format;

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
    public MessageSource<String> timerMessageSource() {
        return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
    }
}
Spring Cloud Stream is built on Spring Integration, so it supports Spring Integration features out of the box.
2.2 Consuming messages
One way is to register a handler as in the quick start, which is not repeated here. The other is to process messages with the @StreamListener annotation.
An example using @StreamListener
@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(App.class);
        // Send a message
        System.out.println("Send result: " + write(context));
    }

    // Handle messages arriving on the input channel
    @StreamListener(Sink.INPUT)
    public void handler(String message) {
        System.out.print("Doggy received message: " + message);
    }

    // Send a message
    public static boolean write(ConfigurableApplicationContext context) {
        Service service = context.getBean(Service.class);
        return service.write("Doggy, are you there?");
    }
}
This is the quick start example with the following code removed and replaced by @StreamListener:
public static boolean setHandler(ConfigurableApplicationContext context) {
    Service service = context.getBean(Service.class);
    return service.subscribe(result -> {
        System.out.print("Doggy received message: " + result.getPayload());
    });
}
@StreamListener takes the name of the channel to handle, and the annotated method handles the data received from that channel. The method parameter is the received data.
Messages carry headers, similar to HTTP headers, and the contentType header indicates the type of the message. If contentType is application/json, @StreamListener automatically converts the data to the parameter type of the annotated method.
The @Header and @Headers annotations can be used to read the message headers:
// Message here is presumably the author's DTO with getMessage() (defined in the
// dispatch example further down), not Spring's Message interface.
@StreamListener(target = Sink.INPUT)
public void handler1(Message message, @Header(name = "contentType") Object header) {
    System.out.print("Doggy received message: " + message.getMessage());
    System.out.print("Message header: " + header);
}
As shown, annotate a method parameter with @Header or @Headers to have Spring Cloud Stream pass the message headers into that parameter.
The difference is that @Header reads a single header, whose name must be given, while @Headers reads all of them.
@StreamListener(target = Sink.INPUT)
public void handler1(Message message, @Headers Map<String, Object> header) {
    System.out.print("Doggy received message: " + message.getMessage());
    System.out.print("Message header: " + header);
}
There are also the @Payload and @Payloads annotations, which, as the names suggest, read the message content. The Spring Cloud Stream documentation does not cover their usage and caveats; this part will be filled in later.
Note: if a method annotated with @StreamListener returns a value, it must use @SendTo to indicate which channel the returned value is written to.
@EnableBinding(Processor.class)
public class TransformProcessor {
@Autowired
VotingService votingService;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
}
}
Dispatching messages to multiple methods with @StreamListener
To use message dispatching, a method must satisfy the following conditions:
1. It does not return a value
2. It is an individual message handling method (original text: it must be an individual message handling method (reactive API methods are not supported))
The dispatch condition is given in the annotation's condition attribute as a SpEL expression. All handlers whose condition matches are invoked in the same thread, in no guaranteed order.
Here is an example, adapted from the quick start example. Its pom.xml and application.properties are the same as in the quick start.
// First define two DTOs
public class Message {
private String message;
private Integer all;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Integer getAll() {
return all;
}
public void setAll(Integer all) {
this.all = all;
}
}
public class Error {
private String error;
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
}
Next is the Service that wraps the channel operations. Unlike the quick start example, it sets the contentType header when creating a message, using the simple class name of the payload as the value.
@Component
public class Service {
@Autowired
private Processor processor;
public boolean write(Object data) {
return processor.output().send(
MessageBuilder.withPayload(data)
.setHeader("contentType", data.getClass().getSimpleName()).build());
}
}
Finally, the App class. It publishes two messages, one of type Message and one of type Error, and annotates three methods with @StreamListener, each with a condition attribute specifying when it applies.
@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(App.class);
        // Send the messages
        Message message = new Message();
        message.setAll(200);
        message.setMessage("Doggy, are you there?");
        Error error = new Error();
        error.setError("Error call!");
        write(context, message);
        write(context, error);
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Message'")
    public void handler1(@Payload Message message, @Header("contentType") String header) {
        System.out.println("Doggy received message 1: " + message.getMessage());
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Error'")
    public void handler2(Error message) {
        System.out.print("Doggy received error 2: " + message.getError());
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Message'")
    public void handler3(@Payload Message message, @Header("contentType") String header) {
        System.out.println("Doggy received message 3: " + message.getMessage());
    }

    // Send a message
    public static boolean write(ConfigurableApplicationContext context, Object data) {
        Service service = context.getBean(Service.class);
        return service.write(data);
    }
}
Output:
Doggy received message 1: Doggy, are you there?
Doggy received message 3: Doggy, are you there?
Doggy received error 2: Error call!
Both methods whose condition matches contentType 'Message' ran, and the method whose condition matches contentType 'Error' ran as well.
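The dispatch behaviour can be mimicked without Spring at all. The following is a plain-Java sketch, assuming each handler is registered with a predicate over the message headers instead of a SpEL expression; DispatchSketch and its methods are hypothetical names. The sketch invokes matching handlers in registration order, whereas the framework makes no ordering promise.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java illustration of condition-based dispatch; all names are hypothetical.
class DispatchSketch {
    private final Map<String, Predicate<Map<String, Object>>> handlers = new LinkedHashMap<>();

    void register(String handlerName, Predicate<Map<String, Object>> condition) {
        handlers.put(handlerName, condition);
    }

    // Returns the name of every handler whose condition matches the headers.
    List<String> dispatch(Map<String, Object> headers) {
        List<String> invoked = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, Object>>> entry : handlers.entrySet()) {
            if (entry.getValue().test(headers)) {
                invoked.add(entry.getKey());
            }
        }
        return invoked;
    }

    // Mirrors the three handlers of the example above.
    static DispatchSketch example() {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register("handler1", h -> "Message".equals(h.get("contentType")));
        dispatcher.register("handler2", h -> "Error".equals(h.get("contentType")));
        dispatcher.register("handler3", h -> "Message".equals(h.get("contentType")));
        return dispatcher;
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = example();
        System.out.println(dispatcher.dispatch(Map.of("contentType", "Message"))); // prints [handler1, handler3]
        System.out.println(dispatcher.dispatch(Map.of("contentType", "Error")));   // prints [handler2]
    }
}
```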
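One more note on a problem I ran into while using this:
If I set a custom header such as type=XXX on a message, the header is missing when the message is received. After a few quick experiments, it seemed that only existing headers (contentType, for example) could be modified and new ones could not be added. A likely cause is that the Kafka binder of this generation only transports custom headers that are listed in its spring.cloud.stream.kafka.binder.headers setting.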
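2.3 Aggregation
2.3.1 Restrictions
Spring Cloud Stream supports aggregating multiple applications: their input and output channels are connected directly, avoiding the extra cost of exchanging messages through a broker (middleware such as Kafka or RabbitMQ). Starting with version 1.0 of Spring Cloud Stream, aggregation is supported only for the following kinds of applications:
1. Applications with a single output channel named output (a Source)
2. Applications with a single input channel named input (a Sink)
3. Applications with one input channel and one output channel, named input and output (a Processor)
That is what the official documentation says. Personally I find the feature of limited use, though perhaps I just have not used it much.
Applications with these characteristics can be joined by the aggregation feature into a chain of connected applications.
There are a few more restrictions: the first application must be a Source or a Processor, the last must be a Sink or a Processor, and every application in between must be a Processor, though there can be any number of them. (Source, Sink, and Processor here mean applications with the characteristics listed above.)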
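These ordering rules are easy to express as a check. Below is a plain-Java sketch; AggregateChainSketch, the Kind enum, and isValidChain are hypothetical names, and the requirement of at least two applications is an assumption of the sketch rather than something the documentation states.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the chain rules; not Spring Cloud Stream API.
class AggregateChainSketch {
    enum Kind { SOURCE, PROCESSOR, SINK }

    static boolean isValidChain(List<Kind> chain) {
        if (chain.size() < 2) {
            return false; // assumption of this sketch: a chain joins at least two applications
        }
        Kind first = chain.get(0);
        Kind last = chain.get(chain.size() - 1);
        // The chain must start with a Source or Processor and end with a Sink or Processor.
        if (first == Kind.SINK || last == Kind.SOURCE) {
            return false;
        }
        // Everything in between must be a Processor.
        for (Kind middle : chain.subList(1, chain.size() - 1)) {
            if (middle != Kind.PROCESSOR) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidChain(Arrays.asList(Kind.SOURCE, Kind.PROCESSOR, Kind.SINK))); // prints true
        System.out.println(isValidChain(Arrays.asList(Kind.SINK, Kind.SOURCE)));                 // prints false
    }
}
```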
2.3.2 Example
Here is the official example. A few points to note first:
1. The example has three applications, a Source, a Sink, and a Processor. They can live in different projects or in the same one. If they are in the same project they should be in different packages; with several @SpringBootApplication classes in one package, startup fails with an error.
2. The @Transformer annotation needs its inputChannel and outputChannel attributes set. The example in the official documentation omits them, but when I ran it the applications could not be connected without them.
3. Do not run it with the Spring Boot plugin in Eclipse. Doing so produces the error from point 1; I do not know why.
//Source
@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {
@InboundChannelAdapter(value = Source.OUTPUT)
public String timerMessageSource() {
return new SimpleDateFormat().format(new Date());
}
}
//Processor
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {
@Transformer(inputChannel=Sink.INPUT
,outputChannel=Source.OUTPUT)
public String loggerSink(String payload) {
return payload.toUpperCase();
}
}
//Sink
@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {
@StreamListener(Sink.INPUT)
public void loggerSink(Object payload) {
System.out.println("Received: " +
payload);
}
}
Those are the three applications. The code below connects them:
@SpringBootApplication
public class App {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
                .from(SourceApplication.class).args("--fixedDelay=5000")
                .via(ProcessorApplication.class)
                .to(SinkApplication.class).args("--debug=true")
                .run(args);
    }
}
The code is simple: AggregateApplicationBuilder connects the three applications, and .args("XXX") passes runtime arguments to the corresponding application.
2.3.3 Binder binding for the different chain shapes
Because of the restrictions, the possible chains can be enumerated. The Binder binding in each case is as follows:
1. If the chain starts with a Source and ends with a Sink, the applications communicate directly without going through a broker (middleware such as Kafka or RabbitMQ), so nothing is bound to a Binder. In the example above, for instance, I use Kafka, and the application still runs after Kafka is shut down.
2. If the chain starts with a Processor, that application's input channel becomes the input channel of the whole chain, and it is bound to the Binder.
3. If the chain ends with a Processor, that application's output channel becomes the output channel of the whole chain, and it is bound to the Binder.
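The three cases reduce to a small decision based on the first and last application of the chain. Here is a plain-Java sketch of that decision; AggregateBindingSketch and boundChannels are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the three cases above; not Spring Cloud Stream API.
class AggregateBindingSketch {
    enum Kind { SOURCE, PROCESSOR, SINK }

    // Returns which channels of the aggregate end up bound to a Binder.
    static List<String> boundChannels(Kind first, Kind last) {
        List<String> bound = new ArrayList<>();
        if (first == Kind.PROCESSOR) {
            bound.add("input");   // case 2: the chain's input goes through the broker
        }
        if (last == Kind.PROCESSOR) {
            bound.add("output");  // case 3: the chain's output goes through the broker
        }
        return bound;             // case 1 (Source ... Sink): nothing is bound
    }

    public static void main(String[] args) {
        System.out.println(boundChannels(Kind.SOURCE, Kind.SINK));         // prints []
        System.out.println(boundChannels(Kind.PROCESSOR, Kind.SINK));      // prints [input]
        System.out.println(boundChannels(Kind.PROCESSOR, Kind.PROCESSOR)); // prints [input, output]
    }
}
```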
2.3.4 Configuring aggregated applications
Spring Cloud Stream supports passing arguments to an individual application within an aggregate.
Once an application has been given a namespace, arguments can be passed to it through the command line, environment variables, and so on.
public static void main(String[] args) {
    new AggregateApplicationBuilder()
            .from(SourceApplication.class).namespace("from").args("--fixedDelay=20000")
            .via(ProcessorApplication.class).namespace("via")
            .to(SinkApplication.class).namespace("to").args("--debug=true")
            .run(args);
}
This code differs little from the previous example. The only addition is .namespace(), which sets the namespace of each application.
Next is the code of one of the aggregated applications:
// Read the argument that was passed in
@Value("${fixedDelay:null}")
private String args;

@InboundChannelAdapter(value = Source.OUTPUT)
public String timerMessageSource() {
    // Print the argument
    System.out.println("Source get args:" + args);
    return new SimpleDateFormat().format(new Date());
}
Only one application is shown; the others are similar, with the same code added to read and print the argument.
After packaging, run it with:
java -jar stream-aggregation.jar
Output:
Source get args:20000
Processor get args:null
Sink get args:null
Received: 17-12-14 下午5:43
The Processor and the Sink have no fixedDelay argument, so they print null.
Now run it with:
java -jar stream.jar --via.fixedDelay=200
Output:
Source get args:20000
Processor get args:200
Sink get args:null
Received: 17-12-14 下午5:46
The Processor now prints 200, the argument we passed in, while the output of the Sink and the Source is unchanged, so their arguments were not affected.
To summarize:
1. Set a namespace for each application when aggregating
2. Use the namespace as a prefix on the command line, in environment variables, and so on to pass arguments to a specific application
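To illustrate how a namespace prefix can route an argument to one application, here is a plain-Java sketch. It assumes command-line arguments of the form --<namespace>.<key>=<value>; NamespaceArgsSketch and argsFor are hypothetical names, and the real resolution is done by Spring Boot's property handling rather than by code like this.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of namespace-prefixed arguments; all names are hypothetical.
class NamespaceArgsSketch {
    // Collects "--<namespace>.<key>=<value>" arguments for one namespace,
    // returning them as key -> value with the prefix stripped.
    static Map<String, String> argsFor(String namespace, String[] args) {
        String prefix = "--" + namespace + ".";
        Map<String, String> result = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (arg.startsWith(prefix) && eq > prefix.length()) {
                result.put(arg.substring(prefix.length(), eq), arg.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] commandLine = { "--via.fixedDelay=200" };
        System.out.println(argsFor("via", commandLine));  // prints {fixedDelay=200}
        System.out.println(argsFor("from", commandLine)); // prints {}
    }
}
```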
Binder and configuration

Keep the application model figure in mind again here. Put simply, a Binder wraps the operations on a messaging system (Kafka, RabbitMQ). With a little configuration, developers can use the messaging system's publish/subscribe, point-to-point delivery, grouping, partitioning, and other features, and can ignore the details of operating the messaging system during development. Such components are generally designed as an abstract interface with a separate implementation for each messaging system. That design is not covered here; only usage is.
1 Binder detection
1.1 A single Binder implementation
If there is only one Binder implementation on the classpath (for example, a Maven project that depends only on the Kafka Binder implementation), Spring Cloud Stream uses it by default and binds every channel to it. As in the earlier examples, you hardly notice the Binder at all: you only configure the channel's destination, group, partitioning, and so on. The quick start example, for instance, configured nothing but the destinations of the input and output channels:
spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.output.destination=test
1.2 Multiple Binder implementations
If there are several Binder implementations, you must state which channel is bound to which Binder. To do so, add the following to application.properties or application.yaml:
spring.cloud.stream.bindings.<channelName>.binder=<binderName>
This specifies which channel is bound to which Binder.
You can also configure a default Binder:
spring.cloud.stream.defaultBinder=<binderName>
About Binder names
The jar of every Binder implementation contains a spring.binders file in its META-INF directory. It is a simple properties file. For example, the spring.binders file of the RabbitMQ Binder implementation contains:
rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration
The key (rabbit here) is the name of the Binder, so the RabbitMQ Binder is named rabbit. At the time of writing there are only two Binder implementations, one for RabbitMQ and one for Kafka; the Kafka Binder is named kafka.
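Since spring.binders uses the properties file format, its content can be read with java.util.Properties to see where the Binder name comes from. This is only a sketch: BinderNameSketch is a hypothetical class, and how Spring Cloud Stream itself loads the file is not shown here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustration only: reads spring.binders-style content with the JDK's Properties parser.
class BinderNameSketch {
    static Properties parse(String content) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        // Same content as the RabbitMQ example: key, colon, backslash line continuation, value.
        String content = "rabbit:\\\n"
                + "org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration\n";
        Properties binders = parse(content);
        System.out.println(binders.stringPropertyNames()); // prints [rabbit]
        System.out.println(binders.getProperty("rabbit"));
    }
}
```

The key before the colon is the Binder name, and the value is the configuration class that the Binder contributes.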
2 Configuration options
A Spring Cloud Stream application can be configured through any of Spring Boot's configuration mechanisms, such as application arguments, environment variables, and YAML or properties files.
2.1 Application-level settings
spring.cloud.stream.instanceCount
The number of deployed instances of the application. It must be set for partitioning and when using Kafka. Default: 1.
spring.cloud.stream.instanceIndex
The index of this instance, starting from 0.
spring.cloud.stream.dynamicDestinations
A list of destinations that can be bound dynamically. If set, only the listed destinations can be bound. Default: empty.
spring.cloud.stream.defaultBinder
The default Binder, described earlier. Default: empty.
spring.cloud.stream.overrideCloudConnectors
Default: false. When false, the Binder detects a suitable bound service and uses it to create connections. When true, the Binder ignores bound services and relies on the Spring Boot properties in the configuration instead. This setting is typically used when connecting to multiple messaging systems.
2.2 Binding settings
These settings have the form:
spring.cloud.stream.bindings.<channelName>.<property>=<value>
This sets property to value for the channel named channelName.
To avoid repetition, Spring Cloud Stream also supports settings that apply to all channels. The format for default properties is:
spring.cloud.stream.default.<property>=<value>
2.2.1 Common settings
The properties below all carry the prefix spring.cloud.stream.bindings.<channelName>., which is omitted for readability.
destination
The destination the channel connects to on the messaging system (an exchange if the messaging system is RabbitMQ, a topic if it is Kafka).
A channel bound as a consumer can connect to several destinations, separated by commas. For example:
spring.cloud.stream.bindings.<channelName>.destination=destination1,destination2
group
The consumer group of the channel. Applies only to input channels.
Default: null
Supplement: a channel can connect to several destinations, and channels in the same group can connect to different destinations.
If the channels of a group are connected to three destinations A, B, and C, then a copy of every message from A, B, and C is delivered to that group, and one channel in the group is chosen to consume it. For example, if channels a and b in the group are connected only to destination A and channel c is connected only to destination B, then a message from A is handled by either a or b; c is not a candidate.
If two groups are both connected to destination A, each message from A is copied twice, once for each group.
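The group semantics described here can be simulated in a few lines of plain Java. This is a sketch under stated assumptions: ConsumerGroupSketch and its methods are hypothetical, and it always picks the first eligible channel in a group, whereas a real broker chooses the consumer by its own strategy.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of consumer groups; the class and method names are hypothetical.
class ConsumerGroupSketch {
    // group name -> (channel name -> destinations that channel is connected to)
    private final Map<String, Map<String, List<String>>> groups = new LinkedHashMap<>();

    void connect(String group, String channel, String... destinations) {
        groups.computeIfAbsent(group, g -> new LinkedHashMap<>())
              .put(channel, Arrays.asList(destinations));
    }

    // For one message published to the destination, returns group -> channel that handles it.
    // Every connected group receives a copy; within a group only one channel consumes it.
    Map<String, String> deliver(String destination) {
        Map<String, String> handledBy = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, List<String>>> group : groups.entrySet()) {
            for (Map.Entry<String, List<String>> channel : group.getValue().entrySet()) {
                if (channel.getValue().contains(destination)) {
                    handledBy.put(group.getKey(), channel.getKey());
                    break; // assumption: simply take the first eligible channel
                }
            }
        }
        return handledBy;
    }

    // Channels a and b (group g1) listen to A, c (group g1) to B, d (group g2) to A.
    static ConsumerGroupSketch example() {
        ConsumerGroupSketch hub = new ConsumerGroupSketch();
        hub.connect("g1", "a", "A");
        hub.connect("g1", "b", "A");
        hub.connect("g1", "c", "B");
        hub.connect("g2", "d", "A");
        return hub;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch hub = example();
        System.out.println(hub.deliver("A")); // prints {g1=a, g2=d}
        System.out.println(hub.deliver("B")); // prints {g1=c}
    }
}
```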
contentType
The content type carried by the channel.
Default: null.
binder
Covered earlier in the section "Multiple Binder implementations".