Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Modeler   Code  
»áÔ±   
 
   
 
 
     
   
 ¶©ÔÄ
  ¾èÖú
.NET »·¾³ÖÐʹÓÃRabbitMQ
 
×÷ÕߣºËÕ½ø¾ü£¬ÎÚÏþ·å À´Ô´£º²©¿ÍÔ°   ·¢²¼ÓÚ 2015-02-09
  2966  次浏览      27
 

ÔÚÆóÒµÓ¦ÓÃϵͳÁìÓò£¬»áÃæ¶Ô²»Í¬ÏµÍ³Ö®¼äµÄͨÐÅ¡¢¼¯³ÉÓëÕûºÏ£¬ÓÈÆäµ±ÃæÁÙÒ칹ϵͳʱ£¬ÕâÖÖ·Ö²¼Ê½µÄµ÷ÓÃÓëͨÐűäµÃÔ½ÖØÒª¡£Æä´Î£¬ÏµÍ³ÖÐÒ»°ã»áÓкܶà¶ÔʵʱÐÔÒªÇ󲻸ߵĵ«ÊÇÖ´ÐÐÆðÀ´±È½Ï½ÏºÄʱµÄµØ·½£¬±ÈÈç·¢ËͶÌÐÅ£¬ÓʼþÌáÐÑ£¬¸üÐÂÎÄÕÂÔĶÁ¼ÆÊý£¬¼Ç¼Óû§²Ù×÷ÈÕÖ¾µÈµÈ£¬Èç¹ûʵʱ´¦ÀíµÄ»°£¬ÔÚÓû§·ÃÎÊÁ¿±È½Ï´óµÄÇé¿öÏ£¬¶ÔϵͳѹÁ¦±È½Ï´ó¡£

Ãæ¶ÔÕâЩÎÊÌ⣬һ°ãµÄÎÒÃǻὫÕâЩÇëÇ󣬷ÅÔÚÏûÏ¢¶ÓÁÐÖд¦Àí£»Ò칹ϵͳ֮¼äʹÓÃÏûÏ¢½øÐÐͨѶ¡£ÏûÏ¢´«µÝÏà½ÏÎļþ´«µÝÓëÔ¶³Ì¹ý³Ìµ÷Óã¨RPC£©¶øÑÔ£¬Ëƺõ¸üʤһ³ï£¬ÒòΪËü¾ßÓиüºÃµÄƽ̨ÎÞ¹ØÐÔ£¬²¢Äܹ»ºÜºÃµØÖ§³Ö²¢·¢ÓëÒì²½µ÷Óá£ËùÒÔÈç¹ûϵͳÖгöÏÖÁËÈçÏÂÇé¿ö:

1.¶Ô²Ù×÷µÄʵʱÐÔÒªÇ󲻸ߣ¬¶øÐèÒªÖ´ÐеÄÈÎÎñ¼«ÎªºÄʱ£»

2.´æÔÚÒ칹ϵͳ¼äµÄÕûºÏ£»

Ò»°ãµÄ¿ÉÒÔ¿¼ÂÇÒýÈëÏûÏ¢¶ÓÁС£¶ÔÓÚµÚÒ»ÖÖÇé¿ö£¬³£³£»áÑ¡ÔñÏûÏ¢¶ÓÁÐÀ´´¦ÀíÖ´ÐÐʱ¼ä½Ï³¤µÄÈÎÎñ¡£ÒýÈëµÄÏûÏ¢¶ÓÁоͳÉÁËÏûÏ¢´¦ÀíµÄ»º³åÇø¡£ÏûÏ¢¶ÓÁÐÒýÈëµÄÒ첽ͨÐÅ»úÖÆ£¬Ê¹µÃ·¢ËÍ·½ºÍ½ÓÊÕ·½¶¼²»Óõȴý¶Ô·½·µ»Ø³É¹¦ÏûÏ¢£¬¾Í¿ÉÒÔ¼ÌÐøÖ´ÐÐÏÂÃæµÄ´úÂ룬´Ó¶øÌá¸ßÁËÊý¾Ý´¦ÀíµÄÄÜÁ¦¡£ÓÈÆäÊǵ±·ÃÎÊÁ¿ºÍÊý¾ÝÁ÷Á¿½Ï´óµÄÇé¿öÏ£¬¾Í¿ÉÒÔ½áºÏÏûÏ¢¶ÓÁÐÓëºǫ́ÈÎÎñ£¬Í¨¹ý±Ü¿ª¸ß·åÆÚ¶Ô´óÊý¾Ý½øÐд¦Àí£¬¾Í¿ÉÒÔÓÐЧ½µµÍÊý¾Ý¿â´¦ÀíÊý¾ÝµÄ¸ººÉ¡£

ÔÚÇ°ÃæµÄһƪ½²½âCQRSģʽµÄÎÄÕÂÖУ¬ËùÓеĶÔϵͳµÄ״̬µÄ¸ü¸Ä¶¼ÊÇͨ¹ýʼþÀ´Íê³É£¬Ò»°ãµÄ½«Ê¼þ´æ´¢µ½ÏûÏ¢¶ÓÁÐÖУ¬È»ºó½øÐÐͳһµÄ´¦Àí¡£

±¾Îļòµ¥½éÉÜÔÚRabbitMQÕâÒ»ÏûÏ¢´úÀí¹¤¾ß£¬ÒÔ¼°ÔÚ.NETÖÐÈçºÎʹÓÃRabbitMQ.

Ò» »·¾³´î½¨

