±à¼ÍƼö: |
±¾ÎÄÀ´×ÔÓÚcnblogs£¬ÎÄÕÂÖ÷Òª°¸ÀýΪÖ÷À´½éÉÜActiveMQÏûÏ¢µÄÏû·ÑÔÀí£¬´Ó·½·¨µ½¹ý³ÌÒÔ¼°·½°¸µÈ·½ÃæÏêϸ½éÉÜ¡£
|
|
Ïû·Ñ¶ËÏû·ÑÏûÏ¢£º
ÕâÀï˵ÁËÁ½ÖÖ·½·¨£¬Á½ÖÖ·½·¨¿ÉÒÔ½ÓÊÕÏûÏ¢£¬Ò»ÖÖÊÇʹÓÃͬ²½×èÈûµÄActiveMQMessageConsumer#receive·½·¨¡£ÁíÒ»ÖÖÊÇʹÓÃÏûÏ¢¼àÌýÆ÷MessageListener¡£ÕâÀïÐèҪעÒâµÄÊÇ£¬ÔÚͬһ¸ösessionÏ£¬ÕâÁ½Õß²»ÄÜͬʱ¹¤×÷£¬Ò²¾ÍÊÇ˵²»ÄÜÕë¶Ô²»Í¬ÏûÏ¢²ÉÓò»Í¬µÄ½ÓÊÕ·½Ê½¡£·ñÔò»áÅ׳öÒì³£¡£ÖÁÓÚΪʲôÕâô×ö£¬×î´óµÄÔÒò»¹ÊÇÔÚÊÂÎñÐԻỰÖУ¬Á½ÖÖÏû·ÑģʽµÄÊÂÎñ²»ºÃ¹Ü¿Ø¡£
ÏÈͨ¹ýActiveMQMessageConsumer#receive ·½·¨À´¶ÔÏûÏ¢µÄ½ÓÊÜһ̽¾¿¾¹£º
public Message
receive() throws JMSException {
checkClosed();
//¼ì²éreceiveºÍMessageListenerÊÇ·ñͬʱÅäÖÃÔÚµ±Ç°µÄ»á»°ÖУ¬ÓÐÔòÅ׳öÒì³£
checkMessageListener();
//Èç¹ûPrefetchSizeSizeΪ0²¢ÇÒunconsumerMessageΪ¿Õ£¬Ôò·¢ÆðpullÃüÁî
sendPullCommand(0);
MessageDispatch md = dequeue(-1);//³öÁУ¬»ñÈ¡ÏûÏ¢
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
//·¢ËÍack¸øµ½broker
afterMessageIsConsumed(md, false);
//»ñÈ¡ÏûÏ¢²¢·µ»Ø
return createActiveMQMessage(md);
} |
ÏÂÃæ¼òµ¥µÄ˵һÏÂÒÔÉϼ¸¸öºËÐÄ·½·¨ÖÐ×öÁËʲô²»ÎªÈËÖªµÄÊ£º
sendPullCommand(0) £º·¢ËÍpullÃüÁî´ÓbrokerÉÏ»ñÈ¡ÏûÏ¢£¬Ç°ÌáÊÇprefetchSize=0²¢ÇÒunconsumedMessagesΪ¿Õ¡£unconsumedMessage±íʾδÏû·ÑµÄÏûÏ¢£¬ÕâÀïÃæÔ¤¶ÁÈ¡µÄÏûÏ¢´óСΪprefetchSizeµÄÖµ
protected void
sendPullCommand(long timeout) throws JMSException
{
clearDeliveredList();
if (info.getCurrentPrefetchSize() == 0 &&
unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
messagePull.setTimeout(timeout);
//Ïò·þÎñ¶ËÒì²½·¢ËÍmessagePullÖ¸Áî
session.asyncSendPacket(messagePull);
}
} |
ÕâÀï·¢ËÍÒì²½ÏûÏ¢¸úÏûÏ¢Éú²úµÄÔÀíÊÇÒ»ÑùµÄ¡£Í¨¹ý°ü×°Á´È¥µ÷Óà Sokect ·¢ËÍÇëÇó¡£
clearDeliveredList()£º
ÔÚÉÏÃæµÄsendPullCommand·½·¨ÖУ¬»áÏȵ÷ÓÃclearDeliveredList·½·¨£¬Ö÷ÒªÓÃÀ´ÇåÀíÒѾ·Ö·¢µÄÏûÏ¢Á´±ídeliveredMessagesdeliveredMessages£¬´æ´¢·Ö·¢¸øÏû·ÑÕßµ«»¹ÎªÓ¦´ðµÄÏûÏ¢Á´±í
Èç¹ûsessionÊÇÊÂÎñµÄ£¬Ôò»á±éÀúdeliveredMessageÖеÄÏûÏ¢·ÅÈëµ½previouslyDeliveredMessageÖÐÀ´×öÖØ·¢
Èç¹ûsessionÊÇ·ÇÊÂÎñµÄ£¬¸ù¾ÝACKµÄģʽÀ´Ñ¡Ôñ²»Í¬µÄÓ¦´ð²Ù×÷
ÕâÊǸöͬ²½µÄ¹ý³Ì£º
private void
clearDeliveredList() {
if (clearDeliveredList) {//ÅжÏÊÇ·ñÇå³þ
synchronized (deliveredMessages) {//²ÉÓÃË«ÖØ¼ì²éËø
if (clearDeliveredList) {
if (!deliveredMessages.isEmpty()) {
if (session.isTransacted()) {//ÊÇÊÂÎñÏûÏ¢
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,
Boolean>(session.getTransactionContext ().getTransactionId());
}
for (MessageDispatch delivered : deliveredMessages)
{
previouslyDeliveredMessages.put (delivered.getMessage().getMessageId(),
false);
}
LOG.debug("{} tracking existing transacted
{} delivered list ({}) on transport interrupt",
getConsumerId(), previouslyDeliveredMessages.transactionId,
deliveredMessages.size());
} else {
if (session.isClientAcknowledge()) {
LOG.debug("{} rolling back delivered list
({}) on transport interrupt", getConsumerId(),
deliveredMessages.size());
// allow redelivery
if (!this.info.isBrowser()) {
for (MessageDispatch md: deliveredMessages) {
this.session.connection.rollbackDuplicate (this,
md.getMessage());
}
}
}
LOG.debug("{} clearing delivered list ({})
on transport interrupt", getConsumerId(),
deliveredMessages.size());
deliveredMessages.clear();
pendingAck = null;
}
}
clearDeliveredList = false;
}
}
}
} |
dequeue(-1) £º´ÓunconsumedMessageÖÐÈ¡³öÒ»¸öÏûÏ¢£¬ÔÚ´´½¨Ò»¸öÏû·ÑÕßʱ£¬¾Í»áΪÕâ¸öÏû·ÑÕß´´½¨Ò»¸öδÏû·ÑµÄÏûϢͨµÀ£¬Õâ¸öͨµÀ·ÖΪÁ½ÖÖ£¬Ò»ÖÖÊǼòµ¥ÓÅÏȼ¶¶ÓÁзַ¢Í¨µÀSimplePriorityMessageDispatchChannel
£»ÁíÒ»ÖÖÊÇÏȽøÏȳöµÄ·Ö·¢Í¨µÀFifoMessageDispatchChannel.ÖÁÓÚΪʲôҪ´æÔÚÕâÑùÒ»¸öÏûÏ¢·Ö·¢Í¨µÀ£¬´ó¼Ò¿ÉÒÔÏëÏóһϣ¬Èç¹ûÏû·ÑÕßÿ´ÎÈ¥Ïû·ÑÍêÒ»¸öÏûÏ¢ÒÔºóÔÙÈ¥brokerÄÃÒ»¸öÏûÏ¢£¬Ð§ÂÊÊDZȽϵ͵ġ£ËùÒÔͨ¹ýÕâÑùµÄÉè¼Æ¿ÉÒÔÔÊÐísessionÄܹ»Ò»´ÎÐÔ½«¶àÌõÏûÏ¢·Ö·¢¸øÒ»¸öÏû·ÑÕß¡£Ä¬ÈÏÇé¿ö϶ÔÓÚqueueÀ´Ëµ£¬prefetchSizeµÄÖµÊÇ1000
private MessageDispatch
dequeue (long timeout) throws JMSException {
try {
long deadline = 0;
if (timeout > 0) {
deadline = System.currentTimeMillis() + timeout;
}
while (true) {//protected final MessageDispatchChannel
unconsumedMessages;
MessageDispatch md = unconsumedMessages.dequeue(timeout);
...........
} |
beforeMessageIsConsumed(md)£ºÕâÀïÃæÖ÷ÒªÊÇ×öÏûÏ¢Ïû·Ñ֮ǰµÄһЩ׼±¸¹¤×÷£¬Èç¹ûACKÀàÐͲ»ÊÇDUPS_OK_ACKNOWLEDGE»òÕß¶ÓÁÐģʽ£¨¼òµ¥À´Ëµ¾ÍÊdzýÁËTopicºÍDupAckÕâÁ½ÖÖÇé¿ö£©£¬ËùÓеÄÏûÏ¢Ïȷŵ½deliveredMessagesÁ´±íµÄ¿ªÍ·¡£²¢ÇÒÈç¹ûµ±Ç°ÊÇÊÂÎñÀàÐ͵ĻỰ£¬ÔòÅжÏtransactedIndividualAck£¬Èç¹ûΪtrue£¬±íʾµ¥ÌõÏûÏ¢Ö±½Ó·µ»Øack¡£
·ñÔò£¬µ÷ÓÃackLater£¬ÅúÁ¿Ó¦´ð, client¶ËÔÚÏû·ÑÏûÏ¢ºóÔÝÇÒ²»·¢ËÍACK£¬¶øÊǰÑËü»º´æÏÂÀ´(pendingACK)£¬µÈµ½ÕâЩÏûÏ¢µÄÌõÊý´ïµ½Ò»¶¨·§ÖµÊ±£¬Ö»ÐèҪͨ¹ýÒ»¸öACKÖ¸Áî°ÑËüÃÇÈ«²¿È·ÈÏ£»Õâ±È¶ÔÿÌõÏûÏ¢¶¼Öð¸öÈ·ÈÏ£¬ÔÚÐÔÄÜÉÏÒªÌá¸ßºÜ¶à¡£
private void
beforeMessageIsConsumed(MessageDispatch md) throws
JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId());
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
}¡¡ |
afterMessageIsConsumed£ºÕâ¸ö·½·¨µÄÖ÷Òª×÷ÓÃÊÇÖ´ÐÐÓ¦´ð²Ù×÷£¬ÕâÀïÃæ×öÒÔϼ¸¸ö²Ù×÷
1.Èç¹ûÏûÏ¢¹ýÆÚ£¬Ôò·µ»ØÏûÏ¢¹ýÆÚµÄack
2. Èç¹ûÊÇÊÂÎñÀàÐ͵ĻỰ£¬Ôò²»×öÈκδ¦Àí
3. Èç¹ûÊÇAUTOACK»òÕߣ¨DUPS_OK_ACKÇÒÊǶÓÁУ©£¬²¢ÇÒÊÇÓÅ»¯ack²Ù×÷£¬Ôò×ßÅúÁ¿È·ÈÏack
4.Èç¹ûÊÇDUPS_OK_ACK£¬Ôò×ßackLaterÂß¼
5.Èç¹ûÊÇCLIENT_ACK£¬ÔòÖ´ÐÐackLater
private void
afterMessageIsConsumed(MessageDispatch md, boolean
messageExpired) throws JMSException {
if (unconsumedMessages.isClosed()) {
return;
}
if (messageExpired) {
acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
stats.getExpiredMessageCount().increment();
} else {
stats.onMessage();
if (session.getTransacted()) {
// Do nothing.
} else if (isAutoAcknowledgeEach()) {
if (deliveryingAcknowledgements.compareAndSet(false,
true)) {
synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
// AMQ-3956 evaluate both expired and normal msgs
as
// otherwise consumer may get stalled
if (ackCounter + deliveredCounter >= (info.getPrefetchSize()
* .65) || (optimizeAcknowledgeTimeOut > 0 &&
System.currentTimeMillis() >= (optimizeAckTimestamp
+ optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg
just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter
> 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
}
} else {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack!=null) {
deliveredMessages.clear();
session.sendAck(ack);
}
}
}
}
deliveryingAcknowledgements.set(false);
}
} else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge() ||session.isIndividualAcknowledge())
{
boolean messageUnackedByConsumer = false;
synchronized (deliveredMessages) {
messageUnackedByConsumer = deliveredMessages.contains(md);
}
if (messageUnackedByConsumer) {
ackLater (md, MessageAck.DELIVERED_ACK_TYPE);
}
}
else {
throw new IllegalStateException("Invalid
session state.");
}
}
}¡¡ |
ÆäʵÔÚÒÔÉÏÏûÏ¢µÄ½ÓÊÕ¹ý³ÌÖУ¬ÎÒÃǽö½öÄÜ¿´µ½Õâ¸öÏûÏ¢´ÓÒ»¸ö±¾µØ±äÁ¿Öгö¶Ó£¬²¢Ã»ÓжÔÔ¶³ÌÏûÏ¢ÖÐÐÄ·¢ËÍͨѶ»ñÈ¡£¬ÄÇôÕâ¸öÏûϢʱʲôʱºò¹ýÀ´µÄÄØ£¿Ò²¾ÍÊÇÏûÏ¢³ö¶ÓÖÐ
unconsumedMessages Õâ¸ö¶«¶«Ê±Ê²Ã´Ê±ºò³õʼ»¯µÄÄØ £¿ÄÇô½ÓÏÂÈ¥ÎÒÃÇÓ¦¸Ãȥͨ¹ý´´½¨Á¬½ÓµÄʱºòÈ¥¿´¿´ÁË£¬¾ßÌåÁ¬½ÓµÄʱºò¶¼×öÁËÊ²Ã´ÄØ£ºconnectionFactory.createConnection()
private void
afterMessageIsConsumed (MessageDispatch md, boolean
messageExpired) throws JMSException {
protected ActiveMQConnection createActiveMQConnection(String
userName, String password) throws JMSException
{
if (brokerURL == null) {
throw new ConfigurationException("brokerURL
not set.");
}
ActiveMQConnection connection = null;
try {// ¹ûÈ»·¢ÏÖÁËÕâ¸ö¶«¶«µÄ³õʼ»¯
Transport transport = createTransport();
// ´´½¨Á¬½Ó
connection = createActiveMQConnection(transport,
factoryStats);
// ÉèÖÃÓû§ÃÜÂë
connection.setUserName(userName);
connection.setPassword(password);
// ¶ÔÁ¬½Ó×ö°ü×°
configureConnection(connection);
// Æô¶¯Ò»¸öºǫ́´«ÊäÏß³Ì
transport.start();
// ÉèÖÿͻ§¶ËÏû·ÑµÄid
if (clientID != null) {
connection.setDefaultClientID(clientID);
}
return connection;
} ......
} |
´´½¨Á¬½ÓµÄ¹ý³Ì¾ÍÊÇ´´½¨³ýÁËÒ»¸ö´øÓÐÁ´Â·°ü×°µÄTcpTransport
²¢ÇÒ´´½¨Á¬½Ó£¬×îºóÆô¶¯Ò»¸ö´«ÊäỊ̈߳¬¶øÕâÀïµÄ transport.start() µ÷ÓõÄÓ¦¸ÃÊÇTcpTransport
ÀïÃæµÄ·½·¨£¬È»¶øÕâ¸öÀàÖв¢Ã»ÓÐ start£¬¶øÊÇÔÚ¸¸Àà
ServiceSupport.start()ÖУº
public void
start() throws Exception {
if (started.compareAndSet(false, true)) {
boolean success = false;
stopped.set(false);
try {
preStart();//һЩ³õʼ»¯
doStart();
success = true;
} finally {
started.set(success);
}
for(ServiceListener l:this.serviceListeners) {
l.started(this);
}
}
} |
doStart ·½·¨Ç°×öÁËһϵÁеijõʼ»¯£¬È»ºóµ÷Óà TcpTransportµÄdoStart()
·½·¨£º
protected void
doStart() throws Exception {
connect();
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
} |
¼Ì¶ø¹¹½¨Ò»¸öÁ¬½Ó ÉèÖÃÒ»¸ö CountDownLatch ÃÅãÅ £¬µ÷Óø¸Àà
TransportThreadSupport µÄ·½·¨£¬Ð½¨ÁËÒ»¸ö¾«ÁéÏ̲߳¢ÇÒÆô¶¯£º
protected void
doStart() throws Exception {
runner = new Thread(null, this, "ActiveMQ
Transport: " + toString(), stackSize);
runner.setDaemon(daemon);
runner.start();
} |
µ÷ÓÃTransportThreadSupport.doStart().
´´½¨ÁËÒ»¸öỊ̈߳¬´«ÈëµÄÊÇ this£¬µ÷ÓÃ×ÓÀàµÄ run ·½·¨£¬Ò²¾ÍÊÇ TcpTransport.run().
public void
run() {
LOG.trace("TCP consumer thread for "
+ this + " starting");
this.runnerThread=Thread.currentThread();
try {
while (!isStopped()) {
doRun();
}
} catch (IOException e) {
stoppedLatch.get().countDown();
onException(e);
} catch (Throwable e){
stoppedLatch.get().countDown();
IOException ioe=new IOException("Unexpected
error occurred: " + e);
ioe.initCause(e);
onException(ioe);
}finally {
stoppedLatch.get().countDown();
}
} |
run ·½·¨Ö÷ÒªÊÇ´Ó socket ÖжÁÈ¡Êý¾Ý°ü£¬Ö»Òª TcpTransport
ûÓÐÍ£Ö¹£¬Ëü¾Í»á²»¶ÏÈ¥µ÷Óà doRun£ºÕâÀïÃæ£¬Í¨¹ý wireFormat ¶ÔÊý¾Ý½øÐиñʽ»¯£¬¿ÉÒÔÈÏΪÕâÊÇÒ»¸ö·´ÐòÁл¯¹ý³Ì¡£wireFormat
ĬÈÏʵÏÖÊÇ OpenWireFormat£¬activeMQ ×Ô¶¨ÒåµÄ¿çÓïÑÔµÄwire ÐÒé
protected void
doRun() throws IOException {
try {//ͨ¹ý readCommand È¥¶ÁÈ¡Êý¾Ý
Object command = readCommand();
//Ïû·ÑÏûÏ¢
doConsume(command);
} catch (SocketTimeoutException e) {
} catch (InterruptedIOException e) {
}
}
protected Object readCommand() throws IOException
{
return wireFormat.unmarshal(dataIn);
} |
doConsume£ºÁ÷³Ì×ßµ½ÁËÏû·ÑÏûÏ¢£º
public void
doConsume(Object command) {
if (command != null) {//±íʾÒѾÄõ½ÁËÏûÏ¢
if (transportListener != null) {
transportListener.onCommand(command);
} else {
LOG.error("No transportListener available
to process inbound command: " + command);
}
}
} |
TransportSupport ÀàÖÐΨһµÄ³ÉÔ±±äÁ¿ÊÇ TransportListener
transportListener;£¬ÕâÒ²Òâζ×ÅÒ»¸ö Transport Ö§³ÖÀà°ó¶¨Ò»¸ö´«ËͼàÌýÆ÷À࣬´«ËͼàÌýÆ÷½Ó¿Ú
TransportListener ×îÖØÒªµÄ·½·¨¾ÍÊÇ void onCommand(Object command);£¬ËüÓÃÀ´´¦ÀíÃüÁî¡£ÄÇôÕâ¸ö
transportListener ÊÇÔÚÄÇÀï³õʼ»¯µÄÄØ£¿¿ÉÒÔ˼¿¼Ò»Ï ¼ÈÈ»ÊÇTransportSupport
ΨһµÄ³ÉÔ±±äÁ¿£¬¶øÎÒÃÇËø´´½¨µÄTcpTransport ÊÇËûµÄ×ÓÀ࣬ÄÇôÊDz»ÊÇÔÚ´´½¨¸ÃtransportµÄʱºòÒà»òÊÇÔÚ¶ÔËû½øÐаü×°´¦ÀíµÄʱºò×öÁ˳õʼ»¯ÄØ£¿
ÎÒÃÇ»áÔÚÁ÷³ÌÖп´µ½ÔÚн¨ ActiveMQConnectionFactory µÄʱºòÓÐÒ»ÐйؼüµÄ´úÂ룺
connection =
createActiveMQConnection(transport, factoryStats); |
ÔÚÕâ¸ö·½·¨ÁùÃæ×·ËÝÏÂÈ¥£º»á½øÈë ActiveMQConnection
µÄ¹¹Ôì·½·¨
protected ActiveMQConnection (final
Transport transport, IdGenerator clientIdGenerator,
IdGenerator connectionIdGenerator, JMSStatsImpl
factoryStats) throws Exception {
this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
this.factoryStats = factoryStats;
// Configure a single threaded executor who's
core thread can timeout if
// idle
executor = new ThreadPoolExecutor (1, 1, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new
ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection
Executor: " + transport);
//Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
//thread.setDaemon(true);
return thread;
}
});
// asyncConnectionThread.allowCoreThreadTimeOut(true);
String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo (new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant (transport.isFaultTolerant());
this.connectionSessionId = new SessionId(info.getConnectionId(),
-1);
this.transport.setTransportListener(this);
this.stats = new JMSConnectionStatsImpl (sessions,
this instanceof XAConnection);
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates (transport.isFaultTolerant());
} |
´ÓÒÔÉÏ´úÂëÎÒÃÇ·¢ÏÖ this.transport.setTransportListener(this);
ÄÇôÕâ¸öthisÊÇÊ²Ã´ÄØ £¿ ÕýÊÇActiveMQConnection £¬¿´ÁËÒ»ÑÛ¸ÃÀ࣬·¢ÏÖÕâ¸öÀàʵÏÖÁË
TransportListener £¬±¾Éí¾ÍÊÇÒ»¸öTransportListener¡£ËùÒÔÉÏÃæ transportListener.onCommand(command);
¾ÍÊÇ ActiveMQConnection.onCommand(command)¡£³ýÁËºÍ TransportÏ໥°ó¶¨£¬»¹¶ÔÏ̳߳ØÖ´ÐÐÆ÷
executor ½øÐÐÁ˳õʼ»¯¡£Õâ¸çÖ´ÐÐÆ÷ÊǺóÀ´Òª½øÐÐÏûÏ¢´¦ÀíµÄ¡£
ÕâÀïÃæ»áÕë¶Ô²»Í¬µÄÏûÏ¢×ö·Ö·¢£¬ÔÚActiveMQMessageConsumer#receive·½·¨ÖÐËødequeueËù·µ»ØµÄ¶ÔÏóÊÇMessageDispatch
¡£¼ÙÉèÕâÀï´«ÈëµÄ command ÊÇMessageDispatch£¬ÄÇôÕâ¸ö command µÄ visit
·½·¨¾Í»áµ÷ÓÃprocessMessageDispatch ·½·¨¡£¼ôÇгöÆäÖеĴúÂëÆ¬¶Î£º
public Response
processMessageDispatch (MessageDispatch md) throws
Exception {
// µÈ´ý Transport Öжϴ¦ÀíÍê³É
waitForTransportInterruptionProcessingToComplete();
// ÕâÀïͨ¹ýÏû·ÑÕß ID À´»ñÈ¡Ïû·ÑÕß¶ÔÏó
//£¨ActiveMQMessageConsumer ʵÏÖÁË ActiveMQDispatcher
½Ó¿Ú£©£¬ËùÒÔ
//MessageDispatch °üº¬ÁËÏûÏ¢Ó¦¸Ã±»·ÖÅäµ½ÄǸöÏû·ÑÕßµÄÓ³ÉäÐÅÏ¢
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching
via
// vm://
// md.getMessage() == null to signal end of queue
// browse.
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage(null);
md.setMessage(msg);
}
// µ÷ÓûỰActiveMQSession ×Ô¼ºµÄ dispatch ·½·¨À´´¦ÀíÕâÌõÏûÏ¢
dispatcher.dispatch(md);
} else {
LOG.debug("{} no dispatcher for {} in {}",
this, md, dispatchers);
}
return null;
} |
ÆäÖÐ ActiveMQDispatcher dispatcher =
dispatchers.get(md.getConsumerId());ÕâÐдúÂëµÄ dispatchers
ÊÇÔÚ Í¨¹ýsession.createConsumer(destination); µÄʱºòͨ¹ý ActiveMQMessageConsumer
µÄ¹¹Ôì·½·¨ÖÐÓÐÒ»ÐдúÂë £ºthis.session.addConsumer(this); ½« this´«È룬¼´
ActiveMQMessageConsumer ¶ÔÏó¡£¶øÕâ¸ö addConsumer ·½·¨£º
protected void
addConsumer (ActiveMQMessageConsumer consumer)
throws JMSException {
this.consumers.add(consumer);
if (consumer.isDurableSubscriber()) {
stats.onCreateDurableSubscriber();
}
this.connection.addDispatcher(consumer.getConsumerId(),
this);
} |
¿ÉÒÔ·¢ÏÖÕâÀïµÄ³õʼ»¯ÁË£ºthis.connection.addDispatcher(consumer.getConsumerId(),
this); ÕâÀïµÄthis ¼´ ActiveMQSession¡£ËùÒԻص½ ActiveMQConnection#onCommand·½·¨ÄÚ
processMessageDispatch Õâ¸ö·½·¨×îºóµ÷ÓÃÁË dispatcher.dispatch(md);
Õâ¸ö·½·¨µÄºËÐŦÄܾÍÊÇ´¦ÀíÏûÏ¢µÄ·Ö·¢¡££º
public void
dispatch(MessageDispatch messageDispatch) {
try {
executor.execute(messageDispatch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
connection.onClientInternalException(e);
}
} |
ÕâÀïÀëÎÒÃÇÕæÕýÒªÕҵĽøÐÐÏûÏ¢Èë¶ÓµÄ½á¹ûºÜ½üÁË£¬½øÈëexecutor.execute(messageDispatch);Õâ¸ö·½·¨£º
void execute(MessageDispatch
message) throws InterruptedException {
...........
//Èç¹û»á»°²»ÊÇÒì²½·Ö·¢²¢ÇÒûÓÐʹÓà sessionpool ·Ö·¢£¬Ôòµ÷Óà dispatch
·¢ËÍÏûÏ¢
if (!session.isSessionAsyncDispatch() &&
!dispatchedBySessionPool) {
dispatch(message);
} else {//½«ÏûÏ¢Ö±½Ó·Åµ½¶ÓÁÐÀï
messageQueue.enqueue(message);
wakeup();
}
} |
ÕâÀï×îºóÖÕÓÚ·¢ÏÖÁËÈë¶Ó£¬ÅжÏÊÇ·ñÒì²½·Ö·¢£¬²»Êǵϰ×ßdispatch(message)
·ñÔò½øÈëÒì²½·Ö·¢¡£Ä¬ÈÏÊDzÉÓÃÒì²½ÏûÏ¢·Ö·¢¡£ËùÒÔ£¬Ö±½Óµ÷Óà messageQueue.enqueue£¬°ÑÏûÏ¢·Åµ½¶ÓÁÐÖУ¬²¢ÇÒµ÷ÓÃ
wakeup ·½·¨:
public void
wakeup() {
if (!dispatchedBySessionPool) {//½øÒ»²½ÑéÖ¤
// //ÅÐ¶Ï session ÊÇ·ñΪÒì²½·Ö·¢
if (session.isSessionAsyncDispatch()) {
try {
TaskRunner taskRunner = this.taskRunner;
if (taskRunner == null) {
synchronized (this) {
if (this.taskRunner == null) {
if (!isRunning()) {
// stop has been called
return;
}
//ͨ¹ý TaskRunnerFactory ´´½¨ÁËÒ»¸öÈÎÎñÔËÐÐÀà taskRunner£¬ÕâÀï°Ñ×Ô¼º×÷Ϊһ¸ö
task ´«Èëµ½ createTaskRunner ÖУ¬
//˵Ã÷µ±Ç°µÄÀàÒ»¶¨ÊÇʵÏÖÁË Task ½Ó¿ÚµÄ. ¼òµ¥À´Ëµ£¬ ¾ÍÊÇͨ¹ýÏ̳߳ØÈ¥Ö´ÐÐÒ»¸öÈÎÎñ£¬Íê³ÉÒì²½µ÷¶È
//ÕâÀïÓÉÓÚexecutor != null ËùÒÔÕâ¸ötaskµÄÀàÐÍÊÇPooledTaskRunner
this.taskRunner = session.connection .getSessionTaskRunner().createTaskRunner(this,
"ActiveMQ Session: " + session.getSessionId());
}
taskRunner = this.taskRunner;
}
}
taskRunner.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {// Òì²½·Ö·¢
while (iterate()) {
}
}
}
} |
ËùÒÔ£¬¶ÔÓÚÒì²½·Ö·¢µÄ·½Ê½£¬»áµ÷Óà ActiveMQSessionExecutor ÖÐµÄ iterate·½·¨£¬ÎÒÃÇÀ´¿´¿´Õâ¸ö·½·¨µÄ´úÂë
iterate £¨£©£ºÕâ¸ö·½·¨ÀïÃæ×öÁ½¸öÊÂ
1.°ÑÏû·ÑÕß¼àÌýµÄËùÓÐÏûϢת´æµ½´ýÏû·Ñ¶ÓÁÐÖÐ
2. Èç¹û messageQueue »¹´æÔÚÒÅÁôÏûÏ¢£¬Í¬Ñù°ÑÏûÏ¢·Ö·¢(µ÷¶È)³öÈ¥
public boolean
iterate() {
// Deliver any messages queued on the consumer
to their listeners.<br> // ½«Ïû·ÑÕßÉÏÅŶӵÄÈκÎÏûÏ¢´«µÝ¸øËüÃǵÄÕìÌýÆ÷¡£
for (ActiveMQMessageConsumer consumer : this.session.consumers)
{
if (consumer.iterate()) {
return true;
}
}
// No messages left queued on the listeners..
so now dispatch messages
// queued on the session<br> // ÕìÌýÆ÷ÉÏûÓÐÁôÏÂÅŶӵȴýµÄÏûÏ¢¡£ÏÖÔÚ·ÖÅÉÏûÏ¢
MessageDispatch message = messageQueue.dequeueNoWait();
if (message == null) {
return false;
} else {// ·Ö·¢(µ÷¶È)ÏûÏ¢
dispatch(message);
return !messageQueue.isEmpty();
}
} |
dispatch(message);ÏûϢȷÈÏ·Ö·¢¡£Í¨¹ýActiveMQSessionExecutorµÄdispatch
·½·¨£¬×ªµ½ÁË ActiveMQMessageConsumer Ïû·ÑÕßÀàµÄ dispatch ·½·¨£º
public void
dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
clearMessagesInProgress();
clearDeliveredList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {// ÅжÏÏûÏ¢ÊÇ·ñÎªÖØ·¢ÏûÏ¢
if (this.info.isBrowser() || !session.connection.isDuplicate(this,
md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning())
{
//ÎÒÕâ±ßͨ¹ýconsumer.receive()´¦ÀíÏûÏ¢£¬ËùÒÔÕâÀïlistenerΪ¿Õ£¬×ßÏÂÃæ
} else {
if (!unconsumedMessages.isRunning()) {
// delayed redelivery, ensure it can be re delivered
session.connection.rollbackDuplicate(this, md.getMessage());
}
if (md.getMessage() == null) {
// End of browse or pull request timeout.
unconsumedMessages.enqueue(md);
} else {
if (!consumeExpiredMessage(md)) {
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
¡¡¡¡¡¡¡¡¡¡¡¡.........
} |
×îÖÕ»á×ßÈë unconsumedMessages.enqueue(md);Ìí¼ÓÏûÏ¢¡£ÕâÀïÐèҪעÒâµÄÊÇenqueue
·½·¨£ºÓÉÓÚÏû·ÑÕß¿ÉÄÜ´¦ÓÚ×èÈû״̬£¬ÕâÀï×öÁËÈë¶Óºó»ØÊÍ·ÅËø£¬Ò²¾ÍÊǽӴ¥×èÈû¡£
public void
enqueue(MessageDispatch message) {
synchronized (mutex) {
list.addLast(message);
mutex.notify();
}
} |
µ½ÕâÀïΪֹ£¬ÏûÏ¢ÈçºÎ½ÓÊÜÒÔ¼°ËûµÄ´¦Àí·½Ê½µÄÁ÷³Ì£¬ÎÒÃÇÒѾ¸ãÇå³þÁË¡£ÆäʵÔÚÕâ¸öÏûÏ¢Ïû·ÑµÄÁ÷³ÌÖУ¬ÒѾÔÚ½¨Á¢Á¬½Ó£¬´´½¨Ïû·ÑÕßµÄʱºò¾ÍÒѾ³õʼ»¯ºÃÁËÏûÏ¢¶ÓÁÐÁË¡£½áºÏÉÏÃæµÄ¹ý³ÌÀ´¿´¿´Õû¸öÏû·ÑÁ÷³ÌµÄÁ÷³Ìͼ

Ïû·Ñ¶ËµÄ PrefetchSize£º ÔÚÏûÏ¢·¢²¼µÄʱºòÎÒÃÇÔø¾Ñо¿¹ý producerWindowSize ¡£Ö÷ÒªÓÃÀ´Ô¼ÊøÔÚÒì²½·¢ËÍʱproducer¶ËÔÊÐí»ýѹµÄ(ÉÐδACK)µÄÏûÏ¢µÄ´óС£¬ÇÒÖ»¶ÔÒì²½·¢ËÍÓÐÒâÒå¡£¶ÔÓÚ¿Í»§¶Ë£¬Ò²ÊÇÀàËÆ´æÔÚÕâôһ¸öÊôÐÔÀ´Ô¼Êø¿Í»§¶ËµÄÏûÏ¢´¦Àí¡£activemq
µÄ consumer ¶ËÒ²Óд°¿Ú»úÖÆ£¬Í¨¹ý prefetchSize ¾Í¿ÉÒÔÉèÖô°¿Ú´óС¡£²»Í¬µÄÀàÐ͵ĶÓÁУ¬prefetchSize
µÄĬÈÏÖµÒ²ÊDz»Ò»ÑùµÄ.
1. ³Ö¾Ã»¯¶ÓÁкͷdz־û¯¶ÓÁеÄĬÈÏֵΪ 1000
2.³Ö¾Ã»¯ topic ĬÈÏֵΪ 100
3.·Ç³Ö¾Ã»¯¶ÓÁеÄĬÈÏֵΪ Short.MAX_VALUE-1
²âÊÔ·½·¨ÊÇÔÚMQÉÏÉú²ú1000ÌõÏûÏ¢£¬ÏȺóÆô¶¯comsumer1£¬comsumer2 Á½¸öÏû·ÑÕß²¢ÇÒÑ»·µ÷ÓÃ1000´ÎÏû·Ñ£¬ÎÒÂè»á·¢ÏÖ
comsumer2 Äò»µ½ÏûÏ¢£¬Õâ¸öʱºòÎÒÃÇ¿ÉÒÔͨ¹ýdebug½øÈëcomsumer1 µÄActiveMQConnect»á·¢ÏÖÀïÃæÓиöÊôÐÔµÄsize=1000.Æäʵ¾ÍÊÇÕâ¸öprefetchSize£¬·Òë¹ýÀ´ÊÇԤȡ´óС£¬Ïû·Ñ¶Ë»á¸ù¾ÝprefetchSize
µÄ´óСÅúÁ¿»ñÈ¡Êý¾Ý¡£Òâ˼ÊÇÔÚ´´½¨Á¬½ÓµÄʱºò»áÈ¡»ñÈ¡1000ÌõÏûÏ¢Ô¤¼ÓÔØµ½»º´æÖеȴý´¦Àí£¬ÕâÑù×Óµ¼ÖÂcomsumer2È¥»ñÈ¡ÏûÏ¢µÄʱºò
brokerÉÏÒѾ¿ÕÁË¡£
prefetchSize µÄÉèÖ÷½·¨£º
ÔÚ createQueue ÖÐÌí¼Ó consumer.prefetchSize£¬¾Í¿ÉÒÔ¿´µ½Ð§¹û
Destination destination=session.createQueue ("myQueue?consumer.prefetchSize=10"); |
¼ÈÈ»ÓÐÅúÁ¿¼ÓÔØ£¬ÄÇôһ¶¨ÓÐÅúÁ¿È·ÈÏ£¬ÕâÑù²ÅËãÊdz¹µ×µÄÓÅ»¯£¬Õâ¾ÍÉæ¼°µ½ optimizeAcknowledge optimizeAcknowledge
ActiveMQ ÌṩÁË optimizeAcknowledge À´ÓÅ»¯È·ÈÏ£¬Ëü±íʾÊÇ·ñ¿ªÆô¡°ÓÅ»¯ACK¡±£¬Ö»ÓÐÔÚΪ
true µÄÇé¿öÏ£¬prefetchSize ÒÔ¼°optimizeAcknowledgeTimeout
²ÎÊý²Å»áÓÐÒâÒåÓÅ»¯È·ÈÏÒ»·½Ãæ¿ÉÒÔ¼õÇá client ¸ºµ££¨²»ÐèҪƵ·±µÄÈ·ÈÏÏûÏ¢£©¡¢¼õÉÙͨÐÅ¿ªÏú£¬ÁíÒ»·½ÃæÓÉÓÚÑÓ³ÙÁËÈ·ÈÏ£¨Ä¬ÈÏ
ack ÁË 0.65*prefetchSize ¸öÏûÏ¢²ÅÈ·ÈÏ£©£¬Õâ¸öÔÚÔ´ÂëÖÐÓÐÌåÏÖ¡£ÔÚActiveMQMessageConsumer#receive·½·¨ÄڵĴ¦ÀíÏûÏ¢ºóµÄ
afterMessageIsConsumed ·½·¨ÄÚÓÐÒ»¸öÅжϣº
if (ackCounter
+ deliveredCounter >= (info.getPrefetchSize()
* .65) || (optimizeAcknowledgeTimeOut > 0 &&
System.currentTimeMillis() >= (optimizeAckTimestamp
+ optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);//Âú×ãÌõ¼þÔò·¢ËÍÅúÁ¿Ó¦´ðACK
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg
just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter
> 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
} |
broker Ôٴη¢ËÍÏûϢʱÓÖ¿ÉÒÔÅúÁ¿·¢ËÍÈç¹ûÖ»ÊÇ¿ªÆôÁË prefetchSize£¬Ã¿ÌõÏûÏ¢¶¼È¥È·Èϵϰ£¬broker
ÔÚÊÕµ½È·ÈϺóÒ²Ö»ÊÇ·¢ËÍÒ»ÌõÏûÏ¢£¬²¢²»ÊÇÅúÁ¿·¢²¼£¬µ±È»Ò²¿ÉÒÔͨ¹ýÉèÖà DUPS_OK_ACKÀ´ÊÖ¶¯ÑÓ³ÙÈ·ÈÏ£¬
ÎÒÃÇÐèÒªÔÚ brokerUrl Ö¸¶¨ optimizeACK Ñ¡Ïî
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory
("tcp://192.168.11.153:61616? jms.optimizeAcknowledge =true&jms.optimizeAcknowledgeTimeOut=10000"); |
×¢Ò⣬Èç¹û optimizeAcknowledge Ϊ true£¬ÄÇô
prefetchSize ±ØÐë´óÓÚ 0. µ± prefetchSize=0 µÄʱºò£¬±íʾ consumer
ͨ¹ý PULL ·½Ê½´Ó broker »ñÈ¡ÏûÏ¢.
optimizeAcknowledge ºÍ prefetchSize µÄ×÷Óã¬Á½ÕßÐͬ¹¤×÷£¬Í¨¹ýÅúÁ¿»ñÈ¡ÏûÏ¢¡¢²¢ÑÓ³ÙÅúÁ¿È·ÈÏ£¬À´´ïµ½Ò»¸ö¸ßЧµÄÏûÏ¢Ïû·ÑÄ£ÐÍ¡£Ëü±È½ö¼õÉÙÁ˿ͻ§¶ËÔÚ»ñÈ¡ÏûϢʱµÄ×èÈû´ÎÊý£¬»¹ÄܼõÉÙÿ´Î»ñÈ¡ÏûϢʱµÄÍøÂçͨÐÅ¿ªÏú
ÐèҪעÒâµÄÊÇ£¬Èç¹ûÏû·Ñ¶ËµÄÏû·ÑËٶȱȽϸߣ¬Í¨¹ýÕâÁ½Õß×éºÏÊÇÄÜ´ó´óÌáÉý
consumer µÄÐÔÄÜ¡£Èç¹û consumer µÄÏû·ÑÐÔÄܱ¾Éí¾Í±È½ÏÂý£¬ÉèÖñȽϴóµÄ prefetchSize
·´¶ø²»ÄÜÓÐЧµÄ´ïµ½ÌáÉýÏû·ÑÐÔÄܵÄÄ¿µÄ¡£ÒòΪ¹ý´óµÄprefetchSize ²»ÀûÓÚ consumer
¶ËÏûÏ¢µÄ¸ºÔؾùºâ¡£ÒòΪͨ³£Çé¿öÏ£¬ÎÒÃǶ¼»á²¿Êð¶à¸ö consumer ½ÚµãÀ´ÌáÉýÏû·Ñ¶ËµÄÏû·ÑÐÔÄÜ¡£Õâ¸öÓÅ»¯·½°¸»¹»á´æÔÚÁíÍâÒ»¸öDZÔÚ·çÏÕ£¬µ±ÏûÏ¢±»Ïû·ÑÖ®ºó»¹Ã»ÓÐÀ´µÃ¼°È·ÈÏʱ£¬client
¶Ë·¢Éú¹ÊÕÏ£¬ÄÇôÕâЩÏûÏ¢¾ÍÓпÉÄÜ»á±»ÖØÐ·¢Ë͸øÆäËûconsumer£¬ÄÇôÕâÖÖ·çÏÕ¾ÍÐèÒª client
¶ËÄܹ»ÈÝÈÌ¡°Öظ´¡±ÏûÏ¢¡£
ÏûÏ¢µÄÈ·ÈϹý³Ì£º ÏûϢȷÈÏÓÐËÄÖÖ ACK_MODE£¬·Ö±ðÊÇ£º
1. AUTO_ACKNOWLEDGE = 1 ×Ô¶¯È·ÈÏ
2.CLIENT_ACKNOWLEDGE = 2 ¿Í»§¶ËÊÖ¶¯È·ÈÏ
3.DUPS_OK_ACKNOWLEDGE = 3 ×Ô¶¯ÅúÁ¿È·ÈÏ
4.SESSION_TRANSACTED = 0 ÊÂÎñÌá½»²¢È·ÈÏ
ACK_MODE µÄÑ¡ÔñÓ°Ïì×ÅÏûÏ¢Ïû·ÑÁ÷³ÌµÄ×ßÏò¡£ËäÈ» Client ¶ËÖ¸¶¨ÁË ACK ģʽ,µ«ÊÇÔÚ
Client Óë broker ÔÚ½»»» ACK Ö¸ÁîµÄʱºò,»¹ÐèÒª¸æÖª ACK_TYPE,ACK_TYPE
±íʾ´ËÈ·ÈÏÖ¸ÁîµÄÀàÐÍ£¬²»Í¬µÄACK_TYPE ½«´«µÝ×ÅÏûÏ¢µÄ״̬£¬broker ¿ÉÒÔ¸ù¾Ý²»Í¬µÄ ACK_TYPE
¶ÔÏûÏ¢½øÐв»Í¬µÄ²Ù×÷¡£
ACK_TYPEÓ¦´ðÀàÐÍ£º DELIVERED_ACK_TYPE = 0 ÏûÏ¢"ÒѽÓÊÕ"£¬µ«ÉÐδ´¦Àí½áÊø
STANDARD_ACK_TYPE = 2 "±ê×¼"ÀàÐÍ,ͨ³£±íʾΪÏûÏ¢"´¦Àí³É¹¦"£¬broker
¶Ë¿ÉÒÔɾ³ýÏûÏ¢ÁË
POSION_ACK_TYPE = 1 ÏûÏ¢"´íÎó",ͨ³£±íʾ"Åׯú"´ËÏûÏ¢£¬±ÈÈçÏûÏ¢ÖØ·¢¶à´Îºó£¬¶¼ÎÞ·¨ÕýÈ·´¦Àíʱ£¬ÏûÏ¢½«»á±»É¾³ý»òÕß
DLQ(ËÀÐŶÓÁÐ)£¬ÔÚÏûÏ¢´¦ÀíµÄʱºò£¬dispatch·½·¨ÄÚ»áÅжϸÃÏûÏ¢ÊÇ·ñÎªÖØ·¢ÏûÏ¢
if (this.info.isBrowser()
|| !session.connection.isDuplicate(this, md.getMessage()))
{
if (listener != null && unconsumedMessages.isRunning())
{
// Õâ¶ÎΪ·ÇÖØ·¢ÏûÏ¢£¬×ßelse
} else {
// deal with duplicate delivery
ConsumerId consumerWithPendingTransaction;
if (redeliveryExpectedInCurrentTransaction(md,
true)) {
LOG.debug("{} tracking transacted redelivery
{}", getConsumerId(), md.getMessage());
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE,
1));
}
} else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md))
!= null) {
LOG.warn("{} delivering duplicate {}, pending
transaction completion on {} will rollback",
getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
session.getConnection().rollbackDuplicate(this,
md.getMessage());
dispatch(md);
} else {// ×ßPOSION_ACK_TYPE Ìí¼ÓActive_DLQ ËÀÐŶÓÁÐ
LOG.warn("{} suppressing duplicate delivery
on connection, poison acking: {}", getConsumerId(),
md);
posionAck(md, "Suppressing duplicate delivery
on connection, consumer " + getConsumerId());
}
} |
REDELIVERED_ACK_TYPE = 3 ÏûÏ¢Ðè"ÖØ·¢"£¬±ÈÈç consumer
´¦ÀíÏûϢʱÅ׳öÁËÒì³££¬broker ÉÔºó»áÖØÐ·¢ËÍ´ËÏûÏ¢
INDIVIDUAL_ACK_TYPE = 4 ±íʾֻȷÈÏ"µ¥ÌõÏûÏ¢",ÎÞÂÛÔÚÈκÎ
ACK_MODE ÏÂ
UNMATCHED_ACK_TYPE = 5 ÔÚ Topic ÖУ¬Èç¹ûÒ»ÌõÏûÏ¢ÔÚת·¢¸ø¡°¶©ÔÄÕß¡±Ê±£¬·¢ÏÖ´ËÏûÏ¢²»·ûºÏ
Selector ¹ýÂËÌõ¼þ£¬ÄÇô´ËÏûÏ¢½« ²»»áת·¢¸ø¶©ÔÄÕߣ¬ÏûÏ¢½«»á±»´æ´¢ÒýÇæÉ¾³ý(Ï൱ÓÚÔÚ Broker
ÉÏÈ· ÈÏÁËÏûÏ¢)¡£
Client ¶ËÔÚ²»Í¬µÄ ACK ģʽʱ,½«Òâζ×ÅÔÚ²»Í¬µÄʱ»ú·¢ËÍ ACK Ö¸Áî,ÿ¸ö ACK
Command Öлá°üº¬ ACK_TYPE,ÄÇô broker ¶Ë¾Í¿ÉÒÔ¸ù¾Ý ACK_TYPE À´¾ö¶¨´ËÏûÏ¢µÄºóÐø²Ù×÷¡£ÔÚ
afterMessageIsConsumed ÏûÏ¢½ÓÊÕ´¦Àíºó»á¸ù¾ÝÌõ¼þÀ´ÉèÖà ACK_TYPE.
ÏûÏ¢µÄÖØ·¢»úÖÆÔÀí: ÔÚÕý³£Çé¿öÏ£¬Óм¸ÖÐÇé¿ö»áµ¼ÖÂÏûÏ¢ÖØÐ·¢ËÍ
ÔÚÊÂÎñÐԻỰÖУ¬Ã»Óе÷Óà session.commit È·ÈÏÏûϢ崻ú»òÕßµ÷ÓÃsession.rollback
·½·¨»Ø¹öÏûÏ¢
ÔÚ·ÇÊÂÎñÐԻỰÖУ¬ACK ģʽΪ CLIENT_ACKNOWLEDGE
(¿Í»§¶ËÊÖ¶¯Ó¦´ð)µÄÇé¿öÏ£¬Ã»Óе÷Óà session.commit»òÕßµ÷ÓÃÁË recover ·½·¨£»
Ò»¸öÏûÏ¢±» redelivedred ³¬¹ýĬÈϵÄ×î´óÖØ·¢´ÎÊý£¨Ä¬ÈÏ 6 ´Î£©Ê±£¬Ïû·Ñ¶Ë»á¸ø broker
·¢ËÍÒ»¸ö¡±poison ack¡±±íʾÕâ¸öÏûÏ¢Óж¾£¬¸æËß broker ²»ÒªÔÙ·¢ÁË¡£Õâ¸öʱºò broker
»á°ÑÕâ¸öÏûÏ¢·Åµ½ DLQ£¨ËÀÐŶÓÁУ©¡£
ËÀÐŶÓÁУº ActiveMQ ÖÐĬÈϵÄËÀÐŶÓÁÐÊÇ ActiveMQ.DLQ£¬Èç¹ûûÓÐÌØ±ðµÄÅäÖã¬Óж¾µÄÏûÏ¢¶¼»á±»·¢Ë͵½Õâ¸ö¶ÓÁС£Ä¬ÈÏÇé¿öÏ£¬Èç¹û³Ö¾ÃÏûÏ¢¹ýÆÚÒÔºó£¬Ò²»á±»Ë͵½
DLQ ÖС£
Ö»ÒªÔÚ´¦ÀíÏûÏ¢µÄʱºòÅ׳öÒ»¸öÒì³£¾Í¿ÉÒÔÑÝʾ£¬»á¿´µ½¿ØÖÆÌ¨¶ÔÓÚʧ°ÜÏûÏ¢»áÖØ·¢6´Î£¬µÇ½ActiveMQ¿ØÖÆÌ¨»á¿´µ½Ò»¸ö
ActiveMQ.DLQ¡£ÔÚ´´½¨¶ÓÁеÄʱºò¿ÉÒÔÖ±½ÓÖ¸¶¨´ÓActiveMQ.DLQÈ¥Ïû·ÑÏûÏ¢¡£
ËÀÐŶÓÁÐÅäÖòßÂÔ£º
ȱʡËùÓжÓÁеÄËÀÐÅÏûÏ¢¶¼±»·¢Ë͵½Í¬Ò»¸öȱʡËÀÐŶÓÁУ¬²»±ãÓÚ¹ÜÀí£¬¿ÉÒÔͨ¹ý
individualDeadLetterStrategy »ò sharedDeadLetterStrategy
²ßÂÔÀ´½øÐÐÐ޸ġ£ÔÚactivemq.xmlÉÏ
<destinationPolicy>
<policyMap> <policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy
is used to prevent
slow topic consumers to block producers and affect
other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
--> <pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy> </policyEntry>
// ¡°>¡±±íʾ¶ÔËùÓжÓÁÐÉúЧ£¬Èç¹ûÐèÒªÉèÖÃÖ¸¶¨¶ÓÁУ¬ÔòÖ±½Óд¶ÓÁÐÃû³Æ <policyEntry
queue=">"> <deadLetterStrategy>
//queuePrefix:ÉèÖÃËÀÐŶÓÁÐǰ׺
//useQueueForQueueMessage ÉèÖöÓÁб£´æµ½ËÀÐÅ¡£ <individualDeadLetterStrategy
queuePrefix ="DLQ."useQueueForQueueMessages ="true"/>
</deadLetterStrategy> </policyEntry>
</policyEntries> </policyMap>
</destinationPolicy> |
×Ô¶¯¶ªÆú¹ýÆÚÏûÏ¢
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false"
/>
</deadLetterStrategy> |
ActiveMQ ¾²Ì¬ÍøÂçÅäÖãºbrokerÍøÂçÁ¬½Ó(brokerµÄ¸ßÐÔÄÜ·½°¸)
ÐÞ¸Ä activeMQ ·þÎñÆ÷µÄ activeMQ.xml, Ôö¼ÓÈçÏÂÅäÖã¬Õâ¸öÅäÖÃÖ»ÄÜʵÏÖµ¥ÏòÁ¬½Ó£¬ÊµÏÖË«ÏòÁ¬½ÓÐèÒª¸÷¸ö½Úµã¶¼ÅäÖÃÈçÏÂÅäÖá£
<networkConnectors>
<networkConnector uri="static:// (tcp://192.168.254.135:61616, tcp://192.168.254.136:61616)"/>
</networkConnectors> |
Á½¸ö Brokers ͨ¹ýÒ»¸ö static µÄÐÒéÀ´½øÐÐÍøÂçÁ¬½Ó¡£Ò»¸ö Consumer Á¬½Óµ½BrokerB
µÄÒ»¸öµØÖ·ÉÏ£¬µ± Producer ÔÚ BrokerA ÉÏÒÔÏàͬµÄµØÖ··¢ËÍÏûÏ¢£¬´ËʱÏûÏ¢»á±»×ªÒƵ½
BrokerB ÉÏ£¬Ò²¾ÍÊÇ˵ BrokerA »áת·¢ÏûÏ¢µ½BrokerB ÉÏ¡£
ÔÚactiveMQÖУ¬½øÐÐÁ˾²Ì¬ÍøÂçÇŽӵÄÁ½Ì¨½Úµã¶øÑÔ£¬µ± Producer
ÔÚ BrokerA ÉÏÒÔÏàͬµÄµØÖ··¢ËÍ10ÌõÏûÏ¢¡£Ò»¸ö Consumer Á¬½Óµ½BrokerBÈ¥Ïû·ÑÏûÏ¢£¬µ±Ïû·ÑÁËÒ»°ëµÄʱºò³öÏÖÒì³£ÁË£¬ÄÇôʣÏÂÀ´Î´´¦ÀíµÄÏûÏ¢»á±»´æ·Åµ½
BrokerB µÄ´ý´¦ÀíÏûÏ¢¶ÓÁÐÖУ¬´ËʱҪͨ¹ýBrokerAÔÙÈ¥Ïû·ÑÊÇÏû·Ñ²»µ½µÄ£¬ÍòÒ»´Ë¿ÌBrokerB
¹ÒÁË£¬ÄÇôÄÄЩûÓÐÏû·ÑµÄÏûÏ¢½«»á¶ªÊ§¡£mq¸øÎÒÃÇÌṩÁËÒ»¸öÓÐЧµÄÏûÏ¢»ØÁ÷»úÖÆ¡£
<policyEntry
queue=">" enableAudit="false">
<networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory
replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry> |
ActiveMQ µÄÓÅȱµã ActiveMQ ²ÉÓÃÏûÏ¢ÍÆËÍ·½Ê½£¬ËùÒÔ×îÊʺϵij¡¾°ÊÇĬÈÏÏûÏ¢¶¼¿ÉÔÚ¶Ìʱ¼äÄÚ±»Ïû·Ñ¡£Êý¾ÝÁ¿Ô½´ó£¬²éÕÒºÍÏû·ÑÏûÏ¢¾ÍÔ½Âý£¬ÏûÏ¢»ýѹ³Ì¶ÈÓëÏûÏ¢Ëٶȳɷ´±È¡£
ȱµã
1.ÍÌÍÂÁ¿µÍ¡£ÓÉÓÚ ActiveMQ ÐèÒª½¨Á¢Ë÷Òý£¬µ¼ÖÂÍÌÍÂÁ¿Ï½µ¡£ÕâÊÇÎÞ·¨¿Ë·þµÄȱµã£¬Ö»ÒªÊ¹ÓÃÍêÈ«·ûºÏ
JMS ¹æ·¶µÄÏûÏ¢Öмä¼þ£¬¾ÍÒª½ÓÊÜÕâ¸ö¼¶±ðµÄTPS¡£
2.ÎÞ·ÖÆ¬¹¦ÄÜ¡£ÕâÊÇÒ»¸ö¹¦ÄÜȱʧ£¬JMS ²¢Ã»Óй涨ÏûÏ¢Öмä¼þµÄ¼¯Èº¡¢·ÖƬ»úÖÆ¡£¶øÓÉÓÚ ActiveMQ
ÊÇΰÆóÒµ¼¶¿ª·¢Éè¼ÆµÄÏûÏ¢Öмä¼þ£¬³õÖÔ²¢²»ÊÇΪÁË´¦Àíº£Á¿ÏûÏ¢ºÍ¸ß²¢·¢ÇëÇó¡£Èç¹ûһ̨·þÎñÆ÷²»ÄܳÐÊܸü¶àÏûÏ¢£¬ÔòÐèÒªºáÏò²ð·Ö¡£ActiveMQ
¹Ù·½²»Ìṩ·ÖƬ»úÖÆ£¬ÐèÒª×Ô¼ºÊµÏÖ¡£
ÊÊÓó¡¾°
1. ¶Ô TPS ÒªÇó±È½ÏµÍµÄϵͳ£¬¿ÉÒÔʹÓà ActiveMQ À´ÊµÏÖ£¬Ò»·½Ãæ±È½Ï¼òµ¥£¬Äܹ»¿ìËÙÉÏÊÖ¿ª·¢£¬ÁíÒ»·½Ãæ¿É¿ØÐÔÒ²±È½ÏºÃ£¬»¹ÓÐ±È½ÏºÃµÄ¼à¿Ø»úÖÆºÍ½çÃæ
²»ÊÊÓõij¡¾°
1.ÏûÏ¢Á¿¾Þ´óµÄ³¡¾°¡£ActiveMQ ²»Ö§³ÖÏûÏ¢×Ô¶¯·ÖƬ»úÖÆ£¬Èç¹ûÏûÏ¢Á¿¾Þ´ó£¬µ¼ÖÂһ̨·þÎñÆ÷²»ÄÜ´¦ÀíÈ«²¿ÏûÏ¢£¬¾ÍÐèÒª×Ô¼º¿ª·¢ÏûÏ¢·ÖƬ¹¦ÄÜ¡£
|