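Editor's recommendation:
Continuing from the previous article, this article explains what MQ is, describes its application scenarios, demonstrates sending and receiving messages, and walks through a related case.
This article comes from csdn and was edited and recommended by Anna of 火龙果软件.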
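Chapter 1: Introduction to and evolution of microservice architecture
Chapter 2: Setting up the microservice environment
Chapter 3: Nacos Discovery -- service governance
Chapter 4: Sentinel -- service fault tolerance
Chapter 5: Gateway -- service gateway
Chapter 6: Sleuth -- link tracing
Chapter 7: Rocketmq -- message driven
Chapter 8: SMS -- short message service
Chapter 9: Nacos Config -- service configuration
Chapter 10: Seata -- distributed transactions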
7.1 Introduction to MQ
7.1.1 What is MQ
MQ (Message Queue) is a cross-process communication mechanism used to pass messages. Put simply, it is a first-in, first-out data structure.

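7.1.2 Application scenarios of MQ
7.1.2.1 Asynchronous decoupling
The most common scenario is user registration: after a user registers, a registration email and an SMS notification must be sent to tell the user that registration succeeded. The traditional approach is as follows:

With this architecture, the registration result is returned to the client only after all three tasks (registration, email, SMS) have finished, and only then can the user log in with the account. From the user's point of view, however, registration only requires the registration system to store the account information; after that the user can log in, and the follow-up SMS and email are not steps that need immediate attention.
So once the data has been written to the registration system, the registration system can put the other operations into the corresponding message queue (MQ) and return the result to the user right away, leaving the MQ to carry out those operations asynchronously. The architecture diagram is as follows:

Asynchronous decoupling is the main feature of a message queue; its main purpose is to reduce request response time and to decouple systems. The main use case is to put operations that are time-consuming and do not need to return a result immediately (synchronously) into the message queue as messages. In addition, because a message queue is used, as long as the message format stays the same, the sender and the receiver do not need to contact each other or be affected by each other; that is, they are decoupled.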
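The registration scenario above can be sketched with nothing but the JDK. This is a toy model, not RocketMQ code: the class `AsyncRegistration`, its method names, and the 100 ms idle timeout are all assumptions made for illustration, and a `LinkedBlockingQueue` stands in for the message queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of asynchronous decoupling; all names here are hypothetical. */
final class AsyncRegistration {
    // Stands in for the message queue between the registration system and the notifiers.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Stores the account (omitted here), queues the follow-up work and returns at once. */
    String register(String username) {
        queue.add("email:" + username);
        queue.add("sms:" + username);
        return "registered:" + username;
    }

    /** Number of notifications that are still waiting in the queue. */
    int pending() {
        return queue.size();
    }

    /** Runs a notifier on its own thread until the queue stays empty, then returns what it handled. */
    List<String> runNotifier() {
        List<String> handled = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                String task;
                // Stop once no task arrives within 100 ms.
                while ((task = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
                    handled.add(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }
}
```

The point of the sketch is that `register` never waits for the email or SMS work: it only enqueues two messages, and the notifier picks them up later on a different thread.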
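7.1.2.2 Traffic peak shaving
Traffic peak shaving is another common use of a message queue, widely used in flash sales and group-buying (high-concurrency) events.
In a flash sale or group-buying event, the large number of user requests makes traffic surge. After the flash-sale application has handled that much traffic, the downstream notification system cannot bear the huge number of calls; it may even crash, so that notifications are missed. To solve these problems, a message queue can be placed between the application and the downstream notification system.

The flash-sale processing flow is as follows:
1. Users send a huge number of flash-sale requests to the flash-sale business processing system.
2. The flash-sale processing system applies its processing logic and sends the requests that meet the flash-sale conditions to the message queue.
3. The downstream notification system subscribes to the flash-sale messages in the message queue and sends the success message to the corresponding users.
4. Users receive the notification that the flash sale succeeded.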
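The effect of the queue in this flow can be illustrated with a small simulation. It is only a sketch under assumptions: the class `PeakShaving`, the notion of a "tick", and the per-tick capacity are invented for the example and do not come from RocketMQ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy simulation of peak shaving; names and the tick model are hypothetical. */
final class PeakShaving {
    /**
     * Enqueues a burst of requests all at once, then lets the downstream
     * system take at most perTick messages per tick. Returns how many
     * messages the downstream system handled in each tick.
     */
    static List<Integer> drain(int burst, int perTick) {
        if (perTick <= 0) {
            throw new IllegalArgumentException("perTick must be positive");
        }
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < burst; i++) {
            queue.add(i); // the whole burst lands in the queue immediately
        }
        List<Integer> handledPerTick = new ArrayList<>();
        while (!queue.isEmpty()) {
            int handled = 0;
            while (handled < perTick && !queue.isEmpty()) {
                queue.poll();
                handled++;
            }
            handledPerTick.add(handled);
        }
        return handledPerTick;
    }
}
```

With a burst of 10 requests and a downstream capacity of 4 per tick, `PeakShaving.drain(10, 4)` spreads the work over three ticks of 4, 4, and 2 messages, so the downstream load never exceeds its capacity.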
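7.1.3 Common MQ products
Many MQ products are available in the industry; the better-known ones are:
ZeroMQ
Billed as the fastest message queue system, especially for high-throughput scenarios. It scales well and is flexible to develop with. Its core library is implemented in C++ and exposes a C API. In practice it is a repackaged socket library, so using it as a message queue requires writing a lot of code. ZeroMQ provides only non-persistent queues, which means that if the machine goes down, the data is lost.
RabbitMQ
Developed in Erlang. It performs well and suits enterprise development, but it is hard to customize and maintain.
ActiveMQ
A long-established Apache open source project. It is already used in many products, implements the JMS 1.1 specification, integrates easily with spring-jms, implements several protocols, and supports persistence to a database, but it does not cope well with a large number of queues.
RocketMQ
Alibaba's MQ middleware, developed in Java. Its performance is very good, it withstands the heavy traffic of Double 11, and it is simple to use.
Kafka
Kafka is an Apache project: a high-performance, cross-language, distributed publish/subscribe message queue system. Compared with ActiveMQ it is a very lightweight messaging system; besides very good performance, it also works well as a distributed system.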
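7.2 Getting started with RocketMQ
RocketMQ is a distributed messaging middleware open-sourced by Alibaba and is now a top-level Apache project. It is used very widely inside Alibaba and has handled message flows on the scale of trillions during events such as "Double 11".
7.2.1 Setting up the RocketMQ environment
Next, we install a RocketMQ service on a Linux platform.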
7.2.1.1 Preparing the environment
Download RocketMQ
Environment requirements
Linux 64-bit operating system
64bit JDK 1.8+
7.2.1.2 Installing RocketMQ
1 Upload the file to the Linux system
[root@spiritmark rocketmq]# ls /usr/local/src/
rocketmq-all-4.4.0-bin-release.zip
2 Unzip into the installation directory
[root@spiritmark src]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@spiritmark src]# mv rocketmq-all-4.4.0-bin-release ../rocketmq
7.2.1.3 Starting RocketMQ
1 Switch to the installation directory
[root@spiritmark rocketmq]# ls
benchmark bin conf lib LICENSE NOTICE README.md
2 Start the NameServer
[root@spiritmark rocketmq]# nohup ./bin/mqnamesrv &
[1] 1467
# As long as the process reports no error, it should have started successfully; check the log to confirm
[root@spiritmark rocketmq]# tail -f /root/logs/rocketmqlogs/namesrv.log
3 Start the Broker
# Edit bin/runbroker.sh and bin/runserver.sh and change
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# to JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
[root@spiritmark rocketmq]# nohup bin/mqbroker -n localhost:9876 &
[root@spiritmark rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log
7.2.1.4 Testing RocketMQ
1 Test sending messages
[root@spiritmark rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@spiritmark rocketmq]# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2 Test receiving messages
[root@spiritmark rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@spiritmark rocketmq]# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
7.2.1.5 Shutting down RocketMQ
[root@heima rocketmq]# bin/mqshutdown broker
[root@heima rocketmq]# bin/mqshutdown namesrv
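7.2.2 RocketMQ architecture and concepts

As the figure above shows, the system as a whole is made up of four roles: NameServer, Broker, Producer, and Consumer.
Broker (the mail carrier)
The Broker is the core of RocketMQ; it is responsible for receiving, storing, and delivering messages.
NameServer (the post office)
The coordinator of the message queue. Brokers register their routing information with it, and Producers and Consumers obtain routing information from it.
Producer (the sender)
The producer of messages. It obtains Broker information from the NameServer, then connects to the Broker and sends messages to it.
Consumer (the recipient)
The consumer of messages. It obtains Broker information from the NameServer, then connects to the Broker and fetches messages from it.
Topic (the region)
Used to distinguish different types of messages. A Topic must be created before messages are sent or received, and messages are sent and received per Topic.
Message Queue (the mail)
Message Queues were introduced to improve performance and throughput. A Topic can be configured with one or more Message Queues, so that producers can send messages to the Message Queues in parallel and consumers can read from multiple Message Queues in parallel.
Message
A Message is the carrier of the information being sent.
Producer Group
A producer group: put simply, multiple producers that send the same kind of message are called a producer group.
Consumer Group
A consumer group: multiple consumer instances that consume the same kind of message form a consumer group.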
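The relationship between the NameServer and the other roles can be pictured as a simple registry. The sketch below is a toy model, not RocketMQ's actual routing data structure: the class `ToyNameServer`, its methods, and the broker addresses used with it are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy registry that mimics the NameServer role; hypothetical, not the real routing model. */
final class ToyNameServer {
    // topic -> addresses of the brokers that serve it
    private final Map<String, List<String>> routes = new HashMap<>();

    /** Broker side: announce that the broker at brokerAddr serves the given topic. */
    void register(String topic, String brokerAddr) {
        routes.computeIfAbsent(topic, t -> new ArrayList<>()).add(brokerAddr);
    }

    /** Producer or consumer side: ask which brokers serve the given topic. */
    List<String> lookup(String topic) {
        List<String> brokers = routes.get(topic);
        if (brokers == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(new ArrayList<>(brokers));
    }
}
```

A producer or consumer would call `lookup("order-topic")` first and only then connect to one of the returned brokers, which matches the order described for the Producer and Consumer roles.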
7.2.3 Installing the RocketMQ console
1 Download

2 Modify the configuration file

3 Build the jar and start it
# Enter the console project and package it as a jar
mvn clean package -Dmaven.test.skip=true
# Start the console
java -jar target/rocketmq-console-ng-1.0.0.jar
4 Access the console

7.3 Demonstration of sending and receiving messages
Next, we use Java code to demonstrate sending and receiving messages.
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
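7.3.1 Sending messages
Steps for sending a message:
1. Create a message producer and specify the name of the group it belongs to
2. Specify the NameServer address
3. Start the producer
4. Create a message object, specifying the topic, tag, and message body
5. Send the message
6. Shut down the producer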

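7.3.2 Receiving messages
Steps for receiving messages:
1. Create a message consumer and specify the name of the group it belongs to
2. Specify the NameServer address
3. Specify the topic and tags the consumer subscribes to
4. Set a callback function and write the method that handles messages
5. Start the message consumer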
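To see how a topic, a tag, and a callback fit together when a message travels from producer to consumer, the following toy in-memory broker may help. It deliberately does not use the RocketMQ client classes: `ToyBroker`, `Msg`, and every method on them are hypothetical stand-ins. The tag expression handling (`*` for all tags, `||` to separate alternatives) follows the common RocketMQ subscription convention.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory broker; a stand-in for illustration, NOT the RocketMQ client API. */
final class ToyBroker {
    /** A message carries a topic, a tag and a body. */
    static final class Msg {
        final String topic;
        final String tag;
        final String body;

        Msg(String topic, String tag, String body) {
            this.topic = topic;
            this.tag = tag;
            this.body = body;
        }
    }

    private static final class Subscription {
        final String topic;
        final String tagExpr;
        final Consumer<Msg> callback;

        Subscription(String topic, String tagExpr, Consumer<Msg> callback) {
            this.topic = topic;
            this.tagExpr = tagExpr;
            this.callback = callback;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();

    /** Consumer side: subscribe to a topic and tag expression and register the callback. */
    void subscribe(String topic, String tagExpr, Consumer<Msg> callback) {
        subscriptions.add(new Subscription(topic, tagExpr, callback));
    }

    /** Producer side: hand the message to every matching subscription; returns the delivery count. */
    int send(Msg msg) {
        int delivered = 0;
        for (Subscription s : subscriptions) {
            if (s.topic.equals(msg.topic) && tagMatches(s.tagExpr, msg.tag)) {
                s.callback.accept(msg);
                delivered++;
            }
        }
        return delivered;
    }

    /** "*" matches every tag; otherwise the expression is a "||"-separated list of tags. */
    static boolean tagMatches(String tagExpr, String tag) {
        if ("*".equals(tagExpr)) {
            return true;
        }
        for (String candidate : tagExpr.split("\\|\\|")) {
            if (candidate.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }
}
```

In this model a consumer that subscribes to `myTopic` with the expression `tagA || tagB` receives a `tagA` message on that topic, while messages with another tag or on another topic never reach its callback.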

7.4 Case study
Next, we simulate a scenario: after an order is placed successfully, an SMS is sent to the user who placed it. The design diagram is as follows:

7.4.1 The order microservice sends a message
1 Add the rocketmq dependencies to shop-order
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
2 Add the configuration
#rocketmq
rocketmq:
  name-server: 192.168.109.131:9876 # address of the RocketMQ service
  producer:
    group: shop-order # producer group
3 Write the test code

7.4.2 The user microservice subscribes to messages
1 Modify the shop-user module configuration

2 Modify the main class
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}
3 Modify the configuration file
server:
  port: 8071
spring:
  application:
    name: service-user
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql:///shop?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
    username: root
    password: root
  jpa:
    properties:
      hibernate:
        hbm2ddl:
          auto: update
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
#rocketmq
rocketmq:
  name-server: 192.168.109.131:9876
4 Write the message receiving service
@Slf4j
@Service("shopSmsService")
// consumerGroup: consumer group name; topic: the topic to consume
@RocketMQMessageListener(
        consumerGroup = "shop-user", // consumer group name
        topic = "order-topic", // topic to consume
        // consume mode, i.e. whether to consume in order: CONCURRENTLY (concurrent, default) or ORDERLY (ordered)
        consumeMode = ConsumeMode.CONCURRENTLY,
        // message model: BROADCASTING (broadcast) or CLUSTERING (cluster, default)
        messageModel = MessageModel.CLUSTERING
)
public class SmsService implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("Received an order {}; sending the SMS next", JSON.toJSONString(order));
    }
}
5 Start the services, place an order, and watch the console output
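7.5 Sending different types of messages
7.5.1 Normal messages
RocketMQ provides three ways to send normal messages: reliable synchronous sending, reliable asynchronous sending, and one-way sending.
Reliable synchronous sending
Synchronous sending is a communication mode in which the sender, after sending data, sends the next packet only after it has received a response from the receiver.
This mode has a very wide range of uses, such as important notification emails, sign-up SMS notifications, and marketing SMS systems.
Reliable asynchronous sending
Asynchronous sending is a communication mode in which the sender, after sending data, does not wait for the receiver's response before sending the next packet. The sender receives the server's response through a callback interface and processes the result.
Asynchronous sending is generally used where the processing chain takes a long time and the business is sensitive to response time (RT), for example notifying the transcoding service to start after a user uploads a video, and pushing the transcoding result once transcoding finishes.
One-way sending
One-way sending means the sender is only responsible for sending the message; it does not wait for the server's response and no callback is triggered, that is, it sends the request without waiting for a reply.
It suits scenarios that take very little time and do not demand high reliability, such as log collection.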
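The difference between the three modes lies in what the sender does with the acknowledgement. The sketch below models that with `CompletableFuture` from the JDK. It is not the RocketMQ API: `SendModes`, its method names, and the `"ACK:"` reply format are assumptions chosen for the example, and a single background thread plays the broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Toy model of the three send modes; hypothetical names, not the RocketMQ API. */
final class SendModes {
    // A single daemon thread plays the broker that acknowledges messages.
    private final ExecutorService broker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-broker");
        t.setDaemon(true);
        return t;
    });

    /** Hands the message to the broker thread; the future completes with its acknowledgement. */
    private CompletableFuture<String> deliver(String body) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + body, broker);
    }

    /** Reliable synchronous send: block until the acknowledgement arrives. */
    String syncSend(String body) {
        return deliver(body).join();
    }

    /** Reliable asynchronous send: return at once and handle the acknowledgement in a callback. */
    CompletableFuture<Void> asyncSend(String body, Consumer<String> onAck) {
        return deliver(body).thenAccept(onAck);
    }

    /** One-way send: hand the message over and never look at the outcome. */
    void sendOneWay(String body) {
        deliver(body);
    }
}
```

`syncSend` gives the caller the result directly, `asyncSend` gives it to the callback, and `sendOneWay` gives it to nobody, which is why the one-way mode cannot tell the sender whether the message arrived.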
<!-- dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
</dependency>
Comparison of the three sending modes

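7.5.2 Ordered messages
An ordered message is a message type provided by the message queue that is published and consumed strictly in order.

// Synchronous ordered message [the asynchronous and one-way ordered variants are written similarly]
public void testSyncSendOrderly() {
    // The third argument is used to select the queue
    rocketMQTemplate.syncSendOrderly("test-topic-1",
            "This is a synchronous ordered message", "xxxx");
}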
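Why a queue-selection argument helps with ordering can be shown with a small model. The sketch assumes that the key is hashed to pick a queue, which is a common approach but is stated here as an assumption rather than as the library's exact algorithm; `OrderedQueues` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of hash-based queue selection; hypothetical, not the library's implementation. */
final class OrderedQueues {
    private final List<List<String>> queues = new ArrayList<>();

    OrderedQueues(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
    }

    /** Maps a hash key to a queue index; the same key always yields the same index. */
    int select(String hashKey) {
        return Math.floorMod(hashKey.hashCode(), queues.size());
    }

    /** Appends the message to the queue chosen by its hash key. */
    void send(String hashKey, String body) {
        queues.get(select(hashKey)).add(body);
    }

    /** Returns the contents of one queue in arrival order. */
    List<String> queue(int index) {
        return queues.get(index);
    }
}
```

Because every message with the same key lands in the same first-in, first-out queue, the messages for one order keep their relative order even though different orders are spread across several queues.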
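7.5.3 Transactional messages
RocketMQ provides transactional messages, which make it possible to reach eventual consistency for distributed transactions.
Transactional message interaction flow:

Two concepts:
Half-transaction message: a message that cannot be delivered yet. The sender has successfully sent the message to the RocketMQ server, but the server has not yet received the producer's second confirmation of it. The message is marked "temporarily undeliverable", and a message in this state is a half-transaction message.
Message check-back: when the second confirmation of a transactional message is lost because of a network glitch, a restart of the producer application, or a similar cause, the RocketMQ server finds by scanning that a message has stayed in the half-transaction state for a long time and must actively ask the producer for the final state of that message (Commit or Rollback). This inquiry is the message check-back.
Steps for sending a transactional message:
1. The sender sends a half-transaction message to the RocketMQ server.
2. After persisting the message, the RocketMQ server returns an Ack to the sender confirming that the message was sent successfully; at this point the message is a half-transaction message.
3. The sender starts executing its local transaction logic.
4. The sender submits a second confirmation (Commit or Rollback) to the server according to the result of the local transaction. If the server receives Commit, it marks the half-transaction message as deliverable and the subscriber will eventually receive it; if the server receives Rollback, it deletes the half-transaction message and the subscriber will not receive it.
Steps for the transactional message check-back:
1. In special cases such as a network disconnection or an application restart, the second confirmation submitted in step 4 above never reaches the server; after a fixed period the server initiates a message check-back for that message.
2. When the sender receives the check-back, it must check the final result of the local transaction that corresponds to the message.
3. The sender submits the second confirmation again according to the final state of the local transaction, and the server handles the half-transaction message as in step 4.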
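The send and check-back steps amount to a small state machine on the server side, which can be sketched as follows. This is a toy model under assumptions: `ToyTxBroker`, its enums, and its methods are invented for illustration, and a `ROLLED_BACK` marker stands in for actually deleting the half-transaction message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy state machine for transactional messages; hypothetical, not RocketMQ's implementation. */
final class ToyTxBroker {
    enum State { HALF, COMMITTED, ROLLED_BACK }
    enum LocalTxResult { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, State> messages = new HashMap<>();

    /** Send steps 1-2: persist the message as a half-transaction message (not yet deliverable). */
    void sendHalf(String txId) {
        messages.put(txId, State.HALF);
    }

    /** Send step 4: apply the producer's second confirmation to a half-transaction message. */
    void confirm(String txId, LocalTxResult result) {
        if (messages.get(txId) != State.HALF) {
            return; // unknown id or already resolved
        }
        if (result == LocalTxResult.COMMIT) {
            messages.put(txId, State.COMMITTED);
        } else if (result == LocalTxResult.ROLLBACK) {
            messages.put(txId, State.ROLLED_BACK); // stands in for deleting the message
        }
        // UNKNOWN leaves the message in the HALF state for a later check-back
    }

    /** Check-back: ask the producer about every message that is still a half-transaction message. */
    void checkBack(Function<String, LocalTxResult> producerCheck) {
        for (String txId : new ArrayList<>(messages.keySet())) {
            if (messages.get(txId) == State.HALF) {
                confirm(txId, producerCheck.apply(txId));
            }
        }
    }

    /** Only committed messages may be delivered to subscribers. */
    boolean deliverable(String txId) {
        return messages.get(txId) == State.COMMITTED;
    }

    State state(String txId) {
        return messages.get(txId);
    }
}
```

A message whose second confirmation was lost simply stays in `HALF` until `checkBack` runs, at which point the producer's answer about the local transaction decides whether subscribers ever see it.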
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
import java.util.Date;

// Transaction status record for messages
@Entity(name = "shop_txlog")
@Data
public class TxLog {
    @Id
    private String txId;
    private Date date;
}
//@RestController
@Slf4j
public class OrderController4 {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private OrderServiceImpl4 orderService;
    @Autowired
    private ProductService productService;

    // Place an order -- feign
    @RequestMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Integer pid) {
        log.info("Received an order request for product {}; calling the product microservice to look it up", pid);
        // Call the product microservice to query the product information
        Product product = productService.findByPid(pid);
        // A pid of -100 is treated as a failed lookup, so return a placeholder order
        if (product.getPid() == -100) {
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("Order failed");
            return order;
        }
        log.info("Found product {}: {}", pid, JSON.toJSONString(product));
        // Place the order (create the order)
        Order order = new Order();
        order.setUid(1);
        order.setUsername("Test user");
        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);
        orderService.createOrderBefore(order);
        log.info("Order created successfully: {}", JSON.toJSONString(order));
        return order;
    }
}

7.6 Details to note when consuming messages
@Slf4j
@Service("shopSmsService")
// consumerGroup: consumer group name; topic: the topic to consume
@RocketMQMessageListener(
        consumerGroup = "shop-user", // consumer group name
        topic = "order-topic", // topic to consume
        // consume mode, i.e. whether to consume in order: CONCURRENTLY (concurrent, default) or ORDERLY (ordered)
        consumeMode = ConsumeMode.CONCURRENTLY,
        // message model: BROADCASTING (broadcast) or CLUSTERING (cluster, default)
        messageModel = MessageModel.CLUSTERING
)
public class SmsService implements RocketMQListener<Order>
{}
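RocketMQ supports two message models:
Broadcast consumption: every consumer instance receives the message, that is, one message can be processed by each consumer instance.
Cluster consumption: one message can be consumed by only one consumer instance in the consumer group.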
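The two models differ only in how many instances see each message, which a short sketch makes concrete. It is a toy model: `ConsumeModels` and its methods are hypothetical, and the round-robin distribution used for the cluster case is an assumption for illustration, not a description of how RocketMQ assigns messages to instances.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy comparison of the two message models; hypothetical names and distribution rule. */
final class ConsumeModels {
    /** Broadcasting: every consumer instance receives every message. */
    static List<List<String>> broadcast(List<String> messages, int instances) {
        List<List<String>> inboxes = emptyInboxes(instances);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    /** Clustering: each message goes to exactly one instance (round-robin in this sketch). */
    static List<List<String>> cluster(List<String> messages, int instances) {
        List<List<String>> inboxes = emptyInboxes(instances);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % instances).add(messages.get(i));
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int instances) {
        if (instances <= 0) {
            throw new IllegalArgumentException("instances must be positive");
        }
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }
}
```

With three messages and two instances, broadcasting fills both inboxes with all three messages, while clustering splits them so that each message appears in exactly one inbox.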