Editor's note:
This article comes from Taobao. It introduces the two main messaging frameworks in Spring and the design and implementation of rocketmq-spring-boot.
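This article gives a brief introduction to the design and implementation of rocketmq-spring-boot. Readers can learn the development details of integrating the RocketMQ client into a spring-boot-starter, and then follow a simple example that explains step by step how to use this spring-boot-starter package to configure, send, and consume RocketMQ messages.
In this article you will learn about:
An introduction to Spring's messaging frameworks
The implementation of rocketmq-spring-boot
A usage example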
Preface
In the late 1990s Java EE (Enterprise Edition) appeared. Enterprise Java Beans in particular required complex descriptor configuration and rigid, complicated code, which steepened the learning curve and raised development costs for developers. Spring emerged in response, built on simple XML configuration and Plain Old Java Objects (POJOs). Its techniques of dependency injection (DI), inversion of control (IoC), and aspect-oriented programming (AOP) addressed the shortcomings of the traditional Java enterprise edition in a more agile way.
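As Spring evolved, annotation-based configuration gradually replaced XML configuration files. On April 1, 2014, Spring Boot 1.0.0 was officially released. Based on the idea of "convention over configuration", it lets developers quickly develop, test, run, and deploy Spring applications. Combined with the various starters (such as spring-boot-starter-web), it lets an application run directly from the command line without being deployed to a standalone container. This simple, direct, and fast way of building applications, with conventional configuration and simplified deployment, has been welcomed by more and more developers.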
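Apache RocketMQ is a well-known distributed messaging and stream-processing middleware. Put simply, it consists of two parts, the Broker server and the clients:
One client is the message producer (Producer), which sends messages to the Broker server;
The other is the message consumer (Consumer). Multiple consumers can form a consumer group to subscribe to, pull, and consume the messages stored on the Broker server.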
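The roles described above can be pictured with a small in-memory model. This is only a sketch: ToyBroker and its methods are hypothetical names invented for illustration, not the RocketMQ client API, and the round-robin hand-off inside a group is a simplification of how a real broker balances load across consumers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory model, NOT the RocketMQ client API: all names are hypothetical.
class ToyBroker {
    // consumer group name -> consumers registered in that group
    private final Map<String, List<Consumer<String>>> groups = new HashMap<>();
    // consumer group name -> number of messages handed to that group so far
    private final Map<String, Integer> counters = new HashMap<>();

    void subscribe(String group, Consumer<String> consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
    }

    // Called by a producer: every group sees the message once,
    // and the consumers inside a group take turns (round-robin).
    void send(String message) {
        for (Map.Entry<String, List<Consumer<String>>> entry : groups.entrySet()) {
            List<Consumer<String>> members = entry.getValue();
            int index = counters.merge(entry.getKey(), 1, Integer::sum) - 1;
            members.get(index % members.size()).accept(message);
        }
    }
}

class ProducerConsumerSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyBroker broker = new ToyBroker();
        // Two consumers share one consumer group.
        broker.subscribe("group-a", m -> log.add("a1:" + m));
        broker.subscribe("group-a", m -> log.add("a2:" + m));
        // The producer side: two messages are sent to the broker.
        broker.send("m1");
        broker.send("m2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a1:m1, a2:m2]
    }
}
```

Within the group, each message is handled by exactly one member, which is the point of grouping consumers.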
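To take advantage of Spring Boot's rapid development and let users work with the RocketMQ messaging client more flexibly, the Apache RocketMQ community released a spring-boot-starter implementation. With the release of distributed transactional messages in RocketMQ 4.3.0, the related spring-boot code was recently upgraded to support, through annotations, both the check-back of distributed transactions and the sending of transactional messages.
The rest of this article introduces the current design and implementation, covering the development details of integrating the RocketMQ client into a spring-boot-starter, and then walks step by step through a simple example of configuring, sending, and consuming RocketMQ messages with it.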
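Messaging Frameworks in Spring
Before going further, this section discusses the two main messaging frameworks in Spring: Spring Messaging and Spring Cloud Stream. Both integrate with Spring Boot and come with some reference implementations. Like any framework, a messaging framework aims to enable lightweight, message-driven microservices. It reduces the complexity developers face when using messaging middleware, so that they can devote more of their effort to core business logic.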
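2.1 Spring Messaging
Spring Messaging is a module added in Spring Framework 4 that provides extensible support for integrating Spring with messaging systems. Spring offers a complete infrastructure ranging from simplified use of the JMS API based on JmsTemplate to asynchronous message reception, and Spring AMQP provides a similar feature set for the AMQP protocol. Once integrated with Spring Boot, it gains auto-configuration and can be integrated with the corresponding messaging system both in tests and at runtime.
On the client side alone, Spring Messaging provides a set of abstract APIs, or agreed conventions, that define the patterns for the message-sending side and the message-receiving side. Different messaging middleware vendors can provide their own Spring implementations within this pattern. On the sending side, the implementation is a Java Bean of the form XXXTemplate, which offers several different message-sending methods combined with Spring Boot's auto-configuration options. On the consuming side there is an XXXMessageListener interface (implementations usually use an annotation to declare a message-driven POJO) that provides callback methods for listening to and consuming messages. This interface can likewise use Spring Boot's auto-configuration options and some custom properties.
If you want to dig deeper into Spring Messaging and its use with different messaging products, we recommend reading this document. Following the existing Spring Messaging implementations, the RocketMQ spring-boot-starter adopts the same design patterns and, drawing on RocketMQ's own features, provides corresponding APIs (for example, ordered, asynchronous, and transactional half messages).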
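2.2 Spring Cloud Stream
Spring Cloud Stream combines the annotations and features of Spring Integration. Its application model is shown in the following figure:
[Figure: Spring Cloud Stream application model; image from Spring Cloud Stream]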
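Spring Cloud Stream provides a standalone application core that communicates with the outside world through input (@Input) and output (@Output) channels. A message source (Source) sends messages through an output channel, and a message sink (Sink) obtains the messages it consumes by listening on an input channel. These channels connect to external brokers through dedicated Binder implementations. Developer code only needs to program against the fixed interfaces and annotations provided by the application core, without caring which messaging middleware the Binder is bound to at runtime. At runtime, Spring Cloud Stream automatically detects and uses a Binder found on the classpath.
Developers can therefore easily use different types of middleware with the same code, simply by including a different Binder at build time. In more complex scenarios, an application can package several Binders and choose among them, or even use different Binders for different channels at runtime.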
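The Binder abstraction lets Spring Cloud Stream applications connect to middleware flexibly. In addition, Spring Cloud Stream takes advantage of Spring Boot's flexible configuration. Configuration can be supplied through externalized properties in any form Spring Boot supports (including application startup arguments, environment variables, and application.yml or application.properties files), so deployers can choose at runtime the destination a channel connects to (for example, a Kafka topic or a RabbitMQ exchange).
The Binder SPI lets messaging middleware products write their own Binder with an extensible API and plug it into the Spring Cloud Stream environment. RocketMQ does not yet provide such a Binder. We plan to add this feature as a next step, and we hope community members with relevant experience will try it out and contribute PRs or suggestions.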
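Implementing the spring-boot-starter
As noted at the beginning, a starter built for Spring Boot is very convenient for users: by adding the starter's dependency definition to pom.xml, everything needed to compile, run, and deploy is pulled in automatically. For this reason, commonly used open source components provide a spring-boot-starter so that Spring users can integrate and use them easily. This section describes in detail how the RocketMQ (client) starter is implemented.
3.1. Steps for implementing a spring-boot-starter
A spring-boot-starter implementation needs to include the following parts:
1. Definitions in pom.xml
Define the information for the starter artifact to be produced: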
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
Define the dependencies, which fall into two parts: A. Spring's own dependencies; B. the RocketMQ dependencies.
<dependencies>
    <!-- spring-boot-starter internal dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- rocketmq dependencies -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq-version}</version>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <!-- spring-boot-starter parent dependency definition -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
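2. Configuration properties class
Define the application properties class RocketMQProperties. This Bean defines a set of default property values. When using the final starter, users can change the values of the properties defined by this class. They do so not by modifying the class itself, but through the corresponding configuration file of the Spring Boot application: src/main/resources/application.properties.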
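The idea of defaults in a class that are overridden from a properties file can be sketched with the JDK alone. DemoRocketMQProperties below is a hypothetical stand-in, and the default values are assumptions chosen for illustration; in the real starter the binding is done by Spring Boot, not by hand as here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for a properties class; the real RocketMQProperties
// is bound by Spring Boot rather than by hand as done here.
class DemoRocketMQProperties {
    String nameServer = "localhost:9876";    // default value, assumed for illustration
    String producerGroup = "default-group";  // default value, assumed for illustration

    // Starts from the defaults and overrides them with any values
    // present in the given application.properties-style text.
    static DemoRocketMQProperties bind(String propertiesText) {
        Properties source = new Properties();
        try {
            source.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        DemoRocketMQProperties result = new DemoRocketMQProperties();
        result.nameServer =
                source.getProperty("spring.rocketmq.name-server", result.nameServer);
        result.producerGroup =
                source.getProperty("spring.rocketmq.producer.group", result.producerGroup);
        return result;
    }
}

class PropertiesBindingSketch {
    public static void main(String[] args) {
        // Only the producer group is configured; the name server keeps its default.
        DemoRocketMQProperties props =
                DemoRocketMQProperties.bind("spring.rocketmq.producer.group=my-group1\n");
        System.out.println(props.nameServer + " " + props.producerGroup);
    }
}
```

The user never edits the class: values come from the external file, and anything left unset falls back to the default.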
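3. Define the auto-configuration class
Define the auto-configuration class in the src/main/resources/META-INF/spring.factories file, so that Spring Boot initializes the related Beans, Components, or Services automatically according to the auto-configuration class named in that file. Its content is as follows: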
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration
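The RocketMQAutoConfiguration class defines the Bean objects that are exposed for users to use directly.
They include:
RocketMQProperties: the class that loads the application properties file;
RocketMQTemplate: the template class with which users send messages on the sending side;
ListenerContainerConfiguration: a container Bean responsible for discovering and registering the consumer-side listener implementation classes. Such a class must be annotated with @RocketMQMessageListener and implement the generic RocketMQListener interface.
Finally, the RocketMQ-specific wrappers
The sending side (producer) and the consuming side (consumer) clients are wrapped separately. The current implementation is compatible with the Spring Messaging interfaces.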
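3.2. Implementation of the message-sending side
1. Ordinary sending side
The sending-side code is encapsulated in the RocketMQTemplate POJO. The following figure shows the call relationships of the sending-side code:
[Figure: call relationships of the sending-side code]
For compatibility with the Spring Messaging sending templates, RocketMQTemplate extends the abstract class AbstractMessageSendingTemplate to support the related message conversion and sending methods, which ultimately delegate to doSend(). doSend() and the methods specific to RocketMQ, such as asynchronous, one-way, and ordered sending, are added directly to RocketMQTemplate. These methods delegate directly to RocketMQ's Producer API to send messages.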
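The layering described above, where generic sending methods funnel into doSend() and middleware-specific methods sit on the concrete template, can be illustrated with a miniature. All class names here are hypothetical; this is not Spring's AbstractMessageSendingTemplate or the real RocketMQTemplate, and a list stands in for the Producer API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the template layering; class names are illustrative
// and do not reproduce Spring's AbstractMessageSendingTemplate.
abstract class DemoSendingTemplate {
    // Generic entry point: convert the payload, then hand off to doSend().
    void convertAndSend(String destination, Object payload) {
        doSend(destination, String.valueOf(payload));
    }

    // Subclasses decide how a converted message actually leaves the process.
    protected abstract void doSend(String destination, String body);
}

class DemoRocketTemplate extends DemoSendingTemplate {
    // Stands in for calls to the underlying producer API.
    final List<String> wire = new ArrayList<>();

    @Override
    protected void doSend(String destination, String body) {
        wire.add("sync " + destination + " " + body);
    }

    // Middleware-specific extras live directly on the concrete template.
    void sendOneWay(String destination, String body) {
        wire.add("oneway " + destination + " " + body);
    }
}

class TemplateSketch {
    public static void main(String[] args) {
        DemoRocketTemplate template = new DemoRocketTemplate();
        template.convertAndSend("string-topic", 42);
        template.sendOneWay("string-topic", "ping");
        System.out.println(template.wire);
    }
}
```

Code written against the abstract template keeps working unchanged, while callers that need a middleware-specific feature use the concrete class.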
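2. Transactional message sending side
To handle transactional messages, the sending side has been partially extended. Refer to the call-relationship class diagram:
[Figure: call-relationship class diagram for transactional message sending]
RocketMQTemplate gains a method for sending transactional messages, sendMessageInTransaction(). This method ultimately delegates to RocketMQ's TransactionProducer. The associated TransactionListener implementation is registered on this producer, so that the methods implemented in the TransactionListener can be invoked after the message is sent.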
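A toy model may help show why a listener is registered on the producer. The names below (DemoTxState, DemoTransactionListener, DemoTransactionProducer) are hypothetical, and the flow is heavily simplified: in particular, the check-back is performed immediately here, whereas a real broker issues it later and asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the transactional (half) message flow; names are hypothetical.
enum DemoTxState { COMMIT, ROLLBACK, UNKNOWN }

interface DemoTransactionListener {
    // Runs the local transaction right after the half message has been sent.
    DemoTxState executeLocalTransaction(String message);

    // Check-back: asked again when the outcome is still unknown.
    DemoTxState checkLocalTransaction(String message);
}

class DemoTransactionProducer {
    final List<String> delivered = new ArrayList<>(); // messages visible to consumers
    private final DemoTransactionListener listener;

    DemoTransactionProducer(DemoTransactionListener listener) {
        this.listener = listener;
    }

    void sendMessageInTransaction(String message) {
        // 1. The half message is stored but not yet consumable (not modeled here).
        // 2. The registered listener runs the local transaction.
        DemoTxState state = listener.executeLocalTransaction(message);
        // 3. If the outcome is unknown, check back (immediately, for brevity).
        if (state == DemoTxState.UNKNOWN) {
            state = listener.checkLocalTransaction(message);
        }
        // 4. Only a committed message becomes visible to consumers.
        if (state == DemoTxState.COMMIT) {
            delivered.add(message);
        }
    }
}

class TransactionSketch {
    static List<String> run() {
        DemoTransactionProducer producer =
                new DemoTransactionProducer(new DemoTransactionListener() {
                    @Override
                    public DemoTxState executeLocalTransaction(String message) {
                        return message.startsWith("bad")
                                ? DemoTxState.ROLLBACK : DemoTxState.UNKNOWN;
                    }

                    @Override
                    public DemoTxState checkLocalTransaction(String message) {
                        return DemoTxState.COMMIT;
                    }
                });
        producer.sendMessageInTransaction("order-1");   // unknown, then committed on check-back
        producer.sendMessageInTransaction("bad-order"); // rolled back, never delivered
        return producer.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [order-1]
    }
}
```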
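3.3. Implementation of the message-consuming side
After the consumer-side Spring Boot application starts, it scans for all classes annotated with @RocketMQMessageListener (these classes must implement the RocketMQListener interface and its onMessage() method). Each such listener is placed one-to-one into a DefaultRocketMQListenerContainer object. Depending on the consumption mode (concurrent or ordered), the container wraps the RocketMQListener in RocketMQ's internal concurrent or ordered listener implementation. The container creates a RocketMQ Consumer object, starts it, and listens for messages on the configured topic. When a message arrives, it calls back to the listener's onMessage() method.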
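The one-container-per-listener arrangement can be sketched with plain reflection. The annotation, interface, and container below are hypothetical stand-ins for @RocketMQMessageListener, RocketMQListener, and DefaultRocketMQListenerContainer; classpath scanning and the real consumer object are left out, and dispatch() merely simulates a message arriving.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for @RocketMQMessageListener and RocketMQListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoMessageListener {
    String topic();
    String consumerGroup();
}

interface DemoListener<T> {
    void onMessage(T message);
}

// One container per listener: it reads the annotation and dispatches messages.
class DemoListenerContainer {
    final String topic;
    final String consumerGroup;
    private final DemoListener<String> listener;

    DemoListenerContainer(DemoListener<String> listener) {
        DemoMessageListener meta =
                listener.getClass().getAnnotation(DemoMessageListener.class);
        if (meta == null) {
            throw new IllegalArgumentException("listener is not annotated");
        }
        this.topic = meta.topic();
        this.consumerGroup = meta.consumerGroup();
        this.listener = listener;
    }

    // Simulates the consumer callback that fires when a message arrives.
    void dispatch(String incomingTopic, String body) {
        if (topic.equals(incomingTopic)) {
            listener.onMessage(body);
        }
    }
}

@DemoMessageListener(topic = "string-topic", consumerGroup = "demo-group")
class DemoStringConsumer implements DemoListener<String> {
    final List<String> received = new ArrayList<>();

    @Override
    public void onMessage(String message) {
        received.add(message);
    }
}

class ListenerContainerSketch {
    public static void main(String[] args) {
        DemoStringConsumer consumer = new DemoStringConsumer();
        DemoListenerContainer container = new DemoListenerContainer(consumer);
        container.dispatch("string-topic", "Hello, World!");
        container.dispatch("other-topic", "ignored");
        System.out.println(container.consumerGroup + " " + consumer.received);
    }
}
```

The listener class itself stays a plain object with one callback; everything about subscription lives in the annotation and the container.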
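Usage Example
The previous chapter described how the RocketMQ spring-boot-starter is implemented. This chapter uses the simplest possible send-and-consume example to show how to use rocketmq-spring-boot-starter.
4.1 Preparing the RocketMQ server
1. Start the NameServer and Broker
To verify the RocketMQ Spring Boot client, first make sure the RocketMQ service has been downloaded and started correctly. You can follow the Quick Start on the RocketMQ website. Make sure both the NameServer and the Broker have started correctly.
2. Create the topics needed by the example
Run the following command in the directory from which the startup commands were run: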
bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic
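4.2. Building rocketmq-spring-boot-starter
The spring-boot-starter dependency has not yet been published to the Maven Central repository. Before using it, users need to download the source code with git and then run mvn clean install to install it into the local repository.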
git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-spring-boot-starter
mvn clean install
4.3. Writing the client code
To use the starter, add the following dependency to the Maven configuration file pom.xml of both the message-producing and message-consuming clients:
<properties>
    <spring-boot-starter-rocketmq-version>1.0.0-SNAPSHOT</spring-boot-starter-rocketmq-version>
</properties>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>spring-boot-starter-rocketmq</artifactId>
    <version>${spring-boot-starter-rocketmq-version}</version>
</dependency>
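The value of the property spring-boot-starter-rocketmq-version is 1.0.0-SNAPSHOT, which matches the version installed into the local repository in the previous step.
Code for the message-sending side
Configuration file application.properties for the sending side: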
# Address of the name server
spring.rocketmq.name-server=localhost:9876
# Name of the producer group
spring.rocketmq.producer.group=my-group1
# Topic to send to
spring.rocketmq.topic=string-topic
·¢ËͶ˵ÄJava´úÂë
import org.apache.rocketmq.spring.starter.core. RocketMQTemplate;
...
@SpringBootApplication
public class ProducerApplication implements
CommandLineRunner {
// ÉùÃ÷²¢ÒýÓÃRocketMQTemplate
@Resource
private RocketMQTemplate rocketMQTemplate;
// ʹÓÃapplication.propertiesÀﶨÒåµÄtopicÊôÐÔ
@Value("${spring.rocketmq.springTopic}")
private String springTopic;
public static void main(String[] args){
SpringApplication.run(ProducerApplication.class,
args);
}
public void run(String... args) throws Exception
{
// ÒÔͬ²½µÄ·½Ê½·¢ËÍ×Ö·û´®ÏûÏ¢¸øÖ¸¶¨µÄtopic
SendResult sendResult = rocketMQTemplate.syncSend(springTopic,
"Hello, World!");
// ´òÓ¡·¢Ëͽá¹ûÐÅÏ¢
System.out.printf("string-topic syncSend1
sendResult=%s %n", sendResult);
}
} |
Code for the message-consuming side
Configuration file application.properties for the consuming side:
# Address of the name server
spring.rocketmq.name-server=localhost:9876
# Name of the consumer group
spring.rocketmq.consumer.group=my-customer-group1
# Topic to subscribe to
spring.rocketmq.topic=string-topic
Java code for the consuming side:
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

// Declare the class that consumes messages; the annotation specifies
// the topic and consumer group to use
@Service
@RocketMQMessageListener(topic = "${spring.rocketmq.topic}",
        consumerGroup = "${spring.rocketmq.consumer.group}")
class StringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumer received: %s %n", message);
    }
}
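This article has only briefly shown how to write the most basic message sending and receiving code with Spring Boot. For more ways of calling the API, such as asynchronous sending, object message bodies, specifying tags, and transactional messages, see the README and the detailed code on GitHub. We will cover these advanced features in follow-up articles.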