±à¼ÍƼö: |
±¾ÎÄÀ´×ÔÓÚcsdn£¬±¾ÎÄÖØµã·ÖÎöʹÓÃJMSÏòactiveMQÖмä¼þ·¢ËÍÏûÏ¢µÄ¹ý³Ì·ÖÎö£¬Ï£Íû¶ÔÄúµÄѧϰÓÐËù°ïÖú¡£
|
|
ͬ²½·¢ËÍÓëÒì²½·¢ËÍ
activeMQ·¢ËÍÏûÏ¢¿Í»§¶Ë·¢ËÍÏûÏ¢·ÖΪͬ²½·¢ËÍÓëÒì²½·¢ËÍ
ͬ²½·¢ËÍ£¬·¢ËÍÕß·¢ËÍÒ»ÌõÏûÏ¢»á×èÈûÖ±µ½broker·´À¡Ò»¸öÈ·ÈÏÏûÏ¢¸ø·¢ËÍÕߣ¬±íʾÏûÏ¢ÒѾ±»broker´¦Àí¡£Õâ¸ö»ú
ÖÆÌṩÁËÏûÏ¢µÄ°²È«ÐÔ±£ÕÏ£¬ÓÉÓÚÊÇ×èÈûµÄ²Ù×÷£¬»áÓ°Ïìµ½¿Í»§¶ËÏûÏ¢·¢Ë͵ÄÐÔÄÜ¡£
Òì²½·¢ËÍ£¬·¢ËÍÕß²»ÐèÒªµÈ´ýbrokerÌṩ·´À¡¸ø·¢ËÍÕߣ¬ÐÔÄÜÏà¶Ô½Ï¸ß¡£µ«ÊÇ¿ÉÄÜ»á³öÏÖÏûÏ¢¶ªÊ§µÄÇé¿ö¡£Ëù
ÒÔʹÓÃÒì²½·¢Ë͵ÄǰÌáÊÇÔÚijЩÇé¿öÏÂÔÊÐí³öÏÖÊý¾Ý¶ªÊ§µÄÇé¿ö¡£
ÄÇôÔÚʲôÇé¿öÏÂÑ¡Ôñͬ²½·¢Éú£¬Ê²Ã´Çé¿öÏÂÊÇÑ¡ÔñÒì²½·¢ËÍÄØ£¿
³ýÈ¥ÎÒÃÇÈËΪÉèÖõķ½Ê½£¬ÆäĬÈϵÄÑ¡Ôñ²ßÂÔ×ܽáÈçÏ£º
1.·Ç³Ö¾Ã»¯µÄÏûÏ¢¶¼ÊÇÒì²½·¢Ë͵ġ£
2.³Ö¾Ã»¯ÏûÏ¢ÔÚ·ÇÊÂÎñģʽÏÂÊÇͬ²½·¢Ë͵ġ£
3.ÔÚ¿ªÆôÊÂÎñµÄÇé¿öÏ£¬ÏûÏ¢¶¼ÊÇÒì²½·¢ËÍ¡£
ÓÉÓÚÒì²½·¢Ë͵ÄЧÂÊ»á±Èͬ²½·¢ËÍÐÔÄܸü¸ß£¬ÔÚ·¢Ëͳ־û¯ÏûÏ¢µÄʱºò£¬¾¡Á¿È¥¿ªÆôÊÂÎñ»á»°¡£»òÕßÉèÖÃʹÓÃÒì²½·¢ËÍ¡£
ÒÔÉÏÈýÖÖĬÈϲßÂÔ£¬ÔÚÔ´ÂëÖж¼»áÌåÏÖ³öÀ´£¬ÉÔºóÔÚÔ´ÂëÖзÖÎö¡£
³ýÁËĬÈÏÉèÖã¬ÎÒÃÇ¿ÉÒÔÈÏΪÉèÖ÷¢ËÍģʽ¡£×ܽáÈçÏÂÈýÖÖ·½Ê½¡£
1.ConnectionFactory
connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.15:61616?
jms.useAsyncSend=true"); |
2.((ActiveMQConnectionFactory)
connectionFactory).setUseAsyncSend(true); |
3.((ActiveMQConnection)connection).setUseAsyncSend(true);
|
Òì²½·¢ËÍ·ÖÎö
Ê×ÏÈ·ÖÎöÒì²½·¢Ë͹ý³Ì£¬ÆäÊý¾ÝÁ÷תÈçÏÂͼËùʾ¡£

