001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.File; 020 import java.io.InputStream; 021 import java.io.Serializable; 022 import java.net.URL; 023 import java.util.Collections; 024 import java.util.Iterator; 025 import java.util.List; 026 import java.util.concurrent.CopyOnWriteArrayList; 027 import java.util.concurrent.ThreadPoolExecutor; 028 import java.util.concurrent.atomic.AtomicBoolean; 029 030 import javax.jms.BytesMessage; 031 import javax.jms.Destination; 032 import javax.jms.IllegalStateException; 033 import javax.jms.InvalidDestinationException; 034 import javax.jms.InvalidSelectorException; 035 import javax.jms.JMSException; 036 import javax.jms.MapMessage; 037 import javax.jms.Message; 038 import javax.jms.MessageConsumer; 039 import javax.jms.MessageListener; 040 import javax.jms.MessageProducer; 041 import javax.jms.ObjectMessage; 042 import javax.jms.Queue; 043 import javax.jms.QueueBrowser; 044 import javax.jms.QueueReceiver; 045 import javax.jms.QueueSender; 046 import javax.jms.QueueSession; 047 import javax.jms.Session; 048 import javax.jms.StreamMessage; 049 import javax.jms.TemporaryQueue; 050 import 
javax.jms.TemporaryTopic; 051 import javax.jms.TextMessage; 052 import javax.jms.Topic; 053 import javax.jms.TopicPublisher; 054 import javax.jms.TopicSession; 055 import javax.jms.TopicSubscriber; 056 import javax.jms.TransactionRolledBackException; 057 058 import org.apache.activemq.blob.BlobDownloader; 059 import org.apache.activemq.blob.BlobTransferPolicy; 060 import org.apache.activemq.blob.BlobUploader; 061 import org.apache.activemq.command.ActiveMQBlobMessage; 062 import org.apache.activemq.command.ActiveMQBytesMessage; 063 import org.apache.activemq.command.ActiveMQDestination; 064 import org.apache.activemq.command.ActiveMQMapMessage; 065 import org.apache.activemq.command.ActiveMQMessage; 066 import org.apache.activemq.command.ActiveMQObjectMessage; 067 import org.apache.activemq.command.ActiveMQQueue; 068 import org.apache.activemq.command.ActiveMQStreamMessage; 069 import org.apache.activemq.command.ActiveMQTempDestination; 070 import org.apache.activemq.command.ActiveMQTempQueue; 071 import org.apache.activemq.command.ActiveMQTempTopic; 072 import org.apache.activemq.command.ActiveMQTextMessage; 073 import org.apache.activemq.command.ActiveMQTopic; 074 import org.apache.activemq.command.Command; 075 import org.apache.activemq.command.ConsumerId; 076 import org.apache.activemq.command.MessageAck; 077 import org.apache.activemq.command.MessageDispatch; 078 import org.apache.activemq.command.MessageId; 079 import org.apache.activemq.command.ProducerId; 080 import org.apache.activemq.command.RemoveInfo; 081 import org.apache.activemq.command.Response; 082 import org.apache.activemq.command.SessionId; 083 import org.apache.activemq.command.SessionInfo; 084 import org.apache.activemq.command.TransactionId; 085 import org.apache.activemq.management.JMSSessionStatsImpl; 086 import org.apache.activemq.management.StatsCapable; 087 import org.apache.activemq.management.StatsImpl; 088 import org.apache.activemq.thread.Scheduler; 089 import 
org.apache.activemq.transaction.Synchronization; 090 import org.apache.activemq.usage.MemoryUsage; 091 import org.apache.activemq.util.Callback; 092 import org.apache.activemq.util.JMSExceptionSupport; 093 import org.apache.activemq.util.LongSequenceGenerator; 094 import org.slf4j.Logger; 095 import org.slf4j.LoggerFactory; 096 097 /** 098 * <P> 099 * A <CODE>Session</CODE> object is a single-threaded context for producing 100 * and consuming messages. Although it may allocate provider resources outside 101 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 102 * <P> 103 * A session serves several purposes: 104 * <UL> 105 * <LI>It is a factory for its message producers and consumers. 106 * <LI>It supplies provider-optimized message factories. 107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 108 * <CODE>TemporaryQueues</CODE>. 109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 110 * objects for those clients that need to dynamically manipulate 111 * provider-specific destination names. 112 * <LI>It supports a single series of transactions that combine work spanning 113 * its producers and consumers into atomic units. 114 * <LI>It defines a serial order for the messages it consumes and the messages 115 * it produces. 116 * <LI>It retains messages it consumes until they have been acknowledged. 117 * <LI>It serializes execution of message listeners registered with its message 118 * consumers. 119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 120 * </UL> 121 * <P> 122 * A session can create and service multiple message producers and consumers. 123 * <P> 124 * One typical use is to have a thread block on a synchronous 125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 
127 * <P> 128 * If a client desires to have one thread produce messages while others consume 129 * them, the client should use a separate session for its producing thread. 130 * <P> 131 * Once a connection has been started, any session with one or more registered 132 * message listeners is dedicated to the thread of control that delivers 133 * messages to it. It is erroneous for client code to use this session or any of 134 * its constituent objects from another thread of control. The only exception to 135 * this rule is the use of the session or connection <CODE>close</CODE> 136 * method. 137 * <P> 138 * It should be easy for most clients to partition their work naturally into 139 * sessions. This model allows clients to start simply and incrementally add 140 * message processing complexity as their need for concurrency grows. 141 * <P> 142 * The <CODE>close</CODE> method is the only session method that can be called 143 * while some other session method is being executed in another thread. 144 * <P> 145 * A session may be specified as transacted. Each transacted session supports a 146 * single series of transactions. Each transaction groups a set of message sends 147 * and a set of message receives into an atomic unit of work. In effect, 148 * transactions organize a session's input message stream and output message 149 * stream into series of atomic units. When a transaction commits, its atomic 150 * unit of input is acknowledged and its associated atomic unit of output is 151 * sent. If a transaction rollback is done, the transaction's sent messages are 152 * destroyed and the session's input is automatically recovered. 153 * <P> 154 * The content of a transaction's input and output units is simply those 155 * messages that have been produced and consumed within the session's current 156 * transaction. 157 * <P> 158 * A transaction is completed using either its session's <CODE>commit</CODE> 159 * method or its session's <CODE>rollback </CODE> method. 
The completion of a 160 * session's current transaction automatically begins the next. The result is 161 * that a transacted session always has a current transaction within which its 162 * work is done. 163 * <P> 164 * The Java Transaction Service (JTS) or some other transaction monitor may be 165 * used to combine a session's transaction with transactions on other resources 166 * (databases, other JMS sessions, etc.). Since Java distributed transactions 167 * are controlled via the Java Transaction API (JTA), use of the session's 168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 169 * prohibited. 170 * <P> 171 * The JMS API does not require support for JTA; however, it does define how a 172 * provider supplies this support. 173 * <P> 174 * Although it is also possible for a JMS client to handle distributed 175 * transactions directly, it is unlikely that many JMS clients will do this. 176 * Support for JTA in the JMS API is targeted at systems vendors who will be 177 * integrating the JMS API into their application server products. 
178 * 179 * 180 * @see javax.jms.Session 181 * @see javax.jms.QueueSession 182 * @see javax.jms.TopicSession 183 * @see javax.jms.XASession 184 */ 185 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 186 187 /** 188 * Only acknowledge an individual message - using message.acknowledge() 189 * as opposed to CLIENT_ACKNOWLEDGE which 190 * acknowledges all messages consumed by a session at when acknowledge() 191 * is called 192 */ 193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 195 196 public static interface DeliveryListener { 197 void beforeDelivery(ActiveMQSession session, Message msg); 198 199 void afterDelivery(ActiveMQSession session, Message msg); 200 } 201 202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 203 private final ThreadPoolExecutor connectionExecutor; 204 205 protected int acknowledgementMode; 206 protected final ActiveMQConnection connection; 207 protected final SessionInfo info; 208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 211 protected final ActiveMQSessionExecutor executor; 212 protected final AtomicBoolean started = new AtomicBoolean(false); 213 214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 216 217 protected boolean closed; 218 private volatile boolean synchronizationRegistered; 219 protected boolean asyncDispatch; 220 protected boolean sessionAsyncDispatch; 221 protected final boolean debug; 222 protected Object sendMutex = new 
Object(); 223 224 private MessageListener messageListener; 225 private final JMSSessionStatsImpl stats; 226 private TransactionContext transactionContext; 227 private DeliveryListener deliveryListener; 228 private MessageTransformer transformer; 229 private BlobTransferPolicy blobTransferPolicy; 230 private long lastDeliveredSequenceId; 231 232 /** 233 * Construct the Session 234 * 235 * @param connection 236 * @param sessionId 237 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 238 * Session.SESSION_TRANSACTED 239 * @param asyncDispatch 240 * @param sessionAsyncDispatch 241 * @throws JMSException on internal error 242 */ 243 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 244 this.debug = LOG.isDebugEnabled(); 245 this.connection = connection; 246 this.acknowledgementMode = acknowledgeMode; 247 this.asyncDispatch = asyncDispatch; 248 this.sessionAsyncDispatch = sessionAsyncDispatch; 249 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 250 setTransactionContext(new TransactionContext(connection)); 251 stats = new JMSSessionStatsImpl(producers, consumers); 252 this.connection.asyncSendPacket(info); 253 setTransformer(connection.getTransformer()); 254 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 255 this.connectionExecutor=connection.getExecutor(); 256 this.executor = new ActiveMQSessionExecutor(this); 257 connection.addSession(this); 258 if (connection.isStarted()) { 259 start(); 260 } 261 262 } 263 264 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 265 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 266 } 267 268 /** 269 * Sets the transaction context of the session. 270 * 271 * @param transactionContext - provides the means to control a JMS 272 * transaction. 
273 */ 274 public void setTransactionContext(TransactionContext transactionContext) { 275 this.transactionContext = transactionContext; 276 } 277 278 /** 279 * Returns the transaction context of the session. 280 * 281 * @return transactionContext - session's transaction context. 282 */ 283 public TransactionContext getTransactionContext() { 284 return transactionContext; 285 } 286 287 /* 288 * (non-Javadoc) 289 * 290 * @see org.apache.activemq.management.StatsCapable#getStats() 291 */ 292 public StatsImpl getStats() { 293 return stats; 294 } 295 296 /** 297 * Returns the session's statistics. 298 * 299 * @return stats - session's statistics. 300 */ 301 public JMSSessionStatsImpl getSessionStats() { 302 return stats; 303 } 304 305 /** 306 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> 307 * object is used to send a message containing a stream of uninterpreted 308 * bytes. 309 * 310 * @return the an ActiveMQBytesMessage 311 * @throws JMSException if the JMS provider fails to create this message due 312 * to some internal error. 313 */ 314 public BytesMessage createBytesMessage() throws JMSException { 315 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 316 configureMessage(message); 317 return message; 318 } 319 320 /** 321 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 322 * object is used to send a self-defining set of name-value pairs, where 323 * names are <CODE>String</CODE> objects and values are primitive values 324 * in the Java programming language. 325 * 326 * @return an ActiveMQMapMessage 327 * @throws JMSException if the JMS provider fails to create this message due 328 * to some internal error. 329 */ 330 public MapMessage createMapMessage() throws JMSException { 331 ActiveMQMapMessage message = new ActiveMQMapMessage(); 332 configureMessage(message); 333 return message; 334 } 335 336 /** 337 * Creates a <CODE>Message</CODE> object. 
The <CODE>Message</CODE> 338 * interface is the root interface of all JMS messages. A 339 * <CODE>Message</CODE> object holds all the standard message header 340 * information. It can be sent when a message containing only header 341 * information is sufficient. 342 * 343 * @return an ActiveMQMessage 344 * @throws JMSException if the JMS provider fails to create this message due 345 * to some internal error. 346 */ 347 public Message createMessage() throws JMSException { 348 ActiveMQMessage message = new ActiveMQMessage(); 349 configureMessage(message); 350 return message; 351 } 352 353 /** 354 * Creates an <CODE>ObjectMessage</CODE> object. An 355 * <CODE>ObjectMessage</CODE> object is used to send a message that 356 * contains a serializable Java object. 357 * 358 * @return an ActiveMQObjectMessage 359 * @throws JMSException if the JMS provider fails to create this message due 360 * to some internal error. 361 */ 362 public ObjectMessage createObjectMessage() throws JMSException { 363 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 364 configureMessage(message); 365 return message; 366 } 367 368 /** 369 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 370 * <CODE>ObjectMessage</CODE> object is used to send a message that 371 * contains a serializable Java object. 372 * 373 * @param object the object to use to initialize this message 374 * @return an ActiveMQObjectMessage 375 * @throws JMSException if the JMS provider fails to create this message due 376 * to some internal error. 377 */ 378 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 379 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 380 configureMessage(message); 381 message.setObject(object); 382 return message; 383 } 384 385 /** 386 * Creates a <CODE>StreamMessage</CODE> object. A 387 * <CODE>StreamMessage</CODE> object is used to send a self-defining 388 * stream of primitive values in the Java programming language. 
389 * 390 * @return an ActiveMQStreamMessage 391 * @throws JMSException if the JMS provider fails to create this message due 392 * to some internal error. 393 */ 394 public StreamMessage createStreamMessage() throws JMSException { 395 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 396 configureMessage(message); 397 return message; 398 } 399 400 /** 401 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 402 * object is used to send a message containing a <CODE>String</CODE> 403 * object. 404 * 405 * @return an ActiveMQTextMessage 406 * @throws JMSException if the JMS provider fails to create this message due 407 * to some internal error. 408 */ 409 public TextMessage createTextMessage() throws JMSException { 410 ActiveMQTextMessage message = new ActiveMQTextMessage(); 411 configureMessage(message); 412 return message; 413 } 414 415 /** 416 * Creates an initialized <CODE>TextMessage</CODE> object. A 417 * <CODE>TextMessage</CODE> object is used to send a message containing a 418 * <CODE>String</CODE>. 419 * 420 * @param text the string used to initialize this message 421 * @return an ActiveMQTextMessage 422 * @throws JMSException if the JMS provider fails to create this message due 423 * to some internal error. 424 */ 425 public TextMessage createTextMessage(String text) throws JMSException { 426 ActiveMQTextMessage message = new ActiveMQTextMessage(); 427 message.setText(text); 428 configureMessage(message); 429 return message; 430 } 431 432 /** 433 * Creates an initialized <CODE>BlobMessage</CODE> object. A 434 * <CODE>BlobMessage</CODE> object is used to send a message containing a 435 * <CODE>URL</CODE> which points to some network addressible BLOB. 436 * 437 * @param url the network addressable URL used to pass directly to the 438 * consumer 439 * @return a BlobMessage 440 * @throws JMSException if the JMS provider fails to create this message due 441 * to some internal error. 
442 */ 443 public BlobMessage createBlobMessage(URL url) throws JMSException { 444 return createBlobMessage(url, false); 445 } 446 447 /** 448 * Creates an initialized <CODE>BlobMessage</CODE> object. A 449 * <CODE>BlobMessage</CODE> object is used to send a message containing a 450 * <CODE>URL</CODE> which points to some network addressible BLOB. 451 * 452 * @param url the network addressable URL used to pass directly to the 453 * consumer 454 * @param deletedByBroker indicates whether or not the resource is deleted 455 * by the broker when the message is acknowledged 456 * @return a BlobMessage 457 * @throws JMSException if the JMS provider fails to create this message due 458 * to some internal error. 459 */ 460 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 461 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 462 configureMessage(message); 463 message.setURL(url); 464 message.setDeletedByBroker(deletedByBroker); 465 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 466 return message; 467 } 468 469 /** 470 * Creates an initialized <CODE>BlobMessage</CODE> object. A 471 * <CODE>BlobMessage</CODE> object is used to send a message containing 472 * the <CODE>File</CODE> content. Before the message is sent the file 473 * conent will be uploaded to the broker or some other remote repository 474 * depending on the {@link #getBlobTransferPolicy()}. 475 * 476 * @param file the file to be uploaded to some remote repo (or the broker) 477 * depending on the strategy 478 * @return a BlobMessage 479 * @throws JMSException if the JMS provider fails to create this message due 480 * to some internal error. 
481 */ 482 public BlobMessage createBlobMessage(File file) throws JMSException { 483 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 484 configureMessage(message); 485 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 486 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 487 message.setDeletedByBroker(true); 488 message.setName(file.getName()); 489 return message; 490 } 491 492 /** 493 * Creates an initialized <CODE>BlobMessage</CODE> object. A 494 * <CODE>BlobMessage</CODE> object is used to send a message containing 495 * the <CODE>File</CODE> content. Before the message is sent the file 496 * conent will be uploaded to the broker or some other remote repository 497 * depending on the {@link #getBlobTransferPolicy()}. 498 * 499 * @param in the stream to be uploaded to some remote repo (or the broker) 500 * depending on the strategy 501 * @return a BlobMessage 502 * @throws JMSException if the JMS provider fails to create this message due 503 * to some internal error. 504 */ 505 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 506 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 507 configureMessage(message); 508 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 509 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 510 message.setDeletedByBroker(true); 511 return message; 512 } 513 514 /** 515 * Indicates whether the session is in transacted mode. 516 * 517 * @return true if the session is in transacted mode 518 * @throws JMSException if there is some internal error. 519 */ 520 public boolean getTransacted() throws JMSException { 521 checkClosed(); 522 return isTransacted(); 523 } 524 525 /** 526 * Returns the acknowledgement mode of the session. The acknowledgement mode 527 * is set at the time that the session is created. If the session is 528 * transacted, the acknowledgement mode is ignored. 
529 * 530 * @return If the session is not transacted, returns the current 531 * acknowledgement mode for the session. If the session is 532 * transacted, returns SESSION_TRANSACTED. 533 * @throws JMSException 534 * @see javax.jms.Connection#createSession(boolean,int) 535 * @since 1.1 exception JMSException if there is some internal error. 536 */ 537 public int getAcknowledgeMode() throws JMSException { 538 checkClosed(); 539 return this.acknowledgementMode; 540 } 541 542 /** 543 * Commits all messages done in this transaction and releases any locks 544 * currently held. 545 * 546 * @throws JMSException if the JMS provider fails to commit the transaction 547 * due to some internal error. 548 * @throws TransactionRolledBackException if the transaction is rolled back 549 * due to some internal error during commit. 550 * @throws javax.jms.IllegalStateException if the method is not called by a 551 * transacted session. 552 */ 553 public void commit() throws JMSException { 554 checkClosed(); 555 if (!getTransacted()) { 556 throw new javax.jms.IllegalStateException("Not a transacted session"); 557 } 558 if (LOG.isDebugEnabled()) { 559 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 560 } 561 transactionContext.commit(); 562 } 563 564 /** 565 * Rolls back any messages done in this transaction and releases any locks 566 * currently held. 567 * 568 * @throws JMSException if the JMS provider fails to roll back the 569 * transaction due to some internal error. 570 * @throws javax.jms.IllegalStateException if the method is not called by a 571 * transacted session. 
572 */ 573 public void rollback() throws JMSException { 574 checkClosed(); 575 if (!getTransacted()) { 576 throw new javax.jms.IllegalStateException("Not a transacted session"); 577 } 578 if (LOG.isDebugEnabled()) { 579 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 580 } 581 transactionContext.rollback(); 582 } 583 584 /** 585 * Closes the session. 586 * <P> 587 * Since a provider may allocate some resources on behalf of a session 588 * outside the JVM, clients should close the resources when they are not 589 * needed. Relying on garbage collection to eventually reclaim these 590 * resources may not be timely enough. 591 * <P> 592 * There is no need to close the producers and consumers of a closed 593 * session. 594 * <P> 595 * This call will block until a <CODE>receive</CODE> call or message 596 * listener in progress has completed. A blocked message consumer 597 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 598 * is closed. 599 * <P> 600 * Closing a transacted session must roll back the transaction in progress. 601 * <P> 602 * This method is the only <CODE>Session</CODE> method that can be called 603 * concurrently. 604 * <P> 605 * Invoking any other <CODE>Session</CODE> method on a closed session must 606 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 607 * closed session must <I>not </I> throw an exception. 608 * 609 * @throws JMSException if the JMS provider fails to close the session due 610 * to some internal error. 
611 */ 612 public void close() throws JMSException { 613 if (!closed) { 614 if (getTransactionContext().isInXATransaction()) { 615 if (!synchronizationRegistered) { 616 synchronizationRegistered = true; 617 getTransactionContext().addSynchronization(new Synchronization() { 618 619 @Override 620 public void afterCommit() throws Exception { 621 doClose(); 622 synchronizationRegistered = false; 623 } 624 625 @Override 626 public void afterRollback() throws Exception { 627 doClose(); 628 synchronizationRegistered = false; 629 } 630 }); 631 } 632 633 } else { 634 doClose(); 635 } 636 } 637 } 638 639 private void doClose() throws JMSException { 640 boolean interrupted = Thread.interrupted(); 641 dispose(); 642 RemoveInfo removeCommand = info.createRemoveCommand(); 643 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 644 connection.asyncSendPacket(removeCommand); 645 if (interrupted) { 646 Thread.currentThread().interrupt(); 647 } 648 } 649 650 void clearMessagesInProgress() { 651 executor.clearMessagesInProgress(); 652 // we are called from inside the transport reconnection logic 653 // which involves us clearing all the connections' consumers 654 // dispatch and delivered lists. 
So rather than trying to 655 // grab a mutex (which could be already owned by the message 656 // listener calling the send or an ack) we allow it to complete in 657 // a separate thread via the scheduler and notify us via 658 // connection.transportInterruptionProcessingComplete() 659 // 660 for (final ActiveMQMessageConsumer consumer : consumers) { 661 consumer.inProgressClearRequired(); 662 try { 663 connection.getScheduler().executeAfterDelay(new Runnable() { 664 public void run() { 665 consumer.clearMessagesInProgress(); 666 }}, 0l); 667 } catch (JMSException e) { 668 connection.onClientInternalException(e); 669 } 670 } 671 } 672 673 void deliverAcks() { 674 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 675 ActiveMQMessageConsumer consumer = iter.next(); 676 consumer.deliverAcks(); 677 } 678 } 679 680 public synchronized void dispose() throws JMSException { 681 if (!closed) { 682 683 try { 684 executor.stop(); 685 686 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 687 ActiveMQMessageConsumer consumer = iter.next(); 688 consumer.setFailureError(connection.getFirstFailureError()); 689 consumer.dispose(); 690 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 691 } 692 consumers.clear(); 693 694 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 695 ActiveMQMessageProducer producer = iter.next(); 696 producer.dispose(); 697 } 698 producers.clear(); 699 700 try { 701 if (getTransactionContext().isInLocalTransaction()) { 702 rollback(); 703 } 704 } catch (JMSException e) { 705 } 706 707 } finally { 708 connection.removeSession(this); 709 this.transactionContext = null; 710 closed = true; 711 } 712 } 713 } 714 715 /** 716 * Checks that the session is not closed then configures the message 717 */ 718 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 719 checkClosed(); 
720 message.setConnection(connection); 721 } 722 723 /** 724 * Check if the session is closed. It is used for ensuring that the session 725 * is open before performing various operations. 726 * 727 * @throws IllegalStateException if the Session is closed 728 */ 729 protected void checkClosed() throws IllegalStateException { 730 if (closed) { 731 throw new IllegalStateException("The Session is closed"); 732 } 733 } 734 735 /** 736 * Checks if the session is closed. 737 * 738 * @return true if the session is closed, false otherwise. 739 */ 740 public boolean isClosed() { 741 return closed; 742 } 743 744 /** 745 * Stops message delivery in this session, and restarts message delivery 746 * with the oldest unacknowledged message. 747 * <P> 748 * All consumers deliver messages in a serial order. Acknowledging a 749 * received message automatically acknowledges all messages that have been 750 * delivered to the client. 751 * <P> 752 * Restarting a session causes it to take the following actions: 753 * <UL> 754 * <LI>Stop message delivery 755 * <LI>Mark all messages that might have been delivered but not 756 * acknowledged as "redelivered" 757 * <LI>Restart the delivery sequence including all unacknowledged messages 758 * that had been previously delivered. Redelivered messages do not have to 759 * be delivered in exactly their original delivery order. 760 * </UL> 761 * 762 * @throws JMSException if the JMS provider fails to stop and restart 763 * message delivery due to some internal error. 764 * @throws IllegalStateException if the method is called by a transacted 765 * session. 
766 */ 767 public void recover() throws JMSException { 768 769 checkClosed(); 770 if (getTransacted()) { 771 throw new IllegalStateException("This session is transacted"); 772 } 773 774 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 775 ActiveMQMessageConsumer c = iter.next(); 776 c.rollback(); 777 } 778 779 } 780 781 /** 782 * Returns the session's distinguished message listener (optional). 783 * 784 * @return the message listener associated with this session 785 * @throws JMSException if the JMS provider fails to get the message 786 * listener due to an internal error. 787 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 788 * @see javax.jms.ServerSessionPool 789 * @see javax.jms.ServerSession 790 */ 791 public MessageListener getMessageListener() throws JMSException { 792 checkClosed(); 793 return this.messageListener; 794 } 795 796 /** 797 * Sets the session's distinguished message listener (optional). 798 * <P> 799 * When the distinguished message listener is set, no other form of message 800 * receipt in the session can be used; however, all forms of sending 801 * messages are still supported. 802 * <P> 803 * This is an expert facility not used by regular JMS clients. 804 * 805 * @param listener the message listener to associate with this session 806 * @throws JMSException if the JMS provider fails to set the message 807 * listener due to an internal error. 808 * @see javax.jms.Session#getMessageListener() 809 * @see javax.jms.ServerSessionPool 810 * @see javax.jms.ServerSession 811 */ 812 public void setMessageListener(MessageListener listener) throws JMSException { 813 checkClosed(); 814 this.messageListener = listener; 815 816 if (listener != null) { 817 executor.setDispatchedBySessionPool(true); 818 } 819 } 820 821 /** 822 * Optional operation, intended to be used only by Application Servers, not 823 * by ordinary JMS clients. 
824 * 825 * @see javax.jms.ServerSession 826 */ 827 public void run() { 828 MessageDispatch messageDispatch; 829 while ((messageDispatch = executor.dequeueNoWait()) != null) { 830 final MessageDispatch md = messageDispatch; 831 ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 832 if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) { 833 // TODO: Ack it without delivery to client 834 continue; 835 } 836 837 if (isClientAcknowledge()||isIndividualAcknowledge()) { 838 message.setAcknowledgeCallback(new Callback() { 839 public void execute() throws Exception { 840 } 841 }); 842 } 843 844 if (deliveryListener != null) { 845 deliveryListener.beforeDelivery(this, message); 846 } 847 848 md.setDeliverySequenceId(getNextDeliveryId()); 849 850 try { 851 messageListener.onMessage(message); 852 } catch (RuntimeException e) { 853 LOG.error("error dispatching message: ", e); 854 // A problem while invoking the MessageListener does not 855 // in general indicate a problem with the connection to the broker, i.e. 856 // it will usually be sufficient to let the afterDelivery() method either 857 // commit or roll back in order to deal with the exception. 858 // However, we notify any registered client internal exception listener 859 // of the problem. 
860 connection.onClientInternalException(e); 861 } 862 863 try { 864 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 865 ack.setFirstMessageId(md.getMessage().getMessageId()); 866 doStartTransaction(); 867 ack.setTransactionId(getTransactionContext().getTransactionId()); 868 if (ack.getTransactionId() != null) { 869 getTransactionContext().addSynchronization(new Synchronization() { 870 871 @Override 872 public void afterRollback() throws Exception { 873 md.getMessage().onMessageRolledBack(); 874 // ensure we don't filter this as a duplicate 875 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); 876 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 877 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 878 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 879 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { 880 // We need to NACK the messages so that they get 881 // sent to the 882 // DLQ. 883 // Acknowledge the last message. 884 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 885 ack.setFirstMessageId(md.getMessage().getMessageId()); 886 asyncSendPacket(ack); 887 } else { 888 889 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); 890 ack.setFirstMessageId(md.getMessage().getMessageId()); 891 asyncSendPacket(ack); 892 893 // Figure out how long we should wait to resend 894 // this message. 
895 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 896 for (int i = 0; i < redeliveryCounter; i++) { 897 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 898 } 899 connection.getScheduler().executeAfterDelay(new Runnable() { 900 901 public void run() { 902 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); 903 } 904 }, redeliveryDelay); 905 } 906 } 907 }); 908 } 909 asyncSendPacket(ack); 910 } catch (Throwable e) { 911 connection.onClientInternalException(e); 912 } 913 914 if (deliveryListener != null) { 915 deliveryListener.afterDelivery(this, message); 916 } 917 } 918 } 919 920 /** 921 * Creates a <CODE>MessageProducer</CODE> to send messages to the 922 * specified destination. 923 * <P> 924 * A client uses a <CODE>MessageProducer</CODE> object to send messages to 925 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 926 * inherit from <CODE>Destination</CODE>, they can be used in the 927 * destination parameter to create a <CODE>MessageProducer</CODE> object. 928 * 929 * @param destination the <CODE>Destination</CODE> to send to, or null if 930 * this is a producer which does not have a specified 931 * destination. 932 * @return the MessageProducer 933 * @throws JMSException if the session fails to create a MessageProducer due 934 * to some internal error. 935 * @throws InvalidDestinationException if an invalid destination is 936 * specified. 
937 * @since 1.1 938 */ 939 public MessageProducer createProducer(Destination destination) throws JMSException { 940 checkClosed(); 941 if (destination instanceof CustomDestination) { 942 CustomDestination customDestination = (CustomDestination)destination; 943 return customDestination.createProducer(this); 944 } 945 int timeSendOut = connection.getSendTimeout(); 946 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 947 } 948 949 /** 950 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 951 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 952 * <CODE>Destination</CODE>, they can be used in the destination 953 * parameter to create a <CODE>MessageConsumer</CODE>. 954 * 955 * @param destination the <CODE>Destination</CODE> to access. 956 * @return the MessageConsumer 957 * @throws JMSException if the session fails to create a consumer due to 958 * some internal error. 959 * @throws InvalidDestinationException if an invalid destination is 960 * specified. 961 * @since 1.1 962 */ 963 public MessageConsumer createConsumer(Destination destination) throws JMSException { 964 return createConsumer(destination, (String) null); 965 } 966 967 /** 968 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 969 * using a message selector. Since <CODE> Queue</CODE> and 970 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 971 * can be used in the destination parameter to create a 972 * <CODE>MessageConsumer</CODE>. 973 * <P> 974 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 975 * that have been sent to a destination. 976 * 977 * @param destination the <CODE>Destination</CODE> to access 978 * @param messageSelector only messages with properties matching the message 979 * selector expression are delivered. 
A value of null or an 980 * empty string indicates that there is no message selector 981 * for the message consumer. 982 * @return the MessageConsumer 983 * @throws JMSException if the session fails to create a MessageConsumer due 984 * to some internal error. 985 * @throws InvalidDestinationException if an invalid destination is 986 * specified. 987 * @throws InvalidSelectorException if the message selector is invalid. 988 * @since 1.1 989 */ 990 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 991 return createConsumer(destination, messageSelector, false); 992 } 993 994 /** 995 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 996 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 997 * <CODE>Destination</CODE>, they can be used in the destination 998 * parameter to create a <CODE>MessageConsumer</CODE>. 999 * 1000 * @param destination the <CODE>Destination</CODE> to access. 1001 * @param messageListener the listener to use for async consumption of messages 1002 * @return the MessageConsumer 1003 * @throws JMSException if the session fails to create a consumer due to 1004 * some internal error. 1005 * @throws InvalidDestinationException if an invalid destination is 1006 * specified. 1007 * @since 1.1 1008 */ 1009 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1010 return createConsumer(destination, null, messageListener); 1011 } 1012 1013 /** 1014 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1015 * using a message selector. Since <CODE> Queue</CODE> and 1016 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1017 * can be used in the destination parameter to create a 1018 * <CODE>MessageConsumer</CODE>. 1019 * <P> 1020 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1021 * that have been sent to a destination. 
1022 * 1023 * @param destination the <CODE>Destination</CODE> to access 1024 * @param messageSelector only messages with properties matching the message 1025 * selector expression are delivered. A value of null or an 1026 * empty string indicates that there is no message selector 1027 * for the message consumer. 1028 * @param messageListener the listener to use for async consumption of messages 1029 * @return the MessageConsumer 1030 * @throws JMSException if the session fails to create a MessageConsumer due 1031 * to some internal error. 1032 * @throws InvalidDestinationException if an invalid destination is 1033 * specified. 1034 * @throws InvalidSelectorException if the message selector is invalid. 1035 * @since 1.1 1036 */ 1037 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1038 return createConsumer(destination, messageSelector, false, messageListener); 1039 } 1040 1041 /** 1042 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1043 * using a message selector. This method can specify whether messages 1044 * published by its own connection should be delivered to it, if the 1045 * destination is a topic. 1046 * <P> 1047 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1048 * <CODE>Destination</CODE>, they can be used in the destination 1049 * parameter to create a <CODE>MessageConsumer</CODE>. 1050 * <P> 1051 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1052 * that have been published to a destination. 1053 * <P> 1054 * In some cases, a connection may both publish and subscribe to a topic. 1055 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1056 * inhibit the delivery of messages published by its own connection. The 1057 * default value for this attribute is False. The <CODE>noLocal</CODE> 1058 * value must be supported by destinations that are topics. 
1059 * 1060 * @param destination the <CODE>Destination</CODE> to access 1061 * @param messageSelector only messages with properties matching the message 1062 * selector expression are delivered. A value of null or an 1063 * empty string indicates that there is no message selector 1064 * for the message consumer. 1065 * @param noLocal - if true, and the destination is a topic, inhibits the 1066 * delivery of messages published by its own connection. The 1067 * behavior for <CODE>NoLocal</CODE> is not specified if 1068 * the destination is a queue. 1069 * @return the MessageConsumer 1070 * @throws JMSException if the session fails to create a MessageConsumer due 1071 * to some internal error. 1072 * @throws InvalidDestinationException if an invalid destination is 1073 * specified. 1074 * @throws InvalidSelectorException if the message selector is invalid. 1075 * @since 1.1 1076 */ 1077 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1078 return createConsumer(destination, messageSelector, noLocal, null); 1079 } 1080 1081 /** 1082 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1083 * using a message selector. This method can specify whether messages 1084 * published by its own connection should be delivered to it, if the 1085 * destination is a topic. 1086 * <P> 1087 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1088 * <CODE>Destination</CODE>, they can be used in the destination 1089 * parameter to create a <CODE>MessageConsumer</CODE>. 1090 * <P> 1091 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1092 * that have been published to a destination. 1093 * <P> 1094 * In some cases, a connection may both publish and subscribe to a topic. 1095 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1096 * inhibit the delivery of messages published by its own connection. 
The 1097 * default value for this attribute is False. The <CODE>noLocal</CODE> 1098 * value must be supported by destinations that are topics. 1099 * 1100 * @param destination the <CODE>Destination</CODE> to access 1101 * @param messageSelector only messages with properties matching the message 1102 * selector expression are delivered. A value of null or an 1103 * empty string indicates that there is no message selector 1104 * for the message consumer. 1105 * @param noLocal - if true, and the destination is a topic, inhibits the 1106 * delivery of messages published by its own connection. The 1107 * behavior for <CODE>NoLocal</CODE> is not specified if 1108 * the destination is a queue. 1109 * @param messageListener the listener to use for async consumption of messages 1110 * @return the MessageConsumer 1111 * @throws JMSException if the session fails to create a MessageConsumer due 1112 * to some internal error. 1113 * @throws InvalidDestinationException if an invalid destination is 1114 * specified. 1115 * @throws InvalidSelectorException if the message selector is invalid. 
1116 * @since 1.1 1117 */ 1118 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1119 checkClosed(); 1120 1121 if (destination instanceof CustomDestination) { 1122 CustomDestination customDestination = (CustomDestination)destination; 1123 return customDestination.createConsumer(this, messageSelector, noLocal); 1124 } 1125 1126 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1127 int prefetch = 0; 1128 if (destination instanceof Topic) { 1129 prefetch = prefetchPolicy.getTopicPrefetch(); 1130 } else { 1131 prefetch = prefetchPolicy.getQueuePrefetch(); 1132 } 1133 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1134 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1135 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1136 } 1137 1138 /** 1139 * Creates a queue identity given a <CODE>Queue</CODE> name. 1140 * <P> 1141 * This facility is provided for the rare cases where clients need to 1142 * dynamically manipulate queue identity. It allows the creation of a queue 1143 * identity with a provider-specific name. Clients that depend on this 1144 * ability are not portable. 1145 * <P> 1146 * Note that this method is not for creating the physical queue. The 1147 * physical creation of queues is an administrative task and is not to be 1148 * initiated by the JMS API. The one exception is the creation of temporary 1149 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1150 * method. 1151 * 1152 * @param queueName the name of this <CODE>Queue</CODE> 1153 * @return a <CODE>Queue</CODE> with the given name 1154 * @throws JMSException if the session fails to create a queue due to some 1155 * internal error. 
1156 * @since 1.1 1157 */ 1158 public Queue createQueue(String queueName) throws JMSException { 1159 checkClosed(); 1160 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1161 return new ActiveMQTempQueue(queueName); 1162 } 1163 return new ActiveMQQueue(queueName); 1164 } 1165 1166 /** 1167 * Creates a topic identity given a <CODE>Topic</CODE> name. 1168 * <P> 1169 * This facility is provided for the rare cases where clients need to 1170 * dynamically manipulate topic identity. This allows the creation of a 1171 * topic identity with a provider-specific name. Clients that depend on this 1172 * ability are not portable. 1173 * <P> 1174 * Note that this method is not for creating the physical topic. The 1175 * physical creation of topics is an administrative task and is not to be 1176 * initiated by the JMS API. The one exception is the creation of temporary 1177 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1178 * method. 1179 * 1180 * @param topicName the name of this <CODE>Topic</CODE> 1181 * @return a <CODE>Topic</CODE> with the given name 1182 * @throws JMSException if the session fails to create a topic due to some 1183 * internal error. 1184 * @since 1.1 1185 */ 1186 public Topic createTopic(String topicName) throws JMSException { 1187 checkClosed(); 1188 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1189 return new ActiveMQTempTopic(topicName); 1190 } 1191 return new ActiveMQTopic(topicName); 1192 } 1193 1194 /** 1195 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1196 * the specified queue. 1197 * 1198 * @param queue the <CODE>queue</CODE> to access 1199 * @exception InvalidDestinationException if an invalid destination is 1200 * specified 1201 * @since 1.1 1202 */ 1203 /** 1204 * Creates a durable subscriber to the specified topic. 
1205 * <P> 1206 * If a client needs to receive all the messages published on a topic, 1207 * including the ones published while the subscriber is inactive, it uses a 1208 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1209 * record of this durable subscription and insures that all messages from 1210 * the topic's publishers are retained until they are acknowledged by this 1211 * durable subscriber or they have expired. 1212 * <P> 1213 * Sessions with durable subscribers must always provide the same client 1214 * identifier. In addition, each client must specify a name that uniquely 1215 * identifies (within client identifier) each durable subscription it 1216 * creates. Only one session at a time can have a 1217 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1218 * <P> 1219 * A client can change an existing durable subscription by creating a 1220 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1221 * and/or message selector. Changing a durable subscriber is equivalent to 1222 * unsubscribing (deleting) the old one and creating a new one. 1223 * <P> 1224 * In some cases, a connection may both publish and subscribe to a topic. 1225 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1226 * inhibit the delivery of messages published by its own connection. The 1227 * default value for this attribute is false. 1228 * 1229 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1230 * @param name the name used to identify this subscription 1231 * @return the TopicSubscriber 1232 * @throws JMSException if the session fails to create a subscriber due to 1233 * some internal error. 1234 * @throws InvalidDestinationException if an invalid topic is specified. 
1235 * @since 1.1 1236 */ 1237 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1238 checkClosed(); 1239 return createDurableSubscriber(topic, name, null, false); 1240 } 1241 1242 /** 1243 * Creates a durable subscriber to the specified topic, using a message 1244 * selector and specifying whether messages published by its own connection 1245 * should be delivered to it. 1246 * <P> 1247 * If a client needs to receive all the messages published on a topic, 1248 * including the ones published while the subscriber is inactive, it uses a 1249 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1250 * record of this durable subscription and insures that all messages from 1251 * the topic's publishers are retained until they are acknowledged by this 1252 * durable subscriber or they have expired. 1253 * <P> 1254 * Sessions with durable subscribers must always provide the same client 1255 * identifier. In addition, each client must specify a name which uniquely 1256 * identifies (within client identifier) each durable subscription it 1257 * creates. Only one session at a time can have a 1258 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1259 * inactive durable subscriber is one that exists but does not currently 1260 * have a message consumer associated with it. 1261 * <P> 1262 * A client can change an existing durable subscription by creating a 1263 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1264 * and/or message selector. Changing a durable subscriber is equivalent to 1265 * unsubscribing (deleting) the old one and creating a new one. 1266 * 1267 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1268 * @param name the name used to identify this subscription 1269 * @param messageSelector only messages with properties matching the message 1270 * selector expression are delivered. 
A value of null or an 1271 * empty string indicates that there is no message selector 1272 * for the message consumer. 1273 * @param noLocal if set, inhibits the delivery of messages published by its 1274 * own connection 1275 * @return the Queue Browser 1276 * @throws JMSException if the session fails to create a subscriber due to 1277 * some internal error. 1278 * @throws InvalidDestinationException if an invalid topic is specified. 1279 * @throws InvalidSelectorException if the message selector is invalid. 1280 * @since 1.1 1281 */ 1282 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1283 checkClosed(); 1284 1285 if (isIndividualAcknowledge()) { 1286 throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+ 1287 "INDIVIDUAL_ACKNOWLEDGE mode.", null); 1288 } 1289 1290 if (topic instanceof CustomDestination) { 1291 CustomDestination customDestination = (CustomDestination)topic; 1292 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1293 } 1294 1295 connection.checkClientIDWasManuallySpecified(); 1296 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1297 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1298 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1299 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1300 noLocal, false, asyncDispatch); 1301 } 1302 1303 /** 1304 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1305 * the specified queue. 
1306 * 1307 * @param queue the <CODE>queue</CODE> to access 1308 * @return the Queue Browser 1309 * @throws JMSException if the session fails to create a browser due to some 1310 * internal error. 1311 * @throws InvalidDestinationException if an invalid destination is 1312 * specified 1313 * @since 1.1 1314 */ 1315 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1316 checkClosed(); 1317 return createBrowser(queue, null); 1318 } 1319 1320 /** 1321 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1322 * the specified queue using a message selector. 1323 * 1324 * @param queue the <CODE>queue</CODE> to access 1325 * @param messageSelector only messages with properties matching the message 1326 * selector expression are delivered. A value of null or an 1327 * empty string indicates that there is no message selector 1328 * for the message consumer. 1329 * @return the Queue Browser 1330 * @throws JMSException if the session fails to create a browser due to some 1331 * internal error. 1332 * @throws InvalidDestinationException if an invalid destination is 1333 * specified 1334 * @throws InvalidSelectorException if the message selector is invalid. 1335 * @since 1.1 1336 */ 1337 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1338 checkClosed(); 1339 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1340 } 1341 1342 /** 1343 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1344 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1345 * 1346 * @return a temporary queue identity 1347 * @throws JMSException if the session fails to create a temporary queue due 1348 * to some internal error. 
1349 * @since 1.1 1350 */ 1351 public TemporaryQueue createTemporaryQueue() throws JMSException { 1352 checkClosed(); 1353 return (TemporaryQueue)connection.createTempDestination(false); 1354 } 1355 1356 /** 1357 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1358 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1359 * 1360 * @return a temporary topic identity 1361 * @throws JMSException if the session fails to create a temporary topic due 1362 * to some internal error. 1363 * @since 1.1 1364 */ 1365 public TemporaryTopic createTemporaryTopic() throws JMSException { 1366 checkClosed(); 1367 return (TemporaryTopic)connection.createTempDestination(true); 1368 } 1369 1370 /** 1371 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1372 * the specified queue. 1373 * 1374 * @param queue the <CODE>Queue</CODE> to access 1375 * @return 1376 * @throws JMSException if the session fails to create a receiver due to 1377 * some internal error. 1378 * @throws JMSException 1379 * @throws InvalidDestinationException if an invalid queue is specified. 1380 */ 1381 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1382 checkClosed(); 1383 return createReceiver(queue, null); 1384 } 1385 1386 /** 1387 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1388 * the specified queue using a message selector. 1389 * 1390 * @param queue the <CODE>Queue</CODE> to access 1391 * @param messageSelector only messages with properties matching the message 1392 * selector expression are delivered. A value of null or an 1393 * empty string indicates that there is no message selector 1394 * for the message consumer. 1395 * @return QueueReceiver 1396 * @throws JMSException if the session fails to create a receiver due to 1397 * some internal error. 1398 * @throws InvalidDestinationException if an invalid queue is specified. 1399 * @throws InvalidSelectorException if the message selector is invalid. 
1400 */ 1401 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1402 checkClosed(); 1403 1404 if (queue instanceof CustomDestination) { 1405 CustomDestination customDestination = (CustomDestination)queue; 1406 return customDestination.createReceiver(this, messageSelector); 1407 } 1408 1409 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1410 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1411 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1412 } 1413 1414 /** 1415 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1416 * specified queue. 1417 * 1418 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1419 * unidentified producer 1420 * @return QueueSender 1421 * @throws JMSException if the session fails to create a sender due to some 1422 * internal error. 1423 * @throws InvalidDestinationException if an invalid queue is specified. 1424 */ 1425 public QueueSender createSender(Queue queue) throws JMSException { 1426 checkClosed(); 1427 if (queue instanceof CustomDestination) { 1428 CustomDestination customDestination = (CustomDestination)queue; 1429 return customDestination.createSender(this); 1430 } 1431 int timeSendOut = connection.getSendTimeout(); 1432 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1433 } 1434 1435 /** 1436 * Creates a nondurable subscriber to the specified topic. <p/> 1437 * <P> 1438 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1439 * that have been published to a topic. <p/> 1440 * <P> 1441 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1442 * receive only messages that are published while they are active. 
<p/> 1443 * <P> 1444 * In some cases, a connection may both publish and subscribe to a topic. 1445 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1446 * inhibit the delivery of messages published by its own connection. The 1447 * default value for this attribute is false. 1448 * 1449 * @param topic the <CODE>Topic</CODE> to subscribe to 1450 * @return TopicSubscriber 1451 * @throws JMSException if the session fails to create a subscriber due to 1452 * some internal error. 1453 * @throws InvalidDestinationException if an invalid topic is specified. 1454 */ 1455 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1456 checkClosed(); 1457 return createSubscriber(topic, null, false); 1458 } 1459 1460 /** 1461 * Creates a nondurable subscriber to the specified topic, using a message 1462 * selector or specifying whether messages published by its own connection 1463 * should be delivered to it. <p/> 1464 * <P> 1465 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1466 * that have been published to a topic. <p/> 1467 * <P> 1468 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1469 * receive only messages that are published while they are active. <p/> 1470 * <P> 1471 * Messages filtered out by a subscriber's message selector will never be 1472 * delivered to the subscriber. From the subscriber's perspective, they do 1473 * not exist. <p/> 1474 * <P> 1475 * In some cases, a connection may both publish and subscribe to a topic. 1476 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1477 * inhibit the delivery of messages published by its own connection. The 1478 * default value for this attribute is false. 1479 * 1480 * @param topic the <CODE>Topic</CODE> to subscribe to 1481 * @param messageSelector only messages with properties matching the message 1482 * selector expression are delivered. 
A value of null or an 1483 * empty string indicates that there is no message selector 1484 * for the message consumer. 1485 * @param noLocal if set, inhibits the delivery of messages published by its 1486 * own connection 1487 * @return TopicSubscriber 1488 * @throws JMSException if the session fails to create a subscriber due to 1489 * some internal error. 1490 * @throws InvalidDestinationException if an invalid topic is specified. 1491 * @throws InvalidSelectorException if the message selector is invalid. 1492 */ 1493 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1494 checkClosed(); 1495 1496 if (topic instanceof CustomDestination) { 1497 CustomDestination customDestination = (CustomDestination)topic; 1498 return customDestination.createSubscriber(this, messageSelector, noLocal); 1499 } 1500 1501 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1502 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1503 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1504 } 1505 1506 /** 1507 * Creates a publisher for the specified topic. <p/> 1508 * <P> 1509 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1510 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1511 * a topic, it defines a new sequence of messages that have no ordering 1512 * relationship with the messages it has previously sent. 1513 * 1514 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1515 * an unidentified producer 1516 * @return TopicPublisher 1517 * @throws JMSException if the session fails to create a publisher due to 1518 * some internal error. 1519 * @throws InvalidDestinationException if an invalid topic is specified. 
1520 */ 1521 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1522 checkClosed(); 1523 1524 if (topic instanceof CustomDestination) { 1525 CustomDestination customDestination = (CustomDestination)topic; 1526 return customDestination.createPublisher(this); 1527 } 1528 int timeSendOut = connection.getSendTimeout(); 1529 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1530 } 1531 1532 /** 1533 * Unsubscribes a durable subscription that has been created by a client. 1534 * <P> 1535 * This method deletes the state being maintained on behalf of the 1536 * subscriber by its provider. 1537 * <P> 1538 * It is erroneous for a client to delete a durable subscription while there 1539 * is an active <CODE>MessageConsumer </CODE> or 1540 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1541 * message is part of a pending transaction or has not been acknowledged in 1542 * the session. 1543 * 1544 * @param name the name used to identify this subscription 1545 * @throws JMSException if the session fails to unsubscribe to the durable 1546 * subscription due to some internal error. 1547 * @throws InvalidDestinationException if an invalid subscription name is 1548 * specified. 1549 * @since 1.1 1550 */ 1551 public void unsubscribe(String name) throws JMSException { 1552 checkClosed(); 1553 connection.unsubscribe(name); 1554 } 1555 1556 public void dispatch(MessageDispatch messageDispatch) { 1557 try { 1558 executor.execute(messageDispatch); 1559 } catch (InterruptedException e) { 1560 Thread.currentThread().interrupt(); 1561 connection.onClientInternalException(e); 1562 } 1563 } 1564 1565 /** 1566 * Acknowledges all consumed messages of the session of this consumed 1567 * message. 
1568 * <P> 1569 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1570 * for use when a client has specified that its JMS session's consumed 1571 * messages are to be explicitly acknowledged. By invoking 1572 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1573 * all messages consumed by the session that the message was delivered to. 1574 * <P> 1575 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1576 * sessions and sessions specified to use implicit acknowledgement modes. 1577 * <P> 1578 * A client may individually acknowledge each message as it is consumed, or 1579 * it may choose to acknowledge messages as an application-defined group 1580 * (which is done by calling acknowledge on the last received message of the 1581 * group, thereby acknowledging all messages consumed by the session.) 1582 * <P> 1583 * Messages that have been received but not acknowledged may be redelivered. 1584 * 1585 * @throws JMSException if the JMS provider fails to acknowledge the 1586 * messages due to some internal error. 1587 * @throws javax.jms.IllegalStateException if this method is called on a 1588 * closed session. 1589 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1590 */ 1591 public void acknowledge() throws JMSException { 1592 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1593 ActiveMQMessageConsumer c = iter.next(); 1594 c.acknowledge(); 1595 } 1596 } 1597 1598 /** 1599 * Add a message consumer. 1600 * 1601 * @param consumer - message consumer. 1602 * @throws JMSException 1603 */ 1604 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1605 this.consumers.add(consumer); 1606 if (consumer.isDurableSubscriber()) { 1607 stats.onCreateDurableSubscriber(); 1608 } 1609 this.connection.addDispatcher(consumer.getConsumerId(), this); 1610 } 1611 1612 /** 1613 * Remove the message consumer. 1614 * 1615 * @param consumer - consumer to be removed. 
1616 * @throws JMSException 1617 */ 1618 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1619 this.connection.removeDispatcher(consumer.getConsumerId()); 1620 if (consumer.isDurableSubscriber()) { 1621 stats.onRemoveDurableSubscriber(); 1622 } 1623 this.consumers.remove(consumer); 1624 this.connection.removeDispatcher(consumer); 1625 } 1626 1627 /** 1628 * Adds a message producer. 1629 * 1630 * @param producer - message producer to be added. 1631 * @throws JMSException 1632 */ 1633 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1634 this.producers.add(producer); 1635 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1636 } 1637 1638 /** 1639 * Removes a message producer. 1640 * 1641 * @param producer - message producer to be removed. 1642 * @throws JMSException 1643 */ 1644 protected void removeProducer(ActiveMQMessageProducer producer) { 1645 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1646 this.producers.remove(producer); 1647 } 1648 1649 /** 1650 * Start this Session. 1651 * 1652 * @throws JMSException 1653 */ 1654 protected void start() throws JMSException { 1655 started.set(true); 1656 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1657 ActiveMQMessageConsumer c = iter.next(); 1658 c.start(); 1659 } 1660 executor.start(); 1661 } 1662 1663 /** 1664 * Stops this session. 1665 * 1666 * @throws JMSException 1667 */ 1668 protected void stop() throws JMSException { 1669 1670 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1671 ActiveMQMessageConsumer c = iter.next(); 1672 c.stop(); 1673 } 1674 1675 started.set(false); 1676 executor.stop(); 1677 } 1678 1679 /** 1680 * Returns the session id. 1681 * 1682 * @return value - session id. 
1683 */ 1684 protected SessionId getSessionId() { 1685 return info.getSessionId(); 1686 } 1687 1688 /** 1689 * @return 1690 */ 1691 protected ConsumerId getNextConsumerId() { 1692 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1693 } 1694 1695 /** 1696 * @return 1697 */ 1698 protected ProducerId getNextProducerId() { 1699 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1700 } 1701 1702 /** 1703 * Sends the message for dispatch by the broker. 1704 * 1705 * 1706 * @param producer - message producer. 1707 * @param destination - message destination. 1708 * @param message - message to be sent. 1709 * @param deliveryMode - JMS messsage delivery mode. 1710 * @param priority - message priority. 1711 * @param timeToLive - message expiration. 1712 * @param producerWindow 1713 * @param onComplete 1714 * @throws JMSException 1715 */ 1716 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1717 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1718 1719 checkClosed(); 1720 if (destination.isTemporary() && connection.isDeleted(destination)) { 1721 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1722 } 1723 synchronized (sendMutex) { 1724 // tell the Broker we are about to start a new transaction 1725 doStartTransaction(); 1726 TransactionId txid = transactionContext.getTransactionId(); 1727 long sequenceNumber = producer.getMessageSequence(); 1728 1729 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1730 message.setJMSDeliveryMode(deliveryMode); 1731 long expiration = 0L; 1732 if (!producer.getDisableMessageTimestamp()) { 1733 long timeStamp = System.currentTimeMillis(); 1734 message.setJMSTimestamp(timeStamp); 1735 if (timeToLive > 0) { 1736 expiration = timeToLive + timeStamp; 
1737 } 1738 } 1739 message.setJMSExpiration(expiration); 1740 message.setJMSPriority(priority); 1741 message.setJMSRedelivered(false); 1742 1743 // transform to our own message format here 1744 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1745 1746 // Set the message id. 1747 if (msg == message) { 1748 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1749 } else { 1750 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1751 message.setJMSMessageID(msg.getMessageId().toString()); 1752 } 1753 //clear the brokerPath in case we are re-sending this message 1754 msg.setBrokerPath(null); 1755 // destination format is provider specific so only set on transformed message 1756 msg.setJMSDestination(destination); 1757 1758 msg.setTransactionId(txid); 1759 if (connection.isCopyMessageOnSend()) { 1760 msg = (ActiveMQMessage)msg.copy(); 1761 } 1762 msg.setConnection(connection); 1763 msg.onSend(); 1764 msg.setProducerId(msg.getMessageId().getProducerId()); 1765 if (LOG.isTraceEnabled()) { 1766 LOG.trace(getSessionId() + " sending message: " + msg); 1767 } 1768 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1769 this.connection.asyncSendPacket(msg); 1770 if (producerWindow != null) { 1771 // Since we defer lots of the marshaling till we hit the 1772 // wire, this might not 1773 // provide and accurate size. We may change over to doing 1774 // more aggressive marshaling, 1775 // to get more accurate sizes.. this is more important once 1776 // users start using producer window 1777 // flow control. 
1778 int size = msg.getSize(); 1779 producerWindow.increaseUsage(size); 1780 } 1781 } else { 1782 if (sendTimeout > 0 && onComplete==null) { 1783 this.connection.syncSendPacket(msg,sendTimeout); 1784 }else { 1785 this.connection.syncSendPacket(msg, onComplete); 1786 } 1787 } 1788 1789 } 1790 } 1791 1792 /** 1793 * Send TransactionInfo to indicate transaction has started 1794 * 1795 * @throws JMSException if some internal error occurs 1796 */ 1797 protected void doStartTransaction() throws JMSException { 1798 if (getTransacted() && !transactionContext.isInXATransaction()) { 1799 transactionContext.begin(); 1800 } 1801 } 1802 1803 /** 1804 * Checks whether the session has unconsumed messages. 1805 * 1806 * @return true - if there are unconsumed messages. 1807 */ 1808 public boolean hasUncomsumedMessages() { 1809 return executor.hasUncomsumedMessages(); 1810 } 1811 1812 /** 1813 * Checks whether the session uses transactions. 1814 * 1815 * @return true - if the session uses transactions. 1816 */ 1817 public boolean isTransacted() { 1818 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 1819 } 1820 1821 /** 1822 * Checks whether the session used client acknowledgment. 1823 * 1824 * @return true - if the session uses client acknowledgment. 1825 */ 1826 protected boolean isClientAcknowledge() { 1827 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 1828 } 1829 1830 /** 1831 * Checks whether the session used auto acknowledgment. 1832 * 1833 * @return true - if the session uses client acknowledgment. 1834 */ 1835 public boolean isAutoAcknowledge() { 1836 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 1837 } 1838 1839 /** 1840 * Checks whether the session used dup ok acknowledgment. 1841 * 1842 * @return true - if the session uses client acknowledgment. 
1843 */ 1844 public boolean isDupsOkAcknowledge() { 1845 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 1846 } 1847 1848 public boolean isIndividualAcknowledge(){ 1849 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 1850 } 1851 1852 /** 1853 * Returns the message delivery listener. 1854 * 1855 * @return deliveryListener - message delivery listener. 1856 */ 1857 public DeliveryListener getDeliveryListener() { 1858 return deliveryListener; 1859 } 1860 1861 /** 1862 * Sets the message delivery listener. 1863 * 1864 * @param deliveryListener - message delivery listener. 1865 */ 1866 public void setDeliveryListener(DeliveryListener deliveryListener) { 1867 this.deliveryListener = deliveryListener; 1868 } 1869 1870 /** 1871 * Returns the SessionInfo bean. 1872 * 1873 * @return info - SessionInfo bean. 1874 * @throws JMSException 1875 */ 1876 protected SessionInfo getSessionInfo() throws JMSException { 1877 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 1878 return info; 1879 } 1880 1881 /** 1882 * Send the asynchronus command. 1883 * 1884 * @param command - command to be executed. 1885 * @throws JMSException 1886 */ 1887 public void asyncSendPacket(Command command) throws JMSException { 1888 connection.asyncSendPacket(command); 1889 } 1890 1891 /** 1892 * Send the synchronus command. 1893 * 1894 * @param command - command to be executed. 
1895 * @return Response 1896 * @throws JMSException 1897 */ 1898 public Response syncSendPacket(Command command) throws JMSException { 1899 return connection.syncSendPacket(command); 1900 } 1901 1902 public long getNextDeliveryId() { 1903 return deliveryIdGenerator.getNextSequenceId(); 1904 } 1905 1906 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 1907 1908 List<MessageDispatch> c = unconsumedMessages.removeAll(); 1909 for (MessageDispatch md : c) { 1910 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 1911 } 1912 Collections.reverse(c); 1913 1914 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 1915 MessageDispatch md = iter.next(); 1916 executor.executeFirst(md); 1917 } 1918 1919 } 1920 1921 public boolean isRunning() { 1922 return started.get(); 1923 } 1924 1925 public boolean isAsyncDispatch() { 1926 return asyncDispatch; 1927 } 1928 1929 public void setAsyncDispatch(boolean asyncDispatch) { 1930 this.asyncDispatch = asyncDispatch; 1931 } 1932 1933 /** 1934 * @return Returns the sessionAsyncDispatch. 1935 */ 1936 public boolean isSessionAsyncDispatch() { 1937 return sessionAsyncDispatch; 1938 } 1939 1940 /** 1941 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 
1942 */ 1943 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 1944 this.sessionAsyncDispatch = sessionAsyncDispatch; 1945 } 1946 1947 public MessageTransformer getTransformer() { 1948 return transformer; 1949 } 1950 1951 public ActiveMQConnection getConnection() { 1952 return connection; 1953 } 1954 1955 /** 1956 * Sets the transformer used to transform messages before they are sent on 1957 * to the JMS bus or when they are received from the bus but before they are 1958 * delivered to the JMS client 1959 */ 1960 public void setTransformer(MessageTransformer transformer) { 1961 this.transformer = transformer; 1962 } 1963 1964 public BlobTransferPolicy getBlobTransferPolicy() { 1965 return blobTransferPolicy; 1966 } 1967 1968 /** 1969 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1970 * OBjects) are transferred from producers to brokers to consumers 1971 */ 1972 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1973 this.blobTransferPolicy = blobTransferPolicy; 1974 } 1975 1976 public List<MessageDispatch> getUnconsumedMessages() { 1977 return executor.getUnconsumedMessages(); 1978 } 1979 1980 @Override 1981 public String toString() { 1982 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}"; 1983 } 1984 1985 public void checkMessageListener() throws JMSException { 1986 if (messageListener != null) { 1987 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 1988 } 1989 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 1990 ActiveMQMessageConsumer consumer = i.next(); 1991 if (consumer.getMessageListener() != null) { 1992 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 1993 } 1994 } 1995 } 1996 1997 protected void setOptimizeAcknowledge(boolean value) { 1998 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); 
iter.hasNext();) { 1999 ActiveMQMessageConsumer c = iter.next(); 2000 c.setOptimizeAcknowledge(value); 2001 } 2002 } 2003 2004 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2005 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2006 ActiveMQMessageConsumer c = iter.next(); 2007 if (c.getConsumerId().equals(id)) { 2008 c.setPrefetchSize(prefetch); 2009 break; 2010 } 2011 } 2012 } 2013 2014 protected void close(ConsumerId id) { 2015 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2016 ActiveMQMessageConsumer c = iter.next(); 2017 if (c.getConsumerId().equals(id)) { 2018 try { 2019 c.close(); 2020 } catch (JMSException e) { 2021 LOG.warn("Exception closing consumer", e); 2022 } 2023 LOG.warn("Closed consumer on Command"); 2024 break; 2025 } 2026 } 2027 } 2028 2029 public boolean isInUse(ActiveMQTempDestination destination) { 2030 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2031 ActiveMQMessageConsumer c = iter.next(); 2032 if (c.isInUse(destination)) { 2033 return true; 2034 } 2035 } 2036 return false; 2037 } 2038 2039 /** 2040 * highest sequence id of the last message delivered by this session. 
* Passed to the broker in the close command, maintained by dispose()
     * @return lastDeliveredSequenceId
     */
    public long getLastDeliveredSequenceId() {
        return lastDeliveredSequenceId;
    }

    /**
     * Sends an acknowledgement without requesting lazy delivery; equivalent
     * to <CODE>sendAck(ack, false)</CODE>.
     *
     * @param ack the acknowledgement to send
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack) throws JMSException {
        sendAck(ack, false);
    }

    /**
     * Sends an acknowledgement, asynchronously or synchronously.
     * <P>
     * The asynchronous path is taken when <CODE>lazy</CODE> is set, when the
     * connection is configured to send acks asynchronously, or when the
     * session is transacted; otherwise the ack is sent synchronously.
     *
     * @param ack the acknowledgement to send
     * @param lazy if true, always send asynchronously
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
        final boolean sendAsync = lazy || connection.isSendAcksAsync() || getTransacted();
        if (sendAsync) {
            asyncSendPacket(ack);
        } else {
            syncSendPacket(ack);
        }
    }

    /**
     * @return the scheduler obtained from the owning connection.
     * @throws JMSException
     */
    protected Scheduler getScheduler() throws JMSException {
        return connection.getScheduler();
    }

    /**
     * @return the connection executor held by this session.
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        return connectionExecutor;
    }
}