Ê×ÏÈ£¬ÓÉÓÚRabbitMQʹÓÃErlang±àдµÄ£¬ÐèÒªÔËÐÐÔÚErlangÔËÐÐʱ»·¾³ÉÏ£¬ËùÒÔÔÚ°²×°RabbitMQ Server֮ǰÐèÒª°²×°Erlang ÔËÐÐʱ»·¾³£¬¿ÉÒÔµ½Erlang¹ÙÍøÏÂÔØ¶ÔӦƽ̨µÄ°²×°Îļþ¡£Èç¹ûûÓа²×°ÔËÐÐʱ»·¾³£¬°²×°RabbitMQ ServerµÄʱºò£¬»áÌáʾÐèÒªÏȰ²×°Erlang»·¾³¡£ °²×°Íê³ÉÖ®ºó£¬È·±£ÒѾ­½«ErlangµÄ°²×°Â·¾¶×¢²áµ½ÏµÍ³µÄ»·¾³±äÁ¿ÖС£°²×°ÍêErlangÖ®ºó£¬Õâ¸ö»·¾³»á×Ô¶¯ÉèÖã¬Èç¹ûûÓУ¬ÔÚadministrator»·¾³ÏÂÔÚ¿ØÖÆÌ¨ÏÂÃæÊäÈ룬Ҳ¿ÉÒÔÉèÖãº

Setx  ERLANG_HOME ¡°D:\Program Files (x86)\erl6.3¡å

È»ºó£¬È¥RabbitMQ¹ÙÍøÏÂÔØRabbitMQ Server·þÎñ¶Ë³ÌÐò£¬Ñ¡ÔñºÏÊÊµÄÆ½Ì¨°æ±¾ÏÂÔØ¡£°²×°Íê³ÉÖ®ºó£¬¾Í¿ÉÒÔ¿ªÊ¼Ê¹ÓÃÁË¡£

ÏÖÔھͿÉÒÔ¶ÔRabbitMQ Server½øÐÐÅäÖÃÁË¡£

Ê×ÏÈ£¬Çл»µ½RabbitMQ ServerµÄ°²×°Ä¿Â¼£º

ÔÚsbinÏÂÃæÓкܶàbatchÎļþ£¬ÓÃÀ´¿ØÖÆRabbitMQ Server£¬µ±È»ÄúÒ²¿ÉÒÔÖ±½ÓÔÚ°²×°¿ªÊ¼²Ëµ¥ÖÐÀ´Ö´ÐÐÏàÓ¦µÄ²Ù×÷£º

×î¼òµ¥µÄ·½Ê½ÊÇʹRabbitMQÒÔWindows ServiceµÄ·½Ê½ÔÚºǫ́ÔËÐУ¬ËùÒÔÎÒÃÇÐèÒªÒÔ¹ÜÀíԱȨÏÞ´ò¿ªcmd£¬È»ºóÇл»µ½sbinĿ¼Ï£¬Ö´ÐÐÕâÈýÌõÃüÁî¼´¿É£º

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

ÏÖÔÚRabbitMQµÄ·þÎñ¶ËÒѾ­Æô¶¯ÆðÀ´ÁË¡£

ÏÂÃæ¿ÉÒÔʹÓÃsbinĿ¼ÏÂÃæµÄrabbitmqctl.batÕâ¸ö½Å±¾À´²é¿´ºÍ¿ØÖÆ·þÎñ¶Ë״̬µÄ£¬ÔÚcmdÖÐÖ±½ÓÔËÐÐrabbitmqctl status¡£Èç¹û¿´µ½ÒÔϽá¹û£º

ÏÔʾnodeûÓÐÁ¬½ÓÉÏ£¬ÐèÒªµ½C:\WindowsĿ¼Ï£¬½«.erlang.cookieÎļþ£¬¿½±´µ½Óû§Ä¿Â¼Ï C:\Users\{Óû§Ãû}£¬ÕâÊÇErlangµÄCookieÎļþ£¬ÔÊÐíÓëErlang½øÐн»»¥£¬ÏÖÔÚÖØ¸´ÔËÐиղŵÄÃüÁî¾Í»áµÃµ½ÈçÏÂÐÅÏ¢£º

RabbitMQ ServerÉÏÃæÒ²ÓÐÓû§¸ÅÄ°²×°ºÃÖ®ºó£¬Ê¹ÓÃrabbitmqctl list_usersÃüÁ¿ÉÒÔ¿´µ½ÉÏÃæÄ¿Ç°µÄÓû§£º

¿ÉÒÔ¿´µ½£¬ÏÖÔÚÖ»ÓÐÒ»¸ö½ÇɫΪadministratorµÄÃûΪguestµÄÓû§£¬Õâ¸öÊÇRabbitMQĬÈÏΪÎÒÃÇ´´½¨µÄ£¬ËûÓÐRabbitMQµÄËùÓÐȨÏÞ£¬Ò»°ãµÄ£¬ÎÒÃÇÐèҪн¨Ò»¸öÎÒÃÇ×Ô¼ºµÄÓû§£¬ÉèÖÃÃÜÂ룬²¢ÊÚÓèȨÏÞ£¬¿ÉÒÔʹÓÃÏÂÃæµÄÃüÁîÀ´Ö´ÐÐÕâÒ»²Ù×÷£º

rabbitmqctl  add_user  yy  hello!
rabbitmqctl set_permissions yy ".*" ".*" ".*"

ÉÏÃæµÄÒ»ÌõÃüÁîÌí¼ÓÁËÒ»¸öÃûΪyyµÄÓû§£¬²¢ÉèÖÃÁËÃÜÂëhello£¡£¬ÏÂÃæµÄÃüÁîΪÓû§yy·Ö±ðÊÚÓè¶ÔËùÓÐÏûÏ¢¶ÓÁеÄÅäÖᢶÁºÍдµÄȨÏÞ¡£

