Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
ActiveMQÏûÏ¢µÄÏû·ÑÔ­Àí
 
  3374  次浏览      27
 2019-6-12 
 
±à¼­ÍƼö:

±¾ÎÄÀ´×ÔÓÚcnblogs£¬ÎÄÕÂÖ÷Òª°¸ÀýΪÖ÷À´½éÉÜActiveMQÏûÏ¢µÄÏû·ÑÔ­Àí£¬´Ó·½·¨µ½¹ý³ÌÒÔ¼°·½°¸µÈ·½ÃæÏêϸ½éÉÜ¡£

Ïû·Ñ¶ËÏû·ÑÏûÏ¢£º

ÕâÀï˵ÁËÁ½ÖÖ·½·¨£¬Á½ÖÖ·½·¨¿ÉÒÔ½ÓÊÕÏûÏ¢£¬Ò»ÖÖÊÇʹÓÃͬ²½×èÈûµÄActiveMQMessageConsumer#receive·½·¨¡£ÁíÒ»ÖÖÊÇʹÓÃÏûÏ¢¼àÌýÆ÷MessageListener¡£ÕâÀïÐèҪעÒâµÄÊÇ£¬ÔÚͬһ¸ösessionÏ£¬ÕâÁ½Õß²»ÄÜͬʱ¹¤×÷£¬Ò²¾ÍÊÇ˵²»ÄÜÕë¶Ô²»Í¬ÏûÏ¢²ÉÓò»Í¬µÄ½ÓÊÕ·½Ê½¡£·ñÔò»áÅ׳öÒì³£¡£ÖÁÓÚΪʲôÕâô×ö£¬×î´óµÄÔ­Òò»¹ÊÇÔÚÊÂÎñÐԻỰÖУ¬Á½ÖÖÏû·ÑģʽµÄÊÂÎñ²»ºÃ¹Ü¿Ø¡£

ÏÈͨ¹ýActiveMQMessageConsumer#receive ·½·¨À´¶ÔÏûÏ¢µÄ½ÓÊÜһ̽¾¿¾¹£º

public Message receive() throws JMSException {
checkClosed();
//¼ì²éreceiveºÍMessageListenerÊÇ·ñͬʱÅäÖÃÔÚµ±Ç°µÄ»á»°ÖУ¬ÓÐÔòÅ׳öÒì³£
checkMessageListener();
//Èç¹ûPrefetchSizeSizeΪ0²¢ÇÒunconsumerMessageΪ¿Õ£¬Ôò·¢ÆðpullÃüÁî
sendPullCommand(0);
MessageDispatch md = dequeue(-1);//³öÁУ¬»ñÈ¡ÏûÏ¢
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
//·¢ËÍack¸øµ½broker
afterMessageIsConsumed(md, false);
//»ñÈ¡ÏûÏ¢²¢·µ»Ø
return createActiveMQMessage(md);
}

 

ÏÂÃæ¼òµ¥µÄ˵һÏÂÒÔÉϼ¸¸öºËÐÄ·½·¨ÖÐ×öÁËʲô²»ÎªÈËÖªµÄÊ£º

sendPullCommand(0) £º·¢ËÍpullÃüÁî´ÓbrokerÉÏ»ñÈ¡ÏûÏ¢£¬Ç°ÌáÊÇprefetchSize=0²¢ÇÒunconsumedMessagesΪ¿Õ¡£unconsumedMessage±íʾδÏû·ÑµÄÏûÏ¢£¬ÕâÀïÃæÔ¤¶ÁÈ¡µÄÏûÏ¢´óСΪprefetchSizeµÄÖµ

protected void sendPullCommand(long timeout) throws JMSException {
clearDeliveredList();
if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
messagePull.setTimeout(timeout);
//Ïò·þÎñ¶ËÒì²½·¢ËÍmessagePullÖ¸Áî
session.asyncSendPacket(messagePull);
}
}

ÕâÀï·¢ËÍÒì²½ÏûÏ¢¸úÏûÏ¢Éú²úµÄÔ­ÀíÊÇÒ»ÑùµÄ¡£Í¨¹ý°ü×°Á´È¥µ÷Óà Sokect ·¢ËÍÇëÇó¡£

clearDeliveredList()£º

ÔÚÉÏÃæµÄsendPullCommand·½·¨ÖУ¬»áÏȵ÷ÓÃclearDeliveredList·½·¨£¬Ö÷ÒªÓÃÀ´ÇåÀíÒѾ­·Ö·¢µÄÏûÏ¢Á´±ídeliveredMessagesdeliveredMessages£¬´æ´¢·Ö·¢¸øÏû·ÑÕßµ«»¹ÎªÓ¦´ðµÄÏûÏ¢Á´±í

Èç¹ûsessionÊÇÊÂÎñµÄ£¬Ôò»á±éÀúdeliveredMessageÖеÄÏûÏ¢·ÅÈëµ½previouslyDeliveredMessageÖÐÀ´×öÖØ·¢

Èç¹ûsessionÊÇ·ÇÊÂÎñµÄ£¬¸ù¾ÝACKµÄģʽÀ´Ñ¡Ôñ²»Í¬µÄÓ¦´ð²Ù×÷

ÕâÊǸöͬ²½µÄ¹ý³Ì£º

private void clearDeliveredList() {
if (clearDeliveredList) {//ÅжÏÊÇ·ñÇå³þ
synchronized (deliveredMessages) {//²ÉÓÃË«ÖØ¼ì²éËø
if (clearDeliveredList) {
if (!deliveredMessages.isEmpty()) {
if (session.isTransacted()) {//ÊÇÊÂÎñÏûÏ¢
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext ().getTransactionId());
}
for (MessageDispatch delivered : deliveredMessages) {
previouslyDeliveredMessages.put (delivered.getMessage().getMessageId(), false);
}
LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
} else {
if (session.isClientAcknowledge()) {
LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
// allow redelivery
if (!this.info.isBrowser()) {
for (MessageDispatch md: deliveredMessages) {
this.session.connection.rollbackDuplicate (this, md.getMessage());
}
}
}
LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
deliveredMessages.clear();
pendingAck = null;
}
}
clearDeliveredList = false;
}
}
}
}

dequeue(-1) £º´ÓunconsumedMessageÖÐÈ¡³öÒ»¸öÏûÏ¢£¬ÔÚ´´½¨Ò»¸öÏû·ÑÕßʱ£¬¾Í»áΪÕâ¸öÏû·ÑÕß´´½¨Ò»¸öδÏû·ÑµÄÏûϢͨµÀ£¬Õâ¸öͨµÀ·ÖΪÁ½ÖÖ£¬Ò»ÖÖÊǼòµ¥ÓÅÏȼ¶¶ÓÁзַ¢Í¨µÀSimplePriorityMessageDispatchChannel £»ÁíÒ»ÖÖÊÇÏȽøÏȳöµÄ·Ö·¢Í¨µÀFifoMessageDispatchChannel.ÖÁÓÚΪʲôҪ´æÔÚÕâÑùÒ»¸öÏûÏ¢·Ö·¢Í¨µÀ£¬´ó¼Ò¿ÉÒÔÏëÏóһϣ¬Èç¹ûÏû·ÑÕßÿ´ÎÈ¥Ïû·ÑÍêÒ»¸öÏûÏ¢ÒÔºóÔÙÈ¥brokerÄÃÒ»¸öÏûÏ¢£¬Ð§ÂÊÊDZȽϵ͵ġ£ËùÒÔͨ¹ýÕâÑùµÄÉè¼Æ¿ÉÒÔÔÊÐísessionÄܹ»Ò»´ÎÐÔ½«¶àÌõÏûÏ¢·Ö·¢¸øÒ»¸öÏû·ÑÕß¡£Ä¬ÈÏÇé¿ö϶ÔÓÚqueueÀ´Ëµ£¬prefetchSizeµÄÖµÊÇ1000

