001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.ra; 018 019import java.util.ArrayList; 020import java.util.Iterator; 021import java.util.List; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.locks.Lock; 024import java.util.concurrent.locks.ReentrantLock; 025 026import javax.jms.JMSException; 027import javax.jms.ServerSession; 028import javax.jms.ServerSessionPool; 029import javax.jms.Session; 030import javax.resource.spi.UnavailableException; 031import javax.resource.spi.endpoint.MessageEndpoint; 032 033import org.apache.activemq.ActiveMQConnection; 034import org.apache.activemq.ActiveMQSession; 035import org.apache.activemq.command.MessageDispatch; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * $Date$ 041 */ 042public class ServerSessionPoolImpl implements ServerSessionPool { 043 044 private static final Logger LOG = LoggerFactory.getLogger(ServerSessionPoolImpl.class); 045 046 private final ActiveMQEndpointWorker activeMQAsfEndpointWorker; 047 private final int maxSessions; 048 049 private final List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>(); 050 private final List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>(); 051 private final Lock sessionLock = new ReentrantLock(); 052 private final AtomicBoolean closing = new AtomicBoolean(false); 053 054 public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) { 055 this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker; 056 this.maxSessions = maxSessions; 057 } 058 059 private ServerSessionImpl createServerSessionImpl() throws JMSException { 060 MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); 061 int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); 062 final ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection(); 063 if (connection == null) { 064 // redispatch of pending prefetched messages after disconnect can have a null connection 065 return null; 066 } 067 final ActiveMQSession session = (ActiveMQSession)connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge); 068 MessageEndpoint endpoint; 069 try { 070 int batchSize = 0; 071 if (activationSpec.getEnableBatchBooleanValue()) { 072 batchSize = activationSpec.getMaxMessagesPerBatchIntValue(); 073 } 074 if (activationSpec.isUseRAManagedTransactionEnabled()) { 075 // The RA will manage the transaction commit. 076 endpoint = createEndpoint(null); 077 return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize); 078 } else { 079 // Give the container an object to manage to transaction with. 080 endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext())); 081 return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize); 082 } 083 } catch (UnavailableException e) { 084 // The container could be limiting us on the number of endpoints 085 // that are being created. 086 if (LOG.isDebugEnabled()) { 087 LOG.debug("Could not create an endpoint.", e); 088 } 089 session.close(); 090 return null; 091 } 092 } 093 094 private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException { 095 MessageEndpoint endpoint; 096 endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy); 097 MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint); 098 return endpointProxy; 099 } 100 101 /** 102 */ 103 @Override 104 public ServerSession getServerSession() throws JMSException { 105 if (LOG.isDebugEnabled()) { 106 LOG.debug("ServerSession requested."); 107 } 108 if (closing.get()) { 109 throw new JMSException("Session Pool Shutting Down."); 110 } 111 ServerSessionImpl ss = null; 112 sessionLock.lock(); 113 try { 114 ss = getExistingServerSession(false); 115 } finally { 116 sessionLock.unlock(); 117 } 118 if (ss != null) { 119 return ss; 120 } 121 ss = createServerSessionImpl(); 122 sessionLock.lock(); 123 try { 124 // We may not be able to create a session due to the container 125 // restricting us. 126 if (ss == null) { 127 if (activeSessions.isEmpty() && idleSessions.isEmpty()) { 128 throw new JMSException("Endpoint factory did not allow creation of any endpoints."); 129 } 130 131 ss = getExistingServerSession(true); 132 } else { 133 activeSessions.add(ss); 134 } 135 } finally { 136 sessionLock.unlock(); 137 } 138 if (LOG.isDebugEnabled()) { 139 LOG.debug("Created a new session: " + ss); 140 } 141 return ss; 142 143 } 144 145 /** 146 * Must be called with sessionLock held. 147 * Returns an idle session if one exists or an active session if no more 148 * sessions can be created. Sessions can not be created if force is true 149 * or activeSessions >= maxSessions. 150 * @param force do not check activeSessions >= maxSessions, return an active connection anyway. 151 * @return an already existing session. 152 */ 153 private ServerSessionImpl getExistingServerSession(boolean force) { 154 ServerSessionImpl ss = null; 155 if (idleSessions.size() > 0) { 156 ss = idleSessions.remove(idleSessions.size() - 1); 157 } 158 if (ss != null) { 159 activeSessions.add(ss); 160 if (LOG.isDebugEnabled()) { 161 LOG.debug("Using idle session: " + ss); 162 } 163 } else if (force || activeSessions.size() >= maxSessions) { 164 // If we are at the upper limit 165 // then reuse the already created sessions.. 166 // This is going to queue up messages into a session for 167 // processing. 168 ss = getExistingActiveServerSession(); 169 } 170 return ss; 171 } 172 173 /** 174 * Must be called with sessionLock held. 175 * Returns the first session from activeSessions, shifting it to last. 176 * @return session 177 */ 178 private ServerSessionImpl getExistingActiveServerSession() { 179 ServerSessionImpl ss = null; 180 if (!activeSessions.isEmpty()) { 181 if (activeSessions.size() > 1) { 182 // round robin 183 ss = activeSessions.remove(0); 184 activeSessions.add(ss); 185 } else { 186 ss = activeSessions.get(0); 187 } 188 } 189 if (LOG.isDebugEnabled()) { 190 LOG.debug("Reusing an active session: " + ss); 191 } 192 return ss; 193 } 194 195 public void returnToPool(ServerSessionImpl ss) { 196 sessionLock.lock(); 197 activeSessions.remove(ss); 198 try { 199 // make sure we only return non-stale sessions to the pool 200 if ( ss.isStale() ) { 201 if ( LOG.isDebugEnabled() ) { 202 LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss); 203 } 204 ss.close(); 205 } else { 206 if (LOG.isDebugEnabled()) { 207 LOG.debug("ServerSession returned to pool: " + ss); 208 } 209 idleSessions.add(ss); 210 } 211 } finally { 212 sessionLock.unlock(); 213 } 214 synchronized (closing) { 215 closing.notify(); 216 } 217 } 218 219 public void removeFromPool(ServerSessionImpl ss) { 220 sessionLock.lock(); 221 try { 222 activeSessions.remove(ss); 223 } finally { 224 sessionLock.unlock(); 225 } 226 try { 227 ActiveMQSession session = (ActiveMQSession)ss.getSession(); 228 List<MessageDispatch> l = session.getUnconsumedMessages(); 229 if (!l.isEmpty()) { 230 ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection(); 231 if (connection != null) { 232 for (Iterator<MessageDispatch> i = l.iterator(); i.hasNext();) { 233 MessageDispatch md = i.next(); 234 if (connection.hasDispatcher(md.getConsumerId())) { 235 dispatchToSession(md); 236 LOG.trace("on remove of {} redispatch of {}", session, md); 237 } else { 238 LOG.trace("on remove not redispatching {}, dispatcher no longer present on {}", md, session.getConnection()); 239 } 240 } 241 } else { 242 LOG.trace("on remove of {} not redispatching while disconnected", session); 243 } 244 } 245 } catch (Throwable t) { 246 LOG.error("Error redispatching unconsumed messages from stale server session {}", ss, t); 247 } 248 ss.close(); 249 synchronized (closing) { 250 closing.notify(); 251 } 252 } 253 254 /** 255 * @param messageDispatch 256 * the message to dispatch 257 * @throws JMSException 258 */ 259 private void dispatchToSession(MessageDispatch messageDispatch) 260 throws JMSException { 261 262 ServerSession serverSession = getServerSession(); 263 Session s = serverSession.getSession(); 264 ActiveMQSession session = null; 265 if (s instanceof ActiveMQSession) { 266 session = (ActiveMQSession) s; 267 } else { 268 activeMQAsfEndpointWorker.getConnection() 269 .onAsyncException(new JMSException( 270 "Session pool provided an invalid session type: " 271 + s.getClass())); 272 } 273 session.dispatch(messageDispatch); 274 serverSession.start(); 275 } 276 277 public void close() { 278 closing.set(true); 279 int activeCount = closeSessions(); 280 // we may have to wait erroneously 250ms if an 281 // active session is removed during our wait and we 282 // are not notified 283 while (activeCount > 0) { 284 if (LOG.isDebugEnabled()) { 285 LOG.debug("Active Sessions = " + activeCount); 286 } 287 try { 288 synchronized (closing) { 289 closing.wait(250); 290 } 291 } catch (InterruptedException e) { 292 Thread.currentThread().interrupt(); 293 return; 294 } 295 activeCount = closeSessions(); 296 } 297 } 298 299 300 protected int closeSessions() { 301 sessionLock.lock(); 302 try { 303 for (ServerSessionImpl ss : activeSessions) { 304 try { 305 ActiveMQSession session = (ActiveMQSession) ss.getSession(); 306 if (!session.isClosed()) { 307 session.close(); 308 } 309 } catch (JMSException ignored) { 310 if (LOG.isDebugEnabled()) { 311 LOG.debug("Failed to close active running server session {}, reason:{}", ss, ignored.toString(), ignored); 312 } 313 } 314 } 315 for (ServerSessionImpl ss : idleSessions) { 316 ss.close(); 317 } 318 idleSessions.clear(); 319 return activeSessions.size(); 320 } finally { 321 sessionLock.unlock(); 322 } 323 } 324 325 /** 326 * @return Returns the closing. 327 */ 328 public boolean isClosing() { 329 return closing.get(); 330 } 331 332 /** 333 * @param closing The closing to set. 334 */ 335 public void setClosing(boolean closing) { 336 this.closing.set(closing); 337 } 338 339}