001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.File;
020    import java.io.InputStream;
021    import java.io.Serializable;
022    import java.net.URL;
023    import java.util.Collections;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.ThreadPoolExecutor;
028    import java.util.concurrent.atomic.AtomicBoolean;
029    
030    import javax.jms.BytesMessage;
031    import javax.jms.Destination;
032    import javax.jms.IllegalStateException;
033    import javax.jms.InvalidDestinationException;
034    import javax.jms.InvalidSelectorException;
035    import javax.jms.JMSException;
036    import javax.jms.MapMessage;
037    import javax.jms.Message;
038    import javax.jms.MessageConsumer;
039    import javax.jms.MessageListener;
040    import javax.jms.MessageProducer;
041    import javax.jms.ObjectMessage;
042    import javax.jms.Queue;
043    import javax.jms.QueueBrowser;
044    import javax.jms.QueueReceiver;
045    import javax.jms.QueueSender;
046    import javax.jms.QueueSession;
047    import javax.jms.Session;
048    import javax.jms.StreamMessage;
049    import javax.jms.TemporaryQueue;
050    import javax.jms.TemporaryTopic;
051    import javax.jms.TextMessage;
052    import javax.jms.Topic;
053    import javax.jms.TopicPublisher;
054    import javax.jms.TopicSession;
055    import javax.jms.TopicSubscriber;
056    import javax.jms.TransactionRolledBackException;
057    
058    import org.apache.activemq.blob.BlobDownloader;
059    import org.apache.activemq.blob.BlobTransferPolicy;
060    import org.apache.activemq.blob.BlobUploader;
061    import org.apache.activemq.command.ActiveMQBlobMessage;
062    import org.apache.activemq.command.ActiveMQBytesMessage;
063    import org.apache.activemq.command.ActiveMQDestination;
064    import org.apache.activemq.command.ActiveMQMapMessage;
065    import org.apache.activemq.command.ActiveMQMessage;
066    import org.apache.activemq.command.ActiveMQObjectMessage;
067    import org.apache.activemq.command.ActiveMQQueue;
068    import org.apache.activemq.command.ActiveMQStreamMessage;
069    import org.apache.activemq.command.ActiveMQTempDestination;
070    import org.apache.activemq.command.ActiveMQTempQueue;
071    import org.apache.activemq.command.ActiveMQTempTopic;
072    import org.apache.activemq.command.ActiveMQTextMessage;
073    import org.apache.activemq.command.ActiveMQTopic;
074    import org.apache.activemq.command.Command;
075    import org.apache.activemq.command.ConsumerId;
076    import org.apache.activemq.command.MessageAck;
077    import org.apache.activemq.command.MessageDispatch;
078    import org.apache.activemq.command.MessageId;
079    import org.apache.activemq.command.ProducerId;
080    import org.apache.activemq.command.RemoveInfo;
081    import org.apache.activemq.command.Response;
082    import org.apache.activemq.command.SessionId;
083    import org.apache.activemq.command.SessionInfo;
084    import org.apache.activemq.command.TransactionId;
085    import org.apache.activemq.management.JMSSessionStatsImpl;
086    import org.apache.activemq.management.StatsCapable;
087    import org.apache.activemq.management.StatsImpl;
088    import org.apache.activemq.thread.Scheduler;
089    import org.apache.activemq.transaction.Synchronization;
090    import org.apache.activemq.usage.MemoryUsage;
091    import org.apache.activemq.util.Callback;
092    import org.apache.activemq.util.JMSExceptionSupport;
093    import org.apache.activemq.util.LongSequenceGenerator;
094    import org.slf4j.Logger;
095    import org.slf4j.LoggerFactory;
096    
097    /**
098     * <P>
099     * A <CODE>Session</CODE> object is a single-threaded context for producing
100     * and consuming messages. Although it may allocate provider resources outside
101     * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102     * <P>
103     * A session serves several purposes:
104     * <UL>
105     * <LI>It is a factory for its message producers and consumers.
106     * <LI>It supplies provider-optimized message factories.
107     * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108     * <CODE>TemporaryQueues</CODE>.
109     * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110     * objects for those clients that need to dynamically manipulate
111     * provider-specific destination names.
112     * <LI>It supports a single series of transactions that combine work spanning
113     * its producers and consumers into atomic units.
114     * <LI>It defines a serial order for the messages it consumes and the messages
115     * it produces.
116     * <LI>It retains messages it consumes until they have been acknowledged.
117     * <LI>It serializes execution of message listeners registered with its message
118     * consumers.
119     * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120     * </UL>
121     * <P>
122     * A session can create and service multiple message producers and consumers.
123     * <P>
124     * One typical use is to have a thread block on a synchronous
125     * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
 * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
127     * <P>
128     * If a client desires to have one thread produce messages while others consume
129     * them, the client should use a separate session for its producing thread.
130     * <P>
131     * Once a connection has been started, any session with one or more registered
132     * message listeners is dedicated to the thread of control that delivers
133     * messages to it. It is erroneous for client code to use this session or any of
134     * its constituent objects from another thread of control. The only exception to
135     * this rule is the use of the session or connection <CODE>close</CODE>
136     * method.
137     * <P>
138     * It should be easy for most clients to partition their work naturally into
139     * sessions. This model allows clients to start simply and incrementally add
140     * message processing complexity as their need for concurrency grows.
141     * <P>
142     * The <CODE>close</CODE> method is the only session method that can be called
143     * while some other session method is being executed in another thread.
144     * <P>
145     * A session may be specified as transacted. Each transacted session supports a
146     * single series of transactions. Each transaction groups a set of message sends
147     * and a set of message receives into an atomic unit of work. In effect,
148     * transactions organize a session's input message stream and output message
149     * stream into series of atomic units. When a transaction commits, its atomic
150     * unit of input is acknowledged and its associated atomic unit of output is
151     * sent. If a transaction rollback is done, the transaction's sent messages are
152     * destroyed and the session's input is automatically recovered.
153     * <P>
154     * The content of a transaction's input and output units is simply those
155     * messages that have been produced and consumed within the session's current
156     * transaction.
157     * <P>
158     * A transaction is completed using either its session's <CODE>commit</CODE>
 * method or its session's <CODE>rollback</CODE> method. The completion of a
160     * session's current transaction automatically begins the next. The result is
161     * that a transacted session always has a current transaction within which its
162     * work is done.
163     * <P>
164     * The Java Transaction Service (JTS) or some other transaction monitor may be
165     * used to combine a session's transaction with transactions on other resources
166     * (databases, other JMS sessions, etc.). Since Java distributed transactions
167     * are controlled via the Java Transaction API (JTA), use of the session's
168     * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169     * prohibited.
170     * <P>
171     * The JMS API does not require support for JTA; however, it does define how a
172     * provider supplies this support.
173     * <P>
174     * Although it is also possible for a JMS client to handle distributed
175     * transactions directly, it is unlikely that many JMS clients will do this.
176     * Support for JTA in the JMS API is targeted at systems vendors who will be
177     * integrating the JMS API into their application server products.
178     *
179     *
180     * @see javax.jms.Session
181     * @see javax.jms.QueueSession
182     * @see javax.jms.TopicSession
183     * @see javax.jms.XASession
184     */
185    public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186    
    /**
     * Acknowledgement mode in which only the individual message is
     * acknowledged when message.acknowledge() is called, as opposed to
     * CLIENT_ACKNOWLEDGE, which acknowledges all messages consumed by the
     * session at the time acknowledge() is called.
     */
    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    /**
     * Same value as {@link #INDIVIDUAL_ACKNOWLEDGE}. Presumably the upper
     * bound of the accepted acknowledgement-mode constants - TODO confirm
     * against the code that validates acknowledgement modes.
     */
    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195    