ÏÖÔÚÎÒÃÇ¿ÉÒÔ½«Ä¬ÈϵÄguestÓû§É¾µô£¬Ê¹ÓÃÏÂÃæµÄÃüÁî¼´¿É£º

rabbitmqctl delete_user guest

¶þ ¿ªÊ¼Ê¹ÓÃ

ÔÚ.NETÖÐʹÓÃRabbitMQÐèÒªÏÂÔØRabbitMQµÄ¿Í»§¶Ë³ÌÐò¼¯£¬¿ÉÒÔµ½¹ÙÍøÏÂÔØ£¬ÏÂÔØ½âѹºó¾Í¿ÉÒԵõ½RabbitMQ.Client.dll£¬Õâ¾ÍÊÇRabbitMQµÄ¿Í»§¶Ë¡£

ÔÚʹÓÃRabitMQ֮ǰ£¬ÐèÒª¶ÔÏÂÃæµÄ¼¸¸ö»ù±¾¸ÅÄî˵Ã÷һϣº

RabbitMQÊÇÒ»¸öÏûÏ¢´úÀí¡£Ëû´ÓÏûÏ¢Éú²úÕß(producers)ÄÇÀï½ÓÊÕÏûÏ¢£¬È»ºó°ÑÏûÏ¢Ë͸øÏûÏ¢Ïû·ÑÕߣ¨consumer£©ÔÚ·¢ËͺͽÓÊÜÖ®¼ä£¬ËûÄܹ»¸ù¾ÝÉèÖõĹæÔò½øÐзÓÉ£¬»º´æºÍ³Ö¾Ã»¯¡£

Ò»°ãÌáµ½RabbitMQºÍÏûÏ¢£¬¶¼Óõ½Ò»Ð©×¨ÓÐÃû´Ê¡£

1.Éú²ú(Producing)Òâ˼¾ÍÊÇ·¢ËÍ¡£·¢ËÍÏûÏ¢µÄ³ÌÐò¾ÍÊÇÒ»¸öÉú²úÕß(producer)¡£ÎÒÃÇÒ»°ãÓá±P¡±À´±íʾ£º

2.¶ÓÁÐ(queue)¾ÍÊÇÓÊÏäµÄÃû³Æ¡£ÏûϢͨ¹ýÄãµÄÓ¦ÓóÌÐòºÍRabbitMQ½øÐд«Ê䣬ËüÃÇÖ»ÄÜ´æ´¢ÔÚ¶ÓÁУ¨queue£©ÖС£ ¶ÓÁУ¨queue£©ÈÝÁ¿Ã»ÓÐÏÞÖÆ£¬ÄãÒª´æ´¢¶àÉÙÏûÏ¢¶¼¿ÉÒÔ¡ª¡ª»ù±¾ÉÏÊÇÒ»¸öÎÞÏ޵Ļº³åÇø¡£¶à¸öÉú²úÕߣ¨producers£©Äܹ»°ÑÏûÏ¢·¢Ë͸øÍ¬Ò»¸ö¶ÓÁУ¬Í¬Ñù£¬¶à¸öÏû·ÑÕߣ¨consumers£©Ò²ÄÜ´Óͬһ¸ö¶ÓÁУ¨queue£©ÖлñÈ¡Êý¾Ý¡£¶ÓÁпÉÒÔ»­³ÉÕâÑù£¨Í¼ÉÏÊǶÓÁеÄÃû³Æ£©£º

3.Ïû·Ñ£¨Consuming£©ºÍ»ñÈ¡ÏûÏ¢ÊÇÒ»ÑùµÄÒâ˼¡£Ò»¸öÏû·ÑÕߣ¨consumer£©¾ÍÊÇÒ»¸öµÈ´ý»ñÈ¡ÏûÏ¢µÄ³ÌÐò¡£ÎÒÃǰÑËü»­×÷¡±C¡±£º

ͨ³££¬ÏûÏ¢Éú²úÕߣ¬ÏûÏ¢Ïû·ÑÕߺÍÏûÏ¢´úÀí²»ÔÚͬһ̨»úÆ÷ÉÏ¡£

2.1 Hello World

ΪÁËչʾRabbitMQµÄ»ù±¾Ê¹Óã¬ÎÒÃÇ·¢ËÍÒ»¸öHelloWorldÏûÏ¢£¬È»ºó½ÓÊÕ²¢´¦Àí¡£

Ê×ÏÈ´´½¨Ò»¸ö¿ØÖÆÌ¨³ÌÐò£¬ÓÃÀ´½«ÏûÏ¢·¢Ë͵½RabbitMQµÄÏûÏ¢¶ÓÁÐÖУ¬´úÂëÈçÏ£º

static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "yy";
factory.Password = "hello!"; using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null);
string message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "hello", null, body);
Console.WriteLine(" set {0}", message);
}
}
}

Ê×ÏÈ£¬ÐèÒª´´½¨Ò»¸öConnectionFactory£¬ÉèÖÃÄ¿±ê£¬ÓÉÓÚÊÇÔÚ±¾»ú£¬ËùÒÔÉèÖÃΪlocalhost£¬Èç¹ûRabbitMQ²»ÔÚ±¾»ú£¬Ö»ÐèÒªÉèÖÃÄ¿±ê»úÆ÷µÄIPµØÖ·»òÕß»úÆ÷Ãû³Æ¼´¿É£¬È»ºóÉèÖÃÇ°Ãæ´´½¨µÄÓû§ÃûyyºÍÃÜÂëhello£¡¡£

