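In enterprise application systems we constantly have to deal with communication, integration and consolidation between different systems, and this kind of distributed invocation and communication becomes even more important when the systems are heterogeneous. In addition, most systems contain many operations that do not need to finish in real time but are time-consuming to execute, such as sending SMS messages, sending e-mail reminders, updating article read counts or recording user activity logs. Handling them synchronously puts considerable pressure on the system when user traffic is high.
To address these problems, we usually hand such requests to a message queue and let heterogeneous systems communicate through messages. Compared with file transfer and remote procedure calls (RPC), messaging generally comes out ahead because it is more platform-independent and supports concurrency and asynchronous invocation well. So if a system shows either of the following:
1. Operations do not need to be real-time, but the tasks to execute are very time-consuming;
2. Heterogeneous systems need to be integrated;
it is usually worth introducing a message queue. In the first case, a message queue is a common choice for handling long-running tasks, and the queue becomes a buffer for message processing. The asynchronous communication it introduces means that neither the sender nor the receiver has to wait for the other side to report success before continuing, which increases data-processing throughput. When traffic and data volume are high, combining a message queue with background jobs lets you process large amounts of data outside peak hours, which effectively reduces the load on the database.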
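An earlier article on the CQRS pattern described how every change to system state is made through events. Those events are typically stored in a message queue and then processed in one place.
This article briefly introduces the message broker RabbitMQ and shows how to use it from .NET.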
1 Environment Setup
RabbitMQ is written in Erlang and runs on the Erlang runtime, so the Erlang runtime must be installed before RabbitMQ Server. Installers for each platform can be downloaded from the Erlang website. If the runtime is missing, the RabbitMQ Server installer will prompt you to install Erlang first. After installation, make sure the Erlang installation path is registered in the system environment variables. The Erlang installer normally sets this automatically; if it did not, you can set it by running the following in a console opened as administrator:
    setx ERLANG_HOME "D:\Program Files (x86)\erl6.3"

Next, download the RabbitMQ Server package from the RabbitMQ website, choosing the build for your platform. Once it is installed, it is ready to use.
RabbitMQ Server can now be configured.
First, switch to the RabbitMQ Server installation directory:

The sbin directory contains a number of batch files for controlling RabbitMQ Server. You can also run the corresponding operations directly from the Start menu entries created by the installer:

The simplest approach is to run RabbitMQ in the background as a Windows service. Open cmd with administrator privileges, switch to the sbin directory and run these three commands:
    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start

The RabbitMQ server is now running.
The rabbitmqctl.bat script in the sbin directory can be used to inspect and control the server. Run rabbitmqctl status directly in cmd. If you see the following result:

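it means the node could not be reached. Copy the .erlang.cookie file from the C:\Windows directory into your user directory, C:\Users\{username}. This is the Erlang cookie file, which authorizes communication with the Erlang node. Running the same command again then produces the following information: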

RabbitMQ Server also has the concept of users. After installation, the rabbitmqctl list_users command shows the users that currently exist:

As shown, there is only one user, named guest, with the administrator role. RabbitMQ creates it by default and it has full permissions on the server. Normally you should create your own user, set a password and grant it permissions, which can be done with the following commands:
    rabbitmqctl add_user yy hello!
    rabbitmqctl set_permissions yy ".*" ".*" ".*"

The first command adds a user named yy with the password hello!. The second grants yy configure, write and read permissions (the three patterns, in that order) on all message queues.
The default guest user can now be deleted with the following command:
    rabbitmqctl delete_user guest
2 Getting Started
Using RabbitMQ from .NET requires the RabbitMQ client assembly, which can be downloaded from the official website. Unpacking the download yields RabbitMQ.Client.dll, which is the RabbitMQ client library.
Before using RabbitMQ, a few basic concepts need explaining:
RabbitMQ is a message broker. It accepts messages from producers and delivers them to consumers. In between, it can route, buffer and persist the messages according to the rules you configure.
RabbitMQ, and messaging in general, uses some specific terminology.
1. Producing simply means sending. A program that sends messages is a producer. We usually draw it as "P":

2. A queue is the name for a mailbox. Messages flow between your applications and RabbitMQ, but they can only be stored inside a queue.
A queue has no built-in size limit: it can hold as many messages as you like and is essentially an unbounded buffer (in practice it is limited by the broker's memory and disk). Many producers can send messages to the same queue, and many consumers can receive data from the same queue. A queue is drawn like this (the label above it is the queue name):

3. Consuming means the same as receiving. A consumer is a program that mostly waits to receive messages. We draw it as "C":

In most cases the producer, the consumer and the broker do not run on the same machine.
2.1 Hello World
To demonstrate basic RabbitMQ usage, we will send a Hello World message and then receive and process it.

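First, create a console application that sends a message to a RabbitMQ queue. The code is as follows:
    using System;
    using System.Text;
    using RabbitMQ.Client;

    class Send
    {
        static void Main(string[] args)
        {
            // Broker host plus the user created earlier with rabbitmqctl.
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "yy";
            factory.Password = "hello!";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
                channel.QueueDeclare("hello", false, false, false, null);

                string message = "Hello World";
                var body = Encoding.UTF8.GetBytes(message);

                // An empty exchange name publishes to the default exchange,
                // which routes by queue name.
                channel.BasicPublish("", "hello", null, body);
                Console.WriteLine("Sent {0}", message);
            }
        }
    }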
First, create a ConnectionFactory and set the target host. The broker runs on the local machine here, so it is set to localhost. If RabbitMQ runs elsewhere, set the IP address or host name of that machine instead. Then set the user name yy and the password hello! created earlier.
Next, create a channel. To send a message, declare a queue and publish the message to it. The queue is created only if it does not already exist on RabbitMQ. Messages are transmitted as byte arrays, so if the message is an entity object it has to be serialized and converted to a byte array first.
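Because the payload is just a byte array, sending an entity object means serializing it first. The following is a minimal sketch of that step, assuming a modern .NET runtime where System.Text.Json is available (it is not part of the client library used in this article). The OrderCreated type and the MessageCodec helper are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Text.Json;

// Hypothetical entity, used only for illustration.
public class OrderCreated
{
    public int OrderId { get; set; }
    public string Customer { get; set; }
}

public static class MessageCodec
{
    // Serialize an object to UTF-8 JSON bytes, suitable as a message body.
    public static byte[] ToBody<T>(T message)
    {
        return JsonSerializer.SerializeToUtf8Bytes(message);
    }

    // Rebuild the object from the bytes a consumer received.
    public static T FromBody<T>(byte[] body)
    {
        return JsonSerializer.Deserialize<T>(body);
    }
}
```

With such a helper, the sender would pass MessageCodec.ToBody(order) as the body argument of BasicPublish, and the receiver would call MessageCodec.FromBody<OrderCreated>(body) on the delivered bytes. Any other serializer works equally well as long as both sides agree on the format.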
The sending code is now complete. Running it publishes the message to the RabbitMQ queue. Next we need server-side code that connects to RabbitMQ and retrieves these messages.
Likewise, create a console application named Receive. Its code is as follows:
    using System;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    class Receive
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "yy";
            factory.Password = "hello!";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Declare the same queue as the sender, in case the receiver starts first.
                channel.QueueDeclare("hello", false, false, false, null);

                // QueueingBasicConsumer belongs to the 3.x client used in this article;
                // later client versions deprecate it.
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("hello", true, consumer); // true = noAck

                Console.WriteLine("Waiting for messages.");
                while (true)
                {
                    // Blocks until the next message arrives.
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Received {0}", message);
                }
            }
        }
    }
As with sending, first define the connection and then declare the queue. To receive messages, define a consumer, then keep dequeuing messages from the queue and processing them.
With both the sender and the receiver written, run the sender to send a message:

A message has now been sent to the queue named hello and is stored on the RabbitMQ server. The list_queues command of rabbitmqctl lists all queues together with the number of messages in each. As shown, RabbitMQ currently has a single queue holding one message:
    D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.2\sbin>rabbitmqctl list_queues
    Listing queues ...
    hello   1
Now run the receiver:

The receiver has received the Hello World message sent by the client. Looking at the queue information on RabbitMQ again:
    D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.2\sbin>rabbitmqctl list_queues
    Listing queues ...
    hello   0
The number of messages in the hello queue is now 0, which shows that RabbitMQ deleted the message once the receiver had received it.
2.2 Work Queues
The previous example showed how to send messages to, and receive messages from, a named queue. Now we create a work queue that distributes time-consuming tasks among multiple workers:

The main idea behind work queues (also called task queues) is to avoid running a resource-intensive or slow operation immediately and waiting for it to finish. Instead, the task is sent to a queue as a message and processed later. A worker process running in the background takes the task off the queue and executes it. When several workers are running, the tasks are shared between them.
This is very useful in web applications, where complex work cannot be completed within a short HTTP request. Where real-time behaviour is not essential, we can finish the main operation and then handle the less urgent work, such as writing logs, through messages.
Preparation
In the first part we sent a message containing the string "Hello World!". Now we will send strings that stand for complex tasks, and use Thread.Sleep() to simulate the time-consuming work. Dots (.) in the string indicate the complexity of the task: each dot takes one second, so "Hello..." takes three seconds.
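The dot convention can be sketched as a small helper. This is only an illustration of the rule described above; the TaskCost class and its method names are hypothetical and do not appear in the original programs, which compute the same number inline with message.Split('.').Length - 1.

```csharp
using System;
using System.Linq;

public static class TaskCost
{
    // Each '.' in the message stands for one second of simulated work.
    public static int CountDots(string message)
    {
        return message.Count(c => c == '.');
    }

    // The delay a worker would pass to Thread.Sleep for this message.
    public static TimeSpan DelayFor(string message)
    {
        return TimeSpan.FromSeconds(CountDots(message));
    }
}
```

For example, TaskCost.CountDots("Hello...") yields 3, while a message without dots costs nothing.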
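Make a few small adjustments to send.cs from the previous example so that it can send arbitrary messages. This program schedules tasks to our work queue.
    using System;
    using System.Text;
    using RabbitMQ.Client;

    class NewTask
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "yy";
            factory.Password = "hello!";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("hello", false, false, false, null);

                // The message text now comes from the command line.
                string message = GetMessage(args);
                var properties = channel.CreateBasicProperties();
                properties.DeliveryMode = 2; // 2 = persistent

                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "hello", properties, body);
                Console.WriteLine("Sent {0}", message);
            }
            Console.ReadKey();
        }

        // Join all command-line arguments into one message, or fall back to a default.
        private static string GetMessage(string[] args)
        {
            return (args.Length > 0) ? string.Join(" ", args) : "Hello World!";
        }
    }
Compared with the previous sender, the changes are the message built by GetMessage, the message properties passed to BasicPublish, and the final Console.ReadKey().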
Next, modify the receiver so that it sleeps for a number of seconds equal to the number of dots in the message:
    using System;
    using System.Text;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    class Worker
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "yy";
            factory.Password = "hello!";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("hello", false, false, false, null);

                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("hello", true, consumer); // true = noAck

                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);

                    // One second of simulated work per dot in the message.
                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);

                    Console.WriteLine("Received {0}", message);
                    Console.WriteLine("Done");
                }
            }
        }
    }
Round-robin dispatching
One advantage of a work queue is that the work can be processed in parallel. If tasks pile up, we only need to add more workers, so scaling out is easy.
Now start two receivers and let them wait for messages, then start a sender and begin sending messages.

Five messages were sent from cmd. The dots at the end of each message indicate how long the message takes to process, simulating a time-consuming operation.
The two receivers then receive the messages in turn:

By default, RabbitMQ sends each message to the next consumer in sequence, so on average every consumer receives about the same number of messages.
This way of distributing messages is called round-robin.
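The round-robin rule can be illustrated with a small in-memory simulation. This is a sketch of the dispatching order only, not of how the broker is implemented; the RoundRobinDemo class is a hypothetical name and no RabbitMQ API is involved.

```csharp
using System;
using System.Collections.Generic;

public static class RoundRobinDemo
{
    // Message i goes to consumer i % consumerCount, regardless of its content.
    public static List<List<string>> Dispatch(IList<string> messages, int consumerCount)
    {
        var perConsumer = new List<List<string>>();
        for (int c = 0; c < consumerCount; c++)
        {
            perConsumer.Add(new List<string>());
        }
        for (int i = 0; i < messages.Count; i++)
        {
            perConsumer[i % consumerCount].Add(messages[i]);
        }
        return perConsumer;
    }
}
```

With five messages and two consumers, the first consumer receives the first, third and fifth messages and the second consumer receives the second and fourth, which matches the behaviour seen with the two receivers.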
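2.3 Message Acknowledgment
When a task takes a long time, you may wonder what happens if a consumer dies halfway through it. With the current code, RabbitMQ removes a message from the queue as soon as it has delivered it to a consumer. If the worker handling that message is stopped, the message it was processing is lost, and so are all the messages that were dispatched to that worker but not yet handled.
We do not want to lose any task. If a worker dies, we want its message to be delivered to another worker.
To prevent message loss, RabbitMQ provides message acknowledgments. The consumer sends back an ack to tell RabbitMQ that a particular message has been received and processed, and only then does RabbitMQ release and delete it.
If a consumer dies without sending an ack, RabbitMQ concludes that the message was not fully processed and redelivers it to another consumer. This way no message is lost even if workers occasionally die.
Messages have no timeout here: RabbitMQ redelivers a message only when the worker's connection is closed, so processing a message may take a very long time without causing problems. (This describes the RabbitMQ version used in this article; newer releases add a configurable acknowledgement timeout.)
Message acknowledgments are turned on by default. In the earlier examples we switched them off by passing true as the second argument (noAck) of BasicConsume. It is time to pass false instead and send an acknowledgment once the worker has finished its task.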
    // false = noAck off: RabbitMQ waits for an explicit acknowledgment.
    channel.BasicConsume("hello", false, consumer);
    while (true)
    {
        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);

        int dots = message.Split('.').Length - 1;
        Thread.Sleep(dots * 1000);

        Console.WriteLine("Received {0}", message);
        Console.WriteLine("Done");

        // Acknowledge this single delivery only after the work is finished.
        channel.BasicAck(ea.DeliveryTag, false);
    }
Now we can be sure that no message is lost even if a worker is stopped while processing: every unacknowledged message is redelivered to another worker.
Forgetting to call BasicAck is a common mistake, and an easy one to make, but the consequences are serious. Messages are redelivered when the client quits, but RabbitMQ consumes more and more memory because it cannot release messages that were never acknowledged. To debug this kind of problem, use rabbitmqctl to print the messages_unacknowledged field:
    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    hello   0   0
    ...done.
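The two counters in that output can be understood with a toy in-memory model of a single queue and a single consumer. This is only a sketch of the bookkeeping described above, under the assumption that one consumer holds all outstanding deliveries; ToyQueue and its members are hypothetical names and are not part of the RabbitMQ client.

```csharp
using System;
using System.Collections.Generic;

// Toy model of one queue with one consumer; not the RabbitMQ API.
public class ToyQueue
{
    private readonly Queue<string> ready = new Queue<string>();
    private readonly Dictionary<ulong, string> unacked = new Dictionary<ulong, string>();
    private ulong nextTag = 1;

    public int MessagesReady { get { return ready.Count; } }
    public int MessagesUnacknowledged { get { return unacked.Count; } }

    public void Publish(string message)
    {
        ready.Enqueue(message);
    }

    // Hand the next message to the consumer; it stays tracked until acknowledged.
    public ulong Deliver(out string message)
    {
        message = ready.Dequeue();
        ulong tag = nextTag++;
        unacked[tag] = message;
        return tag;
    }

    // The consumer reported that the work is done, so the message can be forgotten.
    public void Ack(ulong deliveryTag)
    {
        unacked.Remove(deliveryTag);
    }

    // The consumer's connection closed: everything it had not acked becomes ready again.
    public void ConsumerDied()
    {
        foreach (var message in unacked.Values)
        {
            ready.Enqueue(message);
        }
        unacked.Clear();
    }
}
```

In this model, a consumer that never calls Ack makes MessagesUnacknowledged grow with every delivery, which is the symptom to look for in the rabbitmqctl output.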
2.4 Message Durability
We have made sure that tasks are not lost when a consumer goes down, but they are still lost if RabbitMQ Server itself stops.
When RabbitMQ Server shuts down or crashes, the queues and messages it holds are not preserved by default. To make RabbitMQ keep the messages, two things are required: both the queue and the messages must be marked as durable.
First, make sure RabbitMQ never loses the queue by declaring it as durable:
    bool durable = true;
    channel.QueueDeclare("hello", durable, false, false, null);
Although this code is syntactically correct, it does not work at this point, because we have already defined a non-durable queue named hello. RabbitMQ does not allow an existing queue to be redeclared with different parameters and returns an error if you try. Instead, declare a queue with a different name:
    bool durable = true;
    channel.QueueDeclare("task_queue", durable, false, false, null);
This QueueDeclare change must be applied to both the sender and the receiver.
The task_queue queue will now survive a restart of RabbitMQ Server.
Next, the messages themselves must be marked as persistent, which is done by calling IBasicProperties.SetPersistent with true:
    var properties = channel.CreateBasicProperties();
    properties.SetPersistent(true);
Note that marking messages as persistent does not fully guarantee that a message will not be lost. Although it tells RabbitMQ to save the message to disk, there is still a short window between RabbitMQ accepting a message and saving it.
RabbitMQ may have written the message only to a cache and not yet to disk. The persistence guarantee is therefore not absolute, but it is enough for a simple task queue. If you need a stronger guarantee, use publisher confirms.
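As a rough sketch of what publisher confirms look like in code, the helper below publishes one persistent message and waits for the broker to confirm it. It assumes a RabbitMQ.Client version that still exposes the IModel interface with ConfirmSelect and WaitForConfirmsOrDie (the API family used throughout this article), and it needs a running broker to do anything. ConfirmedSender and the five-second timeout are illustrative choices, not part of the original programs.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class ConfirmedSender
{
    // Publishes one persistent message and blocks until the broker confirms it.
    public static void Send(IModel channel, string queue, string message)
    {
        // Put the channel into confirm mode so the broker acknowledges publishes.
        channel.ConfirmSelect();

        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2; // 2 = persistent

        channel.BasicPublish("", queue, properties, Encoding.UTF8.GetBytes(message));

        // Throws if the broker rejects the message or does not confirm within the timeout
        // (the timeout value is an arbitrary assumption for this sketch).
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    }
}
```

Waiting after every single publish is the simplest form and also the slowest; it is shown here only because it makes the guarantee easy to see.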
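2.5 Fair Dispatch
You may notice that dispatching does not always distribute work as fairly as we would like. For example, with two workers, if every odd-numbered message is heavy and every even-numbered message is light, one worker is constantly busy while the other is mostly idle. RabbitMQ knows nothing about this and still dispatches messages evenly in turn.
To change this, call the BasicQos method with prefetchCount = 1. This tells RabbitMQ not to give a worker more than one message at a time. In other words, it does not dispatch a new message to a worker until that worker has processed and acknowledged the previous one. Instead, the message goes to the next worker that is not busy.
    // prefetchSize = 0 (no size limit), prefetchCount = 1, global = false
    channel.BasicQos(0, 1, false);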
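The effect of prefetchCount = 1 can be compared with plain round-robin in a small simulation. This is a simplified model that assumes all tasks are already queued and each worker takes a new task the moment it finishes the previous one; DispatchComparison is a hypothetical name and no RabbitMQ API is used.

```csharp
using System;
using System.Collections.Generic;

public static class DispatchComparison
{
    // Round-robin: task i always goes to worker i % workers, whatever its cost.
    public static int[] RoundRobinLoad(IList<int> taskSeconds, int workers)
    {
        var busy = new int[workers];
        for (int i = 0; i < taskSeconds.Count; i++)
        {
            busy[i % workers] += taskSeconds[i];
        }
        return busy;
    }

    // prefetchCount = 1: each task goes to whichever worker becomes free first.
    public static int[] FairLoad(IList<int> taskSeconds, int workers)
    {
        var busy = new int[workers];
        foreach (int seconds in taskSeconds)
        {
            int next = 0;
            for (int w = 1; w < workers; w++)
            {
                if (busy[w] < busy[next]) next = w;
            }
            busy[next] += seconds;
        }
        return busy;
    }
}
```

For the task costs 5, 1, 5, 1, 5, 1 and two workers, round-robin leaves the workers busy for 15 and 3 seconds, while the fair variant ends at 11 and 7 seconds: the total work is the same, but it finishes sooner because the light worker picks up more tasks.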
2.6 Complete Example
Now let us put all of this together.
The sender code is as follows:
    using System;
    using System.Text;
    using RabbitMQ.Client;

    class NewTask
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "yy";
            factory.Password = "hello!";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Durable queue: survives a broker restart.
                bool durable = true;
                channel.QueueDeclare("task_queue", durable, false, false, null);

                string message = GetMessage(args);

                // Persistent message: the broker is asked to write it to disk.
                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);

                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "task_queue", properties, body);
                Console.WriteLine("Sent {0}", message);
            }
            Console.ReadKey();
        }

        private static string GetMessage(string[] args)
        {
            return (args.Length > 0) ? string.Join(" ", args) : "Hello World!";
        }
    }
The receiver code is as follows:
    using System;
    using System.Text;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    class Worker
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "yy";
            factory.Password = "hello!";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                bool durable = true;
                channel.QueueDeclare("task_queue", durable, false, false, null);

                // Fair dispatch: at most one unacknowledged message per worker.
                channel.BasicQos(0, 1, false);

                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("task_queue", false, consumer); // false = explicit acks

                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);

                    // One second of simulated work per dot in the message.
                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);

                    Console.WriteLine("Received {0}", message);
                    Console.WriteLine("Done");

                    // Acknowledge only after the work is finished.
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            }
        }
    }
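3 Summary
This article briefly introduced the concepts behind message queues, the basic principles of the RabbitMQ message broker, how to install RabbitMQ on Windows and how to use it from .NET. Message queues play an important role in building distributed systems and in improving a system's scalability and responsiveness. Hopefully this article helps you understand message queues and how to use RabbitMQ.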