196        public static interface DeliveryListener {
197            void beforeDelivery(ActiveMQSession session, Message msg);
198    
199            void afterDelivery(ActiveMQSession session, Message msg);
200        }
201    
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
    // Executor obtained from the owning connection when the session is constructed.
    private final ThreadPoolExecutor connectionExecutor;

    // Acknowledgement mode handed to the constructor
    // (Session.SESSION_TRANSACTED for a transacted session).
    protected int acknowledgementMode;
    // Connection that created this session and through which its commands are sent.
    protected final ActiveMQConnection connection;
    // Session descriptor sent to the broker on construction; also the source
    // of the remove command sent on close.
    protected final SessionInfo info;
    // Sequence generators; presumably used to mint consumer, producer and
    // delivery ids - their use is outside this part of the file.
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
    // Session-level executor; stopped when the session is disposed.
    protected final ActiveMQSessionExecutor executor;
    protected final AtomicBoolean started = new AtomicBoolean(false);

    // Consumers and producers belonging to this session; both lists are also
    // handed to the statistics object.
    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();

    // Guards close()/dispose() against running twice.
    protected boolean closed;
    // True while a close-on-completion Synchronization is registered with an
    // XA transaction (see close()).
    private volatile boolean synchronizationRegistered;
    protected boolean asyncDispatch;
    protected boolean sessionAsyncDispatch;
    // Snapshot of LOG.isDebugEnabled() taken at construction time.
    protected final boolean debug;
    // NOTE(review): presumably serializes sends on this session - usage is
    // outside this part of the file, confirm.
    protected Object sendMutex = new Object();

    private MessageListener messageListener;
    private final JMSSessionStatsImpl stats;
    private TransactionContext transactionContext;
    private DeliveryListener deliveryListener;
    // Initialised from the connection's transformer in the constructor.
    private MessageTransformer transformer;
    // Initialised from the connection's blob transfer policy in the constructor.
    private BlobTransferPolicy blobTransferPolicy;
    // Highest last-delivered sequence id seen across consumers at dispose
    // time; reported to the broker in the remove command.
    private long lastDeliveredSequenceId;
231    
    /**
     * Constructs the session, sends its {@link SessionInfo} through the
     * connection, registers the session with the connection and starts it
     * immediately if the connection is already started.
     *
     * @param connection the connection that owns this session
     * @param sessionId id whose value is used to build the session's
     *                {@link SessionInfo}
     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
     *                Session.SESSION_TRANSACTED
     * @param asyncDispatch stored in the {@code asyncDispatch} field
     * @param sessionAsyncDispatch stored in the {@code sessionAsyncDispatch}
     *                field
     * @throws JMSException on internal error
     */
    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
        // Debug state is sampled once here rather than re-read on every use.
        this.debug = LOG.isDebugEnabled();
        this.connection = connection;
        this.acknowledgementMode = acknowledgeMode;
        this.asyncDispatch = asyncDispatch;
        this.sessionAsyncDispatch = sessionAsyncDispatch;
        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
        setTransactionContext(new TransactionContext(connection));
        // The stats object is given the live producer/consumer lists.
        stats = new JMSSessionStatsImpl(producers, consumers);
        // Send the session descriptor through the connection.
        this.connection.asyncSendPacket(info);
        // Inherit transformer and blob policy from the connection.
        setTransformer(connection.getTransformer());
        setBlobTransferPolicy(connection.getBlobTransferPolicy());
        this.connectionExecutor=connection.getExecutor();
        this.executor = new ActiveMQSessionExecutor(this);
        // Registration happens only after the executor exists.
        connection.addSession(this);
        // A session created on an already started connection starts right away.
        if (connection.isStarted()) {
            start();
        }

    }
