±à¼ÍƼö: |
±¾ÆªÎÄÕ¼òµ¥½éÉÜÁËRabbitMQÔÚÎÒÃÇÏîÄ¿¿ª·¢Öг£Óõļ¸ÖÖÌØÐÔ¡£ÕâÐ©ÌØÐÔ¿ÉÒÔ°ïÖúÎÒÃǸüºÃµÄ½«RabbitÓÃÓÚÎÒÃDz»Í¬µÄÒµÎñ³¡¾°ÖС£
À´×ÔÓÚ²©¿ÍÔ°,ÓÉ»ðÁú¹ûÈí¼þAnna±à¼¡¢ÍƼö¡£ |
|
·Ö²¼Ê½ÏµÍ³ÏûÏ¢Öмä¼þ¡ª¡ªRabbitMQµÄʹÓýø½×ƪ#
ǰÑÔ#
ÉÏһƪÎÄÕ·ֲ¼Ê½ÏµÍ³ÏûÏ¢Öмä¼þ¡ª¡ªRabbitMQµÄʹÓûù´¡Æª ¼òµ¥×ܽáÁË·Ö²¼Ê½ÏµÍ³ÖеÄÏûÏ¢Öмä¼þÒÔ¼°RabbitMQµÄ»ù±¾Ê¹Óã¬ÕâÆªÎÄÕÂÖ÷Òª×ܽáÒ»ÏÂRabbitMQÔÚÈÕ³£ÏîÄ¿¿ª·¢ÖбȽϳ£ÓõöÌØÐÔ¡£
Ò» mandatory ²ÎÊý#
ÉÏһƪÎÄÕÂÖÐÎÒÃÇÖªµÀ£¬Éú²úÕß½«ÏûÏ¢·¢Ë͵½RabbitMQµÄ½»»»Æ÷ÖÐͨ¹ýRoutingKeyÓëBindingKeyµÄÆ¥Å佫֮·Óɵ½¾ßÌåµÄ¶ÓÁÐÖÐÒÔ¹©Ïû·ÑÕßÏû·Ñ¡£ÄÇôµ±ÎÒÃÇͨ¹ýÆ¥Å乿ÔòÕÒ²»µ½¶ÓÁеÄʱºò£¬ÏûÏ¢½«ºÎÈ¥ºÎ´ÓÄØ?Rabbit¸øÎÒÃÇÌṩÁËÁ½ÖÖ·½Ê½¡£mandatoryÓ뱸·Ý½»»»Æ÷¡£
mandatory²ÎÊýÊÇchannel.BasicPublish·½·¨ÖеIJÎÊý¡£ÆäÖ÷Òª¹¦ÄÜÊÇÏûÏ¢´«µÝ¹ý³ÌÖв»¿É´ïÄ¿µÄµØÊ±½«ÏûÏ¢·µ»Ø¸øÉú²úÕß¡£µ±mandatory
²ÎÊýÉèΪtrue ʱ£¬½»»»Æ÷ÎÞ·¨¸ù¾Ý×ÔÉíµÄÀàÐͺÍ·ÓɼüÕÒµ½Ò»¸ö·ûºÏÌõ¼þµÄ¶ÓÁУ¬ÄÇôRabbitMQ
»áµ÷ÓÃBasicReturn ÃüÁÏûÏ¢·µ»Ø¸øÉú²úÕß¡£µ±mandatory ²ÎÊýÉèÖÃΪfalse ʱ¡£ÔòÏûÏ¢Ö±½Ó±»¶ªÆú¡£ÆäÔËתÁ÷³ÌÓëʵÏÖ´úÂëÈçÏÂ(ÒÔC#
RabbitMQ.Client 3.6.9ΪÀý):

