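Editor's note:
This article covers how RabbitMQ guarantees reliability, the points that need attention, using RabbitMQ with Docker, a comparison of message queue middleware, and several important concepts.
From the MaxLeap team (Service & Infra); edited and recommended by Anna of 火龙果软件.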
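How is reliability guaranteed?
RabbitMQ offers several features that trade a little performance for reliability guarantees.
Persistence
When RabbitMQ shuts down, messages and queues are discarded by default. To keep them, set the durable attribute to true when the queue is first declared, and mark messages as persistent when publishing. RabbitMQ then stores the queues, messages, and state in its local database and restores them after a restart.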
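java:
boolean durable = true;
// Declare a durable queue (not exclusive, not auto-delete).
channel.queueDeclare("task_queue", durable, false, false, null);
// Publish a persistent message to that queue through the default exchange.
channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());
Note: if the queue already exists, redeclaring it with a different durable value does not change it; the broker rejects a declaration whose attributes do not match the existing queue.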
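Consumer acknowledgements
By default the client receives messages in automatic-acknowledgement mode, but setting autoAck to false lets the client acknowledge messages explicitly. If the client rejects a message, or disconnects without acknowledging it, the message is requeued (before version 2.7.0 it was appended to the tail of the queue; from 2.7.0 on it keeps its original position in the queue).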
java:
boolean autoAck = false;
boolean requeue = true;
channel.basicConsume(queue, autoAck, callback);
channel.basicAck(deliveryTag, false);       // acknowledge a single delivery
channel.basicReject(deliveryTag, requeue);  // reject
channel.basicRecover(requeue);              // recover: redeliver unacknowledged messages
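Publisher confirms
By default the publisher does not track what happens to a message after sending it. The channel can be put into confirm mode, in which every published message is confirmed once; the application can then check the status from the confirmation the server sends back. For details, see: confirms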
java:
channel.confirmSelect();                         // enter confirm mode
// publish messages... each one is numbered, starting from 1
long nextSeqNo = channel.getNextPublishSeqNo();  // sequence number of the next message to be published
channel.waitForConfirms();                       // wait until all published messages are confirmed
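The sequence numbers also make it possible to track confirmations without blocking. The sketch below models only the bookkeeping, with no broker involved. The class `ConfirmTracker` and its method names are hypothetical, and it assumes a confirmation carries a sequence number plus a "multiple" flag meaning "everything up to and including this number".

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers published messages until they are confirmed. */
final class ConfirmTracker {
    // sequence number -> message body still waiting for a confirmation
    private final ConcurrentNavigableMap<Long, String> outstanding =
            new ConcurrentSkipListMap<>();

    /** Call right before publishing, with the value of getNextPublishSeqNo(). */
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Call when a confirmation arrives. */
    void confirmed(long seqNo, boolean multiple) {
        if (multiple) {
            // Confirms every message up to and including seqNo.
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Number of messages published but not yet confirmed. */
    int pending() {
        return outstanding.size();
    }
}
```

Whatever is still pending after a timeout is a candidate for republishing.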
Transactions: these cannot be used together with confirm mode, and they add a lot of overhead that sharply reduces throughput, so they are not recommended.
java:
channel.txSelect();
try {
    // do something...
    channel.txCommit();
} catch (IOException e) {
    channel.txRollback();
}
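<a name="ha" /> High availability for message queues (master/slave mode)
Exchanges and bindings can be regarded as shared by all nodes. A queue, by contrast, lives by default only on the node where it was first declared, so once that node goes down, the unprocessed messages in that queue are gone.
Fortunately, RabbitMQ provides a mechanism for mirroring a queue to other nodes. At any time one master handles requests while the slaves keep copies; when the master dies, the oldest slave is promoted to master.
Command:
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}': mirrors every queue whose name starts with "ha." to all nodes. The linked documentation has the detailed syntax.
This can also be configured in the management UI.
Note: exclusive queues are deleted when the connection between client and server closes, so making them durable or mirrored is pointless.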
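To see which queue names a policy pattern such as "^ha\." would select, the pattern can be tried out locally. This is a minimal sketch that assumes Java's regex engine treats this simple pattern the same way the broker does; `PolicyMatch` is a hypothetical helper, not part of any RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper for previewing a policy pattern against queue names. */
final class PolicyMatch {
    /** Returns the queue names that the policy pattern would select. */
    static List<String> matching(String policyPattern, List<String> queueNames) {
        Pattern p = Pattern.compile(policyPattern);
        List<String> selected = new ArrayList<>();
        for (String name : queueNames) {
            // find() rather than matches(): the pattern is anchored only where it says so ("^").
            if (p.matcher(name).find()) {
                selected.add(name);
            }
        }
        return selected;
    }
}
```

Because the dot is escaped, a queue named `hadoop` is not selected, while `ha.orders` is.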
Ordering guarantees
A diagram explains it best:
[Figure: message ordering guarantees]
Things to watch out for
Cluster configuration:
All nodes in a cluster share the same .erlang.cookie file. If RABBITMQ_USE_LONGNAME is not enabled, each node's hosts file must list the addresses of the other nodes; otherwise the nodes cannot find each other.
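<a name="cluster_partion" /> Split brain (network partitions):
RabbitMQ clusters do not handle or tolerate network partitions well; the federation or shovel plugins are the recommended way around this.
But what if a partition has already happened? Don't worry, there are ways to recover.
When the network is intermittent, communication between nodes breaks and the cluster ends up split apart.
From then on, each sub-cluster handles only its own local connections and messages, so the data drifts out of sync. When the network connection is restored, each side believes the other side had died, which is how a network partition is detected. By default, however, RabbitMQ ignores the partition and does nothing, so the two sides keep going their own way (exchanges, bindings, and queues can be created and deleted independently, and mirrored queues even end up with a master on each side).
The configuration can be changed so that the cluster recovers automatically, according to the chosen mode, when the connection comes back:
ignore: the default; do nothing.
pause_minority: when the connection is lost, the node checks whether it belongs to the minority (its side has half of the nodes or fewer); if so, it pauses until the connection is restored.
{pause_if_all_down, [nodes], ignore | autoheal}: when the connection is lost, the node checks whether any of the listed nodes is on its side of the partition; if so it keeps running, otherwise it pauses until the connection is restored. Under this strategy several partitions may survive, so the last parameter decides how they are merged when the connection comes back.
autoheal: when the connection is restored, the partition with the most client connections wins and the nodes in the other partitions are restarted.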
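The minority rule is easy to get wrong for even-sized clusters, so here is the check written out. It is a minimal sketch of the rule as described in this article, not RabbitMQ's implementation; `PartitionPolicy` and `shouldPause` are hypothetical names.

```java
/** Hypothetical model of the pause_minority decision. */
final class PartitionPolicy {
    /**
     * A node pauses when the nodes it can still reach (itself included)
     * make up half of the cluster or less.
     */
    static boolean shouldPause(int reachableNodes, int clusterSize) {
        return reachableNodes * 2 <= clusterSize;
    }
}
```

Under this rule a two-node cluster pauses both nodes on any partition, since each side sees exactly half, which is why an odd number of nodes is the usual choice with this mode.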
ÅäÖ㺼¯ÈºÅäÖÃ
¶à´Îack
¿Í»§¶Ë¶à´ÎÓ¦´ðͬһÌõÏûÏ¢£¬»áʹµÃ¸Ã¿Í»§¶ËÊÕ²»µ½ºóÐøÏûÏ¢¡£
½áºÏDockerʹÓÃ
¼¯Èº°æ±¾µÄʵÏÖ£ºÏê¼ûÎÒ×Ô¼ºÐ´µÄÒ»¸öÀý×Ó rabbitmq-server-cluster
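Comparison of message queue middleware
RabbitMQ:
Pros: supports many protocols, such as AMQP, XMPP, SMTP, and STOMP; flexible routing; a mature and stable clustering solution; load balancing; data persistence; and more.
Cons: relatively slow; fairly heavyweight, and installation depends on an Erlang environment.
Redis:
Pros: lightweight and easy to pick up
Cons: single point of failure; limited functionality
Kafka:
Pros: high throughput; distributed; fast persistence; load balancing; lightweight
Cons: can lose messages in extreme cases
Finally, a test result taken from the web: [Figure: benchmark results]
More performance figures are available from the source of that test.
If you would like a brief introduction to RabbitMQ itself, read on.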
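Introduction
Several important concepts
Virtual Host: contains a number of Exchanges and Queues and behaves like an independent logical broker.
Exchange: accepts messages sent by clients and routes them to queues on the server according to the Bindings. The exchange types covered here are direct, fanout, and topic.
Binding: connects an Exchange to a Queue and carries the routing rule.
Queue: a message queue; stores messages that have not been consumed yet.
Message: Header + Body
Channel: executes AMQP commands; one connection can open several channels to save resources.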
Client
RabbitMQ officially provides clients for many popular languages, so they are not all listed here. Taking Java as the example, let's get straight to the point:
Set up the connection factory:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
A reconnect-on-failure mechanism can be added:
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000); // retry interval in milliseconds
Create the connection and the channel:
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
One to one: one producer, one consumer
[Figure: one producer, one consumer]
Producer:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
Consumer:
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
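One to many: one producer, several consumers
The code is the same as above, except that there are several consumers; messages are handed to the consumers in round-robin order.
With autoAck=false, fair dispatch becomes possible: a given consumer is sent at most the specified number of unacknowledged messages, and the next one is sent only after one of them has been acknowledged. Add this to the consumer:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
Everything else is the same as above.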
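The effect of the prefetch limit can be illustrated with a small in-memory model. This is only a sketch of the idea (round-robin that skips consumers already at their limit); it is not how the broker is implemented, and `FairDispatch` is a hypothetical class.

```java
/** Hypothetical model of round-robin dispatch with a per-consumer prefetch limit. */
final class FairDispatch {
    private final int prefetch;
    private final int[] unacked;  // unacknowledged deliveries per consumer
    private int next = 0;         // round-robin cursor

    FairDispatch(int consumers, int prefetch) {
        this.prefetch = prefetch;
        this.unacked = new int[consumers];
    }

    /** Picks a consumer for the next message, or -1 if all are at their prefetch limit. */
    int dispatch() {
        for (int i = 0; i < unacked.length; i++) {
            int c = (next + i) % unacked.length;
            if (unacked[c] < prefetch) {
                unacked[c]++;
                next = (c + 1) % unacked.length;
                return c;
            }
        }
        return -1; // the message stays in the queue until someone acknowledges
    }

    /** Records an acknowledgement from the given consumer, freeing one slot. */
    void ack(int consumer) {
        unacked[consumer]--;
    }
}
```

With two consumers and a prefetch of 1, a slow consumer simply stops receiving messages until it acknowledges, and the faster one picks up the work.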
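Broadcast
[Figure: broadcast]
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// Server-named temporary queue bound to the exchange (each consumer normally does this for itself).
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
// A fanout exchange ignores the routing key.
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
The consumer is the same as above.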
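Routing: routing by a specified rule
[Figure: routing]
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
// Only messages published with this routing key reach the queue.
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
The consumer is the same as above.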
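What the exchange does with the routing key can be modelled in a few lines. The sketch below assumes exact-match routing, as in a direct exchange, and keeps everything in memory; `DirectExchange` here is a hypothetical class, not the client library's type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of exact-match routing. */
final class DirectExchange {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Binds a queue to this exchange under the given routing key. */
    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Queues that receive a message published with the given routing key. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }
}
```

A queue may be bound under several keys, and several queues may share a key; a message whose key has no binding reaches no queue.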
Topics: routing with wildcard support
[Figure: topics]
* stands for exactly one word
# stands for zero or more words
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
The consumer is the same as above.
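The wildcard rules can be made concrete with a small matcher. This is a sketch of the matching rule only, assuming dot-separated words with `*` matching exactly one word and `#` matching zero or more; `TopicMatcher` is a hypothetical helper, and the broker's own implementation is certainly different.

```java
/** Hypothetical matcher for topic-style binding keys. */
final class TopicMatcher {
    /** Returns true if routingKey matches bindingKey; both are dot-separated words. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length;  // pattern used up: succeed only if no words remain
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero words, or one word and then try again.
            return match(p, i + 1, w, j) || (j < w.length && match(p, i, w, j + 1));
        }
        if (j == w.length) {
            return false;          // pattern expects another word, but none is left
        }
        return (p[i].equals("*") || p[i].equals(w[j])) && match(p, i + 1, w, j + 1);
    }
}
```

For example, the binding key `*.orange.*` accepts `quick.orange.rabbit` but not `quick.orange.male.rabbit`, while `lazy.#` accepts both `lazy` and `lazy.brown.fox`.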
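RPC
[Figure: rpc]
This is really just one way of using the one-to-one pattern:
First, the client sends a message to a queue declared by the server; the message properties contain reply_to and correlation_id.
- reply_to is a queue created by the client, used to receive the result of the remote call.
- correlation_id identifies the message; the server copies it into the properties of its reply so that the client knows which request the result belongs to.
Then the server receives the message, processes it, and sends a result to the reply_to queue.
Finally, the client receives the reply and carries on.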
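The client-side matching of replies to requests through correlation_id can be sketched without a broker. The class below is hypothetical and only shows the bookkeeping: it assumes the caller puts the returned id into the request properties and feeds every message arriving on the reply_to queue into `onReply`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper that pairs replies with pending calls by correlation id. */
final class RpcCorrelator {
    // correlation_id -> future completed when the matching reply arrives
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a new call and returns its correlation id. */
    String newCall() {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, new CompletableFuture<>());
        return correlationId;
    }

    /** The future for a call that is still pending, or null if there is none. */
    CompletableFuture<String> resultOf(String correlationId) {
        return pending.get(correlationId);
    }

    /** Handles one message from the reply queue; returns false for unknown ids. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> f = pending.remove(correlationId);
        if (f == null) {
            return false;  // stale or foreign reply: ignore it
        }
        f.complete(body);
        return true;
    }
}
```

Ignoring replies with unknown ids matters because a reply queue may still hold answers to requests from an earlier, abandoned call.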
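Server
RabbitMQ supports all the major operating systems; here Unix is used to introduce the common configuration and commands:
Installation
RabbitMQ depends on Erlang, so a recent version of Erlang has to be installed first.
Installing a single node is simple: download and unpack. See the RabbitMQ download page.
Configuration: (in general the defaults are fine.)
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf: default values for environment variables (they can also be set in the startup script, and values given on the startup command take precedence). Commonly used ones:
RABBITMQ_NODENAME: node name; defaults to rabbit@$HOSTNAME.
RABBITMQ_NODE_PORT: protocol port; defaults to 5672.
RABBITMQ_SERVER_START_ARGS: overrides some of the settings in rabbitmq.config.
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config: configuration of the core components, plugins, Erlang services, and so on. Commonly used ones:
disk_free_limit: durable queues and similar data are stored in RabbitMQ's local database on disk. This is the minimum amount of free disk space, 50000000 bytes (about 50 MB) by default; it can be raised, and a relative setting is also supported. When free space drops below the limit, RabbitMQ stops accepting messages.
vm_memory_high_watermark: memory usage limit, 0.4 by default (RabbitMQ may use at most 40% of the memory; beyond that it stops accepting messages).
Note: if startup fails, the startup log shows the specific error.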
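The two limits can be read as simple threshold checks. The sketch below assumes that vm_memory_high_watermark is a fraction of total RAM and that disk_free_limit is a minimum amount of free disk space in bytes; `ResourceLimits` and its methods are hypothetical and only illustrate the arithmetic.

```java
/** Hypothetical helper illustrating the memory and disk thresholds. */
final class ResourceLimits {
    /** Memory threshold in bytes for a given total RAM and watermark fraction. */
    static long memoryThreshold(long totalRamBytes, double watermark) {
        return (long) (totalRamBytes * watermark);
    }

    /** True when either limit is violated and publishing would be blocked. */
    static boolean publishersBlocked(long usedMemory, long memoryThreshold,
                                     long freeDisk, long diskFreeLimit) {
        return usedMemory > memoryThreshold || freeDisk < diskFreeLimit;
    }
}
```

With the default watermark of 0.4, a machine with 8 GB of RAM gets a threshold of roughly 3.2 GB.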
Commands:
$RABBITMQ_HOME/sbin/rabbitmq-server: the startup script; prints information about the configuration file, plugins, cluster, and so on. Add -detached to start it in the background.
/sbin/rabbitmqctl status: show the running status
/sbin/rabbitmqctl add_user admin admin: add a new user admin with password admin. By default there is only a guest user, which can only connect from the local machine.
/sbin/rabbitmqctl set_user_tags admin administrator: give admin administrator rights
/sbin/rabbitmqctl set_permissions -p / admin ".*" ".*" ".*": grant admin all permissions
/sbin/rabbitmqctl stop: shut down
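Cluster
Cluster nodes share all state and data, such as users, exchanges, and bindings (queues are a special case: they are reachable from every node but exist only on the node where they were first declared; the solution is described under high availability for message queues). Every node can accept connections and process data.
There are two kinds of cluster nodes. disc: the default; information is stored in the local database. ram: add the --ram option when joining the cluster; information is kept in memory, which can improve performance.
Configuration: (in general the defaults are fine.)
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf:
RABBITMQ_USE_LONGNAME: false by default. (By default the $HOSTNAME after the @ in RABBITMQ_NODENAME is a short host name, so the hosts file of every node in the cluster must map the other nodes' host names to their addresses. If it is set to true, the $HOSTNAME in RABBITMQ_NODENAME can be a domain name.)
RABBITMQ_DIST_PORT: the clustering port; defaults to RABBITMQ_NODE_PORT + 20000
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config:
cluster_nodes: when set, the node tries at startup to connect to the listed nodes automatically and form a cluster.
cluster_partition_handling: how network partitions are handled.
For more detailed configuration, see: configuration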
Commands
rabbitmqctl stop_app
rabbitmqctl join_cluster [--ram] nodename@hostname: join the current node to the cluster. By default it joins as a disc node; with --ram it joins as a ram node.
rabbitmqctl start_app
rabbitmqctl cluster_status: show the cluster status
Note: if joining the cluster fails, check the following first:
the content of $HOME/.erlang.cookie is identical on every node;
if hostname is a host name, the mapping from that hostname to its address has been added to the hosts file;
if a domain name is used, RABBITMQ_USE_LONGNAME is set to true.
Note: for a Docker-based cluster, see: rabbitmq-server-cluster
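Advanced
A brief introduction to the AMQP protocol
RabbitMQ natively supports AMQP 0-9-1 and extends it with some commonly used features: AMQP 0-9-1
The protocol consists of three layers:
Model layer: the top layer; provides the commands that clients call, such as queue.declare, basic.ack, and consume.
Session layer: passes commands from the client to the server and the server's responses back to the client; it provides reliability, synchronization, and error handling for this exchange.
Transport layer: mainly carries the binary data stream; provides framing, channel multiplexing, error detection, and data representation.
[Figure: AMQP protocol layers]
Note: for other supported protocols, see: protocols supported by RabbitMQ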
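Common plugins
Management UI (a lifesaver)
After the server has started, run rabbitmq-plugins enable rabbitmq_management, then
open http://localhost:15672 to view node status, queue information, and more. Mirroring policies for queues can even be configured there on the fly, as shown below:
[Figure: management]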
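Federation
Enabling the Federation plugin lets nodes in different clusters pass messages to each other, which emulates something close to a cluster. This has several benefits:
Loose coupling: the federated clusters can each have their own users, permissions, and so on, with no need to keep them identical; the RabbitMQ and Erlang versions of these clusters may also differ.
WAN friendly: communication follows the AMQP protocol, so it tolerates intermittent network connections well.
Customizable: you choose which components are federated.
A few concepts:
Upstreams: define the upstream node, for example:
rabbitmqctl set_parameter federation-upstream my-upstream '{"uri":"amqp://server-name","expires":3600000}'
This defines an upstream named my-upstream.
uri is the address of the upstream node; the nodes of several upstreams do not have to be in the same cluster.
expires means that after the connection is lost, the upstream node keeps buffering messages for 3600000 ms.
Upstream sets: a collection of several Upstreams; a default set named all contains every Upstream.
Policies: define which exchanges and queues are associated with which Upstream or Upstream set, for example:
rabbitmqctl set_policy --apply-to exchanges federate-me "^amq\." '{"federation-upstream-set":"all"}'
This federates every exchange on this node whose name starts with amq. to the exchange of the same name on the upstream node.
Notes:
Because a downstream node's exchange can in turn act as the upstream of other nodes, federation can be arranged as a ring, a broadcast tree, and other shapes.
The max-hops parameter controls how many links a message may travel.
To emulate a cluster, connect the nodes to each other pairwise and set max-hops to 1.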
[Figures: federated_cluster, federated_broadcast]
rabbitmq-plugins enable rabbitmq_federation
If the management UI is enabled, also add:
rabbitmq-plugins enable rabbitmq_federation_management
Upstreams and Policies can then be configured in the UI.
Note: to use federation inside a cluster, every node of that cluster must have the Federation plugin enabled.
Note: for more plugins, see: plugins