263    
264        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
265            this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
266        }
267    
268        /**
269         * Sets the transaction context of the session.
270         *
271         * @param transactionContext - provides the means to control a JMS
272         *                transaction.
273         */
274        public void setTransactionContext(TransactionContext transactionContext) {
275            this.transactionContext = transactionContext;
276        }
277    
278        /**
279         * Returns the transaction context of the session.
280         *
281         * @return transactionContext - session's transaction context.
282         */
283        public TransactionContext getTransactionContext() {
284            return transactionContext;
285        }
286    
287        /*
288         * (non-Javadoc)
289         *
290         * @see org.apache.activemq.management.StatsCapable#getStats()
291         */
292        public StatsImpl getStats() {
293            return stats;
294        }
295    
296        /**
297         * Returns the session's statistics.
298         *
299         * @return stats - session's statistics.
300         */
301        public JMSSessionStatsImpl getSessionStats() {
302            return stats;
303        }
304    
305        /**
306         * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
307         * object is used to send a message containing a stream of uninterpreted
308         * bytes.
309         *
310         * @return the an ActiveMQBytesMessage
311         * @throws JMSException if the JMS provider fails to create this message due
312         *                 to some internal error.
313         */
314        public BytesMessage createBytesMessage() throws JMSException {
315            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
316            configureMessage(message);
317            return message;
318        }
319    
320        /**
321         * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
322         * object is used to send a self-defining set of name-value pairs, where
323         * names are <CODE>String</CODE> objects and values are primitive values
324         * in the Java programming language.
325         *
326         * @return an ActiveMQMapMessage
327         * @throws JMSException if the JMS provider fails to create this message due
328         *                 to some internal error.
329         */
330        public MapMessage createMapMessage() throws JMSException {
331            ActiveMQMapMessage message = new ActiveMQMapMessage();
332            configureMessage(message);
333            return message;
334        }
335    
336        /**
337         * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
338         * interface is the root interface of all JMS messages. A
339         * <CODE>Message</CODE> object holds all the standard message header
340         * information. It can be sent when a message containing only header
341         * information is sufficient.
342         *
343         * @return an ActiveMQMessage
344         * @throws JMSException if the JMS provider fails to create this message due
345         *                 to some internal error.
346         */
347        public Message createMessage() throws JMSException {
348            ActiveMQMessage message = new ActiveMQMessage();
349            configureMessage(message);
350            return message;
351        }
352    
353        /**
354         * Creates an <CODE>ObjectMessage</CODE> object. An
355         * <CODE>ObjectMessage</CODE> object is used to send a message that
356         * contains a serializable Java object.
357         *
358         * @return an ActiveMQObjectMessage
359         * @throws JMSException if the JMS provider fails to create this message due
360         *                 to some internal error.
361         */
362        public ObjectMessage createObjectMessage() throws JMSException {
363            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
364            configureMessage(message);
365            return message;
366        }
367    
368        /**
369         * Creates an initialized <CODE>ObjectMessage</CODE> object. An
370         * <CODE>ObjectMessage</CODE> object is used to send a message that
371         * contains a serializable Java object.
372         *
373         * @param object the object to use to initialize this message
374         * @return an ActiveMQObjectMessage
375         * @throws JMSException if the JMS provider fails to create this message due
376         *                 to some internal error.
377         */
378        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
379            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
380            configureMessage(message);
381            message.setObject(object);
382            return message;
383        }
384    
385        /**
386         * Creates a <CODE>StreamMessage</CODE> object. A
387         * <CODE>StreamMessage</CODE> object is used to send a self-defining
388         * stream of primitive values in the Java programming language.
389         *
390         * @return an ActiveMQStreamMessage
391         * @throws JMSException if the JMS provider fails to create this message due
392         *                 to some internal error.
393         */
394        public StreamMessage createStreamMessage() throws JMSException {
395            ActiveMQStreamMessage message = new ActiveMQStreamMessage();
396            configureMessage(message);
397            return message;
398        }
399    
400        /**
401         * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
402         * object is used to send a message containing a <CODE>String</CODE>
403         * object.
404         *
405         * @return an ActiveMQTextMessage
406         * @throws JMSException if the JMS provider fails to create this message due
407         *                 to some internal error.
408         */
409        public TextMessage createTextMessage() throws JMSException {
410            ActiveMQTextMessage message = new ActiveMQTextMessage();
411            configureMessage(message);
412            return message;
413        }
414    
415        /**
416         * Creates an initialized <CODE>TextMessage</CODE> object. A
417         * <CODE>TextMessage</CODE> object is used to send a message containing a
418         * <CODE>String</CODE>.
419         *
420         * @param text the string used to initialize this message
421         * @return an ActiveMQTextMessage
422         * @throws JMSException if the JMS provider fails to create this message due
423         *                 to some internal error.
424         */
425        public TextMessage createTextMessage(String text) throws JMSException {
426            ActiveMQTextMessage message = new ActiveMQTextMessage();
427            message.setText(text);
428            configureMessage(message);
429            return message;
430        }
431    
432        /**
433         * Creates an initialized <CODE>BlobMessage</CODE> object. A
434         * <CODE>BlobMessage</CODE> object is used to send a message containing a
435         * <CODE>URL</CODE> which points to some network addressible BLOB.
436         *
437         * @param url the network addressable URL used to pass directly to the
438         *                consumer
439         * @return a BlobMessage
440         * @throws JMSException if the JMS provider fails to create this message due
441         *                 to some internal error.
442         */
443        public BlobMessage createBlobMessage(URL url) throws JMSException {
444            return createBlobMessage(url, false);
445        }
446    
447        /**
448         * Creates an initialized <CODE>BlobMessage</CODE> object. A
449         * <CODE>BlobMessage</CODE> object is used to send a message containing a
450         * <CODE>URL</CODE> which points to some network addressible BLOB.
451         *
452         * @param url the network addressable URL used to pass directly to the
453         *                consumer
454         * @param deletedByBroker indicates whether or not the resource is deleted
455         *                by the broker when the message is acknowledged
456         * @return a BlobMessage
457         * @throws JMSException if the JMS provider fails to create this message due
458         *                 to some internal error.
459         */
460        public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
461            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
462            configureMessage(message);
463            message.setURL(url);
464            message.setDeletedByBroker(deletedByBroker);
465            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
466            return message;
467        }
468    
469        /**
470         * Creates an initialized <CODE>BlobMessage</CODE> object. A
471         * <CODE>BlobMessage</CODE> object is used to send a message containing
472         * the <CODE>File</CODE> content. Before the message is sent the file
473         * conent will be uploaded to the broker or some other remote repository
474         * depending on the {@link #getBlobTransferPolicy()}.
475         *
476         * @param file the file to be uploaded to some remote repo (or the broker)
477         *                depending on the strategy
478         * @return a BlobMessage
479         * @throws JMSException if the JMS provider fails to create this message due
480         *                 to some internal error.
481         */
482        public BlobMessage createBlobMessage(File file) throws JMSException {
483            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
484            configureMessage(message);
485            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
486            message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
487            message.setDeletedByBroker(true);
488            message.setName(file.getName());
489            return message;
490        }
491    
492        /**
493         * Creates an initialized <CODE>BlobMessage</CODE> object. A
494         * <CODE>BlobMessage</CODE> object is used to send a message containing
495         * the <CODE>File</CODE> content. Before the message is sent the file
496         * conent will be uploaded to the broker or some other remote repository
497         * depending on the {@link #getBlobTransferPolicy()}.
498         *
499         * @param in the stream to be uploaded to some remote repo (or the broker)
500         *                depending on the strategy
501         * @return a BlobMessage
502         * @throws JMSException if the JMS provider fails to create this message due
503         *                 to some internal error.
504         */
505        public BlobMessage createBlobMessage(InputStream in) throws JMSException {
506            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
507            configureMessage(message);
508            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
509            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
510            message.setDeletedByBroker(true);
511            return message;
512        }
513    
514        /**
515         * Indicates whether the session is in transacted mode.
516         *
517         * @return true if the session is in transacted mode
518         * @throws JMSException if there is some internal error.
519         */
520        public boolean getTransacted() throws JMSException {
521            checkClosed();
522            return isTransacted();
523        }
524    
525        /**
526         * Returns the acknowledgement mode of the session. The acknowledgement mode
527         * is set at the time that the session is created. If the session is
528         * transacted, the acknowledgement mode is ignored.
529         *
530         * @return If the session is not transacted, returns the current
531         *         acknowledgement mode for the session. If the session is
532         *         transacted, returns SESSION_TRANSACTED.
533         * @throws JMSException
534         * @see javax.jms.Connection#createSession(boolean,int)
535         * @since 1.1 exception JMSException if there is some internal error.
536         */
537        public int getAcknowledgeMode() throws JMSException {
538            checkClosed();
539            return this.acknowledgementMode;
540        }
541    
542        /**
543         * Commits all messages done in this transaction and releases any locks
544         * currently held.
545         *
546         * @throws JMSException if the JMS provider fails to commit the transaction
547         *                 due to some internal error.
548         * @throws TransactionRolledBackException if the transaction is rolled back
549         *                 due to some internal error during commit.
550         * @throws javax.jms.IllegalStateException if the method is not called by a
551         *                 transacted session.
552         */
553        public void commit() throws JMSException {
554            checkClosed();
555            if (!getTransacted()) {
556                throw new javax.jms.IllegalStateException("Not a transacted session");
557            }
558            if (LOG.isDebugEnabled()) {
559                LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
560            }
561            transactionContext.commit();
562        }
563    
564        /**
565         * Rolls back any messages done in this transaction and releases any locks
566         * currently held.
567         *
568         * @throws JMSException if the JMS provider fails to roll back the
569         *                 transaction due to some internal error.
570         * @throws javax.jms.IllegalStateException if the method is not called by a
571         *                 transacted session.
572         */
573        public void rollback() throws JMSException {
574            checkClosed();
575            if (!getTransacted()) {
576                throw new javax.jms.IllegalStateException("Not a transacted session");
577            }
578            if (LOG.isDebugEnabled()) {
579                LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
580            }
581            transactionContext.rollback();
582        }
583    
584        /**
585         * Closes the session.
586         * <P>
587         * Since a provider may allocate some resources on behalf of a session
588         * outside the JVM, clients should close the resources when they are not
589         * needed. Relying on garbage collection to eventually reclaim these
590         * resources may not be timely enough.
591         * <P>
592         * There is no need to close the producers and consumers of a closed
593         * session.
594         * <P>
595         * This call will block until a <CODE>receive</CODE> call or message
596         * listener in progress has completed. A blocked message consumer
597         * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
598         * is closed.
599         * <P>
600         * Closing a transacted session must roll back the transaction in progress.
601         * <P>
602         * This method is the only <CODE>Session</CODE> method that can be called
603         * concurrently.
604         * <P>
605         * Invoking any other <CODE>Session</CODE> method on a closed session must
606         * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
607         * closed session must <I>not </I> throw an exception.
608         *
609         * @throws JMSException if the JMS provider fails to close the session due
610         *                 to some internal error.
611         */
612        public void close() throws JMSException {
613            if (!closed) {
614                if (getTransactionContext().isInXATransaction()) {
615                    if (!synchronizationRegistered) {
616                        synchronizationRegistered = true;
617                        getTransactionContext().addSynchronization(new Synchronization() {
618    
619                                            @Override
620                                            public void afterCommit() throws Exception {
621                                                doClose();
622                                                synchronizationRegistered = false;
623                                            }
624    
625                                            @Override
626                                            public void afterRollback() throws Exception {
627                                                doClose();
628                                                synchronizationRegistered = false;
629                                            }
630                                        });
631                    }
632    
633                } else {
634                    doClose();
635                }
636            }
637        }
638    
639        private void doClose() throws JMSException {
640            boolean interrupted = Thread.interrupted();
641            dispose();
642            RemoveInfo removeCommand = info.createRemoveCommand();
643            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
644            connection.asyncSendPacket(removeCommand);
645            if (interrupted) {
646                Thread.currentThread().interrupt();
647            }
648        }
649    
650        void clearMessagesInProgress() {
651            executor.clearMessagesInProgress();
652            // we are called from inside the transport reconnection logic
653            // which involves us clearing all the connections' consumers
654            // dispatch and delivered lists. So rather than trying to
655            // grab a mutex (which could be already owned by the message
656            // listener calling the send or an ack) we allow it to complete in
657            // a separate thread via the scheduler and notify us via
658            // connection.transportInterruptionProcessingComplete()
659            //
660            for (final ActiveMQMessageConsumer consumer : consumers) {
661                consumer.inProgressClearRequired();
662                try {
663                    connection.getScheduler().executeAfterDelay(new Runnable() {
664                        public void run() {
665                            consumer.clearMessagesInProgress();
666                        }}, 0l);
667                } catch (JMSException e) {
668                    connection.onClientInternalException(e);
669                }
670            }
671        }
672    
673        void deliverAcks() {
674            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
675                ActiveMQMessageConsumer consumer = iter.next();
676                consumer.deliverAcks();
677            }
678        }
679    
680        public synchronized void dispose() throws JMSException {
681            if (!closed) {
682    
683                try {
684                    executor.stop();
685    
686                    for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
687                        ActiveMQMessageConsumer consumer = iter.next();
688                        consumer.setFailureError(connection.getFirstFailureError());
689                        consumer.dispose();
690                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
691                    }
692                    consumers.clear();
693    
694                    for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
695                        ActiveMQMessageProducer producer = iter.next();
696                        producer.dispose();
697                    }
698                    producers.clear();
699    
700                    try {
701                        if (getTransactionContext().isInLocalTransaction()) {
702                            rollback();
703                        }
704                    } catch (JMSException e) {
705                    }
706    
707                } finally {
708                    connection.removeSession(this);
709                    this.transactionContext = null;
710                    closed = true;
711                }
712            }
713        }
714    
715        /**
716         * Checks that the session is not closed then configures the message
717         */
718        protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
719            checkClosed();
720            message.setConnection(connection);
721        }
722    
723        /**
724         * Check if the session is closed. It is used for ensuring that the session
725         * is open before performing various operations.
726         *
727         * @throws IllegalStateException if the Session is closed
728         */
729        protected void checkClosed() throws IllegalStateException {
730            if (closed) {
731                throw new IllegalStateException("The Session is closed");
732            }
733        }
734    
735        /**
736         * Checks if the session is closed.
737         *
738         * @return true if the session is closed, false otherwise.
739         */
740        public boolean isClosed() {
741            return closed;
742        }
743    
744        /**
745         * Stops message delivery in this session, and restarts message delivery
746         * with the oldest unacknowledged message.
747         * <P>
748         * All consumers deliver messages in a serial order. Acknowledging a
749         * received message automatically acknowledges all messages that have been
750         * delivered to the client.
751         * <P>
752         * Restarting a session causes it to take the following actions:
753         * <UL>
754         * <LI>Stop message delivery
755         * <LI>Mark all messages that might have been delivered but not
756         * acknowledged as "redelivered"
757         * <LI>Restart the delivery sequence including all unacknowledged messages
758         * that had been previously delivered. Redelivered messages do not have to
759         * be delivered in exactly their original delivery order.
760         * </UL>
761         *
762         * @throws JMSException if the JMS provider fails to stop and restart
763         *                 message delivery due to some internal error.
764         * @throws IllegalStateException if the method is called by a transacted
765         *                 session.
766         */
767        public void recover() throws JMSException {
768    
769            checkClosed();
770            if (getTransacted()) {
771                throw new IllegalStateException("This session is transacted");
772            }
773    
774            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
775                ActiveMQMessageConsumer c = iter.next();
776                c.rollback();
777            }
778    
779        }
780    
781        /**
782         * Returns the session's distinguished message listener (optional).
783         *
784         * @return the message listener associated with this session
785         * @throws JMSException if the JMS provider fails to get the message
786         *                 listener due to an internal error.
787         * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
788         * @see javax.jms.ServerSessionPool
789         * @see javax.jms.ServerSession
790         */
791        public MessageListener getMessageListener() throws JMSException {
792            checkClosed();
793            return this.messageListener;
794        }
795    
796        /**
797         * Sets the session's distinguished message listener (optional).
798         * <P>
799         * When the distinguished message listener is set, no other form of message
800         * receipt in the session can be used; however, all forms of sending
801         * messages are still supported.
802         * <P>
803         * This is an expert facility not used by regular JMS clients.
804         *
805         * @param listener the message listener to associate with this session
806         * @throws JMSException if the JMS provider fails to set the message
807         *                 listener due to an internal error.
808         * @see javax.jms.Session#getMessageListener()
809         * @see javax.jms.ServerSessionPool
810         * @see javax.jms.ServerSession
811         */
812        public void setMessageListener(MessageListener listener) throws JMSException {
813            checkClosed();
814            this.messageListener = listener;
815    
816            if (listener != null) {
817                executor.setDispatchedBySessionPool(true);
818            }
819        }
820    
    /**
     * Optional operation, intended to be used only by Application Servers, not
     * by ordinary JMS clients.
     * <P>
     * Drains the dispatches currently queued on this session's executor and
     * hands each message to the session's distinguished message listener.
     * Expired or duplicate messages are skipped without delivery. After the
     * listener returns, a standard ack for that single dispatch is passed to
     * <CODE>asyncSendPacket</CODE>, carrying the transaction id of the
     * session's transaction context when there is one. For such a
     * transactional ack a synchronization is registered whose rollback handler
     * either sends a poison ack once the redelivery limit has been exceeded,
     * or sends a redelivered ack and schedules the dispatch to be handed to
     * its consumer again after the redelivery policy's delay.
     * <P>
     * The optional delivery listener is notified before and after each
     * delivery.
     *
     * @see javax.jms.ServerSession
     */
    public void run() {
        MessageDispatch messageDispatch;
        // Keep going until the executor has nothing more queued.
        while ((messageDispatch = executor.dequeueNoWait()) != null) {
            // Final copy so the anonymous classes below can capture it.
            final MessageDispatch md = messageDispatch;
            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
            if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
                // TODO: Ack it without delivery to client
                continue;
            }

            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                // Install an acknowledge callback that does nothing.
                // NOTE(review): presumably because this method sends the ack
                // itself right after delivery - confirm.
                message.setAcknowledgeCallback(new Callback() {
                    public void execute() throws Exception {
                    }
                });
            }

            if (deliveryListener != null) {
                deliveryListener.beforeDelivery(this, message);
            }

            // Stamp the dispatch with the next delivery id before the listener
            // sees the message.
            md.setDeliverySequenceId(getNextDeliveryId());

            try {
                messageListener.onMessage(message);
            } catch (RuntimeException e) {
                LOG.error("error dispatching message: ", e);
                // A problem while invoking the MessageListener does not
                // in general indicate a problem with the connection to the broker, i.e.
                // it will usually be sufficient to let the afterDelivery() method either
                // commit or roll back in order to deal with the exception.
                // However, we notify any registered client internal exception listener
                // of the problem.
                connection.onClientInternalException(e);
            }

            try {
                // Standard ack for this one dispatch (the constructor's third
                // argument is 1 - presumably the message count; confirm).
                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
                ack.setFirstMessageId(md.getMessage().getMessageId());
                // NOTE(review): presumably begins a transaction when the
                // session needs one - confirm. The transaction id read next
                // may still be null, which is checked below.
                doStartTransaction();
                ack.setTransactionId(getTransactionContext().getTransactionId());
                if (ack.getTransactionId() != null) {
                    // The ack is part of a transaction: register what has to
                    // happen to this message if that transaction rolls back.
                    getTransactionContext().addSynchronization(new Synchronization() {

                        @Override
                        public void afterRollback() throws Exception {
                            md.getMessage().onMessageRolledBack();
                            // ensure we don't filter this as a duplicate
                            connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                            // A maximum is configured and the message's
                            // redelivery counter has gone past it.
                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                                && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
                                // We need to NACK the message so that it gets
                                // sent to the DLQ: send a poison ack for it.
                                MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                asyncSendPacket(ack);
                            } else {

                                // Still within the limit: send a redelivered
                                // ack for the message.
                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                asyncSendPacket(ack);

                                // Figure out how long we should wait to resend
                                // this message: start from the initial delay
                                // and apply getNextRedeliveryDelay once per
                                // redelivery already counted.
                                long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                for (int i = 0; i < redeliveryCounter; i++) {
                                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                }
                                // Hand the same dispatch back to its consumer
                                // through the scheduler, using that delay.
                                connection.getScheduler().executeAfterDelay(new Runnable() {

                                    public void run() {
                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                                    }
                                }, redeliveryDelay);
                            }
                        }
                    });
                }
                asyncSendPacket(ack);
            } catch (Throwable e) {
                // Any failure while building or sending the ack is reported to
                // the connection instead of being propagated, so the loop
                // carries on with the remaining dispatches.
                connection.onClientInternalException(e);
            }

            if (deliveryListener != null) {
                deliveryListener.afterDelivery(this, message);
            }
        }
    }