private MessageDispatch dequeue (long timeout) throws JMSException {
try {
long deadline = 0;
if (timeout > 0) {
deadline = System.currentTimeMillis() + timeout;
}
while (true) {//protected final MessageDispatchChannel unconsumedMessages;
MessageDispatch md = unconsumedMessages.dequeue(timeout);

...........
}

 

beforeMessageIsConsumed(md)£ºÕâÀïÃæÖ÷ÒªÊÇ×öÏûÏ¢Ïû·Ñ֮ǰµÄһЩ׼±¸¹¤×÷£¬Èç¹ûACKÀàÐͲ»ÊÇDUPS_OK_ACKNOWLEDGE»òÕß¶ÓÁÐģʽ£¨¼òµ¥À´Ëµ¾ÍÊdzýÁËTopicºÍDupAckÕâÁ½ÖÖÇé¿ö£©£¬ËùÓеÄÏûÏ¢Ïȷŵ½deliveredMessagesÁ´±íµÄ¿ªÍ·¡£²¢ÇÒÈç¹ûµ±Ç°ÊÇÊÂÎñÀàÐ͵ĻỰ£¬ÔòÅжÏtransactedIndividualAck£¬Èç¹ûΪtrue£¬±íʾµ¥ÌõÏûÏ¢Ö±½Ó·µ»Øack¡£

·ñÔò£¬µ÷ÓÃackLater£¬ÅúÁ¿Ó¦´ð, client¶ËÔÚÏû·ÑÏûÏ¢ºóÔÝÇÒ²»·¢ËÍACK£¬¶øÊǰÑËü»º´æÏÂÀ´(pendingACK)£¬µÈµ½ÕâЩÏûÏ¢µÄÌõÊý´ïµ½Ò»¶¨·§ÖµÊ±£¬Ö»ÐèҪͨ¹ýÒ»¸öACKÖ¸Áî°ÑËüÃÇÈ«²¿È·ÈÏ£»Õâ±È¶ÔÿÌõÏûÏ¢¶¼Öð¸öÈ·ÈÏ£¬ÔÚÐÔÄÜÉÏÒªÌá¸ßºÜ¶à¡£

private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId());
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
}¡¡

 

afterMessageIsConsumed£ºÕâ¸ö·½·¨µÄÖ÷Òª×÷ÓÃÊÇÖ´ÐÐÓ¦´ð²Ù×÷£¬ÕâÀïÃæ×öÒÔϼ¸¸ö²Ù×÷

 

1.Èç¹ûÏûÏ¢¹ýÆÚ£¬Ôò·µ»ØÏûÏ¢¹ýÆÚµÄack

2. Èç¹ûÊÇÊÂÎñÀàÐ͵ĻỰ£¬Ôò²»×öÈκδ¦Àí

3. Èç¹ûÊÇAUTOACK»òÕߣ¨DUPS_OK_ACKÇÒÊǶÓÁУ©£¬²¢ÇÒÊÇÓÅ»¯ack²Ù×÷£¬Ôò×ßÅúÁ¿È·ÈÏack

4.Èç¹ûÊÇDUPS_OK_ACK£¬Ôò×ßackLaterÂß¼­

5.Èç¹ûÊÇCLIENT_ACK£¬ÔòÖ´ÐÐackLater

private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
if (unconsumedMessages.isClosed()) {
return;
}
if (messageExpired) {
acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
stats.getExpiredMessageCount().increment();
} else {
stats.onMessage();
if (session.getTransacted()) {
// Do nothing.
} else if (isAutoAcknowledgeEach()) {
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;

// AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter > 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
}
} else {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack!=null) {
deliveredMessages.clear();
session.sendAck(ack);
}
}
}
}
deliveryingAcknowledgements.set(false);
}
} else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge() ||session.isIndividualAcknowledge()) {
boolean messageUnackedByConsumer = false;
synchronized (deliveredMessages) {
messageUnackedByConsumer = deliveredMessages.contains(md);
}
if (messageUnackedByConsumer) {
ackLater (md, MessageAck.DELIVERED_ACK_TYPE);
}
}
else {
throw new IllegalStateException("Invalid session state.");
}
}
}¡¡

ÆäʵÔÚÒÔÉÏÏûÏ¢µÄ½ÓÊÕ¹ý³ÌÖУ¬ÎÒÃǽö½öÄÜ¿´µ½Õâ¸öÏûÏ¢´ÓÒ»¸ö±¾µØ±äÁ¿Öгö¶Ó£¬²¢Ã»ÓжÔÔ¶³ÌÏûÏ¢ÖÐÐÄ·¢ËÍͨѶ»ñÈ¡£¬ÄÇôÕâ¸öÏûϢʱʲôʱºò¹ýÀ´µÄÄØ£¿Ò²¾ÍÊÇÏûÏ¢³ö¶ÓÖÐ unconsumedMessages Õâ¸ö¶«¶«Ê±Ê²Ã´Ê±ºò³õʼ»¯µÄÄØ £¿ÄÇô½ÓÏÂÈ¥ÎÒÃÇÓ¦¸Ãȥͨ¹ý´´½¨Á¬½ÓµÄʱºòÈ¥¿´¿´ÁË£¬¾ßÌåÁ¬½ÓµÄʱºò¶¼×öÁËÊ²Ã´ÄØ£ºconnectionFactory.createConnection()

private void afterMessageIsConsumed (MessageDispatch md, boolean messageExpired) throws JMSException {
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
if (brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
}
ActiveMQConnection connection = null;
try {// ¹ûÈ»·¢ÏÖÁËÕâ¸ö¶«¶«µÄ³õʼ»¯
Transport transport = createTransport();
// ´´½¨Á¬½Ó
connection = createActiveMQConnection(transport, factoryStats);
// ÉèÖÃÓû§ÃÜÂë
connection.setUserName(userName);
connection.setPassword(password);
// ¶ÔÁ¬½Ó×ö°ü×°
configureConnection(connection);
// Æô¶¯Ò»¸öºǫ́´«ÊäÏß³Ì
transport.start();
// ÉèÖÿͻ§¶ËÏû·ÑµÄid
if (clientID != null) {
connection.setDefaultClientID(clientID);
}

return connection;
} ......
}

´´½¨Á¬½ÓµÄ¹ý³Ì¾ÍÊÇ´´½¨³ýÁËÒ»¸ö´øÓÐÁ´Â·°ü×°µÄTcpTransport ²¢ÇÒ´´½¨Á¬½Ó£¬×îºóÆô¶¯Ò»¸ö´«ÊäỊ̈߳¬¶øÕâÀïµÄ transport.start() µ÷ÓõÄÓ¦¸ÃÊÇTcpTransport ÀïÃæµÄ·½·¨£¬È»¶øÕâ¸öÀàÖв¢Ã»ÓÐ start£¬¶øÊÇÔÚ¸¸Àà

