编辑推荐: |
本文主要介绍了RabbitMQ消息重试机制、解决幂等性、RabbitMQ签收模式配置、RabbitMQ死信队列、RabbitMQ解决分布式事务等内容。
来自于csdn,由火龙果软件Anna编辑、推荐。 |
|
1.RabbitMQ消息重试机制
消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这时候应该如何处理?
答案:使用消息重试机制。(springboot默认有消息重试机制)
1.1 如何合理选择重试机制
情况1:消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试?(需要重试机制)
情况2:消费者获取到消息后,抛出数据转换异常,是否需要重试?(不需要重试机制)需要发布新版本进行解决。
1.2 如何实现重试机制
对于情况2(数据转换异常),如果消费者代码抛出的异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿。
2.解决幂等性
网络延迟传输中,消费出现异常或者消费延迟,会造成MQ进行重试补偿,在重试过程中,可能会造成重复消费。
2.1 消费者如何保证消息幂等性,不被重复消费
使用全局MessageID判断消息是否已经被消费过(同一个MessageID只处理一次),解决幂等性。
或者使用业务逻辑保证唯一(比如订单号码,保证一个订单号码只可能被插入一次数据库)。
2.2 代码实现
2.2.1 生产者
FanoutIdemProducer
2.2.2 消费者
FanoutIdemEamilConsumer
2.2.3 配置文件
spring:
  rabbitmq:
    # Broker host
    host: 127.0.0.1
    # Broker port
    port: 5672
    # Username
    username: guest
    # Password
    password: guest
    # Virtual host
    virtual-host: /springbootfanout
    listener:
      simple:
        retry:
          # Enable consumer-side retry
          enabled: true
          # Maximum number of delivery attempts
          max-attempts: 5
          # Interval before the first retry (a bare number is read as milliseconds)
          initial-interval: 3000 |
3.RabbitMQ签收模式配置
3.1 开启手动应答模式
spring:
  rabbitmq:
    listener:
      simple:
        # Manual acknowledgement: the listener code must ack/nack each delivery itself
        acknowledge-mode: manual |
3.2 消费者
/**
 * Demonstrates manual acknowledgement ("sign-for" mode) on the e-mail queue.
 *
 * @author myx
 * @date 2019-05-01
 * Copyright (c) 2019-grape. All rights reserved.
 */
@Component
public class AckEamilConsumer {

    /**
     * Prints the received message together with the consuming thread's name
     * and the message id, then acknowledges that single delivery.
     *
     * @param message the delivered AMQP message
     * @param headers message headers; the delivery tag is read from here
     * @param channel channel on which the acknowledgement is sent
     * @throws IOException if the body cannot be decoded or the ack fails
     */
    @RabbitListener(queues = FanoutConstrant.QUEUE_EMAIL_NAME)
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel)
            throws IOException {
        String threadName = Thread.currentThread().getName();
        String body = new String(message.getBody(), "UTF-8");
        String messageId = message.getMessageProperties().getMessageId();
        System.out.println(threadName
                + ",ÓʼþÏû·ÑÕß»ñÈ¡Éú²úÕßÏûÏ¢msg:"
                + body
                + ",messageId:" + messageId);

        // Manual ack: the tag comes from the headers map; "false" acks only this delivery.
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(tag, false);
    }
} |
4.RabbitMQ死信队列
4.1 什么是死信队列
当消息在一个队列中因为下列原因之一:
消息被拒绝(basic.reject或basic.nack)并且requeue=false;
消息TTL过期;
队列达到最大长度(队列满了,无法再添加数据到mq中);
变成了“死信”后,会被重新投递(publish)到另一个Exchange,该Exchange就是DLX(死信交换机)。然后该Exchange
根据绑定规则转发到对应的队列上,监听该队列就可以重新消费。说白了就是没有被消费的消息换个地方重新被消费。
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 -->
DLX交换机 --> 队列 --> 消费者
4.2 应用场景分析
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了。
channel.basicNack(message.getMessageProperties(
).getDeliveryTag( ), false, false); // 丢弃消息
4.3 如何使用死信交换机
定义业务(普通)队列的时候指定参数:
x-dead-letter-exchange:用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey
4.4 代码实现
4.4.1 配置
DeadFanoutConfig
4.4.2 邮件消费者
DeadEamilConsumer
4.4.3 死信消费者
DeadConsumer
5.RabbitMQ解决分布式事务
案例:
经典案例,以目前流行的点外卖为例:用户下单后,调用订单服务,然后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯。
5.1 RabbitMQ解决分布式事务原理
采用最终一致性原理。
需要保证以下三要素:
1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)
2.MQ消费者能够正确消费消息,采用手动ACK模式(注意重试幂等性问题)
3.如何保证第一个事务先执行:采用补偿机制,再创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。(如果第一个事务中出错,补单消费者会再重新执行一次第一个事务,例如第一个事务是添加订单表,如果失败,在补单的时候重新生成订单记录,由于订单号唯一,所以不会重复)