½ô½Ó×ÅÒª´´½¨Ò»¸öChannel£¬Èç¹ûÒª·¢ËÍÏûÏ¢£¬ÐèÒª´´½¨Ò»¸ö¶ÓÁУ¬È»ºó½«ÏûÏ¢·¢²¼µ½Õâ¸ö¶ÓÁÐÖС£ÔÚ´´½¨¶ÓÁеÄʱºò£¬Ö»ÓÐRabbitMQÉϸöÓÁв»´æÔÚ£¬²Å»áÈ¥´´½¨¡£ÏûÏ¢ÊÇÒÔ¶þ½øÖÆÊý×éµÄÐÎʽ´«ÊäµÄ£¬ËùÒÔÈç¹ûÏûÏ¢ÊÇʵÌå¶ÔÏóµÄ»°£¬ÐèÒªÐòÁл¯ºÍÈ»ºóת»¯Îª¶þ½øÖÆÊý×é¡£

ÏÖÔÚ¿Í»§¶Ë·¢ËÍ´úÂëÒѾ­Ð´ºÃÁË£¬ÔËÐÐÖ®ºó£¬ÏûÏ¢»á·¢²¼µ½RabbitMQµÄÏûÏ¢¶ÓÁÐÖУ¬ÏÖÔÚÐèÒª±àд·þÎñ¶ËµÄ´úÂëÁ¬½Óµ½RabbitMQÉÏÈ¥»ñÈ¡ÕâЩÏûÏ¢¡£

ͬÑù£¬´´½¨Ò»¸öÃûΪReceiveµÄ·þÎñ¶Ë¿ØÖÆÌ¨Ó¦ÓóÌÐò£¬·þÎñ¶Ë´úÂëÈçÏ£º

static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "yy";
factory.Password = "hello!"; using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null); var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", true, consumer); Console.WriteLine(" waiting for message.");
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Received {0}", message); }
}
}
}

ºÍ·¢ËÍÒ»Ñù£¬Ê×ÏÈÐèÒª¶¨ÒåÁ¬½Ó£¬È»ºóÉùÃ÷ÏûÏ¢¶ÓÁС£Òª½ÓÊÕÏûÏ¢£¬ÐèÒª¶¨ÒåÒ»¸öConsume£¬È»ºó´ÓÏûÏ¢¶ÓÁÐÖв»¶ÏDequeueÏûÏ¢£¬È»ºó´¦Àí¡£

ÏÖÔÚ·¢ËͶ˺ͽÓÊն˵ĴúÂ붼дºÃÁË£¬ÔËÐз¢ËͶˣ¬·¢ËÍÏûÏ¢£º

ÏÖÔÚ£¬ÃûΪhelloµÄÏûÏ¢¶ÓÁÐÖУ¬·¢ËÍÁËÒ»ÌõÏûÏ¢¡£ÕâÌõÏûÏ¢´æ´¢µ½ÁËRabbitMQµÄ·þÎñÆ÷ÉÏÁË¡£Ê¹ÓÃrabbitmqctl µÄlist_queues¿ÉÒԲ鿴ËùÓеÄÏûÏ¢¶ÓÁУ¬ÒÔ¼°ÀïÃæµÄÏûÏ¢¸öÊý£¬¿ÉÒÔ¿´µ½£¬Ä¿Ç°RabbitmqÉÏÖ»ÓÐÒ»¸öÏûÏ¢¶ÓÁУ¬ÀïÃæÖ»ÓÐÒ»ÌõÏûÏ¢£º

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.2\sbin>rabbitmqctl list_queues
Listing queues ...
hello 1

ÏÖÔÚÔËÐнÓÊն˳ÌÐò£¬ÈçÏ£º

¿ÉÒÔ¿´µ½£¬ÒѾ­½ÓÊܵ½Á˿ͻ§¶Ë·¢Ë͵ÄHello World£¬ÏÖÔÚÔÙÀ´¿´RabitMQÉϵÄÏûÏ¢¶ÓÁÐÐÅÏ¢£º

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.2\sbin>rabbitmqctl list_queues
Listing queues ...
hello 0

¿ÉÒÔ¿´µ½£¬helloÕâ¸ö¶ÓÁÐÖеÄÏûÏ¢¶ÓÁиöÊýΪ0£¬Õâ±íʾ£¬µ±½ÓÊÕ¶Ë£¬½ÓÊÕµ½ÏûÏ¢Ö®ºó£¬RabbitMQÉϾͰÑÕâ¸öÏûϢɾµôÁË¡£

2.2 ¹¤×÷¶ÓÁÐ

Ç°ÃæµÄÀý×ÓչʾÁËÈçºÎÍùÒ»¸öÖ¸¶¨µÄÏûÏ¢¶ÓÁÐÖз¢ËͺÍÊÕÈ¡ÏûÏ¢¡£ÏÖÔÚÎÒÃÇ´´½¨Ò»¸ö¹¤×÷¶ÓÁУ¨work queue£©À´½«Ò»Ð©ºÄʱµÄÈÎÎñ·Ö·¢¸ø¶à¸ö¹¤×÷Õߣ¨workers£©£º

¹¤×÷¶ÓÁУ¨work queues, ÓÖ³ÆÈÎÎñ¶ÓÁÐTask Queues£©µÄÖ÷Ҫ˼ÏëÊÇΪÁ˱ÜÃâÁ¢¼´Ö´Ðв¢µÈ´ýһЩռÓôóÁ¿×ÊÔ´¡¢Ê±¼äµÄ²Ù×÷Íê³É¡£¶øÊǰÑÈÎÎñ£¨Task£©µ±×÷ÏûÏ¢·¢Ë͵½¶ÓÁÐÖУ¬ÉÔºó´¦Àí¡£Ò»¸öÔËÐÐÔÚºǫ́µÄ¹¤×÷Õߣ¨worker£©½ø³Ì¾Í»áÈ¡³öÈÎÎñÈ»ºó´¦Àí¡£µ±ÔËÐжà¸ö¹¤×÷Õߣ¨workers£©Ê±£¬ÈÎÎñ»áÔÚËüÃÇÖ®¼ä¹²Ïí¡£

