Editor's pick:
This article comes from the 杏仁 (Xingren) technology site. It uses a small demo to introduce the RPC pattern implemented with RabbitMQ as middleware. We hope it helps with your learning.
Background
RabbitMQ
RabbitMQ is a message queue (MQ) that implements the AMQP protocol. A message queue follows the classic producer/consumer model: producers publish messages and consumers consume them. The two sides are decoupled, and neither knows the other exists.

RPC
Remote Procedure Call. In one remote procedure call, the client sends a request to the server, the server processes the request and returns a response, and the call ends when the client receives that response.

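How do we implement RPC with RabbitMQ?
When RPC is implemented with RabbitMQ, the producer plays the client and the consumer plays the server.
RPC calls, however, are usually synchronous, and the client and server are tightly coupled: the client connects to the server by IP address or domain name and port, sends a request, and waits for the server to return a response.
An MQ producer and consumer, by contrast, are fully decoupled. So how can RPC be built on an MQ? The natural answer is to use the MQ as middleware for one round of two-way message passing: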

The client and the server are each both producer and consumer. The client publishes requests and consumes responses; the server consumes requests and publishes responses.
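The two-way exchange described above can be sketched without a broker. The following is a minimal in-memory illustration, not RabbitMQ code: the class InMemoryRpcSketch, its Message type, and the queue name reply.client-1 are hypothetical, and java.util.concurrent queues stand in for broker queues. It only shows the roles: the client publishes to rpc_queue and consumes from its own reply queue, while the server does the reverse.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for a broker with named queues (hypothetical, no RabbitMQ involved). */
class InMemoryRpcSketch {

    /** A message body plus the name of the queue the reply should be published to. */
    static final class Message {
        final String body;
        final String replyTo;

        Message(String body, String replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    private final Map<String, BlockingQueue<Message>> queues = new ConcurrentHashMap<>();

    /** Returns the queue with the given name, creating it on first use. */
    private BlockingQueue<Message> queue(String name) {
        return queues.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    /** Server role: consume one request, then publish one response to its reply queue. */
    void serveOne() throws InterruptedException {
        Message request = queue("rpc_queue").take();
        queue(request.replyTo).put(new Message(request.body + " Received", null));
    }

    /** Client role: publish a request, then consume the response; null on timeout. */
    String call(String replyQueue, String body, long timeoutMillis) {
        try {
            queue("rpc_queue").put(new Message(body, replyQueue));
            Message reply = queue(replyQueue).poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return reply == null ? null : reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        InMemoryRpcSketch broker = new InMemoryRpcSketch();
        Thread server = new Thread(() -> {
            try {
                broker.serveOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
        System.out.println(broker.call("reply.client-1", "request", 1000));
    }
}
```

Neither side holds a reference to the other; they share only queue names, which is the decoupling the MQ approach relies on.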
Implementation
Defining the MQ side
The request queue
We need a queue to hold requests. The client publishes requests to it and the server consumes from it. This queue needs no complex routing rules; RabbitMQ's default direct exchange is enough to route the messages.
The response queue
There should be more than one response queue. With several clients and a single shared queue, there is no guarantee that a response is consumed by the client that sent the request. Each client should therefore have its own response queue, created by that client, used only by that client, and deleted once it is no longer needed. RabbitMQ's exclusive queues fit this:
channel.queueDeclare(
        "",                // queue: empty name, so the broker generates one
        false,             // durable
        true,              // exclusive
        false,             // autoDelete
        new HashMap<>());  // arguments
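The queue name must also be unique. Declaring the queue with an empty name makes RabbitMQ generate a unique name.
Setting exclusive to true declares an exclusive queue: it can be used only by the connection that declared it, and it is deleted when that connection closes.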
A simple demo (using the pull mechanism)
We use a simple demo to walk through how the client and the server handle a call.
Publishing the request
A small question before writing code
Each client declares its own response queue, so how does the server know which queue to publish a response to? The client has to tell it. RabbitMQ supports this directly: the message properties include a reply_to property that carries the name of the callback queue, and the server publishes its response to the queue named there.
With that settled, we can write the client code that publishes a request:
// Declare the callback queue for responses
String replyQueueName = channel.queueDeclare("",
        false, true, false, new HashMap<>()).getQueue();
// Put the callback queue name into the message properties
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .replyTo(replyQueueName)
        .build();
String request = "request";
// Publish the request
channel.basicPublish("", "rpc_queue",
        properties, request.getBytes());
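RabbitMQ also offers a more convenient mechanism for RPC, so the client does not have to declare a callback queue itself: the client sets replyTo to amq.rabbitmq.reply-to when publishing a request and consumes from amq.rabbitmq.reply-to to receive the response. RabbitMQ then delivers replies to the client through this internal pseudo-queue. The consumer on amq.rabbitmq.reply-to has to run in automatic acknowledgement (no-ack) mode and must be registered on the same channel before the request is published.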
Consuming the request
Next comes the server side. After receiving and processing a request, the server publishes the response to the callback queue named in reply_to:
// Definition of the server-side Consumer
public class RpcServer extends DefaultConsumer {
    public RpcServer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag,
            Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String msg = new String(body);
        String response = msg + " Received";
        // Read the callback queue name
        String replyTo = properties.getReplyTo();
        // Publish the response to the callback queue
        this.getChannel().basicPublish("",
                replyTo, new AMQP.BasicProperties(), response.getBytes());
    }
}
...
// Start the server-side Consumer (autoAck = true)
channel.basicConsume("rpc_queue",
        true, new RpcServer(channel));
Receiving the response
How does the client receive the server's response? There are two options: (1) poll the callback queue and pull messages from it, or (2) consume the callback queue asynchronously. Here we implement the first one.
GetResponse getResponse = null;
// Poll until a message shows up in the callback queue (autoAck = true)
while (getResponse == null) {
    getResponse = channel.basicGet(replyQueueName, true);
}
String response = new String(getResponse.getBody());
That completes a simple RPC model on top of RabbitMQ. The demo is not practical, though: after each request the client polls synchronously for the response, so it can handle only one request at a time, and RabbitMQ's pull mode is also relatively inefficient.
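One further weakness of the polling loop is that it never gives up if no response arrives. A minimal sketch of a bounded variant, using only the JDK: BoundedPollSketch and pollUntil are hypothetical names, and the Supplier stands in for a call such as channel.basicGet(replyQueueName, true), which in real code throws IOException and would need wrapping.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: poll a source until it yields a value or a deadline passes. */
class BoundedPollSketch {

    /**
     * Calls fetch repeatedly, pausing between attempts, until it returns a
     * non-null value or timeoutMillis has elapsed. Returns null on timeout
     * or if the waiting thread is interrupted.
     */
    static <T> T pollUntil(Supplier<T> fetch, long timeoutMillis, long pauseMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            T result = fetch.get();
            if (result != null) {
                return result;
            }
            if (System.nanoTime() - deadline >= 0) {
                return null; // gave up: nothing arrived in time
            }
            try {
                Thread.sleep(pauseMillis); // avoid a hot spin between attempts
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in source that only yields a value after roughly 50 ms
        String reply = pollUntil(
                () -> System.currentTimeMillis() - start >= 50 ? "request Received" : null,
                1000, 10);
        System.out.println(reply);
    }
}
```

The pause trades latency for fewer round trips to the broker; it does not remove the one-request-at-a-time limitation described above.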
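A complete, usable RPC implementation takes much more work, and the key points are fairly intricate. As the saying goes, don't reinvent the wheel: Spring already provides a library with a complete RPC implementation, so let's look at it next.
The implementation in Spring Rabbit
The pull-based demo above handles one request at a time. How can we receive responses asynchronously and handle several requests at once? The key is to keep track of requests and responses and match them up. RabbitMQ supports this as well: another message property, correlation_id, carries a unique id for a message.
Following the implementation of the convertSendAndReceive method in spring-rabbit, a unique correlation_id is generated for every request: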
private final AtomicInteger messageTagProvider = new AtomicInteger();
...
String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
...
message.getMessageProperties().setCorrelationId(messageTag);
A ConcurrentHashMap keeps the mapping from correlation_id to the pending response:
private final Map<String, PendingReply> replyHolder =
        new ConcurrentHashMap<String, PendingReply>();
...
final PendingReply pendingReply = new PendingReply();
this.replyHolder.put(correlationId, pendingReply);
PendingReply holds a BlockingQueue for the response. After the request is sent, the BlockingQueue's poll method is called with a timeout to wait for the response:
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);

public Message get(long timeout, TimeUnit unit)
        throws InterruptedException {
    Object reply = this.queue.poll(timeout, unit);
    return reply == null ? null : processReply(reply);
}
Whatever the outcome, the PendingReply is removed from replyHolder afterwards, so that entries for timed-out requests do not pile up in replyHolder:
try {
    reply = exchangeMessages(exchange, routingKey,
            message, correlationData, channel, pendingReply, messageTag);
} finally {
    this.replyHolder.remove(messageTag);
    ...
}
When and how does the response get into this BlockingQueue? Look at where RabbitTemplate receives messages:
public void onMessage(Message message) {
    String messageTag;
    if (this.correlationKey == null) { // using standard correlationId property
        messageTag = message.getMessageProperties().getCorrelationId();
    } else {
        messageTag = (String) message.getMessageProperties()
                .getHeaders().get(this.correlationKey);
    }
    // Only a message with a correlation_id counts as an RPC response; others are not handled
    if (messageTag == null) {
        logger.error("No correlation header in reply");
        return;
    }
    // Look up the PendingReply for this correlation_id in replyHolder
    PendingReply pendingReply = this.replyHolder.get(messageTag);
    if (pendingReply == null) {
        if (logger.isWarnEnabled()) {
            logger.warn("Reply received after timeout for " + messageTag);
        }
        throw new AmqpRejectAndDontRequeueException("Reply received after timeout");
    } else {
        restoreProperties(message, pendingReply);
        // Add the response to the BlockingQueue
        pendingReply.reply(message);
    }
}
The Spring code above leaves out a lot of additional handling and detail and shows only the key parts.
This completes a full, usable RPC pattern with RabbitMQ as the middleware.
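The correlation mechanism walked through above can be condensed into a broker-free sketch. Everything here is an illustrative assumption rather than Spring's code: CorrelatedRpcSketch, its methods, and the publish callback are hypothetical, and a BiConsumer stands in for publishing to RabbitMQ. It shows the three steps: generate a correlation id, park a one-slot BlockingQueue under that id, and remove the entry in finally whether or not a reply arrived.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/** Hypothetical client-side bookkeeping that matches replies to requests by correlation id. */
class CorrelatedRpcSketch {
    private final AtomicInteger tagProvider = new AtomicInteger();
    private final Map<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    /**
     * Registers a pending slot, hands (correlationId, body) to publish, then blocks
     * until the matching reply arrives or the timeout expires. Returns null on timeout.
     */
    String sendAndReceive(String body, long timeoutMillis, BiConsumer<String, String> publish) {
        String tag = String.valueOf(tagProvider.incrementAndGet());
        BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);
        pending.put(tag, slot);
        try {
            publish.accept(tag, body);
            return slot.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(tag); // never leave entries behind for timed-out requests
        }
    }

    /** Called by the reply consumer; returns false for unknown or late replies. */
    boolean onReply(String correlationId, String body) {
        BlockingQueue<String> slot = pending.get(correlationId);
        return slot != null && slot.offer(body);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        CorrelatedRpcSketch client = new CorrelatedRpcSketch();
        // Stand-in server: answers on another thread, echoing the correlation id back
        String reply = client.sendAndReceive("request", 1000,
                (tag, body) -> new Thread(() -> client.onReply(tag, body + " Received")).start());
        System.out.println(reply);
    }
}
```

Because each waiting caller blocks on its own slot, several threads can call sendAndReceive concurrently and replies may arrive in any order.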
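Summary
Server
The server side is fairly simple. It differs from an ordinary Consumer only in that it sends its reply to the queue named in replyTo and attaches the correlation_id that identifies the message.
A small optimization for the server:
Timeouts are handled by the client. Is there anything the server can optimize?
There is: if processing on the server is slow, how can it tell whether the client is still waiting for the response?
We can use the passive parameter to check whether the replyTo queue still exists. Because the client's reply queue is tied to its connection, the queue disappears once the client disconnects, and the server then no longer needs to process the message.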
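Client
The client carries more of the work, including:
Declaring the replyTo queue (much simpler with amq.rabbitmq.reply-to)
Tracking requests and responses (matched through a unique correlation_id)
Consuming the server's replies
Handling timeouts and other failures (using a BlockingQueue for blocking retrieval)
Fortunately Spring already provides a complete and reliable implementation. Once the flow and the key points are clear, we can use Spring's RabbitTemplate directly instead of writing our own.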
Why implement RPC over MQ
RPC over MQ looks more complicated than having the client talk to the server directly, so why do it? What are the benefits?
Decoupling the client from the server: the client only publishes a request to the MQ and consumes the response to it. It does not care who handles the request; the consumer at the other end of the MQ can be replaced by any server able to handle the request without affecting the client.
Reducing pressure on the server: in traditional RPC, too many clients and requests put the server under heavy load. With an MQ in the middle, the excess requests are absorbed by the MQ and the server controls the rate at which it consumes them, so it is not overwhelmed.
Easier horizontal scaling of servers: if the servers cannot keep up with the request rate, we only need to add more servers consuming from the MQ, and the MQ balances message delivery among them.
As we can see, RabbitMQ supports the RPC pattern quite well; amq.rabbitmq.reply-to, reply_to, and correlation_id all show this. Together with the spring-rabbit implementation, this makes RPC calls over a message queue easy to use.