ServiceSupport.start()ÖУº

public void start() throws Exception {
if (started.compareAndSet(false, true)) {
boolean success = false;
stopped.set(false);
try {
preStart();//һЩ³õʼ»¯
doStart();
success = true;
} finally {
started.set(success);
}
for(ServiceListener l:this.serviceListeners) {
l.started(this);
}
}
}

doStart ·½·¨Ç°×öÁËһϵÁеijõʼ»¯£¬È»ºóµ÷Óà TcpTransportµÄdoStart() ·½·¨£º

protected void doStart() throws Exception {
connect();
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
}

 

¼Ì¶ø¹¹½¨Ò»¸öÁ¬½Ó ÉèÖÃÒ»¸ö CountDownLatch ÃÅãÅ £¬µ÷Óø¸Àà TransportThreadSupport µÄ·½·¨£¬Ð½¨ÁËÒ»¸ö¾«ÁéÏ̲߳¢ÇÒÆô¶¯£º

protected void doStart() throws Exception {
runner = new Thread(null, this, "ActiveMQ Transport: " + toString(), stackSize);
runner.setDaemon(daemon);
runner.start();
}

µ÷ÓÃTransportThreadSupport.doStart(). ´´½¨ÁËÒ»¸öỊ̈߳¬´«ÈëµÄÊÇ this£¬µ÷ÓÃ×ÓÀàµÄ run ·½·¨£¬Ò²¾ÍÊÇ TcpTransport.run().

public void run() {
LOG.trace("TCP consumer thread for " + this + " starting");
this.runnerThread=Thread.currentThread();
try {
while (!isStopped()) {
doRun();
}
} catch (IOException e) {
stoppedLatch.get().countDown();
onException(e);
} catch (Throwable e){
stoppedLatch.get().countDown();
IOException ioe=new IOException("Unexpected error occurred: " + e);
ioe.initCause(e);
onException(ioe);
}finally {
stoppedLatch.get().countDown();
}
}

run ·½·¨Ö÷ÒªÊÇ´Ó socket ÖжÁÈ¡Êý¾Ý°ü£¬Ö»Òª TcpTransport ûÓÐÍ£Ö¹£¬Ëü¾Í»á²»¶ÏÈ¥µ÷Óà doRun£ºÕâÀïÃæ£¬Í¨¹ý wireFormat ¶ÔÊý¾Ý½øÐиñʽ»¯£¬¿ÉÒÔÈÏΪÕâÊÇÒ»¸ö·´ÐòÁл¯¹ý³Ì¡£wireFormat ĬÈÏʵÏÖÊÇ OpenWireFormat£¬activeMQ ×Ô¶¨ÒåµÄ¿çÓïÑÔµÄwire ЭÒé

protected void doRun() throws IOException {
try {//ͨ¹ý readCommand È¥¶ÁÈ¡Êý¾Ý
Object command = readCommand();
//Ïû·ÑÏûÏ¢
doConsume(command);
} catch (SocketTimeoutException e) {
} catch (InterruptedIOException e) {
}
}
protected Object readCommand() throws IOException {
return wireFormat.unmarshal(dataIn);
}

doConsume£ºÁ÷³Ì×ßµ½ÁËÏû·ÑÏûÏ¢£º

public void doConsume(Object command) {
if (command != null) {//±íʾÒѾ­Äõ½ÁËÏûÏ¢
if (transportListener != null) {
transportListener.onCommand(command);
} else {
LOG.error("No transportListener available to process inbound command: " + command);
}
}
}

TransportSupport ÀàÖÐΨһµÄ³ÉÔ±±äÁ¿ÊÇ TransportListener transportListener;£¬ÕâÒ²Òâζ×ÅÒ»¸ö Transport Ö§³ÖÀà°ó¶¨Ò»¸ö´«ËͼàÌýÆ÷À࣬´«ËͼàÌýÆ÷½Ó¿Ú TransportListener ×îÖØÒªµÄ·½·¨¾ÍÊÇ void onCommand(Object command);£¬ËüÓÃÀ´´¦ÀíÃüÁî¡£ÄÇôÕâ¸ö transportListener ÊÇÔÚÄÇÀï³õʼ»¯µÄÄØ£¿¿ÉÒÔ˼¿¼Ò»Ï ¼ÈÈ»ÊÇTransportSupport ΨһµÄ³ÉÔ±±äÁ¿£¬¶øÎÒÃÇËø´´½¨µÄTcpTransport ÊÇËûµÄ×ÓÀ࣬ÄÇôÊDz»ÊÇÔÚ´´½¨¸ÃtransportµÄʱºòÒà»òÊÇÔÚ¶ÔËû½øÐаü×°´¦ÀíµÄʱºò×öÁ˳õʼ»¯ÄØ£¿ ÎÒÃÇ»áÔÚÁ÷³ÌÖп´µ½ÔÚн¨ ActiveMQConnectionFactory µÄʱºòÓÐÒ»ÐйؼüµÄ´úÂ룺

connection = createActiveMQConnection(transport, factoryStats);

ÔÚÕâ¸ö·½·¨ÁùÃæ×·ËÝÏÂÈ¥£º»á½øÈë ActiveMQConnection µÄ¹¹Ôì·½·¨

protected ActiveMQConnection (final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {

this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
this.factoryStats = factoryStats;

// Configure a single threaded executor who's core thread can timeout if
// idle
executor = new ThreadPoolExecutor (1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
//Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
//thread.setDaemon(true);
return thread;
}
});
// asyncConnectionThread.allowCoreThreadTimeOut(true);
String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo (new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant (transport.isFaultTolerant());
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

this.transport.setTransportListener(this);

this.stats = new JMSConnectionStatsImpl (sessions, this instanceof XAConnection);
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates (transport.isFaultTolerant());
}

´ÓÒÔÉÏ´úÂëÎÒÃÇ·¢ÏÖ this.transport.setTransportListener(this); ÄÇôÕâ¸öthisÊÇÊ²Ã´ÄØ £¿ ÕýÊÇActiveMQConnection £¬¿´ÁËÒ»ÑÛ¸ÃÀ࣬·¢ÏÖÕâ¸öÀàʵÏÖÁË TransportListener £¬±¾Éí¾ÍÊÇÒ»¸öTransportListener¡£ËùÒÔÉÏÃæ transportListener.onCommand(command); ¾ÍÊÇ ActiveMQConnection.onCommand(command)¡£³ýÁËºÍ TransportÏ໥°ó¶¨£¬»¹¶ÔÏ̳߳ØÖ´ÐÐÆ÷ executor ½øÐÐÁ˳õʼ»¯¡£Õâ¸çÖ´ÐÐÆ÷ÊǺóÀ´Òª½øÐÐÏûÏ¢´¦ÀíµÄ¡£

ÕâÀïÃæ»áÕë¶Ô²»Í¬µÄÏûÏ¢×ö·Ö·¢£¬ÔÚActiveMQMessageConsumer#receive·½·¨ÖÐËødequeueËù·µ»ØµÄ¶ÔÏóÊÇMessageDispatch ¡£¼ÙÉèÕâÀï´«ÈëµÄ command ÊÇMessageDispatch£¬ÄÇôÕâ¸ö command µÄ visit ·½·¨¾Í»áµ÷ÓÃprocessMessageDispatch ·½·¨¡£¼ôÇгöÆäÖеĴúÂëÆ¬¶Î£º