Õâ¸öÔÚÍøÂçÓ¦ÓÃÖзdz£ÓÐÓã¬Ëü¿ÉÒÔÔÚ¶ÌÔݵÄHTTPÇëÇóÖд¦ÀíһЩ¸´ÔÓµÄÈÎÎñ¡£ÔÚһЩʵʱÐÔÒªÇó²»Ì«¸ßµÄµØ·½£¬ÎÒÃÇ¿ÉÒÔ´¦ÀíÍêÖ÷Òª²Ù×÷Ö®ºó£¬ÒÔÏûÏ¢µÄ·½Ê½À´´¦ÀíÆäËûµÄ²»½ôÒªµÄ²Ù×÷£¬±ÈÈçдÈÕÖ¾µÈµÈ¡£

×¼±¸

ÔÚµÚÒ»²¿·Ö£¬·¢ËÍÁËÒ»¸ö°üº¬¡°Hello World!¡±µÄ×Ö·û´®ÏûÏ¢¡£ÏÖÔÚ·¢ËÍһЩ×Ö·û´®£¬°ÑÕâЩ×Ö·û´®µ±×÷¸´ÔÓµÄÈÎÎñ¡£ÕâÀïʹÓÃtime.sleep()º¯ÊýÀ´Ä£ÄâºÄʱµÄÈÎÎñ¡£ÔÚ×Ö·û´®ÖмÓÉϵãºÅ£¨.£©À´±íʾÈÎÎñµÄ¸´Ôӳ̶ȣ¬Ò»¸öµã£¨.£©½«»áºÄʱ1ÃëÖÓ¡£±ÈÈ硱Hello¡­¡±¾Í»áºÄʱ3ÃëÖÓ¡£

¶Ô֮ǰʾÀýµÄsend.cs×öЩ¼òµ¥µÄµ÷Õû£¬ÒÔ±ã¿ÉÒÔ·¢ËÍËæÒâµÄÏûÏ¢¡£Õâ¸ö³ÌÐò»á°´Õռƻ®·¢ËÍÈÎÎñµ½ÎÒÃǵŤ×÷¶ÓÁÐÖС£

static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "yy";
factory.Password = "hello!"; using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null);
string message = GetMessage(args);
var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "hello", properties, body);
Console.WriteLine(" set {0}", message);
}
} Console.ReadKey();
} private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

¼Ó´Ö²¿·ÖÊǾ­¹ýÐ޸ĹýÁ˵ġ£

½Ó×ÅÎÒÃÇÐ޸ĽÓÊÕ¶Ë£¬ÈÃËû¸ù¾ÝÏûÏ¢ÖеĶºµãµÄ¸öÊýÀ´Sleep¶ÔÓ¦µÄÃëÊý£º

static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "yy";
factory.Password = "hello!"; using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null); using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null); var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", true, consumer); while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body;
var message = Encoding.UTF8.GetString(body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine("Received {0}", message);
Console.WriteLine("Done");
}
}
}
}

ÂÖѯ·Ö·¢

ʹÓù¤×÷¶ÓÁеÄÒ»¸öºÃ´¦¾ÍÊÇËüÄܹ»²¢ÐеĴ¦Àí¶ÓÁС£Èç¹û¶Ñ»ýÁ˺ܶàÈÎÎñ£¬ÎÒÃÇÖ»ÐèÒªÌí¼Ó¸ü¶àµÄ¹¤×÷Õߣ¨workers£©¾Í¿ÉÒÔÁË£¬À©Õ¹ºÜ¼òµ¥¡£

ÏÖÔÚ£¬ÎÒÃÇÏÈÆô¶¯Á½¸ö½ÓÊÕ¶Ë£¬µÈ´ý½ÓÊÜÏûÏ¢£¬È»ºóÆô¶¯Ò»¸ö·¢ËͶ˿ªÊ¼·¢ËÍÏûÏ¢¡£

ÔÚcmdÌõ¼þÏ£¬·¢ËÍÁË5ÌõÏûÏ¢£¬Ã¿ÌõÏûÏ¢ºóÃæµÄ¶ºµã±íʾ¸ÃÏûÏ¢ÐèÒªÖ´ÐеÄʱ³¤£¬À´Ä£ÄâºÄʱµÄ²Ù×÷¡£

È»ºó¿ÉÒÔ¿´µ½£¬Á½¸ö½ÓÊÕ¶ËÒÀ´Î½ÓÊÕµ½ÁË·¢³öµÄÏûÏ¢£º

ĬÈÏ£¬RabbitMQ»á½«Ã¿¸öÏûÏ¢°´ÕÕ˳ÐòÒÀ´Î·Ö·¢¸øÏÂÒ»¸öÏû·ÑÕß¡£ËùÒÔÿ¸öÏû·ÑÕß½ÓÊÕµ½µÄÏûÏ¢¸öÊý´óÖÂÊÇÆ½¾ùµÄ¡£ ÕâÖÖÏûÏ¢·Ö·¢µÄ·½Ê½³ÆÖ®ÎªÂÖѯ£¨round-robin£©¡£

2.3 ÏûÏ¢ÏìÓ¦