//Á¬½ÓÓë´´½¨ÐŵÀ--ºóÐøµÄʾÀý´úÂëÎÒÃÇ»áÊ¡ÂÔµôÕⲿ·Ö´úÂëºÍÊÍ·ÅÁ¬½Ó
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.HostName = "192.168.121.205";
IConnection conn = factory.CreateConnection();//Á¬½ÓRabbit
IModel channel = conn.CreateModel();//´´½¨ÐŵÀ
channel.ExchangeDeclare("exchangeName",
"direct", true);//¶¨Òå½»»»Æ÷
String queueName = channel.QueueDeclare("TestQueue",
true, false, false, null).QueueName;//¶¨Òå ¶ÓÁÐ
¶ÓÁÐÃûTestQueue,³Ö¾Ã»¯µÄ,·ÇÅÅËüµÄ,·Ç×Ô¶¯É¾³ýµÄ¡£
channel.QueueBind(queueName, "exchangeName",
"routingKey");//¶ÓÁа󶨽»»»Æ÷
var message = Encoding.UTF8.GetBytes("TestMsg");
channel.BasicPublish("exchangeName",
"routingKey", true, null, message);//·¢²¼Ò»¸ö¿ÉÒÔ·Óɵ½¶ÓÁеÄÏûÏ¢£¬mandatory²ÎÊýÉèÖÃΪtrue
var message1 = Encoding.UTF8.GetBytes("TestMsg1");
channel.BasicPublish("exchangeName",
"routingKey1", true, null, message);//·¢²¼Ò»¸ö²»¿ÉÒÔ·Óɵ½¶ÓÁеÄÏûÏ¢£¬mandatory²ÎÊýÉèÖÃΪtrue
//Éú²úÕ߻ص÷º¯Êý
channel.BasicReturn += (model, ea) =>
{
//do something... ÏûÏ¢Èô²»ÄÜ·Óɵ½¶ÓÁÐÔò»áµ÷Óô˻ص÷º¯Êý¡£
};
//¹Ø±ÕÐŵÀÓëÁ¬½Ó
channel.close();
conn.close() ; |
¶þ ±¸·Ý½»»»Æ÷#
µ±ÏûÏ¢²»ÄÜ·Óɵ½¶ÓÁÐʱ£¬Í¨¹ýmandatoryÉèÖòÎÊý,ÎÒÃÇ¿ÉÒÔ½«ÏûÏ¢·µ»Ø¸øÉú²úÕß´¦Àí¡£µ«ÕâÑù»áÓÐÒ»¸öÎÊÌ⣬¾ÍÊÇÉú²úÕßÐèÒª¿ªÒ»¸ö»Øµ÷µÄº¯ÊýÀ´´¦Àí²»ÄÜ·Óɵ½µÄÏûÏ¢£¬ÕâÎÞÒÉ»áÔö¼ÓÉú²úÕߵĴ¦ÀíÂß¼¡£±¸·Ý½»»»Æ÷(Altemate
Exchange)ÔòÌṩÁËÁíÒ»ÖÖ·½Ê½À´´¦Àí²»ÄÜ·ÓɵÄÏûÏ¢¡£±¸·Ý½»»»Æ÷¿ÉÒÔ½«Î´±»Â·ÓɵÄÏûÏ¢´æ´¢ÔÚRabbitMQÖУ¬ÔÚÐèÒªµÄʱºòÈ¥´¦ÀíÕâЩÏûÏ¢¡£ÆäÖ÷ҪʵÏÖ´úÂëÈçÏÂ:
IDictionary<string,
object> args = new Dictionary<string,
object>();
args.Add("alternate-exchange", "altExchange");
channel.ExchangeDeclare("normalExchange",
"direct", true, false, args);//¶¨ÒåÆÕͨ½»»»Æ÷²¢Ìí¼Ó±¸·Ý½»»»Æ÷²ÎÊý
channel.ExchangeDeclare("altExchange",
"fanout", true, false, null); //¶¨Ò屸·Ý½»»»Æ÷£¬²¢ÉùÃ÷ΪÉÈÐν»»»Æ÷
channel.QueueDeclare("normalQueue",
true, false, false, null);//¶¨ÒåÆÕͨ¶ÓÁÐ
channel.QueueBind("normalQueue", "normalExchange",
"NormalRoutingKey1");//ÆÕͨ¶ÓÁжÓÁÐ°ó¶¨ÆÕͨ½»»»Æ÷
channel.QueueDeclare("altQueue",
true, false, false, null);//¶¨Ò屸·Ý¶ÓÁÐ
channel.QueueBind("altQueue", "altExchange",
"");//°ó¶¨±¸·Ý¶ÓÁÐÓë½»»»Æ÷
var msg1 = Encoding.UTF8.GetBytes("TestMsg");
channel.BasicPublish("normalExchange",
"NormalRoutingKey1", false, null,
msg1);//·¢²¼Ò»¸ö¿ÉÒÔ·Óɵ½¶ÓÁеÄÏûÏ¢£¬ÏûÏ¢×îÖÕ»á·Óɵ½normalQueue
var msg2 = Encoding.UTF8.GetBytes("TestMsg1");
channel.BasicPublish("normalExchange",
"NormalRoutingKey2", false, null,
msg2);//·¢²¼Ò»¸ö²»¿ÉÒÔ±»Â·ÓɵÄÏûÏ¢£¬ÏûÏ¢×îÖÕ»á½øÈëaltQueue |