public Response processMessageDispatch (MessageDispatch md) throws Exception {
// µÈ´ý Transport Öжϴ¦ÀíÍê³É
waitForTransportInterruptionProcessingToComplete();
// ÕâÀïͨ¹ýÏû·ÑÕß ID À´»ñÈ¡Ïû·ÑÕß¶ÔÏó
//£¨ActiveMQMessageConsumer ʵÏÖÁË ActiveMQDispatcher ½Ó¿Ú£©£¬ËùÒÔ
//MessageDispatch °üº¬ÁËÏûÏ¢Ó¦¸Ã±»·ÖÅäµ½ÄǸöÏû·ÑÕßµÄÓ³ÉäÐÅÏ¢
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via
// vm://
// md.getMessage() == null to signal end of queue
// browse.
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage(null);
md.setMessage(msg);
}
// µ÷ÓûỰActiveMQSession ×Ô¼ºµÄ dispatch ·½·¨À´´¦ÀíÕâÌõÏûÏ¢
dispatcher.dispatch(md);
} else {
LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
}
return null;
}

 

ÆäÖÐ ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());ÕâÐдúÂëµÄ dispatchers ÊÇÔÚ Í¨¹ýsession.createConsumer(destination); µÄʱºòͨ¹ý ActiveMQMessageConsumer µÄ¹¹Ôì·½·¨ÖÐÓÐÒ»ÐдúÂë £ºthis.session.addConsumer(this); ½« this´«È룬¼´ ActiveMQMessageConsumer ¶ÔÏó¡£¶øÕâ¸ö addConsumer ·½·¨£º

protected void addConsumer (ActiveMQMessageConsumer consumer) throws JMSException {
this.consumers.add(consumer);
if (consumer.isDurableSubscriber()) {
stats.onCreateDurableSubscriber();
}
this.connection.addDispatcher(consumer.getConsumerId(), this);
}

 

¿ÉÒÔ·¢ÏÖÕâÀïµÄ³õʼ»¯ÁË£ºthis.connection.addDispatcher(consumer.getConsumerId(), this); ÕâÀïµÄthis ¼´ ActiveMQSession¡£ËùÒԻص½ ActiveMQConnection#onCommand·½·¨ÄÚ processMessageDispatch Õâ¸ö·½·¨×îºóµ÷ÓÃÁË dispatcher.dispatch(md); Õâ¸ö·½·¨µÄºËÐŦÄܾÍÊÇ´¦ÀíÏûÏ¢µÄ·Ö·¢¡££º

public void dispatch(MessageDispatch messageDispatch) {
try {
executor.execute(messageDispatch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
connection.onClientInternalException(e);
}
}

ÕâÀïÀëÎÒÃÇÕæÕýÒªÕҵĽøÐÐÏûÏ¢Èë¶ÓµÄ½á¹ûºÜ½üÁË£¬½øÈëexecutor.execute(messageDispatch);Õâ¸ö·½·¨£º

void execute(MessageDispatch message) throws InterruptedException {

...........
//Èç¹û»á»°²»ÊÇÒì²½·Ö·¢²¢ÇÒûÓÐʹÓà sessionpool ·Ö·¢£¬Ôòµ÷Óà dispatch ·¢ËÍÏûÏ¢
if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
dispatch(message);
} else {//½«ÏûÏ¢Ö±½Ó·Åµ½¶ÓÁÐÀï
messageQueue.enqueue(message);
wakeup();
}
}

 

ÕâÀï×îºóÖÕÓÚ·¢ÏÖÁËÈë¶Ó£¬ÅжÏÊÇ·ñÒì²½·Ö·¢£¬²»Êǵϰ×ßdispatch(message) ·ñÔò½øÈëÒì²½·Ö·¢¡£Ä¬ÈÏÊDzÉÓÃÒì²½ÏûÏ¢·Ö·¢¡£ËùÒÔ£¬Ö±½Óµ÷Óà messageQueue.enqueue£¬°ÑÏûÏ¢·Åµ½¶ÓÁÐÖУ¬²¢ÇÒµ÷Óà wakeup ·½·¨:

public void wakeup() {
if (!dispatchedBySessionPool) {//½øÒ»²½ÑéÖ¤
// //ÅÐ¶Ï session ÊÇ·ñΪÒì²½·Ö·¢
if (session.isSessionAsyncDispatch()) {
try {
TaskRunner taskRunner = this.taskRunner;
if (taskRunner == null) {
synchronized (this) {
if (this.taskRunner == null) {
if (!isRunning()) {
// stop has been called
return;
}
//ͨ¹ý TaskRunnerFactory ´´½¨ÁËÒ»¸öÈÎÎñÔËÐÐÀà taskRunner£¬ÕâÀï°Ñ×Ô¼º×÷Ϊһ¸ö task ´«Èëµ½ createTaskRunner ÖУ¬
//˵Ã÷µ±Ç°µÄÀàÒ»¶¨ÊÇʵÏÖÁË Task ½Ó¿ÚµÄ. ¼òµ¥À´Ëµ£¬ ¾ÍÊÇͨ¹ýÏ̳߳ØÈ¥Ö´ÐÐÒ»¸öÈÎÎñ£¬Íê³ÉÒì²½µ÷¶È
//ÕâÀïÓÉÓÚexecutor != null ËùÒÔÕâ¸ötaskµÄÀàÐÍÊÇPooledTaskRunner
this.taskRunner = session.connection .getSessionTaskRunner().createTaskRunner(this,
"ActiveMQ Session: " + session.getSessionId());
}
taskRunner = this.taskRunner;
}
}
taskRunner.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {// Òì²½·Ö·¢
while (iterate()) {
}
}
}
}

ËùÒÔ£¬¶ÔÓÚÒì²½·Ö·¢µÄ·½Ê½£¬»áµ÷Óà ActiveMQSessionExecutor ÖÐµÄ iterate·½·¨£¬ÎÒÃÇÀ´¿´¿´Õâ¸ö·½·¨µÄ´úÂë iterate £¨£©£ºÕâ¸ö·½·¨ÀïÃæ×öÁ½¸öÊÂ

1.°ÑÏû·ÑÕß¼àÌýµÄËùÓÐÏûϢת´æµ½´ýÏû·Ñ¶ÓÁÐÖÐ

2. Èç¹û messageQueue »¹´æÔÚÒÅÁôÏûÏ¢£¬Í¬Ñù°ÑÏûÏ¢·Ö·¢(µ÷¶È)³öÈ¥

public boolean iterate() {
// Deliver any messages queued on the consumer to their listeners.<br> // ½«Ïû·ÑÕßÉÏÅŶӵÄÈκÎÏûÏ¢´«µÝ¸øËüÃǵÄÕìÌýÆ÷¡£
for (ActiveMQMessageConsumer consumer : this.session.consumers) {
if (consumer.iterate()) {
return true;
}
}
// No messages left queued on the listeners.. so now dispatch messages
// queued on the session<br> // ÕìÌýÆ÷ÉÏûÓÐÁôÏÂÅŶӵȴýµÄÏûÏ¢¡£ÏÖÔÚ·ÖÅÉÏûÏ¢
MessageDispatch message = messageQueue.dequeueNoWait();
if (message == null) {
return false;
} else {// ·Ö·¢(µ÷¶È)ÏûÏ¢
dispatch(message);
return !messageQueue.isEmpty();
}
}

 

dispatch(message);ÏûϢȷÈÏ·Ö·¢¡£Í¨¹ýActiveMQSessionExecutorµÄdispatch ·½·¨£¬×ªµ½ÁË ActiveMQMessageConsumer Ïû·ÑÕßÀàµÄ dispatch ·½·¨£º

public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
clearMessagesInProgress();
clearDeliveredList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {// ÅжÏÏûÏ¢ÊÇ·ñÎªÖØ·¢ÏûÏ¢
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
//ÎÒÕâ±ßͨ¹ýconsumer.receive()´¦ÀíÏûÏ¢£¬ËùÒÔÕâÀïlistenerΪ¿Õ£¬×ßÏÂÃæ
} else {
if (!unconsumedMessages.isRunning()) {
// delayed redelivery, ensure it can be re delivered
session.connection.rollbackDuplicate(this, md.getMessage());
}

if (md.getMessage() == null) {
// End of browse or pull request timeout.
unconsumedMessages.enqueue(md);
} else {
if (!consumeExpiredMessage(md)) {
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
¡¡¡¡¡¡¡¡¡¡¡¡.........
}

×îÖÕ»á×ßÈë unconsumedMessages.enqueue(md);Ìí¼ÓÏûÏ¢¡£ÕâÀïÐèҪעÒâµÄÊÇenqueue ·½·¨£ºÓÉÓÚÏû·ÑÕß¿ÉÄÜ´¦ÓÚ×èÈû״̬£¬ÕâÀï×öÁËÈë¶Óºó»ØÊÍ·ÅËø£¬Ò²¾ÍÊǽӴ¥×èÈû¡£

public void enqueue(MessageDispatch message) {
synchronized (mutex) {
list.addLast(message);
mutex.notify();
}
}

µ½ÕâÀïΪֹ£¬ÏûÏ¢ÈçºÎ½ÓÊÜÒÔ¼°ËûµÄ´¦Àí·½Ê½µÄÁ÷³Ì£¬ÎÒÃÇÒѾ­¸ãÇå³þÁË¡£ÆäʵÔÚÕâ¸öÏûÏ¢Ïû·ÑµÄÁ÷³ÌÖУ¬ÒѾ­ÔÚ½¨Á¢Á¬½Ó£¬´´½¨Ïû·ÑÕßµÄʱºò¾ÍÒѾ­³õʼ»¯ºÃÁËÏûÏ¢¶ÓÁÐÁË¡£½áºÏÉÏÃæµÄ¹ý³ÌÀ´¿´¿´Õû¸öÏû·ÑÁ÷³ÌµÄÁ÷³Ìͼ

Ïû·Ñ¶ËµÄ PrefetchSize£º

ÔÚÏûÏ¢·¢²¼µÄʱºòÎÒÃÇÔø¾­Ñо¿¹ý producerWindowSize ¡£Ö÷ÒªÓÃÀ´Ô¼ÊøÔÚÒì²½·¢ËÍʱproducer¶ËÔÊÐí»ýѹµÄ(ÉÐδACK)µÄÏûÏ¢µÄ´óС£¬ÇÒÖ»¶ÔÒì²½·¢ËÍÓÐÒâÒå¡£¶ÔÓÚ¿Í»§¶Ë£¬Ò²ÊÇÀàËÆ´æÔÚÕâôһ¸öÊôÐÔÀ´Ô¼Êø¿Í»§¶ËµÄÏûÏ¢´¦Àí¡£activemq µÄ consumer ¶ËÒ²Óд°¿Ú»úÖÆ£¬Í¨¹ý prefetchSize ¾Í¿ÉÒÔÉèÖô°¿Ú´óС¡£²»Í¬µÄÀàÐ͵ĶÓÁУ¬prefetchSize µÄĬÈÏÖµÒ²ÊDz»Ò»ÑùµÄ.

1. ³Ö¾Ã»¯¶ÓÁкͷdz־û¯¶ÓÁеÄĬÈÏֵΪ 1000

2.³Ö¾Ã»¯ topic ĬÈÏֵΪ 100

3.·Ç³Ö¾Ã»¯¶ÓÁеÄĬÈÏֵΪ Short.MAX_VALUE-1

²âÊÔ·½·¨ÊÇÔÚMQÉÏÉú²ú1000ÌõÏûÏ¢£¬ÏȺóÆô¶¯comsumer1£¬comsumer2 Á½¸öÏû·ÑÕß²¢ÇÒÑ­»·µ÷ÓÃ1000´ÎÏû·Ñ£¬ÎÒÂè»á·¢ÏÖ comsumer2 Äò»µ½ÏûÏ¢£¬Õâ¸öʱºòÎÒÃÇ¿ÉÒÔͨ¹ýdebug½øÈëcomsumer1 µÄActiveMQConnect»á·¢ÏÖÀïÃæÓиöÊôÐÔµÄsize=1000.Æäʵ¾ÍÊÇÕâ¸öprefetchSize£¬·­Òë¹ýÀ´ÊÇԤȡ´óС£¬Ïû·Ñ¶Ë»á¸ù¾ÝprefetchSize µÄ´óСÅúÁ¿»ñÈ¡Êý¾Ý¡£Òâ˼ÊÇÔÚ´´½¨Á¬½ÓµÄʱºò»áÈ¡»ñÈ¡1000ÌõÏûÏ¢Ô¤¼ÓÔØµ½»º´æÖеȴý´¦Àí£¬ÕâÑù×Óµ¼ÖÂcomsumer2È¥»ñÈ¡ÏûÏ¢µÄʱºò brokerÉÏÒѾ­¿ÕÁË¡£

prefetchSize µÄÉèÖ÷½·¨£º

ÔÚ createQueue ÖÐÌí¼Ó consumer.prefetchSize£¬¾Í¿ÉÒÔ¿´µ½Ð§¹û

Destination destination=session.createQueue ("myQueue?consumer.prefetchSize=10");

 

¼ÈÈ»ÓÐÅúÁ¿¼ÓÔØ£¬ÄÇôһ¶¨ÓÐÅúÁ¿È·ÈÏ£¬ÕâÑù²ÅËãÊdz¹µ×µÄÓÅ»¯£¬Õâ¾ÍÉæ¼°µ½ optimizeAcknowledge

optimizeAcknowledge

ActiveMQ ÌṩÁË optimizeAcknowledge À´ÓÅ»¯È·ÈÏ£¬Ëü±íʾÊÇ·ñ¿ªÆô¡°ÓÅ»¯ACK¡±£¬Ö»ÓÐÔÚΪ true µÄÇé¿öÏ£¬prefetchSize ÒÔ¼°optimizeAcknowledgeTimeout ²ÎÊý²Å»áÓÐÒâÒåÓÅ»¯È·ÈÏÒ»·½Ãæ¿ÉÒÔ¼õÇá client ¸ºµ££¨²»ÐèҪƵ·±µÄÈ·ÈÏÏûÏ¢£©¡¢¼õÉÙͨÐÅ¿ªÏú£¬ÁíÒ»·½ÃæÓÉÓÚÑÓ³ÙÁËÈ·ÈÏ£¨Ä¬ÈÏ ack ÁË 0.65*prefetchSize ¸öÏûÏ¢²ÅÈ·ÈÏ£©£¬Õâ¸öÔÚÔ´ÂëÖÐÓÐÌåÏÖ¡£ÔÚActiveMQMessageConsumer#receive·½·¨ÄڵĴ¦ÀíÏûÏ¢ºóµÄ afterMessageIsConsumed ·½·¨ÄÚÓÐÒ»¸öÅжϣº

if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);//Âú×ãÌõ¼þÔò·¢ËÍÅúÁ¿Ó¦´ðACK
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter > 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
}

broker Ôٴη¢ËÍÏûϢʱÓÖ¿ÉÒÔÅúÁ¿·¢ËÍÈç¹ûÖ»ÊÇ¿ªÆôÁË prefetchSize£¬Ã¿ÌõÏûÏ¢¶¼È¥È·Èϵϰ£¬broker ÔÚÊÕµ½È·ÈϺóÒ²Ö»ÊÇ·¢ËÍÒ»ÌõÏûÏ¢£¬²¢²»ÊÇÅúÁ¿·¢²¼£¬µ±È»Ò²¿ÉÒÔͨ¹ýÉèÖà DUPS_OK_ACKÀ´ÊÖ¶¯ÑÓ³ÙÈ·ÈÏ£¬ ÎÒÃÇÐèÒªÔÚ brokerUrl Ö¸¶¨ optimizeACK Ñ¡Ïî



ConnectionFactory connectionFactory = new ActiveMQConnectionFactory ("tcp://192.168.11.153:61616? jms.optimizeAcknowledge =true&jms.optimizeAcknowledgeTimeOut=10000");

 

×¢Ò⣬Èç¹û optimizeAcknowledge Ϊ true£¬ÄÇô prefetchSize ±ØÐë´óÓÚ 0. µ± prefetchSize=0 µÄʱºò£¬±íʾ consumer ͨ¹ý PULL ·½Ê½´Ó broker »ñÈ¡ÏûÏ¢.

optimizeAcknowledge ºÍ prefetchSize µÄ×÷Óã¬Á½ÕßЭͬ¹¤×÷£¬Í¨¹ýÅúÁ¿»ñÈ¡ÏûÏ¢¡¢²¢ÑÓ³ÙÅúÁ¿È·ÈÏ£¬À´´ïµ½Ò»¸ö¸ßЧµÄÏûÏ¢Ïû·ÑÄ£ÐÍ¡£Ëü±È½ö¼õÉÙÁ˿ͻ§¶ËÔÚ»ñÈ¡ÏûϢʱµÄ×èÈû´ÎÊý£¬»¹ÄܼõÉÙÿ´Î»ñÈ¡ÏûϢʱµÄÍøÂçͨÐÅ¿ªÏú

ÐèҪעÒâµÄÊÇ£¬Èç¹ûÏû·Ñ¶ËµÄÏû·ÑËٶȱȽϸߣ¬Í¨¹ýÕâÁ½Õß×éºÏÊÇÄÜ´ó´óÌáÉý consumer µÄÐÔÄÜ¡£Èç¹û consumer µÄÏû·ÑÐÔÄܱ¾Éí¾Í±È½ÏÂý£¬ÉèÖñȽϴóµÄ prefetchSize ·´¶ø²»ÄÜÓÐЧµÄ´ïµ½ÌáÉýÏû·ÑÐÔÄܵÄÄ¿µÄ¡£ÒòΪ¹ý´óµÄprefetchSize ²»ÀûÓÚ consumer ¶ËÏûÏ¢µÄ¸ºÔؾùºâ¡£ÒòΪͨ³£Çé¿öÏ£¬ÎÒÃǶ¼»á²¿Êð¶à¸ö consumer ½ÚµãÀ´ÌáÉýÏû·Ñ¶ËµÄÏû·ÑÐÔÄÜ¡£Õâ¸öÓÅ»¯·½°¸»¹»á´æÔÚÁíÍâÒ»¸öDZÔÚ·çÏÕ£¬µ±ÏûÏ¢±»Ïû·ÑÖ®ºó»¹Ã»ÓÐÀ´µÃ¼°È·ÈÏʱ£¬client ¶Ë·¢Éú¹ÊÕÏ£¬ÄÇôÕâЩÏûÏ¢¾ÍÓпÉÄÜ»á±»ÖØÐ·¢Ë͸øÆäËûconsumer£¬ÄÇôÕâÖÖ·çÏÕ¾ÍÐèÒª client ¶ËÄܹ»ÈÝÈÌ¡°Öظ´¡±ÏûÏ¢¡£

ÏûÏ¢µÄÈ·ÈϹý³Ì£º

ÏûϢȷÈÏÓÐËÄÖÖ ACK_MODE£¬·Ö±ðÊÇ£º

1. AUTO_ACKNOWLEDGE = 1 ×Ô¶¯È·ÈÏ

2.CLIENT_ACKNOWLEDGE = 2 ¿Í»§¶ËÊÖ¶¯È·ÈÏ

3.DUPS_OK_ACKNOWLEDGE = 3 ×Ô¶¯ÅúÁ¿È·ÈÏ

4.SESSION_TRANSACTED = 0 ÊÂÎñÌá½»²¢È·ÈÏ

ACK_MODE µÄÑ¡ÔñÓ°Ïì×ÅÏûÏ¢Ïû·ÑÁ÷³ÌµÄ×ßÏò¡£ËäÈ» Client ¶ËÖ¸¶¨ÁË ACK ģʽ,µ«ÊÇÔÚ Client Óë broker ÔÚ½»»» ACK Ö¸ÁîµÄʱºò,»¹ÐèÒª¸æÖª ACK_TYPE,ACK_TYPE ±íʾ´ËÈ·ÈÏÖ¸ÁîµÄÀàÐÍ£¬²»Í¬µÄACK_TYPE ½«´«µÝ×ÅÏûÏ¢µÄ״̬£¬broker ¿ÉÒÔ¸ù¾Ý²»Í¬µÄ ACK_TYPE ¶ÔÏûÏ¢½øÐв»Í¬µÄ²Ù×÷¡£

ACK_TYPEÓ¦´ðÀàÐÍ£º

DELIVERED_ACK_TYPE = 0 ÏûÏ¢"ÒѽÓÊÕ"£¬µ«ÉÐδ´¦Àí½áÊø

STANDARD_ACK_TYPE = 2 "±ê×¼"ÀàÐÍ,ͨ³£±íʾΪÏûÏ¢"´¦Àí³É¹¦"£¬broker ¶Ë¿ÉÒÔɾ³ýÏûÏ¢ÁË

POSION_ACK_TYPE = 1 ÏûÏ¢"´íÎó",ͨ³£±íʾ"Åׯú"´ËÏûÏ¢£¬±ÈÈçÏûÏ¢ÖØ·¢¶à´Îºó£¬¶¼ÎÞ·¨ÕýÈ·´¦Àíʱ£¬ÏûÏ¢½«»á±»É¾³ý»òÕß DLQ(ËÀÐŶÓÁÐ)£¬ÔÚÏûÏ¢´¦ÀíµÄʱºò£¬dispatch·½·¨ÄÚ»áÅжϸÃÏûÏ¢ÊÇ·ñÎªÖØ·¢ÏûÏ¢

if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
// Õâ¶ÎΪ·ÇÖØ·¢ÏûÏ¢£¬×ßelse
} else {
// deal with duplicate delivery
ConsumerId consumerWithPendingTransaction;
if (redeliveryExpectedInCurrentTransaction(md, true)) {
LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
}
} else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
session.getConnection().rollbackDuplicate(this, md.getMessage());
dispatch(md);
} else {// ×ßPOSION_ACK_TYPE Ìí¼ÓActive_DLQ ËÀÐŶÓÁÐ
LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
}
}

 