µ±´¦ÀíÒ»¸ö±È½ÏºÄʱµÃÈÎÎñµÄʱºò£¬Ò²ÐíÏëÖªµÀÏû·ÑÕߣ¨consumers£©ÊÇ·ñÔËÐе½Ò»°ë¾Í¹Òµô¡£ÔÚµ±Ç°µÄ´úÂëÖУ¬µ±RabbitMQ½«ÏûÏ¢·¢Ë͸øÏû·ÑÕߣ¨consumers£©Ö®ºó£¬ÂíÉϾͻὫ¸ÃÏûÏ¢´Ó¶ÓÁÐÖÐÒÆ³ý¡£´Ëʱ£¬Èç¹û°Ñ´¦ÀíÕâ¸öÏûÏ¢µÄ¹¤×÷Õߣ¨worker£©Í£µô£¬ÕýÔÚ´¦ÀíµÄÕâÌõÏûÏ¢¾Í»á¶ªÊ§¡£Í¬Ê±£¬ËùÓз¢Ë͵½Õâ¸ö¹¤×÷ÕߵĻ¹Ã»Óд¦ÀíµÄÏûÏ¢¶¼»á¶ªÊ§¡£

ÎÒÃDz»Ï붪ʧÈκÎÈÎÎñÏûÏ¢¡£Èç¹ûÒ»¸ö¹¤×÷Õߣ¨worker£©¹ÒµôÁË£¬ÎÒÃÇÏ£Íû¸ÃÏûÏ¢»áÖØÐ·¢Ë͸øÆäËûµÄ¹¤×÷Õߣ¨worker£©¡£

ΪÁË·ÀÖ¹ÏûÏ¢¶ªÊ§£¬RabbitMQÌṩÁËÏûÏ¢ÏìÓ¦£¨acknowledgments£©»úÖÆ¡£Ïû·ÑÕß»áͨ¹ýÒ»¸öack£¨ÏìÓ¦£©£¬¸æËßRabbitMQÒѾ­ÊÕµ½²¢´¦ÀíÁËijÌõÏûÏ¢£¬È»ºóRabbitMQ²Å»áÊͷآɾ³ýÕâÌõÏûÏ¢¡£

Èç¹ûÏû·ÑÕߣ¨consumer£©¹ÒµôÁË£¬Ã»Óз¢ËÍÏìÓ¦£¬RabbitMQ¾Í»áÈÏΪÏûϢûÓб»ÍêÈ«´¦Àí£¬È»ºóÖØÐ·¢Ë͸øÆäËûÏû·ÑÕߣ¨consumer£©¡£ÕâÑù£¬¼´Ê¹¹¤×÷Õߣ¨workers£©Å¼¶ûµÄ¹Òµô£¬Ò²²»»á¶ªÊ§ÏûÏ¢¡£

ÏûÏ¢ÊÇûÓг¬Ê±Õâ¸ö¸ÅÄîµÄ£»µ±¹¤×÷ÕßÓëËü¶Ï¿ªÁ¬µÄʱºò£¬RabbitMQ»áÖØÐ·¢ËÍÏûÏ¢¡£ÕâÑùÔÚ´¦ÀíÒ»¸öºÄʱ·Ç³£³¤µÄÏûÏ¢ÈÎÎñµÄʱºò¾Í²»»á³öÎÊÌâÁË¡£

ÏûÏ¢ÏìӦĬÈÏÊÇ¿ªÆôµÄ¡£ÔÚ֮ǰµÄÀý×ÓÖÐʹÓÃÁËno_ack=True±êʶ°ÑËü¹Ø±Õ¡£ÊÇʱºòÒÆ³ýÕâ¸ö±êʶÁË£¬µ±¹¤×÷Õߣ¨worker£©Íê³ÉÁËÈÎÎñ£¬¾Í·¢ËÍÒ»¸öÏìÓ¦¡£

channel.BasicConsume("hello", false, consumer);
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body;
var message = Encoding.UTF8.GetString(body); int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000); int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000); Console.WriteLine("Received {0}", message);
Console.WriteLine("Done"); channel.BasicAck(ea.DeliveryTag, false);
}

ÏÖÔÚ,¿ÉÒÔ±£Ö¤,¼´Ê¹ÕýÔÚ´¦ÀíÏûÏ¢µÄ¹¤×÷Õß±»Í£µô,ÕâЩÏûÏ¢Ò²²»»á¶ªÊ§,ËùÓÐûÓб»Ó¦´ðµÄÏûÏ¢»á±»ÖØÐ·¢Ë͸øÆäËû¹¤×÷Õß.

Ò»¸öºÜ³£¼ûµÄ´íÎó¾ÍÊÇÍüµôÁËBasicAckÕâ¸ö·½·¨,Õâ¸ö´íÎóºÜ³£¼û,µ«ÊǺó¹ûºÜÑÏÖØ. µ±¿Í»§¶ËÍ˳öʱ,´ý´¦ÀíµÄÏûÏ¢¾Í»á±»ÖØÐ·ַ¢,µ«ÊÇRabitMQ»áÏûºÄÔ½À´Ô½¶àµÄÄÚ´æ,ÒòΪÕâЩûÓб»Ó¦´ðµÄÏûÏ¢²»Äܹ»±»ÊÍ·Å¡£µ÷ÊÔÕâÖÖcase£¬¿ÉÒÔʹÓÃrabbitmqct´òÓ¡messages_unacknoledged×ֶΡ£

rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.

2.4 ÏûÏ¢³Ö¾Ã»¯

Ç°ÃæÒѾ­¸ã¶¨Á˼´Ê¹Ïû·ÑÕßdownµô£¬ÈÎÎñÒ²²»»á¶ªÊ§£¬µ«ÊÇ£¬Èç¹ûRabbitMQ ServerÍ£µôÁË£¬ÄÇôÕâЩÏûÏ¢»¹Êǻᶪʧ¡£

