RabbitMQÊÇÒ»¸öÏûÏ¢´úÀí£¬ºËÐÄÔÀí£º·¢ËÍÏûÏ¢£¬½ÓÊÕÏûÏ¢¡£
RabbitMQÖ÷ÒªÓÃÓÚ×é¼þÖ®¼äµÄ½âñÏûÏ¢·¢ËÍÕßÎÞÐèÖªµÀÏûϢʹÓÃÕߵĴæÔÚ£¬·´Ö®ÒàÈ»¡£

ÀýÈçÒ»¸öÈÕ־ϵͳ£¬ºÜÈÝÒ×ʹÓÃRabbitMQ¼ò»¯¹¤×÷Á¿£¬Ò»¸öConsumer½øÐÐÏûÏ¢µÄÕý³£´¦Àí£¬ÁíÒ»¸öConsumer¸´ÖƶÔÏûÏ¢½øÐÐÈÕÖ¾¼Ç¼£¬Ö»ÒªÔÚ³ÌÐòÖÐÖ¸¶¨Á½¸öConsumerËù¼àÌýµÄqueueÒÔÏàͬµÄ·½Ê½°ó¶¨µ½Í¬Ò»¸öexchange¼´¿É£¬Ê£ÏµÄÏûÏ¢·Ö·¢¹¤×÷ÓÉRabbitMQÍê³É¡£
Ê×ÏÈͨ¹ýÒ»¸ö·Ç³£¼òµ¥µÄ¡±hello world¡°Àý×Ó½éÉÜÈçºÎʹÓÃRabbitMQ£¬È»ºóÔÙ½éÉÜÆäÉæ¼°µÄ»ù±¾¸ÅÄî²¢¶Ô½»»»»úºÍ¶ÓÁжà×öµã½éÉÜ¡£
Ò»¡¢helloworldÀý×Ó
±¾Àý·Ç³£¼òµ¥¡ª¡ª·¢ËÍÒ»¸öÏûÏ¢¡±hello world¡°£¬È»ºó»ñÈ¡Ëü²¢Êä³öµ½ÆÁÄ»¡£
×ܹ²ÐèÒªÁ½¸ö³ÌÐò£¬Ò»¸ö·¢ËÍÏûÏ¢½Ðsend.py£¬Ò»¸ö½ÓÊÜÏûÏ¢²¢´òÓ¡ÏûÏ¢ÄÚÈݽÐreceive.py¡£