REDELIVERED_ACK_TYPE = 3 ÏûÏ¢Ðè"ÖØ·¢"£¬±ÈÈç consumer ´¦ÀíÏûϢʱÅ׳öÁËÒì³££¬broker ÉÔºó»áÖØÐ·¢ËÍ´ËÏûÏ¢

INDIVIDUAL_ACK_TYPE = 4 ±íʾֻȷÈÏ"µ¥ÌõÏûÏ¢",ÎÞÂÛÔÚÈκΠACK_MODE ÏÂ

UNMATCHED_ACK_TYPE = 5 ÔÚ Topic ÖУ¬Èç¹ûÒ»ÌõÏûÏ¢ÔÚת·¢¸ø¡°¶©ÔÄÕß¡±Ê±£¬·¢ÏÖ´ËÏûÏ¢²»·ûºÏ Selector ¹ýÂËÌõ¼þ£¬ÄÇô´ËÏûÏ¢½« ²»»áת·¢¸ø¶©ÔÄÕߣ¬ÏûÏ¢½«»á±»´æ´¢ÒýÇæÉ¾³ý(Ï൱ÓÚÔÚ Broker ÉÏÈ· ÈÏÁËÏûÏ¢)¡£

Client ¶ËÔÚ²»Í¬µÄ ACK ģʽʱ,½«Òâζ×ÅÔÚ²»Í¬µÄʱ»ú·¢ËÍ ACK Ö¸Áî,ÿ¸ö ACK Command Öлá°üº¬ ACK_TYPE,ÄÇô broker ¶Ë¾Í¿ÉÒÔ¸ù¾Ý ACK_TYPE À´¾ö¶¨´ËÏûÏ¢µÄºóÐø²Ù×÷¡£ÔÚ afterMessageIsConsumed ÏûÏ¢½ÓÊÕ´¦Àíºó»á¸ù¾ÝÌõ¼þÀ´ÉèÖà ACK_TYPE.

