001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.net.URI; 023import java.net.URISyntaxException; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.concurrent.*; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.Connection; 032import javax.jms.ConnectionConsumer; 033import javax.jms.ConnectionMetaData; 034import javax.jms.DeliveryMode; 035import javax.jms.Destination; 036import javax.jms.ExceptionListener; 037import javax.jms.IllegalStateException; 038import javax.jms.InvalidDestinationException; 039import javax.jms.JMSException; 040import javax.jms.Queue; 041import javax.jms.QueueConnection; 042import javax.jms.QueueSession; 043import javax.jms.ServerSessionPool; 044import javax.jms.Session; 045import javax.jms.Topic; 046import javax.jms.TopicConnection; 047import javax.jms.TopicSession; 048import javax.jms.XAConnection; 049 050import org.apache.activemq.advisory.DestinationSource; 051import org.apache.activemq.blob.BlobTransferPolicy; 052import org.apache.activemq.command.ActiveMQDestination; 053import org.apache.activemq.command.ActiveMQMessage; 054import org.apache.activemq.command.ActiveMQTempDestination; 055import org.apache.activemq.command.ActiveMQTempQueue; 056import org.apache.activemq.command.ActiveMQTempTopic; 057import org.apache.activemq.command.BrokerInfo; 058import org.apache.activemq.command.Command; 059import org.apache.activemq.command.CommandTypes; 060import org.apache.activemq.command.ConnectionControl; 061import org.apache.activemq.command.ConnectionError; 062import org.apache.activemq.command.ConnectionId; 063import org.apache.activemq.command.ConnectionInfo; 064import org.apache.activemq.command.ConsumerControl; 065import org.apache.activemq.command.ConsumerId; 066import org.apache.activemq.command.ConsumerInfo; 067import org.apache.activemq.command.ControlCommand; 068import org.apache.activemq.command.DestinationInfo; 069import org.apache.activemq.command.ExceptionResponse; 070import org.apache.activemq.command.Message; 071import org.apache.activemq.command.MessageDispatch; 072import org.apache.activemq.command.MessageId; 073import org.apache.activemq.command.ProducerAck; 074import org.apache.activemq.command.ProducerId; 075import org.apache.activemq.command.RemoveInfo; 076import org.apache.activemq.command.RemoveSubscriptionInfo; 077import org.apache.activemq.command.Response; 078import org.apache.activemq.command.SessionId; 079import org.apache.activemq.command.ShutdownInfo; 080import org.apache.activemq.command.WireFormatInfo; 081import org.apache.activemq.management.JMSConnectionStatsImpl; 082import org.apache.activemq.management.JMSStatsImpl; 083import org.apache.activemq.management.StatsCapable; 084import org.apache.activemq.management.StatsImpl; 085import org.apache.activemq.state.CommandVisitorAdapter; 086import org.apache.activemq.thread.Scheduler; 087import org.apache.activemq.thread.TaskRunnerFactory; 088import org.apache.activemq.transport.FutureResponse; 089import org.apache.activemq.transport.ResponseCallback; 090import org.apache.activemq.transport.Transport; 091import org.apache.activemq.transport.TransportListener; 092import org.apache.activemq.transport.failover.FailoverTransport; 093import org.apache.activemq.util.IdGenerator; 094import org.apache.activemq.util.IntrospectionSupport; 095import org.apache.activemq.util.JMSExceptionSupport; 096import org.apache.activemq.util.LongSequenceGenerator; 097import org.apache.activemq.util.ServiceSupport; 098import org.slf4j.Logger; 099import org.slf4j.LoggerFactory; 100 101public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection { 102 103 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; 104 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 105 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 106 107 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class); 108 109 public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>(); 110 111 protected boolean dispatchAsync=true; 112 protected boolean alwaysSessionAsync = true; 113 114 private TaskRunnerFactory sessionTaskRunner; 115 private final ThreadPoolExecutor executor; 116 117 // Connection state variables 118 private final ConnectionInfo info; 119 private ExceptionListener exceptionListener; 120 private ClientInternalExceptionListener clientInternalExceptionListener; 121 private boolean clientIDSet; 122 private boolean isConnectionInfoSentToBroker; 123 private boolean userSpecifiedClientID; 124 125 // Configuration options variables 126 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 127 private BlobTransferPolicy blobTransferPolicy; 128 private RedeliveryPolicy redeliveryPolicy; 129 private MessageTransformer transformer; 130 131 private boolean disableTimeStampsByDefault; 132 private boolean optimizedMessageDispatch = true; 133 private boolean copyMessageOnSend = true; 134 private boolean useCompression; 135 private boolean objectMessageSerializationDefered; 136 private boolean useAsyncSend; 137 private boolean optimizeAcknowledge; 138 private long optimizeAcknowledgeTimeOut = 0; 139 private boolean nestedMapAndListEnabled = true; 140 private boolean useRetroactiveConsumer; 141 private boolean exclusiveConsumer; 142 private boolean alwaysSyncSend; 143 private int closeTimeout = 15000; 144 private boolean watchTopicAdvisories = true; 145 private long warnAboutUnstartedConnectionTimeout = 500L; 146 private int sendTimeout =0; 147 private boolean sendAcksAsync=true; 148 private boolean checkForDuplicates = true; 149 150 private final Transport transport; 151 private final IdGenerator clientIdGenerator; 152 private final JMSStatsImpl factoryStats; 153 private final JMSConnectionStatsImpl stats; 154 155 private final AtomicBoolean started = new AtomicBoolean(false); 156 private final AtomicBoolean closing = new AtomicBoolean(false); 157 private final AtomicBoolean closed = new AtomicBoolean(false); 158 private final AtomicBoolean transportFailed = new AtomicBoolean(false); 159 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); 160 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); 161 private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>(); 162 private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>(); 163 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); 164 165 // Maps ConsumerIds to ActiveMQConsumer objects 166 private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); 167 private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>(); 168 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 169 private final SessionId connectionSessionId; 170 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 171 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 172 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); 173 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); 174 175 private AdvisoryConsumer advisoryConsumer; 176 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); 177 private BrokerInfo brokerInfo; 178 private IOException firstFailureError; 179 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE; 180 181 // Assume that protocol is the latest. Change to the actual protocol 182 // version when a WireFormatInfo is received. 183 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 184 private final long timeCreated; 185 private final ConnectionAudit connectionAudit = new ConnectionAudit(); 186 private DestinationSource destinationSource; 187 private final Object ensureConnectionInfoSentMutex = new Object(); 188 private boolean useDedicatedTaskRunner; 189 protected volatile CountDownLatch transportInterruptionProcessingComplete; 190 private long consumerFailoverRedeliveryWaitPeriod; 191 private Scheduler scheduler; 192 private boolean messagePrioritySupported = true; 193 private boolean transactedIndividualAck = false; 194 private boolean nonBlockingRedelivery = false; 195 196 /** 197 * Construct an <code>ActiveMQConnection</code> 198 * 199 * @param transport 200 * @param factoryStats 201 * @throws Exception 202 */ 203 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { 204 205 this.transport = transport; 206 this.clientIdGenerator = clientIdGenerator; 207 this.factoryStats = factoryStats; 208 209 // Configure a single threaded executor who's core thread can timeout if 210 // idle 211 executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 212 public Thread newThread(Runnable r) { 213 Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); 214 //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796 215 //thread.setDaemon(true); 216 return thread; 217 } 218 }); 219 // asyncConnectionThread.allowCoreThreadTimeOut(true); 220 String uniqueId = connectionIdGenerator.generateId(); 221 this.info = new ConnectionInfo(new ConnectionId(uniqueId)); 222 this.info.setManageable(true); 223 this.info.setFaultTolerant(transport.isFaultTolerant()); 224 this.connectionSessionId = new SessionId(info.getConnectionId(), -1); 225 226 this.transport.setTransportListener(this); 227 228 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); 229 this.factoryStats.addConnection(this); 230 this.timeCreated = System.currentTimeMillis(); 231 this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); 232 } 233 234 protected void setUserName(String userName) { 235 this.info.setUserName(userName); 236 } 237 238 protected void setPassword(String password) { 239 this.info.setPassword(password); 240 } 241 242 /** 243 * A static helper method to create a new connection 244 * 245 * @return an ActiveMQConnection 246 * @throws JMSException 247 */ 248 public static ActiveMQConnection makeConnection() throws JMSException { 249 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); 250 return (ActiveMQConnection)factory.createConnection(); 251 } 252 253 /** 254 * A static helper method to create a new connection 255 * 256 * @param uri 257 * @return and ActiveMQConnection 258 * @throws JMSException 259 */ 260 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException { 261 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 262 return (ActiveMQConnection)factory.createConnection(); 263 } 264 265 /** 266 * A static helper method to create a new connection 267 * 268 * @param user 269 * @param password 270 * @param uri 271 * @return an ActiveMQConnection 272 * @throws JMSException 273 */ 274 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException { 275 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri)); 276 return (ActiveMQConnection)factory.createConnection(); 277 } 278 279 /** 280 * @return a number unique for this connection 281 */ 282 public JMSConnectionStatsImpl getConnectionStats() { 283 return stats; 284 } 285 286 /** 287 * Creates a <CODE>Session</CODE> object. 288 * 289 * @param transacted indicates whether the session is transacted 290 * @param acknowledgeMode indicates whether the consumer or the client will 291 * acknowledge any messages it receives; ignored if the 292 * session is transacted. Legal values are 293 * <code>Session.AUTO_ACKNOWLEDGE</code>, 294 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 295 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 296 * @return a newly created session 297 * @throws JMSException if the <CODE>Connection</CODE> object fails to 298 * create a session due to some internal error or lack of 299 * support for the specific transaction and acknowledgement 300 * mode. 301 * @see Session#AUTO_ACKNOWLEDGE 302 * @see Session#CLIENT_ACKNOWLEDGE 303 * @see Session#DUPS_OK_ACKNOWLEDGE 304 * @since 1.1 305 */ 306 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 307 checkClosedOrFailed(); 308 ensureConnectionInfoSent(); 309 if(!transacted) { 310 if (acknowledgeMode==Session.SESSION_TRANSACTED) { 311 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); 312 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) { 313 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " + 314 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); 315 } 316 } 317 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED 318 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync()); 319 } 320 321 /** 322 * @return sessionId 323 */ 324 protected SessionId getNextSessionId() { 325 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId()); 326 } 327 328 /** 329 * Gets the client identifier for this connection. 330 * <P> 331 * This value is specific to the JMS provider. It is either preconfigured by 332 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned 333 * dynamically by the application by calling the <code>setClientID</code> 334 * method. 335 * 336 * @return the unique client identifier 337 * @throws JMSException if the JMS provider fails to return the client ID 338 * for this connection due to some internal error. 339 */ 340 public String getClientID() throws JMSException { 341 checkClosedOrFailed(); 342 return this.info.getClientId(); 343 } 344 345 /** 346 * Sets the client identifier for this connection. 347 * <P> 348 * The preferred way to assign a JMS client's client identifier is for it to 349 * be configured in a client-specific <CODE>ConnectionFactory</CODE> 350 * object and transparently assigned to the <CODE>Connection</CODE> object 351 * it creates. 352 * <P> 353 * Alternatively, a client can set a connection's client identifier using a 354 * provider-specific value. The facility to set a connection's client 355 * identifier explicitly is not a mechanism for overriding the identifier 356 * that has been administratively configured. It is provided for the case 357 * where no administratively specified identifier exists. If one does exist, 358 * an attempt to change it by setting it must throw an 359 * <CODE>IllegalStateException</CODE>. If a client sets the client 360 * identifier explicitly, it must do so immediately after it creates the 361 * connection and before any other action on the connection is taken. After 362 * this point, setting the client identifier is a programming error that 363 * should throw an <CODE>IllegalStateException</CODE>. 364 * <P> 365 * The purpose of the client identifier is to associate a connection and its 366 * objects with a state maintained on behalf of the client by a provider. 367 * The only such state identified by the JMS API is that required to support 368 * durable subscriptions. 369 * <P> 370 * If another connection with the same <code>clientID</code> is already 371 * running when this method is called, the JMS provider should detect the 372 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>. 373 * 374 * @param newClientID the unique client identifier 375 * @throws JMSException if the JMS provider fails to set the client ID for 376 * this connection due to some internal error. 377 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an 378 * invalid or duplicate client ID. 379 * @throws javax.jms.IllegalStateException if the JMS client attempts to set 380 * a connection's client ID at the wrong time or when it has 381 * been administratively configured. 382 */ 383 public void setClientID(String newClientID) throws JMSException { 384 checkClosedOrFailed(); 385 386 if (this.clientIDSet) { 387 throw new IllegalStateException("The clientID has already been set"); 388 } 389 390 if (this.isConnectionInfoSentToBroker) { 391 throw new IllegalStateException("Setting clientID on a used Connection is not allowed"); 392 } 393 394 this.info.setClientId(newClientID); 395 this.userSpecifiedClientID = true; 396 ensureConnectionInfoSent(); 397 } 398 399 /** 400 * Sets the default client id that the connection will use if explicitly not 401 * set with the setClientId() call. 402 */ 403 public void setDefaultClientID(String clientID) throws JMSException { 404 this.info.setClientId(clientID); 405 this.userSpecifiedClientID = true; 406 } 407 408 /** 409 * Gets the metadata for this connection. 410 * 411 * @return the connection metadata 412 * @throws JMSException if the JMS provider fails to get the connection 413 * metadata for this connection. 414 * @see javax.jms.ConnectionMetaData 415 */ 416 public ConnectionMetaData getMetaData() throws JMSException { 417 checkClosedOrFailed(); 418 return ActiveMQConnectionMetaData.INSTANCE; 419 } 420 421 /** 422 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not 423 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE> 424 * associated with it. 425 * 426 * @return the <CODE>ExceptionListener</CODE> for this connection, or 427 * null, if no <CODE>ExceptionListener</CODE> is associated with 428 * this connection. 429 * @throws JMSException if the JMS provider fails to get the 430 * <CODE>ExceptionListener</CODE> for this connection. 431 * @see javax.jms.Connection#setExceptionListener(ExceptionListener) 432 */ 433 public ExceptionListener getExceptionListener() throws JMSException { 434 checkClosedOrFailed(); 435 return this.exceptionListener; 436 } 437 438 /** 439 * Sets an exception listener for this connection. 440 * <P> 441 * If a JMS provider detects a serious problem with a connection, it informs 442 * the connection's <CODE> ExceptionListener</CODE>, if one has been 443 * registered. It does this by calling the listener's <CODE>onException 444 * </CODE> 445 * method, passing it a <CODE>JMSException</CODE> object describing the 446 * problem. 447 * <P> 448 * An exception listener allows a client to be notified of a problem 449 * asynchronously. Some connections only consume messages, so they would 450 * have no other way to learn their connection has failed. 451 * <P> 452 * A connection serializes execution of its <CODE>ExceptionListener</CODE>. 453 * <P> 454 * A JMS provider should attempt to resolve connection problems itself 455 * before it notifies the client of them. 456 * 457 * @param listener the exception listener 458 * @throws JMSException if the JMS provider fails to set the exception 459 * listener for this connection. 460 */ 461 public void setExceptionListener(ExceptionListener listener) throws JMSException { 462 checkClosedOrFailed(); 463 this.exceptionListener = listener; 464 } 465 466 /** 467 * Gets the <code>ClientInternalExceptionListener</code> object for this connection. 468 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE> 469 * associated with it. 470 * 471 * @return the listener or <code>null</code> if no listener is registered with the connection. 472 */ 473 public ClientInternalExceptionListener getClientInternalExceptionListener() 474 { 475 return clientInternalExceptionListener; 476 } 477 478 /** 479 * Sets a client internal exception listener for this connection. 480 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components 481 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message. 482 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> 483 * describing the problem. 484 * 485 * @param listener the exception listener 486 */ 487 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) 488 { 489 this.clientInternalExceptionListener = listener; 490 } 491 492 /** 493 * Starts (or restarts) a connection's delivery of incoming messages. A call 494 * to <CODE>start</CODE> on a connection that has already been started is 495 * ignored. 496 * 497 * @throws JMSException if the JMS provider fails to start message delivery 498 * due to some internal error. 499 * @see javax.jms.Connection#stop() 500 */ 501 public void start() throws JMSException { 502 checkClosedOrFailed(); 503 ensureConnectionInfoSent(); 504 if (started.compareAndSet(false, true)) { 505 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 506 ActiveMQSession session = i.next(); 507 session.start(); 508 } 509 } 510 } 511 512 /** 513 * Temporarily stops a connection's delivery of incoming messages. Delivery 514 * can be restarted using the connection's <CODE>start</CODE> method. When 515 * the connection is stopped, delivery to all the connection's message 516 * consumers is inhibited: synchronous receives block, and messages are not 517 * delivered to message listeners. 518 * <P> 519 * This call blocks until receives and/or message listeners in progress have 520 * completed. 521 * <P> 522 * Stopping a connection has no effect on its ability to send messages. A 523 * call to <CODE>stop</CODE> on a connection that has already been stopped 524 * is ignored. 525 * <P> 526 * A call to <CODE>stop</CODE> must not return until delivery of messages 527 * has paused. This means that a client can rely on the fact that none of 528 * its message listeners will be called and that all threads of control 529 * waiting for <CODE>receive</CODE> calls to return will not return with a 530 * message until the connection is restarted. The receive timers for a 531 * stopped connection continue to advance, so receives may time out while 532 * the connection is stopped. 533 * <P> 534 * If message listeners are running when <CODE>stop</CODE> is invoked, the 535 * <CODE>stop</CODE> call must wait until all of them have returned before 536 * it may return. While these message listeners are completing, they must 537 * have the full services of the connection available to them. 538 * 539 * @throws JMSException if the JMS provider fails to stop message delivery 540 * due to some internal error. 541 * @see javax.jms.Connection#start() 542 */ 543 public void stop() throws JMSException { 544 checkClosedOrFailed(); 545 if (started.compareAndSet(true, false)) { 546 synchronized(sessions) { 547 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 548 ActiveMQSession s = i.next(); 549 s.stop(); 550 } 551 } 552 } 553 } 554 555 /** 556 * Closes the connection. 557 * <P> 558 * Since a provider typically allocates significant resources outside the 559 * JVM on behalf of a connection, clients should close these resources when 560 * they are not needed. Relying on garbage collection to eventually reclaim 561 * these resources may not be timely enough. 562 * <P> 563 * There is no need to close the sessions, producers, and consumers of a 564 * closed connection. 565 * <P> 566 * Closing a connection causes all temporary destinations to be deleted. 567 * <P> 568 * When this method is invoked, it should not return until message 569 * processing has been shut down in an orderly fashion. This means that all 570 * message listeners that may have been running have returned, and that all 571 * pending receives have returned. A close terminates all pending message 572 * receives on the connection's sessions' consumers. The receives may return 573 * with a message or with null, depending on whether there was a message 574 * available at the time of the close. If one or more of the connection's 575 * sessions' message listeners is processing a message at the time when 576 * connection <CODE>close</CODE> is invoked, all the facilities of the 577 * connection and its sessions must remain available to those listeners 578 * until they return control to the JMS provider. 579 * <P> 580 * Closing a connection causes any of its sessions' transactions in progress 581 * to be rolled back. In the case where a session's work is coordinated by 582 * an external transaction manager, a session's <CODE>commit</CODE> and 583 * <CODE> rollback</CODE> methods are not used and the result of a closed 584 * session's work is determined later by the transaction manager. Closing a 585 * connection does NOT force an acknowledgment of client-acknowledged 586 * sessions. 587 * <P> 588 * Invoking the <CODE>acknowledge</CODE> method of a received message from 589 * a closed connection's session must throw an 590 * <CODE>IllegalStateException</CODE>. Closing a closed connection must 591 * NOT throw an exception. 592 * 593 * @throws JMSException if the JMS provider fails to close the connection 594 * due to some internal error. For example, a failure to 595 * release resources or to close a socket connection can 596 * cause this exception to be thrown. 597 */ 598 public void close() throws JMSException { 599 // Store the interrupted state and clear so that cleanup happens without 600 // leaking connection resources. Reset in finally to preserve state. 601 boolean interrupted = Thread.interrupted(); 602 603 try { 604 605 // If we were running, lets stop first. 606 if (!closed.get() && !transportFailed.get()) { 607 stop(); 608 } 609 610 synchronized (this) { 611 if (!closed.get()) { 612 closing.set(true); 613 614 if (destinationSource != null) { 615 destinationSource.stop(); 616 destinationSource = null; 617 } 618 if (advisoryConsumer != null) { 619 advisoryConsumer.dispose(); 620 advisoryConsumer = null; 621 } 622 623 Scheduler scheduler = this.scheduler; 624 if (scheduler != null) { 625 try { 626 scheduler.stop(); 627 } catch (Exception e) { 628 JMSException ex = JMSExceptionSupport.create(e); 629 throw ex; 630 } 631 } 632 633 long lastDeliveredSequenceId = 0; 634 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 635 ActiveMQSession s = i.next(); 636 s.dispose(); 637 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId()); 638 } 639 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 640 ActiveMQConnectionConsumer c = i.next(); 641 c.dispose(); 642 } 643 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 644 ActiveMQInputStream c = i.next(); 645 c.dispose(); 646 } 647 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 648 ActiveMQOutputStream c = i.next(); 649 c.dispose(); 650 } 651 652 // As TemporaryQueue and TemporaryTopic instances are bound 653 // to a connection we should just delete them after the connection 654 // is closed to free up memory 655 for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) { 656 ActiveMQTempDestination c = i.next(); 657 c.delete(); 658 } 659 660 if (isConnectionInfoSentToBroker) { 661 // If we announced ourselfs to the broker.. Try to let 662 // the broker 663 // know that the connection is being shutdown. 664 RemoveInfo removeCommand = info.createRemoveCommand(); 665 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 666 doSyncSendPacket(info.createRemoveCommand(), closeTimeout); 667 doAsyncSendPacket(new ShutdownInfo()); 668 } 669 670 started.set(false); 671 672 // TODO if we move the TaskRunnerFactory to the connection 673 // factory 674 // then we may need to call 675 // factory.onConnectionClose(this); 676 if (sessionTaskRunner != null) { 677 sessionTaskRunner.shutdown(); 678 } 679 closed.set(true); 680 closing.set(false); 681 } 682 } 683 } finally { 684 try { 685 if (executor != null) { 686 executor.shutdown(); 687 } 688 } catch (Throwable e) { 689 LOG.error("Error shutting down thread pool " + e, e); 690 } 691 692 ServiceSupport.dispose(this.transport); 693 694 factoryStats.removeConnection(this); 695 if (interrupted) { 696 Thread.currentThread().interrupt(); 697 } 698 } 699 } 700 701 /** 702 * Tells the broker to terminate its VM. This can be used to cleanly 703 * terminate a broker running in a standalone java process. Server must have 704 * property enable.vm.shutdown=true defined to allow this to work. 705 */ 706 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet 707 // implemented. 708 /* 709 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand 710 * command = new BrokerAdminCommand(); 711 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM); 712 * asyncSendPacket(command); } 713 */ 714 715 /** 716 * Create a durable connection consumer for this connection (optional 717 * operation). This is an expert facility not used by regular JMS clients. 718 * 719 * @param topic topic to access 720 * @param subscriptionName durable subscription name 721 * @param messageSelector only messages with properties matching the message 722 * selector expression are delivered. A value of null or an 723 * empty string indicates that there is no message selector 724 * for the message consumer. 725 * @param sessionPool the server session pool to associate with this durable 726 * connection consumer 727 * @param maxMessages the maximum number of messages that can be assigned to 728 * a server session at one time 729 * @return the durable connection consumer 730 * @throws JMSException if the <CODE>Connection</CODE> object fails to 731 * create a connection consumer due to some internal error 732 * or invalid arguments for <CODE>sessionPool</CODE> and 733 * <CODE>messageSelector</CODE>. 734 * @throws javax.jms.InvalidDestinationException if an invalid destination 735 * is specified. 736 * @throws javax.jms.InvalidSelectorException if the message selector is 737 * invalid. 738 * @see javax.jms.ConnectionConsumer 739 * @since 1.1 740 */ 741 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) 742 throws JMSException { 743 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); 744 } 745 746 /** 747 * Create a durable connection consumer for this connection (optional 748 * operation). This is an expert facility not used by regular JMS clients. 749 * 750 * @param topic topic to access 751 * @param subscriptionName durable subscription name 752 * @param messageSelector only messages with properties matching the message 753 * selector expression are delivered. A value of null or an 754 * empty string indicates that there is no message selector 755 * for the message consumer. 756 * @param sessionPool the server session pool to associate with this durable 757 * connection consumer 758 * @param maxMessages the maximum number of messages that can be assigned to 759 * a server session at one time 760 * @param noLocal set true if you want to filter out messages published 761 * locally 762 * @return the durable connection consumer 763 * @throws JMSException if the <CODE>Connection</CODE> object fails to 764 * create a connection consumer due to some internal error 765 * or invalid arguments for <CODE>sessionPool</CODE> and 766 * <CODE>messageSelector</CODE>. 767 * @throws javax.jms.InvalidDestinationException if an invalid destination 768 * is specified. 769 * @throws javax.jms.InvalidSelectorException if the message selector is 770 * invalid. 771 * @see javax.jms.ConnectionConsumer 772 * @since 1.1 773 */ 774 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages, 775 boolean noLocal) throws JMSException { 776 checkClosedOrFailed(); 777 ensureConnectionInfoSent(); 778 SessionId sessionId = new SessionId(info.getConnectionId(), -1); 779 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId())); 780 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic)); 781 info.setSubscriptionName(subscriptionName); 782 info.setSelector(messageSelector); 783 info.setPrefetchSize(maxMessages); 784 info.setDispatchAsync(isDispatchAsync()); 785 786 // Allows the options on the destination to configure the consumerInfo 787 if (info.getDestination().getOptions() != null) { 788 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions()); 789 IntrospectionSupport.setProperties(this.info, options, "consumer."); 790 } 791 792 return new ActiveMQConnectionConsumer(this, sessionPool, info); 793 } 794 795 // Properties 796 // ------------------------------------------------------------------------- 797 798 /** 799 * Returns true if this connection has been started 800 * 801 * @return true if this Connection is started 802 */ 803 public boolean isStarted() { 804 return started.get(); 805 } 806 807 /** 808 * Returns true if the connection is closed 809 */ 810 public boolean isClosed() { 811 return closed.get(); 812 } 813 814 /** 815 * Returns true if the connection is in the process of being closed 816 */ 817 public boolean isClosing() { 818 return closing.get(); 819 } 820 821 /** 822 * Returns true if the underlying transport has failed 823 */ 824 public boolean isTransportFailed() { 825 return transportFailed.get(); 826 } 827 828 /** 829 * @return Returns the prefetchPolicy. 830 */ 831 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 832 return prefetchPolicy; 833 } 834 835 /** 836 * Sets the <a 837 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 838 * policy</a> for consumers created by this connection. 839 */ 840 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 841 this.prefetchPolicy = prefetchPolicy; 842 } 843 844 /** 845 */ 846 public Transport getTransportChannel() { 847 return transport; 848 } 849 850 /** 851 * @return Returns the clientID of the connection, forcing one to be 852 * generated if one has not yet been configured. 853 */ 854 public String getInitializedClientID() throws JMSException { 855 ensureConnectionInfoSent(); 856 return info.getClientId(); 857 } 858 859 /** 860 * @return Returns the timeStampsDisableByDefault. 861 */ 862 public boolean isDisableTimeStampsByDefault() { 863 return disableTimeStampsByDefault; 864 } 865 866 /** 867 * Sets whether or not timestamps on messages should be disabled or not. If 868 * you disable them it adds a small performance boost. 869 */ 870 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { 871 this.disableTimeStampsByDefault = timeStampsDisableByDefault; 872 } 873 874 /** 875 * @return Returns the dispatchOptimizedMessage. 876 */ 877 public boolean isOptimizedMessageDispatch() { 878 return optimizedMessageDispatch; 879 } 880 881 /** 882 * If this flag is set then an larger prefetch limit is used - only 883 * applicable for durable topic subscribers. 884 */ 885 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) { 886 this.optimizedMessageDispatch = dispatchOptimizedMessage; 887 } 888 889 /** 890 * @return Returns the closeTimeout. 891 */ 892 public int getCloseTimeout() { 893 return closeTimeout; 894 } 895 896 /** 897 * Sets the timeout before a close is considered complete. Normally a 898 * close() on a connection waits for confirmation from the broker; this 899 * allows that operation to timeout to save the client hanging if there is 900 * no broker 901 */ 902 public void setCloseTimeout(int closeTimeout) { 903 this.closeTimeout = closeTimeout; 904 } 905 906 /** 907 * @return ConnectionInfo 908 */ 909 public ConnectionInfo getConnectionInfo() { 910 return this.info; 911 } 912 913 public boolean isUseRetroactiveConsumer() { 914 return useRetroactiveConsumer; 915 } 916 917 /** 918 * Sets whether or not retroactive consumers are enabled. Retroactive 919 * consumers allow non-durable topic subscribers to receive old messages 920 * that were published before the non-durable subscriber started. 921 */ 922 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 923 this.useRetroactiveConsumer = useRetroactiveConsumer; 924 } 925 926 public boolean isNestedMapAndListEnabled() { 927 return nestedMapAndListEnabled; 928 } 929 930 /** 931 * Enables/disables whether or not Message properties and MapMessage entries 932 * support <a 933 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 934 * Structures</a> of Map and List objects 935 */ 936 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 937 this.nestedMapAndListEnabled = structuredMapsEnabled; 938 } 939 940 public boolean isExclusiveConsumer() { 941 return exclusiveConsumer; 942 } 943 944 /** 945 * Enables or disables whether or not queue consumers should be exclusive or 946 * not for example to preserve ordering when not using <a 947 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 948 * 949 * @param exclusiveConsumer 950 */ 951 public void setExclusiveConsumer(boolean exclusiveConsumer) { 952 this.exclusiveConsumer = exclusiveConsumer; 953 } 954 955 /** 956 * Adds a transport listener so that a client can be notified of events in 957 * the underlying transport 958 */ 959 public void addTransportListener(TransportListener transportListener) { 960 transportListeners.add(transportListener); 961 } 962 963 public void removeTransportListener(TransportListener transportListener) { 964 transportListeners.remove(transportListener); 965 } 966 967 public boolean isUseDedicatedTaskRunner() { 968 return useDedicatedTaskRunner; 969 } 970 971 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 972 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 973 } 974 975 public TaskRunnerFactory getSessionTaskRunner() { 976 synchronized (this) { 977 if (sessionTaskRunner == null) { 978 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner()); 979 } 980 } 981 return sessionTaskRunner; 982 } 983 984 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 985 this.sessionTaskRunner = sessionTaskRunner; 986 } 987 988 public MessageTransformer getTransformer() { 989 return transformer; 990 } 991 992 /** 993 * Sets the transformer used to transform messages before they are sent on 994 * to the JMS bus or when they are received from the bus but before they are 995 * delivered to the JMS client 996 */ 997 public void setTransformer(MessageTransformer transformer) { 998 this.transformer = transformer; 999 } 1000 1001 /** 1002 * @return the statsEnabled 1003 */ 1004 public boolean isStatsEnabled() { 1005 return this.stats.isEnabled(); 1006 } 1007 1008 /** 1009 * @param statsEnabled the statsEnabled to set 1010 */ 1011 public void setStatsEnabled(boolean statsEnabled) { 1012 this.stats.setEnabled(statsEnabled); 1013 } 1014 1015 /** 1016 * Returns the {@link DestinationSource} object which can be used to listen to destinations 1017 * being created or destroyed or to enquire about the current destinations available on the broker 1018 * 1019 * @return a lazily created destination source 1020 * @throws JMSException 1021 */ 1022 public DestinationSource getDestinationSource() throws JMSException { 1023 if (destinationSource == null) { 1024 destinationSource = new DestinationSource(this); 1025 destinationSource.start(); 1026 } 1027 return destinationSource; 1028 } 1029 1030 // Implementation methods 1031 // ------------------------------------------------------------------------- 1032 1033 /** 1034 * Used internally for adding Sessions to the Connection 1035 * 1036 * @param session 1037 * @throws JMSException 1038 * @throws JMSException 1039 */ 1040 protected void addSession(ActiveMQSession session) throws JMSException { 1041 this.sessions.add(session); 1042 if (sessions.size() > 1 || session.isTransacted()) { 1043 optimizedMessageDispatch = false; 1044 } 1045 } 1046 1047 /** 1048 * Used interanlly for removing Sessions from a Connection 1049 * 1050 * @param session 1051 */ 1052 protected void removeSession(ActiveMQSession session) { 1053 this.sessions.remove(session); 1054 this.removeDispatcher(session); 1055 } 1056 1057 /** 1058 * Add a ConnectionConsumer 1059 * 1060 * @param connectionConsumer 1061 * @throws JMSException 1062 */ 1063 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 1064 this.connectionConsumers.add(connectionConsumer); 1065 } 1066 1067 /** 1068 * Remove a ConnectionConsumer 1069 * 1070 * @param connectionConsumer 1071 */ 1072 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { 1073 this.connectionConsumers.remove(connectionConsumer); 1074 this.removeDispatcher(connectionConsumer); 1075 } 1076 1077 /** 1078 * Creates a <CODE>TopicSession</CODE> object. 1079 * 1080 * @param transacted indicates whether the session is transacted 1081 * @param acknowledgeMode indicates whether the consumer or the client will 1082 * acknowledge any messages it receives; ignored if the 1083 * session is transacted. Legal values are 1084 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1085 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1086 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1087 * @return a newly created topic session 1088 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1089 * to create a session due to some internal error or lack of 1090 * support for the specific transaction and acknowledgement 1091 * mode. 1092 * @see Session#AUTO_ACKNOWLEDGE 1093 * @see Session#CLIENT_ACKNOWLEDGE 1094 * @see Session#DUPS_OK_ACKNOWLEDGE 1095 */ 1096 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 1097 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1098 } 1099 1100 /** 1101 * Creates a connection consumer for this connection (optional operation). 1102 * This is an expert facility not used by regular JMS clients. 1103 * 1104 * @param topic the topic to access 1105 * @param messageSelector only messages with properties matching the message 1106 * selector expression are delivered. A value of null or an 1107 * empty string indicates that there is no message selector 1108 * for the message consumer. 1109 * @param sessionPool the server session pool to associate with this 1110 * connection consumer 1111 * @param maxMessages the maximum number of messages that can be assigned to 1112 * a server session at one time 1113 * @return the connection consumer 1114 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1115 * to create a connection consumer due to some internal 1116 * error or invalid arguments for <CODE>sessionPool</CODE> 1117 * and <CODE>messageSelector</CODE>. 1118 * @throws javax.jms.InvalidDestinationException if an invalid topic is 1119 * specified. 1120 * @throws javax.jms.InvalidSelectorException if the message selector is 1121 * invalid. 1122 * @see javax.jms.ConnectionConsumer 1123 */ 1124 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1125 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); 1126 } 1127 1128 /** 1129 * Creates a connection consumer for this connection (optional operation). 1130 * This is an expert facility not used by regular JMS clients. 1131 * 1132 * @param queue the queue to access 1133 * @param messageSelector only messages with properties matching the message 1134 * selector expression are delivered. A value of null or an 1135 * empty string indicates that there is no message selector 1136 * for the message consumer. 1137 * @param sessionPool the server session pool to associate with this 1138 * connection consumer 1139 * @param maxMessages the maximum number of messages that can be assigned to 1140 * a server session at one time 1141 * @return the connection consumer 1142 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1143 * to create a connection consumer due to some internal 1144 * error or invalid arguments for <CODE>sessionPool</CODE> 1145 * and <CODE>messageSelector</CODE>. 1146 * @throws javax.jms.InvalidDestinationException if an invalid queue is 1147 * specified. 1148 * @throws javax.jms.InvalidSelectorException if the message selector is 1149 * invalid. 1150 * @see javax.jms.ConnectionConsumer 1151 */ 1152 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1153 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); 1154 } 1155 1156 /** 1157 * Creates a connection consumer for this connection (optional operation). 1158 * This is an expert facility not used by regular JMS clients. 1159 * 1160 * @param destination the destination to access 1161 * @param messageSelector only messages with properties matching the message 1162 * selector expression are delivered. A value of null or an 1163 * empty string indicates that there is no message selector 1164 * for the message consumer. 1165 * @param sessionPool the server session pool to associate with this 1166 * connection consumer 1167 * @param maxMessages the maximum number of messages that can be assigned to 1168 * a server session at one time 1169 * @return the connection consumer 1170 * @throws JMSException if the <CODE>Connection</CODE> object fails to 1171 * create a connection consumer due to some internal error 1172 * or invalid arguments for <CODE>sessionPool</CODE> and 1173 * <CODE>messageSelector</CODE>. 1174 * @throws javax.jms.InvalidDestinationException if an invalid destination 1175 * is specified. 1176 * @throws javax.jms.InvalidSelectorException if the message selector is 1177 * invalid. 1178 * @see javax.jms.ConnectionConsumer 1179 * @since 1.1 1180 */ 1181 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1182 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); 1183 } 1184 1185 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) 1186 throws JMSException { 1187 1188 checkClosedOrFailed(); 1189 ensureConnectionInfoSent(); 1190 1191 ConsumerId consumerId = createConsumerId(); 1192 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); 1193 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination)); 1194 consumerInfo.setSelector(messageSelector); 1195 consumerInfo.setPrefetchSize(maxMessages); 1196 consumerInfo.setNoLocal(noLocal); 1197 consumerInfo.setDispatchAsync(isDispatchAsync()); 1198 1199 // Allows the options on the destination to configure the consumerInfo 1200 if (consumerInfo.getDestination().getOptions() != null) { 1201 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions()); 1202 IntrospectionSupport.setProperties(consumerInfo, options, "consumer."); 1203 } 1204 1205 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo); 1206 } 1207 1208 /** 1209 * @return 1210 */ 1211 private ConsumerId createConsumerId() { 1212 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); 1213 } 1214 1215 /** 1216 * @return 1217 */ 1218 private ProducerId createProducerId() { 1219 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId()); 1220 } 1221 1222 /** 1223 * Creates a <CODE>QueueSession</CODE> object. 1224 * 1225 * @param transacted indicates whether the session is transacted 1226 * @param acknowledgeMode indicates whether the consumer or the client will 1227 * acknowledge any messages it receives; ignored if the 1228 * session is transacted. Legal values are 1229 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1230 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1231 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1232 * @return a newly created queue session 1233 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1234 * to create a session due to some internal error or lack of 1235 * support for the specific transaction and acknowledgement 1236 * mode. 1237 * @see Session#AUTO_ACKNOWLEDGE 1238 * @see Session#CLIENT_ACKNOWLEDGE 1239 * @see Session#DUPS_OK_ACKNOWLEDGE 1240 */ 1241 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 1242 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1243 } 1244 1245 /** 1246 * Ensures that the clientID was manually specified and not auto-generated. 1247 * If the clientID was not specified this method will throw an exception. 1248 * This method is used to ensure that the clientID + durableSubscriber name 1249 * are used correctly. 1250 * 1251 * @throws JMSException 1252 */ 1253 public void checkClientIDWasManuallySpecified() throws JMSException { 1254 if (!userSpecifiedClientID) { 1255 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection"); 1256 } 1257 } 1258 1259 /** 1260 * send a Packet through the Connection - for internal use only 1261 * 1262 * @param command 1263 * @throws JMSException 1264 */ 1265 public void asyncSendPacket(Command command) throws JMSException { 1266 if (isClosed()) { 1267 throw new ConnectionClosedException(); 1268 } else { 1269 doAsyncSendPacket(command); 1270 } 1271 } 1272 1273 private void doAsyncSendPacket(Command command) throws JMSException { 1274 try { 1275 this.transport.oneway(command); 1276 } catch (IOException e) { 1277 throw JMSExceptionSupport.create(e); 1278 } 1279 } 1280 1281 /** 1282 * Send a packet through a Connection - for internal use only 1283 * 1284 * @param command 1285 * @return 1286 * @throws JMSException 1287 */ 1288 public void syncSendPacket(Command command, final AsyncCallback onComplete) throws JMSException { 1289 if(onComplete==null) { 1290 syncSendPacket(command); 1291 } else { 1292 if (isClosed()) { 1293 throw new ConnectionClosedException(); 1294 } 1295 try { 1296 this.transport.asyncRequest(command, new ResponseCallback() { 1297 @Override 1298 public void onCompletion(FutureResponse resp) { 1299 Response response; 1300 Throwable exception = null; 1301 try { 1302 response = resp.getResult(); 1303 if (response.isException()) { 1304 ExceptionResponse er = (ExceptionResponse)response; 1305 exception = er.getException(); 1306 } 1307 } catch (Exception e) { 1308 exception = e; 1309 } 1310 if(exception!=null) { 1311 if ( exception instanceof JMSException) { 1312 onComplete.onException((JMSException) exception); 1313 } else { 1314 if (isClosed()||closing.get()) { 1315 LOG.debug("Received an exception but connection is closing"); 1316 } 1317 JMSException jmsEx = null; 1318 try { 1319 jmsEx = JMSExceptionSupport.create(exception); 1320 } catch(Throwable e) { 1321 LOG.error("Caught an exception trying to create a JMSException for " +exception,e); 1322 } 1323 //dispose of transport for security exceptions 1324 if (exception instanceof SecurityException){ 1325 Transport t = transport; 1326 if (null != t){ 1327 ServiceSupport.dispose(t); 1328 } 1329 } 1330 if (jmsEx !=null) { 1331 onComplete.onException(jmsEx); 1332 } 1333 } 1334 } else { 1335 onComplete.onSuccess(); 1336 } 1337 } 1338 }); 1339 } catch (IOException e) { 1340 throw JMSExceptionSupport.create(e); 1341 } 1342 } 1343 } 1344 1345 public Response syncSendPacket(Command command) throws JMSException { 1346 if (isClosed()) { 1347 throw new ConnectionClosedException(); 1348 } else { 1349 1350 try { 1351 Response response = (Response)this.transport.request(command); 1352 if (response.isException()) { 1353 ExceptionResponse er = (ExceptionResponse)response; 1354 if (er.getException() instanceof JMSException) { 1355 throw (JMSException)er.getException(); 1356 } else { 1357 if (isClosed()||closing.get()) { 1358 LOG.debug("Received an exception but connection is closing"); 1359 } 1360 JMSException jmsEx = null; 1361 try { 1362 jmsEx = JMSExceptionSupport.create(er.getException()); 1363 } catch(Throwable e) { 1364 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e); 1365 } 1366 //dispose of transport for security exceptions 1367 if (er.getException() instanceof SecurityException){ 1368 Transport t = this.transport; 1369 if (null != t){ 1370 ServiceSupport.dispose(t); 1371 } 1372 } 1373 if (jmsEx !=null) { 1374 throw jmsEx; 1375 } 1376 } 1377 } 1378 return response; 1379 } catch (IOException e) { 1380 throw JMSExceptionSupport.create(e); 1381 } 1382 } 1383 } 1384 1385 /** 1386 * Send a packet through a Connection - for internal use only 1387 * 1388 * @param command 1389 * @return 1390 * @throws JMSException 1391 */ 1392 public Response syncSendPacket(Command command, int timeout) throws JMSException { 1393 if (isClosed() || closing.get()) { 1394 throw new ConnectionClosedException(); 1395 } else { 1396 return doSyncSendPacket(command, timeout); 1397 } 1398 } 1399 1400 private Response doSyncSendPacket(Command command, int timeout) 1401 throws JMSException { 1402 try { 1403 Response response = (Response) (timeout > 0 1404 ? this.transport.request(command, timeout) 1405 : this.transport.request(command)); 1406 if (response != null && response.isException()) { 1407 ExceptionResponse er = (ExceptionResponse)response; 1408 if (er.getException() instanceof JMSException) { 1409 throw (JMSException)er.getException(); 1410 } else { 1411 throw JMSExceptionSupport.create(er.getException()); 1412 } 1413 } 1414 return response; 1415 } catch (IOException e) { 1416 throw JMSExceptionSupport.create(e); 1417 } 1418 } 1419 1420 /** 1421 * @return statistics for this Connection 1422 */ 1423 public StatsImpl getStats() { 1424 return stats; 1425 } 1426 1427 /** 1428 * simply throws an exception if the Connection is already closed or the 1429 * Transport has failed 1430 * 1431 * @throws JMSException 1432 */ 1433 protected synchronized void checkClosedOrFailed() throws JMSException { 1434 checkClosed(); 1435 if (transportFailed.get()) { 1436 throw new ConnectionFailedException(firstFailureError); 1437 } 1438 } 1439 1440 /** 1441 * simply throws an exception if the Connection is already closed 1442 * 1443 * @throws JMSException 1444 */ 1445 protected synchronized void checkClosed() throws JMSException { 1446 if (closed.get()) { 1447 throw new ConnectionClosedException(); 1448 } 1449 } 1450 1451 /** 1452 * Send the ConnectionInfo to the Broker 1453 * 1454 * @throws JMSException 1455 */ 1456 protected void ensureConnectionInfoSent() throws JMSException { 1457 synchronized(this.ensureConnectionInfoSentMutex) { 1458 // Can we skip sending the ConnectionInfo packet?? 1459 if (isConnectionInfoSentToBroker || closed.get()) { 1460 return; 1461 } 1462 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID? 1463 if (info.getClientId() == null || info.getClientId().trim().length() == 0) { 1464 info.setClientId(clientIdGenerator.generateId()); 1465 } 1466 syncSendPacket(info.copy()); 1467 1468 this.isConnectionInfoSentToBroker = true; 1469 // Add a temp destination advisory consumer so that 1470 // We know what the valid temporary destinations are on the 1471 // broker without having to do an RPC to the broker. 1472 1473 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); 1474 if (watchTopicAdvisories) { 1475 advisoryConsumer = new AdvisoryConsumer(this, consumerId); 1476 } 1477 } 1478 } 1479 1480 public synchronized boolean isWatchTopicAdvisories() { 1481 return watchTopicAdvisories; 1482 } 1483 1484 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 1485 this.watchTopicAdvisories = watchTopicAdvisories; 1486 } 1487 1488 /** 1489 * @return Returns the useAsyncSend. 1490 */ 1491 public boolean isUseAsyncSend() { 1492 return useAsyncSend; 1493 } 1494 1495 /** 1496 * Forces the use of <a 1497 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 1498 * adds a massive performance boost; but means that the send() method will 1499 * return immediately whether the message has been sent or not which could 1500 * lead to message loss. 1501 */ 1502 public void setUseAsyncSend(boolean useAsyncSend) { 1503 this.useAsyncSend = useAsyncSend; 1504 } 1505 1506 /** 1507 * @return true if always sync send messages 1508 */ 1509 public boolean isAlwaysSyncSend() { 1510 return this.alwaysSyncSend; 1511 } 1512 1513 /** 1514 * Set true if always require messages to be sync sent 1515 * 1516 * @param alwaysSyncSend 1517 */ 1518 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 1519 this.alwaysSyncSend = alwaysSyncSend; 1520 } 1521 1522 /** 1523 * @return the messagePrioritySupported 1524 */ 1525 public boolean isMessagePrioritySupported() { 1526 return this.messagePrioritySupported; 1527 } 1528 1529 /** 1530 * @param messagePrioritySupported the messagePrioritySupported to set 1531 */ 1532 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 1533 this.messagePrioritySupported = messagePrioritySupported; 1534 } 1535 1536 /** 1537 * Cleans up this connection so that it's state is as if the connection was 1538 * just created. This allows the Resource Adapter to clean up a connection 1539 * so that it can be reused without having to close and recreate the 1540 * connection. 1541 */ 1542 public void cleanup() throws JMSException { 1543 1544 if (advisoryConsumer != null && !isTransportFailed()) { 1545 advisoryConsumer.dispose(); 1546 advisoryConsumer = null; 1547 } 1548 1549 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1550 ActiveMQSession s = i.next(); 1551 s.dispose(); 1552 } 1553 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 1554 ActiveMQConnectionConsumer c = i.next(); 1555 c.dispose(); 1556 } 1557 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 1558 ActiveMQInputStream c = i.next(); 1559 c.dispose(); 1560 } 1561 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 1562 ActiveMQOutputStream c = i.next(); 1563 c.dispose(); 1564 } 1565 1566 if (isConnectionInfoSentToBroker) { 1567 if (!transportFailed.get() && !closing.get()) { 1568 syncSendPacket(info.createRemoveCommand()); 1569 } 1570 isConnectionInfoSentToBroker = false; 1571 } 1572 if (userSpecifiedClientID) { 1573 info.setClientId(null); 1574 userSpecifiedClientID = false; 1575 } 1576 clientIDSet = false; 1577 1578 started.set(false); 1579 } 1580 1581 public void finalize() throws Throwable{ 1582 Scheduler s = this.scheduler; 1583 if (s != null){ 1584 s.stop(); 1585 } 1586 } 1587 1588 /** 1589 * Changes the associated username/password that is associated with this 1590 * connection. If the connection has been used, you must called cleanup() 1591 * before calling this method. 1592 * 1593 * @throws IllegalStateException if the connection is in used. 1594 */ 1595 public void changeUserInfo(String userName, String password) throws JMSException { 1596 if (isConnectionInfoSentToBroker) { 1597 throw new IllegalStateException("changeUserInfo used Connection is not allowed"); 1598 } 1599 this.info.setUserName(userName); 1600 this.info.setPassword(password); 1601 } 1602 1603 /** 1604 * @return Returns the resourceManagerId. 1605 * @throws JMSException 1606 */ 1607 public String getResourceManagerId() throws JMSException { 1608 waitForBrokerInfo(); 1609 if (brokerInfo == null) { 1610 throw new JMSException("Connection failed before Broker info was received."); 1611 } 1612 return brokerInfo.getBrokerId().getValue(); 1613 } 1614 1615 /** 1616 * Returns the broker name if one is available or null if one is not 1617 * available yet. 1618 */ 1619 public String getBrokerName() { 1620 try { 1621 brokerInfoReceived.await(5, TimeUnit.SECONDS); 1622 if (brokerInfo == null) { 1623 return null; 1624 } 1625 return brokerInfo.getBrokerName(); 1626 } catch (InterruptedException e) { 1627 Thread.currentThread().interrupt(); 1628 return null; 1629 } 1630 } 1631 1632 /** 1633 * Returns the broker information if it is available or null if it is not 1634 * available yet. 1635 */ 1636 public BrokerInfo getBrokerInfo() { 1637 return brokerInfo; 1638 } 1639 1640 /** 1641 * @return Returns the RedeliveryPolicy. 1642 * @throws JMSException 1643 */ 1644 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { 1645 return redeliveryPolicy; 1646 } 1647 1648 /** 1649 * Sets the redelivery policy to be used when messages are rolled back 1650 */ 1651 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 1652 this.redeliveryPolicy = redeliveryPolicy; 1653 } 1654 1655 public BlobTransferPolicy getBlobTransferPolicy() { 1656 if (blobTransferPolicy == null) { 1657 blobTransferPolicy = createBlobTransferPolicy(); 1658 } 1659 return blobTransferPolicy; 1660 } 1661 1662 /** 1663 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1664 * OBjects) are transferred from producers to brokers to consumers 1665 */ 1666 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1667 this.blobTransferPolicy = blobTransferPolicy; 1668 } 1669 1670 /** 1671 * @return Returns the alwaysSessionAsync. 1672 */ 1673 public boolean isAlwaysSessionAsync() { 1674 return alwaysSessionAsync; 1675 } 1676 1677 /** 1678 * If this flag is set then a separate thread is not used for dispatching 1679 * messages for each Session in the Connection. However, a separate thread 1680 * is always used if there is more than one session, or the session isn't in 1681 * auto acknowledge or duplicates ok mode 1682 */ 1683 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 1684 this.alwaysSessionAsync = alwaysSessionAsync; 1685 } 1686 1687 /** 1688 * @return Returns the optimizeAcknowledge. 1689 */ 1690 public boolean isOptimizeAcknowledge() { 1691 return optimizeAcknowledge; 1692 } 1693 1694 /** 1695 * Enables an optimised acknowledgement mode where messages are acknowledged 1696 * in batches rather than individually 1697 * 1698 * @param optimizeAcknowledge The optimizeAcknowledge to set. 1699 */ 1700 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 1701 this.optimizeAcknowledge = optimizeAcknowledge; 1702 } 1703 1704 /** 1705 * The max time in milliseconds between optimized ack batches 1706 * @param optimizeAcknowledgeTimeOut 1707 */ 1708 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 1709 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 1710 } 1711 1712 public long getOptimizeAcknowledgeTimeOut() { 1713 return optimizeAcknowledgeTimeOut; 1714 } 1715 1716 public long getWarnAboutUnstartedConnectionTimeout() { 1717 return warnAboutUnstartedConnectionTimeout; 1718 } 1719 1720 /** 1721 * Enables the timeout from a connection creation to when a warning is 1722 * generated if the connection is not properly started via {@link #start()} 1723 * and a message is received by a consumer. It is a very common gotcha to 1724 * forget to <a 1725 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1726 * the connection</a> so this option makes the default case to create a 1727 * warning if the user forgets. To disable the warning just set the value to < 1728 * 0 (say -1). 1729 */ 1730 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1731 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1732 } 1733 1734 /** 1735 * @return the sendTimeout 1736 */ 1737 public int getSendTimeout() { 1738 return sendTimeout; 1739 } 1740 1741 /** 1742 * @param sendTimeout the sendTimeout to set 1743 */ 1744 public void setSendTimeout(int sendTimeout) { 1745 this.sendTimeout = sendTimeout; 1746 } 1747 1748 /** 1749 * @return the sendAcksAsync 1750 */ 1751 public boolean isSendAcksAsync() { 1752 return sendAcksAsync; 1753 } 1754 1755 /** 1756 * @param sendAcksAsync the sendAcksAsync to set 1757 */ 1758 public void setSendAcksAsync(boolean sendAcksAsync) { 1759 this.sendAcksAsync = sendAcksAsync; 1760 } 1761 1762 1763 /** 1764 * Returns the time this connection was created 1765 */ 1766 public long getTimeCreated() { 1767 return timeCreated; 1768 } 1769 1770 private void waitForBrokerInfo() throws JMSException { 1771 try { 1772 brokerInfoReceived.await(); 1773 } catch (InterruptedException e) { 1774 Thread.currentThread().interrupt(); 1775 throw JMSExceptionSupport.create(e); 1776 } 1777 } 1778 1779 // Package protected so that it can be used in unit tests 1780 public Transport getTransport() { 1781 return transport; 1782 } 1783 1784 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { 1785 producers.put(producerId, producer); 1786 } 1787 1788 public void removeProducer(ProducerId producerId) { 1789 producers.remove(producerId); 1790 } 1791 1792 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { 1793 dispatchers.put(consumerId, dispatcher); 1794 } 1795 1796 public void removeDispatcher(ConsumerId consumerId) { 1797 dispatchers.remove(consumerId); 1798 } 1799 1800 /** 1801 * @param o - the command to consume 1802 */ 1803 public void onCommand(final Object o) { 1804 final Command command = (Command)o; 1805 if (!closed.get() && command != null) { 1806 try { 1807 command.visit(new CommandVisitorAdapter() { 1808 @Override 1809 public Response processMessageDispatch(MessageDispatch md) throws Exception { 1810 waitForTransportInterruptionProcessingToComplete(); 1811 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 1812 if (dispatcher != null) { 1813 // Copy in case a embedded broker is dispatching via 1814 // vm:// 1815 // md.getMessage() == null to signal end of queue 1816 // browse. 1817 Message msg = md.getMessage(); 1818 if (msg != null) { 1819 msg = msg.copy(); 1820 msg.setReadOnlyBody(true); 1821 msg.setReadOnlyProperties(true); 1822 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 1823 msg.setConnection(ActiveMQConnection.this); 1824 md.setMessage(msg); 1825 } 1826 dispatcher.dispatch(md); 1827 } 1828 return null; 1829 } 1830 1831 @Override 1832 public Response processProducerAck(ProducerAck pa) throws Exception { 1833 if (pa != null && pa.getProducerId() != null) { 1834 ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); 1835 if (producer != null) { 1836 producer.onProducerAck(pa); 1837 } 1838 } 1839 return null; 1840 } 1841 1842 @Override 1843 public Response processBrokerInfo(BrokerInfo info) throws Exception { 1844 brokerInfo = info; 1845 brokerInfoReceived.countDown(); 1846 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); 1847 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); 1848 return null; 1849 } 1850 1851 @Override 1852 public Response processConnectionError(final ConnectionError error) throws Exception { 1853 executor.execute(new Runnable() { 1854 public void run() { 1855 onAsyncException(error.getException()); 1856 } 1857 }); 1858 return null; 1859 } 1860 1861 @Override 1862 public Response processControlCommand(ControlCommand command) throws Exception { 1863 onControlCommand(command); 1864 return null; 1865 } 1866 1867 @Override 1868 public Response processConnectionControl(ConnectionControl control) throws Exception { 1869 onConnectionControl((ConnectionControl)command); 1870 return null; 1871 } 1872 1873 @Override 1874 public Response processConsumerControl(ConsumerControl control) throws Exception { 1875 onConsumerControl((ConsumerControl)command); 1876 return null; 1877 } 1878 1879 @Override 1880 public Response processWireFormat(WireFormatInfo info) throws Exception { 1881 onWireFormatInfo((WireFormatInfo)command); 1882 return null; 1883 } 1884 }); 1885 } catch (Exception e) { 1886 onClientInternalException(e); 1887 } 1888 1889 } 1890 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1891 TransportListener listener = iter.next(); 1892 listener.onCommand(command); 1893 } 1894 } 1895 1896 protected void onWireFormatInfo(WireFormatInfo info) { 1897 protocolVersion.set(info.getVersion()); 1898 } 1899 1900 /** 1901 * Handles async client internal exceptions. 1902 * A client internal exception is usually one that has been thrown 1903 * by a container runtime component during asynchronous processing of a 1904 * message that does not affect the connection itself. 1905 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking 1906 * its <code>onException</code> method, if one has been registered with this connection. 1907 * 1908 * @param error the exception that the problem 1909 */ 1910 public void onClientInternalException(final Throwable error) { 1911 if ( !closed.get() && !closing.get() ) { 1912 if ( this.clientInternalExceptionListener != null ) { 1913 executor.execute(new Runnable() { 1914 public void run() { 1915 ActiveMQConnection.this.clientInternalExceptionListener.onException(error); 1916 } 1917 }); 1918 } else { 1919 LOG.debug("Async client internal exception occurred with no exception listener registered: " 1920 + error, error); 1921 } 1922 } 1923 } 1924 /** 1925 * Used for handling async exceptions 1926 * 1927 * @param error 1928 */ 1929 public void onAsyncException(Throwable error) { 1930 if (!closed.get() && !closing.get()) { 1931 if (this.exceptionListener != null) { 1932 1933 if (!(error instanceof JMSException)) { 1934 error = JMSExceptionSupport.create(error); 1935 } 1936 final JMSException e = (JMSException)error; 1937 1938 executor.execute(new Runnable() { 1939 public void run() { 1940 ActiveMQConnection.this.exceptionListener.onException(e); 1941 } 1942 }); 1943 1944 } else { 1945 LOG.debug("Async exception with no exception listener: " + error, error); 1946 } 1947 } 1948 } 1949 1950 public void onException(final IOException error) { 1951 onAsyncException(error); 1952 if (!closing.get() && !closed.get()) { 1953 executor.execute(new Runnable() { 1954 public void run() { 1955 transportFailed(error); 1956 ServiceSupport.dispose(ActiveMQConnection.this.transport); 1957 brokerInfoReceived.countDown(); 1958 try { 1959 cleanup(); 1960 } catch (JMSException e) { 1961 LOG.warn("Exception during connection cleanup, " + e, e); 1962 } 1963 for (Iterator<TransportListener> iter = transportListeners 1964 .iterator(); iter.hasNext();) { 1965 TransportListener listener = iter.next(); 1966 listener.onException(error); 1967 } 1968 } 1969 }); 1970 } 1971 } 1972 1973 public void transportInterupted() { 1974 this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0)); 1975 if (LOG.isDebugEnabled()) { 1976 LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount()); 1977 } 1978 signalInterruptionProcessingNeeded(); 1979 1980 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1981 ActiveMQSession s = i.next(); 1982 s.clearMessagesInProgress(); 1983 } 1984 1985 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) { 1986 connectionConsumer.clearMessagesInProgress(); 1987 } 1988 1989 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1990 TransportListener listener = iter.next(); 1991 listener.transportInterupted(); 1992 } 1993 } 1994 1995 public void transportResumed() { 1996 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1997 TransportListener listener = iter.next(); 1998 listener.transportResumed(); 1999 } 2000 } 2001 2002 /** 2003 * Create the DestinationInfo object for the temporary destination. 2004 * 2005 * @param topic - if its true topic, else queue. 2006 * @return DestinationInfo 2007 * @throws JMSException 2008 */ 2009 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException { 2010 2011 // Check if Destination info is of temporary type. 2012 ActiveMQTempDestination dest; 2013 if (topic) { 2014 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 2015 } else { 2016 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 2017 } 2018 2019 DestinationInfo info = new DestinationInfo(); 2020 info.setConnectionId(this.info.getConnectionId()); 2021 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 2022 info.setDestination(dest); 2023 syncSendPacket(info); 2024 2025 dest.setConnection(this); 2026 activeTempDestinations.put(dest, dest); 2027 return dest; 2028 } 2029 2030 /** 2031 * @param destination 2032 * @throws JMSException 2033 */ 2034 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { 2035 2036 checkClosedOrFailed(); 2037 2038 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2039 ActiveMQSession s = i.next(); 2040 if (s.isInUse(destination)) { 2041 throw new JMSException("A consumer is consuming from the temporary destination"); 2042 } 2043 } 2044 2045 activeTempDestinations.remove(destination); 2046 2047 DestinationInfo destInfo = new DestinationInfo(); 2048 destInfo.setConnectionId(this.info.getConnectionId()); 2049 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2050 destInfo.setDestination(destination); 2051 destInfo.setTimeout(0); 2052 syncSendPacket(destInfo); 2053 } 2054 2055 public boolean isDeleted(ActiveMQDestination dest) { 2056 2057 // If we are not watching the advisories.. then 2058 // we will assume that the temp destination does exist. 2059 if (advisoryConsumer == null) { 2060 return false; 2061 } 2062 2063 return !activeTempDestinations.contains(dest); 2064 } 2065 2066 public boolean isCopyMessageOnSend() { 2067 return copyMessageOnSend; 2068 } 2069 2070 public LongSequenceGenerator getLocalTransactionIdGenerator() { 2071 return localTransactionIdGenerator; 2072 } 2073 2074 public boolean isUseCompression() { 2075 return useCompression; 2076 } 2077 2078 /** 2079 * Enables the use of compression of the message bodies 2080 */ 2081 public void setUseCompression(boolean useCompression) { 2082 this.useCompression = useCompression; 2083 } 2084 2085 public void destroyDestination(ActiveMQDestination destination) throws JMSException { 2086 2087 checkClosedOrFailed(); 2088 ensureConnectionInfoSent(); 2089 2090 DestinationInfo info = new DestinationInfo(); 2091 info.setConnectionId(this.info.getConnectionId()); 2092 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2093 info.setDestination(destination); 2094 info.setTimeout(0); 2095 syncSendPacket(info); 2096 2097 } 2098 2099 public boolean isDispatchAsync() { 2100 return dispatchAsync; 2101 } 2102 2103 /** 2104 * Enables or disables the default setting of whether or not consumers have 2105 * their messages <a 2106 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 2107 * synchronously or asynchronously by the broker</a>. For non-durable 2108 * topics for example we typically dispatch synchronously by default to 2109 * minimize context switches which boost performance. However sometimes its 2110 * better to go slower to ensure that a single blocked consumer socket does 2111 * not block delivery to other consumers. 2112 * 2113 * @param asyncDispatch If true then consumers created on this connection 2114 * will default to having their messages dispatched 2115 * asynchronously. The default value is true. 2116 */ 2117 public void setDispatchAsync(boolean asyncDispatch) { 2118 this.dispatchAsync = asyncDispatch; 2119 } 2120 2121 public boolean isObjectMessageSerializationDefered() { 2122 return objectMessageSerializationDefered; 2123 } 2124 2125 /** 2126 * When an object is set on an ObjectMessage, the JMS spec requires the 2127 * object to be serialized by that set method. Enabling this flag causes the 2128 * object to not get serialized. The object may subsequently get serialized 2129 * if the message needs to be sent over a socket or stored to disk. 2130 */ 2131 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 2132 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 2133 } 2134 2135 public InputStream createInputStream(Destination dest) throws JMSException { 2136 return createInputStream(dest, null); 2137 } 2138 2139 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { 2140 return createInputStream(dest, messageSelector, false); 2141 } 2142 2143 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { 2144 return createInputStream(dest, messageSelector, noLocal, -1); 2145 } 2146 2147 2148 2149 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { 2150 return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); 2151 } 2152 2153 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { 2154 return createInputStream(dest, null, false); 2155 } 2156 2157 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { 2158 return createDurableInputStream(dest, name, messageSelector, false); 2159 } 2160 2161 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { 2162 return createDurableInputStream(dest, name, messageSelector, noLocal, -1); 2163 } 2164 2165 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { 2166 return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); 2167 } 2168 2169 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { 2170 checkClosedOrFailed(); 2171 ensureConnectionInfoSent(); 2172 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout); 2173 } 2174 2175 /** 2176 * Creates a persistent output stream; individual messages will be written 2177 * to disk/database by the broker 2178 */ 2179 public OutputStream createOutputStream(Destination dest) throws JMSException { 2180 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2181 } 2182 2183 /** 2184 * Creates a non persistent output stream; messages will not be written to 2185 * disk 2186 */ 2187 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { 2188 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2189 } 2190 2191 /** 2192 * Creates an output stream allowing full control over the delivery mode, 2193 * the priority and time to live of the messages and the properties added to 2194 * messages on the stream. 2195 * 2196 * @param streamProperties defines a map of key-value pairs where the keys 2197 * are strings and the values are primitive values (numbers 2198 * and strings) which are appended to the messages similarly 2199 * to using the 2200 * {@link javax.jms.Message#setObjectProperty(String, Object)} 2201 * method 2202 */ 2203 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { 2204 checkClosedOrFailed(); 2205 ensureConnectionInfoSent(); 2206 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive); 2207 } 2208 2209 /** 2210 * Unsubscribes a durable subscription that has been created by a client. 2211 * <P> 2212 * This method deletes the state being maintained on behalf of the 2213 * subscriber by its provider. 2214 * <P> 2215 * It is erroneous for a client to delete a durable subscription while there 2216 * is an active <CODE>MessageConsumer </CODE> or 2217 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 2218 * message is part of a pending transaction or has not been acknowledged in 2219 * the session. 2220 * 2221 * @param name the name used to identify this subscription 2222 * @throws JMSException if the session fails to unsubscribe to the durable 2223 * subscription due to some internal error. 2224 * @throws InvalidDestinationException if an invalid subscription name is 2225 * specified. 2226 * @since 1.1 2227 */ 2228 public void unsubscribe(String name) throws InvalidDestinationException, JMSException { 2229 checkClosedOrFailed(); 2230 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 2231 rsi.setConnectionId(getConnectionInfo().getConnectionId()); 2232 rsi.setSubscriptionName(name); 2233 rsi.setClientId(getConnectionInfo().getClientId()); 2234 syncSendPacket(rsi); 2235 } 2236 2237 /** 2238 * Internal send method optimized: - It does not copy the message - It can 2239 * only handle ActiveMQ messages. - You can specify if the send is async or 2240 * sync - Does not allow you to send /w a transaction. 2241 */ 2242 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { 2243 checkClosedOrFailed(); 2244 2245 if (destination.isTemporary() && isDeleted(destination)) { 2246 throw new JMSException("Cannot publish to a deleted Destination: " + destination); 2247 } 2248 2249 msg.setJMSDestination(destination); 2250 msg.setJMSDeliveryMode(deliveryMode); 2251 long expiration = 0L; 2252 2253 if (!isDisableTimeStampsByDefault()) { 2254 long timeStamp = System.currentTimeMillis(); 2255 msg.setJMSTimestamp(timeStamp); 2256 if (timeToLive > 0) { 2257 expiration = timeToLive + timeStamp; 2258 } 2259 } 2260 2261 msg.setJMSExpiration(expiration); 2262 msg.setJMSPriority(priority); 2263 2264 msg.setJMSRedelivered(false); 2265 msg.setMessageId(messageId); 2266 2267 msg.onSend(); 2268 2269 msg.setProducerId(msg.getMessageId().getProducerId()); 2270 2271 if (LOG.isDebugEnabled()) { 2272 LOG.debug("Sending message: " + msg); 2273 } 2274 2275 if (async) { 2276 asyncSendPacket(msg); 2277 } else { 2278 syncSendPacket(msg); 2279 } 2280 2281 } 2282 2283 public void addOutputStream(ActiveMQOutputStream stream) { 2284 outputStreams.add(stream); 2285 } 2286 2287 public void removeOutputStream(ActiveMQOutputStream stream) { 2288 outputStreams.remove(stream); 2289 } 2290 2291 public void addInputStream(ActiveMQInputStream stream) { 2292 inputStreams.add(stream); 2293 } 2294 2295 public void removeInputStream(ActiveMQInputStream stream) { 2296 inputStreams.remove(stream); 2297 } 2298 2299 protected void onControlCommand(ControlCommand command) { 2300 String text = command.getCommand(); 2301 if (text != null) { 2302 if ("shutdown".equals(text)) { 2303 LOG.info("JVM told to shutdown"); 2304 System.exit(0); 2305 } 2306 if (false && "close".equals(text)){ 2307 LOG.error("Broker " + getBrokerInfo() + "shutdown connection"); 2308 try { 2309 close(); 2310 } catch (JMSException e) { 2311 } 2312 } 2313 } 2314 } 2315 2316 protected void onConnectionControl(ConnectionControl command) { 2317 if (command.isFaultTolerant()) { 2318 this.optimizeAcknowledge = false; 2319 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2320 ActiveMQSession s = i.next(); 2321 s.setOptimizeAcknowledge(false); 2322 } 2323 } 2324 } 2325 2326 protected void onConsumerControl(ConsumerControl command) { 2327 if (command.isClose()) { 2328 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2329 ActiveMQSession s = i.next(); 2330 s.close(command.getConsumerId()); 2331 } 2332 } else { 2333 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2334 ActiveMQSession s = i.next(); 2335 s.setPrefetchSize(command.getConsumerId(), command.getPrefetch()); 2336 } 2337 } 2338 } 2339 2340 protected void transportFailed(IOException error) { 2341 transportFailed.set(true); 2342 if (firstFailureError == null) { 2343 firstFailureError = error; 2344 } 2345 } 2346 2347 /** 2348 * Should a JMS message be copied to a new JMS Message object as part of the 2349 * send() method in JMS. This is enabled by default to be compliant with the 2350 * JMS specification. You can disable it if you do not mutate JMS messages 2351 * after they are sent for a performance boost 2352 */ 2353 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 2354 this.copyMessageOnSend = copyMessageOnSend; 2355 } 2356 2357 @Override 2358 public String toString() { 2359 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; 2360 } 2361 2362 protected BlobTransferPolicy createBlobTransferPolicy() { 2363 return new BlobTransferPolicy(); 2364 } 2365 2366 public int getProtocolVersion() { 2367 return protocolVersion.get(); 2368 } 2369 2370 public int getProducerWindowSize() { 2371 return producerWindowSize; 2372 } 2373 2374 public void setProducerWindowSize(int producerWindowSize) { 2375 this.producerWindowSize = producerWindowSize; 2376 } 2377 2378 public void setAuditDepth(int auditDepth) { 2379 connectionAudit.setAuditDepth(auditDepth); 2380 } 2381 2382 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 2383 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber); 2384 } 2385 2386 protected void removeDispatcher(ActiveMQDispatcher dispatcher) { 2387 connectionAudit.removeDispatcher(dispatcher); 2388 } 2389 2390 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2391 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message); 2392 } 2393 2394 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2395 connectionAudit.rollbackDuplicate(dispatcher, message); 2396 } 2397 2398 public IOException getFirstFailureError() { 2399 return firstFailureError; 2400 } 2401 2402 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { 2403 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2404 if (cdl != null) { 2405 if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) { 2406 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete.."); 2407 cdl.await(10, TimeUnit.SECONDS); 2408 } 2409 signalInterruptionProcessingComplete(); 2410 } 2411 } 2412 2413 protected void transportInterruptionProcessingComplete() { 2414 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2415 if (cdl != null) { 2416 cdl.countDown(); 2417 try { 2418 signalInterruptionProcessingComplete(); 2419 } catch (InterruptedException ignored) {} 2420 } 2421 } 2422 2423 private void signalInterruptionProcessingComplete() throws InterruptedException { 2424 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2425 if (cdl.getCount()==0) { 2426 if (LOG.isDebugEnabled()) { 2427 LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId()); 2428 } 2429 this.transportInterruptionProcessingComplete = null; 2430 2431 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2432 if (failoverTransport != null) { 2433 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId()); 2434 if (LOG.isDebugEnabled()) { 2435 LOG.debug("notified failover transport (" + failoverTransport 2436 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId()); 2437 } 2438 } 2439 2440 } 2441 } 2442 2443 private void signalInterruptionProcessingNeeded() { 2444 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2445 if (failoverTransport != null) { 2446 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId()); 2447 if (LOG.isDebugEnabled()) { 2448 LOG.debug("notified failover transport (" + failoverTransport 2449 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId()); 2450 } 2451 } 2452 } 2453 2454 /* 2455 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery 2456 * will wait to receive re dispatched messages. 2457 * default value is 0 so there is no wait by default. 2458 */ 2459 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 2460 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 2461 } 2462 2463 public long getConsumerFailoverRedeliveryWaitPeriod() { 2464 return consumerFailoverRedeliveryWaitPeriod; 2465 } 2466 2467 protected Scheduler getScheduler() throws JMSException { 2468 Scheduler result = scheduler; 2469 if (result == null) { 2470 synchronized (this) { 2471 result = scheduler; 2472 if (result == null) { 2473 checkClosed(); 2474 try { 2475 result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler"); 2476 scheduler.start(); 2477 } catch(Exception e) { 2478 throw JMSExceptionSupport.create(e); 2479 } 2480 } 2481 } 2482 } 2483 return result; 2484 } 2485 2486 protected ThreadPoolExecutor getExecutor() { 2487 return this.executor; 2488 } 2489 2490 /** 2491 * @return the checkForDuplicates 2492 */ 2493 public boolean isCheckForDuplicates() { 2494 return this.checkForDuplicates; 2495 } 2496 2497 /** 2498 * @param checkForDuplicates the checkForDuplicates to set 2499 */ 2500 public void setCheckForDuplicates(boolean checkForDuplicates) { 2501 this.checkForDuplicates = checkForDuplicates; 2502 } 2503 2504 2505 public boolean isTransactedIndividualAck() { 2506 return transactedIndividualAck; 2507 } 2508 2509 public void setTransactedIndividualAck(boolean transactedIndividualAck) { 2510 this.transactedIndividualAck = transactedIndividualAck; 2511 } 2512 2513 public boolean isNonBlockingRedelivery() { 2514 return nonBlockingRedelivery; 2515 } 2516 2517 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { 2518 this.nonBlockingRedelivery = nonBlockingRedelivery; 2519 } 2520 2521 /** 2522 * Removes any TempDestinations that this connection has cached, ignoring 2523 * any exceptions generated because the destination is in use as they should 2524 * not be removed. 2525 */ 2526 public void cleanUpTempDestinations() { 2527 2528 if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) { 2529 return; 2530 } 2531 2532 Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries 2533 = this.activeTempDestinations.entrySet().iterator(); 2534 while(entries.hasNext()) { 2535 ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next(); 2536 try { 2537 // Only delete this temp destination if it was created from this connection. The connection used 2538 // for the advisory consumer may also have a reference to this temp destination. 2539 ActiveMQTempDestination dest = entry.getValue(); 2540 String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString(); 2541 if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) { 2542 this.deleteTempDestination(entry.getValue()); 2543 } 2544 } catch (Exception ex) { 2545 // the temp dest is in use so it can not be deleted. 2546 // it is ok to leave it to connection tear down phase 2547 } 2548 } 2549 } 2550}