µ±RabbitMQ Server ¹Ø±Õ»òÕß±ÀÀ££¬ÄÇôÀïÃæ´æ´¢µÄ¶ÓÁкÍÏûϢĬÈÏÊDz»»á±£´æÏÂÀ´µÄ¡£Èç¹ûÒªÈÃRabbitMQ±£´æ×¡ÏûÏ¢£¬ÐèÒªÔÚÁ½¸öµØ·½Í¬Ê±ÉèÖãºÐèÒª±£Ö¤¶ÓÁкÍÏûÏ¢¶¼Êdz־û¯µÄ¡£

Ê×ÏÈ£¬Òª±£Ö¤RabbitMQ²»»á¶ªÊ§¶ÓÁУ¬ËùÒÔÒª×öÈçÏÂÉèÖãº

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

ËäÈ»ÔÚÓï·¨ÉÏÊÇÕýÈ·µÄ£¬µ«ÊÇÔÚĿǰ½×¶ÎÊDz»ÕýÈ·µÄ£¬ÒòΪÎÒÃÇ֮ǰÒѾ­¶¨ÒåÁËÒ»¸ö·Ç³Ö¾Ã»¯µÄhello¶ÓÁС£RabbitMQ²»ÔÊÐíÎÒÃÇʹÓò»Í¬µÄ²ÎÊýÖØÐ¶¨ÒåÒ»¸öÒѾ­´æÔÚµÄͬÃû¶ÓÁУ¬Èç¹ûÕâÑù×ö¾Í»á±¨´í¡£ÏÖÔÚ£¬¶¨ÒåÁíÍâÒ»¸ö²»Í¬Ãû³ÆµÄ¶ÓÁУº

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclare Õâ¸ö¸Ä¶¯ÐèÒªÔÚ·¢ËͶ˺ͽÓÊÕ¶ËͬʱÉèÖá£

ÏÖÔÚ±£Ö¤ÁËtask_queueÕâ¸öÏûÏ¢¶ÓÁм´Ê¹ÔÚRabbitMQ ServerÖØÆôÖ®ºó£¬¶ÓÁÐÒ²²»»á¶ªÊ§¡£ È»ºóÐèÒª±£Ö¤ÏûÏ¢Ò²Êdz־û¯µÄ£¬ Õâ¿ÉÒÔͨ¹ýÉèÖÃIBasicProperties.SetPersistent ΪtrueÀ´ÊµÏÖ£º

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

ÐèҪעÒâµÄÊÇ£¬½«ÏûÏ¢ÉèÖÃΪ³Ö¾Ã»¯²¢²»ÄÜÍêÈ«±£Ö¤ÏûÏ¢²»¶ªÊ§¡£ËäÈ»Ëû¸æËßRabbitMQ½«ÏûÏ¢±£´æµ½´ÅÅÌÉÏ£¬µ«ÊÇÔÚRabbitMQ½ÓÊÕµ½ÏûÏ¢ºÍ½«Æä±£´æµ½´ÅÅÌÉÏÕâÖ®¼äÈÔÈ»ÓÐÒ»¸öСµÄʱ¼ä´°¿Ú¡£ RabbitMQ ¿ÉÄÜÖ»Êǽ«ÏûÏ¢±£´æµ½ÁË»º´æÖУ¬²¢Ã»Óн«ÆäдÈëµ½´ÅÅÌÉÏ¡£³Ö¾Ã»¯ÊDz»Äܹ»Ò»¶¨±£Ö¤µÄ£¬µ«ÊǶÔÓÚÒ»¸ö¼òµ¥ÈÎÎñ¶ÓÁÐÀ´ËµÒѾ­×ã¹»¡£Èç¹ûÐèÒªÏûÏ¢¶ÓÁг־û¯µÄÇ¿±£Ö¤£¬¿ÉÒÔʹÓÃpublisher confirms

2.5 ¹«Æ½·Ö·¢

Äã¿ÉÄÜ»á×¢Òâµ½£¬ÏûÏ¢µÄ·Ö·¢¿ÉÄܲ¢Ã»ÓÐÈçÎÒÃÇÏëÒªµÄÄÇÑù¹«Æ½·ÖÅä¡£±ÈÈ磬¶ÔÓÚÁ½¸ö¹¤×÷Õß¡£µ±ÆæÊý¸öÏûÏ¢µÄÈÎÎñ±È½ÏÖØ£¬µ«ÊÇżÊý¸öÏûÏ¢ÈÎÎñ±È½ÏÇáʱ£¬ÆæÊý¸ö¹¤×÷ÕßʼÖÕ´¦Àíæµ״̬£¬¶øÅ¼Êý¸ö¹¤×÷ÕßʼÖÕ´¦Àí¿ÕÏÐ״̬¡£µ«ÊÇRabbitMQ²¢²»ÖªµÀÕâЩ£¬ËûÈÔÈ»»áƽ¾ùÒÀ´ÎµÄ·Ö·¢ÏûÏ¢¡£

ΪÁ˸ıäÕâһ״̬£¬ÎÒÃÇ¿ÉÒÔʹÓÃbasicQos·½·¨£¬ÉèÖÃperfetchCount=1 ¡£ÕâÑù¾Í¸æËßRabbitMQ ²»ÒªÔÚͬһʱ¼ä¸øÒ»¸ö¹¤×÷Õß·¢ËͶàÓÚ1¸öµÄÏûÏ¢£¬»òÕß»»¾ä»°Ëµ¡£ÔÚÒ»¸ö¹¤×÷Õß»¹ÔÚ´¦ÀíÏûÏ¢£¬²¢ÇÒûÓÐÏìÓ¦ÏûϢ֮ǰ£¬²»Òª¸øËû·Ö·¢ÐµÄÏûÏ¢¡£Ïà·´£¬½«ÕâÌõеÄÏûÏ¢·¢Ë͸øÏÂÒ»¸ö²»ÄÇô浵Ť×÷Õß¡£