5.2 模拟代码实现
5.2.1 生产者
/**
 * Order-side producer: inserts an order row and then publishes the order id
 * to the order exchange. It also acts as the publisher-confirm callback for
 * the messages it sends.
 */
@Service
public class OrderService extends BaseApiService
        implements RabbitTemplate.ConfirmCallback {

    /** Exchange the order message is published to. */
    private static final String ORDER_EXCHANGE = "order_exchange_name";

    /** Routing key of the order message; it has to equal the binding key exactly. */
    private static final String ORDER_ROUTING_KEY = "orderRoutingKey";

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Creates an order and, if the insert succeeded, publishes its id.
     *
     * @return an error result when the insert affected no rows, otherwise a success result
     */
    public ResponseBase addOrderAndDispatch() {
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setName("¶©µ¥Ãû³Æ");
        orderEntity.setOrderCreatetime(new Date());
        // Price: 300 yuan
        orderEntity.setOrderMoney(300d);
        // State 0: unpaid
        orderEntity.setOrderState(0);
        // Commodity id
        Long commodityId = 30L;
        orderEntity.setCommodityId(commodityId);
        String orderId = UUID.randomUUID().toString();
        orderEntity.setOrderId(orderId);

        // 1. Place the order first (insert one row into the order database).
        int orderResult = orderMapper.addOrder(orderEntity);
        System.out.println("orderResult:" + orderResult);
        if (orderResult <= 0) {
            return setResultError("ϵ¥Ê§°Ü!");
        }

        // 2. Hand the order id over to the message broker.
        send(orderId);
        return setResultSuccess();
    }

    /**
     * Publishes a JSON message {@code {"orderId": ...}} whose message id and
     * correlation id are both the order id.
     *
     * @param orderId id of the order to announce
     */
    private void send(String orderId) {
        JSONObject payload = new JSONObject();
        payload.put("orderId", orderId);
        String msg = payload.toJSONString();
        System.out.println("msg:" + msg);

        // Encode with the same charset that is declared as the content encoding.
        Message message = MessageBuilder
                .withBody(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(orderId)
                .build();

        // Correlation data is what confirm() gets back for this message.
        CorrelationData correlationData = new CorrelationData(orderId);

        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        // The key must not carry stray whitespace: the queues are bound with "orderRoutingKey".
        this.rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, message, correlationData);
    }

    /**
     * Publisher-confirm callback: logs the outcome and re-sends the order id
     * when the broker did not confirm the message.
     *
     * @param correlationData correlation data passed when sending; may be null
     * @param ack             true if the broker confirmed the message
     * @param cause           failure reason reported for a negative confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // Guard against confirms that arrive without correlation data.
        String orderId = correlationData == null ? null : correlationData.getId();
        System.out.println("ÏûÏ¢id:" + orderId);
        if (ack) {
            System.out.println("ÏûÏ¢·¢ËÍÈ·Èϳɹ¦");
            return;
        }
        System.out.println("ÏûÏ¢·¢ËÍÈ·ÈÏʧ°Ü:" + cause);
        // NOTE(review): re-sends on every negative confirm with no attempt limit.
        if (orderId != null) {
            send(orderId);
        }
    }
} |
5.2.2 补单消费者
/**
 * Compensation consumer: listens on the order-creation queue and inserts the
 * order if it is not in the database yet.
 */
@Component
public class CreateOrderConsumer {

    @Autowired
    private OrderMapper orderMapper;

    /**
     * Looks the order up by the id carried in the message; acknowledges when it
     * already exists or was inserted, rejects without requeue otherwise.
     *
     * @param message delivered message whose JSON body carries {@code orderId}
     * @param headers message headers (not used here)
     * @param channel channel used to ack/nack the delivery
     * @throws Exception if decoding, parsing or acknowledging fails
     */
    @RabbitListener(queues = "order_create_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel)
            throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("²¹µ¥Ïû·ÑÕß" + msg +
                ",ÏûÏ¢id:" + messageId);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");

        // If the order is already there, no compensation is needed.
        OrderEntity orderEntityResult = orderMapper.findOrderId(orderId);
        if (orderEntityResult != null) {
            System.out.println("¶©µ¥ÒѾ´æÔÚ ÎÞÐè²¹µ¥ orderId:"
                    + orderId);
            // Nothing left to do for this delivery, so acknowledge it instead of leaving it unacked.
            channel.basicAck(deliveryTag, false);
            return;
        }

        // The order does not exist: create it with the id taken from the message.
        OrderEntity orderEntity = newOrder(orderId);

        boolean inserted;
        try {
            int orderResult = orderMapper.addOrder(orderEntity);
            System.out.println("orderResult:" + orderResult);
            inserted = orderResult > 0;
        } catch (Exception e) {
            e.printStackTrace();
            inserted = false;
        }

        if (inserted) {
            // Acknowledge so the broker can remove the message.
            channel.basicAck(deliveryTag, false);
        } else {
            // Insert failed: reject without requeue.
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /** Builds the order row to insert for the given order id. */
    private OrderEntity newOrder(String orderId) {
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setName("¶©µ¥Ãû³Æ");
        orderEntity.setOrderCreatetime(new Date());
        // Price: 300 yuan
        orderEntity.setOrderMoney(300d);
        // State 0: unpaid
        orderEntity.setOrderState(0);
        // Commodity id
        Long commodityId = 30L;
        orderEntity.setCommodityId(commodityId);
        orderEntity.setOrderId(orderId);
        return orderEntity;
    }
} |
5.2.3 RabbitmqConfig
/**
 * Declares the order exchange, the dispatch queue and the order-compensation
 * queue, and binds both queues to the exchange with the same routing key.
 *
 * <p>Declared as a full configuration class (fully qualified because no import
 * section is visible here) so that the calls between {@code @Bean} methods
 * below resolve to the container-managed beans.
 */
@org.springframework.context.annotation.Configuration
public class RabbitmqConfig {

    /** Queue that holds "order placed, please dispatch" messages. */
    public static final String ORDER_DIC_QUEUE = "order_dic_queue";

    /** Compensation queue, used to check whether the order has been created. */
    public static final String ORDER_CREATE_QUEUE = "order_create_queue";

    /** Exchange for the order/dispatch messages. */
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

    /** Routing key shared by both bindings. */
    private static final String ORDER_ROUTING_KEY = "orderRoutingKey";

    /** 1. Dispatch queue. */
    @Bean
    public Queue directOrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }

    /** 2. Order-compensation queue. */
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }

    /** 3. Direct exchange. */
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    /** 4. Binds the dispatch queue to the exchange. */
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(directOrderDicQueue())
                .to(directOrderExchange())
                .with(ORDER_ROUTING_KEY);
    }

    /** 5. Binds the compensation queue to the exchange. */
    @Bean
    Binding bindingExchangeCreateOrder() {
        return BindingBuilder.bind(directCreateOrderQueue())
                .to(directOrderExchange())
                .with(ORDER_ROUTING_KEY);
    }
} |
5.2.4 派单服务-消费者
/**
 * Dispatch-side consumer: reads the order id from the dispatch queue and
 * inserts a dispatch record for it.
 */