ÏûÏ¢µÄÖØ·¢»úÖÆÔ­Àí:

ÔÚÕý³£Çé¿öÏ£¬Óм¸ÖÐÇé¿ö»áµ¼ÖÂÏûÏ¢ÖØÐ·¢ËÍ

ÔÚÊÂÎñÐԻỰÖУ¬Ã»Óе÷Óà session.commit È·ÈÏÏûϢ崻ú»òÕßµ÷ÓÃsession.rollback ·½·¨»Ø¹öÏûÏ¢

ÔÚ·ÇÊÂÎñÐԻỰÖУ¬ACK ģʽΪ CLIENT_ACKNOWLEDGE (¿Í»§¶ËÊÖ¶¯Ó¦´ð)µÄÇé¿öÏ£¬Ã»Óе÷Óà session.commit»òÕßµ÷ÓÃÁË recover ·½·¨£»

Ò»¸öÏûÏ¢±» redelivedred ³¬¹ýĬÈϵÄ×î´óÖØ·¢´ÎÊý£¨Ä¬ÈÏ 6 ´Î£©Ê±£¬Ïû·Ñ¶Ë»á¸ø broker ·¢ËÍÒ»¸ö¡±poison ack¡±±íʾÕâ¸öÏûÏ¢Óж¾£¬¸æËß broker ²»ÒªÔÙ·¢ÁË¡£Õâ¸öʱºò broker »á°ÑÕâ¸öÏûÏ¢·Åµ½ DLQ£¨ËÀÐŶÓÁУ©¡£

ËÀÐŶÓÁУº

ActiveMQ ÖÐĬÈϵÄËÀÐŶÓÁÐÊÇ ActiveMQ.DLQ£¬Èç¹ûûÓÐÌØ±ðµÄÅäÖã¬Óж¾µÄÏûÏ¢¶¼»á±»·¢Ë͵½Õâ¸ö¶ÓÁС£Ä¬ÈÏÇé¿öÏ£¬Èç¹û³Ö¾ÃÏûÏ¢¹ýÆÚÒÔºó£¬Ò²»á±»Ë͵½ DLQ ÖС£

Ö»ÒªÔÚ´¦ÀíÏûÏ¢µÄʱºòÅ׳öÒ»¸öÒì³£¾Í¿ÉÒÔÑÝʾ£¬»á¿´µ½¿ØÖÆÌ¨¶ÔÓÚʧ°ÜÏûÏ¢»áÖØ·¢6´Î£¬µÇ½ActiveMQ¿ØÖÆÌ¨»á¿´µ½Ò»¸ö ActiveMQ.DLQ¡£ÔÚ´´½¨¶ÓÁеÄʱºò¿ÉÒÔÖ±½ÓÖ¸¶¨´ÓActiveMQ.DLQÈ¥Ïû·ÑÏûÏ¢¡£

ËÀÐŶÓÁÐÅäÖòßÂÔ£º

ȱʡËùÓжÓÁеÄËÀÐÅÏûÏ¢¶¼±»·¢Ë͵½Í¬Ò»¸öȱʡËÀÐŶÓÁУ¬²»±ãÓÚ¹ÜÀí£¬¿ÉÒÔͨ¹ý individualDeadLetterStrategy »ò sharedDeadLetterStrategy ²ßÂÔÀ´½øÐÐÐ޸ġ£ÔÚactivemq.xmlÉÏ

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:

http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>

// ¡°>¡±±íʾ¶ÔËùÓжÓÁÐÉúЧ£¬Èç¹ûÐèÒªÉèÖÃÖ¸¶¨¶ÓÁУ¬ÔòÖ±½Óд¶ÓÁÐÃû³Æ
<policyEntry queue=">">
<deadLetterStrategy>
//queuePrefix:ÉèÖÃËÀÐŶÓÁÐǰ׺
//useQueueForQueueMessage ÉèÖöÓÁб£´æµ½ËÀÐÅ¡£
<individualDeadLetterStrategy queuePrefix ="DLQ."useQueueForQueueMessages ="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

 

×Ô¶¯¶ªÆú¹ýÆÚÏûÏ¢

<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>

ActiveMQ ¾²Ì¬ÍøÂçÅäÖãºbrokerÍøÂçÁ¬½Ó(brokerµÄ¸ßÐÔÄÜ·½°¸)

ÐÞ¸Ä activeMQ ·þÎñÆ÷µÄ activeMQ.xml, Ôö¼ÓÈçÏÂÅäÖã¬Õâ¸öÅäÖÃÖ»ÄÜʵÏÖµ¥ÏòÁ¬½Ó£¬ÊµÏÖË«ÏòÁ¬½ÓÐèÒª¸÷¸ö½Úµã¶¼ÅäÖÃÈçÏÂÅäÖá£