¸ÃͼΪhelloworldÀý×ÓµÄÔÀíͼ£ºÉú²úÕßsend.py£¨Productor£©°ÑÏûÏ¢£¨¡±hello world¡°£©·¢Ë͵½Ò»¸öÃûΪ¡±queue¡°µÄ¶ÓÁÐÖУ¬Ïû·ÑÕßreceive.py´ÓÕâ¸ö¶ÓÁÐÖлñÈ¡ÏûÏ¢¡£½ÓÏÂÀ´¿´´úÂ룺
send.py(·¢ËÍÏûÏ¢)
#!/usr/bin/env python
import pika
#µÚÒ»²½£¬Á¬½ÓRabbitMq·þÎñÆ÷
rabbit_username='xxx'
rabbit_password='xxx'
credentials = pika.PlainCredentials(rabbit_username, rabbit_password)
connection = pika.BlockingConnection(pika.ConnectionParameters( host='x.x.x.x',credentials=credentials))
#channelÊǽøÐÐÏûÏ¢¶ÁдµÄͨµÀ
channel = connection.channel()
#µÚ¶þ²½£¬´´½¨Ò»¸öÃûΪqueueµÄ¶ÓÁУ¬È»ºó°ÑÏûÏ¢·¢Ë͵½Õâ¸ö¶ÓÁÐ
channel.queue_declare(queue='queue')
#µÚÈý²½£¬ÏÖÔÚ¿ÉÒÔ·¢ËÍÏûÏ¢£¬µ«ÊÇRabbitMQ²»ÄܰÑÏûÏ¢Ö±½Ó·¢Ë͵½¶ÓÁУ¬Òª·¢Ë͵½½»»»Æ÷£¬Õâ¸öÉÔºó½éÉÜ£¬ÕâÀïʹÓÃĬÈϽ»»»Æ÷£¨exchange£©,ËüʹÓÃÒ»¸ö¿Õ×Ö·û´®±ê
#ʶ£¬routing_key²ÎÊý±ØÐëÖ¸¶¨Îª¶ÓÁÐÃû³Æ£¬ÕâÀïΪqueue
channel.basic_publish(exchange='',
routing_key='queue',
body='hello world')
print "send.py:send message 'hello world',wait for receive.py deal with this message"
#Í˳ö³ÌÐòǰ£¬Í¨¹ý¹Ø±ÕÁ¬½Ó±£Ö¤ÏûÏ¢ÒѾͶµÝµ½RabbitMq
connection.close()
|
receive.py£¨»ñÈ¡Êý¾Ý£©
<print ' [*] Waiting for messages. To exit press CTRL+C'
#!/usr/bin/env python
import pika
#µÚÒ»²½£¬Í¬ÑùÁ¬½ÓRabbitMq·þÎñÆ÷
rabbit_username='xxx'
rabbit_password='xxx'
credentials = pika.PlainCredentials(rabbit_username, rabbit_password)
connection = pika.BlockingConnection(pika.ConnectionParameters( host='x.x.x.x',credentials=credentials))
channel = connection.channel()
#Ϊȷ±£¶ÓÁдæÔÚ£¬ÔÙ´ÎÖ´ÐÐqueue_declare´´½¨Ò»¸ö¶ÓÁУ¬ÎÒÃÇ¿ÉÒÔ¶à´ÎÔËÐиÃÃüÁµ«ÊÇÖ»ÒªÒ»¸ö¶ÓÁлᴴ½¨
#ÒòΪ²»Äܱ£Ö¤send.pyÏÈÖ´Ðл¹ÊÇreceive.pyÏÈÖ´ÐУ¬ËùÒÔÖØ¸´ÉùÃ÷¶ÓÁÐÀ´È·±£Æä´æÔÚ
channel.queue_declare(queue='hellolxy')
#µÚÈý²½£¬¶¨ÒåÒ»¸ö»Øµ÷º¯Êý£¬µ±»ñµÃÏûϢʱ£¬Pika¿âµ÷ÓÃÕâ¸ö»Øµ÷º¯ÊýÀ´´¦ÀíÏûÏ¢£¬¸Ã»Øµ÷º¯Êý½«ÏûÏ¢ÄÚÈÝ´òÓ¡µ½ÆÁÄ»
def callback(ch, method, properties, body):
print "receive.py: Received message %r" % (body,)
#µÚËIJ½£¬¸æËßrabbbitMq»Øµ÷º¯Êý½«´Óqueue¶ÓÁнÓÊÕÏûÏ¢
channel.basic_consume(callback,
queue='queue',
no_ack=True)
#µÚÎå²½£¬ÊäÈëÒ»¸öÎÞÏÞÑ»·À´µÈ´ýÏûÏ¢Êý¾Ý²¢ÔËÐлص÷º¯Êý
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
|
ÏÖÔÚÔÚÖÕ¶ËÔËÐгÌÐò¡£Ê×ÏÈ£¬ÓÃsend.py·¢ËÍÒ»ÌõÏûÏ¢£º
<$ python send.py
send.py:send message 'hello world',wait for receive.py deal with this message
|
Éú²úÕß(producer)³ÌÐòsend.pyÿ´ÎÔËÐкó¾Í»áÍ£Ö¹¡£ÏÖÔÚÔËÐÐreceive.pyÀ´½ÓÊÕÏûÏ¢£º
<$ python receive.py
receive.py: Received message 'hello world'
[*] Waiting for messages. To exit press CTRL+C
|
³É¹¦ÁË£¡ÏÖÔÚÒѾͨ¹ýRabbitMQ·¢ËÍÁ˵ÚÒ»ÌõÏûÏ¢¡£µ«ÊÇreceive.py³ÌÐò²¢Ã»ÓÐÍ˳ö£¬ËüÒ»Ö±ÔÚ×¼±¸»ñÈ¡ÏûÏ¢£¬¿ÉÒÔͨ¹ýctrl-cÀ´ÖжÏËü¡£
¶þ¡¢RabbitMQ»ù±¾¸ÅÄî
×ܽáһϷ¢ËͽÓÊÕÏûÏ¢µÄ¹ý³Ì£º