@Component
public class DispatchConsumer {

    @Autowired
    private DispatchMapper dispatchMapper;

    /**
     * Inserts a dispatch record for the order named in the message. The
     * delivery is acknowledged only when a row was inserted; in every other
     * case it is rejected without requeue so it never stays unacknowledged.
     *
     * @param message delivered message whose JSON body carries {@code orderId}
     * @param headers message headers (not used here)
     * @param channel channel used to ack/nack the delivery
     * @throws Exception if decoding, parsing or acknowledging fails
     */
    @RabbitListener(queues = "order_dic_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel)
            throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("Åɵ¥·þÎñƽ̨" + msg
                + ",ÏûÏ¢id:" + messageId);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        if (StringUtils.isEmpty(orderId)) {
            // Without an order id the message cannot be processed: reject it without requeue.
            channel.basicNack(deliveryTag, false, false);
            return;
        }

        DispatchEntity dispatchEntity = new DispatchEntity();
        // Order id
        dispatchEntity.setOrderId(orderId);
        // Courier id
        dispatchEntity.setTakeoutUserId(12L);
        // Delivery route
        dispatchEntity.setDispatchRoute("40,40");

        boolean inserted;
        try {
            int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
            inserted = insertDistribute > 0;
        } catch (Exception e) {
            e.printStackTrace();
            inserted = false;
        }

        if (inserted) {
            // Acknowledge so the broker can remove the message.
            channel.basicAck(deliveryTag, false);
        } else {
            // Insert failed or affected no rows: reject without requeue.
            channel.basicNack(deliveryTag, false, false);
        }
    }
} |
|