001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.io.Serializable;
020    import java.util.Iterator;
021    import java.util.concurrent.CopyOnWriteArrayList;
022    
023    import javax.jms.BytesMessage;
024    import javax.jms.Destination;
025    import javax.jms.JMSException;
026    import javax.jms.MapMessage;
027    import javax.jms.Message;
028    import javax.jms.MessageConsumer;
029    import javax.jms.MessageListener;
030    import javax.jms.MessageProducer;
031    import javax.jms.ObjectMessage;
032    import javax.jms.Queue;
033    import javax.jms.QueueBrowser;
034    import javax.jms.QueueReceiver;
035    import javax.jms.QueueSender;
036    import javax.jms.QueueSession;
037    import javax.jms.Session;
038    import javax.jms.StreamMessage;
039    import javax.jms.TemporaryQueue;
040    import javax.jms.TemporaryTopic;
041    import javax.jms.TextMessage;
042    import javax.jms.Topic;
043    import javax.jms.TopicPublisher;
044    import javax.jms.TopicSession;
045    import javax.jms.TopicSubscriber;
046    import javax.jms.XASession;
047    import javax.transaction.xa.XAResource;
048    
049    import org.apache.activemq.ActiveMQMessageProducer;
050    import org.apache.activemq.ActiveMQQueueSender;
051    import org.apache.activemq.ActiveMQSession;
052    import org.apache.activemq.ActiveMQTopicPublisher;
053    import org.apache.activemq.AlreadyClosedException;
054    import org.slf4j.Logger;
055    import org.slf4j.LoggerFactory;
056    
057    public class PooledSession implements Session, TopicSession, QueueSession, XASession {
058        private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
059    
060        private ActiveMQSession session;
061        private SessionPool sessionPool;
062        private ActiveMQMessageProducer messageProducer;
063        private ActiveMQQueueSender queueSender;
064        private ActiveMQTopicPublisher topicPublisher;
065        private boolean transactional = true;
066        private boolean ignoreClose;
067    
068        private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
069        private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
070        private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
071            new CopyOnWriteArrayList<PooledSessionEventListener>();
072        private boolean isXa;
073    
074        public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
075            this.session = aSession;
076            this.sessionPool = sessionPool;
077            this.transactional = session.isTransacted();
078        }
079    
080        public void addTempDestEventListener(PooledSessionEventListener listener) {
081            this.tempDestEventListeners.add(listener);
082        }
083    
084        protected boolean isIgnoreClose() {
085            return ignoreClose;
086        }
087    
088        protected void setIgnoreClose(boolean ignoreClose) {
089            this.ignoreClose = ignoreClose;
090        }
091    
092        public void close() throws JMSException {
093            if (!ignoreClose) {
094                // TODO a cleaner way to reset??
095    
096                boolean invalidate = false;
097                try {
098                    // lets reset the session
099                    getInternalSession().setMessageListener(null);
100    
101                    // Close any consumers and browsers that may have been created.
102                    for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
103                        MessageConsumer consumer = iter.next();
104                        consumer.close();
105                    }
106    
107                    for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
108                        QueueBrowser browser = iter.next();
109                        browser.close();
110                    }
111    
112                    if (transactional && !isXa) {
113                        try {
114                            getInternalSession().rollback();
115                        } catch (JMSException e) {
116                            invalidate = true;
117                            LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
118                        }
119                    }
120                } catch (JMSException ex) {
121                    invalidate = true;
122                    LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
123                } finally {
124                    consumers.clear();
125                    browsers.clear();
126                }
127    
128                if (invalidate) {
129                    // lets close the session and not put the session back into
130                    // the pool
131                    if (session != null) {
132                        try {
133                            session.close();
134                        } catch (JMSException e1) {
135                            LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
136                        }
137                        session = null;
138                    }
139                    sessionPool.invalidateSession(this);
140                } else {
141                    sessionPool.returnSession(this);
142                }
143            }
144        }
145    
146        public void commit() throws JMSException {
147            getInternalSession().commit();
148        }
149    
150        public BytesMessage createBytesMessage() throws JMSException {
151            return getInternalSession().createBytesMessage();
152        }
153    
154        public MapMessage createMapMessage() throws JMSException {
155            return getInternalSession().createMapMessage();
156        }
157    
158        public Message createMessage() throws JMSException {
159            return getInternalSession().createMessage();
160        }
161    
162        public ObjectMessage createObjectMessage() throws JMSException {
163            return getInternalSession().createObjectMessage();
164        }
165    
166        public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
167            return getInternalSession().createObjectMessage(serializable);
168        }
169    
170        public Queue createQueue(String s) throws JMSException {
171            return getInternalSession().createQueue(s);
172        }
173    
174        public StreamMessage createStreamMessage() throws JMSException {
175            return getInternalSession().createStreamMessage();
176        }
177    
178        public TemporaryQueue createTemporaryQueue() throws JMSException {
179            TemporaryQueue result;
180    
181            result = getInternalSession().createTemporaryQueue();
182    
183            // Notify all of the listeners of the created temporary Queue.
184            for (PooledSessionEventListener listener : this.tempDestEventListeners) {
185                listener.onTemporaryQueueCreate(result);
186            }
187    
188            return result;
189        }
190    
191        public TemporaryTopic createTemporaryTopic() throws JMSException {
192            TemporaryTopic result;
193    
194            result = getInternalSession().createTemporaryTopic();
195    
196            // Notify all of the listeners of the created temporary Topic.
197            for (PooledSessionEventListener listener : this.tempDestEventListeners) {
198                listener.onTemporaryTopicCreate(result);
199            }
200    
201            return result;
202        }
203    
204        public void unsubscribe(String s) throws JMSException {
205            getInternalSession().unsubscribe(s);
206        }
207    
208        public TextMessage createTextMessage() throws JMSException {
209            return getInternalSession().createTextMessage();
210        }
211    
212        public TextMessage createTextMessage(String s) throws JMSException {
213            return getInternalSession().createTextMessage(s);
214        }
215    
216        public Topic createTopic(String s) throws JMSException {
217            return getInternalSession().createTopic(s);
218        }
219    
220        public int getAcknowledgeMode() throws JMSException {
221            return getInternalSession().getAcknowledgeMode();
222        }
223    
224        public boolean getTransacted() throws JMSException {
225            return getInternalSession().getTransacted();
226        }
227    
228        public void recover() throws JMSException {
229            getInternalSession().recover();
230        }
231    
232        public void rollback() throws JMSException {
233            getInternalSession().rollback();
234        }
235    
236        public XAResource getXAResource() {
237            if (session == null) {
238                throw new IllegalStateException("Session is closed");
239            }
240            return session.getTransactionContext();
241        }
242    
243        public Session getSession() {
244            return this;
245        }
246    
247        public void run() {
248            if (session != null) {
249                session.run();
250            }
251        }
252    
253        // Consumer related methods
254        // -------------------------------------------------------------------------
255        public QueueBrowser createBrowser(Queue queue) throws JMSException {
256            return addQueueBrowser(getInternalSession().createBrowser(queue));
257        }
258    
259        public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
260            return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
261        }
262    
263        public MessageConsumer createConsumer(Destination destination) throws JMSException {
264            return addConsumer(getInternalSession().createConsumer(destination));
265        }
266    
267        public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
268            return addConsumer(getInternalSession().createConsumer(destination, selector));
269        }
270    
271        public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
272            return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
273        }
274    
275        public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
276            return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
277        }
278    
279        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
280            return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
281        }
282    
283        public MessageListener getMessageListener() throws JMSException {
284            return getInternalSession().getMessageListener();
285        }
286    
287        public void setMessageListener(MessageListener messageListener) throws JMSException {
288            getInternalSession().setMessageListener(messageListener);
289        }
290    
291        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
292            return addTopicSubscriber(getInternalSession().createSubscriber(topic));
293        }
294    
295        public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
296            return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
297        }
298    
299        public QueueReceiver createReceiver(Queue queue) throws JMSException {
300            return addQueueReceiver(getInternalSession().createReceiver(queue));
301        }
302    
303        public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
304            return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
305        }
306    
307        // Producer related methods
308        // -------------------------------------------------------------------------
309        public MessageProducer createProducer(Destination destination) throws JMSException {
310            return new PooledProducer(getMessageProducer(), destination);
311        }
312    
313        public QueueSender createSender(Queue queue) throws JMSException {
314            return new PooledQueueSender(getQueueSender(), queue);
315        }
316    
317        public TopicPublisher createPublisher(Topic topic) throws JMSException {
318            return new PooledTopicPublisher(getTopicPublisher(), topic);
319        }
320    
321        /**
322         * Callback invoked when the consumer is closed.
323         * <p/>
324         * This is used to keep track of an explicit closed consumer created by this
325         * session, by which we know do not need to keep track of the consumer, as
326         * its already closed.
327         *
328         * @param consumer
329         *            the consumer which is being closed
330         */
331        protected void onConsumerClose(MessageConsumer consumer) {
332            consumers.remove(consumer);
333        }
334    
335        public ActiveMQSession getInternalSession() throws AlreadyClosedException {
336            if (session == null) {
337                throw new AlreadyClosedException("The session has already been closed");
338            }
339            return session;
340        }
341    
342        public ActiveMQMessageProducer getMessageProducer() throws JMSException {
343            if (messageProducer == null) {
344                messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
345            }
346            return messageProducer;
347        }
348    
349        public ActiveMQQueueSender getQueueSender() throws JMSException {
350            if (queueSender == null) {
351                queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
352            }
353            return queueSender;
354        }
355    
356        public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
357            if (topicPublisher == null) {
358                topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
359            }
360            return topicPublisher;
361        }
362    
363        private QueueBrowser addQueueBrowser(QueueBrowser browser) {
364            browsers.add(browser);
365            return browser;
366        }
367    
368        private MessageConsumer addConsumer(MessageConsumer consumer) {
369            consumers.add(consumer);
370            // must wrap in PooledMessageConsumer to ensure the onConsumerClose
371            // method is invoked
372            // when the returned consumer is closed, to avoid memory leak in this
373            // session class
374            // in case many consumers is created
375            return new PooledMessageConsumer(this, consumer);
376        }
377    
378        private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
379            consumers.add(subscriber);
380            return subscriber;
381        }
382    
383        private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
384            consumers.add(receiver);
385            return receiver;
386        }
387    
388        public void setIsXa(boolean isXa) {
389            this.isXa = isXa;
390        }
391    
392        public String toString() {
393            return "PooledSession { " + session + " }";
394        }
395    }