ͨ¹ýÉÏÃæÀý×Ó¶ÔRabbitMQÓÐÒ»¸ö¸ÐÐÔÈÏʶºó£¬ÏÖÔÚÀ´½éÉÜRabbitMQÖеĻù±¾¸ÅÄî¡£
Broker:ÏûÏ¢¶ÓÁзþÎñÆ÷ʵÌå¡£
ÏûÏ¢£ºÃ¿¸öÏûÏ¢¶¼ÓÐÒ»¸ö·Óɼü(routing key)µÄÊôÐÔ¡£¾ÍÊÇÒ»¸ö¼òµ¥µÄ×Ö·û´®¡£
connection£ºÓ¦ÓóÌÐòÓëbrokerµÄÍøÂçÁ¬½Ó¡£
channel:¼¸ºõËùÓеIJÙ×÷¶¼ÔÚchannelÖнøÐУ¬channelÊǽøÐÐÏûÏ¢¶ÁдµÄͨµÀ¡£¿Í»§¶Ë¿É½¨Á¢¶à¸öchannel£¬Ã¿¸öchannel´ú±íÒ»¸ö»á»°ÈÎÎñ¡£
½»»»»ú£º½ÓÊÕÏûÏ¢£¬¸ù¾Ý·Óɼüת·¢ÏûÏ¢µ½°ó¶¨µÄ¶ÓÁС£
°ó¶¨£ºÒ»¸ö°ó¶¨¾ÍÊÇ»ùÓÚ·Óɼü½«½»»»»úºÍ¶ÓÁÐÁ¬½ÓÆðÀ´µÄ·ÓɹæÔò£¬ËùÒÔ½»»»»ú²»¹ý¾ÍÊÇÒ»¸öÓɰ󶨹¹³ÉµÄ·ÓÉ±í¡£
¾ÙÀý£ºÒ»¸ö¾ßÓзÓɼü“key1”µÄÏûÏ¢Òª·¢Ë͵½Á½¸ö¶ÓÁУ¬queueAºÍqueueB¡£Òª×öµ½Õâµã¾ÍÒª½¨Á¢Á½¸ö°ó¶¨£¬Ã¿¸ö°ó¶¨Á¬½ÓÒ»¸ö½»»»»úºÍÒ»¸ö¶ÓÁС£Á½Õß¶¼ÊÇÓÉ·Óɼü“key1”´¥·¢£¬ÕâÖÖÇé¿ö£¬½»»»»ú»á¸´ÖÆÒ»·ÝÏûÏ¢²¢°ÑËüÃÇ·Ö±ð·¢Ë͵½Á½¸ö¶ÓÁÐÖС£
¶ÓÁУºÏûÏ¢µÄÈÝÆ÷£¬Ò²ÊÇÏûÏ¢µÄÖյ㡣һ¸öÏûÏ¢¿ÉͶÈëÒ»¸ö»ò¶à¸ö¶ÓÁС£ÏûÏ¢Ò»Ö±ÔÚ¶ÓÁÐÀïÃæ£¬µÈ´ýÏû·ÑÕßÁ¬½Óµ½Õâ¸ö¶ÓÁн«ÆäÈ¡×ß¡£
Èý¡¢½»»»»ú
½»»»»úÓÃÀ´½ÓÊÕÏûÏ¢£¬×ª·¢ÏûÏ¢µ½°ó¶¨µÄ¶ÓÁУ¬ÊÇrabbitMqÖеĺËÐÄ¡£
½»»»»ú¹²ÓÐ4ÖÖÀàÐÍ£ºdirect£¬topic£¬headersºÍfanout¡£
Ϊʲô²»´´½¨Ò»ÖÖ½»»»»úÀ´´¦ÀíËùÓÐÀàÐ͵Ä·ÓɹæÔò£¿ÒòΪÿÖÖ¹æÔòÆ¥ÅäʱµÄCPU¿ªÏúÊDz»Í¬µÄ£¬ËùÒÔ¸ù¾Ý²»Í¬ÐèÇóÑ¡ÔñºÏÊʽ»»»»ú¡£
¾ÙÀý£ºÒ»¸ö"topic"ÀàÐ͵Ľ»»»»ú»á½«ÏûÏ¢µÄ·ÓɼüÓëÀàËÆ“dog.*”µÄģʽ½øÐÐÆ¥Åä¡£Ò»¸ö“direct”ÀàÐ͵Ľ»»»»ú»á½«Â·ÓɼüÓë“dogs”½øÐбȽϡ£Æ¥ÅäÄ©¶ËͨÅä·û±ÈÖ±½Ó±È½ÏÏûºÄ¸ü¶àµÄcpu,ËùÒÔÈç¹ûÓò»µ½“topic”ÀàÐͽ»»»»ú´øÀ´µÄÁé»îÐÔ£¬¾Íͨ¹ý“direct”ÀàÐͽ»»»»ú»ñµÃ¸ü¸ßµÄ´¦ÀíЧÂÊ¡£
1¡¢Direct½»»»»ú£º×ª·¢ÏûÏ¢µ½routingKeyÖ¸¶¨¶ÓÁУ¨ÍêȫƥÅ䣬µ¥²¥£©¡£
routingKeyÓë¶ÓÁÐÃûÍêȫƥÅ䣬Èç¹ûÒ»¸ö¶ÓÁа󶨵½½»»»»úÒªÇó·ÓɼüΪ¡°dog¡±£¬Ôòֻת·¢routingkey±ê¼ÇΪdogµÄÏûÏ¢£¬²»»áת·¢dog.puppy£¬Ò²²»»áת·¢dog.guardµÈ¡£ 
2¡¢Topic½»»»»ú£º°´¹æÔòת·¢ÏûÏ¢£¨×îÁé»î£¬×é²¥£©£º
TopicÀàÐͽ»»»»úͨ¹ýģʽƥÅä·ÖÅäÏûÏ¢µÄrouting-keyÊôÐÔ¡£½«Â·ÓɼüºÍij¸öģʽ½øÐÐÆ¥Å䣬´Ëʱ¶ÓÁÐÐèÒª°ó¶¨µ½Ò»¸öģʽÉÏ¡£
Ëü½«routing-keyºÍbinding-keyµÄ×Ö·û´®Çзֳɵ¥´Ê¡£ÕâЩµ¥´ÊÖ®¼äÓõã¸ô¿ª¡£ËüͬÑùÒ²»áʶ±ðÁ½¸öͨÅä·û£º·ûºÅ“#”ºÍ·ûºÅ“*”¡£#Æ¥Åä0¸ö»ò¶à¸öµ¥´Ê£¬*Æ¥Åä²»¶à²»ÉÙÒ»¸öµ¥´Ê¡£
ÀýÈ磬binding key:*.stock.#Æ¥Åärouting key: usd.stockºÍeur.stock.db£¬µ«ÊDz»Æ¥Åästock.nana¡£
ÀýÈ磬“audit.#”Äܹ»Æ¥Åäµ½“audit.irs.corporate”£¬µ«ÊÇ“audit.*”Ö»»áÆ¥Åäµ½“audit.irs”¡£

