001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.ra; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.List; 022 import java.util.concurrent.atomic.AtomicBoolean; 023 import java.util.concurrent.locks.Lock; 024 import java.util.concurrent.locks.ReentrantLock; 025 026 import javax.jms.JMSException; 027 import javax.jms.ServerSession; 028 import javax.jms.ServerSessionPool; 029 import javax.jms.Session; 030 import javax.resource.spi.UnavailableException; 031 import javax.resource.spi.endpoint.MessageEndpoint; 032 033 import org.apache.activemq.ActiveMQQueueSession; 034 import org.apache.activemq.ActiveMQSession; 035 import org.apache.activemq.ActiveMQTopicSession; 036 import org.apache.activemq.command.MessageDispatch; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 /** 041 * $Date$ 042 */ 043 public class ServerSessionPoolImpl implements ServerSessionPool { 044 045 private static final Logger LOG = LoggerFactory.getLogger(ServerSessionPoolImpl.class); 046 047 private final ActiveMQEndpointWorker activeMQAsfEndpointWorker; 048 private final int maxSessions; 049 050 private final List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>(); 051 private final List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>(); 052 private final Lock sessionLock = new ReentrantLock(); 053 private final AtomicBoolean closing = new AtomicBoolean(false); 054 055 public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) { 056 this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker; 057 this.maxSessions = maxSessions; 058 } 059 060 private ServerSessionImpl createServerSessionImpl() throws JMSException { 061 MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); 062 int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); 063 final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge); 064 MessageEndpoint endpoint; 065 try { 066 int batchSize = 0; 067 if (activationSpec.getEnableBatchBooleanValue()) { 068 batchSize = activationSpec.getMaxMessagesPerBatchIntValue(); 069 } 070 if (activationSpec.isUseRAManagedTransactionEnabled()) { 071 // The RA will manage the transaction commit. 072 endpoint = createEndpoint(null); 073 return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize); 074 } else { 075 // Give the container an object to manage to transaction with. 076 endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext())); 077 return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize); 078 } 079 } catch (UnavailableException e) { 080 // The container could be limiting us on the number of endpoints 081 // that are being created. 082 if (LOG.isDebugEnabled()) { 083 LOG.debug("Could not create an endpoint.", e); 084 } 085 session.close(); 086 return null; 087 } 088 } 089 090 private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException { 091 MessageEndpoint endpoint; 092 endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy); 093 MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint); 094 return endpointProxy; 095 } 096 097 /** 098 */ 099 public ServerSession getServerSession() throws JMSException { 100 if (LOG.isDebugEnabled()) { 101 LOG.debug("ServerSession requested."); 102 } 103 if (closing.get()) { 104 throw new JMSException("Session Pool Shutting Down."); 105 } 106 ServerSessionImpl ss = null; 107 sessionLock.lock(); 108 try { 109 ss = getExistingServerSession(false); 110 } finally { 111 sessionLock.unlock(); 112 } 113 if (ss != null) { 114 return ss; 115 } 116 ss = createServerSessionImpl(); 117 sessionLock.lock(); 118 try { 119 // We may not be able to create a session due to the container 120 // restricting us. 121 if (ss == null) { 122 if (activeSessions.isEmpty() && idleSessions.isEmpty()) { 123 throw new JMSException("Endpoint factory did not allow creation of any endpoints."); 124 } 125 126 ss = getExistingServerSession(true); 127 } else { 128 activeSessions.add(ss); 129 } 130 } finally { 131 sessionLock.unlock(); 132 } 133 if (LOG.isDebugEnabled()) { 134 LOG.debug("Created a new session: " + ss); 135 } 136 return ss; 137 138 } 139 140 /** 141 * Must be called with sessionLock held. 142 * Returns an idle session if one exists or an active session if no more 143 * sessions can be created. Sessions can not be created if force is true 144 * or activeSessions >= maxSessions. 145 * @param force do not check activeSessions >= maxSessions, return an active connection anyway. 146 * @return an already existing session. 147 */ 148 private ServerSessionImpl getExistingServerSession(boolean force) { 149 ServerSessionImpl ss = null; 150 if (idleSessions.size() > 0) { 151 ss = idleSessions.remove(idleSessions.size() - 1); 152 } 153 if (ss != null) { 154 activeSessions.add(ss); 155 if (LOG.isDebugEnabled()) { 156 LOG.debug("Using idle session: " + ss); 157 } 158 } else if (force || activeSessions.size() >= maxSessions) { 159 // If we are at the upper limit 160 // then reuse the already created sessions.. 161 // This is going to queue up messages into a session for 162 // processing. 163 ss = getExistingActiveServerSession(); 164 } 165 return ss; 166 } 167 168 /** 169 * Must be called with sessionLock held. 170 * Returns the first session from activeSessions, shifting it to last. 171 * @return session 172 */ 173 private ServerSessionImpl getExistingActiveServerSession() { 174 ServerSessionImpl ss = null; 175 if (!activeSessions.isEmpty()) { 176 if (activeSessions.size() > 1) { 177 // round robin 178 ss = activeSessions.remove(0); 179 activeSessions.add(ss); 180 } else { 181 ss = activeSessions.get(0); 182 } 183 } 184 if (LOG.isDebugEnabled()) { 185 LOG.debug("Reusing an active session: " + ss); 186 } 187 return ss; 188 } 189 190 public void returnToPool(ServerSessionImpl ss) { 191 sessionLock.lock(); 192 activeSessions.remove(ss); 193 try { 194 // make sure we only return non-stale sessions to the pool 195 if ( ss.isStale() ) { 196 if ( LOG.isDebugEnabled() ) { 197 LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss); 198 } 199 ss.close(); 200 } else { 201 if (LOG.isDebugEnabled()) { 202 LOG.debug("ServerSession returned to pool: " + ss); 203 } 204 idleSessions.add(ss); 205 } 206 } finally { 207 sessionLock.unlock(); 208 } 209 synchronized (closing) { 210 closing.notify(); 211 } 212 } 213 214 public void removeFromPool(ServerSessionImpl ss) { 215 sessionLock.lock(); 216 try { 217 activeSessions.remove(ss); 218 } finally { 219 sessionLock.unlock(); 220 } 221 try { 222 ActiveMQSession session = (ActiveMQSession)ss.getSession(); 223 List l = session.getUnconsumedMessages(); 224 for (Iterator i = l.iterator(); i.hasNext();) { 225 dispatchToSession((MessageDispatch)i.next()); 226 } 227 } catch (Throwable t) { 228 LOG.error("Error redispatching unconsumed messages from stale session", t); 229 } 230 ss.close(); 231 synchronized (closing) { 232 closing.notify(); 233 } 234 } 235 236 /** 237 * @param messageDispatch 238 * the message to dispatch 239 * @throws JMSException 240 */ 241 private void dispatchToSession(MessageDispatch messageDispatch) 242 throws JMSException { 243 244 ServerSession serverSession = getServerSession(); 245 Session s = serverSession.getSession(); 246 ActiveMQSession session = null; 247 if (s instanceof ActiveMQSession) { 248 session = (ActiveMQSession) s; 249 } else if (s instanceof ActiveMQQueueSession) { 250 session = (ActiveMQSession) s; 251 } else if (s instanceof ActiveMQTopicSession) { 252 session = (ActiveMQSession) s; 253 } else { 254 activeMQAsfEndpointWorker.getConnection() 255 .onAsyncException(new JMSException( 256 "Session pool provided an invalid session type: " 257 + s.getClass())); 258 } 259 session.dispatch(messageDispatch); 260 serverSession.start(); 261 } 262 263 public void close() { 264 closing.set(true); 265 int activeCount = closeIdleSessions(); 266 // we may have to wait erroneously 250ms if an 267 // active session is removed during our wait and we 268 // are not notified 269 while (activeCount > 0) { 270 if (LOG.isDebugEnabled()) { 271 LOG.debug("Active Sessions = " + activeCount); 272 } 273 try { 274 synchronized (closing) { 275 closing.wait(250); 276 } 277 } catch (InterruptedException e) { 278 Thread.currentThread().interrupt(); 279 return; 280 } 281 activeCount = closeIdleSessions(); 282 } 283 } 284 285 286 protected int closeIdleSessions() { 287 sessionLock.lock(); 288 try { 289 for (ServerSessionImpl ss : idleSessions) { 290 ss.close(); 291 } 292 idleSessions.clear(); 293 return activeSessions.size(); 294 } finally { 295 sessionLock.unlock(); 296 } 297 } 298 299 /** 300 * @return Returns the closing. 301 */ 302 public boolean isClosing() { 303 return closing.get(); 304 } 305 306 /** 307 * @param closing The closing to set. 308 */ 309 public void setClosing(boolean closing) { 310 this.closing.set(closing); 311 } 312 313 }