001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.pool; 018 019import java.io.Serializable; 020import java.util.Iterator; 021import java.util.concurrent.CopyOnWriteArrayList; 022 023import javax.jms.BytesMessage; 024import javax.jms.Destination; 025import javax.jms.JMSException; 026import javax.jms.MapMessage; 027import javax.jms.Message; 028import javax.jms.MessageConsumer; 029import javax.jms.MessageListener; 030import javax.jms.MessageProducer; 031import javax.jms.ObjectMessage; 032import javax.jms.Queue; 033import javax.jms.QueueBrowser; 034import javax.jms.QueueReceiver; 035import javax.jms.QueueSender; 036import javax.jms.QueueSession; 037import javax.jms.Session; 038import javax.jms.StreamMessage; 039import javax.jms.TemporaryQueue; 040import javax.jms.TemporaryTopic; 041import javax.jms.TextMessage; 042import javax.jms.Topic; 043import javax.jms.TopicPublisher; 044import javax.jms.TopicSession; 045import javax.jms.TopicSubscriber; 046import javax.jms.XASession; 047import javax.transaction.xa.XAResource; 048 049import org.apache.activemq.ActiveMQMessageProducer; 050import org.apache.activemq.ActiveMQQueueSender; 051import org.apache.activemq.ActiveMQSession; 052import org.apache.activemq.ActiveMQTopicPublisher; 053import org.apache.activemq.AlreadyClosedException; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057public class PooledSession implements Session, TopicSession, QueueSession, XASession { 058 private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); 059 060 private ActiveMQSession session; 061 private SessionPool sessionPool; 062 private ActiveMQMessageProducer messageProducer; 063 private ActiveMQQueueSender queueSender; 064 private ActiveMQTopicPublisher topicPublisher; 065 private boolean transactional = true; 066 private boolean ignoreClose; 067 068 private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>(); 069 private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>(); 070 private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners = 071 new CopyOnWriteArrayList<PooledSessionEventListener>(); 072 private boolean isXa; 073 074 public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) { 075 this.session = aSession; 076 this.sessionPool = sessionPool; 077 this.transactional = session.isTransacted(); 078 } 079 080 public void addTempDestEventListener(PooledSessionEventListener listener) { 081 this.tempDestEventListeners.add(listener); 082 } 083 084 protected boolean isIgnoreClose() { 085 return ignoreClose; 086 } 087 088 protected void setIgnoreClose(boolean ignoreClose) { 089 this.ignoreClose = ignoreClose; 090 } 091 092 public void close() throws JMSException { 093 if (!ignoreClose) { 094 // TODO a cleaner way to reset?? 095 096 boolean invalidate = false; 097 try { 098 // lets reset the session 099 getInternalSession().setMessageListener(null); 100 101 // Close any consumers and browsers that may have been created. 102 for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 103 MessageConsumer consumer = iter.next(); 104 consumer.close(); 105 } 106 107 for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) { 108 QueueBrowser browser = iter.next(); 109 browser.close(); 110 } 111 112 if (transactional && !isXa) { 113 try { 114 getInternalSession().rollback(); 115 } catch (JMSException e) { 116 invalidate = true; 117 LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e); 118 } 119 } 120 } catch (JMSException ex) { 121 invalidate = true; 122 LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex); 123 } finally { 124 consumers.clear(); 125 browsers.clear(); 126 } 127 128 if (invalidate) { 129 // lets close the session and not put the session back into 130 // the pool 131 if (session != null) { 132 try { 133 session.close(); 134 } catch (JMSException e1) { 135 LOG.trace("Ignoring exception on close as discarding session: " + e1, e1); 136 } 137 session = null; 138 } 139 sessionPool.invalidateSession(this); 140 } else { 141 sessionPool.returnSession(this); 142 } 143 } 144 } 145 146 public void commit() throws JMSException { 147 getInternalSession().commit(); 148 } 149 150 public BytesMessage createBytesMessage() throws JMSException { 151 return getInternalSession().createBytesMessage(); 152 } 153 154 public MapMessage createMapMessage() throws JMSException { 155 return getInternalSession().createMapMessage(); 156 } 157 158 public Message createMessage() throws JMSException { 159 return getInternalSession().createMessage(); 160 } 161 162 public ObjectMessage createObjectMessage() throws JMSException { 163 return getInternalSession().createObjectMessage(); 164 } 165 166 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { 167 return getInternalSession().createObjectMessage(serializable); 168 } 169 170 public Queue createQueue(String s) throws JMSException { 171 return getInternalSession().createQueue(s); 172 } 173 174 public StreamMessage createStreamMessage() throws JMSException { 175 return getInternalSession().createStreamMessage(); 176 } 177 178 public TemporaryQueue createTemporaryQueue() throws JMSException { 179 TemporaryQueue result; 180 181 result = getInternalSession().createTemporaryQueue(); 182 183 // Notify all of the listeners of the created temporary Queue. 184 for (PooledSessionEventListener listener : this.tempDestEventListeners) { 185 listener.onTemporaryQueueCreate(result); 186 } 187 188 return result; 189 } 190 191 public TemporaryTopic createTemporaryTopic() throws JMSException { 192 TemporaryTopic result; 193 194 result = getInternalSession().createTemporaryTopic(); 195 196 // Notify all of the listeners of the created temporary Topic. 197 for (PooledSessionEventListener listener : this.tempDestEventListeners) { 198 listener.onTemporaryTopicCreate(result); 199 } 200 201 return result; 202 } 203 204 public void unsubscribe(String s) throws JMSException { 205 getInternalSession().unsubscribe(s); 206 } 207 208 public TextMessage createTextMessage() throws JMSException { 209 return getInternalSession().createTextMessage(); 210 } 211 212 public TextMessage createTextMessage(String s) throws JMSException { 213 return getInternalSession().createTextMessage(s); 214 } 215 216 public Topic createTopic(String s) throws JMSException { 217 return getInternalSession().createTopic(s); 218 } 219 220 public int getAcknowledgeMode() throws JMSException { 221 return getInternalSession().getAcknowledgeMode(); 222 } 223 224 public boolean getTransacted() throws JMSException { 225 return getInternalSession().getTransacted(); 226 } 227 228 public void recover() throws JMSException { 229 getInternalSession().recover(); 230 } 231 232 public void rollback() throws JMSException { 233 getInternalSession().rollback(); 234 } 235 236 public XAResource getXAResource() { 237 if (session == null) { 238 throw new IllegalStateException("Session is closed"); 239 } 240 return session.getTransactionContext(); 241 } 242 243 public Session getSession() { 244 return this; 245 } 246 247 public void run() { 248 if (session != null) { 249 session.run(); 250 } 251 } 252 253 // Consumer related methods 254 // ------------------------------------------------------------------------- 255 public QueueBrowser createBrowser(Queue queue) throws JMSException { 256 return addQueueBrowser(getInternalSession().createBrowser(queue)); 257 } 258 259 public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { 260 return addQueueBrowser(getInternalSession().createBrowser(queue, selector)); 261 } 262 263 public MessageConsumer createConsumer(Destination destination) throws JMSException { 264 return addConsumer(getInternalSession().createConsumer(destination)); 265 } 266 267 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { 268 return addConsumer(getInternalSession().createConsumer(destination, selector)); 269 } 270 271 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { 272 return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal)); 273 } 274 275 public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { 276 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector)); 277 } 278 279 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { 280 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal)); 281 } 282 283 public MessageListener getMessageListener() throws JMSException { 284 return getInternalSession().getMessageListener(); 285 } 286 287 public void setMessageListener(MessageListener messageListener) throws JMSException { 288 getInternalSession().setMessageListener(messageListener); 289 } 290 291 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 292 return addTopicSubscriber(getInternalSession().createSubscriber(topic)); 293 } 294 295 public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { 296 return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local)); 297 } 298 299 public QueueReceiver createReceiver(Queue queue) throws JMSException { 300 return addQueueReceiver(getInternalSession().createReceiver(queue)); 301 } 302 303 public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { 304 return addQueueReceiver(getInternalSession().createReceiver(queue, selector)); 305 } 306 307 // Producer related methods 308 // ------------------------------------------------------------------------- 309 public MessageProducer createProducer(Destination destination) throws JMSException { 310 return new PooledProducer(getMessageProducer(), destination); 311 } 312 313 public QueueSender createSender(Queue queue) throws JMSException { 314 return new PooledQueueSender(getQueueSender(), queue); 315 } 316 317 public TopicPublisher createPublisher(Topic topic) throws JMSException { 318 return new PooledTopicPublisher(getTopicPublisher(), topic); 319 } 320 321 /** 322 * Callback invoked when the consumer is closed. 323 * <p/> 324 * This is used to keep track of an explicit closed consumer created by this 325 * session, by which we know do not need to keep track of the consumer, as 326 * its already closed. 327 * 328 * @param consumer 329 * the consumer which is being closed 330 */ 331 protected void onConsumerClose(MessageConsumer consumer) { 332 consumers.remove(consumer); 333 } 334 335 public ActiveMQSession getInternalSession() throws AlreadyClosedException { 336 if (session == null) { 337 throw new AlreadyClosedException("The session has already been closed"); 338 } 339 return session; 340 } 341 342 public ActiveMQMessageProducer getMessageProducer() throws JMSException { 343 if (messageProducer == null) { 344 messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null); 345 } 346 return messageProducer; 347 } 348 349 public ActiveMQQueueSender getQueueSender() throws JMSException { 350 if (queueSender == null) { 351 queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null); 352 } 353 return queueSender; 354 } 355 356 public ActiveMQTopicPublisher getTopicPublisher() throws JMSException { 357 if (topicPublisher == null) { 358 topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null); 359 } 360 return topicPublisher; 361 } 362 363 private QueueBrowser addQueueBrowser(QueueBrowser browser) { 364 browsers.add(browser); 365 return browser; 366 } 367 368 private MessageConsumer addConsumer(MessageConsumer consumer) { 369 consumers.add(consumer); 370 // must wrap in PooledMessageConsumer to ensure the onConsumerClose 371 // method is invoked 372 // when the returned consumer is closed, to avoid memory leak in this 373 // session class 374 // in case many consumers is created 375 return new PooledMessageConsumer(this, consumer); 376 } 377 378 private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) { 379 consumers.add(subscriber); 380 return subscriber; 381 } 382 383 private QueueReceiver addQueueReceiver(QueueReceiver receiver) { 384 consumers.add(receiver); 385 return receiver; 386 } 387 388 public void setIsXa(boolean isXa) { 389 this.isXa = isXa; 390 } 391 392 public String toString() { 393 return "PooledSession { " + session + " }"; 394 } 395}