3¡¢Fanout½»»»»ú£º×ª·¢ÏûÏ¢µ½ËùÓа󶨶ÓÁУ¨×î¿ì£¬¹ã²¥£©
fanout½»»»»ú²»´¦Àí·Óɼü£¬¼òµ¥µÄ½«¶ÓÁа󶨵½½»»»»úÉÏ£¬Ã¿¸ö·¢Ë͵½½»»»»úµÄÏûÏ¢¶¼»á±»×ª·¢µ½Óë¸Ã½»»»»ú°ó¶¨µÄËùÓжÓÁÐÉÏ¡£
ºÜÏñ×ÓÍø¹ã²¥£¬Ã¿Ì¨×ÓÍøÄÚµÄÖ÷»ú¶¼»ñµÃÁËÒ»·Ý¸´ÖƵÄÏûÏ¢¡£Fanout½»»»»úת·¢ÏûÏ¢ÊÇ×î¿ìµÄ¡£

4¡¢Note
- Èç¹ûûÓжÓÁаó¶¨ÔÚ½»»»»úÉÏ£¬Ôò·¢Ë͵½¸Ã½»»»»úÉϵÄÏûÏ¢»á¶ªÊ§¡£
- Ò»¸ö½»»»»ú¿ÉÒ԰󶨶à¸ö¶ÓÁУ¬Ò»¸ö¶ÓÁпÉÒÔ±»¶à¸ö½»»»»ú°ó¶¨¡£
- »¹ÓÐһЩÆäËûÀàÐ͵Ľ»»»»úÀàÐÍ£¬Èçheader¡¢failover¡¢systemµÈ£¬ÏÖÔÚÔÚµ±Ç°µÄRabbitMQ°æ±¾ÖоùδʵÏÖ¡£
- ÒòΪ½»»»»úÊÇÃüÃûʵÌ壬ÉùÃ÷Ò»¸öÒѾ´æÔڵĽ»»»»ú£¬µ«ÊÇÊÔͼ¸³Ó費ͬÀàÐÍÊǻᵼÖ´íÎ󡣿ͻ§¶ËÐèҪɾ³ýÕâ¸öÒѾ´æÔڵĽ»»»»ú£¬È»ºóÖØÐÂÉùÃ÷²¢ÇÒ¸³ÓèеÄÀàÐÍ¡£
- ½»»»»úµÄÊôÐÔ£º
- ³Ö¾ÃÐÔ£ºÈç¹ûÆôÓ㬽»»»»ú½«»áÔÚserverÖØÆôǰ¶¼ÓÐЧ¡£
- ×Ô¶¯É¾³ý£ºÈç¹ûÆôÓã¬ÄÇô½»»»»ú½«»áÔÚÆä°ó¶¨µÄ¶ÓÁж¼±»É¾µôÖ®ºóɾ³ý×ÔÉí¡£
- ¶èÐÔ:Èç¹ûûÓÐÉùÃ÷½»»»»ú£¬ÄÇôÔÚÖ´Ðе½Ê¹ÓõÄʱºò»áµ¼ÖÂÒì³££¬²¢²»»áÖ÷¶¯ÉùÃ÷¡£
ËÄ¡¢¶ÓÁÐ
- ¶ÓÁеÄÊôÐÔ£º
- ³Ö¾ÃÐÔ£ºÈç¹ûÆôÓ㬶ÓÁн«ÔÚServer·þÎñÖØÆôǰ¶¼ÓÐЧ¡£
- ×Ô¶¯É¾³ý£ºÈç¹ûÆôÓã¬ÄÇô¶ÓÁн«»áÔÚËùÓеÄÏû·ÑÕßֹͣʹÓÃÖ®ºó×Ô¶¯É¾³ý×ÔÉí¡£
- ¶èÐÔ£ºÈç¹ûûÓÐÉùÃ÷¶ÓÁУ¬ÄÇôÔÚÖ´Ðе½Ê¹ÓõÄʱºò»áµ¼ÖÂÒì³££¬²¢²»»áÖ÷¶¯ÉùÃ÷¡£
- ÅÅËûÐÔ£ºÈç¹ûÆôÓ㬶ÓÁÐÖ»Äܱ»ÉùÃ÷ËüµÄÏû·ÑÕßʹÓᣡ¡¡¡
|