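Editor's recommendation:
This article comes from cnblogs. It introduces RabbitMQ starting from its use cases and architecture, then covers how it is used and how its main features are implemented.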
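I. RabbitMQ
AMQP (Advanced Message Queuing Protocol) is an open standard application-layer protocol designed for message-oriented middleware. Message middleware is mainly used to decouple components: the sender of a message does not need to know that the consumer of the message exists, and vice versa.
The main features of AMQP are message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability, and security.
RabbitMQ is an open-source implementation of AMQP. The server is written in Erlang, and it supports many clients and protocols, such as Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, and STOMP, as well as AJAX. It is used to store and forward messages in distributed systems and performs well in ease of use, scalability, and high availability.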
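II. RabbitMQ use cases
A large software system has many components (also called modules or subsystems). How should these modules communicate? This is quite different from traditional IPC. Traditional IPC mostly runs on a single machine, so modules are tightly coupled and hard to scale out. With sockets, different modules can indeed be deployed on different machines, but many problems remain to be solved. For example:
1) How do the sender and receiver maintain their connection, and if one side's connection is interrupted, how is data loss during that period prevented?
2) How can the coupling between sender and receiver be reduced?
3) How can receivers with a higher priority get data first?
4) How can load balancing be achieved so that load is spread evenly across receivers?
5) How can data be delivered efficiently to the relevant receivers? In other words, when receivers subscribe to different data, how is it filtered effectively?
6) How can the system be made scalable, even to the point of deploying the communication module on a cluster?
7) How can we guarantee that the receiver gets complete and correct data?
The AMQP protocol addresses the problems above, and RabbitMQ implements AMQP.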
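III. RabbitMQ structure
The architecture of a typical RabbitMQ deployment is shown below:

Broker: simply put, the message queue server entity.
Exchange: the message exchange. It specifies the rules by which messages are routed to queues.
Queue: the message queue, which holds messages. Each message is placed into one or more queues.
Binding: links an exchange to a queue according to a routing rule.
Routing Key: the routing keyword. The exchange uses it to deliver messages.
vhost: virtual host. A broker can host multiple vhosts, which are used to separate the permissions of different users.
producer: the message producer, that is, the program that publishes messages.
consumer: the message consumer, that is, the program that receives messages.
channel: the message channel. Multiple channels can be opened on each client connection, and each channel represents one session.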
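IV. Using RabbitMQ
In the AMQP model, a message is created by the producer and sent to an exchange on the broker. The exchange forwards it to the appropriate queue according to the configured routing, and the queue then hands it to a consumer. Messages travel from queue to consumer in one of two ways: push or pull.
A message queue is typically used as follows:
The client connects to the message queue server and opens a channel.
The client declares an exchange and sets its properties.
The client declares a queue and sets its properties.
The client uses a routing key to bind the queue to the exchange.
The client publishes messages to the exchange.
When the exchange receives a message, it routes it according to the message's key and the existing bindings, and delivers it to one or more queues.
There are several exchange types. An exchange that delivers purely by key is called a direct exchange. For example, if a binding was made with the routing key "abc", only messages published with the key "abc" are delivered to that queue.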
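The bind-then-publish flow described above can be pictured with a small in-memory model. This is only a sketch in plain Java and does not use the RabbitMQ client: the class `DirectExchangeSketch` and its methods `bind`, `publish`, and `depth` are hypothetical names introduced for illustration, and a real broker does far more (acknowledgements, persistence, and so on).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory illustration of direct-exchange routing (not the RabbitMQ API).
class DirectExchangeSketch {
    // Binding table: routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Declare the queue if needed and bind it to this exchange with the given key.
    void bind(String queueName, String routingKey) {
        queues.computeIfAbsent(queueName, q -> new ArrayDeque<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Copy the message into every queue whose binding key equals the message key.
    // Returns the number of queues reached (0 means no binding matched).
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Number of messages currently waiting in a queue.
    int depth(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("q1", "abc");
        exchange.bind("q2", "abc");
        exchange.bind("q3", "xyz");
        System.out.println(exchange.publish("abc", "hello"));  // prints 2
        System.out.println(exchange.publish("other", "lost")); // prints 0
    }
}
```

Note that two queues bound with the same key both receive a copy, while a message whose key matches no binding reaches no queue at all.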
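4.0 Installation and configuration
RabbitMQ is implemented in Erlang, so you first need to install and configure an Erlang environment, then install the server and configure it. Since this is not the main topic of this article it is omitted here; see the article "Introduction to RabbitMQ" for details.
To use the RabbitMQ client, the relevant dependency must be added to your project.
4.1 Point-to-point

The message producer code is as follows: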
package com.zenhobby.rabbit.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    // Queue name
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // Create a connection to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Host name or IP address of the RabbitMQ server
        factory.setHost("localhost");
        // Open a connection
        Connection connection = factory.newConnection();
        // Open a channel
        Channel channel = connection.createChannel();
        // Declare the queue (non-durable, non-exclusive, no auto-delete)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // The message to send
        String message = "hello world!";
        // Publish the message to the queue through the default exchange
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
The message consumer code is as follows:
package com.zenhobby.rabbit.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {
    // Queue name
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // Open a connection and a channel, same as in the sender
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue here as well, so that it exists even if the
        // consumer is started before the producer
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Create a queue consumer (QueueingConsumer belongs to older versions
        // of the Java client; it was removed in 5.0)
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Consume from the queue; the second argument (true) turns on
        // automatic acknowledgement
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            // nextDelivery() blocks (internally it calls take() on a blocking queue)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
The queue is declared in both the producer and the consumer, mainly so that no messages are lost when one side starts before the other has come up.
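4.2 Work queues
The main idea of a work queue is to avoid executing a resource-intensive task immediately and having to wait for it to finish. Instead, the task is scheduled for later: we wrap it in a message and send it to a queue. Worker processes run in the background, repeatedly take tasks from the queue, and execute them. When several workers are running, the tasks in the queue are shared among them. This concept is extremely useful in web applications, where a complex task cannot be completed within the short window of an HTTP request.
1. Message dispatch
By default, RabbitMQ sends each message to the next consumer in turn, without considering how long each task takes. Messages are assigned up front rather than handed out one at a time as consumers finish, so on average each consumer gets the same number of messages. This way of distributing messages is called round-robin.
The default dispatch looks fair but has a drawback. Suppose there are two consumers, all odd-numbered messages are heavy, and all even-numbered messages are light. With round-robin, the odd-numbered tasks go to the first consumer, which is constantly busy, while the even-numbered tasks go to the other consumer, which finishes immediately and then sits idle. RabbitMQ knows nothing about this, because it dispatches a message as soon as it enters the queue. It does not look at the number of unacknowledged messages for a consumer; it just blindly sends the n-th message to the n-th consumer.
To solve this, we call basicQos with prefetchCount = 1, which tells RabbitMQ not to give a consumer more than one message at a time. Only after a message has been processed and acknowledged is the next one sent to that consumer.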
int prefetchCount = 1;
channel.basicQos(prefetchCount);
To use fair dispatch, automatic acknowledgement must be turned off in favor of manual acknowledgement.
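To make the difference between the two dispatch strategies concrete, the following sketch simulates both in plain Java, without a broker. It is a simplified model under stated assumptions: `DispatchSketch`, `roundRobin`, and `fair` are hypothetical names, each message has a fixed processing cost, and "fair" means the next message goes to whichever consumer becomes free first, which approximates the effect of prefetchCount = 1 with manual acknowledgement.

```java
import java.util.Arrays;

// Compares round-robin dispatch with prefetchCount = 1 style dispatch (simulation only).
class DispatchSketch {
    // Round-robin: message i goes to consumer i % consumers, regardless of load.
    // Returns the total busy time accumulated by each consumer.
    static long[] roundRobin(long[] costs, int consumers) {
        long[] busy = new long[consumers];
        for (int i = 0; i < costs.length; i++) {
            busy[i % consumers] += costs[i];
        }
        return busy;
    }

    // Fair dispatch: the next message goes to the consumer that is free first.
    static long[] fair(long[] costs, int consumers) {
        long[] busy = new long[consumers];
        for (long cost : costs) {
            int earliest = 0;
            for (int c = 1; c < consumers; c++) {
                if (busy[c] < busy[earliest]) {
                    earliest = c;
                }
            }
            busy[earliest] += cost;
        }
        return busy;
    }

    public static void main(String[] args) {
        // The 1st, 3rd and 5th messages are heavy; the 2nd, 4th and 6th are light.
        long[] costs = {10, 1, 10, 1, 10, 1};
        System.out.println(Arrays.toString(roundRobin(costs, 2))); // prints [30, 3]
        System.out.println(Arrays.toString(fair(costs, 2)));       // prints [21, 12]
    }
}
```

With round-robin one consumer carries almost all of the work, while the prefetch-limited variant spreads it more evenly and finishes sooner overall.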
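2. Message acknowledgement
A consumer may need some time to process the data it receives. If the consumer fails or exits abnormally before processing completes, that data is unfortunately lost. This is because in no-ack mode, as soon as the consumer receives a message, and regardless of whether processing has finished, the RabbitMQ server immediately marks the message as done and removes it from the queue.
To prevent data loss, RabbitMQ supports message acknowledgements. To make sure data is actually processed and not merely received by the consumer, we must not use no-ack; instead, the consumer should send an ack after it has finished processing. That ack tells RabbitMQ that the message was received and processed, so RabbitMQ can safely delete it. If a consumer exits without sending an ack, RabbitMQ delivers the message to the next consumer, so no data is lost even when a consumer exits abnormally. No timeout mechanism is used here: RabbitMQ decides that a message was not processed only when the consumer's connection is lost. In other words, RabbitMQ gives the consumer as long as it needs to process the data (recent RabbitMQ releases additionally apply a configurable delivery acknowledgement timeout).
Message acknowledgement is turned on (enabled) by default; here autoAck is set to false explicitly: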
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
The consumer is modified as follows:
channel.basicQos(1); // deliver at most one unacknowledged message at a time
// Create the queue consumer
final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
        System.out.println(" [x] Processing... at " + new Date().toLocaleString());
        try {
            // Simulate work: sleep one second for every '.' in the message
            for (char ch : message.toCharArray()) {
                if (ch == '.') {
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status instead of swallowing it
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(" [x] Done! at " + new Date().toLocaleString());
            // Manually acknowledge this single message
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
};
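Here:
channel.basicAck(envelope.getDeliveryTag(), false);
returns the acknowledgement once the message has been processed. If the server does not receive an ack, it puts the message back on the queue after the consumer dies so that another consumer can handle it. Turning off automatic acknowledgement and then forgetting to ack manually is an easy mistake to make, but the consequences are severe: messages that were delivered never get acknowledged, so they are never released from memory. RabbitMQ uses more and more memory, and eventually the server goes down.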
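The bookkeeping behind manual acknowledgement can be sketched with a small in-memory model. This is an illustration only, not how the broker is implemented and not the client API: `AckSketch` and its methods `publish`, `deliver`, `ack`, and `connectionLost` are hypothetical names. A delivered message stays in an "unacked" table until it is acknowledged, and it is put back on the queue if its consumer's connection is lost first.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of manual acknowledgements (illustration only).
class AckSketch {
    private static final class Unacked {
        final String consumer;
        final String message;

        Unacked(String consumer, String message) {
            this.consumer = consumer;
            this.message = message;
        }
    }

    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, Unacked> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Hand the next ready message to a consumer; it stays tracked until acked.
    // Returns the delivery tag, or -1 if no message is ready.
    long deliver(String consumer) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, new Unacked(consumer, message));
        return tag;
    }

    // The consumer finished processing: the message can be forgotten.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer's connection dropped: requeue everything it had not acked.
    void connectionLost(String consumer) {
        Iterator<Unacked> it = unacked.values().iterator();
        while (it.hasNext()) {
            Unacked entry = it.next();
            if (entry.consumer.equals(consumer)) {
                ready.addFirst(entry.message);
                it.remove();
            }
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();
        queue.publish("task-1");
        queue.deliver("worker-A");
        queue.connectionLost("worker-A");         // worker-A died before acking
        long retry = queue.deliver("worker-B");   // task-1 is delivered again
        queue.ack(retry);
        System.out.println(queue.readyCount() + " ready, " + queue.unackedCount() + " unacked");
        // prints "0 ready, 0 unacked"
    }
}
```

If `ack` were never called, entries would pile up in the unacked table for as long as the consumer stayed connected, which is the memory growth described above.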
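3. Message persistence
To ensure that data is not lost when RabbitMQ exits or crashes, both the queue and the messages must be made persistent.
A queue is made persistent by declaring it with durable=True (shown here in the Python client's syntax):
channel.queue_declare(queue='hello', durable=True)
A message is made persistent by setting a property when publishing:
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
The modified producer, written here with the .NET client, looks like this: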
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            // A queue is made persistent by declaring it with durable = true
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            // A message is made persistent by passing properties when publishing
            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);
            // Publish to the queue declared above (routing key = queue name)
            channel.BasicPublish("", "task_queue", properties, body);
        }
    }
}
4.3 Publish/Subscribe
1. Exchanges
In the work queue section, messages were published like this:
channel.basicPublish("", "hello", null, message.getBytes());
The empty first argument selects the default exchange. An exchange is a RabbitMQ concept: its job is to accept messages from producers and push them to queues. (A producer never sends a message directly to a queue; it sends it to an exchange, which passes it on.)

The exchange types are:
direct (exact match)
topic (pattern match)
headers (header match)
fanout (broadcast)
Direct Exchange: routes by routing key. A queue is bound to the exchange with a specific routing key, and a message is delivered only if its routing key matches that key exactly. This is a full-string match: if a queue is bound with the routing key "dog", only messages marked "dog" are forwarded; dog.puppy and dog.guard are not.
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "direct"); // direct, fanout or topic
// Durable, non-exclusive queue that is not auto-deleted
channel.queueDeclare("queueName", true, false, false, null);
channel.queueBind("queueName", "exchangeName", "routingKey");
byte[] messageBodyBytes = "hello world".getBytes();
// The routing key must equal the binding key
channel.basicPublish("exchangeName", "routingKey",
        MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
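Fanout Exchange: ignores the routing key. You simply bind queues to the exchange, and every message sent to the exchange is forwarded to all queues bound to it. This is much like a subnet broadcast, where every host in the subnet gets a copy of the message. Fanout is the fastest exchange type at forwarding messages.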
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "fanout"); // direct, fanout or topic
channel.queueDeclare("queueName", true, false, false, null);
// A fanout exchange ignores the binding key
channel.queueBind("queueName", "exchangeName", "routingKey");
channel.queueDeclare("queueName1", true, false, false, null);
channel.queueBind("queueName1", "exchangeName", "routingKey1");
byte[] messageBodyBytes = "hello world".getBytes();
// The routing key is left empty because fanout does not use it
channel.basicPublish("exchangeName", "",
        MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Topic Exchange: matches the routing key against a pattern. Queues are bound to the exchange with a pattern. The symbol "#" matches zero or more words, and the symbol "*" matches exactly one word. So "audit.#" matches "audit.irs.corporate", whereas "audit.*" matches "audit.irs" but not "audit.irs.corporate".
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "topic"); // direct, fanout or topic
channel.queueDeclare("queueName", true, false, false, null);
// Bind with a pattern: "*" stands for exactly one word
channel.queueBind("queueName", "exchangeName", "routingKey.*");
byte[] messageBodyBytes = "hello world".getBytes();
// "routingKey.one" matches the pattern "routingKey.*"
channel.basicPublish("exchangeName", "routingKey.one",
        MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
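The wildcard rules of the topic exchange can be checked with a few lines of plain Java. The sketch below is an assumption-laden illustration, not the broker's actual algorithm: `TopicMatchSketch` and `matches` are hypothetical names, words are separated by dots, "*" stands for exactly one word, and "#" for zero or more words.

```java
// Sketch of topic pattern matching: '*' = exactly one word, '#' = zero or more words.
class TopicMatchSketch {
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    // Does pattern[i..] match key[j..]?
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words, or one word and then tries again
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // key used up but the pattern still expects a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("audit.#", "audit.irs.corporate")); // prints true
        System.out.println(matches("audit.*", "audit.irs"));           // prints true
        System.out.println(matches("audit.*", "audit.irs.corporate")); // prints false
    }
}
```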
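Headers Exchange
The headers exchange type is used less often. It is another routing method that ignores the routing key and matches on message headers instead.
Headers are key-value pairs and can be defined in a Hashtable. The sender defines some key-value pairs when publishing, and the receiver passes in key-value pairs when binding; if the two match, the bound queue receives the message. There are two matching modes, all and any, and the receiver selects one with the "x-match" key in the binding arguments.
all means every one of the defined key-value pairs must match, while any means a single matching pair is enough.
The routing key of fanout, direct, and topic exchanges must be a string, whereas a headers exchange has no such restriction because the values in the key-value pairs can be of any type.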
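The all/any rule can be illustrated in plain Java. The following is a minimal sketch rather than the broker's implementation: `HeadersMatchSketch` and `matches` are hypothetical names, and the only special key it knows about is "x-match", which is treated as "all" when it is absent or has any value other than "any".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Sketch of headers-exchange matching with x-match = all / any.
class HeadersMatchSketch {
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        boolean any = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> pair : bindingArgs.entrySet()) {
            if (pair.getKey().equals("x-match")) {
                continue; // the mode selector itself is not compared
            }
            boolean hit = messageHeaders.containsKey(pair.getKey())
                    && Objects.equals(pair.getValue(), messageHeaders.get(pair.getKey()));
            if (any && hit) {
                return true;  // any: one matching pair is enough
            }
            if (!any && !hit) {
                return false; // all: a single mismatch rejects the message
            }
        }
        return !any; // all: nothing failed; any: nothing matched
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "any");
        binding.put("aaa", "01234");
        binding.put("bbb", "56789");

        Map<String, Object> message = new HashMap<>();
        message.put("aaa", "01234");

        System.out.println(matches(binding, message)); // prints true
        binding.put("x-match", "all");
        System.out.println(matches(binding, message)); // prints false
    }
}
```

The sample values mirror the producer and consumer in this section: the message carries only "aaa", so it reaches the queue under any but would not under all.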
The message producer is as follows:
package cn.slimsmart.rabbitmq.demo.headers;

import java.util.Date;
import java.util.Hashtable;
import java.util.Map;
import org.springframework.amqp.core.ExchangeTypes;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String EXCHANGE_NAME = "header-exchange";

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        // Create the connection and the channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.36.102");
        // User name and password
        factory.setUsername("admin");
        factory.setPassword("admin");
        // Port
        factory.setPort(AMQP.PROTOCOL.PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange with type headers (non-durable, auto-delete)
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS, false, true, null);
        String message = new Date().toLocaleString() + " : log something";
        // Headers attached to the message
        Map<String, Object> headers = new Hashtable<String, Object>();
        headers.put("aaa", "01234");
        Builder properties = new BasicProperties.Builder();
        properties.headers(headers);
        // Publish to the exchange; routing is decided by the headers, not the routing key
        channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes());
        System.out.println("Sent message :'" + message + "'");
        channel.close();
        connection.close();
    }
}
The message consumer is as follows:
package cn.slimsmart.rabbitmq.demo.headers;

import java.util.Hashtable;
import java.util.Map;
import org.springframework.amqp.core.ExchangeTypes;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {
    private final static String EXCHANGE_NAME = "header-exchange";
    private final static String QUEUE_NAME = "header-queue";

    public static void main(String[] args) throws Exception {
        // Create the connection and the channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.36.102");
        // User name and password
        factory.setUsername("admin");
        factory.setPassword("admin");
        // Port
        factory.setPort(AMQP.PROTOCOL.PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange with type headers (non-durable, auto-delete)
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS, false, true, null);
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        Map<String, Object> headers = new Hashtable<String, Object>();
        headers.put("x-match", "any"); // "all" or "any"
        headers.put("aaa", "01234");
        headers.put("bbb", "56789");
        // Bind the queue to the exchange, passing the header pairs as binding arguments
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Start consuming; the second argument turns on automatic acknowledgement,
        // so no manual ack is needed
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }
}
Default Exchange
Besides the four types above there is also the default exchange, which is a special direct exchange.
When you create a queue, the broker automatically binds it to a direct exchange whose name is the empty string, using the queue name as the routing key. Thanks to this default exchange and binding, we can appear to work with queues directly, as with lighter-weight queues such as Redis. It only looks that way, though: in RabbitMQ it is not possible to operate on a queue directly. A message is always sent to an exchange first, routed by the exchange to a queue, and then fetched from the queue by a consumer. The default exchange and its bindings let us think only at the queue level, which suits simple applications. It does not make use of RabbitMQ's full capabilities, however; using RabbitMQ only this way is like using a sledgehammer to crack a nut.
2. Temporary queues
Sometimes a producer and a consumer need a fresh queue rather than an existing one. Temporary queues exist for this case:
First, whenever we connect to RabbitMQ we need a new, empty queue. We can create it with a random name ourselves, or let the server choose a random queue name for us.
Second, once the consumer disconnects, the queue should be deleted automatically. In the Java client, the no-argument queueDeclare() creates a non-durable, exclusive, auto-delete queue with a server-generated name.
String queueName = channel.queueDeclare().getQueue();
The code above returns a random queue name, for example amq.gen-jzty20brgko-hjmujj0wlg.
3. Bindings
Once we have created a fanout exchange and a queue, we can bind the queue to the exchange.
channel.queueBind(queueName, "logs", "");
After this code runs, the logs exchange will append messages to our queue.
V. Implementing RPC with RabbitMQ
RabbitMQ can be used to implement RPC, since the two have much in common. Implementing RPC on RabbitMQ involves the following steps:
1. Client interface
To show how the RPC service can be used, we create a simple client class. It exposes a single method that sends an RPC request and blocks until the result is received.
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);
2. Callback queue
Doing RPC over RabbitMQ is generally straightforward: the client sends a request message and the server replies with a response message. To receive the response, the client needs to send the name of a callback queue along with the request. A server-named default queue can be used:
// Let the server generate a name for the callback queue
String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder()
        .replyTo(callbackQueueName)
        .build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
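3. Message properties
The AMQP protocol predefines a set of 14 message properties. Most of them are rarely used, with the following exceptions:
deliveryMode: marks the delivery mode of the message: 2 means persistent, any other value means transient. It was also covered in the message persistence part of the work queue section.
contentType: the content type, describing the MIME type of the encoding. For example, for JSON it is common to set this property to application/json.
replyTo: usually the name of the callback queue.
correlationId: the correlation ID, used to associate an RPC response with its request.
We need to add a new import:
import com.rabbitmq.client.AMQP.BasicProperties;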
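4. Correlation Id
The approach above creates a callback queue for every RPC request, which is inefficient. Fortunately there is a better way: create a single callback queue per client.
This raises a new problem: when a response arrives on that queue, it is not clear which request it belongs to. This is where the correlationId property comes in. We set it to a unique value for every request. Later, when a message arrives on the callback queue, we look at this property and use it to match the response to the request. If we see a message with an unknown correlationId value, we can safely discard it, because it does not belong to any of our requests.
You may ask why unknown messages in the callback queue should be ignored rather than treated as an error. The reason is a possible race condition on the server side. Although unlikely, the RPC server could die just after sending us the answer but before sending the acknowledgement for the request. If that happens, the restarted RPC server processes the request again and sends a second response, whose correlationId the client no longer recognizes. That is why the client must handle duplicate responses gracefully, and why the RPC should ideally be idempotent.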
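The matching of responses to requests can be sketched without a broker. In the plain-Java example below, `CorrelationSketch`, `expect`, and `onReply` are hypothetical names: the client keeps a table of pending requests keyed by correlationId, completes the matching entry when a reply arrives, and silently ignores unknown or duplicate ids, as described above.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of matching replies to requests by correlation id (no broker involved).
class CorrelationSketch {
    // Requests that have been sent but not yet answered
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register an outgoing request under its correlation id.
    CompletableFuture<String> expect(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called for every message that arrives on the callback queue.
    // Returns false when the reply is ignored (unknown or already handled id).
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future =
                correlationId == null ? null : pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        CorrelationSketch client = new CorrelationSketch();
        String corrId = UUID.randomUUID().toString();
        CompletableFuture<String> result = client.expect(corrId);

        client.onReply("some-other-id", "stale"); // unknown id: ignored
        client.onReply(corrId, "3");              // our reply
        client.onReply(corrId, "3");              // duplicate: ignored

        System.out.println("fib(4) is " + result.join()); // prints "fib(4) is 3"
    }
}
```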
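5. Implementation
Our RPC works like this:
When the client starts up, it creates an anonymous callback queue.
For each RPC request, the client sets two properties: replyTo, which is set to the callback queue name, and correlationId, which identifies the request.
The request is sent to the rpc_queue queue.
The RPC server listens for requests on rpc_queue. When a request arrives, the server does the work and sends a message with the result back to the client, using the queue named in the replyTo property.
The client listens on the callback queue. When a message arrives, it checks the correlationId property; if it matches the value from the request, that message is the result.
RPC server (RPCServer.java):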
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * RPC server
 *
 * @author arron
 * @date 2015-09-30 15:49:01
 * @version 1.0
 */
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Host name or IP address of the RabbitMQ server
        factory.setHost("127.0.0.1");
        // Open a connection
        Connection connection = factory.newConnection();
        // Open a channel
        Channel channel = connection.createChannel();
        // Declare the request queue
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        // Limit: send at most one unacknowledged message to a consumer at a time
        channel.basicQos(1);
        // Create a consumer on rpc_queue to handle requests (manual ack)
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Copy the correlationId of the request into the properties of the reply
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties.Builder()
                    .correlationId(props.getCorrelationId())
                    .build();
            // Name of the callback queue
            String callQueueName = props.getReplyTo();
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [.] fib(" + message + ")");
            // Compute the result
            String response = "" + fib(Integer.parseInt(message));
            // First send the result to the callback queue
            channel.basicPublish("", callQueueName, replyProps, response.getBytes("UTF-8"));
            // Then acknowledge the request manually
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    /**
     * Computes the n-th term of the Fibonacci sequence.
     *
     * @param n index of the term, must be greater than or equal to 0
     * @return the n-th Fibonacci number
     * @throws Exception if n is negative
     */
    private static int fib(int n) throws Exception {
        if (n < 0)
            throw new Exception("Invalid argument: n must be greater than or equal to 0");
        if (n == 0)
            return 0;
        if (n == 1)
            return 1;
        return fib(n - 1) + fib(n - 2);
    }
}
RPC client (RPCClient.java):
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * RPC client
 *
 * @author arron
 * @date 2015-09-30 15:44:43
 * @version 1.0
 */
public class RPCClient {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    private Connection connection;
    private Channel channel;
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Host name or IP address of the RabbitMQ server
        factory.setHost("127.0.0.1");
        // Open a connection
        connection = factory.newConnection();
        // Open a channel
        channel = connection.createChannel();
        // Declare the request queue
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        // Get a randomly named callback queue for this client
        replyQueueName = channel.queueDeclare().getQueue();
        // Create a consumer for this client (it listens on the callback queue for results)
        consumer = new QueueingConsumer(channel);
        // Attach the consumer to the callback queue
        channel.basicConsume(replyQueueName, true, consumer);
    }

    /**
     * Requests a Fibonacci number from the RPC server.
     *
     * @param message the index n, as a string
     * @return the server's response
     * @throws Exception on connection or I/O problems
     */
    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();
        // Set the replyTo and correlationId properties
        BasicProperties props = new BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        // Send the request to rpc_queue
        channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Only accept the reply that carries our correlationId
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return response;
    }

    // Release the connection so the program can exit
    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        RPCClient fibonacciRpc = new RPCClient();
        String result = fibonacciRpc.call("4");
        System.out.println("fib(4) is " + result);
        fibonacciRpc.close();
    }
}
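The example here is just one way to implement an RPC service on RabbitMQ; you can build more on it according to your needs. One advantage of this approach to RPC is that if a single RPC server cannot keep up, you can simply add a second, a third, and so on. The code in our example is still fairly simple and leaves many questions open:
How should the client react if no server is running?
What should happen if the client's RPC request times out?
If the server malfunctions and raises an exception, should that exception be forwarded to the client?
How can invalid incoming messages be guarded against before processing, for example by checking range and type?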
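As one possible answer to the timeout question, the client could wait for its reply with a deadline instead of blocking forever. The sketch below is plain Java and makes several assumptions: `RpcTimeoutSketch`, `Reply`, and `awaitReply` are hypothetical names, and a `BlockingQueue` stands in for the callback queue. It is not part of the RabbitMQ client.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: wait for an RPC reply with a deadline instead of blocking forever.
class RpcTimeoutSketch {
    // A reply as it might arrive on the callback queue (hypothetical type).
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Returns the body of the reply with the expected id, or throws
    // TimeoutException if it does not arrive within timeoutMillis.
    static String awaitReply(BlockingQueue<Reply> replies, String corrId, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            long remaining = deadline - System.nanoTime();
            Reply reply = remaining > 0 ? replies.poll(remaining, TimeUnit.NANOSECONDS) : null;
            if (reply == null) {
                throw new TimeoutException("no reply for " + corrId);
            }
            if (corrId.equals(reply.correlationId)) {
                return reply.body;
            }
            // Unrelated reply: ignore it and keep waiting until the deadline
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Reply> callbackQueue = new LinkedBlockingQueue<>();
        callbackQueue.add(new Reply("id-1", "3"));
        System.out.println(awaitReply(callbackQueue, "id-1", 500)); // prints 3
        try {
            awaitReply(callbackQueue, "id-2", 50);
        } catch (TimeoutException e) {
            System.out.println("timed out"); // prints "timed out"
        }
    }
}
```

What the caller does after a timeout (retry, fail, or report an error) is a design decision that depends on whether the RPC is idempotent.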