±¸·Ý½»»»Æ÷ÆäʵºÍÆÕͨµÄ½»»»Æ÷ûÓÐÌ«´óµÄÇø±ð£¬ÎªÁË·½±ãʹÓ㬽¨ÒéÉèÖÃΪfanoutÀàÐÍ£¬ÈôÉèÖÃΪdirect
»òÕßtopicµÄÀàÐÍ¡£ÐèҪעÒâµÄÊÇ£¬ÏûÏ¢±»ÖØÐ·¢Ë͵½±¸·Ý½»»»Æ÷ʱµÄ·ÓɼüºÍ´ÓÉú²úÕß·¢³öµÄ·ÓɼüÊÇÒ»ÑùµÄ¡£¿¼ÂÇÕâÑùÒ»ÖÖÇé¿ö£¬Èç¹û±¸·Ý½»»»Æ÷µÄÀàÐÍÊÇdirect,²¢ÇÒÓÐÒ»¸öÓëÆä°ó¶¨µÄ¶ÓÁУ¬¼ÙÉè°ó¶¨µÄ·ÓɼüÊÇkey1£¬µ±Ä³ÌõЯ´øÂ·ÓɼüΪkey2
µÄÏûÏ¢±»×ª·¢µ½Õâ¸ö±¸·Ý½»»»Æ÷µÄʱºò£¬±¸·Ý½»»»Æ÷ûÓÐÆ¥Åäµ½ºÏÊʵĶÓÁУ¬ÔòÏûÏ¢¶ªÊ§¡£Èç¹ûÏûϢЯ´øµÄ·ÓɼüΪkeyl£¬Ôò¿ÉÒÔ´æ´¢µ½¶ÓÁÐÖС£
¶ÔÓÚ±¸·Ý½»»»Æ÷£¬ÓÐÒÔϼ¸ÖÖÌØÊâÇé¿ö:
Èç¹ûÉèÖõı¸·Ý½»»»Æ÷²»´æÔÚ£¬¿Í»§¶ËºÍRabbitMQ ·þÎñ¶Ë¶¼²»»áÓÐÒì³£³öÏÖ£¬´ËʱÏûÏ¢»á¶ªÊ§¡£
Èç¹û±¸·Ý½»»»Æ÷ûÓаó¶¨ÈκζÓÁУ¬¿Í»§¶ËºÍRabbitMQ ·þÎñ¶Ë¶¼²»»áÓÐÒì³£³öÏÖ£¬´ËʱÏûÏ¢»á¶ªÊ§¡£
Èç¹û±¸·Ý½»»»Æ÷ûÓÐÈÎºÎÆ¥ÅäµÄ¶ÓÁУ¬¿Í»§¶ËºÍRabbitMQ ·þÎñ¶Ë¶¼²»»áÓÐÒì³£³öÏÖ£¬´ËʱÏûÏ¢»á¶ªÊ§¡£
Èç¹û±¸·Ý½»»»Æ÷ºÍmandatory²ÎÊýÒ»ÆðʹÓã¬ÄÇômandatory²ÎÊýÎÞЧ¡£
Èý ¹ýÆÚʱ¼ä(TTL)#
3.1 ÉèÖÃÏûÏ¢µÄTTL#
ĿǰÓÐÁ½ÖÖ·½·¨¿ÉÒÔÉèÖÃÏûÏ¢µÄTTL¡£µÚÒ»ÖÖ·½·¨ÊÇͨ¹ý¶ÓÁÐÊôÐÔÉèÖ㬶ÓÁÐÖÐËùÓÐÏûÏ¢¶¼ÓÐÏàͬµÄ¹ýÆÚʱ¼ä¡£µÚ¶þÖÖ·½·¨ÊǶÔÏûÏ¢±¾Éí½øÐе¥¶ÀÉèÖã¬Ã¿ÌõÏûÏ¢µÄTTL¿ÉÒÔ²»Í¬¡£Èç¹ûÁ½ÖÖ·½·¨Ò»ÆðʹÓã¬ÔòÏûÏ¢µÄTTL
ÒÔÁ½ÕßÖ®¼ä½ÏСµÄÄǸöÊýֵΪ׼¡£ÏûÏ¢ÔÚ¶ÓÁÐÖеÄÉú´æÊ±¼äÒ»µ©³¬¹ýÉèÖõÄTTLֵʱ£¬¾Í»á±ä³É"ËÀÐÅ"
(Dead Message) £¬Ïû·ÑÕß½«ÎÞ·¨ÔÙÊÕµ½¸ÃÏûÏ¢¡£(ÓйØËÀÐŶÓÁÐÇëÍùÏ¿´)
ͨ¹ý¶ÓÁÐÊôÐÔÉèÖÃÏûÏ¢TTLµÄ·½·¨ÊÇÔÚchannel.QueueDeclare·½·¨ÖмÓÈëx-message-ttl²ÎÊýʵÏֵģ¬Õâ¸ö²ÎÊýµÄµ¥Î»ÊǺÁÃ롣ʾÀý´úÂëÏÂ:
IDictionary<string,
object> args = new Dictionary<string,
object>();
args.Add("x-message-ttl", 6000);
channel.QueueDeclare("ttlQueue", true,
false, false, args);
|
Èç¹û²»ÉèÖÃTTL.Ôò±íʾ´ËÏûÏ¢²»»á¹ýÆÚ;Èç¹û½«TTLÉèÖÃΪ0 £¬Ôò±íʾ³ý·Ç´Ëʱ¿ÉÒÔÖ±½Ó½«ÏûϢͶµÝµ½Ïû·ÑÕߣ¬·ñÔò¸ÃÏûÏ¢»á±»Á¢¼´¶ªÆú(»òÓÉËÀÐŶÓÁÐÀ´´¦Àí)¡£
Õë¶ÔÿÌõÏûÏ¢ÉèÖÃTTLµÄ·½·¨ÊÇÔÚchannel.BasicPublish·½·¨ÖмÓÈëExpirationµÄÊôÐÔ²ÎÊý£¬µ¥Î»ÎªºÁÃë¡£¹Ø¼ü´úÂëÈçÏ£º
BasicProperties
properties = new BasicProperties()
{
Expiration = "20000",//ÉèÖÃTTLΪ20000ºÁÃë
};
var message = Encoding.UTF8.GetBytes("TestMsg");
channel.BasicPublish("normalExchange",
"NormalRoutingKey", true, properties,
message); |
×¢Òâ:¶ÔÓÚµÚÒ»ÖÖÉèÖöÓÁÐTTLÊôÐԵķ½·¨£¬Ò»µ©ÏûÏ¢¹ýÆÚ£¬¾Í»á´Ó¶ÓÁÐÖÐĨȥ£¬¶øÔÚµÚ¶þÖÖ·½·¨ÖУ¬¼´Ê¹ÏûÏ¢¹ýÆÚ£¬Ò²²»»áÂíÉÏ´Ó¶ÓÁÐÖÐĨȥ£¬ÒòΪÿÌõÏûÏ¢ÊÇ·ñ¹ýÆÚÊÇÔÚ¼´½«Í¶µÝµ½Ïû·ÑÕß֮ǰÅж¨µÄ¡£Why?ÔÚµÚÒ»ÖÖ·½·¨À¶ÓÁÐÖмº¹ýÆÚµÄÏûÏ¢¿Ï¶¨ÔÚ¶ÓÁÐÍ·²¿£¬
RabbitMQ Ö»Òª¶¨ÆÚ´Ó¶ÓÍ·¿ªÊ¼É¨ÃèÊÇ·ñÓйýÆÚµÄÏûÏ¢¼´¿É¡£¶øµÚ¶þÖÖ·½·¨ÀÿÌõÏûÏ¢µÄ¹ýÆÚʱ¼ä²»Í¬£¬Èç¹ûҪɾ³ýËùÓйýÆÚÏûÏ¢ÊÆ±ØÒªÉ¨ÃèÕû¸ö¶ÓÁУ¬ËùÒÔ²»ÈçµÈµ½´ËÏûÏ¢¼´½«±»Ïû·ÑʱÔÙÅж¨ÊÇ·ñ¹ýÆÚ£¬Èç¹û¹ýÆÚÔÙ½øÐÐɾ³ý¼´¿É¡£
3.2 ÉèÖöÓÁеÄTTL#
×¢Ò⣬ÕâÀïºÍÉÏÊöͨ¹ý¶ÓÁÐÉèÖÃÏûÏ¢µÄTTL²»Í¬¡£ÉÏÃæÉ¾³ýµÄÊÇÏûÏ¢£¬¶øÕâÀïɾ³ýµÄÊǶÓÁС£Í¨¹ýchannel.QueueDeclare
·½·¨ÖеÄx-expires²ÎÊý¿ÉÒÔ¿ØÖƶÓÁб»×Ô¶¯É¾³ýǰ´¦ÓÚδʹÓÃ״̬µÄʱ¼ä¡£Õâ¸öδʹÓõÄÒâ˼ÊǶÓÁÐÉÏûÓÐÈκεÄÏû·ÑÕߣ¬¶ÓÁÐҲûÓб»ÖØÐÂÉùÃ÷£¬²¢ÇÒÔÚ¹ýÆÚʱ¼ä¶ÎÄÚҲδµ÷Óùýchannel.BasicGetÃüÁî¡£
ÉèÖöÓÁÐÀïµÄTTL¿ÉÒÔÓ¦ÓÃÓÚÀàËÆRPC·½Ê½µÄ»Ø¸´¶ÓÁУ¬ÔÚRPCÖУ¬Ðí¶à¶ÓÁлᱻ´´½¨³öÀ´£¬µ«ÊÇÈ´ÊÇδ±»Ê¹ÓõÄ(ÓйØRabbitMQʵÏÖRPCÇëÍùÏ¿´)¡£RabbitMQ»áÈ·±£ÔÚ¹ýÆÚʱ¼äµ½´ïºó½«¶ÓÁÐɾ³ý£¬µ«ÊDz»±£ÕÏɾ³ýµÄ¶¯×÷Óж༰ʱ¡£ÔÚRabbitMQ
ÖØÆôºó£¬ ³Ö¾Ã»¯µÄ¶ÓÁеĹýÆÚʱ¼ä»á±»ÖØÐ¼ÆËã¡£ÓÃÓÚ±íʾ¹ýÆÚʱ¼äµÄx-expires²ÎÊýÒÔºÁÃëΪµ¥Î»£¬
¾®ÇÒ·þ´ÓºÍx-message-ttlÒ»ÑùµÄÔ¼ÊøÌõ¼þ£¬²»Í¬µÄÊÇËü²»ÄÜÉèÖÃΪ0(»á±¨´í)¡£
ʾÀý´úÂëÈçÏ£º
IDictionary<string,
object> args = new Dictionary<string,
object>();
args.Add("x-expires", 6000);
channel.QueueDeclare("ttlQueue", false,
false, false, args); |
ËÄ ËÀÐŶÓÁÐ#
DLX(Dead-Letter-Exchange)ËÀÐŽ»»»Æ÷£¬µ±ÏûÏ¢ÔÚÒ»¸ö¶ÓÁÐÖбä³ÉËÀÐÅÖ®ºó£¬ËüÄܱ»ÖØÐ±»·¢Ë͵½ÁíÒ»¸ö½»»»Æ÷ÖУ¬Õâ¸ö½»»»Æ÷¾ÍÊÇDLX
£¬°ó¶¨DLXµÄ¶ÓÁоͳÆÖ®ÎªËÀÐŶÓÁС£
ÏûÏ¢±ä³ÉËÀÐÅÖ÷ÒªÓÐÒÔϼ¸ÖÖÇé¿ö:
ÏûÏ¢±»¾Ü¾ø(BasicReject/BasicNack) £¬¾®ÇÒÉèÖÃrequeue ²ÎÊýΪfalse;(Ïû·ÑÕßÈ·ÈÏ»úÖÆ½«»áÔÚÏÂһƪÎÄÕÂÖÐÉæ¼°)
ÏûÏ¢¹ýÆÚ;
¶ÓÁдﵽ×î´ó³¤¶È¡£
DLXÒ²ÊÇÒ»¸öÕý³£µÄ½»»»Æ÷£¬ºÍÒ»°ãµÄ½»»»Æ÷ûÓÐÇø±ð£¬ËüÄÜÔÚÈκεĶÓÁÐÉϱ»Ö¸¶¨£¬Êµ¼ÊÉϾÍÊÇÉèÖÃij¸ö¶ÓÁеÄÊôÐÔ¡£µ±Õâ¸ö¶ÓÁÐÖдæÔÚËÀÐÅʱ£¬RabbitMQ
¾Í»á×Ô¶¯µØ½«Õâ¸öÏûÏ¢ÖØÐ·¢²¼µ½ÉèÖõÄDLXÉÏÈ¥£¬½ø¶ø±»Â·Óɵ½ÁíÒ»¸ö¶ÓÁУ¬¼´ËÀÐŶÓÁС£¿ÉÒÔ¼àÌýÕâ¸ö¶ÓÁÐÖеÄÏûÏ¢¡¢ÒÔ½øÐÐÏàÓ¦µÄ´¦Àí¡£
ͨ¹ýÔÚchannel.QueueDeclare ·½·¨ÖÐÉèÖÃx-dead-letter-exchange²ÎÊýÀ´ÎªÕâ¸ö¶ÓÁÐÌí¼ÓDLX¡£ÆäʾÀý´úÂëÈçÏÂ:
channel.ExchangeDeclare
("exchange.dlx", "direct",
true);//¶¨ÒåËÀÐŽ»»»Æ÷
channel.ExchangeDeclare ("exchange.normal",
"direct", true);//¶¨ÒåÆÕͨ½»»»Æ÷
IDictionary<String, Object> args = new
Dictionary<String, Object>();
args.Add("x-message-ttl",10000);//¶¨ÒåÏûÏ¢¹ýÆÚʱ¼äΪ10000ºÁÃë
args.Add("x-dead-letter-exchange",
"exchange.dlx");//¶¨Òåexchange.dlxΪËÀÐŽ»»»Æ÷
args.Add ("x-dead-letter-routing-key",
"routingkey");//¶¨ÒåËÀÐŽ»»»Æ÷µÄ°ó¶¨key,ÕâÀïÒ²¿ÉÒÔ²»Ö¸¶¨£¬ÔòĬÈÏʹÓÃÔ¶ÓÁеÄ·ÓÉkey
channel.QueueDeclare ("queue.normal",
true, false, false, args);//¶¨ÒåÆÕͨ¶ÓÁÐ
channel.QueueBind ("queue.normal",
"exchange.normal", "normalKey");//ÆÕͨ¶ÓÁн»»»Æ÷°ó¶¨
channel.QueueDeclare("queue.dlx",
true, false, false, null);//¶¨ÒåËÀÐŶÓÁÐ
channel.QueueBind("queue.dlx", "exchange.dlx",
"routingkey");//ËÀÐŶÓÁн»»»Æ÷°ó¶¨,ÈôÉÏ·½ÎªÖƶ¨ËÀÐŶÓÁзÓÉkeyÔòÕâÀïÐèҪʹÓÃÔ¶ÓÁеÄ·ÓÉkey
//·¢²¼ÏûÏ¢
var message = Encoding.UTF8.GetBytes("TestMsg");
channel.BasicPublish("exchange.normal",
"normalKey", null, message) ;
|
ÒÔÏÂΪËÀÐŶÓÁеÄÔËתÁ÷³Ì:

Îå ÑÓ³Ù¶ÓÁÐ#
RabbitMQ±¾Éí²¢Î´ÌṩÑÓ³Ù¶ÓÁеŦÄÜ¡£ÑÓ³Ù¶ÓÁÐÊÇÒ»¸öÂß¼ÉϵĸÅÄ¿ÉÒÔͨ¹ý¹ýÆÚʱ¼ä+ËÀÐŶÓÁÐÀ´Ä£ÄâËüµÄʵÏÖ¡£ÑÓ³Ù¶ÓÁеÄÂß¼¼Ü¹¹´óÖÂÈçÏÂ:

Éú²úÕß½«ÏûÏ¢·¢Ë͵½¹ýÆÚʱ¼äΪnµÄ¶ÓÁÐÖУ¬Õâ¸ö¶ÓÁв¢Î´ÓÐÏû·ÑÕßÀ´Ïû·ÑÏûÏ¢£¬µ±¹ýÆÚʱ¼äµ½´ïʱ£¬ÏûÏ¢»áͨ¹ýËÀÐŽ»»»Æ÷±»×ª·¢µ½ËÀÐŶÓÁÐÖС£¶øÏû·ÑÕß´ÓËÀÐŶÓÁÐÖÐÏû·ÑÏûÏ¢¡£Õâ¸öʱºò¾Í´ïµ½ÁËÉú²úÕß·¢²¼ÁËÏûÏ¢ÔÚ½²¹ýÁËnʱ¼äºóÏû·ÑÕßÏû·ÑÁËÏûÏ¢£¬Æðµ½ÁËÑÓ³ÙÏû·ÑµÄ×÷Óá£
ÑÓ³Ù¶ÓÁÐÔÚÎÒÃǵÄÏîÄ¿ÖпÉÒÔÓ¦ÓÃÓںܶೡ¾°£¬È磺ϵ¥ºóÁ½¸öÏûϢȡÏû¶©µ¥£¬ÆßÌì×Ô¶¯ÊÕ»õ£¬ÆßÌì×Ô¶¯ºÃÆÀ£¬ÃÜÂë¶³½áºó24Сʱ½â¶³£¬ÒÔ¼°ÔÚ·Ö²¼Ê½ÏµÍ³ÖÐÏûÏ¢²¹³¥»úÖÆ(1sºó²¹³¥,10sºó²¹³¥£¬5mºó²¹³¥......)¡£