producerWindow
producerWindowÖ÷ÒªÊÇÕë¶ÔÒì²½·¢ËÍÏûϢʱ£¬¿ØÖÆÔÊÐíÄܹ»»ýѹµÄÏûÏ¢ÀۼƴóС£¬ÕâЩÏûϢûÓеõ½brokerµÄack£¬Ôڵõ½ackÖ®ºó»á¼õÈ¥ÏàÓ¦µÄÏûÏ¢size£¬ÊÍ·ÅproducerWindow¡£
Õâ¸öproducerWindowµÄ´óСÉèÖÃÓÐ2ÖÖ·½Ê½
1.ÔÚbrokerUrlÖÐÉèÖÃ: ¡°tcp : //localhost:61616?jms
. producerWindowSize = 1048576¡±,ÕâÖÖÉèÖý«»á¶ÔËùÓÐµÄ producerÉúЧ¡£
2.ÔÚdestinationUriÖÐÉèÖÃ: ¡° test-queue ?producer.windowSize
= 1048576 ¡±,´Ë²ÎÊýÖ»»á¶ÔʹÓÃ´Ë Destination ʵÀý µÄproducerʧЧ£¬½«»á¸²¸ÇbrokerUrlÖеÄproducerWindowSizeÖµ¡£
ͨ¹ýÉèÖÃproducerWindow´óС¿ÉÒÔ¿ØÖÆÏûÏ¢·¢Ë͵ÄÁ÷Á¿¿ØÖÆ¡£
ÏÂÃæ¿ªÊ¼¸ú×ÙÔ´Âë·ÖÎöͼÖеÄÁ÷³Ì¡£
ActiveMQMessageProducer.send
·¢ËÍ·½·¨¸ú×Ù£¬Ê×ÏȵĹؼü·½·¨¾ÍÊÇÕâ¸ö£¬Ô´ÂëÈçÏ¡£
public void send(Destination
destination, Message message, int deliveryMode,
int priority, long timeToLive, AsyncCallback onComplete)
throws JMSException {
checkClosed();
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A
destination must be specified.");
}
throw new InvalidDestinationException("Don't
understand null destinations");
}
ActiveMQDestination dest;
if (destination.equals(info.getDestination()))
{
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This
producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
Message transformedMessage = transformer.producerTransform(session,
this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (producerWindow != null) {
try {
//´°¿Ú´óС£¬Ö´ÐÐÊÇ·ñ×èÈû
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due
to thread interrupt.");
}
}
this.session.send(this, dest, message, deliveryMode,
priority, timeToLive, producerWindow, sendTimeout,
onComplete);
stats.onMessage();
}
|
ÉÏÃæµÄ´úÂë¸ù¾ÝÒì²½µÄÁ÷³Ìͼ·ÖÎö£¬¿´µ½ÊÇÏÈÐèÒª¸ù¾ÝproducerWindow£¬À´ÅжÏÊÇ·ñ×èÈûµÄ£¬Èç¹û
producerWindow ²»¹»£¬ÄÇô producerWindow.waitForSpace();
¾Í»á×èÈûµÈ´ý¡£
protected void
send(ActiveMQMessageProducer producer, ActiveMQDestination
destination, Message message, int deliveryMode,
int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback
onComplete) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination))
{
throw new InvalidDestinationException("Cannot
publish to a deleted Destination: " + destination);
}
//»ñÈ¡»á»°µÄ¼àÊÓÆ÷
synchronized (sendMutex) {
// tell the Broker we are about to start a new
transaction
//¿ªÆôÊÂÎñ
doStartTransaction();
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//......×é×°ÏûÏ¢ÄÚÈÝ
//¡£¡£¡£¡£¡£¡£
//ÅжÏÊÇÒ»²¿»¹ÊÇͬ²½·¢ËÍ
if (onComplete==null && sendTimeout <=
0 && !msg.isResponseRequired() &&
!connection.isAlwaysSyncSend() && (!msg.isPersistent()
|| connection.isUseAsyncSend() || txid != null))
{
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till
we hit the
// wire, this might not
// provide and accurate size. We may change over
to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important
once
// users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null)
{
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg, onComplete);
}
}
}
}
|
ÉÏÃæÕâ¶Î´úÂ룬¸ù¾ÝÒì²½Á÷³ÌͼÀ´·ÖÎö£¬Ö÷Òª¿´µÄ¾ÍÊÇÅжÏÊÇÒì²½»¹ÊÇͬ²½·¢Ë͵ĴúÂ룬Òì²½·¢ËÍÐèÒªÂú×㣺
onComplete==null
&& sendTimeout <= 0 && !msg.isResponseRequired()
&& !connection.isAlwaysSyncSend() &&
(!msg.isPersistent() || connection.isUseAsyncSend()
|| txid != null
|
onComplete==null:·¢Ëͻص÷Ϊ¿Õ¡£Ä¬ÈÏÂú×ã
sendTimeout <= 0£º·¢Ëͳ¬Ê±Ê±¼äСÓÚµÈÓÚ0¡£Ä¬ÈÏÂú×ã
!msg.isResponseRequired()£ºÏûÏ¢²»ÐèÒªÏìÓ¦¡£Ä¬ÈÏÂú×ã
!connection.isAlwaysSyncSend()£ºÁ¬½Ó²»ÄÜÉèÖÃΪͬ²½·¢ËÍ¡£Ä¬ÈÏÂú×ã
(!msg.isPersistent() || connection.isUseAsyncSend()
|| txid != null)£ºÊǷdz־û¯ÏûÏ¢»òÕßÉèÖÃΪʹÓÃÒì²½·¢ËÍ»òÕß¿ªÆôÁËÊÂÎñ£¬´ÓÕâÀï¿ÉÒÔ¿´³öĬÈϲßÂÔÖеģº
1.·Ç³Ö¾Ã»¯µÄÏûÏ¢¶¼ÊÇÒì²½·¢Ë͵ġ£!msg.isPersistent()
3.ÔÚ¿ªÆôÊÂÎñµÄÇé¿öÏ£¬ÏûÏ¢¶¼ÊÇÒì²½·¢ËÍ¡£txid != null
ActiveMQConnection.asyncSendPacket->doAsyncSendPacket
¸ú×Ù·¢ËÍ£¬µ½ÈçÏ´úÂë¡£
private void
doAsyncSendPacket(Command command) throws JMSException
{
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
|
ÉÏÃæ´úÂëµÄtransport¶ÔÏóÔÚ¶¨ÒåÀïÃæÃ»ÓнøÐй¹Ô죬ÄÇô²Â²âÓ¦¸ÃÊÇÔÚ´´½¨ActiveMQConnectionʱ£¬¹¹Ôì·½·¨ÀïÃæ×¢ÈëµÄ£¬¿´Ò»ÏÂ×¢ÈëµÄʵÀýÊÇʲô¡£
ActiveMQConnectionFactory.createTransport
´ÓconnectionFactory.createConnection()·½·¨Ò»Â·¸ú×Ù£¬×îºóµ½ÁËÈçÏ´úÂë
protected Transport
createTransport() throws JMSException {
try {
URI connectBrokerUL = brokerURL;
//»ñÈ¡urlÖж¨ÒåµÄÁ¬½Óģʽ
String scheme = brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme
specified: [" + brokerURL + "]");
}
if (scheme.equals("auto")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto",
"tcp"));
} else if (scheme.equals("auto+ssl"))
{
connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl",
"ssl"));
} else if (scheme.equals("auto+nio"))
{
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio",
"nio"));
} else if (scheme.equals("auto+nio+ssl"))
{
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl",
"nio+ssl"));
}
return TransportFactory.connect(connectBrokerUL);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not
create Transport. Reason: " + e, e);
}
}
|
À´µ½ÉÏÃæÕâ¶Î´úÂ룬ÊǸù¾ÝÎÒÃÇbrokerURLÖеÄschemeÀ´¹¹½¨Ò»¸öuri¶ÔÏó£¬È»ºóÓÃÕâ¸öuri¶ÔÏóµ÷ÓÃTransportFactory
.connect(connectBrokerUL) ;»ñÈ¡¶ÔÓ¦µÄtransportʵÀý¡£Õ¦Ò»¿´Ö®Ï£¬ÓеãÏñdubboÔ´ÂëÖÐÀûÓÃurl²ÎÊýÇý¶¯£¬ÊÊÅäÆ÷¸ù¾Ýurl²ÎÊýµÄ²»Í¬£¬¼ÓÔØ²»Í¬µÄʵÏÖÀàʵÀý¡£½Óןú×ÙTransportFactory
.connect(connectBrokerUL) ;¿´ÊDz»ÊÇÕâÑù¡£
public static
Transport connect(URI location) throws Exception
{
TransportFactory tf = findTransportFactory(location);
return tf.doConnect(location);
}
public static TransportFactory findTransportFactory(URI
location) throws IOException {
String scheme = location.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme
specified:
[" + location + "]");
}
TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
if (tf == null) {
// Try to load if from a META-INF property.
try {
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.
newInstance
(scheme);
TRANSPORT_FACTORYS.put(scheme, tf);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport
scheme NOT recognized: [" + scheme + "]",
e);
}
}
return tf;
}
|
¿ÉÒÔ¿´³öÊÇÔÚ(TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);´´½¨ÊµÀýµÄ£¬²é¿´TRANSPORT_FACTORY_FINDERµÄ¶¨ÒåÈçÏ¡£
private static
final FactoryFinder TRANSPORT_FACTORY_FINDER =
new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
|
ÔÚnewInstance·½·¨ÖУ¬´úÂëÈçÏÂ
public Object
newInstance(String key) throws IllegalAccessException,
InstantiationException, IOException, ClassNotFoundException
{
return objectFactory.create(path+key);
}
|
ÉÏÃæµÄpath¾ÍÊÇ¡±¡° META-INF/services/org/apache/activemq/transport/¡±¡°
, keyÔÚÎÒÃǵÄbrokerUrl¾ÍÊÇtcp£¬²Â²â
¾ÍÊÇÕÒÒ»¸öÔڸ÷¾¶ÏµÄÎļþÖж¨ÒåµÄÀàȫ·¾¶×÷ΪʵÀý»¯µÄÀàÐÍÐÅÏ¢£¬²é¿´Â·¾¶ÏµÄÎļþÐÅÏ¢Èç