channel.BasicQos(0, 1, false);

2.6 ÍêÕûʵÀý

ÏÖÔÚ½«ËùÓÐÕâЩ·ÅÔÚÒ»Æð£º

·¢ËͶ˴úÂëÈçÏ£º

static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "yy";
factory.Password = "hello!"; using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{ bool durable = true;
channel.QueueDeclare("task_queue", durable, false, false, null); string message = GetMessage(args);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true); var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "task_queue", properties, body);
Console.WriteLine(" set {0}", message);
}
} Console.ReadKey();
} Console.ReadKey();
} private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

½ÓÊÕ¶Ë´úÂëÈçÏ£º

static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "yy";
factory.Password = "hello!"; using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
bool durable = true;
channel.QueueDeclare("task_queue", durable, false, false, null);
channel.BasicQos(0, 1, false); var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("task_queue", false, consumer); var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("task_queue", false, consumer);while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body;
var message = Encoding.UTF8.GetString(body); int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000); int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000); Console.WriteLine("Received {0}", message);
Console.WriteLine("Done"); channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}

Èý ×ܽá

±¾Îļòµ¥½éÉÜÁËÏûÏ¢¶ÓÁеÄÏà¹Ø¸ÅÄ²¢½éÉÜÁËRabbitMQÏûÏ¢´úÀíµÄ»ù±¾Ô­ÀíÒÔ¼°ÔÚWindows ÉÏÈçºÎ°²×°RabbitMQºÍÔÚ.NETÖÐÈçºÎʹÓÃRabbitMQ¡£ÏûÏ¢¶ÓÁÐÔÚ¹¹½¨·Ö²¼Ê½ÏµÍ³ºÍÌá¸ßϵͳµÄ¿ÉÀ©Õ¹ÐÔºÍÏìÓ¦ÐÔ·½ÃæÓÐןÜÖØÒªµÄ×÷Óã¬Ï£Íû±¾ÎĶÔÄúÁ˽âÏûÏ¢¶ÓÁÐÒÔ¼°ÈçºÎʹÓÃRabbitMQÓÐËù°ïÖú¡£

   
2966 ´Îä¯ÀÀ       27
 
Ïà¹ØÎÄÕÂ

Éî¶È½âÎö£ºÇåÀíÀôúÂë
ÈçºÎ±àд³öÓµ±§±ä»¯µÄ´úÂë
ÖØ¹¹-ʹ´úÂë¸ü¼ò½àÓÅÃÀ
ÍŶÓÏîÄ¿¿ª·¢"±àÂë¹æ·¶"ϵÁÐÎÄÕÂ
 
Ïà¹ØÎĵµ

ÖØ¹¹-¸ÄÉÆ¼ÈÓдúÂëµÄÉè¼Æ
Èí¼þÖØ¹¹v2
´úÂëÕû½àÖ®µÀ
¸ßÖÊÁ¿±à³Ì¹æ·¶
 
Ïà¹Ø¿Î³Ì

»ùÓÚHTML5¿Í»§¶Ë¡¢Web¶ËµÄÓ¦Óÿª·¢
HTML 5+CSS ¿ª·¢
ǶÈëʽC¸ßÖÊÁ¿±à³Ì
C++¸ß¼¶±à³Ì
×îл¼Æ»®
DeepSeekÔÚÈí¼þ²âÊÔÓ¦ÓÃʵ¼ù 4-12[ÔÚÏß]
DeepSeek´óÄ£ÐÍÓ¦Óÿª·¢Êµ¼ù 4-19[ÔÚÏß]
UAF¼Ü¹¹ÌåϵÓëʵ¼ù 4-11[±±¾©]
AIÖÇÄÜ»¯Èí¼þ²âÊÔ·½·¨Óëʵ¼ù 5-23[ÉϺ£]
»ùÓÚ UML ºÍEA½øÐзÖÎöÉè¼Æ 4-26[±±¾©]
ÒµÎñ¼Ü¹¹Éè¼ÆÓ뽨ģ 4-18[±±¾©]

ʹÓÃdecj¼ò»¯Webǰ¶Ë¿ª·¢
Web¿ª·¢¿ò¼ÜÐγÉÖ®ÂÃ
¸üÓÐЧÂʵÄʹÓÃVisual Studio
MVP+WCF+Èý²ã½á¹¹´î½¨¿ò¼Ü
ASP.NETÔËÐлúÖÆÇ³Îö¡¾Í¼½â¡¿
±àд¸üºÃµÄC#´úÂë
10¸öVisual Studio¿ª·¢µ÷ÊÔ¼¼ÇÉ

.NET¿ò¼ÜÓë·Ö²¼Ê½Ó¦Óüܹ¹Éè¼Æ
.NET & WPF & WCFÓ¦Óÿª·¢
UML&.Net¼Ü¹¹Éè¼Æ
COM×é¼þ¿ª·¢
.NetÓ¦Óÿª·¢
InstallShield

ÈÕÕÕ¸Û .NET Framework & WCFÓ¦Óÿª·¢
Éñ»ªÐÅÏ¢ .NETµ¥Ôª²âÊÔ
±±¾© .NetÓ¦ÓÃÈí¼þϵͳ¼Ü¹¹
̨´ïµç×Ó .NET³ÌÐòÉè¼ÆÓ뿪·¢
ÈüÃÅÌú¿Ë C#Óë.NET¼Ü¹¹Éè¼Æ
¹ã¶«ºËµç .NetÓ¦ÓÃϵͳ¼Ü¹¹