919    
920        /**
921         * Creates a <CODE>MessageProducer</CODE> to send messages to the
922         * specified destination.
923         * <P>
924         * A client uses a <CODE>MessageProducer</CODE> object to send messages to
925         * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
926         * inherit from <CODE>Destination</CODE>, they can be used in the
927         * destination parameter to create a <CODE>MessageProducer</CODE> object.
928         *
929         * @param destination the <CODE>Destination</CODE> to send to, or null if
930         *                this is a producer which does not have a specified
931         *                destination.
932         * @return the MessageProducer
933         * @throws JMSException if the session fails to create a MessageProducer due
934         *                 to some internal error.
935         * @throws InvalidDestinationException if an invalid destination is
936         *                 specified.
937         * @since 1.1
938         */
939        public MessageProducer createProducer(Destination destination) throws JMSException {
940            checkClosed();
941            if (destination instanceof CustomDestination) {
942                CustomDestination customDestination = (CustomDestination)destination;
943                return customDestination.createProducer(this);
944            }
945            int timeSendOut = connection.getSendTimeout();
946            return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
947        }
948    
949        /**
950         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
951         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
952         * <CODE>Destination</CODE>, they can be used in the destination
953         * parameter to create a <CODE>MessageConsumer</CODE>.
954         *
955         * @param destination the <CODE>Destination</CODE> to access.
956         * @return the MessageConsumer
957         * @throws JMSException if the session fails to create a consumer due to
958         *                 some internal error.
959         * @throws InvalidDestinationException if an invalid destination is
960         *                 specified.
961         * @since 1.1
962         */
963        public MessageConsumer createConsumer(Destination destination) throws JMSException {
964            return createConsumer(destination, (String) null);
965        }
966    
967        /**
968         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
969         * using a message selector. Since <CODE> Queue</CODE> and
970         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
971         * can be used in the destination parameter to create a
972         * <CODE>MessageConsumer</CODE>.
973         * <P>
974         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
975         * that have been sent to a destination.
976         *
977         * @param destination the <CODE>Destination</CODE> to access
978         * @param messageSelector only messages with properties matching the message
979         *                selector expression are delivered. A value of null or an
980         *                empty string indicates that there is no message selector
981         *                for the message consumer.
982         * @return the MessageConsumer
983         * @throws JMSException if the session fails to create a MessageConsumer due
984         *                 to some internal error.
985         * @throws InvalidDestinationException if an invalid destination is
986         *                 specified.
987         * @throws InvalidSelectorException if the message selector is invalid.
988         * @since 1.1
989         */
990        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
991            return createConsumer(destination, messageSelector, false);
992        }
993    
994        /**
995         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
996         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
997         * <CODE>Destination</CODE>, they can be used in the destination
998         * parameter to create a <CODE>MessageConsumer</CODE>.
999         *
1000         * @param destination the <CODE>Destination</CODE> to access.
1001         * @param messageListener the listener to use for async consumption of messages
1002         * @return the MessageConsumer
1003         * @throws JMSException if the session fails to create a consumer due to
1004         *                 some internal error.
1005         * @throws InvalidDestinationException if an invalid destination is
1006         *                 specified.
1007         * @since 1.1
1008         */
1009        public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1010            return createConsumer(destination, null, messageListener);
1011        }
1012    
1013        /**
1014         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1015         * using a message selector. Since <CODE> Queue</CODE> and
1016         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1017         * can be used in the destination parameter to create a
1018         * <CODE>MessageConsumer</CODE>.
1019         * <P>
1020         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1021         * that have been sent to a destination.
1022         *
1023         * @param destination the <CODE>Destination</CODE> to access
1024         * @param messageSelector only messages with properties matching the message
1025         *                selector expression are delivered. A value of null or an
1026         *                empty string indicates that there is no message selector
1027         *                for the message consumer.
1028         * @param messageListener the listener to use for async consumption of messages
1029         * @return the MessageConsumer
1030         * @throws JMSException if the session fails to create a MessageConsumer due
1031         *                 to some internal error.
1032         * @throws InvalidDestinationException if an invalid destination is
1033         *                 specified.
1034         * @throws InvalidSelectorException if the message selector is invalid.
1035         * @since 1.1
1036         */
1037        public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1038            return createConsumer(destination, messageSelector, false, messageListener);
1039        }
1040    
1041        /**
1042         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1043         * using a message selector. This method can specify whether messages
1044         * published by its own connection should be delivered to it, if the
1045         * destination is a topic.
1046         * <P>
1047         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1048         * <CODE>Destination</CODE>, they can be used in the destination
1049         * parameter to create a <CODE>MessageConsumer</CODE>.
1050         * <P>
1051         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1052         * that have been published to a destination.
1053         * <P>
1054         * In some cases, a connection may both publish and subscribe to a topic.
1055         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1056         * inhibit the delivery of messages published by its own connection. The
1057         * default value for this attribute is False. The <CODE>noLocal</CODE>
1058         * value must be supported by destinations that are topics.
1059         *
1060         * @param destination the <CODE>Destination</CODE> to access
1061         * @param messageSelector only messages with properties matching the message
1062         *                selector expression are delivered. A value of null or an
1063         *                empty string indicates that there is no message selector
1064         *                for the message consumer.
1065         * @param noLocal - if true, and the destination is a topic, inhibits the
1066         *                delivery of messages published by its own connection. The
1067         *                behavior for <CODE>NoLocal</CODE> is not specified if
1068         *                the destination is a queue.
1069         * @return the MessageConsumer
1070         * @throws JMSException if the session fails to create a MessageConsumer due
1071         *                 to some internal error.
1072         * @throws InvalidDestinationException if an invalid destination is
1073         *                 specified.
1074         * @throws InvalidSelectorException if the message selector is invalid.
1075         * @since 1.1
1076         */
1077        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1078            return createConsumer(destination, messageSelector, noLocal, null);
1079        }
1080    
1081        /**
1082         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1083         * using a message selector. This method can specify whether messages
1084         * published by its own connection should be delivered to it, if the
1085         * destination is a topic.
1086         * <P>
1087         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1088         * <CODE>Destination</CODE>, they can be used in the destination
1089         * parameter to create a <CODE>MessageConsumer</CODE>.
1090         * <P>
1091         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1092         * that have been published to a destination.
1093         * <P>
1094         * In some cases, a connection may both publish and subscribe to a topic.
1095         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1096         * inhibit the delivery of messages published by its own connection. The
1097         * default value for this attribute is False. The <CODE>noLocal</CODE>
1098         * value must be supported by destinations that are topics.
1099         *
1100         * @param destination the <CODE>Destination</CODE> to access
1101         * @param messageSelector only messages with properties matching the message
1102         *                selector expression are delivered. A value of null or an
1103         *                empty string indicates that there is no message selector
1104         *                for the message consumer.
1105         * @param noLocal - if true, and the destination is a topic, inhibits the
1106         *                delivery of messages published by its own connection. The
1107         *                behavior for <CODE>NoLocal</CODE> is not specified if
1108         *                the destination is a queue.
1109         * @param messageListener the listener to use for async consumption of messages
1110         * @return the MessageConsumer
1111         * @throws JMSException if the session fails to create a MessageConsumer due
1112         *                 to some internal error.
1113         * @throws InvalidDestinationException if an invalid destination is
1114         *                 specified.
1115         * @throws InvalidSelectorException if the message selector is invalid.
1116         * @since 1.1
1117         */
1118        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1119            checkClosed();
1120    
1121            if (destination instanceof CustomDestination) {
1122                CustomDestination customDestination = (CustomDestination)destination;
1123                return customDestination.createConsumer(this, messageSelector, noLocal);
1124            }
1125    
1126            ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1127            int prefetch = 0;
1128            if (destination instanceof Topic) {
1129                prefetch = prefetchPolicy.getTopicPrefetch();
1130            } else {
1131                prefetch = prefetchPolicy.getQueuePrefetch();
1132            }
1133            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1134            return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1135                    prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1136        }
1137    
1138        /**
1139         * Creates a queue identity given a <CODE>Queue</CODE> name.
1140         * <P>
1141         * This facility is provided for the rare cases where clients need to
1142         * dynamically manipulate queue identity. It allows the creation of a queue
1143         * identity with a provider-specific name. Clients that depend on this
1144         * ability are not portable.
1145         * <P>
1146         * Note that this method is not for creating the physical queue. The
1147         * physical creation of queues is an administrative task and is not to be
1148         * initiated by the JMS API. The one exception is the creation of temporary
1149         * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1150         * method.
1151         *
1152         * @param queueName the name of this <CODE>Queue</CODE>
1153         * @return a <CODE>Queue</CODE> with the given name
1154         * @throws JMSException if the session fails to create a queue due to some
1155         *                 internal error.
1156         * @since 1.1
1157         */
1158        public Queue createQueue(String queueName) throws JMSException {
1159            checkClosed();
1160            if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1161                return new ActiveMQTempQueue(queueName);
1162            }
1163            return new ActiveMQQueue(queueName);
1164        }
1165    
1166        /**
1167         * Creates a topic identity given a <CODE>Topic</CODE> name.
1168         * <P>
1169         * This facility is provided for the rare cases where clients need to
1170         * dynamically manipulate topic identity. This allows the creation of a
1171         * topic identity with a provider-specific name. Clients that depend on this
1172         * ability are not portable.
1173         * <P>
1174         * Note that this method is not for creating the physical topic. The
1175         * physical creation of topics is an administrative task and is not to be
1176         * initiated by the JMS API. The one exception is the creation of temporary
1177         * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1178         * method.
1179         *
1180         * @param topicName the name of this <CODE>Topic</CODE>
1181         * @return a <CODE>Topic</CODE> with the given name
1182         * @throws JMSException if the session fails to create a topic due to some
1183         *                 internal error.
1184         * @since 1.1
1185         */
1186        public Topic createTopic(String topicName) throws JMSException {
1187            checkClosed();
1188            if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1189                return new ActiveMQTempTopic(topicName);
1190            }
1191            return new ActiveMQTopic(topicName);
1192        }
1193    
1194        /**
1195         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1196         * the specified queue.
1197         *
1198         * @param queue the <CODE>queue</CODE> to access
1199         * @exception InvalidDestinationException if an invalid destination is
1200         *                    specified
1201         * @since 1.1
1202         */
1203        /**
1204         * Creates a durable subscriber to the specified topic.
1205         * <P>
1206         * If a client needs to receive all the messages published on a topic,
1207         * including the ones published while the subscriber is inactive, it uses a
1208         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1209         * record of this durable subscription and insures that all messages from
1210         * the topic's publishers are retained until they are acknowledged by this
1211         * durable subscriber or they have expired.
1212         * <P>
1213         * Sessions with durable subscribers must always provide the same client
1214         * identifier. In addition, each client must specify a name that uniquely
1215         * identifies (within client identifier) each durable subscription it
1216         * creates. Only one session at a time can have a
1217         * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1218         * <P>
1219         * A client can change an existing durable subscription by creating a
1220         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1221         * and/or message selector. Changing a durable subscriber is equivalent to
1222         * unsubscribing (deleting) the old one and creating a new one.
1223         * <P>
1224         * In some cases, a connection may both publish and subscribe to a topic.
1225         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1226         * inhibit the delivery of messages published by its own connection. The
1227         * default value for this attribute is false.
1228         *
1229         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1230         * @param name the name used to identify this subscription
1231         * @return the TopicSubscriber
1232         * @throws JMSException if the session fails to create a subscriber due to
1233         *                 some internal error.
1234         * @throws InvalidDestinationException if an invalid topic is specified.
1235         * @since 1.1
1236         */
1237        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1238            checkClosed();
1239            return createDurableSubscriber(topic, name, null, false);
1240        }
1241    
1242        /**
1243         * Creates a durable subscriber to the specified topic, using a message
1244         * selector and specifying whether messages published by its own connection
1245         * should be delivered to it.
1246         * <P>
1247         * If a client needs to receive all the messages published on a topic,
1248         * including the ones published while the subscriber is inactive, it uses a
1249         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1250         * record of this durable subscription and insures that all messages from
1251         * the topic's publishers are retained until they are acknowledged by this
1252         * durable subscriber or they have expired.
1253         * <P>
1254         * Sessions with durable subscribers must always provide the same client
1255         * identifier. In addition, each client must specify a name which uniquely
1256         * identifies (within client identifier) each durable subscription it
1257         * creates. Only one session at a time can have a
1258         * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1259         * inactive durable subscriber is one that exists but does not currently
1260         * have a message consumer associated with it.
1261         * <P>
1262         * A client can change an existing durable subscription by creating a
1263         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1264         * and/or message selector. Changing a durable subscriber is equivalent to
1265         * unsubscribing (deleting) the old one and creating a new one.
1266         *
1267         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1268         * @param name the name used to identify this subscription
1269         * @param messageSelector only messages with properties matching the message
1270         *                selector expression are delivered. A value of null or an
1271         *                empty string indicates that there is no message selector
1272         *                for the message consumer.
1273         * @param noLocal if set, inhibits the delivery of messages published by its
1274         *                own connection
1275         * @return the Queue Browser
1276         * @throws JMSException if the session fails to create a subscriber due to
1277         *                 some internal error.
1278         * @throws InvalidDestinationException if an invalid topic is specified.
1279         * @throws InvalidSelectorException if the message selector is invalid.
1280         * @since 1.1
1281         */
1282        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1283            checkClosed();
1284    
1285            if (isIndividualAcknowledge()) {
1286                throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
1287                                                 "INDIVIDUAL_ACKNOWLEDGE mode.", null);
1288            }
1289    
1290            if (topic instanceof CustomDestination) {
1291                CustomDestination customDestination = (CustomDestination)topic;
1292                return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1293            }
1294    
1295            connection.checkClientIDWasManuallySpecified();
1296            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1297            int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1298            int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1299            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1300                                               noLocal, false, asyncDispatch);
1301        }
1302    
1303        /**
1304         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1305         * the specified queue.
1306         *
1307         * @param queue the <CODE>queue</CODE> to access
1308         * @return the Queue Browser
1309         * @throws JMSException if the session fails to create a browser due to some
1310         *                 internal error.
1311         * @throws InvalidDestinationException if an invalid destination is
1312         *                 specified
1313         * @since 1.1
1314         */
1315        public QueueBrowser createBrowser(Queue queue) throws JMSException {
1316            checkClosed();
1317            return createBrowser(queue, null);
1318        }
1319    
1320        /**
1321         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1322         * the specified queue using a message selector.
1323         *
1324         * @param queue the <CODE>queue</CODE> to access
1325         * @param messageSelector only messages with properties matching the message
1326         *                selector expression are delivered. A value of null or an
1327         *                empty string indicates that there is no message selector
1328         *                for the message consumer.
1329         * @return the Queue Browser
1330         * @throws JMSException if the session fails to create a browser due to some
1331         *                 internal error.
1332         * @throws InvalidDestinationException if an invalid destination is
1333         *                 specified
1334         * @throws InvalidSelectorException if the message selector is invalid.
1335         * @since 1.1
1336         */
1337        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1338            checkClosed();
1339            return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1340        }
1341    
1342        /**
1343         * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1344         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1345         *
1346         * @return a temporary queue identity
1347         * @throws JMSException if the session fails to create a temporary queue due
1348         *                 to some internal error.
1349         * @since 1.1
1350         */
1351        public TemporaryQueue createTemporaryQueue() throws JMSException {
1352            checkClosed();
1353            return (TemporaryQueue)connection.createTempDestination(false);
1354        }
1355    
1356        /**
1357         * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1358         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1359         *
1360         * @return a temporary topic identity
1361         * @throws JMSException if the session fails to create a temporary topic due
1362         *                 to some internal error.
1363         * @since 1.1
1364         */
1365        public TemporaryTopic createTemporaryTopic() throws JMSException {
1366            checkClosed();
1367            return (TemporaryTopic)connection.createTempDestination(true);
1368        }
1369    
1370        /**
1371         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1372         * the specified queue.
1373         *
1374         * @param queue the <CODE>Queue</CODE> to access
1375         * @return
1376         * @throws JMSException if the session fails to create a receiver due to
1377         *                 some internal error.
1378         * @throws JMSException
1379         * @throws InvalidDestinationException if an invalid queue is specified.
1380         */
1381        public QueueReceiver createReceiver(Queue queue) throws JMSException {
1382            checkClosed();
1383            return createReceiver(queue, null);
1384        }
1385    
1386        /**
1387         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1388         * the specified queue using a message selector.
1389         *
1390         * @param queue the <CODE>Queue</CODE> to access
1391         * @param messageSelector only messages with properties matching the message
1392         *                selector expression are delivered. A value of null or an
1393         *                empty string indicates that there is no message selector
1394         *                for the message consumer.
1395         * @return QueueReceiver
1396         * @throws JMSException if the session fails to create a receiver due to
1397         *                 some internal error.
1398         * @throws InvalidDestinationException if an invalid queue is specified.
1399         * @throws InvalidSelectorException if the message selector is invalid.
1400         */
1401        public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1402            checkClosed();
1403    
1404            if (queue instanceof CustomDestination) {
1405                CustomDestination customDestination = (CustomDestination)queue;
1406                return customDestination.createReceiver(this, messageSelector);
1407            }
1408    
1409            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1410            return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1411                                             prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1412        }
1413    
1414        /**
1415         * Creates a <CODE>QueueSender</CODE> object to send messages to the
1416         * specified queue.
1417         *
1418         * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1419         *                unidentified producer
1420         * @return QueueSender
1421         * @throws JMSException if the session fails to create a sender due to some
1422         *                 internal error.
1423         * @throws InvalidDestinationException if an invalid queue is specified.
1424         */
1425        public QueueSender createSender(Queue queue) throws JMSException {
1426            checkClosed();
1427            if (queue instanceof CustomDestination) {
1428                CustomDestination customDestination = (CustomDestination)queue;
1429                return customDestination.createSender(this);
1430            }
1431            int timeSendOut = connection.getSendTimeout();
1432            return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1433        }
1434    
1435        /**
1436         * Creates a nondurable subscriber to the specified topic. <p/>
1437         * <P>
1438         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1439         * that have been published to a topic. <p/>
1440         * <P>
1441         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1442         * receive only messages that are published while they are active. <p/>
1443         * <P>
1444         * In some cases, a connection may both publish and subscribe to a topic.
1445         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1446         * inhibit the delivery of messages published by its own connection. The
1447         * default value for this attribute is false.
1448         *
1449         * @param topic the <CODE>Topic</CODE> to subscribe to
1450         * @return TopicSubscriber
1451         * @throws JMSException if the session fails to create a subscriber due to
1452         *                 some internal error.
1453         * @throws InvalidDestinationException if an invalid topic is specified.
1454         */
1455        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1456            checkClosed();
1457            return createSubscriber(topic, null, false);
1458        }
1459    
1460        /**
1461         * Creates a nondurable subscriber to the specified topic, using a message
1462         * selector or specifying whether messages published by its own connection
1463         * should be delivered to it. <p/>
1464         * <P>
1465         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1466         * that have been published to a topic. <p/>
1467         * <P>
1468         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1469         * receive only messages that are published while they are active. <p/>
1470         * <P>
1471         * Messages filtered out by a subscriber's message selector will never be
1472         * delivered to the subscriber. From the subscriber's perspective, they do
1473         * not exist. <p/>
1474         * <P>
1475         * In some cases, a connection may both publish and subscribe to a topic.
1476         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1477         * inhibit the delivery of messages published by its own connection. The
1478         * default value for this attribute is false.
1479         *
1480         * @param topic the <CODE>Topic</CODE> to subscribe to
1481         * @param messageSelector only messages with properties matching the message
1482         *                selector expression are delivered. A value of null or an
1483         *                empty string indicates that there is no message selector
1484         *                for the message consumer.
1485         * @param noLocal if set, inhibits the delivery of messages published by its
1486         *                own connection
1487         * @return TopicSubscriber
1488         * @throws JMSException if the session fails to create a subscriber due to
1489         *                 some internal error.
1490         * @throws InvalidDestinationException if an invalid topic is specified.
1491         * @throws InvalidSelectorException if the message selector is invalid.
1492         */
1493        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1494            checkClosed();
1495    
1496            if (topic instanceof CustomDestination) {
1497                CustomDestination customDestination = (CustomDestination)topic;
1498                return customDestination.createSubscriber(this, messageSelector, noLocal);
1499            }
1500    
1501            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1502            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1503                .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1504        }
1505    
1506        /**
1507         * Creates a publisher for the specified topic. <p/>
1508         * <P>
1509         * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1510         * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1511         * a topic, it defines a new sequence of messages that have no ordering
1512         * relationship with the messages it has previously sent.
1513         *
1514         * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1515         *                an unidentified producer
1516         * @return TopicPublisher
1517         * @throws JMSException if the session fails to create a publisher due to
1518         *                 some internal error.
1519         * @throws InvalidDestinationException if an invalid topic is specified.
1520         */
1521        public TopicPublisher createPublisher(Topic topic) throws JMSException {
1522            checkClosed();
1523    
1524            if (topic instanceof CustomDestination) {
1525                CustomDestination customDestination = (CustomDestination)topic;
1526                return customDestination.createPublisher(this);
1527            }
1528            int timeSendOut = connection.getSendTimeout();
1529            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1530        }
1531    
1532        /**
1533         * Unsubscribes a durable subscription that has been created by a client.
1534         * <P>
1535         * This method deletes the state being maintained on behalf of the
1536         * subscriber by its provider.
1537         * <P>
1538         * It is erroneous for a client to delete a durable subscription while there
1539         * is an active <CODE>MessageConsumer </CODE> or
1540         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1541         * message is part of a pending transaction or has not been acknowledged in
1542         * the session.
1543         *
1544         * @param name the name used to identify this subscription
1545         * @throws JMSException if the session fails to unsubscribe to the durable
1546         *                 subscription due to some internal error.
1547         * @throws InvalidDestinationException if an invalid subscription name is
1548         *                 specified.
1549         * @since 1.1
1550         */
1551        public void unsubscribe(String name) throws JMSException {
1552            checkClosed();
1553            connection.unsubscribe(name);
1554        }
1555    
1556        public void dispatch(MessageDispatch messageDispatch) {
1557            try {
1558                executor.execute(messageDispatch);
1559            } catch (InterruptedException e) {
1560                Thread.currentThread().interrupt();
1561                connection.onClientInternalException(e);
1562            }
1563        }
1564    
1565        /**
1566         * Acknowledges all consumed messages of the session of this consumed
1567         * message.
1568         * <P>
1569         * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1570         * for use when a client has specified that its JMS session's consumed
1571         * messages are to be explicitly acknowledged. By invoking
1572         * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1573         * all messages consumed by the session that the message was delivered to.
1574         * <P>
1575         * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1576         * sessions and sessions specified to use implicit acknowledgement modes.
1577         * <P>
1578         * A client may individually acknowledge each message as it is consumed, or
1579         * it may choose to acknowledge messages as an application-defined group
1580         * (which is done by calling acknowledge on the last received message of the
1581         * group, thereby acknowledging all messages consumed by the session.)
1582         * <P>
1583         * Messages that have been received but not acknowledged may be redelivered.
1584         *
1585         * @throws JMSException if the JMS provider fails to acknowledge the
1586         *                 messages due to some internal error.
1587         * @throws javax.jms.IllegalStateException if this method is called on a
1588         *                 closed session.
1589         * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1590         */
1591        public void acknowledge() throws JMSException {
1592            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1593                ActiveMQMessageConsumer c = iter.next();
1594                c.acknowledge();
1595            }
1596        }
1597    
1598        /**
1599         * Add a message consumer.
1600         *
1601         * @param consumer - message consumer.
1602         * @throws JMSException
1603         */
1604        protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1605            this.consumers.add(consumer);
1606            if (consumer.isDurableSubscriber()) {
1607                stats.onCreateDurableSubscriber();
1608            }
1609            this.connection.addDispatcher(consumer.getConsumerId(), this);
1610        }
1611    
1612        /**
1613         * Remove the message consumer.
1614         *
1615         * @param consumer - consumer to be removed.
1616         * @throws JMSException
1617         */
1618        protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1619            this.connection.removeDispatcher(consumer.getConsumerId());
1620            if (consumer.isDurableSubscriber()) {
1621                stats.onRemoveDurableSubscriber();
1622            }
1623            this.consumers.remove(consumer);
1624            this.connection.removeDispatcher(consumer);
1625        }
1626    
1627        /**
1628         * Adds a message producer.
1629         *
1630         * @param producer - message producer to be added.
1631         * @throws JMSException
1632         */
1633        protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1634            this.producers.add(producer);
1635            this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1636        }
1637    
1638        /**
1639         * Removes a message producer.
1640         *
1641         * @param producer - message producer to be removed.
1642         * @throws JMSException
1643         */
1644        protected void removeProducer(ActiveMQMessageProducer producer) {
1645            this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1646            this.producers.remove(producer);
1647        }
1648    
1649        /**
1650         * Start this Session.
1651         *
1652         * @throws JMSException
1653         */
1654        protected void start() throws JMSException {
1655            started.set(true);
1656            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1657                ActiveMQMessageConsumer c = iter.next();
1658                c.start();
1659            }
1660            executor.start();
1661        }
1662    
1663        /**
1664         * Stops this session.
1665         *
1666         * @throws JMSException
1667         */
1668        protected void stop() throws JMSException {
1669    
1670            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1671                ActiveMQMessageConsumer c = iter.next();
1672                c.stop();
1673            }
1674    
1675            started.set(false);
1676            executor.stop();
1677        }
1678    
1679        /**
1680         * Returns the session id.
1681         *
1682         * @return value - session id.
1683         */
1684        protected SessionId getSessionId() {
1685            return info.getSessionId();
1686        }
1687    
1688        /**
1689         * @return
1690         */
1691        protected ConsumerId getNextConsumerId() {
1692            return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1693        }
1694    
1695        /**
1696         * @return
1697         */
1698        protected ProducerId getNextProducerId() {
1699            return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1700        }
1701    
1702        /**
1703         * Sends the message for dispatch by the broker.
1704         *
1705         *
1706         * @param producer - message producer.
1707         * @param destination - message destination.
1708         * @param message - message to be sent.
1709         * @param deliveryMode - JMS messsage delivery mode.
1710         * @param priority - message priority.
1711         * @param timeToLive - message expiration.
1712         * @param producerWindow
1713         * @param onComplete
1714         * @throws JMSException
1715         */
1716        protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1717                            MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1718    
1719            checkClosed();
1720            if (destination.isTemporary() && connection.isDeleted(destination)) {
1721                throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1722            }
1723            synchronized (sendMutex) {
1724                // tell the Broker we are about to start a new transaction
1725                doStartTransaction();
1726                TransactionId txid = transactionContext.getTransactionId();
1727                long sequenceNumber = producer.getMessageSequence();
1728    
1729                //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1730                message.setJMSDeliveryMode(deliveryMode);
1731                long expiration = 0L;
1732                if (!producer.getDisableMessageTimestamp()) {
1733                    long timeStamp = System.currentTimeMillis();
1734                    message.setJMSTimestamp(timeStamp);
1735                    if (timeToLive > 0) {
1736                        expiration = timeToLive + timeStamp;
1737                    }
1738                }
1739                message.setJMSExpiration(expiration);
1740                message.setJMSPriority(priority);
1741                message.setJMSRedelivered(false);
1742    
1743                // transform to our own message format here
1744                ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1745    
1746                // Set the message id.
1747                if (msg == message) {
1748                    msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1749                } else {
1750                    msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1751                    message.setJMSMessageID(msg.getMessageId().toString());
1752                }
1753                //clear the brokerPath in case we are re-sending this message
1754                msg.setBrokerPath(null);
1755                // destination format is provider specific so only set on transformed message
1756                msg.setJMSDestination(destination);
1757    
1758                msg.setTransactionId(txid);
1759                if (connection.isCopyMessageOnSend()) {
1760                    msg = (ActiveMQMessage)msg.copy();
1761                }
1762                msg.setConnection(connection);
1763                msg.onSend();
1764                msg.setProducerId(msg.getMessageId().getProducerId());
1765                if (LOG.isTraceEnabled()) {
1766                    LOG.trace(getSessionId() + " sending message: " + msg);
1767                }
1768                if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1769                    this.connection.asyncSendPacket(msg);
1770                    if (producerWindow != null) {
1771                        // Since we defer lots of the marshaling till we hit the
1772                        // wire, this might not
1773                        // provide and accurate size. We may change over to doing
1774                        // more aggressive marshaling,
1775                        // to get more accurate sizes.. this is more important once
1776                        // users start using producer window
1777                        // flow control.
1778                        int size = msg.getSize();
1779                        producerWindow.increaseUsage(size);
1780                    }
1781                } else {
1782                    if (sendTimeout > 0 && onComplete==null) {
1783                        this.connection.syncSendPacket(msg,sendTimeout);
1784                    }else {
1785                        this.connection.syncSendPacket(msg, onComplete);
1786                    }
1787                }
1788    
1789            }
1790        }
1791    
1792        /**
1793         * Send TransactionInfo to indicate transaction has started
1794         *
1795         * @throws JMSException if some internal error occurs
1796         */
1797        protected void doStartTransaction() throws JMSException {
1798            if (getTransacted() && !transactionContext.isInXATransaction()) {
1799                transactionContext.begin();
1800            }
1801        }
1802    
1803        /**
1804         * Checks whether the session has unconsumed messages.
1805         *
1806         * @return true - if there are unconsumed messages.
1807         */
1808        public boolean hasUncomsumedMessages() {
1809            return executor.hasUncomsumedMessages();
1810        }
1811    
1812        /**
1813         * Checks whether the session uses transactions.
1814         *
1815         * @return true - if the session uses transactions.
1816         */
1817        public boolean isTransacted() {
1818            return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1819        }
1820    
1821        /**
1822         * Checks whether the session used client acknowledgment.
1823         *
1824         * @return true - if the session uses client acknowledgment.
1825         */
1826        protected boolean isClientAcknowledge() {
1827            return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1828        }
1829    
1830        /**
1831         * Checks whether the session used auto acknowledgment.
1832         *
1833         * @return true - if the session uses client acknowledgment.
1834         */
1835        public boolean isAutoAcknowledge() {
1836            return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1837        }
1838    
1839        /**
1840         * Checks whether the session used dup ok acknowledgment.
1841         *
1842         * @return true - if the session uses client acknowledgment.
1843         */
1844        public boolean isDupsOkAcknowledge() {
1845            return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1846        }
1847    
1848        public boolean isIndividualAcknowledge(){
1849            return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1850        }
1851    
1852        /**
1853         * Returns the message delivery listener.
1854         *
1855         * @return deliveryListener - message delivery listener.
1856         */
1857        public DeliveryListener getDeliveryListener() {
1858            return deliveryListener;
1859        }
1860    
1861        /**
1862         * Sets the message delivery listener.
1863         *
1864         * @param deliveryListener - message delivery listener.
1865         */
1866        public void setDeliveryListener(DeliveryListener deliveryListener) {
1867            this.deliveryListener = deliveryListener;
1868        }
1869    
1870        /**
1871         * Returns the SessionInfo bean.
1872         *
1873         * @return info - SessionInfo bean.
1874         * @throws JMSException
1875         */
1876        protected SessionInfo getSessionInfo() throws JMSException {
1877            SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1878            return info;
1879        }
1880    
1881        /**
1882         * Send the asynchronus command.
1883         *
1884         * @param command - command to be executed.
1885         * @throws JMSException
1886         */
1887        public void asyncSendPacket(Command command) throws JMSException {
1888            connection.asyncSendPacket(command);
1889        }
1890    
1891        /**
1892         * Send the synchronus command.
1893         *
1894         * @param command - command to be executed.
1895         * @return Response
1896         * @throws JMSException
1897         */
1898        public Response syncSendPacket(Command command) throws JMSException {
1899            return connection.syncSendPacket(command);
1900        }
1901    
1902        public long getNextDeliveryId() {
1903            return deliveryIdGenerator.getNextSequenceId();
1904        }
1905    
1906        public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1907    
1908            List<MessageDispatch> c = unconsumedMessages.removeAll();
1909            for (MessageDispatch md : c) {
1910                this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1911            }
1912            Collections.reverse(c);
1913    
1914            for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1915                MessageDispatch md = iter.next();
1916                executor.executeFirst(md);
1917            }
1918    
1919        }
1920    
1921        public boolean isRunning() {
1922            return started.get();
1923        }
1924    
1925        public boolean isAsyncDispatch() {
1926            return asyncDispatch;
1927        }
1928    
    /**
     * @param asyncDispatch The asyncDispatch to set.
     */
    public void setAsyncDispatch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }
1932    
1933        /**
1934         * @return Returns the sessionAsyncDispatch.
1935         */
1936        public boolean isSessionAsyncDispatch() {
1937            return sessionAsyncDispatch;
1938        }
1939    
1940        /**
1941         * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1942         */
1943        public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1944            this.sessionAsyncDispatch = sessionAsyncDispatch;
1945        }
1946    
1947        public MessageTransformer getTransformer() {
1948            return transformer;
1949        }
1950    
1951        public ActiveMQConnection getConnection() {
1952            return connection;
1953        }
1954    
1955        /**
1956         * Sets the transformer used to transform messages before they are sent on
1957         * to the JMS bus or when they are received from the bus but before they are
1958         * delivered to the JMS client
1959         */
1960        public void setTransformer(MessageTransformer transformer) {
1961            this.transformer = transformer;
1962        }
1963    
1964        public BlobTransferPolicy getBlobTransferPolicy() {
1965            return blobTransferPolicy;
1966        }
1967    
1968        /**
1969         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1970         * OBjects) are transferred from producers to brokers to consumers
1971         */
1972        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1973            this.blobTransferPolicy = blobTransferPolicy;
1974        }
1975    
1976        public List<MessageDispatch> getUnconsumedMessages() {
1977            return executor.getUnconsumedMessages();
1978        }
1979    
1980        @Override
1981        public String toString() {
1982            return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1983        }
1984    
1985        public void checkMessageListener() throws JMSException {
1986            if (messageListener != null) {
1987                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1988            }
1989            for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1990                ActiveMQMessageConsumer consumer = i.next();
1991                if (consumer.getMessageListener() != null) {
1992                    throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1993                }
1994            }
1995        }
1996    
1997        protected void setOptimizeAcknowledge(boolean value) {
1998            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1999                ActiveMQMessageConsumer c = iter.next();
2000                c.setOptimizeAcknowledge(value);
2001            }
2002        }
2003    
2004        protected void setPrefetchSize(ConsumerId id, int prefetch) {
2005            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2006                ActiveMQMessageConsumer c = iter.next();
2007                if (c.getConsumerId().equals(id)) {
2008                    c.setPrefetchSize(prefetch);
2009                    break;
2010                }
2011            }
2012        }
2013    
2014        protected void close(ConsumerId id) {
2015            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2016                ActiveMQMessageConsumer c = iter.next();
2017                if (c.getConsumerId().equals(id)) {
2018                    try {
2019                        c.close();
2020                    } catch (JMSException e) {
2021                        LOG.warn("Exception closing consumer", e);
2022                    }
2023                    LOG.warn("Closed consumer on Command");
2024                    break;
2025                }
2026            }
2027        }
2028    
2029        public boolean isInUse(ActiveMQTempDestination destination) {
2030            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2031                ActiveMQMessageConsumer c = iter.next();
2032                if (c.isInUse(destination)) {
2033                    return true;
2034                }
2035            }
2036            return false;
2037        }
2038    
2039        /**
2040         * highest sequence id of the last message delivered by this session.
2041         * Passed to the broker in the close command, maintained by dispose()
2042         * @return lastDeliveredSequenceId
2043         */
2044        public long getLastDeliveredSequenceId() {
2045            return lastDeliveredSequenceId;
2046        }
2047    
2048        protected void sendAck(MessageAck ack) throws JMSException {
2049            sendAck(ack,false);
2050        }
2051    
2052        protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2053            if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2054                asyncSendPacket(ack);
2055            } else {
2056                syncSendPacket(ack);
2057            }
2058        }
2059    
    /**
     * Returns the Scheduler obtained from this session's connection.
     *
     * @return the result of {@code connection.getScheduler()}.
     * @throws JMSException propagated from the connection.
     */
    protected Scheduler getScheduler() throws JMSException {
        return this.connection.getScheduler();
    }
2063    
    /**
     * Returns the ThreadPoolExecutor held in this session's
     * connectionExecutor field.
     *
     * @return the connectionExecutor field of this session.
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        return this.connectionExecutor;
    }
2067    }