Áù ÓÅÏȼ¶¶ÓÁÐ#
¾ÍÏñÎÒÃÇÉú»îÖеġ°ÌØÊ⡱ÈËʿһÑù£¬ÎÒÃǵÄÒµÎñÉÏÒ²´æÔÚһЩ¡°ÌØÊ⡱ÏûÏ¢£¬¿ÉÄÜÐèÒªÓÅÏȽøÐд¦Àí£¬ÔÚÉú»îÉÏÎÒÃÇ¿ÉÄÜ»á¶ÔÕⲿ·ÖÌØÊâÈËÊ¿¿ª±ÙÒ»Ì×VIPͨµÀ£¬¶øRabbitͬÑùÒ²ÓÐÕâÑùµÄVIPͨµÀ(ǰÌáÊÇÔÚ3.5µÄ°æ±¾ÒÔºó)£¬¼´ÓÅÏȼ¶¶ÓÁУ¬¶ÓÁÐÖеÄÏûÏ¢»áÓÐÓÅÏȼ¶ÓÅÏȼ¶¸ßµÄÏûÏ¢¾ß±¸ÓÅÏȱ»Ïû·ÑµÄÌØÈ¨¡£Õë¶ÔÕâЩVIPÏûÏ¢£¬ÎÒÃÇÖ»Ðè×öÁ½¼þÊÂ:
ÎÒÃÇÖ»Ðè×öÁ½¼þÊÂÇ飺
½«¶ÓÁÐÉùÃ÷ΪÓÅÏȼ¶¶ÓÁУ¬¼´ÔÚ´´½¨¶ÓÁеÄʱºòÌí¼Ó²ÎÊý x-max-priority ÒÔÖ¸¶¨×î´óµÄÓÅÏȼ¶£¬ÖµÎª0-255£¨ÕûÊý£©¡£
ΪÓÅÏȼ¶ÏûÏ¢Ìí¼ÓÓÅÏȼ¶¡£
ÆäʾÀý´úÂëÈçÏÂ:
channel.ExchangeDeclare("exchange.priority",
"direct", true);//¶¨Òå½»»»Æ÷
IDictionary<String, Object> args = new
Dictionary<String, Object>();
args.Add("x-max-priority", 10);//¶¨ÒåÓÅÏȼ¶¶ÓÁеÄ×î´óÓÅÏȼ¶Îª10
channel.QueueDeclare("queue.priority",
true, false, false, args);//¶¨ÒåÓÅÏȼ¶¶ÓÁÐ
channel.QueueBind("queue.priority",
"exchange.priority", "priorityKey");//¶ÓÁн»»»Æ÷°ó¶¨
BasicProperties properties = new BasicProperties()
{
Priority =8,//ÉèÖÃÏûÏ¢ÓÅÏȼ¶Îª8
};
var message = Encoding.UTF8.GetBytes("TestMsg8");
//·¢²¼ÏûÏ¢
channel.BasicPublish("exchange.priority",
"priorityKey", properties, message);
|
×¢Ò⣺ûÓÐÖ¸¶¨ÓÅÏȼ¶µÄÏûÏ¢»á½«ÓÅÏȼ¶ÒÔ0¶Ô´ý¡£ ¶ÔÓÚ³¬¹ýÓÅÏȼ¶¶ÓÁÐËù¶¨×î´óÓÅÏȼ¶µÄÏûÏ¢£¬ÓÅÏȼ¶ÒÔ×î´óÓÅÏȼ¶¶Ô´ý¡£¶ÔÓÚÏàͬÓÅÏȼ¶µÄÏûÏ¢£¬ºó½øµÄÅÅÔÚÇ°Ãæ¡£Èç¹ûÔÚÏû·ÑÕßµÄÏû·ÑËÙ¶È´óÓÚÉú²úÕßµÄËÙ¶ÈÇÒBroker
ÖÐûÓÐÏûÏ¢¶Ñ»ýµÄÇé¿öÏ£¬ ¶Ô·¢Ë͵ÄÏûÏ¢ÉèÖÃÓÅÏȼ¶Ò²¾ÍûÓÐʲôʵ¼ÊÒâÒå¡£ÒòΪÉú²úÕ߸շ¢ËÍÍêÒ»ÌõÏûÏ¢¾Í±»Ïû·ÑÕßÏû·ÑÁË£¬ÄÇô¾ÍÏ൱ÓÚBroker
ÖÐÖÁ¶àÖ»ÓÐÒ»ÌõÏûÏ¢£¬¶ÔÓÚµ¥ÌõÏûÏ¢À´ËµÓÅÏȼ¶ÊÇûÓÐʲôÒâÒåµÄ¡£
¹ØÓÚÓÅÏȼ¶¶ÓÁУ¬ºÃÏñÎ¥±³Á˶ÓÁÐÕâÖÖÊý¾Ý½á¹¹ÏȽøÏȳöµÄÔÔò£¬Æä¾ßÌåÊÇÔõôʵÏÖµÄÔÚÕâÀï¾Í²»¹ý¶àÌÖÂÛ¡£ÓÐÐËȤµÄ¿ÉÒÔ×Ô¼ºÑо¿Ñо¿¡£ºóÐø¿ÉÄÜÒ²»áÓÐÏà¹ØµÄÎÄÕÂÀ´·ÖÎöÆäÔÀí¡£
Æß RPC ʵÏÖ#
RPC,ÊÇRemote Procedure Call µÄ¼ò³Æ£¬¼´Ô¶³Ì¹ý³Ìµ÷Óá£ËüÊÇÒ»ÖÖͨ¹ýÍøÂç´ÓÔ¶³Ì¼ÆËã»úÉÏÇëÇó·þÎñ£¬¶ø²»ÐèÒªÁ˽âµ×²ãÍøÂçµÄ¼¼Êõ¡£RPC
µÄÖ÷Òª¹¦ÓÃÊÇÈù¹½¨·Ö²¼Ê½¼ÆËã¸üÈÝÒ×£¬ÔÚÌṩǿ´óµÄÔ¶³Ìµ÷ÓÃÄÜÁ¦Ê±²»Ëðʧ±¾µØµ÷ÓõÄÓïÒå¼ò½àÐÔ¡£
ÓйØRPC²»¶à½éÉÜ£¬ÕâÀïÎÒÃÇÖ÷Òª½éÉÜRabbitMQÈçºÎʵÏÖRPC¡£RabbitMQ
¿ÉÒÔʵÏֺܼòµ¥µÄRPC¡£¿Í»§¶Ë·¢ËÍÇëÇóÏûÏ¢£¬·þÎñ¶Ë»Ø¸´ÏìÓ¦µÄÏûÏ¢£¬ÎªÁ˽ÓÊÕÏìÓ¦µÄÏûÏ¢£¬ÎÒÃÇÐèÒªÔÚÇëÇóÏûÏ¢Öз¢ËÍÒ»¸ö»Øµ÷¶ÓÁÐ(¿ÉÒÔʹÓÃĬÈϵĶÓÁÐ)¡£Æä·þÎñÆ÷¶ËʵÏÖ´úÂëÈçÏÂ:
static void
Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.HostName = "192.168.121.205";
IConnection conn = factory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare("RpcQueue", true,
false, false, null);
SimpleRpcServer rpc = new MySimpRpcServer(new
Subscription(channel, "RpcQueue"));
rpc.MainLoop();
} |
public class
MySimpRpcServer: SimpleRpcServer
{
public MySimpRpcServer(Subscription subscription)
: base(subscription)
{
}
/// <summary>
/// Ö´ÐÐÍê³Éºó½øÐлص÷
/// </summary>
public override byte[] HandleSimpleCall(bool
isRedelivered, IBasicProperties requestProperties,
byte[] body, out IBasicProperties replyProperties)
{
replyProperties = null;
return Encoding.UTF8.GetBytes("ÎÒÊÕµ½ÁË!");
}
/// <summary>
/// ½øÐд¦Àí
/// </summary>
/// <param name="evt"></param>
public override void ProcessRequest (BasicDeliverEventArgs
evt)
{
// todo.....
base.ProcessRequest(evt);
}
} |
¿Í»§¶ËʵÏÖ´úÂëÈçÏÂ:
ConnectionFactory
factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.HostName = "192.168.121.205";
IConnection conn = factory.CreateConnection();
IModel channel = conn.CreateModel();
SimpleRpcClient client = new SimpleRpcClient(channel,
"RpcQueue");
var message = Encoding.UTF8.GetBytes("TestMsg8");
var result = client.Call(message);
//do somethings... |
ÒÔÉÏÊÇRabbit¿Í»§¶Ë×Ô¼º°ïÎÒÃÇ·â×°ºÃµÄRpc¿Í»§¶ËÓë·þÎñ¶ËµÄÂß¼¡£µ±È»ÎÒÃÇÒ²¿ÉÒÔ×Ô¼ºÊµÏÖ£¬Ö÷ÒªÊǽèÖúÓÚBasicPropertiesµÄÁ½¸ö²ÎÊý¡£
ReplyTo: ͨ³£ÓÃÀ´ÉèÖÃÒ»¸ö»Øµ÷¶ÓÁС£
CorrelationId : ÓÃÀ´¹ØÁªÇëÇó(request) ºÍÆäµ÷ÓÃRPC Ö®ºóµÄ»Ø¸´(response)
¡£
Æä´¦ÀíÁ÷³ÌÈçÏÂ:

1.µ±¿Í»§¶ËÆô¶¯Ê±£¬´´½¨Ò»¸öÄäÃûµÄ»Øµ÷¶ÓÁС£
2.¿Í»§¶ËΪRPC ÇëÇóÉèÖÃ2¸öÊôÐÔ: ReplyToÓÃÀ´¸æÖªRPC
·þÎñ¶Ë»Ø¸´ÇëÇóʱµÄÄ¿µÄ¶ÓÁУ¬¼´»Øµ÷¶ÓÁÐ; Correlationld ÓÃÀ´±ê¼ÇÒ»¸öÇëÇó¡£
3.ÇëÇó±»·¢Ë͵½RpcQueue¶ÓÁÐÖС£
4.RPC ·þÎñ¶Ë¼àÌýRpcQueue¶ÓÁÐÖеÄÇëÇ󣬵±ÇëÇóµ½À´Ê±£¬·þÎñ¶Ë»á´¦Àí²¢ÇÒ°Ñ´øÓнá¹ûµÄÏûÏ¢·¢Ë͸ø¿Í»§¶Ë¡£½ÓÊյĶÓÁоÍÊÇReplyToÉ趨µÄ»Øµ÷¶ÓÁС£
5.¿Í»§¶Ë¼à꿻ص÷¶ÓÁУ¬µ±ÓÐÏûϢʱ£¬¼ì²éCorrelationld
ÊôÐÔ£¬Èç¹ûÓëÇëÇóÆ¥Å䣬ÄǾÍÊǽá¹ûÁË¡£
½áÊøÓï
±¾ÆªÎÄÕ¼òµ¥½éÉÜÁËRabbitMQÔÚÎÒÃÇÏîÄ¿¿ª·¢Öг£Óõļ¸ÖÖÌØÐÔ¡£ÕâÐ©ÌØÐÔ¿ÉÒÔ°ïÖúÎÒÃǸüºÃµÄ½«RabbitÓÃÓÚÎÒÃDz»Í¬µÄÒµÎñ³¡¾°ÖС£ÕâÐ©ÌØÐÔÓëʾÀý£¬¿ÉÒÔ×Ô¼ºÔÚ³ÌÐòÖÐÔËÐÐһϣ¬È»ºóͨ¹ý²é¿´RabbitÌṩµÄweb¹ÜÀí½çÃæÀ´ÑéÖ¤ÆäÕýÈ·ÐÔ(¹ØÓÚweb¹ÜÀí½çÃæ²»¶à½éÉÜ£¬ÏàÐÅ´ó¼ÒÉÔ΢Ñо¿Ñо¿¾ÍÄÜÃ÷°×)¡£µ±È»£¬¹ØÓÚRabbitµÄʹÓã¬ÈÔÓÐÐí¶àµØ·½ÔÚ±¾ÎÄÖÐûÓÐÌá¼°£¬È磺RabbitMQµÄÌØÉ«¡ª¡ªÈ·ÈÏ»úÖÆ¡¢³Ö¾Ã»¯......½«ÔÚÏÂһƪÎÄÕÂÖÐÔÙÏêϸ½éÉÜ¡£
|