<networkConnectors>
<networkConnector uri="static:// (tcp://192.168.254.135:61616, tcp://192.168.254.136:61616)"/>
</networkConnectors>

Á½¸ö Brokers ͨ¹ýÒ»¸ö static µÄЭÒéÀ´½øÐÐÍøÂçÁ¬½Ó¡£Ò»¸ö Consumer Á¬½Óµ½BrokerB µÄÒ»¸öµØÖ·ÉÏ£¬µ± Producer ÔÚ BrokerA ÉÏÒÔÏàͬµÄµØÖ··¢ËÍÏûÏ¢£¬´ËʱÏûÏ¢»á±»×ªÒƵ½ BrokerB ÉÏ£¬Ò²¾ÍÊÇ˵ BrokerA »áת·¢ÏûÏ¢µ½BrokerB ÉÏ¡£

ÔÚactiveMQÖУ¬½øÐÐÁ˾²Ì¬ÍøÂçÇŽӵÄÁ½Ì¨½Úµã¶øÑÔ£¬µ± Producer ÔÚ BrokerA ÉÏÒÔÏàͬµÄµØÖ··¢ËÍ10ÌõÏûÏ¢¡£Ò»¸ö Consumer Á¬½Óµ½BrokerBÈ¥Ïû·ÑÏûÏ¢£¬µ±Ïû·ÑÁËÒ»°ëµÄʱºò³öÏÖÒì³£ÁË£¬ÄÇôʣÏÂÀ´Î´´¦ÀíµÄÏûÏ¢»á±»´æ·Åµ½ BrokerB µÄ´ý´¦ÀíÏûÏ¢¶ÓÁÐÖУ¬´ËʱҪͨ¹ýBrokerAÔÙÈ¥Ïû·ÑÊÇÏû·Ñ²»µ½µÄ£¬ÍòÒ»´Ë¿ÌBrokerB ¹ÒÁË£¬ÄÇôÄÄЩûÓÐÏû·ÑµÄÏûÏ¢½«»á¶ªÊ§¡£mq¸øÎÒÃÇÌṩÁËÒ»¸öÓÐЧµÄÏûÏ¢»ØÁ÷»úÖÆ¡£

<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>

ActiveMQ µÄÓÅȱµã

ActiveMQ ²ÉÓÃÏûÏ¢ÍÆËÍ·½Ê½£¬ËùÒÔ×îÊʺϵij¡¾°ÊÇĬÈÏÏûÏ¢¶¼¿ÉÔÚ¶Ìʱ¼äÄÚ±»Ïû·Ñ¡£Êý¾ÝÁ¿Ô½´ó£¬²éÕÒºÍÏû·ÑÏûÏ¢¾ÍÔ½Âý£¬ÏûÏ¢»ýѹ³Ì¶ÈÓëÏûÏ¢Ëٶȳɷ´±È¡£

ȱµã

1.ÍÌÍÂÁ¿µÍ¡£ÓÉÓÚ ActiveMQ ÐèÒª½¨Á¢Ë÷Òý£¬µ¼ÖÂÍÌÍÂÁ¿Ï½µ¡£ÕâÊÇÎÞ·¨¿Ë·þµÄȱµã£¬Ö»ÒªÊ¹ÓÃÍêÈ«·ûºÏ JMS ¹æ·¶µÄÏûÏ¢Öмä¼þ£¬¾ÍÒª½ÓÊÜÕâ¸ö¼¶±ðµÄTPS¡£

2.ÎÞ·ÖÆ¬¹¦ÄÜ¡£ÕâÊÇÒ»¸ö¹¦ÄÜȱʧ£¬JMS ²¢Ã»Óй涨ÏûÏ¢Öмä¼þµÄ¼¯Èº¡¢·ÖƬ»úÖÆ¡£¶øÓÉÓÚ ActiveMQ ÊÇΰÆóÒµ¼¶¿ª·¢Éè¼ÆµÄÏûÏ¢Öмä¼þ£¬³õÖÔ²¢²»ÊÇΪÁË´¦Àíº£Á¿ÏûÏ¢ºÍ¸ß²¢·¢ÇëÇó¡£Èç¹ûһ̨·þÎñÆ÷²»ÄܳÐÊܸü¶àÏûÏ¢£¬ÔòÐèÒªºáÏò²ð·Ö¡£ActiveMQ ¹Ù·½²»Ìṩ·ÖƬ»úÖÆ£¬ÐèÒª×Ô¼ºÊµÏÖ¡£

ÊÊÓó¡¾°

1. ¶Ô TPS ÒªÇó±È½ÏµÍµÄϵͳ£¬¿ÉÒÔʹÓà ActiveMQ À´ÊµÏÖ£¬Ò»·½Ãæ±È½Ï¼òµ¥£¬Äܹ»¿ìËÙÉÏÊÖ¿ª·¢£¬ÁíÒ»·½Ãæ¿É¿ØÐÔÒ²±È½ÏºÃ£¬»¹ÓÐ±È½ÏºÃµÄ¼à¿Ø»úÖÆºÍ½çÃæ

²»ÊÊÓõij¡¾°

1.ÏûÏ¢Á¿¾Þ´óµÄ³¡¾°¡£ActiveMQ ²»Ö§³ÖÏûÏ¢×Ô¶¯·ÖƬ»úÖÆ£¬Èç¹ûÏûÏ¢Á¿¾Þ´ó£¬µ¼ÖÂһ̨·þÎñÆ÷²»ÄÜ´¦ÀíÈ«²¿ÏûÏ¢£¬¾ÍÐèÒª×Ô¼º¿ª·¢ÏûÏ¢·ÖƬ¹¦ÄÜ¡£

 

   
3374 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

ÆóÒµ¼Ü¹¹¡¢TOGAFÓëArchiMate¸ÅÀÀ
¼Ü¹¹Ê¦Ö®Â·-ÈçºÎ×öºÃÒµÎñ½¨Ä££¿
´óÐÍÍøÕ¾µçÉÌÍøÕ¾¼Ü¹¹°¸ÀýºÍ¼¼Êõ¼Ü¹¹µÄʾÀý
ÍêÕûµÄArchimateÊÓµãÖ¸ÄÏ£¨°üÀ¨Ê¾Àý£©
Ïà¹ØÎĵµ

Êý¾ÝÖÐ̨¼¼Êõ¼Ü¹¹·½·¨ÂÛÓëʵ¼ù
ÊÊÓÃArchiMate¡¢EA ºÍ iSpace½øÐÐÆóÒµ¼Ü¹¹½¨Ä£
ZachmanÆóÒµ¼Ü¹¹¿ò¼Ü¼ò½é
ÆóÒµ¼Ü¹¹ÈÃSOAÂ䵨
Ïà¹Ø¿Î³Ì

ÔÆÆ½Ì¨Óë΢·þÎñ¼Ü¹¹Éè¼Æ
ÖÐ̨սÂÔ¡¢ÖÐ̨½¨ÉèÓëÊý×ÖÉÌÒµ
ÒÚ¼¶Óû§¸ß²¢·¢¡¢¸ß¿ÉÓÃϵͳ¼Ü¹¹
¸ß¿ÉÓ÷ֲ¼Ê½¼Ü¹¹Éè¼ÆÓëʵ¼ù