»¹ÕæÓÐÒ»¸ötcpÎļþ£¬´ò¿ª¿´
class=org.apache.activemq.transport.tcp.
TcpTransportFactory |
ºóÐøÄܹÀ¼Æµ½¾ÍÊÇʵÀý»¯Ò»¸öTcpTransportFactory£¬È»ºó»Øµ½ÏÂÃæµÄ´úÂë¡£
public static
Transport connect(URI location) throws Exception
{
TransportFactory tf = findTransportFactory(location);
return tf.doConnect(location);
}
|
TcpTransportFactory.doConnect
public Transport
doConnect(URI location) throws Exception {
try {
Map<String, String> options = new HashMap<String,
String>(URISupport.parseParameters(location));
if( !options.containsKey("wireFormat.host")
) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = createWireFormat(options);
//´´½¨Transport
Transport transport = createTransport(location,
wf);
//°ü×°Transport
Transport rc = configure(transport, wf, options);
//remove auto
IntrospectionSupport.extractProperties(options,
"auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid
connect parameters: " + options);
}
return rc;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
|
ÉÏÃæ´úÂë¾ÍÖ´ÐÐÁËÕæÕýµÄ´´½¨Transport£¬Í¬Ê±¸øTransport°ü×°ÁËÍâ³Ç´¦ÀíÂß¼£¬¾ßÌå°ü×°ÔÚ£¬ÈçÏ´úÂë
public Transport
configure(Transport transport,
WireFormat wf,
Map options) throws Exception {
transport = compositeConfigure(transport, wf,
options);
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
}
public Transport compositeConfigure(Transport
transport,
WireFormat format, Map options) {
//Èç¹ûÅäÖÃÁËд³¬Ê±ÔòÖ´Ðаü×°WriteTimeoutFilter
if (options.containsKey(WRITE_TIMEOUT_FILTER))
{
transport = new WriteTimeoutFilter(transport);
String soWriteTimeout = (String)options.remove
(WRITE_TIMEOUT_FILTER);
if (soWriteTimeout!=null) {
((WriteTimeoutFilter)transport).setWriteTimeout
(Long.parseLong
(soWriteTimeout));
}
}
IntrospectionSupport.setProperties(transport,
options);
return transport;
}
|
ËùÒÔËæºóÖ´ÐÐonewayµÄTransportʵÀýÊÇÕâÑùµÄ£ºResponseCorrelator£¨MutexTransport£¨TcpTransport£©£©£¬ÔÚµ÷ÓÃʱ¾Í»á¹¹³Éµ÷ÓÃÁ´ÌõÖð²ãµ÷Óá£
ResponseCorrelator.oneway
public void
oneway(Object o) throws IOException {
Command command = (Command)o;
command.setCommandId(sequenceGenerator.
getNextSequenceId());
command.setResponseRequired(false);
next.oneway(command);
} |
MutexTransport.oneway
¸ºÔð¼ÏËø
@Override
public void oneway(Object command) throws IOException
{
writeLock.lock();
try {
next.oneway(command);
} finally {
writeLock.unlock();
}
}
|
TcpTransport.oneWay
Òì²½·¢ËÍÏûÏ¢
@Override
public void oneway(Object command) throws IOException
{
checkStarted();
wireFormat.marshal(command, dataOut);
dataOut.flush();
}
|
ÕâÖÖÁ´Ê½´úÂë·ç¸ñÖµµÃѧϰ£¬Âß¼³é³öÒ²ºÜÇÉÃîµÄ·Ö²¼ÔÚ²»Í¬µÄ´¦ÀíÆ÷ÖС£
²îµãÍüÁË£¬×îºó»¹ÒªÔö¼ÓproducerWindowSize
if (producerWindow
!= null) {
// Since we defer lots of the marshaling till
we hit the
// wire, this might not
// provide and accurate size. We may change over
to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important
once
// users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
|
È»ºó£¬Òì²½·¢Ë͵ÄÁ÷³Ì¸ã¶¨¡£
ͬ²½·¢ËÍÏûÏ¢·ÖÎö
ActiveMqConnection.syncSendPacket
public Response
syncSendPacket(Command command, int timeout) throws
JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
try {
//µ÷ÓÃTransportµÄrequest'·½·¨
Response response = (Response)(timeout > 0
? this.transport.request(command, timeout)
: this.transport.request(command));
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException)
{
throw (JMSException)er.getException();
} else {
if (isClosed()||closing.get()) {
LOG.debug("Received an exception but connection
is closing");
}
JMSException jmsEx = null;
try {
jmsEx = JMSExceptionSupport.create(er.getException());
} catch(Throwable e) {
LOG.error("Caught an exception trying to
create a JMSException for " +er.getException(),e);
}
if (er.getException() instanceof SecurityException
&& command instanceof ConnectionInfo){
forceCloseOnSecurityException(er.getException());
}
if (jmsEx !=null) {
throw jmsEx;
}
}
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
}
|
ÔÚÒì²½·ÖÎöʱÒѾ֪µÀTransportÊÇÒ»¸öµ÷ÓÃÁ´£¬Ê×ÏÈÀ´µ½µÄÊÇ
ResponseCorrelator.request
public Object
request(Object command) throws IOException {
FutureResponse response = asyncRequest(command,
null);
return response.getResult();
}
|
È»ºóÄã»á·¢ÏÖ£¬Æäʵͬ²½·¢ËÍÒ²ÊÇÏÈÖ´ÐÐÒì²½·¢ËÍÇëÇóasyncRequest£¬È»ºóÔÙ×èÈûresponse.getResult()»ñÈ¡ÏìÓ¦µÄ£¬ËùÒÔÆäʵÊÇÏÈÒì²½ºóͬ²½µÄ¹ý³Ì¡£
public FutureResponse
asyncRequest(Object o, ResponseCallback responseCallback)
throws
IOException {
Command command = (Command) o;
command.setCommandId(sequenceGenerator.
getNextSequenceId());
command.setResponseRequired(true);
FutureResponse future = new FutureResponse
(responseCallback, this);
IOException priorError = null;
synchronized (requestMap) {
priorError = this.error;
if (priorError == null) {
requestMap.put(new Integer(command.getCommandId()),
future);
}
}
if (priorError != null) {
future.set(new ExceptionResponse(priorError));
throw priorError;
}
//Òì²½Á÷³Ì
next.oneway(command);
return future;
}
|
|